Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adds command intervals to exec plugin #227

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions plugins/exec/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,11 @@ setup the exec plugin with:
[[exec.commands]]
command = "/usr/bin/mycollector --output=json"
name = "mycollector"
interval = 10
```

The name is used as a prefix for the measurements.

The interval is used to determine how often a particular command should be run. Each
time the exec plugin runs, it will only run a particular command if it has been at least
`interval` seconds since the exec plugin last ran the command.
81 changes: 55 additions & 26 deletions plugins/exec/exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,10 @@ import (
"fmt"
"github.com/gonuts/go-shellquote"
"github.com/influxdb/telegraf/plugins"
"math"
"os/exec"
"sync"
"time"
)

const sampleConfig = `
Expand All @@ -18,41 +20,63 @@ const sampleConfig = `

# name of the command (used as a prefix for measurements)
name = "mycollector"
`

type Command struct {
Command string
Name string
}
# Only run this command if it has been at least this many
# seconds since it last ran
interval = 10
`

type Exec struct {
Commands []*Command
runner Runner
clock Clock
}

type Runner interface {
Run(string, ...string) ([]byte, error)
type Command struct {
Command string
Name string
Interval int
lastRunAt time.Time
}

type CommandRunner struct {
type Runner interface {
Run(*Command) ([]byte, error)
}

func NewExec() *Exec {
return &Exec{runner: CommandRunner{}}
type Clock interface {
Now() time.Time
}

func (c CommandRunner) Run(command string, args ...string) ([]byte, error) {
cmd := exec.Command(command, args...)
type CommandRunner struct{}

type RealClock struct{}

func (c CommandRunner) Run(command *Command) ([]byte, error) {
command.lastRunAt = time.Now()
split_cmd, err := shellquote.Split(command.Command)
if err != nil || len(split_cmd) == 0 {
return nil, fmt.Errorf("exec: unable to parse command, %s", err)
}

cmd := exec.Command(split_cmd[0], split_cmd[1:]...)
var out bytes.Buffer
cmd.Stdout = &out

if err := cmd.Run(); err != nil {
return nil, fmt.Errorf("exec: %s for command '%s'", err, command)
return nil, fmt.Errorf("exec: %s for command '%s'", err, command.Command)
}

return out.Bytes(), nil
}

func (c RealClock) Now() time.Time {
return time.Now()
}

func NewExec() *Exec {
return &Exec{runner: CommandRunner{}, clock: RealClock{}}
}

func (e *Exec) SampleConfig() string {
return sampleConfig
}
Expand Down Expand Up @@ -80,23 +104,28 @@ func (e *Exec) Gather(acc plugins.Accumulator) error {
}

func (e *Exec) gatherCommand(c *Command, acc plugins.Accumulator) error {
words, err := shellquote.Split(c.Command)
if err != nil || len(words) == 0 {
return fmt.Errorf("exec: unable to parse command, %s", err)
}
secondsSinceLastRun := 0.0

out, err := e.runner.Run(words[0], words[1:]...)
if err != nil {
return err
if c.lastRunAt.Unix() == 0 { // means time is uninitialized
secondsSinceLastRun = math.Inf(1)
} else {
secondsSinceLastRun = (e.clock.Now().Sub(c.lastRunAt)).Seconds()
}

var jsonOut interface{}
err = json.Unmarshal(out, &jsonOut)
if err != nil {
return fmt.Errorf("exec: unable to parse output of '%s' as JSON, %s", c.Command, err)
}
if secondsSinceLastRun >= float64(c.Interval) {
out, err := e.runner.Run(c)
if err != nil {
return err
}

processResponse(acc, c.Name, map[string]string{}, jsonOut)
var jsonOut interface{}
err = json.Unmarshal(out, &jsonOut)
if err != nil {
return fmt.Errorf("exec: unable to parse output of '%s' as JSON, %s", c.Command, err)
}

processResponse(acc, c.Name, map[string]string{}, jsonOut)
}
return nil
}

Expand Down
191 changes: 182 additions & 9 deletions plugins/exec/exec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,14 @@ import (
"github.com/influxdb/telegraf/testutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"math"
"testing"
"time"
)

// Midnight 9/22/2015
const baseTimeSeconds = 1442905200

const validJson = `
{
"status": "green",
Expand All @@ -32,24 +37,52 @@ type runnerMock struct {
err error
}

type clockMock struct {
now time.Time
}

func newRunnerMock(out []byte, err error) Runner {
return &runnerMock{out: out, err: err}
return &runnerMock{
out: out,
err: err,
}
}

func (r runnerMock) Run(command string, args ...string) ([]byte, error) {
func (r runnerMock) Run(command *Command) ([]byte, error) {
if r.err != nil {
return nil, r.err
}
return r.out, nil
}

func newClockMock(now time.Time) Clock {
return &clockMock{now: now}
}

func (c clockMock) Now() time.Time {
return c.now
}

func TestExec(t *testing.T) {
runner := newRunnerMock([]byte(validJson), nil)
command := Command{Command: "testcommand arg1", Name: "mycollector"}
e := &Exec{runner: runner, Commands: []*Command{&command}}
clock := newClockMock(time.Unix(baseTimeSeconds+20, 0))
command := Command{
Command: "testcommand arg1",
Name: "mycollector",
Interval: 10,
lastRunAt: time.Unix(baseTimeSeconds, 0),
}

e := &Exec{
runner: runner,
clock: clock,
Commands: []*Command{&command},
}

var acc testutil.Accumulator
initialPoints := len(acc.Points)
err := e.Gather(&acc)
deltaPoints := len(acc.Points) - initialPoints
require.NoError(t, err)

checkFloat := []struct {
Expand All @@ -66,24 +99,164 @@ func TestExec(t *testing.T) {
assert.True(t, acc.CheckValue(c.name, c.value))
}

assert.Equal(t, len(acc.Points), 4, "non-numeric measurements should be ignored")
assert.Equal(t, deltaPoints, 4, "non-numeric measurements should be ignored")
}

func TestExecMalformed(t *testing.T) {
runner := newRunnerMock([]byte(malformedJson), nil)
command := Command{Command: "badcommand arg1", Name: "mycollector"}
e := &Exec{runner: runner, Commands: []*Command{&command}}
clock := newClockMock(time.Unix(baseTimeSeconds+20, 0))
command := Command{
Command: "badcommand arg1",
Name: "mycollector",
Interval: 10,
lastRunAt: time.Unix(baseTimeSeconds, 0),
}

e := &Exec{
runner: runner,
clock: clock,
Commands: []*Command{&command},
}

var acc testutil.Accumulator
initialPoints := len(acc.Points)
err := e.Gather(&acc)
deltaPoints := len(acc.Points) - initialPoints
require.Error(t, err)

assert.Equal(t, deltaPoints, 0, "No new points should have been added")
}

func TestCommandError(t *testing.T) {
runner := newRunnerMock(nil, fmt.Errorf("exit status code 1"))
command := Command{Command: "badcommand", Name: "mycollector"}
e := &Exec{runner: runner, Commands: []*Command{&command}}
clock := newClockMock(time.Unix(baseTimeSeconds+20, 0))
command := Command{
Command: "badcommand",
Name: "mycollector",
Interval: 10,
lastRunAt: time.Unix(baseTimeSeconds, 0),
}

e := &Exec{
runner: runner,
clock: clock,
Commands: []*Command{&command},
}

var acc testutil.Accumulator
initialPoints := len(acc.Points)
err := e.Gather(&acc)
deltaPoints := len(acc.Points) - initialPoints
require.Error(t, err)

assert.Equal(t, deltaPoints, 0, "No new points should have been added")
}

func TestExecNotEnoughTime(t *testing.T) {
runner := newRunnerMock([]byte(validJson), nil)
clock := newClockMock(time.Unix(baseTimeSeconds+5, 0))
command := Command{
Command: "testcommand arg1",
Name: "mycollector",
Interval: 10,
lastRunAt: time.Unix(baseTimeSeconds, 0),
}

e := &Exec{
runner: runner,
clock: clock,
Commands: []*Command{&command},
}

var acc testutil.Accumulator
initialPoints := len(acc.Points)
err := e.Gather(&acc)
deltaPoints := len(acc.Points) - initialPoints
require.NoError(t, err)

assert.Equal(t, deltaPoints, 0, "No new points should have been added")
}

func TestExecUninitializedLastRunAt(t *testing.T) {
runner := newRunnerMock([]byte(validJson), nil)
clock := newClockMock(time.Unix(baseTimeSeconds, 0))
command := Command{
Command: "testcommand arg1",
Name: "mycollector",
Interval: math.MaxInt32,
// Uninitialized lastRunAt should default to time.Unix(0, 0), so this should
// run no matter what the interval is
}

e := &Exec{
runner: runner,
clock: clock,
Commands: []*Command{&command},
}

var acc testutil.Accumulator
initialPoints := len(acc.Points)
err := e.Gather(&acc)
deltaPoints := len(acc.Points) - initialPoints
require.NoError(t, err)

checkFloat := []struct {
name string
value float64
}{
{"mycollector_num_processes", 82},
{"mycollector_cpu_used", 8234},
{"mycollector_cpu_free", 32},
{"mycollector_percent", 0.81},
}

for _, c := range checkFloat {
assert.True(t, acc.CheckValue(c.name, c.value))
}

assert.Equal(t, deltaPoints, 4, "non-numeric measurements should be ignored")
}
func TestExecOneNotEnoughTimeAndOneEnoughTime(t *testing.T) {
runner := newRunnerMock([]byte(validJson), nil)
clock := newClockMock(time.Unix(baseTimeSeconds+5, 0))
notEnoughTimeCommand := Command{
Command: "testcommand arg1",
Name: "mycollector",
Interval: 10,
lastRunAt: time.Unix(baseTimeSeconds, 0),
}
enoughTimeCommand := Command{
Command: "testcommand arg1",
Name: "mycollector",
Interval: 3,
lastRunAt: time.Unix(baseTimeSeconds, 0),
}

e := &Exec{
runner: runner,
clock: clock,
Commands: []*Command{&notEnoughTimeCommand, &enoughTimeCommand},
}

var acc testutil.Accumulator
initialPoints := len(acc.Points)
err := e.Gather(&acc)
deltaPoints := len(acc.Points) - initialPoints
require.NoError(t, err)

checkFloat := []struct {
name string
value float64
}{
{"mycollector_num_processes", 82},
{"mycollector_cpu_used", 8234},
{"mycollector_cpu_free", 32},
{"mycollector_percent", 0.81},
}

for _, c := range checkFloat {
assert.True(t, acc.CheckValue(c.name, c.value))
}

assert.Equal(t, deltaPoints, 4, "Only one command should have been run")
}