diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 90608a4f039801..70d56a923eab2f 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -1717,7 +1717,9 @@ bool init(const char* conf_file, bool fill_conf_map, bool must_exist, bool set_t
     if (config::is_cloud_mode()) {
         auto st = config::set_config("enable_file_cache", "true", true, true);
-        LOG(INFO) << "set config enable_file_cache " << "true" << " " << st;
+        LOG(INFO) << "set config enable_file_cache "
+                  << "true"
+                  << " " << st;
     }
 
     return true;
diff --git a/be/src/pipeline/exec/aggregation_sink_operator.cpp b/be/src/pipeline/exec/aggregation_sink_operator.cpp
index a287d7fb2786b2..260a599a947a0d 100644
--- a/be/src/pipeline/exec/aggregation_sink_operator.cpp
+++ b/be/src/pipeline/exec/aggregation_sink_operator.cpp
@@ -769,12 +769,13 @@ Status AggSinkOperatorX::init(const TPlanNode& tnode, RuntimeState* state) {
     return Status::OK();
 }
 
-Status AggSinkOperatorX::prepare(RuntimeState* state) {
+Status AggSinkOperatorX::open(RuntimeState* state) {
+    RETURN_IF_ERROR(DataSinkOperatorX::open(state));
     _intermediate_tuple_desc = state->desc_tbl().get_tuple_descriptor(_intermediate_tuple_id);
     _output_tuple_desc = state->desc_tbl().get_tuple_descriptor(_output_tuple_id);
     DCHECK_EQ(_intermediate_tuple_desc->slots().size(), _output_tuple_desc->slots().size());
     RETURN_IF_ERROR(vectorized::VExpr::prepare(
-            _probe_expr_ctxs, state, DataSinkOperatorX::_child_x->row_desc()));
+            _probe_expr_ctxs, state, DataSinkOperatorX::_child->row_desc()));
 
     int j = _probe_expr_ctxs.size();
     for (int i = 0; i < j; ++i) {
@@ -789,7 +790,7 @@ Status AggSinkOperatorX::prepare(RuntimeState* state) {
             SlotDescriptor* intermediate_slot_desc = _intermediate_tuple_desc->slots()[j];
             SlotDescriptor* output_slot_desc = _output_tuple_desc->slots()[j];
             RETURN_IF_ERROR(_aggregate_evaluators[i]->prepare(
-                    state, DataSinkOperatorX::_child_x->row_desc(),
+                    state, DataSinkOperatorX::_child->row_desc(),
                     intermediate_slot_desc, output_slot_desc));
             _aggregate_evaluators[i]->set_version(state->be_exec_version());
         }
@@ -824,10 +825,6 @@ Status AggSinkOperatorX::prepare(RuntimeState* state) {
         RETURN_IF_ERROR(vectorized::AggFnEvaluator::check_agg_fn_output(
                 _probe_expr_ctxs.size(), _aggregate_evaluators, _agg_fn_output_row_descriptor));
     }
-    return Status::OK();
-}
-
-Status AggSinkOperatorX::open(RuntimeState* state) {
     RETURN_IF_ERROR(vectorized::VExpr::open(_probe_expr_ctxs, state));
 
     for (auto& _aggregate_evaluator : _aggregate_evaluators) {
diff --git a/be/src/pipeline/exec/aggregation_sink_operator.h b/be/src/pipeline/exec/aggregation_sink_operator.h
index f7b225311a3c06..7c146c38a2b135 100644
--- a/be/src/pipeline/exec/aggregation_sink_operator.h
+++ b/be/src/pipeline/exec/aggregation_sink_operator.h
@@ -137,19 +137,18 @@ class AggSinkOperatorX final : public DataSinkOperatorX {
     Status init(const TPlanNode& tnode, RuntimeState* state) override;
 
-    Status prepare(RuntimeState* state) override;
     Status open(RuntimeState* state) override;
 
     Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos) override;
 
     DataDistribution required_data_distribution() const override {
         if (_probe_expr_ctxs.empty()) {
-            return _needs_finalize || DataSinkOperatorX::_child_x ->ignore_data_distribution() ?
DataDistribution(ExchangeType::PASSTHROUGH) : DataSinkOperatorX::required_data_distribution(); } - return _is_colocate && _require_bucket_distribution && !_followed_by_shuffled_join + return _is_colocate && _require_bucket_distribution && !_followed_by_shuffled_operator ? DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, _partition_exprs) : DataDistribution(ExchangeType::HASH_SHUFFLE, _partition_exprs); } diff --git a/be/src/pipeline/exec/analytic_sink_operator.cpp b/be/src/pipeline/exec/analytic_sink_operator.cpp index 43859c7cebd120..85d7773bdbd025 100644 --- a/be/src/pipeline/exec/analytic_sink_operator.cpp +++ b/be/src/pipeline/exec/analytic_sink_operator.cpp @@ -231,13 +231,14 @@ Status AnalyticSinkOperatorX::init(const TPlanNode& tnode, RuntimeState* state) return Status::OK(); } -Status AnalyticSinkOperatorX::prepare(RuntimeState* state) { +Status AnalyticSinkOperatorX::open(RuntimeState* state) { + RETURN_IF_ERROR(DataSinkOperatorX::open(state)); for (const auto& ctx : _agg_expr_ctxs) { - RETURN_IF_ERROR(vectorized::VExpr::prepare(ctx, state, _child_x->row_desc())); + RETURN_IF_ERROR(vectorized::VExpr::prepare(ctx, state, _child->row_desc())); } if (!_partition_by_eq_expr_ctxs.empty() || !_order_by_eq_expr_ctxs.empty()) { vector tuple_ids; - tuple_ids.push_back(_child_x->row_desc().tuple_descriptors()[0]->id()); + tuple_ids.push_back(_child->row_desc().tuple_descriptors()[0]->id()); tuple_ids.push_back(_buffered_tuple_id); RowDescriptor cmp_row_desc(state->desc_tbl(), tuple_ids, vector(2, false)); if (!_partition_by_eq_expr_ctxs.empty()) { @@ -249,10 +250,6 @@ Status AnalyticSinkOperatorX::prepare(RuntimeState* state) { vectorized::VExpr::prepare(_order_by_eq_expr_ctxs, state, cmp_row_desc)); } } - return Status::OK(); -} - -Status AnalyticSinkOperatorX::open(RuntimeState* state) { RETURN_IF_ERROR(vectorized::VExpr::open(_partition_by_eq_expr_ctxs, state)); RETURN_IF_ERROR(vectorized::VExpr::open(_order_by_eq_expr_ctxs, state)); for (size_t i = 0; i < _agg_functions_size; ++i) { diff --git a/be/src/pipeline/exec/analytic_sink_operator.h b/be/src/pipeline/exec/analytic_sink_operator.h index 6d713996b9cd72..8f7a9d1c35ae38 100644 --- a/be/src/pipeline/exec/analytic_sink_operator.h +++ b/be/src/pipeline/exec/analytic_sink_operator.h @@ -74,7 +74,6 @@ class AnalyticSinkOperatorX final : public DataSinkOperatorX::close(state); } -Status AnalyticSourceOperatorX::prepare(RuntimeState* state) { - RETURN_IF_ERROR(OperatorX::prepare(state)); - DCHECK(_child_x->row_desc().is_prefix_of(_row_descriptor)); +Status AnalyticSourceOperatorX::open(RuntimeState* state) { + RETURN_IF_ERROR(OperatorX::open(state)); + DCHECK(_child->row_desc().is_prefix_of(_row_descriptor)); _intermediate_tuple_desc = state->desc_tbl().get_tuple_descriptor(_intermediate_tuple_id); _output_tuple_desc = state->desc_tbl().get_tuple_descriptor(_output_tuple_id); for (size_t i = 0; i < _agg_functions.size(); ++i) { SlotDescriptor* intermediate_slot_desc = _intermediate_tuple_desc->slots()[i]; SlotDescriptor* output_slot_desc = _output_tuple_desc->slots()[i]; - RETURN_IF_ERROR(_agg_functions[i]->prepare(state, _child_x->row_desc(), + RETURN_IF_ERROR(_agg_functions[i]->prepare(state, _child->row_desc(), intermediate_slot_desc, output_slot_desc)); _agg_functions[i]->set_version(state->be_exec_version()); _change_to_nullable_flags.push_back(output_slot_desc->is_nullable() && @@ -597,11 +597,6 @@ Status AnalyticSourceOperatorX::prepare(RuntimeState* state) { alignment_of_next_state * alignment_of_next_state; } } - return 
Status::OK(); -} - -Status AnalyticSourceOperatorX::open(RuntimeState* state) { - RETURN_IF_ERROR(OperatorX::open(state)); for (auto* agg_function : _agg_functions) { RETURN_IF_ERROR(agg_function->open(state)); } diff --git a/be/src/pipeline/exec/analytic_source_operator.h b/be/src/pipeline/exec/analytic_source_operator.h index b939e0e9efaddb..38323f1b86bce2 100644 --- a/be/src/pipeline/exec/analytic_source_operator.h +++ b/be/src/pipeline/exec/analytic_source_operator.h @@ -122,7 +122,6 @@ class AnalyticSourceOperatorX final : public OperatorX { bool is_source() const override { return true; } Status init(const TPlanNode& tnode, RuntimeState* state) override; - Status prepare(RuntimeState* state) override; Status open(RuntimeState* state) override; private: diff --git a/be/src/pipeline/exec/datagen_operator.cpp b/be/src/pipeline/exec/datagen_operator.cpp index dae39f179a68f2..93b3d058154e62 100644 --- a/be/src/pipeline/exec/datagen_operator.cpp +++ b/be/src/pipeline/exec/datagen_operator.cpp @@ -50,8 +50,8 @@ Status DataGenSourceOperatorX::init(const TPlanNode& tnode, RuntimeState* state) return Status::OK(); } -Status DataGenSourceOperatorX::prepare(RuntimeState* state) { - RETURN_IF_ERROR(OperatorX::prepare(state)); +Status DataGenSourceOperatorX::open(RuntimeState* state) { + RETURN_IF_ERROR(OperatorX::open(state)); // get tuple desc _tuple_desc = state->desc_tbl().get_tuple_descriptor(_tuple_id); diff --git a/be/src/pipeline/exec/datagen_operator.h b/be/src/pipeline/exec/datagen_operator.h index 8aeeea2a699824..c63ef97bb7a40f 100644 --- a/be/src/pipeline/exec/datagen_operator.h +++ b/be/src/pipeline/exec/datagen_operator.h @@ -52,7 +52,7 @@ class DataGenSourceOperatorX final : public OperatorX { const DescriptorTbl& descs); Status init(const TPlanNode& tnode, RuntimeState* state) override; - Status prepare(RuntimeState* state) override; + Status open(RuntimeState* state) override; Status get_block(RuntimeState* state, vectorized::Block* block, bool* eos) override; [[nodiscard]] bool is_source() const override { return true; } diff --git a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp index ff8b20ebe6098a..70b73225f060e8 100644 --- a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp +++ b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp @@ -369,12 +369,12 @@ Status DistinctStreamingAggOperatorX::init(const TPlanNode& tnode, RuntimeState* return Status::OK(); } -Status DistinctStreamingAggOperatorX::prepare(RuntimeState* state) { - RETURN_IF_ERROR(StatefulOperatorX::prepare(state)); +Status DistinctStreamingAggOperatorX::open(RuntimeState* state) { + RETURN_IF_ERROR(StatefulOperatorX::open(state)); _intermediate_tuple_desc = state->desc_tbl().get_tuple_descriptor(_intermediate_tuple_id); _output_tuple_desc = state->desc_tbl().get_tuple_descriptor(_output_tuple_id); DCHECK_EQ(_intermediate_tuple_desc->slots().size(), _output_tuple_desc->slots().size()); - RETURN_IF_ERROR(vectorized::VExpr::prepare(_probe_expr_ctxs, state, _child_x->row_desc())); + RETURN_IF_ERROR(vectorized::VExpr::prepare(_probe_expr_ctxs, state, _child->row_desc())); int j = _probe_expr_ctxs.size(); for (int i = 0; i < j; ++i) { @@ -389,7 +389,7 @@ Status DistinctStreamingAggOperatorX::prepare(RuntimeState* state) { SlotDescriptor* intermediate_slot_desc = _intermediate_tuple_desc->slots()[j]; SlotDescriptor* output_slot_desc = _output_tuple_desc->slots()[j]; 
RETURN_IF_ERROR(_aggregate_evaluators[i]->prepare( - state, _child_x->row_desc(), intermediate_slot_desc, output_slot_desc)); + state, _child->row_desc(), intermediate_slot_desc, output_slot_desc)); _aggregate_evaluators[i]->set_version(state->be_exec_version()); } @@ -412,12 +412,6 @@ Status DistinctStreamingAggOperatorX::prepare(RuntimeState* state) { alignment_of_next_state * alignment_of_next_state; } } - - return Status::OK(); -} - -Status DistinctStreamingAggOperatorX::open(RuntimeState* state) { - RETURN_IF_ERROR(StatefulOperatorX::open(state)); RETURN_IF_ERROR(vectorized::VExpr::open(_probe_expr_ctxs, state)); for (int i = 0; i < _aggregate_evaluators.size(); ++i) { diff --git a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.h b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.h index 8ec1d18fd9e60f..edeb432176379d 100644 --- a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.h +++ b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.h @@ -98,7 +98,6 @@ class DistinctStreamingAggOperatorX final DistinctStreamingAggOperatorX(ObjectPool* pool, int operator_id, const TPlanNode& tnode, const DescriptorTbl& descs, bool require_bucket_distribution); Status init(const TPlanNode& tnode, RuntimeState* state) override; - Status prepare(RuntimeState* state) override; Status open(RuntimeState* state) override; Status pull(RuntimeState* state, vectorized::Block* block, bool* eos) const override; Status push(RuntimeState* state, vectorized::Block* input_block, bool eos) const override; @@ -106,7 +105,7 @@ class DistinctStreamingAggOperatorX final DataDistribution required_data_distribution() const override { if (_needs_finalize || (!_probe_expr_ctxs.empty() && !_is_streaming_preagg)) { - return _is_colocate && _require_bucket_distribution && !_followed_by_shuffled_join + return _is_colocate && _require_bucket_distribution && !_followed_by_shuffled_operator ? 
DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, _partition_exprs) : DataDistribution(ExchangeType::HASH_SHUFFLE, _partition_exprs); } diff --git a/be/src/pipeline/exec/es_scan_operator.cpp b/be/src/pipeline/exec/es_scan_operator.cpp index 5c5f1b3eb8021e..c7e953a7fa3201 100644 --- a/be/src/pipeline/exec/es_scan_operator.cpp +++ b/be/src/pipeline/exec/es_scan_operator.cpp @@ -138,8 +138,8 @@ Status EsScanOperatorX::init(const TPlanNode& tnode, RuntimeState* state) { return Status::OK(); } -Status EsScanOperatorX::prepare(RuntimeState* state) { - RETURN_IF_ERROR(ScanOperatorX::prepare(state)); +Status EsScanOperatorX::open(RuntimeState* state) { + RETURN_IF_ERROR(ScanOperatorX::open(state)); _tuple_desc = state->desc_tbl().get_tuple_descriptor(_tuple_id); if (_tuple_desc == nullptr) { diff --git a/be/src/pipeline/exec/es_scan_operator.h b/be/src/pipeline/exec/es_scan_operator.h index 5f66c0a100fa8f..4e80150d0ba8c6 100644 --- a/be/src/pipeline/exec/es_scan_operator.h +++ b/be/src/pipeline/exec/es_scan_operator.h @@ -72,7 +72,7 @@ class EsScanOperatorX final : public ScanOperatorX { const DescriptorTbl& descs, int parallel_tasks); Status init(const TPlanNode& tnode, RuntimeState* state) override; - Status prepare(RuntimeState* state) override; + Status open(RuntimeState* state) override; private: friend class EsScanLocalState; diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp b/be/src/pipeline/exec/exchange_sink_operator.cpp index c5a4629b26c99f..db5c4c78a3129a 100644 --- a/be/src/pipeline/exec/exchange_sink_operator.cpp +++ b/be/src/pipeline/exec/exchange_sink_operator.cpp @@ -114,19 +114,20 @@ Status ExchangeSinkLocalState::open(RuntimeState* state) { auto& p = _parent->cast(); SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get()); + if (_part_type == TPartitionType::UNPARTITIONED || _part_type == TPartitionType::RANDOM || + _part_type == TPartitionType::TABLE_SINK_RANDOM_PARTITIONED) { + std::random_device rd; + std::mt19937 g(rd()); + shuffle(channels.begin(), channels.end(), g); + } int local_size = 0; for (int i = 0; i < channels.size(); ++i) { RETURN_IF_ERROR(channels[i]->open(state)); if (channels[i]->is_local()) { local_size++; + _last_local_channel_idx = i; } } - if (_part_type == TPartitionType::UNPARTITIONED || _part_type == TPartitionType::RANDOM || - _part_type == TPartitionType::TABLE_SINK_RANDOM_PARTITIONED) { - std::random_device rd; - std::mt19937 g(rd()); - shuffle(channels.begin(), channels.end(), g); - } only_local_exchange = local_size == channels.size(); PUniqueId id; @@ -335,19 +336,15 @@ Status ExchangeSinkOperatorX::init(const TDataSink& tsink) { return Status::OK(); } -Status ExchangeSinkOperatorX::prepare(RuntimeState* state) { +Status ExchangeSinkOperatorX::open(RuntimeState* state) { + RETURN_IF_ERROR(DataSinkOperatorX::open(state)); _state = state; _mem_tracker = std::make_unique("ExchangeSinkOperatorX:"); - return Status::OK(); -} - -Status ExchangeSinkOperatorX::open(RuntimeState* state) { - DCHECK(state != nullptr); _compression_type = state->fragement_transmission_compression_type(); if (_part_type == TPartitionType::TABLET_SINK_SHUFFLE_PARTITIONED) { if (_output_tuple_id == -1) { RETURN_IF_ERROR( - vectorized::VExpr::prepare(_tablet_sink_expr_ctxs, state, _child_x->row_desc())); + vectorized::VExpr::prepare(_tablet_sink_expr_ctxs, state, _child->row_desc())); } else { auto* output_tuple_desc = state->desc_tbl().get_tuple_descriptor(_output_tuple_id); auto* output_row_desc = _pool->add(new RowDescriptor(output_tuple_desc, false)); @@ -391,11 +388,17 @@ 
Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block if (local_state.only_local_exchange) { if (!block->empty()) { Status status; + size_t idx = 0; for (auto* channel : local_state.channels) { if (!channel->is_receiver_eof()) { - status = channel->send_local_block(block); + // If this channel is the last, we can move this block to downstream pipeline. + // Otherwise, this block also need to be broadcasted to other channels so should be copied. + DCHECK_GE(local_state._last_local_channel_idx, 0); + status = channel->send_local_block( + block, idx == local_state._last_local_channel_idx); HANDLE_CHANNEL_STATUS(state, channel, status); } + idx++; } } } else { @@ -418,21 +421,33 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block local_state._broadcast_pb_mem_limiter->acquire(*block_holder); + size_t idx = 0; + bool moved = false; for (auto* channel : local_state.channels) { if (!channel->is_receiver_eof()) { Status status; if (channel->is_local()) { - status = channel->send_local_block(&cur_block); + // If this channel is the last, we can move this block to downstream pipeline. + // Otherwise, this block also need to be broadcasted to other channels so should be copied. + DCHECK_GE(local_state._last_local_channel_idx, 0); + status = channel->send_local_block( + &cur_block, idx == local_state._last_local_channel_idx); + moved = idx == local_state._last_local_channel_idx; } else { SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get()); status = channel->send_broadcast_block(block_holder, eos); } HANDLE_CHANNEL_STATUS(state, channel, status); } + idx++; + } + if (moved) { + local_state._serializer.reset_block(); + } else { + cur_block.clear_column_data(); + local_state._serializer.get_block()->set_mutable_columns( + cur_block.mutate_columns()); } - cur_block.clear_column_data(); - local_state._serializer.get_block()->set_mutable_columns( - cur_block.mutate_columns()); } } } @@ -443,7 +458,7 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block if (!current_channel->is_receiver_eof()) { // 2. serialize, send and rollover block if (current_channel->is_local()) { - auto status = current_channel->send_local_block(block); + auto status = current_channel->send_local_block(block, true); HANDLE_CHANNEL_STATUS(state, current_channel, status); } else { SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get()); @@ -529,7 +544,7 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block if (!current_channel->is_receiver_eof()) { // 2. 
serialize, send and rollover block if (current_channel->is_local()) { - auto status = current_channel->send_local_block(block); + auto status = current_channel->send_local_block(block, true); HANDLE_CHANNEL_STATUS(state, current_channel, status); } else { SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get()); @@ -686,10 +701,10 @@ Status ExchangeSinkLocalState::close(RuntimeState* state, Status exec_status) { } DataDistribution ExchangeSinkOperatorX::required_data_distribution() const { - if (_child_x && _enable_local_merge_sort) { + if (_child && _enable_local_merge_sort) { // SORT_OPERATOR -> DATA_STREAM_SINK_OPERATOR // SORT_OPERATOR -> LOCAL_MERGE_SORT -> DATA_STREAM_SINK_OPERATOR - if (auto sort_source = std::dynamic_pointer_cast(_child_x); + if (auto sort_source = std::dynamic_pointer_cast(_child); sort_source && sort_source->use_local_merge()) { // Sort the data local return ExchangeType::LOCAL_MERGE_SORT; diff --git a/be/src/pipeline/exec/exchange_sink_operator.h b/be/src/pipeline/exec/exchange_sink_operator.h index 66e425baad9792..c60cefabfa8380 100644 --- a/be/src/pipeline/exec/exchange_sink_operator.h +++ b/be/src/pipeline/exec/exchange_sink_operator.h @@ -202,6 +202,7 @@ class ExchangeSinkLocalState final : public PipelineXSinkLocalState<> { // for external table sink hash partition std::unique_ptr _partition_function = nullptr; std::atomic _reach_limit = false; + int _last_local_channel_idx = -1; }; class ExchangeSinkOperatorX final : public DataSinkOperatorX { @@ -213,7 +214,6 @@ class ExchangeSinkOperatorX final : public DataSinkOperatorX::prepare(state)); +Status ExchangeSourceOperatorX::open(RuntimeState* state) { + RETURN_IF_ERROR(OperatorX::open(state)); DCHECK_GT(_num_senders, 0); if (_is_merging) { RETURN_IF_ERROR(_vsort_exec_exprs.prepare(state, _row_descriptor, _row_descriptor)); } - - return Status::OK(); -} - -Status ExchangeSourceOperatorX::open(RuntimeState* state) { - RETURN_IF_ERROR(OperatorX::open(state)); if (_is_merging) { RETURN_IF_ERROR(_vsort_exec_exprs.open(state)); } diff --git a/be/src/pipeline/exec/exchange_source_operator.h b/be/src/pipeline/exec/exchange_source_operator.h index 4dc7d1e55a2adc..0fe3dcbb590b7d 100644 --- a/be/src/pipeline/exec/exchange_source_operator.h +++ b/be/src/pipeline/exec/exchange_source_operator.h @@ -66,7 +66,6 @@ class ExchangeSourceOperatorX final : public OperatorX { ExchangeSourceOperatorX(ObjectPool* pool, const TPlanNode& tnode, int operator_id, const DescriptorTbl& descs, int num_senders); Status init(const TPlanNode& tnode, RuntimeState* state) override; - Status prepare(RuntimeState* state) override; Status open(RuntimeState* state) override; Status get_block(RuntimeState* state, vectorized::Block* block, bool* eos) override; diff --git a/be/src/pipeline/exec/file_scan_operator.cpp b/be/src/pipeline/exec/file_scan_operator.cpp index 8c1e4d19407ff6..6fa7401e278451 100644 --- a/be/src/pipeline/exec/file_scan_operator.cpp +++ b/be/src/pipeline/exec/file_scan_operator.cpp @@ -108,8 +108,8 @@ Status FileScanLocalState::_process_conjuncts(RuntimeState* state) { return Status::OK(); } -Status FileScanOperatorX::prepare(RuntimeState* state) { - RETURN_IF_ERROR(ScanOperatorX::prepare(state)); +Status FileScanOperatorX::open(RuntimeState* state) { + RETURN_IF_ERROR(ScanOperatorX::open(state)); if (state->get_query_ctx() != nullptr && state->get_query_ctx()->file_scan_range_params_map.contains(node_id())) { TFileScanRangeParams& params = diff --git a/be/src/pipeline/exec/file_scan_operator.h 
b/be/src/pipeline/exec/file_scan_operator.h index a9a25cf6c31c23..2777a013d62f61 100644 --- a/be/src/pipeline/exec/file_scan_operator.h +++ b/be/src/pipeline/exec/file_scan_operator.h @@ -76,7 +76,7 @@ class FileScanOperatorX final : public ScanOperatorX { _output_tuple_id = tnode.file_scan_node.tuple_id; } - Status prepare(RuntimeState* state) override; + Status open(RuntimeState* state) override; bool is_file_scan_operator() const override { return true; } diff --git a/be/src/pipeline/exec/group_commit_block_sink_operator.cpp b/be/src/pipeline/exec/group_commit_block_sink_operator.cpp index 9bac6d4ed29780..6db49bb7ab1089 100644 --- a/be/src/pipeline/exec/group_commit_block_sink_operator.cpp +++ b/be/src/pipeline/exec/group_commit_block_sink_operator.cpp @@ -259,19 +259,15 @@ Status GroupCommitBlockSinkOperatorX::init(const TDataSink& t_sink) { return Status::OK(); } -Status GroupCommitBlockSinkOperatorX::prepare(RuntimeState* state) { - RETURN_IF_ERROR(Base::prepare(state)); +Status GroupCommitBlockSinkOperatorX::open(RuntimeState* state) { + RETURN_IF_ERROR(Base::open(state)); // get table's tuple descriptor _output_tuple_desc = state->desc_tbl().get_tuple_descriptor(_tuple_desc_id); if (_output_tuple_desc == nullptr) { LOG(WARNING) << "unknown destination tuple descriptor, id=" << _tuple_desc_id; return Status::InternalError("unknown destination tuple descriptor"); } - return vectorized::VExpr::prepare(_output_vexpr_ctxs, state, _row_desc); -} - -Status GroupCommitBlockSinkOperatorX::open(RuntimeState* state) { - // Prepare the exprs to run. + RETURN_IF_ERROR(vectorized::VExpr::prepare(_output_vexpr_ctxs, state, _row_desc)); return vectorized::VExpr::open(_output_vexpr_ctxs, state); } diff --git a/be/src/pipeline/exec/group_commit_block_sink_operator.h b/be/src/pipeline/exec/group_commit_block_sink_operator.h index caf7017d050a6c..32ca0613652ae4 100644 --- a/be/src/pipeline/exec/group_commit_block_sink_operator.h +++ b/be/src/pipeline/exec/group_commit_block_sink_operator.h @@ -94,8 +94,6 @@ class GroupCommitBlockSinkOperatorX final Status init(const TDataSink& sink) override; - Status prepare(RuntimeState* state) override; - Status open(RuntimeState* state) override; Status sink(RuntimeState* state, vectorized::Block* block, bool eos) override; diff --git a/be/src/pipeline/exec/hashjoin_build_sink.cpp b/be/src/pipeline/exec/hashjoin_build_sink.cpp index 6e1f7c688530b7..8f7b176a979a4d 100644 --- a/be/src/pipeline/exec/hashjoin_build_sink.cpp +++ b/be/src/pipeline/exec/hashjoin_build_sink.cpp @@ -429,18 +429,6 @@ HashJoinBuildSinkOperatorX::HashJoinBuildSinkOperatorX(ObjectPool* pool, int ope : std::vector {}), _need_local_merge(need_local_merge) {} -Status HashJoinBuildSinkOperatorX::prepare(RuntimeState* state) { - if (_is_broadcast_join) { - if (state->enable_share_hash_table_for_broadcast_join()) { - _shared_hashtable_controller = - state->get_query_ctx()->get_shared_hash_table_controller(); - _shared_hash_table_context = _shared_hashtable_controller->get_context(node_id()); - } - } - RETURN_IF_ERROR(vectorized::VExpr::prepare(_build_expr_ctxs, state, _child_x->row_desc())); - return Status::OK(); -} - Status HashJoinBuildSinkOperatorX::init(const TPlanNode& tnode, RuntimeState* state) { RETURN_IF_ERROR(JoinBuildSinkOperatorX::init(tnode, state)); DCHECK(tnode.__isset.hash_join_node); @@ -498,6 +486,15 @@ Status HashJoinBuildSinkOperatorX::init(const TPlanNode& tnode, RuntimeState* st } Status HashJoinBuildSinkOperatorX::open(RuntimeState* state) { + 
RETURN_IF_ERROR(JoinBuildSinkOperatorX::open(state)); + if (_is_broadcast_join) { + if (state->enable_share_hash_table_for_broadcast_join()) { + _shared_hashtable_controller = + state->get_query_ctx()->get_shared_hash_table_controller(); + _shared_hash_table_context = _shared_hashtable_controller->get_context(node_id()); + } + } + RETURN_IF_ERROR(vectorized::VExpr::prepare(_build_expr_ctxs, state, _child->row_desc())); return vectorized::VExpr::open(_build_expr_ctxs, state); } @@ -514,7 +511,7 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState* state, vectorized::Block* if (local_state._build_side_mutable_block.empty()) { auto tmp_build_block = vectorized::VectorizedUtils::create_empty_columnswithtypename( - _child_x->row_desc()); + _child->row_desc()); tmp_build_block = *(tmp_build_block.create_same_struct_block(1, false)); local_state._build_col_ids.resize(_build_expr_ctxs.size()); RETURN_IF_ERROR(local_state._do_evaluate(tmp_build_block, local_state._build_expr_ctxs, diff --git a/be/src/pipeline/exec/hashjoin_build_sink.h b/be/src/pipeline/exec/hashjoin_build_sink.h index 4dba04559b4533..1ae9d5ae1a71f8 100644 --- a/be/src/pipeline/exec/hashjoin_build_sink.h +++ b/be/src/pipeline/exec/hashjoin_build_sink.h @@ -118,7 +118,6 @@ class HashJoinBuildSinkOperatorX final Status init(const TPlanNode& tnode, RuntimeState* state) override; - Status prepare(RuntimeState* state) override; Status open(RuntimeState* state) override; Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos) override; @@ -133,9 +132,8 @@ class HashJoinBuildSinkOperatorX final if (_join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) { return {ExchangeType::NOOP}; } else if (_is_broadcast_join) { - return _child_x->ignore_data_distribution() - ? DataDistribution(ExchangeType::PASS_TO_ONE) - : DataDistribution(ExchangeType::NOOP); + return _child->ignore_data_distribution() ? DataDistribution(ExchangeType::PASS_TO_ONE) + : DataDistribution(ExchangeType::NOOP); } return _join_distribution == TJoinDistributionType::BUCKET_SHUFFLE || _join_distribution == TJoinDistributionType::COLOCATE @@ -146,7 +144,7 @@ class HashJoinBuildSinkOperatorX final bool require_shuffled_data_distribution() const override { return _join_op != TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN && !_is_broadcast_join; } - bool is_shuffled_hash_join() const override { + bool is_shuffled_operator() const override { return _join_distribution == TJoinDistributionType::PARTITIONED; } bool require_data_distribution() const override { diff --git a/be/src/pipeline/exec/hashjoin_probe_operator.cpp b/be/src/pipeline/exec/hashjoin_probe_operator.cpp index d953e80b70150f..f91e1eaa2a1b17 100644 --- a/be/src/pipeline/exec/hashjoin_probe_operator.cpp +++ b/be/src/pipeline/exec/hashjoin_probe_operator.cpp @@ -276,7 +276,7 @@ Status HashJoinProbeOperatorX::pull(doris::RuntimeState* state, vectorized::Bloc /// increase the output rows count(just same as `_probe_block`'s rows count). 
RETURN_IF_ERROR(local_state.filter_data_and_build_output(state, output_block, eos, &local_state._probe_block, false)); - local_state._probe_block.clear_column_data(_child_x->row_desc().num_materialized_slots()); + local_state._probe_block.clear_column_data(_child->row_desc().num_materialized_slots()); return Status::OK(); } @@ -580,8 +580,8 @@ Status HashJoinProbeOperatorX::init(const TPlanNode& tnode, RuntimeState* state) return Status::OK(); } -Status HashJoinProbeOperatorX::prepare(RuntimeState* state) { - RETURN_IF_ERROR(JoinProbeOperatorX::prepare(state)); +Status HashJoinProbeOperatorX::open(RuntimeState* state) { + RETURN_IF_ERROR(JoinProbeOperatorX::open(state)); // init left/right output slots flags, only column of slot_id in _hash_output_slot_ids need // insert to output block of hash join. // _left_output_slots_flags : column of left table need to output set flag = true @@ -597,10 +597,9 @@ Status HashJoinProbeOperatorX::prepare(RuntimeState* state) { } } }; - init_output_slots_flags(_child_x->row_desc().tuple_descriptors(), _left_output_slot_flags); + init_output_slots_flags(_child->row_desc().tuple_descriptors(), _left_output_slot_flags); init_output_slots_flags(_build_side_child->row_desc().tuple_descriptors(), _right_output_slot_flags); - RETURN_IF_ERROR(vectorized::VExpr::prepare(_output_expr_ctxs, state, *_intermediate_row_desc)); // _other_join_conjuncts are evaluated in the context of the rows produced by this node for (auto& conjunct : _other_join_conjuncts) { RETURN_IF_ERROR(conjunct->prepare(state, *_intermediate_row_desc)); @@ -610,12 +609,12 @@ Status HashJoinProbeOperatorX::prepare(RuntimeState* state) { RETURN_IF_ERROR(conjunct->prepare(state, *_intermediate_row_desc)); } - RETURN_IF_ERROR(vectorized::VExpr::prepare(_probe_expr_ctxs, state, _child_x->row_desc())); + RETURN_IF_ERROR(vectorized::VExpr::prepare(_probe_expr_ctxs, state, _child->row_desc())); DCHECK(_build_side_child != nullptr); // right table data types _right_table_data_types = vectorized::VectorizedUtils::get_data_types(_build_side_child->row_desc()); - _left_table_data_types = vectorized::VectorizedUtils::get_data_types(_child_x->row_desc()); + _left_table_data_types = vectorized::VectorizedUtils::get_data_types(_child->row_desc()); _right_table_column_names = vectorized::VectorizedUtils::get_column_names(_build_side_child->row_desc()); @@ -667,11 +666,6 @@ Status HashJoinProbeOperatorX::prepare(RuntimeState* state) { } _build_side_child.reset(); - return Status::OK(); -} - -Status HashJoinProbeOperatorX::open(RuntimeState* state) { - RETURN_IF_ERROR(JoinProbeOperatorX::open(state)); RETURN_IF_ERROR(vectorized::VExpr::open(_probe_expr_ctxs, state)); for (auto& conjunct : _other_join_conjuncts) { RETURN_IF_ERROR(conjunct->open(state)); diff --git a/be/src/pipeline/exec/hashjoin_probe_operator.h b/be/src/pipeline/exec/hashjoin_probe_operator.h index 69ab0808be4b37..dde9c00dfe4944 100644 --- a/be/src/pipeline/exec/hashjoin_probe_operator.h +++ b/be/src/pipeline/exec/hashjoin_probe_operator.h @@ -132,7 +132,6 @@ class HashJoinProbeOperatorX final : public JoinProbeOperatorX::prepare(state)); - // Prepare the exprs to run. - RETURN_IF_ERROR(vectorized::VExpr::prepare(_output_vexpr_ctxs, state, _row_desc)); - return Status::OK(); -} - Status JdbcTableSinkOperatorX::open(RuntimeState* state) { RETURN_IF_ERROR(DataSinkOperatorX::open(state)); - // Prepare the exprs to run. 
+ RETURN_IF_ERROR(vectorized::VExpr::prepare(_output_vexpr_ctxs, state, _row_desc)); RETURN_IF_ERROR(vectorized::VExpr::open(_output_vexpr_ctxs, state)); return Status::OK(); } diff --git a/be/src/pipeline/exec/jdbc_table_sink_operator.h b/be/src/pipeline/exec/jdbc_table_sink_operator.h index 0565dfe4d9821e..3ea702fd0baf0a 100644 --- a/be/src/pipeline/exec/jdbc_table_sink_operator.h +++ b/be/src/pipeline/exec/jdbc_table_sink_operator.h @@ -44,7 +44,6 @@ class JdbcTableSinkOperatorX final : public DataSinkOperatorX& select_exprs); Status init(const TDataSink& thrift_sink) override; - Status prepare(RuntimeState* state) override; Status open(RuntimeState* state) override; Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos) override; diff --git a/be/src/pipeline/exec/join_probe_operator.cpp b/be/src/pipeline/exec/join_probe_operator.cpp index 31331568aacd6f..05c62544d2b7ce 100644 --- a/be/src/pipeline/exec/join_probe_operator.cpp +++ b/be/src/pipeline/exec/join_probe_operator.cpp @@ -262,6 +262,7 @@ Status JoinProbeOperatorX::init(const TPlanNode& tnode, RuntimeS template Status JoinProbeOperatorX::open(doris::RuntimeState* state) { RETURN_IF_ERROR(Base::open(state)); + RETURN_IF_ERROR(vectorized::VExpr::prepare(_output_expr_ctxs, state, *_intermediate_row_desc)); return vectorized::VExpr::open(_output_expr_ctxs, state); } diff --git a/be/src/pipeline/exec/join_probe_operator.h b/be/src/pipeline/exec/join_probe_operator.h index 60600f1ba86d63..3f68c73d04b161 100644 --- a/be/src/pipeline/exec/join_probe_operator.h +++ b/be/src/pipeline/exec/join_probe_operator.h @@ -80,17 +80,17 @@ class JoinProbeOperatorX : public StatefulOperatorX { [[nodiscard]] bool is_source() const override { return false; } - void set_build_side_child(OperatorXPtr& build_side_child) { + void set_build_side_child(OperatorPtr& build_side_child) { _build_side_child = build_side_child; } - Status set_child(OperatorXPtr child) override { - if (OperatorX::_child_x && _build_side_child == nullptr) { + Status set_child(OperatorPtr child) override { + if (OperatorX::_child && _build_side_child == nullptr) { // when there already (probe) child, others is build child. set_build_side_child(child); } else { // first child which is probe side is in this pipeline - OperatorX::_child_x = std::move(child); + OperatorX::_child = std::move(child); } return Status::OK(); } @@ -114,7 +114,7 @@ class JoinProbeOperatorX : public StatefulOperatorX { std::unique_ptr _intermediate_row_desc; // output expr vectorized::VExprContextSPtrs _output_expr_ctxs; - OperatorXPtr _build_side_child = nullptr; + OperatorPtr _build_side_child = nullptr; const bool _short_circuit_for_null_in_build_side; // In the Old planner, there is a plan for two columns of tuple is null, // but in the Nereids planner, this logic does not exist. diff --git a/be/src/pipeline/exec/memory_scratch_sink_operator.cpp b/be/src/pipeline/exec/memory_scratch_sink_operator.cpp index e59ccf0333eed8..69e30791c139af 100644 --- a/be/src/pipeline/exec/memory_scratch_sink_operator.cpp +++ b/be/src/pipeline/exec/memory_scratch_sink_operator.cpp @@ -72,17 +72,10 @@ Status MemoryScratchSinkOperatorX::init(const TDataSink& thrift_sink) { return Status::OK(); } -Status MemoryScratchSinkOperatorX::prepare(RuntimeState* state) { - RETURN_IF_ERROR(DataSinkOperatorX::prepare(state)); - // Prepare the exprs to run. 
- RETURN_IF_ERROR(vectorized::VExpr::prepare(_output_vexpr_ctxs, state, _row_desc)); - _timezone_obj = state->timezone_obj(); - return Status::OK(); -} - Status MemoryScratchSinkOperatorX::open(RuntimeState* state) { RETURN_IF_ERROR(DataSinkOperatorX::open(state)); - // Prepare the exprs to run. + RETURN_IF_ERROR(vectorized::VExpr::prepare(_output_vexpr_ctxs, state, _row_desc)); + _timezone_obj = state->timezone_obj(); RETURN_IF_ERROR(vectorized::VExpr::open(_output_vexpr_ctxs, state)); return Status::OK(); } diff --git a/be/src/pipeline/exec/memory_scratch_sink_operator.h b/be/src/pipeline/exec/memory_scratch_sink_operator.h index 114e1e40fb17d6..c2cd78c7cd5aee 100644 --- a/be/src/pipeline/exec/memory_scratch_sink_operator.h +++ b/be/src/pipeline/exec/memory_scratch_sink_operator.h @@ -52,7 +52,6 @@ class MemoryScratchSinkOperatorX final : public DataSinkOperatorX& t_output_expr); Status init(const TDataSink& thrift_sink) override; - Status prepare(RuntimeState* state) override; Status open(RuntimeState* state) override; Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos) override; diff --git a/be/src/pipeline/exec/multi_cast_data_stream_source.h b/be/src/pipeline/exec/multi_cast_data_stream_source.h index c71310e3ee3327..76472f3ce85e83 100644 --- a/be/src/pipeline/exec/multi_cast_data_stream_source.h +++ b/be/src/pipeline/exec/multi_cast_data_stream_source.h @@ -84,8 +84,8 @@ class MultiCastDataStreamerSourceOperatorX final }; ~MultiCastDataStreamerSourceOperatorX() override = default; - Status prepare(RuntimeState* state) override { - RETURN_IF_ERROR(Base::prepare(state)); + Status open(RuntimeState* state) override { + RETURN_IF_ERROR(Base::open(state)); // init profile for runtime filter // RuntimeFilterConsumer::_init_profile(local_state._shared_state->_multi_cast_data_streamer->profile()); if (_t_data_stream_sink.__isset.output_exprs) { @@ -99,11 +99,6 @@ class MultiCastDataStreamerSourceOperatorX final conjuncts())); RETURN_IF_ERROR(vectorized::VExpr::prepare(conjuncts(), state, _row_desc())); } - return Status::OK(); - } - - Status open(RuntimeState* state) override { - RETURN_IF_ERROR(Base::open(state)); if (_t_data_stream_sink.__isset.output_exprs) { RETURN_IF_ERROR(vectorized::VExpr::open(_output_expr_contexts, state)); } diff --git a/be/src/pipeline/exec/nested_loop_join_build_operator.cpp b/be/src/pipeline/exec/nested_loop_join_build_operator.cpp index 9e44a399bd8ad0..793a37c7396a61 100644 --- a/be/src/pipeline/exec/nested_loop_join_build_operator.cpp +++ b/be/src/pipeline/exec/nested_loop_join_build_operator.cpp @@ -107,20 +107,16 @@ Status NestedLoopJoinBuildSinkOperatorX::init(const TPlanNode& tnode, RuntimeSta return Status::OK(); } -Status NestedLoopJoinBuildSinkOperatorX::prepare(RuntimeState* state) { - // pre-compute the tuple index of build tuples in the output row - int num_build_tuples = _child_x->row_desc().tuple_descriptors().size(); +Status NestedLoopJoinBuildSinkOperatorX::open(RuntimeState* state) { + RETURN_IF_ERROR(JoinBuildSinkOperatorX::open(state)); + int num_build_tuples = _child->row_desc().tuple_descriptors().size(); for (int i = 0; i < num_build_tuples; ++i) { - TupleDescriptor* build_tuple_desc = _child_x->row_desc().tuple_descriptors()[i]; + TupleDescriptor* build_tuple_desc = _child->row_desc().tuple_descriptors()[i]; auto tuple_idx = _row_descriptor.get_tuple_idx(build_tuple_desc->id()); RETURN_IF_INVALID_TUPLE_IDX(build_tuple_desc->id(), tuple_idx); } - RETURN_IF_ERROR(vectorized::VExpr::prepare(_filter_src_expr_ctxs, state, 
_child_x->row_desc())); - return Status::OK(); -} - -Status NestedLoopJoinBuildSinkOperatorX::open(RuntimeState* state) { + RETURN_IF_ERROR(vectorized::VExpr::prepare(_filter_src_expr_ctxs, state, _child->row_desc())); return vectorized::VExpr::open(_filter_src_expr_ctxs, state); } diff --git a/be/src/pipeline/exec/nested_loop_join_build_operator.h b/be/src/pipeline/exec/nested_loop_join_build_operator.h index 4c2b3d442e920d..f2ca259754b661 100644 --- a/be/src/pipeline/exec/nested_loop_join_build_operator.h +++ b/be/src/pipeline/exec/nested_loop_join_build_operator.h @@ -68,7 +68,6 @@ class NestedLoopJoinBuildSinkOperatorX final Status init(const TPlanNode& tnode, RuntimeState* state) override; - Status prepare(RuntimeState* state) override; Status open(RuntimeState* state) override; Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos) override; @@ -77,8 +76,8 @@ class NestedLoopJoinBuildSinkOperatorX final if (_join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) { return {ExchangeType::NOOP}; } - return _child_x->ignore_data_distribution() ? DataDistribution(ExchangeType::BROADCAST) - : DataDistribution(ExchangeType::NOOP); + return _child->ignore_data_distribution() ? DataDistribution(ExchangeType::BROADCAST) + : DataDistribution(ExchangeType::NOOP); } private: diff --git a/be/src/pipeline/exec/nested_loop_join_probe_operator.cpp b/be/src/pipeline/exec/nested_loop_join_probe_operator.cpp index 84112151e63432..9546ed8df56671 100644 --- a/be/src/pipeline/exec/nested_loop_join_probe_operator.cpp +++ b/be/src/pipeline/exec/nested_loop_join_probe_operator.cpp @@ -445,19 +445,13 @@ Status NestedLoopJoinProbeOperatorX::init(const TPlanNode& tnode, RuntimeState* return Status::OK(); } -Status NestedLoopJoinProbeOperatorX::prepare(RuntimeState* state) { - RETURN_IF_ERROR(JoinProbeOperatorX::prepare(state)); +Status NestedLoopJoinProbeOperatorX::open(RuntimeState* state) { + RETURN_IF_ERROR(JoinProbeOperatorX::open(state)); for (auto& conjunct : _join_conjuncts) { RETURN_IF_ERROR(conjunct->prepare(state, *_intermediate_row_desc)); } - RETURN_IF_ERROR(vectorized::VExpr::prepare(_output_expr_ctxs, state, *_intermediate_row_desc)); - _num_probe_side_columns = _child_x->row_desc().num_materialized_slots(); + _num_probe_side_columns = _child->row_desc().num_materialized_slots(); _num_build_side_columns = _build_side_child->row_desc().num_materialized_slots(); - return Status::OK(); -} - -Status NestedLoopJoinProbeOperatorX::open(RuntimeState* state) { - RETURN_IF_ERROR(JoinProbeOperatorX::open(state)); return vectorized::VExpr::open(_join_conjuncts, state); } diff --git a/be/src/pipeline/exec/nested_loop_join_probe_operator.h b/be/src/pipeline/exec/nested_loop_join_probe_operator.h index 0ef4abcbc1b746..f46a99306a5713 100644 --- a/be/src/pipeline/exec/nested_loop_join_probe_operator.h +++ b/be/src/pipeline/exec/nested_loop_join_probe_operator.h @@ -187,7 +187,6 @@ class NestedLoopJoinProbeOperatorX final NestedLoopJoinProbeOperatorX(ObjectPool* pool, const TPlanNode& tnode, int operator_id, const DescriptorTbl& descs); Status init(const TPlanNode& tnode, RuntimeState* state) override; - Status prepare(RuntimeState* state) override; Status open(RuntimeState* state) override; Status push(RuntimeState* state, vectorized::Block* input_block, bool eos) const override; diff --git a/be/src/pipeline/exec/olap_table_sink_operator.h b/be/src/pipeline/exec/olap_table_sink_operator.h index 5eafc2ea25fe2f..8a9ffaaf769c31 100644 --- a/be/src/pipeline/exec/olap_table_sink_operator.h +++ 
b/be/src/pipeline/exec/olap_table_sink_operator.h
@@ -51,15 +51,12 @@ class OlapTableSinkOperatorX final : public DataSinkOperatorX::name_suffix() { }
 
 DataDistribution DataSinkOperatorXBase::required_data_distribution() const {
-    return _child_x && _child_x->ignore_data_distribution()
+    return _child && _child->ignore_data_distribution()
                    ? DataDistribution(ExchangeType::PASSTHROUGH)
                    : DataDistribution(ExchangeType::NOOP);
 }
 
 const RowDescriptor& OperatorBase::row_desc() const {
-    return _child_x->row_desc();
+    return _child->row_desc();
 }
 
 template
@@ -198,7 +198,7 @@ Status OperatorXBase::init(const TPlanNode& tnode, RuntimeState* /*state*/) {
     return Status::OK();
 }
 
-Status OperatorXBase::prepare(RuntimeState* state) {
+Status OperatorXBase::open(RuntimeState* state) {
     for (auto& conjunct : _conjuncts) {
         RETURN_IF_ERROR(conjunct->prepare(state, intermediate_row_desc()));
     }
@@ -213,14 +213,6 @@ Status OperatorXBase::prepare(RuntimeState* state) {
                 vectorized::VExpr::check_expr_output_type(_projections, *_output_row_descriptor));
     }
 
-    if (_child_x && !is_source()) {
-        RETURN_IF_ERROR(_child_x->prepare(state));
-    }
-
-    return Status::OK();
-}
-
-Status OperatorXBase::open(RuntimeState* state) {
     for (auto& conjunct : _conjuncts) {
         RETURN_IF_ERROR(conjunct->open(state));
     }
@@ -228,15 +220,15 @@ Status OperatorXBase::open(RuntimeState* state) {
     for (auto& projections : _intermediate_projections) {
         RETURN_IF_ERROR(vectorized::VExpr::open(projections, state));
     }
-    if (_child_x && !is_source()) {
-        RETURN_IF_ERROR(_child_x->open(state));
+    if (_child && !is_source()) {
+        RETURN_IF_ERROR(_child->open(state));
     }
     return Status::OK();
 }
 
 Status OperatorXBase::close(RuntimeState* state) {
-    if (_child_x && !is_source()) {
-        RETURN_IF_ERROR(_child_x->close(state));
+    if (_child && !is_source()) {
+        RETURN_IF_ERROR(_child->close(state));
     }
     auto result = state->get_local_state_result(operator_id());
     if (!result) {
@@ -580,8 +572,7 @@ Status PipelineXSinkLocalState::close(RuntimeState* state, Status e
 template
 Status StreamingOperatorX::get_block(RuntimeState* state, vectorized::Block* block, bool* eos) {
-    RETURN_IF_ERROR(
-            OperatorX::_child_x->get_block_after_projects(state, block, eos));
+    RETURN_IF_ERROR(OperatorX::_child->get_block_after_projects(state, block, eos));
     return pull(state, block, eos);
 }
 
@@ -591,8 +582,8 @@ Status StatefulOperatorX::get_block(RuntimeState* state, vectori
     auto& local_state = get_local_state(state);
     if (need_more_input_data(state)) {
         local_state._child_block->clear_column_data(
-                OperatorX::_child_x->row_desc().num_materialized_slots());
-        RETURN_IF_ERROR(OperatorX::_child_x->get_block_after_projects(
+                OperatorX::_child->row_desc().num_materialized_slots());
+        RETURN_IF_ERROR(OperatorX::_child->get_block_after_projects(
                 state, local_state._child_block.get(), &local_state._child_eos));
         *eos = local_state._child_eos;
         if (local_state._child_block->rows() == 0 && !local_state._child_eos) {
@@ -676,66 +667,66 @@ Status AsyncWriterSink::close(RuntimeState* state, Status exec_s
     return Base::close(state, exec_status);
 }
 
-#define DECLARE_OPERATOR_X(LOCAL_STATE) template class DataSinkOperatorX;
-DECLARE_OPERATOR_X(HashJoinBuildSinkLocalState)
-DECLARE_OPERATOR_X(ResultSinkLocalState)
-DECLARE_OPERATOR_X(JdbcTableSinkLocalState)
-DECLARE_OPERATOR_X(MemoryScratchSinkLocalState)
-DECLARE_OPERATOR_X(ResultFileSinkLocalState)
-DECLARE_OPERATOR_X(OlapTableSinkLocalState)
-DECLARE_OPERATOR_X(OlapTableSinkV2LocalState)
-DECLARE_OPERATOR_X(HiveTableSinkLocalState)
-DECLARE_OPERATOR_X(IcebergTableSinkLocalState)
-DECLARE_OPERATOR_X(AnalyticSinkLocalState)
-DECLARE_OPERATOR_X(SortSinkLocalState)
-DECLARE_OPERATOR_X(SpillSortSinkLocalState)
-DECLARE_OPERATOR_X(LocalExchangeSinkLocalState)
-DECLARE_OPERATOR_X(AggSinkLocalState)
-DECLARE_OPERATOR_X(PartitionedAggSinkLocalState)
-DECLARE_OPERATOR_X(ExchangeSinkLocalState)
-DECLARE_OPERATOR_X(NestedLoopJoinBuildSinkLocalState)
-DECLARE_OPERATOR_X(UnionSinkLocalState)
-DECLARE_OPERATOR_X(MultiCastDataStreamSinkLocalState)
-DECLARE_OPERATOR_X(PartitionSortSinkLocalState)
-DECLARE_OPERATOR_X(SetProbeSinkLocalState)
-DECLARE_OPERATOR_X(SetProbeSinkLocalState)
-DECLARE_OPERATOR_X(SetSinkLocalState)
-DECLARE_OPERATOR_X(SetSinkLocalState)
-DECLARE_OPERATOR_X(PartitionedHashJoinSinkLocalState)
-DECLARE_OPERATOR_X(GroupCommitBlockSinkLocalState)
-
-#undef DECLARE_OPERATOR_X
-
-#define DECLARE_OPERATOR_X(LOCAL_STATE) template class OperatorX;
-DECLARE_OPERATOR_X(HashJoinProbeLocalState)
-DECLARE_OPERATOR_X(OlapScanLocalState)
-DECLARE_OPERATOR_X(GroupCommitLocalState)
-DECLARE_OPERATOR_X(JDBCScanLocalState)
-DECLARE_OPERATOR_X(FileScanLocalState)
-DECLARE_OPERATOR_X(EsScanLocalState)
-DECLARE_OPERATOR_X(AnalyticLocalState)
-DECLARE_OPERATOR_X(SortLocalState)
-DECLARE_OPERATOR_X(SpillSortLocalState)
-DECLARE_OPERATOR_X(AggLocalState)
-DECLARE_OPERATOR_X(PartitionedAggLocalState)
-DECLARE_OPERATOR_X(TableFunctionLocalState)
-DECLARE_OPERATOR_X(ExchangeLocalState)
-DECLARE_OPERATOR_X(RepeatLocalState)
-DECLARE_OPERATOR_X(NestedLoopJoinProbeLocalState)
-DECLARE_OPERATOR_X(AssertNumRowsLocalState)
-DECLARE_OPERATOR_X(EmptySetLocalState)
-DECLARE_OPERATOR_X(UnionSourceLocalState)
-DECLARE_OPERATOR_X(MultiCastDataStreamSourceLocalState)
-DECLARE_OPERATOR_X(PartitionSortSourceLocalState)
-DECLARE_OPERATOR_X(SetSourceLocalState)
-DECLARE_OPERATOR_X(SetSourceLocalState)
-DECLARE_OPERATOR_X(DataGenLocalState)
-DECLARE_OPERATOR_X(SchemaScanLocalState)
-DECLARE_OPERATOR_X(MetaScanLocalState)
-DECLARE_OPERATOR_X(LocalExchangeSourceLocalState)
-DECLARE_OPERATOR_X(PartitionedHashJoinProbeLocalState)
-
-#undef DECLARE_OPERATOR_X
+#define DECLARE_OPERATOR(LOCAL_STATE) template class DataSinkOperatorX;
+DECLARE_OPERATOR(HashJoinBuildSinkLocalState)
+DECLARE_OPERATOR(ResultSinkLocalState)
+DECLARE_OPERATOR(JdbcTableSinkLocalState)
+DECLARE_OPERATOR(MemoryScratchSinkLocalState)
+DECLARE_OPERATOR(ResultFileSinkLocalState)
+DECLARE_OPERATOR(OlapTableSinkLocalState)
+DECLARE_OPERATOR(OlapTableSinkV2LocalState)
+DECLARE_OPERATOR(HiveTableSinkLocalState)
+DECLARE_OPERATOR(IcebergTableSinkLocalState)
+DECLARE_OPERATOR(AnalyticSinkLocalState)
+DECLARE_OPERATOR(SortSinkLocalState)
+DECLARE_OPERATOR(SpillSortSinkLocalState)
+DECLARE_OPERATOR(LocalExchangeSinkLocalState)
+DECLARE_OPERATOR(AggSinkLocalState)
+DECLARE_OPERATOR(PartitionedAggSinkLocalState)
+DECLARE_OPERATOR(ExchangeSinkLocalState)
+DECLARE_OPERATOR(NestedLoopJoinBuildSinkLocalState)
+DECLARE_OPERATOR(UnionSinkLocalState)
+DECLARE_OPERATOR(MultiCastDataStreamSinkLocalState)
+DECLARE_OPERATOR(PartitionSortSinkLocalState)
+DECLARE_OPERATOR(SetProbeSinkLocalState)
+DECLARE_OPERATOR(SetProbeSinkLocalState)
+DECLARE_OPERATOR(SetSinkLocalState)
+DECLARE_OPERATOR(SetSinkLocalState)
+DECLARE_OPERATOR(PartitionedHashJoinSinkLocalState)
+DECLARE_OPERATOR(GroupCommitBlockSinkLocalState)
+
+#undef DECLARE_OPERATOR
+
+#define DECLARE_OPERATOR(LOCAL_STATE) template class OperatorX;
+DECLARE_OPERATOR(HashJoinProbeLocalState)
+DECLARE_OPERATOR(OlapScanLocalState)
+DECLARE_OPERATOR(GroupCommitLocalState)
+DECLARE_OPERATOR(JDBCScanLocalState)
+DECLARE_OPERATOR(FileScanLocalState)
+DECLARE_OPERATOR(EsScanLocalState)
+DECLARE_OPERATOR(AnalyticLocalState)
+DECLARE_OPERATOR(SortLocalState)
+DECLARE_OPERATOR(SpillSortLocalState)
+DECLARE_OPERATOR(AggLocalState)
+DECLARE_OPERATOR(PartitionedAggLocalState)
+DECLARE_OPERATOR(TableFunctionLocalState)
+DECLARE_OPERATOR(ExchangeLocalState)
+DECLARE_OPERATOR(RepeatLocalState)
+DECLARE_OPERATOR(NestedLoopJoinProbeLocalState)
+DECLARE_OPERATOR(AssertNumRowsLocalState)
+DECLARE_OPERATOR(EmptySetLocalState)
+DECLARE_OPERATOR(UnionSourceLocalState)
+DECLARE_OPERATOR(MultiCastDataStreamSourceLocalState)
+DECLARE_OPERATOR(PartitionSortSourceLocalState)
+DECLARE_OPERATOR(SetSourceLocalState)
+DECLARE_OPERATOR(SetSourceLocalState)
+DECLARE_OPERATOR(DataGenLocalState)
+DECLARE_OPERATOR(SchemaScanLocalState)
+DECLARE_OPERATOR(MetaScanLocalState)
+DECLARE_OPERATOR(LocalExchangeSourceLocalState)
+DECLARE_OPERATOR(PartitionedHashJoinProbeLocalState)
+
+#undef DECLARE_OPERATOR
 
 template class StreamingOperatorX;
 template class StreamingOperatorX;
diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h
index 71f23caec9ddf4..0d0b0c05b366fc 100644
--- a/be/src/pipeline/exec/operator.h
+++ b/be/src/pipeline/exec/operator.h
@@ -39,7 +39,6 @@
 #include "vec/runtime/vdata_stream_recvr.h"
 
 namespace doris {
-class DataSink;
 class RowDescriptor;
 class RuntimeState;
 class TDataSink;
@@ -54,13 +53,10 @@ class OperatorBase;
 class OperatorXBase;
 class DataSinkOperatorXBase;
 
-using OperatorPtr = std::shared_ptr;
+using OperatorPtr = std::shared_ptr;
 using Operators = std::vector;
 
-using OperatorXPtr = std::shared_ptr;
-using OperatorXs = std::vector;
-
-using DataSinkOperatorXPtr = std::shared_ptr;
+using DataSinkOperatorPtr = std::shared_ptr;
 
 // This struct is used only for initializing local state.
 struct LocalStateInfo {
@@ -85,7 +81,7 @@ struct LocalSinkStateInfo {
 
 class OperatorBase {
 public:
-    explicit OperatorBase() : _child_x(nullptr), _is_closed(false) {}
+    explicit OperatorBase() : _child(nullptr), _is_closed(false) {}
     virtual ~OperatorBase() = default;
 
     virtual bool is_sink() const { return false; }
@@ -96,14 +92,12 @@ class OperatorBase {
 
     [[nodiscard]] virtual Status init(const TDataSink& tsink) { return Status::OK(); }
 
-    // Prepare for running. (e.g. resource allocation, etc.)
-    [[nodiscard]] virtual Status prepare(RuntimeState* state) = 0;
     [[nodiscard]] virtual std::string get_name() const = 0;
     [[nodiscard]] virtual Status open(RuntimeState* state) = 0;
     [[nodiscard]] virtual Status close(RuntimeState* state);
 
-    [[nodiscard]] virtual Status set_child(OperatorXPtr child) {
-        _child_x = std::move(child);
+    [[nodiscard]] virtual Status set_child(OperatorPtr child) {
+        _child = std::move(child);
         return Status::OK();
     }
@@ -113,18 +107,21 @@ class OperatorBase {
     virtual Status revoke_memory(RuntimeState* state) { return Status::OK(); }
     [[nodiscard]] virtual bool require_data_distribution() const { return false; }
-    OperatorXPtr child_x() { return _child_x; }
-    [[nodiscard]] bool followed_by_shuffled_join() const { return _followed_by_shuffled_join; }
-    void set_followed_by_shuffled_join(bool followed_by_shuffled_join) {
-        _followed_by_shuffled_join = followed_by_shuffled_join;
+    OperatorPtr child() { return _child; }
+    [[nodiscard]] bool followed_by_shuffled_operator() const {
+        return _followed_by_shuffled_operator;
+    }
+    void set_followed_by_shuffled_operator(bool followed_by_shuffled_operator) {
+        _followed_by_shuffled_operator = followed_by_shuffled_operator;
     }
+    [[nodiscard]] virtual bool is_shuffled_operator() const { return false; }
     [[nodiscard]] virtual bool require_shuffled_data_distribution() const { return false; }
 
 protected:
-    OperatorXPtr _child_x = nullptr;
+    OperatorPtr _child = nullptr;
     bool _is_closed;
-    bool _followed_by_shuffled_join = false;
+    bool _followed_by_shuffled_operator = false;
 };
 
 class PipelineXLocalStateBase {
@@ -450,7 +447,6 @@ class DataSinkOperatorXBase : public OperatorBase {
         return Status::InternalError("init() is only implemented in local exchange!");
     }
 
-    Status prepare(RuntimeState* state) override { return Status::OK(); }
     Status open(RuntimeState* state) override { return Status::OK(); }
 
     [[nodiscard]] bool is_finished(RuntimeState* state) const {
         auto result = state->get_sink_local_state_result();
@@ -483,8 +479,6 @@ class DataSinkOperatorXBase : public OperatorBase {
     [[nodiscard]] virtual std::shared_ptr create_shared_state() const = 0;
 
     [[nodiscard]] virtual DataDistribution required_data_distribution() const;
-    [[nodiscard]] virtual bool is_shuffled_hash_join() const { return false; }
-
     Status close(RuntimeState* state) override {
         return Status::InternalError("Should not reach here!");
     }
@@ -651,28 +645,24 @@ class OperatorXBase : public OperatorBase {
     }
     [[nodiscard]] std::string get_name() const override { return _op_name; }
     [[nodiscard]] virtual DataDistribution required_data_distribution() const {
-        return _child_x && _child_x->ignore_data_distribution() && !is_source()
+        return _child && _child->ignore_data_distribution() && !is_source()
                        ? DataDistribution(ExchangeType::PASSTHROUGH)
                        : DataDistribution(ExchangeType::NOOP);
     }
     [[nodiscard]] virtual bool ignore_data_distribution() const {
-        return _child_x ? _child_x->ignore_data_distribution() : _ignore_data_distribution;
+        return _child ? _child->ignore_data_distribution() : _ignore_data_distribution;
     }
     [[nodiscard]] bool ignore_data_hash_distribution() const {
-        return _child_x ? _child_x->ignore_data_hash_distribution() : _ignore_data_distribution;
+        return _child ?
_child->ignore_data_hash_distribution() : _ignore_data_distribution; } [[nodiscard]] virtual bool need_more_input_data(RuntimeState* state) const { return true; } void set_ignore_data_distribution() { _ignore_data_distribution = true; } - Status prepare(RuntimeState* state) override; - Status open(RuntimeState* state) override; [[nodiscard]] virtual Status get_block(RuntimeState* state, vectorized::Block* block, bool* eos) = 0; - [[nodiscard]] virtual bool is_shuffled_hash_join() const { return false; } - Status close(RuntimeState* state) override; [[nodiscard]] virtual const RowDescriptor& intermediate_row_desc() const { @@ -716,7 +706,7 @@ class OperatorXBase : public OperatorBase { return reinterpret_cast(*this); } - [[nodiscard]] OperatorXPtr get_child() { return _child_x; } + [[nodiscard]] OperatorPtr get_child() { return _child; } [[nodiscard]] vectorized::VExprContextSPtrs& conjuncts() { return _conjuncts; } [[nodiscard]] virtual RowDescriptor& row_descriptor() { return _row_descriptor; } diff --git a/be/src/pipeline/exec/partition_sort_sink_operator.cpp b/be/src/pipeline/exec/partition_sort_sink_operator.cpp index c12653914048db..fbabdbdc8f85fe 100644 --- a/be/src/pipeline/exec/partition_sort_sink_operator.cpp +++ b/be/src/pipeline/exec/partition_sort_sink_operator.cpp @@ -124,7 +124,7 @@ Status PartitionSortSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo ADD_COUNTER(_profile, "SortedPartitionInputRows", TUnit::UNIT); _partition_sort_info = std::make_shared( &_vsort_exec_exprs, p._limit, 0, p._pool, p._is_asc_order, p._nulls_first, - p._child_x->row_desc(), state, _profile, p._has_global_limit, p._partition_inner_limit, + p._child->row_desc(), state, _profile, p._has_global_limit, p._partition_inner_limit, p._top_n_algorithm, p._topn_phase); _profile->add_info_string("PartitionTopNPhase", to_string(p._topn_phase)); _profile->add_info_string("PartitionTopNLimit", std::to_string(p._partition_inner_limit)); @@ -163,13 +163,10 @@ Status PartitionSortSinkOperatorX::init(const TPlanNode& tnode, RuntimeState* st return Status::OK(); } -Status PartitionSortSinkOperatorX::prepare(RuntimeState* state) { - RETURN_IF_ERROR(_vsort_exec_exprs.prepare(state, _child_x->row_desc(), _row_descriptor)); - RETURN_IF_ERROR(vectorized::VExpr::prepare(_partition_expr_ctxs, state, _child_x->row_desc())); - return Status::OK(); -} - Status PartitionSortSinkOperatorX::open(RuntimeState* state) { + RETURN_IF_ERROR(DataSinkOperatorX::open(state)); + RETURN_IF_ERROR(_vsort_exec_exprs.prepare(state, _child->row_desc(), _row_descriptor)); + RETURN_IF_ERROR(vectorized::VExpr::prepare(_partition_expr_ctxs, state, _child->row_desc())); RETURN_IF_ERROR(_vsort_exec_exprs.open(state)); RETURN_IF_ERROR(vectorized::VExpr::open(_partition_expr_ctxs, state)); return Status::OK(); @@ -187,7 +184,7 @@ Status PartitionSortSinkOperatorX::sink(RuntimeState* state, vectorized::Block* local_state._value_places.push_back(_pool->add(new PartitionBlocks( local_state._partition_sort_info, local_state._value_places.empty()))); } - local_state._value_places[0]->append_whole_block(input_block, _child_x->row_desc()); + local_state._value_places[0]->append_whole_block(input_block, _child->row_desc()); } else { if (local_state._is_need_passthrough) { { diff --git a/be/src/pipeline/exec/partition_sort_sink_operator.h b/be/src/pipeline/exec/partition_sort_sink_operator.h index 0597d50c0b3936..f16df509dca4a0 100644 --- a/be/src/pipeline/exec/partition_sort_sink_operator.h +++ b/be/src/pipeline/exec/partition_sort_sink_operator.h 
@@ -265,7 +265,6 @@ class PartitionSortSinkOperatorX final : public DataSinkOperatorXset_dests_id(DataSinkOperatorX::dests_id()); - RETURN_IF_ERROR(_agg_sink_operator->set_child( - DataSinkOperatorX::_child_x)); + RETURN_IF_ERROR( + _agg_sink_operator->set_child(DataSinkOperatorX::_child)); return _agg_sink_operator->init(tnode, state); } -Status PartitionedAggSinkOperatorX::prepare(RuntimeState* state) { - return _agg_sink_operator->prepare(state); -} - Status PartitionedAggSinkOperatorX::open(RuntimeState* state) { return _agg_sink_operator->open(state); } diff --git a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h index 9282df073cbd58..259d7580877493 100644 --- a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h +++ b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h @@ -299,8 +299,6 @@ class PartitionedAggSinkOperatorX : public DataSinkOperatorXrequire_shuffled_data_distribution(); } - Status set_child(OperatorXPtr child) override { + Status set_child(OperatorPtr child) override { RETURN_IF_ERROR(DataSinkOperatorX::set_child(child)); return _agg_sink_operator->set_child(child); } diff --git a/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp b/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp index 5e030e7ab49d10..48df5587198b08 100644 --- a/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp +++ b/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp @@ -108,11 +108,6 @@ Status PartitionedAggSourceOperatorX::init(const TPlanNode& tnode, RuntimeState* return _agg_source_operator->init(tnode, state); } -Status PartitionedAggSourceOperatorX::prepare(RuntimeState* state) { - RETURN_IF_ERROR(OperatorXBase::prepare(state)); - return _agg_source_operator->prepare(state); -} - Status PartitionedAggSourceOperatorX::open(RuntimeState* state) { RETURN_IF_ERROR(OperatorXBase::open(state)); return _agg_source_operator->open(state); diff --git a/be/src/pipeline/exec/partitioned_aggregation_source_operator.h b/be/src/pipeline/exec/partitioned_aggregation_source_operator.h index 994290a15bb840..edae99c716a925 100644 --- a/be/src/pipeline/exec/partitioned_aggregation_source_operator.h +++ b/be/src/pipeline/exec/partitioned_aggregation_source_operator.h @@ -82,7 +82,6 @@ class PartitionedAggSourceOperatorX : public OperatorX ~PartitionedAggSourceOperatorX() override = default; Status init(const TPlanNode& tnode, RuntimeState* state) override; - Status prepare(RuntimeState* state) override; Status open(RuntimeState* state) override; diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp index 6dc1616e0eb689..018d63a6deebb1 100644 --- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp +++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp @@ -523,26 +523,17 @@ Status PartitionedHashJoinProbeOperatorX::init(const TPlanNode& tnode, RuntimeSt return Status::OK(); } -Status PartitionedHashJoinProbeOperatorX::prepare(RuntimeState* state) { - // to avoid prepare _child_x twice - auto child_x = std::move(_child_x); - RETURN_IF_ERROR(JoinProbeOperatorX::prepare(state)); - RETURN_IF_ERROR(vectorized::VExpr::prepare(_output_expr_ctxs, state, *_intermediate_row_desc)); - RETURN_IF_ERROR(_inner_probe_operator->set_child(child_x)); - DCHECK(_build_side_child != nullptr); - _inner_probe_operator->set_build_side_child(_build_side_child); - 
RETURN_IF_ERROR(_inner_probe_operator->prepare(state)); - _child_x = std::move(child_x); - RETURN_IF_ERROR(_partitioner->prepare(state, _child_x->row_desc())); - return Status::OK(); -} Status PartitionedHashJoinProbeOperatorX::open(RuntimeState* state) { - // to avoid open _child_x twice - auto child_x = std::move(_child_x); + // to avoid open _child twice + auto child = std::move(_child); RETURN_IF_ERROR(JoinProbeOperatorX::open(state)); + RETURN_IF_ERROR(_inner_probe_operator->set_child(child)); + DCHECK(_build_side_child != nullptr); + _inner_probe_operator->set_build_side_child(_build_side_child); RETURN_IF_ERROR(_inner_probe_operator->open(state)); - _child_x = std::move(child_x); + _child = std::move(child); + RETURN_IF_ERROR(_partitioner->prepare(state, _child->row_desc())); RETURN_IF_ERROR(_partitioner->open(state)); return Status::OK(); } @@ -829,8 +820,8 @@ Status PartitionedHashJoinProbeOperatorX::get_block(RuntimeState* state, vectori return _revoke_memory(state); } - RETURN_IF_ERROR(_child_x->get_block_after_projects(state, local_state._child_block.get(), - &local_state._child_eos)); + RETURN_IF_ERROR(_child->get_block_after_projects(state, local_state._child_block.get(), + &local_state._child_eos)); if (need_to_spill && local_state._child_eos) { RETURN_IF_ERROR(local_state.finish_spilling(0)); diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h index a63ddb3e69d784..3aab11f62d883e 100644 --- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h +++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h @@ -143,7 +143,6 @@ class PartitionedHashJoinProbeOperatorX final PartitionedHashJoinProbeOperatorX(ObjectPool* pool, const TPlanNode& tnode, int operator_id, const DescriptorTbl& descs, uint32_t partition_count); Status init(const TPlanNode& tnode, RuntimeState* state) override; - Status prepare(RuntimeState* state) override; Status open(RuntimeState* state) override; [[nodiscard]] Status get_block(RuntimeState* state, vectorized::Block* block, @@ -169,7 +168,7 @@ class PartitionedHashJoinProbeOperatorX final bool require_shuffled_data_distribution() const override { return _join_op != TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN; } - bool is_shuffled_hash_join() const override { + bool is_shuffled_operator() const override { return _join_distribution == TJoinDistributionType::PARTITIONED; } diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp index fc17ef41be62c8..a7297be493f804 100644 --- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp +++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp @@ -102,7 +102,7 @@ size_t PartitionedHashJoinSinkLocalState::revocable_mem_size(RuntimeState* state Status PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block(RuntimeState* state) { auto& p = _parent->cast(); _shared_state->inner_shared_state->hash_table_variants.reset(); - auto row_desc = p._child_x->row_desc(); + auto row_desc = p._child->row_desc(); const auto num_slots = row_desc.num_slots(); vectorized::Block build_block; auto inner_sink_state_ = _shared_state->inner_runtime_state->get_sink_local_state(); @@ -424,13 +424,10 @@ Status PartitionedHashJoinSinkOperatorX::init(const TPlanNode& tnode, RuntimeSta return Status::OK(); } -Status PartitionedHashJoinSinkOperatorX::prepare(RuntimeState* state) { - RETURN_IF_ERROR(_inner_sink_operator->set_child(_child_x)); - 
RETURN_IF_ERROR(_partitioner->prepare(state, _child_x->row_desc())); - return _inner_sink_operator->prepare(state); -} - Status PartitionedHashJoinSinkOperatorX::open(RuntimeState* state) { + RETURN_IF_ERROR(JoinBuildSinkOperatorX::open(state)); + RETURN_IF_ERROR(_inner_sink_operator->set_child(_child)); + RETURN_IF_ERROR(_partitioner->prepare(state, _child->row_desc())); RETURN_IF_ERROR(_partitioner->open(state)); return _inner_sink_operator->open(state); } diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h index 252c53be12d057..c768d7518b95c9 100644 --- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h +++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h @@ -92,7 +92,6 @@ class PartitionedHashJoinSinkOperatorX Status init(const TPlanNode& tnode, RuntimeState* state) override; - Status prepare(RuntimeState* state) override; Status open(RuntimeState* state) override; Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos) override; @@ -119,7 +118,7 @@ class PartitionedHashJoinSinkOperatorX bool require_shuffled_data_distribution() const override { return _join_op != TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN; } - bool is_shuffled_hash_join() const override { + bool is_shuffled_operator() const override { return _join_distribution == TJoinDistributionType::PARTITIONED; } diff --git a/be/src/pipeline/exec/repeat_operator.cpp b/be/src/pipeline/exec/repeat_operator.cpp index fe45e85c52d485..d355d99c2e352f 100644 --- a/be/src/pipeline/exec/repeat_operator.cpp +++ b/be/src/pipeline/exec/repeat_operator.cpp @@ -52,24 +52,17 @@ Status RepeatOperatorX::init(const TPlanNode& tnode, RuntimeState* state) { return Status::OK(); } -Status RepeatOperatorX::prepare(RuntimeState* state) { - VLOG_CRITICAL << "VRepeatNode::prepare"; - RETURN_IF_ERROR(OperatorXBase::prepare(state)); +Status RepeatOperatorX::open(RuntimeState* state) { + VLOG_CRITICAL << "VRepeatNode::open"; + RETURN_IF_ERROR(OperatorXBase::open(state)); _output_tuple_desc = state->desc_tbl().get_tuple_descriptor(_output_tuple_id); if (_output_tuple_desc == nullptr) { return Status::InternalError("Failed to get tuple descriptor."); } - RETURN_IF_ERROR(vectorized::VExpr::prepare(_expr_ctxs, state, _child_x->row_desc())); + RETURN_IF_ERROR(vectorized::VExpr::prepare(_expr_ctxs, state, _child->row_desc())); for (const auto& slot_desc : _output_tuple_desc->slots()) { _output_slots.push_back(slot_desc); } - - return Status::OK(); -} - -Status RepeatOperatorX::open(RuntimeState* state) { - VLOG_CRITICAL << "VRepeatNode::open"; - RETURN_IF_ERROR(OperatorXBase::open(state)); RETURN_IF_ERROR(vectorized::VExpr::open(_expr_ctxs, state)); return Status::OK(); } @@ -218,7 +211,7 @@ Status RepeatOperatorX::pull(doris::RuntimeState* state, vectorized::Block* outp int size = _repeat_id_list.size(); if (_repeat_id_idx >= size) { _intermediate_block->clear(); - _child_block.clear_column_data(_child_x->row_desc().num_materialized_slots()); + _child_block.clear_column_data(_child->row_desc().num_materialized_slots()); _repeat_id_idx = 0; } } else if (local_state._expr_ctxs.empty()) { @@ -232,7 +225,7 @@ Status RepeatOperatorX::pull(doris::RuntimeState* state, vectorized::Block* outp RETURN_IF_ERROR( local_state.add_grouping_id_column(rows, cur_col, columns, repeat_id_idx)); } - _child_block.clear_column_data(_child_x->row_desc().num_materialized_slots()); + _child_block.clear_column_data(_child->row_desc().num_materialized_slots()); } 
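The spilling wrappers make the same move but additionally wire their wrapped in-memory operator inside open(), now that there is no separate prepare() in which to do it. A rough sketch under assumed names (WrapperSinkOperatorX, WrapperLocalState, _inner_sink and _partitioner mirror the pattern of the hunks above, they are not lifted from this patch):

Status WrapperSinkOperatorX::open(RuntimeState* state) {
    RETURN_IF_ERROR(DataSinkOperatorX<WrapperLocalState>::open(state));
    // Hand the renamed _child to the wrapped in-memory sink before opening it.
    RETURN_IF_ERROR(_inner_sink->set_child(_child));
    // Partitioner binding that used to happen in prepare().
    RETURN_IF_ERROR(_partitioner->prepare(state, _child->row_desc()));
    RETURN_IF_ERROR(_partitioner->open(state));
    return _inner_sink->open(state);
}
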
RETURN_IF_ERROR(vectorized::VExprContext::filter_block(local_state._conjuncts, output_block, output_block->columns())); diff --git a/be/src/pipeline/exec/repeat_operator.h b/be/src/pipeline/exec/repeat_operator.h index a96f8074a06a9f..22398df372ae65 100644 --- a/be/src/pipeline/exec/repeat_operator.h +++ b/be/src/pipeline/exec/repeat_operator.h @@ -62,7 +62,6 @@ class RepeatOperatorX final : public StatefulOperatorX { const DescriptorTbl& descs); Status init(const TPlanNode& tnode, RuntimeState* state) override; - Status prepare(RuntimeState* state) override; Status open(RuntimeState* state) override; bool need_more_input_data(RuntimeState* state) const override; diff --git a/be/src/pipeline/exec/result_file_sink_operator.cpp b/be/src/pipeline/exec/result_file_sink_operator.cpp index 728bcd0917b541..4d842db5d2346a 100644 --- a/be/src/pipeline/exec/result_file_sink_operator.cpp +++ b/be/src/pipeline/exec/result_file_sink_operator.cpp @@ -72,13 +72,9 @@ Status ResultFileSinkOperatorX::init(const TDataSink& tsink) { return Status::OK(); } -Status ResultFileSinkOperatorX::prepare(RuntimeState* state) { - RETURN_IF_ERROR(DataSinkOperatorX::prepare(state)); - return vectorized::VExpr::prepare(_output_vexpr_ctxs, state, _row_desc); -} - Status ResultFileSinkOperatorX::open(RuntimeState* state) { RETURN_IF_ERROR(DataSinkOperatorX::open(state)); + RETURN_IF_ERROR(vectorized::VExpr::prepare(_output_vexpr_ctxs, state, _row_desc)); return vectorized::VExpr::open(_output_vexpr_ctxs, state); } @@ -201,7 +197,7 @@ Status ResultFileSinkLocalState::close(RuntimeState* state, Status exec_status) Status status; for (auto* channel : _channels) { if (!channel->is_receiver_eof()) { - status = channel->send_local_block(_output_block.get()); + status = channel->send_local_block(_output_block.get(), false); HANDLE_CHANNEL_STATUS(state, channel, status); } } @@ -225,7 +221,7 @@ Status ResultFileSinkLocalState::close(RuntimeState* state, Status exec_status) for (auto* channel : _channels) { if (!channel->is_receiver_eof()) { if (channel->is_local()) { - status = channel->send_local_block(&cur_block); + status = channel->send_local_block(&cur_block, false); } else { SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get()); status = channel->send_broadcast_block(_block_holder, true); diff --git a/be/src/pipeline/exec/result_file_sink_operator.h b/be/src/pipeline/exec/result_file_sink_operator.h index 7623dae7fea09a..86b6035c134ba9 100644 --- a/be/src/pipeline/exec/result_file_sink_operator.h +++ b/be/src/pipeline/exec/result_file_sink_operator.h @@ -85,7 +85,6 @@ class ResultFileSinkOperatorX final : public DataSinkOperatorX& t_output_expr, DescriptorTbl& descs); Status init(const TDataSink& thrift_sink) override; - Status prepare(RuntimeState* state) override; Status open(RuntimeState* state) override; Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos) override; diff --git a/be/src/pipeline/exec/result_sink_operator.cpp b/be/src/pipeline/exec/result_sink_operator.cpp index 2a32e24bf6865d..0608beaf522290 100644 --- a/be/src/pipeline/exec/result_sink_operator.cpp +++ b/be/src/pipeline/exec/result_sink_operator.cpp @@ -118,7 +118,8 @@ ResultSinkOperatorX::ResultSinkOperatorX(int operator_id, const RowDescriptor& r _name = "ResultSink"; } -Status ResultSinkOperatorX::prepare(RuntimeState* state) { +Status ResultSinkOperatorX::open(RuntimeState* state) { + RETURN_IF_ERROR(DataSinkOperatorX::open(state)); // prepare output_expr // From the thrift expressions create the real exprs. 
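For the result sinks the whole output-expression lifecycle now sits in open(): build the VExpr trees from their thrift form, bind them, then open them. A condensed illustration (member names as used by the result sinks, not a hunk from this patch):

// 1. Thrift -> expression contexts.
RETURN_IF_ERROR(vectorized::VExpr::create_expr_trees(_t_output_expr, _output_vexpr_ctxs));
// 2. Bind them to the sink's row descriptor (formerly done in prepare()).
RETURN_IF_ERROR(vectorized::VExpr::prepare(_output_vexpr_ctxs, state, _row_desc));
// 3. Open the contexts for evaluation.
return vectorized::VExpr::open(_output_vexpr_ctxs, state);
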
RETURN_IF_ERROR(vectorized::VExpr::create_expr_trees(_t_output_expr, _output_vexpr_ctxs)); @@ -136,10 +137,6 @@ Status ResultSinkOperatorX::prepare(RuntimeState* state) { state->query_id(), _result_sink_buffer_size_rows, &_sender, state->execution_timeout(), state->batch_size())); } - return Status::OK(); -} - -Status ResultSinkOperatorX::open(RuntimeState* state) { return vectorized::VExpr::open(_output_vexpr_ctxs, state); } diff --git a/be/src/pipeline/exec/result_sink_operator.h b/be/src/pipeline/exec/result_sink_operator.h index 33e32e93633453..3c503096ecb51e 100644 --- a/be/src/pipeline/exec/result_sink_operator.h +++ b/be/src/pipeline/exec/result_sink_operator.h @@ -146,7 +146,6 @@ class ResultSinkOperatorX final : public DataSinkOperatorX public: ResultSinkOperatorX(int operator_id, const RowDescriptor& row_desc, const std::vector& select_exprs, const TResultSink& sink); - Status prepare(RuntimeState* state) override; Status open(RuntimeState* state) override; Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos) override; diff --git a/be/src/pipeline/exec/scan_operator.h b/be/src/pipeline/exec/scan_operator.h index e7cce02cb83157..28dbd01280f3c8 100644 --- a/be/src/pipeline/exec/scan_operator.h +++ b/be/src/pipeline/exec/scan_operator.h @@ -355,7 +355,6 @@ template class ScanOperatorX : public OperatorX { public: Status init(const TPlanNode& tnode, RuntimeState* state) override; - Status prepare(RuntimeState* state) override { return OperatorXBase::prepare(state); } Status open(RuntimeState* state) override; Status get_block(RuntimeState* state, vectorized::Block* block, bool* eos) override; Status get_block_after_projects(RuntimeState* state, vectorized::Block* block, diff --git a/be/src/pipeline/exec/schema_scan_operator.cpp b/be/src/pipeline/exec/schema_scan_operator.cpp index 8ff0a570e3393e..006ecf8ad82e84 100644 --- a/be/src/pipeline/exec/schema_scan_operator.cpp +++ b/be/src/pipeline/exec/schema_scan_operator.cpp @@ -137,20 +137,6 @@ Status SchemaScanOperatorX::init(const TPlanNode& tnode, RuntimeState* state) { Status SchemaScanOperatorX::open(RuntimeState* state) { RETURN_IF_ERROR(Base::open(state)); - if (_common_scanner_param->user) { - TSetSessionParams param; - param.__set_user(*_common_scanner_param->user); - //TStatus t_status; - //RETURN_IF_ERROR(SchemaJniHelper::set_session(param, &t_status)); - //RETURN_IF_ERROR(Status(t_status)); - } - - return Status::OK(); -} - -Status SchemaScanOperatorX::prepare(RuntimeState* state) { - RETURN_IF_ERROR(Base::prepare(state)); - // get dest tuple desc _dest_tuple_desc = state->desc_tbl().get_tuple_descriptor(_tuple_id); @@ -205,6 +191,14 @@ Status SchemaScanOperatorX::prepare(RuntimeState* state) { _tuple_idx = 0; + if (_common_scanner_param->user) { + TSetSessionParams param; + param.__set_user(*_common_scanner_param->user); + //TStatus t_status; + //RETURN_IF_ERROR(SchemaJniHelper::set_session(param, &t_status)); + //RETURN_IF_ERROR(Status(t_status)); + } + return Status::OK(); } @@ -272,6 +266,9 @@ Status SchemaScanOperatorX::get_block(RuntimeState* state, vectorized::Block* bl } while (block->rows() == 0 && !*eos); local_state.reached_limit(block, eos); + if (*eos) { + local_state._finish_dependency->set_always_ready(); + } return Status::OK(); } diff --git a/be/src/pipeline/exec/schema_scan_operator.h b/be/src/pipeline/exec/schema_scan_operator.h index aa2bff7e6440a2..03cf422fbc52e6 100644 --- a/be/src/pipeline/exec/schema_scan_operator.h +++ b/be/src/pipeline/exec/schema_scan_operator.h @@ -69,7 +69,6 @@ 
class SchemaScanOperatorX final : public OperatorX { ~SchemaScanOperatorX() override = default; Status init(const TPlanNode& tnode, RuntimeState* state) override; - Status prepare(RuntimeState* state) override; Status open(RuntimeState* state) override; Status get_block(RuntimeState* state, vectorized::Block* block, bool* eos) override; diff --git a/be/src/pipeline/exec/set_probe_sink_operator.cpp b/be/src/pipeline/exec/set_probe_sink_operator.cpp index 81c0cd463c1284..3ee79629e1a788 100644 --- a/be/src/pipeline/exec/set_probe_sink_operator.cpp +++ b/be/src/pipeline/exec/set_probe_sink_operator.cpp @@ -54,15 +54,10 @@ Status SetProbeSinkOperatorX::init(const TPlanNode& tnode, Runtime return Status::OK(); } -template -Status SetProbeSinkOperatorX::prepare(RuntimeState* state) { - RETURN_IF_ERROR(DataSinkOperatorX>::prepare(state)); - return vectorized::VExpr::prepare(_child_exprs, state, _child_x->row_desc()); -} - template Status SetProbeSinkOperatorX::open(RuntimeState* state) { RETURN_IF_ERROR(DataSinkOperatorX>::open(state)); + RETURN_IF_ERROR(vectorized::VExpr::prepare(_child_exprs, state, _child->row_desc())); return vectorized::VExpr::open(_child_exprs, state); } diff --git a/be/src/pipeline/exec/set_probe_sink_operator.h b/be/src/pipeline/exec/set_probe_sink_operator.h index f21d58425814c8..ab53f5358c2a91 100644 --- a/be/src/pipeline/exec/set_probe_sink_operator.h +++ b/be/src/pipeline/exec/set_probe_sink_operator.h @@ -88,8 +88,6 @@ class SetProbeSinkOperatorX final : public DataSinkOperatorX _partition_exprs; - using OperatorBase::_child_x; + using OperatorBase::_child; }; } // namespace pipeline diff --git a/be/src/pipeline/exec/set_sink_operator.cpp b/be/src/pipeline/exec/set_sink_operator.cpp index 6c76f9a57a3ee2..01c26e4d005c65 100644 --- a/be/src/pipeline/exec/set_sink_operator.cpp +++ b/be/src/pipeline/exec/set_sink_operator.cpp @@ -211,15 +211,10 @@ Status SetSinkOperatorX::init(const TPlanNode& tnode, RuntimeState return Status::OK(); } -template -Status SetSinkOperatorX::prepare(RuntimeState* state) { - RETURN_IF_ERROR(Base::prepare(state)); - return vectorized::VExpr::prepare(_child_exprs, state, _child_x->row_desc()); -} - template Status SetSinkOperatorX::open(RuntimeState* state) { RETURN_IF_ERROR(Base::open(state)); + RETURN_IF_ERROR(vectorized::VExpr::prepare(_child_exprs, state, _child->row_desc())); return vectorized::VExpr::open(_child_exprs, state); } diff --git a/be/src/pipeline/exec/set_sink_operator.h b/be/src/pipeline/exec/set_sink_operator.h index ac0757e4467ac0..1c08eddc141f2e 100644 --- a/be/src/pipeline/exec/set_sink_operator.h +++ b/be/src/pipeline/exec/set_sink_operator.h @@ -86,8 +86,6 @@ class SetSinkOperatorX final : public DataSinkOperatorX _partition_exprs; - using OperatorBase::_child_x; + using OperatorBase::_child; }; } // namespace pipeline diff --git a/be/src/pipeline/exec/sort_sink_operator.cpp b/be/src/pipeline/exec/sort_sink_operator.cpp index 5a29b0bfe614fb..b07942b9ab1c05 100644 --- a/be/src/pipeline/exec/sort_sink_operator.cpp +++ b/be/src/pipeline/exec/sort_sink_operator.cpp @@ -46,19 +46,19 @@ Status SortSinkLocalState::open(RuntimeState* state) { case TSortAlgorithm::HEAP_SORT: { _shared_state->sorter = vectorized::HeapSorter::create_unique( _vsort_exec_exprs, p._limit, p._offset, p._pool, p._is_asc_order, p._nulls_first, - p._child_x->row_desc()); + p._child->row_desc()); break; } case TSortAlgorithm::TOPN_SORT: { _shared_state->sorter = vectorized::TopNSorter::create_unique( _vsort_exec_exprs, p._limit, p._offset, p._pool, 
p._is_asc_order, p._nulls_first, - p._child_x->row_desc(), state, _profile); + p._child->row_desc(), state, _profile); break; } case TSortAlgorithm::FULL_SORT: { _shared_state->sorter = vectorized::FullSorter::create_unique( _vsort_exec_exprs, p._limit, p._offset, p._pool, p._is_asc_order, p._nulls_first, - p._child_x->row_desc(), state, _profile); + p._child->row_desc(), state, _profile); break; } default: { @@ -106,11 +106,9 @@ Status SortSinkOperatorX::init(const TPlanNode& tnode, RuntimeState* state) { return Status::OK(); } -Status SortSinkOperatorX::prepare(RuntimeState* state) { - return _vsort_exec_exprs.prepare(state, _child_x->row_desc(), _row_descriptor); -} - Status SortSinkOperatorX::open(RuntimeState* state) { + RETURN_IF_ERROR(DataSinkOperatorX::open(state)); + RETURN_IF_ERROR(_vsort_exec_exprs.prepare(state, _child->row_desc(), _row_descriptor)); return _vsort_exec_exprs.open(state); } diff --git a/be/src/pipeline/exec/sort_sink_operator.h b/be/src/pipeline/exec/sort_sink_operator.h index 3188bfe3990084..8462472dd02671 100644 --- a/be/src/pipeline/exec/sort_sink_operator.h +++ b/be/src/pipeline/exec/sort_sink_operator.h @@ -59,12 +59,11 @@ class SortSinkOperatorX final : public DataSinkOperatorX { Status init(const TPlanNode& tnode, RuntimeState* state) override; - Status prepare(RuntimeState* state) override; Status open(RuntimeState* state) override; Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos) override; DataDistribution required_data_distribution() const override { if (_is_analytic_sort) { - return _is_colocate && _require_bucket_distribution && !_followed_by_shuffled_join + return _is_colocate && _require_bucket_distribution && !_followed_by_shuffled_operator ? DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, _partition_exprs) : DataDistribution(ExchangeType::HASH_SHUFFLE, _partition_exprs); } else if (_merge_by_exchange) { diff --git a/be/src/pipeline/exec/sort_source_operator.cpp b/be/src/pipeline/exec/sort_source_operator.cpp index fa891196151345..02a99e183c852e 100644 --- a/be/src/pipeline/exec/sort_source_operator.cpp +++ b/be/src/pipeline/exec/sort_source_operator.cpp @@ -40,19 +40,11 @@ Status SortSourceOperatorX::init(const TPlanNode& tnode, RuntimeState* state) { return Status::OK(); } -Status SortSourceOperatorX::prepare(RuntimeState* state) { - RETURN_IF_ERROR(Base::prepare(state)); - // spill sort _child_x may be nullptr. - if (_child_x) { - RETURN_IF_ERROR(_vsort_exec_exprs.prepare(state, _child_x->row_desc(), _row_descriptor)); - } - return Status::OK(); -} - Status SortSourceOperatorX::open(RuntimeState* state) { RETURN_IF_ERROR(Base::open(state)); - // spill sort _child_x may be nullptr. - if (_child_x) { + // spill sort _child may be nullptr. 
+ if (_child) { + RETURN_IF_ERROR(_vsort_exec_exprs.prepare(state, _child->row_desc(), _row_descriptor)); RETURN_IF_ERROR(_vsort_exec_exprs.open(state)); } return Status::OK(); diff --git a/be/src/pipeline/exec/sort_source_operator.h b/be/src/pipeline/exec/sort_source_operator.h index 86832e04ae0a7c..20714eb44e5e60 100644 --- a/be/src/pipeline/exec/sort_source_operator.h +++ b/be/src/pipeline/exec/sort_source_operator.h @@ -48,7 +48,6 @@ class SortSourceOperatorX final : public OperatorX { Status get_block(RuntimeState* state, vectorized::Block* block, bool* eos) override; Status init(const TPlanNode& tnode, RuntimeState* state) override; - Status prepare(RuntimeState* state) override; Status open(RuntimeState* state) override; bool is_source() const override { return true; } diff --git a/be/src/pipeline/exec/spill_sort_sink_operator.cpp b/be/src/pipeline/exec/spill_sort_sink_operator.cpp index 94196a0354e5cf..4bf1ab04efb628 100644 --- a/be/src/pipeline/exec/spill_sort_sink_operator.cpp +++ b/be/src/pipeline/exec/spill_sort_sink_operator.cpp @@ -23,6 +23,7 @@ #include "vec/spill/spill_stream_manager.h" namespace doris::pipeline { + SpillSortSinkLocalState::SpillSortSinkLocalState(DataSinkOperatorXBase* parent, RuntimeState* state) : Base(parent, state) { _finish_dependency = std::make_shared(parent->operator_id(), parent->node_id(), @@ -76,6 +77,7 @@ Status SpillSortSinkLocalState::close(RuntimeState* state, Status execsink_statu dec_running_big_mem_op_num(state); return Status::OK(); } + Status SpillSortSinkLocalState::setup_in_memory_sort_op(RuntimeState* state) { _runtime_state = RuntimeState::create_unique( nullptr, state->fragment_instance_id(), state->query_id(), state->fragment_id(), @@ -118,23 +120,20 @@ Status SpillSortSinkOperatorX::init(const TPlanNode& tnode, RuntimeState* state) _name = "SPILL_SORT_SINK_OPERATOR"; _sort_sink_operator->set_dests_id(DataSinkOperatorX::dests_id()); - RETURN_IF_ERROR(_sort_sink_operator->set_child(DataSinkOperatorX::_child_x)); + RETURN_IF_ERROR(_sort_sink_operator->set_child(DataSinkOperatorX::_child)); return _sort_sink_operator->init(tnode, state); } -Status SpillSortSinkOperatorX::prepare(RuntimeState* state) { - RETURN_IF_ERROR(DataSinkOperatorX::prepare(state)); - RETURN_IF_ERROR(_sort_sink_operator->prepare(state)); - return Status::OK(); -} Status SpillSortSinkOperatorX::open(RuntimeState* state) { RETURN_IF_ERROR(DataSinkOperatorX::open(state)); return _sort_sink_operator->open(state); } + Status SpillSortSinkOperatorX::revoke_memory(RuntimeState* state) { auto& local_state = get_local_state(state); return local_state.revoke_memory(state); } + size_t SpillSortSinkOperatorX::revocable_mem_size(RuntimeState* state) const { auto& local_state = get_local_state(state); if (!local_state.Base::_shared_state->sink_status.ok()) { @@ -142,6 +141,7 @@ size_t SpillSortSinkOperatorX::revocable_mem_size(RuntimeState* state) const { } return _sort_sink_operator->get_revocable_mem_size(local_state._runtime_state.get()); } + Status SpillSortSinkOperatorX::sink(doris::RuntimeState* state, vectorized::Block* in_block, bool eos) { auto& local_state = get_local_state(state); @@ -175,6 +175,7 @@ Status SpillSortSinkOperatorX::sink(doris::RuntimeState* state, vectorized::Bloc } return Status::OK(); } + Status SpillSortSinkLocalState::revoke_memory(RuntimeState* state) { if (!_shared_state->is_spilled) { _shared_state->is_spilled = true; diff --git a/be/src/pipeline/exec/spill_sort_sink_operator.h b/be/src/pipeline/exec/spill_sort_sink_operator.h index 
c5b70d6fcea7f5..e74b5d2a41401a 100644 --- a/be/src/pipeline/exec/spill_sort_sink_operator.h +++ b/be/src/pipeline/exec/spill_sort_sink_operator.h @@ -71,7 +71,6 @@ class SpillSortSinkOperatorX final : public DataSinkOperatorXrequire_data_distribution(); } - Status set_child(OperatorXPtr child) override { + Status set_child(OperatorPtr child) override { RETURN_IF_ERROR(DataSinkOperatorX::set_child(child)); return _sort_sink_operator->set_child(child); } diff --git a/be/src/pipeline/exec/spill_sort_source_operator.cpp b/be/src/pipeline/exec/spill_sort_source_operator.cpp index 967e13d1fa527b..e766cb27168de1 100644 --- a/be/src/pipeline/exec/spill_sort_source_operator.cpp +++ b/be/src/pipeline/exec/spill_sort_source_operator.cpp @@ -245,11 +245,6 @@ Status SpillSortSourceOperatorX::init(const TPlanNode& tnode, RuntimeState* stat return _sort_source_operator->init(tnode, state); } -Status SpillSortSourceOperatorX::prepare(RuntimeState* state) { - RETURN_IF_ERROR(OperatorXBase::prepare(state)); - return _sort_source_operator->prepare(state); -} - Status SpillSortSourceOperatorX::open(RuntimeState* state) { RETURN_IF_ERROR(OperatorXBase::open(state)); return _sort_source_operator->open(state); diff --git a/be/src/pipeline/exec/spill_sort_source_operator.h b/be/src/pipeline/exec/spill_sort_source_operator.h index 09367415d91724..66d05e739d8c02 100644 --- a/be/src/pipeline/exec/spill_sort_source_operator.h +++ b/be/src/pipeline/exec/spill_sort_source_operator.h @@ -78,8 +78,6 @@ class SpillSortSourceOperatorX : public OperatorX { ~SpillSortSourceOperatorX() override = default; Status init(const TPlanNode& tnode, RuntimeState* state) override; - Status prepare(RuntimeState* state) override; - Status open(RuntimeState* state) override; Status close(RuntimeState* state) override; diff --git a/be/src/pipeline/exec/streaming_aggregation_operator.cpp b/be/src/pipeline/exec/streaming_aggregation_operator.cpp index 8aa1eb8df97b22..dfbe42c637ea56 100644 --- a/be/src/pipeline/exec/streaming_aggregation_operator.cpp +++ b/be/src/pipeline/exec/streaming_aggregation_operator.cpp @@ -1177,12 +1177,12 @@ Status StreamingAggOperatorX::init(const TPlanNode& tnode, RuntimeState* state) return Status::OK(); } -Status StreamingAggOperatorX::prepare(RuntimeState* state) { - RETURN_IF_ERROR(StatefulOperatorX::prepare(state)); +Status StreamingAggOperatorX::open(RuntimeState* state) { + RETURN_IF_ERROR(StatefulOperatorX::open(state)); _intermediate_tuple_desc = state->desc_tbl().get_tuple_descriptor(_intermediate_tuple_id); _output_tuple_desc = state->desc_tbl().get_tuple_descriptor(_output_tuple_id); DCHECK_EQ(_intermediate_tuple_desc->slots().size(), _output_tuple_desc->slots().size()); - RETURN_IF_ERROR(vectorized::VExpr::prepare(_probe_expr_ctxs, state, _child_x->row_desc())); + RETURN_IF_ERROR(vectorized::VExpr::prepare(_probe_expr_ctxs, state, _child->row_desc())); int j = _probe_expr_ctxs.size(); for (int i = 0; i < j; ++i) { @@ -1197,7 +1197,7 @@ Status StreamingAggOperatorX::prepare(RuntimeState* state) { SlotDescriptor* intermediate_slot_desc = _intermediate_tuple_desc->slots()[j]; SlotDescriptor* output_slot_desc = _output_tuple_desc->slots()[j]; RETURN_IF_ERROR(_aggregate_evaluators[i]->prepare( - state, _child_x->row_desc(), intermediate_slot_desc, output_slot_desc)); + state, _child->row_desc(), intermediate_slot_desc, output_slot_desc)); _aggregate_evaluators[i]->set_version(state->be_exec_version()); } @@ -1231,11 +1231,6 @@ Status StreamingAggOperatorX::prepare(RuntimeState* state) { 
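The streaming-agg hunks keep the evaluator wiring unchanged and merely relocate it into open(); slightly simplified, the pairing of evaluators with their slots looks like this (slot index offset by the number of grouping/probe expressions):

for (size_t i = 0; i < _aggregate_evaluators.size(); ++i) {
    const size_t slot_idx = _probe_expr_ctxs.size() + i;
    SlotDescriptor* intermediate_slot = _intermediate_tuple_desc->slots()[slot_idx];
    SlotDescriptor* output_slot = _output_tuple_desc->slots()[slot_idx];
    // Each evaluator is prepared against the child's rows and its own slot pair.
    RETURN_IF_ERROR(_aggregate_evaluators[i]->prepare(state, _child->row_desc(),
                                                      intermediate_slot, output_slot));
    _aggregate_evaluators[i]->set_version(state->be_exec_version());
}
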
RETURN_IF_ERROR(vectorized::AggFnEvaluator::check_agg_fn_output( _probe_expr_ctxs.size(), _aggregate_evaluators, _agg_fn_output_row_descriptor)); } - return Status::OK(); -} - -Status StreamingAggOperatorX::open(RuntimeState* state) { - RETURN_IF_ERROR(StatefulOperatorX::open(state)); RETURN_IF_ERROR(vectorized::VExpr::open(_probe_expr_ctxs, state)); for (int i = 0; i < _aggregate_evaluators.size(); ++i) { @@ -1300,7 +1295,7 @@ Status StreamingAggOperatorX::push(RuntimeState* state, vectorized::Block* in_bl if (in_block->rows() > 0) { RETURN_IF_ERROR(local_state.do_pre_agg(in_block, local_state._pre_aggregated_block.get())); } - in_block->clear_column_data(_child_x->row_desc().num_materialized_slots()); + in_block->clear_column_data(_child->row_desc().num_materialized_slots()); return Status::OK(); } diff --git a/be/src/pipeline/exec/streaming_aggregation_operator.h b/be/src/pipeline/exec/streaming_aggregation_operator.h index bf8564359827d4..c37fa5cbd881ca 100644 --- a/be/src/pipeline/exec/streaming_aggregation_operator.h +++ b/be/src/pipeline/exec/streaming_aggregation_operator.h @@ -215,7 +215,6 @@ class StreamingAggOperatorX final : public StatefulOperatorXclear_column_data(_parent->cast() - ._child_x->row_desc() + ._child->row_desc() .num_materialized_slots()); _cur_child_offset = -1; return; @@ -270,9 +270,8 @@ Status TableFunctionOperatorX::init(const TPlanNode& tnode, RuntimeState* state) return Status::OK(); } -Status TableFunctionOperatorX::prepare(RuntimeState* state) { - RETURN_IF_ERROR(Base::prepare(state)); - +Status TableFunctionOperatorX::open(doris::RuntimeState* state) { + RETURN_IF_ERROR(Base::open(state)); for (auto* fn : _fns) { RETURN_IF_ERROR(fn->prepare()); } @@ -286,7 +285,7 @@ Status TableFunctionOperatorX::prepare(RuntimeState* state) { } // get all input slots - for (const auto& child_tuple_desc : _child_x->row_desc().tuple_descriptors()) { + for (const auto& child_tuple_desc : _child->row_desc().tuple_descriptors()) { for (const auto& child_slot_desc : child_tuple_desc->slots()) { _child_slots.push_back(child_slot_desc); } @@ -300,11 +299,6 @@ Status TableFunctionOperatorX::prepare(RuntimeState* state) { } } - return Status::OK(); -} - -Status TableFunctionOperatorX::open(doris::RuntimeState* state) { - RETURN_IF_ERROR(Base::open(state)); return vectorized::VExpr::open(_vfn_ctxs, state); } diff --git a/be/src/pipeline/exec/table_function_operator.h b/be/src/pipeline/exec/table_function_operator.h index 59222de8da9062..75b1608fad7112 100644 --- a/be/src/pipeline/exec/table_function_operator.h +++ b/be/src/pipeline/exec/table_function_operator.h @@ -75,7 +75,6 @@ class TableFunctionOperatorX final : public StatefulOperatorXrow_desc())); - RETURN_IF_ERROR(vectorized::VExpr::check_expr_output_type(_child_expr, _row_descriptor)); - return Status::OK(); -} - Status UnionSinkOperatorX::open(RuntimeState* state) { + RETURN_IF_ERROR(DataSinkOperatorX::open(state)); + RETURN_IF_ERROR(vectorized::VExpr::prepare(_child_expr, state, _child->row_desc())); + RETURN_IF_ERROR(vectorized::VExpr::check_expr_output_type(_child_expr, _row_descriptor)); // open const expr lists. 
RETURN_IF_ERROR(vectorized::VExpr::open(_const_expr, state)); diff --git a/be/src/pipeline/exec/union_sink_operator.h b/be/src/pipeline/exec/union_sink_operator.h index d9fa5a321e1a18..f939950143ae92 100644 --- a/be/src/pipeline/exec/union_sink_operator.h +++ b/be/src/pipeline/exec/union_sink_operator.h @@ -72,7 +72,6 @@ class UnionSinkOperatorX final : public DataSinkOperatorX { Status init(const TPlanNode& tnode, RuntimeState* state) override; - Status prepare(RuntimeState* state) override; Status open(RuntimeState* state) override; Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos) override; @@ -90,6 +89,12 @@ class UnionSinkOperatorX final : public DataSinkOperatorX { } } + bool require_shuffled_data_distribution() const override { + return _followed_by_shuffled_operator; + } + + bool is_shuffled_operator() const override { return _followed_by_shuffled_operator; } + private: int _get_first_materialized_child_idx() const { return _first_materialized_child_idx; } diff --git a/be/src/pipeline/exec/union_source_operator.h b/be/src/pipeline/exec/union_source_operator.h index 88ea2e5bee59ba..2d112ebf2df579 100644 --- a/be/src/pipeline/exec/union_source_operator.h +++ b/be/src/pipeline/exec/union_source_operator.h @@ -82,16 +82,12 @@ class UnionSourceOperatorX final : public OperatorX { return Status::OK(); } - Status prepare(RuntimeState* state) override { - RETURN_IF_ERROR(Base::prepare(state)); + Status open(RuntimeState* state) override { + static_cast(Base::open(state)); // Prepare const expr lists. for (const vectorized::VExprContextSPtrs& exprs : _const_expr_lists) { RETURN_IF_ERROR(vectorized::VExpr::prepare(exprs, state, _row_descriptor)); } - return Status::OK(); - } - Status open(RuntimeState* state) override { - static_cast(Base::open(state)); // open const expr lists. 
for (const auto& exprs : _const_expr_lists) { RETURN_IF_ERROR(vectorized::VExpr::open(exprs, state)); @@ -99,6 +95,11 @@ class UnionSourceOperatorX final : public OperatorX { return Status::OK(); } [[nodiscard]] int get_child_count() const { return _child_size; } + bool require_shuffled_data_distribution() const override { + return _followed_by_shuffled_operator; + } + + bool is_shuffled_operator() const override { return _followed_by_shuffled_operator; } private: bool _has_data(RuntimeState* state) const { diff --git a/be/src/pipeline/local_exchange/local_exchange_sink_operator.cpp b/be/src/pipeline/local_exchange/local_exchange_sink_operator.cpp index 98b1a719a49da7..19c37f3649bcc7 100644 --- a/be/src/pipeline/local_exchange/local_exchange_sink_operator.cpp +++ b/be/src/pipeline/local_exchange/local_exchange_sink_operator.cpp @@ -68,16 +68,10 @@ Status LocalExchangeSinkOperatorX::init(ExchangeType type, const int num_buckets return Status::OK(); } -Status LocalExchangeSinkOperatorX::prepare(RuntimeState* state) { - if (_type == ExchangeType::HASH_SHUFFLE || _type == ExchangeType::BUCKET_HASH_SHUFFLE) { - RETURN_IF_ERROR(_partitioner->prepare(state, _child_x->row_desc())); - } - - return Status::OK(); -} - Status LocalExchangeSinkOperatorX::open(RuntimeState* state) { + RETURN_IF_ERROR(DataSinkOperatorX::open(state)); if (_type == ExchangeType::HASH_SHUFFLE || _type == ExchangeType::BUCKET_HASH_SHUFFLE) { + RETURN_IF_ERROR(_partitioner->prepare(state, _child->row_desc())); RETURN_IF_ERROR(_partitioner->open(state)); } diff --git a/be/src/pipeline/local_exchange/local_exchange_sink_operator.h b/be/src/pipeline/local_exchange/local_exchange_sink_operator.h index e0e7688307c386..7a98840b4b323e 100644 --- a/be/src/pipeline/local_exchange/local_exchange_sink_operator.h +++ b/be/src/pipeline/local_exchange/local_exchange_sink_operator.h @@ -105,8 +105,6 @@ class LocalExchangeSinkOperatorX final : public DataSinkOperatorX& shuffle_idx_to_instance_idx) override; - Status prepare(RuntimeState* state) override; - Status open(RuntimeState* state) override; Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos) override; diff --git a/be/src/pipeline/local_exchange/local_exchange_source_operator.h b/be/src/pipeline/local_exchange/local_exchange_source_operator.h index 7bf92add63d702..c0da5c8120c1e9 100644 --- a/be/src/pipeline/local_exchange/local_exchange_source_operator.h +++ b/be/src/pipeline/local_exchange/local_exchange_source_operator.h @@ -70,13 +70,12 @@ class LocalExchangeSourceOperatorX final : public OperatorXintermediate_row_desc(); + return _child->intermediate_row_desc(); } - RowDescriptor& row_descriptor() override { return _child_x->row_descriptor(); } - const RowDescriptor& row_desc() const override { return _child_x->row_desc(); } + RowDescriptor& row_descriptor() override { return _child->row_descriptor(); } + const RowDescriptor& row_desc() const override { return _child->row_desc(); } Status get_block(RuntimeState* state, vectorized::Block* block, bool* eos) override; diff --git a/be/src/pipeline/pipeline.cpp b/be/src/pipeline/pipeline.cpp index 4b16552531c443..74e15d7cc93ea1 100644 --- a/be/src/pipeline/pipeline.cpp +++ b/be/src/pipeline/pipeline.cpp @@ -30,40 +30,38 @@ void Pipeline::_init_profile() { _pipeline_profile = std::make_unique(std::move(s)); } -Status Pipeline::add_operator(OperatorXPtr& op) { +Status Pipeline::add_operator(OperatorPtr& op) { op->set_parallel_tasks(num_tasks()); - operatorXs.emplace_back(op); + _operators.emplace_back(op); if 
(op->is_source()) { - std::reverse(operatorXs.begin(), operatorXs.end()); + std::reverse(_operators.begin(), _operators.end()); } return Status::OK(); } Status Pipeline::prepare(RuntimeState* state) { - RETURN_IF_ERROR(operatorXs.back()->prepare(state)); - RETURN_IF_ERROR(operatorXs.back()->open(state)); - RETURN_IF_ERROR(_sink_x->prepare(state)); - RETURN_IF_ERROR(_sink_x->open(state)); + RETURN_IF_ERROR(_operators.back()->open(state)); + RETURN_IF_ERROR(_sink->open(state)); _name.append(std::to_string(id())); _name.push_back('-'); - for (auto& op : operatorXs) { + for (auto& op : _operators) { _name.append(std::to_string(op->node_id())); _name.append(op->get_name()); } _name.push_back('-'); - _name.append(std::to_string(_sink_x->node_id())); - _name.append(_sink_x->get_name()); + _name.append(std::to_string(_sink->node_id())); + _name.append(_sink->get_name()); return Status::OK(); } -Status Pipeline::set_sink(DataSinkOperatorXPtr& sink) { - if (_sink_x) { +Status Pipeline::set_sink(DataSinkOperatorPtr& sink) { + if (_sink) { return Status::InternalError("set sink twice"); } if (!sink->is_sink()) { return Status::InternalError("should set a sink operator but {}", typeid(sink).name()); } - _sink_x = sink; + _sink = sink; return Status::OK(); } diff --git a/be/src/pipeline/pipeline.h b/be/src/pipeline/pipeline.h index ae20c760110555..dfeb53ae006116 100644 --- a/be/src/pipeline/pipeline.h +++ b/be/src/pipeline/pipeline.h @@ -50,18 +50,18 @@ class Pipeline : public std::enable_shared_from_this { } // Add operators for pipelineX - Status add_operator(OperatorXPtr& op); + Status add_operator(OperatorPtr& op); // prepare operators for pipelineX Status prepare(RuntimeState* state); - Status set_sink(DataSinkOperatorXPtr& sink_operator); + Status set_sink(DataSinkOperatorPtr& sink_operator); - DataSinkOperatorXBase* sink_x() { return _sink_x.get(); } - OperatorXs& operator_xs() { return operatorXs; } - DataSinkOperatorXPtr sink_shared_pointer() { return _sink_x; } + DataSinkOperatorXBase* sink() { return _sink.get(); } + Operators& operators() { return _operators; } + DataSinkOperatorPtr sink_shared_pointer() { return _sink; } [[nodiscard]] const RowDescriptor& output_row_desc() const { - return operatorXs.back()->row_desc(); + return _operators.back()->row_desc(); } [[nodiscard]] PipelineId id() const { return _pipeline_id; } @@ -74,7 +74,7 @@ class Pipeline : public std::enable_shared_from_this { if (target_data_distribution.distribution_type != ExchangeType::BUCKET_HASH_SHUFFLE && target_data_distribution.distribution_type != ExchangeType::HASH_SHUFFLE) { return true; - } else if (operatorXs.front()->ignore_data_hash_distribution()) { + } else if (_operators.front()->ignore_data_hash_distribution()) { if (_data_distribution.distribution_type == target_data_distribution.distribution_type && (_data_distribution.partition_exprs.empty() || @@ -93,7 +93,7 @@ class Pipeline : public std::enable_shared_from_this { } } void init_data_distribution() { - set_data_distribution(operatorXs.front()->required_data_distribution()); + set_data_distribution(_operators.front()->required_data_distribution()); } void set_data_distribution(const DataDistribution& data_distribution) { _data_distribution = data_distribution; @@ -107,7 +107,7 @@ class Pipeline : public std::enable_shared_from_this { void incr_created_tasks() { _num_tasks_created++; } void set_num_tasks(int num_tasks) { _num_tasks = num_tasks; - for (auto& op : operatorXs) { + for (auto& op : _operators) { op->set_parallel_tasks(_num_tasks); } } @@ 
-118,10 +118,10 @@ class Pipeline : public std::enable_shared_from_this { fmt::format_to(debug_string_buffer, "Pipeline [id: {}, _num_tasks: {}, _num_tasks_created: {}]", _pipeline_id, _num_tasks, _num_tasks_created); - for (size_t i = 0; i < operatorXs.size(); i++) { - fmt::format_to(debug_string_buffer, "\n{}", operatorXs[i]->debug_string(i)); + for (size_t i = 0; i < _operators.size(); i++) { + fmt::format_to(debug_string_buffer, "\n{}", _operators[i]->debug_string(i)); } - fmt::format_to(debug_string_buffer, "\n{}", _sink_x->debug_string(operatorXs.size())); + fmt::format_to(debug_string_buffer, "\n{}", _sink->debug_string(_operators.size())); return fmt::to_string(debug_string_buffer); } @@ -145,13 +145,11 @@ class Pipeline : public std::enable_shared_from_this { // Operators for pipelineX. All pipeline tasks share operators from this. // [SourceOperator -> ... -> SinkOperator] - OperatorXs operatorXs; - DataSinkOperatorXPtr _sink_x = nullptr; + Operators _operators; + DataSinkOperatorPtr _sink = nullptr; std::shared_ptr _obj_pool; - Operators _operators; - // Input data distribution of this pipeline. We do local exchange when input data distribution // does not match the target data distribution. DataDistribution _data_distribution {ExchangeType::NOOP}; diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp index 3ed380483e1fbc..7538e580c3ee4f 100644 --- a/be/src/pipeline/pipeline_fragment_context.cpp +++ b/be/src/pipeline/pipeline_fragment_context.cpp @@ -328,8 +328,8 @@ Status PipelineFragmentContext::prepare(const doris::TPipelineFragmentParams& re RETURN_IF_ERROR(root_pipeline->set_sink(_sink)); for (PipelinePtr& pipeline : _pipelines) { - DCHECK(pipeline->sink_x() != nullptr) << pipeline->operator_xs().size(); - RETURN_IF_ERROR(pipeline->sink_x()->set_child(pipeline->operator_xs().back())); + DCHECK(pipeline->sink() != nullptr) << pipeline->operators().size(); + RETURN_IF_ERROR(pipeline->sink()->set_child(pipeline->operators().back())); } } if (_enable_local_shuffle()) { @@ -441,11 +441,11 @@ Status PipelineFragmentContext::_build_pipeline_tasks(const doris::TPipelineFrag std::map, std::shared_ptr>> le_state_map; - auto source_id = pipeline->operator_xs().front()->operator_id(); + auto source_id = pipeline->operators().front()->operator_id(); if (auto iter = _op_id_to_le_state.find(source_id); iter != _op_id_to_le_state.end()) { le_state_map.insert({source_id, iter->second}); } - for (auto sink_to_source_id : pipeline->sink_x()->dests_id()) { + for (auto sink_to_source_id : pipeline->sink()->dests_id()) { if (auto iter = _op_id_to_le_state.find(sink_to_source_id); iter != _op_id_to_le_state.end()) { le_state_map.insert({sink_to_source_id, iter->second}); @@ -651,7 +651,7 @@ void PipelineFragmentContext::trigger_report_if_necessary() { Status PipelineFragmentContext::_build_pipelines(ObjectPool* pool, const doris::TPipelineFragmentParams& request, - const DescriptorTbl& descs, OperatorXPtr* root, + const DescriptorTbl& descs, OperatorPtr* root, PipelinePtr cur_pipe) { if (request.fragment.plan.nodes.empty()) { throw Exception(ErrorCode::INTERNAL_ERROR, "Invalid plan which has no plan node!"); @@ -672,10 +672,10 @@ Status PipelineFragmentContext::_build_pipelines(ObjectPool* pool, Status PipelineFragmentContext::_create_tree_helper(ObjectPool* pool, const std::vector& tnodes, const doris::TPipelineFragmentParams& request, - const DescriptorTbl& descs, OperatorXPtr parent, - int* node_idx, OperatorXPtr* root, + const 
DescriptorTbl& descs, OperatorPtr parent, + int* node_idx, OperatorPtr* root, PipelinePtr& cur_pipe, int child_idx, - const bool followed_by_shuffled_join) { + const bool followed_by_shuffled_operator) { // propagate error case if (*node_idx >= tnodes.size()) { return Status::InternalError( @@ -685,11 +685,11 @@ Status PipelineFragmentContext::_create_tree_helper(ObjectPool* pool, const TPlanNode& tnode = tnodes[*node_idx]; int num_children = tnodes[*node_idx].num_children; - bool current_followed_by_shuffled_join = followed_by_shuffled_join; - OperatorXPtr op = nullptr; + bool current_followed_by_shuffled_operator = followed_by_shuffled_operator; + OperatorPtr op = nullptr; RETURN_IF_ERROR(_create_operator(pool, tnodes[*node_idx], request, descs, op, cur_pipe, parent == nullptr ? -1 : parent->node_id(), child_idx, - followed_by_shuffled_join)); + followed_by_shuffled_operator)); // assert(parent != nullptr || (node_idx == 0 && root_expr != nullptr)); if (parent != nullptr) { @@ -699,7 +699,7 @@ Status PipelineFragmentContext::_create_tree_helper(ObjectPool* pool, *root = op; } /** - * `ExchangeType::HASH_SHUFFLE` should be used if an operator is followed by a shuffled hash join. + * `ExchangeType::HASH_SHUFFLE` should be used if an operator is followed by a shuffled operator (shuffled hash join, union operator followed by co-located operators). * * For plan: * LocalExchange(id=0) -> Aggregation(id=1) -> ShuffledHashJoin(id=2) @@ -710,18 +710,17 @@ Status PipelineFragmentContext::_create_tree_helper(ObjectPool* pool, * shuffled local exchanger will be used before join so it is not followed by shuffle join. */ auto require_shuffled_data_distribution = - cur_pipe->operator_xs().empty() - ? cur_pipe->sink_x()->require_shuffled_data_distribution() - : op->require_shuffled_data_distribution(); - current_followed_by_shuffled_join = - (followed_by_shuffled_join || op->is_shuffled_hash_join()) && + cur_pipe->operators().empty() ? cur_pipe->sink()->require_shuffled_data_distribution() + : op->require_shuffled_data_distribution(); + current_followed_by_shuffled_operator = + (followed_by_shuffled_operator || op->is_shuffled_operator()) && require_shuffled_data_distribution; // rely on that tnodes is preorder of the plan for (int i = 0; i < num_children; i++) { ++*node_idx; RETURN_IF_ERROR(_create_tree_helper(pool, tnodes, request, descs, op, node_idx, nullptr, - cur_pipe, i, current_followed_by_shuffled_join)); + cur_pipe, i, current_followed_by_shuffled_operator)); // we are expecting a child, but have used all nodes // this means we have been given a bad tree and must fail @@ -752,24 +751,24 @@ Status PipelineFragmentContext::_add_local_exchange_impl( const std::map& shuffle_idx_to_instance_idx, const bool ignore_data_hash_distribution) { num_buckets = num_buckets != 0 ? num_buckets : _num_instances; - auto& operator_xs = cur_pipe->operator_xs(); + auto& operators = cur_pipe->operators(); const auto downstream_pipeline_id = cur_pipe->id(); auto local_exchange_id = next_operator_id(); // 1. Create a new pipeline with local exchange sink. - DataSinkOperatorXPtr sink; + DataSinkOperatorPtr sink; auto sink_id = next_sink_operator_id(); /** * `bucket_seq_to_instance_idx` is empty if no scan operator is contained in this fragment. * So co-located operators(e.g. Agg, Analytic) should use `HASH_SHUFFLE` instead of `BUCKET_HASH_SHUFFLE`. */ - const bool followed_by_shuffled_join = - operator_xs.size() > idx ? 
operator_xs[idx]->followed_by_shuffled_join() - : cur_pipe->sink_x()->followed_by_shuffled_join(); + const bool followed_by_shuffled_operator = + operators.size() > idx ? operators[idx]->followed_by_shuffled_operator() + : cur_pipe->sink()->followed_by_shuffled_operator(); const bool should_disable_bucket_shuffle = bucket_seq_to_instance_idx.empty() && shuffle_idx_to_instance_idx.find(-1) == shuffle_idx_to_instance_idx.end() && - followed_by_shuffled_join; + followed_by_shuffled_operator; sink.reset(new LocalExchangeSinkOperatorX( sink_id, local_exchange_id, should_disable_bucket_shuffle ? _total_instances : _num_instances, @@ -779,9 +778,9 @@ Status PipelineFragmentContext::_add_local_exchange_impl( data_distribution.distribution_type = ExchangeType::HASH_SHUFFLE; } RETURN_IF_ERROR(new_pip->set_sink(sink)); - RETURN_IF_ERROR(new_pip->sink_x()->init(data_distribution.distribution_type, num_buckets, - should_disable_bucket_shuffle, - shuffle_idx_to_instance_idx)); + RETURN_IF_ERROR(new_pip->sink()->init(data_distribution.distribution_type, num_buckets, + should_disable_bucket_shuffle, + shuffle_idx_to_instance_idx)); // 2. Create and initialize LocalExchangeSharedState. std::shared_ptr shared_state = @@ -836,7 +835,7 @@ Status PipelineFragmentContext::_add_local_exchange_impl( } break; case ExchangeType::LOCAL_MERGE_SORT: { - auto child_op = cur_pipe->sink_x()->child_x(); + auto child_op = cur_pipe->sink()->child(); auto sort_source = std::dynamic_pointer_cast(child_op); if (!sort_source) { return Status::InternalError( @@ -871,21 +870,21 @@ Status PipelineFragmentContext::_add_local_exchange_impl( // pipeline1 [Scan - LocalExchangeSink] and pipeline2 [LocalExchangeSource - AggSink]. // 3.1 Initialize new pipeline's operator list. - std::copy(operator_xs.begin(), operator_xs.begin() + idx, - std::inserter(new_pip->operator_xs(), new_pip->operator_xs().end())); + std::copy(operators.begin(), operators.begin() + idx, + std::inserter(new_pip->operators(), new_pip->operators().end())); // 3.2 Erase unused operators in previous pipeline. - operator_xs.erase(operator_xs.begin(), operator_xs.begin() + idx); + operators.erase(operators.begin(), operators.begin() + idx); // 4. Initialize LocalExchangeSource and insert it into this pipeline. 
- OperatorXPtr source_op; + OperatorPtr source_op; source_op.reset(new LocalExchangeSourceOperatorX(pool, local_exchange_id)); - RETURN_IF_ERROR(source_op->set_child(new_pip->operator_xs().back())); + RETURN_IF_ERROR(source_op->set_child(new_pip->operators().back())); RETURN_IF_ERROR(source_op->init(data_distribution.distribution_type)); - if (!operator_xs.empty()) { - RETURN_IF_ERROR(operator_xs.front()->set_child(source_op)); + if (!operators.empty()) { + RETURN_IF_ERROR(operators.front()->set_child(source_op)); } - operator_xs.insert(operator_xs.begin(), source_op); + operators.insert(operators.begin(), source_op); shared_state->create_dependencies(local_exchange_id); @@ -894,8 +893,8 @@ Status PipelineFragmentContext::_add_local_exchange_impl( std::vector edges_with_source; for (auto child : cur_pipe->children()) { bool found = false; - for (auto op : new_pip->operator_xs()) { - if (child->sink_x()->node_id() == op->node_id()) { + for (auto op : new_pip->operators()) { + if (child->sink()->node_id() == op->node_id()) { new_pip->set_children(child); found = true; }; @@ -918,8 +917,8 @@ Status PipelineFragmentContext::_add_local_exchange_impl( } cur_pipe->set_children(new_children); _dag[downstream_pipeline_id] = edges_with_source; - RETURN_IF_ERROR(new_pip->sink_x()->set_child(new_pip->operator_xs().back())); - RETURN_IF_ERROR(cur_pipe->sink_x()->set_child(cur_pipe->operator_xs().back())); + RETURN_IF_ERROR(new_pip->sink()->set_child(new_pip->operators().back())); + RETURN_IF_ERROR(cur_pipe->sink()->set_child(cur_pipe->operators().back())); // 7. Inherit properties from current pipeline. _inherit_pipeline_properties(data_distribution, cur_pipe, new_pip); @@ -942,23 +941,23 @@ Status PipelineFragmentContext::_add_local_exchange( } *do_local_exchange = true; - auto& operator_xs = cur_pipe->operator_xs(); - auto total_op_num = operator_xs.size(); + auto& operators = cur_pipe->operators(); + auto total_op_num = operators.size(); auto new_pip = add_pipeline(cur_pipe, pip_idx + 1); RETURN_IF_ERROR(_add_local_exchange_impl( idx, pool, cur_pipe, new_pip, data_distribution, do_local_exchange, num_buckets, bucket_seq_to_instance_idx, shuffle_idx_to_instance_idx, ignore_data_distribution)); - CHECK(total_op_num + 1 == cur_pipe->operator_xs().size() + new_pip->operator_xs().size()) + CHECK(total_op_num + 1 == cur_pipe->operators().size() + new_pip->operators().size()) << "total_op_num: " << total_op_num - << " cur_pipe->operator_xs().size(): " << cur_pipe->operator_xs().size() - << " new_pip->operator_xs().size(): " << new_pip->operator_xs().size(); + << " cur_pipe->operators().size(): " << cur_pipe->operators().size() + << " new_pip->operators().size(): " << new_pip->operators().size(); // Add passthrough local exchanger if necessary if (cur_pipe->num_tasks() > 1 && new_pip->num_tasks() == 1 && Pipeline::is_hash_exchange(data_distribution.distribution_type)) { RETURN_IF_ERROR(_add_local_exchange_impl( - new_pip->operator_xs().size(), pool, new_pip, add_pipeline(new_pip, pip_idx + 2), + new_pip->operators().size(), pool, new_pip, add_pipeline(new_pip, pip_idx + 2), DataDistribution(ExchangeType::PASSTHROUGH), do_local_exchange, num_buckets, bucket_seq_to_instance_idx, shuffle_idx_to_instance_idx, ignore_data_distribution)); } @@ -973,10 +972,10 @@ Status PipelineFragmentContext::_plan_local_exchange( // Set property if child pipeline is not join operator's child. 
if (!_pipelines[pip_idx]->children().empty()) { for (auto& child : _pipelines[pip_idx]->children()) { - if (child->sink_x()->node_id() == - _pipelines[pip_idx]->operator_xs().front()->node_id()) { - RETURN_IF_ERROR(_pipelines[pip_idx]->operator_xs().front()->set_child( - child->operator_xs().back())); + if (child->sink()->node_id() == + _pipelines[pip_idx]->operators().front()->node_id()) { + RETURN_IF_ERROR(_pipelines[pip_idx]->operators().front()->set_child( + child->operators().back())); _pipelines[pip_idx]->set_data_distribution(child->data_distribution()); } } @@ -986,13 +985,13 @@ Status PipelineFragmentContext::_plan_local_exchange( // scan node. so here use `_num_instance` to replace the `num_buckets` to prevent dividing 0 // still keep colocate plan after local shuffle RETURN_IF_ERROR(_plan_local_exchange( - _pipelines[pip_idx]->operator_xs().front()->ignore_data_hash_distribution() || + _pipelines[pip_idx]->operators().front()->ignore_data_hash_distribution() || num_buckets == 0 ? _num_instances : num_buckets, pip_idx, _pipelines[pip_idx], bucket_seq_to_instance_idx, shuffle_idx_to_instance_idx, - _pipelines[pip_idx]->operator_xs().front()->ignore_data_hash_distribution())); + _pipelines[pip_idx]->operators().front()->ignore_data_hash_distribution())); } return Status::OK(); } @@ -1005,7 +1004,7 @@ Status PipelineFragmentContext::_plan_local_exchange( int idx = 1; bool do_local_exchange = false; do { - auto& ops = pip->operator_xs(); + auto& ops = pip->operators(); do_local_exchange = false; // Plan local exchange for each operator. for (; idx < ops.size();) { @@ -1027,10 +1026,10 @@ Status PipelineFragmentContext::_plan_local_exchange( idx++; } } while (do_local_exchange); - if (pip->sink_x()->required_data_distribution().need_local_exchange()) { + if (pip->sink()->required_data_distribution().need_local_exchange()) { RETURN_IF_ERROR(_add_local_exchange( - pip_idx, idx, pip->sink_x()->node_id(), _runtime_state->obj_pool(), pip, - pip->sink_x()->required_data_distribution(), &do_local_exchange, num_buckets, + pip_idx, idx, pip->sink()->node_id(), _runtime_state->obj_pool(), pip, + pip->sink()->required_data_distribution(), &do_local_exchange, num_buckets, bucket_seq_to_instance_idx, shuffle_idx_to_instance_idx, ignore_data_hash_distribution)); } @@ -1168,14 +1167,14 @@ Status PipelineFragmentContext::_create_data_sink(ObjectPool* pool, const TDataS _row_desc = pool->add(new RowDescriptor(tmp_row_desc)); } auto source_id = sources[i]; - OperatorXPtr source_op; + OperatorPtr source_op; // 1. create and set the source operator of multi_cast_data_stream_source for new pipeline source_op.reset(new MultiCastDataStreamerSourceOperatorX( i, pool, thrift_sink.multi_cast_stream_sink.sinks[i], row_desc, source_id)); RETURN_IF_ERROR(new_pipeline->add_operator(source_op)); // 2. 
create and set sink operator of data stream sender for new pipeline - DataSinkOperatorXPtr sink_op; + DataSinkOperatorPtr sink_op; sink_op.reset( new ExchangeSinkOperatorX(state, *_row_desc, next_sink_operator_id(), thrift_sink.multi_cast_stream_sink.sinks[i], @@ -1206,10 +1205,10 @@ Status PipelineFragmentContext::_create_data_sink(ObjectPool* pool, const TDataS // NOLINTBEGIN(readability-function-cognitive-complexity) Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNode& tnode, const doris::TPipelineFragmentParams& request, - const DescriptorTbl& descs, OperatorXPtr& op, + const DescriptorTbl& descs, OperatorPtr& op, PipelinePtr& cur_pipe, int parent_idx, int child_idx, - const bool followed_by_shuffled_join) { + const bool followed_by_shuffled_operator) { // We directly construct the operator from Thrift because the given array is in the order of preorder traversal. // Therefore, here we need to use a stack-like structure. _pipeline_parent_map.pop(cur_pipe, parent_idx, child_idx); @@ -1304,7 +1303,7 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo !tnode.agg_node.grouping_exprs.empty() && !group_by_limit_opt) { op.reset(new DistinctStreamingAggOperatorX(pool, next_operator_id(), tnode, descs, _require_bucket_distribution)); - op->set_followed_by_shuffled_join(followed_by_shuffled_join); + op->set_followed_by_shuffled_operator(followed_by_shuffled_operator); _require_bucket_distribution = _require_bucket_distribution || op->require_data_distribution(); RETURN_IF_ERROR(cur_pipe->add_operator(op)); @@ -1328,7 +1327,7 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo cur_pipe = add_pipeline(cur_pipe); _dag[downstream_pipeline_id].push_back(cur_pipe->id()); - DataSinkOperatorXPtr sink; + DataSinkOperatorPtr sink; if (enable_spill) { sink.reset(new PartitionedAggSinkOperatorX(pool, next_sink_operator_id(), tnode, descs, _require_bucket_distribution)); @@ -1336,12 +1335,12 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo sink.reset(new AggSinkOperatorX(pool, next_sink_operator_id(), tnode, descs, _require_bucket_distribution)); } - sink->set_followed_by_shuffled_join(followed_by_shuffled_join); + sink->set_followed_by_shuffled_operator(followed_by_shuffled_operator); _require_bucket_distribution = _require_bucket_distribution || sink->require_data_distribution(); sink->set_dests_id({op->operator_id()}); RETURN_IF_ERROR(cur_pipe->set_sink(sink)); - RETURN_IF_ERROR(cur_pipe->sink_x()->init(tnode, _runtime_state.get())); + RETURN_IF_ERROR(cur_pipe->sink()->init(tnode, _runtime_state.get())); } break; } @@ -1379,15 +1378,15 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo pool, next_sink_operator_id(), tnode_, descs, _need_local_merge, partition_count); sink_operator->set_inner_operators(inner_sink_operator, inner_probe_operator); - DataSinkOperatorXPtr sink = std::move(sink_operator); + DataSinkOperatorPtr sink = std::move(sink_operator); sink->set_dests_id({op->operator_id()}); RETURN_IF_ERROR(build_side_pipe->set_sink(sink)); - RETURN_IF_ERROR(build_side_pipe->sink_x()->init(tnode_, _runtime_state.get())); + RETURN_IF_ERROR(build_side_pipe->sink()->init(tnode_, _runtime_state.get())); _pipeline_parent_map.push(op->node_id(), cur_pipe); _pipeline_parent_map.push(op->node_id(), build_side_pipe); - sink->set_followed_by_shuffled_join(sink->is_shuffled_hash_join()); - op->set_followed_by_shuffled_join(op->is_shuffled_hash_join()); + 
sink->set_followed_by_shuffled_operator(sink->is_shuffled_operator()); + op->set_followed_by_shuffled_operator(op->is_shuffled_operator()); } else { op.reset(new HashJoinProbeOperatorX(pool, tnode, next_operator_id(), descs)); RETURN_IF_ERROR(cur_pipe->add_operator(op)); @@ -1399,17 +1398,17 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo PipelinePtr build_side_pipe = add_pipeline(cur_pipe); _dag[downstream_pipeline_id].push_back(build_side_pipe->id()); - DataSinkOperatorXPtr sink; + DataSinkOperatorPtr sink; sink.reset(new HashJoinBuildSinkOperatorX(pool, next_sink_operator_id(), tnode, descs, _need_local_merge)); sink->set_dests_id({op->operator_id()}); RETURN_IF_ERROR(build_side_pipe->set_sink(sink)); - RETURN_IF_ERROR(build_side_pipe->sink_x()->init(tnode, _runtime_state.get())); + RETURN_IF_ERROR(build_side_pipe->sink()->init(tnode, _runtime_state.get())); _pipeline_parent_map.push(op->node_id(), cur_pipe); _pipeline_parent_map.push(op->node_id(), build_side_pipe); - sink->set_followed_by_shuffled_join(sink->is_shuffled_hash_join()); - op->set_followed_by_shuffled_join(op->is_shuffled_hash_join()); + sink->set_followed_by_shuffled_operator(sink->is_shuffled_operator()); + op->set_followed_by_shuffled_operator(op->is_shuffled_operator()); } _require_bucket_distribution = _require_bucket_distribution || op->require_data_distribution(); @@ -1426,12 +1425,12 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo PipelinePtr build_side_pipe = add_pipeline(cur_pipe); _dag[downstream_pipeline_id].push_back(build_side_pipe->id()); - DataSinkOperatorXPtr sink; + DataSinkOperatorPtr sink; sink.reset(new NestedLoopJoinBuildSinkOperatorX(pool, next_sink_operator_id(), tnode, descs, _need_local_merge)); sink->set_dests_id({op->operator_id()}); RETURN_IF_ERROR(build_side_pipe->set_sink(sink)); - RETURN_IF_ERROR(build_side_pipe->sink_x()->init(tnode, _runtime_state.get())); + RETURN_IF_ERROR(build_side_pipe->sink()->init(tnode, _runtime_state.get())); _pipeline_parent_map.push(op->node_id(), cur_pipe); _pipeline_parent_map.push(op->node_id(), build_side_pipe); break; @@ -1439,6 +1438,7 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo case TPlanNodeType::UNION_NODE: { int child_count = tnode.num_children; op.reset(new UnionSourceOperatorX(pool, tnode, next_operator_id(), descs)); + op->set_followed_by_shuffled_operator(_require_bucket_distribution); RETURN_IF_ERROR(cur_pipe->add_operator(op)); const auto downstream_pipeline_id = cur_pipe->id(); @@ -1448,11 +1448,12 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo for (int i = 0; i < child_count; i++) { PipelinePtr build_side_pipe = add_pipeline(cur_pipe); _dag[downstream_pipeline_id].push_back(build_side_pipe->id()); - DataSinkOperatorXPtr sink; + DataSinkOperatorPtr sink; sink.reset(new UnionSinkOperatorX(i, next_sink_operator_id(), pool, tnode, descs)); + sink->set_followed_by_shuffled_operator(_require_bucket_distribution); sink->set_dests_id({op->operator_id()}); RETURN_IF_ERROR(build_side_pipe->set_sink(sink)); - RETURN_IF_ERROR(build_side_pipe->sink_x()->init(tnode, _runtime_state.get())); + RETURN_IF_ERROR(build_side_pipe->sink()->init(tnode, _runtime_state.get())); // preset children pipelines. if any pipeline found this as its father, will use the prepared pipeline to build. 
_pipeline_parent_map.push(op->node_id(), build_side_pipe); } @@ -1475,7 +1476,7 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo cur_pipe = add_pipeline(cur_pipe); _dag[downstream_pipeline_id].push_back(cur_pipe->id()); - DataSinkOperatorXPtr sink; + DataSinkOperatorPtr sink; if (should_spill) { sink.reset(new SpillSortSinkOperatorX(pool, next_sink_operator_id(), tnode, descs, _require_bucket_distribution)); @@ -1483,12 +1484,12 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo sink.reset(new SortSinkOperatorX(pool, next_sink_operator_id(), tnode, descs, _require_bucket_distribution)); } - sink->set_followed_by_shuffled_join(followed_by_shuffled_join); + sink->set_followed_by_shuffled_operator(followed_by_shuffled_operator); _require_bucket_distribution = _require_bucket_distribution || sink->require_data_distribution(); sink->set_dests_id({op->operator_id()}); RETURN_IF_ERROR(cur_pipe->set_sink(sink)); - RETURN_IF_ERROR(cur_pipe->sink_x()->init(tnode, _runtime_state.get())); + RETURN_IF_ERROR(cur_pipe->sink()->init(tnode, _runtime_state.get())); break; } case doris::TPlanNodeType::PARTITION_SORT_NODE: { @@ -1502,11 +1503,11 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo cur_pipe = add_pipeline(cur_pipe); _dag[downstream_pipeline_id].push_back(cur_pipe->id()); - DataSinkOperatorXPtr sink; + DataSinkOperatorPtr sink; sink.reset(new PartitionSortSinkOperatorX(pool, next_sink_operator_id(), tnode, descs)); sink->set_dests_id({op->operator_id()}); RETURN_IF_ERROR(cur_pipe->set_sink(sink)); - RETURN_IF_ERROR(cur_pipe->sink_x()->init(tnode, _runtime_state.get())); + RETURN_IF_ERROR(cur_pipe->sink()->init(tnode, _runtime_state.get())); break; } case TPlanNodeType::ANALYTIC_EVAL_NODE: { @@ -1520,25 +1521,27 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo cur_pipe = add_pipeline(cur_pipe); _dag[downstream_pipeline_id].push_back(cur_pipe->id()); - DataSinkOperatorXPtr sink; + DataSinkOperatorPtr sink; sink.reset(new AnalyticSinkOperatorX(pool, next_sink_operator_id(), tnode, descs, _require_bucket_distribution)); - sink->set_followed_by_shuffled_join(followed_by_shuffled_join); + sink->set_followed_by_shuffled_operator(followed_by_shuffled_operator); _require_bucket_distribution = _require_bucket_distribution || sink->require_data_distribution(); sink->set_dests_id({op->operator_id()}); RETURN_IF_ERROR(cur_pipe->set_sink(sink)); - RETURN_IF_ERROR(cur_pipe->sink_x()->init(tnode, _runtime_state.get())); + RETURN_IF_ERROR(cur_pipe->sink()->init(tnode, _runtime_state.get())); break; } case TPlanNodeType::INTERSECT_NODE: { RETURN_IF_ERROR(_build_operators_for_set_operation_node( pool, tnode, descs, op, cur_pipe, parent_idx, child_idx)); + op->set_followed_by_shuffled_operator(_require_bucket_distribution); break; } case TPlanNodeType::EXCEPT_NODE: { RETURN_IF_ERROR(_build_operators_for_set_operation_node( pool, tnode, descs, op, cur_pipe, parent_idx, child_idx)); + op->set_followed_by_shuffled_operator(_require_bucket_distribution); break; } case TPlanNodeType::REPEAT_NODE: { @@ -1597,7 +1600,7 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo template Status PipelineFragmentContext::_build_operators_for_set_operation_node( - ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs, OperatorXPtr& op, + ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs, OperatorPtr& op, PipelinePtr& cur_pipe, int 
parent_idx, int child_idx) { op.reset(new SetSourceOperatorX<is_intersect>(pool, tnode, next_operator_id(), descs)); RETURN_IF_ERROR(cur_pipe->add_operator(op)); @@ -1611,7 +1614,7 @@ Status PipelineFragmentContext::_build_operators_for_set_operation_node( PipelinePtr probe_side_pipe = add_pipeline(cur_pipe); _dag[downstream_pipeline_id].push_back(probe_side_pipe->id()); - DataSinkOperatorXPtr sink; + DataSinkOperatorPtr sink; if (child_id == 0) { sink.reset(new SetSinkOperatorX<is_intersect>(child_id, next_sink_operator_id(), pool, tnode, descs)); @@ -1621,7 +1624,7 @@ } sink->set_dests_id({op->operator_id()}); RETURN_IF_ERROR(probe_side_pipe->set_sink(sink)); - RETURN_IF_ERROR(probe_side_pipe->sink_x()->init(tnode, _runtime_state.get())); + RETURN_IF_ERROR(probe_side_pipe->sink()->init(tnode, _runtime_state.get())); // prepare children pipelines. if any pipeline found this as its father, will use the prepared pipeline to build. _pipeline_parent_map.push(op->node_id(), probe_side_pipe); } @@ -1699,8 +1702,8 @@ void PipelineFragmentContext::_close_fragment_instance() { } if (_query_ctx->enable_profile()) { - _query_ctx->add_fragment_profile(_fragment_id, collect_realtime_profile_x(), - collect_realtime_load_channel_profile_x()); + _query_ctx->add_fragment_profile(_fragment_id, collect_realtime_profile(), + collect_realtime_load_channel_profile()); } // all submitted tasks done @@ -1774,7 +1777,7 @@ std::string PipelineFragmentContext::debug_string() { } std::vector<std::shared_ptr<TRuntimeProfileTree>> -PipelineFragmentContext::collect_realtime_profile_x() const { +PipelineFragmentContext::collect_realtime_profile() const { std::vector<std::shared_ptr<TRuntimeProfileTree>> res; // we do not have mutex to protect pipeline_id_to_profile @@ -1799,7 +1802,7 @@ PipelineFragmentContext::collect_realtime_profile_x() const { } std::shared_ptr<TRuntimeProfileTree> -PipelineFragmentContext::collect_realtime_load_channel_profile_x() const { +PipelineFragmentContext::collect_realtime_load_channel_profile() const { // we do not have mutex to protect pipeline_id_to_profile // so we need to make sure this funciton is invoked after fragment context // has already been prepared.
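The hunks above repeatedly apply the same wiring pattern under the renamed types: an OperatorPtr is added to the pipeline, a DataSinkOperatorPtr is bound to that operator's id, and the sink is then reached through sink() instead of sink_x(). The sketch below consolidates that pattern for the UNION_NODE case as a rough illustration only; it is not part of the patch, it assumes the Doris pipeline headers plus an already-built TPlanNode, DescriptorTbl, and RuntimeState, and the free function wire_union_pipeline with its explicit id and flag parameters is a hypothetical stand-in for the PipelineFragmentContext member helpers (next_operator_id(), next_sink_operator_id(), _require_bucket_distribution).

    // Illustrative sketch (not part of the patch): operator/sink wiring with the
    // renamed OperatorPtr / DataSinkOperatorPtr types and the sink() accessor.
    // wire_union_pipeline and its explicit parameters are hypothetical; the member
    // calls (add_operator, set_dests_id, set_sink, sink()->init,
    // set_followed_by_shuffled_operator) all appear in the hunks above.
    Status wire_union_pipeline(ObjectPool* pool, const TPlanNode& tnode,
                               const DescriptorTbl& descs, RuntimeState* state,
                               PipelinePtr cur_pipe, PipelinePtr build_side_pipe,
                               int operator_id, int sink_operator_id,
                               bool require_bucket_distribution) {
        OperatorPtr op;
        op.reset(new UnionSourceOperatorX(pool, tnode, operator_id, descs));
        op->set_followed_by_shuffled_operator(require_bucket_distribution);
        RETURN_IF_ERROR(cur_pipe->add_operator(op));

        DataSinkOperatorPtr sink;
        sink.reset(new UnionSinkOperatorX(/*child_id=*/0, sink_operator_id, pool, tnode, descs));
        sink->set_followed_by_shuffled_operator(require_bucket_distribution);
        sink->set_dests_id({op->operator_id()});
        RETURN_IF_ERROR(build_side_pipe->set_sink(sink));
        // sink() replaces the old sink_x() accessor on Pipeline.
        RETURN_IF_ERROR(build_side_pipe->sink()->init(tnode, state));
        return Status::OK();
    }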
diff --git a/be/src/pipeline/pipeline_fragment_context.h b/be/src/pipeline/pipeline_fragment_context.h index e0e4c12ef0d1e7..f95eb03fb12d48 100644 --- a/be/src/pipeline/pipeline_fragment_context.h +++ b/be/src/pipeline/pipeline_fragment_context.h @@ -69,8 +69,8 @@ class PipelineFragmentContext : public TaskExecutionContext { ~PipelineFragmentContext(); - std::vector<std::shared_ptr<TRuntimeProfileTree>> collect_realtime_profile_x() const; - std::shared_ptr<TRuntimeProfileTree> collect_realtime_load_channel_profile_x() const; + std::vector<std::shared_ptr<TRuntimeProfileTree>> collect_realtime_profile() const; + std::shared_ptr<TRuntimeProfileTree> collect_realtime_load_channel_profile() const; bool is_timeout(timespec now) const; @@ -141,20 +141,20 @@ class PipelineFragmentContext : public TaskExecutionContext { private: Status _build_pipelines(ObjectPool* pool, const doris::TPipelineFragmentParams& request, - const DescriptorTbl& descs, OperatorXPtr* root, PipelinePtr cur_pipe); + const DescriptorTbl& descs, OperatorPtr* root, PipelinePtr cur_pipe); Status _create_tree_helper(ObjectPool* pool, const std::vector<TPlanNode>& tnodes, const doris::TPipelineFragmentParams& request, - const DescriptorTbl& descs, OperatorXPtr parent, int* node_idx, - OperatorXPtr* root, PipelinePtr& cur_pipe, int child_idx, + const DescriptorTbl& descs, OperatorPtr parent, int* node_idx, + OperatorPtr* root, PipelinePtr& cur_pipe, int child_idx, const bool followed_by_shuffled_join); Status _create_operator(ObjectPool* pool, const TPlanNode& tnode, const doris::TPipelineFragmentParams& request, - const DescriptorTbl& descs, OperatorXPtr& op, PipelinePtr& cur_pipe, + const DescriptorTbl& descs, OperatorPtr& op, PipelinePtr& cur_pipe, int parent_idx, int child_idx, const bool followed_by_shuffled_join); template <bool is_intersect> Status _build_operators_for_set_operation_node(ObjectPool* pool, const TPlanNode& tnode, - const DescriptorTbl& descs, OperatorXPtr& op, + const DescriptorTbl& descs, OperatorPtr& op, PipelinePtr& cur_pipe, int parent_idx, int child_idx); @@ -243,7 +243,7 @@ class PipelineFragmentContext : public TaskExecutionContext { int _timeout = -1; - OperatorXPtr _root_op = nullptr; + OperatorPtr _root_op = nullptr; // this is a [n * m] matrix. n is parallelism of pipeline engine and m is the number of pipelines.
std::vector<std::vector<std::unique_ptr<PipelineTask>>> _tasks; @@ -255,7 +255,7 @@ class PipelineFragmentContext : public TaskExecutionContext { #pragma clang diagnostic push #pragma clang diagnostic ignored "-Wshadow-field" #endif - DataSinkOperatorXPtr _sink = nullptr; + DataSinkOperatorPtr _sink = nullptr; #ifdef __clang__ #pragma clang diagnostic pop #endif diff --git a/be/src/pipeline/pipeline_task.cpp b/be/src/pipeline/pipeline_task.cpp index 8692075622a906..4f362ac5042e8f 100644 --- a/be/src/pipeline/pipeline_task.cpp +++ b/be/src/pipeline/pipeline_task.cpp @@ -58,7 +58,7 @@ PipelineTask::PipelineTask( _state(state), _fragment_context(fragment_context), _parent_profile(parent_profile), - _operators(pipeline->operator_xs()), + _operators(pipeline->operators()), _source(_operators.front().get()), _root(_operators.back().get()), _sink(pipeline->sink_shared_pointer()), diff --git a/be/src/pipeline/pipeline_task.h b/be/src/pipeline/pipeline_task.h index 8fb4b4eb7992f5..dd2ead4b5dcc91 100644 --- a/be/src/pipeline/pipeline_task.h +++ b/be/src/pipeline/pipeline_task.h @@ -130,7 +130,7 @@ class PipelineTask { void wake_up(); - DataSinkOperatorXPtr sink() const { return _sink; } + DataSinkOperatorPtr sink() const { return _sink; } int task_id() const { return _index; }; bool is_finalized() const { return _finalized; } @@ -282,10 +282,10 @@ class PipelineTask { MonotonicStopWatch _pipeline_task_watcher; - OperatorXs _operators; // left is _source, right is _root + Operators _operators; // left is _source, right is _root OperatorXBase* _source; OperatorXBase* _root; - DataSinkOperatorXPtr _sink; + DataSinkOperatorPtr _sink; // `_read_dependencies` is stored as same order as `_operators` std::vector<std::vector<Dependency*>> _read_dependencies; diff --git a/be/src/runtime/query_context.cpp b/be/src/runtime/query_context.cpp index d0a821b6cdfdae..2b333057a539e4 100644 --- a/be/src/runtime/query_context.cpp +++ b/be/src/runtime/query_context.cpp @@ -402,7 +402,7 @@ QueryContext::_collect_realtime_query_profile() const { continue; } - auto profile = fragment_ctx->collect_realtime_profile_x(); + auto profile = fragment_ctx->collect_realtime_profile(); if (profile.empty()) { std::string err_msg = fmt::format( diff --git a/be/src/vec/sink/vdata_stream_sender.cpp b/be/src/vec/sink/vdata_stream_sender.cpp index fb2f24ee0e1817..1f24ba122e7e3f 100644 --- a/be/src/vec/sink/vdata_stream_sender.cpp +++ b/be/src/vec/sink/vdata_stream_sender.cpp @@ -210,7 +210,7 @@ Status Channel<Parent>::send_local_block(Status exec_status, bool eos) { } template <typename Parent> -Status Channel<Parent>::send_local_block(Block* block) { +Status Channel<Parent>::send_local_block(Block* block, bool can_be_moved) { SCOPED_TIMER(_parent->local_send_timer()); if (_recvr_is_valid()) { if constexpr (!std::is_same_v) { @@ -218,7 +218,7 @@ Status Channel<Parent>::send_local_block(Block* block) { COUNTER_UPDATE(_parent->local_sent_rows(), block->rows()); COUNTER_UPDATE(_parent->blocks_sent_counter(), 1); } - _local_recvr->add_block(block, _parent->sender_id(), false); + _local_recvr->add_block(block, _parent->sender_id(), can_be_moved); return Status::OK(); } else { return _receiver_status; diff --git a/be/src/vec/sink/vdata_stream_sender.h b/be/src/vec/sink/vdata_stream_sender.h index 7c86a62519a851..b046c2efafcddf 100644 --- a/be/src/vec/sink/vdata_stream_sender.h +++ b/be/src/vec/sink/vdata_stream_sender.h @@ -156,7 +156,7 @@ class Channel { Status send_local_block(Status exec_status, bool eos = false); - Status send_local_block(Block* block); + Status send_local_block(Block* block, bool can_be_moved); // Flush buffered rows and close channel.
This function don't wait the response // of close operation, client should call close_wait() to finish channel's close. // We split one close operation into two phases in order to make multiple channels diff --git a/be/test/vec/exec/vwal_scanner_test.cpp b/be/test/vec/exec/vwal_scanner_test.cpp index f6a9d33885cfb3..0944193e6d09b8 100644 --- a/be/test/vec/exec/vwal_scanner_test.cpp +++ b/be/test/vec/exec/vwal_scanner_test.cpp @@ -259,7 +259,7 @@ void VWalScannerTest::init() { std::make_shared(&_obj_pool, _tnode, 0, *_desc_tbl, 1); _scan_node->_output_tuple_desc = _runtime_state.desc_tbl().get_tuple_descriptor(_dst_tuple_id); WARN_IF_ERROR(_scan_node->init(_tnode, &_runtime_state), "fail to init scan_node"); - WARN_IF_ERROR(_scan_node->prepare(&_runtime_state), "fail to prepare scan_node"); + WARN_IF_ERROR(_scan_node->open(&_runtime_state), "fail to prepare scan_node"); auto local_state = pipeline::FileScanLocalState::create_unique(&_runtime_state, _scan_node.get());