Skip to content

Commit

Permalink
[pick](branch-2.1) pick apache#41676 apache#41740 apache#41857 (apach…
Browse files Browse the repository at this point in the history
  • Loading branch information
Gabriel39 authored Oct 15, 2024
1 parent a4b7d93 commit b185dfc
Show file tree
Hide file tree
Showing 7 changed files with 55 additions and 26 deletions.
45 changes: 32 additions & 13 deletions be/src/pipeline/exec/exchange_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -170,19 +170,20 @@ Status ExchangeSinkLocalState::open(RuntimeState* state) {
_part_type = p._part_type;
SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());

if (_part_type == TPartitionType::UNPARTITIONED || _part_type == TPartitionType::RANDOM ||
_part_type == TPartitionType::TABLE_SINK_RANDOM_PARTITIONED) {
std::random_device rd;
std::mt19937 g(rd());
shuffle(channels.begin(), channels.end(), g);
}
int local_size = 0;
for (int i = 0; i < channels.size(); ++i) {
RETURN_IF_ERROR(channels[i]->open(state));
if (channels[i]->is_local()) {
local_size++;
_last_local_channel_idx = i;
}
}
if (_part_type == TPartitionType::UNPARTITIONED || _part_type == TPartitionType::RANDOM ||
_part_type == TPartitionType::TABLE_SINK_RANDOM_PARTITIONED) {
std::random_device rd;
std::mt19937 g(rd());
shuffle(channels.begin(), channels.end(), g);
}
only_local_exchange = local_size == channels.size();

