Skip to content

Commit

Permalink
Carbon2 configuration option - include field in metric name (influxda…
Browse files Browse the repository at this point in the history
  • Loading branch information
pmalek-sumo authored and pmalek committed Sep 28, 2020
1 parent f64a4c6 commit c9a6de7
Show file tree
Hide file tree
Showing 6 changed files with 290 additions and 79 deletions.
9 changes: 9 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -1914,6 +1914,14 @@ func buildSerializer(name string, tbl *ast.Table) (serializers.Serializer, error
}
}

if node, ok := tbl.Fields["carbon2_format"]; ok {
if kv, ok := node.(*ast.KeyValue); ok {
if str, ok := kv.Value.(*ast.String); ok {
c.Carbon2Format = str.Value
}
}
}

if node, ok := tbl.Fields["influx_max_line_bytes"]; ok {
if kv, ok := node.(*ast.KeyValue); ok {
if integer, ok := kv.Value.(*ast.Integer); ok {
Expand Down Expand Up @@ -2070,6 +2078,7 @@ func buildSerializer(name string, tbl *ast.Table) (serializers.Serializer, error
}
}

delete(tbl.Fields, "carbon2_format")
delete(tbl.Fields, "influx_max_line_bytes")
delete(tbl.Fields, "influx_sort_fields")
delete(tbl.Fields, "influx_uint_support")
Expand Down
12 changes: 6 additions & 6 deletions plugins/outputs/sumologic/sumologic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ func TestMethod(t *testing.T) {
w.WriteHeader(http.StatusOK)
})

serializer, err := carbon2.NewSerializer()
serializer, err := carbon2.NewSerializer(carbon2.Carbon2FormatFieldSeparate)
require.NoError(t, err)

plugin := tt.plugin()
Expand Down Expand Up @@ -210,7 +210,7 @@ func TestStatusCode(t *testing.T) {
w.WriteHeader(tt.statusCode)
})

serializer, err := carbon2.NewSerializer()
serializer, err := carbon2.NewSerializer(carbon2.Carbon2FormatFieldSeparate)
require.NoError(t, err)

tt.plugin.SetSerializer(serializer)
Expand All @@ -234,7 +234,7 @@ func TestContentType(t *testing.T) {
u, err := url.Parse(fmt.Sprintf("http://%s", ts.Listener.Addr().String()))
require.NoError(t, err)

carbon2Serializer, err := carbon2.NewSerializer()
carbon2Serializer, err := carbon2.NewSerializer(carbon2.Carbon2FormatFieldSeparate)
require.NoError(t, err)

tests := []struct {
Expand Down Expand Up @@ -336,7 +336,7 @@ func TestContentEncodingGzip(t *testing.T) {
w.WriteHeader(http.StatusNoContent)
})

serializer, err := carbon2.NewSerializer()
serializer, err := carbon2.NewSerializer(carbon2.Carbon2FormatFieldSeparate)
require.NoError(t, err)

plugin := tt.plugin()
Expand Down Expand Up @@ -372,7 +372,7 @@ func TestDefaultUserAgent(t *testing.T) {
MaxRequstBodySize: Default().MaxRequstBodySize,
}

serializer, err := carbon2.NewSerializer()
serializer, err := carbon2.NewSerializer(carbon2.Carbon2FormatFieldSeparate)
require.NoError(t, err)

plugin.SetSerializer(serializer)
Expand Down Expand Up @@ -555,7 +555,7 @@ func TestMaxRequestBodySize(t *testing.T) {
w.WriteHeader(http.StatusOK)
})

serializer, err := carbon2.NewSerializer()
serializer, err := carbon2.NewSerializer(carbon2.Carbon2FormatFieldSeparate)
require.NoError(t, err)

plugin := tt.plugin()
Expand Down
45 changes: 39 additions & 6 deletions plugins/serializers/carbon2/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

