Skip to content

Commit

Permalink
4
Browse files Browse the repository at this point in the history
  • Loading branch information
acelyc111 committed Jun 24, 2024
1 parent cbc4273 commit bedbce5
Show file tree
Hide file tree
Showing 2 changed files with 87 additions and 31 deletions.
2 changes: 1 addition & 1 deletion src/meta/meta_bulk_load_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -435,7 +435,7 @@ void bulk_load_service::partition_bulk_load(const std::string &app_name, const g
const app_bulk_load_info &ainfo = _app_bulk_load_info[pid.get_app_id()];
req->pid = pid;
req->app_name = app_name;
SET_IP_AND_HOST_PORT(*req, primary, pc.primary1, pc.hp_primary1);
SET_OBJ_IP_AND_HOST_PORT(*req, primary, pc, primary1);
req->remote_provider_name = ainfo.file_provider_type;
req->cluster_name = ainfo.cluster_name;
req->meta_bulk_load_status = get_partition_bulk_load_status_unlocked(pid);
Expand Down
116 changes: 86 additions & 30 deletions src/replica/replica_config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -174,8 +174,11 @@ void replica::assign_primary(configuration_update_request &proposal)
SET_IP_AND_HOST_PORT(
proposal.config, primary1, _stub->primary_address(), _stub->primary_host_port());
replica_helper::remove_node(_stub->primary_address(), proposal.config.secondaries1);
replica_helper::remove_node(_stub->primary_host_port(), proposal.config.hp_secondaries1);

if (proposal.config.__isset.hp_secondaries1) {
replica_helper::remove_node(_stub->primary_host_port(), proposal.config.hp_secondaries1);
} else {
LOG_WARNING_PREFIX("proposal.config.hp_secondaries field is not set");
}
update_configuration_on_meta_server(proposal.type, node, proposal.config);
}

