diff --git a/idl/meta_admin.thrift b/idl/meta_admin.thrift index eec65717e3..493efc950b 100644 --- a/idl/meta_admin.thrift +++ b/idl/meta_admin.thrift @@ -68,15 +68,15 @@ struct configuration_update_request 1:dsn.layer2.app_info info; 2:dsn.layer2.partition_configuration config; 3:config_type type = config_type.CT_INVALID; - 4:dsn.rpc_address node; - 5:dsn.rpc_address host_node; // deprecated, only used by stateless apps + 4:dsn.rpc_address node1; + 5:dsn.rpc_address host_node1; // deprecated, only used by stateless apps // Used for partition split // if replica is splitting (whose split_status is not NOT_SPLIT) // the `meta_split_status` will be set // only used when on_config_sync 6:optional metadata.split_status meta_split_status; - 7:optional dsn.host_port hp_node; + 7:optional dsn.host_port hp_node1; } // meta server (config mgr) => primary | secondary (downgrade) (w/ new config) @@ -101,10 +101,10 @@ struct replica_server_info struct configuration_query_by_node_request { - 1:dsn.rpc_address node; + 1:dsn.rpc_address node1; 2:optional list stored_replicas; 3:optional replica_server_info info; - 4:optional dsn.host_port hp_node; + 4:optional dsn.host_port hp_node1; } struct configuration_query_by_node_response @@ -116,10 +116,10 @@ struct configuration_query_by_node_response struct configuration_recovery_request { - 1:list recovery_nodes; + 1:list recovery_nodes1; 2:bool skip_bad_nodes; 3:bool skip_lost_partitions; - 4:optional list hp_recovery_nodes; + 4:optional list hp_recovery_nodes1; } struct configuration_recovery_response @@ -207,8 +207,8 @@ struct configuration_list_apps_response struct query_app_info_request { - 1:dsn.rpc_address meta_server; - 2:optional dsn.host_port hp_meta_server; + 1:dsn.rpc_address meta_server1; + 2:optional dsn.host_port hp_meta_server1; } struct query_app_info_response @@ -283,8 +283,8 @@ struct query_app_manual_compact_response struct node_info { 1:node_status status = node_status.NS_INVALID; - 2:dsn.rpc_address node; - 
3:optional dsn.host_port hp_node; + 2:dsn.rpc_address node1; + 3:optional dsn.host_port hp_node1; } struct configuration_list_nodes_request @@ -347,15 +347,15 @@ enum balancer_request_type struct configuration_proposal_action { - 1:dsn.rpc_address target; - 2:dsn.rpc_address node; + 1:dsn.rpc_address target1; + 2:dsn.rpc_address node1; 3:config_type type; // depricated now // new fields of this struct should start with 5 // 4:i64 period_ts; - 5:optional dsn.host_port hp_target; - 6:optional dsn.host_port hp_node; + 5:optional dsn.host_port hp_target1; + 6:optional dsn.host_port hp_node1; } struct configuration_balancer_request @@ -381,14 +381,14 @@ struct ddd_diagnose_request struct ddd_node_info { - 1:dsn.rpc_address node; + 1:dsn.rpc_address node1; 2:i64 drop_time_ms; 3:bool is_alive; // if the node is alive now 4:bool is_collected; // if replicas has been collected from this node 5:i64 ballot; // collected && ballot == -1 means replica not exist on this node 6:i64 last_committed_decree; 7:i64 last_prepared_decree; - 8:optional dsn.host_port hp_node; + 8:optional dsn.host_port hp_node1; } struct ddd_partition_info diff --git a/src/client/replication_ddl_client.cpp b/src/client/replication_ddl_client.cpp index 86c2c2d337..de4ad750b3 100644 --- a/src/client/replication_ddl_client.cpp +++ b/src/client/replication_ddl_client.cpp @@ -519,10 +519,10 @@ dsn::error_code replication_ddl_client::list_nodes( return resp.err; } - for (const auto &n : resp.infos) { - host_port hp; - GET_HOST_PORT(n, node, hp); - nodes[hp] = n.status; + for (const auto &ni : resp.infos) { + host_port node; + GET_HOST_PORT(ni, node1, node); + nodes[node] = ni.status; } return dsn::ERR_OK; @@ -928,22 +928,23 @@ dsn::error_code replication_ddl_client::do_recovery(const std::vector std::ostream out(buf); auto req = std::make_shared(); - CLEAR_IP_AND_HOST_PORT(*req, recovery_nodes); + CLEAR_IP_AND_HOST_PORT(*req, recovery_nodes1); for (const auto &node : replica_nodes) { - if 
(utils::contains(req->hp_recovery_nodes, node)) { - out << "duplicate replica node " << node << ", just ingore it" << std::endl; + if (utils::contains(req->hp_recovery_nodes1, node)) { + out << "duplicate replica node " << node << ", just ignore it" << std::endl; } else { - ADD_IP_AND_HOST_PORT_BY_DNS(*req, recovery_nodes, node); + ADD_IP_AND_HOST_PORT_BY_DNS(*req, recovery_nodes1, node); } } - if (req->hp_recovery_nodes.empty()) { - CHECK(req->recovery_nodes.empty(), - "recovery_nodes should be set together with hp_recovery_nodes"); + if (req->hp_recovery_nodes1.empty()) { + DCHECK(req->recovery_nodes1.empty(), + "recovery_nodes should be set together with hp_recovery_nodes"); out << "node set for recovery it empty" << std::endl; return ERR_INVALID_PARAMETERS; } - CHECK(!req->recovery_nodes.empty(), - "recovery_nodes should be set together with hp_recovery_nodes"); + DCHECK(!req->hp_recovery_nodes1.empty(), ""); + DCHECK(!req->recovery_nodes1.empty(), + "recovery_nodes should be set together with hp_recovery_nodes"); req->skip_bad_nodes = skip_bad_nodes; req->skip_lost_partitions = skip_lost_partitions; @@ -952,7 +953,7 @@ dsn::error_code replication_ddl_client::do_recovery(const std::vector out << "Skip lost partitions: " << (skip_lost_partitions ? 
"true" : "false") << std::endl; out << "Node list:" << std::endl; out << "=============================" << std::endl; - for (auto &node : req->hp_recovery_nodes) { + for (auto &node : req->hp_recovery_nodes1) { out << node << std::endl; } out << "=============================" << std::endl; diff --git a/src/meta/load_balance_policy.cpp b/src/meta/load_balance_policy.cpp index ff1529dd03..40ce063697 100644 --- a/src/meta/load_balance_policy.cpp +++ b/src/meta/load_balance_policy.cpp @@ -53,8 +53,8 @@ configuration_proposal_action new_proposal_action(const host_port &target, const host_port &node, config_type::type type) { configuration_proposal_action act; - SET_IP_AND_HOST_PORT_BY_DNS(act, target, target); - SET_IP_AND_HOST_PORT_BY_DNS(act, node, node); + SET_IP_AND_HOST_PORT_BY_DNS(act, target1, target); + SET_IP_AND_HOST_PORT_BY_DNS(act, node1, node); act.__set_type(type); return act; } diff --git a/src/meta/meta_data.cpp b/src/meta/meta_data.cpp index 3c9d8be9e4..9e54ef788b 100644 --- a/src/meta/meta_data.cpp +++ b/src/meta/meta_data.cpp @@ -189,14 +189,15 @@ void proposal_actions::reset_tracked_current_learner() current_learner.last_prepared_decree = invalid_decree; } -void proposal_actions::track_current_learner(const dsn::host_port &node, const replica_info &info) +void proposal_actions::track_current_learner(const dsn::host_port &learner, const replica_info &info) { if (empty()) { return; } const auto &act = acts.front(); - CHECK(act.hp_node, ""); - if (act.hp_node != node) { + host_port first_act_node; + GET_HOST_PORT(act, node1, first_act_node); + if (first_act_node != learner) { return; } @@ -216,7 +217,7 @@ void proposal_actions::track_current_learner(const dsn::host_port &node, const r learning_progress_abnormal_detected = true; } else { LOG_DEBUG( - "{}: ignore abnormal status of {}, perhaps learn not start", info.pid, node); + "{}: ignore abnormal status of {}, perhaps learn not start", info.pid, learner); } } else if (info.status == 
partition_status::PS_POTENTIAL_SECONDARY) { if (current_learner.ballot > info.ballot || @@ -226,7 +227,7 @@ void proposal_actions::track_current_learner(const dsn::host_port &node, const r // TODO: need to add a metric here. LOG_WARNING("{}: learner({})'s progress step back, please trace this carefully", info.pid, - node); + learner); } // NOTICE: the flag may be abormal currently. it's balancer's duty to make use of the diff --git a/src/meta/meta_data.h b/src/meta/meta_data.h index 23df944347..214d1ff38b 100644 --- a/src/meta/meta_data.h +++ b/src/meta/meta_data.h @@ -115,7 +115,7 @@ class proposal_actions public: proposal_actions(); void reset_tracked_current_learner(); - void track_current_learner(const host_port &node, const replica_info &info); + void track_current_learner(const host_port &learner, const replica_info &info); void clear(); // return the action in acts & whether the action is from balancer diff --git a/src/meta/meta_service.cpp b/src/meta/meta_service.cpp index 1de9069bb9..0a9608e031 100644 --- a/src/meta/meta_service.cpp +++ b/src/meta/meta_service.cpp @@ -692,16 +692,16 @@ void meta_service::on_list_nodes(configuration_list_nodes_rpc rpc) dsn::replication::node_info info; if (request.status == node_status::NS_INVALID || request.status == node_status::NS_ALIVE) { info.status = node_status::NS_ALIVE; - for (auto &node : _alive_set) { - SET_IP_AND_HOST_PORT_BY_DNS(info, node, node); + for (const auto &node : _alive_set) { + SET_IP_AND_HOST_PORT_BY_DNS(info, node1, node); response.infos.push_back(info); } } if (request.status == node_status::NS_INVALID || request.status == node_status::NS_UNALIVE) { info.status = node_status::NS_UNALIVE; - for (auto &node : _dead_set) { - SET_IP_AND_HOST_PORT_BY_DNS(info, node, node); + for (const auto &node : _dead_set) { + SET_IP_AND_HOST_PORT_BY_DNS(info, node1, node); response.infos.push_back(info); } } diff --git a/src/meta/meta_split_service.cpp b/src/meta/meta_split_service.cpp index 2b61245653..b2fe2e99bc 
100644 --- a/src/meta/meta_split_service.cpp +++ b/src/meta/meta_split_service.cpp @@ -305,7 +305,7 @@ void meta_split_service::on_add_child_on_remote_storage_reply(error_code ec, update_child_request->config = request.child_config; update_child_request->info = *app; update_child_request->type = config_type::CT_REGISTER_CHILD; - SET_OBJ_IP_AND_HOST_PORT(*update_child_request, node, request, primary); + SET_OBJ_IP_AND_HOST_PORT(*update_child_request, node1, request, primary); // TODO(yingchun): should use conference? auto child_pc = app->pcs[child_gpid.get_partition_index()]; diff --git a/src/meta/partition_guardian.cpp b/src/meta/partition_guardian.cpp index 5ba4593ceb..38e9df95a5 100644 --- a/src/meta/partition_guardian.cpp +++ b/src/meta/partition_guardian.cpp @@ -136,16 +136,16 @@ void partition_guardian::reconfig(meta_view view, const configuration_update_req } } else { when_update_replicas(request.type, [cc, &request](bool is_adding) { - host_port hp; - GET_HOST_PORT(request, node, hp); + host_port node; + GET_HOST_PORT(request, node1, node); if (is_adding) { - cc->remove_from_dropped(hp); + cc->remove_from_dropped(node); // when some replicas are added to partition_config // we should try to adjust the size of drop_list cc->check_size(); } else { - cc->remove_from_serving(hp); - CHECK(cc->record_drop_history(hp), "node({}) has been in the dropped", hp); + cc->remove_from_serving(node); + CHECK(cc->record_drop_history(node), "node({}) has been in the dropped", node); } }); } @@ -166,7 +166,7 @@ bool partition_guardian::from_proposals(meta_view &view, action = *(cc.lb_actions.front()); host_port target; host_port node; - GET_HOST_PORT(action, target, target); + GET_HOST_PORT(action, target1, target); host_port primary; GET_HOST_PORT(pc, primary, primary); std::string reason; @@ -178,7 +178,7 @@ bool partition_guardian::from_proposals(meta_view &view, reason = fmt::format("action target({}) is not alive", target); goto invalid_action; } - GET_HOST_PORT(action, 
node, node); + GET_HOST_PORT(action, node1, node); if (!node) { reason = "action node is invalid"; goto invalid_action; @@ -258,7 +258,7 @@ pc_status partition_guardian::on_missing_primary(meta_view &view, const dsn::gpi std::vector last_drops; GET_HOST_PORTS(pc, last_drops, last_drops); if (!secondaries.empty()) { - RESET_IP_AND_HOST_PORT(action, node); + RESET_IP_AND_HOST_PORT(action, node1); for (const auto &secondary : secondaries) { const auto ns = get_node_state(*(view.nodes), secondary, false); CHECK_NOTNULL(ns, "invalid secondary: {}", secondary); @@ -268,16 +268,16 @@ pc_status partition_guardian::on_missing_primary(meta_view &view, const dsn::gpi // find a node with minimal primaries host_port node; - GET_HOST_PORT(action, node, node); + GET_HOST_PORT(action, node1, node); auto *np = newly_partitions_ext::get_inited(ns); if (!node || np->less_primaries(*get_newly_partitions(*(view.nodes), node), gpid.get_app_id())) { - SET_IP_AND_HOST_PORT_BY_DNS(action, node, ns->host_port()); + SET_IP_AND_HOST_PORT_BY_DNS(action, node1, ns->host_port()); } } host_port node; - GET_HOST_PORT(action, node, node); + GET_HOST_PORT(action, node1, node); if (!node) { LOG_ERROR( "all nodes for gpid({}) are dead, waiting for some secondary to come back....", @@ -288,7 +288,7 @@ pc_status partition_guardian::on_missing_primary(meta_view &view, const dsn::gpi newly_partitions *np = get_newly_partitions(*(view.nodes), node); np->newly_add_primary(gpid.get_app_id(), true); - SET_OBJ_IP_AND_HOST_PORT(action, target, action, node); + SET_OBJ_IP_AND_HOST_PORT(action, target1, action, node1); result = pc_status::ill; } } @@ -312,8 +312,8 @@ pc_status partition_guardian::on_missing_primary(meta_view &view, const dsn::gpi } if (min_primary_server_np != nullptr) { - SET_IP_AND_HOST_PORT_BY_DNS(action, node, min_primary_server); - SET_OBJ_IP_AND_HOST_PORT(action, target, action, node); + SET_IP_AND_HOST_PORT_BY_DNS(action, node1, min_primary_server); + SET_OBJ_IP_AND_HOST_PORT(action, 
target1, action, node1); action.type = config_type::CT_ASSIGN_PRIMARY; min_primary_server_np->newly_add_primary(gpid.get_app_id(), false); } @@ -330,7 +330,7 @@ pc_status partition_guardian::on_missing_primary(meta_view &view, const dsn::gpi // so the last removed replica can't act as primary directly. std::string reason; config_context &cc = *get_config_context(*view.apps, gpid); - RESET_IP_AND_HOST_PORT(action, node); + RESET_IP_AND_HOST_PORT(action, node1); for (int i = 0; i < cc.dropped.size(); ++i) { const dropped_replica &dr = cc.dropped[i]; char time_buf[30] = {0}; @@ -367,7 +367,7 @@ pc_status partition_guardian::on_missing_primary(meta_view &view, const dsn::gpi LOG_WARNING("{}: the only node({}) is dead, waiting it to come back", gpid_name, FMT_HOST_PORT_AND_IP(pc, last_drops.back())); - SET_OBJ_IP_AND_HOST_PORT(action, node, pc, last_drops.back()); + SET_OBJ_IP_AND_HOST_PORT(action, node1, pc, last_drops.back()); } else { std::vector nodes(last_drops.end() - 2, last_drops.end()); std::vector collected_info(2); @@ -446,10 +446,10 @@ pc_status partition_guardian::on_missing_primary(meta_view &view, const dsn::gpi ? previous_dead.node : recent_dead.node; } - SET_IP_AND_HOST_PORT_BY_DNS(action, node, hp); + SET_IP_AND_HOST_PORT_BY_DNS(action, node1, hp); LOG_INFO("{}: select {} as a new primary", gpid_name, - FMT_HOST_PORT_AND_IP(action, node)); + FMT_HOST_PORT_AND_IP(action, node1)); } else { char buf[1000]; sprintf(buf, @@ -470,12 +470,12 @@ pc_status partition_guardian::on_missing_primary(meta_view &view, const dsn::gpi } // Use the action.hp_node after being updated. 
- if (action.hp_node) { - CHECK(action.node, ""); - SET_OBJ_IP_AND_HOST_PORT(action, target, action, node); + if (action.hp_node1) { + CHECK(action.node1, ""); + SET_OBJ_IP_AND_HOST_PORT(action, target1, action, node1); action.type = config_type::CT_ASSIGN_PRIMARY; - get_newly_partitions(*view.nodes, action.hp_node) + get_newly_partitions(*view.nodes, action.hp_node1) ->newly_add_primary(gpid.get_app_id(), false); } else { LOG_WARNING("{}: don't select any node for security reason, administrator can select " @@ -489,7 +489,7 @@ pc_status partition_guardian::on_missing_primary(meta_view &view, const dsn::gpi for (int i = 0; i < cc.dropped.size(); ++i) { const dropped_replica &dr = cc.dropped[i]; ddd_node_info ninfo; - SET_IP_AND_HOST_PORT_BY_DNS(ninfo, node, dr.node); + SET_IP_AND_HOST_PORT_BY_DNS(ninfo, node1, dr.node); ninfo.drop_time_ms = dr.time; ninfo.ballot = invalid_ballot; ninfo.last_committed_decree = invalid_decree; @@ -555,7 +555,7 @@ pc_status partition_guardian::on_missing_secondary(meta_view &view, const dsn::g cc.dropped.back().node); is_emergency = true; } - RESET_IP_AND_HOST_PORT(action, node); + RESET_IP_AND_HOST_PORT(action, node1); if (is_emergency) { std::ostringstream oss; @@ -589,7 +589,7 @@ pc_status partition_guardian::on_missing_secondary(meta_view &view, const dsn::g cc.prefered_dropped, cc.prefered_dropped, cc.prefered_dropped - 1); - SET_IP_AND_HOST_PORT_BY_DNS(action, node, server.node); + SET_IP_AND_HOST_PORT_BY_DNS(action, node1, server.node); cc.prefered_dropped--; break; } else { @@ -605,7 +605,7 @@ pc_status partition_guardian::on_missing_secondary(meta_view &view, const dsn::g } host_port node; - GET_HOST_PORT(action, node, node); + GET_HOST_PORT(action, node1, node); if (!node || in_black_list(node)) { if (node) { LOG_INFO( @@ -619,17 +619,18 @@ pc_status partition_guardian::on_missing_secondary(meta_view &view, const dsn::g newly_partitions *np = newly_partitions_ext::get_inited(&ns); if (min_server_np == nullptr || 
np->less_partitions(*min_server_np, gpid.get_app_id())) { - SET_IP_AND_HOST_PORT_BY_DNS(action, node, ns.host_port()); + SET_IP_AND_HOST_PORT_BY_DNS(action, node1, ns.host_port()); min_server_np = np; } } // Use the action.hp_node after being updated. - if (action.hp_node) { + DCHECK(action.__isset.node1, ""); + if (action.hp_node1) { LOG_INFO("gpid({}): can't find valid node in dropped list to add as secondary, " "choose new node({}) with minimal partitions serving", gpid, - action.hp_node); + action.hp_node1); } else { LOG_INFO("gpid({}): can't find valid node in dropped list to add as secondary, " "but also we can't find a new node to add as secondary", gpid); @@ -641,11 +642,12 @@ pc_status partition_guardian::on_missing_secondary(meta_view &view, const dsn::g const dropped_replica &server = cc.dropped.back(); if (is_node_alive(*view.nodes, server.node)) { CHECK(server.node, "invalid server address, address = {}", server.node); - SET_IP_AND_HOST_PORT_BY_DNS(action, node, server.node); + SET_IP_AND_HOST_PORT_BY_DNS(action, node1, server.node); } // Use the action.hp_node after being updated. - if (action.hp_node) { + DCHECK(action.__isset.node1, ""); + if (action.hp_node1) { LOG_INFO("gpid({}): choose node({}) as secondary coz it is last_dropped_node and is " "alive now", gpid, @@ -659,11 +661,12 @@ pc_status partition_guardian::on_missing_secondary(meta_view &view, const dsn::g } // Use the action.hp_node after being updated. 
- if (action.hp_node) { + DCHECK(action.__isset.node1, ""); + if (action.hp_node1) { action.type = config_type::CT_ADD_SECONDARY; - SET_OBJ_IP_AND_HOST_PORT(action, target, pc, primary); + SET_OBJ_IP_AND_HOST_PORT(action, target1, pc, primary); - newly_partitions *np = get_newly_partitions(*(view.nodes), action.hp_node); + newly_partitions *np = get_newly_partitions(*(view.nodes), action.hp_node1); CHECK_NOTNULL(np, ""); np->newly_add_partition(gpid.get_app_id()); @@ -691,8 +694,8 @@ pc_status partition_guardian::on_redundant_secondary(meta_view &view, const dsn: configuration_proposal_action action; action.type = config_type::CT_REMOVE; - SET_OBJ_IP_AND_HOST_PORT(action, node, pc, secondaries[target]); - SET_OBJ_IP_AND_HOST_PORT(action, target, pc, primary); + SET_OBJ_IP_AND_HOST_PORT(action, node1, pc, secondaries[target]); + SET_OBJ_IP_AND_HOST_PORT(action, target1, pc, primary); // TODO: treat remove as cure proposals too get_config_context(*view.apps, gpid)->lb_actions.assign_balancer_proposals({action}); @@ -704,7 +707,7 @@ void partition_guardian::finish_cure_proposal(meta_view &view, const configuration_proposal_action &act) { host_port target; - GET_HOST_PORT(act, node, target); + GET_HOST_PORT(act, node1, target); newly_partitions *np = get_newly_partitions(*(view.nodes), target); if (np == nullptr) { LOG_INFO("can't get the newly_partitions extension structure for node({}), " diff --git a/src/meta/server_load_balancer.cpp b/src/meta/server_load_balancer.cpp index a3016634a4..ae5b533aa2 100644 --- a/src/meta/server_load_balancer.cpp +++ b/src/meta/server_load_balancer.cpp @@ -175,7 +175,9 @@ void server_load_balancer::register_proposals(meta_view view, // to send the proposal to. // for these proposals, they should keep the target empty and // the meta-server will fill primary as target. 
- if (act.target) { + host_port target; + GET_HOST_PORT(act, target1, target); + if (target) { continue; } @@ -186,7 +188,7 @@ void server_load_balancer::register_proposals(meta_view view, return; } - SET_OBJ_IP_AND_HOST_PORT(act, target, pc, primary); + SET_OBJ_IP_AND_HOST_PORT(act, target1, pc, primary); } resp.err = ERR_OK; diff --git a/src/meta/server_state.cpp b/src/meta/server_state.cpp index 650eb6b8df..dc552c328c 100644 --- a/src/meta/server_state.cpp +++ b/src/meta/server_state.cpp @@ -805,19 +805,19 @@ void server_state::on_config_sync(configuration_query_by_node_rpc rpc) bool reject_this_request = false; response.__isset.gc_replicas = false; - host_port node; - GET_HOST_PORT(request, node, node); + host_port req_node; + GET_HOST_PORT(request, node1, req_node); LOG_INFO("got config sync request from {}, stored_replicas_count({})", - node, + req_node, request.stored_replicas.size()); { zauto_read_lock l(_lock); // sync the partitions to the replica server - node_state *ns = get_node_state(_nodes, node, false); + node_state *ns = get_node_state(_nodes, req_node, false); if (ns == nullptr) { - LOG_INFO("node({}) not found in meta server", node); + LOG_INFO("node({}) not found in meta server", req_node); response.err = ERR_OBJECT_NOT_FOUND; } else { response.err = ERR_OK; @@ -841,15 +841,15 @@ void server_state::on_config_sync(configuration_query_by_node_rpc rpc) } host_port target; - GET_HOST_PORT(*pending_request, node, target); - if (target == node) { + GET_HOST_PORT(*pending_request, node1, target); + if (target == req_node) { return false; } } response.partitions[i].info = *app; response.partitions[i].config = app->pcs[pid.get_partition_index()]; - response.partitions[i].host_node = request.node; + response.partitions[i].host_node1 = request.node1; // set meta_split_status const split_state &app_split_states = app->helpers->split_states; if (app->splitting()) { @@ -878,7 +878,7 @@ void server_state::on_config_sync(configuration_query_by_node_rpc rpc) // 
the app is deleted but not expired, we need to ignore it // if the app is deleted and expired, we need to gc it for (const replica_info &rep : replicas) { - LOG_DEBUG("receive stored replica from {}, pid({})", node, rep.pid); + LOG_DEBUG("receive stored replica from {}, pid({})", req_node, rep.pid); std::shared_ptr app = get_app(rep.pid.get_app_id()); if (app == nullptr || rep.pid.get_partition_index() >= app->partition_count) { // This app has garbage partition after cancel split, the canceled child @@ -890,7 +890,7 @@ void server_state::on_config_sync(configuration_query_by_node_rpc rpc) LOG_WARNING( "notify node({}) to gc replica({}) because it is useless partition " "which is caused by cancel split", - node, + req_node, rep.pid); } else { // app is not recognized or partition is not recognized @@ -898,14 +898,14 @@ void server_state::on_config_sync(configuration_query_by_node_rpc rpc) "gpid({}) on node({}) is not exist on meta server, administrator " "should check consistency of meta data", rep.pid, - node); + req_node); } } else if (app->status == app_status::AS_DROPPED) { if (app->expire_second == 0) { LOG_INFO("gpid({}) on node({}) is of dropped table, but expire second is " "not specified, do not delete it for safety reason", rep.pid, - node); + req_node); } else if (has_seconds_expired(app->expire_second)) { // can delete replica only when expire second is explicitely specified and // expired. 
@@ -914,29 +914,29 @@ void server_state::on_config_sync(configuration_query_by_node_rpc rpc) "current function level is {}, do not delete it for safety " "reason", rep.pid, - node, + req_node, _meta_function_level_VALUES_TO_NAMES.find(level)->second); } else { response.gc_replicas.push_back(rep); LOG_WARNING("notify node({}) to gc replica({}) coz the app is " "dropped and expired", - node, + req_node, rep.pid); } } } else if (app->status == app_status::AS_AVAILABLE) { - bool is_useful_replica = collect_replica({&_all_apps, &_nodes}, node, rep); + bool is_useful_replica = collect_replica({&_all_apps, &_nodes}, req_node, rep); if (!is_useful_replica) { if (level <= meta_function_level::fl_steady) { LOG_INFO("gpid({}) on node({}) is useless, but current function " "level is {}, do not delete it for safety reason", rep.pid, - node, + req_node, _meta_function_level_VALUES_TO_NAMES.find(level)->second); } else { response.gc_replicas.push_back(rep); LOG_WARNING("notify node({}) to gc replica({}) coz it is useless", - node, + req_node, rep.pid); } } @@ -955,7 +955,7 @@ void server_state::on_config_sync(configuration_query_by_node_rpc rpc) } LOG_INFO("send config sync response to {}, err({}), partitions_count({}), " "gc_replicas_count({})", - node, + req_node, response.err, response.partitions.size(), response.gc_replicas.size()); @@ -1442,7 +1442,7 @@ void server_state::send_proposal(const host_port &target, proposal.config.pid, proposal.config.ballot, target, - FMT_HOST_PORT_AND_IP(proposal, node)); + FMT_HOST_PORT_AND_IP(proposal, node1)); dsn::message_ex *msg = dsn::message_ex::create_request(RPC_CONFIG_PROPOSAL, 0, proposal.config.pid.thread_hash()); dsn::marshall(msg, proposal); @@ -1456,10 +1456,10 @@ void server_state::send_proposal(const configuration_proposal_action &action, configuration_update_request request; request.info = app; request.type = action.type; - SET_OBJ_IP_AND_HOST_PORT(request, node, action, node); + SET_OBJ_IP_AND_HOST_PORT(request, node1, 
action, node1); request.config = pc; host_port target; - GET_HOST_PORT(action, target, target); + GET_HOST_PORT(action, target1, target); send_proposal(target, request); } @@ -1470,7 +1470,7 @@ void server_state::request_check(const partition_configuration &old_pc, host_port old_primary; GET_HOST_PORT(old_pc, primary, old_primary); host_port req_node; - GET_HOST_PORT(request, node, req_node); + GET_HOST_PORT(request, node1, req_node); std::vector old_secondaries; GET_HOST_PORTS(old_pc, secondaries, old_secondaries); switch (request.type) { @@ -1521,7 +1521,7 @@ void server_state::update_configuration_locally( health_status new_health_status = partition_health_status(new_pc, min_2pc_count); host_port node; - GET_HOST_PORT(*config_request, node, node); + GET_HOST_PORT(*config_request, node1, node); if (app.is_stateful) { CHECK(old_pc.ballot == invalid_ballot || old_pc.ballot + 1 == new_pc.ballot, @@ -1605,9 +1605,9 @@ void server_state::update_configuration_locally( } } else { CHECK_EQ(old_pc.ballot, new_pc.ballot); - const auto host_node = host_port::from_address(config_request->host_node); + const auto host_node = host_port::from_address(config_request->host_node1); // The non-stateful app is just for testing, so just check the host_node is resolvable. 
- CHECK(host_node, "'{}' can not be reverse resolved", config_request->host_node); + CHECK(host_node, "'{}' can not be reverse resolved", config_request->host_node1); new_pc = old_pc; partition_configuration_stateless pcs(new_pc); if (config_request->type == config_type::type::CT_ADD_SECONDARY) { @@ -1746,10 +1746,10 @@ void server_state::on_update_configuration_on_remote_reply( // ignore adding secondary if add_secondary_enable_flow_control = true } else { config_request->type = action.type; - SET_OBJ_IP_AND_HOST_PORT(*config_request, node, action, node); + SET_OBJ_IP_AND_HOST_PORT(*config_request, node1, action, node1); config_request->info = *app; host_port target; - GET_HOST_PORT(action, target, target); + GET_HOST_PORT(action, target1, target); send_proposal(target, *config_request); } } @@ -1798,7 +1798,7 @@ void server_state::drop_partition(std::shared_ptr &app, int pidx) request.info = *app; request.type = config_type::CT_DROP_PARTITION; - SET_OBJ_IP_AND_HOST_PORT(request, node, pc, primary); + SET_OBJ_IP_AND_HOST_PORT(request, node1, pc, primary); request.config = pc; for (const auto &secondary : pc.hp_secondaries) { @@ -1870,7 +1870,7 @@ void server_state::downgrade_primary_to_inactive(std::shared_ptr &app request.info = *app; request.config = pc; request.type = config_type::CT_DOWNGRADE_TO_INACTIVE; - SET_OBJ_IP_AND_HOST_PORT(request, node, pc, primary); + SET_OBJ_IP_AND_HOST_PORT(request, node1, pc, primary); request.config.ballot++; RESET_IP_AND_HOST_PORT(request.config, primary); maintain_drops(request.config.hp_last_drops, pc.hp_primary, request.type); @@ -1887,8 +1887,8 @@ void server_state::downgrade_secondary_to_inactive(std::shared_ptr &a int pidx, const host_port &node) { - partition_configuration &pc = app->pcs[pidx]; - config_context &cc = app->helpers->contexts[pidx]; + const partition_configuration &pc = app->pcs[pidx]; + const config_context &cc = app->helpers->contexts[pidx]; host_port primary; GET_HOST_PORT(pc, primary, primary); @@ 
-1898,12 +1898,10 @@ void server_state::downgrade_secondary_to_inactive(std::shared_ptr &a request.info = *app; request.config = pc; request.type = config_type::CT_DOWNGRADE_TO_INACTIVE; - SET_IP_AND_HOST_PORT_BY_DNS(request, node, node); - host_port primary; - GET_HOST_PORT(pc, primary, primary); + SET_IP_AND_HOST_PORT_BY_DNS(request, node1, node); send_proposal(primary, request); } else { - LOG_INFO("gpid({}.{}) is syncing with remote storage, ignore the remove seconary({})", + LOG_INFO("gpid({}.{}) is syncing with remote storage, ignore the remove secondary({})", app->app_id, pidx, node); @@ -1917,8 +1915,8 @@ void server_state::downgrade_stateless_nodes(std::shared_ptr &app, auto req = std::make_shared(); req->info = *app; req->type = config_type::CT_REMOVE; - req->host_node = dsn::dns_resolver::instance().resolve_address(node); - RESET_IP_AND_HOST_PORT(*req, node); + req->host_node1 = dsn::dns_resolver::instance().resolve_address(node); + RESET_IP_AND_HOST_PORT(*req, node1); req->config = app->pcs[pidx]; config_context &cc = app->helpers->contexts[pidx]; @@ -1929,12 +1927,12 @@ void server_state::downgrade_stateless_nodes(std::shared_ptr &app, GET_HOST_PORTS(pc, secondaries, secondaries); for (; i < secondaries.size(); ++i) { if (secondaries[i] == node) { - SET_OBJ_IP_AND_HOST_PORT(*req, node, pc, last_drops[i]); + SET_OBJ_IP_AND_HOST_PORT(*req, node1, pc, last_drops[i]); break; } } host_port req_node; - GET_HOST_PORT(*req, node, req_node); + GET_HOST_PORT(*req, node1, req_node); CHECK(req_node, "invalid node: {}", req_node); // remove host_node & node from secondaries/last_drops, as it will be sync to remote // storage @@ -1954,7 +1952,7 @@ void server_state::downgrade_stateless_nodes(std::shared_ptr &app, LOG_WARNING("gpid({}) is syncing another request with remote, cancel it due to meta is " "removing host({}) worker({})", pc.pid, - req->host_node, + req->host_node1, req_node); cc.cancel_sync(); } @@ -2015,8 +2013,8 @@ void 
server_state::on_update_configuration( msg->release_ref(); return; } else { - maintain_drops(cfg_request->config.hp_last_drops, cfg_request->hp_node, cfg_request->type); - maintain_drops(cfg_request->config.last_drops, cfg_request->node, cfg_request->type); + maintain_drops(cfg_request->config.hp_last_drops, cfg_request->hp_node1, cfg_request->type); + maintain_drops(cfg_request->config.last_drops, cfg_request->node1, cfg_request->type); } if (response.err != ERR_IO_PENDING) { @@ -2323,7 +2321,7 @@ server_state::sync_apps_from_replica_nodes(const std::vector &re auto app_query_req = std::make_unique(); SET_IP_AND_HOST_PORT( - *app_query_req, meta_server, dsn_primary_address(), dsn_primary_host_port()); + *app_query_req, meta_server1, dsn_primary_address(), dsn_primary_host_port()); query_app_info_rpc app_rpc(std::move(app_query_req), RPC_QUERY_APP_INFO); const auto addr = dsn::dns_resolver::instance().resolve_address(replica_nodes[i]); app_rpc.call(addr, @@ -2432,13 +2430,14 @@ server_state::sync_apps_from_replica_nodes(const std::vector &re void server_state::on_start_recovery(const configuration_recovery_request &req, configuration_recovery_response &resp) { + std::vector recovery_nodes; + GET_HOST_PORTS(req, recovery_nodes1, recovery_nodes); + LOG_INFO("start recovery, node_count = {}, skip_bad_nodes = {}, skip_lost_partitions = {}", - req.recovery_nodes.size(), + recovery_nodes.size(), req.skip_bad_nodes ? "true" : "false", req.skip_lost_partitions ? 
"true" : "false"); - std::vector recovery_nodes; - GET_HOST_PORTS(req, recovery_nodes, recovery_nodes); resp.err = sync_apps_from_replica_nodes( recovery_nodes, req.skip_bad_nodes, req.skip_lost_partitions, resp.hint_message); @@ -2595,8 +2594,8 @@ bool server_state::check_all_partitions() GET_HOST_PORTS(*pc, secondaries, secondaries); if (!add_secondary_proposed[i] && secondaries.empty()) { const auto &action = add_secondary_actions[i]; - CHECK(action.hp_node, ""); - if (_add_secondary_enable_flow_control && add_secondary_running_nodes[action.hp_node] >= + CHECK(action.hp_node1, ""); + if (_add_secondary_enable_flow_control && add_secondary_running_nodes[action.hp_node1] >= _add_secondary_max_count_for_one_node) { // ignore continue; @@ -2605,7 +2604,7 @@ bool server_state::check_all_partitions() send_proposal(action, *pc, *app); send_proposal_count++; add_secondary_proposed[i] = true; - add_secondary_running_nodes[action.hp_node]++; + add_secondary_running_nodes[action.hp_node1]++; } } @@ -2613,24 +2612,24 @@ bool server_state::check_all_partitions() for (int i = 0; i < add_secondary_actions.size(); ++i) { if (!add_secondary_proposed[i]) { const auto &action = add_secondary_actions[i]; - CHECK(action.hp_node, ""); + CHECK(action.hp_node1, ""); gpid pid = add_secondary_gpids[i]; const auto *pc = get_config(_all_apps, pid); - if (_add_secondary_enable_flow_control && add_secondary_running_nodes[action.hp_node] >= + if (_add_secondary_enable_flow_control && add_secondary_running_nodes[action.hp_node1] >= _add_secondary_max_count_for_one_node) { LOG_INFO("do not send {} proposal for gpid({}) for flow control reason, target = " "{}, node = {}", ::dsn::enum_to_string(action.type), pc->pid, - FMT_HOST_PORT_AND_IP(action, target), - FMT_HOST_PORT_AND_IP(action, node)); + FMT_HOST_PORT_AND_IP(action, target1), + FMT_HOST_PORT_AND_IP(action, node1)); continue; } std::shared_ptr app = get_app(pid.get_app_id()); send_proposal(action, *pc, *app); send_proposal_count++; 
add_secondary_proposed[i] = true; - add_secondary_running_nodes[action.hp_node]++; + add_secondary_running_nodes[action.hp_node1]++; } } diff --git a/src/meta/test/meta_split_service_test.cpp b/src/meta/test/meta_split_service_test.cpp index 5eb3940a7b..2d35db0f78 100644 --- a/src/meta/test/meta_split_service_test.cpp +++ b/src/meta/test/meta_split_service_test.cpp @@ -506,7 +506,7 @@ TEST_F(meta_split_service_test, on_config_sync_test) info1.pid = pid1; info2.pid = pid2; configuration_query_by_node_request req; - SET_IP_AND_HOST_PORT_BY_DNS(req, node, NODE); + SET_IP_AND_HOST_PORT_BY_DNS(req, node1, NODE); req.__isset.stored_replicas = true; req.stored_replicas.emplace_back(info1); req.stored_replicas.emplace_back(info2); diff --git a/src/replica/replica_config.cpp b/src/replica/replica_config.cpp index 49ad94c2cb..a0b1ae82c9 100644 --- a/src/replica/replica_config.cpp +++ b/src/replica/replica_config.cpp @@ -151,7 +151,7 @@ void replica::on_config_proposal(configuration_update_request &proposal) void replica::assign_primary(configuration_update_request &proposal) { host_port node; - GET_HOST_PORT(proposal, node, node); + GET_HOST_PORT(proposal, node1, node); CHECK_EQ(node, _stub->primary_host_port()); if (status() == partition_status::PS_PRIMARY) { @@ -204,7 +204,7 @@ void replica::add_potential_secondary(const configuration_update_request &propos CHECK(_primary_states.pc.__isset.hp_secondaries, ""); CHECK(secondaries == _primary_states.pc.hp_secondaries, ""); host_port node; - GET_HOST_PORT(proposal, node, node); + GET_HOST_PORT(proposal, node1, node); CHECK(!_primary_states.check_exist(node, partition_status::PS_PRIMARY), "node = {}", node); CHECK(!_primary_states.check_exist(node, partition_status::PS_SECONDARY), "node = {}", node); @@ -246,7 +246,7 @@ void replica::add_potential_secondary(const configuration_update_request &propos group_check_request request; request.app = _app_info; - SET_OBJ_IP_AND_HOST_PORT(request, node, proposal, node); + 
SET_OBJ_IP_AND_HOST_PORT(request, node1, proposal, node1); _primary_states.get_replica_config( partition_status::PS_POTENTIAL_SECONDARY, request.config, state.signature); request.last_committed_decree = last_committed_decree(); diff --git a/src/replica/replica_failover.cpp b/src/replica/replica_failover.cpp index e563336f46..07065a5e22 100644 --- a/src/replica/replica_failover.cpp +++ b/src/replica/replica_failover.cpp @@ -86,7 +86,7 @@ void replica::handle_remote_failure(partition_status::type st, enum_to_string(st)); { configuration_update_request request; - SET_IP_AND_HOST_PORT_BY_DNS(request, node, node); + SET_IP_AND_HOST_PORT_BY_DNS(request, node1, node); request.type = config_type::CT_DOWNGRADE_TO_INACTIVE; request.config = _primary_states.pc; downgrade_to_inactive_on_primary(request); diff --git a/src/replica/replica_stub.cpp b/src/replica/replica_stub.cpp index 02b75bf916..73aee61793 100644 --- a/src/replica/replica_stub.cpp +++ b/src/replica/replica_stub.cpp @@ -845,7 +845,7 @@ void replica_stub::on_config_proposal(const configuration_update_request &propos proposal.config.pid, _primary_host_port_cache, enum_to_string(proposal.type), - FMT_HOST_PORT_AND_IP(proposal, node)); + FMT_HOST_PORT_AND_IP(proposal, node1)); return; } @@ -853,7 +853,7 @@ void replica_stub::on_config_proposal(const configuration_update_request &propos proposal.config.pid, _primary_host_port_cache, enum_to_string(proposal.type), - FMT_HOST_PORT_AND_IP(proposal, node)); + FMT_HOST_PORT_AND_IP(proposal, node1)); // Normalize the partition_configuration type 'config' before using it. 
configuration_update_request normalized_proposal = proposal; @@ -991,7 +991,7 @@ void replica_stub::on_query_app_info(query_app_info_rpc rpc) const query_app_info_request &req = rpc.request(); query_app_info_response &resp = rpc.response(); - LOG_INFO("got query app info request from ({})", FMT_HOST_PORT_AND_IP(req, meta_server)); + LOG_INFO("got query app info request from ({})", FMT_HOST_PORT_AND_IP(req, meta_server1)); resp.err = dsn::ERR_OK; std::set visited_apps; { @@ -1271,7 +1271,7 @@ void replica_stub::query_configuration_by_node() dsn::message_ex *msg = dsn::message_ex::create_request(RPC_CM_CONFIG_SYNC); configuration_query_by_node_request req; - SET_IP_AND_HOST_PORT(req, node, primary_address(), _primary_host_port); + SET_IP_AND_HOST_PORT(req, node1, primary_address(), _primary_host_port); // TODO: send stored replicas may cost network, we shouldn't config the frequency get_local_replicas(req.stored_replicas); @@ -1479,7 +1479,7 @@ void replica_stub::remove_replica_on_meta_server(const app_info &info, request->info = info; request->config = pc; request->config.ballot++; - SET_IP_AND_HOST_PORT(*request, node, primary_address(), _primary_host_port); + SET_IP_AND_HOST_PORT(*request, node1, primary_address(), _primary_host_port); request->type = config_type::CT_DOWNGRADE_TO_INACTIVE; if (_primary_host_port == pc.hp_primary) {