The `carbon2` serializer translates the Telegraf metric format to the [Carbon2 format](http://metrics20.org/implementations/).

### Configuration
## Configuration

```toml
[[outputs.file]]
Expand All @@ -14,20 +14,51 @@ The `carbon2` serializer translates the Telegraf metric format to the [Carbon2 f
## more about them here:
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md
data_format = "carbon2"

## Optionally configure metrics format, whether to merge metric name and field name.
## Possible options:
## * "field_separate"
## * "metric_includes_field"
## * "" - defaults to "field_separate"
# carbon2_format = "field_separate"
```

Standard form:

```
metric=name field=field_1 host=foo 30 1234567890
metric=name field=field_2 host=foo 4 1234567890
metric=name field=field_N host=foo 59 1234567890
```

### Metrics
### Metrics format

`Carbon2` serializer has a configuration option - `carbon2_format` - to change how
metrics names are being constructed.

The serializer converts the metrics by creating `intrinsic_tags` using the combination of metric name and fields. So, if one Telegraf metric has 4 fields, the `carbon2` output will be 4 separate metrics. There will be a `metric` tag that represents the name of the metric and a `field` tag to represent the field.
By default `metric` will only inclue the metric name and a separate field `field`
will contain the field name.
This is the behavior of `carbon2_format = "field_separate"` which is the default
behavior (even if unspecified).

### Example
Optionally user can opt in to change this to make the metric inclue the field name
after the `_`.
This is the behavior of `carbon2_format = "metric_includes_field"` which would
make the above example look like:

```
metric=name_field_1 host=foo 30 1234567890
metric=name_field_2 host=foo 4 1234567890
metric=name_field_N host=foo 59 1234567890
```

## Metrics

The serializer converts the metrics by creating `intrinsic_tags` using the combination of metric name and fields.
So, if one Telegraf metric has 4 fields, the `carbon2` output will be 4 separate metrics.
There will be a `metric` tag that represents the name of the metric and a `field` tag to represent the field.

## Example

If we take the following InfluxDB Line Protocol:

Expand All @@ -42,8 +73,10 @@ metric=weather field=temperature location=us-midwest season=summer 82 123456789
metric=weather field=wind location=us-midwest season=summer 100 1234567890
```

### Fields and Tags with spaces
## Fields and Tags with spaces

When a field key or tag key/value have spaces, spaces will be replaced with `_`.

### Tags with empty values
## Tags with empty values

When a tag's value is empty, it will be replaced with `null`
93 changes: 71 additions & 22 deletions plugins/serializers/carbon2/carbon2.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,39 @@ import (
"github.com/influxdata/telegraf"
)

type format string

const (
Carbon2FormatFieldSeparate string = "field_separate"
Carbon2FormatMetricIncludesField string = "metric_includes_field"

formatFieldSeparate = format(Carbon2FormatFieldSeparate)
formatMetricIncludesField = format(Carbon2FormatMetricIncludesField)
)

var formats = map[string]format{
// Field separate is the default when no format specified.
"": formatFieldSeparate,
Carbon2FormatFieldSeparate: formatFieldSeparate,
Carbon2FormatMetricIncludesField: formatMetricIncludesField,
}

type Serializer struct {
metricsFormat format
}

func NewSerializer() (*Serializer, error) {
s := &Serializer{}
return s, nil
func NewSerializer(f string) (*Serializer, error) {
var (
ok bool
metricsFormat format
)
if metricsFormat, ok = formats[f]; !ok {
return nil, fmt.Errorf("unknown carbon2 format: %s", f)
}

return &Serializer{
metricsFormat: metricsFormat,
}, nil
}

func (s *Serializer) Serialize(metric telegraf.Metric) ([]byte, error) {
Expand All @@ -32,32 +59,54 @@ func (s *Serializer) SerializeBatch(metrics []telegraf.Metric) ([]byte, error) {
func (s *Serializer) createObject(metric telegraf.Metric) []byte {
var m bytes.Buffer
for fieldName, fieldValue := range metric.Fields() {
if isNumeric(fieldValue) {
m.WriteString("metric=")
m.WriteString(strings.Replace(metric.Name(), " ", "_", -1))
m.WriteString(" field=")
m.WriteString(strings.Replace(fieldName, " ", "_", -1))
m.WriteString(" ")
for _, tag := range metric.TagList() {
m.WriteString(strings.Replace(tag.Key, " ", "_", -1))
m.WriteString("=")
value := tag.Value
if len(value) == 0 {
value = "null"
}
m.WriteString(strings.Replace(value, " ", "_", -1))
m.WriteString(" ")
if !isNumeric(fieldValue) {
continue
}

switch s.metricsFormat {
case formatFieldSeparate:
m.WriteString(serializeMetricFieldSeparate(
metric.Name(), fieldName,
))
case formatMetricIncludesField:
m.WriteString(serializeMetricIncludeField(
metric.Name(), fieldName,
))
}

for _, tag := range metric.TagList() {
m.WriteString(strings.Replace(tag.Key, " ", "_", -1))
m.WriteString("=")
value := tag.Value
if len(value) == 0 {
value = "null"
}
m.WriteString(strings.Replace(value, " ", "_", -1))
m.WriteString(" ")
m.WriteString(fmt.Sprintf("%v", fieldValue))
m.WriteString(" ")
m.WriteString(strconv.FormatInt(metric.Time().Unix(), 10))
m.WriteString("\n")
}
m.WriteString(" ")
m.WriteString(fmt.Sprintf("%v", fieldValue))
m.WriteString(" ")
m.WriteString(strconv.FormatInt(metric.Time().Unix(), 10))
m.WriteString("\n")
}
return m.Bytes()
}

func serializeMetricFieldSeparate(name, fieldName string) string {
return fmt.Sprintf("metric=%s field=%s ",
strings.Replace(name, " ", "_", -1),
strings.Replace(fieldName, " ", "_", -1),
)
}

func serializeMetricIncludeField(name, fieldName string) string {
return fmt.Sprintf("metric=%s_%s ",
strings.Replace(name, " ", "_", -1),
strings.Replace(fieldName, " ", "_", -1),
)
}

func isNumeric(v interface{}) bool {
switch v.(type) {
case string:
Expand Down
Loading

0 comments on commit c9a6de7

Please sign in to comment.