Skip to content

Commit

Permalink
[pkg/stanza/fileconsumer] Rework benchmark to better reflect package …
Browse files Browse the repository at this point in the history
…performance
  • Loading branch information
djaglowski committed Mar 28, 2024
1 parent 7b7ec2b commit 5a9555e
Showing 1 changed file with 48 additions and 54 deletions.
102 changes: 48 additions & 54 deletions pkg/stanza/fileconsumer/benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ package fileconsumer

import (
"context"
"crypto/rand"
"fmt"
"os"
"path/filepath"
Expand All @@ -26,45 +25,6 @@ type fileInputBenchmark struct {
config func() *Config
}

type benchFile struct {
*os.File
content []byte
}

func simpleTextFile(file *os.File, numLines int, lineLen int) *benchFile {
distinctFirstLine := fmt.Sprintf("%s\n", file.Name())
numBytes := len(distinctFirstLine) + numLines*lineLen + 1 // +1 for the newline signaling EOF
content := make([]byte, numBytes)
rand.Read(content) // fill with random bytes

// In order to efficiently detect EOF, we want it to end with an empty newline.
// We'll just remove all newlines and then add new ones in a controlled manner.
nonNewline := content[0]
for i := 1; nonNewline == '\n'; i++ {
nonNewline = content[i]
}

// Copy in the distinct first line
copy(content[:len(distinctFirstLine)], []byte(distinctFirstLine))

// Remove all newlines after the first line
for i := len(distinctFirstLine); i < len(content); i++ {
if content[i] == '\n' {
content[i] = nonNewline
}
}

// Add newlines to the end of each line
for i := len(distinctFirstLine) + lineLen - 1; i < len(content)-1; i += lineLen {
content[i] = '\n'
}

// Overwrite the last rune with a newline to signal EOF
content[len(content)-1] = '\n'

return &benchFile{File: file, content: content}
}

func BenchmarkFileInput(b *testing.B) {
cases := []fileInputBenchmark{
{
Expand Down Expand Up @@ -170,25 +130,59 @@ func BenchmarkFileInput(b *testing.B) {
return cfg
},
},
{
name: "Many",
paths: func() []string {
paths := make([]string, 100)
for i := range paths {
paths[i] = fmt.Sprintf("file%d.log", i)
}
return paths
}(),
config: func() *Config {
cfg := NewConfig()
cfg.Include = []string{"file*.log"}
cfg.MaxConcurrentFiles = 100
return cfg
},
},
}

// Pregenerate some lines which we can write to the files
// to avoid measuring the time it takes to generate them
// and to reduce the amount of syscalls in the benchmark.
uniqueLines := 10
severalLines := ""
for i := 0; i < uniqueLines; i++ {
severalLines += string(filetest.TokenWithLength(999)) + "\n"
}

for _, bench := range cases {
b.Run(bench.name, func(b *testing.B) {
rootDir := b.TempDir()

var files []*benchFile
var files []*os.File
for _, path := range bench.paths {
file := filetest.OpenFile(b, filepath.Join(rootDir, path))
files = append(files, simpleTextFile(file, b.N, 100))
f := filetest.OpenFile(b, filepath.Join(rootDir, path))
// Initialize the file to ensure a unique fingerprint
_, err := f.WriteString(f.Name() + "\n")
require.NoError(b, err)
// Write half the content before starting the benchmark
for i := 0; i < b.N/2; i++ {
_, err := f.WriteString(severalLines)
require.NoError(b, err)
}
require.NoError(b, f.Sync())
files = append(files, f)
}

cfg := bench.config()
for i, inc := range cfg.Include {
cfg.Include[i] = filepath.Join(rootDir, inc)
}
cfg.StartAt = "beginning"
// Use aggresive poll interval so we're not measuring sleep time
cfg.PollInterval = time.Nanosecond
// Use aggressive poll interval so we're not measuring excess sleep time
cfg.PollInterval = time.Microsecond

doneChan := make(chan bool, len(files))
callback := func(_ context.Context, token []byte, _ map[string]any) error {
Expand All @@ -200,26 +194,26 @@ func BenchmarkFileInput(b *testing.B) {
op, err := cfg.Build(testutil.Logger(b), callback)
require.NoError(b, err)

// Write some of the content before starting
for _, file := range files {
_, err := file.File.Write(file.content[:len(file.content)/2])
require.NoError(b, err)
}

b.ResetTimer()
require.NoError(b, op.Start(testutil.NewUnscopedMockPersister()))
defer func() {
require.NoError(b, op.Stop())
}()

// Write the remainder of content while running
var wg sync.WaitGroup
for _, file := range files {
wg.Add(1)
go func(f *benchFile) {
go func(f *os.File) {
defer wg.Done()
_, err := f.File.Write(f.content[len(f.content)/2:])
// Write the other half of the content while running
for i := 0; i < b.N/2; i++ {
_, err := f.WriteString(severalLines)
require.NoError(b, err)
}
// Signal end of file
_, err := f.WriteString("\n")
require.NoError(b, err)
require.NoError(b, f.Sync())
}(file)
}

Expand Down

0 comments on commit 5a9555e

Please sign in to comment.