Skip to content

Commit

Permalink
[pipelineX](refactor) remove source state from operator functions (ap…
Browse files Browse the repository at this point in the history
  • Loading branch information
Gabriel39 authored Feb 27, 2024
1 parent 1e91a38 commit 0cd0a94
Show file tree
Hide file tree
Showing 78 changed files with 360 additions and 482 deletions.
5 changes: 2 additions & 3 deletions be/src/pipeline/exec/aggregation_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -788,8 +788,7 @@ Status AggSinkOperatorX::open(RuntimeState* state) {
return Status::OK();
}

Status AggSinkOperatorX::sink(doris::RuntimeState* state, vectorized::Block* in_block,
SourceState source_state) {
Status AggSinkOperatorX::sink(doris::RuntimeState* state, vectorized::Block* in_block, bool eos) {
auto& local_state = get_local_state(state);
SCOPED_TIMER(local_state.exec_time_counter());
COUNTER_UPDATE(local_state.rows_input_counter(), (int64_t)in_block->rows());
Expand All @@ -799,7 +798,7 @@ Status AggSinkOperatorX::sink(doris::RuntimeState* state, vectorized::Block* in_
RETURN_IF_ERROR(local_state.try_spill_disk());
local_state._executor->update_memusage(&local_state);
}
if (source_state == SourceState::FINISHED) {
if (eos) {
if (local_state._shared_state->spill_context.has_data) {
static_cast<void>(local_state.try_spill_disk(true));
RETURN_IF_ERROR(local_state._shared_state->spill_context.prepare_for_reading());
Expand Down
3 changes: 1 addition & 2 deletions be/src/pipeline/exec/aggregation_sink_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -334,8 +334,7 @@ class AggSinkOperatorX final : public DataSinkOperatorX<AggSinkLocalState> {
Status prepare(RuntimeState* state) override;
Status open(RuntimeState* state) override;

Status sink(RuntimeState* state, vectorized::Block* in_block,
SourceState source_state) override;
Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos) override;

DataDistribution required_data_distribution() const override {
if (_probe_expr_ctxs.empty()) {
Expand Down
70 changes: 32 additions & 38 deletions be/src/pipeline/exec/aggregation_source_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -86,17 +86,16 @@ Status AggLocalState::_destroy_agg_status(vectorized::AggregateDataPtr data) {
}

Status AggLocalState::_serialize_with_serialized_key_result(RuntimeState* state,
vectorized::Block* block,
SourceState& source_state) {
vectorized::Block* block, bool* eos) {
if (_shared_state->spill_context.has_data) {
return _serialize_with_serialized_key_result_with_spilt_data(state, block, source_state);
return _serialize_with_serialized_key_result_with_spilt_data(state, block, eos);
} else {
return _serialize_with_serialized_key_result_non_spill(state, block, source_state);
return _serialize_with_serialized_key_result_non_spill(state, block, eos);
}
}

Status AggLocalState::_serialize_with_serialized_key_result_with_spilt_data(
RuntimeState* state, vectorized::Block* block, SourceState& source_state) {
RuntimeState* state, vectorized::Block* block, bool* eos) {
CHECK(!_shared_state->spill_context.stream_ids.empty());
CHECK(_shared_state->spill_partition_helper != nullptr)
<< "_spill_partition_helper should not be null";
Expand All @@ -112,14 +111,12 @@ Status AggLocalState::_serialize_with_serialized_key_result_with_spilt_data(
_shared_state->aggregate_data_container->init_once();
}

RETURN_IF_ERROR(_serialize_with_serialized_key_result_non_spill(state, block, source_state));
if (source_state == SourceState::FINISHED) {
source_state = _shared_state->spill_context.read_cursor ==
_shared_state->spill_partition_helper->partition_count
? SourceState::FINISHED
: SourceState::DEPEND_ON_SOURCE;
RETURN_IF_ERROR(_serialize_with_serialized_key_result_non_spill(state, block, eos));
if (*eos) {
*eos = _shared_state->spill_context.read_cursor ==
_shared_state->spill_partition_helper->partition_count;
}
CHECK(!block->empty() || source_state == SourceState::FINISHED);
CHECK(!block->empty() || *eos);
return Status::OK();
}

Expand Down Expand Up @@ -153,7 +150,7 @@ Status AggLocalState::_reset_hash_table() {

Status AggLocalState::_serialize_with_serialized_key_result_non_spill(RuntimeState* state,
vectorized::Block* block,
SourceState& source_state) {
bool* eos) {
SCOPED_TIMER(_serialize_result_timer);
auto& shared_state = *_shared_state;
int key_size = _shared_state->probe_expr_ctxs.size();
Expand Down Expand Up @@ -218,10 +215,10 @@ Status AggLocalState::_serialize_with_serialized_key_result_non_spill(RuntimeSta
agg_method.hash_table->template get_null_key_data<
vectorized::AggregateDataPtr>();
++num_rows;
source_state = SourceState::FINISHED;
*eos = true;
}
} else {
source_state = SourceState::FINISHED;
*eos = true;
}
}

Expand Down Expand Up @@ -265,16 +262,16 @@ Status AggLocalState::_serialize_with_serialized_key_result_non_spill(RuntimeSta
}

Status AggLocalState::_get_with_serialized_key_result(RuntimeState* state, vectorized::Block* block,
SourceState& source_state) {
bool* eos) {
if (_shared_state->spill_context.has_data) {
return _get_result_with_spilt_data(state, block, source_state);
return _get_result_with_spilt_data(state, block, eos);
} else {
return _get_result_with_serialized_key_non_spill(state, block, source_state);
return _get_result_with_serialized_key_non_spill(state, block, eos);
}
}

Status AggLocalState::_get_result_with_spilt_data(RuntimeState* state, vectorized::Block* block,
SourceState& source_state) {
bool* eos) {
CHECK(!_shared_state->spill_context.stream_ids.empty());
CHECK(_shared_state->spill_partition_helper != nullptr)
<< "_spill_partition_helper should not be null";
Expand All @@ -290,14 +287,12 @@ Status AggLocalState::_get_result_with_spilt_data(RuntimeState* state, vectorize
_shared_state->aggregate_data_container->init_once();
}

RETURN_IF_ERROR(_get_result_with_serialized_key_non_spill(state, block, source_state));
if (source_state == SourceState::FINISHED) {
source_state = _shared_state->spill_context.read_cursor ==
_shared_state->spill_partition_helper->partition_count
? SourceState::FINISHED
: SourceState::DEPEND_ON_SOURCE;
RETURN_IF_ERROR(_get_result_with_serialized_key_non_spill(state, block, eos));
if (*eos) {
*eos = _shared_state->spill_context.read_cursor ==
_shared_state->spill_partition_helper->partition_count;
}
CHECK(!block->empty() || source_state == SourceState::FINISHED);
CHECK(!block->empty() || *eos);
return Status::OK();
}

Expand All @@ -324,7 +319,7 @@ Status AggLocalState::_merge_spilt_data() {

Status AggLocalState::_get_result_with_serialized_key_non_spill(RuntimeState* state,
vectorized::Block* block,
SourceState& source_state) {
bool* eos) {
auto& shared_state = *_shared_state;
// non-nullable column(id in `_make_nullable_keys`) will be converted to nullable.
bool mem_reuse = shared_state.make_nullable_keys.empty() && block->mem_reuse();
Expand Down Expand Up @@ -402,10 +397,10 @@ Status AggLocalState::_get_result_with_serialized_key_non_spill(RuntimeState* st
shared_state.aggregate_evaluators[i]->insert_result_info(
mapped + shared_state.offsets_of_aggregate_states[i],
value_columns[i].get());
source_state = SourceState::FINISHED;
*eos = true;
}
} else {
source_state = SourceState::FINISHED;
*eos = true;
}
}
},
Expand All @@ -428,14 +423,14 @@ Status AggLocalState::_get_result_with_serialized_key_non_spill(RuntimeState* st
}

Status AggLocalState::_serialize_without_key(RuntimeState* state, vectorized::Block* block,
SourceState& source_state) {
bool* eos) {
auto& shared_state = *_shared_state;
// 1. `child(0)->rows_returned() == 0` mean not data from child
// in level two aggregation node should return NULL result
// level one aggregation node set `eos = true` return directly
SCOPED_TIMER(_serialize_result_timer);
if (UNLIKELY(_shared_state->input_num_rows == 0)) {
source_state = SourceState::FINISHED;
*eos = true;
return Status::OK();
}
block->clear();
Expand Down Expand Up @@ -468,12 +463,12 @@ Status AggLocalState::_serialize_without_key(RuntimeState* state, vectorized::Bl
}

block->set_columns(std::move(value_columns));
source_state = SourceState::FINISHED;
*eos = true;
return Status::OK();
}

Status AggLocalState::_get_without_key_result(RuntimeState* state, vectorized::Block* block,
SourceState& source_state) {
bool* eos) {
auto& shared_state = *_shared_state;
DCHECK(_agg_data->without_key != nullptr);
block->clear();
Expand Down Expand Up @@ -521,7 +516,7 @@ Status AggLocalState::_get_without_key_result(RuntimeState* state, vectorized::B
}

block->set_columns(std::move(columns));
source_state = SourceState::FINISHED;
*eos = true;
return Status::OK();
}

Expand All @@ -531,15 +526,14 @@ AggSourceOperatorX::AggSourceOperatorX(ObjectPool* pool, const TPlanNode& tnode,
_needs_finalize(tnode.agg_node.need_finalize),
_without_key(tnode.agg_node.grouping_exprs.empty()) {}

Status AggSourceOperatorX::get_block(RuntimeState* state, vectorized::Block* block,
SourceState& source_state) {
Status AggSourceOperatorX::get_block(RuntimeState* state, vectorized::Block* block, bool* eos) {
auto& local_state = get_local_state(state);
SCOPED_TIMER(local_state.exec_time_counter());
RETURN_IF_ERROR(local_state._executor.get_result(state, block, source_state));
RETURN_IF_ERROR(local_state._executor.get_result(state, block, eos));
local_state.make_nullable_output_key(block);
// dispose the having clause, should not be execute in prestreaming agg
RETURN_IF_ERROR(vectorized::VExprContext::filter_block(_conjuncts, block, block->columns()));
local_state.reached_limit(block, source_state);
local_state.reached_limit(block, eos);
return Status::OK();
}

Expand Down
27 changes: 11 additions & 16 deletions be/src/pipeline/exec/aggregation_source_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,25 +65,21 @@ class AggLocalState final : public PipelineXLocalState<AggSharedState> {
protected:
friend class AggSourceOperatorX;

Status _get_without_key_result(RuntimeState* state, vectorized::Block* block,
SourceState& source_state);
Status _serialize_without_key(RuntimeState* state, vectorized::Block* block,
SourceState& source_state);
Status _get_without_key_result(RuntimeState* state, vectorized::Block* block, bool* eos);
Status _serialize_without_key(RuntimeState* state, vectorized::Block* block, bool* eos);
Status _get_with_serialized_key_result(RuntimeState* state, vectorized::Block* block,
SourceState& source_state);
bool* eos);
Status _serialize_with_serialized_key_result(RuntimeState* state, vectorized::Block* block,
SourceState& source_state);
bool* eos);
Status _get_result_with_serialized_key_non_spill(RuntimeState* state, vectorized::Block* block,
SourceState& source_state);
Status _get_result_with_spilt_data(RuntimeState* state, vectorized::Block* block,
SourceState& source_state);
bool* eos);
Status _get_result_with_spilt_data(RuntimeState* state, vectorized::Block* block, bool* eos);

Status _serialize_with_serialized_key_result_non_spill(RuntimeState* state,
vectorized::Block* block,
SourceState& source_state);
vectorized::Block* block, bool* eos);
Status _serialize_with_serialized_key_result_with_spilt_data(RuntimeState* state,
vectorized::Block* block,
SourceState& source_state);
bool* eos);
Status _destroy_agg_status(vectorized::AggregateDataPtr data);
Status _reset_hash_table();
Status _merge_spilt_data();
Expand All @@ -105,8 +101,8 @@ class AggLocalState final : public PipelineXLocalState<AggSharedState> {
RuntimeProfile::Counter* _serialize_data_timer = nullptr;
RuntimeProfile::Counter* _hash_table_size_counter = nullptr;

using vectorized_get_result = std::function<Status(
RuntimeState* state, vectorized::Block* block, SourceState& source_state)>;
using vectorized_get_result =
std::function<Status(RuntimeState* state, vectorized::Block* block, bool* eos)>;

struct executor {
vectorized_get_result get_result;
Expand All @@ -124,8 +120,7 @@ class AggSourceOperatorX : public OperatorX<AggLocalState> {
const DescriptorTbl& descs);
~AggSourceOperatorX() = default;

Status get_block(RuntimeState* state, vectorized::Block* block,
SourceState& source_state) override;
Status get_block(RuntimeState* state, vectorized::Block* block, bool* eos) override;

bool is_source() const override { return true; }

Expand Down
4 changes: 2 additions & 2 deletions be/src/pipeline/exec/analytic_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -255,11 +255,11 @@ Status AnalyticSinkOperatorX::open(RuntimeState* state) {
}

Status AnalyticSinkOperatorX::sink(doris::RuntimeState* state, vectorized::Block* input_block,
SourceState source_state) {
bool eos) {
auto& local_state = get_local_state(state);
SCOPED_TIMER(local_state.exec_time_counter());
COUNTER_UPDATE(local_state.rows_input_counter(), (int64_t)input_block->rows());
local_state._shared_state->input_eos = source_state == SourceState::FINISHED;
local_state._shared_state->input_eos = eos;
if (local_state._shared_state->input_eos && input_block->rows() == 0) {
local_state._dependency->set_ready_to_read();
local_state._dependency->block();
Expand Down
3 changes: 1 addition & 2 deletions be/src/pipeline/exec/analytic_sink_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,7 @@ class AnalyticSinkOperatorX final : public DataSinkOperatorX<AnalyticSinkLocalSt
Status prepare(RuntimeState* state) override;
Status open(RuntimeState* state) override;

Status sink(RuntimeState* state, vectorized::Block* in_block,
SourceState source_state) override;
Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos) override;
DataDistribution required_data_distribution() const override {
if (_partition_by_eq_expr_ctxs.empty()) {
return {ExchangeType::PASSTHROUGH};
Expand Down
6 changes: 3 additions & 3 deletions be/src/pipeline/exec/analytic_source_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -516,13 +516,13 @@ Status AnalyticSourceOperatorX::init(const TPlanNode& tnode, RuntimeState* state
}

Status AnalyticSourceOperatorX::get_block(RuntimeState* state, vectorized::Block* block,
SourceState& source_state) {
bool* eos) {
auto& local_state = get_local_state(state);
SCOPED_TIMER(local_state.exec_time_counter());
if (local_state._shared_state->input_eos &&
(local_state._output_block_index == local_state._shared_state->input_blocks.size() ||
local_state._shared_state->input_total_rows == 0)) {
source_state = SourceState::FINISHED;
*eos = true;
return Status::OK();
}

Expand All @@ -548,7 +548,7 @@ Status AnalyticSourceOperatorX::get_block(RuntimeState* state, vectorized::Block
RETURN_IF_ERROR(local_state.output_current_block(block));
RETURN_IF_ERROR(vectorized::VExprContext::filter_block(local_state._conjuncts, block,
block->columns()));
local_state.reached_limit(block, source_state);
local_state.reached_limit(block, eos);
return Status::OK();
}

Expand Down
3 changes: 1 addition & 2 deletions be/src/pipeline/exec/analytic_source_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -135,8 +135,7 @@ class AnalyticSourceOperatorX final : public OperatorX<AnalyticLocalState> {
AnalyticSourceOperatorX(ObjectPool* pool, const TPlanNode& tnode, int operator_id,
const DescriptorTbl& descs);

Status get_block(RuntimeState* state, vectorized::Block* block,
SourceState& source_state) override;
Status get_block(RuntimeState* state, vectorized::Block* block, bool* eos) override;

bool is_source() const override { return true; }

Expand Down
2 changes: 1 addition & 1 deletion be/src/pipeline/exec/assert_num_rows_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ AssertNumRowsOperatorX::AssertNumRowsOperatorX(ObjectPool* pool, const TPlanNode
}

Status AssertNumRowsOperatorX::pull(doris::RuntimeState* state, vectorized::Block* block,
SourceState& source_state) {
bool* eos) {
auto& local_state = get_local_state(state);
SCOPED_TIMER(local_state.exec_time_counter());
local_state.add_num_rows_returned(block->rows());
Expand Down
2 changes: 1 addition & 1 deletion be/src/pipeline/exec/assert_num_rows_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ class AssertNumRowsOperatorX final : public StreamingOperatorX<AssertNumRowsLoca
AssertNumRowsOperatorX(ObjectPool* pool, const TPlanNode& tnode, int operator_id,
const DescriptorTbl& descs);

Status pull(RuntimeState* state, vectorized::Block* block, SourceState& source_state) override;
Status pull(RuntimeState* state, vectorized::Block* block, bool* eos) override;

[[nodiscard]] bool is_source() const override { return false; }

Expand Down
9 changes: 3 additions & 6 deletions be/src/pipeline/exec/datagen_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -74,19 +74,16 @@ Status DataGenSourceOperatorX::prepare(RuntimeState* state) {
return Status::OK();
}

Status DataGenSourceOperatorX::get_block(RuntimeState* state, vectorized::Block* block,
SourceState& source_state) {
Status DataGenSourceOperatorX::get_block(RuntimeState* state, vectorized::Block* block, bool* eos) {
if (state == nullptr || block == nullptr) {
return Status::InternalError("input is NULL pointer");
}
RETURN_IF_CANCELLED(state);
auto& local_state = get_local_state(state);
bool eos = false;
Status res = local_state._table_func->get_next(state, block, &eos);
source_state = eos ? SourceState::FINISHED : source_state;
Status res = local_state._table_func->get_next(state, block, eos);
RETURN_IF_ERROR(vectorized::VExprContext::filter_block(local_state._conjuncts, block,
block->columns()));
local_state.reached_limit(block, source_state);
local_state.reached_limit(block, eos);
return res;
}

Expand Down
3 changes: 1 addition & 2 deletions be/src/pipeline/exec/datagen_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,7 @@ class DataGenSourceOperatorX final : public OperatorX<DataGenLocalState> {

Status init(const TPlanNode& tnode, RuntimeState* state) override;
Status prepare(RuntimeState* state) override;
Status get_block(RuntimeState* state, vectorized::Block* block,
SourceState& source_state) override;
Status get_block(RuntimeState* state, vectorized::Block* block, bool* eos) override;

[[nodiscard]] bool is_source() const override { return true; }

Expand Down
Loading

0 comments on commit 0cd0a94

Please sign in to comment.