diff --git a/be/src/exprs/bloom_filter_func.h b/be/src/exprs/bloom_filter_func.h index 95d50642448771..ae206615bd7130 100644 --- a/be/src/exprs/bloom_filter_func.h +++ b/be/src/exprs/bloom_filter_func.h @@ -157,19 +157,19 @@ class BloomFilterFuncBase : public RuntimeFilterFuncBase { } Status merge(BloomFilterFuncBase* bloomfilter_func) { + DCHECK(bloomfilter_func != nullptr); + DCHECK(bloomfilter_func->_bloom_filter != nullptr); // If `_inited` is false, there is no memory allocated in bloom filter and this is the first // call for `merge` function. So we just reuse this bloom filter, and we don't need to // allocate memory again. if (!_inited) { auto* other_func = static_cast(bloomfilter_func); DCHECK(_bloom_filter == nullptr); - DCHECK(bloomfilter_func != nullptr); _bloom_filter = bloomfilter_func->_bloom_filter; _bloom_filter_alloced = other_func->_bloom_filter_alloced; _inited = true; return Status::OK(); } - DCHECK(bloomfilter_func != nullptr); auto* other_func = static_cast(bloomfilter_func); if (_bloom_filter_alloced != other_func->_bloom_filter_alloced) { return Status::InternalError( diff --git a/be/src/exprs/runtime_filter.cpp b/be/src/exprs/runtime_filter.cpp index c3404a15355c2f..b9bf837cf56bb3 100644 --- a/be/src/exprs/runtime_filter.cpp +++ b/be/src/exprs/runtime_filter.cpp @@ -472,10 +472,10 @@ class RuntimePredicateWrapper { const TExpr& probe_expr); Status merge(const RuntimePredicateWrapper* wrapper) { - if (is_ignored() || wrapper->is_ignored()) { - _context->ignored = true; + if (wrapper->is_ignored()) { return Status::OK(); } + _context->ignored = false; bool can_not_merge_in_or_bloom = _filter_type == RuntimeFilterType::IN_OR_BLOOM_FILTER && @@ -493,7 +493,10 @@ class RuntimePredicateWrapper { switch (_filter_type) { case RuntimeFilterType::IN_FILTER: { - // try insert set + if (!_context->hybrid_set) { + _context->ignored = true; + return Status::OK(); + } _context->hybrid_set->insert(wrapper->_context->hybrid_set.get()); if (_max_in_num >= 0 && _context->hybrid_set->size() >= _max_in_num) { _context->ignored = true; @@ -1307,10 +1310,6 @@ std::string IRuntimeFilter::formatted_state() const { _wrapper->_context->ignored); } -BloomFilterFuncBase* IRuntimeFilter::get_bloomfilter() const { - return _wrapper->get_bloomfilter(); -} - Status IRuntimeFilter::init_with_desc(const TRuntimeFilterDesc* desc, const TQueryOptions* options, int node_id, bool build_bf_exactly) { // if node_id == -1 , it shouldn't be a consumer diff --git a/be/src/exprs/runtime_filter.h b/be/src/exprs/runtime_filter.h index f199e173e84cd4..629e5fa2550fa9 100644 --- a/be/src/exprs/runtime_filter.h +++ b/be/src/exprs/runtime_filter.h @@ -197,7 +197,6 @@ class IRuntimeFilter { _is_broadcast_join(true), _has_remote_target(false), _has_local_target(false), - _rf_state(RuntimeFilterState::NOT_READY), _rf_state_atomic(RuntimeFilterState::NOT_READY), _role(RuntimeFilterRole::PRODUCER), _expr_order(-1), @@ -263,8 +262,6 @@ class IRuntimeFilter { Status init_with_desc(const TRuntimeFilterDesc* desc, const TQueryOptions* options, int node_id = -1, bool build_bf_exactly = false); - BloomFilterFuncBase* get_bloomfilter() const; - // serialize _wrapper to protobuf Status serialize(PMergeFilterRequest* request, void** data, int* len); Status serialize(PPublishFilterRequest* request, void** data = nullptr, int* len = nullptr); @@ -365,9 +362,6 @@ class IRuntimeFilter { void to_protobuf(PInFilter* filter); void to_protobuf(PMinMaxFilter* filter); - template - Status _update_filter(const T* param); - template Status serialize_impl(T* request, void** data, int* len); @@ -397,7 +391,6 @@ class IRuntimeFilter { // will apply to local node bool _has_local_target; // filter is ready for consumer - RuntimeFilterState _rf_state; std::atomic _rf_state_atomic; // role consumer or producer RuntimeFilterRole _role; diff --git a/be/src/exprs/runtime_filter_slots.h b/be/src/exprs/runtime_filter_slots.h index c0a249cd6b063d..d330d327149fd2 100644 --- a/be/src/exprs/runtime_filter_slots.h +++ b/be/src/exprs/runtime_filter_slots.h @@ -98,12 +98,16 @@ class VRuntimeFilterSlots { return Status::OK(); } + Status ignore_all_filters() { + for (auto filter : _runtime_filters) { + filter->set_ignored(); + } + return Status::OK(); + } + Status init_filters(RuntimeState* state, uint64_t local_hash_table_size) { // process IN_OR_BLOOM_FILTER's real type for (auto filter : _runtime_filters) { - if (filter->get_ignored()) { - continue; - } if (filter->type() == RuntimeFilterType::IN_OR_BLOOM_FILTER && get_real_size(filter.get(), local_hash_table_size) > state->runtime_filter_max_in_num()) { @@ -141,7 +145,7 @@ class VRuntimeFilterSlots { } // publish runtime filter - Status publish(bool publish_local = false) { + Status publish(bool publish_local) { for (auto& pair : _runtime_filters_map) { for (auto& filter : pair.second) { RETURN_IF_ERROR(filter->publish(publish_local)); diff --git a/be/src/pipeline/common/runtime_filter_consumer.cpp b/be/src/pipeline/common/runtime_filter_consumer.cpp index 817c76a79af47c..29279824964e68 100644 --- a/be/src/pipeline/common/runtime_filter_consumer.cpp +++ b/be/src/pipeline/common/runtime_filter_consumer.cpp @@ -76,7 +76,6 @@ void RuntimeFilterConsumer::init_runtime_filter_dependency( auto runtime_filter = _runtime_filter_ctxs[i].runtime_filter; runtime_filter_dependencies[i] = std::make_shared( id, node_id, name, runtime_filter.get()); - _runtime_filter_ctxs[i].runtime_filter_dependency = runtime_filter_dependencies[i].get(); runtime_filter_timers[i] = std::make_shared( runtime_filter->registration_time(), runtime_filter->wait_time_ms(), runtime_filter_dependencies[i]); diff --git a/be/src/pipeline/common/runtime_filter_consumer.h b/be/src/pipeline/common/runtime_filter_consumer.h index 03868355875454..c1e5ea91bc8a2c 100644 --- a/be/src/pipeline/common/runtime_filter_consumer.h +++ b/be/src/pipeline/common/runtime_filter_consumer.h @@ -61,7 +61,6 @@ class RuntimeFilterConsumer { // set to true if this runtime filter is already applied to vconjunct_ctx_ptr bool apply_mark = false; std::shared_ptr runtime_filter; - pipeline::RuntimeFilterDependency* runtime_filter_dependency = nullptr; }; std::vector _runtime_filter_ctxs; diff --git a/be/src/pipeline/exec/analytic_sink_operator.h b/be/src/pipeline/exec/analytic_sink_operator.h index 8f7a9d1c35ae38..ee305a877f55e1 100644 --- a/be/src/pipeline/exec/analytic_sink_operator.h +++ b/be/src/pipeline/exec/analytic_sink_operator.h @@ -80,17 +80,16 @@ class AnalyticSinkOperatorX final : public DataSinkOperatorX::required_data_distribution(); } bool require_data_distribution() const override { return true; } bool require_shuffled_data_distribution() const override { - return !_partition_by_eq_expr_ctxs.empty() && _order_by_eq_expr_ctxs.empty(); + return !_partition_by_eq_expr_ctxs.empty(); } private: diff --git a/be/src/pipeline/exec/hashjoin_build_sink.cpp b/be/src/pipeline/exec/hashjoin_build_sink.cpp index 8f7b176a979a4d..5be3fcad112db5 100644 --- a/be/src/pipeline/exec/hashjoin_build_sink.cpp +++ b/be/src/pipeline/exec/hashjoin_build_sink.cpp @@ -22,6 +22,7 @@ #include "exprs/bloom_filter_func.h" #include "pipeline/exec/hashjoin_probe_operator.h" #include "pipeline/exec/operator.h" +#include "pipeline/pipeline_task.h" #include "vec/data_types/data_type_nullable.h" #include "vec/utils/template_helpers.hpp" @@ -111,6 +112,9 @@ Status HashJoinBuildSinkLocalState::open(RuntimeState* state) { } Status HashJoinBuildSinkLocalState::close(RuntimeState* state, Status exec_status) { + if (_closed) { + return Status::OK(); + } auto p = _parent->cast(); Defer defer {[&]() { if (_should_build_hash_table && p._shared_hashtable_controller) { @@ -119,25 +123,30 @@ Status HashJoinBuildSinkLocalState::close(RuntimeState* state, Status exec_statu }}; if (!_runtime_filter_slots || _runtime_filters.empty() || state->is_cancelled()) { - return Status::OK(); + return Base::close(state, exec_status); } - auto* block = _shared_state->build_block.get(); - uint64_t hash_table_size = block ? block->rows() : 0; - { - SCOPED_TIMER(_runtime_filter_init_timer); - if (_should_build_hash_table) { - RETURN_IF_ERROR(_runtime_filter_slots->init_filters(state, hash_table_size)); + + if (state->get_task()->wake_up_by_downstream()) { + RETURN_IF_ERROR(_runtime_filter_slots->send_filter_size(state, 0, _finish_dependency)); + RETURN_IF_ERROR(_runtime_filter_slots->ignore_all_filters()); + } else { + auto* block = _shared_state->build_block.get(); + uint64_t hash_table_size = block ? block->rows() : 0; + { + SCOPED_TIMER(_runtime_filter_init_timer); + if (_should_build_hash_table) { + RETURN_IF_ERROR(_runtime_filter_slots->init_filters(state, hash_table_size)); + } + RETURN_IF_ERROR(_runtime_filter_slots->ignore_filters(state)); + } + if (_should_build_hash_table && hash_table_size > 1) { + SCOPED_TIMER(_runtime_filter_compute_timer); + _runtime_filter_slots->insert(block); } - RETURN_IF_ERROR(_runtime_filter_slots->ignore_filters(state)); - } - if (_should_build_hash_table && hash_table_size > 1) { - SCOPED_TIMER(_runtime_filter_compute_timer); - _runtime_filter_slots->insert(block); } - SCOPED_TIMER(_publish_runtime_filter_timer); RETURN_IF_ERROR(_runtime_filter_slots->publish(!_should_build_hash_table)); - return Status::OK(); + return Base::close(state, exec_status); } bool HashJoinBuildSinkLocalState::build_unique() const { @@ -504,6 +513,7 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState* state, vectorized::Block* SCOPED_TIMER(local_state.exec_time_counter()); COUNTER_UPDATE(local_state.rows_input_counter(), (int64_t)in_block->rows()); + local_state._eos = eos; if (local_state._should_build_hash_table) { // If eos or have already met a null value using short-circuit strategy, we do not need to pull // data from probe side. @@ -555,6 +565,7 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState* state, vectorized::Block* local_state.process_build_block(state, (*local_state._shared_state->build_block))); if (_shared_hashtable_controller) { _shared_hash_table_context->status = Status::OK(); + _shared_hash_table_context->complete_build_stage = true; // arena will be shared with other instances. _shared_hash_table_context->arena = local_state._shared_state->arena; _shared_hash_table_context->hash_table_variants = @@ -567,7 +578,8 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState* state, vectorized::Block* local_state._runtime_filter_slots->copy_to_shared_context(_shared_hash_table_context); _shared_hashtable_controller->signal(node_id()); } - } else if (!local_state._should_build_hash_table) { + } else if (!local_state._should_build_hash_table && + _shared_hash_table_context->complete_build_stage) { DCHECK(_shared_hashtable_controller != nullptr); DCHECK(_shared_hash_table_context != nullptr); // the instance which is not build hash table, it's should wait the signal of hash table build finished. diff --git a/be/src/pipeline/exec/multi_cast_data_stream_sink.h b/be/src/pipeline/exec/multi_cast_data_stream_sink.h index 1a9787789dde02..57b5974064b6a2 100644 --- a/be/src/pipeline/exec/multi_cast_data_stream_sink.h +++ b/be/src/pipeline/exec/multi_cast_data_stream_sink.h @@ -42,15 +42,15 @@ class MultiCastDataStreamSinkOperatorX final using Base = DataSinkOperatorX; public: - MultiCastDataStreamSinkOperatorX(int sink_id, std::vector& sources, - const int cast_sender_count, ObjectPool* pool, + MultiCastDataStreamSinkOperatorX(int sink_id, std::vector& sources, ObjectPool* pool, const TMultiCastDataStreamSink& sink, const RowDescriptor& row_desc) : Base(sink_id, -1, sources), _pool(pool), _row_desc(row_desc), - _cast_sender_count(cast_sender_count), - _sink(sink) {} + _cast_sender_count(sources.size()), + _sink(sink), + _num_dests(sources.size()) {} ~MultiCastDataStreamSinkOperatorX() override = default; Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos) override; @@ -60,14 +60,19 @@ class MultiCastDataStreamSinkOperatorX final std::shared_ptr create_shared_state() const override; const TMultiCastDataStreamSink& sink_node() { return _sink; } + bool count_down_destination() override { + DCHECK_GT(_num_dests, 0); + return _num_dests.fetch_sub(1) == 1; + } private: friend class MultiCastDataStreamSinkLocalState; ObjectPool* _pool; RowDescriptor _row_desc; - const int _cast_sender_count; + const size_t _cast_sender_count; const TMultiCastDataStreamSink& _sink; friend class MultiCastDataStreamSinkLocalState; + std::atomic _num_dests; }; } // namespace doris::pipeline diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h index 0d0b0c05b366fc..b848aea6e1ecd8 100644 --- a/be/src/pipeline/exec/operator.h +++ b/be/src/pipeline/exec/operator.h @@ -359,6 +359,7 @@ class PipelineXSinkLocalStateBase { // Set to true after close() has been called. subclasses should check and set this in // close(). bool _closed = false; + std::atomic _eos = false; //NOTICE: now add a faker profile, because sometimes the profile record is useless //so we want remove some counters and timers, eg: in join node, if it's broadcast_join //and shared hash table, some counter/timer about build hash table is useless, @@ -514,6 +515,8 @@ class DataSinkOperatorXBase : public OperatorBase { virtual bool should_dry_run(RuntimeState* state) { return false; } + [[nodiscard]] virtual bool count_down_destination() { return true; } + protected: template requires(std::is_base_of_v) diff --git a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h index 259d7580877493..6b3a74c83df97c 100644 --- a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h +++ b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h @@ -259,7 +259,6 @@ class PartitionedAggSinkLocalState std::unique_ptr _runtime_state; - bool _eos = false; std::shared_ptr _finish_dependency; // temp structures during spilling diff --git a/be/src/pipeline/exec/spill_sort_sink_operator.h b/be/src/pipeline/exec/spill_sort_sink_operator.h index e74b5d2a41401a..2c820d9fa09daf 100644 --- a/be/src/pipeline/exec/spill_sort_sink_operator.h +++ b/be/src/pipeline/exec/spill_sort_sink_operator.h @@ -54,7 +54,6 @@ class SpillSortSinkLocalState : public PipelineXSpillSinkLocalState _finish_dependency; }; diff --git a/be/src/pipeline/local_exchange/local_exchange_sink_operator.cpp b/be/src/pipeline/local_exchange/local_exchange_sink_operator.cpp index 19c37f3649bcc7..d87113ca80a959 100644 --- a/be/src/pipeline/local_exchange/local_exchange_sink_operator.cpp +++ b/be/src/pipeline/local_exchange/local_exchange_sink_operator.cpp @@ -41,6 +41,7 @@ Status LocalExchangeSinkOperatorX::init(ExchangeType type, const int num_buckets _name = "LOCAL_EXCHANGE_SINK_OPERATOR (" + get_exchange_type_name(type) + ")"; _type = type; if (_type == ExchangeType::HASH_SHUFFLE) { + _use_global_shuffle = should_disable_bucket_shuffle; // For shuffle join, if data distribution has been broken by previous operator, we // should use a HASH_SHUFFLE local exchanger to shuffle data again. To be mentioned, // we should use map shuffle idx to instance idx because all instances will be @@ -84,6 +85,11 @@ Status LocalExchangeSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo SCOPED_TIMER(_init_timer); _compute_hash_value_timer = ADD_TIMER(profile(), "ComputeHashValueTime"); _distribute_timer = ADD_TIMER(profile(), "DistributeDataTime"); + if (_parent->cast()._type == ExchangeType::HASH_SHUFFLE) { + _profile->add_info_string( + "UseGlobalShuffle", + std::to_string(_parent->cast()._use_global_shuffle)); + } _channel_id = info.task_idx; return Status::OK(); } @@ -104,30 +110,16 @@ Status LocalExchangeSinkLocalState::open(RuntimeState* state) { return Status::OK(); } -Status LocalExchangeSinkLocalState::close(RuntimeState* state, Status exec_status) { - if (_closed) { - return Status::OK(); - } - RETURN_IF_ERROR(Base::close(state, exec_status)); - if (exec_status.ok()) { - DCHECK(_release_count) << "Do not finish correctly! " << debug_string(0) - << " state: { cancel = " << state->is_cancelled() << ", " - << state->cancel_reason().to_string() << "} query ctx: { cancel = " - << state->get_query_ctx()->is_cancelled() << ", " - << state->get_query_ctx()->exec_status().to_string() << "}"; - } - return Status::OK(); -} - std::string LocalExchangeSinkLocalState::debug_string(int indentation_level) const { fmt::memory_buffer debug_string_buffer; fmt::format_to(debug_string_buffer, - "{}, _channel_id: {}, _num_partitions: {}, _num_senders: {}, _num_sources: {}, " - "_running_sink_operators: {}, _running_source_operators: {}, _release_count: {}", - Base::debug_string(indentation_level), _channel_id, _exchanger->_num_partitions, - _exchanger->_num_senders, _exchanger->_num_sources, - _exchanger->_running_sink_operators, _exchanger->_running_source_operators, - _release_count); + "{}, _use_global_shuffle: {}, _channel_id: {}, _num_partitions: {}, " + "_num_senders: {}, _num_sources: {}, " + "_running_sink_operators: {}, _running_source_operators: {}", + Base::debug_string(indentation_level), + _parent->cast()._use_global_shuffle, _channel_id, + _exchanger->_num_partitions, _exchanger->_num_senders, _exchanger->_num_sources, + _exchanger->_running_sink_operators, _exchanger->_running_source_operators); return fmt::to_string(debug_string_buffer); } @@ -140,13 +132,11 @@ Status LocalExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* // If all exchange sources ended due to limit reached, current task should also finish if (local_state._exchanger->_running_source_operators == 0) { - local_state._release_count = true; local_state._shared_state->sub_running_sink_operators(); return Status::EndOfFile("receiver eof"); } if (eos) { local_state._shared_state->sub_running_sink_operators(); - local_state._release_count = true; } return Status::OK(); diff --git a/be/src/pipeline/local_exchange/local_exchange_sink_operator.h b/be/src/pipeline/local_exchange/local_exchange_sink_operator.h index 7a98840b4b323e..1cd9736d4291d6 100644 --- a/be/src/pipeline/local_exchange/local_exchange_sink_operator.h +++ b/be/src/pipeline/local_exchange/local_exchange_sink_operator.h @@ -43,7 +43,6 @@ class LocalExchangeSinkLocalState final : public PipelineXSinkLocalState dependencies() const override; @@ -69,7 +68,6 @@ class LocalExchangeSinkLocalState final : public PipelineXSinkLocalState _partitioner; const std::map _bucket_seq_to_instance_idx; std::vector> _shuffle_idx_to_instance_idx; + bool _use_global_shuffle = false; }; } // namespace doris::pipeline diff --git a/be/src/pipeline/pipeline.cpp b/be/src/pipeline/pipeline.cpp index 74e15d7cc93ea1..6e83c7805e46fc 100644 --- a/be/src/pipeline/pipeline.cpp +++ b/be/src/pipeline/pipeline.cpp @@ -22,6 +22,7 @@ #include #include "pipeline/exec/operator.h" +#include "pipeline/pipeline_task.h" namespace doris::pipeline { @@ -65,4 +66,14 @@ Status Pipeline::set_sink(DataSinkOperatorPtr& sink) { return Status::OK(); } -} // namespace doris::pipeline \ No newline at end of file +void Pipeline::make_all_runnable() { + if (_sink->count_down_destination()) { + for (auto* task : _tasks) { + if (task) { + task->clear_blocking_state(true); + } + } + } +} + +} // namespace doris::pipeline diff --git a/be/src/pipeline/pipeline.h b/be/src/pipeline/pipeline.h index dfeb53ae006116..8a20ccb631cc47 100644 --- a/be/src/pipeline/pipeline.h +++ b/be/src/pipeline/pipeline.h @@ -47,6 +47,7 @@ class Pipeline : public std::enable_shared_from_this { std::weak_ptr context) : _pipeline_id(pipeline_id), _num_tasks(num_tasks) { _init_profile(); + _tasks.resize(_num_tasks, nullptr); } // Add operators for pipelineX @@ -104,14 +105,24 @@ class Pipeline : public std::enable_shared_from_this { void set_children(std::shared_ptr child) { _children.push_back(child); } void set_children(std::vector> children) { _children = children; } - void incr_created_tasks() { _num_tasks_created++; } + void incr_created_tasks(int i, PipelineTask* task) { + _num_tasks_created++; + _num_tasks_running++; + DCHECK_LT(i, _tasks.size()); + _tasks[i] = task; + } + + void make_all_runnable(); + void set_num_tasks(int num_tasks) { _num_tasks = num_tasks; + _tasks.resize(_num_tasks, nullptr); for (auto& op : _operators) { op->set_parallel_tasks(_num_tasks); } } int num_tasks() const { return _num_tasks; } + bool close_task() { return _num_tasks_running.fetch_sub(1) == 1; } std::string debug_string() { fmt::memory_buffer debug_string_buffer; @@ -158,6 +169,10 @@ class Pipeline : public std::enable_shared_from_this { int _num_tasks = 1; // How many tasks are already created? std::atomic _num_tasks_created = 0; + // How many tasks are already created and not finished? + std::atomic _num_tasks_running = 0; + // Tasks in this pipeline. + std::vector _tasks; }; } // namespace doris::pipeline diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp index 7538e580c3ee4f..8c998ab8c2fe68 100644 --- a/be/src/pipeline/pipeline_fragment_context.cpp +++ b/be/src/pipeline/pipeline_fragment_context.cpp @@ -142,6 +142,8 @@ PipelineFragmentContext::~PipelineFragmentContext() { runtime_state.reset(); } } + _dag.clear(); + _pip_id_to_pipeline.clear(); _pipelines.clear(); _sink.reset(); _root_op.reset(); @@ -368,6 +370,7 @@ Status PipelineFragmentContext::_build_pipeline_tasks(const doris::TPipelineFrag _task_runtime_states.resize(_pipelines.size()); for (size_t pip_idx = 0; pip_idx < _pipelines.size(); pip_idx++) { _task_runtime_states[pip_idx].resize(_pipelines[pip_idx]->num_tasks()); + _pip_id_to_pipeline[_pipelines[pip_idx]->id()] = _pipelines[pip_idx].get(); } auto pipeline_id_to_profile = _runtime_state->build_pipeline_profile(_pipelines.size()); @@ -473,6 +476,7 @@ Status PipelineFragmentContext::_build_pipeline_tasks(const doris::TPipelineFrag task_runtime_state.get(), this, pipeline_id_to_profile[pip_idx].get(), get_local_exchange_state(pipeline), i); + pipeline->incr_created_tasks(i, task.get()); task_runtime_state->set_task(task.get()); pipeline_id_to_task.insert({pipeline->id(), task.get()}); _tasks[i].emplace_back(std::move(task)); @@ -573,7 +577,6 @@ Status PipelineFragmentContext::_build_pipeline_tasks(const doris::TPipelineFrag } } _pipeline_parent_map.clear(); - _dag.clear(); _op_id_to_le_state.clear(); return Status::OK(); @@ -1151,8 +1154,7 @@ Status PipelineFragmentContext::_create_data_sink(ObjectPool* pool, const TDataS } _sink.reset(new MultiCastDataStreamSinkOperatorX( - sink_id, sources, thrift_sink.multi_cast_stream_sink.sinks.size(), pool, - thrift_sink.multi_cast_stream_sink, row_desc)); + sink_id, sources, pool, thrift_sink.multi_cast_stream_sink, row_desc)); for (int i = 0; i < sender_size; ++i) { auto new_pipeline = add_pipeline(); RowDescriptor* _row_desc = nullptr; @@ -1711,7 +1713,16 @@ void PipelineFragmentContext::_close_fragment_instance() { std::dynamic_pointer_cast(shared_from_this())); } -void PipelineFragmentContext::close_a_pipeline() { +void PipelineFragmentContext::close_a_pipeline(PipelineId pipeline_id) { + // If all tasks of this pipeline has been closed, upstream tasks is never needed, and we just make those runnable here + DCHECK(_pip_id_to_pipeline.contains(pipeline_id)); + if (_pip_id_to_pipeline[pipeline_id]->close_task()) { + if (_dag.contains(pipeline_id)) { + for (auto dep : _dag[pipeline_id]) { + _pip_id_to_pipeline[dep]->make_all_runnable(); + } + } + } std::lock_guard l(_task_mutex); g_pipeline_tasks_count << -1; ++_closed_tasks; diff --git a/be/src/pipeline/pipeline_fragment_context.h b/be/src/pipeline/pipeline_fragment_context.h index f95eb03fb12d48..b20c324756c095 100644 --- a/be/src/pipeline/pipeline_fragment_context.h +++ b/be/src/pipeline/pipeline_fragment_context.h @@ -102,7 +102,7 @@ class PipelineFragmentContext : public TaskExecutionContext { [[nodiscard]] int get_fragment_id() const { return _fragment_id; } - void close_a_pipeline(); + void close_a_pipeline(PipelineId pipeline_id); Status send_report(bool); @@ -295,6 +295,7 @@ class PipelineFragmentContext : public TaskExecutionContext { std::map, std::shared_ptr>> _op_id_to_le_state; + std::map _pip_id_to_pipeline; // UniqueId -> runtime mgr std::map> _runtime_filter_mgr_map; diff --git a/be/src/pipeline/pipeline_task.cpp b/be/src/pipeline/pipeline_task.cpp index 4f362ac5042e8f..e06b8028c9c730 100644 --- a/be/src/pipeline/pipeline_task.cpp +++ b/be/src/pipeline/pipeline_task.cpp @@ -71,7 +71,6 @@ PipelineTask::PipelineTask( if (shared_state) { _sink_shared_state = shared_state; } - pipeline->incr_created_tasks(); } Status PipelineTask::prepare(const TPipelineInstanceParams& local_params, const TDataSink& tsink, @@ -228,6 +227,9 @@ bool PipelineTask::_wait_to_start() { _blocked_dep = _execution_dep->is_blocked_by(this); if (_blocked_dep != nullptr) { static_cast(_blocked_dep)->start_watcher(); + if (_wake_up_by_downstream) { + _eos = true; + } return true; } @@ -235,6 +237,9 @@ bool PipelineTask::_wait_to_start() { _blocked_dep = op_dep->is_blocked_by(this); if (_blocked_dep != nullptr) { _blocked_dep->start_watcher(); + if (_wake_up_by_downstream) { + _eos = true; + } return true; } } @@ -250,6 +255,9 @@ bool PipelineTask::_is_blocked() { _blocked_dep = dep->is_blocked_by(this); if (_blocked_dep != nullptr) { _blocked_dep->start_watcher(); + if (_wake_up_by_downstream) { + _eos = true; + } return true; } } @@ -269,6 +277,9 @@ bool PipelineTask::_is_blocked() { _blocked_dep = op_dep->is_blocked_by(this); if (_blocked_dep != nullptr) { _blocked_dep->start_watcher(); + if (_wake_up_by_downstream) { + _eos = true; + } return true; } } @@ -279,7 +290,7 @@ Status PipelineTask::execute(bool* eos) { SCOPED_TIMER(_task_profile->total_time_counter()); SCOPED_TIMER(_exec_timer); SCOPED_ATTACH_TASK(_state); - _eos = _sink->is_finished(_state) || _eos; + _eos = _sink->is_finished(_state) || _eos || _wake_up_by_downstream; *eos = _eos; if (_eos) { // If task is waken up by finish dependency, `_eos` is set to true by last execution, and we should return here. @@ -307,6 +318,11 @@ Status PipelineTask::execute(bool* eos) { if (_wait_to_start()) { return Status::OK(); } + if (_wake_up_by_downstream) { + _eos = true; + *eos = true; + return Status::OK(); + } // The status must be runnable if (!_opened && !_fragment_context->is_canceled()) { RETURN_IF_ERROR(_open()); @@ -316,6 +332,11 @@ Status PipelineTask::execute(bool* eos) { if (_is_blocked()) { return Status::OK(); } + if (_wake_up_by_downstream) { + _eos = true; + *eos = true; + return Status::OK(); + } /// When a task is cancelled, /// its blocking state will be cleared and it will transition to a ready state (though it is not truly ready). @@ -482,9 +503,10 @@ std::string PipelineTask::debug_string() { auto elapsed = _fragment_context->elapsed_time() / 1000000000.0; fmt::format_to(debug_string_buffer, "PipelineTask[this = {}, id = {}, open = {}, eos = {}, finish = {}, dry run = " - "{}, elapse time " - "= {}s], block dependency = {}, is running = {}\noperators: ", + "{}, elapse time = {}s, _wake_up_by_downstream = {}], block dependency = {}, is " + "running = {}\noperators: ", (void*)this, _index, _opened, _eos, _finalized, _dry_run, elapsed, + _wake_up_by_downstream.load(), cur_blocked_dep && !_finalized ? cur_blocked_dep->debug_string() : "NULL", is_running()); for (size_t i = 0; i < _operators.size(); i++) { diff --git a/be/src/pipeline/pipeline_task.h b/be/src/pipeline/pipeline_task.h index dd2ead4b5dcc91..223420ea55aff1 100644 --- a/be/src/pipeline/pipeline_task.h +++ b/be/src/pipeline/pipeline_task.h @@ -135,10 +135,11 @@ class PipelineTask { int task_id() const { return _index; }; bool is_finalized() const { return _finalized; } - void clear_blocking_state() { + void clear_blocking_state(bool wake_up_by_downstream = false) { _state->get_query_ctx()->get_execution_dependency()->set_always_ready(); // We use a lock to assure all dependencies are not deconstructed here. std::unique_lock lc(_dependency_lock); + _wake_up_by_downstream = _wake_up_by_downstream || wake_up_by_downstream; if (!_finalized) { _execution_dep->set_always_ready(); for (auto* dep : _filter_dependencies) { @@ -231,6 +232,10 @@ class PipelineTask { } } + PipelineId pipeline_id() const { return _pipeline->id(); } + + bool wake_up_by_downstream() const { return _wake_up_by_downstream; } + private: friend class RuntimeFilterDependency; bool _is_blocked(); @@ -306,11 +311,12 @@ class PipelineTask { Dependency* _execution_dep = nullptr; - std::atomic _finalized {false}; + std::atomic _finalized = false; std::mutex _dependency_lock; - std::atomic _running {false}; - std::atomic _eos {false}; + std::atomic _running = false; + std::atomic _eos = false; + std::atomic _wake_up_by_downstream = false; }; } // namespace doris::pipeline diff --git a/be/src/pipeline/task_scheduler.cpp b/be/src/pipeline/task_scheduler.cpp index 8be30773ee11f1..475d3a8065f8b4 100644 --- a/be/src/pipeline/task_scheduler.cpp +++ b/be/src/pipeline/task_scheduler.cpp @@ -94,7 +94,7 @@ void _close_task(PipelineTask* task, Status exec_status) { } task->finalize(); task->set_running(false); - task->fragment_context()->close_a_pipeline(); + task->fragment_context()->close_a_pipeline(task->pipeline_id()); } void TaskScheduler::_do_work(size_t index) { diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp index 50e71d28946dac..577b7ffa157f3c 100644 --- a/be/src/runtime/fragment_mgr.cpp +++ b/be/src/runtime/fragment_mgr.cpp @@ -613,11 +613,15 @@ Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParams& params, } Status FragmentMgr::start_query_execution(const PExecPlanFragmentStartRequest* request) { - std::lock_guard lock(_lock); - TUniqueId query_id; - query_id.__set_hi(request->query_id().hi()); - query_id.__set_lo(request->query_id().lo()); - if (auto q_ctx = _get_or_erase_query_ctx(query_id)) { + std::shared_ptr q_ctx = nullptr; + { + std::lock_guard lock(_lock); + TUniqueId query_id; + query_id.__set_hi(request->query_id().hi()); + query_id.__set_lo(request->query_id().lo()); + q_ctx = _get_or_erase_query_ctx(query_id); + } + if (q_ctx) { q_ctx->set_ready_to_execute(Status::OK()); } else { return Status::InternalError( diff --git a/be/src/runtime/runtime_filter_mgr.cpp b/be/src/runtime/runtime_filter_mgr.cpp index d2b55d86bc6bd4..84dfdd8e1d2f75 100644 --- a/be/src/runtime/runtime_filter_mgr.cpp +++ b/be/src/runtime/runtime_filter_mgr.cpp @@ -130,6 +130,7 @@ Status RuntimeFilterMgr::register_local_merge_producer_filter( RETURN_IF_ERROR(IRuntimeFilter::create(_state, &desc, &options, RuntimeFilterRole::PRODUCER, -1, &merge_filter, build_bf_exactly, true)); + merge_filter->set_ignored(); iter->second.filters.emplace_back(merge_filter); } iter->second.merge_time++; @@ -151,7 +152,6 @@ Status RuntimeFilterMgr::get_local_merge_producer_filters( } *local_merge_filters = &iter->second; DCHECK(!iter->second.filters.empty()); - DCHECK_GT(iter->second.merge_time, 0); return Status::OK(); } @@ -236,6 +236,7 @@ Status RuntimeFilterMergeControllerEntity::_init_with_desc( auto filter_id = runtime_filter_desc->filter_id; RETURN_IF_ERROR(cnt_val->filter->init_with_desc(&cnt_val->runtime_filter_desc, query_options, -1, false)); + cnt_val->filter->set_ignored(); _filter_map.emplace(filter_id, cnt_val); return Status::OK(); } @@ -254,6 +255,7 @@ Status RuntimeFilterMergeControllerEntity::_init_with_desc( cnt_val->filter = cnt_val->pool->add(new IRuntimeFilter(_state, runtime_filter_desc)); auto filter_id = runtime_filter_desc->filter_id; RETURN_IF_ERROR(cnt_val->filter->init_with_desc(&cnt_val->runtime_filter_desc, query_options)); + cnt_val->filter->set_ignored(); std::unique_lock guard(_filter_map_mutex); _filter_map.emplace(filter_id, cnt_val); diff --git a/be/src/vec/runtime/shared_hash_table_controller.h b/be/src/vec/runtime/shared_hash_table_controller.h index b04b1cdba064b9..1ca7325e8cbbac 100644 --- a/be/src/vec/runtime/shared_hash_table_controller.h +++ b/be/src/vec/runtime/shared_hash_table_controller.h @@ -66,6 +66,7 @@ struct SharedHashTableContext { std::map runtime_filters; std::atomic signaled = false; bool short_circuit_for_null_in_probe_side = false; + std::atomic complete_build_stage = false; }; using SharedHashTableContextPtr = std::shared_ptr; diff --git a/be/test/exprs/runtime_filter_test.cpp b/be/test/exprs/runtime_filter_test.cpp index cfcbaae4a4e6aa..0476104c2e1d64 100644 --- a/be/test/exprs/runtime_filter_test.cpp +++ b/be/test/exprs/runtime_filter_test.cpp @@ -105,11 +105,6 @@ std::shared_ptr create_runtime_filter(TRuntimeFilterType::type t EXPECT_TRUE(status.ok()) << status.to_string(); - if (auto bf = runtime_filter->get_bloomfilter()) { - status = bf->init_with_fixed_length(); - EXPECT_TRUE(status.ok()) << status.to_string(); - } - return status.ok() ? runtime_filter : nullptr; }