Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Storages: Add statistical data of TableScanning in ScanContext (release-7.5) #8900

Merged
merged 7 commits into from
May 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion contrib/tipb
4 changes: 2 additions & 2 deletions dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1184,7 +1184,7 @@ void DAGStorageInterpreter::buildLocalStreams(DAGPipeline & pipeline, size_t max
size_t total_local_region_num = mvcc_query_info->regions_query_info.size();
if (total_local_region_num == 0)
return;
mvcc_query_info->scan_context->total_local_region_num = total_local_region_num;
mvcc_query_info->scan_context->setRegionNumOfCurrentInstance(total_local_region_num);
const auto table_query_infos = generateSelectQueryInfos();
bool has_multiple_partitions = table_query_infos.size() > 1;
// MultiPartitionStreamPool will be disabled in no partition mode or single-partition case
Expand Down Expand Up @@ -1250,7 +1250,7 @@ void DAGStorageInterpreter::buildLocalExec(
size_t total_local_region_num = mvcc_query_info->regions_query_info.size();
if (total_local_region_num == 0)
return;
mvcc_query_info->scan_context->total_local_region_num = total_local_region_num;
mvcc_query_info->scan_context->setRegionNumOfCurrentInstance(total_local_region_num);
const auto table_query_infos = generateSelectQueryInfos();
bool has_multiple_partitions = table_query_infos.size() > 1;
ConcatBuilderPool builder_pool{max_streams};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,8 @@ bool equalSummaries(const ExecutionSummary & left, const ExecutionSummary & righ
(left.num_iterations == right.num_iterations) && //
(left.num_produced_rows == right.num_produced_rows) && //
(left.time_processed_ns == right.time_processed_ns) && //
(left.scan_context->total_dmfile_scanned_rows == right.scan_context->total_dmfile_scanned_rows) && //
(left.scan_context->total_dmfile_skipped_rows == right.scan_context->total_dmfile_skipped_rows);
(left.scan_context->dmfile_data_scanned_rows == right.scan_context->dmfile_data_scanned_rows) && //
(left.scan_context->dmfile_data_skipped_rows == right.scan_context->dmfile_data_skipped_rows);
}

struct MockWriter
Expand All @@ -73,10 +73,12 @@ struct MockWriter
summary.concurrency = 1;
summary.scan_context = std::make_unique<DM::ScanContext>();

summary.scan_context->total_dmfile_scanned_packs = 1;
summary.scan_context->total_dmfile_skipped_packs = 2;
summary.scan_context->total_dmfile_scanned_rows = 8000;
summary.scan_context->total_dmfile_skipped_rows = 15000;
summary.scan_context->dmfile_data_scanned_rows = 8000;
summary.scan_context->dmfile_data_skipped_rows = 15000;
summary.scan_context->dmfile_mvcc_scanned_rows = 8000;
summary.scan_context->dmfile_mvcc_skipped_rows = 15000;
summary.scan_context->dmfile_lm_filter_scanned_rows = 8000;
summary.scan_context->dmfile_lm_filter_skipped_rows = 15000;
summary.scan_context->total_dmfile_rough_set_index_check_time_ns = 10;
summary.scan_context->total_dmfile_read_time_ns = 200;
summary.scan_context->create_snapshot_time_ns = 5;
Expand Down
62 changes: 49 additions & 13 deletions dbms/src/Flash/Statistics/TableScanImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,15 @@ namespace DB
{
String TableScanDetail::toJson() const
{
return fmt::format(R"({{"is_local":{},"packets":{},"bytes":{}}})", is_local, packets, bytes);
auto max_cost_ms = max_stream_cost_ns < 0 ? 0 : max_stream_cost_ns / 1'000'000.0;
auto min_cost_ms = min_stream_cost_ns < 0 ? 0 : min_stream_cost_ns / 1'000'000.0;
return fmt::format(
R"({{"is_local":{},"packets":{},"bytes":{},"max":{},"min":{}}})",
is_local,
packets,
bytes,
max_cost_ms,
min_cost_ms);
}

void TableScanStatistics::appendExtraJson(FmtBuffer & fmt_buffer) const
Expand Down Expand Up @@ -53,24 +61,25 @@ void TableScanStatistics::collectExtraRuntimeDetail()
break;
case ExecutionMode::Stream:
transformInBoundIOProfileForStream(dag_context, executor_id, [&](const IBlockInputStream & stream) {
const auto * cop_stream = dynamic_cast<const CoprocessorBlockInputStream *>(&stream);
/// In tiflash_compute node, TableScan will be converted to ExchangeReceiver.
const auto * exchange_stream = dynamic_cast<const ExchangeReceiverInputStream *>(&stream);
if (cop_stream || exchange_stream)
if (const auto * cop_stream = dynamic_cast<const CoprocessorBlockInputStream *>(&stream); cop_stream)
{
const std::vector<ConnectionProfileInfo> * connection_profile_infos = nullptr;
if (cop_stream)
connection_profile_infos = &cop_stream->getConnectionProfileInfos();
else if (exchange_stream)
connection_profile_infos = &exchange_stream->getConnectionProfileInfos();

updateTableScanDetail(*connection_profile_infos);
/// remote read
updateTableScanDetail(cop_stream->getConnectionProfileInfos());
// TODO: Can not get the execution time of remote read streams?
}
else if (const auto * local_stream = dynamic_cast<const IProfilingBlockInputStream *>(&stream);
local_stream)
{
/// local read input stream also is IProfilingBlockInputStream
local_table_scan_detail.bytes += local_stream->getProfileInfo().bytes;
const auto & prof = local_stream->getProfileInfo();
local_table_scan_detail.bytes += prof.bytes;
const double this_execution_time = prof.execution_time * 1.0;
if (local_table_scan_detail.max_stream_cost_ns < 0.0 // not inited
|| local_table_scan_detail.max_stream_cost_ns < this_execution_time)
local_table_scan_detail.max_stream_cost_ns = this_execution_time;
if (local_table_scan_detail.min_stream_cost_ns < 0.0 // not inited
|| local_table_scan_detail.min_stream_cost_ns > this_execution_time)
local_table_scan_detail.min_stream_cost_ns = this_execution_time;
}
else
{
Expand All @@ -81,12 +90,39 @@ void TableScanStatistics::collectExtraRuntimeDetail()
case ExecutionMode::Pipeline:
transformInBoundIOProfileForPipeline(dag_context, executor_id, [&](const IOProfileInfo & profile_info) {
if (profile_info.is_local)
{
local_table_scan_detail.bytes += profile_info.operator_info->bytes;
const double this_execution_time = profile_info.operator_info->execution_time * 1.0;
if (local_table_scan_detail.max_stream_cost_ns < 0.0 // not inited
|| local_table_scan_detail.max_stream_cost_ns < this_execution_time)
local_table_scan_detail.max_stream_cost_ns = this_execution_time;
if (local_table_scan_detail.min_stream_cost_ns < 0.0 // not inited
|| local_table_scan_detail.min_stream_cost_ns > this_execution_time)
local_table_scan_detail.min_stream_cost_ns = this_execution_time;
}
else
{
updateTableScanDetail(profile_info.connection_profile_infos);
const double this_execution_time = profile_info.operator_info->execution_time * 1.0;
if (remote_table_scan_detail.max_stream_cost_ns < 0.0 // not inited
|| remote_table_scan_detail.max_stream_cost_ns < this_execution_time)
remote_table_scan_detail.max_stream_cost_ns = this_execution_time;
if (remote_table_scan_detail.min_stream_cost_ns < 0.0 // not inited
|| remote_table_scan_detail.max_stream_cost_ns > this_execution_time)
remote_table_scan_detail.min_stream_cost_ns = this_execution_time;
}
});
break;
}

if (auto it = dag_context.scan_context_map.find(executor_id); it != dag_context.scan_context_map.end())
{
it->second->setStreamCost(
std::max(local_table_scan_detail.min_stream_cost_ns, 0.0),
std::max(local_table_scan_detail.max_stream_cost_ns, 0.0),
std::max(remote_table_scan_detail.min_stream_cost_ns, 0.0),
std::max(remote_table_scan_detail.max_stream_cost_ns, 0.0));
}
}

TableScanStatistics::TableScanStatistics(const tipb::Executor * executor, DAGContext & dag_context_)
Expand Down
4 changes: 3 additions & 1 deletion dbms/src/Flash/Statistics/TableScanImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@ namespace DB
{
struct TableScanDetail : public ConnectionProfileInfo
{
bool is_local;
const bool is_local;
double min_stream_cost_ns = -1.0;
double max_stream_cost_ns = -1.0;

explicit TableScanDetail(bool is_local_)
: is_local(is_local_)
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Interpreters/Settings.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ struct Settings
* but we are not going to do it, because settings is used everywhere as static struct fields.
*/

// clang-format off
// clang-format off
#define APPLY_FOR_SETTINGS(M) \
M(SettingString, regions, "", "Deprecated. the region need to be read.") \
M(SettingBool, resolve_locks, false, "resolve locks for TiDB transaction") \
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Operators/IOProfileInfo.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ struct IOProfileInfo

OperatorProfileInfoPtr operator_info;

bool is_local;
const bool is_local;
std::vector<ConnectionProfileInfo> connection_profile_infos{};
RemoteExecutionSummary remote_execution_summary{};
};
Expand Down
2 changes: 2 additions & 0 deletions dbms/src/Server/Server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -951,6 +951,8 @@ int Server::main(const std::vector<std::string> & /*args*/)

TiFlashErrorRegistry::instance(); // This invocation is for initializing

DM::ScanContext::initCurrentInstanceId(config(), log);

const auto disaggregated_mode = getDisaggregatedMode(config());
const auto use_autoscaler = useAutoScaler(config());
const bool use_autoscaler_without_s3 = useAutoScalerWithoutS3(config());
Expand Down
4 changes: 4 additions & 0 deletions dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFile.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@
#include <Core/Block.h>
#include <IO/WriteHelpers.h>
#include <Storages/DeltaMerge/ColumnFile/ColumnFileDataProvider_fwd.h>
#include <Storages/DeltaMerge/DMContext.h>
#include <Storages/DeltaMerge/File/DMFile.h>
#include <Storages/DeltaMerge/ReadMode.h>
#include <Storages/DeltaMerge/RowKeyRange.h>
#include <Storages/DeltaMerge/StoragePool/StoragePool_fwd.h>
#include <Storages/Page/PageDefinesBase.h>
Expand Down Expand Up @@ -178,6 +180,8 @@ class ColumnFileReader

/// Create a new reader from current reader with different columns to read.
virtual ColumnFileReaderPtr createNewReader(const ColumnDefinesPtr & col_defs) = 0;

virtual void setReadTag(ReadTag /*read_tag*/) {}
};

std::pair<size_t, size_t> copyColumnsData(
Expand Down
8 changes: 8 additions & 0 deletions dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileBig.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,7 @@ void ColumnFileBigReader::initStream()
DMFileBlockInputStreamBuilder builder(context.db_context);
file_stream
= builder.setTracingID(context.tracing_id)
.setReadTag(read_tag)
.build(column_file.getFile(), *col_defs, RowKeyRanges{column_file.segment_range}, context.scan_context);

header = file_stream->getHeader();
Expand Down Expand Up @@ -386,5 +387,12 @@ ColumnFileReaderPtr ColumnFileBigReader::createNewReader(const ColumnDefinesPtr
return std::make_shared<ColumnFileBigReader>(context, column_file, new_col_defs);
}

void ColumnFileBigReader::setReadTag(ReadTag read_tag_)
{
// `read_tag` should be set before `file_stream` is initialized.
RUNTIME_CHECK(file_stream == nullptr);
read_tag = read_tag_;
}

} // namespace DM
} // namespace DB
4 changes: 4 additions & 0 deletions dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileBig.h
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,8 @@ class ColumnFileBigReader : public ColumnFileReader
Block cur_block;
Columns cur_block_data; // The references to columns in cur_block, for faster access.

ReadTag read_tag;

private:
void initStream();
std::pair<size_t, size_t> readRowsRepeatedly(
Expand Down Expand Up @@ -190,6 +192,8 @@ class ColumnFileBigReader : public ColumnFileReader
size_t skipNextBlock() override;

ColumnFileReaderPtr createNewReader(const ColumnDefinesPtr & new_col_defs) override;

void setReadTag(ReadTag read_tag_) override;
};

} // namespace DM
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,8 @@ ColumnFileSetReader::ColumnFileSetReader(
const DMContext & context_,
const ColumnFileSetSnapshotPtr & snapshot_,
const ColumnDefinesPtr & col_defs_,
const RowKeyRange & segment_range_)
const RowKeyRange & segment_range_,
ReadTag read_tag_)
: context(context_)
, snapshot(snapshot_)
, col_defs(col_defs_)
Expand All @@ -98,10 +99,11 @@ ColumnFileSetReader::ColumnFileSetReader(
column_file_rows.push_back(f->getRows());
column_file_rows_end.push_back(total_rows);
column_file_readers.push_back(f->getReader(context, snapshot->getDataProvider(), col_defs));
column_file_readers.back()->setReadTag(read_tag_);
}
}