Expand All @@ -190,15 +193,22 @@ void replica::add_potential_secondary(const configuration_update_request &propos

CHECK_EQ(proposal.config.ballot, get_ballot());
CHECK_EQ(proposal.config.pid, _primary_states.pc.pid);
CHECK_EQ(proposal.config.hp_primary1, _primary_states.pc.hp_primary1);
CHECK(proposal.config.hp_secondaries1 == _primary_states.pc.hp_secondaries1, "");

CHECK_EQ(proposal.config.primary1, _primary_states.pc.primary1);
CHECK(proposal.config.secondaries1 == _primary_states.pc.secondaries1, "");
dsn::host_port primary;
GET_HOST_PORT(proposal.config, primary1, primary);
CHECK(_primary_states.pc.__isset.hp_primary1, "");
CHECK_EQ(primary, _primary_states.pc.hp_primary1);
std::vector<dsn::host_port> secondaries;
GET_HOST_PORTS(proposal.config, secondaries1, secondaries);
CHECK(_primary_states.pc.__isset.hp_secondaries1, "");
CHECK(secondaries == _primary_states.pc.hp_secondaries1, "");
host_port node;
GET_HOST_PORT(proposal, node, node);
CHECK(!_primary_states.check_exist(node, partition_status::PS_PRIMARY), "node = {}", node);
CHECK(!_primary_states.check_exist(node, partition_status::PS_SECONDARY), "node = {}", node);

int potential_secondaries_count =
const int potential_secondaries_count =
_primary_states.pc.hp_secondaries1.size() + _primary_states.learners.size();
if (potential_secondaries_count >= _primary_states.pc.max_replica_count - 1) {
if (proposal.type == config_type::CT_ADD_SECONDARY) {
Expand Down Expand Up @@ -270,9 +280,17 @@ void replica::downgrade_to_secondary_on_primary(configuration_update_request &pr
}

CHECK_EQ(proposal.config.pid, _primary_states.pc.pid);
CHECK_EQ(proposal.config.hp_primary1, _primary_states.pc.hp_primary1);
CHECK(proposal.config.hp_secondaries1 == _primary_states.pc.hp_secondaries1, "");
CHECK_EQ(proposal.hp_node, proposal.config.hp_primary1);
dsn::host_port primary;
GET_HOST_PORT(proposal.config, primary1, primary);
CHECK(_primary_states.pc.__isset.hp_primary1, "");
CHECK_EQ(primary, _primary_states.pc.hp_primary1);
std::vector<dsn::host_port> secondaries;
GET_HOST_PORTS(proposal.config, secondaries1, secondaries);
CHECK(_primary_states.pc.__isset.hp_secondaries1, "");
CHECK(secondaries == _primary_states.pc.hp_secondaries1, "");
dsn::host_port node;
GET_HOST_PORT(proposal, node, node);
CHECK_EQ(node, primary);
CHECK_EQ(proposal.node, proposal.config.primary1);

RESET_IP_AND_HOST_PORT(proposal.config, primary1);
Expand All @@ -287,22 +305,32 @@ void replica::downgrade_to_inactive_on_primary(configuration_update_request &pro
return;

CHECK_EQ(proposal.config.pid, _primary_states.pc.pid);
CHECK_EQ(proposal.config.hp_primary1, _primary_states.pc.hp_primary1);
CHECK(proposal.config.hp_secondaries1 == _primary_states.pc.hp_secondaries1, "");
dsn::host_port primary;
GET_HOST_PORT(proposal.config, primary1, primary);
CHECK(_primary_states.pc.__isset.hp_primary1, "");
CHECK_EQ(primary, _primary_states.pc.hp_primary1);
std::vector<dsn::host_port> secondaries;
GET_HOST_PORTS(proposal.config, secondaries1, secondaries);
CHECK(_primary_states.pc.__isset.hp_secondaries1, "");
CHECK(secondaries == _primary_states.pc.hp_secondaries1, "");

host_port node;
GET_HOST_PORT(proposal, node, node);
if (node == proposal.config.hp_primary1) {
if (node == primary) {
CHECK_EQ(proposal.node, proposal.config.primary1);
RESET_IP_AND_HOST_PORT(proposal.config, primary1);
} else {
CHECK_NE(proposal.node, proposal.config.primary1);
CHECK(replica_helper::remove_node(proposal.node, proposal.config.secondaries1),
"remove node failed, node = {}",
proposal.node);
CHECK(replica_helper::remove_node(node, proposal.config.hp_secondaries1),
"remove node failed, node = {}",
node);
if (proposal.config.__isset.hp_secondaries1) {
CHECK(replica_helper::remove_node(node, proposal.config.hp_secondaries1),
"remove node failed, node = {}",
node);
} else {
LOG_WARNING_PREFIX("proposal.config.hp_secondaries field is not set");
}
}

update_configuration_on_meta_server(
Expand All @@ -315,26 +343,36 @@ void replica::remove(configuration_update_request &proposal)
return;

CHECK_EQ(proposal.config.pid, _primary_states.pc.pid);
CHECK_EQ(proposal.config.hp_primary1, _primary_states.pc.hp_primary1);
CHECK(proposal.config.hp_secondaries1 == _primary_states.pc.hp_secondaries1, "");
dsn::host_port primary;
GET_HOST_PORT(proposal.config, primary1, primary);
CHECK(_primary_states.pc.__isset.hp_primary1, "");
CHECK_EQ(primary, _primary_states.pc.hp_primary1);
std::vector<dsn::host_port> secondaries;
GET_HOST_PORTS(proposal.config, secondaries1, secondaries);
CHECK(_primary_states.pc.__isset.hp_secondaries1, "");
CHECK(secondaries == _primary_states.pc.hp_secondaries1, "");

host_port node;
GET_HOST_PORT(proposal, node, node);
auto st = _primary_states.get_node_status(node);

switch (st) {
case partition_status::PS_PRIMARY:
CHECK_EQ(proposal.config.hp_primary1, node);
CHECK_EQ(primary, node);
CHECK_EQ(proposal.config.primary1, proposal.node);
RESET_IP_AND_HOST_PORT(proposal.config, primary1);
break;
case partition_status::PS_SECONDARY: {
CHECK(replica_helper::remove_node(proposal.node, proposal.config.secondaries1),
"remove node failed, node = {}",
proposal.node);
CHECK(replica_helper::remove_node(node, proposal.config.hp_secondaries1),
"remove_node failed, node = {}",
node);
if (proposal.config.__isset.hp_secondaries1) {
CHECK(replica_helper::remove_node(node, proposal.config.hp_secondaries1),
"remove_node failed, node = {}",
node);
} else {
LOG_WARNING_PREFIX("proposal.config.hp_secondaries field is not set");
}
} break;
case partition_status::PS_POTENTIAL_SECONDARY:
break;
Expand Down Expand Up @@ -388,7 +426,9 @@ void replica::update_configuration_on_meta_server(config_type::type type,
CHECK(status() == partition_status::PS_INACTIVE && _inactive_is_transient &&
_is_initializing,
"");
CHECK_EQ(new_pc.hp_primary1, node);
dsn::host_port primary;
GET_HOST_PORT(new_pc, primary1, primary);
CHECK_EQ(primary, node);
} else if (type != config_type::CT_ASSIGN_PRIMARY &&
type != config_type::CT_UPGRADE_TO_PRIMARY) {
CHECK_EQ(status(), partition_status::PS_PRIMARY);
Expand Down Expand Up @@ -502,8 +542,18 @@ void replica::on_update_configuration_on_meta_server_reply(
// post-update work items?
if (resp.err == ERR_OK) {
CHECK_EQ(req->config.pid, resp.config.pid);
CHECK_EQ(req->config.hp_primary1, resp.config.hp_primary1);
CHECK(req->config.hp_secondaries1 == resp.config.hp_secondaries1, "");
CHECK_EQ(req->config.primary1, resp.config.primary1);
dsn::host_port req_primary;
GET_HOST_PORT(req->config, primary1, req_primary);
dsn::host_port resp_primary;
GET_HOST_PORT(resp.config, primary1, resp_primary);
CHECK_EQ(req_primary, resp_primary);
CHECK(req->config.secondaries1 == resp.config.secondaries1, "");
std::vector<dsn::host_port> req_secondaries;
GET_HOST_PORTS(req->config, secondaries1, req_secondaries);
std::vector<dsn::host_port> resp_secondaries;
GET_HOST_PORTS(resp.config, secondaries1, resp_secondaries);
CHECK(req_secondaries == resp_secondaries, "");

switch (req->type) {
case config_type::CT_UPGRADE_TO_PRIMARY:
Expand Down Expand Up @@ -654,14 +704,17 @@ bool replica::update_configuration(const partition_configuration &pc)

if (rconfig.status == partition_status::PS_PRIMARY &&
(rconfig.ballot > get_ballot() || status() != partition_status::PS_PRIMARY)) {
_primary_states.reset_membership(pc, pc.hp_primary1 != _stub->primary_host_port());
dsn::host_port primary;
GET_HOST_PORT(pc, primary1, primary);
_primary_states.reset_membership(pc, primary != _stub->primary_host_port());
}

if (pc.ballot > get_ballot() ||
is_same_ballot_status_change_allowed(status(), rconfig.status)) {
return update_local_configuration(rconfig, true);
} else
} else {
return false;
}
}

bool replica::is_same_ballot_status_change_allowed(partition_status::type olds,
Expand Down Expand Up @@ -1041,6 +1094,7 @@ bool replica::update_local_configuration(const replica_configuration &config,
init_prepare(next, false);
}

CHECK(_primary_states.pc.__isset.hp_secondaries1, "");
if (_primary_states.pc.hp_secondaries1.size() + 1 <
_options->app_mutation_2pc_min_replica_count(_app_info.max_replica_count)) {
std::vector<mutation_ptr> queued;
Expand Down Expand Up @@ -1082,6 +1136,8 @@ void replica::on_config_sync(const app_info &info,
update_app_envs(info.envs);
_is_duplication_master = info.duplicating;

dsn::host_port primary;
GET_HOST_PORT(pc, primary1, primary);
if (status() == partition_status::PS_PRIMARY) {
if (nullptr != _primary_states.reconfiguration_task) {
// already under reconfiguration, skip configuration sync
Expand All @@ -1091,10 +1147,10 @@ void replica::on_config_sync(const app_info &info,
} else {
if (_is_initializing) {
// in initializing, when replica still primary, need to inc ballot
if (pc.hp_primary1 == _stub->primary_host_port() &&
if (primary == _stub->primary_host_port() &&
status() == partition_status::PS_INACTIVE && _inactive_is_transient) {
update_configuration_on_meta_server(config_type::CT_PRIMARY_FORCE_UPDATE_BALLOT,
pc.hp_primary1,
primary,
const_cast<partition_configuration &>(pc));
return;
}
Expand All @@ -1104,9 +1160,9 @@ void replica::on_config_sync(const app_info &info,
update_configuration(pc);

if (status() == partition_status::PS_INACTIVE && !_inactive_is_transient) {
if (pc.hp_primary1 == _stub->primary_host_port() // dead primary
if (primary == _stub->primary_host_port() // dead primary
||
!pc.hp_primary1 // primary is dead (otherwise let primary remove this)
!primary // primary is dead (otherwise let primary remove this)
) {
LOG_INFO_PREFIX("downgrade myself as inactive is not transient, remote_config({})",
boost::lexical_cast<std::string>(pc));
Expand Down

0 comments on commit bedbce5

Please sign in to comment.