From f7ae340a5df9eff3199c1d2a72c24ccc7f8a8706 Mon Sep 17 00:00:00 2001
From: wangbo <506340561@qq.com>
Date: Sun, 18 Feb 2024 17:47:35 +0800
Subject: [PATCH] Add scan_thread_num property for workload group

Add a per-group scan_thread_num property so the size of a workload
group's scan thread pool can be tuned per group instead of relying only
on the global BE config doris_scanner_thread_pool_thread_num. The value
must be a positive integer or -1; -1 (the default) keeps the BE-side
default, while a positive value resizes the group's scan thread pool the
next time the workload group topic info is published to the BE, e.g.
alter workload group normal properties ( 'scan_thread_num'='16' );
The workload_groups() table-valued function gains a matching
scan_thread_num column.
---
 be/src/runtime/task_group/task_group.cpp      | 15 ++++++--
 be/src/runtime/task_group/task_group.h        |  2 ++
 .../runtime/task_group/task_group_manager.cpp |  4 +++
 be/src/vec/exec/scan/scanner_scheduler.cpp    |  2 +-
 be/src/vec/exec/scan/scanner_scheduler.h      | 35 ++++++++++---------
 .../resource/workloadgroup/WorkloadGroup.java | 24 ++++++++++++-
 .../workloadgroup/WorkloadGroupMgr.java       |  1 +
 .../tablefunction/MetadataGenerator.java      |  5 +--
 .../WorkloadGroupsTableValuedFunction.java    |  1 +
 gensrc/thrift/BackendService.thrift           |  1 +
 .../workload_manager_p0/test_curd_wlg.out     | 24 ++++++-------
 .../workload_manager_p0/test_curd_wlg.groovy  | 20 +++++++----
 12 files changed, 92 insertions(+), 42 deletions(-)

diff --git a/be/src/runtime/task_group/task_group.cpp b/be/src/runtime/task_group/task_group.cpp
index e0b0dc1fb5ef81..ee1be7027688d8 100644
--- a/be/src/runtime/task_group/task_group.cpp
+++ b/be/src/runtime/task_group/task_group.cpp
@@ -51,15 +51,17 @@ TaskGroup::TaskGroup(const TaskGroupInfo& tg_info)
           _enable_memory_overcommit(tg_info.enable_memory_overcommit),
           _cpu_share(tg_info.cpu_share),
           _mem_tracker_limiter_pool(MEM_TRACKER_GROUP_NUM),
-          _cpu_hard_limit(tg_info.cpu_hard_limit) {}
+          _cpu_hard_limit(tg_info.cpu_hard_limit),
+          _scan_thread_num(tg_info.scan_thread_num) {}
 
 std::string TaskGroup::debug_string() const {
     std::shared_lock<std::shared_mutex> rl {_mutex};
     return fmt::format(
             "TG[id = {}, name = {}, cpu_share = {}, memory_limit = {}, enable_memory_overcommit = "
-            "{}, version = {}, cpu_hard_limit = {}]",
+            "{}, version = {}, cpu_hard_limit = {}, scan_thread_num = {}]",
             _id, _name, cpu_share(), PrettyPrinter::print(_memory_limit, TUnit::BYTES),
-            _enable_memory_overcommit ? "true" : "false", _version, cpu_hard_limit());
+            _enable_memory_overcommit ? "true" : "false", _version, cpu_hard_limit(),
+            _scan_thread_num);
 }
 
 void TaskGroup::check_and_update(const TaskGroupInfo& tg_info) {
@@ -81,6 +83,7 @@ void TaskGroup::check_and_update(const TaskGroupInfo& tg_info) {
         _enable_memory_overcommit = tg_info.enable_memory_overcommit;
         _cpu_share = tg_info.cpu_share;
         _cpu_hard_limit = tg_info.cpu_hard_limit;
+        _scan_thread_num = tg_info.scan_thread_num;
     } else {
         return;
     }
@@ -185,6 +188,12 @@ Status TaskGroupInfo::parse_topic_info(const TWorkloadGroupInfo& workload_group_info,
     }
     task_group_info->enable_cpu_hard_limit = enable_cpu_hard_limit;
 
+    // 9 scan thread num
+    task_group_info->scan_thread_num = config::doris_scanner_thread_pool_thread_num;
+    if (workload_group_info.__isset.scan_thread_num && workload_group_info.scan_thread_num > 0) {
+        task_group_info->scan_thread_num = workload_group_info.scan_thread_num;
+    }
+
     return Status::OK();
 }
 
diff --git a/be/src/runtime/task_group/task_group.h b/be/src/runtime/task_group/task_group.h
index 647d088e8174c8..3767731435a6f1 100644
--- a/be/src/runtime/task_group/task_group.h
+++ b/be/src/runtime/task_group/task_group.h
@@ -134,6 +134,7 @@ class TaskGroup : public std::enable_shared_from_this<TaskGroup> {
    std::atomic<uint64_t> _cpu_share;
    std::vector<TgTrackerLimiterGroup> _mem_tracker_limiter_pool;
    std::atomic<int> _cpu_hard_limit;
+   std::atomic<int> _scan_thread_num;
 
    // means task group is mark dropped
    // new query can not submit
@@ -153,6 +154,7 @@ struct TaskGroupInfo {
    int64_t version;
    int cpu_hard_limit;
    bool enable_cpu_hard_limit;
+   int scan_thread_num;
    // log cgroup cpu info
    uint64_t cgroup_cpu_shares = 0;
    int cgroup_cpu_hard_limit = 0;
diff --git a/be/src/runtime/task_group/task_group_manager.cpp b/be/src/runtime/task_group/task_group_manager.cpp
index 05be653747da6e..18e446295cb650 100644
--- a/be/src/runtime/task_group/task_group_manager.cpp
+++ b/be/src/runtime/task_group/task_group_manager.cpp
@@ -96,6 +96,7 @@ Status TaskGroupManager::upsert_cg_task_scheduler(taskgroup::TaskGroupInfo* tg_info,
     int cpu_hard_limit = tg_info->cpu_hard_limit;
     uint64_t cpu_shares = tg_info->cpu_share;
     bool enable_cpu_hard_limit = tg_info->enable_cpu_hard_limit;
+    int scan_thread_num = tg_info->scan_thread_num;
     std::lock_guard<std::shared_mutex> write_lock(_task_scheduler_lock);
 
     // step 1: init cgroup cpu controller
@@ -142,6 +143,9 @@ Status TaskGroupManager::upsert_cg_task_scheduler(taskgroup::TaskGroupInfo* tg_info,
             return Status::InternalError("scan scheduler start failed, gid={}", tg_id);
         }
     }
+    if (scan_thread_num > 0 && _tg_scan_sche_map.find(tg_id) != _tg_scan_sche_map.end()) {
+        _tg_scan_sche_map.at(tg_id)->reset_thread_num(scan_thread_num);
+    }
 
     // step 4: init non-pipe scheduler
     if (_non_pipe_thread_pool_map.find(tg_id) == _non_pipe_thread_pool_map.end()) {
diff --git a/be/src/vec/exec/scan/scanner_scheduler.cpp b/be/src/vec/exec/scan/scanner_scheduler.cpp
index 40fff7ed70c1f6..d3fd740b43715b 100644
--- a/be/src/vec/exec/scan/scanner_scheduler.cpp
+++ b/be/src/vec/exec/scan/scanner_scheduler.cpp
@@ -162,7 +162,7 @@ void ScannerScheduler::submit(std::shared_ptr<ScannerContext> ctx,
             this->_scanner_scan(ctx, scanner_ref);
         };
         SimplifiedScanTask simple_scan_task = {work_func, ctx};
-        ret = scan_sche->get_scan_queue()->try_put(simple_scan_task);
+        ret = scan_sche->submit_scan_task(simple_scan_task);
     } else {
         PriorityThreadPool::Task task;
         task.work_function = [this, scanner_ref = scan_task, ctx]() {
diff --git a/be/src/vec/exec/scan/scanner_scheduler.h b/be/src/vec/exec/scan/scanner_scheduler.h
index 7a602038956d11..746aa34ff9aa63 100644
--- a/be/src/vec/exec/scan/scanner_scheduler.h
+++ b/be/src/vec/exec/scan/scanner_scheduler.h
@@ -99,12 +99,9 @@ struct SimplifiedScanTask {
     std::shared_ptr<ScannerContext> scanner_context = nullptr;
 };
 
-// used for cpu hard limit
 class SimplifiedScanScheduler {
 public:
     SimplifiedScanScheduler(std::string wg_name, CgroupCpuCtl* cgroup_cpu_ctl) {
-        _scan_task_queue = std::make_unique<BlockingQueue<SimplifiedScanTask>>(
-                config::doris_scanner_thread_pool_queue_size);
         _is_stop.store(false);
         _cgroup_cpu_ctl = cgroup_cpu_ctl;
         _wg_name = wg_name;
@@ -117,7 +114,6 @@ class SimplifiedScanScheduler {
 
     void stop() {
         _is_stop.store(true);
-        _scan_task_queue->shutdown();
         _scan_thread_pool->shutdown();
         _scan_thread_pool->wait();
     }
@@ -128,27 +124,32 @@ class SimplifiedScanScheduler {
                                   .set_max_threads(config::doris_scanner_thread_pool_thread_num)
                                   .set_cgroup_cpu_ctl(_cgroup_cpu_ctl)
                                   .build(&_scan_thread_pool));
-
-        for (int i = 0; i < config::doris_scanner_thread_pool_thread_num; i++) {
-            RETURN_IF_ERROR(_scan_thread_pool->submit_func([this] { this->_work(); }));
-        }
         return Status::OK();
     }
 
-    BlockingQueue<SimplifiedScanTask>* get_scan_queue() { return _scan_task_queue.get(); }
+    Status submit_scan_task(SimplifiedScanTask scan_task) {
+        if (!_is_stop) {
+            return _scan_thread_pool->submit_func([scan_task] { scan_task.scan_func(); });
+        } else {
+            return Status::InternalError("scanner pool {} is shutdown.", _wg_name);
+        }
+    }
 
-private:
-    void _work() {
-        while (!_is_stop.load()) {
-            SimplifiedScanTask scan_task;
-            if (_scan_task_queue->blocking_get(&scan_task)) {
-                scan_task.scan_func();
-            };
+    void reset_thread_num(int thread_num) {
+        int max_thread_num = _scan_thread_pool->max_threads();
+        if (max_thread_num != thread_num) {
+            if (thread_num > max_thread_num) {
+                static_cast<void>(_scan_thread_pool->set_max_threads(thread_num));
+                static_cast<void>(_scan_thread_pool->set_min_threads(thread_num));
+            } else {
+                static_cast<void>(_scan_thread_pool->set_min_threads(thread_num));
+                static_cast<void>(_scan_thread_pool->set_max_threads(thread_num));
+            }
         }
     }
 
+private:
     std::unique_ptr<ThreadPool> _scan_thread_pool;
-    std::unique_ptr<BlockingQueue<SimplifiedScanTask>> _scan_task_queue;
     std::atomic<bool> _is_stop;
     CgroupCpuCtl* _cgroup_cpu_ctl = nullptr;
    std::string _wg_name;
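Two details of the BE plumbing above are easy to miss: parse_topic_info seeds scan_thread_num from the global config and lets only a positive thrift value override it, and upsert_cg_task_scheduler resizes an already-created scan scheduler only when the value is positive. Either way, -1 (or an FE that never sets the field) falls back to the BE-wide default. A minimal standalone sketch of that fallback rule; effective_scan_thread_num is a hypothetical helper name used for illustration, not code from the patch:

    #include <iostream>

    int effective_scan_thread_num(bool isset, int sent, int be_default) {
        // Field missing (older FE) or non-positive (-1 means "unset"):
        // keep the BE-wide doris_scanner_thread_pool_thread_num default.
        if (isset && sent > 0) {
            return sent;
        }
        return be_default;
    }

    int main() {
        std::cout << effective_scan_thread_num(true, 16, 48) << '\n';  // 16
        std::cout << effective_scan_thread_num(true, -1, 48) << '\n';  // 48
        std::cout << effective_scan_thread_num(false, 0, 48) << '\n';  // 48
        return 0;
    }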
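The scanner_scheduler.h rewrite is what makes the property enforceable at all: previously the pool was saturated by doris_scanner_thread_pool_thread_num long-lived _work() loops draining a fixed BlockingQueue, so its size could never change; now each task is submitted straight to the ThreadPool, and resizing the pool resizes scan concurrency. reset_thread_num updates max before min when growing and min before max when shrinking, which keeps min_threads <= max_threads at every intermediate step, assuming (as the call order implies) that the pool rejects a state where min exceeds max. A toy model of that invariant, not the doris::ThreadPool API:

    #include <cassert>

    struct ToyPool {
        int min_threads = 1;
        int max_threads = 8;
        bool set_min_threads(int n) {
            if (n > max_threads) return false;  // would break min <= max
            min_threads = n;
            return true;
        }
        bool set_max_threads(int n) {
            if (n < min_threads) return false;  // would break min <= max
            max_threads = n;
            return true;
        }
    };

    // Same ordering as reset_thread_num in the patch: grow max-first,
    // shrink min-first, so neither setter can ever fail.
    void reset_thread_num(ToyPool& pool, int n) {
        if (n == pool.max_threads) return;
        if (n > pool.max_threads) {
            pool.set_max_threads(n);
            pool.set_min_threads(n);
        } else {
            pool.set_min_threads(n);
            pool.set_max_threads(n);
        }
    }

    int main() {
        ToyPool pool;
        reset_thread_num(pool, 16);  // grow: max 8 -> 16, then min 1 -> 16
        assert(pool.min_threads == 16 && pool.max_threads == 16);
        reset_thread_num(pool, 4);   // shrink: min 16 -> 4, then max 16 -> 4
        assert(pool.min_threads == 4 && pool.max_threads == 4);
        return 0;
    }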
diff --git a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroup.java b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroup.java
index 91a516b9c52af8..83254bf926310c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroup.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroup.java
@@ -61,12 +61,14 @@ public class WorkloadGroup implements Writable, GsonPostProcessable {
 
     public static final String QUEUE_TIMEOUT = "queue_timeout";
 
+    public static final String SCAN_THREAD_NUM = "scan_thread_num";
+
     // NOTE(wb): all property is not required, some properties default value is set in be
     // default value is as followed
     // cpu_share=1024, memory_limit=0%(0 means not limit), enable_memory_overcommit=true
     private static final ImmutableSet<String> ALL_PROPERTIES_NAME = new ImmutableSet.Builder<String>()
             .add(CPU_SHARE).add(MEMORY_LIMIT).add(ENABLE_MEMORY_OVERCOMMIT).add(MAX_CONCURRENCY)
-            .add(MAX_QUEUE_SIZE).add(QUEUE_TIMEOUT).add(CPU_HARD_LIMIT).build();
+            .add(MAX_QUEUE_SIZE).add(QUEUE_TIMEOUT).add(CPU_HARD_LIMIT).add(SCAN_THREAD_NUM).build();
 
     @SerializedName(value = "id")
     private long id;
@@ -208,6 +210,19 @@ private static void checkProperties(Map<String, String> properties) throws DdlException {
             }
         }
 
+        if (properties.containsKey(SCAN_THREAD_NUM)) {
+            String value = properties.get(SCAN_THREAD_NUM);
+            try {
+                int intValue = Integer.parseInt(value);
+                if (intValue <= 0 && intValue != -1) {
+                    throw new NumberFormatException();
+                }
+            } catch (NumberFormatException e) {
+                throw new DdlException(
+                        SCAN_THREAD_NUM + " must be a positive integer or -1, but input value is " + value);
+            }
+        }
+
         // check queue property
         if (properties.containsKey(MAX_CONCURRENCY)) {
             try {
@@ -290,6 +305,8 @@ public void getProcNodeData(BaseProcResult result, QueryQueue qq) {
                 row.add("0%");
             } else if (ENABLE_MEMORY_OVERCOMMIT.equals(key) && !properties.containsKey(key)) {
                 row.add("true");
+            } else if (SCAN_THREAD_NUM.equals(key) && !properties.containsKey(key)) {
+                row.add("-1");
             } else {
                 row.add(properties.get(key));
             }
@@ -345,6 +362,11 @@ public TopicInfo toTopicInfo() {
                     + "id=" + id + ",name=" + name);
         }
 
+        String scanThreadNumStr = properties.get(SCAN_THREAD_NUM);
+        if (scanThreadNumStr != null) {
+            tWorkloadGroupInfo.setScanThreadNum(Integer.parseInt(scanThreadNumStr));
+        }
+
         TopicInfo topicInfo = new TopicInfo();
         topicInfo.setWorkloadGroupInfo(tWorkloadGroupInfo);
         return topicInfo;
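checkProperties accepts exactly the values the BE can act on: a positive thread count, or -1 for "keep the BE default". Zero is rejected rather than treated as unset, which matches the positive-value guard in parse_topic_info. The same predicate as a standalone C++ sketch (illustrative only; the authoritative check is the Java above):

    #include <cassert>
    #include <string>

    bool is_valid_scan_thread_num(const std::string& value) {
        try {
            std::size_t pos = 0;
            int v = std::stoi(value, &pos);
            // Reject trailing junk ("16abc"), zero, and negatives other
            // than -1, mirroring Integer.parseInt plus the range check.
            return pos == value.size() && (v > 0 || v == -1);
        } catch (const std::exception&) {
            return false;  // not a number, or out of int range
        }
    }

    int main() {
        assert(is_valid_scan_thread_num("16"));
        assert(is_valid_scan_thread_num("-1"));
        assert(!is_valid_scan_thread_num("0"));
        assert(!is_valid_scan_thread_num("-2"));
        assert(!is_valid_scan_thread_num("abc"));
        return 0;
    }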
diff --git a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgr.java b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgr.java
index 44c5de41947873..b039340080a9b1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgr.java
@@ -70,6 +70,7 @@ public class WorkloadGroupMgr implements Writable, GsonPostProcessable {
             .add(WorkloadGroup.ENABLE_MEMORY_OVERCOMMIT)
             .add(WorkloadGroup.MAX_CONCURRENCY).add(WorkloadGroup.MAX_QUEUE_SIZE)
             .add(WorkloadGroup.QUEUE_TIMEOUT).add(WorkloadGroup.CPU_HARD_LIMIT)
+            .add(WorkloadGroup.SCAN_THREAD_NUM)
             .add(QueryQueue.RUNNING_QUERY_NUM).add(QueryQueue.WAITING_QUERY_NUM)
             .build();
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java
index 2e0de09d29bec3..5ad2a7d5de3559 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java
@@ -381,8 +381,9 @@ private static TFetchSchemaTableDataResult workloadGroupsMetadataResult(TMetadataTableRequestParams params) {
             trow.addToColumnValue(new TCell().setLongVal(Long.valueOf(rGroupsInfo.get(6)))); // max queue size
             trow.addToColumnValue(new TCell().setLongVal(Long.valueOf(rGroupsInfo.get(7)))); // queue timeout
             trow.addToColumnValue(new TCell().setStringVal(rGroupsInfo.get(8))); // cpu hard limit
-            trow.addToColumnValue(new TCell().setLongVal(Long.valueOf(rGroupsInfo.get(9)))); // running query num
-            trow.addToColumnValue(new TCell().setLongVal(Long.valueOf(rGroupsInfo.get(10)))); // waiting query num
+            trow.addToColumnValue(new TCell().setIntVal(Integer.parseInt(rGroupsInfo.get(9)))); // scan thread num
+            trow.addToColumnValue(new TCell().setLongVal(Long.valueOf(rGroupsInfo.get(10)))); // running query num
+            trow.addToColumnValue(new TCell().setLongVal(Long.valueOf(rGroupsInfo.get(11)))); // waiting query num
             dataBatch.add(trow);
         }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/WorkloadGroupsTableValuedFunction.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/WorkloadGroupsTableValuedFunction.java
index 5b34db5a684272..a50fb7ca8531da 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/WorkloadGroupsTableValuedFunction.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/WorkloadGroupsTableValuedFunction.java
@@ -49,6 +49,7 @@ public class WorkloadGroupsTableValuedFunction extends MetadataTableValuedFunction {
             new Column(WorkloadGroup.MAX_QUEUE_SIZE, ScalarType.createType(PrimitiveType.BIGINT)),
             new Column(WorkloadGroup.QUEUE_TIMEOUT, ScalarType.createType(PrimitiveType.BIGINT)),
             new Column(WorkloadGroup.CPU_HARD_LIMIT, ScalarType.createStringType()),
+            new Column(WorkloadGroup.SCAN_THREAD_NUM, ScalarType.createType(PrimitiveType.INT)),
             new Column(QueryQueue.RUNNING_QUERY_NUM, ScalarType.createType(PrimitiveType.BIGINT)),
             new Column(QueryQueue.WAITING_QUERY_NUM, ScalarType.createType(PrimitiveType.BIGINT)));
 
diff --git a/gensrc/thrift/BackendService.thrift b/gensrc/thrift/BackendService.thrift
index 8559698ffd7368..e4276fe59edc8d 100644
--- a/gensrc/thrift/BackendService.thrift
+++ b/gensrc/thrift/BackendService.thrift
@@ -257,6 +257,7 @@ struct TWorkloadGroupInfo {
   6: optional string mem_limit
   7: optional bool enable_memory_overcommit
   8: optional bool enable_cpu_hard_limit
+  9: optional i32 scan_thread_num
 }
 
 struct TWorkloadMoveQueryToGroupAction {
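The FE pieces are positionally coupled: the column-name list in WorkloadGroupMgr defines the row layout, MetadataGenerator reads that row back by index (scan thread num is column 9, pushing running and waiting query num to 10 and 11), and WorkloadGroupsTableValuedFunction must declare its schema in the same order, so adding a column in one place without the other two silently shifts every later column. On the wire, field 9 of TWorkloadGroupInfo is optional and parse_topic_info checks __isset, so a BE paired with an older FE that never sets the field simply keeps its config default.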
diff --git a/regression-test/data/workload_manager_p0/test_curd_wlg.out b/regression-test/data/workload_manager_p0/test_curd_wlg.out
index 1f3fbaf9e7470a..77b9fa75be8da9 100644
--- a/regression-test/data/workload_manager_p0/test_curd_wlg.out
+++ b/regression-test/data/workload_manager_p0/test_curd_wlg.out
@@ -6,15 +6,15 @@
 2
 
 -- !show_1 --
-normal	20	50%	true	2147483647	0	0	1%
-test_group	10	10%	true	2147483647	0	0	0%
+normal	20	50%	true	2147483647	0	0	1%	16
+test_group	10	10%	true	2147483647	0	0	0%	-1
 
 -- !mem_limit_1 --
 2
 
 -- !mem_limit_2 --
-normal	20	50%	true	2147483647	0	0	1%
-test_group	10	11%	true	2147483647	0	0	0%
+normal	20	50%	true	2147483647	0	0	1%	16
+test_group	10	11%	true	2147483647	0	0	0%	-1
 
 -- !mem_overcommit_1 --
 2
@@ -23,24 +23,24 @@
 2
 
 -- !mem_overcommit_3 --
-normal	20	50%	true	2147483647	0	0	1%
-test_group	10	11%	false	2147483647	0	0	0%
+normal	20	50%	true	2147483647	0	0	1%	16
+test_group	10	11%	false	2147483647	0	0	0%	-1
 
 -- !cpu_hard_limit_1 --
 2
 
 -- !cpu_hard_limit_2 --
-normal	20	50%	true	2147483647	0	0	1%
-test_group	10	11%	false	2147483647	0	0	20%
+normal	20	50%	true	2147483647	0	0	1%	16
+test_group	10	11%	false	2147483647	0	0	20%	-1
 
 -- !queue_1 --
 2
 
 -- !show_queue --
-normal	20	50%	true	2147483647	0	0	1%
-test_group	10	11%	false	100	0	0	20%
+normal	20	50%	true	2147483647	0	0	1%	16
+test_group	10	11%	false	100	0	0	20%	-1
 
 -- !select_tvf_1 --
-normal	20	50%	true	2147483647	0	0	1%
-test_group	10	11%	false	100	0	0	20%
+normal	20	50%	true	2147483647	0	0	1%	16
+test_group	10	11%	false	100	0	0	20%	-1
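In the regenerated expected output the trailing column is scan_thread_num: 16 for normal, which the test sets explicitly, and -1 for test_group, which never sets the property and therefore shows the unset placeholder from getProcNodeData.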
diff --git a/regression-test/suites/workload_manager_p0/test_curd_wlg.groovy b/regression-test/suites/workload_manager_p0/test_curd_wlg.groovy
index 3e56525acca78f..1c3536eefc5400 100644
--- a/regression-test/suites/workload_manager_p0/test_curd_wlg.groovy
+++ b/regression-test/suites/workload_manager_p0/test_curd_wlg.groovy
@@ -80,12 +80,14 @@ suite("test_crud_wlg") {
     sql "alter workload group normal properties ( 'max_queue_size'='0' );"
     sql "alter workload group normal properties ( 'queue_timeout'='0' );"
     sql "alter workload group normal properties ( 'cpu_hard_limit'='1%' );"
+    sql "alter workload group normal properties ( 'scan_thread_num'='-1' );"
 
     sql "set workload_group=normal;"
 
     // test cpu_share
     qt_cpu_share """ select count(1) from ${table_name} """
 
+    sql "alter workload group normal properties ( 'scan_thread_num'='16' );"
     sql "alter workload group normal properties ( 'cpu_share'='20' );"
 
     qt_cpu_share_2 """ select count(1) from ${table_name} """
@@ -96,6 +98,12 @@ suite("test_crud_wlg") {
         exception "requires a positive integer"
     }
 
+    test {
+        sql "alter workload group normal properties ( 'scan_thread_num'='0' );"
+
+        exception "scan_thread_num must be a positive integer or -1"
+    }
+
     sql "drop workload group if exists test_group;"
 
     // test create group
@@ -107,7 +115,7 @@ suite("test_crud_wlg") {
             ");"
     sql "set workload_group=test_group;"
 
-    qt_show_1 "select name,cpu_share,memory_limit,enable_memory_overcommit,max_concurrency,max_queue_size,queue_timeout,cpu_hard_limit from workload_groups() order by name;"
+    qt_show_1 "select name,cpu_share,memory_limit,enable_memory_overcommit,max_concurrency,max_queue_size,queue_timeout,cpu_hard_limit,scan_thread_num from workload_groups() order by name;"
 
     // test memory_limit
     test {
@@ -118,7 +126,7 @@ suite("test_crud_wlg") {
 
     sql "alter workload group test_group properties ( 'memory_limit'='11%' );"
     qt_mem_limit_1 """ select count(1) from ${table_name} """
-    qt_mem_limit_2 "select name,cpu_share,memory_limit,enable_memory_overcommit,max_concurrency,max_queue_size,queue_timeout,cpu_hard_limit from workload_groups() order by name;"
+    qt_mem_limit_2 "select name,cpu_share,memory_limit,enable_memory_overcommit,max_concurrency,max_queue_size,queue_timeout,cpu_hard_limit,scan_thread_num from workload_groups() order by name;"
 
     // test enable_memory_overcommit
     test {
@@ -131,7 +139,7 @@ suite("test_crud_wlg") {
     qt_mem_overcommit_1 """ select count(1) from ${table_name} """
     sql "alter workload group test_group properties ( 'enable_memory_overcommit'='false' );"
     qt_mem_overcommit_2 """ select count(1) from ${table_name} """
-    qt_mem_overcommit_3 "select name,cpu_share,memory_limit,enable_memory_overcommit,max_concurrency,max_queue_size,queue_timeout,cpu_hard_limit from workload_groups() order by name;"
+    qt_mem_overcommit_3 "select name,cpu_share,memory_limit,enable_memory_overcommit,max_concurrency,max_queue_size,queue_timeout,cpu_hard_limit,scan_thread_num from workload_groups() order by name;"
 
     // test cpu_hard_limit
     test {
@@ -150,7 +158,7 @@ suite("test_crud_wlg") {
 
     sql "alter workload group test_group properties ( 'cpu_hard_limit'='20%' );"
     qt_cpu_hard_limit_1 """ select count(1) from ${table_name} """
-    qt_cpu_hard_limit_2 "select name,cpu_share,memory_limit,enable_memory_overcommit,max_concurrency,max_queue_size,queue_timeout,cpu_hard_limit from workload_groups() order by name;"
+    qt_cpu_hard_limit_2 "select name,cpu_share,memory_limit,enable_memory_overcommit,max_concurrency,max_queue_size,queue_timeout,cpu_hard_limit,scan_thread_num from workload_groups() order by name;"
 
     // test query queue
     test {
@@ -173,7 +181,7 @@ suite("test_crud_wlg") {
 
     sql "alter workload group test_group properties ( 'max_concurrency'='100' );"
     qt_queue_1 """ select count(1) from ${table_name} """
-    qt_show_queue "select name,cpu_share,memory_limit,enable_memory_overcommit,max_concurrency,max_queue_size,queue_timeout,cpu_hard_limit from workload_groups() order by name;"
+    qt_show_queue "select name,cpu_share,memory_limit,enable_memory_overcommit,max_concurrency,max_queue_size,queue_timeout,cpu_hard_limit,scan_thread_num from workload_groups() order by name;"
 
     // test create group failed
     // failed for cpu_share
@@ -251,7 +259,7 @@ suite("test_crud_wlg") {
     }
 
     // test show workload groups
-    qt_select_tvf_1 "select name,cpu_share,memory_limit,enable_memory_overcommit,max_concurrency,max_queue_size,queue_timeout,cpu_hard_limit from workload_groups() order by name;"
+    qt_select_tvf_1 "select name,cpu_share,memory_limit,enable_memory_overcommit,max_concurrency,max_queue_size,queue_timeout,cpu_hard_limit,scan_thread_num from workload_groups() order by name;"
 
     // test auth
     sql """drop user if exists test_wlg_user"""
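The suite exercises the full property lifecycle: normal accepts -1 and then 16, a value of 0 is rejected with the checkProperties message, and every qt_* projection now selects scan_thread_num so the expected rows stay aligned with the widened workload_groups() schema.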