From 0ccb4bb11ccd02e8feddb464cdfdb651ad894da8 Mon Sep 17 00:00:00 2001
From: Camden Cheek
Date: Fri, 10 Jul 2020 16:59:08 -0400
Subject: [PATCH] Fix reading partial logs in active files

When logs are actively being written to, we can't assume that an EOF
is a valid log entry terminator, since the writing application may
have written only a partial entry. This commit changes our logic so
that we only read to the end of the file if both the offset and the
file size haven't changed since the last time we polled. This gives
applications as long as the poll interval to write an entire log
entry.
---
 plugin/builtin/input/file/file.go            | 39 +++++---
 plugin/builtin/input/file/line_splitter.go   | 36 ++++++--
 .../builtin/input/file/line_splitter_test.go | 92 +++++++++++++++----
 plugin/builtin/input/file/read_to_end.go     | 25 ++++-
 4 files changed, 157 insertions(+), 35 deletions(-)

diff --git a/plugin/builtin/input/file/file.go b/plugin/builtin/input/file/file.go
index 4d049c87a..1ea32057e 100644
--- a/plugin/builtin/input/file/file.go
+++ b/plugin/builtin/input/file/file.go
@@ -71,7 +71,7 @@ func (c FileInputConfig) Build(context plugin.BuildContext) (plugin.Plugin, erro
 	// Determine the split function for log entries
 	var splitFunc bufio.SplitFunc
 	if c.Multiline == nil {
-		splitFunc = bufio.ScanLines
+		splitFunc = NewNewlineSplitFunc()
 	} else {
 		definedLineEndPattern := c.Multiline.LineEndPattern != ""
 		definedLineStartPattern := c.Multiline.LineStartPattern != ""
@@ -253,14 +253,14 @@ func (f *FileInput) checkFile(ctx context.Context, path string, firstCheck bool)
 	f.runningFiles[path] = struct{}{}
 	f.knownFiles[path] = knownFile
 	f.readerWg.Add(1)
-	go func(ctx context.Context, path string, offset int64) {
+	go func(ctx context.Context, path string, offset, lastSeenSize int64) {
 		defer f.readerWg.Done()
 		messenger := f.newFileUpdateMessenger(path)
-		err := ReadToEnd(ctx, path, offset, messenger, f.SplitFunc, f.PathField, f.InputPlugin, f.MaxLogSize)
+		err := ReadToEnd(ctx, path, offset, lastSeenSize, messenger, f.SplitFunc, f.PathField, f.InputPlugin, f.MaxLogSize)
 		if err != nil {
 			f.Warnw("Failed to read log file", zap.Error(err))
 		}
-	}(ctx, path, knownFile.Offset)
+	}(ctx, path, knownFile.Offset, knownFile.LastSeenFileSize)
 }
 
 func (f *FileInput) updateFile(message fileUpdateMessage) {
@@ -271,6 +271,12 @@ func (f *FileInput) updateFile(message fileUpdateMessage) {
 
 	knownFile := f.knownFiles[message.path]
 
+	// This is a last seen size message, so just set the size and return
+	if message.lastSeenFileSize != -1 {
+		knownFile.LastSeenFileSize = message.lastSeenFileSize
+		return
+	}
+
 	if message.newOffset < knownFile.Offset {
 		// The file was truncated or rotated
 
@@ -388,6 +394,7 @@ type knownFileInfo struct {
 	Fingerprint       []byte
 	SmallFileContents []byte
 	Offset            int64
+	LastSeenFileSize  int64
 }
 
 func newKnownFileInfo(path string, fingerprintBytes int64, startAtBeginning bool) (*knownFileInfo, error) {
@@ -474,9 +481,10 @@ func fingerprintFile(file *os.File, numBytes int64) ([]byte, error) {
 }
 
 type fileUpdateMessage struct {
-	path      string
-	newOffset int64
-	finished  bool
+	path             string
+	newOffset        int64
+	lastSeenFileSize int64
+	finished         bool
 }
 
 type fileUpdateMessenger struct {
@@ -486,15 +494,24 @@ type fileUpdateMessenger struct {
 }
 
 func (f *fileUpdateMessenger) SetOffset(offset int64) {
 	f.c <- fileUpdateMessage{
-		path:      f.path,
-		newOffset: offset,
+		path:             f.path,
+		newOffset:        offset,
+		lastSeenFileSize: -1,
+	}
+}
+
+func (f *fileUpdateMessenger) SetLastSeenFileSize(size int64) {
+	f.c <- fileUpdateMessage{
+		path:             f.path,
+		lastSeenFileSize: size,
 	}
 }
 
 func (f *fileUpdateMessenger) FinishedReading() {
 	f.c <- fileUpdateMessage{
-		path:     f.path,
-		finished: true,
+		path:             f.path,
+		finished:         true,
+		lastSeenFileSize: -1,
 	}
 }
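The core of the change is the polling decision: an unterminated tail is only treated as a complete entry once the file has been quiet for a full poll cycle. A minimal standalone sketch of that condition (the names here are illustrative, not the patch's actual types):

    package main

    import "fmt"

    // pollState is what the poller remembered from the previous cycle.
    type pollState struct {
        offset       int64 // how far the reader had gotten
        lastSeenSize int64 // file size observed on the previous poll
    }

    // shouldFlushTail reports whether an unterminated tail may be emitted
    // as a complete entry: only if there is unread data and the file has
    // been quiet (no growth, no reader progress) for a full poll interval.
    func shouldFlushTail(prev pollState, offset, size int64) bool {
        return offset < size && offset == prev.offset && size == prev.lastSeenSize
    }

    func main() {
        prev := pollState{offset: 10, lastSeenSize: 15}
        fmt.Println(shouldFlushTail(prev, 10, 15)) // true: idle for a full cycle
        fmt.Println(shouldFlushTail(prev, 10, 20)) // false: file grew, tail may be partial
    }

This mirrors the condition checked in the deferred block added to ReadToEnd below.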
diff --git a/plugin/builtin/input/file/line_splitter.go b/plugin/builtin/input/file/line_splitter.go
index 7df454eee..dd72a2113 100644
--- a/plugin/builtin/input/file/line_splitter.go
+++ b/plugin/builtin/input/file/line_splitter.go
@@ -2,6 +2,7 @@ package file
 
 import (
 	"bufio"
+	"bytes"
 	"regexp"
 )
 
@@ -25,18 +26,12 @@ func NewLineStartSplitFunc(re *regexp.Regexp) bufio.SplitFunc {
 
 		if firstMatchEnd == len(data) {
 			// the first match goes to the end of the buffer, so don't look for a second match
-			if atEOF {
-				return len(data), data[firstMatchStart:], nil // return the rest of the file and advance to end
-			}
-			return 0, nil, nil // read more data and try again
+			return 0, nil, nil
 		}
 
 		secondLocOffset := firstMatchEnd + 1
 		secondLoc := re.FindIndex(data[secondLocOffset:])
 		if secondLoc == nil {
-			if atEOF {
-				return len(data), data[firstMatchStart:], nil // return the rest of the file and advance to end
-			}
 			return 0, nil, nil // read more data and try again
 		}
 		secondMatchStart := secondLoc[0] + secondLocOffset
@@ -69,3 +64,30 @@ func NewLineEndSplitFunc(re *regexp.Regexp) bufio.SplitFunc {
 		return
 	}
 }
+
+// NewNewlineSplitFunc splits log lines by newline, just as bufio.ScanLines, but
+// never returns a token that uses EOF as a terminator
+func NewNewlineSplitFunc() bufio.SplitFunc {
+	return func(data []byte, atEOF bool) (advance int, token []byte, err error) {
+		if atEOF && len(data) == 0 {
+			return 0, nil, nil
+		}
+
+		if i := bytes.IndexByte(data, '\n'); i >= 0 {
+			// We have a full newline-terminated line.
+			return i + 1, dropCR(data[0:i]), nil
+		}
+
+		// Request more data.
+		return 0, nil, nil
+	}
+}
+
+// dropCR drops a terminal \r from the data.
+func dropCR(data []byte) []byte {
+	if len(data) > 0 && data[len(data)-1] == '\r' {
+		return data[0 : len(data)-1]
+	}
+
+	return data
+}
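To see how this differs from bufio.ScanLines, here is a small standalone program (the split logic is inlined from NewNewlineSplitFunc above, with the '\r' handling omitted for brevity) that scans input whose final entry has no newline yet:

    package main

    import (
        "bufio"
        "bytes"
        "fmt"
        "strings"
    )

    // newlineSplit inlines the logic of NewNewlineSplitFunc so this
    // sketch compiles on its own.
    func newlineSplit(data []byte, atEOF bool) (advance int, token []byte, err error) {
        if atEOF && len(data) == 0 {
            return 0, nil, nil
        }
        if i := bytes.IndexByte(data, '\n'); i >= 0 {
            return i + 1, data[:i], nil // full newline-terminated line
        }
        return 0, nil, nil // partial line: wait for more data
    }

    func main() {
        const input = "log1\nlog2" // "log2" simulates a partially written entry
        for _, sf := range []struct {
            name  string
            split bufio.SplitFunc
        }{
            {"bufio.ScanLines", bufio.ScanLines},
            {"newlineSplit", newlineSplit},
        } {
            scanner := bufio.NewScanner(strings.NewReader(input))
            scanner.Split(sf.split)
            var tokens []string
            for scanner.Scan() {
                tokens = append(tokens, scanner.Text())
            }
            fmt.Printf("%-16s %q\n", sf.name, tokens)
        }
        // bufio.ScanLines  ["log1" "log2"]  <- emits the partial tail at EOF
        // newlineSplit     ["log1"]         <- withholds it until a newline arrives
    }

The withheld tail is then handled by the deferred read in read_to_end.go, which emits it only after the file has been idle for a full poll cycle.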
diff --git a/plugin/builtin/input/file/line_splitter_test.go b/plugin/builtin/input/file/line_splitter_test.go
index 8bb1442db..3e4721788 100644
--- a/plugin/builtin/input/file/line_splitter_test.go
+++ b/plugin/builtin/input/file/line_splitter_test.go
@@ -42,7 +42,7 @@ func TestLineStartSplitFunc(t *testing.T) {
 		{
 			Name:    "OneLogSimple",
 			Pattern: `LOGSTART \d+ `,
-			Raw:     []byte(`LOGSTART 123 log1`),
+			Raw:     []byte("LOGSTART 123 log1LOGSTART 123 a"),
 			ExpectedTokenized: []string{
 				`LOGSTART 123 log1`,
 			},
@@ -50,10 +50,10 @@ func TestLineStartSplitFunc(t *testing.T) {
 		},
 		{
 			Name:    "TwoLogsSimple",
 			Pattern: `LOGSTART \d+ `,
-			Raw:     []byte(`LOGSTART 123 log1 LOGSTART 234 log2`),
+			Raw:     []byte(`LOGSTART 123 log1 LOGSTART 234 log2 LOGSTART 345 foo`),
 			ExpectedTokenized: []string{
 				`LOGSTART 123 log1 `,
-				`LOGSTART 234 log2`,
+				`LOGSTART 234 log2 `,
 			},
 		},
@@ -65,7 +65,7 @@ func TestLineStartSplitFunc(t *testing.T) {
 		{
 			Name:    "PrecedingNonMatches",
 			Pattern: `LOGSTART \d+ `,
-			Raw:     []byte(`part that doesn't match LOGSTART 123 part that matches`),
+			Raw:     []byte(`part that doesn't match LOGSTART 123 part that matchesLOGSTART 123 foo`),
 			ExpectedTokenized: []string{
 				`part that doesn't match `,
 				`LOGSTART 123 part that matches`,
@@ -82,7 +82,6 @@ func TestLineStartSplitFunc(t *testing.T) {
 			}(),
 			ExpectedTokenized: []string{
 				`LOGSTART 123 ` + string(generatedByteSliceOfLength(100)),
-				`LOGSTART 234 endlog`,
 			},
 		},
 		{
@@ -96,7 +95,6 @@ func TestLineStartSplitFunc(t *testing.T) {
 			}(),
 			ExpectedTokenized: []string{
 				`LOGSTART 123 ` + string(generatedByteSliceOfLength(10000)),
-				`LOGSTART 234 endlog`,
 			},
 		},
 		{
@@ -133,8 +131,8 @@ func TestLineStartSplitFunc(t *testing.T) {
 		t.Run("AtEOF", func(t *testing.T) {
 			advance, token, err := splitFunc(data[:], true)
 			require.NoError(t, err)
-			require.Equal(t, len(data), advance)
-			require.Equal(t, data, token)
+			require.Equal(t, 0, advance)
+			require.Nil(t, token)
 		})
 	})
 }
@@ -174,34 +172,34 @@ func TestLineEndSplitFunc(t *testing.T) {
 		},
 		{
 			Name:    "HugeLog100",
-			Pattern: `LOGEND \d+`,
+			Pattern: `LOGEND \d`,
 			Raw: func() []byte {
 				newRaw := generatedByteSliceOfLength(100)
-				newRaw = append(newRaw, []byte(`LOGEND 123`)...)
+				newRaw = append(newRaw, []byte(`LOGEND 1 `)...)
 				return newRaw
 			}(),
 			ExpectedTokenized: []string{
-				string(generatedByteSliceOfLength(100)) + `LOGEND 123`,
+				string(generatedByteSliceOfLength(100)) + `LOGEND 1`,
 			},
 		},
 		{
 			Name:    "HugeLog10000",
-			Pattern: `LOGEND \d+`,
+			Pattern: `LOGEND \d`,
 			Raw: func() []byte {
 				newRaw := generatedByteSliceOfLength(10000)
-				newRaw = append(newRaw, []byte(`LOGEND 123`)...)
+				newRaw = append(newRaw, []byte(`LOGEND 1 `)...)
 				return newRaw
 			}(),
 			ExpectedTokenized: []string{
-				string(generatedByteSliceOfLength(10000)) + `LOGEND 123`,
+				string(generatedByteSliceOfLength(10000)) + `LOGEND 1`,
 			},
 		},
 		{
 			Name:    "HugeLog1000000",
-			Pattern: `LOGEND \d+`,
+			Pattern: `LOGEND \d`,
 			Raw: func() []byte {
 				newRaw := generatedByteSliceOfLength(1000000)
-				newRaw = append(newRaw, []byte(`LOGEND 123`)...)
+				newRaw = append(newRaw, []byte(`LOGEND 1 `)...)
 				return newRaw
 			}(),
 			ExpectedTokenized: []string{},
@@ -216,6 +214,68 @@ func TestLineEndSplitFunc(t *testing.T) {
 	}
 }
 
+func TestNewlineSplitFunc(t *testing.T) {
+	testCases := []tokenizerTestCase{
+		{
+			Name: "OneLogSimple",
+			Raw:  []byte("my log\n"),
+			ExpectedTokenized: []string{
+				`my log`,
+			},
+		},
+		{
+			Name: "TwoLogsSimple",
+			Raw:  []byte("log1\nlog2\n"),
+			ExpectedTokenized: []string{
+				`log1`,
+				`log2`,
+			},
+		},
+		{
+			Name:              "NoTrailingNewline",
+			Raw:               []byte(`foo`),
+			ExpectedTokenized: []string{},
+		},
+		{
+			Name: "HugeLog100",
+			Raw: func() []byte {
+				newRaw := generatedByteSliceOfLength(100)
+				newRaw = append(newRaw, '\n')
+				return newRaw
+			}(),
+			ExpectedTokenized: []string{
+				string(generatedByteSliceOfLength(100)),
+			},
+		},
+		{
+			Name: "HugeLog10000",
+			Raw: func() []byte {
+				newRaw := generatedByteSliceOfLength(10000)
+				newRaw = append(newRaw, '\n')
+				return newRaw
+			}(),
+			ExpectedTokenized: []string{
+				string(generatedByteSliceOfLength(10000)),
+			},
+		},
+		{
+			Name: "HugeLog1000000",
+			Raw: func() []byte {
+				newRaw := generatedByteSliceOfLength(1000000)
+				newRaw = append(newRaw, '\n')
+				return newRaw
+			}(),
+			ExpectedTokenized: []string{},
+			ExpectedError:     errors.New("bufio.Scanner: token too long"),
+		},
+	}
+
+	for _, tc := range testCases {
+		splitFunc := NewNewlineSplitFunc()
+		t.Run(tc.Name, tc.RunFunc(splitFunc))
+	}
+}
+
 func generatedByteSliceOfLength(length int) []byte {
 	chars := []byte(`abcdefghijklmnopqrstuvwxyz`)
 	newSlice := make([]byte, length)
diff --git a/plugin/builtin/input/file/read_to_end.go b/plugin/builtin/input/file/read_to_end.go
index 62d724e84..33eed21e5 100644
--- a/plugin/builtin/input/file/read_to_end.go
+++ b/plugin/builtin/input/file/read_to_end.go
@@ -11,7 +11,7 @@ import (
 	"github.com/observiq/carbon/plugin/helper"
 )
 
-func ReadToEnd(ctx context.Context, path string, startOffset int64, messenger fileUpdateMessenger, splitFunc bufio.SplitFunc, pathField *entry.Field, inputPlugin helper.InputPlugin, maxLogSize int) error {
+func ReadToEnd(ctx context.Context, path string, startOffset int64, lastSeenFileSize int64, messenger fileUpdateMessenger, splitFunc bufio.SplitFunc, pathField *entry.Field, inputPlugin helper.InputPlugin, maxLogSize int) error {
 	defer messenger.FinishedReading()
 
 	select {
@@ -30,7 +30,9 @@ func ReadToEnd(ctx context.Context, path string, startOffset int64, messenger fi
 	if err != nil {
 		return err
 	}
+	messenger.SetLastSeenFileSize(stat.Size())
 
+	// Start at the beginning if the file has been truncated
 	if stat.Size() < startOffset {
 		startOffset = 0
 		messenger.SetOffset(0)
@@ -52,6 +54,27 @@ func ReadToEnd(ctx context.Context, path string, startOffset int64, messenger fi
 	}
 	scanner.Split(scanFunc)
 
+	// If we're not at the end of the file and we haven't advanced
+	// since the last cycle, read the rest of the file as an entry
+	defer func() {
+		if pos < stat.Size() && pos == startOffset && lastSeenFileSize == stat.Size() {
+			_, err := file.Seek(pos, 0)
+			if err != nil {
+				inputPlugin.Errorf("failed to seek to read last log entry")
+				return
+			}
+			msgBuf := make([]byte, stat.Size()-pos)
+			n, err := file.Read(msgBuf)
+			if err != nil {
+				inputPlugin.Errorf("failed to read trailing log")
+				return
+			}
+			e := inputPlugin.NewEntry(string(msgBuf[:n]))
+			inputPlugin.Write(ctx, e)
+			messenger.SetOffset(pos + int64(n))
+		}
+	}()
+
 	for {
 		select {
 		case <-ctx.Done():
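A condensed sketch of what the deferred block above does once the quiet-file condition holds (the plugin and messenger plumbing is replaced here by a plain callback, a simplification for illustration):

    package main

    import (
        "fmt"
        "os"
    )

    // flushTail emits the unterminated tail of a file as a single entry.
    // It mirrors the deferred block in ReadToEnd: seek to the reader's
    // position, read through the observed size, emit, and advance the offset.
    func flushTail(file *os.File, pos, size int64, emit func(string)) (int64, error) {
        if _, err := file.Seek(pos, 0); err != nil {
            return pos, err
        }
        buf := make([]byte, size-pos)
        n, err := file.Read(buf)
        if err != nil {
            return pos, err
        }
        emit(string(buf[:n]))
        return pos + int64(n), nil // new offset to persist
    }

    func main() {
        f, _ := os.CreateTemp("", "tail")
        defer os.Remove(f.Name())
        f.WriteString("complete entry\npartial entry")

        // Pretend the scanner consumed "complete entry\n" (15 bytes) and
        // the file has been idle for a full poll interval.
        stat, _ := f.Stat()
        newOffset, _ := flushTail(f, 15, stat.Size(), func(msg string) {
            fmt.Printf("entry: %q\n", msg)
        })
        fmt.Println("offset:", newOffset) // entry: "partial entry", offset: 28
    }

Because the deferred block runs after every scan pass, a writer that stays idle for one poll interval gets its final, newline-less entry flushed, while an actively appending writer never has a half-written entry emitted.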