From caeafcb5c4a040ebd75e1341973de124f92ca251 Mon Sep 17 00:00:00 2001 From: Paolo Chila Date: Mon, 18 Nov 2024 19:10:57 +0100 Subject: [PATCH] move consecutiveFailures counter to metricSetWrapper.stat struct --- metricbeat/mb/module/wrapper.go | 39 ++++++++++--------- metricbeat/mb/module/wrapper_internal_test.go | 12 ++++++ 2 files changed, 32 insertions(+), 19 deletions(-) diff --git a/metricbeat/mb/module/wrapper.go b/metricbeat/mb/module/wrapper.go index b42b93d6b49..4681976f2e1 100644 --- a/metricbeat/mb/module/wrapper.go +++ b/metricbeat/mb/module/wrapper.go @@ -38,9 +38,10 @@ import ( const ( // Expvar metric names. - successesKey = "success" - failuresKey = "failures" - eventsKey = "events" + successesKey = "success" + failuresKey = "failures" + eventsKey = "events" + consecutiveFailuresKey = "consecutive_failures" // Failure threshold config key failureThresholdKey = "failure_threshold" @@ -75,17 +76,16 @@ type metricSetWrapper struct { periodic bool // Set to true if this metricset is a periodic fetcher failureThreshold uint // threshold of consecutive errors needed to set the stream as degraded - - consecutiveErrors uint // consecutive errors counter } // stats bundles common metricset stats. type stats struct { - key string // full stats key - ref uint32 // number of modules/metricsets reusing stats instance - success *monitoring.Int // Total success events. - failures *monitoring.Int // Total error events. - events *monitoring.Int // Total events published. + key string // full stats key + ref uint32 // number of modules/metricsets reusing stats instance + success *monitoring.Int // Total success events. + failures *monitoring.Int // Total error events. + events *monitoring.Int // Total events published. + consecutiveFailures *monitoring.Uint // Consecutive failures fetching this metricset } // NewWrapper creates a new module and its associated metricsets based on the given configuration. @@ -313,20 +313,20 @@ func (msw *metricSetWrapper) Test(d testing.Driver) { func (msw *metricSetWrapper) handleFetchError(err error, reporter mb.PushReporterV2) { switch { case err == nil: - msw.consecutiveErrors = 0 + msw.stats.consecutiveFailures.Set(0) msw.module.UpdateStatus(status.Running, "") case errors.As(err, &mb.PartialMetricsError{}): reporter.Error(err) - msw.consecutiveErrors = 0 + msw.stats.consecutiveFailures.Set(0) // mark module as running if metrics are partially available and display the error message msw.module.UpdateStatus(status.Running, fmt.Sprintf("Error fetching data for metricset %s.%s: %v", msw.module.Name(), msw.MetricSet.Name(), err)) logp.Err("Error fetching data for metricset %s.%s: %s", msw.module.Name(), msw.Name(), err) default: reporter.Error(err) - msw.consecutiveErrors++ - if msw.failureThreshold > 0 && msw.consecutiveErrors >= msw.failureThreshold { + msw.stats.consecutiveFailures.Inc() + if msw.failureThreshold > 0 && msw.stats.consecutiveFailures != nil && uint(msw.stats.consecutiveFailures.Get()) >= msw.failureThreshold { // mark it as degraded for any other issue encountered msw.module.UpdateStatus(status.Degraded, fmt.Sprintf("Error fetching data for metricset %s.%s: %v", msw.module.Name(), msw.MetricSet.Name(), err)) } @@ -461,11 +461,12 @@ func getMetricSetStats(module, name string) *stats { reg := monitoring.Default.NewRegistry(key) s := &stats{ - key: key, - ref: 1, - success: monitoring.NewInt(reg, successesKey), - failures: monitoring.NewInt(reg, failuresKey), - events: monitoring.NewInt(reg, eventsKey), + key: key, + ref: 1, + success: monitoring.NewInt(reg, successesKey), + failures: monitoring.NewInt(reg, failuresKey), + events: monitoring.NewInt(reg, eventsKey), + consecutiveFailures: monitoring.NewUint(reg, consecutiveFailuresKey), } fetches[key] = s diff --git a/metricbeat/mb/module/wrapper_internal_test.go b/metricbeat/mb/module/wrapper_internal_test.go index 923f73febdb..a9b242e55e2 100644 --- a/metricbeat/mb/module/wrapper_internal_test.go +++ b/metricbeat/mb/module/wrapper_internal_test.go @@ -315,6 +315,12 @@ func TestWrapperHandleFetchErrorSync(t *testing.T) { // run metricset synchronously wrappedMetricSet := moduleWrapper.MetricSets()[0] + + t.Cleanup(func() { + // release stats structure across testcases + releaseStats(wrappedMetricSet.stats) + }) + for i := 0; i < tc.iterations; i++ { wrappedMetricSet.fetch(context.TODO(), mr) if tc.assertIteration != nil { @@ -537,6 +543,12 @@ func TestWrapperHandleFetchErrorSync(t *testing.T) { // run metricset synchronously wrappedMetricSet := moduleWrapper.MetricSets()[0] + + t.Cleanup(func() { + // release stats structure across testcases + releaseStats(wrappedMetricSet.stats) + }) + for i := 0; i < tc.iterations; i++ { wrappedMetricSet.fetch(context.TODO(), mr) if tc.assertIteration != nil {