Skip to content

Commit

Permalink
Cherry-pick #7992, #8330 and #8399 to 6.x: Support for kafka 2.0.0 (#…
Browse files Browse the repository at this point in the history
…8467)

Test metricbeat kafka module with kafka 2.0.0
Upgrade sarama client to 1.18.0.
Add support to kafka 2.0.0 in kafka output and metricbeat module.
Merge kafka versioning helpers of output and metricbeat module.
Set version in kafka module configuration of metricbeat system tests

(cherry picked from commits 7635731, 9749f3d and 1bfd445)
  • Loading branch information
jsoriano authored Oct 1, 2018
1 parent 555cd90 commit 48ecd0f
Show file tree
Hide file tree
Showing 35 changed files with 870 additions and 517 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ https://github.com/elastic/beats/compare/v6.4.0...6.x[Check the HEAD diff]
- Make kubernetes autodiscover ignore events with empty container IDs {pull}7971[7971]
- Implement CheckConfig in RunnerFactory to make autodiscover check configs {pull}7961[7961]
- Add DNS processor with support for performing reverse lookups on IP addresses. {issue}7770[7770]
- Support for Kafka 2.0.0 in kafka output {pull}8399[8399]

*Auditbeat*

Expand All @@ -113,6 +114,7 @@ https://github.com/elastic/beats/compare/v6.4.0...6.x[Check the HEAD diff]
- Added 'died' PID state to process_system metricset on system module {pull}8275[8275]
- Add `metrics` metricset to MongoDB module. {pull}7611[7611]
- Added `ccr` metricset to Elasticsearch module. {pull}8335[8335]
- Support for Kafka 2.0.0 {pull}8399[8399]
- Added support for query params in configuration {issue}8286[8286] {pull}8292[8292]

*Packetbeat*
Expand Down
4 changes: 2 additions & 2 deletions NOTICE.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2175,8 +2175,8 @@ THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
--------------------------------------------------------------------
Dependency: github.com/Shopify/sarama
Version: v1.17.0/enh/offset-replica-id
Revision: d1575e4abe04acbbe8ac766320585cdf271dd189
Version: =v1.18.0/enh/offset-replica-id
Revision: 0143592836b090a1b481def4d902cfb3c5c05ae5
License type (autodetected): MIT
./vendor/github.com/Shopify/sarama/LICENSE:
--------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,14 @@

package kafka

import "github.com/Shopify/sarama"
import (
"fmt"

"github.com/Shopify/sarama"
)

// Version is a kafka version
type Version string

// TODO: remove me.
// Compat version overwrite for missing versions in sarama
Expand All @@ -31,8 +38,6 @@ var (
v1_1_1 = parseKafkaVersion("1.1.1")

kafkaVersions = map[string]sarama.KafkaVersion{
"": sarama.V1_0_0_0,

"0.8.2.0": sarama.V0_8_2_0,
"0.8.2.1": sarama.V0_8_2_1,
"0.8.2.2": sarama.V0_8_2_2,
Expand Down Expand Up @@ -68,6 +73,10 @@ var (
"1.1.1": v1_1_1,
"1.1": v1_1_1,
"1": v1_1_1,

"2.0.0": sarama.V2_0_0_0,
"2.0": sarama.V2_0_0_0,
"2": sarama.V2_0_0_0,
}
)

Expand All @@ -78,3 +87,29 @@ func parseKafkaVersion(s string) sarama.KafkaVersion {
}
return v
}

// Validate that a kafka version is among the possible options
func (v *Version) Validate() error {
if _, ok := kafkaVersions[string(*v)]; !ok {
return fmt.Errorf("unknown/unsupported kafka vesion '%v'", *v)
}

return nil
}

// Unpack a kafka version
func (v *Version) Unpack(s string) error {
tmp := Version(s)
if err := tmp.Validate(); err != nil {
return err
}

*v = tmp
return nil
}

// Get a sarama kafka version
func (v Version) Get() (sarama.KafkaVersion, bool) {
kv, ok := kafkaVersions[string(v)]
return kv, ok
}
2 changes: 1 addition & 1 deletion libbeat/docs/outputconfig.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -674,7 +674,7 @@ NOTE: Events bigger than <<kafka-max_message_bytes,`max_message_bytes`>> will be

==== Compatibility

This output works with all Kafka in between 0.11 and 1.1.1. Older versions
This output works with all Kafka versions in between 0.11 and 2.0.0. Older versions
might work as well, but are not supported.

