Skip to content
This repository has been archived by the owner on Jun 23, 2022. It is now read-only.

Commit

Permalink
feat(load-balance): update selection strategy for max load disk (#916)
Browse files Browse the repository at this point in the history
  • Loading branch information
hycdong authored Sep 22, 2021
1 parent 865c839 commit 4cce780
Show file tree
Hide file tree
Showing 3 changed files with 100 additions and 51 deletions.
79 changes: 49 additions & 30 deletions src/meta/greedy_load_balancer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,14 @@ void get_min_max_set(const std::map<rpc_address, uint32_t> &node_count_map,
}
}

// Returns an iterator to the element located `n` steps after std::begin(s).
//
// Despite the name, nothing random happens in here: the position `n` is
// supplied by the caller and the helper merely walks to it.
// No bounds checking is performed, so `n` must not exceed the number of
// elements in `s` (n == size yields the past-the-end iterator).
template <typename S>
auto select_random(const S &s, size_t n)
{
    return std::next(std::begin(s), n);
}

greedy_load_balancer::greedy_load_balancer(meta_service *_svc)
: server_load_balancer(_svc),
_ctrl_balancer_ignored_apps(nullptr),
Expand Down Expand Up @@ -1217,9 +1225,10 @@ bool greedy_load_balancer::get_next_move(const cluster_migration_info &cluster_i
std::multimap<uint32_t, rpc_address> app_count_multimap = utils::flip_map(app_map);
if (app_count_multimap.rbegin()->first <= app_count_multimap.begin()->first + 1 &&
(app_cluster_min_set.empty() || app_cluster_max_set.empty())) {
ddebug_f(
"do not move replicas of a balanced app if the least (most) loaded servers overall "
"do not intersect the servers hosting the least (most) replicas of the app");
ddebug_f("do not move replicas of a balanced app({}) if the least (most) loaded "
"servers overall do not intersect the servers hosting the least (most) "
"replicas of the app",
app_id);
continue;
}

Expand All @@ -1244,59 +1253,69 @@ bool greedy_load_balancer::pick_up_move(const cluster_migration_info &cluster_in
const partition_set &selected_pid,
/*out*/ move_info &move_info)
{
rpc_address max_load_node;
std::string max_load_disk;
partition_set max_load_partitions;
get_max_load_disk(
cluster_info, max_nodes, app_id, max_load_node, max_load_disk, max_load_partitions);

std::set<app_disk_info> max_load_disk_set;
get_max_load_disk_set(cluster_info, max_nodes, app_id, max_load_disk_set);
if (max_load_disk_set.empty()) {
return false;
}
auto index = rand() % max_load_disk_set.size();
auto max_load_disk = *select_random(max_load_disk_set, index);
ddebug_f("most load disk({}) on node({}) is picked, has {} partition",
max_load_disk.node.to_string(),
max_load_disk.disk_tag,
max_load_disk.partitions.size());
for (const auto &node_addr : min_nodes) {
gpid picked_pid;
if (pick_up_partition(
cluster_info, node_addr, max_load_partitions, selected_pid, picked_pid)) {
cluster_info, node_addr, max_load_disk.partitions, selected_pid, picked_pid)) {
move_info.pid = picked_pid;
move_info.source_node = max_load_node;
move_info.source_disk_tag = max_load_disk;
move_info.source_node = max_load_disk.node;
move_info.source_disk_tag = max_load_disk.disk_tag;
move_info.target_node = node_addr;
move_info.type = cluster_info.type == cluster_balance_type::COPY_SECONDARY
? balance_type::copy_secondary
: balance_type::copy_primary;
ddebug_f("partition[{}] will migrate from {} to {}",
picked_pid,
max_load_node.to_string(),
max_load_disk.node.to_string(),
node_addr.to_string());
return true;
}
}
ddebug_f("can not find a partition(app_id={}) from random max load disk(node={}, disk={})",
app_id,
max_load_disk.node.to_string(),
max_load_disk.disk_tag);
return false;
}

