diff --git a/accumulator.go b/accumulator.go
index 6c1962ac4cece..5052eddb0d2d0 100644
--- a/accumulator.go
+++ b/accumulator.go
@@ -13,7 +13,7 @@ import (
 // BatchPoints is used to send a batch of data in a single write from telegraf
 // to influx
 type BatchPoints struct {
-    mu sync.Mutex
+    sync.Mutex

     client.BatchPoints

@@ -30,8 +30,8 @@ func (bp *BatchPoints) Add(
     val interface{},
     tags map[string]string,
 ) {
-    bp.mu.Lock()
-    defer bp.mu.Unlock()
+    bp.Lock()
+    defer bp.Unlock()

     measurement = bp.Prefix + measurement

@@ -72,8 +72,8 @@ func (bp *BatchPoints) AddFieldsWithTime(
     // TODO this function should add the fields with the timestamp, but that will
     // need to wait for the InfluxDB point precision/unit to be fixed
     bp.AddFields(measurement, fields, tags)
-    // bp.mu.Lock()
-    // defer bp.mu.Unlock()
+    // bp.Lock()
+    // defer bp.Unlock()

     // measurement = bp.Prefix + measurement

@@ -117,8 +117,8 @@ func (bp *BatchPoints) AddFields(
     fields map[string]interface{},
     tags map[string]string,
 ) {
-    bp.mu.Lock()
-    defer bp.mu.Unlock()
+    bp.Lock()
+    defer bp.Unlock()

     measurement = bp.Prefix + measurement

diff --git a/plugins/statsd/statsd.go b/plugins/statsd/statsd.go
index 53db0f74e7f3e..686f5299ecb6e 100644
--- a/plugins/statsd/statsd.go
+++ b/plugins/statsd/statsd.go
@@ -34,9 +34,9 @@ type Statsd struct {
     done chan struct{}

     // Cache gauges, counters & sets so they can be aggregated as they arrive
-    gauges   map[string]cachedmetric
-    counters map[string]cachedmetric
-    sets     map[string]cachedmetric
+    gauges   map[string]cachedgauge
+    counters map[string]cachedcounter
+    sets     map[string]cachedset

     Mappings []struct {
         Match string
@@ -52,9 +52,9 @@ func NewStatsd() *Statsd {
     s.done = make(chan struct{})
     s.in = make(chan string, s.AllowedPendingMessages)
     s.inmetrics = make(chan metric, s.AllowedPendingMessages)
-    s.gauges = make(map[string]cachedmetric)
-    s.counters = make(map[string]cachedmetric)
-    s.sets = make(map[string]cachedmetric)
+    s.gauges = make(map[string]cachedgauge)
+    s.counters = make(map[string]cachedcounter)
+    s.sets = make(map[string]cachedset)

     return &s
 }
@@ -63,19 +63,31 @@ func NewStatsd() *Statsd {
 type metric struct {
     name       string
     bucket     string
-    value      int64
+    value      float64
     mtype      string
     additive   bool
     samplerate float64
     tags       map[string]string
 }

-// cachedmetric is a subset of metric used specifically for storing cached
-// gauges and counters, ready for sending to InfluxDB.
-type cachedmetric struct {
+type cachedset struct {
+    set  map[int64]bool
+    tags map[string]string
+}
+
+type cachedgauge struct {
+    value float64
+    tags  map[string]string
+}
+
+type cachedcounter struct {
     value int64
     tags  map[string]string
-    set   map[int64]bool
+}
+
+type cachedtiming struct {
+    timings []float64
+    tags    map[string]string
 }

 func (_ *Statsd) Description() string {
@@ -105,7 +117,6 @@ func (s *Statsd) Gather(acc plugins.Accumulator) error {
     s.Lock()
     defer s.Unlock()

-    values := make(map[string]int64)
     items := len(s.inmetrics)
     for i := 0; i < items; i++ {
@@ -123,26 +134,23 @@ func (s *Statsd) Gather(acc plugins.Accumulator) error {
         acc.Add(name, cmetric.value, cmetric.tags)
     }
     if s.DeleteGauges {
-        s.gauges = make(map[string]cachedmetric)
+        s.gauges = make(map[string]cachedgauge)
     }

     for name, cmetric := range s.counters {
         acc.Add(name, cmetric.value, cmetric.tags)
     }
     if s.DeleteCounters {
-        s.counters = make(map[string]cachedmetric)
+        s.counters = make(map[string]cachedcounter)
     }

     for name, cmetric := range s.sets {
-        acc.Add(name, cmetric.value, cmetric.tags)
+        acc.Add(name, int64(len(cmetric.set)), cmetric.tags)
     }
     if s.DeleteSets {
-        s.sets = make(map[string]cachedmetric)
+        s.sets = make(map[string]cachedset)
     }

-    for name, value := range values {
-        acc.Add(name, value, nil)
-    }
     return nil
 }
@@ -153,9 +161,9 @@ func (s *Statsd) Start() error {
     s.done = make(chan struct{})
     s.in = make(chan string, s.AllowedPendingMessages)
     s.inmetrics = make(chan metric, s.AllowedPendingMessages)
-    s.gauges = make(map[string]cachedmetric)
-    s.counters = make(map[string]cachedmetric)
-    s.sets = make(map[string]cachedmetric)
+    s.gauges = make(map[string]cachedgauge)
+    s.counters = make(map[string]cachedcounter)
+    s.sets = make(map[string]cachedset)

     // Start the UDP listener
     go s.udpListen()
@@ -267,14 +275,14 @@ func (s *Statsd) parseStatsdLine(line string) error {
         }
         m.additive = true
     }
-    v, err := strconv.ParseInt(parts2[1], 10, 64)
+    v, err := strconv.ParseFloat(parts2[1], 64)
     if err != nil {
-        log.Printf("Error: parsing value to int64: %s\n", line)
+        log.Printf("Error: parsing value to float64: %s\n", line)
         return errors.New("Error Parsing statsd line")
     }
     // If a sample rate is given with a counter, divide value by the rate
     if m.samplerate != 0 && m.mtype == "c" {
-        v = int64(float64(v) / m.samplerate)
+        v = v / m.samplerate
     }
     m.value = v
@@ -301,7 +309,7 @@ func (s *Statsd) parseStatsdLine(line string) error {
 // map of tags.
 // Return values are (<name>, <tags>)
 func (s *Statsd) parseName(m metric) (string, map[string]string) {
-    var tags map[string]string
+    tags := make(map[string]string)

     name := strings.Replace(m.bucket, ".", "_", -1)
     name = strings.Replace(name, "-", "__", -1)
@@ -325,13 +333,13 @@ func (s *Statsd) parseName(m metric) (string, map[string]string) {

     switch m.mtype {
     case "c":
-        name = name + "_counter"
+        tags["metric_type"] = "counter"
     case "g":
-        name = name + "_gauge"
+        tags["metric_type"] = "gauge"
     case "s":
-        name = name + "_set"
+        tags["metric_type"] = "set"
     case "ms", "h":
-        name = name + "_timer"
+        tags["metric_type"] = "timer"
     }

     return name, tags
@@ -361,21 +369,22 @@ func bucketglob(pattern, bucket string) bool {
 func (s *Statsd) aggregate(m metric) {
     switch m.mtype {
     case "c":
+        v := int64(m.value)
         cached, ok := s.counters[m.name]
         if !ok {
-            s.counters[m.name] = cachedmetric{
-                value: m.value,
+            s.counters[m.name] = cachedcounter{
+                value: v,
                 tags:  m.tags,
             }
         } else {
-            cached.value += m.value
+            cached.value += v
             cached.tags = m.tags
             s.counters[m.name] = cached
         }
     case "g":
         cached, ok := s.gauges[m.name]
         if !ok {
-            s.gauges[m.name] = cachedmetric{
+            s.gauges[m.name] = cachedgauge{
                 value: m.value,
                 tags:  m.tags,
             }
@@ -389,22 +398,17 @@ func (s *Statsd) aggregate(m metric) {
             s.gauges[m.name] = cached
         }
     case "s":
+        v := int64(m.value)
         cached, ok := s.sets[m.name]
         if !ok {
             // Completely new metric (initialize with count of 1)
-            s.sets[m.name] = cachedmetric{
-                value: 1,
-                tags:  m.tags,
-                set:   map[int64]bool{m.value: true},
+            s.sets[m.name] = cachedset{
+                tags: m.tags,
+                set:  map[int64]bool{v: true},
             }
         } else {
-            _, ok := s.sets[m.name].set[m.value]
-            if !ok {
-                // Metric exists, but value has not been counted
-                cached.value += 1
-                cached.set[m.value] = true
-                s.sets[m.name] = cached
-            }
+            cached.set[v] = true
+            s.sets[m.name] = cached
         }
     }
 }
diff --git a/plugins/statsd/statsd_test.go b/plugins/statsd/statsd_test.go
index ee48d07e0afda..4237e2381e774 100644
--- a/plugins/statsd/statsd_test.go
+++ b/plugins/statsd/statsd_test.go
@@ -23,7 +23,6 @@ func TestParse_InvalidLines(t *testing.T) {
         "invalid.value:foobar|c",
         "invalid.value:d11|c",
         "invalid.value:1d1|c",
-        "invalid.value:1.1|c",
     }
     for _, line := range invalid_lines {
         err := s.parseStatsdLine(line)
@@ -50,39 +49,39 @@ func TestParse_InvalidSampleRate(t *testing.T) {
         }
     }

-    validations := []struct {
+    counter_validations := []struct {
         name  string
         value int64
-        cache map[string]cachedmetric
+        cache map[string]cachedcounter
     }{
         {
-            "invalid_sample_rate_counter",
+            "invalid_sample_rate",
             45,
             s.counters,
         },
         {
-            "invalid_sample_rate_2_counter",
+            "invalid_sample_rate_2",
             45,
             s.counters,
         },
-        {
-            "invalid_sample_rate_gauge",
-            45,
-            s.gauges,
-        },
-        {
-            "invalid_sample_rate_set",
-            1,
-            s.sets,
-        },
     }

-    for _, test := range validations {
-        err := test_validate_value(test.name, test.value, test.cache)
+    for _, test := range counter_validations {
+        err := test_validate_counter(test.name, test.value, test.cache)
         if err != nil {
             t.Error(err.Error())
         }
     }
+
+    err := test_validate_gauge("invalid_sample_rate", 45, s.gauges)
+    if err != nil {
+        t.Error(err.Error())
+    }
+
+    err = test_validate_set("invalid_sample_rate", 1, s.sets)
+    if err != nil {
+        t.Error(err.Error())
+    }
 }

 // Names should be parsed like . -> _ and - -> __
@@ -105,17 +104,17 @@ func TestParse_DefaultNameParsing(t *testing.T) {
         value int64
     }{
         {
-            "valid_counter",
+            "valid",
             1,
         },
         {
-            "valid_foo__bar_counter",
+            "valid_foo__bar",
             11,
         },
     }

     for _, test := range validations {
-        err := test_validate_value(test.name, test.value, s.counters)
+        err := test_validate_counter(test.name, test.value, s.counters)
         if err != nil {
             t.Error(err.Error())
         }
@@ -154,34 +153,12 @@ func TestParse_ValidLines(t *testing.T) {
             t.Errorf("Parsing line %s should not have resulted in an error\n", line)
         }
     }
+}

-    validations := []struct {
-        name  string
-        value int64
-        cache map[string]cachedmetric
-    }{
-        {
-            "valid_counter",
-            45,
-            s.counters,
-        },
-        {
-            "valid_set",
-            1,
-            s.sets,
-        },
-        {
-            "valid_gauge",
-            45,
-            s.gauges,
-        },
-    }
-
-    for _, test := range validations {
-        err := test_validate_value(test.name, test.value, test.cache)
-        if err != nil {
-            t.Error(err.Error())
-        }
+// Test that floats are handled as expected for all metric types
+func TestParse_Floats(t *testing.T) {
+    if false {
+        t.Errorf("TODO")
+    }
 }
@@ -215,36 +192,36 @@ func TestParse_Gauges(t *testing.T) {

     validations := []struct {
         name  string
-        value int64
+        value float64
     }{
         {
-            "plus_minus_gauge",
+            "plus_minus",
             120,
         },
         {
-            "plus_plus_gauge",
+            "plus_plus",
             300,
         },
         {
-            "minus_minus_gauge",
+            "minus_minus",
             -100,
         },
         {
-            "lone_plus_gauge",
+            "lone_plus",
             100,
         },
         {
-            "lone_minus_gauge",
+            "lone_minus",
             -100,
         },
         {
-            "overwrite_gauge",
+            "overwrite",
             300,
         },
     }

     for _, test := range validations {
-        err := test_validate_value(test.name, test.value, s.gauges)
+        err := test_validate_gauge(test.name, test.value, s.gauges)
         if err != nil {
             t.Error(err.Error())
         }
@@ -282,17 +259,17 @@ func TestParse_Sets(t *testing.T) {
         value int64
     }{
         {
-            "unique_user_ids_set",
+            "unique_user_ids",
             4,
         },
         {
-            "oneuser_id_set",
+            "oneuser_id",
             1,
         },
     }

     for _, test := range validations {
-        err := test_validate_value(test.name, test.value, s.sets)
+        err := test_validate_set(test.name, test.value, s.sets)
         if err != nil {
             t.Error(err.Error())
         }
@@ -328,25 +305,25 @@ func TestParse_Counters(t *testing.T) {
         value int64
     }{
         {
-            "small_inc_counter",
+            "small_inc",
             2,
         },
         {
-            "big_inc_counter",
+            "big_inc",
             1100101,
         },
         {
-            "zero_init_counter",
+            "zero_init",
             0,
         },
         {
-            "sample_rate_counter",
+            "sample_rate",
             11,
         },
     }

     for _, test := range validations {
-        err := test_validate_value(test.name, test.value, s.counters)
+        err := test_validate_counter(test.name, test.value, s.counters)
         if err != nil {
             t.Error(err.Error())
         }
@@ -373,14 +350,14 @@ func TestParse_Gauges_Delete(t *testing.T) {
         t.Errorf("Parsing line %s should not have resulted in an error\n", line)
     }

-    err = test_validate_value("current_users_gauge", 100, s.gauges)
+    err = test_validate_gauge("current_users", 100, s.gauges)
     if err != nil {
         t.Error(err.Error())
     }

     s.Gather(fakeacc)

-    err = test_validate_value("current_users_gauge", 100, s.gauges)
+    err = test_validate_gauge("current_users", 100, s.gauges)
     if err == nil {
         t.Error("current_users_gauge metric should have been deleted")
     }
@@ -399,14 +376,14 @@ func TestParse_Sets_Delete(t *testing.T) {
         t.Errorf("Parsing line %s should not have resulted in an error\n", line)
     }

-    err = test_validate_value("unique_user_ids_set", 1, s.sets)
+    err = test_validate_set("unique_user_ids", 1, s.sets)
     if err != nil {
         t.Error(err.Error())
     }

     s.Gather(fakeacc)

-    err = test_validate_value("unique_user_ids_set", 1, s.sets)
+    err = test_validate_set("unique_user_ids", 1, s.sets)
     if err == nil {
         t.Error("unique_user_ids_set metric should have been deleted")
been deleted") } @@ -425,14 +402,14 @@ func TestParse_Counters_Delete(t *testing.T) { t.Errorf("Parsing line %s should not have resulted in an error\n", line) } - err = test_validate_value("total_users_counter", 100, s.counters) + err = test_validate_counter("total_users", 100, s.counters) if err != nil { t.Error(err.Error()) } s.Gather(fakeacc) - err = test_validate_value("total_users_counter", 100, s.counters) + err = test_validate_counter("total_users", 100, s.counters) if err == nil { t.Error("total_users_counter metric should have been deleted") } @@ -447,14 +424,52 @@ func TestListen(t *testing.T) { // Test utility functions -func test_validate_value(name string, value int64, cache map[string]cachedmetric) error { +func test_validate_set( + name string, + value int64, + cache map[string]cachedset, +) error { + metric, ok := cache[name] + if !ok { + return errors.New(fmt.Sprintf("Test Error: Metric name %s not found\n", name)) + } + + if value != int64(len(metric.set)) { + return errors.New(fmt.Sprintf("Measurement: %s, expected %d, actual %d\n", + name, value, len(metric.set))) + } + return nil +} + +func test_validate_counter( + name string, + value int64, + cache map[string]cachedcounter, +) error { + metric, ok := cache[name] + if !ok { + return errors.New(fmt.Sprintf("Test Error: Metric name %s not found\n", name)) + } + + if value != metric.value { + return errors.New(fmt.Sprintf("Measurement: %s, expected %d, actual %d\n", + name, value, metric.value)) + } + return nil +} + +func test_validate_gauge( + name string, + value float64, + cache map[string]cachedgauge, +) error { metric, ok := cache[name] if !ok { return errors.New(fmt.Sprintf("Test Error: Metric name %s not found\n", name)) } if value != metric.value { - return errors.New(fmt.Sprintf("Measurement: %s, expected %d, actual %d", + return errors.New(fmt.Sprintf("Measurement: %s, expected %f, actual %f\n", name, value, metric.value)) } return nil