diff --git a/contrib/tipb b/contrib/tipb index 9bb28c43bbf..f85f989ac0d 160000 --- a/contrib/tipb +++ b/contrib/tipb @@ -1 +1 @@ -Subproject commit 9bb28c43bbfc33dce85b33fd1b5771bad163a060 +Subproject commit f85f989ac0d73c53f36369f9f2d0707590e3b917 diff --git a/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp b/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp index 47e60a810f7..73317911a93 100644 --- a/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp +++ b/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp @@ -1187,7 +1187,7 @@ void DAGStorageInterpreter::buildLocalStreams(DAGPipeline & pipeline, size_t max size_t total_local_region_num = mvcc_query_info->regions_query_info.size(); if (total_local_region_num == 0) return; - mvcc_query_info->scan_context->total_local_region_num = total_local_region_num; + mvcc_query_info->scan_context->setRegionNumOfCurrentInstance(total_local_region_num); const auto table_query_infos = generateSelectQueryInfos(); bool has_multiple_partitions = table_query_infos.size() > 1; // MultiPartitionStreamPool will be disabled in no partition mode or single-partition case @@ -1253,7 +1253,7 @@ void DAGStorageInterpreter::buildLocalExec( size_t total_local_region_num = mvcc_query_info->regions_query_info.size(); if (total_local_region_num == 0) return; - mvcc_query_info->scan_context->total_local_region_num = total_local_region_num; + mvcc_query_info->scan_context->setRegionNumOfCurrentInstance(total_local_region_num); const auto table_query_infos = generateSelectQueryInfos(); bool has_multiple_partitions = table_query_infos.size() > 1; ConcatBuilderPool builder_pool{max_streams}; diff --git a/dbms/src/Flash/Coprocessor/tests/gtest_ti_remote_block_inputstream.cpp b/dbms/src/Flash/Coprocessor/tests/gtest_ti_remote_block_inputstream.cpp index a1fcab3f206..9448bd8477b 100644 --- a/dbms/src/Flash/Coprocessor/tests/gtest_ti_remote_block_inputstream.cpp +++ b/dbms/src/Flash/Coprocessor/tests/gtest_ti_remote_block_inputstream.cpp @@ 
-52,8 +52,8 @@ bool equalSummaries(const ExecutionSummary & left, const ExecutionSummary & righ (left.num_iterations == right.num_iterations) && // (left.num_produced_rows == right.num_produced_rows) && // (left.time_processed_ns == right.time_processed_ns) && // - (left.scan_context->total_dmfile_scanned_rows == right.scan_context->total_dmfile_scanned_rows) && // - (left.scan_context->total_dmfile_skipped_rows == right.scan_context->total_dmfile_skipped_rows); + (left.scan_context->dmfile_data_scanned_rows == right.scan_context->dmfile_data_scanned_rows) && // + (left.scan_context->dmfile_data_skipped_rows == right.scan_context->dmfile_data_skipped_rows); } struct MockWriter @@ -72,10 +72,12 @@ struct MockWriter summary.concurrency = 1; summary.scan_context = std::make_unique(); - summary.scan_context->total_dmfile_scanned_packs = 1; - summary.scan_context->total_dmfile_skipped_packs = 2; - summary.scan_context->total_dmfile_scanned_rows = 8000; - summary.scan_context->total_dmfile_skipped_rows = 15000; + summary.scan_context->dmfile_data_scanned_rows = 8000; + summary.scan_context->dmfile_data_skipped_rows = 15000; + summary.scan_context->dmfile_mvcc_scanned_rows = 8000; + summary.scan_context->dmfile_mvcc_skipped_rows = 15000; + summary.scan_context->dmfile_lm_filter_scanned_rows = 8000; + summary.scan_context->dmfile_lm_filter_skipped_rows = 15000; summary.scan_context->total_dmfile_rough_set_index_check_time_ns = 10; summary.scan_context->total_dmfile_read_time_ns = 200; summary.scan_context->create_snapshot_time_ns = 5; diff --git a/dbms/src/Flash/Statistics/TableScanImpl.cpp b/dbms/src/Flash/Statistics/TableScanImpl.cpp index 7316164a686..5a601e8ef0a 100644 --- a/dbms/src/Flash/Statistics/TableScanImpl.cpp +++ b/dbms/src/Flash/Statistics/TableScanImpl.cpp @@ -114,6 +114,15 @@ void TableScanStatistics::collectExtraRuntimeDetail() }); break; } + + if (auto it = dag_context.scan_context_map.find(executor_id); it != dag_context.scan_context_map.end()) + { + 
it->second->setStreamCost( + std::max(local_table_scan_detail.min_stream_cost_ns, 0.0), + std::max(local_table_scan_detail.max_stream_cost_ns, 0.0), + std::max(remote_table_scan_detail.min_stream_cost_ns, 0.0), + std::max(remote_table_scan_detail.max_stream_cost_ns, 0.0)); + } } TableScanStatistics::TableScanStatistics(const tipb::Executor * executor, DAGContext & dag_context_) diff --git a/dbms/src/Server/Server.cpp b/dbms/src/Server/Server.cpp index 25c61a4984c..df6b1367583 100644 --- a/dbms/src/Server/Server.cpp +++ b/dbms/src/Server/Server.cpp @@ -949,6 +949,8 @@ int Server::main(const std::vector & /*args*/) TiFlashErrorRegistry::instance(); // This invocation is for initializing + DM::ScanContext::initCurrentInstanceId(config(), log); + const auto disaggregated_mode = getDisaggregatedMode(config()); const auto use_autoscaler = useAutoScaler(config()); diff --git a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFile.h b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFile.h index 762b165d8eb..089b32ddb85 100644 --- a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFile.h +++ b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFile.h @@ -19,6 +19,7 @@ #include #include #include +#include #include #include #include @@ -177,6 +178,8 @@ class ColumnFileReader /// Create a new reader from current reader with different columns to read. 
virtual ColumnFileReaderPtr createNewReader(const ColumnDefinesPtr & col_defs) = 0; + + virtual void setReadTag(ReadTag /*read_tag*/) {} }; std::pair copyColumnsData( diff --git a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileBig.cpp b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileBig.cpp index 889cc62e75a..c4f5f328bd8 100644 --- a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileBig.cpp +++ b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileBig.cpp @@ -179,6 +179,7 @@ void ColumnFileBigReader::initStream() DMFileBlockInputStreamBuilder builder(dm_context.global_context); file_stream = builder.setTracingID(dm_context.tracing_id) + .setReadTag(read_tag) .build( column_file.getFile(), *col_defs, @@ -394,5 +395,12 @@ ColumnFileReaderPtr ColumnFileBigReader::createNewReader(const ColumnDefinesPtr return std::make_shared(dm_context, column_file, new_col_defs); } +void ColumnFileBigReader::setReadTag(ReadTag read_tag_) +{ + // `read_tag` should be set before `file_stream` is initialized. + RUNTIME_CHECK(file_stream == nullptr); + read_tag = read_tag_; +} + } // namespace DM } // namespace DB diff --git a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileBig.h b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileBig.h index 6b44764d0c4..7e30d30a2af 100644 --- a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileBig.h +++ b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileBig.h @@ -139,6 +139,8 @@ class ColumnFileBigReader : public ColumnFileReader Block cur_block; Columns cur_block_data; // The references to columns in cur_block, for faster access. 
+ ReadTag read_tag; + private: void initStream(); std::pair readRowsRepeatedly( @@ -192,6 +194,8 @@ class ColumnFileBigReader : public ColumnFileReader size_t skipNextBlock() override; ColumnFileReaderPtr createNewReader(const ColumnDefinesPtr & new_col_defs) override; + + void setReadTag(ReadTag read_tag_) override; }; } // namespace DM diff --git a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileSetReader.cpp b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileSetReader.cpp index 886ce232740..fc9315b36cd 100644 --- a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileSetReader.cpp +++ b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileSetReader.cpp @@ -84,7 +84,8 @@ ColumnFileSetReader::ColumnFileSetReader( const DMContext & context_, const ColumnFileSetSnapshotPtr & snapshot_, const ColumnDefinesPtr & col_defs_, - const RowKeyRange & segment_range_) + const RowKeyRange & segment_range_, + ReadTag read_tag_) : context(context_) , snapshot(snapshot_) , col_defs(col_defs_) @@ -98,10 +99,11 @@ ColumnFileSetReader::ColumnFileSetReader( column_file_rows.push_back(f->getRows()); column_file_rows_end.push_back(total_rows); column_file_readers.push_back(f->getReader(context, snapshot->getDataProvider(), col_defs)); + column_file_readers.back()->setReadTag(read_tag_); } } -ColumnFileSetReaderPtr ColumnFileSetReader::createNewReader(const ColumnDefinesPtr & new_col_defs) +ColumnFileSetReaderPtr ColumnFileSetReader::createNewReader(const ColumnDefinesPtr & new_col_defs, ReadTag read_tag) { auto * new_reader = new ColumnFileSetReader(context); new_reader->snapshot = snapshot; @@ -111,7 +113,10 @@ ColumnFileSetReaderPtr ColumnFileSetReader::createNewReader(const ColumnDefinesP new_reader->column_file_rows_end = column_file_rows_end; for (auto & fr : column_file_readers) + { new_reader->column_file_readers.push_back(fr->createNewReader(new_col_defs)); + new_reader->column_file_readers.back()->setReadTag(read_tag); + } return std::shared_ptr(new_reader); } diff --git 
a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileSetReader.h b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileSetReader.h index 551e4d312b8..ce18da5c407 100644 --- a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileSetReader.h +++ b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileSetReader.h @@ -57,11 +57,12 @@ class ColumnFileSetReader const DMContext & context_, const ColumnFileSetSnapshotPtr & snapshot_, const ColumnDefinesPtr & col_defs_, - const RowKeyRange & segment_range_); + const RowKeyRange & segment_range_, + ReadTag read_tag_); // If we need to read columns besides pk and version, a ColumnFileSetReader can NOT be used more than once. // This method create a new reader based on the current one. It will reuse some caches in the current reader. - ColumnFileSetReaderPtr createNewReader(const ColumnDefinesPtr & new_col_defs); + ColumnFileSetReaderPtr createNewReader(const ColumnDefinesPtr & new_col_defs, ReadTag read_tag); // Use for DeltaMergeBlockInputStream to read rows from MemTableSet to do full compaction with other layer. // This method will check whether offset and limit are valid. It only return those valid rows. 
@@ -104,8 +105,9 @@ class ColumnFileSetInputStream : public SkippableBlockInputStream const DMContext & context_, const ColumnFileSetSnapshotPtr & delta_snap_, const ColumnDefinesPtr & col_defs_, - const RowKeyRange & segment_range_) - : reader(context_, delta_snap_, col_defs_, segment_range_) + const RowKeyRange & segment_range_, + ReadTag read_tag_) + : reader(context_, delta_snap_, col_defs_, segment_range_, read_tag_) , column_files(reader.snapshot->getColumnFiles()) , column_files_count(column_files.size()) {} diff --git a/dbms/src/Storages/DeltaMerge/Delta/DeltaValueSpace.h b/dbms/src/Storages/DeltaMerge/Delta/DeltaValueSpace.h index a68e02c5628..526af6f682b 100644 --- a/dbms/src/Storages/DeltaMerge/Delta/DeltaValueSpace.h +++ b/dbms/src/Storages/DeltaMerge/Delta/DeltaValueSpace.h @@ -446,11 +446,12 @@ class DeltaValueReader const DMContext & context_, const DeltaSnapshotPtr & delta_snap_, const ColumnDefinesPtr & col_defs_, - const RowKeyRange & segment_range_); + const RowKeyRange & segment_range_, + ReadTag read_tag_); // If we need to read columns besides pk and version, a DeltaValueReader can NOT be used more than once. // This method create a new reader based on then current one. It will reuse some caches in the current reader. 
- DeltaValueReaderPtr createNewReader(const ColumnDefinesPtr & new_col_defs); + DeltaValueReaderPtr createNewReader(const ColumnDefinesPtr & new_col_defs, ReadTag read_tag); void setDeltaIndex(const DeltaIndexCompactedPtr & delta_index_) { compacted_delta_index = delta_index_; } @@ -492,9 +493,15 @@ class DeltaValueInputStream : public SkippableBlockInputStream const DMContext & context_, const DeltaSnapshotPtr & delta_snap_, const ColumnDefinesPtr & col_defs_, - const RowKeyRange & segment_range_) - : mem_table_input_stream(context_, delta_snap_->getMemTableSetSnapshot(), col_defs_, segment_range_) - , persisted_files_input_stream(context_, delta_snap_->getPersistedFileSetSnapshot(), col_defs_, segment_range_) + const RowKeyRange & segment_range_, + ReadTag read_tag_) + : mem_table_input_stream(context_, delta_snap_->getMemTableSetSnapshot(), col_defs_, segment_range_, read_tag_) + , persisted_files_input_stream( + context_, + delta_snap_->getPersistedFileSetSnapshot(), + col_defs_, + segment_range_, + read_tag_) {} String getName() const override { return "DeltaValue"; } diff --git a/dbms/src/Storages/DeltaMerge/Delta/Snapshot.cpp b/dbms/src/Storages/DeltaMerge/Delta/Snapshot.cpp index 2c0b5bc7d1e..30543398143 100644 --- a/dbms/src/Storages/DeltaMerge/Delta/Snapshot.cpp +++ b/dbms/src/Storages/DeltaMerge/Delta/Snapshot.cpp @@ -71,29 +71,32 @@ DeltaValueReader::DeltaValueReader( const DMContext & context, const DeltaSnapshotPtr & delta_snap_, const ColumnDefinesPtr & col_defs_, - const RowKeyRange & segment_range_) + const RowKeyRange & segment_range_, + ReadTag read_tag_) : delta_snap(delta_snap_) , mem_table_reader(std::make_shared( context, delta_snap_->getMemTableSetSnapshot(), col_defs_, - segment_range_)) + segment_range_, + read_tag_)) , persisted_files_reader(std::make_shared( context, delta_snap_->getPersistedFileSetSnapshot(), col_defs_, - segment_range_)) + segment_range_, + read_tag_)) , col_defs(col_defs_) , segment_range(segment_range_) {} 
-DeltaValueReaderPtr DeltaValueReader::createNewReader(const ColumnDefinesPtr & new_col_defs) +DeltaValueReaderPtr DeltaValueReader::createNewReader(const ColumnDefinesPtr & new_col_defs, ReadTag read_tag) { auto * new_reader = new DeltaValueReader(); new_reader->delta_snap = delta_snap; new_reader->compacted_delta_index = compacted_delta_index; - new_reader->persisted_files_reader = persisted_files_reader->createNewReader(new_col_defs); - new_reader->mem_table_reader = mem_table_reader->createNewReader(new_col_defs); + new_reader->persisted_files_reader = persisted_files_reader->createNewReader(new_col_defs, read_tag); + new_reader->mem_table_reader = mem_table_reader->createNewReader(new_col_defs, read_tag); new_reader->col_defs = new_col_defs; new_reader->segment_range = segment_range; diff --git a/dbms/src/Storages/DeltaMerge/File/DMFileBlockInputStream.cpp b/dbms/src/Storages/DeltaMerge/File/DMFileBlockInputStream.cpp index 9b314622efb..ca58a245359 100644 --- a/dbms/src/Storages/DeltaMerge/File/DMFileBlockInputStream.cpp +++ b/dbms/src/Storages/DeltaMerge/File/DMFileBlockInputStream.cpp @@ -98,7 +98,8 @@ DMFileBlockInputStreamPtr DMFileBlockInputStreamBuilder::build( read_one_pack_every_time, tracing_id, max_sharing_column_count, - scan_context); + scan_context, + read_tag); return std::make_shared(std::move(reader), max_sharing_column_count > 0); } diff --git a/dbms/src/Storages/DeltaMerge/File/DMFileBlockInputStream.h b/dbms/src/Storages/DeltaMerge/File/DMFileBlockInputStream.h index fe00b1d5528..094914397b3 100644 --- a/dbms/src/Storages/DeltaMerge/File/DMFileBlockInputStream.h +++ b/dbms/src/Storages/DeltaMerge/File/DMFileBlockInputStream.h @@ -149,6 +149,12 @@ class DMFileBlockInputStreamBuilder return *this; } + DMFileBlockInputStreamBuilder & setReadTag(ReadTag read_tag_) + { + read_tag = read_tag_; + return *this; + } + private: // These methods are called by the ctor @@ -194,6 +200,7 @@ class DMFileBlockInputStreamBuilder size_t 
max_sharing_column_bytes_for_all = 0; size_t max_sharing_column_count = 0; String tracing_id; + ReadTag read_tag = ReadTag::Internal; }; /** diff --git a/dbms/src/Storages/DeltaMerge/File/DMFileReader.cpp b/dbms/src/Storages/DeltaMerge/File/DMFileReader.cpp index 756b3995afc..e9903d103c7 100644 --- a/dbms/src/Storages/DeltaMerge/File/DMFileReader.cpp +++ b/dbms/src/Storages/DeltaMerge/File/DMFileReader.cpp @@ -66,7 +66,8 @@ DMFileReader::DMFileReader( bool read_one_pack_every_time_, const String & tracing_id_, size_t max_sharing_column_count, - const ScanContextPtr & scan_context_) + const ScanContextPtr & scan_context_, + const ReadTag read_tag_) : dmfile(dmfile_) , read_columns(read_columns_) , is_common_handle(is_common_handle_) @@ -81,6 +82,7 @@ DMFileReader::DMFileReader( , mark_cache(mark_cache_) , column_cache(column_cache_) , scan_context(scan_context_) + , read_tag(read_tag_) , rows_threshold_per_read(rows_threshold_per_read_) , file_provider(file_provider_) , log(Logger::get(tracing_id_)) @@ -130,8 +132,7 @@ bool DMFileReader::getSkippedRows(size_t & skip_rows) for (; next_pack_id < use_packs.size() && !use_packs[next_pack_id]; ++next_pack_id) { skip_rows += pack_stats[next_pack_id].rows; - scan_context->total_dmfile_skipped_packs += 1; - scan_context->total_dmfile_skipped_rows += pack_stats[next_pack_id].rows; + addSkippedRows(pack_stats[next_pack_id].rows); } next_row_offset += skip_rows; // return false if it is the end of stream. 
@@ -165,10 +166,9 @@ size_t DMFileReader::skipNextBlock() break; read_rows += pack_stats[next_pack_id].rows; - scan_context->total_dmfile_skipped_packs += 1; } - scan_context->total_dmfile_skipped_rows += read_rows; + addSkippedRows(read_rows); next_row_offset += read_rows; // When we read dmfile, if the previous pack is not read, @@ -357,8 +357,7 @@ Block DMFileReader::read() size_t read_packs = next_pack_id - start_pack_id; - scan_context->total_dmfile_scanned_packs += read_packs; - scan_context->total_dmfile_scanned_rows += read_rows; + addScannedRows(read_rows); // TODO: this will need better algorithm: we should separate those packs which can and can not do clean read. bool do_clean_read_on_normal_mode @@ -639,4 +638,40 @@ bool DMFileReader::getCachedPacks( col_data_cache->del(col_id, next_pack_id); return found; } + +void DMFileReader::addScannedRows(UInt64 rows) +{ + switch (read_tag) + { + case ReadTag::Query: + scan_context->dmfile_data_scanned_rows += rows; + break; + case ReadTag::MVCC: + scan_context->dmfile_mvcc_scanned_rows += rows; + break; + case ReadTag::LMFilter: + scan_context->dmfile_lm_filter_scanned_rows += rows; + break; + default: + break; + } +} + +void DMFileReader::addSkippedRows(UInt64 rows) +{ + switch (read_tag) + { + case ReadTag::Query: + scan_context->dmfile_data_skipped_rows += rows; + break; + case ReadTag::MVCC: + scan_context->dmfile_mvcc_skipped_rows += rows; + break; + case ReadTag::LMFilter: + scan_context->dmfile_lm_filter_skipped_rows += rows; + break; + default: + break; + } +} } // namespace DB::DM diff --git a/dbms/src/Storages/DeltaMerge/File/DMFileReader.h b/dbms/src/Storages/DeltaMerge/File/DMFileReader.h index 5b0cf1eb4bb..34bed3eaea5 100644 --- a/dbms/src/Storages/DeltaMerge/File/DMFileReader.h +++ b/dbms/src/Storages/DeltaMerge/File/DMFileReader.h @@ -62,7 +62,8 @@ class DMFileReader bool read_one_pack_every_time_, const String & tracing_id_, size_t max_sharing_column_count, - const ScanContextPtr & scan_context_); 
+ const ScanContextPtr & scan_context_, + ReadTag read_tag_); Block getHeader() const { return toEmptyBlock(read_columns); } @@ -112,6 +113,9 @@ class DMFileReader size_t skip_packs); bool getCachedPacks(ColId col_id, size_t start_pack_id, size_t pack_count, size_t read_rows, ColumnPtr & col) const; + void addScannedRows(UInt64 rows); + void addSkippedRows(UInt64 rows); + DMFilePtr dmfile; ColumnDefines read_columns; ColumnReadStreamMap column_streams; @@ -144,6 +148,7 @@ class DMFileReader ColumnCachePtr column_cache; const ScanContextPtr scan_context; + const ReadTag read_tag; const size_t rows_threshold_per_read; diff --git a/dbms/src/Storages/DeltaMerge/ReadMode.h b/dbms/src/Storages/DeltaMerge/ReadMode.h index 8ec6c4654e2..ec3623d969d 100644 --- a/dbms/src/Storages/DeltaMerge/ReadMode.h +++ b/dbms/src/Storages/DeltaMerge/ReadMode.h @@ -39,4 +39,11 @@ enum class ReadMode Bitmap, }; +enum class ReadTag +{ + Internal, // Read columns required by some internal tasks. + Query, // Read columns required by queries. + MVCC, // Read columns to build MVCC bitmap. + LMFilter, // Read columns required by late-materialization filter. +}; } // namespace DB::DM diff --git a/dbms/src/Storages/DeltaMerge/ScanContext.cpp b/dbms/src/Storages/DeltaMerge/ScanContext.cpp index fce3291c882..723282f70d0 100644 --- a/dbms/src/Storages/DeltaMerge/ScanContext.cpp +++ b/dbms/src/Storages/DeltaMerge/ScanContext.cpp @@ -12,26 +12,100 @@ // See the License for the specific language governing permissions and // limitations under the License. +#include #pragma GCC diagnostic push #pragma GCC diagnostic ignored "-Wunused-parameter" #include #pragma GCC diagnostic pop +#include #include #include namespace DB::DM { +void ScanContext::setRegionNumOfCurrentInstance(uint64_t region_num) +{ + region_num_of_instance[current_instance_id] = region_num; + // total_local_region_num may be updated later if some regions are not available in current instance. 
+ total_local_region_num = region_num; +} +void ScanContext::setStreamCost( + uint64_t local_min_ns, + uint64_t local_max_ns, + uint64_t remote_min_ns, + uint64_t remote_max_ns) +{ + local_min_stream_cost_ns = local_min_ns; + local_max_stream_cost_ns = local_max_ns; + remote_min_stream_cost_ns = remote_min_ns; + remote_max_stream_cost_ns = remote_max_ns; +} + +void ScanContext::serializeRegionNumOfInstance(tipb::TiFlashScanContext & proto) const +{ + for (const auto & [id, num] : region_num_of_instance) + { + auto * p = proto.add_region_num_of_instance(); + p->set_instance_id(id); + p->set_region_num(num); + } +} + +void ScanContext::deserializeRegionNumberOfInstance(const tipb::TiFlashScanContext & proto) +{ + for (const auto & t : proto.region_num_of_instance()) + { + region_num_of_instance[t.instance_id()] = t.region_num(); + } +} + +void ScanContext::mergeRegionNumberOfInstance(const ScanContext & other) +{ + for (const auto & [id, num] : other.region_num_of_instance) + { + region_num_of_instance[id] += num; + } +} + +void ScanContext::mergeRegionNumberOfInstance(const tipb::TiFlashScanContext & other) +{ + for (const auto & t : other.region_num_of_instance()) + { + region_num_of_instance[t.instance_id()] += t.region_num(); + } +} + +void ScanContext::mergeStreamCost( + uint64_t local_min_ns, + uint64_t local_max_ns, + uint64_t remote_min_ns, + uint64_t remote_max_ns) +{ + if (local_min_stream_cost_ns == 0 || local_min_ns < local_min_stream_cost_ns) + local_min_stream_cost_ns = local_min_ns; + if (local_max_ns > local_max_stream_cost_ns) + local_max_stream_cost_ns = local_max_ns; + if (remote_min_stream_cost_ns == 0 || remote_min_ns < remote_min_stream_cost_ns) + remote_min_stream_cost_ns = remote_min_ns; + if (remote_max_ns > remote_max_stream_cost_ns) + remote_max_stream_cost_ns = remote_max_ns; +} + String ScanContext::toJson() const { static constexpr double NS_TO_MS_SCALE = 1'000'000.0; Poco::JSON::Object::Ptr json = new Poco::JSON::Object(); - 
json->set("dmfile_scan_rows", total_dmfile_scanned_rows.load()); - json->set("dmfile_skip_rows", total_dmfile_skipped_rows.load()); + json->set("dmfile_data_scanned_rows", dmfile_data_scanned_rows.load()); + json->set("dmfile_data_skipped_rows", dmfile_data_skipped_rows.load()); + json->set("dmfile_mvcc_scanned_rows", dmfile_mvcc_scanned_rows.load()); + json->set("dmfile_mvcc_skipped_rows", dmfile_mvcc_skipped_rows.load()); + json->set("dmfile_lm_filter_scanned_rows", dmfile_lm_filter_scanned_rows.load()); + json->set("dmfile_lm_filter_skipped_rows", dmfile_lm_filter_skipped_rows.load()); json->set("dmfile_read_time", fmt::format("{:.3f}ms", total_dmfile_read_time_ns.load() / NS_TO_MS_SCALE)); json->set("num_remote_region", total_remote_region_num.load()); - json->set("num_local_region", total_remote_region_num.load()); + json->set("num_local_region", total_local_region_num.load()); json->set("num_stale_read", num_stale_read.load()); json->set("read_bytes", user_read_bytes.load()); @@ -60,8 +134,104 @@ String ScanContext::toJson() const json->set("build_stream_time", fmt::format("{:.3f}ms", build_inputstream_time_ns.load() / NS_TO_MS_SCALE)); json->set("build_bitmap_time", fmt::format("{:.3f}ms", build_bitmap_time_ns.load() / NS_TO_MS_SCALE)); + json->set("local_min_stream_cost_ms", fmt::format("{:.3f}ms", local_min_stream_cost_ns / NS_TO_MS_SCALE)); + json->set("local_max_stream_cost_ms", fmt::format("{:.3f}ms", local_max_stream_cost_ns / NS_TO_MS_SCALE)); + json->set("remote_min_stream_cost_ms", fmt::format("{:.3f}ms", remote_min_stream_cost_ns / NS_TO_MS_SCALE)); + json->set("remote_max_stream_cost_ms", fmt::format("{:.3f}ms", remote_max_stream_cost_ns / NS_TO_MS_SCALE)); + + auto to_json_object = [](const String & id, uint64_t num) { + Poco::JSON::Object::Ptr json = new Poco::JSON::Object(); + json->set("instance_id", id); + json->set("region_num", num); + return json; + }; + auto to_json_array = [&to_json_object](const RegionNumOfInstance & 
region_num_of_instance) { + Poco::JSON::Array::Ptr arr = new Poco::JSON::Array(); + for (const auto & [id, num] : region_num_of_instance) + { + arr->add(to_json_object(id, num)); + } + return arr; + }; + json->set("region_num_of_instance", to_json_array(region_num_of_instance)); + std::stringstream buf; json->stringify(buf); return buf.str(); } + +String getHostName(const LoggerPtr & log) +{ + char hostname[1024]; + if (::gethostname(hostname, sizeof(hostname)) != 0) + { + LOG_ERROR(log, "gethostname failed: {}", errno); + return {}; + } + return hostname; +} + +bool isLocalAddress(const String & address) +{ + static const std::vector local_list{// ipv4 + "0.0.0.0", + "127.", + "localhost", + // ipv6 + "0:0:0:0:0:0:0", + "[0:0:0:0:0:0:0", + ":", + "[:"}; + for (const auto & local_prefix : local_list) + { + if (address.starts_with(local_prefix)) + { + return true; + } + } + return false; +} + +String getPort(const String & address) +{ + auto pos = address.find_last_of(':'); + if (pos == std::string::npos) + { + return {}; + } + return address.substr(pos + 1); +} + +String getCurrentInstanceId(const String & flash_server_addr, const LoggerPtr & log) +{ + if (!isLocalAddress(flash_server_addr)) + { + return flash_server_addr; + } + + auto hostname = getHostName(log); + if (hostname.empty()) + { + return Poco::UUIDGenerator().createRandom().toString(); + } + + auto port = getPort(flash_server_addr); + if (!port.empty()) + { + return hostname + ":" + port; + } + else + { + auto uuid = Poco::UUIDGenerator().createRandom().toString(); + // hostname + uuid may be too long, so cut the uuid. 
+ return hostname + "-" + uuid.substr(0, std::min(8, uuid.size())); + } +} + +void ScanContext::initCurrentInstanceId(Poco::Util::AbstractConfiguration & config, const LoggerPtr & log) +{ + auto flash_server_addr = config.getString("flash.service_addr", "0.0.0.0:3930"); + current_instance_id = getCurrentInstanceId(flash_server_addr, log); + LOG_INFO(log, "flash_server_addr={}, current_instance_id={}", flash_server_addr, current_instance_id); +} } // namespace DB::DM diff --git a/dbms/src/Storages/DeltaMerge/ScanContext.h b/dbms/src/Storages/DeltaMerge/ScanContext.h index c9909fbb352..4c2354093de 100644 --- a/dbms/src/Storages/DeltaMerge/ScanContext.h +++ b/dbms/src/Storages/DeltaMerge/ScanContext.h @@ -14,6 +14,8 @@ #pragma once +#include +#include #include #include #include @@ -32,17 +34,12 @@ namespace DB::DM class ScanContext { public: - /// sum of scanned packs in dmfiles(both stable and ColumnFileBig) among this query - std::atomic total_dmfile_scanned_packs{0}; - - /// sum of skipped packs in dmfiles(both stable and ColumnFileBig) among this query - std::atomic total_dmfile_skipped_packs{0}; - - /// sum of scanned rows in dmfiles(both stable and ColumnFileBig) among this query - std::atomic total_dmfile_scanned_rows{0}; - - /// sum of skipped rows in dmfiles(both stable and ColumnFileBig) among this query - std::atomic total_dmfile_skipped_rows{0}; + std::atomic dmfile_data_scanned_rows{0}; + std::atomic dmfile_data_skipped_rows{0}; + std::atomic dmfile_mvcc_scanned_rows{0}; + std::atomic dmfile_mvcc_skipped_rows{0}; + std::atomic dmfile_lm_filter_scanned_rows{0}; + std::atomic dmfile_lm_filter_skipped_rows{0}; std::atomic total_dmfile_rough_set_index_check_time_ns{0}; std::atomic total_dmfile_read_time_ns{0}; @@ -95,49 +92,98 @@ class ScanContext void deserialize(const tipb::TiFlashScanContext & tiflash_scan_context_pb) { - total_dmfile_scanned_packs = tiflash_scan_context_pb.total_dmfile_scanned_packs(); - total_dmfile_skipped_packs = 
tiflash_scan_context_pb.total_dmfile_skipped_packs(); - total_dmfile_scanned_rows = tiflash_scan_context_pb.total_dmfile_scanned_rows(); - total_dmfile_skipped_rows = tiflash_scan_context_pb.total_dmfile_skipped_rows(); + dmfile_data_scanned_rows = tiflash_scan_context_pb.dmfile_data_scanned_rows(); + dmfile_data_skipped_rows = tiflash_scan_context_pb.dmfile_data_skipped_rows(); + dmfile_mvcc_scanned_rows = tiflash_scan_context_pb.dmfile_mvcc_scanned_rows(); + dmfile_mvcc_skipped_rows = tiflash_scan_context_pb.dmfile_mvcc_skipped_rows(); + dmfile_lm_filter_scanned_rows = tiflash_scan_context_pb.dmfile_lm_filter_scanned_rows(); + dmfile_lm_filter_skipped_rows = tiflash_scan_context_pb.dmfile_lm_filter_skipped_rows(); total_dmfile_rough_set_index_check_time_ns - = tiflash_scan_context_pb.total_dmfile_rough_set_index_check_time_ms() * 1000000; - total_dmfile_read_time_ns = tiflash_scan_context_pb.total_dmfile_read_time_ms() * 1000000; - create_snapshot_time_ns = tiflash_scan_context_pb.total_create_snapshot_time_ms() * 1000000; - total_remote_region_num = tiflash_scan_context_pb.total_remote_region_num(); - total_local_region_num = tiflash_scan_context_pb.total_local_region_num(); - user_read_bytes = tiflash_scan_context_pb.total_user_read_bytes(); - learner_read_ns = tiflash_scan_context_pb.total_learner_read_ms() * 1000000; - disagg_read_cache_hit_size = tiflash_scan_context_pb.total_disagg_read_cache_hit_size(); - disagg_read_cache_miss_size = tiflash_scan_context_pb.total_disagg_read_cache_miss_size(); + = tiflash_scan_context_pb.dmfile_rough_set_index_check_time_ms() * 1000000; + total_dmfile_read_time_ns = tiflash_scan_context_pb.dmfile_read_time_ms() * 1000000; + create_snapshot_time_ns = tiflash_scan_context_pb.create_snapshot_time_ms() * 1000000; + total_remote_region_num = tiflash_scan_context_pb.remote_region_num(); + total_local_region_num = tiflash_scan_context_pb.local_region_num(); + user_read_bytes = tiflash_scan_context_pb.user_read_bytes(); + 
learner_read_ns = tiflash_scan_context_pb.learner_read_ms() * 1000000; + disagg_read_cache_hit_size = tiflash_scan_context_pb.disagg_read_cache_hit_size(); + disagg_read_cache_miss_size = tiflash_scan_context_pb.disagg_read_cache_miss_size(); + + num_segments = tiflash_scan_context_pb.num_segments(); + num_read_tasks = tiflash_scan_context_pb.num_read_tasks(); + + delta_rows = tiflash_scan_context_pb.delta_rows(); + delta_bytes = tiflash_scan_context_pb.delta_bytes(); + + mvcc_input_rows = tiflash_scan_context_pb.mvcc_input_rows(); + mvcc_input_bytes = tiflash_scan_context_pb.mvcc_input_bytes(); + mvcc_output_rows = tiflash_scan_context_pb.mvcc_output_rows(); + late_materialization_skip_rows = tiflash_scan_context_pb.late_materialization_skip_rows(); + build_bitmap_time_ns = tiflash_scan_context_pb.build_bitmap_time_ms() * 1000000; + num_stale_read = tiflash_scan_context_pb.num_stale_read(); + build_inputstream_time_ns = tiflash_scan_context_pb.build_inputstream_time_ms() * 1000000; + + setStreamCost( + tiflash_scan_context_pb.local_min_stream_cost_ms() * 1000000, + tiflash_scan_context_pb.local_max_stream_cost_ms() * 1000000, + tiflash_scan_context_pb.remote_min_stream_cost_ms() * 1000000, + tiflash_scan_context_pb.remote_max_stream_cost_ms() * 1000000); + + deserializeRegionNumberOfInstance(tiflash_scan_context_pb); } tipb::TiFlashScanContext serialize() { tipb::TiFlashScanContext tiflash_scan_context_pb{}; - tiflash_scan_context_pb.set_total_dmfile_scanned_packs(total_dmfile_scanned_packs); - tiflash_scan_context_pb.set_total_dmfile_skipped_packs(total_dmfile_skipped_packs); - tiflash_scan_context_pb.set_total_dmfile_scanned_rows(total_dmfile_scanned_rows); - tiflash_scan_context_pb.set_total_dmfile_skipped_rows(total_dmfile_skipped_rows); - tiflash_scan_context_pb.set_total_dmfile_rough_set_index_check_time_ms( + tiflash_scan_context_pb.set_dmfile_data_scanned_rows(dmfile_data_scanned_rows); + 
tiflash_scan_context_pb.set_dmfile_data_skipped_rows(dmfile_data_skipped_rows); + tiflash_scan_context_pb.set_dmfile_mvcc_scanned_rows(dmfile_mvcc_scanned_rows); + tiflash_scan_context_pb.set_dmfile_mvcc_skipped_rows(dmfile_mvcc_skipped_rows); + tiflash_scan_context_pb.set_dmfile_lm_filter_scanned_rows(dmfile_lm_filter_scanned_rows); + tiflash_scan_context_pb.set_dmfile_lm_filter_skipped_rows(dmfile_lm_filter_skipped_rows); + tiflash_scan_context_pb.set_dmfile_rough_set_index_check_time_ms( total_dmfile_rough_set_index_check_time_ns / 1000000); - tiflash_scan_context_pb.set_total_dmfile_read_time_ms(total_dmfile_read_time_ns / 1000000); - tiflash_scan_context_pb.set_total_create_snapshot_time_ms(create_snapshot_time_ns / 1000000); - tiflash_scan_context_pb.set_total_remote_region_num(total_remote_region_num); - tiflash_scan_context_pb.set_total_local_region_num(total_local_region_num); - tiflash_scan_context_pb.set_total_user_read_bytes(user_read_bytes); - tiflash_scan_context_pb.set_total_learner_read_ms(learner_read_ns / 1000000); - tiflash_scan_context_pb.set_total_disagg_read_cache_hit_size(disagg_read_cache_hit_size); - tiflash_scan_context_pb.set_total_disagg_read_cache_miss_size(disagg_read_cache_miss_size); + tiflash_scan_context_pb.set_dmfile_read_time_ms(total_dmfile_read_time_ns / 1000000); + tiflash_scan_context_pb.set_create_snapshot_time_ms(create_snapshot_time_ns / 1000000); + tiflash_scan_context_pb.set_remote_region_num(total_remote_region_num); + tiflash_scan_context_pb.set_local_region_num(total_local_region_num); + tiflash_scan_context_pb.set_user_read_bytes(user_read_bytes); + tiflash_scan_context_pb.set_learner_read_ms(learner_read_ns / 1000000); + tiflash_scan_context_pb.set_disagg_read_cache_hit_size(disagg_read_cache_hit_size); + tiflash_scan_context_pb.set_disagg_read_cache_miss_size(disagg_read_cache_miss_size); + + tiflash_scan_context_pb.set_num_segments(num_segments); + tiflash_scan_context_pb.set_num_read_tasks(num_read_tasks); + + 
tiflash_scan_context_pb.set_delta_rows(delta_rows); + tiflash_scan_context_pb.set_delta_bytes(delta_bytes); + + tiflash_scan_context_pb.set_mvcc_input_rows(mvcc_input_rows); + tiflash_scan_context_pb.set_mvcc_input_bytes(mvcc_input_bytes); + tiflash_scan_context_pb.set_mvcc_output_rows(mvcc_output_rows); + tiflash_scan_context_pb.set_late_materialization_skip_rows(late_materialization_skip_rows); + tiflash_scan_context_pb.set_build_bitmap_time_ms(build_bitmap_time_ns / 1000000); + tiflash_scan_context_pb.set_num_stale_read(num_stale_read); + tiflash_scan_context_pb.set_build_inputstream_time_ms(build_inputstream_time_ns / 1000000); + + tiflash_scan_context_pb.set_local_min_stream_cost_ms(local_min_stream_cost_ns / 1000000); + tiflash_scan_context_pb.set_local_max_stream_cost_ms(local_max_stream_cost_ns / 1000000); + tiflash_scan_context_pb.set_remote_min_stream_cost_ms(remote_min_stream_cost_ns / 1000000); + tiflash_scan_context_pb.set_remote_max_stream_cost_ms(remote_max_stream_cost_ns / 1000000); + + serializeRegionNumOfInstance(tiflash_scan_context_pb); return tiflash_scan_context_pb; } void merge(const ScanContext & other) { - total_dmfile_scanned_packs += other.total_dmfile_scanned_packs; - total_dmfile_skipped_packs += other.total_dmfile_skipped_packs; - total_dmfile_scanned_rows += other.total_dmfile_scanned_rows; - total_dmfile_skipped_rows += other.total_dmfile_skipped_rows; + dmfile_data_scanned_rows += other.dmfile_data_scanned_rows; + dmfile_data_skipped_rows += other.dmfile_data_skipped_rows; + dmfile_mvcc_scanned_rows += other.dmfile_mvcc_scanned_rows; + dmfile_mvcc_skipped_rows += other.dmfile_mvcc_skipped_rows; + dmfile_lm_filter_scanned_rows += other.dmfile_lm_filter_scanned_rows; + dmfile_lm_filter_skipped_rows += other.dmfile_lm_filter_skipped_rows; total_dmfile_rough_set_index_check_time_ns += other.total_dmfile_rough_set_index_check_time_ns; total_dmfile_read_time_ns += other.total_dmfile_read_time_ns; @@ -163,26 +209,87 @@ class ScanContext 
create_snapshot_time_ns += other.create_snapshot_time_ns; build_inputstream_time_ns += other.build_inputstream_time_ns; build_bitmap_time_ns += other.build_bitmap_time_ns; + + num_stale_read += other.num_stale_read; + + mergeStreamCost( + other.local_min_stream_cost_ns, + other.local_max_stream_cost_ns, + other.remote_min_stream_cost_ns, + other.remote_max_stream_cost_ns); + + mergeRegionNumberOfInstance(other); } void merge(const tipb::TiFlashScanContext & other) { - total_dmfile_scanned_packs += other.total_dmfile_scanned_packs(); - total_dmfile_skipped_packs += other.total_dmfile_skipped_packs(); - total_dmfile_scanned_rows += other.total_dmfile_scanned_rows(); - total_dmfile_skipped_rows += other.total_dmfile_skipped_rows(); - total_dmfile_rough_set_index_check_time_ns += other.total_dmfile_rough_set_index_check_time_ms() * 1000000; - total_dmfile_read_time_ns += other.total_dmfile_read_time_ms() * 1000000; - create_snapshot_time_ns += other.total_create_snapshot_time_ms() * 1000000; - total_local_region_num += other.total_local_region_num(); - total_remote_region_num += other.total_remote_region_num(); - user_read_bytes += other.total_user_read_bytes(); - learner_read_ns += other.total_learner_read_ms() * 1000000; - disagg_read_cache_hit_size += other.total_disagg_read_cache_hit_size(); - disagg_read_cache_miss_size += other.total_disagg_read_cache_miss_size(); + dmfile_data_scanned_rows += other.dmfile_data_scanned_rows(); + dmfile_data_skipped_rows += other.dmfile_data_skipped_rows(); + dmfile_mvcc_scanned_rows += other.dmfile_mvcc_scanned_rows(); + dmfile_mvcc_skipped_rows += other.dmfile_mvcc_skipped_rows(); + dmfile_lm_filter_scanned_rows += other.dmfile_lm_filter_scanned_rows(); + dmfile_lm_filter_skipped_rows += other.dmfile_lm_filter_skipped_rows(); + total_dmfile_rough_set_index_check_time_ns += other.dmfile_rough_set_index_check_time_ms() * 1000000; + total_dmfile_read_time_ns += other.dmfile_read_time_ms() * 1000000; + create_snapshot_time_ns += 
other.create_snapshot_time_ms() * 1000000; + total_local_region_num += other.local_region_num(); + total_remote_region_num += other.remote_region_num(); + user_read_bytes += other.user_read_bytes(); + learner_read_ns += other.learner_read_ms() * 1000000; + disagg_read_cache_hit_size += other.disagg_read_cache_hit_size(); + disagg_read_cache_miss_size += other.disagg_read_cache_miss_size(); + + num_segments += other.num_segments(); + num_read_tasks += other.num_read_tasks(); + + delta_rows += other.delta_rows(); + delta_bytes += other.delta_bytes(); + + mvcc_input_rows += other.mvcc_input_rows(); + mvcc_input_bytes += other.mvcc_input_bytes(); + mvcc_output_rows += other.mvcc_output_rows(); + late_materialization_skip_rows += other.late_materialization_skip_rows(); + build_bitmap_time_ns += other.build_bitmap_time_ms() * 1000000; + num_stale_read += other.num_stale_read(); + build_inputstream_time_ns += other.build_inputstream_time_ms() * 1000000; + + mergeStreamCost( + other.local_min_stream_cost_ms() * 1000000, + other.local_max_stream_cost_ms() * 1000000, + other.remote_min_stream_cost_ms() * 1000000, + other.remote_max_stream_cost_ms() * 1000000); + + mergeRegionNumberOfInstance(other); } String toJson() const; + + void setRegionNumOfCurrentInstance(uint64_t region_num); + void setStreamCost(uint64_t local_min_ns, uint64_t local_max_ns, uint64_t remote_min_ns, uint64_t remote_max_ns); + + static void initCurrentInstanceId(Poco::Util::AbstractConfiguration & config, const LoggerPtr & log); + +private: + void serializeRegionNumOfInstance(tipb::TiFlashScanContext & proto) const; + void deserializeRegionNumberOfInstance(const tipb::TiFlashScanContext & proto); + void mergeRegionNumberOfInstance(const ScanContext & other); + void mergeRegionNumberOfInstance(const tipb::TiFlashScanContext & other); + void mergeStreamCost(uint64_t local_min_ns, uint64_t local_max_ns, uint64_t remote_min_ns, uint64_t remote_max_ns); + + // instance_id -> number of regions. 
+ // `region_num_of_instance` is accessed by a single thread. + using RegionNumOfInstance = std::unordered_map; + RegionNumOfInstance region_num_of_instance; + + // These members `*_stream_cost_ns` are accessed by a single thread. + uint64_t local_min_stream_cost_ns{0}; + uint64_t local_max_stream_cost_ns{0}; + uint64_t remote_min_stream_cost_ns{0}; + uint64_t remote_max_stream_cost_ns{0}; + + // `current_instance_id` is an identifier of this store. + // It is only used to identify which store generated the ScanContext object. + inline static String current_instance_id; }; } // namespace DB::DM diff --git a/dbms/src/Storages/DeltaMerge/Segment.cpp b/dbms/src/Storages/DeltaMerge/Segment.cpp index 04c6bf67e2b..5d98d776aa9 100644 --- a/dbms/src/Storages/DeltaMerge/Segment.cpp +++ b/dbms/src/Storages/DeltaMerge/Segment.cpp @@ -619,8 +619,12 @@ bool Segment::isDefinitelyEmpty(DMContext & dm_context, const SegmentSnapshotPtr auto read_ranges = RowKeyRanges{rowkey_range}; { - BlockInputStreamPtr delta_stream - = std::make_shared(dm_context, segment_snap->delta, columns_to_read, rowkey_range); + BlockInputStreamPtr delta_stream = std::make_shared( + dm_context, + segment_snap->delta, + columns_to_read, + rowkey_range, + ReadTag::Internal); delta_stream = std::make_shared>(delta_stream, read_ranges, 0); delta_stream->readPrefix(); while (true) @@ -870,7 +874,8 @@ BlockInputStreamPtr Segment::getInputStreamModeNormal( { LOG_TRACE(segment_snap->log, "Begin segment create input stream"); - auto read_info = getReadInfo(dm_context, columns_to_read, segment_snap, read_ranges, max_version); + auto read_tag = need_row_id ? 
ReadTag::MVCC : ReadTag::Query; + auto read_info = getReadInfo(dm_context, columns_to_read, segment_snap, read_ranges, read_tag, max_version); RowKeyRanges real_ranges; for (const auto & read_range : read_ranges) @@ -896,10 +901,12 @@ BlockInputStreamPtr Segment::getInputStreamModeNormal( filter, max_version, expected_block_size, - false); + false, + read_tag); } else if (useCleanRead(segment_snap, columns_to_read)) { + RUNTIME_CHECK_MSG(!need_row_id, "'need_row_id is true, should not come here'"); // No delta, let's try some optimizations. stream = segment_snap->stable->getInputStream( dm_context, @@ -908,7 +915,8 @@ BlockInputStreamPtr Segment::getInputStreamModeNormal( filter, max_version, expected_block_size, - true); + true, + read_tag); } else { @@ -918,10 +926,11 @@ BlockInputStreamPtr Segment::getInputStreamModeNormal( real_ranges, filter, segment_snap->stable, - read_info.getDeltaReader(), + read_info.getDeltaReader(need_row_id ? ReadTag::MVCC : ReadTag::Query), read_info.index_begin, read_info.index_end, expected_block_size, + read_tag, max_version, need_row_id); } @@ -974,7 +983,7 @@ BlockInputStreamPtr Segment::getInputStreamForDataExport( bool reorganize_block) const { RowKeyRanges data_ranges{data_range}; - auto read_info = getReadInfo(dm_context, columns_to_read, segment_snap, data_ranges); + auto read_info = getReadInfo(dm_context, columns_to_read, segment_snap, data_ranges, ReadTag::Internal); BlockInputStreamPtr data_stream = getPlacedStream( dm_context, @@ -982,10 +991,11 @@ BlockInputStreamPtr Segment::getInputStreamForDataExport( data_ranges, EMPTY_RS_OPERATOR, segment_snap->stable, - read_info.getDeltaReader(), + read_info.getDeltaReader(ReadTag::Internal), read_info.index_begin, read_info.index_end, - expected_block_size); + expected_block_size, + ReadTag::Internal); data_stream = std::make_shared>(data_stream, data_ranges, 0); @@ -1064,15 +1074,17 @@ BlockInputStreamPtr Segment::getInputStreamModeFast( filter, std::numeric_limits::max(), 
expected_block_size, - /* enable_handle_clean_read */ enable_handle_clean_read, + enable_handle_clean_read, + ReadTag::Query, /* is_fast_scan */ true, - /* enable_del_clean_read */ enable_del_clean_read); + enable_del_clean_read); BlockInputStreamPtr delta_stream = std::make_shared( dm_context, segment_snap->delta, new_columns_to_read, - this->rowkey_range); + this->rowkey_range, + ReadTag::Query); // Do row key filtering based on data_ranges. delta_stream = std::make_shared>(delta_stream, data_ranges, 0); @@ -1128,13 +1140,15 @@ BlockInputStreamPtr Segment::getInputStreamModeRaw( EMPTY_RS_OPERATOR, std::numeric_limits::max(), expected_block_size, - /* enable_handle_clean_read */ false); + /* enable_handle_clean_read */ false, + ReadTag::Query); BlockInputStreamPtr delta_stream = std::make_shared( dm_context, segment_snap->delta, new_columns_to_read, - this->rowkey_range); + this->rowkey_range, + ReadTag::Query); // Do row key filtering based on data_ranges. delta_stream = std::make_shared>(delta_stream, data_ranges, 0); @@ -1543,7 +1557,7 @@ std::optional Segment::getSplitPointSlow( const auto & pk_col = getExtraHandleColumnDefine(is_common_handle); auto pk_col_defs = std::make_shared(ColumnDefines{pk_col}); // We need to create a new delta_reader here, because the one in read_info is used to read columns other than PK column. 
- auto delta_reader = read_info.getDeltaReader(pk_col_defs); + auto delta_reader = read_info.getDeltaReader(pk_col_defs, ReadTag::Internal); size_t exact_rows = 0; @@ -1558,7 +1572,8 @@ std::optional Segment::getSplitPointSlow( delta_reader, read_info.index_begin, read_info.index_end, - dm_context.stable_pack_rows); + dm_context.stable_pack_rows, + ReadTag::Internal); stream = std::make_shared>(stream, rowkey_ranges, 0); @@ -1584,7 +1599,8 @@ std::optional Segment::getSplitPointSlow( delta_reader, read_info.index_begin, read_info.index_end, - dm_context.stable_pack_rows); + dm_context.stable_pack_rows, + ReadTag::Internal); stream = std::make_shared>(stream, rowkey_ranges, 0); @@ -1832,7 +1848,8 @@ std::optional Segment::prepareSplitPhysical( // dm_context, *schema_snap, segment_snap, - {RowKeyRange::newAll(is_common_handle, rowkey_column_size)}); + {RowKeyRange::newAll(is_common_handle, rowkey_column_size)}, + ReadTag::Internal); if (!opt_split_point.has_value()) opt_split_point = getSplitPointSlow(dm_context, read_info, segment_snap); @@ -1858,7 +1875,7 @@ std::optional Segment::prepareSplitPhysical( // StableValueSpacePtr other_stable; { - auto my_delta_reader = read_info.getDeltaReader(schema_snap); + auto my_delta_reader = read_info.getDeltaReader(schema_snap, ReadTag::Internal); RowKeyRanges my_ranges{my_range}; BlockInputStreamPtr my_data = getPlacedStream( @@ -1870,7 +1887,8 @@ std::optional Segment::prepareSplitPhysical( // my_delta_reader, read_info.index_begin, read_info.index_end, - dm_context.stable_pack_rows); + dm_context.stable_pack_rows, + ReadTag::Internal); my_data = std::make_shared>(my_data, my_ranges, 0); @@ -1889,7 +1907,7 @@ std::optional Segment::prepareSplitPhysical( // { // Write new segment's data - auto other_delta_reader = read_info.getDeltaReader(schema_snap); + auto other_delta_reader = read_info.getDeltaReader(schema_snap, ReadTag::Internal); RowKeyRanges other_ranges{other_range}; BlockInputStreamPtr other_data = getPlacedStream( 
@@ -1901,7 +1919,8 @@ std::optional Segment::prepareSplitPhysical( // other_delta_reader, read_info.index_begin, read_info.index_end, - dm_context.stable_pack_rows); + dm_context.stable_pack_rows, + ReadTag::Internal); other_data = std::make_shared>(other_data, other_ranges, 0); @@ -2091,7 +2110,8 @@ StableValueSpacePtr Segment::prepareMerge( dm_context, *schema_snap, segment_snap, - {RowKeyRange::newAll(segment->is_common_handle, segment->rowkey_column_size)}); + {RowKeyRange::newAll(segment->is_common_handle, segment->rowkey_column_size)}, + ReadTag::Internal); RowKeyRanges rowkey_ranges{segment->rowkey_range}; BlockInputStreamPtr stream = getPlacedStream( dm_context, @@ -2099,10 +2119,11 @@ StableValueSpacePtr Segment::prepareMerge( rowkey_ranges, EMPTY_RS_OPERATOR, segment_snap->stable, - read_info.getDeltaReader(), + read_info.getDeltaReader(ReadTag::Internal), read_info.index_begin, read_info.index_end, - dm_context.stable_pack_rows); + dm_context.stable_pack_rows, + ReadTag::Internal); stream = std::make_shared>(stream, rowkey_ranges, 0); stream = std::make_shared>( @@ -2283,7 +2304,8 @@ void Segment::placeDeltaIndex(DMContext & dm_context, const SegmentSnapshotPtr & dm_context, /*read_columns=*/{getExtraHandleColumnDefine(is_common_handle)}, segment_snap, - {RowKeyRange::newAll(is_common_handle, rowkey_column_size)}); + {RowKeyRange::newAll(is_common_handle, rowkey_column_size)}, + ReadTag::Internal); } String Segment::simpleInfo() const @@ -2375,6 +2397,7 @@ Segment::ReadInfo Segment::getReadInfo( const ColumnDefines & read_columns, const SegmentSnapshotPtr & segment_snap, const RowKeyRanges & read_ranges, + ReadTag read_tag, UInt64 max_version) const { LOG_DEBUG(segment_snap->log, "Begin segment getReadInfo"); @@ -2382,9 +2405,14 @@ Segment::ReadInfo Segment::getReadInfo( auto new_read_columns = arrangeReadColumns(getExtraHandleColumnDefine(is_common_handle), read_columns); auto pk_ver_col_defs = std::make_shared( 
ColumnDefines{getExtraHandleColumnDefine(dm_context.is_common_handle), getVersionColumnDefine()}); - // Create a reader only for pk and version columns. - auto delta_reader - = std::make_shared(dm_context, segment_snap->delta, pk_ver_col_defs, this->rowkey_range); + // Create a reader that reads pk and version columns to update the delta index. + // It is related to MVCC, so always set `ReadTag::MVCC` for it. + auto delta_reader = std::make_shared( + dm_context, + segment_snap->delta, + pk_ver_col_defs, + this->rowkey_range, + ReadTag::MVCC); auto [my_delta_index, fully_indexed] = ensurePlace(dm_context, segment_snap, delta_reader, read_ranges, max_version); @@ -2414,7 +2442,7 @@ Segment::ReadInfo Segment::getReadInfo( manager->refreshRef(segment_snap->delta->getSharedDeltaIndex()); return ReadInfo( - delta_reader->createNewReader(new_read_columns), + delta_reader->createNewReader(new_read_columns, read_tag), compacted_index->begin(), compacted_index->end(), new_read_columns); @@ -2449,6 +2477,7 @@ SkippableBlockInputStreamPtr Segment::getPlacedStream( const IndexIterator & delta_index_begin, const IndexIterator & delta_index_end, size_t expected_block_size, + ReadTag read_tag, UInt64 max_version, bool need_row_id) { @@ -2462,7 +2491,10 @@ SkippableBlockInputStreamPtr Segment::getPlacedStream( filter, max_version, expected_block_size, - false); + /* enable_handle_clean_read */ false, + read_tag, + /* is_fast_scan */ false, + /* enable_del_clean_read */ false); RowKeyRange rowkey_range = rowkey_ranges.size() == 1 ? 
rowkey_ranges[0] : mergeRanges(rowkey_ranges, rowkey_ranges[0].is_common_handle, rowkey_ranges[0].rowkey_column_size); @@ -2656,7 +2688,8 @@ bool Segment::placeUpsert( delta_reader, compacted_index->begin(), compacted_index->end(), - dm_context.stable_pack_rows); + dm_context.stable_pack_rows, + ReadTag::MVCC); if (do_sort) return DM::placeInsert( @@ -2708,7 +2741,8 @@ bool Segment::placeDelete( delta_reader, compacted_index->begin(), compacted_index->end(), - dm_context.stable_pack_rows); + dm_context.stable_pack_rows, + ReadTag::MVCC); delete_stream = std::make_shared>(delete_stream, delete_ranges, 0); @@ -2745,7 +2779,8 @@ bool Segment::placeDelete( delta_reader, compacted_index->begin(), compacted_index->end(), - dm_context.stable_pack_rows); + dm_context.stable_pack_rows, + ReadTag::MVCC); fully_indexed &= DM::placeDelete( merged_stream, block, @@ -2989,6 +3024,7 @@ BitmapFilterPtr Segment::buildBitmapFilterStableOnly( max_version, expected_block_size, /*enable_handle_clean_read*/ false, + ReadTag::MVCC, /*is_fast_scan*/ false, /*enable_del_clean_read*/ false, /*read_packs*/ some_packs_sets, @@ -3038,6 +3074,7 @@ BlockInputStreamPtr Segment::getBitmapFilterInputStream( max_version, expected_block_size, enable_handle_clean_read, + ReadTag::Query, is_fast_scan, enable_del_clean_read); @@ -3046,7 +3083,8 @@ BlockInputStreamPtr Segment::getBitmapFilterInputStream( dm_context, segment_snap->delta, columns_to_read_ptr, - this->rowkey_range); + this->rowkey_range, + ReadTag::Query); return std::make_shared( columns_to_read, @@ -3082,10 +3120,15 @@ BlockInputStreamPtr Segment::getLateMaterializationStream( max_version, expected_block_size, enable_handle_clean_read, + ReadTag::LMFilter, is_fast_scan, enable_del_clean_read); - SkippableBlockInputStreamPtr filter_column_delta_stream - = std::make_shared(dm_context, segment_snap->delta, filter_columns, this->rowkey_range); + SkippableBlockInputStreamPtr filter_column_delta_stream = std::make_shared( + dm_context, + 
segment_snap->delta, + filter_columns, + this->rowkey_range, + ReadTag::LMFilter); if (unlikely(filter_columns->size() == columns_to_read.size())) { @@ -3164,13 +3207,15 @@ BlockInputStreamPtr Segment::getLateMaterializationStream( max_version, expected_block_size, enable_handle_clean_read, + ReadTag::Query, is_fast_scan, enable_del_clean_read); SkippableBlockInputStreamPtr rest_column_delta_stream = std::make_shared( dm_context, segment_snap->delta, rest_columns_to_read, - this->rowkey_range); + this->rowkey_range, + ReadTag::Query); SkippableBlockInputStreamPtr rest_column_stream = std::make_shared( *rest_columns_to_read, rest_column_stable_stream, diff --git a/dbms/src/Storages/DeltaMerge/Segment.h b/dbms/src/Storages/DeltaMerge/Segment.h index 5f1a73b7f24..35155b11e86 100644 --- a/dbms/src/Storages/DeltaMerge/Segment.h +++ b/dbms/src/Storages/DeltaMerge/Segment.h @@ -109,10 +109,13 @@ class Segment , read_columns(read_columns_) {} - DeltaValueReaderPtr getDeltaReader() const { return delta_reader->createNewReader(read_columns); } - DeltaValueReaderPtr getDeltaReader(ColumnDefinesPtr columns) const + DeltaValueReaderPtr getDeltaReader(ReadTag read_tag) const { - return delta_reader->createNewReader(columns); + return delta_reader->createNewReader(read_columns, read_tag); + } + DeltaValueReaderPtr getDeltaReader(ColumnDefinesPtr columns, ReadTag read_tag) const + { + return delta_reader->createNewReader(columns, read_tag); } }; @@ -605,6 +608,7 @@ class Segment const ColumnDefines & read_columns, const SegmentSnapshotPtr & segment_snap, const RowKeyRanges & read_ranges, + ReadTag read_tag, UInt64 max_version = std::numeric_limits::max()) const; static ColumnDefinesPtr arrangeReadColumns(const ColumnDefine & handle, const ColumnDefines & columns_to_read); @@ -621,6 +625,7 @@ class Segment const IndexIterator & delta_index_begin, const IndexIterator & delta_index_end, size_t expected_block_size, + ReadTag read_tag, UInt64 max_version = std::numeric_limits::max(), 
bool need_row_id = false); diff --git a/dbms/src/Storages/DeltaMerge/StableValueSpace.cpp b/dbms/src/Storages/DeltaMerge/StableValueSpace.cpp index 279e4b85c92..f602cb2142a 100644 --- a/dbms/src/Storages/DeltaMerge/StableValueSpace.cpp +++ b/dbms/src/Storages/DeltaMerge/StableValueSpace.cpp @@ -472,6 +472,7 @@ SkippableBlockInputStreamPtr StableValueSpace::Snapshot::getInputStream( UInt64 max_data_version, size_t expected_block_size, bool enable_handle_clean_read, + ReadTag read_tag, bool is_fast_scan, bool enable_del_clean_read, const std::vector & read_packs, @@ -496,7 +497,8 @@ SkippableBlockInputStreamPtr StableValueSpace::Snapshot::getInputStream( .setColumnCache(column_caches[i]) .setTracingID(context.tracing_id) .setRowsThreshold(expected_block_size) - .setReadPacks(read_packs.size() > i ? read_packs[i] : nullptr); + .setReadPacks(read_packs.size() > i ? read_packs[i] : nullptr) + .setReadTag(read_tag); streams.push_back(builder.build(stable->files[i], read_columns, rowkey_ranges, context.scan_context)); rows.push_back(stable->files[i]->getRows()); } diff --git a/dbms/src/Storages/DeltaMerge/StableValueSpace.h b/dbms/src/Storages/DeltaMerge/StableValueSpace.h index af4c18b9f9b..b87bda0eb87 100644 --- a/dbms/src/Storages/DeltaMerge/StableValueSpace.h +++ b/dbms/src/Storages/DeltaMerge/StableValueSpace.h @@ -17,6 +17,7 @@ #include #include #include +#include #include #include #include @@ -230,6 +231,7 @@ class StableValueSpace : public std::enable_shared_from_this UInt64 max_data_version, size_t expected_block_size, bool enable_handle_clean_read, + ReadTag read_tag, bool is_fast_scan = false, bool enable_del_clean_read = false, const std::vector & read_packs = {}, diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store.cpp index 2f1741916c6..072cf8c715b 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store.cpp +++ 
b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store.cpp @@ -578,10 +578,8 @@ try while (in->read()) {}; in->readSuffix(); - ASSERT_EQ(scan_context->total_dmfile_scanned_packs, 7); - ASSERT_EQ(scan_context->total_dmfile_scanned_rows, 50000); - ASSERT_EQ(scan_context->total_dmfile_skipped_packs, 0); - ASSERT_EQ(scan_context->total_dmfile_skipped_rows, 0); + ASSERT_EQ(scan_context->dmfile_data_scanned_rows, 50000); + ASSERT_EQ(scan_context->dmfile_data_skipped_rows, 0); auto filter = createGreater( Attr{col_a_define.name, col_a_define.id, DataTypeFactory::instance().get("Int64")}, @@ -609,10 +607,8 @@ try while (in->read()) {}; in->readSuffix(); - ASSERT_EQ(scan_context->total_dmfile_scanned_packs, 6); - ASSERT_EQ(scan_context->total_dmfile_scanned_rows, 41808); - ASSERT_EQ(scan_context->total_dmfile_skipped_packs, 1); - ASSERT_EQ(scan_context->total_dmfile_skipped_rows, 8192); + ASSERT_EQ(scan_context->dmfile_data_scanned_rows, 41808); + ASSERT_EQ(scan_context->dmfile_data_skipped_rows, 8192); } CATCH diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_value_space.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_value_space.cpp index aff4cef9bc2..c659817877b 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_value_space.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_value_space.cpp @@ -215,8 +215,12 @@ static void checkDeltaValueSpaceData( ASSERT_EQ(rows, expected_all_rows); { - auto reader - = std::make_shared(dm_context, snapshot, table_columns, RowKeyRange::newAll(false, 1)); + auto reader = std::make_shared( + dm_context, + snapshot, + table_columns, + RowKeyRange::newAll(false, 1), + ReadTag::Internal); auto columns = expected_all_blocks[0].cloneEmptyColumns(); ASSERT_EQ(reader->readRows(columns, 0, expected_all_rows, nullptr), expected_all_rows); Blocks result_blocks; @@ -227,8 +231,12 @@ static void checkDeltaValueSpaceData( // read with a specific range { // For `ColumnFileBig`, the same column file 
reader cannot be used twice, wo we create a new `DeltaValueReader` here. - auto reader - = std::make_shared(dm_context, snapshot, table_columns, RowKeyRange::newAll(false, 1)); + auto reader = std::make_shared( + dm_context, + snapshot, + table_columns, + RowKeyRange::newAll(false, 1), + ReadTag::Internal); auto columns = expected_all_blocks[0].cloneEmptyColumns(); RowKeyRange read_range = RowKeyRange::fromHandleRange(handle_range); ASSERT_EQ(reader->readRows(columns, 0, expected_all_rows, &read_range), expected_range_rows); @@ -536,7 +544,8 @@ TEST_F(DeltaValueSpaceTest, Restore) dmContext(), old_delta_snapshot, table_columns, - RowKeyRange::newAll(false, 1)); + RowKeyRange::newAll(false, 1), + ReadTag::Internal); old_delta_stream.readPrefix(); while (true) { @@ -555,7 +564,8 @@ TEST_F(DeltaValueSpaceTest, Restore) dmContext(), new_delta_snapshot, table_columns, - RowKeyRange::newAll(false, 1)); + RowKeyRange::newAll(false, 1), + ReadTag::Internal); new_delta_stream.readPrefix(); while (true) { @@ -676,8 +686,12 @@ TEST_F(DeltaValueSpaceTest, GetPlaceItems) // write some more data after create snapshot appendBlockToDeltaValueSpace(dmContext(), delta, total_rows_write, num_rows_write_per_batch); ASSERT_EQ(delta->getRows(true), total_rows_write + num_rows_write_per_batch); - auto reader - = std::make_shared(dmContext(), snapshot, table_columns, RowKeyRange::newAll(false, 1)); + auto reader = std::make_shared( + dmContext(), + snapshot, + table_columns, + RowKeyRange::newAll(false, 1), + ReadTag::Internal); auto place_items = reader->getPlaceItems(0, 0, snapshot->getRows(), snapshot->getDeletes()); ASSERT_EQ(place_items.size(), 2); size_t total_place_rows = 0; @@ -697,8 +711,12 @@ TEST_F(DeltaValueSpaceTest, ShouldPlace) appendBlockToDeltaValueSpace(dmContext(), delta, 0, num_rows_write_per_batch, tso); { auto snapshot = delta->createSnapshot(dmContext(), false, CurrentMetrics::DT_SnapshotOfRead); - auto reader - = std::make_shared(dmContext(), snapshot, 
table_columns, RowKeyRange::newAll(false, 1)); + auto reader = std::make_shared( + dmContext(), + snapshot, + table_columns, + RowKeyRange::newAll(false, 1), + ReadTag::Internal); ASSERT_TRUE(reader->shouldPlace( dmContext(), snapshot->getSharedDeltaIndex(), @@ -715,8 +733,12 @@ TEST_F(DeltaValueSpaceTest, ShouldPlace) { delta->flush(dmContext()); auto snapshot = delta->createSnapshot(dmContext(), false, CurrentMetrics::DT_SnapshotOfRead); - auto reader - = std::make_shared(dmContext(), snapshot, table_columns, RowKeyRange::newAll(false, 1)); + auto reader = std::make_shared( + dmContext(), + snapshot, + table_columns, + RowKeyRange::newAll(false, 1), + ReadTag::Internal); ASSERT_TRUE(reader->shouldPlace( dmContext(), snapshot->getSharedDeltaIndex(), diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_skippable_block_input_stream.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_skippable_block_input_stream.cpp index 76e3f5c78dd..e2fc16f20c9 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_skippable_block_input_stream.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_skippable_block_input_stream.cpp @@ -53,6 +53,7 @@ class SkippableBlockInputStreamTest : public SegmentTestBasic std::numeric_limits::max(), DEFAULT_BLOCK_SIZE, enable_handle_clean_read, + ReadTag::Internal, is_fast_scan, enable_del_clean_read); @@ -61,7 +62,8 @@ class SkippableBlockInputStreamTest : public SegmentTestBasic *dm_context, snapshot->delta, columns_to_read_ptr, - segment->getRowKeyRange()); + segment->getRowKeyRange(), + ReadTag::Internal); return std::make_shared( columns_to_read,