
[fix](pipeline) Make all upstream tasks runnable if all tasks finishe… #41292

Merged: 13 commits, Oct 8, 2024
10 changes: 7 additions & 3 deletions be/src/pipeline/exec/hashjoin_build_sink.cpp
@@ -111,15 +111,18 @@ Status HashJoinBuildSinkLocalState::open(RuntimeState* state) {
 }

 Status HashJoinBuildSinkLocalState::close(RuntimeState* state, Status exec_status) {
+    if (_closed) {
+        return Status::OK();
+    }
     auto p = _parent->cast<HashJoinBuildSinkOperatorX>();
     Defer defer {[&]() {
         if (_should_build_hash_table && p._shared_hashtable_controller) {
             p._shared_hashtable_controller->signal_finish(p.node_id());
         }
     }};

-    if (!_runtime_filter_slots || _runtime_filters.empty() || state->is_cancelled()) {
-        return Status::OK();
+    if (!_runtime_filter_slots || _runtime_filters.empty() || state->is_cancelled() || !_eos) {
Contributor commented:
Writing it this way may be problematic. Take join build and join probe: suppose there is an optimization where the join probe reads one block first and, if that block comes back empty, gives up on the join build entirely. How should the join build's close() be handled in that case?

I think we have to rely on close()'s status to handle some of this, or else have the pipeline task record the downstream eos and pass it into the pipeline task's close() function, possibly via exec_status.
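A minimal sketch of that second alternative, with every name (Task, Sink, downstream_eos) hypothetical rather than an actual Doris API: the task records that downstream finished early and forwards the flag into the sink's close().

#include <iostream>

// Hypothetical sink: close() is told whether downstream already hit eos and
// decides whether publishing runtime filters still makes sense.
struct Sink {
    void close(bool downstream_eos) {
        if (downstream_eos) {
            std::cout << "downstream gave up early: skip publishing, just release\n";
        } else {
            std::cout << "publish runtime filters, then release\n";
        }
    }
};

// Hypothetical task: remembers the downstream eos and hands it to close().
struct Task {
    Sink sink;
    bool downstream_eos = false; // set when the probe side finishes early
    void close() { sink.close(downstream_eos); }
};

int main() {
    Task t;
    t.downstream_eos = true; // e.g. the probe read an empty first block
    t.close();
}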

+        return Base::close(state, exec_status);
     }
     auto* block = _shared_state->build_block.get();
     uint64_t hash_table_size = block ? block->rows() : 0;
@@ -137,7 +140,7 @@ Status HashJoinBuildSinkLocalState::close(RuntimeState* state, Status exec_statu

     SCOPED_TIMER(_publish_runtime_filter_timer);
     RETURN_IF_ERROR(_runtime_filter_slots->publish(!_should_build_hash_table));
Contributor commented:
I have a question: why do we publish the RF in close() rather than checking eos in sink() and publishing there?
It feels like close() should only release resources and not do anything else.
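A minimal sketch of this suggestion, with hypothetical names rather than the real operator interface: publish the moment sink() observes eos, and leave close() as pure cleanup.

#include <iostream>

// Hypothetical build sink that publishes runtime filters as soon as sink()
// sees the last block, so close() only releases resources.
struct BuildSink {
    bool published = false;

    void sink(bool eos) {
        if (eos && !published) {
            std::cout << "eos reached in sink(): publish runtime filters here\n";
            published = true;
        }
    }
    void close() {
        std::cout << "close(): release the hash table and other resources only\n";
    }
};

int main() {
    BuildSink s;
    s.sink(/*eos=*/false); // more input coming, nothing to publish yet
    s.sink(/*eos=*/true);  // last block: publish now
    s.close();             // pure cleanup
}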

-    return Status::OK();
+    return Base::close(state, exec_status);
 }

 bool HashJoinBuildSinkLocalState::build_unique() const {
@@ -504,6 +507,7 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState* state, vectorized::Block*
     SCOPED_TIMER(local_state.exec_time_counter());
     COUNTER_UPDATE(local_state.rows_input_counter(), (int64_t)in_block->rows());

+    local_state._eos = eos;
     if (local_state._should_build_hash_table) {
         // If eos or have already met a null value using short-circuit strategy, we do not need to pull
         // data from probe side.
1 change: 1 addition & 0 deletions be/src/pipeline/exec/operator.h
@@ -356,6 +356,7 @@ class PipelineXSinkLocalStateBase {
     // Set to true after close() has been called. subclasses should check and set this in
     // close().
     bool _closed = false;
+    std::atomic<bool> _eos = false;
     //NOTICE: now add a faker profile, because sometimes the profile record is useless
     //so we want remove some counters and timers, eg: in join node, if it's broadcast_join
     //and shared hash table, some counter/timer about build hash table is useless,
@@ -259,7 +259,6 @@ class PartitionedAggSinkLocalState

     std::unique_ptr<RuntimeState> _runtime_state;

-    bool _eos = false;
     std::shared_ptr<Dependency> _finish_dependency;

     // temp structures during spilling
1 change: 0 additions & 1 deletion be/src/pipeline/exec/spill_sort_sink_operator.h
@@ -54,7 +54,6 @@ class SpillSortSinkLocalState : public PipelineXSpillSinkLocalState<SpillSortSha

     RuntimeProfile::Counter* _spill_merge_sort_timer = nullptr;

-    bool _eos = false;
     vectorized::SpillStreamSPtr _spilling_stream;
     std::shared_ptr<Dependency> _finish_dependency;
 };
13 changes: 8 additions & 5 deletions be/src/pipeline/local_exchange/local_exchange_sink_operator.cpp
@@ -110,11 +110,14 @@ Status LocalExchangeSinkLocalState::close(RuntimeState* state, Status exec_statu
     }
     RETURN_IF_ERROR(Base::close(state, exec_status));
     if (exec_status.ok()) {
-        DCHECK(_release_count) << "Do not finish correctly! " << debug_string(0)
-                               << " state: { cancel = " << state->is_cancelled() << ", "
-                               << state->cancel_reason().to_string() << "} query ctx: { cancel = "
-                               << state->get_query_ctx()->is_cancelled() << ", "
-                               << state->get_query_ctx()->exec_status().to_string() << "}";
+        DCHECK(_release_count || _exchanger == nullptr ||
+               _exchanger->_running_source_operators == 0)
+                << "Do not finish correctly! " << debug_string(0)
+                << " state: { cancel = " << state->is_cancelled() << ", "
+                << state->cancel_reason().to_string()
+                << "} query ctx: { cancel = " << state->get_query_ctx()->is_cancelled() << ", "
+                << state->get_query_ctx()->exec_status().to_string()
+                << "} Exchanger: " << (void*)_exchanger;
     }
     return Status::OK();
 }
11 changes: 10 additions & 1 deletion be/src/pipeline/pipeline.cpp
@@ -22,6 +22,7 @@
 #include <utility>

 #include "pipeline/exec/operator.h"
+#include "pipeline/pipeline_task.h"

 namespace doris::pipeline {

@@ -65,4 +66,12 @@ Status Pipeline::set_sink(DataSinkOperatorPtr& sink) {
     return Status::OK();
 }

-} // namespace doris::pipeline
+void Pipeline::make_all_runnable() {
+    for (auto* task : _tasks) {
+        if (task) {
+            task->clear_blocking_state(true);
+        }
+    }
+}
+
+} // namespace doris::pipeline
17 changes: 16 additions & 1 deletion be/src/pipeline/pipeline.h
@@ -47,6 +47,7 @@ class Pipeline : public std::enable_shared_from_this<Pipeline> {
                  std::weak_ptr<PipelineFragmentContext> context)
             : _pipeline_id(pipeline_id), _num_tasks(num_tasks) {
         _init_profile();
+        _tasks.resize(_num_tasks, nullptr);
     }

     // Add operators for pipelineX
@@ -104,14 +105,24 @@ class Pipeline : public std::enable_shared_from_this<Pipeline> {
     void set_children(std::shared_ptr<Pipeline> child) { _children.push_back(child); }
     void set_children(std::vector<std::shared_ptr<Pipeline>> children) { _children = children; }

-    void incr_created_tasks() { _num_tasks_created++; }
+    void incr_created_tasks(int i, PipelineTask* task) {
+        _num_tasks_created++;
+        _num_tasks_running++;
+        DCHECK_LT(i, _tasks.size());
+        _tasks[i] = task;
+    }
+
+    void make_all_runnable();
+
     void set_num_tasks(int num_tasks) {
         _num_tasks = num_tasks;
+        _tasks.resize(_num_tasks, nullptr);
         for (auto& op : _operators) {
             op->set_parallel_tasks(_num_tasks);
         }
     }
     int num_tasks() const { return _num_tasks; }
+    bool close_task() { return _num_tasks_running.fetch_sub(1) == 1; }

     std::string debug_string() {
         fmt::memory_buffer debug_string_buffer;
@@ -158,6 +169,10 @@ class Pipeline : public std::enable_shared_from_this<Pipeline> {
     int _num_tasks = 1;
     // How many tasks are already created?
     std::atomic<int> _num_tasks_created = 0;
+    // How many tasks are already created and not finished?
+    std::atomic<int> _num_tasks_running = 0;
+    // Tasks in this pipeline.
+    std::vector<PipelineTask*> _tasks;
 };

 } // namespace doris::pipeline
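Why close_task() compares the result of fetch_sub against 1: fetch_sub returns the counter's value from before the decrement, so it equals 1 exactly once, for the last task to close. A standalone sketch of the idiom (nothing Doris-specific):

#include <atomic>
#include <iostream>

int main() {
    std::atomic<int> num_tasks_running{3};
    for (int task = 0; task < 3; ++task) {
        // fetch_sub returns the value held *before* the decrement.
        bool last_to_close = (num_tasks_running.fetch_sub(1) == 1);
        std::cout << "task " << task
                  << (last_to_close ? ": last one out, wake upstream\n" : ": closed\n");
    }
}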
16 changes: 14 additions & 2 deletions be/src/pipeline/pipeline_fragment_context.cpp
@@ -143,6 +143,8 @@ PipelineFragmentContext::~PipelineFragmentContext() {
             runtime_state.reset();
         }
     }
+    _dag.clear();
+    _pip_id_to_pipeline.clear();
     _pipelines.clear();
     _sink.reset();
     _root_op.reset();
@@ -362,6 +364,7 @@ Status PipelineFragmentContext::_build_pipeline_tasks(const doris::TPipelineFrag
     _task_runtime_states.resize(_pipelines.size());
     for (size_t pip_idx = 0; pip_idx < _pipelines.size(); pip_idx++) {
         _task_runtime_states[pip_idx].resize(_pipelines[pip_idx]->num_tasks());
+        _pip_id_to_pipeline[_pipelines[pip_idx]->id()] = _pipelines[pip_idx].get();
     }
     auto pipeline_id_to_profile = _runtime_state->build_pipeline_profile(_pipelines.size());

@@ -466,6 +469,7 @@ Status PipelineFragmentContext::_build_pipeline_tasks(const doris::TPipelineFrag
                                                        task_runtime_state.get(), this,
                                                        pipeline_id_to_profile[pip_idx].get(),
                                                        get_local_exchange_state(pipeline), i);
+            pipeline->incr_created_tasks(i, task.get());
             task_runtime_state->set_task(task.get());
             pipeline_id_to_task.insert({pipeline->id(), task.get()});
             _tasks[i].emplace_back(std::move(task));
@@ -559,7 +563,6 @@ Status PipelineFragmentContext::_build_pipeline_tasks(const doris::TPipelineFrag
         }
     }
     _pipeline_parent_map.clear();
-    _dag.clear();
     _op_id_to_le_state.clear();

     return Status::OK();
@@ -1748,7 +1751,16 @@ void PipelineFragmentContext::_close_fragment_instance() {
             std::dynamic_pointer_cast<PipelineFragmentContext>(shared_from_this()));
 }

-void PipelineFragmentContext::close_a_pipeline() {
+void PipelineFragmentContext::close_a_pipeline(PipelineId pipeline_id) {
+    // If all tasks of this pipeline have been closed, upstream tasks are no longer needed, so we just make them runnable here.
+    DCHECK(_pip_id_to_pipeline.contains(pipeline_id));
+    if (_pip_id_to_pipeline[pipeline_id]->close_task()) {
+        if (_dag.contains(pipeline_id)) {
+            for (auto dep : _dag[pipeline_id]) {
+                _pip_id_to_pipeline[dep]->make_all_runnable();
+            }
+        }
+    }
     std::lock_guard<std::mutex> l(_task_mutex);
     ++_closed_tasks;
     if (_closed_tasks == _total_tasks) {
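A toy model of the shutdown cascade above, assuming (as the diff suggests) that _dag maps a pipeline id to the ids of its upstream pipelines; all names here are simplified stand-ins. When the last running task of a pipeline closes, every task of each upstream pipeline is woken so it can observe eos and exit rather than block forever on a consumer that will never read again.

#include <iostream>
#include <map>
#include <vector>

// Toy pipeline: tracks running tasks; close_task() reports when the last one closes.
struct ToyPipeline {
    int id = 0;
    int running = 0;
    bool close_task() { return --running == 0; }
    void make_all_runnable() { std::cout << "wake all tasks of pipeline " << id << "\n"; }
};

int main() {
    std::map<int, ToyPipeline> pipelines{{0, {0, 2}}, {1, {1, 1}}};
    // dag[downstream id] = upstream pipeline ids, mirroring _dag above.
    std::map<int, std::vector<int>> dag{{1, {0}}};

    auto close_a_pipeline = [&](int pid) {
        if (pipelines[pid].close_task() && dag.contains(pid)) {
            for (int dep : dag[pid]) {
                pipelines[dep].make_all_runnable();
            }
        }
    };

    close_a_pipeline(1); // last task of pipeline 1 closes and wakes pipeline 0
}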
3 changes: 2 additions & 1 deletion be/src/pipeline/pipeline_fragment_context.h
@@ -100,7 +100,7 @@ class PipelineFragmentContext : public TaskExecutionContext {

     [[nodiscard]] int get_fragment_id() const { return _fragment_id; }

-    void close_a_pipeline();
+    void close_a_pipeline(PipelineId pipeline_id);

     Status send_report(bool);

@@ -291,6 +291,7 @@ class PipelineFragmentContext : public TaskExecutionContext {
     std::map<int, std::pair<std::shared_ptr<LocalExchangeSharedState>, std::shared_ptr<Dependency>>>
         _op_id_to_le_state;

+    std::map<PipelineId, Pipeline*> _pip_id_to_pipeline;
     // UniqueId -> runtime mgr
     std::map<UniqueId, std::unique_ptr<RuntimeFilterMgr>> _runtime_filter_mgr_map;
3 changes: 1 addition & 2 deletions be/src/pipeline/pipeline_task.cpp
@@ -71,7 +71,6 @@ PipelineTask::PipelineTask(
     if (shared_state) {
         _sink_shared_state = shared_state;
     }
-    pipeline->incr_created_tasks();
 }

 Status PipelineTask::prepare(const TPipelineInstanceParams& local_params, const TDataSink& tsink,
@@ -279,7 +278,7 @@ Status PipelineTask::execute(bool* eos) {
     SCOPED_TIMER(_task_profile->total_time_counter());
     SCOPED_TIMER(_exec_timer);
     SCOPED_ATTACH_TASK(_state);
-    _eos = _sink->is_finished(_state) || _eos;
+    _eos = _sink->is_finished(_state) || _eos || _wake_up_by_downstream;
     *eos = _eos;
     if (_eos) {
         // If task is waken up by finish dependency, `_eos` is set to true by last execution, and we should return here.
12 changes: 8 additions & 4 deletions be/src/pipeline/pipeline_task.h
@@ -135,7 +135,8 @@ class PipelineTask {
     int task_id() const { return _index; };
     bool is_finalized() const { return _finalized; }

-    void clear_blocking_state() {
+    void clear_blocking_state(bool wake_up_by_downstream = false) {
+        _wake_up_by_downstream = _wake_up_by_downstream || wake_up_by_downstream;
         _state->get_query_ctx()->get_execution_dependency()->set_always_ready();
         // We use a lock to assure all dependencies are not deconstructed here.
         std::unique_lock<std::mutex> lc(_dependency_lock);
@@ -231,6 +232,8 @@ class PipelineTask {
         }
     }

+    PipelineId pipeline_id() const { return _pipeline->id(); }
+
 private:
     friend class RuntimeFilterDependency;
     bool _is_blocked();
@@ -306,11 +309,12 @@ class PipelineTask {

     Dependency* _execution_dep = nullptr;

-    std::atomic<bool> _finalized {false};
+    std::atomic<bool> _finalized = false;
     std::mutex _dependency_lock;

-    std::atomic<bool> _running {false};
-    std::atomic<bool> _eos {false};
+    std::atomic<bool> _running = false;
+    std::atomic<bool> _eos = false;
+    std::atomic<bool> _wake_up_by_downstream = false;
Contributor commented:
Add a comment to this field; maybe rename it to _downstream_eos.

 };

 } // namespace doris::pipeline
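A toy model of the full wake-up path, with simplified stand-ins for the real classes: clear_blocking_state(true) records the downstream wake-up, and the next execute() then reports eos so the task can be closed.

#include <atomic>
#include <iostream>

// Toy task: a wake-up from downstream marks the task so that the next
// execute() treats it as finished instead of waiting on its dependencies.
struct ToyTask {
    std::atomic<bool> eos{false};
    std::atomic<bool> wake_up_by_downstream{false};

    void clear_blocking_state(bool by_downstream) {
        if (by_downstream) {
            wake_up_by_downstream = true;
        }
        // The real method also marks every dependency as always-ready here.
    }

    bool execute() {
        eos = eos || wake_up_by_downstream; // mirrors the change in execute() above
        return eos;
    }
};

int main() {
    ToyTask t;
    std::cout << t.execute() << "\n"; // 0: upstream work remains
    t.clear_blocking_state(true);     // downstream finished first
    std::cout << t.execute() << "\n"; // 1: task reports eos and can close
}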
2 changes: 1 addition & 1 deletion be/src/pipeline/task_scheduler.cpp
@@ -94,7 +94,7 @@ void _close_task(PipelineTask* task, Status exec_status) {
     }
     task->finalize();
     task->set_running(false);
-    task->fragment_context()->close_a_pipeline();
+    task->fragment_context()->close_a_pipeline(task->pipeline_id());
 }

 void TaskScheduler::_do_work(size_t index) {
1 change: 0 additions & 1 deletion be/src/runtime/runtime_filter_mgr.cpp
@@ -150,7 +150,6 @@ Status RuntimeFilterMgr::get_local_merge_producer_filters(
     }
     *local_merge_filters = &iter->second;
     DCHECK(!iter->second.filters.empty());
-    DCHECK_GT(iter->second.merge_time, 0);
     return Status::OK();
 }