Skip to content

Commit

Permalink
Init commit.
Browse files Browse the repository at this point in the history
  • Loading branch information
kaka11chen committed Apr 8, 2024
1 parent 272d58a commit 45e6cc2
Show file tree
Hide file tree
Showing 8 changed files with 118 additions and 40 deletions.
13 changes: 7 additions & 6 deletions be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1181,14 +1181,15 @@ DEFINE_mString(ca_cert_file_paths,
"/etc/ssl/ca-bundle.pem");

/** Table sink configurations(currently contains only external table types) **/
// Minimum data processed to scale writers when non partition writing
// Minimum data processed to scale writers in exchange when non partition writing
DEFINE_mInt64(table_sink_non_partition_write_scaling_data_processed_threshold,
"125829120"); // 120MB
// Minimum data processed to start rebalancing in exchange when partition writing
DEFINE_mInt64(table_sink_partition_write_data_processed_threshold, "209715200"); // 200MB
"26214400"); // 25MB
// Minimum data processed to trigger skewed partition rebalancing in exchange when partition writing
DEFINE_mInt64(table_sink_partition_write_skewed_data_processed_rebalance_threshold,
"209715200"); // 200MB
DEFINE_mInt64(table_sink_partition_write_min_data_processed_rebalance_threshold,
"26214400"); // 25MB
// Minimum partition data processed to rebalance writers in exchange when partition writing
DEFINE_mInt64(table_sink_partition_write_min_partition_data_processed_rebalance_threshold,
"15728640"); // 15MB
// Maximum number of partitions each writer may process when partition writing
DEFINE_mInt32(table_sink_partition_write_max_partition_nums_per_writer, "128");

Expand Down
8 changes: 4 additions & 4 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -1265,12 +1265,12 @@ DECLARE_String(trino_connector_plugin_dir);
DECLARE_mString(ca_cert_file_paths);

/** Table sink configurations(currently contains only external table types) **/
// Minimum data processed to scale writers when non partition writing
// Minimum data processed to scale writers in exchange when non partition writing
DECLARE_mInt64(table_sink_non_partition_write_scaling_data_processed_threshold);
// Minimum data processed to start rebalancing in exchange when partition writing
DECLARE_mInt64(table_sink_partition_write_data_processed_threshold);
// Minimum data processed to trigger skewed partition rebalancing in exchange when partition writing
DECLARE_mInt64(table_sink_partition_write_skewed_data_processed_rebalance_threshold);
DECLARE_mInt64(table_sink_partition_write_min_data_processed_rebalance_threshold);
// Minimum partition data processed to rebalance writers in exchange when partition writing
DECLARE_mInt64(table_sink_partition_write_min_partition_data_processed_rebalance_threshold);
// Maximum number of partitions each writer may process when partition writing
DECLARE_mInt32(table_sink_partition_write_max_partition_nums_per_writer);

Expand Down
28 changes: 21 additions & 7 deletions be/src/pipeline/exec/exchange_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -257,17 +257,31 @@ Status ExchangeSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& inf
_partitioner.reset(
new vectorized::Crc32HashPartitioner<LocalExchangeChannelIds>(_partition_count));
_partition_function.reset(new HashPartitionFunction(_partitioner.get()));
// NOTE(review): leftover debug overrides — the trailing size labels did not
// match the values (e.g. `10000 * MEGABYTE` was labeled "1MB"); corrected
// below. Consider deleting this dead code before merge.
// const long MEGABYTE = 1024 * 1024;
// const long MIN_PARTITION_DATA_PROCESSED_REBALANCE_THRESHOLD = 1 * MEGABYTE;  // 1MB
// const long MIN_DATA_PROCESSED_REBALANCE_THRESHOLD = 50 * MEGABYTE;           // 50MB

// const long MIN_PARTITION_DATA_PROCESSED_REBALANCE_THRESHOLD = 1; // 1 byte (stress setting)
// const long MIN_DATA_PROCESSED_REBALANCE_THRESHOLD = 1;           // 1 byte (stress setting)
std::vector<std::string> task_addresses;
task_addresses.reserve(channels.size());
for (int i = 0; i < channels.size(); ++i) {
const TNetworkAddress& brpc_dest_addr = channels[i]->brpc_dest_addr();
task_addresses.emplace_back(
fmt::format("{}:{}", brpc_dest_addr.hostname, brpc_dest_addr.port));
}
scale_writer_partitioning_exchanger.reset(new vectorized::ScaleWriterPartitioningExchanger<
HashPartitionFunction>(
channels.size(), *_partition_function, _partition_count, channels.size(), 1,
config::table_sink_partition_write_data_processed_threshold,
config::table_sink_partition_write_skewed_data_processed_rebalance_threshold));
config::table_sink_partition_write_min_partition_data_processed_rebalance_threshold /
state->task_num() ==
0
? config::table_sink_partition_write_min_partition_data_processed_rebalance_threshold
: config::table_sink_partition_write_min_partition_data_processed_rebalance_threshold /
state->task_num(),
config::table_sink_partition_write_min_data_processed_rebalance_threshold /
state->task_num() ==
0
? config::table_sink_partition_write_min_data_processed_rebalance_threshold
: config::table_sink_partition_write_min_data_processed_rebalance_threshold /
state->task_num(),
&task_addresses));

