Skip to content

Commit

Permalink
feat(FQDN): serialization format rpc_host_port (apache#1496)
Browse files Browse the repository at this point in the history
issue: apache#1495

Implement thrift protocol functon read and write for class rpc_host_port.
Both about binary protocol and json.
  • Loading branch information
GehaFearless authored Jun 9, 2023
1 parent f3d1388 commit 4ae3999
Show file tree
Hide file tree
Showing 5 changed files with 169 additions and 2 deletions.
5 changes: 5 additions & 0 deletions idl/dsn.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,11 @@ struct rpc_address
{
}

// place holder
struct host_port
{
}

// place holder
struct blob
{
Expand Down
100 changes: 100 additions & 0 deletions src/common/serialization_helper/thrift_helper.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#pragma once

#include "runtime/tool_api.h"
#include "runtime/rpc/rpc_host_port.h"
#include "runtime/rpc/rpc_stream.h"

#include <thrift/Thrift.h>
Expand Down Expand Up @@ -280,6 +281,105 @@ inline uint32_t rpc_address::write(apache::thrift::protocol::TProtocol *oprot) c
}
}

inline uint32_t host_port::read(apache::thrift::protocol::TProtocol *iprot)
{
std::string host;
int16_t port;
int8_t type_enum_number;

uint32_t xfer = 0;
auto binary_proto = dynamic_cast<apache::thrift::protocol::TBinaryProtocol *>(iprot);
if (binary_proto != nullptr) {
// the protocol is binary protocol
xfer += iprot->readString(host);
xfer += iprot->readI16(port);
xfer += iprot->readByte(type_enum_number);
} else {
// the protocol is json protocol
std::string fname;
xfer += iprot->readStructBegin(fname);

int16_t fid;
::apache::thrift::protocol::TType ftype;
while (true) {
xfer += iprot->readFieldBegin(fname, ftype, fid);
if (ftype == ::apache::thrift::protocol::T_STOP) {
break;
}
switch (fid) {
case 1:
if (ftype == ::apache::thrift::protocol::T_STRING) {
xfer += iprot->readString(host);
} else {
xfer += iprot->skip(ftype);
}
break;
case 2:
if (ftype == ::apache::thrift::protocol::T_I16) {
xfer += iprot->readI16(port);
} else {
xfer += iprot->skip(ftype);
}
break;
case 3:
if (ftype == ::apache::thrift::protocol::T_BYTE) {
xfer += iprot->readByte(type_enum_number);
} else {
xfer += iprot->skip(ftype);
}
break;
default:
xfer += iprot->skip(ftype);
break;
}
xfer += iprot->readFieldEnd();
}
xfer += iprot->readStructEnd();
}

_host = host;
_port = static_cast<uint16_t>(port);
_type = static_cast<dsn_host_type_t>(type_enum_number);
CHECK(_type == HOST_TYPE_INVALID || _type == HOST_TYPE_IPV4,
"only invalid or ipv4 can be deserialized.");

return xfer;
}

inline uint32_t host_port::write(apache::thrift::protocol::TProtocol *oprot) const
{
CHECK(_type == HOST_TYPE_INVALID || _type == HOST_TYPE_IPV4,
"only invalid or ipv4 can be serialized.");
uint32_t xfer = 0;
auto binary_proto = dynamic_cast<apache::thrift::protocol::TBinaryProtocol *>(oprot);
if (binary_proto != nullptr) {
// the protocol is binary protocol
xfer += oprot->writeString(_host);
xfer += oprot->writeI16(static_cast<int16_t>(_port));
xfer += oprot->writeByte(static_cast<int8_t>(_type));
} else {
// the protocol is json protocol
xfer += oprot->writeStructBegin("host_port");

xfer += oprot->writeFieldBegin("host", ::apache::thrift::protocol::T_STRING, 1);
xfer += oprot->writeString(_host);
xfer += oprot->writeFieldEnd();

xfer += oprot->writeFieldBegin("port", ::apache::thrift::protocol::T_I16, 2);
xfer += oprot->writeI16(static_cast<int16_t>(_port));
xfer += oprot->writeFieldEnd();

xfer += oprot->writeFieldBegin("type", ::apache::thrift::protocol::T_BYTE, 3);
xfer += oprot->writeByte(static_cast<int8_t>(_type));
xfer += oprot->writeFieldEnd();

xfer += oprot->writeFieldStop();
xfer += oprot->writeStructEnd();
}

return xfer;
}

