Skip to content

Commit

Permalink
[Improvement](pipeline) Terminate early for short-circuit join (#23378)
Browse files Browse the repository at this point in the history
  • Loading branch information
Gabriel39 committed Aug 23, 2023
1 parent c827aa9 commit b918840
Show file tree
Hide file tree
Showing 5 changed files with 48 additions and 6 deletions.
2 changes: 2 additions & 0 deletions be/src/exec/exec_node.h
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,8 @@ class ExecNode {

bool can_read() const { return _can_read; }

[[nodiscard]] virtual bool can_terminate_early() { return false; }

// Sink Data to ExecNode to do some stock work, both need impl with method: get_result
// `eos` means source is exhausted, exec node should do some finalize work
// Eg: Aggregation, Sort
Expand Down
4 changes: 4 additions & 0 deletions be/src/pipeline/exec/operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,8 @@ class OperatorBase {

virtual bool can_write() { return false; } // for sink

[[nodiscard]] virtual bool can_terminate_early() { return false; }

/**
* The main method to execute a pipeline task.
* Now it is a pull-based pipeline and operators pull data from its child by this method.
Expand Down Expand Up @@ -321,6 +323,8 @@ class StreamingOperator : public OperatorBase {

~StreamingOperator() override = default;

[[nodiscard]] bool can_terminate_early() override { return _node->can_terminate_early(); }

Status prepare(RuntimeState* state) override {
_node->increase_ref();
_use_projection = _node->has_output_row_descriptor();
Expand Down
8 changes: 4 additions & 4 deletions be/src/pipeline/pipeline_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -222,23 +222,23 @@ Status PipelineTask::execute(bool* eos) {
set_state(PipelineTaskState::BLOCKED_FOR_DEPENDENCY);
return Status::OK();
}
if (!_source->can_read()) {
if (!source_can_read()) {
set_state(PipelineTaskState::BLOCKED_FOR_SOURCE);
return Status::OK();
}
if (!_sink->can_write()) {
if (!sink_can_write()) {
set_state(PipelineTaskState::BLOCKED_FOR_SINK);
return Status::OK();
}
}

this->set_begin_execute_time();
while (!_fragment_context->is_canceled()) {
if (_data_state != SourceState::MORE_DATA && !_source->can_read()) {
if (_data_state != SourceState::MORE_DATA && !source_can_read()) {
set_state(PipelineTaskState::BLOCKED_FOR_SOURCE);
break;
}
if (!_sink->can_write()) {
if (!sink_can_write()) {
set_state(PipelineTaskState::BLOCKED_FOR_SINK);
break;
}
Expand Down
38 changes: 36 additions & 2 deletions be/src/pipeline/pipeline_task.h
Original file line number Diff line number Diff line change
Expand Up @@ -152,13 +152,47 @@ class PipelineTask {
return false;
}

bool source_can_read() { return _source->can_read(); }
bool source_can_read() { return _source->can_read() || ignore_blocking_source();; }

bool runtime_filters_are_ready_or_timeout() {
return _source->runtime_filters_are_ready_or_timeout();
}

bool sink_can_write() { return _sink->can_write(); }
bool sink_can_write() { return _sink->can_write() || ignore_blocking_sink(); }
/**
* Consider the query plan below:
*
* ExchangeSource JoinBuild1
* \ /
* JoinProbe1 (Right Outer) JoinBuild2
* \ /
* JoinProbe2 (Right Outer)
* |
* Sink
*
* Assume JoinBuild1/JoinBuild2 outputs 0 rows, this pipeline task should not be blocked by ExchangeSource
* because we have a determined conclusion that JoinProbe1/JoinProbe2 will also output 0 rows.
*
* Assume JoinBuild2 outputs > 0 rows, this pipeline task may be blocked by Sink because JoinProbe2 will
* produce more data.
*
* Assume both JoinBuild2 outputs 0 rows this pipeline task should not be blocked by ExchangeSource
* and Sink because JoinProbe2 will always produce 0 rows and terminate early.
*
* In a nutshell, we should follow the rules:
* 1. if any operator in pipeline can terminate early, this task should never be blocked by source operator.
* 2. if the last operator (except sink) can terminate early, this task should never be blocked by sink operator.
*/
[[nodiscard]] virtual bool ignore_blocking_sink() { return _root->can_terminate_early(); }

[[nodiscard]] virtual bool ignore_blocking_source() {
for (size_t i = 1; i < _operators.size(); i++) {
if (_operators[i]->can_terminate_early()) {
return true;
}
}
return false;
}

Status finalize();

Expand Down
2 changes: 2 additions & 0 deletions be/src/vec/exec/join/vjoin_node_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ class VJoinNodeBase : public ExecNode {

virtual Status init(const TPlanNode& tnode, RuntimeState* state = nullptr) override;

[[nodiscard]] bool can_terminate_early() override { return _short_circuit_for_probe; }

protected:
// Construct the intermediate blocks to store the results from join operation.
void _construct_mutable_join_block();
Expand Down

0 comments on commit b918840

Please sign in to comment.