Skip to content
This repository has been archived by the owner on Jun 23, 2022. It is now read-only.

feat: add implementation of get next move #872

Merged
merged 8 commits into from
Aug 6, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions include/dsn/utility/utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -108,5 +108,28 @@ bool hostname(const dsn::rpc_address &address, std::string *hostname_result);
// valid_ip_network_order -> return TRUE && hostname_result=hostname |
// invalid_ip_network_order -> return FALSE
bool hostname_from_ip(uint32_t ip, std::string *hostname_result);

// Inverts a map: each (key, value) of `source` becomes (value, key) in the
// returned multimap. A multimap is required because several keys may share
// the same value; entries are ordered by the original values.
//
// Note: iterating with the map's own value_type (pair<const A, B>) avoids the
// per-element temporary copy that a lambda taking `const std::pair<A, B>&`
// would force (pair<const A, B> does not bind directly to it).
template <typename A, typename B>
std::multimap<B, A> flip_map(const std::map<A, B> &source)
{
    std::multimap<B, A> target;
    for (const auto &kv : source) {
        target.emplace(kv.second, kv.first);
    }
    return target;
}

// Returns the set of elements present in both `set1` and `set2`.
template <typename T>
std::set<T> get_intersection(const std::set<T> &set1, const std::set<T> &set2)
{
    std::set<T> common;
    for (const auto &element : set1) {
        // std::set::count is O(log n); overall O(|set1| * log |set2|).
        if (set2.count(element) > 0) {
            common.insert(element);
        }
    }
    return common;
}
} // namespace utils
} // namespace dsn
173 changes: 158 additions & 15 deletions src/meta/greedy_load_balancer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
#include <queue>
#include <dsn/tool-api/command_manager.h>
#include <dsn/utility/math.h>
#include <dsn/utility/utils.h>
#include <dsn/dist/fmt_logging.h>
#include "greedy_load_balancer.h"
#include "meta_data.h"
Expand All @@ -39,18 +40,24 @@ namespace replication {
DSN_DEFINE_bool("meta_server", balance_cluster, false, "whether to enable cluster balancer");
DSN_TAG_VARIABLE(balance_cluster, FT_MUTABLE);

uint32_t get_count(const node_state &ns, cluster_balance_type type, int32_t app_id)
DSN_DEFINE_uint32("meta_server",
balance_op_count_per_round,
10,
"balance operation count per round for cluster balancer");
DSN_TAG_VARIABLE(balance_op_count_per_round, FT_MUTABLE);

uint32_t get_partition_count(const node_state &ns, cluster_balance_type type, int32_t app_id)
{
unsigned count = 0;
switch (type) {
case cluster_balance_type::Secondary:
case cluster_balance_type::COPY_SECONDARY:
if (app_id > 0) {
count = ns.partition_count(app_id) - ns.primary_count(app_id);
} else {
count = ns.partition_count() - ns.primary_count();
}
break;
case cluster_balance_type::Primary:
case cluster_balance_type::COPY_PRIMARY:
if (app_id > 0) {
count = ns.primary_count(app_id);
} else {
Expand All @@ -77,6 +84,23 @@ uint32_t get_skew(const std::map<rpc_address, uint32_t> &count_map)
return max - min;
}

void get_min_max_set(const std::map<rpc_address, uint32_t> &node_count_map,
/*out*/ std::set<rpc_address> &min_set,
/*out*/ std::set<rpc_address> &max_set)
{
std::multimap<uint32_t, rpc_address> count_multimap = utils::flip_map(node_count_map);

auto range = count_multimap.equal_range(count_multimap.begin()->first);
for (auto iter = range.first; iter != range.second; ++iter) {
min_set.insert(iter->second);
}

range = count_multimap.equal_range(count_multimap.rbegin()->first);
for (auto iter = range.first; iter != range.second; ++iter) {
max_set.insert(iter->second);
}
}

greedy_load_balancer::greedy_load_balancer(meta_service *_svc)
: simple_load_balancer(_svc),
_ctrl_balancer_ignored_apps(nullptr),
Expand Down Expand Up @@ -979,36 +1003,50 @@ void greedy_load_balancer::balance_cluster()
}
}

bool need_continue = cluster_replica_balance(t_global_view, cluster_balance_type::Secondary);
bool need_continue = cluster_replica_balance(
t_global_view, cluster_balance_type::COPY_SECONDARY, *t_migration_result);
if (!need_continue) {
return;
}

// TBD(zlw): copy primary
}

