Skip to content

Commit

Permalink
Add scan_thread_num property for workload group
Browse files Browse the repository at this point in the history
  • Loading branch information
wangbo committed Feb 19, 2024
1 parent 1789e2a commit f7ae340
Show file tree
Hide file tree
Showing 12 changed files with 92 additions and 42 deletions.
15 changes: 12 additions & 3 deletions be/src/runtime/task_group/task_group.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,15 +51,17 @@ TaskGroup::TaskGroup(const TaskGroupInfo& tg_info)
_enable_memory_overcommit(tg_info.enable_memory_overcommit),
_cpu_share(tg_info.cpu_share),
_mem_tracker_limiter_pool(MEM_TRACKER_GROUP_NUM),
_cpu_hard_limit(tg_info.cpu_hard_limit) {}
_cpu_hard_limit(tg_info.cpu_hard_limit),
_scan_thread_num(tg_info.scan_thread_num) {}

std::string TaskGroup::debug_string() const {
std::shared_lock<std::shared_mutex> rl {_mutex};
return fmt::format(
"TG[id = {}, name = {}, cpu_share = {}, memory_limit = {}, enable_memory_overcommit = "
"{}, version = {}, cpu_hard_limit = {}]",
"{}, version = {}, cpu_hard_limit = {}, scan_thread_num = {}]",
_id, _name, cpu_share(), PrettyPrinter::print(_memory_limit, TUnit::BYTES),
_enable_memory_overcommit ? "true" : "false", _version, cpu_hard_limit());
_enable_memory_overcommit ? "true" : "false", _version, cpu_hard_limit(),
_scan_thread_num);
}

void TaskGroup::check_and_update(const TaskGroupInfo& tg_info) {
Expand All @@ -81,6 +83,7 @@ void TaskGroup::check_and_update(const TaskGroupInfo& tg_info) {
_enable_memory_overcommit = tg_info.enable_memory_overcommit;
_cpu_share = tg_info.cpu_share;
_cpu_hard_limit = tg_info.cpu_hard_limit;
_scan_thread_num = tg_info.scan_thread_num;
} else {
return;
}
Expand Down Expand Up @@ -185,6 +188,12 @@ Status TaskGroupInfo::parse_topic_info(const TWorkloadGroupInfo& workload_group_
}
task_group_info->enable_cpu_hard_limit = enable_cpu_hard_limit;

// 9 scan thread num
task_group_info->scan_thread_num = config::doris_scanner_thread_pool_thread_num;
if (workload_group_info.__isset.scan_thread_num && workload_group_info.scan_thread_num > 0) {
task_group_info->scan_thread_num = workload_group_info.scan_thread_num;
}

return Status::OK();
}

