Skip to content
This repository has been archived by the owner on Jun 23, 2022. It is now read-only.

Commit

Permalink
Merge remote-tracking branch 'origin/master' into construct_replica
Browse files Browse the repository at this point in the history
  • Loading branch information
levy5307 committed Aug 31, 2021
2 parents 236ba91 + a63f453 commit 6427b8f
Show file tree
Hide file tree
Showing 15 changed files with 471 additions and 389 deletions.
142 changes: 82 additions & 60 deletions src/meta/meta_data.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -78,71 +78,93 @@ void maintain_drops(std::vector<rpc_address> &drops, const rpc_address &node, co
when_update_replicas(t, action);
}

bool construct_replica(meta_view view, const gpid &pid, int max_replica_count)
{
partition_configuration &pc = *get_config(*view.apps, pid);
config_context &cc = *get_config_context(*view.apps, pid);

dassert(replica_count(pc) == 0,
"replica count of gpid(%d.%d) must be 0",
pid.get_app_id(),
pid.get_partition_index());
dassert(
max_replica_count > 0, "max replica count is %d, should be at lease 1", max_replica_count);

std::vector<dropped_replica> &drop_list = cc.dropped;
if (drop_list.empty()) {
dwarn("construct for (%d.%d) failed, coz no replicas collected",
pid.get_app_id(),
pid.get_partition_index());
return false;
}
bool construct_replica(meta_view view, const gpid &pid, int max_replica_count) {
partition_configuration &pc = *get_config(*view.apps, pid);
config_context &cc = *get_config_context(*view.apps, pid);

// treat last server in drop_list as the primary
const dropped_replica &server = drop_list.back();
dassert(server.ballot != invalid_ballot,
"the ballot of server must not be invalid_ballot, node = %s",
server.node.to_string());
pc.primary = server.node;
pc.ballot = server.ballot;
pc.partition_flags = 0;
pc.max_replica_count = max_replica_count;

ddebug("construct for (%d.%d), select %s as primary, ballot(%" PRId64
"), committed_decree(%" PRId64 "), prepare_decree(%" PRId64 ")",
pid.get_app_id(),
pid.get_partition_index(),
server.node.to_string(),
server.ballot,
server.last_committed_decree,
server.last_prepared_decree);

drop_list.pop_back();

// we put max_replica_count-1 recent replicas to last_drops, in case of the DDD-state when the
// only primary dead
// when add node to pc.last_drops, we don't remove it from our cc.drop_list
dassert(pc.last_drops.empty(),
"last_drops of partition(%d.%d) must be empty",
pid.get_app_id(),
pid.get_partition_index());
for (auto iter = drop_list.rbegin(); iter != drop_list.rend(); ++iter) {
if (pc.last_drops.size() + 1 >= max_replica_count)
break;
// similar to cc.drop_list, pc.last_drop is also a stack structure
pc.last_drops.insert(pc.last_drops.begin(), iter->node);
ddebug("construct for (%d.%d), select %s into last_drops, ballot(%" PRId64
"), committed_decree(%" PRId64 "), prepare_decree(%" PRId64 ")",
dassert(replica_count(pc) == 0,
"replica count of gpid(%d.%d) must be 0",
pid.get_app_id(),
pid.get_partition_index());
dassert(
max_replica_count > 0, "max replica count is %d, should be at lease 1", max_replica_count);

std::vector<dropped_replica> &drop_list = cc.dropped;
if (drop_list.empty()) {
dwarn("construct for (%d.%d) failed, coz no replicas collected",
pid.get_app_id(),
pid.get_partition_index());
return false;
}

// treat last server in drop_list as the primary
const dropped_replica &server = drop_list.back();
dassert(server.ballot != invalid_ballot,
"the ballot of server must not be invalid_ballot, node = %s",
server.node.to_string());
pc.primary = server.node;
pc.ballot = server.ballot;
pc.partition_flags = 0;
pc.max_replica_count = max_replica_count;

ddebug("construct for (%d.%d), select %s as primary, ballot(%" PRId64
"), committed_decree(%" PRId64 "), prepare_decree(%" PRId64 ")",
pid.get_app_id(),
pid.get_partition_index(),
iter->node.to_string(),
iter->ballot,
iter->last_committed_decree,
iter->last_prepared_decree);
server.node.to_string(),
server.ballot,
server.last_committed_decree,
server.last_prepared_decree);

drop_list.pop_back();

// we put max_replica_count-1 recent replicas to last_drops, in case of the DDD-state when the
// only primary dead
// when add node to pc.last_drops, we don't remove it from our cc.drop_list
dassert(pc.last_drops.empty(),
"last_drops of partition(%d.%d) must be empty",
pid.get_app_id(),
pid.get_partition_index());
for (auto iter = drop_list.rbegin(); iter != drop_list.rend(); ++iter) {
if (pc.last_drops.size() + 1 >= max_replica_count)
break;
// similar to cc.drop_list, pc.last_drop is also a stack structure
pc.last_drops.insert(pc.last_drops.begin(), iter->node);
ddebug("construct for (%d.%d), select %s into last_drops, ballot(%" PRId64
"), committed_decree(%" PRId64 "), prepare_decree(%" PRId64 ")",
pid.get_app_id(),
pid.get_partition_index(),
iter->node.to_string(),
iter->ballot,
iter->last_committed_decree,
iter->last_prepared_decree);
}

cc.prefered_dropped = (int) drop_list.size() - 1;
return true;
}

cc.prefered_dropped = (int)drop_list.size() - 1;
return true;
// Decide whether the replica `info` reported by `node` is still useful to the meta server.
// Returns false while the partition's ballot is invalid_ballot, true when `node` is a member
// of the partition; otherwise true only if the replica is a potential secondary or
// collect_drop_replica() did not answer -1.
bool collect_replica(meta_view view, const rpc_address &node, const replica_info &info)
{
    partition_configuration &config = *get_config(*view.apps, info.pid);
    // the partition is in the middle of a partition split
    if (config.ballot == invalid_ballot) {
        return false;
    }

    config_context &context = *get_config_context(*view.apps, info.pid);
    if (is_member(config, node)) {
        context.collect_serving_replica(node, info);
        return true;
    }

    // check this node's replica information against the current proposal, so that
    // abnormal situations in the sent proposal can be found
    context.adjust_proposal(node, info);

    // update the drop list with this replica
    const int drop_result = context.collect_drop_replica(node, info);
    dassert(context.check_order(), "");

    if (info.status == partition_status::PS_POTENTIAL_SECONDARY) {
        return true;
    }
    return drop_result != -1;
}

// Starts with from_balancer set to false, then resets the tracked current learner.
proposal_actions::proposal_actions() : from_balancer(false)
{
    reset_tracked_current_learner();
}
Expand Down
11 changes: 10 additions & 1 deletion src/meta/meta_data.h
Original file line number Diff line number Diff line change
Expand Up @@ -538,14 +538,23 @@ void maintain_drops(/*inout*/ std::vector<dsn::rpc_address> &drops,
const dsn::rpc_address &node,
config_type::type t);

//
// Try to construct a replica group from the replica infos currently collected for a gpid.
// ret:
// true if the replica group was constructed successfully.
// Notice: as long as something can be constructed from the current infos, it is treated
// as a success.
bool construct_replica(meta_view view, const gpid &pid, int max_replica_count);

// When replica infos are collected from replica servers, the meta-server
// uses this to check whether a replica on a server is useful.
// params:
// node: the owner of the replica info
// info: the replica info on node
// ret:
// true if the replica is accepted as a useful replica, otherwise false.
// WARNING: if false is returned, the replica on node may be garbage-collected
bool collect_replica(meta_view view, const rpc_address &node, const replica_info &info);

// Returns true when `second_ts`, multiplied by 1000 to obtain milliseconds, is earlier
// than the current time reported by dsn_now_ms().
inline bool has_seconds_expired(uint64_t second_ts)
{
    const uint64_t deadline_ms = second_ts * 1000;
    return deadline_ms < dsn_now_ms();
}

inline bool has_milliseconds_expired(uint64_t milliseconds_ts)
Expand Down
90 changes: 90 additions & 0 deletions src/meta/partition_guardian.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -616,5 +616,95 @@ void partition_guardian::finish_cure_proposal(meta_view &view,
}
}
}

