Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Storages: Add statistical data of TableScanning in ScanContext #8842

Merged
merged 5 commits into from
Mar 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion contrib/tipb
4 changes: 2 additions & 2 deletions dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1187,7 +1187,7 @@ void DAGStorageInterpreter::buildLocalStreams(DAGPipeline & pipeline, size_t max
size_t total_local_region_num = mvcc_query_info->regions_query_info.size();
if (total_local_region_num == 0)
return;
mvcc_query_info->scan_context->total_local_region_num = total_local_region_num;
mvcc_query_info->scan_context->setRegionNumOfCurrentInstance(total_local_region_num);
const auto table_query_infos = generateSelectQueryInfos();
bool has_multiple_partitions = table_query_infos.size() > 1;
// MultiPartitionStreamPool will be disabled in no partition mode or single-partition case
Expand Down Expand Up @@ -1253,7 +1253,7 @@ void DAGStorageInterpreter::buildLocalExec(
size_t total_local_region_num = mvcc_query_info->regions_query_info.size();
if (total_local_region_num == 0)
return;
mvcc_query_info->scan_context->total_local_region_num = total_local_region_num;
mvcc_query_info->scan_context->setRegionNumOfCurrentInstance(total_local_region_num);
const auto table_query_infos = generateSelectQueryInfos();
bool has_multiple_partitions = table_query_infos.size() > 1;
ConcatBuilderPool builder_pool{max_streams};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,8 @@ bool equalSummaries(const ExecutionSummary & left, const ExecutionSummary & righ
(left.num_iterations == right.num_iterations) && //
(left.num_produced_rows == right.num_produced_rows) && //
(left.time_processed_ns == right.time_processed_ns) && //
(left.scan_context->total_dmfile_scanned_rows == right.scan_context->total_dmfile_scanned_rows) && //
(left.scan_context->total_dmfile_skipped_rows == right.scan_context->total_dmfile_skipped_rows);
(left.scan_context->dmfile_data_scanned_rows == right.scan_context->dmfile_data_scanned_rows) && //
(left.scan_context->dmfile_data_skipped_rows == right.scan_context->dmfile_data_skipped_rows);
}

struct MockWriter
Expand All @@ -72,10 +72,12 @@ struct MockWriter
summary.concurrency = 1;
summary.scan_context = std::make_unique<DM::ScanContext>();

summary.scan_context->total_dmfile_scanned_packs = 1;
summary.scan_context->total_dmfile_skipped_packs = 2;
summary.scan_context->total_dmfile_scanned_rows = 8000;
summary.scan_context->total_dmfile_skipped_rows = 15000;
summary.scan_context->dmfile_data_scanned_rows = 8000;
summary.scan_context->dmfile_data_skipped_rows = 15000;
summary.scan_context->dmfile_mvcc_scanned_rows = 8000;
summary.scan_context->dmfile_mvcc_skipped_rows = 15000;
summary.scan_context->dmfile_lm_filter_scanned_rows = 8000;
summary.scan_context->dmfile_lm_filter_skipped_rows = 15000;
summary.scan_context->total_dmfile_rough_set_index_check_time_ns = 10;
summary.scan_context->total_dmfile_read_time_ns = 200;
summary.scan_context->create_snapshot_time_ns = 5;
Expand Down
9 changes: 9 additions & 0 deletions dbms/src/Flash/Statistics/TableScanImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,15 @@ void TableScanStatistics::collectExtraRuntimeDetail()
});
break;
}

if (auto it = dag_context.scan_context_map.find(executor_id); it != dag_context.scan_context_map.end())
{
it->second->setStreamCost(
std::max(local_table_scan_detail.min_stream_cost_ns, 0.0),
std::max(local_table_scan_detail.max_stream_cost_ns, 0.0),
std::max(remote_table_scan_detail.min_stream_cost_ns, 0.0),
std::max(remote_table_scan_detail.max_stream_cost_ns, 0.0));
}
}

