Skip to content

Commit

Permalink
Storages: Add statistical data of TableScanning in ScanContext (relea…
Browse files Browse the repository at this point in the history
…se-7.5) (pingcap#8900)

ref pingcap#8675
  • Loading branch information
JaySon-Huang committed Jun 27, 2024
1 parent 7b56cf4 commit 963d857
Show file tree
Hide file tree
Showing 37 changed files with 704 additions and 169 deletions.
4 changes: 0 additions & 4 deletions .gitmodules
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,6 @@
[submodule "contrib/kvproto"]
path = contrib/kvproto
url = https://github.com/pingcap/kvproto.git
[submodule "contrib/tipb"]
path = contrib/tipb
url = https://github.com/pingcap/tipb.git
branch = master
[submodule "contrib/client-c"]
path = contrib/client-c
url = https://github.com/tikv/client-c.git
Expand Down
1 change: 0 additions & 1 deletion contrib/tipb
Submodule tipb deleted from 56b021
4 changes: 2 additions & 2 deletions dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1188,7 +1188,7 @@ void DAGStorageInterpreter::buildLocalStreams(DAGPipeline & pipeline, size_t max
size_t total_local_region_num = mvcc_query_info->regions_query_info.size();
if (total_local_region_num == 0)
return;
mvcc_query_info->scan_context->total_local_region_num = total_local_region_num;
mvcc_query_info->scan_context->setRegionNumOfCurrentInstance(total_local_region_num);
const auto table_query_infos = generateSelectQueryInfos();
bool has_multiple_partitions = table_query_infos.size() > 1;
// MultiPartitionStreamPool will be disabled in no partition mode or single-partition case
Expand Down Expand Up @@ -1254,7 +1254,7 @@ void DAGStorageInterpreter::buildLocalExec(
size_t total_local_region_num = mvcc_query_info->regions_query_info.size();
if (total_local_region_num == 0)
return;
mvcc_query_info->scan_context->total_local_region_num = total_local_region_num;
mvcc_query_info->scan_context->setRegionNumOfCurrentInstance(total_local_region_num);
const auto table_query_infos = generateSelectQueryInfos();
bool has_multiple_partitions = table_query_infos.size() > 1;
ConcatBuilderPool builder_pool{max_streams};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,8 @@ bool equalSummaries(const ExecutionSummary & left, const ExecutionSummary & righ
(left.num_iterations == right.num_iterations) && //
(left.num_produced_rows == right.num_produced_rows) && //
(left.time_processed_ns == right.time_processed_ns) && //
(left.scan_context->total_dmfile_scanned_rows == right.scan_context->total_dmfile_scanned_rows) && //
(left.scan_context->total_dmfile_skipped_rows == right.scan_context->total_dmfile_skipped_rows);
(left.scan_context->dmfile_data_scanned_rows == right.scan_context->dmfile_data_scanned_rows) && //
(left.scan_context->dmfile_data_skipped_rows == right.scan_context->dmfile_data_skipped_rows);
}

struct MockWriter
Expand All @@ -73,10 +73,12 @@ struct MockWriter
summary.concurrency = 1;
summary.scan_context = std::make_unique<DM::ScanContext>();

summary.scan_context->total_dmfile_scanned_packs = 1;
summary.scan_context->total_dmfile_skipped_packs = 2;
summary.scan_context->total_dmfile_scanned_rows = 8000;
summary.scan_context->total_dmfile_skipped_rows = 15000;
summary.scan_context->dmfile_data_scanned_rows = 8000;
summary.scan_context->dmfile_data_skipped_rows = 15000;
summary.scan_context->dmfile_mvcc_scanned_rows = 8000;
summary.scan_context->dmfile_mvcc_skipped_rows = 15000;
summary.scan_context->dmfile_lm_filter_scanned_rows = 8000;
summary.scan_context->dmfile_lm_filter_skipped_rows = 15000;
summary.scan_context->total_dmfile_rough_set_index_check_time_ns = 10;
summary.scan_context->total_dmfile_read_time_ns = 200;
summary.scan_context->create_snapshot_time_ns = 5;
Expand Down
62 changes: 49 additions & 13 deletions dbms/src/Flash/Statistics/TableScanImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,15 @@ namespace DB
{
String TableScanDetail::toJson() const
{
return fmt::format(R"({{"is_local":{},"packets":{},"bytes":{}}})", is_local, packets, bytes);
auto max_cost_ms = max_stream_cost_ns < 0 ? 0 : max_stream_cost_ns / 1'000'000.0;
auto min_cost_ms = min_stream_cost_ns < 0 ? 0 : min_stream_cost_ns / 1'000'000.0;
return fmt::format(
R"({{"is_local":{},"packets":{},"bytes":{},"max":{},"min":{}}})",
is_local,
packets,
bytes,
max_cost_ms,
min_cost_ms);
}

void TableScanStatistics::appendExtraJson(FmtBuffer & fmt_buffer) const
Expand Down Expand Up @@ -53,24 +61,25 @@ void TableScanStatistics::collectExtraRuntimeDetail()
break;
case ExecutionMode::Stream:
transformInBoundIOProfileForStream(dag_context, executor_id, [&](const IBlockInputStream & stream) {
const auto * cop_stream = dynamic_cast<const CoprocessorBlockInputStream *>(&stream);
/// In tiflash_compute node, TableScan will be converted to ExchangeReceiver.
const auto * exchange_stream = dynamic_cast<const ExchangeReceiverInputStream *>(&stream);
if (cop_stream || exchange_stream)
if (const auto * cop_stream = dynamic_cast<const CoprocessorBlockInputStream *>(&stream); cop_stream)
{
const std::vector<ConnectionProfileInfo> * connection_profile_infos = nullptr;
if (cop_stream)
connection_profile_infos = &cop_stream->getConnectionProfileInfos();
else if (exchange_stream)
connection_profile_infos = &exchange_stream->getConnectionProfileInfos();

updateTableScanDetail(*connection_profile_infos);
/// remote read
updateTableScanDetail(cop_stream->getConnectionProfileInfos());
// TODO: Can not get the execution time of remote read streams?
}
else if (const auto * local_stream = dynamic_cast<const IProfilingBlockInputStream *>(&stream);
local_stream)
{
/// local read input stream also is IProfilingBlockInputStream
local_table_scan_detail.bytes += local_stream->getProfileInfo().bytes;
const auto & prof = local_stream->getProfileInfo();
local_table_scan_detail.bytes += prof.bytes;
const double this_execution_time = prof.execution_time * 1.0;
if (local_table_scan_detail.max_stream_cost_ns < 0.0 // not inited
|| local_table_scan_detail.max_stream_cost_ns < this_execution_time)
local_table_scan_detail.max_stream_cost_ns = this_execution_time;
if (local_table_scan_detail.min_stream_cost_ns < 0.0 // not inited
|| local_table_scan_detail.min_stream_cost_ns > this_execution_time)
local_table_scan_detail.min_stream_cost_ns = this_execution_time;
}
else
{
Expand All @@ -81,12 +90,39 @@ void TableScanStatistics::collectExtraRuntimeDetail()
case ExecutionMode::Pipeline:
transformInBoundIOProfileForPipeline(dag_context, executor_id, [&](const IOProfileInfo & profile_info) {
if (profile_info.is_local)
{
local_table_scan_detail.bytes += profile_info.operator_info->bytes;
const double this_execution_time = profile_info.operator_info->execution_time * 1.0;
if (local_table_scan_detail.max_stream_cost_ns < 0.0 // not inited
|| local_table_scan_detail.max_stream_cost_ns < this_execution_time)
local_table_scan_detail.max_stream_cost_ns = this_execution_time;
if (local_table_scan_detail.min_stream_cost_ns < 0.0 // not inited
|| local_table_scan_detail.min_stream_cost_ns > this_execution_time)
local_table_scan_detail.min_stream_cost_ns = this_execution_time;
}
else
{
updateTableScanDetail(profile_info.connection_profile_infos);
const double this_execution_time = profile_info.operator_info->execution_time * 1.0;
if (remote_table_scan_detail.max_stream_cost_ns < 0.0 // not inited
|| remote_table_scan_detail.max_stream_cost_ns < this_execution_time)
remote_table_scan_detail.max_stream_cost_ns = this_execution_time;
if (remote_table_scan_detail.min_stream_cost_ns < 0.0 // not inited
|| remote_table_scan_detail.max_stream_cost_ns > this_execution_time)
remote_table_scan_detail.min_stream_cost_ns = this_execution_time;
}
});
break;
}

if (auto it = dag_context.scan_context_map.find(executor_id); it != dag_context.scan_context_map.end())
{
it->second->setStreamCost(
std::max(local_table_scan_detail.min_stream_cost_ns, 0.0),
std::max(local_table_scan_detail.max_stream_cost_ns, 0.0),
std::max(remote_table_scan_detail.min_stream_cost_ns, 0.0),
std::max(remote_table_scan_detail.max_stream_cost_ns, 0.0));
}
}

TableScanStatistics::TableScanStatistics(const tipb::Executor * executor, DAGContext & dag_context_)
Expand Down
4 changes: 3 additions & 1 deletion dbms/src/Flash/Statistics/TableScanImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@ namespace DB
{
struct TableScanDetail : public ConnectionProfileInfo
{
bool is_local;
const bool is_local;
double min_stream_cost_ns = -1.0;
double max_stream_cost_ns = -1.0;

explicit TableScanDetail(bool is_local_)
: is_local(is_local_)
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Interpreters/Settings.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ struct Settings
* but we are not going to do it, because settings is used everywhere as static struct fields.
*/

// clang-format off
// clang-format off
#define APPLY_FOR_SETTINGS(M) \
M(SettingString, regions, "", "Deprecated. the region need to be read.") \
M(SettingBool, resolve_locks, false, "resolve locks for TiDB transaction") \
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Operators/IOProfileInfo.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ struct IOProfileInfo

OperatorProfileInfoPtr operator_info;

bool is_local;
const bool is_local;
std::vector<ConnectionProfileInfo> connection_profile_infos{};
RemoteExecutionSummary remote_execution_summary{};
};
Expand Down
2 changes: 2 additions & 0 deletions dbms/src/Server/Server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -954,6 +954,8 @@ int Server::main(const std::vector<std::string> & /*args*/)

TiFlashErrorRegistry::instance(); // This invocation is for initializing

DM::ScanContext::initCurrentInstanceId(config(), log);

const auto disaggregated_mode = getDisaggregatedMode(config());
const auto use_autoscaler = useAutoScaler(config());
const bool use_autoscaler_without_s3 = useAutoScalerWithoutS3(config());
Expand Down
4 changes: 4 additions & 0 deletions dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFile.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@
#include <Core/Block.h>
#include <IO/WriteHelpers.h>
#include <Storages/DeltaMerge/ColumnFile/ColumnFileDataProvider_fwd.h>
#include <Storages/DeltaMerge/DMContext.h>
#include <Storages/DeltaMerge/File/DMFile.h>
#include <Storages/DeltaMerge/ReadMode.h>
#include <Storages/DeltaMerge/RowKeyRange.h>
#include <Storages/DeltaMerge/StoragePool/StoragePool_fwd.h>
#include <Storages/Page/PageDefinesBase.h>
Expand Down Expand Up @@ -178,6 +180,8 @@ class ColumnFileReader

/// Create a new reader from current reader with different columns to read.
virtual ColumnFileReaderPtr createNewReader(const ColumnDefinesPtr & col_defs) = 0;

virtual void setReadTag(ReadTag /*read_tag*/) {}
};

std::pair<size_t, size_t> copyColumnsData(
Expand Down
8 changes: 8 additions & 0 deletions dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileBig.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,7 @@ void ColumnFileBigReader::initStream()
DMFileBlockInputStreamBuilder builder(context.db_context);
file_stream
= builder.setTracingID(context.tracing_id)
.setReadTag(read_tag)
.build(column_file.getFile(), *col_defs, RowKeyRanges{column_file.segment_range}, context.scan_context);

header = file_stream->getHeader();
Expand Down Expand Up @@ -388,5 +389,12 @@ ColumnFileReaderPtr ColumnFileBigReader::createNewReader(const ColumnDefinesPtr
return std::make_shared<ColumnFileBigReader>(context, column_file, new_col_defs);
}

void ColumnFileBigReader::setReadTag(ReadTag read_tag_)
{
// `read_tag` should be set before `file_stream` is initialized.
RUNTIME_CHECK(file_stream == nullptr);
read_tag = read_tag_;
}

} // namespace DM
} // namespace DB
4 changes: 4 additions & 0 deletions dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileBig.h
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,8 @@ class ColumnFileBigReader : public ColumnFileReader
Block cur_block;
Columns cur_block_data; // The references to columns in cur_block, for faster access.

ReadTag read_tag;

private:
void initStream();
std::pair<size_t, size_t> readRowsRepeatedly(
Expand Down Expand Up @@ -191,6 +193,8 @@ class ColumnFileBigReader : public ColumnFileReader
size_t skipNextBlock() override;

ColumnFileReaderPtr createNewReader(const ColumnDefinesPtr & new_col_defs) override;

void setReadTag(ReadTag read_tag_) override;
};

} // namespace DM
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,8 @@ ColumnFileSetReader::ColumnFileSetReader(
const DMContext & context_,
const ColumnFileSetSnapshotPtr & snapshot_,
const ColumnDefinesPtr & col_defs_,
const RowKeyRange & segment_range_)
const RowKeyRange & segment_range_,
ReadTag read_tag_)
: context(context_)
, snapshot(snapshot_)
, col_defs(col_defs_)
Expand All @@ -98,10 +99,11 @@ ColumnFileSetReader::ColumnFileSetReader(
column_file_rows.push_back(f->getRows());
column_file_rows_end.push_back(total_rows);
column_file_readers.push_back(f->getReader(context, snapshot->getDataProvider(), col_defs));
column_file_readers.back()->setReadTag(read_tag_);
}
}

ColumnFileSetReaderPtr ColumnFileSetReader::createNewReader(const ColumnDefinesPtr & new_col_defs)
ColumnFileSetReaderPtr ColumnFileSetReader::createNewReader(const ColumnDefinesPtr & new_col_defs, ReadTag read_tag)
{
auto * new_reader = new ColumnFileSetReader(context);
new_reader->snapshot = snapshot;
Expand All @@ -111,7 +113,10 @@ ColumnFileSetReaderPtr ColumnFileSetReader::createNewReader(const ColumnDefinesP
new_reader->column_file_rows_end = column_file_rows_end;

for (auto & fr : column_file_readers)
{
new_reader->column_file_readers.push_back(fr->createNewReader(new_col_defs));
new_reader->column_file_readers.back()->setReadTag(read_tag);
}

return std::shared_ptr<ColumnFileSetReader>(new_reader);
}
Expand Down
10 changes: 6 additions & 4 deletions dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileSetReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,11 +57,12 @@ class ColumnFileSetReader
const DMContext & context_,
const ColumnFileSetSnapshotPtr & snapshot_,
const ColumnDefinesPtr & col_defs_,
const RowKeyRange & segment_range_);
const RowKeyRange & segment_range_,
ReadTag read_tag_);

