diff --git a/dbms/src/Interpreters/Context.cpp b/dbms/src/Interpreters/Context.cpp index c88e739921f..3d47c65b0fc 100644 --- a/dbms/src/Interpreters/Context.cpp +++ b/dbms/src/Interpreters/Context.cpp @@ -240,6 +240,13 @@ struct ContextShared return; shutdown_called = true; + if (global_storage_pool) + { + // shutdown the gc task of global storage pool before + // shutting down the tables. + global_storage_pool->shutdown(); + } + /** At this point, some tables may have threads that block our mutex. * To complete them correctly, we will copy the current list of tables, * and ask them all to finish their work. diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp index 8640cd37af8..6cc54790cb5 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp +++ b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp @@ -403,10 +403,11 @@ void DeltaMergeStore::shutdown() return; LOG_TRACE(log, "Shutdown DeltaMerge start"); - // shutdown before unregister to avoid conflict between this thread and background gc thread on the `ExternalPagesCallbacks` - // because PageStorage V2 doesn't have any lock protection on the `ExternalPagesCallbacks`.(The order doesn't matter for V3) + // Must shutdown storage path pool to make sure the DMFile remove callbacks + // won't remove dmfiles unexpectly. + path_pool->shutdown(); + // shutdown storage pool and clean up the local DMFile remove callbacks storage_pool->shutdown(); - storage_pool->dataUnregisterExternalPagesCallbacks(storage_pool->getNamespaceId()); background_pool.removeTask(background_task_handle); blockable_background_pool.removeTask(blockable_background_pool_handle); diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h index a0fbcd6c951..b0a4feb86c4 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h +++ b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h @@ -52,6 +52,11 @@ using SegmentIdSet = std::unordered_set; struct ExternalDTFileInfo; struct GCOptions; +namespace tests +{ +class DeltaMergeStoreTest; +} + inline static const PageId DELTA_MERGE_FIRST_SEGMENT_ID = 1; struct SegmentStats @@ -155,6 +160,7 @@ struct StoreStats class DeltaMergeStore : private boost::noncopyable { public: + friend class ::DB::DM::tests::DeltaMergeStoreTest; struct Settings { NotCompress not_compress_columns{}; diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeStore_InternalBg.cpp b/dbms/src/Storages/DeltaMerge/DeltaMergeStore_InternalBg.cpp index b875b57e08c..c70acce453c 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaMergeStore_InternalBg.cpp +++ b/dbms/src/Storages/DeltaMerge/DeltaMergeStore_InternalBg.cpp @@ -14,12 +14,15 @@ #include #include +#include #include #include #include +#include #include #include +#include namespace CurrentMetrics { @@ -36,20 +39,33 @@ extern const char pause_until_dt_background_delta_merge[]; namespace DM { -void DeltaMergeStore::setUpBackgroundTask(const DMContextPtr & dm_context) + +// A callback class for scanning the DMFiles on local filesystem +class LocalDMFileGcScanner final { - // Callbacks for cleaning outdated DTFiles. Note that there is a chance - // that callbacks is called after the `DeltaMergeStore` dropped, we must - // make the callbacks safe. - ExternalPageCallbacks callbacks; - callbacks.ns_id = storage_pool->getNamespaceId(); - callbacks.scanner = [path_pool_weak_ref = std::weak_ptr(path_pool), file_provider = global_context.getFileProvider()]() { +private: + // !!! Warning !!! + // Should only keep a weak ref of storage path pool since + // this callback instance may still valid inside the PageStorage + // even after the DeltaMerge storage is shutdown or released. + std::weak_ptr path_pool_weak_ref; + FileProviderPtr file_provider; + +public: + LocalDMFileGcScanner(std::weak_ptr path_pool_, FileProviderPtr provider) + : path_pool_weak_ref(std::move(path_pool_)) + , file_provider(std::move(provider)) + {} + + ExternalPageCallbacks::PathAndIdsVec operator()() + { ExternalPageCallbacks::PathAndIdsVec path_and_ids_vec; - // If the StoragePathPool is invalid, meaning we call `scanner` after dropping the table, + // If the StoragePathPool is invalid or shutdown flag is set, + // meaning we call `scanner` after shutdowning or dropping the table, // simply return an empty list is OK. auto path_pool = path_pool_weak_ref.lock(); - if (!path_pool) + if (!path_pool || path_pool->isShutdown()) return path_and_ids_vec; // Return the DTFiles on disks. @@ -66,14 +82,35 @@ void DeltaMergeStore::setUpBackgroundTask(const DMContextPtr & dm_context) path_and_ids_vec.emplace_back(root_path, std::move(file_ids_in_current_path)); } return path_and_ids_vec; - }; - callbacks.remover = [path_pool_weak_ref = std::weak_ptr(path_pool), // - file_provider = global_context.getFileProvider(), - logger = log](const ExternalPageCallbacks::PathAndIdsVec & path_and_ids_vec, const std::set & valid_ids) { - // If the StoragePathPool is invalid, meaning we call `remover` after dropping the table, - // simply skip is OK. + } +}; + +// A callback class for removing the DMFiles on local filesystem +class LocalDMFileGcRemover final +{ +private: + // !!! Warning !!! + // Should only keep a weak ref of storage path pool since + // this callback instance may still valid inside the PageStorage + // even after the DeltaMerge storage is shutdown or released. + std::weak_ptr path_pool_weak_ref; + FileProviderPtr file_provider; + LoggerPtr logger; + +public: + LocalDMFileGcRemover(std::weak_ptr path_pool_, FileProviderPtr provider, LoggerPtr log) + : path_pool_weak_ref(std::move(path_pool_)) + , file_provider(std::move(provider)) + , logger(std::move(log)) + {} + + void operator()(const ExternalPageCallbacks::PathAndIdsVec & path_and_ids_vec, const std::set & valid_ids) + { + // If the StoragePathPool is invalid or shutdown flag is set, + // meaning we call `remover` after shutdowning or dropping the table, + // we must skip because the `valid_ids` is not reliable! auto path_pool = path_pool_weak_ref.lock(); - if (!path_pool) + if (!path_pool || path_pool->isShutdown()) return; SYNC_FOR("before_DeltaMergeStore::callbacks_remover_remove"); @@ -102,6 +139,7 @@ void DeltaMergeStore::setUpBackgroundTask(const DMContextPtr & dm_context) LOG_INFO(logger, "GC try remove useless DM file, but file not found and may have been removed, dmfile={}", DMFile::getPathByStatus(path, id, DMFile::Status::READABLE)); + continue; // next file } else if (dmfile->canGC()) { @@ -126,19 +164,30 @@ void DeltaMergeStore::setUpBackgroundTask(const DMContextPtr & dm_context) LOG_INFO(logger, "GC removed useless DM file, dmfile={}", dmfile->path()); else LOG_INFO(logger, "GC try remove useless DM file, but error happen, dmfile={} err_msg={}", dmfile->path(), err_msg); + continue; // next file } } } - }; + } +}; + +void DeltaMergeStore::setUpBackgroundTask(const DMContextPtr & dm_context) +{ + // Callbacks for cleaning outdated DTFiles. Note that there is a chance + // that callbacks is called after the `DeltaMergeStore` shutdown or dropped, + // we must make the callbacks safe. + ExternalPageCallbacks callbacks; + callbacks.ns_id = storage_pool->getNamespaceId(); + callbacks.scanner = LocalDMFileGcScanner(std::weak_ptr(path_pool), global_context.getFileProvider()); + callbacks.remover = LocalDMFileGcRemover(std::weak_ptr(path_pool), global_context.getFileProvider(), log); // remember to unregister it when shutdown - storage_pool->dataRegisterExternalPagesCallbacks(callbacks); - storage_pool->enableGC(); + storage_pool->startup(std::move(callbacks)); background_task_handle = background_pool.addTask([this] { return handleBackgroundTask(false); }); blockable_background_pool_handle = blockable_background_pool.addTask([this] { return handleBackgroundTask(true); }); - // Do place delta index. + // Generate place delta index tasks for (auto & [end, segment] : segments) { (void)end; diff --git a/dbms/src/Storages/DeltaMerge/StoragePool.cpp b/dbms/src/Storages/DeltaMerge/StoragePool.cpp index 22ca45179a8..123f143ec0a 100644 --- a/dbms/src/Storages/DeltaMerge/StoragePool.cpp +++ b/dbms/src/Storages/DeltaMerge/StoragePool.cpp @@ -112,11 +112,7 @@ GlobalStoragePool::GlobalStoragePool(const PathPool & path_pool, Context & globa GlobalStoragePool::~GlobalStoragePool() { - if (gc_handle) - { - global_context.getBackgroundPool().removeTask(gc_handle); - gc_handle = nullptr; - } + shutdown(); } void GlobalStoragePool::restore() @@ -132,6 +128,15 @@ void GlobalStoragePool::restore() false); } +void GlobalStoragePool::shutdown() +{ + if (gc_handle) + { + global_context.getBackgroundPool().removeTask(gc_handle); + gc_handle = {}; + } +} + FileUsageStatistics GlobalStoragePool::getLogFileUsage() const { return log_storage->getFileUsageStatistics(); @@ -535,30 +540,31 @@ StoragePool::~StoragePool() shutdown(); } -void StoragePool::enableGC() -{ - // The data in V3 will be GCed by `GlobalStoragePool::gc`, only register gc task under only v2/mix mode - if (run_mode == PageStorageRunMode::ONLY_V2 || run_mode == PageStorageRunMode::MIX_MODE) - { - gc_handle = global_context.getBackgroundPool().addTask([this] { return this->gc(global_context.getSettingsRef()); }); - } -} - -void StoragePool::dataRegisterExternalPagesCallbacks(const ExternalPageCallbacks & callbacks) +void StoragePool::startup(ExternalPageCallbacks && callbacks) { switch (run_mode) { case PageStorageRunMode::ONLY_V2: { + // For V2, we need a per physical table gc handle to perform the gc of its PageStorage instances. data_storage_v2->registerExternalPagesCallbacks(callbacks); + gc_handle = global_context.getBackgroundPool().addTask([this] { return this->gc(global_context.getSettingsRef()); }); break; } case PageStorageRunMode::ONLY_V3: + { + // For V3, the GC is handled by `GlobalStoragePool::gc`, just register callbacks is OK. + data_storage_v3->registerExternalPagesCallbacks(callbacks); + break; + } case PageStorageRunMode::MIX_MODE: { - // We have transformed all pages from V2 to V3 in `restore`, so - // only need to register callbacks for V3. + // For V3, the GC is handled by `GlobalStoragePool::gc`. + // Since we have transformed all external pages from V2 to V3 in `StoragePool::restore`, + // just register callbacks to V3 is OK data_storage_v3->registerExternalPagesCallbacks(callbacks); + // we still need a gc_handle to reclaim the V2 disk space. + gc_handle = global_context.getBackgroundPool().addTask([this] { return this->gc(global_context.getSettingsRef()); }); break; } default: @@ -566,19 +572,36 @@ void StoragePool::dataRegisterExternalPagesCallbacks(const ExternalPageCallbacks } } -void StoragePool::dataUnregisterExternalPagesCallbacks(NamespaceId ns_id) +void StoragePool::shutdown() { + // Note: Should reset the gc_handle before unregistering the pages callbacks + if (gc_handle) + { + global_context.getBackgroundPool().removeTask(gc_handle); + gc_handle = nullptr; + } + switch (run_mode) { case PageStorageRunMode::ONLY_V2: { + meta_storage_v2->shutdown(); + log_storage_v2->shutdown(); + data_storage_v2->shutdown(); data_storage_v2->unregisterExternalPagesCallbacks(ns_id); break; } case PageStorageRunMode::ONLY_V3: + { + data_storage_v3->unregisterExternalPagesCallbacks(ns_id); + break; + } case PageStorageRunMode::MIX_MODE: { - // We have transformed all pages from V2 to V3 in `restore`, so + meta_storage_v2->shutdown(); + log_storage_v2->shutdown(); + data_storage_v2->shutdown(); + // We have transformed all external pages from V2 to V3 in `restore`, so // only need to unregister callbacks for V3. data_storage_v3->unregisterExternalPagesCallbacks(ns_id); break; @@ -588,7 +611,6 @@ void StoragePool::dataUnregisterExternalPagesCallbacks(NamespaceId ns_id) } } - bool StoragePool::doV2Gc(const Settings & settings) { bool done_anything = false; @@ -629,21 +651,6 @@ bool StoragePool::gc(const Settings & settings, const Seconds & try_gc_period) return doV2Gc(settings); } -void StoragePool::shutdown() -{ - if (gc_handle) - { - global_context.getBackgroundPool().removeTask(gc_handle); - gc_handle = nullptr; - } - if (run_mode != PageStorageRunMode::ONLY_V3) - { - meta_storage_v2->shutdown(); - log_storage_v2->shutdown(); - data_storage_v2->shutdown(); - } -} - void StoragePool::drop() { shutdown(); diff --git a/dbms/src/Storages/DeltaMerge/StoragePool.h b/dbms/src/Storages/DeltaMerge/StoragePool.h index 4dc8d71b2b1..c49380b62e7 100644 --- a/dbms/src/Storages/DeltaMerge/StoragePool.h +++ b/dbms/src/Storages/DeltaMerge/StoragePool.h @@ -14,7 +14,6 @@ #pragma once -#include #include #include #include @@ -36,7 +35,7 @@ namespace DM class StoragePool; using StoragePoolPtr = std::shared_ptr; -static const std::chrono::seconds DELTA_MERGE_GC_PERIOD(60); +static constexpr std::chrono::seconds DELTA_MERGE_GC_PERIOD(60); class GlobalStoragePool : private boost::noncopyable { @@ -51,6 +50,8 @@ class GlobalStoragePool : private boost::noncopyable void restore(); + void shutdown(); + friend class StoragePool; friend class ::DB::AsynchronousMetrics; @@ -90,7 +91,7 @@ class StoragePool : private boost::noncopyable NamespaceId getNamespaceId() const { return ns_id; } - PageStorageRunMode getPageStorageRunMode() + PageStorageRunMode getPageStorageRunMode() const { return run_mode; } @@ -141,16 +142,15 @@ class StoragePool : private boost::noncopyable PageReader newMetaReader(ReadLimiterPtr read_limiter, bool snapshot_read, const String & tracing_id); PageReader newMetaReader(ReadLimiterPtr read_limiter, PageStorage::SnapshotPtr & snapshot); - void enableGC(); - - void dataRegisterExternalPagesCallbacks(const ExternalPageCallbacks & callbacks); + // Register the clean up DMFiles callbacks to PageStorage. + // The callbacks will be unregister when `shutdown` is called. + void startup(ExternalPageCallbacks && callbacks); - void dataUnregisterExternalPagesCallbacks(NamespaceId ns_id); + // Shutdown the gc handle and DMFile callbacks + void shutdown(); bool gc(const Settings & settings, const Seconds & try_gc_period = DELTA_MERGE_GC_PERIOD); - void shutdown(); - // Caller must cancel gc tasks before drop void drop(); diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store.cpp index 7b9899e25d8..b1a786baab4 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store.cpp @@ -97,6 +97,59 @@ try } CATCH +TEST_F(DeltaMergeStoreTest, ShutdownInMiddleDTFileGC) +try +{ + // create table + ASSERT_NE(store, nullptr); + + auto global_page_storage = TiFlashTestEnv::getGlobalContext().getGlobalStoragePool(); + + // Start a PageStorage gc and suspend it before clean external page + auto sp_gc = SyncPointCtl::enableInScope("before_PageStorageImpl::cleanExternalPage_execute_callbacks"); + auto th_gc = std::async([&]() { + if (global_page_storage) + global_page_storage->gc(); + }); + sp_gc.waitAndPause(); + + { + // check column structure of store + const auto & cols = store->getTableColumns(); + // version & tag column added + ASSERT_EQ(cols.size(), 3); + } + + auto provider = db_context->getFileProvider(); + const auto paths = getAllStorePaths(); + auto get_num_stable_files = [&]() -> size_t { + size_t total_num_dtfiles = 0; + auto options = DMFile::ListOptions{.only_list_can_gc = true, .clean_up = false}; + for (const auto & root_path : paths) + { + auto file_ids = DMFile::listAllInPath(provider, root_path, options); + total_num_dtfiles += file_ids.size(); + } + return total_num_dtfiles; + }; + + const size_t num_before_shutdown = get_num_stable_files(); + ASSERT_GT(num_before_shutdown, 0); + + // shutdown the table in the middle of page storage gc, but not dropped + store->shutdown(); + + const size_t num_after_shutdown = get_num_stable_files(); + ASSERT_EQ(num_before_shutdown, num_after_shutdown); + + sp_gc.next(); // continue the page storage gc + th_gc.get(); + + const size_t num_after_gc = get_num_stable_files(); + ASSERT_EQ(num_before_shutdown, num_after_gc); +} +CATCH + TEST_F(DeltaMergeStoreTest, DroppedInMiddleDTFileGC) try { @@ -126,6 +179,8 @@ try sp_gc.next(); // continue the page storage gc th_gc.get(); + + // Mark: ensure this test won't run into crash by nullptr } CATCH diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store_test_basic.h b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store_test_basic.h index d3b15f4c930..68c95d07fa6 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store_test_basic.h +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store_test_basic.h @@ -77,6 +77,12 @@ class DeltaMergeStoreTest : public DB::base::TiFlashStorageTestBasic return s; } + Strings getAllStorePaths() const + { + auto path_delegate = store->path_pool->getStableDiskDelegator(); + return path_delegate.listPaths(); + } + protected: DeltaMergeStorePtr store; }; diff --git a/dbms/src/Storages/Page/V2/PageStorage.cpp b/dbms/src/Storages/Page/V2/PageStorage.cpp index 287f6b8d4c2..93f1c9d64be 100644 --- a/dbms/src/Storages/Page/V2/PageStorage.cpp +++ b/dbms/src/Storages/Page/V2/PageStorage.cpp @@ -621,6 +621,7 @@ size_t PageStorage::getNumberOfPages() } } +// For debugging purpose std::set PageStorage::getAliveExternalPageIds(NamespaceId /*ns_id*/) { const auto & concrete_snap = getConcreteSnapshot(); diff --git a/dbms/src/Storages/Page/V3/PageDirectory.h b/dbms/src/Storages/Page/V3/PageDirectory.h index b489ccfad4a..627a868485f 100644 --- a/dbms/src/Storages/Page/V3/PageDirectory.h +++ b/dbms/src/Storages/Page/V3/PageDirectory.h @@ -339,7 +339,7 @@ class PageDirectory // Get the external id that is not deleted or being ref by another id by // `ns_id`. - std::set getAliveExternalIds(NamespaceId ns_id) const + std::optional> getAliveExternalIds(NamespaceId ns_id) const { return external_ids_by_ns.getAliveIds(ns_id); } diff --git a/dbms/src/Storages/Page/V3/PageDirectory/ExternalIdsByNamespace.cpp b/dbms/src/Storages/Page/V3/PageDirectory/ExternalIdsByNamespace.cpp index 34934297d30..445f2fc7d2c 100644 --- a/dbms/src/Storages/Page/V3/PageDirectory/ExternalIdsByNamespace.cpp +++ b/dbms/src/Storages/Page/V3/PageDirectory/ExternalIdsByNamespace.cpp @@ -16,6 +16,7 @@ #include #include +#include namespace DB::PS::V3 { @@ -34,17 +35,17 @@ void ExternalIdsByNamespace::addExternalId(const std::shared_ptr ExternalIdsByNamespace::getAliveIds(NamespaceId ns_id) const +std::optional> ExternalIdsByNamespace::getAliveIds(NamespaceId ns_id) const { // Now we assume a lock among all NamespaceIds is good enough. std::unique_lock map_guard(mu); - std::set valid_external_ids; auto ns_iter = ids_by_ns.find(ns_id); if (ns_iter == ids_by_ns.end()) - return valid_external_ids; + return std::nullopt; // Only scan the given `ns_id` + std::set valid_external_ids; auto & external_ids = ns_iter->second; for (auto iter = external_ids.begin(); iter != external_ids.end(); /*empty*/) { @@ -61,11 +62,8 @@ std::set ExternalIdsByNamespace::getAliveIds(NamespaceId ns_id) const ++iter; } } - // No valid external pages in this `ns_id` - if (valid_external_ids.empty()) - { - ids_by_ns.erase(ns_id); - } + // The `external_ids` maybe an empty list now, leave it to be + // cleaned by unregister return valid_external_ids; } diff --git a/dbms/src/Storages/Page/V3/PageDirectory/ExternalIdsByNamespace.h b/dbms/src/Storages/Page/V3/PageDirectory/ExternalIdsByNamespace.h index 01cae5ebe20..0635ca8ca5e 100644 --- a/dbms/src/Storages/Page/V3/PageDirectory/ExternalIdsByNamespace.h +++ b/dbms/src/Storages/Page/V3/PageDirectory/ExternalIdsByNamespace.h @@ -40,7 +40,8 @@ class ExternalIdsByNamespace // Get all alive external ids of given `ns_id` // Will also cleanup the invalid external ids. - std::set getAliveIds(NamespaceId ns_id) const; + // If the ns_id is invalid, std::nullopt will be returned. + std::optional> getAliveIds(NamespaceId ns_id) const; // After table dropped, the `getAliveIds` with specified // `ns_id` will not be cleaned. We need this method to diff --git a/dbms/src/Storages/Page/V3/PageStorageImpl.cpp b/dbms/src/Storages/Page/V3/PageStorageImpl.cpp index af82c8fed48..e132b847357 100644 --- a/dbms/src/Storages/Page/V3/PageStorageImpl.cpp +++ b/dbms/src/Storages/Page/V3/PageStorageImpl.cpp @@ -115,9 +115,13 @@ size_t PageStorageImpl::getNumberOfPages() return page_directory->numPages(); } +// For debugging purpose std::set PageStorageImpl::getAliveExternalPageIds(NamespaceId ns_id) { - return page_directory->getAliveExternalIds(ns_id); + // Keep backward compatibility of this functions with v2 + if (auto ids = page_directory->getAliveExternalIds(ns_id); ids) + return *ids; + return {}; } void PageStorageImpl::writeImpl(DB::WriteBatch && write_batch, const WriteLimiterPtr & write_limiter) @@ -369,8 +373,11 @@ void PageStorageImpl::cleanExternalPage(Stopwatch & gc_watch, GCTimeStatistics & statistics.external_page_scan_ns += external_watch.elapsedFromLastTime(); auto alive_external_ids = page_directory->getAliveExternalIds(ns_callbacks->ns_id); statistics.external_page_get_alive_ns += external_watch.elapsedFromLastTime(); - // remove the external pages that is not alive now. - ns_callbacks->remover(pending_external_pages, alive_external_ids); + if (alive_external_ids) + { + // remove the external pages that is not alive now. + ns_callbacks->remover(pending_external_pages, *alive_external_ids); + } // else the ns_id is invalid, just skip statistics.external_page_remove_ns += external_watch.elapsedFromLastTime(); // move to next namespace callbacks diff --git a/dbms/src/Storages/Page/V3/tests/gtest_page_directory.cpp b/dbms/src/Storages/Page/V3/tests/gtest_page_directory.cpp index 865da8e1a43..7f9c59b7902 100644 --- a/dbms/src/Storages/Page/V3/tests/gtest_page_directory.cpp +++ b/dbms/src/Storages/Page/V3/tests/gtest_page_directory.cpp @@ -81,9 +81,10 @@ TEST(ExternalIdsByNamespaceTest, Simple) { // holder keep "50" alive auto ids = external_ids_by_ns.getAliveIds(ns_id); - LOG_DEBUG(&Poco::Logger::root(), "{} end first, size={}", who.load(), ids.size()); - ASSERT_EQ(ids.size(), 1); - ASSERT_EQ(*ids.begin(), 50); + ASSERT_TRUE(ids.has_value()); + LOG_DEBUG(Logger::get(), "{} end first, size={}", who.load(), ids->size()); + ASSERT_EQ(ids->size(), 1); + ASSERT_EQ(*ids->begin(), 50); ASSERT_TRUE(external_ids_by_ns.existNamespace(ns_id)); } @@ -92,7 +93,7 @@ TEST(ExternalIdsByNamespaceTest, Simple) // "50" is erased though the holder is not released. external_ids_by_ns.unregisterNamespace(ns_id); auto ids = external_ids_by_ns.getAliveIds(ns_id); - ASSERT_EQ(ids.size(), 0); + ASSERT_FALSE(ids.has_value()); // nullopt ASSERT_FALSE(external_ids_by_ns.existNamespace(ns_id)); } } @@ -547,8 +548,9 @@ TEST_F(PageDirectoryTest, IdempotentNewExtPageAfterAllCleaned) edit.putExternal(buildV3Id(TEST_NAMESPACE_ID, 10)); dir->apply(std::move(edit)); auto alive_ids = dir->getAliveExternalIds(TEST_NAMESPACE_ID); - EXPECT_EQ(alive_ids.size(), 1); - EXPECT_GT(alive_ids.count(10), 0); + ASSERT_TRUE(alive_ids.has_value()); + EXPECT_EQ(alive_ids->size(), 1); + EXPECT_GT(alive_ids->count(10), 0); } { @@ -556,8 +558,9 @@ TEST_F(PageDirectoryTest, IdempotentNewExtPageAfterAllCleaned) edit.putExternal(buildV3Id(TEST_NAMESPACE_ID, 10)); // should be idempotent dir->apply(std::move(edit)); auto alive_ids = dir->getAliveExternalIds(TEST_NAMESPACE_ID); - EXPECT_EQ(alive_ids.size(), 1); - EXPECT_GT(alive_ids.count(10), 0); + ASSERT_TRUE(alive_ids.has_value()); + EXPECT_EQ(alive_ids->size(), 1); + EXPECT_GT(alive_ids->count(10), 0); } { @@ -566,8 +569,9 @@ TEST_F(PageDirectoryTest, IdempotentNewExtPageAfterAllCleaned) dir->apply(std::move(edit)); dir->gcInMemEntries(); // clean in memory auto alive_ids = dir->getAliveExternalIds(TEST_NAMESPACE_ID); - EXPECT_EQ(alive_ids.size(), 0); - EXPECT_EQ(alive_ids.count(10), 0); // removed + ASSERT_TRUE(alive_ids.has_value()); + EXPECT_EQ(alive_ids->size(), 0); + EXPECT_EQ(alive_ids->count(10), 0); // removed } { @@ -576,8 +580,9 @@ TEST_F(PageDirectoryTest, IdempotentNewExtPageAfterAllCleaned) edit.putExternal(buildV3Id(TEST_NAMESPACE_ID, 10)); dir->apply(std::move(edit)); auto alive_ids = dir->getAliveExternalIds(TEST_NAMESPACE_ID); - EXPECT_EQ(alive_ids.size(), 1); - EXPECT_GT(alive_ids.count(10), 0); + ASSERT_TRUE(alive_ids.has_value()); + EXPECT_EQ(alive_ids->size(), 1); + EXPECT_GT(alive_ids->count(10), 0); } } @@ -915,8 +920,9 @@ try } auto snap = dir->createSnapshot(); auto alive_ids = dir->getAliveExternalIds(TEST_NAMESPACE_ID); - EXPECT_EQ(alive_ids.size(), 1); - EXPECT_GT(alive_ids.count(50), 0); + ASSERT_TRUE(alive_ids.has_value()); + EXPECT_EQ(alive_ids->size(), 1); + EXPECT_GT(alive_ids->count(50), 0); } } CATCH @@ -1732,8 +1738,9 @@ try auto outdated_entries = dir->gcInMemEntries(); EXPECT_TRUE(outdated_entries.empty()); auto alive_ex_id = dir->getAliveExternalIds(TEST_NAMESPACE_ID); - ASSERT_EQ(alive_ex_id.size(), 1); - ASSERT_EQ(*alive_ex_id.begin(), 10); + ASSERT_TRUE(alive_ex_id.has_value()); + ASSERT_EQ(alive_ex_id->size(), 1); + ASSERT_EQ(*alive_ex_id->begin(), 10); } // del 11->ext @@ -1747,7 +1754,8 @@ try auto outdated_entries = dir->gcInMemEntries(); EXPECT_EQ(0, outdated_entries.size()); auto alive_ex_id = dir->getAliveExternalIds(TEST_NAMESPACE_ID); - ASSERT_EQ(alive_ex_id.size(), 0); + ASSERT_TRUE(alive_ex_id.has_value()); + ASSERT_EQ(alive_ex_id->size(), 0); } } CATCH @@ -1889,10 +1897,11 @@ try EXPECT_ANY_THROW(getEntry(restored_dir, 2, temp_snap)); EXPECT_ANY_THROW(getEntry(restored_dir, 3, temp_snap)); auto alive_ex = restored_dir->getAliveExternalIds(TEST_NAMESPACE_ID); - EXPECT_EQ(alive_ex.size(), 3); - EXPECT_GT(alive_ex.count(10), 0); - EXPECT_GT(alive_ex.count(20), 0); - EXPECT_GT(alive_ex.count(30), 0); + ASSERT_TRUE(alive_ex.has_value()); + EXPECT_EQ(alive_ex->size(), 3); + EXPECT_GT(alive_ex->count(10), 0); + EXPECT_GT(alive_ex->count(20), 0); + EXPECT_GT(alive_ex->count(30), 0); EXPECT_SAME_ENTRY(getEntry(restored_dir, 50, temp_snap), entry_50); EXPECT_SAME_ENTRY(getEntry(restored_dir, 60, temp_snap), entry_60); }; @@ -1927,15 +1936,16 @@ try EXPECT_ANY_THROW(getEntry(restored_dir, 2, temp_snap)); EXPECT_ANY_THROW(getEntry(restored_dir, 3, temp_snap)); auto alive_ex = restored_dir->getAliveExternalIds(TEST_NAMESPACE_ID); - EXPECT_EQ(alive_ex.size(), 2); - EXPECT_GT(alive_ex.count(10), 0); + ASSERT_TRUE(alive_ex.has_value()); + EXPECT_EQ(alive_ex->size(), 2); + EXPECT_GT(alive_ex->count(10), 0); EXPECT_EQ(getNormalPageIdU64(restored_dir, 11, temp_snap), 10); - EXPECT_GT(alive_ex.count(20), 0); + EXPECT_GT(alive_ex->count(20), 0); EXPECT_EQ(getNormalPageIdU64(restored_dir, 21, temp_snap), 20); EXPECT_EQ(getNormalPageIdU64(restored_dir, 22, temp_snap), 20); - EXPECT_EQ(alive_ex.count(30), 0); // removed + EXPECT_EQ(alive_ex->count(30), 0); // removed EXPECT_ANY_THROW(getEntry(restored_dir, 50, temp_snap)); EXPECT_SAME_ENTRY(getEntry(restored_dir, 51, temp_snap), entry_50); @@ -1967,10 +1977,11 @@ try EXPECT_ANY_THROW(getEntry(restored_dir, 2, temp_snap)); EXPECT_ANY_THROW(getEntry(restored_dir, 3, temp_snap)); auto alive_ex = restored_dir->getAliveExternalIds(TEST_NAMESPACE_ID); - EXPECT_EQ(alive_ex.size(), 0); - EXPECT_EQ(alive_ex.count(10), 0); // removed - EXPECT_EQ(alive_ex.count(20), 0); // removed - EXPECT_EQ(alive_ex.count(30), 0); // removed + ASSERT_TRUE(alive_ex.has_value()); + EXPECT_EQ(alive_ex->size(), 0); + EXPECT_EQ(alive_ex->count(10), 0); // removed + EXPECT_EQ(alive_ex->count(20), 0); // removed + EXPECT_EQ(alive_ex->count(30), 0); // removed EXPECT_ANY_THROW(getEntry(restored_dir, 50, temp_snap)); EXPECT_SAME_ENTRY(getEntry(restored_dir, 51, temp_snap), entry_50); @@ -1999,10 +2010,11 @@ try EXPECT_ANY_THROW(getEntry(restored_dir, 2, temp_snap)); EXPECT_ANY_THROW(getEntry(restored_dir, 3, temp_snap)); auto alive_ex = restored_dir->getAliveExternalIds(TEST_NAMESPACE_ID); - EXPECT_EQ(alive_ex.size(), 0); - EXPECT_EQ(alive_ex.count(10), 0); // removed - EXPECT_EQ(alive_ex.count(20), 0); // removed - EXPECT_EQ(alive_ex.count(30), 0); // removed + ASSERT_TRUE(alive_ex.has_value()); + EXPECT_EQ(alive_ex->size(), 0); + EXPECT_EQ(alive_ex->count(10), 0); // removed + EXPECT_EQ(alive_ex->count(20), 0); // removed + EXPECT_EQ(alive_ex->count(30), 0); // removed EXPECT_ANY_THROW(getEntry(restored_dir, 50, temp_snap)); EXPECT_ANY_THROW(getEntry(restored_dir, 51, temp_snap)); diff --git a/dbms/src/Storages/Page/V3/tests/gtest_page_storage.cpp b/dbms/src/Storages/Page/V3/tests/gtest_page_storage.cpp index d699701b56d..96b82bf4c2a 100644 --- a/dbms/src/Storages/Page/V3/tests/gtest_page_storage.cpp +++ b/dbms/src/Storages/Page/V3/tests/gtest_page_storage.cpp @@ -1265,9 +1265,22 @@ CATCH TEST_F(PageStorageTest, ConcurrencyAddExtCallbacks) try { + NamespaceId ns_id1 = TEST_NAMESPACE_ID; + NamespaceId ns_id2 = TEST_NAMESPACE_ID + 1; + { + WriteBatch wb(ns_id1); + wb.putExternal(20, 0); + page_storage->write(std::move(wb)); + } + { + WriteBatch wb(ns_id2); + wb.putExternal(20, 0); + page_storage->write(std::move(wb)); + } + auto ptr = std::make_shared(100); // mock the `StorageDeltaMerge` ExternalPageCallbacks callbacks; - callbacks.ns_id = TEST_NAMESPACE_ID; + callbacks.ns_id = ns_id1; callbacks.scanner = [ptr_weak_ref = std::weak_ptr(ptr)]() -> ExternalPageCallbacks::PathAndIdsVec { auto ptr = ptr_weak_ref.lock(); if (!ptr) @@ -1295,7 +1308,7 @@ try // mock table created while gc is running { ExternalPageCallbacks new_callbacks; - new_callbacks.ns_id = TEST_NAMESPACE_ID + 1; + new_callbacks.ns_id = ns_id2; new_callbacks.scanner = [ptr_weak_ref = std::weak_ptr(ptr)]() -> ExternalPageCallbacks::PathAndIdsVec { auto ptr = ptr_weak_ref.lock(); if (!ptr) diff --git a/dbms/src/Storages/PathPool.cpp b/dbms/src/Storages/PathPool.cpp index cc458872172..3e5232db3c6 100644 --- a/dbms/src/Storages/PathPool.cpp +++ b/dbms/src/Storages/PathPool.cpp @@ -134,6 +134,7 @@ StoragePathPool::StoragePathPool( // : database(std::move(database_)) , table(std::move(table_)) , path_need_database_name(path_need_database_name_) + , shutdown_called(false) , global_capacity(std::move(global_capacity_)) , file_provider(std::move(file_provider_)) , log(Logger::get("StoragePathPool")) @@ -161,6 +162,7 @@ StoragePathPool::StoragePathPool(StoragePathPool && rhs) noexcept , table(std::move(rhs.table)) , dt_file_path_map(std::move(rhs.dt_file_path_map)) , path_need_database_name(rhs.path_need_database_name) + , shutdown_called(rhs.shutdown_called.load()) , global_capacity(std::move(rhs.global_capacity)) , file_provider(std::move(rhs.file_provider)) , log(std::move(rhs.log)) @@ -176,6 +178,7 @@ StoragePathPool & StoragePathPool::operator=(StoragePathPool && rhs) database.swap(rhs.database); table.swap(rhs.table); path_need_database_name = rhs.path_need_database_name; + shutdown_called = rhs.shutdown_called.load(); global_capacity.swap(rhs.global_capacity); file_provider.swap(rhs.file_provider); log.swap(rhs.log); diff --git a/dbms/src/Storages/PathPool.h b/dbms/src/Storages/PathPool.h index 28e14035802..3a7cb7191f9 100644 --- a/dbms/src/Storages/PathPool.h +++ b/dbms/src/Storages/PathPool.h @@ -411,7 +411,7 @@ class PSDiskDelegatorGlobalSingle : public PSDiskDelegator PathPool::PageFilePathMap page_path_map; }; -/// A class to manage paths for the specified storage. +/// A class to manage paths for a specified physical table. class StoragePathPool { public: @@ -448,6 +448,10 @@ class StoragePathPool void drop(bool recursive, bool must_success = true); + void shutdown() { shutdown_called.store(true); } + + bool isShutdown() const { return shutdown_called.load(); } + DISALLOW_COPY(StoragePathPool); StoragePathPool(StoragePathPool && rhs) noexcept; @@ -496,6 +500,8 @@ class StoragePathPool bool path_need_database_name = false; + std::atomic shutdown_called; + PathCapacityMetricsPtr global_capacity; FileProviderPtr file_provider; diff --git a/dbms/src/Storages/StorageDeltaMerge.cpp b/dbms/src/Storages/StorageDeltaMerge.cpp index 475c4bf8eb6..8c234e47377 100644 --- a/dbms/src/Storages/StorageDeltaMerge.cpp +++ b/dbms/src/Storages/StorageDeltaMerge.cpp @@ -1555,6 +1555,9 @@ void StorageDeltaMerge::startup() tmt.getStorages().put(std::static_pointer_cast(shared_from_this())); } +// Avoid calling virtual function `shutdown` in destructor, +// we should use this function instead. +// https://stackoverflow.com/a/12093250/4412495 void StorageDeltaMerge::shutdownImpl() { bool v = false;