TableScanStatistics::TableScanStatistics(const tipb::Executor * executor, DAGContext & dag_context_)
Expand Down
2 changes: 2 additions & 0 deletions dbms/src/Server/Server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -949,6 +949,8 @@ int Server::main(const std::vector<std::string> & /*args*/)

TiFlashErrorRegistry::instance(); // This invocation is for initializing

DM::ScanContext::initCurrentInstanceId(config(), log);

const auto disaggregated_mode = getDisaggregatedMode(config());
const auto use_autoscaler = useAutoScaler(config());

Expand Down
3 changes: 3 additions & 0 deletions dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFile.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include <IO/WriteHelpers.h>
#include <Storages/DeltaMerge/ColumnFile/ColumnFileDataProvider_fwd.h>
#include <Storages/DeltaMerge/DMContext_fwd.h>
#include <Storages/DeltaMerge/ReadMode.h>
#include <Storages/DeltaMerge/RowKeyRange.h>
#include <Storages/DeltaMerge/StoragePool/StoragePool_fwd.h>
#include <Storages/Page/PageDefinesBase.h>
Expand Down Expand Up @@ -177,6 +178,8 @@ class ColumnFileReader

/// Create a new reader from current reader with different columns to read.
virtual ColumnFileReaderPtr createNewReader(const ColumnDefinesPtr & col_defs) = 0;

/// Sets the `ReadTag` for this reader.
/// The base implementation is a no-op that ignores the tag; `ColumnFileBigReader`
/// overrides it to forward the tag to the DMFile stream it builds.
virtual void setReadTag(ReadTag /*read_tag*/) {}
};

std::pair<size_t, size_t> copyColumnsData(
Expand Down
8 changes: 8 additions & 0 deletions dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileBig.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,7 @@ void ColumnFileBigReader::initStream()

DMFileBlockInputStreamBuilder builder(dm_context.global_context);
file_stream = builder.setTracingID(dm_context.tracing_id)
.setReadTag(read_tag)
.build(
column_file.getFile(),
*col_defs,
Expand Down Expand Up @@ -394,5 +395,12 @@ ColumnFileReaderPtr ColumnFileBigReader::createNewReader(const ColumnDefinesPtr
return std::make_shared<ColumnFileBigReader>(dm_context, column_file, new_col_defs);
}

/// Stores the `ReadTag` that `initStream()` hands to `DMFileBlockInputStreamBuilder`
/// when it creates `file_stream`.
void ColumnFileBigReader::setReadTag(ReadTag read_tag_)
{
// `read_tag` should be set before `file_stream` is initialized.
// The tag is only passed to the builder while the stream is being created, so the
// check below rejects calls that arrive after the stream already exists.
RUNTIME_CHECK(file_stream == nullptr);
read_tag = read_tag_;
}

} // namespace DM
} // namespace DB
4 changes: 4 additions & 0 deletions dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileBig.h
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,8 @@ class ColumnFileBigReader : public ColumnFileReader
Block cur_block;
Columns cur_block_data; // The references to columns in cur_block, for faster access.

// Tag forwarded to the DMFile stream builder in `initStream()`.
// Defaults to `ReadTag::Internal` (the same default `DMFileBlockInputStreamBuilder` uses) so the
// value is well-defined even when `setReadTag()` is never called on this reader.
ReadTag read_tag = ReadTag::Internal;

private:
void initStream();
std::pair<size_t, size_t> readRowsRepeatedly(
Expand Down Expand Up @@ -192,6 +194,8 @@ class ColumnFileBigReader : public ColumnFileReader
size_t skipNextBlock() override;

ColumnFileReaderPtr createNewReader(const ColumnDefinesPtr & new_col_defs) override;

void setReadTag(ReadTag read_tag_) override;
};

} // namespace DM
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,8 @@ ColumnFileSetReader::ColumnFileSetReader(
const DMContext & context_,
const ColumnFileSetSnapshotPtr & snapshot_,
const ColumnDefinesPtr & col_defs_,
const RowKeyRange & segment_range_)
const RowKeyRange & segment_range_,
ReadTag read_tag_)
: context(context_)
, snapshot(snapshot_)
, col_defs(col_defs_)
Expand All @@ -98,10 +99,11 @@ ColumnFileSetReader::ColumnFileSetReader(
column_file_rows.push_back(f->getRows());
column_file_rows_end.push_back(total_rows);
column_file_readers.push_back(f->getReader(context, snapshot->getDataProvider(), col_defs));
column_file_readers.back()->setReadTag(read_tag_);
}
}

