Skip to content

Commit

Permalink
fix union
Browse files Browse the repository at this point in the history
  • Loading branch information
Gabriel39 committed Feb 27, 2024
1 parent 32b5aef commit 289aca7
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 5 deletions.
8 changes: 7 additions & 1 deletion be/src/pipeline/exec/union_source_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -175,10 +175,16 @@ Status UnionSourceOperatorX::get_block(RuntimeState* state, vectorized::Block* b
}
local_state.reached_limit(block, eos);
//have executing const expr, queue have no data anymore, and child could be closed
*eos = _child_size == 0 && !local_state._need_read_for_const_expr;
*eos = (_child_size == 0 && !local_state._need_read_for_const_expr) ||
(local_state._shared_state->data_queue.is_all_finish() && !_has_data(state));

return Status::OK();
}

bool UnionSourceOperatorX::need_data_from_children(RuntimeState* state) const {
return !_has_data(state);
}

Status UnionSourceOperatorX::get_next_const(RuntimeState* state, vectorized::Block* block) {
DCHECK_EQ(state->per_fragment_instance_idx(), 0);
auto& local_state = state->get_local_state(operator_id())->cast<UnionSourceLocalState>();
Expand Down
4 changes: 3 additions & 1 deletion be/src/pipeline/exec/union_source_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -131,8 +131,10 @@ class UnionSourceOperatorX final : public OperatorX<UnionSourceLocalState> {
}
[[nodiscard]] int get_child_count() const { return _child_size; }

bool need_data_from_children(RuntimeState* state) const override;

private:
bool _has_data(RuntimeState* state) {
bool _has_data(RuntimeState* state) const {
auto& local_state = state->get_local_state(operator_id())->cast<UnionSourceLocalState>();
if (_child_size == 0) {
return local_state._need_read_for_const_expr;
Expand Down
5 changes: 2 additions & 3 deletions be/src/pipeline/pipeline_x/operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -690,9 +690,8 @@ class StatefulOperatorX : public OperatorX<LocalStateType> {

[[nodiscard]] virtual bool need_more_input_data(RuntimeState* state) const = 0;

[[nodiscard]] bool need_data_from_children(RuntimeState* state) const override {
auto need_data = need_more_input_data(state);
if (need_data) {
bool need_data_from_children(RuntimeState* state) const override {
if (need_more_input_data(state)) {
return OperatorX<LocalStateType>::_child_x->need_data_from_children(state);
} else {
return false;
Expand Down

0 comments on commit 289aca7

Please sign in to comment.