Skip to content

Commit

Permalink
feat(fqdn): Complement the implementation of class host_port
Browse files Browse the repository at this point in the history
  • Loading branch information
GehaFearless committed Jan 10, 2024
1 parent 363d789 commit d40310c
Show file tree
Hide file tree
Showing 6 changed files with 81 additions and 31 deletions.
1 change: 1 addition & 0 deletions src/runtime/rpc/dns_resolver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/

#include <algorithm>
#include <memory>
#include <utility>

#include "fmt/format.h"
Expand Down
4 changes: 2 additions & 2 deletions src/runtime/rpc/group_host_port.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ static constexpr int kInvalidIndex = -1;
// group.group_host_port()->add(host_port("test_fqdn", 34602));
// group.group_host_port()->add(host_port("test_fqdn", 34603));
//
class rpc_group_host_port : public ref_counter
class rpc_group_host_port
{
public:
rpc_group_host_port(const char *name);
Expand Down Expand Up @@ -170,12 +170,12 @@ inline void rpc_group_host_port::leader_forward()

inline void rpc_group_host_port::set_leader(const host_port &hp)
{
CHECK_EQ_MSG(hp.type(), HOST_TYPE_IPV4, "rpc group host_port member must be ipv4");
awl_t l(_lock);
if (hp.is_invalid()) {
_leader_index = kInvalidIndex;
return;
}
CHECK_EQ_MSG(hp.type(), HOST_TYPE_IPV4, "rpc group host_port member must be ipv4");
for (int i = 0; i < _members.size(); i++) {
if (_members[i] == hp) {
_leader_index = i;
Expand Down
15 changes: 6 additions & 9 deletions src/runtime/rpc/rpc_host_port.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@
#include <errno.h>
#include <netdb.h>
#include <netinet/in.h>
#include <string.h>
#include <sys/socket.h>
#include <cstring>
#include <memory>
#include <unordered_set>
#include <utility>
Expand Down Expand Up @@ -79,7 +79,7 @@ host_port::host_port(rpc_address addr)
_port = addr.port();
} break;
case HOST_TYPE_GROUP: {
_group_host_port = new rpc_group_host_port(addr.group_address());
_group_host_port = std::make_shared<rpc_group_host_port>(addr.group_address());
} break;
default:
break;
Expand Down Expand Up @@ -120,7 +120,7 @@ void host_port::reset()
_port = 0;
break;
case HOST_TYPE_GROUP:
group_host_port()->release_ref();
_group_host_port = nullptr;
break;
default:
break;
Expand All @@ -142,7 +142,6 @@ host_port &host_port::operator=(const host_port &other)
break;
case HOST_TYPE_GROUP:
_group_host_port = other._group_host_port;
group_host_port()->add_ref();
break;
default:
break;
Expand All @@ -157,19 +156,17 @@ std::string host_port::to_string() const
case HOST_TYPE_IPV4:
return fmt::format("{}:{}", _host, _port);
case HOST_TYPE_GROUP:
return fmt::format("address group {}", group_host_port()->name());
return fmt::format("host_port group {}", group_host_port()->name());
default:
return "invalid address";
return "invalid host_port";
}
}

void host_port::assign_group(const char *name)
{
reset();
_type = HOST_TYPE_GROUP;
_group_host_port = new rpc_group_host_port(name);
// take the lifetime of rpc_uri_address, release_ref when change value or call destructor
_group_host_port->add_ref();
_group_host_port = std::make_shared<rpc_group_host_port>(name);
}

error_s host_port::resolve_addresses(std::vector<rpc_address> &addresses) const
Expand Down
22 changes: 19 additions & 3 deletions src/runtime/rpc/rpc_host_port.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
// IWYU pragma: no_include <experimental/string_view>
#include <functional>
#include <iosfwd>
#include <memory>
#include <string>
#include <string_view>
#include <vector>
Expand Down Expand Up @@ -72,7 +73,7 @@ class host_port
return os << hp.to_string();
}

rpc_group_host_port *group_host_port() const
std::shared_ptr<rpc_group_host_port> group_host_port() const
{
CHECK_NOTNULL(_group_host_port, "group_host_port cannot be null!");
return _group_host_port;
Expand All @@ -93,7 +94,7 @@ class host_port
std::string _host;
uint16_t _port = 0;
dsn_host_type_t _type = HOST_TYPE_INVALID;
rpc_group_host_port *_group_host_port = nullptr;
std::shared_ptr<rpc_group_host_port> _group_host_port;
};

inline bool operator==(const host_port &hp1, const host_port &hp2)
Expand All @@ -118,6 +119,21 @@ inline bool operator==(const host_port &hp1, const host_port &hp2)

inline bool operator!=(const host_port &hp1, const host_port &hp2) { return !(hp1 == hp2); }

inline bool operator<(const host_port &hp1, const host_port &hp2)
{
if (hp1.type() != hp2.type()) {
return hp1.type() < hp2.type();
}

switch (hp1.type()) {
case HOST_TYPE_IPV4:
return hp1.host() < hp2.host() || (hp1.host() == hp2.host() && hp1.port() < hp2.port());
case HOST_TYPE_GROUP:
return hp1.group_host_port().get() < hp2.group_host_port().get();
default:
return true;
}
}
} // namespace dsn

USER_DEFINED_STRUCTURE_FORMATTER(::dsn::host_port);
Expand All @@ -132,7 +148,7 @@ struct hash<::dsn::host_port>
case HOST_TYPE_IPV4:
return std::hash<std::string>()(hp.host()) ^ std::hash<uint16_t>()(hp.port());
case HOST_TYPE_GROUP:
return std::hash<void *>()(hp.group_host_port());
return std::hash<void *>()(hp.group_host_port().get());
default:
return 0;
}
Expand Down
66 changes: 51 additions & 15 deletions src/runtime/test/host_port_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
* under the License.
*/

