Skip to content

Commit

Permalink
feat(fqdn): Complement the implementation of class host_port
Browse files Browse the repository at this point in the history
  • Loading branch information
GehaFearless committed Jan 9, 2024
1 parent 363d789 commit 624efc0
Show file tree
Hide file tree
Showing 8 changed files with 118 additions and 40 deletions.
2 changes: 1 addition & 1 deletion src/common/json_helper.h
Original file line number Diff line number Diff line change
Expand Up @@ -427,7 +427,7 @@ inline bool json_decode(const dsn::json::JsonObject &in, dsn::rpc_address &addre
// json serialization for rpc host_port, we use the string representation of a host_port
inline void json_encode(JsonWriter &out, const dsn::host_port &hp)
{
json_encode(out, hp.to_string());
json_encode(out, hp.to_std_string());
}
inline bool json_decode(const dsn::json::JsonObject &in, dsn::host_port &hp)
{
Expand Down
1 change: 1 addition & 0 deletions src/runtime/rpc/dns_resolver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/

#include <algorithm>
#include <memory>
#include <utility>

#include "fmt/format.h"
Expand Down
4 changes: 2 additions & 2 deletions src/runtime/rpc/group_host_port.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ static constexpr int kInvalidIndex = -1;
// group.group_host_port()->add(host_port("test_fqdn", 34602));
// group.group_host_port()->add(host_port("test_fqdn", 34603));
//
class rpc_group_host_port : public ref_counter
class rpc_group_host_port
{
public:
rpc_group_host_port(const char *name);
Expand Down Expand Up @@ -170,12 +170,12 @@ inline void rpc_group_host_port::leader_forward()

inline void rpc_group_host_port::set_leader(const host_port &hp)
{
CHECK_EQ_MSG(hp.type(), HOST_TYPE_IPV4, "rpc group host_port member must be ipv4");
awl_t l(_lock);
if (hp.is_invalid()) {
_leader_index = kInvalidIndex;
return;
}
CHECK_EQ_MSG(hp.type(), HOST_TYPE_IPV4, "rpc group host_port member must be ipv4");
for (int i = 0; i < _members.size(); i++) {
if (_members[i] == hp) {
_leader_index = i;
Expand Down
37 changes: 24 additions & 13 deletions src/runtime/rpc/rpc_host_port.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@
#include <errno.h>
#include <netdb.h>
#include <netinet/in.h>
#include <string.h>
#include <sys/socket.h>
#include <cstring>
#include <memory>
#include <unordered_set>
#include <utility>
Expand All @@ -30,6 +30,7 @@
#include "runtime/rpc/group_host_port.h"
#include "runtime/rpc/rpc_host_port.h"
#include "utils/error_code.h"
#include "utils/fixed_size_buffer_pool.h"
#include "utils/ports.h"
#include "utils/safe_strerror_posix.h"
#include "utils/string_conv.h"
Expand Down Expand Up @@ -79,7 +80,7 @@ host_port::host_port(rpc_address addr)
_port = addr.port();
} break;
case HOST_TYPE_GROUP: {
_group_host_port = new rpc_group_host_port(addr.group_address());
_group_host_port = std::make_shared<rpc_group_host_port>(addr.group_address());
} break;
default:
break;
Expand Down Expand Up @@ -120,7 +121,7 @@ void host_port::reset()
_port = 0;
break;
case HOST_TYPE_GROUP:
group_host_port()->release_ref();
_group_host_port = nullptr;
break;
default:
break;
Expand All @@ -142,7 +143,6 @@ host_port &host_port::operator=(const host_port &other)
break;
case HOST_TYPE_GROUP:
_group_host_port = other._group_host_port;
group_host_port()->add_ref();
break;
default:
break;
Expand All @@ -151,25 +151,36 @@ host_port &host_port::operator=(const host_port &other)
return *this;
}

std::string host_port::to_string() const
std::string host_port::to_std_string() const
{
switch (type()) {
case HOST_TYPE_IPV4:
return fmt::format("{}:{}", _host, _port);
case HOST_TYPE_GROUP:
return fmt::format("address group {}", group_host_port()->name());
return fmt::format("host_port group {}", group_host_port()->name());
default:
return "invalid address";
return "invalid host_port";
}
}

// A host name has 253 ASCII characters at most. The max length of
// port(uint16_t, 65535) is 5. Thus the max length of "<host>:<port>"
// is 253 + 1(colon) + 5 = 259.
//
// Set the size to 64 * 5 = 320 to be 64-bytes aligned.
static __thread fixed_size_buffer_pool<8, 320> bf;
const char *host_port::to_string() const
{
char *p = bf.next();
std::strcpy(p, to_std_string().c_str());
return (const char *)p;
}

void host_port::assign_group(const char *name)
{
reset();
_type = HOST_TYPE_GROUP;
_group_host_port = new rpc_group_host_port(name);
// take the lifetime of rpc_uri_address, release_ref when change value or call destructor
_group_host_port->add_ref();
_group_host_port = std::make_shared<rpc_group_host_port>(name);
}

error_s host_port::resolve_addresses(std::vector<rpc_address> &addresses) const
Expand All @@ -187,7 +198,7 @@ error_s host_port::resolve_addresses(std::vector<rpc_address> &addresses) const

rpc_address rpc_addr;
// Resolve hostname like "localhost:80" or "192.168.0.1:8080".
if (rpc_addr.from_string_ipv4(this->to_string().c_str())) {
if (rpc_addr.from_string_ipv4(this->to_std_string().c_str())) {
addresses.emplace_back(rpc_addr);
return error_s::ok();
}
Expand All @@ -209,15 +220,15 @@ error_s host_port::resolve_addresses(std::vector<rpc_address> &addresses) const
sockaddr_in *addr = reinterpret_cast<sockaddr_in *>(ai->ai_addr);
addr->sin_port = htons(_port);
rpc_address rpc_addr(*addr);
LOG_INFO("resolved address {} for host_port {}", rpc_addr, to_string());
LOG_INFO("resolved address {} for host_port {}", rpc_addr, to_std_string());
if (inserted.insert(rpc_addr).second) {
result_addresses.emplace_back(rpc_addr);
}
}

if (result_addresses.empty()) {
return error_s::make(dsn::ERR_NETWORK_FAILURE,
fmt::format("can not resolve host_port {}.", to_string()));
fmt::format("can not resolve host_port {}.", to_std_string()));
}

addresses = std::move(result_addresses);
Expand Down
38 changes: 33 additions & 5 deletions src/runtime/rpc/rpc_host_port.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
// IWYU pragma: no_include <experimental/string_view>
#include <functional>
#include <iosfwd>
#include <memory>
#include <string>
#include <string_view>
#include <vector>
Expand Down Expand Up @@ -65,14 +66,14 @@ class host_port

bool is_invalid() const { return _type == HOST_TYPE_INVALID; }

std::string to_string() const;
std::string to_std_string() const;

friend std::ostream &operator<<(std::ostream &os, const host_port &hp)
{
return os << hp.to_string();
return os << hp.to_std_string();
}

rpc_group_host_port *group_host_port() const
std::shared_ptr<rpc_group_host_port> group_host_port() const
{
CHECK_NOTNULL(_group_host_port, "group_host_port cannot be null!");
return _group_host_port;
Expand All @@ -85,6 +86,18 @@ class host_port
// This function is used for validating the format of string like "localhost:8888".
bool from_string(const std::string &s);

// Used only for printing log, such as log_prefix(). Better to use
// to_std_string(), since using get_char_str by mistake may lead
// to some bugs, for example:
// The size of hosts are more than 8.
// std::vector<host_port> hosts(10);
// ......
// std::vector<const char *> strs;
// for (auto &e: hosts) {
// strs.push_back(e.get_char_str());
// }
const char *to_string() const;

// for serialization in thrift format
uint32_t read(::apache::thrift::protocol::TProtocol *iprot);
uint32_t write(::apache::thrift::protocol::TProtocol *oprot) const;
Expand All @@ -93,7 +106,7 @@ class host_port
std::string _host;
uint16_t _port = 0;
dsn_host_type_t _type = HOST_TYPE_INVALID;
rpc_group_host_port *_group_host_port = nullptr;
std::shared_ptr<rpc_group_host_port> _group_host_port;
};

inline bool operator==(const host_port &hp1, const host_port &hp2)
Expand All @@ -118,6 +131,21 @@ inline bool operator==(const host_port &hp1, const host_port &hp2)

inline bool operator!=(const host_port &hp1, const host_port &hp2) { return !(hp1 == hp2); }

inline bool operator<(const host_port &hp1, const host_port &hp2)
{
if (hp1.type() != hp2.type()) {
return hp1.type() < hp2.type();
}

switch (hp1.type()) {
case HOST_TYPE_IPV4:
return hp1.host() < hp2.host() || (hp1.host() == hp2.host() && hp1.port() < hp2.port());
case HOST_TYPE_GROUP:
return hp1.group_host_port().get() < hp2.group_host_port().get();
default:
return true;
}
}
} // namespace dsn

USER_DEFINED_STRUCTURE_FORMATTER(::dsn::host_port);
Expand All @@ -132,7 +160,7 @@ struct hash<::dsn::host_port>
case HOST_TYPE_IPV4:
return std::hash<std::string>()(hp.host()) ^ std::hash<uint16_t>()(hp.port());
case HOST_TYPE_GROUP:
return std::hash<void *>()(hp.group_host_port());
return std::hash<void *>()(hp.group_host_port().get());
default:
return 0;
}
Expand Down
70 changes: 54 additions & 16 deletions src/runtime/test/host_port_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@
* under the License.
*/

#include <string.h>
#include <stdint.h>
#include <memory>
#include <string>
#include <utility>
#include <vector>
Expand All @@ -41,16 +42,16 @@

namespace dsn {

TEST(host_port_test, host_port_to_string)
TEST(host_port_test, host_port_to_std_string)
{
{
host_port hp = host_port("localhost", 8080);
ASSERT_EQ(std::string("localhost:8080"), hp.to_string());
ASSERT_EQ("localhost:8080", hp.to_std_string());
}

{
host_port hp;
ASSERT_EQ(std::string("invalid address"), hp.to_string());
ASSERT_EQ("invalid host_port", hp.to_std_string());
}
}

Expand All @@ -71,6 +72,7 @@ TEST(host_port_test, host_port_build)
TEST(host_port_test, operators)
{
host_port hp("localhost", 8080);
ASSERT_STREQ("localhost:8080", hp.to_string());
ASSERT_EQ(hp, hp);

{
Expand Down Expand Up @@ -100,6 +102,9 @@ TEST(host_port_test, operators)
std::string hp_str2 = "pegasus:8080";
ASSERT_FALSE(hp4.from_string(hp_str2));
ASSERT_TRUE(hp4.is_invalid());

host_port hp5("localhost", 8081);
ASSERT_LT(hp, hp5);
}

TEST(host_port_test, rpc_group_host_port)
Expand All @@ -111,8 +116,8 @@ TEST(host_port_test, rpc_group_host_port)
host_port hp_grp;
hp_grp.assign_group("test_group");
ASSERT_EQ(HOST_TYPE_GROUP, hp_grp.type());
rpc_group_host_port *g = hp_grp.group_host_port();
ASSERT_EQ(std::string("test_group"), g->name());
const auto &g = hp_grp.group_host_port();
ASSERT_STREQ("test_group", g->name());

// invalid_hp
ASSERT_FALSE(g->remove(hp));
Expand Down Expand Up @@ -166,6 +171,40 @@ TEST(host_port_test, rpc_group_host_port)
ASSERT_FALSE(g->contains(hp2));
ASSERT_EQ(0u, g->members().size());
ASSERT_EQ(invalid_hp, g->leader());

// operator <
host_port hp_grp1;
hp_grp1.assign_group("test_group");
if (hp_grp.group_host_port().get() < hp_grp1.group_host_port().get()) {
ASSERT_LT(hp_grp, hp_grp1);
} else {
ASSERT_FALSE(hp_grp < hp_grp1);
}

// address_group -> host_port_group
rpc_address addr("127.0.0.1", 8080);
rpc_address addr2("127.0.0.1", 8081);

rpc_address addr_grp;
addr_grp.assign_group("test_group");
ASSERT_EQ(HOST_TYPE_GROUP, addr_grp.type());

auto g_addr = addr_grp.group_address();
ASSERT_STREQ("test_group", g_addr->name());

ASSERT_TRUE(g_addr->add(addr));
g_addr->set_leader(addr2);
ASSERT_EQ(addr2, g_addr->leader());
ASSERT_EQ(2, g_addr->count());

host_port hp_grp2;
hp_grp2 = host_port(addr_grp);
ASSERT_EQ(HOST_TYPE_GROUP, hp_grp2.type());

auto g_hp = hp_grp2.group_host_port();
ASSERT_STREQ("test_group", g_hp->name());
ASSERT_EQ(hp2, g_hp->leader());
ASSERT_EQ(2, g_hp->count());
}

TEST(host_port_test, transfer_rpc_address)
Expand Down Expand Up @@ -204,27 +243,26 @@ TEST(host_port_test, dns_resolver)
{
host_port hp_grp;
hp_grp.assign_group("test_group");
rpc_group_host_port *g = hp_grp.group_host_port();
auto g_hp = hp_grp.group_host_port();

host_port hp1("localhost", 8080);
ASSERT_TRUE(g->add(hp1));
ASSERT_TRUE(g_hp->add(hp1));
host_port hp2("localhost", 8081);
g->set_leader(hp2);
g_hp->set_leader(hp2);

auto addr_grp = resolver.resolve_address(hp_grp);
auto g_addr = addr_grp.group_address();

ASSERT_EQ(addr_grp.group_address()->is_update_leader_automatically(),
hp_grp.group_host_port()->is_update_leader_automatically());
ASSERT_EQ(strcmp(addr_grp.group_address()->name(), hp_grp.group_host_port()->name()), 0);
ASSERT_EQ(addr_grp.group_address()->count(), hp_grp.group_host_port()->count());
ASSERT_EQ(host_port(addr_grp.group_address()->leader()),
hp_grp.group_host_port()->leader());
ASSERT_EQ(g_addr->is_update_leader_automatically(), g_hp->is_update_leader_automatically());
ASSERT_STREQ(g_addr->name(), g_hp->name());
ASSERT_EQ(g_addr->count(), g_hp->count());
ASSERT_EQ(host_port(g_addr->leader()), g_hp->leader());
}
}

void send_and_check_host_port_by_serialize(const host_port &hp, dsn_msg_serialize_format t)
{
auto hp_str = hp.to_string();
const auto &hp_str = hp.to_std_string();
::dsn::rpc_address server("localhost", 20101);

dsn::message_ptr msg_ptr = dsn::message_ex::create_request(RPC_TEST_THRIFT_HOST_PORT_PARSER);
Expand Down
2 changes: 1 addition & 1 deletion src/runtime/test_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ class test_client : public ::dsn::serverlet<test_client>, public ::dsn::service_
{
host_port hp;
::dsn::unmarshall(message, hp);
reply(message, hp.to_string());
reply(message, hp.to_std_string());
}

::dsn::error_code start(const std::vector<std::string> &args)
Expand Down
4 changes: 2 additions & 2 deletions src/utils/fixed_size_buffer_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@

///
/// A simple buffer pool designed for efficiently formatting
/// frequently used types (like gpid, rpc_address) into string,
/// without dynamic memory allocation.
/// frequently used types (like gpid, rpc_address, host_port)
/// into string, without dynamic memory allocation.
///
/// It's not suitable to be used in multi-threaded environment,
/// unless when it's declared as thread local.
Expand Down

0 comments on commit 624efc0

Please sign in to comment.