// NOTE(review): this span is a rendered diff without +/- markers, so two
// versions of the function are interleaved. The first signature below
// (get_max_load_disk, with the picked_node / picked_disk / target_partitions
// out-parameters) and the body lines that use those names or max_load_size
// appear to belong to the replaced implementation; the names are not declared
// by the get_max_load_disk_set signature.
void greedy_load_balancer::get_max_load_disk(const cluster_migration_info &cluster_info,
const std::set<rpc_address> &max_nodes,
const int32_t app_id,
/*out*/ rpc_address &picked_node,
/*out*/ std::string &picked_disk,
/*out*/ partition_set &target_partitions)
// Collects every (node, disk) entry of app `app_id` on the nodes in `max_nodes`
// whose partition count equals the largest count found, and inserts them into
// `max_load_disk_set`.
//
// cluster_info:      forwarded unchanged to get_disk_partitions_map().
// max_nodes:         the nodes whose disks are examined.
// app_id:            stored in each app_disk_info and forwarded to get_disk_partitions_map().
// max_load_disk_set: output; entries are inserted, the set is not cleared first.
void greedy_load_balancer::get_max_load_disk_set(const cluster_migration_info &cluster_info,
const std::set<rpc_address> &max_nodes,
const int32_t app_id,
/*out*/ std::set<app_disk_info> &max_load_disk_set)
{
// (appears to be old implementation) running maximum of the partition count
int32_t max_load_size = 0;
// key: partition count (app_disk_info.partitions.size())
// value: app_disk_info structure
// The multimap is ordered by key, so the entries with the highest partition
// count are the ones sharing the last key.
std::multimap<uint32_t, app_disk_info> app_disk_info_multimap;
for (const auto &node_addr : max_nodes) {
// key: disk_tag
// value: partition set for app(app id=app_id) in node(addr=node_addr)
std::map<std::string, partition_set> disk_partitions =
get_disk_partitions_map(cluster_info, node_addr, app_id);
for (const auto &kv : disk_partitions) {
// (appears to be old implementation) keep only the first strictly larger disk
if (kv.second.size() > max_load_size) {
picked_node = node_addr;
picked_disk = kv.first;
target_partitions = kv.second;
max_load_size = kv.second.size();
}
// Record this disk, keyed by how many partitions it holds.
app_disk_info info;
info.app_id = app_id;
info.node = node_addr;
info.disk_tag = kv.first;
info.partitions = kv.second;
app_disk_info_multimap.insert(
std::pair<uint32_t, app_disk_info>(kv.second.size(), info));
}
}
// (appears to be old implementation) log the single picked disk
ddebug_f("most load is node({}), disk_tag({}), target partition count = {}",
picked_node.to_string(),
picked_disk,
target_partitions.size());
// NOTE(review): rbegin() is dereferenced without an emptiness check. If
// max_nodes is empty, or get_disk_partitions_map() returns an empty map for
// every node, the multimap is empty and this is undefined behavior -- consider
// returning early when app_disk_info_multimap.empty().
// equal_range on the largest key yields every disk tied for the maximum count.
auto range = app_disk_info_multimap.equal_range(app_disk_info_multimap.rbegin()->first);
for (auto iter = range.first; iter != range.second; ++iter) {
max_load_disk_set.insert(iter->second);
}
}

std::map<std::string, partition_set> greedy_load_balancer::get_disk_partitions_map(
Expand Down
31 changes: 24 additions & 7 deletions src/meta/greedy_load_balancer.h
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,25 @@ class greedy_load_balancer : public server_load_balancer
std::map<rpc_address, uint32_t> replicas_count;
};

// One disk of one node, together with a set of partitions associated with it
// for a given app.
//
// Identity and ordering are defined by the triple (app_id, node, disk_tag);
// `partitions` takes no part in either comparison.
struct app_disk_info
{
    int32_t app_id;
    rpc_address node;
    std::string disk_tag;
    partition_set partitions;

    // Equal when app_id, node and disk_tag all match.
    bool operator==(const app_disk_info &another) const
    {
        if (app_id != another.app_id) {
            return false;
        }
        if (!(node == another.node)) {
            return false;
        }
        return disk_tag == another.disk_tag;
    }

    // Lexicographic order on (app_id, node, disk_tag): the first field that
    // differs decides the result.
    bool operator<(const app_disk_info &another) const
    {
        if (app_id != another.app_id) {
            return app_id < another.app_id;
        }
        if (!(node == another.node)) {
            return node < another.node;
        }
        return disk_tag < another.disk_tag;
    }
};

struct move_info
{
gpid pid;
Expand Down Expand Up @@ -249,12 +268,10 @@ class greedy_load_balancer : public server_load_balancer
const partition_set &selected_pid,
/*out*/ move_info &move_info);

void get_max_load_disk(const cluster_migration_info &cluster_info,
const std::set<rpc_address> &max_nodes,
const int32_t app_id,
/*out*/ rpc_address &picked_node,
/*out*/ std::string &picked_disk,
/*out*/ partition_set &target_partitions);
void get_max_load_disk_set(const cluster_migration_info &cluster_info,
const std::set<rpc_address> &max_nodes,
const int32_t app_id,
/*out*/ std::set<app_disk_info> &max_load_disk_set);