ColumnFileSetReaderPtr ColumnFileSetReader::createNewReader(const ColumnDefinesPtr & new_col_defs)
ColumnFileSetReaderPtr ColumnFileSetReader::createNewReader(const ColumnDefinesPtr & new_col_defs, ReadTag read_tag)
{
auto * new_reader = new ColumnFileSetReader(context);
new_reader->snapshot = snapshot;
Expand All @@ -111,7 +113,10 @@ ColumnFileSetReaderPtr ColumnFileSetReader::createNewReader(const ColumnDefinesP
new_reader->column_file_rows_end = column_file_rows_end;

for (auto & fr : column_file_readers)
{
new_reader->column_file_readers.push_back(fr->createNewReader(new_col_defs));
new_reader->column_file_readers.back()->setReadTag(read_tag);
}

return std::shared_ptr<ColumnFileSetReader>(new_reader);
}
Expand Down
10 changes: 6 additions & 4 deletions dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileSetReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,11 +57,12 @@ class ColumnFileSetReader
const DMContext & context_,
const ColumnFileSetSnapshotPtr & snapshot_,
const ColumnDefinesPtr & col_defs_,
const RowKeyRange & segment_range_);
const RowKeyRange & segment_range_,
ReadTag read_tag_);

// If we need to read columns besides pk and version, a ColumnFileSetReader can NOT be used more than once.
// This method create a new reader based on the current one. It will reuse some caches in the current reader.
ColumnFileSetReaderPtr createNewReader(const ColumnDefinesPtr & new_col_defs);
ColumnFileSetReaderPtr createNewReader(const ColumnDefinesPtr & new_col_defs, ReadTag read_tag);

