Skip to content

Commit

Permalink
Renamed redis.index option to redis.key
Browse files Browse the repository at this point in the history
The "index" setting name has historical reasons, from the times that
all outputs shared the same configuration options. Since the meaning and
contents of the "index" setting are starting to vary per output, it's time
to clean this up. The Redis "index" is now called "key".

Before this change, the index was still somehow special, since it was set to
the beatName outside of the output. This removes this hack, and sets index to
beatName only in the Elasticsearch and Logstash modules.

This also removes the `index` setting for the file output, but that was never
documented.

Part of #2074.
  • Loading branch information
Tudor Golubenco committed Jul 21, 2016
1 parent 2e761cd commit 245dd0a
Show file tree
Hide file tree
Showing 10 changed files with 88 additions and 40 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,12 @@ https://github.com/elastic/beats/compare/v5.0.0-alpha4...master[Check the HEAD d
==== Breaking changes

*Affecting all Beats*

- Rename the `filters` section to `processors`. {pull}1944[1944]
- Introduce the condition with `when` in the processor configuration. {pull}1949[1949]
- The Elasticsearch template is now loaded by default. {pull}1993[1993]
- The Redis output `index` setting is renamed to `key`. `index` still works but it's deprecated. {pull}2077[2077]
- The undocumented file output `index` setting was removed. Use `filename` instead. {pull}2077[2077]

*Metricbeat*

Expand Down
11 changes: 9 additions & 2 deletions libbeat/docs/outputconfig.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -631,8 +631,8 @@ output.redis:
# password is set.
password: "my_password"
# Optional index name. The default is {beatname_lc} and generates {beatname_lc} keys.
index: "{beatname_lc}"
# Optional key name. The default is {beatname_lc}
key: "{beatname_lc}"
# Optional Redis database number where the events are stored
# The default is 0.
Expand Down Expand Up @@ -668,6 +668,13 @@ The Redis port to use if `hosts` does not contain a port number. The default is

===== index

