diff --git a/metricbeat/mb/module/wrapper_internal_test.go b/metricbeat/mb/module/wrapper_internal_test.go index a202237a599b..64a63cf54843 100644 --- a/metricbeat/mb/module/wrapper_internal_test.go +++ b/metricbeat/mb/module/wrapper_internal_test.go @@ -108,219 +108,444 @@ func TestWrapperHandleFetchErrorSync(t *testing.T) { fetchError := errors.New("fetch has gone all wrong") - type setupFunc func(t *testing.T, fetcher *mockReportingFetcher, pushReporter *mockPushReporterV2, statusReporter *mockStatusReporter) - type postIterationAssertFunc func(t *testing.T, i int, msWrapper *metricSetWrapper, fetcher *mockReportingFetcher, pushReporter *mockPushReporterV2, statusReporter *mockStatusReporter) - - testcases := []struct { - name string - config *conf.C - setup setupFunc - iterations int - assertIteration postIterationAssertFunc - }{ - { - name: "no failureThreshold: status DEGRADED after first error", - config: newConfig(t, map[string]interface{}{ - "module": mockModuleName, - "metricsets": []string{mockMetricSetName}, - "period": "100ms", - "hosts": []string{"testhost"}, - }), - setup: func(t *testing.T, fetcher *mockReportingFetcher, pushReporter *mockPushReporterV2, statusReporter *mockStatusReporter) { - // fetcher will immediately error out - fetcher.On("Fetch", pushReporter).Return(fetchError).Once() - // expect the error to be propagated via the pushReporter - pushReporter.On("Error", fetchError).Return(true).Once() - // expect the status degraded to be set - statusReporter.On("UpdateStatus", status.Degraded, mock.AnythingOfType("string")).Once() + t.Run("ReportingMetricSetV2Error", func(t *testing.T) { + type setupFunc func(t *testing.T, fetcher *mockReportingFetcher, pushReporter *mockPushReporterV2, statusReporter *mockStatusReporter) + type postIterationAssertFunc func(t *testing.T, i int, msWrapper *metricSetWrapper, fetcher *mockReportingFetcher, pushReporter *mockPushReporterV2, statusReporter *mockStatusReporter) + + testcases := []struct { + name string + config *conf.C + setup setupFunc + iterations int + assertIteration postIterationAssertFunc + }{ + { + name: "no failureThreshold: status DEGRADED after first error", + config: newConfig(t, map[string]interface{}{ + "module": mockModuleName, + "metricsets": []string{mockMetricSetName}, + "period": "100ms", + "hosts": []string{"testhost"}, + }), + setup: func(t *testing.T, fetcher *mockReportingFetcher, pushReporter *mockPushReporterV2, statusReporter *mockStatusReporter) { + // fetcher will immediately error out + fetcher.On("Fetch", pushReporter).Return(fetchError).Once() + + // expect the error to be propagated via the pushReporter + pushReporter.On("Error", fetchError).Return(true).Once() + // expect the status degraded to be set + statusReporter.On("UpdateStatus", status.Degraded, mock.AnythingOfType("string")).Once() + }, + iterations: 1, + assertIteration: nil, }, - iterations: 1, - assertIteration: nil, - }, - { - name: "no failureThreshold: status DEGRADED after first error, reset to Running after first successful fetch", - config: newConfig(t, map[string]interface{}{ - "module": mockModuleName, - "metricsets": []string{mockMetricSetName}, - "period": "100ms", - "hosts": []string{"testhost"}, - }), - setup: func(t *testing.T, fetcher *mockReportingFetcher, pushReporter *mockPushReporterV2, statusReporter *mockStatusReporter) { - // fetcher will immediately error out 3 times - fetcher.On("Fetch", pushReporter).Return(fetchError).Times(3) - // fetcher will never error again afterwards - fetcher.On("Fetch", pushReporter).Return(nil) - // expect the error to be propagated via the pushReporter - pushReporter.On("Error", fetchError).Return(true).Times(3) - // expect the status degraded to be set 3 times - statusReporter.On("UpdateStatus", status.Degraded, mock.AnythingOfType("string")).Times(3) - // expect the status Running to be set once fetch recovers - statusReporter.On("UpdateStatus", status.Running, mock.AnythingOfType("string")).Twice() + { + name: "no failureThreshold: status DEGRADED after first error, reset to Running after first successful fetch", + config: newConfig(t, map[string]interface{}{ + "module": mockModuleName, + "metricsets": []string{mockMetricSetName}, + "period": "100ms", + "hosts": []string{"testhost"}, + }), + setup: func(t *testing.T, fetcher *mockReportingFetcher, pushReporter *mockPushReporterV2, statusReporter *mockStatusReporter) { + // fetcher will immediately error out 3 times + fetcher.On("Fetch", pushReporter).Return(fetchError).Times(3) + // fetcher will never error again afterwards + fetcher.On("Fetch", pushReporter).Return(nil) + // expect the error to be propagated via the pushReporter + pushReporter.On("Error", fetchError).Return(true).Times(3) + // expect the status degraded to be set 3 times + statusReporter.On("UpdateStatus", status.Degraded, mock.AnythingOfType("string")).Times(3) + // expect the status Running to be set once fetch recovers + statusReporter.On("UpdateStatus", status.Running, mock.AnythingOfType("string")).Twice() + }, + iterations: 5, + assertIteration: func(t *testing.T, i int, msWrapper *metricSetWrapper, fetcher *mockReportingFetcher, pushReporter *mockPushReporterV2, statusReporter *mockStatusReporter) { + t.Logf("Assertion after iteration %d", i) + switch { + case i < 3: + assert.Truef(t, statusReporter.AssertCalled(t, "UpdateStatus", status.Degraded, mock.AnythingOfType("string")), "stream degraded at iteration %d", i) + case i >= 3: + assert.Truef(t, statusReporter.AssertCalled(t, "UpdateStatus", status.Running, mock.AnythingOfType("string")), "stream set to running at iteration %d", i) + } + }, }, - iterations: 5, - assertIteration: func(t *testing.T, i int, msWrapper *metricSetWrapper, fetcher *mockReportingFetcher, pushReporter *mockPushReporterV2, statusReporter *mockStatusReporter) { - t.Logf("Assertion after iteration %d", i) - switch { - case i < 3: - assert.Truef(t, statusReporter.AssertCalled(t, "UpdateStatus", status.Degraded, mock.AnythingOfType("string")), "stream degraded at iteration %d", i) - case i >= 3: - assert.Truef(t, statusReporter.AssertCalled(t, "UpdateStatus", status.Running, mock.AnythingOfType("string")), "stream set to running at iteration %d", i) - } - }, - }, - { - name: "failureThreshold = 3: status DEGRADED at the 3rd error", - config: newConfig(t, map[string]interface{}{ - "module": mockModuleName, - "metricsets": []string{mockMetricSetName}, - "period": "100ms", - "hosts": []string{"testhost"}, - "failureThreshold": 3, - }), - setup: func(t *testing.T, fetcher *mockReportingFetcher, pushReporter *mockPushReporterV2, statusReporter *mockStatusReporter) { - // fetcher will immediately error out 3 times in a row - fetcher.On("Fetch", pushReporter).Return(fetchError).Times(3) - // expect the error to be propagated via the pushReporter at every iteration - pushReporter.On("Error", fetchError).Return(true).Times(3) - // expect the status degraded to be set - statusReporter.On("UpdateStatus", status.Degraded, mock.AnythingOfType("string")).Once() - }, - iterations: 3, - assertIteration: func(t *testing.T, i int, msWrapper *metricSetWrapper, fetcher *mockReportingFetcher, pushReporter *mockPushReporterV2, statusReporter *mockStatusReporter) { - t.Logf("Assertion after iteration %d", i) - switch { - case i < 2: - assert.Truef(t, statusReporter.AssertNotCalled(t, "UpdateStatus", status.Degraded, mock.AnythingOfType("string")), "stream degraded at iteration %d", i) - case i == 2: - assert.Truef(t, statusReporter.AssertCalled(t, "UpdateStatus", status.Degraded, mock.AnythingOfType("string")), "stream not yet degraded at iteration %d", i) - } - }, - }, - { - name: "failureThreshold = 3: status HEALTHY after 2 errors, 1 success and 2 more errors, DEGRADED at the 3rd consecutive error", - config: newConfig(t, map[string]interface{}{ - "module": mockModuleName, - "metricsets": []string{mockMetricSetName}, - "period": "100ms", - "hosts": []string{"testhost"}, - "failureThreshold": 3, - }), - setup: func(t *testing.T, fetcher *mockReportingFetcher, pushReporter *mockPushReporterV2, statusReporter *mockStatusReporter) { - // fetcher will error out 2 times in a row - fetcher.On("Fetch", pushReporter).Return(fetchError).Times(2) - // fetcher will then succeed once - fetcher.On("Fetch", pushReporter).Return(nil).Once() - // fetcher will error out 3 more times in a row - fetcher.On("Fetch", pushReporter).Return(fetchError).Times(3) - - // expect the error to be propagated via the pushReporter at every failing iteration - pushReporter.On("Error", fetchError).Return(true).Times(5) - // expect the status running to be set when there's no error returned by the fetcher at the 3rd iteration - statusReporter.On("UpdateStatus", status.Running, mock.AnythingOfType("string")).Once() - // expect the status degraded to be set only once - statusReporter.On("UpdateStatus", status.Degraded, mock.AnythingOfType("string")).Once() + { + name: "failureThreshold = 3: status DEGRADED at the 3rd error", + config: newConfig(t, map[string]interface{}{ + "module": mockModuleName, + "metricsets": []string{mockMetricSetName}, + "period": "100ms", + "hosts": []string{"testhost"}, + "failureThreshold": 3, + }), + setup: func(t *testing.T, fetcher *mockReportingFetcher, pushReporter *mockPushReporterV2, statusReporter *mockStatusReporter) { + // fetcher will immediately error out 3 times in a row + fetcher.On("Fetch", pushReporter).Return(fetchError).Times(3) + // expect the error to be propagated via the pushReporter at every iteration + pushReporter.On("Error", fetchError).Return(true).Times(3) + // expect the status degraded to be set + statusReporter.On("UpdateStatus", status.Degraded, mock.AnythingOfType("string")).Once() + }, + iterations: 3, + assertIteration: func(t *testing.T, i int, msWrapper *metricSetWrapper, fetcher *mockReportingFetcher, pushReporter *mockPushReporterV2, statusReporter *mockStatusReporter) { + t.Logf("Assertion after iteration %d", i) + switch { + case i < 2: + assert.Truef(t, statusReporter.AssertNotCalled(t, "UpdateStatus", status.Degraded, mock.AnythingOfType("string")), "stream degraded at iteration %d", i) + case i == 2: + assert.Truef(t, statusReporter.AssertCalled(t, "UpdateStatus", status.Degraded, mock.AnythingOfType("string")), "stream not yet degraded at iteration %d", i) + } + }, }, - iterations: 6, - assertIteration: func(t *testing.T, i int, msWrapper *metricSetWrapper, fetcher *mockReportingFetcher, pushReporter *mockPushReporterV2, statusReporter *mockStatusReporter) { - t.Logf("Assertion after iteration %d", i) - switch { - case i < 2: - assert.Truef(t, statusReporter.AssertNotCalled(t, "UpdateStatus", status.Degraded, mock.AnythingOfType("string")), "stream degraded at iteration %d", i) - case i >= 2 && i < 5: - assert.Truef(t, statusReporter.AssertNotCalled(t, "UpdateStatus", status.Degraded, mock.AnythingOfType("string")), "stream degraded at iteration %d", i) - assert.Truef(t, statusReporter.AssertCalled(t, "UpdateStatus", status.Running, mock.AnythingOfType("string")), "stream degraded at iteration %d", i) - case i == 5: - assert.Truef(t, statusReporter.AssertCalled(t, "UpdateStatus", status.Degraded, mock.AnythingOfType("string")), "stream not yet degraded at iteration %d", i) - } + { + name: "failureThreshold = 3: status HEALTHY after 2 errors, 1 success and 2 more errors, DEGRADED at the 3rd consecutive error", + config: newConfig(t, map[string]interface{}{ + "module": mockModuleName, + "metricsets": []string{mockMetricSetName}, + "period": "100ms", + "hosts": []string{"testhost"}, + "failureThreshold": 3, + }), + setup: func(t *testing.T, fetcher *mockReportingFetcher, pushReporter *mockPushReporterV2, statusReporter *mockStatusReporter) { + // fetcher will error out 2 times in a row + fetcher.On("Fetch", pushReporter).Return(fetchError).Times(2) + // fetcher will then succeed once + fetcher.On("Fetch", pushReporter).Return(nil).Once() + // fetcher will error out 3 more times in a row + fetcher.On("Fetch", pushReporter).Return(fetchError).Times(3) + + // expect the error to be propagated via the pushReporter at every failing iteration + pushReporter.On("Error", fetchError).Return(true).Times(5) + // expect the status running to be set when there's no error returned by the fetcher at the 3rd iteration + statusReporter.On("UpdateStatus", status.Running, mock.AnythingOfType("string")).Once() + // expect the status degraded to be set only once + statusReporter.On("UpdateStatus", status.Degraded, mock.AnythingOfType("string")).Once() + }, + iterations: 6, + assertIteration: func(t *testing.T, i int, msWrapper *metricSetWrapper, fetcher *mockReportingFetcher, pushReporter *mockPushReporterV2, statusReporter *mockStatusReporter) { + t.Logf("Assertion after iteration %d", i) + switch { + case i < 2: + assert.Truef(t, statusReporter.AssertNotCalled(t, "UpdateStatus", status.Degraded, mock.AnythingOfType("string")), "stream degraded at iteration %d", i) + case i >= 2 && i < 5: + assert.Truef(t, statusReporter.AssertNotCalled(t, "UpdateStatus", status.Degraded, mock.AnythingOfType("string")), "stream degraded at iteration %d", i) + assert.Truef(t, statusReporter.AssertCalled(t, "UpdateStatus", status.Running, mock.AnythingOfType("string")), "stream degraded at iteration %d", i) + case i == 5: + assert.Truef(t, statusReporter.AssertCalled(t, "UpdateStatus", status.Degraded, mock.AnythingOfType("string")), "stream not yet degraded at iteration %d", i) + } + }, }, - }, - { - name: "failureThreshold = 0: stream status update never become DEGRADED", - config: newConfig(t, map[string]interface{}{ - "module": mockModuleName, - "metricsets": []string{mockMetricSetName}, - "period": "100ms", - "hosts": []string{"testhost"}, - "failureThreshold": 0, - }), - setup: func(t *testing.T, fetcher *mockReportingFetcher, pushReporter *mockPushReporterV2, statusReporter *mockStatusReporter) { - // fetcher will error out 9 times in a row - fetcher.On("Fetch", pushReporter).Return(fetchError).Times(9) - // fetcher will then succeed once - fetcher.On("Fetch", pushReporter).Return(nil).Once() - - // expect the error to be propagated via the pushReporter at every failing iteration - pushReporter.On("Error", fetchError).Return(true).Times(9) - // expect the status running to be set when there's no error returned by the fetcher at the 10th iteration - statusReporter.On("UpdateStatus", status.Running, mock.AnythingOfType("string")).Once() + { + name: "failureThreshold = 0: stream status update never become DEGRADED", + config: newConfig(t, map[string]interface{}{ + "module": mockModuleName, + "metricsets": []string{mockMetricSetName}, + "period": "100ms", + "hosts": []string{"testhost"}, + "failureThreshold": 0, + }), + setup: func(t *testing.T, fetcher *mockReportingFetcher, pushReporter *mockPushReporterV2, statusReporter *mockStatusReporter) { + // fetcher will error out 9 times in a row + fetcher.On("Fetch", pushReporter).Return(fetchError).Times(9) + // fetcher will then succeed once + fetcher.On("Fetch", pushReporter).Return(nil).Once() + + // expect the error to be propagated via the pushReporter at every failing iteration + pushReporter.On("Error", fetchError).Return(true).Times(9) + // expect the status running to be set when there's no error returned by the fetcher at the 10th iteration + statusReporter.On("UpdateStatus", status.Running, mock.AnythingOfType("string")).Once() + }, + iterations: 10, + assertIteration: func(t *testing.T, i int, msWrapper *metricSetWrapper, fetcher *mockReportingFetcher, pushReporter *mockPushReporterV2, statusReporter *mockStatusReporter) { + t.Logf("Assertion after iteration %d", i) + switch { + case i < 9: + assert.Truef(t, statusReporter.AssertNotCalled(t, "UpdateStatus", status.Degraded, mock.AnythingOfType("string")), "stream degraded at iteration %d", i) + case i == 9: + assert.Truef(t, statusReporter.AssertNotCalled(t, "UpdateStatus", status.Degraded, mock.AnythingOfType("string")), "stream degraded at iteration %d", i) + assert.Truef(t, statusReporter.AssertCalled(t, "UpdateStatus", status.Running, mock.AnythingOfType("string")), "stream degraded at iteration %d", i) + } + }, }, - iterations: 10, - assertIteration: func(t *testing.T, i int, msWrapper *metricSetWrapper, fetcher *mockReportingFetcher, pushReporter *mockPushReporterV2, statusReporter *mockStatusReporter) { - t.Logf("Assertion after iteration %d", i) - switch { - case i < 9: - assert.Truef(t, statusReporter.AssertNotCalled(t, "UpdateStatus", status.Degraded, mock.AnythingOfType("string")), "stream degraded at iteration %d", i) - case i == 9: - assert.Truef(t, statusReporter.AssertNotCalled(t, "UpdateStatus", status.Degraded, mock.AnythingOfType("string")), "stream degraded at iteration %d", i) - assert.Truef(t, statusReporter.AssertCalled(t, "UpdateStatus", status.Running, mock.AnythingOfType("string")), "stream degraded at iteration %d", i) - } - }, - }, - } + } - for _, tc := range testcases { - t.Run(tc.name, func(t *testing.T) { - // Setup mock push reporter - mpr := new(mockPushReporterV2) + for _, tc := range testcases { + t.Run(tc.name, func(t *testing.T) { + // Setup mock push reporter + mpr := new(mockPushReporterV2) - // Setup mock fetcher - mrf := new(mockReportingFetcher) + // Setup mock fetcher + mrf := new(mockReportingFetcher) - // Setup mock StatusReporter - msr := new(mockStatusReporter) + // Setup mock StatusReporter + msr := new(mockStatusReporter) - //Setup mock reporter (ensure proper handling of intermediate calls, no functional value here) - mr := new(mockReporter) - mr.On("StartFetchTimer").Return() - mr.On("V2").Return(mpr) + //Setup mock reporter (ensure proper handling of intermediate calls, no functional value here) + mr := new(mockReporter) + mr.On("StartFetchTimer").Return() + mr.On("V2").Return(mpr) - // assert mocks expectations - t.Cleanup(func() { - mock.AssertExpectationsForObjects(t, mrf, mr, mpr, msr) - }) + // assert mocks expectations + t.Cleanup(func() { + mock.AssertExpectationsForObjects(t, mrf, mr, mpr, msr) + }) - // setup mocks before starting the test - if tc.setup != nil { - tc.setup(t, mrf, mpr, msr) - } + // setup mocks before starting the test + if tc.setup != nil { + tc.setup(t, mrf, mpr, msr) + } - // add metricset in registry - r := mb.NewRegister() - err := r.AddMetricSet(mockModuleName, mockMetricSetName, func(base mb.BaseMetricSet) (mb.MetricSet, error) { - mrf.BaseMetricSet = base - return mrf, nil + // add metricset in registry + r := mb.NewRegister() + err := r.AddMetricSet(mockModuleName, mockMetricSetName, func(base mb.BaseMetricSet) (mb.MetricSet, error) { + mrf.BaseMetricSet = base + return mrf, nil + }) + require.NoError(t, err) + + aModule, metricSets, err := mb.NewModule(tc.config, r) + require.NoError(t, err) + + // Set the mock status reporter + aModule.SetStatusReporter(msr) + + moduleWrapper, err := NewWrapperForMetricSet(aModule, metricSets[0], WithMetricSetInfo()) + require.NoError(t, err) + + // run metricset synchronously + wrappedMetricSet := moduleWrapper.MetricSets()[0] + for i := 0; i < tc.iterations; i++ { + wrappedMetricSet.fetch(context.TODO(), mr) + if tc.assertIteration != nil { + tc.assertIteration(t, i, wrappedMetricSet, mrf, mpr, msr) + } + } }) - require.NoError(t, err) + } + }) + + t.Run("ReportingMetricSetV2WithContext", func(t *testing.T) { + // These tests are the same as ReportingMetricSetV2Error, duplicated here because the generic solution to specify + // testcases only once is awkward and not very readable + + type setupFunc func(t *testing.T, fetcher *mockReportingFetcherWithContext, pushReporter *mockPushReporterV2, statusReporter *mockStatusReporter) + type postIterationAssertFunc func(t *testing.T, i int, msWrapper *metricSetWrapper, fetcher *mockReportingFetcherWithContext, pushReporter *mockPushReporterV2, statusReporter *mockStatusReporter) + + fetchCtx := context.TODO() + + testcases := []struct { + name string + config *conf.C + setup setupFunc + iterations int + assertIteration postIterationAssertFunc + }{ + { + name: "no failureThreshold: status DEGRADED after first error", + config: newConfig(t, map[string]interface{}{ + "module": mockModuleName, + "metricsets": []string{mockMetricSetName}, + "period": "100ms", + "hosts": []string{"testhost"}, + }), + setup: func(t *testing.T, fetcher *mockReportingFetcherWithContext, pushReporter *mockPushReporterV2, statusReporter *mockStatusReporter) { + // fetcher will immediately error out + fetcher.On("Fetch", fetchCtx, pushReporter).Return(fetchError).Once() + + // expect the error to be propagated via the pushReporter + pushReporter.On("Error", fetchError).Return(true).Once() + // expect the status degraded to be set + statusReporter.On("UpdateStatus", status.Degraded, mock.AnythingOfType("string")).Once() + }, + iterations: 1, + assertIteration: nil, + }, + { + name: "no failureThreshold: status DEGRADED after first error, reset to Running after first successful fetch", + config: newConfig(t, map[string]interface{}{ + "module": mockModuleName, + "metricsets": []string{mockMetricSetName}, + "period": "100ms", + "hosts": []string{"testhost"}, + }), + setup: func(t *testing.T, fetcher *mockReportingFetcherWithContext, pushReporter *mockPushReporterV2, statusReporter *mockStatusReporter) { + // fetcher will immediately error out 3 times + fetcher.On("Fetch", fetchCtx, pushReporter).Return(fetchError).Times(3) + // fetcher will never error again afterwards + fetcher.On("Fetch", fetchCtx, pushReporter).Return(nil) + // expect the error to be propagated via the pushReporter + pushReporter.On("Error", fetchError).Return(true).Times(3) + // expect the status degraded to be set 3 times + statusReporter.On("UpdateStatus", status.Degraded, mock.AnythingOfType("string")).Times(3) + // expect the status Running to be set once fetch recovers + statusReporter.On("UpdateStatus", status.Running, mock.AnythingOfType("string")).Twice() + }, + iterations: 5, + assertIteration: func(t *testing.T, i int, msWrapper *metricSetWrapper, fetcher *mockReportingFetcherWithContext, pushReporter *mockPushReporterV2, statusReporter *mockStatusReporter) { + t.Logf("Assertion after iteration %d", i) + switch { + case i < 3: + assert.Truef(t, statusReporter.AssertCalled(t, "UpdateStatus", status.Degraded, mock.AnythingOfType("string")), "stream degraded at iteration %d", i) + case i >= 3: + assert.Truef(t, statusReporter.AssertCalled(t, "UpdateStatus", status.Running, mock.AnythingOfType("string")), "stream set to running at iteration %d", i) + } + }, + }, + { + name: "failureThreshold = 3: status DEGRADED at the 3rd error", + config: newConfig(t, map[string]interface{}{ + "module": mockModuleName, + "metricsets": []string{mockMetricSetName}, + "period": "100ms", + "hosts": []string{"testhost"}, + "failureThreshold": 3, + }), + setup: func(t *testing.T, fetcher *mockReportingFetcherWithContext, pushReporter *mockPushReporterV2, statusReporter *mockStatusReporter) { + // fetcher will immediately error out 3 times in a row + fetcher.On("Fetch", fetchCtx, pushReporter).Return(fetchError).Times(3) + // expect the error to be propagated via the pushReporter at every iteration + pushReporter.On("Error", fetchError).Return(true).Times(3) + // expect the status degraded to be set + statusReporter.On("UpdateStatus", status.Degraded, mock.AnythingOfType("string")).Once() + }, + iterations: 3, + assertIteration: func(t *testing.T, i int, msWrapper *metricSetWrapper, fetcher *mockReportingFetcherWithContext, pushReporter *mockPushReporterV2, statusReporter *mockStatusReporter) { + t.Logf("Assertion after iteration %d", i) + switch { + case i < 2: + assert.Truef(t, statusReporter.AssertNotCalled(t, "UpdateStatus", status.Degraded, mock.AnythingOfType("string")), "stream degraded at iteration %d", i) + case i == 2: + assert.Truef(t, statusReporter.AssertCalled(t, "UpdateStatus", status.Degraded, mock.AnythingOfType("string")), "stream not yet degraded at iteration %d", i) + } + }, + }, + { + name: "failureThreshold = 3: status HEALTHY after 2 errors, 1 success and 2 more errors, DEGRADED at the 3rd consecutive error", + config: newConfig(t, map[string]interface{}{ + "module": mockModuleName, + "metricsets": []string{mockMetricSetName}, + "period": "100ms", + "hosts": []string{"testhost"}, + "failureThreshold": 3, + }), + setup: func(t *testing.T, fetcher *mockReportingFetcherWithContext, pushReporter *mockPushReporterV2, statusReporter *mockStatusReporter) { + // fetcher will error out 2 times in a row + fetcher.On("Fetch", fetchCtx, pushReporter).Return(fetchError).Times(2) + // fetcher will then succeed once + fetcher.On("Fetch", fetchCtx, pushReporter).Return(nil).Once() + // fetcher will error out 3 more times in a row + fetcher.On("Fetch", fetchCtx, pushReporter).Return(fetchError).Times(3) + + // expect the error to be propagated via the pushReporter at every failing iteration + pushReporter.On("Error", fetchError).Return(true).Times(5) + // expect the status running to be set when there's no error returned by the fetcher at the 3rd iteration + statusReporter.On("UpdateStatus", status.Running, mock.AnythingOfType("string")).Once() + // expect the status degraded to be set only once + statusReporter.On("UpdateStatus", status.Degraded, mock.AnythingOfType("string")).Once() + }, + iterations: 6, + assertIteration: func(t *testing.T, i int, msWrapper *metricSetWrapper, fetcher *mockReportingFetcherWithContext, pushReporter *mockPushReporterV2, statusReporter *mockStatusReporter) { + t.Logf("Assertion after iteration %d", i) + switch { + case i < 2: + assert.Truef(t, statusReporter.AssertNotCalled(t, "UpdateStatus", status.Degraded, mock.AnythingOfType("string")), "stream degraded at iteration %d", i) + case i >= 2 && i < 5: + assert.Truef(t, statusReporter.AssertNotCalled(t, "UpdateStatus", status.Degraded, mock.AnythingOfType("string")), "stream degraded at iteration %d", i) + assert.Truef(t, statusReporter.AssertCalled(t, "UpdateStatus", status.Running, mock.AnythingOfType("string")), "stream degraded at iteration %d", i) + case i == 5: + assert.Truef(t, statusReporter.AssertCalled(t, "UpdateStatus", status.Degraded, mock.AnythingOfType("string")), "stream not yet degraded at iteration %d", i) + } + }, + }, + { + name: "failureThreshold = 0: stream status update never become DEGRADED", + config: newConfig(t, map[string]interface{}{ + "module": mockModuleName, + "metricsets": []string{mockMetricSetName}, + "period": "100ms", + "hosts": []string{"testhost"}, + "failureThreshold": 0, + }), + setup: func(t *testing.T, fetcher *mockReportingFetcherWithContext, pushReporter *mockPushReporterV2, statusReporter *mockStatusReporter) { + // fetcher will error out 9 times in a row + fetcher.On("Fetch", fetchCtx, pushReporter).Return(fetchError).Times(9) + // fetcher will then succeed once + fetcher.On("Fetch", fetchCtx, pushReporter).Return(nil).Once() + + // expect the error to be propagated via the pushReporter at every failing iteration + pushReporter.On("Error", fetchError).Return(true).Times(9) + // expect the status running to be set when there's no error returned by the fetcher at the 10th iteration + statusReporter.On("UpdateStatus", status.Running, mock.AnythingOfType("string")).Once() + }, + iterations: 10, + assertIteration: func(t *testing.T, i int, msWrapper *metricSetWrapper, fetcher *mockReportingFetcherWithContext, pushReporter *mockPushReporterV2, statusReporter *mockStatusReporter) { + t.Logf("Assertion after iteration %d", i) + switch { + case i < 9: + assert.Truef(t, statusReporter.AssertNotCalled(t, "UpdateStatus", status.Degraded, mock.AnythingOfType("string")), "stream degraded at iteration %d", i) + case i == 9: + assert.Truef(t, statusReporter.AssertNotCalled(t, "UpdateStatus", status.Degraded, mock.AnythingOfType("string")), "stream degraded at iteration %d", i) + assert.Truef(t, statusReporter.AssertCalled(t, "UpdateStatus", status.Running, mock.AnythingOfType("string")), "stream degraded at iteration %d", i) + } + }, + }, + } - aModule, metricSets, err := mb.NewModule(tc.config, r) - require.NoError(t, err) + for _, tc := range testcases { + t.Run(tc.name, func(t *testing.T) { + // Setup mock push reporter + mpr := new(mockPushReporterV2) - // Set the mock status reporter - aModule.SetStatusReporter(msr) + // Setup mock fetcher + mrf := new(mockReportingFetcherWithContext) - moduleWrapper, err := NewWrapperForMetricSet(aModule, metricSets[0], WithMetricSetInfo()) - require.NoError(t, err) + // Setup mock StatusReporter + msr := new(mockStatusReporter) - // run metricset synchronously - wrappedMetricSet := moduleWrapper.MetricSets()[0] - for i := 0; i < tc.iterations; i++ { - wrappedMetricSet.fetch(context.TODO(), mr) - if tc.assertIteration != nil { - tc.assertIteration(t, i, wrappedMetricSet, mrf, mpr, msr) + //Setup mock reporter (ensure proper handling of intermediate calls, no functional value here) + mr := new(mockReporter) + mr.On("StartFetchTimer").Return() + mr.On("V2").Return(mpr) + + // assert mocks expectations + t.Cleanup(func() { + mock.AssertExpectationsForObjects(t, mrf, mr, mpr, msr) + }) + + // setup mocks before starting the test + if tc.setup != nil { + tc.setup(t, mrf, mpr, msr) } - } - }) - } + + // add metricset in registry + r := mb.NewRegister() + err := r.AddMetricSet(mockModuleName, mockMetricSetName, func(base mb.BaseMetricSet) (mb.MetricSet, error) { + mrf.BaseMetricSet = base + return mrf, nil + }) + require.NoError(t, err) + + aModule, metricSets, err := mb.NewModule(tc.config, r) + require.NoError(t, err) + + // Set the mock status reporter + aModule.SetStatusReporter(msr) + + moduleWrapper, err := NewWrapperForMetricSet(aModule, metricSets[0], WithMetricSetInfo()) + require.NoError(t, err) + + // run metricset synchronously + wrappedMetricSet := moduleWrapper.MetricSets()[0] + for i := 0; i < tc.iterations; i++ { + wrappedMetricSet.fetch(context.TODO(), mr) + if tc.assertIteration != nil { + tc.assertIteration(t, i, wrappedMetricSet, mrf, mpr, msr) + } + } + }) + } + }) } func newConfig(t testing.TB, moduleConfig interface{}) *conf.C {