From a78b8410be1f6dc60c7b44cb56e1b10d273b5e52 Mon Sep 17 00:00:00 2001 From: Camden Cheek Date: Mon, 27 Jul 2020 11:57:29 -0400 Subject: [PATCH 1/5] Refactor into an emit closure --- operator/builtin/input/file/read_to_end.go | 70 +++++++++------------- 1 file changed, 29 insertions(+), 41 deletions(-) diff --git a/operator/builtin/input/file/read_to_end.go b/operator/builtin/input/file/read_to_end.go index 0f2bf76f9..6e3762d7b 100644 --- a/operator/builtin/input/file/read_to_end.go +++ b/operator/builtin/input/file/read_to_end.go @@ -10,7 +10,6 @@ import ( "github.com/observiq/carbon/entry" "github.com/observiq/carbon/errors" "github.com/observiq/carbon/operator/helper" - "go.uber.org/zap" "golang.org/x/text/encoding" ) @@ -70,16 +69,41 @@ func ReadToEnd( } scanner.Split(scanFunc) - decoder := encoding.NewDecoder() - // Make a large, reusable buffer for transforming + decoder := encoding.NewDecoder() decodeBuffer := make([]byte, 16384) + emit := func(msgBuf []byte) { + decoder.Reset() + nDst, _, err := decoder.Transform(decodeBuffer, msgBuf, true) + if err != nil { + panic(err) + } + + e := inputOperator.NewEntry(string(decodeBuffer[:nDst])) + e.Set(filePathField, path) + e.Set(fileNameField, filepath.Base(file.Name())) + inputOperator.Write(ctx, e) + } + // If we're not at the end of the file, and we haven't // advanced since last cycle, read the rest of the file as an entry defer func() { if pos < stat.Size() && pos == startOffset && lastSeenFileSize == stat.Size() { - readRemaining(ctx, file, pos, stat.Size(), messenger, inputOperator, filePathField, fileNameField, decoder, decodeBuffer) + _, err := file.Seek(pos, 0) + if err != nil { + inputOperator.Errorf("failed to seek to read last log entry") + return + } + + msgBuf := make([]byte, stat.Size()-pos) + n, err := file.Read(msgBuf) + if err != nil { + inputOperator.Errorf("failed to read trailing log") + return + } + emit(msgBuf[:n]) + messenger.SetOffset(pos + int64(n)) } }() @@ -98,43 +122,7 @@ func ReadToEnd( return scanner.Err() } - decoder.Reset() - nDst, _, err := decoder.Transform(decodeBuffer, scanner.Bytes(), true) - if err != nil { - return err - } - - e := inputOperator.NewEntry(string(decodeBuffer[:nDst])) - e.Set(filePathField, path) - e.Set(fileNameField, filepath.Base(file.Name())) - inputOperator.Write(ctx, e) + emit(scanner.Bytes()) messenger.SetOffset(pos) } } - -// readRemaining will read the remaining characters in a file as a log entry. -func readRemaining(ctx context.Context, file *os.File, filePos int64, fileSize int64, messenger fileUpdateMessenger, inputOperator helper.InputOperator, filePathField, fileNameField entry.Field, encoder *encoding.Decoder, decodeBuffer []byte) { - _, err := file.Seek(filePos, 0) - if err != nil { - inputOperator.Errorf("failed to seek to read last log entry") - return - } - - msgBuf := make([]byte, fileSize-filePos) - n, err := file.Read(msgBuf) - if err != nil { - inputOperator.Errorf("failed to read trailing log") - return - } - encoder.Reset() - nDst, _, err := encoder.Transform(decodeBuffer, msgBuf, true) - if err != nil { - inputOperator.Errorw("failed to decode trailing log", zap.Error(err)) - } - - e := inputOperator.NewEntry(string(decodeBuffer[:nDst])) - e.Set(filePathField, file.Name()) - e.Set(fileNameField, filepath.Base(file.Name())) - inputOperator.Write(ctx, e) - messenger.SetOffset(filePos + int64(n)) -} From 9763c8ec40a0076f67ddd9fc62891ab37c8f6310 Mon Sep 17 00:00:00 2001 From: Camden Cheek Date: Mon, 27 Jul 2020 12:03:35 -0400 Subject: [PATCH 2/5] Factor out defer statement --- operator/builtin/input/file/read_to_end.go | 45 +++++++++++----------- 1 file changed, 23 insertions(+), 22 deletions(-) diff --git a/operator/builtin/input/file/read_to_end.go b/operator/builtin/input/file/read_to_end.go index 6e3762d7b..f52d4b450 100644 --- a/operator/builtin/input/file/read_to_end.go +++ b/operator/builtin/input/file/read_to_end.go @@ -86,27 +86,7 @@ func ReadToEnd( inputOperator.Write(ctx, e) } - // If we're not at the end of the file, and we haven't - // advanced since last cycle, read the rest of the file as an entry - defer func() { - if pos < stat.Size() && pos == startOffset && lastSeenFileSize == stat.Size() { - _, err := file.Seek(pos, 0) - if err != nil { - inputOperator.Errorf("failed to seek to read last log entry") - return - } - - msgBuf := make([]byte, stat.Size()-pos) - n, err := file.Read(msgBuf) - if err != nil { - inputOperator.Errorf("failed to read trailing log") - return - } - emit(msgBuf[:n]) - messenger.SetOffset(pos + int64(n)) - } - }() - + // Iterate over the tokenized file, emitting entries as we go for { select { case <-ctx.Done(): @@ -118,11 +98,32 @@ func ReadToEnd( if !ok { if err := scanner.Err(); err == bufio.ErrTooLong { return errors.NewError("log entry too large", "increase max_log_size or ensure that multiline regex patterns terminate") + } else if err != nil { + return errors.Wrap(err, "scanner error") } - return scanner.Err() + break } emit(scanner.Bytes()) messenger.SetOffset(pos) } + + // If we're not at the end of the file, and we haven't + // advanced since last cycle, read the rest of the file as an entry + if pos < stat.Size() && pos == startOffset && lastSeenFileSize == stat.Size() { + _, err := file.Seek(pos, 0) + if err != nil { + return errors.Wrap(err, "seeking for trailing entry") + } + + msgBuf := make([]byte, stat.Size()-pos) + n, err := file.Read(msgBuf) + if err != nil { + return errors.Wrap(err, "reading trailing entry") + } + emit(msgBuf[:n]) + messenger.SetOffset(pos + int64(n)) + } + + return nil } From 72b29e16392b8b4b7e1f83f5f0cef0251781c74e Mon Sep 17 00:00:00 2001 From: Camden Cheek Date: Mon, 27 Jul 2020 12:23:59 -0400 Subject: [PATCH 3/5] Fix short destination error --- operator/builtin/input/file/read_to_end.go | 16 +++++++++++++--- 1 file changed, 13 insertions(+), 3 deletions(-) diff --git a/operator/builtin/input/file/read_to_end.go b/operator/builtin/input/file/read_to_end.go index f52d4b450..e993e6940 100644 --- a/operator/builtin/input/file/read_to_end.go +++ b/operator/builtin/input/file/read_to_end.go @@ -10,7 +10,9 @@ import ( "github.com/observiq/carbon/entry" "github.com/observiq/carbon/errors" "github.com/observiq/carbon/operator/helper" + "go.uber.org/zap" "golang.org/x/text/encoding" + "golang.org/x/text/transform" ) // ReadToEnd will read entries from a file and send them to the outputs of an input operator @@ -75,9 +77,17 @@ func ReadToEnd( emit := func(msgBuf []byte) { decoder.Reset() - nDst, _, err := decoder.Transform(decodeBuffer, msgBuf, true) - if err != nil { - panic(err) + var nDst int + for { + nDst, _, err = decoder.Transform(decodeBuffer, msgBuf, true) + if err != nil && err == transform.ErrShortDst { + decodeBuffer = make([]byte, len(decodeBuffer)*2) + continue + } else if err != nil { + inputOperator.Errorw("failed to transform encoding", zap.Error(err)) + return + } + break } e := inputOperator.NewEntry(string(decodeBuffer[:nDst])) From 7ef6d4459e3966b6775f7be7ee8a202d294e9515 Mon Sep 17 00:00:00 2001 From: Camden Cheek Date: Mon, 27 Jul 2020 12:33:10 -0400 Subject: [PATCH 4/5] Simplify channel operations --- operator/builtin/input/file/file.go | 24 +++++++++++----------- operator/builtin/input/file/read_to_end.go | 2 -- 2 files changed, 12 insertions(+), 14 deletions(-) diff --git a/operator/builtin/input/file/file.go b/operator/builtin/input/file/file.go index d04c42780..90390e254 100644 --- a/operator/builtin/input/file/file.go +++ b/operator/builtin/input/file/file.go @@ -230,6 +230,8 @@ func (f *InputOperator) Start() error { f.wg.Add(1) go func() { defer f.wg.Done() + defer f.syncKnownFiles() + defer f.drainMessages() globTicker := time.NewTicker(f.PollInterval) defer globTicker.Stop() @@ -242,9 +244,6 @@ func (f *InputOperator) Start() error { for { select { case <-ctx.Done(): - f.drainMessages() - f.readerWg.Wait() - f.syncKnownFiles() return case <-globTicker.C: matches := getMatches(f.Include, f.Exclude) @@ -256,8 +255,10 @@ func (f *InputOperator) Start() error { } f.syncKnownFiles() firstCheck = false - case message := <-f.fileUpdateChan: - f.updateFile(message) + case message, ok := <-f.fileUpdateChan: + if ok { + f.updateFile(message) + } } } }() @@ -269,7 +270,7 @@ func (f *InputOperator) Start() error { func (f *InputOperator) Stop() error { f.cancel() f.wg.Wait() - f.syncKnownFiles() + f.fileUpdateChan = make(chan fileUpdateMessage) f.knownFiles = nil return nil } @@ -314,6 +315,7 @@ func (f *InputOperator) checkFile(ctx context.Context, path string, firstCheck b go func(ctx context.Context, path string, offset, lastSeenSize int64) { defer f.readerWg.Done() messenger := f.newFileUpdateMessenger(path) + defer messenger.FinishedReading() err := ReadToEnd(ctx, path, offset, lastSeenSize, messenger, f.SplitFunc, f.FilePathField, f.FileNameField, f.InputOperator, f.MaxLogSize, f.encoding) if err != nil { f.Warnw("Failed to read log file", zap.Error(err)) @@ -386,19 +388,17 @@ func (f *InputOperator) updateFile(message fileUpdateMessage) { } func (f *InputOperator) drainMessages() { - done := make(chan struct{}) go func() { f.readerWg.Wait() - close(done) + close(f.fileUpdateChan) }() for { - select { - case <-done: + message, ok := <-f.fileUpdateChan + if !ok { return - case message := <-f.fileUpdateChan: - f.updateFile(message) } + f.updateFile(message) } } diff --git a/operator/builtin/input/file/read_to_end.go b/operator/builtin/input/file/read_to_end.go index e993e6940..1de6c578d 100644 --- a/operator/builtin/input/file/read_to_end.go +++ b/operator/builtin/input/file/read_to_end.go @@ -29,8 +29,6 @@ func ReadToEnd( maxLogSize int, encoding encoding.Encoding, ) error { - defer messenger.FinishedReading() - select { case <-ctx.Done(): return nil From 4e0b3530a78a8234e67a3e81d1072ef967db71ef Mon Sep 17 00:00:00 2001 From: Camden Cheek Date: Mon, 27 Jul 2020 12:41:54 -0400 Subject: [PATCH 5/5] Refactor custom scanner into its own object --- .../builtin/input/file/positional_scanner.go | 33 +++++++++++++++++++ operator/builtin/input/file/read_to_end.go | 21 ++++-------- 2 files changed, 39 insertions(+), 15 deletions(-) create mode 100644 operator/builtin/input/file/positional_scanner.go diff --git a/operator/builtin/input/file/positional_scanner.go b/operator/builtin/input/file/positional_scanner.go new file mode 100644 index 000000000..0e30144ad --- /dev/null +++ b/operator/builtin/input/file/positional_scanner.go @@ -0,0 +1,33 @@ +package file + +import ( + "bufio" + "io" +) + +type PositionalScanner struct { + pos int64 + *bufio.Scanner +} + +func NewPositionalScanner(r io.Reader, maxLogSize int, startOffset int64, splitFunc bufio.SplitFunc) *PositionalScanner { + ps := &PositionalScanner{ + pos: startOffset, + Scanner: bufio.NewScanner(r), + } + + buf := make([]byte, 0, 16384) + ps.Scanner.Buffer(buf, maxLogSize) + + scanFunc := func(data []byte, atEOF bool) (advance int, token []byte, err error) { + advance, token, err = splitFunc(data, atEOF) + ps.pos += int64(advance) + return + } + ps.Scanner.Split(scanFunc) + return ps +} + +func (ps *PositionalScanner) Pos() int64 { + return ps.pos +} diff --git a/operator/builtin/input/file/read_to_end.go b/operator/builtin/input/file/read_to_end.go index 1de6c578d..7b8a8f824 100644 --- a/operator/builtin/input/file/read_to_end.go +++ b/operator/builtin/input/file/read_to_end.go @@ -58,16 +58,7 @@ func ReadToEnd( return fmt.Errorf("seek file: %s", err) } - scanner := bufio.NewScanner(file) - buf := make([]byte, 0, 16384) - scanner.Buffer(buf, maxLogSize) - pos := startOffset - scanFunc := func(data []byte, atEOF bool) (advance int, token []byte, err error) { - advance, token, err = splitFunc(data, atEOF) - pos += int64(advance) - return - } - scanner.Split(scanFunc) + scanner := NewPositionalScanner(file, maxLogSize, startOffset, splitFunc) // Make a large, reusable buffer for transforming decoder := encoding.NewDecoder() @@ -113,24 +104,24 @@ func ReadToEnd( } emit(scanner.Bytes()) - messenger.SetOffset(pos) + messenger.SetOffset(scanner.Pos()) } // If we're not at the end of the file, and we haven't // advanced since last cycle, read the rest of the file as an entry - if pos < stat.Size() && pos == startOffset && lastSeenFileSize == stat.Size() { - _, err := file.Seek(pos, 0) + if scanner.Pos() < stat.Size() && scanner.Pos() == startOffset && lastSeenFileSize == stat.Size() { + _, err := file.Seek(scanner.Pos(), 0) if err != nil { return errors.Wrap(err, "seeking for trailing entry") } - msgBuf := make([]byte, stat.Size()-pos) + msgBuf := make([]byte, stat.Size()-scanner.Pos()) n, err := file.Read(msgBuf) if err != nil { return errors.Wrap(err, "reading trailing entry") } emit(msgBuf[:n]) - messenger.SetOffset(pos + int64(n)) + messenger.SetOffset(scanner.Pos() + int64(n)) } return nil