From 76f9e05f540e8cc6ee081359f80e7898e339f319 Mon Sep 17 00:00:00 2001 From: jinhelin Date: Sun, 28 Apr 2024 15:21:29 +0800 Subject: [PATCH 1/5] Storages: Change the element of `use_pack` from `bool` to `RSResult` in DMFilePackFilter --- .../gtest_ti_remote_block_inputstream.cpp | 2 +- .../Storages/DeltaMerge/File/ColumnStream.cpp | 6 +- .../DeltaMerge/File/DMFilePackFilter.cpp | 83 ++++++++++++------- .../DeltaMerge/File/DMFilePackFilter.h | 17 ++-- .../Storages/DeltaMerge/File/DMFileReader.cpp | 38 ++++----- dbms/src/Storages/DeltaMerge/Index/RSResult.h | 4 + dbms/src/Storages/DeltaMerge/ScanContext.cpp | 7 ++ dbms/src/Storages/DeltaMerge/ScanContext.h | 22 +++-- dbms/src/Storages/DeltaMerge/Segment.cpp | 4 +- .../Storages/DeltaMerge/StableValueSpace.cpp | 13 +-- .../DeltaMerge/tests/gtest_dm_file.cpp | 16 ++-- 11 files changed, 128 insertions(+), 84 deletions(-) diff --git a/dbms/src/Flash/Coprocessor/tests/gtest_ti_remote_block_inputstream.cpp b/dbms/src/Flash/Coprocessor/tests/gtest_ti_remote_block_inputstream.cpp index 9448bd8477b..0aa6d59dd93 100644 --- a/dbms/src/Flash/Coprocessor/tests/gtest_ti_remote_block_inputstream.cpp +++ b/dbms/src/Flash/Coprocessor/tests/gtest_ti_remote_block_inputstream.cpp @@ -78,7 +78,7 @@ struct MockWriter summary.scan_context->dmfile_mvcc_skipped_rows = 15000; summary.scan_context->dmfile_lm_filter_scanned_rows = 8000; summary.scan_context->dmfile_lm_filter_skipped_rows = 15000; - summary.scan_context->total_dmfile_rough_set_index_check_time_ns = 10; + summary.scan_context->total_rs_pack_filter_check_time_ns = 10; summary.scan_context->total_dmfile_read_time_ns = 200; summary.scan_context->create_snapshot_time_ns = 5; summary.scan_context->total_local_region_num = 10; diff --git a/dbms/src/Storages/DeltaMerge/File/ColumnStream.cpp b/dbms/src/Storages/DeltaMerge/File/ColumnStream.cpp index 381df2dbd8e..6ee1ebdbae1 100644 --- a/dbms/src/Storages/DeltaMerge/File/ColumnStream.cpp +++ b/dbms/src/Storages/DeltaMerge/File/ColumnStream.cpp @@ -157,10 +157,10 @@ std::unique_ptr ColumnReadStream::buildColDataRe // Try to get the largest buffer size of reading continuous packs size_t buffer_size = 0; - const auto & use_packs = reader.pack_filter.getUsePacksConst(); + const auto & pack_res = reader.pack_filter.getPackResConst(); for (size_t i = 0; i < n_packs; /*empty*/) { - if (!use_packs[i]) + if (!isUse(pack_res[i])) { ++i; continue; @@ -168,7 +168,7 @@ std::unique_ptr ColumnReadStream::buildColDataRe size_t cur_offset_in_file = getOffsetInFile(i); size_t end = i + 1; // First, find the end of current available range. - while (end < n_packs && use_packs[end]) + while (end < n_packs && isUse(pack_res[end])) ++end; // Second, if the end of range is inside the block, we will need to read it too. diff --git a/dbms/src/Storages/DeltaMerge/File/DMFilePackFilter.cpp b/dbms/src/Storages/DeltaMerge/File/DMFilePackFilter.cpp index 849b3ccff25..36034cb2a4b 100644 --- a/dbms/src/Storages/DeltaMerge/File/DMFilePackFilter.cpp +++ b/dbms/src/Storages/DeltaMerge/File/DMFilePackFilter.cpp @@ -21,6 +21,8 @@ namespace DB::DM void DMFilePackFilter::init() { + Stopwatch watch; + SCOPE_EXIT({ scan_context->total_rs_pack_filter_check_time_ns += watch.elapsed(); }); size_t pack_count = dmfile->getPacks(); auto read_all_packs = (rowkey_ranges.size() == 1 && rowkey_ranges[0].all()) || rowkey_ranges.empty(); if (!read_all_packs) @@ -47,33 +49,21 @@ void DMFilePackFilter::init() ProfileEvents::increment(ProfileEvents::DMFileFilterNoFilter, pack_count); - size_t after_pk = 0; - size_t after_read_packs = 0; - size_t after_filter = 0; - /// Check packs by handle_res - for (size_t i = 0; i < pack_count; ++i) - { - use_packs[i] = handle_res[i] != None; - } - - for (auto u : use_packs) - after_pk += u; + pack_res = handle_res; + auto after_pk = countUsePack(); /// Check packs by read_packs if (read_packs) { for (size_t i = 0; i < pack_count; ++i) { - use_packs[i] = (static_cast(use_packs[i])) && read_packs->contains(i); + pack_res[i] = pack_res[i] && (read_packs->contains(i) ? RSResult::Some : RSResult::None); } } - - for (auto u : use_packs) - after_read_packs += u; + auto after_read_packs = countUsePack(); ProfileEvents::increment(ProfileEvents::DMFileFilterAftPKAndPackSet, after_read_packs); - /// Check packs by filter in where clause if (filter) { @@ -84,20 +74,20 @@ void DMFilePackFilter::init() tryLoadIndex(id); } - Stopwatch watch; const auto check_results = filter->roughCheck(0, pack_count, param); std::transform( - use_packs.begin(), - use_packs.end(), - check_results.begin(), - use_packs.begin(), - [](UInt8 a, RSResult b) { return (static_cast(a)) && (b != None); }); - scan_context->total_dmfile_rough_set_index_check_time_ns += watch.elapsed(); + pack_res.cbegin(), + pack_res.cend(), + check_results.cbegin(), + pack_res.begin(), + [](RSResult a, RSResult b) { return a && b; }); } - - for (auto u : use_packs) - after_filter += u; + auto [none_count, some_count, all_count] = countPackRes(); + auto after_filter = some_count + all_count; ProfileEvents::increment(ProfileEvents::DMFileFilterAftRoughSet, after_filter); + scan_context->rs_pack_filter_none += none_count; + scan_context->rs_pack_filter_some += some_count; + scan_context->rs_pack_filter_all += all_count; Float64 filter_rate = 0.0; if (after_read_packs != 0) @@ -108,14 +98,47 @@ void DMFilePackFilter::init() LOG_DEBUG( log, "RSFilter exclude rate: {:.2f}, after_pk: {}, after_read_packs: {}, after_filter: {}, handle_ranges: {}" - ", read_packs: {}, pack_count: {}", + ", read_packs: {}, pack_count: {}, none_count: {}, some_count: {}, all_count: {}", ((after_read_packs == 0) ? std::numeric_limits::quiet_NaN() : filter_rate), after_pk, after_read_packs, after_filter, toDebugString(rowkey_ranges), ((read_packs == nullptr) ? 0 : read_packs->size()), - pack_count); + pack_count, + none_count, + some_count, + all_count); +} + +std::tuple DMFilePackFilter::countPackRes() const +{ + UInt64 none_count = 0; + UInt64 some_count = 0; + UInt64 all_count = 0; + for (auto res : pack_res) + { + switch (res) + { + case RSResult::None: + ++none_count; + break; + case RSResult::Some: + ++some_count; + break; + case RSResult::All: + ++all_count; + break; + default: + throw Exception(ErrorCodes::LOGICAL_ERROR, "{} is invalid", magic_enum::enum_name(res)); + } + } + return {none_count, some_count, all_count}; +} + +UInt64 DMFilePackFilter::countUsePack() const +{ + return std::count_if(pack_res.cbegin(), pack_res.cend(), [](RSResult res) { return isUse(res); }); } void DMFilePackFilter::loadIndex( @@ -224,7 +247,7 @@ void DMFilePackFilter::loadIndex( indexes.emplace(col_id, RSIndex(type, minmax_index)); } -void DMFilePackFilter::tryLoadIndex(const ColId col_id) +void DMFilePackFilter::tryLoadIndex(ColId col_id) { if (param.indexes.count(col_id)) return; @@ -234,8 +257,6 @@ void DMFilePackFilter::tryLoadIndex(const ColId col_id) Stopwatch watch; loadIndex(param.indexes, dmfile, file_provider, index_cache, set_cache_if_miss, col_id, read_limiter, scan_context); - - scan_context->total_dmfile_rough_set_index_check_time_ns += watch.elapsed(); } } // namespace DB::DM diff --git a/dbms/src/Storages/DeltaMerge/File/DMFilePackFilter.h b/dbms/src/Storages/DeltaMerge/File/DMFilePackFilter.h index 2a227a0dc39..f4ba53d0712 100644 --- a/dbms/src/Storages/DeltaMerge/File/DMFilePackFilter.h +++ b/dbms/src/Storages/DeltaMerge/File/DMFilePackFilter.h @@ -68,9 +68,10 @@ class DMFilePackFilter return pack_filter; } - inline const std::vector & getHandleRes() const { return handle_res; } - inline const std::vector & getUsePacksConst() const { return use_packs; } - inline std::vector & getUsePacks() { return use_packs; } + const RSResults & getHandleRes() const { return handle_res; } + const RSResults & getPackResConst() const { return pack_res; } + RSResults & getPackRes() { return pack_res; } + UInt64 countUsePack() const; Handle getMinHandle(size_t pack_id) { @@ -104,7 +105,7 @@ class DMFilePackFilter const auto & pack_stats = dmfile->getPackStats(); for (size_t i = 0; i < pack_stats.size(); ++i) { - if (use_packs[i]) + if (isUse(pack_res[i])) { rows += pack_stats[i].rows; bytes += pack_stats[i].bytes; @@ -133,7 +134,6 @@ class DMFilePackFilter , read_packs(read_packs_) , file_provider(file_provider_) , handle_res(dmfile->getPacks(), RSResult::All) - , use_packs(dmfile->getPacks()) , scan_context(scan_context_) , log(Logger::get(tracing_id)) , read_limiter(read_limiter_) @@ -151,7 +151,10 @@ class DMFilePackFilter const ReadLimiterPtr & read_limiter, const ScanContextPtr & scan_context); - void tryLoadIndex(const ColId col_id); + void tryLoadIndex(ColId col_id); + + // None, Some, All + std::tuple countPackRes() const; private: DMFilePtr dmfile; @@ -165,7 +168,7 @@ class DMFilePackFilter RSCheckParam param; std::vector handle_res; - std::vector use_packs; + std::vector pack_res; const ScanContextPtr scan_context; diff --git a/dbms/src/Storages/DeltaMerge/File/DMFileReader.cpp b/dbms/src/Storages/DeltaMerge/File/DMFileReader.cpp index 33452997292..027f790d227 100644 --- a/dbms/src/Storages/DeltaMerge/File/DMFileReader.cpp +++ b/dbms/src/Storages/DeltaMerge/File/DMFileReader.cpp @@ -108,16 +108,16 @@ DMFileReader::DMFileReader( bool DMFileReader::getSkippedRows(size_t & skip_rows) { skip_rows = 0; - const auto & use_packs = pack_filter.getUsePacksConst(); + const auto & pack_res = pack_filter.getPackResConst(); const auto & pack_stats = dmfile->getPackStats(); - for (; next_pack_id < use_packs.size() && !use_packs[next_pack_id]; ++next_pack_id) + for (; next_pack_id < pack_res.size() && !isUse(pack_res[next_pack_id]); ++next_pack_id) { skip_rows += pack_stats[next_pack_id].rows; addSkippedRows(pack_stats[next_pack_id].rows); } next_row_offset += skip_rows; // return false if it is the end of stream. - return next_pack_id < use_packs.size(); + return next_pack_id < pack_res.size(); } // Skip the block which should be returned by next read() @@ -144,14 +144,14 @@ size_t DMFileReader::skipNextBlock() // Move forward next_pack_id and next_row_offset size_t DMFileReader::getReadRows() { - const auto & use_packs = pack_filter.getUsePacksConst(); + const auto & pack_res = pack_filter.getPackResConst(); const size_t start_pack_id = next_pack_id; // When read_one_pack_every_time is true, we can just read one pack every time. // std::numeric_limits::max() means no limit const size_t read_pack_limit = read_one_pack_every_time ? 1 : std::numeric_limits::max(); const auto & pack_stats = dmfile->getPackStats(); size_t read_rows = 0; - for (; next_pack_id < use_packs.size() && use_packs[next_pack_id] && read_rows < rows_threshold_per_read; + for (; next_pack_id < pack_res.size() && isUse(pack_res[next_pack_id]) && read_rows < rows_threshold_per_read; ++next_pack_id) { if (next_pack_id - start_pack_id >= read_pack_limit) @@ -172,10 +172,10 @@ Block DMFileReader::readWithFilter(const IColumn::Filter & filter) return {}; } - /// 2. Mark use_packs[i] = false if all rows in the i-th pack are filtered out by filter. + /// 2. Mark pack_res[i] = None if all rows in the i-th pack are filtered out by filter. const auto & pack_stats = dmfile->getPackStats(); - auto & use_packs = pack_filter.getUsePacks(); + auto & pack_res = pack_filter.getPackRes(); size_t start_row_offset = next_row_offset; size_t start_pack_id = next_pack_id; @@ -187,18 +187,18 @@ Block DMFileReader::readWithFilter(const IColumn::Filter & filter) for (size_t i = start_pack_id; i < last_pack_id; ++i) { if (countBytesInFilter(filter, offset, pack_stats[i].rows) == 0) - use_packs[i] = false; + pack_res[i] = RSResult::None; offset += pack_stats[i].rows; } } - /// 3. Mark the use_packs[last_pack_id] as false temporarily to avoid reading it and its following packs in this round + /// 3. Mark the pack_res[last_pack_id] as None temporarily to avoid reading it and its following packs in this round - bool next_pack_id_use_packs_cp = false; - if (last_pack_id < use_packs.size()) + auto next_pack_id_pack_res_cp = RSResult::None; + if (last_pack_id < pack_res.size()) { - next_pack_id_use_packs_cp = use_packs[last_pack_id]; - use_packs[last_pack_id] = false; + next_pack_id_pack_res_cp = pack_res[last_pack_id]; + pack_res[last_pack_id] = RSResult::None; } /// 4. Read and filter packs @@ -221,11 +221,11 @@ Block DMFileReader::readWithFilter(const IColumn::Filter & filter) { // When the next pack is not used or the pack is the last pack, call read() to read theses packs and filter them // For example: - // When next_pack_id_cp = use_packs.size() and use_packs[next_pack_id:next_pack_id_cp] = [true, true, false, true, true, true] + // When next_pack_id_cp = pack_res.size() and pack_res[next_pack_id:next_pack_id_cp] = [true, true, false, true, true, true] // The algorithm runs as follows: // When i = next_pack_id + 2, call read() to read {next_pack_id, next_pack_id + 1}th packs // When i = next_pack_id + 5, call read() to read {next_pack_id + 3, next_pack_id + 4, next_pack_id + 5}th packs - if (use_packs[pack_id] && (pack_id + 1 == use_packs.size() || !use_packs[pack_id + 1])) + if (isUse(pack_res[pack_id]) && (pack_id + 1 == pack_res.size() || !isUse(pack_res[pack_id + 1]))) { Block block = read(); size_t rows = block.rows(); @@ -256,16 +256,16 @@ Block DMFileReader::readWithFilter(const IColumn::Filter & filter) } offset += rows; } - else if (!use_packs[pack_id]) + else if (!isUse(pack_res[pack_id])) { offset += pack_stats[pack_id].rows; } } - /// 5. Restore the use_packs[last_pack_id] + /// 5. Restore the pack_res[last_pack_id] - if (last_pack_id < use_packs.size()) - use_packs[last_pack_id] = next_pack_id_use_packs_cp; + if (last_pack_id < pack_res.size()) + pack_res[last_pack_id] = next_pack_id_pack_res_cp; Block res = getHeader().cloneWithColumns(std::move(columns)); res.setStartOffset(start_row_offset); diff --git a/dbms/src/Storages/DeltaMerge/Index/RSResult.h b/dbms/src/Storages/DeltaMerge/Index/RSResult.h index 9617f900dad..a09cb65c5a4 100644 --- a/dbms/src/Storages/DeltaMerge/Index/RSResult.h +++ b/dbms/src/Storages/DeltaMerge/Index/RSResult.h @@ -75,6 +75,10 @@ inline RSResult operator&&(RSResult v0, RSResult v1) return Some; } +inline bool isUse(RSResult res) noexcept +{ + return res != RSResult::None; +} } // namespace DM } // namespace DB \ No newline at end of file diff --git a/dbms/src/Storages/DeltaMerge/ScanContext.cpp b/dbms/src/Storages/DeltaMerge/ScanContext.cpp index 8dae573e5d8..5a5b126a40e 100644 --- a/dbms/src/Storages/DeltaMerge/ScanContext.cpp +++ b/dbms/src/Storages/DeltaMerge/ScanContext.cpp @@ -104,6 +104,13 @@ String ScanContext::toJson() const json->set("dmfile_lm_filter_skipped_rows", dmfile_lm_filter_skipped_rows.load()); json->set("dmfile_read_time", fmt::format("{:.3f}ms", total_dmfile_read_time_ns.load() / NS_TO_MS_SCALE)); + json->set( + "rs_pack_filter_check_time", + fmt::format("{:.3f}ms", total_rs_pack_filter_check_time_ns.load() / NS_TO_MS_SCALE)); + json->set("rs_pack_filter_none", rs_pack_filter_none.load()); + json->set("rs_pack_filter_some", rs_pack_filter_some.load()); + json->set("rs_pack_filter_all", rs_pack_filter_all.load()); + json->set("num_remote_region", total_remote_region_num.load()); json->set("num_local_region", total_local_region_num.load()); json->set("num_stale_read", num_stale_read.load()); diff --git a/dbms/src/Storages/DeltaMerge/ScanContext.h b/dbms/src/Storages/DeltaMerge/ScanContext.h index 78321dd8737..fdf26eb5200 100644 --- a/dbms/src/Storages/DeltaMerge/ScanContext.h +++ b/dbms/src/Storages/DeltaMerge/ScanContext.h @@ -40,10 +40,13 @@ class ScanContext std::atomic dmfile_mvcc_skipped_rows{0}; std::atomic dmfile_lm_filter_scanned_rows{0}; std::atomic dmfile_lm_filter_skipped_rows{0}; - - std::atomic total_dmfile_rough_set_index_check_time_ns{0}; std::atomic total_dmfile_read_time_ns{0}; + std::atomic total_rs_pack_filter_check_time_ns{0}; + std::atomic rs_pack_filter_none{0}; + std::atomic rs_pack_filter_some{0}; + std::atomic rs_pack_filter_all{0}; + std::atomic total_remote_region_num{0}; std::atomic total_local_region_num{0}; std::atomic num_stale_read{0}; @@ -75,7 +78,6 @@ class ScanContext std::atomic mvcc_output_rows{0}; std::atomic late_materialization_skip_rows{0}; - // TODO: filter // Learner read std::atomic learner_read_ns{0}; // Create snapshot from PageStorage @@ -98,7 +100,8 @@ class ScanContext dmfile_mvcc_skipped_rows = tiflash_scan_context_pb.dmfile_mvcc_skipped_rows(); dmfile_lm_filter_scanned_rows = tiflash_scan_context_pb.dmfile_lm_filter_scanned_rows(); dmfile_lm_filter_skipped_rows = tiflash_scan_context_pb.dmfile_lm_filter_skipped_rows(); - total_dmfile_rough_set_index_check_time_ns = tiflash_scan_context_pb.total_dmfile_rs_check_ms() * 1000000; + total_rs_pack_filter_check_time_ns = tiflash_scan_context_pb.total_dmfile_rs_check_ms() * 1000000; + // TODO: rs_pack_filter_none, rs_pack_filter_some, rs_pack_filter_all total_dmfile_read_time_ns = tiflash_scan_context_pb.total_dmfile_read_ms() * 1000000; create_snapshot_time_ns = tiflash_scan_context_pb.total_build_snapshot_ms() * 1000000; total_remote_region_num = tiflash_scan_context_pb.remote_regions(); @@ -140,7 +143,8 @@ class ScanContext tiflash_scan_context_pb.set_dmfile_mvcc_skipped_rows(dmfile_mvcc_skipped_rows); tiflash_scan_context_pb.set_dmfile_lm_filter_scanned_rows(dmfile_lm_filter_scanned_rows); tiflash_scan_context_pb.set_dmfile_lm_filter_skipped_rows(dmfile_lm_filter_skipped_rows); - tiflash_scan_context_pb.set_total_dmfile_rs_check_ms(total_dmfile_rough_set_index_check_time_ns / 1000000); + tiflash_scan_context_pb.set_total_dmfile_rs_check_ms(total_rs_pack_filter_check_time_ns / 1000000); + // TODO: pack_filter_none, pack_filter_some, pack_filter_all tiflash_scan_context_pb.set_total_dmfile_read_ms(total_dmfile_read_time_ns / 1000000); tiflash_scan_context_pb.set_total_build_snapshot_ms(create_snapshot_time_ns / 1000000); tiflash_scan_context_pb.set_remote_regions(total_remote_region_num); @@ -182,7 +186,10 @@ class ScanContext dmfile_mvcc_skipped_rows += other.dmfile_mvcc_skipped_rows; dmfile_lm_filter_scanned_rows += other.dmfile_lm_filter_scanned_rows; dmfile_lm_filter_skipped_rows += other.dmfile_lm_filter_skipped_rows; - total_dmfile_rough_set_index_check_time_ns += other.total_dmfile_rough_set_index_check_time_ns; + total_rs_pack_filter_check_time_ns += other.total_rs_pack_filter_check_time_ns; + rs_pack_filter_none += other.rs_pack_filter_none; + rs_pack_filter_some += other.rs_pack_filter_some; + rs_pack_filter_all += other.rs_pack_filter_all; total_dmfile_read_time_ns += other.total_dmfile_read_time_ns; total_local_region_num += other.total_local_region_num; @@ -227,7 +234,8 @@ class ScanContext dmfile_mvcc_skipped_rows += other.dmfile_mvcc_skipped_rows(); dmfile_lm_filter_scanned_rows += other.dmfile_lm_filter_scanned_rows(); dmfile_lm_filter_skipped_rows += other.dmfile_lm_filter_skipped_rows(); - total_dmfile_rough_set_index_check_time_ns += other.total_dmfile_rs_check_ms() * 1000000; + total_rs_pack_filter_check_time_ns += other.total_dmfile_rs_check_ms() * 1000000; + // TODO: rs_pack_filter_none, rs_pack_filter_some, rs_pack_filter_all total_dmfile_read_time_ns += other.total_dmfile_read_ms() * 1000000; create_snapshot_time_ns += other.total_build_snapshot_ms() * 1000000; total_local_region_num += other.local_regions(); diff --git a/dbms/src/Storages/DeltaMerge/Segment.cpp b/dbms/src/Storages/DeltaMerge/Segment.cpp index f5fc2a0a654..07dc45c0dd8 100644 --- a/dbms/src/Storages/DeltaMerge/Segment.cpp +++ b/dbms/src/Storages/DeltaMerge/Segment.cpp @@ -2945,7 +2945,7 @@ std::pair, std::vector> parseDMFilePackInfo( dm_context.global_context.getReadLimiter(), dm_context.scan_context, dm_context.tracing_id); - const auto & use_packs = pack_filter.getUsePacksConst(); + const auto & pack_res = pack_filter.getPackResConst(); const auto & handle_res = pack_filter.getHandleRes(); const auto & pack_stats = dmfile->getPackStats(); @@ -2955,7 +2955,7 @@ std::pair, std::vector> parseDMFilePackInfo( { const auto & pack_stat = pack_stats[pack_id]; preceded_rows += pack_stat.rows; - if (!use_packs[pack_id]) + if (!isUse(pack_res[pack_id])) { continue; } diff --git a/dbms/src/Storages/DeltaMerge/StableValueSpace.cpp b/dbms/src/Storages/DeltaMerge/StableValueSpace.cpp index fe11a7ce203..a508170ce8c 100644 --- a/dbms/src/Storages/DeltaMerge/StableValueSpace.cpp +++ b/dbms/src/Storages/DeltaMerge/StableValueSpace.cpp @@ -440,21 +440,22 @@ void StableValueSpace::calculateStableProperty( context.getReadLimiter(), context.scan_context, context.tracing_id); - const auto & use_packs = pack_filter.getUsePacksConst(); + const auto & pack_res = pack_filter.getPackResConst(); size_t new_pack_properties_index = 0; const bool use_new_pack_properties = pack_properties.property_size() == 0; if (use_new_pack_properties) { - const size_t use_packs_count = std::count(use_packs.begin(), use_packs.end(), true); + const size_t use_packs_count = pack_filter.countUsePack(); + RUNTIME_CHECK_MSG( static_cast(new_pack_properties.property_size()) == use_packs_count, "size doesn't match, new_pack_properties_size={} use_packs_size={}", new_pack_properties.property_size(), use_packs_count); } - for (size_t pack_id = 0; pack_id < use_packs.size(); ++pack_id) + for (size_t pack_id = 0; pack_id < pack_res.size(); ++pack_id) { - if (!use_packs[pack_id]) + if (!isUse(pack_res[pack_id])) continue; property.num_versions += pack_stats[pack_id].rows; property.num_puts += pack_stats[pack_id].rows - pack_stats[pack_id].not_clean; @@ -589,10 +590,10 @@ RowsAndBytes StableValueSpace::Snapshot::getApproxRowsAndBytes(const DMContext & context.scan_context, context.tracing_id); const auto & pack_stats = f->getPackStats(); - const auto & use_packs = filter.getUsePacksConst(); + const auto & pack_res = filter.getPackResConst(); for (size_t i = 0; i < pack_stats.size(); ++i) { - if (use_packs[i]) + if (isUse(pack_res[i])) { ++match_packs; total_match_rows += pack_stats[i].rows; diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_file.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_file.cpp index ba2627fbe93..10517e91071 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_file.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_file.cpp @@ -168,9 +168,9 @@ class DMFileMetaV2Test : public DB::base::TiFlashStorageTestBasic ASSERT_EQ(n, s.size()); } - static std::vector & getReaderUsePacks(DMFileBlockInputStreamPtr & stream) + static RSResults & getReaderPackRes(DMFileBlockInputStreamPtr & stream) { - return stream->reader.pack_filter.getUsePacks(); + return stream->reader.pack_filter.getPackRes(); } protected: @@ -905,10 +905,10 @@ try auto stream = builder.setColumnCache(column_cache) .build(dm_file, *cols, RowKeyRanges{RowKeyRange::newAll(false, 1)}, std::make_shared()); - auto & use_packs = getReaderUsePacks(stream); - use_packs[1] = false; + auto & pack_res = getReaderPackRes(stream); + pack_res[1] = RSResult::None; stream->skipNextBlock(); - use_packs[1] = true; + pack_res[1] = RSResult::Some; std::vector partial_expect_arr_values; partial_expect_arr_values.insert( partial_expect_arr_values.cend(), @@ -1117,10 +1117,10 @@ try auto stream = builder.setColumnCache(column_cache) .build(dm_file, *cols, RowKeyRanges{RowKeyRange::newAll(false, 1)}, std::make_shared()); - auto & use_packs = getReaderUsePacks(stream); - use_packs[1] = false; + auto & pack_res = getReaderPackRes(stream); + pack_res[1] = RSResult::None; ASSERT_EQ(stream->skipNextBlock(), num_rows_write / 3); - use_packs[1] = true; + pack_res[1] = RSResult::Some; ASSERT_INPUTSTREAM_COLS_UR( stream, Strings({DMTestEnv::pk_name}), From b8d742c53815ed68f1075391897f5397013b5984 Mon Sep 17 00:00:00 2001 From: jinhelin Date: Thu, 13 Jun 2024 16:46:52 +0800 Subject: [PATCH 2/5] Address comments --- dbms/src/Storages/DeltaMerge/File/DMFilePackFilter.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dbms/src/Storages/DeltaMerge/File/DMFilePackFilter.cpp b/dbms/src/Storages/DeltaMerge/File/DMFilePackFilter.cpp index 36034cb2a4b..40a1119fa00 100644 --- a/dbms/src/Storages/DeltaMerge/File/DMFilePackFilter.cpp +++ b/dbms/src/Storages/DeltaMerge/File/DMFilePackFilter.cpp @@ -130,7 +130,7 @@ std::tuple DMFilePackFilter::countPackRes() const ++all_count; break; default: - throw Exception(ErrorCodes::LOGICAL_ERROR, "{} is invalid", magic_enum::enum_name(res)); + throw Exception(ErrorCodes::LOGICAL_ERROR, "{} is invalid", static_cast(res)); } } return {none_count, some_count, all_count}; From 6d98e35f83001c67d86c02c98aa383fe441c4c6f Mon Sep 17 00:00:00 2001 From: jinhelin Date: Thu, 13 Jun 2024 16:58:17 +0800 Subject: [PATCH 3/5] Update dbms/src/Storages/DeltaMerge/File/DMFilePackFilter.cpp Co-authored-by: Lloyd-Pottiger <60744015+Lloyd-Pottiger@users.noreply.github.com> --- dbms/src/Storages/DeltaMerge/File/DMFilePackFilter.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dbms/src/Storages/DeltaMerge/File/DMFilePackFilter.cpp b/dbms/src/Storages/DeltaMerge/File/DMFilePackFilter.cpp index 40a1119fa00..f8ac8eace57 100644 --- a/dbms/src/Storages/DeltaMerge/File/DMFilePackFilter.cpp +++ b/dbms/src/Storages/DeltaMerge/File/DMFilePackFilter.cpp @@ -58,7 +58,7 @@ void DMFilePackFilter::init() { for (size_t i = 0; i < pack_count; ++i) { - pack_res[i] = pack_res[i] && (read_packs->contains(i) ? RSResult::Some : RSResult::None); + pack_res[i] = read_packs->contains(i) ? pack_res[i] : RSResult::None; } } auto after_read_packs = countUsePack(); From 90d97bebf4fecdde826ea0e806f94a3df79fb2b1 Mon Sep 17 00:00:00 2001 From: jinhelin Date: Thu, 13 Jun 2024 18:27:00 +0800 Subject: [PATCH 4/5] Update dbms/src/Storages/DeltaMerge/Index/RSResult.h Co-authored-by: JaySon --- dbms/src/Storages/DeltaMerge/Index/RSResult.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dbms/src/Storages/DeltaMerge/Index/RSResult.h b/dbms/src/Storages/DeltaMerge/Index/RSResult.h index a09cb65c5a4..21c1cad6911 100644 --- a/dbms/src/Storages/DeltaMerge/Index/RSResult.h +++ b/dbms/src/Storages/DeltaMerge/Index/RSResult.h @@ -75,7 +75,7 @@ inline RSResult operator&&(RSResult v0, RSResult v1) return Some; } -inline bool isUse(RSResult res) noexcept +ALWAYS_INLINE inline bool isUse(RSResult res) noexcept { return res != RSResult::None; } From f161605a5d74563c1a6e0f51e666ac88cd6b9c33 Mon Sep 17 00:00:00 2001 From: jinhelin Date: Thu, 13 Jun 2024 18:28:35 +0800 Subject: [PATCH 5/5] address comment --- dbms/src/Storages/DeltaMerge/File/DMFilePackFilter.h | 2 ++ 1 file changed, 2 insertions(+) diff --git a/dbms/src/Storages/DeltaMerge/File/DMFilePackFilter.h b/dbms/src/Storages/DeltaMerge/File/DMFilePackFilter.h index f4ba53d0712..16c55b96f4b 100644 --- a/dbms/src/Storages/DeltaMerge/File/DMFilePackFilter.h +++ b/dbms/src/Storages/DeltaMerge/File/DMFilePackFilter.h @@ -167,7 +167,9 @@ class DMFilePackFilter RSCheckParam param; + // `handle_res` is the filter results of `rowkey_ranges`. std::vector handle_res; + // `pack_res` is the filter results of `rowkey_ranges && filter && read_packs`. std::vector pack_res; const ScanContextPtr scan_context;