From 269d731a128d6f354b7fdbd043b0ef63bde1abd1 Mon Sep 17 00:00:00 2001 From: John Engelman Date: Thu, 27 Oct 2016 13:21:33 -0500 Subject: [PATCH] Add support to parse JSON array. --- CHANGELOG.md | 3 + docs/DATA_FORMATS_INPUT.md | 65 ++++++++++++ plugins/parsers/json/parser.go | 31 +++++- plugins/parsers/json/parser_test.go | 148 +++++++++++++++++++++++++++- plugins/parsers/registry.go | 6 +- 5 files changed, 244 insertions(+), 9 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index dfa79ebd8ca90..5ec9e62e8afe4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,9 @@ documentation for configuring journald. There is also a [`logfile` config option available in 1.1, which will allow users to easily configure telegraf to continue sending logs to /var/log/telegraf/telegraf.log. +- The JSON parser can now parse JSON data where the root element is an array. +The parsing configuration is applied to each element of the array. + ### Features - [#1726](https://github.com/influxdata/telegraf/issues/1726): Processor & Aggregator plugin support. diff --git a/docs/DATA_FORMATS_INPUT.md b/docs/DATA_FORMATS_INPUT.md index c14752d9cdf81..a7db9c038480d 100644 --- a/docs/DATA_FORMATS_INPUT.md +++ b/docs/DATA_FORMATS_INPUT.md @@ -115,6 +115,10 @@ For example, if you had this configuration: ## measurement name suffix (for separating different commands) name_suffix = "_mycollector" + + ## specifies if the incoming JSON data is an array of metric data (true) + ## to parse or a single object (false) + array = false ## Data format to consume. ## Each data format has it's own unique set of configuration options, read @@ -147,6 +151,67 @@ Your Telegraf metrics would get tagged with "my_tag_1" exec_mycollector,my_tag_1=foo a=5,b_c=6 ``` +If the JSON data is an array, then setting the `array` option to `true` configures +Telegraf to parse each element of the array with the configured settings. +Each resulting metric will be output with the same timestamp. 
+ +For example, if you had this configuration: + +```toml +[[inputs.exec]] + ## Commands array + commands = ["/tmp/test.sh", "/usr/bin/mycollector --foo=bar"] + + ## measurement name suffix (for separating different commands) + name_suffix = "_mycollector" + + ## specifies if the incoming JSON data is an array of metric data (true) + ## to parse or a single object (false) + array = true + + ## Data format to consume. + ## Each data format has its own unique set of configuration options, read + ## more about them here: + ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md + data_format = "json" + + ## List of tag names to extract from top-level of JSON server response + tag_keys = [ + "my_tag_1", + "my_tag_2" + ] +``` + +with this JSON output from a command: + +```json +[ + { + "a": 5, + "b": { + "c": 6 + }, + "my_tag_1": "foo", + "my_tag_2": "baz" + }, + { + "a": 7, + "b": { + "c": 8 + }, + "my_tag_1": "bar", + "my_tag_2": "baz" + } +] +``` + +Your Telegraf metrics would get tagged with "my_tag_1" and "my_tag_2": + +``` +exec_mycollector,my_tag_1=foo,my_tag_2=baz a=5,b_c=6 +exec_mycollector,my_tag_1=bar,my_tag_2=baz a=7,b_c=8 +``` + # Value: The "value" data format translates single values into Telegraf metrics. 
This diff --git a/plugins/parsers/json/parser.go b/plugins/parsers/json/parser.go index 180f2452aff15..40930e0c53036 100644 --- a/plugins/parsers/json/parser.go +++ b/plugins/parsers/json/parser.go @@ -14,17 +14,31 @@ type JSONParser struct { MetricName string TagKeys []string DefaultTags map[string]string + Array bool } -func (p *JSONParser) Parse(buf []byte) ([]telegraf.Metric, error) { +// parseArray decodes buf as a JSON array of objects and turns every element into a metric via parseObject. +func (p *JSONParser) parseArray(buf []byte) ([]telegraf.Metric, error) { metrics := make([]telegraf.Metric, 0) - var jsonOut map[string]interface{} + var jsonOut []map[string]interface{} err := json.Unmarshal(buf, &jsonOut) if err != nil { - err = fmt.Errorf("unable to parse out as JSON, %s", err) + err = fmt.Errorf("unable to parse out as JSON Array, %s", err) return nil, err } + for _, item := range jsonOut { + metrics, err = p.parseObject(metrics, item) + if err != nil { + // Stop at the first bad element instead of silently dropping its error and the metrics collected so far. + return nil, err + } + } + return metrics, nil +} + +// parseObject builds one metric from a single decoded JSON object and returns metrics with it appended. +func (p *JSONParser) parseObject(metrics []telegraf.Metric, jsonOut map[string]interface{}) ([]telegraf.Metric, error) { tags := make(map[string]string) for k, v := range p.DefaultTags { @@ -44,7 +58,7 @@ func (p *JSONParser) Parse(buf []byte) ([]telegraf.Metric, error) { } f := JSONFlattener{} - err = f.FlattenJSON("", jsonOut) + err := f.FlattenJSON("", jsonOut) if err != nil { return nil, err } @@ -57,6 +71,22 @@ func (p *JSONParser) Parse(buf []byte) ([]telegraf.Metric, error) { return append(metrics, metric), nil } +// Parse decodes buf as a JSON array of objects when p.Array is set, otherwise as a single JSON object. +func (p *JSONParser) Parse(buf []byte) ([]telegraf.Metric, error) { + + if !p.Array { + metrics := make([]telegraf.Metric, 0) + var jsonOut map[string]interface{} + err := json.Unmarshal(buf, &jsonOut) + if err != nil { + err = fmt.Errorf("unable to parse out as JSON, %s", err) + return nil, err + } + return p.parseObject(metrics, jsonOut) + } + return p.parseArray(buf) +} + func (p *JSONParser) ParseLine(line string) (telegraf.Metric, error) { metrics, err := p.Parse([]byte(line + "\n")) diff --git a/plugins/parsers/json/parser_test.go b/plugins/parsers/json/parser_test.go index 
f3e6d9404e990..98809ea1fc0ee 100644 --- a/plugins/parsers/json/parser_test.go +++ b/plugins/parsers/json/parser_test.go @@ -7,10 +7,12 @@ import ( ) const ( - validJSON = "{\"a\": 5, \"b\": {\"c\": 6}}" - validJSONNewline = "\n{\"d\": 7, \"b\": {\"d\": 8}}\n" - invalidJSON = "I don't think this is JSON" - invalidJSON2 = "{\"a\": 5, \"b\": \"c\": 6}}" + validJSON = "{\"a\": 5, \"b\": {\"c\": 6}}" + validJSONNewline = "\n{\"d\": 7, \"b\": {\"d\": 8}}\n" + validJSONArray = "[{\"a\": 5, \"b\": {\"c\": 6}}]" + validJSONArrayMultiple = "[{\"a\": 5, \"b\": {\"c\": 6}}, {\"a\": 7, \"b\": {\"c\": 8}}]" + invalidJSON = "I don't think this is JSON" + invalidJSON2 = "{\"a\": 5, \"b\": \"c\": 6}}" ) const validJSONTags = ` @@ -24,6 +26,27 @@ const validJSONTags = ` } ` +const validJSONArrayTags = ` +[ +{ + "a": 5, + "b": { + "c": 6 + }, + "mytag": "foo", + "othertag": "baz" +}, +{ + "a": 7, + "b": { + "c": 8 + }, + "mytag": "bar", + "othertag": "baz" +} +] +` + func TestParseValidJSON(t *testing.T) { parser := JSONParser{ MetricName: "json_test", @@ -282,3 +305,120 @@ func TestParseValidJSONDefaultTagsOverride(t *testing.T) { "mytag": "foobar", }, metrics[0].Tags()) } + +// Test that json arrays can be parsed +func TestParseValidJSONArray(t *testing.T) { + parser := JSONParser{ + MetricName: "json_array_test", + Array: true, + } + + // Most basic vanilla test + metrics, err := parser.Parse([]byte(validJSONArray)) + assert.NoError(t, err) + assert.Len(t, metrics, 1) + assert.Equal(t, "json_array_test", metrics[0].Name()) + assert.Equal(t, map[string]interface{}{ + "a": float64(5), + "b_c": float64(6), + }, metrics[0].Fields()) + assert.Equal(t, map[string]string{}, metrics[0].Tags()) + + // Basic multiple datapoints + metrics, err = parser.Parse([]byte(validJSONArrayMultiple)) + assert.NoError(t, err) + assert.Len(t, metrics, 2) + assert.Equal(t, "json_array_test", metrics[0].Name()) + assert.Equal(t, map[string]interface{}{ + "a": float64(5), + "b_c": float64(6), + }, 
metrics[0].Fields()) + assert.Equal(t, map[string]string{}, metrics[0].Tags()) + assert.Equal(t, "json_array_test", metrics[1].Name()) + assert.Equal(t, map[string]interface{}{ + "a": float64(7), + "b_c": float64(8), + }, metrics[1].Fields()) + assert.Equal(t, map[string]string{}, metrics[1].Tags()) +} + +func TestParseArrayWithTagKeys(t *testing.T) { + // Test that strings not matching tag keys are ignored + parser := JSONParser{ + MetricName: "json_array_test", + Array: true, + TagKeys: []string{"wrongtagkey"}, + } + metrics, err := parser.Parse([]byte(validJSONArrayTags)) + assert.NoError(t, err) + assert.Len(t, metrics, 2) + assert.Equal(t, "json_array_test", metrics[0].Name()) + assert.Equal(t, map[string]interface{}{ + "a": float64(5), + "b_c": float64(6), + }, metrics[0].Fields()) + assert.Equal(t, map[string]string{}, metrics[0].Tags()) + + assert.Equal(t, "json_array_test", metrics[1].Name()) + assert.Equal(t, map[string]interface{}{ + "a": float64(7), + "b_c": float64(8), + }, metrics[1].Fields()) + assert.Equal(t, map[string]string{}, metrics[1].Tags()) + + // Test that single tag key is found and applied + parser = JSONParser{ + MetricName: "json_array_test", + Array: true, + TagKeys: []string{"mytag"}, + } + metrics, err = parser.Parse([]byte(validJSONArrayTags)) + assert.NoError(t, err) + assert.Len(t, metrics, 2) + assert.Equal(t, "json_array_test", metrics[0].Name()) + assert.Equal(t, map[string]interface{}{ + "a": float64(5), + "b_c": float64(6), + }, metrics[0].Fields()) + assert.Equal(t, map[string]string{ + "mytag": "foo", + }, metrics[0].Tags()) + + assert.Equal(t, "json_array_test", metrics[1].Name()) + assert.Equal(t, map[string]interface{}{ + "a": float64(7), + "b_c": float64(8), + }, metrics[1].Fields()) + assert.Equal(t, map[string]string{ + "mytag": "bar", + }, metrics[1].Tags()) + + // Test that both tag keys are found and applied + parser = JSONParser{ + MetricName: "json_array_test", + Array: true, + TagKeys: []string{"mytag", 
"othertag"}, + } + metrics, err = parser.Parse([]byte(validJSONArrayTags)) + assert.NoError(t, err) + assert.Len(t, metrics, 2) + assert.Equal(t, "json_array_test", metrics[0].Name()) + assert.Equal(t, map[string]interface{}{ + "a": float64(5), + "b_c": float64(6), + }, metrics[0].Fields()) + assert.Equal(t, map[string]string{ + "mytag": "foo", + "othertag": "baz", + }, metrics[0].Tags()) + + assert.Equal(t, "json_array_test", metrics[1].Name()) + assert.Equal(t, map[string]interface{}{ + "a": float64(7), + "b_c": float64(8), + }, metrics[1].Fields()) + assert.Equal(t, map[string]string{ + "mytag": "bar", + "othertag": "baz", + }, metrics[1].Tags()) +} diff --git a/plugins/parsers/registry.go b/plugins/parsers/registry.go index 360d795bc58f1..f0405f1fe1515 100644 --- a/plugins/parsers/registry.go +++ b/plugins/parsers/registry.go @@ -50,6 +50,8 @@ type Config struct { // TagKeys only apply to JSON data TagKeys []string + // Array only applies to JSON data + Array bool // MetricName applies to JSON & value. This will be the name of the measurement. MetricName string @@ -67,7 +69,7 @@ func NewParser(config *Config) (Parser, error) { switch config.DataFormat { case "json": parser, err = NewJSONParser(config.MetricName, - config.TagKeys, config.DefaultTags) + config.TagKeys, config.DefaultTags, config.Array) case "value": parser, err = NewValueParser(config.MetricName, config.DataType, config.DefaultTags) @@ -88,10 +90,12 @@ func NewJSONParser( metricName string, tagKeys []string, defaultTags map[string]string, + array bool, ) (Parser, error) { parser := &json.JSONParser{ MetricName: metricName, TagKeys: tagKeys, + Array: array, DefaultTags: defaultTags, } return parser, nil