bool greedy_load_balancer::cluster_replica_balance(const meta_view *global_view,
const cluster_balance_type type)
const cluster_balance_type type,
/*out*/ migration_list &list)
{
bool enough_information = do_cluster_replica_balance(global_view, type);
bool enough_information = do_cluster_replica_balance(global_view, type, list);
if (!enough_information) {
return false;
}
if (!t_migration_result->empty()) {
ddebug_f(
"migration count of copy {} = {}", enum_to_string(type), t_migration_result->size());
if (!list.empty()) {
ddebug_f("migration count of {} = {}", enum_to_string(type), list.size());
return false;
}
return true;
}

bool greedy_load_balancer::do_cluster_replica_balance(const meta_view *global_view,
const cluster_balance_type type)
const cluster_balance_type type,
/*out*/ migration_list &list)
{
cluster_migration_info cluster_info;
if (!get_cluster_migration_info(global_view, type, cluster_info)) {
return false;
}

/// TBD(zlw)
partition_set selected_pid;
move_info next_move;
while (get_next_move(cluster_info, selected_pid, next_move)) {
if (!apply_move(next_move, selected_pid, list, cluster_info)) {
break;
}
if (list.size() >= FLAGS_balance_op_count_per_round) {
break;
}
}

return true;
}

Expand All @@ -1030,8 +1068,14 @@ bool greedy_load_balancer::get_cluster_migration_info(const meta_view *global_vi
app_mapper apps;
for (const auto &kv : all_apps) {
const std::shared_ptr<app_state> &app = kv.second;
if (is_ignored_app(app->app_id) || app->is_bulk_loading || app->splitting()) {
return false;
auto ignored = is_ignored_app(app->app_id);
if (ignored || app->is_bulk_loading || app->splitting()) {
ddebug_f("skip to balance app({}), ignored={}, bulk loading={}, splitting={}",
app->app_name,
ignored,
app->is_bulk_loading,
app->splitting());
continue;
foreverneverer marked this conversation as resolved.
Show resolved Hide resolved
}
if (app->status == app_status::AS_AVAILABLE) {
apps[app->app_id] = app;
Expand All @@ -1054,7 +1098,7 @@ bool greedy_load_balancer::get_cluster_migration_info(const meta_view *global_vi
get_node_migration_info(ns, apps, info);
cluster_info.nodes_info.emplace(kv.first, std::move(info));

auto count = get_count(ns, type, -1);
auto count = get_partition_count(ns, type, -1);
cluster_info.replicas_count[kv.first] = count;
}

Expand Down Expand Up @@ -1085,7 +1129,7 @@ bool greedy_load_balancer::get_app_migration_info(std::shared_ptr<app_state> app

for (const auto &it : nodes) {
const node_state &ns = it.second;
auto count = get_count(ns, type, app->app_id);
auto count = get_partition_count(ns, type, app->app_id);
info.replicas_count[ns.addr()] = count;
}

Expand Down Expand Up @@ -1116,6 +1160,105 @@ void greedy_load_balancer::get_node_migration_info(const node_state &ns,
}
}

bool greedy_load_balancer::get_next_move(const cluster_migration_info &cluster_info,
const partition_set &selected_pid,
/*out*/ move_info &next_move)
{
// key-app skew, value-app id
std::multimap<uint32_t, int32_t> app_skew_multimap = utils::flip_map(cluster_info.apps_skew);
levy5307 marked this conversation as resolved.
Show resolved Hide resolved
auto max_app_skew = app_skew_multimap.rbegin()->first;
if (max_app_skew == 0) {
ddebug_f("every app is balanced and any move will unbalance a app");
return false;
}

auto server_skew = get_skew(cluster_info.replicas_count);
if (max_app_skew <= 1 && server_skew <= 1) {
ddebug_f("every app is balanced and the cluster as a whole is balanced");
return false;
}

/**
* Among the apps with maximum skew, attempt to pick a app where there is
* a move that improves the app skew and the cluster skew, if possible. If
* not, attempt to pick a move that improves the app skew.
**/
std::set<rpc_address> cluster_min_count_nodes;
std::set<rpc_address> cluster_max_count_nodes;
get_min_max_set(cluster_info.replicas_count, cluster_min_count_nodes, cluster_max_count_nodes);

bool found = false;
auto app_range = app_skew_multimap.equal_range(max_app_skew);
for (auto iter = app_range.first; iter != app_range.second; ++iter) {
auto app_id = iter->second;
auto it = cluster_info.apps_info.find(app_id);
if (it == cluster_info.apps_info.end()) {
continue;
}
auto app_map = it->second.replicas_count;
std::set<rpc_address> app_min_count_nodes;
std::set<rpc_address> app_max_count_nodes;
get_min_max_set(app_map, app_min_count_nodes, app_max_count_nodes);

/**
* Compute the intersection of the replica servers most loaded for the app
* with the replica servers most loaded overall, and likewise for least loaded.
* These are our ideal candidates for moving from and to, respectively.
**/
std::set<rpc_address> app_cluster_min_set =
utils::get_intersection(app_min_count_nodes, cluster_min_count_nodes);
std::set<rpc_address> app_cluster_max_set =
utils::get_intersection(app_max_count_nodes, cluster_max_count_nodes);

/**
* Do not move replicas of a balanced app if the least (most) loaded
* servers overall do not intersect the servers hosting the least (most)
* replicas of the app. Moving a replica in that case might keep the
* cluster skew the same or make it worse while keeping the app balanced.
**/
std::multimap<uint32_t, rpc_address> app_count_multimap = utils::flip_map(app_map);
if (app_count_multimap.rbegin()->first <= app_count_multimap.begin()->first + 1 &&
(app_cluster_min_set.empty() || app_cluster_max_set.empty())) {
ddebug_f(
"do not move replicas of a balanced app if the least (most) loaded servers overall "
"do not intersect the servers hosting the least (most) replicas of the app");
continue;
}

if (pick_up_move(cluster_info,
app_cluster_max_set.empty() ? app_max_count_nodes : app_cluster_max_set,
app_cluster_min_set.empty() ? app_min_count_nodes : app_cluster_min_set,
app_id,
selected_pid,
next_move)) {
found = true;
break;
}
}

return found;
}

// Intended to pick one concrete replica move for app `app_id` from one of the
// most loaded nodes (`max_nodes`) to one of the least loaded (`min_nodes`),
// presumably skipping partitions already chosen in `selected_pid` — see the
// caller, get_next_move. Currently an unimplemented stub: always returns
// false, so no move is ever produced.
bool greedy_load_balancer::pick_up_move(const cluster_migration_info &cluster_info,
                                        const std::set<rpc_address> &max_nodes,
                                        const std::set<rpc_address> &min_nodes,
                                        const int32_t app_id,
                                        const partition_set &selected_pid,
                                        /*out*/ move_info &move_info)
{
    // TBD(zlw)
    return false;
}

// Intended to commit `move` into the outgoing migration `list`, record its
// partition in `selected_pids`, and update `cluster_info` so later iterations
// see the post-move state — see caller do_cluster_replica_balance. Currently
// an unimplemented stub: always returns false, which makes the caller's loop
// stop after the first picked move.
bool greedy_load_balancer::apply_move(const move_info &move,
                                      /*out*/ partition_set &selected_pids,
                                      /*out*/ migration_list &list,
                                      /*out*/ cluster_migration_info &cluster_info)
{
    // TBD(zlw)
    return false;
}

bool greedy_load_balancer::balance(meta_view view, migration_list &list)
{
ddebug("balancer round");
Expand Down
57 changes: 44 additions & 13 deletions src/meta/greedy_load_balancer.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,17 +41,20 @@ namespace dsn {
namespace replication {
enum class cluster_balance_type
{
Primary = 0,
Secondary,
Invalid,
COPY_PRIMARY = 0,
COPY_SECONDARY,
INVALID,
};
ENUM_BEGIN(cluster_balance_type, cluster_balance_type::Invalid)
ENUM_REG(cluster_balance_type::Primary)
ENUM_REG(cluster_balance_type::Secondary)
ENUM_BEGIN(cluster_balance_type, cluster_balance_type::INVALID)
ENUM_REG(cluster_balance_type::COPY_PRIMARY)
ENUM_REG(cluster_balance_type::COPY_SECONDARY)
ENUM_END(cluster_balance_type)

uint32_t get_count(const node_state &ns, cluster_balance_type type, int32_t app_id);
uint32_t get_partition_count(const node_state &ns, cluster_balance_type type, int32_t app_id);
uint32_t get_skew(const std::map<rpc_address, uint32_t> &count_map);
void get_min_max_set(const std::map<rpc_address, uint32_t> &node_count_map,
/*out*/ std::set<rpc_address> &min_set,
/*out*/ std::set<rpc_address> &max_set);

class greedy_load_balancer : public simple_load_balancer
{
Expand Down Expand Up @@ -153,9 +156,13 @@ class greedy_load_balancer : public simple_load_balancer

void balance_cluster();

bool cluster_replica_balance(const meta_view *global_view, const cluster_balance_type type);
bool cluster_replica_balance(const meta_view *global_view,
const cluster_balance_type type,
/*out*/ migration_list &list);

bool do_cluster_replica_balance(const meta_view *global_view, const cluster_balance_type type);
bool do_cluster_replica_balance(const meta_view *global_view,
const cluster_balance_type type,
/*out*/ migration_list &list);

struct app_migration_info
{
Expand Down Expand Up @@ -187,13 +194,12 @@ class greedy_load_balancer : public simple_load_balancer
struct node_migration_info
{
rpc_address address;
// key-disk tag, value-partition set
std::map<std::string, partition_set> partitions;
partition_set future_partitions;
bool operator<(const node_migration_info &another) const
{
if (address < another.address)
return true;
return false;
return address < another.address;
}
bool operator==(const node_migration_info &another) const
{
Expand All @@ -210,6 +216,15 @@ class greedy_load_balancer : public simple_load_balancer
std::map<rpc_address, uint32_t> replicas_count;
};

// One candidate replica movement produced by the cluster balancer
// (filled in by pick_up_move, consumed by apply_move).
struct move_info
{
    gpid pid;                    // partition to move
    rpc_address source_node;     // node currently hosting the replica
    std::string source_disk_tag; // disk tag of the replica on the source node
    rpc_address target_node;     // node the replica should move to
    balance_type type;           // kind of balance operation for this move
};

bool get_cluster_migration_info(const meta_view *global_view,
const cluster_balance_type type,
/*out*/ cluster_migration_info &cluster_info);
Expand All @@ -223,6 +238,22 @@ class greedy_load_balancer : public simple_load_balancer
const app_mapper &all_apps,
/*out*/ node_migration_info &info);

bool get_next_move(const cluster_migration_info &cluster_info,
const partition_set &selected_pid,
/*out*/ move_info &next_move);

bool pick_up_move(const cluster_migration_info &cluster_info,
const std::set<rpc_address> &max_nodes,
const std::set<rpc_address> &min_nodes,
const int32_t app_id,
const partition_set &selected_pid,
/*out*/ move_info &move_info);

bool apply_move(const move_info &move,
/*out*/ partition_set &selected_pids,
/*out*/ migration_list &list,
/*out*/ cluster_migration_info &cluster_info);

bool all_replica_infos_collected(const node_state &ns);
// using t_global_view to get disk_tag of node's pid
const std::string &get_disk_tag(const dsn::rpc_address &node, const dsn::gpid &pid);
Expand Down Expand Up @@ -251,7 +282,7 @@ class greedy_load_balancer : public simple_load_balancer
FRIEND_TEST(greedy_load_balancer, app_migration_info);
FRIEND_TEST(greedy_load_balancer, node_migration_info);
FRIEND_TEST(greedy_load_balancer, get_skew);
FRIEND_TEST(greedy_load_balancer, get_count);
FRIEND_TEST(greedy_load_balancer, get_partition_count);
FRIEND_TEST(greedy_load_balancer, get_app_migration_info);
FRIEND_TEST(greedy_load_balancer, get_node_migration_info);
};
Expand Down
Loading