Skip to content

Commit

Permalink
Drop sampling rate when aggregation is enabled
Browse files Browse the repository at this point in the history
Mixing sampling rate and aggregation doesn't make sens and could be
erroneous if users sends the same metrics with different sampling rate.
  • Loading branch information
hush-hush committed Oct 23, 2020
1 parent 65f6630 commit 01fdab5
Show file tree
Hide file tree
Showing 5 changed files with 43 additions and 62 deletions.
12 changes: 6 additions & 6 deletions statsd/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ func getContext(name string, tags []string) string {
return name + ":" + strings.Join(tags, ",")
}

func (a *aggregator) count(name string, value int64, tags []string, rate float64) error {
func (a *aggregator) count(name string, value int64, tags []string) error {
context := getContext(name, tags)
a.countsM.RLock()
if count, found := a.counts[context]; found {
Expand All @@ -147,12 +147,12 @@ func (a *aggregator) count(name string, value int64, tags []string, rate float64
a.countsM.RUnlock()

a.countsM.Lock()
a.counts[context] = newCountMetric(name, value, tags, rate)
a.counts[context] = newCountMetric(name, value, tags)
a.countsM.Unlock()
return nil
}

func (a *aggregator) gauge(name string, value float64, tags []string, rate float64) error {
func (a *aggregator) gauge(name string, value float64, tags []string) error {
context := getContext(name, tags)
a.gaugesM.RLock()
if gauge, found := a.gauges[context]; found {
Expand All @@ -162,15 +162,15 @@ func (a *aggregator) gauge(name string, value float64, tags []string, rate float
}
a.gaugesM.RUnlock()

gauge := newGaugeMetric(name, value, tags, rate)
gauge := newGaugeMetric(name, value, tags)

a.gaugesM.Lock()
a.gauges[context] = gauge
a.gaugesM.Unlock()
return nil
}

func (a *aggregator) set(name string, value string, tags []string, rate float64) error {
func (a *aggregator) set(name string, value string, tags []string) error {
context := getContext(name, tags)
a.setsM.RLock()
if set, found := a.sets[context]; found {
Expand All @@ -181,7 +181,7 @@ func (a *aggregator) set(name string, value string, tags []string, rate float64)
a.setsM.RUnlock()

a.setsM.Lock()
a.sets[context] = newSetMetric(name, value, tags, rate)
a.sets[context] = newSetMetric(name, value, tags)
a.setsM.Unlock()
return nil
}
38 changes: 19 additions & 19 deletions statsd/aggregator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,27 +14,27 @@ func TestAggregatorSample(t *testing.T) {

tags := []string{"tag1", "tag2"}

a.gauge("gaugeTest", 21, tags, 1)
a.gauge("gaugeTest", 21, tags)
assert.Len(t, a.gauges, 1)
assert.Contains(t, a.gauges, "gaugeTest:tag1,tag2")

a.count("countTest", 21, tags, 1)
a.count("countTest", 21, tags)
assert.Len(t, a.counts, 1)
assert.Contains(t, a.counts, "countTest:tag1,tag2")

a.set("setTest", "value1", tags, 1)
a.set("setTest", "value1", tags)
assert.Len(t, a.sets, 1)
assert.Contains(t, a.sets, "setTest:tag1,tag2")

a.gauge("gaugeTest", 123, tags, 1)
a.gauge("gaugeTest", 123, tags)
assert.Len(t, a.gauges, 1)
assert.Contains(t, a.gauges, "gaugeTest:tag1,tag2")

a.count("countTest", 10, tags, 1)
a.count("countTest", 10, tags)
assert.Len(t, a.counts, 1)
assert.Contains(t, a.counts, "countTest:tag1,tag2")

a.set("setTest", "value1", tags, 1)
a.set("setTest", "value1", tags)
assert.Len(t, a.sets, 1)
assert.Contains(t, a.sets, "setTest:tag1,tag2")
}
Expand All @@ -44,18 +44,18 @@ func TestAggregatorFlush(t *testing.T) {

tags := []string{"tag1", "tag2"}

a.gauge("gaugeTest1", 21, tags, 1)
a.gauge("gaugeTest1", 10, tags, 1)
a.gauge("gaugeTest2", 15, tags, 1)
a.gauge("gaugeTest1", 21, tags)
a.gauge("gaugeTest1", 10, tags)
a.gauge("gaugeTest2", 15, tags)

a.count("countTest1", 21, tags, 1)
a.count("countTest1", 10, tags, 1)
a.count("countTest2", 1, tags, 1)
a.count("countTest1", 21, tags)
a.count("countTest1", 10, tags)
a.count("countTest2", 1, tags)

a.set("setTest1", "value1", tags, 1)
a.set("setTest1", "value1", tags, 1)
a.set("setTest1", "value2", tags, 1)
a.set("setTest2", "value1", tags, 1)
a.set("setTest1", "value1", tags)
a.set("setTest1", "value1", tags)
a.set("setTest1", "value2", tags)
a.set("setTest2", "value1", tags)

metrics := a.flushMetrics()

Expand Down Expand Up @@ -143,9 +143,9 @@ func TestAggregatorFlushConcurrency(t *testing.T) {
go func() {
defer wg.Done()

a.gauge("gaugeTest1", 21, tags, 1)
a.count("countTest1", 21, tags, 1)
a.set("setTest1", "value1", tags, 1)
a.gauge("gaugeTest1", 21, tags)
a.count("countTest1", 21, tags)
a.set("setTest1", "value1", tags)
}()
}

Expand Down
18 changes: 6 additions & 12 deletions statsd/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,13 @@ type countMetric struct {
value int64
name string
tags []string
rate float64
}

func newCountMetric(name string, value int64, tags []string, rate float64) *countMetric {
func newCountMetric(name string, value int64, tags []string) *countMetric {
return &countMetric{
value: value,
name: name,
tags: tags,
rate: rate,
}
}

Expand All @@ -38,7 +36,7 @@ func (c *countMetric) flushUnsafe() metric {
metricType: count,
name: c.name,
tags: c.tags,
rate: c.rate,
rate: 1,
ivalue: c.value,
}
}
Expand All @@ -49,15 +47,13 @@ type gaugeMetric struct {
value uint64
name string
tags []string
rate float64
}

func newGaugeMetric(name string, value float64, tags []string, rate float64) *gaugeMetric {
func newGaugeMetric(name string, value float64, tags []string) *gaugeMetric {
return &gaugeMetric{
value: math.Float64bits(value),
name: name,
tags: tags,
rate: rate,
}
}

Expand All @@ -70,7 +66,7 @@ func (g *gaugeMetric) flushUnsafe() metric {
metricType: gauge,
name: g.name,
tags: g.tags,
rate: g.rate,
rate: 1,
fvalue: math.Float64frombits(g.value),
}
}
Expand All @@ -81,16 +77,14 @@ type setMetric struct {
data map[string]struct{}
name string
tags []string
rate float64
sync.Mutex
}

func newSetMetric(name string, value string, tags []string, rate float64) *setMetric {
func newSetMetric(name string, value string, tags []string) *setMetric {
set := &setMetric{
data: map[string]struct{}{},
name: name,
tags: tags,
rate: rate,
}
set.data[value] = struct{}{}
return set
Expand All @@ -116,7 +110,7 @@ func (s *setMetric) flushUnsafe() []metric {
metricType: set,
name: s.name,
tags: s.tags,
rate: s.rate,
rate: 1,
svalue: value,
}
i++
Expand Down
31 changes: 9 additions & 22 deletions statsd/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,94 +11,84 @@ import (
)

func TestNewCountMetric(t *testing.T) {
c := newCountMetric("test", 21, []string{"tag1", "tag2"}, 1)
c := newCountMetric("test", 21, []string{"tag1", "tag2"})
assert.Equal(t, c.value, int64(21))
assert.Equal(t, c.name, "test")
assert.Equal(t, c.tags, []string{"tag1", "tag2"})
assert.Equal(t, c.rate, 1.0)
}

func TestCountMetricSample(t *testing.T) {
c := newCountMetric("test", 21, []string{"tag1", "tag2"}, 1)
c := newCountMetric("test", 21, []string{"tag1", "tag2"})
c.sample(12)
assert.Equal(t, c.value, int64(33))
assert.Equal(t, c.name, "test")
assert.Equal(t, c.tags, []string{"tag1", "tag2"})
assert.Equal(t, c.rate, 1.0)
}

func TestFlushUnsafeCountMetricSample(t *testing.T) {
c := newCountMetric("test", 21, []string{"tag1", "tag2"}, 1)
c := newCountMetric("test", 21, []string{"tag1", "tag2"})
m := c.flushUnsafe()
assert.Equal(t, m.metricType, count)
assert.Equal(t, m.ivalue, int64(21))
assert.Equal(t, m.name, "test")
assert.Equal(t, m.tags, []string{"tag1", "tag2"})
assert.Equal(t, m.rate, 1.0)

c.sample(12)
m = c.flushUnsafe()
assert.Equal(t, m.metricType, count)
assert.Equal(t, m.ivalue, int64(33))
assert.Equal(t, m.name, "test")
assert.Equal(t, m.tags, []string{"tag1", "tag2"})
assert.Equal(t, m.rate, 1.0)
}

func TestNewGaugeMetric(t *testing.T) {
g := newGaugeMetric("test", 21, []string{"tag1", "tag2"}, 1)
g := newGaugeMetric("test", 21, []string{"tag1", "tag2"})
assert.Equal(t, math.Float64frombits(g.value), float64(21))
assert.Equal(t, g.name, "test")
assert.Equal(t, g.tags, []string{"tag1", "tag2"})
assert.Equal(t, g.rate, 1.0)
}

func TestGaugeMetricSample(t *testing.T) {
g := newGaugeMetric("test", 21, []string{"tag1", "tag2"}, 1)
g := newGaugeMetric("test", 21, []string{"tag1", "tag2"})
g.sample(12)
assert.Equal(t, math.Float64frombits(g.value), float64(12))
assert.Equal(t, g.name, "test")
assert.Equal(t, g.tags, []string{"tag1", "tag2"})
assert.Equal(t, g.rate, 1.0)
}

func TestFlushUnsafeGaugeMetricSample(t *testing.T) {
g := newGaugeMetric("test", 21, []string{"tag1", "tag2"}, 1)
g := newGaugeMetric("test", 21, []string{"tag1", "tag2"})
m := g.flushUnsafe()
assert.Equal(t, m.metricType, gauge)
assert.Equal(t, m.fvalue, float64(21))
assert.Equal(t, m.name, "test")
assert.Equal(t, m.tags, []string{"tag1", "tag2"})
assert.Equal(t, m.rate, 1.0)

g.sample(12)
m = g.flushUnsafe()
assert.Equal(t, m.metricType, gauge)
assert.Equal(t, m.fvalue, float64(12))
assert.Equal(t, m.name, "test")
assert.Equal(t, m.tags, []string{"tag1", "tag2"})
assert.Equal(t, m.rate, 1.0)
}

func TestNewSetMetric(t *testing.T) {
s := newSetMetric("test", "value1", []string{"tag1", "tag2"}, 1)
s := newSetMetric("test", "value1", []string{"tag1", "tag2"})
assert.Equal(t, s.data, map[string]struct{}{"value1": struct{}{}})
assert.Equal(t, s.name, "test")
assert.Equal(t, s.tags, []string{"tag1", "tag2"})
assert.Equal(t, s.rate, 1.0)
}

func TestSetMetricSample(t *testing.T) {
s := newSetMetric("test", "value1", []string{"tag1", "tag2"}, 1)
s := newSetMetric("test", "value1", []string{"tag1", "tag2"})
s.sample("value2")
assert.Equal(t, s.data, map[string]struct{}{"value1": struct{}{}, "value2": struct{}{}})
assert.Equal(t, s.name, "test")
assert.Equal(t, s.tags, []string{"tag1", "tag2"})
assert.Equal(t, s.rate, 1.0)
}

func TestFlushUnsafeSetMetricSample(t *testing.T) {
s := newSetMetric("test", "value1", []string{"tag1", "tag2"}, 1)
s := newSetMetric("test", "value1", []string{"tag1", "tag2"})
m := s.flushUnsafe()

require.Len(t, m, 1)
Expand All @@ -107,7 +97,6 @@ func TestFlushUnsafeSetMetricSample(t *testing.T) {
assert.Equal(t, m[0].svalue, "value1")
assert.Equal(t, m[0].name, "test")
assert.Equal(t, m[0].tags, []string{"tag1", "tag2"})
assert.Equal(t, m[0].rate, 1.0)

s.sample("value1")
s.sample("value2")
Expand All @@ -122,10 +111,8 @@ func TestFlushUnsafeSetMetricSample(t *testing.T) {
assert.Equal(t, m[0].svalue, "value1")
assert.Equal(t, m[0].name, "test")
assert.Equal(t, m[0].tags, []string{"tag1", "tag2"})
assert.Equal(t, m[0].rate, 1.0)
assert.Equal(t, m[1].metricType, set)
assert.Equal(t, m[1].svalue, "value2")
assert.Equal(t, m[1].name, "test")
assert.Equal(t, m[1].tags, []string{"tag1", "tag2"})
assert.Equal(t, m[1].rate, 1.0)
}
6 changes: 3 additions & 3 deletions statsd/statsd.go
Original file line number Diff line number Diff line change
Expand Up @@ -450,7 +450,7 @@ func (c *Client) Gauge(name string, value float64, tags []string, rate float64)
}
atomic.AddUint64(&c.metrics.TotalMetricsGauge, 1)
if c.agg != nil {
return c.agg.gauge(name, value, tags, rate)
return c.agg.gauge(name, value, tags)
}
return c.send(metric{metricType: gauge, name: name, fvalue: value, tags: tags, rate: rate})
}
Expand All @@ -462,7 +462,7 @@ func (c *Client) Count(name string, value int64, tags []string, rate float64) er
}
atomic.AddUint64(&c.metrics.TotalMetricsCount, 1)
if c.agg != nil {
return c.agg.count(name, value, tags, rate)
return c.agg.count(name, value, tags)
}
return c.send(metric{metricType: count, name: name, ivalue: value, tags: tags, rate: rate})
}
Expand Down Expand Up @@ -502,7 +502,7 @@ func (c *Client) Set(name string, value string, tags []string, rate float64) err
}
atomic.AddUint64(&c.metrics.TotalMetricsSet, 1)
if c.agg != nil {
return c.agg.set(name, value, tags, rate)
return c.agg.set(name, value, tags)
}
return c.send(metric{metricType: set, name: name, svalue: value, tags: tags, rate: rate})
}
Expand Down

0 comments on commit 01fdab5

Please sign in to comment.