From 34a85da1586aabbf28e56bf09a0db070ae4294cb Mon Sep 17 00:00:00 2001 From: Joseph Sirianni Date: Wed, 26 May 2021 18:46:01 -0400 Subject: [PATCH] Cleanup file input https://github.com/open-telemetry/opentelemetry-log-collection/pull/155 (#316) --- operator/builtin/input/file/file.go | 57 +++++++++++++-------------- operator/builtin/input/file/reader.go | 5 +++ 2 files changed, 32 insertions(+), 30 deletions(-) diff --git a/operator/builtin/input/file/file.go b/operator/builtin/input/file/file.go index f8da96fa6..27e074b4f 100644 --- a/operator/builtin/input/file/file.go +++ b/operator/builtin/input/file/file.go @@ -120,26 +120,7 @@ func (f *InputOperator) poll(ctx context.Context) { } } - // Open the files first to minimize the time between listing and opening - files := make([]*os.File, 0, len(matches)) - for _, path := range matches { - if _, ok := f.SeenPaths[path]; !ok { - if f.startAtBeginning { - f.Infow("Started watching file", "path", path) - } else { - f.Infow("Started watching file from end. To read preexisting logs, configure the argument 'start_at' to 'beginning'", "path", path) - } - f.SeenPaths[path] = struct{}{} - } - file, err := os.Open(path) - if err != nil { - f.Errorw("Failed to open file", zap.Error(err)) - continue - } - files = append(files, file) - } - - readers := f.makeReaders(files) + readers := f.makeReaders(matches) f.firstCheck = false var wg sync.WaitGroup @@ -155,8 +136,8 @@ func (f *InputOperator) poll(ctx context.Context) { wg.Wait() // Close all files - for _, file := range files { - file.Close() + for _, reader := range readers { + reader.Close() } f.saveCurrent(readers) @@ -192,7 +173,26 @@ func getMatches(includes, excludes []string) []string { // makeReaders takes a list of paths, then creates readers from each of those paths, // discarding any that have a duplicate fingerprint to other files that have already // been read this polling interval -func (f *InputOperator) makeReaders(files []*os.File) []*Reader { +func (f *InputOperator) makeReaders(filePaths []string) []*Reader { + // Open the files first to minimize the time between listing and opening + files := make([]*os.File, 0, len(filePaths)) + for _, path := range filePaths { + if _, ok := f.SeenPaths[path]; !ok { + if f.startAtBeginning { + f.Infow("Started watching file", "path", path) + } else { + f.Infow("Started watching file from end. To read preexisting logs, configure the argument 'start_at' to 'beginning'", "path", path) + } + f.SeenPaths[path] = struct{}{} + } + file, err := os.Open(path) + if err != nil { + f.Errorw("Failed to open file", zap.Error(err)) + continue + } + files = append(files, file) + } + // Get fingerprints for each file fps := make([]*Fingerprint, 0, len(files)) for _, file := range files { @@ -204,18 +204,15 @@ func (f *InputOperator) makeReaders(files []*os.File) []*Reader { fps = append(fps, fp) } - // Make a copy of the files so we don't modify the original - filesCopy := make([]*os.File, len(files)) - copy(filesCopy, files) - // Exclude any empty fingerprints or duplicate fingerprints to avoid doubling up on copy-truncate files OUTER: for i := 0; i < len(fps); { fp := fps[i] if len(fp.FirstBytes) == 0 { + files[i].Close() // Empty file, don't read it until we can compare its fingerprint fps = append(fps[:i], fps[i+1:]...) - filesCopy = append(filesCopy[:i], filesCopy[i+1:]...) + files = append(files[:i], files[i+1:]...) } @@ -229,7 +226,7 @@ OUTER: if fp.StartsWith(fp2) || fp2.StartsWith(fp) { // Exclude fps = append(fps[:i], fps[i+1:]...) - filesCopy = append(filesCopy[:i], filesCopy[i+1:]...) + files = append(files[:i], files[i+1:]...) continue OUTER } } @@ -238,7 +235,7 @@ OUTER: readers := make([]*Reader, 0, len(fps)) for i := 0; i < len(fps); i++ { - reader, err := f.newReader(filesCopy[i], fps[i], f.firstCheck) + reader, err := f.newReader(files[i], fps[i], f.firstCheck) if err != nil { f.Errorw("Failed to create reader", zap.Error(err)) continue diff --git a/operator/builtin/input/file/reader.go b/operator/builtin/input/file/reader.go index 3d9939eee..4be51b46b 100644 --- a/operator/builtin/input/file/reader.go +++ b/operator/builtin/input/file/reader.go @@ -102,6 +102,11 @@ func (f *Reader) ReadToEnd(ctx context.Context) { } } +// Close will close the file +func (f *Reader) Close() error { + return f.file.Close() +} + // Emit creates an entry with the decoded message and sends it to the next // operator in the pipeline func (f *Reader) emit(ctx context.Context, msgBuf []byte) error {