diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp
index 5f8770a2356..9f2b43d75e4 100644
--- a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp
+++ b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp
@@ -1779,6 +1779,29 @@ void DeltaMergeStore::segmentMerge(DMContext & dm_context, const SegmentPtr & le
                  right->info(),
                  dm_context.min_version);
 
+    /// These segments may contain some rows that do not belong to their segment range, left over by a previous split operation.
+    /// Only saved data in a segment will be filtered by the segment range during the merge process;
+    /// unsaved data will be copied directly to the new segment.
+    /// So we flush here to make sure that all potential data left by a previous split operation is saved.
+    while (!left->flushCache(dm_context))
+    {
+        // keep flushing until success, unless the segment is abandoned
+        if (left->hasAbandoned())
+        {
+            LOG_FMT_DEBUG(log, "Give up merge segments left [{}], right [{}]", left->segmentId(), right->segmentId());
+            return;
+        }
+    }
+    while (!right->flushCache(dm_context))
+    {
+        // keep flushing until success, unless the segment is abandoned
+        if (right->hasAbandoned())
+        {
+            LOG_FMT_DEBUG(log, "Give up merge segments left [{}], right [{}]", left->segmentId(), right->segmentId());
+            return;
+        }
+    }
+
     SegmentSnapshotPtr left_snap;
     SegmentSnapshotPtr right_snap;
     ColumnDefinesPtr schema_snap;
diff --git a/dbms/src/Storages/DeltaMerge/Segment.cpp b/dbms/src/Storages/DeltaMerge/Segment.cpp
index 22bca4c135c..3f427298873 100644
--- a/dbms/src/Storages/DeltaMerge/Segment.cpp
+++ b/dbms/src/Storages/DeltaMerge/Segment.cpp
@@ -88,7 +88,7 @@ DMFilePtr writeIntoNewDMFile(DMContext & dm_context, //
 {
     auto dmfile = DMFile::create(file_id, parent_path, flags.isSingleFile(), dm_context.createChecksumConfig(flags.isSingleFile()));
     auto output_stream = std::make_shared(dm_context.db_context, dmfile, *schema_snap, flags);
-    auto * mvcc_stream = typeid_cast *>(input_stream.get());
+    const auto * mvcc_stream = typeid_cast *>(input_stream.get());
 
     input_stream->readPrefix();
     output_stream->writePrefix();
@@ -290,7 +290,7 @@ bool Segment::writeToCache(DMContext & dm_context, const Block & block, size_t o
     return delta->appendToCache(dm_context, block, offset, limit);
 }
 
-bool Segment::write(DMContext & dm_context, const Block & block)
+bool Segment::write(DMContext & dm_context, const Block & block, bool flush_cache)
 {
     LOG_TRACE(log, "Segment [" << segment_id << "] write to disk rows: " << block.rows());
     WriteBatches wbs(dm_context.storage_pool, dm_context.getWriteLimiter());
@@ -300,7 +300,14 @@ bool Segment::write(DMContext & dm_context, const Block & block)
 
     if (delta->appendPack(dm_context, pack))
     {
-        flushCache(dm_context);
+        if (flush_cache)
+        {
+            while (!flushCache(dm_context))
+            {
+                if (hasAbandoned())
+                    return false;
+            }
+        }
         return true;
     }
     else
@@ -440,7 +447,7 @@ BlockInputStreamPtr Segment::getInputStreamForDataExport(const DMContext & dm_co
                                                          const SegmentSnapshotPtr & segment_snap,
                                                          const RowKeyRange & data_range,
                                                          size_t expected_block_size,
-                                                         bool reorgnize_block) const
+                                                         bool reorganize_block) const
 {
     RowKeyRanges data_ranges{data_range};
     auto read_info = getReadInfo(dm_context, columns_to_read, segment_snap, data_ranges);
@@ -457,7 +464,7 @@ BlockInputStreamPtr Segment::getInputStreamForDataExport(const DMContext & dm_co
 
     data_stream = std::make_shared>(data_stream, data_ranges, 0);
-    if (reorgnize_block)
+    if (reorganize_block)
     {
         data_stream = std::make_shared>(data_stream, EXTRA_HANDLE_COLUMN_ID, is_common_handle);
     }
@@ -665,7 +672,7 @@ std::optional Segment::getSplitPointFast(DMContext & dm_context, co
     size_t split_row_index = stable_rows / 2;
 
-    auto & dmfiles = stable_snap->getDMFiles();
+    const auto & dmfiles = stable_snap->getDMFiles();
 
     DMFilePtr read_file;
     size_t file_index = 0;
@@ -675,13 +682,13 @@ std::optional Segment::getSplitPointFast(DMContext & dm_context, co
     size_t cur_rows = 0;
     for (size_t index = 0; index < dmfiles.size(); index++)
     {
-        auto & file = dmfiles[index];
+        const auto & file = dmfiles[index];
         size_t rows_in_file = file->getRows();
         cur_rows += rows_in_file;
         if (cur_rows > split_row_index)
         {
             cur_rows -= rows_in_file;
-            auto & pack_stats = file->getPackStats();
+            const auto & pack_stats = file->getPackStats();
             for (size_t pack_id = 0; pack_id < pack_stats.size(); ++pack_id)
             {
                 cur_rows += pack_stats[pack_id].rows;
@@ -745,7 +752,7 @@ std::optional Segment::getSplitPointSlow(
 {
     EventRecorder recorder(ProfileEvents::DMSegmentGetSplitPoint, ProfileEvents::DMSegmentGetSplitPointNS);
 
-    auto & pk_col = getExtraHandleColumnDefine(is_common_handle);
+    const auto & pk_col = getExtraHandleColumnDefine(is_common_handle);
     auto pk_col_defs = std::make_shared(ColumnDefines{pk_col});
     // We need to create a new delta_reader here, because the one in read_info is used to read columns other than PK column.
     auto delta_reader = read_info.getDeltaReader(pk_col_defs);
@@ -878,13 +885,15 @@ std::optional Segment::prepareSplitLogical(DMContext & dm_co
         return {};
     }
 
-    GenPageId log_gen_page_id = std::bind(&StoragePool::newLogPageId, &storage_pool);
+    GenPageId log_gen_page_id = [&]() {
+        return storage_pool.newLogPageId();
+    };
 
     DMFiles my_stable_files;
     DMFiles other_stable_files;
 
     auto delegate = dm_context.path_pool.getStableDiskDelegator();
-    for (auto & dmfile : segment_snap->stable->getDMFiles())
+    for (const auto & dmfile : segment_snap->stable->getDMFiles())
     {
         auto ori_ref_id = dmfile->refId();
         auto file_id = dmfile->fileId();
@@ -1020,7 +1029,7 @@ std::optional Segment::prepareSplitPhysical(DMContext & dm_c
     LOG_INFO(log, "prepare other_stable done");
 
     // Remove old stable's files.
-    for (auto & file : stable->getDMFiles())
+    for (const auto & file : stable->getDMFiles())
     {
         // Here we should remove the ref id instead of file_id.
         // Because a dmfile could be used by several segments, and only after all ref_ids are removed, then the file_id removed.
@@ -1091,6 +1100,29 @@ SegmentPair Segment::applySplit(DMContext & dm_context, //
 SegmentPtr Segment::merge(DMContext & dm_context, const ColumnDefinesPtr & schema_snap, const SegmentPtr & left, const SegmentPtr & right)
 {
     WriteBatches wbs(dm_context.storage_pool, dm_context.getWriteLimiter());
+    /// These segments may contain some rows that do not belong to their segment range, left over by a previous split operation.
+    /// Only saved data in a segment will be filtered by the segment range during the merge process;
+    /// unsaved data will be copied directly to the new segment.
+    /// So we flush here to make sure that all potential data left by a previous split operation is saved.
+    while (!left->flushCache(dm_context))
+    {
+        // keep flushing until success, unless the segment is abandoned
+        if (left->hasAbandoned())
+        {
+            LOG_FMT_DEBUG(left->log, "Give up merge segments left [{}], right [{}]", left->segmentId(), right->segmentId());
+            return {};
+        }
+    }
+    while (!right->flushCache(dm_context))
+    {
+        // keep flushing until success, unless the segment is abandoned
+        if (right->hasAbandoned())
+        {
+            LOG_FMT_DEBUG(right->log, "Give up merge segments left [{}], right [{}]", left->segmentId(), right->segmentId());
+            return {};
+        }
+    }
+
     auto left_snap = left->createSnapshot(dm_context, true, CurrentMetrics::DT_SnapshotOfSegmentMerge);
     auto right_snap = right->createSnapshot(dm_context, true, CurrentMetrics::DT_SnapshotOfSegmentMerge);
 
@@ -1111,6 +1143,10 @@ SegmentPtr Segment::merge(DMContext & dm_context, const ColumnDefinesPtr & schem
     return merged;
 }
 
+/// Segments may contain some rows that do not belong to their range, left over by a previous split operation.
+/// Only saved data in a segment will be filtered by the segment range during the merge process;
+/// unsaved data will be copied directly to the new segment.
+/// So remember to flush the segments before merging them.
 StableValueSpacePtr Segment::prepareMerge(DMContext & dm_context, //
                                           const ColumnDefinesPtr & schema_snap,
                                           const SegmentPtr & left,
@@ -1125,7 +1161,7 @@ StableValueSpacePtr Segment::prepareMerge(DMContext & dm_context, //
         throw Exception("The ranges of merge segments are not consecutive: first end: " + left->rowkey_range.getEnd().toDebugString()
                         + ", second start: " + right->rowkey_range.getStart().toDebugString());
 
-    auto getStream = [&](const SegmentPtr & segment, const SegmentSnapshotPtr & segment_snap) {
+    auto get_stream = [&](const SegmentPtr & segment, const SegmentSnapshotPtr & segment_snap) {
         auto read_info = segment->getReadInfo(
             dm_context,
             *schema_snap,
@@ -1153,8 +1189,8 @@ StableValueSpacePtr Segment::prepareMerge(DMContext & dm_context, //
         return stream;
     };
 
-    auto left_stream = getStream(left, left_snap);
-    auto right_stream = getStream(right, right_snap);
+    auto left_stream = get_stream(left, left_snap);
+    auto right_stream = get_stream(right, right_snap);
 
     BlockInputStreamPtr merged_stream = std::make_shared(BlockInputStreams{left_stream, right_stream}, nullptr);
     // for the purpose to calculate StableProperty of the new segment
@@ -1193,16 +1229,16 @@ SegmentPtr Segment::applyMerge(DMContext & dm_context, //
     /// Make sure saved packs are appended before unsaved packs.
     DeltaPacks merged_packs;
 
-    auto L_first_unsaved
+    auto l_first_unsaved
         = std::find_if(left_tail_packs.begin(), left_tail_packs.end(), [](const DeltaPackPtr & p) { return !p->isSaved(); });
-    auto R_first_unsaved
+    auto r_first_unsaved
         = std::find_if(right_tail_packs.begin(), right_tail_packs.end(), [](const DeltaPackPtr & p) { return !p->isSaved(); });
 
-    merged_packs.insert(merged_packs.end(), left_tail_packs.begin(), L_first_unsaved);
-    merged_packs.insert(merged_packs.end(), right_tail_packs.begin(), R_first_unsaved);
+    merged_packs.insert(merged_packs.end(), left_tail_packs.begin(), l_first_unsaved);
+    merged_packs.insert(merged_packs.end(), right_tail_packs.begin(), r_first_unsaved);
 
-    merged_packs.insert(merged_packs.end(), L_first_unsaved, left_tail_packs.end());
-    merged_packs.insert(merged_packs.end(), R_first_unsaved, right_tail_packs.end());
+    merged_packs.insert(merged_packs.end(), l_first_unsaved, left_tail_packs.end());
+    merged_packs.insert(merged_packs.end(), r_first_unsaved, right_tail_packs.end());
 
     auto merged_delta = std::make_shared(left->delta->getId(), merged_packs);
 
@@ -1334,9 +1370,8 @@ ColumnDefinesPtr Segment::arrangeReadColumns(const ColumnDefine & handle, const
     new_columns_to_read.push_back(getVersionColumnDefine());
     new_columns_to_read.push_back(getTagColumnDefine());
 
-    for (size_t i = 0; i < columns_to_read.size(); ++i)
+    for (const auto & c : columns_to_read)
     {
-        auto & c = columns_to_read[i];
         if (c.id != handle.id && c.id != VERSION_COLUMN_ID && c.id != TAG_COLUMN_ID)
             new_columns_to_read.push_back(c);
     }
@@ -1495,7 +1530,7 @@ bool Segment::placeUpsert(const DMContext & dm_context,
 
     IColumn::Permutation perm;
 
-    auto & handle = getExtraHandleColumnDefine(is_common_handle);
+    const auto & handle = getExtraHandleColumnDefine(is_common_handle);
     bool do_sort = sortBlockByPk(handle, block, perm);
     RowKeyValueRef first_rowkey = RowKeyColumnContainer(block.getByPosition(0).column, is_common_handle).getRowKeyValue(0);
     RowKeyValueRef range_start = relevant_range.getStart();
@@ -1549,7 +1584,7 @@ bool Segment::placeDelete(const DMContext & dm_context,
 {
     EventRecorder recorder(ProfileEvents::DMPlaceDeleteRange, ProfileEvents::DMPlaceDeleteRangeNS);
 
-    auto & handle = getExtraHandleColumnDefine(is_common_handle);
+    const auto & handle = getExtraHandleColumnDefine(is_common_handle);
     RowKeyRanges delete_ranges{delete_range};
     Blocks delete_data;
diff --git a/dbms/src/Storages/DeltaMerge/Segment.h b/dbms/src/Storages/DeltaMerge/Segment.h
index e28c0841140..f55ce222fdc 100644
--- a/dbms/src/Storages/DeltaMerge/Segment.h
+++ b/dbms/src/Storages/DeltaMerge/Segment.h
@@ -124,7 +124,7 @@ class Segment : private boost::noncopyable
 
     bool writeToDisk(DMContext & dm_context, const DeltaPackPtr & pack);
     bool writeToCache(DMContext & dm_context, const Block & block, size_t offset, size_t limit);
-    bool write(DMContext & dm_context, const Block & block); // For test only
+    bool write(DMContext & dm_context, const Block & block, bool flush_cache = true); // For test only
     bool write(DMContext & dm_context, const RowKeyRange & delete_range);
     bool ingestPacks(DMContext & dm_context, const RowKeyRange & range, const DeltaPacks & packs, bool clear_data_in_range);
 
diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment.cpp
index 682710f738d..f98ef93c79f 100644
--- a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment.cpp
+++ b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment.cpp
@@ -935,11 +935,17 @@ CATCH
 TEST_F(Segment_test, Split)
 try
 {
-    const size_t num_rows_write = 100;
+    const size_t num_rows_write_per_batch = 100;
+    const size_t num_rows_write = num_rows_write_per_batch * 2;
     {
-        // write to segment
-        Block block = DMTestEnv::prepareSimpleWriteBlock(0, num_rows_write, false);
-        segment->write(dmContext(), std::move(block));
+        // write to segment and flush
+        Block block = DMTestEnv::prepareSimpleWriteBlock(0, num_rows_write_per_batch, false);
+        segment->write(dmContext(), std::move(block), true);
+    }
+    {
+        // write to segment and don't flush
+        Block block = DMTestEnv::prepareSimpleWriteBlock(num_rows_write_per_batch, 2 * num_rows_write_per_batch, false);
+        segment->write(dmContext(), std::move(block), false);
     }
 
     {
@@ -975,7 +981,7 @@ try
     size_t num_rows_seg2 = 0;
     {
         {
-            auto in = segment->getInputStream(dmContext(), *tableColumns(), {RowKeyRange::newAll(false, 1)});
+            auto in = segment->getInputStream(dmContext(), *tableColumns(), {segment->getRowKeyRange()});
             in->readPrefix();
             while (Block block = in->read())
             {
@@ -984,7 +990,7 @@ try
             in->readSuffix();
         }
         {
-            auto in = segment->getInputStream(dmContext(), *tableColumns(), {RowKeyRange::newAll(false, 1)});
+            auto in = new_segment->getInputStream(dmContext(), *tableColumns(), {new_segment->getRowKeyRange()});
             in->readPrefix();
             while (Block block = in->read())
             {
@@ -995,9 +1001,13 @@ try
             in->readSuffix();
         }
         ASSERT_EQ(num_rows_seg1 + num_rows_seg2, num_rows_write);
     }
 
+    // delete rows in the right segment
+    {
+        new_segment->write(dmContext(), /*delete_range*/ new_segment->getRowKeyRange());
+        new_segment->flushCache(dmContext());
+    }
+
     // merge segments
-    // TODO: enable merge test!
-    if (false)
     {
         segment = Segment::merge(dmContext(), tableColumns(), segment, new_segment);
         {
@@ -1016,7 +1026,7 @@ try
                 num_rows_read += block.rows();
             }
             in->readSuffix();
-            EXPECT_EQ(num_rows_read, num_rows_write);
+            EXPECT_EQ(num_rows_read, num_rows_seg1);
         }
     }
 }
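
Note on the pattern above (not part of the patch): both DeltaMergeStore::segmentMerge and Segment::merge now flush the left and right segments until the flush succeeds, giving up only if a segment has been abandoned. A minimal sketch of that flush-until-success step is shown below for clarity. SegmentPtr, DMContext, flushCache() and hasAbandoned() are taken from the diff; the helper itself (flushSegmentsForMerge) is hypothetical and does not exist in the codebase.

    // Illustrative sketch only: flush both segments so that rows left over from a
    // previous split are persisted, and therefore filtered by the segment range
    // during the merge. Returns false if either segment has been abandoned.
    static bool flushSegmentsForMerge(DMContext & dm_context, const SegmentPtr & left, const SegmentPtr & right)
    {
        for (const auto & segment : {left, right})
        {
            while (!segment->flushCache(dm_context))
            {
                // keep retrying unless the segment has been abandoned
                if (segment->hasAbandoned())
                    return false;
            }
        }
        return true;
    }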