From d40310cb46f156f0790bbb5c399fb2fdee2fa922 Mon Sep 17 00:00:00 2001 From: GuoHao Li Date: Mon, 8 Jan 2024 16:11:36 +0800 Subject: [PATCH] feat(fqdn): Complement the implementation of class host_port --- src/runtime/rpc/dns_resolver.cpp | 1 + src/runtime/rpc/group_host_port.h | 4 +- src/runtime/rpc/rpc_host_port.cpp | 15 +++---- src/runtime/rpc/rpc_host_port.h | 22 ++++++++-- src/runtime/test/host_port_test.cpp | 66 ++++++++++++++++++++++------- src/utils/fixed_size_buffer_pool.h | 4 +- 6 files changed, 81 insertions(+), 31 deletions(-) diff --git a/src/runtime/rpc/dns_resolver.cpp b/src/runtime/rpc/dns_resolver.cpp index 0cc9a06161..8ccbdac567 100644 --- a/src/runtime/rpc/dns_resolver.cpp +++ b/src/runtime/rpc/dns_resolver.cpp @@ -18,6 +18,7 @@ */ #include +#include #include #include "fmt/format.h" diff --git a/src/runtime/rpc/group_host_port.h b/src/runtime/rpc/group_host_port.h index cf7cab19ba..1bc8989ea7 100644 --- a/src/runtime/rpc/group_host_port.h +++ b/src/runtime/rpc/group_host_port.h @@ -44,7 +44,7 @@ static constexpr int kInvalidIndex = -1; // group.group_host_port()->add(host_port("test_fqdn", 34602)); // group.group_host_port()->add(host_port("test_fqdn", 34603)); // -class rpc_group_host_port : public ref_counter +class rpc_group_host_port { public: rpc_group_host_port(const char *name); @@ -170,12 +170,12 @@ inline void rpc_group_host_port::leader_forward() inline void rpc_group_host_port::set_leader(const host_port &hp) { - CHECK_EQ_MSG(hp.type(), HOST_TYPE_IPV4, "rpc group host_port member must be ipv4"); awl_t l(_lock); if (hp.is_invalid()) { _leader_index = kInvalidIndex; return; } + CHECK_EQ_MSG(hp.type(), HOST_TYPE_IPV4, "rpc group host_port member must be ipv4"); for (int i = 0; i < _members.size(); i++) { if (_members[i] == hp) { _leader_index = i; diff --git a/src/runtime/rpc/rpc_host_port.cpp b/src/runtime/rpc/rpc_host_port.cpp index fa4d36c9c9..2da05e7b8f 100644 --- a/src/runtime/rpc/rpc_host_port.cpp +++ b/src/runtime/rpc/rpc_host_port.cpp @@ -20,8 +20,8 @@ #include #include #include -#include #include +#include #include #include #include @@ -79,7 +79,7 @@ host_port::host_port(rpc_address addr) _port = addr.port(); } break; case HOST_TYPE_GROUP: { - _group_host_port = new rpc_group_host_port(addr.group_address()); + _group_host_port = std::make_shared(addr.group_address()); } break; default: break; @@ -120,7 +120,7 @@ void host_port::reset() _port = 0; break; case HOST_TYPE_GROUP: - group_host_port()->release_ref(); + _group_host_port = nullptr; break; default: break; @@ -142,7 +142,6 @@ host_port &host_port::operator=(const host_port &other) break; case HOST_TYPE_GROUP: _group_host_port = other._group_host_port; - group_host_port()->add_ref(); break; default: break; @@ -157,9 +156,9 @@ std::string host_port::to_string() const case HOST_TYPE_IPV4: return fmt::format("{}:{}", _host, _port); case HOST_TYPE_GROUP: - return fmt::format("address group {}", group_host_port()->name()); + return fmt::format("host_port group {}", group_host_port()->name()); default: - return "invalid address"; + return "invalid host_port"; } } @@ -167,9 +166,7 @@ void host_port::assign_group(const char *name) { reset(); _type = HOST_TYPE_GROUP; - _group_host_port = new rpc_group_host_port(name); - // take the lifetime of rpc_uri_address, release_ref when change value or call destructor - _group_host_port->add_ref(); + _group_host_port = std::make_shared(name); } error_s host_port::resolve_addresses(std::vector &addresses) const diff --git a/src/runtime/rpc/rpc_host_port.h b/src/runtime/rpc/rpc_host_port.h index 763c8ac915..4cda4180bb 100644 --- a/src/runtime/rpc/rpc_host_port.h +++ b/src/runtime/rpc/rpc_host_port.h @@ -24,6 +24,7 @@ // IWYU pragma: no_include #include #include +#include #include #include #include @@ -72,7 +73,7 @@ class host_port return os << hp.to_string(); } - rpc_group_host_port *group_host_port() const + std::shared_ptr group_host_port() const { CHECK_NOTNULL(_group_host_port, "group_host_port cannot be null!"); return _group_host_port; @@ -93,7 +94,7 @@ class host_port std::string _host; uint16_t _port = 0; dsn_host_type_t _type = HOST_TYPE_INVALID; - rpc_group_host_port *_group_host_port = nullptr; + std::shared_ptr _group_host_port; }; inline bool operator==(const host_port &hp1, const host_port &hp2) @@ -118,6 +119,21 @@ inline bool operator==(const host_port &hp1, const host_port &hp2) inline bool operator!=(const host_port &hp1, const host_port &hp2) { return !(hp1 == hp2); } +inline bool operator<(const host_port &hp1, const host_port &hp2) +{ + if (hp1.type() != hp2.type()) { + return hp1.type() < hp2.type(); + } + + switch (hp1.type()) { + case HOST_TYPE_IPV4: + return hp1.host() < hp2.host() || (hp1.host() == hp2.host() && hp1.port() < hp2.port()); + case HOST_TYPE_GROUP: + return hp1.group_host_port().get() < hp2.group_host_port().get(); + default: + return true; + } +} } // namespace dsn USER_DEFINED_STRUCTURE_FORMATTER(::dsn::host_port); @@ -132,7 +148,7 @@ struct hash<::dsn::host_port> case HOST_TYPE_IPV4: return std::hash()(hp.host()) ^ std::hash()(hp.port()); case HOST_TYPE_GROUP: - return std::hash()(hp.group_host_port()); + return std::hash()(hp.group_host_port().get()); default: return 0; } diff --git a/src/runtime/test/host_port_test.cpp b/src/runtime/test/host_port_test.cpp index 0cd656a079..05d6982b2d 100644 --- a/src/runtime/test/host_port_test.cpp +++ b/src/runtime/test/host_port_test.cpp @@ -17,7 +17,7 @@ * under the License. */ -#include +#include #include #include #include @@ -45,12 +45,12 @@ TEST(host_port_test, host_port_to_string) { { host_port hp = host_port("localhost", 8080); - ASSERT_EQ(std::string("localhost:8080"), hp.to_string()); + ASSERT_EQ("localhost:8080", hp.to_string()); } { host_port hp; - ASSERT_EQ(std::string("invalid address"), hp.to_string()); + ASSERT_EQ("invalid host_port", hp.to_string()); } } @@ -100,6 +100,9 @@ TEST(host_port_test, operators) std::string hp_str2 = "pegasus:8080"; ASSERT_FALSE(hp4.from_string(hp_str2)); ASSERT_TRUE(hp4.is_invalid()); + + host_port hp5("localhost", 8081); + ASSERT_LT(hp, hp5); } TEST(host_port_test, rpc_group_host_port) @@ -111,8 +114,8 @@ TEST(host_port_test, rpc_group_host_port) host_port hp_grp; hp_grp.assign_group("test_group"); ASSERT_EQ(HOST_TYPE_GROUP, hp_grp.type()); - rpc_group_host_port *g = hp_grp.group_host_port(); - ASSERT_EQ(std::string("test_group"), g->name()); + const auto &g = hp_grp.group_host_port(); + ASSERT_STREQ("test_group", g->name()); // invalid_hp ASSERT_FALSE(g->remove(hp)); @@ -166,6 +169,40 @@ TEST(host_port_test, rpc_group_host_port) ASSERT_FALSE(g->contains(hp2)); ASSERT_EQ(0u, g->members().size()); ASSERT_EQ(invalid_hp, g->leader()); + + // operator < + host_port hp_grp1; + hp_grp1.assign_group("test_group"); + if (hp_grp.group_host_port().get() < hp_grp1.group_host_port().get()) { + ASSERT_LT(hp_grp, hp_grp1); + } else { + ASSERT_FALSE(hp_grp < hp_grp1); + } + + // address_group -> host_port_group + rpc_address addr("127.0.0.1", 8080); + rpc_address addr2("127.0.0.1", 8081); + + rpc_address addr_grp; + addr_grp.assign_group("test_group"); + ASSERT_EQ(HOST_TYPE_GROUP, addr_grp.type()); + + auto g_addr = addr_grp.group_address(); + ASSERT_STREQ("test_group", g_addr->name()); + + ASSERT_TRUE(g_addr->add(addr)); + g_addr->set_leader(addr2); + ASSERT_EQ(addr2, g_addr->leader()); + ASSERT_EQ(2, g_addr->count()); + + host_port hp_grp2; + hp_grp2 = host_port(addr_grp); + ASSERT_EQ(HOST_TYPE_GROUP, hp_grp2.type()); + + auto g_hp = hp_grp2.group_host_port(); + ASSERT_STREQ("test_group", g_hp->name()); + ASSERT_EQ(hp2, g_hp->leader()); + ASSERT_EQ(2, g_hp->count()); } TEST(host_port_test, transfer_rpc_address) @@ -204,27 +241,26 @@ TEST(host_port_test, dns_resolver) { host_port hp_grp; hp_grp.assign_group("test_group"); - rpc_group_host_port *g = hp_grp.group_host_port(); + auto g_hp = hp_grp.group_host_port(); host_port hp1("localhost", 8080); - ASSERT_TRUE(g->add(hp1)); + ASSERT_TRUE(g_hp->add(hp1)); host_port hp2("localhost", 8081); - g->set_leader(hp2); + g_hp->set_leader(hp2); auto addr_grp = resolver.resolve_address(hp_grp); + auto g_addr = addr_grp.group_address(); - ASSERT_EQ(addr_grp.group_address()->is_update_leader_automatically(), - hp_grp.group_host_port()->is_update_leader_automatically()); - ASSERT_EQ(strcmp(addr_grp.group_address()->name(), hp_grp.group_host_port()->name()), 0); - ASSERT_EQ(addr_grp.group_address()->count(), hp_grp.group_host_port()->count()); - ASSERT_EQ(host_port(addr_grp.group_address()->leader()), - hp_grp.group_host_port()->leader()); + ASSERT_EQ(g_addr->is_update_leader_automatically(), g_hp->is_update_leader_automatically()); + ASSERT_STREQ(g_addr->name(), g_hp->name()); + ASSERT_EQ(g_addr->count(), g_hp->count()); + ASSERT_EQ(host_port(g_addr->leader()), g_hp->leader()); } } void send_and_check_host_port_by_serialize(const host_port &hp, dsn_msg_serialize_format t) { - auto hp_str = hp.to_string(); + const auto &hp_str = hp.to_string(); ::dsn::rpc_address server("localhost", 20101); dsn::message_ptr msg_ptr = dsn::message_ex::create_request(RPC_TEST_THRIFT_HOST_PORT_PARSER); diff --git a/src/utils/fixed_size_buffer_pool.h b/src/utils/fixed_size_buffer_pool.h index ab0ed4f4d7..5e556ad26d 100644 --- a/src/utils/fixed_size_buffer_pool.h +++ b/src/utils/fixed_size_buffer_pool.h @@ -28,8 +28,8 @@ /// /// A simple buffer pool designed for efficiently formatting -/// frequently used types (like gpid, rpc_address) into string, -/// without dynamic memory allocation. +/// frequently used types (like gpid, rpc_address, host_port) +/// into string, without dynamic memory allocation. /// /// It's not suitable to be used in multi-threaded environment, /// unless when it's declared as thread local.