Skip to content

Commit

Permalink
fix(FQDN): Fix the bug of crash caused by un-resolved IP address
Browse files Browse the repository at this point in the history
  • Loading branch information
acelyc111 committed Jun 5, 2024
1 parent 8fd8902 commit a657566
Show file tree
Hide file tree
Showing 17 changed files with 75 additions and 36 deletions.
3 changes: 2 additions & 1 deletion src/failure_detector/test/failure_detector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,8 @@ class test_master : public service_app
utils::split_args(args[2].c_str(), ports, ',');
for (auto &port : ports) {
rpc_address addr(network::get_local_ipv4(), std::stoi(port));
const auto hp = ::dsn::host_port::from_address(addr);
const auto hp = ::dsn::host_port::from_address1(addr);
ASSERT_TRUE(hp);
_master_fd->add_allow_list(hp);
}
use_allow_list = true;
Expand Down
4 changes: 3 additions & 1 deletion src/meta/meta_bulk_load_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -458,7 +458,9 @@ void bulk_load_service::partition_bulk_load(const std::string &app_name, const g
if (!bulk_load_resp.__isset.hp_group_bulk_load_state) {
bulk_load_resp.__set_hp_group_bulk_load_state({});
for (const auto & [ addr, pbls ] : bulk_load_resp.group_bulk_load_state) {
bulk_load_resp.hp_group_bulk_load_state[host_port::from_address(addr)] = pbls;
// TODO(yingchun): 'hp_group_bulk_load_state' key is possible to be invalid after
// this!
bulk_load_resp.hp_group_bulk_load_state[host_port::from_address1(addr)] = pbls;
}
}

Expand Down
7 changes: 5 additions & 2 deletions src/meta/server_state.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1595,8 +1595,10 @@ void server_state::update_configuration_locally(
}
} else {
for (const auto &secondary : config_request->config.secondaries) {
// TODO(yingchun): 'hp_group_bulk_load_state' key is possible to be invalid
// after this!
auto secondary_node =
get_node_state(_nodes, host_port::from_address(secondary), false);
get_node_state(_nodes, host_port::from_address1(secondary), false);
secondary_node->put_partition(gpid, false);
}
}
Expand All @@ -1609,7 +1611,8 @@ void server_state::update_configuration_locally(
} else {
CHECK_EQ(old_cfg.ballot, new_cfg.ballot);

const auto host_node = host_port::from_address(config_request->host_node);
// TODO(yingchun): 'host_node' is possible to be invalid after this!
const auto host_node = host_port::from_address1(config_request->host_node);
new_cfg = old_cfg;
partition_configuration_stateless pcs(new_cfg);
if (config_request->type == config_type::type::CT_ADD_SECONDARY) {
Expand Down
9 changes: 6 additions & 3 deletions src/redis_protocol/proxy_lib/proxy_layer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,8 @@ proxy_stub::proxy_stub(const proxy_session::factory &f,

void proxy_stub::on_rpc_request(dsn::message_ex *request)
{
auto source = ::dsn::host_port::from_address(request->header->from_address);
// TODO(yingchun): 'source' is possible to be invalid after this!
auto source = ::dsn::host_port::from_address1(request->header->from_address);
std::shared_ptr<proxy_session> session;
{
::dsn::zauto_read_lock l(_lock);
Expand All @@ -87,7 +88,8 @@ void proxy_stub::on_rpc_request(dsn::message_ex *request)

void proxy_stub::on_recv_remove_session_request(dsn::message_ex *request)
{
auto source = ::dsn::host_port::from_address(request->header->from_address);
// TODO(yingchun): 'source' is possible to be invalid after this!
auto source = ::dsn::host_port::from_address1(request->header->from_address);
remove_session(source);
}

Expand All @@ -114,7 +116,8 @@ proxy_session::proxy_session(proxy_stub *op, dsn::message_ex *first_msg)
CHECK_NOTNULL(first_msg, "null msg when create session");
_backup_one_request->add_ref();

_session_remote = ::dsn::host_port::from_address(_backup_one_request->header->from_address);
// TODO(yingchun): '_session_remote' is possible to be invalid after this!
_session_remote = ::dsn::host_port::from_address1(_backup_one_request->header->from_address);
_session_remote_str = _session_remote.to_string();
CHECK_EQ_MSG(_session_remote.type(), HOST_TYPE_IPV4, "invalid host_port type");
}
Expand Down
3 changes: 2 additions & 1 deletion src/replica/storage/simple_kv/test/case.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -534,7 +534,8 @@ void event_on_rpc::init(message_ex *msg, task *tsk)
if (msg != nullptr) {
_trace_id = fmt::sprintf("%016llx", msg->header->trace_id);
_rpc_name = msg->header->rpc_name;
_from = address_to_node(host_port::from_address(msg->header->from_address));
// TODO(yingchun): '_from' is possible to be invalid after this!
_from = address_to_node(host_port::from_address1(msg->header->from_address));
_to = address_to_node(msg->to_host_port);
}
}
Expand Down
6 changes: 4 additions & 2 deletions src/runtime/rpc/asio_net_provider.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,8 @@ error_code asio_network_provider::start(rpc_channel channel, int port, bool clie
channel);

_address = rpc_address(get_local_ipv4(), port);
_hp = ::dsn::host_port::from_address(_address);
_hp = ::dsn::host_port::from_address1(_address);
CHECK(_hp, "rpc_address '{}' can not be reverse resolved", _address);

if (!client_only) {
auto v4_addr = boost::asio::ip::address_v4::any(); //(ntohl(_address.ip));
Expand Down Expand Up @@ -455,7 +456,8 @@ error_code asio_udp_provider::start(rpc_channel channel, int port, bool client_o
}
}

_hp = ::dsn::host_port::from_address(_address);
_hp = ::dsn::host_port::from_address1(_address);
CHECK(_hp, "rpc_address '{}' can not be reverse resolved", _address);

for (int i = 0; i < FLAGS_io_service_worker_count; i++) {
_workers.push_back(std::make_shared<std::thread>([this, i]() {
Expand Down
6 changes: 4 additions & 2 deletions src/runtime/rpc/group_host_port.h
Original file line number Diff line number Diff line change
Expand Up @@ -127,10 +127,12 @@ inline rpc_group_host_port::rpc_group_host_port(const rpc_group_address *g_addr)
{
_name = g_addr->name();
for (const auto &addr : g_addr->members()) {
CHECK_TRUE(add(host_port::from_address(addr)));
// TODO(yingchun): It is possible to be invalid after this!
CHECK_TRUE(add(host_port::from_address1(addr)));
}
_update_leader_automatically = g_addr->is_update_leader_automatically();
set_leader(host_port::from_address(g_addr->leader()));
// TODO(yingchun): It is possible to be invalid after this!
set_leader(host_port::from_address1(g_addr->leader()));
}

inline rpc_group_host_port &rpc_group_host_port::operator=(const rpc_group_host_port &other)
Expand Down
5 changes: 4 additions & 1 deletion src/runtime/rpc/network.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -388,14 +388,17 @@ rpc_session::rpc_session(connection_oriented_network &net,
_message_sent(0),
_net(net),
_remote_addr(remote_addr),
_remote_host_port(host_port::from_address(remote_addr)),
// TODO(yingchun): '_remote_host_port' is possible to be invalid after this!
// TODO(yingchun): It's too cost to reverse resolve host in constructor.
_remote_host_port(host_port::from_address1(remote_addr)),
_max_buffer_block_count_per_send(net.max_buffer_block_count_per_send()),
_reader(net.message_buffer_block_size()),
_parser(parser),
_is_client(is_client),
_matcher(_net.engine()->matcher()),
_delay_server_receive_ms(0)
{
CHECK(_remote_host_port, "rpc_address '{}' can not be reverse resolved", remote_addr);
if (!is_client) {
on_rpc_session_connected.execute(this);
}
Expand Down
6 changes: 4 additions & 2 deletions src/runtime/rpc/network.sim.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,8 @@ sim_network_provider::sim_network_provider(rpc_engine *rpc, network *inner_provi
: connection_oriented_network(rpc, inner_provider)
{
_address = rpc_address::from_host_port("localhost", 1);
_hp = ::dsn::host_port::from_address(_address);
_hp = ::dsn::host_port::from_address1(_address);
CHECK(_hp, "rpc_address '{}' can not be reverse resolved", _address);
}

error_code sim_network_provider::start(rpc_channel channel, int port, bool client_only)
Expand All @@ -171,7 +172,8 @@ error_code sim_network_provider::start(rpc_channel channel, int port, bool clien
channel);

_address = dsn::rpc_address::from_host_port("localhost", port);
_hp = ::dsn::host_port::from_address(_address);
_hp = ::dsn::host_port::from_address1(_address);
CHECK(_hp, "rpc_address '{}' can not be reverse resolved", _address);
auto hostname = boost::asio::ip::host_name();
if (!client_only) {
for (int i = NET_HDR_INVALID + 1; i <= network_header_format::max_value(); i++) {
Expand Down
14 changes: 10 additions & 4 deletions src/runtime/rpc/rpc_engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -150,8 +150,9 @@ bool rpc_client_matcher::on_recv_reply(network *net, uint64_t key, message_ex *r
case GRPC_TO_LEADER:
if (req->server_address.group_address()->is_update_leader_automatically()) {
req->server_address.group_address()->set_leader(addr);
// TODO(yingchun): leader is possible to be invalid after this!
req->server_host_port.group_host_port()->set_leader(
host_port::from_address(addr));
host_port::from_address1(addr));
}
break;
default:
Expand Down Expand Up @@ -180,8 +181,9 @@ bool rpc_client_matcher::on_recv_reply(network *net, uint64_t key, message_ex *r
req->server_address.group_address()->is_update_leader_automatically()) {
req->server_address.group_address()->set_leader(
reply->header->from_address);
// TODO(yingchun): leader is possible to be invalid after this!
req->server_host_port.group_host_port()->set_leader(
host_port::from_address(reply->header->from_address));
host_port::from_address1(reply->header->from_address));
}
break;
default:
Expand Down Expand Up @@ -522,7 +524,10 @@ error_code rpc_engine::start(const service_app_spec &aspec)

_local_primary_address = _client_nets[NET_HDR_DSN][0]->address();
_local_primary_address.set_port(aspec.ports.size() > 0 ? *aspec.ports.begin() : aspec.id);
_local_primary_host_port = host_port::from_address(_local_primary_address);
_local_primary_host_port = host_port::from_address1(_local_primary_address);
CHECK(_local_primary_host_port,
"rpc_address '{}' can not be reverse resolved",
_local_primary_address);

LOG_INFO("=== service_node=[{}], primary_address=[{}({})] ===",
_node->full_name(),
Expand Down Expand Up @@ -674,7 +679,8 @@ void rpc_engine::call_ip(rpc_address addr,
}

request->to_address = addr;
request->to_host_port = host_port::from_address(addr);
// TODO(yingchun): 'request->to_host_port' is possible to be invalid after this!
request->to_host_port = host_port::from_address1(addr);

auto sp = task_spec::get(request->local_rpc_code);
auto &hdr = *request->header;
Expand Down
12 changes: 7 additions & 5 deletions src/runtime/rpc/rpc_host_port.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,16 +51,18 @@ host_port::host_port(std::string host, uint16_t port)
}
}

host_port host_port::from_address(rpc_address addr)
host_port host_port::from_address1(rpc_address addr)
{
host_port hp;
SCOPED_LOG_SLOW_EXECUTION(
WARNING, 100, "construct host_port '{}' from rpc_address '{}'", hp, addr);
switch (addr.type()) {
case HOST_TYPE_IPV4: {
CHECK_OK(lookup_hostname(htonl(addr.ip()), &hp._host),
"lookup_hostname failed for {}",
addr.ipv4_str());
const auto s = lookup_hostname(htonl(addr.ip()), &hp._host);
if (dsn_unlikely(!s)) {
LOG_WARNING("lookup_hostname failed for {}: {}", addr.ipv4_str(), s.description());
return hp;
}
hp._port = addr.port();
} break;
case HOST_TYPE_GROUP: {
Expand All @@ -70,7 +72,7 @@ host_port host_port::from_address(rpc_address addr)
break;
}

// Now is valid.
// 'hp' become valid now.
hp._type = addr.type();
return hp;
}
Expand Down
8 changes: 5 additions & 3 deletions src/runtime/rpc/rpc_host_port.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ class TProtocol;

// Get host_port from 'obj', the result is filled in 'target', the source is from host_port type
// field 'hp_<field>' if it is set, otherwise, reverse resolve from the rpc_address '<field>'.
// TODO(yingchun): 'target' is possible to be invalid after this!
#define GET_HOST_PORT(obj, field, target) \
do { \
const auto &_obj = (obj); \
Expand All @@ -55,13 +56,14 @@ class TProtocol;
DCHECK_EQ(_obj.field, dsn::dns_resolver::instance().resolve_address(_obj.hp_##field)); \
_target = _obj.hp_##field; \
} else { \
_target = std::move(dsn::host_port::from_address(_obj.field)); \
_target = std::move(dsn::host_port::from_address1(_obj.field)); \
} \
} while (0)

// Get std::vector<host_port> from 'obj', the result is filled in 'target', the source is from
// std::vector<host_port> type field 'hp_<field>' if it is set, otherwise, reverse resolve from the
// std::vector<rpc_address> '<field>'.
// TODO(yingchun): 'target' is possible to contain invalid host_port after this!
#define GET_HOST_PORTS(obj, field, target) \
do { \
const auto &_obj = (obj); \
Expand All @@ -73,7 +75,7 @@ class TProtocol;
} else { \
_target.reserve(_obj.field.size()); \
for (const auto &addr : _obj.field) { \
_target.emplace_back(host_port::from_address(addr)); \
_target.emplace_back(host_port::from_address1(addr)); \
} \
} \
} while (0)
Expand Down Expand Up @@ -298,7 +300,7 @@ class host_port
void assign_group(const char *name);

// Construct a host_port object from 'addr'
static host_port from_address(rpc_address addr);
static host_port from_address1(rpc_address addr);

// Construct a host_port object from 'host_port_str', the latter is in the format of
// "localhost:8888".
Expand Down
3 changes: 2 additions & 1 deletion src/runtime/rpc/rpc_message.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -355,7 +355,8 @@ message_ex *message_ex::create_response()
// the primary address.
msg->header->from_address = to_address;
msg->to_address = header->from_address;
msg->to_host_port = host_port::from_address(header->from_address);
// TODO(yingchun): 'to_host_port' is possible to be invalid after this!
msg->to_host_port = host_port::from_address1(header->from_address);
msg->io_session = io_session;
msg->hdr_format = hdr_format;

Expand Down
9 changes: 6 additions & 3 deletions src/runtime/service_api_c.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -165,15 +165,17 @@ void dsn_rpc_call(dsn::rpc_address server, dsn::rpc_response_task *rpc_call)

auto msg = rpc_call->get_request();
msg->server_address = server;
msg->server_host_port = dsn::host_port::from_address(server);
// TODO(yingchun): 'msg->server_host_port' is possible to be invalid after this!
msg->server_host_port = dsn::host_port::from_address1(server);
::dsn::task::get_current_rpc()->call(msg, dsn::rpc_response_task_ptr(rpc_call));
}

dsn::message_ex *dsn_rpc_call_wait(dsn::rpc_address server, dsn::message_ex *request)
{
auto msg = ((::dsn::message_ex *)request);
msg->server_address = server;
msg->server_host_port = dsn::host_port::from_address(server);
// TODO(yingchun): 'msg->server_host_port' is possible to be invalid after this!
msg->server_host_port = dsn::host_port::from_address1(server);

::dsn::rpc_response_task *rtask = new ::dsn::rpc_response_task(msg, nullptr, 0);
rtask->add_ref();
Expand All @@ -194,7 +196,8 @@ void dsn_rpc_call_one_way(dsn::rpc_address server, dsn::message_ex *request)
{
auto msg = ((::dsn::message_ex *)request);
msg->server_address = server;
msg->server_host_port = dsn::host_port::from_address(server);
// TODO(yingchun): 'msg->server_host_port' is possible to be invalid after this!
msg->server_host_port = dsn::host_port::from_address1(server);

::dsn::task::get_current_rpc()->call(msg, nullptr);
}
Expand Down
2 changes: 1 addition & 1 deletion src/runtime/test/dns_resolver_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ TEST(host_port_test, dns_resolver)
ASSERT_EQ(g_addr->is_update_leader_automatically(), g_hp->is_update_leader_automatically());
ASSERT_STREQ(g_addr->name(), g_hp->name());
ASSERT_EQ(g_addr->count(), g_hp->count());
ASSERT_EQ(host_port::from_address(g_addr->leader()), g_hp->leader());
ASSERT_EQ(host_port::from_address1(g_addr->leader()), g_hp->leader());
}

// Resolve host_port list.
Expand Down
6 changes: 2 additions & 4 deletions src/runtime/test/host_port_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,7 @@ TEST(host_port_test, host_port_build)

{
const auto addr = rpc_address::from_host_port("localhost", 8080);
host_port hp1 = host_port::from_address(addr);
ASSERT_EQ(hp, hp1);
ASSERT_EQ(hp, host_port::from_address1(addr));
}
}

Expand Down Expand Up @@ -203,8 +202,7 @@ TEST(host_port_test, rpc_group_host_port)
ASSERT_EQ(addr2, g_addr->leader());
ASSERT_EQ(2, g_addr->count());

host_port hp_grp2;
hp_grp2 = host_port::from_address(addr_grp);
host_port hp_grp2 = host_port::from_address1(addr_grp);
ASSERT_EQ(HOST_TYPE_GROUP, hp_grp2.type());

auto g_hp = hp_grp2.group_host_port();
Expand Down
8 changes: 8 additions & 0 deletions src/utils/errors.h
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,14 @@ USER_DEFINED_STRUCTURE_FORMATTER(::dsn::error_s);
} \
} while (false)

#define WARN_NOT_OK(s, ...) \
do { \
const ::dsn::error_s &_s = (s); \
if (dsn_unlikely(!_s)) { \
LOG_WARNING("{}: {}", _s.description(), fmt::format(__VA_ARGS__)); \
} \
} while (false)

#define RETURN_ES_NOT_OK_MSG(s, ...) \
do { \
const ::dsn::error_s &_s = (s); \
Expand Down

0 comments on commit a657566

Please sign in to comment.