From c9215e208ba917384c7b4d5adefa8ffbb7a9d7b0 Mon Sep 17 00:00:00 2001 From: bsmaldon Date: Wed, 5 Sep 2018 23:13:29 +0100 Subject: [PATCH] Add strings processor (#4476) --- plugins/processors/all/all.go | 1 + plugins/processors/strings/README.md | 83 ++++ plugins/processors/strings/strings.go | 199 +++++++++ plugins/processors/strings/strings_test.go | 483 +++++++++++++++++++++ 4 files changed, 766 insertions(+) create mode 100644 plugins/processors/strings/README.md create mode 100644 plugins/processors/strings/strings.go create mode 100644 plugins/processors/strings/strings_test.go diff --git a/plugins/processors/all/all.go b/plugins/processors/all/all.go index 5c2e2549e1c63..a8386a6088d5a 100644 --- a/plugins/processors/all/all.go +++ b/plugins/processors/all/all.go @@ -7,6 +7,7 @@ import ( _ "github.com/influxdata/telegraf/plugins/processors/parser" _ "github.com/influxdata/telegraf/plugins/processors/printer" _ "github.com/influxdata/telegraf/plugins/processors/regex" + _ "github.com/influxdata/telegraf/plugins/processors/strings" _ "github.com/influxdata/telegraf/plugins/processors/rename" _ "github.com/influxdata/telegraf/plugins/processors/topk" ) diff --git a/plugins/processors/strings/README.md b/plugins/processors/strings/README.md new file mode 100644 index 0000000000000..f1e7361fea4d2 --- /dev/null +++ b/plugins/processors/strings/README.md @@ -0,0 +1,83 @@ +# Strings Processor Plugin + +The `strings` plugin maps certain go string functions onto measurement, tag, and field values. Values can be modified in place or stored in another key. + +Implemented functions are: +- lowercase +- uppercase +- trim +- trim_left +- trim_right +- trim_prefix +- trim_suffix + +Please note that in this implementation these are processed in the order that they appear above. + +Specify the `measurement`, `tag` or `field` that you want processed in each section and optionally a `dest` if you want the result stored in a new tag or field. 
You can specify lots of transformations on data with a single strings processor. + +### Configuration: + +```toml +[[processors.strings]] + # [[processors.strings.uppercase]] + # tag = "method" + + # [[processors.strings.lowercase]] + # field = "uri_stem" + # dest = "uri_stem_normalised" + + ## Trim leading and trailing whitespace using the default cutset + # [[processors.strings.trim]] + # field = "message" + + # [[processors.strings.trim_left]] + # field = "message" + # cutset = "\t" + + # [[processors.strings.trim_right]] + # field = "message" + # cutset = "\r\n" + + # [[processors.strings.trim_prefix]] + # field = "my_value" + # prefix = "my_" + + # [[processors.strings.trim_suffix]] + # field = "read_count" + # suffix = "_count" +``` + +#### Trim, TrimLeft, TrimRight + +The `trim`, `trim_left`, and `trim_right` functions take an optional parameter: `cutset`. This value is a string containing the characters to remove from the value. + +#### TrimPrefix, TrimSuffix + +The `trim_prefix` and `trim_suffix` functions remove the given `prefix` or `suffix` +respectively from the string. 
+
+### Example
+**Config**
+```toml
+[[processors.strings]]
+  [[processors.strings.lowercase]]
+    tag = "uri_stem"
+
+  [[processors.strings.trim_prefix]]
+    tag = "uri_stem"
+    prefix = "/api/"
+
+  [[processors.strings.uppercase]]
+    field = "cs-host"
+    dest = "cs-host_normalised"
+```
+
+**Input**
+```
+iis_log,method=get,uri_stem=/API/HealthCheck cs-host="MIXEDCASE_host",referrer="-",ident="-",http_version=1.1,agent="UserAgent",resp_bytes=270i 1519652321000000000
+```
+
+**Output**
+```
+iis_log,method=get,uri_stem=healthcheck cs-host="MIXEDCASE_host",cs-host_normalised="MIXEDCASE_HOST",referrer="-",ident="-",http_version=1.1,agent="UserAgent",resp_bytes=270i 1519652321000000000
+```
diff --git a/plugins/processors/strings/strings.go b/plugins/processors/strings/strings.go
new file mode 100644
index 0000000000000..8e68dbc52355f
--- /dev/null
+++ b/plugins/processors/strings/strings.go
@@ -0,0 +1,238 @@
+// Package strings implements a Telegraf processor that applies Go string
+// functions (case conversion and trimming) to measurement names, tag values
+// and string field values.
+package strings
+
+import (
+	"strings"
+	"unicode"
+
+	"github.com/influxdata/telegraf"
+	"github.com/influxdata/telegraf/plugins/processors"
+)
+
+// Strings holds the processor configuration. Each exported slice collects
+// the converters declared under the TOML table of the same name.
+type Strings struct {
+	Lowercase  []converter `toml:"lowercase"`
+	Uppercase  []converter `toml:"uppercase"`
+	Trim       []converter `toml:"trim"`
+	TrimLeft   []converter `toml:"trim_left"`
+	TrimRight  []converter `toml:"trim_right"`
+	TrimPrefix []converter `toml:"trim_prefix"`
+	TrimSuffix []converter `toml:"trim_suffix"`
+
+	// converters is the flattened list built by initOnce; init records that
+	// it has been built.
+	converters []converter
+	init       bool
+}
+
+// ConvertFunc maps one string value to another.
+type ConvertFunc func(s string) string
+
+// converter describes a single transformation. Field, Tag and Measurement
+// select what is converted; Dest optionally names the tag or field that
+// receives the result; Cutset, Prefix and Suffix parameterise the trim
+// functions. fn is filled in by initOnce.
+type converter struct {
+	Field       string
+	Tag         string
+	Measurement string
+	Dest        string
+	Cutset      string
+	Suffix      string
+	Prefix      string
+
+	fn ConvertFunc
+}
+
+const sampleConfig = `
+  ## Convert a tag value to uppercase
+  # [[processors.strings.uppercase]]
+  #   tag = "method"
+
+  ## Convert a field value to lowercase and store in a new field
+  # [[processors.strings.lowercase]]
+  #   field = "uri_stem"
+  #   dest = "uri_stem_normalised"
+
+  ## Trim leading and trailing whitespace using the default cutset
+  # [[processors.strings.trim]]
+  #   field = "message"
+
+  ## Trim leading characters in cutset
+  # [[processors.strings.trim_left]]
+  #   field = "message"
+  #   cutset = "\t"
+
+  ## Trim trailing characters in cutset
+  # [[processors.strings.trim_right]]
+  #   field = "message"
+  #   cutset = "\r\n"
+
+  ## Trim the given prefix from the field
+  # [[processors.strings.trim_prefix]]
+  #   field = "my_value"
+  #   prefix = "my_"
+
+  ## Trim the given suffix from the field
+  # [[processors.strings.trim_suffix]]
+  #   field = "read_count"
+  #   suffix = "_count"
+`
+
+// SampleConfig returns the commented example configuration for the processor.
+func (s *Strings) SampleConfig() string {
+	return sampleConfig
+}
+
+// Description returns a one-line summary of what the processor does.
+func (s *Strings) Description() string {
+	return "Perform string processing on tags, fields, and measurements"
+}
+
+// convertTag applies c.fn to the value of tag c.Tag and stores the result
+// under c.Dest, or back under c.Tag when Dest is empty. A metric that does
+// not carry the tag is left untouched.
+func (c *converter) convertTag(metric telegraf.Metric) {
+	tv, ok := metric.GetTag(c.Tag)
+	if !ok {
+		return
+	}
+
+	dest := c.Tag
+	if c.Dest != "" {
+		dest = c.Dest
+	}
+
+	metric.AddTag(dest, c.fn(tv))
+}
+
+// convertField applies c.fn to the value of field c.Field and stores the
+// result under c.Dest, or back under c.Field when Dest is empty. Missing
+// fields and fields whose value is not a string are left untouched.
+func (c *converter) convertField(metric telegraf.Metric) {
+	fv, ok := metric.GetField(c.Field)
+	if !ok {
+		return
+	}
+
+	dest := c.Field
+	if c.Dest != "" {
+		dest = c.Dest
+	}
+
+	if fv, ok := fv.(string); ok {
+		metric.AddField(dest, c.fn(fv))
+	}
+}
+
+// convertMeasurement renames the metric to c.fn(name) when its current name
+// equals c.Measurement. Dest is not consulted for measurements.
+func (c *converter) convertMeasurement(metric telegraf.Metric) {
+	if metric.Name() != c.Measurement {
+		return
+	}
+
+	metric.SetName(c.fn(metric.Name()))
+}
+
+// convert runs the converter against each target it names, in the order
+// field, tag, measurement.
+func (c *converter) convert(metric telegraf.Metric) {
+	if c.Field != "" {
+		c.convertField(metric)
+	}
+
+	if c.Tag != "" {
+		c.convertTag(metric)
+	}
+
+	if c.Measurement != "" {
+		c.convertMeasurement(metric)
+	}
+}
+
+// initOnce builds s.converters from the configured slices on first use and
+// is a no-op afterwards. Converters are appended in a fixed order (lowercase,
+// uppercase, trim, trim_left, trim_right, trim_prefix, trim_suffix), which
+// is the order Apply runs them in.
+func (s *Strings) initOnce() {
+	if s.init {
+		return
+	}
+
+	s.converters = make([]converter, 0)
+	for _, c := range s.Lowercase {
+		c.fn = strings.ToLower
+		s.converters = append(s.converters, c)
+	}
+	for _, c := range s.Uppercase {
+		c.fn = strings.ToUpper
+		s.converters = append(s.converters, c)
+	}
+	for _, c := range s.Trim {
+		// Copy the loop variable: before Go 1.22 every closure created in
+		// this loop would otherwise share one c and use the last entry's
+		// cutset.
+		c := c
+		if c.Cutset != "" {
+			c.fn = func(s string) string { return strings.Trim(s, c.Cutset) }
+		} else {
+			c.fn = func(s string) string { return strings.TrimFunc(s, unicode.IsSpace) }
+		}
+		s.converters = append(s.converters, c)
+	}
+	for _, c := range s.TrimLeft {
+		c := c // per-iteration copy for the closure below
+		if c.Cutset != "" {
+			c.fn = func(s string) string { return strings.TrimLeft(s, c.Cutset) }
+		} else {
+			c.fn = func(s string) string { return strings.TrimLeftFunc(s, unicode.IsSpace) }
+		}
+		s.converters = append(s.converters, c)
+	}
+	for _, c := range s.TrimRight {
+		c := c // per-iteration copy for the closure below
+		if c.Cutset != "" {
+			c.fn = func(s string) string { return strings.TrimRight(s, c.Cutset) }
+		} else {
+			c.fn = func(s string) string { return strings.TrimRightFunc(s, unicode.IsSpace) }
+		}
+		s.converters = append(s.converters, c)
+	}
+	for _, c := range s.TrimPrefix {
+		c := c // per-iteration copy for the closure below
+		c.fn = func(s string) string { return strings.TrimPrefix(s, c.Prefix) }
+		s.converters = append(s.converters, c)
+	}
+	for _, c := range s.TrimSuffix {
+		c := c // per-iteration copy for the closure below
+		c.fn = func(s string) string { return strings.TrimSuffix(s, c.Suffix) }
+		s.converters = append(s.converters, c)
+	}
+
+	s.init = true
+}
+
+// Apply runs every configured converter against each metric, modifying the
+// metrics in place, and returns the same slice it was given.
+func (s *Strings) Apply(in ...telegraf.Metric) []telegraf.Metric {
+	s.initOnce()
+
+	for _, metric := range in {
+		for _, c := range s.converters {
+			c.convert(metric)
+		}
+	}
+
+	return in
+}
+
+// init registers the processor under the name "strings".
+func init() {
+	processors.Add("strings", func() telegraf.Processor {
+		return &Strings{}
+	})
+}
diff --git a/plugins/processors/strings/strings_test.go b/plugins/processors/strings/strings_test.go new file mode 100644 index 0000000000000..2097ac5a836d9 --- /dev/null +++ b/plugins/processors/strings/strings_test.go @@ -0,0 +1,483 @@ +package strings + +import ( + "testing" + "time" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/metric" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func newM1() telegraf.Metric { + m1, _ := metric.New("IIS_log", + map[string]string{ + "verb": "GET", + "s-computername": "MIXEDCASE_hostname", + }, + map[string]interface{}{ + "request": 
"/mixed/CASE/paTH/?from=-1D&to=now", + "whitespace": " whitespace\t", + }, + time.Now(), + ) + return m1 +} + +func newM2() telegraf.Metric { + m2, _ := metric.New("IIS_log", + map[string]string{ + "verb": "GET", + "resp_code": "200", + "s-computername": "MIXEDCASE_hostname", + }, + map[string]interface{}{ + "request": "/mixed/CASE/paTH/?from=-1D&to=now", + "cs-host": "AAAbbb", + "ignore_number": int64(200), + "ignore_bool": true, + }, + time.Now(), + ) + return m2 +} + +func TestFieldConversions(t *testing.T) { + tests := []struct { + name string + plugin *Strings + check func(t *testing.T, actual telegraf.Metric) + }{ + { + name: "Should change existing field to lowercase", + plugin: &Strings{ + Lowercase: []converter{ + converter{ + Field: "request", + }, + }, + }, + check: func(t *testing.T, actual telegraf.Metric) { + fv, ok := actual.GetField("request") + require.True(t, ok) + require.Equal(t, "/mixed/case/path/?from=-1d&to=now", fv) + }, + }, + { + name: "Should change existing field to uppercase", + plugin: &Strings{ + Uppercase: []converter{ + converter{ + Field: "request", + }, + }, + }, + check: func(t *testing.T, actual telegraf.Metric) { + fv, ok := actual.GetField("request") + require.True(t, ok) + require.Equal(t, "/MIXED/CASE/PATH/?FROM=-1D&TO=NOW", fv) + }, + }, + { + name: "Should add new lowercase field", + plugin: &Strings{ + Lowercase: []converter{ + converter{ + Field: "request", + Dest: "lowercase_request", + }, + }, + }, + check: func(t *testing.T, actual telegraf.Metric) { + fv, ok := actual.GetField("request") + require.True(t, ok) + require.Equal(t, "/mixed/CASE/paTH/?from=-1D&to=now", fv) + + fv, ok = actual.GetField("lowercase_request") + require.True(t, ok) + require.Equal(t, "/mixed/case/path/?from=-1d&to=now", fv) + }, + }, + { + name: "Should trim from both sides", + plugin: &Strings{ + Trim: []converter{ + converter{ + Field: "request", + Cutset: "/w", + }, + }, + }, + check: func(t *testing.T, actual telegraf.Metric) { + fv, ok := 
actual.GetField("request") + require.True(t, ok) + require.Equal(t, "mixed/CASE/paTH/?from=-1D&to=no", fv) + }, + }, + { + name: "Should trim from both sides and make lowercase", + plugin: &Strings{ + Trim: []converter{ + converter{ + Field: "request", + Cutset: "/w", + }, + }, + Lowercase: []converter{ + converter{ + Field: "request", + }, + }, + }, + check: func(t *testing.T, actual telegraf.Metric) { + fv, ok := actual.GetField("request") + require.True(t, ok) + require.Equal(t, "mixed/case/path/?from=-1d&to=no", fv) + }, + }, + { + name: "Should trim from left side", + plugin: &Strings{ + TrimLeft: []converter{ + converter{ + Field: "request", + Cutset: "/w", + }, + }, + }, + check: func(t *testing.T, actual telegraf.Metric) { + fv, ok := actual.GetField("request") + require.True(t, ok) + require.Equal(t, "mixed/CASE/paTH/?from=-1D&to=now", fv) + }, + }, + { + name: "Should trim from right side", + plugin: &Strings{ + TrimRight: []converter{ + converter{ + Field: "request", + Cutset: "/w", + }, + }, + }, + check: func(t *testing.T, actual telegraf.Metric) { + fv, ok := actual.GetField("request") + require.True(t, ok) + require.Equal(t, "/mixed/CASE/paTH/?from=-1D&to=no", fv) + }, + }, + { + name: "Should trim prefix '/mixed'", + plugin: &Strings{ + TrimPrefix: []converter{ + converter{ + Field: "request", + Prefix: "/mixed", + }, + }, + }, + check: func(t *testing.T, actual telegraf.Metric) { + fv, ok := actual.GetField("request") + require.True(t, ok) + require.Equal(t, "/CASE/paTH/?from=-1D&to=now", fv) + }, + }, + { + name: "Should trim suffix '-1D&to=now'", + plugin: &Strings{ + TrimSuffix: []converter{ + converter{ + Field: "request", + Suffix: "-1D&to=now", + }, + }, + }, + check: func(t *testing.T, actual telegraf.Metric) { + fv, ok := actual.GetField("request") + require.True(t, ok) + require.Equal(t, "/mixed/CASE/paTH/?from=", fv) + }, + }, + { + name: "Trim without cutset removes whitespace", + plugin: &Strings{ + Trim: []converter{ + converter{ + 
Field: "whitespace", + }, + }, + }, + check: func(t *testing.T, actual telegraf.Metric) { + fv, ok := actual.GetField("whitespace") + require.True(t, ok) + require.Equal(t, "whitespace", fv) + }, + }, + { + name: "Trim left without cutset removes whitespace", + plugin: &Strings{ + TrimLeft: []converter{ + converter{ + Field: "whitespace", + }, + }, + }, + check: func(t *testing.T, actual telegraf.Metric) { + fv, ok := actual.GetField("whitespace") + require.True(t, ok) + require.Equal(t, "whitespace\t", fv) + }, + }, + { + name: "Trim right without cutset removes whitespace", + plugin: &Strings{ + TrimRight: []converter{ + converter{ + Field: "whitespace", + }, + }, + }, + check: func(t *testing.T, actual telegraf.Metric) { + fv, ok := actual.GetField("whitespace") + require.True(t, ok) + require.Equal(t, " whitespace", fv) + }, + }, + { + name: "No change if field missing", + plugin: &Strings{ + Lowercase: []converter{ + converter{ + Field: "xyzzy", + Suffix: "-1D&to=now", + }, + }, + }, + check: func(t *testing.T, actual telegraf.Metric) { + fv, ok := actual.GetField("request") + require.True(t, ok) + require.Equal(t, "/mixed/CASE/paTH/?from=-1D&to=now", fv) + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + metrics := tt.plugin.Apply(newM1()) + require.Len(t, metrics, 1) + tt.check(t, metrics[0]) + }) + } +} + +func TestTagConversions(t *testing.T) { + tests := []struct { + name string + plugin *Strings + check func(t *testing.T, actual telegraf.Metric) + }{ + { + name: "Should change existing tag to lowercase", + plugin: &Strings{ + Lowercase: []converter{ + converter{ + Tag: "s-computername", + }, + }, + }, + check: func(t *testing.T, actual telegraf.Metric) { + tv, ok := actual.GetTag("verb") + require.True(t, ok) + require.Equal(t, "GET", tv) + + tv, ok = actual.GetTag("s-computername") + require.True(t, ok) + require.Equal(t, "mixedcase_hostname", tv) + }, + }, + { + name: "Should add new lowercase tag", + plugin: &Strings{ + 
Lowercase: []converter{ + converter{ + Tag: "s-computername", + Dest: "s-computername_lowercase", + }, + }, + }, + check: func(t *testing.T, actual telegraf.Metric) { + tv, ok := actual.GetTag("verb") + require.True(t, ok) + require.Equal(t, "GET", tv) + + tv, ok = actual.GetTag("s-computername") + require.True(t, ok) + require.Equal(t, "MIXEDCASE_hostname", tv) + + tv, ok = actual.GetTag("s-computername_lowercase") + require.True(t, ok) + require.Equal(t, "mixedcase_hostname", tv) + }, + }, + { + name: "Should add new uppercase tag", + plugin: &Strings{ + Uppercase: []converter{ + converter{ + Tag: "s-computername", + Dest: "s-computername_uppercase", + }, + }, + }, + check: func(t *testing.T, actual telegraf.Metric) { + tv, ok := actual.GetTag("verb") + require.True(t, ok) + require.Equal(t, "GET", tv) + + tv, ok = actual.GetTag("s-computername") + require.True(t, ok) + require.Equal(t, "MIXEDCASE_hostname", tv) + + tv, ok = actual.GetTag("s-computername_uppercase") + require.True(t, ok) + require.Equal(t, "MIXEDCASE_HOSTNAME", tv) + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + metrics := tt.plugin.Apply(newM1()) + require.Len(t, metrics, 1) + tt.check(t, metrics[0]) + }) + } +} + +func TestMeasurementConversions(t *testing.T) { + tests := []struct { + name string + plugin *Strings + check func(t *testing.T, actual telegraf.Metric) + }{ + { + name: "lowercase measurement", + plugin: &Strings{ + Lowercase: []converter{ + converter{ + Measurement: "IIS_log", + }, + }, + }, + check: func(t *testing.T, actual telegraf.Metric) { + name := actual.Name() + require.Equal(t, "iis_log", name) + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + metrics := tt.plugin.Apply(newM1()) + require.Len(t, metrics, 1) + tt.check(t, metrics[0]) + }) + } +} + +func TestMultipleConversions(t *testing.T) { + plugin := &Strings{ + Lowercase: []converter{ + converter{ + Tag: "s-computername", + }, + converter{ + Field: 
"request", + }, + converter{ + Field: "cs-host", + Dest: "cs-host_lowercase", + }, + }, + Uppercase: []converter{ + converter{ + Tag: "verb", + }, + }, + } + + processed := plugin.Apply(newM2()) + + expectedFields := map[string]interface{}{ + "request": "/mixed/case/path/?from=-1d&to=now", + "ignore_number": int64(200), + "ignore_bool": true, + "cs-host": "AAAbbb", + "cs-host_lowercase": "aaabbb", + } + expectedTags := map[string]string{ + "verb": "GET", + "resp_code": "200", + "s-computername": "mixedcase_hostname", + } + + assert.Equal(t, expectedFields, processed[0].Fields()) + assert.Equal(t, expectedTags, processed[0].Tags()) +} + +func TestReadmeExample(t *testing.T) { + plugin := &Strings{ + Lowercase: []converter{ + converter{ + Tag: "uri_stem", + }, + }, + TrimPrefix: []converter{ + converter{ + Tag: "uri_stem", + Prefix: "/api/", + }, + }, + Uppercase: []converter{ + converter{ + Field: "cs-host", + Dest: "cs-host_normalised", + }, + }, + } + + m, _ := metric.New("iis_log", + map[string]string{ + "verb": "get", + "uri_stem": "/API/HealthCheck", + }, + map[string]interface{}{ + "cs-host": "MIXEDCASE_host", + "referrer": "-", + "ident": "-", + "http_version": "1.1", + "agent": "UserAgent", + "resp_bytes": int64(270), + }, + time.Now(), + ) + + processed := plugin.Apply(m) + + expectedTags := map[string]string{ + "verb": "get", + "uri_stem": "healthcheck", + } + expectedFields := map[string]interface{}{ + "cs-host": "MIXEDCASE_host", + "cs-host_normalised": "MIXEDCASE_HOST", + "referrer": "-", + "ident": "-", + "http_version": "1.1", + "agent": "UserAgent", + "resp_bytes": int64(270), + } + + assert.Equal(t, expectedFields, processed[0].Fields()) + assert.Equal(t, expectedTags, processed[0].Tags()) +}