diff --git a/dbms/src/Common/CurrentMetrics.cpp b/dbms/src/Common/CurrentMetrics.cpp index 8673784c590..a607ac4da75 100644 --- a/dbms/src/Common/CurrentMetrics.cpp +++ b/dbms/src/Common/CurrentMetrics.cpp @@ -52,6 +52,7 @@ M(DT_SnapshotOfDeltaMerge) \ M(DT_SnapshotOfDeltaCompact) \ M(DT_SnapshotOfPlaceIndex) \ + M(DT_SnapshotOfSegmentIngest) \ M(IOLimiterPendingBgWriteReq) \ M(IOLimiterPendingFgWriteReq) \ M(IOLimiterPendingBgReadReq) \ diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeStore_Ingest.cpp b/dbms/src/Storages/DeltaMerge/DeltaMergeStore_Ingest.cpp index a2483872680..c3032706299 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaMergeStore_Ingest.cpp +++ b/dbms/src/Storages/DeltaMerge/DeltaMergeStore_Ingest.cpp @@ -114,8 +114,8 @@ Segments DeltaMergeStore::ingestDTFilesUsingColumnFile( } } - // We have to commit those file_ids to PageStorage, because as soon as packs are written into segments, - // they are visible for readers who require file_ids to be found in PageStorage. + // We have to commit those file_ids to PageStorage before applying the ingest, because after the write + // they are visible for readers immediately, who require file_ids to be found in PageStorage. wbs.writeLogAndData(); bool ingest_success = segment->ingestColumnFiles(*dm_context, range.shrink(segment_range), column_files, clear_data_in_range); @@ -389,8 +389,36 @@ bool DeltaMergeStore::ingestDTFileIntoSegmentUsingSplit( * │----------- Segment ----------│ * │-------- Ingest Range --------│ */ - const auto new_segment_or_null = segmentDangerouslyReplaceData(dm_context, segment, file); + auto delegate = dm_context.path_pool.getStableDiskDelegator(); + auto file_provider = dm_context.db_context.getFileProvider(); + + WriteBatches wbs(*storage_pool, dm_context.getWriteLimiter()); + + // Generate DMFile instance with a new ref_id pointed to the file_id, + // because we may use the same DMFile to ingest into multiple segments. + auto new_page_id = storage_pool->newDataPageIdForDTFile(delegate, __PRETTY_FUNCTION__); + auto ref_file = DMFile::restore( + file_provider, + file->fileId(), + new_page_id, + file->parentPath(), + DMFile::ReadMetaMode::all()); + wbs.data.putRefPage(new_page_id, file->pageId()); + + // We have to commit those file_ids to PageStorage before applying the ingest, because after the write + // they are visible for readers immediately, who require file_ids to be found in PageStorage. + wbs.writeLogAndData(); + + const auto new_segment_or_null = segmentDangerouslyReplaceData(dm_context, segment, ref_file); const bool succeeded = new_segment_or_null != nullptr; + + if (!succeeded) + { + // When segment is not valid anymore, replaceData will fail. + // In this case, we just discard this ref file. + wbs.rollbackWrittenLogAndData(); + } + return succeeded; } else if (is_start_matching) diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeStore_InternalSegment.cpp b/dbms/src/Storages/DeltaMerge/DeltaMergeStore_InternalSegment.cpp index bf3097d61cc..3d667c19e38 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaMergeStore_InternalSegment.cpp +++ b/dbms/src/Storages/DeltaMerge/DeltaMergeStore_InternalSegment.cpp @@ -488,8 +488,6 @@ SegmentPtr DeltaMergeStore::segmentDangerouslyReplaceData( { LOG_INFO(log, "ReplaceData - Begin, segment={} data_file={}", segment->info(), data_file->path()); - WriteBatches wbs(*storage_pool, dm_context.getWriteLimiter()); - SegmentPtr new_segment; { std::unique_lock lock(read_write_mutex); @@ -500,14 +498,11 @@ SegmentPtr DeltaMergeStore::segmentDangerouslyReplaceData( } auto segment_lock = segment->mustGetUpdateLock(); - new_segment = segment->dangerouslyReplaceData(segment_lock, dm_context, data_file, wbs); + new_segment = segment->replaceData(segment_lock, dm_context, data_file); RUNTIME_CHECK(compare(segment->getRowKeyRange().getEnd(), new_segment->getRowKeyRange().getEnd()) == 0, segment->info(), new_segment->info()); RUNTIME_CHECK(segment->segmentId() == new_segment->segmentId(), segment->info(), new_segment->info()); - wbs.writeLogAndData(); - wbs.writeMeta(); - segment->abandon(dm_context); segments[segment->getRowKeyRange().getEnd()] = new_segment; id_to_segment[segment->segmentId()] = new_segment; @@ -515,8 +510,6 @@ SegmentPtr DeltaMergeStore::segmentDangerouslyReplaceData( LOG_INFO(log, "ReplaceData - Finish, old_segment={} new_segment={}", segment->info(), new_segment->info()); } - wbs.writeRemoves(); - if constexpr (DM_RUN_CHECK) check(dm_context.db_context); diff --git a/dbms/src/Storages/DeltaMerge/Segment.cpp b/dbms/src/Storages/DeltaMerge/Segment.cpp index 1f302372efe..1bbb41a3d3a 100644 --- a/dbms/src/Storages/DeltaMerge/Segment.cpp +++ b/dbms/src/Storages/DeltaMerge/Segment.cpp @@ -873,50 +873,40 @@ SegmentPtr Segment::applyMergeDelta(const Segment::Lock & lock, // return new_me; } -SegmentPtr Segment::dangerouslyReplaceDataForTest(DMContext & dm_context, // - const DMFilePtr & data_file) const +SegmentPtr Segment::replaceData(const Segment::Lock & lock, // + DMContext & context, + const DMFilePtr & data_file, + SegmentSnapshotPtr segment_snap_opt) const { - WriteBatches wbs(dm_context.storage_pool, dm_context.getWriteLimiter()); - - auto lock = mustGetUpdateLock(); - auto new_segment = dangerouslyReplaceData(lock, dm_context, data_file, wbs); - - wbs.writeAll(); - return new_segment; -} + LOG_DEBUG(log, "ReplaceData - Begin, snapshot_rows={} data_file={}", segment_snap_opt == nullptr ? "" : std::to_string(segment_snap_opt->getRows()), data_file->path()); -SegmentPtr Segment::dangerouslyReplaceData(const Segment::Lock &, // - DMContext & dm_context, - const DMFilePtr & data_file, - WriteBatches & wbs) const -{ - LOG_DEBUG(log, "ReplaceData - Begin, data_file={}", data_file->path()); + ColumnFiles in_memory_files{}; + ColumnFilePersisteds persisted_files{}; - auto & storage_pool = dm_context.storage_pool; - auto delegate = dm_context.path_pool.getStableDiskDelegator(); + WriteBatches wbs(context.storage_pool, context.getWriteLimiter()); - RUNTIME_CHECK(delegate.getDTFilePath(data_file->fileId()) == data_file->parentPath()); + // If a snapshot is specified, we retain newly written data since the snapshot. + // Otherwise, we just discard everything in the delta layer. + if (segment_snap_opt != nullptr) + { + std::tie(in_memory_files, persisted_files) = delta->cloneNewlyAppendedColumnFiles( + lock, + context, + rowkey_range, + *segment_snap_opt->delta, + wbs); + } - // Always create a ref to the file to allow `data_file` being shared. - auto new_page_id = storage_pool.newDataPageIdForDTFile(delegate, __PRETTY_FUNCTION__); - // TODO: We could allow assigning multiple DMFiles in future. - auto ref_file = DMFile::restore( - dm_context.db_context.getFileProvider(), - data_file->fileId(), - new_page_id, - data_file->parentPath(), - DMFile::ReadMetaMode::all()); - wbs.data.putRefPage(new_page_id, data_file->pageId()); + auto new_delta = std::make_shared( + delta->getId(), + persisted_files, + in_memory_files); + new_delta->saveMeta(wbs); auto new_stable = std::make_shared(stable->getId()); - new_stable->setFiles({ref_file}, rowkey_range, &dm_context); + new_stable->setFiles({data_file}, rowkey_range, &context); new_stable->saveMeta(wbs.meta); - // Empty new delta - auto new_delta = std::make_shared( - delta->getId()); - new_delta->saveMeta(wbs); - auto new_me = std::make_shared( // parent_log, epoch + 1, @@ -930,6 +920,8 @@ SegmentPtr Segment::dangerouslyReplaceData(const Segment::Lock &, // delta->recordRemoveColumnFilesPages(wbs); stable->recordRemovePacksPages(wbs); + wbs.writeAll(); + LOG_DEBUG(log, "ReplaceData - Finish, old_me={} new_me={}", info(), new_me->info()); return new_me; diff --git a/dbms/src/Storages/DeltaMerge/Segment.h b/dbms/src/Storages/DeltaMerge/Segment.h index 004e53c55d2..b5852e79aac 100644 --- a/dbms/src/Storages/DeltaMerge/Segment.h +++ b/dbms/src/Storages/DeltaMerge/Segment.h @@ -355,29 +355,20 @@ class Segment : private boost::noncopyable const StableValueSpacePtr & new_stable) const; /** - * Only used in tests as a shortcut. - * Normally you should use `dangerouslyReplaceData`. - */ - [[nodiscard]] SegmentPtr dangerouslyReplaceDataForTest(DMContext & dm_context, const DMFilePtr & data_file) const; - - /** - * Discard all data in the current delta and stable layer, and use the specified DMFile as the stable instead. - * This API does not have a prepare & apply pair, as it should be quick enough. The specified DMFile is safe - * to be shared for multiple segments. + * Replace all data in the snapshot using the specified DMFile as the stable instead. + * Newly appended data since the snapshot was created will be retained the segment. * - * Note 1: Should be protected behind the Segment update lock to ensure no new data will be appended to this - * segment during the function call. Otherwise these new data will be lost in the new segment. + * Snapshot is optional. If the snapshot is not specified, it means everything in the + * segment now will be replaced. * - * Note 2: This function will not enable GC for the new_stable_file for you, in case of you may want to share the same - * stable file for multiple segments. It is your own duty to enable GC later. + * This API does not have a prepare & apply pair, as it should be quick enough. * - * Note 3: You must ensure the specified new_stable_file has been managed by the storage pool, and has been written - * to the PageStorage's data. Otherwise there will be exceptions. - * - * Note 4: This API is subjected to be changed in future, as it relies on the knowledge that all current data - * in this segment is useless, which is a pretty tough requirement. + * Note 1: You must ensure the DMFile is not shared in multiple segments. + * Note 2: You must enable the GC for the DMFile by yourself. + * Note 3: You must ensure the DMFile has been managed by the storage pool, and has been written + * to the PageStorage's data. */ - [[nodiscard]] SegmentPtr dangerouslyReplaceData(const Lock &, DMContext & dm_context, const DMFilePtr & data_file, WriteBatches & wbs) const; + [[nodiscard]] SegmentPtr replaceData(const Lock &, DMContext & dm_context, const DMFilePtr & data_file, SegmentSnapshotPtr segment_snap_opt = nullptr) const; [[nodiscard]] SegmentPtr dropNextSegment(WriteBatches & wbs, const RowKeyRange & next_segment_range); @@ -437,9 +428,13 @@ class Segment : private boost::noncopyable return std::exchange(lock_opt, std::nullopt).value(); } - /// Marks this segment as abandoned. - /// Note: Segment member functions never abandon the segment itself. - /// The abandon state is usually triggered by the DeltaMergeStore. + /** + * Marks this segment as abandoned. + * Note: Segment member functions never abandon the segment itself. + * The abandon state is usually triggered by the DeltaMergeStore. + * When triggering, remember to hold a unique_lock from the DeltaMergeStore. + * Otherwise, the abandon operation may break an existing segment update operation. + */ void abandon(DMContext & context) { LOG_DEBUG(log, "Abandon segment, segment={}", simpleInfo()); @@ -532,7 +527,7 @@ class Segment : private boost::noncopyable bool relevant_place) const; private: - /// The version of this segment. After split / merge / mergeDelta / dangerouslyReplaceData, epoch got increased by 1. + /// The version of this segment. After split / merge / mergeDelta / replaceData, epoch got increased by 1. const UInt64 epoch; RowKeyRange rowkey_range; diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_segment_replace_data.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_segment_replace_data.cpp index 16c6c54d7bf..1c308c6a192 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_segment_replace_data.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_segment_replace_data.cpp @@ -25,6 +25,11 @@ #include +namespace CurrentMetrics +{ +extern const Metric DT_SnapshotOfSegmentIngest; +} // namespace CurrentMetrics + namespace DB { @@ -59,56 +64,6 @@ INSTANTIATE_TEST_CASE_P( SegmentReplaceDataTest, testing::Values(0, 37)); // Note: some tests rely on the exact value of 37. Adding arbitrary values may break test. -class SegmentReplaceDataBasicTest : public SegmentTestBasic -{ -}; - -TEST_F(SegmentReplaceDataBasicTest, ThrowWhenDMFileNotInDelegator) -try -{ - auto delegator = storage_path_pool->getStableDiskDelegator(); - auto file_id = storage_pool->newDataPageIdForDTFile(delegator, __PRETTY_FUNCTION__); - auto input_stream = std::make_shared(Block{}); - auto dm_file = writeIntoNewDMFile( - *dm_context, - table_columns, - input_stream, - file_id, - delegator.choosePath(), - DMFileBlockOutputStream::Flags{}); - - ASSERT_THROW({ - replaceSegmentData({DELTA_MERGE_FIRST_SEGMENT_ID}, dm_file); - }, - DB::Exception); -} -CATCH - - -TEST_F(SegmentReplaceDataBasicTest, ThrowWhenDMFileNotInPS) -try -{ - auto delegator = storage_path_pool->getStableDiskDelegator(); - auto file_id = storage_pool->newDataPageIdForDTFile(delegator, __PRETTY_FUNCTION__); - auto input_stream = std::make_shared(Block{}); - auto dm_file = writeIntoNewDMFile( - *dm_context, - table_columns, - input_stream, - file_id, - delegator.choosePath(), - DMFileBlockOutputStream::Flags{}); - - delegator.addDTFile(file_id, dm_file->getBytesOnDisk(), dm_file->parentPath()); - - ASSERT_THROW({ - replaceSegmentData({DELTA_MERGE_FIRST_SEGMENT_ID}, dm_file); - }, - DB::Exception); -} -CATCH - - TEST_P(SegmentReplaceDataTest, Basic) try { @@ -117,7 +72,7 @@ try ASSERT_EQ(100, getSegmentRowNum(DELTA_MERGE_FIRST_SEGMENT_ID)); { auto replace_block = prepareWriteBlock(/* from */ 0, /* to */ replace_to_rows); - replaceSegmentData({DELTA_MERGE_FIRST_SEGMENT_ID}, replace_block); + replaceSegmentData(DELTA_MERGE_FIRST_SEGMENT_ID, replace_block); } ASSERT_EQ(replace_to_rows, getSegmentRowNum(DELTA_MERGE_FIRST_SEGMENT_ID)); @@ -129,7 +84,7 @@ try storage_pool->log_storage_v3->gc(/* not_skip */ true); storage_pool->data_storage_v3->gc(/* not_skip */ true); ASSERT_EQ(storage_pool->log_storage_v3->getNumberOfPages(), 0); - ASSERT_EQ(storage_pool->data_storage_v3->getNumberOfPages(), 2); // 1 DMFile, 1 Ref + ASSERT_EQ(storage_pool->data_storage_v3->getNumberOfPages(), 1); // 1 DMFile PageId replaced_stable_id{}; { auto stable_page_ids = storage_pool->data_storage_v3->getAliveExternalPageIds(NAMESPACE_ID); @@ -162,7 +117,7 @@ try // Now let's replace data again. Everything in the current stable will be discarded. { auto replace_block = prepareWriteBlock(/* from */ 0, /* to */ replace_to_rows); - replaceSegmentData({DELTA_MERGE_FIRST_SEGMENT_ID}, replace_block); + replaceSegmentData(DELTA_MERGE_FIRST_SEGMENT_ID, replace_block); } ASSERT_EQ(replace_to_rows, getSegmentRowNum(DELTA_MERGE_FIRST_SEGMENT_ID)); { @@ -187,7 +142,7 @@ try ASSERT_EQ(100, getSegmentRowNum(DELTA_MERGE_FIRST_SEGMENT_ID)); { auto replace_block = prepareWriteBlock(/* from */ 0, /* to */ replace_to_rows); - replaceSegmentData({DELTA_MERGE_FIRST_SEGMENT_ID}, replace_block); + replaceSegmentData(DELTA_MERGE_FIRST_SEGMENT_ID, replace_block); } ASSERT_EQ(replace_to_rows, getSegmentRowNum(DELTA_MERGE_FIRST_SEGMENT_ID)); @@ -200,12 +155,15 @@ try } CATCH +class SegmentReplaceDataSimpleTest : public SegmentTestBasic +{ +}; /** * This test verify that, the DMFile will never be marked as GCable, during different segment operations. * Otherwise, the DMFile will be unsafe to be used in another replaceData. */ -TEST_F(SegmentReplaceDataBasicTest, DMFileGCIsUnchanged) +TEST_F(SegmentReplaceDataSimpleTest, DMFileGCIsUnchanged) try { WriteBatches ingest_wbs(dm_context->storage_pool, dm_context->getWriteLimiter()); @@ -225,7 +183,7 @@ try ingest_wbs.writeLogAndData(); delegator.addDTFile(file_id, dm_file->getBytesOnDisk(), dm_file->parentPath()); - replaceSegmentData({DELTA_MERGE_FIRST_SEGMENT_ID}, dm_file); + replaceSegmentData(DELTA_MERGE_FIRST_SEGMENT_ID, dm_file); ASSERT_EQ(0, getSegmentRowNum(DELTA_MERGE_FIRST_SEGMENT_ID)); ingest_wbs.rollbackWrittenLogAndData(); @@ -257,73 +215,14 @@ try CATCH -TEST_P(SegmentReplaceDataTest, MultipleSegmentsSharingDMFile) -try -{ - std::optional seg_right_id; - Block block{}; - - if (replace_to_rows == 0) - { - seg_right_id = splitSegmentAt(DELTA_MERGE_FIRST_SEGMENT_ID, 0); - // block is empty, split point doesn't matter. - } - else - { - seg_right_id = splitSegmentAt(DELTA_MERGE_FIRST_SEGMENT_ID, replace_to_rows - 10); /* right seg should contain 10 rows after replacing data */ - block = prepareWriteBlock(0, replace_to_rows); - } - - ASSERT_TRUE(seg_right_id.has_value()); - replaceSegmentData({*seg_right_id, DELTA_MERGE_FIRST_SEGMENT_ID}, block); - ASSERT_TRUE(areSegmentsSharingStable({*seg_right_id, DELTA_MERGE_FIRST_SEGMENT_ID})); - - UInt64 expected_left_rows, expected_right_rows; - if (replace_to_rows == 0) - { - expected_left_rows = 0; - expected_right_rows = 0; - } - else - { - expected_left_rows = replace_to_rows - 10; - expected_right_rows = 10; - } - ASSERT_EQ(expected_left_rows, getSegmentRowNum(DELTA_MERGE_FIRST_SEGMENT_ID)); - ASSERT_EQ(expected_right_rows, getSegmentRowNum(*seg_right_id)); - - // Now let's write something and perform merge delta for the right seg - writeSegment(*seg_right_id, 151); - expected_right_rows += 151; - ASSERT_EQ(expected_right_rows, getSegmentRowNumWithoutMVCC(*seg_right_id)); - flushSegmentCache(*seg_right_id); - mergeSegmentDelta(*seg_right_id); - ASSERT_EQ(expected_right_rows, getSegmentRowNumWithoutMVCC(*seg_right_id)); - // Left is not affected - ASSERT_EQ(expected_left_rows, getSegmentRowNum(DELTA_MERGE_FIRST_SEGMENT_ID)); - ASSERT_FALSE(areSegmentsSharingStable({*seg_right_id, DELTA_MERGE_FIRST_SEGMENT_ID})); - - ASSERT_TRUE(storage_pool->log_storage_v3 != nullptr); - storage_pool->data_storage_v3->gc(/* not_skip */ true); - auto stable_page_ids = storage_pool->data_storage_v3->getAliveExternalPageIds(NAMESPACE_ID); - ASSERT_EQ(2, stable_page_ids.size()); - - mergeSegment({DELTA_MERGE_FIRST_SEGMENT_ID, *seg_right_id}); - storage_pool->data_storage_v3->gc(/* not_skip */ true); - stable_page_ids = storage_pool->data_storage_v3->getAliveExternalPageIds(NAMESPACE_ID); - ASSERT_EQ(1, stable_page_ids.size()); -} -CATCH - - -TEST_F(SegmentReplaceDataBasicTest, ReplaceMultipleTimes) +TEST_F(SegmentReplaceDataSimpleTest, ReplaceMultipleTimes) try { for (size_t i = 0; i < 20; ++i) { auto rows = std::uniform_int_distribution<>(1, 100)(random); auto block = prepareWriteBlock(0, rows); - replaceSegmentData({DELTA_MERGE_FIRST_SEGMENT_ID}, block); + replaceSegmentData(DELTA_MERGE_FIRST_SEGMENT_ID, block); ASSERT_EQ(rows, getSegmentRowNum(DELTA_MERGE_FIRST_SEGMENT_ID)); // Write some rows doesn't affect our next replaceData @@ -338,51 +237,10 @@ try CATCH -TEST_P(SegmentReplaceDataTest, ReplaceSameDMFileMultipleTimes) -try -{ - auto block = prepareWriteBlock(0, replace_to_rows); - - WriteBatches ingest_wbs(dm_context->storage_pool, dm_context->getWriteLimiter()); - - auto delegator = storage_path_pool->getStableDiskDelegator(); - auto file_id = storage_pool->newDataPageIdForDTFile(delegator, __PRETTY_FUNCTION__); - auto input_stream = std::make_shared(block); - auto dm_file = writeIntoNewDMFile( - *dm_context, - table_columns, - input_stream, - file_id, - delegator.choosePath(), - DMFileBlockOutputStream::Flags{}); - - ingest_wbs.data.putExternal(file_id, /* tag */ 0); - ingest_wbs.writeLogAndData(); - delegator.addDTFile(file_id, dm_file->getBytesOnDisk(), dm_file->parentPath()); - - for (size_t i = 0; i < 20; ++i) - { - replaceSegmentData({DELTA_MERGE_FIRST_SEGMENT_ID}, block); - ASSERT_EQ(replace_to_rows, getSegmentRowNum(DELTA_MERGE_FIRST_SEGMENT_ID)); - // Write some rows doesn't affect our next replaceData - writeSegment(DELTA_MERGE_FIRST_SEGMENT_ID); - } - - dm_file->enableGC(); - ingest_wbs.rollbackWrittenLogAndData(); - - ASSERT_TRUE(storage_pool->log_storage_v3 != nullptr); - storage_pool->data_storage_v3->gc(/* not_skip */ true); - auto stable_page_ids = storage_pool->data_storage_v3->getAliveExternalPageIds(NAMESPACE_ID); - ASSERT_EQ(1, stable_page_ids.size()); -} -CATCH - - /** * The out of bound data introduced by replaceData should not be seen after the merge. */ -TEST_F(SegmentReplaceDataBasicTest, ReplaceOutOfBoundAndMerge) +TEST_F(SegmentReplaceDataSimpleTest, ReplaceOutOfBoundAndMerge) try { auto seg_right_id = splitSegmentAt(DELTA_MERGE_FIRST_SEGMENT_ID, 100, Segment::SplitMode::Physical); @@ -394,7 +252,7 @@ try auto block = prepareWriteBlock(0, 300); // Only replace this block to the left seg, whose range is [-∞, 100). - replaceSegmentData({DELTA_MERGE_FIRST_SEGMENT_ID}, block); + replaceSegmentData(DELTA_MERGE_FIRST_SEGMENT_ID, block); ASSERT_EQ(100, getSegmentRowNumWithoutMVCC(DELTA_MERGE_FIRST_SEGMENT_ID)); ASSERT_EQ(10, getSegmentRowNumWithoutMVCC(*seg_right_id)); @@ -412,7 +270,7 @@ try CATCH -TEST_F(SegmentReplaceDataBasicTest, ReleaseExistingSharedDMFile) +TEST_F(SegmentReplaceDataSimpleTest, ReleaseExistingSharedDMFile) try { writeSegment(DELTA_MERGE_FIRST_SEGMENT_ID, 500, /* at */ 0); @@ -436,7 +294,7 @@ try // Now let's replace one segment. auto block = prepareWriteBlock(0, 300); - replaceSegmentData({DELTA_MERGE_FIRST_SEGMENT_ID}, block); + replaceSegmentData(DELTA_MERGE_FIRST_SEGMENT_ID, block); ASSERT_EQ(100, getSegmentRowNumWithoutMVCC(DELTA_MERGE_FIRST_SEGMENT_ID)); // We should only see [0, 100) ASSERT_EQ(400, getSegmentRowNumWithoutMVCC(*seg_right_id)); @@ -450,7 +308,7 @@ try CATCH -TEST_F(SegmentReplaceDataBasicTest, ReadSnapshotBeforeReplace) +TEST_F(SegmentReplaceDataSimpleTest, ReadSnapshotBeforeReplace) try { writeSegment(DELTA_MERGE_FIRST_SEGMENT_ID, 400); // 400 in stable @@ -463,7 +321,7 @@ try // Now let's replace data. auto block = prepareWriteBlock(0, 233); - replaceSegmentData({DELTA_MERGE_FIRST_SEGMENT_ID}, block); + replaceSegmentData(DELTA_MERGE_FIRST_SEGMENT_ID, block); // There is a snapshot alive, so we should have 2 stables. storage_pool->data_storage_v3->gc(/* not_skip */ true); @@ -485,6 +343,48 @@ try CATCH +TEST_F(SegmentReplaceDataSimpleTest, NewWriteInMemtableAfterSnapshot) +try +{ + writeSegment(DELTA_MERGE_FIRST_SEGMENT_ID, 400, /* at */ 0); // [0, 400) + auto snapshot = segments[DELTA_MERGE_FIRST_SEGMENT_ID]->createSnapshot(*dm_context, /* for_update */ true, CurrentMetrics::DT_SnapshotOfSegmentIngest); + ASSERT_TRUE(snapshot != nullptr); + + writeSegment(DELTA_MERGE_FIRST_SEGMENT_ID, 200, /* at */ 300); // [300, 500) + + ASSERT_EQ(500, getSegmentRowNum(DELTA_MERGE_FIRST_SEGMENT_ID)); + + auto block = prepareWriteBlock(0, 233); + replaceSegmentData(DELTA_MERGE_FIRST_SEGMENT_ID, block, snapshot); + + // We should have [0, 233) and [300, 500) + ASSERT_EQ(233 + 200, getSegmentRowNum(DELTA_MERGE_FIRST_SEGMENT_ID)); +} +CATCH + + +TEST_F(SegmentReplaceDataSimpleTest, NewWriteInPersistedAfterSnapshot) +try +{ + writeSegment(DELTA_MERGE_FIRST_SEGMENT_ID, 400, /* at */ 0); // [0, 400) + auto snapshot = segments[DELTA_MERGE_FIRST_SEGMENT_ID]->createSnapshot(*dm_context, /* for_update */ true, CurrentMetrics::DT_SnapshotOfSegmentIngest); + ASSERT_TRUE(snapshot != nullptr); + + writeSegment(DELTA_MERGE_FIRST_SEGMENT_ID, 200, /* at */ 300); // [300, 500) + flushSegmentCache(DELTA_MERGE_FIRST_SEGMENT_ID); + writeSegment(DELTA_MERGE_FIRST_SEGMENT_ID, 50, /* at */ 220); // [220, 270) + + ASSERT_EQ(500, getSegmentRowNum(DELTA_MERGE_FIRST_SEGMENT_ID)); + + auto block = prepareWriteBlock(0, 233); + replaceSegmentData(DELTA_MERGE_FIRST_SEGMENT_ID, block, snapshot); + + // We should have [0, 270) and [300, 500) + ASSERT_EQ(270 + 200, getSegmentRowNum(DELTA_MERGE_FIRST_SEGMENT_ID)); +} +CATCH + + } // namespace tests } // namespace DM } // namespace DB diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_segment_test_basic.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_segment_test_basic.cpp index c5f9199f241..d4f7c3a7d0b 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_segment_test_basic.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_segment_test_basic.cpp @@ -488,9 +488,11 @@ void SegmentTestBasic::deleteRangeSegment(PageId segment_id) EXPECT_EQ(getSegmentRowNum(segment_id), 0); } -void SegmentTestBasic::replaceSegmentData(const std::vector & segments_id, const Block & block) +void SegmentTestBasic::replaceSegmentData(PageId segment_id, const Block & block, SegmentSnapshotPtr snapshot) { - LOG_DEBUG(logger, "replace segment data using block, segments_id={} block_rows={}", fmt::join(segments_id, ","), block.rows()); + // This function always create a new DTFile for the block. + + LOG_DEBUG(logger, "replace segment data using block, segment_id={} block_rows={}", segment_id, block.rows()); auto delegator = storage_path_pool->getStableDiskDelegator(); auto parent_path = delegator.choosePath(); @@ -504,27 +506,31 @@ void SegmentTestBasic::replaceSegmentData(const std::vector & segments_i ingest_wbs.data.putExternal(file_id, /* tag */ 0); ingest_wbs.writeLogAndData(); + delegator.addDTFile(file_id, dm_file->getBytesOnDisk(), parent_path); - replaceSegmentData(segments_id, dm_file); + replaceSegmentData(segment_id, dm_file, snapshot); dm_file->enableGC(); - ingest_wbs.rollbackWrittenLogAndData(); } -void SegmentTestBasic::replaceSegmentData(const std::vector & segments_id, const DMFilePtr & file) +void SegmentTestBasic::replaceSegmentData(PageId segment_id, const DMFilePtr & file, SegmentSnapshotPtr snapshot) { - LOG_INFO(logger_op, "replaceSegmentData, segments_id={} file_rows={} file={}", fmt::join(segments_id, ","), file->getRows(), file->path()); + LOG_INFO(logger_op, "replaceSegmentData, segment_id={} file_rows={} file=dmf_{}", segment_id, file->getRows(), file->fileId()); - for (const auto segment_id : segments_id) + RUNTIME_CHECK(segments.find(segment_id) != segments.end()); + auto segment = segments[segment_id]; { - RUNTIME_CHECK(segments.find(segment_id) != segments.end()); - auto segment = segments[segment_id]; - auto new_segment = segment->dangerouslyReplaceDataForTest(*dm_context, file); - ASSERT_TRUE(new_segment != nullptr); - segments[new_segment->segmentId()] = new_segment; + auto lock = segment->mustGetUpdateLock(); + auto new_segment = segment->replaceData(lock, *dm_context, file, snapshot); + if (new_segment != nullptr) + segments[new_segment->segmentId()] = new_segment; } - operation_statistics["replaceData"]++; + + if (snapshot != nullptr) + operation_statistics["replaceDataWithSnapshot"]++; + else + operation_statistics["replaceData"]++; } bool SegmentTestBasic::areSegmentsSharingStable(const std::vector & segments_id) const @@ -700,6 +706,7 @@ try } CATCH + } // namespace tests } // namespace DM } // namespace DB diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_segment_test_basic.h b/dbms/src/Storages/DeltaMerge/tests/gtest_segment_test_basic.h index bc3e1da5cde..e089d5514d3 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_segment_test_basic.h +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_segment_test_basic.h @@ -70,8 +70,8 @@ class SegmentTestBasic : public DB::base::TiFlashStorageTestBasic /** * This function does not check rows. */ - void replaceSegmentData(const std::vector & segments_id, const DMFilePtr & file); - void replaceSegmentData(const std::vector & segments_id, const Block & block); + void replaceSegmentData(PageId segment_id, const DMFilePtr & file, SegmentSnapshotPtr snapshot = nullptr); + void replaceSegmentData(PageId segment_id, const Block & block, SegmentSnapshotPtr snapshot = nullptr); Block prepareWriteBlock(Int64 start_key, Int64 end_key, bool is_deleted = false); Block prepareWriteBlockInSegmentRange(PageId segment_id, UInt64 total_write_rows, std::optional write_start_key = std::nullopt, bool is_deleted = false); diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_segment_test_randomized.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_segment_test_randomized.cpp index eb2126f107b..6ac5c476410 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_segment_test_randomized.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_segment_test_randomized.cpp @@ -17,6 +17,11 @@ #include +namespace CurrentMetrics +{ +extern const Metric DT_SnapshotOfSegmentIngest; +} // namespace CurrentMetrics + namespace DB { namespace DM @@ -81,7 +86,14 @@ class SegmentRandomizedTest : public SegmentTestBasic {1.0, &SegmentRandomizedTest::mergeDeltaRandomSegment}, {1.0, &SegmentRandomizedTest::flushCacheRandomSegment}, {0.5, &SegmentRandomizedTest::replaceRandomSegmentsData}, - {0.25, &SegmentRandomizedTest::writeRandomSegmentWithDeletedPack}}; + {0.25, &SegmentRandomizedTest::writeRandomSegmentWithDeletedPack}, + + // This keeps a for-update snapshot. The snapshot will be revoked before doing other updates. + {0.25, &SegmentRandomizedTest::prepareReplaceDataSnapshot}}; + + SegmentSnapshotPtr for_update_snapshot; + std::optional for_update_snapshot_segment_id; + std::optional for_update_snapshot_rows; /** * (-∞, rand_min). Hack: This segment is intentionally removed from the "segments" map to avoid being picked up. @@ -93,12 +105,41 @@ class SegmentRandomizedTest : public SegmentTestBasic */ PageId outbound_right_seg{}; + void clearReplaceDataSnapshot() + { + if (for_update_snapshot != nullptr) + { + LOG_DEBUG(logger, "cleared for_update snapshot to be used in replace data"); + for_update_snapshot = nullptr; + for_update_snapshot_segment_id = std::nullopt; + for_update_snapshot_rows = std::nullopt; + } + } + + void prepareReplaceDataSnapshot() + { + if (segments.empty()) + return; + clearReplaceDataSnapshot(); + auto segment_id = getRandomSegmentId(); + for_update_snapshot_segment_id = segment_id; + LOG_DEBUG(logger, "prepare a for_update snapshot, segment_id={}", segment_id); + for_update_snapshot = segments[segment_id]->createSnapshot(*dm_context, /* for_update */ true, CurrentMetrics::DT_SnapshotOfSegmentIngest); + for_update_snapshot_rows = getSegmentRowNumWithoutMVCC(segment_id); + EXPECT_TRUE(for_update_snapshot != nullptr); + } + void verifySegmentsIsEmpty() { // For all segments, when isEmpty() == true, verify the result against getSegmentRowNum. for (const auto & seg_it : segments) { const auto seg_id = seg_it.first; + + if (for_update_snapshot_segment_id.has_value() && *for_update_snapshot_segment_id == seg_id) + // Skip segments when there is a buffered for_update snapshot. + continue; + if (isSegmentDefinitelyEmpty(seg_id)) { auto rows = getSegmentRowNum(seg_id); @@ -147,6 +188,7 @@ class SegmentRandomizedTest : public SegmentTestBasic // correlated actions. if (segments.size() > 10) return; + clearReplaceDataSnapshot(); auto segment_id = getRandomSegmentId(); auto split_mode = getRandomSplitMode(); LOG_DEBUG(logger, "start random split, segment_id={} mode={} all_segments={}", segment_id, magic_enum::enum_name(split_mode), segments.size()); @@ -159,6 +201,7 @@ class SegmentRandomizedTest : public SegmentTestBasic return; if (segments.size() > 10) return; + clearReplaceDataSnapshot(); auto segment_id = getRandomSegmentId(); auto split_mode = getRandomSplitMode(); const auto [start, end] = getSegmentKeyRange(segment_id); @@ -173,6 +216,7 @@ class SegmentRandomizedTest : public SegmentTestBasic { if (segments.size() < 2) return; + clearReplaceDataSnapshot(); auto segments_id = getRandomMergeableSegments(); LOG_DEBUG(logger, "start random merge, segments_id=[{}] all_segments={}", fmt::join(segments_id, ","), segments.size()); mergeSegment(segments_id); @@ -182,6 +226,7 @@ class SegmentRandomizedTest : public SegmentTestBasic { if (segments.empty()) return; + clearReplaceDataSnapshot(); PageId random_segment_id = getRandomSegmentId(); LOG_DEBUG(logger, "start random merge delta, segment_id={} all_segments={}", random_segment_id, segments.size()); mergeSegmentDelta(random_segment_id); @@ -201,61 +246,48 @@ class SegmentRandomizedTest : public SegmentTestBasic if (segments.empty()) return; - auto segments_to_pick = std::uniform_int_distribution{1, 5}(random); - std::vector segments_list; - std::map expected_data_each_segment; - for (size_t i = 0; i < segments_to_pick; ++i) - { - auto id = getRandomSegmentId(); // allow duplicate - segments_list.emplace_back(id); - expected_data_each_segment[id] = 0; - } + auto segment_id = getRandomSegmentId(); + size_t newly_written_rows_since_snapshot = 0; - auto [min_key, max_key] = getSegmentKeyRange(segments_list[0]); - for (size_t i = 1; i < segments_to_pick; ++i) + if (for_update_snapshot != nullptr) { - auto [new_min_key, new_max_key] = getSegmentKeyRange(segments_list[i]); - if (new_min_key < min_key) - min_key = new_min_key; - if (new_max_key > max_key) - max_key = new_max_key; + LOG_DEBUG(logger, "random replace segment data using an existing snapshot, snapshot_segment_id={}", *for_update_snapshot_segment_id); + segment_id = *for_update_snapshot_segment_id; + newly_written_rows_since_snapshot = getSegmentRowNumWithoutMVCC(segment_id) - *for_update_snapshot_rows; } + auto [min_key, max_key] = getSegmentKeyRange(segment_id); + + std::vector n_rows_collection{0, 10, 50, 1000}; + auto block_rows = n_rows_collection[std::uniform_int_distribution{0, n_rows_collection.size() - 1}(random)]; + + Int64 block_start_key = 0, block_end_key = 0; Block block{}; - if (max_key > min_key) + + if (block_rows > 0) { - // Now let's generate some data. - std::vector n_rows_collection{0, 10, 50, 1000}; - auto block_rows = n_rows_collection[std::uniform_int_distribution{0, n_rows_collection.size() - 1}(random)]; - if (block_rows > 0) - { - auto block_start_key = std::uniform_int_distribution{min_key, max_key - 1}(random); - auto block_end_key = block_start_key + static_cast(block_rows); - block = prepareWriteBlock(block_start_key, block_end_key); - - // How many data will we have for each segment after replacing data? It should be BlockRange ∩ SegmentRange. - for (auto segment_id : segments_list) - { - auto [seg_min_key, seg_max_key] = getSegmentKeyRange(segment_id); - auto intersect_min = std::max(seg_min_key, block_start_key); - auto intersect_max = std::min(seg_max_key, block_end_key); - if (intersect_min <= intersect_max) - { - // There is an intersection - expected_data_each_segment[segment_id] = static_cast(intersect_max - intersect_min); - } - } - } + block_start_key = std::uniform_int_distribution{min_key - 100, max_key + 100}(random); + block_end_key = block_start_key + block_rows; + block = prepareWriteBlock(block_start_key, block_end_key); } - LOG_DEBUG(logger, "start random replace segment data, segments_id={} block_rows={} all_segments={}", fmt::join(segments_list, ","), block.rows(), segments.size()); - replaceSegmentData({segments_list}, block); + LOG_DEBUG(logger, "start random replace segment data, segment_id={} block=[{}, {}) all_segments={}", segment_id, block_start_key, block_end_key, segments.size()); + replaceSegmentData(segment_id, block, for_update_snapshot); - // Verify rows. - for (auto segment_id : segments_list) + size_t data_in_segments = 0; + if (block_rows > 0) { - EXPECT_EQ(getSegmentRowNum(segment_id), expected_data_each_segment[segment_id]); + auto data_start_key = std::max(min_key, block_start_key); + auto data_end_key = std::min(max_key, block_end_key); + data_in_segments = data_end_key >= data_start_key ? data_end_key - data_start_key : 0; } + + if (for_update_snapshot == nullptr) + EXPECT_EQ(getSegmentRowNum(segment_id), data_in_segments); + else + EXPECT_EQ(getSegmentRowNumWithoutMVCC(segment_id), data_in_segments + newly_written_rows_since_snapshot); + + clearReplaceDataSnapshot(); } Segment::SplitMode getRandomSplitMode()