diff --git a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileSetSnapshot.cpp b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileSetSnapshot.cpp index 84274ee6b3e..c3218ef41d7 100644 --- a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileSetSnapshot.cpp +++ b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileSetSnapshot.cpp @@ -20,7 +20,7 @@ namespace DB { namespace DM { -RowKeyRange ColumnFileSetSnapshot::getSquashDeleteRange() const +RowKeyRange ColumnFileSetSnapshot::getSquashDeleteRange(bool is_common_handle, size_t rowkey_column_size) const { RowKeyRange squashed_delete_range = RowKeyRange::newNone(is_common_handle, rowkey_column_size); for (const auto & column_file : column_files) diff --git a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileSetSnapshot.h b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileSetSnapshot.h index b5ef4966c6c..42b55a11e94 100644 --- a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileSetSnapshot.h +++ b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileSetSnapshot.h @@ -45,7 +45,7 @@ class BlockOrDelete {} bool isBlock() { return static_cast(block); } - auto & getBlock() { return block; }; + auto & getBlock() { return block; } auto getBlockOffset() const { return block_offset; } auto & getDeleteRange() { return delete_range; } }; @@ -66,9 +66,6 @@ class ColumnFileSetSnapshot size_t bytes{0}; size_t deletes{0}; - bool is_common_handle{false}; - size_t rowkey_column_size{0}; - public: /// This field is public writeable intentionally. It allows us to build a snapshot first, /// then change how these data can be read later. @@ -89,8 +86,6 @@ class ColumnFileSetSnapshot c->rows = rows; c->bytes = bytes; c->deletes = deletes; - c->is_common_handle = is_common_handle; - c->rowkey_column_size = rowkey_column_size; return c; } @@ -102,7 +97,7 @@ class ColumnFileSetSnapshot size_t getBytes() const { return bytes; } size_t getDeletes() const { return deletes; } - RowKeyRange getSquashDeleteRange() const; + RowKeyRange getSquashDeleteRange(bool is_common_handle, size_t rowkey_column_size) const; const auto & getDataProvider() const { return data_provider; } }; diff --git a/dbms/src/Storages/DeltaMerge/Delta/DeltaValueSpace.h b/dbms/src/Storages/DeltaMerge/Delta/DeltaValueSpace.h index d915eb2c8d1..537795bcf95 100644 --- a/dbms/src/Storages/DeltaMerge/Delta/DeltaValueSpace.h +++ b/dbms/src/Storages/DeltaMerge/Delta/DeltaValueSpace.h @@ -393,7 +393,14 @@ class DeltaValueSnapshot size_t getMemTableSetRowsOffset() const { return persisted_files_snap->getRows(); } size_t getMemTableSetDeletesOffset() const { return persisted_files_snap->getDeletes(); } - RowKeyRange getSquashDeleteRange() const; + /** + * Get a "squash" delete range in the delta layer. We can use this to ** estimate ** how many rows in the + * stable layer is pending for delete. + * The squash delete range may be larger than that will actually removed rows. For example, if there are + * two delete range [0, 30) and [100, 200), the squash delete range will be [0, 200). Actually after applying + * the two delete range, the rows [30, 100) will not be removed. + */ + RowKeyRange getSquashDeleteRange(bool is_common_handle, size_t rowkey_column_size) const; const auto & getSharedDeltaIndex() { return shared_delta_index; } size_t getDeltaIndexEpoch() const { return delta_index_epoch; } diff --git a/dbms/src/Storages/DeltaMerge/Delta/Snapshot.cpp b/dbms/src/Storages/DeltaMerge/Delta/Snapshot.cpp index 0e0490d8ce9..c15044a0caa 100644 --- a/dbms/src/Storages/DeltaMerge/Delta/Snapshot.cpp +++ b/dbms/src/Storages/DeltaMerge/Delta/Snapshot.cpp @@ -56,10 +56,10 @@ DeltaSnapshotPtr DeltaValueSpace::createSnapshot( return snap; } -RowKeyRange DeltaValueSnapshot::getSquashDeleteRange() const +RowKeyRange DeltaValueSnapshot::getSquashDeleteRange(bool is_common_handle, size_t rowkey_column_size) const { - auto delete_range1 = mem_table_snap->getSquashDeleteRange(); - auto delete_range2 = persisted_files_snap->getSquashDeleteRange(); + auto delete_range1 = mem_table_snap->getSquashDeleteRange(is_common_handle, rowkey_column_size); + auto delete_range2 = persisted_files_snap->getSquashDeleteRange(is_common_handle, rowkey_column_size); return delete_range1.merge(delete_range2); } diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeStore_InternalBg.cpp b/dbms/src/Storages/DeltaMerge/DeltaMergeStore_InternalBg.cpp index 9bdcd4900e7..a31c2a5a80a 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaMergeStore_InternalBg.cpp +++ b/dbms/src/Storages/DeltaMerge/DeltaMergeStore_InternalBg.cpp @@ -509,7 +509,9 @@ bool shouldCompactDeltaWithStable( double invalid_data_ratio_threshold, const LoggerPtr & log) { - auto actual_delete_range = snap->delta->getSquashDeleteRange().shrink(segment_range); + auto actual_delete_range + = snap->delta->getSquashDeleteRange(segment_range.is_common_handle, segment_range.rowkey_column_size) + .shrink(segment_range); if (actual_delete_range.none()) return false; diff --git a/dbms/src/Storages/DeltaMerge/File/DMFilePackFilter.h b/dbms/src/Storages/DeltaMerge/File/DMFilePackFilter.h index 3391af14444..53ea70bbd3a 100644 --- a/dbms/src/Storages/DeltaMerge/File/DMFilePackFilter.h +++ b/dbms/src/Storages/DeltaMerge/File/DMFilePackFilter.h @@ -27,6 +27,8 @@ #include #include +#include + namespace ProfileEvents { extern const Event DMFileFilterNoFilter; @@ -153,6 +155,33 @@ class DMFilePackFilter std::vector handle_filters; for (auto & rowkey_range : rowkey_ranges) handle_filters.emplace_back(toFilter(rowkey_range)); +#ifndef NDEBUG + // sanity check under debug mode to ensure the rowkey_range is correct common-handle or int64-handle + if (!rowkey_ranges.empty()) + { + bool is_common_handle = rowkey_ranges.begin()->is_common_handle; + auto handle_col_type = dmfile->getColumnStat(EXTRA_HANDLE_COLUMN_ID).type; + if (is_common_handle) + RUNTIME_CHECK_MSG( + handle_col_type->getTypeId() == TypeIndex::String, + "handle_col_type_id={}", + magic_enum::enum_name(handle_col_type->getTypeId())); + else + RUNTIME_CHECK_MSG( + handle_col_type->getTypeId() == TypeIndex::Int64, + "handle_col_type_id={}", + magic_enum::enum_name(handle_col_type->getTypeId())); + for (size_t i = 1; i < rowkey_ranges.size(); ++i) + { + RUNTIME_CHECK_MSG( + is_common_handle == rowkey_ranges[i].is_common_handle, + "i={} is_common_handle={} ith.is_common_handle={}", + i, + is_common_handle, + rowkey_ranges[i].is_common_handle); + } + } +#endif for (size_t i = 0; i < pack_count; ++i) { handle_res[i] = RSResult::None; diff --git a/dbms/src/Storages/DeltaMerge/Filter/FilterHelper.h b/dbms/src/Storages/DeltaMerge/Filter/FilterHelper.h index fd994d3d031..6ea4c069fb0 100644 --- a/dbms/src/Storages/DeltaMerge/Filter/FilterHelper.h +++ b/dbms/src/Storages/DeltaMerge/Filter/FilterHelper.h @@ -23,10 +23,11 @@ namespace DB::DM inline RSOperatorPtr toFilter(RowKeyRange & rowkey_range) { - Attr handle_attr - = {EXTRA_HANDLE_COLUMN_NAME, - EXTRA_HANDLE_COLUMN_ID, - rowkey_range.is_common_handle ? EXTRA_HANDLE_COLUMN_STRING_TYPE : EXTRA_HANDLE_COLUMN_INT_TYPE}; + Attr handle_attr = { + EXTRA_HANDLE_COLUMN_NAME, + EXTRA_HANDLE_COLUMN_ID, + rowkey_range.is_common_handle ? EXTRA_HANDLE_COLUMN_STRING_TYPE : EXTRA_HANDLE_COLUMN_INT_TYPE, + }; if (rowkey_range.is_common_handle) { auto left = createGreaterEqual( diff --git a/dbms/src/Storages/DeltaMerge/Remote/DisaggSnapshot.h b/dbms/src/Storages/DeltaMerge/Remote/DisaggSnapshot.h index 86ac2a7e62f..2d9d88ad5b7 100644 --- a/dbms/src/Storages/DeltaMerge/Remote/DisaggSnapshot.h +++ b/dbms/src/Storages/DeltaMerge/Remote/DisaggSnapshot.h @@ -16,7 +16,6 @@ #include #include -#include #include #include #include diff --git a/dbms/src/Storages/DeltaMerge/Remote/Serializer.cpp b/dbms/src/Storages/DeltaMerge/Remote/Serializer.cpp index 5a391078d2d..611703fd77d 100644 --- a/dbms/src/Storages/DeltaMerge/Remote/Serializer.cpp +++ b/dbms/src/Storages/DeltaMerge/Remote/Serializer.cpp @@ -214,8 +214,6 @@ ColumnFileSetSnapshotPtr Serializer::deserializeColumnFileSet( { auto empty_data_provider = std::make_shared(); auto ret = std::make_shared(empty_data_provider); - ret->is_common_handle = segment_range.is_common_handle; - ret->rowkey_column_size = segment_range.rowkey_column_size; ret->column_files.reserve(proto.size()); for (const auto & remote_column_file : proto) { diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_file.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_file.cpp index 6081444e54f..b3cbdd560f0 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_file.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_file.cpp @@ -23,6 +23,7 @@ #include #include #include +#include #include #include #include @@ -1609,28 +1610,33 @@ try RowKeyRange range; Int64 start, end; }; - std::vector ranges; - ranges.emplace_back( - DMTestEnv::getRowKeyRangeForClusteredIndex(0, span_per_part, rowkey_column_size), - 0, - span_per_part); // only first part - ranges.emplace_back( - DMTestEnv::getRowKeyRangeForClusteredIndex(800, num_rows_write, rowkey_column_size), - 800, - num_rows_write); - ranges.emplace_back(DMTestEnv::getRowKeyRangeForClusteredIndex(256, 700, rowkey_column_size), 256, 700); // - ranges.emplace_back(DMTestEnv::getRowKeyRangeForClusteredIndex(0, 0, rowkey_column_size), 0, 0); // none - ranges.emplace_back( - DMTestEnv::getRowKeyRangeForClusteredIndex(0, num_rows_write, rowkey_column_size), - 0, - num_rows_write); // full range - ranges.emplace_back( - DMTestEnv::getRowKeyRangeForClusteredIndex( + std::vector ranges{ + QueryRangeInfo{ + DMTestEnv::getRowKeyRangeForClusteredIndex(0, span_per_part, rowkey_column_size), + 0, + span_per_part // only first part + }, + QueryRangeInfo{ + DMTestEnv::getRowKeyRangeForClusteredIndex(800, num_rows_write, rowkey_column_size), + 800, + num_rows_write}, + QueryRangeInfo{DMTestEnv::getRowKeyRangeForClusteredIndex(256, 700, rowkey_column_size), 256, 700}, + QueryRangeInfo{DMTestEnv::getRowKeyRangeForClusteredIndex(0, 0, rowkey_column_size), 0, 0}, //none + QueryRangeInfo{ + DMTestEnv::getRowKeyRangeForClusteredIndex(0, num_rows_write, rowkey_column_size), + 0, + num_rows_write, + }, // full range + QueryRangeInfo{ + DMTestEnv::getRowKeyRangeForClusteredIndex( + std::numeric_limits::min(), + std::numeric_limits::max(), + rowkey_column_size), std::numeric_limits::min(), std::numeric_limits::max(), - rowkey_column_size), - std::numeric_limits::min(), - std::numeric_limits::max()); // full range + }, // full range + }; + for (const auto & range : ranges) { // Test read diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment.cpp index 6e8a9e8dffc..13aae4b16ff 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment.cpp @@ -909,7 +909,7 @@ try auto check_segment_squash_delete_range = [this](SegmentPtr & segment, const HandleRange & expect_range) { // set `is_update=false` to get full squash delete range auto snap = segment->createSnapshot(dmContext(), /*for_update*/ false, CurrentMetrics::DT_SnapshotOfRead); - auto squash_range = snap->delta->getSquashDeleteRange(); + auto squash_range = snap->delta->getSquashDeleteRange(/*is_common_handle=*/false, /*rowkey_column_size=*/1); ASSERT_ROWKEY_RANGE_EQ(squash_range, RowKeyRange::fromHandleRange(expect_range)); }; @@ -955,6 +955,7 @@ try HandleRange del{1, 32}; segment->write(dmContext(), {RowKeyRange::fromHandleRange(del)}); SCOPED_TRACE("check after range: " + del.toDebugString()); + // suqash_delete_range will consider [1, 100) maybe deleted check_segment_squash_delete_range(segment, HandleRange{1, 100}); } diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment_common_handle.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment_common_handle.cpp index 883fc2ed386..154ffb7e828 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment_common_handle.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment_common_handle.cpp @@ -28,12 +28,18 @@ #include #include -namespace DB +namespace CurrentMetrics { -namespace DM -{ -namespace tests +extern const Metric DT_SnapshotOfRead; +extern const Metric DT_SnapshotOfReadRaw; +extern const Metric DT_SnapshotOfSegmentSplit; +extern const Metric DT_SnapshotOfSegmentMerge; +extern const Metric DT_SnapshotOfDeltaMerge; +extern const Metric DT_SnapshotOfPlaceIndex; +} // namespace CurrentMetrics +namespace DB::DM::tests { + class SegmentCommonHandleTest : public DB::base::TiFlashStorageTestBasic { public: @@ -580,16 +586,27 @@ try } { - // flush segment + // do delta-merge move data to stable segment = segment->mergeDelta(dmContext(), tableColumns()); } + auto check_segment_squash_delete_range = [this](SegmentPtr & segment, const RowKeyRange & expect_range) { + // set `is_update=false` to get full squash delete range + auto snap = segment->createSnapshot(dmContext(), /*for_update*/ false, CurrentMetrics::DT_SnapshotOfRead); + auto squash_range = snap->delta->getSquashDeleteRange(is_common_handle, rowkey_column_size); + ASSERT_ROWKEY_RANGE_EQ(squash_range, expect_range); + }; + { // Test delete range [70, 100) - segment->write(dmContext(), {DMTestEnv::getRowKeyRangeForClusteredIndex(70, 100, rowkey_column_size)}); - // flush segment + auto del_row_range = DMTestEnv::getRowKeyRangeForClusteredIndex(70, 100, rowkey_column_size); + SCOPED_TRACE("check after range: " + del_row_range.toDebugString()); // Add trace msg when ASSERT failed + // mem-table + segment->write(dmContext(), {del_row_range}); + check_segment_squash_delete_range(segment, del_row_range); + // persisted-file segment->flushCache(dmContext()); - segment = segment->mergeDelta(dmContext(), tableColumns()); + check_segment_squash_delete_range(segment, del_row_range); } { @@ -619,10 +636,15 @@ try { // Test delete range [63, 70) - segment->write(dmContext(), {DMTestEnv::getRowKeyRangeForClusteredIndex(63, 70, rowkey_column_size)}); - // flush segment + auto del_row_range = DMTestEnv::getRowKeyRangeForClusteredIndex(63, 70, rowkey_column_size); + auto merged_del_range = DMTestEnv::getRowKeyRangeForClusteredIndex(63, 100, rowkey_column_size); + SCOPED_TRACE("check after range: " + del_row_range.toDebugString()); // Add trace msg when ASSERT failed + // mem-table + segment->write(dmContext(), {del_row_range}); + check_segment_squash_delete_range(segment, merged_del_range); + // persisted-file segment->flushCache(dmContext()); - segment = segment->mergeDelta(dmContext(), tableColumns()); + check_segment_squash_delete_range(segment, merged_del_range); } { @@ -652,10 +674,14 @@ try { // Test delete range [1, 32) - segment->write(dmContext(), {DMTestEnv::getRowKeyRangeForClusteredIndex(1, 32, rowkey_column_size)}); - // flush segment - segment->flushCache(dmContext()); - segment = segment->mergeDelta(dmContext(), tableColumns()); + auto del_row_range = DMTestEnv::getRowKeyRangeForClusteredIndex(1, 32, rowkey_column_size); + SCOPED_TRACE("check after range: " + del_row_range.toDebugString()); // Add trace msg when ASSERT failed + segment->write(dmContext(), {del_row_range}); + auto merged_del_range = DMTestEnv::getRowKeyRangeForClusteredIndex( + 1, + 100, + rowkey_column_size); // suqash_delete_range will consider [1, 100) maybe deleted + check_segment_squash_delete_range(segment, merged_del_range); } { @@ -869,7 +895,7 @@ try ASSERT_EQ(c1->size(), c2->size()); - for (Int64 i = 0; i < Int64(c1->size()); i++) + for (Int64 i = 0; i < static_cast(c1->size()); i++) { if (iter1->name == DMTestEnv::pk_name) { @@ -970,8 +996,8 @@ try segment->write( dmContext(), DMTestEnv::getRowKeyRangeForClusteredIndex( - Int64((num_batches_written - 1) * num_rows_per_write), - Int64((num_batches_written - 1) * num_rows_per_write + 2), + static_cast((num_batches_written - 1) * num_rows_per_write), + static_cast((num_batches_written - 1) * num_rows_per_write + 2), rowkey_column_size)); } @@ -984,7 +1010,7 @@ try i < num_batches_written * num_rows_per_write; i++) { - temp.push_back(Int64(i)); + temp.push_back(static_cast(i)); } { @@ -1023,6 +1049,4 @@ try } CATCH -} // namespace tests -} // namespace DM -} // namespace DB +} // namespace DB::DM::tests