From a400d419968dacd49133d2e39f7dc5c3779a52d2 Mon Sep 17 00:00:00 2001 From: Jaime Soriano Pastor Date: Mon, 24 Sep 2018 21:21:19 +0200 Subject: [PATCH] Add support for kafka 2.0.0 (#8399) Add support to kafka 2.0.0 in kafka output and metricbeat module. Merge kafka versioning helpers of output and metricbeat module. Set version in kafka module configuration of metricbeat system tests (cherry picked from commit 1bfd445873eb839ce0c4f3dd353a90ecd0144f8d) --- CHANGELOG.asciidoc | 2 + libbeat/{outputs => common}/kafka/version.go | 41 +++++++++- libbeat/docs/outputconfig.asciidoc | 2 +- libbeat/outputs/kafka/config.go | 11 +-- libbeat/outputs/kafka/kafka.go | 7 -- metricbeat/module/kafka/broker.go | 10 ++- .../kafka/consumergroup/consumergroup.go | 2 +- .../module/kafka/partition/partition.go | 1 + metricbeat/module/kafka/version.go | 79 ------------------- metricbeat/tests/system/test_kafka.py | 6 +- testing/environments/docker/kafka/Dockerfile | 2 +- 11 files changed, 63 insertions(+), 100 deletions(-) rename libbeat/{outputs => common}/kafka/version.go (75%) delete mode 100644 metricbeat/module/kafka/version.go diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index 22aa8b46d16..be3235b83e2 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -89,6 +89,7 @@ https://github.com/elastic/beats/compare/v6.4.0...6.x[Check the HEAD diff] - Make kubernetes autodiscover ignore events with empty container IDs {pull}7971[7971] - Implement CheckConfig in RunnerFactory to make autodiscover check configs {pull}7961[7961] - Add DNS processor with support for performing reverse lookups on IP addresses. {issue}7770[7770] +- Support for Kafka 2.0.0 in kafka output {pull}8399[8399] *Auditbeat* @@ -113,6 +114,7 @@ https://github.com/elastic/beats/compare/v6.4.0...6.x[Check the HEAD diff] - Added 'died' PID state to process_system metricset on system module {pull}8275[8275] - Add `metrics` metricset to MongoDB module. {pull}7611[7611] - Added `ccr` metricset to Elasticsearch module. {pull}8335[8335] +- Support for Kafka 2.0.0 {pull}8399[8399] *Packetbeat* diff --git a/libbeat/outputs/kafka/version.go b/libbeat/common/kafka/version.go similarity index 75% rename from libbeat/outputs/kafka/version.go rename to libbeat/common/kafka/version.go index 884e67616a2..6cc8d9cb369 100644 --- a/libbeat/outputs/kafka/version.go +++ b/libbeat/common/kafka/version.go @@ -17,7 +17,14 @@ package kafka -import "github.com/Shopify/sarama" +import ( + "fmt" + + "github.com/Shopify/sarama" +) + +// Version is a kafka version +type Version string // TODO: remove me. // Compat version overwrite for missing versions in sarama @@ -31,8 +38,6 @@ var ( v1_1_1 = parseKafkaVersion("1.1.1") kafkaVersions = map[string]sarama.KafkaVersion{ - "": sarama.V1_0_0_0, - "0.8.2.0": sarama.V0_8_2_0, "0.8.2.1": sarama.V0_8_2_1, "0.8.2.2": sarama.V0_8_2_2, @@ -68,6 +73,10 @@ var ( "1.1.1": v1_1_1, "1.1": v1_1_1, "1": v1_1_1, + + "2.0.0": sarama.V2_0_0_0, + "2.0": sarama.V2_0_0_0, + "2": sarama.V2_0_0_0, } ) @@ -78,3 +87,29 @@ func parseKafkaVersion(s string) sarama.KafkaVersion { } return v } + +// Validate that a kafka version is among the possible options +func (v *Version) Validate() error { + if _, ok := kafkaVersions[string(*v)]; !ok { + return fmt.Errorf("unknown/unsupported kafka vesion '%v'", *v) + } + + return nil +} + +// Unpack a kafka version +func (v *Version) Unpack(s string) error { + tmp := Version(s) + if err := tmp.Validate(); err != nil { + return err + } + + *v = tmp + return nil +} + +// Get a sarama kafka version +func (v Version) Get() (sarama.KafkaVersion, bool) { + kv, ok := kafkaVersions[string(v)] + return kv, ok +} diff --git a/libbeat/docs/outputconfig.asciidoc b/libbeat/docs/outputconfig.asciidoc index 735a5cecb00..5155775dc4a 100644 --- a/libbeat/docs/outputconfig.asciidoc +++ b/libbeat/docs/outputconfig.asciidoc @@ -674,7 +674,7 @@ NOTE: Events bigger than <> will be ==== Compatibility -This output works with all Kafka in between 0.11 and 1.1.1. Older versions +This output works with all Kafka versions in between 0.11 and 2.0.0. Older versions might work as well, but are not supported. ==== Configuration options diff --git a/libbeat/outputs/kafka/config.go b/libbeat/outputs/kafka/config.go index 956a8bd8cb4..9cf81d34261 100644 --- a/libbeat/outputs/kafka/config.go +++ b/libbeat/outputs/kafka/config.go @@ -27,6 +27,7 @@ import ( "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/common/fmtstr" + "github.com/elastic/beats/libbeat/common/kafka" "github.com/elastic/beats/libbeat/common/transport/tlscommon" "github.com/elastic/beats/libbeat/logp" "github.com/elastic/beats/libbeat/monitoring" @@ -48,7 +49,7 @@ type kafkaConfig struct { BrokerTimeout time.Duration `config:"broker_timeout" validate:"min=1"` Compression string `config:"compression"` CompressionLevel int `config:"compression_level"` - Version string `config:"version"` + Version kafka.Version `config:"version"` BulkMaxSize int `config:"bulk_max_size"` MaxRetries int `config:"max_retries" validate:"min=-1,nonzero"` ClientID string `config:"client_id"` @@ -96,7 +97,7 @@ func defaultConfig() kafkaConfig { BrokerTimeout: 10 * time.Second, Compression: "gzip", CompressionLevel: 4, - Version: "1.0.0", + Version: kafka.Version("1.0.0"), MaxRetries: 3, ClientID: "beats", ChanBufferSize: 256, @@ -114,8 +115,8 @@ func (c *kafkaConfig) Validate() error { return fmt.Errorf("compression mode '%v' unknown", c.Compression) } - if _, ok := kafkaVersions[c.Version]; !ok { - return fmt.Errorf("unknown/unsupported kafka version '%v'", c.Version) + if err := c.Version.Validate(); err != nil { + return err } if c.Username != "" && c.Password == "" { @@ -200,7 +201,7 @@ func newSaramaConfig(config *kafkaConfig) (*sarama.Config, error) { // configure client ID k.ClientID = config.ClientID - version, ok := kafkaVersions[config.Version] + version, ok := config.Version.Get() if !ok { return nil, fmt.Errorf("Unknown/unsupported kafka version: %v", config.Version) } diff --git a/libbeat/outputs/kafka/kafka.go b/libbeat/outputs/kafka/kafka.go index 039d8b8000c..d9f9e86ac18 100644 --- a/libbeat/outputs/kafka/kafka.go +++ b/libbeat/outputs/kafka/kafka.go @@ -33,13 +33,6 @@ import ( "github.com/elastic/beats/libbeat/outputs/outil" ) -type kafka struct { - config kafkaConfig - topic outil.Selector - - partitioner sarama.PartitionerConstructor -} - const ( defaultWaitRetry = 1 * time.Second diff --git a/metricbeat/module/kafka/broker.go b/metricbeat/module/kafka/broker.go index dd5a15b47a8..1efff5cd1f2 100644 --- a/metricbeat/module/kafka/broker.go +++ b/metricbeat/module/kafka/broker.go @@ -29,8 +29,14 @@ import ( "github.com/Shopify/sarama" "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/common/kafka" ) +// Version returns a kafka version from its string representation +func Version(version string) kafka.Version { + return kafka.Version(version) +} + // Broker provides functionality for communicating with a single kafka broker type Broker struct { broker *sarama.Broker @@ -50,7 +56,7 @@ type BrokerSettings struct { Backoff time.Duration TLS *tls.Config Username, Password string - Version Version + Version kafka.Version } type GroupDescription struct { @@ -83,7 +89,7 @@ func NewBroker(host string, settings BrokerSettings) *Broker { cfg.Net.SASL.User = user cfg.Net.SASL.Password = settings.Password } - cfg.Version = settings.Version.get() + cfg.Version, _ = settings.Version.Get() return &Broker{ broker: sarama.NewBroker(host), diff --git a/metricbeat/module/kafka/consumergroup/consumergroup.go b/metricbeat/module/kafka/consumergroup/consumergroup.go index 2aec964f2e9..713e27e5df9 100644 --- a/metricbeat/module/kafka/consumergroup/consumergroup.go +++ b/metricbeat/module/kafka/consumergroup/consumergroup.go @@ -86,7 +86,7 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { Password: config.Password, // consumer groups API requires at least 0.9.0.0 - Version: kafka.Version{String: "0.9.0.0"}, + Version: kafka.Version("0.9.0.0"), } return &MetricSet{ diff --git a/metricbeat/module/kafka/partition/partition.go b/metricbeat/module/kafka/partition/partition.go index a2318fcfc90..e9cbbf9154e 100644 --- a/metricbeat/module/kafka/partition/partition.go +++ b/metricbeat/module/kafka/partition/partition.go @@ -83,6 +83,7 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { TLS: tls, Username: config.Username, Password: config.Password, + Version: kafka.Version("0.8.2.0"), } return &MetricSet{ diff --git a/metricbeat/module/kafka/version.go b/metricbeat/module/kafka/version.go deleted file mode 100644 index 01ba381a962..00000000000 --- a/metricbeat/module/kafka/version.go +++ /dev/null @@ -1,79 +0,0 @@ -// Licensed to Elasticsearch B.V. under one or more contributor -// license agreements. See the NOTICE file distributed with -// this work for additional information regarding copyright -// ownership. Elasticsearch B.V. licenses this file to you under -// the Apache License, Version 2.0 (the "License"); you may -// not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package kafka - -import ( - "fmt" - - "github.com/Shopify/sarama" -) - -type Version struct { - String string -} - -var ( - minVersion = sarama.V0_8_2_0 - - kafkaVersions = map[string]sarama.KafkaVersion{ - "": sarama.V0_8_2_0, - - "0.8.2.0": sarama.V0_8_2_0, - "0.8.2.1": sarama.V0_8_2_1, - "0.8.2.2": sarama.V0_8_2_2, - "0.8.2": sarama.V0_8_2_2, - "0.8": sarama.V0_8_2_2, - - "0.9.0.0": sarama.V0_9_0_0, - "0.9.0.1": sarama.V0_9_0_1, - "0.9.0": sarama.V0_9_0_1, - "0.9": sarama.V0_9_0_1, - - "0.10.0.0": sarama.V0_10_0_0, - "0.10.0.1": sarama.V0_10_0_1, - "0.10.0": sarama.V0_10_0_1, - "0.10.1.0": sarama.V0_10_1_0, - "0.10.1": sarama.V0_10_1_0, - "0.10": sarama.V0_10_1_0, - } -) - -func (v *Version) Validate() error { - if _, ok := kafkaVersions[v.String]; !ok { - return fmt.Errorf("unknown/unsupported kafka vesion '%v'", v.String) - } - return nil -} - -func (v *Version) Unpack(s string) error { - tmp := Version{s} - if err := tmp.Validate(); err != nil { - return err - } - - *v = tmp - return nil -} - -func (v *Version) get() sarama.KafkaVersion { - if v, ok := kafkaVersions[v.String]; ok { - return v - } - - return minVersion -} diff --git a/metricbeat/tests/system/test_kafka.py b/metricbeat/tests/system/test_kafka.py index de38d53d0e7..a7ba2d26e28 100644 --- a/metricbeat/tests/system/test_kafka.py +++ b/metricbeat/tests/system/test_kafka.py @@ -7,6 +7,7 @@ class KafkaTest(metricbeat.BaseTest): COMPOSE_SERVICES = ['kafka'] + VERSION = "2.0.0" @unittest.skipUnless(metricbeat.INTEGRATION_TESTS, "integration test") def test_partition(self): @@ -20,7 +21,8 @@ def test_partition(self): "name": "kafka", "metricsets": ["partition"], "hosts": self.get_hosts(), - "period": "1s" + "period": "1s", + "version": self.VERSION, }]) proc = self.start_beat() self.wait_until(lambda: self.output_lines() > 0, max_timeout=20) @@ -47,7 +49,9 @@ def get_hosts(self): class Kafka_1_1_0_Test(KafkaTest): COMPOSE_SERVICES = ['kafka_1_1_0'] + VERSION = "1.1.0" class Kafka_0_10_2_Test(KafkaTest): COMPOSE_SERVICES = ['kafka_0_10_2'] + VERSION = "0.10.2" diff --git a/testing/environments/docker/kafka/Dockerfile b/testing/environments/docker/kafka/Dockerfile index cda937e2587..581513efcbd 100644 --- a/testing/environments/docker/kafka/Dockerfile +++ b/testing/environments/docker/kafka/Dockerfile @@ -4,7 +4,7 @@ ENV KAFKA_HOME /kafka # The advertised host is kafka. This means it will not work if container is started locally and connected from localhost to it ENV KAFKA_ADVERTISED_HOST kafka ENV KAFKA_LOGS_DIR="/kafka-logs" -ENV KAFKA_VERSION 1.1.1 +ENV KAFKA_VERSION 2.0.0 ENV _JAVA_OPTIONS "-Djava.net.preferIPv4Stack=true" ENV TERM=linux