Skip to content

Commit

Permalink
feat: add temporality tag for statsd counters
Browse files Browse the repository at this point in the history
  • Loading branch information
povilasv committed Apr 25, 2023
1 parent 302ac88 commit 931ded5
Show file tree
Hide file tree
Showing 4 changed files with 111 additions and 0 deletions.
6 changes: 6 additions & 0 deletions plugins/inputs/statsd/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,12 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details.
## Reset timings & histograms every interval (default=true)
delete_timings = true

## Enable start_time field, which adds the start time of the metric accumulation
## You should use this together with enable_temporality_tag when using OpenTelemetry output.
enable_start_time_field = false
## Enable temporality tag adds temporality=delta or temporality=commulative tag to the metrics.
enable_temporality_tag = false

## Percentiles to calculate for timing & histogram stats.
percentiles = [50.0, 90.0, 99.0, 99.9, 99.95, 100.0]

Expand Down
6 changes: 6 additions & 0 deletions plugins/inputs/statsd/sample.conf
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,12 @@
## Reset timings & histograms every interval (default=true)
delete_timings = true

## Enable start_time field, which adds the start time of the metric accumulation
## You should use this together with enable_temporality_tag when using OpenTelemetry output.
enable_start_time_field = false
## Enable temporality tag adds temporality=delta or temporality=commulative tag to the metrics.
enable_temporality_tag = false

## Percentiles to calculate for timing & histogram stats.
percentiles = [50.0, 90.0, 99.0, 99.9, 99.95, 100.0]

Expand Down
34 changes: 34 additions & 0 deletions plugins/inputs/statsd/statsd.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,9 @@ type Statsd struct {
DeleteTimings bool `toml:"delete_timings"`
ConvertNames bool `toml:"convert_names" deprecated:"0.12.0;2.0.0;use 'metric_separator' instead"`

EnableStartTimeField bool `toml:"enable_start_time_field"`
EnableTemporalityTag bool `toml:"enable_temporality_tag"`

// MetricSeparator is the separator between parts of the metric name.
MetricSeparator string `toml:"metric_separator"`
// This flag enables parsing of tags in the dogstatsd extension to the
Expand Down Expand Up @@ -156,6 +159,8 @@ type Statsd struct {
UDPBytesRecv selfstat.Stat
ParseTimeNS selfstat.Stat
PendingMessages selfstat.Stat

lastGatherTime time.Time
}

type input struct {
Expand Down Expand Up @@ -226,6 +231,9 @@ func (s *Statsd) Gather(acc telegraf.Accumulator) error {
fields := map[string]interface{}{
defaultFieldName: m.value,
}
if s.EnableStartTimeField {
fields["start_time"] = s.lastGatherTime.Format(time.RFC3339)
}
acc.AddFields(m.name, fields, m.tags, now)
}
s.distributions = make([]cacheddistributions, 0)
Expand All @@ -252,6 +260,9 @@ func (s *Statsd) Gather(acc telegraf.Accumulator) error {
fields[name] = stats.Percentile(float64(percentile))
}
}
if s.EnableStartTimeField {
fields["start_time"] = s.lastGatherTime.Format(time.RFC3339)
}

acc.AddFields(m.name, fields, m.tags, now)
}
Expand All @@ -260,13 +271,21 @@ func (s *Statsd) Gather(acc telegraf.Accumulator) error {
}

for _, m := range s.gauges {
if s.EnableStartTimeField && m.fields != nil {
m.fields["start_time"] = s.lastGatherTime.Format(time.RFC3339)
}

acc.AddGauge(m.name, m.fields, m.tags, now)
}
if s.DeleteGauges {
s.gauges = make(map[string]cachedgauge)
}

