Skip to content

Commit

Permalink
thrift: framed/unframed transport, binary/compact protocol (envoyprox…
Browse files Browse the repository at this point in the history
…y#3509)

Provides Thrift protocol primitives, with tests, that will be used to create Thrift protocol filters in a subsequent PR. Implements the Thrift framed and unframed transports as well as an "auto" transport that detects the transport based on the first few bytes in a Buffer. Similarly, the Thrift binary and compact protocols are implemented along with an "auto" protocol.

Signed-off-by: <Stephan Zuercher [email protected]>

Risk Level: Low - no filters currently make use of these primitives
Testing: unit tests, manual integration testing with a work-in-progress filter in a private branch
Docs Changes: n/a
Release Notes: n/a
Relates To: envoyproxy#2247
  • Loading branch information
zuercher authored Jun 7, 2018
1 parent caea847 commit 2c37930
Show file tree
Hide file tree
Showing 23 changed files with 4,894 additions and 0 deletions.
6 changes: 6 additions & 0 deletions CODEOWNERS
Validating CODEOWNERS rules …
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
# TODO(zuercher): determine how we want to deal deal with auto-assignment
# By default, @envoyproxy/maintainers own everything.
#* @envoyproxy/maintainers

# thrift_proxy extension
/*/extensions/filters/network/thrift_proxy @zuercher @brian-pane
9 changes: 9 additions & 0 deletions source/common/common/byte_order.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,20 @@

#include <libkern/OSByteOrder.h>

#define htole16(x) OSSwapHostToLittleInt16((x))
#define htole32(x) OSSwapHostToLittleInt32((x))
#define htole64(x) OSSwapHostToLittleInt64((x))
#define le16toh(x) OSSwapLittleToHostInt16((x))
#define le32toh(x) OSSwapLittleToHostInt32((x))
#define le64toh(x) OSSwapLittleToHostInt64((x))

#define htobe16(x) OSSwapHostToBigInt16((x))
#define htobe32(x) OSSwapHostToBigInt32((x))
#define htobe64(x) OSSwapHostToBigInt64((x))
#define be16toh(x) OSSwapBigToHostInt16((x))
#define be32toh(x) OSSwapBigToHostInt32((x))
#define be64toh(x) OSSwapBigToHostInt64((x))

#else

#include <endian.h>
Expand Down
2 changes: 2 additions & 0 deletions source/extensions/extensions_build_config.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ EXTENSIONS = {
"envoy.filters.network.redis_proxy": "//source/extensions/filters/network/redis_proxy:config",
"envoy.filters.network.ratelimit": "//source/extensions/filters/network/ratelimit:config",
"envoy.filters.network.tcp_proxy": "//source/extensions/filters/network/tcp_proxy:config",
# TODO(zuercher): switch to config target once a filter exists
"envoy.filters.network.thrift_proxy": "//source/extensions/filters/network/thrift_proxy:transport_lib",

#
# Stat sinks
Expand Down
52 changes: 52 additions & 0 deletions source/extensions/filters/network/thrift_proxy/BUILD
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
licenses(["notice"]) # Apache 2

load(
"//bazel:envoy_build_system.bzl",
"envoy_cc_library",
"envoy_package",
)

envoy_package()

envoy_cc_library(
name = "buffer_helper_lib",
srcs = ["buffer_helper.cc"],
hdrs = ["buffer_helper.h"],
deps = [
"//source/common/buffer:buffer_lib",
"//source/common/common:assert_lib",
"//source/common/common:byte_order_lib",
],
)

envoy_cc_library(
name = "protocol_lib",
srcs = [
"binary_protocol.cc",
"compact_protocol.cc",
"protocol.cc",
],
hdrs = [
"binary_protocol.h",
"compact_protocol.h",
"protocol.h",
],
external_deps = ["abseil_optional"],
deps = [
":buffer_helper_lib",
"//source/common/singleton:const_singleton",
],
)

envoy_cc_library(
name = "transport_lib",
srcs = ["transport.cc"],
hdrs = ["transport.h"],
deps = [
":buffer_helper_lib",
":protocol_lib",
"//source/common/common:assert_lib",
"//source/common/common:utility_lib",
"//source/common/singleton:const_singleton",
],
)
300 changes: 300 additions & 0 deletions source/extensions/filters/network/thrift_proxy/binary_protocol.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,300 @@
#include "extensions/filters/network/thrift_proxy/binary_protocol.h"

#include "envoy/common/exception.h"

#include "common/common/assert.h"
#include "common/common/fmt.h"
#include "common/common/macros.h"

#include "extensions/filters/network/thrift_proxy/buffer_helper.h"

namespace Envoy {
namespace Extensions {
namespace NetworkFilters {
namespace ThriftProxy {

const uint16_t BinaryProtocolImpl::Magic = 0x8001;

bool BinaryProtocolImpl::readMessageBegin(Buffer::Instance& buffer, std::string& name,
MessageType& msg_type, int32_t& seq_id) {
// Minimum message length:
// version: 2 bytes +
// unused: 1 byte +
// msg type: 1 byte +
// name len: 4 bytes +
// name: 0 bytes +
// seq id: 4 bytes
if (buffer.length() < 12) {
return false;
}

uint16_t version = BufferHelper::peekU16(buffer);
if (version != Magic) {
throw EnvoyException(
fmt::format("invalid binary protocol version 0x{:04x} != 0x{:04x}", version, Magic));
}

// The byte at offset 2 is unused and ignored.

MessageType type = static_cast<MessageType>(BufferHelper::peekI8(buffer, 3));
if (type < MessageType::Call || type > MessageType::LastMessageType) {
throw EnvoyException(
fmt::format("invalid binary protocol message type {}", static_cast<int8_t>(type)));
}

uint32_t name_len = BufferHelper::peekU32(buffer, 4);
if (buffer.length() < name_len + 12) {
return false;
}

buffer.drain(8);

if (name_len > 0) {
name.assign(std::string(static_cast<char*>(buffer.linearize(name_len)), name_len));
buffer.drain(name_len);
} else {
name.clear();
}
msg_type = type;
seq_id = BufferHelper::drainI32(buffer);

return true;
}

bool BinaryProtocolImpl::readMessageEnd(Buffer::Instance& buffer) {
UNREFERENCED_PARAMETER(buffer);
return true;
}

bool BinaryProtocolImpl::readStructBegin(Buffer::Instance& buffer, std::string& name) {
UNREFERENCED_PARAMETER(buffer);
name.clear(); // binary protocol does not transmit struct names
return true;
}

bool BinaryProtocolImpl::readStructEnd(Buffer::Instance& buffer) {
UNREFERENCED_PARAMETER(buffer);
return true;
}

bool BinaryProtocolImpl::readFieldBegin(Buffer::Instance& buffer, std::string& name,
FieldType& field_type, int16_t& field_id) {
// FieldType::Stop is encoded as 1 byte.
if (buffer.length() < 1) {
return false;
}

FieldType type = static_cast<FieldType>(BufferHelper::peekI8(buffer));
if (type == FieldType::Stop) {
field_id = 0;
buffer.drain(1);
} else {
// FieldType followed by 2 bytes of field id
if (buffer.length() < 3) {
return false;
}
field_id = BufferHelper::peekI16(buffer, 1);
buffer.drain(3);
}

name.clear(); // binary protocol does not transmit field names
field_type = type;

return true;
}

bool BinaryProtocolImpl::readFieldEnd(Buffer::Instance& buffer) {
UNREFERENCED_PARAMETER(buffer);
return true;
}

bool BinaryProtocolImpl::readMapBegin(Buffer::Instance& buffer, FieldType& key_type,
FieldType& value_type, uint32_t& size) {
// Minimum length:
// key type: 1 byte +
// value type: 1 byte +
// map size: 4 bytes
if (buffer.length() < 6) {
return false;
}

FieldType ktype = static_cast<FieldType>(BufferHelper::peekI8(buffer, 0));
FieldType vtype = static_cast<FieldType>(BufferHelper::peekI8(buffer, 1));
int32_t s = BufferHelper::peekI32(buffer, 2);
if (s < 0) {
throw EnvoyException(fmt::format("negative binary protocol map size {}", s));
}

buffer.drain(6);

key_type = ktype;
value_type = vtype;
size = static_cast<uint32_t>(s);

return true;
}

bool BinaryProtocolImpl::readMapEnd(Buffer::Instance& buffer) {
UNREFERENCED_PARAMETER(buffer);
return true;
}

bool BinaryProtocolImpl::readListBegin(Buffer::Instance& buffer, FieldType& elem_type,
uint32_t& size) {
// Minimum length:
// elem type: 1 byte +
// map size: 4 bytes
if (buffer.length() < 5) {
return false;
}

FieldType type = static_cast<FieldType>(BufferHelper::peekI8(buffer));
int32_t s = BufferHelper::peekI32(buffer, 1);
if (s < 0) {
throw EnvoyException(fmt::format("negative binary protocol list/set size {}", s));
}
buffer.drain(5);

elem_type = type;
size = static_cast<uint32_t>(s);

return true;
}

bool BinaryProtocolImpl::readListEnd(Buffer::Instance& buffer) {
UNREFERENCED_PARAMETER(buffer);
return true;
}

bool BinaryProtocolImpl::readSetBegin(Buffer::Instance& buffer, FieldType& elem_type,
uint32_t& size) {
return readListBegin(buffer, elem_type, size);
}

bool BinaryProtocolImpl::readSetEnd(Buffer::Instance& buffer) { return readListEnd(buffer); }

bool BinaryProtocolImpl::readBool(Buffer::Instance& buffer, bool& value) {
if (buffer.length() < 1) {
return false;
}

value = BufferHelper::drainI8(buffer) != 0;
return true;
}

bool BinaryProtocolImpl::readByte(Buffer::Instance& buffer, uint8_t& value) {
if (buffer.length() < 1) {
return false;
}
value = BufferHelper::drainI8(buffer);
return true;
}

bool BinaryProtocolImpl::readInt16(Buffer::Instance& buffer, int16_t& value) {
if (buffer.length() < 2) {
return false;
}
value = BufferHelper::drainI16(buffer);
return true;
}

bool BinaryProtocolImpl::readInt32(Buffer::Instance& buffer, int32_t& value) {
if (buffer.length() < 4) {
return false;
}
value = BufferHelper::drainI32(buffer);
return true;
}

bool BinaryProtocolImpl::readInt64(Buffer::Instance& buffer, int64_t& value) {
if (buffer.length() < 8) {
return false;
}
value = BufferHelper::drainI64(buffer);
return true;
}

bool BinaryProtocolImpl::readDouble(Buffer::Instance& buffer, double& value) {
static_assert(sizeof(double) == sizeof(uint64_t), "sizeof(double) != size(uint64_t)");

if (buffer.length() < 8) {
return false;
}

value = BufferHelper::drainDouble(buffer);
return true;
}

bool BinaryProtocolImpl::readString(Buffer::Instance& buffer, std::string& value) {
// Encoded as size (4 bytes) followed by string (0+ bytes).
if (buffer.length() < 4) {
return false;
}

int32_t str_len = BufferHelper::peekI32(buffer);
if (str_len < 0) {
throw EnvoyException(fmt::format("negative binary protocol string/binary length {}", str_len));
}

if (str_len == 0) {
buffer.drain(4);
value.clear();
return true;
}

if (buffer.length() < static_cast<uint64_t>(str_len) + 4) {
return false;
}

buffer.drain(4);
value.assign(static_cast<char*>(buffer.linearize(str_len)), str_len);
buffer.drain(str_len);
return true;
}

bool BinaryProtocolImpl::readBinary(Buffer::Instance& buffer, std::string& value) {
return readString(buffer, value);
}

bool LaxBinaryProtocolImpl::readMessageBegin(Buffer::Instance& buffer, std::string& name,
MessageType& msg_type, int32_t& seq_id) {
// Minimum message length:
// name len: 4 bytes +
// name: 0 bytes +
// msg type: 1 byte +
// seq id: 4 bytes
if (buffer.length() < 9) {
return false;
}

uint32_t name_len = BufferHelper::peekU32(buffer);

if (buffer.length() < 9 + name_len) {
return false;
}

MessageType type = static_cast<MessageType>(BufferHelper::peekI8(buffer, name_len + 4));
if (type < MessageType::Call || type > MessageType::LastMessageType) {
throw EnvoyException(
fmt::format("invalid (lax) binary protocol message type {}", static_cast<int8_t>(type)));
}

buffer.drain(4);
if (name_len > 0) {
name.assign(std::string(static_cast<char*>(buffer.linearize(name_len)), name_len));
buffer.drain(name_len);
} else {
name.clear();
}

msg_type = type;
seq_id = BufferHelper::peekI32(buffer, 1);

buffer.drain(5);
return true;
}

} // namespace ThriftProxy
} // namespace NetworkFilters
} // namespace Extensions
} // namespace Envoy
Loading

0 comments on commit 2c37930

Please sign in to comment.