From d74f4fcb660b345b74787c4b84088406e318aaf3 Mon Sep 17 00:00:00 2001 From: Yingchun Lai Date: Wed, 29 May 2024 00:47:05 +0800 Subject: [PATCH] fix(FQDN): Fix the bug of crash caused by un-resolved IP address --- .../test/failure_detector.cpp | 1 + src/meta/meta_bulk_load_service.cpp | 24 +++++++++---------- src/meta/server_state.cpp | 17 +++++++++---- src/redis_protocol/proxy_lib/proxy_layer.cpp | 11 ++++----- src/redis_protocol/proxy_lib/proxy_layer.h | 10 ++++---- src/replica/storage/simple_kv/test/case.cpp | 4 +++- src/runtime/rpc/asio_net_provider.cpp | 2 ++ src/runtime/rpc/asio_net_provider.h | 1 + src/runtime/rpc/group_host_port.h | 8 +++++-- src/runtime/rpc/network.cpp | 5 +++- src/runtime/rpc/network.sim.cpp | 2 ++ src/runtime/rpc/rpc_engine.cpp | 11 +++++---- src/runtime/rpc/rpc_host_port.cpp | 10 ++++---- src/runtime/rpc/rpc_message.h | 8 +++++-- src/runtime/service_api_c.cpp | 4 ++-- src/runtime/test/host_port_test.cpp | 6 ++--- 16 files changed, 77 insertions(+), 47 deletions(-) diff --git a/src/failure_detector/test/failure_detector.cpp b/src/failure_detector/test/failure_detector.cpp index 1e14ebece0..6917336f57 100644 --- a/src/failure_detector/test/failure_detector.cpp +++ b/src/failure_detector/test/failure_detector.cpp @@ -241,6 +241,7 @@ class test_master : public service_app for (auto &port : ports) { rpc_address addr(network::get_local_ipv4(), std::stoi(port)); const auto hp = ::dsn::host_port::from_address(addr); + CHECK(hp, "{} can not be reverse resolved", addr); _master_fd->add_allow_list(hp); } use_allow_list = true; diff --git a/src/meta/meta_bulk_load_service.cpp b/src/meta/meta_bulk_load_service.cpp index 9dd97141e3..7317d57ea1 100644 --- a/src/meta/meta_bulk_load_service.cpp +++ b/src/meta/meta_bulk_load_service.cpp @@ -451,19 +451,17 @@ void bulk_load_service::partition_bulk_load(const std::string &app_name, const g req->remote_root_path); bulk_load_rpc rpc(std::move(req), RPC_BULK_LOAD, 0_ms, 0, pid.thread_hash()); - rpc.call(pconfig.primary, _meta_svc->tracker(), [this, rpc](error_code err) mutable { - // fill host_port struct if needed - // remote server maybe not supported host_post, just have address - auto &bulk_load_resp = rpc.response(); - if (!bulk_load_resp.__isset.hp_group_bulk_load_state) { - bulk_load_resp.__set_hp_group_bulk_load_state({}); - for (const auto & [ addr, pbls ] : bulk_load_resp.group_bulk_load_state) { - bulk_load_resp.hp_group_bulk_load_state[host_port::from_address(addr)] = pbls; - } - } - - on_partition_bulk_load_reply(err, rpc.request(), rpc.response()); - }); + rpc.call( + pconfig.primary, _meta_svc->tracker(), [this, pid, rpc, pconfig](error_code err) mutable { + // The remote server may not support FQDN, but do not try to reverse resolve the + // IP addresses because they may be unresolved. Just warning and ignore this. + LOG_WARNING_IF(!rpc.response().__isset.hp_group_bulk_load_state, + "The {} primary {} doesn't support FQDN, the response " + "hp_group_bulk_load_state field is not set", + pid, + FMT_HOST_PORT_AND_IP(pconfig, primary)); + on_partition_bulk_load_reply(err, rpc.request(), rpc.response()); + }); } // ThreadPool: THREAD_POOL_META_STATE diff --git a/src/meta/server_state.cpp b/src/meta/server_state.cpp index 09dc7781be..3c7d749d04 100644 --- a/src/meta/server_state.cpp +++ b/src/meta/server_state.cpp @@ -1587,7 +1587,7 @@ void server_state::update_configuration_locally( break; case config_type::CT_REGISTER_CHILD: { ns->put_partition(gpid, true); - // TODO(yingchun): optimize this + // TODO(yingchun): optimize the duplicate loops. if (config_request->config.__isset.hp_secondaries) { for (const auto &secondary : config_request->config.hp_secondaries) { auto secondary_node = get_node_state(_nodes, secondary, false); @@ -1595,8 +1595,16 @@ void server_state::update_configuration_locally( } } else { for (const auto &secondary : config_request->config.secondaries) { - auto secondary_node = - get_node_state(_nodes, host_port::from_address(secondary), false); + const auto hp = host_port::from_address(secondary); + if (!hp) { + LOG_ERROR("The registering secondary {} for pid {} can no be reverse " + "resolved, skip registering it, please check the network " + "configuration", + secondary, + config_request->config.pid); + continue; + } + auto secondary_node = get_node_state(_nodes, hp, false); secondary_node->put_partition(gpid, false); } } @@ -1608,8 +1616,9 @@ void server_state::update_configuration_locally( } } else { CHECK_EQ(old_cfg.ballot, new_cfg.ballot); - const auto host_node = host_port::from_address(config_request->host_node); + // The non-stateful app is just for testing, so just check the host_node is resolvable. + CHECK(host_node, "{} can not be reverse resolved", config_request->host_node); new_cfg = old_cfg; partition_configuration_stateless pcs(new_cfg); if (config_request->type == config_type::type::CT_ADD_SECONDARY) { diff --git a/src/redis_protocol/proxy_lib/proxy_layer.cpp b/src/redis_protocol/proxy_lib/proxy_layer.cpp index f3f15e6d5a..b8eb1666d2 100644 --- a/src/redis_protocol/proxy_lib/proxy_layer.cpp +++ b/src/redis_protocol/proxy_lib/proxy_layer.cpp @@ -62,7 +62,7 @@ proxy_stub::proxy_stub(const proxy_session::factory &f, void proxy_stub::on_rpc_request(dsn::message_ex *request) { - auto source = ::dsn::host_port::from_address(request->header->from_address); + const auto &source = request->header->from_address; std::shared_ptr session; { ::dsn::zauto_read_lock l(_lock); @@ -87,11 +87,10 @@ void proxy_stub::on_rpc_request(dsn::message_ex *request) void proxy_stub::on_recv_remove_session_request(dsn::message_ex *request) { - auto source = ::dsn::host_port::from_address(request->header->from_address); - remove_session(source); + remove_session(request->header->from_address); } -void proxy_stub::remove_session(dsn::host_port remote) +void proxy_stub::remove_session(dsn::rpc_address remote) { std::shared_ptr session; { @@ -114,9 +113,9 @@ proxy_session::proxy_session(proxy_stub *op, dsn::message_ex *first_msg) CHECK_NOTNULL(first_msg, "null msg when create session"); _backup_one_request->add_ref(); - _session_remote = ::dsn::host_port::from_address(_backup_one_request->header->from_address); + _session_remote = _backup_one_request->header->from_address; _session_remote_str = _session_remote.to_string(); - CHECK_EQ_MSG(_session_remote.type(), HOST_TYPE_IPV4, "invalid host_port type"); + CHECK_EQ_MSG(_session_remote.type(), HOST_TYPE_IPV4, "invalid rpc_address type"); } proxy_session::~proxy_session() diff --git a/src/redis_protocol/proxy_lib/proxy_layer.h b/src/redis_protocol/proxy_lib/proxy_layer.h index 99884074ca..ce93695d22 100644 --- a/src/redis_protocol/proxy_lib/proxy_layer.h +++ b/src/redis_protocol/proxy_lib/proxy_layer.h @@ -79,8 +79,9 @@ class proxy_session : public std::enable_shared_from_this // when get message from raw parser, request & response of "dsn::message_ex*" are not in couple. // we need to backup one request to create a response struct. dsn::message_ex *_backup_one_request; - // the client for which this session served - dsn::host_port _session_remote; + // The client for which this session served for. + // The source IP address is possible to be reverse un-resolved, so use rpc_address directly. + dsn::rpc_address _session_remote; std::string _session_remote_str; }; @@ -107,14 +108,15 @@ class proxy_stub : public ::dsn::serverlet this->unregister_rpc_handler(RPC_CALL_RAW_MESSAGE); this->unregister_rpc_handler(RPC_CALL_RAW_SESSION_DISCONNECT); } - void remove_session(dsn::host_port remote_address); + void remove_session(dsn::rpc_address remote_address); private: void on_rpc_request(dsn::message_ex *request); void on_recv_remove_session_request(dsn::message_ex *); ::dsn::zrwlock_nr _lock; - std::unordered_map<::dsn::host_port, std::shared_ptr> _sessions; + // The source IP address is possible to be un-reverse resolved, so use rpc_address. + std::unordered_map<::dsn::rpc_address, std::shared_ptr> _sessions; proxy_session::factory _factory; ::dsn::host_port _uri_address; std::string _cluster; diff --git a/src/replica/storage/simple_kv/test/case.cpp b/src/replica/storage/simple_kv/test/case.cpp index 730b04e93c..1559407343 100644 --- a/src/replica/storage/simple_kv/test/case.cpp +++ b/src/replica/storage/simple_kv/test/case.cpp @@ -534,7 +534,9 @@ void event_on_rpc::init(message_ex *msg, task *tsk) if (msg != nullptr) { _trace_id = fmt::sprintf("%016llx", msg->header->trace_id); _rpc_name = msg->header->rpc_name; - _from = address_to_node(host_port::from_address(msg->header->from_address)); + const auto hp = host_port::from_address(msg->header->from_address); + CHECK(hp, "{} can not be reverse resolved", msg->header->from_address); + _from = address_to_node(hp); _to = address_to_node(msg->to_host_port); } } diff --git a/src/runtime/rpc/asio_net_provider.cpp b/src/runtime/rpc/asio_net_provider.cpp index 1bc85f2d5b..8c9457cbb3 100644 --- a/src/runtime/rpc/asio_net_provider.cpp +++ b/src/runtime/rpc/asio_net_provider.cpp @@ -147,6 +147,7 @@ error_code asio_network_provider::start(rpc_channel channel, int port, bool clie _address = rpc_address(get_local_ipv4(), port); _hp = ::dsn::host_port::from_address(_address); + LOG_WARNING_IF(!_hp, "'{}' can not be reverse resolved", _address); if (!client_only) { auto v4_addr = boost::asio::ip::address_v4::any(); //(ntohl(_address.ip)); @@ -456,6 +457,7 @@ error_code asio_udp_provider::start(rpc_channel channel, int port, bool client_o } _hp = ::dsn::host_port::from_address(_address); + LOG_WARNING_IF(!_hp, "'{}' can not be reverse resolved", _address); for (int i = 0; i < FLAGS_io_service_worker_count; i++) { _workers.push_back(std::make_shared([this, i]() { diff --git a/src/runtime/rpc/asio_net_provider.h b/src/runtime/rpc/asio_net_provider.h index 20eb6f9d91..c26ce1b191 100644 --- a/src/runtime/rpc/asio_net_provider.h +++ b/src/runtime/rpc/asio_net_provider.h @@ -95,6 +95,7 @@ class asio_network_provider : public connection_oriented_network std::vector> _io_services; std::vector> _workers; ::dsn::rpc_address _address; + // NOTE: '_hp' is possible to be invalid if '_address' can not be reverse resolved. ::dsn::host_port _hp; }; diff --git a/src/runtime/rpc/group_host_port.h b/src/runtime/rpc/group_host_port.h index f8ceea212f..1110e99b83 100644 --- a/src/runtime/rpc/group_host_port.h +++ b/src/runtime/rpc/group_host_port.h @@ -127,10 +127,14 @@ inline rpc_group_host_port::rpc_group_host_port(const rpc_group_address *g_addr) { _name = g_addr->name(); for (const auto &addr : g_addr->members()) { - CHECK_TRUE(add(host_port::from_address(addr))); + const auto hp = host_port::from_address(addr); + CHECK(hp, "'{}' can not be reverse resolved", addr); + CHECK_TRUE(add(hp)); } _update_leader_automatically = g_addr->is_update_leader_automatically(); - set_leader(host_port::from_address(g_addr->leader())); + const auto hp = host_port::from_address(g_addr->leader()); + CHECK(hp, "'{}' can not be reverse resolved", g_addr->leader()); + set_leader(hp); } inline rpc_group_host_port &rpc_group_host_port::operator=(const rpc_group_host_port &other) diff --git a/src/runtime/rpc/network.cpp b/src/runtime/rpc/network.cpp index 72a9c55e9b..2c506fd720 100644 --- a/src/runtime/rpc/network.cpp +++ b/src/runtime/rpc/network.cpp @@ -388,7 +388,9 @@ rpc_session::rpc_session(connection_oriented_network &net, _message_sent(0), _net(net), _remote_addr(remote_addr), - _remote_host_port(host_port::from_address(remote_addr)), + // TODO(yingchun): '_remote_host_port' is possible to be invalid after this! + // TODO(yingchun): It's too cost to reverse resolve host in constructor. + _remote_host_port(host_port::from_address(_remote_addr)), _max_buffer_block_count_per_send(net.max_buffer_block_count_per_send()), _reader(net.message_buffer_block_size()), _parser(parser), @@ -396,6 +398,7 @@ rpc_session::rpc_session(connection_oriented_network &net, _matcher(_net.engine()->matcher()), _delay_server_receive_ms(0) { + LOG_WARNING_IF(!_remote_host_port, "'{}' can not be reverse resolved", _remote_addr); if (!is_client) { on_rpc_session_connected.execute(this); } diff --git a/src/runtime/rpc/network.sim.cpp b/src/runtime/rpc/network.sim.cpp index 4aad6b933a..ca1986ae8b 100644 --- a/src/runtime/rpc/network.sim.cpp +++ b/src/runtime/rpc/network.sim.cpp @@ -162,6 +162,7 @@ sim_network_provider::sim_network_provider(rpc_engine *rpc, network *inner_provi { _address = rpc_address::from_host_port("localhost", 1); _hp = ::dsn::host_port::from_address(_address); + LOG_WARNING_IF(!_hp, "'{}' can not be reverse resolved", _address); } error_code sim_network_provider::start(rpc_channel channel, int port, bool client_only) @@ -172,6 +173,7 @@ error_code sim_network_provider::start(rpc_channel channel, int port, bool clien _address = dsn::rpc_address::from_host_port("localhost", port); _hp = ::dsn::host_port::from_address(_address); + LOG_WARNING_IF(!_hp, "'{}' can not be reverse resolved", _address); auto hostname = boost::asio::ip::host_name(); if (!client_only) { for (int i = NET_HDR_INVALID + 1; i <= network_header_format::max_value(); i++) { diff --git a/src/runtime/rpc/rpc_engine.cpp b/src/runtime/rpc/rpc_engine.cpp index 0e80e7fd1e..bd7241de5f 100644 --- a/src/runtime/rpc/rpc_engine.cpp +++ b/src/runtime/rpc/rpc_engine.cpp @@ -150,8 +150,9 @@ bool rpc_client_matcher::on_recv_reply(network *net, uint64_t key, message_ex *r case GRPC_TO_LEADER: if (req->server_address.group_address()->is_update_leader_automatically()) { req->server_address.group_address()->set_leader(addr); - req->server_host_port.group_host_port()->set_leader( - host_port::from_address(addr)); + const auto hp = host_port::from_address(addr); + CHECK(hp, "{} can not be reverse resolved", addr); + req->server_host_port.group_host_port()->set_leader(hp); } break; default: @@ -180,8 +181,9 @@ bool rpc_client_matcher::on_recv_reply(network *net, uint64_t key, message_ex *r req->server_address.group_address()->is_update_leader_automatically()) { req->server_address.group_address()->set_leader( reply->header->from_address); - req->server_host_port.group_host_port()->set_leader( - host_port::from_address(reply->header->from_address)); + const auto hp = host_port::from_address(reply->header->from_address); + CHECK(hp, "{} can not be reverse resolved", reply->header->from_address); + req->server_host_port.group_host_port()->set_leader(hp); } break; default: @@ -523,6 +525,7 @@ error_code rpc_engine::start(const service_app_spec &aspec) _local_primary_address = _client_nets[NET_HDR_DSN][0]->address(); _local_primary_address.set_port(aspec.ports.size() > 0 ? *aspec.ports.begin() : aspec.id); _local_primary_host_port = host_port::from_address(_local_primary_address); + CHECK(_local_primary_host_port, "'{}' can not be reverse resolved", _local_primary_address); LOG_INFO("=== service_node=[{}], primary_address=[{}({})] ===", _node->full_name(), diff --git a/src/runtime/rpc/rpc_host_port.cpp b/src/runtime/rpc/rpc_host_port.cpp index 1e8583da68..42ed764010 100644 --- a/src/runtime/rpc/rpc_host_port.cpp +++ b/src/runtime/rpc/rpc_host_port.cpp @@ -58,9 +58,11 @@ host_port host_port::from_address(rpc_address addr) WARNING, 100, "construct host_port '{}' from rpc_address '{}'", hp, addr); switch (addr.type()) { case HOST_TYPE_IPV4: { - CHECK_OK(lookup_hostname(htonl(addr.ip()), &hp._host), - "lookup_hostname failed for {}", - addr.ipv4_str()); + const auto s = lookup_hostname(htonl(addr.ip()), &hp._host); + if (dsn_unlikely(!s)) { + LOG_WARNING("lookup_hostname failed for {}: {}", addr.ipv4_str(), s.description()); + return hp; + } hp._port = addr.port(); } break; case HOST_TYPE_GROUP: { @@ -70,7 +72,7 @@ host_port host_port::from_address(rpc_address addr) break; } - // Now is valid. + // 'hp' become valid now. hp._type = addr.type(); return hp; } diff --git a/src/runtime/rpc/rpc_message.h b/src/runtime/rpc/rpc_message.h index 63e4ecb5be..0645b48cb7 100644 --- a/src/runtime/rpc/rpc_message.h +++ b/src/runtime/rpc/rpc_message.h @@ -138,8 +138,12 @@ class message_ex : public ref_counter, public extensible_object rpc_session_ptr io_session; // send/recv session rpc_address to_address; // always ipv4/v6 address, it is the to_node's net address rpc_address server_address; // used by requests, and may be of uri/group address - host_port to_host_port; // fqdn from 'to_address' - host_port server_host_port; // fqdn from 'server_address' + // hostname from 'to_address'. It's possible to be invalid if 'to_address' can not be reverse + // resolved. + host_port to_host_port; + // hostname from 'server_address'. It's possible to be invalid if 'server_address' can not be + // reverse resolved. + host_port server_host_port; dsn::task_code local_rpc_code; network_header_format hdr_format; int send_retry_count; diff --git a/src/runtime/service_api_c.cpp b/src/runtime/service_api_c.cpp index 276fec4f3a..f4ed022344 100644 --- a/src/runtime/service_api_c.cpp +++ b/src/runtime/service_api_c.cpp @@ -165,7 +165,7 @@ void dsn_rpc_call(dsn::rpc_address server, dsn::rpc_response_task *rpc_call) auto msg = rpc_call->get_request(); msg->server_address = server; - msg->server_host_port = dsn::host_port::from_address(server); + msg->server_host_port = dsn::host_port::from_address(msg->server_address); ::dsn::task::get_current_rpc()->call(msg, dsn::rpc_response_task_ptr(rpc_call)); } @@ -173,7 +173,7 @@ dsn::message_ex *dsn_rpc_call_wait(dsn::rpc_address server, dsn::message_ex *req { auto msg = ((::dsn::message_ex *)request); msg->server_address = server; - msg->server_host_port = dsn::host_port::from_address(server); + msg->server_host_port = dsn::host_port::from_address(msg->server_address); ::dsn::rpc_response_task *rtask = new ::dsn::rpc_response_task(msg, nullptr, 0); rtask->add_ref(); diff --git a/src/runtime/test/host_port_test.cpp b/src/runtime/test/host_port_test.cpp index b64622189f..70d3699e1e 100644 --- a/src/runtime/test/host_port_test.cpp +++ b/src/runtime/test/host_port_test.cpp @@ -71,8 +71,7 @@ TEST(host_port_test, host_port_build) { const auto addr = rpc_address::from_host_port("localhost", 8080); - host_port hp1 = host_port::from_address(addr); - ASSERT_EQ(hp, hp1); + ASSERT_EQ(hp, host_port::from_address(addr)); } } @@ -203,8 +202,7 @@ TEST(host_port_test, rpc_group_host_port) ASSERT_EQ(addr2, g_addr->leader()); ASSERT_EQ(2, g_addr->count()); - host_port hp_grp2; - hp_grp2 = host_port::from_address(addr_grp); + host_port hp_grp2 = host_port::from_address(addr_grp); ASSERT_EQ(HOST_TYPE_GROUP, hp_grp2.type()); auto g_hp = hp_grp2.group_host_port();