Skip to content

Commit

Permalink
fix(FQDN): Fix the bug of crash caused by un-resolved IP address
Browse files Browse the repository at this point in the history
  • Loading branch information
acelyc111 committed Jun 6, 2024
1 parent 8fd8902 commit d74f4fc
Show file tree
Hide file tree
Showing 16 changed files with 77 additions and 47 deletions.
1 change: 1 addition & 0 deletions src/failure_detector/test/failure_detector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,7 @@ class test_master : public service_app
for (auto &port : ports) {
rpc_address addr(network::get_local_ipv4(), std::stoi(port));
const auto hp = ::dsn::host_port::from_address(addr);
CHECK(hp, "{} can not be reverse resolved", addr);
_master_fd->add_allow_list(hp);
}
use_allow_list = true;
Expand Down
24 changes: 11 additions & 13 deletions src/meta/meta_bulk_load_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -451,19 +451,17 @@ void bulk_load_service::partition_bulk_load(const std::string &app_name, const g
req->remote_root_path);

bulk_load_rpc rpc(std::move(req), RPC_BULK_LOAD, 0_ms, 0, pid.thread_hash());
rpc.call(pconfig.primary, _meta_svc->tracker(), [this, rpc](error_code err) mutable {
// fill host_port struct if needed
// remote server maybe not supported host_post, just have address
auto &bulk_load_resp = rpc.response();
if (!bulk_load_resp.__isset.hp_group_bulk_load_state) {
bulk_load_resp.__set_hp_group_bulk_load_state({});
for (const auto & [ addr, pbls ] : bulk_load_resp.group_bulk_load_state) {
bulk_load_resp.hp_group_bulk_load_state[host_port::from_address(addr)] = pbls;
}
}

on_partition_bulk_load_reply(err, rpc.request(), rpc.response());
});
rpc.call(
pconfig.primary, _meta_svc->tracker(), [this, pid, rpc, pconfig](error_code err) mutable {
// The remote server may not support FQDN. Do not try to reverse resolve the
// IP addresses, because they may be unresolvable; just log a warning and continue.
LOG_WARNING_IF(!rpc.response().__isset.hp_group_bulk_load_state,
"The {} primary {} doesn't support FQDN, the response "
"hp_group_bulk_load_state field is not set",
pid,
FMT_HOST_PORT_AND_IP(pconfig, primary));
on_partition_bulk_load_reply(err, rpc.request(), rpc.response());
});
}

