Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enhance kafka topic selection #2188

Merged
merged 1 commit into from
Aug 10, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ https://github.com/elastic/beats/compare/v5.0.0-alpha5...master[Check the HEAD d

*Affecting all Beats*
- Change Elasticsearch output index configuration to be based on format strings. If index has been configured, no date will be appended anymore to the index name. {pull}2119[2119]
- Replace `output.kafka.use_type` by `output.kafka.topic` accepting a format string. {pull}2188[2188]

*Metricbeat*
- Change field type system.process.cpu.start_time from keyword to date. {issue}1565[1565]
Expand Down Expand Up @@ -48,6 +49,9 @@ https://github.com/elastic/beats/compare/v5.0.0-alpha5...master[Check the HEAD d
- Add script to generate the Kibana index-pattern from fields.yml. {pull}2122[2122]
- Enhance redis output key selection based on format string. {pull}2169[2169]
- Configurable redis `keys` using filters and format strings. {pull}2169[2169]
- Add format string support to `output.kafka.topic`. {pull}2188[2188]
- Add `output.kafka.topics` for more advanced kafka topic selection per event. {pull}2188[2188]


*Metricbeat*

Expand Down
8 changes: 2 additions & 6 deletions filebeat/filebeat.full.yml
Original file line number Diff line number Diff line change
Expand Up @@ -483,14 +483,10 @@ output.elasticsearch:
# to.
#hosts: ["localhost:9092"]

# The Kafka topic used for produced events. If use_type is set to true, the
# topic will not be used.
# The Kafka topic used for produced events. The setting can be a format string
# using any event field. To set the topic from document type use `%{[type]}`.
#topic: beats

# Set Kafka topic by event type. If use_type is false, the topic option must
# be configured. The default is false.
#use_type: false

# The number of concurrent load-balanced Kafka output workers.
#worker: 1

Expand Down
8 changes: 2 additions & 6 deletions libbeat/_meta/config.full.yml
Original file line number Diff line number Diff line change
Expand Up @@ -257,14 +257,10 @@ output.elasticsearch:
# to.
#hosts: ["localhost:9092"]

# The Kafka topic used for produced events. If use_type is set to true, the
# topic will not be used.
# The Kafka topic used for produced events. The setting can be a format string
# using any event field. To set the topic from document type use `%{[type]}`.
#topic: beats

# Set Kafka topic by event type. If use_type is false, the topic option must
# be configured. The default is false.
#use_type: false

# The number of concurrent load-balanced Kafka output workers.
#worker: 1

Expand Down
7 changes: 2 additions & 5 deletions libbeat/docs/outputconfig.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -543,11 +543,8 @@ The cluster metadata contain the actual Kafka brokers events are published to.

===== topic

The Kafka topic used for produced events. If `use_type` is set to true, the topic will not be used.

===== use_type

Set Kafka topic by event type. If `use_type` is false, the `topic` option must be configured. The default is false.
The Kafka topic used for produced events. The setting can be a format string
using any event field. To set the topic from document type use `%{[type]}`.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should type be our default recommendation? Bringing up as we had recently some discussion around type. Perhaps better use beat.name by default?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Default recommendation should be a static string (we do not set any default and require user to set the topic name). type is commonly used with filebeat + multiple prospectors setting document_type.


===== client_id

Expand Down
26 changes: 13 additions & 13 deletions libbeat/outputs/kafka/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,13 @@ import (

"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/libbeat/outputs/outil"
)

