From 6e28f299c7b0fb7c106bc0036108aa4a0b7c1d29 Mon Sep 17 00:00:00 2001 From: Paolo Chila Date: Fri, 8 Nov 2024 16:58:11 +0100 Subject: [PATCH 01/10] Metricbeat: add configurable failure threshold before reporting streams as degraded --- metricbeat/mb/module/wrapper.go | 72 +-- metricbeat/mb/module/wrapper_internal_test.go | 419 ++++++++++++++++++ 2 files changed, 461 insertions(+), 30 deletions(-) create mode 100644 metricbeat/mb/module/wrapper_internal_test.go diff --git a/metricbeat/mb/module/wrapper.go b/metricbeat/mb/module/wrapper.go index 95185817f5f..a688c5d2365 100644 --- a/metricbeat/mb/module/wrapper.go +++ b/metricbeat/mb/module/wrapper.go @@ -70,7 +70,10 @@ type metricSetWrapper struct { module *Wrapper // Parent Module. stats *stats // stats for this MetricSet. - periodic bool // Set to true if this metricset is a periodic fetcher + periodic bool // Set to true if this metricset is a periodic fetcher + failureThreshold int // threshold of consecutive errors needed to set the stream as degraded + + consecutiveErrors int // consecutive errors counter } // stats bundles common metricset stats. @@ -102,15 +105,27 @@ func createWrapper(module mb.Module, metricSets []mb.MetricSet, options ...Optio metricSets: make([]*metricSetWrapper, len(metricSets)), } + //FIXME provide proper struct + var hs struct { + FailureThreshold int `config:"failureThreshold"` + } + + err := module.UnpackConfig(&hs) + + if err != nil { + return nil, fmt.Errorf("unpacking raw config: %w", err) + } + for _, applyOption := range options { applyOption(wrapper) } for i, metricSet := range metricSets { wrapper.metricSets[i] = &metricSetWrapper{ - MetricSet: metricSet, - module: wrapper, - stats: getMetricSetStats(wrapper.Name(), metricSet.Name()), + MetricSet: metricSet, + module: wrapper, + stats: getMetricSetStats(wrapper.Name(), metricSet.Name()), + failureThreshold: hs.FailureThreshold, } } return wrapper, nil @@ -254,35 +269,11 @@ func (msw *metricSetWrapper) fetch(ctx context.Context, reporter reporter) { case mb.ReportingMetricSetV2Error: reporter.StartFetchTimer() err := fetcher.Fetch(reporter.V2()) - if err != nil { - reporter.V2().Error(err) - if errors.As(err, &mb.PartialMetricsError{}) { - // mark module as running if metrics are partially available and display the error message - msw.module.UpdateStatus(status.Running, fmt.Sprintf("Error fetching data for metricset %s.%s: %v", msw.module.Name(), msw.MetricSet.Name(), err)) - } else { - // mark it as degraded for any other issue encountered - msw.module.UpdateStatus(status.Degraded, fmt.Sprintf("Error fetching data for metricset %s.%s: %v", msw.module.Name(), msw.MetricSet.Name(), err)) - } - logp.Err("Error fetching data for metricset %s.%s: %s", msw.module.Name(), msw.Name(), err) - } else { - msw.module.UpdateStatus(status.Running, "") - } + msw.handleFetchError(err, reporter.V2()) case mb.ReportingMetricSetV2WithContext: reporter.StartFetchTimer() err := fetcher.Fetch(ctx, reporter.V2()) - if err != nil { - reporter.V2().Error(err) - if errors.As(err, &mb.PartialMetricsError{}) { - // mark module as running if metrics are partially available and display the error message - msw.module.UpdateStatus(status.Running, fmt.Sprintf("Error fetching data for metricset %s.%s: %v", msw.module.Name(), msw.MetricSet.Name(), err)) - } else { - // mark it as degraded for any other issue encountered - msw.module.UpdateStatus(status.Degraded, fmt.Sprintf("Error fetching data for metricset %s.%s: %v", msw.module.Name(), msw.MetricSet.Name(), err)) - } - logp.Err("Error fetching data for metricset %s.%s: %s", msw.module.Name(), msw.Name(), err) - } else { - msw.module.UpdateStatus(status.Running, "") - } + msw.handleFetchError(err, reporter.V2()) default: panic(fmt.Sprintf("unexpected fetcher type for %v", msw)) } @@ -311,6 +302,27 @@ func (msw *metricSetWrapper) Test(d testing.Driver) { }) } +func (msw *metricSetWrapper) handleFetchError(err error, reporter mb.PushReporterV2) { + if err != nil { + reporter.Error(err) + if errors.As(err, &mb.PartialMetricsError{}) { + msw.consecutiveErrors = 0 + // mark module as running if metrics are partially available and display the error message + msw.module.UpdateStatus(status.Running, fmt.Sprintf("Error fetching data for metricset %s.%s: %v", msw.module.Name(), msw.MetricSet.Name(), err)) + } else { + msw.consecutiveErrors++ + if msw.failureThreshold >= 0 && msw.consecutiveErrors > msw.failureThreshold { + // mark it as degraded for any other issue encountered + msw.module.UpdateStatus(status.Degraded, fmt.Sprintf("Error fetching data for metricset %s.%s: %v", msw.module.Name(), msw.MetricSet.Name(), err)) + } + } + logp.Err("Error fetching data for metricset %s.%s: %s", msw.module.Name(), msw.Name(), err) + } else { + msw.consecutiveErrors = 0 + msw.module.UpdateStatus(status.Running, "") + } +} + type reporter interface { StartFetchTimer() V1() mb.PushReporter //nolint:staticcheck // PushReporter is deprecated but not removed diff --git a/metricbeat/mb/module/wrapper_internal_test.go b/metricbeat/mb/module/wrapper_internal_test.go new file mode 100644 index 00000000000..a862b3da32f --- /dev/null +++ b/metricbeat/mb/module/wrapper_internal_test.go @@ -0,0 +1,419 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package module + +import ( + "context" + "errors" + "sync" + "testing" + "time" + + "github.com/elastic/beats/v7/libbeat/beat" + "github.com/elastic/beats/v7/libbeat/management/status" + "github.com/elastic/beats/v7/metricbeat/mb" + conf "github.com/elastic/elastic-agent-libs/config" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" +) + +const mockModuleName = "MockModule" +const mockMetricSetName = "MockMetricSet" +const mockMetricSetWithContextName = "MockMetricSetWithContext" + +// mockReportingFetcher +type mockReportingFetcher struct { + mb.BaseMetricSet + mock.Mock +} + +func (mrf *mockReportingFetcher) Fetch(r mb.ReporterV2) error { + args := mrf.Called(r) + return args.Error(0) +} + +// mockReportingFetcherWithContext +type mockReportingFetcherWithContext struct { + mb.BaseMetricSet + mock.Mock +} + +func (mrf *mockReportingFetcherWithContext) Fetch(ctx context.Context, r mb.ReporterV2) error { + args := mrf.Called(ctx, r) + return args.Error(0) +} + +// mockReporter +type mockReporter struct { + mock.Mock +} + +func (mr *mockReporter) StartFetchTimer() { + mr.Called() +} + +func (mr *mockReporter) V1() mb.PushReporter { //nolint:staticcheck // PushReporter is deprecated but not removed + args := mr.Called() + return args.Get(0).(mb.PushReporter) +} + +func (mr *mockReporter) V2() mb.PushReporterV2 { + args := mr.Called() + return args.Get(0).(mb.PushReporterV2) +} + +// mockPushReporterV2 +type mockPushReporterV2 struct { + mock.Mock +} + +func (mpr *mockPushReporterV2) Event(event mb.Event) bool { + args := mpr.Called(event) + return args.Bool(0) +} + +func (mpr *mockPushReporterV2) Error(err error) bool { + args := mpr.Called(err) + return args.Bool(0) +} + +func (mpr *mockPushReporterV2) Done() <-chan struct{} { + args := mpr.Called() + return args.Get(0).(<-chan struct{}) +} + +// mockStatusReporterV2 +type mockStatusReporter struct { + mock.Mock +} + +func (m *mockStatusReporter) UpdateStatus(status status.Status, msg string) { + m.Called(status, msg) +} + +func TestWrapperHandleFetchErrorSync(t *testing.T) { + + fetchError := errors.New("fetch has gone all wrong") + + type setupFunc func(t *testing.T, fetcher *mockReportingFetcher, pushReporter *mockPushReporterV2, statusReporter *mockStatusReporter) + type postIterationAssertFunc func(t *testing.T, i int, msWrapper *metricSetWrapper, fetcher *mockReportingFetcher, pushReporter *mockPushReporterV2, statusReporter *mockStatusReporter) + + testcases := []struct { + name string + config *conf.C + setup setupFunc + iterations int + assertIteration postIterationAssertFunc + }{ + { + name: "no failureThreshold: status DEGRADED after first error", + config: newConfig(t, map[string]interface{}{ + "module": mockModuleName, + "metricsets": []string{mockMetricSetName}, + "period": "100ms", + "hosts": []string{"testhost"}, + }), + setup: func(t *testing.T, fetcher *mockReportingFetcher, pushReporter *mockPushReporterV2, statusReporter *mockStatusReporter) { + // fetcher will immediately error out + fetcher.On("Fetch", pushReporter).Return(fetchError).Once() + // expect the error to be propagated via the pushReporter + pushReporter.On("Error", fetchError).Return(true).Once() + // expect the status degraded to be set + statusReporter.On("UpdateStatus", status.Degraded, mock.AnythingOfType("string")).Once() + }, + iterations: 1, + assertIteration: nil, + }, + { + name: "no failureThreshold: status DEGRADED after first error, reset to Running after first successful fetch", + config: newConfig(t, map[string]interface{}{ + "module": mockModuleName, + "metricsets": []string{mockMetricSetName}, + "period": "100ms", + "hosts": []string{"testhost"}, + }), + setup: func(t *testing.T, fetcher *mockReportingFetcher, pushReporter *mockPushReporterV2, statusReporter *mockStatusReporter) { + // fetcher will immediately error out 3 times + fetcher.On("Fetch", pushReporter).Return(fetchError).Times(3) + // fetcher will never error again afterwards + fetcher.On("Fetch", pushReporter).Return(nil) + // expect the error to be propagated via the pushReporter + pushReporter.On("Error", fetchError).Return(true).Times(3) + // expect the status degraded to be set 3 times + statusReporter.On("UpdateStatus", status.Degraded, mock.AnythingOfType("string")).Times(3) + // expect the status Running to be set once fetch recovers + statusReporter.On("UpdateStatus", status.Running, mock.AnythingOfType("string")).Twice() + }, + iterations: 5, + assertIteration: func(t *testing.T, i int, msWrapper *metricSetWrapper, fetcher *mockReportingFetcher, pushReporter *mockPushReporterV2, statusReporter *mockStatusReporter) { + t.Logf("Assertion after iteration %d", i) + switch { + case i < 3: + assert.Truef(t, statusReporter.AssertCalled(t, "UpdateStatus", status.Degraded, mock.AnythingOfType("string")), "stream degraded at iteration %d", i) + case i >= 3: + assert.Truef(t, statusReporter.AssertCalled(t, "UpdateStatus", status.Running, mock.AnythingOfType("string")), "stream set to running at iteration %d", i) + } + }, + }, + { + name: "failureThreshold = 2: status DEGRADED at the 3rd error", + config: newConfig(t, map[string]interface{}{ + "module": mockModuleName, + "metricsets": []string{mockMetricSetName}, + "period": "100ms", + "hosts": []string{"testhost"}, + "failureThreshold": 2, + }), + setup: func(t *testing.T, fetcher *mockReportingFetcher, pushReporter *mockPushReporterV2, statusReporter *mockStatusReporter) { + // fetcher will immediately error out 3 times in a row + fetcher.On("Fetch", pushReporter).Return(fetchError).Times(3) + // expect the error to be propagated via the pushReporter at every iteration + pushReporter.On("Error", fetchError).Return(true).Times(3) + // expect the status degraded to be set + statusReporter.On("UpdateStatus", status.Degraded, mock.AnythingOfType("string")).Once() + }, + iterations: 3, + assertIteration: func(t *testing.T, i int, msWrapper *metricSetWrapper, fetcher *mockReportingFetcher, pushReporter *mockPushReporterV2, statusReporter *mockStatusReporter) { + t.Logf("Assertion after iteration %d", i) + switch { + case i < 2: + assert.Truef(t, statusReporter.AssertNotCalled(t, "UpdateStatus", status.Degraded, mock.AnythingOfType("string")), "stream degraded at iteration %d", i) + case i == 2: + assert.Truef(t, statusReporter.AssertCalled(t, "UpdateStatus", status.Degraded, mock.AnythingOfType("string")), "stream not yet degraded at iteration %d", i) + } + }, + }, + { + name: "failureThreshold = 2: status HEALTHY after 2 errors, 1 success and 2 more errors, DEGRADED at the 3rd consecutive error", + config: newConfig(t, map[string]interface{}{ + "module": mockModuleName, + "metricsets": []string{mockMetricSetName}, + "period": "100ms", + "hosts": []string{"testhost"}, + "failureThreshold": 2, + }), + setup: func(t *testing.T, fetcher *mockReportingFetcher, pushReporter *mockPushReporterV2, statusReporter *mockStatusReporter) { + // fetcher will error out 2 times in a row + fetcher.On("Fetch", pushReporter).Return(fetchError).Times(2) + // fetcher will then succeed once + fetcher.On("Fetch", pushReporter).Return(nil).Once() + // fetcher will error out 3 more times in a row + fetcher.On("Fetch", pushReporter).Return(fetchError).Times(3) + + // expect the error to be propagated via the pushReporter at every failing iteration + pushReporter.On("Error", fetchError).Return(true).Times(5) + // expect the status running to be set when there's no error returned by the fetcher at the 3rd iteration + statusReporter.On("UpdateStatus", status.Running, mock.AnythingOfType("string")).Once() + // expect the status degraded to be set only once + statusReporter.On("UpdateStatus", status.Degraded, mock.AnythingOfType("string")).Once() + }, + iterations: 6, + assertIteration: func(t *testing.T, i int, msWrapper *metricSetWrapper, fetcher *mockReportingFetcher, pushReporter *mockPushReporterV2, statusReporter *mockStatusReporter) { + t.Logf("Assertion after iteration %d", i) + switch { + case i < 2: + assert.Truef(t, statusReporter.AssertNotCalled(t, "UpdateStatus", status.Degraded, mock.AnythingOfType("string")), "stream degraded at iteration %d", i) + case i >= 2 && i < 5: + assert.Truef(t, statusReporter.AssertNotCalled(t, "UpdateStatus", status.Degraded, mock.AnythingOfType("string")), "stream degraded at iteration %d", i) + assert.Truef(t, statusReporter.AssertCalled(t, "UpdateStatus", status.Running, mock.AnythingOfType("string")), "stream degraded at iteration %d", i) + case i == 5: + assert.Truef(t, statusReporter.AssertCalled(t, "UpdateStatus", status.Degraded, mock.AnythingOfType("string")), "stream not yet degraded at iteration %d", i) + } + }, + }, + { + name: "failureThreshold = -1: stream status update never become DEGRADED", + config: newConfig(t, map[string]interface{}{ + "module": mockModuleName, + "metricsets": []string{mockMetricSetName}, + "period": "100ms", + "hosts": []string{"testhost"}, + "failureThreshold": -1, + }), + setup: func(t *testing.T, fetcher *mockReportingFetcher, pushReporter *mockPushReporterV2, statusReporter *mockStatusReporter) { + // fetcher will error out 9 times in a row + fetcher.On("Fetch", pushReporter).Return(fetchError).Times(9) + // fetcher will then succeed once + fetcher.On("Fetch", pushReporter).Return(nil).Once() + + // expect the error to be propagated via the pushReporter at every failing iteration + pushReporter.On("Error", fetchError).Return(true).Times(9) + // expect the status running to be set when there's no error returned by the fetcher at the 10th iteration + statusReporter.On("UpdateStatus", status.Running, mock.AnythingOfType("string")).Once() + }, + iterations: 10, + assertIteration: func(t *testing.T, i int, msWrapper *metricSetWrapper, fetcher *mockReportingFetcher, pushReporter *mockPushReporterV2, statusReporter *mockStatusReporter) { + t.Logf("Assertion after iteration %d", i) + switch { + case i < 9: + assert.Truef(t, statusReporter.AssertNotCalled(t, "UpdateStatus", status.Degraded, mock.AnythingOfType("string")), "stream degraded at iteration %d", i) + case i == 9: + assert.Truef(t, statusReporter.AssertNotCalled(t, "UpdateStatus", status.Degraded, mock.AnythingOfType("string")), "stream degraded at iteration %d", i) + assert.Truef(t, statusReporter.AssertCalled(t, "UpdateStatus", status.Running, mock.AnythingOfType("string")), "stream degraded at iteration %d", i) + } + }, + }, + } + + for _, tc := range testcases { + t.Run(tc.name, func(t *testing.T) { + // Setup mock push reporter + mpr := new(mockPushReporterV2) + + // Setup mock fetcher + mrf := new(mockReportingFetcher) + + // Setup mock StatusReporter + msr := new(mockStatusReporter) + + //Setup mock reporter (ensure proper handling of intermediate calls, no functional value here) + mr := new(mockReporter) + mr.On("StartFetchTimer").Return() + mr.On("V2").Return(mpr) + + // assert mocks expectations + t.Cleanup(func() { + mock.AssertExpectationsForObjects(t, mrf, mr, mpr, msr) + }) + + // setup mocks before starting the test + if tc.setup != nil { + tc.setup(t, mrf, mpr, msr) + } + + // add metricset in registry + r := mb.NewRegister() + err := r.AddMetricSet(mockModuleName, mockMetricSetName, func(base mb.BaseMetricSet) (mb.MetricSet, error) { + mrf.BaseMetricSet = base + return mrf, nil + }) + require.NoError(t, err) + + aModule, metricSets, err := mb.NewModule(tc.config, r) + require.NoError(t, err) + + // Set the mock status reporter + aModule.SetStatusReporter(msr) + + moduleWrapper, err := NewWrapperForMetricSet(aModule, metricSets[0], WithMetricSetInfo()) + require.NoError(t, err) + + // run metricset synchronously + wrappedMetricSet := moduleWrapper.MetricSets()[0] + for i := 0; i < tc.iterations; i++ { + wrappedMetricSet.fetch(context.TODO(), mr) + if tc.assertIteration != nil { + tc.assertIteration(t, i, wrappedMetricSet, mrf, mpr, msr) + } + } + }) + } +} + +func TestWrapperHandleFetchErrorAsync(t *testing.T) { + + t.Skip("This test runs a mock wrapped metricset asynchronously. Preferring the synchronous test for now") + + // Setup mock push reporter + mpr := new(mockPushReporterV2) + + // Setup mock fetcher + mrf := new(mockReportingFetcher) + //mrf.On("Fetch", mpr).Return(nil) + + //Setup mock reporter + mr := new(mockReporter) + //mr.On("StartFetchTimer").Return() + //mr.On("V2").Return(mpr) + + // assert mocks expectations + t.Cleanup(func() { + mock.AssertExpectationsForObjects(t, mrf, mr, mpr) + }) + + // add metricset in registry + r := mb.NewRegister() + err := r.AddMetricSet(mockModuleName, mockMetricSetName, func(base mb.BaseMetricSet) (mb.MetricSet, error) { + mrf.BaseMetricSet = base + return mrf, nil + }) + require.NoError(t, err) + + hosts := []string{"testhost"} + c := newConfig(t, map[string]interface{}{ + "module": mockModuleName, + "metricsets": []string{mockMetricSetName}, + "period": "100ms", + "hosts": hosts, + "health": map[string]interface{}{ + "enabled": true, + "failureThreshold": 2, + }, + }) + + aModule, metricSets, err := mb.NewModule(c, r) + require.NoError(t, err) + + mWrapper, err := NewWrapperForMetricSet(aModule, metricSets[0], WithMetricSetInfo()) + require.NoError(t, err) + + require.Len(t, mWrapper.MetricSets(), 1) + + // run the metricset asynchronously + done := make(chan struct{}) + output := mWrapper.Start(done) + + wg := new(sync.WaitGroup) + outputConsumeLoop(t, wg, output, done, func(event beat.Event) { + t.Logf("received event: %v", event) + }) + time.Sleep(1 * time.Second) + + close(done) + wg.Wait() +} + +func outputConsumeLoop(t *testing.T, wg *sync.WaitGroup, output <-chan beat.Event, done chan struct{}, ehf eventHandlingTestFunc) { + wg.Add(1) + go func() { + defer wg.Done() + for { + select { + case e := <-output: + ehf(e) + case <-done: + // finish consuming and return + t.Log("test done, consuming remaining events") + for e := range output { + ehf(e) + } + t.Log("done consuming events") + return + } + } + }() +} + +type eventHandlingTestFunc func(beat.Event) + +func newConfig(t testing.TB, moduleConfig interface{}) *conf.C { + config, err := conf.NewConfigFrom(moduleConfig) + require.NoError(t, err) + return config +} From 7ce88092714d423e22ff745b779bbe1ce35b2b55 Mon Sep 17 00:00:00 2001 From: Paolo Chila Date: Mon, 11 Nov 2024 09:07:49 +0100 Subject: [PATCH 02/10] remove async unit test for metribeat stream failureThreshold --- metricbeat/mb/module/wrapper_internal_test.go | 89 ------------------- 1 file changed, 89 deletions(-) diff --git a/metricbeat/mb/module/wrapper_internal_test.go b/metricbeat/mb/module/wrapper_internal_test.go index a862b3da32f..1e1df86c288 100644 --- a/metricbeat/mb/module/wrapper_internal_test.go +++ b/metricbeat/mb/module/wrapper_internal_test.go @@ -20,11 +20,8 @@ package module import ( "context" "errors" - "sync" "testing" - "time" - "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/management/status" "github.com/elastic/beats/v7/metricbeat/mb" conf "github.com/elastic/elastic-agent-libs/config" @@ -326,92 +323,6 @@ func TestWrapperHandleFetchErrorSync(t *testing.T) { } } -func TestWrapperHandleFetchErrorAsync(t *testing.T) { - - t.Skip("This test runs a mock wrapped metricset asynchronously. Preferring the synchronous test for now") - - // Setup mock push reporter - mpr := new(mockPushReporterV2) - - // Setup mock fetcher - mrf := new(mockReportingFetcher) - //mrf.On("Fetch", mpr).Return(nil) - - //Setup mock reporter - mr := new(mockReporter) - //mr.On("StartFetchTimer").Return() - //mr.On("V2").Return(mpr) - - // assert mocks expectations - t.Cleanup(func() { - mock.AssertExpectationsForObjects(t, mrf, mr, mpr) - }) - - // add metricset in registry - r := mb.NewRegister() - err := r.AddMetricSet(mockModuleName, mockMetricSetName, func(base mb.BaseMetricSet) (mb.MetricSet, error) { - mrf.BaseMetricSet = base - return mrf, nil - }) - require.NoError(t, err) - - hosts := []string{"testhost"} - c := newConfig(t, map[string]interface{}{ - "module": mockModuleName, - "metricsets": []string{mockMetricSetName}, - "period": "100ms", - "hosts": hosts, - "health": map[string]interface{}{ - "enabled": true, - "failureThreshold": 2, - }, - }) - - aModule, metricSets, err := mb.NewModule(c, r) - require.NoError(t, err) - - mWrapper, err := NewWrapperForMetricSet(aModule, metricSets[0], WithMetricSetInfo()) - require.NoError(t, err) - - require.Len(t, mWrapper.MetricSets(), 1) - - // run the metricset asynchronously - done := make(chan struct{}) - output := mWrapper.Start(done) - - wg := new(sync.WaitGroup) - outputConsumeLoop(t, wg, output, done, func(event beat.Event) { - t.Logf("received event: %v", event) - }) - time.Sleep(1 * time.Second) - - close(done) - wg.Wait() -} - -func outputConsumeLoop(t *testing.T, wg *sync.WaitGroup, output <-chan beat.Event, done chan struct{}, ehf eventHandlingTestFunc) { - wg.Add(1) - go func() { - defer wg.Done() - for { - select { - case e := <-output: - ehf(e) - case <-done: - // finish consuming and return - t.Log("test done, consuming remaining events") - for e := range output { - ehf(e) - } - t.Log("done consuming events") - return - } - } - }() -} - -type eventHandlingTestFunc func(beat.Event) - func newConfig(t testing.TB, moduleConfig interface{}) *conf.C { config, err := conf.NewConfigFrom(moduleConfig) require.NoError(t, err) From 759502087fa3767385236e4a2d9d4b561297c040 Mon Sep 17 00:00:00 2001 From: Paolo Chila Date: Mon, 11 Nov 2024 09:18:57 +0100 Subject: [PATCH 03/10] Change failure threshold to unsigned and change the threshold evaluation Change the failure threshold to be an unsigned integer: - if failureThreshold == 0, the feature is deactivated - if failureThreshold == n, where n > 0, the stream will be marked DEGRADED after n consecutive errors This changes the previous logic that was zero-based, had 2 values for failing after the first error (0 and 1) and was generally weirder to look at (to have a stream fail after 3 errors we had to set failureThreshold=2) --- metricbeat/mb/module/wrapper.go | 25 +++++++++++-------- metricbeat/mb/module/wrapper_internal_test.go | 12 ++++----- 2 files changed, 21 insertions(+), 16 deletions(-) diff --git a/metricbeat/mb/module/wrapper.go b/metricbeat/mb/module/wrapper.go index a688c5d2365..0af72596eba 100644 --- a/metricbeat/mb/module/wrapper.go +++ b/metricbeat/mb/module/wrapper.go @@ -71,9 +71,9 @@ type metricSetWrapper struct { stats *stats // stats for this MetricSet. periodic bool // Set to true if this metricset is a periodic fetcher - failureThreshold int // threshold of consecutive errors needed to set the stream as degraded + failureThreshold uint // threshold of consecutive errors needed to set the stream as degraded - consecutiveErrors int // consecutive errors counter + consecutiveErrors uint // consecutive errors counter } // stats bundles common metricset stats. @@ -105,19 +105,24 @@ func createWrapper(module mb.Module, metricSets []mb.MetricSet, options ...Optio metricSets: make([]*metricSetWrapper, len(metricSets)), } - //FIXME provide proper struct - var hs struct { - FailureThreshold int `config:"failureThreshold"` + for _, applyOption := range options { + applyOption(wrapper) } - err := module.UnpackConfig(&hs) + failureThreshold := uint(1) + + var streamHealthSettings struct { + FailureThreshold *uint `config:"failureThreshold"` + } + + err := module.UnpackConfig(&streamHealthSettings) if err != nil { return nil, fmt.Errorf("unpacking raw config: %w", err) } - for _, applyOption := range options { - applyOption(wrapper) + if streamHealthSettings.FailureThreshold != nil { + failureThreshold = *streamHealthSettings.FailureThreshold } for i, metricSet := range metricSets { @@ -125,7 +130,7 @@ func createWrapper(module mb.Module, metricSets []mb.MetricSet, options ...Optio MetricSet: metricSet, module: wrapper, stats: getMetricSetStats(wrapper.Name(), metricSet.Name()), - failureThreshold: hs.FailureThreshold, + failureThreshold: failureThreshold, } } return wrapper, nil @@ -311,7 +316,7 @@ func (msw *metricSetWrapper) handleFetchError(err error, reporter mb.PushReporte msw.module.UpdateStatus(status.Running, fmt.Sprintf("Error fetching data for metricset %s.%s: %v", msw.module.Name(), msw.MetricSet.Name(), err)) } else { msw.consecutiveErrors++ - if msw.failureThreshold >= 0 && msw.consecutiveErrors > msw.failureThreshold { + if msw.failureThreshold > 0 && msw.consecutiveErrors >= msw.failureThreshold { // mark it as degraded for any other issue encountered msw.module.UpdateStatus(status.Degraded, fmt.Sprintf("Error fetching data for metricset %s.%s: %v", msw.module.Name(), msw.MetricSet.Name(), err)) } diff --git a/metricbeat/mb/module/wrapper_internal_test.go b/metricbeat/mb/module/wrapper_internal_test.go index 1e1df86c288..a202237a599 100644 --- a/metricbeat/mb/module/wrapper_internal_test.go +++ b/metricbeat/mb/module/wrapper_internal_test.go @@ -169,13 +169,13 @@ func TestWrapperHandleFetchErrorSync(t *testing.T) { }, }, { - name: "failureThreshold = 2: status DEGRADED at the 3rd error", + name: "failureThreshold = 3: status DEGRADED at the 3rd error", config: newConfig(t, map[string]interface{}{ "module": mockModuleName, "metricsets": []string{mockMetricSetName}, "period": "100ms", "hosts": []string{"testhost"}, - "failureThreshold": 2, + "failureThreshold": 3, }), setup: func(t *testing.T, fetcher *mockReportingFetcher, pushReporter *mockPushReporterV2, statusReporter *mockStatusReporter) { // fetcher will immediately error out 3 times in a row @@ -197,13 +197,13 @@ func TestWrapperHandleFetchErrorSync(t *testing.T) { }, }, { - name: "failureThreshold = 2: status HEALTHY after 2 errors, 1 success and 2 more errors, DEGRADED at the 3rd consecutive error", + name: "failureThreshold = 3: status HEALTHY after 2 errors, 1 success and 2 more errors, DEGRADED at the 3rd consecutive error", config: newConfig(t, map[string]interface{}{ "module": mockModuleName, "metricsets": []string{mockMetricSetName}, "period": "100ms", "hosts": []string{"testhost"}, - "failureThreshold": 2, + "failureThreshold": 3, }), setup: func(t *testing.T, fetcher *mockReportingFetcher, pushReporter *mockPushReporterV2, statusReporter *mockStatusReporter) { // fetcher will error out 2 times in a row @@ -235,13 +235,13 @@ func TestWrapperHandleFetchErrorSync(t *testing.T) { }, }, { - name: "failureThreshold = -1: stream status update never become DEGRADED", + name: "failureThreshold = 0: stream status update never become DEGRADED", config: newConfig(t, map[string]interface{}{ "module": mockModuleName, "metricsets": []string{mockMetricSetName}, "period": "100ms", "hosts": []string{"testhost"}, - "failureThreshold": -1, + "failureThreshold": 0, }), setup: func(t *testing.T, fetcher *mockReportingFetcher, pushReporter *mockPushReporterV2, statusReporter *mockStatusReporter) { // fetcher will error out 9 times in a row From 50e23365ba024bfe8bd0a2c2beb68c9e0796a212 Mon Sep 17 00:00:00 2001 From: Paolo Chila Date: Mon, 11 Nov 2024 12:09:04 +0100 Subject: [PATCH 04/10] use switch statement in handleFetchError --- metricbeat/mb/module/wrapper.go | 28 ++++++++++++++++------------ 1 file changed, 16 insertions(+), 12 deletions(-) diff --git a/metricbeat/mb/module/wrapper.go b/metricbeat/mb/module/wrapper.go index 0af72596eba..9887b911b27 100644 --- a/metricbeat/mb/module/wrapper.go +++ b/metricbeat/mb/module/wrapper.go @@ -308,21 +308,25 @@ func (msw *metricSetWrapper) Test(d testing.Driver) { } func (msw *metricSetWrapper) handleFetchError(err error, reporter mb.PushReporterV2) { - if err != nil { + switch { + + case errors.As(err, &mb.PartialMetricsError{}): reporter.Error(err) - if errors.As(err, &mb.PartialMetricsError{}) { - msw.consecutiveErrors = 0 - // mark module as running if metrics are partially available and display the error message - msw.module.UpdateStatus(status.Running, fmt.Sprintf("Error fetching data for metricset %s.%s: %v", msw.module.Name(), msw.MetricSet.Name(), err)) - } else { - msw.consecutiveErrors++ - if msw.failureThreshold > 0 && msw.consecutiveErrors >= msw.failureThreshold { - // mark it as degraded for any other issue encountered - msw.module.UpdateStatus(status.Degraded, fmt.Sprintf("Error fetching data for metricset %s.%s: %v", msw.module.Name(), msw.MetricSet.Name(), err)) - } + msw.consecutiveErrors = 0 + // mark module as running if metrics are partially available and display the error message + msw.module.UpdateStatus(status.Running, fmt.Sprintf("Error fetching data for metricset %s.%s: %v", msw.module.Name(), msw.MetricSet.Name(), err)) + logp.Err("Error fetching data for metricset %s.%s: %s", msw.module.Name(), msw.Name(), err) + + case err != nil: + reporter.Error(err) + msw.consecutiveErrors++ + if msw.failureThreshold > 0 && msw.consecutiveErrors >= msw.failureThreshold { + // mark it as degraded for any other issue encountered + msw.module.UpdateStatus(status.Degraded, fmt.Sprintf("Error fetching data for metricset %s.%s: %v", msw.module.Name(), msw.MetricSet.Name(), err)) } logp.Err("Error fetching data for metricset %s.%s: %s", msw.module.Name(), msw.Name(), err) - } else { + + default: msw.consecutiveErrors = 0 msw.module.UpdateStatus(status.Running, "") } From 4d93fccf800b715104977cd82fb1bf2527a8d556 Mon Sep 17 00:00:00 2001 From: Paolo Chila Date: Tue, 12 Nov 2024 10:02:27 +0100 Subject: [PATCH 05/10] Add unit tests for ReportingMetricSetV2WithContext --- metricbeat/mb/module/wrapper_internal_test.go | 615 ++++++++++++------ 1 file changed, 420 insertions(+), 195 deletions(-) diff --git a/metricbeat/mb/module/wrapper_internal_test.go b/metricbeat/mb/module/wrapper_internal_test.go index a202237a599..64a63cf5484 100644 --- a/metricbeat/mb/module/wrapper_internal_test.go +++ b/metricbeat/mb/module/wrapper_internal_test.go @@ -108,219 +108,444 @@ func TestWrapperHandleFetchErrorSync(t *testing.T) { fetchError := errors.New("fetch has gone all wrong") - type setupFunc func(t *testing.T, fetcher *mockReportingFetcher, pushReporter *mockPushReporterV2, statusReporter *mockStatusReporter) - type postIterationAssertFunc func(t *testing.T, i int, msWrapper *metricSetWrapper, fetcher *mockReportingFetcher, pushReporter *mockPushReporterV2, statusReporter *mockStatusReporter) - - testcases := []struct { - name string - config *conf.C - setup setupFunc - iterations int - assertIteration postIterationAssertFunc - }{ - { - name: "no failureThreshold: status DEGRADED after first error", - config: newConfig(t, map[string]interface{}{ - "module": mockModuleName, - "metricsets": []string{mockMetricSetName}, - "period": "100ms", - "hosts": []string{"testhost"}, - }), - setup: func(t *testing.T, fetcher *mockReportingFetcher, pushReporter *mockPushReporterV2, statusReporter *mockStatusReporter) { - // fetcher will immediately error out - fetcher.On("Fetch", pushReporter).Return(fetchError).Once() - // expect the error to be propagated via the pushReporter - pushReporter.On("Error", fetchError).Return(true).Once() - // expect the status degraded to be set - statusReporter.On("UpdateStatus", status.Degraded, mock.AnythingOfType("string")).Once() + t.Run("ReportingMetricSetV2Error", func(t *testing.T) { + type setupFunc func(t *testing.T, fetcher *mockReportingFetcher, pushReporter *mockPushReporterV2, statusReporter *mockStatusReporter) + type postIterationAssertFunc func(t *testing.T, i int, msWrapper *metricSetWrapper, fetcher *mockReportingFetcher, pushReporter *mockPushReporterV2, statusReporter *mockStatusReporter) + + testcases := []struct { + name string + config *conf.C + setup setupFunc + iterations int + assertIteration postIterationAssertFunc + }{ + { + name: "no failureThreshold: status DEGRADED after first error", + config: newConfig(t, map[string]interface{}{ + "module": mockModuleName, + "metricsets": []string{mockMetricSetName}, + "period": "100ms", + "hosts": []string{"testhost"}, + }), + setup: func(t *testing.T, fetcher *mockReportingFetcher, pushReporter *mockPushReporterV2, statusReporter *mockStatusReporter) { + // fetcher will immediately error out + fetcher.On("Fetch", pushReporter).Return(fetchError).Once() + + // expect the error to be propagated via the pushReporter + pushReporter.On("Error", fetchError).Return(true).Once() + // expect the status degraded to be set + statusReporter.On("UpdateStatus", status.Degraded, mock.AnythingOfType("string")).Once() + }, + iterations: 1, + assertIteration: nil, }, - iterations: 1, - assertIteration: nil, - }, - { - name: "no failureThreshold: status DEGRADED after first error, reset to Running after first successful fetch", - config: newConfig(t, map[string]interface{}{ - "module": mockModuleName, - "metricsets": []string{mockMetricSetName}, - "period": "100ms", - "hosts": []string{"testhost"}, - }), - setup: func(t *testing.T, fetcher *mockReportingFetcher, pushReporter *mockPushReporterV2, statusReporter *mockStatusReporter) { - // fetcher will immediately error out 3 times - fetcher.On("Fetch", pushReporter).Return(fetchError).Times(3) - // fetcher will never error again afterwards - fetcher.On("Fetch", pushReporter).Return(nil) - // expect the error to be propagated via the pushReporter - pushReporter.On("Error", fetchError).Return(true).Times(3) - // expect the status degraded to be set 3 times - statusReporter.On("UpdateStatus", status.Degraded, mock.AnythingOfType("string")).Times(3) - // expect the status Running to be set once fetch recovers - statusReporter.On("UpdateStatus", status.Running, mock.AnythingOfType("string")).Twice() + { + name: "no failureThreshold: status DEGRADED after first error, reset to Running after first successful fetch", + config: newConfig(t, map[string]interface{}{ + "module": mockModuleName, + "metricsets": []string{mockMetricSetName}, + "period": "100ms", + "hosts": []string{"testhost"}, + }), + setup: func(t *testing.T, fetcher *mockReportingFetcher, pushReporter *mockPushReporterV2, statusReporter *mockStatusReporter) { + // fetcher will immediately error out 3 times + fetcher.On("Fetch", pushReporter).Return(fetchError).Times(3) + // fetcher will never error again afterwards + fetcher.On("Fetch", pushReporter).Return(nil) + // expect the error to be propagated via the pushReporter + pushReporter.On("Error", fetchError).Return(true).Times(3) + // expect the status degraded to be set 3 times + statusReporter.On("UpdateStatus", status.Degraded, mock.AnythingOfType("string")).Times(3) + // expect the status Running to be set once fetch recovers + statusReporter.On("UpdateStatus", status.Running, mock.AnythingOfType("string")).Twice() + }, + iterations: 5, + assertIteration: func(t *testing.T, i int, msWrapper *metricSetWrapper, fetcher *mockReportingFetcher, pushReporter *mockPushReporterV2, statusReporter *mockStatusReporter) { + t.Logf("Assertion after iteration %d", i) + switch { + case i < 3: + assert.Truef(t, statusReporter.AssertCalled(t, "UpdateStatus", status.Degraded, mock.AnythingOfType("string")), "stream degraded at iteration %d", i) + case i >= 3: + assert.Truef(t, statusReporter.AssertCalled(t, "UpdateStatus", status.Running, mock.AnythingOfType("string")), "stream set to running at iteration %d", i) + } + }, }, - iterations: 5, - assertIteration: func(t *testing.T, i int, msWrapper *metricSetWrapper, fetcher *mockReportingFetcher, pushReporter *mockPushReporterV2, statusReporter *mockStatusReporter) { - t.Logf("Assertion after iteration %d", i) - switch { - case i < 3: - assert.Truef(t, statusReporter.AssertCalled(t, "UpdateStatus", status.Degraded, mock.AnythingOfType("string")), "stream degraded at iteration %d", i) - case i >= 3: - assert.Truef(t, statusReporter.AssertCalled(t, "UpdateStatus", status.Running, mock.AnythingOfType("string")), "stream set to running at iteration %d", i) - } - }, - }, - { - name: "failureThreshold = 3: status DEGRADED at the 3rd error", - config: newConfig(t, map[string]interface{}{ - "module": mockModuleName, - "metricsets": []string{mockMetricSetName}, - "period": "100ms", - "hosts": []string{"testhost"}, - "failureThreshold": 3, - }), - setup: func(t *testing.T, fetcher *mockReportingFetcher, pushReporter *mockPushReporterV2, statusReporter *mockStatusReporter) { - // fetcher will immediately error out 3 times in a row - fetcher.On("Fetch", pushReporter).Return(fetchError).Times(3) - // expect the error to be propagated via the pushReporter at every iteration - pushReporter.On("Error", fetchError).Return(true).Times(3) - // expect the status degraded to be set - statusReporter.On("UpdateStatus", status.Degraded, mock.AnythingOfType("string")).Once() - }, - iterations: 3, - assertIteration: func(t *testing.T, i int, msWrapper *metricSetWrapper, fetcher *mockReportingFetcher, pushReporter *mockPushReporterV2, statusReporter *mockStatusReporter) { - t.Logf("Assertion after iteration %d", i) - switch { - case i < 2: - assert.Truef(t, statusReporter.AssertNotCalled(t, "UpdateStatus", status.Degraded, mock.AnythingOfType("string")), "stream degraded at iteration %d", i) - case i == 2: - assert.Truef(t, statusReporter.AssertCalled(t, "UpdateStatus", status.Degraded, mock.AnythingOfType("string")), "stream not yet degraded at iteration %d", i) - } - }, - }, - { - name: "failureThreshold = 3: status HEALTHY after 2 errors, 1 success and 2 more errors, DEGRADED at the 3rd consecutive error", - config: newConfig(t, map[string]interface{}{ - "module": mockModuleName, - "metricsets": []string{mockMetricSetName}, - "period": "100ms", - "hosts": []string{"testhost"}, - "failureThreshold": 3, - }), - setup: func(t *testing.T, fetcher *mockReportingFetcher, pushReporter *mockPushReporterV2, statusReporter *mockStatusReporter) { - // fetcher will error out 2 times in a row - fetcher.On("Fetch", pushReporter).Return(fetchError).Times(2) - // fetcher will then succeed once - fetcher.On("Fetch", pushReporter).Return(nil).Once() - // fetcher will error out 3 more times in a row - fetcher.On("Fetch", pushReporter).Return(fetchError).Times(3) - - // expect the error to be propagated via the pushReporter at every failing iteration - pushReporter.On("Error", fetchError).Return(true).Times(5) - // expect the status running to be set when there's no error returned by the fetcher at the 3rd iteration - statusReporter.On("UpdateStatus", status.Running, mock.AnythingOfType("string")).Once() - // expect the status degraded to be set only once - statusReporter.On("UpdateStatus", status.Degraded, mock.AnythingOfType("string")).Once() + { + name: "failureThreshold = 3: status DEGRADED at the 3rd error", + config: newConfig(t, map[string]interface{}{ + "module": mockModuleName, + "metricsets": []string{mockMetricSetName}, + "period": "100ms", + "hosts": []string{"testhost"}, + "failureThreshold": 3, + }), + setup: func(t *testing.T, fetcher *mockReportingFetcher, pushReporter *mockPushReporterV2, statusReporter *mockStatusReporter) { + // fetcher will immediately error out 3 times in a row + fetcher.On("Fetch", pushReporter).Return(fetchError).Times(3) + // expect the error to be propagated via the pushReporter at every iteration + pushReporter.On("Error", fetchError).Return(true).Times(3) + // expect the status degraded to be set + statusReporter.On("UpdateStatus", status.Degraded, mock.AnythingOfType("string")).Once() + }, + iterations: 3, + assertIteration: func(t *testing.T, i int, msWrapper *metricSetWrapper, fetcher *mockReportingFetcher, pushReporter *mockPushReporterV2, statusReporter *mockStatusReporter) { + t.Logf("Assertion after iteration %d", i) + switch { + case i < 2: + assert.Truef(t, statusReporter.AssertNotCalled(t, "UpdateStatus", status.Degraded, mock.AnythingOfType("string")), "stream degraded at iteration %d", i) + case i == 2: + assert.Truef(t, statusReporter.AssertCalled(t, "UpdateStatus", status.Degraded, mock.AnythingOfType("string")), "stream not yet degraded at iteration %d", i) + } + }, }, - iterations: 6, - assertIteration: func(t *testing.T, i int, msWrapper *metricSetWrapper, fetcher *mockReportingFetcher, pushReporter *mockPushReporterV2, statusReporter *mockStatusReporter) { - t.Logf("Assertion after iteration %d", i) - switch { - case i < 2: - assert.Truef(t, statusReporter.AssertNotCalled(t, "UpdateStatus", status.Degraded, mock.AnythingOfType("string")), "stream degraded at iteration %d", i) - case i >= 2 && i < 5: - assert.Truef(t, statusReporter.AssertNotCalled(t, "UpdateStatus", status.Degraded, mock.AnythingOfType("string")), "stream degraded at iteration %d", i) - assert.Truef(t, statusReporter.AssertCalled(t, "UpdateStatus", status.Running, mock.AnythingOfType("string")), "stream degraded at iteration %d", i) - case i == 5: - assert.Truef(t, statusReporter.AssertCalled(t, "UpdateStatus", status.Degraded, mock.AnythingOfType("string")), "stream not yet degraded at iteration %d", i) - } + { + name: "failureThreshold = 3: status HEALTHY after 2 errors, 1 success and 2 more errors, DEGRADED at the 3rd consecutive error", + config: newConfig(t, map[string]interface{}{ + "module": mockModuleName, + "metricsets": []string{mockMetricSetName}, + "period": "100ms", + "hosts": []string{"testhost"}, + "failureThreshold": 3, + }), + setup: func(t *testing.T, fetcher *mockReportingFetcher, pushReporter *mockPushReporterV2, statusReporter *mockStatusReporter) { + // fetcher will error out 2 times in a row + fetcher.On("Fetch", pushReporter).Return(fetchError).Times(2) + // fetcher will then succeed once + fetcher.On("Fetch", pushReporter).Return(nil).Once() + // fetcher will error out 3 more times in a row + fetcher.On("Fetch", pushReporter).Return(fetchError).Times(3) + + // expect the error to be propagated via the pushReporter at every failing iteration + pushReporter.On("Error", fetchError).Return(true).Times(5) + // expect the status running to be set when there's no error returned by the fetcher at the 3rd iteration + statusReporter.On("UpdateStatus", status.Running, mock.AnythingOfType("string")).Once() + // expect the status degraded to be set only once + statusReporter.On("UpdateStatus", status.Degraded, mock.AnythingOfType("string")).Once() + }, + iterations: 6, + assertIteration: func(t *testing.T, i int, msWrapper *metricSetWrapper, fetcher *mockReportingFetcher, pushReporter *mockPushReporterV2, statusReporter *mockStatusReporter) { + t.Logf("Assertion after iteration %d", i) + switch { + case i < 2: + assert.Truef(t, statusReporter.AssertNotCalled(t, "UpdateStatus", status.Degraded, mock.AnythingOfType("string")), "stream degraded at iteration %d", i) + case i >= 2 && i < 5: + assert.Truef(t, statusReporter.AssertNotCalled(t, "UpdateStatus", status.Degraded, mock.AnythingOfType("string")), "stream degraded at iteration %d", i) + assert.Truef(t, statusReporter.AssertCalled(t, "UpdateStatus", status.Running, mock.AnythingOfType("string")), "stream degraded at iteration %d", i) + case i == 5: + assert.Truef(t, statusReporter.AssertCalled(t, "UpdateStatus", status.Degraded, mock.AnythingOfType("string")), "stream not yet degraded at iteration %d", i) + } + }, }, - }, - { - name: "failureThreshold = 0: stream status update never become DEGRADED", - config: newConfig(t, map[string]interface{}{ - "module": mockModuleName, - "metricsets": []string{mockMetricSetName}, - "period": "100ms", - "hosts": []string{"testhost"}, - "failureThreshold": 0, - }), - setup: func(t *testing.T, fetcher *mockReportingFetcher, pushReporter *mockPushReporterV2, statusReporter *mockStatusReporter) { - // fetcher will error out 9 times in a row - fetcher.On("Fetch", pushReporter).Return(fetchError).Times(9) - // fetcher will then succeed once - fetcher.On("Fetch", pushReporter).Return(nil).Once() - - // expect the error to be propagated via the pushReporter at every failing iteration - pushReporter.On("Error", fetchError).Return(true).Times(9) - // expect the status running to be set when there's no error returned by the fetcher at the 10th iteration - statusReporter.On("UpdateStatus", status.Running, mock.AnythingOfType("string")).Once() + { + name: "failureThreshold = 0: stream status update never become DEGRADED", + config: newConfig(t, map[string]interface{}{ + "module": mockModuleName, + "metricsets": []string{mockMetricSetName}, + "period": "100ms", + "hosts": []string{"testhost"}, + "failureThreshold": 0, + }), + setup: func(t *testing.T, fetcher *mockReportingFetcher, pushReporter *mockPushReporterV2, statusReporter *mockStatusReporter) { + // fetcher will error out 9 times in a row + fetcher.On("Fetch", pushReporter).Return(fetchError).Times(9) + // fetcher will then succeed once + fetcher.On("Fetch", pushReporter).Return(nil).Once() + + // expect the error to be propagated via the pushReporter at every failing iteration + pushReporter.On("Error", fetchError).Return(true).Times(9) + // expect the status running to be set when there's no error returned by the fetcher at the 10th iteration + statusReporter.On("UpdateStatus", status.Running, mock.AnythingOfType("string")).Once() + }, + iterations: 10, + assertIteration: func(t *testing.T, i int, msWrapper *metricSetWrapper, fetcher *mockReportingFetcher, pushReporter *mockPushReporterV2, statusReporter *mockStatusReporter) { + t.Logf("Assertion after iteration %d", i) + switch { + case i < 9: + assert.Truef(t, statusReporter.AssertNotCalled(t, "UpdateStatus", status.Degraded, mock.AnythingOfType("string")), "stream degraded at iteration %d", i) + case i == 9: + assert.Truef(t, statusReporter.AssertNotCalled(t, "UpdateStatus", status.Degraded, mock.AnythingOfType("string")), "stream degraded at iteration %d", i) + assert.Truef(t, statusReporter.AssertCalled(t, "UpdateStatus", status.Running, mock.AnythingOfType("string")), "stream degraded at iteration %d", i) + } + }, }, - iterations: 10, - assertIteration: func(t *testing.T, i int, msWrapper *metricSetWrapper, fetcher *mockReportingFetcher, pushReporter *mockPushReporterV2, statusReporter *mockStatusReporter) { - t.Logf("Assertion after iteration %d", i) - switch { - case i < 9: - assert.Truef(t, statusReporter.AssertNotCalled(t, "UpdateStatus", status.Degraded, mock.AnythingOfType("string")), "stream degraded at iteration %d", i) - case i == 9: - assert.Truef(t, statusReporter.AssertNotCalled(t, "UpdateStatus", status.Degraded, mock.AnythingOfType("string")), "stream degraded at iteration %d", i) - assert.Truef(t, statusReporter.AssertCalled(t, "UpdateStatus", status.Running, mock.AnythingOfType("string")), "stream degraded at iteration %d", i) - } - }, - }, - } + } - for _, tc := range testcases { - t.Run(tc.name, func(t *testing.T) { - // Setup mock push reporter - mpr := new(mockPushReporterV2) + for _, tc := range testcases { + t.Run(tc.name, func(t *testing.T) { + // Setup mock push reporter + mpr := new(mockPushReporterV2) - // Setup mock fetcher - mrf := new(mockReportingFetcher) + // Setup mock fetcher + mrf := new(mockReportingFetcher) - // Setup mock StatusReporter - msr := new(mockStatusReporter) + // Setup mock StatusReporter + msr := new(mockStatusReporter) - //Setup mock reporter (ensure proper handling of intermediate calls, no functional value here) - mr := new(mockReporter) - mr.On("StartFetchTimer").Return() - mr.On("V2").Return(mpr) + //Setup mock reporter (ensure proper handling of intermediate calls, no functional value here) + mr := new(mockReporter) + mr.On("StartFetchTimer").Return() + mr.On("V2").Return(mpr) - // assert mocks expectations - t.Cleanup(func() { - mock.AssertExpectationsForObjects(t, mrf, mr, mpr, msr) - }) + // assert mocks expectations + t.Cleanup(func() { + mock.AssertExpectationsForObjects(t, mrf, mr, mpr, msr) + }) - // setup mocks before starting the test - if tc.setup != nil { - tc.setup(t, mrf, mpr, msr) - } + // setup mocks before starting the test + if tc.setup != nil { + tc.setup(t, mrf, mpr, msr) + } - // add metricset in registry - r := mb.NewRegister() - err := r.AddMetricSet(mockModuleName, mockMetricSetName, func(base mb.BaseMetricSet) (mb.MetricSet, error) { - mrf.BaseMetricSet = base - return mrf, nil + // add metricset in registry + r := mb.NewRegister() + err := r.AddMetricSet(mockModuleName, mockMetricSetName, func(base mb.BaseMetricSet) (mb.MetricSet, error) { + mrf.BaseMetricSet = base + return mrf, nil + }) + require.NoError(t, err) + + aModule, metricSets, err := mb.NewModule(tc.config, r) + require.NoError(t, err) + + // Set the mock status reporter + aModule.SetStatusReporter(msr) + + moduleWrapper, err := NewWrapperForMetricSet(aModule, metricSets[0], WithMetricSetInfo()) + require.NoError(t, err) + + // run metricset synchronously + wrappedMetricSet := moduleWrapper.MetricSets()[0] + for i := 0; i < tc.iterations; i++ { + wrappedMetricSet.fetch(context.TODO(), mr) + if tc.assertIteration != nil { + tc.assertIteration(t, i, wrappedMetricSet, mrf, mpr, msr) + } + } }) - require.NoError(t, err) + } + }) + + t.Run("ReportingMetricSetV2WithContext", func(t *testing.T) { + // These tests are the same as ReportingMetricSetV2Error, duplicated here because the generic solution to specify + // testcases only once is awkward and not very readable + + type setupFunc func(t *testing.T, fetcher *mockReportingFetcherWithContext, pushReporter *mockPushReporterV2, statusReporter *mockStatusReporter) + type postIterationAssertFunc func(t *testing.T, i int, msWrapper *metricSetWrapper, fetcher *mockReportingFetcherWithContext, pushReporter *mockPushReporterV2, statusReporter *mockStatusReporter) + + fetchCtx := context.TODO() + + testcases := []struct { + name string + config *conf.C + setup setupFunc + iterations int + assertIteration postIterationAssertFunc + }{ + { + name: "no failureThreshold: status DEGRADED after first error", + config: newConfig(t, map[string]interface{}{ + "module": mockModuleName, + "metricsets": []string{mockMetricSetName}, + "period": "100ms", + "hosts": []string{"testhost"}, + }), + setup: func(t *testing.T, fetcher *mockReportingFetcherWithContext, pushReporter *mockPushReporterV2, statusReporter *mockStatusReporter) { + // fetcher will immediately error out + fetcher.On("Fetch", fetchCtx, pushReporter).Return(fetchError).Once() + + // expect the error to be propagated via the pushReporter + pushReporter.On("Error", fetchError).Return(true).Once() + // expect the status degraded to be set + statusReporter.On("UpdateStatus", status.Degraded, mock.AnythingOfType("string")).Once() + }, + iterations: 1, + assertIteration: nil, + }, + { + name: "no failureThreshold: status DEGRADED after first error, reset to Running after first successful fetch", + config: newConfig(t, map[string]interface{}{ + "module": mockModuleName, + "metricsets": []string{mockMetricSetName}, + "period": "100ms", + "hosts": []string{"testhost"}, + }), + setup: func(t *testing.T, fetcher *mockReportingFetcherWithContext, pushReporter *mockPushReporterV2, statusReporter *mockStatusReporter) { + // fetcher will immediately error out 3 times + fetcher.On("Fetch", fetchCtx, pushReporter).Return(fetchError).Times(3) + // fetcher will never error again afterwards + fetcher.On("Fetch", fetchCtx, pushReporter).Return(nil) + // expect the error to be propagated via the pushReporter + pushReporter.On("Error", fetchError).Return(true).Times(3) + // expect the status degraded to be set 3 times + statusReporter.On("UpdateStatus", status.Degraded, mock.AnythingOfType("string")).Times(3) + // expect the status Running to be set once fetch recovers + statusReporter.On("UpdateStatus", status.Running, mock.AnythingOfType("string")).Twice() + }, + iterations: 5, + assertIteration: func(t *testing.T, i int, msWrapper *metricSetWrapper, fetcher *mockReportingFetcherWithContext, pushReporter *mockPushReporterV2, statusReporter *mockStatusReporter) { + t.Logf("Assertion after iteration %d", i) + switch { + case i < 3: + assert.Truef(t, statusReporter.AssertCalled(t, "UpdateStatus", status.Degraded, mock.AnythingOfType("string")), "stream degraded at iteration %d", i) + case i >= 3: + assert.Truef(t, statusReporter.AssertCalled(t, "UpdateStatus", status.Running, mock.AnythingOfType("string")), "stream set to running at iteration %d", i) + } + }, + }, + { + name: "failureThreshold = 3: status DEGRADED at the 3rd error", + config: newConfig(t, map[string]interface{}{ + "module": mockModuleName, + "metricsets": []string{mockMetricSetName}, + "period": "100ms", + "hosts": []string{"testhost"}, + "failureThreshold": 3, + }), + setup: func(t *testing.T, fetcher *mockReportingFetcherWithContext, pushReporter *mockPushReporterV2, statusReporter *mockStatusReporter) { + // fetcher will immediately error out 3 times in a row + fetcher.On("Fetch", fetchCtx, pushReporter).Return(fetchError).Times(3) + // expect the error to be propagated via the pushReporter at every iteration + pushReporter.On("Error", fetchError).Return(true).Times(3) + // expect the status degraded to be set + statusReporter.On("UpdateStatus", status.Degraded, mock.AnythingOfType("string")).Once() + }, + iterations: 3, + assertIteration: func(t *testing.T, i int, msWrapper *metricSetWrapper, fetcher *mockReportingFetcherWithContext, pushReporter *mockPushReporterV2, statusReporter *mockStatusReporter) { + t.Logf("Assertion after iteration %d", i) + switch { + case i < 2: + assert.Truef(t, statusReporter.AssertNotCalled(t, "UpdateStatus", status.Degraded, mock.AnythingOfType("string")), "stream degraded at iteration %d", i) + case i == 2: + assert.Truef(t, statusReporter.AssertCalled(t, "UpdateStatus", status.Degraded, mock.AnythingOfType("string")), "stream not yet degraded at iteration %d", i) + } + }, + }, + { + name: "failureThreshold = 3: status HEALTHY after 2 errors, 1 success and 2 more errors, DEGRADED at the 3rd consecutive error", + config: newConfig(t, map[string]interface{}{ + "module": mockModuleName, + "metricsets": []string{mockMetricSetName}, + "period": "100ms", + "hosts": []string{"testhost"}, + "failureThreshold": 3, + }), + setup: func(t *testing.T, fetcher *mockReportingFetcherWithContext, pushReporter *mockPushReporterV2, statusReporter *mockStatusReporter) { + // fetcher will error out 2 times in a row + fetcher.On("Fetch", fetchCtx, pushReporter).Return(fetchError).Times(2) + // fetcher will then succeed once + fetcher.On("Fetch", fetchCtx, pushReporter).Return(nil).Once() + // fetcher will error out 3 more times in a row + fetcher.On("Fetch", fetchCtx, pushReporter).Return(fetchError).Times(3) + + // expect the error to be propagated via the pushReporter at every failing iteration + pushReporter.On("Error", fetchError).Return(true).Times(5) + // expect the status running to be set when there's no error returned by the fetcher at the 3rd iteration + statusReporter.On("UpdateStatus", status.Running, mock.AnythingOfType("string")).Once() + // expect the status degraded to be set only once + statusReporter.On("UpdateStatus", status.Degraded, mock.AnythingOfType("string")).Once() + }, + iterations: 6, + assertIteration: func(t *testing.T, i int, msWrapper *metricSetWrapper, fetcher *mockReportingFetcherWithContext, pushReporter *mockPushReporterV2, statusReporter *mockStatusReporter) { + t.Logf("Assertion after iteration %d", i) + switch { + case i < 2: + assert.Truef(t, statusReporter.AssertNotCalled(t, "UpdateStatus", status.Degraded, mock.AnythingOfType("string")), "stream degraded at iteration %d", i) + case i >= 2 && i < 5: + assert.Truef(t, statusReporter.AssertNotCalled(t, "UpdateStatus", status.Degraded, mock.AnythingOfType("string")), "stream degraded at iteration %d", i) + assert.Truef(t, statusReporter.AssertCalled(t, "UpdateStatus", status.Running, mock.AnythingOfType("string")), "stream degraded at iteration %d", i) + case i == 5: + assert.Truef(t, statusReporter.AssertCalled(t, "UpdateStatus", status.Degraded, mock.AnythingOfType("string")), "stream not yet degraded at iteration %d", i) + } + }, + }, + { + name: "failureThreshold = 0: stream status update never become DEGRADED", + config: newConfig(t, map[string]interface{}{ + "module": mockModuleName, + "metricsets": []string{mockMetricSetName}, + "period": "100ms", + "hosts": []string{"testhost"}, + "failureThreshold": 0, + }), + setup: func(t *testing.T, fetcher *mockReportingFetcherWithContext, pushReporter *mockPushReporterV2, statusReporter *mockStatusReporter) { + // fetcher will error out 9 times in a row + fetcher.On("Fetch", fetchCtx, pushReporter).Return(fetchError).Times(9) + // fetcher will then succeed once + fetcher.On("Fetch", fetchCtx, pushReporter).Return(nil).Once() + + // expect the error to be propagated via the pushReporter at every failing iteration + pushReporter.On("Error", fetchError).Return(true).Times(9) + // expect the status running to be set when there's no error returned by the fetcher at the 10th iteration + statusReporter.On("UpdateStatus", status.Running, mock.AnythingOfType("string")).Once() + }, + iterations: 10, + assertIteration: func(t *testing.T, i int, msWrapper *metricSetWrapper, fetcher *mockReportingFetcherWithContext, pushReporter *mockPushReporterV2, statusReporter *mockStatusReporter) { + t.Logf("Assertion after iteration %d", i) + switch { + case i < 9: + assert.Truef(t, statusReporter.AssertNotCalled(t, "UpdateStatus", status.Degraded, mock.AnythingOfType("string")), "stream degraded at iteration %d", i) + case i == 9: + assert.Truef(t, statusReporter.AssertNotCalled(t, "UpdateStatus", status.Degraded, mock.AnythingOfType("string")), "stream degraded at iteration %d", i) + assert.Truef(t, statusReporter.AssertCalled(t, "UpdateStatus", status.Running, mock.AnythingOfType("string")), "stream degraded at iteration %d", i) + } + }, + }, + } - aModule, metricSets, err := mb.NewModule(tc.config, r) - require.NoError(t, err) + for _, tc := range testcases { + t.Run(tc.name, func(t *testing.T) { + // Setup mock push reporter + mpr := new(mockPushReporterV2) - // Set the mock status reporter - aModule.SetStatusReporter(msr) + // Setup mock fetcher + mrf := new(mockReportingFetcherWithContext) - moduleWrapper, err := NewWrapperForMetricSet(aModule, metricSets[0], WithMetricSetInfo()) - require.NoError(t, err) + // Setup mock StatusReporter + msr := new(mockStatusReporter) - // run metricset synchronously - wrappedMetricSet := moduleWrapper.MetricSets()[0] - for i := 0; i < tc.iterations; i++ { - wrappedMetricSet.fetch(context.TODO(), mr) - if tc.assertIteration != nil { - tc.assertIteration(t, i, wrappedMetricSet, mrf, mpr, msr) + //Setup mock reporter (ensure proper handling of intermediate calls, no functional value here) + mr := new(mockReporter) + mr.On("StartFetchTimer").Return() + mr.On("V2").Return(mpr) + + // assert mocks expectations + t.Cleanup(func() { + mock.AssertExpectationsForObjects(t, mrf, mr, mpr, msr) + }) + + // setup mocks before starting the test + if tc.setup != nil { + tc.setup(t, mrf, mpr, msr) } - } - }) - } + + // add metricset in registry + r := mb.NewRegister() + err := r.AddMetricSet(mockModuleName, mockMetricSetName, func(base mb.BaseMetricSet) (mb.MetricSet, error) { + mrf.BaseMetricSet = base + return mrf, nil + }) + require.NoError(t, err) + + aModule, metricSets, err := mb.NewModule(tc.config, r) + require.NoError(t, err) + + // Set the mock status reporter + aModule.SetStatusReporter(msr) + + moduleWrapper, err := NewWrapperForMetricSet(aModule, metricSets[0], WithMetricSetInfo()) + require.NoError(t, err) + + // run metricset synchronously + wrappedMetricSet := moduleWrapper.MetricSets()[0] + for i := 0; i < tc.iterations; i++ { + wrappedMetricSet.fetch(context.TODO(), mr) + if tc.assertIteration != nil { + tc.assertIteration(t, i, wrappedMetricSet, mrf, mpr, msr) + } + } + }) + } + }) } func newConfig(t testing.TB, moduleConfig interface{}) *conf.C { From 47c90eb846f92739c162367b424f690d059c5b99 Mon Sep 17 00:00:00 2001 From: Paolo Chila Date: Tue, 12 Nov 2024 11:10:51 +0100 Subject: [PATCH 06/10] Rename failureThreshold config key to failure_threshold --- metricbeat/mb/module/wrapper.go | 7 +- metricbeat/mb/module/wrapper_internal_test.go | 80 +++++++++---------- 2 files changed, 45 insertions(+), 42 deletions(-) diff --git a/metricbeat/mb/module/wrapper.go b/metricbeat/mb/module/wrapper.go index 9887b911b27..2ebfae2c0a1 100644 --- a/metricbeat/mb/module/wrapper.go +++ b/metricbeat/mb/module/wrapper.go @@ -36,11 +36,14 @@ import ( "github.com/elastic/elastic-agent-libs/testing" ) -// Expvar metric names. const ( + // Expvar metric names. successesKey = "success" failuresKey = "failures" eventsKey = "events" + + // Failure threshold config key + failureThresholdKey = "failure_threshold" ) var ( @@ -112,7 +115,7 @@ func createWrapper(module mb.Module, metricSets []mb.MetricSet, options ...Optio failureThreshold := uint(1) var streamHealthSettings struct { - FailureThreshold *uint `config:"failureThreshold"` + FailureThreshold *uint `config:"failure_threshold"` } err := module.UnpackConfig(&streamHealthSettings) diff --git a/metricbeat/mb/module/wrapper_internal_test.go b/metricbeat/mb/module/wrapper_internal_test.go index 64a63cf5484..d24084200b6 100644 --- a/metricbeat/mb/module/wrapper_internal_test.go +++ b/metricbeat/mb/module/wrapper_internal_test.go @@ -120,7 +120,7 @@ func TestWrapperHandleFetchErrorSync(t *testing.T) { assertIteration postIterationAssertFunc }{ { - name: "no failureThreshold: status DEGRADED after first error", + name: "no failure_threshold: status DEGRADED after first error", config: newConfig(t, map[string]interface{}{ "module": mockModuleName, "metricsets": []string{mockMetricSetName}, @@ -140,7 +140,7 @@ func TestWrapperHandleFetchErrorSync(t *testing.T) { assertIteration: nil, }, { - name: "no failureThreshold: status DEGRADED after first error, reset to Running after first successful fetch", + name: "no failure_threshold: status DEGRADED after first error, reset to Running after first successful fetch", config: newConfig(t, map[string]interface{}{ "module": mockModuleName, "metricsets": []string{mockMetricSetName}, @@ -171,13 +171,13 @@ func TestWrapperHandleFetchErrorSync(t *testing.T) { }, }, { - name: "failureThreshold = 3: status DEGRADED at the 3rd error", + name: "failure_threshold = 3: status DEGRADED at the 3rd error", config: newConfig(t, map[string]interface{}{ - "module": mockModuleName, - "metricsets": []string{mockMetricSetName}, - "period": "100ms", - "hosts": []string{"testhost"}, - "failureThreshold": 3, + "module": mockModuleName, + "metricsets": []string{mockMetricSetName}, + "period": "100ms", + "hosts": []string{"testhost"}, + failureThresholdKey: 3, }), setup: func(t *testing.T, fetcher *mockReportingFetcher, pushReporter *mockPushReporterV2, statusReporter *mockStatusReporter) { // fetcher will immediately error out 3 times in a row @@ -199,13 +199,13 @@ func TestWrapperHandleFetchErrorSync(t *testing.T) { }, }, { - name: "failureThreshold = 3: status HEALTHY after 2 errors, 1 success and 2 more errors, DEGRADED at the 3rd consecutive error", + name: "failure_threshold = 3: status HEALTHY after 2 errors, 1 success and 2 more errors, DEGRADED at the 3rd consecutive error", config: newConfig(t, map[string]interface{}{ - "module": mockModuleName, - "metricsets": []string{mockMetricSetName}, - "period": "100ms", - "hosts": []string{"testhost"}, - "failureThreshold": 3, + "module": mockModuleName, + "metricsets": []string{mockMetricSetName}, + "period": "100ms", + "hosts": []string{"testhost"}, + failureThresholdKey: 3, }), setup: func(t *testing.T, fetcher *mockReportingFetcher, pushReporter *mockPushReporterV2, statusReporter *mockStatusReporter) { // fetcher will error out 2 times in a row @@ -237,13 +237,13 @@ func TestWrapperHandleFetchErrorSync(t *testing.T) { }, }, { - name: "failureThreshold = 0: stream status update never become DEGRADED", + name: "failure_threshold = 0: stream status update never become DEGRADED", config: newConfig(t, map[string]interface{}{ - "module": mockModuleName, - "metricsets": []string{mockMetricSetName}, - "period": "100ms", - "hosts": []string{"testhost"}, - "failureThreshold": 0, + "module": mockModuleName, + "metricsets": []string{mockMetricSetName}, + "period": "100ms", + "hosts": []string{"testhost"}, + failureThresholdKey: 0, }), setup: func(t *testing.T, fetcher *mockReportingFetcher, pushReporter *mockPushReporterV2, statusReporter *mockStatusReporter) { // fetcher will error out 9 times in a row @@ -342,7 +342,7 @@ func TestWrapperHandleFetchErrorSync(t *testing.T) { assertIteration postIterationAssertFunc }{ { - name: "no failureThreshold: status DEGRADED after first error", + name: "no failure_threshold: status DEGRADED after first error", config: newConfig(t, map[string]interface{}{ "module": mockModuleName, "metricsets": []string{mockMetricSetName}, @@ -362,7 +362,7 @@ func TestWrapperHandleFetchErrorSync(t *testing.T) { assertIteration: nil, }, { - name: "no failureThreshold: status DEGRADED after first error, reset to Running after first successful fetch", + name: "no failure_threshold: status DEGRADED after first error, reset to Running after first successful fetch", config: newConfig(t, map[string]interface{}{ "module": mockModuleName, "metricsets": []string{mockMetricSetName}, @@ -393,13 +393,13 @@ func TestWrapperHandleFetchErrorSync(t *testing.T) { }, }, { - name: "failureThreshold = 3: status DEGRADED at the 3rd error", + name: "failure_threshold = 3: status DEGRADED at the 3rd error", config: newConfig(t, map[string]interface{}{ - "module": mockModuleName, - "metricsets": []string{mockMetricSetName}, - "period": "100ms", - "hosts": []string{"testhost"}, - "failureThreshold": 3, + "module": mockModuleName, + "metricsets": []string{mockMetricSetName}, + "period": "100ms", + "hosts": []string{"testhost"}, + failureThresholdKey: 3, }), setup: func(t *testing.T, fetcher *mockReportingFetcherWithContext, pushReporter *mockPushReporterV2, statusReporter *mockStatusReporter) { // fetcher will immediately error out 3 times in a row @@ -421,13 +421,13 @@ func TestWrapperHandleFetchErrorSync(t *testing.T) { }, }, { - name: "failureThreshold = 3: status HEALTHY after 2 errors, 1 success and 2 more errors, DEGRADED at the 3rd consecutive error", + name: "failure_threshold = 3: status HEALTHY after 2 errors, 1 success and 2 more errors, DEGRADED at the 3rd consecutive error", config: newConfig(t, map[string]interface{}{ - "module": mockModuleName, - "metricsets": []string{mockMetricSetName}, - "period": "100ms", - "hosts": []string{"testhost"}, - "failureThreshold": 3, + "module": mockModuleName, + "metricsets": []string{mockMetricSetName}, + "period": "100ms", + "hosts": []string{"testhost"}, + failureThresholdKey: 3, }), setup: func(t *testing.T, fetcher *mockReportingFetcherWithContext, pushReporter *mockPushReporterV2, statusReporter *mockStatusReporter) { // fetcher will error out 2 times in a row @@ -459,13 +459,13 @@ func TestWrapperHandleFetchErrorSync(t *testing.T) { }, }, { - name: "failureThreshold = 0: stream status update never become DEGRADED", + name: "failure_threshold = 0: stream status update never become DEGRADED", config: newConfig(t, map[string]interface{}{ - "module": mockModuleName, - "metricsets": []string{mockMetricSetName}, - "period": "100ms", - "hosts": []string{"testhost"}, - "failureThreshold": 0, + "module": mockModuleName, + "metricsets": []string{mockMetricSetName}, + "period": "100ms", + "hosts": []string{"testhost"}, + failureThresholdKey: 0, }), setup: func(t *testing.T, fetcher *mockReportingFetcherWithContext, pushReporter *mockPushReporterV2, statusReporter *mockStatusReporter) { // fetcher will error out 9 times in a row From 25bc54e20fe7f533aeb8d0454fcd7fb4008be781 Mon Sep 17 00:00:00 2001 From: Paolo Chila Date: Tue, 12 Nov 2024 11:23:13 +0100 Subject: [PATCH 07/10] linting --- metricbeat/mb/module/wrapper_internal_test.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/metricbeat/mb/module/wrapper_internal_test.go b/metricbeat/mb/module/wrapper_internal_test.go index d24084200b6..8bfed1d4d8d 100644 --- a/metricbeat/mb/module/wrapper_internal_test.go +++ b/metricbeat/mb/module/wrapper_internal_test.go @@ -32,7 +32,6 @@ import ( const mockModuleName = "MockModule" const mockMetricSetName = "MockMetricSet" -const mockMetricSetWithContextName = "MockMetricSetWithContext" // mockReportingFetcher type mockReportingFetcher struct { @@ -67,7 +66,7 @@ func (mr *mockReporter) StartFetchTimer() { func (mr *mockReporter) V1() mb.PushReporter { //nolint:staticcheck // PushReporter is deprecated but not removed args := mr.Called() - return args.Get(0).(mb.PushReporter) + return args.Get(0).(mb.PushReporter) //nolint:staticcheck // PushReporter is deprecated but not removed } func (mr *mockReporter) V2() mb.PushReporterV2 { From b788b439db8b9213fce606af526c576dfc2eaba9 Mon Sep 17 00:00:00 2001 From: Paolo Chila Date: Tue, 12 Nov 2024 15:35:13 +0100 Subject: [PATCH 08/10] Fix imports --- metricbeat/mb/module/wrapper_internal_test.go | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/metricbeat/mb/module/wrapper_internal_test.go b/metricbeat/mb/module/wrapper_internal_test.go index 8bfed1d4d8d..923f73febdb 100644 --- a/metricbeat/mb/module/wrapper_internal_test.go +++ b/metricbeat/mb/module/wrapper_internal_test.go @@ -22,12 +22,13 @@ import ( "errors" "testing" - "github.com/elastic/beats/v7/libbeat/management/status" - "github.com/elastic/beats/v7/metricbeat/mb" - conf "github.com/elastic/elastic-agent-libs/config" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" + + "github.com/elastic/beats/v7/libbeat/management/status" + "github.com/elastic/beats/v7/metricbeat/mb" + conf "github.com/elastic/elastic-agent-libs/config" ) const mockModuleName = "MockModule" From b32f635c7edfbaaa3a2aa68dcb4698da163c025a Mon Sep 17 00:00:00 2001 From: Paolo Chila Date: Tue, 12 Nov 2024 17:01:58 +0100 Subject: [PATCH 09/10] flip case statements in 'metricsetWrapper.handleFetchError()' --- metricbeat/mb/module/wrapper.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/metricbeat/mb/module/wrapper.go b/metricbeat/mb/module/wrapper.go index 2ebfae2c0a1..b42b93d6b49 100644 --- a/metricbeat/mb/module/wrapper.go +++ b/metricbeat/mb/module/wrapper.go @@ -312,6 +312,9 @@ func (msw *metricSetWrapper) Test(d testing.Driver) { func (msw *metricSetWrapper) handleFetchError(err error, reporter mb.PushReporterV2) { switch { + case err == nil: + msw.consecutiveErrors = 0 + msw.module.UpdateStatus(status.Running, "") case errors.As(err, &mb.PartialMetricsError{}): reporter.Error(err) @@ -320,7 +323,7 @@ func (msw *metricSetWrapper) handleFetchError(err error, reporter mb.PushReporte msw.module.UpdateStatus(status.Running, fmt.Sprintf("Error fetching data for metricset %s.%s: %v", msw.module.Name(), msw.MetricSet.Name(), err)) logp.Err("Error fetching data for metricset %s.%s: %s", msw.module.Name(), msw.Name(), err) - case err != nil: + default: reporter.Error(err) msw.consecutiveErrors++ if msw.failureThreshold > 0 && msw.consecutiveErrors >= msw.failureThreshold { @@ -329,9 +332,6 @@ func (msw *metricSetWrapper) handleFetchError(err error, reporter mb.PushReporte } logp.Err("Error fetching data for metricset %s.%s: %s", msw.module.Name(), msw.Name(), err) - default: - msw.consecutiveErrors = 0 - msw.module.UpdateStatus(status.Running, "") } } From caeafcb5c4a040ebd75e1341973de124f92ca251 Mon Sep 17 00:00:00 2001 From: Paolo Chila Date: Mon, 18 Nov 2024 19:10:57 +0100 Subject: [PATCH 10/10] move consecutiveFailures counter to metricSetWrapper.stat struct --- metricbeat/mb/module/wrapper.go | 39 ++++++++++--------- metricbeat/mb/module/wrapper_internal_test.go | 12 ++++++ 2 files changed, 32 insertions(+), 19 deletions(-) diff --git a/metricbeat/mb/module/wrapper.go b/metricbeat/mb/module/wrapper.go index b42b93d6b49..4681976f2e1 100644 --- a/metricbeat/mb/module/wrapper.go +++ b/metricbeat/mb/module/wrapper.go @@ -38,9 +38,10 @@ import ( const ( // Expvar metric names. - successesKey = "success" - failuresKey = "failures" - eventsKey = "events" + successesKey = "success" + failuresKey = "failures" + eventsKey = "events" + consecutiveFailuresKey = "consecutive_failures" // Failure threshold config key failureThresholdKey = "failure_threshold" @@ -75,17 +76,16 @@ type metricSetWrapper struct { periodic bool // Set to true if this metricset is a periodic fetcher failureThreshold uint // threshold of consecutive errors needed to set the stream as degraded - - consecutiveErrors uint // consecutive errors counter } // stats bundles common metricset stats. type stats struct { - key string // full stats key - ref uint32 // number of modules/metricsets reusing stats instance - success *monitoring.Int // Total success events. - failures *monitoring.Int // Total error events. - events *monitoring.Int // Total events published. + key string // full stats key + ref uint32 // number of modules/metricsets reusing stats instance + success *monitoring.Int // Total success events. + failures *monitoring.Int // Total error events. + events *monitoring.Int // Total events published. + consecutiveFailures *monitoring.Uint // Consecutive failures fetching this metricset } // NewWrapper creates a new module and its associated metricsets based on the given configuration. @@ -313,20 +313,20 @@ func (msw *metricSetWrapper) Test(d testing.Driver) { func (msw *metricSetWrapper) handleFetchError(err error, reporter mb.PushReporterV2) { switch { case err == nil: - msw.consecutiveErrors = 0 + msw.stats.consecutiveFailures.Set(0) msw.module.UpdateStatus(status.Running, "") case errors.As(err, &mb.PartialMetricsError{}): reporter.Error(err) - msw.consecutiveErrors = 0 + msw.stats.consecutiveFailures.Set(0) // mark module as running if metrics are partially available and display the error message msw.module.UpdateStatus(status.Running, fmt.Sprintf("Error fetching data for metricset %s.%s: %v", msw.module.Name(), msw.MetricSet.Name(), err)) logp.Err("Error fetching data for metricset %s.%s: %s", msw.module.Name(), msw.Name(), err) default: reporter.Error(err) - msw.consecutiveErrors++ - if msw.failureThreshold > 0 && msw.consecutiveErrors >= msw.failureThreshold { + msw.stats.consecutiveFailures.Inc() + if msw.failureThreshold > 0 && msw.stats.consecutiveFailures != nil && uint(msw.stats.consecutiveFailures.Get()) >= msw.failureThreshold { // mark it as degraded for any other issue encountered msw.module.UpdateStatus(status.Degraded, fmt.Sprintf("Error fetching data for metricset %s.%s: %v", msw.module.Name(), msw.MetricSet.Name(), err)) } @@ -461,11 +461,12 @@ func getMetricSetStats(module, name string) *stats { reg := monitoring.Default.NewRegistry(key) s := &stats{ - key: key, - ref: 1, - success: monitoring.NewInt(reg, successesKey), - failures: monitoring.NewInt(reg, failuresKey), - events: monitoring.NewInt(reg, eventsKey), + key: key, + ref: 1, + success: monitoring.NewInt(reg, successesKey), + failures: monitoring.NewInt(reg, failuresKey), + events: monitoring.NewInt(reg, eventsKey), + consecutiveFailures: monitoring.NewUint(reg, consecutiveFailuresKey), } fetches[key] = s diff --git a/metricbeat/mb/module/wrapper_internal_test.go b/metricbeat/mb/module/wrapper_internal_test.go index 923f73febdb..a9b242e55e2 100644 --- a/metricbeat/mb/module/wrapper_internal_test.go +++ b/metricbeat/mb/module/wrapper_internal_test.go @@ -315,6 +315,12 @@ func TestWrapperHandleFetchErrorSync(t *testing.T) { // run metricset synchronously wrappedMetricSet := moduleWrapper.MetricSets()[0] + + t.Cleanup(func() { + // release stats structure across testcases + releaseStats(wrappedMetricSet.stats) + }) + for i := 0; i < tc.iterations; i++ { wrappedMetricSet.fetch(context.TODO(), mr) if tc.assertIteration != nil { @@ -537,6 +543,12 @@ func TestWrapperHandleFetchErrorSync(t *testing.T) { // run metricset synchronously wrappedMetricSet := moduleWrapper.MetricSets()[0] + + t.Cleanup(func() { + // release stats structure across testcases + releaseStats(wrappedMetricSet.stats) + }) + for i := 0; i < tc.iterations; i++ { wrappedMetricSet.fetch(context.TODO(), mr) if tc.assertIteration != nil {