From 1e71ef1876010e9641149af2b045065e382f4543 Mon Sep 17 00:00:00 2001 From: Vihas Splunk Date: Fri, 2 Jun 2023 14:22:10 +0530 Subject: [PATCH 1/8] Fixes #17846 --- pkg/stanza/fileconsumer/config.go | 17 +- pkg/stanza/fileconsumer/file.go | 34 +++- pkg/stanza/fileconsumer/file_test.go | 49 +++++ pkg/stanza/fileconsumer/file_threadpool.go | 206 +++++++++++++++++++++ pkg/stanza/fileconsumer/rotation_test.go | 13 ++ pkg/stanza/fileconsumer/trie.go | 110 +++++++++++ pkg/stanza/fileconsumer/util_test.go | 20 +- pkg/stanza/operator/helper/flusher.go | 6 + receiver/filelogreceiver/filelog_test.go | 66 +++++-- 9 files changed, 500 insertions(+), 21 deletions(-) create mode 100755 pkg/stanza/fileconsumer/file_threadpool.go create mode 100755 pkg/stanza/fileconsumer/trie.go diff --git a/pkg/stanza/fileconsumer/config.go b/pkg/stanza/fileconsumer/config.go index b179cdb8199a..88c596b1880d 100644 --- a/pkg/stanza/fileconsumer/config.go +++ b/pkg/stanza/fileconsumer/config.go @@ -39,6 +39,13 @@ var allowFileDeletion = featuregate.GlobalRegistry().MustRegister( featuregate.WithRegisterReferenceURL("https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/16314"), ) +var useThreadPool = featuregate.GlobalRegistry().MustRegister( + "filelog.useThreadPool", + featuregate.StageAlpha, + featuregate.WithRegisterDescription("When enabled, log collection switches to a thread pool model, respecting the `poll_interval` config."), + // featuregate.WithRegisterReferenceURL("https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/16314"), +) + var AllowHeaderMetadataParsing = featuregate.GlobalRegistry().MustRegister( "filelog.allowHeaderMetadataParsing", featuregate.StageAlpha, @@ -150,8 +157,7 @@ func (c Config) buildManager(logger *zap.SugaredLogger, emit EmitFunc, factory s return nil, fmt.Errorf("failed to build header config: %w", err) } } - - return &Manager{ + manager := Manager{ SugaredLogger: logger.With("component", "fileconsumer"), cancel: func() {}, readerFactory: readerFactory{ @@ -174,7 +180,12 @@ func (c Config) buildManager(logger *zap.SugaredLogger, emit EmitFunc, factory s deleteAfterRead: c.DeleteAfterRead, knownFiles: make([]*Reader, 0, 10), seenPaths: make(map[string]struct{}, 100), - }, nil + } + if useThreadPool.IsEnabled() { + manager.readerChan = make(chan ReaderWrapper, c.MaxConcurrentFiles) + manager.trie = NewTrie() + } + return &manager, nil } func (c Config) validate() error { diff --git a/pkg/stanza/fileconsumer/file.go b/pkg/stanza/fileconsumer/file.go index d0abea54d8b9..04549ed46f11 100644 --- a/pkg/stanza/fileconsumer/file.go +++ b/pkg/stanza/fileconsumer/file.go @@ -34,6 +34,7 @@ type Manager struct { *zap.SugaredLogger wg sync.WaitGroup cancel context.CancelFunc + ctx context.Context readerFactory readerFactory finder Finder @@ -49,12 +50,23 @@ type Manager struct { seenPaths map[string]struct{} currentFps []*Fingerprint + + // Following fields are used only when useThreadPool is enabled + workerWg sync.WaitGroup + knownFilesLock sync.RWMutex + + readerChan chan ReaderWrapper + trieLock sync.RWMutex + + // TRIE - this data structure stores the fingerprint of the files which are currently being consumed + trie *Trie } func (m *Manager) Start(persister operator.Persister) error { ctx, cancel := context.WithCancel(context.Background()) m.cancel = cancel m.persister = persister + m.ctx = ctx // Load offsets from disk if err := m.loadLastPollFiles(ctx); err != nil { @@ -67,6 +79,14 @@ func (m *Manager) Start(persister operator.Persister) error { 
"exclude", m.finder.Exclude) } + // If useThreadPool is enabled, kick off the worker threads + if useThreadPool.IsEnabled() { + m.readerChan = make(chan ReaderWrapper, m.maxBatchFiles*2) + for i := 0; i < m.maxBatchFiles; i++ { + m.workerWg.Add(1) + go m.worker(ctx) + } + } // Start polling goroutine m.startPoller(ctx) @@ -77,6 +97,13 @@ func (m *Manager) Start(persister operator.Persister) error { func (m *Manager) Stop() error { m.cancel() m.wg.Wait() + if useThreadPool.IsEnabled() { + close(m.readerChan) + m.workerWg.Wait() + } + // save off any files left + m.syncLastPollFiles(m.ctx) + m.roller.cleanup() for _, reader := range m.knownFiles { reader.Close() @@ -101,8 +128,11 @@ func (m *Manager) startPoller(ctx context.Context) { return case <-globTicker.C: } - - m.poll(ctx) + if useThreadPool.IsEnabled() { + m.pollConcurrent(ctx) + } else { + m.poll(ctx) + } } }() } diff --git a/pkg/stanza/fileconsumer/file_test.go b/pkg/stanza/fileconsumer/file_test.go index 0ab6fc8fff17..17a914de2cef 100644 --- a/pkg/stanza/fileconsumer/file_test.go +++ b/pkg/stanza/fileconsumer/file_test.go @@ -35,6 +35,22 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/testutil" ) +func TestMain(m *testing.M) { + // Run once with thread pool featuregate disabled + code := m.Run() + if code > 0 { + os.Exit(code) + } + + // // Run once with thread pool featuregate enabled + featuregate.GlobalRegistry().Set(useThreadPool.ID(), true) + code = m.Run() + featuregate.GlobalRegistry().Set(useThreadPool.ID(), false) + if code > 0 { + os.Exit(code) + } +} + func TestCleanStop(t *testing.T) { t.Parallel() t.Skip(`Skipping due to goroutine leak in opencensus. @@ -403,6 +419,9 @@ func TestReadNewLogs(t *testing.T) { cfg := NewConfig().includeDir(tempDir) cfg.StartAt = "beginning" operator, emitCalls := buildTestManagerWithOptions(t, cfg) + if useThreadPool.IsEnabled() { + operator, emitCalls = buildTestManagerWithOptions(t, cfg, withReaderChan()) + } operator.persister = testutil.NewMockPersister("test") // Poll once so we know this isn't a new file @@ -431,6 +450,9 @@ func TestReadExistingAndNewLogs(t *testing.T) { cfg := NewConfig().includeDir(tempDir) cfg.StartAt = "beginning" operator, emitCalls := buildTestManagerWithOptions(t, cfg) + if useThreadPool.IsEnabled() { + operator, emitCalls = buildTestManagerWithOptions(t, cfg, withReaderChan()) + } operator.persister = testutil.NewMockPersister("test") // Start with a file with an entry in it, and expect that entry @@ -455,6 +477,9 @@ func TestStartAtEnd(t *testing.T) { tempDir := t.TempDir() cfg := NewConfig().includeDir(tempDir) operator, emitCalls := buildTestManagerWithOptions(t, cfg) + if useThreadPool.IsEnabled() { + operator, emitCalls = buildTestManagerWithOptions(t, cfg, withReaderChan()) + } operator.persister = testutil.NewMockPersister("test") temp := openTemp(t, tempDir) @@ -483,6 +508,9 @@ func TestStartAtEndNewFile(t *testing.T) { cfg := NewConfig().includeDir(tempDir) cfg.StartAt = "beginning" operator, emitCalls := buildTestManagerWithOptions(t, cfg) + if useThreadPool.IsEnabled() { + operator, emitCalls = buildTestManagerWithOptions(t, cfg, withReaderChan()) + } operator.persister = testutil.NewMockPersister("test") operator.poll(context.Background()) @@ -600,6 +628,9 @@ func TestSplitWrite(t *testing.T) { cfg := NewConfig().includeDir(tempDir) cfg.StartAt = "beginning" operator, emitCalls := buildTestManagerWithOptions(t, cfg) + if useThreadPool.IsEnabled() { + operator, emitCalls = buildTestManagerWithOptions(t, cfg, 
withReaderChan()) + } operator.persister = testutil.NewMockPersister("test") temp := openTemp(t, tempDir) @@ -620,6 +651,9 @@ func TestIgnoreEmptyFiles(t *testing.T) { cfg := NewConfig().includeDir(tempDir) cfg.StartAt = "beginning" operator, emitCalls := buildTestManagerWithOptions(t, cfg) + if useThreadPool.IsEnabled() { + operator, emitCalls = buildTestManagerWithOptions(t, cfg, withReaderChan()) + } operator.persister = testutil.NewMockPersister("test") temp := openTemp(t, tempDir) @@ -874,6 +908,9 @@ func TestFileBatching(t *testing.T) { cfg.MaxBatches = maxBatches emitCalls := make(chan *emitParams, files*linesPerFile) operator, _ := buildTestManagerWithOptions(t, cfg, withEmitChan(emitCalls)) + if useThreadPool.IsEnabled() { + operator, _ = buildTestManagerWithOptions(t, cfg, withEmitChan(emitCalls), withReaderChan()) + } operator.persister = testutil.NewMockPersister("test") core, observedLogs := observer.New(zap.DebugLevel) @@ -1231,6 +1268,9 @@ func TestDeleteAfterRead(t *testing.T) { emitCalls := make(chan *emitParams, totalLines) operator, _ := buildTestManagerWithOptions(t, cfg, withEmitChan(emitCalls)) + if useThreadPool.IsEnabled() { + operator, _ = buildTestManagerWithOptions(t, cfg, withEmitChan(emitCalls), withReaderChan()) + } operator.poll(context.Background()) actualTokens = append(actualTokens, waitForNTokens(t, emitCalls, totalLines)...) @@ -1261,6 +1301,9 @@ func TestMaxBatching(t *testing.T) { cfg.MaxBatches = maxBatches emitCalls := make(chan *emitParams, files*linesPerFile) operator, _ := buildTestManagerWithOptions(t, cfg, withEmitChan(emitCalls)) + if useThreadPool.IsEnabled() { + operator, _ = buildTestManagerWithOptions(t, cfg, withEmitChan(emitCalls), withReaderChan()) + } operator.persister = testutil.NewMockPersister("test") core, observedLogs := observer.New(zap.DebugLevel) @@ -1376,6 +1419,9 @@ func TestDeleteAfterRead_SkipPartials(t *testing.T) { cfg.DeleteAfterRead = true emitCalls := make(chan *emitParams, longFileLines+1) operator, _ := buildTestManagerWithOptions(t, cfg, withEmitChan(emitCalls)) + if useThreadPool.IsEnabled() { + operator, _ = buildTestManagerWithOptions(t, cfg, withEmitChan(emitCalls), withReaderChan()) + } operator.persister = testutil.NewMockPersister("test") shortFile := openTemp(t, tempDir) @@ -1525,6 +1571,9 @@ func TestStalePartialFingerprintDiscarded(t *testing.T) { cfg.FingerprintSize = 18 cfg.StartAt = "beginning" operator, emitCalls := buildTestManagerWithOptions(t, cfg) + if useThreadPool.IsEnabled() { + operator, emitCalls = buildTestManagerWithOptions(t, cfg, withReaderChan()) + } operator.persister = testutil.NewMockPersister("test") // Both of they will be include diff --git a/pkg/stanza/fileconsumer/file_threadpool.go b/pkg/stanza/fileconsumer/file_threadpool.go new file mode 100755 index 000000000000..ee71606a7bc7 --- /dev/null +++ b/pkg/stanza/fileconsumer/file_threadpool.go @@ -0,0 +1,206 @@ +package fileconsumer // import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer" + +import ( + "context" + "os" + "sync" + + "go.uber.org/zap" +) + +type ReaderWrapper struct { + reader *Reader + fp *Fingerprint +} + +// poll checks all the watched paths for new entries +func (m *Manager) pollConcurrent(ctx context.Context) { + // Increment the generation on all known readers + // This is done here because the next generation is about to start + + m.knownFilesLock.Lock() + for i := 0; i < len(m.knownFiles); i++ { + m.knownFiles[i].generation++ + } + m.knownFilesLock.Unlock() + + // Get the 
list of paths on disk + matches := m.finder.FindFiles() + m.consumeConcurrent(ctx, matches) + m.clearCurrentFingerprints() + + // Any new files that appear should be consumed entirely + m.readerFactory.fromBeginning = true + m.syncLastPollFilesConcurrent(ctx) +} + +func (m *Manager) worker(ctx context.Context) { + defer m.workerWg.Done() + for { + chanData, ok := <-m.readerChan + + if !ok { + return + } + r, fp := chanData.reader, chanData.fp + r.ReadToEnd(ctx) + // Delete a file if deleteAfterRead is enabled and we reached the end of the file + if m.deleteAfterRead && r.eof { + r.Close() + if err := os.Remove(r.file.Name()); err != nil { + m.Errorf("could not delete %s", r.file.Name()) + } + } else { + // Save off any files that were not fully read or if deleteAfterRead is disabled + m.saveCurrentConcurrent(r) + } + m.removePath(fp) + } +} + +func (m *Manager) makeReaderConcurrent(filePath string) (*Reader, *Fingerprint) { + if _, ok := m.seenPaths[filePath]; !ok { + if m.readerFactory.fromBeginning { + m.Infow("Started watching file", "path", filePath) + } else { + m.Infow("Started watching file from end. To read preexisting logs, configure the argument 'start_at' to 'beginning'", "path", filePath) + } + m.seenPaths[filePath] = struct{}{} + } + file, err := os.Open(filePath) // #nosec - operator must read in files defined by user + if err != nil { + m.Debugf("Failed to open file", zap.Error(err)) + return nil, nil + } + fp, err := m.readerFactory.newFingerprint(file) + if err != nil { + m.Errorw("Failed creating fingerprint", zap.Error(err)) + return nil, nil + } + // Exclude any empty fingerprints or duplicate fingerprints to avoid doubling up on copy-truncate files + + if len(fp.FirstBytes) == 0 { + if err = file.Close(); err != nil { + m.Errorf("problem closing file", "file", file.Name()) + } + return nil, nil + } + + // check if the current file is already being consumed + if m.isCurrentlyConsuming(fp) { + if err = file.Close(); err != nil { + m.Errorf("problem closing file", "file", file.Name()) + } + return nil, nil + } + + for i := 0; i < len(m.currentFps)-1; i++ { + fp2 := m.currentFps[i] + if fp.StartsWith(fp2) || fp2.StartsWith(fp) { + // Exclude + if err = file.Close(); err != nil { + m.Errorf("problem closing file", "file", file.Name()) + } + return nil, nil + } + } + m.currentFps = append(m.currentFps, fp) + + reader, err := m.newReaderConcurrent(file, fp) + if err != nil { + m.Errorw("Failed to create reader", zap.Error(err)) + return nil, nil + } + return reader, fp +} + +func (m *Manager) consumeConcurrent(ctx context.Context, paths []string) { + m.Debug("Consuming files") + m.clearOldReadersConcurrent(ctx) + for _, path := range paths { + reader, fp := m.makeReaderConcurrent(path) + if reader != nil { + // add path and fingerprint as it's not consuming + m.trieLock.Lock() + m.trie.Put(fp.FirstBytes, true) + m.trieLock.Unlock() + m.readerChan <- ReaderWrapper{reader: reader, fp: fp} + } + } +} + +func (m *Manager) isCurrentlyConsuming(fp *Fingerprint) bool { + m.trieLock.RLock() + defer m.trieLock.RUnlock() + return m.trie.Get(fp.FirstBytes) != nil +} + +func (m *Manager) removePath(fp *Fingerprint) { + m.trieLock.Lock() + defer m.trieLock.Unlock() + m.trie.Delete(fp.FirstBytes) +} + +// saveCurrent adds the readers from this polling interval to this list of +// known files, then increments the generation of all tracked old readers +// before clearing out readers that have existed for 3 generations. 
+func (m *Manager) saveCurrentConcurrent(reader *Reader) { + // Add readers from the current, completed poll interval to the list of known files + m.knownFilesLock.Lock() + defer m.knownFilesLock.Unlock() + + m.knownFiles = append(m.knownFiles, reader) +} + +func (m *Manager) clearOldReadersConcurrent(ctx context.Context) { + m.knownFilesLock.Lock() + defer m.knownFilesLock.Unlock() + // Clear out old readers. They are sorted such that they are oldest first, + // so we can just find the first reader whose poll cycle is less than our + // limit i.e. last 3 cycles, and keep every reader after that + oldReaders := make([]*Reader, 0) + for i := 0; i < len(m.knownFiles); i++ { + reader := m.knownFiles[i] + if reader.generation <= 2 { + oldReaders = m.knownFiles[:i] + m.knownFiles = m.knownFiles[i:] + break + } + } + var lostWG sync.WaitGroup + for _, reader := range oldReaders { + lostWG.Add(1) + go func(r *Reader) { + defer lostWG.Done() + r.ReadToEnd(ctx) + r.Close() + }(reader) + } + lostWG.Wait() +} + +func (m *Manager) newReaderConcurrent(file *os.File, fp *Fingerprint) (*Reader, error) { + // Check if the new path has the same fingerprint as an old path + if oldReader, ok := m.findFingerprintMatchConcurrent(fp); ok { + return m.readerFactory.copy(oldReader, file) + } + + // If we don't match any previously known files, create a new reader from scratch + return m.readerFactory.newReader(file, fp.Copy()) +} + +func (m *Manager) findFingerprintMatchConcurrent(fp *Fingerprint) (*Reader, bool) { + // Iterate backwards to match newest first + m.knownFilesLock.Lock() + defer m.knownFilesLock.Unlock() + + return m.findFingerprintMatch(fp) +} + +// syncLastPollFiles syncs the most recent set of files to the database +func (m *Manager) syncLastPollFilesConcurrent(ctx context.Context) { + m.knownFilesLock.RLock() + defer m.knownFilesLock.RUnlock() + + m.syncLastPollFiles(ctx) +} diff --git a/pkg/stanza/fileconsumer/rotation_test.go b/pkg/stanza/fileconsumer/rotation_test.go index 80aa39909c2b..ffdcd1f1e460 100644 --- a/pkg/stanza/fileconsumer/rotation_test.go +++ b/pkg/stanza/fileconsumer/rotation_test.go @@ -369,6 +369,9 @@ func TestMoveFile(t *testing.T) { cfg := NewConfig().includeDir(tempDir) cfg.StartAt = "beginning" operator, emitCalls := buildTestManagerWithOptions(t, cfg) + if useThreadPool.IsEnabled() { + operator, emitCalls = buildTestManagerWithOptions(t, cfg, withReaderChan()) + } operator.persister = testutil.NewMockPersister("test") temp1 := openTemp(t, tempDir) @@ -401,6 +404,9 @@ func TestTrackMovedAwayFiles(t *testing.T) { cfg := NewConfig().includeDir(tempDir) cfg.StartAt = "beginning" operator, emitCalls := buildTestManagerWithOptions(t, cfg) + if useThreadPool.IsEnabled() { + operator, emitCalls = buildTestManagerWithOptions(t, cfg, withReaderChan()) + } operator.persister = testutil.NewMockPersister("test") temp1 := openTemp(t, tempDir) @@ -486,6 +492,9 @@ func TestTruncateThenWrite(t *testing.T) { cfg := NewConfig().includeDir(tempDir) cfg.StartAt = "beginning" operator, emitCalls := buildTestManagerWithOptions(t, cfg) + if useThreadPool.IsEnabled() { + operator, emitCalls = buildTestManagerWithOptions(t, cfg, withReaderChan()) + } operator.persister = testutil.NewMockPersister("test") temp1 := openTemp(t, tempDir) @@ -523,6 +532,9 @@ func TestCopyTruncateWriteBoth(t *testing.T) { cfg := NewConfig().includeDir(tempDir) cfg.StartAt = "beginning" operator, emitCalls := buildTestManagerWithOptions(t, cfg) + if useThreadPool.IsEnabled() { + operator, emitCalls = 
buildTestManagerWithOptions(t, cfg, withReaderChan()) + } operator.persister = testutil.NewMockPersister("test") temp1 := openTemp(t, tempDir) @@ -583,6 +595,7 @@ func TestFileMovedWhileOff_BigFiles(t *testing.T) { waitForToken(t, emitCalls, log1) // Stop the operator, then rename and write a new log + fmt.Println("okay") require.NoError(t, operator.Stop()) err := os.Rename(temp.Name(), fmt.Sprintf("%s2", temp.Name())) diff --git a/pkg/stanza/fileconsumer/trie.go b/pkg/stanza/fileconsumer/trie.go new file mode 100755 index 000000000000..b8051f0ec9e4 --- /dev/null +++ b/pkg/stanza/fileconsumer/trie.go @@ -0,0 +1,110 @@ +// Copyright 2022, OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// TRIE data structure inspired by https://github.com/dghubble/trie +// This differs from the original trie. + +package fileconsumer + +type Trie struct { + value interface{} + children map[byte]*Trie +} + +// Trie node and the part string key of the child the path descends into. +type nodeTrie struct { + node *Trie + b byte +} + +// NewPathTrie allocates and returns a new *Trie. +func NewTrie() *Trie { + return &Trie{} +} + +func (trie *Trie) Get(key []byte) interface{} { + node := trie + for _, r := range key { + // We have reached end of the current path and all the previous characters have matched + // Return if current node is leaf and it is not root + node = node.children[r] + if node == nil { + return nil + } + if node.isLeaf() && node != trie { + return node.value + } + } + return node.value +} + +// Put inserts the value into the trie at the given key +func (trie *Trie) Put(key []byte, value interface{}) { + node := trie + for _, r := range key { + child, _ := node.children[r] + if child == nil { + if node.children == nil { + node.children = map[byte]*Trie{} + } + child = NewTrie() + node.children[r] = child + + // Assiging value to every child node allows us to detect partial matches. + // For eg. `123451` and `123456789` will match, even if they are not exactly same strings. + // Doing this, we store every prefix of the fingerprint. + node.value = value + } + node = child + } + node.value = value +} + +// Delete removes the value associated with the given key. Returns true if a +// node was found for the given key. If the node or any of its ancestors +// becomes childless as a result, it is removed from the trie. +func (trie *Trie) Delete(key []byte) bool { + var path []*Trie // record ancestors to check later + node := trie + for _, b := range key { + path = append(path, node) + node = node.children[b] + if node == nil { + // node does not exist + return false + } + } + // delete the node value + node.value = nil + // if leaf, remove it from its parent's children map. Repeat for ancestor path. 
+ if node.isLeaf() { + // iterate backwards over path + for i := len(path) - 1; i >= 0; i-- { + parent := path[i] + b := key[i] + delete(parent.children, b) + if !parent.isLeaf() { + // parent has other children, stop + break + } + parent.children = nil + parent.value = nil + } + } + return true // node (internal or not) existed and its value was nil'd +} + +func (trie *Trie) isLeaf() bool { + return len(trie.children) == 0 +} diff --git a/pkg/stanza/fileconsumer/util_test.go b/pkg/stanza/fileconsumer/util_test.go index 3d8c96afe0dd..32e78ec2e697 100644 --- a/pkg/stanza/fileconsumer/util_test.go +++ b/pkg/stanza/fileconsumer/util_test.go @@ -77,7 +77,8 @@ type emitParams struct { } type testManagerConfig struct { - emitChan chan *emitParams + emitChan chan *emitParams + initializeChannel bool } type testManagerOption func(*testManagerConfig) @@ -88,6 +89,12 @@ func withEmitChan(emitChan chan *emitParams) testManagerOption { } } +func withReaderChan() testManagerOption { + return func(m *testManagerConfig) { + m.initializeChannel = true + } +} + func buildTestManagerWithOptions(t *testing.T, cfg *Config, opts ...testManagerOption) (*Manager, chan *emitParams) { tmc := &testManagerConfig{emitChan: make(chan *emitParams, 100)} for _, opt := range opts { @@ -95,6 +102,17 @@ func buildTestManagerWithOptions(t *testing.T, cfg *Config, opts ...testManagerO } input, err := cfg.Build(testutil.Logger(t), testEmitFunc(tmc.emitChan)) require.NoError(t, err) + if tmc.initializeChannel { + input.readerChan = make(chan ReaderWrapper, cfg.MaxConcurrentFiles/2) + ctx, cancel := context.WithCancel(context.Background()) + input.cancel = cancel + input.ctx = ctx + for i := 0; i < cfg.MaxConcurrentFiles/2; i++ { + input.workerWg.Add(1) + go input.worker(ctx) + } + } + return input, tmc.emitChan } diff --git a/pkg/stanza/operator/helper/flusher.go b/pkg/stanza/operator/helper/flusher.go index 61f5f33e2e55..435c6962bf48 100644 --- a/pkg/stanza/operator/helper/flusher.go +++ b/pkg/stanza/operator/helper/flusher.go @@ -16,6 +16,7 @@ package helper // import "github.com/open-telemetry/opentelemetry-collector-cont import ( "bufio" + "sync" "time" ) @@ -54,9 +55,12 @@ type Flusher struct { // if previousDataLength = 0 - no new data have been received after flush // if previousDataLength > 0 - there is data which has not been flushed yet and it doesn't changed since lastDataChange previousDataLength int + rwLock sync.RWMutex } func (f *Flusher) UpdateDataChangeTime(length int) { + f.rwLock.Lock() + defer f.rwLock.Unlock() // Skip if length is greater than 0 and didn't changed if length > 0 && length == f.previousDataLength { return @@ -75,6 +79,8 @@ func (f *Flusher) Flushed() { // ShouldFlush returns true if data should be forcefully flushed func (f *Flusher) ShouldFlush() bool { + f.rwLock.RLock() + defer f.rwLock.RUnlock() // Returns true if there is f.forcePeriod after f.lastDataChange and data length is greater than 0 return f.forcePeriod > 0 && time.Since(f.lastDataChange) > f.forcePeriod && f.previousDataLength > 0 } diff --git a/receiver/filelogreceiver/filelog_test.go b/receiver/filelogreceiver/filelog_test.go index 8c69f6552c28..b966da644eb3 100644 --- a/receiver/filelogreceiver/filelog_test.go +++ b/receiver/filelogreceiver/filelog_test.go @@ -32,6 +32,7 @@ import ( "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/confmap/confmaptest" "go.opentelemetry.io/collector/consumer/consumertest" + "go.opentelemetry.io/collector/featuregate" 
"go.opentelemetry.io/collector/pdata/plog" "go.opentelemetry.io/collector/receiver/receivertest" "go.uber.org/zap" @@ -136,28 +137,56 @@ func TestReadRotatingFiles(t *testing.T) { tests := []rotationTest{ { - name: "CopyTruncateTimestamped", - copyTruncate: true, - sequential: false, + name: "CopyTruncateTimestamped", + copyTruncate: true, + sequential: false, + enableThreadPool: false, }, { - name: "CopyTruncateSequential", - copyTruncate: true, - sequential: true, + name: "CopyTruncateSequential", + copyTruncate: true, + sequential: true, + enableThreadPool: false, + }, + { + name: "CopyTruncateTimestampedThreadPool", + copyTruncate: true, + sequential: false, + enableThreadPool: true, + }, + { + name: "CopyTruncateSequentialThreadPool", + copyTruncate: true, + sequential: true, + enableThreadPool: true, }, } if runtime.GOOS != "windows" { // Windows has very poor support for moving active files, so rotation is less commonly used tests = append(tests, []rotationTest{ { - name: "MoveCreateTimestamped", - copyTruncate: false, - sequential: false, + name: "MoveCreateTimestampedThreadPool", + copyTruncate: false, + sequential: false, + enableThreadPool: true, }, { - name: "MoveCreateSequential", - copyTruncate: false, - sequential: true, + name: "MoveCreateSequentialThreadPool", + copyTruncate: false, + sequential: true, + enableThreadPool: true, + }, + { + name: "MoveCreateTimestamped", + copyTruncate: false, + sequential: false, + enableThreadPool: false, + }, + { + name: "MoveCreateSequential", + copyTruncate: false, + sequential: true, + enableThreadPool: false, }, }...) } @@ -168,9 +197,10 @@ func TestReadRotatingFiles(t *testing.T) { } type rotationTest struct { - name string - copyTruncate bool - sequential bool + name string + copyTruncate bool + sequential bool + enableThreadPool bool } func (rt *rotationTest) Run(t *testing.T) { @@ -178,6 +208,12 @@ func (rt *rotationTest) Run(t *testing.T) { tempDir := t.TempDir() + if rt.enableThreadPool { + t.Cleanup(func() { + require.NoError(t, featuregate.GlobalRegistry().Set("filelog.useThreadPool", false)) + }) + require.NoError(t, featuregate.GlobalRegistry().Set("filelog.useThreadPool", true)) + } f := NewFactory() sink := new(consumertest.LogsSink) From 64901f2adadbbfdb67cfe93c40651a5e6fcb56d2 Mon Sep 17 00:00:00 2001 From: VihasMakwana <121151420+VihasMakwana@users.noreply.github.com> Date: Mon, 12 Jun 2023 10:56:14 +0530 Subject: [PATCH 2/8] Update copyright header to new format Co-authored-by: Daniel Jaglowski --- pkg/stanza/fileconsumer/trie.go | 15 ++------------- 1 file changed, 2 insertions(+), 13 deletions(-) diff --git a/pkg/stanza/fileconsumer/trie.go b/pkg/stanza/fileconsumer/trie.go index b8051f0ec9e4..3c5bf82edfd4 100755 --- a/pkg/stanza/fileconsumer/trie.go +++ b/pkg/stanza/fileconsumer/trie.go @@ -1,16 +1,5 @@ -// Copyright 2022, OpenTelemetry Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. 
+// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 // TRIE data structure inspired by https://github.com/dghubble/trie // This differs from the original trie. From 5b3f2da636e24d7a822dfbfa548c91dc11e11657 Mon Sep 17 00:00:00 2001 From: VihasMakwana <121151420+VihasMakwana@users.noreply.github.com> Date: Mon, 12 Jun 2023 11:02:42 +0530 Subject: [PATCH 3/8] Remove unnecessary export Co-authored-by: Daniel Jaglowski --- pkg/stanza/fileconsumer/file_threadpool.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/stanza/fileconsumer/file_threadpool.go b/pkg/stanza/fileconsumer/file_threadpool.go index ee71606a7bc7..a9baf5994ea1 100755 --- a/pkg/stanza/fileconsumer/file_threadpool.go +++ b/pkg/stanza/fileconsumer/file_threadpool.go @@ -8,7 +8,7 @@ import ( "go.uber.org/zap" ) -type ReaderWrapper struct { +type readerWrapper struct { reader *Reader fp *Fingerprint } From 06d65ba05be0a5ea9a704e5e848352fdfa610ba1 Mon Sep 17 00:00:00 2001 From: Vihas Splunk Date: Wed, 14 Jun 2023 22:57:18 +0530 Subject: [PATCH 4/8] Address review comments --- pkg/stanza/fileconsumer/config.go | 2 +- pkg/stanza/fileconsumer/file.go | 29 +++++--- pkg/stanza/fileconsumer/file_test.go | 86 ++++++++++++++++++++-- pkg/stanza/fileconsumer/file_threadpool.go | 12 +-- pkg/stanza/fileconsumer/util_test.go | 2 +- 5 files changed, 102 insertions(+), 29 deletions(-) diff --git a/pkg/stanza/fileconsumer/config.go b/pkg/stanza/fileconsumer/config.go index 88c596b1880d..7d59d437f089 100644 --- a/pkg/stanza/fileconsumer/config.go +++ b/pkg/stanza/fileconsumer/config.go @@ -182,7 +182,7 @@ func (c Config) buildManager(logger *zap.SugaredLogger, emit EmitFunc, factory s seenPaths: make(map[string]struct{}, 100), } if useThreadPool.IsEnabled() { - manager.readerChan = make(chan ReaderWrapper, c.MaxConcurrentFiles) + manager.readerChan = make(chan readerWrapper, c.MaxConcurrentFiles) manager.trie = NewTrie() } return &manager, nil diff --git a/pkg/stanza/fileconsumer/file.go b/pkg/stanza/fileconsumer/file.go index 04549ed46f11..20f0a616055e 100644 --- a/pkg/stanza/fileconsumer/file.go +++ b/pkg/stanza/fileconsumer/file.go @@ -55,7 +55,7 @@ type Manager struct { workerWg sync.WaitGroup knownFilesLock sync.RWMutex - readerChan chan ReaderWrapper + readerChan chan readerWrapper trieLock sync.RWMutex // TRIE - this data structure stores the fingerprint of the files which are currently being consumed @@ -81,7 +81,7 @@ func (m *Manager) Start(persister operator.Persister) error { // If useThreadPool is enabled, kick off the worker threads if useThreadPool.IsEnabled() { - m.readerChan = make(chan ReaderWrapper, m.maxBatchFiles*2) + m.readerChan = make(chan readerWrapper, m.maxBatchFiles*2) for i := 0; i < m.maxBatchFiles; i++ { m.workerWg.Add(1) go m.worker(ctx) @@ -97,10 +97,10 @@ func (m *Manager) Start(persister operator.Persister) error { func (m *Manager) Stop() error { m.cancel() m.wg.Wait() - if useThreadPool.IsEnabled() { + if useThreadPool.IsEnabled() && m.readerChan != nil { close(m.readerChan) - m.workerWg.Wait() } + m.workerWg.Wait() // save off any files left m.syncLastPollFiles(m.ctx) @@ -166,6 +166,18 @@ func (m *Manager) poll(ctx context.Context) { m.consume(ctx, matches) } +func (m *Manager) readToEnd(r *Reader, ctx context.Context) bool { + r.ReadToEnd(ctx) + if m.deleteAfterRead && r.eof { + r.Close() + if err := os.Remove(r.file.Name()); err != nil { + m.Errorf("could not delete %s", r.file.Name()) + } + return true + } + return false +} + func (m *Manager) consume(ctx 
context.Context, paths []string) { m.Debug("Consuming files") readers := make([]*Reader, 0, len(paths)) @@ -186,14 +198,7 @@ func (m *Manager) consume(ctx context.Context, paths []string) { wg.Add(1) go func(r *Reader) { defer wg.Done() - r.ReadToEnd(ctx) - // Delete a file if deleteAfterRead is enabled and we reached the end of the file - if m.deleteAfterRead && r.eof { - r.Close() - if err := os.Remove(r.file.Name()); err != nil { - m.Errorf("could not delete %s", r.file.Name()) - } - } + m.readToEnd(r, ctx) }(reader) } wg.Wait() diff --git a/pkg/stanza/fileconsumer/file_test.go b/pkg/stanza/fileconsumer/file_test.go index 17a914de2cef..d38f19984ec5 100644 --- a/pkg/stanza/fileconsumer/file_test.go +++ b/pkg/stanza/fileconsumer/file_test.go @@ -18,6 +18,8 @@ import ( "context" "fmt" "os" + + "math/rand" "path/filepath" "runtime" "strconv" @@ -37,18 +39,18 @@ import ( func TestMain(m *testing.M) { // Run once with thread pool featuregate disabled - code := m.Run() - if code > 0 { - os.Exit(code) - } + // code := m.Run() + // if code > 0 { + // os.Exit(code) + // } // // Run once with thread pool featuregate enabled featuregate.GlobalRegistry().Set(useThreadPool.ID(), true) - code = m.Run() - featuregate.GlobalRegistry().Set(useThreadPool.ID(), false) + code := m.Run() if code > 0 { os.Exit(code) } + featuregate.GlobalRegistry().Set(useThreadPool.ID(), false) } func TestCleanStop(t *testing.T) { @@ -1601,3 +1603,75 @@ func TestStalePartialFingerprintDiscarded(t *testing.T) { waitForTokens(t, emitCalls, [][]byte{[]byte(content), []byte(newContent1), []byte(newContent)}) operator.wg.Wait() } + +func BenchmarkTest(b *testing.B) { + ans := time.Duration(0) + // logs := []int{ + // // 1000, + // // 10000, + // // 20000, + // // 40000, + // // 80000, + // 100000, + // } + if useThreadPool.IsEnabled() { + fmt.Println("Enabled") + } else { + fmt.Println("Disabled") + + } + b.Run("Custom", func(b *testing.B) { + fmt.Println(b.N) + rootDir := b.TempDir() + num_files := b.N * 128 + num_logs := make([]int, 0) + num := 0 + for j := 0; j < num_files; j++ { + if rand.Float32() <= 0.3 { + num += 100000 + num_logs = append(num_logs, 100000) + } else { + num += 10 + num_logs = append(num_logs, 10) + } + + } + cfg := NewConfig().includeDir(rootDir) + cfg.StartAt = "beginning" + cfg.MaxConcurrentFiles = 32 + emitCalls := make(chan []byte) + + operator, _ := cfg.Build(testutil.Logger(b), emitOnChan(emitCalls)) + // operator, emitCalls := buildTestManagerWithOptions(b, cfg) + b.ResetTimer() + operator.Start(testutil.NewMockPersister("test")) + defer func() { + operator.Stop() + }() + getMessage := func(f, m int) string { return fmt.Sprintf("round %d,file %d,\n", f, m) } + + var _wg sync.WaitGroup + for j := 0; j < num_files; j++ { + _wg.Add(1) + + go func(j int) { + defer _wg.Done() + file, _ := os.OpenFile(filepath.Join(rootDir, fmt.Sprintf("%d.log", j)), os.O_CREATE|os.O_RDWR, 0600) + for k := 0; k < num_logs[j]; k++ { + file.WriteString(getMessage(j, k)) + // time.Sleep(time.Millisecond * 2) + } + }(j) + } + // tick := time.NewTicke / r(5*time.Second) + time1 := time.Now() + for i := 0; i < num; i++ { + <-emitCalls + } + s := time.Now().Sub(time1) + ans += s + fmt.Println(s) + _wg.Wait() + }) + fmt.Println(ans) +} diff --git a/pkg/stanza/fileconsumer/file_threadpool.go b/pkg/stanza/fileconsumer/file_threadpool.go index a9baf5994ea1..fa7ebb917aba 100755 --- a/pkg/stanza/fileconsumer/file_threadpool.go +++ b/pkg/stanza/fileconsumer/file_threadpool.go @@ -43,14 +43,8 @@ func (m *Manager) worker(ctx 
context.Context) { return } r, fp := chanData.reader, chanData.fp - r.ReadToEnd(ctx) - // Delete a file if deleteAfterRead is enabled and we reached the end of the file - if m.deleteAfterRead && r.eof { - r.Close() - if err := os.Remove(r.file.Name()); err != nil { - m.Errorf("could not delete %s", r.file.Name()) - } - } else { + + if !m.readToEnd(r, ctx) { // Save off any files that were not fully read or if deleteAfterRead is disabled m.saveCurrentConcurrent(r) } @@ -124,7 +118,7 @@ func (m *Manager) consumeConcurrent(ctx context.Context, paths []string) { m.trieLock.Lock() m.trie.Put(fp.FirstBytes, true) m.trieLock.Unlock() - m.readerChan <- ReaderWrapper{reader: reader, fp: fp} + m.readerChan <- readerWrapper{reader: reader, fp: fp} } } } diff --git a/pkg/stanza/fileconsumer/util_test.go b/pkg/stanza/fileconsumer/util_test.go index 32e78ec2e697..5c53bce93b1f 100644 --- a/pkg/stanza/fileconsumer/util_test.go +++ b/pkg/stanza/fileconsumer/util_test.go @@ -103,7 +103,7 @@ func buildTestManagerWithOptions(t *testing.T, cfg *Config, opts ...testManagerO input, err := cfg.Build(testutil.Logger(t), testEmitFunc(tmc.emitChan)) require.NoError(t, err) if tmc.initializeChannel { - input.readerChan = make(chan ReaderWrapper, cfg.MaxConcurrentFiles/2) + input.readerChan = make(chan readerWrapper, cfg.MaxConcurrentFiles/2) ctx, cancel := context.WithCancel(context.Background()) input.cancel = cancel input.ctx = ctx From d105d851e23b5c1472282151f64575d3470bff10 Mon Sep 17 00:00:00 2001 From: Vihas Splunk Date: Fri, 23 Jun 2023 12:34:07 +0530 Subject: [PATCH 5/8] Move saveReaders to another go routine. --- pkg/stanza/fileconsumer/config.go | 1 + pkg/stanza/fileconsumer/file.go | 21 ++++++++++++----- pkg/stanza/fileconsumer/file_threadpool.go | 26 ++++++++++++++-------- pkg/stanza/fileconsumer/util_test.go | 5 ++++- 4 files changed, 38 insertions(+), 15 deletions(-) diff --git a/pkg/stanza/fileconsumer/config.go b/pkg/stanza/fileconsumer/config.go index 6a271cb518ae..02be1286ecd6 100644 --- a/pkg/stanza/fileconsumer/config.go +++ b/pkg/stanza/fileconsumer/config.go @@ -172,6 +172,7 @@ func (c Config) buildManager(logger *zap.SugaredLogger, emit EmitFunc, factory s } if useThreadPool.IsEnabled() { manager.readerChan = make(chan readerWrapper, c.MaxConcurrentFiles) + manager.saveReaders = make(chan readerWrapper, c.MaxConcurrentFiles) manager.trie = NewTrie() } return &manager, nil diff --git a/pkg/stanza/fileconsumer/file.go b/pkg/stanza/fileconsumer/file.go index 2937453c739e..1a67c8d457d2 100644 --- a/pkg/stanza/fileconsumer/file.go +++ b/pkg/stanza/fileconsumer/file.go @@ -42,10 +42,12 @@ type Manager struct { // Following fields are used only when useThreadPool is enabled workerWg sync.WaitGroup + _workerWg sync.WaitGroup knownFilesLock sync.RWMutex - readerChan chan readerWrapper - trieLock sync.RWMutex + readerChan chan readerWrapper + saveReaders chan readerWrapper + trieLock sync.RWMutex // TRIE - this data structure stores the fingerprint of the files which are currently being consumed trie *Trie @@ -71,10 +73,13 @@ func (m *Manager) Start(persister operator.Persister) error { // If useThreadPool is enabled, kick off the worker threads if useThreadPool.IsEnabled() { m.readerChan = make(chan readerWrapper, m.maxBatchFiles*2) + m.saveReaders = make(chan readerWrapper, m.maxBatchFiles*2) for i := 0; i < m.maxBatchFiles; i++ { m.workerWg.Add(1) go m.worker(ctx) } + m._workerWg.Add(1) + go m.saveReadersConcurrent(ctx) } // Start polling goroutine m.startPoller(ctx) @@ -86,10 +91,16 @@ func (m 
*Manager) Start(persister operator.Persister) error { func (m *Manager) Stop() error { m.cancel() m.wg.Wait() - if useThreadPool.IsEnabled() && m.readerChan != nil { - close(m.readerChan) + if useThreadPool.IsEnabled() { + if m.readerChan != nil { + close(m.readerChan) + } + m.workerWg.Wait() + if m.saveReaders != nil { + close(m.saveReaders) + } + m._workerWg.Wait() } - m.workerWg.Wait() // save off any files left m.syncLastPollFiles(m.ctx) diff --git a/pkg/stanza/fileconsumer/file_threadpool.go b/pkg/stanza/fileconsumer/file_threadpool.go index fa7ebb917aba..fd331899e852 100755 --- a/pkg/stanza/fileconsumer/file_threadpool.go +++ b/pkg/stanza/fileconsumer/file_threadpool.go @@ -46,7 +46,7 @@ func (m *Manager) worker(ctx context.Context) { if !m.readToEnd(r, ctx) { // Save off any files that were not fully read or if deleteAfterRead is disabled - m.saveCurrentConcurrent(r) + m.saveReaders <- readerWrapper{reader: r, fp: fp} } m.removePath(fp) } @@ -135,15 +135,23 @@ func (m *Manager) removePath(fp *Fingerprint) { m.trie.Delete(fp.FirstBytes) } -// saveCurrent adds the readers from this polling interval to this list of -// known files, then increments the generation of all tracked old readers -// before clearing out readers that have existed for 3 generations. -func (m *Manager) saveCurrentConcurrent(reader *Reader) { +// saveReadersConcurrent adds the readers from this polling interval to this list of +// known files and removes the fingerprint from the TRIE +func (m *Manager) saveReadersConcurrent(ctx context.Context) { + defer m._workerWg.Done() // Add readers from the current, completed poll interval to the list of known files - m.knownFilesLock.Lock() - defer m.knownFilesLock.Unlock() - - m.knownFiles = append(m.knownFiles, reader) + for { + select { + case readerWrapper, ok := <-m.saveReaders: + if !ok { + return + } + m.knownFilesLock.Lock() + m.knownFiles = append(m.knownFiles, readerWrapper.reader) + m.knownFilesLock.Unlock() + m.removePath(readerWrapper.fp) + } + } } func (m *Manager) clearOldReadersConcurrent(ctx context.Context) { diff --git a/pkg/stanza/fileconsumer/util_test.go b/pkg/stanza/fileconsumer/util_test.go index 31d53091b4bb..341e2c76230c 100644 --- a/pkg/stanza/fileconsumer/util_test.go +++ b/pkg/stanza/fileconsumer/util_test.go @@ -92,7 +92,8 @@ func buildTestManager(t *testing.T, cfg *Config, opts ...testManagerOption) (*Ma input, err := cfg.Build(testutil.Logger(t), testEmitFunc(tmc.emitChan)) require.NoError(t, err) if tmc.initializeChannel { - input.readerChan = make(chan readerWrapper, cfg.MaxConcurrentFiles/2) + input.readerChan = make(chan readerWrapper, cfg.MaxConcurrentFiles) + input.saveReaders = make(chan readerWrapper, cfg.MaxConcurrentFiles) ctx, cancel := context.WithCancel(context.Background()) input.cancel = cancel input.ctx = ctx @@ -100,6 +101,8 @@ func buildTestManager(t *testing.T, cfg *Config, opts ...testManagerOption) (*Ma input.workerWg.Add(1) go input.worker(ctx) } + input.workerWg.Add(1) + go input.saveReadersConcurrent(ctx) } return input, tmc.emitChan From 29f5dc958b5f9c2c34c8596572e11830d30d885d Mon Sep 17 00:00:00 2001 From: Vihas Splunk Date: Sun, 25 Jun 2023 21:15:28 +0530 Subject: [PATCH 6/8] Update test cases --- pkg/stanza/fileconsumer/rotation_test.go | 1 - 1 file changed, 1 deletion(-) diff --git a/pkg/stanza/fileconsumer/rotation_test.go b/pkg/stanza/fileconsumer/rotation_test.go index 4e34cde1227f..9e4edab590f7 100644 --- a/pkg/stanza/fileconsumer/rotation_test.go +++ b/pkg/stanza/fileconsumer/rotation_test.go @@ -584,7 
+584,6 @@ func TestFileMovedWhileOff_BigFiles(t *testing.T) { waitForToken(t, emitCalls, log1) // Stop the operator, then rename and write a new log - fmt.Println("okay") require.NoError(t, operator.Stop()) err := os.Rename(temp.Name(), fmt.Sprintf("%s2", temp.Name())) From 2bbcc66f612b4d4e876bef0124aefd65cef8b601 Mon Sep 17 00:00:00 2001 From: Vihas Splunk Date: Sun, 25 Jun 2023 21:20:35 +0530 Subject: [PATCH 7/8] Add license --- pkg/stanza/fileconsumer/file.go | 8 +++----- pkg/stanza/fileconsumer/file_test.go | 6 +++--- pkg/stanza/fileconsumer/file_threadpool.go | 13 ++++++++----- pkg/stanza/fileconsumer/util_test.go | 3 +-- 4 files changed, 15 insertions(+), 15 deletions(-) diff --git a/pkg/stanza/fileconsumer/file.go b/pkg/stanza/fileconsumer/file.go index 013e91cb65e4..f81f91d2425f 100644 --- a/pkg/stanza/fileconsumer/file.go +++ b/pkg/stanza/fileconsumer/file.go @@ -23,7 +23,6 @@ type Manager struct { *zap.SugaredLogger wg sync.WaitGroup cancel context.CancelFunc - ctx context.Context readerFactory readerFactory finder Finder @@ -57,7 +56,6 @@ func (m *Manager) Start(persister operator.Persister) error { ctx, cancel := context.WithCancel(context.Background()) m.cancel = cancel m.persister = persister - m.ctx = ctx // Load offsets from disk if err := m.loadLastPollFiles(ctx); err != nil { @@ -79,7 +77,7 @@ func (m *Manager) Start(persister operator.Persister) error { go m.worker(ctx) } m._workerWg.Add(1) - go m.saveReadersConcurrent(ctx) + go m.saveReadersConcurrent() } // Start polling goroutine m.startPoller(ctx) @@ -170,7 +168,7 @@ func (m *Manager) poll(ctx context.Context) { m.consume(ctx, matches) } -func (m *Manager) readToEnd(r *Reader, ctx context.Context) bool { +func (m *Manager) readToEnd(ctx context.Context, r *Reader) bool { r.ReadToEnd(ctx) if m.deleteAfterRead && r.eof { r.Close() @@ -202,7 +200,7 @@ func (m *Manager) consume(ctx context.Context, paths []string) { wg.Add(1) go func(r *Reader) { defer wg.Done() - m.readToEnd(r, ctx) + m.readToEnd(ctx, r) }(reader) } wg.Wait() diff --git a/pkg/stanza/fileconsumer/file_test.go b/pkg/stanza/fileconsumer/file_test.go index 69ae51877685..09e926dc45dc 100644 --- a/pkg/stanza/fileconsumer/file_test.go +++ b/pkg/stanza/fileconsumer/file_test.go @@ -32,13 +32,13 @@ func TestMain(m *testing.M) { os.Exit(code) } - // // Run once with thread pool featuregate enabled - featuregate.GlobalRegistry().Set(useThreadPool.ID(), true) + // Run once with thread pool featuregate enabled + featuregate.GlobalRegistry().Set(useThreadPool.ID(), true) //nolint:all code = m.Run() if code > 0 { os.Exit(code) } - featuregate.GlobalRegistry().Set(useThreadPool.ID(), false) + featuregate.GlobalRegistry().Set(useThreadPool.ID(), false) //nolint:all } func TestCleanStop(t *testing.T) { diff --git a/pkg/stanza/fileconsumer/file_threadpool.go b/pkg/stanza/fileconsumer/file_threadpool.go index b85d8c7c801f..b389333d7d19 100755 --- a/pkg/stanza/fileconsumer/file_threadpool.go +++ b/pkg/stanza/fileconsumer/file_threadpool.go @@ -1,3 +1,6 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + package fileconsumer // import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer" import ( @@ -44,7 +47,7 @@ func (m *Manager) worker(ctx context.Context) { } r, fp := chanData.reader, chanData.fp - if !m.readToEnd(r, ctx) { + if !m.readToEnd(ctx, r) { // Save off any files that were not fully read or if deleteAfterRead is disabled m.saveReaders <- readerWrapper{reader: r, fp: fp} } @@ -112,19 +115,19 @@ func 
(m *Manager) removePath(fp *Fingerprint) { // saveReadersConcurrent adds the readers from this polling interval to this list of // known files and removes the fingerprint from the TRIE -func (m *Manager) saveReadersConcurrent(ctx context.Context) { +func (m *Manager) saveReadersConcurrent() { defer m._workerWg.Done() // Add readers from the current, completed poll interval to the list of known files for { select { - case readerWrapper, ok := <-m.saveReaders: + case reader, ok := <-m.saveReaders: if !ok { return } m.knownFilesLock.Lock() - m.knownFiles = append(m.knownFiles, readerWrapper.reader) + m.knownFiles = append(m.knownFiles, reader.reader) m.knownFilesLock.Unlock() - m.removePath(readerWrapper.fp) + m.removePath(reader.fp) } } } diff --git a/pkg/stanza/fileconsumer/util_test.go b/pkg/stanza/fileconsumer/util_test.go index e21d385f4ab5..371b60e72a73 100644 --- a/pkg/stanza/fileconsumer/util_test.go +++ b/pkg/stanza/fileconsumer/util_test.go @@ -96,13 +96,12 @@ func buildTestManager(t *testing.T, cfg *Config, opts ...testManagerOption) (*Ma input.saveReaders = make(chan readerWrapper, cfg.MaxConcurrentFiles) ctx, cancel := context.WithCancel(context.Background()) input.cancel = cancel - input.ctx = ctx for i := 0; i < cfg.MaxConcurrentFiles/2; i++ { input.workerWg.Add(1) go input.worker(ctx) } input._workerWg.Add(1) - go input.saveReadersConcurrent(ctx) + go input.saveReadersConcurrent() } return input, tmc.emitChan From 1fa0744394d60dfd48f2924d449f8f734a0b1f90 Mon Sep 17 00:00:00 2001 From: Vihas Splunk Date: Sun, 25 Jun 2023 21:47:07 +0530 Subject: [PATCH 8/8] FIx rotation bug --- pkg/stanza/fileconsumer/file_threadpool.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pkg/stanza/fileconsumer/file_threadpool.go b/pkg/stanza/fileconsumer/file_threadpool.go index b389333d7d19..e8a046bbe969 100755 --- a/pkg/stanza/fileconsumer/file_threadpool.go +++ b/pkg/stanza/fileconsumer/file_threadpool.go @@ -50,8 +50,9 @@ func (m *Manager) worker(ctx context.Context) { if !m.readToEnd(ctx, r) { // Save off any files that were not fully read or if deleteAfterRead is disabled m.saveReaders <- readerWrapper{reader: r, fp: fp} + } else { + m.removePath(fp) } - m.removePath(fp) } }
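
Note: below is a minimal, self-contained Go sketch of the prefix-matching trie idea that this patch series introduces in pkg/stanza/fileconsumer/trie.go. It is an illustration only, not the patch's implementation: the names here (prefixTrie, the demo fingerprints) are made up for the example, the real trie stores arbitrary values and supports Delete, and the Manager guards it with trieLock. The sketch only demonstrates the property the patch relies on: a lookup succeeds when the stored key and the queried key share a prefix, which is what lets a fingerprint taken while a file was still short match the same file again after it has grown.

package main

import "fmt"

// prefixTrie marks byte-string keys and every prefix of them, so membership
// tests succeed on partial matches in either direction.
type prefixTrie struct {
	isSet    bool
	children map[byte]*prefixTrie
}

// Put marks key and all of its prefixes as present.
func (t *prefixTrie) Put(key []byte) {
	node := t
	for _, b := range key {
		if node.children == nil {
			node.children = map[byte]*prefixTrie{}
		}
		child, ok := node.children[b]
		if !ok {
			child = &prefixTrie{}
			node.children[b] = child
		}
		node.isSet = true
		node = child
	}
	node.isSet = true
}

// Get reports whether key matches a stored key, where "matches" means one is a
// prefix of the other (mirroring how the patch treats growing fingerprints).
func (t *prefixTrie) Get(key []byte) bool {
	node := t
	for _, b := range key {
		node = node.children[b]
		if node == nil {
			return false
		}
		if len(node.children) == 0 {
			// Hit a stored leaf before exhausting key: the stored key is a prefix of key.
			return node.isSet
		}
	}
	// key was exhausted inside the trie: key is a prefix of a stored key.
	return node.isSet
}

func main() {
	trie := &prefixTrie{}

	// Fingerprint captured while the file was still short...
	trie.Put([]byte("2023-06-02 12:00:01 first li"))

	// ...still matches the longer fingerprint of the same file after it grew,
	// while an unrelated fingerprint does not.
	fmt.Println(trie.Get([]byte("2023-06-02 12:00:01 first line\nsecond line"))) // true
	fmt.Println(trie.Get([]byte("2023-06-02 12:00:02 other file")))              // false
}

In the patch itself, the worker goroutines use this property through isCurrentlyConsuming to skip files whose fingerprints are already in the trie, and removePath deletes the entry once a reader has been read to end or handed back via saveReaders.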