Skip to content

Commit

Permalink
refine some codes for pipeline (#7406)
Browse files Browse the repository at this point in the history
ref #6518
  • Loading branch information
SeaRise authored Apr 28, 2023
1 parent 62823df commit 90784b1
Show file tree
Hide file tree
Showing 14 changed files with 158 additions and 76 deletions.
112 changes: 79 additions & 33 deletions dbms/src/Flash/Pipeline/Schedule/Events/Event.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,50 +38,54 @@ extern const char random_pipeline_model_event_finish_failpoint[];
exec_status.onErrorOccurred(std::current_exception()); \
}

void Event::addInput(const EventPtr & input) noexcept
void Event::addInput(const EventPtr & input)
{
RUNTIME_ASSERT(status == EventStatus::INIT);
RUNTIME_ASSERT(input.get() != this);
assertStatus(EventStatus::INIT);
RUNTIME_ASSERT(input.get() != this, log, "Cannot create circular dependency");
input->addOutput(shared_from_this());
++unfinished_inputs;
is_source = false;
}

void Event::addOutput(const EventPtr & output) noexcept
void Event::addOutput(const EventPtr & output)
{
/// Output will also be added in the Finished state, as can be seen in the `insertEvent`.
RUNTIME_ASSERT(status == EventStatus::INIT || status == EventStatus::FINISHED);
RUNTIME_ASSERT(output.get() != this);
assertStatus(EventStatus::INIT, EventStatus::FINISHED);
RUNTIME_ASSERT(output.get() != this, log, "Cannot create circular dependency");
outputs.push_back(output);
}

void Event::insertEvent(const EventPtr & insert_event) noexcept
void Event::insertEvent(const EventPtr & insert_event)
{
RUNTIME_ASSERT(status == EventStatus::FINISHED);
RUNTIME_ASSERT(insert_event);
assertStatus(EventStatus::FINISHED);
RUNTIME_ASSERT(insert_event, log, "The insert event cannot be nullptr");
/// eventA───────►eventB ===> eventA─────────────►eventB
/// │ ▲
/// └────►insert_event───┘
for (const auto & output : outputs)
{
RUNTIME_ASSERT(output);
RUNTIME_ASSERT(output, log, "output event cannot be nullptr");
output->addInput(insert_event);
}
insert_event->addInput(shared_from_this());
RUNTIME_ASSERT(!insert_event->prepare());
RUNTIME_ASSERT(!insert_event->prepare(), log, "The insert event cannot be source event");
}

void Event::onInputFinish() noexcept
void Event::onInputFinish()
{
auto cur_value = unfinished_inputs.fetch_sub(1);
RUNTIME_ASSERT(cur_value >= 1);
if (1 == cur_value)
auto cur_value = unfinished_inputs.fetch_sub(1) - 1;
RUNTIME_ASSERT(
cur_value >= 0,
log,
"unfinished_inputs cannot < 0, but actual value is {}",
cur_value);
if (0 == cur_value)
schedule();
}

bool Event::prepare() noexcept
bool Event::prepare()
{
RUNTIME_ASSERT(status == EventStatus::INIT);
assertStatus(EventStatus::INIT);
if (is_source)
{
// For source event, `exec_status.onEventSchedule()` needs to be called before schedule.
Expand All @@ -99,23 +103,27 @@ bool Event::prepare() noexcept
}
}

void Event::addTask(TaskPtr && task) noexcept
void Event::addTask(TaskPtr && task)
{
RUNTIME_ASSERT(status == EventStatus::SCHEDULED);
assertStatus(EventStatus::SCHEDULED);
++unfinished_tasks;
tasks.push_back(std::move(task));
}

void Event::schedule() noexcept
void Event::schedule()
{
RUNTIME_ASSERT(0 == unfinished_inputs);
RUNTIME_ASSERT(
0 == unfinished_inputs,
log,
"unfinished_inputs must be 0 in `schedule`, but actual value is {}",
unfinished_inputs);
if (is_source)
{
RUNTIME_ASSERT(status == EventStatus::SCHEDULED);
assertStatus(EventStatus::SCHEDULED);
}
else
{
// for is_source == true, `exec_status.onEventSchedule()` has been called in `prepareForSource`.
// for is_source == true, `exec_status.onEventSchedule()` has been called in `prepare`.
switchStatus(EventStatus::INIT, EventStatus::SCHEDULED);
exec_status.onEventSchedule();
}
Expand All @@ -129,10 +137,15 @@ void Event::schedule() noexcept
scheduleTasks();
}

