diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index 1ce143c7d92e..a00b6e0ed3f9 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -105,6 +105,7 @@ https://github.com/elastic/beats/compare/v6.4.0...master[Check the HEAD diff] - Report configured queue type. {pull}8091[8091] - Added the `add_process_metadata` processor to enrich events with process information. {pull}6789[6789] - Report number of open file handles on Windows. {pull}8329[8329] +- Support for Kafka 2.0.0 in kafka output {pull}8399[8399] *Auditbeat* @@ -133,6 +134,7 @@ https://github.com/elastic/beats/compare/v6.4.0...master[Check the HEAD diff] - Allow TCP helper to support delimiters and graphite module to accept multiple metrics in a single payload. {pull}8278[8278] - Added 'died' PID state to process_system metricset on system module{pull}8275[8275] - Added `ccr` metricset to Elasticsearch module. {pull}8335[8335] +- Support for Kafka 2.0.0 {pull}8399[8399] *Packetbeat* diff --git a/libbeat/outputs/kafka/version.go b/libbeat/common/kafka/version.go similarity index 75% rename from libbeat/outputs/kafka/version.go rename to libbeat/common/kafka/version.go index 884e67616a20..6cc8d9cb369a 100644 --- a/libbeat/outputs/kafka/version.go +++ b/libbeat/common/kafka/version.go @@ -17,7 +17,14 @@ package kafka -import "github.com/Shopify/sarama" +import ( + "fmt" + + "github.com/Shopify/sarama" +) + +// Version is a kafka version +type Version string // TODO: remove me. // Compat version overwrite for missing versions in sarama @@ -31,8 +38,6 @@ var ( v1_1_1 = parseKafkaVersion("1.1.1") kafkaVersions = map[string]sarama.KafkaVersion{ - "": sarama.V1_0_0_0, - "0.8.2.0": sarama.V0_8_2_0, "0.8.2.1": sarama.V0_8_2_1, "0.8.2.2": sarama.V0_8_2_2, @@ -68,6 +73,10 @@ var ( "1.1.1": v1_1_1, "1.1": v1_1_1, "1": v1_1_1, + + "2.0.0": sarama.V2_0_0_0, + "2.0": sarama.V2_0_0_0, + "2": sarama.V2_0_0_0, } ) @@ -78,3 +87,29 @@ func parseKafkaVersion(s string) sarama.KafkaVersion { } return v } + +// Validate that a kafka version is among the possible options +func (v *Version) Validate() error { + if _, ok := kafkaVersions[string(*v)]; !ok { + return fmt.Errorf("unknown/unsupported kafka vesion '%v'", *v) + } + + return nil +} + +// Unpack a kafka version +func (v *Version) Unpack(s string) error { + tmp := Version(s) + if err := tmp.Validate(); err != nil { + return err + } + + *v = tmp + return nil +} + +// Get a sarama kafka version +func (v Version) Get() (sarama.KafkaVersion, bool) { + kv, ok := kafkaVersions[string(v)] + return kv, ok +} diff --git a/libbeat/docs/outputconfig.asciidoc b/libbeat/docs/outputconfig.asciidoc index 735a5cecb005..5155775dc4a3 100644 --- a/libbeat/docs/outputconfig.asciidoc +++ b/libbeat/docs/outputconfig.asciidoc @@ -674,7 +674,7 @@ NOTE: Events bigger than <> will be ==== Compatibility -This output works with all Kafka in between 0.11 and 1.1.1. Older versions +This output works with all Kafka versions in between 0.11 and 2.0.0. Older versions might work as well, but are not supported. ==== Configuration options diff --git a/libbeat/outputs/kafka/config.go b/libbeat/outputs/kafka/config.go index 956a8bd8cb4b..9cf81d342611 100644 --- a/libbeat/outputs/kafka/config.go +++ b/libbeat/outputs/kafka/config.go @@ -27,6 +27,7 @@ import ( "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/common/fmtstr" + "github.com/elastic/beats/libbeat/common/kafka" "github.com/elastic/beats/libbeat/common/transport/tlscommon" "github.com/elastic/beats/libbeat/logp" "github.com/elastic/beats/libbeat/monitoring" @@ -48,7 +49,7 @@ type kafkaConfig struct { BrokerTimeout time.Duration `config:"broker_timeout" validate:"min=1"` Compression string `config:"compression"` CompressionLevel int `config:"compression_level"` - Version string `config:"version"` + Version kafka.Version `config:"version"` BulkMaxSize int `config:"bulk_max_size"` MaxRetries int `config:"max_retries" validate:"min=-1,nonzero"` ClientID string `config:"client_id"` @@ -96,7 +97,7 @@ func defaultConfig() kafkaConfig { BrokerTimeout: 10 * time.Second, Compression: "gzip", CompressionLevel: 4, - Version: "1.0.0", + Version: kafka.Version("1.0.0"), MaxRetries: 3, ClientID: "beats", ChanBufferSize: 256, @@ -114,8 +115,8 @@ func (c *kafkaConfig) Validate() error { return fmt.Errorf("compression mode '%v' unknown", c.Compression) } - if _, ok := kafkaVersions[c.Version]; !ok { - return fmt.Errorf("unknown/unsupported kafka version '%v'", c.Version) + if err := c.Version.Validate(); err != nil { + return err } if c.Username != "" && c.Password == "" { @@ -200,7 +201,7 @@ func newSaramaConfig(config *kafkaConfig) (*sarama.Config, error) { // configure client ID k.ClientID = config.ClientID - version, ok := kafkaVersions[config.Version] + version, ok := config.Version.Get() if !ok { return nil, fmt.Errorf("Unknown/unsupported kafka version: %v", config.Version) } diff --git a/libbeat/outputs/kafka/kafka.go b/libbeat/outputs/kafka/kafka.go index 039d8b8000ce..d9f9e86ac18c 100644 --- a/libbeat/outputs/kafka/kafka.go +++ b/libbeat/outputs/kafka/kafka.go @@ -33,13 +33,6 @@ import ( "github.com/elastic/beats/libbeat/outputs/outil" ) -type kafka struct { - config kafkaConfig - topic outil.Selector - - partitioner sarama.PartitionerConstructor -} - const ( defaultWaitRetry = 1 * time.Second diff --git a/metricbeat/module/kafka/broker.go b/metricbeat/module/kafka/broker.go index dd5a15b47a88..1efff5cd1f2c 100644 --- a/metricbeat/module/kafka/broker.go +++ b/metricbeat/module/kafka/broker.go @@ -29,8 +29,14 @@ import ( "github.com/Shopify/sarama" "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/common/kafka" ) +// Version returns a kafka version from its string representation +func Version(version string) kafka.Version { + return kafka.Version(version) +} + // Broker provides functionality for communicating with a single kafka broker type Broker struct { broker *sarama.Broker @@ -50,7 +56,7 @@ type BrokerSettings struct { Backoff time.Duration TLS *tls.Config Username, Password string - Version Version + Version kafka.Version } type GroupDescription struct { @@ -83,7 +89,7 @@ func NewBroker(host string, settings BrokerSettings) *Broker { cfg.Net.SASL.User = user cfg.Net.SASL.Password = settings.Password } - cfg.Version = settings.Version.get() + cfg.Version, _ = settings.Version.Get() return &Broker{ broker: sarama.NewBroker(host), diff --git a/metricbeat/module/kafka/consumergroup/consumergroup.go b/metricbeat/module/kafka/consumergroup/consumergroup.go index 2aec964f2e91..713e27e5df95 100644 --- a/metricbeat/module/kafka/consumergroup/consumergroup.go +++ b/metricbeat/module/kafka/consumergroup/consumergroup.go @@ -86,7 +86,7 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { Password: config.Password, // consumer groups API requires at least 0.9.0.0 - Version: kafka.Version{String: "0.9.0.0"}, + Version: kafka.Version("0.9.0.0"), } return &MetricSet{ diff --git a/metricbeat/module/kafka/partition/partition.go b/metricbeat/module/kafka/partition/partition.go index a2318fcfc90a..e9cbbf9154ec 100644 --- a/metricbeat/module/kafka/partition/partition.go +++ b/metricbeat/module/kafka/partition/partition.go @@ -83,6 +83,7 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { TLS: tls, Username: config.Username, Password: config.Password, + Version: kafka.Version("0.8.2.0"), } return &MetricSet{ diff --git a/metricbeat/module/kafka/version.go b/metricbeat/module/kafka/version.go deleted file mode 100644 index 01ba381a962e..000000000000 --- a/metricbeat/module/kafka/version.go +++ /dev/null @@ -1,79 +0,0 @@ -// Licensed to Elasticsearch B.V. under one or more contributor -// license agreements. See the NOTICE file distributed with -// this work for additional information regarding copyright -// ownership. Elasticsearch B.V. licenses this file to you under -// the Apache License, Version 2.0 (the "License"); you may -// not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package kafka - -import ( - "fmt" - - "github.com/Shopify/sarama" -) - -type Version struct { - String string -} - -var ( - minVersion = sarama.V0_8_2_0 - - kafkaVersions = map[string]sarama.KafkaVersion{ - "": sarama.V0_8_2_0, - - "0.8.2.0": sarama.V0_8_2_0, - "0.8.2.1": sarama.V0_8_2_1, - "0.8.2.2": sarama.V0_8_2_2, - "0.8.2": sarama.V0_8_2_2, - "0.8": sarama.V0_8_2_2, - - "0.9.0.0": sarama.V0_9_0_0, - "0.9.0.1": sarama.V0_9_0_1, - "0.9.0": sarama.V0_9_0_1, - "0.9": sarama.V0_9_0_1, - - "0.10.0.0": sarama.V0_10_0_0, - "0.10.0.1": sarama.V0_10_0_1, - "0.10.0": sarama.V0_10_0_1, - "0.10.1.0": sarama.V0_10_1_0, - "0.10.1": sarama.V0_10_1_0, - "0.10": sarama.V0_10_1_0, - } -) - -func (v *Version) Validate() error { - if _, ok := kafkaVersions[v.String]; !ok { - return fmt.Errorf("unknown/unsupported kafka vesion '%v'", v.String) - } - return nil -} - -func (v *Version) Unpack(s string) error { - tmp := Version{s} - if err := tmp.Validate(); err != nil { - return err - } - - *v = tmp - return nil -} - -func (v *Version) get() sarama.KafkaVersion { - if v, ok := kafkaVersions[v.String]; ok { - return v - } - - return minVersion -} diff --git a/metricbeat/tests/system/test_kafka.py b/metricbeat/tests/system/test_kafka.py index de38d53d0e70..a7ba2d26e288 100644 --- a/metricbeat/tests/system/test_kafka.py +++ b/metricbeat/tests/system/test_kafka.py @@ -7,6 +7,7 @@ class KafkaTest(metricbeat.BaseTest): COMPOSE_SERVICES = ['kafka'] + VERSION = "2.0.0" @unittest.skipUnless(metricbeat.INTEGRATION_TESTS, "integration test") def test_partition(self): @@ -20,7 +21,8 @@ def test_partition(self): "name": "kafka", "metricsets": ["partition"], "hosts": self.get_hosts(), - "period": "1s" + "period": "1s", + "version": self.VERSION, }]) proc = self.start_beat() self.wait_until(lambda: self.output_lines() > 0, max_timeout=20) @@ -47,7 +49,9 @@ def get_hosts(self): class Kafka_1_1_0_Test(KafkaTest): COMPOSE_SERVICES = ['kafka_1_1_0'] + VERSION = "1.1.0" class Kafka_0_10_2_Test(KafkaTest): COMPOSE_SERVICES = ['kafka_0_10_2'] + VERSION = "0.10.2" diff --git a/testing/environments/docker/kafka/Dockerfile b/testing/environments/docker/kafka/Dockerfile index cda937e25873..581513efcbda 100644 --- a/testing/environments/docker/kafka/Dockerfile +++ b/testing/environments/docker/kafka/Dockerfile @@ -4,7 +4,7 @@ ENV KAFKA_HOME /kafka # The advertised host is kafka. This means it will not work if container is started locally and connected from localhost to it ENV KAFKA_ADVERTISED_HOST kafka ENV KAFKA_LOGS_DIR="/kafka-logs" -ENV KAFKA_VERSION 1.1.1 +ENV KAFKA_VERSION 2.0.0 ENV _JAVA_OPTIONS "-Djava.net.preferIPv4Stack=true" ENV TERM=linux