Skip to content

Commit

Permalink
fix: add temporality and start_time tag for statsd metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
povilasv committed Apr 14, 2023
1 parent 290a0b0 commit 577179d
Show file tree
Hide file tree
Showing 3 changed files with 104 additions and 3 deletions.
23 changes: 23 additions & 0 deletions plugins/inputs/statsd/statsd.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,8 @@ type Statsd struct {
UDPBytesRecv selfstat.Stat
ParseTimeNS selfstat.Stat
PendingMessages selfstat.Stat

lastGatherTime time.Time
}

type input struct {
Expand Down Expand Up @@ -226,6 +228,9 @@ func (s *Statsd) Gather(acc telegraf.Accumulator) error {
fields := map[string]interface{}{
defaultFieldName: m.value,
}
if m.tags != nil {
m.tags["start_time"] = s.lastGatherTime.Format(time.RFC3339Nano)
}
acc.AddFields(m.name, fields, m.tags, now)
}
s.distributions = make([]cacheddistributions, 0)
Expand All @@ -252,6 +257,9 @@ func (s *Statsd) Gather(acc telegraf.Accumulator) error {
fields[name] = stats.Percentile(float64(percentile))
}
}
if m.tags != nil {
m.tags["start_time"] = s.lastGatherTime.Format(time.RFC3339Nano)
}

acc.AddFields(m.name, fields, m.tags, now)
}
Expand All @@ -260,13 +268,21 @@ func (s *Statsd) Gather(acc telegraf.Accumulator) error {
}

for _, m := range s.gauges {
if m.tags != nil {
m.tags["start_time"] = s.lastGatherTime.Format(time.RFC3339Nano)
}

acc.AddGauge(m.name, m.fields, m.tags, now)
}
if s.DeleteGauges {
s.gauges = make(map[string]cachedgauge)
}

