Skip to content

Commit

Permalink
fix(helpers/multiline): fix force flushing with multiline (#434)
Browse files Browse the repository at this point in the history
* feat(helpers/multiline): add test for flusher if multiple logs in buffer

Signed-off-by: Dominik Rosiek <[email protected]>

* refactor(helpers/multiline): refactor flusher due to bug

Signed-off-by: Dominik Rosiek <[email protected]>

* feat(helpers/multiline): add more tests and fixes for multiline

Signed-off-by: Dominik Rosiek <[email protected]>

* feat(helpers/multiline): add more tests and fixes for multiline

Signed-off-by: Dominik Rosiek <[email protected]>

* refactor(helpers/multiline): add flushed function to forceFlusher

Signed-off-by: Dominik Rosiek <[email protected]>

* refactor(helpers/multiline): wrap splitFuncs with force flusher splitFunc to simplify logic

Signed-off-by: Dominik Rosiek <[email protected]>

* tests(helpers/multiline): unify force flusher starting data length

Signed-off-by: Dominik Rosiek <[email protected]>

* feat(helpers/multiline): do not flush empty log

Signed-off-by: Dominik Rosiek <[email protected]>

* tests(helpers/multiline): do not overengineer tests

Signed-off-by: Dominik Rosiek <[email protected]>

* feat(helpers/multiline): remove enable force flushing

Signed-off-by: Dominik Rosiek <[email protected]>

* refactor(helpers/multiline): remove NewFlusher function

Signed-off-by: Dominik Rosiek <[email protected]>

* refactor(helpers/multiline): flatter flusherSplitFunc

Signed-off-by: Dominik Rosiek <[email protected]>

* refactor(helpers/multiline): extract force flusher related function from flusherSplitFunc

Signed-off-by: Dominik Rosiek <[email protected]>

* refactor(helpers/multiline): remove splitFuncWrapper

Signed-off-by: Dominik Rosiek <[email protected]>

* tests(helpers/multiline): do not return empty log for NewNewlineSplitFunc

Signed-off-by: Dominik Rosiek <[email protected]>

* chore(helpers/multiline): add comment

Signed-off-by: Dominik Rosiek <[email protected]>

* refactor(helpers/multiline): revert not necessary needed change

Signed-off-by: Dominik Rosiek <[email protected]>
  • Loading branch information
sumo-drosiek authored Mar 22, 2022
1 parent 8154878 commit 6bc7c94
Show file tree
Hide file tree
Showing 3 changed files with 185 additions and 108 deletions.
167 changes: 92 additions & 75 deletions operator/helper/multiline.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,60 +39,80 @@ func NewFlusherConfig() FlusherConfig {

// Build creates Flusher from configuration
func (c *FlusherConfig) Build() *Flusher {
return NewFlusher(c.Period)
return &Flusher{
lastDataChange: time.Now(),
forcePeriod: c.Period.Raw(),
previousDataLength: 0,
}
}

// Flusher keeps information about flush state
type Flusher struct {
// force is true when data should be flushed as soon as possible
force bool
// forcePeriod defines time from last flush which should pass before setting force to true.
// Never forces if forcePeriod is set to 0
forcePeriod time.Duration

// lastFlush > lastForcedFlush => we can force flush if no logs are incoming for forcePeriod
// lastFlush = lastForcedFlush => last flush was forced, so we cannot force, we can update lastFlush
// lastFlush < lastForcedFlush => we just forced flush, set lastFlush to lastForcedFlush
lastFlush time.Time
lastForcedFlush time.Time
}
// lastDataChange tracks the time of the last data change (including new data and flushes)
lastDataChange time.Time

// NewFlusher Creates new Flusher with lastFlush set to unix epoch
// and order to not force ongoing flush
func NewFlusher(forcePeriod Duration) *Flusher {
return &Flusher{
force: false,
lastFlush: time.Now(),
forcePeriod: forcePeriod.Raw(),
lastForcedFlush: time.Unix(0, 0),
}
// previousDataLength:
// if previousDataLength = 0 - no new data has been received since the last flush
// if previousDataLength > 0 - there is data which has not been flushed yet and it hasn't changed since lastDataChange
previousDataLength int
}

// Flushed update lastFlush with current timestamp
func (f *Flusher) Flushed() {
if f.lastFlush.Sub(f.lastForcedFlush) < 0 {
f.lastFlush = f.lastForcedFlush
} else {
f.lastFlush = time.Now()
func (f *Flusher) UpdateDataChangeTime(length int) {
// Skip if length is greater than 0 and hasn't changed
if length > 0 && length == f.previousDataLength {
return
}
}

// CheckAndFlush sets internal flag to true if data is going to be force flushed
func (f *Flusher) CheckAndFlush() {
if f.forcePeriod > 0 && time.Since(f.lastFlush) > f.forcePeriod && f.lastFlush.Sub(f.lastForcedFlush) > 0 {
f.force = true
}
// update internal properties with new values if data length changed
// because it means that data is flowing and being processed
f.previousDataLength = length
f.lastDataChange = time.Now()
}

// ForceFlushed update struct fields after forced flush
func (f *Flusher) Flush() {
f.force = false
f.lastForcedFlush = time.Now()
// Flushed resets the tracked data length to 0 and records the current time as the last data change
func (f *Flusher) Flushed() {
f.UpdateDataChangeTime(0)
}

// ShouldFlush returns true if data should be forcefully flushed
func (f *Flusher) ShouldFlush() bool {
return f.force
// Returns true if force flushing is enabled (f.forcePeriod > 0), more than f.forcePeriod has elapsed since f.lastDataChange, and the pending data length is greater than 0
return f.forcePeriod > 0 && time.Since(f.lastDataChange) > f.forcePeriod && f.previousDataLength > 0
}

// SplitFunc wraps splitFunc with force-flush handling and returns the
// resulting bufio.SplitFunc.
//
// The returned function first delegates to splitFunc and then:
//   - passes the result through unchanged if splitFunc returned an error;
//   - calls f.Flushed and passes the result through if splitFunc produced a token;
//   - otherwise, if f.ShouldFlush reports true, calls f.Flushed and emits the
//     whole buffer (trimmed with trimWhitespaces) as a token, advancing past
//     all of data;
//   - otherwise reports the current buffer length to f.UpdateDataChangeTime
//     and passes splitFunc's result through.
func (f *Flusher) SplitFunc(splitFunc bufio.SplitFunc) bufio.SplitFunc {
	return func(data []byte, atEOF bool) (int, []byte, error) {
		advance, token, err := splitFunc(data, atEOF)

		switch {
		case err != nil:
			// The wrapped function failed: hand its result back untouched.
			return advance, token, err
		case token != nil:
			// The wrapped function produced a token, so tell the flusher
			// that a flush has just happened.
			f.Flushed()
			return advance, token, err
		case f.ShouldFlush():
			// No token was produced, but the flusher asks for a forced
			// flush: record it and emit everything that is buffered.
			f.Flushed()
			return len(data), trimWhitespaces(data), nil
		}

		// Nothing was flushed; let the flusher track the pending data length.
		f.UpdateDataChangeTime(len(data))
		return advance, token, err
	}
}

// Multiline consists of splitFunc and variables needed to perform force flush
Expand Down Expand Up @@ -125,6 +145,11 @@ func (c MultilineConfig) getSplitFunc(encodingVar encoding.Encoding, flushAtEOF
endPattern := c.LineEndPattern
startPattern := c.LineStartPattern

var (
splitFunc bufio.SplitFunc
err error
)

switch {
case endPattern != "" && startPattern != "":
return nil, fmt.Errorf("only one of line_start_pattern or line_end_pattern can be set")
Expand All @@ -133,35 +158,38 @@ func (c MultilineConfig) getSplitFunc(encodingVar encoding.Encoding, flushAtEOF
case encodingVar == encoding.Nop:
return SplitNone(maxLogSize), nil
case endPattern == "" && startPattern == "":
return NewNewlineSplitFunc(encodingVar, flushAtEOF, force)
splitFunc, err = NewNewlineSplitFunc(encodingVar, flushAtEOF)

if err != nil {
return nil, err
}
case endPattern != "":
re, err := regexp.Compile("(?m)" + c.LineEndPattern)
if err != nil {
return nil, fmt.Errorf("compile line end regex: %s", err)
}
return NewLineEndSplitFunc(re, flushAtEOF, force), nil
splitFunc = NewLineEndSplitFunc(re, flushAtEOF)
case startPattern != "":
re, err := regexp.Compile("(?m)" + c.LineStartPattern)
if err != nil {
return nil, fmt.Errorf("compile line start regex: %s", err)
}
return NewLineStartSplitFunc(re, flushAtEOF, force), nil
splitFunc = NewLineStartSplitFunc(re, flushAtEOF)
default:
return nil, fmt.Errorf("unreachable")
}

if force != nil {
return force.SplitFunc(splitFunc), nil
}

return splitFunc, nil
}

// NewLineStartSplitFunc creates a bufio.SplitFunc that splits an incoming stream into
// tokens that start with a match to the regex pattern provided
func NewLineStartSplitFunc(re *regexp.Regexp, flushAtEOF bool, force *Flusher) bufio.SplitFunc {
func NewLineStartSplitFunc(re *regexp.Regexp, flushAtEOF bool) bufio.SplitFunc {
return func(data []byte, atEOF bool) (advance int, token []byte, err error) {
if force != nil && force.ShouldFlush() {
force.Flush()
token = trimWhitespaces(data)
advance = len(data)
return
}

firstLoc := re.FindIndex(data)
if firstLoc == nil {
// Flush if no more data is expected
Expand All @@ -179,7 +207,11 @@ func NewLineStartSplitFunc(re *regexp.Regexp, flushAtEOF bool, force *Flusher) b
// the beginning of the file does not match the start pattern, so return a token up to the first match so we don't lose data
advance = firstMatchStart
token = trimWhitespaces(data[0:firstMatchStart])
return

// return only if the data before the first match is not just whitespace
if token != nil {
return
}
}

if firstMatchEnd == len(data) {
Expand Down Expand Up @@ -228,14 +260,8 @@ func SplitNone(maxLogSize int) bufio.SplitFunc {

// NewLineEndSplitFunc creates a bufio.SplitFunc that splits an incoming stream into
// tokens that end with a match to the regex pattern provided
func NewLineEndSplitFunc(re *regexp.Regexp, flushAtEOF bool, force *Flusher) bufio.SplitFunc {
func NewLineEndSplitFunc(re *regexp.Regexp, flushAtEOF bool) bufio.SplitFunc {
return func(data []byte, atEOF bool) (advance int, token []byte, err error) {
if force != nil && force.ShouldFlush() {
force.Flush()
token = trimWhitespaces(data)
advance = len(data)
return
}
loc := re.FindIndex(data)
if loc == nil {
// Flush if no more data is expected
Expand All @@ -262,7 +288,7 @@ func NewLineEndSplitFunc(re *regexp.Regexp, flushAtEOF bool, force *Flusher) buf

// NewNewlineSplitFunc splits log lines by newline, just as bufio.ScanLines, but
// never returning a token using EOF as a terminator
func NewNewlineSplitFunc(encoding encoding.Encoding, flushAtEOF bool, force *Flusher) (bufio.SplitFunc, error) {
func NewNewlineSplitFunc(encoding encoding.Encoding, flushAtEOF bool) (bufio.SplitFunc, error) {
newline, err := encodedNewline(encoding)
if err != nil {
return nil, err
Expand All @@ -280,18 +306,15 @@ func NewNewlineSplitFunc(encoding encoding.Encoding, flushAtEOF bool, force *Flu

if i := bytes.Index(data, newline); i >= 0 {
// We have a full newline-terminated line.
return i + len(newline), bytes.TrimSuffix(data[:i], carriageReturn), nil
token = bytes.TrimSuffix(data[:i], carriageReturn)

return i + len(newline), trimWhitespaces(token), nil
}

// Flush if no more data is expected or if
// we don't want to wait for it
forceFlush := force != nil && force.ShouldFlush()
if atEOF && (flushAtEOF || forceFlush) {
// Flush if no more data is expected
if atEOF && flushAtEOF {
token = trimWhitespaces(data)
advance = len(data)
if forceFlush {
force.Flushed()
}
return
}

Expand All @@ -316,7 +339,12 @@ func trimWhitespaces(data []byte) []byte {
// TrimLeft to strip EOF whitespaces in case of using $ in regex
// For some reason newline and carriage return are being moved to beginning of next log
// TrimRight to strip all whitespaces from the end of log
return bytes.TrimLeft(bytes.TrimRight(data, "\r\n\t "), "\r\n")
// returns nil if log is empty
token := bytes.TrimLeft(bytes.TrimRight(data, "\r\n\t "), "\r\n")
if len(token) == 0 {
return nil
}
return token
}

// SplitterConfig consolidates MultilineConfig and FlusherConfig
Expand Down Expand Up @@ -353,14 +381,3 @@ type Splitter struct {
SplitFunc bufio.SplitFunc
Flusher *Flusher
}

// Flushed informs the Splitter's Flusher that a flush has been performed
// by delegating to s.Flusher.Flushed.
// NOTE(review): s.Flusher is dereferenced without a nil check — confirm that
// callers always set it before calling this method.
func (s *Splitter) Flushed() {
s.Flusher.Flushed()
}

// CheckAndFlush instructs the Splitter's Flusher to check whether the next log
// should be forcefully flushed, by delegating to s.Flusher.CheckAndFlush.
// NOTE(review): s.Flusher is dereferenced without a nil check — confirm that
// callers always set it before calling this method.
func (s *Splitter) CheckAndFlush() {
s.Flusher.CheckAndFlush()
}
Loading

0 comments on commit 6bc7c94

Please sign in to comment.