Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Filebeat Redis prospector type #4180

Merged
merged 1 commit into from
May 26, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ https://github.com/elastic/beats/compare/v6.0.0-alpha1...master[Check the HEAD d
- Add the option to write the generated Elasticsearch mapping template into a file. {pull}4323[4323]

*Filebeat*
- Add experimental Redis slow log prospector type. {pull}4180[4180]

*Heartbeat*

Expand Down
21 changes: 21 additions & 0 deletions filebeat/_meta/common.full.p2.yml
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,27 @@ filebeat.prospectors:
# Configuration to use stdin input
#- type: stdin

#------------------------- Redis slowlog prospector ---------------------------
# Experimental: Config options for the redis slow log prospector
#- input_type: redis
#hosts: ["localhost:6379"]
#username:
#password:
#enabled: false
#scan_frequency: 10s

# Timeout after which time the prospector should return an error
#timeout: 1s

# Network type to be used for redis connection. Default: tcp
#network: tcp

# Max number of concurrent connections. Default: 10
#maxconn: 10

# Redis AUTH password. Empty by default.
#password: foobared

#========================= Filebeat global options ============================

# Event count spool threshold - forces network flush if exceeded
Expand Down
5 changes: 5 additions & 0 deletions filebeat/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ services:
- proxy_dep
env_file:
- ${PWD}/build/test.env
- ${PWD}/prospector/redis/_meta/env
working_dir: /go/src/github.com/elastic/beats/filebeat
volumes:
- ${PWD}/..:/go/src/github.com/elastic/beats/
Expand All @@ -17,8 +18,12 @@ services:
image: busybox
depends_on:
elasticsearch: { condition: service_healthy }
redis: { condition: service_healthy }

elasticsearch:
extends:
file: ../testing/environments/${TESTING_ENVIRONMENT}.yml
service: elasticsearch

redis:
build: ${PWD}/prospector/redis/_meta
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ One of the following input types:

* log: Reads every line of the log file (default)
* stdin: Reads the standard in
* redis: Reads slow log entries from redis (experimental)

The value that you specify here is used as the `type` for each event published to Logstash and Elasticsearch.

Expand Down
21 changes: 21 additions & 0 deletions filebeat/filebeat.full.yml
Original file line number Diff line number Diff line change
Expand Up @@ -379,6 +379,27 @@ filebeat.prospectors:
# Configuration to use stdin input
#- type: stdin

#------------------------- Redis slowlog prospector ---------------------------
# Experimental: Config options for the redis slow log prospector
#- input_type: redis
#hosts: ["localhost:6379"]
#username:
#password:
#enabled: false
#scan_frequency: 10s

# Timeout after which time the prospector should return an error
#timeout: 1s

# Network type to be used for redis connection. Default: tcp
#network: tcp

# Max number of concurrent connections. Default: 10
#maxconn: 10

# Redis AUTH password. Empty by default.
#password: foobared

#========================= Filebeat global options ============================

# Event count spool threshold - forces network flush if exceeded
Expand Down
4 changes: 3 additions & 1 deletion filebeat/harvester/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,14 @@ import "github.com/elastic/beats/libbeat/common/match"
const (
LogType = "log"
StdinType = "stdin"
RedisType = "redis"
)

// ValidType is a list of all valid input types
// ValidType of valid input types
var ValidType = map[string]struct{}{
StdinType: {},
LogType: {},
RedisType: {},
}