// Registers the guardian's two control commands with the command manager and stores the
// values returned by register_command() in _ctrl_assign_delay_ms and
// _ctrl_assign_secondary_black_list.
void partition_guardian::register_ctrl_commands()
{
    auto &commands = dsn::command_manager::instance();

    // forwards to ctrl_assign_delay_ms()
    _ctrl_assign_delay_ms = commands.register_command(
        {"meta.lb.assign_delay_ms"},
        "lb.assign_delay_ms [num | DEFAULT]",
        "control the replica_assign_delay_ms_for_dropouts config",
        [this](const std::vector<std::string> &cmd_args) {
            return ctrl_assign_delay_ms(cmd_args);
        });

    // forwards to ctrl_assign_secondary_black_list()
    _ctrl_assign_secondary_black_list = commands.register_command(
        {"meta.lb.assign_secondary_black_list"},
        "lb.assign_secondary_black_list [<ip:port,ip:port,ip:port>|clear]",
        "control the assign secondary black list",
        [this](const std::vector<std::string> &cmd_args) {
            return ctrl_assign_secondary_black_list(cmd_args);
        });
}

// Counterpart of register_ctrl_commands(): hands both stored command members to
// UNREGISTER_VALID_HANDLER.
// NOTE(review): the macro is defined outside this view; presumably it unregisters a
// handler only when it is valid -- confirm.
void partition_guardian::unregister_ctrl_commands()
{
UNREGISTER_VALID_HANDLER(_ctrl_assign_delay_ms);
UNREGISTER_VALID_HANDLER(_ctrl_assign_secondary_black_list);
}

