diff --git a/plugins/inputs/statsd/statsd.go b/plugins/inputs/statsd/statsd.go index a54ec86ec7388..fcdcbbc9c84cf 100644 --- a/plugins/inputs/statsd/statsd.go +++ b/plugins/inputs/statsd/statsd.go @@ -9,6 +9,7 @@ import ( "strconv" "strings" "sync" + "time" "github.com/influxdata/influxdb/services/graphite" @@ -51,6 +52,8 @@ type Statsd struct { done chan struct{} // Cache gauges, counters & sets so they can be aggregated as they arrive + // gauges and counters map measurement/tags hash -> field name -> metrics + // sets and timings map measurement/tags hash -> metrics gauges map[string]cachedgauge counters map[string]cachedcounter sets map[string]cachedset @@ -80,6 +83,7 @@ func NewStatsd() *Statsd { // One statsd metric, form is :||@ type metric struct { name string + field string bucket string hash string intvalue int64 @@ -91,21 +95,21 @@ type metric struct { } type cachedset struct { - name string - set map[int64]bool - tags map[string]string + name string + fields map[string]map[int64]bool + tags map[string]string } type cachedgauge struct { - name string - value float64 - tags map[string]string + name string + fields map[string]interface{} + tags map[string]string } type cachedcounter struct { - name string - value int64 - tags map[string]string + name string + fields map[string]interface{} + tags map[string]string } type cachedtimings struct { @@ -160,6 +164,7 @@ func (_ *Statsd) SampleConfig() string { func (s *Statsd) Gather(acc telegraf.Accumulator) error { s.Lock() defer s.Unlock() + now := time.Now() for _, metric := range s.timings { fields := make(map[string]interface{}) @@ -172,28 +177,32 @@ func (s *Statsd) Gather(acc telegraf.Accumulator) error { name := fmt.Sprintf("%v_percentile", percentile) fields[name] = metric.stats.Percentile(percentile) } - acc.AddFields(metric.name, fields, metric.tags) + acc.AddFields(metric.name, fields, metric.tags, now) } if s.DeleteTimings { s.timings = make(map[string]cachedtimings) } for _, metric := range s.gauges { - acc.Add(metric.name, metric.value, metric.tags) + acc.AddFields(metric.name, metric.fields, metric.tags, now) } if s.DeleteGauges { s.gauges = make(map[string]cachedgauge) } for _, metric := range s.counters { - acc.Add(metric.name, metric.value, metric.tags) + acc.AddFields(metric.name, metric.fields, metric.tags, now) } if s.DeleteCounters { s.counters = make(map[string]cachedcounter) } for _, metric := range s.sets { - acc.Add(metric.name, int64(len(metric.set)), metric.tags) + fields := make(map[string]interface{}) + for field, set := range metric.fields { + fields[field] = int64(len(set)) + } + acc.AddFields(metric.name, fields, metric.tags, now) } if s.DeleteSets { s.sets = make(map[string]cachedset) @@ -358,7 +367,7 @@ func (s *Statsd) parseStatsdLine(line string) error { } // Parse the name & tags from bucket - m.name, m.tags = s.parseName(m.bucket) + m.name, m.field, m.tags = s.parseName(m.bucket) switch m.mtype { case "c": m.tags["metric_type"] = "counter" @@ -389,8 +398,8 @@ func (s *Statsd) parseStatsdLine(line string) error { // parseName parses the given bucket name with the list of bucket maps in the // config file. If there is a match, it will parse the name of the metric and // map of tags. -// Return values are (, ) -func (s *Statsd) parseName(bucket string) (string, map[string]string) { +// Return values are (, , ) +func (s *Statsd) parseName(bucket string) (string, string, map[string]string) { tags := make(map[string]string) bucketparts := strings.Split(bucket, ",") @@ -410,17 +419,21 @@ func (s *Statsd) parseName(bucket string) (string, map[string]string) { DefaultTags: tags, } + var field string name := bucketparts[0] p, err := graphite.NewParserWithOptions(o) if err == nil { - name, tags, _, _ = p.ApplyTemplate(name) + name, tags, field, _ = p.ApplyTemplate(name) } if s.ConvertNames { name = strings.Replace(name, ".", "_", -1) name = strings.Replace(name, "-", "__", -1) } + if field == "" { + field = "value" + } - return name, tags + return name, field, tags } // Parse the key,value out of a string that looks like "key=value" @@ -466,46 +479,59 @@ func (s *Statsd) aggregate(m metric) { s.timings[m.hash] = cached } case "c": - cached, ok := s.counters[m.hash] + // check if the measurement exists + _, ok := s.counters[m.hash] if !ok { s.counters[m.hash] = cachedcounter{ - name: m.name, - value: m.intvalue, - tags: m.tags, + name: m.name, + fields: make(map[string]interface{}), + tags: m.tags, } - } else { - cached.value += m.intvalue - s.counters[m.hash] = cached } + // check if the field exists + _, ok = s.counters[m.hash].fields[m.field] + if !ok { + s.counters[m.hash].fields[m.field] = int64(0) + } + s.counters[m.hash].fields[m.field] = + s.counters[m.hash].fields[m.field].(int64) + m.intvalue case "g": - cached, ok := s.gauges[m.hash] + // check if the measurement exists + _, ok := s.gauges[m.hash] if !ok { s.gauges[m.hash] = cachedgauge{ - name: m.name, - value: m.floatvalue, - tags: m.tags, + name: m.name, + fields: make(map[string]interface{}), + tags: m.tags, } + } + // check if the field exists + _, ok = s.gauges[m.hash].fields[m.field] + if !ok { + s.gauges[m.hash].fields[m.field] = float64(0) + } + if m.additive { + s.gauges[m.hash].fields[m.field] = + s.gauges[m.hash].fields[m.field].(float64) + m.floatvalue } else { - if m.additive { - cached.value = cached.value + m.floatvalue - } else { - cached.value = m.floatvalue - } - s.gauges[m.hash] = cached + s.gauges[m.hash].fields[m.field] = m.floatvalue } case "s": - cached, ok := s.sets[m.hash] + // check if the measurement exists + _, ok := s.sets[m.hash] if !ok { - // Completely new metric (initialize with count of 1) s.sets[m.hash] = cachedset{ - name: m.name, - tags: m.tags, - set: map[int64]bool{m.intvalue: true}, + name: m.name, + fields: make(map[string]map[int64]bool), + tags: m.tags, } - } else { - cached.set[m.intvalue] = true - s.sets[m.hash] = cached } + // check if the field exists + _, ok = s.sets[m.hash].fields[m.field] + if !ok { + s.sets[m.hash].fields[m.field] = make(map[int64]bool) + } + s.sets[m.hash].fields[m.field][m.intvalue] = true } } diff --git a/plugins/inputs/statsd/statsd_test.go b/plugins/inputs/statsd/statsd_test.go index 3a5917ab6b2e6..a285467b0ced7 100644 --- a/plugins/inputs/statsd/statsd_test.go +++ b/plugins/inputs/statsd/statsd_test.go @@ -243,6 +243,113 @@ func TestParse_TemplateSpecificity(t *testing.T) { } } +// Test that most specific template is chosen +func TestParse_TemplateFields(t *testing.T) { + s := NewStatsd() + s.Templates = []string{ + "* measurement.measurement.field", + } + + lines := []string{ + "my.counter.f1:1|c", + "my.counter.f1:1|c", + "my.counter.f2:1|c", + "my.counter.f3:10|c", + "my.counter.f3:100|c", + "my.gauge.f1:10.1|g", + "my.gauge.f2:10.1|g", + "my.gauge.f1:0.9|g", + "my.set.f1:1|s", + "my.set.f1:2|s", + "my.set.f1:1|s", + "my.set.f2:100|s", + } + + for _, line := range lines { + err := s.parseStatsdLine(line) + if err != nil { + t.Errorf("Parsing line %s should not have resulted in an error\n", line) + } + } + + counter_tests := []struct { + name string + value int64 + field string + }{ + { + "my_counter", + 2, + "f1", + }, + { + "my_counter", + 1, + "f2", + }, + { + "my_counter", + 110, + "f3", + }, + } + // Validate counters + for _, test := range counter_tests { + err := test_validate_counter(test.name, test.value, s.counters, test.field) + if err != nil { + t.Error(err.Error()) + } + } + + gauge_tests := []struct { + name string + value float64 + field string + }{ + { + "my_gauge", + 0.9, + "f1", + }, + { + "my_gauge", + 10.1, + "f2", + }, + } + // Validate gauges + for _, test := range gauge_tests { + err := test_validate_gauge(test.name, test.value, s.gauges, test.field) + if err != nil { + t.Error(err.Error()) + } + } + + set_tests := []struct { + name string + value int64 + field string + }{ + { + "my_set", + 2, + "f1", + }, + { + "my_set", + 1, + "f2", + }, + } + // Validate sets + for _, test := range set_tests { + err := test_validate_set(test.name, test.value, s.sets, test.field) + if err != nil { + t.Error(err.Error()) + } + } +} + // Test that fields are parsed correctly func TestParse_Fields(t *testing.T) { if false { @@ -286,7 +393,7 @@ func TestParse_Tags(t *testing.T) { } for _, test := range tests { - name, tags := s.parseName(test.bucket) + name, _, tags := s.parseName(test.bucket) if name != test.name { t.Errorf("Expected: %s, got %s", test.name, name) } @@ -326,7 +433,7 @@ func TestParseName(t *testing.T) { } for _, test := range tests { - name, _ := s.parseName(test.in_name) + name, _, _ := s.parseName(test.in_name) if name != test.out_name { t.Errorf("Expected: %s, got %s", test.out_name, name) } @@ -354,7 +461,7 @@ func TestParseName(t *testing.T) { } for _, test := range tests { - name, _ := s.parseName(test.in_name) + name, _, _ := s.parseName(test.in_name) if name != test.out_name { t.Errorf("Expected: %s, got %s", test.out_name, name) } @@ -863,7 +970,14 @@ func test_validate_set( name string, value int64, cache map[string]cachedset, + field ...string, ) error { + var f string + if len(field) > 0 { + f = field[0] + } else { + f = "value" + } var metric cachedset var found bool for _, v := range cache { @@ -877,23 +991,30 @@ func test_validate_set( return errors.New(fmt.Sprintf("Test Error: Metric name %s not found\n", name)) } - if value != int64(len(metric.set)) { + if value != int64(len(metric.fields[f])) { return errors.New(fmt.Sprintf("Measurement: %s, expected %d, actual %d\n", - name, value, len(metric.set))) + name, value, len(metric.fields[f]))) } return nil } func test_validate_counter( name string, - value int64, + valueExpected int64, cache map[string]cachedcounter, + field ...string, ) error { - var metric cachedcounter + var f string + if len(field) > 0 { + f = field[0] + } else { + f = "value" + } + var valueActual int64 var found bool for _, v := range cache { if v.name == name { - metric = v + valueActual = v.fields[f].(int64) found = true break } @@ -902,23 +1023,30 @@ func test_validate_counter( return errors.New(fmt.Sprintf("Test Error: Metric name %s not found\n", name)) } - if value != metric.value { + if valueExpected != valueActual { return errors.New(fmt.Sprintf("Measurement: %s, expected %d, actual %d\n", - name, value, metric.value)) + name, valueExpected, valueActual)) } return nil } func test_validate_gauge( name string, - value float64, + valueExpected float64, cache map[string]cachedgauge, + field ...string, ) error { - var metric cachedgauge + var f string + if len(field) > 0 { + f = field[0] + } else { + f = "value" + } + var valueActual float64 var found bool for _, v := range cache { if v.name == name { - metric = v + valueActual = v.fields[f].(float64) found = true break } @@ -927,9 +1055,9 @@ func test_validate_gauge( return errors.New(fmt.Sprintf("Test Error: Metric name %s not found\n", name)) } - if value != metric.value { + if valueExpected != valueActual { return errors.New(fmt.Sprintf("Measurement: %s, expected %f, actual %f\n", - name, value, metric.value)) + name, valueExpected, valueActual)) } return nil }