diff --git a/pkg/stanza/fileconsumer/config.go b/pkg/stanza/fileconsumer/config.go index 6178368cadec..72132f316940 100644 --- a/pkg/stanza/fileconsumer/config.go +++ b/pkg/stanza/fileconsumer/config.go @@ -9,6 +9,8 @@ import ( "fmt" "time" + "go.opentelemetry.io/collector/extension/experimental/storage" + "github.com/bmatcuk/doublestar/v4" "go.opentelemetry.io/collector/featuregate" "go.uber.org/zap" @@ -28,6 +30,13 @@ var allowFileDeletion = featuregate.GlobalRegistry().MustRegister( featuregate.WithRegisterReferenceURL("https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/16314"), ) +var useThreadPool = featuregate.GlobalRegistry().MustRegister( + "filelog.useThreadPool", + featuregate.StageAlpha, + featuregate.WithRegisterDescription("When enabled, log collection switches to a thread pool model, respecting the `poll_interval` config."), + // featuregate.WithRegisterReferenceURL("https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/16314"), +) + var AllowHeaderMetadataParsing = featuregate.GlobalRegistry().MustRegister( "filelog.allowHeaderMetadataParsing", featuregate.StageAlpha, @@ -139,8 +148,7 @@ func (c Config) buildManager(logger *zap.SugaredLogger, emit EmitFunc, factory s return nil, fmt.Errorf("failed to build header config: %w", err) } } - - return &Manager{ + manager := Manager{ SugaredLogger: logger.With("component", "fileconsumer"), cancel: func() {}, readerFactory: readerFactory{ @@ -163,7 +171,14 @@ func (c Config) buildManager(logger *zap.SugaredLogger, emit EmitFunc, factory s deleteAfterRead: c.DeleteAfterRead, knownFiles: make([]*Reader, 0, 10), seenPaths: make(map[string]struct{}, 100), - }, nil + persister: storage.NewNopClient(), + } + if useThreadPool.IsEnabled() { + manager.readerChan = make(chan readerWrapper, c.MaxConcurrentFiles) + manager.saveReaders = make(chan readerWrapper, c.MaxConcurrentFiles) + manager.trie = NewTrie() + } + return &manager, nil } func (c Config) validate() error { diff --git a/pkg/stanza/fileconsumer/file.go b/pkg/stanza/fileconsumer/file.go index d34e57e8920b..f81f91d2425f 100644 --- a/pkg/stanza/fileconsumer/file.go +++ b/pkg/stanza/fileconsumer/file.go @@ -38,6 +38,18 @@ type Manager struct { seenPaths map[string]struct{} currentFps []*Fingerprint + + // Following fields are used only when useThreadPool is enabled + workerWg sync.WaitGroup + _workerWg sync.WaitGroup + knownFilesLock sync.RWMutex + + readerChan chan readerWrapper + saveReaders chan readerWrapper + trieLock sync.RWMutex + + // TRIE - this data structure stores the fingerprint of the files which are currently being consumed + trie *Trie } func (m *Manager) Start(persister operator.Persister) error { @@ -56,6 +68,17 @@ func (m *Manager) Start(persister operator.Persister) error { "exclude", m.finder.Exclude) } + // If useThreadPool is enabled, kick off the worker threads + if useThreadPool.IsEnabled() { + m.readerChan = make(chan readerWrapper, m.maxBatchFiles*2) + m.saveReaders = make(chan readerWrapper, m.maxBatchFiles*2) + for i := 0; i < m.maxBatchFiles; i++ { + m.workerWg.Add(1) + go m.worker(ctx) + } + m._workerWg.Add(1) + go m.saveReadersConcurrent() + } // Start polling goroutine m.startPoller(ctx) @@ -66,6 +89,23 @@ func (m *Manager) Start(persister operator.Persister) error { func (m *Manager) Stop() error { m.cancel() m.wg.Wait() + if useThreadPool.IsEnabled() { + if m.readerChan != nil { + close(m.readerChan) + } + m.workerWg.Wait() + if m.saveReaders != nil { + close(m.saveReaders) + } + m._workerWg.Wait() + // save 
off any files left + // As we already cancelled our current context, create a new one to save any left offsets + // This is only applicable for `filelog.useThreadPool` featuregate + ctx, cancel := context.WithCancel(context.Background()) + m.syncLastPollFiles(ctx) + cancel() + } + m.roller.cleanup() for _, reader := range m.knownFiles { reader.Close() @@ -90,8 +130,11 @@ func (m *Manager) startPoller(ctx context.Context) { return case <-globTicker.C: } - - m.poll(ctx) + if useThreadPool.IsEnabled() { + m.pollConcurrent(ctx) + } else { + m.poll(ctx) + } } }() } @@ -125,6 +168,18 @@ func (m *Manager) poll(ctx context.Context) { m.consume(ctx, matches) } +func (m *Manager) readToEnd(ctx context.Context, r *Reader) bool { + r.ReadToEnd(ctx) + if m.deleteAfterRead && r.eof { + r.Close() + if err := os.Remove(r.file.Name()); err != nil { + m.Errorf("could not delete %s", r.file.Name()) + } + return true + } + return false +} + func (m *Manager) consume(ctx context.Context, paths []string) { m.Debug("Consuming files") readers := make([]*Reader, 0, len(paths)) @@ -145,14 +200,7 @@ func (m *Manager) consume(ctx context.Context, paths []string) { wg.Add(1) go func(r *Reader) { defer wg.Done() - r.ReadToEnd(ctx) - // Delete a file if deleteAfterRead is enabled and we reached the end of the file - if m.deleteAfterRead && r.eof { - r.Close() - if err := os.Remove(r.file.Name()); err != nil { - m.Errorf("could not delete %s", r.file.Name()) - } - } + m.readToEnd(ctx, r) }(reader) } wg.Wait() diff --git a/pkg/stanza/fileconsumer/file_test.go b/pkg/stanza/fileconsumer/file_test.go index 52d55064b0e2..09e926dc45dc 100644 --- a/pkg/stanza/fileconsumer/file_test.go +++ b/pkg/stanza/fileconsumer/file_test.go @@ -7,6 +7,7 @@ import ( "context" "fmt" "os" + "path/filepath" "runtime" "strconv" @@ -24,6 +25,22 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/testutil" ) +func TestMain(m *testing.M) { + // Run once with thread pool featuregate disabled + code := m.Run() + if code > 0 { + os.Exit(code) + } + + // Run once with thread pool featuregate enabled + featuregate.GlobalRegistry().Set(useThreadPool.ID(), true) //nolint:all + code = m.Run() + if code > 0 { + os.Exit(code) + } + featuregate.GlobalRegistry().Set(useThreadPool.ID(), false) //nolint:all +} + func TestCleanStop(t *testing.T) { t.Parallel() t.Skip(`Skipping due to goroutine leak in opencensus. 
@@ -392,6 +409,9 @@ func TestReadNewLogs(t *testing.T) { cfg := NewConfig().includeDir(tempDir) cfg.StartAt = "beginning" operator, emitCalls := buildTestManager(t, cfg) + if useThreadPool.IsEnabled() { + operator, emitCalls = buildTestManager(t, cfg, withReaderChan()) + } operator.persister = testutil.NewMockPersister("test") // Poll once so we know this isn't a new file @@ -420,6 +440,9 @@ func TestReadExistingAndNewLogs(t *testing.T) { cfg := NewConfig().includeDir(tempDir) cfg.StartAt = "beginning" operator, emitCalls := buildTestManager(t, cfg) + if useThreadPool.IsEnabled() { + operator, emitCalls = buildTestManager(t, cfg, withReaderChan()) + } operator.persister = testutil.NewMockPersister("test") // Start with a file with an entry in it, and expect that entry @@ -444,6 +467,9 @@ func TestStartAtEnd(t *testing.T) { tempDir := t.TempDir() cfg := NewConfig().includeDir(tempDir) operator, emitCalls := buildTestManager(t, cfg) + if useThreadPool.IsEnabled() { + operator, emitCalls = buildTestManager(t, cfg, withReaderChan()) + } operator.persister = testutil.NewMockPersister("test") temp := openTemp(t, tempDir) @@ -472,6 +498,9 @@ func TestStartAtEndNewFile(t *testing.T) { cfg := NewConfig().includeDir(tempDir) cfg.StartAt = "beginning" operator, emitCalls := buildTestManager(t, cfg) + if useThreadPool.IsEnabled() { + operator, emitCalls = buildTestManager(t, cfg, withReaderChan()) + } operator.persister = testutil.NewMockPersister("test") operator.poll(context.Background()) @@ -589,6 +618,9 @@ func TestSplitWrite(t *testing.T) { cfg := NewConfig().includeDir(tempDir) cfg.StartAt = "beginning" operator, emitCalls := buildTestManager(t, cfg) + if useThreadPool.IsEnabled() { + operator, emitCalls = buildTestManager(t, cfg, withReaderChan()) + } operator.persister = testutil.NewMockPersister("test") temp := openTemp(t, tempDir) @@ -609,6 +641,9 @@ func TestIgnoreEmptyFiles(t *testing.T) { cfg := NewConfig().includeDir(tempDir) cfg.StartAt = "beginning" operator, emitCalls := buildTestManager(t, cfg) + if useThreadPool.IsEnabled() { + operator, emitCalls = buildTestManager(t, cfg, withReaderChan()) + } operator.persister = testutil.NewMockPersister("test") temp := openTemp(t, tempDir) @@ -863,6 +898,9 @@ func TestFileBatching(t *testing.T) { cfg.MaxBatches = maxBatches emitCalls := make(chan *emitParams, files*linesPerFile) operator, _ := buildTestManager(t, cfg, withEmitChan(emitCalls)) + if useThreadPool.IsEnabled() { + operator, _ = buildTestManager(t, cfg, withEmitChan(emitCalls), withReaderChan()) + } operator.persister = testutil.NewMockPersister("test") core, observedLogs := observer.New(zap.DebugLevel) @@ -1220,6 +1258,9 @@ func TestDeleteAfterRead(t *testing.T) { emitCalls := make(chan *emitParams, totalLines) operator, _ := buildTestManager(t, cfg, withEmitChan(emitCalls)) + if useThreadPool.IsEnabled() { + operator, _ = buildTestManager(t, cfg, withEmitChan(emitCalls), withReaderChan()) + } operator.poll(context.Background()) actualTokens = append(actualTokens, waitForNTokens(t, emitCalls, totalLines)...) 
@@ -1250,6 +1291,9 @@ func TestMaxBatching(t *testing.T) { cfg.MaxBatches = maxBatches emitCalls := make(chan *emitParams, files*linesPerFile) operator, _ := buildTestManager(t, cfg, withEmitChan(emitCalls)) + if useThreadPool.IsEnabled() { + operator, _ = buildTestManager(t, cfg, withEmitChan(emitCalls), withReaderChan()) + } operator.persister = testutil.NewMockPersister("test") core, observedLogs := observer.New(zap.DebugLevel) @@ -1365,6 +1409,9 @@ func TestDeleteAfterRead_SkipPartials(t *testing.T) { cfg.DeleteAfterRead = true emitCalls := make(chan *emitParams, longFileLines+1) operator, _ := buildTestManager(t, cfg, withEmitChan(emitCalls)) + if useThreadPool.IsEnabled() { + operator, _ = buildTestManager(t, cfg, withEmitChan(emitCalls), withReaderChan()) + } operator.persister = testutil.NewMockPersister("test") shortFile := openTemp(t, tempDir) @@ -1514,6 +1561,9 @@ func TestStalePartialFingerprintDiscarded(t *testing.T) { cfg.FingerprintSize = 18 cfg.StartAt = "beginning" operator, emitCalls := buildTestManager(t, cfg) + if useThreadPool.IsEnabled() { + operator, emitCalls = buildTestManager(t, cfg, withReaderChan()) + } operator.persister = testutil.NewMockPersister("test") // Both of they will be include diff --git a/pkg/stanza/fileconsumer/file_threadpool.go b/pkg/stanza/fileconsumer/file_threadpool.go new file mode 100755 index 000000000000..e8a046bbe969 --- /dev/null +++ b/pkg/stanza/fileconsumer/file_threadpool.go @@ -0,0 +1,187 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package fileconsumer // import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer" + +import ( + "context" + "os" + "sync" + + "go.uber.org/zap" +) + +type readerWrapper struct { + reader *Reader + fp *Fingerprint +} + +// poll checks all the watched paths for new entries +func (m *Manager) pollConcurrent(ctx context.Context) { + // Increment the generation on all known readers + // This is done here because the next generation is about to start + + m.knownFilesLock.Lock() + for i := 0; i < len(m.knownFiles); i++ { + m.knownFiles[i].generation++ + } + m.knownFilesLock.Unlock() + + // Get the list of paths on disk + matches := m.finder.FindFiles() + m.consumeConcurrent(ctx, matches) + m.clearCurrentFingerprints() + + // Any new files that appear should be consumed entirely + m.readerFactory.fromBeginning = true + m.syncLastPollFilesConcurrent(ctx) +} + +func (m *Manager) worker(ctx context.Context) { + defer m.workerWg.Done() + for { + chanData, ok := <-m.readerChan + + if !ok { + return + } + r, fp := chanData.reader, chanData.fp + + if !m.readToEnd(ctx, r) { + // Save off any files that were not fully read or if deleteAfterRead is disabled + m.saveReaders <- readerWrapper{reader: r, fp: fp} + } else { + m.removePath(fp) + } + } +} + +func (m *Manager) makeReaderConcurrent(filePath string) (*Reader, *Fingerprint) { + fp, file := m.makeFingerprint(filePath) + if fp == nil { + return nil, nil + } + + // check if the current file is already being consumed + if m.isCurrentlyConsuming(fp) { + if err := file.Close(); err != nil { + m.Errorf("problem closing file", "file", file.Name()) + } + return nil, nil + } + + // Exclude any empty fingerprints or duplicate fingerprints to avoid doubling up on copy-truncate files + if m.checkDuplicates(fp) { + if err := file.Close(); err != nil { + m.Errorf("problem closing file", "file", file.Name()) + } + return nil, nil + } + m.currentFps = append(m.currentFps, fp) + + reader, err := 
m.newReaderConcurrent(file, fp) + if err != nil { + m.Errorw("Failed to create reader", zap.Error(err)) + return nil, nil + } + return reader, fp +} + +func (m *Manager) consumeConcurrent(ctx context.Context, paths []string) { + m.Debug("Consuming files") + m.clearOldReadersConcurrent(ctx) + for _, path := range paths { + reader, fp := m.makeReaderConcurrent(path) + if reader != nil { + // add path and fingerprint as it's not consuming + m.trieLock.Lock() + m.trie.Put(fp.FirstBytes, true) + m.trieLock.Unlock() + m.readerChan <- readerWrapper{reader: reader, fp: fp} + } + } +} + +func (m *Manager) isCurrentlyConsuming(fp *Fingerprint) bool { + m.trieLock.RLock() + defer m.trieLock.RUnlock() + return m.trie.Get(fp.FirstBytes) != nil +} + +func (m *Manager) removePath(fp *Fingerprint) { + m.trieLock.Lock() + defer m.trieLock.Unlock() + m.trie.Delete(fp.FirstBytes) +} + +// saveReadersConcurrent adds the readers from this polling interval to this list of +// known files and removes the fingerprint from the TRIE +func (m *Manager) saveReadersConcurrent() { + defer m._workerWg.Done() + // Add readers from the current, completed poll interval to the list of known files + for { + select { + case reader, ok := <-m.saveReaders: + if !ok { + return + } + m.knownFilesLock.Lock() + m.knownFiles = append(m.knownFiles, reader.reader) + m.knownFilesLock.Unlock() + m.removePath(reader.fp) + } + } +} + +func (m *Manager) clearOldReadersConcurrent(ctx context.Context) { + m.knownFilesLock.Lock() + defer m.knownFilesLock.Unlock() + // Clear out old readers. They are sorted such that they are oldest first, + // so we can just find the first reader whose poll cycle is less than our + // limit i.e. last 3 cycles, and keep every reader after that + oldReaders := make([]*Reader, 0) + for i := 0; i < len(m.knownFiles); i++ { + reader := m.knownFiles[i] + if reader.generation <= 2 { + oldReaders = m.knownFiles[:i] + m.knownFiles = m.knownFiles[i:] + break + } + } + var lostWG sync.WaitGroup + for _, reader := range oldReaders { + lostWG.Add(1) + go func(r *Reader) { + defer lostWG.Done() + r.ReadToEnd(ctx) + r.Close() + }(reader) + } + lostWG.Wait() +} + +func (m *Manager) newReaderConcurrent(file *os.File, fp *Fingerprint) (*Reader, error) { + // Check if the new path has the same fingerprint as an old path + if oldReader, ok := m.findFingerprintMatchConcurrent(fp); ok { + return m.readerFactory.copy(oldReader, file) + } + + // If we don't match any previously known files, create a new reader from scratch + return m.readerFactory.newReader(file, fp.Copy()) +} + +func (m *Manager) findFingerprintMatchConcurrent(fp *Fingerprint) (*Reader, bool) { + // Iterate backwards to match newest first + m.knownFilesLock.Lock() + defer m.knownFilesLock.Unlock() + + return m.findFingerprintMatch(fp) +} + +// syncLastPollFiles syncs the most recent set of files to the database +func (m *Manager) syncLastPollFilesConcurrent(ctx context.Context) { + m.knownFilesLock.RLock() + defer m.knownFilesLock.RUnlock() + + m.syncLastPollFiles(ctx) +} diff --git a/pkg/stanza/fileconsumer/rotation_test.go b/pkg/stanza/fileconsumer/rotation_test.go index 5c02fddd86e4..9e4edab590f7 100644 --- a/pkg/stanza/fileconsumer/rotation_test.go +++ b/pkg/stanza/fileconsumer/rotation_test.go @@ -358,6 +358,9 @@ func TestMoveFile(t *testing.T) { cfg := NewConfig().includeDir(tempDir) cfg.StartAt = "beginning" operator, emitCalls := buildTestManager(t, cfg) + if useThreadPool.IsEnabled() { + operator, emitCalls = buildTestManager(t, cfg, withReaderChan()) + } 
operator.persister = testutil.NewMockPersister("test") temp1 := openTemp(t, tempDir) @@ -390,6 +393,9 @@ func TestTrackMovedAwayFiles(t *testing.T) { cfg := NewConfig().includeDir(tempDir) cfg.StartAt = "beginning" operator, emitCalls := buildTestManager(t, cfg) + if useThreadPool.IsEnabled() { + operator, emitCalls = buildTestManager(t, cfg, withReaderChan()) + } operator.persister = testutil.NewMockPersister("test") temp1 := openTemp(t, tempDir) @@ -475,6 +481,9 @@ func TestTruncateThenWrite(t *testing.T) { cfg := NewConfig().includeDir(tempDir) cfg.StartAt = "beginning" operator, emitCalls := buildTestManager(t, cfg) + if useThreadPool.IsEnabled() { + operator, emitCalls = buildTestManager(t, cfg, withReaderChan()) + } operator.persister = testutil.NewMockPersister("test") temp1 := openTemp(t, tempDir) @@ -512,6 +521,9 @@ func TestCopyTruncateWriteBoth(t *testing.T) { cfg := NewConfig().includeDir(tempDir) cfg.StartAt = "beginning" operator, emitCalls := buildTestManager(t, cfg) + if useThreadPool.IsEnabled() { + operator, emitCalls = buildTestManager(t, cfg, withReaderChan()) + } operator.persister = testutil.NewMockPersister("test") temp1 := openTemp(t, tempDir) diff --git a/pkg/stanza/fileconsumer/trie.go b/pkg/stanza/fileconsumer/trie.go new file mode 100755 index 000000000000..3c5bf82edfd4 --- /dev/null +++ b/pkg/stanza/fileconsumer/trie.go @@ -0,0 +1,99 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +// TRIE data structure inspired by https://github.com/dghubble/trie +// This differs from the original trie. + +package fileconsumer + +type Trie struct { + value interface{} + children map[byte]*Trie +} + +// Trie node and the part string key of the child the path descends into. +type nodeTrie struct { + node *Trie + b byte +} + +// NewPathTrie allocates and returns a new *Trie. +func NewTrie() *Trie { + return &Trie{} +} + +func (trie *Trie) Get(key []byte) interface{} { + node := trie + for _, r := range key { + // We have reached end of the current path and all the previous characters have matched + // Return if current node is leaf and it is not root + node = node.children[r] + if node == nil { + return nil + } + if node.isLeaf() && node != trie { + return node.value + } + } + return node.value +} + +// Put inserts the value into the trie at the given key +func (trie *Trie) Put(key []byte, value interface{}) { + node := trie + for _, r := range key { + child, _ := node.children[r] + if child == nil { + if node.children == nil { + node.children = map[byte]*Trie{} + } + child = NewTrie() + node.children[r] = child + + // Assiging value to every child node allows us to detect partial matches. + // For eg. `123451` and `123456789` will match, even if they are not exactly same strings. + // Doing this, we store every prefix of the fingerprint. + node.value = value + } + node = child + } + node.value = value +} + +// Delete removes the value associated with the given key. Returns true if a +// node was found for the given key. If the node or any of its ancestors +// becomes childless as a result, it is removed from the trie. +func (trie *Trie) Delete(key []byte) bool { + var path []*Trie // record ancestors to check later + node := trie + for _, b := range key { + path = append(path, node) + node = node.children[b] + if node == nil { + // node does not exist + return false + } + } + // delete the node value + node.value = nil + // if leaf, remove it from its parent's children map. Repeat for ancestor path. 
+ if node.isLeaf() { + // iterate backwards over path + for i := len(path) - 1; i >= 0; i-- { + parent := path[i] + b := key[i] + delete(parent.children, b) + if !parent.isLeaf() { + // parent has other children, stop + break + } + parent.children = nil + parent.value = nil + } + } + return true // node (internal or not) existed and its value was nil'd +} + +func (trie *Trie) isLeaf() bool { + return len(trie.children) == 0 +} diff --git a/pkg/stanza/fileconsumer/util_test.go b/pkg/stanza/fileconsumer/util_test.go index 5f5dc386032d..371b60e72a73 100644 --- a/pkg/stanza/fileconsumer/util_test.go +++ b/pkg/stanza/fileconsumer/util_test.go @@ -66,7 +66,8 @@ type emitParams struct { } type testManagerConfig struct { - emitChan chan *emitParams + emitChan chan *emitParams + initializeChannel bool } type testManagerOption func(*testManagerConfig) @@ -77,6 +78,12 @@ func withEmitChan(emitChan chan *emitParams) testManagerOption { } } +func withReaderChan() testManagerOption { + return func(m *testManagerConfig) { + m.initializeChannel = true + } +} + func buildTestManager(t *testing.T, cfg *Config, opts ...testManagerOption) (*Manager, chan *emitParams) { tmc := &testManagerConfig{emitChan: make(chan *emitParams, 100)} for _, opt := range opts { @@ -84,6 +91,19 @@ func buildTestManager(t *testing.T, cfg *Config, opts ...testManagerOption) (*Ma } input, err := cfg.Build(testutil.Logger(t), testEmitFunc(tmc.emitChan)) require.NoError(t, err) + if tmc.initializeChannel { + input.readerChan = make(chan readerWrapper, cfg.MaxConcurrentFiles) + input.saveReaders = make(chan readerWrapper, cfg.MaxConcurrentFiles) + ctx, cancel := context.WithCancel(context.Background()) + input.cancel = cancel + for i := 0; i < cfg.MaxConcurrentFiles/2; i++ { + input.workerWg.Add(1) + go input.worker(ctx) + } + input._workerWg.Add(1) + go input.saveReadersConcurrent() + } + return input, tmc.emitChan } diff --git a/pkg/stanza/operator/helper/flusher.go b/pkg/stanza/operator/helper/flusher.go index 60efd4868f11..e823ee852983 100644 --- a/pkg/stanza/operator/helper/flusher.go +++ b/pkg/stanza/operator/helper/flusher.go @@ -5,6 +5,7 @@ package helper // import "github.com/open-telemetry/opentelemetry-collector-cont import ( "bufio" + "sync" "time" ) @@ -43,9 +44,12 @@ type Flusher struct { // if previousDataLength = 0 - no new data have been received after flush // if previousDataLength > 0 - there is data which has not been flushed yet and it doesn't changed since lastDataChange previousDataLength int + rwLock sync.RWMutex } func (f *Flusher) UpdateDataChangeTime(length int) { + f.rwLock.Lock() + defer f.rwLock.Unlock() // Skip if length is greater than 0 and didn't changed if length > 0 && length == f.previousDataLength { return @@ -64,6 +68,8 @@ func (f *Flusher) Flushed() { // ShouldFlush returns true if data should be forcefully flushed func (f *Flusher) ShouldFlush() bool { + f.rwLock.RLock() + defer f.rwLock.RUnlock() // Returns true if there is f.forcePeriod after f.lastDataChange and data length is greater than 0 return f.forcePeriod > 0 && time.Since(f.lastDataChange) > f.forcePeriod && f.previousDataLength > 0 } diff --git a/receiver/filelogreceiver/filelog_test.go b/receiver/filelogreceiver/filelog_test.go index 4a8d5ef4af4d..129d52b1468e 100644 --- a/receiver/filelogreceiver/filelog_test.go +++ b/receiver/filelogreceiver/filelog_test.go @@ -22,6 +22,7 @@ import ( "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/confmap/confmaptest" 
"go.opentelemetry.io/collector/consumer/consumertest" + "go.opentelemetry.io/collector/featuregate" "go.opentelemetry.io/collector/pdata/plog" "go.opentelemetry.io/collector/receiver/receivertest" "go.uber.org/zap" @@ -127,28 +128,56 @@ func TestReadRotatingFiles(t *testing.T) { tests := []rotationTest{ { - name: "CopyTruncateTimestamped", - copyTruncate: true, - sequential: false, + name: "CopyTruncateTimestamped", + copyTruncate: true, + sequential: false, + enableThreadPool: false, }, { - name: "CopyTruncateSequential", - copyTruncate: true, - sequential: true, + name: "CopyTruncateSequential", + copyTruncate: true, + sequential: true, + enableThreadPool: false, + }, + { + name: "CopyTruncateTimestampedThreadPool", + copyTruncate: true, + sequential: false, + enableThreadPool: true, + }, + { + name: "CopyTruncateSequentialThreadPool", + copyTruncate: true, + sequential: true, + enableThreadPool: true, }, } if runtime.GOOS != "windows" { // Windows has very poor support for moving active files, so rotation is less commonly used tests = append(tests, []rotationTest{ { - name: "MoveCreateTimestamped", - copyTruncate: false, - sequential: false, + name: "MoveCreateTimestampedThreadPool", + copyTruncate: false, + sequential: false, + enableThreadPool: true, }, { - name: "MoveCreateSequential", - copyTruncate: false, - sequential: true, + name: "MoveCreateSequentialThreadPool", + copyTruncate: false, + sequential: true, + enableThreadPool: true, + }, + { + name: "MoveCreateTimestamped", + copyTruncate: false, + sequential: false, + enableThreadPool: false, + }, + { + name: "MoveCreateSequential", + copyTruncate: false, + sequential: true, + enableThreadPool: false, }, }...) } @@ -159,9 +188,10 @@ func TestReadRotatingFiles(t *testing.T) { } type rotationTest struct { - name string - copyTruncate bool - sequential bool + name string + copyTruncate bool + sequential bool + enableThreadPool bool } func (rt *rotationTest) Run(t *testing.T) { @@ -169,6 +199,12 @@ func (rt *rotationTest) Run(t *testing.T) { tempDir := t.TempDir() + if rt.enableThreadPool { + t.Cleanup(func() { + require.NoError(t, featuregate.GlobalRegistry().Set("filelog.useThreadPool", false)) + }) + require.NoError(t, featuregate.GlobalRegistry().Set("filelog.useThreadPool", true)) + } f := NewFactory() sink := new(consumertest.LogsSink)