diff --git a/src/common/serialization_helper/thrift_helper.h b/src/common/serialization_helper/thrift_helper.h index 4afcc4cc25..dbe7566f3e 100644 --- a/src/common/serialization_helper/thrift_helper.h +++ b/src/common/serialization_helper/thrift_helper.h @@ -280,6 +280,103 @@ inline uint32_t rpc_address::write(apache::thrift::protocol::TProtocol *oprot) c } } +inline uint32_t host_port::read(apache::thrift::protocol::TProtocol *iprot) +{ + std::string host; + int16_t port; + int8_t type_enum_number; + + uint32_t xfer = 0; + auto binary_proto = dynamic_cast(iprot); + if (binary_proto != nullptr) { + // the protocol is binary protocol + xfer += iprot->readString(host); + xfer += iprot->readI16(port); + xfer += iprot->readByte(type_enum_number); + } else { + // the protocol is json protocol + std::string fname; + xfer += iprot->readStructBegin(fname); + + int16_t fid; + ::apache::thrift::protocol::TType ftype; + while (true) { + xfer += iprot->readFieldBegin(fname, ftype, fid); + if (ftype == ::apache::thrift::protocol::T_STOP) { + break; + } + switch (fid) { + case 1: + if (ftype == ::apache::thrift::protocol::T_STRING) { + xfer += iprot->readString(host); + } else { + xfer += iprot->skip(ftype); + } + break; + case 2: + if (ftype == ::apache::thrift::protocol::T_I16) { + xfer += iprot->readI16(port); + } else { + xfer += iprot->skip(ftype); + } + break; + case 3: + if (ftype == ::apache::thrift::protocol::T_BYTE) { + xfer += iprot->readByte(type_enum_number); + } else { + xfer += iprot->skip(ftype); + } + break; + default: + xfer += iprot->skip(ftype); + break; + } + xfer += iprot->readFieldEnd(); + } + xfer += iprot->readStructEnd(); + } + + _host = host; + _port = static_cast(port); + _type = static_cast(type_enum_number); + + return xfer; +} + +inline uint32_t host_port::write(apache::thrift::protocol::TProtocol *oprot) const +{ + uint32_t xfer = 0; + auto binary_proto = dynamic_cast(oprot); + if (binary_proto != nullptr) { + // the protocol is binary protocol + CHECK(_type == HOST_TYPE_INVALID || _type == HOST_TYPE_IPV4, + "only invalid or ipv4 can be serialized to binary"); + xfer += oprot->writeString(_host); + xfer += oprot->writeI16(static_cast(_port)); + xfer += oprot->writeByte(static_cast(_type)); + } else { + // the protocol is json protocol + xfer += oprot->writeStructBegin("host_port"); + + xfer += oprot->writeFieldBegin("_host", ::apache::thrift::protocol::T_STRING, 1); + xfer += oprot->writeString(_host); + xfer += oprot->writeFieldEnd(); + + xfer += oprot->writeFieldBegin("_port", ::apache::thrift::protocol::T_I16, 2); + xfer += oprot->writeI16(static_cast(_port)); + xfer += oprot->writeFieldEnd(); + + xfer += oprot->writeFieldBegin("_type", ::apache::thrift::protocol::T_BYTE, 3); + xfer += oprot->writeByte(static_cast(_type)); + xfer += oprot->writeFieldEnd(); + + xfer += oprot->writeFieldStop(); + xfer += oprot->writeStructEnd(); + } + + return xfer; +} + inline uint32_t gpid::read(apache::thrift::protocol::TProtocol *iprot) { apache::thrift::protocol::TBinaryProtocol *binary_proto = diff --git a/src/runtime/rpc/rpc_host_port.h b/src/runtime/rpc/rpc_host_port.h index 033ba089cb..8f212aa169 100644 --- a/src/runtime/rpc/rpc_host_port.h +++ b/src/runtime/rpc/rpc_host_port.h @@ -74,11 +74,15 @@ class host_port // Trere may be multiple rpc_addresses for one host_port. error_s resolve_addresses(std::vector &addresses) const; + // for serialization in thrift format + uint32_t read(::apache::thrift::protocol::TProtocol *iprot); + uint32_t write(::apache::thrift::protocol::TProtocol *oprot) const; + private: std::string _host; uint16_t _port = 0; dsn_host_type_t _type = HOST_TYPE_INVALID; - ref_ptr _group_host_port; + rpc_group_host_port* _group_host_port = nullptr; }; inline bool operator==(const host_port &hp1, const host_port &hp2)