diff --git a/dbms/src/Flash/Statistics/TableScanImpl.cpp b/dbms/src/Flash/Statistics/TableScanImpl.cpp
index c58613b6f07..7316164a686 100644
--- a/dbms/src/Flash/Statistics/TableScanImpl.cpp
+++ b/dbms/src/Flash/Statistics/TableScanImpl.cpp
@@ -21,7 +21,15 @@ namespace DB
 {
 String TableScanDetail::toJson() const
 {
-    return fmt::format(R"({{"is_local":{},"packets":{},"bytes":{}}})", is_local, packets, bytes);
+    auto max_cost_ms = max_stream_cost_ns < 0 ? 0 : max_stream_cost_ns / 1'000'000.0; // ns -> ms; 0 when not inited
+    auto min_cost_ms = min_stream_cost_ns < 0 ? 0 : min_stream_cost_ns / 1'000'000.0; // ns -> ms; 0 when not inited
+    return fmt::format(
+        R"({{"is_local":{},"packets":{},"bytes":{},"max":{},"min":{}}})",
+        is_local,
+        packets,
+        bytes,
+        max_cost_ms,
+        min_cost_ms);
 }
 
 void TableScanStatistics::appendExtraJson(FmtBuffer & fmt_buffer) const
@@ -53,24 +61,25 @@ void TableScanStatistics::collectExtraRuntimeDetail()
         break;
     case ExecutionMode::Stream:
         transformInBoundIOProfileForStream(dag_context, executor_id, [&](const IBlockInputStream & stream) {
-            const auto * cop_stream = dynamic_cast(&stream);
-            /// In tiflash_compute node, TableScan will be converted to ExchangeReceiver.
-            const auto * exchange_stream = dynamic_cast(&stream);
-            if (cop_stream || exchange_stream)
+            if (const auto * cop_stream = dynamic_cast(&stream); cop_stream)
             {
-                const std::vector * connection_profile_infos = nullptr;
-                if (cop_stream)
-                    connection_profile_infos = &cop_stream->getConnectionProfileInfos();
-                else if (exchange_stream)
-                    connection_profile_infos = &exchange_stream->getConnectionProfileInfos();
-
-                updateTableScanDetail(*connection_profile_infos);
+                /// remote read
+                updateTableScanDetail(cop_stream->getConnectionProfileInfos());
+                // TODO: Can not get the execution time of remote read streams?
             }
             else if (const auto * local_stream = dynamic_cast(&stream);
                      local_stream)
             {
                 /// local read input stream also is IProfilingBlockInputStream
-                local_table_scan_detail.bytes += local_stream->getProfileInfo().bytes;
+                const auto & prof = local_stream->getProfileInfo();
+                local_table_scan_detail.bytes += prof.bytes;
+                const double this_execution_time = prof.execution_time * 1.0;
+                if (local_table_scan_detail.max_stream_cost_ns < 0.0 // not inited
+                    || local_table_scan_detail.max_stream_cost_ns < this_execution_time)
+                    local_table_scan_detail.max_stream_cost_ns = this_execution_time;
+                if (local_table_scan_detail.min_stream_cost_ns < 0.0 // not inited
+                    || local_table_scan_detail.min_stream_cost_ns > this_execution_time)
+                    local_table_scan_detail.min_stream_cost_ns = this_execution_time;
             }
             else
             {
@@ -81,9 +90,27 @@ void TableScanStatistics::collectExtraRuntimeDetail()
     case ExecutionMode::Pipeline:
         transformInBoundIOProfileForPipeline(dag_context, executor_id, [&](const IOProfileInfo & profile_info) {
             if (profile_info.is_local)
+            {
                 local_table_scan_detail.bytes += profile_info.operator_info->bytes;
+                const double this_execution_time = profile_info.operator_info->execution_time * 1.0;
+                if (local_table_scan_detail.max_stream_cost_ns < 0.0 // not inited
+                    || local_table_scan_detail.max_stream_cost_ns < this_execution_time)
+                    local_table_scan_detail.max_stream_cost_ns = this_execution_time;
+                if (local_table_scan_detail.min_stream_cost_ns < 0.0 // not inited
+                    || local_table_scan_detail.min_stream_cost_ns > this_execution_time)
+                    local_table_scan_detail.min_stream_cost_ns = this_execution_time;
+            }
             else
+            {
                 updateTableScanDetail(profile_info.connection_profile_infos);
+                const double this_execution_time = profile_info.operator_info->execution_time * 1.0;
+                if (remote_table_scan_detail.max_stream_cost_ns < 0.0 // not inited
+                    || remote_table_scan_detail.max_stream_cost_ns < this_execution_time)
+                    remote_table_scan_detail.max_stream_cost_ns = this_execution_time;
+                if (remote_table_scan_detail.min_stream_cost_ns < 0.0 // not inited
+                    || remote_table_scan_detail.min_stream_cost_ns > this_execution_time) // smaller than current min
+                    remote_table_scan_detail.min_stream_cost_ns = this_execution_time;
+            }
         });
         break;
     }
diff --git a/dbms/src/Operators/IOProfileInfo.h b/dbms/src/Operators/IOProfileInfo.h
index 2c8312fda1a..036a33fb66d 100644
--- a/dbms/src/Operators/IOProfileInfo.h
+++ b/dbms/src/Operators/IOProfileInfo.h
@@ -46,7 +46,7 @@ struct IOProfileInfo
 
     OperatorProfileInfoPtr operator_info;
 
-    bool is_local;
+    const bool is_local;
     std::vector connection_profile_infos{};
     RemoteExecutionSummary remote_execution_summary{};
 };
diff --git a/dbms/src/Storages/DeltaMerge/ScanContext.cpp b/dbms/src/Storages/DeltaMerge/ScanContext.cpp
index c8f459dbdd0..af92e6f5997 100644
--- a/dbms/src/Storages/DeltaMerge/ScanContext.cpp
+++ b/dbms/src/Storages/DeltaMerge/ScanContext.cpp
@@ -56,6 +56,7 @@ String ScanContext::toJson() const
 
     json->set("learner_read_time", fmt::format("{:.3f}ms", learner_read_ns.load() / NS_TO_MS_SCALE));
     json->set("create_snapshot_time", fmt::format("{:.3f}ms", create_snapshot_time_ns.load() / NS_TO_MS_SCALE));
+    json->set("build_stream_time", fmt::format("{:.3f}ms", build_inputstream_time_ns.load() / NS_TO_MS_SCALE));
     json->set("build_bitmap_time", fmt::format("{:.3f}ms", build_bitmap_time_ns.load() / NS_TO_MS_SCALE));
 
     std::stringstream buf;
diff --git a/dbms/src/Storages/DeltaMerge/ScanContext.h b/dbms/src/Storages/DeltaMerge/ScanContext.h
index ae7c5130db1..20d852082ce 100644
--- a/dbms/src/Storages/DeltaMerge/ScanContext.h
+++ b/dbms/src/Storages/DeltaMerge/ScanContext.h
@@ -138,12 +138,10 @@ class ScanContext
         total_dmfile_skipped_rows += other.total_dmfile_skipped_rows;
         total_dmfile_rough_set_index_check_time_ns += other.total_dmfile_rough_set_index_check_time_ns;
         total_dmfile_read_time_ns += other.total_dmfile_read_time_ns;
-        create_snapshot_time_ns += other.create_snapshot_time_ns;
 
         total_local_region_num += other.total_local_region_num;
         total_remote_region_num += other.total_remote_region_num;
         user_read_bytes += other.user_read_bytes;
-        learner_read_ns += other.learner_read_ns;
         disagg_read_cache_hit_size += other.disagg_read_cache_hit_size;
         disagg_read_cache_miss_size += other.disagg_read_cache_miss_size;
 
diff --git a/dbms/src/Storages/DeltaMerge/Segment.cpp b/dbms/src/Storages/DeltaMerge/Segment.cpp
index 43845200c74..e70efc34bfd 100644
--- a/dbms/src/Storages/DeltaMerge/Segment.cpp
+++ b/dbms/src/Storages/DeltaMerge/Segment.cpp
@@ -768,8 +768,13 @@ BlockInputStreamPtr Segment::getInputStream(
     UInt64 max_version,
     size_t expected_block_size)
 {
-    auto clipped_block_rows
-        = clipBlockRows(dm_context.db_context, expected_block_size, columns_to_read, segment_snap->stable->stable);
+    Stopwatch sw;
+    SCOPE_EXIT({ dm_context.scan_context->build_inputstream_time_ns += sw.elapsed(); });
+    auto clipped_block_rows = clipBlockRows( //
+        dm_context.db_context,
+        expected_block_size,
+        columns_to_read,
+        segment_snap->stable->stable);
     switch (read_mode)
     {
     case ReadMode::Normal:
diff --git a/dbms/src/Storages/KVStore/Read/LearnerRead.cpp b/dbms/src/Storages/KVStore/Read/LearnerRead.cpp
index 929b710ed45..6fb18979698 100644
--- a/dbms/src/Storages/KVStore/Read/LearnerRead.cpp
+++ b/dbms/src/Storages/KVStore/Read/LearnerRead.cpp
@@ -565,7 +565,7 @@ LearnerReadSnapshot doLearnerRead(
 
     if (auto * dag_context = context.getDAGContext())
     {
-        // TODO(observability): add info about the number of stale read regions
+        mvcc_query_info.scan_context->num_stale_read = worker.getStats().num_stale_read;
         dag_context->has_read_wait_index = true;
         dag_context->read_wait_index_start_timestamp = start_time;
         dag_context->read_wait_index_end_timestamp = end_time;