From d8b4edbd60af3cd343d4cf7740ceb7157346237f Mon Sep 17 00:00:00 2001 From: yiguolei <676222867@qq.com> Date: Sun, 3 Mar 2024 17:42:42 +0800 Subject: [PATCH] [bugfix](wgcore) map at only get reference and it will core in multithread (#31702) map.at method only get a reference of the task group. in multi thread env, the task group maybe erased by another thread. map.at()->stop_task_schedulers will core. --------- Co-authored-by: yiguolei --- .../runtime/task_group/task_group_manager.cpp | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/be/src/runtime/task_group/task_group_manager.cpp b/be/src/runtime/task_group/task_group_manager.cpp index 819e63c855d201..a336cccd3d21a6 100644 --- a/be/src/runtime/task_group/task_group_manager.cpp +++ b/be/src/runtime/task_group/task_group_manager.cpp @@ -72,33 +72,35 @@ TaskGroupPtr TaskGroupManager::get_task_group_by_id(uint64_t tg_id) { void TaskGroupManager::delete_task_group_by_ids(std::set used_wg_id) { int64_t begin_time = MonotonicMillis(); // 1 get delete group without running queries - std::set deleted_tg_ids; + std::vector deleted_task_groups; { std::lock_guard write_lock(_group_mutex); for (auto iter = _task_groups.begin(); iter != _task_groups.end(); iter++) { uint64_t tg_id = iter->first; - auto* task_group_ptr = iter->second.get(); + auto task_group_ptr = iter->second; if (used_wg_id.find(tg_id) == used_wg_id.end()) { task_group_ptr->shutdown(); // only when no query running in task group, its resource can be released in BE if (task_group_ptr->query_num() == 0) { LOG(INFO) << "There is no query in wg " << tg_id << ", delete it."; - deleted_tg_ids.insert(tg_id); + deleted_task_groups.push_back(task_group_ptr); } } } } // 2 stop active thread - for (uint64_t tg_id : deleted_tg_ids) { - _task_groups.at(tg_id)->try_stop_schedulers(); + for (auto& tg : deleted_task_groups) { + // There is not lock here, but the tg may be released by another + // thread, so that we should use shared ptr here, not use tg_id + tg->try_stop_schedulers(); } // 3 release resource in memory { std::lock_guard write_lock(_group_mutex); - for (uint64_t tg_id : deleted_tg_ids) { - _task_groups.erase(tg_id); + for (auto& tg : deleted_task_groups) { + _task_groups.erase(tg->id()); } } @@ -129,7 +131,7 @@ void TaskGroupManager::delete_task_group_by_ids(std::set used_wg_id) { } int64_t time_cost_ms = MonotonicMillis() - begin_time; LOG(INFO) << "finish clear unused task group, time cost: " << time_cost_ms - << "ms, deleted group size:" << deleted_tg_ids.size(); + << "ms, deleted group size:" << deleted_task_groups.size(); } void TaskGroupManager::stop() {