Skip to content
This repository has been archived by the owner on Jun 23, 2022. It is now read-only.

Commit

Permalink
feat: implement greedy_load_balancer::apply_move (#882)
Browse files Browse the repository at this point in the history
  • Loading branch information
levy5307 authored Aug 26, 2021
1 parent f5ecd5e commit 384b19a
Show file tree
Hide file tree
Showing 3 changed files with 155 additions and 2 deletions.
69 changes: 67 additions & 2 deletions src/meta/greedy_load_balancer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
#include <dsn/utility/math.h>
#include <dsn/utility/utils.h>
#include <dsn/dist/fmt_logging.h>
#include <dsn/utility/fail_point.h>
#include "greedy_load_balancer.h"
#include "meta_data.h"
#include "meta_admin_types.h"
Expand Down Expand Up @@ -278,6 +279,8 @@ greedy_load_balancer::generate_balancer_request(const partition_configuration &p
const rpc_address &from,
const rpc_address &to)
{
FAIL_POINT_INJECT_F("generate_balancer_request", [](string_view name) { return nullptr; });

configuration_balancer_request result;
result.gpid = pc.pid;

Expand Down Expand Up @@ -1369,8 +1372,70 @@ bool greedy_load_balancer::apply_move(const move_info &move,
/*out*/ migration_list &list,
/*out*/ cluster_migration_info &cluster_info)
{
// TBD(zlw)
return false;
int32_t app_id = move.pid.get_app_id();
rpc_address source = move.source_node, target = move.target_node;
if (cluster_info.apps_skew.find(app_id) == cluster_info.apps_skew.end() ||
cluster_info.replicas_count.find(source) == cluster_info.replicas_count.end() ||
cluster_info.replicas_count.find(target) == cluster_info.replicas_count.end() ||
cluster_info.apps_info.find(app_id) == cluster_info.apps_info.end()) {
return false;
}

app_migration_info app_info = cluster_info.apps_info[app_id];
if (app_info.partitions.size() <= move.pid.get_partition_index() ||
app_info.replicas_count.find(source) == app_info.replicas_count.end() ||
app_info.replicas_count.find(target) == app_info.replicas_count.end()) {
return false;
}
app_info.replicas_count[source]--;
app_info.replicas_count[target]++;

auto &pmap = app_info.partitions[move.pid.get_partition_index()];
rpc_address primary_addr;
for (const auto &kv : pmap) {
if (kv.second == partition_status::PS_PRIMARY) {
primary_addr = kv.first;
}
}
auto status = cluster_info.type == cluster_balance_type::COPY_SECONDARY
? partition_status::PS_SECONDARY
: partition_status::PS_PRIMARY;
auto iter = pmap.find(source);
if (iter == pmap.end() || iter->second != status) {
return false;
}
pmap.erase(source);
pmap[target] = status;

auto iters = cluster_info.nodes_info.find(source);
auto itert = cluster_info.nodes_info.find(target);
if (iters == cluster_info.nodes_info.end() || itert == cluster_info.nodes_info.end()) {
return false;
}
node_migration_info node_source = iters->second;
node_migration_info node_target = itert->second;
auto it = node_source.partitions.find(move.source_disk_tag);
if (it == node_source.partitions.end()) {
return false;
}
it->second.erase(move.pid);
node_target.future_partitions.insert(move.pid);

// add into migration list and selected_pid
partition_configuration pc;
pc.pid = move.pid;
pc.primary = primary_addr;
list[move.pid] = generate_balancer_request(pc, move.type, source, target);
t_migration_result->emplace(move.pid, generate_balancer_request(pc, move.type, source, target));
selected_pids.insert(move.pid);

cluster_info.apps_skew[app_id] = get_skew(app_info.replicas_count);
cluster_info.apps_info[app_id] = app_info;
cluster_info.nodes_info[source] = node_source;
cluster_info.nodes_info[target] = node_target;
cluster_info.replicas_count[source]--;
cluster_info.replicas_count[target]++;
return true;
}

bool greedy_load_balancer::balance(meta_view view, migration_list &list)
Expand Down
1 change: 1 addition & 0 deletions src/meta/greedy_load_balancer.h
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,7 @@ class greedy_load_balancer : public simple_load_balancer
FRIEND_TEST(greedy_load_balancer, get_node_migration_info);
FRIEND_TEST(greedy_load_balancer, get_disk_partitions_map);
FRIEND_TEST(greedy_load_balancer, get_max_load_disk);
FRIEND_TEST(greedy_load_balancer, apply_move);
FRIEND_TEST(greedy_load_balancer, pick_up_partition);
};

Expand Down
87 changes: 87 additions & 0 deletions src/meta/test/greedy_load_balancer_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

#include <gtest/gtest.h>
#include <dsn/utility/defer.h>
#include <dsn/utility/fail_point.h>
#include "meta/greedy_load_balancer.h"

namespace dsn {
Expand Down Expand Up @@ -260,6 +261,92 @@ TEST(greedy_load_balancer, get_max_load_disk)
ASSERT_EQ(target_partitions.count(pid), 1);
}

TEST(greedy_load_balancer, apply_move)
{
struct greedy_load_balancer::move_info minfo;
int32_t app_id = 1;
int32_t partition_index = 1;
minfo.pid = gpid(app_id, partition_index);
rpc_address source_node(1, 10086);
minfo.source_node = source_node;
std::string disk_tag = "disk1";
minfo.source_disk_tag = disk_tag;
rpc_address target_node(2, 10086);
minfo.target_node = target_node;
minfo.type = greedy_load_balancer::balance_type::move_primary;

greedy_load_balancer balancer(nullptr);
greedy_load_balancer::cluster_migration_info cluster_info;
cluster_info.type = cluster_balance_type::COPY_SECONDARY;
partition_set selected_pids;
migration_list list;
balancer.t_migration_result = &list;

// target_node is not found in cluster_info.replicas_count
auto res = balancer.apply_move(minfo, selected_pids, list, cluster_info);
ASSERT_FALSE(res);

// source_node is not found in cluster_info.replicas_count
cluster_info.apps_skew[app_id] = 1;
res = balancer.apply_move(minfo, selected_pids, list, cluster_info);
ASSERT_FALSE(res);

// target_node is not found in cluster_info.replicas_count
cluster_info.replicas_count[source_node] = 1;
res = balancer.apply_move(minfo, selected_pids, list, cluster_info);
ASSERT_FALSE(res);

// app_id is not found in cluster_info.app_skew
cluster_info.replicas_count[target_node] = 1;
res = balancer.apply_move(minfo, selected_pids, list, cluster_info);
ASSERT_FALSE(res);

// source_node and target_node are not found in app_info
greedy_load_balancer::app_migration_info app_info;
cluster_info.apps_info[app_id] = app_info;
res = balancer.apply_move(minfo, selected_pids, list, cluster_info);
ASSERT_FALSE(res);

// app_info.partitions.size() < partition_index
app_info.replicas_count[target_node] = 1;
app_info.replicas_count[source_node] = 1;
cluster_info.apps_info[app_id] = app_info;
res = balancer.apply_move(minfo, selected_pids, list, cluster_info);
ASSERT_FALSE(res);

// all of the partition status are not PS_SECONDARY
std::map<rpc_address, partition_status::type> partition_status;
partition_status[source_node] = partition_status::type::PS_PRIMARY;
cluster_info.apps_info[app_id].partitions.push_back(partition_status);
cluster_info.apps_info[app_id].partitions.push_back(partition_status);
res = balancer.apply_move(minfo, selected_pids, list, cluster_info);
ASSERT_FALSE(res);

// target_node and source_node are not found in cluster_info.nodes_info
partition_status[source_node] = partition_status::type::PS_SECONDARY;
cluster_info.apps_info[app_id].partitions.clear();
cluster_info.apps_info[app_id].partitions.push_back(partition_status);
cluster_info.apps_info[app_id].partitions.push_back(partition_status);
res = balancer.apply_move(minfo, selected_pids, list, cluster_info);
ASSERT_FALSE(res);

// disk_tag is not found in node_info
greedy_load_balancer::node_migration_info target_info;
greedy_load_balancer::node_migration_info source_info;
cluster_info.nodes_info[target_node] = target_info;
cluster_info.nodes_info[source_node] = source_info;
res = balancer.apply_move(minfo, selected_pids, list, cluster_info);
ASSERT_FALSE(res);

fail::setup();
fail::cfg("generate_balancer_request", "return()");
partition_set source_partition_set;
cluster_info.nodes_info[source_node].partitions[disk_tag] = source_partition_set;
res = balancer.apply_move(minfo, selected_pids, list, cluster_info);
fail::teardown();
ASSERT_TRUE(res);
}

TEST(greedy_load_balancer, pick_up_partition)
{
greedy_load_balancer::cluster_migration_info cluster_info;
Expand Down

0 comments on commit 384b19a

Please sign in to comment.