diff --git a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFile.cpp b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFile.cpp index 32f68c82343..0daf8cb4659 100644 --- a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFile.cpp +++ b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFile.cpp @@ -97,6 +97,11 @@ ColumnFileBig * ColumnFile::tryToBigFile() return !isBigFile() ? nullptr : static_cast(this); } +ColumnFilePersisted * ColumnFile::tryToColumnFilePersisted() +{ + return !isPersisted() ? nullptr : static_cast(this); +} + template String columnFilesToString(const T & column_files) { diff --git a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFile.h b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFile.h index 21b424dffce..7f150d838c2 100644 --- a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFile.h +++ b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFile.h @@ -36,6 +36,7 @@ class ColumnFileInMemory; class ColumnFileTiny; class ColumnFileDeleteRange; class ColumnFileBig; +class ColumnFilePersisted; class ColumnFileReader; using ColumnFileReaderPtr = std::shared_ptr; @@ -97,20 +98,24 @@ class ColumnFile virtual Type getType() const = 0; - /// Is a ColumnInMemoryFile or not. + /// Is a ColumnFileInMemory or not. bool isInMemoryFile() const { return getType() == Type::INMEMORY_FILE; } - /// Is a ColumnTinyFile or not. + /// Is a ColumnFileTiny or not. bool isTinyFile() const { return getType() == Type::TINY_FILE; } - /// Is a ColumnDeleteRangeFile or not. + /// Is a ColumnFileDeleteRange or not. bool isDeleteRange() const { return getType() == Type::DELETE_RANGE; }; - /// Is a ColumnBigFile or not. + /// Is a ColumnFileBig or not. 
bool isBigFile() const { return getType() == Type::BIG_FILE; }; + /// Is a ColumnFilePersisted or not + bool isPersisted() const { return getType() != Type::INMEMORY_FILE; }; ColumnFileInMemory * tryToInMemoryFile(); ColumnFileTiny * tryToTinyFile(); ColumnFileDeleteRange * tryToDeleteRange(); ColumnFileBig * tryToBigFile(); + ColumnFilePersisted * tryToColumnFilePersisted(); + virtual ColumnFileReaderPtr getReader(const DMContext & context, const StorageSnapshotPtr & storage_snap, const ColumnDefinesPtr & col_defs) const = 0; diff --git a/dbms/src/Storages/DeltaMerge/Delta/DeltaValueSpace.cpp b/dbms/src/Storages/DeltaMerge/Delta/DeltaValueSpace.cpp index 8a69b7573e2..f605909cdd1 100644 --- a/dbms/src/Storages/DeltaMerge/Delta/DeltaValueSpace.cpp +++ b/dbms/src/Storages/DeltaMerge/Delta/DeltaValueSpace.cpp @@ -97,6 +97,8 @@ size_t DeltaValueSpace::getValidCacheRows() const void DeltaValueSpace::recordRemoveColumnFilesPages(WriteBatches & wbs) const { persisted_file_set->recordRemoveColumnFilesPages(wbs); + // there could be some persisted column files in the `mem_table_set` which should be removed. 
+ mem_table_set->recordRemoveColumnFilesPages(wbs); } bool DeltaValueSpace::appendColumnFile(DMContext & /*context*/, const ColumnFilePtr & column_file) diff --git a/dbms/src/Storages/DeltaMerge/Delta/MemTableSet.cpp b/dbms/src/Storages/DeltaMerge/Delta/MemTableSet.cpp index d339b699d8d..e94912b8a9d 100644 --- a/dbms/src/Storages/DeltaMerge/Delta/MemTableSet.cpp +++ b/dbms/src/Storages/DeltaMerge/Delta/MemTableSet.cpp @@ -120,6 +120,17 @@ ColumnFiles MemTableSet::cloneColumnFiles(DMContext & context, const RowKeyRange return cloned_column_files; } +void MemTableSet::recordRemoveColumnFilesPages(WriteBatches & wbs) const +{ + for (const auto & column_file : column_files) + { + if (auto * p = column_file->tryToColumnFilePersisted(); p) + { + p->removeData(wbs); + } + } +} + void MemTableSet::appendColumnFile(const ColumnFilePtr & column_file) { appendColumnFileInner(column_file); diff --git a/dbms/src/Storages/DeltaMerge/Delta/MemTableSet.h b/dbms/src/Storages/DeltaMerge/Delta/MemTableSet.h index 4f0bc4f857e..2a83ed98d57 100644 --- a/dbms/src/Storages/DeltaMerge/Delta/MemTableSet.h +++ b/dbms/src/Storages/DeltaMerge/Delta/MemTableSet.h @@ -84,6 +84,7 @@ class MemTableSet : public std::enable_shared_from_this ColumnFiles cloneColumnFiles(DMContext & context, const RowKeyRange & target_range, WriteBatches & wbs); + void recordRemoveColumnFilesPages(WriteBatches & wbs) const; /// The following methods returning false means this operation failed, caused by other threads could have done /// some updates on this instance. E.g. this instance have been abandoned. 
diff --git a/dbms/src/Storages/DeltaMerge/Segment.cpp b/dbms/src/Storages/DeltaMerge/Segment.cpp index 91d65b7fee2..e9a9121b64c 100644 --- a/dbms/src/Storages/DeltaMerge/Segment.cpp +++ b/dbms/src/Storages/DeltaMerge/Segment.cpp @@ -612,6 +612,8 @@ SegmentPtr Segment::mergeDelta(DMContext & dm_context, const ColumnDefinesPtr & wbs.writeLogAndData(); new_stable->enableDMFilesGC(); + SYNC_FOR("before_Segment::applyMergeDelta"); // pause without holding the lock on the segment + auto lock = mustGetUpdateLock(); auto new_segment = applyMergeDelta(dm_context, segment_snap, wbs, new_stable); @@ -702,6 +704,8 @@ SegmentPair Segment::split(DMContext & dm_context, const ColumnDefinesPtr & sche split_info.my_stable->enableDMFilesGC(); split_info.other_stable->enableDMFilesGC(); + SYNC_FOR("before_Segment::applySplit"); // pause without holding the lock on the segment + auto lock = mustGetUpdateLock(); auto segment_pair = applySplit(dm_context, segment_snap, wbs, split_info); @@ -1199,6 +1203,8 @@ SegmentPtr Segment::merge(DMContext & dm_context, const ColumnDefinesPtr & schem wbs.writeLogAndData(); merged_stable->enableDMFilesGC(); + SYNC_FOR("before_Segment::applyMerge"); // pause without holding the lock on segments to be merged + auto left_lock = left->mustGetUpdateLock(); auto right_lock = right->mustGetUpdateLock(); diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_segment.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_segment.cpp index dc43ef3713b..da3be071239 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_segment.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_segment.cpp @@ -12,11 +12,14 @@ // See the License for the specific language governing permissions and // limitations under the License. 
#include +#include +#include #include #include #include #include +#include namespace DB { @@ -28,6 +31,13 @@ class SegmentOperationTest : public SegmentTestBasic { protected: static void SetUpTestCase() {} + + void SetUp() override + { + log = DB::Logger::get("SegmentOperationTest"); + } + + DB::LoggerPtr log; }; TEST_F(SegmentOperationTest, Issue4956) @@ -83,6 +93,188 @@ try } CATCH +TEST_F(SegmentOperationTest, WriteDuringSegmentMergeDelta) +try +{ + SegmentTestOptions options; + reloadWithOptions(options); + writeSegment(DELTA_MERGE_FIRST_SEGMENT_ID, 100); + flushSegmentCache(DELTA_MERGE_FIRST_SEGMENT_ID); + mergeSegmentDelta(DELTA_MERGE_FIRST_SEGMENT_ID); + + { + LOG_DEBUG(log, "beginSegmentMergeDelta"); + + // Start a segment mergeDelta and suspend it before applyMergeDelta + auto sp_seg_merge_delta_apply = SyncPointCtl::enableInScope("before_Segment::applyMergeDelta"); + auto th_seg_merge_delta = std::async([&]() { + mergeSegmentDelta(DELTA_MERGE_FIRST_SEGMENT_ID, /* check_rows */ false); + }); + sp_seg_merge_delta_apply.waitAndPause(); + + LOG_DEBUG(log, "pausedBeforeApplyMergeDelta"); + + // non-flushed column files + writeSegment(DELTA_MERGE_FIRST_SEGMENT_ID, 100); + ingestDTFileIntoSegment(DELTA_MERGE_FIRST_SEGMENT_ID, 100); + sp_seg_merge_delta_apply.next(); + th_seg_merge_delta.wait(); + + LOG_DEBUG(log, "finishApplyMergeDelta"); + } + + for (const auto & [seg_id, seg] : segments) + { + UNUSED(seg); + deleteRangeSegment(seg_id); + flushSegmentCache(seg_id); + mergeSegmentDelta(seg_id); + } + ASSERT_EQ(segments.size(), 1); + + /// Make sure all column files in the delta value space are deleted + ASSERT_TRUE(storage_pool->log_storage_v3 != nullptr || storage_pool->log_storage_v2 != nullptr); + if (storage_pool->log_storage_v3) + { + storage_pool->log_storage_v3->gc(/* not_skip */ true); + storage_pool->data_storage_v3->gc(/* not_skip */ true); + ASSERT_EQ(storage_pool->log_storage_v3->getNumberOfPages(), 0); + ASSERT_EQ(storage_pool->data_storage_v3->getNumberOfPages(), 
1); + } + if (storage_pool->log_storage_v2) + { + storage_pool->log_storage_v2->gc(/* not_skip */ true); + storage_pool->data_storage_v2->gc(/* not_skip */ true); + ASSERT_EQ(storage_pool->log_storage_v2->getNumberOfPages(), 0); + ASSERT_EQ(storage_pool->data_storage_v2->getNumberOfPages(), 1); + } +} +CATCH + +TEST_F(SegmentOperationTest, WriteDuringSegmentSplit) +try +{ + SegmentTestOptions options; + reloadWithOptions(options); + writeSegment(DELTA_MERGE_FIRST_SEGMENT_ID, 100); + flushSegmentCache(DELTA_MERGE_FIRST_SEGMENT_ID); + mergeSegmentDelta(DELTA_MERGE_FIRST_SEGMENT_ID); + + { + LOG_DEBUG(log, "beginSegmentSplit"); + + // Start a segment split and suspend it before applySplit + auto sp_seg_split_apply = SyncPointCtl::enableInScope("before_Segment::applySplit"); + PageId new_seg_id; + auto th_seg_split = std::async([&]() { + auto new_seg_id_opt = splitSegment(DELTA_MERGE_FIRST_SEGMENT_ID, /* check_rows */ false); + ASSERT_TRUE(new_seg_id_opt.has_value()); + new_seg_id = new_seg_id_opt.value(); + }); + sp_seg_split_apply.waitAndPause(); + + LOG_DEBUG(log, "pausedBeforeApplySplit"); + + // non-flushed column files + writeSegment(DELTA_MERGE_FIRST_SEGMENT_ID, 100); + ingestDTFileIntoSegment(DELTA_MERGE_FIRST_SEGMENT_ID, 100); + sp_seg_split_apply.next(); + th_seg_split.wait(); + + LOG_DEBUG(log, "finishApplySplit"); + mergeSegment(DELTA_MERGE_FIRST_SEGMENT_ID, new_seg_id); + } + + for (const auto & [seg_id, seg] : segments) + { + UNUSED(seg); + deleteRangeSegment(seg_id); + flushSegmentCache(seg_id); + mergeSegmentDelta(seg_id); + } + ASSERT_EQ(segments.size(), 1); + + /// Make sure all column files in the delta value space are deleted + ASSERT_TRUE(storage_pool->log_storage_v3 != nullptr || storage_pool->log_storage_v2 != nullptr); + if (storage_pool->log_storage_v3) + { + storage_pool->log_storage_v3->gc(/* not_skip */ true); + storage_pool->data_storage_v3->gc(/* not_skip */ true); + ASSERT_EQ(storage_pool->log_storage_v3->getNumberOfPages(), 0); + 
ASSERT_EQ(storage_pool->data_storage_v3->getNumberOfPages(), 1); + } + if (storage_pool->log_storage_v2) + { + storage_pool->log_storage_v2->gc(/* not_skip */ true); + storage_pool->data_storage_v2->gc(/* not_skip */ true); + ASSERT_EQ(storage_pool->log_storage_v2->getNumberOfPages(), 0); + ASSERT_EQ(storage_pool->data_storage_v2->getNumberOfPages(), 1); + } +} +CATCH + +TEST_F(SegmentOperationTest, WriteDuringSegmentMerge) +try +{ + SegmentTestOptions options; + reloadWithOptions(options); + writeSegment(DELTA_MERGE_FIRST_SEGMENT_ID, 100); + flushSegmentCache(DELTA_MERGE_FIRST_SEGMENT_ID); + mergeSegmentDelta(DELTA_MERGE_FIRST_SEGMENT_ID); + + auto new_seg_id_opt = splitSegment(DELTA_MERGE_FIRST_SEGMENT_ID); + ASSERT_TRUE(new_seg_id_opt.has_value()); + auto new_seg_id = new_seg_id_opt.value(); + + { + LOG_DEBUG(log, "beginSegmentMerge"); + + // Start a segment merge and suspend it before applyMerge + auto sp_seg_merge_apply = SyncPointCtl::enableInScope("before_Segment::applyMerge"); + auto th_seg_merge = std::async([&]() { + mergeSegment(DELTA_MERGE_FIRST_SEGMENT_ID, new_seg_id, /* check_rows */ false); + }); + sp_seg_merge_apply.waitAndPause(); + + LOG_DEBUG(log, "pausedBeforeApplyMerge"); + + // non-flushed column files + writeSegment(new_seg_id, 100); + ingestDTFileIntoSegment(new_seg_id, 100); + sp_seg_merge_apply.next(); + th_seg_merge.wait(); + + LOG_DEBUG(log, "finishApplyMerge"); + } + + for (const auto & [seg_id, seg] : segments) + { + UNUSED(seg); + deleteRangeSegment(seg_id); + flushSegmentCache(seg_id); + mergeSegmentDelta(seg_id); + } + ASSERT_EQ(segments.size(), 1); + + /// Make sure all column files in the delta value space are deleted + ASSERT_TRUE(storage_pool->log_storage_v3 != nullptr || storage_pool->log_storage_v2 != nullptr); + if (storage_pool->log_storage_v3) + { + storage_pool->log_storage_v3->gc(/* not_skip */ true); + storage_pool->data_storage_v3->gc(/* not_skip */ true); + ASSERT_EQ(storage_pool->log_storage_v3->getNumberOfPages(), 0); + 
ASSERT_EQ(storage_pool->data_storage_v3->getNumberOfPages(), 1); + } + if (storage_pool->log_storage_v2) + { + storage_pool->log_storage_v2->gc(/* not_skip */ true); + storage_pool->data_storage_v2->gc(/* not_skip */ true); + ASSERT_EQ(storage_pool->log_storage_v2->getNumberOfPages(), 0); + ASSERT_EQ(storage_pool->data_storage_v2->getNumberOfPages(), 1); + } +} +CATCH + // run in CI weekly TEST_F(SegmentOperationTest, DISABLED_TestSegmentRandomForCI) try diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_segment_test_basic.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_segment_test_basic.cpp index c676f2e08d5..1965053dc71 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_segment_test_basic.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_segment_test_basic.cpp @@ -15,6 +15,7 @@ #include #include #include +#include #include #include #include @@ -26,6 +27,13 @@ namespace DB { namespace DM { +extern DMFilePtr writeIntoNewDMFile(DMContext & dm_context, + const ColumnDefinesPtr & schema_snap, + const BlockInputStreamPtr & input_stream, + UInt64 file_id, + const String & parent_path, + DMFileBlockOutputStream::Flags flags); + namespace tests { void SegmentTestBasic::reloadWithOptions(SegmentTestOptions config) @@ -105,7 +113,7 @@ void SegmentTestBasic::checkSegmentRow(PageId segment_id, size_t expected_row_nu ASSERT_EQ(num_rows_read, expected_row_num); } -std::optional SegmentTestBasic::splitSegment(PageId segment_id) +std::optional SegmentTestBasic::splitSegment(PageId segment_id, bool check_rows) { auto origin_segment = segments[segment_id]; size_t origin_segment_row_num = getSegmentRowNum(segment_id); @@ -116,13 +124,16 @@ std::optional SegmentTestBasic::splitSegment(PageId segment_id) segments[new_segment->segmentId()] = new_segment; segments[segment_id] = segment; - EXPECT_EQ(origin_segment_row_num, getSegmentRowNum(segment_id) + getSegmentRowNum(new_segment->segmentId())); + if (check_rows) + { + EXPECT_EQ(origin_segment_row_num, 
getSegmentRowNum(segment_id) + getSegmentRowNum(new_segment->segmentId())); + } return new_segment->segmentId(); } return std::nullopt; } -void SegmentTestBasic::mergeSegment(PageId left_segment_id, PageId right_segment_id) +void SegmentTestBasic::mergeSegment(PageId left_segment_id, PageId right_segment_id, bool check_rows) { auto left_segment = segments[left_segment_id]; auto right_segment = segments[right_segment_id]; @@ -138,16 +149,22 @@ void SegmentTestBasic::mergeSegment(PageId left_segment_id, PageId right_segment { segments.erase(it); } - EXPECT_EQ(getSegmentRowNum(merged_segment->segmentId()), left_segment_row_num + right_segment_row_num); + if (check_rows) + { + EXPECT_EQ(getSegmentRowNum(merged_segment->segmentId()), left_segment_row_num + right_segment_row_num); + } } -void SegmentTestBasic::mergeSegmentDelta(PageId segment_id) +void SegmentTestBasic::mergeSegmentDelta(PageId segment_id, bool check_rows) { auto segment = segments[segment_id]; size_t segment_row_num = getSegmentRowNum(segment_id); SegmentPtr merged_segment = segment->mergeDelta(dmContext(), tableColumns()); segments[merged_segment->segmentId()] = merged_segment; - EXPECT_EQ(getSegmentRowNum(merged_segment->segmentId()), segment_row_num); + if (check_rows) + { + EXPECT_EQ(getSegmentRowNum(merged_segment->segmentId()), segment_row_num); + } } void SegmentTestBasic::flushSegmentCache(PageId segment_id) @@ -219,6 +236,77 @@ void SegmentTestBasic::writeSegment(PageId segment_id, UInt64 write_rows) EXPECT_EQ(getSegmentRowNumWithoutMVCC(segment_id), segment_row_num + write_rows); } +void SegmentTestBasic::ingestDTFileIntoSegment(PageId segment_id, UInt64 write_rows) +{ + if (write_rows == 0) + { + return; + } + + auto write_data = [&](SegmentPtr segment, const Block & block) { + WriteBatches ingest_wbs(dm_context->storage_pool, dm_context->getWriteLimiter()); + auto delegator = storage_path_pool->getStableDiskDelegator(); + auto parent_path = delegator.choosePath(); + auto file_id = 
storage_pool->newDataPageIdForDTFile(delegator, __PRETTY_FUNCTION__); + auto input_stream = std::make_shared(block); + DMFileBlockOutputStream::Flags flags; + auto dm_file = writeIntoNewDMFile( + *dm_context, + table_columns, + input_stream, + file_id, + parent_path, + flags); + ingest_wbs.data.putExternal(file_id, /* tag */ 0); + ingest_wbs.writeLogAndData(); + delegator.addDTFile(file_id, dm_file->getBytesOnDisk(), parent_path); + { + WriteBatches wbs(dm_context->storage_pool, dm_context->getWriteLimiter()); + auto ref_id = storage_pool->newDataPageIdForDTFile(delegator, __PRETTY_FUNCTION__); + wbs.data.putRefPage(ref_id, dm_file->pageId()); + auto ref_file = DMFile::restore(dm_context->db_context.getFileProvider(), file_id, ref_id, parent_path, DMFile::ReadMetaMode::all()); + wbs.writeLogAndData(); + auto column_file = std::make_shared(*dm_context, ref_file, segment->getRowKeyRange()); + ColumnFiles column_files; + column_files.push_back(column_file); + ASSERT_TRUE(segment->ingestColumnFiles(*dm_context, segment->getRowKeyRange(), column_files, /* clear_data_in_range */ true)); + } + ingest_wbs.rollbackWrittenLogAndData(); + }; + + auto segment = segments[segment_id]; + size_t segment_row_num = getSegmentRowNumWithoutMVCC(segment_id); + std::pair keys = getSegmentKeyRange(segment); + Int64 start_key = keys.first; + Int64 end_key = keys.second; + UInt64 remain_row_num = 0; + if (static_cast(end_key - start_key) > write_rows) + { + end_key = start_key + write_rows; + } + else + { + remain_row_num = write_rows - static_cast(end_key - start_key); + } + { + // write to segment and not flush + Block block = DMTestEnv::prepareSimpleWriteBlock(start_key, end_key, false, version, DMTestEnv::pk_name, EXTRA_HANDLE_COLUMN_ID, options.is_common_handle ? 
EXTRA_HANDLE_COLUMN_STRING_TYPE : EXTRA_HANDLE_COLUMN_INT_TYPE, options.is_common_handle); + write_data(segment, block); + LOG_FMT_TRACE(&Poco::Logger::root(), "ingest key range [{}, {})", start_key, end_key); + version++; + } + while (remain_row_num > 0) + { + UInt64 write_num = std::min(remain_row_num, static_cast(end_key - start_key)); + Block block = DMTestEnv::prepareSimpleWriteBlock(start_key, write_num + start_key, false, version, DMTestEnv::pk_name, EXTRA_HANDLE_COLUMN_ID, options.is_common_handle ? EXTRA_HANDLE_COLUMN_STRING_TYPE : EXTRA_HANDLE_COLUMN_INT_TYPE, options.is_common_handle); + write_data(segment, block); + remain_row_num -= write_num; + LOG_FMT_TRACE(&Poco::Logger::root(), "ingest key range [{}, {})", start_key, write_num + start_key); + version++; + } + EXPECT_EQ(getSegmentRowNumWithoutMVCC(segment_id), segment_row_num + write_rows); +} + void SegmentTestBasic::writeSegmentWithDeletedPack(PageId segment_id) { UInt64 write_rows = DEFAULT_MERGE_BLOCK_SIZE; diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_segment_test_basic.h b/dbms/src/Storages/DeltaMerge/tests/gtest_segment_test_basic.h index ab0c7d6d0be..03f52d36ed5 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_segment_test_basic.h +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_segment_test_basic.h @@ -38,11 +38,15 @@ class SegmentTestBasic : public DB::base::TiFlashStorageTestBasic public: void reloadWithOptions(SegmentTestOptions config); - std::optional splitSegment(PageId segment_id); - void mergeSegment(PageId left_segment_id, PageId right_segment_id); - void mergeSegmentDelta(PageId segment_id); + // When `check_rows` is true, the row count before and after the segment update is compared. + // If rows are written while the update is in progress, that comparison reports a spurious failure; pass `check_rows = false` in that case. 
+ std::optional splitSegment(PageId segment_id, bool check_rows = true); + void mergeSegment(PageId left_segment_id, PageId right_segment_id, bool check_rows = true); + void mergeSegmentDelta(PageId segment_id, bool check_rows = true); + void flushSegmentCache(PageId segment_id); void writeSegment(PageId segment_id, UInt64 write_rows = 100); + void ingestDTFileIntoSegment(PageId segment_id, UInt64 write_rows = 100); void writeSegmentWithDeletedPack(PageId segment_id); void deleteRangeSegment(PageId segment_id);