Skip to content

Commit

Permalink
feat(FQDN): serialization format rpc_host_port
Browse files Browse the repository at this point in the history
  • Loading branch information
GehaFearless committed May 29, 2023
1 parent 86181aa commit 8d64a7f
Show file tree
Hide file tree
Showing 2 changed files with 102 additions and 1 deletion.
97 changes: 97 additions & 0 deletions src/common/serialization_helper/thrift_helper.h
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,103 @@ inline uint32_t rpc_address::write(apache::thrift::protocol::TProtocol *oprot) c
}
}

inline uint32_t host_port::read(apache::thrift::protocol::TProtocol *iprot)
{
std::string host;
int16_t port;
int8_t type_enum_number;

uint32_t xfer = 0;
auto binary_proto = dynamic_cast<apache::thrift::protocol::TBinaryProtocol *>(iprot);
if (binary_proto != nullptr) {
// the protocol is binary protocol
xfer += iprot->readString(host);
xfer += iprot->readI16(port);
xfer += iprot->readByte(type_enum_number);
} else {
// the protocol is json protocol
std::string fname;
xfer += iprot->readStructBegin(fname);

int16_t fid;
::apache::thrift::protocol::TType ftype;
while (true) {
xfer += iprot->readFieldBegin(fname, ftype, fid);
if (ftype == ::apache::thrift::protocol::T_STOP) {
break;
}
switch (fid) {
case 1:
if (ftype == ::apache::thrift::protocol::T_STRING) {
xfer += iprot->readString(host);
} else {
xfer += iprot->skip(ftype);
}
break;
case 2:
if (ftype == ::apache::thrift::protocol::T_I16) {
xfer += iprot->readI16(port);
} else {
xfer += iprot->skip(ftype);
}
break;
case 3:
if (ftype == ::apache::thrift::protocol::T_BYTE) {
xfer += iprot->readByte(type_enum_number);
} else {
xfer += iprot->skip(ftype);
}
break;
default:
xfer += iprot->skip(ftype);
break;
}
xfer += iprot->readFieldEnd();
}
xfer += iprot->readStructEnd();
}

_host = host;
_port = static_cast<uint16_t>(port);
_type = static_cast<dsn_host_type_t>(type_enum_number);

return xfer;
}

inline uint32_t host_port::write(apache::thrift::protocol::TProtocol *oprot) const
{
uint32_t xfer = 0;
auto binary_proto = dynamic_cast<apache::thrift::protocol::TBinaryProtocol *>(oprot);
if (binary_proto != nullptr) {
// the protocol is binary protocol
CHECK(_type == HOST_TYPE_INVALID || _type == HOST_TYPE_IPV4,
"only invalid or ipv4 can be serialized to binary");
xfer += oprot->writeString(_host);
xfer += oprot->writeI16(static_cast<int16_t>(_port));
xfer += oprot->writeByte(static_cast<int8_t>(_type));
} else {
// the protocol is json protocol
xfer += oprot->writeStructBegin("host_port");

xfer += oprot->writeFieldBegin("_host", ::apache::thrift::protocol::T_STRING, 1);
xfer += oprot->writeString(_host);
xfer += oprot->writeFieldEnd();

xfer += oprot->writeFieldBegin("_port", ::apache::thrift::protocol::T_I16, 2);
xfer += oprot->writeI16(static_cast<int16_t>(_port));
xfer += oprot->writeFieldEnd();

xfer += oprot->writeFieldBegin("_type", ::apache::thrift::protocol::T_BYTE, 3);
xfer += oprot->writeByte(static_cast<int8_t>(_type));
xfer += oprot->writeFieldEnd();

xfer += oprot->writeFieldStop();
xfer += oprot->writeStructEnd();
}

return xfer;
}

inline uint32_t gpid::read(apache::thrift::protocol::TProtocol *iprot)
{
apache::thrift::protocol::TBinaryProtocol *binary_proto =
Expand Down
6 changes: 5 additions & 1 deletion src/runtime/rpc/rpc_host_port.h
Original file line number Diff line number Diff line change
Expand Up @@ -74,11 +74,15 @@ class host_port
// Trere may be multiple rpc_addresses for one host_port.
error_s resolve_addresses(std::vector<rpc_address> &addresses) const;

// for serialization in thrift format
uint32_t read(::apache::thrift::protocol::TProtocol *iprot);
uint32_t write(::apache::thrift::protocol::TProtocol *oprot) const;

private:
std::string _host;
uint16_t _port = 0;
dsn_host_type_t _type = HOST_TYPE_INVALID;
ref_ptr<rpc_group_host_port> _group_host_port;
rpc_group_host_port* _group_host_port = nullptr;
};

inline bool operator==(const host_port &hp1, const host_port &hp2)
Expand Down

0 comments on commit 8d64a7f

Please sign in to comment.