Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: Upgrade Kafka dependencies to 2.3.1 #9582

Merged
merged 1 commit into from
Jan 22, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions bazel/repository_locations.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -305,14 +305,14 @@ REPOSITORY_LOCATIONS = dict(
urls = ["https://github.com/protocolbuffers/upb/archive/8a3ae1ef3e3e3f26b45dec735c5776737fc7247f.tar.gz"],
),
kafka_source = dict(
sha256 = "ae7a1696c0a0302b43c5b21e515c37e6ecd365941f68a510a7e442eebddf39a1", # 2.2.0-rc2
strip_prefix = "kafka-2.2.0-rc2/clients/src/main/resources/common/message",
urls = ["https://github.com/apache/kafka/archive/2.2.0-rc2.zip"],
sha256 = "feaa32e5c42acf42bd587f8f0b1ccce679db227620da97eed013f4c44a44f64d",
strip_prefix = "kafka-2.3.1/clients/src/main/resources/common/message",
urls = ["https://github.com/apache/kafka/archive/2.3.1.zip"],
),
kafka_server_binary = dict(
sha256 = "a009624fae678fa35968f945e18e45fbea9a30fa8080d5dcce7fdea726120027",
strip_prefix = "kafka_2.12-2.2.0",
urls = ["http://us.mirrors.quenda.co/apache/kafka/2.2.0/kafka_2.12-2.2.0.tgz"],
sha256 = "5a3ddd4148371284693370d56f6f66c7a86d86dd96c533447d2a94d176768d2e",
strip_prefix = "kafka_2.12-2.3.1",
urls = ["http://us.mirrors.quenda.co/apache/kafka/2.3.1/kafka_2.12-2.3.1.tgz"],
),
kafka_python_client = dict(
sha256 = "81f24a5d297531495e0ccb931fbd6c4d1ec96583cf5a730579a3726e63f59c47",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ Kafka Broker filter

The Apache Kafka broker filter decodes the client protocol for
`Apache Kafka <https://kafka.apache.org/>`_, both the requests and responses in the payload.
The message versions in `Kafka 2.0 <http://kafka.apache.org/20/protocol.html#protocol_api_keys>`_
The message versions in `Kafka 2.3.1 <http://kafka.apache.org/231/protocol.html#protocol_api_keys>`_
are supported.
The filter attempts not to influence the communication between client and brokers, so the messages
that could not be decoded (due to Kafka client or broker running a newer version than supported by
Expand Down Expand Up @@ -37,10 +37,12 @@ in the configuration snippet below:
filter_chains:
- filters:
- name: envoy.filters.network.kafka_broker
config:
typed_config:
"@type": type.googleapis.com/envoy.config.filter.network.kafka_broker.v2alpha1.KafkaBroker
stat_prefix: exampleprefix
- name: envoy.tcp_proxy
config:
typed_config:
"@type": type.googleapis.com/envoy.config.filter.network.tcp_proxy.v2.TcpProxy
stat_prefix: tcp
cluster: localkafka
clusters:
Expand Down
171 changes: 102 additions & 69 deletions source/extensions/filters/network/kafka/protocol/generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,9 @@ def generate_main_code(type, main_header_file, resolver_cc_file, metrics_header_
- resolver_cc_file - contains request api key & version mapping to deserializer (from header file)
- metrics_header_file - contains metrics with names corresponding to messages
"""
processor = StatefulProcessor()
# Parse provided input files.
messages = parse_messages(input_files)
messages = processor.parse_messages(input_files)

complex_type_template = RenderingHelper.get_template('complex_type_template.j2')
parsers_template = RenderingHelper.get_template("%s_parser.j2" % type)
Expand Down Expand Up @@ -64,8 +65,9 @@ def generate_test_code(type, header_test_cc_file, codec_test_cc_file, utilities_
- codec_test_cc_file - tests involving codec and Request/ResponseParserResolver,
- utilities_cc_file - utilities for creating sample messages.
"""
processor = StatefulProcessor()
# Parse provided input files.
messages = parse_messages(input_files)
messages = processor.parse_messages(input_files)

# Generate header-test file.
template = RenderingHelper.get_template("%ss_test_cc.j2" % type)
Expand All @@ -86,80 +88,106 @@ def generate_test_code(type, header_test_cc_file, codec_test_cc_file, utilities_
fd.write(contents)


def parse_messages(input_files):
class StatefulProcessor:
"""
Parse request/response structures from provided input files.
Helper entity that keeps state during the processing.
Some state needs to be shared across multiple message types, as we need to handle identical
sub-type names (e.g. both AlterConfigsRequest & IncrementalAlterConfigsRequest have child
AlterConfigsResource, what would cause a compile-time error if we were to handle it trivially).
"""
import re
import json

messages = []
# For each specification file, remove comments, and parse the remains.
for input_file in input_files:
with open(input_file, 'r') as fd:
raw_contents = fd.read()
without_comments = re.sub(r'//.*\n', '', raw_contents)
message_spec = json.loads(without_comments)
message = parse_top_level_element(message_spec)
messages.append(message)

# Sort messages by api_key.
messages.sort(key=lambda x: x.get_extra('api_key'))
return messages


def parse_top_level_element(spec):
"""
Parse a given structure into a request/response.
Request/response is just a complex type, that has name & version information kept in differently
named fields, compared to sub-structures in a message.
"""
type_name = spec['name']
versions = Statics.parse_version_string(spec['validVersions'], 2 << 16 - 1)
return parse_complex_type(type_name, spec, versions).with_extra('api_key', spec['apiKey'])


def parse_complex_type(type_name, field_spec, versions):
"""
Parse given complex type, returning a structure that holds its name, field specification and
allowed versions.
"""
fields = []
for child_field in field_spec['fields']:
child = parse_field(child_field, versions[-1])
fields.append(child)
return Complex(type_name, fields, versions)
def __init__(self):
# Complex types that have been encountered during processing.
self.known_types = set()
# Name of parent message type that's being processed right now.
self.currently_processed_message_type = None

def parse_messages(self, input_files):
"""
Parse request/response structures from provided input files.
"""
import re
import json

messages = []
# Sort the input files, as the processing is stateful, as we want the same order every time.
input_files.sort()
# For each specification file, remove comments, and parse the remains.
for input_file in input_files:
with open(input_file, 'r') as fd:
raw_contents = fd.read()
without_comments = re.sub(r'//.*\n', '', raw_contents)
message_spec = json.loads(without_comments)
message = self.parse_top_level_element(message_spec)
messages.append(message)

# Sort messages by api_key.
messages.sort(key=lambda x: x.get_extra('api_key'))
return messages

def parse_top_level_element(self, spec):
"""
Parse a given structure into a request/response.
Request/response is just a complex type, that has name & version information kept in differently
named fields, compared to sub-structures in a message.
"""
self.currently_processed_message_type = spec['name']
versions = Statics.parse_version_string(spec['validVersions'], 2 << 16 - 1)
complex_type = self.parse_complex_type(self.currently_processed_message_type, spec, versions)
# Request / response types need to carry api key version.
return complex_type.with_extra('api_key', spec['apiKey'])

def parse_field(field_spec, highest_possible_version):
"""
Parse given field, returning a structure holding the name, type, and versions when this field is
actually used (nullable or not). Obviously, field cannot be used in version higher than its
type's usage.
"""
version_usage = Statics.parse_version_string(field_spec['versions'], highest_possible_version)
version_usage_as_nullable = Statics.parse_version_string(
field_spec['nullableVersions'],
highest_possible_version) if 'nullableVersions' in field_spec else range(-1)
parsed_type = parse_type(field_spec['type'], field_spec, highest_possible_version)
return FieldSpec(field_spec['name'], parsed_type, version_usage, version_usage_as_nullable)
def parse_complex_type(self, type_name, field_spec, versions):
"""
Parse given complex type, returning a structure that holds its name, field specification and
allowed versions.
"""
fields = []
for child_field in field_spec['fields']:
child = self.parse_field(child_field, versions[-1])
fields.append(child)

# Some of the types repeat multiple times (e.g. AlterableConfig).
# In such a case, every second or later occurrence of the same name is going to be prefixed
# with parent type, e.g. we have AlterableConfig (for AlterConfigsRequest) and then
# IncrementalAlterConfigsRequestAlterableConfig (for IncrementalAlterConfigsRequest).
# This keeps names unique, while keeping non-duplicate ones short.
if type_name not in self.known_types:
self.known_types.add(type_name)
else:
type_name = self.currently_processed_message_type + type_name
self.known_types.add(type_name)

return Complex(type_name, fields, versions)

def parse_type(type_name, field_spec, highest_possible_version):
"""
Parse a given type element - returns an array type, primitive (e.g. uint32_t) or complex one.
"""
if (type_name.startswith('[]')):
# In spec files, array types are defined as `[]underlying_type` instead of having its own
# element with type inside.
underlying_type = parse_type(type_name[2:], field_spec, highest_possible_version)
return Array(underlying_type)
else:
if (type_name in Primitive.PRIMITIVE_TYPE_NAMES):
return Primitive(type_name, field_spec.get('default'))
def parse_field(self, field_spec, highest_possible_version):
"""
Parse given field, returning a structure holding the name, type, and versions when this field is
actually used (nullable or not). Obviously, field cannot be used in version higher than its
type's usage.
"""
version_usage = Statics.parse_version_string(field_spec['versions'], highest_possible_version)
version_usage_as_nullable = Statics.parse_version_string(
field_spec['nullableVersions'],
highest_possible_version) if 'nullableVersions' in field_spec else range(-1)
parsed_type = self.parse_type(field_spec['type'], field_spec, highest_possible_version)
return FieldSpec(field_spec['name'], parsed_type, version_usage, version_usage_as_nullable)

def parse_type(self, type_name, field_spec, highest_possible_version):
"""
Parse a given type element - returns an array type, primitive (e.g. uint32_t) or complex one.
"""
if (type_name.startswith('[]')):
# In spec files, array types are defined as `[]underlying_type` instead of having its own
# element with type inside.
underlying_type = self.parse_type(type_name[2:], field_spec, highest_possible_version)
return Array(underlying_type)
else:
versions = Statics.parse_version_string(field_spec['versions'], highest_possible_version)
return parse_complex_type(type_name, field_spec, versions)
if (type_name in Primitive.PRIMITIVE_TYPE_NAMES):
return Primitive(type_name, field_spec.get('default'))
else:
versions = Statics.parse_version_string(field_spec['versions'], highest_possible_version)
return self.parse_complex_type(type_name, field_spec, versions)


class Statics:
Expand Down Expand Up @@ -283,7 +311,12 @@ def parameter_declaration(self, version):

def default_value(self):
if self.is_nullable():
return '{%s}' % self.type.default_value()
type_default_value = self.type.default_value()
# For nullable fields, it's possible to have (Java) null as default value.
if type_default_value != 'null':
return '{%s}' % type_default_value
else:
return 'absl::nullopt'
else:
return str(self.type.default_value())

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ def get_field_counts():
"""
Generate argument counts that should be processed by composite deserializers.
"""
return range(1, 10)
return range(1, 11)


class RenderingHelper:
Expand Down