diff --git a/docs/operators/file_input.md b/docs/operators/file_input.md index c4f56066..c3a1c605 100644 --- a/docs/operators/file_input.md +++ b/docs/operators/file_input.md @@ -12,6 +12,7 @@ The `file_input` operator reads logs from files. It will place the lines read in | `exclude` | [] | A list of file glob patterns to exclude from reading | | `poll_interval` | 200ms | The duration between filesystem polls | | `multiline` | | A `multiline` configuration block. See below for details | +| `force_flush_period` | `0s` | The time since the last read of data from the file after which the currently buffered log is sent to the pipeline. Takes a [duration](../types/duration.md) as its value. Zero means wait for new data forever | | `write_to` | `$body` | The body [field](/docs/types/field.md) written to when creating a new log entry | | `encoding` | `utf-8` | The encoding of the file being read. See the list of supported encodings below for available options | | `include_file_name` | `true` | Whether to add the file name as the attribute `file.name` | @@ -37,7 +38,11 @@ If set, the `multiline` configuration block instructs the `file_input` operator The `multiline` configuration block must contain exactly one of `line_start_pattern` or `line_end_pattern`. These are regex patterns that match either the beginning of a new log entry, or the end of a log entry. -Also refer to [recombine](/docs/operators/recombine.md) operator for merging events with greater control. +When using multiline, the last log entry can sometimes remain unflushed because the operator is waiting for more content. +To forcefully flush the last buffered log entry after a certain period of time, +use the `force_flush_period` option. + +Also refer to the [recombine](/docs/operators/recombine.md) operator for merging events with greater control.
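For reviewers, a minimal usage sketch of the new option (not part of the patch); the include path and start pattern are hypothetical:

```yaml
- type: file_input
  include:
    - ./app.log                                # hypothetical path
  multiline:
    line_start_pattern: '^\d{4}-\d{2}-\d{2} '  # hypothetical start-of-entry regex
  # Without force_flush_period, a trailing partial entry stays buffered until
  # the next line_start_pattern match arrives; with it, the entry is flushed
  # after 5s of no new data.
  force_flush_period: 5s
```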
### File rotation diff --git a/operator/builtin/input/file/config.go b/operator/builtin/input/file/config.go index 23d27604..29d8bcbe 100644 --- a/operator/builtin/input/file/config.go +++ b/operator/builtin/input/file/config.go @@ -43,6 +43,7 @@ func NewInputConfig(operatorID string) *InputConfig { IncludeFilePath: false, IncludeFileNameResolved: false, IncludeFilePathResolved: false, + Splitter: helper.NewSplitterConfig(), StartAt: "end", FingerprintSize: defaultFingerprintSize, MaxLogSize: defaultMaxLogSize, @@ -58,17 +59,17 @@ type InputConfig struct { Include []string `mapstructure:"include,omitempty" json:"include,omitempty" yaml:"include,omitempty"` Exclude []string `mapstructure:"exclude,omitempty" json:"exclude,omitempty" yaml:"exclude,omitempty"` - PollInterval helper.Duration `mapstructure:"poll_interval,omitempty" json:"poll_interval,omitempty" yaml:"poll_interval,omitempty"` - Multiline helper.MultilineConfig `mapstructure:"multiline,omitempty" json:"multiline,omitempty" yaml:"multiline,omitempty"` - IncludeFileName bool `mapstructure:"include_file_name,omitempty" json:"include_file_name,omitempty" yaml:"include_file_name,omitempty"` - IncludeFilePath bool `mapstructure:"include_file_path,omitempty" json:"include_file_path,omitempty" yaml:"include_file_path,omitempty"` - IncludeFileNameResolved bool `mapstructure:"include_file_name_resolved,omitempty" json:"include_file_name_resolved,omitempty" yaml:"include_file_name_resolved,omitempty"` - IncludeFilePathResolved bool `mapstructure:"include_file_path_resolved,omitempty" json:"include_file_path_resolved,omitempty" yaml:"include_file_path_resolved,omitempty"` - StartAt string `mapstructure:"start_at,omitempty" json:"start_at,omitempty" yaml:"start_at,omitempty"` - FingerprintSize helper.ByteSize `mapstructure:"fingerprint_size,omitempty" json:"fingerprint_size,omitempty" yaml:"fingerprint_size,omitempty"` - MaxLogSize helper.ByteSize `mapstructure:"max_log_size,omitempty" json:"max_log_size,omitempty" yaml:"max_log_size,omitempty"` - MaxConcurrentFiles int `mapstructure:"max_concurrent_files,omitempty" json:"max_concurrent_files,omitempty" yaml:"max_concurrent_files,omitempty"` - Encoding helper.EncodingConfig `mapstructure:",squash,omitempty" json:",inline,omitempty" yaml:",inline,omitempty"` + PollInterval helper.Duration `mapstructure:"poll_interval,omitempty" json:"poll_interval,omitempty" yaml:"poll_interval,omitempty"` + IncludeFileName bool `mapstructure:"include_file_name,omitempty" json:"include_file_name,omitempty" yaml:"include_file_name,omitempty"` + IncludeFilePath bool `mapstructure:"include_file_path,omitempty" json:"include_file_path,omitempty" yaml:"include_file_path,omitempty"` + IncludeFileNameResolved bool `mapstructure:"include_file_name_resolved,omitempty" json:"include_file_name_resolved,omitempty" yaml:"include_file_name_resolved,omitempty"` + IncludeFilePathResolved bool `mapstructure:"include_file_path_resolved,omitempty" json:"include_file_path_resolved,omitempty" yaml:"include_file_path_resolved,omitempty"` + StartAt string `mapstructure:"start_at,omitempty" json:"start_at,omitempty" yaml:"start_at,omitempty"` + FingerprintSize helper.ByteSize `mapstructure:"fingerprint_size,omitempty" json:"fingerprint_size,omitempty" yaml:"fingerprint_size,omitempty"` + MaxLogSize helper.ByteSize `mapstructure:"max_log_size,omitempty" json:"max_log_size,omitempty" yaml:"max_log_size,omitempty"` + MaxConcurrentFiles int `mapstructure:"max_concurrent_files,omitempty" json:"max_concurrent_files,omitempty" 
yaml:"max_concurrent_files,omitempty"` + Encoding helper.EncodingConfig `mapstructure:",squash,omitempty" json:",inline,omitempty" yaml:",inline,omitempty"` + Splitter helper.SplitterConfig `mapstructure:",squash,omitempty" json:",inline,omitempty" yaml:",inline,omitempty"` } // Build will build a file input operator from the supplied configuration @@ -117,7 +118,8 @@ func (c InputConfig) Build(context operator.BuildContext) ([]operator.Operator, return nil, err } - splitFunc, err := c.Multiline.Build(context, encoding.Encoding, false) + // Ensure that multiline is buildable + _, err = c.Splitter.Build(encoding.Encoding, false) if err != nil { return nil, err } @@ -156,13 +158,13 @@ func (c InputConfig) Build(context operator.BuildContext) ([]operator.Operator, InputOperator: inputOperator, Include: c.Include, Exclude: c.Exclude, - SplitFunc: splitFunc, PollInterval: c.PollInterval.Raw(), FilePathField: filePathField, FileNameField: fileNameField, FilePathResolvedField: filePathResolvedField, FileNameResolvedField: fileNameResolvedField, startAtBeginning: startAtBeginning, + Splitter: c.Splitter, queuedMatches: make([]string, 0), encoding: encoding, firstCheck: true, diff --git a/operator/builtin/input/file/config_test.go b/operator/builtin/input/file/config_test.go index 88396682..f9726234 100644 --- a/operator/builtin/input/file/config_test.go +++ b/operator/builtin/input/file/config_test.go @@ -389,9 +389,9 @@ func TestUnmarshal(t *testing.T) { ExpectErr: false, Expect: func() *InputConfig { cfg := defaultCfg() - newMulti := helper.MultilineConfig{} - newMulti.LineStartPattern = "Start" - cfg.Multiline = newMulti + newSplit := helper.NewSplitterConfig() + newSplit.Multiline.LineStartPattern = "Start" + cfg.Splitter = newSplit return cfg }(), }, @@ -400,9 +400,9 @@ func TestUnmarshal(t *testing.T) { ExpectErr: false, Expect: func() *InputConfig { cfg := defaultCfg() - newMulti := helper.MultilineConfig{} - newMulti.LineStartPattern = "%" - cfg.Multiline = newMulti + newSplit := helper.NewSplitterConfig() + newSplit.Multiline.LineStartPattern = "%" + cfg.Splitter = newSplit return cfg }(), }, @@ -411,9 +411,9 @@ func TestUnmarshal(t *testing.T) { ExpectErr: false, Expect: func() *InputConfig { cfg := defaultCfg() - newMulti := helper.MultilineConfig{} - newMulti.LineEndPattern = "Start" - cfg.Multiline = newMulti + newSplit := helper.NewSplitterConfig() + newSplit.Multiline.LineEndPattern = "Start" + cfg.Splitter = newSplit return cfg }(), }, @@ -422,9 +422,9 @@ func TestUnmarshal(t *testing.T) { ExpectErr: false, Expect: func() *InputConfig { cfg := defaultCfg() - newMulti := helper.MultilineConfig{} - newMulti.LineEndPattern = "%" - cfg.Multiline = newMulti + newSplit := helper.NewSplitterConfig() + newSplit.Multiline.LineEndPattern = "%" + cfg.Splitter = newSplit return cfg }(), }, @@ -564,7 +564,8 @@ func TestBuild(t *testing.T) { { "MultilineConfiguredStartAndEndPatterns", func(f *InputConfig) { - f.Multiline = helper.MultilineConfig{ + f.Splitter = helper.NewSplitterConfig() + f.Splitter.Multiline = helper.MultilineConfig{ LineEndPattern: "Exists", LineStartPattern: "Exists", } @@ -575,7 +576,8 @@ func TestBuild(t *testing.T) { { "MultilineConfiguredStartPattern", func(f *InputConfig) { - f.Multiline = helper.MultilineConfig{ + f.Splitter = helper.NewSplitterConfig() + f.Splitter.Multiline = helper.MultilineConfig{ LineStartPattern: "START.*", } }, @@ -585,7 +587,8 @@ func TestBuild(t *testing.T) { { "MultilineConfiguredEndPattern", func(f *InputConfig) { - f.Multiline = 
helper.MultilineConfig{ + f.Splitter = helper.NewSplitterConfig() + f.Splitter.Multiline = helper.MultilineConfig{ LineEndPattern: "END.*", } }, @@ -603,7 +606,8 @@ func TestBuild(t *testing.T) { { "LineStartAndEnd", func(f *InputConfig) { - f.Multiline = helper.MultilineConfig{ + f.Splitter = helper.NewSplitterConfig() + f.Splitter.Multiline = helper.MultilineConfig{ LineStartPattern: ".*", LineEndPattern: ".*", } @@ -614,7 +618,8 @@ func TestBuild(t *testing.T) { { "NoLineStartOrEnd", func(f *InputConfig) { - f.Multiline = helper.MultilineConfig{} + f.Splitter = helper.NewSplitterConfig() + f.Splitter.Multiline = helper.MultilineConfig{} }, require.NoError, func(t *testing.T, f *InputOperator) {}, @@ -622,7 +627,8 @@ func TestBuild(t *testing.T) { { "InvalidLineStartRegex", func(f *InputConfig) { - f.Multiline = helper.MultilineConfig{ + f.Splitter = helper.NewSplitterConfig() + f.Splitter.Multiline = helper.MultilineConfig{ LineStartPattern: "(", } }, @@ -632,7 +638,8 @@ func TestBuild(t *testing.T) { { "InvalidLineEndRegex", func(f *InputConfig) { - f.Multiline = helper.MultilineConfig{ + f.Splitter = helper.NewSplitterConfig() + f.Splitter.Multiline = helper.MultilineConfig{ LineEndPattern: "(", } }, @@ -673,7 +680,8 @@ func NewTestInputConfig() *InputConfig { cfg.WriteTo = entry.NewBodyField([]string{}...) cfg.Include = []string{"i1", "i2"} cfg.Exclude = []string{"e1", "e2"} - cfg.Multiline = helper.MultilineConfig{ + cfg.Splitter = helper.NewSplitterConfig() + cfg.Splitter.Multiline = helper.MultilineConfig{ LineStartPattern: "start", LineEndPattern: "end", } @@ -695,8 +703,8 @@ func TestMapStructureDecodeConfigWithHook(t *testing.T) { "exclude": expect.Exclude, "poll_interval": 0.2, "multiline": map[string]interface{}{ - "line_start_pattern": expect.Multiline.LineStartPattern, - "line_end_pattern": expect.Multiline.LineEndPattern, + "line_start_pattern": expect.Splitter.Multiline.LineStartPattern, + "line_end_pattern": expect.Splitter.Multiline.LineEndPattern, }, "include_file_name": true, "include_file_path": false, @@ -731,8 +739,8 @@ func TestMapStructureDecodeConfig(t *testing.T) { "Duration": 200 * 1000 * 1000, }, "multiline": map[string]interface{}{ - "line_start_pattern": expect.Multiline.LineStartPattern, - "line_end_pattern": expect.Multiline.LineEndPattern, + "line_start_pattern": expect.Splitter.Multiline.LineStartPattern, + "line_end_pattern": expect.Splitter.Multiline.LineEndPattern, }, "include_file_name": true, "include_file_path": false, diff --git a/operator/builtin/input/file/file.go b/operator/builtin/input/file/file.go index b9686510..b864f6e2 100644 --- a/operator/builtin/input/file/file.go +++ b/operator/builtin/input/file/file.go @@ -15,7 +15,6 @@ package file import ( - "bufio" "bytes" "context" "encoding/json" @@ -44,7 +43,7 @@ type InputOperator struct { FilePathResolvedField entry.Field FileNameResolvedField entry.Field PollInterval time.Duration - SplitFunc bufio.SplitFunc + Splitter helper.SplitterConfig MaxLogSize int MaxConcurrentFiles int SeenPaths map[string]struct{} @@ -323,7 +322,11 @@ func (f *InputOperator) newReader(file *os.File, fp *Fingerprint, firstCheck boo } // If we don't match any previously known files, create a new reader from scratch - newReader, err := f.NewReader(file.Name(), file, fp) + splitter, err := f.getMultiline() + if err != nil { + return nil, err + } + newReader, err := f.NewReader(file.Name(), file, fp, splitter) if err != nil { return nil, err } @@ -393,7 +396,11 @@ func (f *InputOperator) loadLastPollFiles(ctx 
context.Context) error { // Decode each of the known files f.knownFiles = make([]*Reader, 0, knownFileCount) for i := 0; i < knownFileCount; i++ { - newReader, err := f.NewReader("", nil, nil) + splitter, err := f.getMultiline() + if err != nil { + return err + } + newReader, err := f.NewReader("", nil, nil, splitter) if err != nil { return err } @@ -405,3 +412,8 @@ func (f *InputOperator) loadLastPollFiles(ctx context.Context) error { return nil } + +// getMultiline builds a helper.Splitter based on the operator's splitter configuration +func (f *InputOperator) getMultiline() (*helper.Splitter, error) { + return f.Splitter.Build(f.encoding.Encoding, false) +} diff --git a/operator/builtin/input/file/file_test.go b/operator/builtin/input/file/file_test.go index 6e0e48dc..cc0b9fe2 100644 --- a/operator/builtin/input/file/file_test.go +++ b/operator/builtin/input/file/file_test.go @@ -296,8 +296,10 @@ func TestStartAtEndNewFile(t *testing.T) { // even if the file doesn't end in a newline func TestNoNewline(t *testing.T) { t.Parallel() - t.Skip() - operator, logReceived, tempDir := newTestFileOperator(t, nil, nil) + operator, logReceived, tempDir := newTestFileOperator(t, func(cfg *InputConfig) { + cfg.Splitter = helper.NewSplitterConfig() + cfg.Splitter.Flusher.Period.Duration = time.Nanosecond + }, nil) temp := openTemp(t, tempDir) writeString(t, temp, "testlog1\ntestlog2") @@ -625,7 +627,11 @@ func TestFileReader_FingerprintUpdated(t *testing.T) { tempCopy := openFile(t, temp.Name()) fp, err := operator.NewFingerprint(temp) require.NoError(t, err) - reader, err := operator.NewReader(temp.Name(), tempCopy, fp) + + splitter, err := operator.getMultiline() + require.NoError(t, err) + + reader, err := operator.NewReader(temp.Name(), tempCopy, fp, splitter) require.NoError(t, err) defer reader.Close() @@ -666,7 +672,10 @@ func TestFingerprintGrowsAndStops(t *testing.T) { require.NoError(t, err) require.Equal(t, []byte(""), fp.FirstBytes) - reader, err := operator.NewReader(temp.Name(), tempCopy, fp) + splitter, err := operator.getMultiline() + require.NoError(t, err) + + reader, err := operator.NewReader(temp.Name(), tempCopy, fp, splitter) require.NoError(t, err) defer reader.Close() diff --git a/operator/builtin/input/file/reader.go b/operator/builtin/input/file/reader.go index 90769df8..2bb9efdf 100644 --- a/operator/builtin/input/file/reader.go +++ b/operator/builtin/input/file/reader.go @@ -26,6 +26,7 @@ import ( "golang.org/x/text/transform" "github.com/open-telemetry/opentelemetry-log-collection/errors" + "github.com/open-telemetry/opentelemetry-log-collection/operator/helper" ) // File attributes contains information about file paths @@ -70,11 +71,13 @@ type Reader struct { decoder *encoding.Decoder decodeBuffer []byte + splitter *helper.Splitter + *zap.SugaredLogger `json:"-"` } // NewReader creates a new file reader -func (f *InputOperator) NewReader(path string, file *os.File, fp *Fingerprint) (*Reader, error) { +func (f *InputOperator) NewReader(path string, file *os.File, fp *Fingerprint, splitter *helper.Splitter) (*Reader, error) { r := &Reader{ Fingerprint: fp, file: file, @@ -83,13 +86,14 @@ func (f *InputOperator) NewReader(path string, file *os.File, fp *Fingerprint) ( decoder: f.encoding.Encoding.NewDecoder(), decodeBuffer: make([]byte, 1<<12), fileAttributes: f.resolveFileAttributes(path), + splitter: splitter, } return r, nil } // Copy creates a deep copy of a Reader func (r *Reader) Copy(file *os.File) (*Reader, error) { - reader, err := r.fileInput.NewReader(r.fileAttributes.Path, file,
r.Fingerprint.Copy()) + reader, err := r.fileInput.NewReader(r.fileAttributes.Path, file, r.Fingerprint.Copy(), r.splitter) if err != nil { return nil, err } @@ -116,7 +120,7 @@ func (r *Reader) ReadToEnd(ctx context.Context) { return } - scanner := NewPositionalScanner(r, r.fileInput.MaxLogSize, r.Offset, r.fileInput.SplitFunc) + scanner := NewPositionalScanner(r, r.fileInput.MaxLogSize, r.Offset, r.splitter.SplitFunc) // Iterate over the tokenized file, emitting entries as we go for { @@ -131,8 +135,13 @@ func (r *Reader) ReadToEnd(ctx context.Context) { if err := getScannerError(scanner); err != nil { r.Errorw("Failed during scan", zap.Error(err)) } + + // Mark the buffered data for a force flush in the next iteration + r.splitter.CheckAndFlush() break } + // Record the time of this flush + r.splitter.Flushed() if err := r.emit(ctx, scanner.Bytes()); err != nil { r.Error("Failed to emit entry", zap.Error(err)) diff --git a/operator/builtin/input/tcp/tcp.go b/operator/builtin/input/tcp/tcp.go index 952c29c7..3adbcd37 100644 --- a/operator/builtin/input/tcp/tcp.go +++ b/operator/builtin/input/tcp/tcp.go @@ -98,7 +98,8 @@ func (c TCPInputConfig) Build(context operator.BuildContext) ([]operator.Operato return nil, err } - splitFunc, err := c.Multiline.Build(context, encoding.Encoding, true) + // Build multiline + splitFunc, err := c.Multiline.Build(encoding.Encoding, true, nil) if err != nil { return nil, err } diff --git a/operator/builtin/input/udp/udp.go b/operator/builtin/input/udp/udp.go index 392355d7..080592ee 100644 --- a/operator/builtin/input/udp/udp.go +++ b/operator/builtin/input/udp/udp.go @@ -81,7 +81,8 @@ func (c UDPInputConfig) Build(context operator.BuildContext) ([]operator.Operato return nil, err } - splitFunc, err := c.Multiline.Build(context, encoding.Encoding, true) + // Build multiline + splitFunc, err := c.Multiline.Build(encoding.Encoding, true, nil) if err != nil { return nil, err } diff --git a/operator/helper/multiline.go b/operator/helper/multiline.go index 7aed4e2a..ff1daf4d 100644 --- a/operator/helper/multiline.go +++ b/operator/helper/multiline.go @@ -19,12 +19,88 @@ import ( "bytes" "fmt" "regexp" + "time" "golang.org/x/text/encoding" - - "github.com/open-telemetry/opentelemetry-log-collection/operator" ) +// FlusherConfig is the configuration of a Flusher helper +type FlusherConfig struct { + Period Duration `mapstructure:"force_flush_period" json:"force_flush_period" yaml:"force_flush_period"` +} + +// NewFlusherConfig creates a default Flusher config +func NewFlusherConfig() FlusherConfig { + return FlusherConfig{ + // Empty or `0s` means that we will never force flush + Period: Duration{Duration: 0}, + } +} + +// Build creates a Flusher from the configuration +func (c *FlusherConfig) Build() *Flusher { + return NewFlusher(c.Period) +} + +// Flusher keeps information about flush state +type Flusher struct { + // force is true when data should be flushed as soon as possible + force bool + // forcePeriod defines how much time must pass since the last flush before force is set to true.
+ // Never forces if forcePeriod is set to 0 + forcePeriod time.Duration + + // lastFlush > lastForcedFlush => we can force flush if no logs are incoming for forcePeriod + // lastFlush = lastForcedFlush => last flush was forced, so we cannot force, we can update lastFlush + // lastFlush < lastForcedFlush => we just forced flush, set lastFlush to lastForcedFlush + lastFlush time.Time + lastForcedFlush time.Time +} + +// NewFlusher creates a new Flusher with lastFlush set to the current time +// and lastForcedFlush set to the unix epoch, so that an ongoing flush is not forced +func NewFlusher(forcePeriod Duration) *Flusher { + return &Flusher{ + force: false, + lastFlush: time.Now(), + forcePeriod: forcePeriod.Raw(), + lastForcedFlush: time.Unix(0, 0), + } +} + +// Flushed updates lastFlush with the current timestamp +func (f *Flusher) Flushed() { + if f.lastFlush.Sub(f.lastForcedFlush) < 0 { + f.lastFlush = f.lastForcedFlush + } else { + f.lastFlush = time.Now() + } +} + +// CheckAndFlush sets the internal force flag to true if the buffered data should be force flushed +func (f *Flusher) CheckAndFlush() { + if f.forcePeriod > 0 && time.Since(f.lastFlush) > f.forcePeriod && f.lastFlush.Sub(f.lastForcedFlush) > 0 { + f.force = true + } +} + +// ForceFlushed updates the struct fields after a forced flush +func (f *Flusher) ForceFlushed() { + f.force = false + f.lastForcedFlush = time.Now() +} + +// ShouldFlush returns true if data should be forcefully flushed +func (f *Flusher) ShouldFlush() bool { + return f.force +} + +// Multiline consists of a SplitFunc and the state needed to perform a force flush +type Multiline struct { + SplitFunc bufio.SplitFunc + Force *Flusher +} + // NewBasicConfig creates a new Multiline config func NewMultilineConfig() MultilineConfig { return MultilineConfig{ @@ -40,12 +116,12 @@ type MultilineConfig struct { } // Build will build a Multiline operator.
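To make the timestamp bookkeeping above concrete, here is a self-contained sketch (not part of the patch) of the force-flush decision, with plain `time.Duration` standing in for `helper.Duration` and arbitrary period values:

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	forcePeriod := 100 * time.Millisecond // stand-in for FlusherConfig.Period
	lastFlush := time.Now()               // updated on every successful flush
	lastForcedFlush := time.Unix(0, 0)    // unix epoch: no forced flush yet

	time.Sleep(150 * time.Millisecond) // simulate: no new data for longer than forcePeriod

	// The CheckAndFlush condition: force only if a period is configured,
	// it has elapsed since the last flush, and the last flush was not
	// itself forced (lastFlush > lastForcedFlush).
	force := forcePeriod > 0 &&
		time.Since(lastFlush) > forcePeriod &&
		lastFlush.Sub(lastForcedFlush) > 0

	fmt.Println("should force flush:", force) // prints: should force flush: true
}
```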
-func (c MultilineConfig) Build(context operator.BuildContext, encoding encoding.Encoding, flushAtEOF bool) (bufio.SplitFunc, error) { - return c.getSplitFunc(encoding, flushAtEOF) +func (c MultilineConfig) Build(encoding encoding.Encoding, flushAtEOF bool, force *Flusher) (bufio.SplitFunc, error) { + return c.getSplitFunc(encoding, flushAtEOF, force) } // getSplitFunc returns split function for bufio.Scanner basing on configured pattern -func (c MultilineConfig) getSplitFunc(encoding encoding.Encoding, flushAtEOF bool) (bufio.SplitFunc, error) { +func (c MultilineConfig) getSplitFunc(encoding encoding.Encoding, flushAtEOF bool, force *Flusher) (bufio.SplitFunc, error) { endPattern := c.LineEndPattern startPattern := c.LineStartPattern @@ -53,19 +129,19 @@ func (c MultilineConfig) getSplitFunc(encoding encoding.Encoding, flushAtEOF boo case endPattern != "" && startPattern != "": return nil, fmt.Errorf("only one of line_start_pattern or line_end_pattern can be set") case endPattern == "" && startPattern == "": - return NewNewlineSplitFunc(encoding, flushAtEOF) + return NewNewlineSplitFunc(encoding, flushAtEOF, force) case endPattern != "": re, err := regexp.Compile("(?m)" + c.LineEndPattern) if err != nil { return nil, fmt.Errorf("compile line end regex: %s", err) } - return NewLineEndSplitFunc(re, flushAtEOF), nil + return NewLineEndSplitFunc(re, flushAtEOF, force), nil case startPattern != "": re, err := regexp.Compile("(?m)" + c.LineStartPattern) if err != nil { return nil, fmt.Errorf("compile line start regex: %s", err) } - return NewLineStartSplitFunc(re, flushAtEOF), nil + return NewLineStartSplitFunc(re, flushAtEOF, force), nil default: return nil, fmt.Errorf("unreachable") } @@ -73,8 +149,15 @@ func (c MultilineConfig) getSplitFunc(encoding encoding.Encoding, flushAtEOF boo // NewLineStartSplitFunc creates a bufio.SplitFunc that splits an incoming stream into // tokens that start with a match to the regex pattern provided -func NewLineStartSplitFunc(re *regexp.Regexp, flushAtEOF bool) bufio.SplitFunc { +func NewLineStartSplitFunc(re *regexp.Regexp, flushAtEOF bool, force *Flusher) bufio.SplitFunc { return func(data []byte, atEOF bool) (advance int, token []byte, err error) { + if force != nil && force.ShouldFlush() { + force.ForceFlushed() + token = trimWhitespaces(data) + advance = len(data) + return + } + firstLoc := re.FindIndex(data) if firstLoc == nil { // Flush if no more data is expected @@ -96,7 +179,7 @@ func NewLineStartSplitFunc(re *regexp.Regexp, flushAtEOF bool) bufio.SplitFunc { } if firstMatchEnd == len(data) { // the first match goes to the end of the buffer, so don't look for a second match return 0, nil, nil } @@ -107,12 +190,12 @@ return } secondLocOffset := firstMatchEnd + 1 secondLoc := re.FindIndex(data[secondLocOffset:]) if secondLoc == nil { return 0, nil, nil // read more data and try again } secondMatchStart := secondLoc[0] + secondLocOffset advance = secondMatchStart // start scanning at the beginning of the second match token = trimWhitespaces(data[firstMatchStart:secondMatchStart]) // the token begins at the first match, and ends at the beginning of the second match @@ -123,8 +206,14 @@ func NewLineStartSplitFunc(re *regexp.Regexp, flushAtEOF bool) bufio.SplitFunc {
// NewLineEndSplitFunc creates a bufio.SplitFunc that splits an incoming stream into // tokens that end with a match to the regex pattern provided -func NewLineEndSplitFunc(re *regexp.Regexp, flushAtEOF bool) bufio.SplitFunc { +func NewLineEndSplitFunc(re *regexp.Regexp, flushAtEOF bool, force *Flusher) bufio.SplitFunc { return func(data []byte, atEOF bool) (advance int, token []byte, err error) { + if force != nil && force.ShouldFlush() { + force.ForceFlushed() + token = trimWhitespaces(data) + advance = len(data) + return + } loc := re.FindIndex(data) if loc == nil { // Flush if no more data is expected @@ -136,7 +225,7 @@ func NewLineEndSplitFunc(re *regexp.Regexp, flushAtEOF bool) bufio.SplitFunc { return 0, nil, nil // read more data and try again } // If the match goes up to the end of the current buffer, do another // read until we can capture the entire match if loc[1] == len(data)-1 && !atEOF { return 0, nil, nil @@ -151,7 +240,7 @@ // NewNewlineSplitFunc splits log lines by newline, just as bufio.ScanLines, but // never returning an token using EOF as a terminator -func NewNewlineSplitFunc(encoding encoding.Encoding, flushAtEOF bool) (bufio.SplitFunc, error) { +func NewNewlineSplitFunc(encoding encoding.Encoding, flushAtEOF bool, force *Flusher) (bufio.SplitFunc, error) { newline, err := encodedNewline(encoding) if err != nil { return nil, err @@ -163,6 +252,12 @@ } return func(data []byte, atEOF bool) (advance int, token []byte, err error) { + if force != nil && force.ShouldFlush() { + force.ForceFlushed() + token = trimWhitespaces(data) + advance = len(data) + return + } if atEOF && len(data) == 0 { return 0, nil, nil } @@ -202,3 +297,49 @@ func trimWhitespaces(data []byte) []byte { // TrimRight to strip all whitespaces from the end of log return bytes.TrimLeft(bytes.TrimRight(data, "\r\n\t "), "\r\n") } + +// SplitterConfig consolidates MultilineConfig and FlusherConfig +type SplitterConfig struct { + Multiline MultilineConfig `mapstructure:"multiline,omitempty" json:"multiline,omitempty" yaml:"multiline,omitempty"` + Flusher FlusherConfig `mapstructure:",squash,omitempty" json:",inline,omitempty" yaml:",inline,omitempty"` +} + +// NewSplitterConfig returns default SplitterConfig +func NewSplitterConfig() SplitterConfig { + return SplitterConfig{ + Multiline: NewMultilineConfig(), + Flusher: NewFlusherConfig(), + } +} + +// Build builds a Splitter from the configuration +func (c *SplitterConfig) Build(encoding encoding.Encoding, flushAtEOF bool) (*Splitter, error) { + flusher := c.Flusher.Build() + splitFunc, err := c.Multiline.Build(encoding, flushAtEOF, flusher) + + if err != nil { + return nil, err + } + + return &Splitter{ + Flusher: flusher, + SplitFunc: splitFunc, + }, nil +} + +// Splitter consolidates a Flusher and the SplitFunc that depends on it +type Splitter struct { + SplitFunc bufio.SplitFunc + Flusher *Flusher +} + +// Flushed informs the Flusher that a flush has been performed +func (s *Splitter) Flushed() { + s.Flusher.Flushed() +} + +// CheckAndFlush instructs the Flusher to check whether the next log should be forcefully flushed +// and sets the appropriate flags if so +func (s *Splitter) CheckAndFlush() { + s.Flusher.CheckAndFlush() +} diff --git a/operator/helper/multiline_test.go b/operator/helper/multiline_test.go index b8364587..4bc7b6fc 100644 ---
a/operator/helper/multiline_test.go +++ b/operator/helper/multiline_test.go @@ -20,6 +20,7 @@ import ( "errors" "regexp" "testing" + "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -33,6 +34,7 @@ type tokenizerTestCase struct { Raw []byte ExpectedTokenized []string ExpectedError error + Flusher *Flusher } func (tc tokenizerTestCase) RunFunc(splitFunc bufio.SplitFunc) func(t *testing.T) { @@ -143,19 +145,42 @@ func TestLineStartSplitFunc(t *testing.T) { "LOGSTART 17 log2\nLOGPART log2\nanother line", }, }, + { + Name: "LogsWithoutFlusher", + Pattern: `^LOGSTART \d+`, + Raw: []byte("LOGPART log1\nLOGPART log1\t \n"), + ExpectedTokenized: []string{}, + Flusher: &Flusher{ + force: false, + lastForcedFlush: time.Now(), + }, + }, + { + Name: "LogsWithFlusher", + Pattern: `^LOGSTART \d+`, + Raw: []byte("LOGPART log1\nLOGPART log1\t \n"), + ExpectedTokenized: []string{ + "LOGPART log1\nLOGPART log1", + }, + Flusher: &Flusher{ + force: true, + lastForcedFlush: time.Now(), + }, + }, } for _, tc := range testCases { cfg := &MultilineConfig{ LineStartPattern: tc.Pattern, } - splitFunc, err := cfg.getSplitFunc(unicode.UTF8, false) + + splitFunc, err := cfg.getSplitFunc(unicode.UTF8, false, tc.Flusher) require.NoError(t, err) t.Run(tc.Name, tc.RunFunc(splitFunc)) } t.Run("FirstMatchHitsEndOfBuffer", func(t *testing.T) { - splitFunc := NewLineStartSplitFunc(regexp.MustCompile("LOGSTART"), false) + splitFunc := NewLineStartSplitFunc(regexp.MustCompile("LOGSTART"), false, nil) data := []byte(`LOGSTART`) t.Run("NotAtEOF", func(t *testing.T) { @@ -260,13 +285,36 @@ func TestLineEndSplitFunc(t *testing.T) { "LOGSTART 17 log2\nLOGPART log2\nLOGEND log2", }, }, + { + Name: "LogsWithoutFlusher", + Pattern: `^LOGEND.*$`, + Raw: []byte("LOGPART log1\nLOGPART log1\t \n"), + ExpectedTokenized: []string{}, + Flusher: &Flusher{ + force: false, + lastForcedFlush: time.Now(), + }, + }, + { + Name: "LogsWithFlusher", + Pattern: `^LOGEND.*$`, + Raw: []byte("LOGPART log1\nLOGPART log1\t \n"), + ExpectedTokenized: []string{ + "LOGPART log1\nLOGPART log1", + }, + Flusher: &Flusher{ + force: true, + lastForcedFlush: time.Now(), + }, + }, } for _, tc := range testCases { cfg := &MultilineConfig{ LineEndPattern: tc.Pattern, } - splitFunc, err := cfg.getSplitFunc(unicode.UTF8, false) + + splitFunc, err := cfg.getSplitFunc(unicode.UTF8, false, tc.Flusher) require.NoError(t, err) t.Run(tc.Name, tc.RunFunc(splitFunc)) } @@ -341,10 +389,32 @@ func TestNewlineSplitFunc(t *testing.T) { ExpectedTokenized: []string{}, ExpectedError: errors.New("bufio.Scanner: token too long"), }, + { + Name: "LogsWithoutFlusher", + Pattern: `^LOGEND.*$`, + Raw: []byte("LOGPART log1"), + ExpectedTokenized: []string{}, + Flusher: &Flusher{ + force: false, + lastForcedFlush: time.Now(), + }, + }, + { + Name: "LogsWithFlusher", + Pattern: `^LOGEND.*$`, + Raw: []byte("LOGPART log1"), + ExpectedTokenized: []string{ + "LOGPART log1", + }, + Flusher: &Flusher{ + force: true, + lastForcedFlush: time.Now(), + }, + }, } for _, tc := range testCases { - splitFunc, err := NewNewlineSplitFunc(unicode.UTF8, false) + splitFunc, err := NewNewlineSplitFunc(unicode.UTF8, false, tc.Flusher) require.NoError(t, err) t.Run(tc.Name, tc.RunFunc(splitFunc)) } @@ -397,7 +467,7 @@ func TestNewlineSplitFunc_Encodings(t *testing.T) { for _, tc := range cases { t.Run(tc.name, func(t *testing.T) { - splitFunc, err := NewNewlineSplitFunc(tc.encoding, false) + splitFunc, err := NewNewlineSplitFunc(tc.encoding, false, nil) require.NoError(t, err) 
scanner := bufio.NewScanner(bytes.NewReader(tc.input)) scanner.Split(splitFunc)
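Finally, a hedged end-to-end sketch of how the new `SplitterConfig` is meant to be consumed, modeled on the `reader.go` changes above (not part of the patch; the pattern and input are illustrative):

```go
package main

import (
	"bufio"
	"fmt"
	"strings"
	"time"

	"golang.org/x/text/encoding/unicode"

	"github.com/open-telemetry/opentelemetry-log-collection/operator/helper"
)

func main() {
	// Configure splitting the same way file_input now does internally.
	cfg := helper.NewSplitterConfig()
	cfg.Multiline.LineStartPattern = "^LOGSTART"
	cfg.Flusher.Period.Duration = 5 * time.Second // force_flush_period

	splitter, err := cfg.Build(unicode.UTF8, false)
	if err != nil {
		panic(err)
	}

	scanner := bufio.NewScanner(strings.NewReader("LOGSTART one\npart\nLOGSTART two\n"))
	scanner.Split(splitter.SplitFunc)

	for scanner.Scan() {
		splitter.Flushed() // record a successful flush, as reader.go does
		fmt.Printf("token: %q\n", scanner.Text())
	}
	// Only "LOGSTART one\npart" is tokenized here; "LOGSTART two" stays
	// buffered because no later start-pattern match arrives. reader.go calls
	// splitter.CheckAndFlush() when the scan stalls, so on a later ReadToEnd
	// the buffered tail is force flushed once the period elapses.
}
```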