Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[chore][pkg/stanaza] Fix and strengthen test case #28228

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 10 additions & 10 deletions pkg/stanza/fileconsumer/file_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -714,13 +714,13 @@ func TestIgnoreEmptyFiles(t *testing.T) {
writeString(t, temp3, "testlog2\n")
operator.poll(context.Background())

waitForTokens(t, emitCalls, [][]byte{[]byte("testlog1"), []byte("testlog2")})
waitForTokens(t, emitCalls, []byte("testlog1"), []byte("testlog2"))

writeString(t, temp2, "testlog3\n")
writeString(t, temp4, "testlog4\n")
operator.poll(context.Background())

waitForTokens(t, emitCalls, [][]byte{[]byte("testlog3"), []byte("testlog4")})
waitForTokens(t, emitCalls, []byte("testlog3"), []byte("testlog4"))
}

func TestDecodeBufferIsResized(t *testing.T) {
Expand Down Expand Up @@ -762,7 +762,7 @@ func TestMultiFileSimple(t *testing.T) {
require.NoError(t, operator.Stop())
}()

waitForTokens(t, emitCalls, [][]byte{[]byte("testlog1"), []byte("testlog2")})
waitForTokens(t, emitCalls, []byte("testlog1"), []byte("testlog2"))
}

func TestMultiFileSort(t *testing.T) {
Expand Down Expand Up @@ -794,7 +794,7 @@ func TestMultiFileSort(t *testing.T) {
require.NoError(t, operator.Stop())
}()

waitForTokens(t, emitCalls, [][]byte{[]byte("testlog2")})
waitForTokens(t, emitCalls, []byte("testlog2"))
expectNoTokens(t, emitCalls)
}

Expand Down Expand Up @@ -828,7 +828,7 @@ func TestMultiFileSortTimestamp(t *testing.T) {
require.NoError(t, operator.Stop())
}()

waitForTokens(t, emitCalls, [][]byte{[]byte("testlog2")})
waitForTokens(t, emitCalls, []byte("testlog2"))
expectNoTokens(t, emitCalls)
}

Expand Down Expand Up @@ -869,7 +869,7 @@ func TestMultiFileParallel_PreloadedFiles(t *testing.T) {
require.NoError(t, operator.Stop())
}()

waitForTokens(t, emitCalls, expected)
waitForTokens(t, emitCalls, expected...)
wg.Wait()
}

Expand Down Expand Up @@ -914,7 +914,7 @@ func TestMultiFileParallel_LiveFiles(t *testing.T) {
}(temp, i)
}

waitForTokens(t, emitCalls, expected)
waitForTokens(t, emitCalls, expected...)
wg.Wait()
}

Expand Down Expand Up @@ -1110,7 +1110,7 @@ func TestFileBatchingRespectsStartAtEnd(t *testing.T) {

// Poll again and expect one line from each file.
operator.poll(context.Background())
waitForTokens(t, emitChan, expectedTokens)
waitForTokens(t, emitChan, expectedTokens...)
}

func TestFileReader_FingerprintUpdated(t *testing.T) {
Expand Down Expand Up @@ -1358,7 +1358,7 @@ func TestEncodings(t *testing.T) {
require.NoError(t, operator.Stop())
}()

waitForTokens(t, emitCalls, tc.expected)
waitForTokens(t, emitCalls, tc.expected...)
})
}
}
Expand Down Expand Up @@ -1684,6 +1684,6 @@ func TestStalePartialFingerprintDiscarded(t *testing.T) {
operator.poll(context.Background())
// We should have updated the offset for one of the files, so the second file should now
// be ingested from the beginning
waitForTokens(t, emitCalls, [][]byte{[]byte(content), []byte(newContent1), []byte(newContent)})
waitForTokens(t, emitCalls, []byte(content), []byte(newContent1), []byte(newContent))
operator.wg.Wait()
}
48 changes: 25 additions & 23 deletions pkg/stanza/fileconsumer/rotation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func TestMultiFileRotate(t *testing.T) {
}(temp, i)
}

waitForTokens(t, emitCalls, expected)
waitForTokens(t, emitCalls, expected...)
wg.Wait()
}

Expand Down Expand Up @@ -133,7 +133,7 @@ func TestMultiFileRotateSlow(t *testing.T) {
}(fileNum)
}

waitForTokens(t, emitCalls, expected)
waitForTokens(t, emitCalls, expected...)
wg.Wait()
}

