From 443b1182d3441efb7a024f4fe2725a7517791ea7 Mon Sep 17 00:00:00 2001 From: Ti Chi Robot Date: Fri, 27 Oct 2023 15:53:04 +0800 Subject: [PATCH] ddl: Fix frequent schema GC under multi tenant (#8248) (#8266) close pingcap/tiflash#8256 --- dbms/src/Databases/IDatabase.h | 4 +- .../Interpreters/InterpreterCreateQuery.cpp | 30 +-- dbms/src/Storages/KVStore/KVStore.h | 4 +- .../Storages/KVStore/Read/ReadIndexWorker.cpp | 11 +- .../Storages/KVStore/Read/ReadIndexWorker.h | 8 +- dbms/src/TiDB/Schema/SchemaSyncService.cpp | 191 +++++++++++------- dbms/src/TiDB/Schema/SchemaSyncService.h | 21 +- .../TiDB/Schema/tests/gtest_schema_sync.cpp | 44 ++++ 8 files changed, 207 insertions(+), 106 deletions(-) diff --git a/dbms/src/Databases/IDatabase.h b/dbms/src/Databases/IDatabase.h index 154200ae87a..db1d24f6bca 100644 --- a/dbms/src/Databases/IDatabase.h +++ b/dbms/src/Databases/IDatabase.h @@ -49,7 +49,7 @@ class IDatabaseIterator virtual const String & name() const = 0; virtual StoragePtr & table() const = 0; - virtual ~IDatabaseIterator() {} + virtual ~IDatabaseIterator() = default; }; using DatabaseIteratorPtr = std::unique_ptr; @@ -154,7 +154,7 @@ class IDatabase : public std::enable_shared_from_this /// Delete metadata, the deletion of which differs from the recursive deletion of the directory, if any. virtual void drop(const Context & context) = 0; - virtual ~IDatabase() {} + virtual ~IDatabase() = default; }; using DatabasePtr = std::shared_ptr; diff --git a/dbms/src/Interpreters/InterpreterCreateQuery.cpp b/dbms/src/Interpreters/InterpreterCreateQuery.cpp index e538b29e674..4cb4d112c71 100644 --- a/dbms/src/Interpreters/InterpreterCreateQuery.cpp +++ b/dbms/src/Interpreters/InterpreterCreateQuery.cpp @@ -576,31 +576,31 @@ BlockIO InterpreterCreateQuery::createTable(ASTCreateQuery & create) // Thus, we choose to do a retry here to wait the table created completed. if (e.code() == ErrorCodes::TABLE_ALREADY_EXISTS || e.code() == ErrorCodes::DDL_GUARD_IS_ACTIVE) { + auto log = Logger::get(fmt::format("InterpreterCreateQuery {} {}", database_name, table_name)); LOG_WARNING( - Logger::get("InterpreterCreateQuery"), + log, "createTable failed with error code is {}, error info is {}, stack_info is {}", e.code(), e.displayText(), e.getStackTrace().toString()); - for (int i = 0; i < 20; i++) // retry for 400ms + const size_t max_retry = 50; + const int wait_useconds = 20000; + for (size_t i = 0; i < max_retry; i++) // retry { if (context.isTableExist(database_name, table_name)) - { return {}; - } - else - { - const int wait_useconds = 20000; - LOG_ERROR( - Logger::get("InterpreterCreateQuery"), - "createTable failed but table not exist now, \nWe will sleep for {} ms and try again.", - wait_useconds / 1000); - usleep(wait_useconds); // sleep 20ms - } + + // sleep a while and retry + LOG_ERROR( + log, + "createTable failed but table not exist now, \nWe will sleep for {} ms and try again.", + wait_useconds / 1000); + usleep(wait_useconds); // sleep 20ms } LOG_ERROR( - Logger::get("InterpreterCreateQuery"), - "still failed to createTable in InterpreterCreateQuery for retry 20 times"); + log, + "still failed to createTable in InterpreterCreateQuery for retry {} times", + max_retry); e.rethrow(); } else diff --git a/dbms/src/Storages/KVStore/KVStore.h b/dbms/src/Storages/KVStore/KVStore.h index 006b4bc2861..a65167044a3 100644 --- a/dbms/src/Storages/KVStore/KVStore.h +++ b/dbms/src/Storages/KVStore/KVStore.h @@ -225,10 +225,10 @@ class KVStore final : private boost::noncopyable /// Create `runner_cnt` threads to run ReadIndexWorker asynchronously and automatically. /// If there is other runtime framework, DO NOT invoke it. - void asyncRunReadIndexWorkers(); + void asyncRunReadIndexWorkers() const; /// Stop workers after there is no more read-index task. - void stopReadIndexWorkers(); + void stopReadIndexWorkers() const; /// TODO: if supported by runtime framework, run one round for specific runner by `id`. void runOneRoundOfReadIndexRunner(size_t runner_id); diff --git a/dbms/src/Storages/KVStore/Read/ReadIndexWorker.cpp b/dbms/src/Storages/KVStore/Read/ReadIndexWorker.cpp index fe59382883b..624cb441b77 100644 --- a/dbms/src/Storages/KVStore/Read/ReadIndexWorker.cpp +++ b/dbms/src/Storages/KVStore/Read/ReadIndexWorker.cpp @@ -755,7 +755,7 @@ ReadIndexWorkerManager::ReadIndexWorkerManager( ReadIndexWorkerManager::FnGetTickTime && fn_min_dur_handle_region, size_t runner_cnt) : proxy_helper(proxy_helper_) - , logger(&Poco::Logger::get("ReadIndexWorkers")) + , logger(Logger::get("ReadIndexWorkers")) { for (size_t i = 0; i < runner_cnt; ++i) runners.emplace_back(std::make_unique( @@ -931,7 +931,7 @@ void KVStore::initReadIndexWorkers( read_index_worker_manager = ptr; } -void KVStore::asyncRunReadIndexWorkers() +void KVStore::asyncRunReadIndexWorkers() const { if (!read_index_worker_manager) return; @@ -940,13 +940,12 @@ void KVStore::asyncRunReadIndexWorkers() read_index_worker_manager->asyncRun(); } -void KVStore::stopReadIndexWorkers() +void KVStore::stopReadIndexWorkers() const { if (!read_index_worker_manager) return; assert(this->proxy_helper); - read_index_worker_manager->stop(); } @@ -1012,13 +1011,13 @@ ReadIndexWorkerManager::ReadIndexRunner::ReadIndexRunner( size_t id_, size_t runner_cnt_, ReadIndexWorkers & workers_, - Poco::Logger * logger_, + LoggerPtr logger_, FnGetTickTime fn_min_dur_handle_region_, AsyncWaker::NotifierPtr global_notifier_) : id(id_) , runner_cnt(runner_cnt_) , workers(workers_) - , logger(logger_) + , logger(std::move(logger_)) , fn_min_dur_handle_region(std::move(fn_min_dur_handle_region_)) , global_notifier(std::move(global_notifier_)) {} diff --git a/dbms/src/Storages/KVStore/Read/ReadIndexWorker.h b/dbms/src/Storages/KVStore/Read/ReadIndexWorker.h index 66fe4896cb6..9c4854997d5 100644 --- a/dbms/src/Storages/KVStore/Read/ReadIndexWorker.h +++ b/dbms/src/Storages/KVStore/Read/ReadIndexWorker.h @@ -136,14 +136,14 @@ class ReadIndexWorkerManager : boost::noncopyable size_t id_, size_t runner_cnt_, ReadIndexWorkers & workers_, - Poco::Logger * logger_, + LoggerPtr logger_, FnGetTickTime fn_min_dur_handle_region_, AsyncWaker::NotifierPtr global_notifier_); const size_t id; const size_t runner_cnt; ReadIndexWorkers & workers; - Poco::Logger * logger; + LoggerPtr logger; const FnGetTickTime fn_min_dur_handle_region; /// The workers belonged to runner share same notifier. AsyncWaker::NotifierPtr global_notifier; @@ -157,7 +157,7 @@ class ReadIndexWorkerManager : boost::noncopyable std::vector> runners; /// Each worker controls read-index process of region(region_id % worker_cnt == worker_id). ReadIndexWorkers workers; - Poco::Logger * logger; + LoggerPtr logger; }; struct ReadIndexNotifyCtrl; @@ -359,4 +359,4 @@ struct MockStressTestCfg static bool enable; }; -} // namespace DB \ No newline at end of file +} // namespace DB diff --git a/dbms/src/TiDB/Schema/SchemaSyncService.cpp b/dbms/src/TiDB/Schema/SchemaSyncService.cpp index fcb67db8c99..3b1f5d51606 100644 --- a/dbms/src/TiDB/Schema/SchemaSyncService.cpp +++ b/dbms/src/TiDB/Schema/SchemaSyncService.cpp @@ -20,6 +20,7 @@ #include #include #include +#include #include #include #include @@ -33,10 +34,6 @@ namespace ErrorCodes extern const int DEADLOCK_AVOIDED; } // namespace ErrorCodes -// TODO: make this interval configurable -// constexpr size_t interval_seconds = 60; -// bg_ddl_sync_schema_interval - SchemaSyncService::SchemaSyncService(DB::Context & context_) : context(context_) , background_pool(context_.getBackgroundPool()) @@ -63,56 +60,56 @@ void SchemaSyncService::addKeyspaceGCTasks() for (auto const iter : keyspaces) { auto keyspace = iter.first; - if (!keyspace_handle_map.count(keyspace)) - { - auto ks_log = log->getChild(fmt::format("keyspace={}", keyspace)); - LOG_INFO(ks_log, "add sync schema task"); - auto task_handle = background_pool.addTask( - [&, this, keyspace, ks_log]() noexcept { - String stage; - bool done_anything = false; - try - { - /// Do sync schema first, then gc. - /// They must be performed synchronously, - /// otherwise table may get mis-GC-ed if RECOVER was not properly synced caused by schema sync pause but GC runs too aggressively. - // GC safe point must be obtained ahead of syncing schema. - auto gc_safe_point - = PDClientHelper::getGCSafePointWithRetry(context.getTMTContext().getPDClient(), keyspace); - stage = "Sync schemas"; - done_anything = syncSchemas(keyspace); - if (done_anything) - GET_METRIC(tiflash_schema_trigger_count, type_timer).Increment(); - - stage = "GC"; - done_anything = gc(gc_safe_point, keyspace); - - return done_anything; - } - catch (const Exception & e) - { - LOG_ERROR( - ks_log, - "{} failed by {} \n stack : {}", - stage, - e.displayText(), - e.getStackTrace().toString()); - } - catch (const Poco::Exception & e) - { - LOG_ERROR(ks_log, "{} failed by {}", stage, e.displayText()); - } - catch (const std::exception & e) - { - LOG_ERROR(ks_log, "{} failed by {}", stage, e.what()); - } - return false; - }, - false, - context.getSettingsRef().ddl_sync_interval_seconds * 1000); - - keyspace_handle_map.emplace(keyspace, task_handle); - } + if (keyspace_handle_map.contains(keyspace)) + continue; + + auto ks_log = log->getChild(fmt::format("keyspace={}", keyspace)); + LOG_INFO(ks_log, "add sync schema task"); + auto task_handle = background_pool.addTask( + [&, this, keyspace, ks_log]() noexcept { + String stage; + bool done_anything = false; + try + { + /// Do sync schema first, then gc. + /// They must be performed synchronously, + /// otherwise table may get mis-GC-ed if RECOVER was not properly synced caused by schema sync pause but GC runs too aggressively. + // GC safe point must be obtained ahead of syncing schema. + auto gc_safe_point + = PDClientHelper::getGCSafePointWithRetry(context.getTMTContext().getPDClient(), keyspace); + stage = "Sync schemas"; + done_anything = syncSchemas(keyspace); + if (done_anything) + GET_METRIC(tiflash_schema_trigger_count, type_timer).Increment(); + + stage = "GC"; + done_anything = gc(gc_safe_point, keyspace); + + return done_anything; + } + catch (const Exception & e) + { + LOG_ERROR( + ks_log, + "{} failed by {} \n stack : {}", + stage, + e.displayText(), + e.getStackTrace().toString()); + } + catch (const Poco::Exception & e) + { + LOG_ERROR(ks_log, "{} failed by {}", stage, e.displayText()); + } + catch (const std::exception & e) + { + LOG_ERROR(ks_log, "{} failed by {}", stage, e.what()); + } + return false; + }, + false, + context.getSettingsRef().ddl_sync_interval_seconds * 1000); + + keyspace_handle_map.emplace(keyspace, task_handle); } } @@ -138,12 +135,19 @@ void SchemaSyncService::removeKeyspaceGCTasks() context.getTMTContext().getSchemaSyncerManager()->removeSchemaSyncer(keyspace); PDClientHelper::remove_ks_gc_sp(keyspace); + + keyspace_gc_context.erase(keyspace); } } SchemaSyncService::~SchemaSyncService() { - background_pool.removeTask(handle); + if (handle) + { + // stop the root handle first + background_pool.removeTask(handle); + } + for (auto const & iter : keyspace_handle_map) { auto task_handle = iter.second; @@ -158,21 +162,40 @@ bool SchemaSyncService::syncSchemas(KeyspaceID keyspace_id) } template -inline bool isSafeForGC(const DatabaseOrTablePtr & ptr, Timestamp gc_safe_point) +inline std::tuple isSafeForGC(const DatabaseOrTablePtr & ptr, Timestamp gc_safepoint) { - return ptr->isTombstone() && ptr->getTombstone() < gc_safe_point; + const auto tombstone_ts = ptr->getTombstone(); + return {tombstone_ts != 0 && tombstone_ts < gc_safepoint, tombstone_ts}; } -bool SchemaSyncService::gc(Timestamp gc_safe_point, KeyspaceID keyspace_id) +Timestamp SchemaSyncService::lastGcSafePoint(KeyspaceID keyspace_id) const { - auto & tmt_context = context.getTMTContext(); - if (gc_safe_point == gc_context.last_gc_safe_point) + std::shared_lock lock(keyspace_map_mutex); + auto iter = keyspace_gc_context.find(keyspace_id); + if (iter == keyspace_gc_context.end()) + return 0; + return iter->second.last_gc_safepoint; +} + +void SchemaSyncService::updateLastGcSafepoint(KeyspaceID keyspace_id, Timestamp gc_safepoint) +{ + std::unique_lock lock(keyspace_map_mutex); + keyspace_gc_context[keyspace_id].last_gc_safepoint = gc_safepoint; +} + +bool SchemaSyncService::gc(Timestamp gc_safepoint, KeyspaceID keyspace_id) +{ + const Timestamp last_gc_safepoint = lastGcSafePoint(keyspace_id); + if (last_gc_safepoint != 0 && gc_safepoint == last_gc_safepoint) return false; auto keyspace_log = log->getChild(fmt::format("keyspace={}", keyspace_id)); + LOG_INFO(keyspace_log, "Schema GC begin, last_safepoint={} safepoint={}", last_gc_safepoint, gc_safepoint); - LOG_INFO(keyspace_log, "Performing GC using safe point {}", gc_safe_point); + size_t num_tables_removed = 0; + size_t num_databases_removed = 0; + auto & tmt_context = context.getTMTContext(); // The storages that are ready for gc std::vector> storages_to_gc; // Get a snapshot of database @@ -190,11 +213,22 @@ bool SchemaSyncService::gc(Timestamp gc_safe_point, KeyspaceID keyspace_id) if (!managed_storage) continue; - if (isSafeForGC(db, gc_safe_point) || isSafeForGC(managed_storage, gc_safe_point)) + const auto & [database_is_stale, db_tombstone] = isSafeForGC(db, gc_safepoint); + const auto & [table_is_stale, table_tombstone] = isSafeForGC(managed_storage, gc_safepoint); + if (database_is_stale || table_is_stale) { // Only keep a weak_ptr on storage so that the memory can be free as soon as // it is dropped. storages_to_gc.emplace_back(std::weak_ptr(managed_storage)); + LOG_INFO( + log, + "Detect stale table, database_name={} table_name={} database_tombstone={} table_tombstone={} " + "safepoint={}", + managed_storage->getDatabaseName(), + managed_storage->getTableName(), + db_tombstone, + table_tombstone, + gc_safepoint); } } } @@ -228,7 +262,12 @@ bool SchemaSyncService::gc(Timestamp gc_safe_point, KeyspaceID keyspace_id) SchemaNameMapper().debugTableName(table_info), table_info.id); }(); - LOG_INFO(keyspace_log, "Physically dropping table {}", canonical_name); + LOG_INFO( + keyspace_log, + "Physically dropping table, table_tombstone={} safepoint={} {}", + storage->getTombstone(), + gc_safepoint, + canonical_name); auto drop_query = std::make_shared(); drop_query->database = std::move(database_name); drop_query->table = std::move(table_name); @@ -240,6 +279,7 @@ bool SchemaSyncService::gc(Timestamp gc_safe_point, KeyspaceID keyspace_id) InterpreterDropQuery drop_interpreter(ast_drop_query, context); drop_interpreter.execute(); LOG_INFO(keyspace_log, "Physically dropped table {}", canonical_name); + ++num_tables_removed; } catch (DB::Exception & e) { @@ -260,7 +300,10 @@ bool SchemaSyncService::gc(Timestamp gc_safe_point, KeyspaceID keyspace_id) { const auto & db = iter.second; auto ks_db_id = SchemaNameMapper::getMappedNameKeyspaceID(iter.first); - if (!isSafeForGC(db, gc_safe_point) || ks_db_id != keyspace_id) + if (ks_db_id != keyspace_id) + continue; + const auto & [db_is_stale, db_tombstone] = isSafeForGC(db, gc_safepoint); + if (!db_is_stale) continue; const auto & db_name = iter.first; @@ -279,7 +322,7 @@ bool SchemaSyncService::gc(Timestamp gc_safe_point, KeyspaceID keyspace_id) continue; } - LOG_INFO(keyspace_log, "Physically dropping database {}", db_name); + LOG_INFO(keyspace_log, "Physically dropping database, database_tombstone={} {}", db->getTombstone(), db_name); auto drop_query = std::make_shared(); drop_query->database = db_name; drop_query->if_exists = true; @@ -289,7 +332,8 @@ bool SchemaSyncService::gc(Timestamp gc_safe_point, KeyspaceID keyspace_id) { InterpreterDropQuery drop_interpreter(ast_drop_query, context); drop_interpreter.execute(); - LOG_INFO(keyspace_log, "Physically dropped database {}", db_name); + LOG_INFO(keyspace_log, "Physically dropped database {}, safepoint={}", db_name, gc_safepoint); + ++num_databases_removed; } catch (DB::Exception & e) { @@ -305,13 +349,22 @@ bool SchemaSyncService::gc(Timestamp gc_safe_point, KeyspaceID keyspace_id) if (succeeded) { - gc_context.last_gc_safe_point = gc_safe_point; - LOG_INFO(keyspace_log, "Performed GC using safe point {}", gc_safe_point); + updateLastGcSafepoint(keyspace_id, gc_safepoint); + LOG_INFO( + keyspace_log, + "Schema GC done, tables_removed={} databases_removed={} safepoint={}", + num_tables_removed, + num_databases_removed, + gc_safepoint); } else { // Don't update last_gc_safe_point and retry later - LOG_INFO(keyspace_log, "Performed GC using safe point {} meet error, will try again later", gc_safe_point); + LOG_INFO( + keyspace_log, + "Schema GC meet error, will try again later, last_safepoint={} safepoint={}", + last_gc_safepoint, + gc_safepoint); } return true; diff --git a/dbms/src/TiDB/Schema/SchemaSyncService.h b/dbms/src/TiDB/Schema/SchemaSyncService.h index 59a98acf1e1..8551c1cd4d4 100644 --- a/dbms/src/TiDB/Schema/SchemaSyncService.h +++ b/dbms/src/TiDB/Schema/SchemaSyncService.h @@ -14,6 +14,7 @@ #pragma once +#include #include #include @@ -24,7 +25,6 @@ namespace DB { -class Context; class BackgroundProcessingPool; class Logger; using LoggerPtr = std::shared_ptr; @@ -43,30 +43,35 @@ class SchemaSyncService explicit SchemaSyncService(Context & context_); ~SchemaSyncService(); + bool gc(Timestamp gc_safepoint, KeyspaceID keyspace_id); + private: bool syncSchemas(KeyspaceID keyspace_id); - struct GCContext - { - Timestamp last_gc_safe_point = 0; - } gc_context; - - bool gc(Timestamp gc_safe_point, KeyspaceID keyspace_id); - void addKeyspaceGCTasks(); void removeKeyspaceGCTasks(); + Timestamp lastGcSafePoint(KeyspaceID keyspace_id) const; + void updateLastGcSafepoint(KeyspaceID keyspace_id, Timestamp gc_safepoint); + private: Context & context; friend void dbgFuncGcSchemas(Context &, const ASTs &, DBGInvokerPrinter); + struct KeyspaceGCContext + { + Timestamp last_gc_safepoint = 0; + }; + BackgroundProcessingPool & background_pool; + // The background task handle for adding/removing task for all keyspaces BackgroundProcessingPool::TaskHandle handle; mutable std::shared_mutex keyspace_map_mutex; // Handles for each keyspace schema sync task. std::unordered_map keyspace_handle_map; + std::unordered_map keyspace_gc_context; LoggerPtr log; }; diff --git a/dbms/src/TiDB/Schema/tests/gtest_schema_sync.cpp b/dbms/src/TiDB/Schema/tests/gtest_schema_sync.cpp index cd0f1bd7f9f..cd11a4127d1 100644 --- a/dbms/src/TiDB/Schema/tests/gtest_schema_sync.cpp +++ b/dbms/src/TiDB/Schema/tests/gtest_schema_sync.cpp @@ -254,6 +254,50 @@ try } CATCH +TEST_F(SchemaSyncTest, PhysicalDropTable) +try +{ + auto pd_client = global_ctx.getTMTContext().getPDClient(); + + const String db_name = "mock_db"; + MockTiDB::instance().newDataBase(db_name); + + auto cols = ColumnsDescription({ + {"col1", typeFromString("String")}, + {"col2", typeFromString("Int64")}, + }); + // table_name, cols, pk_name + std::vector> tables{ + {"t1", cols, ""}, + {"t2", cols, ""}, + }; + auto table_ids = MockTiDB::instance().newTables(db_name, tables, pd_client->getTS(), "dt"); + + refreshSchema(); + + for (auto table_id : table_ids) + { + refreshTableSchema(table_id); + } + + mustGetSyncedTableByName(db_name, "t1"); + mustGetSyncedTableByName(db_name, "t2"); + + MockTiDB::instance().dropTable(global_ctx, db_name, "t1", true); + + refreshSchema(); + + global_ctx.initializeSchemaSyncService(); + auto sync_service = global_ctx.getSchemaSyncService(); + ASSERT_TRUE(sync_service->gc(10000000, NullspaceID)); + // run gc with the same safepoint, will be skip + ASSERT_FALSE(sync_service->gc(10000000, NullspaceID)); + // run gc for another keyspace + ASSERT_TRUE(sync_service->gc(20000000, 1024)); + ASSERT_FALSE(sync_service->gc(20000000, 1024)); +} +CATCH + TEST_F(SchemaSyncTest, RenamePartitionTable) try {