diff --git a/.chloggen/pkg-stanza-flush.yaml b/.chloggen/pkg-stanza-flush.yaml new file mode 100755 index 000000000000..bc731e2f1c1a --- /dev/null +++ b/.chloggen/pkg-stanza-flush.yaml @@ -0,0 +1,30 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: breaking + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: pkg/stanza + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Remove Flusher from tokenize.SplitterConfig + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [26517] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: Removes the following in favor of flush.WithPeriod + - tokenize.DefaultFlushPeriod + - tokenize.FlusherConfig + - tokenize.NewFlusherConfig + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [api] diff --git a/pkg/stanza/fileconsumer/config.go b/pkg/stanza/fileconsumer/config.go index f015d3d8a4fa..25b39075f030 100644 --- a/pkg/stanza/fileconsumer/config.go +++ b/pkg/stanza/fileconsumer/config.go @@ -28,6 +28,7 @@ const ( defaultMaxLogSize = 1024 * 1024 defaultMaxConcurrentFiles = 1024 defaultEncoding = "utf-8" + defaultFlushPeriod = 500 * time.Millisecond ) var allowFileDeletion = featuregate.GlobalRegistry().MustRegister( @@ -79,6 +80,7 @@ type Config struct { Splitter tokenize.SplitterConfig `mapstructure:",squash,omitempty"` TrimConfig trim.Config `mapstructure:",squash,omitempty"` Encoding string `mapstructure:"encoding,omitempty"` + FlushPeriod time.Duration `mapstructure:"force_flush_period,omitempty"` Header *HeaderConfig `mapstructure:"header,omitempty"` } @@ -99,7 +101,7 @@ func (c Config) Build(logger *zap.SugaredLogger, emit emit.Callback) (*Manager, } // Ensure that splitter is buildable - factory := splitter.NewMultilineFactory(c.Splitter, enc, int(c.MaxLogSize), c.TrimConfig.Func()) + factory := splitter.NewMultilineFactory(c.Splitter, enc, int(c.MaxLogSize), c.TrimConfig.Func(), c.FlushPeriod) if _, err := factory.Build(); err != nil { return nil, err } @@ -118,7 +120,7 @@ func (c Config) BuildWithSplitFunc(logger *zap.SugaredLogger, emit emit.Callback } // Ensure that splitter is buildable - factory := splitter.NewCustomFactory(c.Splitter.Flusher, splitFunc) + factory := splitter.NewCustomFactory(splitFunc, c.FlushPeriod) if _, err := factory.Build(); err != nil { return nil, err } diff --git a/pkg/stanza/fileconsumer/file_test.go b/pkg/stanza/fileconsumer/file_test.go index bcd91d530f6c..5d34cd04d799 100644 --- a/pkg/stanza/fileconsumer/file_test.go +++ b/pkg/stanza/fileconsumer/file_test.go @@ -548,7 +548,7 @@ func TestNoNewline(t *testing.T) { cfg := NewConfig().includeDir(tempDir) cfg.StartAt = "beginning" cfg.Splitter = tokenize.NewSplitterConfig() - 
cfg.Splitter.Flusher.Period = time.Nanosecond + cfg.FlushPeriod = time.Nanosecond operator, emitCalls := buildTestManager(t, cfg) temp := openTemp(t, tempDir) diff --git a/pkg/stanza/fileconsumer/internal/splitter/custom.go b/pkg/stanza/fileconsumer/internal/splitter/custom.go index 712c63eb1427..04bdf6cdc650 100644 --- a/pkg/stanza/fileconsumer/internal/splitter/custom.go +++ b/pkg/stanza/fileconsumer/internal/splitter/custom.go @@ -5,26 +5,27 @@ package splitter // import "github.com/open-telemetry/opentelemetry-collector-co import ( "bufio" + "time" - "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/tokenize" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/flush" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/trim" ) type customFactory struct { - flusherCfg tokenize.FlusherConfig - splitFunc bufio.SplitFunc + splitFunc bufio.SplitFunc + flushPeriod time.Duration } var _ Factory = (*customFactory)(nil) -func NewCustomFactory(flusherCfg tokenize.FlusherConfig, splitFunc bufio.SplitFunc) Factory { +func NewCustomFactory(splitFunc bufio.SplitFunc, flushPeriod time.Duration) Factory { return &customFactory{ - flusherCfg: flusherCfg, - splitFunc: splitFunc, + splitFunc: splitFunc, + flushPeriod: flushPeriod, } } // Build builds Multiline Splitter struct func (f *customFactory) Build() (bufio.SplitFunc, error) { - return f.flusherCfg.Wrap(f.splitFunc, trim.Nop), nil + return flush.WithPeriod(f.splitFunc, trim.Nop, f.flushPeriod), nil } diff --git a/pkg/stanza/fileconsumer/internal/splitter/custom_test.go b/pkg/stanza/fileconsumer/internal/splitter/custom_test.go index 6ccdeb22f220..54002d18fb0d 100644 --- a/pkg/stanza/fileconsumer/internal/splitter/custom_test.go +++ b/pkg/stanza/fileconsumer/internal/splitter/custom_test.go @@ -6,36 +6,30 @@ package splitter import ( "bufio" "testing" + "time" "github.com/stretchr/testify/assert" - - "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/tokenize" ) func TestCustomFactory(t *testing.T) { - type fields struct { - Flusher tokenize.FlusherConfig - Splitter bufio.SplitFunc - } tests := []struct { - name string - fields fields - wantErr bool + name string + splitter bufio.SplitFunc + flushPeriod time.Duration + wantErr bool }{ { name: "default configuration", - fields: fields{ - Flusher: tokenize.NewFlusherConfig(), - Splitter: func(data []byte, atEOF bool) (advance int, token []byte, err error) { - return len(data), data, nil - }, + splitter: func(data []byte, atEOF bool) (advance int, token []byte, err error) { + return len(data), data, nil }, - wantErr: false, + flushPeriod: 100 * time.Millisecond, + wantErr: false, }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - factory := NewCustomFactory(tt.fields.Flusher, tt.fields.Splitter) + factory := NewCustomFactory(tt.splitter, tt.flushPeriod) got, err := factory.Build() if (err != nil) != tt.wantErr { t.Errorf("Build() error = %v, wantErr %v", err, tt.wantErr) diff --git a/pkg/stanza/fileconsumer/internal/splitter/multiline.go b/pkg/stanza/fileconsumer/internal/splitter/multiline.go index a8b882cd3c00..258883e5e6f1 100644 --- a/pkg/stanza/fileconsumer/internal/splitter/multiline.go +++ b/pkg/stanza/fileconsumer/internal/splitter/multiline.go @@ -5,9 +5,11 @@ package splitter // import "github.com/open-telemetry/opentelemetry-collector-co import ( "bufio" + "time" "golang.org/x/text/encoding" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/flush" 
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/tokenize" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/trim" ) @@ -17,6 +19,7 @@ type multilineFactory struct { encoding encoding.Encoding maxLogSize int trimFunc trim.Func + flushPeriod time.Duration } var _ Factory = (*multilineFactory)(nil) @@ -26,16 +29,22 @@ func NewMultilineFactory( encoding encoding.Encoding, maxLogSize int, trimFunc trim.Func, + flushPeriod time.Duration, ) Factory { return &multilineFactory{ splitterCfg: splitterCfg, encoding: encoding, maxLogSize: maxLogSize, trimFunc: trimFunc, + flushPeriod: flushPeriod, } } // Build builds Multiline Splitter struct func (f *multilineFactory) Build() (bufio.SplitFunc, error) { - return f.splitterCfg.Build(f.encoding, false, f.maxLogSize, f.trimFunc) + splitFunc, err := f.splitterCfg.Build(f.encoding, false, f.maxLogSize, f.trimFunc) + if err != nil { + return nil, err + } + return flush.WithPeriod(splitFunc, f.trimFunc, f.flushPeriod), nil } diff --git a/pkg/stanza/fileconsumer/internal/splitter/multiline_test.go b/pkg/stanza/fileconsumer/internal/splitter/multiline_test.go index cd8559245ede..9fac28465e9b 100644 --- a/pkg/stanza/fileconsumer/internal/splitter/multiline_test.go +++ b/pkg/stanza/fileconsumer/internal/splitter/multiline_test.go @@ -5,6 +5,7 @@ package splitter import ( "testing" + "time" "github.com/stretchr/testify/assert" "golang.org/x/text/encoding" @@ -20,6 +21,7 @@ func TestMultilineBuild(t *testing.T) { splitterConfig tokenize.SplitterConfig encoding encoding.Encoding maxLogSize int + flushPeriod time.Duration wantErr bool }{ { @@ -27,25 +29,26 @@ func TestMultilineBuild(t *testing.T) { splitterConfig: tokenize.NewSplitterConfig(), encoding: unicode.UTF8, maxLogSize: 1024, + flushPeriod: 100 * time.Millisecond, wantErr: false, }, { name: "Multiline error", splitterConfig: tokenize.SplitterConfig{ - Flusher: tokenize.NewFlusherConfig(), Multiline: tokenize.MultilineConfig{ LineStartPattern: "START", LineEndPattern: "END", }, }, - encoding: unicode.UTF8, - maxLogSize: 1024, - wantErr: true, + flushPeriod: 100 * time.Millisecond, + encoding: unicode.UTF8, + maxLogSize: 1024, + wantErr: true, }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - factory := NewMultilineFactory(tt.splitterConfig, tt.encoding, tt.maxLogSize, trim.Nop) + factory := NewMultilineFactory(tt.splitterConfig, tt.encoding, tt.maxLogSize, trim.Nop, tt.flushPeriod) got, err := factory.Build() if (err != nil) != tt.wantErr { t.Errorf("Build() error = %v, wantErr %v", err, tt.wantErr) diff --git a/pkg/stanza/fileconsumer/reader_test.go b/pkg/stanza/fileconsumer/reader_test.go index 1d49c4075767..ec444834fc97 100644 --- a/pkg/stanza/fileconsumer/reader_test.go +++ b/pkg/stanza/fileconsumer/reader_test.go @@ -10,7 +10,6 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "golang.org/x/text/encoding/unicode" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/decode" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/fingerprint" @@ -25,9 +24,7 @@ import ( func TestPersistFlusher(t *testing.T) { flushPeriod := 100 * time.Millisecond - sCfg := tokenize.NewSplitterConfig() - sCfg.Flusher.Period = flushPeriod - f, emitChan := testReaderFactoryWithSplitter(t, sCfg) + f, emitChan := testReaderFactory(t, tokenize.NewSplitterConfig(), defaultMaxLogSize, flushPeriod) temp := openTemp(t, t.TempDir()) fp, err := f.newFingerprint(temp) @@ -113,7 +110,7 
@@ func TestTokenization(t *testing.T) { for _, tc := range testCases { t.Run(tc.testName, func(t *testing.T) { - f, emitChan := testReaderFactory(t) + f, emitChan := testReaderFactory(t, tokenize.NewSplitterConfig(), defaultMaxLogSize, defaultFlushPeriod) temp := openTemp(t, t.TempDir()) _, err := temp.Write(tc.fileContent) @@ -143,8 +140,7 @@ func TestTokenizationTooLong(t *testing.T) { []byte("aaa"), } - f, emitChan := testReaderFactory(t) - f.readerConfig.maxLogSize = 10 + f, emitChan := testReaderFactory(t, tokenize.NewSplitterConfig(), 10, defaultFlushPeriod) temp := openTemp(t, t.TempDir()) _, err := temp.Write(fileContent) @@ -174,15 +170,9 @@ func TestTokenizationTooLongWithLineStartPattern(t *testing.T) { []byte("2023-01-01 2"), } - f, emitChan := testReaderFactory(t) - - mlc := tokenize.NewMultilineConfig() - mlc.LineStartPattern = `\d+-\d+-\d+` - f.splitterFactory = splitter.NewMultilineFactory(tokenize.SplitterConfig{ - Flusher: tokenize.NewFlusherConfig(), - Multiline: mlc, - }, unicode.UTF8, 15, trim.Whitespace) - f.readerConfig.maxLogSize = 15 + sCfg := tokenize.NewSplitterConfig() + sCfg.Multiline.LineStartPattern = `\d+-\d+-\d+` + f, emitChan := testReaderFactory(t, sCfg, 15, defaultFlushPeriod) temp := openTemp(t, t.TempDir()) _, err := temp.Write(fileContent) @@ -205,8 +195,7 @@ func TestTokenizationTooLongWithLineStartPattern(t *testing.T) { func TestHeaderFingerprintIncluded(t *testing.T) { fileContent := []byte("#header-line\naaa\n") - f, _ := testReaderFactory(t) - f.readerConfig.maxLogSize = 10 + f, _ := testReaderFactory(t, tokenize.NewSplitterConfig(), 10, defaultFlushPeriod) regexConf := regex.NewConfig() regexConf.Regex = "^#(?P<header>
.*)" @@ -234,11 +223,7 @@ func TestHeaderFingerprintIncluded(t *testing.T) { require.Equal(t, []byte("#header-line\naaa\n"), r.Fingerprint.FirstBytes) } -func testReaderFactory(t *testing.T) (*readerFactory, chan *emitParams) { - return testReaderFactoryWithSplitter(t, tokenize.NewSplitterConfig()) -} - -func testReaderFactoryWithSplitter(t *testing.T, splitterConfig tokenize.SplitterConfig) (*readerFactory, chan *emitParams) { +func testReaderFactory(t *testing.T, sCfg tokenize.SplitterConfig, maxLogSize int, flushPeriod time.Duration) (*readerFactory, chan *emitParams) { emitChan := make(chan *emitParams, 100) enc, err := decode.LookupEncoding(defaultEncoding) trimFunc := trim.Whitespace @@ -247,11 +232,11 @@ func testReaderFactoryWithSplitter(t *testing.T, splitterConfig tokenize.Splitte SugaredLogger: testutil.Logger(t), readerConfig: &readerConfig{ fingerprintSize: fingerprint.DefaultSize, - maxLogSize: defaultMaxLogSize, + maxLogSize: maxLogSize, emit: testEmitFunc(emitChan), }, fromBeginning: true, - splitterFactory: splitter.NewMultilineFactory(splitterConfig, enc, defaultMaxLogSize, trimFunc), + splitterFactory: splitter.NewMultilineFactory(sCfg, enc, maxLogSize, trimFunc, flushPeriod), encoding: enc, }, emitChan } diff --git a/pkg/stanza/tokenize/flusher.go b/pkg/stanza/flush/flush.go similarity index 79% rename from pkg/stanza/tokenize/flusher.go rename to pkg/stanza/flush/flush.go index 60a44d4f62a8..f42e18c82370 100644 --- a/pkg/stanza/tokenize/flusher.go +++ b/pkg/stanza/flush/flush.go @@ -1,7 +1,7 @@ // Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 -package tokenize // import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/tokenize" +package flush // import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/flush" import ( "bufio" @@ -10,26 +10,14 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/trim" ) -const DefaultFlushPeriod = 500 * time.Millisecond - -// FlusherConfig is a configuration of Flusher helper -type FlusherConfig struct { - Period time.Duration `mapstructure:"force_flush_period"` -} - -// NewFlusherConfig creates a default Flusher config -func NewFlusherConfig() FlusherConfig { - return FlusherConfig{ - // Empty or `0s` means that we will never force flush - Period: DefaultFlushPeriod, - } -} - // Wrap a bufio.SplitFunc with a flusher -func (c *FlusherConfig) Wrap(splitFunc bufio.SplitFunc, trimFunc trim.Func) bufio.SplitFunc { +func WithPeriod(splitFunc bufio.SplitFunc, trimFunc trim.Func, period time.Duration) bufio.SplitFunc { + if period <= 0 { + return splitFunc + } f := &flusher{ lastDataChange: time.Now(), - forcePeriod: c.Period, + forcePeriod: period, previousDataLength: 0, } return f.splitFunc(splitFunc, trimFunc) diff --git a/pkg/stanza/flush/flush_test.go b/pkg/stanza/flush/flush_test.go new file mode 100644 index 000000000000..25d3aec0212b --- /dev/null +++ b/pkg/stanza/flush/flush_test.go @@ -0,0 +1,102 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package flush + +import ( + "bufio" + "testing" + "time" + + "github.com/stretchr/testify/assert" + + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/trim" +) + +func TestFlusher(t *testing.T) { + + // bufio.ScanWords is a simple split function which with tokenize based on newlines. + // It will return a partial token if atEOF=true. In order to test the flusher, + // we don't want the split func to return partial tokens on its own. 
Instead, we only + // want the flusher to force the partial out based on its own behavior. Therefore, we + // always use atEOF=false. + + flushPeriod := 100 * time.Millisecond + f := WithPeriod(bufio.ScanWords, trim.Nop, flushPeriod) + + content := []byte("foo bar hellowo") + + // The first token is complete + advance, token, err := f(content, false) + assert.NoError(t, err) + assert.Equal(t, 4, advance) + assert.Equal(t, []byte("foo"), token) + + // The second token is also complete + advance, token, err = f(content[4:], false) + assert.NoError(t, err) + assert.Equal(t, 4, advance) + assert.Equal(t, []byte("bar"), token) + + // We find a partial token, but we just updated, so don't flush it yet + advance, token, err = f(content[8:], false) + assert.NoError(t, err) + assert.Equal(t, 0, advance) + assert.Equal(t, []byte(nil), token) + + // We find the same partial token, but we updated quite recently, so still don't flush it yet + advance, token, err = f(content[8:], false) + assert.NoError(t, err) + assert.Equal(t, 0, advance) + assert.Equal(t, []byte(nil), token) + + time.Sleep(2 * flushPeriod) + + // Now it's been a while, so we should just flush the partial token + advance, token, err = f(content[8:], false) + assert.NoError(t, err) + assert.Equal(t, 7, advance) + assert.Equal(t, []byte("hellowo"), token) +} + +func TestNoFlushPeriod(t *testing.T) { + // Same test as above, but with a flush period of 0 we should never force flush. + // In other words, we should expect exactly the behavior of bufio.ScanWords. + + flushPeriod := time.Duration(0) + f := WithPeriod(bufio.ScanWords, trim.Nop, flushPeriod) + + content := []byte("foo bar hellowo") + + // The first token is complete + advance, token, err := f(content, false) + assert.NoError(t, err) + assert.Equal(t, 4, advance) + assert.Equal(t, []byte("foo"), token) + + // The second token is also complete + advance, token, err = f(content[4:], false) + assert.NoError(t, err) + assert.Equal(t, 4, advance) + assert.Equal(t, []byte("bar"), token) + + // We find a partial token, but we're using flushPeriod = 0 so we should never flush + advance, token, err = f(content[8:], false) + assert.NoError(t, err) + assert.Equal(t, 0, advance) + assert.Equal(t, []byte(nil), token) + + // We find the same partial token, but we're using flushPeriod = 0 so we should never flush + advance, token, err = f(content[8:], false) + assert.NoError(t, err) + assert.Equal(t, 0, advance) + assert.Equal(t, []byte(nil), token) + + time.Sleep(2 * flushPeriod) + + // Now it's been a while, but we are using flushPeriod=0, so we should still not flush + advance, token, err = f(content[8:], false) + assert.NoError(t, err) + assert.Equal(t, 0, advance) + assert.Equal(t, []byte(nil), token) +} diff --git a/pkg/stanza/tokenize/multiline_test.go b/pkg/stanza/tokenize/multiline_test.go index a8c85db245e5..7c85799c54ef 100644 --- a/pkg/stanza/tokenize/multiline_test.go +++ b/pkg/stanza/tokenize/multiline_test.go @@ -10,7 +10,6 @@ import ( "fmt" "regexp" "testing" - "time" "github.com/stretchr/testify/require" "golang.org/x/text/encoding" @@ -20,15 +19,8 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/trim" ) -const ( - // Those values has been experimentally figured out for windows - sleepDuration time.Duration = time.Millisecond * 80 - forcePeriod time.Duration = time.Millisecond * 40 -) - type MultiLineTokenizerTestCase struct { tokenizetest.TestCase - Flusher *FlusherConfig } func TestLineStartSplitFunc(t *testing.T) { @@ -42,7 +34,6 @@ func 
TestLineStartSplitFunc(t *testing.T) { `LOGSTART 123 log1`, }, }, - nil, }, { tokenizetest.TestCase{ @@ -54,7 +45,6 @@ func TestLineStartSplitFunc(t *testing.T) { `LOGSTART 234 log2`, }, }, - nil, }, { tokenizetest.TestCase{ @@ -66,7 +56,6 @@ func TestLineStartSplitFunc(t *testing.T) { "LOGSTART 234 log2", }, }, - nil, }, { tokenizetest.TestCase{ @@ -74,7 +63,6 @@ func TestLineStartSplitFunc(t *testing.T) { Pattern: `LOGSTART \d+ `, Input: []byte(`file that has no matches in it`), }, - nil, }, { tokenizetest.TestCase{ @@ -86,7 +74,6 @@ func TestLineStartSplitFunc(t *testing.T) { `LOGSTART 123 part that matches`, }, }, - nil, }, { tokenizetest.TestCase{ @@ -102,7 +89,6 @@ func TestLineStartSplitFunc(t *testing.T) { `LOGSTART 123 ` + string(tokenizetest.GenerateBytes(100)), }, }, - nil, }, { tokenizetest.TestCase{ @@ -118,7 +104,6 @@ func TestLineStartSplitFunc(t *testing.T) { `LOGSTART 123 ` + string(tokenizetest.GenerateBytes(10000)), }, }, - nil, }, { tokenizetest.TestCase{ @@ -132,7 +117,6 @@ func TestLineStartSplitFunc(t *testing.T) { }(), ExpectedError: errors.New("bufio.Scanner: token too long"), }, - nil, }, { tokenizetest.TestCase{ @@ -144,7 +128,6 @@ func TestLineStartSplitFunc(t *testing.T) { "LOGSTART 17 log2\nLOGPART log2\nanother line", }, }, - nil, }, { tokenizetest.TestCase{ @@ -152,62 +135,6 @@ func TestLineStartSplitFunc(t *testing.T) { Pattern: `^LOGSTART \d+`, Input: []byte("LOGPART log1\nLOGPART log1\t \n"), }, - nil, - }, - { - tokenizetest.TestCase{ - Name: "LogsWithFlusher", - Pattern: `^LOGSTART \d+`, - Input: []byte("LOGPART log1\nLOGPART log1\t \n"), - ExpectedTokens: []string{ - "LOGPART log1\nLOGPART log1", - }, - - AdditionalIterations: 1, - Sleep: sleepDuration, - }, - &FlusherConfig{Period: forcePeriod}, - }, - { - tokenizetest.TestCase{ - Name: "LogsWithFlusherWithMultipleLogsInBuffer", - Pattern: `^LOGSTART \d+`, - Input: []byte("LOGPART log1\nLOGSTART 123\nLOGPART log1\t \n"), - ExpectedTokens: []string{ - "LOGPART log1", - "LOGSTART 123\nLOGPART log1", - }, - AdditionalIterations: 1, - Sleep: sleepDuration, - }, - &FlusherConfig{Period: forcePeriod}, - }, - { - tokenizetest.TestCase{ - Name: "LogsWithLongFlusherWithMultipleLogsInBuffer", - Pattern: `^LOGSTART \d+`, - Input: []byte("LOGPART log1\nLOGSTART 123\nLOGPART log1\t \n"), - ExpectedTokens: []string{ - "LOGPART log1", - }, - AdditionalIterations: 1, - Sleep: forcePeriod / 4, - }, - &FlusherConfig{Period: 16 * forcePeriod}, - }, - { - tokenizetest.TestCase{ - Name: "LogsWithFlusherWithLogStartingWithWhiteChars", - Pattern: `^LOGSTART \d+`, - Input: []byte("\nLOGSTART 333"), - ExpectedTokens: []string{ - "", - "LOGSTART 333", - }, - AdditionalIterations: 1, - Sleep: sleepDuration, - }, - &FlusherConfig{Period: forcePeriod}, }, } @@ -222,9 +149,6 @@ func TestLineStartSplitFunc(t *testing.T) { }.Func() splitFunc, err := cfg.getSplitFunc(unicode.UTF8, false, 0, trimFunc) require.NoError(t, err) - if tc.Flusher != nil { - splitFunc = tc.Flusher.Wrap(splitFunc, trimFunc) - } t.Run(tc.Name, tc.Run(splitFunc)) } @@ -259,7 +183,6 @@ func TestLineEndSplitFunc(t *testing.T) { `my log LOGEND 123`, }, }, - nil, }, { tokenizetest.TestCase{ @@ -271,7 +194,6 @@ func TestLineEndSplitFunc(t *testing.T) { `log2 LOGEND 234`, }, }, - nil, }, { tokenizetest.TestCase{ @@ -283,7 +205,6 @@ func TestLineEndSplitFunc(t *testing.T) { "log2 LOGEND", }, }, - nil, }, { tokenizetest.TestCase{ @@ -291,7 +212,6 @@ func TestLineEndSplitFunc(t *testing.T) { Pattern: `LOGEND \d+`, Input: []byte(`file that has no matches in it`), }, - 
nil, }, { tokenizetest.TestCase{ @@ -302,7 +222,6 @@ func TestLineEndSplitFunc(t *testing.T) { `part that matches LOGEND 123`, }, }, - nil, }, { tokenizetest.TestCase{ @@ -317,7 +236,6 @@ func TestLineEndSplitFunc(t *testing.T) { string(tokenizetest.GenerateBytes(100)) + `LOGEND 1`, }, }, - nil, }, { tokenizetest.TestCase{ @@ -332,7 +250,6 @@ func TestLineEndSplitFunc(t *testing.T) { string(tokenizetest.GenerateBytes(10000)) + `LOGEND 1`, }, }, - nil, }, { tokenizetest.TestCase{ @@ -345,7 +262,6 @@ func TestLineEndSplitFunc(t *testing.T) { }(), ExpectedError: errors.New("bufio.Scanner: token too long"), }, - nil, }, { tokenizetest.TestCase{ @@ -357,7 +273,6 @@ func TestLineEndSplitFunc(t *testing.T) { "LOGSTART 17 log2\nLOGPART log2\nLOGEND log2", }, }, - nil, }, { tokenizetest.TestCase{ @@ -365,64 +280,6 @@ func TestLineEndSplitFunc(t *testing.T) { Pattern: `^LOGEND.*$`, Input: []byte("LOGPART log1\nLOGPART log1\t \n"), }, - nil, - }, - { - tokenizetest.TestCase{ - Name: "LogsWithFlusher", - Pattern: `^LOGEND.*$`, - Input: []byte("LOGPART log1\nLOGPART log1\t \n"), - ExpectedTokens: []string{ - "LOGPART log1\nLOGPART log1", - }, - - AdditionalIterations: 1, - Sleep: sleepDuration, - }, - &FlusherConfig{Period: forcePeriod}, - }, - { - tokenizetest.TestCase{ - Name: "LogsWithFlusherWithMultipleLogsInBuffer", - Pattern: `^LOGEND.*$`, - Input: []byte("LOGPART log1\nLOGEND\nLOGPART log1\t \n"), - ExpectedTokens: []string{ - "LOGPART log1\nLOGEND", - "LOGPART log1", - }, - - AdditionalIterations: 1, - Sleep: sleepDuration, - }, - &FlusherConfig{Period: forcePeriod}, - }, - { - tokenizetest.TestCase{ - Name: "LogsWithLongFlusherWithMultipleLogsInBuffer", - Pattern: `^LOGEND.*$`, - Input: []byte("LOGPART log1\nLOGEND\nLOGPART log1\t \n"), - ExpectedTokens: []string{ - "LOGPART log1\nLOGEND", - }, - - AdditionalIterations: 1, - Sleep: forcePeriod / 4, - }, - &FlusherConfig{Period: 16 * forcePeriod}, - }, - { - tokenizetest.TestCase{ - Name: "LogsWithFlusherWithLogStartingWithWhiteChars", - Pattern: `LOGEND \d+$`, - Input: []byte("\nLOGEND 333"), - ExpectedTokens: []string{ - "LOGEND 333", - }, - - AdditionalIterations: 1, - Sleep: sleepDuration, - }, - &FlusherConfig{Period: forcePeriod}, }, } @@ -437,9 +294,6 @@ func TestLineEndSplitFunc(t *testing.T) { }.Func() splitFunc, err := cfg.getSplitFunc(unicode.UTF8, false, 0, trimFunc) require.NoError(t, err) - if tc.Flusher != nil { - splitFunc = tc.Flusher.Wrap(splitFunc, trimFunc) - } t.Run(tc.Name, tc.Run(splitFunc)) } } @@ -452,7 +306,7 @@ func TestNewlineSplitFunc(t *testing.T) { ExpectedTokens: []string{ `my log`, }, - }, nil, + }, }, { tokenizetest.TestCase{Name: "OneLogCarriageReturn", @@ -461,7 +315,6 @@ func TestNewlineSplitFunc(t *testing.T) { `my log`, }, }, - nil, }, { tokenizetest.TestCase{Name: "TwoLogsSimple", @@ -471,7 +324,6 @@ func TestNewlineSplitFunc(t *testing.T) { `log2`, }, }, - nil, }, { tokenizetest.TestCase{Name: "TwoLogsCarriageReturn", @@ -481,13 +333,11 @@ func TestNewlineSplitFunc(t *testing.T) { `log2`, }, }, - nil, }, { tokenizetest.TestCase{Name: "NoTailingNewline", Input: []byte(`foo`), }, - nil, }, { tokenizetest.TestCase{Name: "HugeLog100", @@ -500,7 +350,6 @@ func TestNewlineSplitFunc(t *testing.T) { string(tokenizetest.GenerateBytes(100)), }, }, - nil, }, { tokenizetest.TestCase{Name: "HugeLog10000", @@ -513,7 +362,6 @@ func TestNewlineSplitFunc(t *testing.T) { string(tokenizetest.GenerateBytes(10000)), }, }, - nil, }, { tokenizetest.TestCase{Name: "HugeLog1000000", @@ -524,24 +372,11 @@ func 
TestNewlineSplitFunc(t *testing.T) { }(), ExpectedError: errors.New("bufio.Scanner: token too long"), }, - nil, }, { tokenizetest.TestCase{Name: "LogsWithoutFlusher", Input: []byte("LOGPART log1"), }, - nil, - }, - { - tokenizetest.TestCase{Name: "LogsWithFlusher", - Input: []byte("LOGPART log1"), - ExpectedTokens: []string{ - "LOGPART log1", - }, - AdditionalIterations: 1, - Sleep: sleepDuration, - }, - &FlusherConfig{Period: forcePeriod}, }, { tokenizetest.TestCase{Name: "DefaultFlusherSplits", @@ -551,7 +386,6 @@ func TestNewlineSplitFunc(t *testing.T) { "log2", }, }, - nil, }, { tokenizetest.TestCase{Name: "LogsWithLogStartingWithWhiteChars", @@ -561,7 +395,6 @@ func TestNewlineSplitFunc(t *testing.T) { "LOGEND 333", }, }, - nil, }, { tokenizetest.TestCase{Name: "PreserveLeadingWhitespaces", @@ -572,7 +405,6 @@ func TestNewlineSplitFunc(t *testing.T) { }, PreserveLeadingWhitespaces: true, }, - nil, }, { tokenizetest.TestCase{Name: "PreserveTrailingWhitespaces", @@ -583,7 +415,6 @@ func TestNewlineSplitFunc(t *testing.T) { }, PreserveTrailingWhitespaces: true, }, - nil, }, { tokenizetest.TestCase{Name: "PreserveBothLeadingAndTrailingWhitespaces", @@ -595,7 +426,6 @@ func TestNewlineSplitFunc(t *testing.T) { PreserveLeadingWhitespaces: true, PreserveTrailingWhitespaces: true, }, - nil, }, } @@ -606,9 +436,6 @@ func TestNewlineSplitFunc(t *testing.T) { }.Func() splitFunc, err := NewlineSplitFunc(unicode.UTF8, false, trimFunc) require.NoError(t, err) - if tc.Flusher != nil { - splitFunc = tc.Flusher.Wrap(splitFunc, trimFunc) - } t.Run(tc.Name, tc.Run(splitFunc)) } } diff --git a/pkg/stanza/tokenize/splitter.go b/pkg/stanza/tokenize/splitter.go index 3cae0b137ded..c2d8a7444344 100644 --- a/pkg/stanza/tokenize/splitter.go +++ b/pkg/stanza/tokenize/splitter.go @@ -13,7 +13,6 @@ import ( // SplitterConfig consolidates MultilineConfig and FlusherConfig type SplitterConfig struct { - Flusher FlusherConfig `mapstructure:",squash,omitempty"` Multiline MultilineConfig `mapstructure:"multiline,omitempty"` } @@ -21,15 +20,10 @@ type SplitterConfig struct { func NewSplitterConfig() SplitterConfig { return SplitterConfig{ Multiline: NewMultilineConfig(), - Flusher: FlusherConfig{Period: DefaultFlushPeriod}, } } // Build builds bufio.SplitFunc based on the config func (c *SplitterConfig) Build(enc encoding.Encoding, flushAtEOF bool, maxLogSize int, trimFunc trim.Func) (bufio.SplitFunc, error) { - splitFunc, err := c.Multiline.Build(enc, flushAtEOF, maxLogSize, trimFunc) - if err != nil { - return nil, err - } - return c.Flusher.Wrap(splitFunc, trimFunc), nil + return c.Multiline.Build(enc, flushAtEOF, maxLogSize, trimFunc) }
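
Migration sketch (illustrative, not part of the patch): callers outside this diff that previously built a tokenize.FlusherConfig with tokenize.NewFlusherConfig() and wrapped their split function via FlusherConfig.Wrap now pass a plain time.Duration to flush.WithPeriod. The snippet assumes only the APIs visible above (flush.WithPeriod, trim.Nop) plus the standard library; the package name, helper name, and the use of bufio.ScanLines are illustrative.

package example

import (
	"bufio"
	"time"

	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/flush"
	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/trim"
)

// newSplitFunc wraps a plain line splitter so that a pending partial token is
// force-flushed once no new data has arrived for the given period.
//
// Before this change:
//   cfg := tokenize.NewFlusherConfig()
//   splitFunc = cfg.Wrap(splitFunc, trim.Nop)
// After this change:
//   splitFunc = flush.WithPeriod(splitFunc, trim.Nop, period)
func newSplitFunc() bufio.SplitFunc {
	// 500ms mirrors the removed tokenize.DefaultFlushPeriod; a period <= 0
	// disables forced flushing and returns the split function unchanged.
	return flush.WithPeriod(bufio.ScanLines, trim.Nop, 500*time.Millisecond)
}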