// Use for DeltaMergeBlockInputStream to read rows from MemTableSet to do full compaction with other layer.
// This method will check whether offset and limit are valid. It only return those valid rows.
Expand Down Expand Up @@ -104,8 +105,9 @@ class ColumnFileSetInputStream : public SkippableBlockInputStream
const DMContext & context_,
const ColumnFileSetSnapshotPtr & delta_snap_,
const ColumnDefinesPtr & col_defs_,
const RowKeyRange & segment_range_)
: reader(context_, delta_snap_, col_defs_, segment_range_)
const RowKeyRange & segment_range_,
ReadTag read_tag_)
: reader(context_, delta_snap_, col_defs_, segment_range_, read_tag_)
, column_files(reader.snapshot->getColumnFiles())
, column_files_count(column_files.size())
{}
Expand Down
17 changes: 12 additions & 5 deletions dbms/src/Storages/DeltaMerge/Delta/DeltaValueSpace.h
Original file line number Diff line number Diff line change
Expand Up @@ -427,11 +427,12 @@ class DeltaValueReader
const DMContext & context_,
const DeltaSnapshotPtr & delta_snap_,
const ColumnDefinesPtr & col_defs_,
const RowKeyRange & segment_range_);
const RowKeyRange & segment_range_,
ReadTag read_tag_);