// If we need to read columns besides pk and version, a ColumnFileSetReader can NOT be used more than once.
// This method create a new reader based on the current one. It will reuse some caches in the current reader.
ColumnFileSetReaderPtr createNewReader(const ColumnDefinesPtr & new_col_defs);
ColumnFileSetReaderPtr createNewReader(const ColumnDefinesPtr & new_col_defs, ReadTag read_tag);

// Use for DeltaMergeBlockInputStream to read rows from MemTableSet to do full compaction with other layer.
// This method will check whether offset and limit are valid. It only return those valid rows.
Expand Down Expand Up @@ -104,8 +105,9 @@ class ColumnFileSetInputStream : public SkippableBlockInputStream
const DMContext & context_,
const ColumnFileSetSnapshotPtr & delta_snap_,
const ColumnDefinesPtr & col_defs_,
const RowKeyRange & segment_range_)
: reader(context_, delta_snap_, col_defs_, segment_range_)
const RowKeyRange & segment_range_,
ReadTag read_tag_)
: reader(context_, delta_snap_, col_defs_, segment_range_, read_tag_)
, column_files(reader.snapshot->getColumnFiles())
, column_files_count(column_files.size())
{}
Expand Down
17 changes: 12 additions & 5 deletions dbms/src/Storages/DeltaMerge/Delta/DeltaValueSpace.h
Original file line number Diff line number Diff line change
Expand Up @@ -440,11 +440,12 @@ class DeltaValueReader
const DMContext & context_,
const DeltaSnapshotPtr & delta_snap_,
const ColumnDefinesPtr & col_defs_,
const RowKeyRange & segment_range_);
const RowKeyRange & segment_range_,
ReadTag read_tag_);