// MatchAny checks if the text matches any of the regular expressions
Expand Down
3 changes: 3 additions & 0 deletions filebeat/prospector/prospector.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/elastic/beats/filebeat/harvester"
"github.com/elastic/beats/filebeat/input/file"
"github.com/elastic/beats/filebeat/prospector/log"
"github.com/elastic/beats/filebeat/prospector/redis"
"github.com/elastic/beats/filebeat/prospector/stdin"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
Expand Down Expand Up @@ -72,6 +73,8 @@ func (p *Prospector) initProspectorer(outlet channel.Outleter, states []file.Sta
switch p.config.Type {
case harvester.StdinType:
prospectorer, err = stdin.NewProspector(config, outlet)
case harvester.RedisType:
prospectorer, err = redis.NewProspector(config, outlet)
case harvester.LogType:
prospectorer, err = log.NewProspector(config, states, outlet, p.done)
default:
Expand Down
2 changes: 2 additions & 0 deletions filebeat/prospector/redis/_meta/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
FROM redis:3.2.4-alpine
HEALTHCHECK CMD nc -z localhost 6379
2 changes: 2 additions & 0 deletions filebeat/prospector/redis/_meta/env
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
REDIS_HOST=redis
REDIS_PORT=6379
27 changes: 27 additions & 0 deletions filebeat/prospector/redis/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package redis

import (
"time"

cfg "github.com/elastic/beats/filebeat/config"
"github.com/elastic/beats/filebeat/harvester"
)

var defaultConfig = config{

ForwarderConfig: harvester.ForwarderConfig{
Type: cfg.DefaultType,
},
Network: "tcp",
MaxConn: 10,
Password: "",
}

type config struct {
harvester.ForwarderConfig `config:",inline"`
Hosts []string `config:"hosts" validate:"required"`
IdleTimeout time.Duration `config:"idle_timeout"`
Network string `config:"network"`
MaxConn int `config:"maxconn" validate:"min=1"`
Password string `config:"password"`
}
14 changes: 14 additions & 0 deletions filebeat/prospector/redis/doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
// Package redis package contains prospector and harvester to read the redis slow log
//
// The redis slow log is stored in memory. The slow log can be activate on the redis command line as following:
//
// CONFIG SET slowlog-log-slower-than 2000000
//
// This sets the value of the slow log to 2000000 micro seconds (2s). All queries taking longer will be reported.
//
// As the slow log is in memory, it can be configured how many items it consists:
//
// CONFIG SET slowlog-max-len 200
//
// This sets the size of the slow log to 200 entries. In case the slow log is full, older entries are dropped.
package redis
155 changes: 155 additions & 0 deletions filebeat/prospector/redis/harvester.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
package redis

import (
"fmt"
"time"

"github.com/elastic/beats/filebeat/util"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"

"strings"

"github.com/elastic/beats/filebeat/harvester"
rd "github.com/garyburd/redigo/redis"
"github.com/satori/go.uuid"
)

// Harvester contains all redis harvester data
type Harvester struct {
id uuid.UUID
done chan struct{}
conn rd.Conn
forwarder *harvester.Forwarder
}

// log contains all data related to one slowlog entry
//
// The data is in the following format:
// 1) (integer) 13
// 2) (integer) 1309448128
// 3) (integer) 30
// 4) 1) "slowlog"
// 2) "get"
// 3) "100"
//
type log struct {
id int64
timestamp int64
duration int
cmd string
key string
args []string
}

// NewHarvester creates a new harvester with the given connection
func NewHarvester(conn rd.Conn) *Harvester {
return &Harvester{
id: uuid.NewV4(),
done: make(chan struct{}),
conn: conn,
}
}

// Run starts a new redis harvester
func (h *Harvester) Run() error {
defer h.conn.Close()

select {
case <-h.done:
return nil
default:
}
// Writes Slowlog get and slowlog reset both to the buffer so they are executed together
h.conn.Send("SLOWLOG", "GET")
h.conn.Send("SLOWLOG", "RESET")

// Flush the buffer to execute both commands and receive the reply from SLOWLOG GET
h.conn.Flush()

// Receives first reply from redis which is the one from GET
logs, err := rd.Values(h.conn.Receive())
if err != nil {
return fmt.Errorf("error receiving slowlog data: %s", err)
}

// Read reply from RESET
_, err = h.conn.Receive()
if err != nil {
return fmt.Errorf("error receiving reset data: %s", err)
}

for _, item := range logs {
// Stopping here means some of the slowlog events are lost!
select {
case <-h.done:
return nil
default:
}
entry, err := rd.Values(item, nil)
if err != nil {
logp.Err("Error loading slowlog values: %s", err)
continue
}

var log log
var args []string
rd.Scan(entry, &log.id, &log.timestamp, &log.duration, &args)

// This splits up the args into cmd, key, args.
argsLen := len(args)
if argsLen > 0 {
log.cmd = args[0]
}
if argsLen > 1 {
log.key = args[1]
}

// This could contain confidential data, processors should be used to drop it if needed
if argsLen > 2 {
log.args = args[2:]
}

data := util.NewData()
subEvent := common.MapStr{
"id": log.id,
"cmd": log.cmd,
"key": log.key,
"duration": common.MapStr{
"us": log.duration,
},
}

if log.args != nil {
subEvent["args"] = log.args

}

data.Event = common.MapStr{
"@timestamp": common.Time(time.Unix(log.timestamp, 0).UTC()),
"message": strings.Join(args, " "),
"redis": common.MapStr{
"slowlog": subEvent,
},
"beat": common.MapStr{
"read_timestamp": common.Time(time.Now()),
},
"prospector": common.MapStr{
"type": "redis",
},
}

h.forwarder.Send(data)
}
return nil
}

// Stop stopps the harvester
func (h *Harvester) Stop() {
close(h.done)
}

// ID returns the unique harvester ID
func (h *Harvester) ID() uuid.UUID {
return h.id
}
Loading