Skip to content

Commit

Permalink
move consecutiveFailures counter to metricSetWrapper.stat struct
Browse files Browse the repository at this point in the history
  • Loading branch information
pchila committed Nov 18, 2024
1 parent b32f635 commit caeafcb
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 19 deletions.
39 changes: 20 additions & 19 deletions metricbeat/mb/module/wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,10 @@ import (

const (
// Expvar metric names.
successesKey = "success"
failuresKey = "failures"
eventsKey = "events"
successesKey = "success"
failuresKey = "failures"
eventsKey = "events"
consecutiveFailuresKey = "consecutive_failures"

// Failure threshold config key
failureThresholdKey = "failure_threshold"
Expand Down Expand Up @@ -75,17 +76,16 @@ type metricSetWrapper struct {

periodic bool // Set to true if this metricset is a periodic fetcher
failureThreshold uint // threshold of consecutive errors needed to set the stream as degraded

consecutiveErrors uint // consecutive errors counter
}

// stats bundles common metricset stats.
type stats struct {
key string // full stats key
ref uint32 // number of modules/metricsets reusing stats instance
success *monitoring.Int // Total success events.
failures *monitoring.Int // Total error events.
events *monitoring.Int // Total events published.
key string // full stats key
ref uint32 // number of modules/metricsets reusing stats instance
success *monitoring.Int // Total success events.
failures *monitoring.Int // Total error events.
events *monitoring.Int // Total events published.
consecutiveFailures *monitoring.Uint // Consecutive failures fetching this metricset
}

// NewWrapper creates a new module and its associated metricsets based on the given configuration.
Expand Down Expand Up @@ -313,20 +313,20 @@ func (msw *metricSetWrapper) Test(d testing.Driver) {
func (msw *metricSetWrapper) handleFetchError(err error, reporter mb.PushReporterV2) {
switch {
case err == nil:
msw.consecutiveErrors = 0
msw.stats.consecutiveFailures.Set(0)
msw.module.UpdateStatus(status.Running, "")

case errors.As(err, &mb.PartialMetricsError{}):
reporter.Error(err)
msw.consecutiveErrors = 0
msw.stats.consecutiveFailures.Set(0)
// mark module as running if metrics are partially available and display the error message
msw.module.UpdateStatus(status.Running, fmt.Sprintf("Error fetching data for metricset %s.%s: %v", msw.module.Name(), msw.MetricSet.Name(), err))
logp.Err("Error fetching data for metricset %s.%s: %s", msw.module.Name(), msw.Name(), err)

default:
reporter.Error(err)
msw.consecutiveErrors++
if msw.failureThreshold > 0 && msw.consecutiveErrors >= msw.failureThreshold {
msw.stats.consecutiveFailures.Inc()
if msw.failureThreshold > 0 && msw.stats.consecutiveFailures != nil && uint(msw.stats.consecutiveFailures.Get()) >= msw.failureThreshold {
// mark it as degraded for any other issue encountered
msw.module.UpdateStatus(status.Degraded, fmt.Sprintf("Error fetching data for metricset %s.%s: %v", msw.module.Name(), msw.MetricSet.Name(), err))
}
Expand Down Expand Up @@ -461,11 +461,12 @@ func getMetricSetStats(module, name string) *stats {

reg := monitoring.Default.NewRegistry(key)
s := &stats{
key: key,
ref: 1,
success: monitoring.NewInt(reg, successesKey),
failures: monitoring.NewInt(reg, failuresKey),
events: monitoring.NewInt(reg, eventsKey),
key: key,
ref: 1,
success: monitoring.NewInt(reg, successesKey),
failures: monitoring.NewInt(reg, failuresKey),
events: monitoring.NewInt(reg, eventsKey),
consecutiveFailures: monitoring.NewUint(reg, consecutiveFailuresKey),
}

fetches[key] = s
Expand Down
12 changes: 12 additions & 0 deletions metricbeat/mb/module/wrapper_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,12 @@ func TestWrapperHandleFetchErrorSync(t *testing.T) {

// run metricset synchronously
wrappedMetricSet := moduleWrapper.MetricSets()[0]

t.Cleanup(func() {
// release stats structure across testcases
releaseStats(wrappedMetricSet.stats)
})

for i := 0; i < tc.iterations; i++ {
wrappedMetricSet.fetch(context.TODO(), mr)
if tc.assertIteration != nil {
Expand Down Expand Up @@ -537,6 +543,12 @@ func TestWrapperHandleFetchErrorSync(t *testing.T) {

// run metricset synchronously
wrappedMetricSet := moduleWrapper.MetricSets()[0]

t.Cleanup(func() {
// release stats structure across testcases
releaseStats(wrappedMetricSet.stats)
})

for i := 0; i < tc.iterations; i++ {
wrappedMetricSet.fetch(context.TODO(), mr)
if tc.assertIteration != nil {
Expand Down

0 comments on commit caeafcb

Please sign in to comment.