From 4f1771130935d0490cfcb35990638c97537105e8 Mon Sep 17 00:00:00 2001 From: Alexander Shepelin Date: Mon, 10 Sep 2018 21:51:03 +0300 Subject: [PATCH] Add Beanstalkd input plugin (#4272) --- plugins/inputs/all/all.go | 1 + plugins/inputs/beanstalkd/README.md | 98 ++++++ plugins/inputs/beanstalkd/beanstalkd.go | 270 +++++++++++++++ plugins/inputs/beanstalkd/beanstalkd_test.go | 332 +++++++++++++++++++ 4 files changed, 701 insertions(+) create mode 100644 plugins/inputs/beanstalkd/README.md create mode 100644 plugins/inputs/beanstalkd/beanstalkd.go create mode 100644 plugins/inputs/beanstalkd/beanstalkd_test.go diff --git a/plugins/inputs/all/all.go b/plugins/inputs/all/all.go index 7ef9477e1ae35..8e101f755194e 100644 --- a/plugins/inputs/all/all.go +++ b/plugins/inputs/all/all.go @@ -7,6 +7,7 @@ import ( _ "github.com/influxdata/telegraf/plugins/inputs/apache" _ "github.com/influxdata/telegraf/plugins/inputs/aurora" _ "github.com/influxdata/telegraf/plugins/inputs/bcache" + _ "github.com/influxdata/telegraf/plugins/inputs/beanstalkd" _ "github.com/influxdata/telegraf/plugins/inputs/bond" _ "github.com/influxdata/telegraf/plugins/inputs/burrow" _ "github.com/influxdata/telegraf/plugins/inputs/cassandra" diff --git a/plugins/inputs/beanstalkd/README.md b/plugins/inputs/beanstalkd/README.md new file mode 100644 index 0000000000000..e4fe2203d8d9b --- /dev/null +++ b/plugins/inputs/beanstalkd/README.md @@ -0,0 +1,98 @@ +# Beanstalkd Input Plugin + +The `beanstalkd` plugin collects server stats as well as tube stats (reported by `stats` and `stats-tube` commands respectively). + +### Configuration: + +```toml +[[inputs.beanstalkd]] + ## Server to collect data from + server = "localhost:11300" + + ## List of tubes to gather stats about. + ## If no tubes specified then data gathered for each tube on server reported by list-tubes command + tubes = ["notifications"] +``` + +### Metrics: + +Please see the [Beanstalk Protocol doc](https://raw.githubusercontent.com/kr/beanstalkd/master/doc/protocol.txt) for detailed explanation of `stats` and `stats-tube` commands output. + +`beanstalkd_overview` – statistical information about the system as a whole +- fields + - cmd_delete + - cmd_pause_tube + - current_jobs_buried + - current_jobs_delayed + - current_jobs_ready + - current_jobs_reserved + - current_jobs_urgent + - current_using + - current_waiting + - current_watching + - pause + - pause_time_left + - total_jobs +- tags + - name + - server (address taken from config) + +`beanstalkd_tube` – statistical information about the specified tube +- fields + - binlog_current_index + - binlog_max_size + - binlog_oldest_index + - binlog_records_migrated + - binlog_records_written + - cmd_bury + - cmd_delete + - cmd_ignore + - cmd_kick + - cmd_list_tube_used + - cmd_list_tubes + - cmd_list_tubes_watched + - cmd_pause_tube + - cmd_peek + - cmd_peek_buried + - cmd_peek_delayed + - cmd_peek_ready + - cmd_put + - cmd_release + - cmd_reserve + - cmd_reserve_with_timeout + - cmd_stats + - cmd_stats_job + - cmd_stats_tube + - cmd_touch + - cmd_use + - cmd_watch + - current_connections + - current_jobs_buried + - current_jobs_delayed + - current_jobs_ready + - current_jobs_reserved + - current_jobs_urgent + - current_producers + - current_tubes + - current_waiting + - current_workers + - job_timeouts + - max_job_size + - pid + - rusage_stime + - rusage_utime + - total_connections + - total_jobs + - uptime +- tags + - hostname + - id + - server (address taken from config) + - version + +### Example Output: +``` +beanstalkd_overview,host=server.local,hostname=a2ab22ed12e0,id=232485800aa11b24,server=localhost:11300,version=1.10 cmd_stats_tube=29482i,current_jobs_delayed=0i,current_jobs_urgent=6i,cmd_kick=0i,cmd_stats=7378i,cmd_stats_job=0i,current_waiting=0i,max_job_size=65535i,pid=6i,cmd_bury=0i,cmd_reserve_with_timeout=0i,cmd_touch=0i,current_connections=1i,current_jobs_ready=6i,current_producers=0i,cmd_delete=0i,cmd_list_tubes=7369i,cmd_peek_ready=0i,cmd_put=6i,cmd_use=3i,cmd_watch=0i,current_jobs_reserved=0i,rusage_stime=6.07,cmd_list_tubes_watched=0i,cmd_pause_tube=0i,total_jobs=6i,binlog_records_migrated=0i,cmd_list_tube_used=0i,cmd_peek_delayed=0i,cmd_release=0i,current_jobs_buried=0i,job_timeouts=0i,binlog_current_index=0i,binlog_max_size=10485760i,total_connections=7378i,cmd_peek_buried=0i,cmd_reserve=0i,current_tubes=4i,binlog_records_written=0i,cmd_peek=0i,rusage_utime=1.13,uptime=7099i,binlog_oldest_index=0i,current_workers=0i,cmd_ignore=0i 1528801650000000000 + +beanstalkd_tube,host=server.local,name=notifications,server=localhost:11300 pause_time_left=0i,current_jobs_buried=0i,current_jobs_delayed=0i,current_jobs_reserved=0i,current_using=0i,current_waiting=0i,pause=0i,total_jobs=3i,cmd_delete=0i,cmd_pause_tube=0i,current_jobs_ready=3i,current_jobs_urgent=3i,current_watching=0i 1528801650000000000 +``` diff --git a/plugins/inputs/beanstalkd/beanstalkd.go b/plugins/inputs/beanstalkd/beanstalkd.go new file mode 100644 index 0000000000000..932edd301f910 --- /dev/null +++ b/plugins/inputs/beanstalkd/beanstalkd.go @@ -0,0 +1,270 @@ +package beanstalkd + +import ( + "fmt" + "io" + "net/textproto" + "sync" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/plugins/inputs" + "gopkg.in/yaml.v2" +) + +const sampleConfig = ` + ## Server to collect data from + server = "localhost:11300" + + ## List of tubes to gather stats about. + ## If no tubes specified then data gathered for each tube on server reported by list-tubes command + tubes = ["notifications"] +` + +type Beanstalkd struct { + Server string `toml:"server"` + Tubes []string `toml:"tubes"` +} + +func (b *Beanstalkd) Description() string { + return "Collects Beanstalkd server and tubes stats" +} + +func (b *Beanstalkd) SampleConfig() string { + return sampleConfig +} + +func (b *Beanstalkd) Gather(acc telegraf.Accumulator) error { + connection, err := textproto.Dial("tcp", b.Server) + if err != nil { + return err + } + defer connection.Close() + + tubes := b.Tubes + if len(tubes) == 0 { + err = runQuery(connection, "list-tubes", &tubes) + if err != nil { + acc.AddError(err) + } + } + + var wg sync.WaitGroup + + wg.Add(1) + go func() { + err := b.gatherServerStats(connection, acc) + if err != nil { + acc.AddError(err) + } + wg.Done() + }() + + for _, tube := range tubes { + wg.Add(1) + go func(tube string) { + b.gatherTubeStats(connection, tube, acc) + wg.Done() + }(tube) + } + + wg.Wait() + + return nil +} + +func (b *Beanstalkd) gatherServerStats(connection *textproto.Conn, acc telegraf.Accumulator) error { + stats := new(statsResponse) + if err := runQuery(connection, "stats", stats); err != nil { + return err + } + + acc.AddFields("beanstalkd_overview", + map[string]interface{}{ + "binlog_current_index": stats.BinlogCurrentIndex, + "binlog_max_size": stats.BinlogMaxSize, + "binlog_oldest_index": stats.BinlogOldestIndex, + "binlog_records_migrated": stats.BinlogRecordsMigrated, + "binlog_records_written": stats.BinlogRecordsWritten, + "cmd_bury": stats.CmdBury, + "cmd_delete": stats.CmdDelete, + "cmd_ignore": stats.CmdIgnore, + "cmd_kick": stats.CmdKick, + "cmd_list_tube_used": stats.CmdListTubeUsed, + "cmd_list_tubes": stats.CmdListTubes, + "cmd_list_tubes_watched": stats.CmdListTubesWatched, + "cmd_pause_tube": stats.CmdPauseTube, + "cmd_peek": stats.CmdPeek, + "cmd_peek_buried": stats.CmdPeekBuried, + "cmd_peek_delayed": stats.CmdPeekDelayed, + "cmd_peek_ready": stats.CmdPeekReady, + "cmd_put": stats.CmdPut, + "cmd_release": stats.CmdRelease, + "cmd_reserve": stats.CmdReserve, + "cmd_reserve_with_timeout": stats.CmdReserveWithTimeout, + "cmd_stats": stats.CmdStats, + "cmd_stats_job": stats.CmdStatsJob, + "cmd_stats_tube": stats.CmdStatsTube, + "cmd_touch": stats.CmdTouch, + "cmd_use": stats.CmdUse, + "cmd_watch": stats.CmdWatch, + "current_connections": stats.CurrentConnections, + "current_jobs_buried": stats.CurrentJobsBuried, + "current_jobs_delayed": stats.CurrentJobsDelayed, + "current_jobs_ready": stats.CurrentJobsReady, + "current_jobs_reserved": stats.CurrentJobsReserved, + "current_jobs_urgent": stats.CurrentJobsUrgent, + "current_producers": stats.CurrentProducers, + "current_tubes": stats.CurrentTubes, + "current_waiting": stats.CurrentWaiting, + "current_workers": stats.CurrentWorkers, + "job_timeouts": stats.JobTimeouts, + "max_job_size": stats.MaxJobSize, + "pid": stats.Pid, + "rusage_stime": stats.RusageStime, + "rusage_utime": stats.RusageUtime, + "total_connections": stats.TotalConnections, + "total_jobs": stats.TotalJobs, + "uptime": stats.Uptime, + }, + map[string]string{ + "hostname": stats.Hostname, + "id": stats.Id, + "server": b.Server, + "version": stats.Version, + }, + ) + + return nil +} + +func (b *Beanstalkd) gatherTubeStats(connection *textproto.Conn, tube string, acc telegraf.Accumulator) error { + stats := new(statsTubeResponse) + if err := runQuery(connection, "stats-tube "+tube, stats); err != nil { + return err + } + + acc.AddFields("beanstalkd_tube", + map[string]interface{}{ + "cmd_delete": stats.CmdDelete, + "cmd_pause_tube": stats.CmdPauseTube, + "current_jobs_buried": stats.CurrentJobsBuried, + "current_jobs_delayed": stats.CurrentJobsDelayed, + "current_jobs_ready": stats.CurrentJobsReady, + "current_jobs_reserved": stats.CurrentJobsReserved, + "current_jobs_urgent": stats.CurrentJobsUrgent, + "current_using": stats.CurrentUsing, + "current_waiting": stats.CurrentWaiting, + "current_watching": stats.CurrentWatching, + "pause": stats.Pause, + "pause_time_left": stats.PauseTimeLeft, + "total_jobs": stats.TotalJobs, + }, + map[string]string{ + "name": stats.Name, + "server": b.Server, + }, + ) + + return nil +} + +func runQuery(connection *textproto.Conn, cmd string, result interface{}) error { + requestId, err := connection.Cmd(cmd) + if err != nil { + return err + } + + connection.StartResponse(requestId) + defer connection.EndResponse(requestId) + + status, err := connection.ReadLine() + if err != nil { + return err + } + + size := 0 + if _, err = fmt.Sscanf(status, "OK %d", &size); err != nil { + return err + } + + body := make([]byte, size+2) + if _, err = io.ReadFull(connection.R, body); err != nil { + return err + } + + return yaml.Unmarshal(body, result) +} + +func init() { + inputs.Add("beanstalkd", func() telegraf.Input { + return &Beanstalkd{} + }) +} + +type statsResponse struct { + BinlogCurrentIndex int `yaml:"binlog-current-index"` + BinlogMaxSize int `yaml:"binlog-max-size"` + BinlogOldestIndex int `yaml:"binlog-oldest-index"` + BinlogRecordsMigrated int `yaml:"binlog-records-migrated"` + BinlogRecordsWritten int `yaml:"binlog-records-written"` + CmdBury int `yaml:"cmd-bury"` + CmdDelete int `yaml:"cmd-delete"` + CmdIgnore int `yaml:"cmd-ignore"` + CmdKick int `yaml:"cmd-kick"` + CmdListTubeUsed int `yaml:"cmd-list-tube-used"` + CmdListTubes int `yaml:"cmd-list-tubes"` + CmdListTubesWatched int `yaml:"cmd-list-tubes-watched"` + CmdPauseTube int `yaml:"cmd-pause-tube"` + CmdPeek int `yaml:"cmd-peek"` + CmdPeekBuried int `yaml:"cmd-peek-buried"` + CmdPeekDelayed int `yaml:"cmd-peek-delayed"` + CmdPeekReady int `yaml:"cmd-peek-ready"` + CmdPut int `yaml:"cmd-put"` + CmdRelease int `yaml:"cmd-release"` + CmdReserve int `yaml:"cmd-reserve"` + CmdReserveWithTimeout int `yaml:"cmd-reserve-with-timeout"` + CmdStats int `yaml:"cmd-stats"` + CmdStatsJob int `yaml:"cmd-stats-job"` + CmdStatsTube int `yaml:"cmd-stats-tube"` + CmdTouch int `yaml:"cmd-touch"` + CmdUse int `yaml:"cmd-use"` + CmdWatch int `yaml:"cmd-watch"` + CurrentConnections int `yaml:"current-connections"` + CurrentJobsBuried int `yaml:"current-jobs-buried"` + CurrentJobsDelayed int `yaml:"current-jobs-delayed"` + CurrentJobsReady int `yaml:"current-jobs-ready"` + CurrentJobsReserved int `yaml:"current-jobs-reserved"` + CurrentJobsUrgent int `yaml:"current-jobs-urgent"` + CurrentProducers int `yaml:"current-producers"` + CurrentTubes int `yaml:"current-tubes"` + CurrentWaiting int `yaml:"current-waiting"` + CurrentWorkers int `yaml:"current-workers"` + Hostname string `yaml:"hostname"` + Id string `yaml:"id"` + JobTimeouts int `yaml:"job-timeouts"` + MaxJobSize int `yaml:"max-job-size"` + Pid int `yaml:"pid"` + RusageStime float64 `yaml:"rusage-stime"` + RusageUtime float64 `yaml:"rusage-utime"` + TotalConnections int `yaml:"total-connections"` + TotalJobs int `yaml:"total-jobs"` + Uptime int `yaml:"uptime"` + Version string `yaml:"version"` +} + +type statsTubeResponse struct { + CmdDelete int `yaml:"cmd-delete"` + CmdPauseTube int `yaml:"cmd-pause-tube"` + CurrentJobsBuried int `yaml:"current-jobs-buried"` + CurrentJobsDelayed int `yaml:"current-jobs-delayed"` + CurrentJobsReady int `yaml:"current-jobs-ready"` + CurrentJobsReserved int `yaml:"current-jobs-reserved"` + CurrentJobsUrgent int `yaml:"current-jobs-urgent"` + CurrentUsing int `yaml:"current-using"` + CurrentWaiting int `yaml:"current-waiting"` + CurrentWatching int `yaml:"current-watching"` + Name string `yaml:"name"` + Pause int `yaml:"pause"` + PauseTimeLeft int `yaml:"pause-time-left"` + TotalJobs int `yaml:"total-jobs"` +} diff --git a/plugins/inputs/beanstalkd/beanstalkd_test.go b/plugins/inputs/beanstalkd/beanstalkd_test.go new file mode 100644 index 0000000000000..92c108e06aa91 --- /dev/null +++ b/plugins/inputs/beanstalkd/beanstalkd_test.go @@ -0,0 +1,332 @@ +package beanstalkd_test + +import ( + "io" + "net" + "net/textproto" + "testing" + + "github.com/influxdata/telegraf/plugins/inputs/beanstalkd" + "github.com/influxdata/telegraf/testutil" + "github.com/stretchr/testify/require" +) + +func TestBeanstalkd(t *testing.T) { + type tubeStats struct { + name string + fields map[string]interface{} + } + + tests := []struct { + name string + tubesConfig []string + expectedTubes []tubeStats + notExpectedTubes []tubeStats + }{ + { + name: "All tubes stats", + tubesConfig: []string{}, + expectedTubes: []tubeStats{ + {name: "default", fields: defaultTubeFields}, + {name: "test", fields: testTubeFields}, + }, + notExpectedTubes: []tubeStats{}, + }, + { + name: "Specified tubes stats", + tubesConfig: []string{"test"}, + expectedTubes: []tubeStats{ + {name: "test", fields: testTubeFields}, + }, + notExpectedTubes: []tubeStats{ + {name: "default", fields: defaultTubeFields}, + }, + }, + { + name: "Unknown tube stats", + tubesConfig: []string{"unknown"}, + expectedTubes: []tubeStats{}, + notExpectedTubes: []tubeStats{ + {name: "default", fields: defaultTubeFields}, + {name: "test", fields: testTubeFields}, + }, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + server, err := startTestServer(t) + if err != nil { + t.Fatalf("Unable to create test server") + } + defer server.Close() + + serverAddress := server.Addr().String() + plugin := beanstalkd.Beanstalkd{ + Server: serverAddress, + Tubes: test.tubesConfig, + } + + var acc testutil.Accumulator + require.NoError(t, acc.GatherError(plugin.Gather)) + + acc.AssertContainsTaggedFields(t, "beanstalkd_overview", + overviewFields, + getOverviewTags(serverAddress), + ) + + for _, expectedTube := range test.expectedTubes { + acc.AssertContainsTaggedFields(t, "beanstalkd_tube", + expectedTube.fields, + getTubeTags(serverAddress, expectedTube.name), + ) + } + + for _, notExpectedTube := range test.notExpectedTubes { + acc.AssertDoesNotContainsTaggedFields(t, "beanstalkd_tube", + notExpectedTube.fields, + getTubeTags(serverAddress, notExpectedTube.name), + ) + } + }) + } +} + +func startTestServer(t *testing.T) (net.Listener, error) { + server, err := net.Listen("tcp", "localhost:0") + if err != nil { + return nil, err + } + + go func() { + defer server.Close() + + connection, err := server.Accept() + if err != nil { + t.Log("Test server: failed to accept connection. Error: ", err) + return + } + + tp := textproto.NewConn(connection) + defer tp.Close() + + sendSuccessResponse := func(body string) { + tp.PrintfLine("OK %d\r\n%s", len(body), body) + } + + for { + cmd, err := tp.ReadLine() + if err == io.EOF { + return + } else if err != nil { + t.Log("Test server: failed read command. Error: ", err) + return + } + + switch cmd { + case "list-tubes": + sendSuccessResponse(listTubesResponse) + case "stats": + sendSuccessResponse(statsResponse) + case "stats-tube default": + sendSuccessResponse(statsTubeDefaultResponse) + case "stats-tube test": + sendSuccessResponse(statsTubeTestResponse) + case "stats-tube unknown": + tp.PrintfLine("NOT_FOUND") + default: + t.Log("Test server: unknown command") + } + } + }() + + return server, nil +} + +const ( + listTubesResponse = `--- +- default +- test +` + statsResponse = `--- +current-jobs-urgent: 5 +current-jobs-ready: 5 +current-jobs-reserved: 0 +current-jobs-delayed: 1 +current-jobs-buried: 0 +cmd-put: 6 +cmd-peek: 0 +cmd-peek-ready: 1 +cmd-peek-delayed: 0 +cmd-peek-buried: 0 +cmd-reserve: 0 +cmd-reserve-with-timeout: 1 +cmd-delete: 1 +cmd-release: 0 +cmd-use: 2 +cmd-watch: 0 +cmd-ignore: 0 +cmd-bury: 1 +cmd-kick: 1 +cmd-touch: 0 +cmd-stats: 1 +cmd-stats-job: 0 +cmd-stats-tube: 2 +cmd-list-tubes: 1 +cmd-list-tube-used: 0 +cmd-list-tubes-watched: 0 +cmd-pause-tube: 0 +job-timeouts: 0 +total-jobs: 6 +max-job-size: 65535 +current-tubes: 2 +current-connections: 2 +current-producers: 1 +current-workers: 1 +current-waiting: 0 +total-connections: 2 +pid: 6 +version: 1.10 +rusage-utime: 0.000000 +rusage-stime: 0.000000 +uptime: 20 +binlog-oldest-index: 0 +binlog-current-index: 0 +binlog-records-migrated: 0 +binlog-records-written: 0 +binlog-max-size: 10485760 +id: bba7546657efdd4c +hostname: 2873efd3e88c +` + statsTubeDefaultResponse = `--- +name: default +current-jobs-urgent: 0 +current-jobs-ready: 0 +current-jobs-reserved: 0 +current-jobs-delayed: 0 +current-jobs-buried: 0 +total-jobs: 0 +current-using: 2 +current-watching: 2 +current-waiting: 0 +cmd-delete: 0 +cmd-pause-tube: 0 +pause: 0 +pause-time-left: 0 +` + statsTubeTestResponse = `--- +name: test +current-jobs-urgent: 5 +current-jobs-ready: 5 +current-jobs-reserved: 0 +current-jobs-delayed: 1 +current-jobs-buried: 0 +total-jobs: 6 +current-using: 0 +current-watching: 0 +current-waiting: 0 +cmd-delete: 0 +cmd-pause-tube: 0 +pause: 0 +pause-time-left: 0 +` +) + +var ( + // Default tube without stats + defaultTubeFields = map[string]interface{}{ + "cmd_delete": 0, + "cmd_pause_tube": 0, + "current_jobs_buried": 0, + "current_jobs_delayed": 0, + "current_jobs_ready": 0, + "current_jobs_reserved": 0, + "current_jobs_urgent": 0, + "current_using": 2, + "current_waiting": 0, + "current_watching": 2, + "pause": 0, + "pause_time_left": 0, + "total_jobs": 0, + } + // Test tube with stats + testTubeFields = map[string]interface{}{ + "cmd_delete": 0, + "cmd_pause_tube": 0, + "current_jobs_buried": 0, + "current_jobs_delayed": 1, + "current_jobs_ready": 5, + "current_jobs_reserved": 0, + "current_jobs_urgent": 5, + "current_using": 0, + "current_waiting": 0, + "current_watching": 0, + "pause": 0, + "pause_time_left": 0, + "total_jobs": 6, + } + // Server stats + overviewFields = map[string]interface{}{ + "binlog_current_index": 0, + "binlog_max_size": 10485760, + "binlog_oldest_index": 0, + "binlog_records_migrated": 0, + "binlog_records_written": 0, + "cmd_bury": 1, + "cmd_delete": 1, + "cmd_ignore": 0, + "cmd_kick": 1, + "cmd_list_tube_used": 0, + "cmd_list_tubes": 1, + "cmd_list_tubes_watched": 0, + "cmd_pause_tube": 0, + "cmd_peek": 0, + "cmd_peek_buried": 0, + "cmd_peek_delayed": 0, + "cmd_peek_ready": 1, + "cmd_put": 6, + "cmd_release": 0, + "cmd_reserve": 0, + "cmd_reserve_with_timeout": 1, + "cmd_stats": 1, + "cmd_stats_job": 0, + "cmd_stats_tube": 2, + "cmd_touch": 0, + "cmd_use": 2, + "cmd_watch": 0, + "current_connections": 2, + "current_jobs_buried": 0, + "current_jobs_delayed": 1, + "current_jobs_ready": 5, + "current_jobs_reserved": 0, + "current_jobs_urgent": 5, + "current_producers": 1, + "current_tubes": 2, + "current_waiting": 0, + "current_workers": 1, + "job_timeouts": 0, + "max_job_size": 65535, + "pid": 6, + "rusage_stime": 0.0, + "rusage_utime": 0.0, + "total_connections": 2, + "total_jobs": 6, + "uptime": 20, + } +) + +func getOverviewTags(server string) map[string]string { + return map[string]string{ + "hostname": "2873efd3e88c", + "id": "bba7546657efdd4c", + "server": server, + "version": "1.10", + } +} + +func getTubeTags(server string, tube string) map[string]string { + return map[string]string{ + "name": tube, + "server": server, + } +}