Skip to content

Commit

Permalink
Add Beanstalkd input plugin (#4272)
Browse files Browse the repository at this point in the history
  • Loading branch information
44px authored and danielnelson committed Sep 10, 2018
1 parent 710c101 commit 69100f6
Show file tree
Hide file tree
Showing 4 changed files with 701 additions and 0 deletions.
1 change: 1 addition & 0 deletions plugins/inputs/all/all.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
_ "github.com/influxdata/telegraf/plugins/inputs/apache"
_ "github.com/influxdata/telegraf/plugins/inputs/aurora"
_ "github.com/influxdata/telegraf/plugins/inputs/bcache"
_ "github.com/influxdata/telegraf/plugins/inputs/beanstalkd"
_ "github.com/influxdata/telegraf/plugins/inputs/bond"
_ "github.com/influxdata/telegraf/plugins/inputs/burrow"
_ "github.com/influxdata/telegraf/plugins/inputs/cassandra"
Expand Down
98 changes: 98 additions & 0 deletions plugins/inputs/beanstalkd/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
# Beanstalkd Input Plugin

The `beanstalkd` plugin collects server stats as well as tube stats (reported by `stats` and `stats-tube` commands respectively).

### Configuration:

```toml
[[inputs.beanstalkd]]
## Server to collect data from
server = "localhost:11300"

## List of tubes to gather stats about.
## If no tubes specified then data gathered for each tube on server reported by list-tubes command
tubes = ["notifications"]
```

### Metrics:

Please see the [Beanstalk Protocol doc](https://raw.githubusercontent.com/kr/beanstalkd/master/doc/protocol.txt) for detailed explanation of `stats` and `stats-tube` commands output.

`beanstalkd_overview` – statistical information about the system as a whole
- fields
- cmd_delete
- cmd_pause_tube
- current_jobs_buried
- current_jobs_delayed
- current_jobs_ready
- current_jobs_reserved
- current_jobs_urgent
- current_using
- current_waiting
- current_watching
- pause
- pause_time_left
- total_jobs
- tags
- name
- server (address taken from config)

`beanstalkd_tube` – statistical information about the specified tube
- fields
- binlog_current_index
- binlog_max_size
- binlog_oldest_index
- binlog_records_migrated
- binlog_records_written
- cmd_bury
- cmd_delete
- cmd_ignore
- cmd_kick
- cmd_list_tube_used
- cmd_list_tubes
- cmd_list_tubes_watched
- cmd_pause_tube
- cmd_peek
- cmd_peek_buried
- cmd_peek_delayed
- cmd_peek_ready
- cmd_put
- cmd_release
- cmd_reserve
- cmd_reserve_with_timeout
- cmd_stats
- cmd_stats_job
- cmd_stats_tube
- cmd_touch
- cmd_use
- cmd_watch
- current_connections
- current_jobs_buried
- current_jobs_delayed
- current_jobs_ready
- current_jobs_reserved
- current_jobs_urgent
- current_producers
- current_tubes
- current_waiting
- current_workers
- job_timeouts
- max_job_size
- pid
- rusage_stime
- rusage_utime
- total_connections
- total_jobs
- uptime
- tags
- hostname
- id
- server (address taken from config)
- version

### Example Output:
```
beanstalkd_overview,host=server.local,hostname=a2ab22ed12e0,id=232485800aa11b24,server=localhost:11300,version=1.10 cmd_stats_tube=29482i,current_jobs_delayed=0i,current_jobs_urgent=6i,cmd_kick=0i,cmd_stats=7378i,cmd_stats_job=0i,current_waiting=0i,max_job_size=65535i,pid=6i,cmd_bury=0i,cmd_reserve_with_timeout=0i,cmd_touch=0i,current_connections=1i,current_jobs_ready=6i,current_producers=0i,cmd_delete=0i,cmd_list_tubes=7369i,cmd_peek_ready=0i,cmd_put=6i,cmd_use=3i,cmd_watch=0i,current_jobs_reserved=0i,rusage_stime=6.07,cmd_list_tubes_watched=0i,cmd_pause_tube=0i,total_jobs=6i,binlog_records_migrated=0i,cmd_list_tube_used=0i,cmd_peek_delayed=0i,cmd_release=0i,current_jobs_buried=0i,job_timeouts=0i,binlog_current_index=0i,binlog_max_size=10485760i,total_connections=7378i,cmd_peek_buried=0i,cmd_reserve=0i,current_tubes=4i,binlog_records_written=0i,cmd_peek=0i,rusage_utime=1.13,uptime=7099i,binlog_oldest_index=0i,current_workers=0i,cmd_ignore=0i 1528801650000000000
beanstalkd_tube,host=server.local,name=notifications,server=localhost:11300 pause_time_left=0i,current_jobs_buried=0i,current_jobs_delayed=0i,current_jobs_reserved=0i,current_using=0i,current_waiting=0i,pause=0i,total_jobs=3i,cmd_delete=0i,cmd_pause_tube=0i,current_jobs_ready=3i,current_jobs_urgent=3i,current_watching=0i 1528801650000000000
```
270 changes: 270 additions & 0 deletions plugins/inputs/beanstalkd/beanstalkd.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,270 @@
package beanstalkd

import (
"fmt"
"io"
"net/textproto"
"sync"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/inputs"
"gopkg.in/yaml.v2"
)

const sampleConfig = `
## Server to collect data from
server = "localhost:11300"
## List of tubes to gather stats about.
## If no tubes specified then data gathered for each tube on server reported by list-tubes command
tubes = ["notifications"]
`

type Beanstalkd struct {
Server string `toml:"server"`
Tubes []string `toml:"tubes"`
}

func (b *Beanstalkd) Description() string {
return "Collects Beanstalkd server and tubes stats"
}

func (b *Beanstalkd) SampleConfig() string {
return sampleConfig
}

func (b *Beanstalkd) Gather(acc telegraf.Accumulator) error {
connection, err := textproto.Dial("tcp", b.Server)
if err != nil {
return err
}
defer connection.Close()

tubes := b.Tubes
if len(tubes) == 0 {
err = runQuery(connection, "list-tubes", &tubes)
if err != nil {
acc.AddError(err)
}
}

var wg sync.WaitGroup

wg.Add(1)
go func() {
err := b.gatherServerStats(connection, acc)
if err != nil {
acc.AddError(err)
}
wg.Done()
}()

for _, tube := range tubes {
wg.Add(1)
go func(tube string) {
b.gatherTubeStats(connection, tube, acc)
wg.Done()
}(tube)
}

wg.Wait()

return nil
}

func (b *Beanstalkd) gatherServerStats(connection *textproto.Conn, acc telegraf.Accumulator) error {
stats := new(statsResponse)
if err := runQuery(connection, "stats", stats); err != nil {
return err
}

acc.AddFields("beanstalkd_overview",
map[string]interface{}{
"binlog_current_index": stats.BinlogCurrentIndex,
"binlog_max_size": stats.BinlogMaxSize,
"binlog_oldest_index": stats.BinlogOldestIndex,
"binlog_records_migrated": stats.BinlogRecordsMigrated,
"binlog_records_written": stats.BinlogRecordsWritten,
"cmd_bury": stats.CmdBury,
"cmd_delete": stats.CmdDelete,
"cmd_ignore": stats.CmdIgnore,
"cmd_kick": stats.CmdKick,
"cmd_list_tube_used": stats.CmdListTubeUsed,
"cmd_list_tubes": stats.CmdListTubes,
"cmd_list_tubes_watched": stats.CmdListTubesWatched,
"cmd_pause_tube": stats.CmdPauseTube,
"cmd_peek": stats.CmdPeek,
"cmd_peek_buried": stats.CmdPeekBuried,
"cmd_peek_delayed": stats.CmdPeekDelayed,
"cmd_peek_ready": stats.CmdPeekReady,
"cmd_put": stats.CmdPut,
"cmd_release": stats.CmdRelease,
"cmd_reserve": stats.CmdReserve,
"cmd_reserve_with_timeout": stats.CmdReserveWithTimeout,
"cmd_stats": stats.CmdStats,
"cmd_stats_job": stats.CmdStatsJob,
"cmd_stats_tube": stats.CmdStatsTube,
"cmd_touch": stats.CmdTouch,
"cmd_use": stats.CmdUse,
"cmd_watch": stats.CmdWatch,
"current_connections": stats.CurrentConnections,
"current_jobs_buried": stats.CurrentJobsBuried,
"current_jobs_delayed": stats.CurrentJobsDelayed,
"current_jobs_ready": stats.CurrentJobsReady,
"current_jobs_reserved": stats.CurrentJobsReserved,
"current_jobs_urgent": stats.CurrentJobsUrgent,
"current_producers": stats.CurrentProducers,
"current_tubes": stats.CurrentTubes,
"current_waiting": stats.CurrentWaiting,
"current_workers": stats.CurrentWorkers,
"job_timeouts": stats.JobTimeouts,
"max_job_size": stats.MaxJobSize,
"pid": stats.Pid,
"rusage_stime": stats.RusageStime,
"rusage_utime": stats.RusageUtime,
"total_connections": stats.TotalConnections,
"total_jobs": stats.TotalJobs,
"uptime": stats.Uptime,
},
map[string]string{
"hostname": stats.Hostname,
"id": stats.Id,
"server": b.Server,
"version": stats.Version,
},
)

return nil
}

func (b *Beanstalkd) gatherTubeStats(connection *textproto.Conn, tube string, acc telegraf.Accumulator) error {
stats := new(statsTubeResponse)
if err := runQuery(connection, "stats-tube "+tube, stats); err != nil {
return err
}

acc.AddFields("beanstalkd_tube",
map[string]interface{}{
"cmd_delete": stats.CmdDelete,
"cmd_pause_tube": stats.CmdPauseTube,
"current_jobs_buried": stats.CurrentJobsBuried,
"current_jobs_delayed": stats.CurrentJobsDelayed,
"current_jobs_ready": stats.CurrentJobsReady,
"current_jobs_reserved": stats.CurrentJobsReserved,
"current_jobs_urgent": stats.CurrentJobsUrgent,
"current_using": stats.CurrentUsing,
"current_waiting": stats.CurrentWaiting,
"current_watching": stats.CurrentWatching,
"pause": stats.Pause,
"pause_time_left": stats.PauseTimeLeft,
"total_jobs": stats.TotalJobs,
},
map[string]string{
"name": stats.Name,
"server": b.Server,
},
)

return nil
}

func runQuery(connection *textproto.Conn, cmd string, result interface{}) error {
requestId, err := connection.Cmd(cmd)
if err != nil {
return err
}

connection.StartResponse(requestId)
defer connection.EndResponse(requestId)

status, err := connection.ReadLine()
if err != nil {
return err
}

size := 0
if _, err = fmt.Sscanf(status, "OK %d", &size); err != nil {
return err
}

body := make([]byte, size+2)
if _, err = io.ReadFull(connection.R, body); err != nil {
return err
}

return yaml.Unmarshal(body, result)
}

func init() {
inputs.Add("beanstalkd", func() telegraf.Input {
return &Beanstalkd{}
})
}

type statsResponse struct {
BinlogCurrentIndex int `yaml:"binlog-current-index"`
BinlogMaxSize int `yaml:"binlog-max-size"`
BinlogOldestIndex int `yaml:"binlog-oldest-index"`
BinlogRecordsMigrated int `yaml:"binlog-records-migrated"`
BinlogRecordsWritten int `yaml:"binlog-records-written"`
CmdBury int `yaml:"cmd-bury"`
CmdDelete int `yaml:"cmd-delete"`
CmdIgnore int `yaml:"cmd-ignore"`
CmdKick int `yaml:"cmd-kick"`
CmdListTubeUsed int `yaml:"cmd-list-tube-used"`
CmdListTubes int `yaml:"cmd-list-tubes"`
CmdListTubesWatched int `yaml:"cmd-list-tubes-watched"`
CmdPauseTube int `yaml:"cmd-pause-tube"`
CmdPeek int `yaml:"cmd-peek"`
CmdPeekBuried int `yaml:"cmd-peek-buried"`
CmdPeekDelayed int `yaml:"cmd-peek-delayed"`
CmdPeekReady int `yaml:"cmd-peek-ready"`
CmdPut int `yaml:"cmd-put"`
CmdRelease int `yaml:"cmd-release"`
CmdReserve int `yaml:"cmd-reserve"`
CmdReserveWithTimeout int `yaml:"cmd-reserve-with-timeout"`
CmdStats int `yaml:"cmd-stats"`
CmdStatsJob int `yaml:"cmd-stats-job"`
CmdStatsTube int `yaml:"cmd-stats-tube"`
CmdTouch int `yaml:"cmd-touch"`
CmdUse int `yaml:"cmd-use"`
CmdWatch int `yaml:"cmd-watch"`
CurrentConnections int `yaml:"current-connections"`
CurrentJobsBuried int `yaml:"current-jobs-buried"`
CurrentJobsDelayed int `yaml:"current-jobs-delayed"`
CurrentJobsReady int `yaml:"current-jobs-ready"`
CurrentJobsReserved int `yaml:"current-jobs-reserved"`
CurrentJobsUrgent int `yaml:"current-jobs-urgent"`
CurrentProducers int `yaml:"current-producers"`
CurrentTubes int `yaml:"current-tubes"`
CurrentWaiting int `yaml:"current-waiting"`
CurrentWorkers int `yaml:"current-workers"`
Hostname string `yaml:"hostname"`
Id string `yaml:"id"`
JobTimeouts int `yaml:"job-timeouts"`
MaxJobSize int `yaml:"max-job-size"`
Pid int `yaml:"pid"`
RusageStime float64 `yaml:"rusage-stime"`
RusageUtime float64 `yaml:"rusage-utime"`
TotalConnections int `yaml:"total-connections"`
TotalJobs int `yaml:"total-jobs"`
Uptime int `yaml:"uptime"`
Version string `yaml:"version"`
}

type statsTubeResponse struct {
CmdDelete int `yaml:"cmd-delete"`
CmdPauseTube int `yaml:"cmd-pause-tube"`
CurrentJobsBuried int `yaml:"current-jobs-buried"`
CurrentJobsDelayed int `yaml:"current-jobs-delayed"`
CurrentJobsReady int `yaml:"current-jobs-ready"`
CurrentJobsReserved int `yaml:"current-jobs-reserved"`
CurrentJobsUrgent int `yaml:"current-jobs-urgent"`
CurrentUsing int `yaml:"current-using"`
CurrentWaiting int `yaml:"current-waiting"`
CurrentWatching int `yaml:"current-watching"`
Name string `yaml:"name"`
Pause int `yaml:"pause"`
PauseTimeLeft int `yaml:"pause-time-left"`
TotalJobs int `yaml:"total-jobs"`
}
Loading

0 comments on commit 69100f6

Please sign in to comment.