std::map<std::string, partition_set> get_disk_partitions_map(
const cluster_migration_info &cluster_info, const rpc_address &addr, const int32_t app_id);
Expand Down Expand Up @@ -302,7 +319,7 @@ class greedy_load_balancer : public server_load_balancer
FRIEND_TEST(greedy_load_balancer, get_app_migration_info);
FRIEND_TEST(greedy_load_balancer, get_node_migration_info);
FRIEND_TEST(greedy_load_balancer, get_disk_partitions_map);
FRIEND_TEST(greedy_load_balancer, get_max_load_disk);
FRIEND_TEST(greedy_load_balancer, get_max_load_disk_set);
FRIEND_TEST(greedy_load_balancer, apply_move);
FRIEND_TEST(greedy_load_balancer, pick_up_partition);
};
Expand Down
41 changes: 27 additions & 14 deletions src/meta/test/greedy_load_balancer_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -224,41 +224,54 @@ TEST(greedy_load_balancer, get_disk_partitions_map)
ASSERT_EQ(disk_partitions[disk_tag].count(pid), 1);
}

// NOTE(review): this span is a rendered diff without +/- markers. The first
// TEST line, the repeated declarations (addr, node_info, balancer) and the
// picked_node / picked_disk / target_partitions assertions appear to come from
// the replaced get_max_load_disk test.
TEST(greedy_load_balancer, get_max_load_disk)
// Checks that get_max_load_disk_set returns every disk tied for the highest
// partition count: disk1 and disk2 on addr and disk3 on addr2 each hold one
// partition, so all three are expected in the result.
TEST(greedy_load_balancer, get_max_load_disk_set)
{
greedy_load_balancer::cluster_migration_info cluster_info;
cluster_info.type = cluster_balance_type::COPY_SECONDARY;

rpc_address addr(1, 10086);
int32_t app_id = 1;
rpc_address addr(1, 10086);
rpc_address addr2(2, 10086);
// Per-partition replica status: the first partition has a secondary on addr,
// the second has secondaries on both addr and addr2.
std::map<rpc_address, partition_status::type> partition;
partition[addr] = partition_status::PS_SECONDARY;
std::map<rpc_address, partition_status::type> partition2;
partition2[addr] = partition_status::PS_SECONDARY;
partition2[addr2] = partition_status::PS_SECONDARY;
greedy_load_balancer::app_migration_info app_info;
app_info.partitions.push_back(partition);
app_info.partitions.push_back(partition2);
cluster_info.apps_info[app_id] = app_info;

// Node addr: disk1 holds gpid(app_id, 0), disk2 holds gpid(app_id, 1).
greedy_load_balancer::node_migration_info node_info;
partition_set partitions;
gpid pid(app_id, 0);
partitions.insert(pid);
greedy_load_balancer::node_migration_info node_info;
std::string disk_tag = "disk1";
node_info.partitions[disk_tag] = partitions;
partition_set partitions2;
gpid pid2(app_id, 1);
partitions2.insert(pid2);
std::string disk_tag2 = "disk2";
node_info.partitions[disk_tag2] = partitions2;
cluster_info.nodes_info[addr] = node_info;

// Node addr2: disk3 holds gpid(app_id, 1), the same gpid value as pid2.
greedy_load_balancer::node_migration_info node_info2;
partition_set partitions3;
gpid pid3(app_id, 1);
partitions3.insert(pid3);
std::string disk_tag3 = "disk3";
node_info2.partitions[disk_tag3] = partitions3;
cluster_info.nodes_info[addr2] = node_info2;

greedy_load_balancer balancer(nullptr);
std::set<rpc_address> max_nodes;
max_nodes.insert(addr);
max_nodes.insert(addr2);

greedy_load_balancer balancer(nullptr);
rpc_address picked_node;
std::string picked_disk;
partition_set target_partitions;
balancer.get_max_load_disk(
cluster_info, max_nodes, app_id, picked_node, picked_disk, target_partitions);

ASSERT_EQ(picked_node, addr);
ASSERT_EQ(picked_disk, disk_tag);
ASSERT_EQ(target_partitions.size(), 1);
ASSERT_EQ(target_partitions.count(pid), 1);
std::set<greedy_load_balancer::app_disk_info> max_load_disk_set;
balancer.get_max_load_disk_set(cluster_info, max_nodes, app_id, max_load_disk_set);

// Only the number of tied disks is checked, not which entries were returned.
ASSERT_EQ(max_load_disk_set.size(), 3);
}

TEST(greedy_load_balancer, apply_move)
Expand Down

0 comments on commit 4cce780

Please sign in to comment.