From 33904b255d87df77915449696d4b19228df52471 Mon Sep 17 00:00:00 2001 From: Yingchun Lai Date: Sun, 28 Jul 2024 23:29:30 +0800 Subject: [PATCH] 2 --- src/client/replication_ddl_client.cpp | 3 ++ src/meta/partition_guardian.cpp | 23 +++++++-------- src/meta/server_state.cpp | 18 +++++++----- .../test/meta_partition_guardian_test.cpp | 2 +- src/meta/test/misc/misc.cpp | 10 +++---- src/replica/replica_config.cpp | 4 +-- src/runtime/rpc/rpc_host_port.h | 18 ++++++++++++ src/shell/commands/recovery.cpp | 29 ++++++++++++------- 8 files changed, 69 insertions(+), 38 deletions(-) diff --git a/src/client/replication_ddl_client.cpp b/src/client/replication_ddl_client.cpp index de4ad750b3..52967bcfa2 100644 --- a/src/client/replication_ddl_client.cpp +++ b/src/client/replication_ddl_client.cpp @@ -931,6 +931,9 @@ dsn::error_code replication_ddl_client::do_recovery(const std::vector CLEAR_IP_AND_HOST_PORT(*req, recovery_nodes1); for (const auto &node : replica_nodes) { if (utils::contains(req->hp_recovery_nodes1, node)) { + DCHECK(utils::contains(req->recovery_nodes1, + dsn::dns_resolver::instance().resolve_address(node)), + ""); out << "duplicate replica node " << node << ", just ignore it" << std::endl; } else { ADD_IP_AND_HOST_PORT_BY_DNS(*req, recovery_nodes1, node); diff --git a/src/meta/partition_guardian.cpp b/src/meta/partition_guardian.cpp index 38e9df95a5..c5b99a7a71 100644 --- a/src/meta/partition_guardian.cpp +++ b/src/meta/partition_guardian.cpp @@ -469,14 +469,13 @@ pc_status partition_guardian::on_missing_primary(meta_view &view, const dsn::gpi } } - // Use the action.hp_node after being updated. 
- if (action.hp_node1) { - CHECK(action.node1, ""); + host_port node; + GET_HOST_PORT(action, node1, node); + if (node) { SET_OBJ_IP_AND_HOST_PORT(action, target1, action, node1); action.type = config_type::CT_ASSIGN_PRIMARY; - get_newly_partitions(*view.nodes, action.hp_node1) - ->newly_add_primary(gpid.get_app_id(), false); + get_newly_partitions(*view.nodes, node)->newly_add_primary(gpid.get_app_id(), false); } else { LOG_WARNING("{}: don't select any node for security reason, administrator can select " "a proper one by shell", @@ -645,9 +644,9 @@ pc_status partition_guardian::on_missing_secondary(meta_view &view, const dsn::g SET_IP_AND_HOST_PORT_BY_DNS(action, node1, server.node); } - // Use the action.hp_node after being updated. - DCHECK(action.__isset.node1, ""); - if (action.hp_node1) { + host_port node; + GET_HOST_PORT(action, node1, node); + if (node) { LOG_INFO("gpid({}): choose node({}) as secondary coz it is last_dropped_node and is " "alive now", gpid, @@ -660,13 +659,13 @@ pc_status partition_guardian::on_missing_secondary(meta_view &view, const dsn::g } } - // Use the action.hp_node after being updated. 
- DCHECK(action.__isset.node1, ""); - if (action.hp_node1) { + host_port node; + GET_HOST_PORT(action, node1, node); + if (node) { action.type = config_type::CT_ADD_SECONDARY; SET_OBJ_IP_AND_HOST_PORT(action, target1, pc, primary); - newly_partitions *np = get_newly_partitions(*(view.nodes), action.hp_node1); + newly_partitions *np = get_newly_partitions(*(view.nodes), node); CHECK_NOTNULL(np, ""); np->newly_add_partition(gpid.get_app_id()); diff --git a/src/meta/server_state.cpp b/src/meta/server_state.cpp index ed99533c0f..e5ecfd55fe 100644 --- a/src/meta/server_state.cpp +++ b/src/meta/server_state.cpp @@ -2594,10 +2594,11 @@ bool server_state::check_all_partitions() GET_HOST_PORTS(*pc, secondaries, secondaries); if (!add_secondary_proposed[i] && secondaries.empty()) { const auto &action = add_secondary_actions[i]; - CHECK(action.hp_node1, ""); + dsn::host_port node; + GET_HOST_PORT(action, node1, node); + CHECK(node, ""); if (_add_secondary_enable_flow_control && - add_secondary_running_nodes[action.hp_node1] >= - _add_secondary_max_count_for_one_node) { + add_secondary_running_nodes[node] >= _add_secondary_max_count_for_one_node) { // ignore continue; } @@ -2605,7 +2606,7 @@ bool server_state::check_all_partitions() send_proposal(action, *pc, *app); send_proposal_count++; add_secondary_proposed[i] = true; - add_secondary_running_nodes[action.hp_node1]++; + add_secondary_running_nodes[node]++; } } @@ -2613,12 +2614,13 @@ bool server_state::check_all_partitions() for (int i = 0; i < add_secondary_actions.size(); ++i) { if (!add_secondary_proposed[i]) { const auto &action = add_secondary_actions[i]; - CHECK(action.hp_node1, ""); + dsn::host_port node; + GET_HOST_PORT(action, node1, node); + CHECK(node, ""); gpid pid = add_secondary_gpids[i]; const auto *pc = get_config(_all_apps, pid); if (_add_secondary_enable_flow_control && - add_secondary_running_nodes[action.hp_node1] >= - _add_secondary_max_count_for_one_node) { + add_secondary_running_nodes[node] >= 
_add_secondary_max_count_for_one_node) { LOG_INFO("do not send {} proposal for gpid({}) for flow control reason, target = " "{}, node = {}", ::dsn::enum_to_string(action.type), @@ -2631,7 +2633,7 @@ bool server_state::check_all_partitions() send_proposal(action, *pc, *app); send_proposal_count++; add_secondary_proposed[i] = true; - add_secondary_running_nodes[action.hp_node1]++; + add_secondary_running_nodes[node]++; } } diff --git a/src/meta/test/meta_partition_guardian_test.cpp b/src/meta/test/meta_partition_guardian_test.cpp index b980453305..cd6c76d077 100644 --- a/src/meta/test/meta_partition_guardian_test.cpp +++ b/src/meta/test/meta_partition_guardian_test.cpp @@ -89,7 +89,7 @@ static void apply_update_request(/*in-out*/ configuration_update_request &update case config_type::CT_ADD_SECONDARY: case config_type::CT_ADD_SECONDARY_FOR_LB: - ADD_IP_AND_HOST_PORT(pc, secondaries, update_req.node1, update_req.hp_node1); + ADD_OBJ_IP_AND_HOST_PORT(pc, secondaries, update_req, node1); update_req.type = config_type::CT_UPGRADE_TO_SECONDARY; break; diff --git a/src/meta/test/misc/misc.cpp b/src/meta/test/misc/misc.cpp index 092c9a7c05..598e58836d 100644 --- a/src/meta/test/misc/misc.cpp +++ b/src/meta/test/misc/misc.cpp @@ -236,13 +236,13 @@ void track_disk_info_check_and_apply(const dsn::replication::configuration_propo config_context *cc = get_config_context(apps, pid); CHECK_NOTNULL(cc, ""); - dsn::host_port hp_target, node; + dsn::host_port hp_target, hp_node; GET_HOST_PORT(act, target1, hp_target); - GET_HOST_PORT(act, node1, node); + GET_HOST_PORT(act, node1, hp_node); fs_manager *target_manager = get_fs_manager(manager, hp_target); CHECK_NOTNULL(target_manager, ""); - fs_manager *node_manager = get_fs_manager(manager, node); + fs_manager *node_manager = get_fs_manager(manager, hp_node); CHECK_NOTNULL(node_manager, ""); std::string dir; @@ -260,7 +260,7 @@ void track_disk_info_check_and_apply(const dsn::replication::configuration_propo auto selected = 
node_manager->find_best_dir_for_new_replica(pid); CHECK_NOTNULL(selected, ""); selected->holding_replicas[pid.get_app_id()].emplace(pid); - cc->collect_serving_replica(node, ri); + cc->collect_serving_replica(hp_node, ri); break; } case config_type::CT_DOWNGRADE_TO_SECONDARY: @@ -270,7 +270,7 @@ void track_disk_info_check_and_apply(const dsn::replication::configuration_propo case config_type::CT_REMOVE: case config_type::CT_DOWNGRADE_TO_INACTIVE: node_manager->remove_replica(pid); - cc->remove_from_serving(node); + cc->remove_from_serving(hp_node); break; default: diff --git a/src/replica/replica_config.cpp b/src/replica/replica_config.cpp index 682467eeff..c60007aec7 100644 --- a/src/replica/replica_config.cpp +++ b/src/replica/replica_config.cpp @@ -296,9 +296,9 @@ void replica::downgrade_to_secondary_on_primary(configuration_update_request &pr CHECK_EQ(proposal.node1, proposal.config.primary); RESET_IP_AND_HOST_PORT(proposal.config, primary); - ADD_IP_AND_HOST_PORT(proposal.config, secondaries, proposal.node1, proposal.hp_node1); + ADD_OBJ_IP_AND_HOST_PORT(proposal.config, secondaries, proposal, node1); update_configuration_on_meta_server( - config_type::CT_DOWNGRADE_TO_SECONDARY, proposal.hp_node1, proposal.config); + config_type::CT_DOWNGRADE_TO_SECONDARY, node, proposal.config); } void replica::downgrade_to_inactive_on_primary(configuration_update_request &proposal) diff --git a/src/runtime/rpc/rpc_host_port.h b/src/runtime/rpc/rpc_host_port.h index 196605b1ff..4d5b0dd26d 100644 --- a/src/runtime/rpc/rpc_host_port.h +++ b/src/runtime/rpc/rpc_host_port.h @@ -177,6 +177,24 @@ class TProtocol; DCHECK_EQ(_obj.field.size(), _obj.hp_##field.size()); \ } while (0) +// Add '<src_field>' and 'hp_<src_field>' of 'src_obj' to the '<dst_field>' and optional +// 'hp_<dst_field>' of 'dst_obj'. The types of the fields are rpc_address and host_port, +// respectively. 
+#define ADD_OBJ_IP_AND_HOST_PORT(dst_obj, dst_field, src_obj, src_field) \ + do { \ + const auto &_src_obj = (src_obj); \ + auto &_dst_obj = (dst_obj); \ + DCHECK_EQ(_src_obj.src_field, \ + dsn::dns_resolver::instance().resolve_address(_src_obj.hp_##src_field)); \ + _dst_obj.dst_field.push_back(_src_obj.src_field); \ + if (!_dst_obj.__isset.hp_##dst_field) { \ + _dst_obj.__set_hp_##dst_field({_src_obj.hp_##src_field}); \ + } else { \ + _dst_obj.hp_##dst_field.push_back(_src_obj.hp_##src_field); \ + } \ + DCHECK_EQ(_dst_obj.dst_field.size(), _dst_obj.hp_##dst_field.size()); \ + } while (0) + #define SET_IPS_AND_HOST_PORTS_BY_DNS_1(obj, field, hp1) \ do { \ auto &_obj = (obj); \ diff --git a/src/shell/commands/recovery.cpp b/src/shell/commands/recovery.cpp index ffd6c98906..8a8a889df7 100644 --- a/src/shell/commands/recovery.cpp +++ b/src/shell/commands/recovery.cpp @@ -173,18 +173,23 @@ dsn::host_port diagnose_recommend(const ddd_partition_info &pinfo) std::vector last_two_nodes(last_drops.end() - 2, last_drops.end()); std::vector last_dropped; - for (auto &node : last_two_nodes) { - auto it = std::find_if(pinfo.dropped.begin(), - pinfo.dropped.end(), - [&node](const ddd_node_info &r) { return r.hp_node1 == node; }); + for (const auto &node : last_two_nodes) { + auto it = std::find_if( + pinfo.dropped.begin(), pinfo.dropped.end(), [&node](const ddd_node_info &r) { + dsn::host_port drop_node; + GET_HOST_PORT(r, node1, drop_node); + return drop_node == node; + }); if (it->is_alive && it->is_collected) last_dropped.push_back(*it); } if (last_dropped.size() == 1) { - const ddd_node_info &ninfo = last_dropped.back(); - if (ninfo.last_committed_decree >= pinfo.config.last_committed_decree) { - return ninfo.hp_node1; + const ddd_node_info &latest = last_dropped.back(); + if (latest.last_committed_decree >= pinfo.config.last_committed_decree) { + dsn::host_port node; + GET_HOST_PORT(latest, node1, node); + return node; } } else if (last_dropped.size() == 2) { const 
ddd_node_info &secondary = last_dropped.front(); @@ -194,19 +199,23 @@ dsn::host_port diagnose_recommend(const ddd_partition_info &pinfo) // - choose the node with the largest last committed decree // - if last committed decree is the same, choose node with the largest ballot + dsn::host_port latest_node; + GET_HOST_PORT(latest, node1, latest_node); + dsn::host_port secondary_node; + GET_HOST_PORT(secondary, node1, secondary_node); if (latest.last_committed_decree == secondary.last_committed_decree && latest.last_committed_decree >= pinfo.config.last_committed_decree) { - return latest.ballot >= secondary.ballot ? latest.hp_node1 : secondary.hp_node1; + return latest.ballot >= secondary.ballot ? latest_node : secondary_node; } if (latest.last_committed_decree > secondary.last_committed_decree && latest.last_committed_decree >= pinfo.config.last_committed_decree) { - return latest.hp_node1; + return latest_node; } if (secondary.last_committed_decree > latest.last_committed_decree && secondary.last_committed_decree >= pinfo.config.last_committed_decree) { - return secondary.hp_node1; + return secondary_node; } }