diff --git a/pkg/stanza/fileconsumer/benchmark_test.go b/pkg/stanza/fileconsumer/benchmark_test.go index 204329106abc..9a9c161a5395 100644 --- a/pkg/stanza/fileconsumer/benchmark_test.go +++ b/pkg/stanza/fileconsumer/benchmark_test.go @@ -5,10 +5,12 @@ package fileconsumer import ( "context" + "fmt" "os" "path/filepath" "sync" "testing" + "time" "github.com/stretchr/testify/require" @@ -23,22 +25,6 @@ type fileInputBenchmark struct { config func() *Config } -type benchFile struct { - *os.File - log func(int) -} - -func simpleTextFile(b *testing.B, file *os.File) *benchFile { - line := string(filetest.TokenWithLength(49)) + "\n" - return &benchFile{ - File: file, - log: func(_ int) { - _, err := file.WriteString(line) - require.NoError(b, err) - }, - } -} - func BenchmarkFileInput(b *testing.B) { cases := []fileInputBenchmark{ { @@ -144,16 +130,50 @@ func BenchmarkFileInput(b *testing.B) { return cfg }, }, + { + name: "Many", + paths: func() []string { + paths := make([]string, 100) + for i := range paths { + paths[i] = fmt.Sprintf("file%d.log", i) + } + return paths + }(), + config: func() *Config { + cfg := NewConfig() + cfg.Include = []string{"file*.log"} + cfg.MaxConcurrentFiles = 100 + return cfg + }, + }, + } + + // Pregenerate some lines which we can write to the files + // to avoid measuring the time it takes to generate them + // and to reduce the number of syscalls in the benchmark. 
+ uniqueLines := 10 + severalLines := "" + for i := 0; i < uniqueLines; i++ { + severalLines += string(filetest.TokenWithLength(999)) + "\n" } for _, bench := range cases { b.Run(bench.name, func(b *testing.B) { rootDir := b.TempDir() - var files []*benchFile + var files []*os.File for _, path := range bench.paths { - file := filetest.OpenFile(b, filepath.Join(rootDir, path)) - files = append(files, simpleTextFile(b, file)) + f := filetest.OpenFile(b, filepath.Join(rootDir, path)) + // Initialize the file to ensure a unique fingerprint + _, err := f.WriteString(f.Name() + "\n") + require.NoError(b, err) + // Write half the content before starting the benchmark + for i := 0; i < b.N/2; i++ { + _, err := f.WriteString(severalLines) + require.NoError(b, err) + } + require.NoError(b, f.Sync()) + files = append(files, f) } cfg := bench.config() @@ -161,46 +181,47 @@ func BenchmarkFileInput(b *testing.B) { cfg.Include[i] = filepath.Join(rootDir, inc) } cfg.StartAt = "beginning" + // Use aggressive poll interval so we're not measuring excess sleep time + cfg.PollInterval = time.Microsecond - received := make(chan []byte) + doneChan := make(chan bool, len(files)) callback := func(_ context.Context, token []byte, _ map[string]any) error { - received <- token + if len(token) == 0 { + doneChan <- true + } return nil } op, err := cfg.Build(testutil.Logger(b), callback) require.NoError(b, err) - // write half the lines before starting - mid := b.N / 2 - for i := 0; i < mid; i++ { - for _, file := range files { - file.log(i) - } - } - b.ResetTimer() - err = op.Start(testutil.NewUnscopedMockPersister()) + require.NoError(b, op.Start(testutil.NewUnscopedMockPersister())) defer func() { require.NoError(b, op.Stop()) }() - require.NoError(b, err) - // write the remainder of lines while running var wg sync.WaitGroup - wg.Add(1) - go func() { - for i := mid; i < b.N; i++ { - for _, file := range files { - file.log(i) + for _, file := range files { + wg.Add(1) + go func(f *os.File) { + 
defer wg.Done() + // Write the other half of the content while running + for i := 0; i < b.N/2; i++ { + _, err := f.WriteString(severalLines) + require.NoError(b, err) } - } - wg.Done() - }() - wg.Wait() + // Signal end of file + _, err := f.WriteString("\n") + require.NoError(b, err) + require.NoError(b, f.Sync()) + }(file) + } - for i := 0; i < b.N*len(files); i++ { - <-received + // Timer continues to run until all files have been read + for dones := 0; dones < len(files); dones++ { + <-doneChan } + wg.Wait() }) } }