ColumnFileSetReaderPtr ColumnFileSetReader::createNewReader(const ColumnDefinesPtr & new_col_defs)
ColumnFileSetReaderPtr ColumnFileSetReader::createNewReader(const ColumnDefinesPtr & new_col_defs, ReadTag read_tag)
{
auto * new_reader = new ColumnFileSetReader(context);
new_reader->snapshot = snapshot;
Expand All @@ -111,7 +113,10 @@ ColumnFileSetReaderPtr ColumnFileSetReader::createNewReader(const ColumnDefinesP
new_reader->column_file_rows_end = column_file_rows_end;

for (auto & fr : column_file_readers)
{
new_reader->column_file_readers.push_back(fr->createNewReader(new_col_defs));
new_reader->column_file_readers.back()->setReadTag(read_tag);
}

return std::shared_ptr<ColumnFileSetReader>(new_reader);
}
Expand Down
10 changes: 6 additions & 4 deletions dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileSetReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,11 +57,12 @@ class ColumnFileSetReader
const DMContext & context_,
const ColumnFileSetSnapshotPtr & snapshot_,
const ColumnDefinesPtr & col_defs_,
const RowKeyRange & segment_range_);
const RowKeyRange & segment_range_,
ReadTag read_tag_);

// If we need to read columns besides pk and version, a ColumnFileSetReader can NOT be used more than once.
// This method creates a new reader based on the current one. It will reuse some caches in the current reader.
ColumnFileSetReaderPtr createNewReader(const ColumnDefinesPtr & new_col_defs);
ColumnFileSetReaderPtr createNewReader(const ColumnDefinesPtr & new_col_defs, ReadTag read_tag);

// Used for DeltaMergeBlockInputStream to read rows from MemTableSet to do full compaction with other layers.
// This method will check whether offset and limit are valid. It only returns those valid rows.
Expand Down Expand Up @@ -104,8 +105,9 @@ class ColumnFileSetInputStream : public SkippableBlockInputStream
const DMContext & context_,
const ColumnFileSetSnapshotPtr & delta_snap_,
const ColumnDefinesPtr & col_defs_,
const RowKeyRange & segment_range_)
: reader(context_, delta_snap_, col_defs_, segment_range_)
const RowKeyRange & segment_range_,
ReadTag read_tag_)
: reader(context_, delta_snap_, col_defs_, segment_range_, read_tag_)
, column_files(reader.snapshot->getColumnFiles())
, column_files_count(column_files.size())
{}
Expand Down
17 changes: 12 additions & 5 deletions dbms/src/Storages/DeltaMerge/Delta/DeltaValueSpace.h
Original file line number Diff line number Diff line change
Expand Up @@ -446,11 +446,12 @@ class DeltaValueReader
const DMContext & context_,
const DeltaSnapshotPtr & delta_snap_,
const ColumnDefinesPtr & col_defs_,
const RowKeyRange & segment_range_);
const RowKeyRange & segment_range_,
ReadTag read_tag_);

// If we need to read columns besides pk and version, a DeltaValueReader can NOT be used more than once.
// This method creates a new reader based on the current one. It will reuse some caches in the current reader.
DeltaValueReaderPtr createNewReader(const ColumnDefinesPtr & new_col_defs);
DeltaValueReaderPtr createNewReader(const ColumnDefinesPtr & new_col_defs, ReadTag read_tag);

void setDeltaIndex(const DeltaIndexCompactedPtr & delta_index_) { compacted_delta_index = delta_index_; }