for _, m := range s.counters {
if s.EnableStartTimeField && m.fields != nil {
m.fields["start_time"] = s.lastGatherTime.Format(time.RFC3339)
}

acc.AddCounter(m.name, m.fields, m.tags, now)
}
if s.DeleteCounters {
Expand All @@ -278,6 +297,10 @@ func (s *Statsd) Gather(acc telegraf.Accumulator) error {
for field, set := range m.fields {
fields[field] = int64(len(set))
}
if s.EnableStartTimeField {
fields["start_time"] = s.lastGatherTime.Format(time.RFC3339)
}

acc.AddFields(m.name, fields, m.tags, now)
}
if s.DeleteSets {
Expand All @@ -286,6 +309,7 @@ func (s *Statsd) Gather(acc telegraf.Accumulator) error {

s.expireCachedMetrics()

s.lastGatherTime = now
return nil
}

Expand All @@ -297,6 +321,7 @@ func (s *Statsd) Start(ac telegraf.Accumulator) error {
s.acc = ac

// Make data structures
s.lastGatherTime = time.Now()
s.gauges = make(map[string]cachedgauge)
s.counters = make(map[string]cachedcounter)
s.sets = make(map[string]cachedset)
Expand All @@ -305,6 +330,7 @@ func (s *Statsd) Start(ac telegraf.Accumulator) error {

s.Lock()
defer s.Unlock()

//
tags := map[string]string{
"address": s.ServiceAddress,
Expand Down Expand Up @@ -644,6 +670,14 @@ func (s *Statsd) parseStatsdLine(line string) error {
switch m.mtype {
case "c":
m.tags["metric_type"] = "counter"

if s.EnableTemporalityTag {
if s.DeleteCounters {
m.tags["temporality"] = "delta"
} else {
m.tags["temporality"] = "cumulative"
}
}
case "g":
m.tags["metric_type"] = "gauge"
case "s":
Expand Down
65 changes: 65 additions & 0 deletions plugins/inputs/statsd/statsd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1995,3 +1995,68 @@ func TestParse_InvalidAndRecoverIntegration(t *testing.T) {

require.NoError(t, conn.Close())
}

func TestParse_DeltaCounter(t *testing.T) {
statsd := Statsd{
Log: testutil.Logger{},
Protocol: "tcp",
ServiceAddress: "localhost:8125",
AllowedPendingMessages: 10000,
MaxTCPConnections: 250,
TCPKeepAlive: true,
NumberWorkerThreads: 5,
// Delete Counters causes Delta temporality to be added
DeleteCounters: true,
lastGatherTime: time.Now(),
EnableTemporalityTag: true,
EnableStartTimeField: true,
}

acc := &testutil.Accumulator{}
require.NoError(t, statsd.Start(acc))
defer statsd.Stop()

addr := statsd.TCPlistener.Addr().String()
conn, err := net.Dial("tcp", addr)
require.NoError(t, err)

_, err = conn.Write([]byte("cpu.time_idle:42|c\n"))
require.NoError(t, err)

require.Eventuallyf(t, func() bool {
require.NoError(t, statsd.Gather(acc))
acc.Lock()
defer acc.Unlock()

fmt.Println(acc.NMetrics())
expected := []telegraf.Metric{
testutil.MustMetric(
"cpu_time_idle",
map[string]string{
"metric_type": "counter",
"temporality": "delta",
},
map[string]interface{}{
"value": 42,
},
time.Now(),
telegraf.Counter,
),
}
got := acc.GetTelegrafMetrics()
testutil.RequireMetricsEqual(t, expected, got, testutil.IgnoreTime(), testutil.IgnoreFields("start_time"))

startTime, ok := got[0].GetField("start_time")
require.True(t, ok, "expected start_time field")

startTimeStr, ok := startTime.(string)
require.True(t, ok, "expected start_time field to be a string")

_, err = time.Parse(time.RFC3339Nano, startTimeStr)
require.NoError(t, err, "execpted start_time field to be in RFC3339Nano format")

return acc.NMetrics() >= 1
}, time.Second, 100*time.Millisecond, "Expected 1 metric found %d", acc.NMetrics())

require.NoError(t, conn.Close())
}

0 comments on commit 931ded5

Please sign in to comment.