
[filebeat] Add a Kafka input #12850

Merged
53 commits merged on Aug 15, 2019

Conversation


@faec faec commented Jul 10, 2019

Add a Kafka input to Filebeat (#7641).

@faec faec requested a review from a team as a code owner July 10, 2019 16:52
@faec faec requested a review from urso July 16, 2019 18:45
@faec faec changed the title [WIP] [filebeat] Add a Kafka input [filebeat] Add a Kafka input Jul 18, 2019
@faec faec added Filebeat Filebeat docs labels Jul 18, 2019
filebeat/input/kafka/input.go
        })
    }
    return array
}
can we have duplicate headers?

Would it make sense to combine headers and only have the return type map[string]string?

Contributor Author

I thought about this, but as far as I can tell duplicate header keys are valid (if not in the spec then at least in the implementation -- when I post events with duplicate headers, it passes them on unchanged to the consumer)

@urso urso Jul 22, 2019

Checking the kafka implementation it is indeed just a list of headers, allowing for duplicates.
This is not optimal for querying/filtering events via kibana. A simple map[string]string would fit kibana UI better. I wonder if map[string][]string might be a better compromise.

We should test how the difference between []map[string]string and map[string][]string affects usability.
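The map[string][]string compromise discussed above can be sketched as follows: collapsing the Kafka header list into a map of string slices preserves duplicate keys while giving Kibana a map-shaped document. The recordHeader type and collapseHeaders function are illustrative names, not the sarama or Filebeat types.

```go
package main

import "fmt"

// recordHeader mirrors the shape of a Kafka record header: a key plus a
// value, with duplicate keys allowed (an illustrative stand-in, not the
// sarama type).
type recordHeader struct {
	Key   string
	Value string
}

// collapseHeaders folds the header list into map[string][]string, keeping
// every value of a repeated key in arrival order.
func collapseHeaders(headers []recordHeader) map[string][]string {
	out := make(map[string][]string, len(headers))
	for _, h := range headers {
		out[h.Key] = append(out[h.Key], h.Value)
	}
	return out
}

func main() {
	headers := []recordHeader{
		{Key: "trace", Value: "a"},
		{Key: "trace", Value: "b"},
		{Key: "host", Value: "web-1"},
	}
	fmt.Println(collapseHeaders(headers))
}
```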

filebeat/input/kafka/input.go Outdated Show resolved Hide resolved

urso commented Jul 21, 2019

For adding end-to-end ACK support you will need this: #12997

end-to-end ACK becomes possible via:

    ...
    out, err := outlet.ConnectWith(cfg, beat.ClientConfig{
        Processing: beat.ProcessingConfig{
            DynamicFields: context.DynamicFields,
        },
        ACKEvents: func(privates []interface{}) {
            for _, priv := range privates {
                if cm, ok := priv.(*sarama.ConsumerMessage); ok {
                    sess.MarkMessage(cm, "")
                }
            }
        },
    })
    ...

    var msg *sarama.ConsumerMessage
    ...
    out.OnEvent(beat.Event{
        Timestamp: ...,
        Fields:    ...,
        Meta:      ...,
        Private:   msg,
    })

The ACKEvents callback will be called with the 'Private' field of all events recently ACKed. The pipeline ensures that ACKs are returned in the same order as you published your events.

Note: I have the session available in the callback via closure. This is motivated by the fact that a pipeline.Client is not necessarily thread-safe (we just made it safe because of the logs input in filebeat).


faec commented Aug 1, 2019

I've revised the core run loop to be more robust, fixed end-to-end ACK per @urso's suggestions about groupHandler, and fixed a shutdown bug that I came across in the process. The integration test now waits for the input to shut down and confirms it takes less than 30 seconds.

I also looked at the header issue in a live index, and currently it's definitely not what we want:

curl "http://localhost:9200/filebeat-8.0.0-2019.08.01-000001/_search?pretty"
...
            "headers" : [
              {
                "key" : [
                  107,
                  101,
                  121,
                  115,
                  32,
                  97,
                  110,
...

I initially assumed a data blob was the safest since that's what's in the spec, but given this outcome, absent better suggestions, I'll make them strings and leave it to the receiver to re-extract a data blob if that's truly their intent.
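The fix described here can be sketched as a conversion step from byte-slice headers to string fields, so they index as text rather than as arrays of byte values. The rawHeader, stringHeader, and toStringHeaders names are hypothetical, not the actual Filebeat code.

```go
package main

import "fmt"

// rawHeader stands in for a consumed Kafka header, whose key and value
// arrive as byte slices (illustrative type, not the sarama one).
type rawHeader struct {
	Key   []byte
	Value []byte
}

// stringHeader is the shape proposed above: both fields as strings. A
// receiver that truly wants a data blob can re-extract the bytes.
type stringHeader struct {
	Key   string `json:"key"`
	Value string `json:"value"`
}

func toStringHeaders(raw []rawHeader) []stringHeader {
	out := make([]stringHeader, 0, len(raw))
	for _, h := range raw {
		out = append(out, stringHeader{Key: string(h.Key), Value: string(h.Value)})
	}
	return out
}

func main() {
	raw := []rawHeader{{Key: []byte("format"), Value: []byte("json")}}
	fmt.Printf("%+v\n", toStringHeaders(raw))
}
```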

faec added a commit that referenced this pull request Aug 1, 2019
The motivation for this update is to support the IsolationFlag configuration parameter in #12850, but this PR is just a version bump with no functional changes.
select {
case <-input.context.Done:
    return
case <-time.After(input.config.ConnectBackoff):
We normally apply some exponential backoff with jitter. See package libbeat/common/backoff.

Contributor Author

The backoff package works only through its Wait function, not a channel, so I had to wrap it in an auxiliary channel to handle shutdown signals. Let me know if you see a prettier approach.


Internally the implementation uses a channel. We could introduce a method like C() <-chan time.Time, backed by a timer. If the current timer is not nil, it should be stopped and reset when C is called.


faec commented Aug 8, 2019

Update on the headers issue, following several tests and an offline discussion: the suggestion above of writing them as map[string][]string doesn't allow the kind of searching we'd like. The current container, []struct{ key, value string }, can work in theory as long as it's indexed as a nested type. I've updated filebeat/_meta/fields.common.yml with something near the correct config, but for some reason the properties inside headers (key, value) do not appear in the index template.

I can create index templates for fields like this manually, but when the template is built by filebeat the subfields are omitted. None of our beats / modules / etc currently use a nested type with defined subfields, so it's not clear to me yet whether the definition in fields.common.yml is somehow incorrect, or if the template generator doesn't handle subfields of a nested field.

@urso urso requested a review from dedemorton August 15, 2019 16:21

urso commented Aug 15, 2019

@dedemorton Can you have a look at the docs?

@faec faec merged commit be940a8 into elastic:master Aug 15, 2019
- name: partition
  type: long
  description: >
    Kafka partition number
@jsoriano jsoriano (Member) Aug 22, 2019

@faec @urso I wonder if we should use the same fields metricbeat uses for this, in case we want to correlate and/or monitor the kafka topics used. Metricbeat uses kafka.topic.name for the topic and kafka.partition.id for the partition.

And I wonder if message-specific fields (offset, key, headers) should live under a specific namespace such as kafka.message, at least to differentiate this offset from other offsets.
