
Commit c18949e
refactor: move construct_replica from simple_load_balancer to meta_data
levy5307 committed Aug 30, 2021
1 parent 021d9f4 commit c18949e
Showing 8 changed files with 193 additions and 202 deletions.
67 changes: 67 additions & 0 deletions src/meta/meta_data.cpp
@@ -78,6 +78,73 @@ void maintain_drops(std::vector<rpc_address> &drops, const rpc_address &node, co
when_update_replicas(t, action);
}

bool construct_replica(meta_view view, const gpid &pid, int max_replica_count)
{
partition_configuration &pc = *get_config(*view.apps, pid);
config_context &cc = *get_config_context(*view.apps, pid);

dassert(replica_count(pc) == 0,
"replica count of gpid(%d.%d) must be 0",
pid.get_app_id(),
pid.get_partition_index());
dassert(
        max_replica_count > 0, "max replica count is %d, should be at least 1", max_replica_count);

std::vector<dropped_replica> &drop_list = cc.dropped;
if (drop_list.empty()) {
dwarn("construct for (%d.%d) failed, coz no replicas collected",
pid.get_app_id(),
pid.get_partition_index());
return false;
}

// treat last server in drop_list as the primary
const dropped_replica &server = drop_list.back();
dassert(server.ballot != invalid_ballot,
"the ballot of server must not be invalid_ballot, node = %s",
server.node.to_string());
pc.primary = server.node;
pc.ballot = server.ballot;
pc.partition_flags = 0;
pc.max_replica_count = max_replica_count;

ddebug("construct for (%d.%d), select %s as primary, ballot(%" PRId64
"), committed_decree(%" PRId64 "), prepare_decree(%" PRId64 ")",
pid.get_app_id(),
pid.get_partition_index(),
server.node.to_string(),
server.ballot,
server.last_committed_decree,
server.last_prepared_decree);

drop_list.pop_back();

    // put the most recent (max_replica_count - 1) replicas into last_drops, to guard against the
    // DDD state in which the only primary is dead.
    // when adding a node to pc.last_drops, we don't remove it from cc.dropped
dassert(pc.last_drops.empty(),
"last_drops of partition(%d.%d) must be empty",
pid.get_app_id(),
pid.get_partition_index());
for (auto iter = drop_list.rbegin(); iter != drop_list.rend(); ++iter) {
if (pc.last_drops.size() + 1 >= max_replica_count)
break;
        // similar to cc.dropped, pc.last_drops is also a stack structure
pc.last_drops.insert(pc.last_drops.begin(), iter->node);
ddebug("construct for (%d.%d), select %s into last_drops, ballot(%" PRId64
"), committed_decree(%" PRId64 "), prepare_decree(%" PRId64 ")",
pid.get_app_id(),
pid.get_partition_index(),
iter->node.to_string(),
iter->ballot,
iter->last_committed_decree,
iter->last_prepared_decree);
}

cc.prefered_dropped = (int)drop_list.size() - 1;
return true;
}

proposal_actions::proposal_actions() : from_balancer(false) { reset_tracked_current_learner(); }

void proposal_actions::reset_tracked_current_learner()
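
For illustration, here is a minimal, self-contained sketch of the selection order that construct_replica applies: the newest entry in the dropped list becomes the primary, and up to max_replica_count - 1 of the next-newest entries are recorded in last_drops, oldest first. The types and names below (dropped_entry, constructed_group, construct_group, the "nX" node strings) are hypothetical stand-ins for the dsn::replication ones, not part of this commit.

// Minimal sketch of the selection order used by construct_replica, with
// hypothetical simplified types instead of the real dsn::replication ones.
#include <cassert>
#include <cstdint>
#include <string>
#include <vector>

struct dropped_entry {
    std::string node;   // stand-in for dsn::rpc_address
    int64_t ballot;     // stand-in for the replica's ballot
};

struct constructed_group {
    std::string primary;
    std::vector<std::string> last_drops; // oldest first, like pc.last_drops
};

// dropped is ordered oldest-to-newest, like cc.dropped: the back is the most
// recently dropped replica.
bool construct_group(std::vector<dropped_entry> &dropped,
                     int max_replica_count,
                     constructed_group &out)
{
    if (dropped.empty())
        return false;

    out.primary = dropped.back().node; // newest replica becomes the primary
    dropped.pop_back();

    // keep up to max_replica_count - 1 of the next-newest nodes, oldest first
    for (auto it = dropped.rbegin(); it != dropped.rend(); ++it) {
        if (out.last_drops.size() + 1 >= static_cast<size_t>(max_replica_count))
            break;
        out.last_drops.insert(out.last_drops.begin(), it->node);
    }
    return true;
}

int main()
{
    std::vector<dropped_entry> dropped = {
        {"n1", 6}, {"n2", 7}, {"n3", 8}, {"n4", 9}};
    constructed_group g;
    assert(construct_group(dropped, 3, g));
    assert(g.primary == "n4"); // newest entry
    assert((g.last_drops == std::vector<std::string>{"n2", "n3"}));
    return 0;
}

As in the real function, the entries promoted into last_drops are not removed from the dropped list itself; only the chosen primary is popped.
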
8 changes: 8 additions & 0 deletions src/meta/meta_data.h
@@ -538,6 +538,14 @@ void maintain_drops(/*inout*/ std::vector<dsn::rpc_address> &drops,
const dsn::rpc_address &node,
config_type::type t);

//
// Try to construct a replica group from the current replica info of a gpid.
// ret:
//   if the replica group is constructed successfully, return true.
// Notice: as long as we can construct something from the current info, we treat it as a
// success.
//
bool construct_replica(meta_view view, const gpid &pid, int max_replica_count);

inline bool has_seconds_expired(uint64_t second_ts) { return second_ts * 1000 < dsn_now_ms(); }

inline bool has_milliseconds_expired(uint64_t milliseconds_ts)
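
A minimal caller-side sketch of the new free function, modeled on the server_state.cpp hunk further down and on the added test; rebuild_partition and its parameters are a hypothetical illustration, not part of this commit.

// Hypothetical caller, mirroring server_state::construct_partitions below:
// build a meta_view over the app and node maps, then try to reconstruct a
// partition from its collected drop history.
#include <memory>

#include "meta/meta_data.h"

namespace dsn {
namespace replication {

bool rebuild_partition(app_mapper &apps,
                       node_mapper &nodes,
                       const std::shared_ptr<app_state> &app,
                       const gpid &pid)
{
    meta_view view = {&apps, &nodes};
    // returns false only if nothing at all could be constructed from the drop history
    return construct_replica(view, pid, app->max_replica_count);
}

} // namespace replication
} // namespace dsn
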
67 changes: 0 additions & 67 deletions src/meta/server_load_balancer.cpp
@@ -956,72 +956,5 @@ bool simple_load_balancer::collect_replica(meta_view view,

return info.status == partition_status::PS_POTENTIAL_SECONDARY || ans != -1;
}

bool simple_load_balancer::construct_replica(meta_view view, const gpid &pid, int max_replica_count)
{
partition_configuration &pc = *get_config(*view.apps, pid);
config_context &cc = *get_config_context(*view.apps, pid);

dassert(replica_count(pc) == 0,
"replica count of gpid(%d.%d) must be 0",
pid.get_app_id(),
pid.get_partition_index());
dassert(
max_replica_count > 0, "max replica count is %d, should be at lease 1", max_replica_count);

std::vector<dropped_replica> &drop_list = cc.dropped;
if (drop_list.empty()) {
dwarn("construct for (%d.%d) failed, coz no replicas collected",
pid.get_app_id(),
pid.get_partition_index());
return false;
}

// treat last server in drop_list as the primary
const dropped_replica &server = drop_list.back();
dassert(server.ballot != invalid_ballot,
"the ballot of server must not be invalid_ballot, node = %s",
server.node.to_string());
pc.primary = server.node;
pc.ballot = server.ballot;
pc.partition_flags = 0;
pc.max_replica_count = max_replica_count;

ddebug("construct for (%d.%d), select %s as primary, ballot(%" PRId64
"), committed_decree(%" PRId64 "), prepare_decree(%" PRId64 ")",
pid.get_app_id(),
pid.get_partition_index(),
server.node.to_string(),
server.ballot,
server.last_committed_decree,
server.last_prepared_decree);

drop_list.pop_back();

// we put max_replica_count-1 recent replicas to last_drops, in case of the DDD-state when the
// only primary dead
// when add node to pc.last_drops, we don't remove it from our cc.drop_list
dassert(pc.last_drops.empty(),
"last_drops of partition(%d.%d) must be empty",
pid.get_app_id(),
pid.get_partition_index());
for (auto iter = drop_list.rbegin(); iter != drop_list.rend(); ++iter) {
if (pc.last_drops.size() + 1 >= max_replica_count)
break;
// similar to cc.drop_list, pc.last_drop is also a stack structure
pc.last_drops.insert(pc.last_drops.begin(), iter->node);
ddebug("construct for (%d.%d), select %s into last_drops, ballot(%" PRId64
"), committed_decree(%" PRId64 "), prepare_decree(%" PRId64 ")",
pid.get_app_id(),
pid.get_partition_index(),
iter->node.to_string(),
iter->ballot,
iter->last_committed_decree,
iter->last_prepared_decree);
}

cc.prefered_dropped = (int)drop_list.size() - 1;
return true;
}
} // namespace replication
} // namespace dsn
11 changes: 0 additions & 11 deletions src/meta/server_load_balancer.h
@@ -161,15 +161,6 @@ class server_load_balancer
virtual bool
collect_replica(meta_view view, const dsn::rpc_address &node, const replica_info &info) = 0;

//
// Try to construct a replica-group by current replica-infos of a gpid
// ret:
// if construct the replica successfully, return true.
// Notice: as long as we can construct something from current infos, we treat it as a
// success
//
virtual bool construct_replica(meta_view view, const gpid &pid, int max_replica_count) = 0;

void register_proposals(meta_view view,
const configuration_balancer_request &req,
configuration_balancer_response &resp);
@@ -308,8 +299,6 @@ class simple_load_balancer : public server_load_balancer
bool
collect_replica(meta_view view, const rpc_address &node, const replica_info &info) override;

bool construct_replica(meta_view view, const gpid &pid, int max_replica_count) override;

void register_ctrl_commands() override;

void unregister_ctrl_commands() override;
4 changes: 2 additions & 2 deletions src/meta/server_state.cpp
@@ -2088,8 +2088,8 @@ error_code server_state::construct_partitions(
ddebug("ignore constructing partitions for dropping app(%d)", app->app_id);
} else {
for (partition_configuration &pc : app->partitions) {
bool is_succeed = _meta_svc->get_balancer()->construct_replica(
{&_all_apps, &_nodes}, pc.pid, app->max_replica_count);
bool is_succeed =
construct_replica({&_all_apps, &_nodes}, pc.pid, app->max_replica_count);
if (is_succeed) {
ddebug("construct partition(%d.%d) succeed: %s",
app->app_id,
116 changes: 116 additions & 0 deletions src/meta/test/meta_data.cpp
@@ -1,4 +1,5 @@
#include <gtest/gtest.h>
#include "misc/misc.h"
#include "meta/meta_data.h"

using namespace dsn::replication;
@@ -48,3 +49,118 @@ TEST(meta_data, dropped_cmp)
ASSERT_TRUE(dropped_cmp(d2, d1) == 0);
}
}

TEST(meta_data, construct_replica)
{
app_mapper app;
node_mapper nodes;

dsn::app_info info;
info.app_id = 1;
info.is_stateful = true;
info.status = dsn::app_status::AS_AVAILABLE;
info.app_name = "test";
info.app_type = "test";
info.max_replica_count = 3;
info.partition_count = 1024;
std::shared_ptr<app_state> the_app = app_state::create(info);
app.emplace(the_app->app_id, the_app);
meta_view view = {&app, &nodes};

replica_info rep;
rep.app_type = "test";
rep.pid = dsn::gpid(1, 0);

dsn::partition_configuration &pc = *get_config(app, rep.pid);
config_context &cc = *get_config_context(app, rep.pid);

std::vector<dsn::rpc_address> node_list;
generate_node_list(node_list, 10, 10);

#define CLEAR_REPLICA \
do { \
pc.primary.set_invalid(); \
pc.secondaries.clear(); \
pc.last_drops.clear(); \
} while (false)

#define CLEAR_DROP_LIST \
do { \
cc.dropped.clear(); \
} while (false)

#define CLEAR_ALL \
CLEAR_REPLICA; \
CLEAR_DROP_LIST

// drop_list is empty, can't construct replica
{
CLEAR_ALL;
ASSERT_FALSE(construct_replica(view, rep.pid, 3));
ASSERT_EQ(0, replica_count(pc));
}

// only have one node in drop_list
{
CLEAR_ALL;
cc.dropped = {dropped_replica{node_list[0], dropped_replica::INVALID_TIMESTAMP, 5, 10, 12}};
ASSERT_TRUE(construct_replica(view, rep.pid, 3));
ASSERT_EQ(node_list[0], pc.primary);
ASSERT_TRUE(pc.secondaries.empty());
ASSERT_TRUE(cc.dropped.empty());
ASSERT_EQ(-1, cc.prefered_dropped);
}

// have multiple nodes, ballots are not same
{
CLEAR_ALL;
cc.dropped = {dropped_replica{node_list[1], dropped_replica::INVALID_TIMESTAMP, 6, 10, 12},
dropped_replica{node_list[2], dropped_replica::INVALID_TIMESTAMP, 7, 10, 12},
dropped_replica{node_list[3], dropped_replica::INVALID_TIMESTAMP, 8, 10, 12},
dropped_replica{node_list[4], dropped_replica::INVALID_TIMESTAMP, 9, 11, 12}};
ASSERT_TRUE(construct_replica(view, rep.pid, 3));
ASSERT_EQ(node_list[4], pc.primary);
ASSERT_TRUE(pc.secondaries.empty());

std::vector<dsn::rpc_address> nodes = {node_list[2], node_list[3]};
ASSERT_EQ(nodes, pc.last_drops);
ASSERT_EQ(3, cc.dropped.size());
ASSERT_EQ(2, cc.prefered_dropped);
}

// have multiple node, two have same ballots
{
CLEAR_ALL;
cc.dropped = {dropped_replica{node_list[0], dropped_replica::INVALID_TIMESTAMP, 5, 10, 12},
dropped_replica{node_list[1], dropped_replica::INVALID_TIMESTAMP, 7, 11, 12},
dropped_replica{node_list[2], dropped_replica::INVALID_TIMESTAMP, 7, 12, 12}};

ASSERT_TRUE(construct_replica(view, rep.pid, 3));
ASSERT_EQ(node_list[2], pc.primary);
ASSERT_TRUE(pc.secondaries.empty());

std::vector<dsn::rpc_address> nodes = {node_list[0], node_list[1]};
ASSERT_EQ(nodes, pc.last_drops);
ASSERT_EQ(2, cc.dropped.size());
ASSERT_EQ(1, cc.prefered_dropped);
}

// have multiple nodes, all have same ballots
{
CLEAR_ALL;
cc.dropped = {dropped_replica{node_list[0], dropped_replica::INVALID_TIMESTAMP, 7, 11, 14},
dropped_replica{node_list[1], dropped_replica::INVALID_TIMESTAMP, 7, 12, 14},
dropped_replica{node_list[2], dropped_replica::INVALID_TIMESTAMP, 7, 13, 14},
dropped_replica{node_list[3], dropped_replica::INVALID_TIMESTAMP, 7, 14, 14}};

ASSERT_TRUE(construct_replica(view, rep.pid, 3));
ASSERT_EQ(node_list[3], pc.primary);
ASSERT_TRUE(pc.secondaries.empty());

std::vector<dsn::rpc_address> nodes = {node_list[1], node_list[2]};
ASSERT_EQ(nodes, pc.last_drops);

ASSERT_EQ(3, cc.dropped.size());
ASSERT_EQ(2, cc.prefered_dropped);
}
}