From ea6839b71d6ec074185a13a3f1acb7e680c46db9 Mon Sep 17 00:00:00 2001 From: Maxime mouial Date: Thu, 15 Oct 2020 12:58:52 +0200 Subject: [PATCH] Client side aggregation for distribution, histogram and timing --- statsd/agent_version.go | 33 +++++ statsd/agent_version_test.go | 21 +++ statsd/aggregator.go | 162 ++++++++++++++++++----- statsd/aggregator_test.go | 119 +++++++++++++---- statsd/buffer.go | 62 ++++++++- statsd/buffer_test.go | 54 ++++++++ statsd/format.go | 31 ++++- statsd/format_test.go | 36 ++++++ statsd/metrics.go | 62 +++++++++ statsd/metrics_test.go | 114 ++++++++++++++++ statsd/options.go | 20 +++ statsd/options_test.go | 4 + statsd/statsd.go | 37 ++++-- statsd/telemetry.go | 3 + statsd/telemetry_test.go | 32 ++++- statsd/worker.go | 31 +++++ statsd/worker_test.go | 244 +++++++++++++++++++++++++++++++++++ 17 files changed, 987 insertions(+), 78 deletions(-) create mode 100644 statsd/agent_version.go create mode 100644 statsd/agent_version_test.go create mode 100644 statsd/worker_test.go diff --git a/statsd/agent_version.go b/statsd/agent_version.go new file mode 100644 index 000000000..3d4352e6f --- /dev/null +++ b/statsd/agent_version.go @@ -0,0 +1,33 @@ +package statsd + +import ( + "fmt" + + "github.com/Masterminds/semver" +) + +var ( + // multiple value per message was introduce in Dogstatsd protocol 1.1 + // implemented since Agent 6.25+ and 7.25+ + multiValuePerMessageCond *semver.Constraints +) + +func init() { + multiValuePerMessageCond, _ = semver.NewConstraint(">= 6.25 < 7.0.0 || >= 7.25") +} + +type agentFeatures struct { + multiValuePerMessage bool +} + +func computeAgentFeature(version string) (*agentFeatures, error) { + af := agentFeatures{} + v, err := semver.NewVersion(version) + if err != nil { + return nil, fmt.Errorf("could not parse version '%s': %s", version, err) + } + + af.multiValuePerMessage = multiValuePerMessageCond.Check(v) + + return &af, nil +} diff --git a/statsd/agent_version_test.go b/statsd/agent_version_test.go new file mode 100644 index 000000000..27c784076 --- /dev/null +++ b/statsd/agent_version_test.go @@ -0,0 +1,21 @@ +package statsd + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestAgentFeatureMultipleValue(t *testing.T) { + af, err := computeAgentFeature("6.25.0") + assert.Nil(t, err) + assert.True(t, af.multiValuePerMessage) + + af, err = computeAgentFeature("7.25.0") + assert.Nil(t, err) + assert.True(t, af.multiValuePerMessage) + + af, err = computeAgentFeature("7.24.0") + assert.Nil(t, err) + assert.False(t, af.multiValuePerMessage) +} diff --git a/statsd/aggregator.go b/statsd/aggregator.go index 84815e68d..2a98486ab 100644 --- a/statsd/aggregator.go +++ b/statsd/aggregator.go @@ -8,23 +8,35 @@ import ( ) type ( - countsMap map[string]*countMetric - gaugesMap map[string]*gaugeMetric - setsMap map[string]*setMetric + countsMap map[string]*countMetric + gaugesMap map[string]*gaugeMetric + setsMap map[string]*setMetric + histogramMap map[string]*histogramMetric + distributionMap map[string]*distributionMetric + timingMap map[string]*timingMetric ) type aggregator struct { - nbContextGauge int32 - nbContextCount int32 - nbContextSet int32 - - countsM sync.RWMutex - gaugesM sync.RWMutex - setsM sync.RWMutex - - gauges gaugesMap - counts countsMap - sets setsMap + nbContextGauge int32 + nbContextCount int32 + nbContextSet int32 + nbContextHistogram int32 + nbContextDistribution int32 + nbContextTiming int32 + + countsM sync.RWMutex + gaugesM sync.RWMutex + setsM sync.RWMutex + histogramsM sync.RWMutex + distributionM sync.RWMutex + timingM sync.RWMutex + + gauges gaugesMap + counts countsMap + sets setsMap + histograms histogramMap + distributions distributionMap + timings timingMap closed chan struct{} exited chan struct{} @@ -33,20 +45,26 @@ type aggregator struct { } type aggregatorMetrics struct { - nbContext int32 - nbContextGauge int32 - nbContextCount int32 - nbContextSet int32 + nbContext int32 + nbContextGauge int32 + nbContextCount int32 + nbContextSet int32 + nbContextHistogram int32 + nbContextDistribution int32 + nbContextTiming int32 } func newAggregator(c *Client) *aggregator { return &aggregator{ - client: c, - counts: countsMap{}, - gauges: gaugesMap{}, - sets: setsMap{}, - closed: make(chan struct{}), - exited: make(chan struct{}), + client: c, + counts: countsMap{}, + gauges: gaugesMap{}, + sets: setsMap{}, + histograms: histogramMap{}, + distributions: distributionMap{}, + timings: timingMap{}, + closed: make(chan struct{}), + exited: make(chan struct{}), } } @@ -84,12 +102,15 @@ func (a *aggregator) flushTelemetryMetrics() *aggregatorMetrics { } am := &aggregatorMetrics{ - nbContextGauge: atomic.SwapInt32(&a.nbContextGauge, 0), - nbContextCount: atomic.SwapInt32(&a.nbContextCount, 0), - nbContextSet: atomic.SwapInt32(&a.nbContextSet, 0), + nbContextGauge: atomic.SwapInt32(&a.nbContextGauge, 0), + nbContextCount: atomic.SwapInt32(&a.nbContextCount, 0), + nbContextSet: atomic.SwapInt32(&a.nbContextSet, 0), + nbContextHistogram: atomic.SwapInt32(&a.nbContextHistogram, 0), + nbContextDistribution: atomic.SwapInt32(&a.nbContextDistribution, 0), + nbContextTiming: atomic.SwapInt32(&a.nbContextTiming, 0), } - am.nbContext = am.nbContextGauge + am.nbContextCount + am.nbContextSet + am.nbContext = am.nbContextGauge + am.nbContextCount + am.nbContextSet + am.nbContextHistogram + am.nbContextDistribution + am.nbContextTiming return am } @@ -126,14 +147,49 @@ func (a *aggregator) flushMetrics() []metric { metrics = append(metrics, c.flushUnsafe()) } + a.histogramsM.Lock() + histograms := a.histograms + a.histograms = histogramMap{} + a.histogramsM.Unlock() + + for _, h := range histograms { + metrics = append(metrics, h.flushUnsafe()) + } + + a.distributionM.Lock() + distributions := a.distributions + a.distributions = distributionMap{} + a.distributionM.Unlock() + + for _, d := range distributions { + metrics = append(metrics, d.flushUnsafe()) + } + + a.timingM.Lock() + timings := a.timings + a.timings = timingMap{} + a.timingM.Unlock() + + for _, t := range timings { + metrics = append(metrics, t.flushUnsafe()) + } + atomic.AddInt32(&a.nbContextCount, int32(len(counts))) atomic.AddInt32(&a.nbContextGauge, int32(len(gauges))) atomic.AddInt32(&a.nbContextSet, int32(len(sets))) + atomic.AddInt32(&a.nbContextHistogram, int32(len(histograms))) + atomic.AddInt32(&a.nbContextDistribution, int32(len(distributions))) + atomic.AddInt32(&a.nbContextTiming, int32(len(timings))) return metrics } func getContext(name string, tags []string) string { - return name + ":" + strings.Join(tags, ",") + return name + ":" + strings.Join(tags, tagSeparatorSymbol) +} + +func getContextAndTags(name string, tags []string) (string, string) { + stringTags := strings.Join(tags, tagSeparatorSymbol) + return name + ":" + stringTags, stringTags } func (a *aggregator) count(name string, value int64, tags []string) error { @@ -185,3 +241,51 @@ func (a *aggregator) set(name string, value string, tags []string) error { a.setsM.Unlock() return nil } + +func (a *aggregator) histogram(name string, value float64, tags []string) error { + context, stringTags := getContextAndTags(name, tags) + a.histogramsM.RLock() + if histogram, found := a.histograms[context]; found { + histogram.sample(value) + a.histogramsM.RUnlock() + return nil + } + a.histogramsM.RUnlock() + + a.histogramsM.Lock() + a.histograms[context] = newHistogramMetric(name, value, stringTags) + a.histogramsM.Unlock() + return nil +} + +func (a *aggregator) distribution(name string, value float64, tags []string) error { + context, stringTags := getContextAndTags(name, tags) + a.distributionM.RLock() + if distribution, found := a.distributions[context]; found { + distribution.sample(value) + a.distributionM.RUnlock() + return nil + } + a.distributionM.RUnlock() + + a.distributionM.Lock() + a.distributions[context] = newDistributionMetric(name, value, stringTags) + a.distributionM.Unlock() + return nil +} + +func (a *aggregator) timing(name string, value float64, tags []string) error { + context, stringTags := getContextAndTags(name, tags) + a.timingM.RLock() + if distribution, found := a.timings[context]; found { + distribution.sample(value) + a.timingM.RUnlock() + return nil + } + a.timingM.RUnlock() + + a.timingM.Lock() + a.timings[context] = newTimingMetric(name, value, stringTags) + a.timingM.Unlock() + return nil +} diff --git a/statsd/aggregator_test.go b/statsd/aggregator_test.go index d8a7a4ac9..a155152e7 100644 --- a/statsd/aggregator_test.go +++ b/statsd/aggregator_test.go @@ -14,29 +14,35 @@ func TestAggregatorSample(t *testing.T) { tags := []string{"tag1", "tag2"} - a.gauge("gaugeTest", 21, tags) - assert.Len(t, a.gauges, 1) - assert.Contains(t, a.gauges, "gaugeTest:tag1,tag2") - - a.count("countTest", 21, tags) - assert.Len(t, a.counts, 1) - assert.Contains(t, a.counts, "countTest:tag1,tag2") - - a.set("setTest", "value1", tags) - assert.Len(t, a.sets, 1) - assert.Contains(t, a.sets, "setTest:tag1,tag2") - - a.gauge("gaugeTest", 123, tags) - assert.Len(t, a.gauges, 1) - assert.Contains(t, a.gauges, "gaugeTest:tag1,tag2") - - a.count("countTest", 10, tags) - assert.Len(t, a.counts, 1) - assert.Contains(t, a.counts, "countTest:tag1,tag2") - - a.set("setTest", "value1", tags) - assert.Len(t, a.sets, 1) - assert.Contains(t, a.sets, "setTest:tag1,tag2") + for i := 0; i < 2; i++ { + a.gauge("gaugeTest", 21, tags) + assert.Len(t, a.gauges, 1) + assert.Contains(t, a.gauges, "gaugeTest:tag1,tag2") + + a.count("countTest", 21, tags) + assert.Len(t, a.counts, 1) + assert.Contains(t, a.counts, "countTest:tag1,tag2") + + a.set("setTest", "value1", tags) + assert.Len(t, a.sets, 1) + assert.Contains(t, a.sets, "setTest:tag1,tag2") + + a.set("setTest", "value1", tags) + assert.Len(t, a.sets, 1) + assert.Contains(t, a.sets, "setTest:tag1,tag2") + + a.histogram("histogramTest", 21, tags) + assert.Len(t, a.histograms, 1) + assert.Contains(t, a.histograms, "histogramTest:tag1,tag2") + + a.distribution("distributionTest", 21, tags) + assert.Len(t, a.distributions, 1) + assert.Contains(t, a.distributions, "distributionTest:tag1,tag2") + + a.timing("timingTest", 21, tags) + assert.Len(t, a.timings, 1) + assert.Contains(t, a.timings, "timingTest:tag1,tag2") + } } func TestAggregatorFlush(t *testing.T) { @@ -57,13 +63,28 @@ func TestAggregatorFlush(t *testing.T) { a.set("setTest1", "value2", tags) a.set("setTest2", "value1", tags) + a.histogram("histogramTest1", 21, tags) + a.histogram("histogramTest1", 22, tags) + a.histogram("histogramTest2", 23, tags) + + a.distribution("distributionTest1", 21, tags) + a.distribution("distributionTest1", 22, tags) + a.distribution("distributionTest2", 23, tags) + + a.timing("timingTest1", 21, tags) + a.timing("timingTest1", 22, tags) + a.timing("timingTest2", 23, tags) + metrics := a.flushMetrics() assert.Len(t, a.gauges, 0) assert.Len(t, a.counts, 0) assert.Len(t, a.sets, 0) + assert.Len(t, a.histograms, 0) + assert.Len(t, a.distributions, 0) + assert.Len(t, a.timings, 0) - assert.Len(t, metrics, 7) + assert.Len(t, metrics, 13) sort.Slice(metrics, func(i, j int) bool { if metrics[i].metricType == metrics[j].metricType { @@ -77,7 +98,7 @@ func TestAggregatorFlush(t *testing.T) { return metrics[i].metricType < metrics[j].metricType }) - assert.Equal(t, metrics, []metric{ + assert.Equal(t, []metric{ metric{ metricType: gauge, name: "gaugeTest1", @@ -106,6 +127,34 @@ func TestAggregatorFlush(t *testing.T) { rate: 1, ivalue: int64(1), }, + metric{ + metricType: histogramAggregated, + name: "histogramTest1", + stags: strings.Join(tags, tagSeparatorSymbol), + rate: 1, + fvalues: []float64{21.0, 22.0}, + }, + metric{ + metricType: histogramAggregated, + name: "histogramTest2", + stags: strings.Join(tags, tagSeparatorSymbol), + rate: 1, + fvalues: []float64{23.0}, + }, + metric{ + metricType: distributionAggregated, + name: "distributionTest1", + stags: strings.Join(tags, tagSeparatorSymbol), + rate: 1, + fvalues: []float64{21.0, 22.0}, + }, + metric{ + metricType: distributionAggregated, + name: "distributionTest2", + stags: strings.Join(tags, tagSeparatorSymbol), + rate: 1, + fvalues: []float64{23.0}, + }, metric{ metricType: set, name: "setTest1", @@ -127,7 +176,22 @@ func TestAggregatorFlush(t *testing.T) { rate: 1, svalue: "value1", }, - }) + metric{ + metricType: timingAggregated, + name: "timingTest1", + stags: strings.Join(tags, tagSeparatorSymbol), + rate: 1, + fvalues: []float64{21.0, 22.0}, + }, + metric{ + metricType: timingAggregated, + name: "timingTest2", + stags: strings.Join(tags, tagSeparatorSymbol), + rate: 1, + fvalues: []float64{23.0}, + }, + }, + metrics) } @@ -146,6 +210,9 @@ func TestAggregatorFlushConcurrency(t *testing.T) { a.gauge("gaugeTest1", 21, tags) a.count("countTest1", 21, tags) a.set("setTest1", "value1", tags) + a.histogram("histogramTest1", 21, tags) + a.distribution("distributionTest1", 21, tags) + a.timing("timingTest1", 21, tags) }() } diff --git a/statsd/buffer.go b/statsd/buffer.go index c38e229ab..5da60e09c 100644 --- a/statsd/buffer.go +++ b/statsd/buffer.go @@ -1,11 +1,21 @@ package statsd +import ( + "strconv" +) + type bufferFullError string func (e bufferFullError) Error() string { return string(e) } const errBufferFull = bufferFullError("statsd buffer is full") +type partialWriteError string + +func (e partialWriteError) Error() string { return string(e) } + +const errPartialWrite = partialWriteError("value partially written") + const metricOverhead = 512 // statsdBuffer is a buffer containing statsd messages @@ -55,6 +65,56 @@ func (b *statsdBuffer) writeHistogram(namespace string, globalTags []string, nam return b.validateNewElement(originalBuffer) } +// writeAggregated serialized as many values as possible in the current buffer and return the position in values where it stopped. +func (b *statsdBuffer) writeAggregated(metricSymbol []byte, namespace string, globalTags []string, name string, values []float64, tags string, tagSize int) (int, error) { + if b.elementCount >= b.maxElements { + return 0, errBufferFull + } + + originalBuffer := b.buffer + b.buffer = appendHeader(b.buffer, namespace, name) + + // buffer already full + if len(b.buffer)+tagSize > b.maxSize { + b.buffer = originalBuffer + return 0, errBufferFull + } + + // We add as many value as possible + var position int + for idx, v := range values { + previousBuffer := b.buffer + if idx != 0 { + b.buffer = append(b.buffer, ':') + } + b.buffer = strconv.AppendFloat(b.buffer, v, 'f', -1, 64) + + // Should we stop serializing and switch to another buffer + if len(b.buffer)+tagSize > b.maxSize { + b.buffer = previousBuffer + break + } + position = idx + 1 + } + + // we could not add a single value + if position == 0 { + b.buffer = originalBuffer + return 0, errBufferFull + } + + b.buffer = append(b.buffer, '|') + b.buffer = append(b.buffer, metricSymbol...) + b.buffer = appendTagsAggregated(b.buffer, globalTags, tags) + b.elementCount++ + + if position != len(values) { + return position, errPartialWrite + } + return position, nil + +} + func (b *statsdBuffer) writeDistribution(namespace string, globalTags []string, name string, value float64, tags []string, rate float64) error { if b.elementCount >= b.maxElements { return errBufferFull @@ -116,7 +176,7 @@ func (b *statsdBuffer) validateNewElement(originalBuffer []byte) error { func (b *statsdBuffer) writeSeparator() { if b.elementCount != 0 { - b.buffer = appendSeparator(b.buffer) + b.buffer = append(b.buffer, '\n') } } diff --git a/statsd/buffer_test.go b/statsd/buffer_test.go index 01c62fa43..d6a7cb882 100644 --- a/statsd/buffer_test.go +++ b/statsd/buffer_test.go @@ -86,3 +86,57 @@ func TestBufferSeparator(t *testing.T) { assert.Nil(t, err) assert.Equal(t, "namespace.metric:1|g|#tag:tag\nnamespace.metric:1|g|#tag:tag", string(buffer.bytes())) } + +func TestBufferAggregated(t *testing.T) { + buffer := newStatsdBuffer(1024, 1) + pos, err := buffer.writeAggregated([]byte("h"), "namespace.", []string{"tag:tag"}, "metric", []float64{1}, "", 12) + assert.Nil(t, err) + assert.Equal(t, 1, pos) + assert.Equal(t, `namespace.metric:1|h|#tag:tag`, string(buffer.bytes())) + + buffer = newStatsdBuffer(1024, 1) + pos, err = buffer.writeAggregated([]byte("h"), "namespace.", []string{"tag:tag"}, "metric", []float64{1, 2, 3, 4}, "", 12) + assert.Nil(t, err) + assert.Equal(t, 4, pos) + assert.Equal(t, `namespace.metric:1:2:3:4|h|#tag:tag`, string(buffer.bytes())) + + // max element already used + buffer = newStatsdBuffer(1024, 1) + buffer.elementCount = 1 + pos, err = buffer.writeAggregated([]byte("h"), "namespace.", []string{"tag:tag"}, "metric", []float64{1, 2, 3, 4}, "", 12) + assert.Equal(t, errBufferFull, err) + + // not enought size to start serializing + buffer = newStatsdBuffer(29, 1) + pos, err = buffer.writeAggregated([]byte("h"), "namespace.", []string{"tag:tag"}, "metric", []float64{1, 2, 3, 4}, "", 12) + assert.Equal(t, errBufferFull, err) + + // space for only 1 number + buffer = newStatsdBuffer(30, 1) + pos, err = buffer.writeAggregated([]byte("h"), "namespace.", []string{"tag:tag"}, "metric", []float64{1, 2, 3, 4}, "", 12) + assert.Equal(t, errPartialWrite, err) + assert.Equal(t, 1, pos) + assert.Equal(t, `namespace.metric:1|h|#tag:tag`, string(buffer.bytes())) + + // first value to big + buffer = newStatsdBuffer(30, 1) + pos, err = buffer.writeAggregated([]byte("h"), "namespace.", []string{"tag:tag"}, "metric", []float64{12, 2, 3, 4}, "", 12) + assert.Equal(t, errBufferFull, err) + assert.Equal(t, 0, pos) + assert.Equal(t, "", string(buffer.bytes())) // checking that the buffer was reset + + // not enough space left + buffer = newStatsdBuffer(40, 1) + buffer.buffer = append(buffer.buffer, []byte("abcdefghij")...) + pos, err = buffer.writeAggregated([]byte("h"), "namespace.", []string{"tag:tag"}, "metric", []float64{12, 2, 3, 4}, "", 12) + assert.Equal(t, errBufferFull, err) + assert.Equal(t, 0, pos) + assert.Equal(t, "abcdefghij", string(buffer.bytes())) // checking that the buffer was reset + + // space for only 2 number + buffer = newStatsdBuffer(32, 1) + pos, err = buffer.writeAggregated([]byte("h"), "namespace.", []string{"tag:tag"}, "metric", []float64{1, 2, 3, 4}, "", 12) + assert.Equal(t, errPartialWrite, err) + assert.Equal(t, 2, pos) + assert.Equal(t, `namespace.metric:1:2|h|#tag:tag`, string(buffer.bytes())) +} diff --git a/statsd/format.go b/statsd/format.go index bd856a048..8d62aa7ba 100644 --- a/statsd/format.go +++ b/statsd/format.go @@ -12,6 +12,7 @@ var ( distributionSymbol = []byte("d") setSymbol = []byte("s") timingSymbol = []byte("ms") + tagSeparatorSymbol = "," ) func appendHeader(buffer []byte, namespace string, name string) []byte { @@ -54,14 +55,14 @@ func appendTags(buffer []byte, globalTags []string, tags []string) []byte { for _, tag := range globalTags { if !firstTag { - buffer = append(buffer, ',') + buffer = append(buffer, tagSeparatorSymbol...) } buffer = appendWithoutNewlines(buffer, tag) firstTag = false } for _, tag := range tags { if !firstTag { - buffer = append(buffer, ',') + buffer = append(buffer, tagSeparatorSymbol...) } buffer = appendWithoutNewlines(buffer, tag) firstTag = false @@ -69,6 +70,30 @@ func appendTags(buffer []byte, globalTags []string, tags []string) []byte { return buffer } +func appendTagsAggregated(buffer []byte, globalTags []string, tags string) []byte { + if len(globalTags) == 0 && tags == "" { + return buffer + } + + buffer = append(buffer, "|#"...) + firstTag := true + + for _, tag := range globalTags { + if !firstTag { + buffer = append(buffer, tagSeparatorSymbol...) + } + buffer = appendWithoutNewlines(buffer, tag) + firstTag = false + } + if tags != "" { + if !firstTag { + buffer = append(buffer, tagSeparatorSymbol...) + } + buffer = appendWithoutNewlines(buffer, tags) + } + return buffer +} + func appendFloatMetric(buffer []byte, typeSymbol []byte, namespace string, globalTags []string, name string, value float64, tags []string, rate float64, precision int) []byte { buffer = appendHeader(buffer, namespace, name) buffer = strconv.AppendFloat(buffer, value, 'f', precision, 64) @@ -143,7 +168,7 @@ func appendEvent(buffer []byte, event Event, globalTags []string) []byte { buffer = append(buffer, "_e{"...) buffer = strconv.AppendInt(buffer, int64(len(event.Title)), 10) - buffer = append(buffer, ',') + buffer = append(buffer, tagSeparatorSymbol...) buffer = strconv.AppendInt(buffer, int64(escapedTextLen), 10) buffer = append(buffer, "}:"...) buffer = append(buffer, event.Title...) diff --git a/statsd/format_test.go b/statsd/format_test.go index d2f9a0280..ae5d59a48 100644 --- a/statsd/format_test.go +++ b/statsd/format_test.go @@ -7,6 +7,42 @@ import ( "github.com/stretchr/testify/assert" ) +func TestFormatAppendTags(t *testing.T) { + var buffer []byte + buffer = appendTags(buffer, []string{"global:tag"}, []string{"tag:tag", "tag2:tag2"}) + assert.Equal(t, `|#global:tag,tag:tag,tag2:tag2`, string(buffer)) + + var buffer2 []byte + buffer2 = appendTags(buffer2, []string{"global:tag"}, nil) + assert.Equal(t, `|#global:tag`, string(buffer2)) + + var buffer3 []byte + buffer3 = appendTags(buffer3, nil, []string{"tag:tag", "tag2:tag2"}) + assert.Equal(t, `|#tag:tag,tag2:tag2`, string(buffer3)) + + var buffer4 []byte + buffer4 = appendTags(buffer4, nil, nil) + assert.Equal(t, "", string(buffer4)) +} + +func TestFormatAppendTagsAggregated(t *testing.T) { + var buffer []byte + buffer = appendTagsAggregated(buffer, []string{"global:tag"}, "tag:tag,tag2:tag2") + assert.Equal(t, `|#global:tag,tag:tag,tag2:tag2`, string(buffer)) + + var buffer2 []byte + buffer2 = appendTagsAggregated(buffer2, []string{"global:tag"}, "") + assert.Equal(t, `|#global:tag`, string(buffer2)) + + var buffer3 []byte + buffer3 = appendTagsAggregated(buffer3, nil, "tag:tag,tag2:tag2") + assert.Equal(t, `|#tag:tag,tag2:tag2`, string(buffer3)) + + var buffer4 []byte + buffer4 = appendTagsAggregated(buffer4, nil, "") + assert.Equal(t, "", string(buffer4)) +} + func TestFormatAppendGauge(t *testing.T) { var buffer []byte buffer = appendGauge(buffer, "namespace.", []string{"global:tag"}, "gauge", 1., []string{"tag:tag"}, 1) diff --git a/statsd/metrics.go b/statsd/metrics.go index de3b448c7..99ed4da53 100644 --- a/statsd/metrics.go +++ b/statsd/metrics.go @@ -117,3 +117,65 @@ func (s *setMetric) flushUnsafe() []metric { } return metrics } + +// Histograms, Distributions and Timings + +type bufferedMetric struct { + sync.Mutex + + data []float64 + name string + // Histograms and Distributions store tags as one string since we need + // to compute its size multiple time when serializing. + tags string + mtype metricType +} + +func (s *bufferedMetric) sample(v float64) { + s.Lock() + defer s.Unlock() + s.data = append(s.data, v) +} + +func (s *bufferedMetric) flushUnsafe() metric { + return metric{ + metricType: s.mtype, + name: s.name, + stags: s.tags, + rate: 1, + fvalues: s.data, + } +} + +type histogramMetric = bufferedMetric + +func newHistogramMetric(name string, value float64, stringTags string) *histogramMetric { + return &histogramMetric{ + data: []float64{value}, + name: name, + tags: stringTags, + mtype: histogramAggregated, + } +} + +type distributionMetric = bufferedMetric + +func newDistributionMetric(name string, value float64, stringTags string) *distributionMetric { + return &distributionMetric{ + data: []float64{value}, + name: name, + tags: stringTags, + mtype: distributionAggregated, + } +} + +type timingMetric = bufferedMetric + +func newTimingMetric(name string, value float64, stringTags string) *timingMetric { + return &timingMetric{ + data: []float64{value}, + name: name, + tags: stringTags, + mtype: timingAggregated, + } +} diff --git a/statsd/metrics_test.go b/statsd/metrics_test.go index 6934109f4..41bb743de 100644 --- a/statsd/metrics_test.go +++ b/statsd/metrics_test.go @@ -116,3 +116,117 @@ func TestFlushUnsafeSetMetricSample(t *testing.T) { assert.Equal(t, m[1].name, "test") assert.Equal(t, m[1].tags, []string{"tag1", "tag2"}) } + +func TestNewHistogramMetric(t *testing.T) { + s := newHistogramMetric("test", 1.0, "tag1,tag2") + assert.Equal(t, s.data, []float64{1.0}) + assert.Equal(t, s.name, "test") + assert.Equal(t, s.tags, "tag1,tag2") + assert.Equal(t, s.mtype, histogramAggregated) +} + +func TestHistogramMetricSample(t *testing.T) { + s := newHistogramMetric("test", 1.0, "tag1,tag2") + s.sample(123.45) + assert.Equal(t, s.data, []float64{1.0, 123.45}) + assert.Equal(t, s.name, "test") + assert.Equal(t, s.tags, "tag1,tag2") + assert.Equal(t, s.mtype, histogramAggregated) +} + +func TestFlushUnsafeHistogramMetricSample(t *testing.T) { + s := newHistogramMetric("test", 1.0, "tag1,tag2") + m := s.flushUnsafe() + + assert.Equal(t, m.metricType, histogramAggregated) + assert.Equal(t, m.fvalues, []float64{1.0}) + assert.Equal(t, m.name, "test") + assert.Equal(t, m.stags, "tag1,tag2") + assert.Nil(t, m.tags) + + s.sample(21) + s.sample(123.45) + m = s.flushUnsafe() + + assert.Equal(t, m.metricType, histogramAggregated) + assert.Equal(t, m.fvalues, []float64{1.0, 21.0, 123.45}) + assert.Equal(t, m.name, "test") + assert.Equal(t, m.stags, "tag1,tag2") + assert.Nil(t, m.tags) +} + +func TestNewDistributionMetric(t *testing.T) { + s := newDistributionMetric("test", 1.0, "tag1,tag2") + assert.Equal(t, s.data, []float64{1.0}) + assert.Equal(t, s.name, "test") + assert.Equal(t, s.tags, "tag1,tag2") + assert.Equal(t, s.mtype, distributionAggregated) +} + +func TestDistributionMetricSample(t *testing.T) { + s := newDistributionMetric("test", 1.0, "tag1,tag2") + s.sample(123.45) + assert.Equal(t, s.data, []float64{1.0, 123.45}) + assert.Equal(t, s.name, "test") + assert.Equal(t, s.tags, "tag1,tag2") + assert.Equal(t, s.mtype, distributionAggregated) +} + +func TestFlushUnsafeDistributionMetricSample(t *testing.T) { + s := newDistributionMetric("test", 1.0, "tag1,tag2") + m := s.flushUnsafe() + + assert.Equal(t, m.metricType, distributionAggregated) + assert.Equal(t, m.fvalues, []float64{1.0}) + assert.Equal(t, m.name, "test") + assert.Equal(t, m.stags, "tag1,tag2") + assert.Nil(t, m.tags) + + s.sample(21) + s.sample(123.45) + m = s.flushUnsafe() + + assert.Equal(t, m.metricType, distributionAggregated) + assert.Equal(t, m.fvalues, []float64{1.0, 21.0, 123.45}) + assert.Equal(t, m.name, "test") + assert.Equal(t, m.stags, "tag1,tag2") + assert.Nil(t, m.tags) +} + +func TestNewTimingMetric(t *testing.T) { + s := newTimingMetric("test", 1.0, "tag1,tag2") + assert.Equal(t, s.data, []float64{1.0}) + assert.Equal(t, s.name, "test") + assert.Equal(t, s.tags, "tag1,tag2") + assert.Equal(t, s.mtype, timingAggregated) +} + +func TestTimingMetricSample(t *testing.T) { + s := newTimingMetric("test", 1.0, "tag1,tag2") + s.sample(123.45) + assert.Equal(t, s.data, []float64{1.0, 123.45}) + assert.Equal(t, s.name, "test") + assert.Equal(t, s.tags, "tag1,tag2") + assert.Equal(t, s.mtype, timingAggregated) +} + +func TestFlushUnsafeTimingMetricSample(t *testing.T) { + s := newTimingMetric("test", 1.0, "tag1,tag2") + m := s.flushUnsafe() + + assert.Equal(t, m.metricType, timingAggregated) + assert.Equal(t, m.fvalues, []float64{1.0}) + assert.Equal(t, m.name, "test") + assert.Equal(t, m.stags, "tag1,tag2") + assert.Nil(t, m.tags) + + s.sample(21) + s.sample(123.45) + m = s.flushUnsafe() + + assert.Equal(t, m.metricType, timingAggregated) + assert.Equal(t, m.fvalues, []float64{1.0, 21.0, 123.45}) + assert.Equal(t, m.name, "test") + assert.Equal(t, m.stags, "tag1,tag2") + assert.Nil(t, m.tags) +} diff --git a/statsd/options.go b/statsd/options.go index fef96c0ad..79187bd73 100644 --- a/statsd/options.go +++ b/statsd/options.go @@ -37,6 +37,8 @@ var ( DefaultAggregation = false // DefaultDevMode DefaultDevMode = false + // DefaultAgentMinVersion 5.0.0 is the oldest version still supported. + DefaultAgentMinVersion = "5.0.0" ) // Options contains the configuration options for a client. @@ -102,6 +104,11 @@ type Options struct { // DevMode enables the "dev" mode where the client sends much more // telemetry metrics to help troubleshooting the client behavior. DevMode bool + // AgentMinVersion represents the minimum Datadog Agent version used to + // receive metrics. Depending on the version different default + // values/behavior might be enable to leverage new feature and + // improvement in newer version of the Agent. + AgentMinVersion string } func resolveOptions(options []Option) (*Options, error) { @@ -121,6 +128,7 @@ func resolveOptions(options []Option) (*Options, error) { AggregationFlushInterval: DefaultAggregationFlushInterval, Aggregation: DefaultAggregation, DevMode: DefaultDevMode, + AgentMinVersion: DefaultAgentMinVersion, } for _, option := range options { @@ -293,3 +301,15 @@ func WithoutDevMode() Option { return nil } } + +// WithAgentMinVersion set the minimum version of the Agent receiving points +// from the client. It allows better defaults depending on the version of the +// Agent used. This will for example enable aggregation of histograms and +// distributions when WithClientSideAggregation is enabled along side an Agent +// >= 7.25. +func WithAgentMinVersion(version string) Option { + return func(o *Options) error { + o.AgentMinVersion = version + return nil + } +} diff --git a/statsd/options_test.go b/statsd/options_test.go index c4a4c2a9b..268eaa2dd 100644 --- a/statsd/options_test.go +++ b/statsd/options_test.go @@ -25,6 +25,7 @@ func TestDefaultOptions(t *testing.T) { assert.Equal(t, options.ChannelModeBufferSize, DefaultChannelModeBufferSize) assert.Equal(t, options.AggregationFlushInterval, DefaultAggregationFlushInterval) assert.Equal(t, options.Aggregation, DefaultAggregation) + assert.Equal(t, options.AgentMinVersion, DefaultAgentMinVersion) assert.Zero(t, options.TelemetryAddr) assert.False(t, options.DevMode) } @@ -42,6 +43,7 @@ func TestOptions(t *testing.T) { testChannelBufferSize := 500 testAggregationWindow := 10 * time.Second testTelemetryAddr := "localhost:1234" + testAgentMinVersion := "7.6.5" options, err := resolveOptions([]Option{ WithNamespace(testNamespace), @@ -60,6 +62,7 @@ func TestOptions(t *testing.T) { WithClientSideAggregation(), WithTelemetryAddr(testTelemetryAddr), WithDevMode(), + WithAgentMinVersion(testAgentMinVersion), }) assert.NoError(t, err) @@ -79,6 +82,7 @@ func TestOptions(t *testing.T) { assert.Equal(t, options.Aggregation, true) assert.Equal(t, options.TelemetryAddr, testTelemetryAddr) assert.True(t, options.DevMode) + assert.Equal(t, options.AgentMinVersion, testAgentMinVersion) } func TestResetOptions(t *testing.T) { diff --git a/statsd/statsd.go b/statsd/statsd.go index 42e4a4e9e..4d696ce02 100644 --- a/statsd/statsd.go +++ b/statsd/statsd.go @@ -6,19 +6,6 @@ adding tags and histograms and pushing upstream to Datadog. Refer to http://docs.datadoghq.com/guides/dogstatsd/ for information about DogStatsD. -Example Usage: - - // Create the client - c, err := statsd.New("127.0.0.1:8125") - if err != nil { - log.Fatal(err) - } - // Prefix every metric with the app name - c.Namespace = "flubber." - // Send the EC2 availability zone as a tag with every metric - c.Tags = append(c.Tags, "us-east-1a") - err = c.Gauge("request.duration", 1.2, nil, 1) - statsd is based on go-statsd-client. */ package statsd @@ -89,9 +76,12 @@ const ( gauge metricType = iota count histogram + histogramAggregated distribution + distributionAggregated set timing + timingAggregated event serviceCheck ) @@ -114,11 +104,13 @@ type metric struct { globalTags []string name string fvalue float64 + fvalues []float64 ivalue int64 svalue string evalue *Event scvalue *ServiceCheck tags []string + stags string rate float64 } @@ -206,6 +198,7 @@ type Client struct { closerLock sync.Mutex receiveMode ReceivingMode agg *aggregator + aggHistDist *aggregator options []Option addrOption string } @@ -286,6 +279,11 @@ func newWithWriter(w statsdWriter, o *Options, writerName string) (*Client, erro w.SetWriteTimeout(o.WriteTimeoutUDS) + af, err := computeAgentFeature(o.AgentMinVersion) + if err != nil { + return nil, fmt.Errorf("invalid AgentMinVersion: %s", err) + } + c := Client{ Namespace: o.Namespace, Tags: o.Tags, @@ -294,6 +292,10 @@ func newWithWriter(w statsdWriter, o *Options, writerName string) (*Client, erro if o.Aggregation { c.agg = newAggregator(&c) c.agg.start(o.AggregationFlushInterval) + + if af.multiValuePerMessage { + c.aggHistDist = c.agg + } } // Inject values of DD_* environment variables as global tags. @@ -488,6 +490,9 @@ func (c *Client) Histogram(name string, value float64, tags []string, rate float return ErrNoClient } atomic.AddUint64(&c.metrics.TotalMetricsHistogram, 1) + if c.aggHistDist != nil { + return c.agg.histogram(name, value, tags) + } return c.send(metric{metricType: histogram, name: name, fvalue: value, tags: tags, rate: rate}) } @@ -497,6 +502,9 @@ func (c *Client) Distribution(name string, value float64, tags []string, rate fl return ErrNoClient } atomic.AddUint64(&c.metrics.TotalMetricsDistribution, 1) + if c.aggHistDist != nil { + return c.agg.distribution(name, value, tags) + } return c.send(metric{metricType: distribution, name: name, fvalue: value, tags: tags, rate: rate}) } @@ -534,6 +542,9 @@ func (c *Client) TimeInMilliseconds(name string, value float64, tags []string, r return ErrNoClient } atomic.AddUint64(&c.metrics.TotalMetricsTiming, 1) + if c.aggHistDist != nil { + return c.agg.timing(name, value, tags) + } return c.send(metric{metricType: timing, name: name, fvalue: value, tags: tags, rate: rate}) } diff --git a/statsd/telemetry.go b/statsd/telemetry.go index 946910253..3ff94f85e 100644 --- a/statsd/telemetry.go +++ b/statsd/telemetry.go @@ -128,6 +128,9 @@ func (t *telemetryClient) flush() []metric { telemetryCount("datadog.dogstatsd.client.aggregated_context_gauge", int64(aggMetrics.nbContextGauge)) telemetryCount("datadog.dogstatsd.client.aggregated_context_set", int64(aggMetrics.nbContextSet)) telemetryCount("datadog.dogstatsd.client.aggregated_context_count", int64(aggMetrics.nbContextCount)) + telemetryCount("datadog.dogstatsd.client.aggregated_context_histogram", int64(aggMetrics.nbContextHistogram)) + telemetryCount("datadog.dogstatsd.client.aggregated_context_distribution", int64(aggMetrics.nbContextDistribution)) + telemetryCount("datadog.dogstatsd.client.aggregated_context_timing", int64(aggMetrics.nbContextTiming)) } } diff --git a/statsd/telemetry_test.go b/statsd/telemetry_test.go index 5017b69bc..429dc6e7c 100644 --- a/statsd/telemetry_test.go +++ b/statsd/telemetry_test.go @@ -37,9 +37,12 @@ var devModeExpectedMetrics = map[string]int64{ } var devModeAggregationExpectedMetrics = map[string]int64{ - "datadog.dogstatsd.client.aggregated_context_gauge": 1, - "datadog.dogstatsd.client.aggregated_context_set": 1, - "datadog.dogstatsd.client.aggregated_context_count": 3, + "datadog.dogstatsd.client.aggregated_context_gauge": 1, + "datadog.dogstatsd.client.aggregated_context_set": 1, + "datadog.dogstatsd.client.aggregated_context_count": 3, + "datadog.dogstatsd.client.aggregated_context_histogram": 1, + "datadog.dogstatsd.client.aggregated_context_distribution": 1, + "datadog.dogstatsd.client.aggregated_context_timing": 2, } func TestNewTelemetry(t *testing.T) { @@ -139,7 +142,7 @@ func TestTelemetryWithGlobalTags(t *testing.T) { testTelemetry(t, telemetry, basicExpectedMetrics, expectedTelemetryTags) } -func TestTelemetryWithAggregation(t *testing.T) { +func TestTelemetryWithAggregationBasic(t *testing.T) { // disabling autoflush of the telemetry client, err := New("localhost:8125", WithoutTelemetry(), WithClientSideAggregation()) require.Nil(t, err) @@ -156,15 +159,32 @@ func TestTelemetryWithAggregation(t *testing.T) { testTelemetry(t, telemetry, expectedMetrics, basicExpectedTags) } +func TestTelemetryWithAggregationAllType(t *testing.T) { + // disabling autoflush of the telemetry + client, err := New("localhost:8125", WithoutTelemetry(), WithClientSideAggregation(), WithAgentMinVersion("7.25.0")) + require.Nil(t, err) + + telemetry := newTelemetryClient(client, "test_transport", false) + + expectedMetrics := map[string]int64{ + "datadog.dogstatsd.client.aggregated_context": 9, + } + for k, v := range basicExpectedMetrics { + expectedMetrics[k] = v + } + + testTelemetry(t, telemetry, expectedMetrics, basicExpectedTags) +} + func TestTelemetryWithAggregationDevMode(t *testing.T) { // disabling autoflush of the telemetry - client, err := New("localhost:8125", WithoutTelemetry(), WithClientSideAggregation(), WithDevMode()) + client, err := New("localhost:8125", WithoutTelemetry(), WithClientSideAggregation(), WithAgentMinVersion("7.25.0"), WithDevMode()) require.Nil(t, err) telemetry := newTelemetryClient(client, "test_transport", true) expectedMetrics := map[string]int64{ - "datadog.dogstatsd.client.aggregated_context": 5, + "datadog.dogstatsd.client.aggregated_context": 9, } for k, v := range basicExpectedMetrics { expectedMetrics[k] = v diff --git a/statsd/worker.go b/statsd/worker.go index 3fdf36344..81acb4ebe 100644 --- a/statsd/worker.go +++ b/statsd/worker.go @@ -65,6 +65,31 @@ func (w *worker) shouldSample(rate float64) bool { return true } +func (w *worker) writeAggregatedMetricUnsafe(m metric, metricSymbol []byte) error { + globalPos := 0 + + // first check how much data we can write to the buffer: + // +3 + len(metricSymbol) because the message will include '||#' before the tags + // +1 for the coma between the two set of tags + tagsSize := len(m.stags) + 4 + len(metricSymbol) + for _, t := range m.globalTags { + tagsSize += len(t) + 1 + } + + for { + pos, err := w.buffer.writeAggregated(metricSymbol, m.namespace, m.globalTags, m.name, m.fvalues[globalPos:], m.stags, tagsSize) + if err == errPartialWrite { + // We successfully wrote part of the histogram metrics. + // We flush the current buffer and finish the histogram + // in a new one. + w.flushUnsafe() + globalPos += pos + } else { + return err + } + } +} + func (w *worker) writeMetricUnsafe(m metric) error { switch m.metricType { case gauge: @@ -83,6 +108,12 @@ func (w *worker) writeMetricUnsafe(m metric) error { return w.buffer.writeEvent(*m.evalue, m.globalTags) case serviceCheck: return w.buffer.writeServiceCheck(*m.scvalue, m.globalTags) + case histogramAggregated: + return w.writeAggregatedMetricUnsafe(m, histogramSymbol) + case distributionAggregated: + return w.writeAggregatedMetricUnsafe(m, distributionSymbol) + case timingAggregated: + return w.writeAggregatedMetricUnsafe(m, timingSymbol) default: return nil } diff --git a/statsd/worker_test.go b/statsd/worker_test.go new file mode 100644 index 000000000..8cac98644 --- /dev/null +++ b/statsd/worker_test.go @@ -0,0 +1,244 @@ +package statsd + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func initWorker(bufferSize int) (*bufferPool, *sender, *worker) { + pool := newBufferPool(10, bufferSize, 5) + // manually create the sender so the sender loop is not started. All we + // need is the queue + s := &sender{ + queue: make(chan *statsdBuffer, 10), + pool: pool, + } + + w := newWorker(pool, s) + return pool, s, w +} + +func TestWorkerGauge(t *testing.T) { + _, s, w := initWorker(100) + + m := metric{ + metricType: gauge, + namespace: "namespace.", + globalTags: []string{"globalTags", "globalTags2"}, + name: "test_gauge", + fvalue: 21, + tags: []string{"tag1", "tag2"}, + rate: 1, + } + err := w.processMetric(m) + assert.Nil(t, err) + + w.flush() + data := <-s.queue + assert.Equal(t, "namespace.test_gauge:21|g|#globalTags,globalTags2,tag1,tag2", string(data.buffer)) +} + +func TestWorkerCount(t *testing.T) { + _, s, w := initWorker(100) + + m := metric{ + metricType: count, + namespace: "namespace.", + globalTags: []string{"globalTags", "globalTags2"}, + name: "test_count", + ivalue: 21, + tags: []string{"tag1", "tag2"}, + rate: 1, + } + err := w.processMetric(m) + assert.Nil(t, err) + + w.flush() + data := <-s.queue + assert.Equal(t, "namespace.test_count:21|c|#globalTags,globalTags2,tag1,tag2", string(data.buffer)) +} + +func TestWorkerHistogram(t *testing.T) { + _, s, w := initWorker(100) + + m := metric{ + metricType: histogram, + namespace: "namespace.", + globalTags: []string{"globalTags", "globalTags2"}, + name: "test_histogram", + fvalue: 21, + tags: []string{"tag1", "tag2"}, + rate: 1, + } + err := w.processMetric(m) + assert.Nil(t, err) + + w.flush() + data := <-s.queue + assert.Equal(t, "namespace.test_histogram:21|h|#globalTags,globalTags2,tag1,tag2", string(data.buffer)) +} + +func TestWorkerDistribution(t *testing.T) { + _, s, w := initWorker(100) + + m := metric{ + metricType: distribution, + namespace: "namespace.", + globalTags: []string{"globalTags", "globalTags2"}, + name: "test_distribution", + fvalue: 21, + tags: []string{"tag1", "tag2"}, + rate: 1, + } + err := w.processMetric(m) + assert.Nil(t, err) + + w.flush() + data := <-s.queue + assert.Equal(t, "namespace.test_distribution:21|d|#globalTags,globalTags2,tag1,tag2", string(data.buffer)) +} + +func TestWorkerSet(t *testing.T) { + _, s, w := initWorker(100) + + m := metric{ + metricType: set, + namespace: "namespace.", + globalTags: []string{"globalTags", "globalTags2"}, + name: "test_set", + svalue: "value:1", + tags: []string{"tag1", "tag2"}, + rate: 1, + } + err := w.processMetric(m) + assert.Nil(t, err) + + w.flush() + data := <-s.queue + assert.Equal(t, "namespace.test_set:value:1|s|#globalTags,globalTags2,tag1,tag2", string(data.buffer)) +} + +func TestWorkerTiming(t *testing.T) { + _, s, w := initWorker(100) + + m := metric{ + metricType: timing, + namespace: "namespace.", + globalTags: []string{"globalTags", "globalTags2"}, + name: "test_timing", + fvalue: 1.2, + tags: []string{"tag1", "tag2"}, + rate: 1, + } + err := w.processMetric(m) + assert.Nil(t, err) + + w.flush() + data := <-s.queue + assert.Equal(t, "namespace.test_timing:1.200000|ms|#globalTags,globalTags2,tag1,tag2", string(data.buffer)) +} + +func TestWorkerHistogramAggregated(t *testing.T) { + _, s, w := initWorker(100) + + m := metric{ + metricType: histogramAggregated, + namespace: "namespace.", + globalTags: []string{"globalTags", "globalTags2"}, + name: "test_histogram", + fvalues: []float64{1.2}, + stags: "tag1,tag2", + rate: 1, + } + err := w.processMetric(m) + assert.Nil(t, err) + + w.flush() + data := <-s.queue + assert.Equal(t, "namespace.test_histogram:1.2|h|#globalTags,globalTags2,tag1,tag2", string(data.buffer)) +} + +func TestWorkerHistogramAggregatedMultiple(t *testing.T) { + _, s, w := initWorker(100) + + m := metric{ + metricType: histogramAggregated, + namespace: "namespace.", + globalTags: []string{"globalTags", "globalTags2"}, + name: "test_histogram", + fvalues: []float64{1.1, 2.2, 3.3, 4.4}, + stags: "tag1,tag2", + rate: 1, + } + err := w.processMetric(m) + assert.Nil(t, err) + + w.flush() + data := <-s.queue + assert.Equal(t, "namespace.test_histogram:1.1:2.2:3.3:4.4|h|#globalTags,globalTags2,tag1,tag2", string(data.buffer)) + + // reducing buffer size so not all values fit in one packet + _, s, w = initWorker(70) + + err = w.processMetric(m) + assert.Nil(t, err) + + w.flush() + data = <-s.queue + assert.Equal(t, "namespace.test_histogram:1.1:2.2|h|#globalTags,globalTags2,tag1,tag2", string(data.buffer)) + data = <-s.queue + assert.Equal(t, "namespace.test_histogram:3.3:4.4|h|#globalTags,globalTags2,tag1,tag2", string(data.buffer)) +} + +func TestWorkerDistributionAggregated(t *testing.T) { + _, s, w := initWorker(100) + + m := metric{ + metricType: distributionAggregated, + namespace: "namespace.", + globalTags: []string{"globalTags", "globalTags2"}, + name: "test_distribution", + fvalues: []float64{1.2}, + stags: "tag1,tag2", + rate: 1, + } + err := w.processMetric(m) + assert.Nil(t, err) + + w.flush() + data := <-s.queue + assert.Equal(t, "namespace.test_distribution:1.2|d|#globalTags,globalTags2,tag1,tag2", string(data.buffer)) +} + +func TestWorkerDistributionAggregatedMultiple(t *testing.T) { + _, s, w := initWorker(100) + + m := metric{ + metricType: distributionAggregated, + namespace: "namespace.", + globalTags: []string{"globalTags", "globalTags2"}, + name: "test_distribution", + fvalues: []float64{1.1, 2.2, 3.3, 4.4}, + stags: "tag1,tag2", + rate: 1, + } + err := w.processMetric(m) + assert.Nil(t, err) + + w.flush() + data := <-s.queue + assert.Equal(t, "namespace.test_distribution:1.1:2.2:3.3:4.4|d|#globalTags,globalTags2,tag1,tag2", string(data.buffer)) + + // reducing buffer size so not all values fit in one packet + _, s, w = initWorker(72) + + err = w.processMetric(m) + assert.Nil(t, err) + + w.flush() + data = <-s.queue + assert.Equal(t, "namespace.test_distribution:1.1:2.2|d|#globalTags,globalTags2,tag1,tag2", string(data.buffer)) + data = <-s.queue + assert.Equal(t, "namespace.test_distribution:3.3:4.4|d|#globalTags,globalTags2,tag1,tag2", string(data.buffer)) +}