// ThreadPool: THREAD_POOL_META_STATE
Expand Down
17 changes: 13 additions & 4 deletions src/meta/server_state.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1587,16 +1587,24 @@ void server_state::update_configuration_locally(
break;
case config_type::CT_REGISTER_CHILD: {
ns->put_partition(gpid, true);
// TODO(yingchun): optimize this
// TODO(yingchun): optimize the duplicate loops.
if (config_request->config.__isset.hp_secondaries) {
for (const auto &secondary : config_request->config.hp_secondaries) {
auto secondary_node = get_node_state(_nodes, secondary, false);
secondary_node->put_partition(gpid, false);
}
} else {
for (const auto &secondary : config_request->config.secondaries) {
auto secondary_node =
get_node_state(_nodes, host_port::from_address(secondary), false);
const auto hp = host_port::from_address(secondary);
if (!hp) {
LOG_ERROR("The registering secondary {} for pid {} can no be reverse "
"resolved, skip registering it, please check the network "
"configuration",
secondary,
config_request->config.pid);
continue;
}
auto secondary_node = get_node_state(_nodes, hp, false);
secondary_node->put_partition(gpid, false);
}
}
Expand All @@ -1608,8 +1616,9 @@ void server_state::update_configuration_locally(
}
} else {
CHECK_EQ(old_cfg.ballot, new_cfg.ballot);

const auto host_node = host_port::from_address(config_request->host_node);
// The non-stateful app is just for testing, so just check the host_node is resolvable.
CHECK(host_node, "{} can not be reverse resolved", config_request->host_node);
new_cfg = old_cfg;
partition_configuration_stateless pcs(new_cfg);
if (config_request->type == config_type::type::CT_ADD_SECONDARY) {
Expand Down
11 changes: 5 additions & 6 deletions src/redis_protocol/proxy_lib/proxy_layer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ proxy_stub::proxy_stub(const proxy_session::factory &f,

void proxy_stub::on_rpc_request(dsn::message_ex *request)
{
auto source = ::dsn::host_port::from_address(request->header->from_address);
const auto &source = request->header->from_address;
std::shared_ptr<proxy_session> session;
{
::dsn::zauto_read_lock l(_lock);
Expand All @@ -87,11 +87,10 @@ void proxy_stub::on_rpc_request(dsn::message_ex *request)

void proxy_stub::on_recv_remove_session_request(dsn::message_ex *request)
{
auto source = ::dsn::host_port::from_address(request->header->from_address);
remove_session(source);
remove_session(request->header->from_address);
}

void proxy_stub::remove_session(dsn::host_port remote)
void proxy_stub::remove_session(dsn::rpc_address remote)
{
std::shared_ptr<proxy_session> session;
{
Expand All @@ -114,9 +113,9 @@ proxy_session::proxy_session(proxy_stub *op, dsn::message_ex *first_msg)
CHECK_NOTNULL(first_msg, "null msg when create session");
_backup_one_request->add_ref();

_session_remote = ::dsn::host_port::from_address(_backup_one_request->header->from_address);
_session_remote = _backup_one_request->header->from_address;
_session_remote_str = _session_remote.to_string();
CHECK_EQ_MSG(_session_remote.type(), HOST_TYPE_IPV4, "invalid host_port type");
CHECK_EQ_MSG(_session_remote.type(), HOST_TYPE_IPV4, "invalid rpc_address type");
}

proxy_session::~proxy_session()
Expand Down
10 changes: 6 additions & 4 deletions src/redis_protocol/proxy_lib/proxy_layer.h
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,9 @@ class proxy_session : public std::enable_shared_from_this<proxy_session>
// when get message from raw parser, request & response of "dsn::message_ex*" are not in couple.
// we need to backup one request to create a response struct.
dsn::message_ex *_backup_one_request;
// the client for which this session served
dsn::host_port _session_remote;
// The client that this session serves.
// The source IP address may not be reverse resolvable, so use rpc_address directly.
dsn::rpc_address _session_remote;
std::string _session_remote_str;
};

Expand All @@ -107,14 +108,15 @@ class proxy_stub : public ::dsn::serverlet<proxy_stub>
this->unregister_rpc_handler(RPC_CALL_RAW_MESSAGE);
this->unregister_rpc_handler(RPC_CALL_RAW_SESSION_DISCONNECT);
}
void remove_session(dsn::host_port remote_address);
void remove_session(dsn::rpc_address remote_address);

private:
void on_rpc_request(dsn::message_ex *request);
void on_recv_remove_session_request(dsn::message_ex *);

::dsn::zrwlock_nr _lock;
std::unordered_map<::dsn::host_port, std::shared_ptr<proxy_session>> _sessions;
// The source IP address may not be reverse resolvable, so use rpc_address.
std::unordered_map<::dsn::rpc_address, std::shared_ptr<proxy_session>> _sessions;
proxy_session::factory _factory;
::dsn::host_port _uri_address;
std::string _cluster;
Expand Down
4 changes: 3 additions & 1 deletion src/replica/storage/simple_kv/test/case.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -534,7 +534,9 @@ void event_on_rpc::init(message_ex *msg, task *tsk)
if (msg != nullptr) {
_trace_id = fmt::sprintf("%016llx", msg->header->trace_id);
_rpc_name = msg->header->rpc_name;
_from = address_to_node(host_port::from_address(msg->header->from_address));
const auto hp = host_port::from_address(msg->header->from_address);
CHECK(hp, "{} can not be reverse resolved", msg->header->from_address);
_from = address_to_node(hp);
_to = address_to_node(msg->to_host_port);
}
}
Expand Down
2 changes: 2 additions & 0 deletions src/runtime/rpc/asio_net_provider.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ error_code asio_network_provider::start(rpc_channel channel, int port, bool clie

_address = rpc_address(get_local_ipv4(), port);
_hp = ::dsn::host_port::from_address(_address);
LOG_WARNING_IF(!_hp, "'{}' can not be reverse resolved", _address);

if (!client_only) {
auto v4_addr = boost::asio::ip::address_v4::any(); //(ntohl(_address.ip));
Expand Down Expand Up @@ -456,6 +457,7 @@ error_code asio_udp_provider::start(rpc_channel channel, int port, bool client_o
}

_hp = ::dsn::host_port::from_address(_address);
LOG_WARNING_IF(!_hp, "'{}' can not be reverse resolved", _address);

for (int i = 0; i < FLAGS_io_service_worker_count; i++) {
_workers.push_back(std::make_shared<std::thread>([this, i]() {
Expand Down
1 change: 1 addition & 0 deletions src/runtime/rpc/asio_net_provider.h
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ class asio_network_provider : public connection_oriented_network
std::vector<std::unique_ptr<boost::asio::io_service>> _io_services;
std::vector<std::shared_ptr<std::thread>> _workers;
::dsn::rpc_address _address;
// NOTE: '_hp' may be invalid if '_address' cannot be reverse resolved.
::dsn::host_port _hp;
};

Expand Down
8 changes: 6 additions & 2 deletions src/runtime/rpc/group_host_port.h
Original file line number Diff line number Diff line change
Expand Up @@ -127,10 +127,14 @@ inline rpc_group_host_port::rpc_group_host_port(const rpc_group_address *g_addr)
{
_name = g_addr->name();
for (const auto &addr : g_addr->members()) {
CHECK_TRUE(add(host_port::from_address(addr)));
const auto hp = host_port::from_address(addr);
CHECK(hp, "'{}' can not be reverse resolved", addr);
CHECK_TRUE(add(hp));
}
_update_leader_automatically = g_addr->is_update_leader_automatically();
set_leader(host_port::from_address(g_addr->leader()));
const auto hp = host_port::from_address(g_addr->leader());
CHECK(hp, "'{}' can not be reverse resolved", g_addr->leader());
set_leader(hp);
}

inline rpc_group_host_port &rpc_group_host_port::operator=(const rpc_group_host_port &other)
Expand Down
5 changes: 4 additions & 1 deletion src/runtime/rpc/network.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -388,14 +388,17 @@ rpc_session::rpc_session(connection_oriented_network &net,
_message_sent(0),
_net(net),
_remote_addr(remote_addr),
_remote_host_port(host_port::from_address(remote_addr)),
// TODO(yingchun): '_remote_host_port' may be invalid after this!
// TODO(yingchun): It's too costly to reverse resolve the host in the constructor.
_remote_host_port(host_port::from_address(_remote_addr)),
_max_buffer_block_count_per_send(net.max_buffer_block_count_per_send()),
_reader(net.message_buffer_block_size()),
_parser(parser),
_is_client(is_client),
_matcher(_net.engine()->matcher()),
_delay_server_receive_ms(0)
{
LOG_WARNING_IF(!_remote_host_port, "'{}' can not be reverse resolved", _remote_addr);
if (!is_client) {
on_rpc_session_connected.execute(this);
}
Expand Down
2 changes: 2 additions & 0 deletions src/runtime/rpc/network.sim.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ sim_network_provider::sim_network_provider(rpc_engine *rpc, network *inner_provi
{
_address = rpc_address::from_host_port("localhost", 1);
_hp = ::dsn::host_port::from_address(_address);
LOG_WARNING_IF(!_hp, "'{}' can not be reverse resolved", _address);
}

error_code sim_network_provider::start(rpc_channel channel, int port, bool client_only)
Expand All @@ -172,6 +173,7 @@ error_code sim_network_provider::start(rpc_channel channel, int port, bool clien

_address = dsn::rpc_address::from_host_port("localhost", port);
_hp = ::dsn::host_port::from_address(_address);
LOG_WARNING_IF(!_hp, "'{}' can not be reverse resolved", _address);
auto hostname = boost::asio::ip::host_name();
if (!client_only) {
for (int i = NET_HDR_INVALID + 1; i <= network_header_format::max_value(); i++) {
Expand Down
11 changes: 7 additions & 4 deletions src/runtime/rpc/rpc_engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -150,8 +150,9 @@ bool rpc_client_matcher::on_recv_reply(network *net, uint64_t key, message_ex *r
case GRPC_TO_LEADER:
if (req->server_address.group_address()->is_update_leader_automatically()) {
req->server_address.group_address()->set_leader(addr);
req->server_host_port.group_host_port()->set_leader(
host_port::from_address(addr));
const auto hp = host_port::from_address(addr);
CHECK(hp, "{} can not be reverse resolved", addr);
req->server_host_port.group_host_port()->set_leader(hp);
}
break;
default:
Expand Down Expand Up @@ -180,8 +181,9 @@ bool rpc_client_matcher::on_recv_reply(network *net, uint64_t key, message_ex *r
req->server_address.group_address()->is_update_leader_automatically()) {
req->server_address.group_address()->set_leader(
reply->header->from_address);
req->server_host_port.group_host_port()->set_leader(
host_port::from_address(reply->header->from_address));
const auto hp = host_port::from_address(reply->header->from_address);
CHECK(hp, "{} can not be reverse resolved", reply->header->from_address);
req->server_host_port.group_host_port()->set_leader(hp);
}
break;
default:
Expand Down Expand Up @@ -523,6 +525,7 @@ error_code rpc_engine::start(const service_app_spec &aspec)
_local_primary_address = _client_nets[NET_HDR_DSN][0]->address();
_local_primary_address.set_port(aspec.ports.size() > 0 ? *aspec.ports.begin() : aspec.id);
_local_primary_host_port = host_port::from_address(_local_primary_address);
CHECK(_local_primary_host_port, "'{}' can not be reverse resolved", _local_primary_address);

LOG_INFO("=== service_node=[{}], primary_address=[{}({})] ===",
_node->full_name(),
Expand Down
10 changes: 6 additions & 4 deletions src/runtime/rpc/rpc_host_port.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,11 @@ host_port host_port::from_address(rpc_address addr)
WARNING, 100, "construct host_port '{}' from rpc_address '{}'", hp, addr);
switch (addr.type()) {
case HOST_TYPE_IPV4: {
CHECK_OK(lookup_hostname(htonl(addr.ip()), &hp._host),
"lookup_hostname failed for {}",
addr.ipv4_str());
const auto s = lookup_hostname(htonl(addr.ip()), &hp._host);
if (dsn_unlikely(!s)) {
LOG_WARNING("lookup_hostname failed for {}: {}", addr.ipv4_str(), s.description());
return hp;
}
hp._port = addr.port();
} break;
case HOST_TYPE_GROUP: {
Expand All @@ -70,7 +72,7 @@ host_port host_port::from_address(rpc_address addr)
break;
}

// Now is valid.
// 'hp' becomes valid now.
hp._type = addr.type();
return hp;
}
Expand Down
8 changes: 6 additions & 2 deletions src/runtime/rpc/rpc_message.h
Original file line number Diff line number Diff line change
Expand Up @@ -138,8 +138,12 @@ class message_ex : public ref_counter, public extensible_object<message_ex, 4>
rpc_session_ptr io_session; // send/recv session
rpc_address to_address; // always ipv4/v6 address, it is the to_node's net address
rpc_address server_address; // used by requests, and may be of uri/group address
host_port to_host_port; // fqdn from 'to_address'
host_port server_host_port; // fqdn from 'server_address'
// hostname from 'to_address'. It may be invalid if 'to_address' cannot be reverse
// resolved.
host_port to_host_port;
// hostname from 'server_address'. It may be invalid if 'server_address' cannot be
// reverse resolved.
host_port server_host_port;
dsn::task_code local_rpc_code;
network_header_format hdr_format;
int send_retry_count;
Expand Down
4 changes: 2 additions & 2 deletions src/runtime/service_api_c.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -165,15 +165,15 @@ void dsn_rpc_call(dsn::rpc_address server, dsn::rpc_response_task *rpc_call)

auto msg = rpc_call->get_request();
msg->server_address = server;
msg->server_host_port = dsn::host_port::from_address(server);
msg->server_host_port = dsn::host_port::from_address(msg->server_address);
::dsn::task::get_current_rpc()->call(msg, dsn::rpc_response_task_ptr(rpc_call));
}

dsn::message_ex *dsn_rpc_call_wait(dsn::rpc_address server, dsn::message_ex *request)
{
auto msg = ((::dsn::message_ex *)request);
msg->server_address = server;
msg->server_host_port = dsn::host_port::from_address(server);
msg->server_host_port = dsn::host_port::from_address(msg->server_address);

::dsn::rpc_response_task *rtask = new ::dsn::rpc_response_task(msg, nullptr, 0);
rtask->add_ref();
Expand Down
6 changes: 2 additions & 4 deletions src/runtime/test/host_port_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,7 @@ TEST(host_port_test, host_port_build)

{
const auto addr = rpc_address::from_host_port("localhost", 8080);
host_port hp1 = host_port::from_address(addr);
ASSERT_EQ(hp, hp1);
ASSERT_EQ(hp, host_port::from_address(addr));
}
}

Expand Down Expand Up @@ -203,8 +202,7 @@ TEST(host_port_test, rpc_group_host_port)
ASSERT_EQ(addr2, g_addr->leader());
ASSERT_EQ(2, g_addr->count());

host_port hp_grp2;
hp_grp2 = host_port::from_address(addr_grp);
host_port hp_grp2 = host_port::from_address(addr_grp);
ASSERT_EQ(HOST_TYPE_GROUP, hp_grp2.type());

auto g_hp = hp_grp2.group_host_port();
Expand Down

0 comments on commit d74f4fc

Please sign in to comment.