diff --git a/be/src/exec/exec_node.h b/be/src/exec/exec_node.h index ad7eb83074c9f0..ae7b40dc20eae8 100644 --- a/be/src/exec/exec_node.h +++ b/be/src/exec/exec_node.h @@ -134,6 +134,8 @@ class ExecNode { bool can_read() const { return _can_read; } + /* Base default: a node never terminates early; subclasses may override. */ [[nodiscard]] virtual bool can_terminate_early() { return false; } + // Sink Data to ExecNode to do some stock work, both need impl with method: get_result // `eos` means source is exhausted, exec node should do some finalize work // Eg: Aggregation, Sort diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h index acf55cb7bc465a..38ed45ed89f586 100644 --- a/be/src/pipeline/exec/operator.h +++ b/be/src/pipeline/exec/operator.h @@ -199,6 +199,8 @@ class OperatorBase { virtual bool can_write() { return false; } // for sink + /* Base default: an operator never terminates early; subclasses may override. */ [[nodiscard]] virtual bool can_terminate_early() { return false; } + /** * The main method to execute a pipeline task. * Now it is a pull-based pipeline and operators pull data from its child by this method. 
@@ -321,6 +323,8 @@ class StreamingOperator : public OperatorBase { ~StreamingOperator() override = default; + [[nodiscard]] bool can_terminate_early() override { return _node->can_terminate_early(); } + Status prepare(RuntimeState* state) override { _node->increase_ref(); _use_projection = _node->has_output_row_descriptor(); diff --git a/be/src/pipeline/pipeline_task.cpp b/be/src/pipeline/pipeline_task.cpp index 4d0fb49de80ff5..645028a3dc07c3 100644 --- a/be/src/pipeline/pipeline_task.cpp +++ b/be/src/pipeline/pipeline_task.cpp @@ -222,11 +222,11 @@ Status PipelineTask::execute(bool* eos) { set_state(PipelineTaskState::BLOCKED_FOR_DEPENDENCY); return Status::OK(); } - if (!_source->can_read()) { + if (!source_can_read()) { set_state(PipelineTaskState::BLOCKED_FOR_SOURCE); return Status::OK(); } - if (!_sink->can_write()) { + if (!sink_can_write()) { set_state(PipelineTaskState::BLOCKED_FOR_SINK); return Status::OK(); } @@ -234,11 +234,11 @@ Status PipelineTask::execute(bool* eos) { this->set_begin_execute_time(); while (!_fragment_context->is_canceled()) { - if (_data_state != SourceState::MORE_DATA && !_source->can_read()) { + if (_data_state != SourceState::MORE_DATA && !source_can_read()) { set_state(PipelineTaskState::BLOCKED_FOR_SOURCE); break; } - if (!_sink->can_write()) { + if (!sink_can_write()) { set_state(PipelineTaskState::BLOCKED_FOR_SINK); break; } diff --git a/be/src/pipeline/pipeline_task.h b/be/src/pipeline/pipeline_task.h index 62de1fd2815982..dcd65c469b738b 100644 --- a/be/src/pipeline/pipeline_task.h +++ b/be/src/pipeline/pipeline_task.h @@ -152,13 +152,47 @@ class PipelineTask { return false; } - bool source_can_read() { return _source->can_read(); } + bool source_can_read() { return _source->can_read() || ignore_blocking_source(); } bool runtime_filters_are_ready_or_timeout() { return _source->runtime_filters_are_ready_or_timeout(); } - bool sink_can_write() { return _sink->can_write(); } + bool sink_can_write() { return _sink->can_write() 
|| ignore_blocking_sink(); } + /** + * Consider the query plan below: + * + * ExchangeSource JoinBuild1 + * \ / + * JoinProbe1 (Right Outer) JoinBuild2 + * \ / + * JoinProbe2 (Right Outer) + * | + * Sink + * + * Assume JoinBuild1/JoinBuild2 outputs 0 rows, this pipeline task should not be blocked by ExchangeSource + * because we have a determined conclusion that JoinProbe1/JoinProbe2 will also output 0 rows. + * + * Assume JoinBuild2 outputs > 0 rows, this pipeline task may be blocked by Sink because JoinProbe2 will + * produce more data. + * + * Assume JoinBuild2 outputs 0 rows, this pipeline task should not be blocked by ExchangeSource + * and Sink because JoinProbe2 will always produce 0 rows and terminate early. + * + * In a nutshell, we should follow the rules: + * 1. if any operator in pipeline can terminate early, this task should never be blocked by source operator. + * 2. if the last operator (except sink) can terminate early, this task should never be blocked by sink operator. + */ + [[nodiscard]] virtual bool ignore_blocking_sink() { return _root->can_terminate_early(); } + + [[nodiscard]] virtual bool ignore_blocking_source() { + for (size_t i = 1; i < _operators.size(); i++) { + if (_operators[i]->can_terminate_early()) { + return true; + } + } + return false; + } Status finalize(); diff --git a/be/src/vec/exec/join/vjoin_node_base.h b/be/src/vec/exec/join/vjoin_node_base.h index 120e77785ed795..8756c24d20515a 100644 --- a/be/src/vec/exec/join/vjoin_node_base.h +++ b/be/src/vec/exec/join/vjoin_node_base.h @@ -74,6 +74,8 @@ class VJoinNodeBase : public ExecNode { virtual Status init(const TPlanNode& tnode, RuntimeState* state = nullptr) override; + [[nodiscard]] bool can_terminate_early() override { return _short_circuit_for_probe; } + protected: // Construct the intermediate blocks to store the results from join operation. void _construct_mutable_join_block();