Skip to content

Commit

Permalink
Client side aggregation for distribution, histogram and timing
Browse files Browse the repository at this point in the history
We now buffer values for those types and pack them in one dogstatsd
message. This is based on the new dogstatsd 1.1 protocol.
  • Loading branch information
hush-hush committed Jan 15, 2021
1 parent 8a12112 commit b2531ec
Show file tree
Hide file tree
Showing 15 changed files with 916 additions and 83 deletions.
126 changes: 105 additions & 21 deletions statsd/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,62 @@ import (
)

type (
countsMap map[string]*countMetric
gaugesMap map[string]*gaugeMetric
setsMap map[string]*setMetric
countsMap map[string]*countMetric
gaugesMap map[string]*gaugeMetric
setsMap map[string]*setMetric
bufferedMetricMap map[string]*histogramMetric
)

// bufferedMetricContexts represent the contexts for Histograms, Distributions
// and Timing. Since those 3 metric types behave the same way and are sampled
// with the same type they're represented by the same class.
type bufferedMetricContexts struct {
nbContext int32
mutex sync.RWMutex
values bufferedMetricMap
newMetric func(string, float64, string) *bufferedMetric
}

func newBufferedContexts(newMetric func(string, float64, string) *bufferedMetric) bufferedMetricContexts {
return bufferedMetricContexts{
values: bufferedMetricMap{},
newMetric: newMetric,
}
}

func (bc *bufferedMetricContexts) flush(metrics []metric) []metric {
bc.mutex.Lock()
values := bc.values
bc.values = bufferedMetricMap{}
bc.mutex.Unlock()

for _, d := range values {
metrics = append(metrics, d.flushUnsafe())
}
atomic.AddInt32(&bc.nbContext, int32(len(values)))
return metrics
}

func (bc *bufferedMetricContexts) sample(name string, value float64, tags []string) error {
context, stringTags := getContextAndTags(name, tags)
bc.mutex.RLock()
if v, found := bc.values[context]; found {
v.sample(value)
bc.mutex.RUnlock()
return nil
}
bc.mutex.RUnlock()

bc.mutex.Lock()
bc.values[context] = bc.newMetric(name, value, stringTags)
bc.mutex.Unlock()
return nil
}

func (bc *bufferedMetricContexts) resetAndGetNbContext() int32 {
return atomic.SwapInt32(&bc.nbContext, 0)
}

type aggregator struct {
nbContextGauge int32
nbContextCount int32
Expand All @@ -22,9 +73,12 @@ type aggregator struct {
gaugesM sync.RWMutex
setsM sync.RWMutex

gauges gaugesMap
counts countsMap
sets setsMap
gauges gaugesMap
counts countsMap
sets setsMap
histograms bufferedMetricContexts
distributions bufferedMetricContexts
timings bufferedMetricContexts

closed chan struct{}
exited chan struct{}
Expand All @@ -33,20 +87,26 @@ type aggregator struct {
}

type aggregatorMetrics struct {
nbContext int32
nbContextGauge int32
nbContextCount int32
nbContextSet int32
nbContext int32
nbContextGauge int32
nbContextCount int32
nbContextSet int32
nbContextHistogram int32
nbContextDistribution int32
nbContextTiming int32
}

func newAggregator(c *Client) *aggregator {
return &aggregator{
client: c,
counts: countsMap{},
gauges: gaugesMap{},
sets: setsMap{},
closed: make(chan struct{}),
exited: make(chan struct{}),
client: c,
counts: countsMap{},
gauges: gaugesMap{},
sets: setsMap{},
histograms: newBufferedContexts(newHistogramMetric),
distributions: newBufferedContexts(newDistributionMetric),
timings: newBufferedContexts(newTimingMetric),
closed: make(chan struct{}),
exited: make(chan struct{}),
}
}

Expand Down Expand Up @@ -84,12 +144,15 @@ func (a *aggregator) flushTelemetryMetrics() *aggregatorMetrics {
}

am := &aggregatorMetrics{
nbContextGauge: atomic.SwapInt32(&a.nbContextGauge, 0),
nbContextCount: atomic.SwapInt32(&a.nbContextCount, 0),
nbContextSet: atomic.SwapInt32(&a.nbContextSet, 0),
nbContextGauge: atomic.SwapInt32(&a.nbContextGauge, 0),
nbContextCount: atomic.SwapInt32(&a.nbContextCount, 0),
nbContextSet: atomic.SwapInt32(&a.nbContextSet, 0),
nbContextHistogram: a.histograms.resetAndGetNbContext(),
nbContextDistribution: a.distributions.resetAndGetNbContext(),
nbContextTiming: a.timings.resetAndGetNbContext(),
}

am.nbContext = am.nbContextGauge + am.nbContextCount + am.nbContextSet
am.nbContext = am.nbContextGauge + am.nbContextCount + am.nbContextSet + am.nbContextHistogram + am.nbContextDistribution + am.nbContextTiming
return am
}

Expand Down Expand Up @@ -126,14 +189,23 @@ func (a *aggregator) flushMetrics() []metric {
metrics = append(metrics, c.flushUnsafe())
}

metrics = a.histograms.flush(metrics)
metrics = a.distributions.flush(metrics)
metrics = a.timings.flush(metrics)

atomic.AddInt32(&a.nbContextCount, int32(len(counts)))
atomic.AddInt32(&a.nbContextGauge, int32(len(gauges)))
atomic.AddInt32(&a.nbContextSet, int32(len(sets)))
return metrics
}

