Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Drop sampling rate when aggregation is enabled #170

Merged
merged 1 commit into from
Oct 23, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions statsd/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ func getContext(name string, tags []string) string {
return name + ":" + strings.Join(tags, ",")
}

func (a *aggregator) count(name string, value int64, tags []string, rate float64) error {
func (a *aggregator) count(name string, value int64, tags []string) error {
context := getContext(name, tags)
a.countsM.RLock()
if count, found := a.counts[context]; found {
Expand All @@ -147,12 +147,12 @@ func (a *aggregator) count(name string, value int64, tags []string, rate float64
a.countsM.RUnlock()

a.countsM.Lock()
a.counts[context] = newCountMetric(name, value, tags, rate)
a.counts[context] = newCountMetric(name, value, tags)
a.countsM.Unlock()
return nil
}

func (a *aggregator) gauge(name string, value float64, tags []string, rate float64) error {
func (a *aggregator) gauge(name string, value float64, tags []string) error {
context := getContext(name, tags)
a.gaugesM.RLock()
if gauge, found := a.gauges[context]; found {
Expand All @@ -162,15 +162,15 @@ func (a *aggregator) gauge(name string, value float64, tags []string, rate float
}
a.gaugesM.RUnlock()

gauge := newGaugeMetric(name, value, tags, rate)
gauge := newGaugeMetric(name, value, tags)

a.gaugesM.Lock()
a.gauges[context] = gauge
a.gaugesM.Unlock()
return nil
}

func (a *aggregator) set(name string, value string, tags []string, rate float64) error {
func (a *aggregator) set(name string, value string, tags []string) error {
context := getContext(name, tags)
a.setsM.RLock()
if set, found := a.sets[context]; found {
Expand All @@ -181,7 +181,7 @@ func (a *aggregator) set(name string, value string, tags []string, rate float64)
a.setsM.RUnlock()

a.setsM.Lock()
a.sets[context] = newSetMetric(name, value, tags, rate)
a.sets[context] = newSetMetric(name, value, tags)
a.setsM.Unlock()
return nil
}
38 changes: 19 additions & 19 deletions statsd/aggregator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,27 +14,27 @@ func TestAggregatorSample(t *testing.T) {

tags := []string{"tag1", "tag2"}

a.gauge("gaugeTest", 21, tags, 1)
a.gauge("gaugeTest", 21, tags)
assert.Len(t, a.gauges, 1)
assert.Contains(t, a.gauges, "gaugeTest:tag1,tag2")

a.count("countTest", 21, tags, 1)
a.count("countTest", 21, tags)
assert.Len(t, a.counts, 1)
assert.Contains(t, a.counts, "countTest:tag1,tag2")

a.set("setTest", "value1", tags, 1)
a.set("setTest", "value1", tags)
assert.Len(t, a.sets, 1)
assert.Contains(t, a.sets, "setTest:tag1,tag2")

a.gauge("gaugeTest", 123, tags, 1)
a.gauge("gaugeTest", 123, tags)
assert.Len(t, a.gauges, 1)
assert.Contains(t, a.gauges, "gaugeTest:tag1,tag2")

a.count("countTest", 10, tags, 1)
a.count("countTest", 10, tags)
assert.Len(t, a.counts, 1)
assert.Contains(t, a.counts, "countTest:tag1,tag2")

a.set("setTest", "value1", tags, 1)
a.set("setTest", "value1", tags)
assert.Len(t, a.sets, 1)
assert.Contains(t, a.sets, "setTest:tag1,tag2")
}
Expand All @@ -44,18 +44,18 @@ func TestAggregatorFlush(t *testing.T) {

tags := []string{"tag1", "tag2"}

a.gauge("gaugeTest1", 21, tags, 1)
a.gauge("gaugeTest1", 10, tags, 1)
a.gauge("gaugeTest2", 15, tags, 1)
a.gauge("gaugeTest1", 21, tags)
a.gauge("gaugeTest1", 10, tags)
a.gauge("gaugeTest2", 15, tags)

a.count("countTest1", 21, tags, 1)
a.count("countTest1", 10, tags, 1)
a.count("countTest2", 1, tags, 1)
a.count("countTest1", 21, tags)
a.count("countTest1", 10, tags)
a.count("countTest2", 1, tags)

a.set("setTest1", "value1", tags, 1)
a.set("setTest1", "value1", tags, 1)
a.set("setTest1", "value2", tags, 1)
a.set("setTest2", "value1", tags, 1)
a.set("setTest1", "value1", tags)
a.set("setTest1", "value1", tags)
a.set("setTest1", "value2", tags)
a.set("setTest2", "value1", tags)

metrics := a.flushMetrics()

Expand Down Expand Up @@ -143,9 +143,9 @@ func TestAggregatorFlushConcurrency(t *testing.T) {
go func() {
defer wg.Done()

a.gauge("gaugeTest1", 21, tags, 1)
a.count("countTest1", 21, tags, 1)
a.set("setTest1", "value1", tags, 1)
a.gauge("gaugeTest1", 21, tags)
a.count("countTest1", 21, tags)
a.set("setTest1", "value1", tags)
}()
}

Expand Down
18 changes: 6 additions & 12 deletions statsd/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,13 @@ type countMetric struct {
value int64
name string
tags []string
rate float64
}

func newCountMetric(name string, value int64, tags []string, rate float64) *countMetric {
func newCountMetric(name string, value int64, tags []string) *countMetric {
return &countMetric{
value: value,
name: name,
tags: tags,
rate: rate,
}
}

Expand All @@ -38,7 +36,7 @@ func (c *countMetric) flushUnsafe() metric {
metricType: count,
name: c.name,
tags: c.tags,
rate: c.rate,
rate: 1,
ivalue: c.value,
}
}
Expand All @@ -49,15 +47,13 @@ type gaugeMetric struct {
value uint64
name string
tags []string
rate float64
}

func newGaugeMetric(name string, value float64, tags []string, rate float64) *gaugeMetric {
func newGaugeMetric(name string, value float64, tags []string) *gaugeMetric {
return &gaugeMetric{
value: math.Float64bits(value),
name: name,
tags: tags,
rate: rate,
}
}

Expand All @@ -70,7 +66,7 @@ func (g *gaugeMetric) flushUnsafe() metric {
metricType: gauge,
name: g.name,
tags: g.tags,
rate: g.rate,
rate: 1,
fvalue: math.Float64frombits(g.value),
}
}
Expand All @@ -81,16 +77,14 @@ type setMetric struct {
data map[string]struct{}
name string
tags []string
rate float64
sync.Mutex
}

func newSetMetric(name string, value string, tags []string, rate float64) *setMetric {
func newSetMetric(name string, value string, tags []string) *setMetric {
set := &setMetric{
data: map[string]struct{}{},
name: name,
tags: tags,
rate: rate,
}
set.data[value] = struct{}{}
return set
Expand All @@ -116,7 +110,7 @@ func (s *setMetric) flushUnsafe() []metric {
metricType: set,
name: s.name,
tags: s.tags,
rate: s.rate,
rate: 1,
svalue: value,
}
i++
Expand Down
31 changes: 9 additions & 22 deletions statsd/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,94 +11,84 @@ import (
)

func TestNewCountMetric(t *testing.T) {
c := newCountMetric("test", 21, []string{"tag1", "tag2"}, 1)
c := newCountMetric("test", 21, []string{"tag1", "tag2"})
assert.Equal(t, c.value, int64(21))
assert.Equal(t, c.name, "test")
assert.Equal(t, c.tags, []string{"tag1", "tag2"})
assert.Equal(t, c.rate, 1.0)
}

func TestCountMetricSample(t *testing.T) {
c := newCountMetric("test", 21, []string{"tag1", "tag2"}, 1)
c := newCountMetric("test", 21, []string{"tag1", "tag2"})
c.sample(12)
assert.Equal(t, c.value, int64(33))
assert.Equal(t, c.name, "test")
assert.Equal(t, c.tags, []string{"tag1", "tag2"})
assert.Equal(t, c.rate, 1.0)
}

func TestFlushUnsafeCountMetricSample(t *testing.T) {
c := newCountMetric("test", 21, []string{"tag1", "tag2"}, 1)
c := newCountMetric("test", 21, []string{"tag1", "tag2"})
m := c.flushUnsafe()
assert.Equal(t, m.metricType, count)
assert.Equal(t, m.ivalue, int64(21))
assert.Equal(t, m.name, "test")
assert.Equal(t, m.tags, []string{"tag1", "tag2"})
assert.Equal(t, m.rate, 1.0)

c.sample(12)
m = c.flushUnsafe()
assert.Equal(t, m.metricType, count)
assert.Equal(t, m.ivalue, int64(33))
assert.Equal(t, m.name, "test")
assert.Equal(t, m.tags, []string{"tag1", "tag2"})
assert.Equal(t, m.rate, 1.0)
}

func TestNewGaugeMetric(t *testing.T) {
g := newGaugeMetric("test", 21, []string{"tag1", "tag2"}, 1)
g := newGaugeMetric("test", 21, []string{"tag1", "tag2"})
assert.Equal(t, math.Float64frombits(g.value), float64(21))
assert.Equal(t, g.name, "test")
assert.Equal(t, g.tags, []string{"tag1", "tag2"})
assert.Equal(t, g.rate, 1.0)
}

func TestGaugeMetricSample(t *testing.T) {
g := newGaugeMetric("test", 21, []string{"tag1", "tag2"}, 1)
g := newGaugeMetric("test", 21, []string{"tag1", "tag2"})
g.sample(12)
assert.Equal(t, math.Float64frombits(g.value), float64(12))
assert.Equal(t, g.name, "test")
assert.Equal(t, g.tags, []string{"tag1", "tag2"})
assert.Equal(t, g.rate, 1.0)
}

func TestFlushUnsafeGaugeMetricSample(t *testing.T) {
g := newGaugeMetric("test", 21, []string{"tag1", "tag2"}, 1)
g := newGaugeMetric("test", 21, []string{"tag1", "tag2"})
m := g.flushUnsafe()
assert.Equal(t, m.metricType, gauge)
assert.Equal(t, m.fvalue, float64(21))
assert.Equal(t, m.name, "test")
assert.Equal(t, m.tags, []string{"tag1", "tag2"})
assert.Equal(t, m.rate, 1.0)

g.sample(12)
m = g.flushUnsafe()
assert.Equal(t, m.metricType, gauge)
assert.Equal(t, m.fvalue, float64(12))
assert.Equal(t, m.name, "test")
assert.Equal(t, m.tags, []string{"tag1", "tag2"})
assert.Equal(t, m.rate, 1.0)
}

func TestNewSetMetric(t *testing.T) {
s := newSetMetric("test", "value1", []string{"tag1", "tag2"}, 1)
s := newSetMetric("test", "value1", []string{"tag1", "tag2"})
assert.Equal(t, s.data, map[string]struct{}{"value1": struct{}{}})
assert.Equal(t, s.name, "test")
assert.Equal(t, s.tags, []string{"tag1", "tag2"})
assert.Equal(t, s.rate, 1.0)
}

func TestSetMetricSample(t *testing.T) {
s := newSetMetric("test", "value1", []string{"tag1", "tag2"}, 1)
s := newSetMetric("test", "value1", []string{"tag1", "tag2"})
s.sample("value2")
assert.Equal(t, s.data, map[string]struct{}{"value1": struct{}{}, "value2": struct{}{}})
assert.Equal(t, s.name, "test")
assert.Equal(t, s.tags, []string{"tag1", "tag2"})
assert.Equal(t, s.rate, 1.0)
}

func TestFlushUnsafeSetMetricSample(t *testing.T) {
s := newSetMetric("test", "value1", []string{"tag1", "tag2"}, 1)
s := newSetMetric("test", "value1", []string{"tag1", "tag2"})
m := s.flushUnsafe()

require.Len(t, m, 1)
Expand All @@ -107,7 +97,6 @@ func TestFlushUnsafeSetMetricSample(t *testing.T) {
assert.Equal(t, m[0].svalue, "value1")
assert.Equal(t, m[0].name, "test")
assert.Equal(t, m[0].tags, []string{"tag1", "tag2"})
assert.Equal(t, m[0].rate, 1.0)

s.sample("value1")
s.sample("value2")
Expand All @@ -122,10 +111,8 @@ func TestFlushUnsafeSetMetricSample(t *testing.T) {
assert.Equal(t, m[0].svalue, "value1")
assert.Equal(t, m[0].name, "test")
assert.Equal(t, m[0].tags, []string{"tag1", "tag2"})
assert.Equal(t, m[0].rate, 1.0)
assert.Equal(t, m[1].metricType, set)
assert.Equal(t, m[1].svalue, "value2")
assert.Equal(t, m[1].name, "test")
assert.Equal(t, m[1].tags, []string{"tag1", "tag2"})
assert.Equal(t, m[1].rate, 1.0)
}
6 changes: 3 additions & 3 deletions statsd/statsd.go
Original file line number Diff line number Diff line change
Expand Up @@ -450,7 +450,7 @@ func (c *Client) Gauge(name string, value float64, tags []string, rate float64)
}
atomic.AddUint64(&c.metrics.TotalMetricsGauge, 1)
if c.agg != nil {
return c.agg.gauge(name, value, tags, rate)
return c.agg.gauge(name, value, tags)
}
return c.send(metric{metricType: gauge, name: name, fvalue: value, tags: tags, rate: rate})
}
Expand All @@ -462,7 +462,7 @@ func (c *Client) Count(name string, value int64, tags []string, rate float64) er
}
atomic.AddUint64(&c.metrics.TotalMetricsCount, 1)
if c.agg != nil {
return c.agg.count(name, value, tags, rate)
return c.agg.count(name, value, tags)
}
return c.send(metric{metricType: count, name: name, ivalue: value, tags: tags, rate: rate})
}
Expand Down Expand Up @@ -502,7 +502,7 @@ func (c *Client) Set(name string, value string, tags []string, rate float64) err
}
atomic.AddUint64(&c.metrics.TotalMetricsSet, 1)
if c.agg != nil {
return c.agg.set(name, value, tags, rate)
return c.agg.set(name, value, tags)
}
return c.send(metric{metricType: set, name: name, svalue: value, tags: tags, rate: rate})
}
Expand Down