From b4b934c9e4becdbf3d326f1f85be5c140429c8e0 Mon Sep 17 00:00:00 2001 From: jinhelin Date: Mon, 18 Mar 2024 12:39:42 +0800 Subject: [PATCH] Storages: Add statistical data of TableScanning in ScanContext (#8842) ref pingcap/tiflash#8675 --- contrib/tipb | 2 +- .../Coprocessor/DAGStorageInterpreter.cpp | 4 +- .../gtest_ti_remote_block_inputstream.cpp | 14 +- dbms/src/Flash/Statistics/TableScanImpl.cpp | 9 + dbms/src/Server/Server.cpp | 2 + .../DeltaMerge/ColumnFile/ColumnFile.h | 4 + .../DeltaMerge/ColumnFile/ColumnFileBig.cpp | 8 + .../DeltaMerge/ColumnFile/ColumnFileBig.h | 4 + .../ColumnFile/ColumnFileSetReader.cpp | 9 +- .../ColumnFile/ColumnFileSetReader.h | 10 +- .../DeltaMerge/Delta/DeltaValueSpace.h | 17 +- .../Storages/DeltaMerge/Delta/Snapshot.cpp | 15 +- .../File/DMFileBlockInputStream.cpp | 3 +- .../DeltaMerge/File/DMFileBlockInputStream.h | 7 + .../Storages/DeltaMerge/File/DMFileReader.cpp | 56 ++++- .../Storages/DeltaMerge/File/DMFileReader.h | 7 +- dbms/src/Storages/DeltaMerge/ReadMode.h | 7 + dbms/src/Storages/DeltaMerge/ScanContext.cpp | 179 ++++++++++++++- dbms/src/Storages/DeltaMerge/ScanContext.h | 217 +++++++++++++----- dbms/src/Storages/DeltaMerge/Segment.cpp | 121 +++++++--- dbms/src/Storages/DeltaMerge/Segment.h | 11 +- .../Storages/DeltaMerge/StableValueSpace.cpp | 4 +- .../Storages/DeltaMerge/StableValueSpace.h | 3 + .../tests/gtest_dm_delta_merge_store.cpp | 12 +- .../tests/gtest_dm_delta_value_space.cpp | 46 +++- .../gtest_skippable_block_input_stream.cpp | 4 +- dbms/src/Storages/StorageDisaggregated.cpp | 6 +- dbms/src/Storages/StorageDisaggregated.h | 2 +- .../Storages/StorageDisaggregatedRemote.cpp | 4 +- 29 files changed, 625 insertions(+), 162 deletions(-) diff --git a/contrib/tipb b/contrib/tipb index dfd7d194838..55a7867ddd5 160000 --- a/contrib/tipb +++ b/contrib/tipb @@ -1 +1 @@ -Subproject commit dfd7d194838f13b620635380ac6ae6b8b360171a +Subproject commit 55a7867ddd50b46322607f5f2a71f12035a5d4a9 diff --git a/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp b/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp index 7c68765a0e9..139295c04e7 100644 --- a/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp +++ b/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp @@ -1184,7 +1184,7 @@ void DAGStorageInterpreter::buildLocalStreams(DAGPipeline & pipeline, size_t max size_t total_local_region_num = mvcc_query_info->regions_query_info.size(); if (total_local_region_num == 0) return; - mvcc_query_info->scan_context->total_local_region_num = total_local_region_num; + mvcc_query_info->scan_context->setRegionNumOfCurrentInstance(total_local_region_num); const auto table_query_infos = generateSelectQueryInfos(); bool has_multiple_partitions = table_query_infos.size() > 1; // MultiPartitionStreamPool will be disabled in no partition mode or single-partition case @@ -1250,7 +1250,7 @@ void DAGStorageInterpreter::buildLocalExec( size_t total_local_region_num = mvcc_query_info->regions_query_info.size(); if (total_local_region_num == 0) return; - mvcc_query_info->scan_context->total_local_region_num = total_local_region_num; + mvcc_query_info->scan_context->setRegionNumOfCurrentInstance(total_local_region_num); const auto table_query_infos = generateSelectQueryInfos(); bool has_multiple_partitions = table_query_infos.size() > 1; ConcatBuilderPool builder_pool{max_streams}; diff --git a/dbms/src/Flash/Coprocessor/tests/gtest_ti_remote_block_inputstream.cpp b/dbms/src/Flash/Coprocessor/tests/gtest_ti_remote_block_inputstream.cpp index 6139c449755..3db949cac85 100644 --- a/dbms/src/Flash/Coprocessor/tests/gtest_ti_remote_block_inputstream.cpp +++ b/dbms/src/Flash/Coprocessor/tests/gtest_ti_remote_block_inputstream.cpp @@ -53,8 +53,8 @@ bool equalSummaries(const ExecutionSummary & left, const ExecutionSummary & righ (left.num_iterations == right.num_iterations) && // (left.num_produced_rows == right.num_produced_rows) && // (left.time_processed_ns == right.time_processed_ns) && // - (left.scan_context->total_dmfile_scanned_rows == right.scan_context->total_dmfile_scanned_rows) && // - (left.scan_context->total_dmfile_skipped_rows == right.scan_context->total_dmfile_skipped_rows); + (left.scan_context->dmfile_data_scanned_rows == right.scan_context->dmfile_data_scanned_rows) && // + (left.scan_context->dmfile_data_skipped_rows == right.scan_context->dmfile_data_skipped_rows); } struct MockWriter @@ -73,10 +73,12 @@ struct MockWriter summary.concurrency = 1; summary.scan_context = std::make_unique(); - summary.scan_context->total_dmfile_scanned_packs = 1; - summary.scan_context->total_dmfile_skipped_packs = 2; - summary.scan_context->total_dmfile_scanned_rows = 8000; - summary.scan_context->total_dmfile_skipped_rows = 15000; + summary.scan_context->dmfile_data_scanned_rows = 8000; + summary.scan_context->dmfile_data_skipped_rows = 15000; + summary.scan_context->dmfile_mvcc_scanned_rows = 8000; + summary.scan_context->dmfile_mvcc_skipped_rows = 15000; + summary.scan_context->dmfile_lm_filter_scanned_rows = 8000; + summary.scan_context->dmfile_lm_filter_skipped_rows = 15000; summary.scan_context->total_dmfile_rough_set_index_check_time_ns = 10; summary.scan_context->total_dmfile_read_time_ns = 200; summary.scan_context->create_snapshot_time_ns = 5; diff --git a/dbms/src/Flash/Statistics/TableScanImpl.cpp b/dbms/src/Flash/Statistics/TableScanImpl.cpp index 7316164a686..5a601e8ef0a 100644 --- a/dbms/src/Flash/Statistics/TableScanImpl.cpp +++ b/dbms/src/Flash/Statistics/TableScanImpl.cpp @@ -114,6 +114,15 @@ void TableScanStatistics::collectExtraRuntimeDetail() }); break; } + + if (auto it = dag_context.scan_context_map.find(executor_id); it != dag_context.scan_context_map.end()) + { + it->second->setStreamCost( + std::max(local_table_scan_detail.min_stream_cost_ns, 0.0), + std::max(local_table_scan_detail.max_stream_cost_ns, 0.0), + std::max(remote_table_scan_detail.min_stream_cost_ns, 0.0), + std::max(remote_table_scan_detail.max_stream_cost_ns, 0.0)); + } } TableScanStatistics::TableScanStatistics(const tipb::Executor * executor, DAGContext & dag_context_) diff --git a/dbms/src/Server/Server.cpp b/dbms/src/Server/Server.cpp index e900bac2000..d2025f860c9 100644 --- a/dbms/src/Server/Server.cpp +++ b/dbms/src/Server/Server.cpp @@ -951,6 +951,8 @@ int Server::main(const std::vector & /*args*/) TiFlashErrorRegistry::instance(); // This invocation is for initializing + DM::ScanContext::initCurrentInstanceId(config(), log); + const auto disaggregated_mode = getDisaggregatedMode(config()); const auto use_autoscaler = useAutoScaler(config()); const bool use_autoscaler_without_s3 = useAutoScalerWithoutS3(config()); diff --git a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFile.h b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFile.h index 5f70867584f..41d904dc640 100644 --- a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFile.h +++ b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFile.h @@ -19,6 +19,8 @@ #include #include #include +#include +#include #include #include #include @@ -178,6 +180,8 @@ class ColumnFileReader /// Create a new reader from current reader with different columns to read. virtual ColumnFileReaderPtr createNewReader(const ColumnDefinesPtr & col_defs) = 0; + + virtual void setReadTag(ReadTag /*read_tag*/) {} }; std::pair copyColumnsData( diff --git a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileBig.cpp b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileBig.cpp index 9278bfe6c6d..ad16bf27e4d 100644 --- a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileBig.cpp +++ b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileBig.cpp @@ -175,6 +175,7 @@ void ColumnFileBigReader::initStream() DMFileBlockInputStreamBuilder builder(context.db_context); file_stream = builder.setTracingID(context.tracing_id) + .setReadTag(read_tag) .build(column_file.getFile(), *col_defs, RowKeyRanges{column_file.segment_range}, context.scan_context); header = file_stream->getHeader(); @@ -386,5 +387,12 @@ ColumnFileReaderPtr ColumnFileBigReader::createNewReader(const ColumnDefinesPtr return std::make_shared(context, column_file, new_col_defs); } +void ColumnFileBigReader::setReadTag(ReadTag read_tag_) +{ + // `read_tag` should be set before `file_stream` is initialized. + RUNTIME_CHECK(file_stream == nullptr); + read_tag = read_tag_; +} + } // namespace DM } // namespace DB diff --git a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileBig.h b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileBig.h index be45911ae8e..c29b17ed689 100644 --- a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileBig.h +++ b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileBig.h @@ -137,6 +137,8 @@ class ColumnFileBigReader : public ColumnFileReader Block cur_block; Columns cur_block_data; // The references to columns in cur_block, for faster access. + ReadTag read_tag; + private: void initStream(); std::pair readRowsRepeatedly( @@ -190,6 +192,8 @@ class ColumnFileBigReader : public ColumnFileReader size_t skipNextBlock() override; ColumnFileReaderPtr createNewReader(const ColumnDefinesPtr & new_col_defs) override; + + void setReadTag(ReadTag read_tag_) override; }; } // namespace DM diff --git a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileSetReader.cpp b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileSetReader.cpp index 886ce232740..fc9315b36cd 100644 --- a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileSetReader.cpp +++ b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileSetReader.cpp @@ -84,7 +84,8 @@ ColumnFileSetReader::ColumnFileSetReader( const DMContext & context_, const ColumnFileSetSnapshotPtr & snapshot_, const ColumnDefinesPtr & col_defs_, - const RowKeyRange & segment_range_) + const RowKeyRange & segment_range_, + ReadTag read_tag_) : context(context_) , snapshot(snapshot_) , col_defs(col_defs_) @@ -98,10 +99,11 @@ ColumnFileSetReader::ColumnFileSetReader( column_file_rows.push_back(f->getRows()); column_file_rows_end.push_back(total_rows); column_file_readers.push_back(f->getReader(context, snapshot->getDataProvider(), col_defs)); + column_file_readers.back()->setReadTag(read_tag_); } } -ColumnFileSetReaderPtr ColumnFileSetReader::createNewReader(const ColumnDefinesPtr & new_col_defs) +ColumnFileSetReaderPtr ColumnFileSetReader::createNewReader(const ColumnDefinesPtr & new_col_defs, ReadTag read_tag) { auto * new_reader = new ColumnFileSetReader(context); new_reader->snapshot = snapshot; @@ -111,7 +113,10 @@ ColumnFileSetReaderPtr ColumnFileSetReader::createNewReader(const ColumnDefinesP new_reader->column_file_rows_end = column_file_rows_end; for (auto & fr : column_file_readers) + { new_reader->column_file_readers.push_back(fr->createNewReader(new_col_defs)); + new_reader->column_file_readers.back()->setReadTag(read_tag); + } return std::shared_ptr(new_reader); } diff --git a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileSetReader.h b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileSetReader.h index 551e4d312b8..ce18da5c407 100644 --- a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileSetReader.h +++ b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileSetReader.h @@ -57,11 +57,12 @@ class ColumnFileSetReader const DMContext & context_, const ColumnFileSetSnapshotPtr & snapshot_, const ColumnDefinesPtr & col_defs_, - const RowKeyRange & segment_range_); + const RowKeyRange & segment_range_, + ReadTag read_tag_); // If we need to read columns besides pk and version, a ColumnFileSetReader can NOT be used more than once. // This method create a new reader based on the current one. It will reuse some caches in the current reader. - ColumnFileSetReaderPtr createNewReader(const ColumnDefinesPtr & new_col_defs); + ColumnFileSetReaderPtr createNewReader(const ColumnDefinesPtr & new_col_defs, ReadTag read_tag); // Use for DeltaMergeBlockInputStream to read rows from MemTableSet to do full compaction with other layer. // This method will check whether offset and limit are valid. It only return those valid rows. @@ -104,8 +105,9 @@ class ColumnFileSetInputStream : public SkippableBlockInputStream const DMContext & context_, const ColumnFileSetSnapshotPtr & delta_snap_, const ColumnDefinesPtr & col_defs_, - const RowKeyRange & segment_range_) - : reader(context_, delta_snap_, col_defs_, segment_range_) + const RowKeyRange & segment_range_, + ReadTag read_tag_) + : reader(context_, delta_snap_, col_defs_, segment_range_, read_tag_) , column_files(reader.snapshot->getColumnFiles()) , column_files_count(column_files.size()) {} diff --git a/dbms/src/Storages/DeltaMerge/Delta/DeltaValueSpace.h b/dbms/src/Storages/DeltaMerge/Delta/DeltaValueSpace.h index 1d229f975d1..c56d588b613 100644 --- a/dbms/src/Storages/DeltaMerge/Delta/DeltaValueSpace.h +++ b/dbms/src/Storages/DeltaMerge/Delta/DeltaValueSpace.h @@ -427,11 +427,12 @@ class DeltaValueReader const DMContext & context_, const DeltaSnapshotPtr & delta_snap_, const ColumnDefinesPtr & col_defs_, - const RowKeyRange & segment_range_); + const RowKeyRange & segment_range_, + ReadTag read_tag_); // If we need to read columns besides pk and version, a DeltaValueReader can NOT be used more than once. // This method create a new reader based on then current one. It will reuse some caches in the current reader. - DeltaValueReaderPtr createNewReader(const ColumnDefinesPtr & new_col_defs); + DeltaValueReaderPtr createNewReader(const ColumnDefinesPtr & new_col_defs, ReadTag read_tag); void setDeltaIndex(const DeltaIndexCompactedPtr & delta_index_) { compacted_delta_index = delta_index_; } @@ -473,9 +474,15 @@ class DeltaValueInputStream : public SkippableBlockInputStream const DMContext & context_, const DeltaSnapshotPtr & delta_snap_, const ColumnDefinesPtr & col_defs_, - const RowKeyRange & segment_range_) - : mem_table_input_stream(context_, delta_snap_->getMemTableSetSnapshot(), col_defs_, segment_range_) - , persisted_files_input_stream(context_, delta_snap_->getPersistedFileSetSnapshot(), col_defs_, segment_range_) + const RowKeyRange & segment_range_, + ReadTag read_tag_) + : mem_table_input_stream(context_, delta_snap_->getMemTableSetSnapshot(), col_defs_, segment_range_, read_tag_) + , persisted_files_input_stream( + context_, + delta_snap_->getPersistedFileSetSnapshot(), + col_defs_, + segment_range_, + read_tag_) {} String getName() const override { return "DeltaValue"; } diff --git a/dbms/src/Storages/DeltaMerge/Delta/Snapshot.cpp b/dbms/src/Storages/DeltaMerge/Delta/Snapshot.cpp index a6b7151aa39..7ab504a9d55 100644 --- a/dbms/src/Storages/DeltaMerge/Delta/Snapshot.cpp +++ b/dbms/src/Storages/DeltaMerge/Delta/Snapshot.cpp @@ -71,29 +71,32 @@ DeltaValueReader::DeltaValueReader( const DMContext & context, const DeltaSnapshotPtr & delta_snap_, const ColumnDefinesPtr & col_defs_, - const RowKeyRange & segment_range_) + const RowKeyRange & segment_range_, + ReadTag read_tag_) : delta_snap(delta_snap_) , mem_table_reader(std::make_shared( context, delta_snap_->getMemTableSetSnapshot(), col_defs_, - segment_range_)) + segment_range_, + read_tag_)) , persisted_files_reader(std::make_shared( context, delta_snap_->getPersistedFileSetSnapshot(), col_defs_, - segment_range_)) + segment_range_, + read_tag_)) , col_defs(col_defs_) , segment_range(segment_range_) {} -DeltaValueReaderPtr DeltaValueReader::createNewReader(const ColumnDefinesPtr & new_col_defs) +DeltaValueReaderPtr DeltaValueReader::createNewReader(const ColumnDefinesPtr & new_col_defs, ReadTag read_tag) { auto * new_reader = new DeltaValueReader(); new_reader->delta_snap = delta_snap; new_reader->compacted_delta_index = compacted_delta_index; - new_reader->persisted_files_reader = persisted_files_reader->createNewReader(new_col_defs); - new_reader->mem_table_reader = mem_table_reader->createNewReader(new_col_defs); + new_reader->persisted_files_reader = persisted_files_reader->createNewReader(new_col_defs, read_tag); + new_reader->mem_table_reader = mem_table_reader->createNewReader(new_col_defs, read_tag); new_reader->col_defs = new_col_defs; new_reader->segment_range = segment_range; diff --git a/dbms/src/Storages/DeltaMerge/File/DMFileBlockInputStream.cpp b/dbms/src/Storages/DeltaMerge/File/DMFileBlockInputStream.cpp index 768e4302601..ccf94a6b3a6 100644 --- a/dbms/src/Storages/DeltaMerge/File/DMFileBlockInputStream.cpp +++ b/dbms/src/Storages/DeltaMerge/File/DMFileBlockInputStream.cpp @@ -99,7 +99,8 @@ DMFileBlockInputStreamPtr DMFileBlockInputStreamBuilder::build( read_one_pack_every_time, tracing_id, max_sharing_column_count, - scan_context); + scan_context, + read_tag); return std::make_shared(std::move(reader), max_sharing_column_count > 0); } diff --git a/dbms/src/Storages/DeltaMerge/File/DMFileBlockInputStream.h b/dbms/src/Storages/DeltaMerge/File/DMFileBlockInputStream.h index daf10bcbc73..72a79fb5219 100644 --- a/dbms/src/Storages/DeltaMerge/File/DMFileBlockInputStream.h +++ b/dbms/src/Storages/DeltaMerge/File/DMFileBlockInputStream.h @@ -151,6 +151,12 @@ class DMFileBlockInputStreamBuilder return *this; } + DMFileBlockInputStreamBuilder & setReadTag(ReadTag read_tag_) + { + read_tag = read_tag_; + return *this; + } + private: // These methods are called by the ctor @@ -198,6 +204,7 @@ class DMFileBlockInputStreamBuilder size_t max_sharing_column_bytes_for_all = 0; size_t max_sharing_column_count = 0; String tracing_id; + ReadTag read_tag = ReadTag::Internal; }; /** diff --git a/dbms/src/Storages/DeltaMerge/File/DMFileReader.cpp b/dbms/src/Storages/DeltaMerge/File/DMFileReader.cpp index 5a08f69a870..32413e5c164 100644 --- a/dbms/src/Storages/DeltaMerge/File/DMFileReader.cpp +++ b/dbms/src/Storages/DeltaMerge/File/DMFileReader.cpp @@ -49,7 +49,9 @@ namespace FailPoints { extern const char skip_seek_before_read_dmfile[]; } // namespace FailPoints -namespace DM +} + +namespace DB::DM { DMFileReader::Stream::Stream( DMFileReader & reader, @@ -292,7 +294,8 @@ DMFileReader::DMFileReader( bool read_one_pack_every_time_, const String & tracing_id_, size_t max_sharing_column_count, - const ScanContextPtr & scan_context_) + const ScanContextPtr & scan_context_, + const ReadTag read_tag_) : dmfile(dmfile_) , read_columns(read_columns_) , is_common_handle(is_common_handle_) @@ -307,6 +310,7 @@ DMFileReader::DMFileReader( , enable_column_cache(enable_column_cache_ && column_cache_) , column_cache(column_cache_) , scan_context(scan_context_) + , read_tag(read_tag_) , rows_threshold_per_read(rows_threshold_per_read_) , file_provider(file_provider_) , log(Logger::get(tracing_id_)) @@ -357,8 +361,7 @@ bool DMFileReader::getSkippedRows(size_t & skip_rows) for (; next_pack_id < use_packs.size() && !use_packs[next_pack_id]; ++next_pack_id) { skip_rows += pack_stats[next_pack_id].rows; - scan_context->total_dmfile_skipped_packs += 1; - scan_context->total_dmfile_skipped_rows += pack_stats[next_pack_id].rows; + addSkippedRows(pack_stats[next_pack_id].rows); } next_row_offset += skip_rows; return next_pack_id < use_packs.size(); @@ -391,10 +394,9 @@ size_t DMFileReader::skipNextBlock() break; read_rows += pack_stats[next_pack_id].rows; - scan_context->total_dmfile_skipped_packs += 1; } - scan_context->total_dmfile_skipped_rows += read_rows; + addSkippedRows(read_rows); next_row_offset += read_rows; // When we read dmfile, if the previous pack is not read, @@ -583,8 +585,7 @@ Block DMFileReader::read() size_t read_packs = next_pack_id - start_pack_id; - scan_context->total_dmfile_scanned_packs += read_packs; - scan_context->total_dmfile_scanned_rows += read_rows; + addScannedRows(read_rows); // TODO: this will need better algorithm: we should separate those packs which can and can not do clean read. bool do_clean_read_on_normal_mode @@ -870,5 +871,40 @@ bool DMFileReader::getCachedPacks( col_data_cache->del(col_id, next_pack_id); return found; } -} // namespace DM -} // namespace DB + +void DMFileReader::addScannedRows(UInt64 rows) +{ + switch (read_tag) + { + case ReadTag::Query: + scan_context->dmfile_data_scanned_rows += rows; + break; + case ReadTag::MVCC: + scan_context->dmfile_mvcc_scanned_rows += rows; + break; + case ReadTag::LMFilter: + scan_context->dmfile_lm_filter_scanned_rows += rows; + break; + default: + break; + } +} + +void DMFileReader::addSkippedRows(UInt64 rows) +{ + switch (read_tag) + { + case ReadTag::Query: + scan_context->dmfile_data_skipped_rows += rows; + break; + case ReadTag::MVCC: + scan_context->dmfile_mvcc_skipped_rows += rows; + break; + case ReadTag::LMFilter: + scan_context->dmfile_lm_filter_skipped_rows += rows; + break; + default: + break; + } +} +} // namespace DB::DM diff --git a/dbms/src/Storages/DeltaMerge/File/DMFileReader.h b/dbms/src/Storages/DeltaMerge/File/DMFileReader.h index a43f2abde79..9534fbc1910 100644 --- a/dbms/src/Storages/DeltaMerge/File/DMFileReader.h +++ b/dbms/src/Storages/DeltaMerge/File/DMFileReader.h @@ -91,7 +91,8 @@ class DMFileReader bool read_one_pack_every_time_, const String & tracing_id_, size_t max_sharing_column_count, - const ScanContextPtr & scan_context_); + const ScanContextPtr & scan_context_, + ReadTag read_tag_); Block getHeader() const { return toEmptyBlock(read_columns); } @@ -137,6 +138,9 @@ class DMFileReader size_t skip_packs); bool getCachedPacks(ColId col_id, size_t start_pack_id, size_t pack_count, size_t read_rows, ColumnPtr & col) const; + void addScannedRows(UInt64 rows); + void addSkippedRows(UInt64 rows); + DMFilePtr dmfile; ColumnDefines read_columns; ColumnStreams column_streams{}; @@ -174,6 +178,7 @@ class DMFileReader ColumnCachePtr column_cache; const ScanContextPtr scan_context; + const ReadTag read_tag; const size_t rows_threshold_per_read; diff --git a/dbms/src/Storages/DeltaMerge/ReadMode.h b/dbms/src/Storages/DeltaMerge/ReadMode.h index 8ec6c4654e2..ec3623d969d 100644 --- a/dbms/src/Storages/DeltaMerge/ReadMode.h +++ b/dbms/src/Storages/DeltaMerge/ReadMode.h @@ -39,4 +39,11 @@ enum class ReadMode Bitmap, }; +enum class ReadTag +{ + Internal, // Read columns required by some internal tasks. + Query, // Read columns required by queries. + MVCC, // Read columns to build MVCC bitmap. + LMFilter, // Read columns required by late-materialization filter. +}; } // namespace DB::DM diff --git a/dbms/src/Storages/DeltaMerge/ScanContext.cpp b/dbms/src/Storages/DeltaMerge/ScanContext.cpp index af92e6f5997..8dae573e5d8 100644 --- a/dbms/src/Storages/DeltaMerge/ScanContext.cpp +++ b/dbms/src/Storages/DeltaMerge/ScanContext.cpp @@ -12,26 +12,101 @@ // See the License for the specific language governing permissions and // limitations under the License. +#include #pragma GCC diagnostic push #pragma GCC diagnostic ignored "-Wunused-parameter" #include #pragma GCC diagnostic pop +#include #include #include namespace DB::DM { +void ScanContext::setRegionNumOfCurrentInstance(uint64_t region_num) +{ + region_num_of_instance[current_instance_id] = region_num; + // total_local_region_num may be updated later if some regions are not available in current instance. + total_local_region_num = region_num; +} +void ScanContext::setStreamCost( + uint64_t local_min_ns, + uint64_t local_max_ns, + uint64_t remote_min_ns, + uint64_t remote_max_ns) +{ + local_min_stream_cost_ns = local_min_ns; + local_max_stream_cost_ns = local_max_ns; + remote_min_stream_cost_ns = remote_min_ns; + remote_max_stream_cost_ns = remote_max_ns; +} + +void ScanContext::serializeRegionNumOfInstance(tipb::TiFlashScanContext & proto) const +{ + for (const auto & [id, num] : region_num_of_instance) + { + auto * p = proto.add_regions_of_instance(); + p->set_instance_id(id); + p->set_region_num(num); + } +} + +void ScanContext::deserializeRegionNumberOfInstance(const tipb::TiFlashScanContext & proto) +{ + for (const auto & t : proto.regions_of_instance()) + { + region_num_of_instance[t.instance_id()] = t.region_num(); + } +} + +void ScanContext::mergeRegionNumberOfInstance(const ScanContext & other) +{ + for (const auto & [id, num] : other.region_num_of_instance) + { + region_num_of_instance[id] += num; + } +} + +void ScanContext::mergeRegionNumberOfInstance(const tipb::TiFlashScanContext & other) +{ + for (const auto & t : other.regions_of_instance()) + { + region_num_of_instance[t.instance_id()] += t.region_num(); + } +} + +void ScanContext::mergeStreamCost( + uint64_t local_min_ns, + uint64_t local_max_ns, + uint64_t remote_min_ns, + uint64_t remote_max_ns) +{ + if (local_min_stream_cost_ns == 0 || local_min_ns < local_min_stream_cost_ns) + local_min_stream_cost_ns = local_min_ns; + if (local_max_ns > local_max_stream_cost_ns) + local_max_stream_cost_ns = local_max_ns; + if (remote_min_stream_cost_ns == 0 || remote_min_ns < remote_min_stream_cost_ns) + remote_min_stream_cost_ns = remote_min_ns; + if (remote_max_ns > remote_max_stream_cost_ns) + remote_max_stream_cost_ns = remote_max_ns; +} + String ScanContext::toJson() const { static constexpr double NS_TO_MS_SCALE = 1'000'000.0; Poco::JSON::Object::Ptr json = new Poco::JSON::Object(); - json->set("dmfile_scan_rows", total_dmfile_scanned_rows.load()); - json->set("dmfile_skip_rows", total_dmfile_skipped_rows.load()); + json->set("dmfile_data_scanned_rows", dmfile_data_scanned_rows.load()); + json->set("dmfile_data_skipped_rows", dmfile_data_skipped_rows.load()); + json->set("dmfile_mvcc_scanned_rows", dmfile_mvcc_scanned_rows.load()); + json->set("dmfile_mvcc_skipped_rows", dmfile_mvcc_skipped_rows.load()); + json->set("dmfile_lm_filter_scanned_rows", dmfile_lm_filter_scanned_rows.load()); + json->set("dmfile_lm_filter_skipped_rows", dmfile_lm_filter_skipped_rows.load()); json->set("dmfile_read_time", fmt::format("{:.3f}ms", total_dmfile_read_time_ns.load() / NS_TO_MS_SCALE)); - json->set("remote_region_num", total_remote_region_num.load()); - json->set("local_region_num", total_local_region_num.load()); + json->set("num_remote_region", total_remote_region_num.load()); + json->set("num_local_region", total_local_region_num.load()); + json->set("num_stale_read", num_stale_read.load()); json->set("read_bytes", user_read_bytes.load()); @@ -59,8 +134,104 @@ String ScanContext::toJson() const json->set("build_stream_time", fmt::format("{:.3f}ms", build_inputstream_time_ns.load() / NS_TO_MS_SCALE)); json->set("build_bitmap_time", fmt::format("{:.3f}ms", build_bitmap_time_ns.load() / NS_TO_MS_SCALE)); + json->set("local_min_stream_cost_ms", fmt::format("{:.3f}ms", local_min_stream_cost_ns / NS_TO_MS_SCALE)); + json->set("local_max_stream_cost_ms", fmt::format("{:.3f}ms", local_max_stream_cost_ns / NS_TO_MS_SCALE)); + json->set("remote_min_stream_cost_ms", fmt::format("{:.3f}ms", remote_min_stream_cost_ns / NS_TO_MS_SCALE)); + json->set("remote_max_stream_cost_ms", fmt::format("{:.3f}ms", remote_max_stream_cost_ns / NS_TO_MS_SCALE)); + + auto to_json_object = [](const String & id, uint64_t num) { + Poco::JSON::Object::Ptr json = new Poco::JSON::Object(); + json->set("instance_id", id); + json->set("region_num", num); + return json; + }; + auto to_json_array = [&to_json_object](const RegionNumOfInstance & region_num_of_instance) { + Poco::JSON::Array::Ptr arr = new Poco::JSON::Array(); + for (const auto & [id, num] : region_num_of_instance) + { + arr->add(to_json_object(id, num)); + } + return arr; + }; + json->set("region_num_of_instance", to_json_array(region_num_of_instance)); + std::stringstream buf; json->stringify(buf); return buf.str(); } + +String getHostName(const LoggerPtr & log) +{ + char hostname[1024]; + if (::gethostname(hostname, sizeof(hostname)) != 0) + { + LOG_ERROR(log, "gethostname failed: {}", errno); + return {}; + } + return hostname; +} + +bool isLocalAddress(const String & address) +{ + static const std::vector local_list{// ivp4 + "0.0.0.0", + "127.", + "localhost", + // ipv6 + "0:0:0:0:0:0:0", + "[0:0:0:0:0:0:0", + ":", + "[:"}; + for (const auto & local_prefix : local_list) + { + if (address.starts_with(local_prefix)) + { + return true; + } + } + return false; +} + +String getPort(const String & address) +{ + auto pos = address.find_last_of(':'); + if (pos == std::string::npos) + { + return {}; + } + return address.substr(pos + 1); +} + +String getCurrentInstanceId(const String & flash_server_addr, const LoggerPtr & log) +{ + if (!isLocalAddress(flash_server_addr)) + { + return flash_server_addr; + } + + auto hostname = getHostName(log); + if (hostname.empty()) + { + return Poco::UUIDGenerator().createRandom().toString(); + } + + auto port = getPort(flash_server_addr); + if (!port.empty()) + { + return hostname + ":" + port; + } + else + { + auto uuid = Poco::UUIDGenerator().createRandom().toString(); + // hostname + uuid may too long, so cut the uuid. + return hostname + "-" + uuid.substr(0, std::min(8, uuid.size())); + } +} + +void ScanContext::initCurrentInstanceId(Poco::Util::AbstractConfiguration & config, const LoggerPtr & log) +{ + auto flash_server_addr = config.getString("flash.service_addr", "0.0.0.0:3930"); + current_instance_id = getCurrentInstanceId(flash_server_addr, log); + LOG_INFO(log, "flash_server_addr={}, current_instance_id={}", flash_server_addr, current_instance_id); +} } // namespace DB::DM diff --git a/dbms/src/Storages/DeltaMerge/ScanContext.h b/dbms/src/Storages/DeltaMerge/ScanContext.h index 20d852082ce..5203f7bf83e 100644 --- a/dbms/src/Storages/DeltaMerge/ScanContext.h +++ b/dbms/src/Storages/DeltaMerge/ScanContext.h @@ -14,6 +14,8 @@ #pragma once +#include +#include #include #include #include @@ -32,17 +34,12 @@ namespace DB::DM class ScanContext { public: - /// sum of scanned packs in dmfiles(both stable and ColumnFileBig) among this query - std::atomic total_dmfile_scanned_packs{0}; - - /// sum of skipped packs in dmfiles(both stable and ColumnFileBig) among this query - std::atomic total_dmfile_skipped_packs{0}; - - /// sum of scanned rows in dmfiles(both stable and ColumnFileBig) among this query - std::atomic total_dmfile_scanned_rows{0}; - - /// sum of skipped rows in dmfiles(both stable and ColumnFileBig) among this query - std::atomic total_dmfile_skipped_rows{0}; + std::atomic dmfile_data_scanned_rows{0}; + std::atomic dmfile_data_skipped_rows{0}; + std::atomic dmfile_mvcc_scanned_rows{0}; + std::atomic dmfile_mvcc_skipped_rows{0}; + std::atomic dmfile_lm_filter_scanned_rows{0}; + std::atomic dmfile_lm_filter_skipped_rows{0}; std::atomic total_dmfile_rough_set_index_check_time_ns{0}; std::atomic total_dmfile_read_time_ns{0}; @@ -93,49 +90,96 @@ class ScanContext void deserialize(const tipb::TiFlashScanContext & tiflash_scan_context_pb) { - total_dmfile_scanned_packs = tiflash_scan_context_pb.total_dmfile_scanned_packs(); - total_dmfile_skipped_packs = tiflash_scan_context_pb.total_dmfile_skipped_packs(); - total_dmfile_scanned_rows = tiflash_scan_context_pb.total_dmfile_scanned_rows(); - total_dmfile_skipped_rows = tiflash_scan_context_pb.total_dmfile_skipped_rows(); - total_dmfile_rough_set_index_check_time_ns - = tiflash_scan_context_pb.total_dmfile_rough_set_index_check_time_ms() * 1000000; - total_dmfile_read_time_ns = tiflash_scan_context_pb.total_dmfile_read_time_ms() * 1000000; - create_snapshot_time_ns = tiflash_scan_context_pb.total_create_snapshot_time_ms() * 1000000; - total_remote_region_num = tiflash_scan_context_pb.total_remote_region_num(); - total_local_region_num = tiflash_scan_context_pb.total_local_region_num(); - user_read_bytes = tiflash_scan_context_pb.total_user_read_bytes(); + dmfile_data_scanned_rows = tiflash_scan_context_pb.dmfile_data_scanned_rows(); + dmfile_data_skipped_rows = tiflash_scan_context_pb.dmfile_data_skipped_rows(); + dmfile_mvcc_scanned_rows = tiflash_scan_context_pb.dmfile_mvcc_scanned_rows(); + dmfile_mvcc_skipped_rows = tiflash_scan_context_pb.dmfile_mvcc_skipped_rows(); + dmfile_lm_filter_scanned_rows = tiflash_scan_context_pb.dmfile_lm_filter_scanned_rows(); + dmfile_lm_filter_skipped_rows = tiflash_scan_context_pb.dmfile_lm_filter_skipped_rows(); + total_dmfile_rough_set_index_check_time_ns = tiflash_scan_context_pb.total_dmfile_rs_check_ms() * 1000000; + total_dmfile_read_time_ns = tiflash_scan_context_pb.total_dmfile_read_ms() * 1000000; + create_snapshot_time_ns = tiflash_scan_context_pb.total_build_snapshot_ms() * 1000000; + total_remote_region_num = tiflash_scan_context_pb.remote_regions(); + total_local_region_num = tiflash_scan_context_pb.local_regions(); + user_read_bytes = tiflash_scan_context_pb.user_read_bytes(); learner_read_ns = tiflash_scan_context_pb.total_learner_read_ms() * 1000000; - disagg_read_cache_hit_size = tiflash_scan_context_pb.total_disagg_read_cache_hit_size(); - disagg_read_cache_miss_size = tiflash_scan_context_pb.total_disagg_read_cache_miss_size(); + disagg_read_cache_hit_size = tiflash_scan_context_pb.disagg_read_cache_hit_bytes(); + disagg_read_cache_miss_size = tiflash_scan_context_pb.disagg_read_cache_miss_bytes(); + + num_segments = tiflash_scan_context_pb.segments(); + num_read_tasks = tiflash_scan_context_pb.read_tasks(); + + delta_rows = tiflash_scan_context_pb.delta_rows(); + delta_bytes = tiflash_scan_context_pb.delta_bytes(); + + mvcc_input_rows = tiflash_scan_context_pb.mvcc_input_rows(); + mvcc_input_bytes = tiflash_scan_context_pb.mvcc_input_bytes(); + mvcc_output_rows = tiflash_scan_context_pb.mvcc_output_rows(); + late_materialization_skip_rows = tiflash_scan_context_pb.lm_skip_rows(); + build_bitmap_time_ns = tiflash_scan_context_pb.total_build_bitmap_ms() * 1000000; + num_stale_read = tiflash_scan_context_pb.stale_read_regions(); + build_inputstream_time_ns = tiflash_scan_context_pb.total_build_inputstream_ms() * 1000000; + + setStreamCost( + tiflash_scan_context_pb.min_local_stream_ms() * 1000000, + tiflash_scan_context_pb.max_local_stream_ms() * 1000000, + tiflash_scan_context_pb.min_remote_stream_ms() * 1000000, + tiflash_scan_context_pb.max_remote_stream_ms() * 1000000); + + deserializeRegionNumberOfInstance(tiflash_scan_context_pb); } tipb::TiFlashScanContext serialize() { tipb::TiFlashScanContext tiflash_scan_context_pb{}; - tiflash_scan_context_pb.set_total_dmfile_scanned_packs(total_dmfile_scanned_packs); - tiflash_scan_context_pb.set_total_dmfile_skipped_packs(total_dmfile_skipped_packs); - tiflash_scan_context_pb.set_total_dmfile_scanned_rows(total_dmfile_scanned_rows); - tiflash_scan_context_pb.set_total_dmfile_skipped_rows(total_dmfile_skipped_rows); - tiflash_scan_context_pb.set_total_dmfile_rough_set_index_check_time_ms( - total_dmfile_rough_set_index_check_time_ns / 1000000); - tiflash_scan_context_pb.set_total_dmfile_read_time_ms(total_dmfile_read_time_ns / 1000000); - tiflash_scan_context_pb.set_total_create_snapshot_time_ms(create_snapshot_time_ns / 1000000); - tiflash_scan_context_pb.set_total_remote_region_num(total_remote_region_num); - tiflash_scan_context_pb.set_total_local_region_num(total_local_region_num); - tiflash_scan_context_pb.set_total_user_read_bytes(user_read_bytes); + tiflash_scan_context_pb.set_dmfile_data_scanned_rows(dmfile_data_scanned_rows); + tiflash_scan_context_pb.set_dmfile_data_skipped_rows(dmfile_data_skipped_rows); + tiflash_scan_context_pb.set_dmfile_mvcc_scanned_rows(dmfile_mvcc_scanned_rows); + tiflash_scan_context_pb.set_dmfile_mvcc_skipped_rows(dmfile_mvcc_skipped_rows); + tiflash_scan_context_pb.set_dmfile_lm_filter_scanned_rows(dmfile_lm_filter_scanned_rows); + tiflash_scan_context_pb.set_dmfile_lm_filter_skipped_rows(dmfile_lm_filter_skipped_rows); + tiflash_scan_context_pb.set_total_dmfile_rs_check_ms(total_dmfile_rough_set_index_check_time_ns / 1000000); + tiflash_scan_context_pb.set_total_dmfile_read_ms(total_dmfile_read_time_ns / 1000000); + tiflash_scan_context_pb.set_total_build_snapshot_ms(create_snapshot_time_ns / 1000000); + tiflash_scan_context_pb.set_remote_regions(total_remote_region_num); + tiflash_scan_context_pb.set_local_regions(total_local_region_num); + tiflash_scan_context_pb.set_user_read_bytes(user_read_bytes); tiflash_scan_context_pb.set_total_learner_read_ms(learner_read_ns / 1000000); - tiflash_scan_context_pb.set_total_disagg_read_cache_hit_size(disagg_read_cache_hit_size); - tiflash_scan_context_pb.set_total_disagg_read_cache_miss_size(disagg_read_cache_miss_size); + tiflash_scan_context_pb.set_disagg_read_cache_hit_bytes(disagg_read_cache_hit_size); + tiflash_scan_context_pb.set_disagg_read_cache_miss_bytes(disagg_read_cache_miss_size); + + tiflash_scan_context_pb.set_segments(num_segments); + tiflash_scan_context_pb.set_read_tasks(num_read_tasks); + + tiflash_scan_context_pb.set_delta_rows(delta_rows); + tiflash_scan_context_pb.set_delta_bytes(delta_bytes); + + tiflash_scan_context_pb.set_mvcc_input_rows(mvcc_input_rows); + tiflash_scan_context_pb.set_mvcc_input_bytes(mvcc_input_bytes); + tiflash_scan_context_pb.set_mvcc_output_rows(mvcc_output_rows); + tiflash_scan_context_pb.set_lm_skip_rows(late_materialization_skip_rows); + tiflash_scan_context_pb.set_total_build_bitmap_ms(build_bitmap_time_ns / 1000000); + tiflash_scan_context_pb.set_stale_read_regions(num_stale_read); + tiflash_scan_context_pb.set_total_build_inputstream_ms(build_inputstream_time_ns / 1000000); + + tiflash_scan_context_pb.set_min_local_stream_ms(local_min_stream_cost_ns / 1000000); + tiflash_scan_context_pb.set_max_local_stream_ms(local_max_stream_cost_ns / 1000000); + tiflash_scan_context_pb.set_min_remote_stream_ms(remote_min_stream_cost_ns / 1000000); + tiflash_scan_context_pb.set_max_remote_stream_ms(remote_max_stream_cost_ns / 1000000); + + serializeRegionNumOfInstance(tiflash_scan_context_pb); return tiflash_scan_context_pb; } void merge(const ScanContext & other) { - total_dmfile_scanned_packs += other.total_dmfile_scanned_packs; - total_dmfile_skipped_packs += other.total_dmfile_skipped_packs; - total_dmfile_scanned_rows += other.total_dmfile_scanned_rows; - total_dmfile_skipped_rows += other.total_dmfile_skipped_rows; + dmfile_data_scanned_rows += other.dmfile_data_scanned_rows; + dmfile_data_skipped_rows += other.dmfile_data_skipped_rows; + dmfile_mvcc_scanned_rows += other.dmfile_mvcc_scanned_rows; + dmfile_mvcc_skipped_rows += other.dmfile_mvcc_skipped_rows; + dmfile_lm_filter_scanned_rows += other.dmfile_lm_filter_scanned_rows; + dmfile_lm_filter_skipped_rows += other.dmfile_lm_filter_skipped_rows; total_dmfile_rough_set_index_check_time_ns += other.total_dmfile_rough_set_index_check_time_ns; total_dmfile_read_time_ns += other.total_dmfile_read_time_ns; @@ -155,26 +199,93 @@ class ScanContext mvcc_input_rows += other.mvcc_input_rows; mvcc_input_bytes += other.mvcc_input_bytes; mvcc_output_rows += other.mvcc_output_rows; + late_materialization_skip_rows += other.late_materialization_skip_rows; + + learner_read_ns += other.learner_read_ns; + create_snapshot_time_ns += other.create_snapshot_time_ns; + build_inputstream_time_ns += other.build_inputstream_time_ns; + build_bitmap_time_ns += other.build_bitmap_time_ns; + + num_stale_read += other.num_stale_read; + + mergeStreamCost( + other.local_min_stream_cost_ns, + other.local_max_stream_cost_ns, + other.remote_min_stream_cost_ns, + other.remote_max_stream_cost_ns); + + mergeRegionNumberOfInstance(other); } void merge(const tipb::TiFlashScanContext & other) { - total_dmfile_scanned_packs += other.total_dmfile_scanned_packs(); - total_dmfile_skipped_packs += other.total_dmfile_skipped_packs(); - total_dmfile_scanned_rows += other.total_dmfile_scanned_rows(); - total_dmfile_skipped_rows += other.total_dmfile_skipped_rows(); - total_dmfile_rough_set_index_check_time_ns += other.total_dmfile_rough_set_index_check_time_ms() * 1000000; - total_dmfile_read_time_ns += other.total_dmfile_read_time_ms() * 1000000; - create_snapshot_time_ns += other.total_create_snapshot_time_ms() * 1000000; - total_local_region_num += other.total_local_region_num(); - total_remote_region_num += other.total_remote_region_num(); - user_read_bytes += other.total_user_read_bytes(); + dmfile_data_scanned_rows += other.dmfile_data_scanned_rows(); + dmfile_data_skipped_rows += other.dmfile_data_skipped_rows(); + dmfile_mvcc_scanned_rows += other.dmfile_mvcc_scanned_rows(); + dmfile_mvcc_skipped_rows += other.dmfile_mvcc_skipped_rows(); + dmfile_lm_filter_scanned_rows += other.dmfile_lm_filter_scanned_rows(); + dmfile_lm_filter_skipped_rows += other.dmfile_lm_filter_skipped_rows(); + total_dmfile_rough_set_index_check_time_ns += other.total_dmfile_rs_check_ms() * 1000000; + total_dmfile_read_time_ns += other.total_dmfile_read_ms() * 1000000; + create_snapshot_time_ns += other.total_build_snapshot_ms() * 1000000; + total_local_region_num += other.local_regions(); + total_remote_region_num += other.remote_regions(); + user_read_bytes += other.user_read_bytes(); learner_read_ns += other.total_learner_read_ms() * 1000000; - disagg_read_cache_hit_size += other.total_disagg_read_cache_hit_size(); - disagg_read_cache_miss_size += other.total_disagg_read_cache_miss_size(); + disagg_read_cache_hit_size += other.disagg_read_cache_hit_bytes(); + disagg_read_cache_miss_size += other.disagg_read_cache_miss_bytes(); + + num_segments += other.segments(); + num_read_tasks += other.read_tasks(); + + delta_rows += other.delta_rows(); + delta_bytes += other.delta_bytes(); + + mvcc_input_rows += other.mvcc_input_rows(); + mvcc_input_bytes += other.mvcc_input_bytes(); + mvcc_output_rows += other.mvcc_output_rows(); + late_materialization_skip_rows += other.lm_skip_rows(); + build_bitmap_time_ns += other.total_build_bitmap_ms() * 1000000; + num_stale_read += other.stale_read_regions(); + build_inputstream_time_ns += other.total_build_inputstream_ms() * 1000000; + + mergeStreamCost( + other.min_local_stream_ms() * 1000000, + other.max_local_stream_ms() * 1000000, + other.min_remote_stream_ms() * 1000000, + other.max_remote_stream_ms() * 1000000); + + mergeRegionNumberOfInstance(other); } String toJson() const; + + void setRegionNumOfCurrentInstance(uint64_t region_num); + void setStreamCost(uint64_t local_min_ns, uint64_t local_max_ns, uint64_t remote_min_ns, uint64_t remote_max_ns); + + static void initCurrentInstanceId(Poco::Util::AbstractConfiguration & config, const LoggerPtr & log); + +private: + void serializeRegionNumOfInstance(tipb::TiFlashScanContext & proto) const; + void deserializeRegionNumberOfInstance(const tipb::TiFlashScanContext & proto); + void mergeRegionNumberOfInstance(const ScanContext & other); + void mergeRegionNumberOfInstance(const tipb::TiFlashScanContext & other); + void mergeStreamCost(uint64_t local_min_ns, uint64_t local_max_ns, uint64_t remote_min_ns, uint64_t remote_max_ns); + + // instance_id -> number of regions. + // `region_num_of_instance` is accessed by a single thread. + using RegionNumOfInstance = std::unordered_map; + RegionNumOfInstance region_num_of_instance; + + // These members `*_stream_cost_ns` are accessed by a single thread. + uint64_t local_min_stream_cost_ns{0}; + uint64_t local_max_stream_cost_ns{0}; + uint64_t remote_min_stream_cost_ns{0}; + uint64_t remote_max_stream_cost_ns{0}; + + // `current_instance_id` is a identification of this store. + // It only used to identify which store generated the ScanContext object. + inline static String current_instance_id; }; } // namespace DB::DM diff --git a/dbms/src/Storages/DeltaMerge/Segment.cpp b/dbms/src/Storages/DeltaMerge/Segment.cpp index e70efc34bfd..9df38d1fb97 100644 --- a/dbms/src/Storages/DeltaMerge/Segment.cpp +++ b/dbms/src/Storages/DeltaMerge/Segment.cpp @@ -583,8 +583,12 @@ bool Segment::isDefinitelyEmpty(DMContext & dm_context, const SegmentSnapshotPtr auto read_ranges = RowKeyRanges{rowkey_range}; { - BlockInputStreamPtr delta_stream - = std::make_shared(dm_context, segment_snap->delta, columns_to_read, rowkey_range); + BlockInputStreamPtr delta_stream = std::make_shared( + dm_context, + segment_snap->delta, + columns_to_read, + rowkey_range, + ReadTag::Internal); delta_stream = std::make_shared>(delta_stream, read_ranges, 0); delta_stream->readPrefix(); while (true) @@ -832,7 +836,8 @@ BlockInputStreamPtr Segment::getInputStreamModeNormal( { LOG_TRACE(segment_snap->log, "Begin segment create input stream"); - auto read_info = getReadInfo(dm_context, columns_to_read, segment_snap, read_ranges, max_version); + auto read_tag = need_row_id ? ReadTag::MVCC : ReadTag::Query; + auto read_info = getReadInfo(dm_context, columns_to_read, segment_snap, read_ranges, read_tag, max_version); RowKeyRanges real_ranges; for (const auto & read_range : read_ranges) @@ -858,10 +863,12 @@ BlockInputStreamPtr Segment::getInputStreamModeNormal( filter, max_version, expected_block_size, - false); + false, + read_tag); } else if (useCleanRead(segment_snap, columns_to_read)) { + RUNTIME_CHECK_MSG(!need_row_id, "'need_row_id is true, should not come here'"); // No delta, let's try some optimizations. stream = segment_snap->stable->getInputStream( dm_context, @@ -870,7 +877,8 @@ BlockInputStreamPtr Segment::getInputStreamModeNormal( filter, max_version, expected_block_size, - true); + true, + read_tag); } else { @@ -880,10 +888,11 @@ BlockInputStreamPtr Segment::getInputStreamModeNormal( real_ranges, filter, segment_snap->stable, - read_info.getDeltaReader(), + read_info.getDeltaReader(need_row_id ? ReadTag::MVCC : ReadTag::Query), read_info.index_begin, read_info.index_end, expected_block_size, + read_tag, max_version, need_row_id); } @@ -936,7 +945,7 @@ BlockInputStreamPtr Segment::getInputStreamForDataExport( bool reorganize_block) const { RowKeyRanges data_ranges{data_range}; - auto read_info = getReadInfo(dm_context, columns_to_read, segment_snap, data_ranges); + auto read_info = getReadInfo(dm_context, columns_to_read, segment_snap, data_ranges, ReadTag::Internal); BlockInputStreamPtr data_stream = getPlacedStream( dm_context, @@ -944,10 +953,11 @@ BlockInputStreamPtr Segment::getInputStreamForDataExport( data_ranges, EMPTY_RS_OPERATOR, segment_snap->stable, - read_info.getDeltaReader(), + read_info.getDeltaReader(ReadTag::Internal), read_info.index_begin, read_info.index_end, - expected_block_size); + expected_block_size, + ReadTag::Internal); data_stream = std::make_shared>(data_stream, data_ranges, 0); @@ -1026,15 +1036,17 @@ BlockInputStreamPtr Segment::getInputStreamModeFast( filter, std::numeric_limits::max(), expected_block_size, - /* enable_handle_clean_read */ enable_handle_clean_read, + enable_handle_clean_read, + ReadTag::Query, /* is_fast_scan */ true, - /* enable_del_clean_read */ enable_del_clean_read); + enable_del_clean_read); BlockInputStreamPtr delta_stream = std::make_shared( dm_context, segment_snap->delta, new_columns_to_read, - this->rowkey_range); + this->rowkey_range, + ReadTag::Query); // Do row key filtering based on data_ranges. delta_stream = std::make_shared>(delta_stream, data_ranges, 0); @@ -1090,13 +1102,15 @@ BlockInputStreamPtr Segment::getInputStreamModeRaw( EMPTY_RS_OPERATOR, std::numeric_limits::max(), expected_block_size, - /* enable_handle_clean_read */ false); + /* enable_handle_clean_read */ false, + ReadTag::Query); BlockInputStreamPtr delta_stream = std::make_shared( dm_context, segment_snap->delta, new_columns_to_read, - this->rowkey_range); + this->rowkey_range, + ReadTag::Query); // Do row key filtering based on data_ranges. delta_stream = std::make_shared>(delta_stream, data_ranges, 0); @@ -1504,7 +1518,7 @@ std::optional Segment::getSplitPointSlow( const auto & pk_col = getExtraHandleColumnDefine(is_common_handle); auto pk_col_defs = std::make_shared(ColumnDefines{pk_col}); // We need to create a new delta_reader here, because the one in read_info is used to read columns other than PK column. - auto delta_reader = read_info.getDeltaReader(pk_col_defs); + auto delta_reader = read_info.getDeltaReader(pk_col_defs, ReadTag::Internal); size_t exact_rows = 0; @@ -1519,7 +1533,8 @@ std::optional Segment::getSplitPointSlow( delta_reader, read_info.index_begin, read_info.index_end, - dm_context.stable_pack_rows); + dm_context.stable_pack_rows, + ReadTag::Internal); stream = std::make_shared>(stream, rowkey_ranges, 0); @@ -1545,7 +1560,8 @@ std::optional Segment::getSplitPointSlow( delta_reader, read_info.index_begin, read_info.index_end, - dm_context.stable_pack_rows); + dm_context.stable_pack_rows, + ReadTag::Internal); stream = std::make_shared>(stream, rowkey_ranges, 0); @@ -1792,7 +1808,8 @@ std::optional Segment::prepareSplitPhysical( // dm_context, *schema_snap, segment_snap, - {RowKeyRange::newAll(is_common_handle, rowkey_column_size)}); + {RowKeyRange::newAll(is_common_handle, rowkey_column_size)}, + ReadTag::Internal); if (!opt_split_point.has_value()) opt_split_point = getSplitPointSlow(dm_context, read_info, segment_snap); @@ -1818,7 +1835,7 @@ std::optional Segment::prepareSplitPhysical( // StableValueSpacePtr other_stable; { - auto my_delta_reader = read_info.getDeltaReader(schema_snap); + auto my_delta_reader = read_info.getDeltaReader(schema_snap, ReadTag::Internal); RowKeyRanges my_ranges{my_range}; BlockInputStreamPtr my_data = getPlacedStream( @@ -1830,7 +1847,8 @@ std::optional Segment::prepareSplitPhysical( // my_delta_reader, read_info.index_begin, read_info.index_end, - dm_context.stable_pack_rows); + dm_context.stable_pack_rows, + ReadTag::Internal); my_data = std::make_shared>(my_data, my_ranges, 0); @@ -1849,7 +1867,7 @@ std::optional Segment::prepareSplitPhysical( // { // Write new segment's data - auto other_delta_reader = read_info.getDeltaReader(schema_snap); + auto other_delta_reader = read_info.getDeltaReader(schema_snap, ReadTag::Internal); RowKeyRanges other_ranges{other_range}; BlockInputStreamPtr other_data = getPlacedStream( @@ -1861,7 +1879,8 @@ std::optional Segment::prepareSplitPhysical( // other_delta_reader, read_info.index_begin, read_info.index_end, - dm_context.stable_pack_rows); + dm_context.stable_pack_rows, + ReadTag::Internal); other_data = std::make_shared>(other_data, other_ranges, 0); @@ -2051,7 +2070,8 @@ StableValueSpacePtr Segment::prepareMerge( dm_context, *schema_snap, segment_snap, - {RowKeyRange::newAll(segment->is_common_handle, segment->rowkey_column_size)}); + {RowKeyRange::newAll(segment->is_common_handle, segment->rowkey_column_size)}, + ReadTag::Internal); RowKeyRanges rowkey_ranges{segment->rowkey_range}; BlockInputStreamPtr stream = getPlacedStream( dm_context, @@ -2059,10 +2079,11 @@ StableValueSpacePtr Segment::prepareMerge( rowkey_ranges, EMPTY_RS_OPERATOR, segment_snap->stable, - read_info.getDeltaReader(), + read_info.getDeltaReader(ReadTag::Internal), read_info.index_begin, read_info.index_end, - dm_context.stable_pack_rows); + dm_context.stable_pack_rows, + ReadTag::Internal); stream = std::make_shared>(stream, rowkey_ranges, 0); stream = std::make_shared>( @@ -2243,7 +2264,8 @@ void Segment::placeDeltaIndex(DMContext & dm_context, const SegmentSnapshotPtr & dm_context, /*read_columns=*/{getExtraHandleColumnDefine(is_common_handle)}, segment_snap, - {RowKeyRange::newAll(is_common_handle, rowkey_column_size)}); + {RowKeyRange::newAll(is_common_handle, rowkey_column_size)}, + ReadTag::Internal); } String Segment::simpleInfo() const @@ -2325,6 +2347,7 @@ Segment::ReadInfo Segment::getReadInfo( const ColumnDefines & read_columns, const SegmentSnapshotPtr & segment_snap, const RowKeyRanges & read_ranges, + ReadTag read_tag, UInt64 max_version) const { LOG_DEBUG(segment_snap->log, "Begin segment getReadInfo"); @@ -2332,9 +2355,14 @@ Segment::ReadInfo Segment::getReadInfo( auto new_read_columns = arrangeReadColumns(getExtraHandleColumnDefine(is_common_handle), read_columns); auto pk_ver_col_defs = std::make_shared( ColumnDefines{getExtraHandleColumnDefine(dm_context.is_common_handle), getVersionColumnDefine()}); - // Create a reader only for pk and version columns. - auto delta_reader - = std::make_shared(dm_context, segment_snap->delta, pk_ver_col_defs, this->rowkey_range); + // Create a reader that reads pk and version columns to update deltaindex. + // It related to MVCC, so always set a `ReadTag::MVCC` for it. + auto delta_reader = std::make_shared( + dm_context, + segment_snap->delta, + pk_ver_col_defs, + this->rowkey_range, + ReadTag::MVCC); auto [my_delta_index, fully_indexed] = ensurePlace(dm_context, segment_snap, delta_reader, read_ranges, max_version); @@ -2364,7 +2392,7 @@ Segment::ReadInfo Segment::getReadInfo( manager->refreshRef(segment_snap->delta->getSharedDeltaIndex()); return ReadInfo( - delta_reader->createNewReader(new_read_columns), + delta_reader->createNewReader(new_read_columns, read_tag), compacted_index->begin(), compacted_index->end(), new_read_columns); @@ -2399,6 +2427,7 @@ SkippableBlockInputStreamPtr Segment::getPlacedStream( const IndexIterator & delta_index_begin, const IndexIterator & delta_index_end, size_t expected_block_size, + ReadTag read_tag, UInt64 max_version, bool need_row_id) { @@ -2412,7 +2441,10 @@ SkippableBlockInputStreamPtr Segment::getPlacedStream( filter, max_version, expected_block_size, - false); + /* enable_handle_clean_read */ false, + read_tag, + /* is_fast_scan */ false, + /* enable_del_clean_read */ false); RowKeyRange rowkey_range = rowkey_ranges.size() == 1 ? rowkey_ranges[0] : mergeRanges(rowkey_ranges, rowkey_ranges[0].is_common_handle, rowkey_ranges[0].rowkey_column_size); @@ -2606,7 +2638,8 @@ bool Segment::placeUpsert( delta_reader, compacted_index->begin(), compacted_index->end(), - dm_context.stable_pack_rows); + dm_context.stable_pack_rows, + ReadTag::MVCC); if (do_sort) return DM::placeInsert( @@ -2658,7 +2691,8 @@ bool Segment::placeDelete( delta_reader, compacted_index->begin(), compacted_index->end(), - dm_context.stable_pack_rows); + dm_context.stable_pack_rows, + ReadTag::MVCC); delete_stream = std::make_shared>(delete_stream, delete_ranges, 0); @@ -2695,7 +2729,8 @@ bool Segment::placeDelete( delta_reader, compacted_index->begin(), compacted_index->end(), - dm_context.stable_pack_rows); + dm_context.stable_pack_rows, + ReadTag::MVCC); fully_indexed &= DM::placeDelete( merged_stream, block, @@ -2939,6 +2974,7 @@ BitmapFilterPtr Segment::buildBitmapFilterStableOnly( max_version, expected_block_size, /*enable_handle_clean_read*/ false, + ReadTag::MVCC, /*is_fast_scan*/ false, /*enable_del_clean_read*/ false, /*read_packs*/ some_packs_sets, @@ -2988,6 +3024,7 @@ BlockInputStreamPtr Segment::getBitmapFilterInputStream( max_version, expected_block_size, enable_handle_clean_read, + ReadTag::Query, is_fast_scan, enable_del_clean_read); @@ -2996,7 +3033,8 @@ BlockInputStreamPtr Segment::getBitmapFilterInputStream( dm_context, segment_snap->delta, columns_to_read_ptr, - this->rowkey_range); + this->rowkey_range, + ReadTag::Query); return std::make_shared( columns_to_read, @@ -3032,10 +3070,15 @@ BlockInputStreamPtr Segment::getLateMaterializationStream( max_version, expected_block_size, enable_handle_clean_read, + ReadTag::LMFilter, is_fast_scan, enable_del_clean_read); - SkippableBlockInputStreamPtr filter_column_delta_stream - = std::make_shared(dm_context, segment_snap->delta, filter_columns, this->rowkey_range); + SkippableBlockInputStreamPtr filter_column_delta_stream = std::make_shared( + dm_context, + segment_snap->delta, + filter_columns, + this->rowkey_range, + ReadTag::LMFilter); if (unlikely(filter_columns->size() == columns_to_read.size())) { @@ -3114,13 +3157,15 @@ BlockInputStreamPtr Segment::getLateMaterializationStream( max_version, expected_block_size, enable_handle_clean_read, + ReadTag::Query, is_fast_scan, enable_del_clean_read); SkippableBlockInputStreamPtr rest_column_delta_stream = std::make_shared( dm_context, segment_snap->delta, rest_columns_to_read, - this->rowkey_range); + this->rowkey_range, + ReadTag::Query); SkippableBlockInputStreamPtr rest_column_stream = std::make_shared( *rest_columns_to_read, rest_column_stable_stream, diff --git a/dbms/src/Storages/DeltaMerge/Segment.h b/dbms/src/Storages/DeltaMerge/Segment.h index e11e7b19bf1..402184e906e 100644 --- a/dbms/src/Storages/DeltaMerge/Segment.h +++ b/dbms/src/Storages/DeltaMerge/Segment.h @@ -112,10 +112,13 @@ class Segment , read_columns(read_columns_) {} - DeltaValueReaderPtr getDeltaReader() const { return delta_reader->createNewReader(read_columns); } - DeltaValueReaderPtr getDeltaReader(ColumnDefinesPtr columns) const + DeltaValueReaderPtr getDeltaReader(ReadTag read_tag) const { - return delta_reader->createNewReader(columns); + return delta_reader->createNewReader(read_columns, read_tag); + } + DeltaValueReaderPtr getDeltaReader(ColumnDefinesPtr columns, ReadTag read_tag) const + { + return delta_reader->createNewReader(columns, read_tag); } }; @@ -603,6 +606,7 @@ class Segment const ColumnDefines & read_columns, const SegmentSnapshotPtr & segment_snap, const RowKeyRanges & read_ranges, + ReadTag read_tag, UInt64 max_version = std::numeric_limits::max()) const; static ColumnDefinesPtr arrangeReadColumns(const ColumnDefine & handle, const ColumnDefines & columns_to_read); @@ -619,6 +623,7 @@ class Segment const IndexIterator & delta_index_begin, const IndexIterator & delta_index_end, size_t expected_block_size, + ReadTag read_tag, UInt64 max_version = std::numeric_limits::max(), bool need_row_id = false); diff --git a/dbms/src/Storages/DeltaMerge/StableValueSpace.cpp b/dbms/src/Storages/DeltaMerge/StableValueSpace.cpp index 6657255b3bf..96e5030c2e4 100644 --- a/dbms/src/Storages/DeltaMerge/StableValueSpace.cpp +++ b/dbms/src/Storages/DeltaMerge/StableValueSpace.cpp @@ -446,6 +446,7 @@ SkippableBlockInputStreamPtr StableValueSpace::Snapshot::getInputStream( UInt64 max_data_version, size_t expected_block_size, bool enable_handle_clean_read, + ReadTag read_tag, bool is_fast_scan, bool enable_del_clean_read, const std::vector & read_packs, @@ -470,7 +471,8 @@ SkippableBlockInputStreamPtr StableValueSpace::Snapshot::getInputStream( .setColumnCache(column_caches[i]) .setTracingID(context.tracing_id) .setRowsThreshold(expected_block_size) - .setReadPacks(read_packs.size() > i ? read_packs[i] : nullptr); + .setReadPacks(read_packs.size() > i ? read_packs[i] : nullptr) + .setReadTag(read_tag); streams.push_back(builder.build(stable->files[i], read_columns, rowkey_ranges, context.scan_context)); rows.push_back(stable->files[i]->getRows()); } diff --git a/dbms/src/Storages/DeltaMerge/StableValueSpace.h b/dbms/src/Storages/DeltaMerge/StableValueSpace.h index 878924a7587..75f6c2145af 100644 --- a/dbms/src/Storages/DeltaMerge/StableValueSpace.h +++ b/dbms/src/Storages/DeltaMerge/StableValueSpace.h @@ -17,6 +17,8 @@ #include #include #include +#include +#include #include #include #include @@ -227,6 +229,7 @@ class StableValueSpace : public std::enable_shared_from_this UInt64 max_data_version, size_t expected_block_size, bool enable_handle_clean_read, + ReadTag read_tag, bool is_fast_scan = false, bool enable_del_clean_read = false, const std::vector & read_packs = {}, diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store.cpp index 496a08eb072..1e62907bc4b 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store.cpp @@ -577,10 +577,8 @@ try while (in->read()) {}; in->readSuffix(); - ASSERT_EQ(scan_context->total_dmfile_scanned_packs, 7); - ASSERT_EQ(scan_context->total_dmfile_scanned_rows, 50000); - ASSERT_EQ(scan_context->total_dmfile_skipped_packs, 0); - ASSERT_EQ(scan_context->total_dmfile_skipped_rows, 0); + ASSERT_EQ(scan_context->dmfile_data_scanned_rows, 50000); + ASSERT_EQ(scan_context->dmfile_data_skipped_rows, 0); auto filter = createGreater( Attr{col_a_define.name, col_a_define.id, DataTypeFactory::instance().get("Int64")}, @@ -609,10 +607,8 @@ try while (in->read()) {}; in->readSuffix(); - ASSERT_EQ(scan_context->total_dmfile_scanned_packs, 6); - ASSERT_EQ(scan_context->total_dmfile_scanned_rows, 41808); - ASSERT_EQ(scan_context->total_dmfile_skipped_packs, 1); - ASSERT_EQ(scan_context->total_dmfile_skipped_rows, 8192); + ASSERT_EQ(scan_context->dmfile_data_scanned_rows, 41808); + ASSERT_EQ(scan_context->dmfile_data_skipped_rows, 8192); } CATCH diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_value_space.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_value_space.cpp index 7512a01082f..36473e94b4d 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_value_space.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_value_space.cpp @@ -215,8 +215,12 @@ static void checkDeltaValueSpaceData( ASSERT_EQ(rows, expected_all_rows); { - auto reader - = std::make_shared(dm_context, snapshot, table_columns, RowKeyRange::newAll(false, 1)); + auto reader = std::make_shared( + dm_context, + snapshot, + table_columns, + RowKeyRange::newAll(false, 1), + ReadTag::Internal); auto columns = expected_all_blocks[0].cloneEmptyColumns(); ASSERT_EQ(reader->readRows(columns, 0, expected_all_rows, nullptr), expected_all_rows); Blocks result_blocks; @@ -227,8 +231,12 @@ static void checkDeltaValueSpaceData( // read with a specific range { // For `ColumnFileBig`, the same column file reader cannot be used twice, wo we create a new `DeltaValueReader` here. - auto reader - = std::make_shared(dm_context, snapshot, table_columns, RowKeyRange::newAll(false, 1)); + auto reader = std::make_shared( + dm_context, + snapshot, + table_columns, + RowKeyRange::newAll(false, 1), + ReadTag::Internal); auto columns = expected_all_blocks[0].cloneEmptyColumns(); RowKeyRange read_range = RowKeyRange::fromHandleRange(handle_range); ASSERT_EQ(reader->readRows(columns, 0, expected_all_rows, &read_range), expected_range_rows); @@ -536,7 +544,8 @@ TEST_F(DeltaValueSpaceTest, Restore) dmContext(), old_delta_snapshot, table_columns, - RowKeyRange::newAll(false, 1)); + RowKeyRange::newAll(false, 1), + ReadTag::Internal); old_delta_stream.readPrefix(); while (true) { @@ -555,7 +564,8 @@ TEST_F(DeltaValueSpaceTest, Restore) dmContext(), new_delta_snapshot, table_columns, - RowKeyRange::newAll(false, 1)); + RowKeyRange::newAll(false, 1), + ReadTag::Internal); new_delta_stream.readPrefix(); while (true) { @@ -676,8 +686,12 @@ TEST_F(DeltaValueSpaceTest, GetPlaceItems) // write some more data after create snapshot appendBlockToDeltaValueSpace(dmContext(), delta, total_rows_write, num_rows_write_per_batch); ASSERT_EQ(delta->getRows(true), total_rows_write + num_rows_write_per_batch); - auto reader - = std::make_shared(dmContext(), snapshot, table_columns, RowKeyRange::newAll(false, 1)); + auto reader = std::make_shared( + dmContext(), + snapshot, + table_columns, + RowKeyRange::newAll(false, 1), + ReadTag::Internal); auto place_items = reader->getPlaceItems(0, 0, snapshot->getRows(), snapshot->getDeletes()); ASSERT_EQ(place_items.size(), 2); size_t total_place_rows = 0; @@ -697,8 +711,12 @@ TEST_F(DeltaValueSpaceTest, ShouldPlace) appendBlockToDeltaValueSpace(dmContext(), delta, 0, num_rows_write_per_batch, tso); { auto snapshot = delta->createSnapshot(dmContext(), false, CurrentMetrics::DT_SnapshotOfRead); - auto reader - = std::make_shared(dmContext(), snapshot, table_columns, RowKeyRange::newAll(false, 1)); + auto reader = std::make_shared( + dmContext(), + snapshot, + table_columns, + RowKeyRange::newAll(false, 1), + ReadTag::Internal); ASSERT_TRUE(reader->shouldPlace( dmContext(), snapshot->getSharedDeltaIndex(), @@ -715,8 +733,12 @@ TEST_F(DeltaValueSpaceTest, ShouldPlace) { delta->flush(dmContext()); auto snapshot = delta->createSnapshot(dmContext(), false, CurrentMetrics::DT_SnapshotOfRead); - auto reader - = std::make_shared(dmContext(), snapshot, table_columns, RowKeyRange::newAll(false, 1)); + auto reader = std::make_shared( + dmContext(), + snapshot, + table_columns, + RowKeyRange::newAll(false, 1), + ReadTag::Internal); ASSERT_TRUE(reader->shouldPlace( dmContext(), snapshot->getSharedDeltaIndex(), diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_skippable_block_input_stream.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_skippable_block_input_stream.cpp index 76e3f5c78dd..e2fc16f20c9 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_skippable_block_input_stream.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_skippable_block_input_stream.cpp @@ -53,6 +53,7 @@ class SkippableBlockInputStreamTest : public SegmentTestBasic std::numeric_limits::max(), DEFAULT_BLOCK_SIZE, enable_handle_clean_read, + ReadTag::Internal, is_fast_scan, enable_del_clean_read); @@ -61,7 +62,8 @@ class SkippableBlockInputStreamTest : public SegmentTestBasic *dm_context, snapshot->delta, columns_to_read_ptr, - segment->getRowKeyRange()); + segment->getRowKeyRange(), + ReadTag::Internal); return std::make_shared( columns_to_read, diff --git a/dbms/src/Storages/StorageDisaggregated.cpp b/dbms/src/Storages/StorageDisaggregated.cpp index 1b56877406a..ecdde33bbdf 100644 --- a/dbms/src/Storages/StorageDisaggregated.cpp +++ b/dbms/src/Storages/StorageDisaggregated.cpp @@ -126,9 +126,10 @@ void StorageDisaggregated::readThroughExchange( filterConditions(exec_context, group_builder, *analyzer); } -std::vector StorageDisaggregated::buildRemoteTableRanges() +std::tuple, UInt64> StorageDisaggregated::buildRemoteTableRanges() { std::unordered_map all_remote_regions; + UInt64 region_num = 0; for (auto physical_table_id : table_scan.getPhysicalTableIDs()) { const auto & table_regions_info = context.getDAGContext()->getTableRegionsInfoByTableID(physical_table_id); @@ -136,6 +137,7 @@ std::vector StorageDisaggregated::buildR RUNTIME_CHECK_MSG( table_regions_info.local_regions.empty(), "in disaggregated_compute_mode, local_regions should be empty"); + region_num += table_regions_info.remote_regions.size(); for (const auto & reg : table_regions_info.remote_regions) all_remote_regions[physical_table_id].emplace_back(std::cref(reg)); } @@ -149,7 +151,7 @@ std::vector StorageDisaggregated::buildR auto key_ranges = RemoteRequest::buildKeyRanges(remote_regions); remote_table_ranges.emplace_back(RemoteTableRange{physical_table_id, key_ranges}); } - return remote_table_ranges; + return std::make_tuple(std::move(remote_table_ranges), region_num); } std::vector StorageDisaggregated::buildBatchCopTasks( diff --git a/dbms/src/Storages/StorageDisaggregated.h b/dbms/src/Storages/StorageDisaggregated.h index 3a38f1bee89..b192881cb37 100644 --- a/dbms/src/Storages/StorageDisaggregated.h +++ b/dbms/src/Storages/StorageDisaggregated.h @@ -138,7 +138,7 @@ class StorageDisaggregated : public IStorage private: using RemoteTableRange = std::pair; - std::vector buildRemoteTableRanges(); + std::tuple, UInt64> buildRemoteTableRanges(); std::vector buildBatchCopTasks( const std::vector & remote_table_ranges, const pingcap::kv::LabelFilter & label_filter); diff --git a/dbms/src/Storages/StorageDisaggregatedRemote.cpp b/dbms/src/Storages/StorageDisaggregatedRemote.cpp index e554d227751..5a2ae4235f1 100644 --- a/dbms/src/Storages/StorageDisaggregatedRemote.cpp +++ b/dbms/src/Storages/StorageDisaggregatedRemote.cpp @@ -172,7 +172,8 @@ DM::Remote::RNReadTaskPtr StorageDisaggregated::buildReadTask( // First split the read task for different write nodes. // For each write node, a BatchCopTask is built. { - auto remote_table_ranges = buildRemoteTableRanges(); + auto [remote_table_ranges, region_num] = buildRemoteTableRanges(); + scan_context->setRegionNumOfCurrentInstance(region_num); // only send to tiflash node with label [{"engine":"tiflash"}, {"engine-role":"write"}] const auto label_filter = pingcap::kv::labelFilterOnlyTiFlashWriteNode; batch_cop_tasks = buildBatchCopTasks(remote_table_ranges, label_filter); @@ -201,6 +202,7 @@ DM::Remote::RNReadTaskPtr StorageDisaggregated::buildReadTask( // TODO } + scan_context->num_segments = output_seg_tasks.size(); return DM::Remote::RNReadTask::create(output_seg_tasks); }