Fix AsyncTasks cancel deadlock (#8953) (#8960)
close #8952
ti-chi-bot authored Apr 22, 2024
1 parent e6ea28e commit be0b5e6
Showing 10 changed files with 78 additions and 19 deletions.
9 changes: 9 additions & 0 deletions dbms/src/Interpreters/SharedContexts/Disagg.cpp
@@ -105,4 +105,13 @@ void SharedContextDisagg::initFastAddPeerContext(UInt64 fap_concur)
fap_context = std::make_shared<FastAddPeerContext>(fap_concur);
}


SharedContextDisagg::~SharedContextDisagg()
{
if (fap_context)
{
fap_context->shutdown();
}
}

} // namespace DB
2 changes: 2 additions & 0 deletions dbms/src/Interpreters/SharedContexts/Disagg.h
@@ -79,6 +79,8 @@ struct SharedContextDisagg : private boost::noncopyable
: global_context(global_context_)
{}

~SharedContextDisagg();

void initReadNodePageCache(const PathPool & path_pool, const String & cache_dir, size_t cache_capacity);

/// Note that the unit of max_size is quantity, not byte size. It controls how
2 changes: 2 additions & 0 deletions dbms/src/Storages/KVStore/KVStore.cpp
@@ -438,6 +438,7 @@ KVStore::~KVStore()
LOG_INFO(log, "Destroy KVStore");
stopThreadAllocInfo();
releaseReadIndexWorkers();
LOG_INFO(log, "Destroy KVStore Finished");
}

FileUsageStatistics KVStore::getFileUsageStatistics() const
@@ -611,6 +612,7 @@ void KVStore::stopThreadAllocInfo()
is_terminated = true;
monitoring_cv.notify_all();
}
LOG_INFO(log, "KVStore shutdown, wait thread alloc monitor join");
monitoring_thread->join();
delete monitoring_thread;
monitoring_thread = nullptr;
@@ -43,6 +43,11 @@ FastAddPeerContext::FastAddPeerContext(uint64_t thread_count)
tasks_trace = std::make_shared<FAPAsyncTasks>(thread_count, thread_count, 1000);
}

void FastAddPeerContext::shutdown() const
{
tasks_trace->shutdown();
}

ParsedCheckpointDataHolderPtr FastAddPeerContext::CheckpointCacheElement::getParsedCheckpointData(Context & context)
{
std::scoped_lock<std::mutex> lock(mu);
@@ -22,6 +22,7 @@

namespace DB
{
class FastAddPeerContext;
using FAPAsyncTasks = AsyncTasks<uint64_t, std::function<FastAddPeerRes()>, FastAddPeerRes>;
struct CheckpointInfo;
using CheckpointInfoPtr = std::shared_ptr<CheckpointInfo>;
@@ -34,6 +35,7 @@ class FastAddPeerContext
{
public:
explicit FastAddPeerContext(uint64_t thread_count = 0);
void shutdown() const;

// Return parsed checkpoint data and its corresponding seq which is newer than `required_seq` if exists, otherwise return pair<required_seq, nullptr>
std::pair<UInt64, ParsedCheckpointDataHolderPtr> getNewerCheckpointData(
15 changes: 9 additions & 6 deletions dbms/src/Storages/KVStore/Read/LearnerReadWorker.cpp
@@ -325,13 +325,14 @@ RegionsReadIndexResult LearnerReadWorker::readIndex(
log_lvl,
"[Learner Read] Batch read index, num_regions={} num_requests={} num_stale_read={} num_cached_index={} "
"num_unavailable={} "
"cost={}ms",
"cost={}ms, read_tso={}",
stats.num_regions,
stats.num_read_index_request,
stats.num_stale_read,
stats.num_cached_read_index,
unavailable_regions.size(),
stats.read_index_elapsed_ms);
stats.read_index_elapsed_ms,
mvcc_query_info.read_tso);

return batch_read_index_result;
}
@@ -427,10 +428,11 @@ void LearnerReadWorker::waitIndex(
LOG_IMPL(
log,
log_lvl,
"[Learner Read] Finish wait index and resolve locks, wait_cost={}ms n_regions={} n_unavailable={}",
"[Learner Read] Finish wait index and resolve locks, wait_cost={}ms n_regions={} n_unavailable={}, read_tso={}",
stats.wait_index_elapsed_ms,
stats.num_regions,
unavailable_regions.size());
unavailable_regions.size(),
mvcc_query_info.read_tso);
}

std::tuple<Clock::time_point, Clock::time_point> //
@@ -469,13 +471,14 @@ LearnerReadWorker::waitUntilDataAvailable(
log,
log_lvl,
"[Learner Read] batch read index | wait index"
" total_cost={} read_cost={} wait_cost={} n_regions={} n_stale_read={} n_unavailable={}",
" total_cost={} read_cost={} wait_cost={} n_regions={} n_stale_read={} n_unavailable={}, read_tso={}",
time_elapsed_ms,
stats.read_index_elapsed_ms,
stats.wait_index_elapsed_ms,
stats.num_regions,
stats.num_stale_read,
unavailable_regions.size());
unavailable_regions.size(),
mvcc_query_info.read_tso);
return {start_time, end_time};
}

1 change: 1 addition & 0 deletions dbms/src/Storages/KVStore/Read/ReadIndex.cpp
@@ -368,6 +368,7 @@ void KVStore::stopReadIndexWorkers() const

void KVStore::releaseReadIndexWorkers()
{
LOG_INFO(log, "KVStore shutdown, deleting read index worker");
if (read_index_worker_manager)
{
delete read_index_worker_manager;
22 changes: 21 additions & 1 deletion dbms/src/Storages/KVStore/Utils/AsyncTasks.h
@@ -53,7 +53,24 @@ struct AsyncTasks
, log(DB::Logger::get())
{}

~AsyncTasks() { LOG_INFO(log, "Pending {} tasks when destructing", count()); }
void shutdown()
{
LOG_INFO(log, "Pending {} tasks when destructing", count());
// To avoid the "last owner" problem in worker thread.
thread_pool->wait();
shut.store(true);
LOG_INFO(log, "Finish finalize thread pool");
}

~AsyncTasks()
{
if (!shut.load())
{
LOG_INFO(log, "Destruct without shutdown");
// Potential deadlock if the instance is held and released directly or indirectly by a task in its worker.
shutdown();
}
}

using TaskState = AsyncTaskHelper::TaskState;

@@ -241,6 +258,8 @@
// 1. There is already a task registered with the same name and not canceled or fetched.
bool addTaskWithCancel(Key k, Func f, CancelFunc cf)
{
if (shut.load())
return false;
std::scoped_lock l(mtx);
RUNTIME_CHECK(!tasks.contains(k));
using P = std::packaged_task<R()>;
@@ -399,5 +418,6 @@ struct AsyncTasks
std::unique_ptr<ThreadPool> thread_pool;
mutable std::mutex mtx;
LoggerPtr log;
std::atomic_bool shut = false;
};
} // namespace DB
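
A minimal standalone sketch (not the TiFlash implementation) of the "last owner" deadlock that the new shutdown() path guards against, echoing the comments added above: if a worker task directly or indirectly holds the last reference to the object that owns the thread pool, the destructor ends up running on a worker thread and then waiting for its own pool. The MiniAsyncTasks type and the main() driver below are illustrative assumptions; the real AsyncTasks keeps a ThreadPool and a keyed task registry.

    // Sketch only: the fix is to let the owner call shutdown() on its own
    // thread before the last reference can be dropped inside a worker.
    #include <atomic>
    #include <functional>
    #include <memory>
    #include <thread>
    #include <vector>

    struct MiniAsyncTasks
    {
        bool addTask(std::function<void()> f)
        {
            if (shut.load())
                return false; // mirrors the new guard in addTaskWithCancel
            workers.emplace_back(std::move(f));
            return true;
        }

        void shutdown()
        {
            // Wait for in-flight tasks on the caller's thread, never on a worker.
            for (auto & w : workers)
                if (w.joinable())
                    w.join();
            shut.store(true);
        }

        ~MiniAsyncTasks()
        {
            // Would self-join (deadlock) if the last reference were dropped by a worker task.
            if (!shut.load())
                shutdown();
        }

        std::vector<std::thread> workers;
        std::atomic_bool shut{false};
    };

    int main()
    {
        auto owner = std::make_shared<MiniAsyncTasks>();
        owner->addTask([owner] { /* the task also keeps the owner alive */ });
        owner->shutdown(); // owner shuts down first, on its own thread
        owner.reset();     // destructor sees shut == true and returns immediately
    }

In the actual change, FastAddPeerContext::shutdown() forwards to AsyncTasks::shutdown(), and SharedContextDisagg calls it from its destructor before fap_context is released, so the AsyncTasks destructor never has to wait on the pool from inside one of its own workers.
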
37 changes: 26 additions & 11 deletions dbms/src/Storages/KVStore/tests/gtest_async_tasks.cpp
@@ -28,35 +28,50 @@ TEST(AsyncTasksTest, AsyncTasksNormal)
auto log = DB::Logger::get();
LOG_INFO(log, "Cancel and addTask");
// Cancel and addTask
// 3 -> 1 -> 4 -> 2
{
auto async_tasks = std::make_unique<TestAsyncTasks>(1, 1, 2);
auto m = std::make_shared<std::mutex>();
int flag = 0;
std::unique_lock cl(*m);
std::atomic_bool finished_flag = false;
async_tasks->addTask(1, [m, &flag, &async_tasks, &finished_flag]() {
std::atomic_bool running_flag = false;
async_tasks->addTask(1, [m, &flag, &async_tasks, &finished_flag, &running_flag]() {
running_flag.store(true, std::memory_order_seq_cst);
auto cancel_handle = async_tasks->getCancelHandleFromExecutor(1);
std::scoped_lock rl(*m); // 2
SCOPE_EXIT({ finished_flag.store(true); });
std::scoped_lock rl(*m);
SCOPE_EXIT({ finished_flag.store(true, std::memory_order_seq_cst); });
// Run after `cl` is released.
if (cancel_handle->isCanceled())
{
return;
}
flag = 1;
});
ASSERT_TRUE(async_tasks->isScheduled(1));
{
int cnt_wait_sche = 0;
while (!running_flag.load(std::memory_order_seq_cst))
{
cnt_wait_sche += 1;
ASSERT(cnt_wait_sche < 6);
std::this_thread::sleep_for(200ms);
}
}
// Make sure we don't cancel in queue.
async_tasks->asyncCancelTask(1);
// The task is not registered anymore.
ASSERT_FALSE(async_tasks->isScheduled(1));
async_tasks->addTask(1, [&flag]() { flag = 2; });
cl.unlock(); // Now can task 1 run.
int count = 0;
using namespace std::chrono_literals;
while (!finished_flag.load())
cl.unlock();
{
count += 1;
ASSERT(count < 6);
std::this_thread::sleep_for(200ms);
int cnt_wait_finish = 0;
using namespace std::chrono_literals;
while (!finished_flag.load(std::memory_order_seq_cst))
{
cnt_wait_finish += 1;
ASSERT(cnt_wait_finish < 6);
std::this_thread::sleep_for(200ms);
}
}
ASSERT_NO_THROW(async_tasks->fetchResult(1));
ASSERT_EQ(flag, 2);
@@ -121,7 +121,7 @@ class RegionKVStoreTestFAP : public KVStoreTestBase
{
auto & global_context = TiFlashTestEnv::getGlobalContext();
KVStoreTestBase::TearDown();
global_context.getSharedContextDisagg()->fap_context.reset();
global_context.getSharedContextDisagg()->fap_context->shutdown();
if (!already_initialize_data_store)
{
global_context.getSharedContextDisagg()->remote_data_store = nullptr;
