diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index f29ec307914eeb..212233dcbae6ee 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -1181,14 +1181,15 @@ DEFINE_mString(ca_cert_file_paths, "/etc/ssl/ca-bundle.pem"); /** Table sink configurations(currently contains only external table types) **/ -// Minimum data processed to scale writers when non partition writing +// Minimum data processed to scale writers in exchange when non partition writing DEFINE_mInt64(table_sink_non_partition_write_scaling_data_processed_threshold, - "125829120"); // 120MB -// Minimum data processed to start rebalancing in exchange when partition writing -DEFINE_mInt64(table_sink_partition_write_data_processed_threshold, "209715200"); // 200MB + "26214400"); // 25MB // Minimum data processed to trigger skewed partition rebalancing in exchange when partition writing -DEFINE_mInt64(table_sink_partition_write_skewed_data_processed_rebalance_threshold, - "209715200"); // 200MB +DEFINE_mInt64(table_sink_partition_write_min_data_processed_rebalance_threshold, + "26214400"); // 25MB +// Minimum partition data processed to rebalance writers in exchange when partition writing +DEFINE_mInt64(table_sink_partition_write_min_partition_data_processed_rebalance_threshold, + "15728640"); // 15MB // Maximum processed partition nums of per writer when partition writing DEFINE_mInt32(table_sink_partition_write_max_partition_nums_per_writer, "128"); diff --git a/be/src/common/config.h b/be/src/common/config.h index 81fabfb9517879..b6b7e0c1d3660f 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -1265,12 +1265,12 @@ DECLARE_String(trino_connector_plugin_dir); DECLARE_mString(ca_cert_file_paths); /** Table sink configurations(currently contains only external table types) **/ -// Minimum data processed to scale writers when non partition writing +// Minimum data processed to scale writers in exchange when non partition writing DECLARE_mInt64(table_sink_non_partition_write_scaling_data_processed_threshold); -// Minimum data processed to start rebalancing in exchange when partition writing -DECLARE_mInt64(table_sink_partition_write_data_processed_threshold); // Minimum data processed to trigger skewed partition rebalancing in exchange when partition writing -DECLARE_mInt64(table_sink_partition_write_skewed_data_processed_rebalance_threshold); +DECLARE_mInt64(table_sink_partition_write_min_data_processed_rebalance_threshold); +// Minimum partition data processed to rebalance writers in exchange when partition writing +DECLARE_mInt64(table_sink_partition_write_min_partition_data_processed_rebalance_threshold); // Maximum processed partition nums of per writer when partition writing DECLARE_mInt32(table_sink_partition_write_max_partition_nums_per_writer); diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp b/be/src/pipeline/exec/exchange_sink_operator.cpp index bc55bc8f805803..e1ca46dcdcc4c3 100644 --- a/be/src/pipeline/exec/exchange_sink_operator.cpp +++ b/be/src/pipeline/exec/exchange_sink_operator.cpp @@ -257,17 +257,31 @@ Status ExchangeSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& inf _partitioner.reset( new vectorized::Crc32HashPartitioner(_partition_count)); _partition_function.reset(new HashPartitionFunction(_partitioner.get())); - // const long MEGABYTE = 1024 * 1024; - // const long MIN_PARTITION_DATA_PROCESSED_REBALANCE_THRESHOLD = 10000 * MEGABYTE; // 1MB - // const long MIN_DATA_PROCESSED_REBALANCE_THRESHOLD = 50000 * MEGABYTE; // 50MB - // const long MIN_PARTITION_DATA_PROCESSED_REBALANCE_THRESHOLD = 1; // 1MB - // const long MIN_DATA_PROCESSED_REBALANCE_THRESHOLD = 1; // 50MB + std::vector task_addresses; + task_addresses.reserve(channels.size()); + for (int i = 0; i < channels.size(); ++i) { + const TNetworkAddress& brpc_dest_addr = channels[i]->brpc_dest_addr(); + task_addresses.emplace_back( + fmt::format("{}:{}", brpc_dest_addr.hostname, brpc_dest_addr.port)); + } scale_writer_partitioning_exchanger.reset(new vectorized::ScaleWriterPartitioningExchanger< HashPartitionFunction>( channels.size(), *_partition_function, _partition_count, channels.size(), 1, - config::table_sink_partition_write_data_processed_threshold, - config::table_sink_partition_write_skewed_data_processed_rebalance_threshold)); + config::table_sink_partition_write_min_partition_data_processed_rebalance_threshold / + state->task_num() == + 0 + ? config::table_sink_partition_write_min_partition_data_processed_rebalance_threshold + : config::table_sink_partition_write_min_partition_data_processed_rebalance_threshold / + state->task_num(), + config::table_sink_partition_write_min_data_processed_rebalance_threshold / + state->task_num() == + 0 + ? config::table_sink_partition_write_min_data_processed_rebalance_threshold + : config::table_sink_partition_write_min_data_processed_rebalance_threshold / + state->task_num(), + &task_addresses)); + RETURN_IF_ERROR(_partitioner->init(p._texprs)); RETURN_IF_ERROR(_partitioner->prepare(state, p._row_desc)); _profile->add_info_string("Partitioner", diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp index ce2d4a507487eb..f23e39472ab993 100644 --- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp +++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp @@ -593,6 +593,7 @@ Status PipelineXFragmentContext::_build_pipeline_tasks( init_runtime_state(task_runtime_state); auto cur_task_id = _total_tasks++; task_runtime_state->set_task_id(cur_task_id); + task_runtime_state->set_task_num(pipeline->num_tasks()); auto task = std::make_unique( pipeline, cur_task_id, get_task_runtime_state(cur_task_id), this, pipeline_id_to_profile[pip_idx].get(), get_local_exchange_state(pipeline), diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h index 6fae242d53cd95..85413406fd2978 100644 --- a/be/src/runtime/runtime_state.h +++ b/be/src/runtime/runtime_state.h @@ -624,6 +624,10 @@ class RuntimeState { int task_id() const { return _task_id; } + void set_task_num(int task_num) { _task_num = task_num; } + + int task_num() const { return _task_num; } + private: Status create_error_log_file(); @@ -734,6 +738,7 @@ class RuntimeState { std::vector _error_tablet_infos; int _max_operator_id = 0; int _task_id = -1; + int _task_num = 0; std::vector _hive_partition_updates; diff --git a/be/src/vec/exec/skewed_partition_rebalancer.cpp b/be/src/vec/exec/skewed_partition_rebalancer.cpp index ae12d365f057dc..a56a35eab9bf34 100644 --- a/be/src/vec/exec/skewed_partition_rebalancer.cpp +++ b/be/src/vec/exec/skewed_partition_rebalancer.cpp @@ -28,7 +28,7 @@ namespace doris::vectorized { SkewedPartitionRebalancer::SkewedPartitionRebalancer( int partition_count, int task_count, int task_bucket_count, long min_partition_data_processed_rebalance_threshold, - long min_data_processed_rebalance_threshold) + long min_data_processed_rebalance_threshold, const std::vector* task_addresses) : _partition_count(partition_count), _task_count(task_count), _task_bucket_count(task_bucket_count), @@ -45,13 +45,40 @@ SkewedPartitionRebalancer::SkewedPartitionRebalancer( _partition_data_size_since_last_rebalance_per_task(partition_count, 0), _estimated_task_bucket_data_size_since_last_rebalance(task_count * task_bucket_count, 0), _partition_assignments(partition_count) { + if (task_addresses != nullptr) { + CHECK(task_addresses->size() == task_count); + _task_addresses = *task_addresses; + for (int i = 0; i < _task_addresses.size(); ++i) { + auto it = _assigned_address_to_task_buckets_num.find(_task_addresses[i]); + if (it == _assigned_address_to_task_buckets_num.end()) { + _assigned_address_to_task_buckets_num.insert({_task_addresses[i], 0}); + } + } + } else { + _assigned_address_to_task_buckets_num.insert({TASK_BUCKET_ADDRESS_NOT_SET, 0}); + } + std::vector task_bucket_ids(task_count, 0); for (int partition = 0; partition < partition_count; partition++) { int task_id = partition % task_count; int bucket_id = task_bucket_ids[task_id]++ % task_bucket_count; - TaskBucket task_bucket(task_id, bucket_id, task_bucket_count); + TaskBucket task_bucket( + task_id, bucket_id, task_bucket_count, + (_task_addresses.empty()) ? TASK_BUCKET_ADDRESS_NOT_SET : _task_addresses[task_id]); _partition_assignments[partition].emplace_back(std::move(task_bucket)); + + for (int i = 0; i < _partition_assignments[partition].size(); ++i) { + auto it = _assigned_address_to_task_buckets_num.find( + _partition_assignments[partition][i].task_address); + if (it != _assigned_address_to_task_buckets_num.end()) { + _assigned_address_to_task_buckets_num[_partition_assignments[partition][i] + .task_address]++; + } else { + LOG(FATAL) << "__builtin_unreachable"; + __builtin_unreachable(); + } + } } } @@ -140,7 +167,7 @@ void SkewedPartitionRebalancer::_rebalance_based_on_task_bucket_skewness( continue; } - std::vector min_skewed_task_buckets = + std::multimap min_skewed_task_buckets = _find_skewed_min_task_buckets(max_task_bucket.value(), min_task_buckets); if (min_skewed_task_buckets.empty()) { break; @@ -161,10 +188,27 @@ void SkewedPartitionRebalancer::_rebalance_based_on_task_bucket_skewness( int total_assigned_tasks = _partition_assignments[max_partition_value].size(); if (_partition_data_size[max_partition_value] >= (_min_partition_data_processed_rebalance_threshold * total_assigned_tasks)) { - for (const TaskBucket& min_task_bucket : min_skewed_task_buckets) { - if (_rebalance_partition(max_partition_value, min_task_bucket, max_task_buckets, - min_task_buckets)) { - scaled_partitions.push_back(max_partition_value); + bool found = false; + std::vector> + sorted_assigned_address_to_task_buckets_num( + _assigned_address_to_task_buckets_num.begin(), + _assigned_address_to_task_buckets_num.end()); + + std::sort(sorted_assigned_address_to_task_buckets_num.begin(), + sorted_assigned_address_to_task_buckets_num.end(), + [](const auto& a, const auto& b) { return a.second < b.second; }); + for (auto& pair : sorted_assigned_address_to_task_buckets_num) { + auto range = min_skewed_task_buckets.equal_range(pair.first); + for (auto it = range.first; it != range.second; ++it) { + TaskBucket& task_bucket = it->second; + if (_rebalance_partition(max_partition_value, task_bucket, max_task_buckets, + min_task_buckets)) { + scaled_partitions.push_back(max_partition_value); + found = true; + break; + } + } + if (found) { break; } } @@ -175,12 +219,12 @@ void SkewedPartitionRebalancer::_rebalance_based_on_task_bucket_skewness( } } -std::vector +std::multimap SkewedPartitionRebalancer::_find_skewed_min_task_buckets( const TaskBucket& max_task_bucket, const IndexedPriorityQueue& min_task_buckets) { - std::vector min_skewed_task_buckets; + std::multimap min_skewed_task_buckets; for (const auto& min_task_bucket : min_task_buckets) { double skewness = @@ -192,7 +236,7 @@ SkewedPartitionRebalancer::_find_skewed_min_task_buckets( break; } if (max_task_bucket.task_id != min_task_bucket.task_id) { - min_skewed_task_buckets.push_back(min_task_bucket); + min_skewed_task_buckets.insert({min_task_bucket.task_address, min_task_bucket}); } } return min_skewed_task_buckets; @@ -213,6 +257,7 @@ bool SkewedPartitionRebalancer::_rebalance_partition( } assignments.push_back(to_task_bucket); + _assigned_address_to_task_buckets_num[to_task_bucket.task_address]++; int new_task_count = assignments.size(); int old_task_count = new_task_count - 1; @@ -279,10 +324,14 @@ void SkewedPartitionRebalancer::_rebalance_partitions(long data_processed) { IndexedPriorityQueue min_task_buckets; - for (int taskId = 0; taskId < _task_count; taskId++) { - for (int bucketId = 0; bucketId < _task_bucket_count; bucketId++) { - TaskBucket task_bucket1(taskId, bucketId, _task_bucket_count); - TaskBucket task_bucket2(taskId, bucketId, _task_bucket_count); + for (int task_id = 0; task_id < _task_count; task_id++) { + for (int bucket_id = 0; bucket_id < _task_bucket_count; bucket_id++) { + TaskBucket task_bucket1(task_id, bucket_id, _task_bucket_count, + (_task_addresses.empty()) ? TASK_BUCKET_ADDRESS_NOT_SET + : _task_addresses[task_id]); + TaskBucket task_bucket2(task_id, bucket_id, _task_bucket_count, + (_task_addresses.empty()) ? TASK_BUCKET_ADDRESS_NOT_SET + : _task_addresses[task_id]); _estimated_task_bucket_data_size_since_last_rebalance[task_bucket1.id] = _calculate_task_bucket_data_size_since_last_rebalance( task_bucket_max_partitions[task_bucket1.id]); @@ -299,4 +348,4 @@ void SkewedPartitionRebalancer::_rebalance_partitions(long data_processed) { task_bucket_max_partitions); _data_processed_at_last_rebalance = data_processed; } -} // namespace doris::vectorized \ No newline at end of file +} // namespace doris::vectorized diff --git a/be/src/vec/exec/skewed_partition_rebalancer.h b/be/src/vec/exec/skewed_partition_rebalancer.h index 814ebc1d465ca1..57b4e067718f22 100644 --- a/be/src/vec/exec/skewed_partition_rebalancer.h +++ b/be/src/vec/exec/skewed_partition_rebalancer.h @@ -60,9 +60,12 @@ class SkewedPartitionRebalancer { struct TaskBucket { int task_id; int id; + std::string task_address; - TaskBucket(int task_id_, int bucket_id_, int task_bucket_count_) - : task_id(task_id_), id(task_id_ * task_bucket_count_ + bucket_id_) {} + TaskBucket(int task_id_, int bucket_id_, int task_bucket_count_, std::string task_address_) + : task_id(task_id_), + id(task_id_ * task_bucket_count_ + bucket_id_), + task_address(task_address_) {} bool operator==(const TaskBucket& other) const { return id == other.id; } @@ -74,7 +77,8 @@ class SkewedPartitionRebalancer { public: SkewedPartitionRebalancer(int partition_count, int task_count, int task_bucket_count, long min_partition_data_processed_rebalance_threshold, - long min_data_processed_rebalance_threshold); + long min_data_processed_rebalance_threshold, + const std::vector* task_addresses = nullptr); std::vector> get_partition_assignments(); int get_task_count(); @@ -96,7 +100,7 @@ class SkewedPartitionRebalancer { std::vector< IndexedPriorityQueue>& task_bucket_max_partitions); - std::vector _find_skewed_min_task_buckets( + std::multimap _find_skewed_min_task_buckets( const TaskBucket& max_task_bucket, const IndexedPriorityQueue& @@ -113,12 +117,14 @@ class SkewedPartitionRebalancer { private: static constexpr double TASK_BUCKET_SKEWNESS_THRESHOLD = 0.7; + static constexpr const char* TASK_BUCKET_ADDRESS_NOT_SET = "TASK_BUCKET_ADDRESS_NOT_SET"; int _partition_count; int _task_count; int _task_bucket_count; long _min_partition_data_processed_rebalance_threshold; long _min_data_processed_rebalance_threshold; + std::vector _task_addresses; std::vector _partition_row_count; long _data_processed; long _data_processed_at_last_rebalance; @@ -128,5 +134,6 @@ class SkewedPartitionRebalancer { std::vector _estimated_task_bucket_data_size_since_last_rebalance; std::vector> _partition_assignments; + std::unordered_map _assigned_address_to_task_buckets_num; }; -} // namespace doris::vectorized \ No newline at end of file +} // namespace doris::vectorized diff --git a/be/src/vec/sink/scale_writer_partitioning_exchanger.hpp b/be/src/vec/sink/scale_writer_partitioning_exchanger.hpp index f7435249c20f64..ec9c4163690d20 100644 --- a/be/src/vec/sink/scale_writer_partitioning_exchanger.hpp +++ b/be/src/vec/sink/scale_writer_partitioning_exchanger.hpp @@ -33,12 +33,13 @@ class ScaleWriterPartitioningExchanger { ScaleWriterPartitioningExchanger(int channel_size, PartitionFunction& partition_function, int partition_count, int task_count, int task_bucket_count, long min_partition_data_processed_rebalance_threshold, - long min_data_processed_rebalance_threshold) + long min_data_processed_rebalance_threshold, + const std::vector* task_addresses) : _channel_size(channel_size), _partition_function(partition_function), _partition_rebalancer(partition_count, task_count, task_bucket_count, min_partition_data_processed_rebalance_threshold, - min_data_processed_rebalance_threshold), + min_data_processed_rebalance_threshold, task_addresses), _partition_row_counts(partition_count, 0), _partition_writer_ids(partition_count, -1), _partition_writer_indexes(partition_count, 0) {} @@ -89,4 +90,4 @@ class ScaleWriterPartitioningExchanger { std::vector _partition_writer_indexes; }; -} // namespace doris::vectorized \ No newline at end of file +} // namespace doris::vectorized