for _, m := range s.counters {
if m.tags != nil {
m.tags["start_time"] = s.lastGatherTime.Format(time.RFC3339Nano)
}

acc.AddCounter(m.name, m.fields, m.tags, now)
}
if s.DeleteCounters {
Expand All @@ -278,6 +294,10 @@ func (s *Statsd) Gather(acc telegraf.Accumulator) error {
for field, set := range m.fields {
fields[field] = int64(len(set))
}
if m.tags != nil {
m.tags["start_time"] = s.lastGatherTime.Format(time.RFC3339Nano)
}

acc.AddFields(m.name, fields, m.tags, now)
}
if s.DeleteSets {
Expand All @@ -286,6 +306,7 @@ func (s *Statsd) Gather(acc telegraf.Accumulator) error {

s.expireCachedMetrics()

s.lastGatherTime = now
return nil
}

Expand All @@ -297,6 +318,7 @@ func (s *Statsd) Start(ac telegraf.Accumulator) error {
s.acc = ac

// Make data structures
s.lastGatherTime = time.Now()
s.gauges = make(map[string]cachedgauge)
s.counters = make(map[string]cachedcounter)
s.sets = make(map[string]cachedset)
Expand All @@ -305,6 +327,7 @@ func (s *Statsd) Start(ac telegraf.Accumulator) error {

s.Lock()
defer s.Unlock()

//
tags := map[string]string{
"address": s.ServiceAddress,
Expand Down
69 changes: 66 additions & 3 deletions plugins/inputs/statsd/statsd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -994,6 +994,7 @@ func TestParse_DataDogTags(t *testing.T) {
"environment": "prod",
"host": "localhost",
"metric_type": "counter",
"temporality": "cumulative",
},
map[string]interface{}{
"value": 1,
Expand Down Expand Up @@ -1070,6 +1071,7 @@ func TestParse_DataDogTags(t *testing.T) {
"cpu",
map[string]string{
"metric_type": "counter",
"temporality": "cumulative",
},
map[string]interface{}{
"value": 42,
Expand All @@ -1092,7 +1094,7 @@ func TestParse_DataDogTags(t *testing.T) {
require.NoError(t, s.Gather(&acc))

testutil.RequireMetricsEqual(t, tt.expected, acc.GetTelegrafMetrics(),
testutil.SortMetrics(), testutil.IgnoreTime())
testutil.SortMetrics(), testutil.IgnoreTime(), testutil.IgnoreTags("start_time"))
})
}
}
Expand Down Expand Up @@ -1196,6 +1198,7 @@ func TestCachesExpireAfterMaxTTL(t *testing.T) {
"valid",
map[string]string{
"metric_type": "counter",
"temporality": "cumulative",
},
map[string]interface{}{
"value": 90,
Expand All @@ -1207,6 +1210,7 @@ func TestCachesExpireAfterMaxTTL(t *testing.T) {
"valid",
map[string]string{
"metric_type": "counter",
"temporality": "cumulative",
},
map[string]interface{}{
"value": 90,
Expand All @@ -1218,6 +1222,7 @@ func TestCachesExpireAfterMaxTTL(t *testing.T) {
"valid",
map[string]string{
"metric_type": "counter",
"temporality": "cumulative",
},
map[string]interface{}{
"value": 45,
Expand All @@ -1228,6 +1233,7 @@ func TestCachesExpireAfterMaxTTL(t *testing.T) {
},
acc.GetTelegrafMetrics(),
testutil.IgnoreTime(),
testutil.IgnoreTags("start_time"),
)
}

Expand Down Expand Up @@ -1755,6 +1761,7 @@ func TestTCP(t *testing.T) {
"cpu_time_idle",
map[string]string{
"metric_type": "counter",
"temporality": "cumulative",
},
map[string]interface{}{
"value": 42,
Expand All @@ -1765,6 +1772,7 @@ func TestTCP(t *testing.T) {
},
acc.GetTelegrafMetrics(),
testutil.IgnoreTime(),
testutil.IgnoreTags("start_time"),
)
}

Expand Down Expand Up @@ -1800,6 +1808,7 @@ func TestUdp(t *testing.T) {
"cpu_time_idle",
map[string]string{
"metric_type": "counter",
"temporality": "cumulative",
},
map[string]interface{}{
"value": 42,
Expand All @@ -1810,6 +1819,7 @@ func TestUdp(t *testing.T) {
},
acc.GetTelegrafMetrics(),
testutil.IgnoreTime(),
testutil.IgnoreTags("start_time"),
)
}

Expand Down Expand Up @@ -1983,7 +1993,7 @@ func TestParse_InvalidAndRecoverIntegration(t *testing.T) {
"cpu_time_idle",
map[string]string{
"metric_type": "counter",
"aggregation": "cumulative",
"temporality": "cumulative",
},
map[string]interface{}{
"value": 42,
Expand All @@ -1992,7 +2002,60 @@ func TestParse_InvalidAndRecoverIntegration(t *testing.T) {
telegraf.Counter,
),
}
testutil.RequireMetricsEqual(t, expected, acc.GetTelegrafMetrics(), testutil.IgnoreTime())
testutil.RequireMetricsEqual(t, expected, acc.GetTelegrafMetrics(), testutil.IgnoreTime(), testutil.IgnoreTags("start_time"))

require.NoError(t, conn.Close())
}

func TestParse_DeltaCounter(t *testing.T) {
statsd := Statsd{
Log: testutil.Logger{},
Protocol: "tcp",
ServiceAddress: "localhost:8125",
AllowedPendingMessages: 10000,
MaxTCPConnections: 250,
TCPKeepAlive: true,
NumberWorkerThreads: 5,
// Delete Counters causes Delta temporality to be added
DeleteCounters: true,
lastGatherTime: time.Now(),
}

acc := &testutil.Accumulator{}
require.NoError(t, statsd.Start(acc))
defer statsd.Stop()

addr := statsd.TCPlistener.Addr().String()
conn, err := net.Dial("tcp", addr)
require.NoError(t, err)

_, err = conn.Write([]byte("cpu.time_idle:42|c\n"))
require.NoError(t, err)

require.Eventuallyf(t, func() bool {
require.NoError(t, statsd.Gather(acc))
acc.Lock()
defer acc.Unlock()

fmt.Println(acc.NMetrics())
expected := []telegraf.Metric{
testutil.MustMetric(
"cpu_time_idle",
map[string]string{
"metric_type": "counter",
"temporality": "delta",
},
map[string]interface{}{
"value": 42,
},
time.Now(),
telegraf.Counter,
),
}
testutil.RequireMetricsEqual(t, expected, acc.GetTelegrafMetrics(), testutil.IgnoreTime(), testutil.IgnoreTags("start_time"))

return acc.NMetrics() >= 1
}, time.Second, 100*time.Millisecond, "Expected 1 metric found %d", acc.NMetrics())

require.NoError(t, conn.Close())
}
15 changes: 15 additions & 0 deletions testutil/metric.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,21 @@ func IgnoreFields(names ...string) cmp.Option {
)
}

// IgnoreTags disables comparison of the tags with the given names.
// The field-names are case-sensitive!
func IgnoreTags(names ...string) cmp.Option {
return cmpopts.IgnoreSliceElements(
func(f *telegraf.Tag) bool {
for _, n := range names {
if f.Key == n {
return true
}
}
return false
},
)
}

// MetricEqual returns true if the metrics are equal.
func MetricEqual(expected, actual telegraf.Metric, opts ...cmp.Option) bool {
var lhs, rhs *metricDiff
Expand Down

0 comments on commit 577179d

Please sign in to comment.