tmp
GehaFearless committed Jun 25, 2023
1 parent 1a7dc17 commit 8773a6f
Showing 14 changed files with 173 additions and 159 deletions.
3 changes: 3 additions & 0 deletions idl/meta_admin.thrift
@@ -76,6 +76,7 @@ struct configuration_update_request
     // the `meta_split_status` will be set
     // only used when on_config_sync
     6:optional metadata.split_status meta_split_status;
+    7:dsn.host_port hp_node;
 }

 // meta server (config mgr) => primary | secondary (downgrade) (w/ new config)
@@ -103,6 +104,7 @@ struct configuration_query_by_node_request
     1:dsn.rpc_address node;
     2:optional list<metadata.replica_info> stored_replicas;
     3:optional replica_server_info info;
+    4:dsn.host_port hp_node;
 }

 struct configuration_query_by_node_response
@@ -280,6 +282,7 @@ struct node_info
 {
     1:node_status status = node_status.NS_INVALID;
     2:dsn.rpc_address address;
+    3:dsn.host_port hp_address;
 }

 struct configuration_list_nodes_request
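Every rpc_address field above gains a host_port twin with an hp_ prefix, so both address forms can travel in the same message while a cluster is mid-upgrade. A minimal sender-side sketch of that dual-write pattern, assuming only the host_port(rpc_address) conversion constructor that this commit itself uses in partition_resolver_simple.cpp:

// Sketch only: fill both the legacy field and its new host_port twin.
configuration_query_by_node_request build_request(const dsn::rpc_address &my_addr)
{
    configuration_query_by_node_request req;
    req.node = my_addr;                     // legacy field, still populated
    req.hp_node = dsn::host_port(my_addr);  // new twin for upgraded peers
    return req;
}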
10 changes: 7 additions & 3 deletions src/client/partition_resolver_simple.cpp
@@ -413,13 +413,17 @@ void partition_resolver_simple::handle_pending_requests(std::deque<request_conte
 /*search in cache*/
 host_port partition_resolver_simple::get_host_port(const partition_configuration &config) const
 {
+    auto pc = config;
+    FILL_HP_OPTIONAL_SECTION(pc, primary);
+    FILL_HP_LIST_OPTIONAL_SECTION(pc, last_drops);
+
     if (_app_is_stateful) {
-        return host_port(config.primary);
+        return pc.hp_primary;
     } else {
-        if (config.last_drops.size() == 0) {
+        if (pc.last_drops.size() == 0) {
             return host_port();
         } else {
-            return host_port(config.last_drops[rand::next_u32(0, config.last_drops.size() - 1)]);
+            return pc.hp_last_drops[rand::next_u32(0, pc.last_drops.size() - 1)];
         }
     }
 }
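FILL_HP_OPTIONAL_SECTION and FILL_HP_LIST_OPTIONAL_SECTION are not defined anywhere in this diff. A plausible reading, sketched below strictly as an assumption, is that they backfill the hp_ twin from the legacy field whenever the sender did not set it; that would also explain why get_host_port() now copies the configuration before use, since the macros mutate their argument in place:

// Assumed semantics only; not the project's actual macro definitions.
// Backfill the host_port twin of a single rpc_address field.
#define FILL_HP_OPTIONAL_SECTION(obj, field)                                   \
    do {                                                                       \
        if (!(obj).__isset.hp_##field) {                                       \
            (obj).hp_##field = host_port((obj).field);                         \
            (obj).__isset.hp_##field = true;                                   \
        }                                                                      \
    } while (0)

// Assumed list variant: convert every rpc_address in a list field.
#define FILL_HP_LIST_OPTIONAL_SECTION(obj, field)                              \
    do {                                                                       \
        if (!(obj).__isset.hp_##field) {                                       \
            (obj).hp_##field.clear();                                          \
            for (const auto &addr : (obj).field) {                             \
                (obj).hp_##field.push_back(host_port(addr));                   \
            }                                                                  \
            (obj).__isset.hp_##field = true;                                   \
        }                                                                      \
    } while (0)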
5 changes: 3 additions & 2 deletions src/client/replication_ddl_client.cpp
@@ -492,8 +492,9 @@ dsn::error_code replication_ddl_client::list_nodes(
         return resp.err;
     }

-    for (const dsn::replication::node_info &n : resp.infos) {
-        nodes[host_port(n.address)] = n.status;
+    for (dsn::replication::node_info &n : resp.infos) {
+        FILL_HP_OPTIONAL_SECTION(n, address);
+        nodes[n.hp_address] = n.status;
     }

     return dsn::ERR_OK;
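The loop variable also drops its const qualifier because the fill macro mutates each node_info in place before hp_address is read. A rough caller-side sketch; the exact list_nodes() signature and the status filter are assumed from this hunk, not confirmed by the diff:

// Sketch only: node status is now keyed by host_port.
std::map<dsn::host_port, dsn::replication::node_status::type> nodes;
auto err = ddl_client->list_nodes(dsn::replication::node_status::NS_INVALID, nodes);
if (err == dsn::ERR_OK) {
    for (const auto &kv : nodes) {
        LOG_INFO("node {} found", kv.first.to_string());
    }
}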
4 changes: 2 additions & 2 deletions src/meta/app_balance_policy.cpp
@@ -164,7 +164,7 @@ int copy_secondary_operation::get_partition_count(const node_state &ns) const
 bool copy_secondary_operation::can_select(gpid pid, migration_list *result)
 {
     int id_max = *_ordered_address_ids.rbegin();
-    const node_state &max_ns = _nodes.at(_address_vec[id_max]);
+    const node_state &max_ns = _nodes.at(host_port(_address_vec[id_max]));
     if (max_ns.served_as(pid) == partition_status::PS_PRIMARY) {
         LOG_DEBUG("{}: skip gpid({}.{}) coz it is primary",
                   _app->get_logname(),
@@ -183,7 +183,7 @@ bool copy_secondary_operation::can_select(gpid pid, migration_list *result)
     }

     int id_min = *_ordered_address_ids.begin();
-    const node_state &min_ns = _nodes.at(_address_vec[id_min]);
+    const node_state &min_ns = _nodes.at(host_port(_address_vec[id_min]));
     if (min_ns.served_as(pid) != partition_status::PS_INACTIVE) {
         LOG_DEBUG("{}: skip gpid({}.{}) coz it is already a member on the target node",
                   _app->get_logname(),
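The only change in these two hunks is wrapping each _address_vec entry in host_port() before the lookup, which implies _nodes is now keyed by host_port while _ordered_address_ids and _address_vec still speak rpc_address. The assumed member shapes (they are not shown in this diff):

// Assumed shapes behind this hunk:
std::vector<dsn::rpc_address> _address_vec;   // index -> legacy address
std::map<dsn::host_port, node_state> _nodes;  // keyed by host_port after this commit

// Hence every indexed access converts at the boundary:
const node_state &ns = _nodes.at(host_port(_address_vec[id_max]));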
78 changes: 39 additions & 39 deletions src/meta/cluster_balance_policy.cpp
@@ -64,7 +64,7 @@ uint32_t get_partition_count(const node_state &ns, balance_type type, int32_t ap
     return (uint32_t)count;
 }

-uint32_t get_skew(const std::map<rpc_address, uint32_t> &count_map)
+uint32_t get_skew(const std::map<host_port, uint32_t> &count_map)
 {
     uint32_t min = UINT_MAX, max = 0;
     for (const auto &kv : count_map) {
@@ -78,11 +78,11 @@ uint32_t get_skew(const std::map<rpc_address, uint32_t> &count_map)
     return max - min;
 }
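Apart from the key type, get_skew() is unchanged: it returns the spread between the most and least loaded nodes. A toy check, assuming a host_port(host, port) constructor and the project's CHECK_EQ assertion:

// get_skew() over three nodes holding 3, 5 and 9 replicas.
std::map<host_port, uint32_t> counts = {{host_port("replica1", 34801), 3},
                                        {host_port("replica2", 34801), 5},
                                        {host_port("replica3", 34801), 9}};
CHECK_EQ(get_skew(counts), 6); // max(9) - min(3)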

-void get_min_max_set(const std::map<rpc_address, uint32_t> &node_count_map,
-                     /*out*/ std::set<rpc_address> &min_set,
-                     /*out*/ std::set<rpc_address> &max_set)
+void get_min_max_set(const std::map<host_port, uint32_t> &node_count_map,
+                     /*out*/ std::set<host_port> &min_set,
+                     /*out*/ std::set<host_port> &max_set)
 {
-    std::multimap<uint32_t, rpc_address> count_multimap = utils::flip_map(node_count_map);
+    std::multimap<uint32_t, host_port> count_multimap = utils::flip_map(node_count_map);

     auto range = count_multimap.equal_range(count_multimap.begin()->first);
     for (auto iter = range.first; iter != range.second; ++iter) {
@@ -222,22 +222,22 @@ bool cluster_balance_policy::get_app_migration_info(std::shared_ptr<app_state> a
     info.app_name = app->app_name;
     info.partitions.resize(app->partitions.size());
     for (auto i = 0; i < app->partitions.size(); ++i) {
-        std::map<rpc_address, partition_status::type> pstatus_map;
-        pstatus_map[app->partitions[i].primary] = partition_status::PS_PRIMARY;
+        std::map<host_port, partition_status::type> pstatus_map;
+        pstatus_map[app->partitions[i].hp_primary] = partition_status::PS_PRIMARY;
         if (app->partitions[i].secondaries.size() != app->partitions[i].max_replica_count - 1) {
             // partition is unhealthy
             return false;
         }
-        for (const auto &addr : app->partitions[i].secondaries) {
-            pstatus_map[addr] = partition_status::PS_SECONDARY;
+        for (const auto &hp : app->partitions[i].hp_secondaries) {
+            pstatus_map[hp] = partition_status::PS_SECONDARY;
         }
         info.partitions[i] = pstatus_map;
     }

     for (const auto &it : nodes) {
         const node_state &ns = it.second;
         auto count = get_partition_count(ns, type, app->app_id);
-        info.replicas_count[ns.addr()] = count;
+        info.replicas_count[ns.host_port()] = count;
     }

     return true;
@@ -247,12 +247,12 @@ void cluster_balance_policy::get_node_migration_info(const node_state &ns,
                                                      const app_mapper &apps,
                                                      /*out*/ node_migration_info &info)
 {
-    info.address = ns.addr();
+    info.hp = ns.host_port();
     for (const auto &iter : apps) {
         std::shared_ptr<app_state> app = iter.second;
         for (const auto &context : app->helpers->contexts) {
             std::string disk_tag;
-            if (!context.get_disk_tag(ns.addr(), disk_tag)) {
+            if (!context.get_disk_tag(ns.host_port(), disk_tag)) {
                 continue;
             }
             auto pid = context.config_owner->pid;
@@ -290,8 +290,8 @@ bool cluster_balance_policy::get_next_move(const cluster_migration_info &cluster
      * a move that improves the app skew and the cluster skew, if possible. If
      * not, attempt to pick a move that improves the app skew.
      **/
-    std::set<rpc_address> cluster_min_count_nodes;
-    std::set<rpc_address> cluster_max_count_nodes;
+    std::set<host_port> cluster_min_count_nodes;
+    std::set<host_port> cluster_max_count_nodes;
     get_min_max_set(cluster_info.replicas_count, cluster_min_count_nodes, cluster_max_count_nodes);

     bool found = false;
@@ -303,18 +303,18 @@ bool cluster_balance_policy::get_next_move(const cluster_migration_info &cluster
             continue;
         }
         auto app_map = it->second.replicas_count;
-        std::set<rpc_address> app_min_count_nodes;
-        std::set<rpc_address> app_max_count_nodes;
+        std::set<host_port> app_min_count_nodes;
+        std::set<host_port> app_max_count_nodes;
         get_min_max_set(app_map, app_min_count_nodes, app_max_count_nodes);

         /**
          * Compute the intersection of the replica servers most loaded for the app
          * with the replica servers most loaded overall, and likewise for least loaded.
          * These are our ideal candidates for moving from and to, respectively.
          **/
-        std::set<rpc_address> app_cluster_min_set =
+        std::set<host_port> app_cluster_min_set =
             utils::get_intersection(app_min_count_nodes, cluster_min_count_nodes);
-        std::set<rpc_address> app_cluster_max_set =
+        std::set<host_port> app_cluster_max_set =
             utils::get_intersection(app_max_count_nodes, cluster_max_count_nodes);

         /**
@@ -323,7 +323,7 @@ bool cluster_balance_policy::get_next_move(const cluster_migration_info &cluster
          * replicas of the app. Moving a replica in that case might keep the
          * cluster skew the same or make it worse while keeping the app balanced.
          **/
-        std::multimap<uint32_t, rpc_address> app_count_multimap = utils::flip_map(app_map);
+        std::multimap<uint32_t, host_port> app_count_multimap = utils::flip_map(app_map);
         if (app_count_multimap.rbegin()->first <= app_count_multimap.begin()->first + 1 &&
             (app_cluster_min_set.empty() || app_cluster_max_set.empty())) {
             LOG_INFO("do not move replicas of a balanced app({}) if the least (most) loaded "
@@ -356,8 +356,8 @@ auto select_random(const S &s, size_t n)
 }

 bool cluster_balance_policy::pick_up_move(const cluster_migration_info &cluster_info,
-                                          const std::set<rpc_address> &max_nodes,
-                                          const std::set<rpc_address> &min_nodes,
+                                          const std::set<host_port> &max_nodes,
+                                          const std::set<host_port> &min_nodes,
                                           const int32_t app_id,
                                           const partition_set &selected_pid,
                                           /*out*/ move_info &move_info)
@@ -373,19 +373,19 @@ bool cluster_balance_policy::pick_up_move(const cluster_migration_info &cluster_
              max_load_disk.node.to_string(),
              max_load_disk.disk_tag,
              max_load_disk.partitions.size());
-    for (const auto &node_addr : min_nodes) {
+    for (const auto &node_hp : min_nodes) {
         gpid picked_pid;
         if (pick_up_partition(
-                cluster_info, node_addr, max_load_disk.partitions, selected_pid, picked_pid)) {
+                cluster_info, node_hp, max_load_disk.partitions, selected_pid, picked_pid)) {
             move_info.pid = picked_pid;
             move_info.source_node = max_load_disk.node;
             move_info.source_disk_tag = max_load_disk.disk_tag;
-            move_info.target_node = node_addr;
+            move_info.target_node = node_hp;
             move_info.type = cluster_info.type;
             LOG_INFO("partition[{}] will migrate from {} to {}",
                      picked_pid,
                      max_load_disk.node.to_string(),
-                     node_addr.to_string());
+                     node_hp.to_string());
             return true;
         }
     }
@@ -398,22 +398,22 @@ bool cluster_balance_policy::pick_up_move(const cluster_migration_info &cluster_

 void cluster_balance_policy::get_max_load_disk_set(
     const cluster_migration_info &cluster_info,
-    const std::set<rpc_address> &max_nodes,
+    const std::set<host_port> &max_nodes,
     const int32_t app_id,
     /*out*/ std::set<app_disk_info> &max_load_disk_set)
 {
     // key: partition count (app_disk_info.partitions.size())
     // value: app_disk_info structure
     std::multimap<uint32_t, app_disk_info> app_disk_info_multimap;
-    for (const auto &node_addr : max_nodes) {
+    for (const auto &node_hp : max_nodes) {
         // key: disk_tag
-        // value: partition set for app(app id=app_id) in node(addr=node_addr)
+        // value: partition set for app(app id=app_id) in node(hp=node_hp)
         std::map<std::string, partition_set> disk_partitions =
-            get_disk_partitions_map(cluster_info, node_addr, app_id);
+            get_disk_partitions_map(cluster_info, node_hp, app_id);
         for (const auto &kv : disk_partitions) {
             app_disk_info info;
             info.app_id = app_id;
-            info.node = node_addr;
+            info.node = node_hp;
             info.disk_tag = kv.first;
             info.partitions = kv.second;
             app_disk_info_multimap.insert(
@@ -427,11 +427,11 @@ void cluster_balance_policy::get_max_load_disk_set(
 }

 std::map<std::string, partition_set> cluster_balance_policy::get_disk_partitions_map(
-    const cluster_migration_info &cluster_info, const rpc_address &addr, const int32_t app_id)
+    const cluster_migration_info &cluster_info, const host_port &hp, const int32_t app_id)
 {
     std::map<std::string, partition_set> disk_partitions;
     auto app_iter = cluster_info.apps_info.find(app_id);
-    auto node_iter = cluster_info.nodes_info.find(addr);
+    auto node_iter = cluster_info.nodes_info.find(hp);
     if (app_iter == cluster_info.apps_info.end() || node_iter == cluster_info.nodes_info.end()) {
         return disk_partitions;
     }
@@ -447,7 +447,7 @@ std::map<std::string, partition_set> cluster_balance_policy::get_disk_partitions
                 continue;
             }
             auto status_map = app_partition[pid.get_partition_index()];
-            auto iter = status_map.find(addr);
+            auto iter = status_map.find(hp);
             if (iter != status_map.end() && iter->second == status) {
                 disk_partitions[disk_tag].insert(pid);
             }
@@ -457,7 +457,7 @@ std::map<std::string, partition_set> cluster_balance_policy::get_disk_partitions
 }

 bool cluster_balance_policy::pick_up_partition(const cluster_migration_info &cluster_info,
-                                               const rpc_address &min_node_addr,
+                                               const host_port &min_node_hp,
                                                const partition_set &max_load_partitions,
                                                const partition_set &selected_pid,
                                                /*out*/ gpid &picked_pid)
@@ -476,7 +476,7 @@ bool cluster_balance_policy::pick_up_partition(const cluster_migration_info &clu

         // partition has already been primary or secondary on min_node
         app_migration_info info = iter->second;
-        if (info.get_partition_status(pid.get_partition_index(), min_node_addr) !=
+        if (info.get_partition_status(pid.get_partition_index(), min_node_hp) !=
             partition_status::PS_INACTIVE) {
             continue;
         }
@@ -494,7 +494,7 @@ bool cluster_balance_policy::apply_move(const move_info &move,
                                         /*out*/ cluster_migration_info &cluster_info)
 {
     int32_t app_id = move.pid.get_app_id();
-    rpc_address source = move.source_node, target = move.target_node;
+    host_port source = move.source_node, target = move.target_node;
     if (cluster_info.apps_skew.find(app_id) == cluster_info.apps_skew.end() ||
         cluster_info.replicas_count.find(source) == cluster_info.replicas_count.end() ||
         cluster_info.replicas_count.find(target) == cluster_info.replicas_count.end() ||
@@ -512,10 +512,10 @@ bool cluster_balance_policy::apply_move(const move_info &move,
     app_info.replicas_count[target]++;

     auto &pmap = app_info.partitions[move.pid.get_partition_index()];
-    rpc_address primary_addr;
+    host_port primary_hp;
     for (const auto &kv : pmap) {
         if (kv.second == partition_status::PS_PRIMARY) {
-            primary_addr = kv.first;
+            primary_hp = kv.first;
         }
     }
     auto status = cluster_info.type == balance_type::COPY_SECONDARY ? partition_status::PS_SECONDARY
@@ -544,7 +544,7 @@ bool cluster_balance_policy::apply_move(const move_info &move,
     // add into migration list and selected_pid
     partition_configuration pc;
     pc.pid = move.pid;
-    pc.primary = primary_addr;
+    pc.hp_primary = primary_hp;
     list[move.pid] = generate_balancer_request(*_global_view->apps, pc, move.type, source, target);
     _migration_result->emplace(
         move.pid, generate_balancer_request(*_global_view->apps, pc, move.type, source, target));
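Both get_min_max_set() and get_next_move() lean on utils::flip_map, which this diff does not show. The assumed shape is a plain key/value inversion, so that equal_range over the lowest (highest) count yields the min (max) node set:

// Assumed shape of utils::flip_map (not part of this diff): invert a
// map<K, V> into a multimap<V, K> so nodes group by replica count.
template <typename K, typename V>
std::multimap<V, K> flip_map(const std::map<K, V> &src)
{
    std::multimap<V, K> dst;
    for (const auto &kv : src) {
        dst.emplace(kv.second, kv.first);
    }
    return dst;
}

With that inversion, dst.begin() holds the least loaded nodes and dst.rbegin() the most loaded, which is exactly how get_next_move() decides whether an app is already balanced.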
(The remaining 9 of the 14 changed files are not rendered in this view.)
