Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Metricbeat redis refactors #9907

Merged
merged 11 commits into from
Jan 14, 2019
8 changes: 5 additions & 3 deletions metricbeat/module/redis/info/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@
package info

import (
"github.com/elastic/beats/libbeat/common"
s "github.com/elastic/beats/libbeat/common/schema"
c "github.com/elastic/beats/libbeat/common/schema/mapstrstr"
"github.com/elastic/beats/metricbeat/mb"
)

var (
Expand Down Expand Up @@ -237,12 +237,14 @@ var (
)

// Map data to MapStr
func eventMapping(info map[string]string) common.MapStr {
func eventMapping(r mb.ReporterV2, info map[string]string) {
// Full mapping from info
source := map[string]interface{}{}
for key, val := range info {
source[key] = val
}
data, _ := schema.Apply(source)
return data
r.Event(mb.Event{
sayden marked this conversation as resolved.
Show resolved Hide resolved
MetricSetFields: data,
})
}
45 changes: 14 additions & 31 deletions metricbeat/module/redis/info/info.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,13 @@ package info

import (
"strconv"
"time"

"github.com/elastic/beats/libbeat/common"
"github.com/pkg/errors"

"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/metricbeat/mb"
"github.com/elastic/beats/metricbeat/mb/parse"
"github.com/elastic/beats/metricbeat/module/redis"

rd "github.com/garyburd/redigo/redis"
)

var (
Expand All @@ -43,41 +41,25 @@ func init() {

// MetricSet for fetching Redis server information and statistics.
type MetricSet struct {
mb.BaseMetricSet
pool *rd.Pool
*redis.MetricSet
}

// New creates new instance of MetricSet
func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
// Unpack additional configuration options.
config := struct {
IdleTimeout time.Duration `config:"idle_timeout"`
Network string `config:"network"`
MaxConn int `config:"maxconn" validate:"min=1"`
Password string `config:"password"`
}{
Network: "tcp",
MaxConn: 10,
Password: "",
}
err := base.Module().UnpackConfig(&config)
ms, err := redis.NewMetricSet(base)
if err != nil {
return nil, err
return nil, errors.Wrap(err, "failed to create 'info' metricset")
}

return &MetricSet{
BaseMetricSet: base,
pool: redis.CreatePool(base.Host(), config.Password, config.Network,
config.MaxConn, config.IdleTimeout, base.Module().Config().Timeout),
}, nil
return &MetricSet{ms}, nil
}

// Fetch fetches metrics from Redis by issuing the INFO command.
func (m *MetricSet) Fetch() (common.MapStr, error) {
func (m *MetricSet) Fetch(r mb.ReporterV2) {
// Fetch default INFO.
info, err := redis.FetchRedisInfo("default", m.pool.Get())
info, err := redis.FetchRedisInfo("default", m.Connection())
if err != nil {
return nil, err
logp.Err("Failed to fetch redis info: %s", err)
return
}

// In 5.0 some fields are renamed, maintain both names, old ones will be deprecated
Expand All @@ -95,12 +77,13 @@ func (m *MetricSet) Fetch() (common.MapStr, error) {
}
}

slowLogLength, err := redis.FetchSlowLogLength(m.pool.Get())
slowLogLength, err := redis.FetchSlowLogLength(m.Connection())
if err != nil {
return nil, err
logp.Err("Failed to fetch slow log length: %s", err)
return
}
info["slowlog_len"] = strconv.FormatInt(slowLogLength, 10)

debugf("Redis INFO from %s: %+v", m.Host(), info)
return eventMapping(info), nil
eventMapping(r, info)
}
97 changes: 7 additions & 90 deletions metricbeat/module/redis/info/info_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,26 +27,22 @@ import (
mbtest "github.com/elastic/beats/metricbeat/mb/testing"
"github.com/elastic/beats/metricbeat/module/redis"

rd "github.com/garyburd/redigo/redis"
"github.com/stretchr/testify/assert"
)

const (
password = "foobared"
)

var redisHost = redis.GetRedisEnvHost() + ":" + redis.GetRedisEnvPort()

func TestFetch(t *testing.T) {
compose.EnsureUp(t, "redis")

f := mbtest.NewEventFetcher(t, getConfig(""))
event, err := f.Fetch()
ms := mbtest.NewReportingMetricSetV2(t, getConfig())
events, err := mbtest.ReportingFetchV2(ms)
if err != nil {
t.Fatal("fetch", err)
}
event := events[0].MetricSetFields

t.Logf("%s/%s event: %+v", f.Module().Name(), f.Name(), event)
t.Logf("%s/%s event: %+v", ms.Module().Name(), ms.Name(), event)

// Check fields
assert.Equal(t, 9, len(event))
Expand All @@ -57,96 +53,17 @@ func TestFetch(t *testing.T) {
func TestData(t *testing.T) {
compose.EnsureUp(t, "redis")

f := mbtest.NewEventFetcher(t, getConfig(""))

err := mbtest.WriteEvent(f, t)
ms := mbtest.NewReportingMetricSetV2(t, getConfig())
err := mbtest.WriteEventsReporterV2(ms, t, "")
if err != nil {
t.Fatal("write", err)
}
}

func TestPasswords(t *testing.T) {
compose.EnsureUp(t, "redis")

// Add password and ensure it gets reset
defer func() {
err := resetPassword(redisHost, password)
if err != nil {
t.Fatal("resetting password", err)
}
}()

err := addPassword(redisHost, password)
if err != nil {
t.Fatal("adding password", err)
}

// Test Fetch metrics with missing password
f := mbtest.NewEventFetcher(t, getConfig(""))
_, err = f.Fetch()
if assert.Error(t, err, "missing password") {
assert.Contains(t, err, "NOAUTH Authentication required.")
}

// Config redis and metricset with an invalid password
f = mbtest.NewEventFetcher(t, getConfig("blah"))
_, err = f.Fetch()
if assert.Error(t, err, "invalid password") {
assert.Contains(t, err, "ERR invalid password")
}

// Config redis and metricset with a valid password
f = mbtest.NewEventFetcher(t, getConfig(password))
_, err = f.Fetch()
assert.NoError(t, err, "valid password")
}

// addPassword will add a password to redis.
func addPassword(host, pass string) error {
c, err := rd.Dial("tcp", host)
if err != nil {
return err
}
defer c.Close()

_, err = c.Do("CONFIG", "SET", "requirepass", pass)
return err
}

// resetPassword changes the password to the redis DB.
func resetPassword(host, currentPass string) error {
c, err := rd.Dial("tcp", host)
if err != nil {
return err
}
defer c.Close()

_, err = c.Do("AUTH", currentPass)
if err != nil {
return err
}

_, err = c.Do("CONFIG", "SET", "requirepass", "")
return err
}

// writeToRedis will write to the default DB 0.
func writeToRedis(host string) error {
c, err := rd.Dial("tcp", host)
if err != nil {
return err
}
defer c.Close()

_, err = c.Do("SET", "foo", "bar")
return err
}

func getConfig(password string) map[string]interface{} {
func getConfig() map[string]interface{} {
return map[string]interface{}{
"module": "redis",
"metricsets": []string{"info"},
"hosts": []string{redisHost},
"password": password,
}
}
6 changes: 3 additions & 3 deletions metricbeat/module/redis/key/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,14 @@ import (
"github.com/elastic/beats/metricbeat/mb"
)

func eventMapping(r mb.ReporterV2, keyspace uint, info map[string]interface{}) {
func eventMapping(keyspace uint, info map[string]interface{}) mb.Event {
info["id"] = fmt.Sprintf("%d:%s", keyspace, info["name"])
r.Event(mb.Event{
return mb.Event{
MetricSetFields: info,
ModuleFields: common.MapStr{
"keyspace": common.MapStr{
"id": fmt.Sprintf("db%d", keyspace),
},
},
})
}
}
48 changes: 19 additions & 29 deletions metricbeat/module/redis/key/key.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,12 @@
package key

import (
"time"
"github.com/pkg/errors"

"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/metricbeat/mb"
"github.com/elastic/beats/metricbeat/mb/parse"
"github.com/elastic/beats/metricbeat/module/redis"

rd "github.com/garyburd/redigo/redis"
)

var (
Expand All @@ -40,8 +38,7 @@ func init() {

// MetricSet for fetching Redis server information and statistics.
type MetricSet struct {
mb.BaseMetricSet
pool *rd.Pool
*redis.MetricSet
patterns []KeyPattern
}

Expand All @@ -54,34 +51,28 @@ type KeyPattern struct {

// New creates new instance of MetricSet
func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
// Unpack additional configuration options.
config := struct {
IdleTimeout time.Duration `config:"idle_timeout"`
Network string `config:"network"`
MaxConn int `config:"maxconn" validate:"min=1"`
Password string `config:"password"`
Patterns []KeyPattern `config:"key.patterns" validate:"nonzero,required"`
}{
Network: "tcp",
MaxConn: 10,
Password: "",
}
Patterns []KeyPattern `config:"key.patterns" validate:"nonzero,required"`
}{}
err := base.Module().UnpackConfig(&config)
if err != nil {
return nil, err
return nil, errors.Wrap(err, "failed to read configuration for 'key' metricset")
}

ms, err := redis.NewMetricSet(base)
if err != nil {
return nil, errors.Wrap(err, "failed to create 'key' metricset")
}

return &MetricSet{
BaseMetricSet: base,
pool: redis.CreatePool(base.Host(), config.Password, config.Network,
config.MaxConn, config.IdleTimeout, base.Module().Config().Timeout),
patterns: config.Patterns,
MetricSet: ms,
patterns: config.Patterns,
}, nil
}

// Fetch fetches information from Redis keys
func (m *MetricSet) Fetch(r mb.ReporterV2) {
conn := m.pool.Get()
conn := m.Connection()
for _, p := range m.patterns {
if err := redis.Select(conn, p.Keyspace); err != nil {
logp.Err("Failed to select keyspace %d: %s", p.Keyspace, err)
Expand All @@ -90,7 +81,7 @@ func (m *MetricSet) Fetch(r mb.ReporterV2) {

keys, err := redis.FetchKeys(conn, p.Pattern, p.Limit)
if err != nil {
logp.Err("Failed to fetch list of keys in keyspace %d with pattern '%s': %s", p.Keyspace, p.Pattern, err)
logp.Err("Failed to list keys in keyspace %d with pattern '%s': %s", p.Keyspace, p.Pattern, err)
continue
}
if p.Limit > 0 && len(keys) > int(p.Limit) {
Expand All @@ -104,12 +95,11 @@ func (m *MetricSet) Fetch(r mb.ReporterV2) {
logp.Err("Failed to fetch key info for key %s in keyspace %d", key, p.Keyspace)
continue
}
eventMapping(r, p.Keyspace, keyInfo)
event := eventMapping(p.Keyspace, keyInfo)
if !r.Event(event) {
debugf("Failed to report event, interrupting Fetch")
return
}
}
}
}

// Close connections
func (m *MetricSet) Close() error {
return m.pool.Close()
}
14 changes: 9 additions & 5 deletions metricbeat/module/redis/keyspace/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,18 +23,22 @@ import (
"github.com/elastic/beats/libbeat/common"
s "github.com/elastic/beats/libbeat/common/schema"
c "github.com/elastic/beats/libbeat/common/schema/mapstrstr"
"github.com/elastic/beats/metricbeat/mb"
"github.com/elastic/beats/metricbeat/module/redis"
)

// Map data to MapStr
func eventsMapping(info map[string]string) []common.MapStr {
events := []common.MapStr{}
func eventsMapping(r mb.ReporterV2, info map[string]string) {
for key, space := range getKeyspaceStats(info) {
space["id"] = key
events = append(events, space)
event := mb.Event{
MetricSetFields: space,
}
if !r.Event(event) {
debugf("Failed to report event, interrupting Fetch")
return
}
}

return events
}

func getKeyspaceStats(info map[string]string) map[string]common.MapStr {
Expand Down
Loading