This repository has been archived by the owner on Jun 23, 2022. It is now read-only.

refactor: move construct_replica from simple_load_balancer to meta_data #893

Merged
merged 5 commits on Aug 31, 2021
67 changes: 67 additions & 0 deletions src/meta/meta_data.cpp
@@ -78,6 +78,73 @@ void maintain_drops(std::vector<rpc_address> &drops, const rpc_address &node, config_type::type t)
when_update_replicas(t, action);
}

bool construct_replica(meta_view view, const gpid &pid, int max_replica_count)
{
partition_configuration &pc = *get_config(*view.apps, pid);
config_context &cc = *get_config_context(*view.apps, pid);

dassert(replica_count(pc) == 0,
"replica count of gpid(%d.%d) must be 0",
pid.get_app_id(),
pid.get_partition_index());
    dassert(
        max_replica_count > 0, "max replica count is %d, should be at least 1", max_replica_count);

std::vector<dropped_replica> &drop_list = cc.dropped;
if (drop_list.empty()) {
dwarn("construct for (%d.%d) failed, coz no replicas collected",
pid.get_app_id(),
pid.get_partition_index());
return false;
}

    // treat the last server in drop_list as the primary
const dropped_replica &server = drop_list.back();
dassert(server.ballot != invalid_ballot,
"the ballot of server must not be invalid_ballot, node = %s",
server.node.to_string());
pc.primary = server.node;
pc.ballot = server.ballot;
pc.partition_flags = 0;
pc.max_replica_count = max_replica_count;

ddebug("construct for (%d.%d), select %s as primary, ballot(%" PRId64
"), committed_decree(%" PRId64 "), prepare_decree(%" PRId64 ")",
pid.get_app_id(),
pid.get_partition_index(),
server.node.to_string(),
server.ballot,
server.last_committed_decree,
server.last_prepared_decree);

drop_list.pop_back();

    // we put the most recent max_replica_count-1 replicas into last_drops, to guard against the
    // DDD state that arises when the only primary is dead.
    // when adding a node to pc.last_drops, we don't remove it from cc.dropped
dassert(pc.last_drops.empty(),
"last_drops of partition(%d.%d) must be empty",
pid.get_app_id(),
pid.get_partition_index());
for (auto iter = drop_list.rbegin(); iter != drop_list.rend(); ++iter) {
if (pc.last_drops.size() + 1 >= max_replica_count)
break;
        // similar to cc.dropped, pc.last_drops is also a stack structure
pc.last_drops.insert(pc.last_drops.begin(), iter->node);
ddebug("construct for (%d.%d), select %s into last_drops, ballot(%" PRId64
"), committed_decree(%" PRId64 "), prepare_decree(%" PRId64 ")",
pid.get_app_id(),
pid.get_partition_index(),
iter->node.to_string(),
iter->ballot,
iter->last_committed_decree,
iter->last_prepared_decree);
}

cc.prefered_dropped = (int)drop_list.size() - 1;
return true;
}

bool collect_replica(meta_view view, const rpc_address &node, const replica_info &info)
{
partition_configuration &pc = *get_config(*view.apps, info.pid);
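The selection logic above relies on `cc.dropped` being kept as a stack: the replica with the highest ballot (and, on ties, the most advanced decrees) sits at the back. `construct_replica` pops that entry as the primary, then walks the remainder from newest to oldest, front-inserting up to `max_replica_count - 1` nodes into `pc.last_drops`. Below is a minimal, self-contained sketch of just that selection step; the simplified struct and the addresses are illustrative stand-ins for the real `dropped_replica` and `rpc_address` types.

```cpp
#include <cstddef>
#include <cstdint>
#include <iostream>
#include <string>
#include <vector>

// Simplified stand-in for dropped_replica: only the fields the selection uses.
struct dropped_replica_sketch
{
    std::string node;
    int64_t ballot;
};

int main()
{
    // cc.dropped acts as a stack: the most trustworthy replica is at the back.
    std::vector<dropped_replica_sketch> drop_list = {{"10.0.0.1:34801", 6},
                                                     {"10.0.0.2:34801", 7},
                                                     {"10.0.0.3:34801", 8},
                                                     {"10.0.0.4:34801", 9}};
    const std::size_t max_replica_count = 3;

    // The back of the list becomes the primary.
    const std::string primary = drop_list.back().node;
    drop_list.pop_back();

    // Up to max_replica_count-1 of the next-most-recent replicas go into
    // last_drops; front insertion keeps last_drops newest-at-the-back too.
    std::vector<std::string> last_drops;
    for (auto it = drop_list.rbegin(); it != drop_list.rend(); ++it) {
        if (last_drops.size() + 1 >= max_replica_count)
            break;
        last_drops.insert(last_drops.begin(), it->node);
    }

    std::cout << "primary: " << primary << "\n"; // 10.0.0.4:34801
    for (const auto &n : last_drops)
        std::cout << "last_drop: " << n << "\n"; // 10.0.0.2, then 10.0.0.3
    // prefered_dropped indexes the last remaining entry of the drop list.
    std::cout << "prefered_dropped: " << drop_list.size() - 1 << "\n"; // 2
}
```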
8 changes: 7 additions & 1 deletion src/meta/meta_data.h
@@ -538,7 +538,13 @@ void maintain_drops(/*inout*/ std::vector<dsn::rpc_address> &drops,
const dsn::rpc_address &node,
config_type::type t);

//
// Try to construct a replica group from the currently collected replica infos of a gpid
// ret:
//   true if the replica group was constructed successfully.
// Notice: as long as we can construct something from the current infos, we treat it as a
// success
bool construct_replica(meta_view view, const gpid &pid, int max_replica_count);

// When replica infos are collected from replica servers, meta-server
// will use this to check if a replica on a server is useful
// params:
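After the move, `construct_replica` is a free function in the replication namespace, so callers build a `meta_view` over their app and node maps and invoke it directly instead of going through a balancer instance (the `server_state.cpp` hunk below shows the real call site). A hedged sketch of the call pattern, with illustrative gpid and replica-count values:

```cpp
// Sketch only: app/nodes would be the meta server's real state maps.
app_mapper app;
node_mapper nodes;
meta_view view = {&app, &nodes};

if (!construct_replica(view, dsn::gpid(1, 0), /*max_replica_count=*/3)) {
    // No dropped replicas have been collected for this partition,
    // so there is nothing to rebuild; the caller decides what to do.
}
```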
67 changes: 0 additions & 67 deletions src/meta/server_load_balancer.cpp
@@ -931,72 +931,5 @@ pc_status simple_load_balancer::cure(meta_view view,
}
return status;
}

bool simple_load_balancer::construct_replica(meta_view view, const gpid &pid, int max_replica_count)
{
partition_configuration &pc = *get_config(*view.apps, pid);
config_context &cc = *get_config_context(*view.apps, pid);

dassert(replica_count(pc) == 0,
"replica count of gpid(%d.%d) must be 0",
pid.get_app_id(),
pid.get_partition_index());
    dassert(
        max_replica_count > 0, "max replica count is %d, should be at least 1", max_replica_count);

std::vector<dropped_replica> &drop_list = cc.dropped;
if (drop_list.empty()) {
dwarn("construct for (%d.%d) failed, coz no replicas collected",
pid.get_app_id(),
pid.get_partition_index());
return false;
}

    // treat the last server in drop_list as the primary
const dropped_replica &server = drop_list.back();
dassert(server.ballot != invalid_ballot,
"the ballot of server must not be invalid_ballot, node = %s",
server.node.to_string());
pc.primary = server.node;
pc.ballot = server.ballot;
pc.partition_flags = 0;
pc.max_replica_count = max_replica_count;

ddebug("construct for (%d.%d), select %s as primary, ballot(%" PRId64
"), committed_decree(%" PRId64 "), prepare_decree(%" PRId64 ")",
pid.get_app_id(),
pid.get_partition_index(),
server.node.to_string(),
server.ballot,
server.last_committed_decree,
server.last_prepared_decree);

drop_list.pop_back();

    // we put the most recent max_replica_count-1 replicas into last_drops, to guard against the
    // DDD state that arises when the only primary is dead.
    // when adding a node to pc.last_drops, we don't remove it from cc.dropped
dassert(pc.last_drops.empty(),
"last_drops of partition(%d.%d) must be empty",
pid.get_app_id(),
pid.get_partition_index());
for (auto iter = drop_list.rbegin(); iter != drop_list.rend(); ++iter) {
if (pc.last_drops.size() + 1 >= max_replica_count)
break;
        // similar to cc.dropped, pc.last_drops is also a stack structure
pc.last_drops.insert(pc.last_drops.begin(), iter->node);
ddebug("construct for (%d.%d), select %s into last_drops, ballot(%" PRId64
"), committed_decree(%" PRId64 "), prepare_decree(%" PRId64 ")",
pid.get_app_id(),
pid.get_partition_index(),
iter->node.to_string(),
iter->ballot,
iter->last_committed_decree,
iter->last_prepared_decree);
}

cc.prefered_dropped = (int)drop_list.size() - 1;
return true;
}
} // namespace replication
} // namespace dsn
11 changes: 0 additions & 11 deletions src/meta/server_load_balancer.h
@@ -148,15 +148,6 @@ class server_load_balancer
virtual void
score(meta_view view, double &primary_stddev /*out*/, double &total_stddev /*out*/) = 0;

//
// Try to construct a replica group from the currently collected replica infos of a gpid
// ret:
//   true if the replica group was constructed successfully.
// Notice: as long as we can construct something from the current infos, we treat it as a
// success
//
virtual bool construct_replica(meta_view view, const gpid &pid, int max_replica_count) = 0;

void register_proposals(meta_view view,
const configuration_balancer_request &req,
configuration_balancer_response &resp);
@@ -292,8 +283,6 @@ class simple_load_balancer : public server_load_balancer
pc_status
cure(meta_view view, const dsn::gpid &gpid, configuration_proposal_action &action) override;

bool construct_replica(meta_view view, const gpid &pid, int max_replica_count) override;

void register_ctrl_commands() override;

void unregister_ctrl_commands() override;
4 changes: 2 additions & 2 deletions src/meta/server_state.cpp
@@ -2087,8 +2087,8 @@ error_code server_state::construct_partitions(
ddebug("ignore constructing partitions for dropping app(%d)", app->app_id);
} else {
for (partition_configuration &pc : app->partitions) {
bool is_succeed = _meta_svc->get_balancer()->construct_replica(
{&_all_apps, &_nodes}, pc.pid, app->max_replica_count);
bool is_succeed =
construct_replica({&_all_apps, &_nodes}, pc.pid, app->max_replica_count);
if (is_succeed) {
ddebug("construct partition(%d.%d) succeed: %s",
app->app_id,
115 changes: 115 additions & 0 deletions src/meta/test/meta_data.cpp
@@ -312,3 +312,118 @@ TEST(meta_data, collect_replica)
#undef CLEAR_REPLICA
#undef CLEAR_DROP_LIST
}

TEST(meta_data, construct_replica)
{
app_mapper app;
node_mapper nodes;

dsn::app_info info;
info.app_id = 1;
info.is_stateful = true;
info.status = dsn::app_status::AS_AVAILABLE;
info.app_name = "test";
info.app_type = "test";
info.max_replica_count = 3;
info.partition_count = 1024;
std::shared_ptr<app_state> the_app = app_state::create(info);
app.emplace(the_app->app_id, the_app);
meta_view view = {&app, &nodes};

replica_info rep;
rep.app_type = "test";
rep.pid = dsn::gpid(1, 0);

dsn::partition_configuration &pc = *get_config(app, rep.pid);
config_context &cc = *get_config_context(app, rep.pid);

std::vector<dsn::rpc_address> node_list;
generate_node_list(node_list, 10, 10);

#define CLEAR_REPLICA \
do { \
pc.primary.set_invalid(); \
pc.secondaries.clear(); \
pc.last_drops.clear(); \
} while (false)

#define CLEAR_DROP_LIST \
do { \
cc.dropped.clear(); \
} while (false)

#define CLEAR_ALL \
CLEAR_REPLICA; \
CLEAR_DROP_LIST

// drop_list is empty, can't construct replica
{
CLEAR_ALL;
ASSERT_FALSE(construct_replica(view, rep.pid, 3));
ASSERT_EQ(0, replica_count(pc));
}

// only have one node in drop_list
{
CLEAR_ALL;
cc.dropped = {dropped_replica{node_list[0], dropped_replica::INVALID_TIMESTAMP, 5, 10, 12}};
ASSERT_TRUE(construct_replica(view, rep.pid, 3));
ASSERT_EQ(node_list[0], pc.primary);
ASSERT_TRUE(pc.secondaries.empty());
ASSERT_TRUE(cc.dropped.empty());
ASSERT_EQ(-1, cc.prefered_dropped);
}

    // have multiple nodes whose ballots all differ
{
CLEAR_ALL;
cc.dropped = {dropped_replica{node_list[1], dropped_replica::INVALID_TIMESTAMP, 6, 10, 12},
dropped_replica{node_list[2], dropped_replica::INVALID_TIMESTAMP, 7, 10, 12},
dropped_replica{node_list[3], dropped_replica::INVALID_TIMESTAMP, 8, 10, 12},
dropped_replica{node_list[4], dropped_replica::INVALID_TIMESTAMP, 9, 11, 12}};
ASSERT_TRUE(construct_replica(view, rep.pid, 3));
ASSERT_EQ(node_list[4], pc.primary);
ASSERT_TRUE(pc.secondaries.empty());

std::vector<dsn::rpc_address> nodes = {node_list[2], node_list[3]};
ASSERT_EQ(nodes, pc.last_drops);
ASSERT_EQ(3, cc.dropped.size());
ASSERT_EQ(2, cc.prefered_dropped);
}

    // have multiple nodes, two of which share the same ballot
{
CLEAR_ALL;
cc.dropped = {dropped_replica{node_list[0], dropped_replica::INVALID_TIMESTAMP, 5, 10, 12},
dropped_replica{node_list[1], dropped_replica::INVALID_TIMESTAMP, 7, 11, 12},
dropped_replica{node_list[2], dropped_replica::INVALID_TIMESTAMP, 7, 12, 12}};

ASSERT_TRUE(construct_replica(view, rep.pid, 3));
ASSERT_EQ(node_list[2], pc.primary);
ASSERT_TRUE(pc.secondaries.empty());

std::vector<dsn::rpc_address> nodes = {node_list[0], node_list[1]};
ASSERT_EQ(nodes, pc.last_drops);
ASSERT_EQ(2, cc.dropped.size());
ASSERT_EQ(1, cc.prefered_dropped);
}

    // have multiple nodes, all with the same ballot
{
CLEAR_ALL;
cc.dropped = {dropped_replica{node_list[0], dropped_replica::INVALID_TIMESTAMP, 7, 11, 14},
dropped_replica{node_list[1], dropped_replica::INVALID_TIMESTAMP, 7, 12, 14},
dropped_replica{node_list[2], dropped_replica::INVALID_TIMESTAMP, 7, 13, 14},
dropped_replica{node_list[3], dropped_replica::INVALID_TIMESTAMP, 7, 14, 14}};

ASSERT_TRUE(construct_replica(view, rep.pid, 3));
ASSERT_EQ(node_list[3], pc.primary);
ASSERT_TRUE(pc.secondaries.empty());

std::vector<dsn::rpc_address> nodes = {node_list[1], node_list[2]};
ASSERT_EQ(nodes, pc.last_drops);

ASSERT_EQ(3, cc.dropped.size());
ASSERT_EQ(2, cc.prefered_dropped);
}
}
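The `prefered_dropped` assertions all follow from `construct_replica` setting `cc.prefered_dropped = (int)drop_list.size() - 1` after the primary has been popped. A tiny self-contained check of that arithmetic, with values mirroring the cases above (the helper name is made up for illustration):

```cpp
#include <cassert>
#include <cstddef>

// Expected cc.prefered_dropped given how many replicas were collected:
// one entry is popped as the primary, and the index points at the last
// remaining entry (so -1 when the list ends up empty).
int expected_prefered_dropped(std::size_t collected_replicas)
{
    const std::size_t remaining = collected_replicas - 1; // primary popped
    return static_cast<int>(remaining) - 1;
}

int main()
{
    assert(expected_prefered_dropped(1) == -1); // single collected replica
    assert(expected_prefered_dropped(3) == 1);  // "two share a ballot" case
    assert(expected_prefered_dropped(4) == 2);  // four collected replicas
    return 0;
}
```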