diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index cad55bc7a34..c26a69b8e89 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -46,6 +46,8 @@ https://github.com/elastic/beats/compare/v5.0.0-alpha5...master[Check the HEAD d *Affecting all Beats* - Add script to generate the Kibana index-pattern from fields.yml. {pull}2122[2122] +- Enhance redis output key selection based on format string. {pull}2169[2169] +- Configurable redis `keys` using filters and format strings. {pull}2169[2169] *Metricbeat* diff --git a/libbeat/outputs/outil/select.go b/libbeat/outputs/outil/select.go index 12d0dd361bc..a4d7bf102bd 100644 --- a/libbeat/outputs/outil/select.go +++ b/libbeat/outputs/outil/select.go @@ -81,6 +81,15 @@ func (s Selector) IsEmpty() bool { return s.sel == nilSelector || s.sel == nil } +func (s Selector) IsConst() bool { + if s.sel == nilSelector { + return true + } + + _, ok := s.sel.(*constSelector) + return ok +} + func BuildSelectorFromConfig( cfg *common.Config, settings Settings, diff --git a/libbeat/outputs/redis/client.go b/libbeat/outputs/redis/client.go index a9290cadc8f..f5204c3630d 100644 --- a/libbeat/outputs/redis/client.go +++ b/libbeat/outputs/redis/client.go @@ -11,6 +11,7 @@ import ( "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/logp" + "github.com/elastic/beats/libbeat/outputs/outil" "github.com/elastic/beats/libbeat/outputs/transport" ) @@ -18,13 +19,16 @@ var ( versionRegex = regexp.MustCompile(`redis_version:(\d+).(\d+)`) ) -type publishFn func(dest []byte, events []common.MapStr) ([]common.MapStr, error) +type publishFn func( + keys outil.Selector, + events []common.MapStr, +) ([]common.MapStr, error) type client struct { *transport.Client dataType redisDataType db int - list []byte + key outil.Selector password string publish publishFn } @@ -36,13 +40,13 @@ const ( redisChannelType ) -func newClient(tc *transport.Client, pass string, db int, dest []byte, dt redisDataType) *client { +func newClient(tc *transport.Client, pass string, db int, key outil.Selector, dt redisDataType) *client { return &client{ Client: tc, password: pass, db: db, dataType: dt, - list: dest, + key: key, } } @@ -61,7 +65,7 @@ func (c *client) Connect(to time.Duration) error { }() if err = initRedisConn(conn, c.password, c.db); err == nil { - c.publish, err = makePublish(conn, c.dataType) + c.publish, err = makePublish(conn, c.key, c.dataType) } return err } @@ -97,17 +101,26 @@ func (c *client) PublishEvent(event common.MapStr) error { } func (c *client) PublishEvents(events []common.MapStr) ([]common.MapStr, error) { - return c.publish(c.list, events) + return c.publish(c.key, events) } -func makePublish(conn redis.Conn, dt redisDataType) (publishFn, error) { +func makePublish( + conn redis.Conn, + key outil.Selector, + dt redisDataType, +) (publishFn, error) { if dt == redisChannelType { return makePublishPUBLISH(conn) } - return makePublishRPUSH(conn) + return makePublishRPUSH(conn, key) } -func makePublishRPUSH(conn redis.Conn) (publishFn, error) { +func makePublishRPUSH(conn redis.Conn, key outil.Selector) (publishFn, error) { + if !key.IsConst() { + // TODO: more clever bulk handling batching events with same key + return publishEventsPipeline(conn, "RPUSH"), nil + } + var major, minor int var versionRaw [][]byte @@ -144,7 +157,7 @@ func makePublishRPUSH(conn redis.Conn) (publishFn, error) { // See: http://redis.io/commands/rpush multiValue := major > 2 || (major == 2 && minor >= 4) if multiValue { - return publishEventsBulk(conn, "RPUSH"), nil + return publishEventsBulk(conn, key, "RPUSH"), nil } return publishEventsPipeline(conn, "RPUSH"), nil } @@ -153,8 +166,10 @@ func makePublishPUBLISH(conn redis.Conn) (publishFn, error) { return publishEventsPipeline(conn, "PUBLISH"), nil } -func publishEventsBulk(conn redis.Conn, command string) publishFn { - return func(dest []byte, events []common.MapStr) ([]common.MapStr, error) { +func publishEventsBulk(conn redis.Conn, key outil.Selector, command string) publishFn { + // XXX: requires key.IsConst() == true + dest, _ := key.Select(common.MapStr{}) + return func(_ outil.Selector, events []common.MapStr) ([]common.MapStr, error) { args := make([]interface{}, 1, len(events)+1) args[0] = dest @@ -175,21 +190,26 @@ func publishEventsBulk(conn redis.Conn, command string) publishFn { } func publishEventsPipeline(conn redis.Conn, command string) publishFn { - return func(dest []byte, events []common.MapStr) ([]common.MapStr, error) { - var args [2]interface{} - args[0] = dest - + return func(key outil.Selector, events []common.MapStr) ([]common.MapStr, error) { + var okEvents []common.MapStr serialized := make([]interface{}, 0, len(events)) - events, serialized = serializeEvents(serialized, 0, events) + okEvents, serialized = serializeEvents(serialized, 0, events) if len(serialized) == 0 { return nil, nil } - for _, event := range serialized { - args[1] = event - if err := conn.Send(command, args[:]...); err != nil { + events = okEvents[:0] + for i, serializedEvent := range serialized { + eventKey, err := key.Select(okEvents[i]) + if err != nil { + logp.Err("Failed to set redis key: %v", err) + continue + } + + events = append(events, okEvents[i]) + if err := conn.Send(command, eventKey, serializedEvent); err != nil { logp.Err("Failed to execute %v: %v", command, err) - return events, err + return okEvents, err } } @@ -203,13 +223,13 @@ func publishEventsPipeline(conn redis.Conn, command string) publishFn { _, err := conn.Receive() if err != nil { if _, ok := err.(redis.Error); ok { - logp.Err("Failed to %v event to list (%v) with %v", - command, dest, err) + logp.Err("Failed to %v event to list with %v", + command, err) failed = append(failed, events[i]) lastErr = err } else { - logp.Err("Failed to %v multiple events to list (%v) with %v", - command, dest, err) + logp.Err("Failed to %v multiple events to list with %v", + command, err) failed = append(failed, events[i:]...) lastErr = err break diff --git a/libbeat/outputs/redis/redis.go b/libbeat/outputs/redis/redis.go index 2c716416a1d..040532912ee 100644 --- a/libbeat/outputs/redis/redis.go +++ b/libbeat/outputs/redis/redis.go @@ -11,6 +11,7 @@ import ( "github.com/elastic/beats/libbeat/outputs" "github.com/elastic/beats/libbeat/outputs/mode" "github.com/elastic/beats/libbeat/outputs/mode/modeutil" + "github.com/elastic/beats/libbeat/outputs/outil" "github.com/elastic/beats/libbeat/outputs/transport" ) @@ -69,9 +70,27 @@ func (r *redisOut) init(cfg *common.Config, expireTopo int) error { return errors.New("Bad Redis data type") } - key := []byte(config.Key) - if len(key) == 0 { - key = []byte(r.beatName) + if cfg.HasField("index") && !cfg.HasField("key") { + s, err := cfg.String("index", -1) + if err != nil { + return err + } + if err := cfg.SetString("key", -1, s); err != nil { + return err + } + } + if !cfg.HasField("key") { + cfg.SetString("key", -1, r.beatName) + } + + key, err := outil.BuildSelectorFromConfig(cfg, outil.Settings{ + Key: "key", + MultiKey: "keys", + EnableSingleOnly: true, + FailEmpty: true, + }) + if err != nil { + return err } tls, err := outputs.LoadTLSConfig(config.TLS)