Expand Down Expand Up @@ -492,9 +493,15 @@ class DeltaValueInputStream : public SkippableBlockInputStream
const DMContext & context_,
const DeltaSnapshotPtr & delta_snap_,
const ColumnDefinesPtr & col_defs_,
const RowKeyRange & segment_range_)
: mem_table_input_stream(context_, delta_snap_->getMemTableSetSnapshot(), col_defs_, segment_range_)
, persisted_files_input_stream(context_, delta_snap_->getPersistedFileSetSnapshot(), col_defs_, segment_range_)
const RowKeyRange & segment_range_,
ReadTag read_tag_)
: mem_table_input_stream(context_, delta_snap_->getMemTableSetSnapshot(), col_defs_, segment_range_, read_tag_)
, persisted_files_input_stream(
context_,
delta_snap_->getPersistedFileSetSnapshot(),
col_defs_,
segment_range_,
read_tag_)
{}

String getName() const override { return "DeltaValue"; }
Expand Down
15 changes: 9 additions & 6 deletions dbms/src/Storages/DeltaMerge/Delta/Snapshot.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,29 +71,32 @@ DeltaValueReader::DeltaValueReader(
const DMContext & context,
const DeltaSnapshotPtr & delta_snap_,
const ColumnDefinesPtr & col_defs_,
const RowKeyRange & segment_range_)
const RowKeyRange & segment_range_,
ReadTag read_tag_)
: delta_snap(delta_snap_)
, mem_table_reader(std::make_shared<ColumnFileSetReader>(
context,
delta_snap_->getMemTableSetSnapshot(),
col_defs_,
segment_range_))
segment_range_,
read_tag_))
, persisted_files_reader(std::make_shared<ColumnFileSetReader>(
context,
delta_snap_->getPersistedFileSetSnapshot(),
col_defs_,
segment_range_))
segment_range_,
read_tag_))
, col_defs(col_defs_)
, segment_range(segment_range_)
{}

DeltaValueReaderPtr DeltaValueReader::createNewReader(const ColumnDefinesPtr & new_col_defs)
DeltaValueReaderPtr DeltaValueReader::createNewReader(const ColumnDefinesPtr & new_col_defs, ReadTag read_tag)
{
auto * new_reader = new DeltaValueReader();
new_reader->delta_snap = delta_snap;
new_reader->compacted_delta_index = compacted_delta_index;
new_reader->persisted_files_reader = persisted_files_reader->createNewReader(new_col_defs);
new_reader->mem_table_reader = mem_table_reader->createNewReader(new_col_defs);
new_reader->persisted_files_reader = persisted_files_reader->createNewReader(new_col_defs, read_tag);
new_reader->mem_table_reader = mem_table_reader->createNewReader(new_col_defs, read_tag);
new_reader->col_defs = new_col_defs;
new_reader->segment_range = segment_range;

Expand Down
3 changes: 2 additions & 1 deletion dbms/src/Storages/DeltaMerge/File/DMFileBlockInputStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,8 @@ DMFileBlockInputStreamPtr DMFileBlockInputStreamBuilder::build(
read_one_pack_every_time,
tracing_id,
max_sharing_column_count,
scan_context);
scan_context,
read_tag);

return std::make_shared<DMFileBlockInputStream>(std::move(reader), max_sharing_column_count > 0);
}
Expand Down
7 changes: 7 additions & 0 deletions dbms/src/Storages/DeltaMerge/File/DMFileBlockInputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,12 @@ class DMFileBlockInputStreamBuilder
return *this;
}

/// Sets the `ReadTag` that `build()` forwards to the `DMFileReader` it constructs.
/// When this setter is never called the builder keeps its default, `ReadTag::Internal`.
/// Returns `*this` so calls can be chained.
DMFileBlockInputStreamBuilder & setReadTag(ReadTag read_tag_)
{
read_tag = read_tag_;
return *this;
}

private:
// These methods are called by the ctor

Expand Down Expand Up @@ -194,6 +200,7 @@ class DMFileBlockInputStreamBuilder
size_t max_sharing_column_bytes_for_all = 0;
size_t max_sharing_column_count = 0;
String tracing_id;
ReadTag read_tag = ReadTag::Internal;
};

