Skip to content

Commit

Permalink
File Input Cleanup - Step 1 (#155)
Browse files Browse the repository at this point in the history
The codebase within the `file_input` operator is complex enough that functional changes are difficult to confidently implement. This PR is minor step towards untangling the code base.

- Open files in the `makeReaders` method
- Add `Reader.Close`
- Remove unnecessary copying of file handles
  • Loading branch information
djaglowski authored May 25, 2021
1 parent 964a7f9 commit 6184b76
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 30 deletions.
58 changes: 28 additions & 30 deletions operator/builtin/input/file/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,26 +136,7 @@ func (f *InputOperator) poll(ctx context.Context) {
}
}

// Open the files first to minimize the time between listing and opening
files := make([]*os.File, 0, len(matches))
for _, path := range matches {
if _, ok := f.SeenPaths[path]; !ok {
if f.startAtBeginning {
f.Infow("Started watching file", "path", path)
} else {
f.Infow("Started watching file from end. To read preexisting logs, configure the argument 'start_at' to 'beginning'", "path", path)
}
f.SeenPaths[path] = struct{}{}
}
file, err := os.Open(path)
if err != nil {
f.Errorw("Failed to open file", zap.Error(err))
continue
}
files = append(files, file)
}

readers := f.makeReaders(files)
readers := f.makeReaders(matches)
f.firstCheck = false

var wg sync.WaitGroup
Expand All @@ -171,8 +152,8 @@ func (f *InputOperator) poll(ctx context.Context) {
wg.Wait()

// Close all files
for _, file := range files {
file.Close()
for _, reader := range readers {
reader.Close()
}

f.saveCurrent(readers)
Expand Down Expand Up @@ -208,7 +189,26 @@ func getMatches(includes, excludes []string) []string {
// makeReaders takes a list of paths, then creates readers from each of those paths,
// discarding any that have a duplicate fingerprint to other files that have already
// been read this polling interval
func (f *InputOperator) makeReaders(files []*os.File) []*Reader {
func (f *InputOperator) makeReaders(filesPaths []string) []*Reader {
// Open the files first to minimize the time between listing and opening
files := make([]*os.File, 0, len(filesPaths))
for _, path := range filesPaths {
if _, ok := f.SeenPaths[path]; !ok {
if f.startAtBeginning {
f.Infow("Started watching file", "path", path)
} else {
f.Infow("Started watching file from end. To read preexisting logs, configure the argument 'start_at' to 'beginning'", "path", path)
}
f.SeenPaths[path] = struct{}{}
}
file, err := os.Open(path)
if err != nil {
f.Errorw("Failed to open file", zap.Error(err))
continue
}
files = append(files, file)
}

// Get fingerprints for each file
fps := make([]*Fingerprint, 0, len(files))
for _, file := range files {
Expand All @@ -220,18 +220,15 @@ func (f *InputOperator) makeReaders(files []*os.File) []*Reader {
fps = append(fps, fp)
}

// Make a copy of the files so we don't modify the original
filesCopy := make([]*os.File, len(files))
copy(filesCopy, files)

// Exclude any empty fingerprints or duplicate fingerprints to avoid doubling up on copy-truncate files
OUTER:
for i := 0; i < len(fps); {
fp := fps[i]
if len(fp.FirstBytes) == 0 {
files[i].Close()
// Empty file, don't read it until we can compare its fingerprint
fps = append(fps[:i], fps[i+1:]...)
filesCopy = append(filesCopy[:i], filesCopy[i+1:]...)
files = append(files[:i], files[i+1:]...)
}

for j := 0; j < len(fps); j++ {
Expand All @@ -243,8 +240,9 @@ OUTER:
fp2 := fps[j]
if fp.StartsWith(fp2) || fp2.StartsWith(fp) {
// Exclude
files[i].Close()
fps = append(fps[:i], fps[i+1:]...)
filesCopy = append(filesCopy[:i], filesCopy[i+1:]...)
files = append(files[:i], files[i+1:]...)
continue OUTER
}
}
Expand All @@ -253,7 +251,7 @@ OUTER:

readers := make([]*Reader, 0, len(fps))
for i := 0; i < len(fps); i++ {
reader, err := f.newReader(filesCopy[i], fps[i], f.firstCheck)
reader, err := f.newReader(files[i], fps[i], f.firstCheck)
if err != nil {
f.Errorw("Failed to create reader", zap.Error(err))
continue
Expand Down
5 changes: 5 additions & 0 deletions operator/builtin/input/file/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,11 @@ func (f *Reader) ReadToEnd(ctx context.Context) {
}
}

// Close will close the file
func (f *Reader) Close() error {
return f.file.Close()
}

// Emit creates an entry with the decoded message and sends it to the next
// operator in the pipeline
func (f *Reader) emit(ctx context.Context, msgBuf []byte) error {
Expand Down

0 comments on commit 6184b76

Please sign in to comment.