Skip to content

Commit

Permalink
flush cache before segment merge (#4955) (#4968)
Browse files Browse the repository at this point in the history
close #4956
  • Loading branch information
ti-chi-bot authored Jun 1, 2022
1 parent 1d20105 commit aa14a01
Show file tree
Hide file tree
Showing 4 changed files with 103 additions and 35 deletions.
23 changes: 23 additions & 0 deletions dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1779,6 +1779,29 @@ void DeltaMergeStore::segmentMerge(DMContext & dm_context, const SegmentPtr & le
right->info(),
dm_context.min_version);

/// This segment may contain some rows that not belong to this segment range which is left by previous split operation.
/// And only saved data in this segment will be filtered by the segment range in the merge process,
/// unsaved data will be directly copied to the new segment.
/// So we flush here to make sure that all potential data left by previous split operation is saved.
while (!left->flushCache(dm_context))
{
// keep flush until success if not abandoned
if (left->hasAbandoned())
{
LOG_FMT_DEBUG(log, "Give up merge segments left [{}], right [{}]", left->segmentId(), right->segmentId());
return;
}
}
while (!right->flushCache(dm_context))
{
// keep flush until success if not abandoned
if (right->hasAbandoned())
{
LOG_FMT_DEBUG(log, "Give up merge segments left [{}], right [{}]", left->segmentId(), right->segmentId());
return;
}
}

SegmentSnapshotPtr left_snap;
SegmentSnapshotPtr right_snap;
ColumnDefinesPtr schema_snap;
Expand Down
85 changes: 60 additions & 25 deletions dbms/src/Storages/DeltaMerge/Segment.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ DMFilePtr writeIntoNewDMFile(DMContext & dm_context, //
{
auto dmfile = DMFile::create(file_id, parent_path, flags.isSingleFile(), dm_context.createChecksumConfig(flags.isSingleFile()));
auto output_stream = std::make_shared<DMFileBlockOutputStream>(dm_context.db_context, dmfile, *schema_snap, flags);
auto * mvcc_stream = typeid_cast<const DMVersionFilterBlockInputStream<DM_VERSION_FILTER_MODE_COMPACT> *>(input_stream.get());
const auto * mvcc_stream = typeid_cast<const DMVersionFilterBlockInputStream<DM_VERSION_FILTER_MODE_COMPACT> *>(input_stream.get());

input_stream->readPrefix();
output_stream->writePrefix();
Expand Down Expand Up @@ -290,7 +290,7 @@ bool Segment::writeToCache(DMContext & dm_context, const Block & block, size_t o
return delta->appendToCache(dm_context, block, offset, limit);
}

bool Segment::write(DMContext & dm_context, const Block & block)
bool Segment::write(DMContext & dm_context, const Block & block, bool flush_cache)
{
LOG_TRACE(log, "Segment [" << segment_id << "] write to disk rows: " << block.rows());
WriteBatches wbs(dm_context.storage_pool, dm_context.getWriteLimiter());
Expand All @@ -300,7 +300,14 @@ bool Segment::write(DMContext & dm_context, const Block & block)

if (delta->appendPack(dm_context, pack))
{
flushCache(dm_context);
if (flush_cache)
{
while (!flushCache(dm_context))
{
if (hasAbandoned())
return false;
}
}
return true;
}
else
Expand Down Expand Up @@ -440,7 +447,7 @@ BlockInputStreamPtr Segment::getInputStreamForDataExport(const DMContext & dm_co
const SegmentSnapshotPtr & segment_snap,
const RowKeyRange & data_range,
size_t expected_block_size,
bool reorgnize_block) const
bool reorganize_block) const
{
RowKeyRanges data_ranges{data_range};
auto read_info = getReadInfo(dm_context, columns_to_read, segment_snap, data_ranges);
Expand All @@ -457,7 +464,7 @@ BlockInputStreamPtr Segment::getInputStreamForDataExport(const DMContext & dm_co


data_stream = std::make_shared<DMRowKeyFilterBlockInputStream<true>>(data_stream, data_ranges, 0);
if (reorgnize_block)
if (reorganize_block)
{
data_stream = std::make_shared<PKSquashingBlockInputStream<false>>(data_stream, EXTRA_HANDLE_COLUMN_ID, is_common_handle);
}
Expand Down Expand Up @@ -665,7 +672,7 @@ std::optional<RowKeyValue> Segment::getSplitPointFast(DMContext & dm_context, co

size_t split_row_index = stable_rows / 2;

auto & dmfiles = stable_snap->getDMFiles();
const auto & dmfiles = stable_snap->getDMFiles();

DMFilePtr read_file;
size_t file_index = 0;
Expand All @@ -675,13 +682,13 @@ std::optional<RowKeyValue> Segment::getSplitPointFast(DMContext & dm_context, co
size_t cur_rows = 0;
for (size_t index = 0; index < dmfiles.size(); index++)
{
auto & file = dmfiles[index];
const auto & file = dmfiles[index];
size_t rows_in_file = file->getRows();
cur_rows += rows_in_file;
if (cur_rows > split_row_index)
{
cur_rows -= rows_in_file;
auto & pack_stats = file->getPackStats();
const auto & pack_stats = file->getPackStats();
for (size_t pack_id = 0; pack_id < pack_stats.size(); ++pack_id)
{
cur_rows += pack_stats[pack_id].rows;
Expand Down Expand Up @@ -745,7 +752,7 @@ std::optional<RowKeyValue> Segment::getSplitPointSlow(
{
EventRecorder recorder(ProfileEvents::DMSegmentGetSplitPoint, ProfileEvents::DMSegmentGetSplitPointNS);

auto & pk_col = getExtraHandleColumnDefine(is_common_handle);
const auto & pk_col = getExtraHandleColumnDefine(is_common_handle);
auto pk_col_defs = std::make_shared<ColumnDefines>(ColumnDefines{pk_col});
// We need to create a new delta_reader here, because the one in read_info is used to read columns other than PK column.
auto delta_reader = read_info.getDeltaReader(pk_col_defs);
Expand Down Expand Up @@ -878,13 +885,15 @@ std::optional<Segment::SplitInfo> Segment::prepareSplitLogical(DMContext & dm_co
return {};
}

GenPageId log_gen_page_id = std::bind(&StoragePool::newLogPageId, &storage_pool);
GenPageId log_gen_page_id = [&]() {
return storage_pool.newLogPageId();
};

DMFiles my_stable_files;
DMFiles other_stable_files;

auto delegate = dm_context.path_pool.getStableDiskDelegator();
for (auto & dmfile : segment_snap->stable->getDMFiles())
for (const auto & dmfile : segment_snap->stable->getDMFiles())
{
auto ori_ref_id = dmfile->refId();
auto file_id = dmfile->fileId();
Expand Down Expand Up @@ -1020,7 +1029,7 @@ std::optional<Segment::SplitInfo> Segment::prepareSplitPhysical(DMContext & dm_c
LOG_INFO(log, "prepare other_stable done");

// Remove old stable's files.
for (auto & file : stable->getDMFiles())
for (const auto & file : stable->getDMFiles())
{
// Here we should remove the ref id instead of file_id.
// Because a dmfile could be used by several segments, and only after all ref_ids are removed, then the file_id removed.
Expand Down Expand Up @@ -1091,6 +1100,29 @@ SegmentPair Segment::applySplit(DMContext & dm_context, //
SegmentPtr Segment::merge(DMContext & dm_context, const ColumnDefinesPtr & schema_snap, const SegmentPtr & left, const SegmentPtr & right)
{
WriteBatches wbs(dm_context.storage_pool, dm_context.getWriteLimiter());
/// This segment may contain some rows that not belong to this segment range which is left by previous split operation.
/// And only saved data in this segment will be filtered by the segment range in the merge process,
/// unsaved data will be directly copied to the new segment.
/// So we flush here to make sure that all potential data left by previous split operation is saved.
while (!left->flushCache(dm_context))
{
// keep flush until success if not abandoned
if (left->hasAbandoned())
{
LOG_FMT_DEBUG(left->log, "Give up merge segments left [{}], right [{}]", left->segmentId(), right->segmentId());
return {};
}
}
while (!right->flushCache(dm_context))
{
// keep flush until success if not abandoned
if (right->hasAbandoned())
{
LOG_FMT_DEBUG(right->log, "Give up merge segments left [{}], right [{}]", left->segmentId(), right->segmentId());
return {};
}
}


auto left_snap = left->createSnapshot(dm_context, true, CurrentMetrics::DT_SnapshotOfSegmentMerge);
auto right_snap = right->createSnapshot(dm_context, true, CurrentMetrics::DT_SnapshotOfSegmentMerge);
Expand All @@ -1111,6 +1143,10 @@ SegmentPtr Segment::merge(DMContext & dm_context, const ColumnDefinesPtr & schem
return merged;
}

/// Segments may contain some rows that not belong to its range which is left by previous split operation.
/// And only saved data in the segment will be filtered by the segment range in the merge process,
/// unsaved data will be directly copied to the new segment.
/// So remember to do a flush for the segments before merge.
StableValueSpacePtr Segment::prepareMerge(DMContext & dm_context, //
const ColumnDefinesPtr & schema_snap,
const SegmentPtr & left,
Expand All @@ -1125,7 +1161,7 @@ StableValueSpacePtr Segment::prepareMerge(DMContext & dm_context, //
throw Exception("The ranges of merge segments are not consecutive: first end: " + left->rowkey_range.getEnd().toDebugString()
+ ", second start: " + right->rowkey_range.getStart().toDebugString());

auto getStream = [&](const SegmentPtr & segment, const SegmentSnapshotPtr & segment_snap) {
auto get_stream = [&](const SegmentPtr & segment, const SegmentSnapshotPtr & segment_snap) {
auto read_info = segment->getReadInfo(
dm_context,
*schema_snap,
Expand Down Expand Up @@ -1153,8 +1189,8 @@ StableValueSpacePtr Segment::prepareMerge(DMContext & dm_context, //
return stream;
};

auto left_stream = getStream(left, left_snap);
auto right_stream = getStream(right, right_snap);
auto left_stream = get_stream(left, left_snap);
auto right_stream = get_stream(right, right_snap);

BlockInputStreamPtr merged_stream = std::make_shared<ConcatBlockInputStream>(BlockInputStreams{left_stream, right_stream}, nullptr);
// for the purpose to calculate StableProperty of the new segment
Expand Down Expand Up @@ -1193,16 +1229,16 @@ SegmentPtr Segment::applyMerge(DMContext & dm_context, //
/// Make sure saved packs are appended before unsaved packs.
DeltaPacks merged_packs;

auto L_first_unsaved
auto l_first_unsaved
= std::find_if(left_tail_packs.begin(), left_tail_packs.end(), [](const DeltaPackPtr & p) { return !p->isSaved(); });
auto R_first_unsaved
auto r_first_unsaved
= std::find_if(right_tail_packs.begin(), right_tail_packs.end(), [](const DeltaPackPtr & p) { return !p->isSaved(); });

merged_packs.insert(merged_packs.end(), left_tail_packs.begin(), L_first_unsaved);
merged_packs.insert(merged_packs.end(), right_tail_packs.begin(), R_first_unsaved);
merged_packs.insert(merged_packs.end(), left_tail_packs.begin(), l_first_unsaved);
merged_packs.insert(merged_packs.end(), right_tail_packs.begin(), r_first_unsaved);

merged_packs.insert(merged_packs.end(), L_first_unsaved, left_tail_packs.end());
merged_packs.insert(merged_packs.end(), R_first_unsaved, right_tail_packs.end());
merged_packs.insert(merged_packs.end(), l_first_unsaved, left_tail_packs.end());
merged_packs.insert(merged_packs.end(), r_first_unsaved, right_tail_packs.end());

auto merged_delta = std::make_shared<DeltaValueSpace>(left->delta->getId(), merged_packs);

Expand Down Expand Up @@ -1334,9 +1370,8 @@ ColumnDefinesPtr Segment::arrangeReadColumns(const ColumnDefine & handle, const
new_columns_to_read.push_back(getVersionColumnDefine());
new_columns_to_read.push_back(getTagColumnDefine());

for (size_t i = 0; i < columns_to_read.size(); ++i)
for (const auto & c : columns_to_read)
{
auto & c = columns_to_read[i];
if (c.id != handle.id && c.id != VERSION_COLUMN_ID && c.id != TAG_COLUMN_ID)
new_columns_to_read.push_back(c);
}
Expand Down Expand Up @@ -1495,7 +1530,7 @@ bool Segment::placeUpsert(const DMContext & dm_context,

IColumn::Permutation perm;

auto & handle = getExtraHandleColumnDefine(is_common_handle);
const auto & handle = getExtraHandleColumnDefine(is_common_handle);
bool do_sort = sortBlockByPk(handle, block, perm);
RowKeyValueRef first_rowkey = RowKeyColumnContainer(block.getByPosition(0).column, is_common_handle).getRowKeyValue(0);
RowKeyValueRef range_start = relevant_range.getStart();
Expand Down Expand Up @@ -1549,7 +1584,7 @@ bool Segment::placeDelete(const DMContext & dm_context,
{
EventRecorder recorder(ProfileEvents::DMPlaceDeleteRange, ProfileEvents::DMPlaceDeleteRangeNS);

auto & handle = getExtraHandleColumnDefine(is_common_handle);
const auto & handle = getExtraHandleColumnDefine(is_common_handle);

RowKeyRanges delete_ranges{delete_range};
Blocks delete_data;
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Storages/DeltaMerge/Segment.h
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ class Segment : private boost::noncopyable

bool writeToDisk(DMContext & dm_context, const DeltaPackPtr & pack);
bool writeToCache(DMContext & dm_context, const Block & block, size_t offset, size_t limit);
bool write(DMContext & dm_context, const Block & block); // For test only
bool write(DMContext & dm_context, const Block & block, bool flush_cache = true); // For test only
bool write(DMContext & dm_context, const RowKeyRange & delete_range);
bool ingestPacks(DMContext & dm_context, const RowKeyRange & range, const DeltaPacks & packs, bool clear_data_in_range);

Expand Down
28 changes: 19 additions & 9 deletions dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -935,11 +935,17 @@ CATCH
TEST_F(Segment_test, Split)
try
{
const size_t num_rows_write = 100;
const size_t num_rows_write_per_batch = 100;
const size_t num_rows_write = num_rows_write_per_batch * 2;
{
// write to segment
Block block = DMTestEnv::prepareSimpleWriteBlock(0, num_rows_write, false);
segment->write(dmContext(), std::move(block));
// write to segment and flush
Block block = DMTestEnv::prepareSimpleWriteBlock(0, num_rows_write_per_batch, false);
segment->write(dmContext(), std::move(block), true);
}
{
// write to segment and don't flush
Block block = DMTestEnv::prepareSimpleWriteBlock(num_rows_write_per_batch, 2 * num_rows_write_per_batch, false);
segment->write(dmContext(), std::move(block), false);
}

{
Expand Down Expand Up @@ -975,7 +981,7 @@ try
size_t num_rows_seg2 = 0;
{
{
auto in = segment->getInputStream(dmContext(), *tableColumns(), {RowKeyRange::newAll(false, 1)});
auto in = segment->getInputStream(dmContext(), *tableColumns(), {segment->getRowKeyRange()});
in->readPrefix();
while (Block block = in->read())
{
Expand All @@ -984,7 +990,7 @@ try
in->readSuffix();
}
{
auto in = segment->getInputStream(dmContext(), *tableColumns(), {RowKeyRange::newAll(false, 1)});
auto in = new_segment->getInputStream(dmContext(), *tableColumns(), {new_segment->getRowKeyRange()});
in->readPrefix();
while (Block block = in->read())
{
Expand All @@ -995,9 +1001,13 @@ try
ASSERT_EQ(num_rows_seg1 + num_rows_seg2, num_rows_write);
}

// delete rows in the right segment
{
new_segment->write(dmContext(), /*delete_range*/ new_segment->getRowKeyRange());
new_segment->flushCache(dmContext());
}

// merge segments
// TODO: enable merge test!
if (false)
{
segment = Segment::merge(dmContext(), tableColumns(), segment, new_segment);
{
Expand All @@ -1016,7 +1026,7 @@ try
num_rows_read += block.rows();
}
in->readSuffix();
EXPECT_EQ(num_rows_read, num_rows_write);
EXPECT_EQ(num_rows_read, num_rows_seg1);
}
}
}
Expand Down

0 comments on commit aa14a01

Please sign in to comment.