diff --git a/be/src/agent/task_worker_pool.cpp b/be/src/agent/task_worker_pool.cpp index 9b0a959295028e..0e851fba17a035 100644 --- a/be/src/agent/task_worker_pool.cpp +++ b/be/src/agent/task_worker_pool.cpp @@ -540,26 +540,20 @@ Status TaskWorkerPool::submit_task(const TAgentTaskRequest& task) { } PriorTaskWorkerPool::PriorTaskWorkerPool( - std::string_view name, int normal_worker_count, int high_prior_worker_count, + const std::string& name, int normal_worker_count, int high_prior_worker_count, std::function callback) : _callback(std::move(callback)) { - auto st = ThreadPoolBuilder(fmt::format("TaskWP_.{}", name)) - .set_min_threads(normal_worker_count) - .set_max_threads(normal_worker_count) - .build(&_normal_pool); - CHECK(st.ok()) << name << ": " << st; - - st = _normal_pool->submit_func([this] { normal_loop(); }); - CHECK(st.ok()) << name << ": " << st; - - st = ThreadPoolBuilder(fmt::format("HighPriorPool.{}", name)) - .set_min_threads(high_prior_worker_count) - .set_max_threads(high_prior_worker_count) - .build(&_high_prior_pool); - CHECK(st.ok()) << name << ": " << st; + for (int i = 0; i < normal_worker_count; ++i) { + auto st = Thread::create( + "Normal", name, [this] { normal_loop(); }, &_workers.emplace_back()); + CHECK(st.ok()) << name << ": " << st; + } - st = _high_prior_pool->submit_func([this] { high_prior_loop(); }); - CHECK(st.ok()) << name << ": " << st; + for (int i = 0; i < high_prior_worker_count; ++i) { + auto st = Thread::create( + "HighPrior", name, [this] { high_prior_loop(); }, &_workers.emplace_back()); + CHECK(st.ok()) << name << ": " << st; + } } PriorTaskWorkerPool::~PriorTaskWorkerPool() { @@ -578,12 +572,10 @@ void PriorTaskWorkerPool::stop() { _normal_condv.notify_all(); _high_prior_condv.notify_all(); - if (_normal_pool) { - _normal_pool->shutdown(); - } - - if (_high_prior_pool) { - _high_prior_pool->shutdown(); + for (auto&& w : _workers) { + if (w) { + w->join(); + } } } diff --git a/be/src/agent/task_worker_pool.h b/be/src/agent/task_worker_pool.h index 514692968b474c..f51d6c2a4c0dc0 100644 --- a/be/src/agent/task_worker_pool.h +++ b/be/src/agent/task_worker_pool.h @@ -79,7 +79,8 @@ class PublishVersionWorkerPool final : public TaskWorkerPool { class PriorTaskWorkerPool final : public TaskWorkerPoolIf { public: - PriorTaskWorkerPool(std::string_view name, int normal_worker_count, int high_prior_worker_count, + PriorTaskWorkerPool(const std::string& name, int normal_worker_count, + int high_prior_worker_count, std::function callback); ~PriorTaskWorkerPool() override; @@ -101,8 +102,7 @@ class PriorTaskWorkerPool final : public TaskWorkerPoolIf { std::condition_variable _high_prior_condv; std::deque> _high_prior_queue; - std::unique_ptr _normal_pool; - std::unique_ptr _high_prior_pool; + std::vector> _workers; std::function _callback; };