[fix](pipeline) Make all upstream tasks runnable if all tasks finishe… (apache#41292)

Consider 3 pipelines in this fragment (... -> join -> shuffle):
pipeline 0: `... -> local exchange sink`
pipeline 1: `... -> join build (INNER JOIN)`
pipeline 2: `local exchange source -> join probe (INNER JOIN) -> data stream sender`

Assume the join build returns 0 rows. The join probe can then finish as soon as the join build finishes and does not need to wait for the `local exchange sink` to finish. In this case, if pipeline 0 is blocked by a dependency for a long time, pipeline 2 should notify pipeline 0 to finish.
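The fix routes that notification through the pipeline DAG: when the last running task of a pipeline closes, the fragment context wakes every task of the pipelines it depends on so they can finish without producing further data. Below is a minimal, self-contained sketch of that wake-up path; the `ToyTask`/`ToyPipeline` types and the simplified flag handling are illustrative assumptions, not the actual Doris classes shown in the diff.

```cpp
#include <atomic>
#include <iostream>
#include <map>
#include <vector>

// Toy model of the wake-up path added by this commit (simplified; not the
// actual Doris implementation).
struct ToyTask {
    bool blocked = true;
    // Mirrors PipelineTask::clear_blocking_state(wake_up_by_downstream):
    // a woken task stops waiting and will report EOS on its next run.
    void clear_blocking_state(bool wake_up_by_downstream) {
        if (wake_up_by_downstream) {
            blocked = false;
        }
    }
};

struct ToyPipeline {
    std::vector<ToyTask*> tasks;
    std::atomic<int> num_tasks_running{0};

    // True only for the last task of this pipeline to close (cf. Pipeline::close_task()).
    bool close_task() { return num_tasks_running.fetch_sub(1) == 1; }

    // Cf. Pipeline::make_all_runnable(): wake every task of this pipeline.
    void make_all_runnable() {
        for (auto* task : tasks) {
            if (task) {
                task->clear_blocking_state(/*wake_up_by_downstream=*/true);
            }
        }
    }
};

int main() {
    ToyTask p0_task;  // pipeline 0: local exchange sink, blocked on a dependency
    ToyTask p2_task;  // pipeline 2: join probe, finishes early (empty build side)

    ToyPipeline p0;
    p0.tasks = {&p0_task};
    p0.num_tasks_running = 1;

    ToyPipeline p2;
    p2.tasks = {&p2_task};
    p2.num_tasks_running = 1;

    // DAG: finished pipeline id -> upstream pipelines whose output is no longer needed.
    std::map<int, std::vector<ToyPipeline*>> dag = {{2, {&p0}}};

    // Closing the last task of pipeline 2 makes every task of pipeline 0 runnable,
    // analogous to the close_a_pipeline() override added in this commit.
    constexpr int finished_pipeline_id = 2;
    if (p2.close_task() && dag.count(finished_pipeline_id) > 0) {
        for (auto* upstream : dag[finished_pipeline_id]) {
            upstream->make_all_runnable();
        }
    }
    std::cout << "pipeline 0 task still blocked: " << p0_task.blocked << std::endl;  // prints 0
}
```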
Gabriel39 committed Oct 16, 2024
1 parent b185dfc commit c2436f8
Showing 15 changed files with 67 additions and 17 deletions.
10 changes: 7 additions & 3 deletions be/src/pipeline/exec/hashjoin_build_sink.cpp
@@ -122,15 +122,18 @@ Status HashJoinBuildSinkLocalState::open(RuntimeState* state) {
}

Status HashJoinBuildSinkLocalState::close(RuntimeState* state, Status exec_status) {
if (_closed) {
return Status::OK();
}
auto p = _parent->cast<HashJoinBuildSinkOperatorX>();
Defer defer {[&]() {
if (_should_build_hash_table && p._shared_hashtable_controller) {
p._shared_hashtable_controller->signal_finish(p.node_id());
}
}};

if (!_runtime_filter_slots || _runtime_filters.empty() || state->is_cancelled()) {
return Status::OK();
if (!_runtime_filter_slots || _runtime_filters.empty() || state->is_cancelled() || !_eos) {
return Base::close(state, exec_status);
}
auto* block = _shared_state->build_block.get();
uint64_t hash_table_size = block ? block->rows() : 0;
@@ -148,7 +151,7 @@ Status HashJoinBuildSinkLocalState::close(RuntimeState* state, Status exec_statu

SCOPED_TIMER(_publish_runtime_filter_timer);
RETURN_IF_ERROR(_runtime_filter_slots->publish(!_should_build_hash_table));
return Status::OK();
return Base::close(state, exec_status);
}

bool HashJoinBuildSinkLocalState::build_unique() const {
@@ -519,6 +522,7 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState* state, vectorized::Block*
SCOPED_TIMER(local_state.exec_time_counter());
COUNTER_UPDATE(local_state.rows_input_counter(), (int64_t)in_block->rows());

local_state._eos = eos;
if (local_state._should_build_hash_table) {
// If eos or have already met a null value using short-circuit strategy, we do not need to pull
// data from probe side.
@@ -258,7 +258,6 @@ class PartitionedAggSinkLocalState

std::unique_ptr<RuntimeState> _runtime_state;

bool _eos = false;
std::shared_ptr<Dependency> _finish_dependency;

// temp structures during spilling
1 change: 0 additions & 1 deletion be/src/pipeline/exec/spill_sort_sink_operator.h
@@ -55,7 +55,6 @@ class SpillSortSinkLocalState : public PipelineXSpillSinkLocalState<SpillSortSha

RuntimeProfile::Counter* _spill_merge_sort_timer = nullptr;

bool _eos = false;
vectorized::SpillStreamSPtr _spilling_stream;
std::shared_ptr<Dependency> _finish_dependency;
};
11 changes: 10 additions & 1 deletion be/src/pipeline/pipeline.cpp
@@ -22,6 +22,7 @@
#include <utility>

#include "pipeline/exec/operator.h"
#include "pipeline/pipeline_task.h"

namespace doris::pipeline {

@@ -99,4 +100,12 @@ Status Pipeline::set_sink(DataSinkOperatorXPtr& sink) {
return Status::OK();
}

} // namespace doris::pipeline
void Pipeline::make_all_runnable() {
for (auto* task : _tasks) {
if (task) {
task->clear_blocking_state(true);
}
}
}

} // namespace doris::pipeline
17 changes: 16 additions & 1 deletion be/src/pipeline/pipeline.h
@@ -50,6 +50,7 @@ class Pipeline : public std::enable_shared_from_this<Pipeline> {
std::weak_ptr<PipelineFragmentContext> context)
: _pipeline_id(pipeline_id), _context(std::move(context)), _num_tasks(num_tasks) {
_init_profile();
_tasks.resize(_num_tasks, nullptr);
}

void add_dependency(std::shared_ptr<Pipeline>& pipeline) {
@@ -155,14 +156,24 @@ class Pipeline : public std::enable_shared_from_this<Pipeline> {
void set_children(std::shared_ptr<Pipeline> child) { _children.push_back(child); }
void set_children(std::vector<std::shared_ptr<Pipeline>> children) { _children = children; }

void incr_created_tasks() { _num_tasks_created++; }
void incr_created_tasks(int i, PipelineTask* task) {
_num_tasks_created++;
_num_tasks_running++;
DCHECK_LT(i, _tasks.size());
_tasks[i] = task;
}

void make_all_runnable();

void set_num_tasks(int num_tasks) {
_num_tasks = num_tasks;
_tasks.resize(_num_tasks, nullptr);
for (auto& op : operatorXs) {
op->set_parallel_tasks(_num_tasks);
}
}
int num_tasks() const { return _num_tasks; }
bool close_task() { return _num_tasks_running.fetch_sub(1) == 1; }

std::string debug_string() {
fmt::memory_buffer debug_string_buffer;
@@ -243,6 +254,10 @@ class Pipeline : public std::enable_shared_from_this<Pipeline> {
int _num_tasks = 1;
// How many tasks are already created?
std::atomic<int> _num_tasks_created = 0;
// How many tasks are already created and not finished?
std::atomic<int> _num_tasks_running = 0;
// Tasks in this pipeline.
std::vector<PipelineTask*> _tasks;
};

} // namespace doris::pipeline
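A note on the new `close_task()` / `_num_tasks_running` pair above: `fetch_sub(1)` returns the value held before the decrement, so exactly one caller, the last task to close, sees `1` and gets to trigger the upstream wake-up. A tiny standalone illustration of that last-one-out pattern (illustrative only, not Doris code):

```cpp
#include <atomic>
#include <cassert>

int main() {
    std::atomic<int> num_tasks_running{3};

    // Each closing task decrements the counter; fetch_sub returns the previous
    // value, so only the final close observes 1.
    bool first_close_was_last = num_tasks_running.fetch_sub(1) == 1;   // 3 -> 2, false
    bool second_close_was_last = num_tasks_running.fetch_sub(1) == 1;  // 2 -> 1, false
    bool third_close_was_last = num_tasks_running.fetch_sub(1) == 1;   // 1 -> 0, true

    assert(!first_close_was_last && !second_close_was_last && third_close_was_last);
    return 0;
}
```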
4 changes: 2 additions & 2 deletions be/src/pipeline/pipeline_fragment_context.cpp
@@ -813,7 +813,7 @@ void PipelineFragmentContext::close_if_prepare_failed(Status /*st*/) {
DCHECK(!task->is_pending_finish());
WARN_IF_ERROR(task->close(Status::OK()),
fmt::format("Query {} closed since prepare failed", print_id(_query_id)));
close_a_pipeline();
close_a_pipeline(task->pipeline_id());
}
}

@@ -960,7 +960,7 @@ void PipelineFragmentContext::_close_fragment_instance() {
std::dynamic_pointer_cast<PipelineFragmentContext>(shared_from_this()));
}

void PipelineFragmentContext::close_a_pipeline() {
void PipelineFragmentContext::close_a_pipeline(PipelineId pipeline_id) {
std::lock_guard<std::mutex> l(_task_mutex);
g_pipeline_tasks_count << -1;
++_closed_tasks;
2 changes: 1 addition & 1 deletion be/src/pipeline/pipeline_fragment_context.h
@@ -110,7 +110,7 @@ class PipelineFragmentContext : public TaskExecutionContext {

[[nodiscard]] int get_fragment_id() const { return _fragment_id; }

void close_a_pipeline();
virtual void close_a_pipeline(PipelineId pipeline_id);

virtual void clear_finished_tasks() {}

4 changes: 4 additions & 0 deletions be/src/pipeline/pipeline_task.h
@@ -291,6 +291,10 @@ class PipelineTask {

std::string task_name() const { return fmt::format("task{}({})", _index, _pipeline->_name); }

PipelineId pipeline_id() const { return _pipeline->id(); }

virtual void clear_blocking_state(bool wake_up_by_downstream = false) {}

protected:
void _finish_p_dependency() {
for (const auto& p : _pipeline->_parents) {
1 change: 1 addition & 0 deletions be/src/pipeline/pipeline_x/operator.h
@@ -534,6 +534,7 @@ class PipelineXSinkLocalStateBase {
// Set to true after close() has been called. subclasses should check and set this in
// close().
bool _closed = false;
std::atomic<bool> _eos = false;
//NOTICE: now add a faker profile, because sometimes the profile record is useless
//so we want remove some counters and timers, eg: in join node, if it's broadcast_join
//and shared hash table, some counter/timer about build hash table is useless,
20 changes: 18 additions & 2 deletions be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
@@ -127,6 +127,8 @@ PipelineXFragmentContext::~PipelineXFragmentContext() {
} else {
_call_back(nullptr, &st);
}
_dag.clear();
_pip_id_to_pipeline.clear();
_runtime_state.reset();
_runtime_filter_states.clear();
_runtime_filter_mgr_map.clear();
@@ -525,6 +527,7 @@ Status PipelineXFragmentContext::_build_pipeline_x_tasks(
_task_runtime_states.resize(_pipelines.size());
for (size_t pip_idx = 0; pip_idx < _pipelines.size(); pip_idx++) {
_task_runtime_states[pip_idx].resize(_pipelines[pip_idx]->num_tasks());
_pip_id_to_pipeline[_pipelines[pip_idx]->id()] = _pipelines[pip_idx].get();
}
auto& pipeline_id_to_profile = _runtime_state->pipeline_id_to_profile();
DCHECK(pipeline_id_to_profile.empty());
@@ -638,6 +641,7 @@ Status PipelineXFragmentContext::_build_pipeline_x_tasks(
task_runtime_state.get(), ctx,
pipeline_id_to_profile[pip_idx].get(),
get_local_exchange_state(pipeline), i);
pipeline->incr_created_tasks(i, task.get());
pipeline_id_to_task.insert({pipeline->id(), task.get()});
_tasks[i].emplace_back(std::move(task));
}
@@ -737,7 +741,6 @@ Status PipelineXFragmentContext::_build_pipeline_x_tasks(
}
}
_pipeline_parent_map.clear();
_dag.clear();
_op_id_to_le_state.clear();

return Status::OK();
@@ -1498,7 +1501,7 @@ void PipelineXFragmentContext::close_if_prepare_failed(Status st) {
for (auto& t : task) {
DCHECK(!t->is_pending_finish());
WARN_IF_ERROR(t->close(Status::OK()), "close_if_prepare_failed failed: ");
close_a_pipeline();
close_a_pipeline(t->pipeline_id());
}
}
_query_ctx->cancel(st.to_string(), st, _fragment_id);
@@ -1587,4 +1590,17 @@ std::string PipelineXFragmentContext::debug_string() {

return fmt::to_string(debug_string_buffer);
}

void PipelineXFragmentContext::close_a_pipeline(PipelineId pipeline_id) {
// If all tasks of this pipeline have been closed, its upstream tasks are no longer needed, so just make them all runnable here
DCHECK(_pip_id_to_pipeline.contains(pipeline_id));
if (_pip_id_to_pipeline[pipeline_id]->close_task()) {
if (_dag.contains(pipeline_id)) {
for (auto dep : _dag[pipeline_id]) {
_pip_id_to_pipeline[dep]->make_all_runnable();
}
}
}
PipelineFragmentContext::close_a_pipeline(pipeline_id);
}
} // namespace doris::pipeline
3 changes: 3 additions & 0 deletions be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h
@@ -116,6 +116,8 @@ class PipelineXFragmentContext : public PipelineFragmentContext {

std::string debug_string() override;

void close_a_pipeline(PipelineId pipeline_id) override;

private:
void _close_fragment_instance() override;
Status _build_pipeline_x_tasks(const doris::TPipelineFragmentParams& request,
@@ -222,6 +224,7 @@ class PipelineXFragmentContext : public PipelineFragmentContext {
std::map<int, std::pair<std::shared_ptr<LocalExchangeSharedState>, std::shared_ptr<Dependency>>>
_op_id_to_le_state;

std::map<PipelineId, Pipeline*> _pip_id_to_pipeline;
// UniqueId -> runtime mgr
std::map<UniqueId, std::unique_ptr<RuntimeFilterMgr>> _runtime_filter_mgr_map;

3 changes: 1 addition & 2 deletions be/src/pipeline/pipeline_x/pipeline_x_task.cpp
@@ -66,7 +66,6 @@ PipelineXTask::PipelineXTask(
if (shared_state) {
_sink_shared_state = shared_state;
}
pipeline->incr_created_tasks();
}

Status PipelineXTask::prepare(const TPipelineInstanceParams& local_params, const TDataSink& tsink,
@@ -248,7 +247,7 @@ Status PipelineXTask::execute(bool* eos) {
cpu_qs->add_cpu_nanos(delta_cpu_time);
}
}};
*eos = _sink->is_finished(_state);
*eos = _sink->is_finished(_state) || _wake_up_by_downstream || is_final_state(_cur_state);
if (*eos) {
return Status::OK();
}
4 changes: 3 additions & 1 deletion be/src/pipeline/pipeline_x/pipeline_x_task.h
@@ -138,7 +138,8 @@ class PipelineXTask : public PipelineTask {

int task_id() const { return _index; };

void clear_blocking_state() {
void clear_blocking_state(bool wake_up_by_downstream = false) override {
_wake_up_by_downstream = _wake_up_by_downstream || wake_up_by_downstream;
_state->get_query_ctx()->get_execution_dependency()->set_always_ready();
// We use a lock to assure all dependencies are not deconstructed here.
std::unique_lock<std::mutex> lc(_dependency_lock);
@@ -252,6 +253,7 @@ class PipelineXTask : public PipelineTask {

std::atomic<bool> _finished {false};
std::mutex _dependency_lock;
std::atomic<bool> _wake_up_by_downstream = false;
};

} // namespace doris::pipeline
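Together with the `execute()` change in `pipeline_x_task.cpp` above (`*eos = _sink->is_finished(_state) || _wake_up_by_downstream || ...`), the new `_wake_up_by_downstream` flag means a downstream wake-up both releases the task's blocking dependencies and makes its next run report EOS instead of producing more data. A minimal sketch of that flag-plus-early-exit pattern, using simplified stand-in types rather than the real task and dependency classes:

```cpp
#include <atomic>
#include <iostream>

// Illustrative stand-in for a pipeline task that a downstream consumer can wake up.
struct ToyTask {
    std::atomic<bool> wake_up_by_downstream{false};
    std::atomic<bool> dependency_ready{false};

    // Analogous to clear_blocking_state(wake_up_by_downstream): remember who woke
    // us and release the blocking dependency.
    void clear_blocking_state(bool by_downstream) {
        if (by_downstream) {
            wake_up_by_downstream = true;
        }
        dependency_ready = true;
    }

    // Analogous to the execute() change: report EOS if the sink is done OR a
    // downstream pipeline no longer needs our output.
    bool execute(bool sink_finished) {
        return sink_finished || wake_up_by_downstream.load();
    }
};

int main() {
    ToyTask task;
    std::cout << task.execute(/*sink_finished=*/false) << "\n";  // 0: still has work to do

    // A downstream pipeline finished all of its tasks and wakes this one up.
    task.clear_blocking_state(/*by_downstream=*/true);
    std::cout << task.execute(/*sink_finished=*/false) << "\n";  // 1: finishes without more output
}
```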
2 changes: 1 addition & 1 deletion be/src/pipeline/task_scheduler.cpp
@@ -258,7 +258,7 @@ void _close_task(PipelineTask* task, PipelineTaskState state, Status exec_status
task->set_close_pipeline_time();
task->finalize();
task->set_running(false);
task->fragment_context()->close_a_pipeline();
task->fragment_context()->close_a_pipeline(task->pipeline_id());
}

void TaskScheduler::_do_work(size_t index) {
1 change: 0 additions & 1 deletion be/src/runtime/runtime_filter_mgr.cpp
@@ -166,7 +166,6 @@ Status RuntimeFilterMgr::get_local_merge_producer_filters(
}
*local_merge_filters = &iter->second;
DCHECK(!iter->second.filters.empty());
DCHECK_GT(iter->second.merge_time, 0);
return Status::OK();
}

