Skip to content

Commit

Permalink
Add scan_num property for workload group
Browse files Browse the repository at this point in the history
  • Loading branch information
wangbo committed Feb 19, 2024
1 parent 1789e2a commit 829139a
Show file tree
Hide file tree
Showing 8 changed files with 55 additions and 19 deletions.
6 changes: 6 additions & 0 deletions be/src/runtime/task_group/task_group.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,12 @@ Status TaskGroupInfo::parse_topic_info(const TWorkloadGroupInfo& workload_group_
}
task_group_info->enable_cpu_hard_limit = enable_cpu_hard_limit;

// 9 scan thread num
task_group_info->scan_thread_num = config::doris_scanner_thread_pool_thread_num;
if (workload_group_info.__isset.scan_thread_num) {
task_group_info->scan_thread_num = workload_group_info.scan_thread_num;
}

return Status::OK();
}

Expand Down
1 change: 1 addition & 0 deletions be/src/runtime/task_group/task_group.h
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,7 @@ struct TaskGroupInfo {
int64_t version;
int cpu_hard_limit;
bool enable_cpu_hard_limit;
int scan_thread_num = -1; // -1 means not set, then no need to update theadpool
// log cgroup cpu info
uint64_t cgroup_cpu_shares = 0;
int cgroup_cpu_hard_limit = 0;
Expand Down
4 changes: 4 additions & 0 deletions be/src/runtime/task_group/task_group_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ Status TaskGroupManager::upsert_cg_task_scheduler(taskgroup::TaskGroupInfo* tg_i
int cpu_hard_limit = tg_info->cpu_hard_limit;
uint64_t cpu_shares = tg_info->cpu_share;
bool enable_cpu_hard_limit = tg_info->enable_cpu_hard_limit;
int scan_thread_num = tg_info->scan_thread_num;

std::lock_guard<std::shared_mutex> write_lock(_task_scheduler_lock);
// step 1: init cgroup cpu controller
Expand Down Expand Up @@ -142,6 +143,9 @@ Status TaskGroupManager::upsert_cg_task_scheduler(taskgroup::TaskGroupInfo* tg_i
return Status::InternalError<false>("scan scheduler start failed, gid={}", tg_id);
}
}
if (_tg_scan_sche_map.find(tg_id) != _tg_scan_sche_map.end()) {
_tg_scan_sche_map.at(tg_id)->reset_thread_num(scan_thread_num);
}

// step 4: init non-pipe scheduler
if (_non_pipe_thread_pool_map.find(tg_id) == _non_pipe_thread_pool_map.end()) {
Expand Down
2 changes: 1 addition & 1 deletion be/src/vec/exec/scan/scanner_scheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ void ScannerScheduler::submit(std::shared_ptr<ScannerContext> ctx,
this->_scanner_scan(ctx, scanner_ref);
};
SimplifiedScanTask simple_scan_task = {work_func, ctx};
ret = scan_sche->get_scan_queue()->try_put(simple_scan_task);
ret = scan_sche->submit_scan_task(simple_scan_task);
} else {
PriorityThreadPool::Task task;
task.work_function = [this, scanner_ref = scan_task, ctx]() {
Expand Down
35 changes: 18 additions & 17 deletions be/src/vec/exec/scan/scanner_scheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -99,12 +99,9 @@ struct SimplifiedScanTask {
std::shared_ptr<vectorized::ScannerContext> scanner_context = nullptr;
};

// used for cpu hard limit
class SimplifiedScanScheduler {
public:
SimplifiedScanScheduler(std::string wg_name, CgroupCpuCtl* cgroup_cpu_ctl) {
_scan_task_queue = std::make_unique<BlockingQueue<SimplifiedScanTask>>(
config::doris_scanner_thread_pool_queue_size);
_is_stop.store(false);
_cgroup_cpu_ctl = cgroup_cpu_ctl;
_wg_name = wg_name;
Expand All @@ -117,7 +114,6 @@ class SimplifiedScanScheduler {

void stop() {
_is_stop.store(true);
_scan_task_queue->shutdown();
_scan_thread_pool->shutdown();
_scan_thread_pool->wait();
}
Expand All @@ -128,27 +124,32 @@ class SimplifiedScanScheduler {
.set_max_threads(config::doris_scanner_thread_pool_thread_num)
.set_cgroup_cpu_ctl(_cgroup_cpu_ctl)
.build(&_scan_thread_pool));

for (int i = 0; i < config::doris_scanner_thread_pool_thread_num; i++) {
RETURN_IF_ERROR(_scan_thread_pool->submit_func([this] { this->_work(); }));
}
return Status::OK();
}

BlockingQueue<SimplifiedScanTask>* get_scan_queue() { return _scan_task_queue.get(); }
Status submit_scan_task(SimplifiedScanTask scan_task) {
if (!_is_stop) {
return _scan_thread_pool->submit_func([scan_task] { scan_task.scan_func(); });
} else {
return Status::InternalError<false>("scanner pool {} is shutdown.", _wg_name);
}
}

private:
void _work() {
while (!_is_stop.load()) {
SimplifiedScanTask scan_task;
if (_scan_task_queue->blocking_get(&scan_task)) {
scan_task.scan_func();
};
void reset_thread_num(int thread_num) {
int max_thread_num = _scan_thread_pool->max_threads();
if (max_thread_num != thread_num) {
if (thread_num > max_thread_num) {
static_cast<void>(_scan_thread_pool->set_max_threads(thread_num));
static_cast<void>(_scan_thread_pool->set_min_threads(thread_num));
} else {
static_cast<void>(_scan_thread_pool->set_min_threads(thread_num));
static_cast<void>(_scan_thread_pool->set_max_threads(thread_num));
}
}
}

private:
std::unique_ptr<ThreadPool> _scan_thread_pool;
std::unique_ptr<BlockingQueue<SimplifiedScanTask>> _scan_task_queue;
std::atomic<bool> _is_stop;
CgroupCpuCtl* _cgroup_cpu_ctl = nullptr;
std::string _wg_name;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,12 +61,14 @@ public class WorkloadGroup implements Writable, GsonPostProcessable {

public static final String QUEUE_TIMEOUT = "queue_timeout";

public static final String SCAN_THREAD_NUM = "scan_thread_num";

// NOTE(wb): all property is not required, some properties default value is set in be
// default value is as followed
// cpu_share=1024, memory_limit=0%(0 means not limit), enable_memory_overcommit=true
private static final ImmutableSet<String> ALL_PROPERTIES_NAME = new ImmutableSet.Builder<String>()
.add(CPU_SHARE).add(MEMORY_LIMIT).add(ENABLE_MEMORY_OVERCOMMIT).add(MAX_CONCURRENCY)
.add(MAX_QUEUE_SIZE).add(QUEUE_TIMEOUT).add(CPU_HARD_LIMIT).build();
.add(MAX_QUEUE_SIZE).add(QUEUE_TIMEOUT).add(CPU_HARD_LIMIT).add(SCAN_THREAD_NUM).build();

@SerializedName(value = "id")
private long id;
Expand Down Expand Up @@ -208,6 +210,21 @@ private static void checkProperties(Map<String, String> properties) throws DdlEx
}
}

if (properties.containsKey(SCAN_THREAD_NUM)) {
String value = properties.get(SCAN_THREAD_NUM);
if (!"NULL".equalsIgnoreCase(value)) {
try {
int intValue = Integer.parseInt(value);
if (intValue <= 0) {
throw new NumberFormatException();
}
} catch (NumberFormatException e) {
throw new DdlException(
SCAN_THREAD_NUM + " must be a positive integer or null. but input value is " + value);
}
}
}

// check queue property
if (properties.containsKey(MAX_CONCURRENCY)) {
try {
Expand Down Expand Up @@ -345,6 +362,11 @@ public TopicInfo toTopicInfo() {
+ "id=" + id + ",name=" + name);
}

String scanThreadNumStr = properties.get(SCAN_THREAD_NUM);
if (scanThreadNumStr != null && !"NULL".equalsIgnoreCase(scanThreadNumStr)) {
tWorkloadGroupInfo.setScanThreadNum(Integer.parseInt(scanThreadNumStr));
}

TopicInfo topicInfo = new TopicInfo();
topicInfo.setWorkloadGroupInfo(tWorkloadGroupInfo);
return topicInfo;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ public class WorkloadGroupMgr implements Writable, GsonPostProcessable {
.add(WorkloadGroup.ENABLE_MEMORY_OVERCOMMIT)
.add(WorkloadGroup.MAX_CONCURRENCY).add(WorkloadGroup.MAX_QUEUE_SIZE)
.add(WorkloadGroup.QUEUE_TIMEOUT).add(WorkloadGroup.CPU_HARD_LIMIT)
.add(WorkloadGroup.SCAN_THREAD_NUM)
.add(QueryQueue.RUNNING_QUERY_NUM).add(QueryQueue.WAITING_QUERY_NUM)
.build();

Expand Down
1 change: 1 addition & 0 deletions gensrc/thrift/BackendService.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,7 @@ struct TWorkloadGroupInfo {
6: optional string mem_limit
7: optional bool enable_memory_overcommit
8: optional bool enable_cpu_hard_limit
9: optional i32 scan_thread_num
}

struct TWorkloadMoveQueryToGroupAction {
Expand Down

0 comments on commit 829139a

Please sign in to comment.