From d32bf1023ef85b68e01fc70c802319f1a87e3657 Mon Sep 17 00:00:00 2001 From: jinhelin Date: Mon, 5 Sep 2022 12:32:55 +0800 Subject: [PATCH 1/3] MemoryTracker*: Use std::shared_ptr to manage MemoryTracker. (#5765) ref pingcap/tiflash#5689 --- dbms/src/Common/MemoryTracker.h | 20 ++++- .../tests/gtest_dynamic_thread_pool.cpp | 12 +-- dbms/src/Common/tests/gtest_memtracker.cpp | 84 +++++++++---------- dbms/src/Interpreters/ProcessList.cpp | 18 ++-- dbms/src/Interpreters/ProcessList.h | 14 ++-- .../src/Storages/BackgroundProcessingPool.cpp | 6 +- .../DeltaMerge/SegmentReadTaskPool.cpp | 4 +- .../Storages/DeltaMerge/SegmentReadTaskPool.h | 5 +- .../Storages/Page/workload/PSBackground.cpp | 6 +- .../src/Storages/Page/workload/PSRunnable.cpp | 6 +- 10 files changed, 99 insertions(+), 76 deletions(-) diff --git a/dbms/src/Common/MemoryTracker.h b/dbms/src/Common/MemoryTracker.h index 0d56cb5fcf4..161566b758d 100644 --- a/dbms/src/Common/MemoryTracker.h +++ b/dbms/src/Common/MemoryTracker.h @@ -25,6 +25,9 @@ namespace CurrentMetrics extern const Metric MemoryTracking; } +class MemoryTracker; +using MemoryTrackerPtr = std::shared_ptr; + /** Tracks memory consumption. * It throws an exception if amount of consumed memory become greater than certain limit. * The same memory tracker could be simultaneously used in different threads. @@ -51,12 +54,26 @@ class MemoryTracker : public std::enable_shared_from_this /// This description will be used as prefix into log messages (if isn't nullptr) const char * description = nullptr; -public: + /// Make constructors private to ensure all objects of this class is created by `MemoryTracker::create`. MemoryTracker() = default; explicit MemoryTracker(Int64 limit_) : limit(limit_) {} +public: + /// Using `std::shared_ptr` and `new` instread of `std::make_shared` is because `std::make_shared` cannot call private constructors. + static MemoryTrackerPtr create(Int64 limit = 0) + { + if (limit == 0) + { + return std::shared_ptr(new MemoryTracker); + } + else + { + return std::shared_ptr(new MemoryTracker(limit)); + } + } + ~MemoryTracker(); /** Call the following functions before calling of corresponding operations with memory allocators. @@ -101,7 +118,6 @@ class MemoryTracker : public std::enable_shared_from_this void logPeakMemoryUsage() const; }; - /** The MemoryTracker object is quite difficult to pass to all places where significant amounts of memory are allocated. * Therefore, a thread-local pointer to used MemoryTracker is set, or nullptr if MemoryTracker does not need to be used. * This pointer is set when memory consumption is monitored in current thread. diff --git a/dbms/src/Common/tests/gtest_dynamic_thread_pool.cpp b/dbms/src/Common/tests/gtest_dynamic_thread_pool.cpp index 82a53611e5b..1e8ef8ab910 100644 --- a/dbms/src/Common/tests/gtest_dynamic_thread_pool.cpp +++ b/dbms/src/Common/tests/gtest_dynamic_thread_pool.cpp @@ -128,9 +128,11 @@ CATCH TEST_F(DynamicThreadPoolTest, testMemoryTracker) try { - MemoryTracker t0, t1, t2; + auto t0 = MemoryTracker::create(); + auto t1 = MemoryTracker::create(); + auto t2 = MemoryTracker::create(); - current_memory_tracker = &t2; + current_memory_tracker = t2.get(); auto getter = [] { return current_memory_tracker; @@ -145,18 +147,18 @@ try auto f = pool.schedule(false, getter); ASSERT_EQ(f.get(), nullptr); - auto f0 = pool.schedule(false, setter, &t0); + auto f0 = pool.schedule(false, setter, t0.get()); f0.wait(); auto f1 = pool.schedule(false, getter); // f0 didn't pollute memory_tracker ASSERT_EQ(f1.get(), nullptr); - current_memory_tracker = &t1; + current_memory_tracker = t1.get(); auto f2 = pool.schedule(true, getter); // set propagate = true and it did propagate - ASSERT_EQ(f2.get(), &t1); + ASSERT_EQ(f2.get(), t1.get()); auto f3 = pool.schedule(false, getter); // set propagate = false and it didn't propagate diff --git a/dbms/src/Common/tests/gtest_memtracker.cpp b/dbms/src/Common/tests/gtest_memtracker.cpp index d31e7b42df4..9183c0e7b44 100644 --- a/dbms/src/Common/tests/gtest_memtracker.cpp +++ b/dbms/src/Common/tests/gtest_memtracker.cpp @@ -26,93 +26,93 @@ class MemTrackerTest : public ::testing::Test TEST_F(MemTrackerTest, testBasic) try { - MemoryTracker mem_tracker; - mem_tracker.alloc(1024); - ASSERT_EQ(1024, mem_tracker.get()); - mem_tracker.free(1024); - ASSERT_EQ(0, mem_tracker.get()); + auto mem_tracker = MemoryTracker::create(); + mem_tracker->alloc(1024); + ASSERT_EQ(1024, mem_tracker->get()); + mem_tracker->free(1024); + ASSERT_EQ(0, mem_tracker->get()); } CATCH TEST_F(MemTrackerTest, testRootAndChild) try { - MemoryTracker root_mem_tracker; - MemoryTracker child_mem_tracker(512); - child_mem_tracker.setNext(&root_mem_tracker); + auto root_mem_tracker = MemoryTracker::create(); + auto child_mem_tracker = MemoryTracker::create(512); + child_mem_tracker->setNext(root_mem_tracker.get()); // alloc 500 - child_mem_tracker.alloc(500); - ASSERT_EQ(500, child_mem_tracker.get()); - ASSERT_EQ(500, root_mem_tracker.get()); + child_mem_tracker->alloc(500); + ASSERT_EQ(500, child_mem_tracker->get()); + ASSERT_EQ(500, root_mem_tracker->get()); // alloc 256 base on 500 bool has_err = false; try { - child_mem_tracker.alloc(256); //500 + 256 > limit(512) + child_mem_tracker->alloc(256); //500 + 256 > limit(512) } catch (...) { has_err = true; } ASSERT_TRUE(has_err); - ASSERT_EQ(500, child_mem_tracker.get()); - ASSERT_EQ(500, root_mem_tracker.get()); + ASSERT_EQ(500, child_mem_tracker->get()); + ASSERT_EQ(500, root_mem_tracker->get()); //free 500 - child_mem_tracker.free(500); - ASSERT_EQ(0, child_mem_tracker.get()); - ASSERT_EQ(0, root_mem_tracker.get()); + child_mem_tracker->free(500); + ASSERT_EQ(0, child_mem_tracker->get()); + ASSERT_EQ(0, root_mem_tracker->get()); } CATCH TEST_F(MemTrackerTest, testRootAndMultipleChild) try { - MemoryTracker root(512); // limit 512 - MemoryTracker child1(512); // limit 512 - MemoryTracker child2(512); // limit 512 - child1.setNext(&root); - child2.setNext(&root); + auto root = MemoryTracker::create(512); // limit 512 + auto child1 = MemoryTracker::create(512); // limit 512 + auto child2 = MemoryTracker::create(512); // limit 512 + child1->setNext(root.get()); + child2->setNext(root.get()); // alloc 500 on child1 - child1.alloc(500); - ASSERT_EQ(500, child1.get()); - ASSERT_EQ(0, child2.get()); - ASSERT_EQ(500, root.get()); + child1->alloc(500); + ASSERT_EQ(500, child1->get()); + ASSERT_EQ(0, child2->get()); + ASSERT_EQ(500, root->get()); // alloc 500 on child2, should fail bool has_err = false; try { - child2.alloc(500); // root will throw error because of "out of quota" + child2->alloc(500); // root will throw error because of "out of quota" } catch (...) { has_err = true; } ASSERT_TRUE(has_err); - ASSERT_EQ(500, child1.get()); - ASSERT_EQ(0, child2.get()); - ASSERT_EQ(500, root.get()); + ASSERT_EQ(500, child1->get()); + ASSERT_EQ(0, child2->get()); + ASSERT_EQ(500, root->get()); // alloc 10 on child2 - child2.alloc(10); - ASSERT_EQ(500, child1.get()); - ASSERT_EQ(10, child2.get()); - ASSERT_EQ(510, root.get()); + child2->alloc(10); + ASSERT_EQ(500, child1->get()); + ASSERT_EQ(10, child2->get()); + ASSERT_EQ(510, root->get()); // free 500 on child1 - child1.free(500); - ASSERT_EQ(0, child1.get()); - ASSERT_EQ(10, child2.get()); - ASSERT_EQ(10, root.get()); + child1->free(500); + ASSERT_EQ(0, child1->get()); + ASSERT_EQ(10, child2->get()); + ASSERT_EQ(10, root->get()); // free 10 on child2 - child2.free(10); - ASSERT_EQ(0, child1.get()); - ASSERT_EQ(0, child2.get()); - ASSERT_EQ(0, root.get()); + child2->free(10); + ASSERT_EQ(0, child1->get()); + ASSERT_EQ(0, child2->get()); + ASSERT_EQ(0, root->get()); } CATCH diff --git a/dbms/src/Interpreters/ProcessList.cpp b/dbms/src/Interpreters/ProcessList.cpp index 5e50d560bb0..d0ec463eada 100644 --- a/dbms/src/Interpreters/ProcessList.cpp +++ b/dbms/src/Interpreters/ProcessList.cpp @@ -153,18 +153,18 @@ ProcessList::EntryPtr ProcessList::insert( /// setting from one query effectively sets values for all other queries. /// Track memory usage for all simultaneously running queries from single user. - user_process_list.user_memory_tracker.setOrRaiseLimit(settings.max_memory_usage_for_user); - user_process_list.user_memory_tracker.setDescription("(for user)"); - current_memory_tracker->setNext(&user_process_list.user_memory_tracker); + user_process_list.user_memory_tracker->setOrRaiseLimit(settings.max_memory_usage_for_user); + user_process_list.user_memory_tracker->setDescription("(for user)"); + current_memory_tracker->setNext(user_process_list.user_memory_tracker.get()); /// Track memory usage for all simultaneously running queries. /// You should specify this value in configuration for default profile, /// not for specific users, sessions or queries, /// because this setting is effectively global. - total_memory_tracker.setOrRaiseLimit(settings.max_memory_usage_for_all_queries); - total_memory_tracker.setBytesThatRssLargerThanLimit(settings.bytes_that_rss_larger_than_limit); - total_memory_tracker.setDescription("(total)"); - user_process_list.user_memory_tracker.setNext(&total_memory_tracker); + total_memory_tracker->setOrRaiseLimit(settings.max_memory_usage_for_all_queries); + total_memory_tracker->setBytesThatRssLargerThanLimit(settings.bytes_that_rss_larger_than_limit); + total_memory_tracker->setDescription("(total)"); + user_process_list.user_memory_tracker->setNext(total_memory_tracker.get()); } if (!total_network_throttler && settings.max_network_bandwidth_for_all_users) @@ -244,8 +244,8 @@ ProcessListEntry::~ProcessListEntry() if (parent.cur_size == 0) { /// Reset MemoryTracker, similarly (see above). - parent.total_memory_tracker.logPeakMemoryUsage(); - parent.total_memory_tracker.reset(); + parent.total_memory_tracker->logPeakMemoryUsage(); + parent.total_memory_tracker->reset(); parent.total_network_throttler.reset(); } } diff --git a/dbms/src/Interpreters/ProcessList.h b/dbms/src/Interpreters/ProcessList.h index e4e91b56517..4e1ab90ec85 100644 --- a/dbms/src/Interpreters/ProcessList.h +++ b/dbms/src/Interpreters/ProcessList.h @@ -80,7 +80,7 @@ class ProcessListElement /// Progress of output stream Progress progress_out; - std::shared_ptr memory_tracker; + MemoryTrackerPtr memory_tracker; QueryPriorities::Handle priority_handle; @@ -115,7 +115,7 @@ class ProcessListElement QueryPriorities::Handle && priority_handle_) : query(query_) , client_info(client_info_) - , memory_tracker(std::make_shared(max_memory_usage)) + , memory_tracker(MemoryTracker::create(max_memory_usage)) , priority_handle(std::move(priority_handle_)) { memory_tracker->setDescription("(for query)"); @@ -205,18 +205,21 @@ struct ProcessListForUser QueryToElement queries; /// Limit and counter for memory of all simultaneously running queries of single user. - MemoryTracker user_memory_tracker; + MemoryTrackerPtr user_memory_tracker; /// Count network usage for all simultaneously running queries of single user. ThrottlerPtr user_throttler; + ProcessListForUser() + : user_memory_tracker(MemoryTracker::create()) + {} /// Clears MemoryTracker for the user. /// Sometimes it is important to reset the MemoryTracker, because it may accumulate skew /// due to the fact that there are cases when memory can be allocated while processing the query, but released later. /// Clears network bandwidth Throttler, so it will not count periods of inactivity. void reset() { - user_memory_tracker.reset(); + user_memory_tracker->reset(); if (user_throttler) user_throttler.reset(); } @@ -281,7 +284,7 @@ class ProcessList QueryPriorities priorities; /// Limit and counter for memory of all simultaneously running queries. - MemoryTracker total_memory_tracker; + MemoryTrackerPtr total_memory_tracker; /// Limit network bandwidth for all users ThrottlerPtr total_network_throttler; @@ -293,6 +296,7 @@ class ProcessList ProcessList(size_t max_size_ = 0) : cur_size(0) , max_size(max_size_) + , total_memory_tracker(MemoryTracker::create()) {} using EntryPtr = std::shared_ptr; diff --git a/dbms/src/Storages/BackgroundProcessingPool.cpp b/dbms/src/Storages/BackgroundProcessingPool.cpp index 45ba032bf53..644e8d05b1a 100644 --- a/dbms/src/Storages/BackgroundProcessingPool.cpp +++ b/dbms/src/Storages/BackgroundProcessingPool.cpp @@ -152,9 +152,9 @@ void BackgroundProcessingPool::threadFunction() addThreadId(getTid()); } - MemoryTracker memory_tracker; - memory_tracker.setMetric(CurrentMetrics::MemoryTrackingInBackgroundProcessingPool); - current_memory_tracker = &memory_tracker; + auto memory_tracker = MemoryTracker::create(); + memory_tracker->setMetric(CurrentMetrics::MemoryTrackingInBackgroundProcessingPool); + current_memory_tracker = memory_tracker.get(); pcg64 rng(randomSeed()); std::this_thread::sleep_for(std::chrono::duration(std::uniform_real_distribution(0, sleep_seconds_random_part)(rng))); diff --git a/dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.cpp b/dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.cpp index 88dc0742c89..431cc9fb1a3 100644 --- a/dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.cpp +++ b/dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.cpp @@ -97,7 +97,7 @@ SegmentReadTasks SegmentReadTask::trySplitReadTasks(const SegmentReadTasks & tas BlockInputStreamPtr SegmentReadTaskPool::buildInputStream(SegmentReadTaskPtr & t) { - MemoryTrackerSetter setter(true, mem_tracker); + MemoryTrackerSetter setter(true, mem_tracker.get()); auto seg = t->segment; BlockInputStreamPtr stream; if (is_raw) @@ -179,7 +179,7 @@ std::unordered_map>::const_iterator SegmentReadT bool SegmentReadTaskPool::readOneBlock(BlockInputStreamPtr & stream, const SegmentPtr & seg) { - MemoryTrackerSetter setter(true, mem_tracker); + MemoryTrackerSetter setter(true, mem_tracker.get()); auto block = stream->read(); if (block) { diff --git a/dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.h b/dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.h index e87099a22b4..c878543dda6 100644 --- a/dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.h +++ b/dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.h @@ -139,7 +139,7 @@ class SegmentReadTaskPool : private boost::noncopyable , log(&Poco::Logger::get("SegmentReadTaskPool")) , unordered_input_stream_ref_count(0) , exception_happened(false) - , mem_tracker(current_memory_tracker) + , mem_tracker(current_memory_tracker == nullptr ? nullptr : current_memory_tracker->shared_from_this()) {} ~SegmentReadTaskPool() @@ -223,7 +223,8 @@ class SegmentReadTaskPool : private boost::noncopyable std::atomic exception_happened; DB::Exception exception; - MemoryTracker * mem_tracker; + // The memory tracker of MPPTask. + MemoryTrackerPtr mem_tracker; inline static std::atomic pool_id_gen{1}; inline static BlockStat global_blk_stat; diff --git a/dbms/src/Storages/Page/workload/PSBackground.cpp b/dbms/src/Storages/Page/workload/PSBackground.cpp index 247bea23dcc..39ed7476c6b 100644 --- a/dbms/src/Storages/Page/workload/PSBackground.cpp +++ b/dbms/src/Storages/Page/workload/PSBackground.cpp @@ -51,9 +51,9 @@ void PSGc::doGcOnce() gc_stop_watch.start(); try { - MemoryTracker tracker; - tracker.setDescription("(Stress Test GC)"); - current_memory_tracker = &tracker; + auto tracker = MemoryTracker::create(); + tracker->setDescription("(Stress Test GC)"); + current_memory_tracker = tracker.get(); ps->gc(); current_memory_tracker = nullptr; } diff --git a/dbms/src/Storages/Page/workload/PSRunnable.cpp b/dbms/src/Storages/Page/workload/PSRunnable.cpp index 5e9774ccc99..545e40d46a6 100644 --- a/dbms/src/Storages/Page/workload/PSRunnable.cpp +++ b/dbms/src/Storages/Page/workload/PSRunnable.cpp @@ -29,9 +29,9 @@ namespace DB::PS::tests void PSRunnable::run() try { - MemoryTracker tracker; - tracker.setDescription(nullptr); - current_memory_tracker = &tracker; + auto tracker = MemoryTracker::create(); + tracker->setDescription(nullptr); + current_memory_tracker = tracker.get(); // If runImpl() return false, means it need break itself while (StressEnvStatus::getInstance().isRunning() && runImpl()) { From f4a3ea0ecf4d9f9b6465f76f1d9d37c0dcb66bbf Mon Sep 17 00:00:00 2001 From: SeaRise Date: Mon, 5 Sep 2022 14:00:55 +0800 Subject: [PATCH 2/3] doc: update window function design doc (#5780) close pingcap/tiflash#5781 --- docs/design/2022-05-12-window-function.md | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/docs/design/2022-05-12-window-function.md b/docs/design/2022-05-12-window-function.md index d04ab5edba9..400db878ca2 100644 --- a/docs/design/2022-05-12-window-function.md +++ b/docs/design/2022-05-12-window-function.md @@ -94,7 +94,7 @@ Children task's cost *= 0.05 ### The Logic of Window Function in TiFlash #### Major additions - IWindowFunction - Defines execution functions of a window function(row_number, rank, dense_rank). + Defines execution functions of a window function(row_number, rank, dense_rank, lead, lag). - WindowDescription Define a window, including the name, frame, functions of a window. - WindowBlockInputStream @@ -125,7 +125,7 @@ Children task's cost *= 0.05 ##### Flow Chart ![tiflash-window-function-flow-chart](./images/tiflash-window-function-flow-chart.jpg) ### Support and Limit -- Compared with TiDB, currently only row_number, rank, dense_rank functions are supported. +- Compared with TiDB, currently only row_number, rank, dense_rank, lead, lag functions are supported. | Feature | support | | ----- | ----- | @@ -136,7 +136,7 @@ Children task's cost *= 0.05 | INTERVAL syntax for DateTime RANGE OFFSET frame | Not supported | | Calculating aggregate functions over a frame | Not supported | | rank(), dense_rank(), row_number() | Supported | -| lag/lead(value, offset, default) | Not supported | +| lag/lead(value, offset, default) | Supported | - Compared with TiDB or MySQL, the result may have some differences. For example, compare the calculation results of TiFlash and TiDB: From 4752b4bc1eb37b212fe3d03aef3ff024af3a2d77 Mon Sep 17 00:00:00 2001 From: JaySon Date: Mon, 5 Sep 2022 14:42:55 +0800 Subject: [PATCH 3/3] PageStorage: Fine grained lock on external callbacks (#5699) close pingcap/tiflash#5697 --- dbms/src/Databases/test/gtest_database.cpp | 36 +++-- .../Storages/DeltaMerge/DeltaMergeStore.cpp | 119 +++++++++++----- .../src/Storages/DeltaMerge/DeltaMergeStore.h | 4 +- dbms/src/Storages/DeltaMerge/File/DMFile.cpp | 7 +- dbms/src/Storages/DeltaMerge/StoragePool.cpp | 6 +- .../tests/gtest_dm_delta_merge_store.cpp | 127 +++++++++++++++++- .../gtest_dm_delta_merge_store_test_basic.h | 6 +- .../tests/gtest_dm_storage_delta_merge.cpp | 9 +- dbms/src/Storages/Page/PageStorage.h | 5 +- .../V3/PageDirectory/ExternalIdsByNamespace.h | 4 + dbms/src/Storages/Page/V3/PageStorageImpl.cpp | 75 ++++++++--- dbms/src/Storages/Page/V3/PageStorageImpl.h | 5 +- .../Page/V3/tests/gtest_page_storage.cpp | 102 ++++++++++++++ dbms/src/Storages/PathPool.cpp | 54 ++------ dbms/src/Storages/PathPool.h | 2 +- dbms/src/Storages/StorageDeltaMerge.cpp | 75 +++-------- tests/delta-merge-test/ddl/alter.test | 16 --- .../raft/schema/partition_table_restart.test | 40 ++---- .../raft/schema/rename_on_read.test | 38 ------ .../raft/schema/rename_on_write.test | 46 ------- .../raft/schema/rename_tables.test | 45 ------- .../raft/txn_mock/partition_table.test | 17 +-- 22 files changed, 457 insertions(+), 381 deletions(-) delete mode 100644 tests/delta-merge-test/raft/schema/rename_on_read.test delete mode 100644 tests/delta-merge-test/raft/schema/rename_on_write.test delete mode 100644 tests/delta-merge-test/raft/schema/rename_tables.test diff --git a/dbms/src/Databases/test/gtest_database.cpp b/dbms/src/Databases/test/gtest_database.cpp index 6b8bbc17348..b469a571570 100644 --- a/dbms/src/Databases/test/gtest_database.cpp +++ b/dbms/src/Databases/test/gtest_database.cpp @@ -273,18 +273,15 @@ try EXPECT_EQ(managed_storage->getDatabaseName(), db_name); } - const String to_tbl_name = "t_112"; + const String to_tbl_display_name = "tbl_test"; { // Rename table - typeid_cast(db.get())->renameTable(ctx, tbl_name, *db, to_tbl_name, db_name, to_tbl_name); + typeid_cast(db.get())->renameTable(ctx, tbl_name, *db, tbl_name, db_name, to_tbl_display_name); - auto old_storage = db->tryGetTable(ctx, tbl_name); - ASSERT_EQ(old_storage, nullptr); - - auto storage = db->tryGetTable(ctx, to_tbl_name); + auto storage = db->tryGetTable(ctx, tbl_name); ASSERT_NE(storage, nullptr); EXPECT_EQ(storage->getName(), MutableSupport::delta_tree_storage_name); - EXPECT_EQ(storage->getTableName(), to_tbl_name); + EXPECT_EQ(storage->getTableName(), tbl_name); auto managed_storage = std::dynamic_pointer_cast(storage); EXPECT_EQ(managed_storage->getDatabaseName(), db_name); @@ -294,13 +291,13 @@ try // Drop table auto drop_query = std::make_shared(); drop_query->database = db_name; - drop_query->table = to_tbl_name; + drop_query->table = tbl_name; drop_query->if_exists = false; ASTPtr ast_drop_query = drop_query; InterpreterDropQuery drop_interpreter(ast_drop_query, ctx); drop_interpreter.execute(); - auto storage = db->tryGetTable(ctx, to_tbl_name); + auto storage = db->tryGetTable(ctx, tbl_name); ASSERT_EQ(storage, nullptr); } @@ -391,18 +388,18 @@ try EXPECT_EQ(managed_storage->getDatabaseName(), db_name); } - const String to_tbl_name = "t_112"; + const String to_tbl_display_name = "tbl_test"; { // Rename table - typeid_cast(db.get())->renameTable(ctx, tbl_name, *db2, to_tbl_name, db2_name, to_tbl_name); + typeid_cast(db.get())->renameTable(ctx, tbl_name, *db2, tbl_name, db2_name, to_tbl_display_name); auto old_storage = db->tryGetTable(ctx, tbl_name); ASSERT_EQ(old_storage, nullptr); - auto storage = db2->tryGetTable(ctx, to_tbl_name); + auto storage = db2->tryGetTable(ctx, tbl_name); ASSERT_NE(storage, nullptr); EXPECT_EQ(storage->getName(), MutableSupport::delta_tree_storage_name); - EXPECT_EQ(storage->getTableName(), to_tbl_name); + EXPECT_EQ(storage->getTableName(), tbl_name); auto managed_storage = std::dynamic_pointer_cast(storage); EXPECT_EQ(managed_storage->getDatabaseName(), db2_name); @@ -412,13 +409,13 @@ try // Drop table auto drop_query = std::make_shared(); drop_query->database = db2_name; - drop_query->table = to_tbl_name; + drop_query->table = tbl_name; drop_query->if_exists = false; ASTPtr ast_drop_query = drop_query; InterpreterDropQuery drop_interpreter(ast_drop_query, ctx); drop_interpreter.execute(); - auto storage = db2->tryGetTable(ctx, to_tbl_name); + auto storage = db2->tryGetTable(ctx, tbl_name); ASSERT_EQ(storage, nullptr); } @@ -501,18 +498,17 @@ try EXPECT_FALSE(db->empty(ctx)); EXPECT_TRUE(db->isTableExist(ctx, tbl_name)); - const String to_tbl_name = "t_112"; // Rename table to another database, and mock crash by failed point FailPointHelper::enableFailPoint(FailPoints::exception_before_rename_table_old_meta_removed); ASSERT_THROW( - typeid_cast(db.get())->renameTable(ctx, tbl_name, *db2, to_tbl_name, db2_name, to_tbl_name), + typeid_cast(db.get())->renameTable(ctx, tbl_name, *db2, tbl_name, db2_name, tbl_name), DB::Exception); { // After fail point triggled we should have both meta file in disk Poco::File old_meta_file{db->getTableMetadataPath(tbl_name)}; ASSERT_TRUE(old_meta_file.exists()); - Poco::File new_meta_file(db2->getTableMetadataPath(to_tbl_name)); + Poco::File new_meta_file(db2->getTableMetadataPath(tbl_name)); ASSERT_TRUE(new_meta_file.exists()); // Old table should remain in db auto old_storage = db->tryGetTable(ctx, tbl_name); @@ -527,10 +523,10 @@ try ThreadPool thread_pool(2); db2->loadTables(ctx, &thread_pool, true); - Poco::File new_meta_file(db2->getTableMetadataPath(to_tbl_name)); + Poco::File new_meta_file(db2->getTableMetadataPath(tbl_name)); ASSERT_FALSE(new_meta_file.exists()); - auto storage = db2->tryGetTable(ctx, to_tbl_name); + auto storage = db2->tryGetTable(ctx, tbl_name); ASSERT_EQ(storage, nullptr); } diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp index 523960e52a6..83e81b6017c 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp +++ b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +#include #include #include #include @@ -21,6 +22,7 @@ #include #include #include +#include #include #include #include @@ -41,6 +43,7 @@ #include #include +#include namespace ProfileEvents { @@ -196,7 +199,7 @@ DeltaMergeStore::DeltaMergeStore(Context & db_context, size_t rowkey_column_size_, const Settings & settings_) : global_context(db_context.getGlobalContext()) - , path_pool(global_context.getPathPool().withTable(db_name_, table_name_, data_path_contains_database_name)) + , path_pool(std::make_shared(global_context.getPathPool().withTable(db_name_, table_name_, data_path_contains_database_name))) , settings(settings_) , db_name(db_name_) , table_name(table_name_) @@ -216,7 +219,7 @@ DeltaMergeStore::DeltaMergeStore(Context & db_context, storage_pool = std::make_shared(global_context, ns_id, - path_pool, + *path_pool, db_name_ + "." + table_name_); // Restore existing dm files and set capacity for path_pool. @@ -296,25 +299,46 @@ DeltaMergeStore::~DeltaMergeStore() void DeltaMergeStore::setUpBackgroundTask(const DMContextPtr & dm_context) { + // Callbacks for cleaning outdated DTFiles. Note that there is a chance + // that callbacks is called after the `DeltaMergeStore` dropped, we must + // make the callbacks safe. ExternalPageCallbacks callbacks; - // V2 callbacks for cleaning DTFiles - callbacks.scanner = [this]() { + callbacks.ns_id = storage_pool->getNamespaceId(); + callbacks.scanner = [path_pool_weak_ref = std::weak_ptr(path_pool), file_provider = global_context.getFileProvider()]() { ExternalPageCallbacks::PathAndIdsVec path_and_ids_vec; - auto delegate = path_pool.getStableDiskDelegator(); + + // If the StoragePathPool is invalid, meaning we call `scanner` after dropping the table, + // simply return an empty list is OK. + auto path_pool = path_pool_weak_ref.lock(); + if (!path_pool) + return path_and_ids_vec; + + // Return the DTFiles on disks. + auto delegate = path_pool->getStableDiskDelegator(); + // Only return the DTFiles can be GC. The page id of not able to be GC files, which is being ingested or in the middle of + // SegmentSplit/Merge/MergeDelta, is not yet applied + // to PageStorage is marked as not able to be GC, so we don't return them and run the `remover` DMFile::ListOptions options; options.only_list_can_gc = true; for (auto & root_path : delegate.listPaths()) { - auto & path_and_ids = path_and_ids_vec.emplace_back(); - path_and_ids.first = root_path; - auto file_ids_in_current_path = DMFile::listAllInPath(global_context.getFileProvider(), root_path, options); - for (auto id : file_ids_in_current_path) - path_and_ids.second.insert(id); + std::set ids_under_path; + auto file_ids_in_current_path = DMFile::listAllInPath(file_provider, root_path, options); + path_and_ids_vec.emplace_back(root_path, std::move(file_ids_in_current_path)); } return path_and_ids_vec; }; - callbacks.remover = [this](const ExternalPageCallbacks::PathAndIdsVec & path_and_ids_vec, const std::set & valid_ids) { - auto delegate = path_pool.getStableDiskDelegator(); + callbacks.remover = [path_pool_weak_ref = std::weak_ptr(path_pool), // + file_provider = global_context.getFileProvider(), + logger = log](const ExternalPageCallbacks::PathAndIdsVec & path_and_ids_vec, const std::set & valid_ids) { + // If the StoragePathPool is invalid, meaning we call `remover` after dropping the table, + // simply skip is OK. + auto path_pool = path_pool_weak_ref.lock(); + if (!path_pool) + return; + + SYNC_FOR("before_DeltaMergeStore::callbacks_remover_remove"); + auto delegate = path_pool->getStableDiskDelegator(); for (const auto & [path, ids] : path_and_ids_vec) { for (auto id : ids) @@ -323,18 +347,50 @@ void DeltaMergeStore::setUpBackgroundTask(const DMContextPtr & dm_context) continue; // Note that page_id is useless here. - auto dmfile = DMFile::restore(global_context.getFileProvider(), id, /* page_id= */ 0, path, DMFile::ReadMetaMode::none()); - if (dmfile->canGC()) + auto dmfile = DMFile::restore(file_provider, id, /* page_id= */ 0, path, DMFile::ReadMetaMode::none()); + if (unlikely(!dmfile)) { - delegate.removeDTFile(dmfile->fileId()); - dmfile->remove(global_context.getFileProvider()); + // If the dtfile directory is not exist, it means `StoragePathPool::drop` have been + // called in another thread. Just try to clean if any id is left. + try + { + delegate.removeDTFile(id); + } + catch (DB::Exception & e) + { + // just ignore + } + LOG_FMT_INFO(logger, + "GC try remove useless DM file, but file not found and may have been removed, dmfile={}", + DMFile::getPathByStatus(path, id, DMFile::Status::READABLE)); + } + else if (dmfile->canGC()) + { + // StoragePathPool::drop may be called concurrently, ignore and continue next file if any exception thrown + String err_msg; + try + { + // scanner should only return dtfiles that can GC, + // just another check here. + delegate.removeDTFile(dmfile->fileId()); + dmfile->remove(file_provider); + } + catch (DB::Exception & e) + { + err_msg = e.message(); + } + catch (Poco::Exception & e) + { + err_msg = e.message(); + } + if (err_msg.empty()) + LOG_FMT_INFO(logger, "GC removed useless DM file, dmfile={}", dmfile->path()); + else + LOG_FMT_INFO(logger, "GC try remove useless DM file, but error happen, dmfile={}, err_msg={}", dmfile->path(), err_msg); } - - LOG_FMT_INFO(log, "GC removed useless dmfile: {}", dmfile->path()); } } }; - callbacks.ns_id = storage_pool->getNamespaceId(); // remember to unregister it when shutdown storage_pool->dataRegisterExternalPagesCallbacks(callbacks); storage_pool->enableGC(); @@ -355,20 +411,9 @@ void DeltaMergeStore::setUpBackgroundTask(const DMContextPtr & dm_context) blockable_background_pool_handle->wake(); } -void DeltaMergeStore::rename(String /*new_path*/, bool clean_rename, String new_database_name, String new_table_name) +void DeltaMergeStore::rename(String /*new_path*/, String new_database_name, String new_table_name) { - if (clean_rename) - { - path_pool.rename(new_database_name, new_table_name, clean_rename); - } - else - { - LOG_FMT_WARNING(log, "Applying heavy renaming for table {}.{} to {}.{}", db_name, table_name, new_database_name, new_table_name); - - // Remove all background task first - shutdown(); - path_pool.rename(new_database_name, new_table_name, clean_rename); // rename for multi-disk - } + path_pool->rename(new_database_name, new_table_name); // TODO: replacing these two variables is not atomic, but could be good enough? table_name.swap(new_table_name); @@ -465,7 +510,7 @@ void DeltaMergeStore::drop() storage_pool->drop(); // Drop data in storage path pool - path_pool.drop(/*recursive=*/true, /*must_success=*/false); + path_pool->drop(/*recursive=*/true, /*must_success=*/false); LOG_FMT_INFO(log, "Drop DeltaMerge done [{}.{}]", db_name, table_name); } @@ -496,7 +541,7 @@ DMContextPtr DeltaMergeStore::newDMContext(const Context & db_context, const DB: // Because db_context could be a temporary object and won't last long enough during the query process. // Like the context created by InterpreterSelectWithUnionQuery. auto * ctx = new DMContext(db_context.getGlobalContext(), - path_pool, + *path_pool, *storage_pool, latest_gc_safe_point.load(std::memory_order_acquire), settings.not_compress_columns, @@ -704,7 +749,7 @@ std::tuple DeltaMergeStore::preAllocateIngestFile() if (shutdown_called.load(std::memory_order_relaxed)) return {}; - auto delegator = path_pool.getStableDiskDelegator(); + auto delegator = path_pool->getStableDiskDelegator(); auto parent_path = delegator.choosePath(); auto new_id = storage_pool->newDataPageIdForDTFile(delegator, __PRETTY_FUNCTION__); return {parent_path, new_id}; @@ -715,7 +760,7 @@ void DeltaMergeStore::preIngestFile(const String & parent_path, const PageId fil if (shutdown_called.load(std::memory_order_relaxed)) return; - auto delegator = path_pool.getStableDiskDelegator(); + auto delegator = path_pool->getStableDiskDelegator(); delegator.addDTFile(file_id, file_size, parent_path); } @@ -2529,7 +2574,7 @@ void DeltaMergeStore::restoreStableFiles() options.only_list_can_gc = false; // We need all files to restore the bytes on disk options.clean_up = true; auto file_provider = global_context.getFileProvider(); - auto path_delegate = path_pool.getStableDiskDelegator(); + auto path_delegate = path_pool->getStableDiskDelegator(); for (const auto & root_path : path_delegate.listPaths()) { for (const auto & file_id : DMFile::listAllInPath(file_provider, root_path, options)) diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h index 769786ce070..3dc9fca4f08 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h +++ b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h @@ -314,7 +314,7 @@ class DeltaMergeStore : private boost::noncopyable return table_name; } - void rename(String new_path, bool clean_rename, String new_database_name, String new_table_name); + void rename(String new_path, String new_database_name, String new_table_name); void clearData(); @@ -529,7 +529,7 @@ class DeltaMergeStore : private boost::noncopyable #endif Context & global_context; - StoragePathPool path_pool; + std::shared_ptr path_pool; Settings settings; StoragePoolPtr storage_pool; diff --git a/dbms/src/Storages/DeltaMerge/File/DMFile.cpp b/dbms/src/Storages/DeltaMerge/File/DMFile.cpp index fe984ad519f..e05d98a15bd 100644 --- a/dbms/src/Storages/DeltaMerge/File/DMFile.cpp +++ b/dbms/src/Storages/DeltaMerge/File/DMFile.cpp @@ -151,7 +151,12 @@ DMFilePtr DMFile::restore( const ReadMetaMode & read_meta_mode) { String path = getPathByStatus(parent_path, file_id, DMFile::Status::READABLE); - bool single_file_mode = Poco::File(path).isFile(); + // The path may be dropped by another thread in some cases + auto poco_file = Poco::File(path); + if (!poco_file.exists()) + return nullptr; + + bool single_file_mode = poco_file.isFile(); DMFilePtr dmfile(new DMFile( file_id, page_id, diff --git a/dbms/src/Storages/DeltaMerge/StoragePool.cpp b/dbms/src/Storages/DeltaMerge/StoragePool.cpp index c9ba3943a60..fbe6ab681d8 100644 --- a/dbms/src/Storages/DeltaMerge/StoragePool.cpp +++ b/dbms/src/Storages/DeltaMerge/StoragePool.cpp @@ -135,7 +135,7 @@ FileUsageStatistics GlobalStoragePool::getLogFileUsage() const bool GlobalStoragePool::gc() { - return gc(global_context.getSettingsRef(), true, DELTA_MERGE_GC_PERIOD); + return gc(global_context.getSettingsRef(), /*immediately=*/true, DELTA_MERGE_GC_PERIOD); } bool GlobalStoragePool::gc(const Settings & settings, bool immediately, const Seconds & try_gc_period) @@ -445,7 +445,7 @@ PageStorageRunMode StoragePool::restore() } else { - LOG_FMT_INFO(logger, "Current pool.meta translate already done before restored [ns_id={}] ", ns_id); + LOG_FMT_INFO(logger, "Current pool.meta transform already done before restored [ns_id={}] ", ns_id); } if (const auto & data_remain_pages = data_storage_v2->getNumberOfPages(); data_remain_pages != 0) @@ -461,7 +461,7 @@ PageStorageRunMode StoragePool::restore() } else { - LOG_FMT_INFO(logger, "Current pool.data translate already done before restored [ns_id={}]", ns_id); + LOG_FMT_INFO(logger, "Current pool.data transform already done before restored [ns_id={}]", ns_id); } // Check number of valid pages in v2 diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store.cpp index 471d59f760c..f3d1daa739a 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store.cpp @@ -17,19 +17,26 @@ #include #include #include +#include #include +#include #include #include +#include #include #include #include #include +#include +#include #include #include #include #include +#include "Storages/DeltaMerge/RowKeyRange.h" + namespace DB { namespace FailPoints @@ -84,8 +91,126 @@ try // check column structure of store const auto & cols = store->getTableColumns(); // version & tag column added - ASSERT_EQ(cols.size(), 3UL); + ASSERT_EQ(cols.size(), 3); + } +} +CATCH + +TEST_F(DeltaMergeStoreTest, DroppedInMiddleDTFileGC) +try +{ + // create table + ASSERT_NE(store, nullptr); + + auto global_page_storage = TiFlashTestEnv::getGlobalContext().getGlobalStoragePool(); + + // Start a PageStorage gc and suspend it before clean external page + auto sp_gc = SyncPointCtl::enableInScope("before_PageStorageImpl::cleanExternalPage_execute_callbacks"); + auto th_gc = std::async([&]() { + if (global_page_storage) + global_page_storage->gc(); + }); + sp_gc.waitAndPause(); + + { + // check column structure of store + const auto & cols = store->getTableColumns(); + // version & tag column added + ASSERT_EQ(cols.size(), 3); } + + // drop table in the middle of page storage gc + store->shutdown(); + store = nullptr; + + sp_gc.next(); // continue the page storage gc + th_gc.wait(); +} +CATCH + +TEST_F(DeltaMergeStoreTest, DroppedInMiddleDTFileRemoveCallback) +try +{ + // create table + ASSERT_NE(store, nullptr); + + auto global_page_storage = TiFlashTestEnv::getGlobalContext().getGlobalStoragePool(); + + // Start a PageStorage gc and suspend it before removing dtfiles + auto sp_gc = SyncPointCtl::enableInScope("before_DeltaMergeStore::callbacks_remover_remove"); + auto th_gc = std::async([&]() { + if (global_page_storage) + global_page_storage->gc(); + }); + sp_gc.waitAndPause(); + + { + // check column structure of store + const auto & cols = store->getTableColumns(); + // version & tag column added + ASSERT_EQ(cols.size(), 3); + } + + // drop table and files in the middle of page storage gc + store->drop(); + store = nullptr; + + sp_gc.next(); // continue removing dtfiles + th_gc.wait(); +} +CATCH + +TEST_F(DeltaMergeStoreTest, CreateInMiddleDTFileGC) +try +{ + // create table + ASSERT_NE(store, nullptr); + + auto global_page_storage = TiFlashTestEnv::getGlobalContext().getGlobalStoragePool(); + + // Start a PageStorage gc and suspend it before clean external page + auto sp_gc = SyncPointCtl::enableInScope("before_PageStorageImpl::cleanExternalPage_execute_callbacks"); + auto th_gc = std::async([&]() { + if (global_page_storage) + global_page_storage->gc(); + }); + sp_gc.waitAndPause(); + + DeltaMergeStorePtr new_store; + ColumnDefinesPtr new_cols; + { + new_cols = DMTestEnv::getDefaultColumns(); + ColumnDefine handle_column_define = (*new_cols)[0]; + new_store = std::make_shared(*db_context, + false, + "test", + "t_200", + 200, + *new_cols, + handle_column_define, + false, + 1, + DeltaMergeStore::Settings()); + auto block = DMTestEnv::prepareSimpleWriteBlock(0, 100, false); + new_store->write(*db_context, db_context->getSettingsRef(), block); + new_store->flushCache(*db_context, RowKeyRange::newAll(store->isCommonHandle(), store->getRowKeyColumnSize())); + } + + sp_gc.next(); // continue the page storage gc + th_gc.wait(); + + BlockInputStreamPtr in = new_store->read(*db_context, + db_context->getSettingsRef(), + *new_cols, + {RowKeyRange::newAll(store->isCommonHandle(), store->getRowKeyColumnSize())}, + /* num_streams= */ 1, + /* max_version= */ std::numeric_limits::max(), + EMPTY_FILTER, + "", + /* keep_order= */ false, + /* is_fast_mode= */ false, + /* expected_block_size= */ 1024)[0]; + ASSERT_INPUTSTREAM_NROWS(in, 100); } CATCH diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store_test_basic.h b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store_test_basic.h index 5f8c098a8b1..a34a5125e28 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store_test_basic.h +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store_test_basic.h @@ -67,7 +67,7 @@ class DeltaMergeStoreTest : public DB::base::TiFlashStorageTestBasic DeltaMergeStorePtr s = std::make_shared(*db_context, false, "test", - "DeltaMergeStoreTest", + "t_100", 100, *cols, handle_column_define, @@ -135,7 +135,7 @@ class DeltaMergeStoreRWTest DeltaMergeStorePtr s = std::make_shared(*db_context, false, "test", - "DeltaMergeStoreRWTest", + "t_101", 101, *cols, handle_column_define, @@ -180,4 +180,4 @@ class DeltaMergeStoreRWTest }; } // namespace tests } // namespace DM -} // namespace DB \ No newline at end of file +} // namespace DB diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_storage_delta_merge.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_storage_delta_merge.cpp index ac03b509f18..4406cf06289 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_storage_delta_merge.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_storage_delta_merge.cpp @@ -207,10 +207,12 @@ try // Rename database name before store object is created. const String new_db_name = "new_" + storage->getDatabaseName(); - storage->rename(path_name, new_db_name, table_name, table_name); + const String new_display_table_name = "new_" + storage->getTableName(); + storage->rename(path_name, new_db_name, table_name, new_display_table_name); ASSERT_FALSE(storage->storeInited()); ASSERT_EQ(storage->getTableName(), table_name); ASSERT_EQ(storage->getDatabaseName(), new_db_name); + ASSERT_EQ(storage->getTableInfo().name, new_display_table_name); // prepare block data Block sample; @@ -231,9 +233,8 @@ try } // TiFlash always use t_{table_id} as table name - String new_table_name = storage->getTableName(); - storage->rename(path_name, new_db_name, new_table_name, new_table_name); - ASSERT_EQ(storage->getTableName(), new_table_name); + storage->rename(path_name, new_db_name, table_name, table_name); + ASSERT_EQ(storage->getTableName(), table_name); ASSERT_EQ(storage->getDatabaseName(), new_db_name); storage->drop(); diff --git a/dbms/src/Storages/Page/PageStorage.h b/dbms/src/Storages/Page/PageStorage.h index b77656785a7..7468c4fde44 100644 --- a/dbms/src/Storages/Page/PageStorage.h +++ b/dbms/src/Storages/Page/PageStorage.h @@ -68,13 +68,13 @@ enum class PageStorageRunMode : UInt8 struct ExternalPageCallbacks { - // `scanner` for scanning avaliable external page ids on disks. + // `scanner` for scanning available external page ids on disks. // `remover` will be called with living normal page ids after gc run a round, user should remove those // external pages(files) in `pending_external_pages` but not in `valid_normal_pages` using PathAndIdsVec = std::vector>>; using ExternalPagesScanner = std::function; using ExternalPagesRemover - = std::function & valid_normal_pages)>; + = std::function & valid_normal_pages)>; ExternalPagesScanner scanner = nullptr; ExternalPagesRemover remover = nullptr; NamespaceId ns_id = MAX_NAMESPACE_ID; @@ -336,6 +336,7 @@ class PageStorage : private boost::noncopyable } // Register and unregister external pages GC callbacks + // Note that user must ensure that it is safe to call `scanner` and `remover` even after unregister. virtual void registerExternalPagesCallbacks(const ExternalPageCallbacks & callbacks) = 0; virtual void unregisterExternalPagesCallbacks(NamespaceId /*ns_id*/){}; diff --git a/dbms/src/Storages/Page/V3/PageDirectory/ExternalIdsByNamespace.h b/dbms/src/Storages/Page/V3/PageDirectory/ExternalIdsByNamespace.h index 26ff9c109f4..51c118e9172 100644 --- a/dbms/src/Storages/Page/V3/PageDirectory/ExternalIdsByNamespace.h +++ b/dbms/src/Storages/Page/V3/PageDirectory/ExternalIdsByNamespace.h @@ -17,6 +17,10 @@ #include #include +#include +#include +#include +#include #include namespace DB::PS::V3 diff --git a/dbms/src/Storages/Page/V3/PageStorageImpl.cpp b/dbms/src/Storages/Page/V3/PageStorageImpl.cpp index dba1fef7566..67611dc980b 100644 --- a/dbms/src/Storages/Page/V3/PageStorageImpl.cpp +++ b/dbms/src/Storages/Page/V3/PageStorageImpl.cpp @@ -14,6 +14,7 @@ #include #include +#include #include #include #include @@ -25,6 +26,8 @@ #include #include +#include + namespace DB { namespace ErrorCodes @@ -336,24 +339,55 @@ bool PageStorageImpl::gcImpl(bool /*not_skip*/, const WriteLimiterPtr & write_li // TODO: `clean_external_page` for all tables may slow down the whole gc process when there are lots of table. void PageStorageImpl::cleanExternalPage(Stopwatch & gc_watch, GCTimeStatistics & statistics) { - // TODO: `callbacks_mutex` is being held during the whole `cleanExternalPage`, meaning gc will block - // creating/dropping table, need to refine it later. - std::scoped_lock lock{callbacks_mutex}; - statistics.num_external_callbacks = callbacks_container.size(); - if (!callbacks_container.empty()) + // Fine grained lock on `callbacks_mutex`. + // So that adding/removing a storage will not be blocked for the whole + // processing time of `cleanExternalPage`. + std::shared_ptr ns_callbacks; + { + std::scoped_lock lock{callbacks_mutex}; + // check and get the begin iter + statistics.num_external_callbacks = callbacks_container.size(); + auto iter = callbacks_container.begin(); + if (iter == callbacks_container.end()) // empty + { + statistics.clean_external_page_ms = gc_watch.elapsedMillisecondsFromLastTime(); + return; + } + + assert(iter != callbacks_container.end()); // early exit in the previous code + // keep the shared_ptr so that erasing ns_id from PageStorage won't invalid the `ns_callbacks` + ns_callbacks = iter->second; + } + + Stopwatch external_watch; + + SYNC_FOR("before_PageStorageImpl::cleanExternalPage_execute_callbacks"); + + while (true) { - Stopwatch external_watch; - for (const auto & [ns_id, callbacks] : callbacks_container) + // 1. Note that we must call `scanner` before `getAliveExternalIds`. + // Or some committed external ids is not included in `alive_ids` + // but exist in `pending_external_pages`. They will be removed by + // accident with `remover` under this situation. + // 2. Assume calling the callbacks after erasing ns_is is safe. + + // the external pages on disks. + auto pending_external_pages = ns_callbacks->scanner(); + statistics.external_page_scan_ns += external_watch.elapsedFromLastTime(); + auto alive_external_ids = page_directory->getAliveExternalIds(ns_callbacks->ns_id); + statistics.external_page_get_alive_ns += external_watch.elapsedFromLastTime(); + // remove the external pages that is not alive now. + ns_callbacks->remover(pending_external_pages, alive_external_ids); + statistics.external_page_remove_ns += external_watch.elapsedFromLastTime(); + + // move to next namespace callbacks { - // Note that we must call `scanner` before `getAliveExternalIds` - // Or some committed external ids is not included and we may - // remove the external page by accident with `remover`. - const auto pending_external_pages = callbacks.scanner(); - statistics.external_page_scan_ns += external_watch.elapsedFromLastTime(); - const auto alive_external_ids = page_directory->getAliveExternalIds(ns_id); - statistics.external_page_get_alive_ns += external_watch.elapsedFromLastTime(); - callbacks.remover(pending_external_pages, alive_external_ids); - statistics.external_page_remove_ns += external_watch.elapsedFromLastTime(); + std::scoped_lock lock{callbacks_mutex}; + // next ns_id that is greater than `ns_id` + auto iter = callbacks_container.upper_bound(ns_callbacks->ns_id); + if (iter == callbacks_container.end()) + break; + ns_callbacks = iter->second; } } @@ -445,8 +479,13 @@ void PageStorageImpl::registerExternalPagesCallbacks(const ExternalPageCallbacks assert(callbacks.scanner != nullptr); assert(callbacks.remover != nullptr); assert(callbacks.ns_id != MAX_NAMESPACE_ID); - assert(callbacks_container.count(callbacks.ns_id) == 0); - callbacks_container.emplace(callbacks.ns_id, callbacks); + // NamespaceId(TableID) should not be reuse + RUNTIME_CHECK_MSG( + callbacks_container.count(callbacks.ns_id) == 0, + "Try to create callbacks for duplicated namespace id {}", + callbacks.ns_id); + // `emplace` won't invalid other iterator + callbacks_container.emplace(callbacks.ns_id, std::make_shared(callbacks)); } void PageStorageImpl::unregisterExternalPagesCallbacks(NamespaceId ns_id) diff --git a/dbms/src/Storages/Page/V3/PageStorageImpl.h b/dbms/src/Storages/Page/V3/PageStorageImpl.h index 9bce2e5dde8..321c9742f66 100644 --- a/dbms/src/Storages/Page/V3/PageStorageImpl.h +++ b/dbms/src/Storages/Page/V3/PageStorageImpl.h @@ -15,6 +15,8 @@ #pragma once #include +#include +#include #include #include #include @@ -172,7 +174,8 @@ class PageStorageImpl : public DB::PageStorage const static String manifests_file_name; std::mutex callbacks_mutex; - using ExternalPageCallbacksContainer = std::unordered_map; + // Only std::map not std::unordered_map. We need insert/erase do not invalid other iterators. + using ExternalPageCallbacksContainer = std::map>; ExternalPageCallbacksContainer callbacks_container; }; diff --git a/dbms/src/Storages/Page/V3/tests/gtest_page_storage.cpp b/dbms/src/Storages/Page/V3/tests/gtest_page_storage.cpp index 0ccdd1d9b2c..0c86fbb52ec 100644 --- a/dbms/src/Storages/Page/V3/tests/gtest_page_storage.cpp +++ b/dbms/src/Storages/Page/V3/tests/gtest_page_storage.cpp @@ -13,6 +13,7 @@ // limitations under the License. +#include #include #include #include @@ -35,6 +36,9 @@ #include #include +#include +#include + namespace DB { namespace FailPoints @@ -1238,6 +1242,104 @@ try } CATCH +TEST_F(PageStorageTest, ConcurrencyAddExtCallbacks) +try +{ + auto ptr = std::make_shared(100); // mock the `StorageDeltaMerge` + ExternalPageCallbacks callbacks; + callbacks.ns_id = TEST_NAMESPACE_ID; + callbacks.scanner = [ptr_weak_ref = std::weak_ptr(ptr)]() -> ExternalPageCallbacks::PathAndIdsVec { + auto ptr = ptr_weak_ref.lock(); + if (!ptr) + return {}; + + (*ptr) += 1; // mock access the storage inside callback + return {}; + }; + callbacks.remover = [ptr_weak_ref = std::weak_ptr(ptr)](const ExternalPageCallbacks::PathAndIdsVec &, const std::set &) -> void { + auto ptr = ptr_weak_ref.lock(); + if (!ptr) + return; + + (*ptr) += 1; // mock access the storage inside callback + }; + page_storage->registerExternalPagesCallbacks(callbacks); + + // Start a PageStorage gc and suspend it before clean external page + auto sp_gc = SyncPointCtl::enableInScope("before_PageStorageImpl::cleanExternalPage_execute_callbacks"); + auto th_gc = std::async([&]() { + page_storage->gcImpl(/*not_skip*/ true, nullptr, nullptr); + }); + sp_gc.waitAndPause(); + + // mock table created while gc is running + { + ExternalPageCallbacks new_callbacks; + new_callbacks.ns_id = TEST_NAMESPACE_ID + 1; + new_callbacks.scanner = [ptr_weak_ref = std::weak_ptr(ptr)]() -> ExternalPageCallbacks::PathAndIdsVec { + auto ptr = ptr_weak_ref.lock(); + if (!ptr) + return {}; + + (*ptr) += 1; // mock access the storage inside callback + return {}; + }; + new_callbacks.remover = [ptr_weak_ref = std::weak_ptr(ptr)](const ExternalPageCallbacks::PathAndIdsVec &, const std::set &) -> void { + auto ptr = ptr_weak_ref.lock(); + if (!ptr) + return; + + (*ptr) += 1; // mock access the storage inside callback + }; + page_storage->registerExternalPagesCallbacks(new_callbacks); + } + + sp_gc.next(); // continue the gc + th_gc.wait(); + + ASSERT_EQ(*ptr, 100 + 4); +} +CATCH + +TEST_F(PageStorageTest, ConcurrencyRemoveExtCallbacks) +try +{ + auto ptr = std::make_shared(100); // mock the `StorageDeltaMerge` + ExternalPageCallbacks callbacks; + callbacks.ns_id = TEST_NAMESPACE_ID; + callbacks.scanner = [ptr_weak_ref = std::weak_ptr(ptr)]() -> ExternalPageCallbacks::PathAndIdsVec { + auto ptr = ptr_weak_ref.lock(); + if (!ptr) + return {}; + + (*ptr) += 1; // mock access the storage inside callback + return {}; + }; + callbacks.remover = [ptr_weak_ref = std::weak_ptr(ptr)](const ExternalPageCallbacks::PathAndIdsVec &, const std::set &) -> void { + auto ptr = ptr_weak_ref.lock(); + if (!ptr) + return; + + (*ptr) += 1; // mock access the storage inside callback + }; + page_storage->registerExternalPagesCallbacks(callbacks); + + // Start a PageStorage gc and suspend it before clean external page + auto sp_gc = SyncPointCtl::enableInScope("before_PageStorageImpl::cleanExternalPage_execute_callbacks"); + auto th_gc = std::async([&]() { + page_storage->gcImpl(/*not_skip*/ true, nullptr, nullptr); + }); + sp_gc.waitAndPause(); + + // mock table dropped while gc is running + page_storage->unregisterExternalPagesCallbacks(TEST_NAMESPACE_ID); + ptr = nullptr; + + sp_gc.next(); // continue the gc + th_gc.wait(); +} +CATCH + TEST_F(PageStorageTest, GcReuseSpaceThenRestore) try { diff --git a/dbms/src/Storages/PathPool.cpp b/dbms/src/Storages/PathPool.cpp index 2e7edd7435b..465874564f1 100644 --- a/dbms/src/Storages/PathPool.cpp +++ b/dbms/src/Storages/PathPool.cpp @@ -228,53 +228,15 @@ void StoragePathPool::clearPSV2ObsoleteData() drop_instance_data("data"); } -void StoragePathPool::rename(const String & new_database, const String & new_table, bool clean_rename) +void StoragePathPool::rename(const String & new_database, const String & new_table) { - if (unlikely(new_database.empty() || new_table.empty())) - throw Exception(fmt::format("Can not rename for PathPool to {}.{}", new_database, new_table)); + RUNTIME_CHECK(!new_database.empty() && !new_table.empty(), new_database, new_table); + RUNTIME_CHECK(!path_need_database_name); - if (likely(clean_rename)) - { - // caller ensure that no path need to be renamed. - if (unlikely(path_need_database_name)) - throw Exception("Can not do clean rename with path_need_database_name is true!"); - - std::lock_guard lock{mutex}; - database = new_database; - table = new_table; - } - else - { - if (unlikely(file_provider->isEncryptionEnabled())) - throw Exception("Encryption is only supported when using clean_rename"); - - // Note: changing these path is not atomic, we may lost data if process is crash here. - std::lock_guard lock{mutex}; - // Get root path without database and table - for (auto & info : main_path_infos) - { - Poco::Path p(info.path); - p = p.parent().parent(); - if (path_need_database_name) - p = p.parent(); - auto new_path = getStorePath(p.toString() + "/data", new_database, new_table); - renamePath(info.path, new_path); - info.path = new_path; - } - for (auto & info : latest_path_infos) - { - Poco::Path p(info.path); - p = p.parent().parent(); - if (path_need_database_name) - p = p.parent(); - auto new_path = getStorePath(p.toString() + "/data", new_database, new_table); - renamePath(info.path, new_path); - info.path = new_path; - } - - database = new_database; - table = new_table; - } + // The directories for storing table data is not changed, just rename related names. + std::lock_guard lock{mutex}; + database = new_database; + table = new_table; } void StoragePathPool::drop(bool recursive, bool must_success) @@ -297,6 +259,8 @@ void StoragePathPool::drop(bool recursive, bool must_success) total_bytes += file_size; } global_capacity->freeUsedSize(path_info.path, total_bytes); + // clear in case delegator->removeDTFile is called after `drop` + dt_file_path_map.clear(); } } catch (Poco::DirectoryNotEmptyException & e) diff --git a/dbms/src/Storages/PathPool.h b/dbms/src/Storages/PathPool.h index 4f16511feff..59d8f463e4b 100644 --- a/dbms/src/Storages/PathPool.h +++ b/dbms/src/Storages/PathPool.h @@ -412,7 +412,7 @@ class StoragePathPool void clearPSV2ObsoleteData(); - void rename(const String & new_database, const String & new_table, bool clean_rename); + void rename(const String & new_database, const String & new_table); void drop(bool recursive, bool must_success = true); diff --git a/dbms/src/Storages/StorageDeltaMerge.cpp b/dbms/src/Storages/StorageDeltaMerge.cpp index 6eb6a16736b..8c9f115a472 100644 --- a/dbms/src/Storages/StorageDeltaMerge.cpp +++ b/dbms/src/Storages/StorageDeltaMerge.cpp @@ -1195,62 +1195,31 @@ void StorageDeltaMerge::rename( const String & new_display_table_name) { tidb_table_info.name = new_display_table_name; // update name in table info - // For DatabaseTiFlash, simply update store's database is OK. - // `store->getTableName() == new_table_name` only keep for mock test. - bool clean_rename = !data_path_contains_database_name && getTableName() == new_table_name; - if (likely(clean_rename)) { - if (storeInited()) - { - _store->rename(new_path_to_db, clean_rename, new_database_name, new_table_name); - return; - } - std::lock_guard lock(store_mutex); - if (storeInited()) - { - _store->rename(new_path_to_db, clean_rename, new_database_name, new_table_name); - } - else - { - table_column_info->db_name = new_database_name; - table_column_info->table_name = new_table_name; - } + // For DatabaseTiFlash, simply update store's database is OK. + // `store->getTableName() == new_table_name` only keep for mock test. + bool clean_rename = !data_path_contains_database_name && getTableName() == new_table_name; + RUNTIME_ASSERT(clean_rename, + log, + "should never rename the directories when renaming table, new_database_name={}, new_table_name={}", + new_database_name, + new_table_name); + } + if (storeInited()) + { + _store->rename(new_path_to_db, new_database_name, new_table_name); return; } - - /// Note that this routine is only left for CI tests. `clean_rename` should always be true in production env. - auto & store = getAndMaybeInitStore(); - - // For DatabaseOrdinary, we need to rename data path, then recreate a new store. - const String new_path = new_path_to_db + "/" + new_table_name; - - if (Poco::File{new_path}.exists()) - throw Exception( - fmt::format("Target path already exists: {}", new_path), - ErrorCodes::DIRECTORY_ALREADY_EXISTS); - - // flush store and then reset store to new path - store->flushCache(global_context, RowKeyRange::newAll(is_common_handle, rowkey_column_size)); - ColumnDefines table_column_defines = store->getTableColumns(); - ColumnDefine handle_column_define = store->getHandle(); - DeltaMergeStore::Settings settings = store->getSettings(); - - // remove background tasks - store->shutdown(); - // rename directories for multi disks - store->rename(new_path, clean_rename, new_database_name, new_table_name); - // generate a new store - store = std::make_shared( - global_context, - data_path_contains_database_name, - new_database_name, - new_table_name, - tidb_table_info.id, - std::move(table_column_defines), - std::move(handle_column_define), - is_common_handle, - rowkey_column_size, - settings); + std::lock_guard lock(store_mutex); + if (storeInited()) + { + _store->rename(new_path_to_db, new_database_name, new_table_name); + } + else + { + table_column_info->db_name = new_database_name; + table_column_info->table_name = new_table_name; + } } String StorageDeltaMerge::getTableName() const diff --git a/tests/delta-merge-test/ddl/alter.test b/tests/delta-merge-test/ddl/alter.test index 4bf405ac9e1..c058e5cdc81 100644 --- a/tests/delta-merge-test/ddl/alter.test +++ b/tests/delta-merge-test/ddl/alter.test @@ -72,21 +72,5 @@ └───┴──────┴───────┴──────┘ -## rename table ->> drop table if exists dm_test_renamed ->> rename table dm_test to dm_test_renamed ->> select * from dm_test -Received exception from server (version {#WORD}): -Code: 60. DB::Exception: Received from {#WORD} DB::Exception: Table default.dm_test doesn't exist.. - ->> select * from dm_test_renamed -┌─a─┬────b─┬─────c─┬────d─┐ -│ 1 │ 0 │ 0 │ \N │ -│ 2 │ 1024 │ 65535 │ 4096 │ -│ 3 │ 2048 │ 65536 │ \N │ -└───┴──────┴───────┴──────┘ - - ## Clean up >> drop table if exists dm_test ->> drop table if exists dm_test_renamed diff --git a/tests/delta-merge-test/raft/schema/partition_table_restart.test b/tests/delta-merge-test/raft/schema/partition_table_restart.test index 893bb617af4..c7a5e488111 100644 --- a/tests/delta-merge-test/raft/schema/partition_table_restart.test +++ b/tests/delta-merge-test/raft/schema/partition_table_restart.test @@ -15,16 +15,11 @@ => DBGInvoke __enable_schema_sync_service('false') => DBGInvoke __drop_tidb_table(default, test) -=> DBGInvoke __drop_tidb_table(default, test1) => DBGInvoke __refresh_schemas() => drop table if exists default.test => drop table if exists default.test_9999 => drop table if exists default.test_9998 => drop table if exists default.test_9997 -=> drop table if exists default.test1 -=> drop table if exists default.test1_9999 -=> drop table if exists default.test1_9998 -=> drop table if exists default.test1_9997 => DBGInvoke __mock_tidb_table(default, test, 'col_1 String, col_2 Int64', '', 'dt') => DBGInvoke __mock_tidb_partition(default, test, 9999) @@ -38,38 +33,23 @@ => DBGInvoke __reset_schemas() => DBGInvoke __add_column_to_tidb_table(default, test, 'col_3 Nullable(Int8)') -=> DBGInvoke __rename_tidb_table(default, test, test1) => DBGInvoke __refresh_schemas() - -=> show tables -┌─name───────┐ -│ test1 │ -│ test1_9997 │ -│ test1_9998 │ -│ test1_9999 │ -└────────────┘ -=> select col_2 from default.test1_9997 -=> select * from default.test_9997 -Received exception from server (version {#WORD}): -Code: 60. DB::Exception: Received from {#WORD} DB::Exception: Table default.test_9997 doesn't exist.. -=> select * from default.test_9998 -Received exception from server (version {#WORD}): -Code: 60. DB::Exception: Received from {#WORD} DB::Exception: Table default.test_9998 doesn't exist.. +=> select col_2 from default.test_9997 => DBGInvoke __reset_schemas() -=> DBGInvoke __drop_tidb_partition(default, test1, 9997) +=> DBGInvoke __drop_tidb_partition(default, test, 9997) => DBGInvoke __refresh_schemas() -=> DBGInvoke is_tombstone(default, test1_9997) -┌─is_tombstone(default, test1_9997)─┐ +=> DBGInvoke is_tombstone(default, test_9997) +┌─is_tombstone(default, test_9997)─┐ │ true │ └───────────────────────────────────┘ -=> select * from default.test1_9999 +=> select * from default.test_9999 -=> DBGInvoke __drop_tidb_table(default, test1) +=> DBGInvoke __drop_tidb_table(default, test) => DBGInvoke __refresh_schemas() -=> drop table if exists default.test1 -=> drop table if exists default.test1_9999 -=> drop table if exists default.test1_9998 -=> drop table if exists default.test1_9997 +=> drop table if exists default.test +=> drop table if exists default.test_9999 +=> drop table if exists default.test_9998 +=> drop table if exists default.test_9997 => DBGInvoke __enable_schema_sync_service('true') diff --git a/tests/delta-merge-test/raft/schema/rename_on_read.test b/tests/delta-merge-test/raft/schema/rename_on_read.test deleted file mode 100644 index 40eb66277a9..00000000000 --- a/tests/delta-merge-test/raft/schema/rename_on_read.test +++ /dev/null @@ -1,38 +0,0 @@ -# Copyright 2022 PingCAP, Ltd. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -=> DBGInvoke __enable_schema_sync_service('false') - -=> DBGInvoke __drop_tidb_table(default, test) -=> drop table if exists default.test - -=> DBGInvoke __drop_tidb_table(default, test1) -=> drop table if exists default.test1 - -=> DBGInvoke __set_flush_threshold(1000000, 1000000) - -=> DBGInvoke __mock_tidb_table(default, test, 'col_1 String', '', 'dt') -=> DBGInvoke __refresh_schemas() -=> select * from default.test -=> DBGInvoke __rename_tidb_table(default, test, test1) -=> select * from default.test -=> select * from default.test " --schema_version "1000000 -Received exception from server (version {#WORD}): -Code: 60. DB::Exception: Received from {#WORD} DB::Exception: Table default.test doesn't exist.. -=> select * from default.test1 -=> select * from default.test1 " --schema_version "1000000 - -=> DBGInvoke __drop_tidb_table(default, test1) -=> drop table if exists default.test1 -=> DBGInvoke __enable_schema_sync_service('true') diff --git a/tests/delta-merge-test/raft/schema/rename_on_write.test b/tests/delta-merge-test/raft/schema/rename_on_write.test deleted file mode 100644 index 6ad2c809ce6..00000000000 --- a/tests/delta-merge-test/raft/schema/rename_on_write.test +++ /dev/null @@ -1,46 +0,0 @@ -# Copyright 2022 PingCAP, Ltd. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -#TODO: We can not mock this situation, ignore for now -#RETURN - -=> DBGInvoke __enable_schema_sync_service('false') - -=> DBGInvoke __drop_tidb_table(default, test) -=> drop table if exists default.test - -=> DBGInvoke __drop_tidb_table(default, test1) -=> drop table if exists default.test1 - -=> DBGInvoke __set_flush_threshold(1000000, 1000000) - -=> DBGInvoke __mock_tidb_table(default, test, 'col_1 String', '', 'dt') -=> DBGInvoke __refresh_schemas() -=> DBGInvoke __put_region(4, 0, 100, default, test) -=> select col_1 from default.test -=> DBGInvoke __add_column_to_tidb_table(default, test, 'col_2 Nullable(Int8)') -=> DBGInvoke __rename_tidb_table(default, test, test1) -#For DeltaTree, each write will trigger schema sync. -=> DBGInvoke __raft_insert_row(default, test1, 4, 50, 'test1', 1) -=> select * from default.test -Received exception from server (version {#WORD}): -Code: 60. DB::Exception: Received from {#WORD} DB::Exception: Table default.test doesn't exist.. -=> select * from default.test1 -┌─col_1─┬─_tidb_rowid─┬─col_2─┐ -│ test1 │ 50 │ 1 │ -└───────┴─────────────┴───────┘ - -=> DBGInvoke __drop_tidb_table(default, test1) -=> drop table if exists default.test1 -=> DBGInvoke __enable_schema_sync_service('true') diff --git a/tests/delta-merge-test/raft/schema/rename_tables.test b/tests/delta-merge-test/raft/schema/rename_tables.test deleted file mode 100644 index 7c65d46d3e3..00000000000 --- a/tests/delta-merge-test/raft/schema/rename_tables.test +++ /dev/null @@ -1,45 +0,0 @@ -# Copyright 2022 PingCAP, Ltd. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -# Preparation. -=> DBGInvoke __enable_schema_sync_service('false') - -=> DBGInvoke __drop_tidb_table(default, t1) -=> DBGInvoke __drop_tidb_table(default, t2) -=> drop table if exists default.t1 -=> drop table if exists default.t2 -=> DBGInvoke __refresh_schemas() - -=> DBGInvoke __set_flush_threshold(1000000, 1000000) - - -=> DBGInvoke __create_tidb_tables(default, t1, t2) -# rename table -=> DBGInvoke __rename_tidb_tables(default, t1, r1, default, t2, r2) -=> DBGInvoke __refresh_schemas() -=> select database,name,engine from system.tables where database='default' and name='r1' -┌─database─┬─name─┬─engine─────┐ -│ default │ r1 │ DeltaMerge │ -└──────────┴──────┴────────────┘ -=> select database,name,engine from system.tables where database='default' and name='r2' -┌─database─┬─name─┬─engine─────┐ -│ default │ r2 │ DeltaMerge │ -└──────────┴──────┴────────────┘ - -# clean -=> DBGInvoke __drop_tidb_table(default, r1) -=> DBGInvoke __drop_tidb_table(default, r2) -=> drop table if exists default.r1 -=> drop table if exists default.r2 -=> DBGInvoke __enable_schema_sync_service('true') \ No newline at end of file diff --git a/tests/delta-merge-test/raft/txn_mock/partition_table.test b/tests/delta-merge-test/raft/txn_mock/partition_table.test index 2f8e67a61a8..84b8044260f 100644 --- a/tests/delta-merge-test/raft/txn_mock/partition_table.test +++ b/tests/delta-merge-test/raft/txn_mock/partition_table.test @@ -17,12 +17,9 @@ => DBGInvoke __drop_tidb_table(default, test) => drop table if exists default.test -=> drop table if exists default.test1 => drop table if exists default.test_9997 => drop table if exists default.test_9998 => drop table if exists default.test_9999 -=> drop table if exists default.test1_9997 -=> drop table if exists default.test1_9999 => DBGInvoke __mock_tidb_table(default, test, 'col_1 String, col_2 Int64') => DBGInvoke __mock_tidb_partition(default, test, 9999) @@ -94,27 +91,17 @@ │ 0 │ └──────────────┘ -=> DBGInvoke __rename_tidb_table(default, test, test1) -=> DBGInvoke __refresh_schemas() -=> select count(*) from default.test1_9997 -┌─count()─┐ -│ 2 │ -└─────────┘ - -=> DBGInvoke __drop_tidb_table(default, test1) +=> DBGInvoke __drop_tidb_table(default, test) => DBGInvoke __refresh_schemas() -=> DBGInvoke is_tombstone(default, test1_9999) +=> DBGInvoke is_tombstone(default, test_9999) ┌─is_tombstone(default, test_9999)─┐ │ true │ └──────────────────────────────────┘ => drop table if exists default.test -=> drop table if exists default.test1 => drop table if exists default.test_9997 => drop table if exists default.test_9998 => drop table if exists default.test_9999 -=> drop table if exists default.test1_9997 -=> drop table if exists default.test1_9999 => DBGInvoke __enable_schema_sync_service('true') => DBGInvoke __clean_up_region()