Skip to content

Commit

Permalink
Storages: Fix statistical data of DMFilePackFilter (#9254)
Browse files Browse the repository at this point in the history
ref #9103

Co-authored-by: Lloyd-Pottiger <[email protected]>
  • Loading branch information
JinheLin and Lloyd-Pottiger authored Jul 25, 2024
1 parent 335522f commit a4f0ec6
Show file tree
Hide file tree
Showing 6 changed files with 42 additions and 21 deletions.
3 changes: 2 additions & 1 deletion dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileBig.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,8 @@ void ColumnFileBig::calculateStat(const DMContext & dm_context)
dm_context.global_context.getFileProvider(),
dm_context.getReadLimiter(),
dm_context.scan_context,
/*tracing_id*/ dm_context.tracing_id);
/*tracing_id*/ dm_context.tracing_id,
ReadTag::Internal);

std::tie(valid_rows, valid_bytes) = pack_filter.validRowsAndBytes();
}
Expand Down
3 changes: 2 additions & 1 deletion dbms/src/Storages/DeltaMerge/File/DMFileBlockInputStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,8 @@ DMFileBlockInputStreamPtr DMFileBlockInputStreamBuilder::build(
file_provider,
read_limiter,
scan_context,
tracing_id);
tracing_id,
read_tag);

bool enable_read_thread = SegmentReaderPoolManager::instance().isSegmentReader();

Expand Down
22 changes: 16 additions & 6 deletions dbms/src/Storages/DeltaMerge/File/DMFilePackFilter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
namespace DB::DM
{

void DMFilePackFilter::init()
void DMFilePackFilter::init(ReadTag read_tag)
{
Stopwatch watch;
SCOPE_EXIT({ scan_context->total_rs_pack_filter_check_time_ns += watch.elapsed(); });
Expand Down Expand Up @@ -85,9 +85,18 @@ void DMFilePackFilter::init()
auto [none_count, some_count, all_count] = countPackRes();
auto after_filter = some_count + all_count;
ProfileEvents::increment(ProfileEvents::DMFileFilterAftRoughSet, after_filter);
scan_context->rs_pack_filter_none += none_count;
scan_context->rs_pack_filter_some += some_count;
scan_context->rs_pack_filter_all += all_count;
// During a table scan, the DMFilePackFilter of a DMFile may be created several times:
// 1. When building the MVCC bitmap (ReadTag::MVCC).
// 2. When building the LM filter stream (ReadTag::LM).
// 3. When building the stream of the other columns (ReadTag::Query).
// The filter result only needs to be counted once, so it is counted only for ReadTag::Query.
// TODO: Create the DMFilePackFilter once at the beginning and pass it to the stages described above.
if (read_tag == ReadTag::Query)
{
scan_context->rs_pack_filter_none += none_count;
scan_context->rs_pack_filter_some += some_count;
scan_context->rs_pack_filter_all += all_count;
}

Float64 filter_rate = 0.0;
if (after_read_packs != 0)
Expand All @@ -98,7 +107,7 @@ void DMFilePackFilter::init()
LOG_DEBUG(
log,
"RSFilter exclude rate: {:.2f}, after_pk: {}, after_read_packs: {}, after_filter: {}, handle_ranges: {}"
", read_packs: {}, pack_count: {}, none_count: {}, some_count: {}, all_count: {}",
", read_packs: {}, pack_count: {}, none_count: {}, some_count: {}, all_count: {}, read_tag: {}",
((after_read_packs == 0) ? std::numeric_limits<double>::quiet_NaN() : filter_rate),
after_pk,
after_read_packs,
Expand All @@ -108,7 +117,8 @@ void DMFilePackFilter::init()
pack_count,
none_count,
some_count,
all_count);
all_count,
magic_enum::enum_name(read_tag));
}

