Commit 61348e8
Storages: Fix cloning delta index when there are duplicated tuples (#9000) (#9016)

close #8845

Signed-off-by: ti-chi-bot <[email protected]>

Co-authored-by: jinhelin <[email protected]>
ti-chi-bot and JinheLin authored May 29, 2024
1 parent 7f6cd1b commit 61348e8
Showing 10 changed files with 433 additions and 20 deletions.
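In short: a shared delta index may record duplicated (handle, version) tuples whose delta offsets lie beyond the rows visible to an older snapshot, and cloning such an index for that snapshot breaks its MVCC view. The fix threads the snapshot's delta row count into `tryCloneInner` and reuses the shared index only when its largest duplicated tuple ID is still covered by the snapshot. Below is a minimal, self-contained sketch of that condition; `canReuseSharedIndex` is a hypothetical helper mirroring the check added to DeltaIndex::tryCloneInner, not part of the TiFlash API.

#include <cassert>
#include <cstddef>
#include <cstdint>

// Hypothetical helper: mirrors the reuse condition added to DeltaIndex::tryCloneInner.
static bool canReuseSharedIndex(
    std::size_t placed_deletes,        // delete ranges already placed into the shared index
    std::int64_t max_dup_tuple_id,     // largest delta offset of a duplicated tuple, -1 if none
    std::size_t rows_limit,            // delta rows visible to the snapshot asking for a clone
    std::size_t placed_deletes_limit)  // delete ranges visible to that snapshot
{
    return placed_deletes <= placed_deletes_limit
        && max_dup_tuple_id < static_cast<std::int64_t>(rows_limit);
}

int main()
{
    // Numbers from the regression test below: the shared index has placed 20 delta rows
    // and recorded a duplicated tuple at delta offset 19.
    assert(canReuseSharedIndex(0, 19, 20, 0));  // snapshot covers all 20 rows: safe to reuse
    assert(!canReuseSharedIndex(0, 19, 10, 0)); // snapshot only covers 10 rows: must rebuild
    return 0;
}

The same condition explains the test expectations further down: a snapshot taken before the duplicated write gets an empty clone (placed rows 0, max dup tuple ID -1) and has to place the delta index for itself.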
4 changes: 2 additions & 2 deletions dbms/src/Storages/DeltaMerge/Delta/Snapshot.cpp
@@ -95,8 +95,8 @@ size_t DeltaValueReader::readRows(MutableColumns & output_cols, size_t offset, s
//
// So here, we should filter out those out-of-range rows.

auto mem_table_rows_offset = delta_snap->getMemTableSetRowsOffset();
auto total_delta_rows = delta_snap->getRows();
const auto mem_table_rows_offset = delta_snap->getMemTableSetRowsOffset();
const auto total_delta_rows = delta_snap->getRows();

auto persisted_files_start = std::min(offset, mem_table_rows_offset);
auto persisted_files_end = std::min(offset + limit, mem_table_rows_offset);
28 changes: 16 additions & 12 deletions dbms/src/Storages/DeltaMerge/DeltaIndex.h
@@ -84,17 +84,17 @@ class DeltaIndex
}
}

DeltaIndexPtr tryCloneInner(size_t placed_deletes_limit, const Updates * updates = nullptr)
DeltaIndexPtr tryCloneInner(size_t rows_limit, size_t placed_deletes_limit, const Updates * updates = nullptr)
{
DeltaTreePtr delta_tree_copy;
size_t placed_rows_copy = 0;
size_t placed_deletes_copy = 0;
// Make sure the delta index do not place more deletes than `placed_deletes_limit`.
// Because delete ranges can break MVCC view.
{
std::scoped_lock lock(mutex);
// Safe to reuse the copy of the existing DeltaIndex
if (placed_deletes <= placed_deletes_limit)
// Make sure the MVCC view will not be broken by a mismatch between the delta index and the snapshot:
// - First, make sure the delta index does not place more deletes than `placed_deletes_limit`.
// - Second, make sure the snapshot includes all duplicated tuples in the delta index.
if (placed_deletes <= placed_deletes_limit && delta_tree->maxDupTupleID() < static_cast<Int64>(rows_limit))
{
delta_tree_copy = delta_tree;
placed_rows_copy = placed_rows;
@@ -186,8 +186,9 @@ class DeltaIndex
{
std::scoped_lock lock(mutex);

if ((maybe_advanced.placed_rows >= placed_rows && maybe_advanced.placed_deletes >= placed_deletes)
&& !(maybe_advanced.placed_rows == placed_rows && maybe_advanced.placed_deletes == placed_deletes))
if ((maybe_advanced.placed_rows >= placed_rows && maybe_advanced.placed_deletes >= placed_deletes) // advance
// not exactly the same
&& (maybe_advanced.placed_rows != placed_rows || maybe_advanced.placed_deletes != placed_deletes))
{
delta_tree = maybe_advanced.delta_tree;
placed_rows = maybe_advanced.placed_rows;
@@ -197,14 +198,17 @@ class DeltaIndex
return false;
}

DeltaIndexPtr tryClone(size_t /*rows*/, size_t deletes) { return tryCloneInner(deletes); }
/**
 * Try to get a clone of the current instance.
 * Return an empty DeltaIndex if `deletes < this->placed_deletes`, or if the delta tree contains duplicated
 * tuples that are not covered by `rows`, because an advanced delta index would break the MVCC view.
*/
DeltaIndexPtr tryClone(size_t rows, size_t deletes) { return tryCloneInner(rows, deletes); }

DeltaIndexPtr cloneWithUpdates(const Updates & updates)
{
if (unlikely(updates.empty()))
throw Exception("Unexpected empty updates");

return tryCloneInner(updates.front().delete_ranges_offset, &updates);
RUNTIME_CHECK_MSG(!updates.empty(), "Unexpected empty updates");
return tryCloneInner(updates.front().rows_offset, updates.front().delete_ranges_offset, &updates);
}
};

3 changes: 3 additions & 0 deletions dbms/src/Storages/DeltaMerge/DeltaPlace.h
@@ -248,7 +248,10 @@ bool placeInsert(const SkippableBlockInputStreamPtr & stable, //
tuple_id = delta_value_space_offset + (offset + i);

if (dup)
{
delta_tree.addDelete(rid);
delta_tree.setMaxDupTupleID(tuple_id);
}
delta_tree.addInsert(rid, tuple_id);
}

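For context on where `max_dup_tuple_id` comes from: `tuple_id` above is the row's offset in the delta value space, so remembering the largest duplicated tuple ID tells later readers how many delta rows a snapshot must cover before the shared index can be cloned for it. A toy model of that bookkeeping follows; only the addDelete/addInsert/setMaxDupTupleID calls are taken from the diff, and the struct itself is illustrative, not the real DeltaTree.

#include <algorithm>
#include <cstdint>
#include <iostream>

// Illustrative stand-in for the duplicated-tuple bookkeeping used by placeInsert.
struct ToyDeltaTree
{
    std::int64_t max_dup_tuple_id = -1;
    void addDelete(std::uint64_t /*rid*/) {}
    void addInsert(std::uint64_t /*rid*/, std::uint64_t /*tuple_id*/) {}
    void setMaxDupTupleID(std::int64_t tuple_id) { max_dup_tuple_id = std::max(tuple_id, max_dup_tuple_id); }
};

int main()
{
    ToyDeltaTree tree;
    // Suppose delta rows at offsets 10..19 re-insert handles that already exist with the
    // same version (the duplicated write in the regression test): each one is placed as a
    // delete + insert pair, and its delta offset is recorded as a duplicated tuple ID.
    for (std::uint64_t tuple_id = 10; tuple_id < 20; ++tuple_id)
    {
        const std::uint64_t rid = tuple_id - 10; // position of the older copy (illustrative)
        tree.addDelete(rid);
        tree.setMaxDupTupleID(static_cast<std::int64_t>(tuple_id));
        tree.addInsert(rid, tuple_id);
    }
    // Prints 19: any snapshot with fewer than 20 delta rows cannot reuse this index.
    std::cout << tree.max_dup_tuple_id << '\n';
}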
4 changes: 4 additions & 0 deletions dbms/src/Storages/DeltaMerge/DeltaTree.h
@@ -778,6 +778,7 @@ class DeltaTree
size_t num_inserts = 0;
size_t num_deletes = 0;
size_t num_entries = 0;
Int64 max_dup_tuple_id = -1;

std::unique_ptr<Allocator> allocator;
size_t bytes = 0;
@@ -989,6 +990,8 @@ class DeltaTree
size_t numEntries() const { return num_entries; }
size_t numInserts() const { return num_inserts; }
size_t numDeletes() const { return num_deletes; }
Int64 maxDupTupleID() const { return max_dup_tuple_id; }
void setMaxDupTupleID(Int64 tuple_id) { max_dup_tuple_id = std::max(tuple_id, max_dup_tuple_id); }

void addDelete(UInt64 rid);
void addInsert(UInt64 rid, UInt64 tuple_id);
@@ -1005,6 +1008,7 @@ DT_CLASS::DeltaTree(const DT_CLASS::Self & o)
, num_inserts(o.num_inserts)
, num_deletes(o.num_deletes)
, num_entries(o.num_entries)
, max_dup_tuple_id(o.max_dup_tuple_id)
, allocator(std::make_unique<Allocator>())
{
// If exception is thrown before clear copying_nodes, all nodes will be destroyed.
@@ -26,6 +26,10 @@ extern const char pause_when_reading_from_dt_stream[];

namespace DB::DM
{
namespace tests
{
class DeltaMergeStoreRWTest;
}
class UnorderedInputStream : public IProfilingBlockInputStream
{
static constexpr auto NAME = "UnorderedInputStream";
@@ -141,5 +145,7 @@ class UnorderedInputStream : public IProfilingBlockInputStream
int64_t ref_no;
size_t total_rows = 0;
bool task_pool_added;

friend tests::DeltaMergeStoreRWTest;
};
} // namespace DB::DM
3 changes: 2 additions & 1 deletion dbms/src/Storages/DeltaMerge/Segment.cpp
@@ -1998,7 +1998,8 @@ std::pair<DeltaIndexPtr, bool> Segment::ensurePlace(const DMContext & dm_context
UInt64 max_version) const
{
auto delta_snap = delta_reader->getDeltaSnap();
// Clone a new delta index.
// Try to clone from the shared delta index. If the shared delta index cannot be reused,
// it will return an empty delta index and we should place it in the following branch.
auto my_delta_index = delta_snap->getSharedDeltaIndex()->tryClone(delta_snap->getRows(), delta_snap->getDeletes());
auto my_delta_tree = my_delta_index->getDeltaTree();

7 changes: 7 additions & 0 deletions dbms/src/Storages/DeltaMerge/Segment.h
@@ -31,6 +31,11 @@

namespace DB::DM
{
namespace tests
{
class DeltaMergeStoreRWTest;
}

class Segment;
struct SegmentSnapshot;
using SegmentSnapshotPtr = std::shared_ptr<SegmentSnapshot>;
@@ -616,6 +621,8 @@ class Segment

const LoggerPtr parent_log; // Used when constructing new segments in split
const LoggerPtr log;

friend tests::DeltaMergeStoreRWTest;
};

} // namespace DB::DM
180 changes: 180 additions & 0 deletions dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store.cpp
@@ -36,6 +36,7 @@
#include <algorithm>
#include <future>
#include <iterator>
#include <memory>
#include <random>

namespace DB
@@ -3460,6 +3461,185 @@
CATCH


void DeltaMergeStoreRWTest::dupHandleVersionAndDeltaIndexAdvancedThanSnapshot()
{
auto table_column_defines = DMTestEnv::getDefaultColumns();
store = reload(table_column_defines);

auto create_block = [&](UInt64 beg, UInt64 end, UInt64 ts) {
auto block = DMTestEnv::prepareSimpleWriteBlock(beg, end, false, ts);
block.checkNumberOfRows();
return block;
};

auto write_block = [&](UInt64 beg, UInt64 end, UInt64 ts) {
auto block = create_block(beg, end, ts);
store->write(*db_context, db_context->getSettingsRef(), block);
};

auto create_stream = [&]() {
return store->read(
*db_context,
db_context->getSettingsRef(),
store->getTableColumns(),
{RowKeyRange::newAll(store->isCommonHandle(), store->getRowKeyColumnSize())},
/* num_streams= */ 1,
/* start_ts= */ std::numeric_limits<UInt64>::max(),
EMPTY_FILTER,
TRACING_NAME,
/* keep_order= */ false,
/* is_fast_scan= */ false,
DEFAULT_BLOCK_SIZE)[0];
};

auto count_rows = [](BlockInputStreamPtr stream) {
std::size_t count = 0;
stream->readPrefix();
for (;;)
{
auto block = stream->read();
if (!block)
{
break;
}
count += block.rows();
}
stream->readSuffix();
return count;
};

auto get_seg_read_task = [&](BlockInputStreamPtr stream) {
auto unordered_stream = std::dynamic_pointer_cast<UnorderedInputStream>(stream);
const auto & tasks = unordered_stream->task_pool->getTasks();
RUNTIME_CHECK(tasks.size() == 1, tasks.size());
return tasks.begin()->second;
};

auto clone_delta_index = [](SegmentReadTaskPtr seg_read_task) {
auto delta_snap = seg_read_task->read_snapshot->delta;
return delta_snap->getSharedDeltaIndex()->tryClone(delta_snap->getRows(), delta_snap->getDeletes());
};

auto check_delta_index
= [](DeltaIndexPtr delta_index, size_t expect_rows, size_t expect_deletes, Int64 expect_max_dup_tuple_id) {
auto [placed_rows, placed_deletes] = delta_index->getPlacedStatus();
ASSERT_EQ(placed_rows, expect_rows);
ASSERT_EQ(placed_deletes, expect_deletes);
ASSERT_EQ(delta_index->getDeltaTree()->maxDupTupleID(), expect_max_dup_tuple_id);
};

auto ensure_place = [&](SegmentReadTaskPtr seg_read_task) {
auto pk_ver_col_defs = std::make_shared<ColumnDefines>(
ColumnDefines{getExtraHandleColumnDefine(dm_context->is_common_handle), getVersionColumnDefine()});
auto delta_reader = std::make_shared<DeltaValueReader>(
*dm_context,
seg_read_task->read_snapshot->delta,
pk_ver_col_defs,
RowKeyRange::newAll(store->isCommonHandle(), store->getRowKeyColumnSize()));
return seg_read_task->segment->ensurePlace(
*dm_context,
seg_read_task->read_snapshot->stable,
delta_reader,
{RowKeyRange::newAll(store->isCommonHandle(), store->getRowKeyColumnSize())},
std::numeric_limits<UInt64>::max());
};

// Write [0, 128) with ts 1 for initializing stable.
write_block(0, 128, 1);
store->mergeDeltaAll(*db_context);

// Write [50, 60) with ts 2 for initializing delta.
write_block(50, 60, 2);

// Scan table normally.
{
auto stream = create_stream();
auto count = count_rows(stream);
ASSERT_EQ(count, 128);
}

// The snapshot does not include all the duplicated tuples of the delta index.
// This snapshot should rebuild the delta index for itself.
// https://github.com/pingcap/tiflash/issues/8845
{
// Create snapshot but not place index
auto stream1 = create_stream();

// !!!Duplicated!!!: Write [50, 60) with ts 2
write_block(50, 60, 2);

// Place index with newest data.
auto stream2 = create_stream();
auto count2 = count_rows(stream2);
ASSERT_EQ(count2, 128);

// stream1 should not reuse the delta index of stream2

// Check cloning delta index
{
auto seg_read_task = get_seg_read_task(stream1);

// Shared delta index has been placed to the newest by `count_rows(stream2)`.
auto shared_delta_index = seg_read_task->read_snapshot->delta->getSharedDeltaIndex();
check_delta_index(shared_delta_index, 20, 0, 19);

// Cannot clone the delta index because it contains duplicated records in the gap between the snapshot and the shared delta index.
auto cloned_delta_index = clone_delta_index(seg_read_task);
check_delta_index(cloned_delta_index, 0, 0, -1);
}
// Check scanning result of stream1
auto count1 = count_rows(stream1);
ASSERT_EQ(count1, count2);
}

// Make sure the shared delta index can be reused by a new snapshot
{
auto stream = create_stream();
auto seg_read_task = get_seg_read_task(stream);
auto cloned_delta_index = clone_delta_index(seg_read_task);
check_delta_index(cloned_delta_index, 20, 0, 19);
}

// The snapshot includes all the duplicated tuples of the delta index.
// Delta index can be reused safely.
{
write_block(70, 80, 2);
auto stream = create_stream();
auto seg_read_task = get_seg_read_task(stream);
auto shared_delta_index = seg_read_task->read_snapshot->delta->getSharedDeltaIndex();
check_delta_index(shared_delta_index, 20, 0, 19);
auto cloned_delta_index = clone_delta_index(seg_read_task);
check_delta_index(cloned_delta_index, 20, 0, 19);
auto [placed_delta_index, fully_indexed] = ensure_place(seg_read_task);
ASSERT_TRUE(fully_indexed);
check_delta_index(placed_delta_index, 30, 0, 19);
auto count = count_rows(stream);
ASSERT_EQ(count, 128);
}

{
write_block(75, 85, 2);
auto stream = create_stream();
auto seg_read_task = get_seg_read_task(stream);
auto shared_delta_index = seg_read_task->read_snapshot->delta->getSharedDeltaIndex();
check_delta_index(shared_delta_index, 30, 0, 19);
auto cloned_delta_index = clone_delta_index(seg_read_task);
check_delta_index(cloned_delta_index, 30, 0, 19);
auto [placed_delta_index, fully_indexed] = ensure_place(seg_read_task);
ASSERT_TRUE(fully_indexed);
check_delta_index(placed_delta_index, 40, 0, 34);
auto count = count_rows(stream);
ASSERT_EQ(count, 128);
}
}

TEST_P(DeltaMergeStoreRWTest, DupHandleVersionAndDeltaIndexAdvancedThanSnapshot)
try
{
dupHandleVersionAndDeltaIndexAdvancedThanSnapshot();
}
CATCH

} // namespace tests
} // namespace DM
} // namespace DB
@@ -124,6 +124,7 @@ class DeltaMergeStoreRWTest
{
TiFlashStorageTestBasic::SetUp();
store = reload();
dm_context = store->newDMContext(*db_context, db_context->getSettingsRef());
}

DeltaMergeStorePtr
@@ -182,8 +183,11 @@
protected:
TestMode mode;
DeltaMergeStorePtr store;
DMContextPtr dm_context;

constexpr static const char * TRACING_NAME = "DeltaMergeStoreRWTest";

void dupHandleVersionAndDeltaIndexAdvancedThanSnapshot();
};
} // namespace tests
} // namespace DM