Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
)
  • Loading branch information
Joseph Sirianni authored May 26, 2021
1 parent d5631a3 commit 34a85da
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 30 deletions.
57 changes: 27 additions & 30 deletions operator/builtin/input/file/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,26 +120,7 @@ func (f *InputOperator) poll(ctx context.Context) {
}
}

// Open the files first to minimize the time between listing and opening
files := make([]*os.File, 0, len(matches))
for _, path := range matches {
if _, ok := f.SeenPaths[path]; !ok {
if f.startAtBeginning {
f.Infow("Started watching file", "path", path)
} else {
f.Infow("Started watching file from end. To read preexisting logs, configure the argument 'start_at' to 'beginning'", "path", path)
}
f.SeenPaths[path] = struct{}{}
}
file, err := os.Open(path)
if err != nil {
f.Errorw("Failed to open file", zap.Error(err))
continue
}
files = append(files, file)
}

readers := f.makeReaders(files)
readers := f.makeReaders(matches)
f.firstCheck = false

var wg sync.WaitGroup
Expand All @@ -155,8 +136,8 @@ func (f *InputOperator) poll(ctx context.Context) {
wg.Wait()

// Close all files
for _, file := range files {
file.Close()
for _, reader := range readers {
reader.Close()
}

f.saveCurrent(readers)
Expand Down Expand Up @@ -192,7 +173,26 @@ func getMatches(includes, excludes []string) []string {
// makeReaders takes a list of paths, opens each one, and creates a reader for each
// opened file, discarding any file whose fingerprint is empty or duplicates that of
// another file already seen during this polling interval
func (f *InputOperator) makeReaders(files []*os.File) []*Reader {
func (f *InputOperator) makeReaders(filePaths []string) []*Reader {
// Open the files first to minimize the time between listing and opening
files := make([]*os.File, 0, len(filePaths))
for _, path := range filePaths {
if _, ok := f.SeenPaths[path]; !ok {
if f.startAtBeginning {
f.Infow("Started watching file", "path", path)
} else {
f.Infow("Started watching file from end. To read preexisting logs, configure the argument 'start_at' to 'beginning'", "path", path)
}
f.SeenPaths[path] = struct{}{}
}
file, err := os.Open(path)
if err != nil {
f.Errorw("Failed to open file", zap.Error(err))
continue
}
files = append(files, file)
}

// Get fingerprints for each file
fps := make([]*Fingerprint, 0, len(files))
for _, file := range files {
Expand All @@ -204,18 +204,15 @@ func (f *InputOperator) makeReaders(files []*os.File) []*Reader {
fps = append(fps, fp)
}

// Make a copy of the files so we don't modify the original
filesCopy := make([]*os.File, len(files))
copy(filesCopy, files)

// Exclude any empty fingerprints or duplicate fingerprints to avoid doubling up on copy-truncate files
OUTER:
for i := 0; i < len(fps); {
fp := fps[i]
if len(fp.FirstBytes) == 0 {
files[i].Close()
// Empty file, don't read it until we can compare its fingerprint
fps = append(fps[:i], fps[i+1:]...)
filesCopy = append(filesCopy[:i], filesCopy[i+1:]...)
files = append(files[:i], files[i+1:]...)

}

Expand All @@ -229,7 +226,7 @@ OUTER:
if fp.StartsWith(fp2) || fp2.StartsWith(fp) {
// Exclude
fps = append(fps[:i], fps[i+1:]...)
filesCopy = append(filesCopy[:i], filesCopy[i+1:]...)
files = append(files[:i], files[i+1:]...)
continue OUTER
}
}
Expand All @@ -238,7 +235,7 @@ OUTER:

readers := make([]*Reader, 0, len(fps))
for i := 0; i < len(fps); i++ {
reader, err := f.newReader(filesCopy[i], fps[i], f.firstCheck)
reader, err := f.newReader(files[i], fps[i], f.firstCheck)
if err != nil {
f.Errorw("Failed to create reader", zap.Error(err))
continue
Expand Down
5 changes: 5 additions & 0 deletions operator/builtin/input/file/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,11 @@ func (f *Reader) ReadToEnd(ctx context.Context) {
}
}

// Close closes the file backing this Reader and returns whatever error,
// if any, that close operation reports.
func (f *Reader) Close() error {
	closeErr := f.file.Close()
	return closeErr
}

// emit creates an entry with the decoded message and sends it to the next
// operator in the pipeline.
func (f *Reader) emit(ctx context.Context, msgBuf []byte) error {
Expand Down

0 comments on commit 34a85da

Please sign in to comment.