From 0b14c8dfc98b074e22a7f85b0aad69a12b0a744a Mon Sep 17 00:00:00 2001 From: Jaime Soriano Pastor Date: Sat, 5 Jan 2019 00:50:18 +0100 Subject: [PATCH 01/10] Reuse code in Redis metricsets --- metricbeat/module/redis/info/info.go | 30 ++------- metricbeat/module/redis/key/key.go | 38 ++++------- metricbeat/module/redis/keyspace/keyspace.go | 29 ++------- metricbeat/module/redis/metricset.go | 66 ++++++++++++++++++++ 4 files changed, 86 insertions(+), 77 deletions(-) create mode 100644 metricbeat/module/redis/metricset.go diff --git a/metricbeat/module/redis/info/info.go b/metricbeat/module/redis/info/info.go index a1cec8a655de..8ace184d5562 100644 --- a/metricbeat/module/redis/info/info.go +++ b/metricbeat/module/redis/info/info.go @@ -19,15 +19,12 @@ package info import ( "strconv" - "time" "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/logp" "github.com/elastic/beats/metricbeat/mb" "github.com/elastic/beats/metricbeat/mb/parse" "github.com/elastic/beats/metricbeat/module/redis" - - rd "github.com/garyburd/redigo/redis" ) var ( @@ -43,39 +40,22 @@ func init() { // MetricSet for fetching Redis server information and statistics. type MetricSet struct { - mb.BaseMetricSet - pool *rd.Pool + *redis.MetricSet } // New creates new instance of MetricSet func New(base mb.BaseMetricSet) (mb.MetricSet, error) { - // Unpack additional configuration options. - config := struct { - IdleTimeout time.Duration `config:"idle_timeout"` - Network string `config:"network"` - MaxConn int `config:"maxconn" validate:"min=1"` - Password string `config:"password"` - }{ - Network: "tcp", - MaxConn: 10, - Password: "", - } - err := base.Module().UnpackConfig(&config) + ms, err := redis.NewMetricSet(base) if err != nil { return nil, err } - - return &MetricSet{ - BaseMetricSet: base, - pool: redis.CreatePool(base.Host(), config.Password, config.Network, - config.MaxConn, config.IdleTimeout, base.Module().Config().Timeout), - }, nil + return &MetricSet{ms}, nil } // Fetch fetches metrics from Redis by issuing the INFO command. func (m *MetricSet) Fetch() (common.MapStr, error) { // Fetch default INFO. - info, err := redis.FetchRedisInfo("default", m.pool.Get()) + info, err := redis.FetchRedisInfo("default", m.Connection()) if err != nil { return nil, err } @@ -95,7 +75,7 @@ func (m *MetricSet) Fetch() (common.MapStr, error) { } } - slowLogLength, err := redis.FetchSlowLogLength(m.pool.Get()) + slowLogLength, err := redis.FetchSlowLogLength(m.Connection()) if err != nil { return nil, err } diff --git a/metricbeat/module/redis/key/key.go b/metricbeat/module/redis/key/key.go index 86943729dcac..f4dffd148fa5 100644 --- a/metricbeat/module/redis/key/key.go +++ b/metricbeat/module/redis/key/key.go @@ -18,14 +18,10 @@ package key import ( - "time" - "github.com/elastic/beats/libbeat/logp" "github.com/elastic/beats/metricbeat/mb" "github.com/elastic/beats/metricbeat/mb/parse" "github.com/elastic/beats/metricbeat/module/redis" - - rd "github.com/garyburd/redigo/redis" ) var ( @@ -40,8 +36,7 @@ func init() { // MetricSet for fetching Redis server information and statistics. type MetricSet struct { - mb.BaseMetricSet - pool *rd.Pool + *redis.MetricSet patterns []KeyPattern } @@ -54,34 +49,28 @@ type KeyPattern struct { // New creates new instance of MetricSet func New(base mb.BaseMetricSet) (mb.MetricSet, error) { - // Unpack additional configuration options. config := struct { - IdleTimeout time.Duration `config:"idle_timeout"` - Network string `config:"network"` - MaxConn int `config:"maxconn" validate:"min=1"` - Password string `config:"password"` - Patterns []KeyPattern `config:"key.patterns" validate:"nonzero,required"` - }{ - Network: "tcp", - MaxConn: 10, - Password: "", - } + Patterns []KeyPattern `config:"key.patterns" validate:"nonzero,required"` + }{} err := base.Module().UnpackConfig(&config) if err != nil { return nil, err } + ms, err := redis.NewMetricSet(base) + if err != nil { + return nil, err + } + return &MetricSet{ - BaseMetricSet: base, - pool: redis.CreatePool(base.Host(), config.Password, config.Network, - config.MaxConn, config.IdleTimeout, base.Module().Config().Timeout), - patterns: config.Patterns, + MetricSet: ms, + patterns: config.Patterns, }, nil } // Fetch fetches information from Redis keys func (m *MetricSet) Fetch(r mb.ReporterV2) { - conn := m.pool.Get() + conn := m.Connection() for _, p := range m.patterns { if err := redis.Select(conn, p.Keyspace); err != nil { logp.Err("Failed to select keyspace %d: %s", p.Keyspace, err) @@ -108,8 +97,3 @@ func (m *MetricSet) Fetch(r mb.ReporterV2) { } } } - -// Close connections -func (m *MetricSet) Close() error { - return m.pool.Close() -} diff --git a/metricbeat/module/redis/keyspace/keyspace.go b/metricbeat/module/redis/keyspace/keyspace.go index 9eb0e3a22a20..01393cb8d027 100644 --- a/metricbeat/module/redis/keyspace/keyspace.go +++ b/metricbeat/module/redis/keyspace/keyspace.go @@ -18,15 +18,11 @@ package keyspace import ( - "time" - "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/logp" "github.com/elastic/beats/metricbeat/mb" "github.com/elastic/beats/metricbeat/mb/parse" "github.com/elastic/beats/metricbeat/module/redis" - - rd "github.com/garyburd/redigo/redis" ) var ( @@ -42,39 +38,22 @@ func init() { // MetricSet for fetching Redis server information and statistics. type MetricSet struct { - mb.BaseMetricSet - pool *rd.Pool + *redis.MetricSet } // New creates new instance of MetricSet func New(base mb.BaseMetricSet) (mb.MetricSet, error) { - // Unpack additional configuration options. - config := struct { - IdleTimeout time.Duration `config:"idle_timeout"` - Network string `config:"network"` - MaxConn int `config:"maxconn" validate:"min=1"` - Password string `config:"password"` - }{ - Network: "tcp", - MaxConn: 10, - Password: "", - } - err := base.Module().UnpackConfig(&config) + ms, err := redis.NewMetricSet(base) if err != nil { return nil, err } - - return &MetricSet{ - BaseMetricSet: base, - pool: redis.CreatePool(base.Host(), config.Password, config.Network, - config.MaxConn, config.IdleTimeout, base.Module().Config().Timeout), - }, nil + return &MetricSet{ms}, nil } // Fetch fetches metrics from Redis by issuing the INFO command. func (m *MetricSet) Fetch() ([]common.MapStr, error) { // Fetch default INFO. - info, err := redis.FetchRedisInfo("keyspace", m.pool.Get()) + info, err := redis.FetchRedisInfo("keyspace", m.Connection()) if err != nil { return nil, err } diff --git a/metricbeat/module/redis/metricset.go b/metricbeat/module/redis/metricset.go new file mode 100644 index 000000000000..f19950433a0e --- /dev/null +++ b/metricbeat/module/redis/metricset.go @@ -0,0 +1,66 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package redis + +import ( + "time" + + rd "github.com/garyburd/redigo/redis" + + "github.com/elastic/beats/metricbeat/mb" +) + +// MetricSet for fetching Redis server information and statistics. +type MetricSet struct { + mb.BaseMetricSet + pool *rd.Pool +} + +// New creates new instance of MetricSet +func NewMetricSet(base mb.BaseMetricSet) (*MetricSet, error) { + // Unpack additional configuration options. + config := struct { + IdleTimeout time.Duration `config:"idle_timeout"` + Network string `config:"network"` + MaxConn int `config:"maxconn" validate:"min=1"` + Password string `config:"password"` + }{ + Network: "tcp", + MaxConn: 10, + Password: "", + } + err := base.Module().UnpackConfig(&config) + if err != nil { + return nil, err + } + + return &MetricSet{ + BaseMetricSet: base, + pool: CreatePool(base.Host(), config.Password, config.Network, + config.MaxConn, config.IdleTimeout, base.Module().Config().Timeout), + }, nil +} + +func (m *MetricSet) Connection() rd.Conn { + return m.pool.Get() +} + +// Close redis connections +func (m *MetricSet) Close() error { + return m.pool.Close() +} From 7e748dbb38a26d83ff4e350b3b07f1ff33bf9843 Mon Sep 17 00:00:00 2001 From: Jaime Soriano Pastor Date: Sat, 5 Jan 2019 01:28:36 +0100 Subject: [PATCH 02/10] Use reporter V2 interface --- metricbeat/module/redis/info/data.go | 8 +++-- metricbeat/module/redis/info/info.go | 13 ++++--- .../redis/info/info_integration_test.go | 35 ++++++++++--------- metricbeat/module/redis/keyspace/data.go | 10 +++--- metricbeat/module/redis/keyspace/keyspace.go | 8 ++--- .../keyspace/keyspace_integration_test.go | 13 ++++--- 6 files changed, 46 insertions(+), 41 deletions(-) diff --git a/metricbeat/module/redis/info/data.go b/metricbeat/module/redis/info/data.go index 6ee9de991e0a..1a6799bfe68f 100644 --- a/metricbeat/module/redis/info/data.go +++ b/metricbeat/module/redis/info/data.go @@ -18,9 +18,9 @@ package info import ( - "github.com/elastic/beats/libbeat/common" s "github.com/elastic/beats/libbeat/common/schema" c "github.com/elastic/beats/libbeat/common/schema/mapstrstr" + "github.com/elastic/beats/metricbeat/mb" ) var ( @@ -237,12 +237,14 @@ var ( ) // Map data to MapStr -func eventMapping(info map[string]string) common.MapStr { +func eventMapping(r mb.ReporterV2, info map[string]string) { // Full mapping from info source := map[string]interface{}{} for key, val := range info { source[key] = val } data, _ := schema.Apply(source) - return data + r.Event(mb.Event{ + MetricSetFields: data, + }) } diff --git a/metricbeat/module/redis/info/info.go b/metricbeat/module/redis/info/info.go index 8ace184d5562..cec4c730b8e3 100644 --- a/metricbeat/module/redis/info/info.go +++ b/metricbeat/module/redis/info/info.go @@ -20,7 +20,6 @@ package info import ( "strconv" - "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/logp" "github.com/elastic/beats/metricbeat/mb" "github.com/elastic/beats/metricbeat/mb/parse" @@ -53,11 +52,13 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { } // Fetch fetches metrics from Redis by issuing the INFO command. -func (m *MetricSet) Fetch() (common.MapStr, error) { +func (m *MetricSet) Fetch(r mb.ReporterV2) { // Fetch default INFO. info, err := redis.FetchRedisInfo("default", m.Connection()) if err != nil { - return nil, err + logp.Err("Failed to fetch redis info: %s", err) + r.Error(err) + return } // In 5.0 some fields are renamed, maintain both names, old ones will be deprecated @@ -77,10 +78,12 @@ func (m *MetricSet) Fetch() (common.MapStr, error) { slowLogLength, err := redis.FetchSlowLogLength(m.Connection()) if err != nil { - return nil, err + logp.Err("Failed to fetch slow log length: %s", err) + r.Error(err) + return } info["slowlog_len"] = strconv.FormatInt(slowLogLength, 10) debugf("Redis INFO from %s: %+v", m.Host(), info) - return eventMapping(info), nil + eventMapping(r, info) } diff --git a/metricbeat/module/redis/info/info_integration_test.go b/metricbeat/module/redis/info/info_integration_test.go index bbcfd1b8601c..c1548c83b05a 100644 --- a/metricbeat/module/redis/info/info_integration_test.go +++ b/metricbeat/module/redis/info/info_integration_test.go @@ -40,13 +40,14 @@ var redisHost = redis.GetRedisEnvHost() + ":" + redis.GetRedisEnvPort() func TestFetch(t *testing.T) { compose.EnsureUp(t, "redis") - f := mbtest.NewEventFetcher(t, getConfig("")) - event, err := f.Fetch() + ms := mbtest.NewReportingMetricSetV2(t, getConfig("")) + events, err := mbtest.ReportingFetchV2(ms) if err != nil { t.Fatal("fetch", err) } + event := events[0].MetricSetFields - t.Logf("%s/%s event: %+v", f.Module().Name(), f.Name(), event) + t.Logf("%s/%s event: %+v", ms.Module().Name(), ms.Name(), event) // Check fields assert.Equal(t, 9, len(event)) @@ -57,14 +58,14 @@ func TestFetch(t *testing.T) { func TestData(t *testing.T) { compose.EnsureUp(t, "redis") - f := mbtest.NewEventFetcher(t, getConfig("")) - - err := mbtest.WriteEvent(f, t) + ms := mbtest.NewReportingMetricSetV2(t, getConfig("")) + err := mbtest.WriteEventsReporterV2(ms, t, "") if err != nil { t.Fatal("write", err) } } +// TODO: To be tested in the redis module func TestPasswords(t *testing.T) { compose.EnsureUp(t, "redis") @@ -82,23 +83,23 @@ func TestPasswords(t *testing.T) { } // Test Fetch metrics with missing password - f := mbtest.NewEventFetcher(t, getConfig("")) - _, err = f.Fetch() - if assert.Error(t, err, "missing password") { - assert.Contains(t, err, "NOAUTH Authentication required.") + ms := mbtest.NewReportingMetricSetV2(t, getConfig("")) + _, errors := mbtest.ReportingFetchV2(ms) + if assert.NotEmpty(t, errors, "errors expected") && assert.Error(t, errors[0], "missing password") { + assert.Contains(t, errors[0], "NOAUTH Authentication required.") } // Config redis and metricset with an invalid password - f = mbtest.NewEventFetcher(t, getConfig("blah")) - _, err = f.Fetch() - if assert.Error(t, err, "invalid password") { - assert.Contains(t, err, "ERR invalid password") + ms = mbtest.NewReportingMetricSetV2(t, getConfig("blah")) + _, errors = mbtest.ReportingFetchV2(ms) + if assert.NotEmpty(t, errors, "errors expected") && assert.Error(t, errors[0], "invalid password") { + assert.Contains(t, errors[0], "ERR invalid password") } // Config redis and metricset with a valid password - f = mbtest.NewEventFetcher(t, getConfig(password)) - _, err = f.Fetch() - assert.NoError(t, err, "valid password") + ms = mbtest.NewReportingMetricSetV2(t, getConfig(password)) + _, errors = mbtest.ReportingFetchV2(ms) + assert.Empty(t, errors, "valid password") } // addPassword will add a password to redis. diff --git a/metricbeat/module/redis/keyspace/data.go b/metricbeat/module/redis/keyspace/data.go index 15eff5b06ecc..a1ca18779e21 100644 --- a/metricbeat/module/redis/keyspace/data.go +++ b/metricbeat/module/redis/keyspace/data.go @@ -23,18 +23,18 @@ import ( "github.com/elastic/beats/libbeat/common" s "github.com/elastic/beats/libbeat/common/schema" c "github.com/elastic/beats/libbeat/common/schema/mapstrstr" + "github.com/elastic/beats/metricbeat/mb" "github.com/elastic/beats/metricbeat/module/redis" ) // Map data to MapStr -func eventsMapping(info map[string]string) []common.MapStr { - events := []common.MapStr{} +func eventsMapping(r mb.ReporterV2, info map[string]string) { for key, space := range getKeyspaceStats(info) { space["id"] = key - events = append(events, space) + r.Event(mb.Event{ + MetricSetFields: space, + }) } - - return events } func getKeyspaceStats(info map[string]string) map[string]common.MapStr { diff --git a/metricbeat/module/redis/keyspace/keyspace.go b/metricbeat/module/redis/keyspace/keyspace.go index 01393cb8d027..498d6209a128 100644 --- a/metricbeat/module/redis/keyspace/keyspace.go +++ b/metricbeat/module/redis/keyspace/keyspace.go @@ -18,7 +18,6 @@ package keyspace import ( - "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/logp" "github.com/elastic/beats/metricbeat/mb" "github.com/elastic/beats/metricbeat/mb/parse" @@ -51,13 +50,14 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { } // Fetch fetches metrics from Redis by issuing the INFO command. -func (m *MetricSet) Fetch() ([]common.MapStr, error) { +func (m *MetricSet) Fetch(r mb.ReporterV2) { // Fetch default INFO. info, err := redis.FetchRedisInfo("keyspace", m.Connection()) if err != nil { - return nil, err + logp.Err("Failed to fetch redis info: %s", err) + return } debugf("Redis INFO from %s: %+v", m.Host(), info) - return eventsMapping(info), nil + eventsMapping(r, info) } diff --git a/metricbeat/module/redis/keyspace/keyspace_integration_test.go b/metricbeat/module/redis/keyspace/keyspace_integration_test.go index 35710d174e20..03b4d296b05c 100644 --- a/metricbeat/module/redis/keyspace/keyspace_integration_test.go +++ b/metricbeat/module/redis/keyspace/keyspace_integration_test.go @@ -39,18 +39,18 @@ func TestFetch(t *testing.T) { addEntry(t) // Fetch data - f := mbtest.NewEventsFetcher(t, getConfig()) - events, err := f.Fetch() + ms := mbtest.NewReportingMetricSetV2(t, getConfig()) + events, err := mbtest.ReportingFetchV2(ms) if err != nil { t.Fatal("fetch", err) } - t.Logf("%s/%s event: %+v", f.Module().Name(), f.Name(), events) + t.Logf("%s/%s event: %+v", ms.Module().Name(), ms.Name(), events) // Make sure at least 1 db keyspace exists assert.True(t, len(events) > 0) - keyspace := events[0] + keyspace := events[0].MetricSetFields assert.True(t, keyspace["avg_ttl"].(int64) >= 0) assert.True(t, keyspace["expires"].(int64) >= 0) @@ -63,9 +63,8 @@ func TestData(t *testing.T) { addEntry(t) - f := mbtest.NewEventsFetcher(t, getConfig()) - - err := mbtest.WriteEvents(f, t) + ms := mbtest.NewReportingMetricSetV2(t, getConfig()) + err := mbtest.WriteEventsReporterV2(ms, t, "") if err != nil { t.Fatal("write", err) } From 57ec90f60dda031c2379e6fdc3adee518aeeeaa1 Mon Sep 17 00:00:00 2001 From: Jaime Soriano Pastor Date: Sat, 5 Jan 2019 02:24:17 +0100 Subject: [PATCH 03/10] Move passwords test to redis main module --- .../redis/info/info_integration_test.go | 90 +---------- .../redis/metricset_integration_test.go | 149 ++++++++++++++++++ 2 files changed, 152 insertions(+), 87 deletions(-) create mode 100644 metricbeat/module/redis/metricset_integration_test.go diff --git a/metricbeat/module/redis/info/info_integration_test.go b/metricbeat/module/redis/info/info_integration_test.go index c1548c83b05a..f723bc372e43 100644 --- a/metricbeat/module/redis/info/info_integration_test.go +++ b/metricbeat/module/redis/info/info_integration_test.go @@ -27,20 +27,15 @@ import ( mbtest "github.com/elastic/beats/metricbeat/mb/testing" "github.com/elastic/beats/metricbeat/module/redis" - rd "github.com/garyburd/redigo/redis" "github.com/stretchr/testify/assert" ) -const ( - password = "foobared" -) - var redisHost = redis.GetRedisEnvHost() + ":" + redis.GetRedisEnvPort() func TestFetch(t *testing.T) { compose.EnsureUp(t, "redis") - ms := mbtest.NewReportingMetricSetV2(t, getConfig("")) + ms := mbtest.NewReportingMetricSetV2(t, getConfig()) events, err := mbtest.ReportingFetchV2(ms) if err != nil { t.Fatal("fetch", err) @@ -58,96 +53,17 @@ func TestFetch(t *testing.T) { func TestData(t *testing.T) { compose.EnsureUp(t, "redis") - ms := mbtest.NewReportingMetricSetV2(t, getConfig("")) + ms := mbtest.NewReportingMetricSetV2(t, getConfig()) err := mbtest.WriteEventsReporterV2(ms, t, "") if err != nil { t.Fatal("write", err) } } -// TODO: To be tested in the redis module -func TestPasswords(t *testing.T) { - compose.EnsureUp(t, "redis") - - // Add password and ensure it gets reset - defer func() { - err := resetPassword(redisHost, password) - if err != nil { - t.Fatal("resetting password", err) - } - }() - - err := addPassword(redisHost, password) - if err != nil { - t.Fatal("adding password", err) - } - - // Test Fetch metrics with missing password - ms := mbtest.NewReportingMetricSetV2(t, getConfig("")) - _, errors := mbtest.ReportingFetchV2(ms) - if assert.NotEmpty(t, errors, "errors expected") && assert.Error(t, errors[0], "missing password") { - assert.Contains(t, errors[0], "NOAUTH Authentication required.") - } - - // Config redis and metricset with an invalid password - ms = mbtest.NewReportingMetricSetV2(t, getConfig("blah")) - _, errors = mbtest.ReportingFetchV2(ms) - if assert.NotEmpty(t, errors, "errors expected") && assert.Error(t, errors[0], "invalid password") { - assert.Contains(t, errors[0], "ERR invalid password") - } - - // Config redis and metricset with a valid password - ms = mbtest.NewReportingMetricSetV2(t, getConfig(password)) - _, errors = mbtest.ReportingFetchV2(ms) - assert.Empty(t, errors, "valid password") -} - -// addPassword will add a password to redis. -func addPassword(host, pass string) error { - c, err := rd.Dial("tcp", host) - if err != nil { - return err - } - defer c.Close() - - _, err = c.Do("CONFIG", "SET", "requirepass", pass) - return err -} - -// resetPassword changes the password to the redis DB. -func resetPassword(host, currentPass string) error { - c, err := rd.Dial("tcp", host) - if err != nil { - return err - } - defer c.Close() - - _, err = c.Do("AUTH", currentPass) - if err != nil { - return err - } - - _, err = c.Do("CONFIG", "SET", "requirepass", "") - return err -} - -// writeToRedis will write to the default DB 0. -func writeToRedis(host string) error { - c, err := rd.Dial("tcp", host) - if err != nil { - return err - } - defer c.Close() - - _, err = c.Do("SET", "foo", "bar") - return err -} - -func getConfig(password string) map[string]interface{} { +func getConfig() map[string]interface{} { return map[string]interface{}{ "module": "redis", "metricsets": []string{"info"}, "hosts": []string{redisHost}, - "password": password, } } diff --git a/metricbeat/module/redis/metricset_integration_test.go b/metricbeat/module/redis/metricset_integration_test.go new file mode 100644 index 000000000000..4c21051ecce9 --- /dev/null +++ b/metricbeat/module/redis/metricset_integration_test.go @@ -0,0 +1,149 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// +build integration + +package redis + +import ( + "testing" + + rd "github.com/garyburd/redigo/redis" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/tests/compose" + "github.com/elastic/beats/metricbeat/mb" + "github.com/elastic/beats/metricbeat/mb/parse" +) + +const ( + password = "foobared" +) + +func TestPasswords(t *testing.T) { + compose.EnsureUp(t, "redis") + + registry := mb.NewRegister() + err := registry.AddModule("redis", mb.DefaultModuleFactory) + require.NoError(t, err) + + registry.MustAddMetricSet("redis", "test", newDummyMetricSet, + mb.WithHostParser(parse.PassThruHostParser), + ) + + // Add password and ensure it gets reset + defer func() { + err := resetPassword(host, password) + if err != nil { + t.Fatal("resetting password", err) + } + }() + + err = addPassword(host, password) + if err != nil { + t.Fatal("adding password", err) + } + + // Test Fetch metrics with missing password + ms := getMetricSet(t, registry, getConfig("")) + _, err = ms.Connection().Do("PING") + if assert.Error(t, err, "missing password") { + assert.Contains(t, err, "NOAUTH Authentication required.") + } + + // Config redis and metricset with an invalid password + ms = getMetricSet(t, registry, getConfig("blah")) + _, err = ms.Connection().Do("PING") + if assert.Error(t, err, "invalid password") { + assert.Contains(t, err, "ERR invalid password") + } + + // Config redis and metricset with a valid password + ms = getMetricSet(t, registry, getConfig(password)) + _, err = ms.Connection().Do("PING") + assert.Empty(t, err, "valid password") +} + +// addPassword will add a password to redis. +func addPassword(host, pass string) error { + c, err := rd.Dial("tcp", host) + if err != nil { + return err + } + defer c.Close() + + _, err = c.Do("CONFIG", "SET", "requirepass", pass) + return err +} + +// resetPassword changes the password to the redis DB. +func resetPassword(host, currentPass string) error { + c, err := rd.Dial("tcp", host) + if err != nil { + return err + } + defer c.Close() + + _, err = c.Do("AUTH", currentPass) + if err != nil { + return err + } + + _, err = c.Do("CONFIG", "SET", "requirepass", "") + return err +} + +// dummyMetricSet is a metricset used only to instantiate a metricset +// from config using a registry +type dummyMetricSet struct { + *MetricSet +} + +func newDummyMetricSet(base mb.BaseMetricSet) (mb.MetricSet, error) { + ms, err := NewMetricSet(base) + return &dummyMetricSet{ms}, err +} + +func (m *dummyMetricSet) Fetch(r mb.ReporterV2) { +} + +func getMetricSet(t *testing.T, registry *mb.Register, config map[string]interface{}) *MetricSet { + t.Helper() + + c, err := common.NewConfigFrom(config) + require.NoError(t, err) + + _, metricsets, err := mb.NewModule(c, registry) + require.NoError(t, err) + require.Len(t, metricsets, 1) + + ms, ok := metricsets[0].(*dummyMetricSet) + require.True(t, ok, "metricset must be dummyMetricSet") + + return ms.MetricSet +} + +func getConfig(password string) map[string]interface{} { + return map[string]interface{}{ + "module": "redis", + "metricsets": "test", + "hosts": []string{host}, + "password": password, + } +} From 1567917bd147580b0ffab5949b14208f05202d17 Mon Sep 17 00:00:00 2001 From: Jaime Soriano Pastor Date: Sat, 5 Jan 2019 02:32:40 +0100 Subject: [PATCH 04/10] Remove unneeded close --- metricbeat/module/redis/redis.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/metricbeat/module/redis/redis.go b/metricbeat/module/redis/redis.go index 06cb76193ec5..9c039b6bf8d1 100644 --- a/metricbeat/module/redis/redis.go +++ b/metricbeat/module/redis/redis.go @@ -65,8 +65,6 @@ func ParseRedisLine(s string, delimiter string) []string { // FetchRedisInfo returns a map of requested stats. func FetchRedisInfo(stat string, c rd.Conn) (map[string]string, error) { - defer c.Close() - out, err := rd.String(c.Do("INFO", stat)) if err != nil { logp.Err("Error retrieving INFO stats: %v", err) From 2d68ae54cbc2d17d3c0d9160ed2fdea8104fdb02 Mon Sep 17 00:00:00 2001 From: Jaime Soriano Pastor Date: Sat, 5 Jan 2019 02:48:56 +0100 Subject: [PATCH 05/10] Add test for fetch info --- .../module/redis/redis_integration_test.go | 36 +++++++++++++++++++ 1 file changed, 36 insertions(+) diff --git a/metricbeat/module/redis/redis_integration_test.go b/metricbeat/module/redis/redis_integration_test.go index dca51fb8f415..933a5ce6ee72 100644 --- a/metricbeat/module/redis/redis_integration_test.go +++ b/metricbeat/module/redis/redis_integration_test.go @@ -20,6 +20,7 @@ package redis import ( + "strings" "testing" rd "github.com/garyburd/redigo/redis" @@ -32,6 +33,41 @@ import ( var host = GetRedisEnvHost() + ":" + GetRedisEnvPort() +func TestFetchRedisInfo(t *testing.T) { + compose.EnsureUp(t, "redis") + + conn, err := rd.Dial("tcp", host) + if err != nil { + t.Fatal("connect", err) + } + defer conn.Close() + + t.Run("default info", func(t *testing.T) { + info, err := FetchRedisInfo("default", conn) + require.NoError(t, err) + + _, ok := info["redis_version"] + assert.True(t, ok, "redis_version should be in redis info") + }) + + t.Run("keyspace info", func(t *testing.T) { + conn.Do("FLUSHALL") + conn.Do("SET", "foo", "bar") + + info, err := FetchRedisInfo("keyspace", conn) + require.NoError(t, err) + + dbFound := false + for k := range info { + if strings.HasPrefix(k, "db") { + dbFound = true + break + } + } + assert.True(t, dbFound, "there should be keyspaces in redis info") + }) +} + func TestFetchKeys(t *testing.T) { compose.EnsureUp(t, "redis") From 038d398967bddde9cae4d9895c1e727e0138c947 Mon Sep 17 00:00:00 2001 From: Jaime Soriano Pastor Date: Sat, 5 Jan 2019 03:03:07 +0100 Subject: [PATCH 06/10] Make houndci happy --- metricbeat/module/redis/metricset.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/metricbeat/module/redis/metricset.go b/metricbeat/module/redis/metricset.go index f19950433a0e..0aa555f0b487 100644 --- a/metricbeat/module/redis/metricset.go +++ b/metricbeat/module/redis/metricset.go @@ -31,7 +31,7 @@ type MetricSet struct { pool *rd.Pool } -// New creates new instance of MetricSet +// NewMetricSet creates the base for Redis metricsets func NewMetricSet(base mb.BaseMetricSet) (*MetricSet, error) { // Unpack additional configuration options. config := struct { @@ -56,6 +56,7 @@ func NewMetricSet(base mb.BaseMetricSet) (*MetricSet, error) { }, nil } +// Connection returns a redis connection from the pool func (m *MetricSet) Connection() rd.Conn { return m.pool.Get() } From 9075c5fb2651d8faf4ae68c2f474ca820a6d8dee Mon Sep 17 00:00:00 2001 From: Jaime Soriano Pastor Date: Tue, 8 Jan 2019 17:15:55 +0100 Subject: [PATCH 07/10] Add context to errors --- metricbeat/module/redis/info/info.go | 4 +++- metricbeat/module/redis/key/key.go | 8 +++++--- metricbeat/module/redis/keyspace/keyspace.go | 6 ++++-- metricbeat/module/redis/metricset.go | 3 ++- 4 files changed, 14 insertions(+), 7 deletions(-) diff --git a/metricbeat/module/redis/info/info.go b/metricbeat/module/redis/info/info.go index cec4c730b8e3..fe5a5cb9136a 100644 --- a/metricbeat/module/redis/info/info.go +++ b/metricbeat/module/redis/info/info.go @@ -20,6 +20,8 @@ package info import ( "strconv" + "github.com/pkg/errors" + "github.com/elastic/beats/libbeat/logp" "github.com/elastic/beats/metricbeat/mb" "github.com/elastic/beats/metricbeat/mb/parse" @@ -46,7 +48,7 @@ type MetricSet struct { func New(base mb.BaseMetricSet) (mb.MetricSet, error) { ms, err := redis.NewMetricSet(base) if err != nil { - return nil, err + return nil, errors.Wrap(err, "failed to create 'info' metricset") } return &MetricSet{ms}, nil } diff --git a/metricbeat/module/redis/key/key.go b/metricbeat/module/redis/key/key.go index f4dffd148fa5..4014e0805754 100644 --- a/metricbeat/module/redis/key/key.go +++ b/metricbeat/module/redis/key/key.go @@ -18,6 +18,8 @@ package key import ( + "github.com/pkg/errors" + "github.com/elastic/beats/libbeat/logp" "github.com/elastic/beats/metricbeat/mb" "github.com/elastic/beats/metricbeat/mb/parse" @@ -54,12 +56,12 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { }{} err := base.Module().UnpackConfig(&config) if err != nil { - return nil, err + return nil, errors.Wrap(err, "failed to unpack configuration for 'key' metricset") } ms, err := redis.NewMetricSet(base) if err != nil { - return nil, err + return nil, errors.Wrap(err, "failed to create 'key' metricset") } return &MetricSet{ @@ -79,7 +81,7 @@ func (m *MetricSet) Fetch(r mb.ReporterV2) { keys, err := redis.FetchKeys(conn, p.Pattern, p.Limit) if err != nil { - logp.Err("Failed to fetch list of keys in keyspace %d with pattern '%s': %s", p.Keyspace, p.Pattern, err) + logp.Err("Failed to list keys in keyspace %d with pattern '%s': %s", p.Keyspace, p.Pattern, err) continue } if p.Limit > 0 && len(keys) > int(p.Limit) { diff --git a/metricbeat/module/redis/keyspace/keyspace.go b/metricbeat/module/redis/keyspace/keyspace.go index 498d6209a128..8e6354e820be 100644 --- a/metricbeat/module/redis/keyspace/keyspace.go +++ b/metricbeat/module/redis/keyspace/keyspace.go @@ -18,6 +18,8 @@ package keyspace import ( + "github.com/pkg/errors" + "github.com/elastic/beats/libbeat/logp" "github.com/elastic/beats/metricbeat/mb" "github.com/elastic/beats/metricbeat/mb/parse" @@ -44,7 +46,7 @@ type MetricSet struct { func New(base mb.BaseMetricSet) (mb.MetricSet, error) { ms, err := redis.NewMetricSet(base) if err != nil { - return nil, err + return nil, errors.Wrap(err, "failed to create 'keyspace' metricset") } return &MetricSet{ms}, nil } @@ -54,7 +56,7 @@ func (m *MetricSet) Fetch(r mb.ReporterV2) { // Fetch default INFO. info, err := redis.FetchRedisInfo("keyspace", m.Connection()) if err != nil { - logp.Err("Failed to fetch redis info: %s", err) + logp.Err("Failed to fetch redis info for keyspaces: %s", err) return } diff --git a/metricbeat/module/redis/metricset.go b/metricbeat/module/redis/metricset.go index 0aa555f0b487..79c40835b2ff 100644 --- a/metricbeat/module/redis/metricset.go +++ b/metricbeat/module/redis/metricset.go @@ -21,6 +21,7 @@ import ( "time" rd "github.com/garyburd/redigo/redis" + "github.com/pkg/errors" "github.com/elastic/beats/metricbeat/mb" ) @@ -46,7 +47,7 @@ func NewMetricSet(base mb.BaseMetricSet) (*MetricSet, error) { } err := base.Module().UnpackConfig(&config) if err != nil { - return nil, err + return nil, errors.Wrap(err, "failed to unpack configuration") } return &MetricSet{ From e24d9970cf88424a5f992d4189fe9525149dbe2f Mon Sep 17 00:00:00 2001 From: Jaime Soriano Pastor Date: Tue, 8 Jan 2019 17:16:28 +0100 Subject: [PATCH 08/10] Remove error reporting that was used before for tests validation --- metricbeat/module/redis/info/info.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/metricbeat/module/redis/info/info.go b/metricbeat/module/redis/info/info.go index fe5a5cb9136a..57f0fa86c45e 100644 --- a/metricbeat/module/redis/info/info.go +++ b/metricbeat/module/redis/info/info.go @@ -59,7 +59,6 @@ func (m *MetricSet) Fetch(r mb.ReporterV2) { info, err := redis.FetchRedisInfo("default", m.Connection()) if err != nil { logp.Err("Failed to fetch redis info: %s", err) - r.Error(err) return } @@ -81,7 +80,6 @@ func (m *MetricSet) Fetch(r mb.ReporterV2) { slowLogLength, err := redis.FetchSlowLogLength(m.Connection()) if err != nil { logp.Err("Failed to fetch slow log length: %s", err) - r.Error(err) return } info["slowlog_len"] = strconv.FormatInt(slowLogLength, 10) From c83bba531a94311d4ce4c0bc95e7508d83e47b00 Mon Sep 17 00:00:00 2001 From: Jaime Soriano Pastor Date: Tue, 8 Jan 2019 17:23:21 +0100 Subject: [PATCH 09/10] Rephrase errors to use 'read' instead of 'unpack' --- metricbeat/module/redis/key/key.go | 2 +- metricbeat/module/redis/metricset.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/metricbeat/module/redis/key/key.go b/metricbeat/module/redis/key/key.go index 4014e0805754..4ed3c1f4a301 100644 --- a/metricbeat/module/redis/key/key.go +++ b/metricbeat/module/redis/key/key.go @@ -56,7 +56,7 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { }{} err := base.Module().UnpackConfig(&config) if err != nil { - return nil, errors.Wrap(err, "failed to unpack configuration for 'key' metricset") + return nil, errors.Wrap(err, "failed to read configuration for 'key' metricset") } ms, err := redis.NewMetricSet(base) diff --git a/metricbeat/module/redis/metricset.go b/metricbeat/module/redis/metricset.go index 79c40835b2ff..8a2e53aab2d1 100644 --- a/metricbeat/module/redis/metricset.go +++ b/metricbeat/module/redis/metricset.go @@ -47,7 +47,7 @@ func NewMetricSet(base mb.BaseMetricSet) (*MetricSet, error) { } err := base.Module().UnpackConfig(&config) if err != nil { - return nil, errors.Wrap(err, "failed to unpack configuration") + return nil, errors.Wrap(err, "failed to read configuration") } return &MetricSet{ From 344b673d655d1b61ddaffe4c5f35c9068145d99f Mon Sep 17 00:00:00 2001 From: Jaime Soriano Pastor Date: Thu, 10 Jan 2019 20:23:09 +0100 Subject: [PATCH 10/10] Make use of return value of reporter to interrupt Fetch --- metricbeat/module/redis/key/data.go | 6 +++--- metricbeat/module/redis/key/key.go | 6 +++++- metricbeat/module/redis/keyspace/data.go | 8 ++++++-- 3 files changed, 14 insertions(+), 6 deletions(-) diff --git a/metricbeat/module/redis/key/data.go b/metricbeat/module/redis/key/data.go index 65061b83c0ff..076a5bce3490 100644 --- a/metricbeat/module/redis/key/data.go +++ b/metricbeat/module/redis/key/data.go @@ -24,14 +24,14 @@ import ( "github.com/elastic/beats/metricbeat/mb" ) -func eventMapping(r mb.ReporterV2, keyspace uint, info map[string]interface{}) { +func eventMapping(keyspace uint, info map[string]interface{}) mb.Event { info["id"] = fmt.Sprintf("%d:%s", keyspace, info["name"]) - r.Event(mb.Event{ + return mb.Event{ MetricSetFields: info, ModuleFields: common.MapStr{ "keyspace": common.MapStr{ "id": fmt.Sprintf("db%d", keyspace), }, }, - }) + } } diff --git a/metricbeat/module/redis/key/key.go b/metricbeat/module/redis/key/key.go index 4ed3c1f4a301..c60db25ad445 100644 --- a/metricbeat/module/redis/key/key.go +++ b/metricbeat/module/redis/key/key.go @@ -95,7 +95,11 @@ func (m *MetricSet) Fetch(r mb.ReporterV2) { logp.Err("Failed to fetch key info for key %s in keyspace %d", key, p.Keyspace) continue } - eventMapping(r, p.Keyspace, keyInfo) + event := eventMapping(p.Keyspace, keyInfo) + if !r.Event(event) { + debugf("Failed to report event, interrupting Fetch") + return + } } } } diff --git a/metricbeat/module/redis/keyspace/data.go b/metricbeat/module/redis/keyspace/data.go index a1ca18779e21..5b0bcda18799 100644 --- a/metricbeat/module/redis/keyspace/data.go +++ b/metricbeat/module/redis/keyspace/data.go @@ -31,9 +31,13 @@ import ( func eventsMapping(r mb.ReporterV2, info map[string]string) { for key, space := range getKeyspaceStats(info) { space["id"] = key - r.Event(mb.Event{ + event := mb.Event{ MetricSetFields: space, - }) + } + if !r.Event(event) { + debugf("Failed to report event, interrupting Fetch") + return + } } }