
refactor: move on_missing_secondary from simple_load_balancer to partition_guardian #886

Merged: 1 commit, merged on Aug 27, 2021
159 changes: 157 additions & 2 deletions src/meta/partition_guardian.cpp
@@ -23,6 +23,17 @@ namespace dsn {
namespace replication {
partition_guardian::partition_guardian(meta_service *svc) : _svc(svc)
{
    if (svc != nullptr) {
        _mutation_2pc_min_replica_count = svc->get_options().mutation_2pc_min_replica_count;
        _replica_assign_delay_ms_for_dropouts =
            svc->get_meta_options()._lb_opts.replica_assign_delay_ms_for_dropouts;
        config_context::MAX_REPLICA_COUNT_IN_GRROUP =
            svc->get_meta_options()._lb_opts.max_replicas_in_group;
    } else {
        _mutation_2pc_min_replica_count = 0;
        _replica_assign_delay_ms_for_dropouts = 0;
    }
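    // Added note: the null branch presumably exists so that a partition_guardian
    // can be constructed without a live meta_service (e.g. in unit tests); the
    // counter below is initialized either way.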

    _recent_choose_primary_fail_count.init_app_counter(
        "eon.server_load_balancer",
        "recent_choose_primary_fail_count",
@@ -414,8 +425,152 @@ pc_status partition_guardian::on_missing_primary(meta_view &view, const dsn::gpid

pc_status partition_guardian::on_missing_secondary(meta_view &view, const dsn::gpid &gpid)
{
    partition_configuration &pc = *get_config(*(view.apps), gpid);
    config_context &cc = *get_config_context(*(view.apps), gpid);

    configuration_proposal_action action;
    bool is_emergency = false;
    if (cc.config_owner->max_replica_count > _mutation_2pc_min_replica_count &&
        replica_count(pc) < _mutation_2pc_min_replica_count) {
        // ATTENTION:
        // when max_replica_count == 2, even if there is only 1 replica alive now, we will
        // still wait for replica_assign_delay_ms_for_dropouts before recovering the second
        // replica.
        is_emergency = true;
        ddebug("gpid(%s): is emergency due to too few replicas", gpid.to_string());
    } else if (cc.dropped.empty()) {
        is_emergency = true;
        ddebug("gpid(%s): is emergency due to no dropped candidate", gpid.to_string());
    } else if (has_milliseconds_expired(cc.dropped.back().time +
                                        _replica_assign_delay_ms_for_dropouts)) {
        is_emergency = true;
        char time_buf[30];
        ::dsn::utils::time_ms_to_string(cc.dropped.back().time, time_buf);
        ddebug("gpid(%s): is emergency due to losing a secondary for a long time, "
               "last_dropped_node(%s), drop_time(%s), delay_ms(%" PRIu64 ")",
               gpid.to_string(),
               cc.dropped.back().node.to_string(),
               time_buf,
               _replica_assign_delay_ms_for_dropouts);
    } else if (in_black_list(cc.dropped.back().node)) {
        ddebug("gpid(%s): is emergency due to recently dropped node(%s) being in the black list",
               gpid.to_string(),
               cc.dropped.back().node.to_string());
        is_emergency = true;
    }
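    // Illustrative note (not part of the original patch): with max_replica_count = 3
    // and _mutation_2pc_min_replica_count = 2, a partition down to one live replica
    // satisfies 3 > 2 && 1 < 2 and is treated as an emergency at once; with
    // max_replica_count = 2 the first clause (2 > 2) fails, so recovery waits out
    // _replica_assign_delay_ms_for_dropouts, exactly as the ATTENTION comment warns.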
    action.node.set_invalid();

    if (is_emergency) {
        std::ostringstream oss;
        for (int i = 0; i < cc.dropped.size(); ++i) {
            if (i != 0)
                oss << ",";
            oss << cc.dropped[i].node.to_string();
        }
        ddebug("gpid(%s): try to choose node in dropped list, dropped_list(%s), "
               "prefered_dropped(%d)",
               gpid.to_string(),
               oss.str().c_str(),
               cc.prefered_dropped);
        if (cc.prefered_dropped < 0 || cc.prefered_dropped >= (int)cc.dropped.size()) {
            ddebug("gpid(%s): prefered_dropped(%d) is invalid according to drop_list(size %d), "
                   "reset it to %d (drop_list.size - 1)",
                   gpid.to_string(),
                   cc.prefered_dropped,
                   (int)cc.dropped.size(),
                   (int)cc.dropped.size() - 1);
            cc.prefered_dropped = (int)cc.dropped.size() - 1;
        }
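        // Added note: prefered_dropped is a member of config_context, so it
        // survives across cure rounds; the scan below resumes where the previous
        // round stopped instead of re-probing nodes already found dead.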

        while (cc.prefered_dropped >= 0) {
            const dropped_replica &server = cc.dropped[cc.prefered_dropped];
            if (is_node_alive(*view.nodes, server.node)) {
                ddebug("gpid(%s): node(%s) at cc.dropped[%d] is alive now, choose it, "
                       "and forward prefered_dropped from (%d) to (%d)",
                       gpid.to_string(),
                       server.node.to_string(),
                       cc.prefered_dropped,
                       cc.prefered_dropped,
                       cc.prefered_dropped - 1);
                action.node = server.node;
                cc.prefered_dropped--;
                break;
            } else {
                ddebug("gpid(%s): node(%s) at cc.dropped[%d] is not alive now, "
                       "changed prefered_dropped from (%d) to (%d)",
                       gpid.to_string(),
                       server.node.to_string(),
                       cc.prefered_dropped,
                       cc.prefered_dropped,
                       cc.prefered_dropped - 1);
                cc.prefered_dropped--;
            }
        }
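        // Added note: cc.dropped is kept oldest-first (back() is the
        // last_dropped_node logged above), so this scan prefers the most
        // recently dropped replica, which is likely to have the freshest data.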

        if (action.node.is_invalid() || in_black_list(action.node)) {
            if (!action.node.is_invalid()) {
                ddebug("gpid(%s): refuse to use selected node(%s) as it is in the black list",
                       gpid.to_string(),
                       action.node.to_string());
            }
            newly_partitions *min_server_np = nullptr;
            for (auto &pairs : *view.nodes) {
                node_state &ns = pairs.second;
                if (!ns.alive() || is_member(pc, ns.addr()) || in_black_list(ns.addr()))
                    continue;
                newly_partitions *np = newly_partitions_ext::get_inited(&ns);
                if (min_server_np == nullptr ||
                    np->less_partitions(*min_server_np, gpid.get_app_id())) {
                    action.node = ns.addr();
                    min_server_np = np;
                }
            }

            if (!action.node.is_invalid()) {
                ddebug("gpid(%s): can't find a valid node in the dropped list to add as "
                       "secondary, choose new node(%s) with minimal partitions served",
                       gpid.to_string(),
                       action.node.to_string());
            } else {
                ddebug("gpid(%s): can't find a valid node in the dropped list to add as "
                       "secondary, and can't find a new node to add as secondary either",
                       gpid.to_string());
            }
        }
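        // Added note: at this point action.node is either a live, non-blacklisted
        // node with the fewest partitions of this app, or still invalid if no such
        // node exists; the invalid case generates no proposal below.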
    } else {
        // if not an emergency, only try to recover the last dropped server
        const dropped_replica &server = cc.dropped.back();
        if (is_node_alive(*view.nodes, server.node)) {
            dassert(!server.node.is_invalid(),
                    "invalid server address, address = %s",
                    server.node.to_string());
            action.node = server.node;
        }

        if (!action.node.is_invalid()) {
            ddebug("gpid(%s): choose node(%s) as secondary because it is the last_dropped_node "
                   "and is alive now",
                   gpid.to_string(),
                   server.node.to_string());
        } else {
            ddebug("gpid(%s): can't add a secondary because last_dropped_node(%s) is not alive "
                   "now; ignore this as it is not an emergency",
                   gpid.to_string(),
                   server.node.to_string());
        }
    }

    if (!action.node.is_invalid()) {
        action.type = config_type::CT_ADD_SECONDARY;
        action.target = pc.primary;

        newly_partitions *np = get_newly_partitions(*(view.nodes), action.node);
        dassert(np != nullptr, "newly_partitions should not be null for node %s",
                action.node.to_string());
        np->newly_add_partition(gpid.get_app_id());

        cc.lb_actions.assign_cure_proposal(action);
    }

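    // Added note: the partition is reported as ill on every path, even when no
    // proposal could be generated, so the guardian will revisit it on a later
    // cure round.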
    return pc_status::ill;
}

pc_status partition_guardian::on_redundant_secondary(meta_view &view, const dsn::gpid &gpid)
24 changes: 22 additions & 2 deletions src/meta/partition_guardian.h
@@ -51,10 +51,30 @@ class partition_guardian
        _ddd_partitions[partition.config.pid] = std::move(partition);
    }

    bool in_black_list(dsn::rpc_address addr)
    {
        dsn::zauto_read_lock l(_black_list_lock);
        return _assign_secondary_black_list.count(addr) != 0;
    }

    meta_service *_svc;
    perf_counter_wrapper _recent_choose_primary_fail_count;

    mutable zlock _ddd_partitions_lock; // [
    std::map<gpid, ddd_partition_info> _ddd_partitions;
    // ]

    // NOTICE: the command handler is called in THREADPOOL_DEFAULT,
    // but when adding a secondary the black list is accessed in THREADPOOL_META_STATE,
    // so we need a lock to protect it.
    dsn::zrwlock_nr _black_list_lock; // [
    std::set<dsn::rpc_address> _assign_secondary_black_list;
    // ]
    dsn_handle_t _ctrl_assign_secondary_black_list;

    int32_t _mutation_2pc_min_replica_count;
    dsn_handle_t _ctrl_assign_delay_ms;
    uint64_t _replica_assign_delay_ms_for_dropouts;
};

} // namespace replication
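
The header only declares the black-list state; the command handler that populates it is outside this diff. As a rough sketch of the writer side implied by the NOTICE comment (the handler name and body below are assumptions, not code from this PR), an update would take the write lock that pairs with the read lock in in_black_list():

    // Hypothetical writer, for illustration only; not part of this diff.
    void partition_guardian::set_assign_secondary_black_list(
        const std::set<dsn::rpc_address> &nodes)
    {
        // Runs in THREADPOOL_DEFAULT; readers in THREADPOOL_META_STATE take
        // the matching dsn::zauto_read_lock inside in_black_list().
        dsn::zauto_write_lock l(_black_list_lock);
        _assign_secondary_black_list = nodes;
    }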