Skip to content

Commit

Permalink
Fix bug where splitting could be skipped (#303)
Browse files Browse the repository at this point in the history
When force_flush_period is set to a non-zero value,
a forced flush would skip splitting entirely, disregarding
the possibility that the data being flushed may contain multiple entries.

For example, the file_input operator could flush the remainder
of a file that it was consuming. If multiple lines were written
to that file, they would be emitted as a single log entry
instead of one entry per line.
  • Loading branch information
djaglowski authored Nov 15, 2021
1 parent fb2be78 commit 9c01d75
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 11 deletions.
21 changes: 10 additions & 11 deletions operator/helper/multiline.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func (f *Flusher) CheckAndFlush() {
}

// ForceFlushed update struct fields after forced flush
func (f *Flusher) ForceFlushed() {
func (f *Flusher) Flush() {
f.force = false
f.lastForcedFlush = time.Now()
}
Expand Down Expand Up @@ -156,7 +156,7 @@ func (c MultilineConfig) getSplitFunc(encodingVar encoding.Encoding, flushAtEOF
func NewLineStartSplitFunc(re *regexp.Regexp, flushAtEOF bool, force *Flusher) bufio.SplitFunc {
return func(data []byte, atEOF bool) (advance int, token []byte, err error) {
if force != nil && force.ShouldFlush() {
force.ForceFlushed()
force.Flush()
token = trimWhitespaces(data)
advance = len(data)
return
Expand Down Expand Up @@ -231,7 +231,7 @@ func SplitNone(maxLogSize int) bufio.SplitFunc {
func NewLineEndSplitFunc(re *regexp.Regexp, flushAtEOF bool, force *Flusher) bufio.SplitFunc {
return func(data []byte, atEOF bool) (advance int, token []byte, err error) {
if force != nil && force.ShouldFlush() {
force.ForceFlushed()
force.Flush()
token = trimWhitespaces(data)
advance = len(data)
return
Expand Down Expand Up @@ -274,12 +274,6 @@ func NewNewlineSplitFunc(encoding encoding.Encoding, flushAtEOF bool, force *Flu
}

return func(data []byte, atEOF bool) (advance int, token []byte, err error) {
if force != nil && force.ShouldFlush() {
force.ForceFlushed()
token = trimWhitespaces(data)
advance = len(data)
return
}
if atEOF && len(data) == 0 {
return 0, nil, nil
}
Expand All @@ -289,10 +283,15 @@ func NewNewlineSplitFunc(encoding encoding.Encoding, flushAtEOF bool, force *Flu
return i + len(newline), bytes.TrimSuffix(data[:i], carriageReturn), nil
}

// Flush if no more data is expected
if atEOF && flushAtEOF {
// Flush if no more data is expected or if
// we don't want to wait for it
forceFlush := force != nil && force.ShouldFlush()
if atEOF && (flushAtEOF || forceFlush) {
token = trimWhitespaces(data)
advance = len(data)
if forceFlush {
force.Flush()
}
return
}

Expand Down
12 changes: 12 additions & 0 deletions operator/helper/multiline_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -412,6 +412,18 @@ func TestNewlineSplitFunc(t *testing.T) {
lastForcedFlush: time.Now(),
},
},
{
Name: "DefaultFlusherSplits",
Raw: []byte("log1\nlog2\n"),
ExpectedTokenized: []string{
"log1",
"log2",
},
Flusher: &Flusher{
force: true,
lastForcedFlush: time.Now(),
},
},
}

for _, tc := range testCases {
Expand Down

0 comments on commit 9c01d75

Please sign in to comment.