void Event::scheduleTasks() noexcept
void Event::scheduleTasks()
{
RUNTIME_ASSERT(status == EventStatus::SCHEDULED);
RUNTIME_ASSERT(tasks.size() == static_cast<size_t>(unfinished_tasks));
assertStatus(EventStatus::SCHEDULED);
RUNTIME_ASSERT(
tasks.size() == static_cast<size_t>(unfinished_tasks),
log,
"{} does not equal to {}",
tasks.size(),
unfinished_tasks);
if (!tasks.empty())
{
// If query has already been cancelled, we can skip scheduling tasks.
Expand All @@ -151,17 +164,21 @@ void Event::scheduleTasks() noexcept
}
}

void Event::onTaskFinish() noexcept
void Event::onTaskFinish()
{
RUNTIME_ASSERT(status == EventStatus::SCHEDULED);
assertStatus(EventStatus::SCHEDULED);
int32_t remaining_tasks = unfinished_tasks.fetch_sub(1) - 1;
RUNTIME_ASSERT(remaining_tasks >= 0);
RUNTIME_ASSERT(
remaining_tasks >= 0,
log,
"remaining_tasks must >= 0, but actual value is {}",
remaining_tasks);
LOG_DEBUG(log, "one task finished, {} tasks remaining", remaining_tasks);
if (0 == remaining_tasks)
finish();
}

void Event::finish() noexcept
void Event::finish()
{
switchStatus(EventStatus::SCHEDULED, EventStatus::FINISHED);
MemoryTrackerSetter setter{true, mem_tracker.get()};
Expand All @@ -177,7 +194,7 @@ void Event::finish() noexcept
// finished processing the event, now we can schedule output events.
for (auto & output : outputs)
{
RUNTIME_ASSERT(output);
RUNTIME_ASSERT(output, log, "output event cannot be nullptr");
output->onInputFinish();
output.reset();
}
Expand All @@ -192,12 +209,41 @@ void Event::finish() noexcept
exec_status.onEventFinish();
}

void Event::switchStatus(EventStatus from, EventStatus to) noexcept
void Event::switchStatus(EventStatus from, EventStatus to)
{
RUNTIME_ASSERT(status.compare_exchange_strong(from, to));
RUNTIME_ASSERT(
status.compare_exchange_strong(from, to),
log,
"switch from {} to {} fail, because the current status is {}",
magic_enum::enum_name(from),
magic_enum::enum_name(to),
magic_enum::enum_name(status.load()));
LOG_DEBUG(log, "switch status: {} --> {}", magic_enum::enum_name(from), magic_enum::enum_name(to));
}

void Event::assertStatus(EventStatus expect)
{
auto cur_status = status.load();
RUNTIME_ASSERT(
cur_status == expect,
log,
"actual status is {}, but expect status is {}",
magic_enum::enum_name(cur_status),
magic_enum::enum_name(expect));
}

void Event::assertStatus(EventStatus expect1, EventStatus expect2)
{
auto cur_status = status.load();
RUNTIME_ASSERT(
cur_status == expect1 || cur_status == expect2,
log,
"actual status is {}, but expect status are {} and {}",
magic_enum::enum_name(cur_status),
magic_enum::enum_name(expect1),
magic_enum::enum_name(expect2));
}

#undef CATCH

} // namespace DB
28 changes: 15 additions & 13 deletions dbms/src/Flash/Pipeline/Schedule/Events/Event.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,20 +50,18 @@ class Event : public std::enable_shared_from_this<Event>
{}
virtual ~Event() = default;

void addInput(const EventPtr & input) noexcept;
void addInput(const EventPtr & input);

// schedule, onTaskFinish and finish maybe called directly in TaskScheduler,
// so these functions must be noexcept.
void schedule() noexcept;
void schedule();

void onTaskFinish() noexcept;
void onTaskFinish();

// return true for source event.
bool prepare() noexcept;
bool prepare();

protected:
// add task ready to be scheduled.
void addTask(TaskPtr && task) noexcept;
void addTask(TaskPtr && task);

// Generate the tasks ready to be scheduled and use `addTask` to add the tasks.
virtual void scheduleImpl() {}
Expand All @@ -73,18 +71,22 @@ class Event : public std::enable_shared_from_this<Event>
virtual void finishImpl() {}

/// This method can only be called in finishImpl and is used to dynamically adjust the topology of events.
void insertEvent(const EventPtr & insert_event) noexcept;
void insertEvent(const EventPtr & insert_event);

