From 4440b5e6fcd7eff2d5b52b1a9ec5df3222b701b4 Mon Sep 17 00:00:00 2001 From: jinhelin Date: Wed, 27 Mar 2024 21:48:48 +0800 Subject: [PATCH] Storages: log version of records if enabled (#8874) ref pingcap/tiflash#8864 --- dbms/src/Interpreters/Settings.h | 1 + dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp | 11 +++++++++++ dbms/src/Storages/DeltaMerge/SegmentReadTask.cpp | 10 ++++++---- .../DeltaMerge/tests/gtest_segment_read_task_pool.cpp | 11 +++++++---- 4 files changed, 25 insertions(+), 8 deletions(-) diff --git a/dbms/src/Interpreters/Settings.h b/dbms/src/Interpreters/Settings.h index 830aacd31da..e507943718e 100644 --- a/dbms/src/Interpreters/Settings.h +++ b/dbms/src/Interpreters/Settings.h @@ -234,6 +234,7 @@ struct Settings M(SettingDouble, dt_prepare_stream_concurrency_scale, 2.0, "Concurrency of preparing streams of one query equals to num_streams * dt_prepare_stream_concurrency_scale.") \ M(SettingBool, dt_enable_delta_index_error_fallback, true, "Whether fallback to an empty delta index if a delta index error is detected") \ M(SettingBool, dt_enable_fetch_memtableset, true, "Whether fetching delta cache in FetchDisaggPages") \ + M(SettingBool, dt_log_record_version, false, "Whether log the version of records when write them to storage") \ M(SettingUInt64, init_thread_count_scale, 100, "Number of thread = number of logical cpu cores * init_thread_count_scale. It just works for thread pool for initStores and loadMetadata") \ M(SettingDouble, cpu_thread_count_scale, 1.0, "Number of thread of computation-intensive thread pool = number of logical cpu cores * cpu_thread_count_scale. Only takes effects at server startup.") \ \ diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp index fdc73af0f8d..e05aa1d77bc 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp +++ b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp @@ -545,6 +545,17 @@ DM::WriteResult DeltaMergeStore::write(const Context & db_context, const DB::Set auto dm_context = newDMContext(db_context, db_settings, "write"); + if (db_context.getSettingsRef().dt_log_record_version) + { + const auto & ver_col = block.getByName(VERSION_COLUMN_NAME).column; + const auto * ver = toColumnVectorDataPtr(ver_col); + std::unordered_set dedup_ver; + for (auto v : *ver) + { + dedup_ver.insert(v); + } + LOG_DEBUG(log, "Record count: {}, Versions: {}", block.rows(), dedup_ver); + } const auto bytes = block.bytes(); { diff --git a/dbms/src/Storages/DeltaMerge/SegmentReadTask.cpp b/dbms/src/Storages/DeltaMerge/SegmentReadTask.cpp index 71febf0cb55..a272f4442a4 100644 --- a/dbms/src/Storages/DeltaMerge/SegmentReadTask.cpp +++ b/dbms/src/Storages/DeltaMerge/SegmentReadTask.cpp @@ -740,21 +740,23 @@ String SegmentReadTask::toString() const if (dm_context->keyspace_id == DB::NullspaceID) { return fmt::format( - "s{}_t{}_{}_{}_{}", + "s{}_t{}_{}_{}_{}_{}", store_id, dm_context->physical_table_id, segment->segmentId(), segment->segmentEpoch(), - read_snapshot->delta->getDeltaIndexEpoch()); + read_snapshot->delta->getDeltaIndexEpoch(), + read_snapshot->getRows()); } return fmt::format( - "s{}_ks{}_t{}_{}_{}_{}", + "s{}_ks{}_t{}_{}_{}_{}_{}", store_id, dm_context->keyspace_id, dm_context->physical_table_id, segment->segmentId(), segment->segmentEpoch(), - read_snapshot->delta->getDeltaIndexEpoch()); + read_snapshot->delta->getDeltaIndexEpoch(), + read_snapshot->getRows()); } GlobalSegmentID SegmentReadTask::getGlobalSegmentID() const diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_segment_read_task_pool.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_segment_read_task_pool.cpp index e380b1ec933..9057c310c29 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_segment_read_task_pool.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_segment_read_task_pool.cpp @@ -31,11 +31,14 @@ class SegmentReadTasksPoolTest : public SegmentTestBasic return std::make_shared(Logger::get(), 0, RowKeyRange{}, seg_id, seg_id + 1, nullptr, nullptr); } - static SegmentSnapshotPtr createSegmentSnapshot() + SegmentSnapshotPtr createSegmentSnapshot() { - auto delta_snap = std::make_shared(CurrentMetrics::Metric{}); - delta_snap->delta = std::make_shared(nullptr); - return std::make_shared(std::move(delta_snap), /*stable*/ nullptr, Logger::get()); + auto delta = std::make_shared(1); + auto delta_snap = delta->createSnapshot(*createDMContext(), false, CurrentMetrics::Metric{}); + + auto stable = std::make_shared(1); + auto stable_snap = stable->createSnapshot(); + return std::make_shared(std::move(delta_snap), std::move(stable_snap), Logger::get()); } SegmentReadTaskPtr createSegmentReadTask(PageIdU64 seg_id)