From b97027ac9a87a7c4db5d7341b5c8554aa2a47e93 Mon Sep 17 00:00:00 2001 From: Cameron Sparr Date: Fri, 29 Jan 2016 11:30:35 -0700 Subject: [PATCH] Allow exec plugin to parse line-protocol closes #613 --- CHANGELOG.md | 1 + plugins/inputs/exec/README.md | 73 ++++++++++++++++++++++++-------- plugins/inputs/exec/exec.go | 47 +++++++++++++------- plugins/inputs/exec/exec_test.go | 73 ++++++++++++++++++++++++++++++++ 4 files changed, 160 insertions(+), 34 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index ceea98ed37676..2e024efea9838 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,7 @@ - [#603](https://github.com/influxdata/telegraf/pull/603): Aggregate statsd timing measurements into fields. Thanks @marcinbunsch! - [#601](https://github.com/influxdata/telegraf/issues/601): Warn when overwriting cached metrics. - [#614](https://github.com/influxdata/telegraf/pull/614): PowerDNS input plugin. Thanks @Kasen! +- [#617](https://github.com/influxdata/telegraf/pull/617): exec plugin: parse influx line protocol in addition to JSON. ### Bugfixes - [#595](https://github.com/influxdata/telegraf/issues/595): graphite output should include tags to separate duplicate measurements. diff --git a/plugins/inputs/exec/README.md b/plugins/inputs/exec/README.md index bd78f0b3c414b..1172140c7a531 100644 --- a/plugins/inputs/exec/README.md +++ b/plugins/inputs/exec/README.md @@ -1,28 +1,39 @@ -# Exec Plugin +# Exec Input Plugin -The exec plugin can execute arbitrary commands which output JSON. Then it flattens JSON and finds -all numeric values, treating them as floats. +The exec plugin can execute arbitrary commands which output JSON or +InfluxDB [line-protocol](https://docs.influxdata.com/influxdb/v0.9/write_protocols/line/). -For example, if you have a json-returning command called mycollector, you could -setup the exec plugin with: +If using JSON, only numeric values are parsed and turned into floats. Booleans +and strings will be ignored. + +### Configuration ``` +# Read flattened metrics from one or more commands that output JSON to stdout [[inputs.exec]] - command = "/usr/bin/mycollector --output=json" + # the command to run + command = "/usr/bin/mycollector --foo=bar" + + # Data format to consume. This can be "json" or "influx" (line-protocol) + # NOTE json only reads numerical measurements, strings and booleans are ignored. + data_format = "json" + + # measurement name suffix (for separating different commands) name_suffix = "_mycollector" - interval = "10s" ``` -The name suffix is appended to exec as "exec_name_suffix" to identify the input stream. +Other options for modifying the measurement names are: -The interval is used to determine how often a particular command should be run. Each -time the exec plugin runs, it will only run a particular command if it has been at least -`interval` seconds since the exec plugin last ran the command. +``` +name_override = "measurement_name" +name_prefix = "prefix_" +``` +### Example 1 -# Sample +Let's say that we have the above configuration, and mycollector outputs the +following JSON: -Let's say that we have a command with the name_suffix "_mycollector", which gives the following output: ```json { "a": 0.5, @@ -33,13 +44,39 @@ Let's say that we have a command with the name_suffix "_mycollector", which give } ``` -The collected metrics will be stored as field values under the same measurement "exec_mycollector": +The collected metrics will be stored as fields under the measurement +"exec_mycollector": + ``` - exec_mycollector a=0.5,b_c=0.1,b_d=5 1452815002357578567 +exec_mycollector a=0.5,b_c=0.1,b_d=5 1452815002357578567 ``` -Other options for modifying the measurement names are: +### Example 2 + +Now let's say we have the following configuration: + ``` -name_override = "newname" -name_prefix = "prefix_" +[[inputs.exec]] + # the command to run + command = "/usr/bin/line_protocol_collector" + + # Data format to consume. This can be "json" or "influx" (line-protocol) + # NOTE json only reads numerical measurements, strings and booleans are ignored. + data_format = "influx" ``` + +And line_protocol_collector outputs the following line protocol: + +``` +cpu,cpu=cpu0,host=foo,datacenter=us-east usage_idle=99,usage_busy=1 +cpu,cpu=cpu1,host=foo,datacenter=us-east usage_idle=99,usage_busy=1 +cpu,cpu=cpu2,host=foo,datacenter=us-east usage_idle=99,usage_busy=1 +cpu,cpu=cpu3,host=foo,datacenter=us-east usage_idle=99,usage_busy=1 +cpu,cpu=cpu4,host=foo,datacenter=us-east usage_idle=99,usage_busy=1 +cpu,cpu=cpu5,host=foo,datacenter=us-east usage_idle=99,usage_busy=1 +cpu,cpu=cpu6,host=foo,datacenter=us-east usage_idle=99,usage_busy=1 +``` + +You will get data in InfluxDB exactly as it is defined above, +tags are cpu=cpuN, host=foo, and datacenter=us-east with fields usage_idle +and usage_busy. They will receive a timestamp at collection time. diff --git a/plugins/inputs/exec/exec.go b/plugins/inputs/exec/exec.go index 65be6bfaf4745..c4bb634bafbde 100644 --- a/plugins/inputs/exec/exec.go +++ b/plugins/inputs/exec/exec.go @@ -5,6 +5,7 @@ import ( "encoding/json" "fmt" "os/exec" + "time" "github.com/gonuts/go-shellquote" @@ -14,18 +15,20 @@ import ( ) const sampleConfig = ` - # NOTE This plugin only reads numerical measurements, strings and booleans - # will be ignored. - # the command to run command = "/usr/bin/mycollector --foo=bar" + # Data format to consume. This can be "json" or "influx" (line-protocol) + # NOTE json only reads numerical measurements, strings and booleans are ignored. + data_format = "json" + # measurement name suffix (for separating different commands) name_suffix = "_mycollector" ` type Exec struct { - Command string + Command string + DataFormat string runner Runner } @@ -71,20 +74,32 @@ func (e *Exec) Gather(acc telegraf.Accumulator) error { return err } - var jsonOut interface{} - err = json.Unmarshal(out, &jsonOut) - if err != nil { - return fmt.Errorf("exec: unable to parse output of '%s' as JSON, %s", - e.Command, err) - } - - f := internal.JSONFlattener{} - err = f.FlattenJSON("", jsonOut) - if err != nil { + switch e.DataFormat { + case "", "json": + var jsonOut interface{} + err = json.Unmarshal(out, &jsonOut) + if err != nil { + return fmt.Errorf("exec: unable to parse output of '%s' as JSON, %s", + e.Command, err) + } + + f := internal.JSONFlattener{} + err = f.FlattenJSON("", jsonOut) + if err != nil { + return err + } + acc.AddFields("exec", f.Fields, nil) + case "influx": + now := time.Now() + metrics, err := telegraf.ParseMetrics(out) + for _, metric := range metrics { + acc.AddFields(metric.Name(), metric.Fields(), metric.Tags(), now) + } return err + default: + return fmt.Errorf("Unsupported data format: %s. Must be either json "+ + "or influx.", e.DataFormat) } - - acc.AddFields("exec", f.Fields, nil) return nil } diff --git a/plugins/inputs/exec/exec_test.go b/plugins/inputs/exec/exec_test.go index 8bf47c1d0fe02..709308fce843e 100644 --- a/plugins/inputs/exec/exec_test.go +++ b/plugins/inputs/exec/exec_test.go @@ -31,6 +31,18 @@ const malformedJson = ` "status": "green", ` +const lineProtocol = "cpu,host=foo,datacenter=us-east usage_idle=99,usage_busy=1" + +const lineProtocolMulti = ` +cpu,cpu=cpu0,host=foo,datacenter=us-east usage_idle=99,usage_busy=1 +cpu,cpu=cpu1,host=foo,datacenter=us-east usage_idle=99,usage_busy=1 +cpu,cpu=cpu2,host=foo,datacenter=us-east usage_idle=99,usage_busy=1 +cpu,cpu=cpu3,host=foo,datacenter=us-east usage_idle=99,usage_busy=1 +cpu,cpu=cpu4,host=foo,datacenter=us-east usage_idle=99,usage_busy=1 +cpu,cpu=cpu5,host=foo,datacenter=us-east usage_idle=99,usage_busy=1 +cpu,cpu=cpu6,host=foo,datacenter=us-east usage_idle=99,usage_busy=1 +` + type runnerMock struct { out []byte err error @@ -97,3 +109,64 @@ func TestCommandError(t *testing.T) { require.Error(t, err) assert.Equal(t, acc.NFields(), 0, "No new points should have been added") } + +func TestLineProtocolParse(t *testing.T) { + e := &Exec{ + runner: newRunnerMock([]byte(lineProtocol), nil), + Command: "line-protocol", + DataFormat: "influx", + } + + var acc testutil.Accumulator + err := e.Gather(&acc) + require.NoError(t, err) + + fields := map[string]interface{}{ + "usage_idle": float64(99), + "usage_busy": float64(1), + } + tags := map[string]string{ + "host": "foo", + "datacenter": "us-east", + } + acc.AssertContainsTaggedFields(t, "cpu", fields, tags) +} + +func TestLineProtocolParseMultiple(t *testing.T) { + e := &Exec{ + runner: newRunnerMock([]byte(lineProtocolMulti), nil), + Command: "line-protocol", + DataFormat: "influx", + } + + var acc testutil.Accumulator + err := e.Gather(&acc) + require.NoError(t, err) + + fields := map[string]interface{}{ + "usage_idle": float64(99), + "usage_busy": float64(1), + } + tags := map[string]string{ + "host": "foo", + "datacenter": "us-east", + } + cpuTags := []string{"cpu0", "cpu1", "cpu2", "cpu3", "cpu4", "cpu5", "cpu6"} + + for _, cpu := range cpuTags { + tags["cpu"] = cpu + acc.AssertContainsTaggedFields(t, "cpu", fields, tags) + } +} + +func TestInvalidDataFormat(t *testing.T) { + e := &Exec{ + runner: newRunnerMock([]byte(lineProtocol), nil), + Command: "bad data format", + DataFormat: "FooBar", + } + + var acc testutil.Accumulator + err := e.Gather(&acc) + require.Error(t, err) +}