/**
Expand Down
49 changes: 42 additions & 7 deletions dbms/src/Storages/DeltaMerge/File/DMFileReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,8 @@ DMFileReader::DMFileReader(
bool read_one_pack_every_time_,
const String & tracing_id_,
size_t max_sharing_column_count,
const ScanContextPtr & scan_context_)
const ScanContextPtr & scan_context_,
const ReadTag read_tag_)
: dmfile(dmfile_)
, read_columns(read_columns_)
, is_common_handle(is_common_handle_)
Expand All @@ -81,6 +82,7 @@ DMFileReader::DMFileReader(
, mark_cache(mark_cache_)
, column_cache(column_cache_)
, scan_context(scan_context_)
, read_tag(read_tag_)
, rows_threshold_per_read(rows_threshold_per_read_)
, file_provider(file_provider_)
, log(Logger::get(tracing_id_))
Expand Down Expand Up @@ -130,8 +132,7 @@ bool DMFileReader::getSkippedRows(size_t & skip_rows)
for (; next_pack_id < use_packs.size() && !use_packs[next_pack_id]; ++next_pack_id)
{
skip_rows += pack_stats[next_pack_id].rows;
scan_context->total_dmfile_skipped_packs += 1;
scan_context->total_dmfile_skipped_rows += pack_stats[next_pack_id].rows;
addSkippedRows(pack_stats[next_pack_id].rows);
}
next_row_offset += skip_rows;
// return false if it is the end of stream.
Expand Down Expand Up @@ -165,10 +166,9 @@ size_t DMFileReader::skipNextBlock()
break;

read_rows += pack_stats[next_pack_id].rows;
scan_context->total_dmfile_skipped_packs += 1;
}

scan_context->total_dmfile_skipped_rows += read_rows;
addSkippedRows(read_rows);
next_row_offset += read_rows;

// When we read dmfile, if the previous pack is not read,
Expand Down Expand Up @@ -357,8 +357,7 @@ Block DMFileReader::read()

size_t read_packs = next_pack_id - start_pack_id;

scan_context->total_dmfile_scanned_packs += read_packs;
scan_context->total_dmfile_scanned_rows += read_rows;
addScannedRows(read_rows);

// TODO: this will need better algorithm: we should separate those packs which can and can not do clean read.
bool do_clean_read_on_normal_mode
Expand Down Expand Up @@ -639,4 +638,40 @@ bool DMFileReader::getCachedPacks(
col_data_cache->del(col_id, next_pack_id);
return found;
}

/// Adds `rows` to the scanned-rows counter of `scan_context` that matches this reader's `read_tag`.
/// Only `Query`, `MVCC` and `LMFilter` reads are accounted; any other tag leaves every counter untouched.
void DMFileReader::addScannedRows(UInt64 rows)
{
    if (read_tag == ReadTag::Query)
    {
        scan_context->dmfile_data_scanned_rows += rows;
    }
    else if (read_tag == ReadTag::MVCC)
    {
        scan_context->dmfile_mvcc_scanned_rows += rows;
    }
    else if (read_tag == ReadTag::LMFilter)
    {
        scan_context->dmfile_lm_filter_scanned_rows += rows;
    }
    // Remaining tags are intentionally not recorded.
}

/// Adds `rows` to the skipped-rows counter of `scan_context` that matches this reader's `read_tag`.
/// Only `Query`, `MVCC` and `LMFilter` reads are accounted; any other tag leaves every counter untouched.
void DMFileReader::addSkippedRows(UInt64 rows)
{
    if (read_tag == ReadTag::Query)
    {
        scan_context->dmfile_data_skipped_rows += rows;
    }
    else if (read_tag == ReadTag::MVCC)
    {
        scan_context->dmfile_mvcc_skipped_rows += rows;
    }
    else if (read_tag == ReadTag::LMFilter)
    {
        scan_context->dmfile_lm_filter_skipped_rows += rows;
    }
    // Remaining tags are intentionally not recorded.
}
} // namespace DB::DM
Loading