private:
void scheduleTasks() noexcept;
void scheduleTasks();

void finish() noexcept;
void finish();

void addOutput(const EventPtr & output) noexcept;
void addOutput(const EventPtr & output);

void onInputFinish() noexcept;
void onInputFinish();

void switchStatus(EventStatus from, EventStatus to) noexcept;
void switchStatus(EventStatus from, EventStatus to);

void assertStatus(EventStatus expect);

void assertStatus(EventStatus expect1, EventStatus expect2);

protected:
PipelineExecutorStatus & exec_status;
Expand Down
1 change: 1 addition & 0 deletions dbms/src/Flash/Pipeline/Schedule/TaskScheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ void TaskScheduler::submit(std::vector<TaskPtr> & tasks) noexcept
waiting_tasks.push_back(std::move(task));
break;
case FINISH_STATUS:
task->finalize();
task.reset();
break;
default:
Expand Down
1 change: 1 addition & 0 deletions dbms/src/Flash/Pipeline/Schedule/TaskThreadPool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ void TaskThreadPool<Impl>::handleTask(TaskPtr & task, const LoggerPtr & log) noe
scheduler.submitToWaitReactor(std::move(task));
break;
case FINISH_STATUS:
task->finalize();
task.reset();
break;
default:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ AggregateFinalSpillTask::AggregateFinalSpillTask(
assert(agg_context);
}

void AggregateFinalSpillTask::finalizeImpl()
void AggregateFinalSpillTask::doFinalizeImpl()
{
agg_context.reset();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ class AggregateFinalSpillTask : public IOEventTask
protected:
ExecTaskStatus doExecuteIOImpl() override;

void finalizeImpl() override;
void doFinalizeImpl() override;

private:
AggregateContextPtr agg_context;
Expand Down
20 changes: 4 additions & 16 deletions dbms/src/Flash/Pipeline/Schedule/Tasks/EventTask.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,13 +50,11 @@ EventTask::~EventTask()
event.reset();
}

void EventTask::finalize() noexcept
void EventTask::finalizeImpl() noexcept
{
try
{
RUNTIME_CHECK(!finalized);
finalized = true;
finalizeImpl();
doFinalizeImpl();
}
catch (...)
{
Expand All @@ -82,26 +80,16 @@ ExecTaskStatus EventTask::awaitImpl() noexcept
ExecTaskStatus EventTask::doTaskAction(std::function<ExecTaskStatus()> && action)
{
if (unlikely(exec_status.isCancelled()))
{
finalize();
return ExecTaskStatus::CANCELLED;
}

try
{
auto status = action();
FAIL_POINT_TRIGGER_EXCEPTION(FailPoints::random_pipeline_model_task_run_failpoint);
switch (status)
{
case FINISH_STATUS:
finalize();
default:
return status;
}
return status;
}
catch (...)
{
finalize();
assert(event);
LOG_WARNING(log, "error occurred and cancel the query");
exec_status.onErrorOccurred(std::current_exception());
return ExecTaskStatus::ERROR;
Expand Down
6 changes: 2 additions & 4 deletions dbms/src/Flash/Pipeline/Schedule/Tasks/EventTask.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,17 +46,15 @@ class EventTask : public Task
ExecTaskStatus awaitImpl() noexcept override;
virtual ExecTaskStatus doAwaitImpl() { return ExecTaskStatus::RUNNING; };

// Used to release held resources, just like `Event::finishImpl`.
void finalize() noexcept;
virtual void finalizeImpl(){};
void finalizeImpl() noexcept override;
virtual void doFinalizeImpl(){};

private:
ExecTaskStatus doTaskAction(std::function<ExecTaskStatus()> && action);

private:
PipelineExecutorStatus & exec_status;
EventPtr event;
bool finalized = false;
};

} // namespace DB
2 changes: 1 addition & 1 deletion dbms/src/Flash/Pipeline/Schedule/Tasks/PipelineTask.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ PipelineTask::PipelineTask(
pipeline_exec->executePrefix();
}

void PipelineTask::finalizeImpl()
void PipelineTask::doFinalizeImpl()
{
RUNTIME_CHECK(pipeline_exec);
pipeline_exec->executeSuffix();
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Flash/Pipeline/Schedule/Tasks/PipelineTask.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ class PipelineTask : public EventTask

ExecTaskStatus doAwaitImpl() override;

void finalizeImpl() override;
void doFinalizeImpl() override;

private:
PipelineExecPtr pipeline_exec;
Expand Down
Loading

0 comments on commit 90784b1

Please sign in to comment.