Skip to content

Commit

Permalink
Flush segment cache when doing the compaction
Browse files Browse the repository at this point in the history
Signed-off-by: Wish <[email protected]>
  • Loading branch information
breezewish committed Jul 4, 2022
1 parent a89222a commit d50ae3b
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 0 deletions.
7 changes: 7 additions & 0 deletions dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1055,6 +1055,13 @@ std::optional<DM::RowKeyRange> DeltaMergeStore::mergeDeltaBySegment(const Contex
segment = segment_it->second;
}

if (!segment->flushCache(*dm_context))
{
// If the flush failed, it means there are parallel updates to the segment in the background.
// In this case, we try again.
continue;
}

const auto new_segment = segmentMergeDelta(*dm_context, segment, run_thread);
if (new_segment)
{
Expand Down
49 changes: 49 additions & 0 deletions dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3762,6 +3762,55 @@ try
CATCH


// Verify that unflushed data will also be compacted.
TEST_P(DeltaMergeStoreMergeDeltaBySegmentTest, Flush)
try
{
{
// Write data to first 3 segments and flush.
auto newly_written_rows = helper->rows_by_segments[0] + helper->rows_by_segments[1] + helper->rows_by_segments[2];
Block block = DMTestEnv::prepareSimpleWriteBlock(0, newly_written_rows, false, pk_type, 5 /* new tso */);
store->write(*db_context, db_context->getSettingsRef(), block);
store->flushCache(dm_context, RowKeyRange::newAll(store->isCommonHandle(), store->getRowKeyColumnSize()));

helper->expected_delta_rows[0] += helper->rows_by_segments[0];
helper->expected_delta_rows[1] += helper->rows_by_segments[1];
helper->expected_delta_rows[2] += helper->rows_by_segments[2];
helper->verifyExpectedRowsForAllSegments();

auto segment1 = std::next(store->segments.begin())->second;
ASSERT_EQ(segment1->getDelta()->getUnsavedRows(), 0);
}
{
// Write new data to segment[1] without flush.
auto newly_written_rows = helper->rows_by_segments[1];
Block block = DMTestEnv::prepareSimpleWriteBlock(helper->rows_by_segments[0], helper->rows_by_segments[0] + newly_written_rows, false, pk_type, 10 /* new tso */);
store->write(*db_context, db_context->getSettingsRef(), block);

helper->expected_delta_rows[1] += helper->rows_by_segments[1];
helper->verifyExpectedRowsForAllSegments();

auto segment1 = std::next(store->segments.begin())->second;
ASSERT_GT(segment1->getDelta()->getUnsavedRows(), 0);
}
{
auto segment1 = std::next(store->segments.begin())->second;
auto result = store->mergeDeltaBySegment(*db_context, segment1->getRowKeyRange().start, DeltaMergeStore::TaskRunThread::Foreground);
ASSERT_NE(result, std::nullopt);

segment1 = std::next(store->segments.begin())->second;
ASSERT_EQ(*result, segment1->getRowKeyRange());

helper->expected_stable_rows[1] += helper->expected_delta_rows[1];
helper->expected_delta_rows[1] = 0;
helper->verifyExpectedRowsForAllSegments();

ASSERT_EQ(segment1->getDelta()->getUnsavedRows(), 0);
}
}
CATCH


} // namespace tests
} // namespace DM
} // namespace DB

0 comments on commit d50ae3b

Please sign in to comment.