From bd10aec501f4d8e27d5a0d1255875a3459af38dc Mon Sep 17 00:00:00 2001 From: wangbo <506340561@qq.com> Date: Tue, 20 Feb 2024 17:22:38 +0800 Subject: [PATCH] Create workload thread pool without cgroup --- be/src/agent/workload_group_listener.cpp | 10 ++-------- .../runtime/task_group/task_group_manager.cpp | 20 ++++++++++--------- .../runtime/task_group/task_group_manager.h | 2 +- 3 files changed, 14 insertions(+), 18 deletions(-) diff --git a/be/src/agent/workload_group_listener.cpp b/be/src/agent/workload_group_listener.cpp index 237d6c77274d69..5f9da64bd2a40a 100644 --- a/be/src/agent/workload_group_listener.cpp +++ b/be/src/agent/workload_group_listener.cpp @@ -26,7 +26,6 @@ namespace doris { void WorkloadGroupListener::handle_topic_info(const std::vector& topic_info_list) { std::set current_wg_ids; - bool is_set_cgroup_path = config::doris_cgroup_cpu_path != ""; for (const TopicInfo& topic_info : topic_info_list) { if (!topic_info.__isset.workload_group_info) { continue; @@ -51,12 +50,7 @@ void WorkloadGroupListener::handle_topic_info(const std::vector& topi task_group_info.enable_cpu_hard_limit); // 4 create and update task scheduler - Status ret2 = _exec_env->task_group_manager()->upsert_cg_task_scheduler(&task_group_info, - _exec_env); - if (is_set_cgroup_path && !ret2.ok()) { - LOG(INFO) << "upsert task sche failed, tg_id=" << task_group_info.id - << ", reason=" << ret2.to_string(); - } + _exec_env->task_group_manager()->upsert_cg_task_scheduler(&task_group_info, _exec_env); LOG(INFO) << "update task group finish, tg info=" << tg->debug_string() << ", enable_cpu_hard_limit=" @@ -65,7 +59,7 @@ void WorkloadGroupListener::handle_topic_info(const std::vector& topi << ", cgroup cpu_hard_limit=" << task_group_info.cgroup_cpu_hard_limit << ", enable_cgroup_cpu_soft_limit=" << (config::enable_cgroup_cpu_soft_limit ? "true" : "false") - << ", is set cgroup path=" << (is_set_cgroup_path ? "true" : "flase"); + << ", cgroup home path=" << config::doris_cgroup_cpu_path; } _exec_env->task_group_manager()->delete_task_group_by_ids(current_wg_ids); diff --git a/be/src/runtime/task_group/task_group_manager.cpp b/be/src/runtime/task_group/task_group_manager.cpp index 18e446295cb650..71f0da4f0ff36f 100644 --- a/be/src/runtime/task_group/task_group_manager.cpp +++ b/be/src/runtime/task_group/task_group_manager.cpp @@ -89,8 +89,8 @@ void TaskGroupManager::get_query_scheduler(uint64_t tg_id, } } -Status TaskGroupManager::upsert_cg_task_scheduler(taskgroup::TaskGroupInfo* tg_info, - ExecEnv* exec_env) { +void TaskGroupManager::upsert_cg_task_scheduler(taskgroup::TaskGroupInfo* tg_info, + ExecEnv* exec_env) { uint64_t tg_id = tg_info->id; std::string tg_name = tg_info->name; int cpu_hard_limit = tg_info->cpu_hard_limit; @@ -101,15 +101,17 @@ Status TaskGroupManager::upsert_cg_task_scheduler(taskgroup::TaskGroupInfo* tg_i std::lock_guard write_lock(_task_scheduler_lock); // step 1: init cgroup cpu controller CgroupCpuCtl* cg_cu_ctl_ptr = nullptr; - if (_cgroup_ctl_map.find(tg_id) == _cgroup_ctl_map.end()) { + if (config::doris_cgroup_cpu_path != "" && + _cgroup_ctl_map.find(tg_id) == _cgroup_ctl_map.end()) { std::unique_ptr cgroup_cpu_ctl = std::make_unique(tg_id); Status ret = cgroup_cpu_ctl->init(); if (ret.ok()) { cg_cu_ctl_ptr = cgroup_cpu_ctl.get(); _cgroup_ctl_map.emplace(tg_id, std::move(cgroup_cpu_ctl)); + LOG(INFO) << "[upsert wg thread pool] cgroup init success"; } else { - return Status::InternalError("cgroup init failed, gid={}, reason={}", tg_id, - ret.to_string()); + LOG(INFO) << "[upsert wg thread pool] cgroup init failed, gid= " << tg_id + << ", reason=" << ret.to_string(); } } @@ -128,7 +130,7 @@ Status TaskGroupManager::upsert_cg_task_scheduler(taskgroup::TaskGroupInfo* tg_i if (ret.ok()) { _tg_sche_map.emplace(tg_id, std::move(pipeline_task_scheduler)); } else { - return Status::InternalError("task scheduler start failed, gid={}", tg_id); + LOG(INFO) << "[upsert wg thread pool] task scheduler start failed, gid= " << tg_id; } } @@ -140,7 +142,7 @@ Status TaskGroupManager::upsert_cg_task_scheduler(taskgroup::TaskGroupInfo* tg_i if (ret.ok()) { _tg_scan_sche_map.emplace(tg_id, std::move(scan_scheduler)); } else { - return Status::InternalError("scan scheduler start failed, gid={}", tg_id); + LOG(INFO) << "[upsert wg thread pool] scan scheduler start failed, gid=" << tg_id; } } if (scan_thread_num > 0 && _tg_scan_sche_map.find(tg_id) != _tg_scan_sche_map.end()) { @@ -157,7 +159,7 @@ Status TaskGroupManager::upsert_cg_task_scheduler(taskgroup::TaskGroupInfo* tg_i .set_cgroup_cpu_ctl(cg_cu_ctl_ptr) .build(&thread_pool); if (!ret.ok()) { - LOG(INFO) << "create non-pipline thread pool failed"; + LOG(INFO) << "[upsert wg thread pool] create non-pipline thread pool failed"; } else { _non_pipe_thread_pool_map.emplace(tg_id, std::move(thread_pool)); } @@ -169,7 +171,7 @@ Status TaskGroupManager::upsert_cg_task_scheduler(taskgroup::TaskGroupInfo* tg_i _cgroup_ctl_map.at(tg_id)->update_cpu_hard_limit(cpu_hard_limit); _cgroup_ctl_map.at(tg_id)->update_cpu_soft_limit(CPU_SOFT_LIMIT_DEFAULT_VALUE); } else { - return Status::InternalError("enable cpu hard limit but value is illegal"); + LOG(INFO) << "[upsert wg thread pool] enable cpu hard limit but value is illegal"; } } else { if (config::enable_cgroup_cpu_soft_limit) { diff --git a/be/src/runtime/task_group/task_group_manager.h b/be/src/runtime/task_group/task_group_manager.h index 1a1a614d06879e..29e5c30a59645e 100644 --- a/be/src/runtime/task_group/task_group_manager.h +++ b/be/src/runtime/task_group/task_group_manager.h @@ -51,7 +51,7 @@ class TaskGroupManager { void get_related_taskgroups(const std::function& pred, std::vector* task_groups); - Status upsert_cg_task_scheduler(taskgroup::TaskGroupInfo* tg_info, ExecEnv* exec_env); + void upsert_cg_task_scheduler(taskgroup::TaskGroupInfo* tg_info, ExecEnv* exec_env); void delete_task_group_by_ids(std::set id_set);