inline uint32_t gpid::read(apache::thrift::protocol::TProtocol *iprot)
{
apache::thrift::protocol::TBinaryProtocol *binary_proto =
Expand Down
15 changes: 13 additions & 2 deletions src/runtime/rpc/rpc_host_port.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,17 @@
#include <vector>

#include "runtime/rpc/rpc_address.h"
#include "utils/autoref_ptr.h"
#include "utils/errors.h"
#include "utils/fmt_logging.h"

namespace apache {
namespace thrift {
namespace protocol {
class TProtocol;
} // namespace protocol
} // namespace thrift
} // namespace apache

namespace dsn {

class rpc_group_host_port;
Expand Down Expand Up @@ -74,11 +81,15 @@ class host_port
// Trere may be multiple rpc_addresses for one host_port.
error_s resolve_addresses(std::vector<rpc_address> &addresses) const;

// for serialization in thrift format
uint32_t read(::apache::thrift::protocol::TProtocol *iprot);
uint32_t write(::apache::thrift::protocol::TProtocol *oprot) const;

private:
std::string _host;
uint16_t _port = 0;
dsn_host_type_t _type = HOST_TYPE_INVALID;
ref_ptr<rpc_group_host_port> _group_host_port;
rpc_group_host_port *_group_host_port = nullptr;
};

inline bool operator==(const host_port &hp1, const host_port &hp2)
Expand Down
37 changes: 37 additions & 0 deletions src/runtime/test/host_port_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,22 @@
#include <gtest/gtest.h>
#include <string.h>
#include <string>
#include <utility>
#include <vector>

#include "runtime/rpc/dns_resolver.h"
#include "runtime/rpc/group_address.h"
#include "runtime/rpc/group_host_port.h"
#include "runtime/rpc/rpc_address.h"
#include "runtime/rpc/rpc_host_port.h"
#include "runtime/rpc/rpc_message.h"
#include "runtime/rpc/serialization.h"
#include "runtime/task/async_calls.h"
#include "runtime/task/task.h"
#include "runtime/task/task_spec.h"
#include "runtime/task/task_tracker.h"
#include "test_utils.h"
#include "utils/autoref_ptr.h"
#include "utils/error_code.h"
#include "utils/errors.h"

Expand Down Expand Up @@ -202,4 +211,32 @@ TEST(host_port_test, dns_resolver)
}
}

void send_and_check_host_port_by_serialize(const host_port &hp, dsn_msg_serialize_format t)
{
auto hp_str = hp.to_string();
::dsn::rpc_address server("localhost", 20101);

dsn::message_ptr msg_ptr = dsn::message_ex::create_request(RPC_TEST_THRIFT_HOST_PORT_PARSER);
msg_ptr->header->context.u.serialize_format = t;

::dsn::marshall(msg_ptr.get(), hp);

dsn::task_tracker tracker;
rpc::call(server, msg_ptr.get(), &tracker, [hp_str](error_code ec, std::string &&resp) {
ASSERT_EQ(ERR_OK, ec);
ASSERT_EQ(resp, hp_str);
})->wait();
}

TEST(host_port_test, thrift_parser)
{
host_port hp1 = host_port("localhost", 8080);
send_and_check_host_port_by_serialize(hp1, DSF_THRIFT_BINARY);
send_and_check_host_port_by_serialize(hp1, DSF_THRIFT_JSON);

host_port hp2 = host_port("localhost", 1010);
send_and_check_host_port_by_serialize(hp2, DSF_THRIFT_BINARY);
send_and_check_host_port_by_serialize(hp2, DSF_THRIFT_JSON);
}

} // namespace dsn
14 changes: 14 additions & 0 deletions src/runtime/test/test_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
#include "runtime/serverlet.h"
#include "runtime/service_app.h"
#include "runtime/rpc/rpc_address.h"
#include "runtime/rpc/rpc_host_port.h"
#include "runtime/task/task.h"
#include "runtime/task/task_worker.h"
#include <gtest/gtest.h>
Expand All @@ -69,6 +70,9 @@ DEFINE_TASK_CODE_RPC(RPC_TEST_HASH3, TASK_PRIORITY_COMMON, THREAD_POOL_TEST_SERV
DEFINE_TASK_CODE_RPC(RPC_TEST_HASH4, TASK_PRIORITY_COMMON, THREAD_POOL_TEST_SERVER)

DEFINE_TASK_CODE_RPC(RPC_TEST_STRING_COMMAND, TASK_PRIORITY_COMMON, THREAD_POOL_TEST_SERVER)
DEFINE_TASK_CODE_RPC(RPC_TEST_THRIFT_HOST_PORT_PARSER,
TASK_PRIORITY_COMMON,
THREAD_POOL_TEST_SERVER)

extern int g_test_count;
extern int g_test_ret;
Expand Down Expand Up @@ -121,6 +125,13 @@ class test_client : public ::dsn::serverlet<test_client>, public ::dsn::service_
}
}

void on_rpc_host_port_test(dsn::message_ex *message)
{
host_port hp;
::dsn::unmarshall(message, hp);
reply(message, hp.to_string());
}

::dsn::error_code start(const std::vector<std::string> &args)
{
// server
Expand All @@ -135,6 +146,9 @@ class test_client : public ::dsn::serverlet<test_client>, public ::dsn::service_
register_rpc_handler(RPC_TEST_STRING_COMMAND,
"rpc.test.string.command",
&test_client::on_rpc_string_test);
register_rpc_handler(RPC_TEST_THRIFT_HOST_PORT_PARSER,
"rpc.test.host_port",
&test_client::on_rpc_host_port_test);
}

// client
Expand Down

0 comments on commit 4ae3999

Please sign in to comment.