Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add carbon2 serializer #5345

Merged
merged 8 commits into from
Jan 26, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,7 @@ For documentation on the latest development code see the [documentation index][d
- [Graphite](/plugins/serializers/graphite)
- [ServiceNow](/plugins/serializers/nowmetric)
- [SplunkMetric](/plugins/serializers/splunkmetric)
- [Carbon2](/plugins/serializers/carbon2)

## Processor Plugins

Expand Down
1 change: 1 addition & 0 deletions docs/DATA_FORMATS_OUTPUT.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ plugins.
1. [JSON](/plugins/serializers/json)
1. [Graphite](/plugins/serializers/graphite)
1. [SplunkMetric](/plugins/serializers/splunkmetric)
1. [Carbon2](/plugins/serializers/carbon2)

You will be able to identify the plugins with support by the presence of a
`data_format` config option, for example, in the `file` output plugin:
Expand Down
49 changes: 49 additions & 0 deletions plugins/serializers/carbon2/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
# Carbon2

The `carbon2` serializer translates the Telegraf metric format to the [Carbon2 format](http://metrics20.org/implementations/).

### Configuration

```toml
[[outputs.file]]
## Files to write to, "stdout" is a specially handled file.
files = ["stdout", "/tmp/metrics.out"]

## Data format to output.
## Each data format has its own unique set of configuration options, read
## more about them here:
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md
data_format = "carbon2"
```

Standard form:
```
metric=name field=field_1 host=foo 30 1234567890
metric=name field=field_2 host=foo 4 1234567890
metric=name field=field_N host=foo 59 1234567890
```

### Metrics

The serializer converts the metrics by creating `intrinsic_tags` using the combination of metric name and fields. So, if one Telegraf metric has 4 fields, the `carbon2` output will be 4 separate metrics. There will be a `metric` tag that represents the name of the metric and a `field` tag to represent the field.

### Example

If we take the following InfluxDB Line Protocol:

```
weather,location=us-midwest,season=summer temperature=82,wind=100 1234567890
```

after serializing in Carbon2, the result would be:

```
metric=weather field=temperature location=us-midwest season=summer 82 1234567890
metric=weather field=wind location=us-midwest season=summer 100 1234567890
```

### Fields and Tags with spaces
When a field key or tag key/value have spaces, spaces will be replaced with `_`.

### Tags with empty values
When a tag's value is empty, it will be replaced with `null`
67 changes: 67 additions & 0 deletions plugins/serializers/carbon2/carbon2.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package carbon2

import (
"bytes"
"fmt"
"github.com/influxdata/telegraf"
"strconv"
"strings"
)

type serializer struct {
}

func NewSerializer() (*serializer, error) {
s := &serializer{}
return s, nil
}

func (s *serializer) Serialize(metric telegraf.Metric) ([]byte, error) {
return s.createObject(metric), nil
}

func (s *serializer) SerializeBatch(metrics []telegraf.Metric) ([]byte, error) {
var batch bytes.Buffer
for _, metric := range metrics {
batch.Write(s.createObject(metric))
}
return batch.Bytes(), nil
}

func (s *serializer) createObject(metric telegraf.Metric) []byte {
var m bytes.Buffer
for fieldName, fieldValue := range metric.Fields() {
if isNumeric(fieldValue) {
m.WriteString("metric=")
m.WriteString(strings.Replace(metric.Name(), " ", "_", -1))
m.WriteString(" field=")
m.WriteString(strings.Replace(fieldName, " ", "_", -1))
m.WriteString(" ")
for _, tag := range metric.TagList() {
m.WriteString(strings.Replace(tag.Key, " ", "_", -1))
m.WriteString("=")
value := tag.Value
if len(value) == 0 {
value = "null"
}
m.WriteString(strings.Replace(value, " ", "_", -1))
m.WriteString(" ")
}
m.WriteString(" ")
m.WriteString(fmt.Sprintf("%v", fieldValue))
m.WriteString(" ")
m.WriteString(strconv.FormatInt(metric.Time().Unix(), 10))
m.WriteString("\n")
}
}
return m.Bytes()
}

func isNumeric(v interface{}) bool {
switch v.(type) {
case string:
return false
default:
return true
}
}
138 changes: 138 additions & 0 deletions plugins/serializers/carbon2/carbon2_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
package carbon2

import (
"fmt"
"github.com/stretchr/testify/require"
"testing"
"time"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/metric"
"github.com/stretchr/testify/assert"
)

func MustMetric(v telegraf.Metric, err error) telegraf.Metric {
if err != nil {
panic(err)
}
return v
}

func TestSerializeMetricFloat(t *testing.T) {
now := time.Now()
tags := map[string]string{
"cpu": "cpu0",
}
fields := map[string]interface{}{
"usage_idle": float64(91.5),
}
m, err := metric.New("cpu", tags, fields, now)
assert.NoError(t, err)

s, _ := NewSerializer()
var buf []byte
buf, err = s.Serialize(m)
assert.NoError(t, err)
expS := []byte(fmt.Sprintf(`metric=cpu field=usage_idle cpu=cpu0 91.5 %d`, now.Unix()) + "\n")
assert.Equal(t, string(expS), string(buf))
}

func TestSerializeMetricWithEmptyStringTag(t *testing.T) {
now := time.Now()
tags := map[string]string{
"cpu": "",
}
fields := map[string]interface{}{
"usage_idle": float64(91.5),
}
m, err := metric.New("cpu", tags, fields, now)
assert.NoError(t, err)

s, _ := NewSerializer()
var buf []byte
buf, err = s.Serialize(m)
assert.NoError(t, err)
expS := []byte(fmt.Sprintf(`metric=cpu field=usage_idle cpu=null 91.5 %d`, now.Unix()) + "\n")
assert.Equal(t, string(expS), string(buf))
}

func TestSerializeWithSpaces(t *testing.T) {
now := time.Now()
tags := map[string]string{
"cpu 0": "cpu 0",
}
fields := map[string]interface{}{
"usage_idle 1": float64(91.5),
}
m, err := metric.New("cpu metric", tags, fields, now)
assert.NoError(t, err)

s, _ := NewSerializer()
var buf []byte
buf, err = s.Serialize(m)
assert.NoError(t, err)
expS := []byte(fmt.Sprintf(`metric=cpu_metric field=usage_idle_1 cpu_0=cpu_0 91.5 %d`, now.Unix()) + "\n")
assert.Equal(t, string(expS), string(buf))
}

func TestSerializeMetricInt(t *testing.T) {
now := time.Now()
tags := map[string]string{
"cpu": "cpu0",
}
fields := map[string]interface{}{
"usage_idle": int64(90),
}
m, err := metric.New("cpu", tags, fields, now)
assert.NoError(t, err)

s, _ := NewSerializer()
var buf []byte
buf, err = s.Serialize(m)
assert.NoError(t, err)

expS := []byte(fmt.Sprintf(`metric=cpu field=usage_idle cpu=cpu0 90 %d`, now.Unix()) + "\n")
assert.Equal(t, string(expS), string(buf))
}

func TestSerializeMetricString(t *testing.T) {
now := time.Now()
tags := map[string]string{
"cpu": "cpu0",
}
fields := map[string]interface{}{
"usage_idle": "foobar",
}
m, err := metric.New("cpu", tags, fields, now)
assert.NoError(t, err)

s, _ := NewSerializer()
var buf []byte
buf, err = s.Serialize(m)
assert.NoError(t, err)

expS := []byte("")
assert.Equal(t, string(expS), string(buf))
}

func TestSerializeBatch(t *testing.T) {
m := MustMetric(
metric.New(
"cpu",
map[string]string{},
map[string]interface{}{
"value": 42,
},
time.Unix(0, 0),
),
)

metrics := []telegraf.Metric{m, m}
s, _ := NewSerializer()
buf, err := s.SerializeBatch(metrics)
require.NoError(t, err)
expS := []byte(`metric=cpu field=value 42 0
metric=cpu field=value 42 0
`)
assert.Equal(t, string(expS), string(buf))
}
7 changes: 7 additions & 0 deletions plugins/serializers/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (

"github.com/influxdata/telegraf"

"github.com/influxdata/telegraf/plugins/serializers/carbon2"
"github.com/influxdata/telegraf/plugins/serializers/graphite"
"github.com/influxdata/telegraf/plugins/serializers/influx"
"github.com/influxdata/telegraf/plugins/serializers/json"
Expand Down Expand Up @@ -82,6 +83,8 @@ func NewSerializer(config *Config) (Serializer, error) {
serializer, err = NewSplunkmetricSerializer(config.HecRouting)
case "nowmetric":
serializer, err = NewNowSerializer()
case "carbon2":
serializer, err = NewCarbon2Serializer()
default:
err = fmt.Errorf("Invalid data format: %s", config.DataFormat)
}
Expand All @@ -92,6 +95,10 @@ func NewJsonSerializer(timestampUnits time.Duration) (Serializer, error) {
return json.NewSerializer(timestampUnits)
}

func NewCarbon2Serializer() (Serializer, error) {
return carbon2.NewSerializer()
}

func NewSplunkmetricSerializer(splunkmetric_hec_routing bool) (Serializer, error) {
return splunkmetric.NewSerializer(splunkmetric_hec_routing)
}
Expand Down