type client struct {
hosts []string
topic string
useType bool
config sarama.Config
hosts []string
topic outil.Selector
config sarama.Config

producer sarama.AsyncProducer

Expand All @@ -38,12 +38,15 @@ var (
publishEventsCallCount = expvar.NewInt("libbeat.kafka.call_count.PublishEvents")
)

func newKafkaClient(hosts []string, topic string, useType bool, cfg *sarama.Config) (*client, error) {
func newKafkaClient(
hosts []string,
topic outil.Selector,
cfg *sarama.Config,
) (*client, error) {
c := &client{
hosts: hosts,
useType: useType,
topic: topic,
config: *cfg,
hosts: hosts,
topic: topic,
config: *cfg,
}
return c, nil
}
Expand Down Expand Up @@ -103,10 +106,7 @@ func (c *client) AsyncPublishEvents(
ch := c.producer.Input()

for _, event := range events {
topic := c.topic
if c.useType {
topic = event["type"].(string)
}
topic, err := c.topic.Select(event)

jsonEvent, err := json.Marshal(event)
if err != nil {
Expand Down
8 changes: 0 additions & 8 deletions libbeat/outputs/kafka/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,6 @@ type kafkaConfig struct {
TLS *outputs.TLSConfig `config:"tls"`
Timeout time.Duration `config:"timeout" validate:"min=1"`
Worker int `config:"worker" validate:"min=1"`
UseType bool `config:"use_type"`
Topic string `config:"topic"`
KeepAlive time.Duration `config:"keep_alive" validate:"min=0"`
MaxMessageBytes *int `config:"max_message_bytes" validate:"min=1"`
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Topic still exist, don't we need it here?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, Topic still exists, but not required here. outil.BuildSelectorFrom... looks for topic and topics building a topic-selector similar to pipelining/index selection in elasticsearch output.

RequiredACKs *int `config:"required_acks" validate:"min=-1"`
Expand All @@ -32,8 +30,6 @@ var (
TLS: nil,
Timeout: 30 * time.Second,
Worker: 1,
UseType: false,
Topic: "",
KeepAlive: 0,
MaxMessageBytes: nil, // use library default
RequiredACKs: nil, // use library default
Expand All @@ -50,10 +46,6 @@ func (c *kafkaConfig) Validate() error {
return errors.New("no hosts configured")
}

if c.UseType == false && c.Topic == "" {
return errors.New("use_type must be true or topic must be set")
}

if _, ok := compressionModes[strings.ToLower(c.Compression)]; !ok {
return fmt.Errorf("compression mode '%v' unknown", c.Compression)
}
Expand Down
20 changes: 16 additions & 4 deletions libbeat/outputs/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,12 @@ import (
"github.com/elastic/beats/libbeat/outputs"
"github.com/elastic/beats/libbeat/outputs/mode"
"github.com/elastic/beats/libbeat/outputs/mode/modeutil"
"github.com/elastic/beats/libbeat/outputs/outil"
)

type kafka struct {
config kafkaConfig
topic outil.Selector

modeRetry mode.ConnectionMode
modeGuaranteed mode.ConnectionMode
Expand Down Expand Up @@ -71,7 +73,18 @@ func (k *kafka) init(cfg *common.Config) error {
return err
}

_, err := newKafkaConfig(&k.config)
var err error
k.topic, err = outil.BuildSelectorFromConfig(cfg, outil.Settings{
Key: "topic",
MultiKey: "topics",
EnableSingleOnly: true,
FailEmpty: true,
})
if err != nil {
return err
}

_, err = newKafkaConfig(&k.config)
if err != nil {
return err
}
Expand All @@ -96,10 +109,9 @@ func (k *kafka) initMode(guaranteed bool) (mode.ConnectionMode, error) {

var clients []mode.AsyncProtocolClient
hosts := k.config.Hosts
topic := k.config.Topic
useType := k.config.UseType
topic := k.topic
for i := 0; i < worker; i++ {
client, err := newKafkaClient(hosts, topic, useType, libCfg)
client, err := newKafkaClient(hosts, topic, libCfg)
if err != nil {
logp.Err("Failed to create kafka client: %v", err)
return nil, err
Expand Down
14 changes: 9 additions & 5 deletions libbeat/outputs/kafka/kafka_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/libbeat/outputs"
"github.com/elastic/beats/libbeat/outputs/outil"
"github.com/stretchr/testify/assert"
)

Expand Down Expand Up @@ -47,7 +48,8 @@ func newTestKafkaClient(t *testing.T, topic string) *client {
hosts := []string{getTestKafkaHost()}
t.Logf("host: %v", hosts)

client, err := newKafkaClient(hosts, topic, false, nil)
sel := outil.MakeSelector(outil.ConstSelectorExpr(topic))
client, err := newKafkaClient(hosts, sel, nil)
if err != nil {
t.Fatal(err)
}
Expand All @@ -57,11 +59,13 @@ func newTestKafkaClient(t *testing.T, topic string) *client {

func newTestKafkaOutput(t *testing.T, topic string, useType bool) outputs.Outputer {

if useType {
Copy link
Contributor

@ruflin ruflin Aug 9, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this to test BC compatibility? I assume it is just a simplification to keep the tests the same.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

use_type has been removed from config. This is in order to keep the tests working without much change, as some tests use the event type to choose topic dynamically and other tests set static topic name. Alternatively one can change tests to pass format-string in topic and remove useType.

topic = "%{[type]}"
}
config := map[string]interface{}{
"hosts": []string{getTestKafkaHost()},
"timeout": "1s",
"topic": topic,
"use_type": useType,
"hosts": []string{getTestKafkaHost()},
"timeout": "1s",
"topic": topic,
}

cfg, err := common.NewConfigFrom(config)
Expand Down
8 changes: 2 additions & 6 deletions metricbeat/metricbeat.full.yml
Original file line number Diff line number Diff line change
Expand Up @@ -405,14 +405,10 @@ output.elasticsearch:
# to.
#hosts: ["localhost:9092"]

# The Kafka topic used for produced events. If use_type is set to true, the
# topic will not be used.
# The Kafka topic used for produced events. The setting can be a format string
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we already have some docs about how to use format string? If not, we should definitively create an issue for it.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

see #2132

# using any event field. To set the topic from document type use `%{[type]}`.
#topic: beats

# Set Kafka topic by event type. If use_type is false, the topic option must
# be configured. The default is false.
#use_type: false

# The number of concurrent load-balanced Kafka output workers.
#worker: 1

Expand Down
8 changes: 2 additions & 6 deletions packetbeat/packetbeat.full.yml
Original file line number Diff line number Diff line change
Expand Up @@ -675,14 +675,10 @@ output.elasticsearch:
# to.
#hosts: ["localhost:9092"]

# The Kafka topic used for produced events. If use_type is set to true, the
# topic will not be used.
# The Kafka topic used for produced events. The setting can be a format string
# using any event field. To set the topic from document type use `%{[type]}`.
#topic: beats

# Set Kafka topic by event type. If use_type is false, the topic option must
# be configured. The default is false.
#use_type: false

# The number of concurrent load-balanced Kafka output workers.
#worker: 1

Expand Down
8 changes: 2 additions & 6 deletions winlogbeat/winlogbeat.full.yml
Original file line number Diff line number Diff line change
Expand Up @@ -292,14 +292,10 @@ output.elasticsearch:
# to.
#hosts: ["localhost:9092"]

# The Kafka topic used for produced events. If use_type is set to true, the
# topic will not be used.
# The Kafka topic used for produced events. The setting can be a format string
# using any event field. To set the topic from document type use `%{[type]}`.
#topic: beats

# Set Kafka topic by event type. If use_type is false, the topic option must
# be configured. The default is false.
#use_type: false

# The number of concurrent load-balanced Kafka output workers.
#worker: 1

Expand Down