PUniqueId id;
Expand Down Expand Up @@ -446,11 +447,17 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block
if (local_state.only_local_exchange) {
if (!block->empty()) {
Status status;
size_t idx = 0;
for (auto* channel : local_state.channels) {
if (!channel->is_receiver_eof()) {
status = channel->send_local_block(block);
// If this channel is the last, we can move this block to downstream pipeline.
// Otherwise, this block also need to be broadcasted to other channels so should be copied.
DCHECK_GE(local_state._last_local_channel_idx, 0);
status = channel->send_local_block(
block, idx == local_state._last_local_channel_idx);
HANDLE_CHANNEL_STATUS(state, channel, status);
}
idx++;
}
}
} else {
Expand All @@ -471,21 +478,33 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block
} else {
block_holder->get_block()->Clear();
}
size_t idx = 0;
bool moved = false;
for (auto* channel : local_state.channels) {
if (!channel->is_receiver_eof()) {
Status status;
if (channel->is_local()) {
status = channel->send_local_block(&cur_block);
// If this channel is the last, we can move this block to downstream pipeline.
// Otherwise, this block also need to be broadcasted to other channels so should be copied.
DCHECK_GE(local_state._last_local_channel_idx, 0);
status = channel->send_local_block(
&cur_block, idx == local_state._last_local_channel_idx);
moved = idx == local_state._last_local_channel_idx;
} else {
SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
status = channel->send_broadcast_block(block_holder, eos);
}
HANDLE_CHANNEL_STATUS(state, channel, status);
}
idx++;
}
if (moved) {
local_state._serializer.reset_block();
} else {
cur_block.clear_column_data();
local_state._serializer.get_block()->set_mutable_columns(
cur_block.mutate_columns());
}
cur_block.clear_column_data();
local_state._serializer.get_block()->set_mutable_columns(
cur_block.mutate_columns());
}
}
}
Expand All @@ -496,7 +515,7 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block
if (!current_channel->is_receiver_eof()) {
// 2. serialize, send and rollover block
if (current_channel->is_local()) {
auto status = current_channel->send_local_block(block);
auto status = current_channel->send_local_block(block, true);
HANDLE_CHANNEL_STATUS(state, current_channel, status);
} else {
SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
Expand Down Expand Up @@ -582,7 +601,7 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block
if (!current_channel->is_receiver_eof()) {
// 2. serialize, send and rollover block
if (current_channel->is_local()) {
auto status = current_channel->send_local_block(block);
auto status = current_channel->send_local_block(block, true);
HANDLE_CHANNEL_STATUS(state, current_channel, status);
} else {
SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
Expand Down
1 change: 1 addition & 0 deletions be/src/pipeline/exec/exchange_sink_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,7 @@ class ExchangeSinkLocalState final : public PipelineXSinkLocalState<> {
// for external table sink hash partition
std::unique_ptr<HashPartitionFunction> _partition_function = nullptr;
std::atomic<bool> _reach_limit = false;
int _last_local_channel_idx = -1;
};

class ExchangeSinkOperatorX final : public DataSinkOperatorX<ExchangeSinkLocalState> {
Expand Down
4 changes: 2 additions & 2 deletions be/src/pipeline/exec/result_file_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ Status ResultFileSinkLocalState::close(RuntimeState* state, Status exec_status)
Status status;
for (auto channel : _channels) {
if (!channel->is_receiver_eof()) {
status = channel->send_local_block(_output_block.get());
status = channel->send_local_block(_output_block.get(), false);
HANDLE_CHANNEL_STATUS(state, channel, status);
}
}
Expand All @@ -234,7 +234,7 @@ Status ResultFileSinkLocalState::close(RuntimeState* state, Status exec_status)
for (auto channel : _channels) {
if (!channel->is_receiver_eof()) {
if (channel->is_local()) {
status = channel->send_local_block(&cur_block);
status = channel->send_local_block(&cur_block, false);
} else {
SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
status = channel->send_broadcast_block(_block_holder, true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,11 @@ Status LocalExchangeSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo
SCOPED_TIMER(_init_timer);
_compute_hash_value_timer = ADD_TIMER(profile(), "ComputeHashValueTime");
_distribute_timer = ADD_TIMER(profile(), "DistributeDataTime");
if (_parent->cast<LocalExchangeSinkOperatorX>()._type == ExchangeType::HASH_SHUFFLE) {
_profile->add_info_string(
"UseGlobalShuffle",
std::to_string(_parent->cast<LocalExchangeSinkOperatorX>()._use_global_shuffle));
}
_channel_id = info.task_idx;
return Status::OK();
}
Expand Down Expand Up @@ -61,10 +66,12 @@ Status LocalExchangeSinkLocalState::close(RuntimeState* state, Status exec_statu
std::string LocalExchangeSinkLocalState::debug_string(int indentation_level) const {
fmt::memory_buffer debug_string_buffer;
fmt::format_to(debug_string_buffer,
"{}, _channel_id: {}, _num_partitions: {}, _num_senders: {}, _num_sources: {}, "
"{}, _use_global_shuffle: {}, _channel_id: {}, _num_partitions: {}, "
"_num_senders: {}, _num_sources: {}, "
"_running_sink_operators: {}, _running_source_operators: {}, _release_count: {}",
Base::debug_string(indentation_level), _channel_id, _exchanger->_num_partitions,
_exchanger->_num_senders, _exchanger->_num_sources,
Base::debug_string(indentation_level),
_parent->cast<LocalExchangeSinkOperatorX>()._use_global_shuffle, _channel_id,
_exchanger->_num_partitions, _exchanger->_num_senders, _exchanger->_num_sources,
_exchanger->_running_sink_operators, _exchanger->_running_source_operators,
_release_count);
return fmt::to_string(debug_string_buffer);
Expand All @@ -76,6 +83,7 @@ Status LocalExchangeSinkOperatorX::init(ExchangeType type, const int num_buckets
_name = "LOCAL_EXCHANGE_SINK_OPERATOR (" + get_exchange_type_name(type) + ")";
_type = type;
if (_type == ExchangeType::HASH_SHUFFLE) {
_use_global_shuffle = should_disable_bucket_shuffle;
// For shuffle join, if data distribution has been broken by previous operator, we
// should use a HASH_SHUFFLE local exchanger to shuffle data again. To be mentioned,
// we should use map shuffle idx to instance idx because all instances will be
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ class LocalExchangeSinkOperatorX final : public DataSinkOperatorX<LocalExchangeS
std::unique_ptr<vectorized::PartitionerBase> _partitioner;
const std::map<int, int> _bucket_seq_to_instance_idx;
std::vector<std::pair<int, int>> _shuffle_idx_to_instance_idx;
bool _use_global_shuffle = false;
};

} // namespace doris::pipeline
14 changes: 7 additions & 7 deletions be/src/vec/sink/vdata_stream_sender.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -231,15 +231,15 @@ Status Channel<Parent>::send_local_block(Status exec_status, bool eos) {
}

template <typename Parent>
Status Channel<Parent>::send_local_block(Block* block) {
Status Channel<Parent>::send_local_block(Block* block, bool can_be_moved) {
SCOPED_TIMER(_parent->local_send_timer());
if (_recvr_is_valid()) {
if constexpr (!std::is_same_v<pipeline::ResultFileSinkLocalState, Parent>) {
COUNTER_UPDATE(_parent->local_bytes_send_counter(), block->bytes());
COUNTER_UPDATE(_parent->local_sent_rows(), block->rows());
COUNTER_UPDATE(_parent->blocks_sent_counter(), 1);
}
_local_recvr->add_block(block, _parent->sender_id(), false);
_local_recvr->add_block(block, _parent->sender_id(), can_be_moved);
return Status::OK();
} else {
return _receiver_status;
Expand Down Expand Up @@ -646,7 +646,7 @@ Status VDataStreamSender::send(RuntimeState* state, Block* block, bool eos) {
Status status;
for (auto channel : _channels) {
if (!channel->is_receiver_eof()) {
status = channel->send_local_block(block);
status = channel->send_local_block(block, false);
HANDLE_CHANNEL_STATUS(state, channel, status);
}
}
Expand All @@ -671,7 +671,7 @@ Status VDataStreamSender::send(RuntimeState* state, Block* block, bool eos) {
for (auto channel : _channels) {
if (!channel->is_receiver_eof()) {
if (channel->is_local()) {
status = channel->send_local_block(&cur_block);
status = channel->send_local_block(&cur_block, false);
} else {
SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
status = channel->send_broadcast_block(block_holder, eos);
Expand All @@ -698,7 +698,7 @@ Status VDataStreamSender::send(RuntimeState* state, Block* block, bool eos) {
for (auto channel : _channels) {
if (!channel->is_receiver_eof()) {
if (channel->is_local()) {
status = channel->send_local_block(&cur_block);
status = channel->send_local_block(&cur_block, false);
} else {
SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
status = channel->send_remote_block(_cur_pb_block, false);
Expand All @@ -717,7 +717,7 @@ Status VDataStreamSender::send(RuntimeState* state, Block* block, bool eos) {
if (!current_channel->is_receiver_eof()) {
// 2. serialize, send and rollover block
if (current_channel->is_local()) {
auto status = current_channel->send_local_block(block);
auto status = current_channel->send_local_block(block, false);
HANDLE_CHANNEL_STATUS(state, current_channel, status);
} else {
SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
Expand Down Expand Up @@ -829,7 +829,7 @@ Status VDataStreamSender::close(RuntimeState* state, Status exec_status) {
for (auto channel : _channels) {
if (!channel->is_receiver_eof()) {
if (channel->is_local()) {
status = channel->send_local_block(&block);
status = channel->send_local_block(&block, false);
} else {
SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
status = channel->send_remote_block(_cur_pb_block, false);
Expand Down
2 changes: 1 addition & 1 deletion be/src/vec/sink/vdata_stream_sender.h
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,7 @@ class Channel {

Status send_local_block(Status exec_status, bool eos = false);

Status send_local_block(Block* block);
Status send_local_block(Block* block, bool can_be_moved);
// Flush buffered rows and close channel. This function don't wait the response
// of close operation, client should call close_wait() to finish channel's close.
// We split one close operation into two phases in order to make multiple channels
Expand Down

0 comments on commit b185dfc

Please sign in to comment.