Skip to content

Commit

Permalink
[minor](log) Add debug log (#41857)
Browse files Browse the repository at this point in the history
  • Loading branch information
Gabriel39 authored Oct 15, 2024
1 parent 94ce0b4 commit 1e5c758
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 3 deletions.
14 changes: 11 additions & 3 deletions be/src/pipeline/local_exchange/local_exchange_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ Status LocalExchangeSinkOperatorX::init(ExchangeType type, const int num_buckets
_name = "LOCAL_EXCHANGE_SINK_OPERATOR (" + get_exchange_type_name(type) + ")";
_type = type;
if (_type == ExchangeType::HASH_SHUFFLE) {
_use_global_shuffle = should_disable_bucket_shuffle;
// For shuffle join, if data distribution has been broken by previous operator, we
// should use a HASH_SHUFFLE local exchanger to shuffle data again. To be mentioned,
// we should use map shuffle idx to instance idx because all instances will be
Expand Down Expand Up @@ -84,6 +85,11 @@ Status LocalExchangeSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo
SCOPED_TIMER(_init_timer);
_compute_hash_value_timer = ADD_TIMER(profile(), "ComputeHashValueTime");
_distribute_timer = ADD_TIMER(profile(), "DistributeDataTime");
if (_parent->cast<LocalExchangeSinkOperatorX>()._type == ExchangeType::HASH_SHUFFLE) {
_profile->add_info_string(
"UseGlobalShuffle",
std::to_string(_parent->cast<LocalExchangeSinkOperatorX>()._use_global_shuffle));
}
_channel_id = info.task_idx;
return Status::OK();
}
Expand All @@ -107,10 +113,12 @@ Status LocalExchangeSinkLocalState::open(RuntimeState* state) {
std::string LocalExchangeSinkLocalState::debug_string(int indentation_level) const {
fmt::memory_buffer debug_string_buffer;
fmt::format_to(debug_string_buffer,
"{}, _channel_id: {}, _num_partitions: {}, _num_senders: {}, _num_sources: {}, "
"{}, _use_global_shuffle: {}, _channel_id: {}, _num_partitions: {}, "
"_num_senders: {}, _num_sources: {}, "
"_running_sink_operators: {}, _running_source_operators: {}",
Base::debug_string(indentation_level), _channel_id, _exchanger->_num_partitions,
_exchanger->_num_senders, _exchanger->_num_sources,
Base::debug_string(indentation_level),
_parent->cast<LocalExchangeSinkOperatorX>()._use_global_shuffle, _channel_id,
_exchanger->_num_partitions, _exchanger->_num_senders, _exchanger->_num_sources,
_exchanger->_running_sink_operators, _exchanger->_running_source_operators);
return fmt::to_string(debug_string_buffer);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ class LocalExchangeSinkOperatorX final : public DataSinkOperatorX<LocalExchangeS
std::unique_ptr<vectorized::PartitionerBase> _partitioner;
const std::map<int, int> _bucket_seq_to_instance_idx;
std::vector<std::pair<int, int>> _shuffle_idx_to_instance_idx;
bool _use_global_shuffle = false;
};

} // namespace doris::pipeline

0 comments on commit 1e5c758

Please sign in to comment.