Skip to content

Commit

Permalink
Storage: Fix getSquashDeleteRange does not return correctly squashe…
Browse files Browse the repository at this point in the history
…d key-range when using common-handle (#9530) (#9565)

close #9529

Storage: Fix `getSquashDeleteRange` does not return correctly squashed key-range when using common-handle

Co-authored-by: JaySon <[email protected]>
Co-authored-by: JaySon-Huang <[email protected]>
  • Loading branch information
ti-chi-bot and JaySon-Huang authored Nov 1, 2024
1 parent 1d5458b commit 1b2a2ea
Show file tree
Hide file tree
Showing 11 changed files with 131 additions and 60 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ namespace DB
{
namespace DM
{
RowKeyRange ColumnFileSetSnapshot::getSquashDeleteRange() const
RowKeyRange ColumnFileSetSnapshot::getSquashDeleteRange(bool is_common_handle, size_t rowkey_column_size) const
{
RowKeyRange squashed_delete_range = RowKeyRange::newNone(is_common_handle, rowkey_column_size);
for (const auto & column_file : column_files)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ class BlockOrDelete
{}

bool isBlock() { return static_cast<bool>(block); }
auto & getBlock() { return block; };
auto & getBlock() { return block; }
auto getBlockOffset() const { return block_offset; }
auto & getDeleteRange() { return delete_range; }
};
Expand All @@ -65,9 +65,6 @@ class ColumnFileSetSnapshot : public std::enable_shared_from_this<ColumnFileSetS
size_t bytes{0};
size_t deletes{0};

bool is_common_handle{false};
size_t rowkey_column_size{0};

public:
/// This field is public writeable intentionally. It allows us to build a snapshot first,
/// then change how these data can be read later.
Expand All @@ -88,8 +85,6 @@ class ColumnFileSetSnapshot : public std::enable_shared_from_this<ColumnFileSetS
c->rows = rows;
c->bytes = bytes;
c->deletes = deletes;
c->is_common_handle = is_common_handle;
c->rowkey_column_size = rowkey_column_size;

return c;
}
Expand All @@ -101,7 +96,7 @@ class ColumnFileSetSnapshot : public std::enable_shared_from_this<ColumnFileSetS
size_t getBytes() const { return bytes; }
size_t getDeletes() const { return deletes; }

RowKeyRange getSquashDeleteRange() const;
RowKeyRange getSquashDeleteRange(bool is_common_handle, size_t rowkey_column_size) const;

const auto & getDataProvider() const { return data_provider; }
};
Expand Down
9 changes: 8 additions & 1 deletion dbms/src/Storages/DeltaMerge/Delta/DeltaValueSpace.h
Original file line number Diff line number Diff line change
Expand Up @@ -368,7 +368,14 @@ class DeltaValueSnapshot
size_t getMemTableSetRowsOffset() const { return persisted_files_snap->getRows(); }
size_t getMemTableSetDeletesOffset() const { return persisted_files_snap->getDeletes(); }

RowKeyRange getSquashDeleteRange() const;
/**
* Get a "squash" delete range in the delta layer. We can use this to ** estimate ** how many rows in the
* stable layer is pending for delete.
* The squash delete range may be larger than that will actually removed rows. For example, if there are
* two delete range [0, 30) and [100, 200), the squash delete range will be [0, 200). Actually after applying
* the two delete range, the rows [30, 100) will not be removed.
*/
RowKeyRange getSquashDeleteRange(bool is_common_handle, size_t rowkey_column_size) const;

const auto & getSharedDeltaIndex() { return shared_delta_index; }
size_t getDeltaIndexEpoch() const { return delta_index_epoch; }
Expand Down
6 changes: 3 additions & 3 deletions dbms/src/Storages/DeltaMerge/Delta/Snapshot.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,10 @@ DeltaSnapshotPtr DeltaValueSpace::createSnapshot(const DMContext & context, bool
return snap;
}

RowKeyRange DeltaValueSnapshot::getSquashDeleteRange() const
RowKeyRange DeltaValueSnapshot::getSquashDeleteRange(bool is_common_handle, size_t rowkey_column_size) const
{
auto delete_range1 = mem_table_snap->getSquashDeleteRange();
auto delete_range2 = persisted_files_snap->getSquashDeleteRange();
auto delete_range1 = mem_table_snap->getSquashDeleteRange(is_common_handle, rowkey_column_size);
auto delete_range2 = persisted_files_snap->getSquashDeleteRange(is_common_handle, rowkey_column_size);
return delete_range1.merge(delete_range2);
}

Expand Down
5 changes: 3 additions & 2 deletions dbms/src/Storages/DeltaMerge/DeltaMergeStore_InternalBg.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ extern const char pause_until_dt_background_delta_merge[];

namespace DM
{

// A callback class for scanning the DMFiles on local filesystem
class LocalDMFileGcScanner final
{
Expand Down Expand Up @@ -475,7 +474,9 @@ bool shouldCompactStableWithTooManyInvalidVersion(const SegmentPtr & seg, DB::Ti

bool shouldCompactDeltaWithStable(const DMContext & context, const SegmentPtr & segment, const SegmentSnapshotPtr & snap, const RowKeyRange & segment_range, double invalid_data_ratio_threshold, const LoggerPtr & log)
{
auto actual_delete_range = snap->delta->getSquashDeleteRange().shrink(segment_range);
auto actual_delete_range
= snap->delta->getSquashDeleteRange(segment_range.is_common_handle, segment_range.rowkey_column_size)
.shrink(segment_range);
if (actual_delete_range.none())
return false;

Expand Down
30 changes: 30 additions & 0 deletions dbms/src/Storages/DeltaMerge/File/DMFilePackFilter.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,14 @@
#include <Encryption/createReadBufferFromFileBaseByFileProvider.h>
#include <Storages/DeltaMerge/DMContext.h>
#include <Storages/DeltaMerge/File/DMFile.h>
#include <Storages/DeltaMerge/File/DMFilePackFilter.h>
#include <Storages/DeltaMerge/Filter/FilterHelper.h>
#include <Storages/DeltaMerge/Filter/RSOperator.h>
#include <Storages/DeltaMerge/RowKeyRange.h>
#include <Storages/DeltaMerge/ScanContext.h>

#include <magic_enum.hpp>

namespace ProfileEvents
{
extern const Event DMFileFilterNoFilter;
Expand Down Expand Up @@ -142,6 +145,33 @@ class DMFilePackFilter
std::vector<RSOperatorPtr> handle_filters;
for (auto & rowkey_range : rowkey_ranges)
handle_filters.emplace_back(toFilter(rowkey_range));
#ifndef NDEBUG
// sanity check under debug mode to ensure the rowkey_range is correct common-handle or int64-handle
if (!rowkey_ranges.empty())
{
bool is_common_handle = rowkey_ranges.begin()->is_common_handle;
auto handle_col_type = dmfile->getColumnStat(EXTRA_HANDLE_COLUMN_ID).type;
if (is_common_handle)
RUNTIME_CHECK_MSG(
handle_col_type->getTypeId() == TypeIndex::String,
"handle_col_type_id={}",
magic_enum::enum_name(handle_col_type->getTypeId()));
else
RUNTIME_CHECK_MSG(
handle_col_type->getTypeId() == TypeIndex::Int64,
"handle_col_type_id={}",
magic_enum::enum_name(handle_col_type->getTypeId()));
for (size_t i = 1; i < rowkey_ranges.size(); ++i)
{
RUNTIME_CHECK_MSG(
is_common_handle == rowkey_ranges[i].is_common_handle,
"i={} is_common_handle={} ith.is_common_handle={}",
i,
is_common_handle,
rowkey_ranges[i].is_common_handle);
}
}
#endif
for (size_t i = 0; i < pack_count; ++i)
{
handle_res[i] = RSResult::None;
Expand Down
10 changes: 6 additions & 4 deletions dbms/src/Storages/DeltaMerge/Filter/FilterHelper.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,11 @@ namespace DM
{
inline RSOperatorPtr toFilter(RowKeyRange & rowkey_range)
{
Attr handle_attr = {EXTRA_HANDLE_COLUMN_NAME,
EXTRA_HANDLE_COLUMN_ID,
rowkey_range.is_common_handle ? EXTRA_HANDLE_COLUMN_STRING_TYPE : EXTRA_HANDLE_COLUMN_INT_TYPE};
Attr handle_attr = {
EXTRA_HANDLE_COLUMN_NAME,
EXTRA_HANDLE_COLUMN_ID,
rowkey_range.is_common_handle ? EXTRA_HANDLE_COLUMN_STRING_TYPE : EXTRA_HANDLE_COLUMN_INT_TYPE,
};
if (rowkey_range.is_common_handle)
{
auto left = createGreaterEqual(handle_attr, Field(rowkey_range.start.value->data(), rowkey_range.start.value->size()), -1);
Expand All @@ -42,4 +44,4 @@ inline RSOperatorPtr toFilter(RowKeyRange & rowkey_range)
}

} // namespace DM
} // namespace DB
} // namespace DB
2 changes: 0 additions & 2 deletions dbms/src/Storages/DeltaMerge/Remote/Serializer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -209,8 +209,6 @@ ColumnFileSetSnapshotPtr Serializer::deserializeColumnFileSet(
{
auto empty_data_provider = std::make_shared<ColumnFileDataProviderNop>();
auto ret = std::make_shared<ColumnFileSetSnapshot>(empty_data_provider);
ret->is_common_handle = segment_range.is_common_handle;
ret->rowkey_column_size = segment_range.rowkey_column_size;
ret->column_files.reserve(proto.size());
for (const auto & remote_column_file : proto)
{
Expand Down
42 changes: 27 additions & 15 deletions dbms/src/Storages/DeltaMerge/tests/gtest_dm_file.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include <Storages/DeltaMerge/File/DMFileBlockInputStream.h>
#include <Storages/DeltaMerge/File/DMFileBlockOutputStream.h>
#include <Storages/DeltaMerge/File/DMFileWriter.h>
#include <Storages/DeltaMerge/Range.h>
#include <Storages/DeltaMerge/RowKeyRange.h>
#include <Storages/DeltaMerge/StoragePool.h>
#include <Storages/DeltaMerge/tests/DMTestEnv.h>
Expand Down Expand Up @@ -1523,21 +1524,32 @@ try
RowKeyRange range;
Int64 start, end;
};
std::vector<QueryRangeInfo> ranges;
ranges.emplace_back(
DMTestEnv::getRowKeyRangeForClusteredIndex(0, span_per_part, rowkey_column_size),
0,
span_per_part); // only first part
ranges.emplace_back(DMTestEnv::getRowKeyRangeForClusteredIndex(800, num_rows_write, rowkey_column_size), 800, num_rows_write);
ranges.emplace_back(DMTestEnv::getRowKeyRangeForClusteredIndex(256, 700, rowkey_column_size), 256, 700); //
ranges.emplace_back(DMTestEnv::getRowKeyRangeForClusteredIndex(0, 0, rowkey_column_size), 0, 0); // none
ranges.emplace_back(DMTestEnv::getRowKeyRangeForClusteredIndex(0, num_rows_write, rowkey_column_size), 0, num_rows_write); // full range
ranges.emplace_back(DMTestEnv::getRowKeyRangeForClusteredIndex(
std::numeric_limits<Int64>::min(),
std::numeric_limits<Int64>::max(),
rowkey_column_size),
std::numeric_limits<Int64>::min(),
std::numeric_limits<Int64>::max()); // full range
std::vector<QueryRangeInfo> ranges{
QueryRangeInfo{
DMTestEnv::getRowKeyRangeForClusteredIndex(0, span_per_part, rowkey_column_size),
0,
span_per_part // only first part
},
QueryRangeInfo{
DMTestEnv::getRowKeyRangeForClusteredIndex(800, num_rows_write, rowkey_column_size),
800,
num_rows_write},
QueryRangeInfo{DMTestEnv::getRowKeyRangeForClusteredIndex(256, 700, rowkey_column_size), 256, 700},
QueryRangeInfo{DMTestEnv::getRowKeyRangeForClusteredIndex(0, 0, rowkey_column_size), 0, 0}, //none
QueryRangeInfo{
DMTestEnv::getRowKeyRangeForClusteredIndex(0, num_rows_write, rowkey_column_size),
0,
num_rows_write,
}, // full range
QueryRangeInfo{
DMTestEnv::getRowKeyRangeForClusteredIndex(
std::numeric_limits<Int64>::min(),
std::numeric_limits<Int64>::max(),
rowkey_column_size),
std::numeric_limits<Int64>::min(),
std::numeric_limits<Int64>::max(),
}, // full range
};
for (const auto & range : ranges)
{
// Test read
Expand Down
3 changes: 2 additions & 1 deletion dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -827,7 +827,7 @@ try
auto check_segment_squash_delete_range = [this](SegmentPtr & segment, const HandleRange & expect_range) {
// set `is_update=false` to get full squash delete range
auto snap = segment->createSnapshot(dmContext(), /*for_update*/ false, CurrentMetrics::DT_SnapshotOfRead);
auto squash_range = snap->delta->getSquashDeleteRange();
auto squash_range = snap->delta->getSquashDeleteRange(/*is_common_handle=*/false, /*rowkey_column_size=*/1);
ASSERT_ROWKEY_RANGE_EQ(squash_range, RowKeyRange::fromHandleRange(expect_range));
};

Expand Down Expand Up @@ -873,6 +873,7 @@ try
HandleRange del{1, 32};
segment->write(dmContext(), {RowKeyRange::fromHandleRange(del)});
SCOPED_TRACE("check after range: " + del.toDebugString());
// suqash_delete_range will consider [1, 100) maybe deleted
check_segment_squash_delete_range(segment, HandleRange{1, 100});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,16 @@
#include <ctime>
#include <memory>

namespace DB
namespace CurrentMetrics
{
namespace DM
{
namespace tests
extern const Metric DT_SnapshotOfRead;
extern const Metric DT_SnapshotOfReadRaw;
extern const Metric DT_SnapshotOfSegmentSplit;
extern const Metric DT_SnapshotOfSegmentMerge;
extern const Metric DT_SnapshotOfDeltaMerge;
extern const Metric DT_SnapshotOfPlaceIndex;
} // namespace CurrentMetrics
namespace DB::DM::tests
{
class SegmentCommonHandleTest : public DB::base::TiFlashStorageTestBasic
{
Expand Down Expand Up @@ -517,16 +522,27 @@ try
}

{
// flush segment
// do delta-merge move data to stable
segment = segment->mergeDelta(dmContext(), tableColumns());
}

auto check_segment_squash_delete_range = [this](SegmentPtr & segment, const RowKeyRange & expect_range) {
// set `is_update=false` to get full squash delete range
auto snap = segment->createSnapshot(dmContext(), /*for_update*/ false, CurrentMetrics::DT_SnapshotOfRead);
auto squash_range = snap->delta->getSquashDeleteRange(is_common_handle, rowkey_column_size);
ASSERT_ROWKEY_RANGE_EQ(squash_range, expect_range);
};

{
// Test delete range [70, 100)
segment->write(dmContext(), {DMTestEnv::getRowKeyRangeForClusteredIndex(70, 100, rowkey_column_size)});
// flush segment
auto del_row_range = DMTestEnv::getRowKeyRangeForClusteredIndex(70, 100, rowkey_column_size);
SCOPED_TRACE("check after range: " + del_row_range.toDebugString()); // Add trace msg when ASSERT failed
// mem-table
segment->write(dmContext(), {del_row_range});
check_segment_squash_delete_range(segment, del_row_range);
// persisted-file
segment->flushCache(dmContext());
segment = segment->mergeDelta(dmContext(), tableColumns());
check_segment_squash_delete_range(segment, del_row_range);
}

{
Expand All @@ -551,10 +567,15 @@ try

{
// Test delete range [63, 70)
segment->write(dmContext(), {DMTestEnv::getRowKeyRangeForClusteredIndex(63, 70, rowkey_column_size)});
// flush segment
auto del_row_range = DMTestEnv::getRowKeyRangeForClusteredIndex(63, 70, rowkey_column_size);
auto merged_del_range = DMTestEnv::getRowKeyRangeForClusteredIndex(63, 100, rowkey_column_size);
SCOPED_TRACE("check after range: " + del_row_range.toDebugString()); // Add trace msg when ASSERT failed
// mem-table
segment->write(dmContext(), {del_row_range});
check_segment_squash_delete_range(segment, merged_del_range);
// persisted-file
segment->flushCache(dmContext());
segment = segment->mergeDelta(dmContext(), tableColumns());
check_segment_squash_delete_range(segment, merged_del_range);
}

{
Expand All @@ -579,10 +600,14 @@ try

{
// Test delete range [1, 32)
segment->write(dmContext(), {DMTestEnv::getRowKeyRangeForClusteredIndex(1, 32, rowkey_column_size)});
// flush segment
segment->flushCache(dmContext());
segment = segment->mergeDelta(dmContext(), tableColumns());
auto del_row_range = DMTestEnv::getRowKeyRangeForClusteredIndex(1, 32, rowkey_column_size);
SCOPED_TRACE("check after range: " + del_row_range.toDebugString()); // Add trace msg when ASSERT failed
segment->write(dmContext(), {del_row_range});
auto merged_del_range = DMTestEnv::getRowKeyRangeForClusteredIndex(
1,
100,
rowkey_column_size); // suqash_delete_range will consider [1, 100) maybe deleted
check_segment_squash_delete_range(segment, merged_del_range);
}

{
Expand Down Expand Up @@ -762,7 +787,7 @@ try

ASSERT_EQ(c1->size(), c2->size());

for (Int64 i = 0; i < Int64(c1->size()); i++)
for (Int64 i = 0; i < static_cast<Int64>(c1->size()); i++)
{
if (iter1->name == DMTestEnv::pk_name)
{
Expand Down Expand Up @@ -859,10 +884,12 @@ try
// Delete some records so that the following condition can be satisfied:
// if pk % 5 < 2, then the record would be deleted
// if pk % 5 >= 2, then the record would be reserved
segment->write(dmContext(),
DMTestEnv::getRowKeyRangeForClusteredIndex(Int64((num_batches_written - 1) * num_rows_per_write),
Int64((num_batches_written - 1) * num_rows_per_write + 2),
rowkey_column_size));
segment->write(
dmContext(),
DMTestEnv::getRowKeyRangeForClusteredIndex(
static_cast<Int64>((num_batches_written - 1) * num_rows_per_write),
static_cast<Int64>((num_batches_written - 1) * num_rows_per_write + 2),
rowkey_column_size));
}

{
Expand All @@ -872,7 +899,7 @@ try

for (size_t i = (num_batches_written - 1) * num_rows_per_write + 2; i < num_batches_written * num_rows_per_write; i++)
{
temp.push_back(Int64(i));
temp.push_back(static_cast<Int64>(i));
}

{
Expand Down Expand Up @@ -906,6 +933,4 @@ try
}
CATCH

} // namespace tests
} // namespace DM
} // namespace DB
} // namespace DB::DM::tests

0 comments on commit 1b2a2ea

Please sign in to comment.