From f9e5f28458f7cfd693648f8d9974c1ea0629b2c5 Mon Sep 17 00:00:00 2001 From: Calvin Neo Date: Wed, 17 Apr 2024 15:45:07 +0800 Subject: [PATCH 1/4] This is an automated cherry-pick of #8953 Signed-off-by: ti-chi-bot --- dbms/src/Common/TiFlashMetrics.h | 1 + .../Interpreters/SharedContexts/Disagg.cpp | 9 +++ dbms/src/Interpreters/SharedContexts/Disagg.h | 2 + dbms/src/Storages/KVStore/KVStore.cpp | 2 + .../MultiRaft/Disagg/FastAddPeerContext.cpp | 5 ++ .../MultiRaft/Disagg/FastAddPeerContext.h | 2 + .../KVStore/Read/LearnerReadWorker.cpp | 58 +++++++++++++++++-- dbms/src/Storages/KVStore/Read/ReadIndex.cpp | 1 + dbms/src/Storages/KVStore/Utils/AsyncTasks.h | 22 ++++++- .../tests/gtest_kvstore_fast_add_peer.cpp | 2 +- 10 files changed, 97 insertions(+), 7 deletions(-) diff --git a/dbms/src/Common/TiFlashMetrics.h b/dbms/src/Common/TiFlashMetrics.h index 7496107d889..99ed06ac1f3 100644 --- a/dbms/src/Common/TiFlashMetrics.h +++ b/dbms/src/Common/TiFlashMetrics.h @@ -537,6 +537,7 @@ static_assert(RAFT_REGION_BIG_WRITE_THRES * 4 < RAFT_REGION_BIG_WRITE_MAX, "Inva "Raft read index events counter", \ Counter, \ F(type_bypass_lock, {{"type", "bypass_lock"}}), \ + F(type_zero_read_tso, {{"type", "zero_read_tso"}}), \ F(type_use_histroy, {{"type", "use_histroy"}}), \ F(type_use_cache, {{"type", "use_cache"}})) \ M(tiflash_raft_learner_read_failures_count, \ diff --git a/dbms/src/Interpreters/SharedContexts/Disagg.cpp b/dbms/src/Interpreters/SharedContexts/Disagg.cpp index eee4d0f4547..c4aa7fbf650 100644 --- a/dbms/src/Interpreters/SharedContexts/Disagg.cpp +++ b/dbms/src/Interpreters/SharedContexts/Disagg.cpp @@ -105,4 +105,13 @@ void SharedContextDisagg::initFastAddPeerContext(UInt64 fap_concur) fap_context = std::make_shared(fap_concur); } + +SharedContextDisagg::~SharedContextDisagg() +{ + if (fap_context) + { + fap_context->shutdown(); + } +} + } // namespace DB diff --git a/dbms/src/Interpreters/SharedContexts/Disagg.h b/dbms/src/Interpreters/SharedContexts/Disagg.h 
index 07a619a4907..2924a36dd58 100644 --- a/dbms/src/Interpreters/SharedContexts/Disagg.h +++ b/dbms/src/Interpreters/SharedContexts/Disagg.h @@ -79,6 +79,8 @@ struct SharedContextDisagg : private boost::noncopyable : global_context(global_context_) {} + ~SharedContextDisagg(); + void initReadNodePageCache(const PathPool & path_pool, const String & cache_dir, size_t cache_capacity); /// Note that the unit of max_size is quantity, not byte size. It controls how diff --git a/dbms/src/Storages/KVStore/KVStore.cpp b/dbms/src/Storages/KVStore/KVStore.cpp index 9ff701d1a45..b16499c4dee 100644 --- a/dbms/src/Storages/KVStore/KVStore.cpp +++ b/dbms/src/Storages/KVStore/KVStore.cpp @@ -438,6 +438,7 @@ KVStore::~KVStore() LOG_INFO(log, "Destroy KVStore"); stopThreadAllocInfo(); releaseReadIndexWorkers(); + LOG_INFO(log, "Destroy KVStore Finished"); } FileUsageStatistics KVStore::getFileUsageStatistics() const @@ -611,6 +612,7 @@ void KVStore::stopThreadAllocInfo() is_terminated = true; monitoring_cv.notify_all(); } + LOG_INFO(log, "KVStore shutdown, wait thread alloc monitor join"); monitoring_thread->join(); delete monitoring_thread; monitoring_thread = nullptr; diff --git a/dbms/src/Storages/KVStore/MultiRaft/Disagg/FastAddPeerContext.cpp b/dbms/src/Storages/KVStore/MultiRaft/Disagg/FastAddPeerContext.cpp index 588ec80a9f3..62e9bcb7962 100644 --- a/dbms/src/Storages/KVStore/MultiRaft/Disagg/FastAddPeerContext.cpp +++ b/dbms/src/Storages/KVStore/MultiRaft/Disagg/FastAddPeerContext.cpp @@ -43,6 +43,11 @@ FastAddPeerContext::FastAddPeerContext(uint64_t thread_count) tasks_trace = std::make_shared(thread_count, thread_count, 1000); } +void FastAddPeerContext::shutdown() const +{ + tasks_trace->shutdown(); +} + ParsedCheckpointDataHolderPtr FastAddPeerContext::CheckpointCacheElement::getParsedCheckpointData(Context & context) { std::scoped_lock lock(mu); diff --git a/dbms/src/Storages/KVStore/MultiRaft/Disagg/FastAddPeerContext.h 
b/dbms/src/Storages/KVStore/MultiRaft/Disagg/FastAddPeerContext.h index 6c1411e0ab9..0ec62668cbc 100644 --- a/dbms/src/Storages/KVStore/MultiRaft/Disagg/FastAddPeerContext.h +++ b/dbms/src/Storages/KVStore/MultiRaft/Disagg/FastAddPeerContext.h @@ -22,6 +22,7 @@ namespace DB { +class FastAddPeerContext; using FAPAsyncTasks = AsyncTasks, FastAddPeerRes>; struct CheckpointInfo; using CheckpointInfoPtr = std::shared_ptr; @@ -34,6 +35,7 @@ class FastAddPeerContext { public: explicit FastAddPeerContext(uint64_t thread_count = 0); + void shutdown() const; // Return parsed checkpoint data and its corresponding seq which is newer than `required_seq` if exists, otherwise return pair std::pair getNewerCheckpointData( diff --git a/dbms/src/Storages/KVStore/Read/LearnerReadWorker.cpp b/dbms/src/Storages/KVStore/Read/LearnerReadWorker.cpp index 1da417b5d59..4916ee54043 100644 --- a/dbms/src/Storages/KVStore/Read/LearnerReadWorker.cpp +++ b/dbms/src/Storages/KVStore/Read/LearnerReadWorker.cpp @@ -117,6 +117,10 @@ std::vector LearnerReadWorker::buildBatchReadIndexReq // If using `std::numeric_limits::max()`, set `start-ts` 0 to get the latest index but let read-index-worker do not record as history. auto read_index_tso = mvcc_query_info.read_tso == std::numeric_limits::max() ? 
0 : mvcc_query_info.read_tso; + if (read_index_tso == 0) + { + GET_METRIC(tiflash_raft_read_index_events_count, type_zero_read_tso).Increment(); + } for (const auto & region_to_query : regions_info) { const RegionID region_id = region_to_query.region_id; @@ -325,13 +329,14 @@ RegionsReadIndexResult LearnerReadWorker::readIndex( log_lvl, "[Learner Read] Batch read index, num_regions={} num_requests={} num_stale_read={} num_cached_index={} " "num_unavailable={} " - "cost={}ms", + "cost={}ms, read_tso={}", stats.num_regions, stats.num_read_index_request, stats.num_stale_read, stats.num_cached_read_index, unavailable_regions.size(), - stats.read_index_elapsed_ms); + stats.read_index_elapsed_ms, + mvcc_query_info.read_tso); return batch_read_index_result; } @@ -427,10 +432,52 @@ void LearnerReadWorker::waitIndex( LOG_IMPL( log, log_lvl, - "[Learner Read] Finish wait index and resolve locks, wait_cost={}ms n_regions={} n_unavailable={}", + "[Learner Read] Finish wait index and resolve locks, wait_cost={}ms n_regions={} n_unavailable={}, read_tso={}", stats.wait_index_elapsed_ms, stats.num_regions, +<<<<<<< HEAD unavailable_regions.size()); +======= + unavailable_regions.size(), + mvcc_query_info.read_tso); + + auto bypass_formatter = [&](const RegionQueryInfo & query_info) -> String { + if (query_info.bypass_lock_ts == nullptr) + return ""; + FmtBuffer buffer; + buffer.append("["); + buffer.joinStr( + query_info.bypass_lock_ts->begin(), + query_info.bypass_lock_ts->end(), + [](const auto & v, FmtBuffer & f) { f.fmtAppend("{}", v); }, + "|"); + buffer.append("]"); + return buffer.toString(); + }; + auto region_info_formatter = [&]() -> String { + FmtBuffer buffer; + buffer.joinStr( + regions_info.begin(), + regions_info.end(), + [&](const auto & region_to_query, FmtBuffer & f) { + const auto & region = regions_snapshot.find(region_to_query.region_id)->second; + f.fmtAppend( + "(id:{} applied_index:{} bypass_locks:{})", + region_to_query.region_id, + 
region->appliedIndex(), + bypass_formatter(region_to_query)); + }, + ";"); + return buffer.toString(); + }; + + LOG_DEBUG( + log, + "[Learner Read] Learner Read Summary, regions_info={}, unavailable_regions_info={}, read_tso={}", + region_info_formatter(), + unavailable_regions.toDebugString(), + mvcc_query_info.read_tso); +>>>>>>> 14a127820d (Fix AsyncTasks cancel deadlock (#8953)) } std::tuple // @@ -469,13 +516,14 @@ LearnerReadWorker::waitUntilDataAvailable( log, log_lvl, "[Learner Read] batch read index | wait index" - " total_cost={} read_cost={} wait_cost={} n_regions={} n_stale_read={} n_unavailable={}", + " total_cost={} read_cost={} wait_cost={} n_regions={} n_stale_read={} n_unavailable={}, read_tso={}", time_elapsed_ms, stats.read_index_elapsed_ms, stats.wait_index_elapsed_ms, stats.num_regions, stats.num_stale_read, - unavailable_regions.size()); + unavailable_regions.size(), + mvcc_query_info.read_tso); return {start_time, end_time}; } diff --git a/dbms/src/Storages/KVStore/Read/ReadIndex.cpp b/dbms/src/Storages/KVStore/Read/ReadIndex.cpp index 273e85a58c8..e6309d1e5b1 100644 --- a/dbms/src/Storages/KVStore/Read/ReadIndex.cpp +++ b/dbms/src/Storages/KVStore/Read/ReadIndex.cpp @@ -368,6 +368,7 @@ void KVStore::stopReadIndexWorkers() const void KVStore::releaseReadIndexWorkers() { + LOG_INFO(log, "KVStore shutdown, deleting read index worker"); if (read_index_worker_manager) { delete read_index_worker_manager; diff --git a/dbms/src/Storages/KVStore/Utils/AsyncTasks.h b/dbms/src/Storages/KVStore/Utils/AsyncTasks.h index de5e0839bb2..e497a230f89 100644 --- a/dbms/src/Storages/KVStore/Utils/AsyncTasks.h +++ b/dbms/src/Storages/KVStore/Utils/AsyncTasks.h @@ -53,7 +53,24 @@ struct AsyncTasks , log(DB::Logger::get()) {} - ~AsyncTasks() { LOG_INFO(log, "Pending {} tasks when destructing", count()); } + void shutdown() + { + LOG_INFO(log, "Pending {} tasks when destructing", count()); + // To avoid the "last owner" problem in worker thread. 
+ thread_pool->wait(); + shut.store(true); + LOG_INFO(log, "Finish finalize thread pool"); + } + + ~AsyncTasks() + { + if (!shut.load()) + { + LOG_INFO(log, "Destruct without shutdown"); + // Potential deadlock if the instance is held and released directly or indirectly by a task in its worker. + shutdown(); + } + } using TaskState = AsyncTaskHelper::TaskState; @@ -241,6 +258,8 @@ struct AsyncTasks // 1. There is already a task registered with the same name and not canceled or fetched. bool addTaskWithCancel(Key k, Func f, CancelFunc cf) { + if (shut.load()) + return false; std::scoped_lock l(mtx); RUNTIME_CHECK(!tasks.contains(k)); using P = std::packaged_task; @@ -399,5 +418,6 @@ struct AsyncTasks std::unique_ptr thread_pool; mutable std::mutex mtx; LoggerPtr log; + std::atomic_bool shut = false; }; } // namespace DB \ No newline at end of file diff --git a/dbms/src/Storages/KVStore/tests/gtest_kvstore_fast_add_peer.cpp b/dbms/src/Storages/KVStore/tests/gtest_kvstore_fast_add_peer.cpp index ff5e230169c..16426df352f 100644 --- a/dbms/src/Storages/KVStore/tests/gtest_kvstore_fast_add_peer.cpp +++ b/dbms/src/Storages/KVStore/tests/gtest_kvstore_fast_add_peer.cpp @@ -121,7 +121,7 @@ class RegionKVStoreTestFAP : public KVStoreTestBase { auto & global_context = TiFlashTestEnv::getGlobalContext(); KVStoreTestBase::TearDown(); - global_context.getSharedContextDisagg()->fap_context.reset(); + global_context.getSharedContextDisagg()->fap_context->shutdown(); if (!already_initialize_data_store) { global_context.getSharedContextDisagg()->remote_data_store = nullptr; From b353daf8f3a18ec36210558c84d4bf8b43d0b6b9 Mon Sep 17 00:00:00 2001 From: JaySon-Huang Date: Mon, 22 Apr 2024 14:50:32 +0800 Subject: [PATCH 2/4] Remove unrelated code --- dbms/src/Common/TiFlashMetrics.h | 1 - .../KVStore/Read/LearnerReadWorker.cpp | 45 ------------------- 2 files changed, 46 deletions(-) diff --git a/dbms/src/Common/TiFlashMetrics.h b/dbms/src/Common/TiFlashMetrics.h index 
99ed06ac1f3..7496107d889 100644 --- a/dbms/src/Common/TiFlashMetrics.h +++ b/dbms/src/Common/TiFlashMetrics.h @@ -537,7 +537,6 @@ static_assert(RAFT_REGION_BIG_WRITE_THRES * 4 < RAFT_REGION_BIG_WRITE_MAX, "Inva "Raft read index events counter", \ Counter, \ F(type_bypass_lock, {{"type", "bypass_lock"}}), \ - F(type_zero_read_tso, {{"type", "zero_read_tso"}}), \ F(type_use_histroy, {{"type", "use_histroy"}}), \ F(type_use_cache, {{"type", "use_cache"}})) \ M(tiflash_raft_learner_read_failures_count, \ diff --git a/dbms/src/Storages/KVStore/Read/LearnerReadWorker.cpp b/dbms/src/Storages/KVStore/Read/LearnerReadWorker.cpp index 4916ee54043..00b516b287e 100644 --- a/dbms/src/Storages/KVStore/Read/LearnerReadWorker.cpp +++ b/dbms/src/Storages/KVStore/Read/LearnerReadWorker.cpp @@ -117,10 +117,6 @@ std::vector LearnerReadWorker::buildBatchReadIndexReq // If using `std::numeric_limits::max()`, set `start-ts` 0 to get the latest index but let read-index-worker do not record as history. auto read_index_tso = mvcc_query_info.read_tso == std::numeric_limits::max() ? 
0 : mvcc_query_info.read_tso; - if (read_index_tso == 0) - { - GET_METRIC(tiflash_raft_read_index_events_count, type_zero_read_tso).Increment(); - } for (const auto & region_to_query : regions_info) { const RegionID region_id = region_to_query.region_id; @@ -435,49 +431,8 @@ void LearnerReadWorker::waitIndex( "[Learner Read] Finish wait index and resolve locks, wait_cost={}ms n_regions={} n_unavailable={}, read_tso={}", stats.wait_index_elapsed_ms, stats.num_regions, -<<<<<<< HEAD - unavailable_regions.size()); -======= unavailable_regions.size(), mvcc_query_info.read_tso); - - auto bypass_formatter = [&](const RegionQueryInfo & query_info) -> String { - if (query_info.bypass_lock_ts == nullptr) - return ""; - FmtBuffer buffer; - buffer.append("["); - buffer.joinStr( - query_info.bypass_lock_ts->begin(), - query_info.bypass_lock_ts->end(), - [](const auto & v, FmtBuffer & f) { f.fmtAppend("{}", v); }, - "|"); - buffer.append("]"); - return buffer.toString(); - }; - auto region_info_formatter = [&]() -> String { - FmtBuffer buffer; - buffer.joinStr( - regions_info.begin(), - regions_info.end(), - [&](const auto & region_to_query, FmtBuffer & f) { - const auto & region = regions_snapshot.find(region_to_query.region_id)->second; - f.fmtAppend( - "(id:{} applied_index:{} bypass_locks:{})", - region_to_query.region_id, - region->appliedIndex(), - bypass_formatter(region_to_query)); - }, - ";"); - return buffer.toString(); - }; - - LOG_DEBUG( - log, - "[Learner Read] Learner Read Summary, regions_info={}, unavailable_regions_info={}, read_tso={}", - region_info_formatter(), - unavailable_regions.toDebugString(), - mvcc_query_info.read_tso); ->>>>>>> 14a127820d (Fix AsyncTasks cancel deadlock (#8953)) } std::tuple // From 17b704d752463977ca03726441f9acbb589de3fc Mon Sep 17 00:00:00 2001 From: Calvin Neo Date: Wed, 10 Apr 2024 19:40:51 +0800 Subject: [PATCH 3/4] KVStore: Fix test of async tasks (#8928) close pingcap/tiflash#8926 --- 
.../Storages/KVStore/Read/LearnerReadWorker.h | 20 ++++++++++ .../KVStore/tests/gtest_async_tasks.cpp | 37 +++++++++++++------ 2 files changed, 46 insertions(+), 11 deletions(-) diff --git a/dbms/src/Storages/KVStore/Read/LearnerReadWorker.h b/dbms/src/Storages/KVStore/Read/LearnerReadWorker.h index d9bf888b0e9..f546257bea7 100644 --- a/dbms/src/Storages/KVStore/Read/LearnerReadWorker.h +++ b/dbms/src/Storages/KVStore/Read/LearnerReadWorker.h @@ -71,6 +71,26 @@ struct UnavailableRegions void addRegionWaitIndexTimeout(RegionID region_id, UInt64 index_to_wait, UInt64 current_applied_index); + String toDebugString() const + { + FmtBuffer buffer; + buffer.append("{ids=["); + buffer.joinStr( + ids.begin(), + ids.end(), + [](const auto & v, FmtBuffer & f) { f.fmtAppend("{}", v); }, + "|"); + buffer.append("] locks="); + buffer.append("["); + buffer.joinStr( + region_locks.begin(), + region_locks.end(), + [](const auto & v, FmtBuffer & f) { f.fmtAppend("{}({})", v.first, v.second->DebugString()); }, + "|"); + buffer.append("]}"); + return buffer.toString(); + } + private: const bool batch_cop; const bool is_wn_disagg_read; diff --git a/dbms/src/Storages/KVStore/tests/gtest_async_tasks.cpp b/dbms/src/Storages/KVStore/tests/gtest_async_tasks.cpp index 1160137161f..e988b1e40b9 100644 --- a/dbms/src/Storages/KVStore/tests/gtest_async_tasks.cpp +++ b/dbms/src/Storages/KVStore/tests/gtest_async_tasks.cpp @@ -28,17 +28,18 @@ TEST(AsyncTasksTest, AsyncTasksNormal) auto log = DB::Logger::get(); LOG_INFO(log, "Cancel and addTask"); // Cancel and addTask - // 3 -> 1 -> 4 -> 2 { auto async_tasks = std::make_unique(1, 1, 2); auto m = std::make_shared(); int flag = 0; std::unique_lock cl(*m); std::atomic_bool finished_flag = false; - async_tasks->addTask(1, [m, &flag, &async_tasks, &finished_flag]() { + std::atomic_bool running_flag = false; + async_tasks->addTask(1, [m, &flag, &async_tasks, &finished_flag, &running_flag]() { + running_flag.store(true, std::memory_order_seq_cst); auto 
cancel_handle = async_tasks->getCancelHandleFromExecutor(1); - std::scoped_lock rl(*m); // 2 - SCOPE_EXIT({ finished_flag.store(true); }); + std::scoped_lock rl(*m); + SCOPE_EXIT({ finished_flag.store(true, std::memory_order_seq_cst); }); // Run after `cl` is released. if (cancel_handle->isCanceled()) { @@ -46,17 +47,31 @@ TEST(AsyncTasksTest, AsyncTasksNormal) } flag = 1; }); + ASSERT_TRUE(async_tasks->isScheduled(1)); + { + int cnt_wait_sche = 0; + while (!running_flag.load(std::memory_order_seq_cst)) + { + cnt_wait_sche += 1; + ASSERT(cnt_wait_sche < 6); + std::this_thread::sleep_for(200ms); + } + } + // Make sure we don't cancel in queue. async_tasks->asyncCancelTask(1); + // The task is not registered anymore. ASSERT_FALSE(async_tasks->isScheduled(1)); async_tasks->addTask(1, [&flag]() { flag = 2; }); - cl.unlock(); // Now can task 1 run. - int count = 0; - using namespace std::chrono_literals; - while (!finished_flag.load()) + cl.unlock(); { - count += 1; - ASSERT(count < 6); - std::this_thread::sleep_for(200ms); + int cnt_wait_finish = 0; + using namespace std::chrono_literals; + while (!finished_flag.load(std::memory_order_seq_cst)) + { + cnt_wait_finish += 1; + ASSERT(cnt_wait_finish < 6); + std::this_thread::sleep_for(200ms); + } } ASSERT_NO_THROW(async_tasks->fetchResult(1)); ASSERT_EQ(flag, 2); From 01a4c97f107838250957efdced953d0e61419a85 Mon Sep 17 00:00:00 2001 From: JaySon-Huang Date: Mon, 22 Apr 2024 14:51:56 +0800 Subject: [PATCH 4/4] Remove unrelated code --- .../Storages/KVStore/Read/LearnerReadWorker.h | 20 ------------------- 1 file changed, 20 deletions(-) diff --git a/dbms/src/Storages/KVStore/Read/LearnerReadWorker.h b/dbms/src/Storages/KVStore/Read/LearnerReadWorker.h index f546257bea7..d9bf888b0e9 100644 --- a/dbms/src/Storages/KVStore/Read/LearnerReadWorker.h +++ b/dbms/src/Storages/KVStore/Read/LearnerReadWorker.h @@ -71,6 +71,26 @@ struct UnavailableRegions void addRegionWaitIndexTimeout(RegionID region_id, UInt64 index_to_wait, UInt64 
current_applied_index); - String toDebugString() const - { - FmtBuffer buffer; - buffer.append("{ids=["); - buffer.joinStr( - ids.begin(), - ids.end(), - [](const auto & v, FmtBuffer & f) { f.fmtAppend("{}", v); }, - "|"); - buffer.append("] locks="); - buffer.append("["); - buffer.joinStr( - region_locks.begin(), - region_locks.end(), - [](const auto & v, FmtBuffer & f) { f.fmtAppend("{}({})", v.first, v.second->DebugString()); }, - "|"); - buffer.append("]}"); - return buffer.toString(); - } - private: const bool batch_cop; const bool is_wn_disagg_read;