Expand Down Expand Up @@ -195,7 +195,7 @@ func TestMultiCopyTruncateSlow(t *testing.T) {
}(fileNum)
}

waitForTokens(t, emitCalls, expected)
waitForTokens(t, emitCalls, expected...)
wg.Wait()
}

Expand Down Expand Up @@ -458,7 +458,7 @@ func TestTrackRotatedFilesLogOrder(t *testing.T) {
require.NoError(t, err)
writeString(t, newFile, "testlog3\n")

waitForTokens(t, emitCalls, [][]byte{[]byte("testlog2"), []byte("testlog3")})
waitForTokens(t, emitCalls, []byte("testlog2"), []byte("testlog3"))
}

// When a file it rotated out of pattern via move/create, we should
Expand Down Expand Up @@ -498,7 +498,7 @@ func TestRotatedOutOfPatternMoveCreate(t *testing.T) {
operator.poll(context.Background())

// expect remaining log from old file as well as all from new file
waitForTokens(t, emitCalls, [][]byte{[]byte("testlog2"), []byte("testlog4"), []byte("testlog5")})
waitForTokens(t, emitCalls, []byte("testlog2"), []byte("testlog4"), []byte("testlog5"))
}

// When a file it rotated out of pattern via copy/truncate, we should
Expand Down Expand Up @@ -536,7 +536,7 @@ func TestRotatedOutOfPatternCopyTruncate(t *testing.T) {
// poll again
operator.poll(context.Background())

waitForTokens(t, emitCalls, [][]byte{[]byte("testlog4"), []byte("testlog5")})
waitForTokens(t, emitCalls, []byte("testlog4"), []byte("testlog5"))
}

// TruncateThenWrite tests that, after a file has been truncated,
Expand Down Expand Up @@ -618,7 +618,7 @@ func TestCopyTruncateWriteBoth(t *testing.T) {

// Expect both messages to come through
operator.poll(context.Background())
waitForTokens(t, emitCalls, [][]byte{[]byte("testlog3"), []byte("testlog4")})
waitForTokens(t, emitCalls, []byte("testlog3"), []byte("testlog4"))
}

func TestFileMovedWhileOff_BigFiles(t *testing.T) {
Expand All @@ -633,31 +633,33 @@ func TestFileMovedWhileOff_BigFiles(t *testing.T) {
operator, emitCalls := buildTestManager(t, cfg)
persister := testutil.NewUnscopedMockPersister()

log1 := tokenWithLength(1000)
log2 := tokenWithLength(1000)
log1 := tokenWithLength(1001)
log2 := tokenWithLength(1002)
log3 := tokenWithLength(1003)

temp := openTemp(t, tempDir)
tempName := temp.Name()
writeString(t, temp, string(log1)+"\n")
require.NoError(t, temp.Close())

// Start the operator
// Run the operator to read the first log
require.NoError(t, operator.Start(persister))
defer func() {
require.NoError(t, operator.Stop())
}()
waitForToken(t, emitCalls, log1)

// Stop the operator, then rename and write a new log
require.NoError(t, operator.Stop())

err := os.Rename(temp.Name(), fmt.Sprintf("%s2", temp.Name()))
require.NoError(t, err)

temp = reopenTemp(t, temp.Name())
require.NoError(t, err)
// Write one more log to the original file
writeString(t, temp, string(log2)+"\n")
require.NoError(t, temp.Close())

// Rename the file and open another file in the same location
require.NoError(t, os.Rename(tempName, fmt.Sprintf("%s2", tempName)))

// Write a different log to the new file
temp2 := reopenTemp(t, tempName)
writeString(t, temp2, string(log3)+"\n")

// Expect the message written to the new log to come through
require.NoError(t, operator.Start(persister))
waitForToken(t, emitCalls, log2)
operator2, emitCalls2 := buildTestManager(t, cfg)
require.NoError(t, operator2.Start(persister))
waitForTokens(t, emitCalls2, log2, log3)
require.NoError(t, operator2.Stop())
}
2 changes: 1 addition & 1 deletion pkg/stanza/fileconsumer/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ func waitForTokenWithAttributes(t *testing.T, c chan *emitParams, expected []byt
}
}

func waitForTokens(t *testing.T, c chan *emitParams, expected [][]byte) {
func waitForTokens(t *testing.T, c chan *emitParams, expected ...[]byte) {
actual := make([][]byte, 0, len(expected))
LOOP:
for {
Expand Down