Skip to content

Commit

Permalink
kafka: add serialization methods for compact encoding
Browse files Browse the repository at this point in the history
Signed-off-by: Adam Kotwasinski <[email protected]>
  • Loading branch information
adamkotwasinski committed Feb 6, 2020
1 parent 9774626 commit 862482c
Show file tree
Hide file tree
Showing 2 changed files with 258 additions and 13 deletions.
232 changes: 231 additions & 1 deletion source/extensions/filters/network/kafka/serialization.h
Original file line number Diff line number Diff line change
Expand Up @@ -771,7 +771,6 @@ class NullableCompactArrayDeserializer : public Deserializer<NullableArray<Respo
* structure-tree during encoding (currently api_version, as different request versions serialize
* differently).
*/
// TODO(adamkotwasinski) that class might be split into Request/ResponseEncodingContext in future
class EncodingContext {
public:
EncodingContext(int16_t api_version) : api_version_{api_version} {};
Expand All @@ -794,6 +793,24 @@ class EncodingContext {
*/
template <typename T> uint32_t computeSize(const NullableArray<T>& arg) const;

/**
* Compute size of given reference, if it were to be compactly encoded.
* @return serialized size of argument.
*/
template <typename T> uint32_t computeCompactSize(const T& arg) const;

/**
* Compute size of given array, if it were to be compactly encoded.
* @return serialized size of argument.
*/
template <typename T> uint32_t computeCompactSize(const std::vector<T>& arg) const;

/**
* Compute size of given nullable array, if it were to be encoded.
* @return serialized size of argument.
*/
template <typename T> uint32_t computeCompactSize(const NullableArray<T>& arg) const;

/**
* Encode given reference in a buffer.
* @return bytes written
Expand All @@ -812,6 +829,24 @@ class EncodingContext {
*/
template <typename T> uint32_t encode(const NullableArray<T>& arg, Buffer::Instance& dst);

/**
* Compactly encode given reference in a buffer.
* @return bytes written.
*/
template <typename T> uint32_t encodeCompact(const T& arg, Buffer::Instance& dst);

/**
* Compactly encode given array in a buffer.
* @return bytes written.
*/
template <typename T> uint32_t encodeCompact(const std::vector<T>& arg, Buffer::Instance& dst);

/**
* Compactly encode given nullable array in a buffer.
* @return bytes written.
*/
template <typename T> uint32_t encodeCompact(const NullableArray<T>& arg, Buffer::Instance& dst);

int16_t apiVersion() const { return api_version_; }

private:
Expand Down Expand Up @@ -895,6 +930,89 @@ inline uint32_t EncodingContext::computeSize(const NullableArray<T>& arg) const
return arg ? computeSize(*arg) : sizeof(int32_t);
}

/**
* For non-primitive types, call `computeCompactSize` on them, to delegate the work to the entity
* itself. The entity may use the information in context to decide which fields are included etc.
*/
template <typename T> inline uint32_t EncodingContext::computeCompactSize(const T& arg) const {
return arg.computeCompactSize(*this);
}

/**
* Template overload for int32_t.
* This data type is not compacted, so we just point to non-compact implementation.
*/
template <> inline uint32_t EncodingContext::computeCompactSize(const int32_t& arg) const {
return computeSize(arg);
}

/**
* Template overload for uint32_t.
* For this data type, we notice that the result's length depends on whether there are any bits set
* in groups (1-7, 8-14, 15-21, 22-28, 29-32).
*/
template <> inline uint32_t EncodingContext::computeCompactSize(const uint32_t& arg) const {
if (arg <= 0x7f) /* 2^7-1 */ {
return 1;
} else if (arg <= 0x3fff) /* 2^14-1 */ {
return 2;
} else if (arg <= 0x1fffff) /* 2^21-1 */ {
return 3;
} else if (arg <= 0xfffffff) /* 2^28-1 */ {
return 4;
} else {
return 5;
}
}

/**
* Template overload for compact string.
* Kafka CompactString's size is var-len encoding of N+1 + N bytes.
*/
template <> inline uint32_t EncodingContext::computeCompactSize(const std::string& arg) const {
return computeCompactSize(static_cast<uint32_t>(arg.size()) + 1) + arg.size();
}

/**
* Template overload for compact nullable string.
* Kafka CompactString's size is var-len encoding of N+1 + N bytes, or 1 otherwise (because we
* var-length encode the length of 0).
*/
template <> inline uint32_t EncodingContext::computeCompactSize(const NullableString& arg) const {
return arg ? computeCompactSize(*arg) : 1;
}

/**
* Template overload for compact byte array.
* Kafka CompactBytes' size is var-len encoding of N+1 + N bytes.
*/
template <> inline uint32_t EncodingContext::computeCompactSize(const Bytes& arg) const {
return computeCompactSize(static_cast<uint32_t>(arg.size()) + 1) + arg.size();
}

/**
* Template overload for CompactArray of T.
* The size of array is compact size of header and all of its elements.
*/
template <typename T>
uint32_t EncodingContext::computeCompactSize(const std::vector<T>& arg) const {
uint32_t result = computeCompactSize(static_cast<uint32_t>(arg.size()) + 1);
for (const T& el : arg) {
result += computeCompactSize(el);
}
return result;
}

/**
* Template overload for CompactNullableArray of T.
* The size of array is compact size of header and all of its elements; 1 otherwise (because we
* var-length encode the length of 0).
*/
template <typename T>
uint32_t EncodingContext::computeCompactSize(const NullableArray<T>& arg) const {
return arg ? computeCompactSize(*arg) : 1;
}

/**
* For non-primitive types, call `encode` on them, to delegate the serialization to the entity
* itself.
Expand Down Expand Up @@ -1020,6 +1138,118 @@ uint32_t EncodingContext::encode(const NullableArray<T>& arg, Buffer::Instance&
}
}

/**
* For non-primitive types, call `encodeCompact` on them, to delegate the serialization to the
* entity itself.
*/
template <typename T>
inline uint32_t EncodingContext::encodeCompact(const T& arg, Buffer::Instance& dst) {
return arg.encodeCompact(dst, *this);
}

/**
* int32_t is not encoded in compact fashion, so we just delegate to normal implementation.
*/
template <>
inline uint32_t EncodingContext::encodeCompact(const int32_t& arg, Buffer::Instance& dst) {
return encode(arg, dst);
}

/**
* Template overload for variable-length uint32_t (VAR_UINT).
* Encode the value in 7-bit chunks + marker if field is the last one.
* Details:
* https://cwiki.apache.org/confluence/display/KAFKA/KIP-482%3A+The+Kafka+Protocol+should+Support+Optional+Tagged+Fields#KIP-482:TheKafkaProtocolshouldSupportOptionalTaggedFields-UnsignedVarints
*/
template <>
inline uint32_t EncodingContext::encodeCompact(const uint32_t& arg, Buffer::Instance& dst) {
uint32_t value = arg;

uint32_t elements_with_1 = 0;
// As long as there are bits set on indexes 8 or higher (counting from 1).
while ((value & ~(0x7f)) != 0) {
// Save next 7-bit batch with highest bit set.
const uint8_t el = (value & 0x7f) | 0x80;
dst.add(&el, sizeof(uint8_t));
value >>= 7;
elements_with_1++;
}

// After the loop has finished, we are certain that bit 8 = 0, so we can just add final element.
const uint8_t el = value;
dst.add(&el, sizeof(uint8_t));

return elements_with_1 + 1;
}

/**
* Template overload for std::string.
* Encode string as VAR_UINT + N bytes.
*/
template <>
inline uint32_t EncodingContext::encodeCompact(const std::string& arg, Buffer::Instance& dst) {
const uint32_t string_length = arg.length();
const uint32_t header_length = encodeCompact(string_length + 1, dst);
dst.add(arg.c_str(), string_length);
return header_length + string_length;
}

/**
* Template overload for NullableString.
* Encode string as VAR_UINT + N bytes, or VAR_UINT 0 for null value.
*/
template <>
inline uint32_t EncodingContext::encodeCompact(const NullableString& arg, Buffer::Instance& dst) {
if (arg.has_value()) {
return encodeCompact(*arg, dst);
} else {
const uint32_t len = 0;
return encodeCompact(len, dst);
}
}

/**
* Template overload for Bytes.
* Encode byte array as VAR_UINT + N bytes.
*/
template <>
inline uint32_t EncodingContext::encodeCompact(const Bytes& arg, Buffer::Instance& dst) {
const uint32_t data_length = arg.size();
const uint32_t header_length = encodeCompact(data_length + 1, dst);
dst.add(arg.data(), data_length);
return header_length + data_length;
}

/**
* Encode object array of T as VAR_UINT + N elements.
* Each element of type T then serializes itself on its own.
*/
template <typename T>
uint32_t EncodingContext::encodeCompact(const std::vector<T>& arg, Buffer::Instance& dst) {
const NullableArray<T> wrapped = {arg};
return encodeCompact(wrapped, dst);
}

/**
* Encode nullable object array of T as VAR_UINT + N elements, or VAR_UINT 0 for null value.
* Each element of type T then serializes itself on its own.
*/
template <typename T>
uint32_t EncodingContext::encodeCompact(const NullableArray<T>& arg, Buffer::Instance& dst) {
if (arg.has_value()) {
const uint32_t len = arg->size() + 1;
const uint32_t header_length = encodeCompact(len, dst);
uint32_t written{0};
for (const T& el : *arg) {
written += encodeCompact(el, dst);
}
return header_length + written;
} else {
const uint32_t len = 0;
return encodeCompact(len, dst);
}
}

} // namespace Kafka
} // namespace NetworkFilters
} // namespace Extensions
Expand Down
39 changes: 27 additions & 12 deletions source/extensions/filters/network/kafka/tagged_fields.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,21 @@ struct TaggedField {
uint32_t tag_;
std::vector<unsigned char> data_;

uint32_t computeSize(const EncodingContext&) const {
// FIXME(adam.kotwasinski)
throw std::runtime_error("not implemented");
uint32_t computeCompactSize(const EncodingContext& encoder) const {
uint32_t result{0};
result += encoder.computeCompactSize(tag_);
result += encoder.computeCompactSize(static_cast<uint32_t>(data_.size()));
result += data_.size();
return result;
}

uint32_t encode(Buffer::Instance&, EncodingContext&) const {
// FIXME(adam.kotwasinski)
throw std::runtime_error("not implemented");
uint32_t encodeCompact(Buffer::Instance& dst, EncodingContext& encoder) const {
uint32_t written{0};
written += encoder.encodeCompact(tag_, dst);
written += encoder.encodeCompact(static_cast<uint32_t>(data_.size()), dst);
dst.add(data_.data(), data_.size());
written += data_.size();
return written;
}

bool operator==(const TaggedField& rhs) const { return tag_ == rhs.tag_ && data_ == rhs.data_; }
Expand Down Expand Up @@ -95,14 +102,22 @@ struct TaggedFields {

const std::vector<TaggedField> fields_;

uint32_t computeSize(const EncodingContext&) const {
// FIXME(adam.kotwasinski)
throw std::runtime_error("not implemented");
uint32_t computeCompactSize(const EncodingContext& encoder) const {
uint32_t result{0};
result += encoder.computeCompactSize(static_cast<uint32_t>(fields_.size()));
for (const TaggedField& tagged_field : fields_) {
result += tagged_field.computeCompactSize(encoder);
}
return result;
}

uint32_t encode(Buffer::Instance&, EncodingContext&) const {
// FIXME(adam.kotwasinski)
throw std::runtime_error("not implemented");
uint32_t encodeCompact(Buffer::Instance& dst, EncodingContext& encoder) const {
uint32_t written{0};
written += encoder.encodeCompact(static_cast<uint32_t>(fields_.size()), dst);
for (const TaggedField& tagged_field : fields_) {
written += tagged_field.encodeCompact(dst, encoder);
}
return written;
}

bool operator==(const TaggedFields& rhs) const { return fields_ == rhs.fields_; }
Expand Down

0 comments on commit 862482c

Please sign in to comment.