Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Metricbeat: add configurable failure threshold before reporting streams as degraded #41570

Merged
merged 10 commits into from
Nov 19, 2024
113 changes: 69 additions & 44 deletions metricbeat/mb/module/wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,15 @@ import (
"github.com/elastic/elastic-agent-libs/testing"
)

// Expvar metric names.
const (
successesKey = "success"
failuresKey = "failures"
eventsKey = "events"
// Expvar metric names.
successesKey = "success"
failuresKey = "failures"
eventsKey = "events"
consecutiveFailuresKey = "consecutive_failures"

// Failure threshold config key
failureThresholdKey = "failure_threshold"
)

var (
Expand Down Expand Up @@ -70,16 +74,18 @@ type metricSetWrapper struct {
module *Wrapper // Parent Module.
stats *stats // stats for this MetricSet.

periodic bool // Set to true if this metricset is a periodic fetcher
periodic bool // Set to true if this metricset is a periodic fetcher
failureThreshold uint // threshold of consecutive errors needed to set the stream as degraded
}

// stats bundles common metricset stats.
type stats struct {
key string // full stats key
ref uint32 // number of modules/metricsets reusing stats instance
success *monitoring.Int // Total success events.
failures *monitoring.Int // Total error events.
events *monitoring.Int // Total events published.
key string // full stats key
ref uint32 // number of modules/metricsets reusing stats instance
success *monitoring.Int // Total success events.
failures *monitoring.Int // Total error events.
events *monitoring.Int // Total events published.
consecutiveFailures *monitoring.Uint // Consecutive failures fetching this metricset
}

// NewWrapper creates a new module and its associated metricsets based on the given configuration.
Expand All @@ -106,11 +112,28 @@ func createWrapper(module mb.Module, metricSets []mb.MetricSet, options ...Optio
applyOption(wrapper)
}

failureThreshold := uint(1)

var streamHealthSettings struct {
FailureThreshold *uint `config:"failure_threshold"`
}

err := module.UnpackConfig(&streamHealthSettings)

if err != nil {
return nil, fmt.Errorf("unpacking raw config: %w", err)
}

if streamHealthSettings.FailureThreshold != nil {
failureThreshold = *streamHealthSettings.FailureThreshold
}

for i, metricSet := range metricSets {
wrapper.metricSets[i] = &metricSetWrapper{
MetricSet: metricSet,
module: wrapper,
stats: getMetricSetStats(wrapper.Name(), metricSet.Name()),
MetricSet: metricSet,
module: wrapper,
stats: getMetricSetStats(wrapper.Name(), metricSet.Name()),
failureThreshold: failureThreshold,
}
}
return wrapper, nil
Expand Down Expand Up @@ -254,35 +277,11 @@ func (msw *metricSetWrapper) fetch(ctx context.Context, reporter reporter) {
case mb.ReportingMetricSetV2Error:
reporter.StartFetchTimer()
err := fetcher.Fetch(reporter.V2())
if err != nil {
reporter.V2().Error(err)
if errors.As(err, &mb.PartialMetricsError{}) {
// mark module as running if metrics are partially available and display the error message
msw.module.UpdateStatus(status.Running, fmt.Sprintf("Error fetching data for metricset %s.%s: %v", msw.module.Name(), msw.MetricSet.Name(), err))
} else {
// mark it as degraded for any other issue encountered
msw.module.UpdateStatus(status.Degraded, fmt.Sprintf("Error fetching data for metricset %s.%s: %v", msw.module.Name(), msw.MetricSet.Name(), err))
}
logp.Err("Error fetching data for metricset %s.%s: %s", msw.module.Name(), msw.Name(), err)
} else {
msw.module.UpdateStatus(status.Running, "")
}
msw.handleFetchError(err, reporter.V2())
case mb.ReportingMetricSetV2WithContext:
reporter.StartFetchTimer()
err := fetcher.Fetch(ctx, reporter.V2())
if err != nil {
reporter.V2().Error(err)
if errors.As(err, &mb.PartialMetricsError{}) {
// mark module as running if metrics are partially available and display the error message
msw.module.UpdateStatus(status.Running, fmt.Sprintf("Error fetching data for metricset %s.%s: %v", msw.module.Name(), msw.MetricSet.Name(), err))
} else {
// mark it as degraded for any other issue encountered
msw.module.UpdateStatus(status.Degraded, fmt.Sprintf("Error fetching data for metricset %s.%s: %v", msw.module.Name(), msw.MetricSet.Name(), err))
}
logp.Err("Error fetching data for metricset %s.%s: %s", msw.module.Name(), msw.Name(), err)
} else {
msw.module.UpdateStatus(status.Running, "")
}
msw.handleFetchError(err, reporter.V2())
default:
panic(fmt.Sprintf("unexpected fetcher type for %v", msw))
}
Expand Down Expand Up @@ -311,6 +310,31 @@ func (msw *metricSetWrapper) Test(d testing.Driver) {
})
}

func (msw *metricSetWrapper) handleFetchError(err error, reporter mb.PushReporterV2) {
switch {
case err == nil:
msw.stats.consecutiveFailures.Set(0)
msw.module.UpdateStatus(status.Running, "")

case errors.As(err, &mb.PartialMetricsError{}):
reporter.Error(err)
msw.stats.consecutiveFailures.Set(0)
// mark module as running if metrics are partially available and display the error message
msw.module.UpdateStatus(status.Running, fmt.Sprintf("Error fetching data for metricset %s.%s: %v", msw.module.Name(), msw.MetricSet.Name(), err))
logp.Err("Error fetching data for metricset %s.%s: %s", msw.module.Name(), msw.Name(), err)

default:
reporter.Error(err)
msw.stats.consecutiveFailures.Inc()
if msw.failureThreshold > 0 && msw.stats.consecutiveFailures != nil && uint(msw.stats.consecutiveFailures.Get()) >= msw.failureThreshold {
// mark it as degraded for any other issue encountered
msw.module.UpdateStatus(status.Degraded, fmt.Sprintf("Error fetching data for metricset %s.%s: %v", msw.module.Name(), msw.MetricSet.Name(), err))
}
logp.Err("Error fetching data for metricset %s.%s: %s", msw.module.Name(), msw.Name(), err)

}
}

type reporter interface {
StartFetchTimer()
V1() mb.PushReporter //nolint:staticcheck // PushReporter is deprecated but not removed
Expand Down Expand Up @@ -437,11 +461,12 @@ func getMetricSetStats(module, name string) *stats {

reg := monitoring.Default.NewRegistry(key)
s := &stats{
key: key,
ref: 1,
success: monitoring.NewInt(reg, successesKey),
failures: monitoring.NewInt(reg, failuresKey),
events: monitoring.NewInt(reg, eventsKey),
key: key,
ref: 1,
success: monitoring.NewInt(reg, successesKey),
failures: monitoring.NewInt(reg, failuresKey),
events: monitoring.NewInt(reg, eventsKey),
consecutiveFailures: monitoring.NewUint(reg, consecutiveFailuresKey),
}

fetches[key] = s
Expand Down
Loading
Loading