From 3af3dd34a2286b21bf57436b87784694e244d339 Mon Sep 17 00:00:00 2001 From: Gabriel Date: Wed, 9 Oct 2024 11:21:27 +0800 Subject: [PATCH] [local exchange](fix) Fix correctness caused by local exchange (#41555) For plan `local exchange (hash shuffle) -> union -> colocated agg`, we must ensure local exchange uses the same hash algorithm as MPP shuffling. This problem is covered by our test cases but can only be reproduced on multiple BEs, so no case is added in this PR. --- .../pipeline/exec/aggregation_sink_operator.h | 2 +- be/src/pipeline/exec/analytic_sink_operator.h | 2 +- .../distinct_streaming_aggregation_operator.h | 2 +- be/src/pipeline/exec/hashjoin_build_sink.h | 2 +- .../pipeline/exec/hashjoin_probe_operator.h | 2 +- be/src/pipeline/exec/operator.h | 15 +++---- .../partitioned_hash_join_probe_operator.h | 2 +- .../partitioned_hash_join_sink_operator.h | 2 +- be/src/pipeline/exec/sort_sink_operator.h | 2 +- be/src/pipeline/exec/union_sink_operator.h | 6 +++ be/src/pipeline/exec/union_source_operator.h | 5 +++ be/src/pipeline/pipeline_fragment_context.cpp | 44 ++++++++++--------- 12 files changed, 50 insertions(+), 36 deletions(-) diff --git a/be/src/pipeline/exec/aggregation_sink_operator.h b/be/src/pipeline/exec/aggregation_sink_operator.h index 97440de3f09e4c..7c146c38a2b135 100644 --- a/be/src/pipeline/exec/aggregation_sink_operator.h +++ b/be/src/pipeline/exec/aggregation_sink_operator.h @@ -148,7 +148,7 @@ class AggSinkOperatorX final : public DataSinkOperatorX { ? DataDistribution(ExchangeType::PASSTHROUGH) : DataSinkOperatorX::required_data_distribution(); } - return _is_colocate && _require_bucket_distribution && !_followed_by_shuffled_join + return _is_colocate && _require_bucket_distribution && !_followed_by_shuffled_operator ? 
DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, _partition_exprs) : DataDistribution(ExchangeType::HASH_SHUFFLE, _partition_exprs); } diff --git a/be/src/pipeline/exec/analytic_sink_operator.h b/be/src/pipeline/exec/analytic_sink_operator.h index b8615717198dd3..8f7a9d1c35ae38 100644 --- a/be/src/pipeline/exec/analytic_sink_operator.h +++ b/be/src/pipeline/exec/analytic_sink_operator.h @@ -81,7 +81,7 @@ class AnalyticSinkOperatorX final : public DataSinkOperatorX create_shared_state() const = 0; [[nodiscard]] virtual DataDistribution required_data_distribution() const; - [[nodiscard]] virtual bool is_shuffled_hash_join() const { return false; } - Status close(RuntimeState* state) override { return Status::InternalError("Should not reach here!"); } @@ -662,8 +663,6 @@ class OperatorXBase : public OperatorBase { [[nodiscard]] virtual Status get_block(RuntimeState* state, vectorized::Block* block, bool* eos) = 0; - [[nodiscard]] virtual bool is_shuffled_hash_join() const { return false; } - Status close(RuntimeState* state) override; [[nodiscard]] virtual const RowDescriptor& intermediate_row_desc() const { diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h index 8cccc9f8faeba6..3aab11f62d883e 100644 --- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h +++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h @@ -168,7 +168,7 @@ class PartitionedHashJoinProbeOperatorX final bool require_shuffled_data_distribution() const override { return _join_op != TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN; } - bool is_shuffled_hash_join() const override { + bool is_shuffled_operator() const override { return _join_distribution == TJoinDistributionType::PARTITIONED; } diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h index 1376964663f7f3..c768d7518b95c9 100644 --- 
a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h +++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h @@ -118,7 +118,7 @@ class PartitionedHashJoinSinkOperatorX bool require_shuffled_data_distribution() const override { return _join_op != TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN; } - bool is_shuffled_hash_join() const override { + bool is_shuffled_operator() const override { return _join_distribution == TJoinDistributionType::PARTITIONED; } diff --git a/be/src/pipeline/exec/sort_sink_operator.h b/be/src/pipeline/exec/sort_sink_operator.h index 0bd6dd9096482c..8462472dd02671 100644 --- a/be/src/pipeline/exec/sort_sink_operator.h +++ b/be/src/pipeline/exec/sort_sink_operator.h @@ -63,7 +63,7 @@ class SortSinkOperatorX final : public DataSinkOperatorX { Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos) override; DataDistribution required_data_distribution() const override { if (_is_analytic_sort) { - return _is_colocate && _require_bucket_distribution && !_followed_by_shuffled_join + return _is_colocate && _require_bucket_distribution && !_followed_by_shuffled_operator ? 
DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, _partition_exprs) : DataDistribution(ExchangeType::HASH_SHUFFLE, _partition_exprs); } else if (_merge_by_exchange) { diff --git a/be/src/pipeline/exec/union_sink_operator.h b/be/src/pipeline/exec/union_sink_operator.h index 13dfb0ba6379cb..f939950143ae92 100644 --- a/be/src/pipeline/exec/union_sink_operator.h +++ b/be/src/pipeline/exec/union_sink_operator.h @@ -89,6 +89,12 @@ class UnionSinkOperatorX final : public DataSinkOperatorX { } } + bool require_shuffled_data_distribution() const override { + return _followed_by_shuffled_operator; + } + + bool is_shuffled_operator() const override { return _followed_by_shuffled_operator; } + private: int _get_first_materialized_child_idx() const { return _first_materialized_child_idx; } diff --git a/be/src/pipeline/exec/union_source_operator.h b/be/src/pipeline/exec/union_source_operator.h index bf32e9a25c2454..2d112ebf2df579 100644 --- a/be/src/pipeline/exec/union_source_operator.h +++ b/be/src/pipeline/exec/union_source_operator.h @@ -95,6 +95,11 @@ class UnionSourceOperatorX final : public OperatorX { return Status::OK(); } [[nodiscard]] int get_child_count() const { return _child_size; } + bool require_shuffled_data_distribution() const override { + return _followed_by_shuffled_operator; + } + + bool is_shuffled_operator() const override { return _followed_by_shuffled_operator; } private: bool _has_data(RuntimeState* state) const { diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp index 7273b480dd211f..7538e580c3ee4f 100644 --- a/be/src/pipeline/pipeline_fragment_context.cpp +++ b/be/src/pipeline/pipeline_fragment_context.cpp @@ -675,7 +675,7 @@ Status PipelineFragmentContext::_create_tree_helper(ObjectPool* pool, const DescriptorTbl& descs, OperatorPtr parent, int* node_idx, OperatorPtr* root, PipelinePtr& cur_pipe, int child_idx, - const bool followed_by_shuffled_join) { + const bool 
followed_by_shuffled_operator) { // propagate error case if (*node_idx >= tnodes.size()) { return Status::InternalError( @@ -685,11 +685,11 @@ Status PipelineFragmentContext::_create_tree_helper(ObjectPool* pool, const TPlanNode& tnode = tnodes[*node_idx]; int num_children = tnodes[*node_idx].num_children; - bool current_followed_by_shuffled_join = followed_by_shuffled_join; + bool current_followed_by_shuffled_operator = followed_by_shuffled_operator; OperatorPtr op = nullptr; RETURN_IF_ERROR(_create_operator(pool, tnodes[*node_idx], request, descs, op, cur_pipe, parent == nullptr ? -1 : parent->node_id(), child_idx, - followed_by_shuffled_join)); + followed_by_shuffled_operator)); // assert(parent != nullptr || (node_idx == 0 && root_expr != nullptr)); if (parent != nullptr) { @@ -699,7 +699,7 @@ Status PipelineFragmentContext::_create_tree_helper(ObjectPool* pool, *root = op; } /** - * `ExchangeType::HASH_SHUFFLE` should be used if an operator is followed by a shuffled hash join. + * `ExchangeType::HASH_SHUFFLE` should be used if an operator is followed by a shuffled operator (shuffled hash join, union operator followed by co-located operators). * * For plan: * LocalExchange(id=0) -> Aggregation(id=1) -> ShuffledHashJoin(id=2) @@ -712,15 +712,15 @@ Status PipelineFragmentContext::_create_tree_helper(ObjectPool* pool, auto require_shuffled_data_distribution = cur_pipe->operators().empty() ? 
cur_pipe->sink()->require_shuffled_data_distribution() : op->require_shuffled_data_distribution(); - current_followed_by_shuffled_join = - (followed_by_shuffled_join || op->is_shuffled_hash_join()) && + current_followed_by_shuffled_operator = + (followed_by_shuffled_operator || op->is_shuffled_operator()) && require_shuffled_data_distribution; // rely on that tnodes is preorder of the plan for (int i = 0; i < num_children; i++) { ++*node_idx; RETURN_IF_ERROR(_create_tree_helper(pool, tnodes, request, descs, op, node_idx, nullptr, - cur_pipe, i, current_followed_by_shuffled_join)); + cur_pipe, i, current_followed_by_shuffled_operator)); // we are expecting a child, but have used all nodes // this means we have been given a bad tree and must fail @@ -762,13 +762,13 @@ Status PipelineFragmentContext::_add_local_exchange_impl( * `bucket_seq_to_instance_idx` is empty if no scan operator is contained in this fragment. * So co-located operators(e.g. Agg, Analytic) should use `HASH_SHUFFLE` instead of `BUCKET_HASH_SHUFFLE`. */ - const bool followed_by_shuffled_join = operators.size() > idx - ? operators[idx]->followed_by_shuffled_join() - : cur_pipe->sink()->followed_by_shuffled_join(); + const bool followed_by_shuffled_operator = + operators.size() > idx ? operators[idx]->followed_by_shuffled_operator() + : cur_pipe->sink()->followed_by_shuffled_operator(); const bool should_disable_bucket_shuffle = bucket_seq_to_instance_idx.empty() && shuffle_idx_to_instance_idx.find(-1) == shuffle_idx_to_instance_idx.end() && - followed_by_shuffled_join; + followed_by_shuffled_operator; sink.reset(new LocalExchangeSinkOperatorX( sink_id, local_exchange_id, should_disable_bucket_shuffle ? 
_total_instances : _num_instances, @@ -1208,7 +1208,7 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo const DescriptorTbl& descs, OperatorPtr& op, PipelinePtr& cur_pipe, int parent_idx, int child_idx, - const bool followed_by_shuffled_join) { + const bool followed_by_shuffled_operator) { // We directly construct the operator from Thrift because the given array is in the order of preorder traversal. // Therefore, here we need to use a stack-like structure. _pipeline_parent_map.pop(cur_pipe, parent_idx, child_idx); @@ -1303,7 +1303,7 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo !tnode.agg_node.grouping_exprs.empty() && !group_by_limit_opt) { op.reset(new DistinctStreamingAggOperatorX(pool, next_operator_id(), tnode, descs, _require_bucket_distribution)); - op->set_followed_by_shuffled_join(followed_by_shuffled_join); + op->set_followed_by_shuffled_operator(followed_by_shuffled_operator); _require_bucket_distribution = _require_bucket_distribution || op->require_data_distribution(); RETURN_IF_ERROR(cur_pipe->add_operator(op)); @@ -1335,7 +1335,7 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo sink.reset(new AggSinkOperatorX(pool, next_sink_operator_id(), tnode, descs, _require_bucket_distribution)); } - sink->set_followed_by_shuffled_join(followed_by_shuffled_join); + sink->set_followed_by_shuffled_operator(followed_by_shuffled_operator); _require_bucket_distribution = _require_bucket_distribution || sink->require_data_distribution(); sink->set_dests_id({op->operator_id()}); @@ -1385,8 +1385,8 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo _pipeline_parent_map.push(op->node_id(), cur_pipe); _pipeline_parent_map.push(op->node_id(), build_side_pipe); - sink->set_followed_by_shuffled_join(sink->is_shuffled_hash_join()); - op->set_followed_by_shuffled_join(op->is_shuffled_hash_join()); + 
sink->set_followed_by_shuffled_operator(sink->is_shuffled_operator()); + op->set_followed_by_shuffled_operator(op->is_shuffled_operator()); } else { op.reset(new HashJoinProbeOperatorX(pool, tnode, next_operator_id(), descs)); RETURN_IF_ERROR(cur_pipe->add_operator(op)); @@ -1407,8 +1407,8 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo _pipeline_parent_map.push(op->node_id(), cur_pipe); _pipeline_parent_map.push(op->node_id(), build_side_pipe); - sink->set_followed_by_shuffled_join(sink->is_shuffled_hash_join()); - op->set_followed_by_shuffled_join(op->is_shuffled_hash_join()); + sink->set_followed_by_shuffled_operator(sink->is_shuffled_operator()); + op->set_followed_by_shuffled_operator(op->is_shuffled_operator()); } _require_bucket_distribution = _require_bucket_distribution || op->require_data_distribution(); @@ -1438,6 +1438,7 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo case TPlanNodeType::UNION_NODE: { int child_count = tnode.num_children; op.reset(new UnionSourceOperatorX(pool, tnode, next_operator_id(), descs)); + op->set_followed_by_shuffled_operator(_require_bucket_distribution); RETURN_IF_ERROR(cur_pipe->add_operator(op)); const auto downstream_pipeline_id = cur_pipe->id(); @@ -1449,6 +1450,7 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo _dag[downstream_pipeline_id].push_back(build_side_pipe->id()); DataSinkOperatorPtr sink; sink.reset(new UnionSinkOperatorX(i, next_sink_operator_id(), pool, tnode, descs)); + sink->set_followed_by_shuffled_operator(_require_bucket_distribution); sink->set_dests_id({op->operator_id()}); RETURN_IF_ERROR(build_side_pipe->set_sink(sink)); RETURN_IF_ERROR(build_side_pipe->sink()->init(tnode, _runtime_state.get())); @@ -1482,7 +1484,7 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo sink.reset(new SortSinkOperatorX(pool, next_sink_operator_id(), tnode, descs, 
_require_bucket_distribution)); } - sink->set_followed_by_shuffled_join(followed_by_shuffled_join); + sink->set_followed_by_shuffled_operator(followed_by_shuffled_operator); _require_bucket_distribution = _require_bucket_distribution || sink->require_data_distribution(); sink->set_dests_id({op->operator_id()}); @@ -1522,7 +1524,7 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo DataSinkOperatorPtr sink; sink.reset(new AnalyticSinkOperatorX(pool, next_sink_operator_id(), tnode, descs, _require_bucket_distribution)); - sink->set_followed_by_shuffled_join(followed_by_shuffled_join); + sink->set_followed_by_shuffled_operator(followed_by_shuffled_operator); _require_bucket_distribution = _require_bucket_distribution || sink->require_data_distribution(); sink->set_dests_id({op->operator_id()}); @@ -1533,11 +1535,13 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo case TPlanNodeType::INTERSECT_NODE: { RETURN_IF_ERROR(_build_operators_for_set_operation_node( pool, tnode, descs, op, cur_pipe, parent_idx, child_idx)); + op->set_followed_by_shuffled_operator(_require_bucket_distribution); break; } case TPlanNodeType::EXCEPT_NODE: { RETURN_IF_ERROR(_build_operators_for_set_operation_node( pool, tnode, descs, op, cur_pipe, parent_idx, child_idx)); + op->set_followed_by_shuffled_operator(_require_bucket_distribution); break; } case TPlanNodeType::REPEAT_NODE: {