diff --git a/run.sh b/run.sh index 46164ef693..623f7a2a13 100755 --- a/run.sh +++ b/run.sh @@ -812,6 +812,7 @@ function run_start_onebox() sed "s/@META_PORT@/$meta_port/;s/@REPLICA_PORT@/34800/;s/@PROMETHEUS_PORT@/$prometheus_port/" ${ROOT}/config-server.ini >config.ini $PWD/pegasus_server config.ini -app_list meta &>result & PID=$! + sleep 1 ps -ef | grep '/pegasus_server config.ini' | grep "\<$PID\>" cd .. done @@ -825,6 +826,7 @@ function run_start_onebox() sed "s/@META_PORT@/34600/;s/@REPLICA_PORT@/$replica_port/;s/@PROMETHEUS_PORT@/$prometheus_port/" ${ROOT}/config-server.ini >config.ini $PWD/pegasus_server config.ini -app_list replica &>result & PID=$! + sleep 1 ps -ef | grep '/pegasus_server config.ini' | grep "\<$PID\>" cd .. done diff --git a/src/common/json_helper.h b/src/common/json_helper.h index fd88a5feab..a0a061e80b 100644 --- a/src/common/json_helper.h +++ b/src/common/json_helper.h @@ -711,7 +711,10 @@ NON_MEMBER_JSON_SERIALIZATION(dsn::partition_configuration, secondaries, last_drops, last_committed_decree, - partition_flags) + partition_flags, + hp_primary, + hp_secondaries, + hp_last_drops) NON_MEMBER_JSON_SERIALIZATION(dsn::app_info, status, diff --git a/src/common/replication_other_types.h b/src/common/replication_other_types.h index a5d8ba5fec..1a0fd10e51 100644 --- a/src/common/replication_other_types.h +++ b/src/common/replication_other_types.h @@ -79,7 +79,9 @@ inline bool is_partition_config_equal(const partition_configuration &pc1, return false; // last_drops is not considered into equality check return pc1.ballot == pc2.ballot && pc1.pid == pc2.pid && - pc1.max_replica_count == pc2.max_replica_count && pc1.hp_primary == pc2.hp_primary && + pc1.max_replica_count == pc2.max_replica_count && + pc1.primary == pc2.primary && pc1.hp_primary == pc2.hp_primary && + pc1.secondaries.size() == pc2.secondaries.size() && pc1.hp_secondaries.size() == pc2.hp_secondaries.size() && pc1.last_committed_decree == pc2.last_committed_decree; } diff --git a/src/meta/app_balance_policy.cpp b/src/meta/app_balance_policy.cpp index 47d5ac11e0..0cfad24e9c 100644 --- a/src/meta/app_balance_policy.cpp +++ b/src/meta/app_balance_policy.cpp @@ -80,6 +80,7 @@ void app_balance_policy::balance(bool checker, const meta_view *global_view, mig { init(global_view, list); const app_mapper &apps = *_global_view->apps; + if (!execute_balance(apps, checker, _balancer_in_turn, diff --git a/src/meta/cluster_balance_policy.cpp b/src/meta/cluster_balance_policy.cpp index fe31671025..e6536ffee1 100644 --- a/src/meta/cluster_balance_policy.cpp +++ b/src/meta/cluster_balance_policy.cpp @@ -545,8 +545,14 @@ bool cluster_balance_policy::apply_move(const move_info &move, partition_configuration pc; pc.pid = move.pid; pc.hp_primary = primary_hp; - auto source_addr = _svc->get_dns_resolver()->resolve_address(source); - auto target_addr = _svc->get_dns_resolver()->resolve_address(target); + std::shared_ptr resolver; + if (_svc == nullptr) { + resolver = std::make_shared(); + } else { + resolver = _svc->get_dns_resolver(); + } + auto source_addr = resolver->resolve_address(source); + auto target_addr = resolver->resolve_address(target); list[move.pid] = generate_balancer_request(*_global_view->apps, pc, move.type, source_addr, target_addr, source, target); _migration_result->emplace( move.pid, generate_balancer_request(*_global_view->apps, pc, move.type, source_addr, target_addr, source, target)); diff --git a/src/meta/duplication/meta_duplication_service.cpp b/src/meta/duplication/meta_duplication_service.cpp index e011707069..d803e2c981 100644 --- a/src/meta/duplication/meta_duplication_service.cpp +++ b/src/meta/duplication/meta_duplication_service.cpp @@ -426,12 +426,12 @@ void meta_duplication_service::check_follower_app_if_create_completed( while (count-- > 0) { partition_configuration p; p.primary = rpc_address("127.0.0.1", 34801); - p.secondaries.emplace_back(rpc_address("127.0.0.2", 34801)); - p.secondaries.emplace_back(rpc_address("127.0.0.3", 34801)); + p.secondaries.emplace_back(rpc_address("127.0.0.1", 34802)); + p.secondaries.emplace_back(rpc_address("127.0.0.1", 34803)); p.__set_hp_primary(host_port("localhost", 34801)); p.__set_hp_secondaries(std::vector()); - p.hp_secondaries.emplace_back(host_port("127.0.0.2", 34801)); - p.hp_secondaries.emplace_back(host_port("127.0.0.3", 34801)); + p.hp_secondaries.emplace_back(host_port("localhost", 34802)); + p.hp_secondaries.emplace_back(host_port("localhost", 34803)); resp.partitions.emplace_back(p); } }); diff --git a/src/meta/server_state.cpp b/src/meta/server_state.cpp index 89847330d5..a878b130fc 100644 --- a/src/meta/server_state.cpp +++ b/src/meta/server_state.cpp @@ -594,6 +594,7 @@ error_code server_state::sync_apps_to_remote_storage() init_app_partition_node(app, i, init_callback); } } + tracker.wait_outstanding_tasks(); t = _meta_svc->get_remote_storage()->set_data(_apps_root, blob(unlock_state, 0, strlen(unlock_state)), @@ -624,6 +625,9 @@ dsn::error_code server_state::sync_apps_from_remote_storage() const blob &value) mutable { if (ec == ERR_OK) { partition_configuration pc; + pc.__isset.hp_secondaries = true; + pc.__isset.hp_last_drops = true; + pc.__isset.hp_primary = true; dsn::json::json_forwarder::decode(value, pc); CHECK(pc.pid.get_app_id() == app->app_id && @@ -1097,6 +1101,7 @@ void server_state::init_app_partition_node(std::shared_ptr &app, std::string app_partition_path = get_partition_path(*app, pidx); dsn::blob value = dsn::json::json_forwarder::encode(app->partitions[pidx]); + _meta_svc->get_remote_storage()->create_node( app_partition_path, LPC_META_STATE_HIGH, on_create_app_partition, value); } diff --git a/src/meta/test/backup_test.cpp b/src/meta/test/backup_test.cpp index e4140f6de0..8f0acadbd8 100644 --- a/src/meta/test/backup_test.cpp +++ b/src/meta/test/backup_test.cpp @@ -189,7 +189,7 @@ class progress_liar : public meta_service public: // req is held by callback, we don't need to handle the life-time of it virtual void send_request(dsn::message_ex *req, - const rpc_address &target, + const host_port &target, const rpc_response_task_ptr &callback) { // need to handle life-time manually diff --git a/src/meta/test/balancer_validator.cpp b/src/meta/test/balancer_validator.cpp index ccfa3453b2..365734447d 100644 --- a/src/meta/test/balancer_validator.cpp +++ b/src/meta/test/balancer_validator.cpp @@ -151,6 +151,7 @@ void meta_service_test_app::balancer_validator() for (int i = 0; i < 1000000 && glb.balance({&apps, &nodes}, ml); ++i) { LOG_DEBUG("the {}th round of balancer", i); migration_check_and_apply(apps, nodes, ml, &manager); + glb.check({&apps, &nodes}, ml); LOG_DEBUG("balance checker operation count = {}", ml.size()); } diff --git a/src/meta/test/json_compacity.cpp b/src/meta/test/json_compacity.cpp index 5c2354c158..801436b21f 100644 --- a/src/meta/test/json_compacity.cpp +++ b/src/meta/test/json_compacity.cpp @@ -88,14 +88,18 @@ void meta_service_test_app::json_compacity() // 4. old pc version const char *json3 = "{\"pid\":\"1.1\",\"ballot\":234,\"max_replica_count\":3," "\"primary\":\"invalid address\",\"secondaries\":[\"127.0.0.1:6\"]," + "\"hp_primary\":\"invalid host_port\",\"hp_secondaries\":[\"localhost:6\"]," "\"last_drops\":[],\"last_committed_decree\":157}"; dsn::partition_configuration pc; dsn::json::json_forwarder::decode( dsn::blob(json3, 0, strlen(json3)), pc); ASSERT_EQ(234, pc.ballot); ASSERT_TRUE(pc.hp_primary.is_invalid()); + ASSERT_TRUE(pc.primary.is_invalid()); ASSERT_EQ(1, pc.hp_secondaries.size()); + ASSERT_EQ(1, pc.secondaries.size()); ASSERT_STREQ("127.0.0.1:6", pc.secondaries[0].to_string()); + ASSERT_STREQ("localhost:6", pc.hp_secondaries[0].to_string().c_str()); ASSERT_EQ(157, pc.last_committed_decree); ASSERT_EQ(0, pc.partition_flags); diff --git a/src/meta/test/meta_duplication_service_test.cpp b/src/meta/test/meta_duplication_service_test.cpp index 813e649d94..94753af923 100644 --- a/src/meta/test/meta_duplication_service_test.cpp +++ b/src/meta/test/meta_duplication_service_test.cpp @@ -128,11 +128,13 @@ class meta_duplication_service_test : public meta_test_base } duplication_sync_response - duplication_sync(const host_port &node, + duplication_sync(const rpc_address &addr, + const host_port &hp, std::map> confirm_list) { auto req = std::make_unique(); - req->__set_hp_node(node); + req->node = addr; + req->__set_hp_node(hp); req->confirm_list = confirm_list; duplication_sync_rpc rpc(std::move(req), RPC_CM_DUPLICATION_SYNC); @@ -539,8 +541,10 @@ TEST_F(meta_duplication_service_test, remove_dup) TEST_F(meta_duplication_service_test, duplication_sync) { + auto resolver = std::make_shared(); std::vector server_nodes = ensure_enough_alive_nodes(3); auto node = server_nodes[0]; + auto addr = resolver->resolve_address(server_nodes[0]); std::string test_app = "test_app_0"; create_app(test_app); @@ -549,7 +553,8 @@ TEST_F(meta_duplication_service_test, duplication_sync) // generate all primaries on node[0] for (partition_configuration &pc : app->partitions) { pc.ballot = random32(1, 10000); - pc.hp_primary = server_nodes[0]; + pc.primary = addr; + pc.__set_hp_primary(server_nodes[0]); pc.hp_secondaries.push_back(server_nodes[1]); pc.hp_secondaries.push_back(server_nodes[2]); } @@ -576,7 +581,7 @@ TEST_F(meta_duplication_service_test, duplication_sync) ce.confirmed_decree = 7; confirm_list[gpid(app->app_id, 3)].push_back(ce); - duplication_sync_response resp = duplication_sync(node, confirm_list); + duplication_sync_response resp = duplication_sync(addr, node, confirm_list); ASSERT_EQ(resp.err, ERR_OK); ASSERT_EQ(resp.dup_map.size(), 1); ASSERT_EQ(resp.dup_map[app->app_id].size(), 1); @@ -607,7 +612,7 @@ TEST_F(meta_duplication_service_test, duplication_sync) ce.confirmed_decree = 5; confirm_list[gpid(app->app_id, 1)].push_back(ce); - duplication_sync_response resp = duplication_sync(node, confirm_list); + duplication_sync_response resp = duplication_sync(addr, node, confirm_list); ASSERT_EQ(resp.err, ERR_OK); ASSERT_EQ(resp.dup_map.size(), 1); ASSERT_TRUE(resp.dup_map[app->app_id].find(dupid + 1) == resp.dup_map[app->app_id].end()); @@ -621,7 +626,7 @@ TEST_F(meta_duplication_service_test, duplication_sync) ce.confirmed_decree = 5; confirm_list[gpid(app->app_id + 1, 1)].push_back(ce); - duplication_sync_response resp = duplication_sync(node, confirm_list); + duplication_sync_response resp = duplication_sync(addr, node, confirm_list); ASSERT_EQ(resp.err, ERR_OK); ASSERT_EQ(resp.dup_map.size(), 1); ASSERT_TRUE(resp.dup_map.find(app->app_id + 1) == resp.dup_map.end()); @@ -637,7 +642,7 @@ TEST_F(meta_duplication_service_test, duplication_sync) ce.confirmed_decree = 5; confirm_list[gpid(app->app_id, 1)].push_back(ce); - duplication_sync_response resp = duplication_sync(node, confirm_list); + duplication_sync_response resp = duplication_sync(addr, node, confirm_list); ASSERT_EQ(resp.err, ERR_OK); ASSERT_EQ(resp.dup_map.size(), 0); } @@ -780,7 +785,7 @@ TEST_F(meta_duplication_service_test, fail_mode) pc.__set_hp_primary(node.first); } initialize_node_state(); - duplication_sync_response sync_resp = duplication_sync(node.first, {}); + duplication_sync_response sync_resp = duplication_sync(node.second, node.first, {}); ASSERT_TRUE(sync_resp.dup_map[app->app_id][dup->id].__isset.fail_mode); ASSERT_EQ(sync_resp.dup_map[app->app_id][dup->id].fail_mode, duplication_fail_mode::FAIL_SKIP); diff --git a/src/meta/test/meta_service_test.cpp b/src/meta/test/meta_service_test.cpp index 1eeb22ed24..d9cfb8c35c 100644 --- a/src/meta/test/meta_service_test.cpp +++ b/src/meta/test/meta_service_test.cpp @@ -68,7 +68,7 @@ class meta_service_test : public meta_test_base ASSERT_FALSE(_ms->check_status_and_authz(rpc)); ASSERT_EQ(app_env_rpc::forward_mail_box().size(), 1); ASSERT_EQ(app_env_rpc::forward_mail_box()[0].remote_address().to_std_string(), - "1.2.3.4:10086"); + "127.0.0.1:10086"); } fail::teardown(); diff --git a/src/meta/test/misc/misc.cpp b/src/meta/test/misc/misc.cpp index 9fe7e1b03e..5c5edae950 100644 --- a/src/meta/test/misc/misc.cpp +++ b/src/meta/test/misc/misc.cpp @@ -114,6 +114,7 @@ void generate_node_mapper( void generate_app(/*out*/ std::shared_ptr &app, const std::vector &node_list) { + auto resolver = std::make_shared(); for (dsn::partition_configuration &pc : app->partitions) { pc.ballot = random32(1, 10000); std::vector indices(3, 0); @@ -122,10 +123,15 @@ void generate_app(/*out*/ std::shared_ptr &app, indices[2] = random32(indices[1] + 1, node_list.size() - 1); int p = random32(0, 2); - pc.hp_primary = node_list[indices[p]]; - for (unsigned int i = 0; i != indices.size(); ++i) - if (i != p) + pc.__set_hp_primary(node_list[indices[p]]); + pc.__set_hp_secondaries(std::vector()); + pc.primary = resolver->resolve_address(node_list[indices[p]]); + for (unsigned int i = 0; i != indices.size(); ++i) { + if (i != p) { + pc.secondaries.push_back(resolver->resolve_address(node_list[indices[i]])); pc.hp_secondaries.push_back(node_list[indices[i]]); + } + } CHECK(!pc.hp_primary.is_invalid(), ""); CHECK(!is_secondary(pc, pc.hp_primary), ""); @@ -309,10 +315,12 @@ void proposal_action_check_and_apply(const configuration_proposal_action &act, case config_type::CT_ASSIGN_PRIMARY: CHECK_EQ(act.node, act.target); CHECK(pc.hp_primary.is_invalid(), ""); + CHECK(pc.primary.is_invalid(), ""); CHECK(pc.hp_secondaries.empty(), ""); + CHECK(pc.secondaries.empty(), ""); pc.primary = act.node; - pc.hp_primary = hp_node; + pc.__set_hp_primary(hp_node); ns = &nodes[hp_node]; CHECK_EQ(ns->served_as(pc.pid), partition_status::PS_INACTIVE); ns->put_partition(pc.pid, true); @@ -320,9 +328,11 @@ void proposal_action_check_and_apply(const configuration_proposal_action &act, case config_type::CT_ADD_SECONDARY: CHECK_EQ(hp_target, pc.hp_primary); + CHECK_EQ(act.target, pc.primary); CHECK(!is_member(pc, hp_node), ""); pc.hp_secondaries.push_back(hp_node); + pc.secondaries.push_back(act.node); ns = &nodes[hp_node]; CHECK_EQ(ns->served_as(pc.pid), partition_status::PS_INACTIVE); ns->put_partition(pc.pid, false); @@ -330,7 +340,9 @@ void proposal_action_check_and_apply(const configuration_proposal_action &act, break; case config_type::CT_DOWNGRADE_TO_SECONDARY: + CHECK_EQ(act.node, act.target); CHECK_EQ(hp_node, hp_target); + CHECK_EQ(act.node, pc.primary); CHECK_EQ(hp_node, pc.hp_primary); CHECK(nodes.find(hp_node) != nodes.end(), ""); CHECK(!is_secondary(pc, pc.hp_primary), ""); @@ -343,21 +355,31 @@ void proposal_action_check_and_apply(const configuration_proposal_action &act, case config_type::CT_UPGRADE_TO_PRIMARY: CHECK(pc.hp_primary.is_invalid(), ""); + CHECK(pc.primary.is_invalid(), ""); CHECK_EQ(hp_node, hp_target); + CHECK_EQ(act.node, act.target); CHECK(is_secondary(pc, hp_node), ""); CHECK(nodes.find(hp_node) != nodes.end(), ""); ns = &nodes[hp_node]; pc.hp_primary = hp_node; + pc.primary = act.node; CHECK(replica_helper::remove_node(hp_node, pc.hp_secondaries), ""); + CHECK(replica_helper::remove_node(act.node, pc.secondaries), ""); ns->put_partition(pc.pid, true); break; case config_type::CT_ADD_SECONDARY_FOR_LB: CHECK_EQ(hp_target, pc.hp_primary); + CHECK_EQ(act.target, pc.primary); CHECK(!is_member(pc, hp_node), ""); CHECK(!act.hp_node.is_invalid(), ""); + CHECK(!act.node.is_invalid(), ""); + if (!pc.__isset.hp_secondaries) { + pc.__set_hp_secondaries(std::vector()); + } pc.hp_secondaries.push_back(hp_node); + pc.secondaries.push_back(act.node); ns = &nodes[hp_node]; ns->put_partition(pc.pid, false); @@ -368,10 +390,13 @@ void proposal_action_check_and_apply(const configuration_proposal_action &act, case config_type::CT_REMOVE: case config_type::CT_DOWNGRADE_TO_INACTIVE: CHECK(!pc.hp_primary.is_invalid(), ""); + CHECK(!pc.primary.is_invalid(), ""); CHECK_EQ(pc.hp_primary, hp_target); + CHECK_EQ(pc.primary, act.target); CHECK(is_secondary(pc, hp_node), ""); CHECK(nodes.find(hp_node) != nodes.end(), ""); CHECK(replica_helper::remove_node(hp_node, pc.hp_secondaries), ""); + CHECK(replica_helper::remove_node(act.node, pc.secondaries), ""); ns = &nodes[hp_node]; CHECK_EQ(ns->served_as(pc.pid), partition_status::PS_SECONDARY); @@ -409,10 +434,12 @@ void migration_check_and_apply(app_mapper &apps, for (unsigned int j = 0; j < proposal->action_list.size(); ++j) { configuration_proposal_action &act = proposal->action_list[j]; - LOG_DEBUG("the {}th round of action, type: {}, node: {}, target: {}", + LOG_DEBUG("the {}th round of action, type: {}, node: {}({}), target: {}({})", j, dsn::enum_to_string(act.type), + act.hp_node, act.node, + act.hp_target, act.target); proposal_action_check_and_apply(act, proposal->gpid, apps, nodes, manager); } diff --git a/src/meta/test/update_configuration_test.cpp b/src/meta/test/update_configuration_test.cpp index fb45f22c30..075dad3590 100644 --- a/src/meta/test/update_configuration_test.cpp +++ b/src/meta/test/update_configuration_test.cpp @@ -437,11 +437,12 @@ void meta_service_test_app::apply_balancer_test() // initialize data structure std::vector> node_list; + generate_node_list(node_list, 5, 10); + std::vector hps; for (const auto& p : node_list) { hps.emplace_back(p.first); } - generate_node_list(node_list, 5, 10); server_state *ss = meta_svc->_state.get(); generate_apps(ss->_all_apps, hps, 5, 5, std::pair(2, 5), false); diff --git a/src/replica/duplication/test/replica_follower_test.cpp b/src/replica/duplication/test/replica_follower_test.cpp index 793d01955e..a0f2bd8110 100644 --- a/src/replica/duplication/test/replica_follower_test.cpp +++ b/src/replica/duplication/test/replica_follower_test.cpp @@ -122,7 +122,7 @@ TEST_F(replica_follower_test, test_init_master_info) { _app_info.envs.emplace(duplication_constants::kDuplicationEnvMasterClusterKey, "master"); _app_info.envs.emplace(duplication_constants::kDuplicationEnvMasterMetasKey, - "127.0.0.1:34801,127.0.0.2:34801,127.0.0.3:34802"); + "127.0.0.1:34801,127.0.0.1:34802,127.0.0.1:34803"); update_mock_replica(_app_info); auto follower = _mock_replica->get_replica_follower(); @@ -130,7 +130,7 @@ TEST_F(replica_follower_test, test_init_master_info) ASSERT_EQ(follower->get_master_cluster_name(), "master"); ASSERT_TRUE(follower->is_need_duplicate()); ASSERT_TRUE(_mock_replica->is_duplication_follower()); - std::vector test_ip{"127.0.0.1:34801", "127.0.0.2:34801", "127.0.0.3:34802"}; + std::vector test_ip{"127.0.0.1:34801", "127.0.0.1:34802", "127.0.0.1:34803"}; for (int i = 0; i < follower->get_master_meta_list().size(); i++) { ASSERT_EQ(std::string(follower->get_master_meta_list()[i].to_string()), test_ip[i]); } @@ -146,7 +146,7 @@ TEST_F(replica_follower_test, test_duplicate_checkpoint) { _app_info.envs.emplace(duplication_constants::kDuplicationEnvMasterClusterKey, "master"); _app_info.envs.emplace(duplication_constants::kDuplicationEnvMasterMetasKey, - "127.0.0.1:34801,127.0.0.2:34801,127.0.0.3:34802"); + "127.0.0.1:34801,127.0.0.1:34802,127.0.0.1:34803"); update_mock_replica(_app_info); auto follower = _mock_replica->get_replica_follower(); @@ -166,7 +166,7 @@ TEST_F(replica_follower_test, test_async_duplicate_checkpoint_from_master_replic { _app_info.envs.emplace(duplication_constants::kDuplicationEnvMasterClusterKey, "master"); _app_info.envs.emplace(duplication_constants::kDuplicationEnvMasterMetasKey, - "127.0.0.1:34801,127.0.0.2:34801,127.0.0.3:34802"); + "127.0.0.1:34801,127.0.0.1:34802,127.0.0.1:34803"); update_mock_replica(_app_info); auto follower = _mock_replica->get_replica_follower(); @@ -188,7 +188,7 @@ TEST_F(replica_follower_test, test_update_master_replica_config) { _app_info.envs.emplace(duplication_constants::kDuplicationEnvMasterClusterKey, "master"); _app_info.envs.emplace(duplication_constants::kDuplicationEnvMasterMetasKey, - "127.0.0.1:34801,127.0.0.2:34801,127.0.0.3:34802"); + "127.0.0.1:34801,127.0.0.1:34802,127.0.0.1:34803"); update_mock_replica(_app_info); auto follower = _mock_replica->get_replica_follower(); @@ -231,8 +231,8 @@ TEST_F(replica_follower_test, test_update_master_replica_config) p.primary = rpc_address("127.0.0.1", 34801); p.__set_hp_primary(host_port("localhost", 34801)); p.__set_hp_secondaries(std::vector()); - p.hp_secondaries.emplace_back(host_port("127.0.0.2", 34801)); - p.hp_secondaries.emplace_back(host_port("127.0.0.3", 34801)); + p.hp_secondaries.emplace_back(host_port("localhost", 34802)); + p.hp_secondaries.emplace_back(host_port("localhost", 34803)); resp.partitions.emplace_back(p); ASSERT_EQ(update_master_replica_config(follower, resp), ERR_OK); ASSERT_EQ(master_replica_config(follower).primary, p.primary); @@ -244,7 +244,7 @@ TEST_F(replica_follower_test, test_nfs_copy_checkpoint) { _app_info.envs.emplace(duplication_constants::kDuplicationEnvMasterClusterKey, "master"); _app_info.envs.emplace(duplication_constants::kDuplicationEnvMasterMetasKey, - "127.0.0.1:34801,127.0.0.2:34801,127.0.0.3:34802"); + "127.0.0.1:34801,127.0.0.1:34802,127.0.0.1:34803"); update_mock_replica(_app_info); init_nfs(); auto follower = _mock_replica->get_replica_follower(); @@ -253,7 +253,7 @@ TEST_F(replica_follower_test, test_nfs_copy_checkpoint) auto resp = learn_response(); resp.address = rpc_address("127.0.0.1", 34801); - resp.__set_hp_address(host_port("127.0.0.1", 34801)); + resp.__set_hp_address(host_port("localhost", 34801)); std::string dest = utils::filesystem::path_combine( _mock_replica->dir(), duplication_constants::kDuplicationCheckpointRootDir); diff --git a/src/replica/replica_stub.cpp b/src/replica/replica_stub.cpp index b6f23e2d51..f810970e05 100644 --- a/src/replica/replica_stub.cpp +++ b/src/replica/replica_stub.cpp @@ -3056,7 +3056,6 @@ void replica_stub::update_disk_holding_replicas() void replica_stub::on_bulk_load(bulk_load_rpc rpc) { - FILL_OPTIONAL_HP_IF_NEEDED(rpc.request(), primary_addr); const bulk_load_request &request = rpc.request(); bulk_load_response &response = rpc.response(); @@ -3072,7 +3071,6 @@ void replica_stub::on_bulk_load(bulk_load_rpc rpc) void replica_stub::on_group_bulk_load(group_bulk_load_rpc rpc) { - FILL_OPTIONAL_HP_IF_NEEDED(rpc.request().config, primary); const group_bulk_load_request &request = rpc.request(); group_bulk_load_response &response = rpc.response(); diff --git a/src/replica/test/mock_utils.h b/src/replica/test/mock_utils.h index 2074ff84fb..fdb75cf6d6 100644 --- a/src/replica/test/mock_utils.h +++ b/src/replica/test/mock_utils.h @@ -276,7 +276,7 @@ class mock_replica_stub : public replica_stub void set_state_connected() { _state = replica_node_state::NS_Connected; } - rpc_address get_meta_server_address() const override { return rpc_address("127.0.0.2", 12321); } + rpc_address get_meta_server_address() const override { return rpc_address("127.0.0.1", 12321); } std::map mock_replicas; diff --git a/src/replica/test/replica_test.cpp b/src/replica/test/replica_test.cpp index 45ca0eab3c..d3b155c84c 100644 --- a/src/replica/test/replica_test.cpp +++ b/src/replica/test/replica_test.cpp @@ -273,6 +273,7 @@ TEST_F(replica_test, write_size_limited) auto write_request = dsn::message_ex::create_request(RPC_TEST); auto cleanup = dsn::defer([=]() { delete write_request; }); + header.context.u.is_forwarded = false; write_request->header = &header; std::unique_ptr sim_net( new tools::sim_network_provider(nullptr, nullptr)); diff --git a/src/runtime/rpc/network.cpp b/src/runtime/rpc/network.cpp index d739e80895..059b580a73 100644 --- a/src/runtime/rpc/network.cpp +++ b/src/runtime/rpc/network.cpp @@ -424,9 +424,6 @@ bool rpc_session::on_recv_message(message_ex *msg, int delay_ms) { if (msg->header->from_address.is_invalid()) { msg->header->from_address = _remote_addr; - } else if (msg->header->from_address != _remote_addr) { - msg->header->from_address = _remote_addr; - LOG_DEBUG("msg from_address {} not be same as socket remote_addr {}, assign it to remote_addr.", msg->header->from_address, _remote_addr); } msg->to_address = _net.address(); diff --git a/src/runtime/rpc/rpc_address.h b/src/runtime/rpc/rpc_address.h index 7b8cc68201..bee2ad39f1 100644 --- a/src/runtime/rpc/rpc_address.h +++ b/src/runtime/rpc/rpc_address.h @@ -153,7 +153,7 @@ class rpc_address switch (type()) { case HOST_TYPE_IPV4: - return ip() == r.ip() && _addr.v4.port == r.port(); + return ip() == r.ip() && port() == r.port(); case HOST_TYPE_GROUP: return _addr.group.group == r._addr.group.group; default: diff --git a/src/runtime/rpc/serialization.cpp b/src/runtime/rpc/serialization.cpp deleted file mode 100644 index ff72c73180..0000000000 --- a/src/runtime/rpc/serialization.cpp +++ /dev/null @@ -1,14 +0,0 @@ -#include "runtime/rpc/serialization.h" - -#include "dsn.layer2_types.h" - -namespace dsn { -template<> inline void unmarshall(dsn::message_ex *msg, /*out*/ partition_configuration &val) -{ - ::dsn::rpc_read_stream reader(msg); - unmarshall(reader, val, (dsn_msg_serialize_format)msg->header->context.u.serialize_format); - FILL_OPTIONAL_HP_IF_NEEDED(val, primary); - FILL_OPTIONAL_HP_LIST_IF_NEEDED(val, secondaries); - FILL_OPTIONAL_HP_LIST_IF_NEEDED(val, last_drops); -} -} diff --git a/src/runtime/test/host_port_test.cpp b/src/runtime/test/host_port_test.cpp index d4cd15e4ff..20175aaa9d 100644 --- a/src/runtime/test/host_port_test.cpp +++ b/src/runtime/test/host_port_test.cpp @@ -240,21 +240,4 @@ TEST(host_port_test, thrift_parser) send_and_check_host_port_by_serialize(hp2, DSF_THRIFT_BINARY); send_and_check_host_port_by_serialize(hp2, DSF_THRIFT_JSON); } - -TEST(host_port_test, optional_struct_macro_fuction) -{ - partition_configuration config; - config.primary = rpc_address("127.0.0.1", 8080); - config.secondaries = { rpc_address("127.0.0.1", 8081), rpc_address("127.0.0.1", 8082) }; - config.last_drops = { rpc_address("127.0.0.1", 8083) }; - - FILL_OPTIONAL_HP_IF_NEEDED(config, primary); - FILL_OPTIONAL_HP_LIST_IF_NEEDED(config, secondaries); - FILL_OPTIONAL_HP_LIST_IF_NEEDED(config, last_drops); - - ASSERT_EQ(config.hp_primary, host_port("localhost", 8080)); - ASSERT_EQ(2, config.hp_secondaries.size()); - ASSERT_EQ(host_port("localhost", 8083), config.hp_last_drops[0]); -} - } // namespace dsn diff --git a/src/runtime/test/rpc.cpp b/src/runtime/test/rpc.cpp index dd67bef01d..0ab2d5824c 100644 --- a/src/runtime/test/rpc.cpp +++ b/src/runtime/test/rpc.cpp @@ -96,10 +96,10 @@ TEST(core, group_address_talk_to_others) auto typed_callback = [addr](error_code err_code, const std::string &result) { EXPECT_EQ(ERR_OK, err_code); - dsn::rpc_address addr_got; + dsn::host_port hp_got; LOG_INFO("talk to others callback, result: {}", result); - EXPECT_TRUE(addr_got.from_string_ipv4(result.c_str())); - EXPECT_EQ(TEST_PORT_END, addr_got.port()); + EXPECT_TRUE(hp_got.from_string(result)); + EXPECT_EQ(TEST_PORT_END, hp_got.port()); }; ::dsn::task_ptr resp = ::dsn::rpc::call(addr, RPC_TEST_STRING_COMMAND, @@ -109,6 +109,7 @@ TEST(core, group_address_talk_to_others) resp->wait(); } + TEST(core, group_address_change_leader) { ::dsn::rpc_address addr = build_group(); @@ -117,10 +118,10 @@ TEST(core, group_address_change_leader) auto typed_callback = [addr, &rpc_err](error_code err_code, const std::string &result) -> void { rpc_err = err_code; if (ERR_OK == err_code) { - ::dsn::rpc_address addr_got; + dsn::host_port hp_got; LOG_INFO("talk to others callback, result: {}", result); - EXPECT_TRUE(addr_got.from_string_ipv4(result.c_str())); - EXPECT_EQ(TEST_PORT_END, addr_got.port()); + EXPECT_TRUE(hp_got.from_string(result)); + EXPECT_EQ(TEST_PORT_END, hp_got.port()); } }; @@ -240,7 +241,7 @@ TEST(core, send_to_invalid_address) { ::dsn::rpc_address group = build_group(); /* here we assume 10.255.254.253:32766 is not assigned */ - group.group_address()->set_leader(dsn::rpc_address("10.255.254.253", 32766)); + group.group_address()->set_leader(dsn::rpc_address("127.0.0.1", 32766)); rpc_reply_handler action_on_succeed = [](error_code err, dsn::message_ex *, dsn::message_ex *resp) { diff --git a/src/utils/utils.cpp b/src/utils/utils.cpp index 1f12d4c06e..65520af921 100644 --- a/src/utils/utils.cpp +++ b/src/utils/utils.cpp @@ -45,7 +45,7 @@ #include #include -#include "runtime/rpc/rpc_host_port.h" +#include "runtime/rpc/rpc_address.h" #include "utils/fmt_logging.h" #include "utils/strings.h" @@ -107,12 +107,16 @@ bool hostname_from_ip(const char *ip, std::string *hostname_result) bool hostname_from_ip_port(const char *ip_port, std::string *hostname_result) { - dsn::host_port hp; - if (!hp.from_string(ip_port)) { + dsn::rpc_address addr; + if (!addr.from_string_ipv4(ip_port)) { LOG_WARNING("invalid ip_port({})", ip_port); *hostname_result = ip_port; return false; } + if (!hostname(addr, hostname_result)) { + *hostname_result = ip_port; + return false; + } return true; }