diff --git a/.chloggen/pkg-stanza-encoding.yaml b/.chloggen/pkg-stanza-encoding.yaml index 3013e8af9a1c..3705335e22b3 100755 --- a/.chloggen/pkg-stanza-encoding.yaml +++ b/.chloggen/pkg-stanza-encoding.yaml @@ -7,7 +7,7 @@ change_type: deprecation component: pkg/stanza # A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). -note: Deprecate encoding related elements of helper pacakge, in favor of new decode package +note: Deprecate encoding related elements of helper package, in favor of new decoder package # Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. issues: [26019] diff --git a/.chloggen/pkg-stanza-tokenize.yaml b/.chloggen/pkg-stanza-tokenize.yaml new file mode 100755 index 000000000000..85e0eb077402 --- /dev/null +++ b/.chloggen/pkg-stanza-tokenize.yaml @@ -0,0 +1,40 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' change_type: deprecation + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) component: pkg/stanza + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Deprecate tokenization related elements of helper package, in favor of new tokenize package + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. issues: [25914] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. subtext: Includes the following deprecations | + - Flusher + - FlusherConfig + - NewFlusherConfig + - Multiline + - MultilineConfig + - NewMultilineConfig + - NewLineStartSplitFunc + - NewLineEndSplitFunc + - NewNewlineSplitFunc + - Splitter + - SplitterConfig + - NewSplitterConfig + - SplitNone + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API.
+# Default: '[user]' +change_logs: [api] diff --git a/pkg/stanza/fileconsumer/config.go b/pkg/stanza/fileconsumer/config.go index f310bff35751..2b29d9ddbe74 100644 --- a/pkg/stanza/fileconsumer/config.go +++ b/pkg/stanza/fileconsumer/config.go @@ -20,6 +20,7 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/matcher" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/helper" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/tokenize" ) const ( @@ -49,7 +50,7 @@ func NewConfig() *Config { IncludeFileNameResolved: false, IncludeFilePathResolved: false, PollInterval: 200 * time.Millisecond, - Splitter: helper.NewSplitterConfig(), + Splitter: tokenize.NewSplitterConfig(), StartAt: "end", FingerprintSize: fingerprint.DefaultSize, MaxLogSize: defaultMaxLogSize, @@ -61,19 +62,19 @@ func NewConfig() *Config { // Config is the configuration of a file input operator type Config struct { matcher.Criteria `mapstructure:",squash"` - IncludeFileName bool `mapstructure:"include_file_name,omitempty"` - IncludeFilePath bool `mapstructure:"include_file_path,omitempty"` - IncludeFileNameResolved bool `mapstructure:"include_file_name_resolved,omitempty"` - IncludeFilePathResolved bool `mapstructure:"include_file_path_resolved,omitempty"` - PollInterval time.Duration `mapstructure:"poll_interval,omitempty"` - StartAt string `mapstructure:"start_at,omitempty"` - FingerprintSize helper.ByteSize `mapstructure:"fingerprint_size,omitempty"` - MaxLogSize helper.ByteSize `mapstructure:"max_log_size,omitempty"` - MaxConcurrentFiles int `mapstructure:"max_concurrent_files,omitempty"` - MaxBatches int `mapstructure:"max_batches,omitempty"` - DeleteAfterRead bool `mapstructure:"delete_after_read,omitempty"` - Splitter helper.SplitterConfig `mapstructure:",squash,omitempty"` - Header *HeaderConfig `mapstructure:"header,omitempty"` + IncludeFileName bool `mapstructure:"include_file_name,omitempty"` + IncludeFilePath bool `mapstructure:"include_file_path,omitempty"` + IncludeFileNameResolved bool `mapstructure:"include_file_name_resolved,omitempty"` + IncludeFilePathResolved bool `mapstructure:"include_file_path_resolved,omitempty"` + PollInterval time.Duration `mapstructure:"poll_interval,omitempty"` + StartAt string `mapstructure:"start_at,omitempty"` + FingerprintSize helper.ByteSize `mapstructure:"fingerprint_size,omitempty"` + MaxLogSize helper.ByteSize `mapstructure:"max_log_size,omitempty"` + MaxConcurrentFiles int `mapstructure:"max_concurrent_files,omitempty"` + MaxBatches int `mapstructure:"max_batches,omitempty"` + DeleteAfterRead bool `mapstructure:"delete_after_read,omitempty"` + Splitter tokenize.SplitterConfig `mapstructure:",squash,omitempty"` + Header *HeaderConfig `mapstructure:"header,omitempty"` } type HeaderConfig struct { diff --git a/pkg/stanza/fileconsumer/config_test.go b/pkg/stanza/fileconsumer/config_test.go index adcaf664928c..bcfb9ea91b90 100644 --- a/pkg/stanza/fileconsumer/config_test.go +++ b/pkg/stanza/fileconsumer/config_test.go @@ -17,6 +17,7 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/operatortest" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/parser/regex" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/testutil" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/tokenize" ) func TestUnmarshal(t 
*testing.T) { @@ -279,7 +280,7 @@ func TestUnmarshal(t *testing.T) { Name: "multiline_line_start_string", Expect: func() *mockOperatorConfig { cfg := NewConfig() - newSplit := helper.NewSplitterConfig() + newSplit := tokenize.NewSplitterConfig() newSplit.Multiline.LineStartPattern = "Start" cfg.Splitter = newSplit return newMockOperatorConfig(cfg) @@ -289,7 +290,7 @@ func TestUnmarshal(t *testing.T) { Name: "multiline_line_start_special", Expect: func() *mockOperatorConfig { cfg := NewConfig() - newSplit := helper.NewSplitterConfig() + newSplit := tokenize.NewSplitterConfig() newSplit.Multiline.LineStartPattern = "%" cfg.Splitter = newSplit return newMockOperatorConfig(cfg) @@ -299,7 +300,7 @@ func TestUnmarshal(t *testing.T) { Name: "multiline_line_end_string", Expect: func() *mockOperatorConfig { cfg := NewConfig() - newSplit := helper.NewSplitterConfig() + newSplit := tokenize.NewSplitterConfig() newSplit.Multiline.LineEndPattern = "Start" cfg.Splitter = newSplit return newMockOperatorConfig(cfg) @@ -309,7 +310,7 @@ func TestUnmarshal(t *testing.T) { Name: "multiline_line_end_special", Expect: func() *mockOperatorConfig { cfg := NewConfig() - newSplit := helper.NewSplitterConfig() + newSplit := tokenize.NewSplitterConfig() newSplit.Multiline.LineEndPattern = "%" cfg.Splitter = newSplit return newMockOperatorConfig(cfg) @@ -451,8 +452,8 @@ func TestBuild(t *testing.T) { { "MultilineConfiguredStartAndEndPatterns", func(f *Config) { - f.Splitter = helper.NewSplitterConfig() - f.Splitter.Multiline = helper.MultilineConfig{ + f.Splitter = tokenize.NewSplitterConfig() + f.Splitter.Multiline = tokenize.MultilineConfig{ LineEndPattern: "Exists", LineStartPattern: "Exists", } @@ -463,8 +464,8 @@ func TestBuild(t *testing.T) { { "MultilineConfiguredStartPattern", func(f *Config) { - f.Splitter = helper.NewSplitterConfig() - f.Splitter.Multiline = helper.MultilineConfig{ + f.Splitter = tokenize.NewSplitterConfig() + f.Splitter.Multiline = tokenize.MultilineConfig{ LineStartPattern: "START.*", } }, @@ -474,8 +475,8 @@ func TestBuild(t *testing.T) { { "MultilineConfiguredEndPattern", func(f *Config) { - f.Splitter = helper.NewSplitterConfig() - f.Splitter.Multiline = helper.MultilineConfig{ + f.Splitter = tokenize.NewSplitterConfig() + f.Splitter.Multiline = tokenize.MultilineConfig{ LineEndPattern: "END.*", } }, @@ -493,8 +494,8 @@ func TestBuild(t *testing.T) { { "LineStartAndEnd", func(f *Config) { - f.Splitter = helper.NewSplitterConfig() - f.Splitter.Multiline = helper.MultilineConfig{ + f.Splitter = tokenize.NewSplitterConfig() + f.Splitter.Multiline = tokenize.MultilineConfig{ LineStartPattern: ".*", LineEndPattern: ".*", } @@ -505,8 +506,8 @@ func TestBuild(t *testing.T) { { "NoLineStartOrEnd", func(f *Config) { - f.Splitter = helper.NewSplitterConfig() - f.Splitter.Multiline = helper.MultilineConfig{} + f.Splitter = tokenize.NewSplitterConfig() + f.Splitter.Multiline = tokenize.MultilineConfig{} }, require.NoError, func(t *testing.T, f *Manager) {}, @@ -514,8 +515,8 @@ func TestBuild(t *testing.T) { { "InvalidLineStartRegex", func(f *Config) { - f.Splitter = helper.NewSplitterConfig() - f.Splitter.Multiline = helper.MultilineConfig{ + f.Splitter = tokenize.NewSplitterConfig() + f.Splitter.Multiline = tokenize.MultilineConfig{ LineStartPattern: "(", } }, @@ -525,8 +526,8 @@ func TestBuild(t *testing.T) { { "InvalidLineEndRegex", func(f *Config) { - f.Splitter = helper.NewSplitterConfig() - f.Splitter.Multiline = helper.MultilineConfig{ + f.Splitter = tokenize.NewSplitterConfig() + 
f.Splitter.Multiline = tokenize.MultilineConfig{ LineEndPattern: "(", } }, diff --git a/pkg/stanza/fileconsumer/file_test.go b/pkg/stanza/fileconsumer/file_test.go index 37f7a8569283..7361c5eeccf8 100644 --- a/pkg/stanza/fileconsumer/file_test.go +++ b/pkg/stanza/fileconsumer/file_test.go @@ -24,6 +24,7 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/matcher" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/helper" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/testutil" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/tokenize" ) func TestCleanStop(t *testing.T) { @@ -545,7 +546,7 @@ func TestNoNewline(t *testing.T) { tempDir := t.TempDir() cfg := NewConfig().includeDir(tempDir) cfg.StartAt = "beginning" - cfg.Splitter = helper.NewSplitterConfig() + cfg.Splitter = tokenize.NewSplitterConfig() cfg.Splitter.Flusher.Period = time.Nanosecond operator, emitCalls := buildTestManager(t, cfg) diff --git a/pkg/stanza/fileconsumer/internal/header/config.go b/pkg/stanza/fileconsumer/internal/header/config.go index 9bd329a95ec8..5645607640bd 100644 --- a/pkg/stanza/fileconsumer/internal/header/config.go +++ b/pkg/stanza/fileconsumer/internal/header/config.go @@ -14,8 +14,8 @@ import ( "golang.org/x/text/encoding" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator" - "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/helper" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/pipeline" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/tokenize" ) type Config struct { @@ -69,7 +69,7 @@ func NewConfig(matchRegex string, metadataOperators []operator.Config, enc encod return nil, fmt.Errorf("failed to compile `pattern`: %w", err) } - splitFunc, err := helper.NewNewlineSplitFunc(enc, false, func(b []byte) []byte { + splitFunc, err := tokenize.NewNewlineSplitFunc(enc, false, func(b []byte) []byte { return bytes.Trim(b, "\r\n") }) if err != nil { diff --git a/pkg/stanza/fileconsumer/internal/splitter/custom.go b/pkg/stanza/fileconsumer/internal/splitter/custom.go index 930dedf23e05..d4059962bd19 100644 --- a/pkg/stanza/fileconsumer/internal/splitter/custom.go +++ b/pkg/stanza/fileconsumer/internal/splitter/custom.go @@ -6,18 +6,18 @@ package splitter // import "github.com/open-telemetry/opentelemetry-collector-co import ( "bufio" - "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/helper" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/tokenize" ) type customFactory struct { - Flusher helper.FlusherConfig + Flusher tokenize.FlusherConfig Splitter bufio.SplitFunc } var _ Factory = (*customFactory)(nil) func NewCustomFactory( - flusher helper.FlusherConfig, + flusher tokenize.FlusherConfig, splitter bufio.SplitFunc) Factory { return &customFactory{ Flusher: flusher, diff --git a/pkg/stanza/fileconsumer/internal/splitter/custom_test.go b/pkg/stanza/fileconsumer/internal/splitter/custom_test.go index 867e5ad5e109..2d40533f1b50 100644 --- a/pkg/stanza/fileconsumer/internal/splitter/custom_test.go +++ b/pkg/stanza/fileconsumer/internal/splitter/custom_test.go @@ -9,12 +9,12 @@ import ( "github.com/stretchr/testify/assert" - "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/helper" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/tokenize" ) func TestCustomFactory(t *testing.T) { type fields 
struct { - Flusher helper.FlusherConfig + Flusher tokenize.FlusherConfig Splitter bufio.SplitFunc } type args struct { @@ -29,7 +29,7 @@ func TestCustomFactory(t *testing.T) { { name: "default configuration", fields: fields{ - Flusher: helper.NewFlusherConfig(), + Flusher: tokenize.NewFlusherConfig(), Splitter: func(data []byte, atEOF bool) (advance int, token []byte, err error) { return len(data), data, nil }, diff --git a/pkg/stanza/fileconsumer/internal/splitter/multiline.go b/pkg/stanza/fileconsumer/internal/splitter/multiline.go index 152b9b2a65c6..66b1d2c8a5aa 100644 --- a/pkg/stanza/fileconsumer/internal/splitter/multiline.go +++ b/pkg/stanza/fileconsumer/internal/splitter/multiline.go @@ -7,16 +7,16 @@ import ( "bufio" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/decoder" - "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/helper" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/tokenize" ) type multilineFactory struct { - helper.SplitterConfig + tokenize.SplitterConfig } var _ Factory = (*multilineFactory)(nil) -func NewMultilineFactory(splitter helper.SplitterConfig) Factory { +func NewMultilineFactory(splitter tokenize.SplitterConfig) Factory { return &multilineFactory{ SplitterConfig: splitter, } diff --git a/pkg/stanza/fileconsumer/internal/splitter/multiline_test.go b/pkg/stanza/fileconsumer/internal/splitter/multiline_test.go index 9bed2f578614..99a8b8a2a44c 100644 --- a/pkg/stanza/fileconsumer/internal/splitter/multiline_test.go +++ b/pkg/stanza/fileconsumer/internal/splitter/multiline_test.go @@ -8,7 +8,7 @@ import ( "github.com/stretchr/testify/assert" - "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/helper" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/tokenize" ) func TestMultilineBuild(t *testing.T) { @@ -17,13 +17,13 @@ func TestMultilineBuild(t *testing.T) { } tests := []struct { name string - splitterConfig helper.SplitterConfig + splitterConfig tokenize.SplitterConfig args args wantErr bool }{ { name: "default configuration", - splitterConfig: helper.NewSplitterConfig(), + splitterConfig: tokenize.NewSplitterConfig(), args: args{ maxLogSize: 1024, }, @@ -31,10 +31,10 @@ func TestMultilineBuild(t *testing.T) { }, { name: "eoncoding error", - splitterConfig: helper.SplitterConfig{ + splitterConfig: tokenize.SplitterConfig{ Encoding: "error", - Flusher: helper.NewFlusherConfig(), - Multiline: helper.NewMultilineConfig(), + Flusher: tokenize.NewFlusherConfig(), + Multiline: tokenize.NewMultilineConfig(), }, args: args{ maxLogSize: 1024, @@ -43,10 +43,10 @@ func TestMultilineBuild(t *testing.T) { }, { name: "Multiline error", - splitterConfig: helper.SplitterConfig{ + splitterConfig: tokenize.SplitterConfig{ Encoding: "utf-8", - Flusher: helper.NewFlusherConfig(), - Multiline: helper.MultilineConfig{ + Flusher: tokenize.NewFlusherConfig(), + Multiline: tokenize.MultilineConfig{ LineStartPattern: "START", LineEndPattern: "END", }, diff --git a/pkg/stanza/fileconsumer/reader_test.go b/pkg/stanza/fileconsumer/reader_test.go index 66a3087dc1e8..4f2d3be9da94 100644 --- a/pkg/stanza/fileconsumer/reader_test.go +++ b/pkg/stanza/fileconsumer/reader_test.go @@ -16,14 +16,14 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/header" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/splitter" 
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator" - "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/helper" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/parser/regex" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/testutil" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/tokenize" ) func TestPersistFlusher(t *testing.T) { flushPeriod := 100 * time.Millisecond - sCfg := helper.NewSplitterConfig() + sCfg := tokenize.NewSplitterConfig() sCfg.Flusher.Period = flushPeriod f, emitChan := testReaderFactoryWithSplitter(t, sCfg) @@ -174,11 +174,11 @@ func TestTokenizationTooLongWithLineStartPattern(t *testing.T) { f, emitChan := testReaderFactory(t) - mlc := helper.NewMultilineConfig() + mlc := tokenize.NewMultilineConfig() mlc.LineStartPattern = `\d+-\d+-\d+` - f.splitterFactory = splitter.NewMultilineFactory(helper.SplitterConfig{ + f.splitterFactory = splitter.NewMultilineFactory(tokenize.SplitterConfig{ Encoding: "utf-8", - Flusher: helper.NewFlusherConfig(), + Flusher: tokenize.NewFlusherConfig(), Multiline: mlc, }) f.readerConfig.maxLogSize = 15 @@ -234,10 +234,10 @@ func TestHeaderFingerprintIncluded(t *testing.T) { } func testReaderFactory(t *testing.T) (*readerFactory, chan *emitParams) { - return testReaderFactoryWithSplitter(t, helper.NewSplitterConfig()) + return testReaderFactoryWithSplitter(t, tokenize.NewSplitterConfig()) } -func testReaderFactoryWithSplitter(t *testing.T, splitterConfig helper.SplitterConfig) (*readerFactory, chan *emitParams) { +func testReaderFactoryWithSplitter(t *testing.T, splitterConfig tokenize.SplitterConfig) (*readerFactory, chan *emitParams) { emitChan := make(chan *emitParams, 100) enc, err := decoder.LookupEncoding(splitterConfig.Encoding) require.NoError(t, err) diff --git a/pkg/stanza/operator/helper/encoding.go b/pkg/stanza/operator/helper/encoding.go index 9a625fd623f2..2ebd4c7e5f11 100644 --- a/pkg/stanza/operator/helper/encoding.go +++ b/pkg/stanza/operator/helper/encoding.go @@ -20,7 +20,6 @@ type EncodingConfig struct { // Deprecated: [v0.84.0] Use decoder.Decoder instead type Decoder = decoder.Decoder -// Deprecated: [v0.84.0] Use decoder.New instead var NewDecoder = decoder.New // Deprecated: [v0.84.0] Use decoder.LookupEncoding instead diff --git a/pkg/stanza/operator/helper/flusher.go b/pkg/stanza/operator/helper/flusher.go index 60efd4868f11..166ae0485c79 100644 --- a/pkg/stanza/operator/helper/flusher.go +++ b/pkg/stanza/operator/helper/flusher.go @@ -3,98 +3,13 @@ package helper // import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/helper" -import ( - "bufio" - "time" -) +import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/tokenize" -// FlusherConfig is a configuration of Flusher helper -type FlusherConfig struct { - Period time.Duration `mapstructure:"force_flush_period"` -} +// Deprecated: [v0.84.0] Use tokenize.FlusherConfig instead +type FlusherConfig = tokenize.FlusherConfig -// NewFlusherConfig creates a default Flusher config -func NewFlusherConfig() FlusherConfig { - return FlusherConfig{ - // Empty or `0s` means that we will never force flush - Period: time.Millisecond * 500, - } -} +// Deprecated: [v0.84.0] Use tokenize.NewFlusherConfig instead +var NewFlusherConfig = tokenize.NewFlusherConfig -// Build creates Flusher from configuration -func (c *FlusherConfig) Build() *Flusher { - return &Flusher{ - lastDataChange: time.Now(), - 
forcePeriod: c.Period, - previousDataLength: 0, - } -} - -// Flusher keeps information about flush state -type Flusher struct { - // forcePeriod defines time from last flush which should pass before setting force to true. - // Never forces if forcePeriod is set to 0 - forcePeriod time.Duration - - // lastDataChange tracks date of last data change (including new data and flushes) - lastDataChange time.Time - - // previousDataLength: - // if previousDataLength = 0 - no new data have been received after flush - // if previousDataLength > 0 - there is data which has not been flushed yet and it doesn't changed since lastDataChange - previousDataLength int -} - -func (f *Flusher) UpdateDataChangeTime(length int) { - // Skip if length is greater than 0 and didn't changed - if length > 0 && length == f.previousDataLength { - return - } - - // update internal properties with new values if data length changed - // because it means that data is flowing and being processed - f.previousDataLength = length - f.lastDataChange = time.Now() -} - -// Flushed reset data length -func (f *Flusher) Flushed() { - f.UpdateDataChangeTime(0) -} - -// ShouldFlush returns true if data should be forcefully flushed -func (f *Flusher) ShouldFlush() bool { - // Returns true if there is f.forcePeriod after f.lastDataChange and data length is greater than 0 - return f.forcePeriod > 0 && time.Since(f.lastDataChange) > f.forcePeriod && f.previousDataLength > 0 -} - -func (f *Flusher) SplitFunc(splitFunc bufio.SplitFunc) bufio.SplitFunc { - return func(data []byte, atEOF bool) (advance int, token []byte, err error) { - advance, token, err = splitFunc(data, atEOF) - - // Return as it is in case of error - if err != nil { - return - } - - // Return token - if token != nil { - // Inform flusher that we just flushed - f.Flushed() - return - } - - // If there is no token, force flush eventually - if f.ShouldFlush() { - // Inform flusher that we just flushed - f.Flushed() - token = trimWhitespacesFunc(data) - advance = len(data) - return - } - - // Inform flusher that we didn't flushed - f.UpdateDataChangeTime(len(data)) - return - } -} +// Deprecated: [v0.84.0] Use tokenize.Flusher instead +type Flusher = tokenize.Flusher diff --git a/pkg/stanza/operator/helper/multiline.go b/pkg/stanza/operator/helper/multiline.go index 53fadde0149b..7a237bae9f89 100644 --- a/pkg/stanza/operator/helper/multiline.go +++ b/pkg/stanza/operator/helper/multiline.go @@ -3,254 +3,22 @@ package helper // import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/helper" -import ( - "bufio" - "bytes" - "fmt" - "regexp" +import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/tokenize" - "golang.org/x/text/encoding" -) +// Deprecated: [v0.84.0] Use tokenize.Multiline instead +type Multiline = tokenize.Multiline -// Multiline consists of splitFunc and variables needed to perform force flush -type Multiline struct { - SplitFunc bufio.SplitFunc - Force *Flusher -} +// Deprecated: [v0.84.0] Use tokenize.NewMultilineConfig instead +var NewMultilineConfig = tokenize.NewMultilineConfig -// NewMultilineConfig creates a new Multiline config -func NewMultilineConfig() MultilineConfig { - return MultilineConfig{ - LineStartPattern: "", - LineEndPattern: "", - } -} +// Deprecated: [v0.84.0] Use tokenize.MultilineConfig instead +type MultilineConfig = tokenize.MultilineConfig -// MultilineConfig is the configuration of a multiline helper -type MultilineConfig struct { - LineStartPattern string
`mapstructure:"line_start_pattern"` - LineEndPattern string `mapstructure:"line_end_pattern"` -} +// Deprecated: [v0.84.0] Use tokenize.NewLineStartSplitFunc instead +var NewLineStartSplitFunc = tokenize.NewLineStartSplitFunc -// Build will build a Multiline operator. -func (c MultilineConfig) Build(enc encoding.Encoding, flushAtEOF, preserveLeadingWhitespaces, preserveTrailingWhitespaces bool, force *Flusher, maxLogSize int) (bufio.SplitFunc, error) { - return c.getSplitFunc(enc, flushAtEOF, force, maxLogSize, preserveLeadingWhitespaces, preserveTrailingWhitespaces) -} +// Deprecated: [v0.84.0] Use tokenize.NewLineEndSplitFunc instead +var NewLineEndSplitFunc = tokenize.NewLineEndSplitFunc -// getSplitFunc returns split function for bufio.Scanner basing on configured pattern -func (c MultilineConfig) getSplitFunc(enc encoding.Encoding, flushAtEOF bool, force *Flusher, maxLogSize int, preserveLeadingWhitespaces, preserveTrailingWhitespaces bool) (bufio.SplitFunc, error) { - endPattern := c.LineEndPattern - startPattern := c.LineStartPattern - - var ( - splitFunc bufio.SplitFunc - err error - ) - - switch { - case endPattern != "" && startPattern != "": - return nil, fmt.Errorf("only one of line_start_pattern or line_end_pattern can be set") - case enc == encoding.Nop && (endPattern != "" || startPattern != ""): - return nil, fmt.Errorf("line_start_pattern or line_end_pattern should not be set when using nop encoding") - case enc == encoding.Nop: - return SplitNone(maxLogSize), nil - case endPattern == "" && startPattern == "": - splitFunc, err = NewNewlineSplitFunc(enc, flushAtEOF, getTrimFunc(preserveLeadingWhitespaces, preserveTrailingWhitespaces)) - if err != nil { - return nil, err - } - case endPattern != "": - re, err := regexp.Compile("(?m)" + c.LineEndPattern) - if err != nil { - return nil, fmt.Errorf("compile line end regex: %w", err) - } - splitFunc = NewLineEndSplitFunc(re, flushAtEOF, getTrimFunc(preserveLeadingWhitespaces, preserveTrailingWhitespaces)) - case startPattern != "": - re, err := regexp.Compile("(?m)" + c.LineStartPattern) - if err != nil { - return nil, fmt.Errorf("compile line start regex: %w", err) - } - splitFunc = NewLineStartSplitFunc(re, flushAtEOF, getTrimFunc(preserveLeadingWhitespaces, preserveTrailingWhitespaces)) - default: - return nil, fmt.Errorf("unreachable") - } - - if force != nil { - return force.SplitFunc(splitFunc), nil - } - - return splitFunc, nil -} - -// NewLineStartSplitFunc creates a bufio.SplitFunc that splits an incoming stream into -// tokens that start with a match to the regex pattern provided -func NewLineStartSplitFunc(re *regexp.Regexp, flushAtEOF bool, trimFunc trimFunc) bufio.SplitFunc { - return func(data []byte, atEOF bool) (advance int, token []byte, err error) { - firstLoc := re.FindIndex(data) - if firstLoc == nil { - // Flush if no more data is expected - if len(data) != 0 && atEOF && flushAtEOF { - token = trimFunc(data) - advance = len(data) - return - } - return 0, nil, nil // read more data and try again. 
- } - firstMatchStart := firstLoc[0] - firstMatchEnd := firstLoc[1] - - if firstMatchStart != 0 { - // the beginning of the file does not match the start pattern, so return a token up to the first match so we don't lose data - advance = firstMatchStart - token = trimFunc(data[0:firstMatchStart]) - - // return if non-matching pattern is not only whitespaces - if token != nil { - return - } - } - - if firstMatchEnd == len(data) { - // the first match goes to the end of the bufer, so don't look for a second match - return 0, nil, nil - } - - // Flush if no more data is expected - if atEOF && flushAtEOF { - token = trimFunc(data) - advance = len(data) - return - } - - secondLocOfset := firstMatchEnd + 1 - secondLoc := re.FindIndex(data[secondLocOfset:]) - if secondLoc == nil { - return 0, nil, nil // read more data and try again - } - secondMatchStart := secondLoc[0] + secondLocOfset - - advance = secondMatchStart // start scanning at the beginning of the second match - token = trimFunc(data[firstMatchStart:secondMatchStart]) // the token begins at the first match, and ends at the beginning of the second match - err = nil - return - } -} - -// NewLineEndSplitFunc creates a bufio.SplitFunc that splits an incoming stream into -// tokens that end with a match to the regex pattern provided -func NewLineEndSplitFunc(re *regexp.Regexp, flushAtEOF bool, trimFunc trimFunc) bufio.SplitFunc { - return func(data []byte, atEOF bool) (advance int, token []byte, err error) { - loc := re.FindIndex(data) - if loc == nil { - // Flush if no more data is expected - if len(data) != 0 && atEOF && flushAtEOF { - token = trimFunc(data) - advance = len(data) - return - } - return 0, nil, nil // read more data and try again - } - - // If the match goes up to the end of the current bufer, do another - // read until we can capture the entire match - if loc[1] == len(data)-1 && !atEOF { - return 0, nil, nil - } - - advance = loc[1] - token = trimFunc(data[:loc[1]]) - err = nil - return - } -} - -// NewNewlineSplitFunc splits log lines by newline, just as bufio.ScanLines, but -// never returning an token using EOF as a terminator -func NewNewlineSplitFunc(enc encoding.Encoding, flushAtEOF bool, trimFunc trimFunc) (bufio.SplitFunc, error) { - newline, err := encodedNewline(enc) - if err != nil { - return nil, err - } - - carriageReturn, err := encodedCarriageReturn(enc) - if err != nil { - return nil, err - } - - return func(data []byte, atEOF bool) (advance int, token []byte, err error) { - if atEOF && len(data) == 0 { - return 0, nil, nil - } - - if i := bytes.Index(data, newline); i >= 0 { - // We have a full newline-terminated line. - token = bytes.TrimSuffix(data[:i], carriageReturn) - - return i + len(newline), trimFunc(token), nil - } - - // Flush if no more data is expected - if atEOF && flushAtEOF { - token = trimFunc(data) - advance = len(data) - return - } - - // Request more data. 
- return 0, nil, nil - }, nil -} - -func encodedNewline(enc encoding.Encoding) ([]byte, error) { - out := make([]byte, 10) - nDst, _, err := enc.NewEncoder().Transform(out, []byte{'\n'}, true) - return out[:nDst], err -} - -func encodedCarriageReturn(enc encoding.Encoding) ([]byte, error) { - out := make([]byte, 10) - nDst, _, err := enc.NewEncoder().Transform(out, []byte{'\r'}, true) - return out[:nDst], err -} - -type trimFunc func([]byte) []byte - -func noTrim(token []byte) []byte { - return token -} - -func trimLeadingWhitespacesFunc(data []byte) []byte { - // TrimLeft to strip EOF whitespaces in case of using $ in regex - // For some reason newline and carriage return are being moved to beginning of next log - token := bytes.TrimLeft(data, "\r\n\t ") - if token == nil { - return []byte{} - } - return token -} - -func trimTrailingWhitespacesFunc(data []byte) []byte { - // TrimRight to strip all whitespaces from the end of log - token := bytes.TrimRight(data, "\r\n\t ") - if token == nil { - return []byte{} - } - return token -} - -func trimWhitespacesFunc(data []byte) []byte { - return trimLeadingWhitespacesFunc(trimTrailingWhitespacesFunc(data)) -} - -func getTrimFunc(preserveLeadingWhitespaces, preserveTrailingWhitespaces bool) trimFunc { - if preserveLeadingWhitespaces && preserveTrailingWhitespaces { - return noTrim - } - if preserveLeadingWhitespaces { - return trimTrailingWhitespacesFunc - } - if preserveTrailingWhitespaces { - return trimLeadingWhitespacesFunc - } - return trimWhitespacesFunc -} +// Deprecated: [v0.84.0] Use tokenize.NewNewlineSplitFunc instead +var NewNewlineSplitFunc = tokenize.NewNewlineSplitFunc diff --git a/pkg/stanza/operator/helper/splitter.go b/pkg/stanza/operator/helper/splitter.go index 65074c50355b..d584aee934c2 100644 --- a/pkg/stanza/operator/helper/splitter.go +++ b/pkg/stanza/operator/helper/splitter.go @@ -3,67 +3,16 @@ package helper // import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/helper" -import "bufio" +import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/tokenize" -// SplitterConfig consolidates MultilineConfig and FlusherConfig -type SplitterConfig struct { - Encoding string `mapstructure:"encoding,omitempty"` - Flusher FlusherConfig `mapstructure:",squash,omitempty"` - Multiline MultilineConfig `mapstructure:"multiline,omitempty"` - PreserveLeadingWhitespaces bool `mapstructure:"preserve_leading_whitespaces,omitempty"` - PreserveTrailingWhitespaces bool `mapstructure:"preserve_trailing_whitespaces,omitempty"` -} +// Deprecated: [v0.84.0] Use tokenize.SplitterConfig instead +type SplitterConfig = tokenize.SplitterConfig -// NewSplitterConfig returns default SplitterConfig -func NewSplitterConfig() SplitterConfig { - return SplitterConfig{ - Encoding: "utf-8", - Multiline: NewMultilineConfig(), - Flusher: NewFlusherConfig(), - } -} +// Deprecated: [v0.84.0] Use tokenize.NewSplitterConfig instead +var NewSplitterConfig = tokenize.NewSplitterConfig -// Build builds Splitter struct -func (c *SplitterConfig) Build(flushAtEOF bool, maxLogSize int) (*Splitter, error) { - enc, err := LookupEncoding(c.Encoding) - if err != nil { - return nil, err - } +// Deprecated: [v0.84.0] Use tokenize.Splitter instead +type Splitter = tokenize.Splitter - flusher := c.Flusher.Build() - splitFunc, err := c.Multiline.Build(enc, flushAtEOF, c.PreserveLeadingWhitespaces, c.PreserveTrailingWhitespaces, flusher, maxLogSize) - if err != nil { - return nil, err - } - - return &Splitter{ - Decoder: 
NewDecoder(enc), - Flusher: flusher, - SplitFunc: splitFunc, - }, nil -} - -// Splitter consolidates Flusher and dependent splitFunc -type Splitter struct { - Decoder *Decoder - SplitFunc bufio.SplitFunc - Flusher *Flusher -} - -// SplitNone doesn't split any of the bytes, it reads in all of the bytes and returns it all at once. This is for when the encoding is nop -func SplitNone(maxLogSize int) bufio.SplitFunc { - return func(data []byte, atEOF bool) (advance int, token []byte, err error) { - if len(data) >= maxLogSize { - return maxLogSize, data[:maxLogSize], nil - } - - if !atEOF { - return 0, nil, nil - } - - if len(data) == 0 { - return 0, nil, nil - } - return len(data), data, nil - } -} +// Deprecated: [v0.84.0] Use tokenize.SplitNone instead +var SplitNone = tokenize.SplitNone diff --git a/pkg/stanza/operator/input/file/config_test.go b/pkg/stanza/operator/input/file/config_test.go index a8b7dbc66bbf..5b45a021230a 100644 --- a/pkg/stanza/operator/input/file/config_test.go +++ b/pkg/stanza/operator/input/file/config_test.go @@ -14,6 +14,7 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/helper" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/operatortest" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/testutil" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/tokenize" ) func TestUnmarshal(t *testing.T) { @@ -314,7 +315,7 @@ func TestUnmarshal(t *testing.T) { ExpectErr: false, Expect: func() *Config { cfg := NewConfig() - newSplit := helper.NewSplitterConfig() + newSplit := tokenize.NewSplitterConfig() newSplit.Multiline.LineStartPattern = "Start" cfg.Splitter = newSplit return cfg @@ -325,7 +326,7 @@ func TestUnmarshal(t *testing.T) { ExpectErr: false, Expect: func() *Config { cfg := NewConfig() - newSplit := helper.NewSplitterConfig() + newSplit := tokenize.NewSplitterConfig() newSplit.Multiline.LineStartPattern = "%" cfg.Splitter = newSplit return cfg @@ -336,7 +337,7 @@ func TestUnmarshal(t *testing.T) { ExpectErr: false, Expect: func() *Config { cfg := NewConfig() - newSplit := helper.NewSplitterConfig() + newSplit := tokenize.NewSplitterConfig() newSplit.Multiline.LineEndPattern = "Start" cfg.Splitter = newSplit return cfg @@ -347,7 +348,7 @@ func TestUnmarshal(t *testing.T) { ExpectErr: false, Expect: func() *Config { cfg := NewConfig() - newSplit := helper.NewSplitterConfig() + newSplit := tokenize.NewSplitterConfig() newSplit.Multiline.LineEndPattern = "%" cfg.Splitter = newSplit return cfg @@ -475,8 +476,8 @@ func TestBuild(t *testing.T) { { "MultilineConfiguredStartAndEndPatterns", func(f *Config) { - f.Splitter = helper.NewSplitterConfig() - f.Splitter.Multiline = helper.MultilineConfig{ + f.Splitter = tokenize.NewSplitterConfig() + f.Splitter.Multiline = tokenize.MultilineConfig{ LineEndPattern: "Exists", LineStartPattern: "Exists", } @@ -487,8 +488,8 @@ func TestBuild(t *testing.T) { { "MultilineConfiguredStartPattern", func(f *Config) { - f.Splitter = helper.NewSplitterConfig() - f.Splitter.Multiline = helper.MultilineConfig{ + f.Splitter = tokenize.NewSplitterConfig() + f.Splitter.Multiline = tokenize.MultilineConfig{ LineStartPattern: "START.*", } }, @@ -498,8 +499,8 @@ func TestBuild(t *testing.T) { { "MultilineConfiguredEndPattern", func(f *Config) { - f.Splitter = helper.NewSplitterConfig() - f.Splitter.Multiline = helper.MultilineConfig{ + f.Splitter = tokenize.NewSplitterConfig() + f.Splitter.Multiline = tokenize.MultilineConfig{ 
LineEndPattern: "END.*", } }, @@ -517,8 +518,8 @@ func TestBuild(t *testing.T) { { "LineStartAndEnd", func(f *Config) { - f.Splitter = helper.NewSplitterConfig() - f.Splitter.Multiline = helper.MultilineConfig{ + f.Splitter = tokenize.NewSplitterConfig() + f.Splitter.Multiline = tokenize.MultilineConfig{ LineStartPattern: ".*", LineEndPattern: ".*", } @@ -529,8 +530,8 @@ func TestBuild(t *testing.T) { { "NoLineStartOrEnd", func(f *Config) { - f.Splitter = helper.NewSplitterConfig() - f.Splitter.Multiline = helper.MultilineConfig{} + f.Splitter = tokenize.NewSplitterConfig() + f.Splitter.Multiline = tokenize.MultilineConfig{} }, require.NoError, func(t *testing.T, f *Input) {}, @@ -538,8 +539,8 @@ func TestBuild(t *testing.T) { { "InvalidLineStartRegex", func(f *Config) { - f.Splitter = helper.NewSplitterConfig() - f.Splitter.Multiline = helper.MultilineConfig{ + f.Splitter = tokenize.NewSplitterConfig() + f.Splitter.Multiline = tokenize.MultilineConfig{ LineStartPattern: "(", } }, @@ -549,8 +550,8 @@ func TestBuild(t *testing.T) { { "InvalidLineEndRegex", func(f *Config) { - f.Splitter = helper.NewSplitterConfig() - f.Splitter.Multiline = helper.MultilineConfig{ + f.Splitter = tokenize.NewSplitterConfig() + f.Splitter.Multiline = tokenize.MultilineConfig{ LineEndPattern: "(", } }, diff --git a/pkg/stanza/operator/input/syslog/config_test.go b/pkg/stanza/operator/input/syslog/config_test.go index acea9a7c153b..547c9a5cd56a 100644 --- a/pkg/stanza/operator/input/syslog/config_test.go +++ b/pkg/stanza/operator/input/syslog/config_test.go @@ -9,10 +9,10 @@ import ( "go.opentelemetry.io/collector/config/configtls" - "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/helper" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/input/tcp" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/input/udp" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/operatortest" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/tokenize" ) func TestUnmarshal(t *testing.T) { @@ -38,7 +38,7 @@ func TestUnmarshal(t *testing.T) { cfg.TCP.ListenAddress = "10.0.0.1:9000" cfg.TCP.AddAttributes = true cfg.TCP.Encoding = "utf-16" - cfg.TCP.Multiline = helper.NewMultilineConfig() + cfg.TCP.Multiline = tokenize.NewMultilineConfig() cfg.TCP.Multiline.LineStartPattern = "ABC" cfg.TCP.TLS = &configtls.TLSServerSetting{ TLSSetting: configtls.TLSSetting{ @@ -62,7 +62,7 @@ func TestUnmarshal(t *testing.T) { cfg.UDP.ListenAddress = "10.0.0.1:9000" cfg.UDP.AddAttributes = true cfg.UDP.Encoding = "utf-16" - cfg.UDP.Multiline = helper.NewMultilineConfig() + cfg.UDP.Multiline = tokenize.NewMultilineConfig() cfg.UDP.Multiline.LineStartPattern = "ABC" return cfg }(), diff --git a/pkg/stanza/operator/input/syslog/syslog_test.go b/pkg/stanza/operator/input/syslog/syslog_test.go index d65d81a86457..4decaf3f5e12 100644 --- a/pkg/stanza/operator/input/syslog/syslog_test.go +++ b/pkg/stanza/operator/input/syslog/syslog_test.go @@ -254,3 +254,5 @@ func TestOctetFramingSplitFunc(t *testing.T) { t.Run(tc.Name, tc.RunFunc(splitFunc)) } } + +// TODO refactor test dependency away from internal? 
diff --git a/pkg/stanza/operator/input/tcp/config_test.go b/pkg/stanza/operator/input/tcp/config_test.go index 228daa1e02d2..8a37688ab62f 100644 --- a/pkg/stanza/operator/input/tcp/config_test.go +++ b/pkg/stanza/operator/input/tcp/config_test.go @@ -9,8 +9,8 @@ import ( "go.opentelemetry.io/collector/config/configtls" - "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/helper" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/operatortest" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/tokenize" ) func TestUnmarshal(t *testing.T) { @@ -32,7 +32,7 @@ func TestUnmarshal(t *testing.T) { cfg.ListenAddress = "10.0.0.1:9000" cfg.AddAttributes = true cfg.Encoding = "utf-8" - cfg.Multiline = helper.NewMultilineConfig() + cfg.Multiline = tokenize.NewMultilineConfig() cfg.Multiline.LineStartPattern = "ABC" cfg.TLS = &configtls.TLSServerSetting{ TLSSetting: configtls.TLSSetting{ diff --git a/pkg/stanza/operator/input/tcp/tcp.go b/pkg/stanza/operator/input/tcp/tcp.go index a4ac4eb53d1a..caf6da6baf0d 100644 --- a/pkg/stanza/operator/input/tcp/tcp.go +++ b/pkg/stanza/operator/input/tcp/tcp.go @@ -24,6 +24,7 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/decoder" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/helper" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/tokenize" ) const ( @@ -53,7 +54,7 @@ func NewConfigWithID(operatorID string) *Config { InputConfig: helper.NewInputConfig(operatorID, operatorType), BaseConfig: BaseConfig{ OneLogPerPacket: false, - Multiline: helper.NewMultilineConfig(), + Multiline: tokenize.NewMultilineConfig(), Encoding: "utf-8", }, } @@ -73,7 +74,7 @@ type BaseConfig struct { AddAttributes bool `mapstructure:"add_attributes,omitempty"` OneLogPerPacket bool `mapstructure:"one_log_per_packet,omitempty"` Encoding string `mapstructure:"encoding,omitempty"` - Multiline helper.MultilineConfig `mapstructure:"multiline,omitempty"` + Multiline tokenize.MultilineConfig `mapstructure:"multiline,omitempty"` PreserveLeadingWhitespaces bool `mapstructure:"preserve_leading_whitespaces,omitempty"` PreserveTrailingWhitespaces bool `mapstructure:"preserve_trailing_whitespaces,omitempty"` MultiLineBuilder MultiLineBuilderFunc diff --git a/pkg/stanza/operator/input/udp/config_test.go b/pkg/stanza/operator/input/udp/config_test.go index 77f6396403c4..6671d5db34ce 100644 --- a/pkg/stanza/operator/input/udp/config_test.go +++ b/pkg/stanza/operator/input/udp/config_test.go @@ -7,8 +7,8 @@ import ( "path/filepath" "testing" - "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/helper" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/operatortest" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/tokenize" ) func TestUnmarshal(t *testing.T) { @@ -29,7 +29,7 @@ func TestUnmarshal(t *testing.T) { cfg.ListenAddress = "10.0.0.1:9000" cfg.AddAttributes = true cfg.Encoding = "utf-8" - cfg.Multiline = helper.NewMultilineConfig() + cfg.Multiline = tokenize.NewMultilineConfig() cfg.Multiline.LineStartPattern = "ABC" return cfg }(), diff --git a/pkg/stanza/operator/input/udp/udp.go b/pkg/stanza/operator/input/udp/udp.go index 3c8333956546..e92142dc54dd 100644 --- a/pkg/stanza/operator/input/udp/udp.go +++ b/pkg/stanza/operator/input/udp/udp.go @@ -18,6 +18,7 @@ import ( 
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/decoder" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/helper" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/tokenize" ) const ( @@ -43,7 +44,7 @@ func NewConfigWithID(operatorID string) *Config { BaseConfig: BaseConfig{ Encoding: "utf-8", OneLogPerPacket: false, - Multiline: helper.MultilineConfig{ + Multiline: tokenize.MultilineConfig{ LineStartPattern: "", LineEndPattern: ".^", // Use never matching regex to not split data by default }, @@ -59,13 +60,13 @@ type Config struct { // BaseConfig is the details configuration of a udp input operator. type BaseConfig struct { - ListenAddress string `mapstructure:"listen_address,omitempty"` - OneLogPerPacket bool `mapstructure:"one_log_per_packet,omitempty"` - AddAttributes bool `mapstructure:"add_attributes,omitempty"` - Encoding string `mapstructure:"encoding,omitempty"` - Multiline helper.MultilineConfig `mapstructure:"multiline,omitempty"` - PreserveLeadingWhitespaces bool `mapstructure:"preserve_leading_whitespaces,omitempty"` - PreserveTrailingWhitespaces bool `mapstructure:"preserve_trailing_whitespaces,omitempty"` + ListenAddress string `mapstructure:"listen_address,omitempty"` + OneLogPerPacket bool `mapstructure:"one_log_per_packet,omitempty"` + AddAttributes bool `mapstructure:"add_attributes,omitempty"` + Encoding string `mapstructure:"encoding,omitempty"` + Multiline tokenize.MultilineConfig `mapstructure:"multiline,omitempty"` + PreserveLeadingWhitespaces bool `mapstructure:"preserve_leading_whitespaces,omitempty"` + PreserveTrailingWhitespaces bool `mapstructure:"preserve_trailing_whitespaces,omitempty"` } // Build will build a udp input operator. diff --git a/pkg/stanza/tokenize/flusher.go b/pkg/stanza/tokenize/flusher.go new file mode 100644 index 000000000000..922606179e5a --- /dev/null +++ b/pkg/stanza/tokenize/flusher.go @@ -0,0 +1,100 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package tokenize // import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/tokenize" + +import ( + "bufio" + "time" +) + +// FlusherConfig is a configuration of Flusher helper +type FlusherConfig struct { + Period time.Duration `mapstructure:"force_flush_period"` +} + +// NewFlusherConfig creates a default Flusher config +func NewFlusherConfig() FlusherConfig { + return FlusherConfig{ + // Empty or `0s` means that we will never force flush + Period: time.Millisecond * 500, + } +} + +// Build creates Flusher from configuration +func (c *FlusherConfig) Build() *Flusher { + return &Flusher{ + lastDataChange: time.Now(), + forcePeriod: c.Period, + previousDataLength: 0, + } +} + +// Flusher keeps information about flush state +type Flusher struct { + // forcePeriod defines time from last flush which should pass before setting force to true. 
+ // Never forces if forcePeriod is set to 0 + forcePeriod time.Duration + + // lastDataChange tracks date of last data change (including new data and flushes) + lastDataChange time.Time + + // previousDataLength: + // if previousDataLength = 0 - no new data has been received after flush + // if previousDataLength > 0 - there is data which has not been flushed yet and it hasn't changed since lastDataChange + previousDataLength int +} + +func (f *Flusher) UpdateDataChangeTime(length int) { + // Skip if length is greater than 0 and didn't change + if length > 0 && length == f.previousDataLength { + return + } + + // update internal properties with new values if data length changed + // because it means that data is flowing and being processed + f.previousDataLength = length + f.lastDataChange = time.Now() +} + +// Flushed resets data length +func (f *Flusher) Flushed() { + f.UpdateDataChangeTime(0) +} + +// ShouldFlush returns true if data should be forcefully flushed +func (f *Flusher) ShouldFlush() bool { + // Returns true if f.forcePeriod has elapsed since f.lastDataChange and data length is greater than 0 + return f.forcePeriod > 0 && time.Since(f.lastDataChange) > f.forcePeriod && f.previousDataLength > 0 +} + +func (f *Flusher) SplitFunc(splitFunc bufio.SplitFunc) bufio.SplitFunc { + return func(data []byte, atEOF bool) (advance int, token []byte, err error) { + advance, token, err = splitFunc(data, atEOF) + + // Return as it is in case of error + if err != nil { + return + } + + // Return token + if token != nil { + // Inform flusher that we just flushed + f.Flushed() + return + } + + // If there is no token, force flush eventually + if f.ShouldFlush() { + // Inform flusher that we just flushed + f.Flushed() + token = trimWhitespacesFunc(data) + advance = len(data) + return + } + + // Inform flusher that we didn't flush + f.UpdateDataChangeTime(len(data)) + return + } +} diff --git a/pkg/stanza/tokenize/multiline.go b/pkg/stanza/tokenize/multiline.go new file mode 100644 index 000000000000..99682334e208 --- /dev/null +++ b/pkg/stanza/tokenize/multiline.go @@ -0,0 +1,256 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package tokenize // import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/tokenize" + +import ( + "bufio" + "bytes" + "fmt" + "regexp" + + "golang.org/x/text/encoding" +) + +// Multiline consists of splitFunc and variables needed to perform force flush +type Multiline struct { + SplitFunc bufio.SplitFunc + Force *Flusher +} + +// NewMultilineConfig creates a new Multiline config +func NewMultilineConfig() MultilineConfig { + return MultilineConfig{ + LineStartPattern: "", + LineEndPattern: "", + } +} + +// MultilineConfig is the configuration of a multiline helper +type MultilineConfig struct { + LineStartPattern string `mapstructure:"line_start_pattern"` + LineEndPattern string `mapstructure:"line_end_pattern"` +} + +// Build will build a Multiline operator.
+func (c MultilineConfig) Build(enc encoding.Encoding, flushAtEOF, preserveLeadingWhitespaces, preserveTrailingWhitespaces bool, force *Flusher, maxLogSize int) (bufio.SplitFunc, error) { + return c.getSplitFunc(enc, flushAtEOF, force, maxLogSize, preserveLeadingWhitespaces, preserveTrailingWhitespaces) +} + +// getSplitFunc returns split function for bufio.Scanner basing on configured pattern +func (c MultilineConfig) getSplitFunc(enc encoding.Encoding, flushAtEOF bool, force *Flusher, maxLogSize int, preserveLeadingWhitespaces, preserveTrailingWhitespaces bool) (bufio.SplitFunc, error) { + endPattern := c.LineEndPattern + startPattern := c.LineStartPattern + + var ( + splitFunc bufio.SplitFunc + err error + ) + + switch { + case endPattern != "" && startPattern != "": + return nil, fmt.Errorf("only one of line_start_pattern or line_end_pattern can be set") + case enc == encoding.Nop && (endPattern != "" || startPattern != ""): + return nil, fmt.Errorf("line_start_pattern or line_end_pattern should not be set when using nop encoding") + case enc == encoding.Nop: + return SplitNone(maxLogSize), nil + case endPattern == "" && startPattern == "": + splitFunc, err = NewNewlineSplitFunc(enc, flushAtEOF, getTrimFunc(preserveLeadingWhitespaces, preserveTrailingWhitespaces)) + if err != nil { + return nil, err + } + case endPattern != "": + re, err := regexp.Compile("(?m)" + c.LineEndPattern) + if err != nil { + return nil, fmt.Errorf("compile line end regex: %w", err) + } + splitFunc = NewLineEndSplitFunc(re, flushAtEOF, getTrimFunc(preserveLeadingWhitespaces, preserveTrailingWhitespaces)) + case startPattern != "": + re, err := regexp.Compile("(?m)" + c.LineStartPattern) + if err != nil { + return nil, fmt.Errorf("compile line start regex: %w", err) + } + splitFunc = NewLineStartSplitFunc(re, flushAtEOF, getTrimFunc(preserveLeadingWhitespaces, preserveTrailingWhitespaces)) + default: + return nil, fmt.Errorf("unreachable") + } + + if force != nil { + return force.SplitFunc(splitFunc), nil + } + + return splitFunc, nil +} + +// NewLineStartSplitFunc creates a bufio.SplitFunc that splits an incoming stream into +// tokens that start with a match to the regex pattern provided +func NewLineStartSplitFunc(re *regexp.Regexp, flushAtEOF bool, trimFunc trimFunc) bufio.SplitFunc { + return func(data []byte, atEOF bool) (advance int, token []byte, err error) { + firstLoc := re.FindIndex(data) + if firstLoc == nil { + // Flush if no more data is expected + if len(data) != 0 && atEOF && flushAtEOF { + token = trimFunc(data) + advance = len(data) + return + } + return 0, nil, nil // read more data and try again. 
+ } + firstMatchStart := firstLoc[0] + firstMatchEnd := firstLoc[1] + + if firstMatchStart != 0 { + // the beginning of the file does not match the start pattern, so return a token up to the first match so we don't lose data + advance = firstMatchStart + token = trimFunc(data[0:firstMatchStart]) + + // return if non-matching pattern is not only whitespaces + if token != nil { + return + } + } + + if firstMatchEnd == len(data) { + // the first match goes to the end of the buffer, so don't look for a second match + return 0, nil, nil + } + + // Flush if no more data is expected + if atEOF && flushAtEOF { + token = trimFunc(data) + advance = len(data) + return + } + + secondLocOfset := firstMatchEnd + 1 + secondLoc := re.FindIndex(data[secondLocOfset:]) + if secondLoc == nil { + return 0, nil, nil // read more data and try again + } + secondMatchStart := secondLoc[0] + secondLocOfset + + advance = secondMatchStart // start scanning at the beginning of the second match + token = trimFunc(data[firstMatchStart:secondMatchStart]) // the token begins at the first match, and ends at the beginning of the second match + err = nil + return + } +} + +// NewLineEndSplitFunc creates a bufio.SplitFunc that splits an incoming stream into +// tokens that end with a match to the regex pattern provided +func NewLineEndSplitFunc(re *regexp.Regexp, flushAtEOF bool, trimFunc trimFunc) bufio.SplitFunc { + return func(data []byte, atEOF bool) (advance int, token []byte, err error) { + loc := re.FindIndex(data) + if loc == nil { + // Flush if no more data is expected + if len(data) != 0 && atEOF && flushAtEOF { + token = trimFunc(data) + advance = len(data) + return + } + return 0, nil, nil // read more data and try again + } + + // If the match goes up to the end of the current buffer, do another + // read until we can capture the entire match + if loc[1] == len(data)-1 && !atEOF { + return 0, nil, nil + } + + advance = loc[1] + token = trimFunc(data[:loc[1]]) + err = nil + return + } +} + +// NewNewlineSplitFunc splits log lines by newline, just as bufio.ScanLines, but +// never returning a token using EOF as a terminator +func NewNewlineSplitFunc(enc encoding.Encoding, flushAtEOF bool, trimFunc trimFunc) (bufio.SplitFunc, error) { + newline, err := encodedNewline(enc) + if err != nil { + return nil, err + } + + carriageReturn, err := encodedCarriageReturn(enc) + if err != nil { + return nil, err + } + + return func(data []byte, atEOF bool) (advance int, token []byte, err error) { + if atEOF && len(data) == 0 { + return 0, nil, nil + } + + if i := bytes.Index(data, newline); i >= 0 { + // We have a full newline-terminated line. + token = bytes.TrimSuffix(data[:i], carriageReturn) + + return i + len(newline), trimFunc(token), nil + } + + // Flush if no more data is expected + if atEOF && flushAtEOF { + token = trimFunc(data) + advance = len(data) + return + } + + // Request more data.
+ return 0, nil, nil + }, nil +} + +func encodedNewline(enc encoding.Encoding) ([]byte, error) { + out := make([]byte, 10) + nDst, _, err := enc.NewEncoder().Transform(out, []byte{'\n'}, true) + return out[:nDst], err +} + +func encodedCarriageReturn(enc encoding.Encoding) ([]byte, error) { + out := make([]byte, 10) + nDst, _, err := enc.NewEncoder().Transform(out, []byte{'\r'}, true) + return out[:nDst], err +} + +type trimFunc func([]byte) []byte + +func noTrim(token []byte) []byte { + return token +} + +func trimLeadingWhitespacesFunc(data []byte) []byte { + // TrimLeft to strip EOF whitespaces in case of using $ in regex + // For some reason newline and carriage return are being moved to beginning of next log + token := bytes.TrimLeft(data, "\r\n\t ") + if token == nil { + return []byte{} + } + return token +} + +func trimTrailingWhitespacesFunc(data []byte) []byte { + // TrimRight to strip all whitespaces from the end of log + token := bytes.TrimRight(data, "\r\n\t ") + if token == nil { + return []byte{} + } + return token +} + +func trimWhitespacesFunc(data []byte) []byte { + return trimLeadingWhitespacesFunc(trimTrailingWhitespacesFunc(data)) +} + +func getTrimFunc(preserveLeadingWhitespaces, preserveTrailingWhitespaces bool) trimFunc { + if preserveLeadingWhitespaces && preserveTrailingWhitespaces { + return noTrim + } + if preserveLeadingWhitespaces { + return trimTrailingWhitespacesFunc + } + if preserveTrailingWhitespaces { + return trimLeadingWhitespacesFunc + } + return trimWhitespacesFunc +} diff --git a/pkg/stanza/operator/helper/multiline_test.go b/pkg/stanza/tokenize/multiline_test.go similarity index 84% rename from pkg/stanza/operator/helper/multiline_test.go rename to pkg/stanza/tokenize/multiline_test.go index 19abb1eebc0a..af0dffc69e35 100644 --- a/pkg/stanza/operator/helper/multiline_test.go +++ b/pkg/stanza/tokenize/multiline_test.go @@ -1,7 +1,7 @@ // Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 -package helper +package tokenize import ( "bufio" @@ -16,8 +16,6 @@ import ( "github.com/stretchr/testify/require" "golang.org/x/text/encoding" "golang.org/x/text/encoding/unicode" - - "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/internal" ) const ( @@ -27,14 +25,14 @@ const ( ) type MultiLineTokenizerTestCase struct { - internal.TokenizerTestCase + TokenizerTestCase Flusher *Flusher } func TestLineStartSplitFunc(t *testing.T) { testCases := []MultiLineTokenizerTestCase{ { - internal.TokenizerTestCase{ + TokenizerTestCase{ Name: "OneLogSimple", Pattern: `LOGSTART \d+ `, Raw: []byte("LOGSTART 123 log1LOGSTART 123 a"), @@ -45,7 +43,7 @@ func TestLineStartSplitFunc(t *testing.T) { nil, }, { - internal.TokenizerTestCase{ + TokenizerTestCase{ Name: "TwoLogsSimple", Pattern: `LOGSTART \d+ `, Raw: []byte(`LOGSTART 123 log1 LOGSTART 234 log2 LOGSTART 345 foo`), @@ -57,7 +55,7 @@ func TestLineStartSplitFunc(t *testing.T) { nil, }, { - internal.TokenizerTestCase{ + TokenizerTestCase{ Name: "TwoLogsLineStart", Pattern: `^LOGSTART \d+ `, Raw: []byte("LOGSTART 123 LOGSTART 345 log1\nLOGSTART 234 log2\nLOGSTART 345 foo"), @@ -69,7 +67,7 @@ func TestLineStartSplitFunc(t *testing.T) { nil, }, { - internal.TokenizerTestCase{ + TokenizerTestCase{ Name: "NoMatches", Pattern: `LOGSTART \d+ `, Raw: []byte(`file that has no matches in it`), @@ -77,7 +75,7 @@ func TestLineStartSplitFunc(t *testing.T) { nil, }, { - internal.TokenizerTestCase{ + TokenizerTestCase{ Name: "PrecedingNonMatches", Pattern: `LOGSTART \d+ `, Raw: 
[]byte(`part that doesn't match LOGSTART 123 part that matchesLOGSTART 123 foo`), @@ -89,44 +87,44 @@ func TestLineStartSplitFunc(t *testing.T) { nil, }, { - internal.TokenizerTestCase{ + TokenizerTestCase{ Name: "HugeLog100", Pattern: `LOGSTART \d+ `, Raw: func() []byte { newRaw := []byte(`LOGSTART 123 `) - newRaw = append(newRaw, internal.GeneratedByteSliceOfLength(100)...) + newRaw = append(newRaw, GeneratedByteSliceOfLength(100)...) newRaw = append(newRaw, []byte(`LOGSTART 234 endlog`)...) return newRaw }(), ExpectedTokenized: []string{ - `LOGSTART 123 ` + string(internal.GeneratedByteSliceOfLength(100)), + `LOGSTART 123 ` + string(GeneratedByteSliceOfLength(100)), }, }, nil, }, { - internal.TokenizerTestCase{ + TokenizerTestCase{ Name: "HugeLog10000", Pattern: `LOGSTART \d+ `, Raw: func() []byte { newRaw := []byte(`LOGSTART 123 `) - newRaw = append(newRaw, internal.GeneratedByteSliceOfLength(10000)...) + newRaw = append(newRaw, GeneratedByteSliceOfLength(10000)...) newRaw = append(newRaw, []byte(`LOGSTART 234 endlog`)...) return newRaw }(), ExpectedTokenized: []string{ - `LOGSTART 123 ` + string(internal.GeneratedByteSliceOfLength(10000)), + `LOGSTART 123 ` + string(GeneratedByteSliceOfLength(10000)), }, }, nil, }, { - internal.TokenizerTestCase{ + TokenizerTestCase{ Name: "ErrTooLong", Pattern: `LOGSTART \d+ `, Raw: func() []byte { newRaw := []byte(`LOGSTART 123 `) - newRaw = append(newRaw, internal.GeneratedByteSliceOfLength(1000000)...) + newRaw = append(newRaw, GeneratedByteSliceOfLength(1000000)...) newRaw = append(newRaw, []byte(`LOGSTART 234 endlog`)...) return newRaw }(), @@ -135,7 +133,7 @@ func TestLineStartSplitFunc(t *testing.T) { nil, }, { - internal.TokenizerTestCase{ + TokenizerTestCase{ Name: "MultipleMultilineLogs", Pattern: `^LOGSTART \d+`, Raw: []byte("LOGSTART 12 log1\t \nLOGPART log1\nLOGPART log1\t \nLOGSTART 17 log2\nLOGPART log2\nanother line\nLOGSTART 43 log5"), @@ -147,7 +145,7 @@ func TestLineStartSplitFunc(t *testing.T) { nil, }, { - internal.TokenizerTestCase{ + TokenizerTestCase{ Name: "LogsWithoutFlusher", Pattern: `^LOGSTART \d+`, Raw: []byte("LOGPART log1\nLOGPART log1\t \n"), @@ -155,7 +153,7 @@ func TestLineStartSplitFunc(t *testing.T) { nil, }, { - internal.TokenizerTestCase{ + TokenizerTestCase{ Name: "LogsWithFlusher", Pattern: `^LOGSTART \d+`, Raw: []byte("LOGPART log1\nLOGPART log1\t \n"), @@ -171,7 +169,7 @@ func TestLineStartSplitFunc(t *testing.T) { }, }, { - internal.TokenizerTestCase{ + TokenizerTestCase{ Name: "LogsWithFlusherWithMultipleLogsInBuffer", Pattern: `^LOGSTART \d+`, Raw: []byte("LOGPART log1\nLOGSTART 123\nLOGPART log1\t \n"), @@ -187,7 +185,7 @@ func TestLineStartSplitFunc(t *testing.T) { }, }, { - internal.TokenizerTestCase{ + TokenizerTestCase{ Name: "LogsWithLongFlusherWithMultipleLogsInBuffer", Pattern: `^LOGSTART \d+`, Raw: []byte("LOGPART log1\nLOGSTART 123\nLOGPART log1\t \n"), @@ -202,7 +200,7 @@ func TestLineStartSplitFunc(t *testing.T) { }, }, { - internal.TokenizerTestCase{ + TokenizerTestCase{ Name: "LogsWithFlusherWithLogStartingWithWhiteChars", Pattern: `^LOGSTART \d+`, Raw: []byte("\nLOGSTART 333"), @@ -252,7 +250,7 @@ func TestLineStartSplitFunc(t *testing.T) { func TestLineEndSplitFunc(t *testing.T) { testCases := []MultiLineTokenizerTestCase{ { - internal.TokenizerTestCase{ + TokenizerTestCase{ Name: "OneLogSimple", Pattern: `LOGEND \d+`, Raw: []byte(`my log LOGEND 123`), @@ -263,7 +261,7 @@ func TestLineEndSplitFunc(t *testing.T) { nil, }, { - internal.TokenizerTestCase{ + TokenizerTestCase{ Name: 
"TwoLogsSimple", Pattern: `LOGEND \d+`, Raw: []byte(`log1 LOGEND 123log2 LOGEND 234`), @@ -275,7 +273,7 @@ func TestLineEndSplitFunc(t *testing.T) { nil, }, { - internal.TokenizerTestCase{ + TokenizerTestCase{ Name: "TwoLogsLineEndSimple", Pattern: `LOGEND$`, Raw: []byte("log1 LOGEND LOGEND\nlog2 LOGEND\n"), @@ -287,7 +285,7 @@ func TestLineEndSplitFunc(t *testing.T) { nil, }, { - internal.TokenizerTestCase{ + TokenizerTestCase{ Name: "NoMatches", Pattern: `LOGEND \d+`, Raw: []byte(`file that has no matches in it`), @@ -295,7 +293,7 @@ func TestLineEndSplitFunc(t *testing.T) { nil, }, { - internal.TokenizerTestCase{ + TokenizerTestCase{ Name: "NonMatchesAfter", Pattern: `LOGEND \d+`, Raw: []byte(`part that matches LOGEND 123 part that doesn't match`), @@ -306,41 +304,41 @@ func TestLineEndSplitFunc(t *testing.T) { nil, }, { - internal.TokenizerTestCase{ + TokenizerTestCase{ Name: "HugeLog100", Pattern: `LOGEND \d`, Raw: func() []byte { - newRaw := internal.GeneratedByteSliceOfLength(100) + newRaw := GeneratedByteSliceOfLength(100) newRaw = append(newRaw, []byte(`LOGEND 1 `)...) return newRaw }(), ExpectedTokenized: []string{ - string(internal.GeneratedByteSliceOfLength(100)) + `LOGEND 1`, + string(GeneratedByteSliceOfLength(100)) + `LOGEND 1`, }, }, nil, }, { - internal.TokenizerTestCase{ + TokenizerTestCase{ Name: "HugeLog10000", Pattern: `LOGEND \d`, Raw: func() []byte { - newRaw := internal.GeneratedByteSliceOfLength(10000) + newRaw := GeneratedByteSliceOfLength(10000) newRaw = append(newRaw, []byte(`LOGEND 1 `)...) return newRaw }(), ExpectedTokenized: []string{ - string(internal.GeneratedByteSliceOfLength(10000)) + `LOGEND 1`, + string(GeneratedByteSliceOfLength(10000)) + `LOGEND 1`, }, }, nil, }, { - internal.TokenizerTestCase{ + TokenizerTestCase{ Name: "HugeLog1000000", Pattern: `LOGEND \d`, Raw: func() []byte { - newRaw := internal.GeneratedByteSliceOfLength(1000000) + newRaw := GeneratedByteSliceOfLength(1000000) newRaw = append(newRaw, []byte(`LOGEND 1 `)...) 
return newRaw }(), @@ -349,7 +347,7 @@ func TestLineEndSplitFunc(t *testing.T) { nil, }, { - internal.TokenizerTestCase{ + TokenizerTestCase{ Name: "MultipleMultilineLogs", Pattern: `^LOGEND.*$`, Raw: []byte("LOGSTART 12 log1\t \nLOGPART log1\nLOGEND log1\t \nLOGSTART 17 log2\nLOGPART log2\nLOGEND log2\nLOGSTART 43 log5"), @@ -361,7 +359,7 @@ func TestLineEndSplitFunc(t *testing.T) { nil, }, { - internal.TokenizerTestCase{ + TokenizerTestCase{ Name: "LogsWithoutFlusher", Pattern: `^LOGEND.*$`, Raw: []byte("LOGPART log1\nLOGPART log1\t \n"), @@ -369,7 +367,7 @@ func TestLineEndSplitFunc(t *testing.T) { &Flusher{}, }, { - internal.TokenizerTestCase{ + TokenizerTestCase{ Name: "LogsWithFlusher", Pattern: `^LOGEND.*$`, Raw: []byte("LOGPART log1\nLOGPART log1\t \n"), @@ -385,7 +383,7 @@ func TestLineEndSplitFunc(t *testing.T) { }, }, { - internal.TokenizerTestCase{ + TokenizerTestCase{ Name: "LogsWithFlusherWithMultipleLogsInBuffer", Pattern: `^LOGEND.*$`, Raw: []byte("LOGPART log1\nLOGEND\nLOGPART log1\t \n"), @@ -402,7 +400,7 @@ func TestLineEndSplitFunc(t *testing.T) { }, }, { - internal.TokenizerTestCase{ + TokenizerTestCase{ Name: "LogsWithLongFlusherWithMultipleLogsInBuffer", Pattern: `^LOGEND.*$`, Raw: []byte("LOGPART log1\nLOGEND\nLOGPART log1\t \n"), @@ -418,7 +416,7 @@ func TestLineEndSplitFunc(t *testing.T) { }, }, { - internal.TokenizerTestCase{ + TokenizerTestCase{ Name: "LogsWithFlusherWithLogStartingWithWhiteChars", Pattern: `LOGEND \d+$`, Raw: []byte("\nLOGEND 333"), @@ -449,7 +447,7 @@ func TestLineEndSplitFunc(t *testing.T) { func TestNewlineSplitFunc(t *testing.T) { testCases := []MultiLineTokenizerTestCase{ { - internal.TokenizerTestCase{Name: "OneLogSimple", + TokenizerTestCase{Name: "OneLogSimple", Raw: []byte("my log\n"), ExpectedTokenized: []string{ `my log`, @@ -457,7 +455,7 @@ func TestNewlineSplitFunc(t *testing.T) { }, nil, }, { - internal.TokenizerTestCase{Name: "OneLogCarriageReturn", + TokenizerTestCase{Name: "OneLogCarriageReturn", Raw: []byte("my log\r\n"), ExpectedTokenized: []string{ `my log`, @@ -466,7 +464,7 @@ func TestNewlineSplitFunc(t *testing.T) { nil, }, { - internal.TokenizerTestCase{Name: "TwoLogsSimple", + TokenizerTestCase{Name: "TwoLogsSimple", Raw: []byte("log1\nlog2\n"), ExpectedTokenized: []string{ `log1`, @@ -476,7 +474,7 @@ func TestNewlineSplitFunc(t *testing.T) { nil, }, { - internal.TokenizerTestCase{Name: "TwoLogsCarriageReturn", + TokenizerTestCase{Name: "TwoLogsCarriageReturn", Raw: []byte("log1\r\nlog2\r\n"), ExpectedTokenized: []string{ `log1`, @@ -486,41 +484,41 @@ func TestNewlineSplitFunc(t *testing.T) { nil, }, { - internal.TokenizerTestCase{Name: "NoTailingNewline", + TokenizerTestCase{Name: "NoTailingNewline", Raw: []byte(`foo`), }, nil, }, { - internal.TokenizerTestCase{Name: "HugeLog100", + TokenizerTestCase{Name: "HugeLog100", Raw: func() []byte { - newRaw := internal.GeneratedByteSliceOfLength(100) + newRaw := GeneratedByteSliceOfLength(100) newRaw = append(newRaw, '\n') return newRaw }(), ExpectedTokenized: []string{ - string(internal.GeneratedByteSliceOfLength(100)), + string(GeneratedByteSliceOfLength(100)), }, }, nil, }, { - internal.TokenizerTestCase{Name: "HugeLog10000", + TokenizerTestCase{Name: "HugeLog10000", Raw: func() []byte { - newRaw := internal.GeneratedByteSliceOfLength(10000) + newRaw := GeneratedByteSliceOfLength(10000) newRaw = append(newRaw, '\n') return newRaw }(), ExpectedTokenized: []string{ - string(internal.GeneratedByteSliceOfLength(10000)), + string(GeneratedByteSliceOfLength(10000)), }, }, nil, }, 
{ - internal.TokenizerTestCase{Name: "HugeLog1000000", + TokenizerTestCase{Name: "HugeLog1000000", Raw: func() []byte { - newRaw := internal.GeneratedByteSliceOfLength(1000000) + newRaw := GeneratedByteSliceOfLength(1000000) newRaw = append(newRaw, '\n') return newRaw }(), @@ -529,14 +527,14 @@ func TestNewlineSplitFunc(t *testing.T) { nil, }, { - internal.TokenizerTestCase{Name: "LogsWithoutFlusher", + TokenizerTestCase{Name: "LogsWithoutFlusher", Raw: []byte("LOGPART log1"), }, &Flusher{}, }, { - internal.TokenizerTestCase{Name: "LogsWithFlusher", + TokenizerTestCase{Name: "LogsWithFlusher", Raw: []byte("LOGPART log1"), ExpectedTokenized: []string{ "LOGPART log1", @@ -549,7 +547,7 @@ func TestNewlineSplitFunc(t *testing.T) { }, }, { - internal.TokenizerTestCase{Name: "DefaultFlusherSplits", + TokenizerTestCase{Name: "DefaultFlusherSplits", Raw: []byte("log1\nlog2\n"), ExpectedTokenized: []string{ "log1", @@ -559,7 +557,7 @@ func TestNewlineSplitFunc(t *testing.T) { nil, }, { - internal.TokenizerTestCase{Name: "LogsWithLogStartingWithWhiteChars", + TokenizerTestCase{Name: "LogsWithLogStartingWithWhiteChars", Raw: []byte("\nLOGEND 333\nAnother one"), ExpectedTokenized: []string{ "", @@ -569,7 +567,7 @@ func TestNewlineSplitFunc(t *testing.T) { nil, }, { - internal.TokenizerTestCase{Name: "PreserveLeadingWhitespaces", + TokenizerTestCase{Name: "PreserveLeadingWhitespaces", Raw: []byte("\n LOGEND 333 \nAnother one "), ExpectedTokenized: []string{ "", @@ -580,7 +578,7 @@ func TestNewlineSplitFunc(t *testing.T) { nil, }, { - internal.TokenizerTestCase{Name: "PreserveTrailingWhitespaces", + TokenizerTestCase{Name: "PreserveTrailingWhitespaces", Raw: []byte("\n LOGEND 333 \nAnother one "), ExpectedTokenized: []string{ "", @@ -591,7 +589,7 @@ func TestNewlineSplitFunc(t *testing.T) { nil, }, { - internal.TokenizerTestCase{Name: "PreserveBothLeadingAndTrailingWhitespaces", + TokenizerTestCase{Name: "PreserveBothLeadingAndTrailingWhitespaces", Raw: []byte("\n LOGEND 333 \nAnother one "), ExpectedTokenized: []string{ "", @@ -669,16 +667,16 @@ func TestNoSplitFunc(t *testing.T) { { Name: "HugeLog100", Raw: func() []byte { - return internal.GeneratedByteSliceOfLength(largeLogSize) + return GeneratedByteSliceOfLength(largeLogSize) }(), ExpectedTokenized: [][]byte{ - internal.GeneratedByteSliceOfLength(100), + GeneratedByteSliceOfLength(100), }, }, { Name: "HugeLog300", Raw: func() []byte { - return internal.GeneratedByteSliceOfLength(largeLogSize * 3) + return GeneratedByteSliceOfLength(largeLogSize * 3) }(), ExpectedTokenized: [][]byte{ []byte("abcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuv"), @@ -689,7 +687,7 @@ func TestNoSplitFunc(t *testing.T) { { Name: "EOFBeforeMaxLogSize", Raw: func() []byte { - return internal.GeneratedByteSliceOfLength(largeLogSize * 3.5) + return GeneratedByteSliceOfLength(largeLogSize * 3.5) }(), ExpectedTokenized: [][]byte{ []byte("abcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuv"), diff --git a/pkg/stanza/tokenize/splitter.go b/pkg/stanza/tokenize/splitter.go new file mode 100644 index 000000000000..8f58402e5389 --- /dev/null +++ b/pkg/stanza/tokenize/splitter.go @@ -0,0 +1,73 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package tokenize // import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/tokenize" + +import ( + "bufio" + + 
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/decoder" +) + +// SplitterConfig consolidates MultilineConfig and FlusherConfig +type SplitterConfig struct { + Encoding string `mapstructure:"encoding,omitempty"` + Flusher FlusherConfig `mapstructure:",squash,omitempty"` + Multiline MultilineConfig `mapstructure:"multiline,omitempty"` + PreserveLeadingWhitespaces bool `mapstructure:"preserve_leading_whitespaces,omitempty"` + PreserveTrailingWhitespaces bool `mapstructure:"preserve_trailing_whitespaces,omitempty"` +} + +// NewSplitterConfig returns default SplitterConfig +func NewSplitterConfig() SplitterConfig { + return SplitterConfig{ + Encoding: "utf-8", + Multiline: NewMultilineConfig(), + Flusher: NewFlusherConfig(), + } +} + +// Build builds Splitter struct +func (c *SplitterConfig) Build(flushAtEOF bool, maxLogSize int) (*Splitter, error) { + enc, err := decoder.LookupEncoding(c.Encoding) + if err != nil { + return nil, err + } + + flusher := c.Flusher.Build() + splitFunc, err := c.Multiline.Build(enc, flushAtEOF, c.PreserveLeadingWhitespaces, c.PreserveTrailingWhitespaces, flusher, maxLogSize) + if err != nil { + return nil, err + } + + return &Splitter{ + Decoder: decoder.New(enc), + Flusher: flusher, + SplitFunc: splitFunc, + }, nil +} + +// Splitter consolidates Flusher and dependent splitFunc +type Splitter struct { + Decoder *decoder.Decoder + SplitFunc bufio.SplitFunc + Flusher *Flusher +} + +// SplitNone doesn't split any of the bytes, it reads in all of the bytes and returns it all at once. This is for when the encoding is nop +func SplitNone(maxLogSize int) bufio.SplitFunc { + return func(data []byte, atEOF bool) (advance int, token []byte, err error) { + if len(data) >= maxLogSize { + return maxLogSize, data[:maxLogSize], nil + } + + if !atEOF { + return 0, nil, nil + } + + if len(data) == 0 { + return 0, nil, nil + } + return len(data), data, nil + } +} diff --git a/pkg/stanza/tokenize/util_test.go b/pkg/stanza/tokenize/util_test.go new file mode 100644 index 000000000000..9a357c5c27b7 --- /dev/null +++ b/pkg/stanza/tokenize/util_test.go @@ -0,0 +1,119 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package tokenize + +import ( + "bufio" + "io" + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +// state is going to keep processing state of the TestReader +type state struct { + ReadFrom int + Processed int +} + +// TestReader is a TestReader which keeps state of readed and processed data +type TestReader struct { + State *state + Data []byte +} + +// NewTestReader creates TestReader with empty state +func NewTestReader(data []byte) TestReader { + return TestReader{ + State: &state{ + ReadFrom: 0, + Processed: 0, + }, + Data: data, + } +} + +// Read reads data from TestReader and remebers where reading has been finished +func (r TestReader) Read(p []byte) (n int, err error) { + // return eof if data has been fully readed + if len(r.Data)-r.State.ReadFrom == 0 { + return 0, io.EOF + } + + // iterate over data char by char and write into p + // until p is full or no more data left to read + i := 0 + for ; i < len(r.Data)-r.State.ReadFrom; i++ { + if i == len(p) { + break + } + p[i] = r.Data[r.State.ReadFrom+i] + } + + // update state + r.State.ReadFrom += i + return i, nil +} + +// Reset resets TestReader state (sets last readed position to last processed position) +func (r *TestReader) Reset() { + r.State.ReadFrom = r.State.Processed +} + +func (r *TestReader) SplitFunc(splitFunc bufio.SplitFunc) 
bufio.SplitFunc { + return func(data []byte, atEOF bool) (advance int, token []byte, err error) { + advance, token, err = splitFunc(data, atEOF) + r.State.Processed += advance + return + } +} + +type TokenizerTestCase struct { + Name string + Pattern string + Raw []byte + ExpectedTokenized []string + ExpectedError error + Sleep time.Duration + AdditionalIterations int + PreserveLeadingWhitespaces bool + PreserveTrailingWhitespaces bool +} + +func (tc TokenizerTestCase) RunFunc(splitFunc bufio.SplitFunc) func(t *testing.T) { + reader := NewTestReader(tc.Raw) + + return func(t *testing.T) { + var tokenized []string + for i := 0; i < 1+tc.AdditionalIterations; i++ { + // sleep before the next iteration + if i > 0 { + time.Sleep(tc.Sleep) + } + reader.Reset() + scanner := bufio.NewScanner(reader) + scanner.Split(reader.SplitFunc(splitFunc)) + for { + ok := scanner.Scan() + if !ok { + assert.Equal(t, tc.ExpectedError, scanner.Err()) + break + } + tokenized = append(tokenized, scanner.Text()) + } + } + + assert.Equal(t, tc.ExpectedTokenized, tokenized) + } +} + +func GeneratedByteSliceOfLength(length int) []byte { + chars := []byte(`abcdefghijklmnopqrstuvwxyz`) + newSlice := make([]byte, length) + for i := 0; i < length; i++ { + newSlice[i] = chars[i%len(chars)] + } + return newSlice +} diff --git a/receiver/otlpjsonfilereceiver/file_test.go b/receiver/otlpjsonfilereceiver/file_test.go index 2e9021f1d989..053946ae927b 100644 --- a/receiver/otlpjsonfilereceiver/file_test.go +++ b/receiver/otlpjsonfilereceiver/file_test.go @@ -24,7 +24,7 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/testdata" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/matcher" - "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/helper" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/tokenize" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/otlpjsonfilereceiver/internal/metadata" ) @@ -124,7 +124,7 @@ func testdataConfigYamlAsMap() *Config { IncludeFileNameResolved: false, IncludeFilePathResolved: false, PollInterval: 200 * time.Millisecond, - Splitter: helper.NewSplitterConfig(), + Splitter: tokenize.NewSplitterConfig(), StartAt: "end", FingerprintSize: 1000, MaxLogSize: 1024 * 1024,
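To make the relocated API easier to evaluate, the following is a minimal sketch of driving the new tokenize package end to end, using only the signatures introduced above (tokenize.NewSplitterConfig, SplitterConfig.Build, and the Splitter.SplitFunc field). It assumes the default MultilineConfig falls back to plain newline splitting, as the deprecated helper equivalents did; it is an illustration, not code added by this change.

package main

import (
	"bufio"
	"fmt"
	"strings"

	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/tokenize"
)

func main() {
	// Default config: "utf-8" encoding plus the default multiline and flusher
	// settings (assumed here to split on newlines).
	cfg := tokenize.NewSplitterConfig()

	// Build resolves the encoding through the decoder package and wires the
	// flusher into the returned Splitter, as shown in splitter.go above.
	splitter, err := cfg.Build(true, 1024*1024) // flushAtEOF, maxLogSize
	if err != nil {
		panic(err)
	}

	scanner := bufio.NewScanner(strings.NewReader("log1\nlog2\n"))
	scanner.Split(splitter.SplitFunc)
	for scanner.Scan() {
		fmt.Println(scanner.Text()) // expected: "log1", then "log2"
	}
}

If that reading is right, downstream callers such as the fileconsumer and otlpjsonfilereceiver updates in this diff only need the helper-to-tokenize import swap shown here.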