diff --git a/dbms/src/Common/FailPoint.cpp b/dbms/src/Common/FailPoint.cpp index ccb2d87efb2..45b9103f69f 100644 --- a/dbms/src/Common/FailPoint.cpp +++ b/dbms/src/Common/FailPoint.cpp @@ -73,9 +73,6 @@ std::unordered_map> FailPointHelper::f #define APPLY_FOR_FAILPOINTS(M) \ M(skip_check_segment_update) \ - M(gc_skip_update_safe_point) \ - M(gc_skip_merge_delta) \ - M(gc_skip_merge) \ M(force_set_page_file_write_errno) \ M(force_split_io_size_4k) \ M(minimum_block_size_for_cross_join) \ diff --git a/dbms/src/Debug/dbgFuncMockRaftSnapshot.cpp b/dbms/src/Debug/dbgFuncMockRaftSnapshot.cpp index 4b940f6f27e..a940e54aa30 100644 --- a/dbms/src/Debug/dbgFuncMockRaftSnapshot.cpp +++ b/dbms/src/Debug/dbgFuncMockRaftSnapshot.cpp @@ -168,7 +168,7 @@ void MockRaftCommand::dbgFuncRegionSnapshotWithData(Context & context, const AST // Mock to apply a snapshot with data in `region` auto & tmt = context.getTMTContext(); - context.getTMTContext().getKVStore()->checkAndApplySnapshot(region, tmt); + context.getTMTContext().getKVStore()->checkAndApplyPreHandledSnapshot(region, tmt); output(fmt::format("put region #{}, range{} to table #{} with {} records", region_id, range_string, table_id, cnt)); } @@ -573,7 +573,7 @@ void MockRaftCommand::dbgFuncRegionSnapshotApplyBlock(Context & context, const A auto region_id = static_cast(safeGet(typeid_cast(*args.front()).value)); auto [region, block_cache] = GLOBAL_REGION_MAP.popRegionCache("__snap_" + std::to_string(region_id)); auto & tmt = context.getTMTContext(); - context.getTMTContext().getKVStore()->checkAndApplySnapshot({region, std::move(block_cache)}, tmt); + context.getTMTContext().getKVStore()->checkAndApplyPreHandledSnapshot({region, std::move(block_cache)}, tmt); output(fmt::format("success apply {} with block cache", region->id())); } @@ -789,7 +789,7 @@ void MockRaftCommand::dbgFuncRegionSnapshotApplyDTFiles(Context & context, const const auto region_name = "__snap_snap_" + std::to_string(region_id); auto [new_region, external_files] = GLOBAL_REGION_MAP.popRegionSnap(region_name); auto & tmt = context.getTMTContext(); - context.getTMTContext().getKVStore()->checkAndApplySnapshot( + context.getTMTContext().getKVStore()->checkAndApplyPreHandledSnapshot( RegionPtrWithSnapshotFiles{new_region, std::move(external_files)}, tmt); diff --git a/dbms/src/Flash/Mpp/tests/gtest_mpptunnel.cpp b/dbms/src/Flash/Mpp/tests/gtest_mpptunnel.cpp index e05e97c0f83..fca90d17c20 100644 --- a/dbms/src/Flash/Mpp/tests/gtest_mpptunnel.cpp +++ b/dbms/src/Flash/Mpp/tests/gtest_mpptunnel.cpp @@ -424,23 +424,26 @@ TEST_F(TestMPPTunnel, WriteError) TEST_F(TestMPPTunnel, WriteAfterFinished) { + std::unique_ptr writer_ptr = nullptr; + MPPTunnelPtr mpp_tunnel_ptr = nullptr; try { - auto mpp_tunnel_ptr = constructRemoteSyncTunnel(); - std::unique_ptr writer_ptr = std::make_unique(); + mpp_tunnel_ptr = constructRemoteSyncTunnel(); + writer_ptr = std::make_unique(); mpp_tunnel_ptr->connect(writer_ptr.get()); GTEST_ASSERT_EQ(getTunnelConnectedFlag(mpp_tunnel_ptr), true); mpp_tunnel_ptr->close("Canceled", false); auto data_packet_ptr = std::make_unique(); data_packet_ptr->set_data("First"); mpp_tunnel_ptr->write(*data_packet_ptr); - mpp_tunnel_ptr->waitForFinish(); GTEST_FAIL(); } catch (Exception & e) { GTEST_ASSERT_EQ(e.message(), "write to tunnel which is already closed,"); } + if (mpp_tunnel_ptr != nullptr) + mpp_tunnel_ptr->waitForFinish(); } /// Test Local MPPTunnel diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h index d934d455f47..e1164d9d101 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h +++ b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h @@ -49,6 +49,7 @@ using DMContextPtr = std::shared_ptr; using NotCompress = std::unordered_set; using SegmentIdSet = std::unordered_set; struct ExternalDTFileInfo; +struct GCOptions; inline static const PageId DELTA_MERGE_FIRST_SEGMENT_ID = 1; @@ -340,7 +341,7 @@ class DeltaMergeStore : private boost::noncopyable void compact(const Context & context, const RowKeyRange & range); /// Iterator over all segments and apply gc jobs. - UInt64 onSyncGc(Int64 limit); + UInt64 onSyncGc(Int64 limit, const GCOptions & gc_options); /** * Try to merge the segment in the current thread as the GC operation. diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeStore_InternalBg.cpp b/dbms/src/Storages/DeltaMerge/DeltaMergeStore_InternalBg.cpp index 4202430d362..5b4f8db1567 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaMergeStore_InternalBg.cpp +++ b/dbms/src/Storages/DeltaMerge/DeltaMergeStore_InternalBg.cpp @@ -15,6 +15,7 @@ #include #include #include +#include #include #include @@ -29,9 +30,6 @@ namespace DB { namespace FailPoints { -extern const char gc_skip_update_safe_point[]; -extern const char gc_skip_merge_delta[]; -extern const char gc_skip_merge[]; extern const char pause_before_dt_background_delta_merge[]; extern const char pause_until_dt_background_delta_merge[]; } // namespace FailPoints @@ -517,10 +515,6 @@ bool shouldCompactStableWithTooMuchDataOutOfSegmentRange(const DMContext & conte SegmentPtr DeltaMergeStore::gcTrySegmentMerge(const DMContextPtr & dm_context, const SegmentPtr & segment) { - fiu_do_on(FailPoints::gc_skip_merge, { - return {}; - }); - auto segment_rows = segment->getEstimatedRows(); auto segment_bytes = segment->getEstimatedBytes(); if (segment_rows >= dm_context->small_segment_rows || segment_bytes >= dm_context->small_segment_bytes) @@ -560,10 +554,6 @@ SegmentPtr DeltaMergeStore::gcTrySegmentMerge(const DMContextPtr & dm_context, c SegmentPtr DeltaMergeStore::gcTrySegmentMergeDelta(const DMContextPtr & dm_context, const SegmentPtr & segment, const SegmentPtr & prev_segment, const SegmentPtr & next_segment, DB::Timestamp gc_safe_point) { - fiu_do_on(FailPoints::gc_skip_merge_delta, { - return {}; - }); - SegmentSnapshotPtr segment_snap; { std::shared_lock lock(read_write_mutex); @@ -684,20 +674,13 @@ SegmentPtr DeltaMergeStore::gcTrySegmentMergeDelta(const DMContextPtr & dm_conte return new_segment; } -UInt64 DeltaMergeStore::onSyncGc(Int64 limit) +UInt64 DeltaMergeStore::onSyncGc(Int64 limit, const GCOptions & gc_options) { if (shutdown_called.load(std::memory_order_relaxed)) return 0; - bool skip_update_safe_point = false; - fiu_do_on(FailPoints::gc_skip_update_safe_point, { - skip_update_safe_point = true; - }); - if (!skip_update_safe_point) - { - if (!updateGCSafePoint()) - return 0; - } + if (gc_options.update_safe_point && !updateGCSafePoint()) + return 0; { std::shared_lock lock(read_write_mutex); @@ -712,9 +695,10 @@ UInt64 DeltaMergeStore::onSyncGc(Int64 limit) DB::Timestamp gc_safe_point = latest_gc_safe_point.load(std::memory_order_acquire); LOG_TRACE(log, - "GC on table {} start with key: {}, gc_safe_point: {}, max gc limit: {}", + "GC on table start, table={} check_key={} options={} gc_safe_point={} max_gc_limit={}", table_name, next_gc_check_key.toDebugString(), + gc_options.toString(), gc_safe_point, limit); @@ -764,9 +748,9 @@ UInt64 DeltaMergeStore::onSyncGc(Int64 limit) try { SegmentPtr new_seg = nullptr; - if (!new_seg) + if (!new_seg && gc_options.do_merge) new_seg = gcTrySegmentMerge(dm_context, segment); - if (!new_seg) + if (!new_seg && gc_options.do_merge_delta) new_seg = gcTrySegmentMergeDelta(dm_context, segment, prev_segment, next_segment, gc_safe_point); if (!new_seg) diff --git a/dbms/src/Storages/DeltaMerge/GCOptions.h b/dbms/src/Storages/DeltaMerge/GCOptions.h new file mode 100644 index 00000000000..058c8f5dc52 --- /dev/null +++ b/dbms/src/Storages/DeltaMerge/GCOptions.h @@ -0,0 +1,66 @@ +// Copyright 2022 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include + +#include + +#pragma once + +namespace DB +{ +namespace DM +{ + +struct GCOptions +{ + bool do_merge = true; + bool do_merge_delta = true; + bool update_safe_point = true; + + static GCOptions newAll() + { + return GCOptions{ + .do_merge = true, + .do_merge_delta = true, + .update_safe_point = true, + }; + } + + static GCOptions newNoneForTest() + { + return GCOptions{ + .do_merge = false, + .do_merge_delta = false, + .update_safe_point = false, + }; + } + + static GCOptions newAllForTest() + { + return GCOptions{ + .do_merge = true, + .do_merge_delta = true, + .update_safe_point = false, + }; + } + + std::string toString() const + { + return fmt::format("", do_merge, do_merge_delta, update_safe_point); + } +}; + +} // namespace DM +} // namespace DB diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store.cpp index c5d48f0da4f..dc744912669 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store.cpp @@ -39,7 +39,6 @@ namespace DB { namespace FailPoints { -extern const char gc_skip_update_safe_point[]; extern const char pause_before_dt_background_delta_merge[]; extern const char pause_until_dt_background_delta_merge[]; extern const char force_triggle_background_merge_delta[]; diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_store_background.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_store_background.cpp index 2164a1c0a43..e1739d2138a 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_store_background.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_store_background.cpp @@ -13,6 +13,8 @@ // limitations under the License. #include +#include +#include #include #include @@ -21,9 +23,6 @@ namespace DB { namespace FailPoints { -extern const char gc_skip_update_safe_point[]; -extern const char gc_skip_merge_delta[]; -extern const char gc_skip_merge[]; extern const char skip_check_segment_update[]; } // namespace FailPoints @@ -39,7 +38,6 @@ class DeltaMergeStoreGCTest public: void SetUp() override { - FailPointHelper::enableFailPoint(FailPoints::gc_skip_update_safe_point); FailPointHelper::enableFailPoint(FailPoints::skip_check_segment_update); SimplePKTestBasic::SetUp(); global_settings_backup = db_context->getGlobalContext().getSettings(); @@ -49,7 +47,6 @@ class DeltaMergeStoreGCTest { SimplePKTestBasic::TearDown(); FailPointHelper::disableFailPoint(FailPoints::skip_check_segment_update); - FailPointHelper::disableFailPoint(FailPoints::gc_skip_update_safe_point); db_context->getGlobalContext().setSettings(global_settings_backup); } @@ -63,15 +60,13 @@ class DeltaMergeStoreGCMergeTest : public DeltaMergeStoreGCTest public: void SetUp() override { - FailPointHelper::enableFailPoint(FailPoints::gc_skip_merge_delta); + gc_options = GCOptions::newNoneForTest(); + gc_options.do_merge = true; DeltaMergeStoreGCTest::SetUp(); } - void TearDown() override - { - DeltaMergeStoreGCTest::TearDown(); - FailPointHelper::disableFailPoint(FailPoints::gc_skip_merge_delta); - } +protected: + GCOptions gc_options{}; }; TEST_F(DeltaMergeStoreGCMergeTest, MergeMultipleSegments) @@ -80,7 +75,7 @@ try ensureSegmentBreakpoints({0, 10, 40, 100}); ASSERT_EQ(std::vector({0, 10, 40, 100}), getSegmentBreakpoints()); - auto gc_n = store->onSyncGc(1); + auto gc_n = store->onSyncGc(1, gc_options); ASSERT_EQ(std::vector{}, getSegmentBreakpoints()); ASSERT_EQ(gc_n, 1); ASSERT_EQ(0, getRowsN()); @@ -98,23 +93,23 @@ try ASSERT_EQ(2000, getRowsN()); db_context->getGlobalContext().getSettingsRef().dt_segment_limit_rows = 10; - gc_n = store->onSyncGc(100); + gc_n = store->onSyncGc(100, gc_options); ASSERT_EQ(std::vector({0, 50, 100, 150, 200}), getSegmentBreakpoints()); ASSERT_EQ(gc_n, 0); // In this case, merge two segments will exceed small_segment_rows, so no merge will happen db_context->getGlobalContext().getSettingsRef().dt_segment_limit_rows = 55 * 3; - gc_n = store->onSyncGc(100); + gc_n = store->onSyncGc(100, gc_options); ASSERT_EQ(std::vector({0, 50, 100, 150, 200}), getSegmentBreakpoints()); ASSERT_EQ(gc_n, 0); // In this case, we will only merge two segments and then stop. db_context->getGlobalContext().getSettingsRef().dt_segment_limit_rows = 105 * 3; - gc_n = store->onSyncGc(100); + gc_n = store->onSyncGc(100, gc_options); ASSERT_EQ(std::vector({0, 100, 200}), getSegmentBreakpoints()); ASSERT_EQ(gc_n, 2); - gc_n = store->onSyncGc(100); + gc_n = store->onSyncGc(100, gc_options); ASSERT_EQ(std::vector({0, 100, 200}), getSegmentBreakpoints()); ASSERT_EQ(gc_n, 0); @@ -135,7 +130,7 @@ try // In this case, we will only merge two segments and then stop. db_context->getGlobalContext().getSettingsRef().dt_segment_limit_rows = 105 * 3; - auto gc_n = store->onSyncGc(1); + auto gc_n = store->onSyncGc(1, gc_options); ASSERT_EQ(std::vector({0, 100, 150, 200}), getSegmentBreakpoints()); ASSERT_EQ(gc_n, 1); @@ -165,7 +160,7 @@ try sp_flush_commit.waitAndPause(); auto th_gc = std::async([&]() { - auto gc_n = store->onSyncGc(1); + auto gc_n = store->onSyncGc(1, gc_options); ASSERT_EQ(gc_n, 1); ASSERT_EQ(store->segments.size(), 1); }); @@ -190,15 +185,13 @@ class DeltaMergeStoreGCMergeDeltaTest : public DeltaMergeStoreGCTest public: void SetUp() override { - FailPointHelper::enableFailPoint(FailPoints::gc_skip_merge); + gc_options = GCOptions::newNoneForTest(); + gc_options.do_merge_delta = true; DeltaMergeStoreGCTest::SetUp(); } - void TearDown() override - { - DeltaMergeStoreGCTest::TearDown(); - FailPointHelper::disableFailPoint(FailPoints::gc_skip_merge); - } +protected: + GCOptions gc_options{}; }; @@ -208,19 +201,19 @@ try db_context->getSettingsRef().dt_segment_stable_pack_rows = 107; // for mergeDelta db_context->getGlobalContext().getSettingsRef().dt_segment_stable_pack_rows = 107; // for GC - auto gc_n = store->onSyncGc(1); + auto gc_n = store->onSyncGc(1, gc_options); ASSERT_EQ(0, gc_n); fill(0, 1000); flush(); mergeDelta(); - gc_n = store->onSyncGc(1); + gc_n = store->onSyncGc(1, gc_options); ASSERT_EQ(0, gc_n); // Segments that are just logical splited out should not trigger merge delta at all. ensureSegmentBreakpoints({500}, /* logical_split */ true); - gc_n = store->onSyncGc(1); + gc_n = store->onSyncGc(1, gc_options); ASSERT_EQ(0, gc_n); ASSERT_EQ(2, store->segments.size()); @@ -229,7 +222,7 @@ try // Segments that are just logical splited out should not trigger merge delta at all. ensureSegmentBreakpoints({150, 500}, /* logical_split */ true); - gc_n = store->onSyncGc(1); + gc_n = store->onSyncGc(1, gc_options); ASSERT_EQ(0, gc_n); ASSERT_EQ(3, store->segments.size()); @@ -241,7 +234,7 @@ try mergeDelta(1000, 1001); ASSERT_EQ(500, getSegmentAt(600)->getStable()->getDMFilesRows()); - gc_n = store->onSyncGc(100); + gc_n = store->onSyncGc(100, gc_options); ASSERT_EQ(1, gc_n); ASSERT_EQ(3, store->segments.size()); @@ -250,7 +243,7 @@ try ASSERT_EQ(500, getSegmentAt(600)->getStable()->getDMFilesRows()); // Trigger GC again, more segments will be merged delta - gc_n = store->onSyncGc(100); + gc_n = store->onSyncGc(100, gc_options); ASSERT_EQ(1, gc_n); ASSERT_EQ(3, store->segments.size()); @@ -259,7 +252,7 @@ try ASSERT_EQ(500, getSegmentAt(600)->getStable()->getDMFilesRows()); // Trigger GC again, no more merge delta. - gc_n = store->onSyncGc(100); + gc_n = store->onSyncGc(100, gc_options); ASSERT_EQ(0, gc_n); ASSERT_EQ(3, store->segments.size()); } @@ -274,7 +267,7 @@ try flush(); mergeDelta(); - auto gc_n = store->onSyncGc(100); + auto gc_n = store->onSyncGc(100, gc_options); ASSERT_EQ(0, gc_n); } CATCH @@ -288,7 +281,7 @@ try flush(); mergeDelta(); - auto gc_n = store->onSyncGc(100); + auto gc_n = store->onSyncGc(100, gc_options); ASSERT_EQ(0, gc_n); } CATCH @@ -302,7 +295,7 @@ try ensureSegmentBreakpoints({100, 200, 300}); - auto gc_n = store->onSyncGc(100); + auto gc_n = store->onSyncGc(100, gc_options); ASSERT_EQ(0, gc_n); } CATCH @@ -324,11 +317,11 @@ try auto pack_n = static_cast(std::ceil(200.0 / static_cast(pack_size))); EXPECT_EQ(pack_n, getSegmentAt(0)->getStable()->getDMFilesPacks()); - auto gc_n = store->onSyncGc(100); + auto gc_n = store->onSyncGc(100, gc_options); EXPECT_EQ(0, gc_n); ensureSegmentBreakpoints({10, 190}, /* logical_split */ true); - gc_n = store->onSyncGc(100); + gc_n = store->onSyncGc(100, gc_options); EXPECT_EQ(0, gc_n); mergeDelta(0, 1); @@ -344,20 +337,20 @@ try { // The segment [10, 190) only overlaps with 1 pack and is contained by the pack. // Even it contains most of the data, it will still be GCed. - gc_n = store->onSyncGc(50); + gc_n = store->onSyncGc(50, gc_options); EXPECT_EQ(1, gc_n); EXPECT_EQ(1, getSegmentAt(150)->getStable()->getDMFilesPacks()); EXPECT_EQ(180, getSegmentAt(150)->getStable()->getDMFilesRows()); // There should be no more GCs. - gc_n = store->onSyncGc(100); + gc_n = store->onSyncGc(100, gc_options); EXPECT_EQ(0, gc_n); } else if (pack_size == 7) { // When pack size is small, we will more precisely know that most of the DTFile is still valid. // So in this case, no GC will happen. - gc_n = store->onSyncGc(50); + gc_n = store->onSyncGc(50, gc_options); EXPECT_EQ(0, gc_n); } else @@ -379,18 +372,18 @@ try flush(); mergeDelta(); - auto gc_n = store->onSyncGc(100); + auto gc_n = store->onSyncGc(100, gc_options); EXPECT_EQ(0, gc_n); ensureSegmentBreakpoints({10}, /* logical_split */ true); - gc_n = store->onSyncGc(100); + gc_n = store->onSyncGc(100, gc_options); EXPECT_EQ(0, gc_n); mergeDelta(0, 1); EXPECT_EQ(10, getSegmentAt(0)->getStable()->getDMFilesRows()); EXPECT_EQ(400, getSegmentAt(150)->getStable()->getDMFilesRows()); - gc_n = store->onSyncGc(100); + gc_n = store->onSyncGc(100, gc_options); EXPECT_EQ(0, gc_n); EXPECT_EQ(10, getSegmentAt(0)->getStable()->getDMFilesRows()); EXPECT_EQ(400, getSegmentAt(150)->getStable()->getDMFilesRows()); @@ -408,24 +401,24 @@ try flush(); mergeDelta(); - auto gc_n = store->onSyncGc(100); + auto gc_n = store->onSyncGc(100, gc_options); EXPECT_EQ(0, gc_n); ensureSegmentBreakpoints({10}, /* logical_split */ true); - gc_n = store->onSyncGc(100); + gc_n = store->onSyncGc(100, gc_options); EXPECT_EQ(0, gc_n); mergeDelta(100, 101); EXPECT_EQ(400, getSegmentAt(0)->getStable()->getDMFilesRows()); EXPECT_EQ(390, getSegmentAt(150)->getStable()->getDMFilesRows()); - gc_n = store->onSyncGc(100); + gc_n = store->onSyncGc(100, gc_options); EXPECT_EQ(1, gc_n); EXPECT_EQ(10, getSegmentAt(0)->getStable()->getDMFilesRows()); EXPECT_EQ(390, getSegmentAt(150)->getStable()->getDMFilesRows()); // GC again does not introduce new changes - gc_n = store->onSyncGc(100); + gc_n = store->onSyncGc(100, gc_options); EXPECT_EQ(0, gc_n); EXPECT_EQ(10, getSegmentAt(0)->getStable()->getDMFilesRows()); EXPECT_EQ(390, getSegmentAt(150)->getStable()->getDMFilesRows()); @@ -437,6 +430,7 @@ CATCH TEST_F(DeltaMergeStoreGCTest, RandomShuffleLogicalSplitAndDeleteRange) try { + auto gc_options = GCOptions::newAllForTest(); // TODO: Better to be fuzz tests, in order to reach edge cases efficiently. std::random_device rd; @@ -490,14 +484,14 @@ try // Finally, let's do GCs. We should expect everything are reclaimed within 10 rounds of GC. for (size_t gc_round = 0; gc_round < 10; gc_round++) - store->onSyncGc(100); + store->onSyncGc(100, gc_options); // Check whether we have reclaimed everything EXPECT_EQ(store->segments.size(), 1); EXPECT_EQ(getSegmentAt(0)->getStable()->getDMFilesPacks(), 0); // No more GCs are needed. - EXPECT_EQ(0, store->onSyncGc(100)); + EXPECT_EQ(0, store->onSyncGc(100, gc_options)); } } } diff --git a/dbms/src/Storages/GCManager.cpp b/dbms/src/Storages/GCManager.cpp index abaf2e9477d..9f3aed75a42 100644 --- a/dbms/src/Storages/GCManager.cpp +++ b/dbms/src/Storages/GCManager.cpp @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +#include #include #include #include @@ -76,7 +77,7 @@ bool GCManager::work() // Block this thread and do GC on the storage // It is OK if any schema changes is apply to the storage while doing GC, so we // do not acquire structure lock on the storage. - auto gc_segments_num = storage->onSyncGc(gc_segments_limit); + auto gc_segments_num = storage->onSyncGc(gc_segments_limit, DM::GCOptions::newAll()); gc_segments_limit = gc_segments_limit - gc_segments_num; LOG_TRACE(log, "GCManager gc {} segments of table {}", gc_segments_num, storage->getTableInfo().id); // Reach the limit on the number of segments to be gc, stop here diff --git a/dbms/src/Storages/IManageableStorage.h b/dbms/src/Storages/IManageableStorage.h index 2ff766a9c6d..1ce312493e0 100644 --- a/dbms/src/Storages/IManageableStorage.h +++ b/dbms/src/Storages/IManageableStorage.h @@ -37,7 +37,8 @@ class Region; namespace DM { struct RowKeyRange; -} +struct GCOptions; +} // namespace DM using BlockUPtr = std::unique_ptr; /** @@ -77,7 +78,7 @@ class IManageableStorage : public IStorage virtual void deleteRows(const Context &, size_t /*rows*/) { throw Exception("Unsupported"); } /// `limit` is the max number of segments to gc, return value is the number of segments gced - virtual UInt64 onSyncGc(Int64 /*limit*/) { throw Exception("Unsupported"); } + virtual UInt64 onSyncGc(Int64 /*limit*/, const DM::GCOptions &) { throw Exception("Unsupported"); } /// Return true is data dir exist virtual bool initStoreIfDataDirExist() { throw Exception("Unsupported"); } diff --git a/dbms/src/Storages/StorageDeltaMerge.cpp b/dbms/src/Storages/StorageDeltaMerge.cpp index 43afe2caee5..2279b47ee44 100644 --- a/dbms/src/Storages/StorageDeltaMerge.cpp +++ b/dbms/src/Storages/StorageDeltaMerge.cpp @@ -815,11 +815,11 @@ void StorageDeltaMerge::ingestFiles( clear_data_in_range); } -UInt64 StorageDeltaMerge::onSyncGc(Int64 limit) +UInt64 StorageDeltaMerge::onSyncGc(Int64 limit, const GCOptions & gc_options) { if (storeInited()) { - return _store->onSyncGc(limit); + return _store->onSyncGc(limit, gc_options); } return 0; } diff --git a/dbms/src/Storages/StorageDeltaMerge.h b/dbms/src/Storages/StorageDeltaMerge.h index 6a29b565a54..fb908e9940b 100644 --- a/dbms/src/Storages/StorageDeltaMerge.h +++ b/dbms/src/Storages/StorageDeltaMerge.h @@ -35,6 +35,7 @@ struct RowKeyValue; class DeltaMergeStore; using DeltaMergeStorePtr = std::shared_ptr; struct ExternalDTFileInfo; +struct GCOptions; } // namespace DM class StorageDeltaMerge @@ -91,7 +92,7 @@ class StorageDeltaMerge bool clear_data_in_range, const Settings & settings); - UInt64 onSyncGc(Int64) override; + UInt64 onSyncGc(Int64, const DM::GCOptions &) override; void rename( const String & new_path_to_db, diff --git a/dbms/src/Storages/Transaction/ApplySnapshot.cpp b/dbms/src/Storages/Transaction/ApplySnapshot.cpp index 9dcd262f79c..15ae985a3d6 100644 --- a/dbms/src/Storages/Transaction/ApplySnapshot.cpp +++ b/dbms/src/Storages/Transaction/ApplySnapshot.cpp @@ -51,7 +51,7 @@ extern const int REGION_DATA_SCHEMA_UPDATED; } // namespace ErrorCodes template -void KVStore::checkAndApplySnapshot(const RegionPtrWrap & new_region, TMTContext & tmt) +void KVStore::checkAndApplyPreHandledSnapshot(const RegionPtrWrap & new_region, TMTContext & tmt) { auto region_id = new_region->id(); auto old_region = getRegion(region_id); @@ -170,6 +170,8 @@ void KVStore::onSnapshot(const RegionPtrWrap & new_region_wrap, RegionPtr old_re { LOG_FMT_INFO(log, "clear region {} old range {} before apply snapshot of new range {}", region_id, old_key_range.toDebugString(), new_key_range.toDebugString()); dm_storage->deleteRange(old_key_range, context.getSettingsRef()); + // We must flush the deletion to the disk here, because we only flush new range when persisting this region later. + dm_storage->flushCache(context, old_key_range, /*try_until_succeed*/ true); } } if constexpr (std::is_same_v) @@ -409,24 +411,24 @@ std::vector KVStore::preHandleSSTsToDTFiles( } template -void KVStore::handlePreApplySnapshot(const RegionPtrWrap & new_region, TMTContext & tmt) +void KVStore::applyPreHandledSnapshot(const RegionPtrWrap & new_region, TMTContext & tmt) { LOG_FMT_INFO(log, "Begin apply snapshot, new_region={}", new_region->toString(true)); Stopwatch watch; SCOPE_EXIT({ GET_METRIC(tiflash_raft_command_duration_seconds, type_apply_snapshot_flush).Observe(watch.elapsedSeconds()); }); - checkAndApplySnapshot(new_region, tmt); + checkAndApplyPreHandledSnapshot(new_region, tmt); FAIL_POINT_PAUSE(FailPoints::pause_until_apply_raft_snapshot); LOG_FMT_INFO(log, "Finish apply snapshot, new_region={}", new_region->toString(false)); } -template void KVStore::handlePreApplySnapshot(const RegionPtrWithSnapshotFiles &, TMTContext &); +template void KVStore::applyPreHandledSnapshot(const RegionPtrWithSnapshotFiles &, TMTContext &); -template void KVStore::checkAndApplySnapshot(const RegionPtrWithBlock &, TMTContext &); -template void KVStore::checkAndApplySnapshot(const RegionPtrWithSnapshotFiles &, TMTContext &); +template void KVStore::checkAndApplyPreHandledSnapshot(const RegionPtrWithBlock &, TMTContext &); +template void KVStore::checkAndApplyPreHandledSnapshot(const RegionPtrWithSnapshotFiles &, TMTContext &); template void KVStore::onSnapshot(const RegionPtrWithBlock &, RegionPtr, UInt64, TMTContext &); template void KVStore::onSnapshot(const RegionPtrWithSnapshotFiles &, RegionPtr, UInt64, TMTContext &); @@ -471,7 +473,8 @@ void KVStore::handleApplySnapshot( TMTContext & tmt) { auto new_region = genRegionPtr(std::move(region), peer_id, index, term); - handlePreApplySnapshot(RegionPtrWithSnapshotFiles{new_region, preHandleSnapshotToFiles(new_region, snaps, index, term, tmt)}, tmt); + auto external_files = preHandleSnapshotToFiles(new_region, snaps, index, term, tmt); + applyPreHandledSnapshot(RegionPtrWithSnapshotFiles{new_region, std::move(external_files)}, tmt); } EngineStoreApplyRes KVStore::handleIngestSST(UInt64 region_id, const SSTViewVec snaps, UInt64 index, UInt64 term, TMTContext & tmt) diff --git a/dbms/src/Storages/Transaction/KVStore.h b/dbms/src/Storages/Transaction/KVStore.h index 0315793e99f..20b6aaf2c1e 100644 --- a/dbms/src/Storages/Transaction/KVStore.h +++ b/dbms/src/Storages/Transaction/KVStore.h @@ -113,6 +113,9 @@ class KVStore final : private boost::noncopyable bool needFlushRegionData(UInt64 region_id, TMTContext & tmt); bool tryFlushRegionData(UInt64 region_id, bool try_until_succeed, TMTContext & tmt, UInt64 index, UInt64 term); + /** + * Only used in tests. In production we will call preHandleSnapshotToFiles + applyPreHandledSnapshot. + */ void handleApplySnapshot(metapb::Region && region, uint64_t peer_id, SSTViewVec, uint64_t index, uint64_t term, TMTContext & tmt); std::vector preHandleSnapshotToFiles( @@ -122,7 +125,7 @@ class KVStore final : private boost::noncopyable uint64_t term, TMTContext & tmt); template - void handlePreApplySnapshot(const RegionPtrWrap &, TMTContext & tmt); + void applyPreHandledSnapshot(const RegionPtrWrap &, TMTContext & tmt); void handleDestroy(UInt64 region_id, TMTContext & tmt); void setRegionCompactLogConfig(UInt64, UInt64, UInt64); @@ -199,7 +202,7 @@ class KVStore final : private boost::noncopyable TMTContext & tmt); template - void checkAndApplySnapshot(const RegionPtrWrap &, TMTContext & tmt); + void checkAndApplyPreHandledSnapshot(const RegionPtrWrap &, TMTContext & tmt); template void onSnapshot(const RegionPtrWrap &, RegionPtr old_region, UInt64 old_region_index, TMTContext & tmt); diff --git a/dbms/src/Storages/Transaction/ProxyFFI.cpp b/dbms/src/Storages/Transaction/ProxyFFI.cpp index c3baeeb7403..94ff4c735c5 100644 --- a/dbms/src/Storages/Transaction/ProxyFFI.cpp +++ b/dbms/src/Storages/Transaction/ProxyFFI.cpp @@ -410,7 +410,7 @@ void ApplyPreHandledSnapshot(EngineStoreServerWrap * server, PreHandledSnapshot auto & kvstore = server->tmt->getKVStore(); if constexpr (std::is_same_v) { - kvstore->handlePreApplySnapshot(RegionPtrWithSnapshotFiles{snap->region, std::move(snap->external_files)}, *server->tmt); + kvstore->applyPreHandledSnapshot(RegionPtrWithSnapshotFiles{snap->region, std::move(snap->external_files)}, *server->tmt); } } catch (...) diff --git a/dbms/src/Storages/Transaction/tests/gtest_kvstore.cpp b/dbms/src/Storages/Transaction/tests/gtest_kvstore.cpp index 2ed63481e43..97f447293a1 100644 --- a/dbms/src/Storages/Transaction/tests/gtest_kvstore.cpp +++ b/dbms/src/Storages/Transaction/tests/gtest_kvstore.cpp @@ -12,10 +12,15 @@ // See the License for the specific language governing permissions and // limitations under the License. +#include #include #include #include +#include +#include +#include #include +#include #include #include #include @@ -31,6 +36,20 @@ namespace DB { + +extern void GenMockSSTData(const TiDB::TableInfo & table_info, + TableID table_id, + const String & store_key, + UInt64 start_handle, + UInt64 end_handle, + UInt64 num_fields = 1, + const std::unordered_set & cfs = {ColumnFamilyType::Write, ColumnFamilyType::Default}); + +namespace FailPoints +{ +extern const char skip_check_segment_update[]; +} + namespace RegionBench { extern void setupPutRequest(raft_cmdpb::Request *, const std::string &, const TiKVKey &, const TiKVValue &); @@ -1134,7 +1153,7 @@ TEST_F(RegionKVStoreTest, KVStore) 9, 5, ctx.getTMTContext()); - kvs.checkAndApplySnapshot(RegionPtrWithSnapshotFiles{region, std::move(ingest_ids)}, ctx.getTMTContext()); + kvs.checkAndApplyPreHandledSnapshot(RegionPtrWithSnapshotFiles{region, std::move(ingest_ids)}, ctx.getTMTContext()); } try { @@ -1145,7 +1164,7 @@ TEST_F(RegionKVStoreTest, KVStore) 9, 5, ctx.getTMTContext()); - kvs.checkAndApplySnapshot(RegionPtrWithSnapshotFiles{region, std::move(ingest_ids)}, ctx.getTMTContext()); // overlap, but not tombstone + kvs.checkAndApplyPreHandledSnapshot(RegionPtrWithSnapshotFiles{region, std::move(ingest_ids)}, ctx.getTMTContext()); // overlap, but not tombstone ASSERT_TRUE(false); } catch (Exception & e) @@ -1169,7 +1188,7 @@ TEST_F(RegionKVStoreTest, KVStore) 10, 5, ctx.getTMTContext()); - kvs.checkAndApplySnapshot(RegionPtrWithSnapshotFiles{region, std::move(ingest_ids)}, ctx.getTMTContext()); + kvs.checkAndApplyPreHandledSnapshot(RegionPtrWithSnapshotFiles{region, std::move(ingest_ids)}, ctx.getTMTContext()); ASSERT_TRUE(false); } catch (Exception & e) @@ -1191,7 +1210,7 @@ TEST_F(RegionKVStoreTest, KVStore) 10, 5, ctx.getTMTContext()); - kvs.checkAndApplySnapshot(RegionPtrWithSnapshotFiles{region, std::move(ingest_ids)}, ctx.getTMTContext()); // overlap, tombstone, remove previous one + kvs.checkAndApplyPreHandledSnapshot(RegionPtrWithSnapshotFiles{region, std::move(ingest_ids)}, ctx.getTMTContext()); // overlap, tombstone, remove previous one auto state = proxy_helper->getRegionLocalState(8192); ASSERT_EQ(state.state(), raft_serverpb::PeerState::Tombstone); @@ -1268,6 +1287,170 @@ TEST_F(RegionKVStoreTest, KVStore) } } + +class ApplySnapshotTest + : public RegionKVStoreTest + , public testing::WithParamInterface +{ +public: + ApplySnapshotTest() + { + ingest_using_split = GetParam(); + } + +protected: + bool ingest_using_split{}; +}; + +INSTANTIATE_TEST_CASE_P( + ByIngestUsingSplit, + ApplySnapshotTest, + testing::Bool()); + +TEST_P(ApplySnapshotTest, WithNewRegionRange) +try +{ + using DM::tests::DMTestEnv; + + create_default_regions(); + auto ctx = TiFlashTestEnv::getGlobalContext(); + auto & kvs = getKVS(); + auto table_id = 101; + auto region_id = 19; + auto region_id_str = std::to_string(region_id); + + auto settings_backup = ctx.getGlobalContext().getSettings(); + ctx.getGlobalContext().getSettingsRef().dt_segment_limit_rows = 50; + if (ingest_using_split) + { + ctx.getGlobalContext().getSettingsRef().dt_segment_delta_small_column_file_size = 50 * 8; + } + FailPointHelper::enableFailPoint(FailPoints::skip_check_segment_update); + SCOPE_EXIT({ + FailPointHelper::disableFailPoint(FailPoints::skip_check_segment_update); + ctx.getGlobalContext().setSettings(settings_backup); + }); + + StorageDeltaMergePtr storage; + { + auto columns = DMTestEnv::getDefaultTableColumns(); + auto table_info = DMTestEnv::getMinimalTableInfo(table_id); + auto astptr = DMTestEnv::getPrimaryKeyExpr("test_table"); + storage = StorageDeltaMerge::create("TiFlash", + "default" /* db_name */, + "test_table" /* table_name */, + table_info, + ColumnsDescription{columns}, + astptr, + 0, + ctx); + storage->startup(); + } + SCOPE_EXIT({ + storage->drop(); + ctx.getTMTContext().getStorages().remove(table_id); + }); + // Initially region_19 range is [0, 10000) + { + auto region = makeRegion(region_id, RecordKVFormat::genKey(table_id, 0), RecordKVFormat::genKey(table_id, 10000)); + GenMockSSTData(DMTestEnv::getMinimalTableInfo(table_id), table_id, region_id_str, 20, 100, 0); + std::vector sst_views{ + SSTView{ + ColumnFamilyType::Write, + BaseBuffView{region_id_str.data(), region_id_str.length()}, + }, + SSTView{ + ColumnFamilyType::Default, + BaseBuffView{region_id_str.data(), region_id_str.length()}, + }, + }; + { + RegionMockTest mock_test(kvstore.get(), region); + + kvs.handleApplySnapshot( + region->getMetaRegion(), + 2, + SSTViewVec{sst_views.data(), sst_views.size()}, + 8, + 5, + ctx.getTMTContext()); + ASSERT_EQ(kvs.getRegion(region_id)->checkIndex(8), true); + } + } + { + if (ingest_using_split) + { + auto stats = storage->getStore()->getStoreStats(); + ASSERT_EQ(3, stats.segment_count); + } + + storage->mergeDelta(ctx); + } + // Later, its range is changed to [20000, 50000) + { + auto region = makeRegion(region_id, RecordKVFormat::genKey(table_id, 20000), RecordKVFormat::genKey(table_id, 50000)); + GenMockSSTData(DMTestEnv::getMinimalTableInfo(table_id), table_id, region_id_str, 20100, 20200, 0); + std::vector sst_views{ + SSTView{ + ColumnFamilyType::Write, + BaseBuffView{region_id_str.data(), region_id_str.length()}, + }, + SSTView{ + ColumnFamilyType::Default, + BaseBuffView{region_id_str.data(), region_id_str.length()}, + }, + }; + { + RegionMockTest mock_test(kvstore.get(), region); + + kvs.handleApplySnapshot( + region->getMetaRegion(), + 2, + SSTViewVec{sst_views.data(), sst_views.size()}, + 9, + 5, + ctx.getTMTContext()); + ASSERT_EQ(kvs.getRegion(region_id)->checkIndex(9), true); + } + } + { + auto stats = storage->getStore()->getStoreStats(); + ASSERT_NE(0, stats.total_stable_size_on_disk); + ASSERT_NE(0, stats.total_rows); + ASSERT_NE(0, stats.total_size); + } + // Finally, the region is migrated out + { + raft_cmdpb::AdminRequest request; + raft_cmdpb::AdminResponse response; + request.set_cmd_type(raft_cmdpb::AdminCmdType::ChangePeerV2); + auto meta = kvs.getRegion(region_id)->getMetaRegion(); + meta.mutable_peers()->Clear(); + meta.add_peers()->set_id(3); + *response.mutable_change_peer()->mutable_region() = meta; + kvs.handleAdminRaftCmd(raft_cmdpb::AdminRequest(request), raft_cmdpb::AdminResponse(response), region_id, 10, 6, ctx.getTMTContext()); + ASSERT_EQ(kvs.getRegion(region_id), nullptr); + } + { + // After several rounds of GC, everything should be reclaimed. + for (size_t i = 0; i < 10; ++i) + { + storage->onSyncGc(100, DM::GCOptions::newAllForTest()); + } + + auto gc_n = storage->onSyncGc(100, DM::GCOptions::newAllForTest()); + ASSERT_EQ(0, gc_n); + + auto stats = storage->getStore()->getStoreStats(); + ASSERT_EQ(1, stats.segment_count); + ASSERT_EQ(0, stats.total_stable_size_on_disk); + ASSERT_EQ(0, stats.total_rows); + ASSERT_EQ(0, stats.total_size); + } +} +CATCH + + TEST_F(RegionKVStoreTest, KVStoreFailRecovery) { auto ctx = TiFlashTestEnv::getGlobalContext(); diff --git a/tests/sanitize/tsan.suppression b/tests/sanitize/tsan.suppression index 73824caa2b9..537b584e8ff 100644 --- a/tests/sanitize/tsan.suppression +++ b/tests/sanitize/tsan.suppression @@ -1 +1,2 @@ race:dbms/src/Common/TiFlashMetrics.h +race:DB::Context::setCancelTest