Skip to content

Commit

Permalink
Add support to parse JSON array.
Browse files Browse the repository at this point in the history
  • Loading branch information
johnrengelman committed Oct 27, 2016
1 parent 522658b commit 269d731
Show file tree
Hide file tree
Showing 5 changed files with 244 additions and 9 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@ documentation for configuring journald. There is also a [`logfile` config option
available in 1.1, which will allow users to easily configure telegraf to
continue sending logs to /var/log/telegraf/telegraf.log.

- The JSON parser can now parse JSON data where the root object is an array.
The parsing configuration is applied to each element of the array.

### Features

- [#1726](https://github.com/influxdata/telegraf/issues/1726): Processor & Aggregator plugin support.
Expand Down
65 changes: 65 additions & 0 deletions docs/DATA_FORMATS_INPUT.md
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,10 @@ For example, if you had this configuration:

## measurement name suffix (for separating different commands)
name_suffix = "_mycollector"

## specifies if the incoming JSON data is an array of metric data (true)
## to parse or a single object (false)
array = false

## Data format to consume.
## Each data format has it's own unique set of configuration options, read
Expand Down Expand Up @@ -147,6 +151,67 @@ Your Telegraf metrics would get tagged with "my_tag_1"
exec_mycollector,my_tag_1=foo a=5,b_c=6
```

If the JSON data is an array, then setting the `array` to `true` configures
Telegraf to parse each element of the array with the configured settings.
Each resulting metric will be output with the same timestamp.

For example, if you had this configuration:

```toml
[[inputs.exec]]
## Commands array
commands = ["/tmp/test.sh", "/usr/bin/mycollector --foo=bar"]

## measurement name suffix (for separating different commands)
name_suffix = "_mycollector"

## specifies if the incoming JSON data is an array of metric data (true)
## to parse or a single object (false)
array = true

## Data format to consume.
## Each data format has it's own unique set of configuration options, read
## more about them here:
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
data_format = "json"

## List of tag names to extract from top-level of JSON server response
tag_keys = [
"my_tag_1",
"my_tag_2"
]
```

with this JSON output from a command:

```json
[
{
"a": 5,
"b": {
"c": 6
},
"my_tag_1": "foo",
"my_tag_2": "baz"
},
{
"a": 7,
"b": {
"c": 8
},
"my_tag_1": "bar",
"my_tag_2": "baz"
}
]
```

Your Telegraf metrics would get tagged with "my_tag_1" and "my_tag_2"

```
exec_mycollector,my_tag_1=foo,my_tag_2=baz a=5,b_c=6
exec_mycollector,my_tag_1=bar,my_tag_2=baz a=7,b_c=8
```

# Value:

The "value" data format translates single values into Telegraf metrics. This
Expand Down
31 changes: 27 additions & 4 deletions plugins/parsers/json/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,25 @@ type JSONParser struct {
MetricName string
TagKeys []string
DefaultTags map[string]string
Array bool
}

func (p *JSONParser) Parse(buf []byte) ([]telegraf.Metric, error) {
func (p *JSONParser) parseArray(buf []byte) ([]telegraf.Metric, error) {
metrics := make([]telegraf.Metric, 0)

var jsonOut map[string]interface{}
var jsonOut []map[string]interface{}
err := json.Unmarshal(buf, &jsonOut)
if err != nil {
err = fmt.Errorf("unable to parse out as JSON, %s", err)
err = fmt.Errorf("unable to parse out as JSON Array, %s", err)
return nil, err
}
for _, item := range jsonOut {
metrics, err = p.parseObject(metrics, item)
}
return metrics, nil
}

func (p *JSONParser) parseObject(metrics []telegraf.Metric, jsonOut map[string]interface{}) ([]telegraf.Metric, error) {

tags := make(map[string]string)
for k, v := range p.DefaultTags {
Expand All @@ -44,7 +52,7 @@ func (p *JSONParser) Parse(buf []byte) ([]telegraf.Metric, error) {
}

f := JSONFlattener{}
err = f.FlattenJSON("", jsonOut)
err := f.FlattenJSON("", jsonOut)
if err != nil {
return nil, err
}
Expand All @@ -57,6 +65,21 @@ func (p *JSONParser) Parse(buf []byte) ([]telegraf.Metric, error) {
return append(metrics, metric), nil
}

func (p *JSONParser) Parse(buf []byte) ([]telegraf.Metric, error) {

if !p.Array {
metrics := make([]telegraf.Metric, 0)
var jsonOut map[string]interface{}
err := json.Unmarshal(buf, &jsonOut)
if err != nil {
err = fmt.Errorf("unable to parse out as JSON, %s", err)
return nil, err
}
return p.parseObject(metrics, jsonOut)
}
return p.parseArray(buf)
}

func (p *JSONParser) ParseLine(line string) (telegraf.Metric, error) {
metrics, err := p.Parse([]byte(line + "\n"))

Expand Down
148 changes: 144 additions & 4 deletions plugins/parsers/json/parser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,12 @@ import (
)

const (
validJSON = "{\"a\": 5, \"b\": {\"c\": 6}}"
validJSONNewline = "\n{\"d\": 7, \"b\": {\"d\": 8}}\n"
invalidJSON = "I don't think this is JSON"
invalidJSON2 = "{\"a\": 5, \"b\": \"c\": 6}}"
validJSON = "{\"a\": 5, \"b\": {\"c\": 6}}"
validJSONNewline = "\n{\"d\": 7, \"b\": {\"d\": 8}}\n"
validJSONArray = "[{\"a\": 5, \"b\": {\"c\": 6}}]"
validJSONArrayMultiple = "[{\"a\": 5, \"b\": {\"c\": 6}}, {\"a\": 7, \"b\": {\"c\": 8}}]"
invalidJSON = "I don't think this is JSON"
invalidJSON2 = "{\"a\": 5, \"b\": \"c\": 6}}"
)

const validJSONTags = `
Expand All @@ -24,6 +26,27 @@ const validJSONTags = `
}
`

const validJSONArrayTags = `
[
{
"a": 5,
"b": {
"c": 6
},
"mytag": "foo",
"othertag": "baz"
},
{
"a": 7,
"b": {
"c": 8
},
"mytag": "bar",
"othertag": "baz"
}
]
`

func TestParseValidJSON(t *testing.T) {
parser := JSONParser{
MetricName: "json_test",
Expand Down Expand Up @@ -282,3 +305,120 @@ func TestParseValidJSONDefaultTagsOverride(t *testing.T) {
"mytag": "foobar",
}, metrics[0].Tags())
}

// Test that json arrays can be parsed
func TestParseValidJSONArray(t *testing.T) {
parser := JSONParser{
MetricName: "json_array_test",
Array: true,
}

// Most basic vanilla test
metrics, err := parser.Parse([]byte(validJSONArray))
assert.NoError(t, err)
assert.Len(t, metrics, 1)
assert.Equal(t, "json_array_test", metrics[0].Name())
assert.Equal(t, map[string]interface{}{
"a": float64(5),
"b_c": float64(6),
}, metrics[0].Fields())
assert.Equal(t, map[string]string{}, metrics[0].Tags())

// Basic multiple datapoints
metrics, err = parser.Parse([]byte(validJSONArrayMultiple))
assert.NoError(t, err)
assert.Len(t, metrics, 2)
assert.Equal(t, "json_array_test", metrics[0].Name())
assert.Equal(t, map[string]interface{}{
"a": float64(5),
"b_c": float64(6),
}, metrics[0].Fields())
assert.Equal(t, map[string]string{}, metrics[1].Tags())
assert.Equal(t, "json_array_test", metrics[1].Name())
assert.Equal(t, map[string]interface{}{
"a": float64(7),
"b_c": float64(8),
}, metrics[1].Fields())
assert.Equal(t, map[string]string{}, metrics[1].Tags())
}

func TestParseArrayWithTagKeys(t *testing.T) {
// Test that strings not matching tag keys are ignored
parser := JSONParser{
MetricName: "json_array_test",
Array: true,
TagKeys: []string{"wrongtagkey"},
}
metrics, err := parser.Parse([]byte(validJSONArrayTags))
assert.NoError(t, err)
assert.Len(t, metrics, 2)
assert.Equal(t, "json_array_test", metrics[0].Name())
assert.Equal(t, map[string]interface{}{
"a": float64(5),
"b_c": float64(6),
}, metrics[0].Fields())
assert.Equal(t, map[string]string{}, metrics[0].Tags())

assert.Equal(t, "json_array_test", metrics[1].Name())
assert.Equal(t, map[string]interface{}{
"a": float64(7),
"b_c": float64(8),
}, metrics[1].Fields())
assert.Equal(t, map[string]string{}, metrics[1].Tags())

// Test that single tag key is found and applied
parser = JSONParser{
MetricName: "json_array_test",
Array: true,
TagKeys: []string{"mytag"},
}
metrics, err = parser.Parse([]byte(validJSONArrayTags))
assert.NoError(t, err)
assert.Len(t, metrics, 2)
assert.Equal(t, "json_array_test", metrics[0].Name())
assert.Equal(t, map[string]interface{}{
"a": float64(5),
"b_c": float64(6),
}, metrics[0].Fields())
assert.Equal(t, map[string]string{
"mytag": "foo",
}, metrics[0].Tags())

assert.Equal(t, "json_array_test", metrics[1].Name())
assert.Equal(t, map[string]interface{}{
"a": float64(7),
"b_c": float64(8),
}, metrics[1].Fields())
assert.Equal(t, map[string]string{
"mytag": "bar",
}, metrics[1].Tags())

// Test that both tag keys are found and applied
parser = JSONParser{
MetricName: "json_array_test",
Array: true,
TagKeys: []string{"mytag", "othertag"},
}
metrics, err = parser.Parse([]byte(validJSONArrayTags))
assert.NoError(t, err)
assert.Len(t, metrics, 2)
assert.Equal(t, "json_array_test", metrics[0].Name())
assert.Equal(t, map[string]interface{}{
"a": float64(5),
"b_c": float64(6),
}, metrics[0].Fields())
assert.Equal(t, map[string]string{
"mytag": "foo",
"othertag": "baz",
}, metrics[0].Tags())

assert.Equal(t, "json_array_test", metrics[1].Name())
assert.Equal(t, map[string]interface{}{
"a": float64(7),
"b_c": float64(8),
}, metrics[1].Fields())
assert.Equal(t, map[string]string{
"mytag": "bar",
"othertag": "baz",
}, metrics[1].Tags())
}
6 changes: 5 additions & 1 deletion plugins/parsers/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ type Config struct {

// TagKeys only apply to JSON data
TagKeys []string
// Array only applies to JSON data
Array bool
// MetricName applies to JSON & value. This will be the name of the measurement.
MetricName string

Expand All @@ -67,7 +69,7 @@ func NewParser(config *Config) (Parser, error) {
switch config.DataFormat {
case "json":
parser, err = NewJSONParser(config.MetricName,
config.TagKeys, config.DefaultTags)
config.TagKeys, config.DefaultTags, config.Array)
case "value":
parser, err = NewValueParser(config.MetricName,
config.DataType, config.DefaultTags)
Expand All @@ -88,10 +90,12 @@ func NewJSONParser(
metricName string,
tagKeys []string,
defaultTags map[string]string,
array bool,
) (Parser, error) {
parser := &json.JSONParser{
MetricName: metricName,
TagKeys: tagKeys,
Array: array,
DefaultTags: defaultTags,
}
return parser, nil
Expand Down

0 comments on commit 269d731

Please sign in to comment.