From a657566f1f3462e61a957099b9d33fccd5043117 Mon Sep 17 00:00:00 2001 From: Yingchun Lai Date: Wed, 29 May 2024 00:47:05 +0800 Subject: [PATCH] fix(FQDN): Fix the bug of crash caused by un-resolved IP address --- src/failure_detector/test/failure_detector.cpp | 3 ++- src/meta/meta_bulk_load_service.cpp | 4 +++- src/meta/server_state.cpp | 7 +++++-- src/redis_protocol/proxy_lib/proxy_layer.cpp | 9 ++++++--- src/replica/storage/simple_kv/test/case.cpp | 3 ++- src/runtime/rpc/asio_net_provider.cpp | 6 ++++-- src/runtime/rpc/group_host_port.h | 6 ++++-- src/runtime/rpc/network.cpp | 5 ++++- src/runtime/rpc/network.sim.cpp | 6 ++++-- src/runtime/rpc/rpc_engine.cpp | 14 ++++++++++---- src/runtime/rpc/rpc_host_port.cpp | 12 +++++++----- src/runtime/rpc/rpc_host_port.h | 8 +++++--- src/runtime/rpc/rpc_message.cpp | 3 ++- src/runtime/service_api_c.cpp | 9 ++++++--- src/runtime/test/dns_resolver_test.cpp | 2 +- src/runtime/test/host_port_test.cpp | 6 ++---- src/utils/errors.h | 8 ++++++++ 17 files changed, 75 insertions(+), 36 deletions(-) diff --git a/src/failure_detector/test/failure_detector.cpp b/src/failure_detector/test/failure_detector.cpp index 1e14ebece0..21b3ab4841 100644 --- a/src/failure_detector/test/failure_detector.cpp +++ b/src/failure_detector/test/failure_detector.cpp @@ -240,7 +240,8 @@ class test_master : public service_app utils::split_args(args[2].c_str(), ports, ','); for (auto &port : ports) { rpc_address addr(network::get_local_ipv4(), std::stoi(port)); - const auto hp = ::dsn::host_port::from_address(addr); + const auto hp = ::dsn::host_port::from_address1(addr); + ASSERT_TRUE(hp); _master_fd->add_allow_list(hp); } use_allow_list = true; diff --git a/src/meta/meta_bulk_load_service.cpp b/src/meta/meta_bulk_load_service.cpp index 9dd97141e3..78df4360ff 100644 --- a/src/meta/meta_bulk_load_service.cpp +++ b/src/meta/meta_bulk_load_service.cpp @@ -458,7 +458,9 @@ void bulk_load_service::partition_bulk_load(const std::string &app_name, const g if (!bulk_load_resp.__isset.hp_group_bulk_load_state) { bulk_load_resp.__set_hp_group_bulk_load_state({}); for (const auto & [ addr, pbls ] : bulk_load_resp.group_bulk_load_state) { - bulk_load_resp.hp_group_bulk_load_state[host_port::from_address(addr)] = pbls; + // TODO(yingchun): 'hp_group_bulk_load_state' key is possible to be invalid after + // this! + bulk_load_resp.hp_group_bulk_load_state[host_port::from_address1(addr)] = pbls; } } diff --git a/src/meta/server_state.cpp b/src/meta/server_state.cpp index 09dc7781be..72626de071 100644 --- a/src/meta/server_state.cpp +++ b/src/meta/server_state.cpp @@ -1595,8 +1595,10 @@ void server_state::update_configuration_locally( } } else { for (const auto &secondary : config_request->config.secondaries) { + // TODO(yingchun): 'hp_group_bulk_load_state' key is possible to be invalid + // after this! auto secondary_node = - get_node_state(_nodes, host_port::from_address(secondary), false); + get_node_state(_nodes, host_port::from_address1(secondary), false); secondary_node->put_partition(gpid, false); } } @@ -1609,7 +1611,8 @@ void server_state::update_configuration_locally( } else { CHECK_EQ(old_cfg.ballot, new_cfg.ballot); - const auto host_node = host_port::from_address(config_request->host_node); + // TODO(yingchun): 'host_node' is possible to be invalid after this! + const auto host_node = host_port::from_address1(config_request->host_node); new_cfg = old_cfg; partition_configuration_stateless pcs(new_cfg); if (config_request->type == config_type::type::CT_ADD_SECONDARY) { diff --git a/src/redis_protocol/proxy_lib/proxy_layer.cpp b/src/redis_protocol/proxy_lib/proxy_layer.cpp index f3f15e6d5a..af8b94b46a 100644 --- a/src/redis_protocol/proxy_lib/proxy_layer.cpp +++ b/src/redis_protocol/proxy_lib/proxy_layer.cpp @@ -62,7 +62,8 @@ proxy_stub::proxy_stub(const proxy_session::factory &f, void proxy_stub::on_rpc_request(dsn::message_ex *request) { - auto source = ::dsn::host_port::from_address(request->header->from_address); + // TODO(yingchun): 'source' is possible to be invalid after this! + auto source = ::dsn::host_port::from_address1(request->header->from_address); std::shared_ptr session; { ::dsn::zauto_read_lock l(_lock); @@ -87,7 +88,8 @@ void proxy_stub::on_rpc_request(dsn::message_ex *request) void proxy_stub::on_recv_remove_session_request(dsn::message_ex *request) { - auto source = ::dsn::host_port::from_address(request->header->from_address); + // TODO(yingchun): 'source' is possible to be invalid after this! + auto source = ::dsn::host_port::from_address1(request->header->from_address); remove_session(source); } @@ -114,7 +116,8 @@ proxy_session::proxy_session(proxy_stub *op, dsn::message_ex *first_msg) CHECK_NOTNULL(first_msg, "null msg when create session"); _backup_one_request->add_ref(); - _session_remote = ::dsn::host_port::from_address(_backup_one_request->header->from_address); + // TODO(yingchun): '_session_remote' is possible to be invalid after this! + _session_remote = ::dsn::host_port::from_address1(_backup_one_request->header->from_address); _session_remote_str = _session_remote.to_string(); CHECK_EQ_MSG(_session_remote.type(), HOST_TYPE_IPV4, "invalid host_port type"); } diff --git a/src/replica/storage/simple_kv/test/case.cpp b/src/replica/storage/simple_kv/test/case.cpp index 730b04e93c..bd3b6501f6 100644 --- a/src/replica/storage/simple_kv/test/case.cpp +++ b/src/replica/storage/simple_kv/test/case.cpp @@ -534,7 +534,8 @@ void event_on_rpc::init(message_ex *msg, task *tsk) if (msg != nullptr) { _trace_id = fmt::sprintf("%016llx", msg->header->trace_id); _rpc_name = msg->header->rpc_name; - _from = address_to_node(host_port::from_address(msg->header->from_address)); + // TODO(yingchun): '_from' is possible to be invalid after this! + _from = address_to_node(host_port::from_address1(msg->header->from_address)); _to = address_to_node(msg->to_host_port); } } diff --git a/src/runtime/rpc/asio_net_provider.cpp b/src/runtime/rpc/asio_net_provider.cpp index 1bc85f2d5b..b7e0d221bf 100644 --- a/src/runtime/rpc/asio_net_provider.cpp +++ b/src/runtime/rpc/asio_net_provider.cpp @@ -146,7 +146,8 @@ error_code asio_network_provider::start(rpc_channel channel, int port, bool clie channel); _address = rpc_address(get_local_ipv4(), port); - _hp = ::dsn::host_port::from_address(_address); + _hp = ::dsn::host_port::from_address1(_address); + CHECK(_hp, "rpc_address '{}' can not be reverse resolved", _address); if (!client_only) { auto v4_addr = boost::asio::ip::address_v4::any(); //(ntohl(_address.ip)); @@ -455,7 +456,8 @@ error_code asio_udp_provider::start(rpc_channel channel, int port, bool client_o } } - _hp = ::dsn::host_port::from_address(_address); + _hp = ::dsn::host_port::from_address1(_address); + CHECK(_hp, "rpc_address '{}' can not be reverse resolved", _address); for (int i = 0; i < FLAGS_io_service_worker_count; i++) { _workers.push_back(std::make_shared([this, i]() { diff --git a/src/runtime/rpc/group_host_port.h b/src/runtime/rpc/group_host_port.h index f8ceea212f..f09727bc4f 100644 --- a/src/runtime/rpc/group_host_port.h +++ b/src/runtime/rpc/group_host_port.h @@ -127,10 +127,12 @@ inline rpc_group_host_port::rpc_group_host_port(const rpc_group_address *g_addr) { _name = g_addr->name(); for (const auto &addr : g_addr->members()) { - CHECK_TRUE(add(host_port::from_address(addr))); + // TODO(yingchun): It is possible to be invalid after this! + CHECK_TRUE(add(host_port::from_address1(addr))); } _update_leader_automatically = g_addr->is_update_leader_automatically(); - set_leader(host_port::from_address(g_addr->leader())); + // TODO(yingchun): It is possible to be invalid after this! + set_leader(host_port::from_address1(g_addr->leader())); } inline rpc_group_host_port &rpc_group_host_port::operator=(const rpc_group_host_port &other) diff --git a/src/runtime/rpc/network.cpp b/src/runtime/rpc/network.cpp index 72a9c55e9b..40e6b75481 100644 --- a/src/runtime/rpc/network.cpp +++ b/src/runtime/rpc/network.cpp @@ -388,7 +388,9 @@ rpc_session::rpc_session(connection_oriented_network &net, _message_sent(0), _net(net), _remote_addr(remote_addr), - _remote_host_port(host_port::from_address(remote_addr)), + // TODO(yingchun): '_remote_host_port' is possible to be invalid after this! + // TODO(yingchun): It's too cost to reverse resolve host in constructor. + _remote_host_port(host_port::from_address1(remote_addr)), _max_buffer_block_count_per_send(net.max_buffer_block_count_per_send()), _reader(net.message_buffer_block_size()), _parser(parser), @@ -396,6 +398,7 @@ rpc_session::rpc_session(connection_oriented_network &net, _matcher(_net.engine()->matcher()), _delay_server_receive_ms(0) { + CHECK(_remote_host_port, "rpc_address '{}' can not be reverse resolved", remote_addr); if (!is_client) { on_rpc_session_connected.execute(this); } diff --git a/src/runtime/rpc/network.sim.cpp b/src/runtime/rpc/network.sim.cpp index 4aad6b933a..47e1d5340d 100644 --- a/src/runtime/rpc/network.sim.cpp +++ b/src/runtime/rpc/network.sim.cpp @@ -161,7 +161,8 @@ sim_network_provider::sim_network_provider(rpc_engine *rpc, network *inner_provi : connection_oriented_network(rpc, inner_provider) { _address = rpc_address::from_host_port("localhost", 1); - _hp = ::dsn::host_port::from_address(_address); + _hp = ::dsn::host_port::from_address1(_address); + CHECK(_hp, "rpc_address '{}' can not be reverse resolved", _address); } error_code sim_network_provider::start(rpc_channel channel, int port, bool client_only) @@ -171,7 +172,8 @@ error_code sim_network_provider::start(rpc_channel channel, int port, bool clien channel); _address = dsn::rpc_address::from_host_port("localhost", port); - _hp = ::dsn::host_port::from_address(_address); + _hp = ::dsn::host_port::from_address1(_address); + CHECK(_hp, "rpc_address '{}' can not be reverse resolved", _address); auto hostname = boost::asio::ip::host_name(); if (!client_only) { for (int i = NET_HDR_INVALID + 1; i <= network_header_format::max_value(); i++) { diff --git a/src/runtime/rpc/rpc_engine.cpp b/src/runtime/rpc/rpc_engine.cpp index 0e80e7fd1e..6614ee0bed 100644 --- a/src/runtime/rpc/rpc_engine.cpp +++ b/src/runtime/rpc/rpc_engine.cpp @@ -150,8 +150,9 @@ bool rpc_client_matcher::on_recv_reply(network *net, uint64_t key, message_ex *r case GRPC_TO_LEADER: if (req->server_address.group_address()->is_update_leader_automatically()) { req->server_address.group_address()->set_leader(addr); + // TODO(yingchun): leader is possible to be invalid after this! req->server_host_port.group_host_port()->set_leader( - host_port::from_address(addr)); + host_port::from_address1(addr)); } break; default: @@ -180,8 +181,9 @@ bool rpc_client_matcher::on_recv_reply(network *net, uint64_t key, message_ex *r req->server_address.group_address()->is_update_leader_automatically()) { req->server_address.group_address()->set_leader( reply->header->from_address); + // TODO(yingchun): leader is possible to be invalid after this! req->server_host_port.group_host_port()->set_leader( - host_port::from_address(reply->header->from_address)); + host_port::from_address1(reply->header->from_address)); } break; default: @@ -522,7 +524,10 @@ error_code rpc_engine::start(const service_app_spec &aspec) _local_primary_address = _client_nets[NET_HDR_DSN][0]->address(); _local_primary_address.set_port(aspec.ports.size() > 0 ? *aspec.ports.begin() : aspec.id); - _local_primary_host_port = host_port::from_address(_local_primary_address); + _local_primary_host_port = host_port::from_address1(_local_primary_address); + CHECK(_local_primary_host_port, + "rpc_address '{}' can not be reverse resolved", + _local_primary_address); LOG_INFO("=== service_node=[{}], primary_address=[{}({})] ===", _node->full_name(), @@ -674,7 +679,8 @@ void rpc_engine::call_ip(rpc_address addr, } request->to_address = addr; - request->to_host_port = host_port::from_address(addr); + // TODO(yingchun): 'request->to_host_port' is possible to be invalid after this! + request->to_host_port = host_port::from_address1(addr); auto sp = task_spec::get(request->local_rpc_code); auto &hdr = *request->header; diff --git a/src/runtime/rpc/rpc_host_port.cpp b/src/runtime/rpc/rpc_host_port.cpp index 1e8583da68..e8cb855b51 100644 --- a/src/runtime/rpc/rpc_host_port.cpp +++ b/src/runtime/rpc/rpc_host_port.cpp @@ -51,16 +51,18 @@ host_port::host_port(std::string host, uint16_t port) } } -host_port host_port::from_address(rpc_address addr) +host_port host_port::from_address1(rpc_address addr) { host_port hp; SCOPED_LOG_SLOW_EXECUTION( WARNING, 100, "construct host_port '{}' from rpc_address '{}'", hp, addr); switch (addr.type()) { case HOST_TYPE_IPV4: { - CHECK_OK(lookup_hostname(htonl(addr.ip()), &hp._host), - "lookup_hostname failed for {}", - addr.ipv4_str()); + const auto s = lookup_hostname(htonl(addr.ip()), &hp._host); + if (dsn_unlikely(!s)) { + LOG_WARNING("lookup_hostname failed for {}: {}", addr.ipv4_str(), s.description()); + return hp; + } hp._port = addr.port(); } break; case HOST_TYPE_GROUP: { @@ -70,7 +72,7 @@ host_port host_port::from_address(rpc_address addr) break; } - // Now is valid. + // 'hp' become valid now. hp._type = addr.type(); return hp; } diff --git a/src/runtime/rpc/rpc_host_port.h b/src/runtime/rpc/rpc_host_port.h index 2a643bb4e0..d4801d90c6 100644 --- a/src/runtime/rpc/rpc_host_port.h +++ b/src/runtime/rpc/rpc_host_port.h @@ -46,6 +46,7 @@ class TProtocol; // Get host_port from 'obj', the result is filled in 'target', the source is from host_port type // field 'hp_' if it is set, otherwise, reverse resolve from the rpc_address ''. +// TODO(yingchun): 'target' is possible to be invalid after this! #define GET_HOST_PORT(obj, field, target) \ do { \ const auto &_obj = (obj); \ @@ -55,13 +56,14 @@ class TProtocol; DCHECK_EQ(_obj.field, dsn::dns_resolver::instance().resolve_address(_obj.hp_##field)); \ _target = _obj.hp_##field; \ } else { \ - _target = std::move(dsn::host_port::from_address(_obj.field)); \ + _target = std::move(dsn::host_port::from_address1(_obj.field)); \ } \ } while (0) // Get std::vector from 'obj', the result is filled in 'target', the source is from // std::vector type field 'hp_' if it is set, otherwise, reverse resolve from the // std::vector ''. +// TODO(yingchun): 'target' is possible to contain invalid host_port after this! #define GET_HOST_PORTS(obj, field, target) \ do { \ const auto &_obj = (obj); \ @@ -73,7 +75,7 @@ class TProtocol; } else { \ _target.reserve(_obj.field.size()); \ for (const auto &addr : _obj.field) { \ - _target.emplace_back(host_port::from_address(addr)); \ + _target.emplace_back(host_port::from_address1(addr)); \ } \ } \ } while (0) @@ -298,7 +300,7 @@ class host_port void assign_group(const char *name); // Construct a host_port object from 'addr' - static host_port from_address(rpc_address addr); + static host_port from_address1(rpc_address addr); // Construct a host_port object from 'host_port_str', the latter is in the format of // "localhost:8888". diff --git a/src/runtime/rpc/rpc_message.cpp b/src/runtime/rpc/rpc_message.cpp index 00412db721..79f8069049 100644 --- a/src/runtime/rpc/rpc_message.cpp +++ b/src/runtime/rpc/rpc_message.cpp @@ -355,7 +355,8 @@ message_ex *message_ex::create_response() // the primary address. msg->header->from_address = to_address; msg->to_address = header->from_address; - msg->to_host_port = host_port::from_address(header->from_address); + // TODO(yingchun): 'to_host_port' is possible to be invalid after this! + msg->to_host_port = host_port::from_address1(header->from_address); msg->io_session = io_session; msg->hdr_format = hdr_format; diff --git a/src/runtime/service_api_c.cpp b/src/runtime/service_api_c.cpp index 276fec4f3a..b37eede4c6 100644 --- a/src/runtime/service_api_c.cpp +++ b/src/runtime/service_api_c.cpp @@ -165,7 +165,8 @@ void dsn_rpc_call(dsn::rpc_address server, dsn::rpc_response_task *rpc_call) auto msg = rpc_call->get_request(); msg->server_address = server; - msg->server_host_port = dsn::host_port::from_address(server); + // TODO(yingchun): 'msg->server_host_port' is possible to be invalid after this! + msg->server_host_port = dsn::host_port::from_address1(server); ::dsn::task::get_current_rpc()->call(msg, dsn::rpc_response_task_ptr(rpc_call)); } @@ -173,7 +174,8 @@ dsn::message_ex *dsn_rpc_call_wait(dsn::rpc_address server, dsn::message_ex *req { auto msg = ((::dsn::message_ex *)request); msg->server_address = server; - msg->server_host_port = dsn::host_port::from_address(server); + // TODO(yingchun): 'msg->server_host_port' is possible to be invalid after this! + msg->server_host_port = dsn::host_port::from_address1(server); ::dsn::rpc_response_task *rtask = new ::dsn::rpc_response_task(msg, nullptr, 0); rtask->add_ref(); @@ -194,7 +196,8 @@ void dsn_rpc_call_one_way(dsn::rpc_address server, dsn::message_ex *request) { auto msg = ((::dsn::message_ex *)request); msg->server_address = server; - msg->server_host_port = dsn::host_port::from_address(server); + // TODO(yingchun): 'msg->server_host_port' is possible to be invalid after this! + msg->server_host_port = dsn::host_port::from_address1(server); ::dsn::task::get_current_rpc()->call(msg, nullptr); } diff --git a/src/runtime/test/dns_resolver_test.cpp b/src/runtime/test/dns_resolver_test.cpp index 86a0cc7e17..af300a6777 100644 --- a/src/runtime/test/dns_resolver_test.cpp +++ b/src/runtime/test/dns_resolver_test.cpp @@ -56,7 +56,7 @@ TEST(host_port_test, dns_resolver) ASSERT_EQ(g_addr->is_update_leader_automatically(), g_hp->is_update_leader_automatically()); ASSERT_STREQ(g_addr->name(), g_hp->name()); ASSERT_EQ(g_addr->count(), g_hp->count()); - ASSERT_EQ(host_port::from_address(g_addr->leader()), g_hp->leader()); + ASSERT_EQ(host_port::from_address1(g_addr->leader()), g_hp->leader()); } // Resolve host_port list. diff --git a/src/runtime/test/host_port_test.cpp b/src/runtime/test/host_port_test.cpp index b64622189f..962b02fd1c 100644 --- a/src/runtime/test/host_port_test.cpp +++ b/src/runtime/test/host_port_test.cpp @@ -71,8 +71,7 @@ TEST(host_port_test, host_port_build) { const auto addr = rpc_address::from_host_port("localhost", 8080); - host_port hp1 = host_port::from_address(addr); - ASSERT_EQ(hp, hp1); + ASSERT_EQ(hp, host_port::from_address1(addr)); } } @@ -203,8 +202,7 @@ TEST(host_port_test, rpc_group_host_port) ASSERT_EQ(addr2, g_addr->leader()); ASSERT_EQ(2, g_addr->count()); - host_port hp_grp2; - hp_grp2 = host_port::from_address(addr_grp); + host_port hp_grp2 = host_port::from_address1(addr_grp); ASSERT_EQ(HOST_TYPE_GROUP, hp_grp2.type()); auto g_hp = hp_grp2.group_host_port(); diff --git a/src/utils/errors.h b/src/utils/errors.h index c611e1beff..6f8b7f66e0 100644 --- a/src/utils/errors.h +++ b/src/utils/errors.h @@ -233,6 +233,14 @@ USER_DEFINED_STRUCTURE_FORMATTER(::dsn::error_s); } \ } while (false) +#define WARN_NOT_OK(s, ...) \ + do { \ + const ::dsn::error_s &_s = (s); \ + if (dsn_unlikely(!_s)) { \ + LOG_WARNING("{}: {}", _s.description(), fmt::format(__VA_ARGS__)); \ + } \ + } while (false) + #define RETURN_ES_NOT_OK_MSG(s, ...) \ do { \ const ::dsn::error_s &_s = (s); \