RETURN_IF_ERROR(_partitioner->init(p._texprs));
RETURN_IF_ERROR(_partitioner->prepare(state, p._row_desc));
_profile->add_info_string("Partitioner",
Expand Down
1 change: 1 addition & 0 deletions be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -593,6 +593,7 @@ Status PipelineXFragmentContext::_build_pipeline_tasks(
init_runtime_state(task_runtime_state);
auto cur_task_id = _total_tasks++;
task_runtime_state->set_task_id(cur_task_id);
task_runtime_state->set_task_num(pipeline->num_tasks());
auto task = std::make_unique<PipelineXTask>(
pipeline, cur_task_id, get_task_runtime_state(cur_task_id), this,
pipeline_id_to_profile[pip_idx].get(), get_local_exchange_state(pipeline),
Expand Down
5 changes: 5 additions & 0 deletions be/src/runtime/runtime_state.h
Original file line number Diff line number Diff line change
Expand Up @@ -624,6 +624,10 @@ class RuntimeState {

int task_id() const { return _task_id; }

// Records how many parallel pipeline tasks execute this fragment. Consumers
// (e.g. the exchange sink) divide per-fragment rebalance thresholds by this
// value to obtain per-task thresholds.
void set_task_num(int task_num) { _task_num = task_num; }

// Number of parallel pipeline tasks for this fragment; 0 until
// set_task_num() has been called.
int task_num() const { return _task_num; }

private:
Status create_error_log_file();

Expand Down Expand Up @@ -734,6 +738,7 @@ class RuntimeState {
std::vector<TErrorTabletInfo> _error_tablet_infos;
int _max_operator_id = 0;
int _task_id = -1;
int _task_num = 0;

std::vector<THivePartitionUpdate> _hive_partition_updates;

Expand Down
79 changes: 64 additions & 15 deletions be/src/vec/exec/skewed_partition_rebalancer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ namespace doris::vectorized {
SkewedPartitionRebalancer::SkewedPartitionRebalancer(
int partition_count, int task_count, int task_bucket_count,
long min_partition_data_processed_rebalance_threshold,
long min_data_processed_rebalance_threshold)
long min_data_processed_rebalance_threshold, const std::vector<std::string>* task_addresses)
: _partition_count(partition_count),
_task_count(task_count),
_task_bucket_count(task_bucket_count),
Expand All @@ -45,13 +45,40 @@ SkewedPartitionRebalancer::SkewedPartitionRebalancer(
_partition_data_size_since_last_rebalance_per_task(partition_count, 0),
_estimated_task_bucket_data_size_since_last_rebalance(task_count * task_bucket_count, 0),
_partition_assignments(partition_count) {
if (task_addresses != nullptr) {
CHECK(task_addresses->size() == task_count);
_task_addresses = *task_addresses;
for (int i = 0; i < _task_addresses.size(); ++i) {
auto it = _assigned_address_to_task_buckets_num.find(_task_addresses[i]);
if (it == _assigned_address_to_task_buckets_num.end()) {
_assigned_address_to_task_buckets_num.insert({_task_addresses[i], 0});
}
}
} else {
_assigned_address_to_task_buckets_num.insert({TASK_BUCKET_ADDRESS_NOT_SET, 0});
}

std::vector<int> task_bucket_ids(task_count, 0);

for (int partition = 0; partition < partition_count; partition++) {
int task_id = partition % task_count;
int bucket_id = task_bucket_ids[task_id]++ % task_bucket_count;
TaskBucket task_bucket(task_id, bucket_id, task_bucket_count);
TaskBucket task_bucket(
task_id, bucket_id, task_bucket_count,
(_task_addresses.empty()) ? TASK_BUCKET_ADDRESS_NOT_SET : _task_addresses[task_id]);
_partition_assignments[partition].emplace_back(std::move(task_bucket));

for (int i = 0; i < _partition_assignments[partition].size(); ++i) {
auto it = _assigned_address_to_task_buckets_num.find(
_partition_assignments[partition][i].task_address);
if (it != _assigned_address_to_task_buckets_num.end()) {
_assigned_address_to_task_buckets_num[_partition_assignments[partition][i]
.task_address]++;
} else {
LOG(FATAL) << "__builtin_unreachable";
__builtin_unreachable();
}
}
}
}

Expand Down Expand Up @@ -140,7 +167,7 @@ void SkewedPartitionRebalancer::_rebalance_based_on_task_bucket_skewness(
continue;
}

std::vector<TaskBucket> min_skewed_task_buckets =
std::multimap<std::string, SkewedPartitionRebalancer::TaskBucket> min_skewed_task_buckets =
_find_skewed_min_task_buckets(max_task_bucket.value(), min_task_buckets);
if (min_skewed_task_buckets.empty()) {
break;
Expand All @@ -161,10 +188,27 @@ void SkewedPartitionRebalancer::_rebalance_based_on_task_bucket_skewness(
int total_assigned_tasks = _partition_assignments[max_partition_value].size();
if (_partition_data_size[max_partition_value] >=
(_min_partition_data_processed_rebalance_threshold * total_assigned_tasks)) {
for (const TaskBucket& min_task_bucket : min_skewed_task_buckets) {
if (_rebalance_partition(max_partition_value, min_task_bucket, max_task_buckets,
min_task_buckets)) {
scaled_partitions.push_back(max_partition_value);
bool found = false;
std::vector<std::pair<std::string, int>>
sorted_assigned_address_to_task_buckets_num(
_assigned_address_to_task_buckets_num.begin(),
_assigned_address_to_task_buckets_num.end());

std::sort(sorted_assigned_address_to_task_buckets_num.begin(),
sorted_assigned_address_to_task_buckets_num.end(),
[](const auto& a, const auto& b) { return a.second < b.second; });
for (auto& pair : sorted_assigned_address_to_task_buckets_num) {
auto range = min_skewed_task_buckets.equal_range(pair.first);
for (auto it = range.first; it != range.second; ++it) {
TaskBucket& task_bucket = it->second;
if (_rebalance_partition(max_partition_value, task_bucket, max_task_buckets,
min_task_buckets)) {
scaled_partitions.push_back(max_partition_value);
found = true;
break;
}
}
if (found) {
break;
}
}
Expand All @@ -175,12 +219,12 @@ void SkewedPartitionRebalancer::_rebalance_based_on_task_bucket_skewness(
}
}

std::vector<SkewedPartitionRebalancer::TaskBucket>
std::multimap<std::string, SkewedPartitionRebalancer::TaskBucket>
SkewedPartitionRebalancer::_find_skewed_min_task_buckets(
const TaskBucket& max_task_bucket,
const IndexedPriorityQueue<TaskBucket, IndexedPriorityQueuePriorityOrdering::LOW_TO_HIGH>&
min_task_buckets) {
std::vector<TaskBucket> min_skewed_task_buckets;
std::multimap<std::string, SkewedPartitionRebalancer::TaskBucket> min_skewed_task_buckets;

for (const auto& min_task_bucket : min_task_buckets) {
double skewness =
Expand All @@ -192,7 +236,7 @@ SkewedPartitionRebalancer::_find_skewed_min_task_buckets(
break;
}
if (max_task_bucket.task_id != min_task_bucket.task_id) {
min_skewed_task_buckets.push_back(min_task_bucket);
min_skewed_task_buckets.insert({min_task_bucket.task_address, min_task_bucket});
}
}
return min_skewed_task_buckets;
Expand All @@ -213,6 +257,7 @@ bool SkewedPartitionRebalancer::_rebalance_partition(
}

assignments.push_back(to_task_bucket);
_assigned_address_to_task_buckets_num[to_task_bucket.task_address]++;

int new_task_count = assignments.size();
int old_task_count = new_task_count - 1;
Expand Down Expand Up @@ -279,10 +324,14 @@ void SkewedPartitionRebalancer::_rebalance_partitions(long data_processed) {
IndexedPriorityQueue<TaskBucket, IndexedPriorityQueuePriorityOrdering::LOW_TO_HIGH>
min_task_buckets;

for (int taskId = 0; taskId < _task_count; taskId++) {
for (int bucketId = 0; bucketId < _task_bucket_count; bucketId++) {
TaskBucket task_bucket1(taskId, bucketId, _task_bucket_count);
TaskBucket task_bucket2(taskId, bucketId, _task_bucket_count);
for (int task_id = 0; task_id < _task_count; task_id++) {
for (int bucket_id = 0; bucket_id < _task_bucket_count; bucket_id++) {
TaskBucket task_bucket1(task_id, bucket_id, _task_bucket_count,
(_task_addresses.empty()) ? TASK_BUCKET_ADDRESS_NOT_SET
: _task_addresses[task_id]);
TaskBucket task_bucket2(task_id, bucket_id, _task_bucket_count,
(_task_addresses.empty()) ? TASK_BUCKET_ADDRESS_NOT_SET
: _task_addresses[task_id]);
_estimated_task_bucket_data_size_since_last_rebalance[task_bucket1.id] =
_calculate_task_bucket_data_size_since_last_rebalance(
task_bucket_max_partitions[task_bucket1.id]);
Expand All @@ -299,4 +348,4 @@ void SkewedPartitionRebalancer::_rebalance_partitions(long data_processed) {
task_bucket_max_partitions);
_data_processed_at_last_rebalance = data_processed;
}
} // namespace doris::vectorized
} // namespace doris::vectorized
17 changes: 12 additions & 5 deletions be/src/vec/exec/skewed_partition_rebalancer.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,12 @@ class SkewedPartitionRebalancer {
struct TaskBucket {
int task_id;
int id;
std::string task_address;

TaskBucket(int task_id_, int bucket_id_, int task_bucket_count_)
: task_id(task_id_), id(task_id_ * task_bucket_count_ + bucket_id_) {}
// Builds the bucket handle for (task_id_, bucket_id_). `id` flattens the
// pair into a single index: task_id_ * task_bucket_count_ + bucket_id_.
// task_address_ is a by-value sink parameter, so move it into the member
// instead of copying the string a second time.
TaskBucket(int task_id_, int bucket_id_, int task_bucket_count_, std::string task_address_)
        : task_id(task_id_),
          id(task_id_ * task_bucket_count_ + bucket_id_),
          task_address(std::move(task_address_)) {}

// Buckets compare equal iff their flattened ids match; `id` already encodes
// (task_id, bucket_id), and task_address does not participate in equality.
bool operator==(const TaskBucket& other) const { return id == other.id; }

Expand All @@ -74,7 +77,8 @@ class SkewedPartitionRebalancer {
public:
SkewedPartitionRebalancer(int partition_count, int task_count, int task_bucket_count,
long min_partition_data_processed_rebalance_threshold,
long min_data_processed_rebalance_threshold);
long min_data_processed_rebalance_threshold,
const std::vector<std::string>* task_addresses = nullptr);

std::vector<std::list<int>> get_partition_assignments();
int get_task_count();
Expand All @@ -96,7 +100,7 @@ class SkewedPartitionRebalancer {
std::vector<
IndexedPriorityQueue<int, IndexedPriorityQueuePriorityOrdering::HIGH_TO_LOW>>&
task_bucket_max_partitions);
std::vector<TaskBucket> _find_skewed_min_task_buckets(
std::multimap<std::string, SkewedPartitionRebalancer::TaskBucket> _find_skewed_min_task_buckets(
const TaskBucket& max_task_bucket,
const IndexedPriorityQueue<TaskBucket,
IndexedPriorityQueuePriorityOrdering::LOW_TO_HIGH>&
Expand All @@ -113,12 +117,14 @@ class SkewedPartitionRebalancer {

private:
static constexpr double TASK_BUCKET_SKEWNESS_THRESHOLD = 0.7;
static constexpr const char* TASK_BUCKET_ADDRESS_NOT_SET = "TASK_BUCKET_ADDRESS_NOT_SET";

int _partition_count;
int _task_count;
int _task_bucket_count;
long _min_partition_data_processed_rebalance_threshold;
long _min_data_processed_rebalance_threshold;
std::vector<std::string> _task_addresses;
std::vector<long> _partition_row_count;
long _data_processed;
long _data_processed_at_last_rebalance;
Expand All @@ -128,5 +134,6 @@ class SkewedPartitionRebalancer {
std::vector<long> _estimated_task_bucket_data_size_since_last_rebalance;

std::vector<std::vector<TaskBucket>> _partition_assignments;
std::unordered_map<std::string, int> _assigned_address_to_task_buckets_num;
};
} // namespace doris::vectorized
} // namespace doris::vectorized
7 changes: 4 additions & 3 deletions be/src/vec/sink/scale_writer_partitioning_exchanger.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,13 @@ class ScaleWriterPartitioningExchanger {
ScaleWriterPartitioningExchanger(int channel_size, PartitionFunction& partition_function,
int partition_count, int task_count, int task_bucket_count,
long min_partition_data_processed_rebalance_threshold,
long min_data_processed_rebalance_threshold)
long min_data_processed_rebalance_threshold,
const std::vector<string>* task_addresses)
: _channel_size(channel_size),
_partition_function(partition_function),
_partition_rebalancer(partition_count, task_count, task_bucket_count,
min_partition_data_processed_rebalance_threshold,
min_data_processed_rebalance_threshold),
min_data_processed_rebalance_threshold, task_addresses),
_partition_row_counts(partition_count, 0),
_partition_writer_ids(partition_count, -1),
_partition_writer_indexes(partition_count, 0) {}
Expand Down Expand Up @@ -89,4 +90,4 @@ class ScaleWriterPartitioningExchanger {
std::vector<int> _partition_writer_indexes;
};

} // namespace doris::vectorized
} // namespace doris::vectorized

0 comments on commit 45e6cc2

Please sign in to comment.