Expand Down
2 changes: 2 additions & 0 deletions be/src/runtime/task_group/task_group.h
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ class TaskGroup : public std::enable_shared_from_this<TaskGroup> {
std::atomic<uint64_t> _cpu_share;
std::vector<TgTrackerLimiterGroup> _mem_tracker_limiter_pool;
std::atomic<int> _cpu_hard_limit;
std::atomic<int> _scan_thread_num;

// means task group is mark dropped
// new query can not submit
Expand All @@ -153,6 +154,7 @@ struct TaskGroupInfo {
int64_t version;
int cpu_hard_limit;
bool enable_cpu_hard_limit;
int scan_thread_num;
// log cgroup cpu info
uint64_t cgroup_cpu_shares = 0;
int cgroup_cpu_hard_limit = 0;
Expand Down
4 changes: 4 additions & 0 deletions be/src/runtime/task_group/task_group_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ Status TaskGroupManager::upsert_cg_task_scheduler(taskgroup::TaskGroupInfo* tg_i
int cpu_hard_limit = tg_info->cpu_hard_limit;
uint64_t cpu_shares = tg_info->cpu_share;
bool enable_cpu_hard_limit = tg_info->enable_cpu_hard_limit;
int scan_thread_num = tg_info->scan_thread_num;

std::lock_guard<std::shared_mutex> write_lock(_task_scheduler_lock);
// step 1: init cgroup cpu controller
Expand Down Expand Up @@ -142,6 +143,9 @@ Status TaskGroupManager::upsert_cg_task_scheduler(taskgroup::TaskGroupInfo* tg_i
return Status::InternalError<false>("scan scheduler start failed, gid={}", tg_id);
}
}
if (scan_thread_num > 0 && _tg_scan_sche_map.find(tg_id) != _tg_scan_sche_map.end()) {
_tg_scan_sche_map.at(tg_id)->reset_thread_num(scan_thread_num);
}

// step 4: init non-pipe scheduler
if (_non_pipe_thread_pool_map.find(tg_id) == _non_pipe_thread_pool_map.end()) {
Expand Down
2 changes: 1 addition & 1 deletion be/src/vec/exec/scan/scanner_scheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ void ScannerScheduler::submit(std::shared_ptr<ScannerContext> ctx,
this->_scanner_scan(ctx, scanner_ref);
};
SimplifiedScanTask simple_scan_task = {work_func, ctx};
ret = scan_sche->get_scan_queue()->try_put(simple_scan_task);
ret = scan_sche->submit_scan_task(simple_scan_task);
} else {
PriorityThreadPool::Task task;
task.work_function = [this, scanner_ref = scan_task, ctx]() {
Expand Down
35 changes: 18 additions & 17 deletions be/src/vec/exec/scan/scanner_scheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -99,12 +99,9 @@ struct SimplifiedScanTask {
std::shared_ptr<vectorized::ScannerContext> scanner_context = nullptr;
};

// used for cpu hard limit
class SimplifiedScanScheduler {
public:
SimplifiedScanScheduler(std::string wg_name, CgroupCpuCtl* cgroup_cpu_ctl) {
_scan_task_queue = std::make_unique<BlockingQueue<SimplifiedScanTask>>(
config::doris_scanner_thread_pool_queue_size);
_is_stop.store(false);
_cgroup_cpu_ctl = cgroup_cpu_ctl;
_wg_name = wg_name;
Expand All @@ -117,7 +114,6 @@ class SimplifiedScanScheduler {

void stop() {
_is_stop.store(true);
_scan_task_queue->shutdown();
_scan_thread_pool->shutdown();
_scan_thread_pool->wait();
}
Expand All @@ -128,27 +124,32 @@ class SimplifiedScanScheduler {
.set_max_threads(config::doris_scanner_thread_pool_thread_num)
.set_cgroup_cpu_ctl(_cgroup_cpu_ctl)
.build(&_scan_thread_pool));

for (int i = 0; i < config::doris_scanner_thread_pool_thread_num; i++) {
RETURN_IF_ERROR(_scan_thread_pool->submit_func([this] { this->_work(); }));
}
return Status::OK();
}

BlockingQueue<SimplifiedScanTask>* get_scan_queue() { return _scan_task_queue.get(); }
Status submit_scan_task(SimplifiedScanTask scan_task) {
if (!_is_stop) {
return _scan_thread_pool->submit_func([scan_task] { scan_task.scan_func(); });
} else {
return Status::InternalError<false>("scanner pool {} is shutdown.", _wg_name);
}
}

private:
void _work() {
while (!_is_stop.load()) {
SimplifiedScanTask scan_task;
if (_scan_task_queue->blocking_get(&scan_task)) {
scan_task.scan_func();
};
void reset_thread_num(int thread_num) {
int max_thread_num = _scan_thread_pool->max_threads();
if (max_thread_num != thread_num) {
if (thread_num > max_thread_num) {
static_cast<void>(_scan_thread_pool->set_max_threads(thread_num));
static_cast<void>(_scan_thread_pool->set_min_threads(thread_num));
} else {
static_cast<void>(_scan_thread_pool->set_min_threads(thread_num));
static_cast<void>(_scan_thread_pool->set_max_threads(thread_num));
}
}
}

private:
std::unique_ptr<ThreadPool> _scan_thread_pool;
std::unique_ptr<BlockingQueue<SimplifiedScanTask>> _scan_task_queue;
std::atomic<bool> _is_stop;
CgroupCpuCtl* _cgroup_cpu_ctl = nullptr;
std::string _wg_name;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,12 +61,14 @@ public class WorkloadGroup implements Writable, GsonPostProcessable {

public static final String QUEUE_TIMEOUT = "queue_timeout";

public static final String SCAN_THREAD_NUM = "scan_thread_num";

// NOTE(wb): all property is not required, some properties default value is set in be
// default value is as followed
// cpu_share=1024, memory_limit=0%(0 means not limit), enable_memory_overcommit=true
private static final ImmutableSet<String> ALL_PROPERTIES_NAME = new ImmutableSet.Builder<String>()
.add(CPU_SHARE).add(MEMORY_LIMIT).add(ENABLE_MEMORY_OVERCOMMIT).add(MAX_CONCURRENCY)
.add(MAX_QUEUE_SIZE).add(QUEUE_TIMEOUT).add(CPU_HARD_LIMIT).build();
.add(MAX_QUEUE_SIZE).add(QUEUE_TIMEOUT).add(CPU_HARD_LIMIT).add(SCAN_THREAD_NUM).build();

@SerializedName(value = "id")
private long id;
Expand Down Expand Up @@ -208,6 +210,19 @@ private static void checkProperties(Map<String, String> properties) throws DdlEx
}
}

if (properties.containsKey(SCAN_THREAD_NUM)) {
String value = properties.get(SCAN_THREAD_NUM);
try {
int intValue = Integer.parseInt(value);
if (intValue <= 0 && intValue != -1) {
throw new NumberFormatException();
}
} catch (NumberFormatException e) {
throw new DdlException(
SCAN_THREAD_NUM + " must be a positive integer or -1. but input value is " + value);
}
}

// check queue property
if (properties.containsKey(MAX_CONCURRENCY)) {
try {
Expand Down Expand Up @@ -290,6 +305,8 @@ public void getProcNodeData(BaseProcResult result, QueryQueue qq) {
row.add("0%");
} else if (ENABLE_MEMORY_OVERCOMMIT.equals(key) && !properties.containsKey(key)) {
row.add("true");
} else if (SCAN_THREAD_NUM.equals(key) && !properties.containsKey(key)) {
row.add("-1");
} else {
row.add(properties.get(key));
}
Expand Down Expand Up @@ -345,6 +362,11 @@ public TopicInfo toTopicInfo() {
+ "id=" + id + ",name=" + name);
}

String scanThreadNumStr = properties.get(SCAN_THREAD_NUM);
if (scanThreadNumStr != null) {
tWorkloadGroupInfo.setScanThreadNum(Integer.parseInt(scanThreadNumStr));
}

TopicInfo topicInfo = new TopicInfo();
topicInfo.setWorkloadGroupInfo(tWorkloadGroupInfo);
return topicInfo;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ public class WorkloadGroupMgr implements Writable, GsonPostProcessable {
.add(WorkloadGroup.ENABLE_MEMORY_OVERCOMMIT)
.add(WorkloadGroup.MAX_CONCURRENCY).add(WorkloadGroup.MAX_QUEUE_SIZE)
.add(WorkloadGroup.QUEUE_TIMEOUT).add(WorkloadGroup.CPU_HARD_LIMIT)
.add(WorkloadGroup.SCAN_THREAD_NUM)
.add(QueryQueue.RUNNING_QUERY_NUM).add(QueryQueue.WAITING_QUERY_NUM)
.build();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -381,8 +381,9 @@ private static TFetchSchemaTableDataResult workloadGroupsMetadataResult(TMetadat
trow.addToColumnValue(new TCell().setLongVal(Long.valueOf(rGroupsInfo.get(6)))); // max queue size
trow.addToColumnValue(new TCell().setLongVal(Long.valueOf(rGroupsInfo.get(7)))); // queue timeout
trow.addToColumnValue(new TCell().setStringVal(rGroupsInfo.get(8))); // cpu hard limit
trow.addToColumnValue(new TCell().setLongVal(Long.valueOf(rGroupsInfo.get(9)))); // running query num
trow.addToColumnValue(new TCell().setLongVal(Long.valueOf(rGroupsInfo.get(10)))); // waiting query num
trow.addToColumnValue(new TCell().setIntVal(Integer.parseInt(rGroupsInfo.get(9)))); // scan thread num
trow.addToColumnValue(new TCell().setLongVal(Long.valueOf(rGroupsInfo.get(10)))); // running query num
trow.addToColumnValue(new TCell().setLongVal(Long.valueOf(rGroupsInfo.get(11)))); // waiting query num
dataBatch.add(trow);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ public class WorkloadGroupsTableValuedFunction extends MetadataTableValuedFuncti
new Column(WorkloadGroup.MAX_QUEUE_SIZE, ScalarType.createType(PrimitiveType.BIGINT)),
new Column(WorkloadGroup.QUEUE_TIMEOUT, ScalarType.createType(PrimitiveType.BIGINT)),
new Column(WorkloadGroup.CPU_HARD_LIMIT, ScalarType.createStringType()),
new Column(WorkloadGroup.SCAN_THREAD_NUM, ScalarType.createType(PrimitiveType.INT)),
new Column(QueryQueue.RUNNING_QUERY_NUM, ScalarType.createType(PrimitiveType.BIGINT)),
new Column(QueryQueue.WAITING_QUERY_NUM, ScalarType.createType(PrimitiveType.BIGINT)));

Expand Down
1 change: 1 addition & 0 deletions gensrc/thrift/BackendService.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,7 @@ struct TWorkloadGroupInfo {
6: optional string mem_limit
7: optional bool enable_memory_overcommit
8: optional bool enable_cpu_hard_limit
9: optional i32 scan_thread_num
}

struct TWorkloadMoveQueryToGroupAction {
Expand Down
24 changes: 12 additions & 12 deletions regression-test/data/workload_manager_p0/test_curd_wlg.out
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,15 @@
2

-- !show_1 --
normal 20 50% true 2147483647 0 0 1%
test_group 10 10% true 2147483647 0 0 0%
normal 20 50% true 2147483647 0 0 1% 16
test_group 10 10% true 2147483647 0 0 0% -1

-- !mem_limit_1 --
2

-- !mem_limit_2 --
normal 20 50% true 2147483647 0 0 1%
test_group 10 11% true 2147483647 0 0 0%
normal 20 50% true 2147483647 0 0 1% 16
test_group 10 11% true 2147483647 0 0 0% -1

-- !mem_overcommit_1 --
2
Expand All @@ -23,24 +23,24 @@ test_group 10 11% true 2147483647 0 0 0%
2

-- !mem_overcommit_3 --
normal 20 50% true 2147483647 0 0 1%
test_group 10 11% false 2147483647 0 0 0%
normal 20 50% true 2147483647 0 0 1% 16
test_group 10 11% false 2147483647 0 0 0% -1

-- !cpu_hard_limit_1 --
2

-- !cpu_hard_limit_2 --
normal 20 50% true 2147483647 0 0 1%
test_group 10 11% false 2147483647 0 0 20%
normal 20 50% true 2147483647 0 0 1% 16
test_group 10 11% false 2147483647 0 0 20% -1

-- !queue_1 --
2

-- !show_queue --
normal 20 50% true 2147483647 0 0 1%
test_group 10 11% false 100 0 0 20%
normal 20 50% true 2147483647 0 0 1% 16
test_group 10 11% false 100 0 0 20% -1

-- !select_tvf_1 --
normal 20 50% true 2147483647 0 0 1%
test_group 10 11% false 100 0 0 20%
normal 20 50% true 2147483647 0 0 1% 16
test_group 10 11% false 100 0 0 20% -1

20 changes: 14 additions & 6 deletions regression-test/suites/workload_manager_p0/test_curd_wlg.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -80,12 +80,14 @@ suite("test_crud_wlg") {
sql "alter workload group normal properties ( 'max_queue_size'='0' );"
sql "alter workload group normal properties ( 'queue_timeout'='0' );"
sql "alter workload group normal properties ( 'cpu_hard_limit'='1%' );"
sql "alter workload group normal properties ( 'scan_thread_num'='-1' );"

sql "set workload_group=normal;"

// test cpu_share
qt_cpu_share """ select count(1) from ${table_name} """

sql "alter workload group normal properties ( 'scan_thread_num'='16' );"
sql "alter workload group normal properties ( 'cpu_share'='20' );"

qt_cpu_share_2 """ select count(1) from ${table_name} """
Expand All @@ -96,6 +98,12 @@ suite("test_crud_wlg") {
exception "requires a positive integer"
}

test {
sql "alter workload group normal properties ( 'scan_thread_num'='0' );"

exception "scan_thread_num must be a positive integer or -1"
}

sql "drop workload group if exists test_group;"

// test create group
Expand All @@ -107,7 +115,7 @@ suite("test_crud_wlg") {
");"
sql "set workload_group=test_group;"

qt_show_1 "select name,cpu_share,memory_limit,enable_memory_overcommit,max_concurrency,max_queue_size,queue_timeout,cpu_hard_limit from workload_groups() order by name;"
qt_show_1 "select name,cpu_share,memory_limit,enable_memory_overcommit,max_concurrency,max_queue_size,queue_timeout,cpu_hard_limit,scan_thread_num from workload_groups() order by name;"

// test memory_limit
test {
Expand All @@ -118,7 +126,7 @@ suite("test_crud_wlg") {

sql "alter workload group test_group properties ( 'memory_limit'='11%' );"
qt_mem_limit_1 """ select count(1) from ${table_name} """
qt_mem_limit_2 "select name,cpu_share,memory_limit,enable_memory_overcommit,max_concurrency,max_queue_size,queue_timeout,cpu_hard_limit from workload_groups() order by name;"
qt_mem_limit_2 "select name,cpu_share,memory_limit,enable_memory_overcommit,max_concurrency,max_queue_size,queue_timeout,cpu_hard_limit,scan_thread_num from workload_groups() order by name;"

// test enable_memory_overcommit
test {
Expand All @@ -131,7 +139,7 @@ suite("test_crud_wlg") {
qt_mem_overcommit_1 """ select count(1) from ${table_name} """
sql "alter workload group test_group properties ( 'enable_memory_overcommit'='false' );"
qt_mem_overcommit_2 """ select count(1) from ${table_name} """
qt_mem_overcommit_3 "select name,cpu_share,memory_limit,enable_memory_overcommit,max_concurrency,max_queue_size,queue_timeout,cpu_hard_limit from workload_groups() order by name;"
qt_mem_overcommit_3 "select name,cpu_share,memory_limit,enable_memory_overcommit,max_concurrency,max_queue_size,queue_timeout,cpu_hard_limit,scan_thread_num from workload_groups() order by name;"

// test cpu_hard_limit
test {
Expand All @@ -150,7 +158,7 @@ suite("test_crud_wlg") {

sql "alter workload group test_group properties ( 'cpu_hard_limit'='20%' );"
qt_cpu_hard_limit_1 """ select count(1) from ${table_name} """
qt_cpu_hard_limit_2 "select name,cpu_share,memory_limit,enable_memory_overcommit,max_concurrency,max_queue_size,queue_timeout,cpu_hard_limit from workload_groups() order by name;"
qt_cpu_hard_limit_2 "select name,cpu_share,memory_limit,enable_memory_overcommit,max_concurrency,max_queue_size,queue_timeout,cpu_hard_limit,scan_thread_num from workload_groups() order by name;"

// test query queue
test {
Expand All @@ -173,7 +181,7 @@ suite("test_crud_wlg") {

sql "alter workload group test_group properties ( 'max_concurrency'='100' );"
qt_queue_1 """ select count(1) from ${table_name} """
qt_show_queue "select name,cpu_share,memory_limit,enable_memory_overcommit,max_concurrency,max_queue_size,queue_timeout,cpu_hard_limit from workload_groups() order by name;"
qt_show_queue "select name,cpu_share,memory_limit,enable_memory_overcommit,max_concurrency,max_queue_size,queue_timeout,cpu_hard_limit,scan_thread_num from workload_groups() order by name;"

// test create group failed
// failed for cpu_share
Expand Down Expand Up @@ -251,7 +259,7 @@ suite("test_crud_wlg") {
}

// test show workload groups
qt_select_tvf_1 "select name,cpu_share,memory_limit,enable_memory_overcommit,max_concurrency,max_queue_size,queue_timeout,cpu_hard_limit from workload_groups() order by name;"
qt_select_tvf_1 "select name,cpu_share,memory_limit,enable_memory_overcommit,max_concurrency,max_queue_size,queue_timeout,cpu_hard_limit,scan_thread_num from workload_groups() order by name;"

// test auth
sql """drop user if exists test_wlg_user"""
Expand Down

0 comments on commit f7ae340

Please sign in to comment.