From cbc4273722a41d7fdef968df43395ba16179bead Mon Sep 17 00:00:00 2001 From: Yingchun Lai Date: Mon, 24 Jun 2024 15:13:00 +0800 Subject: [PATCH] 3 --- src/meta/meta_bulk_load_service.cpp | 7 +- .../balancer_simulator/balancer_simulator.cpp | 7 +- src/replica/bulk_load/replica_bulk_loader.cpp | 69 +++++++++++-------- src/replica/duplication/replica_follower.cpp | 15 ++-- src/replica/duplication/replica_follower.h | 5 +- src/replica/replica_2pc.cpp | 18 ++--- src/replica/replica_backup.cpp | 9 ++- src/replica/split/replica_split_manager.cpp | 39 ++++++----- src/test/kill_test/kill_testor.cpp | 4 +- 9 files changed, 106 insertions(+), 67 deletions(-) diff --git a/src/meta/meta_bulk_load_service.cpp b/src/meta/meta_bulk_load_service.cpp index 00a04a5d9c..3e4e90f60e 100644 --- a/src/meta/meta_bulk_load_service.cpp +++ b/src/meta/meta_bulk_load_service.cpp @@ -1202,7 +1202,9 @@ bool bulk_load_service::check_ever_ingestion_succeed(const partition_configurati } std::vector current_nodes; - current_nodes.emplace_back(pc.hp_primary1); + dsn::host_port primary; + GET_HOST_PORT(pc, primary1, primary); + current_nodes.emplace_back(primary); std::vector secondaries; GET_HOST_PORTS(pc, secondaries1, secondaries); for (const auto &secondary : secondaries) { @@ -1275,12 +1277,11 @@ void bulk_load_service::partition_ingestion(const std::string &app_name, const g host_port primary; GET_HOST_PORT(pc, primary1, primary); - ballot meta_ballot = pc.ballot; tasking::enqueue( LPC_BULK_LOAD_INGESTION, _meta_svc->tracker(), std::bind( - &bulk_load_service::send_ingestion_request, this, app_name, pid, primary, meta_ballot), + &bulk_load_service::send_ingestion_request, this, app_name, pid, primary, pc.ballot), 0, std::chrono::seconds(bulk_load_constant::BULK_LOAD_REQUEST_INTERVAL)); } diff --git a/src/meta/test/balancer_simulator/balancer_simulator.cpp b/src/meta/test/balancer_simulator/balancer_simulator.cpp index 8f956ae7cc..31a7b0a4ab 100644 --- a/src/meta/test/balancer_simulator/balancer_simulator.cpp +++ b/src/meta/test/balancer_simulator/balancer_simulator.cpp @@ -112,10 +112,12 @@ void generate_balanced_apps(/*out*/ app_mapper &apps, for (auto &pc : app->pcs) { temp.clear(); - while (pc.hp_secondaries1.size() + 1 < pc.max_replica_count) { + std::vector secondaries; + GET_HOST_PORTS(pc, secondaries1, secondaries); + while (secondaries.size() + 1 < pc.max_replica_count) { const auto &n = pq2.pop(); if (!is_member(pc, n)) { - pc.hp_secondaries1.push_back(n); + secondaries.push_back(n); nodes[n].put_partition(pc.pid, false); } temp.push_back(n); @@ -154,6 +156,7 @@ void random_move_primary(app_mapper &apps, node_mapper &nodes, int primary_move_ for (auto &pc : app.pcs) { int n = random32(1, space_size) / 100; if (n < primary_move_ratio) { + CHECK(pc.hp_primary1, ""); int indice = random32(0, 1); nodes[pc.hp_primary1].remove_partition(pc.pid, true); std::swap(pc.primary1, pc.secondaries1[indice]); diff --git a/src/replica/bulk_load/replica_bulk_loader.cpp b/src/replica/bulk_load/replica_bulk_loader.cpp index 92d3b02133..40f82c066d 100644 --- a/src/replica/bulk_load/replica_bulk_loader.cpp +++ b/src/replica/bulk_load/replica_bulk_loader.cpp @@ -188,14 +188,17 @@ void replica_bulk_loader::broadcast_group_bulk_load(const bulk_load_request &met LOG_INFO_PREFIX("start to broadcast group bulk load"); - for (const auto &hp : _replica->_primary_states.pc.hp_secondaries1) { - if (hp == _stub->primary_host_port()) + std::vector secondaries; + GET_HOST_PORTS(_replica->_primary_states.pc, secondaries1, secondaries); + for (const auto &secondary : secondaries) { + if (secondary == _stub->primary_host_port()) { continue; + } auto request = std::make_unique(); request->app_name = _replica->_app_info.app_name; - const auto &addr = dsn::dns_resolver::instance().resolve_address(hp); - SET_IP_AND_HOST_PORT(*request, target, addr, hp); + const auto &addr = dsn::dns_resolver::instance().resolve_address(secondary); + SET_IP_AND_HOST_PORT(*request, target, addr, secondary); _replica->_primary_states.get_replica_config(partition_status::PS_SECONDARY, request->config); request->cluster_name = meta_req.cluster_name; @@ -203,14 +206,14 @@ void replica_bulk_loader::broadcast_group_bulk_load(const bulk_load_request &met request->meta_bulk_load_status = meta_req.meta_bulk_load_status; request->remote_root_path = meta_req.remote_root_path; - LOG_INFO_PREFIX("send group_bulk_load_request to {}({})", hp, addr); + LOG_INFO_PREFIX("send group_bulk_load_request to {}({})", secondary, addr); group_bulk_load_rpc rpc( std::move(request), RPC_GROUP_BULK_LOAD, 0_ms, 0, get_gpid().thread_hash()); auto callback_task = rpc.call(addr, tracker(), [this, rpc](error_code err) mutable { on_group_bulk_load_reply(err, rpc.request(), rpc.response()); }); - _replica->_primary_states.group_bulk_load_pending_replies[hp] = callback_task; + _replica->_primary_states.group_bulk_load_pending_replies[secondary] = callback_task; } } @@ -740,8 +743,10 @@ void replica_bulk_loader::handle_bulk_load_finish(bulk_load_status::type new_sta } if (status() == partition_status::PS_PRIMARY) { - for (const auto &target_hp : _replica->_primary_states.pc.hp_secondaries1) { - _replica->_primary_states.reset_node_bulk_load_states(target_hp); + std::vector secondaries; + GET_HOST_PORTS(_replica->_primary_states.pc, secondaries1, secondaries); + for (const auto &secondary : secondaries) { + _replica->_primary_states.reset_node_bulk_load_states(secondary); } } @@ -938,16 +943,18 @@ void replica_bulk_loader::report_group_download_progress(/*out*/ bulk_load_respo primary_state.download_status); int32_t total_progress = primary_state.download_progress; - for (const auto &target_hp : _replica->_primary_states.pc.hp_secondaries1) { + std::vector secondaries; + GET_HOST_PORTS(_replica->_primary_states.pc, secondaries1, secondaries); + for (const auto &secondary : secondaries) { const auto &secondary_state = - _replica->_primary_states.secondary_bulk_load_states[target_hp]; + _replica->_primary_states.secondary_bulk_load_states[secondary]; int32_t s_progress = secondary_state.__isset.download_progress ? secondary_state.download_progress : 0; error_code s_status = secondary_state.__isset.download_status ? secondary_state.download_status : ERR_OK; LOG_INFO_PREFIX( - "secondary = {}, download progress = {}%, status={}", target_hp, s_progress, s_status); - SET_VALUE_FROM_HOST_PORT(response, group_bulk_load_state, target_hp, secondary_state); + "secondary = {}, download progress = {}%, status={}", secondary, s_progress, s_status); + SET_VALUE_FROM_HOST_PORT(response, group_bulk_load_state, secondary, secondary_state); total_progress += s_progress; } @@ -978,19 +985,20 @@ void replica_bulk_loader::report_group_ingestion_status(/*out*/ bulk_load_respon FMT_HOST_PORT_AND_IP(_replica->_primary_states.pc, primary1), enum_to_string(primary_state.ingest_status)); + std::vector secondaries; + GET_HOST_PORTS(_replica->_primary_states.pc, secondaries1, secondaries); bool is_group_ingestion_finish = (primary_state.ingest_status == ingestion_status::IS_SUCCEED) && - (_replica->_primary_states.pc.hp_secondaries1.size() + 1 == - _replica->_primary_states.pc.max_replica_count); - for (const auto &target_hp : _replica->_primary_states.pc.hp_secondaries1) { + (secondaries.size() + 1 == _replica->_primary_states.pc.max_replica_count); + for (const auto &secondary : secondaries) { const auto &secondary_state = - _replica->_primary_states.secondary_bulk_load_states[target_hp]; + _replica->_primary_states.secondary_bulk_load_states[secondary]; ingestion_status::type ingest_status = secondary_state.__isset.ingest_status ? secondary_state.ingest_status : ingestion_status::IS_INVALID; LOG_INFO_PREFIX( - "secondary = {}, ingestion status={}", target_hp, enum_to_string(ingest_status)); - SET_VALUE_FROM_HOST_PORT(response, group_bulk_load_state, target_hp, secondary_state); + "secondary = {}, ingestion status={}", secondary, enum_to_string(ingest_status)); + SET_VALUE_FROM_HOST_PORT(response, group_bulk_load_state, secondary, secondary_state); is_group_ingestion_finish &= (ingest_status == ingestion_status::IS_SUCCEED); } response.__set_is_group_ingestion_finished(is_group_ingestion_finish); @@ -1025,17 +1033,18 @@ void replica_bulk_loader::report_group_cleaned_up(bulk_load_response &response) FMT_HOST_PORT_AND_IP(_replica->_primary_states.pc, primary1), primary_state.is_cleaned_up); - bool group_flag = - (primary_state.is_cleaned_up) && (_replica->_primary_states.pc.hp_secondaries1.size() + 1 == - _replica->_primary_states.pc.max_replica_count); - for (const auto &target_hp : _replica->_primary_states.pc.hp_secondaries1) { + std::vector secondaries; + GET_HOST_PORTS(_replica->_primary_states.pc, secondaries1, secondaries); + bool group_flag = (primary_state.is_cleaned_up) && + (secondaries.size() + 1 == _replica->_primary_states.pc.max_replica_count); + for (const auto &secondary : secondaries) { const auto &secondary_state = - _replica->_primary_states.secondary_bulk_load_states[target_hp]; + _replica->_primary_states.secondary_bulk_load_states[secondary]; bool is_cleaned_up = secondary_state.__isset.is_cleaned_up ? secondary_state.is_cleaned_up : false; LOG_INFO_PREFIX( - "secondary = {}, bulk load states cleaned_up = {}", target_hp, is_cleaned_up); - SET_VALUE_FROM_HOST_PORT(response, group_bulk_load_state, target_hp, secondary_state); + "secondary = {}, bulk load states cleaned_up = {}", secondary, is_cleaned_up); + SET_VALUE_FROM_HOST_PORT(response, group_bulk_load_state, secondary, secondary_state); group_flag &= is_cleaned_up; } LOG_INFO_PREFIX("group bulk load states cleaned_up = {}", group_flag); @@ -1067,12 +1076,14 @@ void replica_bulk_loader::report_group_is_paused(bulk_load_response &response) bool group_is_paused = primary_state.is_paused && (_replica->_primary_states.pc.hp_secondaries1.size() + 1 == _replica->_primary_states.pc.max_replica_count); - for (const auto &target_hp : _replica->_primary_states.pc.hp_secondaries1) { + std::vector secondaries; + GET_HOST_PORTS(_replica->_primary_states.pc, secondaries1, secondaries); + for (const auto &secondary : secondaries) { partition_bulk_load_state secondary_state = - _replica->_primary_states.secondary_bulk_load_states[target_hp]; + _replica->_primary_states.secondary_bulk_load_states[secondary]; bool is_paused = secondary_state.__isset.is_paused ? secondary_state.is_paused : false; - LOG_INFO_PREFIX("secondary = {}, bulk_load is_paused = {}", target_hp, is_paused); - SET_VALUE_FROM_HOST_PORT(response, group_bulk_load_state, target_hp, secondary_state); + LOG_INFO_PREFIX("secondary = {}, bulk_load is_paused = {}", secondary, is_paused); + SET_VALUE_FROM_HOST_PORT(response, group_bulk_load_state, secondary, secondary_state); group_is_paused &= is_paused; } LOG_INFO_PREFIX("group bulk load is_paused = {}", group_is_paused); diff --git a/src/replica/duplication/replica_follower.cpp b/src/replica/duplication/replica_follower.cpp index 19ddaed46b..423b3520d2 100644 --- a/src/replica/duplication/replica_follower.cpp +++ b/src/replica/duplication/replica_follower.cpp @@ -178,7 +178,9 @@ error_code replica_follower::update_master_replica_config(error_code err, query_ return ERR_INCONSISTENT_STATE; } - if (dsn_unlikely(!resp.partitions[0].hp_primary1)) { + dsn::host_port primary; + GET_HOST_PORT(resp.partitions[0], primary1, primary); + if (dsn_unlikely(!primary)) { LOG_ERROR_PREFIX("master[{}] partition address is invalid", master_replica_name()); return ERR_INVALID_STATE; } @@ -203,9 +205,14 @@ void replica_follower::copy_master_replica_checkpoint() dsn::message_ex *msg = dsn::message_ex::create_request(RPC_QUERY_LAST_CHECKPOINT_INFO, 0, _pc.pid.thread_hash()); dsn::marshall(msg, request); - rpc::call(_pc.primary1, msg, &_tracker, [&](error_code err, learn_response &&resp) mutable { - nfs_copy_checkpoint(err, std::move(resp)); - }); + dsn::host_port primary; + GET_HOST_PORT(_pc, primary1, primary); + rpc::call(dsn::dns_resolver::instance().resolve_address(primary), + msg, + &_tracker, + [&](error_code err, learn_response &&resp) mutable { + nfs_copy_checkpoint(err, std::move(resp)); + }); } // ThreadPool: THREAD_POOL_DEFAULT diff --git a/src/replica/duplication/replica_follower.h b/src/replica/duplication/replica_follower.h index 4ae467493c..bd4c82b633 100644 --- a/src/replica/duplication/replica_follower.h +++ b/src/replica/duplication/replica_follower.h @@ -26,6 +26,7 @@ #include "common/gpid.h" #include "dsn.layer2_types.h" #include "replica/replica_base.h" +#include "runtime/rpc/dns_resolver.h" #include "runtime/rpc/rpc_host_port.h" #include "runtime/task/task_tracker.h" #include "utils/error_code.h" @@ -78,7 +79,9 @@ class replica_follower : replica_base std::string master_replica_name() { std::string app_info = fmt::format("{}.{}", _master_cluster_name, _master_app_name); - if (_pc.hp_primary1) { + dsn::host_port primary; + GET_HOST_PORT(_pc, primary1, primary); + if (primary) { return fmt::format("{}({}|{})", app_info, FMT_HOST_PORT_AND_IP(_pc, primary1), _pc.pid); } return app_info; diff --git a/src/replica/replica_2pc.cpp b/src/replica/replica_2pc.cpp index 85f9582344..bd75453dfc 100644 --- a/src/replica/replica_2pc.cpp +++ b/src/replica/replica_2pc.cpp @@ -188,6 +188,8 @@ void replica::on_client_write(dsn::message_ex *request, bool ignore_throttling) return; } + std::vector secondaries; + GET_HOST_PORTS(_primary_states.pc, secondaries1, secondaries); if (request->rpc_code() == dsn::apps::RPC_RRDB_RRDB_BULK_LOAD) { auto cur_bulk_load_status = _bulk_loader->get_bulk_load_status(); if (cur_bulk_load_status != bulk_load_status::BLS_DOWNLOADED && @@ -200,8 +202,7 @@ void replica::on_client_write(dsn::message_ex *request, bool ignore_throttling) LOG_INFO_PREFIX("receive bulk load ingestion request"); // bulk load ingestion request requires that all secondaries should be alive - if (static_cast(_primary_states.pc.hp_secondaries1.size()) + 1 < - _primary_states.pc.max_replica_count) { + if (static_cast(secondaries.size()) + 1 < _primary_states.pc.max_replica_count) { response_client_write(request, ERR_NOT_ENOUGH_MEMBER); return; } @@ -209,7 +210,7 @@ void replica::on_client_write(dsn::message_ex *request, bool ignore_throttling) _bulk_load_ingestion_start_time_ms = dsn_now_ms(); } - if (static_cast(_primary_states.pc.hp_secondaries1.size()) + 1 < + if (static_cast(secondaries.size()) + 1 < _options->app_mutation_2pc_min_replica_count(_app_info.max_replica_count)) { response_client_write(request, ERR_NOT_ENOUGH_MEMBER); return; @@ -257,6 +258,8 @@ void replica::init_prepare(mutation_ptr &mu, bool reconciliation, bool pop_all_c mu->set_is_sync_to_child(_primary_states.sync_send_write_request); // check bounded staleness + std::vector secondaries; + GET_HOST_PORTS(_primary_states.pc, secondaries1, secondaries); if (mu->data.header.decree > last_committed_decree() + FLAGS_staleness_for_commit) { err = ERR_CAPACITY_EXCEEDED; goto ErrOut; @@ -269,8 +272,7 @@ void replica::init_prepare(mutation_ptr &mu, bool reconciliation, bool pop_all_c break; } LOG_INFO_PREFIX("try to prepare bulk load mutation({})", mu->name()); - if (static_cast(_primary_states.pc.hp_secondaries1.size()) + 1 < - _primary_states.pc.max_replica_count) { + if (static_cast(secondaries.size()) + 1 < _primary_states.pc.max_replica_count) { err = ERR_NOT_ENOUGH_MEMBER; break; } @@ -282,7 +284,7 @@ void replica::init_prepare(mutation_ptr &mu, bool reconciliation, bool pop_all_c // stop prepare if there are too few replicas unless it's a reconciliation // for reconciliation, we should ensure every prepared mutation to be committed // please refer to PacificA paper - if (static_cast(_primary_states.pc.hp_secondaries1.size()) + 1 < + if (static_cast(secondaries.size()) + 1 < _options->app_mutation_2pc_min_replica_count(_app_info.max_replica_count) && !reconciliation) { err = ERR_NOT_ENOUGH_MEMBER; @@ -299,8 +301,8 @@ void replica::init_prepare(mutation_ptr &mu, bool reconciliation, bool pop_all_c // remote prepare mu->set_prepare_ts(); - mu->set_left_secondary_ack_count((unsigned int)_primary_states.pc.hp_secondaries1.size()); - for (const auto &secondary : _primary_states.pc.hp_secondaries1) { + mu->set_left_secondary_ack_count(static_cast(secondaries.size())); + for (const auto &secondary : secondaries) { send_prepare_message(secondary, partition_status::PS_SECONDARY, mu, diff --git a/src/replica/replica_backup.cpp b/src/replica/replica_backup.cpp index 9527f743c1..ac13f63661 100644 --- a/src/replica/replica_backup.cpp +++ b/src/replica/replica_backup.cpp @@ -255,10 +255,15 @@ void replica::on_cold_backup(const backup_request &request, /*out*/ backup_respo void replica::send_backup_request_to_secondary(const backup_request &request) { - for (const auto &target_address : _primary_states.pc.secondaries1) { + std::vector secondaries; + GET_HOST_PORTS(_primary_states.pc, secondaries1, secondaries); + for (const auto &secondary : secondaries) { // primary will send backup_request to secondary periodically // so, we shouldn't handle the response - rpc::call_one_way_typed(target_address, RPC_COLD_BACKUP, request, get_gpid().thread_hash()); + rpc::call_one_way_typed(dsn::dns_resolver::instance().resolve_address(secondary), + RPC_COLD_BACKUP, + request, + get_gpid().thread_hash()); } } diff --git a/src/replica/split/replica_split_manager.cpp b/src/replica/split/replica_split_manager.cpp index 393e6e4af2..228aaf1f64 100644 --- a/src/replica/split/replica_split_manager.cpp +++ b/src/replica/split/replica_split_manager.cpp @@ -775,14 +775,18 @@ void replica_split_manager::update_child_group_partition_count( return; } - if (!_replica->_primary_states.learners.empty() || - _replica->_primary_states.pc.hp_secondaries1.size() + 1 < - _replica->_primary_states.pc.max_replica_count) { - LOG_ERROR_PREFIX("there are {} learners or not have enough secondaries(count is {})", - _replica->_primary_states.learners.size(), - _replica->_primary_states.pc.hp_secondaries1.size()); - parent_handle_split_error( - "update_child_group_partition_count failed, have learner or lack of secondary", true); + if (!_replica->_primary_states.learners.empty()) { + LOG_ERROR_PREFIX("there are {} learners", _replica->_primary_states.learners.size()); + parent_handle_split_error("update_child_group_partition_count failed, have learner", true); + return; + } + + std::vector secondaries; + GET_HOST_PORTS(_replica->_primary_states.pc, secondaries1, secondaries); + if (secondaries.size() + 1 < _replica->_primary_states.pc.max_replica_count) { + LOG_ERROR_PREFIX("there are not enough secondaries(count is {})", secondaries.size()); + parent_handle_split_error("update_child_group_partition_count failed, lack of secondary", + true); return; } @@ -1221,14 +1225,17 @@ void replica_split_manager::trigger_primary_parent_split( _meta_split_status = meta_split_status; if (meta_split_status == split_status::SPLITTING) { - if (!_replica->_primary_states.learners.empty() || - _replica->_primary_states.pc.hp_secondaries1.size() + 1 < - _replica->_primary_states.pc.max_replica_count) { - LOG_WARNING_PREFIX( - "there are {} learners or not have enough secondaries(count is {}), wait for " - "next round", - _replica->_primary_states.learners.size(), - _replica->_primary_states.pc.hp_secondaries1.size()); + if (!_replica->_primary_states.learners.empty()) { + LOG_WARNING_PREFIX("there are {} learners, wait for next round", + _replica->_primary_states.learners.size()); + return; + } + + std::vector secondaries; + GET_HOST_PORTS(_replica->_primary_states.pc, secondaries1, secondaries); + if (secondaries.size() + 1 < _replica->_primary_states.pc.max_replica_count) { + LOG_WARNING_PREFIX("there are not enough secondaries(count is {}), wait for next round", + secondaries.size()); return; } diff --git a/src/test/kill_test/kill_testor.cpp b/src/test/kill_test/kill_testor.cpp index da4d351b69..143293d69e 100644 --- a/src/test/kill_test/kill_testor.cpp +++ b/src/test/kill_test/kill_testor.cpp @@ -121,9 +121,9 @@ dsn::error_code kill_testor::get_partition_info(bool debug_unhealthy, healthy_partition_cnt++; } else { const auto &info = - fmt::format("gpid={}, primary={}, secondaries=[{}]], last_committed_decree={}", + fmt::format("gpid={}, primary={}, secondaries=[{}], last_committed_decree={}", pc.pid, - pc.hp_primary1, + primary, fmt::join(secondaries, ", "), pc.last_committed_decree); if (debug_unhealthy) {