Skip to content

Commit

Permalink
flush cache before segment merge (#4955) (#4966)
Browse files Browse the repository at this point in the history
close #4956
  • Loading branch information
ti-chi-bot authored Jun 2, 2022
1 parent 4eac5c3 commit f8b2a80
Show file tree
Hide file tree
Showing 4 changed files with 79 additions and 12 deletions.
23 changes: 23 additions & 0 deletions dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1721,6 +1721,29 @@ void DeltaMergeStore::segmentMerge(DMContext & dm_context, const SegmentPtr & le
(is_foreground ? "Foreground" : "Background")
<< " merge Segment [" << left->info() << "] and [" << right->info() << "], safe point:" << dm_context.min_version);

/// This segment may contain some rows that not belong to this segment range which is left by previous split operation.
/// And only saved data in this segment will be filtered by the segment range in the merge process,
/// unsaved data will be directly copied to the new segment.
/// So we flush here to make sure that all potential data left by previous split operation is saved.
while (!left->flushCache(dm_context))
{
// keep flush until success if not abandoned
if (left->hasAbandoned())
{
LOG_DEBUG(log, "Give up merge segments left [" << left->segmentId() << "], right [" << right->segmentId() << "]");
return;
}
}
while (!right->flushCache(dm_context))
{
// keep flush until success if not abandoned
if (right->hasAbandoned())
{
LOG_DEBUG(log, "Give up merge segments left [" << left->segmentId() << "], right [" << right->segmentId() << "]");
return;
}
}

SegmentSnapshotPtr left_snap;
SegmentSnapshotPtr right_snap;
ColumnDefinesPtr schema_snap;
Expand Down
38 changes: 36 additions & 2 deletions dbms/src/Storages/DeltaMerge/Segment.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,7 @@ bool Segment::writeToCache(DMContext & dm_context, const Block & block, size_t o
return delta->appendToCache(dm_context, block, offset, limit);
}

bool Segment::write(DMContext & dm_context, const Block & block)
bool Segment::write(DMContext & dm_context, const Block & block, bool flush_cache)
{
LOG_TRACE(log, "Segment [" << segment_id << "] write to disk rows: " << block.rows());
WriteBatches wbs(dm_context.storage_pool, dm_context.getWriteLimiter());
Expand All @@ -296,7 +296,14 @@ bool Segment::write(DMContext & dm_context, const Block & block)

if (delta->appendPack(dm_context, pack))
{
flushCache(dm_context);
if (flush_cache)
{
while (!flushCache(dm_context))
{
if (hasAbandoned())
return false;
}
}
return true;
}
else
Expand Down Expand Up @@ -1056,6 +1063,29 @@ SegmentPair Segment::applySplit(DMContext & dm_context, //
SegmentPtr Segment::merge(DMContext & dm_context, const ColumnDefinesPtr & schema_snap, const SegmentPtr & left, const SegmentPtr & right)
{
WriteBatches wbs(dm_context.storage_pool, dm_context.getWriteLimiter());
/// This segment may contain some rows that not belong to this segment range which is left by previous split operation.
/// And only saved data in this segment will be filtered by the segment range in the merge process,
/// unsaved data will be directly copied to the new segment.
/// So we flush here to make sure that all potential data left by previous split operation is saved.
while (!left->flushCache(dm_context))
{
// keep flush until success if not abandoned
if (left->hasAbandoned())
{
LOG_DEBUG(left->log, "Give up merge segments left [" << left->segmentId() << "], right [" << right->segmentId() << "]");
return {};
}
}
while (!right->flushCache(dm_context))
{
// keep flush until success if not abandoned
if (right->hasAbandoned())
{
LOG_DEBUG(right->log, "Give up merge segments left [" << left->segmentId() << "], right [" << right->segmentId() << "]");
return {};
}
}


auto left_snap = left->createSnapshot(dm_context, true, CurrentMetrics::DT_SnapshotOfSegmentMerge);
auto right_snap = right->createSnapshot(dm_context, true, CurrentMetrics::DT_SnapshotOfSegmentMerge);
Expand All @@ -1076,6 +1106,10 @@ SegmentPtr Segment::merge(DMContext & dm_context, const ColumnDefinesPtr & schem
return merged;
}

/// Segments may contain some rows that not belong to its range which is left by previous split operation.
/// And only saved data in the segment will be filtered by the segment range in the merge process,
/// unsaved data will be directly copied to the new segment.
/// So remember to do a flush for the segments before merge.
StableValueSpacePtr Segment::prepareMerge(DMContext & dm_context, //
const ColumnDefinesPtr & schema_snap,
const SegmentPtr & left,
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Storages/DeltaMerge/Segment.h
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ class Segment : private boost::noncopyable

bool writeToDisk(DMContext & dm_context, const DeltaPackPtr & pack);
bool writeToCache(DMContext & dm_context, const Block & block, size_t offset, size_t limit);
bool write(DMContext & dm_context, const Block & block); // For test only
bool write(DMContext & dm_context, const Block & block, bool flush_cache = true); // For test only
bool write(DMContext & dm_context, const RowKeyRange & delete_range);
bool ingestPacks(DMContext & dm_context, const RowKeyRange & range, const DeltaPacks & packs, bool clear_data_in_range);

Expand Down
28 changes: 19 additions & 9 deletions dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -783,11 +783,17 @@ CATCH
TEST_F(Segment_test, Split)
try
{
const size_t num_rows_write = 100;
const size_t num_rows_write_per_batch = 100;
const size_t num_rows_write = num_rows_write_per_batch * 2;
{
// write to segment
Block block = DMTestEnv::prepareSimpleWriteBlock(0, num_rows_write, false);
segment->write(dmContext(), std::move(block));
// write to segment and flush
Block block = DMTestEnv::prepareSimpleWriteBlock(0, num_rows_write_per_batch, false);
segment->write(dmContext(), std::move(block), true);
}
{
// write to segment and don't flush
Block block = DMTestEnv::prepareSimpleWriteBlock(num_rows_write_per_batch, 2 * num_rows_write_per_batch, false);
segment->write(dmContext(), std::move(block), false);
}

{
Expand Down Expand Up @@ -823,7 +829,7 @@ try
size_t num_rows_seg2 = 0;
{
{
auto in = segment->getInputStream(dmContext(), *tableColumns(), {RowKeyRange::newAll(false, 1)});
auto in = segment->getInputStream(dmContext(), *tableColumns(), {segment->getRowKeyRange()});
in->readPrefix();
while (Block block = in->read())
{
Expand All @@ -832,7 +838,7 @@ try
in->readSuffix();
}
{
auto in = segment->getInputStream(dmContext(), *tableColumns(), {RowKeyRange::newAll(false, 1)});
auto in = new_segment->getInputStream(dmContext(), *tableColumns(), {new_segment->getRowKeyRange()});
in->readPrefix();
while (Block block = in->read())
{
Expand All @@ -843,9 +849,13 @@ try
ASSERT_EQ(num_rows_seg1 + num_rows_seg2, num_rows_write);
}

// delete rows in the right segment
{
new_segment->write(dmContext(), /*delete_range*/ new_segment->getRowKeyRange());
new_segment->flushCache(dmContext());
}

// merge segments
// TODO: enable merge test!
if (false)
{
segment = Segment::merge(dmContext(), tableColumns(), segment, new_segment);
{
Expand All @@ -864,7 +874,7 @@ try
num_rows_read += block.rows();
}
in->readSuffix();
EXPECT_EQ(num_rows_read, num_rows_write);
EXPECT_EQ(num_rows_read, num_rows_seg1);
}
}
}
Expand Down

0 comments on commit f8b2a80

Please sign in to comment.