Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[pkg/stanza/fileconsumer] Add ability to read files asynchronously #23056

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 18 additions & 3 deletions pkg/stanza/fileconsumer/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import (
"fmt"
"time"

"go.opentelemetry.io/collector/extension/experimental/storage"

"github.com/bmatcuk/doublestar/v4"
"go.opentelemetry.io/collector/featuregate"
"go.uber.org/zap"
Expand All @@ -28,6 +30,13 @@ var allowFileDeletion = featuregate.GlobalRegistry().MustRegister(
featuregate.WithRegisterReferenceURL("https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/16314"),
)

// useThreadPool is the alpha-stage feature gate "filelog.useThreadPool".
// When it is enabled, Manager.Start launches the worker goroutines and the
// poller calls pollConcurrent instead of poll.
var useThreadPool = featuregate.GlobalRegistry().MustRegister(
"filelog.useThreadPool",
featuregate.StageAlpha,
featuregate.WithRegisterDescription("When enabled, log collection switches to a thread pool model, respecting the `poll_interval` config."),
// TODO: register a reference URL for this gate. The URL below is the same one
// registered for allowFileDeletion, so it is left disabled here.
// featuregate.WithRegisterReferenceURL("https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/16314"),
)

var AllowHeaderMetadataParsing = featuregate.GlobalRegistry().MustRegister(
"filelog.allowHeaderMetadataParsing",
featuregate.StageAlpha,
Expand Down Expand Up @@ -139,8 +148,7 @@ func (c Config) buildManager(logger *zap.SugaredLogger, emit EmitFunc, factory s
return nil, fmt.Errorf("failed to build header config: %w", err)
}
}

return &Manager{
manager := Manager{
SugaredLogger: logger.With("component", "fileconsumer"),
cancel: func() {},
readerFactory: readerFactory{
Expand All @@ -163,7 +171,14 @@ func (c Config) buildManager(logger *zap.SugaredLogger, emit EmitFunc, factory s
deleteAfterRead: c.DeleteAfterRead,
knownFiles: make([]*Reader, 0, 10),
seenPaths: make(map[string]struct{}, 100),
}, nil
persister: storage.NewNopClient(),
}
if useThreadPool.IsEnabled() {
manager.readerChan = make(chan readerWrapper, c.MaxConcurrentFiles)
manager.saveReaders = make(chan readerWrapper, c.MaxConcurrentFiles)
manager.trie = NewTrie()
}
return &manager, nil
}

func (c Config) validate() error {
Expand Down
68 changes: 58 additions & 10 deletions pkg/stanza/fileconsumer/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,18 @@ type Manager struct {
seenPaths map[string]struct{}

currentFps []*Fingerprint

// Following fields are used only when useThreadPool is enabled
workerWg sync.WaitGroup
_workerWg sync.WaitGroup
knownFilesLock sync.RWMutex

readerChan chan readerWrapper
saveReaders chan readerWrapper
trieLock sync.RWMutex

// trie stores the fingerprints of the files that are currently being consumed
trie *Trie
}

func (m *Manager) Start(persister operator.Persister) error {
Expand All @@ -56,6 +68,17 @@ func (m *Manager) Start(persister operator.Persister) error {
"exclude", m.finder.Exclude)
}

// If useThreadPool is enabled, kick off the worker threads
if useThreadPool.IsEnabled() {
m.readerChan = make(chan readerWrapper, m.maxBatchFiles*2)
m.saveReaders = make(chan readerWrapper, m.maxBatchFiles*2)
for i := 0; i < m.maxBatchFiles; i++ {
m.workerWg.Add(1)
go m.worker(ctx)
}
m._workerWg.Add(1)
go m.saveReadersConcurrent()
}
// Start polling goroutine
m.startPoller(ctx)

Expand All @@ -66,6 +89,23 @@ func (m *Manager) Start(persister operator.Persister) error {
func (m *Manager) Stop() error {
m.cancel()
m.wg.Wait()
if useThreadPool.IsEnabled() {
if m.readerChan != nil {
close(m.readerChan)
}
m.workerWg.Wait()
if m.saveReaders != nil {
close(m.saveReaders)
}
m._workerWg.Wait()
// save off any files left
// As we already cancelled our current context, create a new one to save any left offsets
// This is only applicable for `filelog.useThreadPool` featuregate
ctx, cancel := context.WithCancel(context.Background())
m.syncLastPollFiles(ctx)
cancel()
}

m.roller.cleanup()
for _, reader := range m.knownFiles {
reader.Close()
Expand All @@ -90,8 +130,11 @@ func (m *Manager) startPoller(ctx context.Context) {
return
case <-globTicker.C:
}

m.poll(ctx)
if useThreadPool.IsEnabled() {
m.pollConcurrent(ctx)
} else {
m.poll(ctx)
}
}
}()
}
Expand Down Expand Up @@ -125,6 +168,18 @@ func (m *Manager) poll(ctx context.Context) {
m.consume(ctx, matches)
}

// readToEnd calls r.ReadToEnd and, when deleteAfterRead is set and the reader
// reached EOF, closes the reader and removes its file from disk.
//
// It returns true when the delete branch was taken (even if the removal itself
// failed and was only logged), and false otherwise.
func (m *Manager) readToEnd(ctx context.Context, r *Reader) bool {
	r.ReadToEnd(ctx)
	if !m.deleteAfterRead || !r.eof {
		return false
	}

	r.Close()
	path := r.file.Name()
	if err := os.Remove(path); err != nil {
		// Log the underlying error as well; without it a failed deletion
		// (permissions, file already gone, ...) cannot be diagnosed.
		m.Errorf("could not delete %s: %v", path, err)
	}
	return true
}

func (m *Manager) consume(ctx context.Context, paths []string) {
m.Debug("Consuming files")
readers := make([]*Reader, 0, len(paths))
Expand All @@ -145,14 +200,7 @@ func (m *Manager) consume(ctx context.Context, paths []string) {
wg.Add(1)
go func(r *Reader) {
defer wg.Done()
r.ReadToEnd(ctx)
// Delete a file if deleteAfterRead is enabled and we reached the end of the file
if m.deleteAfterRead && r.eof {
r.Close()
if err := os.Remove(r.file.Name()); err != nil {
m.Errorf("could not delete %s", r.file.Name())
}
}
m.readToEnd(ctx, r)
}(reader)
}
wg.Wait()
Expand Down
50 changes: 50 additions & 0 deletions pkg/stanza/fileconsumer/file_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"context"
"fmt"
"os"

"path/filepath"
"runtime"
"strconv"
Expand All @@ -24,6 +25,22 @@ import (
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/testutil"
)

// TestMain executes the package's tests twice: first with the
// filelog.useThreadPool gate left as it is, then with the gate switched on.
// A failing pass terminates the process with that pass's exit code; after two
// clean passes the gate is switched back off.
func TestMain(m *testing.M) {
	registry := featuregate.GlobalRegistry()
	gateID := useThreadPool.ID()

	// Pass 1: thread pool gate untouched.
	if rc := m.Run(); rc > 0 {
		os.Exit(rc)
	}

	// Pass 2: thread pool gate enabled.
	registry.Set(gateID, true) //nolint:all
	if rc := m.Run(); rc > 0 {
		os.Exit(rc)
	}

	// Both passes succeeded; turn the gate off again.
	registry.Set(gateID, false) //nolint:all
}

func TestCleanStop(t *testing.T) {
t.Parallel()
t.Skip(`Skipping due to goroutine leak in opencensus.
Expand Down Expand Up @@ -392,6 +409,9 @@ func TestReadNewLogs(t *testing.T) {
cfg := NewConfig().includeDir(tempDir)
cfg.StartAt = "beginning"
operator, emitCalls := buildTestManager(t, cfg)
if useThreadPool.IsEnabled() {
operator, emitCalls = buildTestManager(t, cfg, withReaderChan())
}
operator.persister = testutil.NewMockPersister("test")

// Poll once so we know this isn't a new file
Expand Down Expand Up @@ -420,6 +440,9 @@ func TestReadExistingAndNewLogs(t *testing.T) {
cfg := NewConfig().includeDir(tempDir)
cfg.StartAt = "beginning"
operator, emitCalls := buildTestManager(t, cfg)
if useThreadPool.IsEnabled() {
operator, emitCalls = buildTestManager(t, cfg, withReaderChan())
}
operator.persister = testutil.NewMockPersister("test")

// Start with a file with an entry in it, and expect that entry
Expand All @@ -444,6 +467,9 @@ func TestStartAtEnd(t *testing.T) {
tempDir := t.TempDir()
cfg := NewConfig().includeDir(tempDir)
operator, emitCalls := buildTestManager(t, cfg)
if useThreadPool.IsEnabled() {
operator, emitCalls = buildTestManager(t, cfg, withReaderChan())
}
operator.persister = testutil.NewMockPersister("test")

temp := openTemp(t, tempDir)
Expand Down Expand Up @@ -472,6 +498,9 @@ func TestStartAtEndNewFile(t *testing.T) {
cfg := NewConfig().includeDir(tempDir)
cfg.StartAt = "beginning"
operator, emitCalls := buildTestManager(t, cfg)
if useThreadPool.IsEnabled() {
operator, emitCalls = buildTestManager(t, cfg, withReaderChan())
}
operator.persister = testutil.NewMockPersister("test")

operator.poll(context.Background())
Expand Down Expand Up @@ -589,6 +618,9 @@ func TestSplitWrite(t *testing.T) {
cfg := NewConfig().includeDir(tempDir)
cfg.StartAt = "beginning"
operator, emitCalls := buildTestManager(t, cfg)
if useThreadPool.IsEnabled() {
operator, emitCalls = buildTestManager(t, cfg, withReaderChan())
}
operator.persister = testutil.NewMockPersister("test")

temp := openTemp(t, tempDir)
Expand All @@ -609,6 +641,9 @@ func TestIgnoreEmptyFiles(t *testing.T) {
cfg := NewConfig().includeDir(tempDir)
cfg.StartAt = "beginning"
operator, emitCalls := buildTestManager(t, cfg)
if useThreadPool.IsEnabled() {
operator, emitCalls = buildTestManager(t, cfg, withReaderChan())
}
operator.persister = testutil.NewMockPersister("test")

temp := openTemp(t, tempDir)
Expand Down Expand Up @@ -863,6 +898,9 @@ func TestFileBatching(t *testing.T) {
cfg.MaxBatches = maxBatches
emitCalls := make(chan *emitParams, files*linesPerFile)
operator, _ := buildTestManager(t, cfg, withEmitChan(emitCalls))
if useThreadPool.IsEnabled() {
operator, _ = buildTestManager(t, cfg, withEmitChan(emitCalls), withReaderChan())
}
operator.persister = testutil.NewMockPersister("test")

core, observedLogs := observer.New(zap.DebugLevel)
Expand Down Expand Up @@ -1220,6 +1258,9 @@ func TestDeleteAfterRead(t *testing.T) {
emitCalls := make(chan *emitParams, totalLines)
operator, _ := buildTestManager(t, cfg, withEmitChan(emitCalls))

if useThreadPool.IsEnabled() {
operator, _ = buildTestManager(t, cfg, withEmitChan(emitCalls), withReaderChan())
}
operator.poll(context.Background())
actualTokens = append(actualTokens, waitForNTokens(t, emitCalls, totalLines)...)

Expand Down Expand Up @@ -1250,6 +1291,9 @@ func TestMaxBatching(t *testing.T) {
cfg.MaxBatches = maxBatches
emitCalls := make(chan *emitParams, files*linesPerFile)
operator, _ := buildTestManager(t, cfg, withEmitChan(emitCalls))
if useThreadPool.IsEnabled() {
operator, _ = buildTestManager(t, cfg, withEmitChan(emitCalls), withReaderChan())
}
operator.persister = testutil.NewMockPersister("test")

core, observedLogs := observer.New(zap.DebugLevel)
Expand Down Expand Up @@ -1365,6 +1409,9 @@ func TestDeleteAfterRead_SkipPartials(t *testing.T) {
cfg.DeleteAfterRead = true
emitCalls := make(chan *emitParams, longFileLines+1)
operator, _ := buildTestManager(t, cfg, withEmitChan(emitCalls))
if useThreadPool.IsEnabled() {
operator, _ = buildTestManager(t, cfg, withEmitChan(emitCalls), withReaderChan())
}
operator.persister = testutil.NewMockPersister("test")

shortFile := openTemp(t, tempDir)
Expand Down Expand Up @@ -1514,6 +1561,9 @@ func TestStalePartialFingerprintDiscarded(t *testing.T) {
cfg.FingerprintSize = 18
cfg.StartAt = "beginning"
operator, emitCalls := buildTestManager(t, cfg)
if useThreadPool.IsEnabled() {
operator, emitCalls = buildTestManager(t, cfg, withReaderChan())
}
operator.persister = testutil.NewMockPersister("test")

// Both of them will be included
Expand Down
Loading