refactor the task queue code
HappenLee committed Nov 5, 2024
1 parent 17df7d5 commit c7f7c76
Showing 8 changed files with 45 additions and 89 deletions.
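In brief, condensed from the eight diffs below: the abstract TaskQueue base class is deleted, MultiCoreTaskQueue becomes the only queue type, its per-core PriorityTaskQueues move from a shared_ptr-wrapped vector of unique_ptrs to a plain vector of values, and TaskScheduler owns the queue by value rather than through a shared_ptr. A compilable before/after sketch of that ownership change (the Before/After suffixes and the stub types are mine; method bodies are elided):

    #include <memory>
    #include <vector>

    struct PipelineTask {};       // stub; the real type lives in pipeline_task.h
    struct PriorityTaskQueue {};  // stub; the real type is defined in task_queue.h

    // Before: an abstract TaskQueue, with the per-core queues held behind
    // two levels of heap indirection and shared between owners.
    class TaskQueueBefore {
    public:
        explicit TaskQueueBefore(int core_size) : _core_size(core_size) {}
        virtual ~TaskQueueBefore() = default;
        virtual PipelineTask* take(int core_id) = 0;

    protected:
        int _core_size;
    };

    class MultiCoreTaskQueueBefore : public TaskQueueBefore {
    public:
        using TaskQueueBefore::TaskQueueBefore;
        PipelineTask* take(int /*core_id*/) override { return nullptr; }  // body elided

    private:
        std::shared_ptr<std::vector<std::unique_ptr<PriorityTaskQueue>>> _prio_task_queue_list;
    };
    // ...and TaskScheduler held: std::shared_ptr<TaskQueue> _task_queue;

    // After: one concrete class; the per-core queues live inline in a vector.
    class MultiCoreTaskQueueAfter {
    public:
        explicit MultiCoreTaskQueueAfter(int core_size)
                : _prio_task_queues(core_size), _core_size(core_size) {}
        PipelineTask* take(int /*core_id*/) { return nullptr; }  // body elided

    private:
        std::vector<PriorityTaskQueue> _prio_task_queues;
        int _core_size;
    };
    // ...and TaskScheduler now holds: MultiCoreTaskQueue _task_queue;

    int main() {
        MultiCoreTaskQueueAfter queue(4);  // default-constructs 4 per-core queues
        (void)queue;
    }

Dropping the indirection removes a virtual dispatch per queue operation and the atomic shared_ptr loads that the old take()/push_back()/update_statistics() paths performed.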
4 changes: 0 additions & 4 deletions be/src/pipeline/pipeline_task.cpp
@@ -216,10 +216,6 @@ Status PipelineTask::_open() {
return Status::OK();
}

void PipelineTask::set_task_queue(TaskQueue* task_queue) {
_task_queue = task_queue;
}

bool PipelineTask::_wait_to_start() {
// Before task starting, we should make sure
// 1. Execution dependency is ready (which is controlled by FE 2-phase commit)
8 changes: 4 additions & 4 deletions be/src/pipeline/pipeline_task.h
@@ -41,7 +41,7 @@ class PipelineFragmentContext;

namespace doris::pipeline {

class TaskQueue;
class MultiCoreTaskQueue;
class PriorityTaskQueue;
class Dependency;

@@ -159,8 +159,8 @@ class PipelineTask {
}
}

void set_task_queue(TaskQueue* task_queue);
TaskQueue* get_task_queue() { return _task_queue; }
void set_task_queue(MultiCoreTaskQueue* task_queue) { _task_queue = task_queue; }
MultiCoreTaskQueue* get_task_queue() { return _task_queue; }

static constexpr auto THREAD_TIME_SLICE = 100'000'000ULL;

@@ -257,7 +257,7 @@ class PipelineTask {
uint32_t _schedule_time = 0;
std::unique_ptr<doris::vectorized::Block> _block;
PipelineFragmentContext* _fragment_context = nullptr;
TaskQueue* _task_queue = nullptr;
MultiCoreTaskQueue* _task_queue = nullptr;

// used for priority queue
// it may be visited by different thread but there is no race condition
45 changes: 15 additions & 30 deletions be/src/pipeline/task_queue.cpp
@@ -28,7 +28,6 @@

namespace doris::pipeline {
#include "common/compile_check_begin.h"
TaskQueue::~TaskQueue() = default;

PipelineTask* SubTaskQueue::try_take(bool is_steal) {
if (_queue.empty()) {
@@ -133,44 +132,35 @@ Status PriorityTaskQueue::push(PipelineTask* task) {

MultiCoreTaskQueue::~MultiCoreTaskQueue() = default;

MultiCoreTaskQueue::MultiCoreTaskQueue(int core_size) : TaskQueue(core_size), _closed(false) {
_prio_task_queue_list =
std::make_shared<std::vector<std::unique_ptr<PriorityTaskQueue>>>(core_size);
for (int i = 0; i < core_size; i++) {
(*_prio_task_queue_list)[i] = std::make_unique<PriorityTaskQueue>();
}
}
MultiCoreTaskQueue::MultiCoreTaskQueue(int core_size)
: _prio_task_queues(core_size), _closed(false), _core_size(core_size) {}

void MultiCoreTaskQueue::close() {
if (_closed) {
return;
}
_closed = true;
for (int i = 0; i < _core_size; ++i) {
(*_prio_task_queue_list)[i]->close();
}
std::atomic_store(&_prio_task_queue_list,
std::shared_ptr<std::vector<std::unique_ptr<PriorityTaskQueue>>>(nullptr));
// close all priority task queues
std::ranges::for_each(_prio_task_queues,
[](auto& prio_task_queue) { prio_task_queue.close(); });
}

PipelineTask* MultiCoreTaskQueue::take(int core_id) {
PipelineTask* task = nullptr;
auto prio_task_queue_list =
std::atomic_load_explicit(&_prio_task_queue_list, std::memory_order_relaxed);
while (!_closed) {
DCHECK(prio_task_queue_list->size() > core_id)
<< " list size: " << prio_task_queue_list->size() << " core_id: " << core_id
DCHECK(_prio_task_queues.size() > core_id)
<< " list size: " << _prio_task_queues.size() << " core_id: " << core_id
<< " _core_size: " << _core_size << " _next_core: " << _next_core.load();
task = (*prio_task_queue_list)[core_id]->try_take(false);
task = _prio_task_queues[core_id].try_take(false);
if (task) {
task->set_core_id(core_id);
break;
}
task = _steal_take(core_id, *prio_task_queue_list);
task = _steal_take(core_id);
if (task) {
break;
}
task = (*prio_task_queue_list)[core_id]->take(WAIT_CORE_TASK_TIMEOUT_MS /* timeout_ms */);
task = _prio_task_queues[core_id].take(WAIT_CORE_TASK_TIMEOUT_MS /* timeout_ms */);
if (task) {
task->set_core_id(core_id);
break;
@@ -182,8 +172,7 @@ PipelineTask* MultiCoreTaskQueue::take(int core_id) {
return task;
}

PipelineTask* MultiCoreTaskQueue::_steal_take(
int core_id, std::vector<std::unique_ptr<PriorityTaskQueue>>& prio_task_queue_list) {
PipelineTask* MultiCoreTaskQueue::_steal_take(int core_id) {
DCHECK(core_id < _core_size);
int next_id = core_id;
for (int i = 1; i < _core_size; ++i) {
@@ -192,7 +181,7 @@ PipelineTask* MultiCoreTaskQueue::_steal_take(
next_id = 0;
}
DCHECK(next_id < _core_size);
auto task = prio_task_queue_list[next_id]->try_take(true);
auto task = _prio_task_queues[next_id].try_take(true);
if (task) {
task->set_core_id(next_id);
return task;
@@ -212,17 +201,13 @@ Status MultiCoreTaskQueue::push_back(PipelineTask* task) {
Status MultiCoreTaskQueue::push_back(PipelineTask* task, int core_id) {
DCHECK(core_id < _core_size);
task->put_in_runnable_queue();
auto prio_task_queue_list =
std::atomic_load_explicit(&_prio_task_queue_list, std::memory_order_relaxed);
return (*prio_task_queue_list)[core_id]->push(task);
return _prio_task_queues[core_id].push(task);
}

void MultiCoreTaskQueue::update_statistics(PipelineTask* task, int64_t time_spent) {
task->inc_runtime_ns(time_spent);
auto prio_task_queue_list =
std::atomic_load_explicit(&_prio_task_queue_list, std::memory_order_relaxed);
(*prio_task_queue_list)[task->get_core_id()]->inc_sub_queue_runtime(task->get_queue_level(),
time_spent);
_prio_task_queues[task->get_core_id()].inc_sub_queue_runtime(task->get_queue_level(),
time_spent);
}

} // namespace doris::pipeline
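For context on the hunks above: take(core_id) first polls its own core's queue, then tries to steal from the other cores, and only then falls back to a blocking take with a timeout. The steal pass visits every other core once, round-robin from core_id's neighbor. A self-contained sketch of that pass, with PipelineTask and PriorityTaskQueue reduced to stand-ins (the stand-in types are mine, and the ++next_id step is inferred, since the hunk elides that line):

    #include <cstdio>
    #include <deque>
    #include <optional>
    #include <vector>

    struct Task { int id; };                 // stand-in for PipelineTask
    struct PerCoreQueue {                    // stand-in for PriorityTaskQueue
        std::deque<Task> q;
        std::optional<Task> try_take(bool /*is_steal*/) {
            if (q.empty()) return std::nullopt;
            Task t = q.front();
            q.pop_front();
            return t;
        }
    };

    // Mirrors MultiCoreTaskQueue::_steal_take: visit every other core once,
    // wrapping around, and return the first task found.
    std::optional<Task> steal_take(int core_id, std::vector<PerCoreQueue>& queues) {
        const int core_size = static_cast<int>(queues.size());
        int next_id = core_id;
        for (int i = 1; i < core_size; ++i) {
            ++next_id;
            if (next_id == core_size) {
                next_id = 0;
            }
            if (auto task = queues[next_id].try_take(/*is_steal=*/true)) {
                return task;
            }
        }
        return std::nullopt;
    }

    int main() {
        std::vector<PerCoreQueue> queues(4);
        queues[2].q.push_back({42});           // only core 2 has pending work
        if (auto t = steal_take(0, queues)) {  // core 0 comes up empty and steals
            std::printf("stole task %d from core 2\n", t->id);
        }
    }

Stealing keeps idle cores busy without a global queue: a worker only blocks in take(WAIT_CORE_TASK_TIMEOUT_MS) on its own queue after the pass above finds nothing.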
48 changes: 14 additions & 34 deletions be/src/pipeline/task_queue.h
@@ -34,30 +34,6 @@
namespace doris::pipeline {
#include "common/compile_check_begin.h"

class TaskQueue {
public:
TaskQueue(int core_size) : _core_size(core_size) {}
virtual ~TaskQueue();
virtual void close() = 0;
// Get the task by core id.
// TODO: To think the logic is useful?
virtual PipelineTask* take(int core_id) = 0;

// push from scheduler
virtual Status push_back(PipelineTask* task) = 0;

// push from worker
virtual Status push_back(PipelineTask* task, int core_id) = 0;

virtual void update_statistics(PipelineTask* task, int64_t time_spent) {}

int cores() const { return _core_size; }

protected:
int _core_size;
static constexpr auto WAIT_CORE_TASK_TIMEOUT_MS = 100;
};

class SubTaskQueue {
friend class PriorityTaskQueue;

@@ -127,31 +103,35 @@ class PriorityTaskQueue {
};

// Need to consider NUMA architecture
class MultiCoreTaskQueue : public TaskQueue {
class MultiCoreTaskQueue {
public:
explicit MultiCoreTaskQueue(int core_size);

~MultiCoreTaskQueue() override;
~MultiCoreTaskQueue();

void close() override;
void close();

// Get the task by core id.
PipelineTask* take(int core_id) override;
PipelineTask* take(int core_id);

// TODO combine these methods into `push_back(task, core_id = -1)`
Status push_back(PipelineTask* task) override;
Status push_back(PipelineTask* task);

Status push_back(PipelineTask* task, int core_id) override;
Status push_back(PipelineTask* task, int core_id);

void update_statistics(PipelineTask* task, int64_t time_spent) override;
void update_statistics(PipelineTask* task, int64_t time_spent);

int cores() const { return _core_size; }

private:
PipelineTask* _steal_take(
int core_id, std::vector<std::unique_ptr<PriorityTaskQueue>>& prio_task_queue_list);
PipelineTask* _steal_take(int core_id);

std::shared_ptr<std::vector<std::unique_ptr<PriorityTaskQueue>>> _prio_task_queue_list;
std::vector<PriorityTaskQueue> _prio_task_queues;
std::atomic<uint32_t> _next_core = 0;
std::atomic<bool> _closed;

int _core_size;
static constexpr auto WAIT_CORE_TASK_TIMEOUT_MS = 100;
};
#include "common/compile_check_end.h"
} // namespace doris::pipeline
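Storing the queues by value also simplifies shutdown, as the task_queue.cpp diff above shows: close() no longer atomically swaps the shared_ptr to nullptr, it just marks the queue closed and closes each element, relying on the vector living as long as its owner. A minimal sketch of that pattern (the stand-in queue type is mine; the commit checks then sets _closed in two plain steps, whereas this sketch folds them into an atomic exchange):

    #include <algorithm>
    #include <atomic>
    #include <cstdio>
    #include <vector>

    struct PerCoreQueue {                       // stand-in for PriorityTaskQueue
        void close() { std::puts("queue closed"); }
    };

    class MultiCoreQueue {
    public:
        explicit MultiCoreQueue(int core_size)
                : _queues(core_size), _closed(false) {}  // vector(n) default-constructs n queues

        void close() {
            if (_closed.exchange(true)) {
                return;                          // a second close() is a no-op
            }
            // close all priority task queues
            std::ranges::for_each(_queues, [](auto& q) { q.close(); });
        }

    private:
        std::vector<PerCoreQueue> _queues;
        std::atomic<bool> _closed;
    };

    int main() {
        MultiCoreQueue mq(3);
        mq.close();   // prints "queue closed" three times
        mq.close();   // no-op
    }

With by-value storage the old atomic shared_ptr swap is unnecessary; the queues' lifetime is simply the owner's, so the owner must outlive the worker threads that call take().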
14 changes: 6 additions & 8 deletions be/src/pipeline/task_scheduler.cpp
@@ -51,7 +51,7 @@ TaskScheduler::~TaskScheduler() {
}

Status TaskScheduler::start() {
int cores = _task_queue->cores();
int cores = _task_queue.cores();
RETURN_IF_ERROR(ThreadPoolBuilder(_name)
.set_min_threads(cores)
.set_max_threads(cores)
@@ -67,7 +67,7 @@
}

Status TaskScheduler::schedule_task(PipelineTask* task) {
return _task_queue->push_back(task);
return _task_queue.push_back(task);
}

// after _close_task, task maybe destructed.
@@ -99,17 +99,17 @@ void _close_task(PipelineTask* task, Status exec_status) {

void TaskScheduler::_do_work(int index) {
while (_markers[index]) {
auto* task = _task_queue->take(index);
auto* task = _task_queue.take(index);
if (!task) {
continue;
}
if (task->is_running()) {
static_cast<void>(_task_queue->push_back(task, index));
static_cast<void>(_task_queue.push_back(task, index));
continue;
}
task->log_detail_if_need();
task->set_running(true);
task->set_task_queue(_task_queue.get());
task->set_task_queue(&_task_queue);
auto* fragment_ctx = task->fragment_context();
bool canceled = fragment_ctx->is_canceled();

@@ -189,9 +189,7 @@

void TaskScheduler::stop() {
if (!_shutdown) {
if (_task_queue) {
_task_queue->close();
}
_task_queue.close();
if (_fix_thread_pool) {
for (size_t i = 0; i < _markers.size(); ++i) {
_markers[i] = false;
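For context on the _do_work loop edited above: each worker thread is pinned to one core index, takes a task from that core's queue, re-queues the task if another worker still has it marked running, and otherwise claims and executes it. A single-iteration sketch of that dispatch decision (the stand-in types are mine; the blocking take, cancellation, and error paths are omitted):

    #include <cstdio>
    #include <deque>

    struct Task {                                // stand-in for PipelineTask
        bool running = false;
        bool is_running() const { return running; }
        void set_running(bool r) { running = r; }
    };

    struct CoreQueue {                           // stand-in for one core's queue
        std::deque<Task*> q;
        Task* take() {
            if (q.empty()) return nullptr;
            Task* t = q.front();
            q.pop_front();
            return t;
        }
        void push_back(Task* t) { q.push_back(t); }
    };

    // Mirrors the shape of TaskScheduler::_do_work for one iteration.
    void do_work_once(CoreQueue& queue) {
        Task* task = queue.take();               // the real take() blocks with a timeout
        if (!task) return;
        if (task->is_running()) {
            queue.push_back(task);               // another worker owns it; retry later
            return;
        }
        task->set_running(true);                 // claim, then execute
        std::puts("executing task");
        task->set_running(false);
    }

    int main() {
        CoreQueue queue;
        Task t;
        queue.push_back(&t);
        do_work_once(queue);
    }

The is_running() re-queue mirrors the diff above: a task observed mid-execution goes back onto the same core via push_back(task, index) rather than being dropped.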
9 changes: 4 additions & 5 deletions be/src/pipeline/task_scheduler.h
@@ -31,6 +31,7 @@
#include "gutil/ref_counted.h"
#include "pipeline_task.h"
#include "runtime/workload_group/workload_group.h"
#include "task_queue.h"
#include "util/thread.h"

namespace doris {
@@ -39,13 +40,11 @@ class ThreadPool;
} // namespace doris

namespace doris::pipeline {
class TaskQueue;

class TaskScheduler {
public:
TaskScheduler(std::shared_ptr<TaskQueue> task_queue, std::string name,
CgroupCpuCtl* cgroup_cpu_ctl)
: _task_queue(std::move(task_queue)),
TaskScheduler(int core_num, std::string name, CgroupCpuCtl* cgroup_cpu_ctl)
: _task_queue(core_num),
_shutdown(false),
_name(std::move(name)),
_cgroup_cpu_ctl(cgroup_cpu_ctl) {}
@@ -62,7 +61,7 @@

private:
std::unique_ptr<ThreadPool> _fix_thread_pool;
std::shared_ptr<TaskQueue> _task_queue;
MultiCoreTaskQueue _task_queue;
std::vector<bool> _markers;
bool _shutdown;
std::string _name;
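The constructor change ripples to both call sites below: callers now pass a core count and the scheduler builds its own queue, instead of allocating a MultiCoreTaskQueue behind a shared_ptr and handing it in. A compilable sketch of the new construction path (the stand-in types are mine; the real constructor also takes a CgroupCpuCtl*):

    #include <string>
    #include <utility>

    struct MultiCoreTaskQueue {                 // stand-in, reduced to construction
        explicit MultiCoreTaskQueue(int core_size) : _core_size(core_size) {}
        int cores() const { return _core_size; }
        int _core_size;
    };

    class TaskScheduler {
    public:
        TaskScheduler(int core_num, std::string name)
                : _task_queue(core_num), _name(std::move(name)) {}
        int cores() const { return _task_queue.cores(); }

    private:
        MultiCoreTaskQueue _task_queue;         // owned by value; no shared_ptr
        std::string _name;
    };

    int main() {
        TaskScheduler scheduler(/*core_num=*/8, "PipeNoGSchePool");
        return scheduler.cores() == 8 ? 0 : 1;
    }

This removes the shared ownership between the call sites and the scheduler's queue; each TaskScheduler now embeds exactly one MultiCoreTaskQueue.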
3 changes: 1 addition & 2 deletions be/src/runtime/exec_env_init.cpp
@@ -381,9 +381,8 @@ Status ExecEnv::init_pipeline_task_scheduler() {

LOG_INFO("pipeline executors_size set ").tag("size", executors_size);
// TODO: combine the two blocked schedulers for pipeline workload groups.
auto t_queue = std::make_shared<pipeline::MultiCoreTaskQueue>(executors_size);
_without_group_task_scheduler =
new pipeline::TaskScheduler(t_queue, "PipeNoGSchePool", nullptr);
new pipeline::TaskScheduler(executors_size, "PipeNoGSchePool", nullptr);
RETURN_IF_ERROR(_without_group_task_scheduler->start());

_runtime_filter_timer_queue = new doris::pipeline::RuntimeFilterTimerQueue();
3 changes: 1 addition & 2 deletions be/src/runtime/workload_group/workload_group.cpp
@@ -468,9 +468,8 @@ void WorkloadGroup::upsert_task_scheduler(WorkloadGroupInfo* tg_info, ExecEnv* e
if (executors_size <= 0) {
executors_size = CpuInfo::num_cores();
}
auto task_queue = std::make_shared<pipeline::MultiCoreTaskQueue>(executors_size);
std::unique_ptr<pipeline::TaskScheduler> pipeline_task_scheduler =
std::make_unique<pipeline::TaskScheduler>(std::move(task_queue), "Pipe_" + tg_name,
std::make_unique<pipeline::TaskScheduler>(executors_size, "Pipe_" + tg_name,
cg_cpu_ctl_ptr);
Status ret = pipeline_task_scheduler->start();
if (ret.ok()) {
