From d585bb4591425dae43320a16ba9aa3099d51e37c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Patryk=20Ma=C5=82ek?= Date: Mon, 7 Sep 2020 17:42:08 +0200 Subject: [PATCH 1/3] Carbon2 configuration option - include field in metric name --- config/config.go | 9 + plugins/serializers/carbon2/README.md | 45 ++++- plugins/serializers/carbon2/carbon2.go | 104 +++++++--- plugins/serializers/carbon2/carbon2_test.go | 201 ++++++++++++++++---- plugins/serializers/registry.go | 9 +- 5 files changed, 290 insertions(+), 78 deletions(-) diff --git a/config/config.go b/config/config.go index 9561ddf3a2d95..bc8691dac313b 100644 --- a/config/config.go +++ b/config/config.go @@ -1933,6 +1933,14 @@ func buildSerializer(name string, tbl *ast.Table) (serializers.Serializer, error } } + if node, ok := tbl.Fields["carbon2_format"]; ok { + if kv, ok := node.(*ast.KeyValue); ok { + if str, ok := kv.Value.(*ast.String); ok { + c.Carbon2Format = str.Value + } + } + } + if node, ok := tbl.Fields["influx_max_line_bytes"]; ok { if kv, ok := node.(*ast.KeyValue); ok { if integer, ok := kv.Value.(*ast.Integer); ok { @@ -2089,6 +2097,7 @@ func buildSerializer(name string, tbl *ast.Table) (serializers.Serializer, error } } + delete(tbl.Fields, "carbon2_format") delete(tbl.Fields, "influx_max_line_bytes") delete(tbl.Fields, "influx_sort_fields") delete(tbl.Fields, "influx_uint_support") diff --git a/plugins/serializers/carbon2/README.md b/plugins/serializers/carbon2/README.md index e88b18cf079d6..f8feaf2346b8e 100644 --- a/plugins/serializers/carbon2/README.md +++ b/plugins/serializers/carbon2/README.md @@ -2,7 +2,7 @@ The `carbon2` serializer translates the Telegraf metric format to the [Carbon2 format](http://metrics20.org/implementations/). -### Configuration +## Configuration ```toml [[outputs.file]] @@ -14,20 +14,51 @@ The `carbon2` serializer translates the Telegraf metric format to the [Carbon2 f ## more about them here: ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md data_format = "carbon2" + + ## Optionally configure metrics format, whether to merge metric name and field name. + ## Possible options: + ## * "field_separate" + ## * "metric_includes_field" + ## * "" - defaults to "field_separate" + carbon2_format = "" ``` Standard form: + ``` metric=name field=field_1 host=foo 30 1234567890 metric=name field=field_2 host=foo 4 1234567890 metric=name field=field_N host=foo 59 1234567890 ``` -### Metrics +### Metrics format + +`Carbon2` serializer has a configuration option - `carbon2_format` - to change how +metrics names are being constructed. -The serializer converts the metrics by creating `intrinsic_tags` using the combination of metric name and fields. So, if one Telegraf metric has 4 fields, the `carbon2` output will be 4 separate metrics. There will be a `metric` tag that represents the name of the metric and a `field` tag to represent the field. +By default `metric` will only inclue the metric name and a separate field `field` +will contain the field name. +This is the behavior of `carbon2_format = "field_separate"` which is the default +behavior (even if unspecified). -### Example +Optionally user can opt in to change this to make the metric inclue the field name +after the `_`. +This is the behavior of `carbon2_format = "metric_includes_field"` which would +make the above example look like: + +``` +metric=name_field_1 host=foo 30 1234567890 +metric=name_field_2 host=foo 4 1234567890 +metric=name_field_N host=foo 59 1234567890 +``` + +## Metrics + +The serializer converts the metrics by creating `intrinsic_tags` using the combination of metric name and fields. +So, if one Telegraf metric has 4 fields, the `carbon2` output will be 4 separate metrics. +There will be a `metric` tag that represents the name of the metric and a `field` tag to represent the field. + +## Example If we take the following InfluxDB Line Protocol: @@ -42,8 +73,10 @@ metric=weather field=temperature location=us-midwest season=summer 82 123456789 metric=weather field=wind location=us-midwest season=summer 100 1234567890 ``` -### Fields and Tags with spaces +## Fields and Tags with spaces + When a field key or tag key/value have spaces, spaces will be replaced with `_`. -### Tags with empty values +## Tags with empty values + When a tag's value is empty, it will be replaced with `null` diff --git a/plugins/serializers/carbon2/carbon2.go b/plugins/serializers/carbon2/carbon2.go index fc11de0624760..10611815b8a7e 100644 --- a/plugins/serializers/carbon2/carbon2.go +++ b/plugins/serializers/carbon2/carbon2.go @@ -3,24 +3,52 @@ package carbon2 import ( "bytes" "fmt" - "github.com/influxdata/telegraf" "strconv" "strings" + + "github.com/influxdata/telegraf" ) -type serializer struct { +type format string + +const ( + Carbon2FormatFieldSeparate string = "field_separate" + Carbon2FormatMetricIncludesField string = "metric_includes_field" + + formatFieldSeparate = format(Carbon2FormatFieldSeparate) + formatMetricIncludesField = format(Carbon2FormatMetricIncludesField) +) + +var formats = map[string]format{ + // Field separate is the default when no format specified. + "": formatFieldSeparate, + Carbon2FormatFieldSeparate: formatFieldSeparate, + Carbon2FormatMetricIncludesField: formatMetricIncludesField, +} + +type Serializer struct { + metricsFormat format } -func NewSerializer() (*serializer, error) { - s := &serializer{} - return s, nil +func NewSerializer(f string) (*Serializer, error) { + var ( + ok bool + metricsFormat format + ) + if metricsFormat, ok = formats[f]; !ok { + return nil, fmt.Errorf("unknown carbon2 format: %s", f) + } + + return &Serializer{ + metricsFormat: metricsFormat, + }, nil } -func (s *serializer) Serialize(metric telegraf.Metric) ([]byte, error) { +func (s *Serializer) Serialize(metric telegraf.Metric) ([]byte, error) { return s.createObject(metric), nil } -func (s *serializer) SerializeBatch(metrics []telegraf.Metric) ([]byte, error) { +func (s *Serializer) SerializeBatch(metrics []telegraf.Metric) ([]byte, error) { var batch bytes.Buffer for _, metric := range metrics { batch.Write(s.createObject(metric)) @@ -28,35 +56,57 @@ func (s *serializer) SerializeBatch(metrics []telegraf.Metric) ([]byte, error) { return batch.Bytes(), nil } -func (s *serializer) createObject(metric telegraf.Metric) []byte { +func (s *Serializer) createObject(metric telegraf.Metric) []byte { var m bytes.Buffer for fieldName, fieldValue := range metric.Fields() { - if isNumeric(fieldValue) { - m.WriteString("metric=") - m.WriteString(strings.Replace(metric.Name(), " ", "_", -1)) - m.WriteString(" field=") - m.WriteString(strings.Replace(fieldName, " ", "_", -1)) - m.WriteString(" ") - for _, tag := range metric.TagList() { - m.WriteString(strings.Replace(tag.Key, " ", "_", -1)) - m.WriteString("=") - value := tag.Value - if len(value) == 0 { - value = "null" - } - m.WriteString(strings.Replace(value, " ", "_", -1)) - m.WriteString(" ") + if !isNumeric(fieldValue) { + continue + } + + switch s.metricsFormat { + case formatFieldSeparate: + m.WriteString(serializeMetricFieldSeparate( + metric.Name(), fieldName, + )) + case formatMetricIncludesField: + m.WriteString(serializeMetricIncludeField( + metric.Name(), fieldName, + )) + } + + for _, tag := range metric.TagList() { + m.WriteString(strings.Replace(tag.Key, " ", "_", -1)) + m.WriteString("=") + value := tag.Value + if len(value) == 0 { + value = "null" } + m.WriteString(strings.Replace(value, " ", "_", -1)) m.WriteString(" ") - m.WriteString(fmt.Sprintf("%v", fieldValue)) - m.WriteString(" ") - m.WriteString(strconv.FormatInt(metric.Time().Unix(), 10)) - m.WriteString("\n") } + m.WriteString(" ") + m.WriteString(fmt.Sprintf("%v", fieldValue)) + m.WriteString(" ") + m.WriteString(strconv.FormatInt(metric.Time().Unix(), 10)) + m.WriteString("\n") } return m.Bytes() } +func serializeMetricFieldSeparate(name, fieldName string) string { + return fmt.Sprintf("metric=%s field=%s ", + strings.Replace(name, " ", "_", -1), + strings.Replace(fieldName, " ", "_", -1), + ) +} + +func serializeMetricIncludeField(name, fieldName string) string { + return fmt.Sprintf("metric=%s_%s ", + strings.Replace(name, " ", "_", -1), + strings.Replace(fieldName, " ", "_", -1), + ) +} + func isNumeric(v interface{}) bool { switch v.(type) { case string: diff --git a/plugins/serializers/carbon2/carbon2_test.go b/plugins/serializers/carbon2/carbon2_test.go index f335342d54b18..aadc55f7ede96 100644 --- a/plugins/serializers/carbon2/carbon2_test.go +++ b/plugins/serializers/carbon2/carbon2_test.go @@ -2,13 +2,14 @@ package carbon2 import ( "fmt" - "github.com/stretchr/testify/require" "testing" "time" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/metric" - "github.com/stretchr/testify/assert" ) func MustMetric(v telegraf.Metric, err error) telegraf.Metric { @@ -27,14 +28,33 @@ func TestSerializeMetricFloat(t *testing.T) { "usage_idle": float64(91.5), } m, err := metric.New("cpu", tags, fields, now) - assert.NoError(t, err) + require.NoError(t, err) - s, _ := NewSerializer() - var buf []byte - buf, err = s.Serialize(m) - assert.NoError(t, err) - expS := []byte(fmt.Sprintf(`metric=cpu field=usage_idle cpu=cpu0 91.5 %d`, now.Unix()) + "\n") - assert.Equal(t, string(expS), string(buf)) + testcases := []struct { + format string + expected string + }{ + { + format: Carbon2FormatFieldSeparate, + expected: fmt.Sprintf("metric=cpu field=usage_idle cpu=cpu0 91.5 %d\n", now.Unix()), + }, + { + format: Carbon2FormatMetricIncludesField, + expected: fmt.Sprintf("metric=cpu_usage_idle cpu=cpu0 91.5 %d\n", now.Unix()), + }, + } + + for _, tc := range testcases { + t.Run(tc.format, func(t *testing.T) { + s, err := NewSerializer(tc.format) + require.NoError(t, err) + + buf, err := s.Serialize(m) + require.NoError(t, err) + + assert.Equal(t, tc.expected, string(buf)) + }) + } } func TestSerializeMetricWithEmptyStringTag(t *testing.T) { @@ -46,14 +66,33 @@ func TestSerializeMetricWithEmptyStringTag(t *testing.T) { "usage_idle": float64(91.5), } m, err := metric.New("cpu", tags, fields, now) - assert.NoError(t, err) + require.NoError(t, err) - s, _ := NewSerializer() - var buf []byte - buf, err = s.Serialize(m) - assert.NoError(t, err) - expS := []byte(fmt.Sprintf(`metric=cpu field=usage_idle cpu=null 91.5 %d`, now.Unix()) + "\n") - assert.Equal(t, string(expS), string(buf)) + testcases := []struct { + format string + expected string + }{ + { + format: Carbon2FormatFieldSeparate, + expected: fmt.Sprintf("metric=cpu field=usage_idle cpu=null 91.5 %d\n", now.Unix()), + }, + { + format: Carbon2FormatMetricIncludesField, + expected: fmt.Sprintf("metric=cpu_usage_idle cpu=null 91.5 %d\n", now.Unix()), + }, + } + + for _, tc := range testcases { + t.Run(tc.format, func(t *testing.T) { + s, err := NewSerializer(tc.format) + require.NoError(t, err) + + buf, err := s.Serialize(m) + require.NoError(t, err) + + assert.Equal(t, tc.expected, string(buf)) + }) + } } func TestSerializeWithSpaces(t *testing.T) { @@ -65,14 +104,33 @@ func TestSerializeWithSpaces(t *testing.T) { "usage_idle 1": float64(91.5), } m, err := metric.New("cpu metric", tags, fields, now) - assert.NoError(t, err) + require.NoError(t, err) - s, _ := NewSerializer() - var buf []byte - buf, err = s.Serialize(m) - assert.NoError(t, err) - expS := []byte(fmt.Sprintf(`metric=cpu_metric field=usage_idle_1 cpu_0=cpu_0 91.5 %d`, now.Unix()) + "\n") - assert.Equal(t, string(expS), string(buf)) + testcases := []struct { + format string + expected string + }{ + { + format: Carbon2FormatFieldSeparate, + expected: fmt.Sprintf("metric=cpu_metric field=usage_idle_1 cpu_0=cpu_0 91.5 %d\n", now.Unix()), + }, + { + format: Carbon2FormatMetricIncludesField, + expected: fmt.Sprintf("metric=cpu_metric_usage_idle_1 cpu_0=cpu_0 91.5 %d\n", now.Unix()), + }, + } + + for _, tc := range testcases { + t.Run(tc.format, func(t *testing.T) { + s, err := NewSerializer(tc.format) + require.NoError(t, err) + + buf, err := s.Serialize(m) + require.NoError(t, err) + + assert.Equal(t, tc.expected, string(buf)) + }) + } } func TestSerializeMetricInt(t *testing.T) { @@ -84,15 +142,33 @@ func TestSerializeMetricInt(t *testing.T) { "usage_idle": int64(90), } m, err := metric.New("cpu", tags, fields, now) - assert.NoError(t, err) + require.NoError(t, err) - s, _ := NewSerializer() - var buf []byte - buf, err = s.Serialize(m) - assert.NoError(t, err) + testcases := []struct { + format string + expected string + }{ + { + format: Carbon2FormatFieldSeparate, + expected: fmt.Sprintf("metric=cpu field=usage_idle cpu=cpu0 90 %d\n", now.Unix()), + }, + { + format: Carbon2FormatMetricIncludesField, + expected: fmt.Sprintf("metric=cpu_usage_idle cpu=cpu0 90 %d\n", now.Unix()), + }, + } - expS := []byte(fmt.Sprintf(`metric=cpu field=usage_idle cpu=cpu0 90 %d`, now.Unix()) + "\n") - assert.Equal(t, string(expS), string(buf)) + for _, tc := range testcases { + t.Run(tc.format, func(t *testing.T) { + s, err := NewSerializer(tc.format) + require.NoError(t, err) + + buf, err := s.Serialize(m) + require.NoError(t, err) + + assert.Equal(t, tc.expected, string(buf)) + }) + } } func TestSerializeMetricString(t *testing.T) { @@ -106,13 +182,31 @@ func TestSerializeMetricString(t *testing.T) { m, err := metric.New("cpu", tags, fields, now) assert.NoError(t, err) - s, _ := NewSerializer() - var buf []byte - buf, err = s.Serialize(m) - assert.NoError(t, err) + testcases := []struct { + format string + expected string + }{ + { + format: Carbon2FormatFieldSeparate, + expected: "", + }, + { + format: Carbon2FormatMetricIncludesField, + expected: "", + }, + } + + for _, tc := range testcases { + t.Run(tc.format, func(t *testing.T) { + s, err := NewSerializer(tc.format) + require.NoError(t, err) - expS := []byte("") - assert.Equal(t, string(expS), string(buf)) + buf, err := s.Serialize(m) + require.NoError(t, err) + + assert.Equal(t, tc.expected, string(buf)) + }) + } } func TestSerializeBatch(t *testing.T) { @@ -128,11 +222,34 @@ func TestSerializeBatch(t *testing.T) { ) metrics := []telegraf.Metric{m, m} - s, _ := NewSerializer() - buf, err := s.SerializeBatch(metrics) - require.NoError(t, err) - expS := []byte(`metric=cpu field=value 42 0 + + testcases := []struct { + format string + expected string + }{ + { + format: Carbon2FormatFieldSeparate, + expected: `metric=cpu field=value 42 0 metric=cpu field=value 42 0 -`) - assert.Equal(t, string(expS), string(buf)) +`, + }, + { + format: Carbon2FormatMetricIncludesField, + expected: `metric=cpu_value 42 0 +metric=cpu_value 42 0 +`, + }, + } + + for _, tc := range testcases { + t.Run(tc.format, func(t *testing.T) { + s, err := NewSerializer(tc.format) + require.NoError(t, err) + + buf, err := s.SerializeBatch(metrics) + require.NoError(t, err) + + assert.Equal(t, tc.expected, string(buf)) + }) + } } diff --git a/plugins/serializers/registry.go b/plugins/serializers/registry.go index e5065a93c3c4b..b12ef7660b981 100644 --- a/plugins/serializers/registry.go +++ b/plugins/serializers/registry.go @@ -48,6 +48,9 @@ type Config struct { // Dataformat can be one of the serializer types listed in NewSerializer. DataFormat string `toml:"data_format"` + // Carbon2 metric format. + Carbon2Format string `toml:"carbon2_format"` + // Support tags in graphite protocol GraphiteTagSupport bool `toml:"graphite_tag_support"` @@ -118,7 +121,7 @@ func NewSerializer(config *Config) (Serializer, error) { case "nowmetric": serializer, err = NewNowSerializer() case "carbon2": - serializer, err = NewCarbon2Serializer() + serializer, err = NewCarbon2Serializer(config.Carbon2Format) case "wavefront": serializer, err = NewWavefrontSerializer(config.Prefix, config.WavefrontUseStrict, config.WavefrontSourceOverride) case "prometheus": @@ -160,8 +163,8 @@ func NewJsonSerializer(timestampUnits time.Duration) (Serializer, error) { return json.NewSerializer(timestampUnits) } -func NewCarbon2Serializer() (Serializer, error) { - return carbon2.NewSerializer() +func NewCarbon2Serializer(carbon2format string) (Serializer, error) { + return carbon2.NewSerializer(carbon2format) } func NewSplunkmetricSerializer(splunkmetric_hec_routing bool, splunkmetric_multimetric bool) (Serializer, error) { From 01dd3ba1696de7321edb54dd10eb99426d578082 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Patryk=20Ma=C5=82ek?= Date: Wed, 9 Sep 2020 18:14:47 +0200 Subject: [PATCH 2/3] Update Carbon2 README with default value for carbon2_format --- plugins/serializers/carbon2/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/plugins/serializers/carbon2/README.md b/plugins/serializers/carbon2/README.md index f8feaf2346b8e..e32a420aec0af 100644 --- a/plugins/serializers/carbon2/README.md +++ b/plugins/serializers/carbon2/README.md @@ -20,7 +20,7 @@ The `carbon2` serializer translates the Telegraf metric format to the [Carbon2 f ## * "field_separate" ## * "metric_includes_field" ## * "" - defaults to "field_separate" - carbon2_format = "" + # carbon2_format = "field_separate" ``` Standard form: From ce32abbc3122c7f72022a06904d5acdaa28e4590 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Patryk=20Ma=C5=82ek?= Date: Thu, 10 Sep 2020 17:41:48 +0200 Subject: [PATCH 3/3] Fix sumologic tests --- plugins/outputs/sumologic/sumologic_test.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/plugins/outputs/sumologic/sumologic_test.go b/plugins/outputs/sumologic/sumologic_test.go index 603ecf73c5c68..23db47c5b5c88 100644 --- a/plugins/outputs/sumologic/sumologic_test.go +++ b/plugins/outputs/sumologic/sumologic_test.go @@ -133,7 +133,7 @@ func TestMethod(t *testing.T) { w.WriteHeader(http.StatusOK) }) - serializer, err := carbon2.NewSerializer() + serializer, err := carbon2.NewSerializer(carbon2.Carbon2FormatFieldSeparate) require.NoError(t, err) plugin := tt.plugin() @@ -210,7 +210,7 @@ func TestStatusCode(t *testing.T) { w.WriteHeader(tt.statusCode) }) - serializer, err := carbon2.NewSerializer() + serializer, err := carbon2.NewSerializer(carbon2.Carbon2FormatFieldSeparate) require.NoError(t, err) tt.plugin.SetSerializer(serializer) @@ -234,7 +234,7 @@ func TestContentType(t *testing.T) { u, err := url.Parse(fmt.Sprintf("http://%s", ts.Listener.Addr().String())) require.NoError(t, err) - carbon2Serializer, err := carbon2.NewSerializer() + carbon2Serializer, err := carbon2.NewSerializer(carbon2.Carbon2FormatFieldSeparate) require.NoError(t, err) tests := []struct { @@ -336,7 +336,7 @@ func TestContentEncodingGzip(t *testing.T) { w.WriteHeader(http.StatusNoContent) }) - serializer, err := carbon2.NewSerializer() + serializer, err := carbon2.NewSerializer(carbon2.Carbon2FormatFieldSeparate) require.NoError(t, err) plugin := tt.plugin() @@ -372,7 +372,7 @@ func TestDefaultUserAgent(t *testing.T) { MaxRequstBodySize: Default().MaxRequstBodySize, } - serializer, err := carbon2.NewSerializer() + serializer, err := carbon2.NewSerializer(carbon2.Carbon2FormatFieldSeparate) require.NoError(t, err) plugin.SetSerializer(serializer) @@ -555,7 +555,7 @@ func TestMaxRequestBodySize(t *testing.T) { w.WriteHeader(http.StatusOK) }) - serializer, err := carbon2.NewSerializer() + serializer, err := carbon2.NewSerializer(carbon2.Carbon2FormatFieldSeparate) require.NoError(t, err) plugin := tt.plugin()