From f420d53a35bbc5c994de3479b04210a93d134eaf Mon Sep 17 00:00:00 2001 From: Daniel Jaglowski Date: Mon, 23 Oct 2023 20:35:29 -0600 Subject: [PATCH] [chore][pkg/stanza] Ensure all start/stop calls in are balanced (#28294) Follows https://github.com/open-telemetry/opentelemetry-collector-contrib/pull/28228 This normalizes calls to `Start` and `Stop` across the test suite. In some cases, `poll` is called directly in order to trigger behavior independently of timing. However, we should _either_ use `poll` directly, or use both `Start` and `Stop` exactly once. In the future, I expect `poll` will be exported and tested directly as part of an internal package. --- pkg/stanza/fileconsumer/file_test.go | 25 ++++++------------------ pkg/stanza/fileconsumer/rotation_test.go | 22 ++------------------- 2 files changed, 8 insertions(+), 39 deletions(-) diff --git a/pkg/stanza/fileconsumer/file_test.go b/pkg/stanza/fileconsumer/file_test.go index 965de97e4d0a..3645d309d290 100644 --- a/pkg/stanza/fileconsumer/file_test.go +++ b/pkg/stanza/fileconsumer/file_test.go @@ -79,11 +79,8 @@ See this issue for details: https://github.com/census-instrumentation/opencensus operator, _ := buildTestManager(t, cfg) _ = openTemp(t, tempDir) - err := operator.Start(testutil.NewUnscopedMockPersister()) - require.NoError(t, err) - defer func() { - require.NoError(t, operator.Stop()) - }() + require.NoError(t, operator.Start(testutil.NewUnscopedMockPersister())) + require.NoError(t, operator.Stop()) } // AddFields tests that the `log.file.name` and `log.file.path` fields are included @@ -491,9 +488,6 @@ func TestReadNewLogs(t *testing.T) { // Poll once so we know this isn't a new file operator.poll(context.Background()) - defer func() { - require.NoError(t, operator.Stop()) - }() // Create a new file temp := openTemp(t, tempDir) @@ -1583,14 +1577,13 @@ func TestHeaderPersistance(t *testing.T) { writeString(t, temp, "#headerField: headerValue\nlog line\n") persister := testutil.NewUnscopedMockPersister() - require.NoError(t, op1.Start(persister)) + require.NoError(t, op1.Start(persister)) waitForTokenWithAttributes(t, emitCalls1, []byte("log line"), map[string]any{ "header_key": "headerField", "header_value": "headerValue", attrs.LogFileName: filepath.Base(temp.Name()), }) - require.NoError(t, op1.Stop()) writeString(t, temp, "log line 2\n") @@ -1598,13 +1591,11 @@ func TestHeaderPersistance(t *testing.T) { op2, emitCalls2 := buildTestManager(t, cfg) require.NoError(t, op2.Start(persister)) - waitForTokenWithAttributes(t, emitCalls2, []byte("log line 2"), map[string]any{ "header_key": "headerField", "header_value": "headerValue", attrs.LogFileName: filepath.Base(temp.Name()), }) - require.NoError(t, op2.Stop()) } @@ -1626,12 +1617,10 @@ func TestHeaderPersistanceInHeader(t *testing.T) { writeString(t, temp, "|headerField1: headerValue1\n") persister := testutil.NewUnscopedMockPersister() - require.NoError(t, op1.Start(persister)) - - // The operator will poll at fixed time intervals, but we just want to make sure at least - // one poll operation occurs between now and when we stop. - op1.poll(context.Background()) + // Start and stop the operator, ensuring that at least one poll cycle occurs in between + require.NoError(t, op1.Start(persister)) + time.Sleep(2 * cfg1.PollInterval) require.NoError(t, op1.Stop()) writeString(t, temp, "|headerField2: headerValue2\nlog line\n") @@ -1643,13 +1632,11 @@ func TestHeaderPersistanceInHeader(t *testing.T) { op2, emitCalls := buildTestManager(t, cfg2) require.NoError(t, op2.Start(persister)) - waitForTokenWithAttributes(t, emitCalls, []byte("log line"), map[string]any{ "header_value_1": "headerValue1", "header_value_2": "headerValue2", attrs.LogFileName: filepath.Base(temp.Name()), }) - require.NoError(t, op2.Stop()) } diff --git a/pkg/stanza/fileconsumer/rotation_test.go b/pkg/stanza/fileconsumer/rotation_test.go index 9d7225dc0174..f016149bb416 100644 --- a/pkg/stanza/fileconsumer/rotation_test.go +++ b/pkg/stanza/fileconsumer/rotation_test.go @@ -365,10 +365,6 @@ func TestMoveFile(t *testing.T) { temp1.Close() operator.poll(context.Background()) - defer func() { - require.NoError(t, operator.Stop()) - }() - waitForToken(t, emitCalls, []byte("testlog1")) // Wait until all goroutines are finished before renaming @@ -397,10 +393,6 @@ func TestTrackMovedAwayFiles(t *testing.T) { temp1.Close() operator.poll(context.Background()) - defer func() { - require.NoError(t, operator.Stop()) - }() - waitForToken(t, emitCalls, []byte("testlog1")) // Wait until all goroutines are finished before renaming @@ -557,12 +549,7 @@ func TestTruncateThenWrite(t *testing.T) { writeString(t, temp1, "testlog1\ntestlog2\n") operator.poll(context.Background()) - defer func() { - require.NoError(t, operator.Stop()) - }() - - waitForToken(t, emitCalls, []byte("testlog1")) - waitForToken(t, emitCalls, []byte("testlog2")) + waitForTokens(t, emitCalls, []byte("testlog1"), []byte("testlog2")) require.NoError(t, temp1.Truncate(0)) _, err := temp1.Seek(0, 0) @@ -594,12 +581,7 @@ func TestCopyTruncateWriteBoth(t *testing.T) { writeString(t, temp1, "testlog1\ntestlog2\n") operator.poll(context.Background()) - defer func() { - require.NoError(t, operator.Stop()) - }() - - waitForToken(t, emitCalls, []byte("testlog1")) - waitForToken(t, emitCalls, []byte("testlog2")) + waitForTokens(t, emitCalls, []byte("testlog1"), []byte("testlog2")) operator.wg.Wait() // wait for all goroutines to finish // Copy the first file to a new file, and add another log