From 63938c412cf81ad40b1717a844bd1755f93beab7 Mon Sep 17 00:00:00 2001 From: Gabriel Date: Thu, 10 Oct 2024 15:32:22 +0800 Subject: [PATCH] [fix](followup) Stop tasks if waken by downstream tasks --- be/src/exprs/runtime_filter.cpp | 4 ---- be/src/exprs/runtime_filter.h | 7 ------- .../common/runtime_filter_consumer.cpp | 1 - .../pipeline/common/runtime_filter_consumer.h | 1 - be/src/pipeline/exec/hashjoin_build_sink.cpp | 4 +++- .../exec/multi_cast_data_stream_sink.h | 15 +++++++++----- be/src/pipeline/exec/operator.h | 2 ++ be/src/pipeline/pipeline.cpp | 8 +++++--- be/src/pipeline/pipeline_fragment_context.cpp | 18 ++++++++++------- be/src/pipeline/pipeline_fragment_context.h | 2 ++ be/src/pipeline/pipeline_task.cpp | 20 ++++++++++++------- be/src/pipeline/pipeline_task.h | 2 +- .../runtime/shared_hash_table_controller.h | 1 + 13 files changed, 48 insertions(+), 37 deletions(-) diff --git a/be/src/exprs/runtime_filter.cpp b/be/src/exprs/runtime_filter.cpp index 8b3f4b197d08cd4..fee7363df29f6be 100644 --- a/be/src/exprs/runtime_filter.cpp +++ b/be/src/exprs/runtime_filter.cpp @@ -1307,10 +1307,6 @@ std::string IRuntimeFilter::formatted_state() const { _wrapper->_context->ignored); } -BloomFilterFuncBase* IRuntimeFilter::get_bloomfilter() const { - return _wrapper->get_bloomfilter(); -} - Status IRuntimeFilter::init_with_desc(const TRuntimeFilterDesc* desc, const TQueryOptions* options, int node_id, bool build_bf_exactly) { // if node_id == -1 , it shouldn't be a consumer diff --git a/be/src/exprs/runtime_filter.h b/be/src/exprs/runtime_filter.h index c4a38517ab4ba04..4a146fbba81bfdd 100644 --- a/be/src/exprs/runtime_filter.h +++ b/be/src/exprs/runtime_filter.h @@ -198,7 +198,6 @@ class IRuntimeFilter { _is_broadcast_join(true), _has_remote_target(false), _has_local_target(false), - _rf_state(RuntimeFilterState::NOT_READY), _rf_state_atomic(RuntimeFilterState::NOT_READY), _role(RuntimeFilterRole::PRODUCER), _expr_order(-1), @@ -264,8 +263,6 @@ class IRuntimeFilter { Status init_with_desc(const TRuntimeFilterDesc* desc, const TQueryOptions* options, int node_id = -1, bool build_bf_exactly = false); - BloomFilterFuncBase* get_bloomfilter() const; - // serialize _wrapper to protobuf Status serialize(PMergeFilterRequest* request, void** data, int* len); Status serialize(PPublishFilterRequest* request, void** data = nullptr, int* len = nullptr); @@ -366,9 +363,6 @@ class IRuntimeFilter { void to_protobuf(PInFilter* filter); void to_protobuf(PMinMaxFilter* filter); - template - Status _update_filter(const T* param); - template Status serialize_impl(T* request, void** data, int* len); @@ -398,7 +392,6 @@ class IRuntimeFilter { // will apply to local node bool _has_local_target; // filter is ready for consumer - RuntimeFilterState _rf_state; std::atomic _rf_state_atomic; // role consumer or producer RuntimeFilterRole _role; diff --git a/be/src/pipeline/common/runtime_filter_consumer.cpp b/be/src/pipeline/common/runtime_filter_consumer.cpp index 817c76a79af47c3..29279824964e686 100644 --- a/be/src/pipeline/common/runtime_filter_consumer.cpp +++ b/be/src/pipeline/common/runtime_filter_consumer.cpp @@ -76,7 +76,6 @@ void RuntimeFilterConsumer::init_runtime_filter_dependency( auto runtime_filter = _runtime_filter_ctxs[i].runtime_filter; runtime_filter_dependencies[i] = std::make_shared( id, node_id, name, runtime_filter.get()); - _runtime_filter_ctxs[i].runtime_filter_dependency = runtime_filter_dependencies[i].get(); runtime_filter_timers[i] = std::make_shared( runtime_filter->registration_time(), runtime_filter->wait_time_ms(), runtime_filter_dependencies[i]); diff --git a/be/src/pipeline/common/runtime_filter_consumer.h b/be/src/pipeline/common/runtime_filter_consumer.h index 03868355875454f..c1e5ea91bc8a2cd 100644 --- a/be/src/pipeline/common/runtime_filter_consumer.h +++ b/be/src/pipeline/common/runtime_filter_consumer.h @@ -61,7 +61,6 @@ class RuntimeFilterConsumer { // set to true if this runtime filter is already applied to vconjunct_ctx_ptr bool apply_mark = false; std::shared_ptr runtime_filter; - pipeline::RuntimeFilterDependency* runtime_filter_dependency = nullptr; }; std::vector _runtime_filter_ctxs; diff --git a/be/src/pipeline/exec/hashjoin_build_sink.cpp b/be/src/pipeline/exec/hashjoin_build_sink.cpp index 93d22850dfcbb1f..78a068d15df2f8a 100644 --- a/be/src/pipeline/exec/hashjoin_build_sink.cpp +++ b/be/src/pipeline/exec/hashjoin_build_sink.cpp @@ -590,6 +590,7 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState* state, vectorized::Block* local_state.process_build_block(state, (*local_state._shared_state->build_block))); if (_shared_hashtable_controller) { _shared_hash_table_context->status = Status::OK(); + _shared_hash_table_context->complete_build_stage = true; // arena will be shared with other instances. _shared_hash_table_context->arena = local_state._shared_state->arena; _shared_hash_table_context->hash_table_variants = @@ -601,7 +602,8 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState* state, vectorized::Block* local_state._shared_state->build_indexes_null; local_state._runtime_filter_slots->copy_to_shared_context(_shared_hash_table_context); } - } else if (!local_state._should_build_hash_table) { + } else if (!local_state._should_build_hash_table && + _shared_hash_table_context->complete_build_stage) { DCHECK(_shared_hashtable_controller != nullptr); DCHECK(_shared_hash_table_context != nullptr); // the instance which is not build hash table, it's should wait the signal of hash table build finished. diff --git a/be/src/pipeline/exec/multi_cast_data_stream_sink.h b/be/src/pipeline/exec/multi_cast_data_stream_sink.h index 1a9787789dde027..57b5974064b6a2e 100644 --- a/be/src/pipeline/exec/multi_cast_data_stream_sink.h +++ b/be/src/pipeline/exec/multi_cast_data_stream_sink.h @@ -42,15 +42,15 @@ class MultiCastDataStreamSinkOperatorX final using Base = DataSinkOperatorX; public: - MultiCastDataStreamSinkOperatorX(int sink_id, std::vector& sources, - const int cast_sender_count, ObjectPool* pool, + MultiCastDataStreamSinkOperatorX(int sink_id, std::vector& sources, ObjectPool* pool, const TMultiCastDataStreamSink& sink, const RowDescriptor& row_desc) : Base(sink_id, -1, sources), _pool(pool), _row_desc(row_desc), - _cast_sender_count(cast_sender_count), - _sink(sink) {} + _cast_sender_count(sources.size()), + _sink(sink), + _num_dests(sources.size()) {} ~MultiCastDataStreamSinkOperatorX() override = default; Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos) override; @@ -60,14 +60,19 @@ class MultiCastDataStreamSinkOperatorX final std::shared_ptr create_shared_state() const override; const TMultiCastDataStreamSink& sink_node() { return _sink; } + bool count_down_destination() override { + DCHECK_GT(_num_dests, 0); + return _num_dests.fetch_sub(1) == 1; + } private: friend class MultiCastDataStreamSinkLocalState; ObjectPool* _pool; RowDescriptor _row_desc; - const int _cast_sender_count; + const size_t _cast_sender_count; const TMultiCastDataStreamSink& _sink; friend class MultiCastDataStreamSinkLocalState; + std::atomic _num_dests; }; } // namespace doris::pipeline diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h index 5cb83528e040d92..b848aea6e1ecd81 100644 --- a/be/src/pipeline/exec/operator.h +++ b/be/src/pipeline/exec/operator.h @@ -515,6 +515,8 @@ class DataSinkOperatorXBase : public OperatorBase { virtual bool should_dry_run(RuntimeState* state) { return false; } + [[nodiscard]] virtual bool count_down_destination() { return true; } + protected: template requires(std::is_base_of_v) diff --git a/be/src/pipeline/pipeline.cpp b/be/src/pipeline/pipeline.cpp index cef02d6374b9dc2..6e83c7805e46fca 100644 --- a/be/src/pipeline/pipeline.cpp +++ b/be/src/pipeline/pipeline.cpp @@ -67,9 +67,11 @@ Status Pipeline::set_sink(DataSinkOperatorPtr& sink) { } void Pipeline::make_all_runnable() { - for (auto* task : _tasks) { - if (task) { - task->clear_blocking_state(true); + if (_sink->count_down_destination()) { + for (auto* task : _tasks) { + if (task) { + task->clear_blocking_state(true); + } } } } diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp index 2d283652945528a..9c444b69cf85e50 100644 --- a/be/src/pipeline/pipeline_fragment_context.cpp +++ b/be/src/pipeline/pipeline_fragment_context.cpp @@ -1142,8 +1142,7 @@ Status PipelineFragmentContext::_create_data_sink(ObjectPool* pool, const TDataS } _sink.reset(new MultiCastDataStreamSinkOperatorX( - sink_id, sources, cast_set(thrift_sink.multi_cast_stream_sink.sinks.size()), - pool, thrift_sink.multi_cast_stream_sink, row_desc)); + sink_id, sources, pool, thrift_sink.multi_cast_stream_sink, row_desc)); for (int i = 0; i < sender_size; ++i) { auto new_pipeline = add_pipeline(); RowDescriptor* _row_desc = nullptr; @@ -1759,15 +1758,20 @@ void PipelineFragmentContext::_close_fragment_instance() { std::dynamic_pointer_cast(shared_from_this())); } +void PipelineFragmentContext::_trigger_tasks_recursively(PipelineId pipeline_id) { + if (_dag.contains(pipeline_id)) { + for (auto dep : _dag[pipeline_id]) { + _pip_id_to_pipeline[dep]->make_all_runnable(); + _trigger_tasks_recursively(dep); + } + } +} + void PipelineFragmentContext::close_a_pipeline(PipelineId pipeline_id) { // If all tasks of this pipeline has been closed, upstream tasks is never needed, and we just make those runnable here DCHECK(_pip_id_to_pipeline.contains(pipeline_id)); if (_pip_id_to_pipeline[pipeline_id]->close_task()) { - if (_dag.contains(pipeline_id)) { - for (auto dep : _dag[pipeline_id]) { - _pip_id_to_pipeline[dep]->make_all_runnable(); - } - } + _trigger_tasks_recursively(pipeline_id); } std::lock_guard l(_task_mutex); ++_closed_tasks; diff --git a/be/src/pipeline/pipeline_fragment_context.h b/be/src/pipeline/pipeline_fragment_context.h index bcef1271b6025ad..ad180e7ea834758 100644 --- a/be/src/pipeline/pipeline_fragment_context.h +++ b/be/src/pipeline/pipeline_fragment_context.h @@ -188,6 +188,8 @@ class PipelineFragmentContext : public TaskExecutionContext { void _close_fragment_instance(); void _init_next_report_time(); + void _trigger_tasks_recursively(PipelineId pipeline_id); + // Id of this query TUniqueId _query_id; int _fragment_id; diff --git a/be/src/pipeline/pipeline_task.cpp b/be/src/pipeline/pipeline_task.cpp index 35d09f4850930b1..0421123231da91e 100644 --- a/be/src/pipeline/pipeline_task.cpp +++ b/be/src/pipeline/pipeline_task.cpp @@ -227,14 +227,14 @@ bool PipelineTask::_wait_to_start() { _blocked_dep = _execution_dep->is_blocked_by(this); if (_blocked_dep != nullptr) { static_cast(_blocked_dep)->start_watcher(); - return true; + return !_wake_up_by_downstream; } for (auto* op_dep : _filter_dependencies) { _blocked_dep = op_dep->is_blocked_by(this); if (_blocked_dep != nullptr) { _blocked_dep->start_watcher(); - return true; + return !_wake_up_by_downstream; } } return false; @@ -249,7 +249,7 @@ bool PipelineTask::_is_blocked() { _blocked_dep = dep->is_blocked_by(this); if (_blocked_dep != nullptr) { _blocked_dep->start_watcher(); - return true; + return !_wake_up_by_downstream; } } // If all dependencies are ready for this operator, we can execute this task if no datum is needed from upstream operators. @@ -268,7 +268,7 @@ bool PipelineTask::_is_blocked() { _blocked_dep = op_dep->is_blocked_by(this); if (_blocked_dep != nullptr) { _blocked_dep->start_watcher(); - return true; + return !_wake_up_by_downstream; } } return false; @@ -278,7 +278,7 @@ Status PipelineTask::execute(bool* eos) { SCOPED_TIMER(_task_profile->total_time_counter()); SCOPED_TIMER(_exec_timer); SCOPED_ATTACH_TASK(_state); - _eos = _sink->is_finished(_state) || _eos || _wake_up_by_downstream; + _eos = _sink->is_finished(_state) || _eos; *eos = _eos; if (_eos) { // If task is waken up by finish dependency, `_eos` is set to true by last execution, and we should return here. @@ -306,6 +306,11 @@ Status PipelineTask::execute(bool* eos) { if (_wait_to_start()) { return Status::OK(); } + if (_wake_up_by_downstream) { + _eos = true; + *eos = true; + return Status::OK(); + } // The status must be runnable if (!_opened && !_fragment_context->is_canceled()) { RETURN_IF_ERROR(_open()); @@ -481,9 +486,10 @@ std::string PipelineTask::debug_string() { auto elapsed = _fragment_context->elapsed_time() / 1000000000.0; fmt::format_to(debug_string_buffer, "PipelineTask[this = {}, id = {}, open = {}, eos = {}, finish = {}, dry run = " - "{}, elapse time " - "= {}s], block dependency = {}, is running = {}\noperators: ", + "{}, elapse time = {}s, _wake_up_by_downstream = {}], block dependency = {}, is " + "running = {}\noperators: ", (void*)this, _index, _opened, _eos, _finalized, _dry_run, elapsed, + _wake_up_by_downstream.load(), cur_blocked_dep && !_finalized ? cur_blocked_dep->debug_string() : "NULL", is_running()); for (size_t i = 0; i < _operators.size(); i++) { diff --git a/be/src/pipeline/pipeline_task.h b/be/src/pipeline/pipeline_task.h index e216ef103766fdb..a73393fb2708855 100644 --- a/be/src/pipeline/pipeline_task.h +++ b/be/src/pipeline/pipeline_task.h @@ -90,7 +90,7 @@ class PipelineTask { _blocked_dep = fin_dep->is_blocked_by(this); if (_blocked_dep != nullptr) { _blocked_dep->start_watcher(); - return true; + return !_wake_up_by_downstream; } } return false; diff --git a/be/src/vec/runtime/shared_hash_table_controller.h b/be/src/vec/runtime/shared_hash_table_controller.h index 173f9d46e890c87..c831d1b46e4e781 100644 --- a/be/src/vec/runtime/shared_hash_table_controller.h +++ b/be/src/vec/runtime/shared_hash_table_controller.h @@ -66,6 +66,7 @@ struct SharedHashTableContext { std::map runtime_filters; std::atomic signaled = false; bool short_circuit_for_null_in_probe_side = false; + std::atomic complete_build_stage = false; }; using SharedHashTableContextPtr = std::shared_ptr;