From 245dd0aa38d0d3ff012e83d32de3db27679a9019 Mon Sep 17 00:00:00 2001 From: Tudor Golubenco Date: Wed, 20 Jul 2016 23:50:38 +0200 Subject: [PATCH] Renamed redis.index option to redis.key The "index" setting name has historical reasons, from the times that all outputs shared the same configuration options. Since the meaning and contents of the "index" setting are starting to vary per output, it's time to clean this up. The Redis "index" is now called "key". Before this change, the index was still somehow special, since it was set to the beatName outside of the output. This removes this hack, and sets index to beatName only in the Elasticsearch and Logstash modules. This also removes the `index` setting for the file output, but that was never documented. Part of #2074. --- CHANGELOG.asciidoc | 3 ++ libbeat/docs/outputconfig.asciidoc | 11 ++++- libbeat/outputs/elasticsearch/output.go | 4 ++ libbeat/outputs/fileout/config.go | 5 --- libbeat/outputs/logstash/logstash.go | 5 +++ libbeat/outputs/outputs.go | 4 -- libbeat/outputs/redis/config.go | 12 +++++- libbeat/outputs/redis/config_test.go | 30 +++++++++++++ libbeat/outputs/redis/redis.go | 12 +++--- .../outputs/redis/redis_integration_test.go | 42 +++++++++---------- 10 files changed, 88 insertions(+), 40 deletions(-) create mode 100644 libbeat/outputs/redis/config_test.go diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index 43dd6c873bc..6369684f022 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -13,9 +13,12 @@ https://github.com/elastic/beats/compare/v5.0.0-alpha4...master[Check the HEAD d ==== Breaking changes *Affecting all Beats* + - Rename the `filters` section to `processors`. {pull}1944[1944] - Introduce the condition with `when` in the processor configuration. {pull}1949[1949] - The Elasticsearch template is now loaded by default. {pull}1993[1993] +- The Redis output `index` setting is renamed to `key`. `index` still works but it's deprecated. {pull}2077[2077] +- The undocumented file output `index` setting was removed. Use `filename` instead. {pull}2077[2077] *Metricbeat* diff --git a/libbeat/docs/outputconfig.asciidoc b/libbeat/docs/outputconfig.asciidoc index d0ae824a257..8501242d897 100644 --- a/libbeat/docs/outputconfig.asciidoc +++ b/libbeat/docs/outputconfig.asciidoc @@ -631,8 +631,8 @@ output.redis: # password is set. password: "my_password" - # Optional index name. The default is {beatname_lc} and generates {beatname_lc} keys. - index: "{beatname_lc}" + # Optional key name. The default is {beatname_lc} + key: "{beatname_lc}" # Optional Redis database number where the events are stored # The default is 0. @@ -668,6 +668,13 @@ The Redis port to use if `hosts` does not contain a port number. The default is ===== index +deprecated[5.0.0-alpha5,The `index` setting is renamed to `key.] + +The name of the Redis list or channel the events are published to. The default is +"{beatname_lc}". + +===== key + The name of the Redis list or channel the events are published to. The default is "{beatname_lc}". diff --git a/libbeat/outputs/elasticsearch/output.go b/libbeat/outputs/elasticsearch/output.go index 568fa8c1d69..3f98fd02102 100644 --- a/libbeat/outputs/elasticsearch/output.go +++ b/libbeat/outputs/elasticsearch/output.go @@ -56,6 +56,10 @@ func New(beatName string, cfg *common.Config, topologyExpire int) (outputs.Outpu cfg.SetInt("bulk_max_size", -1, defaultBulkSize) } + if !cfg.HasField("index") { + cfg.SetString("index", -1, beatName) + } + output := &elasticsearchOutput{beatName: beatName} err := output.init(cfg, topologyExpire) if err != nil { diff --git a/libbeat/outputs/fileout/config.go b/libbeat/outputs/fileout/config.go index ae7f90a594b..8862e17ac1b 100644 --- a/libbeat/outputs/fileout/config.go +++ b/libbeat/outputs/fileout/config.go @@ -7,7 +7,6 @@ import ( ) type config struct { - Index string `config:"index"` Path string `config:"path"` Filename string `config:"filename"` RotateEveryKb int `config:"rotate_every_kb" validate:"min=1"` @@ -22,10 +21,6 @@ var ( ) func (c *config) Validate() error { - if c.Filename == "" && c.Index == "" { - return fmt.Errorf("File logging requires filename or index being set.") - } - if c.NumberOfFiles < 2 || c.NumberOfFiles > logp.RotatorMaxFiles { return fmt.Errorf("The number_of_files to keep should be between 2 and %v", logp.RotatorMaxFiles) diff --git a/libbeat/outputs/logstash/logstash.go b/libbeat/outputs/logstash/logstash.go index b3197e0ea6e..9c31243827f 100644 --- a/libbeat/outputs/logstash/logstash.go +++ b/libbeat/outputs/logstash/logstash.go @@ -47,6 +47,11 @@ func init() { } func new(beatName string, cfg *common.Config, _ int) (outputs.Outputer, error) { + + if !cfg.HasField("index") { + cfg.SetString("index", -1, beatName) + } + output := &logstash{} if err := output.init(cfg); err != nil { return nil, err diff --git a/libbeat/outputs/outputs.go b/libbeat/outputs/outputs.go index 851e7472dd0..4a64491e69f 100644 --- a/libbeat/outputs/outputs.go +++ b/libbeat/outputs/outputs.go @@ -76,10 +76,6 @@ func InitOutputs( continue } - if !config.HasField("index") { - config.SetString("index", -1, beatName) - } - output, err := plugin(beatName, config, topologyExpire) if err != nil { logp.Err("failed to initialize %s plugin as output: %s", name, err) diff --git a/libbeat/outputs/redis/config.go b/libbeat/outputs/redis/config.go index fcff02e7f27..d9c983b4f35 100644 --- a/libbeat/outputs/redis/config.go +++ b/libbeat/outputs/redis/config.go @@ -5,6 +5,7 @@ import ( "fmt" "time" + "github.com/elastic/beats/libbeat/logp" "github.com/elastic/beats/libbeat/outputs" "github.com/elastic/beats/libbeat/outputs/transport" ) @@ -12,6 +13,7 @@ import ( type redisConfig struct { Password string `config:"password"` Index string `config:"index"` + Key string `config:"key"` Port int `config:"port"` LoadBalance bool `config:"loadbalance"` Timeout time.Duration `config:"timeout"` @@ -49,8 +51,14 @@ func (c *redisConfig) Validate() error { return fmt.Errorf("redis data type %v not supported", c.DataType) } - if c.Index == "" { - return errors.New("index required") + if c.Key != "" && c.Index != "" { + return errors.New("Cannot use both `output.redis.key` and `output.redis.index` configuration options." + + " Set only `output.redis.key`") + } + + if c.Key == "" && c.Index != "" { + c.Key = c.Index + logp.Warn("The `output.redis.index` configuration setting is deprecated. Use `output.redis.key` instead.") } return nil diff --git a/libbeat/outputs/redis/config_test.go b/libbeat/outputs/redis/config_test.go new file mode 100644 index 00000000000..c75631ed1ef --- /dev/null +++ b/libbeat/outputs/redis/config_test.go @@ -0,0 +1,30 @@ +package redis + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestValidate(t *testing.T) { + type io struct { + Name string + Input redisConfig + Valid bool + } + + tests := []io{ + io{"No config", redisConfig{Key: "", Index: ""}, true}, + io{"Only key", redisConfig{Key: "test", Index: ""}, true}, + io{"Only index", redisConfig{Key: "", Index: "test"}, true}, + io{"Both", redisConfig{Key: "test", Index: "test"}, false}, + + io{"Invalid Datatype", redisConfig{Key: "test", DataType: "something"}, false}, + io{"List Datatype", redisConfig{Key: "test", DataType: "list"}, true}, + io{"Channel Datatype", redisConfig{Key: "test", DataType: "channel"}, true}, + } + + for _, test := range tests { + assert.Equal(t, test.Input.Validate() == nil, test.Valid, test.Name) + } +} diff --git a/libbeat/outputs/redis/redis.go b/libbeat/outputs/redis/redis.go index e6fc53cf2c8..2c716416a1d 100644 --- a/libbeat/outputs/redis/redis.go +++ b/libbeat/outputs/redis/redis.go @@ -3,7 +3,6 @@ package redis import ( "errors" "expvar" - "fmt" "time" "github.com/elastic/beats/libbeat/common" @@ -18,6 +17,7 @@ import ( type redisOut struct { mode mode.ConnectionMode topology + beatName string } var debugf = logp.MakeDebug("redis") @@ -40,7 +40,7 @@ func init() { } func new(beatName string, cfg *common.Config, expireTopo int) (outputs.Outputer, error) { - r := &redisOut{} + r := &redisOut{beatName: beatName} if err := r.init(cfg, expireTopo); err != nil { return nil, err } @@ -69,9 +69,9 @@ func (r *redisOut) init(cfg *common.Config, expireTopo int) error { return errors.New("Bad Redis data type") } - index := []byte(config.Index) - if len(index) == 0 { - return fmt.Errorf("missing %v", cfg.PathOf("index")) + key := []byte(config.Key) + if len(key) == 0 { + key = []byte(r.beatName) } tls, err := outputs.LoadTLSConfig(config.TLS) @@ -105,7 +105,7 @@ func (r *redisOut) init(cfg *common.Config, expireTopo int) error { if err != nil { return nil, err } - return newClient(t, config.Password, config.Db, index, dataType), nil + return newClient(t, config.Password, config.Db, key, dataType), nil }) if err != nil { return err diff --git a/libbeat/outputs/redis/redis_integration_test.go b/libbeat/outputs/redis/redis_integration_test.go index fd20dae3b62..d9d879deb76 100644 --- a/libbeat/outputs/redis/redis_integration_test.go +++ b/libbeat/outputs/redis/redis_integration_test.go @@ -25,11 +25,11 @@ const ( func TestTopologyInRedisTCP(t *testing.T) { db := 1 - index := "test_topo_tcp" + key := "test_topo_tcp" redisHosts := []string{getRedisAddr()} redisConfig := map[string]interface{}{ "hosts": redisHosts, - "index": index, + "key": key, "host_topology": redisHosts[0], "db_topology": db, "timeout": "5s", @@ -40,11 +40,11 @@ func TestTopologyInRedisTCP(t *testing.T) { func TestTopologyInRedisTLS(t *testing.T) { db := 1 - index := "test_topo_tls" + key := "test_topo_tls" redisHosts := []string{getSRedisAddr()} redisConfig := map[string]interface{}{ "hosts": redisHosts, - "index": index, + "key": key, "host_topology": redisHosts[0], "db_topology": db, "timeout": "5s", @@ -70,7 +70,7 @@ func testTopologyInRedis(t *testing.T, cfg map[string]interface{}) { } db := 0 - index := cfg["index"].(string) + key := cfg["key"].(string) if v, ok := cfg["db_topology"]; ok { db = v.(int) } @@ -83,7 +83,7 @@ func testTopologyInRedis(t *testing.T, cfg map[string]interface{}) { } // delete old key if present defer conn.Close() - conn.Do("DEL", index) + conn.Do("DEL", key) } // 1. connect @@ -116,11 +116,11 @@ func testTopologyInRedis(t *testing.T, cfg map[string]interface{}) { } func TestPublishListTCP(t *testing.T) { - index := "test_publist_tcp" + key := "test_publist_tcp" db := 0 redisConfig := map[string]interface{}{ "hosts": []string{getRedisAddr()}, - "index": index, + "key": key, "db": db, "datatype": "list", "timeout": "5s", @@ -130,11 +130,11 @@ func TestPublishListTCP(t *testing.T) { } func TestPublishListTLS(t *testing.T) { - index := "test_publist_tls" + key := "test_publist_tls" db := 0 redisConfig := map[string]interface{}{ "hosts": []string{getSRedisAddr()}, - "index": index, + "key": key, "db": db, "datatype": "list", "timeout": "5s", @@ -154,7 +154,7 @@ func testPublishList(t *testing.T, cfg map[string]interface{}) { total := batches & batchSize db := 0 - index := cfg["index"].(string) + key := cfg["key"].(string) if v, ok := cfg["db"]; ok { db = v.(int) } @@ -166,7 +166,7 @@ func testPublishList(t *testing.T, cfg map[string]interface{}) { // delete old key if present defer conn.Close() - conn.Do("DEL", index) + conn.Do("DEL", key) out := newRedisTestingOutput(t, cfg) err = sendTestEvents(out, batches, batchSize) @@ -174,7 +174,7 @@ func testPublishList(t *testing.T, cfg map[string]interface{}) { results := make([][]byte, total) for i := range results { - results[i], err = redis.Bytes(conn.Do("LPOP", index)) + results[i], err = redis.Bytes(conn.Do("LPOP", key)) assert.NoError(t, err) } @@ -188,10 +188,10 @@ func testPublishList(t *testing.T, cfg map[string]interface{}) { func TestPublishChannelTCP(t *testing.T) { db := 0 - index := "test_pubchan_tcp" + key := "test_pubchan_tcp" redisConfig := map[string]interface{}{ "hosts": []string{getRedisAddr()}, - "index": index, + "key": key, "db": db, "datatype": "channel", "timeout": "5s", @@ -202,10 +202,10 @@ func TestPublishChannelTCP(t *testing.T) { func TestPublishChannelTLS(t *testing.T) { db := 0 - index := "test_pubchan_tls" + key := "test_pubchan_tls" redisConfig := map[string]interface{}{ "hosts": []string{getSRedisAddr()}, - "index": index, + "key": key, "db": db, "datatype": "channel", "timeout": "5s", @@ -225,7 +225,7 @@ func testPublishChannel(t *testing.T, cfg map[string]interface{}) { total := batches & batchSize db := 0 - index := cfg["index"].(string) + key := cfg["key"].(string) if v, ok := cfg["db"]; ok { db = v.(int) } @@ -237,14 +237,14 @@ func testPublishChannel(t *testing.T, cfg map[string]interface{}) { // delete old key if present defer conn.Close() - conn.Do("DEL", index) + conn.Do("DEL", key) // subscribe to packetbeat channel psc := redis.PubSubConn{conn} - if err := psc.Subscribe(index); err != nil { + if err := psc.Subscribe(key); err != nil { t.Fatal(err) } - defer psc.Unsubscribe(index) + defer psc.Unsubscribe(key) // connect and publish events var wg sync.WaitGroup