==== Configuration options
Expand Down
11 changes: 6 additions & 5 deletions libbeat/outputs/kafka/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (

"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/common/fmtstr"
"github.com/elastic/beats/libbeat/common/kafka"
"github.com/elastic/beats/libbeat/common/transport/tlscommon"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/libbeat/monitoring"
Expand All @@ -48,7 +49,7 @@ type kafkaConfig struct {
BrokerTimeout time.Duration `config:"broker_timeout" validate:"min=1"`
Compression string `config:"compression"`
CompressionLevel int `config:"compression_level"`
Version string `config:"version"`
Version kafka.Version `config:"version"`
BulkMaxSize int `config:"bulk_max_size"`
MaxRetries int `config:"max_retries" validate:"min=-1,nonzero"`
ClientID string `config:"client_id"`
Expand Down Expand Up @@ -96,7 +97,7 @@ func defaultConfig() kafkaConfig {
BrokerTimeout: 10 * time.Second,
Compression: "gzip",
CompressionLevel: 4,
Version: "1.0.0",
Version: kafka.Version("1.0.0"),
MaxRetries: 3,
ClientID: "beats",
ChanBufferSize: 256,
Expand All @@ -114,8 +115,8 @@ func (c *kafkaConfig) Validate() error {
return fmt.Errorf("compression mode '%v' unknown", c.Compression)
}

if _, ok := kafkaVersions[c.Version]; !ok {
return fmt.Errorf("unknown/unsupported kafka version '%v'", c.Version)
if err := c.Version.Validate(); err != nil {
return err
}

if c.Username != "" && c.Password == "" {
Expand Down Expand Up @@ -200,7 +201,7 @@ func newSaramaConfig(config *kafkaConfig) (*sarama.Config, error) {
// configure client ID
k.ClientID = config.ClientID

version, ok := kafkaVersions[config.Version]
version, ok := config.Version.Get()
if !ok {
return nil, fmt.Errorf("Unknown/unsupported kafka version: %v", config.Version)
}
Expand Down
7 changes: 0 additions & 7 deletions libbeat/outputs/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,6 @@ import (
"github.com/elastic/beats/libbeat/outputs/outil"
)

type kafka struct {
config kafkaConfig
topic outil.Selector

partitioner sarama.PartitionerConstructor
}

const (
defaultWaitRetry = 1 * time.Second

Expand Down
12 changes: 10 additions & 2 deletions metricbeat/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -98,12 +98,20 @@ services:
kafka:
build:
context: ./module/kafka/_meta
dockerfile: Dockerfile.1.1.0
args:
KAFKA_VERSION: 2.0.0

kafka_1_1_0:
build:
context: ./module/kafka/_meta
args:
KAFKA_VERSION: 1.1.0

kafka_0_10_2:
build:
context: ./module/kafka/_meta
dockerfile: Dockerfile.0.10.2
args:
KAFKA_VERSION: 0.10.2.1

kibana:
build: ./module/kibana/_meta
Expand Down
2 changes: 1 addition & 1 deletion metricbeat/docs/modules/kafka.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ The default metricsets are `consumergroup` and `partition`.
[float]
=== Compability

This module is tested with Kafka 0.10.2 and 1.1.0.
This module is tested with Kafka 0.10.2.1, 1.1.0 and 2.0.0.


[float]
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
FROM debian:stretch

ARG KAFKA_VERSION=2.0.0

ENV KAFKA_HOME /kafka
# The advertised host is kafka. This means it will not work if container is started locally and connected from localhost to it
ENV KAFKA_LOGS_DIR="/kafka-logs"
ENV KAFKA_VERSION 1.1.0
ENV _JAVA_OPTIONS "-Djava.net.preferIPv4Stack=true"
ENV TERM=linux

Expand Down
25 changes: 0 additions & 25 deletions metricbeat/module/kafka/_meta/Dockerfile.0.10.2

This file was deleted.

2 changes: 1 addition & 1 deletion metricbeat/module/kafka/_meta/docs.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,4 @@ The default metricsets are `consumergroup` and `partition`.
[float]
=== Compability

This module is tested with Kafka 0.10.2 and 1.1.0.
This module is tested with Kafka 0.10.2.1, 1.1.0 and 2.0.0.
10 changes: 8 additions & 2 deletions metricbeat/module/kafka/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,14 @@ import (
"github.com/Shopify/sarama"

"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/common/kafka"
)

// Version returns a kafka version from its string representation
func Version(version string) kafka.Version {
return kafka.Version(version)
}

// Broker provides functionality for communicating with a single kafka broker
type Broker struct {
broker *sarama.Broker
Expand All @@ -50,7 +56,7 @@ type BrokerSettings struct {
Backoff time.Duration
TLS *tls.Config
Username, Password string
Version Version
Version kafka.Version
}

type GroupDescription struct {
Expand Down Expand Up @@ -83,7 +89,7 @@ func NewBroker(host string, settings BrokerSettings) *Broker {
cfg.Net.SASL.User = user
cfg.Net.SASL.Password = settings.Password
}
cfg.Version = settings.Version.get()
cfg.Version, _ = settings.Version.Get()

return &Broker{
broker: sarama.NewBroker(host),
Expand Down
2 changes: 1 addition & 1 deletion metricbeat/module/kafka/consumergroup/consumergroup.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
Password: config.Password,

// consumer groups API requires at least 0.9.0.0
Version: kafka.Version{String: "0.9.0.0"},
Version: kafka.Version("0.9.0.0"),
}

return &MetricSet{
Expand Down
1 change: 1 addition & 0 deletions metricbeat/module/kafka/partition/partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
TLS: tls,
Username: config.Username,
Password: config.Password,
Version: kafka.Version("0.8.2.0"),
}

return &MetricSet{
Expand Down
79 changes: 0 additions & 79 deletions metricbeat/module/kafka/version.go

This file was deleted.

10 changes: 9 additions & 1 deletion metricbeat/tests/system/test_kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

class KafkaTest(metricbeat.BaseTest):
COMPOSE_SERVICES = ['kafka']
VERSION = "2.0.0"

@unittest.skipUnless(metricbeat.INTEGRATION_TESTS, "integration test")
def test_partition(self):
Expand All @@ -20,7 +21,8 @@ def test_partition(self):
"name": "kafka",
"metricsets": ["partition"],
"hosts": self.get_hosts(),
"period": "1s"
"period": "1s",
"version": self.VERSION,
}])
proc = self.start_beat()
self.wait_until(lambda: self.output_lines() > 0, max_timeout=20)
Expand All @@ -45,5 +47,11 @@ def get_hosts(self):
os.getenv('KAFKA_PORT', '9092')]


class Kafka_1_1_0_Test(KafkaTest):
COMPOSE_SERVICES = ['kafka_1_1_0']
VERSION = "1.1.0"


class Kafka_0_10_2_Test(KafkaTest):
COMPOSE_SERVICES = ['kafka_0_10_2']
VERSION = "0.10.2"
Loading

0 comments on commit 48ecd0f

Please sign in to comment.