From b23ed80970ead275bef29aba04b0cef34e455d9d Mon Sep 17 00:00:00 2001 From: urso Date: Wed, 10 Aug 2016 17:28:47 +0200 Subject: [PATCH 1/6] Add RunBytes to EventStringFormatter --- libbeat/common/fmtstr/formatevents.go | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/libbeat/common/fmtstr/formatevents.go b/libbeat/common/fmtstr/formatevents.go index ba0a3dfcf6d..494480d9136 100644 --- a/libbeat/common/fmtstr/formatevents.go +++ b/libbeat/common/fmtstr/formatevents.go @@ -190,6 +190,23 @@ func (fs *EventFormatString) Run(event common.MapStr) (string, error) { return ctx.buf.String(), nil } +// RunBytes executes the format string returning a new expanded string of type +// `[]byte` or an error if execution or event field expansion fails. +func (fs *EventFormatString) RunBytes(event common.MapStr) ([]byte, error) { + ctx := newEventCtx(len(fs.fields)) + defer releaseCtx(ctx) + + buf := bytes.NewBuffer(nil) + if err := fs.collectFields(ctx, event); err != nil { + return nil, err + } + err := fs.formatter.Eval(ctx, buf) + if err != nil { + return nil, err + } + return buf.Bytes(), nil +} + // Eval executes the format string, writing the resulting string into the provided output buffer. Returns error if execution or event field expansion fails. func (fs *EventFormatString) Eval(out *bytes.Buffer, event common.MapStr) error { ctx := newEventCtx(len(fs.fields)) From 48fbe60b7f75ac5bf062da25446071118c20590e Mon Sep 17 00:00:00 2001 From: urso Date: Thu, 18 Aug 2016 13:43:02 +0200 Subject: [PATCH 2/6] Make common.Time hashable --- libbeat/common/datetime.go | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/libbeat/common/datetime.go b/libbeat/common/datetime.go index 7c5010d5d14..b36d9170167 100644 --- a/libbeat/common/datetime.go +++ b/libbeat/common/datetime.go @@ -1,8 +1,10 @@ package common import ( + "encoding/binary" "encoding/json" "errors" + "hash" "time" ) @@ -30,6 +32,11 @@ func (t *Time) UnmarshalJSON(data []byte) (err error) { return } +func (t Time) Hash32(h hash.Hash32) error { + err := binary.Write(h, binary.LittleEndian, time.Time(t).UnixNano()) + return err +} + // ParseTime parses a time in the TsLayout format. func ParseTime(timespec string) (Time, error) { t, err := time.Parse(TsLayout, timespec) From 8a5af7067d78e64af79a6d75b67df43c45582c5d Mon Sep 17 00:00:00 2001 From: urso Date: Wed, 10 Aug 2016 11:53:24 +0200 Subject: [PATCH 3/6] Configurable kafka partitioning - add config settings to configure partition strategy - add config settings to configure partitioner selecting only reachable partitions - Store kafka message in data.values so original message including partition selection is still available on retry - make kafka message key configurable - update changelog --- CHANGELOG.asciidoc | 3 + libbeat/outputs/kafka/client.go | 88 ++++++--- libbeat/outputs/kafka/config.go | 36 ++-- libbeat/outputs/kafka/kafka.go | 37 +++- libbeat/outputs/kafka/message.go | 46 +++++ libbeat/outputs/kafka/partition.go | 297 +++++++++++++++++++++++++++++ 6 files changed, 454 insertions(+), 53 deletions(-) create mode 100644 libbeat/outputs/kafka/message.go create mode 100644 libbeat/outputs/kafka/partition.go diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index d96945de8d0..30f79309746 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -68,6 +68,9 @@ https://github.com/elastic/beats/compare/v5.0.0-alpha5...master[Check the HEAD d - Make Kafka metadata update configurable. 
{pull}2190[2190] - Add kafka version setting (optional) enabling kafka broker version support. {pull}2190[2190] - Add kafka message timestamp if at least version 0.10 is configured. {pull}2190[2190] +- Add configurable kafka event key setting. {pull}2284[2284] +- Add settings for configuring the kafka partitioning strategy. {pull}2284[2284] +- Add partitioner settings `reachable_only` to ignore partitions not reachable by network. {pull}2284[2284] - Enhance contains condition to work on fields that are arrays of strings. {issue}2237[2237] - Lookup the configuration file relative to the `-path.config` CLI flag. {pull}2245[2245] - Re-write import_dashboards.sh in Golang. {pull}2155[2155] diff --git a/libbeat/outputs/kafka/client.go b/libbeat/outputs/kafka/client.go index def44cb6419..2bccf445f80 100644 --- a/libbeat/outputs/kafka/client.go +++ b/libbeat/outputs/kafka/client.go @@ -3,6 +3,7 @@ package kafka import ( "encoding/json" "expvar" + "fmt" "sync" "sync/atomic" "time" @@ -10,6 +11,7 @@ import ( "github.com/Shopify/sarama" "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/common/fmtstr" "github.com/elastic/beats/libbeat/logp" "github.com/elastic/beats/libbeat/outputs" "github.com/elastic/beats/libbeat/outputs/outil" @@ -18,6 +20,7 @@ import ( type client struct { hosts []string topic outil.Selector + key *fmtstr.EventFormatString config sarama.Config producer sarama.AsyncProducer @@ -41,12 +44,14 @@ var ( func newKafkaClient( hosts []string, + key *fmtstr.EventFormatString, topic outil.Selector, cfg *sarama.Config, ) (*client, error) { c := &client{ hosts: hosts, topic: topic, + key: key, config: *cfg, } return c, nil @@ -106,48 +111,74 @@ func (c *client) AsyncPublishEvents( ch := c.producer.Input() - for _, d := range data { - event := d.Event - topic, err := c.topic.Select(event) - var ts time.Time - - // message timestamps have been added to kafka with version 0.10.0.0 - if c.config.Version.IsAtLeast(sarama.V0_10_0_0) { - if tsRaw, ok := event["@timestamp"]; ok { - if tmp, ok := tsRaw.(common.Time); ok { - ts = time.Time(tmp) - } else if tmp, ok := tsRaw.(time.Time); ok { - ts = tmp - } - } - } + for i := range data { + d := &data[i] - jsonEvent, err := json.Marshal(event) + msg, err := c.getEventMessage(d) if err != nil { + logp.Err("Dropping event: %v", err) ref.done() continue } + msg.ref = ref + + msg.initProducerMessage() + ch <- &msg.msg + } + + return nil +} + +func (c *client) getEventMessage(data *outputs.Data) (*message, error) { + event := data.Event + msg := messageFromData(data) + if msg.topic != "" { + return msg, nil + } + + msg.event = event - msg := &sarama.ProducerMessage{ - Metadata: ref, - Topic: topic, - Value: sarama.ByteEncoder(jsonEvent), - Timestamp: ts, + topic, err := c.topic.Select(event) + if err != nil { + return nil, fmt.Errorf("setting kafka topic failed with %v", err) + } + msg.topic = topic + + jsonEvent, err := json.Marshal(event) + if err != nil { + return nil, fmt.Errorf("json encoding failed with %v", err) + } + msg.value = jsonEvent + + // message timestamps have been added to kafka with version 0.10.0.0 + var ts time.Time + if c.config.Version.IsAtLeast(sarama.V0_10_0_0) { + if tsRaw, ok := event["@timestamp"]; ok { + if tmp, ok := tsRaw.(common.Time); ok { + ts = time.Time(tmp) + } else if tmp, ok := tsRaw.(time.Time); ok { + ts = tmp + } } + } + msg.ts = ts - ch <- msg + if c.key != nil { + if key, err := c.key.RunBytes(event); err == nil { + msg.key = key + } } - return nil + return msg, nil } func (c *client) 
successWorker(ch <-chan *sarama.ProducerMessage) { defer c.wg.Done() defer debugf("Stop kafka ack worker") - for msg := range ch { - ref := msg.Metadata.(*msgRef) - ref.done() + for libMsg := range ch { + msg := libMsg.Metadata.(*message) + msg.ref.done() } } @@ -156,9 +187,8 @@ func (c *client) errorWorker(ch <-chan *sarama.ProducerError) { defer debugf("Stop kafka error handler") for errMsg := range ch { - msg := errMsg.Msg - ref := msg.Metadata.(*msgRef) - ref.fail(errMsg.Err) + msg := errMsg.Msg.Metadata.(*message) + msg.ref.fail(errMsg.Err) } } diff --git a/libbeat/outputs/kafka/config.go b/libbeat/outputs/kafka/config.go index 2094b4a776b..a24a340e597 100644 --- a/libbeat/outputs/kafka/config.go +++ b/libbeat/outputs/kafka/config.go @@ -6,26 +6,30 @@ import ( "strings" "time" + "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/common/fmtstr" "github.com/elastic/beats/libbeat/outputs" ) type kafkaConfig struct { - Hosts []string `config:"hosts" validate:"required"` - TLS *outputs.TLSConfig `config:"tls"` - Timeout time.Duration `config:"timeout" validate:"min=1"` - Worker int `config:"worker" validate:"min=1"` - Metadata metaConfig `config:"metadata"` - KeepAlive time.Duration `config:"keep_alive" validate:"min=0"` - MaxMessageBytes *int `config:"max_message_bytes" validate:"min=1"` - RequiredACKs *int `config:"required_acks" validate:"min=-1"` - BrokerTimeout time.Duration `config:"broker_timeout" validate:"min=1"` - Compression string `config:"compression"` - Version string `config:"version"` - MaxRetries int `config:"max_retries" validate:"min=-1,nonzero"` - ClientID string `config:"client_id"` - ChanBufferSize int `config:"channel_buffer_size" validate:"min=1"` - Username string `config:"username"` - Password string `config:"password"` + Hosts []string `config:"hosts" validate:"required"` + TLS *outputs.TLSConfig `config:"tls"` + Timeout time.Duration `config:"timeout" validate:"min=1"` + Worker int `config:"worker" validate:"min=1"` + Metadata metaConfig `config:"metadata"` + Key *fmtstr.EventFormatString `config:"key"` + Partition map[string]*common.Config `config:"partition"` + KeepAlive time.Duration `config:"keep_alive" validate:"min=0"` + MaxMessageBytes *int `config:"max_message_bytes" validate:"min=1"` + RequiredACKs *int `config:"required_acks" validate:"min=-1"` + BrokerTimeout time.Duration `config:"broker_timeout" validate:"min=1"` + Compression string `config:"compression"` + Version string `config:"version"` + MaxRetries int `config:"max_retries" validate:"min=-1,nonzero"` + ClientID string `config:"client_id"` + ChanBufferSize int `config:"channel_buffer_size" validate:"min=1"` + Username string `config:"username"` + Password string `config:"password"` } type metaConfig struct { diff --git a/libbeat/outputs/kafka/kafka.go b/libbeat/outputs/kafka/kafka.go index 0e3322de1fb..b0f8220427d 100644 --- a/libbeat/outputs/kafka/kafka.go +++ b/libbeat/outputs/kafka/kafka.go @@ -23,6 +23,8 @@ type kafka struct { modeRetry mode.ConnectionMode modeGuaranteed mode.ConnectionMode + + partitioner sarama.PartitionerConstructor } const ( @@ -87,13 +89,12 @@ func New(beatName string, cfg *common.Config, topologyExpire int) (outputs.Outpu func (k *kafka) init(cfg *common.Config) error { debugf("initialize kafka output") - k.config = defaultConfig - if err := cfg.Unpack(&k.config); err != nil { + config := defaultConfig + if err := cfg.Unpack(&config); err != nil { return err } - var err error - k.topic, err = outil.BuildSelectorFromConfig(cfg, 
outil.Settings{ + topic, err := outil.BuildSelectorFromConfig(cfg, outil.Settings{ Key: "topic", MultiKey: "topics", EnableSingleOnly: true, @@ -103,7 +104,17 @@ func (k *kafka) init(cfg *common.Config) error { return err } - _, err = newKafkaConfig(&k.config) + partitioner, err := makePartitioner(config.Partition) + if err != nil { + return err + } + + k.config = config + k.partitioner = partitioner + k.topic = topic + + // validate config one more time + _, err = k.newKafkaConfig() if err != nil { return err } @@ -112,7 +123,7 @@ func (k *kafka) init(cfg *common.Config) error { } func (k *kafka) initMode(guaranteed bool) (mode.ConnectionMode, error) { - libCfg, err := newKafkaConfig(&k.config) + libCfg, err := k.newKafkaConfig() if err != nil { return nil, err } @@ -130,7 +141,7 @@ func (k *kafka) initMode(guaranteed bool) (mode.ConnectionMode, error) { hosts := k.config.Hosts topic := k.topic for i := 0; i < worker; i++ { - client, err := newKafkaClient(hosts, topic, libCfg) + client, err := newKafkaClient(hosts, k.config.Key, topic, libCfg) if err != nil { logp.Err("Failed to create kafka client: %v", err) return nil, err @@ -212,6 +223,16 @@ func (k *kafka) BulkPublish( return mode.PublishEvents(signal, opts, data) } +func (k *kafka) newKafkaConfig() (*sarama.Config, error) { + cfg, err := newKafkaConfig(&k.config) + if err != nil { + return nil, err + } + + cfg.Producer.Partitioner = k.partitioner + return cfg, nil +} + func newKafkaConfig(config *kafkaConfig) (*sarama.Config, error) { k := sarama.NewConfig() @@ -251,7 +272,7 @@ func newKafkaConfig(config *kafkaConfig) (*sarama.Config, error) { compressionMode, ok := compressionModes[strings.ToLower(config.Compression)] if !ok { - return nil, fmt.Errorf("Unknown compression mode: %v", config.Compression) + return nil, fmt.Errorf("Unknown compression mode: '%v'", config.Compression) } k.Producer.Compression = compressionMode diff --git a/libbeat/outputs/kafka/message.go b/libbeat/outputs/kafka/message.go new file mode 100644 index 00000000000..20001d65115 --- /dev/null +++ b/libbeat/outputs/kafka/message.go @@ -0,0 +1,46 @@ +package kafka + +import ( + "time" + + "github.com/Shopify/sarama" + "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/outputs" +) + +type message struct { + msg sarama.ProducerMessage + + topic string + key []byte + value []byte + ref *msgRef + ts time.Time + + hash uint32 + partition int32 + + event common.MapStr +} + +var kafkaMessageKey interface{} = int(0) + +func messageFromData(d *outputs.Data) *message { + if m, found := d.Values.Get(kafkaMessageKey); found { + return m.(*message) + } + + m := &message{partition: -1} + d.AddValue(kafkaMessageKey, m) + return m +} + +func (m *message) initProducerMessage() { + m.msg = sarama.ProducerMessage{ + Metadata: m, + Topic: m.topic, + Key: sarama.ByteEncoder(m.key), + Value: sarama.ByteEncoder(m.value), + Timestamp: m.ts, + } +} diff --git a/libbeat/outputs/kafka/partition.go b/libbeat/outputs/kafka/partition.go new file mode 100644 index 00000000000..70e3091efdd --- /dev/null +++ b/libbeat/outputs/kafka/partition.go @@ -0,0 +1,297 @@ +package kafka + +import ( + "encoding/binary" + "errors" + "fmt" + "hash" + "hash/fnv" + "math/rand" + "strconv" + + "github.com/Shopify/sarama" + "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/logp" +) + +type partitionBuilder func(*common.Config) (func() partitioner, error) + +type partitioner func(*message, int32) (int32, error) + +// stablePartitioner re-uses last 
configured partition in case of event being +// repartitioned (on retry from libbeat). +type messagePartitioner struct { + p partitioner + reachable bool + partitions int32 // number of partitions seen last +} + +func makePartitioner( + partition map[string]*common.Config, +) (sarama.PartitionerConstructor, error) { + mkStrategy, reachable, err := initPartitionStrategy(partition) + if err != nil { + return nil, err + } + + return func(topic string) sarama.Partitioner { + return &messagePartitioner{ + p: mkStrategy(), + reachable: reachable, + } + }, nil +} + +var partitioners = map[string]partitionBuilder{ + "random": cfgRandomPartitioner, + "round_robin": cfgRoundRobinPartitioner, + "hash": cfgHashPartitioner, +} + +func initPartitionStrategy( + partition map[string]*common.Config, +) (func() partitioner, bool, error) { + if len(partition) == 0 { + // default use `hash` partitioner + all partitions (block if unreachable) + return makeHashPartitioner, false, nil + } + + if len(partition) > 1 { + return nil, false, errors.New("Too many partitioners") + } + + // extract partitioner from config + var name string + var config *common.Config + for n, c := range partition { + name, config = n, c + } + + // instantiate partitioner strategy + mk := partitioners[name] + if mk == nil { + return nil, false, fmt.Errorf("unknown kafka partition mode %v", name) + } + constr, err := mk(config) + if err != nil { + return nil, false, err + } + + // parse shared config + cfg := struct { + Reachable bool `config:"reachable_only"` + }{ + Reachable: false, + } + err = config.Unpack(&cfg) + if err != nil { + return nil, false, err + } + + return constr, cfg.Reachable, nil +} + +func (p *messagePartitioner) RequiresConsistency() bool { return !p.reachable } +func (p *messagePartitioner) Partition( + libMsg *sarama.ProducerMessage, + numPartitions int32, +) (int32, error) { + msg := libMsg.Metadata.(*message) + if numPartitions == p.partitions { // if reachable is false, this is always true + if 0 <= msg.partition && msg.partition < numPartitions { + return msg.partition, nil + } + } + + partition, err := p.p(msg, numPartitions) + if err != nil { + return 0, nil + } + + msg.partition = partition + p.partitions = numPartitions + return msg.partition, nil +} + +func cfgRandomPartitioner(config *common.Config) (func() partitioner, error) { + cfg := struct { + GroupEvents int `config:"group_events" validate:"min=1"` + }{ + GroupEvents: 1, + } + if err := config.Unpack(&cfg); err != nil { + return nil, err + } + + return func() partitioner { + generator := rand.New(rand.NewSource(rand.Int63())) + N := cfg.GroupEvents + count := cfg.GroupEvents + partition := int32(0) + + return func(_ *message, numPartitions int32) (int32, error) { + if N == count { + count = 0 + partition = int32(generator.Intn(int(numPartitions))) + } + count++ + return partition, nil + } + }, nil +} + +func cfgRoundRobinPartitioner(config *common.Config) (func() partitioner, error) { + cfg := struct { + GroupEvents int `config:"group_events" validate:"min=1"` + }{ + GroupEvents: 1, + } + if err := config.Unpack(&cfg); err != nil { + return nil, err + } + + return func() partitioner { + N := cfg.GroupEvents + count := N + partition := rand.Int31() + + return func(_ *message, numPartitions int32) (int32, error) { + if N == count { + count = 0 + if partition++; partition >= numPartitions { + partition = 0 + } + } + count++ + return partition, nil + } + }, nil +} + +func cfgHashPartitioner(config *common.Config) (func() partitioner, error) { + cfg := 
struct { + Hash []string `config:"hash"` + Random bool `config:"random"` + }{ + Random: true, + } + if err := config.Unpack(&cfg); err != nil { + return nil, err + } + + if len(cfg.Hash) == 0 { + return makeHashPartitioner, nil + } + + return func() partitioner { + return makeFieldsHashPartitioner(cfg.Hash, !cfg.Random) + }, nil +} + +func makeHashPartitioner() partitioner { + generator := rand.New(rand.NewSource(rand.Int63())) + hasher := fnv.New32a() + + return func(msg *message, numPartitions int32) (int32, error) { + if msg.key == nil { + return int32(generator.Intn(int(numPartitions))), nil + } + + hash := msg.hash + if hash == 0 { + hasher.Reset() + if _, err := hasher.Write(msg.key); err != nil { + return -1, err + } + msg.hash = hasher.Sum32() + hash = msg.hash + } + + // create positive hash value + return hash2Partition(hash, numPartitions) + } +} + +func makeFieldsHashPartitioner(fields []string, dropFail bool) partitioner { + generator := rand.New(rand.NewSource(rand.Int63())) + hasher := fnv.New32a() + + return func(msg *message, numPartitions int32) (int32, error) { + hash := msg.hash + if hash == 0 { + hasher.Reset() + + var err error + for _, field := range fields { + err = hashFieldValue(hasher, msg.event, field) + if err != nil { + break + } + } + + if err != nil { + if dropFail { + logp.Err("Hashing partition key failed: %v", err) + return -1, err + } + + msg.hash = generator.Uint32() + } else { + msg.hash = hasher.Sum32() + } + hash = msg.hash + } + + return hash2Partition(hash, numPartitions) + } +} + +func hash2Partition(hash uint32, numPartitions int32) (int32, error) { + p := int32(hash) + if p < 0 { + p = -p + } + return p % numPartitions, nil +} + +func hashFieldValue(h hash.Hash32, event common.MapStr, field string) error { + type stringer interface { + String() string + } + + type hashable interface { + Hash32(h hash.Hash32) error + } + + v, err := event.GetValue(field) + if err != nil { + return err + } + + switch s := v.(type) { + case hashable: + err = s.Hash32(h) + case string: + _, err = h.Write([]byte(s)) + case []byte: + _, err = h.Write(s) + case stringer: + _, err = h.Write([]byte(s.String())) + case int8, int16, int32, int64, int, + uint8, uint16, uint32, uint64, uint: + err = binary.Write(h, binary.LittleEndian, v) + case float32: + tmp := strconv.FormatFloat(float64(s), 'g', -1, 32) + _, err = h.Write([]byte(tmp)) + case float64: + tmp := strconv.FormatFloat(s, 'g', -1, 32) + _, err = h.Write([]byte(tmp)) + default: + // try to hash using reflection: + err = binary.Write(h, binary.LittleEndian, v) + if err != nil { + err = fmt.Errorf("can not hash key '%v' of unknown type", field) + } + } + return err +} From 05fba1a4d5f52a22e49df10c1502ca80e6521948 Mon Sep 17 00:00:00 2001 From: urso Date: Wed, 17 Aug 2016 12:33:35 +0200 Subject: [PATCH 4/6] Introduce kafka table driven testing - replace kafka integration tests with table driven integration tests - add integrations tests using batch publishing - add integration tests using different partitioner configurations --- libbeat/outputs/kafka/kafka.go | 8 + .../outputs/kafka/kafka_integration_test.go | 319 +++++++++++++----- 2 files changed, 239 insertions(+), 88 deletions(-) diff --git a/libbeat/outputs/kafka/kafka.go b/libbeat/outputs/kafka/kafka.go index b0f8220427d..6a76386c852 100644 --- a/libbeat/outputs/kafka/kafka.go +++ b/libbeat/outputs/kafka/kafka.go @@ -223,6 +223,14 @@ func (k *kafka) BulkPublish( return mode.PublishEvents(signal, opts, data) } +func (k *kafka) PublishEvents( + signal 
op.Signaler, + opts outputs.Options, + data []outputs.Data, +) error { + return k.BulkPublish(signal, opts, data) +} + func (k *kafka) newKafkaConfig() (*sarama.Config, error) { cfg, err := newKafkaConfig(&k.config) if err != nil { diff --git a/libbeat/outputs/kafka/kafka_integration_test.go b/libbeat/outputs/kafka/kafka_integration_test.go index abc0be7db28..b7e25c85bc2 100644 --- a/libbeat/outputs/kafka/kafka_integration_test.go +++ b/libbeat/outputs/kafka/kafka_integration_test.go @@ -3,6 +3,7 @@ package kafka import ( + "encoding/json" "fmt" "math/rand" "os" @@ -14,7 +15,7 @@ import ( "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/logp" "github.com/elastic/beats/libbeat/outputs" - "github.com/elastic/beats/libbeat/outputs/outil" + "github.com/elastic/beats/libbeat/outputs/mode/modetest" "github.com/stretchr/testify/assert" ) @@ -23,7 +24,196 @@ const ( kafkaDefaultPort = "9092" ) -var testOptions = outputs.Options{} +func TestKafkaPublish(t *testing.T) { + single := modetest.SingleEvent + + if testing.Verbose() { + logp.LogInit(logp.LOG_DEBUG, "", false, true, []string{"kafka"}) + } + + id := strconv.Itoa(rand.New(rand.NewSource(int64(time.Now().Nanosecond()))).Int()) + testTopic := fmt.Sprintf("test-libbeat-%s", id) + logType := fmt.Sprintf("log-type-%s", id) + + tests := []struct { + title string + config map[string]interface{} + topic string + events []modetest.EventInfo + }{ + { + "publish single event to test topic", + nil, + testTopic, + single(common.MapStr{ + "@timestamp": common.Time(time.Now()), + "host": "test-host", + "type": "log", + "message": id, + }), + }, + { + "publish single event with topic from type", + map[string]interface{}{ + "topic": "%{[type]}", + }, + logType, + single(common.MapStr{ + "@timestamp": common.Time(time.Now()), + "host": "test-host", + "type": logType, + "message": id, + }), + }, + { + "batch publish to test topic", + nil, + testTopic, + randMulti(5, 100, common.MapStr{ + "@timestamp": common.Time(time.Now()), + "host": "test-host", + "type": "log", + }), + }, + { + "batch publish to test topic from type", + map[string]interface{}{ + "topic": "%{[type]}", + }, + logType, + randMulti(5, 100, common.MapStr{ + "@timestamp": common.Time(time.Now()), + "host": "test-host", + "type": logType, + }), + }, + { + "batch publish with random partitioner", + map[string]interface{}{ + "partition.random": map[string]interface{}{ + "group_events": 1, + }, + }, + testTopic, + randMulti(1, 10, common.MapStr{ + "@timestamp": common.Time(time.Now()), + "host": "test-host", + "type": "log", + }), + }, + { + "batch publish with round robin partitioner", + map[string]interface{}{ + "partition.round_robin": map[string]interface{}{ + "group_events": 1, + }, + }, + testTopic, + randMulti(1, 10, common.MapStr{ + "@timestamp": common.Time(time.Now()), + "host": "test-host", + "type": "log", + }), + }, + { + "batch publish with hash partitioner without key (fallback to random)", + map[string]interface{}{ + "partition.hash": map[string]interface{}{}, + }, + testTopic, + randMulti(1, 10, common.MapStr{ + "@timestamp": common.Time(time.Now()), + "host": "test-host", + "type": "log", + }), + }, + { + // warning: this test uses random keys. In case keys are reused, test might fail. 
+ "batch publish with hash partitioner with key", + map[string]interface{}{ + "key": "%{[message]}", + "partition.hash": map[string]interface{}{}, + }, + testTopic, + randMulti(1, 10, common.MapStr{ + "@timestamp": common.Time(time.Now()), + "host": "test-host", + "type": "log", + }), + }, + { + // warning: this test uses random keys. In case keys are reused, test might fail. + "batch publish with fields hash partitioner", + map[string]interface{}{ + "partition.hash.hash": []string{ + "@timestamp", + "type", + "message", + }, + }, + testTopic, + randMulti(1, 10, common.MapStr{ + "@timestamp": common.Time(time.Now()), + "host": "test-host", + "type": "log", + }), + }, + } + + defaultConfig := map[string]interface{}{ + "hosts": []string{getTestKafkaHost()}, + "topic": testTopic, + "timeout": "1s", + } + + for i, test := range tests { + t.Logf("run test(%v): %v", i, test.title) + + cfg := makeConfig(t, defaultConfig) + if test.config != nil { + cfg.Merge(makeConfig(t, test.config)) + } + + // create output within function scope to guarantee + // output is properly closed between single tests + func() { + tmp, err := New("libbeat", cfg, 0) + if err != nil { + t.Fatal(err) + } + + output := tmp.(*kafka) + defer output.Close() + + // publish test events + _, tmpExpected := modetest.PublishAllWith(t, output, test.events) + expected := modetest.FlattenEvents(tmpExpected) + + // check we can find all event in topic + timeout := 20 * time.Second + stored := testReadFromKafkaTopic(t, test.topic, len(expected), timeout) + + // validate messages + if len(expected) != len(stored) { + assert.Equal(t, len(stored), len(expected)) + return + } + + for i, d := range expected { + var decoded map[string]interface{} + err := json.Unmarshal(stored[i].Value, &decoded) + if err != nil { + t.Errorf("can not json decode event value: %v", stored[i].Value) + return + } + event := d.Event + + assert.Equal(t, decoded["type"], event["type"]) + assert.Equal(t, decoded["message"], event["message"]) + } + }() + } +} func strDefault(a, defaults string) string { if len(a) == 0 { @@ -43,42 +233,12 @@ func getTestKafkaHost() string { ) } -func newTestKafkaClient(t *testing.T, topic string) *client { - - hosts := []string{getTestKafkaHost()} - t.Logf("host: %v", hosts) - - sel := outil.MakeSelector(outil.ConstSelectorExpr(topic)) - client, err := newKafkaClient(hosts, sel, nil) +func makeConfig(t *testing.T, in map[string]interface{}) *common.Config { + cfg, err := common.NewConfigFrom(in) if err != nil { t.Fatal(err) } - - return client -} - -func newTestKafkaOutput(t *testing.T, topic string, useType bool) outputs.Outputer { - - if useType { - topic = "%{[type]}" - } - config := map[string]interface{}{ - "hosts": []string{getTestKafkaHost()}, - "timeout": "1s", - "topic": topic, - } - - cfg, err := common.NewConfigFrom(config) - if err != nil { - t.Fatal(err) - } - - output, err := New("libbeat", cfg, 0) - if err != nil { - t.Fatal(err) - } - - return output + return cfg } func newTestConsumer(t *testing.T) sarama.Consumer { @@ -90,16 +250,24 @@ func newTestConsumer(t *testing.T) sarama.Consumer { return consumer } +var testTopicOffsets = map[string]int64{} + func testReadFromKafkaTopic( t *testing.T, topic string, nMessages int, - timeout time.Duration) []*sarama.ConsumerMessage { + timeout time.Duration, +) []*sarama.ConsumerMessage { consumer := newTestConsumer(t) defer func() { consumer.Close() }() - partitionConsumer, err := consumer.ConsumePartition(topic, 0, sarama.OffsetOldest) + offset, found := 
testTopicOffsets[topic] + if !found { + offset = sarama.OffsetOldest + } + + partitionConsumer, err := consumer.ConsumePartition(topic, 0, offset) if err != nil { t.Fatal(err) } @@ -113,6 +281,7 @@ func testReadFromKafkaTopic( select { case msg := <-partitionConsumer.Messages(): messages = append(messages, msg) + testTopicOffsets[topic] = msg.Offset + 1 case <-timer: break } @@ -121,62 +290,36 @@ func testReadFromKafkaTopic( return messages } -func TestOneMessageToKafka(t *testing.T) { - if testing.Short() { - t.Skip("Skipping in short mode. Requires Kafka") - } - if testing.Verbose() { - logp.LogInit(logp.LOG_DEBUG, "", false, true, []string{"kafka"}) - } - - id := strconv.Itoa(rand.New(rand.NewSource(int64(time.Now().Nanosecond()))).Int()) - topic := fmt.Sprintf("test-libbeat-%s", id) - - kafka := newTestKafkaOutput(t, topic, false) - event := outputs.Data{Event: common.MapStr{ - "@timestamp": common.Time(time.Now()), - "host": "test-host", - "type": "log", - "message": id, - }} - if err := kafka.PublishEvent(nil, testOptions, event); err != nil { - t.Fatal(err) - } +func randMulti(batches, n int, event common.MapStr) []modetest.EventInfo { + var out []modetest.EventInfo + for i := 0; i < batches; i++ { + var data []outputs.Data + for j := 0; j < n; j++ { + tmp := common.MapStr{} + for k, v := range event { + tmp[k] = v + } + tmp["message"] = randString(100) + data = append(data, outputs.Data{Event: tmp}) + } - messages := testReadFromKafkaTopic(t, topic, 1, 5*time.Second) - if assert.Len(t, messages, 1) { - msg := messages[0] - logp.Debug("kafka", "%s: %s", msg.Key, msg.Value) - assert.Contains(t, string(msg.Value), id) + out = append(out, modetest.EventInfo{Single: false, Data: data}) } + return out } -func TestUseType(t *testing.T) { - if testing.Short() { - t.Skip("Skipping in short mode. 
Requires Kafka") - } - if testing.Verbose() { - logp.LogInit(logp.LOG_DEBUG, "", false, true, []string{"kafka"}) - } - - id := strconv.Itoa(rand.New(rand.NewSource(int64(time.Now().Nanosecond()))).Int()) - logType := fmt.Sprintf("log-type-%s", id) - - kafka := newTestKafkaOutput(t, "", true) - event := outputs.Data{Event: common.MapStr{ - "@timestamp": common.Time(time.Now()), - "host": "test-host", - "type": logType, - "message": id, - }} - if err := kafka.PublishEvent(nil, testOptions, event); err != nil { - t.Fatal(err) +func randString(length int) string { + b := make([]byte, length) + for i := range b { + b[i] = randChar() } + return string(b) +} - messages := testReadFromKafkaTopic(t, logType, 1, 5*time.Second) - if assert.Len(t, messages, 1) { - msg := messages[0] - logp.Debug("kafka", "%s: %s", msg.Key, msg.Value) - assert.Contains(t, string(msg.Value), id) +func randChar() byte { + start, end := 'a', 'z' + if rand.Int31n(2) == 1 { + start, end = 'A', 'Z' } + return byte(rand.Int31n(end-start+1) + start) } From fb8463ed811c9c349eecf0e289eecad39e7124e6 Mon Sep 17 00:00:00 2001 From: urso Date: Thu, 18 Aug 2016 16:53:01 +0200 Subject: [PATCH 5/6] kafka partitioner unit tests --- libbeat/outputs/kafka/common_test.go | 25 ++ .../outputs/kafka/kafka_integration_test.go | 16 - libbeat/outputs/kafka/partition_test.go | 303 ++++++++++++++++++ 3 files changed, 328 insertions(+), 16 deletions(-) create mode 100644 libbeat/outputs/kafka/common_test.go create mode 100644 libbeat/outputs/kafka/partition_test.go diff --git a/libbeat/outputs/kafka/common_test.go b/libbeat/outputs/kafka/common_test.go new file mode 100644 index 00000000000..3eb512a9176 --- /dev/null +++ b/libbeat/outputs/kafka/common_test.go @@ -0,0 +1,25 @@ +package kafka + +import "math/rand" + +// common helpers used by unit+integration tests + +func randString(length int) string { + return string(randASCIIBytes(length)) +} + +func randASCIIBytes(length int) []byte { + b := make([]byte, length) + for i := range b { + b[i] = randChar() + } + return b +} + +func randChar() byte { + start, end := 'a', 'z' + if rand.Int31n(2) == 1 { + start, end = 'A', 'Z' + } + return byte(rand.Int31n(end-start+1) + start) +} diff --git a/libbeat/outputs/kafka/kafka_integration_test.go b/libbeat/outputs/kafka/kafka_integration_test.go index b7e25c85bc2..9c390af9646 100644 --- a/libbeat/outputs/kafka/kafka_integration_test.go +++ b/libbeat/outputs/kafka/kafka_integration_test.go @@ -307,19 +307,3 @@ func randMulti(batches, n int, event common.MapStr) []modetest.EventInfo { } return out } - -func randString(length int) string { - b := make([]byte, length) - for i := range b { - b[i] = randChar() - } - return string(b) -} - -func randChar() byte { - start, end := 'a', 'z' - if rand.Int31n(2) == 1 { - start, end = 'A', 'Z' - } - return byte(rand.Int31n(end-start+1) + start) -} diff --git a/libbeat/outputs/kafka/partition_test.go b/libbeat/outputs/kafka/partition_test.go new file mode 100644 index 00000000000..361adab9861 --- /dev/null +++ b/libbeat/outputs/kafka/partition_test.go @@ -0,0 +1,303 @@ +// +build !integration + +package kafka + +import ( + "encoding/json" + "fmt" + "testing" + "time" + + "github.com/Shopify/sarama" + "github.com/elastic/beats/libbeat/common" + "github.com/stretchr/testify/assert" +) + +type partTestScenario func(*testing.T, bool, sarama.Partitioner) error + +func TestPartitioners(t *testing.T) { + type obj map[string]interface{} + type arr []interface{} + + nonHashScenarios := []partTestScenario{ + partTestSimple(100, 
false), + } + + hashScenarios := []partTestScenario{ + partTestSimple(100, true), + partTestHashInvariant(1), + } + + tests := []struct { + title string + reachableOnly bool + scenarios []partTestScenario + config obj + }{ + { + "random every event, non-consistent ", + true, + nonHashScenarios, + obj{"partition.random": obj{ + "reachable_only": true, + "group_events": 1, + }}, + }, + { + "random every event, consistent", + false, + nonHashScenarios, + obj{"partition.random": obj{ + "reachable_only": false, + "group_events": 1, + }}, + }, + { + "random every 3rd event, non-consistent", + true, + nonHashScenarios, + obj{"partition.random": obj{ + "reachable_only": true, + "group_events": 3, + }}, + }, + { + "random every 3rd event, consistent", + false, + nonHashScenarios, + obj{"partition.random": obj{ + "reachable_only": false, + "group_events": 3, + }}, + }, + { + "round-robin every event, non-consistent", + true, + nonHashScenarios, + obj{"partition.round_robin": obj{ + "reachable_only": true, + "group_events": 1, + }}, + }, + { + "round-robin every event, consistent", + false, + nonHashScenarios, + obj{"partition.round_robin": obj{ + "reachable_only": false, + "group_events": 1, + }}, + }, + { + "round-robin every 3rd event, non-consistent", + true, + nonHashScenarios, + obj{"partition.round_robin": obj{ + "reachable_only": true, + "group_events": 3, + }}, + }, + { + "round-robin every 3rd event, consistent", + false, + nonHashScenarios, + obj{"partition.round_robin": obj{ + "reachable_only": false, + "group_events": 3, + }}, + }, + { + "hash without key, fallback random, non-consistent", + true, + nonHashScenarios, + obj{"partition.hash": obj{ + "reachable_only": true, + }}, + }, + { + "hash without key, fallback random, consistent", + false, + nonHashScenarios, + obj{"partition.hash": obj{ + "reachable_only": false, + }}, + }, + { + "hash with key, consistent", + true, + hashScenarios, + obj{"partition.hash": obj{ + "reachable_only": true, + }}, + }, + { + "hash with key, non-consistent", + false, + hashScenarios, + obj{"partition.hash": obj{ + "reachable_only": false, + }}, + }, + { + "hash message field, non-consistent", + true, + hashScenarios, + obj{"partition.hash": obj{ + "reachable_only": true, + "hash": arr{"message"}, + }}, + }, + { + "hash message field, consistent", + false, + hashScenarios, + obj{"partition.hash": obj{ + "reachable_only": false, + "hash": arr{"message"}, + }}, + }, + } + + for i, test := range tests { + t.Logf("run test(%v): %v", i, test.title) + + cfg, err := common.NewConfigFrom(test.config) + if err != nil { + t.Error(err) + continue + } + + pcfg := struct { + Partition map[string]*common.Config `config:"partition"` + }{} + err = cfg.Unpack(&pcfg) + if err != nil { + t.Error(err) + continue + } + + constr, err := makePartitioner(pcfg.Partition) + if err != nil { + t.Error(err) + continue + } + + for _, runner := range test.scenarios { + partitioner := constr("test") + err := runner(t, test.reachableOnly, partitioner) + if err != nil { + t.Error(err) + break + } + } + } +} + +func partTestSimple(N int, makeKey bool) partTestScenario { + numPartitions := int32(15) + + return func(t *testing.T, reachableOnly bool, part sarama.Partitioner) error { + t.Logf(" simple test with %v partitions", numPartitions) + + partitions := make([]int, numPartitions) + + requiresConsistency := !reachableOnly + assert.Equal(t, requiresConsistency, part.RequiresConsistency()) + + for i := 0; i <= N; i++ { + ts := time.Now() + + event := common.MapStr{ + "@timestamp": 
common.Time(ts), + "type": "test", + "message": randString(20), + } + + jsonEvent, err := json.Marshal(event) + if err != nil { + return fmt.Errorf("json encoding failed with %v", err) + } + + msg := &message{partition: -1} + msg.event = event + msg.topic = "test" + if makeKey { + msg.key = randASCIIBytes(10) + } + msg.value = jsonEvent + msg.ts = ts + msg.initProducerMessage() + + p, err := part.Partition(&msg.msg, numPartitions) + if err != nil { + return err + } + + assert.True(t, 0 <= p && p < numPartitions) + partitions[p]++ + } + + // count number of partitions being used + nPartitions := 0 + for _, p := range partitions { + if p > 0 { + nPartitions++ + } + } + t.Logf(" partitions used: %v/%v", nPartitions, numPartitions) + assert.True(t, nPartitions > 3) + + return nil + } +} + +func partTestHashInvariant(N int) partTestScenario { + numPartitions := int32(15) + + return func(t *testing.T, reachableOnly bool, part sarama.Partitioner) error { + t.Logf(" hash invariant test with %v partitions", numPartitions) + + for i := 0; i <= N; i++ { + ts := time.Now() + + event := common.MapStr{ + "@timestamp": common.Time(ts), + "type": "test", + "message": randString(20), + } + + jsonEvent, err := json.Marshal(event) + if err != nil { + return fmt.Errorf("json encoding failed with %v", err) + } + + msg := &message{partition: -1} + msg.event = event + msg.topic = "test" + msg.key = randASCIIBytes(10) + msg.value = jsonEvent + msg.ts = ts + msg.initProducerMessage() + + p1, err := part.Partition(&msg.msg, numPartitions) + if err != nil { + return err + } + + // reset message state + msg.hash = 0 + msg.partition = -1 + + p2, err := part.Partition(&msg.msg, numPartitions) + if err != nil { + return err + } + + assert.True(t, 0 <= p1 && p1 < numPartitions) + assert.True(t, 0 <= p2 && p2 < numPartitions) + assert.Equal(t, p1, p2) + } + + return nil + } +} From 3597027d9c68d2c8904abca8bc0fdfb3984fe01d Mon Sep 17 00:00:00 2001 From: urso Date: Mon, 22 Aug 2016 16:40:14 +0200 Subject: [PATCH 6/6] Add kafka partitioner settings to config files --- filebeat/filebeat.full.yml | 17 +++++++++++++++++ libbeat/_meta/config.full.yml | 17 +++++++++++++++++ metricbeat/metricbeat.full.yml | 17 +++++++++++++++++ packetbeat/packetbeat.full.yml | 17 +++++++++++++++++ winlogbeat/winlogbeat.full.yml | 17 +++++++++++++++++ 5 files changed, 85 insertions(+) diff --git a/filebeat/filebeat.full.yml b/filebeat/filebeat.full.yml index 406e728d464..a7282cd4452 100644 --- a/filebeat/filebeat.full.yml +++ b/filebeat/filebeat.full.yml @@ -481,6 +481,23 @@ output.elasticsearch: # using any event field. To set the topic from document type use `%{[type]}`. #topic: beats + # The Kafka event key setting. Use format string to create unique event key. + # By default no event key will be generated. + #key: '' + + # The Kafka event partitioning strategy. Default hashing strategy is `hash` + # using the `output.kafka.key` setting or randomly distributes events if + # `output.kafka.key` is not configured. + #partition.hash: + # If enabled, events will only be published to partitions with reachable + # leaders. Default is false. + #reachable_only: false + + # Configure alternative event field names used to compute the hash value. + # If empty `output.kafka.key` setting will be used. + # Default value is empty list. + #hash: [] + # Authentication details. Password is required if username is set. 
#username: '' #password: '' diff --git a/libbeat/_meta/config.full.yml b/libbeat/_meta/config.full.yml index 27d52d0a9e3..affeea03d90 100644 --- a/libbeat/_meta/config.full.yml +++ b/libbeat/_meta/config.full.yml @@ -261,6 +261,23 @@ output.elasticsearch: # using any event field. To set the topic from document type use `%{[type]}`. #topic: beats + # The Kafka event key setting. Use format string to create unique event key. + # By default no event key will be generated. + #key: '' + + # The Kafka event partitioning strategy. Default hashing strategy is `hash` + # using the `output.kafka.key` setting or randomly distributes events if + # `output.kafka.key` is not configured. + #partition.hash: + # If enabled, events will only be published to partitions with reachable + # leaders. Default is false. + #reachable_only: false + + # Configure alternative event field names used to compute the hash value. + # If empty `output.kafka.key` setting will be used. + # Default value is empty list. + #hash: [] + # Authentication details. Password is required if username is set. #username: '' #password: '' diff --git a/metricbeat/metricbeat.full.yml b/metricbeat/metricbeat.full.yml index 688c6798bc1..2cd26319a67 100644 --- a/metricbeat/metricbeat.full.yml +++ b/metricbeat/metricbeat.full.yml @@ -445,6 +445,23 @@ output.elasticsearch: # using any event field. To set the topic from document type use `%{[type]}`. #topic: beats + # The Kafka event key setting. Use format string to create unique event key. + # By default no event key will be generated. + #key: '' + + # The Kafka event partitioning strategy. Default hashing strategy is `hash` + # using the `output.kafka.key` setting or randomly distributes events if + # `output.kafka.key` is not configured. + #partition.hash: + # If enabled, events will only be published to partitions with reachable + # leaders. Default is false. + #reachable_only: false + + # Configure alternative event field names used to compute the hash value. + # If empty `output.kafka.key` setting will be used. + # Default value is empty list. + #hash: [] + # Authentication details. Password is required if username is set. #username: '' #password: '' diff --git a/packetbeat/packetbeat.full.yml b/packetbeat/packetbeat.full.yml index f75cfad2af9..5565eaa2352 100644 --- a/packetbeat/packetbeat.full.yml +++ b/packetbeat/packetbeat.full.yml @@ -707,6 +707,23 @@ output.elasticsearch: # using any event field. To set the topic from document type use `%{[type]}`. #topic: beats + # The Kafka event key setting. Use format string to create unique event key. + # By default no event key will be generated. + #key: '' + + # The Kafka event partitioning strategy. Default hashing strategy is `hash` + # using the `output.kafka.key` setting or randomly distributes events if + # `output.kafka.key` is not configured. + #partition.hash: + # If enabled, events will only be published to partitions with reachable + # leaders. Default is false. + #reachable_only: false + + # Configure alternative event field names used to compute the hash value. + # If empty `output.kafka.key` setting will be used. + # Default value is empty list. + #hash: [] + # Authentication details. Password is required if username is set. #username: '' #password: '' diff --git a/winlogbeat/winlogbeat.full.yml b/winlogbeat/winlogbeat.full.yml index bbb5a621264..4ebd70aadf7 100644 --- a/winlogbeat/winlogbeat.full.yml +++ b/winlogbeat/winlogbeat.full.yml @@ -296,6 +296,23 @@ output.elasticsearch: # using any event field. 
To set the topic from document type use `%{[type]}`. #topic: beats + # The Kafka event key setting. Use format string to create unique event key. + # By default no event key will be generated. + #key: '' + + # The Kafka event partitioning strategy. Default hashing strategy is `hash` + # using the `output.kafka.key` setting or randomly distributes events if + # `output.kafka.key` is not configured. + #partition.hash: + # If enabled, events will only be published to partitions with reachable + # leaders. Default is false. + #reachable_only: false + + # Configure alternative event field names used to compute the hash value. + # If empty `output.kafka.key` setting will be used. + # Default value is empty list. + #hash: [] + # Authentication details. Password is required if username is set. #username: '' #password: ''
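
Purely as an illustration of the settings introduced by this series (not part of the patches themselves), a kafka output section using the new event key and partitioning options might look like the sketch below; hosts, topic, and field names are placeholders:

    output.kafka:
      hosts: ["localhost:9092"]
      topic: beats
      # format string building the per-event key; also feeds the hash partitioner
      key: '%{[type]}'
      partition.hash:
        # false (default): distribute across all partitions;
        # true: publish only to partitions with reachable leaders
        reachable_only: false
        # optional list of fields to hash instead of `key`, e.g. ["type", "message"]
        #hash: []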