diff --git a/dbms/src/Interpreters/SharedContexts/Disagg.cpp b/dbms/src/Interpreters/SharedContexts/Disagg.cpp index eee4d0f4547..c4aa7fbf650 100644 --- a/dbms/src/Interpreters/SharedContexts/Disagg.cpp +++ b/dbms/src/Interpreters/SharedContexts/Disagg.cpp @@ -105,4 +105,13 @@ void SharedContextDisagg::initFastAddPeerContext(UInt64 fap_concur) fap_context = std::make_shared<FastAddPeerContext>(fap_concur); } + +SharedContextDisagg::~SharedContextDisagg() +{ + if (fap_context) + { + fap_context->shutdown(); + } +} + } // namespace DB diff --git a/dbms/src/Interpreters/SharedContexts/Disagg.h b/dbms/src/Interpreters/SharedContexts/Disagg.h index 07a619a4907..2924a36dd58 100644 --- a/dbms/src/Interpreters/SharedContexts/Disagg.h +++ b/dbms/src/Interpreters/SharedContexts/Disagg.h @@ -79,6 +79,8 @@ struct SharedContextDisagg : private boost::noncopyable : global_context(global_context_) {} + ~SharedContextDisagg(); + void initReadNodePageCache(const PathPool & path_pool, const String & cache_dir, size_t cache_capacity); /// Note that the unit of max_size is quantity, not byte size. 
It controls how diff --git a/dbms/src/Storages/KVStore/KVStore.cpp b/dbms/src/Storages/KVStore/KVStore.cpp index 9ff701d1a45..b16499c4dee 100644 --- a/dbms/src/Storages/KVStore/KVStore.cpp +++ b/dbms/src/Storages/KVStore/KVStore.cpp @@ -438,6 +438,7 @@ KVStore::~KVStore() LOG_INFO(log, "Destroy KVStore"); stopThreadAllocInfo(); releaseReadIndexWorkers(); + LOG_INFO(log, "Destroy KVStore Finished"); } FileUsageStatistics KVStore::getFileUsageStatistics() const @@ -611,6 +612,7 @@ void KVStore::stopThreadAllocInfo() is_terminated = true; monitoring_cv.notify_all(); } + LOG_INFO(log, "KVStore shutdown, wait thread alloc monitor join"); monitoring_thread->join(); delete monitoring_thread; monitoring_thread = nullptr; diff --git a/dbms/src/Storages/KVStore/MultiRaft/Disagg/FastAddPeerContext.cpp b/dbms/src/Storages/KVStore/MultiRaft/Disagg/FastAddPeerContext.cpp index 588ec80a9f3..62e9bcb7962 100644 --- a/dbms/src/Storages/KVStore/MultiRaft/Disagg/FastAddPeerContext.cpp +++ b/dbms/src/Storages/KVStore/MultiRaft/Disagg/FastAddPeerContext.cpp @@ -43,6 +43,11 @@ FastAddPeerContext::FastAddPeerContext(uint64_t thread_count) tasks_trace = std::make_shared<FAPAsyncTasks>(thread_count, thread_count, 1000); } +void FastAddPeerContext::shutdown() const +{ + tasks_trace->shutdown(); +} + ParsedCheckpointDataHolderPtr FastAddPeerContext::CheckpointCacheElement::getParsedCheckpointData(Context & context) { std::scoped_lock lock(mu); diff --git a/dbms/src/Storages/KVStore/MultiRaft/Disagg/FastAddPeerContext.h b/dbms/src/Storages/KVStore/MultiRaft/Disagg/FastAddPeerContext.h index 6c1411e0ab9..0ec62668cbc 100644 --- a/dbms/src/Storages/KVStore/MultiRaft/Disagg/FastAddPeerContext.h +++ b/dbms/src/Storages/KVStore/MultiRaft/Disagg/FastAddPeerContext.h @@ -22,6 +22,7 @@ namespace DB { +class FastAddPeerContext; using FAPAsyncTasks = AsyncTasks<uint64_t, std::function<FastAddPeerRes()>, FastAddPeerRes>; struct CheckpointInfo; using CheckpointInfoPtr = std::shared_ptr<CheckpointInfo>; @@ -34,6 +35,7 @@ class FastAddPeerContext { public: explicit 
FastAddPeerContext(uint64_t thread_count = 0); + void shutdown() const; // Return parsed checkpoint data and its corresponding seq which is newer than `required_seq` if exists, otherwise return pair std::pair<UInt64, ParsedCheckpointDataHolderPtr> getNewerCheckpointData( diff --git a/dbms/src/Storages/KVStore/Read/LearnerReadWorker.cpp b/dbms/src/Storages/KVStore/Read/LearnerReadWorker.cpp index 1da417b5d59..00b516b287e 100644 --- a/dbms/src/Storages/KVStore/Read/LearnerReadWorker.cpp +++ b/dbms/src/Storages/KVStore/Read/LearnerReadWorker.cpp @@ -325,13 +325,14 @@ RegionsReadIndexResult LearnerReadWorker::readIndex( log_lvl, "[Learner Read] Batch read index, num_regions={} num_requests={} num_stale_read={} num_cached_index={} " "num_unavailable={} " - "cost={}ms", + "cost={}ms, read_tso={}", stats.num_regions, stats.num_read_index_request, stats.num_stale_read, stats.num_cached_read_index, unavailable_regions.size(), - stats.read_index_elapsed_ms); + stats.read_index_elapsed_ms, + mvcc_query_info.read_tso); return batch_read_index_result; } @@ -427,10 +428,11 @@ void LearnerReadWorker::waitIndex( LOG_IMPL( log, log_lvl, - "[Learner Read] Finish wait index and resolve locks, wait_cost={}ms n_regions={} n_unavailable={}", + "[Learner Read] Finish wait index and resolve locks, wait_cost={}ms n_regions={} n_unavailable={}, read_tso={}", stats.wait_index_elapsed_ms, stats.num_regions, - unavailable_regions.size()); + unavailable_regions.size(), + mvcc_query_info.read_tso); } std::tuple<Clock::time_point, Clock::time_point> // @@ -469,13 +471,14 @@ LearnerReadWorker::waitUntilDataAvailable( log, log_lvl, "[Learner Read] batch read index | wait index" " total_cost={} read_cost={} wait_cost={} n_regions={} n_stale_read={} n_unavailable={}", + " total_cost={} read_cost={} wait_cost={} n_regions={} n_stale_read={} n_unavailable={}, read_tso={}", time_elapsed_ms, stats.read_index_elapsed_ms, stats.wait_index_elapsed_ms, stats.num_regions, stats.num_stale_read, - unavailable_regions.size()); + unavailable_regions.size(), + mvcc_query_info.read_tso); 
return {start_time, end_time}; } diff --git a/dbms/src/Storages/KVStore/Read/ReadIndex.cpp b/dbms/src/Storages/KVStore/Read/ReadIndex.cpp index 273e85a58c8..e6309d1e5b1 100644 --- a/dbms/src/Storages/KVStore/Read/ReadIndex.cpp +++ b/dbms/src/Storages/KVStore/Read/ReadIndex.cpp @@ -368,6 +368,7 @@ void KVStore::stopReadIndexWorkers() const void KVStore::releaseReadIndexWorkers() { + LOG_INFO(log, "KVStore shutdown, deleting read index worker"); if (read_index_worker_manager) { delete read_index_worker_manager; diff --git a/dbms/src/Storages/KVStore/Utils/AsyncTasks.h b/dbms/src/Storages/KVStore/Utils/AsyncTasks.h index de5e0839bb2..e497a230f89 100644 --- a/dbms/src/Storages/KVStore/Utils/AsyncTasks.h +++ b/dbms/src/Storages/KVStore/Utils/AsyncTasks.h @@ -53,7 +53,24 @@ struct AsyncTasks , log(DB::Logger::get()) {} - ~AsyncTasks() { LOG_INFO(log, "Pending {} tasks when destructing", count()); } + void shutdown() + { + LOG_INFO(log, "Pending {} tasks when destructing", count()); + // To avoid the "last owner" problem in worker thread. + thread_pool->wait(); + shut.store(true); + LOG_INFO(log, "Finish finalize thread pool"); + } + + ~AsyncTasks() + { + if (!shut.load()) + { + LOG_INFO(log, "Destruct without shutdown"); + // Potential deadlock if the instance is held and released directly or indirectly by a task in its worker. + shutdown(); + } + } using TaskState = AsyncTaskHelper::TaskState; @@ -241,6 +258,8 @@ struct AsyncTasks // 1. There is already a task registered with the same name and not canceled or fetched. 
bool addTaskWithCancel(Key k, Func f, CancelFunc cf) { + if (shut.load()) + return false; std::scoped_lock l(mtx); RUNTIME_CHECK(!tasks.contains(k)); using P = std::packaged_task<R()>; @@ -399,5 +418,6 @@ struct AsyncTasks std::unique_ptr<ThreadPool> thread_pool; mutable std::mutex mtx; LoggerPtr log; + std::atomic_bool shut = false; }; } // namespace DB \ No newline at end of file diff --git a/dbms/src/Storages/KVStore/tests/gtest_async_tasks.cpp b/dbms/src/Storages/KVStore/tests/gtest_async_tasks.cpp index 1160137161f..e988b1e40b9 100644 --- a/dbms/src/Storages/KVStore/tests/gtest_async_tasks.cpp +++ b/dbms/src/Storages/KVStore/tests/gtest_async_tasks.cpp @@ -28,17 +28,18 @@ TEST(AsyncTasksTest, AsyncTasksNormal) auto log = DB::Logger::get(); LOG_INFO(log, "Cancel and addTask"); // Cancel and addTask - // 3 -> 1 -> 4 -> 2 { auto async_tasks = std::make_unique<TestAsyncTasks>(1, 1, 2); auto m = std::make_shared<std::mutex>(); int flag = 0; std::unique_lock cl(*m); std::atomic_bool finished_flag = false; - async_tasks->addTask(1, [m, &flag, &async_tasks, &finished_flag]() { + std::atomic_bool running_flag = false; + async_tasks->addTask(1, [m, &flag, &async_tasks, &finished_flag, &running_flag]() { + running_flag.store(true, std::memory_order_seq_cst); auto cancel_handle = async_tasks->getCancelHandleFromExecutor(1); - std::scoped_lock rl(*m); // 2 - SCOPE_EXIT({ finished_flag.store(true); }); + std::scoped_lock rl(*m); + SCOPE_EXIT({ finished_flag.store(true, std::memory_order_seq_cst); }); // Run after `cl` is released. if (cancel_handle->isCanceled()) { @@ -46,17 +47,31 @@ TEST(AsyncTasksTest, AsyncTasksNormal) } flag = 1; }); + ASSERT_TRUE(async_tasks->isScheduled(1)); + { + int cnt_wait_sche = 0; + while (!running_flag.load(std::memory_order_seq_cst)) + { + cnt_wait_sche += 1; + ASSERT(cnt_wait_sche < 6); + std::this_thread::sleep_for(200ms); + } + } + // Make sure we don't cancel in queue. async_tasks->asyncCancelTask(1); + // The task is not registered anymore. 
ASSERT_FALSE(async_tasks->isScheduled(1)); async_tasks->addTask(1, [&flag]() { flag = 2; }); - cl.unlock(); // Now can task 1 run. - int count = 0; - using namespace std::chrono_literals; - while (!finished_flag.load()) + cl.unlock(); { - count += 1; - ASSERT(count < 6); - std::this_thread::sleep_for(200ms); + int cnt_wait_finish = 0; + using namespace std::chrono_literals; + while (!finished_flag.load(std::memory_order_seq_cst)) + { + cnt_wait_finish += 1; + ASSERT(cnt_wait_finish < 6); + std::this_thread::sleep_for(200ms); + } } ASSERT_NO_THROW(async_tasks->fetchResult(1)); ASSERT_EQ(flag, 2); diff --git a/dbms/src/Storages/KVStore/tests/gtest_kvstore_fast_add_peer.cpp b/dbms/src/Storages/KVStore/tests/gtest_kvstore_fast_add_peer.cpp index ff5e230169c..16426df352f 100644 --- a/dbms/src/Storages/KVStore/tests/gtest_kvstore_fast_add_peer.cpp +++ b/dbms/src/Storages/KVStore/tests/gtest_kvstore_fast_add_peer.cpp @@ -121,7 +121,7 @@ class RegionKVStoreTestFAP : public KVStoreTestBase { auto & global_context = TiFlashTestEnv::getGlobalContext(); KVStoreTestBase::TearDown(); - global_context.getSharedContextDisagg()->fap_context.reset(); + global_context.getSharedContextDisagg()->fap_context->shutdown(); if (!already_initialize_data_store) { global_context.getSharedContextDisagg()->remote_data_store = nullptr;