#include <string.h>
#include <memory>
#include <string>
#include <utility>
#include <vector>
Expand Down Expand Up @@ -45,12 +45,12 @@ TEST(host_port_test, host_port_to_string)
{
{
host_port hp = host_port("localhost", 8080);
ASSERT_EQ(std::string("localhost:8080"), hp.to_string());
ASSERT_EQ("localhost:8080", hp.to_string());
}

{
host_port hp;
ASSERT_EQ(std::string("invalid address"), hp.to_string());
ASSERT_EQ("invalid host_port", hp.to_string());
}
}

Expand Down Expand Up @@ -100,6 +100,9 @@ TEST(host_port_test, operators)
std::string hp_str2 = "pegasus:8080";
ASSERT_FALSE(hp4.from_string(hp_str2));
ASSERT_TRUE(hp4.is_invalid());

host_port hp5("localhost", 8081);
ASSERT_LT(hp, hp5);
}

TEST(host_port_test, rpc_group_host_port)
Expand All @@ -111,8 +114,8 @@ TEST(host_port_test, rpc_group_host_port)
host_port hp_grp;
hp_grp.assign_group("test_group");
ASSERT_EQ(HOST_TYPE_GROUP, hp_grp.type());
rpc_group_host_port *g = hp_grp.group_host_port();
ASSERT_EQ(std::string("test_group"), g->name());
const auto &g = hp_grp.group_host_port();
ASSERT_STREQ("test_group", g->name());

// invalid_hp
ASSERT_FALSE(g->remove(hp));
Expand Down Expand Up @@ -166,6 +169,40 @@ TEST(host_port_test, rpc_group_host_port)
ASSERT_FALSE(g->contains(hp2));
ASSERT_EQ(0u, g->members().size());
ASSERT_EQ(invalid_hp, g->leader());

// operator <
host_port hp_grp1;
hp_grp1.assign_group("test_group");
if (hp_grp.group_host_port().get() < hp_grp1.group_host_port().get()) {
ASSERT_LT(hp_grp, hp_grp1);
} else {
ASSERT_FALSE(hp_grp < hp_grp1);
}

// address_group -> host_port_group
rpc_address addr("127.0.0.1", 8080);
rpc_address addr2("127.0.0.1", 8081);

rpc_address addr_grp;
addr_grp.assign_group("test_group");
ASSERT_EQ(HOST_TYPE_GROUP, addr_grp.type());

auto g_addr = addr_grp.group_address();
ASSERT_STREQ("test_group", g_addr->name());

ASSERT_TRUE(g_addr->add(addr));
g_addr->set_leader(addr2);
ASSERT_EQ(addr2, g_addr->leader());
ASSERT_EQ(2, g_addr->count());

host_port hp_grp2;
hp_grp2 = host_port(addr_grp);
ASSERT_EQ(HOST_TYPE_GROUP, hp_grp2.type());

auto g_hp = hp_grp2.group_host_port();
ASSERT_STREQ("test_group", g_hp->name());
ASSERT_EQ(hp2, g_hp->leader());
ASSERT_EQ(2, g_hp->count());
}

TEST(host_port_test, transfer_rpc_address)
Expand Down Expand Up @@ -204,27 +241,26 @@ TEST(host_port_test, dns_resolver)
{
host_port hp_grp;
hp_grp.assign_group("test_group");
rpc_group_host_port *g = hp_grp.group_host_port();
auto g_hp = hp_grp.group_host_port();

host_port hp1("localhost", 8080);
ASSERT_TRUE(g->add(hp1));
ASSERT_TRUE(g_hp->add(hp1));
host_port hp2("localhost", 8081);
g->set_leader(hp2);
g_hp->set_leader(hp2);

auto addr_grp = resolver.resolve_address(hp_grp);
auto g_addr = addr_grp.group_address();

ASSERT_EQ(addr_grp.group_address()->is_update_leader_automatically(),
hp_grp.group_host_port()->is_update_leader_automatically());
ASSERT_EQ(strcmp(addr_grp.group_address()->name(), hp_grp.group_host_port()->name()), 0);
ASSERT_EQ(addr_grp.group_address()->count(), hp_grp.group_host_port()->count());
ASSERT_EQ(host_port(addr_grp.group_address()->leader()),
hp_grp.group_host_port()->leader());
ASSERT_EQ(g_addr->is_update_leader_automatically(), g_hp->is_update_leader_automatically());
ASSERT_STREQ(g_addr->name(), g_hp->name());
ASSERT_EQ(g_addr->count(), g_hp->count());
ASSERT_EQ(host_port(g_addr->leader()), g_hp->leader());
}
}

void send_and_check_host_port_by_serialize(const host_port &hp, dsn_msg_serialize_format t)
{
auto hp_str = hp.to_string();
const auto &hp_str = hp.to_string();
::dsn::rpc_address server("localhost", 20101);

dsn::message_ptr msg_ptr = dsn::message_ex::create_request(RPC_TEST_THRIFT_HOST_PORT_PARSER);
Expand Down
4 changes: 2 additions & 2 deletions src/utils/fixed_size_buffer_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@

///
/// A simple buffer pool designed for efficiently formatting
/// frequently used types (like gpid, rpc_address) into string,
/// without dynamic memory allocation.
/// frequently used types (like gpid, rpc_address, host_port)
/// into string, without dynamic memory allocation.
///
/// It's not suitable to be used in multi-threaded environment,
/// unless when it's declared as thread local.
Expand Down

0 comments on commit d40310c

Please sign in to comment.