Skip to content

Commit

Permalink
Add carbon2 serializer (influxdata#5345)
Browse files Browse the repository at this point in the history
  • Loading branch information
frankreno authored and otherpirate committed Mar 15, 2019
1 parent 1c69b7a commit cf500d1
Show file tree
Hide file tree
Showing 6 changed files with 263 additions and 0 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,7 @@ For documentation on the latest development code see the [documentation index][d
- [Graphite](/plugins/serializers/graphite)
- [ServiceNow](/plugins/serializers/nowmetric)
- [SplunkMetric](/plugins/serializers/splunkmetric)
- [Carbon2](/plugins/serializers/carbon2)

## Processor Plugins

Expand Down
1 change: 1 addition & 0 deletions docs/DATA_FORMATS_OUTPUT.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ plugins.
1. [JSON](/plugins/serializers/json)
1. [Graphite](/plugins/serializers/graphite)
1. [SplunkMetric](/plugins/serializers/splunkmetric)
1. [Carbon2](/plugins/serializers/carbon2)

You will be able to identify the plugins with support by the presence of a
`data_format` config option, for example, in the `file` output plugin:
Expand Down
49 changes: 49 additions & 0 deletions plugins/serializers/carbon2/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
# Carbon2

The `carbon2` serializer translates the Telegraf metric format to the [Carbon2 format](http://metrics20.org/implementations/).

### Configuration

```toml
[[outputs.file]]
## Files to write to, "stdout" is a specially handled file.
files = ["stdout", "/tmp/metrics.out"]

## Data format to output.
## Each data format has its own unique set of configuration options, read
## more about them here:
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md
data_format = "carbon2"
```

Standard form:
```
metric=name field=field_1 host=foo 30 1234567890
metric=name field=field_2 host=foo 4 1234567890
metric=name field=field_N host=foo 59 1234567890
```

### Metrics

The serializer converts the metrics by creating `intrinsic_tags` using the combination of metric name and fields. So, if one Telegraf metric has 4 fields, the `carbon2` output will be 4 separate metrics. There will be a `metric` tag that represents the name of the metric and a `field` tag to represent the field.

### Example

If we take the following InfluxDB Line Protocol:

```
weather,location=us-midwest,season=summer temperature=82,wind=100 1234567890
```

after serializing in Carbon2, the result would be:

```
metric=weather field=temperature location=us-midwest season=summer 82 1234567890
metric=weather field=wind location=us-midwest season=summer 100 1234567890
```

### Fields and Tags with spaces
When a field key or tag key/value have spaces, spaces will be replaced with `_`.

### Tags with empty values
When a tag's value is empty, it will be replaced with `null`
67 changes: 67 additions & 0 deletions plugins/serializers/carbon2/carbon2.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package carbon2

import (
"bytes"
"fmt"
"github.com/influxdata/telegraf"
"strconv"
"strings"
)

type serializer struct {
}

func NewSerializer() (*serializer, error) {
s := &serializer{}
return s, nil
}

func (s *serializer) Serialize(metric telegraf.Metric) ([]byte, error) {
return s.createObject(metric), nil
}

func (s *serializer) SerializeBatch(metrics []telegraf.Metric) ([]byte, error) {
var batch bytes.Buffer
for _, metric := range metrics {
batch.Write(s.createObject(metric))
}
return batch.Bytes(), nil
}

func (s *serializer) createObject(metric telegraf.Metric) []byte {
var m bytes.Buffer
for fieldName, fieldValue := range metric.Fields() {
if isNumeric(fieldValue) {
m.WriteString("metric=")
m.WriteString(strings.Replace(metric.Name(), " ", "_", -1))
m.WriteString(" field=")
m.WriteString(strings.Replace(fieldName, " ", "_", -1))
m.WriteString(" ")
for _, tag := range metric.TagList() {
m.WriteString(strings.Replace(tag.Key, " ", "_", -1))
m.WriteString("=")
value := tag.Value
if len(value) == 0 {
value = "null"
}
m.WriteString(strings.Replace(value, " ", "_", -1))
m.WriteString(" ")
}
m.WriteString(" ")
m.WriteString(fmt.Sprintf("%v", fieldValue))
m.WriteString(" ")
m.WriteString(strconv.FormatInt(metric.Time().Unix(), 10))
m.WriteString("\n")
}
}
return m.Bytes()
}

func isNumeric(v interface{}) bool {
switch v.(type) {
case string:
return false
default:
return true
}
}
138 changes: 138 additions & 0 deletions plugins/serializers/carbon2/carbon2_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
package carbon2

import (
"fmt"
"github.com/stretchr/testify/require"
"testing"
"time"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/metric"
"github.com/stretchr/testify/assert"
)

func MustMetric(v telegraf.Metric, err error) telegraf.Metric {
if err != nil {
panic(err)
}
return v
}

func TestSerializeMetricFloat(t *testing.T) {
now := time.Now()
tags := map[string]string{
"cpu": "cpu0",
}
fields := map[string]interface{}{
"usage_idle": float64(91.5),
}
m, err := metric.New("cpu", tags, fields, now)
assert.NoError(t, err)

s, _ := NewSerializer()
var buf []byte
buf, err = s.Serialize(m)
assert.NoError(t, err)
expS := []byte(fmt.Sprintf(`metric=cpu field=usage_idle cpu=cpu0 91.5 %d`, now.Unix()) + "\n")
assert.Equal(t, string(expS), string(buf))
}

func TestSerializeMetricWithEmptyStringTag(t *testing.T) {
now := time.Now()
tags := map[string]string{
"cpu": "",
}
fields := map[string]interface{}{
"usage_idle": float64(91.5),
}
m, err := metric.New("cpu", tags, fields, now)
assert.NoError(t, err)

s, _ := NewSerializer()
var buf []byte
buf, err = s.Serialize(m)
assert.NoError(t, err)
expS := []byte(fmt.Sprintf(`metric=cpu field=usage_idle cpu=null 91.5 %d`, now.Unix()) + "\n")
assert.Equal(t, string(expS), string(buf))
}

func TestSerializeWithSpaces(t *testing.T) {
now := time.Now()
tags := map[string]string{
"cpu 0": "cpu 0",
}
fields := map[string]interface{}{
"usage_idle 1": float64(91.5),
}
m, err := metric.New("cpu metric", tags, fields, now)
assert.NoError(t, err)

s, _ := NewSerializer()
var buf []byte
buf, err = s.Serialize(m)
assert.NoError(t, err)
expS := []byte(fmt.Sprintf(`metric=cpu_metric field=usage_idle_1 cpu_0=cpu_0 91.5 %d`, now.Unix()) + "\n")
assert.Equal(t, string(expS), string(buf))
}

func TestSerializeMetricInt(t *testing.T) {
now := time.Now()
tags := map[string]string{
"cpu": "cpu0",
}
fields := map[string]interface{}{
"usage_idle": int64(90),
}
m, err := metric.New("cpu", tags, fields, now)
assert.NoError(t, err)

s, _ := NewSerializer()
var buf []byte
buf, err = s.Serialize(m)
assert.NoError(t, err)

expS := []byte(fmt.Sprintf(`metric=cpu field=usage_idle cpu=cpu0 90 %d`, now.Unix()) + "\n")
assert.Equal(t, string(expS), string(buf))
}

func TestSerializeMetricString(t *testing.T) {
now := time.Now()
tags := map[string]string{
"cpu": "cpu0",
}
fields := map[string]interface{}{
"usage_idle": "foobar",
}
m, err := metric.New("cpu", tags, fields, now)
assert.NoError(t, err)

s, _ := NewSerializer()
var buf []byte
buf, err = s.Serialize(m)
assert.NoError(t, err)

expS := []byte("")
assert.Equal(t, string(expS), string(buf))
}

func TestSerializeBatch(t *testing.T) {
m := MustMetric(
metric.New(
"cpu",
map[string]string{},
map[string]interface{}{
"value": 42,
},
time.Unix(0, 0),
),
)

metrics := []telegraf.Metric{m, m}
s, _ := NewSerializer()
buf, err := s.SerializeBatch(metrics)
require.NoError(t, err)
expS := []byte(`metric=cpu field=value 42 0
metric=cpu field=value 42 0
`)
assert.Equal(t, string(expS), string(buf))
}
7 changes: 7 additions & 0 deletions plugins/serializers/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (

"github.com/influxdata/telegraf"

"github.com/influxdata/telegraf/plugins/serializers/carbon2"
"github.com/influxdata/telegraf/plugins/serializers/graphite"
"github.com/influxdata/telegraf/plugins/serializers/influx"
"github.com/influxdata/telegraf/plugins/serializers/json"
Expand Down Expand Up @@ -82,6 +83,8 @@ func NewSerializer(config *Config) (Serializer, error) {
serializer, err = NewSplunkmetricSerializer(config.HecRouting)
case "nowmetric":
serializer, err = NewNowSerializer()
case "carbon2":
serializer, err = NewCarbon2Serializer()
default:
err = fmt.Errorf("Invalid data format: %s", config.DataFormat)
}
Expand All @@ -92,6 +95,10 @@ func NewJsonSerializer(timestampUnits time.Duration) (Serializer, error) {
return json.NewSerializer(timestampUnits)
}

func NewCarbon2Serializer() (Serializer, error) {
return carbon2.NewSerializer()
}

func NewSplunkmetricSerializer(splunkmetric_hec_routing bool) (Serializer, error) {
return splunkmetric.NewSerializer(splunkmetric_hec_routing)
}
Expand Down

0 comments on commit cf500d1

Please sign in to comment.