From 2313643c2262995256328aa8b480e5b3c919ba66 Mon Sep 17 00:00:00 2001 From: JaySon Date: Tue, 15 Oct 2024 18:17:32 +0800 Subject: [PATCH 1/2] This is an automated cherry-pick of #9530 Signed-off-by: ti-chi-bot --- .../ColumnFile/ColumnFileSetSnapshot.cpp | 2 +- .../ColumnFile/ColumnFileSetSnapshot.h | 9 +- .../DeltaMerge/Delta/DeltaValueSpace.h | 9 +- .../Storages/DeltaMerge/Delta/Snapshot.cpp | 6 +- .../DeltaMerge/DeltaMergeStore_InternalBg.cpp | 4 +- .../DeltaMerge/File/DMFilePackFilter.cpp | 311 ++++++++++++++++++ .../Storages/DeltaMerge/Filter/FilterHelper.h | 9 +- .../DeltaMerge/Remote/DisaggSnapshot.h | 1 - .../Storages/DeltaMerge/Remote/Serializer.cpp | 2 - .../DeltaMerge/tests/gtest_dm_file.cpp | 46 +-- .../DeltaMerge/tests/gtest_dm_segment.cpp | 3 +- .../tests/gtest_dm_segment_common_handle.cpp | 68 ++-- 12 files changed, 407 insertions(+), 63 deletions(-) create mode 100644 dbms/src/Storages/DeltaMerge/File/DMFilePackFilter.cpp diff --git a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileSetSnapshot.cpp b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileSetSnapshot.cpp index 84274ee6b3e..c3218ef41d7 100644 --- a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileSetSnapshot.cpp +++ b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileSetSnapshot.cpp @@ -20,7 +20,7 @@ namespace DB { namespace DM { -RowKeyRange ColumnFileSetSnapshot::getSquashDeleteRange() const +RowKeyRange ColumnFileSetSnapshot::getSquashDeleteRange(bool is_common_handle, size_t rowkey_column_size) const { RowKeyRange squashed_delete_range = RowKeyRange::newNone(is_common_handle, rowkey_column_size); for (const auto & column_file : column_files) diff --git a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileSetSnapshot.h b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileSetSnapshot.h index b5ef4966c6c..42b55a11e94 100644 --- a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileSetSnapshot.h +++ b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileSetSnapshot.h @@ -45,7 +45,7 @@ class BlockOrDelete {} bool isBlock() { return static_cast(block); } - auto & getBlock() { return block; }; + auto & getBlock() { return block; } auto getBlockOffset() const { return block_offset; } auto & getDeleteRange() { return delete_range; } }; @@ -66,9 +66,6 @@ class ColumnFileSetSnapshot size_t bytes{0}; size_t deletes{0}; - bool is_common_handle{false}; - size_t rowkey_column_size{0}; - public: /// This field is public writeable intentionally. It allows us to build a snapshot first, /// then change how these data can be read later. @@ -89,8 +86,6 @@ class ColumnFileSetSnapshot c->rows = rows; c->bytes = bytes; c->deletes = deletes; - c->is_common_handle = is_common_handle; - c->rowkey_column_size = rowkey_column_size; return c; } @@ -102,7 +97,7 @@ class ColumnFileSetSnapshot size_t getBytes() const { return bytes; } size_t getDeletes() const { return deletes; } - RowKeyRange getSquashDeleteRange() const; + RowKeyRange getSquashDeleteRange(bool is_common_handle, size_t rowkey_column_size) const; const auto & getDataProvider() const { return data_provider; } }; diff --git a/dbms/src/Storages/DeltaMerge/Delta/DeltaValueSpace.h b/dbms/src/Storages/DeltaMerge/Delta/DeltaValueSpace.h index d915eb2c8d1..537795bcf95 100644 --- a/dbms/src/Storages/DeltaMerge/Delta/DeltaValueSpace.h +++ b/dbms/src/Storages/DeltaMerge/Delta/DeltaValueSpace.h @@ -393,7 +393,14 @@ class DeltaValueSnapshot size_t getMemTableSetRowsOffset() const { return persisted_files_snap->getRows(); } size_t getMemTableSetDeletesOffset() const { return persisted_files_snap->getDeletes(); } - RowKeyRange getSquashDeleteRange() const; + /** + * Get a "squash" delete range in the delta layer. We can use this to ** estimate ** how many rows in the + * stable layer is pending for delete. + * The squash delete range may be larger than that will actually removed rows. For example, if there are + * two delete range [0, 30) and [100, 200), the squash delete range will be [0, 200). Actually after applying + * the two delete range, the rows [30, 100) will not be removed. + */ + RowKeyRange getSquashDeleteRange(bool is_common_handle, size_t rowkey_column_size) const; const auto & getSharedDeltaIndex() { return shared_delta_index; } size_t getDeltaIndexEpoch() const { return delta_index_epoch; } diff --git a/dbms/src/Storages/DeltaMerge/Delta/Snapshot.cpp b/dbms/src/Storages/DeltaMerge/Delta/Snapshot.cpp index 0e0490d8ce9..c15044a0caa 100644 --- a/dbms/src/Storages/DeltaMerge/Delta/Snapshot.cpp +++ b/dbms/src/Storages/DeltaMerge/Delta/Snapshot.cpp @@ -56,10 +56,10 @@ DeltaSnapshotPtr DeltaValueSpace::createSnapshot( return snap; } -RowKeyRange DeltaValueSnapshot::getSquashDeleteRange() const +RowKeyRange DeltaValueSnapshot::getSquashDeleteRange(bool is_common_handle, size_t rowkey_column_size) const { - auto delete_range1 = mem_table_snap->getSquashDeleteRange(); - auto delete_range2 = persisted_files_snap->getSquashDeleteRange(); + auto delete_range1 = mem_table_snap->getSquashDeleteRange(is_common_handle, rowkey_column_size); + auto delete_range2 = persisted_files_snap->getSquashDeleteRange(is_common_handle, rowkey_column_size); return delete_range1.merge(delete_range2); } diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeStore_InternalBg.cpp b/dbms/src/Storages/DeltaMerge/DeltaMergeStore_InternalBg.cpp index 9bdcd4900e7..a31c2a5a80a 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaMergeStore_InternalBg.cpp +++ b/dbms/src/Storages/DeltaMerge/DeltaMergeStore_InternalBg.cpp @@ -509,7 +509,9 @@ bool shouldCompactDeltaWithStable( double invalid_data_ratio_threshold, const LoggerPtr & log) { - auto actual_delete_range = snap->delta->getSquashDeleteRange().shrink(segment_range); + auto actual_delete_range + = snap->delta->getSquashDeleteRange(segment_range.is_common_handle, segment_range.rowkey_column_size) + .shrink(segment_range); if (actual_delete_range.none()) return false; diff --git a/dbms/src/Storages/DeltaMerge/File/DMFilePackFilter.cpp b/dbms/src/Storages/DeltaMerge/File/DMFilePackFilter.cpp new file mode 100644 index 00000000000..e465f11caad --- /dev/null +++ b/dbms/src/Storages/DeltaMerge/File/DMFilePackFilter.cpp @@ -0,0 +1,311 @@ + +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include +#include +#include + +#include + +namespace DB::DM +{ + +void DMFilePackFilter::init(ReadTag read_tag) +{ + Stopwatch watch; + SCOPE_EXIT({ scan_context->total_rs_pack_filter_check_time_ns += watch.elapsed(); }); + size_t pack_count = dmfile->getPacks(); + auto read_all_packs = (rowkey_ranges.size() == 1 && rowkey_ranges[0].all()) || rowkey_ranges.empty(); + if (!read_all_packs) + { + tryLoadIndex(EXTRA_HANDLE_COLUMN_ID); + std::vector handle_filters; + for (auto & rowkey_range : rowkey_ranges) + handle_filters.emplace_back(toFilter(rowkey_range)); +#ifndef NDEBUG + // sanity check under debug mode to ensure the rowkey_range is correct common-handle or int64-handle + if (!rowkey_ranges.empty()) + { + bool is_common_handle = rowkey_ranges.begin()->is_common_handle; + auto handle_col_type = dmfile->getColumnStat(EXTRA_HANDLE_COLUMN_ID).type; + if (is_common_handle) + RUNTIME_CHECK_MSG( + handle_col_type->getTypeId() == TypeIndex::String, + "handle_col_type_id={}", + magic_enum::enum_name(handle_col_type->getTypeId())); + else + RUNTIME_CHECK_MSG( + handle_col_type->getTypeId() == TypeIndex::Int64, + "handle_col_type_id={}", + magic_enum::enum_name(handle_col_type->getTypeId())); + for (size_t i = 1; i < rowkey_ranges.size(); ++i) + { + RUNTIME_CHECK_MSG( + is_common_handle == rowkey_ranges[i].is_common_handle, + "i={} is_common_handle={} ith.is_common_handle={}", + i, + is_common_handle, + rowkey_ranges[i].is_common_handle); + } + } +#endif + for (size_t i = 0; i < pack_count; ++i) + { + handle_res[i] = RSResult::None; + } + for (auto & handle_filter : handle_filters) + { + auto res = handle_filter->roughCheck(0, pack_count, param); + std::transform( + handle_res.begin(), + handle_res.end(), + res.begin(), + handle_res.begin(), + [](RSResult a, RSResult b) { return a || b; }); + } + } + + ProfileEvents::increment(ProfileEvents::DMFileFilterNoFilter, pack_count); + + /// Check packs by handle_res + pack_res = handle_res; + auto after_pk = countUsePack(); + + /// Check packs by read_packs + if (read_packs) + { + for (size_t i = 0; i < pack_count; ++i) + { + pack_res[i] = read_packs->contains(i) ? pack_res[i] : RSResult::None; + } + } + auto after_read_packs = countUsePack(); + ProfileEvents::increment(ProfileEvents::DMFileFilterAftPKAndPackSet, after_read_packs); + + /// Check packs by filter in where clause + if (filter) + { + // Load index based on filter. + ColIds ids = filter->getColumnIDs(); + for (const auto & id : ids) + { + tryLoadIndex(id); + } + + const auto check_results = filter->roughCheck(0, pack_count, param); + std::transform( + pack_res.cbegin(), + pack_res.cend(), + check_results.cbegin(), + pack_res.begin(), + [](RSResult a, RSResult b) { return a && b; }); + } + else + { + // ColumnFileBig in DeltaValueSpace never pass a filter to DMFilePackFilter. + // Assume its filter always return Some. + std::transform(pack_res.cbegin(), pack_res.cend(), pack_res.begin(), [](RSResult a) { + return a && RSResult::Some; + }); + } + + auto [none_count, some_count, all_count, all_null_count] = countPackRes(); + auto after_filter = some_count + all_count + all_null_count; + ProfileEvents::increment(ProfileEvents::DMFileFilterAftRoughSet, after_filter); + // In table scanning, DMFilePackFilter of a DMFile may be created several times: + // 1. When building MVCC bitmap (ReadTag::MVCC). + // 2. When building LM filter stream (ReadTag::LM). + // 3. When building stream of other columns (ReadTag::Query). + // Only need to count the filter result once. + // TODO: We can create DMFilePackFilter at the beginning and pass it to the stages described above. + if (read_tag == ReadTag::Query) + { + scan_context->rs_pack_filter_none += none_count; + scan_context->rs_pack_filter_some += some_count; + scan_context->rs_pack_filter_all += all_count; + scan_context->rs_pack_filter_all_null += all_null_count; + } + + Float64 filter_rate = 0.0; + if (after_read_packs != 0) + { + filter_rate = (after_read_packs - after_filter) * 100.0 / after_read_packs; + GET_METRIC(tiflash_storage_rough_set_filter_rate, type_dtfile_pack).Observe(filter_rate); + } + LOG_DEBUG( + log, + "RSFilter exclude rate: {:.2f}, after_pk: {}, after_read_packs: {}, after_filter: {}, handle_ranges: {}" + ", read_packs: {}, pack_count: {}, none_count: {}, some_count: {}, all_count: {}, all_null_count: {}, " + "read_tag: {}", + ((after_read_packs == 0) ? std::numeric_limits::quiet_NaN() : filter_rate), + after_pk, + after_read_packs, + after_filter, + toDebugString(rowkey_ranges), + ((read_packs == nullptr) ? 0 : read_packs->size()), + pack_count, + none_count, + some_count, + all_count, + all_null_count, + magic_enum::enum_name(read_tag)); +} + +std::tuple DMFilePackFilter::countPackRes() const +{ + UInt64 none_count = 0; + UInt64 some_count = 0; + UInt64 all_count = 0; + UInt64 all_null_count = 0; + for (auto res : pack_res) + { + if (res == RSResult::None || res == RSResult::NoneNull) + ++none_count; + else if (res == RSResult::Some || res == RSResult::SomeNull) + ++some_count; + else if (res == RSResult::All) + ++all_count; + else if (res == RSResult::AllNull) + ++all_null_count; + } + return {none_count, some_count, all_count, all_null_count}; +} + +UInt64 DMFilePackFilter::countUsePack() const +{ + return std::count_if(pack_res.cbegin(), pack_res.cend(), [](RSResult res) { return res.isUse(); }); +} + +void DMFilePackFilter::loadIndex( + ColumnIndexes & indexes, + const DMFilePtr & dmfile, + const FileProviderPtr & file_provider, + const MinMaxIndexCachePtr & index_cache, + bool set_cache_if_miss, + ColId col_id, + const ReadLimiterPtr & read_limiter, + const ScanContextPtr & scan_context) +{ + const auto & type = dmfile->getColumnStat(col_id).type; + const auto file_name_base = DMFile::getFileNameBase(col_id); + + auto load = [&]() { + auto index_file_size = dmfile->colIndexSize(col_id); + if (index_file_size == 0) + return std::make_shared(*type); + auto index_guard = S3::S3RandomAccessFile::setReadFileInfo({ + .size = dmfile->getReadFileSize(col_id, colIndexFileName(file_name_base)), + .scan_context = scan_context, + }); + if (!dmfile->getConfiguration()) // v1 + { + auto index_buf = ReadBufferFromRandomAccessFileBuilder::build( + file_provider, + dmfile->colIndexPath(file_name_base), + dmfile->encryptionIndexPath(file_name_base), + std::min(static_cast(DBMS_DEFAULT_BUFFER_SIZE), index_file_size), + read_limiter); + return MinMaxIndex::read(*type, index_buf, index_file_size); + } + else if (dmfile->useMetaV2()) // v3 + { + const auto * dmfile_meta = typeid_cast(dmfile->meta.get()); + assert(dmfile_meta != nullptr); + auto info = dmfile_meta->merged_sub_file_infos.find(colIndexFileName(file_name_base)); + if (info == dmfile_meta->merged_sub_file_infos.end()) + { + throw Exception( + ErrorCodes::LOGICAL_ERROR, + "Unknown index file {}", + dmfile->colIndexPath(file_name_base)); + } + + auto file_path = dmfile->meta->mergedPath(info->second.number); + auto encryp_path = dmfile_meta->encryptionMergedPath(info->second.number); + auto offset = info->second.offset; + auto data_size = info->second.size; + + auto buffer = ReadBufferFromRandomAccessFileBuilder::build( + file_provider, + file_path, + encryp_path, + dmfile->getConfiguration()->getChecksumFrameLength(), + read_limiter); + buffer.seek(offset); + + String raw_data; + raw_data.resize(data_size); + + buffer.read(reinterpret_cast(raw_data.data()), data_size); + + auto buf = ChecksumReadBufferBuilder::build( + std::move(raw_data), + dmfile->colIndexPath(file_name_base), // just for debug + dmfile->getConfiguration()->getChecksumFrameLength(), + dmfile->getConfiguration()->getChecksumAlgorithm(), + dmfile->getConfiguration()->getChecksumFrameLength()); + + auto header_size = dmfile->getConfiguration()->getChecksumHeaderLength(); + auto frame_total_size = dmfile->getConfiguration()->getChecksumFrameLength() + header_size; + auto frame_count = index_file_size / frame_total_size + (index_file_size % frame_total_size != 0); + + return MinMaxIndex::read(*type, *buf, index_file_size - header_size * frame_count); + } + else + { // v2 + auto index_buf = ChecksumReadBufferBuilder::build( + file_provider, + dmfile->colIndexPath(file_name_base), + dmfile->encryptionIndexPath(file_name_base), + index_file_size, + read_limiter, + dmfile->getConfiguration()->getChecksumAlgorithm(), + dmfile->getConfiguration()->getChecksumFrameLength()); + auto header_size = dmfile->getConfiguration()->getChecksumHeaderLength(); + auto frame_total_size = dmfile->getConfiguration()->getChecksumFrameLength() + header_size; + auto frame_count = index_file_size / frame_total_size + (index_file_size % frame_total_size != 0); + return MinMaxIndex::read(*type, *index_buf, index_file_size - header_size * frame_count); + } + }; + MinMaxIndexPtr minmax_index; + if (index_cache && set_cache_if_miss) + { + minmax_index = index_cache->getOrSet(dmfile->colIndexCacheKey(file_name_base), load); + } + else + { + // try load from the cache first + if (index_cache) + minmax_index = index_cache->get(dmfile->colIndexCacheKey(file_name_base)); + if (minmax_index == nullptr) + minmax_index = load(); + } + indexes.emplace(col_id, RSIndex(type, minmax_index)); +} + +void DMFilePackFilter::tryLoadIndex(ColId col_id) +{ + if (param.indexes.count(col_id)) + return; + + if (!dmfile->isColIndexExist(col_id)) + return; + + Stopwatch watch; + loadIndex(param.indexes, dmfile, file_provider, index_cache, set_cache_if_miss, col_id, read_limiter, scan_context); +} + +} // namespace DB::DM diff --git a/dbms/src/Storages/DeltaMerge/Filter/FilterHelper.h b/dbms/src/Storages/DeltaMerge/Filter/FilterHelper.h index fd994d3d031..6ea4c069fb0 100644 --- a/dbms/src/Storages/DeltaMerge/Filter/FilterHelper.h +++ b/dbms/src/Storages/DeltaMerge/Filter/FilterHelper.h @@ -23,10 +23,11 @@ namespace DB::DM inline RSOperatorPtr toFilter(RowKeyRange & rowkey_range) { - Attr handle_attr - = {EXTRA_HANDLE_COLUMN_NAME, - EXTRA_HANDLE_COLUMN_ID, - rowkey_range.is_common_handle ? EXTRA_HANDLE_COLUMN_STRING_TYPE : EXTRA_HANDLE_COLUMN_INT_TYPE}; + Attr handle_attr = { + EXTRA_HANDLE_COLUMN_NAME, + EXTRA_HANDLE_COLUMN_ID, + rowkey_range.is_common_handle ? EXTRA_HANDLE_COLUMN_STRING_TYPE : EXTRA_HANDLE_COLUMN_INT_TYPE, + }; if (rowkey_range.is_common_handle) { auto left = createGreaterEqual( diff --git a/dbms/src/Storages/DeltaMerge/Remote/DisaggSnapshot.h b/dbms/src/Storages/DeltaMerge/Remote/DisaggSnapshot.h index 86ac2a7e62f..2d9d88ad5b7 100644 --- a/dbms/src/Storages/DeltaMerge/Remote/DisaggSnapshot.h +++ b/dbms/src/Storages/DeltaMerge/Remote/DisaggSnapshot.h @@ -16,7 +16,6 @@ #include #include -#include #include #include #include diff --git a/dbms/src/Storages/DeltaMerge/Remote/Serializer.cpp b/dbms/src/Storages/DeltaMerge/Remote/Serializer.cpp index 5a391078d2d..611703fd77d 100644 --- a/dbms/src/Storages/DeltaMerge/Remote/Serializer.cpp +++ b/dbms/src/Storages/DeltaMerge/Remote/Serializer.cpp @@ -214,8 +214,6 @@ ColumnFileSetSnapshotPtr Serializer::deserializeColumnFileSet( { auto empty_data_provider = std::make_shared(); auto ret = std::make_shared(empty_data_provider); - ret->is_common_handle = segment_range.is_common_handle; - ret->rowkey_column_size = segment_range.rowkey_column_size; ret->column_files.reserve(proto.size()); for (const auto & remote_column_file : proto) { diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_file.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_file.cpp index 6081444e54f..b3cbdd560f0 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_file.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_file.cpp @@ -23,6 +23,7 @@ #include #include #include +#include #include #include #include @@ -1609,28 +1610,33 @@ try RowKeyRange range; Int64 start, end; }; - std::vector ranges; - ranges.emplace_back( - DMTestEnv::getRowKeyRangeForClusteredIndex(0, span_per_part, rowkey_column_size), - 0, - span_per_part); // only first part - ranges.emplace_back( - DMTestEnv::getRowKeyRangeForClusteredIndex(800, num_rows_write, rowkey_column_size), - 800, - num_rows_write); - ranges.emplace_back(DMTestEnv::getRowKeyRangeForClusteredIndex(256, 700, rowkey_column_size), 256, 700); // - ranges.emplace_back(DMTestEnv::getRowKeyRangeForClusteredIndex(0, 0, rowkey_column_size), 0, 0); // none - ranges.emplace_back( - DMTestEnv::getRowKeyRangeForClusteredIndex(0, num_rows_write, rowkey_column_size), - 0, - num_rows_write); // full range - ranges.emplace_back( - DMTestEnv::getRowKeyRangeForClusteredIndex( + std::vector ranges{ + QueryRangeInfo{ + DMTestEnv::getRowKeyRangeForClusteredIndex(0, span_per_part, rowkey_column_size), + 0, + span_per_part // only first part + }, + QueryRangeInfo{ + DMTestEnv::getRowKeyRangeForClusteredIndex(800, num_rows_write, rowkey_column_size), + 800, + num_rows_write}, + QueryRangeInfo{DMTestEnv::getRowKeyRangeForClusteredIndex(256, 700, rowkey_column_size), 256, 700}, + QueryRangeInfo{DMTestEnv::getRowKeyRangeForClusteredIndex(0, 0, rowkey_column_size), 0, 0}, //none + QueryRangeInfo{ + DMTestEnv::getRowKeyRangeForClusteredIndex(0, num_rows_write, rowkey_column_size), + 0, + num_rows_write, + }, // full range + QueryRangeInfo{ + DMTestEnv::getRowKeyRangeForClusteredIndex( + std::numeric_limits::min(), + std::numeric_limits::max(), + rowkey_column_size), std::numeric_limits::min(), std::numeric_limits::max(), - rowkey_column_size), - std::numeric_limits::min(), - std::numeric_limits::max()); // full range + }, // full range + }; + for (const auto & range : ranges) { // Test read diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment.cpp index 6e8a9e8dffc..13aae4b16ff 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment.cpp @@ -909,7 +909,7 @@ try auto check_segment_squash_delete_range = [this](SegmentPtr & segment, const HandleRange & expect_range) { // set `is_update=false` to get full squash delete range auto snap = segment->createSnapshot(dmContext(), /*for_update*/ false, CurrentMetrics::DT_SnapshotOfRead); - auto squash_range = snap->delta->getSquashDeleteRange(); + auto squash_range = snap->delta->getSquashDeleteRange(/*is_common_handle=*/false, /*rowkey_column_size=*/1); ASSERT_ROWKEY_RANGE_EQ(squash_range, RowKeyRange::fromHandleRange(expect_range)); }; @@ -955,6 +955,7 @@ try HandleRange del{1, 32}; segment->write(dmContext(), {RowKeyRange::fromHandleRange(del)}); SCOPED_TRACE("check after range: " + del.toDebugString()); + // suqash_delete_range will consider [1, 100) maybe deleted check_segment_squash_delete_range(segment, HandleRange{1, 100}); } diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment_common_handle.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment_common_handle.cpp index 883fc2ed386..154ffb7e828 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment_common_handle.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment_common_handle.cpp @@ -28,12 +28,18 @@ #include #include -namespace DB +namespace CurrentMetrics { -namespace DM -{ -namespace tests +extern const Metric DT_SnapshotOfRead; +extern const Metric DT_SnapshotOfReadRaw; +extern const Metric DT_SnapshotOfSegmentSplit; +extern const Metric DT_SnapshotOfSegmentMerge; +extern const Metric DT_SnapshotOfDeltaMerge; +extern const Metric DT_SnapshotOfPlaceIndex; +} // namespace CurrentMetrics +namespace DB::DM::tests { + class SegmentCommonHandleTest : public DB::base::TiFlashStorageTestBasic { public: @@ -580,16 +586,27 @@ try } { - // flush segment + // do delta-merge move data to stable segment = segment->mergeDelta(dmContext(), tableColumns()); } + auto check_segment_squash_delete_range = [this](SegmentPtr & segment, const RowKeyRange & expect_range) { + // set `is_update=false` to get full squash delete range + auto snap = segment->createSnapshot(dmContext(), /*for_update*/ false, CurrentMetrics::DT_SnapshotOfRead); + auto squash_range = snap->delta->getSquashDeleteRange(is_common_handle, rowkey_column_size); + ASSERT_ROWKEY_RANGE_EQ(squash_range, expect_range); + }; + { // Test delete range [70, 100) - segment->write(dmContext(), {DMTestEnv::getRowKeyRangeForClusteredIndex(70, 100, rowkey_column_size)}); - // flush segment + auto del_row_range = DMTestEnv::getRowKeyRangeForClusteredIndex(70, 100, rowkey_column_size); + SCOPED_TRACE("check after range: " + del_row_range.toDebugString()); // Add trace msg when ASSERT failed + // mem-table + segment->write(dmContext(), {del_row_range}); + check_segment_squash_delete_range(segment, del_row_range); + // persisted-file segment->flushCache(dmContext()); - segment = segment->mergeDelta(dmContext(), tableColumns()); + check_segment_squash_delete_range(segment, del_row_range); } { @@ -619,10 +636,15 @@ try { // Test delete range [63, 70) - segment->write(dmContext(), {DMTestEnv::getRowKeyRangeForClusteredIndex(63, 70, rowkey_column_size)}); - // flush segment + auto del_row_range = DMTestEnv::getRowKeyRangeForClusteredIndex(63, 70, rowkey_column_size); + auto merged_del_range = DMTestEnv::getRowKeyRangeForClusteredIndex(63, 100, rowkey_column_size); + SCOPED_TRACE("check after range: " + del_row_range.toDebugString()); // Add trace msg when ASSERT failed + // mem-table + segment->write(dmContext(), {del_row_range}); + check_segment_squash_delete_range(segment, merged_del_range); + // persisted-file segment->flushCache(dmContext()); - segment = segment->mergeDelta(dmContext(), tableColumns()); + check_segment_squash_delete_range(segment, merged_del_range); } { @@ -652,10 +674,14 @@ try { // Test delete range [1, 32) - segment->write(dmContext(), {DMTestEnv::getRowKeyRangeForClusteredIndex(1, 32, rowkey_column_size)}); - // flush segment - segment->flushCache(dmContext()); - segment = segment->mergeDelta(dmContext(), tableColumns()); + auto del_row_range = DMTestEnv::getRowKeyRangeForClusteredIndex(1, 32, rowkey_column_size); + SCOPED_TRACE("check after range: " + del_row_range.toDebugString()); // Add trace msg when ASSERT failed + segment->write(dmContext(), {del_row_range}); + auto merged_del_range = DMTestEnv::getRowKeyRangeForClusteredIndex( + 1, + 100, + rowkey_column_size); // suqash_delete_range will consider [1, 100) maybe deleted + check_segment_squash_delete_range(segment, merged_del_range); } { @@ -869,7 +895,7 @@ try ASSERT_EQ(c1->size(), c2->size()); - for (Int64 i = 0; i < Int64(c1->size()); i++) + for (Int64 i = 0; i < static_cast(c1->size()); i++) { if (iter1->name == DMTestEnv::pk_name) { @@ -970,8 +996,8 @@ try segment->write( dmContext(), DMTestEnv::getRowKeyRangeForClusteredIndex( - Int64((num_batches_written - 1) * num_rows_per_write), - Int64((num_batches_written - 1) * num_rows_per_write + 2), + static_cast((num_batches_written - 1) * num_rows_per_write), + static_cast((num_batches_written - 1) * num_rows_per_write + 2), rowkey_column_size)); } @@ -984,7 +1010,7 @@ try i < num_batches_written * num_rows_per_write; i++) { - temp.push_back(Int64(i)); + temp.push_back(static_cast(i)); } { @@ -1023,6 +1049,4 @@ try } CATCH -} // namespace tests -} // namespace DM -} // namespace DB +} // namespace DB::DM::tests From 9b954f6ce18af974973b1c73f05348061fa445a1 Mon Sep 17 00:00:00 2001 From: JaySon-Huang Date: Thu, 17 Oct 2024 18:21:42 +0800 Subject: [PATCH 2/2] Resolve conflicts Signed-off-by: JaySon-Huang --- .../DeltaMerge/File/DMFilePackFilter.cpp | 311 ------------------ .../DeltaMerge/File/DMFilePackFilter.h | 29 ++ 2 files changed, 29 insertions(+), 311 deletions(-) delete mode 100644 dbms/src/Storages/DeltaMerge/File/DMFilePackFilter.cpp diff --git a/dbms/src/Storages/DeltaMerge/File/DMFilePackFilter.cpp b/dbms/src/Storages/DeltaMerge/File/DMFilePackFilter.cpp deleted file mode 100644 index e465f11caad..00000000000 --- a/dbms/src/Storages/DeltaMerge/File/DMFilePackFilter.cpp +++ /dev/null @@ -1,311 +0,0 @@ - -// Copyright 2023 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#include -#include -#include -#include - -#include - -namespace DB::DM -{ - -void DMFilePackFilter::init(ReadTag read_tag) -{ - Stopwatch watch; - SCOPE_EXIT({ scan_context->total_rs_pack_filter_check_time_ns += watch.elapsed(); }); - size_t pack_count = dmfile->getPacks(); - auto read_all_packs = (rowkey_ranges.size() == 1 && rowkey_ranges[0].all()) || rowkey_ranges.empty(); - if (!read_all_packs) - { - tryLoadIndex(EXTRA_HANDLE_COLUMN_ID); - std::vector handle_filters; - for (auto & rowkey_range : rowkey_ranges) - handle_filters.emplace_back(toFilter(rowkey_range)); -#ifndef NDEBUG - // sanity check under debug mode to ensure the rowkey_range is correct common-handle or int64-handle - if (!rowkey_ranges.empty()) - { - bool is_common_handle = rowkey_ranges.begin()->is_common_handle; - auto handle_col_type = dmfile->getColumnStat(EXTRA_HANDLE_COLUMN_ID).type; - if (is_common_handle) - RUNTIME_CHECK_MSG( - handle_col_type->getTypeId() == TypeIndex::String, - "handle_col_type_id={}", - magic_enum::enum_name(handle_col_type->getTypeId())); - else - RUNTIME_CHECK_MSG( - handle_col_type->getTypeId() == TypeIndex::Int64, - "handle_col_type_id={}", - magic_enum::enum_name(handle_col_type->getTypeId())); - for (size_t i = 1; i < rowkey_ranges.size(); ++i) - { - RUNTIME_CHECK_MSG( - is_common_handle == rowkey_ranges[i].is_common_handle, - "i={} is_common_handle={} ith.is_common_handle={}", - i, - is_common_handle, - rowkey_ranges[i].is_common_handle); - } - } -#endif - for (size_t i = 0; i < pack_count; ++i) - { - handle_res[i] = RSResult::None; - } - for (auto & handle_filter : handle_filters) - { - auto res = handle_filter->roughCheck(0, pack_count, param); - std::transform( - handle_res.begin(), - handle_res.end(), - res.begin(), - handle_res.begin(), - [](RSResult a, RSResult b) { return a || b; }); - } - } - - ProfileEvents::increment(ProfileEvents::DMFileFilterNoFilter, pack_count); - - /// Check packs by handle_res - pack_res = handle_res; - auto after_pk = countUsePack(); - - /// Check packs by read_packs - if (read_packs) - { - for (size_t i = 0; i < pack_count; ++i) - { - pack_res[i] = read_packs->contains(i) ? pack_res[i] : RSResult::None; - } - } - auto after_read_packs = countUsePack(); - ProfileEvents::increment(ProfileEvents::DMFileFilterAftPKAndPackSet, after_read_packs); - - /// Check packs by filter in where clause - if (filter) - { - // Load index based on filter. - ColIds ids = filter->getColumnIDs(); - for (const auto & id : ids) - { - tryLoadIndex(id); - } - - const auto check_results = filter->roughCheck(0, pack_count, param); - std::transform( - pack_res.cbegin(), - pack_res.cend(), - check_results.cbegin(), - pack_res.begin(), - [](RSResult a, RSResult b) { return a && b; }); - } - else - { - // ColumnFileBig in DeltaValueSpace never pass a filter to DMFilePackFilter. - // Assume its filter always return Some. - std::transform(pack_res.cbegin(), pack_res.cend(), pack_res.begin(), [](RSResult a) { - return a && RSResult::Some; - }); - } - - auto [none_count, some_count, all_count, all_null_count] = countPackRes(); - auto after_filter = some_count + all_count + all_null_count; - ProfileEvents::increment(ProfileEvents::DMFileFilterAftRoughSet, after_filter); - // In table scanning, DMFilePackFilter of a DMFile may be created several times: - // 1. When building MVCC bitmap (ReadTag::MVCC). - // 2. When building LM filter stream (ReadTag::LM). - // 3. When building stream of other columns (ReadTag::Query). - // Only need to count the filter result once. - // TODO: We can create DMFilePackFilter at the beginning and pass it to the stages described above. - if (read_tag == ReadTag::Query) - { - scan_context->rs_pack_filter_none += none_count; - scan_context->rs_pack_filter_some += some_count; - scan_context->rs_pack_filter_all += all_count; - scan_context->rs_pack_filter_all_null += all_null_count; - } - - Float64 filter_rate = 0.0; - if (after_read_packs != 0) - { - filter_rate = (after_read_packs - after_filter) * 100.0 / after_read_packs; - GET_METRIC(tiflash_storage_rough_set_filter_rate, type_dtfile_pack).Observe(filter_rate); - } - LOG_DEBUG( - log, - "RSFilter exclude rate: {:.2f}, after_pk: {}, after_read_packs: {}, after_filter: {}, handle_ranges: {}" - ", read_packs: {}, pack_count: {}, none_count: {}, some_count: {}, all_count: {}, all_null_count: {}, " - "read_tag: {}", - ((after_read_packs == 0) ? std::numeric_limits::quiet_NaN() : filter_rate), - after_pk, - after_read_packs, - after_filter, - toDebugString(rowkey_ranges), - ((read_packs == nullptr) ? 0 : read_packs->size()), - pack_count, - none_count, - some_count, - all_count, - all_null_count, - magic_enum::enum_name(read_tag)); -} - -std::tuple DMFilePackFilter::countPackRes() const -{ - UInt64 none_count = 0; - UInt64 some_count = 0; - UInt64 all_count = 0; - UInt64 all_null_count = 0; - for (auto res : pack_res) - { - if (res == RSResult::None || res == RSResult::NoneNull) - ++none_count; - else if (res == RSResult::Some || res == RSResult::SomeNull) - ++some_count; - else if (res == RSResult::All) - ++all_count; - else if (res == RSResult::AllNull) - ++all_null_count; - } - return {none_count, some_count, all_count, all_null_count}; -} - -UInt64 DMFilePackFilter::countUsePack() const -{ - return std::count_if(pack_res.cbegin(), pack_res.cend(), [](RSResult res) { return res.isUse(); }); -} - -void DMFilePackFilter::loadIndex( - ColumnIndexes & indexes, - const DMFilePtr & dmfile, - const FileProviderPtr & file_provider, - const MinMaxIndexCachePtr & index_cache, - bool set_cache_if_miss, - ColId col_id, - const ReadLimiterPtr & read_limiter, - const ScanContextPtr & scan_context) -{ - const auto & type = dmfile->getColumnStat(col_id).type; - const auto file_name_base = DMFile::getFileNameBase(col_id); - - auto load = [&]() { - auto index_file_size = dmfile->colIndexSize(col_id); - if (index_file_size == 0) - return std::make_shared(*type); - auto index_guard = S3::S3RandomAccessFile::setReadFileInfo({ - .size = dmfile->getReadFileSize(col_id, colIndexFileName(file_name_base)), - .scan_context = scan_context, - }); - if (!dmfile->getConfiguration()) // v1 - { - auto index_buf = ReadBufferFromRandomAccessFileBuilder::build( - file_provider, - dmfile->colIndexPath(file_name_base), - dmfile->encryptionIndexPath(file_name_base), - std::min(static_cast(DBMS_DEFAULT_BUFFER_SIZE), index_file_size), - read_limiter); - return MinMaxIndex::read(*type, index_buf, index_file_size); - } - else if (dmfile->useMetaV2()) // v3 - { - const auto * dmfile_meta = typeid_cast(dmfile->meta.get()); - assert(dmfile_meta != nullptr); - auto info = dmfile_meta->merged_sub_file_infos.find(colIndexFileName(file_name_base)); - if (info == dmfile_meta->merged_sub_file_infos.end()) - { - throw Exception( - ErrorCodes::LOGICAL_ERROR, - "Unknown index file {}", - dmfile->colIndexPath(file_name_base)); - } - - auto file_path = dmfile->meta->mergedPath(info->second.number); - auto encryp_path = dmfile_meta->encryptionMergedPath(info->second.number); - auto offset = info->second.offset; - auto data_size = info->second.size; - - auto buffer = ReadBufferFromRandomAccessFileBuilder::build( - file_provider, - file_path, - encryp_path, - dmfile->getConfiguration()->getChecksumFrameLength(), - read_limiter); - buffer.seek(offset); - - String raw_data; - raw_data.resize(data_size); - - buffer.read(reinterpret_cast(raw_data.data()), data_size); - - auto buf = ChecksumReadBufferBuilder::build( - std::move(raw_data), - dmfile->colIndexPath(file_name_base), // just for debug - dmfile->getConfiguration()->getChecksumFrameLength(), - dmfile->getConfiguration()->getChecksumAlgorithm(), - dmfile->getConfiguration()->getChecksumFrameLength()); - - auto header_size = dmfile->getConfiguration()->getChecksumHeaderLength(); - auto frame_total_size = dmfile->getConfiguration()->getChecksumFrameLength() + header_size; - auto frame_count = index_file_size / frame_total_size + (index_file_size % frame_total_size != 0); - - return MinMaxIndex::read(*type, *buf, index_file_size - header_size * frame_count); - } - else - { // v2 - auto index_buf = ChecksumReadBufferBuilder::build( - file_provider, - dmfile->colIndexPath(file_name_base), - dmfile->encryptionIndexPath(file_name_base), - index_file_size, - read_limiter, - dmfile->getConfiguration()->getChecksumAlgorithm(), - dmfile->getConfiguration()->getChecksumFrameLength()); - auto header_size = dmfile->getConfiguration()->getChecksumHeaderLength(); - auto frame_total_size = dmfile->getConfiguration()->getChecksumFrameLength() + header_size; - auto frame_count = index_file_size / frame_total_size + (index_file_size % frame_total_size != 0); - return MinMaxIndex::read(*type, *index_buf, index_file_size - header_size * frame_count); - } - }; - MinMaxIndexPtr minmax_index; - if (index_cache && set_cache_if_miss) - { - minmax_index = index_cache->getOrSet(dmfile->colIndexCacheKey(file_name_base), load); - } - else - { - // try load from the cache first - if (index_cache) - minmax_index = index_cache->get(dmfile->colIndexCacheKey(file_name_base)); - if (minmax_index == nullptr) - minmax_index = load(); - } - indexes.emplace(col_id, RSIndex(type, minmax_index)); -} - -void DMFilePackFilter::tryLoadIndex(ColId col_id) -{ - if (param.indexes.count(col_id)) - return; - - if (!dmfile->isColIndexExist(col_id)) - return; - - Stopwatch watch; - loadIndex(param.indexes, dmfile, file_provider, index_cache, set_cache_if_miss, col_id, read_limiter, scan_context); -} - -} // namespace DB::DM diff --git a/dbms/src/Storages/DeltaMerge/File/DMFilePackFilter.h b/dbms/src/Storages/DeltaMerge/File/DMFilePackFilter.h index 3391af14444..53ea70bbd3a 100644 --- a/dbms/src/Storages/DeltaMerge/File/DMFilePackFilter.h +++ b/dbms/src/Storages/DeltaMerge/File/DMFilePackFilter.h @@ -27,6 +27,8 @@ #include #include +#include + namespace ProfileEvents { extern const Event DMFileFilterNoFilter; @@ -153,6 +155,33 @@ class DMFilePackFilter std::vector handle_filters; for (auto & rowkey_range : rowkey_ranges) handle_filters.emplace_back(toFilter(rowkey_range)); +#ifndef NDEBUG + // sanity check under debug mode to ensure the rowkey_range is correct common-handle or int64-handle + if (!rowkey_ranges.empty()) + { + bool is_common_handle = rowkey_ranges.begin()->is_common_handle; + auto handle_col_type = dmfile->getColumnStat(EXTRA_HANDLE_COLUMN_ID).type; + if (is_common_handle) + RUNTIME_CHECK_MSG( + handle_col_type->getTypeId() == TypeIndex::String, + "handle_col_type_id={}", + magic_enum::enum_name(handle_col_type->getTypeId())); + else + RUNTIME_CHECK_MSG( + handle_col_type->getTypeId() == TypeIndex::Int64, + "handle_col_type_id={}", + magic_enum::enum_name(handle_col_type->getTypeId())); + for (size_t i = 1; i < rowkey_ranges.size(); ++i) + { + RUNTIME_CHECK_MSG( + is_common_handle == rowkey_ranges[i].is_common_handle, + "i={} is_common_handle={} ith.is_common_handle={}", + i, + is_common_handle, + rowkey_ranges[i].is_common_handle); + } + } +#endif for (size_t i = 0; i < pack_count; ++i) { handle_res[i] = RSResult::None;