Skip to content

Commit

Permalink
[pkg/stanza/fileconsumer] Reduce noise in benchmark (open-telemetry#3…
Browse files Browse the repository at this point in the history
…1516)

This PR reworks the end-to-end benchmark in `pkg/stanza/fileconsumer`.

The primary goal is to surface a more accurate representation of
performance within the benchmark. Several factors were leading to
artificial delays which I believe may have drowned out actual impacts.
Three significant sources of noise were mitigated:
1. Every log read was passed to a channel. This caused us to measure a lot
of time waiting for channels. Now, we only signal when the file is fully
consumed.
2. Every line was generated on the fly. Now we generate a set of lines
ahead of time and write these over and over.
3. Each line was written to the file independently. Now we group the
pre-generated lines and write them all at once. We also write longer lines
so that much more content is written per call.

The overall result is a substantially clearer CPU profile and a slightly
clearer memory profile.

CPU Profile Before
<img width="860" alt="image"
src="https://github.com/open-telemetry/opentelemetry-collector-contrib/assets/5255616/a8bcd396-8c4a-4dcb-8c0b-0a3cbc852a86">

CPU Profile After
<img width="859" alt="image"
src="https://github.com/open-telemetry/opentelemetry-collector-contrib/assets/5255616/14ee9a7b-af2f-4df5-a8a5-10cb8830c9b6">

Memory Profile Before
<img width="859" alt="image"
src="https://github.com/open-telemetry/opentelemetry-collector-contrib/assets/5255616/86604420-754b-43f2-a143-39f60faed9d8">

Memory Profile After
<img width="860" alt="image"
src="https://github.com/open-telemetry/opentelemetry-collector-contrib/assets/5255616/6752be62-2d73-4f9d-bcea-aa5835cce601">

Additionally, this adds a new benchmark to cover a scenario with many
(100) files. Finally, it reduces the time to run the benchmark from ~30s
to ~20s.
  • Loading branch information
djaglowski authored Mar 28, 2024
1 parent c3eb407 commit b8efeeb
Showing 1 changed file with 64 additions and 43 deletions.
107 changes: 64 additions & 43 deletions pkg/stanza/fileconsumer/benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,12 @@ package fileconsumer

import (
"context"
"fmt"
"os"
"path/filepath"
"sync"
"testing"
"time"

"github.com/stretchr/testify/require"

Expand All @@ -23,22 +25,6 @@ type fileInputBenchmark struct {
config func() *Config
}

type benchFile struct {
*os.File
log func(int)
}

func simpleTextFile(b *testing.B, file *os.File) *benchFile {
line := string(filetest.TokenWithLength(49)) + "\n"
return &benchFile{
File: file,
log: func(_ int) {
_, err := file.WriteString(line)
require.NoError(b, err)
},
}
}

func BenchmarkFileInput(b *testing.B) {
cases := []fileInputBenchmark{
{
Expand Down Expand Up @@ -144,63 +130,98 @@ func BenchmarkFileInput(b *testing.B) {
return cfg
},
},
{
name: "Many",
paths: func() []string {
paths := make([]string, 100)
for i := range paths {
paths[i] = fmt.Sprintf("file%d.log", i)
}
return paths
}(),
config: func() *Config {
cfg := NewConfig()
cfg.Include = []string{"file*.log"}
cfg.MaxConcurrentFiles = 100
return cfg
},
},
}

// Pregenerate some lines which we can write to the files
// to avoid measuring the time it takes to generate them
// and to reduce the amount of syscalls in the benchmark.
uniqueLines := 10
severalLines := ""
for i := 0; i < uniqueLines; i++ {
severalLines += string(filetest.TokenWithLength(999)) + "\n"
}

for _, bench := range cases {
b.Run(bench.name, func(b *testing.B) {
rootDir := b.TempDir()

var files []*benchFile
var files []*os.File
for _, path := range bench.paths {
file := filetest.OpenFile(b, filepath.Join(rootDir, path))
files = append(files, simpleTextFile(b, file))
f := filetest.OpenFile(b, filepath.Join(rootDir, path))
// Initialize the file to ensure a unique fingerprint
_, err := f.WriteString(f.Name() + "\n")
require.NoError(b, err)
// Write half the content before starting the benchmark
for i := 0; i < b.N/2; i++ {
_, err := f.WriteString(severalLines)
require.NoError(b, err)
}
require.NoError(b, f.Sync())
files = append(files, f)
}

cfg := bench.config()
for i, inc := range cfg.Include {
cfg.Include[i] = filepath.Join(rootDir, inc)
}
cfg.StartAt = "beginning"
// Use aggressive poll interval so we're not measuring excess sleep time
cfg.PollInterval = time.Microsecond

received := make(chan []byte)
doneChan := make(chan bool, len(files))
callback := func(_ context.Context, token []byte, _ map[string]any) error {
received <- token
if len(token) == 0 {
doneChan <- true
}
return nil
}
op, err := cfg.Build(testutil.Logger(b), callback)
require.NoError(b, err)

// write half the lines before starting
mid := b.N / 2
for i := 0; i < mid; i++ {
for _, file := range files {
file.log(i)
}
}

b.ResetTimer()
err = op.Start(testutil.NewUnscopedMockPersister())
require.NoError(b, op.Start(testutil.NewUnscopedMockPersister()))
defer func() {
require.NoError(b, op.Stop())
}()
require.NoError(b, err)

// write the remainder of lines while running
var wg sync.WaitGroup
wg.Add(1)
go func() {
for i := mid; i < b.N; i++ {
for _, file := range files {
file.log(i)
for _, file := range files {
wg.Add(1)
go func(f *os.File) {
defer wg.Done()
// Write the other half of the content while running
for i := 0; i < b.N/2; i++ {
_, err := f.WriteString(severalLines)
require.NoError(b, err)
}
}
wg.Done()
}()
wg.Wait()
// Signal end of file
_, err := f.WriteString("\n")
require.NoError(b, err)
require.NoError(b, f.Sync())
}(file)
}

for i := 0; i < b.N*len(files); i++ {
<-received
// Timer continues to run until all files have been read
for dones := 0; dones < len(files); dones++ {
<-doneChan
}
wg.Wait()
})
}
}

0 comments on commit b8efeeb

Please sign in to comment.