From a8af81c8c93e551e70101b72683e8d33c619130b Mon Sep 17 00:00:00 2001 From: Sven Rebhan <36194019+srebhan@users.noreply.github.com> Date: Thu, 12 Oct 2023 22:28:10 +0200 Subject: [PATCH] feat(aggregators.final): Allow to specify output strategy (#14062) --- plugins/aggregators/final/README.md | 27 ++++- plugins/aggregators/final/final.go | 39 +++++-- plugins/aggregators/final/final_test.go | 130 ++++++++++++++++++++++++ plugins/aggregators/final/sample.conf | 11 +- 4 files changed, 194 insertions(+), 13 deletions(-) diff --git a/plugins/aggregators/final/README.md b/plugins/aggregators/final/README.md index 563bac3b9ba07..4612d3e3392b5 100644 --- a/plugins/aggregators/final/README.md +++ b/plugins/aggregators/final/README.md @@ -29,12 +29,35 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details. period = "30s" ## If true, the original metric will be dropped by the ## aggregator and will not get sent to the output plugins. - drop_original = false + # drop_original = false ## The time that a series is not updated until considering it final. - series_timeout = "5m" + # series_timeout = "5m" + + ## Output strategy, supported values: + ## timeout -- output a metric if no new input arrived for `series_timeout`; + ## useful for filling gaps in input data + ## periodic -- ouput the last received metric every `period`; useful to + ## downsample the input data + # output_strategy = "timeout" ``` +### Output strategy + +By default (`output_strategy = "timeout"`) the plugin will only emit a metric +for the period if the last received one is older than the series_timeout. This +will not guarantee a regular output of a `final` metric e.g. if the +series-timeout is a multiple of the gathering interval for an input. In this +case metric sporadically arrive in the timeout phase of the period and emitting +the `final` metric is surpressed. +This can be helpful to fill in gaps in the data if no input arrived in time. + +Contrary to this, `output_strategy = "periodic"` will always output a `final` +metric at the end of the period irrespectively of when the last metric arrived, +the `series_timout` is ignored. +This is helpful if you for example want to downsample input data arriving at a +high rate and require a periodic output of the `final` metric. + ## Metrics Measurement and tags are unchanged, fields are emitted with the suffix diff --git a/plugins/aggregators/final/final.go b/plugins/aggregators/final/final.go index e1ff2b6813849..f22f39e101394 100644 --- a/plugins/aggregators/final/final.go +++ b/plugins/aggregators/final/final.go @@ -3,6 +3,7 @@ package final import ( _ "embed" + "fmt" "time" "github.com/influxdata/telegraf" @@ -14,7 +15,8 @@ import ( var sampleConfig string type Final struct { - SeriesTimeout config.Duration `toml:"series_timeout"` + OutputStrategy string `toml:"output_strategy"` + SeriesTimeout config.Duration `toml:"series_timeout"` // The last metric for all series which are active metricCache map[uint64]telegraf.Metric @@ -23,7 +25,6 @@ type Final struct { func NewFinal() *Final { return &Final{ SeriesTimeout: config.Duration(5 * time.Minute), - metricCache: make(map[uint64]telegraf.Metric), } } @@ -31,6 +32,23 @@ func (*Final) SampleConfig() string { return sampleConfig } +func (m *Final) Init() error { + // Check options and set defaults + switch m.OutputStrategy { + case "": + m.OutputStrategy = "timeout" + case "timeout", "periodic": + // Do nothing, those are valid + default: + return fmt.Errorf("invalid 'output_strategy': %q", m.OutputStrategy) + } + + // Initialize the cache + m.metricCache = make(map[uint64]telegraf.Metric) + + return nil +} + func (m *Final) Add(in telegraf.Metric) { id := in.HashID() m.metricCache[id] = in @@ -41,14 +59,17 @@ func (m *Final) Push(acc telegraf.Accumulator) { acc.SetPrecision(time.Nanosecond) for id, metric := range m.metricCache { - if time.Since(metric.Time()) > time.Duration(m.SeriesTimeout) { - fields := map[string]interface{}{} - for _, field := range metric.FieldList() { - fields[field.Key+"_final"] = field.Value - } - acc.AddFields(metric.Name(), fields, metric.Tags(), metric.Time()) - delete(m.metricCache, id) + if m.OutputStrategy == "timeout" && time.Since(metric.Time()) <= time.Duration(m.SeriesTimeout) { + // We output on timeout but the last metric of the series was + // younger than that. So skip the output for this period. + continue + } + fields := map[string]interface{}{} + for _, field := range metric.FieldList() { + fields[field.Key+"_final"] = field.Value } + acc.AddFields(metric.Name(), fields, metric.Tags(), metric.Time()) + delete(m.metricCache, id) } } diff --git a/plugins/aggregators/final/final_test.go b/plugins/aggregators/final/final_test.go index 6b0c6e8e38c24..5a1a757ce594e 100644 --- a/plugins/aggregators/final/final_test.go +++ b/plugins/aggregators/final/final_test.go @@ -8,11 +8,13 @@ import ( "github.com/influxdata/telegraf/config" "github.com/influxdata/telegraf/metric" "github.com/influxdata/telegraf/testutil" + "github.com/stretchr/testify/require" ) func TestSimple(t *testing.T) { acc := testutil.Accumulator{} final := NewFinal() + require.NoError(t, final.Init()) tags := map[string]string{"foo": "bar"} m1 := metric.New("m1", @@ -48,6 +50,7 @@ func TestSimple(t *testing.T) { func TestTwoTags(t *testing.T) { acc := testutil.Accumulator{} final := NewFinal() + require.NoError(t, final.Init()) tags1 := map[string]string{"foo": "bar"} tags2 := map[string]string{"foo": "baz"} @@ -94,6 +97,7 @@ func TestLongDifference(t *testing.T) { acc := testutil.Accumulator{} final := NewFinal() final.SeriesTimeout = config.Duration(30 * time.Second) + require.NoError(t, final.Init()) tags := map[string]string{"foo": "bar"} now := time.Now() @@ -142,3 +146,129 @@ func TestLongDifference(t *testing.T) { } testutil.RequireMetricsEqual(t, expected, acc.GetTelegrafMetrics(), testutil.SortMetrics()) } + +func TestOutputStrategyInvalid(t *testing.T) { + final := &Final{ + OutputStrategy: "no way", + SeriesTimeout: config.Duration(30 * time.Second), + } + require.ErrorContains(t, final.Init(), `invalid 'output_strategy'`) +} + +func TestOutputStrategyTimeout(t *testing.T) { + final := &Final{ + OutputStrategy: "timeout", + SeriesTimeout: config.Duration(30 * time.Second), + } + require.NoError(t, final.Init()) + + now := time.Now() + tags := map[string]string{"foo": "bar"} + m1 := metric.New("m", + tags, + map[string]interface{}{"a": int64(1)}, + now.Add(time.Second*-290)) + m2 := metric.New("m", + tags, + map[string]interface{}{"a": int64(2)}, + now.Add(time.Second*-275)) + m3 := metric.New("m", + tags, + map[string]interface{}{"a": int64(3)}, + now.Add(time.Second*-100)) + m4 := metric.New("m", + tags, + map[string]interface{}{"a": int64(4)}, + now.Add(time.Second*-20)) + + var acc testutil.Accumulator + final.Add(m1) + final.Add(m2) + final.Push(&acc) + final.Add(m3) + final.Push(&acc) + final.Add(m4) + final.Push(&acc) + + expected := []telegraf.Metric{ + testutil.MustMetric( + "m", + tags, + map[string]interface{}{ + "a_final": 2, + }, + now.Add(time.Second*-275), + ), + testutil.MustMetric( + "m", + tags, + map[string]interface{}{ + "a_final": 3, + }, + now.Add(time.Second*-100), + ), + } + testutil.RequireMetricsEqual(t, expected, acc.GetTelegrafMetrics(), testutil.SortMetrics()) +} + +func TestOutputStrategyPeriodic(t *testing.T) { + final := &Final{ + OutputStrategy: "periodic", + SeriesTimeout: config.Duration(30 * time.Second), + } + require.NoError(t, final.Init()) + + now := time.Now() + tags := map[string]string{"foo": "bar"} + m1 := metric.New("m", + tags, + map[string]interface{}{"a": int64(1)}, + now.Add(time.Second*-290)) + m2 := metric.New("m", + tags, + map[string]interface{}{"a": int64(2)}, + now.Add(time.Second*-275)) + m3 := metric.New("m", + tags, + map[string]interface{}{"a": int64(3)}, + now.Add(time.Second*-100)) + m4 := metric.New("m", + tags, + map[string]interface{}{"a": int64(4)}, + now.Add(time.Second*-20)) + + var acc testutil.Accumulator + final.Add(m1) + final.Add(m2) + final.Push(&acc) + final.Add(m3) + final.Push(&acc) + final.Add(m4) + final.Push(&acc) + + expected := []telegraf.Metric{ + metric.New( + "m", + tags, + map[string]interface{}{ + "a_final": 2, + }, + now.Add(time.Second*-275), + ), + metric.New( + "m", + tags, + map[string]interface{}{ + "a_final": 3, + }, + now.Add(time.Second*-100), + ), + metric.New( + "m", + tags, + map[string]interface{}{"a_final": int64(4)}, + now.Add(time.Second*-20), + ), + } + testutil.RequireMetricsEqual(t, expected, acc.GetTelegrafMetrics(), testutil.SortMetrics()) +} diff --git a/plugins/aggregators/final/sample.conf b/plugins/aggregators/final/sample.conf index e9226c8ce175b..27fd7396f5727 100644 --- a/plugins/aggregators/final/sample.conf +++ b/plugins/aggregators/final/sample.conf @@ -4,7 +4,14 @@ period = "30s" ## If true, the original metric will be dropped by the ## aggregator and will not get sent to the output plugins. - drop_original = false + # drop_original = false ## The time that a series is not updated until considering it final. - series_timeout = "5m" + # series_timeout = "5m" + + ## Output strategy, supported values: + ## timeout -- output a metric if no new input arrived for `series_timeout`; + ## useful for filling gaps in input data + ## periodic -- ouput the last received metric every `period`; useful to + ## downsample the input data + # output_strategy = "timeout"