diff --git a/operator/helper/multiline.go b/operator/helper/multiline.go
index 9e47e711818d..0434b2a712f9 100644
--- a/operator/helper/multiline.go
+++ b/operator/helper/multiline.go
@@ -39,60 +39,80 @@ func NewFlusherConfig() FlusherConfig {
 
 // Build creates Flusher from configuration
 func (c *FlusherConfig) Build() *Flusher {
-	return NewFlusher(c.Period)
+	return &Flusher{
+		lastDataChange:     time.Now(),
+		forcePeriod:        c.Period.Raw(),
+		previousDataLength: 0,
+	}
 }
 
 // Flusher keeps information about flush state
 type Flusher struct {
-	// force is true when data should be flushed as soon as possible
-	force bool
 	// forcePeriod defines time from last flush which should pass before setting force to true.
 	// Never forces if forcePeriod is set to 0
 	forcePeriod time.Duration
 
-	// lastFlush > lastForcedFlush => we can force flush if no logs are incoming for forcePeriod
-	// lastFlush = lastForcedFlush => last flush was forced, so we cannot force, we can update lastFlush
-	// lastFlush < lastForcedFlush => we just forced flush, set lastFlush to lastForcedFlush
-	lastFlush       time.Time
-	lastForcedFlush time.Time
-}
+	// lastDataChange tracks the time of the last data change (including new data and flushes)
+	lastDataChange time.Time
 
-// NewFlusher Creates new Flusher with lastFlush set to unix epoch
-// and order to not force ongoing flush
-func NewFlusher(forcePeriod Duration) *Flusher {
-	return &Flusher{
-		force:           false,
-		lastFlush:       time.Now(),
-		forcePeriod:     forcePeriod.Raw(),
-		lastForcedFlush: time.Unix(0, 0),
-	}
+	// previousDataLength:
+	//  if previousDataLength == 0 - no new data has been received since the last flush
+	//  if previousDataLength > 0  - there is data which has not been flushed yet and it hasn't changed since lastDataChange
+	previousDataLength int
 }
 
-// Flushed update lastFlush with current timestamp
-func (f *Flusher) Flushed() {
-	if f.lastFlush.Sub(f.lastForcedFlush) < 0 {
-		f.lastFlush = f.lastForcedFlush
-	} else {
-		f.lastFlush = time.Now()
+// UpdateDataChangeTime stores the current timestamp if the unflushed data length has changed
+func (f *Flusher) UpdateDataChangeTime(length int) {
+	// Skip if length is greater than 0 and hasn't changed
+	if length > 0 && length == f.previousDataLength {
+		return
 	}
-}
 
-// CheckAndFlush sets internal flag to true if data is going to be force flushed
-func (f *Flusher) CheckAndFlush() {
-	if f.forcePeriod > 0 && time.Since(f.lastFlush) > f.forcePeriod && f.lastFlush.Sub(f.lastForcedFlush) > 0 {
-		f.force = true
-	}
+	// Update internal properties with new values if the data length changed,
+	// because it means that data is flowing and being processed
+	f.previousDataLength = length
+	f.lastDataChange = time.Now()
 }
 
-// ForceFlushed update struct fields after forced flush
-func (f *Flusher) Flush() {
-	f.force = false
-	f.lastForcedFlush = time.Now()
+// Flushed resets the unflushed data length
+func (f *Flusher) Flushed() {
+	f.UpdateDataChangeTime(0)
 }
 
 // ShouldFlush returns true if data should be forcefully flushed
 func (f *Flusher) ShouldFlush() bool {
-	return f.force
+	// Return true if forcePeriod has elapsed since lastDataChange and there is unflushed data
+	return f.forcePeriod > 0 && time.Since(f.lastDataChange) > f.forcePeriod && f.previousDataLength > 0
+}
+
+// SplitFunc wraps a bufio.SplitFunc with the force-flush logic
+func (f *Flusher) SplitFunc(splitFunc bufio.SplitFunc) bufio.SplitFunc {
+	return func(data []byte, atEOF bool) (advance int, token []byte, err error) {
+		advance, token, err = splitFunc(data, atEOF)
+
+		// Return as-is in case of error
+		if err != nil {
+			return
+		}
+
+		// Return token
+		if token != nil {
+			// Inform flusher that we just flushed
+			f.Flushed()
+			return
+		}
+
+		// If no token was returned, force a flush once the force period has elapsed
+		if f.ShouldFlush() {
+			// Inform flusher that we just flushed
+			f.Flushed()
+			token = trimWhitespaces(data)
+			advance = len(data)
+			return
+		}
+
+		// Inform flusher that we didn't flush
+		f.UpdateDataChangeTime(len(data))
+		return
+	}
 }
 
 // Multiline consists of splitFunc and variables needed to perform force flush
@@ -125,6 +145,11 @@ func (c MultilineConfig) getSplitFunc(encodingVar encoding.Encoding, flushAtEOF
 	endPattern := c.LineEndPattern
 	startPattern := c.LineStartPattern
 
+	var (
+		splitFunc bufio.SplitFunc
+		err       error
+	)
+
 	switch {
 	case endPattern != "" && startPattern != "":
 		return nil, fmt.Errorf("only one of line_start_pattern or line_end_pattern can be set")
@@ -133,35 +158,38 @@ case encodingVar == encoding.Nop:
 		return SplitNone(maxLogSize), nil
 	case endPattern == "" && startPattern == "":
-		return NewNewlineSplitFunc(encodingVar, flushAtEOF, force)
+		splitFunc, err = NewNewlineSplitFunc(encodingVar, flushAtEOF)
+
+		if err != nil {
+			return nil, err
+		}
 	case endPattern != "":
 		re, err := regexp.Compile("(?m)" + c.LineEndPattern)
 		if err != nil {
 			return nil, fmt.Errorf("compile line end regex: %s", err)
 		}
-		return NewLineEndSplitFunc(re, flushAtEOF, force), nil
+		splitFunc = NewLineEndSplitFunc(re, flushAtEOF)
 	case startPattern != "":
 		re, err := regexp.Compile("(?m)" + c.LineStartPattern)
 		if err != nil {
 			return nil, fmt.Errorf("compile line start regex: %s", err)
 		}
-		return NewLineStartSplitFunc(re, flushAtEOF, force), nil
+		splitFunc = NewLineStartSplitFunc(re, flushAtEOF)
 	default:
 		return nil, fmt.Errorf("unreachable")
 	}
+
+	if force != nil {
+		return force.SplitFunc(splitFunc), nil
+	}
+
+	return splitFunc, nil
 }
 
 // NewLineStartSplitFunc creates a bufio.SplitFunc that splits an incoming stream into
 // tokens that start with a match to the regex pattern provided
-func NewLineStartSplitFunc(re *regexp.Regexp, flushAtEOF bool, force *Flusher) bufio.SplitFunc {
+func NewLineStartSplitFunc(re *regexp.Regexp, flushAtEOF bool) bufio.SplitFunc {
 	return func(data []byte, atEOF bool) (advance int, token []byte, err error) {
-		if force != nil && force.ShouldFlush() {
-			force.Flush()
-			token = trimWhitespaces(data)
-			advance = len(data)
-			return
-		}
-
 		firstLoc := re.FindIndex(data)
 		if firstLoc == nil {
 			// Flush if no more data is expected
@@ -179,7 +207,11 @@ func NewLineStartSplitFunc(re *regexp.Regexp, flushAtEOF bool, force *Flusher) b
 			// the beginning of the file does not match the start pattern, so return a token up to the first match so we don't lose data
 			advance = firstMatchStart
 			token = trimWhitespaces(data[0:firstMatchStart])
-			return
+
+			// return only if the non-matching prefix is not just whitespace
+			if token != nil {
+				return
+			}
 		}
 
 		if firstMatchEnd == len(data) {
@@ -228,14 +260,8 @@ func SplitNone(maxLogSize int) bufio.SplitFunc {
 
 // NewLineEndSplitFunc creates a bufio.SplitFunc that splits an incoming stream into
 // tokens that end with a match to the regex pattern provided
-func NewLineEndSplitFunc(re *regexp.Regexp, flushAtEOF bool, force *Flusher) bufio.SplitFunc {
+func NewLineEndSplitFunc(re *regexp.Regexp, flushAtEOF bool) bufio.SplitFunc {
 	return func(data []byte, atEOF bool) (advance int, token []byte, err error) {
-		if force != nil && force.ShouldFlush() {
-			force.Flush()
-			token = trimWhitespaces(data)
-			advance = len(data)
-			return
-		}
 		loc := re.FindIndex(data)
 		if loc == nil {
 			// Flush if no more data is expected
@@ -262,7 +288,7 @@ func NewLineEndSplitFunc(re *regexp.Regexp, flushAtEOF bool, force *Flusher) buf
 
 // NewNewlineSplitFunc splits log lines by newline, just as bufio.ScanLines, but
 // never returning an token using EOF as a terminator
-func NewNewlineSplitFunc(encoding encoding.Encoding, flushAtEOF bool, force *Flusher) (bufio.SplitFunc, error) {
+func NewNewlineSplitFunc(encoding encoding.Encoding, flushAtEOF bool) (bufio.SplitFunc, error) {
 	newline, err := encodedNewline(encoding)
 	if err != nil {
 		return nil, err
@@ -280,18 +306,15 @@ func NewNewlineSplitFunc(encoding encoding.Encoding, flushAtEOF bool, force *Flu
 		if i := bytes.Index(data, newline); i >= 0 {
 			// We have a full newline-terminated line.
-			return i + len(newline), bytes.TrimSuffix(data[:i], carriageReturn), nil
+			token = bytes.TrimSuffix(data[:i], carriageReturn)
+
+			return i + len(newline), trimWhitespaces(token), nil
 		}
 
-		// Flush if no more data is expected or if
-		// we don't want to wait for it
-		forceFlush := force != nil && force.ShouldFlush()
-		if atEOF && (flushAtEOF || forceFlush) {
+		// Flush if no more data is expected
+		if atEOF && flushAtEOF {
 			token = trimWhitespaces(data)
 			advance = len(data)
-			if forceFlush {
-				force.Flushed()
-			}
 			return
 		}
 
@@ -316,7 +339,12 @@ func trimWhitespaces(data []byte) []byte {
 	// TrimLeft to strip EOF whitespaces in case of using $ in regex
 	// For some reason newline and carriage return are being moved to beginning of next log
 	// TrimRight to strip all whitespaces from the end of log
-	return bytes.TrimLeft(bytes.TrimRight(data, "\r\n\t "), "\r\n")
+	// returns nil if the log is empty
+	token := bytes.TrimLeft(bytes.TrimRight(data, "\r\n\t "), "\r\n")
+	if len(token) == 0 {
+		return nil
+	}
+	return token
 }
 
 // SplitterConfig consolidates MultilineConfig and FlusherConfig
@@ -353,14 +381,3 @@ type Splitter struct {
 	SplitFunc bufio.SplitFunc
 	Flusher   *Flusher
 }
-
-// Flushed informs Flusher that Flushed had been performed
-func (s *Splitter) Flushed() {
-	s.Flusher.Flushed()
-}
-
-// CheckAndFlush instructs Flusher to check if next log should be forcefully flushed
-// and set appropriate flags if yes
-func (s *Splitter) CheckAndFlush() {
-	s.Flusher.CheckAndFlush()
-}
diff --git a/operator/helper/multiline_test.go b/operator/helper/multiline_test.go
index 6726fec66c9c..34b78d12aafb 100644
--- a/operator/helper/multiline_test.go
+++ b/operator/helper/multiline_test.go
@@ -151,10 +151,6 @@ func TestLineStartSplitFunc(t *testing.T) {
 			Pattern: `^LOGSTART \d+`,
 			Raw: []byte("LOGPART log1\nLOGPART log1\t \n"),
 			ExpectedTokenized: []string{},
-			Flusher: &Flusher{
-				force:           false,
-				lastForcedFlush: time.Now(),
-			},
 		},
 		{
 			Name: "LogsWithFlusher",
@@ -164,8 +160,39 @@
 			Pattern: `^LOGSTART \d+`,
 			Raw: []byte("LOGPART log1\nLOGPART log1\t \n"),
 			ExpectedTokenized: []string{
 				"LOGPART log1\nLOGPART log1",
 			},
 			Flusher: &Flusher{
-				force:           true,
-				lastForcedFlush: time.Now(),
+				// We assume that in the previous iteration we had the same data length
+				previousDataLength: len("LOGPART log1\nLOGPART log1\t \n"),
+				lastDataChange:     time.Unix(0, 0),
+				forcePeriod:        time.Second,
 			},
 		},
+		{
+			Name: "LogsWithFlusherWithMultipleLogsInBuffer",
+			Pattern: `^LOGSTART \d+`,
+			Raw: []byte("LOGPART log1\nLOGSTART 123\nLOGPART log1\t \n"),
+			ExpectedTokenized: []string{
+				// We expect all logs except the last one, as it will be flushed in the next iteration
+				"LOGPART log1",
+			},
+			Flusher: &Flusher{
+				forcePeriod:    time.Second,
+				lastDataChange: time.Unix(0, 0),
+				// Assume this is the next iteration with the same data
+				previousDataLength: len("LOGPART log1\nLOGSTART 123\nLOGPART log1\t \n"),
+			},
+		},
+		{
+			Name: "LogsWithFlusherWithLogStartingWithWhiteChars",
"LogsWithFlusherWithLogStartingWithWhiteChars", + Pattern: `^LOGSTART \d+`, + Raw: []byte("\nLOGSTART 333"), + ExpectedTokenized: []string{ + "LOGSTART 333", + }, + Flusher: &Flusher{ + forcePeriod: time.Second, + lastDataChange: time.Unix(0, 0), + // assume this is next iteration with this log + previousDataLength: len("\nLOGSTART 333"), }, }, } @@ -181,7 +208,7 @@ func TestLineStartSplitFunc(t *testing.T) { } t.Run("FirstMatchHitsEndOfBuffer", func(t *testing.T) { - splitFunc := NewLineStartSplitFunc(regexp.MustCompile("LOGSTART"), false, nil) + splitFunc := NewLineStartSplitFunc(regexp.MustCompile("LOGSTART"), false) data := []byte(`LOGSTART`) t.Run("NotAtEOF", func(t *testing.T) { @@ -291,10 +318,7 @@ func TestLineEndSplitFunc(t *testing.T) { Pattern: `^LOGEND.*$`, Raw: []byte("LOGPART log1\nLOGPART log1\t \n"), ExpectedTokenized: []string{}, - Flusher: &Flusher{ - force: false, - lastForcedFlush: time.Now(), - }, + Flusher: &Flusher{}, }, { Name: "LogsWithFlusher", @@ -304,8 +328,37 @@ func TestLineEndSplitFunc(t *testing.T) { "LOGPART log1\nLOGPART log1", }, Flusher: &Flusher{ - force: true, - lastForcedFlush: time.Now(), + previousDataLength: len("LOGPART log1\nLOGPART log1"), + lastDataChange: time.Unix(0, 0), + forcePeriod: time.Second, + }, + }, + { + Name: "LogsWithFlusherWithMultipleLogsInBuffer", + Pattern: `^LOGEND.*$`, + Raw: []byte("LOGPART log1\nLOGEND\nLOGPART log1\t \n"), + ExpectedTokenized: []string{ + // We expect to get all logs except last one which will be returned eventually in next scanning + "LOGPART log1\nLOGEND", + }, + Flusher: &Flusher{ + forcePeriod: time.Second, + lastDataChange: time.Unix(0, 0), + // Assume this is next iteration with that data + previousDataLength: len("LOGPART log1\nLOGEND\nLOGPART log1\t \n"), + }, + }, + { + Name: "LogsWithFlusherWithLogStartingWithWhiteChars", + Pattern: `LOGEND \d+$`, + Raw: []byte("\nLOGEND 333"), + ExpectedTokenized: []string{ + "LOGEND 333", + }, + Flusher: &Flusher{ + forcePeriod: time.Second, + lastDataChange: time.Unix(0, 0), + previousDataLength: -1, }, }, } @@ -392,24 +445,21 @@ func TestNewlineSplitFunc(t *testing.T) { }, { Name: "LogsWithoutFlusher", - Pattern: `^LOGEND.*$`, Raw: []byte("LOGPART log1"), ExpectedTokenized: []string{}, - Flusher: &Flusher{ - force: false, - lastForcedFlush: time.Now(), - }, + Flusher: &Flusher{}, }, { - Name: "LogsWithFlusher", - Pattern: `^LOGEND.*$`, - Raw: []byte("LOGPART log1"), + Name: "LogsWithFlusher", + Raw: []byte("LOGPART log1"), ExpectedTokenized: []string{ "LOGPART log1", }, Flusher: &Flusher{ - force: true, - lastForcedFlush: time.Now(), + // Assume same data length in previous iteration + previousDataLength: len("LOGPART log1"), + lastDataChange: time.Unix(0, 0), + forcePeriod: time.Second, }, }, { @@ -419,16 +469,22 @@ func TestNewlineSplitFunc(t *testing.T) { "log1", "log2", }, - Flusher: &Flusher{ - force: true, - lastForcedFlush: time.Now(), + }, + { + Name: "LogsWithLogStartingWithWhiteChars", + Raw: []byte("\nLOGEND 333\nAnother one"), + ExpectedTokenized: []string{ + "LOGEND 333", }, }, } for _, tc := range testCases { - splitFunc, err := NewNewlineSplitFunc(unicode.UTF8, false, tc.Flusher) + splitFunc, err := NewNewlineSplitFunc(unicode.UTF8, false) require.NoError(t, err) + if tc.Flusher != nil { + splitFunc = tc.Flusher.SplitFunc(splitFunc) + } t.Run(tc.Name, tc.RunFunc(splitFunc)) } } @@ -584,11 +640,20 @@ func TestNewlineSplitFunc_Encodings(t *testing.T) { {0, 108, 0, 111, 0, 103, 0, 50}, // log2 }, }, + { + 
"MultiCarriageReturnUTF16StartingWithWhiteChars", + unicode.UTF16(unicode.BigEndian, unicode.IgnoreBOM), + []byte{0, 13, 0, 10, 0, 108, 0, 111, 0, 103, 0, 49, 0, 13, 0, 10, 0, 108, 0, 111, 0, 103, 0, 50, 0, 13, 0, 10}, // \r\nlog1\r\nlog2\r\n + [][]byte{ + {0, 108, 0, 111, 0, 103, 0, 49}, // log1 + {0, 108, 0, 111, 0, 103, 0, 50}, // log2 + }, + }, } for _, tc := range cases { t.Run(tc.name, func(t *testing.T) { - splitFunc, err := NewNewlineSplitFunc(tc.encoding, false, nil) + splitFunc, err := NewNewlineSplitFunc(tc.encoding, false) require.NoError(t, err) scanner := bufio.NewScanner(bytes.NewReader(tc.input)) scanner.Split(splitFunc) diff --git a/operator/input/file/reader.go b/operator/input/file/reader.go index efaf60a4cd52..5696c1595b88 100644 --- a/operator/input/file/reader.go +++ b/operator/input/file/reader.go @@ -136,13 +136,8 @@ func (r *Reader) ReadToEnd(ctx context.Context) { if err := getScannerError(scanner); err != nil { r.Errorw("Failed during scan", zap.Error(err)) } - - // Force flush eventually in next iteration - r.splitter.CheckAndFlush() break } - // Update information about last flush time - r.splitter.Flushed() if err := r.emit(ctx, scanner.Bytes()); err != nil { r.Error("Failed to emit entry", zap.Error(err))