diff --git a/dbms/src/Common/FailPoint.cpp b/dbms/src/Common/FailPoint.cpp index e321ea19118..831adbd241e 100644 --- a/dbms/src/Common/FailPoint.cpp +++ b/dbms/src/Common/FailPoint.cpp @@ -22,51 +22,55 @@ namespace DB { std::unordered_map> FailPointHelper::fail_point_wait_channels; -#define APPLY_FOR_FAILPOINTS_ONCE(M) \ - M(exception_between_drop_meta_and_data) \ - M(exception_between_alter_data_and_meta) \ - M(exception_drop_table_during_remove_meta) \ - M(exception_between_rename_table_data_and_metadata) \ - M(exception_between_create_database_meta_and_directory) \ - M(exception_before_rename_table_old_meta_removed) \ - M(exception_after_step_1_in_exchange_partition) \ - M(exception_before_step_2_rename_in_exchange_partition) \ - M(exception_after_step_2_in_exchange_partition) \ - M(exception_before_step_3_rename_in_exchange_partition) \ - M(exception_after_step_3_in_exchange_partition) \ - M(region_exception_after_read_from_storage_some_error) \ - M(region_exception_after_read_from_storage_all_error) \ - M(exception_before_dmfile_remove_encryption) \ - M(exception_before_dmfile_remove_from_disk) \ - M(force_enable_region_persister_compatible_mode) \ - M(force_disable_region_persister_compatible_mode) \ - M(force_triggle_background_merge_delta) \ - M(force_triggle_foreground_flush) \ - M(exception_before_mpp_register_non_root_mpp_task) \ - M(exception_before_mpp_register_tunnel_for_non_root_mpp_task) \ - M(exception_during_mpp_register_tunnel_for_non_root_mpp_task) \ - M(exception_before_mpp_non_root_task_run) \ - M(exception_during_mpp_non_root_task_run) \ - M(exception_before_mpp_register_root_mpp_task) \ - M(exception_before_mpp_register_tunnel_for_root_mpp_task) \ - M(exception_before_mpp_root_task_run) \ - M(exception_during_mpp_root_task_run) \ - M(exception_during_mpp_write_err_to_tunnel) \ - M(exception_during_mpp_close_tunnel) \ - M(exception_during_write_to_storage) \ - M(force_set_sst_to_dtfile_block_size) \ - M(force_set_sst_decode_rand) \ - 
M(exception_before_page_file_write_sync) \ - M(force_set_segment_ingest_packs_fail) \ - M(segment_merge_after_ingest_packs) \ - M(force_formal_page_file_not_exists) \ - M(force_legacy_or_checkpoint_page_file_exists) \ - M(exception_in_creating_set_input_stream) \ - M(exception_when_read_from_log) \ - M(exception_mpp_hash_build) \ - M(exception_before_drop_segment) \ - M(exception_after_drop_segment) \ - M(exception_between_schema_change_in_the_same_diff) +#define APPLY_FOR_FAILPOINTS_ONCE(M) \ + M(exception_between_drop_meta_and_data) \ + M(exception_between_alter_data_and_meta) \ + M(exception_drop_table_during_remove_meta) \ + M(exception_between_rename_table_data_and_metadata) \ + M(exception_between_create_database_meta_and_directory) \ + M(exception_before_rename_table_old_meta_removed) \ + M(exception_after_step_1_in_exchange_partition) \ + M(exception_before_step_2_rename_in_exchange_partition) \ + M(exception_after_step_2_in_exchange_partition) \ + M(exception_before_step_3_rename_in_exchange_partition) \ + M(exception_after_step_3_in_exchange_partition) \ + M(region_exception_after_read_from_storage_some_error) \ + M(region_exception_after_read_from_storage_all_error) \ + M(exception_before_dmfile_remove_encryption) \ + M(exception_before_dmfile_remove_from_disk) \ + M(force_enable_region_persister_compatible_mode) \ + M(force_disable_region_persister_compatible_mode) \ + M(force_triggle_background_merge_delta) \ + M(force_triggle_foreground_flush) \ + M(exception_before_mpp_register_non_root_mpp_task) \ + M(exception_before_mpp_register_tunnel_for_non_root_mpp_task) \ + M(exception_during_mpp_register_tunnel_for_non_root_mpp_task) \ + M(exception_before_mpp_non_root_task_run) \ + M(exception_during_mpp_non_root_task_run) \ + M(exception_before_mpp_register_root_mpp_task) \ + M(exception_before_mpp_register_tunnel_for_root_mpp_task) \ + M(exception_before_mpp_root_task_run) \ + M(exception_during_mpp_root_task_run) \ + 
M(exception_during_mpp_write_err_to_tunnel) \ + M(exception_during_mpp_close_tunnel) \ + M(exception_during_write_to_storage) \ + M(force_set_sst_to_dtfile_block_size) \ + M(force_set_sst_decode_rand) \ + M(exception_before_page_file_write_sync) \ + M(force_set_segment_ingest_packs_fail) \ + M(segment_merge_after_ingest_packs) \ + M(force_formal_page_file_not_exists) \ + M(force_legacy_or_checkpoint_page_file_exists) \ + M(exception_in_creating_set_input_stream) \ + M(exception_when_read_from_log) \ + M(exception_mpp_hash_build) \ + M(exception_before_drop_segment) \ + M(exception_after_drop_segment) \ + M(exception_between_schema_change_in_the_same_diff) \ + /* try to use logical split, could fall back to physical split */ \ + M(try_segment_logical_split) \ + /* must perform logical split, otherwise throw exception */ \ + M(force_segment_logical_split) #define APPLY_FOR_FAILPOINTS(M) \ M(skip_check_segment_update) \ diff --git a/dbms/src/Flash/Management/tests/gtest_manual_compact.cpp b/dbms/src/Flash/Management/tests/gtest_manual_compact.cpp index 038619f033b..48fd9fe5789 100644 --- a/dbms/src/Flash/Management/tests/gtest_manual_compact.cpp +++ b/dbms/src/Flash/Management/tests/gtest_manual_compact.cpp @@ -79,7 +79,7 @@ class BasicManualCompactTest storage = StorageDeltaMerge::create("TiFlash", "default" /* db_name */, "test_table" /* table_name */, - std::ref(table_info), + table_info, ColumnsDescription{columns}, astptr, 0, diff --git a/dbms/src/Storages/DeltaMerge/Segment.cpp b/dbms/src/Storages/DeltaMerge/Segment.cpp index 1536508ef68..1b055800011 100644 --- a/dbms/src/Storages/DeltaMerge/Segment.cpp +++ b/dbms/src/Storages/DeltaMerge/Segment.cpp @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. 
+#include #include #include #include @@ -37,6 +38,7 @@ #include #include #include +#include #include #include @@ -92,6 +94,12 @@ extern const int LOGICAL_ERROR; extern const int UNKNOWN_FORMAT_VERSION; } // namespace ErrorCodes +namespace FailPoints +{ +extern const char try_segment_logical_split[]; +extern const char force_segment_logical_split[]; +} // namespace FailPoints + namespace DM { const static size_t SEGMENT_BUFFER_SIZE = 128; // More than enough. @@ -868,9 +876,16 @@ std::optional Segment::prepareSplit(DMContext & dm_context, { SYNC_FOR("before_Segment::prepareSplit"); - if (!dm_context.enable_logical_split // - || segment_snap->stable->getPacks() <= 3 // - || segment_snap->delta->getRows() > segment_snap->stable->getRows()) + bool try_logical_split = dm_context.enable_logical_split // + && segment_snap->stable->getPacks() > 3 // + && segment_snap->delta->getRows() <= segment_snap->stable->getRows(); +#ifdef FIU_ENABLE + bool force_logical_split = false; + fiu_do_on(FailPoints::try_segment_logical_split, { try_logical_split = true; }); + fiu_do_on(FailPoints::force_segment_logical_split, { try_logical_split = true; force_logical_split = true; }); +#endif + + if (!try_logical_split) { return prepareSplitPhysical(dm_context, schema_snap, segment_snap, wbs); } @@ -887,6 +902,9 @@ std::optional Segment::prepareSplit(DMContext & dm_context, "Got bad split point [{}] for segment {}, fall back to split physical.", (split_point_opt.has_value() ? 
split_point_opt->toRowKeyValueRef().toDebugString() : "no value"), info()); +#ifdef FIU_ENABLE + RUNTIME_CHECK(!force_logical_split, Exception, "Can not perform logical split while failpoint `force_segment_logical_split` is true"); +#endif return prepareSplitPhysical(dm_context, schema_snap, segment_snap, wbs); } else @@ -1173,6 +1191,8 @@ SegmentPtr Segment::merge(DMContext & dm_context, const ColumnDefinesPtr & schem wbs.writeLogAndData(); merged_stable->enableDMFilesGC(); + SYNC_FOR("before_Segment::applyMerge"); // pause without holding the lock on segments to be merged + auto left_lock = left->mustGetUpdateLock(); auto right_lock = right->mustGetUpdateLock(); diff --git a/dbms/src/Storages/DeltaMerge/StoragePool.cpp b/dbms/src/Storages/DeltaMerge/StoragePool.cpp index 9db1c10bc48..08c38659636 100644 --- a/dbms/src/Storages/DeltaMerge/StoragePool.cpp +++ b/dbms/src/Storages/DeltaMerge/StoragePool.cpp @@ -18,6 +18,7 @@ #include #include #include +#include #include #include #include @@ -127,6 +128,11 @@ void GlobalStoragePool::restore() false); } +FileUsageStatistics GlobalStoragePool::getLogFileUsage() const +{ + return log_storage->getFileUsageStatistics(); +} + bool GlobalStoragePool::gc() { return gc(global_context.getSettingsRef(), true, DELTA_MERGE_GC_PERIOD); diff --git a/dbms/src/Storages/DeltaMerge/StoragePool.h b/dbms/src/Storages/DeltaMerge/StoragePool.h index 77684ea46cb..f3dde1b9c6e 100644 --- a/dbms/src/Storages/DeltaMerge/StoragePool.h +++ b/dbms/src/Storages/DeltaMerge/StoragePool.h @@ -16,6 +16,7 @@ #include #include +#include #include #include @@ -57,6 +58,8 @@ class GlobalStoragePool : private boost::noncopyable // Only used on dbgFuncMisc bool gc(); + FileUsageStatistics getLogFileUsage() const; + private: bool gc(const Settings & settings, bool immediately = false, const Seconds & try_gc_period = DELTA_MERGE_GC_PERIOD); diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_segment.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_segment.cpp 
index 1c68ba3bb2a..e02bc5eb096 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_segment.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_segment.cpp @@ -12,14 +12,27 @@ // See the License for the specific language governing permissions and // limitations under the License. #include +#include +#include +#include #include #include #include #include +#include +#include +#include namespace DB { + +namespace FailPoints +{ +extern const char try_segment_logical_split[]; +extern const char force_segment_logical_split[]; +} // namespace FailPoints + namespace DM { namespace tests @@ -28,6 +41,12 @@ class SegmentOperationTest : public SegmentTestBasic { protected: static void SetUpTestCase() {} + void SetUp() override + { + log = DB::Logger::get("SegmentOperationTest"); + } + + DB::LoggerPtr log; }; TEST_F(SegmentOperationTest, Issue4956) @@ -81,6 +100,194 @@ try randomSegmentTest(100); } CATCH + +// run in CI weekly +TEST_F(SegmentOperationTest, DISABLED_TestSegmentRandomForCI) +try +{ + srand(time(nullptr)); + SegmentTestOptions options; + options.is_common_handle = true; + reloadWithOptions(options); + randomSegmentTest(10000); +} +CATCH + +TEST_F(SegmentOperationTest, SegmentLogicalSplit) +try +{ + { + SegmentTestOptions options; + options.db_settings.dt_segment_stable_pack_rows = 100; + reloadWithOptions(options); + } + + writeSegment(DELTA_MERGE_FIRST_SEGMENT_ID, 100); + writeSegment(DELTA_MERGE_FIRST_SEGMENT_ID, 100); + writeSegment(DELTA_MERGE_FIRST_SEGMENT_ID, 100); + writeSegment(DELTA_MERGE_FIRST_SEGMENT_ID, 100); + flushSegmentCache(DELTA_MERGE_FIRST_SEGMENT_ID); + mergeSegmentDelta(DELTA_MERGE_FIRST_SEGMENT_ID); + + // non flushed pack before split, should be ref in new splitted segments + writeSegment(DELTA_MERGE_FIRST_SEGMENT_ID, 100); + FailPointHelper::enableFailPoint(FailPoints::force_segment_logical_split); + auto new_seg_id_opt = splitSegment(DELTA_MERGE_FIRST_SEGMENT_ID); + ASSERT_TRUE(new_seg_id_opt.has_value()); + + + for (size_t test_round = 
0; test_round < 20; ++test_round) + { + // try further logical split + auto rand_seg_id = getRandomSegmentId(); + auto seg_nrows = getSegmentRowNum(rand_seg_id); + LOG_FMT_TRACE(&Poco::Logger::root(), "test_round={} seg={} nrows={}", test_round, rand_seg_id, seg_nrows); + writeSegment(rand_seg_id, 150); + flushSegmentCache(rand_seg_id); + + FailPointHelper::enableFailPoint(FailPoints::try_segment_logical_split); + splitSegment(rand_seg_id); + } +} +CATCH + +TEST_F(SegmentOperationTest, Issue5570) +try +{ + { + SegmentTestOptions options; + // a smaller pack rows for logical split + options.db_settings.dt_segment_stable_pack_rows = 100; + reloadWithOptions(options); + } + + writeSegment(DELTA_MERGE_FIRST_SEGMENT_ID, 100); + writeSegment(DELTA_MERGE_FIRST_SEGMENT_ID, 100); + flushSegmentCache(DELTA_MERGE_FIRST_SEGMENT_ID); + mergeSegmentDelta(DELTA_MERGE_FIRST_SEGMENT_ID); + + FailPointHelper::enableFailPoint(FailPoints::force_segment_logical_split); + auto new_seg_id_opt = splitSegment(DELTA_MERGE_FIRST_SEGMENT_ID); + ASSERT_TRUE(new_seg_id_opt.has_value()); + auto new_seg_id = new_seg_id_opt.value(); + + LOG_DEBUG(log, "beginSegmentMerge"); + + // Start a segment merge and suspend it before applyMerge + auto sp_seg_merge_apply = SyncPointCtl::enableInScope("before_Segment::applyMerge"); + auto th_seg_merge = std::async([&]() { + mergeSegment(DELTA_MERGE_FIRST_SEGMENT_ID, new_seg_id, /*check_rows=*/false); + }); + sp_seg_merge_apply.waitAndPause(); + LOG_DEBUG(log, "pausedBeforeApplyMerge"); + + // flushed pack + writeSegment(new_seg_id, 100); + flushSegmentCache(new_seg_id); + + // Finish the segment merge + LOG_DEBUG(log, "continueApplyMerge"); + sp_seg_merge_apply.next(); + th_seg_merge.wait(); + LOG_DEBUG(log, "finishApplyMerge"); + + // logical split + FailPointHelper::enableFailPoint(FailPoints::force_segment_logical_split); + auto new_seg_id2_opt = splitSegment(DELTA_MERGE_FIRST_SEGMENT_ID); + ASSERT_TRUE(new_seg_id2_opt.has_value()); + auto new_seg_id2 = 
new_seg_id2_opt.value(); + + { + // further logical split on the left + FailPointHelper::enableFailPoint(FailPoints::force_segment_logical_split); + auto further_seg_id_opt = splitSegment(DELTA_MERGE_FIRST_SEGMENT_ID); + ASSERT_TRUE(further_seg_id_opt.has_value()); + } + + { + // further logical split on the right(it fall back to physical split cause by current + // implement of getSplitPointFast) + FailPointHelper::enableFailPoint(FailPoints::try_segment_logical_split); + auto further_seg_id_opt = splitSegment(new_seg_id2); + ASSERT_TRUE(further_seg_id_opt.has_value()); + } +} +CATCH + + +TEST_F(SegmentOperationTest, Issue5570Case2) +try +{ + { + SegmentTestOptions options; + // a smaller pack rows for logical split + options.db_settings.dt_segment_stable_pack_rows = 100; + reloadWithOptions(options); + } + + writeSegment(DELTA_MERGE_FIRST_SEGMENT_ID, 100); + writeSegment(DELTA_MERGE_FIRST_SEGMENT_ID, 100); + flushSegmentCache(DELTA_MERGE_FIRST_SEGMENT_ID); + mergeSegmentDelta(DELTA_MERGE_FIRST_SEGMENT_ID); + + FailPointHelper::enableFailPoint(FailPoints::force_segment_logical_split); + auto new_seg_id_opt = splitSegment(DELTA_MERGE_FIRST_SEGMENT_ID); + ASSERT_TRUE(new_seg_id_opt.has_value()); + auto new_seg_id = new_seg_id_opt.value(); + + const auto storage_pool = db_context->getGlobalContext().getGlobalStoragePool(); + for (size_t round = 0; round < 50; ++round) + { + LOG_DEBUG(log, "beginSegmentMerge"); + + // Start a segment merge and suspend it before applyMerge + auto sp_seg_merge_apply = SyncPointCtl::enableInScope("before_Segment::applyMerge"); + auto th_seg_merge = std::async([&]() { + mergeSegment(DELTA_MERGE_FIRST_SEGMENT_ID, new_seg_id, /*check_rows=*/false); + }); + sp_seg_merge_apply.waitAndPause(); + LOG_DEBUG(log, "pausedBeforeApplyMerge"); + + // non-flushed pack + writeSegment(new_seg_id, 100); + // flushSegmentCache(new_seg_id); // do not flush + + // Finish the segment merge + LOG_DEBUG(log, "continueApplyMerge"); + sp_seg_merge_apply.next(); 
+ th_seg_merge.wait(); + LOG_DEBUG(log, "finishApplyMerge"); + + // logical split + FailPointHelper::enableFailPoint(FailPoints::try_segment_logical_split); + auto new_seg_id2_opt = splitSegment(DELTA_MERGE_FIRST_SEGMENT_ID); + ASSERT_TRUE(new_seg_id2_opt.has_value()); + new_seg_id = new_seg_id2_opt.value(); + + if (storage_pool) + { + const auto file_usage = storage_pool->getLogFileUsage(); + LOG_FMT_DEBUG(log, "log valid size: {}", file_usage.total_valid_size); + } + } + for (const auto & [seg_id, seg] : segments) + { + UNUSED(seg); + deleteRangeSegment(seg_id); + flushSegmentCache(seg_id); + mergeSegmentDelta(seg_id); + } + // TODO: make it compatible run under ps v2 + if (storage_pool) + { + storage_pool->gc(); + const auto file_usage = storage_pool->getLogFileUsage(); + LOG_FMT_DEBUG(log, "all removed, file usage: {}", file_usage.total_valid_size); // should be 0 + } +} +CATCH + + } // namespace tests } // namespace DM } // namespace DB diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_segment_test_basic.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_segment_test_basic.cpp index c676f2e08d5..73c99815934 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_segment_test_basic.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_segment_test_basic.cpp @@ -15,6 +15,7 @@ #include #include #include +#include #include #include #include @@ -26,6 +27,13 @@ namespace DB { namespace DM { +extern DMFilePtr writeIntoNewDMFile(DMContext & dm_context, + const ColumnDefinesPtr & schema_snap, + const BlockInputStreamPtr & input_stream, + UInt64 file_id, + const String & parent_path, + DMFileBlockOutputStream::Flags flags); + namespace tests { void SegmentTestBasic::reloadWithOptions(SegmentTestOptions config) @@ -34,7 +42,7 @@ void SegmentTestBasic::reloadWithOptions(SegmentTestOptions config) options = config; table_columns = std::make_shared(); - root_segment = reload(config.is_common_handle); + root_segment = reload(config.is_common_handle, nullptr, 
std::move(config.db_settings)); ASSERT_EQ(root_segment->segmentId(), DELTA_MERGE_FIRST_SEGMENT_ID); segments.clear(); segments[DELTA_MERGE_FIRST_SEGMENT_ID] = root_segment; @@ -105,7 +113,7 @@ void SegmentTestBasic::checkSegmentRow(PageId segment_id, size_t expected_row_nu ASSERT_EQ(num_rows_read, expected_row_num); } -std::optional SegmentTestBasic::splitSegment(PageId segment_id) +std::optional SegmentTestBasic::splitSegment(PageId segment_id, bool check_rows) { auto origin_segment = segments[segment_id]; size_t origin_segment_row_num = getSegmentRowNum(segment_id); @@ -116,13 +124,16 @@ std::optional SegmentTestBasic::splitSegment(PageId segment_id) segments[new_segment->segmentId()] = new_segment; segments[segment_id] = segment; - EXPECT_EQ(origin_segment_row_num, getSegmentRowNum(segment_id) + getSegmentRowNum(new_segment->segmentId())); + if (check_rows) + { + EXPECT_EQ(origin_segment_row_num, getSegmentRowNum(segment_id) + getSegmentRowNum(new_segment->segmentId())); + } return new_segment->segmentId(); } return std::nullopt; } -void SegmentTestBasic::mergeSegment(PageId left_segment_id, PageId right_segment_id) +void SegmentTestBasic::mergeSegment(PageId left_segment_id, PageId right_segment_id, bool check_rows) { auto left_segment = segments[left_segment_id]; auto right_segment = segments[right_segment_id]; @@ -138,16 +149,22 @@ void SegmentTestBasic::mergeSegment(PageId left_segment_id, PageId right_segment { segments.erase(it); } - EXPECT_EQ(getSegmentRowNum(merged_segment->segmentId()), left_segment_row_num + right_segment_row_num); + if (check_rows) + { + EXPECT_EQ(getSegmentRowNum(merged_segment->segmentId()), left_segment_row_num + right_segment_row_num); + } } -void SegmentTestBasic::mergeSegmentDelta(PageId segment_id) +void SegmentTestBasic::mergeSegmentDelta(PageId segment_id, bool check_rows) { auto segment = segments[segment_id]; size_t segment_row_num = getSegmentRowNum(segment_id); SegmentPtr merged_segment = segment->mergeDelta(dmContext(), 
tableColumns()); segments[merged_segment->segmentId()] = merged_segment; - EXPECT_EQ(getSegmentRowNum(merged_segment->segmentId()), segment_row_num); + if (check_rows) + { + EXPECT_EQ(getSegmentRowNum(merged_segment->segmentId()), segment_row_num); + } } void SegmentTestBasic::flushSegmentCache(PageId segment_id) @@ -219,6 +236,77 @@ void SegmentTestBasic::writeSegment(PageId segment_id, UInt64 write_rows) EXPECT_EQ(getSegmentRowNumWithoutMVCC(segment_id), segment_row_num + write_rows); } +void SegmentTestBasic::ingestDTFileIntoSegment(PageId segment_id, UInt64 write_rows) +{ + if (write_rows == 0) + { + return; + } + + auto write_data = [&](SegmentPtr segment, const Block & block) { + WriteBatches ingest_wbs(dm_context->storage_pool, dm_context->getWriteLimiter()); + auto delegator = storage_path_pool->getStableDiskDelegator(); + auto parent_path = delegator.choosePath(); + auto file_id = storage_pool->newDataPageIdForDTFile(delegator, __PRETTY_FUNCTION__); + auto input_stream = std::make_shared(block); + DMFileBlockOutputStream::Flags flags; + auto dm_file = writeIntoNewDMFile( + *dm_context, + table_columns, + input_stream, + file_id, + parent_path, + flags); + ingest_wbs.data.putExternal(file_id, /* tag */ 0); + ingest_wbs.writeLogAndData(); + delegator.addDTFile(file_id, dm_file->getBytesOnDisk(), parent_path); + { + WriteBatches wbs(dm_context->storage_pool, dm_context->getWriteLimiter()); + auto ref_id = storage_pool->newDataPageIdForDTFile(delegator, __PRETTY_FUNCTION__); + wbs.data.putRefPage(ref_id, dm_file->pageId()); + auto ref_file = DMFile::restore(dm_context->db_context.getFileProvider(), file_id, ref_id, parent_path, DMFile::ReadMetaMode::all()); + wbs.writeLogAndData(); + auto column_file = std::make_shared(*dm_context, ref_file, segment->getRowKeyRange()); + ColumnFiles column_files; + column_files.push_back(column_file); + ASSERT_TRUE(segment->ingestColumnFiles(*dm_context, segment->getRowKeyRange(), column_files, /* clear_data_in_range */ 
true)); + } + ingest_wbs.rollbackWrittenLogAndData(); + }; + + auto segment = segments[segment_id]; + size_t segment_row_num = getSegmentRowNumWithoutMVCC(segment_id); + std::pair keys = getSegmentKeyRange(segment); + Int64 start_key = keys.first; + Int64 end_key = keys.second; + UInt64 remain_row_num = 0; + if (static_cast(end_key - start_key) > write_rows) + { + end_key = start_key + write_rows; + } + else + { + remain_row_num = write_rows - static_cast(end_key - start_key); + } + { + // write to segment and not flush + Block block = DMTestEnv::prepareSimpleWriteBlock(start_key, end_key, false, version, DMTestEnv::pk_name, EXTRA_HANDLE_COLUMN_ID, options.is_common_handle ? EXTRA_HANDLE_COLUMN_STRING_TYPE : EXTRA_HANDLE_COLUMN_INT_TYPE, options.is_common_handle); + write_data(segment, block); + LOG_FMT_TRACE(&Poco::Logger::root(), "ingest key range [{}, {})", start_key, end_key); + version++; + } + while (remain_row_num > 0) + { + UInt64 write_num = std::min(remain_row_num, static_cast(end_key - start_key)); + Block block = DMTestEnv::prepareSimpleWriteBlock(start_key, write_num + start_key, false, version, DMTestEnv::pk_name, EXTRA_HANDLE_COLUMN_ID, options.is_common_handle ? 
EXTRA_HANDLE_COLUMN_STRING_TYPE : EXTRA_HANDLE_COLUMN_INT_TYPE, options.is_common_handle); + write_data(segment, block); + remain_row_num -= write_num; + LOG_FMT_TRACE(&Poco::Logger::root(), "ingest key range [{}, {})", start_key, write_num + start_key); + version++; + } + EXPECT_EQ(getSegmentRowNumWithoutMVCC(segment_id), segment_row_num + write_rows); +} + void SegmentTestBasic::writeSegmentWithDeletedPack(PageId segment_id) { UInt64 write_rows = DEFAULT_MERGE_BLOCK_SIZE; @@ -381,7 +469,7 @@ std::pair SegmentTestBasic::getRandomMergeablePair() } } -RowKeyRange SegmentTestBasic::commanHandleKeyRange() +RowKeyRange SegmentTestBasic::commonHandleKeyRange() { String start_key, end_key; { @@ -408,7 +496,7 @@ SegmentPtr SegmentTestBasic::reload(bool is_common_handle, const ColumnDefinesPt ColumnDefinesPtr cols = (!pre_define_columns) ? DMTestEnv::getDefaultColumns(is_common_handle ? DMTestEnv::PkType::CommonHandle : DMTestEnv::PkType::HiddenTiDBRowID) : pre_define_columns; setColumns(cols); - return Segment::newSegment(*dm_context, table_columns, is_common_handle ? commanHandleKeyRange() : RowKeyRange::newAll(is_common_handle, 1), storage_pool->newMetaPageId(), 0); + return Segment::newSegment(*dm_context, table_columns, is_common_handle ? commonHandleKeyRange() : RowKeyRange::newAll(is_common_handle, 1), storage_pool->newMetaPageId(), 0); } void SegmentTestBasic::setColumns(const ColumnDefinesPtr & columns) diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_segment_test_basic.h b/dbms/src/Storages/DeltaMerge/tests/gtest_segment_test_basic.h index ab0c7d6d0be..a9b12ce7e2e 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_segment_test_basic.h +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_segment_test_basic.h @@ -13,6 +13,7 @@ // limitations under the License. 
#pragma once +#include #include #include #include @@ -33,16 +34,21 @@ class SegmentTestBasic : public DB::base::TiFlashStorageTestBasic struct SegmentTestOptions { bool is_common_handle = false; + DB::Settings db_settings; }; public: void reloadWithOptions(SegmentTestOptions config); - std::optional splitSegment(PageId segment_id); - void mergeSegment(PageId left_segment_id, PageId right_segment_id); - void mergeSegmentDelta(PageId segment_id); + // When `check_rows` is true, it will compare the rows num before and after the segment update. + // So if there is some write during the segment update, it will report false failure if `check_rows` is true. + std::optional splitSegment(PageId segment_id, bool check_rows = true); + void mergeSegment(PageId left_segment_id, PageId right_segment_id, bool check_rows = true); + void mergeSegmentDelta(PageId segment_id, bool check_rows = true); + void flushSegmentCache(PageId segment_id); void writeSegment(PageId segment_id, UInt64 write_rows = 100); + void ingestDTFileIntoSegment(PageId segment_id, UInt64 write_rows = 100); void writeSegmentWithDeletedPack(PageId segment_id); void deleteRangeSegment(PageId segment_id); @@ -94,9 +100,9 @@ class SegmentTestBasic : public DB::base::TiFlashStorageTestBasic std::pair getRandomMergeablePair(); - RowKeyRange commanHandleKeyRange(); + RowKeyRange commonHandleKeyRange(); - SegmentPtr reload(bool is_common_handle, const ColumnDefinesPtr & pre_define_columns = {}, DB::Settings && db_settings = DB::Settings()); + SegmentPtr reload(bool is_common_handle, const ColumnDefinesPtr & pre_define_columns, DB::Settings && db_settings); // setColumns should update dm_context at the same time void setColumns(const ColumnDefinesPtr & columns); @@ -120,4 +126,4 @@ class SegmentTestBasic : public DB::base::TiFlashStorageTestBasic }; } // namespace tests } // namespace DM -} // namespace DB \ No newline at end of file +} // namespace DB diff --git a/dbms/src/Storages/Page/V3/PageDirectory.cpp 
b/dbms/src/Storages/Page/V3/PageDirectory.cpp index 0397f10dbc1..2366a7c410a 100644 --- a/dbms/src/Storages/Page/V3/PageDirectory.cpp +++ b/dbms/src/Storages/Page/V3/PageDirectory.cpp @@ -238,7 +238,7 @@ bool VersionedPageEntries::createNewRef(const PageVersion & ver, PageIdV3Interna // apply a ref to same ori id with small ver, just ignore return false; } - // else adding ref to another ori id is not allow, just fallover + // else adding ref to another ori id is not allowed, just fallthrough } else { @@ -247,7 +247,7 @@ bool VersionedPageEntries::createNewRef(const PageVersion & ver, PageIdV3Interna // adding ref to the same ori id should be idempotent, just ignore return false; } - // else adding ref to another ori id is not allow, just fallover + // else adding ref to another ori id is not allowed, just fallthrough } } @@ -294,7 +294,7 @@ std::shared_ptr VersionedPageEntries::fromRestored(const PageE } std::tuple -VersionedPageEntries::resolveToPageId(UInt64 seq, bool check_prev, PageEntryV3 * entry) +VersionedPageEntries::resolveToPageId(UInt64 seq, bool ignore_delete, PageEntryV3 * entry) { auto page_lock = acquireLock(); if (type == EditRecordType::VAR_ENTRY) @@ -303,31 +303,36 @@ VersionedPageEntries::resolveToPageId(UInt64 seq, bool check_prev, PageEntryV3 * { if (auto iter = MapUtils::findLess(entries, PageVersion(seq + 1)); iter != entries.end()) { - // If we applied write batches like this: [ver=1]{put 10}, [ver=2]{ref 11->10, del 10} - // then by ver=2, we should not able to read 10, but able to read 11 (resolving 11 ref to 10).
- // when resolving 11 to 10, we need to set `check_prev` to true - if (iter->second.isDelete() && check_prev && iter->first.sequence == seq) + if (!ignore_delete && iter->second.isDelete()) { - if (iter == entries.begin()) - { - return {RESOLVE_FAIL, buildV3Id(0, 0), PageVersion(0)}; - } - --iter; - // fallover the check the prev item + // the page is not visible + return {RESOLVE_FAIL, buildV3Id(0, 0), PageVersion(0)}; } + // If `ignore_delete` is true, we need the page entry even if it is logical deleted. + // Checkout the details in `PageDirectory::get`. + + // Ignore all "delete" + while (iter != entries.begin() && iter->second.isDelete()) + { + --iter; + } + // Then `iter` point to an entry or the `entries.begin()`, return if entry found if (iter->second.isEntry()) { + // copy and return the entry if (entry != nullptr) *entry = iter->second.entry; return {RESOLVE_TO_NORMAL, buildV3Id(0, 0), PageVersion(0)}; - } // fallover to FAIL - } + } + // else fallthrough to FAIL + } // else fallthrough to FAIL } else if (type == EditRecordType::VAR_EXTERNAL) { - // We may add reference to an external id even if it is logically deleted. - bool ok = check_prev ? true : (!is_deleted || seq < delete_ver.sequence); + // If `ignore_delete` is true, we need the origin page id even if it is logical deleted. + // Checkout the details in `PageDirectory::getNormalPageId`. + bool ok = ignore_delete || (!is_deleted || seq < delete_ver.sequence); if (create_ver.sequence <= seq && ok) { return {RESOLVE_TO_NORMAL, buildV3Id(0, 0), PageVersion(0)}; @@ -335,6 +340,7 @@ VersionedPageEntries::resolveToPageId(UInt64 seq, bool check_prev, PageEntryV3 * } else if (type == EditRecordType::VAR_REF) { + // Return the origin page id if this ref is visible by `seq`. 
if (create_ver.sequence <= seq && (!is_deleted || seq < delete_ver.sequence)) { return {RESOLVE_TO_REF, ori_page_id, create_ver}; @@ -342,7 +348,7 @@ VersionedPageEntries::resolveToPageId(UInt64 seq, bool check_prev, PageEntryV3 * } else { - LOG_FMT_WARNING(&Poco::Logger::get("VersionedPageEntries"), "Can't reslove the EditRecordType {}", type); + LOG_FMT_WARNING(&Poco::Logger::get("VersionedPageEntries"), "Can't resolve the EditRecordType {}", type); } return {RESOLVE_FAIL, buildV3Id(0, 0), PageVersion(0)}; @@ -425,11 +431,23 @@ Int64 VersionedPageEntries::incrRefCount(const PageVersion & ver) if (auto iter = MapUtils::findMutLess(entries, PageVersion(ver.sequence + 1)); iter != entries.end()) { + // ignore all "delete" + bool met_delete = false; + while (iter != entries.begin() && iter->second.isDelete()) + { + met_delete = true; + --iter; + } + // Then `iter` point to an entry or the `entries.begin()`, return if entry found if (iter->second.isEntry()) + { + if (unlikely(met_delete && iter->second.being_ref_count == 1)) + { + throw Exception(fmt::format("Try to add ref to a completely deleted entry [entry={}] [ver={}]", iter->second.toDebugString(), ver), ErrorCodes::LOGICAL_ERROR); + } return ++iter->second.being_ref_count; - else - throw Exception(fmt::format("The entry to be added ref count is not normal entry [entry={}] [ver={}]", iter->second.toDebugString(), ver)); - } + } + } // fallthrough to FAIL } else if (type == EditRecordType::VAR_EXTERNAL) { @@ -439,7 +457,7 @@ Int64 VersionedPageEntries::incrRefCount(const PageVersion & ver) return ++being_ref_count; } } - throw Exception(fmt::format("The entry to be added ref count is not found [ver={}]", ver)); + throw Exception(fmt::format("The entry to be added ref count is not found [ver={}] [state={}]", ver, toDebugString()), ErrorCodes::LOGICAL_ERROR); } PageSize VersionedPageEntries::getEntriesByBlobIds( @@ -478,7 +496,7 @@ PageSize VersionedPageEntries::getEntriesByBlobIds( bool 
VersionedPageEntries::cleanOutdatedEntries( UInt64 lowest_seq, std::map> * normal_entries_to_deref, - PageEntriesV3 & entries_removed, + PageEntriesV3 * entries_removed, const PageLock & /*page_lock*/, bool keep_last_valid_var_entry) { @@ -488,9 +506,10 @@ bool VersionedPageEntries::cleanOutdatedEntries( } else if (type == EditRecordType::VAR_REF) { + // still visible by `lowest_seq` if (!is_deleted || lowest_seq < delete_ver.sequence) return false; - + // Else this ref page is safe to be deleted. if (normal_entries_to_deref != nullptr) { // need to decrease the ref count by second.origin_page_id, ver=iter->first, num=1> @@ -508,10 +527,11 @@ bool VersionedPageEntries::cleanOutdatedEntries( } else if (type != EditRecordType::VAR_ENTRY) { - throw Exception("Invalid state"); + throw Exception(fmt::format("Invalid state {}", toDebugString()), ErrorCodes::LOGICAL_ERROR); } // type == EditRecordType::VAR_ENTRY + assert(type == EditRecordType::VAR_ENTRY); if (entries.empty()) { return true; @@ -525,7 +545,7 @@ bool VersionedPageEntries::cleanOutdatedEntries( return false; } - // If the first version less than is entry / external, + // If the first version less than is entry, // then we can remove those entries prev of it. // If the first version less than is delete, // we may keep the first valid entry before the delete entry in the following case: @@ -539,7 +559,7 @@ bool VersionedPageEntries::cleanOutdatedEntries( { if (iter->second.isDelete()) { - // Already deleted + // a useless version, simply drop it iter = entries.erase(iter); } else if (iter->second.isEntry()) @@ -548,7 +568,10 @@ bool VersionedPageEntries::cleanOutdatedEntries( { if (!keep_last_valid_var_entry && iter->second.being_ref_count == 1) { - entries_removed.emplace_back(iter->second.entry); + if (entries_removed) + { + entries_removed->emplace_back(iter->second.entry); + } iter = entries.erase(iter); } // The `being_ref_count` for this version is valid. 
While for older versions, @@ -558,7 +581,10 @@ bool VersionedPageEntries::cleanOutdatedEntries( else { // else there are newer "entry" in the version list, the outdated entries should be removed - entries_removed.emplace_back(iter->second.entry); + if (entries_removed) + { + entries_removed->emplace_back(iter->second.entry); + } iter = entries.erase(iter); } } @@ -571,7 +597,7 @@ bool VersionedPageEntries::cleanOutdatedEntries( return entries.empty() || (entries.size() == 1 && entries.begin()->second.isDelete()); } -bool VersionedPageEntries::derefAndClean(UInt64 lowest_seq, PageIdV3Internal page_id, const PageVersion & deref_ver, const Int64 deref_count, PageEntriesV3 & entries_removed, bool keep_last_valid_var_entry) +bool VersionedPageEntries::derefAndClean(UInt64 lowest_seq, PageIdV3Internal page_id, const PageVersion & deref_ver, const Int64 deref_count, PageEntriesV3 * entries_removed, bool keep_last_valid_var_entry) { auto page_lock = acquireLock(); if (type == EditRecordType::VAR_EXTERNAL) @@ -592,14 +618,18 @@ bool VersionedPageEntries::derefAndClean(UInt64 lowest_seq, PageIdV3Internal pag { throw Exception(fmt::format("Can not find entry for decreasing ref count [page_id={}] [ver={}] [deref_count={}]", page_id, deref_ver, deref_count)); } - if (iter->second.isDelete()) + // ignore all "delete" + while (iter != entries.begin() && iter->second.isDelete()) { - if (iter == entries.begin()) - { - throw Exception(fmt::format("Can not find entry for decreasing ref count [page_id={}] [ver={}] [deref_count={}]", page_id, deref_ver, deref_count)); - } --iter; // move to the previous entry } + // Then `iter` point to an entry or the `entries.begin()` + if (iter->second.isDelete()) + { + // run into the begin of `entries`, but still can not find a valid entry to decrease the ref-count + throw Exception(fmt::format("Can not find entry for decreasing ref count till the begin [page_id={}] [ver={}] [deref_count={}]", page_id, deref_ver, deref_count)); + } + 
assert(iter->second.isEntry()); if (iter->second.being_ref_count <= deref_count) { throw Exception(fmt::format("Decreasing ref count error [page_id={}] [ver={}] [deref_count={}] [entry={}]", page_id, deref_ver, deref_count, iter->second.toDebugString())); @@ -765,6 +795,34 @@ PageIDAndEntryV3 PageDirectory::get(PageIdV3Internal page_id, const PageDirector { PageEntryV3 entry_got; + // After two write batches applied: [ver=1]{put 10}, [ver=2]{ref 11->10, del 10}, the `mvcc_table_directory` is: + // { + // "10": [ + // { + // "type": "entry", + // "create_ver": 1, + // "being_ref_count": 2, // being ref by id 11 + // "entry": "..some offset to blob file" // mark as "entryX" + // }, + // { + // "type": "delete", + // "delete_ver": 2, + // }, + // ], + // "11": { + // "type": "ref", + // "ori_page_id": 10, + // "create_ver": 2, + // }, + // } + // + // When accessing by a snapshot with seq=2, we should not get the page 10, but can get the page 11. + // In order to achieve this behavior, when calling `get` with page_id=10 and snapshot seq=2, first + // call `resolveToPageId` with `ignore_delete=false` and return invalid. + // When calling `get` with page_id=11 and snapshot seq=2, first call `resolveToPageId` and need further + // resolve ref id 11 to 10 with seq=2, and continue to ignore all "delete"s in the version chain in + // page 10 until we find the "entryX". 
+ PageIdV3Internal id_to_resolve = page_id; PageVersion ver_to_resolve(snap->sequence, 0); bool ok = true; @@ -791,8 +849,8 @@ PageIDAndEntryV3 PageDirectory::get(PageIdV3Internal page_id, const PageDirector } } } - auto [need_collapse, next_id_to_resolve, next_ver_to_resolve] = iter->second->resolveToPageId(ver_to_resolve.sequence, id_to_resolve != page_id, &entry_got); - switch (need_collapse) + auto [resolve_state, next_id_to_resolve, next_ver_to_resolve] = iter->second->resolveToPageId(ver_to_resolve.sequence, /*ignore_delete=*/id_to_resolve != page_id, &entry_got); + switch (resolve_state) { case VersionedPageEntries::RESOLVE_TO_NORMAL: return PageIDAndEntryV3(page_id, entry_got); @@ -851,8 +909,8 @@ std::pair PageDirectory::get(const PageIdV3Internal } } } - auto [need_collapse, next_id_to_resolve, next_ver_to_resolve] = iter->second->resolveToPageId(ver_to_resolve.sequence, id_to_resolve != page_id, &entry_got); - switch (need_collapse) + auto [resolve_state, next_id_to_resolve, next_ver_to_resolve] = iter->second->resolveToPageId(ver_to_resolve.sequence, /*ignore_delete=*/id_to_resolve != page_id, &entry_got); + switch (resolve_state) { case VersionedPageEntries::RESOLVE_TO_NORMAL: return true; @@ -920,8 +978,8 @@ PageIdV3Internal PageDirectory::getNormalPageId(PageIdV3Internal page_id, const } } } - auto [need_collapse, next_id_to_resolve, next_ver_to_resolve] = iter->second->resolveToPageId(ver_to_resolve.sequence, id_to_resolve != page_id, nullptr); - switch (need_collapse) + auto [resolve_state, next_id_to_resolve, next_ver_to_resolve] = iter->second->resolveToPageId(ver_to_resolve.sequence, /*ignore_delete=*/id_to_resolve != page_id, nullptr); + switch (resolve_state) { case VersionedPageEntries::RESOLVE_TO_NORMAL: return id_to_resolve; @@ -982,9 +1040,38 @@ void PageDirectory::applyRefEditRecord( const PageEntriesEdit::EditRecord & rec, const PageVersion & version) { - // applying ref 3->2, existing ref 2->1, normal entry 1, then we should collapse - 
// the ref to be 3->1, increase the refcounting of normal entry 1 - auto [resolve_success, resolved_id, resolved_ver] = [&mvcc_table_directory](PageIdV3Internal id_to_resolve, PageVersion ver_to_resolve) + // Assume the `mvcc_table_directory` is: + // { + // "10": [ + // { + // "type": "entry", + // "create_ver": 1, + // "being_ref_count": 2, // being ref by id 11 + // "entry": "..some offset to blob file" // mark as "entryX" + // }, + // { + // "type": "delete", + // "delete_ver": 3, + // }, + // ], + // "11": { + // "type": "ref", + // "ori_page_id": 10, + // "create_ver": 2, + // }, + // } + // + // When we need to create a new ref 12->11, first call `resolveToPageId` with `ignore_delete=false` + // and further resolve ref id 11 to 10. Then we will call `resolveToPageId` with `ignore_delete=true` + // to ignore the "delete"s. + // Finally, we will collapse the ref chain to create a "ref 12 -> 10" instead of "ref 12 -> 11 -> 10" + // in memory and increase the ref-count of "entryX". + // + // The reason we choose to collapse the ref chain while applying ref edit is that doing GC on a + // non-collapse ref chain is much harder and long ref chain make the time of accessing an entry + // not stable. + + auto [resolve_success, resolved_id, resolved_ver] = [&mvcc_table_directory, ori_page_id = rec.ori_page_id](PageIdV3Internal id_to_resolve, PageVersion ver_to_resolve) -> std::tuple { while (true) { @@ -993,13 +1080,11 @@ void PageDirectory::applyRefEditRecord( return {false, buildV3Id(0, 0), PageVersion(0)}; const VersionedPageEntriesPtr & resolve_version_list = resolve_ver_iter->second; - // If we already hold the lock from `id_to_resolve`, then we should not request it again. 
- // This can happen when `id_to_resolve` have other operating in current writebatch - auto [need_collapse, next_id_to_resolve, next_ver_to_resolve] = resolve_version_list->resolveToPageId( + auto [resolve_state, next_id_to_resolve, next_ver_to_resolve] = resolve_version_list->resolveToPageId( ver_to_resolve.sequence, - /*check_prev=*/true, + /*ignore_delete=*/id_to_resolve != ori_page_id, nullptr); - switch (need_collapse) + switch (resolve_state) { case VersionedPageEntries::RESOLVE_FAIL: return {false, id_to_resolve, ver_to_resolve}; @@ -1026,9 +1111,9 @@ void PageDirectory::applyRefEditRecord( resolved_id, resolved_ver)); } + // use the resolved_id to collapse ref chain 3->2, 2->1 ==> 3->1 bool is_ref_created = version_list->createNewRef(version, resolved_id); - if (is_ref_created) { // Add the ref-count of being-ref entry @@ -1318,7 +1403,7 @@ PageEntriesV3 PageDirectory::gcInMemEntries(bool keep_last_valid_var_entry) const bool all_deleted = iter->second->cleanOutdatedEntries( lowest_seq, &normal_entries_to_deref, - all_del_entries, + &all_del_entries, iter->second->acquireLock(), keep_last_valid_var_entry); @@ -1358,7 +1443,7 @@ PageEntriesV3 PageDirectory::gcInMemEntries(bool keep_last_valid_var_entry) page_id, /*deref_ver=*/deref_counter.first, /*deref_count=*/deref_counter.second, - all_del_entries, + &all_del_entries, keep_last_valid_var_entry); if (all_deleted) diff --git a/dbms/src/Storages/Page/V3/PageDirectory.h b/dbms/src/Storages/Page/V3/PageDirectory.h index 95bd5803ee4..faf0b94ae64 100644 --- a/dbms/src/Storages/Page/V3/PageDirectory.h +++ b/dbms/src/Storages/Page/V3/PageDirectory.h @@ -166,7 +166,7 @@ class VersionedPageEntries RESOLVE_TO_NORMAL, }; std::tuple - resolveToPageId(UInt64 seq, bool check_prev, PageEntryV3 * entry); + resolveToPageId(UInt64 seq, bool ignore_delete, PageEntryV3 * entry); Int64 incrRefCount(const PageVersion & ver); @@ -187,43 +187,21 @@ class VersionedPageEntries std::map & blob_versioned_entries); /** - * GC will 
give a `lowest_seq`. - * We will find the second entry which `LE` than `lowest_seq`. - * And reclaim all entries before that one. - * If we can't found any entry less than `lowest_seq`. - * Then all entries will be remained. - * - * Ex1. - * entry 1 : seq 2 epoch 0 - * entry 2 : seq 2 epoch 1 - * entry 3 : seq 3 epoch 0 - * entry 4 : seq 4 epoch 0 - * - * lowest_seq : 3 - * Then (entry 1, entry 2) will be delete. - * - * Ex2. - * entry 1 : seq 2 epoch 0 - * entry 2 : seq 2 epoch 1 - * entry 3 : seq 4 epoch 0 - * entry 4 : seq 4 epoch 1 - * - * lowest_seq : 3 - * Then (entry 1) will be delete - * - * Ex3. - * entry 1 : seq 2 epoch 0 - * entry 2 : seq 2 epoch 1 - * entry 3 : seq 4 epoch 0 - * entry 4 : seq 4 epoch 1 - * - * lowest_seq : 1 - * Then no entry should be delete. + * Given a `lowest_seq`, this will clean all outdated entries before `lowest_seq`. + * It takes good care of the entries being ref by another page id. + * + * `normal_entries_to_deref`: Return the information that the entries need + * to be decreased the ref count by `derefAndClean`. + * The elem is > + * `entries_removed`: Return the entries removed from the version list + * `keep_last_valid_var_entry`: Keep the last valid entry, useful for dumping snapshot. + * + * Return `true` iff this page can be totally removed from the whole `PageDirectory`. 
*/ bool cleanOutdatedEntries( UInt64 lowest_seq, std::map> * normal_entries_to_deref, - PageEntriesV3 & entries_removed, + PageEntriesV3 * entries_removed, const PageLock & page_lock, bool keep_last_valid_var_entry = false); bool derefAndClean( @@ -231,7 +209,7 @@ class VersionedPageEntries PageIdV3Internal page_id, const PageVersion & deref_ver, Int64 deref_count, - PageEntriesV3 & entries_removed, + PageEntriesV3 * entries_removed, bool keep_last_valid_var_entry = false); void collapseTo(UInt64 seq, PageIdV3Internal page_id, PageEntriesEdit & edit); diff --git a/dbms/src/Storages/Page/V3/tests/gtest_page_directory.cpp b/dbms/src/Storages/Page/V3/tests/gtest_page_directory.cpp index 5f43f975d2c..0ef470cacad 100644 --- a/dbms/src/Storages/Page/V3/tests/gtest_page_directory.cpp +++ b/dbms/src/Storages/Page/V3/tests/gtest_page_directory.cpp @@ -30,10 +30,15 @@ #include #include #include +#include #include #include +#include #include +#include +#include +#include namespace DB { @@ -75,7 +80,7 @@ try auto snap0 = dir->createSnapshot(); EXPECT_ENTRY_NOT_EXIST(dir, 1, snap0); - PageEntryV3 entry1{.file_id = 1, .size = 1024, .tag = 0, .offset = 0x123, .checksum = 0x4567}; + PageEntryV3 entry1{.file_id = 1, .size = 1024, .padded_size = 0, .tag = 0, .offset = 0x123, .checksum = 0x4567}; { PageEntriesEdit edit; edit.put(1, entry1); @@ -85,7 +90,7 @@ try auto snap1 = dir->createSnapshot(); EXPECT_ENTRY_EQ(entry1, dir, 1, snap1); - PageEntryV3 entry2{.file_id = 2, .size = 1024, .tag = 0, .offset = 0x123, .checksum = 0x4567}; + PageEntryV3 entry2{.file_id = 2, .size = 1024, .padded_size = 0, .tag = 0, .offset = 0x123, .checksum = 0x4567}; { PageEntriesEdit edit; edit.put(2, entry2); @@ -102,7 +107,7 @@ try EXPECT_ENTRIES_EQ(expected_entries, dir, ids, snap2); } - PageEntryV3 entry2_v2{.file_id = 2 + 102, .size = 1024, .tag = 0, .offset = 0x123, .checksum = 0x4567}; + PageEntryV3 entry2_v2{.file_id = 2 + 102, .size = 1024, .padded_size = 0, .tag = 0, .offset = 0x123, .checksum = 
0x4567}; { PageEntriesEdit edit; edit.del(2); @@ -123,7 +128,7 @@ try auto snap0 = dir->createSnapshot(); EXPECT_ENTRY_NOT_EXIST(dir, page_id, snap0); - PageEntryV3 entry1{.file_id = 1, .size = 1024, .tag = 0, .offset = 0x123, .checksum = 0x4567}; + PageEntryV3 entry1{.file_id = 1, .size = 1024, .padded_size = 0, .tag = 0, .offset = 0x123, .checksum = 0x4567}; { PageEntriesEdit edit; edit.put(page_id, entry1); @@ -133,7 +138,7 @@ try auto snap1 = dir->createSnapshot(); EXPECT_ENTRY_EQ(entry1, dir, page_id, snap1); - PageEntryV3 entry2{.file_id = 1, .size = 1024, .tag = 0, .offset = 0x1234, .checksum = 0x4567}; + PageEntryV3 entry2{.file_id = 1, .size = 1024, .padded_size = 0, .tag = 0, .offset = 0x1234, .checksum = 0x4567}; { PageEntriesEdit edit; edit.put(page_id, entry2); @@ -151,7 +156,7 @@ try // Put identical page within one `edit` page_id++; - PageEntryV3 entry3{.file_id = 1, .size = 1024, .tag = 0, .offset = 0x12345, .checksum = 0x4567}; + PageEntryV3 entry3{.file_id = 1, .size = 1024, .padded_size = 0, .tag = 0, .offset = 0x12345, .checksum = 0x4567}; { PageEntriesEdit edit; edit.put(page_id, entry1); @@ -172,8 +177,8 @@ CATCH TEST_F(PageDirectoryTest, ApplyPutDelRead) try { - PageEntryV3 entry1{.file_id = 1, .size = 1024, .tag = 0, .offset = 0x123, .checksum = 0x4567}; - PageEntryV3 entry2{.file_id = 2, .size = 1024, .tag = 0, .offset = 0x123, .checksum = 0x4567}; + PageEntryV3 entry1{.file_id = 1, .size = 1024, .padded_size = 0, .tag = 0, .offset = 0x123, .checksum = 0x4567}; + PageEntryV3 entry2{.file_id = 2, .size = 1024, .padded_size = 0, .tag = 0, .offset = 0x123, .checksum = 0x4567}; { PageEntriesEdit edit; edit.put(1, entry1); @@ -185,8 +190,8 @@ try EXPECT_ENTRY_EQ(entry1, dir, 1, snap1); EXPECT_ENTRY_EQ(entry2, dir, 2, snap1); - PageEntryV3 entry3{.file_id = 3, .size = 1024, .tag = 0, .offset = 0x123, .checksum = 0x4567}; - PageEntryV3 entry4{.file_id = 4, .size = 1024, .tag = 0, .offset = 0x123, .checksum = 0x4567}; + PageEntryV3 entry3{.file_id 
= 3, .size = 1024, .padded_size = 0, .tag = 0, .offset = 0x123, .checksum = 0x4567}; + PageEntryV3 entry4{.file_id = 4, .size = 1024, .padded_size = 0, .tag = 0, .offset = 0x123, .checksum = 0x4567}; { PageEntriesEdit edit; edit.del(2); @@ -217,8 +222,8 @@ CATCH TEST_F(PageDirectoryTest, ApplyUpdateOnRefEntries) try { - PageEntryV3 entry1{.file_id = 1, .size = 1024, .tag = 0, .offset = 0x123, .checksum = 0x4567}; - PageEntryV3 entry2{.file_id = 2, .size = 1024, .tag = 0, .offset = 0x123, .checksum = 0x4567}; + PageEntryV3 entry1{.file_id = 1, .size = 1024, .padded_size = 0, .tag = 0, .offset = 0x123, .checksum = 0x4567}; + PageEntryV3 entry2{.file_id = 2, .size = 1024, .padded_size = 0, .tag = 0, .offset = 0x123, .checksum = 0x4567}; { PageEntriesEdit edit; edit.put(1, entry1); @@ -236,14 +241,14 @@ try EXPECT_ENTRY_EQ(entry2, dir, 3, snap1); // Update on ref page is not allowed - PageEntryV3 entry_updated{.file_id = 999, .size = 16, .tag = 0, .offset = 0x123, .checksum = 0x123}; + PageEntryV3 entry_updated{.file_id = 999, .size = 16, .padded_size = 0, .tag = 0, .offset = 0x123, .checksum = 0x123}; { PageEntriesEdit edit; edit.put(3, entry_updated); ASSERT_ANY_THROW(dir->apply(std::move(edit))); } - PageEntryV3 entry_updated2{.file_id = 777, .size = 16, .tag = 0, .offset = 0x123, .checksum = 0x123}; + PageEntryV3 entry_updated2{.file_id = 777, .size = 16, .padded_size = 0, .tag = 0, .offset = 0x123, .checksum = 0x123}; { PageEntriesEdit edit; edit.put(2, entry_updated2); @@ -255,8 +260,8 @@ CATCH TEST_F(PageDirectoryTest, ApplyDeleteOnRefEntries) try { - PageEntryV3 entry1{.file_id = 1, .size = 1024, .tag = 0, .offset = 0x123, .checksum = 0x4567}; - PageEntryV3 entry2{.file_id = 2, .size = 1024, .tag = 0, .offset = 0x123, .checksum = 0x4567}; + PageEntryV3 entry1{.file_id = 1, .size = 1024, .padded_size = 0, .tag = 0, .offset = 0x123, .checksum = 0x4567}; + PageEntryV3 entry2{.file_id = 2, .size = 1024, .padded_size = 0, .tag = 0, .offset = 0x123, .checksum = 
0x4567}; { PageEntriesEdit edit; edit.put(1, entry1); @@ -305,8 +310,8 @@ CATCH TEST_F(PageDirectoryTest, ApplyRefOnRefEntries) try { - PageEntryV3 entry1{.file_id = 1, .size = 1024, .tag = 0, .offset = 0x123, .checksum = 0x4567}; - PageEntryV3 entry2{.file_id = 2, .size = 1024, .tag = 0, .offset = 0x123, .checksum = 0x4567}; + PageEntryV3 entry1{.file_id = 1, .size = 1024, .padded_size = 0, .tag = 0, .offset = 0x123, .checksum = 0x4567}; + PageEntryV3 entry2{.file_id = 2, .size = 1024, .padded_size = 0, .tag = 0, .offset = 0x123, .checksum = 0x4567}; { PageEntriesEdit edit; edit.put(1, entry1); @@ -343,8 +348,8 @@ CATCH TEST_F(PageDirectoryTest, ApplyDuplicatedRefEntries) try { - PageEntryV3 entry1{.file_id = 1, .size = 1024, .tag = 0, .offset = 0x123, .checksum = 0x4567}; - PageEntryV3 entry2{.file_id = 2, .size = 1024, .tag = 0, .offset = 0x123, .checksum = 0x4567}; + PageEntryV3 entry1{.file_id = 1, .size = 1024, .padded_size = 0, .tag = 0, .offset = 0x123, .checksum = 0x4567}; + PageEntryV3 entry2{.file_id = 2, .size = 1024, .padded_size = 0, .tag = 0, .offset = 0x123, .checksum = 0x4567}; { PageEntriesEdit edit; edit.put(1, entry1); @@ -410,8 +415,8 @@ CATCH TEST_F(PageDirectoryTest, ApplyCollapseDuplicatedRefEntries) try { - PageEntryV3 entry1{.file_id = 1, .size = 1024, .tag = 0, .offset = 0x123, .checksum = 0x4567}; - PageEntryV3 entry2{.file_id = 2, .size = 1024, .tag = 0, .offset = 0x123, .checksum = 0x4567}; + PageEntryV3 entry1{.file_id = 1, .size = 1024, .padded_size = 0, .tag = 0, .offset = 0x123, .checksum = 0x4567}; + PageEntryV3 entry2{.file_id = 2, .size = 1024, .padded_size = 0, .tag = 0, .offset = 0x123, .checksum = 0x4567}; { PageEntriesEdit edit; edit.put(1, entry1); @@ -444,30 +449,7 @@ try } CATCH -TEST_F(PageDirectoryTest, ApplyRefToNotExistEntry) -try -{ - PageEntryV3 entry1{.file_id = 1, .size = 1024, .tag = 0, .offset = 0x123, .checksum = 0x4567}; - PageEntryV3 entry2{.file_id = 2, .size = 1024, .tag = 0, .offset = 0x123, .checksum = 
0x4567}; - PageEntryV3 entry3{.file_id = 3, .size = 1024, .tag = 0, .offset = 0x123, .checksum = 0x4567}; - { - PageEntriesEdit edit; - edit.put(1, entry1); - edit.put(2, entry2); - dir->apply(std::move(edit)); - } - - // Applying ref to not exist entry is not allowed - { // Ref 4-> 999 - PageEntriesEdit edit; - edit.put(3, entry3); - edit.ref(4, 999); - ASSERT_ANY_THROW(dir->apply(std::move(edit))); - } -} -CATCH - -TEST_F(PageDirectoryTest, TestRefWontDeadLock) +TEST_F(PageDirectoryTest, RefWontDeadLock) { PageEntriesEdit edit; { @@ -493,8 +475,10 @@ TEST_F(PageDirectoryTest, TestRefWontDeadLock) dir->apply(std::move(edit2)); } -TEST_F(PageDirectoryTest, NewRefAfterDel) +TEST_F(PageDirectoryTest, IdempotentNewExtPageAfterAllCleaned) { + // Make sure creating ext page after itself and all its reference are clean + // is idempotent { PageEntriesEdit edit; edit.putExternal(10); @@ -534,33 +518,343 @@ TEST_F(PageDirectoryTest, NewRefAfterDel) } } -TEST_F(PageDirectoryTest, RefToExt) +TEST_F(PageDirectoryTest, RefToDeletedPage) +try +{ + PageEntryV3 entry1{.file_id = 1, .size = 1024, .padded_size = 0, .tag = 0, .offset = 0x123, .checksum = 0x4567}; + PageEntryV3 entry2{.file_id = 2, .size = 1024, .padded_size = 0, .tag = 0, .offset = 0x123, .checksum = 0x4567}; + PageEntryV3 entry3{.file_id = 3, .size = 1024, .padded_size = 0, .tag = 0, .offset = 0x123, .checksum = 0x4567}; + { + PageEntriesEdit edit; + edit.put(1, entry1); + edit.put(2, entry2); + dir->apply(std::move(edit)); + } + + // Applying ref to not exist entry is not allowed + { // Ref 4-> 999 + PageEntriesEdit edit; + edit.put(3, entry3); + edit.ref(4, 999); + ASSERT_ANY_THROW(dir->apply(std::move(edit))); + } +} +CATCH + +TEST_F(PageDirectoryTest, RefToDeletedPageTwoHops) try { + PageEntryV3 entry1{.file_id = 1, .size = 1024, .padded_size = 0, .tag = 0, .offset = 0x123, .checksum = 0x4567}; { PageEntriesEdit edit; - edit.putExternal(83); + edit.put(1, entry1); dir->apply(std::move(edit)); } { 
PageEntriesEdit edit; - edit.ref(85, 83); + edit.ref(2, 1); dir->apply(std::move(edit)); } { PageEntriesEdit edit; - edit.del(83); + edit.del(1); dir->apply(std::move(edit)); } - // The external id "83" is not changed, - // we may add ref to external "83" even - // if it is logical delete but have other - // alive reference page. { PageEntriesEdit edit; - edit.ref(86, 83); + edit.ref(3, 1); + ASSERT_ANY_THROW({ dir->apply(std::move(edit)); }); + } +} +CATCH + +TEST_F(PageDirectoryTest, RefToDeletedExtPageTwoHops) +try +{ + { + PageEntriesEdit edit; + edit.putExternal(1); + dir->apply(std::move(edit)); + } + { + PageEntriesEdit edit; + edit.ref(2, 1); + dir->apply(std::move(edit)); + } + { + PageEntriesEdit edit; + edit.del(1); + dir->apply(std::move(edit)); + } + { + PageEntriesEdit edit; + edit.ref(3, 1); + ASSERT_ANY_THROW({ dir->apply(std::move(edit)); }); + } +} +CATCH + +TEST_F(PageDirectoryTest, NewRefAfterDelThreeHops) +try +{ + // Fix issue: https://github.com/pingcap/tiflash/issues/5570 + PageEntryV3 entry1{.file_id = 1, .size = 1024, .padded_size = 0, .tag = 0, .offset = 0x123, .checksum = 0x4567}; + { + PageEntriesEdit edit; + edit.put(951, entry1); + dir->apply(std::move(edit)); + } + + { + PageEntriesEdit edit; + edit.ref(954, 951); + dir->apply(std::move(edit)); + } + + { + PageEntriesEdit edit; + edit.del(951); + edit.del(951); dir->apply(std::move(edit)); } + + { + PageEntriesEdit edit; + edit.ref(972, 954); + edit.ref(985, 954); + dir->apply(std::move(edit)); + } + + { + PageEntriesEdit edit; + edit.del(954); + dir->apply(std::move(edit)); + } + + { + PageEntriesEdit edit; + edit.ref(998, 985); + edit.ref(1011, 985); + dir->apply(std::move(edit)); + } + + auto snap = dir->createSnapshot(); + ASSERT_ENTRY_EQ(entry1, dir, 998, snap); +} +CATCH + +TEST_F(PageDirectoryTest, NewRefAfterDelRandom) +try +{ + PageId id = 50; + PageEntryV3 entry1{.file_id = 1, .size = 1024, .padded_size = 0, .tag = 0, .offset = 0x123, .checksum = 0x4567}; + { + 
PageEntriesEdit edit; + edit.put(id, entry1); + dir->apply(std::move(edit)); + } + + std::unordered_set visible_page_ids{ + id, + }; + + std::random_device rd; + std::mt19937 gen(rd()); + std::uniform_int_distribution<> distrib(0, 5); + + constexpr static size_t NUM_TEST = 10000; + for (size_t test_round = 0; test_round < NUM_TEST; ++test_round) + { + SCOPED_TRACE(fmt::format("test idx={}", test_round)); + const bool del_in_same_wb = distrib(gen) % 2 == 0; + const bool gc_or_not = distrib(gen) < 1; + LOG_FMT_DEBUG(log, "round={}, del_in_same_wb={}, gc_or_not={}, visible_ids_num={}", test_round, del_in_same_wb, gc_or_not, visible_page_ids.size()); + + // Generate new ref operations to the visible pages + const size_t num_ref_page = distrib(gen) + 1; + std::unordered_map new_ref_page_ids; + std::uniform_int_distribution<> rand_visible_ids(0, visible_page_ids.size() - 1); + for (size_t j = 0; j < num_ref_page; ++j) + { + // random choose a id from all visible id + auto r = rand_visible_ids(gen); + auto rand_it = std::next(std::begin(visible_page_ids), r); + new_ref_page_ids.emplace(++id, *rand_it); + } + + // Generate new delete operations among the visible pages and new-generated ref page + // Delete 1 page at least, delete until 1 page left at most + std::uniform_int_distribution<> rand_delete_ids(0, visible_page_ids.size() + num_ref_page - 1); + const size_t num_del_page = std::min(std::max(rand_delete_ids(gen), 1), visible_page_ids.size() + num_ref_page - 1); + std::unordered_set delete_ref_page_ids; + for (size_t j = 0; j < num_del_page; ++j) + { + // Random choose a id from all visible id and new-generated ref pages. 
+ auto r = rand_delete_ids(gen); + PageId id_to_del = 0; + if (static_cast(r) < visible_page_ids.size()) + { + auto rand_it = std::next(std::begin(visible_page_ids), r); + id_to_del = *rand_it; + } + else + { + auto rand_it = std::next(std::begin(new_ref_page_ids), r - visible_page_ids.size()); + id_to_del = rand_it->first; + } + delete_ref_page_ids.emplace(id_to_del); + } + + // LOG_DEBUG(log, "round={}, create ids: {}", test_round, new_ref_page_ids); + // LOG_DEBUG(log, "round={}, delete ids: {}", test_round, delete_ref_page_ids); + + if (del_in_same_wb) + { + // create ref and del in the same write batch + PageEntriesEdit edit; + for (const auto & x : new_ref_page_ids) + edit.ref(x.first, x.second); + for (const auto x : delete_ref_page_ids) + edit.del(x); + dir->apply(std::move(edit)); + } + else + { + // first create all ref, then del in another write batch + { + PageEntriesEdit edit; + for (const auto & x : new_ref_page_ids) + edit.ref(x.first, x.second); + dir->apply(std::move(edit)); + } + { + PageEntriesEdit edit; + for (const auto x : delete_ref_page_ids) + edit.del(x); + dir->apply(std::move(edit)); + } + } + + for (const auto & x : new_ref_page_ids) + visible_page_ids.insert(x.first); + for (const auto & x : delete_ref_page_ids) + visible_page_ids.erase(x); + + if (gc_or_not) + dir->gcInMemEntries(/*return_removed_entries=*/false); + auto snap = dir->createSnapshot(); + for (const auto & id : visible_page_ids) + { + ASSERT_ENTRY_EQ(entry1, dir, id, snap); + } + } +} +CATCH + +TEST_F(PageDirectoryTest, NewRefToExtAfterDelRandom) +try +{ + PageId id = 50; + { + PageEntriesEdit edit; + edit.putExternal(id); + dir->apply(std::move(edit)); + } + + std::unordered_set visible_page_ids{ + id, + }; + + std::random_device rd; + std::mt19937 gen(rd()); + std::uniform_int_distribution<> distrib(0, 5); + + constexpr static size_t NUM_TEST = 10000; + for (size_t test_round = 0; test_round < NUM_TEST; ++test_round) + { + SCOPED_TRACE(fmt::format("test idx={}", 
test_round)); + const bool del_in_same_wb = distrib(gen) % 2 == 0; + const bool gc_or_not = distrib(gen) < 1; + LOG_FMT_DEBUG(log, "round={}, del_in_same_wb={}, gc_or_not={}, visible_ids_num={}", test_round, del_in_same_wb, gc_or_not, visible_page_ids.size()); + + const size_t num_ref_page = distrib(gen) + 1; + std::unordered_map new_ref_page_ids; + std::uniform_int_distribution<> rand_visible_ids(0, visible_page_ids.size() - 1); + for (size_t j = 0; j < num_ref_page; ++j) + { + // random choose a id from all visible id + auto r = rand_visible_ids(gen); + auto rand_it = std::next(std::begin(visible_page_ids), r); + new_ref_page_ids.emplace(++id, *rand_it); + } + + // Delete 1 page at least, delete until 1 page left at most + std::uniform_int_distribution<> rand_delete_ids(0, visible_page_ids.size() + num_ref_page - 1); + const size_t num_del_page = std::min(std::max(rand_delete_ids(gen), 1), visible_page_ids.size() + num_ref_page - 1); + std::unordered_set delete_ref_page_ids; + for (size_t j = 0; j < num_del_page; ++j) + { + auto r = rand_delete_ids(gen); + // random choose a id from all visible id + if (static_cast(r) < visible_page_ids.size()) + { + auto rand_it = std::next(std::begin(visible_page_ids), r); + delete_ref_page_ids.emplace(*rand_it); + } + else + { + auto rand_it = std::next(std::begin(new_ref_page_ids), r - visible_page_ids.size()); + delete_ref_page_ids.emplace(rand_it->first); + } + } + + // LOG_DEBUG(log, "round={}, create ids: {}", test_round, new_ref_page_ids); + // LOG_DEBUG(log, "round={}, delete ids: {}", test_round, delete_ref_page_ids); + + if (del_in_same_wb) + { + PageEntriesEdit edit; + for (const auto & x : new_ref_page_ids) + edit.ref(x.first, x.second); + for (const auto x : delete_ref_page_ids) + edit.del(x); + dir->apply(std::move(edit)); + } + else + { + { + PageEntriesEdit edit; + for (const auto & x : new_ref_page_ids) + edit.ref(x.first, x.second); + dir->apply(std::move(edit)); + } + { + PageEntriesEdit edit; + for (const 
auto x : delete_ref_page_ids) + edit.del(x); + dir->apply(std::move(edit)); + } + } + + for (const auto & x : new_ref_page_ids) + visible_page_ids.insert(x.first); + for (const auto & x : delete_ref_page_ids) + visible_page_ids.erase(x); + + if (gc_or_not) + { + dir->gcInMemEntries(/*return_removed_entries=*/false); + const auto all_ids = dir->getAllPageIds(); + for (const auto & id : visible_page_ids) + { + EXPECT_GT(all_ids.count(buildV3Id(TEST_NAMESPACE_ID, id)), 0) << fmt::format("cur_id:{}, all_id:{}, visible_ids:{}", id, all_ids, visible_page_ids); + } + } + auto snap = dir->createSnapshot(); + auto alive_ids = dir->getAliveExternalIds(TEST_NAMESPACE_ID); + EXPECT_EQ(alive_ids.size(), 1); + EXPECT_GT(alive_ids.count(50), 0); + } } CATCH @@ -628,12 +922,12 @@ try } CATCH -#define INSERT_BLOBID_ENTRY(BLOBID, VERSION) \ - PageEntryV3 entry_v##VERSION{.file_id = (BLOBID), .size = (VERSION), .tag = 0, .offset = 0x123, .checksum = 0x4567}; \ +#define INSERT_BLOBID_ENTRY(BLOBID, VERSION) \ + PageEntryV3 entry_v##VERSION{.file_id = (BLOBID), .size = (VERSION), .padded_size = 0, .tag = 0, .offset = 0x123, .checksum = 0x4567}; \ entries.createNewEntry(PageVersion(VERSION), entry_v##VERSION); #define INSERT_ENTRY(VERSION) INSERT_BLOBID_ENTRY(1, VERSION) -#define INSERT_GC_ENTRY(VERSION, EPOCH) \ - PageEntryV3 entry_gc_v##VERSION##_##EPOCH{.file_id = 2, .size = 100 * (VERSION) + (EPOCH), .tag = 0, .offset = 0x234, .checksum = 0x5678}; \ +#define INSERT_GC_ENTRY(VERSION, EPOCH) \ + PageEntryV3 entry_gc_v##VERSION##_##EPOCH{.file_id = 2, .size = 100 * (VERSION) + (EPOCH), .padded_size = 0, .tag = 0, .offset = 0x234, .checksum = 0x5678}; \ entries.createNewEntry(PageVersion((VERSION), (EPOCH)), entry_gc_v##VERSION##_##EPOCH); class VersionedEntriesTest : public ::testing::Test @@ -644,14 +938,14 @@ class VersionedEntriesTest : public ::testing::Test { DerefCounter deref_counter; PageEntriesV3 removed_entries; - bool all_removed = entries.cleanOutdatedEntries(seq, 
&deref_counter, removed_entries, entries.acquireLock()); + bool all_removed = entries.cleanOutdatedEntries(seq, &deref_counter, &removed_entries, entries.acquireLock()); return {all_removed, removed_entries, deref_counter}; } std::tuple runDeref(UInt64 seq, PageVersion ver, Int64 decrease_num) { PageEntriesV3 removed_entries; - bool all_removed = entries.derefAndClean(seq, buildV3Id(TEST_NAMESPACE_ID, page_id), ver, decrease_num, removed_entries); + bool all_removed = entries.derefAndClean(seq, buildV3Id(TEST_NAMESPACE_ID, page_id), ver, decrease_num, &removed_entries); return {all_removed, removed_entries}; } @@ -847,9 +1141,11 @@ try } CATCH -TEST_F(VersionedEntriesTest, GC) +TEST_F(VersionedEntriesTest, CleanOutdateVersions) try { + // Test running gc on a single page, it should clean all + // outdated versions. INSERT_ENTRY(2); INSERT_GC_ENTRY(2, 1); INSERT_ENTRY(5); @@ -1271,12 +1567,12 @@ class PageDirectoryGCTest : public PageDirectoryTest { }; -#define INSERT_ENTRY_TO(PAGE_ID, VERSION, BLOB_FILE_ID) \ - PageEntryV3 entry_v##VERSION{.file_id = (BLOB_FILE_ID), .size = (VERSION), .tag = 0, .offset = 0x123, .checksum = 0x4567}; \ - { \ - PageEntriesEdit edit; \ - edit.put((PAGE_ID), entry_v##VERSION); \ - dir->apply(std::move(edit)); \ +#define INSERT_ENTRY_TO(PAGE_ID, VERSION, BLOB_FILE_ID) \ + PageEntryV3 entry_v##VERSION{.file_id = (BLOB_FILE_ID), .size = (VERSION), .padded_size = 0, .tag = 0, .offset = 0x123, .checksum = 0x4567}; \ + { \ + PageEntriesEdit edit; \ + edit.put((PAGE_ID), entry_v##VERSION); \ + dir->apply(std::move(edit)); \ } // Insert an entry into mvcc directory #define INSERT_ENTRY(PAGE_ID, VERSION) INSERT_ENTRY_TO(PAGE_ID, VERSION, 1) @@ -1566,7 +1862,7 @@ try INSERT_ENTRY_ACQ_SNAP(page_id, 5); INSERT_ENTRY(another_page_id, 6); INSERT_ENTRY(another_page_id, 7); - PageEntryV3 entry_v8{.file_id = 1, .size = 8, .tag = 0, .offset = 0x123, .checksum = 0x4567}; + PageEntryV3 entry_v8{.file_id = 1, .size = 8, .padded_size = 0, .tag = 0, .offset = 
0x123, .checksum = 0x4567}; { PageEntriesEdit edit; edit.del(page_id); @@ -1756,7 +2052,7 @@ TEST_F(PageDirectoryGCTest, GCOnRefedEntries) try { // 10->entry1, 11->10=>11->entry1; del 10->entry1 - PageEntryV3 entry1{.file_id = 1, .size = 1024, .tag = 0, .offset = 0x123, .checksum = 0x4567}; + PageEntryV3 entry1{.file_id = 1, .size = 1024, .padded_size = 0, .tag = 0, .offset = 0x123, .checksum = 0x4567}; { PageEntriesEdit edit; edit.put(10, entry1); @@ -1793,7 +2089,7 @@ TEST_F(PageDirectoryGCTest, GCOnRefedEntries2) try { // 10->entry1, 11->10=>11->entry1; del 10->entry1 - PageEntryV3 entry1{.file_id = 1, .size = 1024, .tag = 0, .offset = 0x123, .checksum = 0x4567}; + PageEntryV3 entry1{.file_id = 1, .size = 1024, .padded_size = 0, .tag = 0, .offset = 0x123, .checksum = 0x4567}; { PageEntriesEdit edit; edit.put(10, entry1); @@ -1836,7 +2132,7 @@ TEST_F(PageDirectoryGCTest, UpsertOnRefedEntries) try { // 10->entry1, 11->10, 12->10 - PageEntryV3 entry1{.file_id = 1, .size = 1024, .tag = 0, .offset = 0x123, .checksum = 0x4567}; + PageEntryV3 entry1{.file_id = 1, .size = 1024, .padded_size = 0, .tag = 0, .offset = 0x123, .checksum = 0x4567}; { PageEntriesEdit edit; edit.put(10, entry1); @@ -1860,7 +2156,7 @@ try } // upsert 10->entry2 - PageEntryV3 entry2{.file_id = 2, .size = 1024, .tag = 0, .offset = 0x123, .checksum = 0x4567}; + PageEntryV3 entry2{.file_id = 2, .size = 1024, .padded_size = 0, .tag = 0, .offset = 0x123, .checksum = 0x4567}; { PageEntriesEdit edit; auto full_gc_entries = dir->getEntriesByBlobIds({1}); @@ -1961,8 +2257,8 @@ try } { PageEntriesEdit edit; // split - edit.ref(357, 352); - edit.ref(359, 352); + edit.ref(357, 353); + edit.ref(359, 353); dir->apply(std::move(edit)); } { @@ -2024,10 +2320,10 @@ try return d; }; - PageEntryV3 entry_1_v1{.file_id = 1, .size = 1, .tag = 0, .offset = 0x123, .checksum = 0x4567}; - PageEntryV3 entry_1_v2{.file_id = 1, .size = 2, .tag = 0, .offset = 0x123, .checksum = 0x4567}; - PageEntryV3 entry_2_v1{.file_id = 2, 
.size = 1, .tag = 0, .offset = 0x123, .checksum = 0x4567}; - PageEntryV3 entry_2_v2{.file_id = 2, .size = 2, .tag = 0, .offset = 0x123, .checksum = 0x4567}; + PageEntryV3 entry_1_v1{.file_id = 1, .size = 1, .padded_size = 0, .tag = 0, .offset = 0x123, .checksum = 0x4567}; + PageEntryV3 entry_1_v2{.file_id = 1, .size = 2, .padded_size = 0, .tag = 0, .offset = 0x123, .checksum = 0x4567}; + PageEntryV3 entry_2_v1{.file_id = 2, .size = 1, .padded_size = 0, .tag = 0, .offset = 0x123, .checksum = 0x4567}; + PageEntryV3 entry_2_v2{.file_id = 2, .size = 2, .padded_size = 0, .tag = 0, .offset = 0x123, .checksum = 0x4567}; { PageEntriesEdit edit; edit.put(1, entry_1_v1); @@ -2055,8 +2351,8 @@ try // 10->ext, 11->10, del 10->ext // 50->entry, 51->50, 52->51=>50, del 50 - PageEntryV3 entry_50{.file_id = 1, .size = 50, .tag = 0, .offset = 0x123, .checksum = 0x4567}; - PageEntryV3 entry_60{.file_id = 1, .size = 90, .tag = 0, .offset = 0x123, .checksum = 0x4567}; + PageEntryV3 entry_50{.file_id = 1, .size = 50, .padded_size = 0, .tag = 0, .offset = 0x123, .checksum = 0x4567}; + PageEntryV3 entry_60{.file_id = 1, .size = 90, .padded_size = 0, .tag = 0, .offset = 0x123, .checksum = 0x4567}; { PageEntriesEdit edit; edit.del(2); @@ -2218,9 +2514,9 @@ try Poco::File(fmt::format("{}/{}{}", path, BlobFile::BLOB_PREFIX_NAME, file_id1)).createFile(); Poco::File(fmt::format("{}/{}{}", path, BlobFile::BLOB_PREFIX_NAME, file_id2)).createFile(); - PageEntryV3 entry_1_v1{.file_id = file_id1, .size = 7890, .tag = 0, .offset = 0x123, .checksum = 0x4567}; - PageEntryV3 entry_5_v1{.file_id = file_id2, .size = 255, .tag = 0, .offset = 0x100, .checksum = 0x4567}; - PageEntryV3 entry_5_v2{.file_id = file_id2, .size = 255, .tag = 0, .offset = 0x400, .checksum = 0x4567}; + PageEntryV3 entry_1_v1{.file_id = file_id1, .size = 7890, .padded_size = 0, .tag = 0, .offset = 0x123, .checksum = 0x4567}; + PageEntryV3 entry_5_v1{.file_id = file_id2, .size = 255, .padded_size = 0, .tag = 0, .offset = 0x100, 
.checksum = 0x4567}; + PageEntryV3 entry_5_v2{.file_id = file_id2, .size = 255, .padded_size = 0, .tag = 0, .offset = 0x400, .checksum = 0x4567}; { PageEntriesEdit edit; edit.put(1, entry_1_v1); @@ -2276,8 +2572,8 @@ CATCH TEST_F(PageDirectoryGCTest, CleanAfterDecreaseRef) try { - PageEntryV3 entry_50_1{.file_id = 1, .size = 7890, .tag = 0, .offset = 0x123, .checksum = 0x4567}; - PageEntryV3 entry_50_2{.file_id = 2, .size = 7890, .tag = 0, .offset = 0x123, .checksum = 0x4567}; + PageEntryV3 entry_50_1{.file_id = 1, .size = 7890, .padded_size = 0, .tag = 0, .offset = 0x123, .checksum = 0x4567}; + PageEntryV3 entry_50_2{.file_id = 2, .size = 7890, .padded_size = 0, .tag = 0, .offset = 0x123, .checksum = 0x4567}; auto restore_from_edit = [](const PageEntriesEdit & edit) { auto ctx = ::DB::tests::TiFlashTestEnv::getContext(); diff --git a/dbms/src/Storages/Page/V3/tests/gtest_page_storage_mix_mode.cpp b/dbms/src/Storages/Page/V3/tests/gtest_page_storage_mix_mode.cpp index 7ae771820a0..f166e40574c 100644 --- a/dbms/src/Storages/Page/V3/tests/gtest_page_storage_mix_mode.cpp +++ b/dbms/src/Storages/Page/V3/tests/gtest_page_storage_mix_mode.cpp @@ -26,7 +26,6 @@ namespace DB { -using namespace DM; using namespace tests; namespace PS::V3::tests { @@ -66,13 +65,13 @@ class PageStorageMixedTest : public DB::base::TiFlashStorageTestBasic storage_path_pool_v2 = std::make_unique(Strings{path}, Strings{path}, "test", "t1", true, cap_metrics, global_context.getFileProvider()); global_context.setPageStorageRunMode(PageStorageRunMode::ONLY_V2); - storage_pool_v2 = std::make_unique(global_context, TEST_NAMESPACE_ID, *storage_path_pool_v2, "test.t1"); + storage_pool_v2 = std::make_unique(global_context, TEST_NAMESPACE_ID, *storage_path_pool_v2, "test.t1"); global_context.setPageStorageRunMode(PageStorageRunMode::MIX_MODE); - storage_pool_mix = std::make_unique(global_context, - TEST_NAMESPACE_ID, - *storage_path_pool_v2, - "test.t1"); + storage_pool_mix = 
std::make_unique(global_context, + TEST_NAMESPACE_ID, + *storage_path_pool_v2, + "test.t1"); reloadV2StoragePool(); } @@ -97,8 +96,8 @@ class PageStorageMixedTest : public DB::base::TiFlashStorageTestBasic protected: std::unique_ptr storage_path_pool_v2; static std::unique_ptr storage_path_pool_v3; - std::unique_ptr storage_pool_v2; - std::unique_ptr storage_pool_mix; + std::unique_ptr storage_pool_v2; + std::unique_ptr storage_pool_mix; PageWriterPtr page_writer_v2; PageWriterPtr page_writer_mix; @@ -620,7 +619,7 @@ try { LOG_FMT_INFO(logger, "remove 100, create 105"); - StorageSnapshot snap(*storage_pool_mix, nullptr, "xxx", true); // must hold and write + DM::StorageSnapshot snap(*storage_pool_mix, nullptr, "xxx", true); // must hold and write // write delete again WriteBatch batch; batch.delPage(100); @@ -630,7 +629,7 @@ try } { LOG_FMT_INFO(logger, "remove 101, create 106"); - StorageSnapshot snap(*storage_pool_mix, nullptr, "xxx", true); // must hold and write + DM::StorageSnapshot snap(*storage_pool_mix, nullptr, "xxx", true); // must hold and write // write delete again WriteBatch batch; batch.delPage(101); @@ -640,7 +639,7 @@ try } { LOG_FMT_INFO(logger, "remove 102, create 107"); - StorageSnapshot snap(*storage_pool_mix, nullptr, "xxx", true); // must hold and write + DM::StorageSnapshot snap(*storage_pool_mix, nullptr, "xxx", true); // must hold and write // write delete again WriteBatch batch; batch.delPage(102); diff --git a/dbms/src/Storages/StorageDeltaMerge.cpp b/dbms/src/Storages/StorageDeltaMerge.cpp index bc186cd555d..9c651470018 100644 --- a/dbms/src/Storages/StorageDeltaMerge.cpp +++ b/dbms/src/Storages/StorageDeltaMerge.cpp @@ -258,7 +258,7 @@ void StorageDeltaMerge::updateTableColumnInfo() { throw PrimaryKeyNotMatchException(*pks.begin(), actual_pri_keys[0]); } - // fallover + // fallthrough } // Unknown bug, throw an exception. 
diff --git a/dbms/src/Storages/tests/TiFlashStorageTestBasic.h b/dbms/src/Storages/tests/TiFlashStorageTestBasic.h index 1cdbb2b6f25..22a6fdbcae6 100644 --- a/dbms/src/Storages/tests/TiFlashStorageTestBasic.h +++ b/dbms/src/Storages/tests/TiFlashStorageTestBasic.h @@ -19,8 +19,6 @@ namespace DB { namespace base { -using namespace DB::tests; - class TiFlashStorageTestBasic : public ::testing::Test { public: @@ -82,7 +80,7 @@ class TiFlashStorageTestBasic : public ::testing::Test * TiFlashTestEnv::findTestDataPath. We may need to check the files on "./tmp/xxx" if some storage test failed. * So instead of dropping data after cases run, we drop data before running each test case. */ - return TiFlashTestEnv::getTemporaryPath(getCurrentFullTestName().c_str()); + return DB::tests::TiFlashTestEnv::getTemporaryPath(getCurrentFullTestName().c_str()); } protected: @@ -110,7 +108,7 @@ class TiFlashStorageTestBasic : public ::testing::Test { Strings test_paths; test_paths.push_back(getTemporaryPath()); - db_context = std::make_unique(TiFlashTestEnv::getContext(db_settings, test_paths)); + db_context = std::make_unique(DB::tests::TiFlashTestEnv::getContext(db_settings, test_paths)); } protected: