From 644475a4b0eeca378f905d3f0fb6276b3b966bae Mon Sep 17 00:00:00 2001 From: Dominik Rosiek Date: Fri, 9 Jul 2021 14:22:01 +0200 Subject: [PATCH 01/15] feat: introduce forceFlush Signed-off-by: Dominik Rosiek --- operator/builtin/input/file/config.go | 2 +- operator/builtin/input/tcp/tcp.go | 2 +- operator/builtin/input/udp/udp.go | 2 +- operator/helper/multiline.go | 30 +++++++++++++++++---------- operator/helper/multiline_test.go | 10 ++++----- 5 files changed, 27 insertions(+), 19 deletions(-) diff --git a/operator/builtin/input/file/config.go b/operator/builtin/input/file/config.go index 23d27604..148cff9a 100644 --- a/operator/builtin/input/file/config.go +++ b/operator/builtin/input/file/config.go @@ -117,7 +117,7 @@ func (c InputConfig) Build(context operator.BuildContext) ([]operator.Operator, return nil, err } - splitFunc, err := c.Multiline.Build(context, encoding.Encoding, false) + splitFunc, err := c.Multiline.Build(encoding.Encoding, false, helper.NewForceFlush()) if err != nil { return nil, err } diff --git a/operator/builtin/input/tcp/tcp.go b/operator/builtin/input/tcp/tcp.go index 952c29c7..0ad5876e 100644 --- a/operator/builtin/input/tcp/tcp.go +++ b/operator/builtin/input/tcp/tcp.go @@ -98,7 +98,7 @@ func (c TCPInputConfig) Build(context operator.BuildContext) ([]operator.Operato return nil, err } - splitFunc, err := c.Multiline.Build(context, encoding.Encoding, true) + splitFunc, err := c.Multiline.Build(encoding.Encoding, true, helper.NewForceFlush()) if err != nil { return nil, err } diff --git a/operator/builtin/input/udp/udp.go b/operator/builtin/input/udp/udp.go index 392355d7..8f8fc0e2 100644 --- a/operator/builtin/input/udp/udp.go +++ b/operator/builtin/input/udp/udp.go @@ -81,7 +81,7 @@ func (c UDPInputConfig) Build(context operator.BuildContext) ([]operator.Operato return nil, err } - splitFunc, err := c.Multiline.Build(context, encoding.Encoding, true) + splitFunc, err := c.Multiline.Build(encoding.Encoding, true, 
helper.NewForceFlush()) if err != nil { return nil, err } diff --git a/operator/helper/multiline.go b/operator/helper/multiline.go index 7aed4e2a..9e904219 100644 --- a/operator/helper/multiline.go +++ b/operator/helper/multiline.go @@ -21,10 +21,18 @@ import ( "regexp" "golang.org/x/text/encoding" - - "github.com/open-telemetry/opentelemetry-log-collection/operator" ) +type ForceFlush struct { + Force bool +} + +func NewForceFlush() *ForceFlush { + return &ForceFlush{ + Force: false, + } +} + // NewBasicConfig creates a new Multiline config func NewMultilineConfig() MultilineConfig { return MultilineConfig{ @@ -40,12 +48,12 @@ type MultilineConfig struct { } // Build will build a Multiline operator. -func (c MultilineConfig) Build(context operator.BuildContext, encoding encoding.Encoding, flushAtEOF bool) (bufio.SplitFunc, error) { - return c.getSplitFunc(encoding, flushAtEOF) +func (c MultilineConfig) Build(encoding encoding.Encoding, flushAtEOF bool, force *ForceFlush) (bufio.SplitFunc, error) { + return c.getSplitFunc(encoding, flushAtEOF, force) } // getSplitFunc returns split function for bufio.Scanner basing on configured pattern -func (c MultilineConfig) getSplitFunc(encoding encoding.Encoding, flushAtEOF bool) (bufio.SplitFunc, error) { +func (c MultilineConfig) getSplitFunc(encoding encoding.Encoding, flushAtEOF bool, force *ForceFlush) (bufio.SplitFunc, error) { endPattern := c.LineEndPattern startPattern := c.LineStartPattern @@ -53,19 +61,19 @@ func (c MultilineConfig) getSplitFunc(encoding encoding.Encoding, flushAtEOF boo case endPattern != "" && startPattern != "": return nil, fmt.Errorf("only one of line_start_pattern or line_end_pattern can be set") case endPattern == "" && startPattern == "": - return NewNewlineSplitFunc(encoding, flushAtEOF) + return NewNewlineSplitFunc(encoding, flushAtEOF, force) case endPattern != "": re, err := regexp.Compile("(?m)" + c.LineEndPattern) if err != nil { return nil, fmt.Errorf("compile line end regex: %s", err) 
} - return NewLineEndSplitFunc(re, flushAtEOF), nil + return NewLineEndSplitFunc(re, flushAtEOF, force), nil case startPattern != "": re, err := regexp.Compile("(?m)" + c.LineStartPattern) if err != nil { return nil, fmt.Errorf("compile line start regex: %s", err) } - return NewLineStartSplitFunc(re, flushAtEOF), nil + return NewLineStartSplitFunc(re, flushAtEOF, force), nil default: return nil, fmt.Errorf("unreachable") } @@ -73,7 +81,7 @@ func (c MultilineConfig) getSplitFunc(encoding encoding.Encoding, flushAtEOF boo // NewLineStartSplitFunc creates a bufio.SplitFunc that splits an incoming stream into // tokens that start with a match to the regex pattern provided -func NewLineStartSplitFunc(re *regexp.Regexp, flushAtEOF bool) bufio.SplitFunc { +func NewLineStartSplitFunc(re *regexp.Regexp, flushAtEOF bool, force *ForceFlush) bufio.SplitFunc { return func(data []byte, atEOF bool) (advance int, token []byte, err error) { firstLoc := re.FindIndex(data) if firstLoc == nil { @@ -123,7 +131,7 @@ func NewLineStartSplitFunc(re *regexp.Regexp, flushAtEOF bool) bufio.SplitFunc { // NewLineEndSplitFunc creates a bufio.SplitFunc that splits an incoming stream into // tokens that end with a match to the regex pattern provided -func NewLineEndSplitFunc(re *regexp.Regexp, flushAtEOF bool) bufio.SplitFunc { +func NewLineEndSplitFunc(re *regexp.Regexp, flushAtEOF bool, force *ForceFlush) bufio.SplitFunc { return func(data []byte, atEOF bool) (advance int, token []byte, err error) { loc := re.FindIndex(data) if loc == nil { @@ -151,7 +159,7 @@ func NewLineEndSplitFunc(re *regexp.Regexp, flushAtEOF bool) bufio.SplitFunc { // NewNewlineSplitFunc splits log lines by newline, just as bufio.ScanLines, but // never returning an token using EOF as a terminator -func NewNewlineSplitFunc(encoding encoding.Encoding, flushAtEOF bool) (bufio.SplitFunc, error) { +func NewNewlineSplitFunc(encoding encoding.Encoding, flushAtEOF bool, force *ForceFlush) (bufio.SplitFunc, error) { newline, err 
:= encodedNewline(encoding) if err != nil { return nil, err diff --git a/operator/helper/multiline_test.go b/operator/helper/multiline_test.go index b8364587..21c1d637 100644 --- a/operator/helper/multiline_test.go +++ b/operator/helper/multiline_test.go @@ -149,13 +149,13 @@ func TestLineStartSplitFunc(t *testing.T) { cfg := &MultilineConfig{ LineStartPattern: tc.Pattern, } - splitFunc, err := cfg.getSplitFunc(unicode.UTF8, false) + splitFunc, err := cfg.getSplitFunc(unicode.UTF8, false, NewForceFlush()) require.NoError(t, err) t.Run(tc.Name, tc.RunFunc(splitFunc)) } t.Run("FirstMatchHitsEndOfBuffer", func(t *testing.T) { - splitFunc := NewLineStartSplitFunc(regexp.MustCompile("LOGSTART"), false) + splitFunc := NewLineStartSplitFunc(regexp.MustCompile("LOGSTART"), false, NewForceFlush()) data := []byte(`LOGSTART`) t.Run("NotAtEOF", func(t *testing.T) { @@ -266,7 +266,7 @@ func TestLineEndSplitFunc(t *testing.T) { cfg := &MultilineConfig{ LineEndPattern: tc.Pattern, } - splitFunc, err := cfg.getSplitFunc(unicode.UTF8, false) + splitFunc, err := cfg.getSplitFunc(unicode.UTF8, false, NewForceFlush()) require.NoError(t, err) t.Run(tc.Name, tc.RunFunc(splitFunc)) } @@ -344,7 +344,7 @@ func TestNewlineSplitFunc(t *testing.T) { } for _, tc := range testCases { - splitFunc, err := NewNewlineSplitFunc(unicode.UTF8, false) + splitFunc, err := NewNewlineSplitFunc(unicode.UTF8, false, NewForceFlush()) require.NoError(t, err) t.Run(tc.Name, tc.RunFunc(splitFunc)) } @@ -397,7 +397,7 @@ func TestNewlineSplitFunc_Encodings(t *testing.T) { for _, tc := range cases { t.Run(tc.name, func(t *testing.T) { - splitFunc, err := NewNewlineSplitFunc(tc.encoding, false) + splitFunc, err := NewNewlineSplitFunc(tc.encoding, false, NewForceFlush()) require.NoError(t, err) scanner := bufio.NewScanner(bytes.NewReader(tc.input)) scanner.Split(splitFunc) From e2399adf0fc205952fa0dbf2d5a94c4dae8e294e Mon Sep 17 00:00:00 2001 From: Dominik Rosiek Date: Fri, 9 Jul 2021 15:59:48 +0200 Subject: [PATCH 
02/15] feat: use different instances of splitFunc per reader Signed-off-by: Dominik Rosiek --- operator/builtin/input/file/config.go | 1 + operator/builtin/input/file/file.go | 9 +++++++-- operator/builtin/input/file/file_test.go | 4 ++-- operator/builtin/input/file/reader.go | 9 ++++++--- 4 files changed, 16 insertions(+), 7 deletions(-) diff --git a/operator/builtin/input/file/config.go b/operator/builtin/input/file/config.go index 148cff9a..7017006d 100644 --- a/operator/builtin/input/file/config.go +++ b/operator/builtin/input/file/config.go @@ -157,6 +157,7 @@ func (c InputConfig) Build(context operator.BuildContext) ([]operator.Operator, Include: c.Include, Exclude: c.Exclude, SplitFunc: splitFunc, + Multiline: c.Multiline, PollInterval: c.PollInterval.Raw(), FilePathField: filePathField, FileNameField: fileNameField, diff --git a/operator/builtin/input/file/file.go b/operator/builtin/input/file/file.go index b9686510..e99b4885 100644 --- a/operator/builtin/input/file/file.go +++ b/operator/builtin/input/file/file.go @@ -45,6 +45,7 @@ type InputOperator struct { FileNameResolvedField entry.Field PollInterval time.Duration SplitFunc bufio.SplitFunc + Multiline helper.MultilineConfig MaxLogSize int MaxConcurrentFiles int SeenPaths map[string]struct{} @@ -312,6 +313,8 @@ func (f *InputOperator) saveCurrent(readers []*Reader) { } func (f *InputOperator) newReader(file *os.File, fp *Fingerprint, firstCheck bool) (*Reader, error) { + force := helper.NewForceFlush() + splitFunc, _ := f.Multiline.Build(f.encoding.Encoding, false, force) // Check if the new path has the same fingerprint as an old path if oldReader, ok := f.findFingerprintMatch(fp); ok { newReader, err := oldReader.Copy(file) @@ -323,7 +326,7 @@ func (f *InputOperator) newReader(file *os.File, fp *Fingerprint, firstCheck boo } // If we don't match any previously known files, create a new reader from scratch - newReader, err := f.NewReader(file.Name(), file, fp) + newReader, err := 
f.NewReader(file.Name(), file, fp, splitFunc) if err != nil { return nil, err } @@ -372,6 +375,8 @@ func (f *InputOperator) syncLastPollFiles(ctx context.Context) { // syncLastPollFiles loads the most recent set of files to the database func (f *InputOperator) loadLastPollFiles(ctx context.Context) error { + force := helper.NewForceFlush() + splitFunc, _ := f.Multiline.Build(f.encoding.Encoding, false, force) encoded, err := f.persister.Get(ctx, knownFilesKey) if err != nil { return err @@ -393,7 +398,7 @@ func (f *InputOperator) loadLastPollFiles(ctx context.Context) error { // Decode each of the known files f.knownFiles = make([]*Reader, 0, knownFileCount) for i := 0; i < knownFileCount; i++ { - newReader, err := f.NewReader("", nil, nil) + newReader, err := f.NewReader("", nil, nil, splitFunc) if err != nil { return err } diff --git a/operator/builtin/input/file/file_test.go b/operator/builtin/input/file/file_test.go index 6e0e48dc..a8eb076f 100644 --- a/operator/builtin/input/file/file_test.go +++ b/operator/builtin/input/file/file_test.go @@ -625,7 +625,7 @@ func TestFileReader_FingerprintUpdated(t *testing.T) { tempCopy := openFile(t, temp.Name()) fp, err := operator.NewFingerprint(temp) require.NoError(t, err) - reader, err := operator.NewReader(temp.Name(), tempCopy, fp) + reader, err := operator.NewReader(temp.Name(), tempCopy, fp, operator.SplitFunc) require.NoError(t, err) defer reader.Close() @@ -666,7 +666,7 @@ func TestFingerprintGrowsAndStops(t *testing.T) { require.NoError(t, err) require.Equal(t, []byte(""), fp.FirstBytes) - reader, err := operator.NewReader(temp.Name(), tempCopy, fp) + reader, err := operator.NewReader(temp.Name(), tempCopy, fp, operator.SplitFunc) require.NoError(t, err) defer reader.Close() diff --git a/operator/builtin/input/file/reader.go b/operator/builtin/input/file/reader.go index 90769df8..c6b37fd3 100644 --- a/operator/builtin/input/file/reader.go +++ b/operator/builtin/input/file/reader.go @@ -70,11 +70,13 @@ type Reader 
struct { decoder *encoding.Decoder decodeBuffer []byte + splitFunc bufio.SplitFunc + *zap.SugaredLogger `json:"-"` } // NewReader creates a new file reader -func (f *InputOperator) NewReader(path string, file *os.File, fp *Fingerprint) (*Reader, error) { +func (f *InputOperator) NewReader(path string, file *os.File, fp *Fingerprint, splitFunc bufio.SplitFunc) (*Reader, error) { r := &Reader{ Fingerprint: fp, file: file, @@ -83,13 +85,14 @@ func (f *InputOperator) NewReader(path string, file *os.File, fp *Fingerprint) ( decoder: f.encoding.Encoding.NewDecoder(), decodeBuffer: make([]byte, 1<<12), fileAttributes: f.resolveFileAttributes(path), + splitFunc: splitFunc, } return r, nil } // Copy creates a deep copy of a Reader func (r *Reader) Copy(file *os.File) (*Reader, error) { - reader, err := r.fileInput.NewReader(r.fileAttributes.Path, file, r.Fingerprint.Copy()) + reader, err := r.fileInput.NewReader(r.fileAttributes.Path, file, r.Fingerprint.Copy(), r.splitFunc) if err != nil { return nil, err } @@ -116,7 +119,7 @@ func (r *Reader) ReadToEnd(ctx context.Context) { return } - scanner := NewPositionalScanner(r, r.fileInput.MaxLogSize, r.Offset, r.fileInput.SplitFunc) + scanner := NewPositionalScanner(r, r.fileInput.MaxLogSize, r.Offset, r.splitFunc) // Iterate over the tokenized file, emitting entries as we go for { From b9ece3664137e94ad2670d0cf2fa46ed0a3298b1 Mon Sep 17 00:00:00 2001 From: Dominik Rosiek Date: Fri, 9 Jul 2021 16:26:46 +0200 Subject: [PATCH 03/15] feat: introduce multiline struct Signed-off-by: Dominik Rosiek --- operator/builtin/input/file/config.go | 4 ++-- operator/builtin/input/file/file.go | 22 ++++++++++++++-------- operator/builtin/input/file/file_test.go | 11 +++++++++-- operator/builtin/input/file/reader.go | 11 ++++++----- operator/builtin/input/tcp/tcp.go | 4 ++-- operator/builtin/input/udp/udp.go | 4 ++-- operator/helper/multiline.go | 18 ++++++++++++++++-- 7 files changed, 51 insertions(+), 23 deletions(-) diff --git 
a/operator/builtin/input/file/config.go b/operator/builtin/input/file/config.go index 7017006d..71a295d3 100644 --- a/operator/builtin/input/file/config.go +++ b/operator/builtin/input/file/config.go @@ -117,7 +117,8 @@ func (c InputConfig) Build(context operator.BuildContext) ([]operator.Operator, return nil, err } - splitFunc, err := c.Multiline.Build(encoding.Encoding, false, helper.NewForceFlush()) + // Ensure that multiline is buildable + _, err = c.Multiline.Build(encoding.Encoding, false) if err != nil { return nil, err } @@ -156,7 +157,6 @@ func (c InputConfig) Build(context operator.BuildContext) ([]operator.Operator, InputOperator: inputOperator, Include: c.Include, Exclude: c.Exclude, - SplitFunc: splitFunc, Multiline: c.Multiline, PollInterval: c.PollInterval.Raw(), FilePathField: filePathField, diff --git a/operator/builtin/input/file/file.go b/operator/builtin/input/file/file.go index e99b4885..c043d155 100644 --- a/operator/builtin/input/file/file.go +++ b/operator/builtin/input/file/file.go @@ -15,7 +15,6 @@ package file import ( - "bufio" "bytes" "context" "encoding/json" @@ -44,7 +43,6 @@ type InputOperator struct { FilePathResolvedField entry.Field FileNameResolvedField entry.Field PollInterval time.Duration - SplitFunc bufio.SplitFunc Multiline helper.MultilineConfig MaxLogSize int MaxConcurrentFiles int @@ -313,8 +311,6 @@ func (f *InputOperator) saveCurrent(readers []*Reader) { } func (f *InputOperator) newReader(file *os.File, fp *Fingerprint, firstCheck bool) (*Reader, error) { - force := helper.NewForceFlush() - splitFunc, _ := f.Multiline.Build(f.encoding.Encoding, false, force) // Check if the new path has the same fingerprint as an old path if oldReader, ok := f.findFingerprintMatch(fp); ok { newReader, err := oldReader.Copy(file) @@ -326,7 +322,11 @@ func (f *InputOperator) newReader(file *os.File, fp *Fingerprint, firstCheck boo } // If we don't match any previously known files, create a new reader from scratch - newReader, err := 
f.NewReader(file.Name(), file, fp, splitFunc) + multiline, err := f.getMultiline() + if err != nil { + return nil, err + } + newReader, err := f.NewReader(file.Name(), file, fp, multiline) if err != nil { return nil, err } @@ -375,8 +375,6 @@ func (f *InputOperator) syncLastPollFiles(ctx context.Context) { // syncLastPollFiles loads the most recent set of files to the database func (f *InputOperator) loadLastPollFiles(ctx context.Context) error { - force := helper.NewForceFlush() - splitFunc, _ := f.Multiline.Build(f.encoding.Encoding, false, force) encoded, err := f.persister.Get(ctx, knownFilesKey) if err != nil { return err @@ -398,7 +396,11 @@ func (f *InputOperator) loadLastPollFiles(ctx context.Context) error { // Decode each of the known files f.knownFiles = make([]*Reader, 0, knownFileCount) for i := 0; i < knownFileCount; i++ { - newReader, err := f.NewReader("", nil, nil, splitFunc) + multiline, err := f.getMultiline() + if err != nil { + return err + } + newReader, err := f.NewReader("", nil, nil, multiline) if err != nil { return err } @@ -410,3 +412,7 @@ func (f *InputOperator) loadLastPollFiles(ctx context.Context) error { return nil } + +func (f *InputOperator) getMultiline() (*helper.Multiline, error) { + return f.Multiline.Build(f.encoding.Encoding, false) +} diff --git a/operator/builtin/input/file/file_test.go b/operator/builtin/input/file/file_test.go index a8eb076f..c75878d1 100644 --- a/operator/builtin/input/file/file_test.go +++ b/operator/builtin/input/file/file_test.go @@ -625,7 +625,11 @@ func TestFileReader_FingerprintUpdated(t *testing.T) { tempCopy := openFile(t, temp.Name()) fp, err := operator.NewFingerprint(temp) require.NoError(t, err) - reader, err := operator.NewReader(temp.Name(), tempCopy, fp, operator.SplitFunc) + + multiline, err := operator.getMultiline() + require.NoError(t, err) + + reader, err := operator.NewReader(temp.Name(), tempCopy, fp, multiline) require.NoError(t, err) defer reader.Close() @@ -666,7 +670,10 @@ func 
TestFingerprintGrowsAndStops(t *testing.T) { require.NoError(t, err) require.Equal(t, []byte(""), fp.FirstBytes) - reader, err := operator.NewReader(temp.Name(), tempCopy, fp, operator.SplitFunc) + multiline, err := operator.getMultiline() + require.NoError(t, err) + + reader, err := operator.NewReader(temp.Name(), tempCopy, fp, multiline) require.NoError(t, err) defer reader.Close() diff --git a/operator/builtin/input/file/reader.go b/operator/builtin/input/file/reader.go index c6b37fd3..6f9f5bc5 100644 --- a/operator/builtin/input/file/reader.go +++ b/operator/builtin/input/file/reader.go @@ -26,6 +26,7 @@ import ( "golang.org/x/text/transform" "github.com/open-telemetry/opentelemetry-log-collection/errors" + "github.com/open-telemetry/opentelemetry-log-collection/operator/helper" ) // File attributes contains information about file paths @@ -70,13 +71,13 @@ type Reader struct { decoder *encoding.Decoder decodeBuffer []byte - splitFunc bufio.SplitFunc + multiline *helper.Multiline *zap.SugaredLogger `json:"-"` } // NewReader creates a new file reader -func (f *InputOperator) NewReader(path string, file *os.File, fp *Fingerprint, splitFunc bufio.SplitFunc) (*Reader, error) { +func (f *InputOperator) NewReader(path string, file *os.File, fp *Fingerprint, multiline *helper.Multiline) (*Reader, error) { r := &Reader{ Fingerprint: fp, file: file, @@ -85,14 +86,14 @@ func (f *InputOperator) NewReader(path string, file *os.File, fp *Fingerprint, s decoder: f.encoding.Encoding.NewDecoder(), decodeBuffer: make([]byte, 1<<12), fileAttributes: f.resolveFileAttributes(path), - splitFunc: splitFunc, + multiline: multiline, } return r, nil } // Copy creates a deep copy of a Reader func (r *Reader) Copy(file *os.File) (*Reader, error) { - reader, err := r.fileInput.NewReader(r.fileAttributes.Path, file, r.Fingerprint.Copy(), r.splitFunc) + reader, err := r.fileInput.NewReader(r.fileAttributes.Path, file, r.Fingerprint.Copy(), r.multiline) if err != nil { return nil, err } @@ 
-119,7 +120,7 @@ func (r *Reader) ReadToEnd(ctx context.Context) { return } - scanner := NewPositionalScanner(r, r.fileInput.MaxLogSize, r.Offset, r.splitFunc) + scanner := NewPositionalScanner(r, r.fileInput.MaxLogSize, r.Offset, r.multiline.SplitFunc) // Iterate over the tokenized file, emitting entries as we go for { diff --git a/operator/builtin/input/tcp/tcp.go b/operator/builtin/input/tcp/tcp.go index 0ad5876e..b32d5f4f 100644 --- a/operator/builtin/input/tcp/tcp.go +++ b/operator/builtin/input/tcp/tcp.go @@ -98,7 +98,7 @@ func (c TCPInputConfig) Build(context operator.BuildContext) ([]operator.Operato return nil, err } - splitFunc, err := c.Multiline.Build(encoding.Encoding, true, helper.NewForceFlush()) + multiline, err := c.Multiline.Build(encoding.Encoding, true) if err != nil { return nil, err } @@ -114,7 +114,7 @@ func (c TCPInputConfig) Build(context operator.BuildContext) ([]operator.Operato MaxLogSize: int(c.MaxLogSize), addAttributes: c.AddAttributes, encoding: encoding, - splitFunc: splitFunc, + splitFunc: multiline.SplitFunc, backoff: backoff.Backoff{ Max: 3 * time.Second, }, diff --git a/operator/builtin/input/udp/udp.go b/operator/builtin/input/udp/udp.go index 8f8fc0e2..f5709e7e 100644 --- a/operator/builtin/input/udp/udp.go +++ b/operator/builtin/input/udp/udp.go @@ -81,7 +81,7 @@ func (c UDPInputConfig) Build(context operator.BuildContext) ([]operator.Operato return nil, err } - splitFunc, err := c.Multiline.Build(encoding.Encoding, true, helper.NewForceFlush()) + multiline, err := c.Multiline.Build(encoding.Encoding, true) if err != nil { return nil, err } @@ -97,7 +97,7 @@ func (c UDPInputConfig) Build(context operator.BuildContext) ([]operator.Operato buffer: make([]byte, MaxUDPSize), addAttributes: c.AddAttributes, encoding: encoding, - splitFunc: splitFunc, + splitFunc: multiline.SplitFunc, resolver: resolver, } return []operator.Operator{udpInput}, nil diff --git a/operator/helper/multiline.go b/operator/helper/multiline.go index 
9e904219..8ad0a5a1 100644 --- a/operator/helper/multiline.go +++ b/operator/helper/multiline.go @@ -33,6 +33,11 @@ func NewForceFlush() *ForceFlush { } } +type Multiline struct { + SplitFunc bufio.SplitFunc + force *ForceFlush +} + // NewBasicConfig creates a new Multiline config func NewMultilineConfig() MultilineConfig { return MultilineConfig{ @@ -48,8 +53,17 @@ type MultilineConfig struct { } // Build will build a Multiline operator. -func (c MultilineConfig) Build(encoding encoding.Encoding, flushAtEOF bool, force *ForceFlush) (bufio.SplitFunc, error) { - return c.getSplitFunc(encoding, flushAtEOF, force) +func (c MultilineConfig) Build(encoding encoding.Encoding, flushAtEOF bool) (*Multiline, error) { + force := NewForceFlush() + splitFunc, err := c.getSplitFunc(encoding, flushAtEOF, force) + if err != nil { + return nil, err + } + + return &Multiline{ + SplitFunc: splitFunc, + force: force, + }, nil } // getSplitFunc returns split function for bufio.Scanner basing on configured pattern From d16d0eeaf5d3d4394079e2e61591baa9c3fb832e Mon Sep 17 00:00:00 2001 From: Dominik Rosiek Date: Fri, 9 Jul 2021 17:37:16 +0200 Subject: [PATCH 04/15] feat: force flushing logs after configured period of time --- operator/builtin/input/file/reader.go | 5 ++ operator/helper/multiline.go | 69 ++++++++++++++++++++++++--- 2 files changed, 68 insertions(+), 6 deletions(-) diff --git a/operator/builtin/input/file/reader.go b/operator/builtin/input/file/reader.go index 6f9f5bc5..342609eb 100644 --- a/operator/builtin/input/file/reader.go +++ b/operator/builtin/input/file/reader.go @@ -135,8 +135,13 @@ func (r *Reader) ReadToEnd(ctx context.Context) { if err := getScannerError(scanner); err != nil { r.Errorw("Failed during scan", zap.Error(err)) } + + // Force flush eventually in next iteration + r.multiline.CheckAndFlush() break } + // Update information about last flush time + r.multiline.Flushed() if err := r.emit(ctx, scanner.Bytes()); err != nil { r.Error("Failed to emit entry", 
zap.Error(err)) diff --git a/operator/helper/multiline.go b/operator/helper/multiline.go index 8ad0a5a1..2b59c934 100644 --- a/operator/helper/multiline.go +++ b/operator/helper/multiline.go @@ -19,23 +19,50 @@ import ( "bytes" "fmt" "regexp" + "time" "golang.org/x/text/encoding" ) type ForceFlush struct { - Force bool + Force bool + LastFlush time.Time } func NewForceFlush() *ForceFlush { return &ForceFlush{ - Force: false, + Force: false, + LastFlush: time.Now(), } } type Multiline struct { - SplitFunc bufio.SplitFunc - force *ForceFlush + SplitFunc bufio.SplitFunc + force *ForceFlush + forcePeriod time.Duration + + // lastFlush > force.LastFlush => we can force flush if no logs are incoming for forcePeriod + // lastFlush = force.LastFlush => last flush was forced, so we do cannot force, we can update lastFlush + // lastFlush < force.LastFlush =>we just forced flush, set lastFlush to force.LastFlush + lastFlush time.Time +} + +// Flushed update lastFlush with current timestamp +func (m *Multiline) Flushed() { + if m.lastFlush.Sub(m.force.LastFlush) < 0 { + m.lastFlush = m.force.LastFlush + } else { + m.lastFlush = time.Now() + } +} + +// CheckAndFlush returns true if data is going to be force flushed +func (m *Multiline) CheckAndFlush() { + if time.Since(m.lastFlush) > m.forcePeriod { + if m.lastFlush.Sub(m.force.LastFlush) > 0 { + m.force.Force = true + } + } } // NewBasicConfig creates a new Multiline config @@ -43,6 +70,7 @@ func NewMultilineConfig() MultilineConfig { return MultilineConfig{ LineStartPattern: "", LineEndPattern: "", + ForceFlushPeriod: "5s", } } @@ -50,6 +78,7 @@ func NewMultilineConfig() MultilineConfig { type MultilineConfig struct { LineStartPattern string `mapstructure:"line_start_pattern" json:"line_start_pattern" yaml:"line_start_pattern"` LineEndPattern string `mapstructure:"line_end_pattern" json:"line_end_pattern" yaml:"line_end_pattern"` + ForceFlushPeriod string `mapstructure:"force_flush_period" json:"force_flush_period" 
yaml:"force_flush_period"` } // Build will build a Multiline operator. @@ -60,9 +89,15 @@ func (c MultilineConfig) Build(encoding encoding.Encoding, flushAtEOF bool) (*Mu return nil, err } + duration, err := time.ParseDuration(c.ForceFlushPeriod) + if err != nil { + return nil, err + } + return &Multiline{ - SplitFunc: splitFunc, - force: force, + SplitFunc: splitFunc, + force: force, + forcePeriod: duration, }, nil } @@ -97,6 +132,14 @@ func (c MultilineConfig) getSplitFunc(encoding encoding.Encoding, flushAtEOF boo // tokens that start with a match to the regex pattern provided func NewLineStartSplitFunc(re *regexp.Regexp, flushAtEOF bool, force *ForceFlush) bufio.SplitFunc { return func(data []byte, atEOF bool) (advance int, token []byte, err error) { + if force.Force { + force.Force = false + force.LastFlush = time.Now() + token = trimWhitespaces(data) + advance = len(data) + return + } + firstLoc := re.FindIndex(data) if firstLoc == nil { // Flush if no more data is expected @@ -147,6 +190,13 @@ func NewLineStartSplitFunc(re *regexp.Regexp, flushAtEOF bool, force *ForceFlush // tokens that end with a match to the regex pattern provided func NewLineEndSplitFunc(re *regexp.Regexp, flushAtEOF bool, force *ForceFlush) bufio.SplitFunc { return func(data []byte, atEOF bool) (advance int, token []byte, err error) { + if force.Force { + force.Force = false + force.LastFlush = time.Now() + token = trimWhitespaces(data) + advance = len(data) + return + } loc := re.FindIndex(data) if loc == nil { // Flush if no more data is expected @@ -185,6 +235,13 @@ func NewNewlineSplitFunc(encoding encoding.Encoding, flushAtEOF bool, force *For } return func(data []byte, atEOF bool) (advance int, token []byte, err error) { + if force.Force { + force.Force = false + force.LastFlush = time.Now() + token = trimWhitespaces(data) + advance = len(data) + return + } if atEOF && len(data) == 0 { return 0, nil, nil } From 5d3f482a7b45da06983e81344bd174339f5bdc3b Mon Sep 17 00:00:00 2001 
From: Dominik Rosiek Date: Fri, 9 Jul 2021 17:39:29 +0200 Subject: [PATCH 05/15] feat: do not force flush if period is set to 0 Signed-off-by: Dominik Rosiek --- operator/helper/multiline.go | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/operator/helper/multiline.go b/operator/helper/multiline.go index 2b59c934..100c1b2e 100644 --- a/operator/helper/multiline.go +++ b/operator/helper/multiline.go @@ -58,10 +58,8 @@ func (m *Multiline) Flushed() { // CheckAndFlush returns true if data is going to be force flushed func (m *Multiline) CheckAndFlush() { - if time.Since(m.lastFlush) > m.forcePeriod { - if m.lastFlush.Sub(m.force.LastFlush) > 0 { - m.force.Force = true - } + if m.forcePeriod > 0 && time.Since(m.lastFlush) > m.forcePeriod && m.lastFlush.Sub(m.force.LastFlush) > 0 { + m.force.Force = true } } @@ -70,7 +68,7 @@ func NewMultilineConfig() MultilineConfig { return MultilineConfig{ LineStartPattern: "", LineEndPattern: "", - ForceFlushPeriod: "5s", + ForceFlushPeriod: "0s", } } From 6b3c0b50a2ef6ea304c1916b70e0cc699543471a Mon Sep 17 00:00:00 2001 From: Dominik Rosiek <58699848+sumo-drosiek@users.noreply.github.com> Date: Mon, 12 Jul 2021 08:49:30 +0200 Subject: [PATCH 06/15] Update operator/helper/multiline.go MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Patryk Małek --- operator/helper/multiline.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/operator/helper/multiline.go b/operator/helper/multiline.go index 100c1b2e..1745bc1f 100644 --- a/operator/helper/multiline.go +++ b/operator/helper/multiline.go @@ -42,8 +42,8 @@ type Multiline struct { forcePeriod time.Duration // lastFlush > force.LastFlush => we can force flush if no logs are incoming for forcePeriod - // lastFlush = force.LastFlush => last flush was forced, so we do cannot force, we can update lastFlush - // lastFlush < force.LastFlush =>we just forced flush, set lastFlush to 
force.LastFlush + // lastFlush = force.LastFlush => last flush was forced, so we cannot force, we can update lastFlush + // lastFlush < force.LastFlush => we just forced flush, set lastFlush to force.LastFlush lastFlush time.Time } From f64b0577a20949949fdcab564a54e76a848f66be Mon Sep 17 00:00:00 2001 From: Dominik Rosiek Date: Mon, 12 Jul 2021 09:31:14 +0200 Subject: [PATCH 07/15] fix(multiline): default empty flush period to 0s Signed-off-by: Dominik Rosiek --- operator/helper/multiline.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/operator/helper/multiline.go b/operator/helper/multiline.go index 1745bc1f..8afb46b0 100644 --- a/operator/helper/multiline.go +++ b/operator/helper/multiline.go @@ -87,6 +87,10 @@ func (c MultilineConfig) Build(encoding encoding.Encoding, flushAtEOF bool) (*Mu return nil, err } + if c.ForceFlushPeriod == "" { + c.ForceFlushPeriod = "0s" + } + duration, err := time.ParseDuration(c.ForceFlushPeriod) if err != nil { return nil, err From 5682458e966369cb4e7fa7328d29179595de4e68 Mon Sep 17 00:00:00 2001 From: Dominik Rosiek Date: Mon, 12 Jul 2021 09:32:15 +0200 Subject: [PATCH 08/15] docs(multiline): add incode comment for forceFLushPeriod defaults Signed-off-by: Dominik Rosiek --- operator/helper/multiline.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/operator/helper/multiline.go b/operator/helper/multiline.go index 8afb46b0..24713ddc 100644 --- a/operator/helper/multiline.go +++ b/operator/helper/multiline.go @@ -68,6 +68,8 @@ func NewMultilineConfig() MultilineConfig { return MultilineConfig{ LineStartPattern: "", LineEndPattern: "", + + // Empty or `0s` means that we will never force flush ForceFlushPeriod: "0s", } } From c9bb6cd459a29f625a996fdca4f20e15164fb94a Mon Sep 17 00:00:00 2001 From: Dominik Rosiek Date: Mon, 12 Jul 2021 10:07:09 +0200 Subject: [PATCH 09/15] fix,docs(multiline): update default flushTime After this change there is an issue only with double following force flushes IMO if this occurs, there is 
issue with regexes configuration, because if two following flushes from application to file doesn't match the regex, something seems to be invalid Signed-off-by: Dominik Rosiek --- docs/operators/file_input.md | 7 ++++++- operator/builtin/input/file/file.go | 1 + operator/builtin/input/tcp/tcp.go | 1 + operator/builtin/input/udp/udp.go | 1 + operator/helper/multiline.go | 9 +++++++-- 5 files changed, 16 insertions(+), 3 deletions(-) diff --git a/docs/operators/file_input.md b/docs/operators/file_input.md index c4f56066..21f31c9b 100644 --- a/docs/operators/file_input.md +++ b/docs/operators/file_input.md @@ -37,7 +37,12 @@ If set, the `multiline` configuration block instructs the `file_input` operator The `multiline` configuration block must contain exactly one of `line_start_pattern` or `line_end_pattern`. These are regex patterns that match either the beginning of a new log entry, or the end of a log entry. -Also refer to [recombine](/docs/operators/recombine.md) operator for merging events with greater control. +If using multiline, last log can sometimes be not flushed due to waiting for more content. +In order to forcefully flush last buffered log after certain period of time, +set `force_flush_period` option to [duration string](https://golang.org/pkg/time/#ParseDuration), +eg: `5s`, `1m`. It's by default `0s` which means, that no force flushing will be performed. + +Also refer to [recombine](/docs/operators/recombine.md) operator for merging events with greater control. 
### File rotation diff --git a/operator/builtin/input/file/file.go b/operator/builtin/input/file/file.go index c043d155..3b76046a 100644 --- a/operator/builtin/input/file/file.go +++ b/operator/builtin/input/file/file.go @@ -413,6 +413,7 @@ func (f *InputOperator) loadLastPollFiles(ctx context.Context) error { return nil } +// Build multiline using struct fields func (f *InputOperator) getMultiline() (*helper.Multiline, error) { return f.Multiline.Build(f.encoding.Encoding, false) } diff --git a/operator/builtin/input/tcp/tcp.go b/operator/builtin/input/tcp/tcp.go index b32d5f4f..1b18da3e 100644 --- a/operator/builtin/input/tcp/tcp.go +++ b/operator/builtin/input/tcp/tcp.go @@ -98,6 +98,7 @@ func (c TCPInputConfig) Build(context operator.BuildContext) ([]operator.Operato return nil, err } + // Build multiline multiline, err := c.Multiline.Build(encoding.Encoding, true) if err != nil { return nil, err diff --git a/operator/builtin/input/udp/udp.go b/operator/builtin/input/udp/udp.go index f5709e7e..b40ee80c 100644 --- a/operator/builtin/input/udp/udp.go +++ b/operator/builtin/input/udp/udp.go @@ -81,6 +81,7 @@ func (c UDPInputConfig) Build(context operator.BuildContext) ([]operator.Operato return nil, err } + // Build multiline multiline, err := c.Multiline.Build(encoding.Encoding, true) if err != nil { return nil, err diff --git a/operator/helper/multiline.go b/operator/helper/multiline.go index 24713ddc..146fc792 100644 --- a/operator/helper/multiline.go +++ b/operator/helper/multiline.go @@ -24,18 +24,22 @@ import ( "golang.org/x/text/encoding" ) +// ForceFlush keeps information about force flush state type ForceFlush struct { Force bool LastFlush time.Time } +// NewForceFlush Creates new ForceFlush with lastFlush set to unix epoch +// and order to not force ongoing flush func NewForceFlush() *ForceFlush { return &ForceFlush{ Force: false, - LastFlush: time.Now(), + LastFlush: time.Unix(0, 0), } } +// Multiline consists of splitFunc and variables needed to 
perform force flush type Multiline struct { SplitFunc bufio.SplitFunc force *ForceFlush @@ -56,7 +60,7 @@ func (m *Multiline) Flushed() { } } -// CheckAndFlush returns true if data is going to be force flushed +// CheckAndFlush sets internal flag to true if data is going to be force flushed func (m *Multiline) CheckAndFlush() { if m.forcePeriod > 0 && time.Since(m.lastFlush) > m.forcePeriod && m.lastFlush.Sub(m.force.LastFlush) > 0 { m.force.Force = true @@ -102,6 +106,7 @@ func (c MultilineConfig) Build(encoding encoding.Encoding, flushAtEOF bool) (*Mu SplitFunc: splitFunc, force: force, forcePeriod: duration, + lastFlush: time.Now(), }, nil } From a1bf062f6db7bbbd15759612eb79e0e72a38c131 Mon Sep 17 00:00:00 2001 From: Dominik Rosiek Date: Mon, 12 Jul 2021 12:18:03 +0200 Subject: [PATCH 10/15] fix(file_input): fix and enable TestNoNewline Signed-off-by: Dominik Rosiek --- operator/builtin/input/file/file_test.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/operator/builtin/input/file/file_test.go b/operator/builtin/input/file/file_test.go index c75878d1..d245fde7 100644 --- a/operator/builtin/input/file/file_test.go +++ b/operator/builtin/input/file/file_test.go @@ -296,8 +296,10 @@ func TestStartAtEndNewFile(t *testing.T) { // even if the file doesn't end in a newline func TestNoNewline(t *testing.T) { t.Parallel() - t.Skip() - operator, logReceived, tempDir := newTestFileOperator(t, nil, nil) + operator, logReceived, tempDir := newTestFileOperator(t, func(cfg *InputConfig) { + cfg.Multiline = helper.NewMultilineConfig() + cfg.Multiline.ForceFlushPeriod = "1ms" + }, nil) temp := openTemp(t, tempDir) writeString(t, temp, "testlog1\ntestlog2") From 1503414ae130b1a0be12a0285786972355637d8b Mon Sep 17 00:00:00 2001 From: Dominik Rosiek Date: Mon, 12 Jul 2021 13:18:40 +0200 Subject: [PATCH 11/15] tests(multiline): add tests for force flush Signed-off-by: Dominik Rosiek --- operator/helper/multiline_test.go | 83 
+++++++++++++++++++++++++++++-- 1 file changed, 80 insertions(+), 3 deletions(-) diff --git a/operator/helper/multiline_test.go b/operator/helper/multiline_test.go index 21c1d637..1440140d 100644 --- a/operator/helper/multiline_test.go +++ b/operator/helper/multiline_test.go @@ -20,6 +20,7 @@ import ( "errors" "regexp" "testing" + "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -33,6 +34,7 @@ type tokenizerTestCase struct { Raw []byte ExpectedTokenized []string ExpectedError error + ForceFlush *ForceFlush } func (tc tokenizerTestCase) RunFunc(splitFunc bufio.SplitFunc) func(t *testing.T) { @@ -143,13 +145,38 @@ func TestLineStartSplitFunc(t *testing.T) { "LOGSTART 17 log2\nLOGPART log2\nanother line", }, }, + { + Name: "LogsWithoutForceFlush", + Pattern: `^LOGSTART \d+`, + Raw: []byte("LOGPART log1\nLOGPART log1\t \n"), + ExpectedTokenized: []string{}, + ForceFlush: &ForceFlush{ + Force: false, + LastFlush: time.Now(), + }, + }, + { + Name: "LogsWithForceFlush", + Pattern: `^LOGSTART \d+`, + Raw: []byte("LOGPART log1\nLOGPART log1\t \n"), + ExpectedTokenized: []string{ + "LOGPART log1\nLOGPART log1", + }, + ForceFlush: &ForceFlush{ + Force: true, + LastFlush: time.Now(), + }, + }, } for _, tc := range testCases { cfg := &MultilineConfig{ LineStartPattern: tc.Pattern, } - splitFunc, err := cfg.getSplitFunc(unicode.UTF8, false, NewForceFlush()) + if tc.ForceFlush == nil { + tc.ForceFlush = NewForceFlush() + } + splitFunc, err := cfg.getSplitFunc(unicode.UTF8, false, tc.ForceFlush) require.NoError(t, err) t.Run(tc.Name, tc.RunFunc(splitFunc)) } @@ -260,13 +287,38 @@ func TestLineEndSplitFunc(t *testing.T) { "LOGSTART 17 log2\nLOGPART log2\nLOGEND log2", }, }, + { + Name: "LogsWithoutForceFlush", + Pattern: `^LOGEND.*$`, + Raw: []byte("LOGPART log1\nLOGPART log1\t \n"), + ExpectedTokenized: []string{}, + ForceFlush: &ForceFlush{ + Force: false, + LastFlush: time.Now(), + }, + }, + { + Name: "LogsWithForceFlush", + Pattern: 
`^LOGEND.*$`, + Raw: []byte("LOGPART log1\nLOGPART log1\t \n"), + ExpectedTokenized: []string{ + "LOGPART log1\nLOGPART log1", + }, + ForceFlush: &ForceFlush{ + Force: true, + LastFlush: time.Now(), + }, + }, } for _, tc := range testCases { cfg := &MultilineConfig{ LineEndPattern: tc.Pattern, } - splitFunc, err := cfg.getSplitFunc(unicode.UTF8, false, NewForceFlush()) + if tc.ForceFlush == nil { + tc.ForceFlush = NewForceFlush() + } + splitFunc, err := cfg.getSplitFunc(unicode.UTF8, false, tc.ForceFlush) require.NoError(t, err) t.Run(tc.Name, tc.RunFunc(splitFunc)) } @@ -341,10 +393,35 @@ func TestNewlineSplitFunc(t *testing.T) { ExpectedTokenized: []string{}, ExpectedError: errors.New("bufio.Scanner: token too long"), }, + { + Name: "LogsWithoutForceFlush", + Pattern: `^LOGEND.*$`, + Raw: []byte("LOGPART log1"), + ExpectedTokenized: []string{}, + ForceFlush: &ForceFlush{ + Force: false, + LastFlush: time.Now(), + }, + }, + { + Name: "LogsWithForceFlush", + Pattern: `^LOGEND.*$`, + Raw: []byte("LOGPART log1"), + ExpectedTokenized: []string{ + "LOGPART log1", + }, + ForceFlush: &ForceFlush{ + Force: true, + LastFlush: time.Now(), + }, + }, } for _, tc := range testCases { - splitFunc, err := NewNewlineSplitFunc(unicode.UTF8, false, NewForceFlush()) + if tc.ForceFlush == nil { + tc.ForceFlush = NewForceFlush() + } + splitFunc, err := NewNewlineSplitFunc(unicode.UTF8, false, tc.ForceFlush) require.NoError(t, err) t.Run(tc.Name, tc.RunFunc(splitFunc)) } From c6557fdf5554897b86cdb6eb7190d1c055d4e504 Mon Sep 17 00:00:00 2001 From: Dominik Rosiek Date: Tue, 13 Jul 2021 08:37:32 +0200 Subject: [PATCH 12/15] refactor: move all force flush logic into ForceFlush struct Signed-off-by: Dominik Rosiek --- operator/builtin/input/file/reader.go | 4 +- operator/helper/multiline.go | 85 +++++++++++++++------------ operator/helper/multiline_test.go | 39 +++++------- 3 files changed, 64 insertions(+), 64 deletions(-) diff --git a/operator/builtin/input/file/reader.go 
b/operator/builtin/input/file/reader.go index 342609eb..6e39bf9b 100644 --- a/operator/builtin/input/file/reader.go +++ b/operator/builtin/input/file/reader.go @@ -137,11 +137,11 @@ func (r *Reader) ReadToEnd(ctx context.Context) { } // Force flush eventually in next iteration - r.multiline.CheckAndFlush() + r.multiline.Force.CheckAndFlush() break } // Update information about last flush time - r.multiline.Flushed() + r.multiline.Force.Flushed() if err := r.emit(ctx, scanner.Bytes()); err != nil { r.Error("Failed to emit entry", zap.Error(err)) diff --git a/operator/helper/multiline.go b/operator/helper/multiline.go index 146fc792..7e43b800 100644 --- a/operator/helper/multiline.go +++ b/operator/helper/multiline.go @@ -26,47 +26,60 @@ import ( // ForceFlush keeps information about force flush state type ForceFlush struct { - Force bool - LastFlush time.Time + force bool + forcePeriod time.Duration + + // lastFlush > lastForcedFlush => we can force flush if no logs are incoming for forcePeriod + // lastFlush = lastForcedFlush => last flush was forced, so we cannot force, we can update lastFlush + // lastFlush < lastForcedFlush => we just forced flush, set lastFlush to lastForcedFlush + lastFlush time.Time + lastForcedFlush time.Time } // NewForceFlush Creates new ForceFlush with lastFlush set to unix epoch // and order to not force ongoing flush -func NewForceFlush() *ForceFlush { +func NewForceFlush(forcePeriod time.Duration) *ForceFlush { return &ForceFlush{ - Force: false, - LastFlush: time.Unix(0, 0), + force: false, + lastFlush: time.Now(), + forcePeriod: forcePeriod, + lastForcedFlush: time.Unix(0, 0), } } // Multiline consists of splitFunc and variables needed to perform force flush type Multiline struct { - SplitFunc bufio.SplitFunc - force *ForceFlush - forcePeriod time.Duration - - // lastFlush > force.LastFlush => we can force flush if no logs are incoming for forcePeriod - // lastFlush = force.LastFlush => last flush was forced, so we cannot force, we 
can update lastFlush - // lastFlush < force.LastFlush => we just forced flush, set lastFlush to force.LastFlush - lastFlush time.Time + SplitFunc bufio.SplitFunc + Force *ForceFlush } // Flushed update lastFlush with current timestamp -func (m *Multiline) Flushed() { - if m.lastFlush.Sub(m.force.LastFlush) < 0 { - m.lastFlush = m.force.LastFlush +func (ff *ForceFlush) Flushed() { + if ff.lastFlush.Sub(ff.lastForcedFlush) < 0 { + ff.lastFlush = ff.lastForcedFlush } else { - m.lastFlush = time.Now() + ff.lastFlush = time.Now() } } // CheckAndFlush sets internal flag to true if data is going to be force flushed -func (m *Multiline) CheckAndFlush() { - if m.forcePeriod > 0 && time.Since(m.lastFlush) > m.forcePeriod && m.lastFlush.Sub(m.force.LastFlush) > 0 { - m.force.Force = true +func (ff *ForceFlush) CheckAndFlush() { + if ff.forcePeriod > 0 && time.Since(ff.lastFlush) > ff.forcePeriod && ff.lastFlush.Sub(ff.lastForcedFlush) > 0 { + ff.force = true } } +// ForceFlushed update struct fields after forced flush +func (ff *ForceFlush) ForceFlushed() { + ff.force = false + ff.lastForcedFlush = time.Now() +} + +// ShouldFlush returns true if data should be forcefully flushed +func (ff *ForceFlush) ShouldFlush() bool { + return ff.force +} + // NewBasicConfig creates a new Multiline config func NewMultilineConfig() MultilineConfig { return MultilineConfig{ @@ -87,12 +100,6 @@ type MultilineConfig struct { // Build will build a Multiline operator. 
func (c MultilineConfig) Build(encoding encoding.Encoding, flushAtEOF bool) (*Multiline, error) { - force := NewForceFlush() - splitFunc, err := c.getSplitFunc(encoding, flushAtEOF, force) - if err != nil { - return nil, err - } - if c.ForceFlushPeriod == "" { c.ForceFlushPeriod = "0s" } @@ -102,11 +109,14 @@ func (c MultilineConfig) Build(encoding encoding.Encoding, flushAtEOF bool) (*Mu return nil, err } + force := NewForceFlush(duration) + splitFunc, err := c.getSplitFunc(encoding, flushAtEOF, force) + if err != nil { + return nil, err + } return &Multiline{ - SplitFunc: splitFunc, - force: force, - forcePeriod: duration, - lastFlush: time.Now(), + SplitFunc: splitFunc, + Force: force, }, nil } @@ -141,9 +151,8 @@ func (c MultilineConfig) getSplitFunc(encoding encoding.Encoding, flushAtEOF boo // tokens that start with a match to the regex pattern provided func NewLineStartSplitFunc(re *regexp.Regexp, flushAtEOF bool, force *ForceFlush) bufio.SplitFunc { return func(data []byte, atEOF bool) (advance int, token []byte, err error) { - if force.Force { - force.Force = false - force.LastFlush = time.Now() + if force != nil && force.ShouldFlush() { + force.ForceFlushed() token = trimWhitespaces(data) advance = len(data) return @@ -199,9 +208,8 @@ func NewLineStartSplitFunc(re *regexp.Regexp, flushAtEOF bool, force *ForceFlush // tokens that end with a match to the regex pattern provided func NewLineEndSplitFunc(re *regexp.Regexp, flushAtEOF bool, force *ForceFlush) bufio.SplitFunc { return func(data []byte, atEOF bool) (advance int, token []byte, err error) { - if force.Force { - force.Force = false - force.LastFlush = time.Now() + if force != nil && force.ShouldFlush() { + force.ForceFlushed() token = trimWhitespaces(data) advance = len(data) return @@ -244,9 +252,8 @@ func NewNewlineSplitFunc(encoding encoding.Encoding, flushAtEOF bool, force *For } return func(data []byte, atEOF bool) (advance int, token []byte, err error) { - if force.Force { - force.Force = 
false - force.LastFlush = time.Now() + if force != nil && force.ShouldFlush() { + force.ForceFlushed() token = trimWhitespaces(data) advance = len(data) return diff --git a/operator/helper/multiline_test.go b/operator/helper/multiline_test.go index 1440140d..11741a47 100644 --- a/operator/helper/multiline_test.go +++ b/operator/helper/multiline_test.go @@ -151,8 +151,8 @@ func TestLineStartSplitFunc(t *testing.T) { Raw: []byte("LOGPART log1\nLOGPART log1\t \n"), ExpectedTokenized: []string{}, ForceFlush: &ForceFlush{ - Force: false, - LastFlush: time.Now(), + force: false, + lastForcedFlush: time.Now(), }, }, { @@ -163,8 +163,8 @@ func TestLineStartSplitFunc(t *testing.T) { "LOGPART log1\nLOGPART log1", }, ForceFlush: &ForceFlush{ - Force: true, - LastFlush: time.Now(), + force: true, + lastForcedFlush: time.Now(), }, }, } @@ -173,16 +173,14 @@ func TestLineStartSplitFunc(t *testing.T) { cfg := &MultilineConfig{ LineStartPattern: tc.Pattern, } - if tc.ForceFlush == nil { - tc.ForceFlush = NewForceFlush() - } + splitFunc, err := cfg.getSplitFunc(unicode.UTF8, false, tc.ForceFlush) require.NoError(t, err) t.Run(tc.Name, tc.RunFunc(splitFunc)) } t.Run("FirstMatchHitsEndOfBuffer", func(t *testing.T) { - splitFunc := NewLineStartSplitFunc(regexp.MustCompile("LOGSTART"), false, NewForceFlush()) + splitFunc := NewLineStartSplitFunc(regexp.MustCompile("LOGSTART"), false, nil) data := []byte(`LOGSTART`) t.Run("NotAtEOF", func(t *testing.T) { @@ -293,8 +291,8 @@ func TestLineEndSplitFunc(t *testing.T) { Raw: []byte("LOGPART log1\nLOGPART log1\t \n"), ExpectedTokenized: []string{}, ForceFlush: &ForceFlush{ - Force: false, - LastFlush: time.Now(), + force: false, + lastForcedFlush: time.Now(), }, }, { @@ -305,8 +303,8 @@ func TestLineEndSplitFunc(t *testing.T) { "LOGPART log1\nLOGPART log1", }, ForceFlush: &ForceFlush{ - Force: true, - LastFlush: time.Now(), + force: true, + lastForcedFlush: time.Now(), }, }, } @@ -315,9 +313,7 @@ func TestLineEndSplitFunc(t *testing.T) { cfg 
:= &MultilineConfig{ LineEndPattern: tc.Pattern, } - if tc.ForceFlush == nil { - tc.ForceFlush = NewForceFlush() - } + splitFunc, err := cfg.getSplitFunc(unicode.UTF8, false, tc.ForceFlush) require.NoError(t, err) t.Run(tc.Name, tc.RunFunc(splitFunc)) @@ -399,8 +395,8 @@ func TestNewlineSplitFunc(t *testing.T) { Raw: []byte("LOGPART log1"), ExpectedTokenized: []string{}, ForceFlush: &ForceFlush{ - Force: false, - LastFlush: time.Now(), + force: false, + lastForcedFlush: time.Now(), }, }, { @@ -411,16 +407,13 @@ func TestNewlineSplitFunc(t *testing.T) { "LOGPART log1", }, ForceFlush: &ForceFlush{ - Force: true, - LastFlush: time.Now(), + force: true, + lastForcedFlush: time.Now(), }, }, } for _, tc := range testCases { - if tc.ForceFlush == nil { - tc.ForceFlush = NewForceFlush() - } splitFunc, err := NewNewlineSplitFunc(unicode.UTF8, false, tc.ForceFlush) require.NoError(t, err) t.Run(tc.Name, tc.RunFunc(splitFunc)) @@ -474,7 +467,7 @@ func TestNewlineSplitFunc_Encodings(t *testing.T) { for _, tc := range cases { t.Run(tc.name, func(t *testing.T) { - splitFunc, err := NewNewlineSplitFunc(tc.encoding, false, NewForceFlush()) + splitFunc, err := NewNewlineSplitFunc(tc.encoding, false, nil) require.NoError(t, err) scanner := bufio.NewScanner(bytes.NewReader(tc.input)) scanner.Split(splitFunc) From bced39d3562107b586c1128629515118a968d453 Mon Sep 17 00:00:00 2001 From: Dominik Rosiek Date: Tue, 13 Jul 2021 15:12:07 +0200 Subject: [PATCH 13/15] refactor: exclude forceFlush to be separate entity Signed-off-by: Dominik Rosiek --- operator/builtin/input/file/config.go | 27 ++++++----- operator/builtin/input/file/file.go | 19 +++++--- operator/builtin/input/file/file_test.go | 12 ++--- operator/builtin/input/file/reader.go | 16 ++++--- operator/builtin/input/tcp/tcp.go | 4 +- operator/builtin/input/udp/udp.go | 4 +- operator/helper/multiline.go | 57 +++++++++++------------- 7 files changed, 72 insertions(+), 67 deletions(-) diff --git a/operator/builtin/input/file/config.go 
b/operator/builtin/input/file/config.go index 71a295d3..5bbd83dd 100644 --- a/operator/builtin/input/file/config.go +++ b/operator/builtin/input/file/config.go @@ -43,6 +43,7 @@ func NewInputConfig(operatorID string) *InputConfig { IncludeFilePath: false, IncludeFileNameResolved: false, IncludeFilePathResolved: false, + ForceFlush: helper.NewForceFlushConfig(), StartAt: "end", FingerprintSize: defaultFingerprintSize, MaxLogSize: defaultMaxLogSize, @@ -58,17 +59,18 @@ type InputConfig struct { Include []string `mapstructure:"include,omitempty" json:"include,omitempty" yaml:"include,omitempty"` Exclude []string `mapstructure:"exclude,omitempty" json:"exclude,omitempty" yaml:"exclude,omitempty"` - PollInterval helper.Duration `mapstructure:"poll_interval,omitempty" json:"poll_interval,omitempty" yaml:"poll_interval,omitempty"` - Multiline helper.MultilineConfig `mapstructure:"multiline,omitempty" json:"multiline,omitempty" yaml:"multiline,omitempty"` - IncludeFileName bool `mapstructure:"include_file_name,omitempty" json:"include_file_name,omitempty" yaml:"include_file_name,omitempty"` - IncludeFilePath bool `mapstructure:"include_file_path,omitempty" json:"include_file_path,omitempty" yaml:"include_file_path,omitempty"` - IncludeFileNameResolved bool `mapstructure:"include_file_name_resolved,omitempty" json:"include_file_name_resolved,omitempty" yaml:"include_file_name_resolved,omitempty"` - IncludeFilePathResolved bool `mapstructure:"include_file_path_resolved,omitempty" json:"include_file_path_resolved,omitempty" yaml:"include_file_path_resolved,omitempty"` - StartAt string `mapstructure:"start_at,omitempty" json:"start_at,omitempty" yaml:"start_at,omitempty"` - FingerprintSize helper.ByteSize `mapstructure:"fingerprint_size,omitempty" json:"fingerprint_size,omitempty" yaml:"fingerprint_size,omitempty"` - MaxLogSize helper.ByteSize `mapstructure:"max_log_size,omitempty" json:"max_log_size,omitempty" yaml:"max_log_size,omitempty"` - MaxConcurrentFiles int 
`mapstructure:"max_concurrent_files,omitempty" json:"max_concurrent_files,omitempty" yaml:"max_concurrent_files,omitempty"` - Encoding helper.EncodingConfig `mapstructure:",squash,omitempty" json:",inline,omitempty" yaml:",inline,omitempty"` + PollInterval helper.Duration `mapstructure:"poll_interval,omitempty" json:"poll_interval,omitempty" yaml:"poll_interval,omitempty"` + Multiline helper.MultilineConfig `mapstructure:"multiline,omitempty" json:"multiline,omitempty" yaml:"multiline,omitempty"` + IncludeFileName bool `mapstructure:"include_file_name,omitempty" json:"include_file_name,omitempty" yaml:"include_file_name,omitempty"` + IncludeFilePath bool `mapstructure:"include_file_path,omitempty" json:"include_file_path,omitempty" yaml:"include_file_path,omitempty"` + IncludeFileNameResolved bool `mapstructure:"include_file_name_resolved,omitempty" json:"include_file_name_resolved,omitempty" yaml:"include_file_name_resolved,omitempty"` + IncludeFilePathResolved bool `mapstructure:"include_file_path_resolved,omitempty" json:"include_file_path_resolved,omitempty" yaml:"include_file_path_resolved,omitempty"` + StartAt string `mapstructure:"start_at,omitempty" json:"start_at,omitempty" yaml:"start_at,omitempty"` + FingerprintSize helper.ByteSize `mapstructure:"fingerprint_size,omitempty" json:"fingerprint_size,omitempty" yaml:"fingerprint_size,omitempty"` + MaxLogSize helper.ByteSize `mapstructure:"max_log_size,omitempty" json:"max_log_size,omitempty" yaml:"max_log_size,omitempty"` + MaxConcurrentFiles int `mapstructure:"max_concurrent_files,omitempty" json:"max_concurrent_files,omitempty" yaml:"max_concurrent_files,omitempty"` + Encoding helper.EncodingConfig `mapstructure:",squash,omitempty" json:",inline,omitempty" yaml:",inline,omitempty"` + ForceFlush helper.ForceFlushConfig `mapstructure:",squash,omitempty" json:",inline,omitempty" yaml:",inline,omitempty"` } // Build will build a file input operator from the supplied configuration @@ -118,7 +120,7 @@ func (c 
InputConfig) Build(context operator.BuildContext) ([]operator.Operator, } // Ensure that multiline is buildable - _, err = c.Multiline.Build(encoding.Encoding, false) + _, err = c.Multiline.Build(encoding.Encoding, false, nil) if err != nil { return nil, err } @@ -164,6 +166,7 @@ func (c InputConfig) Build(context operator.BuildContext) ([]operator.Operator, FilePathResolvedField: filePathResolvedField, FileNameResolvedField: fileNameResolvedField, startAtBeginning: startAtBeginning, + ForceFlush: c.ForceFlush, queuedMatches: make([]string, 0), encoding: encoding, firstCheck: true, diff --git a/operator/builtin/input/file/file.go b/operator/builtin/input/file/file.go index 3b76046a..7d12cd93 100644 --- a/operator/builtin/input/file/file.go +++ b/operator/builtin/input/file/file.go @@ -15,6 +15,7 @@ package file import ( + "bufio" "bytes" "context" "encoding/json" @@ -44,6 +45,7 @@ type InputOperator struct { FileNameResolvedField entry.Field PollInterval time.Duration Multiline helper.MultilineConfig + ForceFlush helper.ForceFlushConfig MaxLogSize int MaxConcurrentFiles int SeenPaths map[string]struct{} @@ -322,11 +324,11 @@ func (f *InputOperator) newReader(file *os.File, fp *Fingerprint, firstCheck boo } // If we don't match any previously known files, create a new reader from scratch - multiline, err := f.getMultiline() + splitFunc, forceFlush, err := f.getMultiline() if err != nil { return nil, err } - newReader, err := f.NewReader(file.Name(), file, fp, multiline) + newReader, err := f.NewReader(file.Name(), file, fp, splitFunc, forceFlush) if err != nil { return nil, err } @@ -396,11 +398,11 @@ func (f *InputOperator) loadLastPollFiles(ctx context.Context) error { // Decode each of the known files f.knownFiles = make([]*Reader, 0, knownFileCount) for i := 0; i < knownFileCount; i++ { - multiline, err := f.getMultiline() + splitFunc, forceFlush, err := f.getMultiline() if err != nil { return err } - newReader, err := f.NewReader("", nil, nil, multiline) + 
newReader, err := f.NewReader("", nil, nil, splitFunc, forceFlush) if err != nil { return err } @@ -413,7 +415,10 @@ func (f *InputOperator) loadLastPollFiles(ctx context.Context) error { return nil } -// Build multiline using struct fields -func (f *InputOperator) getMultiline() (*helper.Multiline, error) { - return f.Multiline.Build(f.encoding.Encoding, false) +// getMultiline returns splitFunc, related ForceFlush structure and error eventually +func (f *InputOperator) getMultiline() (bufio.SplitFunc, *helper.ForceFlush, error) { + force := f.ForceFlush.Build() + splitFunc, err := f.Multiline.Build(f.encoding.Encoding, false, force) + + return splitFunc, force, err } diff --git a/operator/builtin/input/file/file_test.go b/operator/builtin/input/file/file_test.go index d245fde7..235f9aa5 100644 --- a/operator/builtin/input/file/file_test.go +++ b/operator/builtin/input/file/file_test.go @@ -297,8 +297,8 @@ func TestStartAtEndNewFile(t *testing.T) { func TestNoNewline(t *testing.T) { t.Parallel() operator, logReceived, tempDir := newTestFileOperator(t, func(cfg *InputConfig) { - cfg.Multiline = helper.NewMultilineConfig() - cfg.Multiline.ForceFlushPeriod = "1ms" + cfg.ForceFlush = helper.NewForceFlushConfig() + cfg.ForceFlush.Period.Duration = time.Nanosecond }, nil) temp := openTemp(t, tempDir) @@ -628,10 +628,10 @@ func TestFileReader_FingerprintUpdated(t *testing.T) { fp, err := operator.NewFingerprint(temp) require.NoError(t, err) - multiline, err := operator.getMultiline() + splitFunc, forceFlush, err := operator.getMultiline() require.NoError(t, err) - reader, err := operator.NewReader(temp.Name(), tempCopy, fp, multiline) + reader, err := operator.NewReader(temp.Name(), tempCopy, fp, splitFunc, forceFlush) require.NoError(t, err) defer reader.Close() @@ -672,10 +672,10 @@ func TestFingerprintGrowsAndStops(t *testing.T) { require.NoError(t, err) require.Equal(t, []byte(""), fp.FirstBytes) - multiline, err := operator.getMultiline() + splitFunc, forceFlush, 
err := operator.getMultiline() require.NoError(t, err) - reader, err := operator.NewReader(temp.Name(), tempCopy, fp, multiline) + reader, err := operator.NewReader(temp.Name(), tempCopy, fp, splitFunc, forceFlush) require.NoError(t, err) defer reader.Close() diff --git a/operator/builtin/input/file/reader.go b/operator/builtin/input/file/reader.go index 6e39bf9b..ccf502ab 100644 --- a/operator/builtin/input/file/reader.go +++ b/operator/builtin/input/file/reader.go @@ -71,13 +71,14 @@ type Reader struct { decoder *encoding.Decoder decodeBuffer []byte - multiline *helper.Multiline + splitFunc bufio.SplitFunc + forceFlush *helper.ForceFlush *zap.SugaredLogger `json:"-"` } // NewReader creates a new file reader -func (f *InputOperator) NewReader(path string, file *os.File, fp *Fingerprint, multiline *helper.Multiline) (*Reader, error) { +func (f *InputOperator) NewReader(path string, file *os.File, fp *Fingerprint, splitFunc bufio.SplitFunc, force *helper.ForceFlush) (*Reader, error) { r := &Reader{ Fingerprint: fp, file: file, @@ -86,14 +87,15 @@ func (f *InputOperator) NewReader(path string, file *os.File, fp *Fingerprint, m decoder: f.encoding.Encoding.NewDecoder(), decodeBuffer: make([]byte, 1<<12), fileAttributes: f.resolveFileAttributes(path), - multiline: multiline, + splitFunc: splitFunc, + forceFlush: force, } return r, nil } // Copy creates a deep copy of a Reader func (r *Reader) Copy(file *os.File) (*Reader, error) { - reader, err := r.fileInput.NewReader(r.fileAttributes.Path, file, r.Fingerprint.Copy(), r.multiline) + reader, err := r.fileInput.NewReader(r.fileAttributes.Path, file, r.Fingerprint.Copy(), r.splitFunc, r.forceFlush) if err != nil { return nil, err } @@ -120,7 +122,7 @@ func (r *Reader) ReadToEnd(ctx context.Context) { return } - scanner := NewPositionalScanner(r, r.fileInput.MaxLogSize, r.Offset, r.multiline.SplitFunc) + scanner := NewPositionalScanner(r, r.fileInput.MaxLogSize, r.Offset, r.splitFunc) // Iterate over the tokenized file, 
emitting entries as we go for { @@ -137,11 +139,11 @@ func (r *Reader) ReadToEnd(ctx context.Context) { } // Force flush eventually in next iteration - r.multiline.Force.CheckAndFlush() + r.forceFlush.CheckAndFlush() break } // Update information about last flush time - r.multiline.Force.Flushed() + r.forceFlush.Flushed() if err := r.emit(ctx, scanner.Bytes()); err != nil { r.Error("Failed to emit entry", zap.Error(err)) diff --git a/operator/builtin/input/tcp/tcp.go b/operator/builtin/input/tcp/tcp.go index 1b18da3e..3adbcd37 100644 --- a/operator/builtin/input/tcp/tcp.go +++ b/operator/builtin/input/tcp/tcp.go @@ -99,7 +99,7 @@ func (c TCPInputConfig) Build(context operator.BuildContext) ([]operator.Operato } // Build multiline - multiline, err := c.Multiline.Build(encoding.Encoding, true) + splitFunc, err := c.Multiline.Build(encoding.Encoding, true, nil) if err != nil { return nil, err } @@ -115,7 +115,7 @@ func (c TCPInputConfig) Build(context operator.BuildContext) ([]operator.Operato MaxLogSize: int(c.MaxLogSize), addAttributes: c.AddAttributes, encoding: encoding, - splitFunc: multiline.SplitFunc, + splitFunc: splitFunc, backoff: backoff.Backoff{ Max: 3 * time.Second, }, diff --git a/operator/builtin/input/udp/udp.go b/operator/builtin/input/udp/udp.go index b40ee80c..080592ee 100644 --- a/operator/builtin/input/udp/udp.go +++ b/operator/builtin/input/udp/udp.go @@ -82,7 +82,7 @@ func (c UDPInputConfig) Build(context operator.BuildContext) ([]operator.Operato } // Build multiline - multiline, err := c.Multiline.Build(encoding.Encoding, true) + splitFunc, err := c.Multiline.Build(encoding.Encoding, true, nil) if err != nil { return nil, err } @@ -98,7 +98,7 @@ func (c UDPInputConfig) Build(context operator.BuildContext) ([]operator.Operato buffer: make([]byte, MaxUDPSize), addAttributes: c.AddAttributes, encoding: encoding, - splitFunc: multiline.SplitFunc, + splitFunc: splitFunc, resolver: resolver, } return []operator.Operator{udpInput}, nil diff --git 
a/operator/helper/multiline.go b/operator/helper/multiline.go index 7e43b800..250e06aa 100644 --- a/operator/helper/multiline.go +++ b/operator/helper/multiline.go @@ -24,6 +24,22 @@ import ( "golang.org/x/text/encoding" ) +type ForceFlushConfig struct { + Period Duration `mapstructure:"force_flush_period" json:"force_flush_period" yaml:"force_flush_period"` +} + +// NewForceFlushConfig creates a new ForceFlushConfig with force flushing disabled +func NewForceFlushConfig() ForceFlushConfig { + return ForceFlushConfig{ + // Empty or `0s` means that we will never force flush + Period: Duration{Duration: 0}, + } +} + +func (c *ForceFlushConfig) Build() *ForceFlush { + return NewForceFlush(c.Period) +} + // ForceFlush keeps information about force flush state type ForceFlush struct { force bool @@ -38,21 +54,15 @@ type ForceFlush struct { // NewForceFlush Creates new ForceFlush with lastFlush set to unix epoch // and order to not force ongoing flush -func NewForceFlush(forcePeriod time.Duration) *ForceFlush { +func NewForceFlush(forcePeriod Duration) *ForceFlush { return &ForceFlush{ force: false, lastFlush: time.Now(), - forcePeriod: forcePeriod, + forcePeriod: forcePeriod.Raw(), lastForcedFlush: time.Unix(0, 0), } } -// Multiline consists of splitFunc and variables needed to perform force flush -type Multiline struct { - SplitFunc bufio.SplitFunc - Force *ForceFlush -} - // Flushed update lastFlush with current timestamp func (ff *ForceFlush) Flushed() { if ff.lastFlush.Sub(ff.lastForcedFlush) < 0 { @@ -80,14 +90,17 @@ func (ff *ForceFlush) ShouldFlush() bool { return ff.force } +// Multiline consists of splitFunc and variables needed to perform force flush +type Multiline struct { + SplitFunc bufio.SplitFunc + Force *ForceFlush +} + // NewBasicConfig creates a new Multiline config func NewMultilineConfig() MultilineConfig { return MultilineConfig{ LineStartPattern: "", LineEndPattern: "", } } @@ -95,29 +108,11 @@ 
func NewMultilineConfig() MultilineConfig { type MultilineConfig struct { LineStartPattern string `mapstructure:"line_start_pattern" json:"line_start_pattern" yaml:"line_start_pattern"` LineEndPattern string `mapstructure:"line_end_pattern" json:"line_end_pattern" yaml:"line_end_pattern"` - ForceFlushPeriod string `mapstructure:"force_flush_period" json:"force_flush_period" yaml:"force_flush_period"` } // Build will build a Multiline operator. -func (c MultilineConfig) Build(encoding encoding.Encoding, flushAtEOF bool) (*Multiline, error) { - if c.ForceFlushPeriod == "" { - c.ForceFlushPeriod = "0s" - } - - duration, err := time.ParseDuration(c.ForceFlushPeriod) - if err != nil { - return nil, err - } - - force := NewForceFlush(duration) - splitFunc, err := c.getSplitFunc(encoding, flushAtEOF, force) - if err != nil { - return nil, err - } - return &Multiline{ - SplitFunc: splitFunc, - Force: force, - }, nil +func (c MultilineConfig) Build(encoding encoding.Encoding, flushAtEOF bool, force *ForceFlush) (bufio.SplitFunc, error) { + return c.getSplitFunc(encoding, flushAtEOF, force) } // getSplitFunc returns split function for bufio.Scanner basing on configured pattern From 6c72b19bc0ccdd1582bc35b3d096fcae6f80c612 Mon Sep 17 00:00:00 2001 From: Dominik Rosiek Date: Wed, 14 Jul 2021 11:02:24 +0200 Subject: [PATCH 14/15] refactor(multiline): rename ForceFlusher to Flusher and group with Multiline as Splitter eventually Signed-off-by: Dominik Rosiek --- operator/builtin/input/file/config.go | 30 +++-- operator/builtin/input/file/config_test.go | 56 ++++++---- operator/builtin/input/file/file.go | 21 ++-- operator/builtin/input/file/file_test.go | 12 +- operator/builtin/input/file/reader.go | 16 ++- operator/helper/multiline.go | 121 +++++++++++++++------ operator/helper/multiline_test.go | 32 +++--- 7 files changed, 169 insertions(+), 119 deletions(-) diff --git a/operator/builtin/input/file/config.go b/operator/builtin/input/file/config.go index 5bbd83dd..29d8bcbe 
100644 --- a/operator/builtin/input/file/config.go +++ b/operator/builtin/input/file/config.go @@ -43,7 +43,7 @@ func NewInputConfig(operatorID string) *InputConfig { IncludeFilePath: false, IncludeFileNameResolved: false, IncludeFilePathResolved: false, - ForceFlush: helper.NewForceFlushConfig(), + Splitter: helper.NewSplitterConfig(), StartAt: "end", FingerprintSize: defaultFingerprintSize, MaxLogSize: defaultMaxLogSize, @@ -59,18 +59,17 @@ type InputConfig struct { Include []string `mapstructure:"include,omitempty" json:"include,omitempty" yaml:"include,omitempty"` Exclude []string `mapstructure:"exclude,omitempty" json:"exclude,omitempty" yaml:"exclude,omitempty"` - PollInterval helper.Duration `mapstructure:"poll_interval,omitempty" json:"poll_interval,omitempty" yaml:"poll_interval,omitempty"` - Multiline helper.MultilineConfig `mapstructure:"multiline,omitempty" json:"multiline,omitempty" yaml:"multiline,omitempty"` - IncludeFileName bool `mapstructure:"include_file_name,omitempty" json:"include_file_name,omitempty" yaml:"include_file_name,omitempty"` - IncludeFilePath bool `mapstructure:"include_file_path,omitempty" json:"include_file_path,omitempty" yaml:"include_file_path,omitempty"` - IncludeFileNameResolved bool `mapstructure:"include_file_name_resolved,omitempty" json:"include_file_name_resolved,omitempty" yaml:"include_file_name_resolved,omitempty"` - IncludeFilePathResolved bool `mapstructure:"include_file_path_resolved,omitempty" json:"include_file_path_resolved,omitempty" yaml:"include_file_path_resolved,omitempty"` - StartAt string `mapstructure:"start_at,omitempty" json:"start_at,omitempty" yaml:"start_at,omitempty"` - FingerprintSize helper.ByteSize `mapstructure:"fingerprint_size,omitempty" json:"fingerprint_size,omitempty" yaml:"fingerprint_size,omitempty"` - MaxLogSize helper.ByteSize `mapstructure:"max_log_size,omitempty" json:"max_log_size,omitempty" yaml:"max_log_size,omitempty"` - MaxConcurrentFiles int 
`mapstructure:"max_concurrent_files,omitempty" json:"max_concurrent_files,omitempty" yaml:"max_concurrent_files,omitempty"` - Encoding helper.EncodingConfig `mapstructure:",squash,omitempty" json:",inline,omitempty" yaml:",inline,omitempty"` - ForceFlush helper.ForceFlushConfig `mapstructure:",squash,omitempty" json:",inline,omitempty" yaml:",inline,omitempty"` + PollInterval helper.Duration `mapstructure:"poll_interval,omitempty" json:"poll_interval,omitempty" yaml:"poll_interval,omitempty"` + IncludeFileName bool `mapstructure:"include_file_name,omitempty" json:"include_file_name,omitempty" yaml:"include_file_name,omitempty"` + IncludeFilePath bool `mapstructure:"include_file_path,omitempty" json:"include_file_path,omitempty" yaml:"include_file_path,omitempty"` + IncludeFileNameResolved bool `mapstructure:"include_file_name_resolved,omitempty" json:"include_file_name_resolved,omitempty" yaml:"include_file_name_resolved,omitempty"` + IncludeFilePathResolved bool `mapstructure:"include_file_path_resolved,omitempty" json:"include_file_path_resolved,omitempty" yaml:"include_file_path_resolved,omitempty"` + StartAt string `mapstructure:"start_at,omitempty" json:"start_at,omitempty" yaml:"start_at,omitempty"` + FingerprintSize helper.ByteSize `mapstructure:"fingerprint_size,omitempty" json:"fingerprint_size,omitempty" yaml:"fingerprint_size,omitempty"` + MaxLogSize helper.ByteSize `mapstructure:"max_log_size,omitempty" json:"max_log_size,omitempty" yaml:"max_log_size,omitempty"` + MaxConcurrentFiles int `mapstructure:"max_concurrent_files,omitempty" json:"max_concurrent_files,omitempty" yaml:"max_concurrent_files,omitempty"` + Encoding helper.EncodingConfig `mapstructure:",squash,omitempty" json:",inline,omitempty" yaml:",inline,omitempty"` + Splitter helper.SplitterConfig `mapstructure:",squash,omitempty" json:",inline,omitempty" yaml:",inline,omitempty"` } // Build will build a file input operator from the supplied configuration @@ -120,7 +119,7 @@ func (c 
InputConfig) Build(context operator.BuildContext) ([]operator.Operator, } // Ensure that multiline is buildable - _, err = c.Multiline.Build(encoding.Encoding, false, nil) + _, err = c.Splitter.Build(encoding.Encoding, false) if err != nil { return nil, err } @@ -159,14 +158,13 @@ func (c InputConfig) Build(context operator.BuildContext) ([]operator.Operator, InputOperator: inputOperator, Include: c.Include, Exclude: c.Exclude, - Multiline: c.Multiline, PollInterval: c.PollInterval.Raw(), FilePathField: filePathField, FileNameField: fileNameField, FilePathResolvedField: filePathResolvedField, FileNameResolvedField: fileNameResolvedField, startAtBeginning: startAtBeginning, - ForceFlush: c.ForceFlush, + Splitter: c.Splitter, queuedMatches: make([]string, 0), encoding: encoding, firstCheck: true, diff --git a/operator/builtin/input/file/config_test.go b/operator/builtin/input/file/config_test.go index 88396682..f9726234 100644 --- a/operator/builtin/input/file/config_test.go +++ b/operator/builtin/input/file/config_test.go @@ -389,9 +389,9 @@ func TestUnmarshal(t *testing.T) { ExpectErr: false, Expect: func() *InputConfig { cfg := defaultCfg() - newMulti := helper.MultilineConfig{} - newMulti.LineStartPattern = "Start" - cfg.Multiline = newMulti + newSplit := helper.NewSplitterConfig() + newSplit.Multiline.LineStartPattern = "Start" + cfg.Splitter = newSplit return cfg }(), }, @@ -400,9 +400,9 @@ func TestUnmarshal(t *testing.T) { ExpectErr: false, Expect: func() *InputConfig { cfg := defaultCfg() - newMulti := helper.MultilineConfig{} - newMulti.LineStartPattern = "%" - cfg.Multiline = newMulti + newSplit := helper.NewSplitterConfig() + newSplit.Multiline.LineStartPattern = "%" + cfg.Splitter = newSplit return cfg }(), }, @@ -411,9 +411,9 @@ func TestUnmarshal(t *testing.T) { ExpectErr: false, Expect: func() *InputConfig { cfg := defaultCfg() - newMulti := helper.MultilineConfig{} - newMulti.LineEndPattern = "Start" - cfg.Multiline = newMulti + newSplit := 
helper.NewSplitterConfig() + newSplit.Multiline.LineEndPattern = "Start" + cfg.Splitter = newSplit return cfg }(), }, @@ -422,9 +422,9 @@ func TestUnmarshal(t *testing.T) { ExpectErr: false, Expect: func() *InputConfig { cfg := defaultCfg() - newMulti := helper.MultilineConfig{} - newMulti.LineEndPattern = "%" - cfg.Multiline = newMulti + newSplit := helper.NewSplitterConfig() + newSplit.Multiline.LineEndPattern = "%" + cfg.Splitter = newSplit return cfg }(), }, @@ -564,7 +564,8 @@ func TestBuild(t *testing.T) { { "MultilineConfiguredStartAndEndPatterns", func(f *InputConfig) { - f.Multiline = helper.MultilineConfig{ + f.Splitter = helper.NewSplitterConfig() + f.Splitter.Multiline = helper.MultilineConfig{ LineEndPattern: "Exists", LineStartPattern: "Exists", } @@ -575,7 +576,8 @@ func TestBuild(t *testing.T) { { "MultilineConfiguredStartPattern", func(f *InputConfig) { - f.Multiline = helper.MultilineConfig{ + f.Splitter = helper.NewSplitterConfig() + f.Splitter.Multiline = helper.MultilineConfig{ LineStartPattern: "START.*", } }, @@ -585,7 +587,8 @@ func TestBuild(t *testing.T) { { "MultilineConfiguredEndPattern", func(f *InputConfig) { - f.Multiline = helper.MultilineConfig{ + f.Splitter = helper.NewSplitterConfig() + f.Splitter.Multiline = helper.MultilineConfig{ LineEndPattern: "END.*", } }, @@ -603,7 +606,8 @@ func TestBuild(t *testing.T) { { "LineStartAndEnd", func(f *InputConfig) { - f.Multiline = helper.MultilineConfig{ + f.Splitter = helper.NewSplitterConfig() + f.Splitter.Multiline = helper.MultilineConfig{ LineStartPattern: ".*", LineEndPattern: ".*", } @@ -614,7 +618,8 @@ func TestBuild(t *testing.T) { { "NoLineStartOrEnd", func(f *InputConfig) { - f.Multiline = helper.MultilineConfig{} + f.Splitter = helper.NewSplitterConfig() + f.Splitter.Multiline = helper.MultilineConfig{} }, require.NoError, func(t *testing.T, f *InputOperator) {}, @@ -622,7 +627,8 @@ func TestBuild(t *testing.T) { { "InvalidLineStartRegex", func(f *InputConfig) { - f.Multiline = 
helper.MultilineConfig{ + f.Splitter = helper.NewSplitterConfig() + f.Splitter.Multiline = helper.MultilineConfig{ LineStartPattern: "(", } }, @@ -632,7 +638,8 @@ func TestBuild(t *testing.T) { { "InvalidLineEndRegex", func(f *InputConfig) { - f.Multiline = helper.MultilineConfig{ + f.Splitter = helper.NewSplitterConfig() + f.Splitter.Multiline = helper.MultilineConfig{ LineEndPattern: "(", } }, @@ -673,7 +680,8 @@ func NewTestInputConfig() *InputConfig { cfg.WriteTo = entry.NewBodyField([]string{}...) cfg.Include = []string{"i1", "i2"} cfg.Exclude = []string{"e1", "e2"} - cfg.Multiline = helper.MultilineConfig{ + cfg.Splitter = helper.NewSplitterConfig() + cfg.Splitter.Multiline = helper.MultilineConfig{ LineStartPattern: "start", LineEndPattern: "end", } @@ -695,8 +703,8 @@ func TestMapStructureDecodeConfigWithHook(t *testing.T) { "exclude": expect.Exclude, "poll_interval": 0.2, "multiline": map[string]interface{}{ - "line_start_pattern": expect.Multiline.LineStartPattern, - "line_end_pattern": expect.Multiline.LineEndPattern, + "line_start_pattern": expect.Splitter.Multiline.LineStartPattern, + "line_end_pattern": expect.Splitter.Multiline.LineEndPattern, }, "include_file_name": true, "include_file_path": false, @@ -731,8 +739,8 @@ func TestMapStructureDecodeConfig(t *testing.T) { "Duration": 200 * 1000 * 1000, }, "multiline": map[string]interface{}{ - "line_start_pattern": expect.Multiline.LineStartPattern, - "line_end_pattern": expect.Multiline.LineEndPattern, + "line_start_pattern": expect.Splitter.Multiline.LineStartPattern, + "line_end_pattern": expect.Splitter.Multiline.LineEndPattern, }, "include_file_name": true, "include_file_path": false, diff --git a/operator/builtin/input/file/file.go b/operator/builtin/input/file/file.go index 7d12cd93..b864f6e2 100644 --- a/operator/builtin/input/file/file.go +++ b/operator/builtin/input/file/file.go @@ -15,7 +15,6 @@ package file import ( - "bufio" "bytes" "context" "encoding/json" @@ -44,8 +43,7 @@ type 
InputOperator struct { FilePathResolvedField entry.Field FileNameResolvedField entry.Field PollInterval time.Duration - Multiline helper.MultilineConfig - ForceFlush helper.ForceFlushConfig + Splitter helper.SplitterConfig MaxLogSize int MaxConcurrentFiles int SeenPaths map[string]struct{} @@ -324,11 +322,11 @@ func (f *InputOperator) newReader(file *os.File, fp *Fingerprint, firstCheck boo } // If we don't match any previously known files, create a new reader from scratch - splitFunc, forceFlush, err := f.getMultiline() + splitter, err := f.getMultiline() if err != nil { return nil, err } - newReader, err := f.NewReader(file.Name(), file, fp, splitFunc, forceFlush) + newReader, err := f.NewReader(file.Name(), file, fp, splitter) if err != nil { return nil, err } @@ -398,11 +396,11 @@ func (f *InputOperator) loadLastPollFiles(ctx context.Context) error { // Decode each of the known files f.knownFiles = make([]*Reader, 0, knownFileCount) for i := 0; i < knownFileCount; i++ { - splitFunc, forceFlush, err := f.getMultiline() + splitter, err := f.getMultiline() if err != nil { return err } - newReader, err := f.NewReader("", nil, nil, splitFunc, forceFlush) + newReader, err := f.NewReader("", nil, nil, splitter) if err != nil { return err } @@ -415,10 +413,7 @@ func (f *InputOperator) loadLastPollFiles(ctx context.Context) error { return nil } -// getMultiline returns splitFunc, related ForceFlush structure and error eventually -func (f *InputOperator) getMultiline() (bufio.SplitFunc, *helper.ForceFlush, error) { - force := f.ForceFlush.Build() - splitFunc, err := f.Multiline.Build(f.encoding.Encoding, false, force) - - return splitFunc, force, err +// getMultiline returns helper.Splitter structure and error eventually +func (f *InputOperator) getMultiline() (*helper.Splitter, error) { + return f.Splitter.Build(f.encoding.Encoding, false) } diff --git a/operator/builtin/input/file/file_test.go b/operator/builtin/input/file/file_test.go index 235f9aa5..cc0b9fe2 100644 
--- a/operator/builtin/input/file/file_test.go +++ b/operator/builtin/input/file/file_test.go @@ -297,8 +297,8 @@ func TestStartAtEndNewFile(t *testing.T) { func TestNoNewline(t *testing.T) { t.Parallel() operator, logReceived, tempDir := newTestFileOperator(t, func(cfg *InputConfig) { - cfg.ForceFlush = helper.NewForceFlushConfig() - cfg.ForceFlush.Period.Duration = time.Nanosecond + cfg.Splitter = helper.NewSplitterConfig() + cfg.Splitter.Flusher.Period.Duration = time.Nanosecond }, nil) temp := openTemp(t, tempDir) @@ -628,10 +628,10 @@ func TestFileReader_FingerprintUpdated(t *testing.T) { fp, err := operator.NewFingerprint(temp) require.NoError(t, err) - splitFunc, forceFlush, err := operator.getMultiline() + splitter, err := operator.getMultiline() require.NoError(t, err) - reader, err := operator.NewReader(temp.Name(), tempCopy, fp, splitFunc, forceFlush) + reader, err := operator.NewReader(temp.Name(), tempCopy, fp, splitter) require.NoError(t, err) defer reader.Close() @@ -672,10 +672,10 @@ func TestFingerprintGrowsAndStops(t *testing.T) { require.NoError(t, err) require.Equal(t, []byte(""), fp.FirstBytes) - splitFunc, forceFlush, err := operator.getMultiline() + splitter, err := operator.getMultiline() require.NoError(t, err) - reader, err := operator.NewReader(temp.Name(), tempCopy, fp, splitFunc, forceFlush) + reader, err := operator.NewReader(temp.Name(), tempCopy, fp, splitter) require.NoError(t, err) defer reader.Close() diff --git a/operator/builtin/input/file/reader.go b/operator/builtin/input/file/reader.go index ccf502ab..2bb9efdf 100644 --- a/operator/builtin/input/file/reader.go +++ b/operator/builtin/input/file/reader.go @@ -71,14 +71,13 @@ type Reader struct { decoder *encoding.Decoder decodeBuffer []byte - splitFunc bufio.SplitFunc - forceFlush *helper.ForceFlush + splitter *helper.Splitter *zap.SugaredLogger `json:"-"` } // NewReader creates a new file reader -func (f *InputOperator) NewReader(path string, file *os.File, fp *Fingerprint, 
splitFunc bufio.SplitFunc, force *helper.ForceFlush) (*Reader, error) { +func (f *InputOperator) NewReader(path string, file *os.File, fp *Fingerprint, splitter *helper.Splitter) (*Reader, error) { r := &Reader{ Fingerprint: fp, file: file, @@ -87,15 +86,14 @@ func (f *InputOperator) NewReader(path string, file *os.File, fp *Fingerprint, s decoder: f.encoding.Encoding.NewDecoder(), decodeBuffer: make([]byte, 1<<12), fileAttributes: f.resolveFileAttributes(path), - splitFunc: splitFunc, - forceFlush: force, + splitter: splitter, } return r, nil } // Copy creates a deep copy of a Reader func (r *Reader) Copy(file *os.File) (*Reader, error) { - reader, err := r.fileInput.NewReader(r.fileAttributes.Path, file, r.Fingerprint.Copy(), r.splitFunc, r.forceFlush) + reader, err := r.fileInput.NewReader(r.fileAttributes.Path, file, r.Fingerprint.Copy(), r.splitter) if err != nil { return nil, err } @@ -122,7 +120,7 @@ func (r *Reader) ReadToEnd(ctx context.Context) { return } - scanner := NewPositionalScanner(r, r.fileInput.MaxLogSize, r.Offset, r.splitFunc) + scanner := NewPositionalScanner(r, r.fileInput.MaxLogSize, r.Offset, r.splitter.SplitFunc) // Iterate over the tokenized file, emitting entries as we go for { @@ -139,11 +137,11 @@ func (r *Reader) ReadToEnd(ctx context.Context) { } // Force flush eventually in next iteration - r.forceFlush.CheckAndFlush() + r.splitter.CheckAndFlush() break } // Update information about last flush time - r.forceFlush.Flushed() + r.splitter.Flushed() if err := r.emit(ctx, scanner.Bytes()); err != nil { r.Error("Failed to emit entry", zap.Error(err)) diff --git a/operator/helper/multiline.go b/operator/helper/multiline.go index 250e06aa..ff1daf4d 100644 --- a/operator/helper/multiline.go +++ b/operator/helper/multiline.go @@ -24,25 +24,30 @@ import ( "golang.org/x/text/encoding" ) -type ForceFlushConfig struct { +// FlusherConfig is a configuration of Flusher helper +type FlusherConfig struct { Period Duration 
`mapstructure:"force_flush_period" json:"force_flush_period" yaml:"force_flush_period"` } -// NewBasicConfig creates a new Multiline config -func NewForceFlushConfig() ForceFlushConfig { - return ForceFlushConfig{ +// NewFlusherConfig creates a default Flusher config +func NewFlusherConfig() FlusherConfig { + return FlusherConfig{ // Empty or `0s` means that we will never force flush Period: Duration{Duration: 0}, } } -func (c *ForceFlushConfig) Build() *ForceFlush { - return NewForceFlush(c.Period) +// Build creates Flusher from configuration +func (c *FlusherConfig) Build() *Flusher { + return NewFlusher(c.Period) } -// ForceFlush keeps information about force flush state -type ForceFlush struct { - force bool +// Flusher keeps information about flush state +type Flusher struct { + // force is true when data should be flushed as soon as possible + force bool + // forcePeriod defines time from last flush which should pass before setting force to true. + // Never forces if forcePeriod is set to 0 forcePeriod time.Duration // lastFlush > lastForcedFlush => we can force flush if no logs are incoming for forcePeriod @@ -52,10 +57,10 @@ type ForceFlush struct { lastForcedFlush time.Time } -// NewForceFlush Creates new ForceFlush with lastFlush set to unix epoch +// NewFlusher Creates new Flusher with lastFlush set to unix epoch // and order to not force ongoing flush -func NewForceFlush(forcePeriod Duration) *ForceFlush { - return &ForceFlush{ +func NewFlusher(forcePeriod Duration) *Flusher { + return &Flusher{ force: false, lastFlush: time.Now(), forcePeriod: forcePeriod.Raw(), @@ -64,36 +69,36 @@ func NewForceFlush(forcePeriod Duration) *ForceFlush { } // Flushed update lastFlush with current timestamp -func (ff *ForceFlush) Flushed() { - if ff.lastFlush.Sub(ff.lastForcedFlush) < 0 { - ff.lastFlush = ff.lastForcedFlush +func (f *Flusher) Flushed() { + if f.lastFlush.Sub(f.lastForcedFlush) < 0 { + f.lastFlush = f.lastForcedFlush } else { - ff.lastFlush = time.Now() + 
f.lastFlush = time.Now() } } // CheckAndFlush sets internal flag to true if data is going to be force flushed -func (ff *ForceFlush) CheckAndFlush() { - if ff.forcePeriod > 0 && time.Since(ff.lastFlush) > ff.forcePeriod && ff.lastFlush.Sub(ff.lastForcedFlush) > 0 { - ff.force = true +func (f *Flusher) CheckAndFlush() { + if f.forcePeriod > 0 && time.Since(f.lastFlush) > f.forcePeriod && f.lastFlush.Sub(f.lastForcedFlush) > 0 { + f.force = true } } // ForceFlushed update struct fields after forced flush -func (ff *ForceFlush) ForceFlushed() { - ff.force = false - ff.lastForcedFlush = time.Now() +func (f *Flusher) ForceFlushed() { + f.force = false + f.lastForcedFlush = time.Now() } // ShouldFlush returns true if data should be forcefully flushed -func (ff *ForceFlush) ShouldFlush() bool { - return ff.force +func (f *Flusher) ShouldFlush() bool { + return f.force } // Multiline consists of splitFunc and variables needed to perform force flush type Multiline struct { SplitFunc bufio.SplitFunc - Force *ForceFlush + Force *Flusher } // NewBasicConfig creates a new Multiline config @@ -111,12 +116,12 @@ type MultilineConfig struct { } // Build will build a Multiline operator. 
-func (c MultilineConfig) Build(encoding encoding.Encoding, flushAtEOF bool, force *ForceFlush) (bufio.SplitFunc, error) { +func (c MultilineConfig) Build(encoding encoding.Encoding, flushAtEOF bool, force *Flusher) (bufio.SplitFunc, error) { return c.getSplitFunc(encoding, flushAtEOF, force) } // getSplitFunc returns split function for bufio.Scanner basing on configured pattern -func (c MultilineConfig) getSplitFunc(encoding encoding.Encoding, flushAtEOF bool, force *ForceFlush) (bufio.SplitFunc, error) { +func (c MultilineConfig) getSplitFunc(encoding encoding.Encoding, flushAtEOF bool, force *Flusher) (bufio.SplitFunc, error) { endPattern := c.LineEndPattern startPattern := c.LineStartPattern @@ -144,7 +149,7 @@ func (c MultilineConfig) getSplitFunc(encoding encoding.Encoding, flushAtEOF boo // NewLineStartSplitFunc creates a bufio.SplitFunc that splits an incoming stream into // tokens that start with a match to the regex pattern provided -func NewLineStartSplitFunc(re *regexp.Regexp, flushAtEOF bool, force *ForceFlush) bufio.SplitFunc { +func NewLineStartSplitFunc(re *regexp.Regexp, flushAtEOF bool, force *Flusher) bufio.SplitFunc { return func(data []byte, atEOF bool) (advance int, token []byte, err error) { if force != nil && force.ShouldFlush() { force.ForceFlushed() @@ -174,7 +179,7 @@ func NewLineStartSplitFunc(re *regexp.Regexp, flushAtEOF bool, force *ForceFlush } if firstMatchEnd == len(data) { - // the first match goes to the end of the buffer, so don't look for a second match + // the first match goes to the end of the bufer, so don't look for a second match return 0, nil, nil } @@ -185,12 +190,12 @@ func NewLineStartSplitFunc(re *regexp.Regexp, flushAtEOF bool, force *ForceFlush return } - secondLocOffset := firstMatchEnd + 1 - secondLoc := re.FindIndex(data[secondLocOffset:]) + secondLocOfset := firstMatchEnd + 1 + secondLoc := re.FindIndex(data[secondLocOfset:]) if secondLoc == nil { return 0, nil, nil // read more data and try again } - 
secondMatchStart := secondLoc[0] + secondLocOffset + secondMatchStart := secondLoc[0] + secondLocOfset advance = secondMatchStart // start scanning at the beginning of the second match token = trimWhitespaces(data[firstMatchStart:secondMatchStart]) // the token begins at the first match, and ends at the beginning of the second match @@ -201,7 +206,7 @@ func NewLineStartSplitFunc(re *regexp.Regexp, flushAtEOF bool, force *ForceFlush // NewLineEndSplitFunc creates a bufio.SplitFunc that splits an incoming stream into // tokens that end with a match to the regex pattern provided -func NewLineEndSplitFunc(re *regexp.Regexp, flushAtEOF bool, force *ForceFlush) bufio.SplitFunc { +func NewLineEndSplitFunc(re *regexp.Regexp, flushAtEOF bool, force *Flusher) bufio.SplitFunc { return func(data []byte, atEOF bool) (advance int, token []byte, err error) { if force != nil && force.ShouldFlush() { force.ForceFlushed() @@ -220,7 +225,7 @@ func NewLineEndSplitFunc(re *regexp.Regexp, flushAtEOF bool, force *ForceFlush) return 0, nil, nil // read more data and try again } - // If the match goes up to the end of the current buffer, do another + // If the match goes up to the end of the current bufer, do another // read until we can capture the entire match if loc[1] == len(data)-1 && !atEOF { return 0, nil, nil @@ -235,7 +240,7 @@ func NewLineEndSplitFunc(re *regexp.Regexp, flushAtEOF bool, force *ForceFlush) // NewNewlineSplitFunc splits log lines by newline, just as bufio.ScanLines, but // never returning an token using EOF as a terminator -func NewNewlineSplitFunc(encoding encoding.Encoding, flushAtEOF bool, force *ForceFlush) (bufio.SplitFunc, error) { +func NewNewlineSplitFunc(encoding encoding.Encoding, flushAtEOF bool, force *Flusher) (bufio.SplitFunc, error) { newline, err := encodedNewline(encoding) if err != nil { return nil, err @@ -292,3 +297,49 @@ func trimWhitespaces(data []byte) []byte { // TrimRight to strip all whitespaces from the end of log return 
bytes.TrimLeft(bytes.TrimRight(data, "\r\n\t "), "\r\n") } + +// SplitterConfig consolidates MultilineConfig and FlusherConfig +type SplitterConfig struct { + Multiline MultilineConfig `mapstructure:"multiline,omitempty" json:"multiline,omitempty" yaml:"multiline,omitempty"` + Flusher FlusherConfig `mapstructure:",squash,omitempty" json:",inline,omitempty" yaml:",inline,omitempty"` +} + +// NewSplitterConfig returns default SplitterConfig +func NewSplitterConfig() SplitterConfig { + return SplitterConfig{ + Multiline: NewMultilineConfig(), + Flusher: NewFlusherConfig(), + } +} + +// Build builds Splitter struct +func (c *SplitterConfig) Build(encoding encoding.Encoding, flushAtEOF bool) (*Splitter, error) { + flusher := c.Flusher.Build() + splitFunc, err := c.Multiline.Build(encoding, flushAtEOF, flusher) + + if err != nil { + return nil, err + } + + return &Splitter{ + Flusher: flusher, + SplitFunc: splitFunc, + }, nil +} + +// Splitter consolidates Flusher and dependent splitFunc +type Splitter struct { + SplitFunc bufio.SplitFunc + Flusher *Flusher +} + +// Flushed informs Flusher that Flushed had been performed +func (s *Splitter) Flushed() { + s.Flusher.Flushed() +} + +// CheckAndFlush instructs Flusher to check if next log should be forcefully flushed +// and set appropriate flags if yes +func (s *Splitter) CheckAndFlush() { + s.Flusher.CheckAndFlush() +} diff --git a/operator/helper/multiline_test.go b/operator/helper/multiline_test.go index 11741a47..4bc7b6fc 100644 --- a/operator/helper/multiline_test.go +++ b/operator/helper/multiline_test.go @@ -34,7 +34,7 @@ type tokenizerTestCase struct { Raw []byte ExpectedTokenized []string ExpectedError error - ForceFlush *ForceFlush + Flusher *Flusher } func (tc tokenizerTestCase) RunFunc(splitFunc bufio.SplitFunc) func(t *testing.T) { @@ -146,23 +146,23 @@ func TestLineStartSplitFunc(t *testing.T) { }, }, { - Name: "LogsWithoutForceFlush", + Name: "LogsWithoutFlusher", Pattern: `^LOGSTART \d+`, Raw: 
[]byte("LOGPART log1\nLOGPART log1\t \n"), ExpectedTokenized: []string{}, - ForceFlush: &ForceFlush{ + Flusher: &Flusher{ force: false, lastForcedFlush: time.Now(), }, }, { - Name: "LogsWithForceFlush", + Name: "LogsWithFlusher", Pattern: `^LOGSTART \d+`, Raw: []byte("LOGPART log1\nLOGPART log1\t \n"), ExpectedTokenized: []string{ "LOGPART log1\nLOGPART log1", }, - ForceFlush: &ForceFlush{ + Flusher: &Flusher{ force: true, lastForcedFlush: time.Now(), }, @@ -174,7 +174,7 @@ func TestLineStartSplitFunc(t *testing.T) { LineStartPattern: tc.Pattern, } - splitFunc, err := cfg.getSplitFunc(unicode.UTF8, false, tc.ForceFlush) + splitFunc, err := cfg.getSplitFunc(unicode.UTF8, false, tc.Flusher) require.NoError(t, err) t.Run(tc.Name, tc.RunFunc(splitFunc)) } @@ -286,23 +286,23 @@ func TestLineEndSplitFunc(t *testing.T) { }, }, { - Name: "LogsWithoutForceFlush", + Name: "LogsWithoutFlusher", Pattern: `^LOGEND.*$`, Raw: []byte("LOGPART log1\nLOGPART log1\t \n"), ExpectedTokenized: []string{}, - ForceFlush: &ForceFlush{ + Flusher: &Flusher{ force: false, lastForcedFlush: time.Now(), }, }, { - Name: "LogsWithForceFlush", + Name: "LogsWithFlusher", Pattern: `^LOGEND.*$`, Raw: []byte("LOGPART log1\nLOGPART log1\t \n"), ExpectedTokenized: []string{ "LOGPART log1\nLOGPART log1", }, - ForceFlush: &ForceFlush{ + Flusher: &Flusher{ force: true, lastForcedFlush: time.Now(), }, @@ -314,7 +314,7 @@ func TestLineEndSplitFunc(t *testing.T) { LineEndPattern: tc.Pattern, } - splitFunc, err := cfg.getSplitFunc(unicode.UTF8, false, tc.ForceFlush) + splitFunc, err := cfg.getSplitFunc(unicode.UTF8, false, tc.Flusher) require.NoError(t, err) t.Run(tc.Name, tc.RunFunc(splitFunc)) } @@ -390,23 +390,23 @@ func TestNewlineSplitFunc(t *testing.T) { ExpectedError: errors.New("bufio.Scanner: token too long"), }, { - Name: "LogsWithoutForceFlush", + Name: "LogsWithoutFlusher", Pattern: `^LOGEND.*$`, Raw: []byte("LOGPART log1"), ExpectedTokenized: []string{}, - ForceFlush: &ForceFlush{ + Flusher: 
&Flusher{ force: false, lastForcedFlush: time.Now(), }, }, { - Name: "LogsWithForceFlush", + Name: "LogsWithFlusher", Pattern: `^LOGEND.*$`, Raw: []byte("LOGPART log1"), ExpectedTokenized: []string{ "LOGPART log1", }, - ForceFlush: &ForceFlush{ + Flusher: &Flusher{ force: true, lastForcedFlush: time.Now(), }, @@ -414,7 +414,7 @@ } for _, tc := range testCases { - splitFunc, err := NewNewlineSplitFunc(unicode.UTF8, false, tc.ForceFlush) + splitFunc, err := NewNewlineSplitFunc(unicode.UTF8, false, tc.Flusher) require.NoError(t, err) t.Run(tc.Name, tc.RunFunc(splitFunc)) } From b2faa2c73d7c8d6e92985530dec4c33a44ba68ae Mon Sep 17 00:00:00 2001 From: Dominik Rosiek Date: Wed, 14 Jul 2021 14:55:27 +0200 Subject: [PATCH 15/15] docs(file_input): add force_flush_period option Signed-off-by: Dominik Rosiek --- docs/operators/file_input.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/operators/file_input.md b/docs/operators/file_input.md index 21f31c9b..c3a1c605 100644 --- a/docs/operators/file_input.md +++ b/docs/operators/file_input.md @@ -12,6 +12,7 @@ The `file_input` operator reads logs from files. It will place the lines read in | `exclude` | [] | A list of file glob patterns to exclude from reading | | `poll_interval` | 200ms | The duration between filesystem polls | | `multiline` | | A `multiline` configuration block. See below for details | +| `force_flush_period` | `0s` | Time since last read of data from file, after which currently buffered log should be sent to the pipeline. Takes [duration](../types/duration.md) as value. Zero means waiting for new data forever | | `write_to` | `$body` | The body [field](/docs/types/field.md) written to when creating a new log entry | | `encoding` | `utf-8` | The encoding of the file being read. 
See the list of supported encodings below for available options | | `include_file_name` | `true` | Whether to add the file name as the attribute `file.name` | @@ -39,8 +40,7 @@ match either the beginning of a new log entry, or the end of a log entry. If using multiline, last log can sometimes be not flushed due to waiting for more content. In order to forcefully flush last buffered log after certain period of time, -set `force_flush_period` option to [duration string](https://golang.org/pkg/time/#ParseDuration), -eg: `5s`, `1m`. It's by default `0s` which means, that no force flushing will be performed. +use the `force_flush_period` option. Also refer to [recombine](/docs/operators/recombine.md) operator for merging events with greater control.