Skip to content

Commit

Permalink
Client side aggregation for distribution, histogram and timing
Browse files Browse the repository at this point in the history
  • Loading branch information
hush-hush committed Jan 5, 2021
1 parent 70f33d5 commit ea6839b
Show file tree
Hide file tree
Showing 17 changed files with 987 additions and 78 deletions.
33 changes: 33 additions & 0 deletions statsd/agent_version.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package statsd

import (
"fmt"

"github.com/Masterminds/semver"
)

var (
// multiple value per message was introduce in Dogstatsd protocol 1.1
// implemented since Agent 6.25+ and 7.25+
multiValuePerMessageCond *semver.Constraints
)

func init() {
multiValuePerMessageCond, _ = semver.NewConstraint(">= 6.25 < 7.0.0 || >= 7.25")
}

type agentFeatures struct {
multiValuePerMessage bool
}

func computeAgentFeature(version string) (*agentFeatures, error) {
af := agentFeatures{}
v, err := semver.NewVersion(version)
if err != nil {
return nil, fmt.Errorf("could not parse version '%s': %s", version, err)
}

af.multiValuePerMessage = multiValuePerMessageCond.Check(v)

return &af, nil
}
21 changes: 21 additions & 0 deletions statsd/agent_version_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package statsd

import (
"testing"

"github.com/stretchr/testify/assert"
)

func TestAgentFeatureMultipleValue(t *testing.T) {
af, err := computeAgentFeature("6.25.0")
assert.Nil(t, err)
assert.True(t, af.multiValuePerMessage)

af, err = computeAgentFeature("7.25.0")
assert.Nil(t, err)
assert.True(t, af.multiValuePerMessage)

af, err = computeAgentFeature("7.24.0")
assert.Nil(t, err)
assert.False(t, af.multiValuePerMessage)
}
162 changes: 133 additions & 29 deletions statsd/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,23 +8,35 @@ import (
)

type (
countsMap map[string]*countMetric
gaugesMap map[string]*gaugeMetric
setsMap map[string]*setMetric
countsMap map[string]*countMetric
gaugesMap map[string]*gaugeMetric
setsMap map[string]*setMetric
histogramMap map[string]*histogramMetric
distributionMap map[string]*distributionMetric
timingMap map[string]*timingMetric
)

type aggregator struct {
nbContextGauge int32
nbContextCount int32
nbContextSet int32

countsM sync.RWMutex
gaugesM sync.RWMutex
setsM sync.RWMutex

gauges gaugesMap
counts countsMap
sets setsMap
nbContextGauge int32
nbContextCount int32
nbContextSet int32
nbContextHistogram int32
nbContextDistribution int32
nbContextTiming int32

countsM sync.RWMutex
gaugesM sync.RWMutex
setsM sync.RWMutex
histogramsM sync.RWMutex
distributionM sync.RWMutex
timingM sync.RWMutex

gauges gaugesMap
counts countsMap
sets setsMap
histograms histogramMap
distributions distributionMap
timings timingMap

closed chan struct{}
exited chan struct{}
Expand All @@ -33,20 +45,26 @@ type aggregator struct {
}

type aggregatorMetrics struct {
nbContext int32
nbContextGauge int32
nbContextCount int32
nbContextSet int32
nbContext int32
nbContextGauge int32
nbContextCount int32
nbContextSet int32
nbContextHistogram int32
nbContextDistribution int32
nbContextTiming int32
}

func newAggregator(c *Client) *aggregator {
return &aggregator{
client: c,
counts: countsMap{},
gauges: gaugesMap{},
sets: setsMap{},
closed: make(chan struct{}),
exited: make(chan struct{}),
client: c,
counts: countsMap{},
gauges: gaugesMap{},
sets: setsMap{},
histograms: histogramMap{},
distributions: distributionMap{},
timings: timingMap{},
closed: make(chan struct{}),
exited: make(chan struct{}),
}
}

Expand Down Expand Up @@ -84,12 +102,15 @@ func (a *aggregator) flushTelemetryMetrics() *aggregatorMetrics {
}

am := &aggregatorMetrics{
nbContextGauge: atomic.SwapInt32(&a.nbContextGauge, 0),
nbContextCount: atomic.SwapInt32(&a.nbContextCount, 0),
nbContextSet: atomic.SwapInt32(&a.nbContextSet, 0),
nbContextGauge: atomic.SwapInt32(&a.nbContextGauge, 0),
nbContextCount: atomic.SwapInt32(&a.nbContextCount, 0),
nbContextSet: atomic.SwapInt32(&a.nbContextSet, 0),
nbContextHistogram: atomic.SwapInt32(&a.nbContextHistogram, 0),
nbContextDistribution: atomic.SwapInt32(&a.nbContextDistribution, 0),
nbContextTiming: atomic.SwapInt32(&a.nbContextTiming, 0),
}

am.nbContext = am.nbContextGauge + am.nbContextCount + am.nbContextSet
am.nbContext = am.nbContextGauge + am.nbContextCount + am.nbContextSet + am.nbContextHistogram + am.nbContextDistribution + am.nbContextTiming
return am
}

