diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp b/be/src/pipeline/exec/exchange_sink_operator.cpp
index 7584c0b0e4591c..ada5d5455b0dd3 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.cpp
+++ b/be/src/pipeline/exec/exchange_sink_operator.cpp
@@ -170,19 +170,20 @@ Status ExchangeSinkLocalState::open(RuntimeState* state) {
 
     _part_type = p._part_type;
     SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
+    if (_part_type == TPartitionType::UNPARTITIONED || _part_type == TPartitionType::RANDOM ||
+        _part_type == TPartitionType::TABLE_SINK_RANDOM_PARTITIONED) {
+        std::random_device rd;
+        std::mt19937 g(rd());
+        shuffle(channels.begin(), channels.end(), g);
+    }
     int local_size = 0;
     for (int i = 0; i < channels.size(); ++i) {
         RETURN_IF_ERROR(channels[i]->open(state));
         if (channels[i]->is_local()) {
             local_size++;
+            _last_local_channel_idx = i;
         }
     }
-    if (_part_type == TPartitionType::UNPARTITIONED || _part_type == TPartitionType::RANDOM ||
-        _part_type == TPartitionType::TABLE_SINK_RANDOM_PARTITIONED) {
-        std::random_device rd;
-        std::mt19937 g(rd());
-        shuffle(channels.begin(), channels.end(), g);
-    }
     only_local_exchange = local_size == channels.size();
 
     PUniqueId id;
@@ -446,11 +447,17 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block
         if (local_state.only_local_exchange) {
             if (!block->empty()) {
                 Status status;
+                size_t idx = 0;
                 for (auto* channel : local_state.channels) {
                     if (!channel->is_receiver_eof()) {
-                        status = channel->send_local_block(block);
+                        // If this channel is the last one, we can move this block to the downstream pipeline.
+                        // Otherwise, this block also needs to be broadcast to other channels, so it should be copied.
+                        DCHECK_GE(local_state._last_local_channel_idx, 0);
+                        status = channel->send_local_block(
+                                block, idx == local_state._last_local_channel_idx);
                         HANDLE_CHANNEL_STATUS(state, channel, status);
                     }
+                    idx++;
                 }
             }
         } else {
@@ -471,21 +478,33 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block
                 } else {
                     block_holder->get_block()->Clear();
                 }
+                size_t idx = 0;
+                bool moved = false;
                 for (auto* channel : local_state.channels) {
                     if (!channel->is_receiver_eof()) {
                         Status status;
                         if (channel->is_local()) {
-                            status = channel->send_local_block(&cur_block);
+                            // If this channel is the last one, we can move this block to the downstream pipeline.
+                            // Otherwise, this block also needs to be broadcast to other channels, so it should be copied.
+                            DCHECK_GE(local_state._last_local_channel_idx, 0);
+                            status = channel->send_local_block(
+                                    &cur_block, idx == local_state._last_local_channel_idx);
+                            moved = idx == local_state._last_local_channel_idx;
                         } else {
                             SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
                             status = channel->send_broadcast_block(block_holder, eos);
                         }
                         HANDLE_CHANNEL_STATUS(state, channel, status);
                     }
+                    idx++;
+                }
+                if (moved) {
+                    local_state._serializer.reset_block();
+                } else {
+                    cur_block.clear_column_data();
+                    local_state._serializer.get_block()->set_mutable_columns(
+                            cur_block.mutate_columns());
                 }
-                cur_block.clear_column_data();
-                local_state._serializer.get_block()->set_mutable_columns(
-                        cur_block.mutate_columns());
             }
         }
     }
@@ -496,7 +515,7 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block
         if (!current_channel->is_receiver_eof()) {
            // 2. serialize, send and rollover block
            if (current_channel->is_local()) {
-                auto status = current_channel->send_local_block(block);
+                auto status = current_channel->send_local_block(block, true);
                 HANDLE_CHANNEL_STATUS(state, current_channel, status);
             } else {
                 SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
@@ -582,7 +601,7 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block
         if (!current_channel->is_receiver_eof()) {
             // 2. serialize, send and rollover block
             if (current_channel->is_local()) {
-                auto status = current_channel->send_local_block(block);
+                auto status = current_channel->send_local_block(block, true);
                 HANDLE_CHANNEL_STATUS(state, current_channel, status);
             } else {
                 SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
diff --git a/be/src/pipeline/exec/exchange_sink_operator.h b/be/src/pipeline/exec/exchange_sink_operator.h
index a94392b906d259..aeb6a1503b7c06 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.h
+++ b/be/src/pipeline/exec/exchange_sink_operator.h
@@ -234,6 +234,7 @@ class ExchangeSinkLocalState final : public PipelineXSinkLocalState<> {
     // for external table sink hash partition
     std::unique_ptr _partition_function = nullptr;
     std::atomic _reach_limit = false;
+    int _last_local_channel_idx = -1;
 };
 
 class ExchangeSinkOperatorX final : public DataSinkOperatorX {
diff --git a/be/src/pipeline/exec/result_file_sink_operator.cpp b/be/src/pipeline/exec/result_file_sink_operator.cpp
index 4ed414a0774204..6fe0b7f9e256c6 100644
--- a/be/src/pipeline/exec/result_file_sink_operator.cpp
+++ b/be/src/pipeline/exec/result_file_sink_operator.cpp
@@ -210,7 +210,7 @@ Status ResultFileSinkLocalState::close(RuntimeState* state, Status exec_status)
         Status status;
         for (auto channel : _channels) {
             if (!channel->is_receiver_eof()) {
-                status = channel->send_local_block(_output_block.get());
+                status = channel->send_local_block(_output_block.get(), false);
                 HANDLE_CHANNEL_STATUS(state, channel, status);
             }
         }
@@ -234,7 +234,7 @@ Status ResultFileSinkLocalState::close(RuntimeState* state, Status exec_status)
         for (auto channel : _channels) {
             if (!channel->is_receiver_eof()) {
                 if (channel->is_local()) {
-                    status = channel->send_local_block(&cur_block);
+                    status = channel->send_local_block(&cur_block, false);
                 } else {
                     SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
                     status = channel->send_broadcast_block(_block_holder, true);
diff --git a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.cpp b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.cpp
index 78c2761dcc783e..c6f675f3c1b858 100644
--- a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.cpp
+++ b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.cpp
@@ -27,6 +27,11 @@ Status LocalExchangeSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo
     SCOPED_TIMER(_init_timer);
     _compute_hash_value_timer = ADD_TIMER(profile(), "ComputeHashValueTime");
     _distribute_timer = ADD_TIMER(profile(), "DistributeDataTime");
+    if (_parent->cast<LocalExchangeSinkOperatorX>()._type == ExchangeType::HASH_SHUFFLE) {
+        _profile->add_info_string(
+                "UseGlobalShuffle",
+                std::to_string(_parent->cast<LocalExchangeSinkOperatorX>()._use_global_shuffle));
+    }
     _channel_id = info.task_idx;
     return Status::OK();
 }
@@ -61,10 +66,12 @@ Status LocalExchangeSinkLocalState::close(RuntimeState* state, Status exec_statu
 std::string LocalExchangeSinkLocalState::debug_string(int indentation_level) const {
     fmt::memory_buffer debug_string_buffer;
     fmt::format_to(debug_string_buffer,
-                   "{}, _channel_id: {}, _num_partitions: {}, _num_senders: {}, _num_sources: {}, "
+                   "{}, _use_global_shuffle: {}, _channel_id: {}, _num_partitions: {}, "
+                   "_num_senders: {}, _num_sources: {}, "
                    "_running_sink_operators: {}, _running_source_operators: {}, _release_count: {}",
-                   Base::debug_string(indentation_level), _channel_id, _exchanger->_num_partitions,
-                   _exchanger->_num_senders, _exchanger->_num_sources,
+                   Base::debug_string(indentation_level),
+                   _parent->cast<LocalExchangeSinkOperatorX>()._use_global_shuffle, _channel_id,
+                   _exchanger->_num_partitions, _exchanger->_num_senders, _exchanger->_num_sources,
                    _exchanger->_running_sink_operators, _exchanger->_running_source_operators,
                    _release_count);
     return fmt::to_string(debug_string_buffer);
@@ -76,6 +83,7 @@ Status LocalExchangeSinkOperatorX::init(ExchangeType type, const int num_buckets
     _name = "LOCAL_EXCHANGE_SINK_OPERATOR (" + get_exchange_type_name(type) + ")";
     _type = type;
     if (_type == ExchangeType::HASH_SHUFFLE) {
+        _use_global_shuffle = should_disable_bucket_shuffle;
         // For shuffle join, if data distribution has been broken by previous operator, we
         // should use a HASH_SHUFFLE local exchanger to shuffle data again. To be mentioned,
         // we should use map shuffle idx to instance idx because all instances will be
diff --git a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.h b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.h
index f1d60fc03c4360..9b72402abce990 100644
--- a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.h
+++ b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.h
@@ -125,6 +125,7 @@ class LocalExchangeSinkOperatorX final : public DataSinkOperatorX
     _partitioner;
     const std::map _bucket_seq_to_instance_idx;
     std::vector> _shuffle_idx_to_instance_idx;
+    bool _use_global_shuffle = false;
 };
 
 } // namespace doris::pipeline
diff --git a/be/src/vec/sink/vdata_stream_sender.cpp b/be/src/vec/sink/vdata_stream_sender.cpp
index 44124ea7954f5a..394005f6adf00c 100644
--- a/be/src/vec/sink/vdata_stream_sender.cpp
+++ b/be/src/vec/sink/vdata_stream_sender.cpp
@@ -231,7 +231,7 @@ Status Channel<Parent>::send_local_block(Status exec_status, bool eos) {
 }
 
 template <typename Parent>
-Status Channel<Parent>::send_local_block(Block* block) {
+Status Channel<Parent>::send_local_block(Block* block, bool can_be_moved) {
     SCOPED_TIMER(_parent->local_send_timer());
     if (_recvr_is_valid()) {
         if constexpr (!std::is_same_v) {
@@ -239,7 +239,7 @@ Status Channel<Parent>::send_local_block(Block* block) {
             COUNTER_UPDATE(_parent->local_sent_rows(), block->rows());
             COUNTER_UPDATE(_parent->blocks_sent_counter(), 1);
         }
-        _local_recvr->add_block(block, _parent->sender_id(), false);
+        _local_recvr->add_block(block, _parent->sender_id(), can_be_moved);
         return Status::OK();
     } else {
         return _receiver_status;
@@ -646,7 +646,7 @@ Status VDataStreamSender::send(RuntimeState* state, Block* block, bool eos) {
                 Status status;
                 for (auto channel : _channels) {
                     if (!channel->is_receiver_eof()) {
-                        status = channel->send_local_block(block);
+                        status = channel->send_local_block(block, false);
                         HANDLE_CHANNEL_STATUS(state, channel, status);
                     }
                 }
@@ -671,7 +671,7 @@ Status VDataStreamSender::send(RuntimeState* state, Block* block, bool eos) {
                 for (auto channel : _channels) {
                     if (!channel->is_receiver_eof()) {
                         if (channel->is_local()) {
-                            status = channel->send_local_block(&cur_block);
+                            status = channel->send_local_block(&cur_block, false);
                         } else {
                             SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
                             status = channel->send_broadcast_block(block_holder, eos);
@@ -698,7 +698,7 @@ Status VDataStreamSender::send(RuntimeState* state, Block* block, bool eos) {
                 for (auto channel : _channels) {
                     if (!channel->is_receiver_eof()) {
                         if (channel->is_local()) {
-                            status = channel->send_local_block(&cur_block);
+                            status = channel->send_local_block(&cur_block, false);
                         } else {
                             SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
                             status = channel->send_remote_block(_cur_pb_block, false);
@@ -717,7 +717,7 @@ Status VDataStreamSender::send(RuntimeState* state, Block* block, bool eos) {
         if (!current_channel->is_receiver_eof()) {
             // 2. serialize, send and rollover block
             if (current_channel->is_local()) {
-                auto status = current_channel->send_local_block(block);
+                auto status = current_channel->send_local_block(block, false);
                 HANDLE_CHANNEL_STATUS(state, current_channel, status);
             } else {
                 SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
@@ -829,7 +829,7 @@ Status VDataStreamSender::close(RuntimeState* state, Status exec_status) {
         for (auto channel : _channels) {
            if (!channel->is_receiver_eof()) {
                if (channel->is_local()) {
-                    status = channel->send_local_block(&block);
+                    status = channel->send_local_block(&block, false);
                } else {
                    SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
                    status = channel->send_remote_block(_cur_pb_block, false);
diff --git a/be/src/vec/sink/vdata_stream_sender.h b/be/src/vec/sink/vdata_stream_sender.h
index b9462434f073dc..92344b994e08f7 100644
--- a/be/src/vec/sink/vdata_stream_sender.h
+++ b/be/src/vec/sink/vdata_stream_sender.h
@@ -307,7 +307,7 @@ class Channel {
 
     Status send_local_block(Status exec_status, bool eos = false);
 
-    Status send_local_block(Block* block);
+    Status send_local_block(Block* block, bool can_be_moved);
     // Flush buffered rows and close channel. This function don't wait the response
     // of close operation, client should call close_wait() to finish channel's close.
     // We split one close operation into two phases in order to make multiple channels
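The common thread in these changes: when one block has to reach several local receivers, every receiver except the last needs its own copy, while the last one may take ownership outright. That is what the new `can_be_moved` argument of `send_local_block` forwards to `add_block`, and it is also why the shuffle of `channels` is moved before the `open()` loop, so that the recorded `_last_local_channel_idx` still matches the final channel order. The sketch below illustrates just that copy-all-but-the-last rule; `Block`, `LocalReceiver`, and `send_to_local_receivers` are simplified stand-ins invented for illustration, not the actual Doris classes.

```cpp
// Hypothetical, simplified model of the copy-vs-move rule; not Doris code.
#include <cassert>
#include <utility>
#include <vector>

struct Block {
    std::vector<int> columns; // stand-in for real column data
};

struct LocalReceiver {
    bool eof = false;
    std::vector<Block> queue;

    // Mirrors the idea of add_block(block, sender_id, can_be_moved): steal the
    // payload when the sender promises not to reuse it, otherwise deep-copy it.
    void add_block(Block* block, bool can_be_moved) {
        if (can_be_moved) {
            queue.push_back(std::move(*block));
        } else {
            queue.push_back(*block); // copy for every non-final receiver
        }
    }
};

// Only the receiver at the last index may move the block; everyone before it
// still needs the data, so they receive copies (the _last_local_channel_idx idea).
bool send_to_local_receivers(Block* block, std::vector<LocalReceiver*>& receivers) {
    const int last_idx = static_cast<int>(receivers.size()) - 1;
    bool moved = false;
    for (int i = 0; i <= last_idx; ++i) {
        if (receivers[i]->eof) {
            continue; // skip receivers that already finished
        }
        const bool can_be_moved = (i == last_idx);
        receivers[i]->add_block(block, can_be_moved);
        moved = moved || can_be_moved;
    }
    return moved; // false e.g. when the last receiver was already at EOF
}

int main() {
    LocalReceiver a;
    LocalReceiver b;
    std::vector<LocalReceiver*> receivers {&a, &b};

    Block block {{1, 2, 3}};
    const bool moved = send_to_local_receivers(&block, receivers);

    assert(a.queue.size() == 1); // received a copy
    assert(b.queue.size() == 1); // took ownership by move
    if (!moved) {
        // The sender still owns valid data and may clear and reuse the block,
        // analogous to the clear_column_data()/set_mutable_columns() fallback.
        block.columns.clear();
    }
    return 0;
}
```

If the last local channel has already hit receiver EOF, `moved` stays false and the sender keeps reusing its own block, which mirrors the `if (moved) ... else ...` branch added in `ExchangeSinkOperatorX::sink`.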