// If we need to read columns besides pk and version, a DeltaValueReader can NOT be used more than once.
// This method create a new reader based on then current one. It will reuse some caches in the current reader.
DeltaValueReaderPtr createNewReader(const ColumnDefinesPtr & new_col_defs);
DeltaValueReaderPtr createNewReader(const ColumnDefinesPtr & new_col_defs, ReadTag read_tag);

void setDeltaIndex(const DeltaIndexCompactedPtr & delta_index_) { compacted_delta_index = delta_index_; }

Expand Down Expand Up @@ -486,9 +487,15 @@ class DeltaValueInputStream : public SkippableBlockInputStream
const DMContext & context_,
const DeltaSnapshotPtr & delta_snap_,
const ColumnDefinesPtr & col_defs_,
const RowKeyRange & segment_range_)
: mem_table_input_stream(context_, delta_snap_->getMemTableSetSnapshot(), col_defs_, segment_range_)
, persisted_files_input_stream(context_, delta_snap_->getPersistedFileSetSnapshot(), col_defs_, segment_range_)
const RowKeyRange & segment_range_,
ReadTag read_tag_)
: mem_table_input_stream(context_, delta_snap_->getMemTableSetSnapshot(), col_defs_, segment_range_, read_tag_)
, persisted_files_input_stream(
context_,
delta_snap_->getPersistedFileSetSnapshot(),
col_defs_,
segment_range_,
read_tag_)
{}

