Skip to content

Commit

Permalink
Metricbeat redis refactors (elastic#9907)
Browse files Browse the repository at this point in the history
Refactor redis metricsets to reuse common code, ensure that connection pools
are closed when metricset is stopped and to use v2 reporters.

Move password tests from info metricset to module level.

Add test for info fetching.
  • Loading branch information
jsoriano authored Jan 14, 2019
1 parent 2ec7b55 commit cacf231
Show file tree
Hide file tree
Showing 12 changed files with 326 additions and 199 deletions.
8 changes: 5 additions & 3 deletions metricbeat/module/redis/info/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@
package info

import (
"github.com/elastic/beats/libbeat/common"
s "github.com/elastic/beats/libbeat/common/schema"
c "github.com/elastic/beats/libbeat/common/schema/mapstrstr"
"github.com/elastic/beats/metricbeat/mb"
)

var (
Expand Down Expand Up @@ -237,12 +237,14 @@ var (
)

// Map data to MapStr
func eventMapping(info map[string]string) common.MapStr {
func eventMapping(r mb.ReporterV2, info map[string]string) {
// Full mapping from info
source := map[string]interface{}{}
for key, val := range info {
source[key] = val
}
data, _ := schema.Apply(source)
return data
r.Event(mb.Event{
MetricSetFields: data,
})
}
45 changes: 14 additions & 31 deletions metricbeat/module/redis/info/info.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,13 @@ package info

import (
"strconv"
"time"

"github.com/elastic/beats/libbeat/common"
"github.com/pkg/errors"

"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/metricbeat/mb"
"github.com/elastic/beats/metricbeat/mb/parse"
"github.com/elastic/beats/metricbeat/module/redis"

rd "github.com/garyburd/redigo/redis"
)

var (
Expand All @@ -43,41 +41,25 @@ func init() {

// MetricSet for fetching Redis server information and statistics.
type MetricSet struct {
mb.BaseMetricSet
pool *rd.Pool
*redis.MetricSet
}

// New creates new instance of MetricSet
func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
// Unpack additional configuration options.
config := struct {
IdleTimeout time.Duration `config:"idle_timeout"`
Network string `config:"network"`
MaxConn int `config:"maxconn" validate:"min=1"`
Password string `config:"password"`
}{
Network: "tcp",
MaxConn: 10,
Password: "",
}
err := base.Module().UnpackConfig(&config)
ms, err := redis.NewMetricSet(base)
if err != nil {
return nil, err
return nil, errors.Wrap(err, "failed to create 'info' metricset")
}

return &MetricSet{
BaseMetricSet: base,
pool: redis.CreatePool(base.Host(), config.Password, config.Network,
config.MaxConn, config.IdleTimeout, base.Module().Config().Timeout),
}, nil
return &MetricSet{ms}, nil
}

// Fetch fetches metrics from Redis by issuing the INFO command.
func (m *MetricSet) Fetch() (common.MapStr, error) {
func (m *MetricSet) Fetch(r mb.ReporterV2) {
// Fetch default INFO.
info, err := redis.FetchRedisInfo("default", m.pool.Get())
info, err := redis.FetchRedisInfo("default", m.Connection())
if err != nil {
return nil, err
logp.Err("Failed to fetch redis info: %s", err)
return
}

// In 5.0 some fields are renamed, maintain both names, old ones will be deprecated
Expand All @@ -95,12 +77,13 @@ func (m *MetricSet) Fetch() (common.MapStr, error) {
}
}

slowLogLength, err := redis.FetchSlowLogLength(m.pool.Get())
slowLogLength, err := redis.FetchSlowLogLength(m.Connection())
if err != nil {
return nil, err
logp.Err("Failed to fetch slow log length: %s", err)
return
}
info["slowlog_len"] = strconv.FormatInt(slowLogLength, 10)

debugf("Redis INFO from %s: %+v", m.Host(), info)
return eventMapping(info), nil
eventMapping(r, info)
}
97 changes: 7 additions & 90 deletions metricbeat/module/redis/info/info_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,26 +27,22 @@ import (
mbtest "github.com/elastic/beats/metricbeat/mb/testing"
"github.com/elastic/beats/metricbeat/module/redis"

rd "github.com/garyburd/redigo/redis"
"github.com/stretchr/testify/assert"
)

const (
password = "foobared"
)

var redisHost = redis.GetRedisEnvHost() + ":" + redis.GetRedisEnvPort()

func TestFetch(t *testing.T) {
compose.EnsureUp(t, "redis")

f := mbtest.NewEventFetcher(t, getConfig(""))
event, err := f.Fetch()
ms := mbtest.NewReportingMetricSetV2(t, getConfig())
events, err := mbtest.ReportingFetchV2(ms)
if err != nil {
t.Fatal("fetch", err)
}
event := events[0].MetricSetFields

t.Logf("%s/%s event: %+v", f.Module().Name(), f.Name(), event)
t.Logf("%s/%s event: %+v", ms.Module().Name(), ms.Name(), event)

// Check fields
assert.Equal(t, 9, len(event))
Expand All @@ -57,96 +53,17 @@ func TestFetch(t *testing.T) {
func TestData(t *testing.T) {
compose.EnsureUp(t, "redis")

f := mbtest.NewEventFetcher(t, getConfig(""))

err := mbtest.WriteEvent(f, t)
ms := mbtest.NewReportingMetricSetV2(t, getConfig())
err := mbtest.WriteEventsReporterV2(ms, t, "")
if err != nil {
t.Fatal("write", err)
}
}

func TestPasswords(t *testing.T) {
compose.EnsureUp(t, "redis")

// Add password and ensure it gets reset
defer func() {
err := resetPassword(redisHost, password)
if err != nil {
t.Fatal("resetting password", err)
}
}()

err := addPassword(redisHost, password)
if err != nil {
t.Fatal("adding password", err)
}

// Test Fetch metrics with missing password
f := mbtest.NewEventFetcher(t, getConfig(""))
_, err = f.Fetch()
if assert.Error(t, err, "missing password") {
assert.Contains(t, err, "NOAUTH Authentication required.")
}

// Config redis and metricset with an invalid password
f = mbtest.NewEventFetcher(t, getConfig("blah"))
_, err = f.Fetch()
if assert.Error(t, err, "invalid password") {
assert.Contains(t, err, "ERR invalid password")
}

// Config redis and metricset with a valid password
f = mbtest.NewEventFetcher(t, getConfig(password))
_, err = f.Fetch()
assert.NoError(t, err, "valid password")
}

// addPassword will add a password to redis.
func addPassword(host, pass string) error {
c, err := rd.Dial("tcp", host)
if err != nil {
return err
}
defer c.Close()

_, err = c.Do("CONFIG", "SET", "requirepass", pass)
return err
}

// resetPassword changes the password to the redis DB.
func resetPassword(host, currentPass string) error {
c, err := rd.Dial("tcp", host)
if err != nil {
return err
}
defer c.Close()

_, err = c.Do("AUTH", currentPass)
if err != nil {
return err
}

_, err = c.Do("CONFIG", "SET", "requirepass", "")
return err
}

// writeToRedis will write to the default DB 0.
func writeToRedis(host string) error {
c, err := rd.Dial("tcp", host)
if err != nil {
return err
}
defer c.Close()

_, err = c.Do("SET", "foo", "bar")
return err
}

func getConfig(password string) map[string]interface{} {
func getConfig() map[string]interface{} {
return map[string]interface{}{
"module": "redis",
"metricsets": []string{"info"},
"hosts": []string{redisHost},
"password": password,
}
}
6 changes: 3 additions & 3 deletions metricbeat/module/redis/key/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,14 @@ import (
"github.com/elastic/beats/metricbeat/mb"
)

func eventMapping(r mb.ReporterV2, keyspace uint, info map[string]interface{}) {
func eventMapping(keyspace uint, info map[string]interface{}) mb.Event {
info["id"] = fmt.Sprintf("%d:%s", keyspace, info["name"])
r.Event(mb.Event{
return mb.Event{
MetricSetFields: info,
ModuleFields: common.MapStr{
"keyspace": common.MapStr{
"id": fmt.Sprintf("db%d", keyspace),
},
},
})
}
}
48 changes: 19 additions & 29 deletions metricbeat/module/redis/key/key.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,12 @@
package key

import (
"time"
"github.com/pkg/errors"

"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/metricbeat/mb"
"github.com/elastic/beats/metricbeat/mb/parse"
"github.com/elastic/beats/metricbeat/module/redis"

rd "github.com/garyburd/redigo/redis"
)

var (
Expand All @@ -40,8 +38,7 @@ func init() {

// MetricSet for fetching Redis server information and statistics.
type MetricSet struct {
mb.BaseMetricSet
pool *rd.Pool
*redis.MetricSet
patterns []KeyPattern
}

Expand All @@ -54,34 +51,28 @@ type KeyPattern struct {

// New creates new instance of MetricSet
func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
// Unpack additional configuration options.
config := struct {
IdleTimeout time.Duration `config:"idle_timeout"`
Network string `config:"network"`
MaxConn int `config:"maxconn" validate:"min=1"`
Password string `config:"password"`
Patterns []KeyPattern `config:"key.patterns" validate:"nonzero,required"`
}{
Network: "tcp",
MaxConn: 10,
Password: "",
}
Patterns []KeyPattern `config:"key.patterns" validate:"nonzero,required"`
}{}
err := base.Module().UnpackConfig(&config)
if err != nil {
return nil, err
return nil, errors.Wrap(err, "failed to read configuration for 'key' metricset")
}

ms, err := redis.NewMetricSet(base)
if err != nil {
return nil, errors.Wrap(err, "failed to create 'key' metricset")
}

return &MetricSet{
BaseMetricSet: base,
pool: redis.CreatePool(base.Host(), config.Password, config.Network,
config.MaxConn, config.IdleTimeout, base.Module().Config().Timeout),
patterns: config.Patterns,
MetricSet: ms,
patterns: config.Patterns,
}, nil
}

// Fetch fetches information from Redis keys
func (m *MetricSet) Fetch(r mb.ReporterV2) {
conn := m.pool.Get()
conn := m.Connection()
for _, p := range m.patterns {
if err := redis.Select(conn, p.Keyspace); err != nil {
logp.Err("Failed to select keyspace %d: %s", p.Keyspace, err)
Expand All @@ -90,7 +81,7 @@ func (m *MetricSet) Fetch(r mb.ReporterV2) {

keys, err := redis.FetchKeys(conn, p.Pattern, p.Limit)
if err != nil {
logp.Err("Failed to fetch list of keys in keyspace %d with pattern '%s': %s", p.Keyspace, p.Pattern, err)
logp.Err("Failed to list keys in keyspace %d with pattern '%s': %s", p.Keyspace, p.Pattern, err)
continue
}
if p.Limit > 0 && len(keys) > int(p.Limit) {
Expand All @@ -104,12 +95,11 @@ func (m *MetricSet) Fetch(r mb.ReporterV2) {
logp.Err("Failed to fetch key info for key %s in keyspace %d", key, p.Keyspace)
continue
}
eventMapping(r, p.Keyspace, keyInfo)
event := eventMapping(p.Keyspace, keyInfo)
if !r.Event(event) {
debugf("Failed to report event, interrupting Fetch")
return
}
}
}
}

// Close connections
func (m *MetricSet) Close() error {
return m.pool.Close()
}
14 changes: 9 additions & 5 deletions metricbeat/module/redis/keyspace/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,18 +23,22 @@ import (
"github.com/elastic/beats/libbeat/common"
s "github.com/elastic/beats/libbeat/common/schema"
c "github.com/elastic/beats/libbeat/common/schema/mapstrstr"
"github.com/elastic/beats/metricbeat/mb"
"github.com/elastic/beats/metricbeat/module/redis"
)

// Map data to MapStr
func eventsMapping(info map[string]string) []common.MapStr {
events := []common.MapStr{}
func eventsMapping(r mb.ReporterV2, info map[string]string) {
for key, space := range getKeyspaceStats(info) {
space["id"] = key
events = append(events, space)
event := mb.Event{
MetricSetFields: space,
}
if !r.Event(event) {
debugf("Failed to report event, interrupting Fetch")
return
}
}

return events
}

func getKeyspaceStats(info map[string]string) map[string]common.MapStr {
Expand Down
Loading

0 comments on commit cacf231

Please sign in to comment.