From cf500d1edf58a3ee1ad8b27a878e324bde314866 Mon Sep 17 00:00:00 2001 From: Frank Reno Date: Fri, 25 Jan 2019 19:06:08 -0700 Subject: [PATCH] Add carbon2 serializer (#5345) --- README.md | 1 + docs/DATA_FORMATS_OUTPUT.md | 1 + plugins/serializers/carbon2/README.md | 49 +++++++ plugins/serializers/carbon2/carbon2.go | 67 ++++++++++ plugins/serializers/carbon2/carbon2_test.go | 138 ++++++++++++++++++++ plugins/serializers/registry.go | 7 + 6 files changed, 263 insertions(+) create mode 100644 plugins/serializers/carbon2/README.md create mode 100644 plugins/serializers/carbon2/carbon2.go create mode 100644 plugins/serializers/carbon2/carbon2_test.go diff --git a/README.md b/README.md index 74dbc8ae508ea..cf994dc2079bf 100644 --- a/README.md +++ b/README.md @@ -303,6 +303,7 @@ For documentation on the latest development code see the [documentation index][d - [Graphite](/plugins/serializers/graphite) - [ServiceNow](/plugins/serializers/nowmetric) - [SplunkMetric](/plugins/serializers/splunkmetric) +- [Carbon2](/plugins/serializers/carbon2) ## Processor Plugins diff --git a/docs/DATA_FORMATS_OUTPUT.md b/docs/DATA_FORMATS_OUTPUT.md index c06ab47196062..3ee16524d2905 100644 --- a/docs/DATA_FORMATS_OUTPUT.md +++ b/docs/DATA_FORMATS_OUTPUT.md @@ -8,6 +8,7 @@ plugins. 1. [JSON](/plugins/serializers/json) 1. [Graphite](/plugins/serializers/graphite) 1. [SplunkMetric](/plugins/serializers/splunkmetric) +1. [Carbon2](/plugins/serializers/carbon2) You will be able to identify the plugins with support by the presence of a `data_format` config option, for example, in the `file` output plugin: diff --git a/plugins/serializers/carbon2/README.md b/plugins/serializers/carbon2/README.md new file mode 100644 index 0000000000000..e88b18cf079d6 --- /dev/null +++ b/plugins/serializers/carbon2/README.md @@ -0,0 +1,49 @@ +# Carbon2 + +The `carbon2` serializer translates the Telegraf metric format to the [Carbon2 format](http://metrics20.org/implementations/). + +### Configuration + +```toml +[[outputs.file]] + ## Files to write to, "stdout" is a specially handled file. + files = ["stdout", "/tmp/metrics.out"] + + ## Data format to output. + ## Each data format has its own unique set of configuration options, read + ## more about them here: + ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md + data_format = "carbon2" +``` + +Standard form: +``` +metric=name field=field_1 host=foo 30 1234567890 +metric=name field=field_2 host=foo 4 1234567890 +metric=name field=field_N host=foo 59 1234567890 +``` + +### Metrics + +The serializer converts the metrics by creating `intrinsic_tags` using the combination of metric name and fields. So, if one Telegraf metric has 4 fields, the `carbon2` output will be 4 separate metrics. There will be a `metric` tag that represents the name of the metric and a `field` tag to represent the field. + +### Example + +If we take the following InfluxDB Line Protocol: + +``` +weather,location=us-midwest,season=summer temperature=82,wind=100 1234567890 +``` + +after serializing in Carbon2, the result would be: + +``` +metric=weather field=temperature location=us-midwest season=summer 82 1234567890 +metric=weather field=wind location=us-midwest season=summer 100 1234567890 +``` + +### Fields and Tags with spaces +When a field key or tag key/value have spaces, spaces will be replaced with `_`. + +### Tags with empty values +When a tag's value is empty, it will be replaced with `null` diff --git a/plugins/serializers/carbon2/carbon2.go b/plugins/serializers/carbon2/carbon2.go new file mode 100644 index 0000000000000..fc11de0624760 --- /dev/null +++ b/plugins/serializers/carbon2/carbon2.go @@ -0,0 +1,67 @@ +package carbon2 + +import ( + "bytes" + "fmt" + "github.com/influxdata/telegraf" + "strconv" + "strings" +) + +type serializer struct { +} + +func NewSerializer() (*serializer, error) { + s := &serializer{} + return s, nil +} + +func (s *serializer) Serialize(metric telegraf.Metric) ([]byte, error) { + return s.createObject(metric), nil +} + +func (s *serializer) SerializeBatch(metrics []telegraf.Metric) ([]byte, error) { + var batch bytes.Buffer + for _, metric := range metrics { + batch.Write(s.createObject(metric)) + } + return batch.Bytes(), nil +} + +func (s *serializer) createObject(metric telegraf.Metric) []byte { + var m bytes.Buffer + for fieldName, fieldValue := range metric.Fields() { + if isNumeric(fieldValue) { + m.WriteString("metric=") + m.WriteString(strings.Replace(metric.Name(), " ", "_", -1)) + m.WriteString(" field=") + m.WriteString(strings.Replace(fieldName, " ", "_", -1)) + m.WriteString(" ") + for _, tag := range metric.TagList() { + m.WriteString(strings.Replace(tag.Key, " ", "_", -1)) + m.WriteString("=") + value := tag.Value + if len(value) == 0 { + value = "null" + } + m.WriteString(strings.Replace(value, " ", "_", -1)) + m.WriteString(" ") + } + m.WriteString(" ") + m.WriteString(fmt.Sprintf("%v", fieldValue)) + m.WriteString(" ") + m.WriteString(strconv.FormatInt(metric.Time().Unix(), 10)) + m.WriteString("\n") + } + } + return m.Bytes() +} + +func isNumeric(v interface{}) bool { + switch v.(type) { + case string: + return false + default: + return true + } +} diff --git a/plugins/serializers/carbon2/carbon2_test.go b/plugins/serializers/carbon2/carbon2_test.go new file mode 100644 index 0000000000000..f335342d54b18 --- /dev/null +++ b/plugins/serializers/carbon2/carbon2_test.go @@ -0,0 +1,138 @@ +package carbon2 + +import ( + "fmt" + "github.com/stretchr/testify/require" + "testing" + "time" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/metric" + "github.com/stretchr/testify/assert" +) + +func MustMetric(v telegraf.Metric, err error) telegraf.Metric { + if err != nil { + panic(err) + } + return v +} + +func TestSerializeMetricFloat(t *testing.T) { + now := time.Now() + tags := map[string]string{ + "cpu": "cpu0", + } + fields := map[string]interface{}{ + "usage_idle": float64(91.5), + } + m, err := metric.New("cpu", tags, fields, now) + assert.NoError(t, err) + + s, _ := NewSerializer() + var buf []byte + buf, err = s.Serialize(m) + assert.NoError(t, err) + expS := []byte(fmt.Sprintf(`metric=cpu field=usage_idle cpu=cpu0 91.5 %d`, now.Unix()) + "\n") + assert.Equal(t, string(expS), string(buf)) +} + +func TestSerializeMetricWithEmptyStringTag(t *testing.T) { + now := time.Now() + tags := map[string]string{ + "cpu": "", + } + fields := map[string]interface{}{ + "usage_idle": float64(91.5), + } + m, err := metric.New("cpu", tags, fields, now) + assert.NoError(t, err) + + s, _ := NewSerializer() + var buf []byte + buf, err = s.Serialize(m) + assert.NoError(t, err) + expS := []byte(fmt.Sprintf(`metric=cpu field=usage_idle cpu=null 91.5 %d`, now.Unix()) + "\n") + assert.Equal(t, string(expS), string(buf)) +} + +func TestSerializeWithSpaces(t *testing.T) { + now := time.Now() + tags := map[string]string{ + "cpu 0": "cpu 0", + } + fields := map[string]interface{}{ + "usage_idle 1": float64(91.5), + } + m, err := metric.New("cpu metric", tags, fields, now) + assert.NoError(t, err) + + s, _ := NewSerializer() + var buf []byte + buf, err = s.Serialize(m) + assert.NoError(t, err) + expS := []byte(fmt.Sprintf(`metric=cpu_metric field=usage_idle_1 cpu_0=cpu_0 91.5 %d`, now.Unix()) + "\n") + assert.Equal(t, string(expS), string(buf)) +} + +func TestSerializeMetricInt(t *testing.T) { + now := time.Now() + tags := map[string]string{ + "cpu": "cpu0", + } + fields := map[string]interface{}{ + "usage_idle": int64(90), + } + m, err := metric.New("cpu", tags, fields, now) + assert.NoError(t, err) + + s, _ := NewSerializer() + var buf []byte + buf, err = s.Serialize(m) + assert.NoError(t, err) + + expS := []byte(fmt.Sprintf(`metric=cpu field=usage_idle cpu=cpu0 90 %d`, now.Unix()) + "\n") + assert.Equal(t, string(expS), string(buf)) +} + +func TestSerializeMetricString(t *testing.T) { + now := time.Now() + tags := map[string]string{ + "cpu": "cpu0", + } + fields := map[string]interface{}{ + "usage_idle": "foobar", + } + m, err := metric.New("cpu", tags, fields, now) + assert.NoError(t, err) + + s, _ := NewSerializer() + var buf []byte + buf, err = s.Serialize(m) + assert.NoError(t, err) + + expS := []byte("") + assert.Equal(t, string(expS), string(buf)) +} + +func TestSerializeBatch(t *testing.T) { + m := MustMetric( + metric.New( + "cpu", + map[string]string{}, + map[string]interface{}{ + "value": 42, + }, + time.Unix(0, 0), + ), + ) + + metrics := []telegraf.Metric{m, m} + s, _ := NewSerializer() + buf, err := s.SerializeBatch(metrics) + require.NoError(t, err) + expS := []byte(`metric=cpu field=value 42 0 +metric=cpu field=value 42 0 +`) + assert.Equal(t, string(expS), string(buf)) +} diff --git a/plugins/serializers/registry.go b/plugins/serializers/registry.go index 9ca2f42e7c67b..cbc5981a689d8 100644 --- a/plugins/serializers/registry.go +++ b/plugins/serializers/registry.go @@ -6,6 +6,7 @@ import ( "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/plugins/serializers/carbon2" "github.com/influxdata/telegraf/plugins/serializers/graphite" "github.com/influxdata/telegraf/plugins/serializers/influx" "github.com/influxdata/telegraf/plugins/serializers/json" @@ -82,6 +83,8 @@ func NewSerializer(config *Config) (Serializer, error) { serializer, err = NewSplunkmetricSerializer(config.HecRouting) case "nowmetric": serializer, err = NewNowSerializer() + case "carbon2": + serializer, err = NewCarbon2Serializer() default: err = fmt.Errorf("Invalid data format: %s", config.DataFormat) } @@ -92,6 +95,10 @@ func NewJsonSerializer(timestampUnits time.Duration) (Serializer, error) { return json.NewSerializer(timestampUnits) } +func NewCarbon2Serializer() (Serializer, error) { + return carbon2.NewSerializer() +} + func NewSplunkmetricSerializer(splunkmetric_hec_routing bool) (Serializer, error) { return splunkmetric.NewSerializer(splunkmetric_hec_routing) }