// Control-command handler for the replica_assign_delay_ms_for_dropouts setting.
//   no argument -> returns the current value as a string
//   "DEFAULT"   -> restores the value from the meta options, returns "OK"
//   <num>       -> sets the value when it parses as an int32 greater than 0 and returns
//                  "OK"; otherwise returns "ERR: invalid arguments"
// Only args[0] is inspected.
std::string partition_guardian::ctrl_assign_delay_ms(const std::vector<std::string> &args)
{
    if (args.empty()) {
        return std::to_string(_replica_assign_delay_ms_for_dropouts);
    }

    const std::string &value = args[0];
    if (value == "DEFAULT") {
        _replica_assign_delay_ms_for_dropouts =
            _svc->get_meta_options()._lb_opts.replica_assign_delay_ms_for_dropouts;
        return std::string("OK");
    }

    int32_t delay_ms = 0;
    const bool parsed = dsn::buf2int32(value, delay_ms);
    if (!parsed || delay_ms <= 0) {
        return std::string("ERR: invalid arguments");
    }

    _replica_assign_delay_ms_for_dropouts = delay_ms;
    return std::string("OK");
}

// Control-command handler for the assign-secondary black list.
//   no argument       -> returns "get ok: " followed by the comma-separated black list
//   "clear"           -> empties the black list, returns "clear ok"
//   "ip:port,ip:port" -> replaces the black list with the given addresses, returns "set ok"
// Any other input (more than one argument, no address left after splitting, or an address
// rejected by from_string_ipv4()) returns "invalid arguments" and leaves the list unchanged.
std::string
partition_guardian::ctrl_assign_secondary_black_list(const std::vector<std::string> &args)
{
    std::string invalid_arguments("invalid arguments");

    if (args.empty()) {
        std::stringstream oss;
        dsn::zauto_read_lock l(_black_list_lock);
        oss << "get ok: ";
        const char *separator = "";
        for (const auto &addr : _assign_secondary_black_list) {
            oss << separator << addr.to_string();
            separator = ",";
        }
        return oss.str();
    }

    if (args.size() != 1) {
        return invalid_arguments;
    }

    if (args[0] == "clear") {
        dsn::zauto_write_lock l(_black_list_lock);
        _assign_secondary_black_list.clear();
        return "clear ok";
    }

    // Parse and validate before taking the write lock, so the lock is held only for the
    // final assignment.
    std::vector<std::string> ip_ports;
    dsn::utils::split_args(args[0].c_str(), ip_ports, ',');
    // The original tested `args` here, which can never be empty at this point; the split
    // result is what has to be non-empty, otherwise an argument without any address would
    // silently replace the black list with an empty one and report "set ok".
    if (ip_ports.empty()) {
        return invalid_arguments;
    }

    std::set<dsn::rpc_address> addr_list;
    for (const std::string &s : ip_ports) {
        dsn::rpc_address addr;
        if (!addr.from_string_ipv4(s.c_str())) {
            return invalid_arguments;
        }
        addr_list.insert(addr);
    }

    dsn::zauto_write_lock l(_black_list_lock);
    _assign_secondary_black_list = std::move(addr_list);
    return "set ok";
}
} // namespace replication
} // namespace dsn
4 changes: 4 additions & 0 deletions src/meta/partition_guardian.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ class partition_guardian
~partition_guardian() = default;

pc_status cure(meta_view view, const dsn::gpid &gpid, configuration_proposal_action &action);
void register_ctrl_commands();
void unregister_ctrl_commands();

