Skip to content

Commit

Permalink
Storage: Add the time cost about stream in local/remote (pingcap#8676)
Browse files Browse the repository at this point in the history
  • Loading branch information
JaySon-Huang committed May 7, 2024
1 parent 9002cc3 commit a92744f
Show file tree
Hide file tree
Showing 6 changed files with 50 additions and 19 deletions.
53 changes: 40 additions & 13 deletions dbms/src/Flash/Statistics/TableScanImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,15 @@ namespace DB
{
String TableScanDetail::toJson() const
{
return fmt::format(R"({{"is_local":{},"packets":{},"bytes":{}}})", is_local, packets, bytes);
auto max_cost_ms = max_stream_cost_ns < 0 ? 0 : max_stream_cost_ns / 1'000'000.0;
auto min_cost_ms = min_stream_cost_ns < 0 ? 0 : min_stream_cost_ns / 1'000'000.0;
return fmt::format(
R"({{"is_local":{},"packets":{},"bytes":{},"max":{},"min":{}}})",
is_local,
packets,
bytes,
max_cost_ms,
min_cost_ms);
}

void TableScanStatistics::appendExtraJson(FmtBuffer & fmt_buffer) const
Expand Down Expand Up @@ -53,24 +61,25 @@ void TableScanStatistics::collectExtraRuntimeDetail()
break;
case ExecutionMode::Stream:
transformInBoundIOProfileForStream(dag_context, executor_id, [&](const IBlockInputStream & stream) {
const auto * cop_stream = dynamic_cast<const CoprocessorBlockInputStream *>(&stream);
/// In tiflash_compute node, TableScan will be converted to ExchangeReceiver.
const auto * exchange_stream = dynamic_cast<const ExchangeReceiverInputStream *>(&stream);
if (cop_stream || exchange_stream)
if (const auto * cop_stream = dynamic_cast<const CoprocessorBlockInputStream *>(&stream); cop_stream)
{
const std::vector<ConnectionProfileInfo> * connection_profile_infos = nullptr;
if (cop_stream)
connection_profile_infos = &cop_stream->getConnectionProfileInfos();
else if (exchange_stream)
connection_profile_infos = &exchange_stream->getConnectionProfileInfos();

updateTableScanDetail(*connection_profile_infos);
/// remote read
updateTableScanDetail(cop_stream->getConnectionProfileInfos());
// TODO: Can not get the execution time of remote read streams?
}
else if (const auto * local_stream = dynamic_cast<const IProfilingBlockInputStream *>(&stream);
local_stream)
{
/// local read input stream also is IProfilingBlockInputStream
local_table_scan_detail.bytes += local_stream->getProfileInfo().bytes;
const auto & prof = local_stream->getProfileInfo();
local_table_scan_detail.bytes += prof.bytes;
const double this_execution_time = prof.execution_time * 1.0;
if (local_table_scan_detail.max_stream_cost_ns < 0.0 // not inited
|| local_table_scan_detail.max_stream_cost_ns < this_execution_time)
local_table_scan_detail.max_stream_cost_ns = this_execution_time;
if (local_table_scan_detail.min_stream_cost_ns < 0.0 // not inited
|| local_table_scan_detail.min_stream_cost_ns > this_execution_time)
local_table_scan_detail.min_stream_cost_ns = this_execution_time;
}
else
{
Expand All @@ -81,9 +90,27 @@ void TableScanStatistics::collectExtraRuntimeDetail()
case ExecutionMode::Pipeline:
transformInBoundIOProfileForPipeline(dag_context, executor_id, [&](const IOProfileInfo & profile_info) {
if (profile_info.is_local)
{
local_table_scan_detail.bytes += profile_info.operator_info->bytes;
const double this_execution_time = profile_info.operator_info->execution_time * 1.0;
if (local_table_scan_detail.max_stream_cost_ns < 0.0 // not inited
|| local_table_scan_detail.max_stream_cost_ns < this_execution_time)
local_table_scan_detail.max_stream_cost_ns = this_execution_time;
if (local_table_scan_detail.min_stream_cost_ns < 0.0 // not inited
|| local_table_scan_detail.min_stream_cost_ns > this_execution_time)
local_table_scan_detail.min_stream_cost_ns = this_execution_time;
}
else
{
updateTableScanDetail(profile_info.connection_profile_infos);
const double this_execution_time = profile_info.operator_info->execution_time * 1.0;
if (remote_table_scan_detail.max_stream_cost_ns < 0.0 // not inited
|| remote_table_scan_detail.max_stream_cost_ns < this_execution_time)
remote_table_scan_detail.max_stream_cost_ns = this_execution_time;
if (remote_table_scan_detail.min_stream_cost_ns < 0.0 // not inited
|| remote_table_scan_detail.max_stream_cost_ns > this_execution_time)
remote_table_scan_detail.min_stream_cost_ns = this_execution_time;
}
});
break;
}
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Operators/IOProfileInfo.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ struct IOProfileInfo

OperatorProfileInfoPtr operator_info;

bool is_local;
const bool is_local;
std::vector<ConnectionProfileInfo> connection_profile_infos{};
RemoteExecutionSummary remote_execution_summary{};
};
Expand Down
1 change: 1 addition & 0 deletions dbms/src/Storages/DeltaMerge/ScanContext.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ String ScanContext::toJson() const

json->set("learner_read_time", fmt::format("{:.3f}ms", learner_read_ns.load() / NS_TO_MS_SCALE));
json->set("create_snapshot_time", fmt::format("{:.3f}ms", create_snapshot_time_ns.load() / NS_TO_MS_SCALE));
json->set("build_stream_time", fmt::format("{:.3f}ms", build_inputstream_time_ns.load() / NS_TO_MS_SCALE));
json->set("build_bitmap_time", fmt::format("{:.3f}ms", build_bitmap_time_ns.load() / NS_TO_MS_SCALE));

std::stringstream buf;
Expand Down
2 changes: 0 additions & 2 deletions dbms/src/Storages/DeltaMerge/ScanContext.h
Original file line number Diff line number Diff line change
Expand Up @@ -138,12 +138,10 @@ class ScanContext
total_dmfile_skipped_rows += other.total_dmfile_skipped_rows;
total_dmfile_rough_set_index_check_time_ns += other.total_dmfile_rough_set_index_check_time_ns;
total_dmfile_read_time_ns += other.total_dmfile_read_time_ns;
create_snapshot_time_ns += other.create_snapshot_time_ns;

total_local_region_num += other.total_local_region_num;
total_remote_region_num += other.total_remote_region_num;
user_read_bytes += other.user_read_bytes;
learner_read_ns += other.learner_read_ns;
disagg_read_cache_hit_size += other.disagg_read_cache_hit_size;
disagg_read_cache_miss_size += other.disagg_read_cache_miss_size;

Expand Down
9 changes: 7 additions & 2 deletions dbms/src/Storages/DeltaMerge/Segment.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -768,8 +768,13 @@ BlockInputStreamPtr Segment::getInputStream(
UInt64 max_version,
size_t expected_block_size)
{
auto clipped_block_rows
= clipBlockRows(dm_context.db_context, expected_block_size, columns_to_read, segment_snap->stable->stable);
Stopwatch sw;
SCOPE_EXIT({ dm_context.scan_context->build_inputstream_time_ns += sw.elapsed(); });
auto clipped_block_rows = clipBlockRows( //
dm_context.db_context,
expected_block_size,
columns_to_read,
segment_snap->stable->stable);
switch (read_mode)
{
case ReadMode::Normal:
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Storages/KVStore/Read/LearnerRead.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -565,7 +565,7 @@ LearnerReadSnapshot doLearnerRead(

if (auto * dag_context = context.getDAGContext())
{
// TODO(observability): add info about the number of stale read regions
mvcc_query_info.scan_context->num_stale_read = worker.getStats().num_stale_read;
dag_context->has_read_wait_index = true;
dag_context->read_wait_index_start_timestamp = start_time;
dag_context->read_wait_index_end_timestamp = end_time;
Expand Down

0 comments on commit a92744f

Please sign in to comment.