kafka: 2.4.0 support - add support for new message types added in 2.4 (#10000)

Signed-off-by: Adam Kotwasinski <[email protected]>
adamkotwasinski authored Feb 14, 2020
1 parent fade668 commit bbf365b
Showing 22 changed files with 526 additions and 125 deletions.
18 changes: 9 additions & 9 deletions bazel/repository_locations.bzl
@@ -303,18 +303,18 @@ REPOSITORY_LOCATIONS = dict(
urls = ["https://github.com/protocolbuffers/upb/archive/8a3ae1ef3e3e3f26b45dec735c5776737fc7247f.tar.gz"],
),
kafka_source = dict(
sha256 = "feaa32e5c42acf42bd587f8f0b1ccce679db227620da97eed013f4c44a44f64d",
strip_prefix = "kafka-2.3.1/clients/src/main/resources/common/message",
urls = ["https://github.com/apache/kafka/archive/2.3.1.zip"],
sha256 = "e7b748a62e432b5770db6dbb3b034c68c0ea212812cb51603ee7f3a8a35f06be",
strip_prefix = "kafka-2.4.0/clients/src/main/resources/common/message",
urls = ["https://github.com/apache/kafka/archive/2.4.0.zip"],
),
kafka_server_binary = dict(
sha256 = "5a3ddd4148371284693370d56f6f66c7a86d86dd96c533447d2a94d176768d2e",
strip_prefix = "kafka_2.12-2.3.1",
urls = ["http://us.mirrors.quenda.co/apache/kafka/2.3.1/kafka_2.12-2.3.1.tgz"],
sha256 = "b9582bab0c3e8d131953b1afa72d6885ca1caae0061c2623071e7f396f2ccfee",
strip_prefix = "kafka_2.12-2.4.0",
urls = ["http://us.mirrors.quenda.co/apache/kafka/2.4.0/kafka_2.12-2.4.0.tgz"],
),
kafka_python_client = dict(
sha256 = "81f24a5d297531495e0ccb931fbd6c4d1ec96583cf5a730579a3726e63f59c47",
strip_prefix = "kafka-python-1.4.7",
urls = ["https://github.com/dpkp/kafka-python/archive/1.4.7.tar.gz"],
sha256 = "454bf3aafef9348017192417b7f0828a347ec2eaf3efba59336f3a3b68f10094",
strip_prefix = "kafka-python-2.0.0",
urls = ["https://github.com/dpkp/kafka-python/archive/2.0.0.tar.gz"],
),
)
@@ -5,7 +5,7 @@ Kafka Broker filter

The Apache Kafka broker filter decodes the client protocol for
`Apache Kafka <https://kafka.apache.org/>`_, both the requests and responses in the payload.
The message versions in `Kafka 2.3.1 <http://kafka.apache.org/231/protocol.html#protocol_api_keys>`_
The message versions in `Kafka 2.4.0 <http://kafka.apache.org/24/protocol.html#protocol_api_keys>`_
are supported.
The filter attempts not to influence the communication between client and brokers, so the messages
that could not be decoded (due to Kafka client or broker running a newer version than supported by
2 changes: 2 additions & 0 deletions source/extensions/filters/network/kafka/BUILD
@@ -81,6 +81,7 @@ envoy_cc_library(
deps = [
":kafka_request_lib",
":parser_lib",
":tagged_fields_lib",
"//source/common/common:assert_lib",
"//source/common/common:minimal_logger_lib",
],
@@ -143,6 +144,7 @@ envoy_cc_library(
deps = [
":kafka_response_lib",
":parser_lib",
":tagged_fields_lib",
"//source/common/common:assert_lib",
"//source/common/common:minimal_logger_lib",
],
57 changes: 48 additions & 9 deletions source/extensions/filters/network/kafka/kafka_request.h
@@ -4,12 +4,22 @@

#include "extensions/filters/network/kafka/external/serialization_composite.h"
#include "extensions/filters/network/kafka/serialization.h"
#include "extensions/filters/network/kafka/tagged_fields.h"

namespace Envoy {
namespace Extensions {
namespace NetworkFilters {
namespace Kafka {

/**
* Decides if request with given api key & version should have tagged fields in header.
* This method gets implemented in generated code through 'kafka_request_resolver_cc.j2'.
* @param api_key Kafka request key.
* @param api_version Kafka request's version.
* @return Whether tagged fields should be used for this request.
*/
bool requestUsesTaggedFieldsInHeader(const uint16_t api_key, const uint16_t api_version);

/**
* Represents fields that are present in every Kafka request message.
* @see http://kafka.apache.org/protocol.html#protocol_messages
@@ -19,10 +29,45 @@ struct RequestHeader {
int16_t api_version_;
int32_t correlation_id_;
NullableString client_id_;
TaggedFields tagged_fields_;

RequestHeader(const int16_t api_key, const int16_t api_version, const int32_t correlation_id,
const NullableString& client_id)
: RequestHeader{api_key, api_version, correlation_id, client_id, TaggedFields{}} {};

RequestHeader(const int16_t api_key, const int16_t api_version, const int32_t correlation_id,
const NullableString& client_id, const TaggedFields& tagged_fields)
: api_key_{api_key}, api_version_{api_version}, correlation_id_{correlation_id},
client_id_{client_id}, tagged_fields_{tagged_fields} {};

uint32_t computeSize(const EncodingContext& context) const {
uint32_t result{0};
result += context.computeSize(api_key_);
result += context.computeSize(api_version_);
result += context.computeSize(correlation_id_);
result += context.computeSize(client_id_);
if (requestUsesTaggedFieldsInHeader(api_key_, api_version_)) {
result += context.computeCompactSize(tagged_fields_);
}
return result;
}

uint32_t encode(Buffer::Instance& dst, EncodingContext& context) const {
uint32_t written{0};
written += context.encode(api_key_, dst);
written += context.encode(api_version_, dst);
written += context.encode(correlation_id_, dst);
written += context.encode(client_id_, dst);
if (requestUsesTaggedFieldsInHeader(api_key_, api_version_)) {
written += context.encodeCompact(tagged_fields_, dst);
}
return written;
}

bool operator==(const RequestHeader& rhs) const {
return api_key_ == rhs.api_key_ && api_version_ == rhs.api_version_ &&
correlation_id_ == rhs.correlation_id_ && client_id_ == rhs.client_id_;
correlation_id_ == rhs.correlation_id_ && client_id_ == rhs.client_id_ &&
tagged_fields_ == rhs.tagged_fields_;
};
};

@@ -95,10 +140,7 @@ template <typename Data> class Request : public AbstractRequest {
const EncodingContext context{request_header_.api_version_};
uint32_t result{0};
// Compute size of header.
result += context.computeSize(request_header_.api_key_);
result += context.computeSize(request_header_.api_version_);
result += context.computeSize(request_header_.correlation_id_);
result += context.computeSize(request_header_.client_id_);
result += context.computeSize(request_header_);
// Compute size of request data.
result += context.computeSize(data_);
return result;
@@ -111,10 +153,7 @@ template <typename Data> class Request : public AbstractRequest {
EncodingContext context{request_header_.api_version_};
uint32_t written{0};
// Encode request header.
written += context.encode(request_header_.api_key_, dst);
written += context.encode(request_header_.api_version_, dst);
written += context.encode(request_header_.correlation_id_, dst);
written += context.encode(request_header_.client_id_, dst);
written += context.encode(request_header_, dst);
// Encode request-specific data.
written += context.encode(data_, dst);
return written;
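For orientation, a minimal hand-written sketch of the kind of lookup the generated requestUsesTaggedFieldsInHeader could perform. The real function is emitted by 'kafka_request_resolver_cc.j2' from the Kafka 2.4.0 message definitions; the api keys and version thresholds below are illustrative placeholders rather than the generated values.

#include <cstdint>
#include <map>

// Sketch only: map an api key to its first "flexible" version, i.e. the version from which
// the request switches to header v2 and therefore carries tagged fields in the header.
// The entries are placeholders; the generated code derives the real thresholds from the
// upstream *.json message definitions bundled with Kafka 2.4.0.
bool requestUsesTaggedFieldsInHeader(const uint16_t api_key, const uint16_t api_version) {
  static const std::map<uint16_t, uint16_t> first_flexible_version = {
      {3, 9},  // placeholder entry
      {18, 3}, // placeholder entry
  };
  const auto it = first_flexible_version.find(api_key);
  return it != first_flexible_version.end() && api_version >= it->second;
}

Requests whose api key has no flexible version yet keep the old header layout, which is why RequestHeader::computeSize and RequestHeader::encode above guard the tagged-fields part with this predicate.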
30 changes: 30 additions & 0 deletions source/extensions/filters/network/kafka/kafka_request_parser.cc
@@ -20,6 +20,36 @@ RequestParseResponse RequestStartParser::parse(absl::string_view& data) {
}
}

uint32_t RequestHeaderDeserializer::feed(absl::string_view& data) {
uint32_t consumed = 0;

consumed += common_part_deserializer_.feed(data);
if (common_part_deserializer_.ready()) {
const auto request_header = common_part_deserializer_.get();
if (requestUsesTaggedFieldsInHeader(request_header.api_key_, request_header.api_version_)) {
tagged_fields_present_ = true;
consumed += tagged_fields_deserializer_.feed(data);
}
}

return consumed;
}

bool RequestHeaderDeserializer::ready() const {
// Header is only fully parsed after we have processed everything, including tagged fields (if
// they are present).
return common_part_deserializer_.ready() &&
(tagged_fields_present_ ? tagged_fields_deserializer_.ready() : true);
}

RequestHeader RequestHeaderDeserializer::get() const {
auto result = common_part_deserializer_.get();
if (tagged_fields_present_) {
result.tagged_fields_ = tagged_fields_deserializer_.get();
}
return result;
}

RequestParseResponse RequestHeaderParser::parse(absl::string_view& data) {
context_->remaining_request_size_ -= deserializer_->feed(data);
// One of two things must have happened when feeding finishes:
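The two-stage feed above relies on the deserializer contract used throughout this extension: feed() consumes what it can and advances the view, ready() reports completeness, get() returns the value. A self-contained sketch of that contract for a single big-endian int16 field, using std::string_view in place of absl::string_view (and not the filter's actual Int16Deserializer), could look like this:

#include <cstdint>
#include <string_view>

// Sketch of the feed()/ready()/get() contract: consume up to two bytes of a big-endian
// int16, possibly spread across multiple feed() calls.
class Int16DeserializerSketch {
public:
  uint32_t feed(std::string_view& data) {
    uint32_t consumed = 0;
    while (written_ < 2 && !data.empty()) {
      buffer_[written_++] = static_cast<uint8_t>(data.front());
      data.remove_prefix(1);
      ++consumed;
    }
    return consumed;
  }
  bool ready() const { return written_ == 2; }
  int16_t get() const { return static_cast<int16_t>((buffer_[0] << 8) | buffer_[1]); }

private:
  uint8_t buffer_[2] = {0, 0};
  uint32_t written_ = 0;
};

Feeding a single byte leaves ready() false until the second byte arrives, which mirrors how RequestHeaderDeserializer::feed above only starts consulting the tagged-fields deserializer once the common part has fully arrived.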
40 changes: 35 additions & 5 deletions source/extensions/filters/network/kafka/kafka_request_parser.h
@@ -8,6 +8,7 @@

#include "extensions/filters/network/kafka/kafka_request.h"
#include "extensions/filters/network/kafka/parser.h"
#include "extensions/filters/network/kafka/tagged_fields.h"

namespace Envoy {
namespace Extensions {
@@ -22,8 +23,16 @@ using RequestParserSharedPtr = std::shared_ptr<RequestParser>;
* Context that is shared between parsers that are handling the same single message.
*/
struct RequestContext {

/**
* Bytes left to consume.
*/
uint32_t remaining_request_size_{0};
RequestHeader request_header_{};

/**
* Request header that gets filled in during the parse.
*/
RequestHeader request_header_{-1, -1, -1, absl::nullopt};

/**
* Bytes left to consume.
@@ -91,10 +100,31 @@ class RequestStartParser : public RequestParser {
* Can throw, as one of the fields (client-id) can throw (nullable string with invalid length).
* @see http://kafka.apache.org/protocol.html#protocol_messages
*/
class RequestHeaderDeserializer
: public CompositeDeserializerWith4Delegates<RequestHeader, Int16Deserializer,
Int16Deserializer, Int32Deserializer,
NullableStringDeserializer> {};
class RequestHeaderDeserializer : public Deserializer<RequestHeader>,
private Logger::Loggable<Logger::Id::kafka> {

// Request header, no matter what, has at least 4 fields. They are extracted here.
using CommonPartDeserializer =
CompositeDeserializerWith4Delegates<RequestHeader, Int16Deserializer, Int16Deserializer,
Int32Deserializer, NullableStringDeserializer>;

public:
RequestHeaderDeserializer() = default;

uint32_t feed(absl::string_view& data) override;
bool ready() const override;
RequestHeader get() const override;

private:
// Deserializer for the first 4 fields, that are present in every request header.
CommonPartDeserializer common_part_deserializer_;

// Tagged fields are used only in request header v2.
// This flag will be set depending on common part's result (api key & version), and will decide
// whether we want to feed data to tagged fields deserializer.
bool tagged_fields_present_{false};
TaggedFieldsDeserializer tagged_fields_deserializer_;
};

using RequestHeaderDeserializerPtr = std::unique_ptr<RequestHeaderDeserializer>;

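The CommonPartDeserializer alias above builds on CompositeDeserializerWith4Delegates from serialization_composite.h. As a rough mental model (a simplified two-delegate sketch, not the actual generated template), a composite deserializer feeds its delegates in order and is ready only when the last one is:

#include <cstdint>
#include <string_view>

// Sketch: feed delegates in sequence; a delegate that is already ready consumes nothing
// further, so remaining bytes flow to the next delegate. The result object is built from
// all delegate values once everything is ready.
template <typename Result, typename D1, typename D2> class CompositeDeserializerSketch {
public:
  uint32_t feed(std::string_view& data) {
    uint32_t consumed = 0;
    consumed += first_.feed(data);
    consumed += second_.feed(data);
    return consumed;
  }
  bool ready() const { return second_.ready(); }
  Result get() const { return {first_.get(), second_.get()}; }

private:
  D1 first_;
  D2 second_;
};

RequestHeader's four leading fields are produced this way, and the optional tagged-fields block is then handled separately by RequestHeaderDeserializer.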
54 changes: 49 additions & 5 deletions source/extensions/filters/network/kafka/kafka_response.h
@@ -2,28 +2,67 @@

#include "extensions/filters/network/kafka/external/serialization_composite.h"
#include "extensions/filters/network/kafka/serialization.h"
#include "extensions/filters/network/kafka/tagged_fields.h"

namespace Envoy {
namespace Extensions {
namespace NetworkFilters {
namespace Kafka {

/**
* Decides if response with given api key & version should have tagged fields in header.
* Bear in mind that ApiVersions responses DO NOT contain tagged fields in the header (despite having
* flexible versions), as per
* https://github.com/apache/kafka/blob/2.4.0/clients/src/main/resources/common/message/ApiVersionsResponse.json#L24
* This method gets implemented in generated code through 'kafka_response_resolver_cc.j2'.
*
* @param api_key Kafka response key.
* @param api_version Kafka response's version.
* @return Whether tagged fields should be used for this response.
*/
bool responseUsesTaggedFieldsInHeader(const uint16_t api_key, const uint16_t api_version);

/**
* Represents Kafka response metadata: expected api key, version and correlation id.
* @see http://kafka.apache.org/protocol.html#protocol_messages
*/
struct ResponseMetadata {
ResponseMetadata(const int16_t api_key, const int16_t api_version, const int32_t correlation_id)
: api_key_{api_key}, api_version_{api_version}, correlation_id_{correlation_id} {};
: ResponseMetadata{api_key, api_version, correlation_id, TaggedFields{}} {};

ResponseMetadata(const int16_t api_key, const int16_t api_version, const int32_t correlation_id,
const TaggedFields& tagged_fields)
: api_key_{api_key}, api_version_{api_version}, correlation_id_{correlation_id},
tagged_fields_{tagged_fields} {};

uint32_t computeSize(const EncodingContext& context) const {
uint32_t result{0};
result += context.computeSize(correlation_id_);
if (responseUsesTaggedFieldsInHeader(api_key_, api_version_)) {
result += context.computeCompactSize(tagged_fields_);
}
return result;
}

uint32_t encode(Buffer::Instance& dst, EncodingContext& context) const {
uint32_t written{0};
// Encode correlation id (api key / version are not present in responses).
written += context.encode(correlation_id_, dst);
if (responseUsesTaggedFieldsInHeader(api_key_, api_version_)) {
written += context.encodeCompact(tagged_fields_, dst);
}
return written;
}

bool operator==(const ResponseMetadata& rhs) const {
return api_key_ == rhs.api_key_ && api_version_ == rhs.api_version_ &&
correlation_id_ == rhs.correlation_id_;
correlation_id_ == rhs.correlation_id_ && tagged_fields_ == rhs.tagged_fields_;
};

const int16_t api_key_;
const int16_t api_version_;
const int32_t correlation_id_;
const TaggedFields tagged_fields_;
};

using ResponseMetadataSharedPtr = std::shared_ptr<ResponseMetadata>;
@@ -77,7 +116,12 @@ template <typename Data> class Response : public AbstractResponse {
*/
uint32_t computeSize() const override {
const EncodingContext context{metadata_.api_version_};
return context.computeSize(metadata_.correlation_id_) + context.computeSize(data_);
uint32_t result{0};
// Compute size of header.
result += context.computeSize(metadata_);
// Compute size of response data.
result += context.computeSize(data_);
return result;
}

/**
Expand All @@ -86,8 +130,8 @@ template <typename Data> class Response : public AbstractResponse {
uint32_t encode(Buffer::Instance& dst) const override {
EncodingContext context{metadata_.api_version_};
uint32_t written{0};
// Encode correlation id (api key / version are not present in responses).
written += context.encode(metadata_.correlation_id_, dst);
// Encode response header.
written += context.encode(metadata_, dst);
// Encode response-specific data.
written += context.encode(data_, dst);
return written;
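The computeCompactSize / encodeCompact calls used for tagged_fields_ above exist because flexible versions serialize tagged fields (and other compact collections) with unsigned varints rather than fixed-width integers. A minimal sketch of that varint encoding, independent of the EncodingContext API, follows:

#include <cstdint>
#include <vector>

// Sketch: Kafka's unsigned varint encoding, used in flexible versions for compact lengths
// and for the count/tag/size values inside a tagged-fields block. Each byte carries seven
// payload bits; a set high bit means another byte follows.
std::vector<uint8_t> encodeUnsignedVarint(uint32_t value) {
  std::vector<uint8_t> out;
  while (value >= 0x80) {
    out.push_back(static_cast<uint8_t>((value & 0x7F) | 0x80));
    value >>= 7;
  }
  out.push_back(static_cast<uint8_t>(value));
  return out;
}

// Size of the varint without materializing it, similar in spirit to computeCompactSize.
uint32_t unsignedVarintSize(uint32_t value) {
  uint32_t bytes = 1;
  while (value >= 0x80) {
    value >>= 7;
    ++bytes;
  }
  return bytes;
}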
33 changes: 25 additions & 8 deletions source/extensions/filters/network/kafka/kafka_response_parser.cc
@@ -22,16 +22,33 @@ ResponseParseResponse ResponseHeaderParser::parse(absl::string_view& data) {
return ResponseParseResponse::stillWaiting();
}

context_->remaining_response_size_ = length_deserializer_.get();
context_->remaining_response_size_ -= sizeof(context_->correlation_id_);
context_->correlation_id_ = correlation_id_deserializer_.get();
if (!context_->api_info_set_) {
// We have consumed first two response header fields: payload length and correlation id.
context_->remaining_response_size_ = length_deserializer_.get();
context_->remaining_response_size_ -= sizeof(context_->correlation_id_);
context_->correlation_id_ = correlation_id_deserializer_.get();

const ExpectedResponseSpec spec = getResponseSpec(context_->correlation_id_);
context_->api_key_ = spec.first;
context_->api_version_ = spec.second;
// At this stage, we have setup the context - we know the response's api key & version, so we can
// safely create the payload parser.
// We have correlation id now, so we can see what is the expected response api key & version.
const ExpectedResponseSpec spec = getResponseSpec(context_->correlation_id_);
context_->api_key_ = spec.first;
context_->api_version_ = spec.second;

// Mark that version data has been set, so we do not attempt to re-initialize again.
context_->api_info_set_ = true;
}

// Depending on response's api key & version, we might need to parse tagged fields element.
if (responseUsesTaggedFieldsInHeader(context_->api_key_, context_->api_version_)) {
context_->remaining_response_size_ -= tagged_fields_deserializer_.feed(data);
if (tagged_fields_deserializer_.ready()) {
context_->tagged_fields_ = tagged_fields_deserializer_.get();
} else {
return ResponseParseResponse::stillWaiting();
}
}

// At this stage, we have fully setup the context - we know the response's api key & version,
// so we can safely create the payload parser.
auto next_parser = parser_resolver_.createParser(context_);
return ResponseParseResponse::nextParser(next_parser);
}
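getResponseSpec can only work because the filter records each request's api key & version as the request flows to the broker; a Kafka response header carries nothing but the correlation id. A simplified sketch of that bookkeeping idea (hypothetical names, not the extension's actual tracking code) is shown below:

#include <cstdint>
#include <map>
#include <stdexcept>
#include <utility>

// Sketch: remember the api key & version of every in-flight request, keyed by correlation
// id, so the matching response can be interpreted later (including whether its header
// carries tagged fields).
using ExpectedResponseSpecSketch = std::pair<int16_t, int16_t>; // api key, api version

class InFlightRequestTrackerSketch {
public:
  void onRequest(int32_t correlation_id, int16_t api_key, int16_t api_version) {
    in_flight_[correlation_id] = {api_key, api_version};
  }

  ExpectedResponseSpecSketch onResponse(int32_t correlation_id) {
    const auto it = in_flight_.find(correlation_id);
    if (it == in_flight_.end()) {
      throw std::runtime_error("response with unknown correlation id");
    }
    const ExpectedResponseSpecSketch result = it->second;
    in_flight_.erase(it);
    return result;
  }

private:
  std::map<int32_t, ExpectedResponseSpecSketch> in_flight_;
};

Once the spec is known, the parser above can both decide whether a tagged-fields block precedes the payload and hand off to the right payload parser.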