Metricbeat: add configurable failure threshold before reporting streams as degraded #41570

Merged
merged 10 commits on Nov 19, 2024
86 changes: 55 additions & 31 deletions metricbeat/mb/module/wrapper.go
@@ -36,11 +36,14 @@ import (
"github.com/elastic/elastic-agent-libs/testing"
)

// Expvar metric names.
const (
// Expvar metric names.
successesKey = "success"
failuresKey = "failures"
eventsKey = "events"

// Failure threshold config key
failureThresholdKey = "failure_threshold"
)

var (
@@ -70,7 +73,10 @@ type metricSetWrapper struct {
module *Wrapper // Parent Module.
stats *stats // stats for this MetricSet.

periodic bool // Set to true if this metricset is a periodic fetcher
periodic bool // Set to true if this metricset is a periodic fetcher
failureThreshold uint // threshold of consecutive errors needed to set the stream as degraded

consecutiveErrors uint // consecutive errors counter
}

// stats bundles common metricset stats.
@@ -106,11 +112,28 @@ func createWrapper(module mb.Module, metricSets []mb.MetricSet, options ...Optio
applyOption(wrapper)
}

failureThreshold := uint(1)

var streamHealthSettings struct {
FailureThreshold *uint `config:"failure_threshold"`
}

err := module.UnpackConfig(&streamHealthSettings)

if err != nil {
return nil, fmt.Errorf("unpacking raw config: %w", err)
}

if streamHealthSettings.FailureThreshold != nil {
failureThreshold = *streamHealthSettings.FailureThreshold
}

for i, metricSet := range metricSets {
wrapper.metricSets[i] = &metricSetWrapper{
MetricSet: metricSet,
module: wrapper,
stats: getMetricSetStats(wrapper.Name(), metricSet.Name()),
MetricSet: metricSet,
module: wrapper,
stats: getMetricSetStats(wrapper.Name(), metricSet.Name()),
failureThreshold: failureThreshold,
}
}
return wrapper, nil
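The pointer in the anonymous config struct above is what lets the code tell an unset failure_threshold apart from an explicit 0: unset falls back to the default of 1 (degrade on the first error, matching the previous behaviour), while 0 disables the degraded transition entirely because of the > 0 guard further down. A minimal standalone sketch of that resolution, with a hypothetical resolveThreshold helper:

```go
package main

import "fmt"

// resolveThreshold mirrors the unpacking logic in createWrapper: a nil pointer
// means failure_threshold was not configured, so the default of 1 applies; an
// explicit 0 means the stream is never reported as degraded.
func resolveThreshold(configured *uint) uint {
	threshold := uint(1)
	if configured != nil {
		threshold = *configured
	}
	return threshold
}

func main() {
	zero, five := uint(0), uint(5)
	fmt.Println(resolveThreshold(nil))   // 1: degrade on the first error (previous behaviour)
	fmt.Println(resolveThreshold(&zero)) // 0: never mark the stream as degraded
	fmt.Println(resolveThreshold(&five)) // 5: degrade after five consecutive errors
}
```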
@@ -254,35 +277,11 @@ func (msw *metricSetWrapper) fetch(ctx context.Context, reporter reporter) {
case mb.ReportingMetricSetV2Error:
reporter.StartFetchTimer()
err := fetcher.Fetch(reporter.V2())
if err != nil {
reporter.V2().Error(err)
if errors.As(err, &mb.PartialMetricsError{}) {
// mark module as running if metrics are partially available and display the error message
msw.module.UpdateStatus(status.Running, fmt.Sprintf("Error fetching data for metricset %s.%s: %v", msw.module.Name(), msw.MetricSet.Name(), err))
} else {
// mark it as degraded for any other issue encountered
msw.module.UpdateStatus(status.Degraded, fmt.Sprintf("Error fetching data for metricset %s.%s: %v", msw.module.Name(), msw.MetricSet.Name(), err))
}
logp.Err("Error fetching data for metricset %s.%s: %s", msw.module.Name(), msw.Name(), err)
} else {
msw.module.UpdateStatus(status.Running, "")
}
msw.handleFetchError(err, reporter.V2())
case mb.ReportingMetricSetV2WithContext:
reporter.StartFetchTimer()
err := fetcher.Fetch(ctx, reporter.V2())
if err != nil {
reporter.V2().Error(err)
if errors.As(err, &mb.PartialMetricsError{}) {
// mark module as running if metrics are partially available and display the error message
msw.module.UpdateStatus(status.Running, fmt.Sprintf("Error fetching data for metricset %s.%s: %v", msw.module.Name(), msw.MetricSet.Name(), err))
} else {
// mark it as degraded for any other issue encountered
msw.module.UpdateStatus(status.Degraded, fmt.Sprintf("Error fetching data for metricset %s.%s: %v", msw.module.Name(), msw.MetricSet.Name(), err))
}
logp.Err("Error fetching data for metricset %s.%s: %s", msw.module.Name(), msw.Name(), err)
} else {
msw.module.UpdateStatus(status.Running, "")
}
msw.handleFetchError(err, reporter.V2())
default:
panic(fmt.Sprintf("unexpected fetcher type for %v", msw))
}
@@ -311,6 +310,31 @@ func (msw *metricSetWrapper) Test(d testing.Driver) {
})
}

func (msw *metricSetWrapper) handleFetchError(err error, reporter mb.PushReporterV2) {
switch {

case errors.As(err, &mb.PartialMetricsError{}):
reporter.Error(err)
msw.consecutiveErrors = 0
// mark module as running if metrics are partially available and display the error message
msw.module.UpdateStatus(status.Running, fmt.Sprintf("Error fetching data for metricset %s.%s: %v", msw.module.Name(), msw.MetricSet.Name(), err))
logp.Err("Error fetching data for metricset %s.%s: %s", msw.module.Name(), msw.Name(), err)

case err != nil:
reporter.Error(err)
msw.consecutiveErrors++
Contributor:
nit: I am not an expert in the Metricbeat/MetricSets code, but if fetch can be called by multiple goroutines, shouldn't we go with atomics here? If it isn't supposed to be called that way, maybe we should enrich the fetch godoc to mark it explicitly as not safe for concurrent use.

Member Author:
My understanding from here is that each metricset runs within its own goroutine, so I didn't add any extra synchronization around the counters.
@leehinman can maybe have a look, and if there's a chance of a race condition we can easily switch to atomics.

Contributor:
Let's move consecutiveErrors to the stats struct in metricSetWrapper and make it a *monitoring.Int; that way we can observe it and it will be atomic.
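A rough sketch of what that suggestion amounts to, assuming the monitoring.NewInt/Inc/Set/Get API from elastic-agent-libs; the actual wiring in the PR lives in the stats struct and getMetricSetStats(), and the names below are placeholders:

```go
package module

import "github.com/elastic/elastic-agent-libs/monitoring"

// streamStats is a placeholder for the real stats struct; storing the counter
// as a *monitoring.Int makes updates atomic and exposes the value via expvar.
type streamStats struct {
	consecutiveFailures *monitoring.Int
}

func newStreamStats(reg *monitoring.Registry) *streamStats {
	return &streamStats{
		consecutiveFailures: monitoring.NewInt(reg, "consecutive_failures"),
	}
}

// recordError increments the counter and reports whether the configured
// threshold has been reached.
func (s *streamStats) recordError(threshold uint) bool {
	s.consecutiveFailures.Inc()
	return threshold > 0 && s.consecutiveFailures.Get() >= int64(threshold)
}

// recordSuccess resets the counter on a successful (or partial) fetch.
func (s *streamStats) recordSuccess() {
	s.consecutiveFailures.Set(0)
}
```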

Member Author:
@leehinman I had a look at using a *monitoring.Int (which is just a struct wrapping an atomic.Int64) and was quite surprised to find some of the new unit tests failing.
I then realized that stats keeps state from previous tests, thanks to this and the getMetricSetStats() function.

Is it normal that we keep state from a previous wrapper instance (in the unit tests the metricSetWrapper is recreated along with the mocks for each test case), matching only on the metricset name? I am not sure we want to do the same for the consecutive-errors part just because a previous wrapper existed that failed all the time...
Any thoughts? Is remembering state from previous wrappers of the metricset the standard behavior in Metricbeat?

Member Author:
After a quick zoom with @leehinman we determined that the stats struct is shared by design between different metricSetWrapper instances that may run the same metricset against different hosts from the same module config block, in order to aggregate the success/failures/events counters (and now also the consecutive failures).
Fixed the tests (correctly releasing the stats structs) in caeafcb.
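For context on the "shared by design" point, here is an illustrative sketch of the pattern being described (type and function names are placeholders, not the actual wrapper.go code): a package-level registry keyed by module/metricset name hands the same stats object to every wrapper, so counters aggregate across hosts but also leak between test cases unless the stats are released.

```go
package module

import "sync"

// sharedStats stands in for the real stats struct; refs counts how many
// wrappers currently hold it.
type sharedStats struct {
	refs                int
	consecutiveFailures int64
}

var (
	statsMu       sync.Mutex
	statsRegistry = map[string]*sharedStats{}
)

// getStats returns the same object for every wrapper of a given
// module/metricset pair, aggregating counters across hosts.
func getStats(key string) *sharedStats {
	statsMu.Lock()
	defer statsMu.Unlock()
	s, ok := statsRegistry[key]
	if !ok {
		s = &sharedStats{}
		statsRegistry[key] = s
	}
	s.refs++
	return s
}

// releaseStats drops a reference and deletes the entry once unused, which is
// what the unit-test fix boils down to.
func releaseStats(key string) {
	statsMu.Lock()
	defer statsMu.Unlock()
	if s, ok := statsRegistry[key]; ok {
		if s.refs--; s.refs <= 0 {
			delete(statsRegistry, key)
		}
	}
}
```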

if msw.failureThreshold > 0 && msw.consecutiveErrors >= msw.failureThreshold {
// mark it as degraded for any other issue encountered
msw.module.UpdateStatus(status.Degraded, fmt.Sprintf("Error fetching data for metricset %s.%s: %v", msw.module.Name(), msw.MetricSet.Name(), err))
}
logp.Err("Error fetching data for metricset %s.%s: %s", msw.module.Name(), msw.Name(), err)

default:
msw.consecutiveErrors = 0
msw.module.UpdateStatus(status.Running, "")
}
}
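Putting it together, the counter semantics can be summarized with a small standalone sketch (hypothetical names; it mirrors only the branching above, not the real reporter and status plumbing): partial errors and successes reset the counter, and only the Nth consecutive hard error flips the stream to degraded.

```go
package main

import "fmt"

type streamHealth struct {
	failureThreshold  uint
	consecutiveErrors uint
}

// observe mirrors handleFetchError's branching: a hard error increments the
// counter and degrades the stream once the threshold is reached; a success or
// partial error resets it.
func (s *streamHealth) observe(hardError bool) string {
	if !hardError {
		s.consecutiveErrors = 0
		return "running"
	}
	s.consecutiveErrors++
	if s.failureThreshold > 0 && s.consecutiveErrors >= s.failureThreshold {
		return "degraded"
	}
	return "running (error tolerated)"
}

func main() {
	s := &streamHealth{failureThreshold: 3}
	for i, hardError := range []bool{true, true, false, true, true, true} {
		fmt.Printf("fetch %d -> %s\n", i+1, s.observe(hardError))
	}
	// Fetches 1-2 fail but stay below the threshold, fetch 3 succeeds and
	// resets the counter, and only fetch 6 (the third consecutive failure)
	// reports the stream as degraded.
}
```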

type reporter interface {
StartFetchTimer()
V1() mb.PushReporter //nolint:staticcheck // PushReporter is deprecated but not removed