deprecated[5.0.0-alpha5,The `index` setting is renamed to `key.]

The name of the Redis list or channel the events are published to. The default is
"{beatname_lc}".

===== key

The name of the Redis list or channel the events are published to. The default is
"{beatname_lc}".

Expand Down
4 changes: 4 additions & 0 deletions libbeat/outputs/elasticsearch/output.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,10 @@ func New(beatName string, cfg *common.Config, topologyExpire int) (outputs.Outpu
cfg.SetInt("bulk_max_size", -1, defaultBulkSize)
}

if !cfg.HasField("index") {
cfg.SetString("index", -1, beatName)
}

output := &elasticsearchOutput{beatName: beatName}
err := output.init(cfg, topologyExpire)
if err != nil {
Expand Down
5 changes: 0 additions & 5 deletions libbeat/outputs/fileout/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
)

type config struct {
Index string `config:"index"`
Path string `config:"path"`
Filename string `config:"filename"`
RotateEveryKb int `config:"rotate_every_kb" validate:"min=1"`
Expand All @@ -22,10 +21,6 @@ var (
)

func (c *config) Validate() error {
if c.Filename == "" && c.Index == "" {
return fmt.Errorf("File logging requires filename or index being set.")
}

if c.NumberOfFiles < 2 || c.NumberOfFiles > logp.RotatorMaxFiles {
return fmt.Errorf("The number_of_files to keep should be between 2 and %v",
logp.RotatorMaxFiles)
Expand Down
5 changes: 5 additions & 0 deletions libbeat/outputs/logstash/logstash.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,11 @@ func init() {
}

func new(beatName string, cfg *common.Config, _ int) (outputs.Outputer, error) {

if !cfg.HasField("index") {
cfg.SetString("index", -1, beatName)
}

output := &logstash{}
if err := output.init(cfg); err != nil {
return nil, err
Expand Down
4 changes: 0 additions & 4 deletions libbeat/outputs/outputs.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,10 +76,6 @@ func InitOutputs(
continue
}

if !config.HasField("index") {
config.SetString("index", -1, beatName)
}

output, err := plugin(beatName, config, topologyExpire)
if err != nil {
logp.Err("failed to initialize %s plugin as output: %s", name, err)
Expand Down
12 changes: 10 additions & 2 deletions libbeat/outputs/redis/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,15 @@ import (
"fmt"
"time"

"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/libbeat/outputs"
"github.com/elastic/beats/libbeat/outputs/transport"
)

type redisConfig struct {
Password string `config:"password"`
Index string `config:"index"`
Key string `config:"key"`
Port int `config:"port"`
LoadBalance bool `config:"loadbalance"`
Timeout time.Duration `config:"timeout"`
Expand Down Expand Up @@ -49,8 +51,14 @@ func (c *redisConfig) Validate() error {
return fmt.Errorf("redis data type %v not supported", c.DataType)
}

if c.Index == "" {
return errors.New("index required")
if c.Key != "" && c.Index != "" {
return errors.New("Cannot use both `output.redis.key` and `output.redis.index` configuration options." +
" Set only `output.redis.key`")
}

if c.Key == "" && c.Index != "" {
c.Key = c.Index
logp.Warn("The `output.redis.index` configuration setting is deprecated. Use `output.redis.key` instead.")
}

return nil
Expand Down
30 changes: 30 additions & 0 deletions libbeat/outputs/redis/config_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package redis

import (
"testing"

"github.com/stretchr/testify/assert"
)

func TestValidate(t *testing.T) {
type io struct {
Name string
Input redisConfig
Valid bool
}

tests := []io{
io{"No config", redisConfig{Key: "", Index: ""}, true},
io{"Only key", redisConfig{Key: "test", Index: ""}, true},
io{"Only index", redisConfig{Key: "", Index: "test"}, true},
io{"Both", redisConfig{Key: "test", Index: "test"}, false},

io{"Invalid Datatype", redisConfig{Key: "test", DataType: "something"}, false},
io{"List Datatype", redisConfig{Key: "test", DataType: "list"}, true},
io{"Channel Datatype", redisConfig{Key: "test", DataType: "channel"}, true},
}

for _, test := range tests {
assert.Equal(t, test.Input.Validate() == nil, test.Valid, test.Name)
}
}
12 changes: 6 additions & 6 deletions libbeat/outputs/redis/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package redis
import (
"errors"
"expvar"
"fmt"
"time"

"github.com/elastic/beats/libbeat/common"
Expand All @@ -18,6 +17,7 @@ import (
type redisOut struct {
mode mode.ConnectionMode
topology
beatName string
}

var debugf = logp.MakeDebug("redis")
Expand All @@ -40,7 +40,7 @@ func init() {
}

func new(beatName string, cfg *common.Config, expireTopo int) (outputs.Outputer, error) {
r := &redisOut{}
r := &redisOut{beatName: beatName}
if err := r.init(cfg, expireTopo); err != nil {
return nil, err
}
Expand Down Expand Up @@ -69,9 +69,9 @@ func (r *redisOut) init(cfg *common.Config, expireTopo int) error {
return errors.New("Bad Redis data type")
}

index := []byte(config.Index)
if len(index) == 0 {
return fmt.Errorf("missing %v", cfg.PathOf("index"))
key := []byte(config.Key)
if len(key) == 0 {
key = []byte(r.beatName)
}

tls, err := outputs.LoadTLSConfig(config.TLS)
Expand Down Expand Up @@ -105,7 +105,7 @@ func (r *redisOut) init(cfg *common.Config, expireTopo int) error {
if err != nil {
return nil, err
}
return newClient(t, config.Password, config.Db, index, dataType), nil
return newClient(t, config.Password, config.Db, key, dataType), nil
})
if err != nil {
return err
Expand Down
42 changes: 21 additions & 21 deletions libbeat/outputs/redis/redis_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,11 @@ const (

func TestTopologyInRedisTCP(t *testing.T) {
db := 1
index := "test_topo_tcp"
key := "test_topo_tcp"
redisHosts := []string{getRedisAddr()}
redisConfig := map[string]interface{}{
"hosts": redisHosts,
"index": index,
"key": key,
"host_topology": redisHosts[0],
"db_topology": db,
"timeout": "5s",
Expand All @@ -40,11 +40,11 @@ func TestTopologyInRedisTCP(t *testing.T) {

func TestTopologyInRedisTLS(t *testing.T) {
db := 1
index := "test_topo_tls"
key := "test_topo_tls"
redisHosts := []string{getSRedisAddr()}
redisConfig := map[string]interface{}{
"hosts": redisHosts,
"index": index,
"key": key,
"host_topology": redisHosts[0],
"db_topology": db,
"timeout": "5s",
Expand All @@ -70,7 +70,7 @@ func testTopologyInRedis(t *testing.T, cfg map[string]interface{}) {
}

db := 0
index := cfg["index"].(string)
key := cfg["key"].(string)
if v, ok := cfg["db_topology"]; ok {
db = v.(int)
}
Expand All @@ -83,7 +83,7 @@ func testTopologyInRedis(t *testing.T, cfg map[string]interface{}) {
}
// delete old key if present
defer conn.Close()
conn.Do("DEL", index)
conn.Do("DEL", key)
}

// 1. connect
Expand Down Expand Up @@ -116,11 +116,11 @@ func testTopologyInRedis(t *testing.T, cfg map[string]interface{}) {
}

func TestPublishListTCP(t *testing.T) {
index := "test_publist_tcp"
key := "test_publist_tcp"
db := 0
redisConfig := map[string]interface{}{
"hosts": []string{getRedisAddr()},
"index": index,
"key": key,
"db": db,
"datatype": "list",
"timeout": "5s",
Expand All @@ -130,11 +130,11 @@ func TestPublishListTCP(t *testing.T) {
}

func TestPublishListTLS(t *testing.T) {
index := "test_publist_tls"
key := "test_publist_tls"
db := 0
redisConfig := map[string]interface{}{
"hosts": []string{getSRedisAddr()},
"index": index,
"key": key,
"db": db,
"datatype": "list",
"timeout": "5s",
Expand All @@ -154,7 +154,7 @@ func testPublishList(t *testing.T, cfg map[string]interface{}) {
total := batches & batchSize

db := 0
index := cfg["index"].(string)
key := cfg["key"].(string)
if v, ok := cfg["db"]; ok {
db = v.(int)
}
Expand All @@ -166,15 +166,15 @@ func testPublishList(t *testing.T, cfg map[string]interface{}) {

// delete old key if present
defer conn.Close()
conn.Do("DEL", index)
conn.Do("DEL", key)

out := newRedisTestingOutput(t, cfg)
err = sendTestEvents(out, batches, batchSize)
assert.NoError(t, err)

results := make([][]byte, total)
for i := range results {
results[i], err = redis.Bytes(conn.Do("LPOP", index))
results[i], err = redis.Bytes(conn.Do("LPOP", key))
assert.NoError(t, err)
}

Expand All @@ -188,10 +188,10 @@ func testPublishList(t *testing.T, cfg map[string]interface{}) {

func TestPublishChannelTCP(t *testing.T) {
db := 0
index := "test_pubchan_tcp"
key := "test_pubchan_tcp"
redisConfig := map[string]interface{}{
"hosts": []string{getRedisAddr()},
"index": index,
"key": key,
"db": db,
"datatype": "channel",
"timeout": "5s",
Expand All @@ -202,10 +202,10 @@ func TestPublishChannelTCP(t *testing.T) {

func TestPublishChannelTLS(t *testing.T) {
db := 0
index := "test_pubchan_tls"
key := "test_pubchan_tls"
redisConfig := map[string]interface{}{
"hosts": []string{getSRedisAddr()},
"index": index,
"key": key,
"db": db,
"datatype": "channel",
"timeout": "5s",
Expand All @@ -225,7 +225,7 @@ func testPublishChannel(t *testing.T, cfg map[string]interface{}) {
total := batches & batchSize

db := 0
index := cfg["index"].(string)
key := cfg["key"].(string)
if v, ok := cfg["db"]; ok {
db = v.(int)
}
Expand All @@ -237,14 +237,14 @@ func testPublishChannel(t *testing.T, cfg map[string]interface{}) {

// delete old key if present
defer conn.Close()
conn.Do("DEL", index)
conn.Do("DEL", key)

// subscribe to packetbeat channel
psc := redis.PubSubConn{conn}
if err := psc.Subscribe(index); err != nil {
if err := psc.Subscribe(key); err != nil {
t.Fatal(err)
}
defer psc.Unsubscribe(index)
defer psc.Unsubscribe(key)

// connect and publish events
var wg sync.WaitGroup
Expand Down

0 comments on commit 245dd0a

Please sign in to comment.