From c0dbd6a2301b5c1725ff7200f54d5d4f35672419 Mon Sep 17 00:00:00 2001 From: Fae Charlton Date: Wed, 26 Jun 2019 15:20:06 -0400 Subject: [PATCH 01/50] Add kafka input to filebeat import list --- filebeat/include/list.go | 1 + 1 file changed, 1 insertion(+) diff --git a/filebeat/include/list.go b/filebeat/include/list.go index 6876c7a1a62..44996dead8f 100644 --- a/filebeat/include/list.go +++ b/filebeat/include/list.go @@ -23,6 +23,7 @@ import ( // Import packages that need to register themselves. _ "github.com/elastic/beats/filebeat/input/container" _ "github.com/elastic/beats/filebeat/input/docker" + _ "github.com/elastic/beats/filebeat/input/kafka" _ "github.com/elastic/beats/filebeat/input/log" _ "github.com/elastic/beats/filebeat/input/redis" _ "github.com/elastic/beats/filebeat/input/stdin" From 878a9e7e2d98db54eaafb5a0e7ea1a8b13f4446c Mon Sep 17 00:00:00 2001 From: Fae Charlton Date: Mon, 8 Jul 2019 15:54:27 -0400 Subject: [PATCH 02/50] Initial skeleton of filebeat kafka input --- filebeat/input/kafka/config.go | 73 ++++ filebeat/input/kafka/input.go | 183 +++++++++ .../input/kafka/kafka_integration_test.go | 348 ++++++++++++++++++ 3 files changed, 604 insertions(+) create mode 100644 filebeat/input/kafka/config.go create mode 100644 filebeat/input/kafka/input.go create mode 100644 filebeat/input/kafka/kafka_integration_test.go diff --git a/filebeat/input/kafka/config.go b/filebeat/input/kafka/config.go new file mode 100644 index 00000000000..467a0b23ac3 --- /dev/null +++ b/filebeat/input/kafka/config.go @@ -0,0 +1,73 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package kafka + +import ( + "fmt" + + "github.com/Shopify/sarama" + "github.com/elastic/beats/libbeat/common/kafka" + "github.com/elastic/beats/libbeat/logp" +) + +var defaultConfig = kafkaInputConfig{ + Version: kafka.Version("1.0.0"), + GroupID: "FilebeatGroup", +} + +type kafkaInputConfig struct { + // Kafka hosts with port, e.g. "localhost:9092" + Hosts []string `config:"hosts" validate:"required"` + Topics []string `config:"topics" validate:"required"` + Version kafka.Version `config:"version"` + GroupID string `config:"group_id"` +} + +// Validate validates the config. +func (c *kafkaInputConfig) Validate() error { + + return nil +} + +func stringInSlice(str string, list []string) bool { + for _, v := range list { + if v == str { + return true + } + } + return false +} + +func newSaramaConfig(config kafkaInputConfig) (*sarama.Config, error) { + k := sarama.NewConfig() + + version, ok := config.Version.Get() + if !ok { + return nil, fmt.Errorf("Unknown/unsupported kafka version: %v", config.Version) + } + k.Version = version + + k.Consumer.Return.Errors = true + k.Consumer.Offsets.Initial = sarama.OffsetOldest + + if err := k.Validate(); err != nil { + logp.Err("Invalid kafka configuration: %v", err) + return nil, err + } + return k, nil +} diff --git a/filebeat/input/kafka/input.go b/filebeat/input/kafka/input.go new file mode 100644 index 00000000000..7fa2677633b --- /dev/null +++ b/filebeat/input/kafka/input.go @@ -0,0 +1,183 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package kafka + +import ( + "context" + "fmt" + "time" + + "github.com/Shopify/sarama" + "github.com/elastic/beats/filebeat/channel" + "github.com/elastic/beats/filebeat/input" + "github.com/elastic/beats/filebeat/util" + "github.com/elastic/beats/libbeat/beat" + "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/logp" + + "github.com/pkg/errors" +) + +func init() { + err := input.Register("kafka", NewInput) + if err != nil { + panic(err) + } +} + +// Input contains the input and its config +type Input struct { + config kafkaInputConfig + rawConfig *common.Config // The Config given to NewInput + started bool + outlet channel.Outleter + consumerGroup sarama.ConsumerGroup + goContext context.Context +} + +// NewInput creates a new kafka input +func NewInput( + cfg *common.Config, + outletFactory channel.Connector, + inputContext input.Context, +) (input.Input, error) { + + out, err := outletFactory(cfg, inputContext.DynamicFields) + if err != nil { + return nil, err + } + + //forwarder := harvester.NewForwarder(out) + + config := defaultConfig + if err := cfg.Unpack(&config); err != nil { + return nil, errors.Wrap(err, "reading kafka input config") + } + + saramaConfig, err := newSaramaConfig(config) + if err != nil { + return nil, errors.Wrap(err, "initializing Sarama config") + } + consumerGroup, err := + sarama.NewConsumerGroup(config.Hosts, config.GroupID, saramaConfig) + if err != nil { + return nil, errors.Wrap(err, "initializing kafka consumer group") + } + + // Sarama uses standard go contexts to control cancellation, so we need to + // wrap our input context channel in that interface. + goContext, cancel := context.WithCancel(context.Background()) + go func() { + select { + case <-inputContext.Done: + logp.Info("Closing kafka context because input stopped.") + cancel() + return + } + }() + + input := &Input{ + config: config, + rawConfig: cfg, + started: false, + outlet: out, + consumerGroup: consumerGroup, + goContext: goContext, + } + + return input, nil +} + +func (p *Input) newConsumerGroup() (sarama.ConsumerGroup, error) { + consumerGroup, err := + sarama.NewConsumerGroup(p.config.Hosts, p.config.GroupID, nil) + return consumerGroup, err +} + +// Run starts the input by scanning for incoming messages and errors. +func (p *Input) Run() { + if !p.started { + // Track errors + go func() { + for err := range p.consumerGroup.Errors() { + // TODO: handle + fmt.Println("ERROR", err) + } + }() + + go func() { + for { + handler := groupHandler{input: p} + + err := p.consumerGroup.Consume(p.goContext, p.config.Topics, handler) + if err != nil { + fmt.Printf("Consume error: %v\n", err) + //panic(err) + // TODO: report error + } + } + }() + p.started = true + } +} + +func (p *Input) Wait() { +} + +func (p *Input) Stop() { +} + +type groupHandler struct { + input *Input +} + +func createEvent( + sess sarama.ConsumerGroupSession, + claim sarama.ConsumerGroupClaim, + message *sarama.ConsumerMessage, +) *util.Data { + data := util.NewData() + data.Event = beat.Event{ + Timestamp: time.Now(), + Fields: common.MapStr{ + "message": string(message.Value), + "kafka": common.MapStr{ + "topic": claim.Topic(), + "partition": claim.Partition(), + }, + // TODO: add more metadata + }, + } + return data +} + +func (groupHandler) Setup(session sarama.ConsumerGroupSession) error { + return nil +} +func (groupHandler) Cleanup(_ sarama.ConsumerGroupSession) error { + return nil +} +func (h groupHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error { + for msg := range claim.Messages() { + event := createEvent(sess, claim, msg) + fmt.Printf("event: %v\n", event) + h.input.outlet.OnEvent(event) + sess.MarkMessage(msg, "") + } + return nil +} diff --git a/filebeat/input/kafka/kafka_integration_test.go b/filebeat/input/kafka/kafka_integration_test.go new file mode 100644 index 00000000000..c73e8778ae2 --- /dev/null +++ b/filebeat/input/kafka/kafka_integration_test.go @@ -0,0 +1,348 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package kafka + +import ( + "encoding/json" + "fmt" + "math/rand" + "os" + "strconv" + "sync" + "testing" + "time" + + "github.com/Shopify/sarama" + "github.com/stretchr/testify/assert" + + "github.com/elastic/beats/filebeat/channel" + "github.com/elastic/beats/filebeat/input" + "github.com/elastic/beats/filebeat/util" + "github.com/elastic/beats/libbeat/beat" + "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/common/fmtstr" + _ "github.com/elastic/beats/libbeat/outputs/codec/format" + _ "github.com/elastic/beats/libbeat/outputs/codec/json" +) + +const ( + kafkaDefaultHost = "kafka" + kafkaDefaultPort = "9092" +) + +type eventInfo struct { + events []beat.Event +} + +type eventCapturer struct { + closed bool + c chan struct{} + closeOnce sync.Once + events chan *util.Data +} + +func NewEventCapturer(events chan *util.Data) channel.Outleter { + return &eventCapturer{ + c: make(chan struct{}), + events: events, + } +} + +func (o *eventCapturer) OnEvent(event *util.Data) bool { + o.events <- event + return true +} + +func (o *eventCapturer) Close() error { + o.closeOnce.Do(func() { + o.closed = true + close(o.c) + }) + return nil +} + +func (o *eventCapturer) Done() <-chan struct{} { + return o.c +} + +func TestInput(t *testing.T) { + id := strconv.Itoa(rand.New(rand.NewSource(int64(time.Now().Nanosecond()))).Int()) + testTopic := fmt.Sprintf("Filebeat-TestInput-%s", id) + context := input.Context{ + Done: make(chan struct{}), + BeatDone: make(chan struct{}), + } + + // Send test messages to the topic for the input to read. + messageStrs := []string{"testing", "stuff", "blah"} + for _, s := range messageStrs { + writeToKafkaTopic(t, testTopic, s, time.Second*20) + } + + // Setup the input config + config, _ := common.NewConfigFrom(common.MapStr{ + "hosts": "kafka:9092", + "topics": []string{testTopic}, + }) + + // Route input events through our capturer instead of sending through ES. + events := make(chan *util.Data, 100) + defer close(events) + capturer := NewEventCapturer(events) + defer capturer.Close() + connector := func(*common.Config, *common.MapStrPointer) (channel.Outleter, error) { + return channel.SubOutlet(capturer), nil + } + + input, err := NewInput(config, connector, context) + if err != nil { + t.Fatal(err) + } + + // Run the input and wait for finalization + input.Run() + + timeout := time.After(30 * time.Second) + done := make(chan struct{}) + for _, m := range messageStrs { + select { + case event := <-events: + result, err := event.GetEvent().Fields.GetValue("message") + if err != nil { + t.Fatal(err) + } + assert.Equal(t, result, m) + if state := event.GetState(); state.Finished { + //assert.Equal(t, len(logs), int(state.Offset), "file has not been fully read") + go func() { + //closer(context, input.(*Input)) + close(done) + }() + } + case <-done: + return + case <-timeout: + t.Fatal("timeout waiting for closed state") + } + } +} + +func validateJSON(t *testing.T, value []byte, event beat.Event) { + var decoded map[string]interface{} + err := json.Unmarshal(value, &decoded) + if err != nil { + t.Errorf("can not json decode event value: %v", value) + return + } + assert.Equal(t, decoded["type"], event.Fields["type"]) + assert.Equal(t, decoded["message"], event.Fields["message"]) +} + +func makeValidateFmtStr(fmt string) func(*testing.T, []byte, beat.Event) { + fmtString := fmtstr.MustCompileEvent(fmt) + return func(t *testing.T, value []byte, event beat.Event) { + expectedMessage, err := fmtString.Run(&event) + if err != nil { + t.Fatal(err) + } + assert.Equal(t, string(expectedMessage), string(value)) + } +} + +func strDefault(a, defaults string) string { + if len(a) == 0 { + return defaults + } + return a +} + +func getenv(name, defaultValue string) string { + return strDefault(os.Getenv(name), defaultValue) +} + +func getTestKafkaHost() string { + return fmt.Sprintf("%v:%v", + getenv("KAFKA_HOST", kafkaDefaultHost), + getenv("KAFKA_PORT", kafkaDefaultPort), + ) +} + +func writeToKafkaTopic( + t *testing.T, topic string, message string, timeout time.Duration, +) { + config := sarama.NewConfig() + config.Producer.RequiredAcks = sarama.WaitForAll + config.Producer.Return.Successes = true + config.Producer.Partitioner = sarama.NewHashPartitioner + + hosts := []string{getTestKafkaHost()} + producer, err := sarama.NewSyncProducer(hosts, config) + if err != nil { + t.Fatal(err) + } + defer func() { + if err := producer.Close(); err != nil { + t.Fatal(err) + } + }() + + msg := &sarama.ProducerMessage{ + Topic: topic, + Value: sarama.StringEncoder(message), + } + + _, _, err = producer.SendMessage(msg) + if err != nil { + t.Fatal(err) + } +} + +func makeConfig(t *testing.T, in map[string]interface{}) *common.Config { + cfg, err := common.NewConfigFrom(in) + if err != nil { + t.Fatal(err) + } + return cfg +} + +func newTestConsumer(t *testing.T) sarama.Consumer { + hosts := []string{getTestKafkaHost()} + consumer, err := sarama.NewConsumer(hosts, nil) + if err != nil { + t.Fatal(err) + } + return consumer +} + +var testTopicOffsets = map[string]int64{} + +func testReadFromKafkaTopic( + t *testing.T, topic string, nMessages int, + timeout time.Duration, +) []*sarama.ConsumerMessage { + consumer := newTestConsumer(t) + defer func() { + consumer.Close() + }() + + offset, found := testTopicOffsets[topic] + if !found { + offset = sarama.OffsetOldest + } + + partitionConsumer, err := consumer.ConsumePartition(topic, 0, offset) + if err != nil { + t.Fatal(err) + } + defer func() { + partitionConsumer.Close() + }() + + timer := time.After(timeout) + var messages []*sarama.ConsumerMessage + for i := 0; i < nMessages; i++ { + select { + case msg := <-partitionConsumer.Messages(): + messages = append(messages, msg) + testTopicOffsets[topic] = msg.Offset + 1 + case <-timer: + break + } + } + + return messages +} + +func flatten(infos []eventInfo) []beat.Event { + var out []beat.Event + for _, info := range infos { + out = append(out, info.events...) + } + return out +} + +func single(fields common.MapStr) []eventInfo { + return []eventInfo{ + { + events: []beat.Event{ + {Timestamp: time.Now(), Fields: fields}, + }, + }, + } +} + +func randMulti(batches, n int, event common.MapStr) []eventInfo { + var out []eventInfo + for i := 0; i < batches; i++ { + var data []beat.Event + for j := 0; j < n; j++ { + tmp := common.MapStr{} + for k, v := range event { + tmp[k] = v + } + //tmp["message"] = randString(100) + data = append(data, beat.Event{Timestamp: time.Now(), Fields: tmp}) + } + + out = append(out, eventInfo{data}) + } + return out +} + +func setupInput(t *testing.T, context input.Context, closer func(input.Context, *Input)) { + // Setup the input + config, _ := common.NewConfigFrom(common.MapStr{ + "host": "localhost:9092", + }) + + events := make(chan *util.Data, 100) + defer close(events) + capturer := NewEventCapturer(events) + defer capturer.Close() + connector := func(*common.Config, *common.MapStrPointer) (channel.Outleter, error) { + return channel.SubOutlet(capturer), nil + } + + input, err := NewInput(config, connector, context) + if err != nil { + t.Error(err) + return + } + + // Run the input and wait for finalization + input.Run() + + timeout := time.After(30 * time.Second) + done := make(chan struct{}) + for { + select { + case event := <-events: + if state := event.GetState(); state.Finished { + //assert.Equal(t, len(logs), int(state.Offset), "file has not been fully read") + go func() { + closer(context, input.(*Input)) + close(done) + }() + } + case <-done: + return + case <-timeout: + t.Fatal("timeout waiting for closed state") + } + } +} From 5f6c0d95fb4b8b6ff52e000db740193130fe73ab Mon Sep 17 00:00:00 2001 From: Fae Charlton Date: Wed, 10 Jul 2019 11:08:35 -0400 Subject: [PATCH 03/50] Cleanup --- filebeat/input/kafka/input.go | 2 + .../input/kafka/kafka_integration_test.go | 147 +----------------- 2 files changed, 3 insertions(+), 146 deletions(-) diff --git a/filebeat/input/kafka/input.go b/filebeat/input/kafka/input.go index 7fa2677633b..9230e7d8f91 100644 --- a/filebeat/input/kafka/input.go +++ b/filebeat/input/kafka/input.go @@ -159,6 +159,8 @@ func createEvent( "kafka": common.MapStr{ "topic": claim.Topic(), "partition": claim.Partition(), + "offset": message.Offset, + //message.Timestamp }, // TODO: add more metadata }, diff --git a/filebeat/input/kafka/kafka_integration_test.go b/filebeat/input/kafka/kafka_integration_test.go index c73e8778ae2..1e4299ae68e 100644 --- a/filebeat/input/kafka/kafka_integration_test.go +++ b/filebeat/input/kafka/kafka_integration_test.go @@ -118,7 +118,6 @@ func TestInput(t *testing.T) { input.Run() timeout := time.After(30 * time.Second) - done := make(chan struct{}) for _, m := range messageStrs { select { case event := <-events: @@ -127,17 +126,8 @@ func TestInput(t *testing.T) { t.Fatal(err) } assert.Equal(t, result, m) - if state := event.GetState(); state.Finished { - //assert.Equal(t, len(logs), int(state.Offset), "file has not been fully read") - go func() { - //closer(context, input.(*Input)) - close(done) - }() - } - case <-done: - return case <-timeout: - t.Fatal("timeout waiting for closed state") + t.Fatal("timeout waiting for incoming events") } } } @@ -211,138 +201,3 @@ func writeToKafkaTopic( t.Fatal(err) } } - -func makeConfig(t *testing.T, in map[string]interface{}) *common.Config { - cfg, err := common.NewConfigFrom(in) - if err != nil { - t.Fatal(err) - } - return cfg -} - -func newTestConsumer(t *testing.T) sarama.Consumer { - hosts := []string{getTestKafkaHost()} - consumer, err := sarama.NewConsumer(hosts, nil) - if err != nil { - t.Fatal(err) - } - return consumer -} - -var testTopicOffsets = map[string]int64{} - -func testReadFromKafkaTopic( - t *testing.T, topic string, nMessages int, - timeout time.Duration, -) []*sarama.ConsumerMessage { - consumer := newTestConsumer(t) - defer func() { - consumer.Close() - }() - - offset, found := testTopicOffsets[topic] - if !found { - offset = sarama.OffsetOldest - } - - partitionConsumer, err := consumer.ConsumePartition(topic, 0, offset) - if err != nil { - t.Fatal(err) - } - defer func() { - partitionConsumer.Close() - }() - - timer := time.After(timeout) - var messages []*sarama.ConsumerMessage - for i := 0; i < nMessages; i++ { - select { - case msg := <-partitionConsumer.Messages(): - messages = append(messages, msg) - testTopicOffsets[topic] = msg.Offset + 1 - case <-timer: - break - } - } - - return messages -} - -func flatten(infos []eventInfo) []beat.Event { - var out []beat.Event - for _, info := range infos { - out = append(out, info.events...) - } - return out -} - -func single(fields common.MapStr) []eventInfo { - return []eventInfo{ - { - events: []beat.Event{ - {Timestamp: time.Now(), Fields: fields}, - }, - }, - } -} - -func randMulti(batches, n int, event common.MapStr) []eventInfo { - var out []eventInfo - for i := 0; i < batches; i++ { - var data []beat.Event - for j := 0; j < n; j++ { - tmp := common.MapStr{} - for k, v := range event { - tmp[k] = v - } - //tmp["message"] = randString(100) - data = append(data, beat.Event{Timestamp: time.Now(), Fields: tmp}) - } - - out = append(out, eventInfo{data}) - } - return out -} - -func setupInput(t *testing.T, context input.Context, closer func(input.Context, *Input)) { - // Setup the input - config, _ := common.NewConfigFrom(common.MapStr{ - "host": "localhost:9092", - }) - - events := make(chan *util.Data, 100) - defer close(events) - capturer := NewEventCapturer(events) - defer capturer.Close() - connector := func(*common.Config, *common.MapStrPointer) (channel.Outleter, error) { - return channel.SubOutlet(capturer), nil - } - - input, err := NewInput(config, connector, context) - if err != nil { - t.Error(err) - return - } - - // Run the input and wait for finalization - input.Run() - - timeout := time.After(30 * time.Second) - done := make(chan struct{}) - for { - select { - case event := <-events: - if state := event.GetState(); state.Finished { - //assert.Equal(t, len(logs), int(state.Offset), "file has not been fully read") - go func() { - closer(context, input.(*Input)) - close(done) - }() - } - case <-done: - return - case <-timeout: - t.Fatal("timeout waiting for closed state") - } - } -} From 1c83d553fa02c5c8ead2ce4625c8b49ec5dcb6ff Mon Sep 17 00:00:00 2001 From: Fae Charlton Date: Wed, 10 Jul 2019 11:54:54 -0400 Subject: [PATCH 04/50] Turn on Wait() and Stop() --- filebeat/input/kafka/input.go | 18 +++++++++++++----- 1 file changed, 13 insertions(+), 5 deletions(-) diff --git a/filebeat/input/kafka/input.go b/filebeat/input/kafka/input.go index 9230e7d8f91..b988e2268da 100644 --- a/filebeat/input/kafka/input.go +++ b/filebeat/input/kafka/input.go @@ -47,7 +47,8 @@ type Input struct { started bool outlet channel.Outleter consumerGroup sarama.ConsumerGroup - goContext context.Context + kafkaContext context.Context + kafkaCancel context.CancelFunc // The CancelFunc for kafkaContext } // NewInput creates a new kafka input @@ -81,12 +82,12 @@ func NewInput( // Sarama uses standard go contexts to control cancellation, so we need to // wrap our input context channel in that interface. - goContext, cancel := context.WithCancel(context.Background()) + kafkaContext, kafkaCancel := context.WithCancel(context.Background()) go func() { select { case <-inputContext.Done: logp.Info("Closing kafka context because input stopped.") - cancel() + kafkaCancel() return } }() @@ -97,7 +98,8 @@ func NewInput( started: false, outlet: out, consumerGroup: consumerGroup, - goContext: goContext, + kafkaContext: kafkaContext, + kafkaCancel: kafkaCancel, } return input, nil @@ -124,7 +126,7 @@ func (p *Input) Run() { for { handler := groupHandler{input: p} - err := p.consumerGroup.Consume(p.goContext, p.config.Topics, handler) + err := p.consumerGroup.Consume(p.kafkaContext, p.config.Topics, handler) if err != nil { fmt.Printf("Consume error: %v\n", err) //panic(err) @@ -136,10 +138,14 @@ func (p *Input) Run() { } } +// Wait shuts down the Input by cancelling the internal context. func (p *Input) Wait() { + p.Stop() } +// Stop shuts down the Input by cancelling the internal context. func (p *Input) Stop() { + p.kafkaCancel() } type groupHandler struct { @@ -171,9 +177,11 @@ func createEvent( func (groupHandler) Setup(session sarama.ConsumerGroupSession) error { return nil } + func (groupHandler) Cleanup(_ sarama.ConsumerGroupSession) error { return nil } + func (h groupHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error { for msg := range claim.Messages() { event := createEvent(sess, claim, msg) From 7298e9cc2b74d42e68772e4d984ecde1358f4583 Mon Sep 17 00:00:00 2001 From: Fae Charlton Date: Wed, 10 Jul 2019 12:47:04 -0400 Subject: [PATCH 05/50] add InitialOffset configuration parameter --- filebeat/input/kafka/config.go | 53 +++++++++++++++---- .../input/kafka/kafka_integration_test.go | 5 +- 2 files changed, 46 insertions(+), 12 deletions(-) diff --git a/filebeat/input/kafka/config.go b/filebeat/input/kafka/config.go index 467a0b23ac3..0ef176a9201 100644 --- a/filebeat/input/kafka/config.go +++ b/filebeat/input/kafka/config.go @@ -25,22 +25,43 @@ import ( "github.com/elastic/beats/libbeat/logp" ) -var defaultConfig = kafkaInputConfig{ - Version: kafka.Version("1.0.0"), - GroupID: "FilebeatGroup", -} +type initialOffset int + +const ( + initialOffsetOldest initialOffset = iota + initialOffsetNewest +) + +var ( + defaultConfig = kafkaInputConfig{ + Version: kafka.Version("1.0.0"), + InitialOffset: initialOffsetOldest, + } + + initialOffsets = map[string]initialOffset{ + "oldest": initialOffsetOldest, + "newest": initialOffsetNewest, + } +) type kafkaInputConfig struct { // Kafka hosts with port, e.g. "localhost:9092" - Hosts []string `config:"hosts" validate:"required"` - Topics []string `config:"topics" validate:"required"` - Version kafka.Version `config:"version"` - GroupID string `config:"group_id"` + Hosts []string `config:"hosts" validate:"required"` + Topics []string `config:"topics" validate:"required"` + GroupID string `config:"group_id" validate:"required"` + Version kafka.Version `config:"version"` + InitialOffset initialOffset `config:"initial_offset"` +} + +func (off initialOffset) asSaramaOffset() int64 { + return map[initialOffset]int64{ + initialOffsetOldest: sarama.OffsetOldest, + initialOffsetNewest: sarama.OffsetNewest, + }[off] } // Validate validates the config. func (c *kafkaInputConfig) Validate() error { - return nil } @@ -63,7 +84,7 @@ func newSaramaConfig(config kafkaInputConfig) (*sarama.Config, error) { k.Version = version k.Consumer.Return.Errors = true - k.Consumer.Offsets.Initial = sarama.OffsetOldest + k.Consumer.Offsets.Initial = config.InitialOffset.asSaramaOffset() if err := k.Validate(); err != nil { logp.Err("Invalid kafka configuration: %v", err) @@ -71,3 +92,15 @@ func newSaramaConfig(config kafkaInputConfig) (*sarama.Config, error) { } return k, nil } + +// Unpack validates and unpack the "initial_offset" config option +func (off *initialOffset) Unpack(value string) error { + initialOffset, ok := initialOffsets[value] + if !ok { + return fmt.Errorf("invalid initialOffset '%s'", value) + } + + *off = initialOffset + + return nil +} diff --git a/filebeat/input/kafka/kafka_integration_test.go b/filebeat/input/kafka/kafka_integration_test.go index 1e4299ae68e..33337c36f26 100644 --- a/filebeat/input/kafka/kafka_integration_test.go +++ b/filebeat/input/kafka/kafka_integration_test.go @@ -96,8 +96,9 @@ func TestInput(t *testing.T) { // Setup the input config config, _ := common.NewConfigFrom(common.MapStr{ - "hosts": "kafka:9092", - "topics": []string{testTopic}, + "hosts": "kafka:9092", + "topics": []string{testTopic}, + "group_id": "filebeat", }) // Route input events through our capturer instead of sending through ES. From 7345c187890ddf120fe4c87219d298e2f6189a99 Mon Sep 17 00:00:00 2001 From: Fae Charlton Date: Wed, 10 Jul 2019 15:11:30 -0400 Subject: [PATCH 06/50] Document new kafka output fields --- filebeat/docs/fields.asciidoc | 38 +++++++++++++++++++++++++++ filebeat/input/kafka/_meta/fields.yml | 21 +++++++++++++++ 2 files changed, 59 insertions(+) create mode 100644 filebeat/input/kafka/_meta/fields.yml diff --git a/filebeat/docs/fields.asciidoc b/filebeat/docs/fields.asciidoc index 8bb7b5b1ad6..5c19a95c8df 100644 --- a/filebeat/docs/fields.asciidoc +++ b/filebeat/docs/fields.asciidoc @@ -29,6 +29,7 @@ grouped in the following categories: * <> * <> * <> +* <> * <> * <> * <> @@ -6984,6 +6985,43 @@ type: text Message part of the trace. +-- + +[[exported-fields-kafka-input]] +== Kafka Input fields + +Kafka metadata added by the kafka input + + + +*`kafka.topic`*:: ++ +-- +type: keyword + +Kafka topic + + +-- + +*`kafka.partition`*:: ++ +-- +type: long + +Kafka partition number + + +-- + +*`kafka.offset`*:: ++ +-- +type: long + +Kafka offset of this message + + -- [[exported-fields-kibana]] diff --git a/filebeat/input/kafka/_meta/fields.yml b/filebeat/input/kafka/_meta/fields.yml new file mode 100644 index 00000000000..d877776bbad --- /dev/null +++ b/filebeat/input/kafka/_meta/fields.yml @@ -0,0 +1,21 @@ +- key: kafka-input + title: Kafka Input + description: > + Kafka metadata added by the kafka input + short_config: false + anchor: kafka-input + fields: + - name: kafka.topic + type: keyword + description: > + Kafka topic + + - name: kafka.partition + type: long + description: > + Kafka partition number + + - name: kafka.offset + type: long + description: > + Kafka offset of this message From b925014a389cadb03d13b234c0069af5e5e289b7 Mon Sep 17 00:00:00 2001 From: Fae Charlton Date: Thu, 11 Jul 2019 10:57:20 -0400 Subject: [PATCH 07/50] Add username / password and ssl config --- filebeat/input/kafka/config.go | 42 ++++++++++++++++++++++++++++++---- 1 file changed, 37 insertions(+), 5 deletions(-) diff --git a/filebeat/input/kafka/config.go b/filebeat/input/kafka/config.go index 0ef176a9201..ae23606ab85 100644 --- a/filebeat/input/kafka/config.go +++ b/filebeat/input/kafka/config.go @@ -18,11 +18,14 @@ package kafka import ( + "errors" "fmt" "github.com/Shopify/sarama" "github.com/elastic/beats/libbeat/common/kafka" + "github.com/elastic/beats/libbeat/common/transport/tlscommon" "github.com/elastic/beats/libbeat/logp" + "github.com/elastic/beats/libbeat/outputs" ) type initialOffset int @@ -46,11 +49,14 @@ var ( type kafkaInputConfig struct { // Kafka hosts with port, e.g. "localhost:9092" - Hosts []string `config:"hosts" validate:"required"` - Topics []string `config:"topics" validate:"required"` - GroupID string `config:"group_id" validate:"required"` - Version kafka.Version `config:"version"` - InitialOffset initialOffset `config:"initial_offset"` + Hosts []string `config:"hosts" validate:"required"` + Topics []string `config:"topics" validate:"required"` + GroupID string `config:"group_id" validate:"required"` + Version kafka.Version `config:"version"` + InitialOffset initialOffset `config:"initial_offset"` + TLS *tlscommon.Config `config:"ssl"` + Username string `config:"username"` + Password string `config:"password"` } func (off initialOffset) asSaramaOffset() int64 { @@ -62,6 +68,17 @@ func (off initialOffset) asSaramaOffset() int64 { // Validate validates the config. func (c *kafkaInputConfig) Validate() error { + if len(c.Hosts) == 0 { + return errors.New("no hosts configured") + } + + if err := c.Version.Validate(); err != nil { + return err + } + + if c.Username != "" && c.Password == "" { + return fmt.Errorf("password must be set when username is configured") + } return nil } @@ -86,6 +103,21 @@ func newSaramaConfig(config kafkaInputConfig) (*sarama.Config, error) { k.Consumer.Return.Errors = true k.Consumer.Offsets.Initial = config.InitialOffset.asSaramaOffset() + tls, err := outputs.LoadTLSConfig(config.TLS) + if err != nil { + return nil, err + } + if tls != nil { + k.Net.TLS.Enable = true + k.Net.TLS.Config = tls.BuildModuleConfig("") + } + + if config.Username != "" { + k.Net.SASL.Enable = true + k.Net.SASL.User = config.Username + k.Net.SASL.Password = config.Password + } + if err := k.Validate(); err != nil { logp.Err("Invalid kafka configuration: %v", err) return nil, err From 1f6ecc3fde6e57f5b8597b5aa526a7183455ff1c Mon Sep 17 00:00:00 2001 From: Fae Charlton Date: Thu, 11 Jul 2019 11:04:32 -0400 Subject: [PATCH 08/50] Add parameter for client id --- filebeat/input/kafka/config.go | 30 ++++++++++++++---------------- 1 file changed, 14 insertions(+), 16 deletions(-) diff --git a/filebeat/input/kafka/config.go b/filebeat/input/kafka/config.go index ae23606ab85..983d319b9c7 100644 --- a/filebeat/input/kafka/config.go +++ b/filebeat/input/kafka/config.go @@ -39,6 +39,7 @@ var ( defaultConfig = kafkaInputConfig{ Version: kafka.Version("1.0.0"), InitialOffset: initialOffsetOldest, + ClientID: "filebeat", } initialOffsets = map[string]initialOffset{ @@ -52,6 +53,7 @@ type kafkaInputConfig struct { Hosts []string `config:"hosts" validate:"required"` Topics []string `config:"topics" validate:"required"` GroupID string `config:"group_id" validate:"required"` + ClientID string `config:"client_id"` Version kafka.Version `config:"version"` InitialOffset initialOffset `config:"initial_offset"` TLS *tlscommon.Config `config:"ssl"` @@ -59,13 +61,6 @@ type kafkaInputConfig struct { Password string `config:"password"` } -func (off initialOffset) asSaramaOffset() int64 { - return map[initialOffset]int64{ - initialOffsetOldest: sarama.OffsetOldest, - initialOffsetNewest: sarama.OffsetNewest, - }[off] -} - // Validate validates the config. func (c *kafkaInputConfig) Validate() error { if len(c.Hosts) == 0 { @@ -82,15 +77,6 @@ func (c *kafkaInputConfig) Validate() error { return nil } -func stringInSlice(str string, list []string) bool { - for _, v := range list { - if v == str { - return true - } - } - return false -} - func newSaramaConfig(config kafkaInputConfig) (*sarama.Config, error) { k := sarama.NewConfig() @@ -118,6 +104,9 @@ func newSaramaConfig(config kafkaInputConfig) (*sarama.Config, error) { k.Net.SASL.Password = config.Password } + // configure client ID + k.ClientID = config.ClientID + if err := k.Validate(); err != nil { logp.Err("Invalid kafka configuration: %v", err) return nil, err @@ -125,6 +114,15 @@ func newSaramaConfig(config kafkaInputConfig) (*sarama.Config, error) { return k, nil } +// asSaramaOffset converts an initialOffset enum to the corresponding +// sarama offset value. +func (off initialOffset) asSaramaOffset() int64 { + return map[initialOffset]int64{ + initialOffsetOldest: sarama.OffsetOldest, + initialOffsetNewest: sarama.OffsetNewest, + }[off] +} + // Unpack validates and unpack the "initial_offset" config option func (off *initialOffset) Unpack(value string) error { initialOffset, ok := initialOffsets[value] From 207bebec622406fc4e4ce9dc0cef36ec55b5b3d8 Mon Sep 17 00:00:00 2001 From: Fae Charlton Date: Thu, 11 Jul 2019 11:28:01 -0400 Subject: [PATCH 09/50] Add metric registry to sarama reader --- filebeat/input/kafka/config.go | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/filebeat/input/kafka/config.go b/filebeat/input/kafka/config.go index 983d319b9c7..958988a8728 100644 --- a/filebeat/input/kafka/config.go +++ b/filebeat/input/kafka/config.go @@ -25,6 +25,8 @@ import ( "github.com/elastic/beats/libbeat/common/kafka" "github.com/elastic/beats/libbeat/common/transport/tlscommon" "github.com/elastic/beats/libbeat/logp" + "github.com/elastic/beats/libbeat/monitoring" + "github.com/elastic/beats/libbeat/monitoring/adapter" "github.com/elastic/beats/libbeat/outputs" ) @@ -107,6 +109,14 @@ func newSaramaConfig(config kafkaInputConfig) (*sarama.Config, error) { // configure client ID k.ClientID = config.ClientID + k.MetricRegistry = adapter.GetGoMetrics( + monitoring.Default, + "filebeat.inputs.kafka", + adapter.Rename("incoming-byte-rate", "bytes_read"), + adapter.Rename("outgoing-byte-rate", "bytes_write"), + adapter.GoMetricsNilify, + ) + if err := k.Validate(); err != nil { logp.Err("Invalid kafka configuration: %v", err) return nil, err From ec386a01500c99161a1c17df142d7bfd41ec33ca Mon Sep 17 00:00:00 2001 From: Fae Charlton Date: Thu, 11 Jul 2019 14:49:07 -0400 Subject: [PATCH 10/50] Log kafka errors --- filebeat/input/kafka/input.go | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/filebeat/input/kafka/input.go b/filebeat/input/kafka/input.go index b988e2268da..a3ecca77267 100644 --- a/filebeat/input/kafka/input.go +++ b/filebeat/input/kafka/input.go @@ -49,6 +49,7 @@ type Input struct { consumerGroup sarama.ConsumerGroup kafkaContext context.Context kafkaCancel context.CancelFunc // The CancelFunc for kafkaContext + log *logp.Logger } // NewInput creates a new kafka input @@ -100,6 +101,7 @@ func NewInput( consumerGroup: consumerGroup, kafkaContext: kafkaContext, kafkaCancel: kafkaCancel, + log: logp.NewLogger("kafka input").With("hosts", config.Hosts), } return input, nil @@ -117,8 +119,7 @@ func (p *Input) Run() { // Track errors go func() { for err := range p.consumerGroup.Errors() { - // TODO: handle - fmt.Println("ERROR", err) + p.log.Errorw("Error reading from kafka", "error", err) } }() @@ -128,9 +129,7 @@ func (p *Input) Run() { err := p.consumerGroup.Consume(p.kafkaContext, p.config.Topics, handler) if err != nil { - fmt.Printf("Consume error: %v\n", err) - //panic(err) - // TODO: report error + p.log.Errorw("Kafka consume error", "error", err) } } }() From 0aaa71003e929e6ddfb5b778eb12c677883b40b9 Mon Sep 17 00:00:00 2001 From: Fae Charlton Date: Tue, 16 Jul 2019 13:42:19 -0400 Subject: [PATCH 11/50] Add remaining kafka metadata fields --- filebeat/input/kafka/input.go | 46 ++++++++++++++++++++++++----------- 1 file changed, 32 insertions(+), 14 deletions(-) diff --git a/filebeat/input/kafka/input.go b/filebeat/input/kafka/input.go index a3ecca77267..837b4e17006 100644 --- a/filebeat/input/kafka/input.go +++ b/filebeat/input/kafka/input.go @@ -64,8 +64,6 @@ func NewInput( return nil, err } - //forwarder := harvester.NewForwarder(out) - config := defaultConfig if err := cfg.Unpack(&config); err != nil { return nil, errors.Wrap(err, "reading kafka input config") @@ -147,11 +145,22 @@ func (p *Input) Stop() { p.kafkaCancel() } +func arrayForKafkaHeaders(headers []*sarama.RecordHeader) []interface{} { + array := []interface{}{} + for _, header := range headers { + array = append(array, common.MapStr{ + "key": header.Key, + "value": header.Value, + }) + } + return array +} + type groupHandler struct { input *Input } -func createEvent( +func (h groupHandler) createEvent( sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim, message *sarama.ConsumerMessage, @@ -159,17 +168,26 @@ func createEvent( data := util.NewData() data.Event = beat.Event{ Timestamp: time.Now(), - Fields: common.MapStr{ - "message": string(message.Value), - "kafka": common.MapStr{ - "topic": claim.Topic(), - "partition": claim.Partition(), - "offset": message.Offset, - //message.Timestamp - }, - // TODO: add more metadata - }, } + eventFields := common.MapStr{ + "message": string(message.Value), + } + kafkaMetadata := common.MapStr{ + "topic": claim.Topic(), + "partition": claim.Partition(), + "offset": message.Offset, + "key": message.Key, + } + version, ok := h.input.config.Version.Get() + if ok && version.IsAtLeast(sarama.V0_10_0_0) { + data.Event.Timestamp = message.Timestamp + kafkaMetadata["block_timestamp"] = message.BlockTimestamp + } + if ok && version.IsAtLeast(sarama.V0_11_0_0) { + kafkaMetadata["headers"] = arrayForKafkaHeaders(message.Headers) + } + eventFields["kafka"] = kafkaMetadata + data.Event.Fields = eventFields return data } @@ -183,7 +201,7 @@ func (groupHandler) Cleanup(_ sarama.ConsumerGroupSession) error { func (h groupHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error { for msg := range claim.Messages() { - event := createEvent(sess, claim, msg) + event := h.createEvent(sess, claim, msg) fmt.Printf("event: %v\n", event) h.input.outlet.OnEvent(event) sess.MarkMessage(msg, "") From 95e6bff3b92da847b9ac76e77acb2fb9fa7a772e Mon Sep 17 00:00:00 2001 From: Fae Charlton Date: Tue, 16 Jul 2019 13:47:32 -0400 Subject: [PATCH 12/50] document new metadata fields --- filebeat/input/kafka/_meta/fields.yml | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/filebeat/input/kafka/_meta/fields.yml b/filebeat/input/kafka/_meta/fields.yml index d877776bbad..7d10108d558 100644 --- a/filebeat/input/kafka/_meta/fields.yml +++ b/filebeat/input/kafka/_meta/fields.yml @@ -19,3 +19,19 @@ type: long description: > Kafka offset of this message + + - name: kafka.key + type: keyword + description: > + Kafka key, corresponding to the Kafka value stored in the message + + - name: kafka.block_timestamp + type: date + description: > + Kafka outer (compressed) block timestamp + + - name: kafka.headers + type: array + description: > + The array of kafka headers, each an object containing subfields + "key" and "value". From 5bb711cdfb61d09af68b556e17a4f41dcad9a89d Mon Sep 17 00:00:00 2001 From: Fae Charlton Date: Tue, 16 Jul 2019 13:59:20 -0400 Subject: [PATCH 13/50] Adjust kafka producer version / test message in tests --- filebeat/input/kafka/kafka_integration_test.go | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/filebeat/input/kafka/kafka_integration_test.go b/filebeat/input/kafka/kafka_integration_test.go index 33337c36f26..86892e7c48c 100644 --- a/filebeat/input/kafka/kafka_integration_test.go +++ b/filebeat/input/kafka/kafka_integration_test.go @@ -180,6 +180,7 @@ func writeToKafkaTopic( config.Producer.RequiredAcks = sarama.WaitForAll config.Producer.Return.Successes = true config.Producer.Partitioner = sarama.NewHashPartitioner + config.Version = sarama.V1_0_0_0 hosts := []string{getTestKafkaHost()} producer, err := sarama.NewSyncProducer(hosts, config) @@ -195,6 +196,12 @@ func writeToKafkaTopic( msg := &sarama.ProducerMessage{ Topic: topic, Value: sarama.StringEncoder(message), + Headers: []sarama.RecordHeader{ + sarama.RecordHeader{ + Key: []byte("testkey"), + Value: []byte("testvalue"), + }, + }, } _, _, err = producer.SendMessage(msg) From 9601e0cbe6a29d92beb71e784ab4c5c07a29e182 Mon Sep 17 00:00:00 2001 From: Fae Charlton Date: Tue, 16 Jul 2019 14:06:21 -0400 Subject: [PATCH 14/50] Don't record BlockTimestamp if it's zero --- filebeat/input/kafka/input.go | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/filebeat/input/kafka/input.go b/filebeat/input/kafka/input.go index 837b4e17006..7c3bae728e6 100644 --- a/filebeat/input/kafka/input.go +++ b/filebeat/input/kafka/input.go @@ -178,12 +178,14 @@ func (h groupHandler) createEvent( "offset": message.Offset, "key": message.Key, } - version, ok := h.input.config.Version.Get() - if ok && version.IsAtLeast(sarama.V0_10_0_0) { + version, versionOk := h.input.config.Version.Get() + if versionOk && version.IsAtLeast(sarama.V0_10_0_0) { data.Event.Timestamp = message.Timestamp - kafkaMetadata["block_timestamp"] = message.BlockTimestamp + if !message.BlockTimestamp.IsZero() { + kafkaMetadata["block_timestamp"] = message.BlockTimestamp + } } - if ok && version.IsAtLeast(sarama.V0_11_0_0) { + if versionOk && version.IsAtLeast(sarama.V0_11_0_0) { kafkaMetadata["headers"] = arrayForKafkaHeaders(message.Headers) } eventFields["kafka"] = kafkaMetadata From 2d8d37ca52f2e8783ec67e27743054db56f3ced9 Mon Sep 17 00:00:00 2001 From: Fae Charlton Date: Tue, 16 Jul 2019 14:41:06 -0400 Subject: [PATCH 15/50] Remove debug printf --- filebeat/input/kafka/input.go | 2 -- filebeat/input/kafka/kafka_integration_test.go | 1 + 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/filebeat/input/kafka/input.go b/filebeat/input/kafka/input.go index 7c3bae728e6..c03bc2cf2f1 100644 --- a/filebeat/input/kafka/input.go +++ b/filebeat/input/kafka/input.go @@ -19,7 +19,6 @@ package kafka import ( "context" - "fmt" "time" "github.com/Shopify/sarama" @@ -204,7 +203,6 @@ func (groupHandler) Cleanup(_ sarama.ConsumerGroupSession) error { func (h groupHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error { for msg := range claim.Messages() { event := h.createEvent(sess, claim, msg) - fmt.Printf("event: %v\n", event) h.input.outlet.OnEvent(event) sess.MarkMessage(msg, "") } diff --git a/filebeat/input/kafka/kafka_integration_test.go b/filebeat/input/kafka/kafka_integration_test.go index 86892e7c48c..e231d8c62b0 100644 --- a/filebeat/input/kafka/kafka_integration_test.go +++ b/filebeat/input/kafka/kafka_integration_test.go @@ -64,6 +64,7 @@ func NewEventCapturer(events chan *util.Data) channel.Outleter { } func (o *eventCapturer) OnEvent(event *util.Data) bool { + fmt.Printf("event: %v\n", event) o.events <- event return true } From e6b8b53b807cc3718cfbeaa368f2d5378dff1a0f Mon Sep 17 00:00:00 2001 From: Fae Charlton Date: Wed, 17 Jul 2019 11:35:59 -0400 Subject: [PATCH 16/50] make fmt --- filebeat/docs/fields.asciidoc | 30 +++++++++++++++++++ filebeat/input/kafka/config.go | 1 + filebeat/input/kafka/input.go | 1 + .../input/kafka/kafka_integration_test.go | 2 ++ 4 files changed, 34 insertions(+) diff --git a/filebeat/docs/fields.asciidoc b/filebeat/docs/fields.asciidoc index 5c19a95c8df..f97fbfe8e6e 100644 --- a/filebeat/docs/fields.asciidoc +++ b/filebeat/docs/fields.asciidoc @@ -7022,6 +7022,36 @@ type: long Kafka offset of this message +-- + +*`kafka.key`*:: ++ +-- +type: keyword + +Kafka key, corresponding to the Kafka value stored in the message + + +-- + +*`kafka.block_timestamp`*:: ++ +-- +type: date + +Kafka outer (compressed) block timestamp + + +-- + +*`kafka.headers`*:: ++ +-- +type: array + +The array of kafka headers, each an object containing subfields "key" and "value". + + -- [[exported-fields-kibana]] diff --git a/filebeat/input/kafka/config.go b/filebeat/input/kafka/config.go index 958988a8728..1cc63e51116 100644 --- a/filebeat/input/kafka/config.go +++ b/filebeat/input/kafka/config.go @@ -22,6 +22,7 @@ import ( "fmt" "github.com/Shopify/sarama" + "github.com/elastic/beats/libbeat/common/kafka" "github.com/elastic/beats/libbeat/common/transport/tlscommon" "github.com/elastic/beats/libbeat/logp" diff --git a/filebeat/input/kafka/input.go b/filebeat/input/kafka/input.go index c03bc2cf2f1..c637fd9e5b2 100644 --- a/filebeat/input/kafka/input.go +++ b/filebeat/input/kafka/input.go @@ -22,6 +22,7 @@ import ( "time" "github.com/Shopify/sarama" + "github.com/elastic/beats/filebeat/channel" "github.com/elastic/beats/filebeat/input" "github.com/elastic/beats/filebeat/util" diff --git a/filebeat/input/kafka/kafka_integration_test.go b/filebeat/input/kafka/kafka_integration_test.go index e231d8c62b0..d8ea62dd2f1 100644 --- a/filebeat/input/kafka/kafka_integration_test.go +++ b/filebeat/input/kafka/kafka_integration_test.go @@ -15,6 +15,8 @@ // specific language governing permissions and limitations // under the License. +// +build integration + package kafka import ( From 426c98f99d065bb9343ecfc78ace792630621718 Mon Sep 17 00:00:00 2001 From: Fae Charlton Date: Thu, 18 Jul 2019 11:44:27 -0400 Subject: [PATCH 17/50] Add kafka container to filebeat integration tests --- filebeat/docker-compose.yml | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/filebeat/docker-compose.yml b/filebeat/docker-compose.yml index 746e7bc313d..52437a78a8d 100644 --- a/filebeat/docker-compose.yml +++ b/filebeat/docker-compose.yml @@ -12,6 +12,8 @@ services: - ES_PORT=9200 - ES_USER=beats - ES_PASS=testing + - KAFKA_HOST=kafka + - KAFKA_PORT=9092 - KIBANA_HOST=kibana - KIBANA_PORT=5601 working_dir: /go/src/github.com/elastic/beats/filebeat @@ -27,6 +29,7 @@ services: image: busybox depends_on: elasticsearch: { condition: service_healthy } + kafka: { condition: service_healthy } kibana: { condition: service_healthy } redis: { condition: service_healthy } @@ -35,6 +38,14 @@ services: file: ${ES_BEATS}/testing/environments/${TESTING_ENVIRONMENT}.yml service: elasticsearch + kafka: + build: ${ES_BEATS}/testing/environments/docker/kafka + expose: + - 9092 + - 2181 + environment: + - ADVERTISED_HOST=kafka + kibana: extends: file: ${ES_BEATS}/testing/environments/${TESTING_ENVIRONMENT}.yml From 43f1cde0d374d2f70a50d7e7b85f62d0d8a15b70 Mon Sep 17 00:00:00 2001 From: Fae Charlton Date: Thu, 18 Jul 2019 14:22:39 -0400 Subject: [PATCH 18/50] regenerate docs --- filebeat/docs/fields.asciidoc | 24 ++++++++++++------------ 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/filebeat/docs/fields.asciidoc b/filebeat/docs/fields.asciidoc index 51ff1c4c8a6..9486bd4356e 100644 --- a/filebeat/docs/fields.asciidoc +++ b/filebeat/docs/fields.asciidoc @@ -7232,61 +7232,61 @@ Kafka metadata added by the kafka input *`kafka.topic`*:: + -- -type: keyword - Kafka topic +type: keyword + -- *`kafka.partition`*:: + -- -type: long - Kafka partition number +type: long + -- *`kafka.offset`*:: + -- -type: long - Kafka offset of this message +type: long + -- *`kafka.key`*:: + -- -type: keyword - Kafka key, corresponding to the Kafka value stored in the message +type: keyword + -- *`kafka.block_timestamp`*:: + -- -type: date - Kafka outer (compressed) block timestamp +type: date + -- *`kafka.headers`*:: + -- -type: array - The array of kafka headers, each an object containing subfields "key" and "value". +type: array + -- [[exported-fields-kibana]] From da3eb9988b3402c6944c5b67790343dcd7f39bca Mon Sep 17 00:00:00 2001 From: Fae Charlton Date: Thu, 18 Jul 2019 15:55:29 -0400 Subject: [PATCH 19/50] Remove unused test helpers --- .../input/kafka/kafka_integration_test.go | 26 +------------------ 1 file changed, 1 insertion(+), 25 deletions(-) diff --git a/filebeat/input/kafka/kafka_integration_test.go b/filebeat/input/kafka/kafka_integration_test.go index d8ea62dd2f1..335eba2e027 100644 --- a/filebeat/input/kafka/kafka_integration_test.go +++ b/filebeat/input/kafka/kafka_integration_test.go @@ -20,7 +20,6 @@ package kafka import ( - "encoding/json" "fmt" "math/rand" "os" @@ -37,7 +36,6 @@ import ( "github.com/elastic/beats/filebeat/util" "github.com/elastic/beats/libbeat/beat" "github.com/elastic/beats/libbeat/common" - "github.com/elastic/beats/libbeat/common/fmtstr" _ "github.com/elastic/beats/libbeat/outputs/codec/format" _ "github.com/elastic/beats/libbeat/outputs/codec/json" ) @@ -99,7 +97,7 @@ func TestInput(t *testing.T) { // Setup the input config config, _ := common.NewConfigFrom(common.MapStr{ - "hosts": "kafka:9092", + "hosts": getTestKafkaHost(), "topics": []string{testTopic}, "group_id": "filebeat", }) @@ -136,28 +134,6 @@ func TestInput(t *testing.T) { } } -func validateJSON(t *testing.T, value []byte, event beat.Event) { - var decoded map[string]interface{} - err := json.Unmarshal(value, &decoded) - if err != nil { - t.Errorf("can not json decode event value: %v", value) - return - } - assert.Equal(t, decoded["type"], event.Fields["type"]) - assert.Equal(t, decoded["message"], event.Fields["message"]) -} - -func makeValidateFmtStr(fmt string) func(*testing.T, []byte, beat.Event) { - fmtString := fmtstr.MustCompileEvent(fmt) - return func(t *testing.T, value []byte, event beat.Event) { - expectedMessage, err := fmtString.Run(&event) - if err != nil { - t.Fatal(err) - } - assert.Equal(t, string(expectedMessage), string(value)) - } -} - func strDefault(a, defaults string) string { if len(a) == 0 { return defaults From 3da5f9940a210651077cf6d25772e4082f38dc11 Mon Sep 17 00:00:00 2001 From: Fae Charlton Date: Thu, 18 Jul 2019 16:41:46 -0400 Subject: [PATCH 20/50] Add header verification to kafka integration test --- .../input/kafka/kafka_integration_test.go | 93 +++++++++++++++---- 1 file changed, 77 insertions(+), 16 deletions(-) diff --git a/filebeat/input/kafka/kafka_integration_test.go b/filebeat/input/kafka/kafka_integration_test.go index 335eba2e027..7091b3d4421 100644 --- a/filebeat/input/kafka/kafka_integration_test.go +++ b/filebeat/input/kafka/kafka_integration_test.go @@ -64,7 +64,6 @@ func NewEventCapturer(events chan *util.Data) channel.Outleter { } func (o *eventCapturer) OnEvent(event *util.Data) bool { - fmt.Printf("event: %v\n", event) o.events <- event return true } @@ -81,6 +80,18 @@ func (o *eventCapturer) Done() <-chan struct{} { return o.c } +type testMessage struct { + message string + headers []sarama.RecordHeader +} + +func recordHeader(key, value string) sarama.RecordHeader { + return sarama.RecordHeader{ + Key: []byte(key), + Value: []byte(value), + } +} + func TestInput(t *testing.T) { id := strconv.Itoa(rand.New(rand.NewSource(int64(time.Now().Nanosecond()))).Int()) testTopic := fmt.Sprintf("Filebeat-TestInput-%s", id) @@ -90,9 +101,24 @@ func TestInput(t *testing.T) { } // Send test messages to the topic for the input to read. - messageStrs := []string{"testing", "stuff", "blah"} - for _, s := range messageStrs { - writeToKafkaTopic(t, testTopic, s, time.Second*20) + messages := []testMessage{ + testMessage{message: "testing"}, + testMessage{ + message: "stuff", + headers: []sarama.RecordHeader{ + recordHeader("X-Test-Header", "test header value"), + }, + }, + testMessage{ + message: "things", + headers: []sarama.RecordHeader{ + recordHeader("keys and things", "3^3 = 27"), + recordHeader("kafka yay", "3^3 - 2^4 = 11"), + }, + }, + } + for _, m := range messages { + writeToKafkaTopic(t, testTopic, m.message, m.headers, time.Second*20) } // Setup the input config @@ -120,20 +146,59 @@ func TestInput(t *testing.T) { input.Run() timeout := time.After(30 * time.Second) - for _, m := range messageStrs { + for _, m := range messages { select { case event := <-events: - result, err := event.GetEvent().Fields.GetValue("message") + text, err := event.GetEvent().Fields.GetValue("message") if err != nil { t.Fatal(err) } - assert.Equal(t, result, m) + assert.Equal(t, text, m.message) + + checkMatchingHeaders(t, event.GetEvent(), m.headers) case <-timeout: t.Fatal("timeout waiting for incoming events") } } } +func checkMatchingHeaders( + t *testing.T, event beat.Event, expected []sarama.RecordHeader, +) { + kafka, err := event.Fields.GetValue("kafka") + if err != nil { + t.Error(err) + return + } + kafkaMap, ok := kafka.(common.MapStr) + if !ok { + t.Error("event.Fields.kafka isn't MapStr") + return + } + headers, err := kafkaMap.GetValue("headers") + if err != nil { + t.Error(err) + return + } + headerArray, ok := headers.([]interface{}) + if !ok { + t.Error("event.Fields.kafka.headers isn't a []interface{}") + return + } + assert.Equal(t, len(expected), len(headerArray)) + for i := 0; i < len(expected); i++ { + headerMap, ok := headerArray[i].(common.MapStr) + if !ok { + t.Errorf("event.Fields.kafka.headers[%v] isn't a MapStr", i) + continue + } + key, _ := headerMap.GetValue("key") + value, _ := headerMap.GetValue("value") + assert.Equal(t, expected[i].Key, key) + assert.Equal(t, expected[i].Value, value) + } +} + func strDefault(a, defaults string) string { if len(a) == 0 { return defaults @@ -153,7 +218,8 @@ func getTestKafkaHost() string { } func writeToKafkaTopic( - t *testing.T, topic string, message string, timeout time.Duration, + t *testing.T, topic string, message string, + headers []sarama.RecordHeader, timeout time.Duration, ) { config := sarama.NewConfig() config.Producer.RequiredAcks = sarama.WaitForAll @@ -173,14 +239,9 @@ func writeToKafkaTopic( }() msg := &sarama.ProducerMessage{ - Topic: topic, - Value: sarama.StringEncoder(message), - Headers: []sarama.RecordHeader{ - sarama.RecordHeader{ - Key: []byte("testkey"), - Value: []byte("testvalue"), - }, - }, + Topic: topic, + Value: sarama.StringEncoder(message), + Headers: headers, } _, _, err = producer.SendMessage(msg) From 6bdb13fa836b1d78ab6e99969ae14fa48f5c4919 Mon Sep 17 00:00:00 2001 From: Fae Charlton Date: Mon, 22 Jul 2019 15:49:36 -0400 Subject: [PATCH 21/50] Addressing review comments --- filebeat/input/kafka/config.go | 36 ++++++++++++++++--------------- filebeat/input/kafka/input.go | 39 +++++++++++++++------------------- 2 files changed, 36 insertions(+), 39 deletions(-) diff --git a/filebeat/input/kafka/config.go b/filebeat/input/kafka/config.go index 1cc63e51116..6d17c746602 100644 --- a/filebeat/input/kafka/config.go +++ b/filebeat/input/kafka/config.go @@ -31,6 +31,19 @@ import ( "github.com/elastic/beats/libbeat/outputs" ) +type kafkaInputConfig struct { + // Kafka hosts with port, e.g. "localhost:9092" + Hosts []string `config:"hosts" validate:"required"` + Topics []string `config:"topics" validate:"required"` + GroupID string `config:"group_id" validate:"required"` + ClientID string `config:"client_id"` + Version kafka.Version `config:"version"` + InitialOffset initialOffset `config:"initial_offset"` + TLS *tlscommon.Config `config:"ssl"` + Username string `config:"username"` + Password string `config:"password"` +} + type initialOffset int const ( @@ -39,29 +52,18 @@ const ( ) var ( - defaultConfig = kafkaInputConfig{ - Version: kafka.Version("1.0.0"), - InitialOffset: initialOffsetOldest, - ClientID: "filebeat", - } - initialOffsets = map[string]initialOffset{ "oldest": initialOffsetOldest, "newest": initialOffsetNewest, } ) -type kafkaInputConfig struct { - // Kafka hosts with port, e.g. "localhost:9092" - Hosts []string `config:"hosts" validate:"required"` - Topics []string `config:"topics" validate:"required"` - GroupID string `config:"group_id" validate:"required"` - ClientID string `config:"client_id"` - Version kafka.Version `config:"version"` - InitialOffset initialOffset `config:"initial_offset"` - TLS *tlscommon.Config `config:"ssl"` - Username string `config:"username"` - Password string `config:"password"` +func defaultConfig() kafkaInputConfig { + return kafkaInputConfig{ + Version: kafka.Version("1.0.0"), + InitialOffset: initialOffsetOldest, + ClientID: "filebeat", + } } // Validate validates the config. diff --git a/filebeat/input/kafka/input.go b/filebeat/input/kafka/input.go index c637fd9e5b2..e626c98990b 100644 --- a/filebeat/input/kafka/input.go +++ b/filebeat/input/kafka/input.go @@ -41,7 +41,7 @@ func init() { } // Input contains the input and its config -type Input struct { +type kafkaInput struct { config kafkaInputConfig rawConfig *common.Config // The Config given to NewInput started bool @@ -64,7 +64,7 @@ func NewInput( return nil, err } - config := defaultConfig + config := defaultConfig() if err := cfg.Unpack(&config); err != nil { return nil, errors.Wrap(err, "reading kafka input config") } @@ -91,7 +91,7 @@ func NewInput( } }() - input := &Input{ + input := &kafkaInput{ config: config, rawConfig: cfg, started: false, @@ -105,44 +105,39 @@ func NewInput( return input, nil } -func (p *Input) newConsumerGroup() (sarama.ConsumerGroup, error) { - consumerGroup, err := - sarama.NewConsumerGroup(p.config.Hosts, p.config.GroupID, nil) - return consumerGroup, err -} - // Run starts the input by scanning for incoming messages and errors. -func (p *Input) Run() { - if !p.started { +func (input *kafkaInput) Run() { + if !input.started { // Track errors go func() { - for err := range p.consumerGroup.Errors() { - p.log.Errorw("Error reading from kafka", "error", err) + for err := range input.consumerGroup.Errors() { + input.log.Errorw("Error reading from kafka", "error", err) } }() go func() { for { - handler := groupHandler{input: p} + handler := groupHandler{input: input} - err := p.consumerGroup.Consume(p.kafkaContext, p.config.Topics, handler) + err := input.consumerGroup.Consume( + input.kafkaContext, input.config.Topics, handler) if err != nil { - p.log.Errorw("Kafka consume error", "error", err) + input.log.Errorw("Kafka consume error", "error", err) } } }() - p.started = true + input.started = true } } // Wait shuts down the Input by cancelling the internal context. -func (p *Input) Wait() { - p.Stop() +func (input *kafkaInput) Wait() { + input.Stop() } // Stop shuts down the Input by cancelling the internal context. -func (p *Input) Stop() { - p.kafkaCancel() +func (input *kafkaInput) Stop() { + input.kafkaCancel() } func arrayForKafkaHeaders(headers []*sarama.RecordHeader) []interface{} { @@ -157,7 +152,7 @@ func arrayForKafkaHeaders(headers []*sarama.RecordHeader) []interface{} { } type groupHandler struct { - input *Input + input *kafkaInput } func (h groupHandler) createEvent( From bfeaeb038a96c7714cb2595bae3bcca20a808b06 Mon Sep 17 00:00:00 2001 From: Fae Charlton Date: Tue, 23 Jul 2019 16:11:24 -0400 Subject: [PATCH 22/50] Review comments --- filebeat/input/kafka/input.go | 38 +++++++++---------- .../input/kafka/kafka_integration_test.go | 2 +- 2 files changed, 19 insertions(+), 21 deletions(-) diff --git a/filebeat/input/kafka/input.go b/filebeat/input/kafka/input.go index e626c98990b..4cbd0315d8c 100644 --- a/filebeat/input/kafka/input.go +++ b/filebeat/input/kafka/input.go @@ -44,7 +44,6 @@ func init() { type kafkaInput struct { config kafkaInputConfig rawConfig *common.Config // The Config given to NewInput - started bool outlet channel.Outleter consumerGroup sarama.ConsumerGroup kafkaContext context.Context @@ -94,7 +93,6 @@ func NewInput( input := &kafkaInput{ config: config, rawConfig: cfg, - started: false, outlet: out, consumerGroup: consumerGroup, kafkaContext: kafkaContext, @@ -107,32 +105,32 @@ func NewInput( // Run starts the input by scanning for incoming messages and errors. func (input *kafkaInput) Run() { - if !input.started { - // Track errors - go func() { - for err := range input.consumerGroup.Errors() { - input.log.Errorw("Error reading from kafka", "error", err) - } - }() + // Track errors + go func() { + for err := range input.consumerGroup.Errors() { + input.log.Errorw("Error reading from kafka", "error", err) + } + }() - go func() { - for { - handler := groupHandler{input: input} + go func() { + for { + handler := groupHandler{input: input} - err := input.consumerGroup.Consume( - input.kafkaContext, input.config.Topics, handler) - if err != nil { - input.log.Errorw("Kafka consume error", "error", err) - } + err := input.consumerGroup.Consume( + input.kafkaContext, input.config.Topics, handler) + if err != nil { + input.log.Errorw("Kafka consume error", "error", err) } - }() - input.started = true - } + } + }() } // Wait shuts down the Input by cancelling the internal context. func (input *kafkaInput) Wait() { input.Stop() + // TODO: wait on any messages still pending internal delivery + // Wait for the consumer group to shut down + input.consumerGroup.Close() } // Stop shuts down the Input by cancelling the internal context. diff --git a/filebeat/input/kafka/kafka_integration_test.go b/filebeat/input/kafka/kafka_integration_test.go index 7091b3d4421..87d62f2ee2f 100644 --- a/filebeat/input/kafka/kafka_integration_test.go +++ b/filebeat/input/kafka/kafka_integration_test.go @@ -200,7 +200,7 @@ func checkMatchingHeaders( } func strDefault(a, defaults string) string { - if len(a) == 0 { + if a == "" { return defaults } return a From 8061d8691161cc85781792896785c86e07598734 Mon Sep 17 00:00:00 2001 From: Fae Charlton Date: Tue, 23 Jul 2019 17:28:37 -0400 Subject: [PATCH 23/50] Add several more kafka configuration settings --- filebeat/input/kafka/config.go | 73 +++++++++++++++++++++++++++++++++- 1 file changed, 72 insertions(+), 1 deletion(-) diff --git a/filebeat/input/kafka/config.go b/filebeat/input/kafka/config.go index 6d17c746602..13a10cd027c 100644 --- a/filebeat/input/kafka/config.go +++ b/filebeat/input/kafka/config.go @@ -20,6 +20,7 @@ package kafka import ( "errors" "fmt" + "time" "github.com/Shopify/sarama" @@ -39,11 +40,28 @@ type kafkaInputConfig struct { ClientID string `config:"client_id"` Version kafka.Version `config:"version"` InitialOffset initialOffset `config:"initial_offset"` + RetryBackoff time.Duration `config:"retry_backoff" validate:"min=0"` + MaxWaitTime time.Duration `config:"max_wait_time"` + Fetch *kafkaFetch `config:"fetch"` + Rebalance *kafkaRebalance `config:"rebalance"` TLS *tlscommon.Config `config:"ssl"` Username string `config:"username"` Password string `config:"password"` } +type kafkaFetch struct { + Min int32 `config:"min" validate:"min=1"` + Default int32 `config:"default" validate:"min=1"` + Max int32 `config:"max" validate:"min=0"` +} + +type kafkaRebalance struct { + Strategy rebalanceStrategy `config:"strategy"` + Timeout time.Duration `config:"timeout"` + MaxRetries int `config:"max_retries"` + RetryBackoff time.Duration `config:"retry_backoff" validate:"min=0"` +} + type initialOffset int const ( @@ -51,18 +69,44 @@ const ( initialOffsetNewest ) +type rebalanceStrategy int + +const ( + rebalanceStrategyRange rebalanceStrategy = iota + rebalanceStrategyRoundRobin +) + var ( initialOffsets = map[string]initialOffset{ "oldest": initialOffsetOldest, "newest": initialOffsetNewest, } + rebalanceStrategies = map[string]rebalanceStrategy{ + "range": rebalanceStrategyRange, + "roundrobin": rebalanceStrategyRoundRobin, + } ) +// The default config for the kafka input. When in doubt, default values +// were chosen to match sarama's defaults. func defaultConfig() kafkaInputConfig { return kafkaInputConfig{ Version: kafka.Version("1.0.0"), InitialOffset: initialOffsetOldest, ClientID: "filebeat", + RetryBackoff: 2 * time.Second, + MaxWaitTime: 250 * time.Millisecond, + Fetch: &kafkaFetch{ + Min: 1, + Default: (1 << 20), // 1 MB + Max: 0, + }, + Rebalance: &kafkaRebalance{ + Strategy: rebalanceStrategyRange, + Timeout: 60 * time.Second, + MaxRetries: 4, + RetryBackoff: 2 * time.Second, + }, } } @@ -93,6 +137,18 @@ func newSaramaConfig(config kafkaInputConfig) (*sarama.Config, error) { k.Consumer.Return.Errors = true k.Consumer.Offsets.Initial = config.InitialOffset.asSaramaOffset() + k.Consumer.Retry.Backoff = config.RetryBackoff + k.Consumer.MaxWaitTime = config.MaxWaitTime + + k.Consumer.Fetch.Min = config.Fetch.Min + k.Consumer.Fetch.Default = config.Fetch.Default + k.Consumer.Fetch.Max = config.Fetch.Max + + k.Consumer.Group.Rebalance.Strategy = + config.Rebalance.Strategy.asSaramaStrategy() + k.Consumer.Group.Rebalance.Timeout = config.Rebalance.Timeout + k.Consumer.Group.Rebalance.Retry.Backoff = config.Rebalance.RetryBackoff + k.Consumer.Group.Rebalance.Retry.Max = config.Rebalance.MaxRetries tls, err := outputs.LoadTLSConfig(config.TLS) if err != nil { @@ -142,8 +198,23 @@ func (off *initialOffset) Unpack(value string) error { if !ok { return fmt.Errorf("invalid initialOffset '%s'", value) } - *off = initialOffset + return nil +} +func (st rebalanceStrategy) asSaramaStrategy() sarama.BalanceStrategy { + return map[rebalanceStrategy]sarama.BalanceStrategy{ + rebalanceStrategyRange: sarama.BalanceStrategyRange, + rebalanceStrategyRoundRobin: sarama.BalanceStrategyRoundRobin, + }[st] +} + +// Unpack validates and unpack the "rebalance.strategy" config option +func (st *rebalanceStrategy) Unpack(value string) error { + strategy, ok := rebalanceStrategies[value] + if !ok { + return fmt.Errorf("invalid rebalance strategy '%s'", value) + } + *st = strategy return nil } From 678d71b1d9f2c54a91c4efdd492de15955a504e0 Mon Sep 17 00:00:00 2001 From: Fae Charlton Date: Thu, 25 Jul 2019 17:01:26 -0400 Subject: [PATCH 24/50] Document kafka input configuration --- filebeat/docs/inputs/input-kafka.asciidoc | 107 ++++++++++++++++++++++ 1 file changed, 107 insertions(+) create mode 100644 filebeat/docs/inputs/input-kafka.asciidoc diff --git a/filebeat/docs/inputs/input-kafka.asciidoc b/filebeat/docs/inputs/input-kafka.asciidoc new file mode 100644 index 00000000000..91bc36fe559 --- /dev/null +++ b/filebeat/docs/inputs/input-kafka.asciidoc @@ -0,0 +1,107 @@ +:type: kafka + +[id="{beatname_lc}-input-{type}"] +=== Kafka input + +++++ +Kafka +++++ + +Use the `kafka` input to read from topics in a Kafka cluster. + +To configure this input, specify a list of <> to use for this +cluster, a list of <> to track, and a <> +to connect with. + +Example configuration: + +["source","yaml",subs="attributes"] +---- +{beatname_lc}.inputs: +- type: kafka + hosts: + - kafka-broker-1:9092 + - kafka-broker-2:9092 + topics: ["my-topic"] + group_id: "filebeat" + +---- + + +[id="{beatname_lc}-input-{type}-options"] +==== Configuration options + +The `kafka` input supports the following configuration options plus the +<<{beatname_lc}-input-{type}-common-options>> described later. + +[float] +[[hosts]] +===== `hosts` + +A list of Kafka hosts (brokers) for this cluster. + +[float] +[[topics]] +===== `topics` + +A list of topics to read from. + +[float] +[[groupid]] +===== `group_id` + +The Kafka consumer group id. + +[float] +===== `client_id` + +The Kafka client id (optional). + +[float] +===== `version` + +The version of the Kafka protocol to use (defaults to `"1.0.0"`). + +[float] +===== `initial_offset` + +The initial offset to start reading, either "oldest" or "newest". Defaults to +"oldest". + +===== `retry_backoff` + +How long to wait before retrying a failed read. Default is 2s. + +===== `max_wait_time` + +How long to wait for the minimum number of input bytes while reading. Default +is 250ms. + +===== `fetch` + +Kafka fetch settings: + +*`min`*:: The minimum number of bytes to wait for. Defaults to 1. + +*`default`*:: The default number of bytes to read per request. Defaults to 1MB. + +*`max`*:: The maximum number of bytes to read per request. Defaults to 0 +(no limit). + +===== `rebalance` + +Kafka rebalance settings: + +*`strategy`*:: Either `"range"` or `"roundrobin"`. Defaults to `"range"`. + +*`timeout`*:: How long to wait for an attempted rebalance. Defaults to 60s. + +*`max_retries`*:: How many times to retry if rebalancing fails. Defaults to 4. + +*`retry_backoff`*:: How long to wait after an unsuccessful rebalance attempt. +Defaults to 2s. + +[id="{beatname_lc}-input-{type}-common-options"] +include::../inputs/input-common-options.asciidoc[] + +:type!: From 6a45e0509928ed8b479a9bc858703aa212e14099 Mon Sep 17 00:00:00 2001 From: Fae Charlton Date: Fri, 26 Jul 2019 15:40:44 -0400 Subject: [PATCH 25/50] Update for new outlet api --- filebeat/input/kafka/input.go | 54 +++++++++++++++++++---------------- 1 file changed, 30 insertions(+), 24 deletions(-) diff --git a/filebeat/input/kafka/input.go b/filebeat/input/kafka/input.go index 4cbd0315d8c..1132bdc0b5e 100644 --- a/filebeat/input/kafka/input.go +++ b/filebeat/input/kafka/input.go @@ -19,13 +19,13 @@ package kafka import ( "context" + "sync" "time" "github.com/Shopify/sarama" "github.com/elastic/beats/filebeat/channel" "github.com/elastic/beats/filebeat/input" - "github.com/elastic/beats/filebeat/util" "github.com/elastic/beats/libbeat/beat" "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/logp" @@ -49,16 +49,21 @@ type kafkaInput struct { kafkaContext context.Context kafkaCancel context.CancelFunc // The CancelFunc for kafkaContext log *logp.Logger + runOnce sync.Once } // NewInput creates a new kafka input func NewInput( cfg *common.Config, - outletFactory channel.Connector, + connector channel.Connector, inputContext input.Context, ) (input.Input, error) { - out, err := outletFactory(cfg, inputContext.DynamicFields) + out, err := connector.ConnectWith(cfg, beat.ClientConfig{ + Processing: beat.ProcessingConfig{ + DynamicFields: inputContext.DynamicFields, + }, + }) if err != nil { return nil, err } @@ -105,24 +110,26 @@ func NewInput( // Run starts the input by scanning for incoming messages and errors. func (input *kafkaInput) Run() { - // Track errors - go func() { - for err := range input.consumerGroup.Errors() { - input.log.Errorw("Error reading from kafka", "error", err) - } - }() + input.runOnce.Do(func() { + // Track errors + go func() { + for err := range input.consumerGroup.Errors() { + input.log.Errorw("Error reading from kafka", "error", err) + } + }() - go func() { - for { - handler := groupHandler{input: input} + go func() { + for { + handler := groupHandler{input: input} - err := input.consumerGroup.Consume( - input.kafkaContext, input.config.Topics, handler) - if err != nil { - input.log.Errorw("Kafka consume error", "error", err) + err := input.consumerGroup.Consume( + input.kafkaContext, input.config.Topics, handler) + if err != nil { + input.log.Errorw("Kafka consume error", "error", err) + } } - } - }() + }() + }) } // Wait shuts down the Input by cancelling the internal context. @@ -157,9 +164,8 @@ func (h groupHandler) createEvent( sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim, message *sarama.ConsumerMessage, -) *util.Data { - data := util.NewData() - data.Event = beat.Event{ +) beat.Event { + event := beat.Event{ Timestamp: time.Now(), } eventFields := common.MapStr{ @@ -173,7 +179,7 @@ func (h groupHandler) createEvent( } version, versionOk := h.input.config.Version.Get() if versionOk && version.IsAtLeast(sarama.V0_10_0_0) { - data.Event.Timestamp = message.Timestamp + event.Timestamp = message.Timestamp if !message.BlockTimestamp.IsZero() { kafkaMetadata["block_timestamp"] = message.BlockTimestamp } @@ -182,8 +188,8 @@ func (h groupHandler) createEvent( kafkaMetadata["headers"] = arrayForKafkaHeaders(message.Headers) } eventFields["kafka"] = kafkaMetadata - data.Event.Fields = eventFields - return data + event.Fields = eventFields + return event } func (groupHandler) Setup(session sarama.ConsumerGroupSession) error { From 5cee0820cb0ffdcc262aa48043a098b027484a37 Mon Sep 17 00:00:00 2001 From: Fae Charlton Date: Mon, 29 Jul 2019 15:33:45 -0400 Subject: [PATCH 26/50] Add end-to-end ACK to the kafka input / synchronize access to the session --- filebeat/input/kafka/input.go | 61 +++++++++++++++++++++++++++++++++-- 1 file changed, 58 insertions(+), 3 deletions(-) diff --git a/filebeat/input/kafka/input.go b/filebeat/input/kafka/input.go index 1132bdc0b5e..9379cce736d 100644 --- a/filebeat/input/kafka/input.go +++ b/filebeat/input/kafka/input.go @@ -46,12 +46,21 @@ type kafkaInput struct { rawConfig *common.Config // The Config given to NewInput outlet channel.Outleter consumerGroup sarama.ConsumerGroup + sessionState *kafkaSessionState kafkaContext context.Context kafkaCancel context.CancelFunc // The CancelFunc for kafkaContext log *logp.Logger runOnce sync.Once } +// A synchronized wrapper to read and write the kafka session, since it may +// change while ACKs are still pending. +type kafkaSessionState struct { + session sarama.ConsumerGroupSession + mutex sync.Mutex // Hold to access the session field + waitGroup sync.WaitGroup // Hold while using the session field +} + // NewInput creates a new kafka input func NewInput( cfg *common.Config, @@ -59,10 +68,27 @@ func NewInput( inputContext input.Context, ) (input.Input, error) { + // We create the empty session state first because it must be referenced by + // the ACK callback in the connector configuration. + sessionState := &kafkaSessionState{} + out, err := connector.ConnectWith(cfg, beat.ClientConfig{ Processing: beat.ProcessingConfig{ DynamicFields: inputContext.DynamicFields, }, + ACKEvents: func(events []interface{}) { + sessionState.accessSession(func(session sarama.ConsumerGroupSession) { + if session == nil { + // The kafka connection is closed and / or is being rebalanced. + return + } + for _, event := range events { + if cm, ok := event.(*sarama.ConsumerMessage); ok { + session.MarkMessage(cm, "") + } + } + }) + }, }) if err != nil { return nil, err @@ -100,6 +126,7 @@ func NewInput( rawConfig: cfg, outlet: out, consumerGroup: consumerGroup, + sessionState: sessionState, kafkaContext: kafkaContext, kafkaCancel: kafkaCancel, log: logp.NewLogger("kafka input").With("hosts", config.Hosts), @@ -108,6 +135,33 @@ func NewInput( return input, nil } +// A helper to safely use the current sarama session for the duration of the +// given callback. Used when ACKing messages outside the body of the main +// sarama callbacks. The session parameter may be nil if there is no active +// session. +func (state *kafkaSessionState) accessSession( + fn func(session sarama.ConsumerGroupSession), +) { + state.mutex.Lock() + state.waitGroup.Add(1) + session := state.session + state.mutex.Unlock() + defer state.waitGroup.Done() + fn(session) +} + +// A helper to safely set the session field after waiting on any pending +// operations. +func (state *kafkaSessionState) setSession(sess sarama.ConsumerGroupSession) { + state.mutex.Lock() + // Once we claim the mutex we still wait for any pending ACKs to be + // sent. (These may well fail if the session is ending, but that's better + // than calling a stale pointer.) + state.waitGroup.Wait() + state.session = sess + state.mutex.Unlock() +} + // Run starts the input by scanning for incoming messages and errors. func (input *kafkaInput) Run() { input.runOnce.Do(func() { @@ -135,7 +189,6 @@ func (input *kafkaInput) Run() { // Wait shuts down the Input by cancelling the internal context. func (input *kafkaInput) Wait() { input.Stop() - // TODO: wait on any messages still pending internal delivery // Wait for the consumer group to shut down input.consumerGroup.Close() } @@ -192,11 +245,13 @@ func (h groupHandler) createEvent( return event } -func (groupHandler) Setup(session sarama.ConsumerGroupSession) error { +func (h groupHandler) Setup(session sarama.ConsumerGroupSession) error { + h.input.sessionState.setSession(session) return nil } -func (groupHandler) Cleanup(_ sarama.ConsumerGroupSession) error { +func (h groupHandler) Cleanup(_ sarama.ConsumerGroupSession) error { + h.input.sessionState.setSession(nil) return nil } From a16e862e524c411a7bc349d1939a08ecb9558468 Mon Sep 17 00:00:00 2001 From: Fae Charlton Date: Mon, 29 Jul 2019 16:02:16 -0400 Subject: [PATCH 27/50] Update integration test --- .../input/kafka/kafka_integration_test.go | 19 +++++++++---------- 1 file changed, 9 insertions(+), 10 deletions(-) diff --git a/filebeat/input/kafka/kafka_integration_test.go b/filebeat/input/kafka/kafka_integration_test.go index 87d62f2ee2f..b175e43876c 100644 --- a/filebeat/input/kafka/kafka_integration_test.go +++ b/filebeat/input/kafka/kafka_integration_test.go @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -// +build integration +// + build integration package kafka @@ -33,7 +33,6 @@ import ( "github.com/elastic/beats/filebeat/channel" "github.com/elastic/beats/filebeat/input" - "github.com/elastic/beats/filebeat/util" "github.com/elastic/beats/libbeat/beat" "github.com/elastic/beats/libbeat/common" _ "github.com/elastic/beats/libbeat/outputs/codec/format" @@ -53,17 +52,17 @@ type eventCapturer struct { closed bool c chan struct{} closeOnce sync.Once - events chan *util.Data + events chan beat.Event } -func NewEventCapturer(events chan *util.Data) channel.Outleter { +func NewEventCapturer(events chan beat.Event) channel.Outleter { return &eventCapturer{ c: make(chan struct{}), events: events, } } -func (o *eventCapturer) OnEvent(event *util.Data) bool { +func (o *eventCapturer) OnEvent(event beat.Event) bool { o.events <- event return true } @@ -129,13 +128,13 @@ func TestInput(t *testing.T) { }) // Route input events through our capturer instead of sending through ES. - events := make(chan *util.Data, 100) + events := make(chan beat.Event, 100) defer close(events) capturer := NewEventCapturer(events) defer capturer.Close() - connector := func(*common.Config, *common.MapStrPointer) (channel.Outleter, error) { + connector := channel.ConnectorFunc(func(_ *common.Config, _ beat.ClientConfig) (channel.Outleter, error) { return channel.SubOutlet(capturer), nil - } + }) input, err := NewInput(config, connector, context) if err != nil { @@ -149,13 +148,13 @@ func TestInput(t *testing.T) { for _, m := range messages { select { case event := <-events: - text, err := event.GetEvent().Fields.GetValue("message") + text, err := event.Fields.GetValue("message") if err != nil { t.Fatal(err) } assert.Equal(t, text, m.message) - checkMatchingHeaders(t, event.GetEvent(), m.headers) + checkMatchingHeaders(t, event, m.headers) case <-timeout: t.Fatal("timeout waiting for incoming events") } From e2137cb42e222f50b5488692d0fce4bc4b216d60 Mon Sep 17 00:00:00 2001 From: Fae Charlton Date: Mon, 29 Jul 2019 16:04:27 -0400 Subject: [PATCH 28/50] make integration test an integration test againk not unit --- filebeat/input/kafka/kafka_integration_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/filebeat/input/kafka/kafka_integration_test.go b/filebeat/input/kafka/kafka_integration_test.go index b175e43876c..e7ac3a39ef7 100644 --- a/filebeat/input/kafka/kafka_integration_test.go +++ b/filebeat/input/kafka/kafka_integration_test.go @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -// + build integration +// +build integration package kafka From cd8a3d9340e018d229fb8e22966c4aeb5993d874 Mon Sep 17 00:00:00 2001 From: Fae Charlton Date: Mon, 29 Jul 2019 16:54:54 -0400 Subject: [PATCH 29/50] Replace sarama context with a minimal wrapper suggested by @urso --- filebeat/input/kafka/input.go | 56 ++++++++++++++++++++++++----------- 1 file changed, 38 insertions(+), 18 deletions(-) diff --git a/filebeat/input/kafka/input.go b/filebeat/input/kafka/input.go index 9379cce736d..b91541fd5c4 100644 --- a/filebeat/input/kafka/input.go +++ b/filebeat/input/kafka/input.go @@ -44,11 +44,10 @@ func init() { type kafkaInput struct { config kafkaInputConfig rawConfig *common.Config // The Config given to NewInput + context input.Context outlet channel.Outleter consumerGroup sarama.ConsumerGroup sessionState *kafkaSessionState - kafkaContext context.Context - kafkaCancel context.CancelFunc // The CancelFunc for kafkaContext log *logp.Logger runOnce sync.Once } @@ -109,26 +108,13 @@ func NewInput( return nil, errors.Wrap(err, "initializing kafka consumer group") } - // Sarama uses standard go contexts to control cancellation, so we need to - // wrap our input context channel in that interface. - kafkaContext, kafkaCancel := context.WithCancel(context.Background()) - go func() { - select { - case <-inputContext.Done: - logp.Info("Closing kafka context because input stopped.") - kafkaCancel() - return - } - }() - input := &kafkaInput{ config: config, rawConfig: cfg, + context: inputContext, outlet: out, consumerGroup: consumerGroup, sessionState: sessionState, - kafkaContext: kafkaContext, - kafkaCancel: kafkaCancel, log: logp.NewLogger("kafka input").With("hosts", config.Hosts), } @@ -176,11 +162,20 @@ func (input *kafkaInput) Run() { for { handler := groupHandler{input: input} + // Sarama uses standard go contexts to control cancellation, so we need + // to wrap our input context channel in that interface. err := input.consumerGroup.Consume( - input.kafkaContext, input.config.Topics, handler) + doneChannelContext(input.context.Done), input.config.Topics, handler) if err != nil { input.log.Errorw("Kafka consume error", "error", err) } + + // If Consume returned because the context was cancelled, don't resume. + select { + case <-input.context.Done: + return + default: + } } }() }) @@ -195,7 +190,7 @@ func (input *kafkaInput) Wait() { // Stop shuts down the Input by cancelling the internal context. func (input *kafkaInput) Stop() { - input.kafkaCancel() + close(input.context.Done) } func arrayForKafkaHeaders(headers []*sarama.RecordHeader) []interface{} { @@ -209,6 +204,31 @@ func arrayForKafkaHeaders(headers []*sarama.RecordHeader) []interface{} { return array } +// A barebones implementation of context.Context wrapped around the done +// channels that are more common in the beats codebase. This could be added +// as a utility in a shared part of the code, but right now it's a special +// case for sarama which requires a context.Context, so it's private until +// there's at least one other use case. +type channelCtx <-chan struct{} + +func doneChannelContext(ch <-chan struct{}) context.Context { + return channelCtx(ch) +} + +func (c channelCtx) Deadline() (deadline time.Time, ok bool) { return } +func (c channelCtx) Done() <-chan struct{} { + return (<-chan struct{})(c) +} +func (c channelCtx) Err() error { + select { + case <-c: + return context.Canceled + default: + return nil + } +} +func (c channelCtx) Value(key interface{}) interface{} { return nil } + type groupHandler struct { input *kafkaInput } From e54b9d87f7306ec8468e0e1c02ae0389e2e13ed1 Mon Sep 17 00:00:00 2001 From: Fae Charlton Date: Wed, 31 Jul 2019 11:58:59 -0400 Subject: [PATCH 30/50] Clarify docs --- filebeat/docs/inputs/input-kafka.asciidoc | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/filebeat/docs/inputs/input-kafka.asciidoc b/filebeat/docs/inputs/input-kafka.asciidoc index 91bc36fe559..7af2b2c9ce0 100644 --- a/filebeat/docs/inputs/input-kafka.asciidoc +++ b/filebeat/docs/inputs/input-kafka.asciidoc @@ -9,9 +9,9 @@ Use the `kafka` input to read from topics in a Kafka cluster. -To configure this input, specify a list of <> to use for this -cluster, a list of <> to track, and a <> -to connect with. +To configure this input, specify a list of one or more <> in the +cluster to bootstrap the connection with, a list of <> to +track, and a <> for the connection. Example configuration: @@ -38,7 +38,7 @@ The `kafka` input supports the following configuration options plus the [[hosts]] ===== `hosts` -A list of Kafka hosts (brokers) for this cluster. +A list of Kafka bootstrapping hosts (brokers) for this cluster. [float] [[topics]] From 418364ac1d0a3e08bc05d4b518e079240fbe8c48 Mon Sep 17 00:00:00 2001 From: Fae Charlton Date: Wed, 31 Jul 2019 12:21:40 -0400 Subject: [PATCH 31/50] Addressing review comments --- filebeat/input/kafka/config.go | 8 ++++---- filebeat/input/kafka/input.go | 10 +++------- 2 files changed, 7 insertions(+), 11 deletions(-) diff --git a/filebeat/input/kafka/config.go b/filebeat/input/kafka/config.go index 13a10cd027c..7f810012790 100644 --- a/filebeat/input/kafka/config.go +++ b/filebeat/input/kafka/config.go @@ -42,8 +42,8 @@ type kafkaInputConfig struct { InitialOffset initialOffset `config:"initial_offset"` RetryBackoff time.Duration `config:"retry_backoff" validate:"min=0"` MaxWaitTime time.Duration `config:"max_wait_time"` - Fetch *kafkaFetch `config:"fetch"` - Rebalance *kafkaRebalance `config:"rebalance"` + Fetch kafkaFetch `config:"fetch"` + Rebalance kafkaRebalance `config:"rebalance"` TLS *tlscommon.Config `config:"ssl"` Username string `config:"username"` Password string `config:"password"` @@ -96,12 +96,12 @@ func defaultConfig() kafkaInputConfig { ClientID: "filebeat", RetryBackoff: 2 * time.Second, MaxWaitTime: 250 * time.Millisecond, - Fetch: &kafkaFetch{ + Fetch: kafkaFetch{ Min: 1, Default: (1 << 20), // 1 MB Max: 0, }, - Rebalance: &kafkaRebalance{ + Rebalance: kafkaRebalance{ Strategy: rebalanceStrategyRange, Timeout: 60 * time.Second, MaxRetries: 4, diff --git a/filebeat/input/kafka/input.go b/filebeat/input/kafka/input.go index b91541fd5c4..be1fa7ff913 100644 --- a/filebeat/input/kafka/input.go +++ b/filebeat/input/kafka/input.go @@ -43,7 +43,6 @@ func init() { // Input contains the input and its config type kafkaInput struct { config kafkaInputConfig - rawConfig *common.Config // The Config given to NewInput context input.Context outlet channel.Outleter consumerGroup sarama.ConsumerGroup @@ -110,7 +109,6 @@ func NewInput( input := &kafkaInput{ config: config, - rawConfig: cfg, context: inputContext, outlet: out, consumerGroup: consumerGroup, @@ -205,10 +203,9 @@ func arrayForKafkaHeaders(headers []*sarama.RecordHeader) []interface{} { } // A barebones implementation of context.Context wrapped around the done -// channels that are more common in the beats codebase. This could be added -// as a utility in a shared part of the code, but right now it's a special -// case for sarama which requires a context.Context, so it's private until -// there's at least one other use case. +// channels that are more common in the beats codebase. +// TODO(faec): Generalize this to a common utility in a shared library +// (https://github.com/elastic/beats/issues/13125). type channelCtx <-chan struct{} func doneChannelContext(ch <-chan struct{}) context.Context { @@ -279,7 +276,6 @@ func (h groupHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim saram for msg := range claim.Messages() { event := h.createEvent(sess, claim, msg) h.input.outlet.OnEvent(event) - sess.MarkMessage(msg, "") } return nil } From 73aa0bc1c25739b6265833eb66eee23a257d44fe Mon Sep 17 00:00:00 2001 From: Fae Charlton Date: Wed, 31 Jul 2019 15:34:41 -0400 Subject: [PATCH 32/50] Fix kafka input Stop() --- filebeat/input/kafka/input.go | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/filebeat/input/kafka/input.go b/filebeat/input/kafka/input.go index be1fa7ff913..3503b4b6e21 100644 --- a/filebeat/input/kafka/input.go +++ b/filebeat/input/kafka/input.go @@ -186,9 +186,12 @@ func (input *kafkaInput) Wait() { input.consumerGroup.Close() } -// Stop shuts down the Input by cancelling the internal context. +// Stop closes the input's outlet on close. We don't need to shutdown the +// kafka consumer group explicitly, because it listens to the original input +// done channel passed in by input.Runner, and that channel is already closed +// as part of the shutdown process in Runner.Stop(). func (input *kafkaInput) Stop() { - close(input.context.Done) + input.outlet.Close() } func arrayForKafkaHeaders(headers []*sarama.RecordHeader) []interface{} { From 76f77922a7254453bf1990b191efb1b274f644b5 Mon Sep 17 00:00:00 2001 From: Fae Charlton Date: Thu, 1 Aug 2019 10:30:00 -0400 Subject: [PATCH 33/50] revised run loop in progress --- filebeat/input/kafka/config.go | 40 +++++---- filebeat/input/kafka/input.go | 89 +++++++++++-------- .../input/kafka/kafka_integration_test.go | 5 +- 3 files changed, 77 insertions(+), 57 deletions(-) diff --git a/filebeat/input/kafka/config.go b/filebeat/input/kafka/config.go index 7f810012790..cbc5b98071c 100644 --- a/filebeat/input/kafka/config.go +++ b/filebeat/input/kafka/config.go @@ -34,19 +34,20 @@ import ( type kafkaInputConfig struct { // Kafka hosts with port, e.g. "localhost:9092" - Hosts []string `config:"hosts" validate:"required"` - Topics []string `config:"topics" validate:"required"` - GroupID string `config:"group_id" validate:"required"` - ClientID string `config:"client_id"` - Version kafka.Version `config:"version"` - InitialOffset initialOffset `config:"initial_offset"` - RetryBackoff time.Duration `config:"retry_backoff" validate:"min=0"` - MaxWaitTime time.Duration `config:"max_wait_time"` - Fetch kafkaFetch `config:"fetch"` - Rebalance kafkaRebalance `config:"rebalance"` - TLS *tlscommon.Config `config:"ssl"` - Username string `config:"username"` - Password string `config:"password"` + Hosts []string `config:"hosts" validate:"required"` + Topics []string `config:"topics" validate:"required"` + GroupID string `config:"group_id" validate:"required"` + ClientID string `config:"client_id"` + Version kafka.Version `config:"version"` + InitialOffset initialOffset `config:"initial_offset"` + InitRetryBackoff time.Duration `config:"init_retry_backoff" validate:"min=0"` + ConsumeRetryBackoff time.Duration `config:"consume_retry_backoff" validate:"min=0"` + MaxWaitTime time.Duration `config:"max_wait_time"` + Fetch kafkaFetch `config:"fetch"` + Rebalance kafkaRebalance `config:"rebalance"` + TLS *tlscommon.Config `config:"ssl"` + Username string `config:"username"` + Password string `config:"password"` } type kafkaFetch struct { @@ -91,11 +92,12 @@ var ( // were chosen to match sarama's defaults. func defaultConfig() kafkaInputConfig { return kafkaInputConfig{ - Version: kafka.Version("1.0.0"), - InitialOffset: initialOffsetOldest, - ClientID: "filebeat", - RetryBackoff: 2 * time.Second, - MaxWaitTime: 250 * time.Millisecond, + Version: kafka.Version("1.0.0"), + InitialOffset: initialOffsetOldest, + ClientID: "filebeat", + InitRetryBackoff: 30 * time.Second, + ConsumeRetryBackoff: 2 * time.Second, + MaxWaitTime: 250 * time.Millisecond, Fetch: kafkaFetch{ Min: 1, Default: (1 << 20), // 1 MB @@ -137,7 +139,7 @@ func newSaramaConfig(config kafkaInputConfig) (*sarama.Config, error) { k.Consumer.Return.Errors = true k.Consumer.Offsets.Initial = config.InitialOffset.asSaramaOffset() - k.Consumer.Retry.Backoff = config.RetryBackoff + k.Consumer.Retry.Backoff = config.ConsumeRetryBackoff k.Consumer.MaxWaitTime = config.MaxWaitTime k.Consumer.Fetch.Min = config.Fetch.Min diff --git a/filebeat/input/kafka/input.go b/filebeat/input/kafka/input.go index 3503b4b6e21..3c0e4fed261 100644 --- a/filebeat/input/kafka/input.go +++ b/filebeat/input/kafka/input.go @@ -28,6 +28,7 @@ import ( "github.com/elastic/beats/filebeat/input" "github.com/elastic/beats/libbeat/beat" "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/common/kafka" "github.com/elastic/beats/libbeat/logp" "github.com/pkg/errors" @@ -43,10 +44,10 @@ func init() { // Input contains the input and its config type kafkaInput struct { config kafkaInputConfig + saramaConfig *sarama.Config context input.Context outlet channel.Outleter consumerGroup sarama.ConsumerGroup - sessionState *kafkaSessionState log *logp.Logger runOnce sync.Once } @@ -101,19 +102,13 @@ func NewInput( if err != nil { return nil, errors.Wrap(err, "initializing Sarama config") } - consumerGroup, err := - sarama.NewConsumerGroup(config.Hosts, config.GroupID, saramaConfig) - if err != nil { - return nil, errors.Wrap(err, "initializing kafka consumer group") - } input := &kafkaInput{ - config: config, - context: inputContext, - outlet: out, - consumerGroup: consumerGroup, - sessionState: sessionState, - log: logp.NewLogger("kafka input").With("hosts", config.Hosts), + config: config, + saramaConfig: saramaConfig, + context: inputContext, + outlet: out, + log: logp.NewLogger("kafka input").With("hosts", config.Hosts), } return input, nil @@ -146,33 +141,52 @@ func (state *kafkaSessionState) setSession(sess sarama.ConsumerGroupSession) { state.mutex.Unlock() } +func (input *kafkaInput) runConsumerGroup() { + // Sarama uses standard go contexts to control cancellation, so we need + // to wrap our input context channel in that interface. + context := doneChannelContext(input.context.Done) + handler := &groupHandler{ + version: input.config.Version, + outlet: input.outlet, + } + + // Create a consumer group and listen to its error channel. + consumerGroup, err := + sarama.NewConsumerGroup( + input.config.Hosts, input.config.GroupID, input.saramaConfig) + if err != nil { + input.log.Errorw( + "Error initializing kafka consumer group", "error", err) + return + } + go func() { + for err := range consumerGroup.Errors() { + input.log.Errorw("Error reading from kafka", "error", err) + } + }() + + err = consumerGroup.Consume(context, input.config.Topics, handler) + if err != nil { + input.log.Errorw("Kafka consume error", "error", err) + } +} + // Run starts the input by scanning for incoming messages and errors. func (input *kafkaInput) Run() { input.runOnce.Do(func() { - // Track errors - go func() { - for err := range input.consumerGroup.Errors() { - input.log.Errorw("Error reading from kafka", "error", err) - } - }() - go func() { for { - handler := groupHandler{input: input} - - // Sarama uses standard go contexts to control cancellation, so we need - // to wrap our input context channel in that interface. - err := input.consumerGroup.Consume( - doneChannelContext(input.context.Done), input.config.Topics, handler) - if err != nil { - input.log.Errorw("Kafka consume error", "error", err) - } + // Try to start the consumer group event loop: create a consumer + // group client (wbich connects to the kafka cluster) and call + // Consume (which starts an asynchronous consumer). + input.runConsumerGroup() - // If Consume returned because the context was cancelled, don't resume. + // If runConsumerGroup returns, then either input.context.Done has + // been closed (in which case we should shut down) select { case <-input.context.Done: return - default: + case <-time.After(input.config.InitRetryBackoff): } } }() @@ -230,10 +244,13 @@ func (c channelCtx) Err() error { func (c channelCtx) Value(key interface{}) interface{} { return nil } type groupHandler struct { - input *kafkaInput + sync.Mutex + version kafka.Version + state kafkaSessionState + outlet channel.Outleter } -func (h groupHandler) createEvent( +func (h *groupHandler) createEvent( sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim, message *sarama.ConsumerMessage, @@ -250,7 +267,7 @@ func (h groupHandler) createEvent( "offset": message.Offset, "key": message.Key, } - version, versionOk := h.input.config.Version.Get() + version, versionOk := h.version.Get() if versionOk && version.IsAtLeast(sarama.V0_10_0_0) { event.Timestamp = message.Timestamp if !message.BlockTimestamp.IsZero() { @@ -266,19 +283,19 @@ func (h groupHandler) createEvent( } func (h groupHandler) Setup(session sarama.ConsumerGroupSession) error { - h.input.sessionState.setSession(session) + h.state.setSession(session) return nil } func (h groupHandler) Cleanup(_ sarama.ConsumerGroupSession) error { - h.input.sessionState.setSession(nil) + h.state.setSession(nil) return nil } func (h groupHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error { for msg := range claim.Messages() { event := h.createEvent(sess, claim, msg) - h.input.outlet.OnEvent(event) + h.outlet.OnEvent(event) } return nil } diff --git a/filebeat/input/kafka/kafka_integration_test.go b/filebeat/input/kafka/kafka_integration_test.go index e7ac3a39ef7..e022197813a 100644 --- a/filebeat/input/kafka/kafka_integration_test.go +++ b/filebeat/input/kafka/kafka_integration_test.go @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -// +build integration +// + build integration package kafka @@ -117,7 +117,8 @@ func TestInput(t *testing.T) { }, } for _, m := range messages { - writeToKafkaTopic(t, testTopic, m.message, m.headers, time.Second*20) + fmt.Printf("Would have sent %v\n", m) + //(t, testTopic, m.message, m.headers, time.Second*20) } // Setup the input config From 1a29d7b4ac0ab038bf86fdfab3747702aec6fe9f Mon Sep 17 00:00:00 2001 From: Fae Charlton Date: Thu, 1 Aug 2019 12:14:37 -0400 Subject: [PATCH 34/50] refactor groupHandler / fix end-to-end ACK --- filebeat/input/kafka/input.go | 121 ++++++++---------- .../input/kafka/kafka_integration_test.go | 20 ++- 2 files changed, 73 insertions(+), 68 deletions(-) diff --git a/filebeat/input/kafka/input.go b/filebeat/input/kafka/input.go index 3c0e4fed261..6a5b7dd7f85 100644 --- a/filebeat/input/kafka/input.go +++ b/filebeat/input/kafka/input.go @@ -43,21 +43,13 @@ func init() { // Input contains the input and its config type kafkaInput struct { - config kafkaInputConfig - saramaConfig *sarama.Config - context input.Context - outlet channel.Outleter - consumerGroup sarama.ConsumerGroup - log *logp.Logger - runOnce sync.Once -} - -// A synchronized wrapper to read and write the kafka session, since it may -// change while ACKs are still pending. -type kafkaSessionState struct { - session sarama.ConsumerGroupSession - mutex sync.Mutex // Hold to access the session field - waitGroup sync.WaitGroup // Hold while using the session field + config kafkaInputConfig + saramaConfig *sarama.Config + context input.Context + outlet channel.Outleter + saramaWaitGroup sync.WaitGroup // indicates a sarama consumer group is active + log *logp.Logger + runOnce sync.Once } // NewInput creates a new kafka input @@ -67,26 +59,16 @@ func NewInput( inputContext input.Context, ) (input.Input, error) { - // We create the empty session state first because it must be referenced by - // the ACK callback in the connector configuration. - sessionState := &kafkaSessionState{} - out, err := connector.ConnectWith(cfg, beat.ClientConfig{ Processing: beat.ProcessingConfig{ DynamicFields: inputContext.DynamicFields, }, ACKEvents: func(events []interface{}) { - sessionState.accessSession(func(session sarama.ConsumerGroupSession) { - if session == nil { - // The kafka connection is closed and / or is being rebalanced. - return - } - for _, event := range events { - if cm, ok := event.(*sarama.ConsumerMessage); ok { - session.MarkMessage(cm, "") - } + for _, event := range events { + if meta, ok := event.(eventMeta); ok { + meta.handler.ack(meta.message) } - }) + } }, }) if err != nil { @@ -114,33 +96,6 @@ func NewInput( return input, nil } -// A helper to safely use the current sarama session for the duration of the -// given callback. Used when ACKing messages outside the body of the main -// sarama callbacks. The session parameter may be nil if there is no active -// session. -func (state *kafkaSessionState) accessSession( - fn func(session sarama.ConsumerGroupSession), -) { - state.mutex.Lock() - state.waitGroup.Add(1) - session := state.session - state.mutex.Unlock() - defer state.waitGroup.Done() - fn(session) -} - -// A helper to safely set the session field after waiting on any pending -// operations. -func (state *kafkaSessionState) setSession(sess sarama.ConsumerGroupSession) { - state.mutex.Lock() - // Once we claim the mutex we still wait for any pending ACKs to be - // sent. (These may well fail if the session is ending, but that's better - // than calling a stale pointer.) - state.waitGroup.Wait() - state.session = sess - state.mutex.Unlock() -} - func (input *kafkaInput) runConsumerGroup() { // Sarama uses standard go contexts to control cancellation, so we need // to wrap our input context channel in that interface. @@ -150,7 +105,7 @@ func (input *kafkaInput) runConsumerGroup() { outlet: input.outlet, } - // Create a consumer group and listen to its error channel. + // Create a consumer group and make sure it's closed before we return. consumerGroup, err := sarama.NewConsumerGroup( input.config.Hosts, input.config.GroupID, input.saramaConfig) @@ -159,6 +114,13 @@ func (input *kafkaInput) runConsumerGroup() { "Error initializing kafka consumer group", "error", err) return } + input.saramaWaitGroup.Add(1) + defer func() { + consumerGroup.Close() + input.saramaWaitGroup.Done() + }() + + // Listen asynchronously to any errors during the consume process go func() { for err := range consumerGroup.Errors() { input.log.Errorw("Error reading from kafka", "error", err) @@ -196,8 +158,8 @@ func (input *kafkaInput) Run() { // Wait shuts down the Input by cancelling the internal context. func (input *kafkaInput) Wait() { input.Stop() - // Wait for the consumer group to shut down - input.consumerGroup.Close() + // Wait for sarama to shut down + input.saramaWaitGroup.Wait() } // Stop closes the input's outlet on close. We don't need to shutdown the @@ -243,13 +205,24 @@ func (c channelCtx) Err() error { } func (c channelCtx) Value(key interface{}) interface{} { return nil } +// The group handler for the sarama consumer group interface. In addition to +// providing the basic consumption callbacks needed by sarama, groupHandler is +// also currently responsible for marshalling kafka messages into beat.Event, +// and passing ACKs from the output channel back to the kafka cluster. type groupHandler struct { sync.Mutex version kafka.Version - state kafkaSessionState + session sarama.ConsumerGroupSession outlet channel.Outleter } +// The metadata attached to incoming events so they can be ACKed once they've +// been successfully sent. +type eventMeta struct { + handler *groupHandler + message *sarama.ConsumerMessage +} + func (h *groupHandler) createEvent( sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim, @@ -257,6 +230,10 @@ func (h *groupHandler) createEvent( ) beat.Event { event := beat.Event{ Timestamp: time.Now(), + Private: eventMeta{ + handler: h, + message: message, + }, } eventFields := common.MapStr{ "message": string(message.Value), @@ -282,17 +259,31 @@ func (h *groupHandler) createEvent( return event } -func (h groupHandler) Setup(session sarama.ConsumerGroupSession) error { - h.state.setSession(session) +func (h *groupHandler) Setup(session sarama.ConsumerGroupSession) error { + h.Lock() + h.session = session + h.Unlock() return nil } -func (h groupHandler) Cleanup(_ sarama.ConsumerGroupSession) error { - h.state.setSession(nil) +func (h *groupHandler) Cleanup(_ sarama.ConsumerGroupSession) error { + h.Lock() + h.session = nil + h.Unlock() return nil } -func (h groupHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error { +// ack informs the kafka cluster that this message has been consumed. Called +// from the input's ACKEvents handler. +func (h *groupHandler) ack(message *sarama.ConsumerMessage) { + h.Lock() + if h.session != nil { + h.session.MarkMessage(message, "") + } + h.Unlock() +} + +func (h *groupHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error { for msg := range claim.Messages() { event := h.createEvent(sess, claim, msg) h.outlet.OnEvent(event) diff --git a/filebeat/input/kafka/kafka_integration_test.go b/filebeat/input/kafka/kafka_integration_test.go index e022197813a..52dd6cd3ad6 100644 --- a/filebeat/input/kafka/kafka_integration_test.go +++ b/filebeat/input/kafka/kafka_integration_test.go @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -// + build integration +// +build integration package kafka @@ -117,8 +117,7 @@ func TestInput(t *testing.T) { }, } for _, m := range messages { - fmt.Printf("Would have sent %v\n", m) - //(t, testTopic, m.message, m.headers, time.Second*20) + writeToKafkaTopic(t, testTopic, m.message, m.headers, time.Second*20) } // Setup the input config @@ -160,6 +159,21 @@ func TestInput(t *testing.T) { t.Fatal("timeout waiting for incoming events") } } + + // Close the done channel and make sure the beat shuts down in a reasonable + // amount of time. + close(context.Done) + didClose := make(chan struct{}) + go func() { + input.Wait() + close(didClose) + }() + + select { + case <-time.After(30 * time.Second): + t.Fatal("timeout waiting for beat to shut down") + case <-didClose: + } } func checkMatchingHeaders( From c9031774939cc68890854c90e163df62e7d880a2 Mon Sep 17 00:00:00 2001 From: Fae Charlton Date: Thu, 1 Aug 2019 12:34:27 -0400 Subject: [PATCH 35/50] Use strings for kafka headers --- filebeat/input/kafka/input.go | 4 ++-- filebeat/input/kafka/kafka_integration_test.go | 6 +++--- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/filebeat/input/kafka/input.go b/filebeat/input/kafka/input.go index 6a5b7dd7f85..cf24311b521 100644 --- a/filebeat/input/kafka/input.go +++ b/filebeat/input/kafka/input.go @@ -174,8 +174,8 @@ func arrayForKafkaHeaders(headers []*sarama.RecordHeader) []interface{} { array := []interface{}{} for _, header := range headers { array = append(array, common.MapStr{ - "key": header.Key, - "value": header.Value, + "key": string(header.Key), + "value": string(header.Value), }) } return array diff --git a/filebeat/input/kafka/kafka_integration_test.go b/filebeat/input/kafka/kafka_integration_test.go index 52dd6cd3ad6..4de11a69378 100644 --- a/filebeat/input/kafka/kafka_integration_test.go +++ b/filebeat/input/kafka/kafka_integration_test.go @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -// +build integration +// + build integration package kafka @@ -208,8 +208,8 @@ func checkMatchingHeaders( } key, _ := headerMap.GetValue("key") value, _ := headerMap.GetValue("value") - assert.Equal(t, expected[i].Key, key) - assert.Equal(t, expected[i].Value, value) + assert.Equal(t, string(expected[i].Key), key) + assert.Equal(t, string(expected[i].Value), value) } } From f5dd36060062067e7d57d9b12781237d478c0df5 Mon Sep 17 00:00:00 2001 From: Fae Charlton Date: Thu, 1 Aug 2019 12:43:30 -0400 Subject: [PATCH 36/50] Make kafka message keys strings on indexing --- filebeat/input/kafka/input.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/filebeat/input/kafka/input.go b/filebeat/input/kafka/input.go index cf24311b521..e8f6b611e41 100644 --- a/filebeat/input/kafka/input.go +++ b/filebeat/input/kafka/input.go @@ -242,7 +242,7 @@ func (h *groupHandler) createEvent( "topic": claim.Topic(), "partition": claim.Partition(), "offset": message.Offset, - "key": message.Key, + "key": string(message.Key), } version, versionOk := h.version.Get() if versionOk && version.IsAtLeast(sarama.V0_10_0_0) { From 4e73ac82ba5d4642cfc24a85affe2c180840835a Mon Sep 17 00:00:00 2001 From: Fae Charlton Date: Thu, 1 Aug 2019 12:46:43 -0400 Subject: [PATCH 37/50] Adjust config parameter names --- filebeat/input/kafka/config.go | 42 +++++++++++++++++----------------- filebeat/input/kafka/input.go | 5 ++-- 2 files changed, 24 insertions(+), 23 deletions(-) diff --git a/filebeat/input/kafka/config.go b/filebeat/input/kafka/config.go index cbc5b98071c..091d0f86b90 100644 --- a/filebeat/input/kafka/config.go +++ b/filebeat/input/kafka/config.go @@ -34,20 +34,20 @@ import ( type kafkaInputConfig struct { // Kafka hosts with port, e.g. "localhost:9092" - Hosts []string `config:"hosts" validate:"required"` - Topics []string `config:"topics" validate:"required"` - GroupID string `config:"group_id" validate:"required"` - ClientID string `config:"client_id"` - Version kafka.Version `config:"version"` - InitialOffset initialOffset `config:"initial_offset"` - InitRetryBackoff time.Duration `config:"init_retry_backoff" validate:"min=0"` - ConsumeRetryBackoff time.Duration `config:"consume_retry_backoff" validate:"min=0"` - MaxWaitTime time.Duration `config:"max_wait_time"` - Fetch kafkaFetch `config:"fetch"` - Rebalance kafkaRebalance `config:"rebalance"` - TLS *tlscommon.Config `config:"ssl"` - Username string `config:"username"` - Password string `config:"password"` + Hosts []string `config:"hosts" validate:"required"` + Topics []string `config:"topics" validate:"required"` + GroupID string `config:"group_id" validate:"required"` + ClientID string `config:"client_id"` + Version kafka.Version `config:"version"` + InitialOffset initialOffset `config:"initial_offset"` + ConnectBackoff time.Duration `config:"connect_backoff" validate:"min=0"` + ConsumeBackoff time.Duration `config:"consume_backoff" validate:"min=0"` + MaxWaitTime time.Duration `config:"max_wait_time"` + Fetch kafkaFetch `config:"fetch"` + Rebalance kafkaRebalance `config:"rebalance"` + TLS *tlscommon.Config `config:"ssl"` + Username string `config:"username"` + Password string `config:"password"` } type kafkaFetch struct { @@ -92,12 +92,12 @@ var ( // were chosen to match sarama's defaults. func defaultConfig() kafkaInputConfig { return kafkaInputConfig{ - Version: kafka.Version("1.0.0"), - InitialOffset: initialOffsetOldest, - ClientID: "filebeat", - InitRetryBackoff: 30 * time.Second, - ConsumeRetryBackoff: 2 * time.Second, - MaxWaitTime: 250 * time.Millisecond, + Version: kafka.Version("1.0.0"), + InitialOffset: initialOffsetOldest, + ClientID: "filebeat", + ConnectBackoff: 30 * time.Second, + ConsumeBackoff: 2 * time.Second, + MaxWaitTime: 250 * time.Millisecond, Fetch: kafkaFetch{ Min: 1, Default: (1 << 20), // 1 MB @@ -139,7 +139,7 @@ func newSaramaConfig(config kafkaInputConfig) (*sarama.Config, error) { k.Consumer.Return.Errors = true k.Consumer.Offsets.Initial = config.InitialOffset.asSaramaOffset() - k.Consumer.Retry.Backoff = config.ConsumeRetryBackoff + k.Consumer.Retry.Backoff = config.ConsumeBackoff k.Consumer.MaxWaitTime = config.MaxWaitTime k.Consumer.Fetch.Min = config.Fetch.Min diff --git a/filebeat/input/kafka/input.go b/filebeat/input/kafka/input.go index e8f6b611e41..32db99aea92 100644 --- a/filebeat/input/kafka/input.go +++ b/filebeat/input/kafka/input.go @@ -144,11 +144,12 @@ func (input *kafkaInput) Run() { input.runConsumerGroup() // If runConsumerGroup returns, then either input.context.Done has - // been closed (in which case we should shut down) + // been closed (in which case we should shut down) or there was an + // error, and we should try running it again after the backoff interval. select { case <-input.context.Done: return - case <-time.After(input.config.InitRetryBackoff): + case <-time.After(input.config.ConnectBackoff): } } }() From f2441d83c31dcd58704af789f1241e198e198f0a Mon Sep 17 00:00:00 2001 From: Fae Charlton Date: Thu, 1 Aug 2019 15:02:41 -0400 Subject: [PATCH 38/50] Update changed config fields in docs --- filebeat/docs/inputs/input-kafka.asciidoc | 7 ++++++- filebeat/input/kafka/_meta/fields.yml | 2 +- 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/filebeat/docs/inputs/input-kafka.asciidoc b/filebeat/docs/inputs/input-kafka.asciidoc index 7af2b2c9ce0..a8e71095d11 100644 --- a/filebeat/docs/inputs/input-kafka.asciidoc +++ b/filebeat/docs/inputs/input-kafka.asciidoc @@ -68,7 +68,12 @@ The version of the Kafka protocol to use (defaults to `"1.0.0"`). The initial offset to start reading, either "oldest" or "newest". Defaults to "oldest". -===== `retry_backoff` +===== `connect_backoff` + +How long to wait before trying to reconnect to the kafka cluster after a +fatal error. Default is 30s. + +===== `consume_backoff` How long to wait before retrying a failed read. Default is 2s. diff --git a/filebeat/input/kafka/_meta/fields.yml b/filebeat/input/kafka/_meta/fields.yml index 7d10108d558..3803f635939 100644 --- a/filebeat/input/kafka/_meta/fields.yml +++ b/filebeat/input/kafka/_meta/fields.yml @@ -33,5 +33,5 @@ - name: kafka.headers type: array description: > - The array of kafka headers, each an object containing subfields + The array of kafka headers, each an object containing string subfields "key" and "value". From 6ad363ecb252587844e9853eae7f6679e9bc4ead Mon Sep 17 00:00:00 2001 From: Fae Charlton Date: Thu, 1 Aug 2019 15:24:19 -0400 Subject: [PATCH 39/50] Add IsolationLevel config option --- filebeat/input/kafka/config.go | 30 +++++++++++++++++++ .../input/kafka/kafka_integration_test.go | 2 +- 2 files changed, 31 insertions(+), 1 deletion(-) diff --git a/filebeat/input/kafka/config.go b/filebeat/input/kafka/config.go index 091d0f86b90..7e2ea6acb0a 100644 --- a/filebeat/input/kafka/config.go +++ b/filebeat/input/kafka/config.go @@ -43,6 +43,7 @@ type kafkaInputConfig struct { ConnectBackoff time.Duration `config:"connect_backoff" validate:"min=0"` ConsumeBackoff time.Duration `config:"consume_backoff" validate:"min=0"` MaxWaitTime time.Duration `config:"max_wait_time"` + IsolationLevel isolationLevel `config:"isolation_level"` Fetch kafkaFetch `config:"fetch"` Rebalance kafkaRebalance `config:"rebalance"` TLS *tlscommon.Config `config:"ssl"` @@ -77,6 +78,13 @@ const ( rebalanceStrategyRoundRobin ) +type isolationLevel int + +const ( + isolationLevelReadUncommitted = iota + isolationLevelReadCommitted +) + var ( initialOffsets = map[string]initialOffset{ "oldest": initialOffsetOldest, @@ -86,6 +94,10 @@ var ( "range": rebalanceStrategyRange, "roundrobin": rebalanceStrategyRoundRobin, } + isolationLevels = map[string]isolationLevel{ + "read_uncommitted": isolationLevelReadUncommitted, + "read_committed": isolationLevelReadCommitted, + } ) // The default config for the kafka input. When in doubt, default values @@ -98,6 +110,7 @@ func defaultConfig() kafkaInputConfig { ConnectBackoff: 30 * time.Second, ConsumeBackoff: 2 * time.Second, MaxWaitTime: 250 * time.Millisecond, + IsolationLevel: isolationLevelReadUncommitted, Fetch: kafkaFetch{ Min: 1, Default: (1 << 20), // 1 MB @@ -141,6 +154,7 @@ func newSaramaConfig(config kafkaInputConfig) (*sarama.Config, error) { k.Consumer.Offsets.Initial = config.InitialOffset.asSaramaOffset() k.Consumer.Retry.Backoff = config.ConsumeBackoff k.Consumer.MaxWaitTime = config.MaxWaitTime + k.Consumer.IsolationLevel = config.IsolationLevel.asSaramaIsolationLevel() k.Consumer.Fetch.Min = config.Fetch.Min k.Consumer.Fetch.Default = config.Fetch.Default @@ -220,3 +234,19 @@ func (st *rebalanceStrategy) Unpack(value string) error { *st = strategy return nil } + +func (is isolationLevel) asSaramaIsolationLevel() sarama.IsolationLevel { + return map[isolationLevel]sarama.IsolationLevel{ + isolationLevelReadUncommitted: sarama.ReadUncommitted, + isolationLevelReadCommitted: sarama.ReadCommitted, + }[is] +} + +func (is *isolationLevel) Unpack(value string) error { + isolationLevel, ok := isolationLevels[value] + if !ok { + return fmt.Errorf("invalid isolation level '%s'", value) + } + *is = isolationLevel + return nil +} diff --git a/filebeat/input/kafka/kafka_integration_test.go b/filebeat/input/kafka/kafka_integration_test.go index 4de11a69378..08212651496 100644 --- a/filebeat/input/kafka/kafka_integration_test.go +++ b/filebeat/input/kafka/kafka_integration_test.go @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -// + build integration +// +build integration package kafka From df98c90e3c3a1e46d2748a8e7cc2bd9e6d0dbf5e Mon Sep 17 00:00:00 2001 From: Fae Charlton Date: Thu, 1 Aug 2019 15:35:29 -0400 Subject: [PATCH 40/50] Document IsolationLevel --- filebeat/docs/fields.asciidoc | 2 +- filebeat/docs/inputs/input-kafka.asciidoc | 9 +++++++++ 2 files changed, 10 insertions(+), 1 deletion(-) diff --git a/filebeat/docs/fields.asciidoc b/filebeat/docs/fields.asciidoc index 0b2a8373341..4ef4af8db9f 100644 --- a/filebeat/docs/fields.asciidoc +++ b/filebeat/docs/fields.asciidoc @@ -7379,7 +7379,7 @@ type: date *`kafka.headers`*:: + -- -The array of kafka headers, each an object containing subfields "key" and "value". +The array of kafka headers, each an object containing string subfields "key" and "value". type: array diff --git a/filebeat/docs/inputs/input-kafka.asciidoc b/filebeat/docs/inputs/input-kafka.asciidoc index a8e71095d11..0c0b7a7afc5 100644 --- a/filebeat/docs/inputs/input-kafka.asciidoc +++ b/filebeat/docs/inputs/input-kafka.asciidoc @@ -82,6 +82,15 @@ How long to wait before retrying a failed read. Default is 2s. How long to wait for the minimum number of input bytes while reading. Default is 250ms. +===== `isolation_level` + +This configures the Kafka group isolation level: + +- `"read_uncommitted"` returns _all_ messages in the message channel. +- `"read_committed"` hides messages that are part of an aborted transaction. + +The default is `"read_uncommitted"`. + ===== `fetch` Kafka fetch settings: From a90d7c5bbae5bfcec8210298086de77a39324037 Mon Sep 17 00:00:00 2001 From: Fae Charlton Date: Thu, 8 Aug 2019 15:32:34 -0400 Subject: [PATCH 41/50] working on corrected index template --- filebeat/_meta/fields.common.yml | 42 +++++++++ filebeat/docs/fields.asciidoc | 129 ++++++++++++-------------- filebeat/include/fields.go | 2 +- filebeat/input/kafka/_meta/fields.yml | 37 -------- 4 files changed, 104 insertions(+), 106 deletions(-) delete mode 100644 filebeat/input/kafka/_meta/fields.yml diff --git a/filebeat/_meta/fields.common.yml b/filebeat/_meta/fields.common.yml index 5674e7f9930..555438689c9 100644 --- a/filebeat/_meta/fields.common.yml +++ b/filebeat/_meta/fields.common.yml @@ -150,3 +150,45 @@ type: keyword description: > Name of organization associated with the autonomous system. + + - name: kafka + type: group + fields: + - name: topic + type: keyword + description: > + Kafka topic + + - name: partition + type: long + description: > + Kafka partition number + + - name: offset + type: long + description: > + Kafka offset of this message + + - name: key + type: keyword + description: > + Kafka key, corresponding to the Kafka value stored in the message + + - name: block_timestamp + type: date + description: > + Kafka outer (compressed) block timestamp + + - name: headers + type: nested + description: > + Kafka headers for this message. + fields: + - name: key + type: keyword + description: > + The key for a kafka header + - name: value + type: keyword + description: > + The value for a kafka header diff --git a/filebeat/docs/fields.asciidoc b/filebeat/docs/fields.asciidoc index 4ef4af8db9f..a3c3042baba 100644 --- a/filebeat/docs/fields.asciidoc +++ b/filebeat/docs/fields.asciidoc @@ -31,7 +31,6 @@ grouped in the following categories: * <> * <> * <> -* <> * <> * <> * <> @@ -7319,73 +7318,6 @@ type: text -- -[[exported-fields-kafka-input]] -== Kafka Input fields - -Kafka metadata added by the kafka input - - - -*`kafka.topic`*:: -+ --- -Kafka topic - - -type: keyword - --- - -*`kafka.partition`*:: -+ --- -Kafka partition number - - -type: long - --- - -*`kafka.offset`*:: -+ --- -Kafka offset of this message - - -type: long - --- - -*`kafka.key`*:: -+ --- -Kafka key, corresponding to the Kafka value stored in the message - - -type: keyword - --- - -*`kafka.block_timestamp`*:: -+ --- -Kafka outer (compressed) block timestamp - - -type: date - --- - -*`kafka.headers`*:: -+ --- -The array of kafka headers, each an object containing string subfields "key" and "value". - - -type: array - --- - [[exported-fields-kibana]] == kibana fields @@ -7895,6 +7827,67 @@ type: keyword -- + +*`kafka.topic`*:: ++ +-- +Kafka topic + + +type: keyword + +-- + +*`kafka.partition`*:: ++ +-- +Kafka partition number + + +type: long + +-- + +*`kafka.offset`*:: ++ +-- +Kafka offset of this message + + +type: long + +-- + +*`kafka.key`*:: ++ +-- +Kafka key, corresponding to the Kafka value stored in the message + + +type: keyword + +-- + +*`kafka.block_timestamp`*:: ++ +-- +Kafka outer (compressed) block timestamp + + +type: date + +-- + +*`kafka.headers`*:: ++ +-- +Kafka headers for this message. + + +type: nested + +-- + [[exported-fields-logstash]] == logstash fields diff --git a/filebeat/include/fields.go b/filebeat/include/fields.go index e4db3741d4e..4e1fd26a98d 100644 --- a/filebeat/include/fields.go +++ b/filebeat/include/fields.go @@ -32,5 +32,5 @@ func init() { // AssetFieldsYml returns asset data. // This is the base64 encoded gzipped contents of fields.yml. func AssetFieldsYml() string { - return "" + return "" } diff --git a/filebeat/input/kafka/_meta/fields.yml b/filebeat/input/kafka/_meta/fields.yml deleted file mode 100644 index 3803f635939..00000000000 --- a/filebeat/input/kafka/_meta/fields.yml +++ /dev/null @@ -1,37 +0,0 @@ -- key: kafka-input - title: Kafka Input - description: > - Kafka metadata added by the kafka input - short_config: false - anchor: kafka-input - fields: - - name: kafka.topic - type: keyword - description: > - Kafka topic - - - name: kafka.partition - type: long - description: > - Kafka partition number - - - name: kafka.offset - type: long - description: > - Kafka offset of this message - - - name: kafka.key - type: keyword - description: > - Kafka key, corresponding to the Kafka value stored in the message - - - name: kafka.block_timestamp - type: date - description: > - Kafka outer (compressed) block timestamp - - - name: kafka.headers - type: array - description: > - The array of kafka headers, each an object containing string subfields - "key" and "value". From c30212593dfa1341296943b4cd817772c722a8d0 Mon Sep 17 00:00:00 2001 From: Fae Charlton Date: Fri, 9 Aug 2019 16:20:28 -0400 Subject: [PATCH 42/50] add compromise data layout for kafka headers --- filebeat/_meta/fields.common.yml | 14 +++----------- filebeat/input/kafka/input.go | 20 ++++++++++++++------ 2 files changed, 17 insertions(+), 17 deletions(-) diff --git a/filebeat/_meta/fields.common.yml b/filebeat/_meta/fields.common.yml index 555438689c9..da88144ca30 100644 --- a/filebeat/_meta/fields.common.yml +++ b/filebeat/_meta/fields.common.yml @@ -180,15 +180,7 @@ Kafka outer (compressed) block timestamp - name: headers - type: nested + type: array description: > - Kafka headers for this message. - fields: - - name: key - type: keyword - description: > - The key for a kafka header - - name: value - type: keyword - description: > - The value for a kafka header + An array of Kafka header strings for this message, in the form + ": ". diff --git a/filebeat/input/kafka/input.go b/filebeat/input/kafka/input.go index 32db99aea92..2ffc1873772 100644 --- a/filebeat/input/kafka/input.go +++ b/filebeat/input/kafka/input.go @@ -19,6 +19,8 @@ package kafka import ( "context" + "fmt" + "strings" "sync" "time" @@ -171,13 +173,19 @@ func (input *kafkaInput) Stop() { input.outlet.Close() } -func arrayForKafkaHeaders(headers []*sarama.RecordHeader) []interface{} { - array := []interface{}{} +func arrayForKafkaHeaders(headers []*sarama.RecordHeader) []string { + array := []string{} for _, header := range headers { - array = append(array, common.MapStr{ - "key": string(header.Key), - "value": string(header.Value), - }) + // Rather than indexing headers in the same object structure Kafka does + // (which would give maximal fidelity, but would be effectively unsearchable + // in elasticsearch and kibana) we compromise by serializing them all as + // strings in the form ": ". For this we need to mask + // occurrences of ":" in the original key, which we expect to be uncommon. + // We may consider another approach in the future when it's more clear what + // the most common use cases are. + key := strings.ReplaceAll(string(header.Key), ":", "_") + value := string(header.Value) + array = append(array, fmt.Sprintf("%s: %s", key, value)) } return array } From 6ae3210149e94187f88ed14dd3fc5ec06f638e2b Mon Sep 17 00:00:00 2001 From: Fae Charlton Date: Fri, 9 Aug 2019 16:34:47 -0400 Subject: [PATCH 43/50] addressing review comments --- filebeat/input/kafka/input.go | 54 ++++++++++--------- .../input/kafka/kafka_integration_test.go | 2 +- 2 files changed, 30 insertions(+), 26 deletions(-) diff --git a/filebeat/input/kafka/input.go b/filebeat/input/kafka/input.go index 2ffc1873772..5156c73d2da 100644 --- a/filebeat/input/kafka/input.go +++ b/filebeat/input/kafka/input.go @@ -45,13 +45,13 @@ func init() { // Input contains the input and its config type kafkaInput struct { - config kafkaInputConfig - saramaConfig *sarama.Config - context input.Context - outlet channel.Outleter - saramaWaitGroup sync.WaitGroup // indicates a sarama consumer group is active - log *logp.Logger - runOnce sync.Once + config kafkaInputConfig + saramaConfig *sarama.Config + context input.Context + outlet channel.Outleter + saramaWaitGroup sync.WaitGroup // indicates a sarama consumer group is active + log *logp.Logger + runOnce, stopOnce sync.Once } // NewInput creates a new kafka input @@ -170,7 +170,9 @@ func (input *kafkaInput) Wait() { // done channel passed in by input.Runner, and that channel is already closed // as part of the shutdown process in Runner.Stop(). func (input *kafkaInput) Stop() { - input.outlet.Close() + input.stopOnce.Do(func() { + input.outlet.Close() + }) } func arrayForKafkaHeaders(headers []*sarama.RecordHeader) []string { @@ -237,34 +239,36 @@ func (h *groupHandler) createEvent( claim sarama.ConsumerGroupClaim, message *sarama.ConsumerMessage, ) beat.Event { - event := beat.Event{ - Timestamp: time.Now(), - Private: eventMeta{ - handler: h, - message: message, - }, - } - eventFields := common.MapStr{ - "message": string(message.Value), - } - kafkaMetadata := common.MapStr{ + timestamp := time.Now() + kafkaFields := common.MapStr{ "topic": claim.Topic(), "partition": claim.Partition(), "offset": message.Offset, "key": string(message.Key), } + version, versionOk := h.version.Get() if versionOk && version.IsAtLeast(sarama.V0_10_0_0) { - event.Timestamp = message.Timestamp + timestamp = message.Timestamp if !message.BlockTimestamp.IsZero() { - kafkaMetadata["block_timestamp"] = message.BlockTimestamp + kafkaFields["block_timestamp"] = message.BlockTimestamp } } if versionOk && version.IsAtLeast(sarama.V0_11_0_0) { - kafkaMetadata["headers"] = arrayForKafkaHeaders(message.Headers) + kafkaFields["headers"] = arrayForKafkaHeaders(message.Headers) } - eventFields["kafka"] = kafkaMetadata - event.Fields = eventFields + event := beat.Event{ + Timestamp: timestamp, + Fields: common.MapStr{ + "message": string(message.Value), + "kafka": kafkaFields, + }, + Private: eventMeta{ + handler: h, + message: message, + }, + } + return event } @@ -286,10 +290,10 @@ func (h *groupHandler) Cleanup(_ sarama.ConsumerGroupSession) error { // from the input's ACKEvents handler. func (h *groupHandler) ack(message *sarama.ConsumerMessage) { h.Lock() + defer h.Unlock() if h.session != nil { h.session.MarkMessage(message, "") } - h.Unlock() } func (h *groupHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error { diff --git a/filebeat/input/kafka/kafka_integration_test.go b/filebeat/input/kafka/kafka_integration_test.go index 08212651496..f303bab626b 100644 --- a/filebeat/input/kafka/kafka_integration_test.go +++ b/filebeat/input/kafka/kafka_integration_test.go @@ -121,7 +121,7 @@ func TestInput(t *testing.T) { } // Setup the input config - config, _ := common.NewConfigFrom(common.MapStr{ + config, _ := common.MustNewConfigFrom(common.MapStr{ "hosts": getTestKafkaHost(), "topics": []string{testTopic}, "group_id": "filebeat", From 43b5bec2a96b70ad10e2dc302a998836f920b387 Mon Sep 17 00:00:00 2001 From: Fae Charlton Date: Fri, 9 Aug 2019 16:56:12 -0400 Subject: [PATCH 44/50] use exponential backoff for connecting --- filebeat/input/kafka/input.go | 18 +++++++++++++++++- 1 file changed, 17 insertions(+), 1 deletion(-) diff --git a/filebeat/input/kafka/input.go b/filebeat/input/kafka/input.go index 5156c73d2da..cb11f167061 100644 --- a/filebeat/input/kafka/input.go +++ b/filebeat/input/kafka/input.go @@ -30,6 +30,7 @@ import ( "github.com/elastic/beats/filebeat/input" "github.com/elastic/beats/libbeat/beat" "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/common/backoff" "github.com/elastic/beats/libbeat/common/kafka" "github.com/elastic/beats/libbeat/logp" @@ -139,6 +140,13 @@ func (input *kafkaInput) runConsumerGroup() { func (input *kafkaInput) Run() { input.runOnce.Do(func() { go func() { + + // If the consumer fails to connect, we use exponential backoff with + // jitter up to 8 * the initial backoff interval. + backoff := backoff.NewEqualJitterBackoff( + input.context.Done, + input.config.ConnectBackoff, + 8*input.config.ConnectBackoff) for { // Try to start the consumer group event loop: create a consumer // group client (wbich connects to the kafka cluster) and call @@ -148,10 +156,18 @@ func (input *kafkaInput) Run() { // If runConsumerGroup returns, then either input.context.Done has // been closed (in which case we should shut down) or there was an // error, and we should try running it again after the backoff interval. + waitChan := make(chan struct{}) + go func() { + defer close(waitChan) + backoff.Wait() + }() + select { case <-input.context.Done: + // We are shutting down, return return - case <-time.After(input.config.ConnectBackoff): + case <-waitChan: + // We are still running after the backoff delay, try again } } }() From 62488ef5667e9ccd329fc73593e11fcc28dc69c2 Mon Sep 17 00:00:00 2001 From: Fae Charlton Date: Fri, 9 Aug 2019 17:11:33 -0400 Subject: [PATCH 45/50] shutdown outlet via the CloseRef config field --- filebeat/input/kafka/input.go | 37 ++++++++++++++++++----------------- 1 file changed, 19 insertions(+), 18 deletions(-) diff --git a/filebeat/input/kafka/input.go b/filebeat/input/kafka/input.go index cb11f167061..c5217297c4b 100644 --- a/filebeat/input/kafka/input.go +++ b/filebeat/input/kafka/input.go @@ -46,13 +46,13 @@ func init() { // Input contains the input and its config type kafkaInput struct { - config kafkaInputConfig - saramaConfig *sarama.Config - context input.Context - outlet channel.Outleter - saramaWaitGroup sync.WaitGroup // indicates a sarama consumer group is active - log *logp.Logger - runOnce, stopOnce sync.Once + config kafkaInputConfig + saramaConfig *sarama.Config + context input.Context + outlet channel.Outleter + saramaWaitGroup sync.WaitGroup // indicates a sarama consumer group is active + log *logp.Logger + runOnce sync.Once } // NewInput creates a new kafka input @@ -73,6 +73,7 @@ func NewInput( } } }, + CloseRef: doneChannelContext(inputContext.Done), }) if err != nil { return nil, err @@ -174,23 +175,23 @@ func (input *kafkaInput) Run() { }) } -// Wait shuts down the Input by cancelling the internal context. +// Stop doesn't need to do anything because the kafka consumer group and the +// input's outlet both have a context based on input.context.Done and will +// shut themselves down, since the done channel is already closed as part of +// the shutdown process in Runner.Stop(). +func (input *kafkaInput) Stop() { +} + +// Wait should shut down the input and wait for it to complete, however (see +// Stop above) we don't need to take actions to shut down as long as the +// input.config.Done channel is closed, so we just make a (currently no-op) +// call to Stop() and then wait for sarama to signal completion. func (input *kafkaInput) Wait() { input.Stop() // Wait for sarama to shut down input.saramaWaitGroup.Wait() } -// Stop closes the input's outlet on close. We don't need to shutdown the -// kafka consumer group explicitly, because it listens to the original input -// done channel passed in by input.Runner, and that channel is already closed -// as part of the shutdown process in Runner.Stop(). -func (input *kafkaInput) Stop() { - input.stopOnce.Do(func() { - input.outlet.Close() - }) -} - func arrayForKafkaHeaders(headers []*sarama.RecordHeader) []string { array := []string{} for _, header := range headers { From 60f436c9ce93d9b5f6f8ef213c5a01880bd3d4f7 Mon Sep 17 00:00:00 2001 From: Fae Charlton Date: Mon, 12 Aug 2019 15:07:25 -0400 Subject: [PATCH 46/50] Fixing backoff wait call --- filebeat/input/kafka/input.go | 14 +++----------- 1 file changed, 3 insertions(+), 11 deletions(-) diff --git a/filebeat/input/kafka/input.go b/filebeat/input/kafka/input.go index c5217297c4b..66ac66345ee 100644 --- a/filebeat/input/kafka/input.go +++ b/filebeat/input/kafka/input.go @@ -154,21 +154,13 @@ func (input *kafkaInput) Run() { // Consume (which starts an asynchronous consumer). input.runConsumerGroup() - // If runConsumerGroup returns, then either input.context.Done has - // been closed (in which case we should shut down) or there was an - // error, and we should try running it again after the backoff interval. - waitChan := make(chan struct{}) - go func() { - defer close(waitChan) - backoff.Wait() - }() + // If runConsumerGroup returns, we wait for the backoff interval. + backoff.Wait() + // Check the Done channel before we try again. select { case <-input.context.Done: - // We are shutting down, return return - case <-waitChan: - // We are still running after the backoff delay, try again } } }() From e223a64a43d54e836a34138c648e422468482ad1 Mon Sep 17 00:00:00 2001 From: Fae Charlton Date: Mon, 12 Aug 2019 15:27:14 -0400 Subject: [PATCH 47/50] Add wait_close parameter --- filebeat/docs/fields.asciidoc | 4 ++-- filebeat/docs/inputs/input-kafka.asciidoc | 5 +++++ filebeat/include/fields.go | 2 +- filebeat/input/kafka/config.go | 2 ++ filebeat/input/kafka/input.go | 13 +++++++------ 5 files changed, 17 insertions(+), 9 deletions(-) diff --git a/filebeat/docs/fields.asciidoc b/filebeat/docs/fields.asciidoc index a3c3042baba..620839c5779 100644 --- a/filebeat/docs/fields.asciidoc +++ b/filebeat/docs/fields.asciidoc @@ -7881,10 +7881,10 @@ type: date *`kafka.headers`*:: + -- -Kafka headers for this message. +An array of Kafka header strings for this message, in the form ": ". -type: nested +type: array -- diff --git a/filebeat/docs/inputs/input-kafka.asciidoc b/filebeat/docs/inputs/input-kafka.asciidoc index 0c0b7a7afc5..ced85bc705d 100644 --- a/filebeat/docs/inputs/input-kafka.asciidoc +++ b/filebeat/docs/inputs/input-kafka.asciidoc @@ -82,6 +82,11 @@ How long to wait before retrying a failed read. Default is 2s. How long to wait for the minimum number of input bytes while reading. Default is 250ms. +===== `wait_close` + +When shutting down, how long to wait for in-flight messages to be delivered +and acknowledged. + ===== `isolation_level` This configures the Kafka group isolation level: diff --git a/filebeat/include/fields.go b/filebeat/include/fields.go index 4e1fd26a98d..b3b64ef47bc 100644 --- a/filebeat/include/fields.go +++ b/filebeat/include/fields.go @@ -32,5 +32,5 @@ func init() { // AssetFieldsYml returns asset data. // This is the base64 encoded gzipped contents of fields.yml. func AssetFieldsYml() string { - return "" + return "" } diff --git a/filebeat/input/kafka/config.go b/filebeat/input/kafka/config.go index 7e2ea6acb0a..ddc505bf4c7 100644 --- a/filebeat/input/kafka/config.go +++ b/filebeat/input/kafka/config.go @@ -42,6 +42,7 @@ type kafkaInputConfig struct { InitialOffset initialOffset `config:"initial_offset"` ConnectBackoff time.Duration `config:"connect_backoff" validate:"min=0"` ConsumeBackoff time.Duration `config:"consume_backoff" validate:"min=0"` + WaitClose time.Duration `config:"wait_close" validate:"min=0"` MaxWaitTime time.Duration `config:"max_wait_time"` IsolationLevel isolationLevel `config:"isolation_level"` Fetch kafkaFetch `config:"fetch"` @@ -109,6 +110,7 @@ func defaultConfig() kafkaInputConfig { ClientID: "filebeat", ConnectBackoff: 30 * time.Second, ConsumeBackoff: 2 * time.Second, + WaitClose: 2 * time.Second, MaxWaitTime: 250 * time.Millisecond, IsolationLevel: isolationLevelReadUncommitted, Fetch: kafkaFetch{ diff --git a/filebeat/input/kafka/input.go b/filebeat/input/kafka/input.go index 66ac66345ee..1cea74d32db 100644 --- a/filebeat/input/kafka/input.go +++ b/filebeat/input/kafka/input.go @@ -62,6 +62,11 @@ func NewInput( inputContext input.Context, ) (input.Input, error) { + config := defaultConfig() + if err := cfg.Unpack(&config); err != nil { + return nil, errors.Wrap(err, "reading kafka input config") + } + out, err := connector.ConnectWith(cfg, beat.ClientConfig{ Processing: beat.ProcessingConfig{ DynamicFields: inputContext.DynamicFields, @@ -73,17 +78,13 @@ func NewInput( } } }, - CloseRef: doneChannelContext(inputContext.Done), + CloseRef: doneChannelContext(inputContext.Done), + WaitClose: config.WaitClose, }) if err != nil { return nil, err } - config := defaultConfig() - if err := cfg.Unpack(&config); err != nil { - return nil, errors.Wrap(err, "reading kafka input config") - } - saramaConfig, err := newSaramaConfig(config) if err != nil { return nil, errors.Wrap(err, "initializing Sarama config") From 135da3145097b0bc3b4bbf3993422fc6f807c890 Mon Sep 17 00:00:00 2001 From: Fae Charlton Date: Mon, 12 Aug 2019 15:39:17 -0400 Subject: [PATCH 48/50] Update header handling in integration test --- .../input/kafka/kafka_integration_test.go | 20 ++++++++++--------- 1 file changed, 11 insertions(+), 9 deletions(-) diff --git a/filebeat/input/kafka/kafka_integration_test.go b/filebeat/input/kafka/kafka_integration_test.go index f303bab626b..4d4687d963a 100644 --- a/filebeat/input/kafka/kafka_integration_test.go +++ b/filebeat/input/kafka/kafka_integration_test.go @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -// +build integration +// + build integration package kafka @@ -24,6 +24,7 @@ import ( "math/rand" "os" "strconv" + "strings" "sync" "testing" "time" @@ -121,7 +122,7 @@ func TestInput(t *testing.T) { } // Setup the input config - config, _ := common.MustNewConfigFrom(common.MapStr{ + config := common.MustNewConfigFrom(common.MapStr{ "hosts": getTestKafkaHost(), "topics": []string{testTopic}, "group_id": "filebeat", @@ -194,20 +195,21 @@ func checkMatchingHeaders( t.Error(err) return } - headerArray, ok := headers.([]interface{}) + headerArray, ok := headers.([]string) if !ok { - t.Error("event.Fields.kafka.headers isn't a []interface{}") + t.Error("event.Fields.kafka.headers isn't a []string") return } assert.Equal(t, len(expected), len(headerArray)) for i := 0; i < len(expected); i++ { - headerMap, ok := headerArray[i].(common.MapStr) - if !ok { - t.Errorf("event.Fields.kafka.headers[%v] isn't a MapStr", i) + splitIndex := strings.Index(headerArray[i], ": ") + if splitIndex == -1 { + t.Errorf( + "event.Fields.kafka.headers[%v] doesn't have form 'key: value'", i) continue } - key, _ := headerMap.GetValue("key") - value, _ := headerMap.GetValue("value") + key := headerArray[i][:splitIndex] + value := headerArray[i][splitIndex+2:] assert.Equal(t, string(expected[i].Key), key) assert.Equal(t, string(expected[i].Value), value) } From 5f64da9c78d5f1854a34d9ad93b9307439cd8e41 Mon Sep 17 00:00:00 2001 From: Fae Charlton Date: Thu, 15 Aug 2019 11:22:34 -0400 Subject: [PATCH 49/50] Fix backoff handling again... --- filebeat/input/kafka/input.go | 19 +++++++------------ .../input/kafka/kafka_integration_test.go | 9 +++++---- 2 files changed, 12 insertions(+), 16 deletions(-) diff --git a/filebeat/input/kafka/input.go b/filebeat/input/kafka/input.go index 1cea74d32db..3598241ec53 100644 --- a/filebeat/input/kafka/input.go +++ b/filebeat/input/kafka/input.go @@ -101,10 +101,7 @@ func NewInput( return input, nil } -func (input *kafkaInput) runConsumerGroup() { - // Sarama uses standard go contexts to control cancellation, so we need - // to wrap our input context channel in that interface. - context := doneChannelContext(input.context.Done) +func (input *kafkaInput) runConsumerGroup(context context.Context) { handler := &groupHandler{ version: input.config.Version, outlet: input.outlet, @@ -142,6 +139,9 @@ func (input *kafkaInput) runConsumerGroup() { func (input *kafkaInput) Run() { input.runOnce.Do(func() { go func() { + // Sarama uses standard go contexts to control cancellation, so we need + // to wrap our input context channel in that interface. + context := doneChannelContext(input.context.Done) // If the consumer fails to connect, we use exponential backoff with // jitter up to 8 * the initial backoff interval. @@ -149,20 +149,15 @@ func (input *kafkaInput) Run() { input.context.Done, input.config.ConnectBackoff, 8*input.config.ConnectBackoff) - for { + + for context.Err() == nil { // Try to start the consumer group event loop: create a consumer // group client (wbich connects to the kafka cluster) and call // Consume (which starts an asynchronous consumer). - input.runConsumerGroup() + input.runConsumerGroup(context) // If runConsumerGroup returns, we wait for the backoff interval. backoff.Wait() - - // Check the Done channel before we try again. - select { - case <-input.context.Done: - return - } } }() }) diff --git a/filebeat/input/kafka/kafka_integration_test.go b/filebeat/input/kafka/kafka_integration_test.go index 4d4687d963a..71e44cf6219 100644 --- a/filebeat/input/kafka/kafka_integration_test.go +++ b/filebeat/input/kafka/kafka_integration_test.go @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -// + build integration +// +build integration package kafka @@ -123,9 +123,10 @@ func TestInput(t *testing.T) { // Setup the input config config := common.MustNewConfigFrom(common.MapStr{ - "hosts": getTestKafkaHost(), - "topics": []string{testTopic}, - "group_id": "filebeat", + "hosts": getTestKafkaHost(), + "topics": []string{testTopic}, + "group_id": "filebeat", + "wait_close": 0, }) // Route input events through our capturer instead of sending through ES. From 419ddab0ca95685c157d809a53294c8c408d3eb2 Mon Sep 17 00:00:00 2001 From: Fae Charlton Date: Thu, 15 Aug 2019 11:38:18 -0400 Subject: [PATCH 50/50] Adjust what the connection backoff responds to --- filebeat/input/kafka/input.go | 40 +++++++++++++++++++---------------- 1 file changed, 22 insertions(+), 18 deletions(-) diff --git a/filebeat/input/kafka/input.go b/filebeat/input/kafka/input.go index 3598241ec53..98b7f15c1f0 100644 --- a/filebeat/input/kafka/input.go +++ b/filebeat/input/kafka/input.go @@ -101,21 +101,14 @@ func NewInput( return input, nil } -func (input *kafkaInput) runConsumerGroup(context context.Context) { +func (input *kafkaInput) runConsumerGroup( + context context.Context, consumerGroup sarama.ConsumerGroup, +) { handler := &groupHandler{ version: input.config.Version, outlet: input.outlet, } - // Create a consumer group and make sure it's closed before we return. - consumerGroup, err := - sarama.NewConsumerGroup( - input.config.Hosts, input.config.GroupID, input.saramaConfig) - if err != nil { - input.log.Errorw( - "Error initializing kafka consumer group", "error", err) - return - } input.saramaWaitGroup.Add(1) defer func() { consumerGroup.Close() @@ -129,7 +122,7 @@ func (input *kafkaInput) runConsumerGroup(context context.Context) { } }() - err = consumerGroup.Consume(context, input.config.Topics, handler) + err := consumerGroup.Consume(context, input.config.Topics, handler) if err != nil { input.log.Errorw("Kafka consume error", "error", err) } @@ -151,13 +144,24 @@ func (input *kafkaInput) Run() { 8*input.config.ConnectBackoff) for context.Err() == nil { - // Try to start the consumer group event loop: create a consumer - // group client (wbich connects to the kafka cluster) and call - // Consume (which starts an asynchronous consumer). - input.runConsumerGroup(context) - - // If runConsumerGroup returns, we wait for the backoff interval. - backoff.Wait() + // Connect to Kafka with a new consumer group. + consumerGroup, err := sarama.NewConsumerGroup( + input.config.Hosts, input.config.GroupID, input.saramaConfig) + if err != nil { + input.log.Errorw( + "Error initializing kafka consumer group", "error", err) + backoff.Wait() + continue + } + // We've successfully connected, reset the backoff timer. + backoff.Reset() + + // We have a connected consumer group now, try to start the main event + // loop by calling Consume (which starts an asynchronous consumer). + // In an ideal run, this function never returns until shutdown; if it + // does, it means the errors have been logged and the consumer group + // has been closed, so we try creating a new one in the next iteration. + input.runConsumerGroup(context, consumerGroup) } }() })