std::tuple<UInt64, UInt64, UInt64> DMFilePackFilter::countPackRes() const
Expand Down
20 changes: 12 additions & 8 deletions dbms/src/Storages/DeltaMerge/File/DMFilePackFilter.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include <Storages/DeltaMerge/File/DMFilePackFilter_fwd.h>
#include <Storages/DeltaMerge/Filter/FilterHelper.h>
#include <Storages/DeltaMerge/Filter/RSOperator.h>
#include <Storages/DeltaMerge/ReadMode.h>
#include <Storages/DeltaMerge/RowKeyRange.h>
#include <Storages/DeltaMerge/ScanContext_fwd.h>
#include <Storages/S3/S3Common.h>
Expand Down Expand Up @@ -51,9 +52,10 @@ class DMFilePackFilter
const FileProviderPtr & file_provider,
const ReadLimiterPtr & read_limiter,
const ScanContextPtr & scan_context,
const String & tracing_id)
const String & tracing_id,
const ReadTag read_tag)
{
auto pack_filter = DMFilePackFilter(
return DMFilePackFilter(
dmfile,
index_cache,
set_cache_if_miss,
Expand All @@ -63,9 +65,8 @@ class DMFilePackFilter
file_provider,
read_limiter,
scan_context,
tracing_id);
pack_filter.init();
return pack_filter;
tracing_id,
read_tag);
}

const RSResults & getHandleRes() const { return handle_res; }
Expand Down Expand Up @@ -125,7 +126,8 @@ class DMFilePackFilter
const FileProviderPtr & file_provider_,
const ReadLimiterPtr & read_limiter_,
const ScanContextPtr & scan_context_,
const String & tracing_id)
const String & tracing_id,
const ReadTag read_tag)
: dmfile(dmfile_)
, index_cache(index_cache_)
, set_cache_if_miss(set_cache_if_miss_)
Expand All @@ -137,9 +139,11 @@ class DMFilePackFilter
, scan_context(scan_context_)
, log(Logger::get(tracing_id))
, read_limiter(read_limiter_)
{}
{
init(read_tag);
}

void init();
void init(ReadTag read_tag);

static void loadIndex(
ColumnIndexes & indexes,
Expand Down
3 changes: 2 additions & 1 deletion dbms/src/Storages/DeltaMerge/Segment.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2944,7 +2944,8 @@ std::pair<std::vector<Range>, std::vector<IdSetPtr>> parseDMFilePackInfo(
dm_context.global_context.getFileProvider(),
dm_context.global_context.getReadLimiter(),
dm_context.scan_context,
dm_context.tracing_id);
dm_context.tracing_id,
ReadTag::MVCC);
const auto & pack_res = pack_filter.getPackResConst();
const auto & handle_res = pack_filter.getHandleRes();
const auto & pack_stats = dmfile->getPackStats();
Expand Down
12 changes: 8 additions & 4 deletions dbms/src/Storages/DeltaMerge/StableValueSpace.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,8 @@ void StableValueSpace::setFiles(const DMFiles & files_, const RowKeyRange & rang
dm_context->global_context.getFileProvider(),
dm_context->getReadLimiter(),
dm_context->scan_context,
dm_context->tracing_id);
dm_context->tracing_id,
ReadTag::Internal);
auto [file_valid_rows, file_valid_bytes] = pack_filter.validRowsAndBytes();
rows += file_valid_rows;
bytes += file_valid_bytes;
Expand Down Expand Up @@ -439,7 +440,8 @@ void StableValueSpace::calculateStableProperty(
context.global_context.getFileProvider(),
context.getReadLimiter(),
context.scan_context,
context.tracing_id);
context.tracing_id,
ReadTag::Internal);
const auto & pack_res = pack_filter.getPackResConst();
size_t new_pack_properties_index = 0;
const bool use_new_pack_properties = pack_properties.property_size() == 0;
Expand Down Expand Up @@ -588,7 +590,8 @@ RowsAndBytes StableValueSpace::Snapshot::getApproxRowsAndBytes(const DMContext &
context.global_context.getFileProvider(),
context.getReadLimiter(),
context.scan_context,
context.tracing_id);
context.tracing_id,
ReadTag::Internal);
const auto & pack_stats = f->getPackStats();
const auto & pack_res = filter.getPackResConst();
for (size_t i = 0; i < pack_stats.size(); ++i)
Expand Down Expand Up @@ -633,7 +636,8 @@ StableValueSpace::Snapshot::getAtLeastRowsAndBytes(const DMContext & context, co
context.global_context.getFileProvider(),
context.getReadLimiter(),
context.scan_context,
context.tracing_id);
context.tracing_id,
ReadTag::Internal);
const auto & handle_filter_result = filter.getHandleRes();
if (file_idx == 0)
{
Expand Down

0 comments on commit a4f0ec6

Please sign in to comment.