Skip to content

Commit

Permalink
lyc 16
Browse files Browse the repository at this point in the history
  • Loading branch information
acelyc111 committed Jan 24, 2024
1 parent b16a879 commit 5dbba78
Show file tree
Hide file tree
Showing 4 changed files with 73 additions and 47 deletions.
20 changes: 11 additions & 9 deletions src/runtime/rpc/asio_net_provider.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -334,19 +334,19 @@ void asio_udp_provider::do_receive()
return;
}

// Get the remote endpoint of the socket.
boost::system::error_code ec;
::dsn::rpc_address remote_addr;
auto remote = _socket->remote_endpoint(ec);
if (ec) {
LOG_ERROR("failed to get the remote endpoint: {}", ec.message());
do_receive();
return;
} else {
auto ip = remote.address().to_v4().to_ulong();
auto port = remote.port();
remote_addr = ::dsn::rpc_address(ip, port);
}

auto ip = remote.address().to_v4().to_ulong();
auto port = remote.port();
::dsn::rpc_address remote_addr = ::dsn::rpc_address(ip, port);

auto hdr_format = message_parser::get_header_type(_recv_reader._buffer.data());
if (NET_HDR_INVALID == hdr_format) {
LOG_ERROR("{}: asio udp read failed: invalid header type '{}'",
Expand All @@ -373,13 +373,15 @@ void asio_udp_provider::do_receive()
if (msg->header->from_address != remote_addr) {
if (!msg->header->context.u.is_forwarded) {
msg->header->from_address = remote_addr;
LOG_DEBUG("msg from_address {} not be same as socket remote_addr {}, assign it "
"to remote_addr.",
LOG_DEBUG("{}: message's from_address {} is not equal to socket's remote_addr "
"{}, assign it to remote_addr.",
_address,
msg->header->from_address,
remote_addr);
} else {
LOG_DEBUG("msg from_address {} not be same as socket remote_addr {}, but it's "
"forwarded message, ignore it!.",
LOG_DEBUG("{}: message's from_address {} is not equal to socket's remote_addr "
"{}, but it's forwarded message, ignore it!.",
_address,
msg->header->from_address,
remote_addr);
}
Expand Down
76 changes: 44 additions & 32 deletions src/runtime/rpc/dns_resolver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,31 +25,37 @@
#include "runtime/rpc/dns_resolver.h"
#include "runtime/rpc/group_address.h"
#include "runtime/rpc/group_host_port.h"
#include "utils/flags.h"
#include "utils/fmt_logging.h"

namespace dsn {
METRIC_DEFINE_gauge_int64(server,
dns_resolver_cache_size,
dsn::metric_unit::kKeys,
"The size of the host_port to rpc_address resolve results cache");

METRIC_DEFINE_percentile_int64(
server,
dns_resolver_resolve_duration_ns,
dsn::metric_unit::kNanoSeconds,
"The duration of resolving a host port, may either get from cache or resolve by DNS lookup");

DSN_DEFINE_bool(network,
use_cache_resolve_hostname,
true,
"Whether to use the cache to store the "
"association between hostname and "
"address, false means that a "
"system-level query is required every "
"time");
METRIC_DEFINE_percentile_int64(server,
dns_resolver_resolve_by_dns_duration_ns,
dsn::metric_unit::kNanoSeconds,
"The duration of resolving a host port by DNS lookup");
namespace dsn {

void dns_resolver::add_item(const host_port &hp, const rpc_address &addr)
dns_resolver::dns_resolver()
: METRIC_VAR_INIT_server(dns_resolver_cache_size),
METRIC_VAR_INIT_server(dns_resolver_resolve_duration_ns),
METRIC_VAR_INIT_server(dns_resolver_resolve_by_dns_duration_ns)
{
utils::auto_write_lock l(_lock);
_dsn_cache.insert(std::make_pair(hp, addr));
}

bool dns_resolver::get_cached_addresses(const host_port &hp, std::vector<rpc_address> &addresses)
{
utils::auto_read_lock l(_lock);
const auto &found = _dsn_cache.find(hp);
if (found == _dsn_cache.end()) {
const auto &found = _dns_cache.find(hp);
if (found == _dns_cache.end()) {
return false;
}

Expand All @@ -60,25 +66,30 @@ bool dns_resolver::get_cached_addresses(const host_port &hp, std::vector<rpc_add
error_s dns_resolver::resolve_addresses(const host_port &hp, std::vector<rpc_address> &addresses)
{
CHECK(addresses.empty(), "invalid addresses, not empty");
if (FLAGS_use_cache_resolve_hostname && get_cached_addresses(hp, addresses)) {
if (get_cached_addresses(hp, addresses)) {
return error_s::ok();
}

std::vector<rpc_address> resolved_addresses;
RETURN_NOT_OK(hp.resolve_addresses(resolved_addresses));
{
METRIC_VAR_AUTO_LATENCY(dns_resolver_resolve_by_dns_duration_ns);
RETURN_NOT_OK(hp.resolve_addresses(resolved_addresses));
}

{
utils::auto_write_lock l(_lock);
if (resolved_addresses.size() > 1) {
LOG_DEBUG(
"host_port '{}' resolves to {} different addresses {}, using the first one {}.",
hp,
resolved_addresses.size(),
fmt::join(resolved_addresses, ","),
resolved_addresses[0]);
LOG_DEBUG("host_port '{}' resolves to {} different addresses {}, only the first one {} "
"will be cached.",
hp,
resolved_addresses.size(),
fmt::join(resolved_addresses, ","),
resolved_addresses[0]);
}
if (FLAGS_use_cache_resolve_hostname) {
_dsn_cache.insert(std::make_pair(hp, resolved_addresses[0]));

utils::auto_write_lock l(_lock);
const auto it = _dns_cache.insert(std::make_pair(hp, resolved_addresses[0]));
if (it.second) {
METRIC_VAR_INCREMENT(dns_resolver_cache_size);
}
}

Expand All @@ -88,18 +99,19 @@ error_s dns_resolver::resolve_addresses(const host_port &hp, std::vector<rpc_add

rpc_address dns_resolver::resolve_address(const host_port &hp)
{
METRIC_VAR_AUTO_LATENCY(dns_resolver_resolve_duration_ns);
switch (hp.type()) {
case HOST_TYPE_GROUP: {
rpc_address addr;
auto group_address = hp.group_host_port();
addr.assign_group(group_address->name());
auto hp_group = hp.group_host_port();
addr.assign_group(hp_group->name());

for (const auto &hp : group_address->members()) {
for (const auto &hp : hp_group->members()) {
CHECK_TRUE(addr.group_address()->add(resolve_address(hp)));
}
addr.group_address()->set_update_leader_automatically(
group_address->is_update_leader_automatically());
addr.group_address()->set_leader(resolve_address(group_address->leader()));
hp_group->is_update_leader_automatically());
addr.group_address()->set_leader(resolve_address(hp_group->leader()));
return addr;
}
case HOST_TYPE_IPV4: {
Expand All @@ -116,7 +128,7 @@ rpc_address dns_resolver::resolve_address(const host_port &hp)
return addresses[0];
}
default:
return rpc_address();
return {};
}
}

Expand Down
21 changes: 15 additions & 6 deletions src/runtime/rpc/dns_resolver.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,17 +25,22 @@
#include "runtime/rpc/rpc_address.h"
#include "runtime/rpc/rpc_host_port.h"
#include "utils/errors.h"
#include "utils/metrics.h"
#include "utils/synchronize.h"

namespace dsn {

// This class provide a way to resolve host_port to rpc_address.
// Now each host_post will be resolved just once, and then cached the first rpc_address result in
// the resolved result list.
// If some host_port's rpc_address changes, you need to restart the Pegasus process to make it take
// effect.
// TODO(yingchun): Now the cache is unlimited, the cache size may be huge. Implement an expiration
// mechanism to limit the cache size and make it possible to update the resolve result.
class dns_resolver
{
public:
explicit dns_resolver() = default;

void add_item(const host_port &hp, const rpc_address &addr);
explicit dns_resolver();

// Resolve this host_port to an unique rpc_address.
rpc_address resolve_address(const host_port &hp);
Expand All @@ -45,10 +50,14 @@ class dns_resolver

error_s resolve_addresses(const host_port &hp, std::vector<rpc_address> &addresses);

error_s do_resolution(const host_port &hp, std::vector<rpc_address> &addresses);

mutable utils::rw_lock_nr _lock;
std::unordered_map<host_port, rpc_address> _dsn_cache;
// Cache the host_port resolve results, the cached rpc_address is the first one in the resolved
// list.
std::unordered_map<host_port, rpc_address> _dns_cache;

METRIC_VAR_DECLARE_gauge_int64(dns_resolver_cache_size);
METRIC_VAR_DECLARE_percentile_int64(dns_resolver_resolve_duration_ns);
METRIC_VAR_DECLARE_percentile_int64(dns_resolver_resolve_by_dns_duration_ns);
};

} // namespace dsn
3 changes: 3 additions & 0 deletions src/runtime/rpc/rpc_host_port.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,9 @@ error_s host_port::resolve_addresses(std::vector<rpc_address> &addresses) const
return error_s::make(dsn::ERR_INVALID_STATE, "invalid host_port type: HOST_TYPE_GROUP");
case HOST_TYPE_IPV4:
break;
default:
CHECK(false, "");
__builtin_unreachable();
}

rpc_address rpc_addr;
Expand Down

0 comments on commit 5dbba78

Please sign in to comment.