[filebeat] Add a Kafka input #12850

Merged 53 commits on Aug 15, 2019 (changes shown from 48 commits)

Commits
c0dbd6a  Add kafka input to filebeat import list (faec, Jun 26, 2019)
878a9e7  Initial skeleton of filebeat kafka input (faec, Jul 8, 2019)
5f6c0d9  Cleanup (faec, Jul 10, 2019)
1c83d55  Turn on Wait() and Stop() (faec, Jul 10, 2019)
7298e9c  add InitialOffset configuration parameter (faec, Jul 10, 2019)
7345c18  Document new kafka output fields (faec, Jul 10, 2019)
b925014  Add username / password and ssl config (faec, Jul 11, 2019)
1f6ecc3  Add parameter for client id (faec, Jul 11, 2019)
207bebe  Add metric registry to sarama reader (faec, Jul 11, 2019)
ec386a0  Log kafka errors (faec, Jul 11, 2019)
0aaa710  Add remaining kafka metadata fields (faec, Jul 16, 2019)
95e6bff  document new metadata fields (faec, Jul 16, 2019)
5bb711c  Adjust kafka producer version / test message in tests (faec, Jul 16, 2019)
9601e0c  Don't record BlockTimestamp if it's zero (faec, Jul 16, 2019)
2d8d37c  Remove debug printf (faec, Jul 16, 2019)
e6b8b53  make fmt (faec, Jul 17, 2019)
a843dbc  Merge branch 'master' into kafka-input (faec, Jul 18, 2019)
426c98f  Add kafka container to filebeat integration tests (faec, Jul 18, 2019)
43f1cde  regenerate docs (faec, Jul 18, 2019)
da3eb99  Remove unused test helpers (faec, Jul 18, 2019)
3da5f99  Add header verification to kafka integration test (faec, Jul 18, 2019)
6bdb13f  Addressing review comments (faec, Jul 22, 2019)
bfeaeb0  Review comments (faec, Jul 23, 2019)
8061d86  Add several more kafka configuration settings (faec, Jul 23, 2019)
678d71b  Document kafka input configuration (faec, Jul 25, 2019)
b8b3446  Merge branch 'master' into kafka-input (faec, Jul 26, 2019)
6a45e05  Update for new outlet api (faec, Jul 26, 2019)
5cee082  Add end-to-end ACK to the kafka input / synchronize access to the ses… (faec, Jul 29, 2019)
a16e862  Update integration test (faec, Jul 29, 2019)
e2137cb  make integration test an integration test again, not unit (faec, Jul 29, 2019)
cd8a3d9  Replace sarama context with a minimal wrapper suggested by @urso (faec, Jul 29, 2019)
e54b9d8  Clarify docs (faec, Jul 31, 2019)
418364a  Addressing review comments (faec, Jul 31, 2019)
73aa0bc  Fix kafka input Stop() (faec, Jul 31, 2019)
76f7792  revised run loop in progress (faec, Aug 1, 2019)
1a29d7b  refactor groupHandler / fix end-to-end ACK (faec, Aug 1, 2019)
c903177  Use strings for kafka headers (faec, Aug 1, 2019)
f5dd360  Make kafka message keys strings on indexing (faec, Aug 1, 2019)
4e73ac8  Adjust config parameter names (faec, Aug 1, 2019)
f2441d8  Update changed config fields in docs (faec, Aug 1, 2019)
6aa839e  Merge branch 'master' into kafka-input (faec, Aug 1, 2019)
6ad363e  Add IsolationLevel config option (faec, Aug 1, 2019)
df98c90  Document IsolationLevel (faec, Aug 1, 2019)
a90d7c5  working on corrected index template (faec, Aug 8, 2019)
c302125  add compromise data layout for kafka headers (faec, Aug 9, 2019)
6ae3210  addressing review comments (faec, Aug 9, 2019)
43b5bec  use exponential backoff for connecting (faec, Aug 9, 2019)
62488ef  shutdown outlet via the CloseRef config field (faec, Aug 9, 2019)
60f436c  Fixing backoff wait call (faec, Aug 12, 2019)
e223a64  Add wait_close parameter (faec, Aug 12, 2019)
135da31  Update header handling in integration test (faec, Aug 12, 2019)
5f64da9  Fix backoff handling again... (faec, Aug 15, 2019)
419ddab  Adjust what the connection backoff responds to (faec, Aug 15, 2019)
34 changes: 34 additions & 0 deletions filebeat/_meta/fields.common.yml
@@ -150,3 +150,37 @@
type: keyword
description: >
Name of organization associated with the autonomous system.

- name: kafka
type: group
fields:
- name: topic
type: keyword
description: >
Kafka topic

- name: partition
type: long
description: >
Kafka partition number
@jsoriano (Member) commented on Aug 22, 2019:

@faec @urso I wonder if we should use the same fields Metricbeat uses for this, in case we want to correlate and/or monitor the Kafka topics used. Metricbeat uses kafka.topic.name for the topic and kafka.partition.id for the partition.

And I wonder if message-specific fields (offset, key, headers) should live under a specific namespace such as kafka.message, at least to differentiate the offset from other offsets.

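For illustration, the alternative layout @jsoriano sketches above would look roughly like this in `fields.common.yml` (hypothetical; this is not what the PR ships):

----
- name: kafka
  type: group
  fields:
    - name: topic.name
      type: keyword
    - name: partition.id
      type: long
    - name: message
      type: group
      fields:
        - name: offset
          type: long
        - name: key
          type: keyword
        - name: headers
          type: array
----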

- name: offset
type: long
description: >
Kafka offset of this message

- name: key
type: keyword
description: >
Kafka key, corresponding to the value stored in the message

- name: block_timestamp
type: date
description: >
Kafka outer (compressed) block timestamp

- name: headers
type: array
description: >
An array of Kafka header strings for this message, in the form
"<key>: <value>".
11 changes: 11 additions & 0 deletions filebeat/docker-compose.yml
@@ -12,6 +12,8 @@ services:
- ES_PORT=9200
- ES_USER=beats
- ES_PASS=testing
- KAFKA_HOST=kafka
- KAFKA_PORT=9092
- KIBANA_HOST=kibana
- KIBANA_PORT=5601
working_dir: /go/src/github.com/elastic/beats/filebeat
@@ -27,6 +29,7 @@
image: busybox
depends_on:
elasticsearch: { condition: service_healthy }
kafka: { condition: service_healthy }
kibana: { condition: service_healthy }
redis: { condition: service_healthy }

@@ -35,6 +38,14 @@
file: ${ES_BEATS}/testing/environments/${TESTING_ENVIRONMENT}.yml
service: elasticsearch

kafka:
build: ${ES_BEATS}/testing/environments/docker/kafka
expose:
- 9092
- 2181
environment:
- ADVERTISED_HOST=kafka

kibana:
extends:
file: ${ES_BEATS}/testing/environments/${TESTING_ENVIRONMENT}.yml
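With the `kafka` service above in place, a Filebeat instance inside the compose network could read from the test broker with a minimal input configuration (a sketch; the topic name is invented, and `kafka:9092` follows from the `ADVERTISED_HOST` value and exposed port above):

----
filebeat.inputs:
- type: kafka
  hosts: ["kafka:9092"]
  topics: ["test-topic"]
  group_id: "filebeat"
----
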
61 changes: 61 additions & 0 deletions filebeat/docs/fields.asciidoc
@@ -7827,6 +7827,67 @@

--


*`kafka.topic`*::
+
--
Kafka topic


type: keyword

--

*`kafka.partition`*::
+
--
Kafka partition number


type: long

--

*`kafka.offset`*::
+
--
Kafka offset of this message


type: long

--

*`kafka.key`*::
+
--
Kafka key, corresponding to the value stored in the message


type: keyword

--

*`kafka.block_timestamp`*::
+
--
Kafka outer (compressed) block timestamp


type: date

--

*`kafka.headers`*::
+
--
Kafka headers for this message.


type: nested

--

[[exported-fields-logstash]]
== logstash fields

121 changes: 121 additions & 0 deletions filebeat/docs/inputs/input-kafka.asciidoc
@@ -0,0 +1,121 @@
:type: kafka

[id="{beatname_lc}-input-{type}"]
=== Kafka input

++++
<titleabbrev>Kafka</titleabbrev>
++++

Use the `kafka` input to read from topics in a Kafka cluster.

To configure this input, specify a list of one or more <<hosts,`hosts`>> in the
cluster to bootstrap the connection with, a list of <<topics,`topics`>> to
track, and a <<groupid,`group_id`>> for the connection.

Example configuration:

["source","yaml",subs="attributes"]
----
{beatname_lc}.inputs:
- type: kafka
hosts:
- kafka-broker-1:9092
- kafka-broker-2:9092
topics: ["my-topic"]
group_id: "filebeat"

----


[id="{beatname_lc}-input-{type}-options"]
==== Configuration options

The `kafka` input supports the following configuration options plus the
<<{beatname_lc}-input-{type}-common-options>> described later.

[float]
[[hosts]]
===== `hosts`

A list of Kafka bootstrapping hosts (brokers) for this cluster.

[float]
[[topics]]
===== `topics`

A list of topics to read from.

[float]
[[groupid]]
===== `group_id`

The Kafka consumer group id.

[float]
===== `client_id`

The Kafka client id (optional).

[float]
===== `version`

The version of the Kafka protocol to use (defaults to `"1.0.0"`).

[float]
===== `initial_offset`

The initial offset to start reading from, either `"oldest"` or `"newest"`.
Defaults to `"oldest"`.

[float]
===== `connect_backoff`

How long to wait before trying to reconnect to the Kafka cluster after a
fatal error. Default is 30s.

[float]
===== `consume_backoff`

How long to wait before retrying a failed read. Default is 2s.

[float]
===== `max_wait_time`

How long to wait for the minimum number of input bytes while reading. Default
is 250ms.

[float]
===== `isolation_level`

This configures the Kafka group isolation level:

- `"read_uncommitted"` returns _all_ messages in the message channel.
- `"read_committed"` hides messages that are part of an aborted transaction.

The default is `"read_uncommitted"`.

[float]
===== `fetch`

Kafka fetch settings:

*`min`*:: The minimum number of bytes to wait for. Defaults to 1.

*`default`*:: The default number of bytes to read per request. Defaults to 1MB.

*`max`*:: The maximum number of bytes to read per request. Defaults to 0
(no limit).

[float]
===== `rebalance`

Kafka rebalance settings:

*`strategy`*:: Either `"range"` or `"roundrobin"`. Defaults to `"range"`.

*`timeout`*:: How long to wait for an attempted rebalance. Defaults to 60s.

*`max_retries`*:: How many times to retry if rebalancing fails. Defaults to 4.

*`retry_backoff`*:: How long to wait after an unsuccessful rebalance attempt.
Defaults to 2s.
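
Putting the options above together, a fully spelled-out configuration would look roughly like this (a sketch at the documented defaults; the hosts and topic are illustrative, the nested YAML layout for `fetch` and `rebalance` is assumed from the option grouping above, and `fetch.default` assumes a plain byte count for 1MB):

["source","yaml",subs="attributes"]
----
{beatname_lc}.inputs:
- type: kafka
  hosts: ["kafka-broker-1:9092", "kafka-broker-2:9092"]
  topics: ["my-topic"]
  group_id: "filebeat"
  client_id: "filebeat"               # optional client id
  version: "1.0.0"                    # Kafka protocol version
  initial_offset: "oldest"            # or "newest"
  connect_backoff: 30s                # reconnect delay after a fatal error
  consume_backoff: 2s                 # retry delay after a failed read
  max_wait_time: 250ms                # how long to wait for fetch.min bytes
  isolation_level: "read_uncommitted" # or "read_committed"
  fetch:
    min: 1                            # minimum bytes to wait for
    default: 1048576                  # bytes per request; assuming 1MB as a byte count
    max: 0                            # 0 = no limit
  rebalance:
    strategy: "range"                 # or "roundrobin"
    timeout: 60s
    max_retries: 4
    retry_backoff: 2s
----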

[id="{beatname_lc}-input-{type}-common-options"]
include::../inputs/input-common-options.asciidoc[]

:type!:
2 changes: 1 addition & 1 deletion filebeat/include/fields.go

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions filebeat/include/list.go

Some generated files are not rendered by default.