// If we need to read columns besides pk and version, a DeltaValueReader can NOT be used more than once.
// This method create a new reader based on then current one. It will reuse some caches in the current reader.
DeltaValueReaderPtr createNewReader(const ColumnDefinesPtr & new_col_defs);
DeltaValueReaderPtr createNewReader(const ColumnDefinesPtr & new_col_defs, ReadTag read_tag);

void setDeltaIndex(const DeltaIndexCompactedPtr & delta_index_) { compacted_delta_index = delta_index_; }

Expand Down Expand Up @@ -473,9 +474,15 @@ class DeltaValueInputStream : public SkippableBlockInputStream
const DMContext & context_,
const DeltaSnapshotPtr & delta_snap_,
const ColumnDefinesPtr & col_defs_,
const RowKeyRange & segment_range_)
: mem_table_input_stream(context_, delta_snap_->getMemTableSetSnapshot(), col_defs_, segment_range_)
, persisted_files_input_stream(context_, delta_snap_->getPersistedFileSetSnapshot(), col_defs_, segment_range_)
const RowKeyRange & segment_range_,
ReadTag read_tag_)
: mem_table_input_stream(context_, delta_snap_->getMemTableSetSnapshot(), col_defs_, segment_range_, read_tag_)
, persisted_files_input_stream(
context_,
delta_snap_->getPersistedFileSetSnapshot(),
col_defs_,
segment_range_,
read_tag_)
{}