Expand Down Expand Up @@ -126,14 +147,49 @@ func (a *aggregator) flushMetrics() []metric {
metrics = append(metrics, c.flushUnsafe())
}

a.histogramsM.Lock()
histograms := a.histograms
a.histograms = histogramMap{}
a.histogramsM.Unlock()

for _, h := range histograms {
metrics = append(metrics, h.flushUnsafe())
}

a.distributionM.Lock()
distributions := a.distributions
a.distributions = distributionMap{}
a.distributionM.Unlock()

for _, d := range distributions {
metrics = append(metrics, d.flushUnsafe())
}

a.timingM.Lock()
timings := a.timings
a.timings = timingMap{}
a.timingM.Unlock()

for _, t := range timings {
metrics = append(metrics, t.flushUnsafe())
}

atomic.AddInt32(&a.nbContextCount, int32(len(counts)))
atomic.AddInt32(&a.nbContextGauge, int32(len(gauges)))
atomic.AddInt32(&a.nbContextSet, int32(len(sets)))
atomic.AddInt32(&a.nbContextHistogram, int32(len(histograms)))
atomic.AddInt32(&a.nbContextDistribution, int32(len(distributions)))
atomic.AddInt32(&a.nbContextTiming, int32(len(timings)))
return metrics
}

func getContext(name string, tags []string) string {
return name + ":" + strings.Join(tags, ",")
return name + ":" + strings.Join(tags, tagSeparatorSymbol)
}

func getContextAndTags(name string, tags []string) (string, string) {
stringTags := strings.Join(tags, tagSeparatorSymbol)
return name + ":" + stringTags, stringTags
}

func (a *aggregator) count(name string, value int64, tags []string) error {
Expand Down Expand Up @@ -185,3 +241,51 @@ func (a *aggregator) set(name string, value string, tags []string) error {
a.setsM.Unlock()
return nil
}

func (a *aggregator) histogram(name string, value float64, tags []string) error {
context, stringTags := getContextAndTags(name, tags)
a.histogramsM.RLock()
if histogram, found := a.histograms[context]; found {
histogram.sample(value)
a.histogramsM.RUnlock()
return nil
}
a.histogramsM.RUnlock()

a.histogramsM.Lock()
a.histograms[context] = newHistogramMetric(name, value, stringTags)
a.histogramsM.Unlock()
return nil
}

func (a *aggregator) distribution(name string, value float64, tags []string) error {
context, stringTags := getContextAndTags(name, tags)
a.distributionM.RLock()
if distribution, found := a.distributions[context]; found {
distribution.sample(value)
a.distributionM.RUnlock()
return nil
}
a.distributionM.RUnlock()

a.distributionM.Lock()
a.distributions[context] = newDistributionMetric(name, value, stringTags)
a.distributionM.Unlock()
return nil
}

func (a *aggregator) timing(name string, value float64, tags []string) error {
context, stringTags := getContextAndTags(name, tags)
a.timingM.RLock()
if distribution, found := a.timings[context]; found {
distribution.sample(value)
a.timingM.RUnlock()
return nil
}
a.timingM.RUnlock()

a.timingM.Lock()
a.timings[context] = newTimingMetric(name, value, stringTags)
a.timingM.Unlock()
return nil
}
Loading

0 comments on commit ea6839b

Please sign in to comment.