From 9e5f20e929c181683756a84ee33844f5f6a4f7ec Mon Sep 17 00:00:00 2001 From: Dan Jaglowski Date: Mon, 28 Aug 2023 12:25:42 -0400 Subject: [PATCH] [pkg/stanza] Extract trim func from split package --- .chloggen/pkg-stanza-extract-trim-split.yaml | 30 +++ pkg/stanza/fileconsumer/config.go | 2 +- .../fileconsumer/internal/header/config.go | 10 +- .../fileconsumer/internal/splitter/custom.go | 6 +- .../internal/splitter/custom_test.go | 62 ++++++- .../internal/splitter/multiline.go | 9 +- .../internal/splitter/multiline_test.go | 26 ++- pkg/stanza/flush/flush.go | 10 +- pkg/stanza/flush/flush_test.go | 6 +- pkg/stanza/operator/input/tcp/tcp.go | 12 +- pkg/stanza/operator/input/udp/udp.go | 5 +- pkg/stanza/split/split.go | 40 ++-- pkg/stanza/split/split_test.go | 175 ++++++------------ pkg/stanza/split/splittest/splittest.go | 20 +- pkg/stanza/trim/trim.go | 28 ++- pkg/stanza/trim/trim_test.go | 88 +++++++-- 16 files changed, 328 insertions(+), 201 deletions(-) create mode 100755 .chloggen/pkg-stanza-extract-trim-split.yaml diff --git a/.chloggen/pkg-stanza-extract-trim-split.yaml b/.chloggen/pkg-stanza-extract-trim-split.yaml new file mode 100755 index 000000000000..651ae0e0b097 --- /dev/null +++ b/.chloggen/pkg-stanza-extract-trim-split.yaml @@ -0,0 +1,30 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: breaking + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: pkg/stanza + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Make trim func composable + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [26536] + +# (Optional) One or more lines of additional information to render under the primary note. 
+# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: | + - Adds trim.WithFunc to allow trim funcs to wrap bufio.SplitFuncs. + - Removes trim.Func from split.Config.Func. Use trim.WithFunc instead. + - Removes trim.Func from flush.WithPeriod. Use trim.WithFunc instead. + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [api] diff --git a/pkg/stanza/fileconsumer/config.go b/pkg/stanza/fileconsumer/config.go index c0158b4a5d2b..d5416d8da88a 100644 --- a/pkg/stanza/fileconsumer/config.go +++ b/pkg/stanza/fileconsumer/config.go @@ -119,7 +119,7 @@ func (c Config) BuildWithSplitFunc(logger *zap.SugaredLogger, emit emit.Callback } // Ensure that splitter is buildable - factory := splitter.NewCustomFactory(splitFunc, c.FlushPeriod) + factory := splitter.NewCustomFactory(splitFunc, c.TrimConfig.Func(), c.FlushPeriod) if _, err := factory.SplitFunc(); err != nil { return nil, err } diff --git a/pkg/stanza/fileconsumer/internal/header/config.go b/pkg/stanza/fileconsumer/internal/header/config.go index c28ba2ac103a..618d1b1fa714 100644 --- a/pkg/stanza/fileconsumer/internal/header/config.go +++ b/pkg/stanza/fileconsumer/internal/header/config.go @@ -16,6 +16,7 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/pipeline" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/split" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/trim" ) type Config struct { @@ 
-69,13 +70,16 @@ func NewConfig(matchRegex string, metadataOperators []operator.Config, enc encod return nil, fmt.Errorf("failed to compile `pattern`: %w", err) } - splitFunc, err := split.NewlineSplitFunc(enc, false, func(b []byte) []byte { - return bytes.Trim(b, "\r\n") - }) + splitFunc, err := split.NewlineSplitFunc(enc, false) if err != nil { return nil, fmt.Errorf("failed to create split func: %w", err) } + var trimFunc trim.Func = func(b []byte) []byte { + return bytes.Trim(b, "\r\n") + } + splitFunc = trim.WithFunc(splitFunc, trimFunc) + return &Config{ regex: regex, SplitFunc: splitFunc, diff --git a/pkg/stanza/fileconsumer/internal/splitter/custom.go b/pkg/stanza/fileconsumer/internal/splitter/custom.go index 52fe9125e627..6cb8afced95f 100644 --- a/pkg/stanza/fileconsumer/internal/splitter/custom.go +++ b/pkg/stanza/fileconsumer/internal/splitter/custom.go @@ -13,19 +13,21 @@ import ( type customFactory struct { splitFunc bufio.SplitFunc + trimFunc trim.Func flushPeriod time.Duration } var _ Factory = (*customFactory)(nil) -func NewCustomFactory(splitFunc bufio.SplitFunc, flushPeriod time.Duration) Factory { +func NewCustomFactory(splitFunc bufio.SplitFunc, trimFunc trim.Func, flushPeriod time.Duration) Factory { return &customFactory{ splitFunc: splitFunc, + trimFunc: trimFunc, flushPeriod: flushPeriod, } } // SplitFunc builds a bufio.SplitFunc based on the configuration func (f *customFactory) SplitFunc() (bufio.SplitFunc, error) { - return flush.WithPeriod(f.splitFunc, trim.Nop, f.flushPeriod), nil + return trim.WithFunc(flush.WithPeriod(f.splitFunc, f.flushPeriod), f.trimFunc), nil } diff --git a/pkg/stanza/fileconsumer/internal/splitter/custom_test.go b/pkg/stanza/fileconsumer/internal/splitter/custom_test.go index a03d7e0f290b..1b65b08d0097 100644 --- a/pkg/stanza/fileconsumer/internal/splitter/custom_test.go +++ b/pkg/stanza/fileconsumer/internal/splitter/custom_test.go @@ -9,10 +9,12 @@ import ( "time" "github.com/stretchr/testify/assert" + + 
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/trim" ) func TestCustom(t *testing.T) { - factory := NewCustomFactory(bufio.ScanLines, 0) + factory := NewCustomFactory(bufio.ScanLines, trim.Nop, 0) splitFunc, err := factory.SplitFunc() assert.NoError(t, err) assert.NotNil(t, splitFunc) @@ -35,9 +37,33 @@ func TestCustom(t *testing.T) { assert.Nil(t, token) } +func TestCustomWithTrim(t *testing.T) { + factory := NewCustomFactory(bufio.ScanLines, trim.Whitespace, 0) + splitFunc, err := factory.SplitFunc() + assert.NoError(t, err) + assert.NotNil(t, splitFunc) + + input := []byte(" hello \n world \n extra ") + + advance, token, err := splitFunc(input, false) + assert.NoError(t, err) + assert.Equal(t, 8, advance) + assert.Equal(t, []byte("hello"), token) + + advance, token, err = splitFunc(input[8:], false) + assert.NoError(t, err) + assert.Equal(t, 8, advance) + assert.Equal(t, []byte("world"), token) + + advance, token, err = splitFunc(input[16:], false) + assert.NoError(t, err) + assert.Equal(t, 0, advance) + assert.Nil(t, token) +} + func TestCustomWithFlush(t *testing.T) { flushPeriod := 100 * time.Millisecond - factory := NewCustomFactory(bufio.ScanLines, flushPeriod) + factory := NewCustomFactory(bufio.ScanLines, trim.Nop, flushPeriod) splitFunc, err := factory.SplitFunc() assert.NoError(t, err) assert.NotNil(t, splitFunc) @@ -66,3 +92,35 @@ func TestCustomWithFlush(t *testing.T) { assert.Equal(t, 7, advance) assert.Equal(t, []byte(" extra "), token) } + +func TestCustomWithFlushTrim(t *testing.T) { + flushPeriod := 100 * time.Millisecond + factory := NewCustomFactory(bufio.ScanLines, trim.Whitespace, flushPeriod) + splitFunc, err := factory.SplitFunc() + assert.NoError(t, err) + assert.NotNil(t, splitFunc) + + input := []byte(" hello \n world \n extra ") + + advance, token, err := splitFunc(input, false) + assert.NoError(t, err) + assert.Equal(t, 8, advance) + assert.Equal(t, []byte("hello"), token) + + advance, token, err = 
splitFunc(input[8:], false) + assert.NoError(t, err) + assert.Equal(t, 8, advance) + assert.Equal(t, []byte("world"), token) + + advance, token, err = splitFunc(input[16:], false) + assert.NoError(t, err) + assert.Equal(t, 0, advance) + assert.Nil(t, token) + + time.Sleep(2 * flushPeriod) + + advance, token, err = splitFunc(input[16:], false) + assert.NoError(t, err) + assert.Equal(t, 7, advance) + assert.Equal(t, []byte("extra"), token) // Ensure trim applies to flushed token +} diff --git a/pkg/stanza/fileconsumer/internal/splitter/multiline.go b/pkg/stanza/fileconsumer/internal/splitter/multiline.go index ae770350ed80..917bf2aeddb7 100644 --- a/pkg/stanza/fileconsumer/internal/splitter/multiline.go +++ b/pkg/stanza/fileconsumer/internal/splitter/multiline.go @@ -42,9 +42,14 @@ func NewSplitFuncFactory( // SplitFunc builds a bufio.SplitFunc based on the configuration func (f *splitFuncFactory) SplitFunc() (bufio.SplitFunc, error) { - splitFunc, err := f.splitConfig.Func(f.encoding, false, f.maxLogSize, f.trimFunc) + splitFunc, err := f.splitConfig.Func(f.encoding, false, f.maxLogSize) if err != nil { return nil, err } - return flush.WithPeriod(splitFunc, f.trimFunc, f.flushPeriod), nil + splitFunc = flush.WithPeriod(splitFunc, f.flushPeriod) + if f.encoding == encoding.Nop { + // Special case where we should never trim + return splitFunc, nil + } + return trim.WithFunc(splitFunc, f.trimFunc), nil } diff --git a/pkg/stanza/fileconsumer/internal/splitter/multiline_test.go b/pkg/stanza/fileconsumer/internal/splitter/multiline_test.go index d0207f964762..89b773ad802d 100644 --- a/pkg/stanza/fileconsumer/internal/splitter/multiline_test.go +++ b/pkg/stanza/fileconsumer/internal/splitter/multiline_test.go @@ -49,6 +49,30 @@ func TestSplitFunc(t *testing.T) { assert.Nil(t, token) } +func TestSplitFuncWithTrim(t *testing.T) { + factory := NewSplitFuncFactory(split.Config{}, unicode.UTF8, 1024, trim.Whitespace, 0) + splitFunc, err := factory.SplitFunc() + 
assert.NoError(t, err) + assert.NotNil(t, splitFunc) + + input := []byte(" hello \n world \n extra ") + + advance, token, err := splitFunc(input, false) + assert.NoError(t, err) + assert.Equal(t, 8, advance) + assert.Equal(t, []byte("hello"), token) + + advance, token, err = splitFunc(input[8:], false) + assert.NoError(t, err) + assert.Equal(t, 8, advance) + assert.Equal(t, []byte("world"), token) + + advance, token, err = splitFunc(input[16:], false) + assert.NoError(t, err) + assert.Equal(t, 0, advance) + assert.Nil(t, token) +} + func TestSplitFuncWithFlush(t *testing.T) { flushPeriod := 100 * time.Millisecond factory := NewSplitFuncFactory(split.Config{}, unicode.UTF8, 1024, trim.Nop, flushPeriod) @@ -81,7 +105,7 @@ func TestSplitFuncWithFlush(t *testing.T) { assert.Equal(t, []byte(" extra "), token) } -func TestSplitFuncWithTrim(t *testing.T) { +func TestSplitFuncWithFlushTrim(t *testing.T) { flushPeriod := 100 * time.Millisecond factory := NewSplitFuncFactory(split.Config{}, unicode.UTF8, 1024, trim.Whitespace, flushPeriod) splitFunc, err := factory.SplitFunc() diff --git a/pkg/stanza/flush/flush.go b/pkg/stanza/flush/flush.go index f42e18c82370..4197f527972d 100644 --- a/pkg/stanza/flush/flush.go +++ b/pkg/stanza/flush/flush.go @@ -6,12 +6,10 @@ package flush // import "github.com/open-telemetry/opentelemetry-collector-contr import ( "bufio" "time" - - "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/trim" ) // Wrap a bufio.SplitFunc with a flusher -func WithPeriod(splitFunc bufio.SplitFunc, trimFunc trim.Func, period time.Duration) bufio.SplitFunc { +func WithPeriod(splitFunc bufio.SplitFunc, period time.Duration) bufio.SplitFunc { if period <= 0 { return splitFunc } @@ -20,7 +18,7 @@ func WithPeriod(splitFunc bufio.SplitFunc, trimFunc trim.Func, period time.Durat forcePeriod: period, previousDataLength: 0, } - return f.splitFunc(splitFunc, trimFunc) + return f.splitFunc(splitFunc) } // flusher keeps information about flush state @@ 
-61,7 +59,7 @@ func (f *flusher) shouldFlush() bool { return f.forcePeriod > 0 && time.Since(f.lastDataChange) > f.forcePeriod && f.previousDataLength > 0 } -func (f *flusher) splitFunc(splitFunc bufio.SplitFunc, trimFunc trim.Func) bufio.SplitFunc { +func (f *flusher) splitFunc(splitFunc bufio.SplitFunc) bufio.SplitFunc { return func(data []byte, atEOF bool) (advance int, token []byte, err error) { advance, token, err = splitFunc(data, atEOF) @@ -81,7 +79,7 @@ func (f *flusher) splitFunc(splitFunc bufio.SplitFunc, trimFunc trim.Func) bufio if f.shouldFlush() { // Inform flusher that we just flushed f.flushed() - token = trimFunc(data) + token = data advance = len(data) return } diff --git a/pkg/stanza/flush/flush_test.go b/pkg/stanza/flush/flush_test.go index 25d3aec0212b..140308274033 100644 --- a/pkg/stanza/flush/flush_test.go +++ b/pkg/stanza/flush/flush_test.go @@ -9,8 +9,6 @@ import ( "time" "github.com/stretchr/testify/assert" - - "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/trim" ) func TestFlusher(t *testing.T) { @@ -22,7 +20,7 @@ func TestFlusher(t *testing.T) { // always use atEOF=false. flushPeriod := 100 * time.Millisecond - f := WithPeriod(bufio.ScanWords, trim.Nop, flushPeriod) + f := WithPeriod(bufio.ScanWords, flushPeriod) content := []byte("foo bar hellowo") @@ -64,7 +62,7 @@ func TestNoFlushPeriod(t *testing.T) { // In other words, we should expect exactly the behavior of bufio.ScanWords. 
flushPeriod := time.Duration(0) - f := WithPeriod(bufio.ScanWords, trim.Nop, flushPeriod) + f := WithPeriod(bufio.ScanWords, flushPeriod) content := []byte("foo bar hellowo") diff --git a/pkg/stanza/operator/input/tcp/tcp.go b/pkg/stanza/operator/input/tcp/tcp.go index 2a618e7d1054..2dd553e77512 100644 --- a/pkg/stanza/operator/input/tcp/tcp.go +++ b/pkg/stanza/operator/input/tcp/tcp.go @@ -81,13 +81,8 @@ type BaseConfig struct { type SplitFuncBuilder func(enc encoding.Encoding) (bufio.SplitFunc, error) -func (c Config) defaultMultilineBuilder(enc encoding.Encoding) (bufio.SplitFunc, error) { - trimFunc := c.TrimConfig.Func() - splitFunc, err := c.SplitConfig.Func(enc, true, int(c.MaxLogSize), trimFunc) - if err != nil { - return nil, err - } - return splitFunc, nil +func (c Config) defaultSplitFuncBuilder(enc encoding.Encoding) (bufio.SplitFunc, error) { + return c.SplitConfig.Func(enc, true, int(c.MaxLogSize)) } // Build will build a tcp input operator. @@ -121,7 +116,7 @@ func (c Config) Build(logger *zap.SugaredLogger) (operator.Operator, error) { } if c.SplitFuncBuilder == nil { - c.SplitFuncBuilder = c.defaultMultilineBuilder + c.SplitFuncBuilder = c.defaultSplitFuncBuilder } // Build split func @@ -129,6 +124,7 @@ func (c Config) Build(logger *zap.SugaredLogger) (operator.Operator, error) { if err != nil { return nil, err } + splitFunc = trim.WithFunc(splitFunc, c.TrimConfig.Func()) var resolver *helper.IPResolver if c.AddAttributes { diff --git a/pkg/stanza/operator/input/udp/udp.go b/pkg/stanza/operator/input/udp/udp.go index 0bf1c78f76fe..e45ee9b8d498 100644 --- a/pkg/stanza/operator/input/udp/udp.go +++ b/pkg/stanza/operator/input/udp/udp.go @@ -89,11 +89,12 @@ func (c Config) Build(logger *zap.SugaredLogger) (operator.Operator, error) { return nil, err } - // Build SplitFunc - splitFunc, err := c.SplitConfig.Func(enc, true, MaxUDPSize, c.TrimConfig.Func()) + // Build split func + splitFunc, err := c.SplitConfig.Func(enc, true, MaxUDPSize) if err != nil 
{ return nil, err } + splitFunc = trim.WithFunc(splitFunc, c.TrimConfig.Func()) var resolver *helper.IPResolver if c.AddAttributes { diff --git a/pkg/stanza/split/split.go b/pkg/stanza/split/split.go index 6e0282f52d28..fe449141da36 100644 --- a/pkg/stanza/split/split.go +++ b/pkg/stanza/split/split.go @@ -10,8 +10,6 @@ import ( "regexp" "golang.org/x/text/encoding" - - "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/trim" ) // Config is the configuration for a split func @@ -21,7 +19,7 @@ type Config struct { } // Func will return a bufio.SplitFunc based on the config -func (c Config) Func(enc encoding.Encoding, flushAtEOF bool, maxLogSize int, trimFunc trim.Func) (bufio.SplitFunc, error) { +func (c Config) Func(enc encoding.Encoding, flushAtEOF bool, maxLogSize int) (bufio.SplitFunc, error) { endPattern := c.LineEndPattern startPattern := c.LineStartPattern @@ -38,7 +36,7 @@ func (c Config) Func(enc encoding.Encoding, flushAtEOF bool, maxLogSize int, tri case enc == encoding.Nop: return NoSplitFunc(maxLogSize), nil case endPattern == "" && startPattern == "": - splitFunc, err = NewlineSplitFunc(enc, flushAtEOF, trimFunc) + splitFunc, err = NewlineSplitFunc(enc, flushAtEOF) if err != nil { return nil, err } @@ -47,26 +45,26 @@ func (c Config) Func(enc encoding.Encoding, flushAtEOF bool, maxLogSize int, tri if err != nil { return nil, fmt.Errorf("compile line end regex: %w", err) } - splitFunc = LineEndSplitFunc(re, flushAtEOF, trimFunc) + splitFunc = LineEndSplitFunc(re, flushAtEOF) case startPattern != "": re, err := regexp.Compile("(?m)" + c.LineStartPattern) if err != nil { return nil, fmt.Errorf("compile line start regex: %w", err) } - splitFunc = LineStartSplitFunc(re, flushAtEOF, trimFunc) + splitFunc = LineStartSplitFunc(re, flushAtEOF) } return splitFunc, nil } // LineStartSplitFunc creates a bufio.SplitFunc that splits an incoming stream into // tokens that start with a match to the regex pattern provided -func LineStartSplitFunc(re 
*regexp.Regexp, flushAtEOF bool, trimFunc trim.Func) bufio.SplitFunc { +func LineStartSplitFunc(re *regexp.Regexp, flushAtEOF bool) bufio.SplitFunc { return func(data []byte, atEOF bool) (advance int, token []byte, err error) { firstLoc := re.FindIndex(data) if firstLoc == nil { // Flush if no more data is expected if len(data) != 0 && atEOF && flushAtEOF { - token = trimFunc(data) + token = data advance = len(data) return } @@ -78,7 +76,7 @@ func LineStartSplitFunc(re *regexp.Regexp, flushAtEOF bool, trimFunc trim.Func) if firstMatchStart != 0 { // the beginning of the file does not match the start pattern, so return a token up to the first match so we don't lose data advance = firstMatchStart - token = trimFunc(data[0:firstMatchStart]) + token = data[0:firstMatchStart] // return if non-matching pattern is not only whitespaces if token != nil { @@ -93,7 +91,7 @@ func LineStartSplitFunc(re *regexp.Regexp, flushAtEOF bool, trimFunc trim.Func) // Flush if no more data is expected if atEOF && flushAtEOF { - token = trimFunc(data) + token = data advance = len(data) return } @@ -105,8 +103,8 @@ func LineStartSplitFunc(re *regexp.Regexp, flushAtEOF bool, trimFunc trim.Func) } secondMatchStart := secondLoc[0] + secondLocOfset - advance = secondMatchStart // start scanning at the beginning of the second match - token = trimFunc(data[firstMatchStart:secondMatchStart]) // the token begins at the first match, and ends at the beginning of the second match + advance = secondMatchStart // start scanning at the beginning of the second match + token = data[firstMatchStart:secondMatchStart] // the token begins at the first match, and ends at the beginning of the second match err = nil return } @@ -114,13 +112,13 @@ func LineStartSplitFunc(re *regexp.Regexp, flushAtEOF bool, trimFunc trim.Func) // LineEndSplitFunc creates a bufio.SplitFunc that splits an incoming stream into // tokens that end with a match to the regex pattern provided -func LineEndSplitFunc(re *regexp.Regexp, 
flushAtEOF bool, trimFunc trim.Func) bufio.SplitFunc { +func LineEndSplitFunc(re *regexp.Regexp, flushAtEOF bool) bufio.SplitFunc { return func(data []byte, atEOF bool) (advance int, token []byte, err error) { loc := re.FindIndex(data) if loc == nil { // Flush if no more data is expected if len(data) != 0 && atEOF && flushAtEOF { - token = trimFunc(data) + token = data advance = len(data) return } @@ -134,7 +132,7 @@ func LineEndSplitFunc(re *regexp.Regexp, flushAtEOF bool, trimFunc trim.Func) bu } advance = loc[1] - token = trimFunc(data[:loc[1]]) + token = data[:loc[1]] err = nil return } @@ -142,7 +140,7 @@ func LineEndSplitFunc(re *regexp.Regexp, flushAtEOF bool, trimFunc trim.Func) bu // NewlineSplitFunc splits log lines by newline, just as bufio.ScanLines, but // never returning an token using EOF as a terminator -func NewlineSplitFunc(enc encoding.Encoding, flushAtEOF bool, trimFunc trim.Func) (bufio.SplitFunc, error) { +func NewlineSplitFunc(enc encoding.Encoding, flushAtEOF bool) (bufio.SplitFunc, error) { newline, err := encodedNewline(enc) if err != nil { return nil, err @@ -158,15 +156,19 @@ func NewlineSplitFunc(enc encoding.Encoding, flushAtEOF bool, trimFunc trim.Func return 0, nil, nil } - if i := bytes.Index(data, newline); i >= 0 { + i := bytes.Index(data, newline) + if i == 0 { + return len(newline), []byte{}, nil + } + if i >= 0 { // We have a full newline-terminated line. 
token = bytes.TrimSuffix(data[:i], carriageReturn) - return i + len(newline), trimFunc(token), nil + return i + len(newline), token, nil } // Flush if no more data is expected if atEOF && flushAtEOF { - token = trimFunc(data) + token = data advance = len(data) return } diff --git a/pkg/stanza/split/split_test.go b/pkg/stanza/split/split_test.go index 372768553854..ace7112896d8 100644 --- a/pkg/stanza/split/split_test.go +++ b/pkg/stanza/split/split_test.go @@ -24,19 +24,14 @@ func TestConfigFunc(t *testing.T) { maxLogSize := 100 t.Run("BothStartAndEnd", func(t *testing.T) { - cfg := &Config{ - LineStartPattern: "foo", - LineEndPattern: "bar", - } - - _, err := cfg.Func(unicode.UTF8, false, maxLogSize, trim.Nop) + cfg := &Config{LineStartPattern: "foo", LineEndPattern: "bar"} + _, err := cfg.Func(unicode.UTF8, false, maxLogSize) assert.EqualError(t, err, "only one of line_start_pattern or line_end_pattern can be set") }) t.Run("NopEncoding", func(t *testing.T) { cfg := &Config{} - - f, err := cfg.Func(encoding.Nop, false, maxLogSize, trim.Nop) + f, err := cfg.Func(encoding.Nop, false, maxLogSize) assert.NoError(t, err) raw := splittest.GenerateBytes(maxLogSize * 2) @@ -48,8 +43,7 @@ func TestConfigFunc(t *testing.T) { t.Run("Newline", func(t *testing.T) { cfg := &Config{} - - f, err := cfg.Func(unicode.UTF8, false, maxLogSize, trim.Nop) + f, err := cfg.Func(unicode.UTF8, false, maxLogSize) assert.NoError(t, err) advance, token, err := f([]byte("foo\nbar\nbaz\n"), false) @@ -59,20 +53,14 @@ func TestConfigFunc(t *testing.T) { }) t.Run("InvalidStartRegex", func(t *testing.T) { - cfg := &Config{ - LineStartPattern: "[", - } - - _, err := cfg.Func(unicode.UTF8, false, maxLogSize, trim.Nop) + cfg := &Config{LineStartPattern: "["} + _, err := cfg.Func(unicode.UTF8, false, maxLogSize) assert.EqualError(t, err, "compile line start regex: error parsing regexp: missing closing ]: `[`") }) t.Run("InvalidEndRegex", func(t *testing.T) { - cfg := &Config{ - LineEndPattern: "[", - 
} - - _, err := cfg.Func(unicode.UTF8, false, maxLogSize, trim.Nop) + cfg := &Config{LineEndPattern: "["} + _, err := cfg.Func(unicode.UTF8, false, maxLogSize) assert.EqualError(t, err, "compile line end regex: error parsing regexp: missing closing ]: `[`") }) } @@ -92,8 +80,8 @@ func TestLineStartSplitFunc(t *testing.T) { Pattern: `LOGSTART \d+ `, Input: []byte(`LOGSTART 123 log1 LOGSTART 234 log2 LOGSTART 345 foo`), ExpectedTokens: []string{ - `LOGSTART 123 log1`, - `LOGSTART 234 log2`, + `LOGSTART 123 log1 `, + `LOGSTART 234 log2 `, }, }, { @@ -101,8 +89,8 @@ func TestLineStartSplitFunc(t *testing.T) { Pattern: `^LOGSTART \d+ `, Input: []byte("LOGSTART 123 LOGSTART 345 log1\nLOGSTART 234 log2\nLOGSTART 345 foo"), ExpectedTokens: []string{ - "LOGSTART 123 LOGSTART 345 log1", - "LOGSTART 234 log2", + "LOGSTART 123 LOGSTART 345 log1\n", + "LOGSTART 234 log2\n", }, }, { @@ -115,7 +103,7 @@ func TestLineStartSplitFunc(t *testing.T) { Pattern: `LOGSTART \d+ `, Input: []byte(`part that doesn't match LOGSTART 123 part that matchesLOGSTART 123 foo`), ExpectedTokens: []string{ - `part that doesn't match`, + `part that doesn't match `, `LOGSTART 123 part that matches`, }, }, @@ -161,30 +149,26 @@ func TestLineStartSplitFunc(t *testing.T) { Pattern: `^LOGSTART \d+`, Input: []byte("LOGSTART 12 log1\t \nLOGPART log1\nLOGPART log1\t \nLOGSTART 17 log2\nLOGPART log2\nanother line\nLOGSTART 43 log5"), ExpectedTokens: []string{ - "LOGSTART 12 log1\t \nLOGPART log1\nLOGPART log1", - "LOGSTART 17 log2\nLOGPART log2\nanother line", + "LOGSTART 12 log1\t \nLOGPART log1\nLOGPART log1\t \n", + "LOGSTART 17 log2\nLOGPART log2\nanother line\n", }, }, { - Name: "LogsWithoutFlusher", + Name: "NoMatch", Pattern: `^LOGSTART \d+`, Input: []byte("LOGPART log1\nLOGPART log1\t \n"), }, } for _, tc := range testCases { - cfg := Config{LineStartPattern: tc.Pattern} - trimFunc := trim.Config{ - PreserveLeading: tc.PreserveLeadingWhitespaces, - PreserveTrailing: tc.PreserveTrailingWhitespaces, - 
}.Func() - splitFunc, err := cfg.Func(unicode.UTF8, false, 0, trimFunc) + cfg := &Config{LineStartPattern: tc.Pattern} + splitFunc, err := cfg.Func(unicode.UTF8, false, 0) require.NoError(t, err) t.Run(tc.Name, tc.Run(splitFunc)) } t.Run("FirstMatchHitsEndOfBuffer", func(t *testing.T) { - splitFunc := LineStartSplitFunc(regexp.MustCompile("LOGSTART"), false, trim.Nop) + splitFunc := LineStartSplitFunc(regexp.MustCompile("LOGSTART"), false) data := []byte(`LOGSTART`) t.Run("NotAtEOF", func(t *testing.T) { @@ -221,21 +205,21 @@ func TestLineStartSplitFunc(t *testing.T) { }, } for _, tc := range flushAtEOFCases { - cfg := &Config{ - LineStartPattern: `^LOGSTART \d+`, - } - splitFunc, err := cfg.Func(unicode.UTF8, true, 0, trim.Nop) + cfg := &Config{LineStartPattern: `^LOGSTART \d+`} + splitFunc, err := cfg.Func(unicode.UTF8, true, 0) require.NoError(t, err) t.Run(tc.Name, tc.Run(splitFunc)) } }) + // TODO move to internal/splitter? t.Run("ApplyTrimFunc", func(t *testing.T) { - cfg := Config{LineStartPattern: ` LOGSTART \d+ `} + cfg := &Config{LineStartPattern: ` LOGSTART \d+ `} input := []byte(" LOGSTART 123 log1 LOGSTART 234 log2 LOGSTART 345 foo") - - splitTrimLeading, err := cfg.Func(unicode.UTF8, false, 0, trim.Leading) + splitFunc, err := cfg.Func(unicode.UTF8, false, 0) require.NoError(t, err) + + splitTrimLeading := trim.WithFunc(splitFunc, trim.Leading) t.Run("TrimLeading", splittest.TestCase{ Input: input, ExpectedTokens: []string{ @@ -243,8 +227,7 @@ func TestLineStartSplitFunc(t *testing.T) { `LOGSTART 234 log2 `, }}.Run(splitTrimLeading)) - splitTrimTrailing, err := cfg.Func(unicode.UTF8, false, 0, trim.Trailing) - require.NoError(t, err) + splitTrimTrailing := trim.WithFunc(splitFunc, trim.Trailing) t.Run("TrimTrailing", splittest.TestCase{ Input: input, ExpectedTokens: []string{ @@ -252,8 +235,7 @@ func TestLineStartSplitFunc(t *testing.T) { ` LOGSTART 234 log2`, }}.Run(splitTrimTrailing)) - splitTrimBoth, err := cfg.Func(unicode.UTF8, false, 0, 
trim.Whitespace) - require.NoError(t, err) + splitTrimBoth := trim.WithFunc(splitFunc, trim.Whitespace) t.Run("TrimBoth", splittest.TestCase{ Input: input, ExpectedTokens: []string{ @@ -288,7 +270,7 @@ func TestLineEndSplitFunc(t *testing.T) { Input: []byte("log1 LOGEND LOGEND\nlog2 LOGEND\n"), ExpectedTokens: []string{ "log1 LOGEND LOGEND", - "log2 LOGEND", + "\nlog2 LOGEND", }, }, { @@ -339,38 +321,31 @@ func TestLineEndSplitFunc(t *testing.T) { ExpectedError: errors.New("bufio.Scanner: token too long"), }, { - Name: "MultipleMultilineLogs", + Name: "MultiplesplitLogs", Pattern: `^LOGEND.*$`, Input: []byte("LOGSTART 12 log1\t \nLOGPART log1\nLOGEND log1\t \nLOGSTART 17 log2\nLOGPART log2\nLOGEND log2\nLOGSTART 43 log5"), ExpectedTokens: []string{ - "LOGSTART 12 log1\t \nLOGPART log1\nLOGEND log1", - "LOGSTART 17 log2\nLOGPART log2\nLOGEND log2", + "LOGSTART 12 log1\t \nLOGPART log1\nLOGEND log1\t ", + "\nLOGSTART 17 log2\nLOGPART log2\nLOGEND log2", }, }, { - Name: "LogsWithoutFlusher", + Name: "NoMatch", Pattern: `^LOGEND.*$`, Input: []byte("LOGPART log1\nLOGPART log1\t \n"), }, } for _, tc := range testCases { - cfg := Config{LineEndPattern: tc.Pattern} - - trimFunc := trim.Config{ - PreserveLeading: tc.PreserveLeadingWhitespaces, - PreserveTrailing: tc.PreserveTrailingWhitespaces, - }.Func() - splitFunc, err := cfg.Func(unicode.UTF8, false, 0, trimFunc) + cfg := &Config{LineEndPattern: tc.Pattern} + splitFunc, err := cfg.Func(unicode.UTF8, false, 0) require.NoError(t, err) t.Run(tc.Name, tc.Run(splitFunc)) } t.Run("FlushAtEOF", func(t *testing.T) { - cfg := &Config{ - LineEndPattern: `^LOGSTART \d+`, - } - splitFunc, err := cfg.Func(unicode.UTF8, true, 0, trim.Nop) + cfg := &Config{LineEndPattern: `^LOGSTART \d+`} + splitFunc, err := cfg.Func(unicode.UTF8, true, 0) require.NoError(t, err) splittest.TestCase{ Name: "NoMatch", @@ -379,12 +354,14 @@ func TestLineEndSplitFunc(t *testing.T) { }.Run(splitFunc)(t) }) + // TODO move to internal/splitter? 
t.Run("ApplyTrimFunc", func(t *testing.T) { - cfg := Config{LineEndPattern: ` LOGEND `} + cfg := &Config{LineEndPattern: ` LOGEND `} input := []byte(" log1 LOGEND log2 LOGEND ") - - splitTrimLeading, err := cfg.Func(unicode.UTF8, false, 0, trim.Leading) + splitFunc, err := cfg.Func(unicode.UTF8, false, 0) require.NoError(t, err) + + splitTrimLeading := trim.WithFunc(splitFunc, trim.Leading) t.Run("TrimLeading", splittest.TestCase{ Input: input, ExpectedTokens: []string{ @@ -392,8 +369,7 @@ func TestLineEndSplitFunc(t *testing.T) { `log2 LOGEND `, }}.Run(splitTrimLeading)) - splitTrimTrailing, err := cfg.Func(unicode.UTF8, false, 0, trim.Trailing) - require.NoError(t, err) + splitTrimTrailing := trim.WithFunc(splitFunc, trim.Trailing) t.Run("TrimTrailing", splittest.TestCase{ Input: input, ExpectedTokens: []string{ @@ -401,8 +377,7 @@ func TestLineEndSplitFunc(t *testing.T) { ` log2 LOGEND`, }}.Run(splitTrimTrailing)) - splitTrimBoth, err := cfg.Func(unicode.UTF8, false, 0, trim.Whitespace) - require.NoError(t, err) + splitTrimBoth := trim.WithFunc(splitFunc, trim.Whitespace) t.Run("TrimBoth", splittest.TestCase{ Input: input, ExpectedTokens: []string{ @@ -484,7 +459,7 @@ func TestNewlineSplitFunc(t *testing.T) { Input: []byte("LOGPART log1"), }, { - Name: "DefaultFlusherSplits", + Name: "DefaultSplits", Input: []byte("log1\nlog2\n"), ExpectedTokens: []string{ "log1", @@ -499,48 +474,16 @@ func TestNewlineSplitFunc(t *testing.T) { "LOGEND 333", }, }, - { - Name: "PreserveLeadingWhitespaces", - Input: []byte("\n LOGEND 333 \nAnother one "), - ExpectedTokens: []string{ - "", - " LOGEND 333", - }, - PreserveLeadingWhitespaces: true, - }, - { - Name: "PreserveTrailingWhitespaces", - Input: []byte("\n LOGEND 333 \nAnother one "), - ExpectedTokens: []string{ - "", - "LOGEND 333 ", - }, - PreserveTrailingWhitespaces: true, - }, - { - Name: "PreserveBothLeadingAndTrailingWhitespaces", - Input: []byte("\n LOGEND 333 \nAnother one "), - ExpectedTokens: []string{ - "", - " 
LOGEND 333 ", - }, - PreserveLeadingWhitespaces: true, - PreserveTrailingWhitespaces: true, - }, } for _, tc := range testCases { - trimFunc := trim.Config{ - PreserveLeading: tc.PreserveLeadingWhitespaces, - PreserveTrailing: tc.PreserveTrailingWhitespaces, - }.Func() - splitFunc, err := NewlineSplitFunc(unicode.UTF8, false, trimFunc) + splitFunc, err := NewlineSplitFunc(unicode.UTF8, false) require.NoError(t, err) t.Run(tc.Name, tc.Run(splitFunc)) } t.Run("FlushAtEOF", func(t *testing.T) { - splitFunc, err := Config{}.Func(unicode.UTF8, true, 0, trim.Nop) + splitFunc, err := Config{}.Func(unicode.UTF8, true, 0) require.NoError(t, err) splittest.TestCase{ Name: "FlushAtEOF", @@ -549,12 +492,14 @@ func TestNewlineSplitFunc(t *testing.T) { }.Run(splitFunc)(t) }) + // TODO move to internal/splitter? t.Run("ApplyTrimFunc", func(t *testing.T) { cfg := &Config{} input := []byte(" log1 \n log2 \n") - - splitTrimLeading, err := cfg.Func(unicode.UTF8, false, 0, trim.Leading) + splitFunc, err := cfg.Func(unicode.UTF8, false, 0) require.NoError(t, err) + + splitTrimLeading := trim.WithFunc(splitFunc, trim.Leading) t.Run("TrimLeading", splittest.TestCase{ Input: input, ExpectedTokens: []string{ @@ -562,8 +507,7 @@ func TestNewlineSplitFunc(t *testing.T) { `log2 `, }}.Run(splitTrimLeading)) - splitTrimTrailing, err := cfg.Func(unicode.UTF8, false, 0, trim.Trailing) - require.NoError(t, err) + splitTrimTrailing := trim.WithFunc(splitFunc, trim.Trailing) t.Run("TrimTrailing", splittest.TestCase{ Input: input, ExpectedTokens: []string{ @@ -571,8 +515,9 @@ func TestNewlineSplitFunc(t *testing.T) { ` log2`, }}.Run(splitTrimTrailing)) - splitTrimBoth, err := cfg.Func(unicode.UTF8, false, 0, trim.Whitespace) + splitTrimBoth, err := cfg.Func(unicode.UTF8, false, 0) require.NoError(t, err) + splitTrimBoth = trim.WithFunc(splitTrimBoth, trim.Whitespace) t.Run("TrimBoth", splittest.TestCase{ Input: input, ExpectedTokens: []string{ @@ -644,14 +589,12 @@ func TestNoSplitFunc(t 
*testing.T) { } func TestNoopEncodingError(t *testing.T) { - endCfg := Config{LineEndPattern: "\n"} - - _, err := endCfg.Func(encoding.Nop, false, 0, trim.Nop) + endCfg := &Config{LineEndPattern: "\n"} + _, err := endCfg.Func(encoding.Nop, false, 0) require.Equal(t, err, fmt.Errorf("line_start_pattern or line_end_pattern should not be set when using nop encoding")) - startCfg := Config{LineStartPattern: "\n"} - - _, err = startCfg.Func(encoding.Nop, false, 0, trim.Nop) + startCfg := &Config{LineStartPattern: "\n"} + _, err = startCfg.Func(encoding.Nop, false, 0) require.Equal(t, err, fmt.Errorf("line_start_pattern or line_end_pattern should not be set when using nop encoding")) } @@ -712,7 +655,7 @@ func TestNewlineSplitFunc_Encodings(t *testing.T) { for _, tc := range cases { t.Run(tc.name, func(t *testing.T) { - splitFunc, err := NewlineSplitFunc(tc.encoding, false, trim.Nop) + splitFunc, err := NewlineSplitFunc(tc.encoding, false) require.NoError(t, err) scanner := bufio.NewScanner(bytes.NewReader(tc.input)) scanner.Split(splitFunc) diff --git a/pkg/stanza/split/splittest/splittest.go b/pkg/stanza/split/splittest/splittest.go index 1ea3ebc2c3ca..b784b1b68dec 100644 --- a/pkg/stanza/split/splittest/splittest.go +++ b/pkg/stanza/split/splittest/splittest.go @@ -71,18 +71,16 @@ func (r *testReader) splitFunc(split bufio.SplitFunc) bufio.SplitFunc { } type TestCase struct { - Name string - Pattern string - Input []byte - ExpectedTokens []string - ExpectedError error - Sleep time.Duration - AdditionalIterations int - PreserveLeadingWhitespaces bool - PreserveTrailingWhitespaces bool + Name string + Pattern string + Input []byte + ExpectedTokens []string + ExpectedError error + Sleep time.Duration + AdditionalIterations int } -func (tc TestCase) Run(split bufio.SplitFunc) func(t *testing.T) { +func (tc TestCase) Run(splitFunc bufio.SplitFunc) func(t *testing.T) { reader := newTestReader(tc.Input) return func(t *testing.T) { @@ -94,7 +92,7 @@ func (tc TestCase) 
Run(split bufio.SplitFunc) func(t *testing.T) { } reader.Reset() scanner := bufio.NewScanner(reader) - scanner.Split(reader.splitFunc(split)) + scanner.Split(reader.splitFunc(splitFunc)) for { ok := scanner.Scan() if !ok { diff --git a/pkg/stanza/trim/trim.go b/pkg/stanza/trim/trim.go index 90118dfc6543..20e48e3fe3da 100644 --- a/pkg/stanza/trim/trim.go +++ b/pkg/stanza/trim/trim.go @@ -4,11 +4,22 @@ package trim // import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/trim" import ( + "bufio" "bytes" ) type Func func([]byte) []byte +func WithFunc(splitFunc bufio.SplitFunc, trimFunc Func) bufio.SplitFunc { + if trimFunc == nil { + return splitFunc + } + return func(data []byte, atEOF bool) (advance int, token []byte, err error) { + advance, token, err = splitFunc(data, atEOF) + return advance, trimFunc(token), err + } +} + type Config struct { PreserveLeading bool `mapstructure:"preserve_leading_whitespaces,omitempty"` PreserveTrailing bool `mapstructure:"preserve_trailing_whitespaces,omitempty"` @@ -27,27 +38,24 @@ func (c Config) Func() Func { return Whitespace } -func Nop(token []byte) []byte { +var Nop Func = func(token []byte) []byte { return token } -func Leading(data []byte) []byte { - // TrimLeft to strip EOF whitespaces in case of using $ in regex - // For some reason newline and carriage return are being moved to beginning of next log +var Leading Func = func(data []byte) []byte { token := bytes.TrimLeft(data, "\r\n\t ") - - // TrimLeft will return nil if data is an empty slice if token == nil { - return []byte{} + // TrimLeft sometimes overwrites something with nothing. + // We need to override this behavior in order to preserve empty tokens. 
+ return data } return token } -func Trailing(data []byte) []byte { - // TrimRight to strip all whitespaces from the end of log +var Trailing Func = func(data []byte) []byte { return bytes.TrimRight(data, "\r\n\t ") } -func Whitespace(data []byte) []byte { +var Whitespace Func = func(data []byte) []byte { return Leading(Trailing(data)) } diff --git a/pkg/stanza/trim/trim_test.go b/pkg/stanza/trim/trim_test.go index 114a645853ee..5db6cf44f77f 100644 --- a/pkg/stanza/trim/trim_test.go +++ b/pkg/stanza/trim/trim_test.go @@ -4,6 +4,7 @@ package trim import ( + "bufio" "testing" "github.com/stretchr/testify/assert" @@ -15,36 +16,50 @@ func TestTrim(t *testing.T) { name string preserveLeading bool preserveTrailing bool - input string - expect string + input []byte + expect []byte }{ { name: "preserve both", preserveLeading: true, preserveTrailing: true, - input: " hello world ", - expect: " hello world ", + input: []byte(" hello world "), + expect: []byte(" hello world "), }, { name: "preserve leading", preserveLeading: true, preserveTrailing: false, - input: " hello world ", - expect: " hello world", + input: []byte(" hello world "), + expect: []byte(" hello world"), }, { name: "preserve trailing", preserveLeading: false, preserveTrailing: true, - input: " hello world ", - expect: "hello world ", + input: []byte(" hello world "), + expect: []byte("hello world "), }, { name: "preserve neither", preserveLeading: false, preserveTrailing: false, - input: " hello world ", - expect: "hello world", + input: []byte(" hello world "), + expect: []byte("hello world"), + }, + { + name: "trim leading returns nil when given nil", + preserveLeading: false, + preserveTrailing: true, + input: nil, + expect: nil, + }, + { + name: "trim leading returns []byte when given []byte", + preserveLeading: false, + preserveTrailing: true, + input: []byte{}, + expect: []byte{}, }, } @@ -54,10 +69,55 @@ func TestTrim(t *testing.T) { PreserveLeading: tc.preserveLeading, PreserveTrailing: 
tc.preserveTrailing, }.Func() - assert.Equal(t, []byte(tc.expect), trimFunc([]byte(tc.input))) - - // Also test that regardless of configuration, an empty []byte in gives an empty []byte out - assert.Equal(t, []byte{}, trimFunc([]byte{})) + assert.Equal(t, tc.expect, trimFunc(tc.input)) }) } } + +func TestWithFunc(t *testing.T) { + scanAndTrimLines := WithFunc(bufio.ScanLines, Config{ + PreserveLeading: false, + PreserveTrailing: false, + }.Func()) + + input := []byte(" hello \n world \n extra ") + + advance, token, err := scanAndTrimLines(input, false) + assert.NoError(t, err) + assert.Equal(t, 8, advance) + assert.Equal(t, []byte("hello"), token) + + advance, token, err = scanAndTrimLines(input[8:], false) + assert.NoError(t, err) + assert.Equal(t, 8, advance) + assert.Equal(t, []byte("world"), token) + + advance, token, err = scanAndTrimLines(input[16:], false) + assert.NoError(t, err) + assert.Equal(t, 0, advance) + assert.Nil(t, token) +} + +func TestWithNilTrimFunc(t *testing.T) { + // Same test as above, but pass nil instead of a trim func + // In other words, we should expect exactly the behavior of bufio.ScanLines. + + scanLines := WithFunc(bufio.ScanLines, nil) + + input := []byte(" hello \n world \n extra ") + + advance, token, err := scanLines(input, false) + assert.NoError(t, err) + assert.Equal(t, 8, advance) + assert.Equal(t, []byte(" hello "), token) + + advance, token, err = scanLines(input[8:], false) + assert.NoError(t, err) + assert.Equal(t, 8, advance) + assert.Equal(t, []byte(" world "), token) + + advance, token, err = scanLines(input[16:], false) + assert.NoError(t, err) + assert.Equal(t, 0, advance) + assert.Nil(t, token) +}