Skip to content

Commit

Permalink
2
Browse files Browse the repository at this point in the history
  • Loading branch information
acelyc111 committed Jul 28, 2024
1 parent f4312bf commit 33904b2
Show file tree
Hide file tree
Showing 8 changed files with 69 additions and 38 deletions.
3 changes: 3 additions & 0 deletions src/client/replication_ddl_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -931,6 +931,9 @@ dsn::error_code replication_ddl_client::do_recovery(const std::vector<host_port>
CLEAR_IP_AND_HOST_PORT(*req, recovery_nodes1);
for (const auto &node : replica_nodes) {
if (utils::contains(req->hp_recovery_nodes1, node)) {
DCHECK(utils::contains(req->recovery_nodes1,
dsn::dns_resolver::instance().resolve_address(node)),
"");
out << "duplicate replica node " << node << ", just ignore it" << std::endl;
} else {
ADD_IP_AND_HOST_PORT_BY_DNS(*req, recovery_nodes1, node);
Expand Down
23 changes: 11 additions & 12 deletions src/meta/partition_guardian.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -469,14 +469,13 @@ pc_status partition_guardian::on_missing_primary(meta_view &view, const dsn::gpi
}
}

// Use the action.hp_node after being updated.
if (action.hp_node1) {
CHECK(action.node1, "");
host_port node;
GET_HOST_PORT(action, node1, node);
if (node) {
SET_OBJ_IP_AND_HOST_PORT(action, target1, action, node1);
action.type = config_type::CT_ASSIGN_PRIMARY;

get_newly_partitions(*view.nodes, action.hp_node1)
->newly_add_primary(gpid.get_app_id(), false);
get_newly_partitions(*view.nodes, node)->newly_add_primary(gpid.get_app_id(), false);
} else {
LOG_WARNING("{}: don't select any node for security reason, administrator can select "
"a proper one by shell",
Expand Down Expand Up @@ -645,9 +644,9 @@ pc_status partition_guardian::on_missing_secondary(meta_view &view, const dsn::g
SET_IP_AND_HOST_PORT_BY_DNS(action, node1, server.node);
}

// Use the action.hp_node after being updated.
DCHECK(action.__isset.node1, "");
if (action.hp_node1) {
host_port node;
GET_HOST_PORT(action, node1, node);
if (node) {
LOG_INFO("gpid({}): choose node({}) as secondary coz it is last_dropped_node and is "
"alive now",
gpid,
Expand All @@ -660,13 +659,13 @@ pc_status partition_guardian::on_missing_secondary(meta_view &view, const dsn::g
}
}

// Use the action.hp_node after being updated.
DCHECK(action.__isset.node1, "");
if (action.hp_node1) {
host_port node;
GET_HOST_PORT(action, node1, node);
if (node) {
action.type = config_type::CT_ADD_SECONDARY;
SET_OBJ_IP_AND_HOST_PORT(action, target1, pc, primary);

newly_partitions *np = get_newly_partitions(*(view.nodes), action.hp_node1);
newly_partitions *np = get_newly_partitions(*(view.nodes), node);
CHECK_NOTNULL(np, "");
np->newly_add_partition(gpid.get_app_id());

Expand Down
18 changes: 10 additions & 8 deletions src/meta/server_state.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2594,31 +2594,33 @@ bool server_state::check_all_partitions()
GET_HOST_PORTS(*pc, secondaries, secondaries);
if (!add_secondary_proposed[i] && secondaries.empty()) {
const auto &action = add_secondary_actions[i];
CHECK(action.hp_node1, "");
dsn::host_port node;
GET_HOST_PORT(action, node1, node);
CHECK(node, "");
if (_add_secondary_enable_flow_control &&
add_secondary_running_nodes[action.hp_node1] >=
_add_secondary_max_count_for_one_node) {
add_secondary_running_nodes[node] >= _add_secondary_max_count_for_one_node) {
// ignore
continue;
}
std::shared_ptr<app_state> app = get_app(pid.get_app_id());
send_proposal(action, *pc, *app);
send_proposal_count++;
add_secondary_proposed[i] = true;
add_secondary_running_nodes[action.hp_node1]++;
add_secondary_running_nodes[node]++;
}
}

// assign secondary for all
for (int i = 0; i < add_secondary_actions.size(); ++i) {
if (!add_secondary_proposed[i]) {
const auto &action = add_secondary_actions[i];
CHECK(action.hp_node1, "");
dsn::host_port node;
GET_HOST_PORT(action, node1, node);
CHECK(node, "");
gpid pid = add_secondary_gpids[i];
const auto *pc = get_config(_all_apps, pid);
if (_add_secondary_enable_flow_control &&
add_secondary_running_nodes[action.hp_node1] >=
_add_secondary_max_count_for_one_node) {
add_secondary_running_nodes[node] >= _add_secondary_max_count_for_one_node) {
LOG_INFO("do not send {} proposal for gpid({}) for flow control reason, target = "
"{}, node = {}",
::dsn::enum_to_string(action.type),
Expand All @@ -2631,7 +2633,7 @@ bool server_state::check_all_partitions()
send_proposal(action, *pc, *app);
send_proposal_count++;
add_secondary_proposed[i] = true;
add_secondary_running_nodes[action.hp_node1]++;
add_secondary_running_nodes[node]++;
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/meta/test/meta_partition_guardian_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ static void apply_update_request(/*in-out*/ configuration_update_request &update

case config_type::CT_ADD_SECONDARY:
case config_type::CT_ADD_SECONDARY_FOR_LB:
ADD_IP_AND_HOST_PORT(pc, secondaries, update_req.node1, update_req.hp_node1);
ADD_OBJ_IP_AND_HOST_PORT(pc, secondaries, update_req, node1);
update_req.type = config_type::CT_UPGRADE_TO_SECONDARY;
break;

Expand Down
10 changes: 5 additions & 5 deletions src/meta/test/misc/misc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -236,13 +236,13 @@ void track_disk_info_check_and_apply(const dsn::replication::configuration_propo
config_context *cc = get_config_context(apps, pid);
CHECK_NOTNULL(cc, "");

dsn::host_port hp_target, node;
dsn::host_port hp_target, hp_node;
GET_HOST_PORT(act, target1, hp_target);
GET_HOST_PORT(act, node1, node);
GET_HOST_PORT(act, node1, hp_node);

fs_manager *target_manager = get_fs_manager(manager, hp_target);
CHECK_NOTNULL(target_manager, "");
fs_manager *node_manager = get_fs_manager(manager, node);
fs_manager *node_manager = get_fs_manager(manager, hp_node);
CHECK_NOTNULL(node_manager, "");

std::string dir;
Expand All @@ -260,7 +260,7 @@ void track_disk_info_check_and_apply(const dsn::replication::configuration_propo
auto selected = node_manager->find_best_dir_for_new_replica(pid);
CHECK_NOTNULL(selected, "");
selected->holding_replicas[pid.get_app_id()].emplace(pid);
cc->collect_serving_replica(node, ri);
cc->collect_serving_replica(hp_node, ri);
break;
}
case config_type::CT_DOWNGRADE_TO_SECONDARY:
Expand All @@ -270,7 +270,7 @@ void track_disk_info_check_and_apply(const dsn::replication::configuration_propo
case config_type::CT_REMOVE:
case config_type::CT_DOWNGRADE_TO_INACTIVE:
node_manager->remove_replica(pid);
cc->remove_from_serving(node);
cc->remove_from_serving(hp_node);
break;

default:
Expand Down
4 changes: 2 additions & 2 deletions src/replica/replica_config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -296,9 +296,9 @@ void replica::downgrade_to_secondary_on_primary(configuration_update_request &pr
CHECK_EQ(proposal.node1, proposal.config.primary);

RESET_IP_AND_HOST_PORT(proposal.config, primary);
ADD_IP_AND_HOST_PORT(proposal.config, secondaries, proposal.node1, proposal.hp_node1);
ADD_OBJ_IP_AND_HOST_PORT(proposal.config, secondaries, proposal, node1);
update_configuration_on_meta_server(
config_type::CT_DOWNGRADE_TO_SECONDARY, proposal.hp_node1, proposal.config);
config_type::CT_DOWNGRADE_TO_SECONDARY, node, proposal.config);
}

void replica::downgrade_to_inactive_on_primary(configuration_update_request &proposal)
Expand Down
18 changes: 18 additions & 0 deletions src/runtime/rpc/rpc_host_port.h
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,24 @@ class TProtocol;
DCHECK_EQ(_obj.field.size(), _obj.hp_##field.size()); \
} while (0)

// Add '<src_field>' and 'hp_<src_field>' of 'src_obj' to the '<dst_field>' and optional
// 'hp_<dst_field>' of 'dst_obj'. The types of the fields are rpc_address and host_port,
// respectively.
//
// Behavior, step by step:
//  - 'src_obj' and 'dst_obj' are each bound once to a local reference ('_src_obj' as const,
//    '_dst_obj' as mutable), so each argument expression is evaluated a single time.
//  - A DCHECK_EQ verifies that 'src_obj.<src_field>' equals the address the DNS resolver
//    returns for 'src_obj.hp_<src_field>', i.e. the two source fields describe the same node.
//  - 'src_obj.<src_field>' is pushed back onto 'dst_obj.<dst_field>'.
//  - If 'hp_<dst_field>' is not flagged in 'dst_obj.__isset', it is initialized through
//    '__set_hp_<dst_field>' with a one-element list holding 'src_obj.hp_<src_field>';
//    otherwise that value is pushed back onto the existing 'dst_obj.hp_<dst_field>'.
//  - A final DCHECK_EQ verifies that '<dst_field>' and 'hp_<dst_field>' have the same size.
//
// The body is wrapped in 'do { ... } while (0)' so the macro can be used as a single statement
// followed by a semicolon.
//
// NOTE(review): the names '_src_obj' and '_dst_obj' are declared inside the expansion, so the
// argument expressions should not reference identifiers with those names -- confirm at call
// sites.
#define ADD_OBJ_IP_AND_HOST_PORT(dst_obj, dst_field, src_obj, src_field) \
do { \
const auto &_src_obj = (src_obj); \
auto &_dst_obj = (dst_obj); \
DCHECK_EQ(_src_obj.src_field, \
dsn::dns_resolver::instance().resolve_address(_src_obj.hp_##src_field)); \
_dst_obj.dst_field.push_back(_src_obj.src_field); \
if (!_dst_obj.__isset.hp_##dst_field) { \
_dst_obj.__set_hp_##dst_field({_src_obj.hp_##src_field}); \
} else { \
_dst_obj.hp_##dst_field.push_back(_src_obj.hp_##src_field); \
} \
DCHECK_EQ(_dst_obj.dst_field.size(), _dst_obj.hp_##dst_field.size()); \
} while (0)

#define SET_IPS_AND_HOST_PORTS_BY_DNS_1(obj, field, hp1) \
do { \
auto &_obj = (obj); \
Expand Down
29 changes: 19 additions & 10 deletions src/shell/commands/recovery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -173,18 +173,23 @@ dsn::host_port diagnose_recommend(const ddd_partition_info &pinfo)

std::vector<dsn::host_port> last_two_nodes(last_drops.end() - 2, last_drops.end());
std::vector<ddd_node_info> last_dropped;
for (auto &node : last_two_nodes) {
auto it = std::find_if(pinfo.dropped.begin(),
pinfo.dropped.end(),
[&node](const ddd_node_info &r) { return r.hp_node1 == node; });
for (const auto &node : last_two_nodes) {
auto it = std::find_if(
pinfo.dropped.begin(), pinfo.dropped.end(), [&node](const ddd_node_info &r) {
dsn::host_port drop_node;
GET_HOST_PORT(r, node1, drop_node);
return drop_node == node;
});
if (it->is_alive && it->is_collected)
last_dropped.push_back(*it);
}

if (last_dropped.size() == 1) {
const ddd_node_info &ninfo = last_dropped.back();
if (ninfo.last_committed_decree >= pinfo.config.last_committed_decree) {
return ninfo.hp_node1;
const ddd_node_info &latest = last_dropped.back();
if (latest.last_committed_decree >= pinfo.config.last_committed_decree) {
dsn::host_port node;
GET_HOST_PORT(latest, node1, node);
return node;
}
} else if (last_dropped.size() == 2) {
const ddd_node_info &secondary = last_dropped.front();
Expand All @@ -194,19 +199,23 @@ dsn::host_port diagnose_recommend(const ddd_partition_info &pinfo)
// - choose the node with the largest last committed decree
// - if last committed decree is the same, choose node with the largest ballot

dsn::host_port latest_node;
GET_HOST_PORT(latest, node1, latest_node);
dsn::host_port secondary_node;
GET_HOST_PORT(secondary, node1, secondary_node);
if (latest.last_committed_decree == secondary.last_committed_decree &&
latest.last_committed_decree >= pinfo.config.last_committed_decree) {
return latest.ballot >= secondary.ballot ? latest.hp_node1 : secondary.hp_node1;
return latest.ballot >= secondary.ballot ? latest_node : secondary_node;
}

if (latest.last_committed_decree > secondary.last_committed_decree &&
latest.last_committed_decree >= pinfo.config.last_committed_decree) {
return latest.hp_node1;
return latest_node;
}

if (secondary.last_committed_decree > latest.last_committed_decree &&
secondary.last_committed_decree >= pinfo.config.last_committed_decree) {
return secondary.hp_node1;
return secondary_node;
}
}

Expand Down

0 comments on commit 33904b2

Please sign in to comment.