String getName() const override { return "DeltaValue"; }
Expand Down
15 changes: 9 additions & 6 deletions dbms/src/Storages/DeltaMerge/Delta/Snapshot.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,29 +71,32 @@ DeltaValueReader::DeltaValueReader(
const DMContext & context,
const DeltaSnapshotPtr & delta_snap_,
const ColumnDefinesPtr & col_defs_,
const RowKeyRange & segment_range_)
const RowKeyRange & segment_range_,
ReadTag read_tag_)
: delta_snap(delta_snap_)
, mem_table_reader(std::make_shared<ColumnFileSetReader>(
context,
delta_snap_->getMemTableSetSnapshot(),
col_defs_,
segment_range_))
segment_range_,
read_tag_))
, persisted_files_reader(std::make_shared<ColumnFileSetReader>(
context,
delta_snap_->getPersistedFileSetSnapshot(),
col_defs_,
segment_range_))
segment_range_,
read_tag_))
, col_defs(col_defs_)
, segment_range(segment_range_)
{}

DeltaValueReaderPtr DeltaValueReader::createNewReader(const ColumnDefinesPtr & new_col_defs)
DeltaValueReaderPtr DeltaValueReader::createNewReader(const ColumnDefinesPtr & new_col_defs, ReadTag read_tag)
{
auto * new_reader = new DeltaValueReader();
new_reader->delta_snap = delta_snap;
new_reader->compacted_delta_index = compacted_delta_index;
new_reader->persisted_files_reader = persisted_files_reader->createNewReader(new_col_defs);
new_reader->mem_table_reader = mem_table_reader->createNewReader(new_col_defs);
new_reader->persisted_files_reader = persisted_files_reader->createNewReader(new_col_defs, read_tag);
new_reader->mem_table_reader = mem_table_reader->createNewReader(new_col_defs, read_tag);
new_reader->col_defs = new_col_defs;
new_reader->segment_range = segment_range;

Expand Down
Loading

0 comments on commit 963d857

Please sign in to comment.