private:
bool
Expand All @@ -44,6 +46,8 @@ class partition_guardian
void finish_cure_proposal(meta_view &view,
const dsn::gpid &gpid,
const configuration_proposal_action &action);
std::string ctrl_assign_delay_ms(const std::vector<std::string> &args);
std::string ctrl_assign_secondary_black_list(const std::vector<std::string> &args);

void set_ddd_partition(ddd_partition_info &&partition)
{
Expand Down
25 changes: 0 additions & 25 deletions src/meta/server_load_balancer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -931,30 +931,5 @@ pc_status simple_load_balancer::cure(meta_view view,
}
return status;
}

// Decide whether the replica `info` reported by `node` is still useful to the meta server.
// Returns false while the partition's ballot is invalid_ballot, true when `node` is a member
// of the partition; otherwise true only if the replica is a potential secondary or
// collect_drop_replica() did not answer -1.
bool simple_load_balancer::collect_replica(meta_view view,
                                           const rpc_address &node,
                                           const replica_info &info)
{
    partition_configuration &config = *get_config(*view.apps, info.pid);
    // the partition is in the middle of a partition split
    if (config.ballot == invalid_ballot) {
        return false;
    }

    config_context &context = *get_config_context(*view.apps, info.pid);
    if (is_member(config, node)) {
        context.collect_serving_replica(node, info);
        return true;
    }

    // check this node's replica information against the current proposal, so that
    // abnormal situations in the sent proposal can be found
    context.adjust_proposal(node, info);

    // update the drop list with this replica
    const int drop_result = context.collect_drop_replica(node, info);
    dassert(context.check_order(), "");

    if (info.status == partition_status::PS_POTENTIAL_SECONDARY) {
        return true;
    }
    return drop_result != -1;
}
} // namespace replication
} // namespace dsn
16 changes: 0 additions & 16 deletions src/meta/server_load_balancer.h
Original file line number Diff line number Diff line change
Expand Up @@ -148,19 +148,6 @@ class server_load_balancer
virtual void
score(meta_view view, double &primary_stddev /*out*/, double &total_stddev /*out*/) = 0;

//
// When replica infos are collected from replica servers, meta-server
// will use this to check if a replica on a server is useful
// params:
// node: the owner of the replica info
// info: the replica info on node
// ret:
// return true if the replica is accepted as an useful replica. Or-else false.
// WARNING: if false is returned, the replica on node may be garbage-collected
//
virtual bool
collect_replica(meta_view view, const dsn::rpc_address &node, const replica_info &info) = 0;

void register_proposals(meta_view view,
const configuration_balancer_request &req,
configuration_balancer_response &resp);
Expand Down Expand Up @@ -296,9 +283,6 @@ class simple_load_balancer : public server_load_balancer
pc_status
cure(meta_view view, const dsn::gpid &gpid, configuration_proposal_action &action) override;

bool
collect_replica(meta_view view, const rpc_address &node, const replica_info &info) override;

void register_ctrl_commands() override;

void unregister_ctrl_commands() override;
Expand Down
7 changes: 3 additions & 4 deletions src/meta/server_state.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -890,8 +890,8 @@ void server_state::on_config_sync(configuration_query_by_node_rpc rpc)
}
}
} else if (app->status == app_status::AS_AVAILABLE) {
bool is_useful_replica = _meta_svc->get_balancer()->collect_replica(
{&_all_apps, &_nodes}, request.node, rep);
bool is_useful_replica =
collect_replica({&_all_apps, &_nodes}, request.node, rep);
if (!is_useful_replica) {
if (level <= meta_function_level::fl_steady) {
ddebug("gpid(%d.%d) on node(%s) is useless, but current function level "
Expand Down Expand Up @@ -2063,8 +2063,7 @@ error_code server_state::construct_partitions(

for (replica_info &r : query_resp.replicas) {
dassert(_all_apps.find(r.pid.get_app_id()) != _all_apps.end(), "");
bool is_accepted = _meta_svc->get_balancer()->collect_replica(
{&_all_apps, &_nodes}, replica_nodes[i], r);
bool is_accepted = collect_replica({&_all_apps, &_nodes}, replica_nodes[i], r);
if (is_accepted) {
ddebug("accept replica(%s) from node(%s)",
boost::lexical_cast<std::string>(r).c_str(),
Expand Down
Loading

0 comments on commit 6427b8f

Please sign in to comment.