String getName() const override { return "DeltaValue"; }
Expand Down
15 changes: 9 additions & 6 deletions dbms/src/Storages/DeltaMerge/Delta/Snapshot.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,29 +71,32 @@ DeltaValueReader::DeltaValueReader(
const DMContext & context,
const DeltaSnapshotPtr & delta_snap_,
const ColumnDefinesPtr & col_defs_,
const RowKeyRange & segment_range_)
const RowKeyRange & segment_range_,
ReadTag read_tag_)
: delta_snap(delta_snap_)
, mem_table_reader(std::make_shared<ColumnFileSetReader>(
context,
delta_snap_->getMemTableSetSnapshot(),
col_defs_,
segment_range_))
segment_range_,
read_tag_))
, persisted_files_reader(std::make_shared<ColumnFileSetReader>(
context,
delta_snap_->getPersistedFileSetSnapshot(),
col_defs_,
segment_range_))
segment_range_,
read_tag_))
, col_defs(col_defs_)
, segment_range(segment_range_)
{}

DeltaValueReaderPtr DeltaValueReader::createNewReader(const ColumnDefinesPtr & new_col_defs)
DeltaValueReaderPtr DeltaValueReader::createNewReader(const ColumnDefinesPtr & new_col_defs, ReadTag read_tag)
{
auto * new_reader = new DeltaValueReader();
new_reader->delta_snap = delta_snap;
new_reader->compacted_delta_index = compacted_delta_index;
new_reader->persisted_files_reader = persisted_files_reader->createNewReader(new_col_defs);
new_reader->mem_table_reader = mem_table_reader->createNewReader(new_col_defs);
new_reader->persisted_files_reader = persisted_files_reader->createNewReader(new_col_defs, read_tag);
new_reader->mem_table_reader = mem_table_reader->createNewReader(new_col_defs, read_tag);
new_reader->col_defs = new_col_defs;
new_reader->segment_range = segment_range;

Expand Down
Loading