Skip to content

Commit

Permalink
[bugfix](wgcore) map at only get reference and it will core in multit…
Browse files Browse the repository at this point in the history
…hread (#31702)

map.at() only returns a reference to the task group stored in the map.
In a multi-threaded environment, the task group may be erased from the map by another thread,
so calling map.at(tg_id)->try_stop_schedulers() can crash (core dump).

---------

Co-authored-by: yiguolei <[email protected]>
  • Loading branch information
yiguolei and Doris-Extras authored Mar 3, 2024
1 parent 7d4e855 commit d8b4edb
Showing 1 changed file with 10 additions and 8 deletions.
18 changes: 10 additions & 8 deletions be/src/runtime/task_group/task_group_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,33 +72,35 @@ TaskGroupPtr TaskGroupManager::get_task_group_by_id(uint64_t tg_id) {
void TaskGroupManager::delete_task_group_by_ids(std::set<uint64_t> used_wg_id) {
int64_t begin_time = MonotonicMillis();
// 1 get delete group without running queries
std::set<uint64_t> deleted_tg_ids;
std::vector<TaskGroupPtr> deleted_task_groups;
{
std::lock_guard<std::shared_mutex> write_lock(_group_mutex);
for (auto iter = _task_groups.begin(); iter != _task_groups.end(); iter++) {
uint64_t tg_id = iter->first;
auto* task_group_ptr = iter->second.get();
auto task_group_ptr = iter->second;
if (used_wg_id.find(tg_id) == used_wg_id.end()) {
task_group_ptr->shutdown();
// only when no query running in task group, its resource can be released in BE
if (task_group_ptr->query_num() == 0) {
LOG(INFO) << "There is no query in wg " << tg_id << ", delete it.";
deleted_tg_ids.insert(tg_id);
deleted_task_groups.push_back(task_group_ptr);
}
}
}
}

// 2 stop active thread
for (uint64_t tg_id : deleted_tg_ids) {
_task_groups.at(tg_id)->try_stop_schedulers();
for (auto& tg : deleted_task_groups) {
// No lock is held here, so another thread may erase the task group from
// the map; use the saved shared_ptr instead of looking it up by tg_id.
tg->try_stop_schedulers();
}

// 3 release resource in memory
{
std::lock_guard<std::shared_mutex> write_lock(_group_mutex);
for (uint64_t tg_id : deleted_tg_ids) {
_task_groups.erase(tg_id);
for (auto& tg : deleted_task_groups) {
_task_groups.erase(tg->id());
}
}

Expand Down Expand Up @@ -129,7 +131,7 @@ void TaskGroupManager::delete_task_group_by_ids(std::set<uint64_t> used_wg_id) {
}
int64_t time_cost_ms = MonotonicMillis() - begin_time;
LOG(INFO) << "finish clear unused task group, time cost: " << time_cost_ms
<< "ms, deleted group size:" << deleted_tg_ids.size();
<< "ms, deleted group size:" << deleted_task_groups.size();
}

void TaskGroupManager::stop() {
Expand Down

0 comments on commit d8b4edb

Please sign in to comment.