Skip to content

Commit

Permalink
Create workload thread pool without cgroup
Browse files Browse the repository at this point in the history
  • Loading branch information
wangbo committed Feb 20, 2024
1 parent 558356f commit c7dd982
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 32 deletions.
10 changes: 2 additions & 8 deletions be/src/agent/workload_group_listener.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ namespace doris {

void WorkloadGroupListener::handle_topic_info(const std::vector<TopicInfo>& topic_info_list) {
std::set<uint64_t> current_wg_ids;
bool is_set_cgroup_path = config::doris_cgroup_cpu_path != "";
for (const TopicInfo& topic_info : topic_info_list) {
if (!topic_info.__isset.workload_group_info) {
continue;
Expand All @@ -51,12 +50,7 @@ void WorkloadGroupListener::handle_topic_info(const std::vector<TopicInfo>& topi
task_group_info.enable_cpu_hard_limit);

// 4 create and update task scheduler
Status ret2 = _exec_env->task_group_manager()->upsert_cg_task_scheduler(&task_group_info,
_exec_env);
if (is_set_cgroup_path && !ret2.ok()) {
LOG(INFO) << "upsert task sche failed, tg_id=" << task_group_info.id
<< ", reason=" << ret2.to_string();
}
_exec_env->task_group_manager()->upsert_cg_task_scheduler(&task_group_info, _exec_env);

LOG(INFO) << "update task group finish, tg info=" << tg->debug_string()
<< ", enable_cpu_hard_limit="
Expand All @@ -65,7 +59,7 @@ void WorkloadGroupListener::handle_topic_info(const std::vector<TopicInfo>& topi
<< ", cgroup cpu_hard_limit=" << task_group_info.cgroup_cpu_hard_limit
<< ", enable_cgroup_cpu_soft_limit="
<< (config::enable_cgroup_cpu_soft_limit ? "true" : "false")
<< ", is set cgroup path=" << (is_set_cgroup_path ? "true" : "flase");
<< ", cgroup home path=" << config::doris_cgroup_cpu_path;
}

_exec_env->task_group_manager()->delete_task_group_by_ids(current_wg_ids);
Expand Down
48 changes: 25 additions & 23 deletions be/src/runtime/task_group/task_group_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,8 @@ void TaskGroupManager::get_query_scheduler(uint64_t tg_id,
}
}

Status TaskGroupManager::upsert_cg_task_scheduler(taskgroup::TaskGroupInfo* tg_info,
ExecEnv* exec_env) {
void TaskGroupManager::upsert_cg_task_scheduler(taskgroup::TaskGroupInfo* tg_info,
ExecEnv* exec_env) {
uint64_t tg_id = tg_info->id;
std::string tg_name = tg_info->name;
int cpu_hard_limit = tg_info->cpu_hard_limit;
Expand All @@ -101,15 +101,17 @@ Status TaskGroupManager::upsert_cg_task_scheduler(taskgroup::TaskGroupInfo* tg_i
std::lock_guard<std::shared_mutex> write_lock(_task_scheduler_lock);
// step 1: init cgroup cpu controller
CgroupCpuCtl* cg_cu_ctl_ptr = nullptr;
if (_cgroup_ctl_map.find(tg_id) == _cgroup_ctl_map.end()) {
if (config::doris_cgroup_cpu_path != "" &&
_cgroup_ctl_map.find(tg_id) == _cgroup_ctl_map.end()) {
std::unique_ptr<CgroupCpuCtl> cgroup_cpu_ctl = std::make_unique<CgroupV1CpuCtl>(tg_id);
Status ret = cgroup_cpu_ctl->init();
if (ret.ok()) {
cg_cu_ctl_ptr = cgroup_cpu_ctl.get();
_cgroup_ctl_map.emplace(tg_id, std::move(cgroup_cpu_ctl));
LOG(INFO) << "[upsert wg thread pool] cgroup init success";
} else {
return Status::InternalError<false>("cgroup init failed, gid={}, reason={}", tg_id,
ret.to_string());
LOG(INFO) << "[upsert wg thread pool] cgroup init failed, gid= " << tg_id
<< ", reason=" << ret.to_string();
}
}

Expand All @@ -128,7 +130,7 @@ Status TaskGroupManager::upsert_cg_task_scheduler(taskgroup::TaskGroupInfo* tg_i
if (ret.ok()) {
_tg_sche_map.emplace(tg_id, std::move(pipeline_task_scheduler));
} else {
return Status::InternalError<false>("task scheduler start failed, gid={}", tg_id);
LOG(INFO) << "[upsert wg thread pool] task scheduler start failed, gid= " << tg_id;
}
}

Expand All @@ -140,7 +142,7 @@ Status TaskGroupManager::upsert_cg_task_scheduler(taskgroup::TaskGroupInfo* tg_i
if (ret.ok()) {
_tg_scan_sche_map.emplace(tg_id, std::move(scan_scheduler));
} else {
return Status::InternalError<false>("scan scheduler start failed, gid={}", tg_id);
LOG(INFO) << "[upsert wg thread pool] scan scheduler start failed, gid=" << tg_id;
}
}
if (scan_thread_num > 0 && _tg_scan_sche_map.find(tg_id) != _tg_scan_sche_map.end()) {
Expand All @@ -157,31 +159,31 @@ Status TaskGroupManager::upsert_cg_task_scheduler(taskgroup::TaskGroupInfo* tg_i
.set_cgroup_cpu_ctl(cg_cu_ctl_ptr)
.build(&thread_pool);
if (!ret.ok()) {
LOG(INFO) << "create non-pipline thread pool failed";
LOG(INFO) << "[upsert wg thread pool] create non-pipline thread pool failed";
} else {
_non_pipe_thread_pool_map.emplace(tg_id, std::move(thread_pool));
}
}

// step 5: update cgroup cpu if needed
if (enable_cpu_hard_limit) {
if (cpu_hard_limit > 0) {
_cgroup_ctl_map.at(tg_id)->update_cpu_hard_limit(cpu_hard_limit);
_cgroup_ctl_map.at(tg_id)->update_cpu_soft_limit(CPU_SOFT_LIMIT_DEFAULT_VALUE);
if (_cgroup_ctl_map.find(tg_id) != _cgroup_ctl_map.end()) {
if (enable_cpu_hard_limit) {
if (cpu_hard_limit > 0) {
_cgroup_ctl_map.at(tg_id)->update_cpu_hard_limit(cpu_hard_limit);
_cgroup_ctl_map.at(tg_id)->update_cpu_soft_limit(CPU_SOFT_LIMIT_DEFAULT_VALUE);
} else {
LOG(INFO) << "[upsert wg thread pool] enable cpu hard limit but value is illegal";
}
} else {
return Status::InternalError<false>("enable cpu hard limit but value is illegal");
}
} else {
if (config::enable_cgroup_cpu_soft_limit) {
_cgroup_ctl_map.at(tg_id)->update_cpu_soft_limit(cpu_shares);
_cgroup_ctl_map.at(tg_id)->update_cpu_hard_limit(
CPU_HARD_LIMIT_DEFAULT_VALUE); // disable cpu hard limit
if (config::enable_cgroup_cpu_soft_limit) {
_cgroup_ctl_map.at(tg_id)->update_cpu_soft_limit(cpu_shares);
_cgroup_ctl_map.at(tg_id)->update_cpu_hard_limit(
CPU_HARD_LIMIT_DEFAULT_VALUE); // disable cpu hard limit
}
}
_cgroup_ctl_map.at(tg_id)->get_cgroup_cpu_info(&(tg_info->cgroup_cpu_shares),
&(tg_info->cgroup_cpu_hard_limit));
}
_cgroup_ctl_map.at(tg_id)->get_cgroup_cpu_info(&(tg_info->cgroup_cpu_shares),
&(tg_info->cgroup_cpu_hard_limit));

return Status::OK();
}

void TaskGroupManager::delete_task_group_by_ids(std::set<uint64_t> used_wg_id) {
Expand Down
2 changes: 1 addition & 1 deletion be/src/runtime/task_group/task_group_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ class TaskGroupManager {
void get_related_taskgroups(const std::function<bool(const TaskGroupPtr& ptr)>& pred,
std::vector<TaskGroupPtr>* task_groups);

Status upsert_cg_task_scheduler(taskgroup::TaskGroupInfo* tg_info, ExecEnv* exec_env);
void upsert_cg_task_scheduler(taskgroup::TaskGroupInfo* tg_info, ExecEnv* exec_env);

void delete_task_group_by_ids(std::set<uint64_t> id_set);

Expand Down

0 comments on commit c7dd982

Please sign in to comment.