Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ehnance redis key selection #2169

Merged
merged 1 commit into from
Aug 9, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ https://github.com/elastic/beats/compare/v5.0.0-alpha5...master[Check the HEAD d

*Affecting all Beats*
- Add script to generate the Kibana index-pattern from fields.yml. {pull}2122[2122]
- Enhance redis output key selection based on format string. {pull}2169[2169]
- Configurable redis `keys` using filters and format strings. {pull}2169[2169]

*Metricbeat*

Expand Down
9 changes: 9 additions & 0 deletions libbeat/outputs/outil/select.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,15 @@ func (s Selector) IsEmpty() bool {
return s.sel == nilSelector || s.sel == nil
}

func (s Selector) IsConst() bool {
if s.sel == nilSelector {
return true
}

_, ok := s.sel.(*constSelector)
return ok
}

func BuildSelectorFromConfig(
cfg *common.Config,
settings Settings,
Expand Down
70 changes: 45 additions & 25 deletions libbeat/outputs/redis/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,20 +11,24 @@ import (

"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/libbeat/outputs/outil"
"github.com/elastic/beats/libbeat/outputs/transport"
)

var (
versionRegex = regexp.MustCompile(`redis_version:(\d+).(\d+)`)
)

type publishFn func(dest []byte, events []common.MapStr) ([]common.MapStr, error)
type publishFn func(
keys outil.Selector,
events []common.MapStr,
) ([]common.MapStr, error)

type client struct {
*transport.Client
dataType redisDataType
db int
list []byte
key outil.Selector
password string
publish publishFn
}
Expand All @@ -36,13 +40,13 @@ const (
redisChannelType
)

func newClient(tc *transport.Client, pass string, db int, dest []byte, dt redisDataType) *client {
func newClient(tc *transport.Client, pass string, db int, key outil.Selector, dt redisDataType) *client {
return &client{
Client: tc,
password: pass,
db: db,
dataType: dt,
list: dest,
key: key,
}
}

Expand All @@ -61,7 +65,7 @@ func (c *client) Connect(to time.Duration) error {
}()

if err = initRedisConn(conn, c.password, c.db); err == nil {
c.publish, err = makePublish(conn, c.dataType)
c.publish, err = makePublish(conn, c.key, c.dataType)
}
return err
}
Expand Down Expand Up @@ -97,17 +101,26 @@ func (c *client) PublishEvent(event common.MapStr) error {
}

func (c *client) PublishEvents(events []common.MapStr) ([]common.MapStr, error) {
return c.publish(c.list, events)
return c.publish(c.key, events)
}

func makePublish(conn redis.Conn, dt redisDataType) (publishFn, error) {
func makePublish(
conn redis.Conn,
key outil.Selector,
dt redisDataType,
) (publishFn, error) {
if dt == redisChannelType {
return makePublishPUBLISH(conn)
}
return makePublishRPUSH(conn)
return makePublishRPUSH(conn, key)
}

func makePublishRPUSH(conn redis.Conn) (publishFn, error) {
func makePublishRPUSH(conn redis.Conn, key outil.Selector) (publishFn, error) {
if !key.IsConst() {
// TODO: more clever bulk handling batching events with same key
return publishEventsPipeline(conn, "RPUSH"), nil
}

var major, minor int
var versionRaw [][]byte

Expand Down Expand Up @@ -144,7 +157,7 @@ func makePublishRPUSH(conn redis.Conn) (publishFn, error) {
// See: http://redis.io/commands/rpush
multiValue := major > 2 || (major == 2 && minor >= 4)
if multiValue {
return publishEventsBulk(conn, "RPUSH"), nil
return publishEventsBulk(conn, key, "RPUSH"), nil
}
return publishEventsPipeline(conn, "RPUSH"), nil
}
Expand All @@ -153,8 +166,10 @@ func makePublishPUBLISH(conn redis.Conn) (publishFn, error) {
return publishEventsPipeline(conn, "PUBLISH"), nil
}

func publishEventsBulk(conn redis.Conn, command string) publishFn {
return func(dest []byte, events []common.MapStr) ([]common.MapStr, error) {
func publishEventsBulk(conn redis.Conn, key outil.Selector, command string) publishFn {
// XXX: requires key.IsConst() == true
dest, _ := key.Select(common.MapStr{})
return func(_ outil.Selector, events []common.MapStr) ([]common.MapStr, error) {
args := make([]interface{}, 1, len(events)+1)
args[0] = dest

Expand All @@ -175,21 +190,26 @@ func publishEventsBulk(conn redis.Conn, command string) publishFn {
}

func publishEventsPipeline(conn redis.Conn, command string) publishFn {
return func(dest []byte, events []common.MapStr) ([]common.MapStr, error) {
var args [2]interface{}
args[0] = dest

return func(key outil.Selector, events []common.MapStr) ([]common.MapStr, error) {
var okEvents []common.MapStr
serialized := make([]interface{}, 0, len(events))
events, serialized = serializeEvents(serialized, 0, events)
okEvents, serialized = serializeEvents(serialized, 0, events)
if len(serialized) == 0 {
return nil, nil
}

for _, event := range serialized {
args[1] = event
if err := conn.Send(command, args[:]...); err != nil {
events = okEvents[:0]
for i, serializedEvent := range serialized {
eventKey, err := key.Select(okEvents[i])
if err != nil {
logp.Err("Failed to set redis key: %v", err)
continue
}

events = append(events, okEvents[i])
if err := conn.Send(command, eventKey, serializedEvent); err != nil {
logp.Err("Failed to execute %v: %v", command, err)
return events, err
return okEvents, err
}
}

Expand All @@ -203,13 +223,13 @@ func publishEventsPipeline(conn redis.Conn, command string) publishFn {
_, err := conn.Receive()
if err != nil {
if _, ok := err.(redis.Error); ok {
logp.Err("Failed to %v event to list (%v) with %v",
command, dest, err)
logp.Err("Failed to %v event to list with %v",
command, err)
failed = append(failed, events[i])
lastErr = err
} else {
logp.Err("Failed to %v multiple events to list (%v) with %v",
command, dest, err)
logp.Err("Failed to %v multiple events to list with %v",
command, err)
failed = append(failed, events[i:]...)
lastErr = err
break
Expand Down
25 changes: 22 additions & 3 deletions libbeat/outputs/redis/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/elastic/beats/libbeat/outputs"
"github.com/elastic/beats/libbeat/outputs/mode"
"github.com/elastic/beats/libbeat/outputs/mode/modeutil"
"github.com/elastic/beats/libbeat/outputs/outil"
"github.com/elastic/beats/libbeat/outputs/transport"
)

Expand Down Expand Up @@ -69,9 +70,27 @@ func (r *redisOut) init(cfg *common.Config, expireTopo int) error {
return errors.New("Bad Redis data type")
}

key := []byte(config.Key)
if len(key) == 0 {
key = []byte(r.beatName)
if cfg.HasField("index") && !cfg.HasField("key") {
s, err := cfg.String("index", -1)
if err != nil {
return err
}
if err := cfg.SetString("key", -1, s); err != nil {
return err
}
}
if !cfg.HasField("key") {
cfg.SetString("key", -1, r.beatName)
}

key, err := outil.BuildSelectorFromConfig(cfg, outil.Settings{
Key: "key",
MultiKey: "keys",
EnableSingleOnly: true,
FailEmpty: true,
})
if err != nil {
return err
}

tls, err := outputs.LoadTLSConfig(config.TLS)
Expand Down