diff --git a/be/src/pipeline/local_exchange/local_exchange_sink_operator.cpp b/be/src/pipeline/local_exchange/local_exchange_sink_operator.cpp index 55d81f449e7927..d87113ca80a959 100644 --- a/be/src/pipeline/local_exchange/local_exchange_sink_operator.cpp +++ b/be/src/pipeline/local_exchange/local_exchange_sink_operator.cpp @@ -41,6 +41,7 @@ Status LocalExchangeSinkOperatorX::init(ExchangeType type, const int num_buckets _name = "LOCAL_EXCHANGE_SINK_OPERATOR (" + get_exchange_type_name(type) + ")"; _type = type; if (_type == ExchangeType::HASH_SHUFFLE) { + _use_global_shuffle = should_disable_bucket_shuffle; // For shuffle join, if data distribution has been broken by previous operator, we // should use a HASH_SHUFFLE local exchanger to shuffle data again. To be mentioned, // we should use map shuffle idx to instance idx because all instances will be @@ -84,6 +85,11 @@ Status LocalExchangeSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo SCOPED_TIMER(_init_timer); _compute_hash_value_timer = ADD_TIMER(profile(), "ComputeHashValueTime"); _distribute_timer = ADD_TIMER(profile(), "DistributeDataTime"); + if (_parent->cast()._type == ExchangeType::HASH_SHUFFLE) { + _profile->add_info_string( + "UseGlobalShuffle", + std::to_string(_parent->cast()._use_global_shuffle)); + } _channel_id = info.task_idx; return Status::OK(); } @@ -107,10 +113,12 @@ Status LocalExchangeSinkLocalState::open(RuntimeState* state) { std::string LocalExchangeSinkLocalState::debug_string(int indentation_level) const { fmt::memory_buffer debug_string_buffer; fmt::format_to(debug_string_buffer, - "{}, _channel_id: {}, _num_partitions: {}, _num_senders: {}, _num_sources: {}, " + "{}, _use_global_shuffle: {}, _channel_id: {}, _num_partitions: {}, " + "_num_senders: {}, _num_sources: {}, " "_running_sink_operators: {}, _running_source_operators: {}", - Base::debug_string(indentation_level), _channel_id, _exchanger->_num_partitions, - _exchanger->_num_senders, _exchanger->_num_sources, + Base::debug_string(indentation_level), + _parent->cast()._use_global_shuffle, _channel_id, + _exchanger->_num_partitions, _exchanger->_num_senders, _exchanger->_num_sources, _exchanger->_running_sink_operators, _exchanger->_running_source_operators); return fmt::to_string(debug_string_buffer); } diff --git a/be/src/pipeline/local_exchange/local_exchange_sink_operator.h b/be/src/pipeline/local_exchange/local_exchange_sink_operator.h index fa9512677dc8d8..1cd9736d4291d6 100644 --- a/be/src/pipeline/local_exchange/local_exchange_sink_operator.h +++ b/be/src/pipeline/local_exchange/local_exchange_sink_operator.h @@ -116,6 +116,7 @@ class LocalExchangeSinkOperatorX final : public DataSinkOperatorX _partitioner; const std::map _bucket_seq_to_instance_idx; std::vector> _shuffle_idx_to_instance_idx; + bool _use_global_shuffle = false; }; } // namespace doris::pipeline