func getContext(name string, tags []string) string {
return name + ":" + strings.Join(tags, ",")
return name + ":" + strings.Join(tags, tagSeparatorSymbol)
}

func getContextAndTags(name string, tags []string) (string, string) {
stringTags := strings.Join(tags, tagSeparatorSymbol)
return name + ":" + stringTags, stringTags
}

func (a *aggregator) count(name string, value int64, tags []string) error {
Expand Down Expand Up @@ -185,3 +257,15 @@ func (a *aggregator) set(name string, value string, tags []string) error {
a.setsM.Unlock()
return nil
}

func (a *aggregator) histogram(name string, value float64, tags []string) error {
return a.histograms.sample(name, value, tags)
}

func (a *aggregator) distribution(name string, value float64, tags []string) error {
return a.distributions.sample(name, value, tags)
}

func (a *aggregator) timing(name string, value float64, tags []string) error {
return a.timings.sample(name, value, tags)
}
119 changes: 93 additions & 26 deletions statsd/aggregator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,29 +14,35 @@ func TestAggregatorSample(t *testing.T) {

tags := []string{"tag1", "tag2"}

a.gauge("gaugeTest", 21, tags)
assert.Len(t, a.gauges, 1)
assert.Contains(t, a.gauges, "gaugeTest:tag1,tag2")

a.count("countTest", 21, tags)
assert.Len(t, a.counts, 1)
assert.Contains(t, a.counts, "countTest:tag1,tag2")

a.set("setTest", "value1", tags)
assert.Len(t, a.sets, 1)
assert.Contains(t, a.sets, "setTest:tag1,tag2")

a.gauge("gaugeTest", 123, tags)
assert.Len(t, a.gauges, 1)
assert.Contains(t, a.gauges, "gaugeTest:tag1,tag2")

a.count("countTest", 10, tags)
assert.Len(t, a.counts, 1)
assert.Contains(t, a.counts, "countTest:tag1,tag2")

a.set("setTest", "value1", tags)
assert.Len(t, a.sets, 1)
assert.Contains(t, a.sets, "setTest:tag1,tag2")
for i := 0; i < 2; i++ {
a.gauge("gaugeTest", 21, tags)
assert.Len(t, a.gauges, 1)
assert.Contains(t, a.gauges, "gaugeTest:tag1,tag2")

a.count("countTest", 21, tags)
assert.Len(t, a.counts, 1)
assert.Contains(t, a.counts, "countTest:tag1,tag2")

a.set("setTest", "value1", tags)
assert.Len(t, a.sets, 1)
assert.Contains(t, a.sets, "setTest:tag1,tag2")

a.set("setTest", "value1", tags)
assert.Len(t, a.sets, 1)
assert.Contains(t, a.sets, "setTest:tag1,tag2")

a.histogram("histogramTest", 21, tags)
assert.Len(t, a.histograms.values, 1)
assert.Contains(t, a.histograms.values, "histogramTest:tag1,tag2")

a.distribution("distributionTest", 21, tags)
assert.Len(t, a.distributions.values, 1)
assert.Contains(t, a.distributions.values, "distributionTest:tag1,tag2")

a.timing("timingTest", 21, tags)
assert.Len(t, a.timings.values, 1)
assert.Contains(t, a.timings.values, "timingTest:tag1,tag2")
}
}

func TestAggregatorFlush(t *testing.T) {
Expand All @@ -57,13 +63,28 @@ func TestAggregatorFlush(t *testing.T) {
a.set("setTest1", "value2", tags)
a.set("setTest2", "value1", tags)

a.histogram("histogramTest1", 21, tags)
a.histogram("histogramTest1", 22, tags)
a.histogram("histogramTest2", 23, tags)

a.distribution("distributionTest1", 21, tags)
a.distribution("distributionTest1", 22, tags)
a.distribution("distributionTest2", 23, tags)

a.timing("timingTest1", 21, tags)
a.timing("timingTest1", 22, tags)
a.timing("timingTest2", 23, tags)

metrics := a.flushMetrics()

assert.Len(t, a.gauges, 0)
assert.Len(t, a.counts, 0)
assert.Len(t, a.sets, 0)
assert.Len(t, a.histograms.values, 0)
assert.Len(t, a.distributions.values, 0)
assert.Len(t, a.timings.values, 0)

assert.Len(t, metrics, 7)
assert.Len(t, metrics, 13)

sort.Slice(metrics, func(i, j int) bool {
if metrics[i].metricType == metrics[j].metricType {
Expand All @@ -77,7 +98,7 @@ func TestAggregatorFlush(t *testing.T) {
return metrics[i].metricType < metrics[j].metricType
})

assert.Equal(t, metrics, []metric{
assert.Equal(t, []metric{
metric{
metricType: gauge,
name: "gaugeTest1",
Expand Down Expand Up @@ -106,6 +127,34 @@ func TestAggregatorFlush(t *testing.T) {
rate: 1,
ivalue: int64(1),
},
metric{
metricType: histogramAggregated,
name: "histogramTest1",
stags: strings.Join(tags, tagSeparatorSymbol),
rate: 1,
fvalues: []float64{21.0, 22.0},
},
metric{
metricType: histogramAggregated,
name: "histogramTest2",
stags: strings.Join(tags, tagSeparatorSymbol),
rate: 1,
fvalues: []float64{23.0},
},
metric{
metricType: distributionAggregated,
name: "distributionTest1",
stags: strings.Join(tags, tagSeparatorSymbol),
rate: 1,
fvalues: []float64{21.0, 22.0},
},
metric{
metricType: distributionAggregated,
name: "distributionTest2",
stags: strings.Join(tags, tagSeparatorSymbol),
rate: 1,
fvalues: []float64{23.0},
},
metric{
metricType: set,
name: "setTest1",
Expand All @@ -127,7 +176,22 @@ func TestAggregatorFlush(t *testing.T) {
rate: 1,
svalue: "value1",
},
})
metric{
metricType: timingAggregated,
name: "timingTest1",
stags: strings.Join(tags, tagSeparatorSymbol),
rate: 1,
fvalues: []float64{21.0, 22.0},
},
metric{
metricType: timingAggregated,
name: "timingTest2",
stags: strings.Join(tags, tagSeparatorSymbol),
rate: 1,
fvalues: []float64{23.0},
},
},
metrics)

}

Expand All @@ -146,6 +210,9 @@ func TestAggregatorFlushConcurrency(t *testing.T) {
a.gauge("gaugeTest1", 21, tags)
a.count("countTest1", 21, tags)
a.set("setTest1", "value1", tags)
a.histogram("histogramTest1", 21, tags)
a.distribution("distributionTest1", 21, tags)
a.timing("timingTest1", 21, tags)
}()
}

Expand Down
Loading

0 comments on commit b2531ec

Please sign in to comment.