From 892d65fced382a99f39a6b50ac64dbba6497aa65 Mon Sep 17 00:00:00 2001
From: lookchen
Date: Fri, 30 Sep 2022 19:08:13 +0800
Subject: [PATCH 01/13] Support to Customize bufio.SplitFunc

---
 pkg/stanza/docs/operators/file_input.md       |  15 +++
 pkg/stanza/fileconsumer/config.go             |  18 ++-
 pkg/stanza/fileconsumer/config_test.go        |  31 +++++
 pkg/stanza/operator/helper/multiline.go       |  39 ++++++-
 pkg/stanza/operator/helper/multiline_test.go  | 108 ++++++++++++++++++
 .../fileconsume-customize-splitfunc.yaml      |  11 ++
 6 files changed, 216 insertions(+), 6 deletions(-)
 create mode 100644 unreleased/fileconsume-customize-splitfunc.yaml

diff --git a/pkg/stanza/docs/operators/file_input.md b/pkg/stanza/docs/operators/file_input.md
index d9286d349f11..77e7ef02c4bc 100644
--- a/pkg/stanza/docs/operators/file_input.md
+++ b/pkg/stanza/docs/operators/file_input.md
@@ -43,6 +43,21 @@ use `force_flush_period` option.
 Also refer to [recombine](../operators/recombine.md) operator
 for merging events with greater control.
 
+#### Customized Splitter
+
+In some cases, logs cannot be split by newline or by a regular expression pattern. In such cases, a custom split function can be provided to control how log entries are separated.
+
+```go
+cfg := fileconsumer.NewConfig(fileconsumer.WithCustomizedSplitter(
+	func(data []byte, atEOF bool) (advance int, token []byte, err error) {
+		// split log
+		...
+		return
+	}))
+```
+
+A customized split function cannot be used together with the `multiline` configuration.
+
 ### File rotation
 
 When files are rotated and its new names are no longer captured in `include` pattern (i.e. tailing symlink files), it could result in data loss.

diff --git a/pkg/stanza/fileconsumer/config.go b/pkg/stanza/fileconsumer/config.go
index cfe5bdad5039..91c7e5e279f5 100644
--- a/pkg/stanza/fileconsumer/config.go
+++ b/pkg/stanza/fileconsumer/config.go
@@ -15,6 +15,7 @@
 package fileconsumer // import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer"
 
 import (
+	"bufio"
 	"fmt"
 	"time"
 
@@ -29,9 +30,18 @@ const (
 	defaultMaxConcurrentFiles = 1024
 )
 
+type Option func(cfg *Config)
+
+func WithCustomizedSplitter(splitter bufio.SplitFunc) Option {
+	return func(cfg *Config) {
+		cfg.Splitter.Customization.Enabled = true
+		cfg.Splitter.Customization.Splitter = splitter
+	}
+}
+
 // NewConfig creates a new input config with default values
-func NewConfig() *Config {
-	return &Config{
+func NewConfig(opts ...Option) *Config {
+	cfg := &Config{
 		IncludeFileName: true, IncludeFilePath: false, IncludeFileNameResolved: false, IncludeFilePathResolved: false, PollInterval: 200 * time.Millisecond, Splitter: helper.NewSplitterConfig(), StartAt: "end", FingerprintSize: DefaultFingerprintSize, MaxLogSize: defaultMaxLogSize, MaxConcurrentFiles: defaultMaxConcurrentFiles,
 	}
+	for _, op := range opts {
+		op(cfg)
+	}
+	return cfg
 }
 
 // Config is the configuration of a file input operator

diff --git a/pkg/stanza/fileconsumer/config_test.go b/pkg/stanza/fileconsumer/config_test.go
index 9409f74202fb..27b5e631ad94 100644
--- a/pkg/stanza/fileconsumer/config_test.go
+++ b/pkg/stanza/fileconsumer/config_test.go
@@ -15,11 +15,13 @@
 package fileconsumer
 
 import (
+	"bufio"
 	"context"
 	"path/filepath"
 	"testing"
 	"time"
 
+	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
 
 	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/helper"
@@ -504,3 +506,32 @@ func TestBuild(t *testing.T) {
 		})
 	}
 }
+
+func TestNewConfigWithOptions(t *testing.T) {
+	type args struct {
+		opts []Option
+	}
+	tests := []struct {
+		name     string
+		args     args
+		splitter bufio.SplitFunc
+	}{
+		{
+			name: "customized splitter",
+			args: args{
+				opts: []Option{
+
WithCustomizedSplitter(func(data []byte, atEOF bool) (advance int, token []byte, err error) { + return 0, nil, err + }), + }, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got := NewConfig(tt.args.opts...) + assert.NotNil(t, got) + assert.NotNil(t, got.Splitter.Customization.Splitter) + }) + } +} diff --git a/pkg/stanza/operator/helper/multiline.go b/pkg/stanza/operator/helper/multiline.go index 9a4ff58b5095..dece9291ce62 100644 --- a/pkg/stanza/operator/helper/multiline.go +++ b/pkg/stanza/operator/helper/multiline.go @@ -346,11 +346,35 @@ func trimWhitespaces(data []byte) []byte { return token } +// CustomizedConfig is a configuration of Customized Splitter helper +type CustomizedConfig struct { + Enabled bool + Splitter bufio.SplitFunc +} + +// NewCustomizedConfig creates a new CustomizedConfig +func NewCustomizedConfig() CustomizedConfig { + return CustomizedConfig{Enabled: false} +} + +// Build creates Customized SplitFunc +func (c CustomizedConfig) Build(flusher *Flusher) (bufio.SplitFunc, error) { + if c.Splitter == nil { + return nil, fmt.Errorf("customized splitter is nil") + } + var splitFunc bufio.SplitFunc + if flusher != nil { + splitFunc = flusher.SplitFunc(c.Splitter) + } + return splitFunc, nil +} + // SplitterConfig consolidates MultilineConfig and FlusherConfig type SplitterConfig struct { EncodingConfig EncodingConfig `mapstructure:",squash,omitempty"` Multiline MultilineConfig `mapstructure:"multiline,omitempty"` Flusher FlusherConfig `mapstructure:",squash,omitempty"` + Customization CustomizedConfig } // NewSplitterConfig returns default SplitterConfig @@ -359,6 +383,7 @@ func NewSplitterConfig() SplitterConfig { EncodingConfig: NewEncodingConfig(), Multiline: NewMultilineConfig(), Flusher: NewFlusherConfig(), + Customization: NewCustomizedConfig(), } } @@ -368,14 +393,20 @@ func (c *SplitterConfig) Build(flushAtEOF bool, maxLogSize int) (*Splitter, erro if err != nil { return nil, err } - + if c.Customization.Enabled && + (c.Multiline.LineStartPattern != "" || c.Multiline.LineEndPattern != "") { + return nil, fmt.Errorf("multiline should not be set when using customized splitter") + } flusher := c.Flusher.Build() - splitFunc, err := c.Multiline.Build(enc.Encoding, flushAtEOF, flusher, maxLogSize) - + var splitFunc bufio.SplitFunc + if c.Customization.Enabled { + splitFunc, err = c.Customization.Build(flusher) + } else { + splitFunc, err = c.Multiline.Build(enc.Encoding, flushAtEOF, flusher, maxLogSize) + } if err != nil { return nil, err } - return &Splitter{ Encoding: enc, Flusher: flusher, diff --git a/pkg/stanza/operator/helper/multiline_test.go b/pkg/stanza/operator/helper/multiline_test.go index 5c3287b39656..6822e55dceaf 100644 --- a/pkg/stanza/operator/helper/multiline_test.go +++ b/pkg/stanza/operator/helper/multiline_test.go @@ -773,3 +773,111 @@ func generatedByteSliceOfLength(length int) []byte { } return newSlice } + +func TestSplitterConfig_Build(t *testing.T) { + type fields struct { + EncodingConfig EncodingConfig + Multiline MultilineConfig + Flusher FlusherConfig + Customization CustomizedConfig + } + type args struct { + flushAtEOF bool + maxLogSize int + } + tests := []struct { + name string + fields fields + args args + want *Splitter + wantErr bool + }{ + { + name: "default configuration", + fields: fields{ + EncodingConfig: NewEncodingConfig(), + Multiline: NewMultilineConfig(), + Flusher: NewFlusherConfig(), + Customization: NewCustomizedConfig(), + }, + args: args{ + flushAtEOF: false, + maxLogSize: 10000, 
+ }, + wantErr: false, + }, + { + name: "Customize splitter configuration", + fields: fields{ + EncodingConfig: NewEncodingConfig(), + Multiline: NewMultilineConfig(), + Flusher: NewFlusherConfig(), + Customization: CustomizedConfig{ + Enabled: true, + Splitter: func(data []byte, atEOF bool) (advance int, token []byte, err error) { + return 0, nil, err + }, + }, + }, + args: args{ + flushAtEOF: false, + maxLogSize: 10000, + }, + wantErr: false, + }, + { + name: "Customize splitter configuration:Splitter is nil ", + fields: fields{ + EncodingConfig: NewEncodingConfig(), + Multiline: NewMultilineConfig(), + Flusher: NewFlusherConfig(), + Customization: CustomizedConfig{ + Enabled: true, + Splitter: nil, + }, + }, + args: args{ + flushAtEOF: false, + maxLogSize: 10000, + }, + wantErr: true, + }, + { + name: "Customize splitter configuration:Splitter is nil ", + fields: fields{ + EncodingConfig: NewEncodingConfig(), + Multiline: MultilineConfig{LineStartPattern: "START"}, + Flusher: NewFlusherConfig(), + Customization: CustomizedConfig{ + Enabled: true, + Splitter: func(data []byte, atEOF bool) (advance int, token []byte, err error) { + return 0, nil, err + }, + }, + }, + args: args{ + flushAtEOF: false, + maxLogSize: 10000, + }, + wantErr: true, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + c := &SplitterConfig{ + EncodingConfig: tt.fields.EncodingConfig, + Multiline: tt.fields.Multiline, + Flusher: tt.fields.Flusher, + Customization: tt.fields.Customization, + } + got, err := c.Build(tt.args.flushAtEOF, tt.args.maxLogSize) + if tt.wantErr { + assert.NotNil(t, err) + assert.Nil(t, got) + } else { + assert.Nil(t, err) + assert.NotNil(t, got) + } + }) + } +} diff --git a/unreleased/fileconsume-customize-splitfunc.yaml b/unreleased/fileconsume-customize-splitfunc.yaml new file mode 100644 index 000000000000..8022100c25c7 --- /dev/null +++ b/unreleased/fileconsume-customize-splitfunc.yaml @@ -0,0 +1,11 @@ +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: pkg/stanza + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). 
+note: Support to Customize bufio.SplitFunc + +# One or more tracking issues related to the change +issues: [14593] \ No newline at end of file From 14008941eeb7493ff1cfefb12b201b71addeb6f6 Mon Sep 17 00:00:00 2001 From: lookchen Date: Fri, 30 Sep 2022 19:20:33 +0800 Subject: [PATCH 02/13] move changelog to .chloggen/ --- {unreleased => .chloggen}/fileconsume-customize-splitfunc.yaml | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename {unreleased => .chloggen}/fileconsume-customize-splitfunc.yaml (100%) diff --git a/unreleased/fileconsume-customize-splitfunc.yaml b/.chloggen/fileconsume-customize-splitfunc.yaml similarity index 100% rename from unreleased/fileconsume-customize-splitfunc.yaml rename to .chloggen/fileconsume-customize-splitfunc.yaml From cc1a99c61c0820b088eaa087ed08448f8c00660b Mon Sep 17 00:00:00 2001 From: lookchen Date: Fri, 30 Sep 2022 19:44:27 +0800 Subject: [PATCH 03/13] mapstructure tag not present on field --- pkg/stanza/operator/helper/multiline.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/pkg/stanza/operator/helper/multiline.go b/pkg/stanza/operator/helper/multiline.go index dece9291ce62..336b8a9190b9 100644 --- a/pkg/stanza/operator/helper/multiline.go +++ b/pkg/stanza/operator/helper/multiline.go @@ -371,10 +371,10 @@ func (c CustomizedConfig) Build(flusher *Flusher) (bufio.SplitFunc, error) { // SplitterConfig consolidates MultilineConfig and FlusherConfig type SplitterConfig struct { - EncodingConfig EncodingConfig `mapstructure:",squash,omitempty"` - Multiline MultilineConfig `mapstructure:"multiline,omitempty"` - Flusher FlusherConfig `mapstructure:",squash,omitempty"` - Customization CustomizedConfig + EncodingConfig EncodingConfig `mapstructure:",squash,omitempty"` + Multiline MultilineConfig `mapstructure:"multiline,omitempty"` + Flusher FlusherConfig `mapstructure:",squash,omitempty"` + Customization CustomizedConfig `mapstructure:",squash,omitempty"` } // NewSplitterConfig returns default SplitterConfig From 4068450b961eb00c73f65dba27f2415a7f376791 Mon Sep 17 00:00:00 2001 From: lookchen Date: Fri, 30 Sep 2022 20:04:34 +0800 Subject: [PATCH 04/13] unexported customization --- pkg/stanza/fileconsumer/config.go | 3 +-- pkg/stanza/fileconsumer/config_test.go | 1 - pkg/stanza/operator/helper/multiline.go | 21 +++++++++++++-------- 3 files changed, 14 insertions(+), 11 deletions(-) diff --git a/pkg/stanza/fileconsumer/config.go b/pkg/stanza/fileconsumer/config.go index 91c7e5e279f5..f2aefbadab57 100644 --- a/pkg/stanza/fileconsumer/config.go +++ b/pkg/stanza/fileconsumer/config.go @@ -34,8 +34,7 @@ type Option func(cfg *Config) func WithCustomizedSplitter(splitter bufio.SplitFunc) Option { return func(cfg *Config) { - cfg.Splitter.Customization.Enabled = true - cfg.Splitter.Customization.Splitter = splitter + cfg.Splitter.SetCustomizedSplitter(splitter) } } diff --git a/pkg/stanza/fileconsumer/config_test.go b/pkg/stanza/fileconsumer/config_test.go index 27b5e631ad94..0a8ccbd76e3b 100644 --- a/pkg/stanza/fileconsumer/config_test.go +++ b/pkg/stanza/fileconsumer/config_test.go @@ -531,7 +531,6 @@ func TestNewConfigWithOptions(t *testing.T) { t.Run(tt.name, func(t *testing.T) { got := NewConfig(tt.args.opts...) 
assert.NotNil(t, got) - assert.NotNil(t, got.Splitter.Customization.Splitter) }) } } diff --git a/pkg/stanza/operator/helper/multiline.go b/pkg/stanza/operator/helper/multiline.go index 336b8a9190b9..e068574ed93e 100644 --- a/pkg/stanza/operator/helper/multiline.go +++ b/pkg/stanza/operator/helper/multiline.go @@ -371,10 +371,10 @@ func (c CustomizedConfig) Build(flusher *Flusher) (bufio.SplitFunc, error) { // SplitterConfig consolidates MultilineConfig and FlusherConfig type SplitterConfig struct { - EncodingConfig EncodingConfig `mapstructure:",squash,omitempty"` - Multiline MultilineConfig `mapstructure:"multiline,omitempty"` - Flusher FlusherConfig `mapstructure:",squash,omitempty"` - Customization CustomizedConfig `mapstructure:",squash,omitempty"` + EncodingConfig EncodingConfig `mapstructure:",squash,omitempty"` + Multiline MultilineConfig `mapstructure:"multiline,omitempty"` + Flusher FlusherConfig `mapstructure:",squash,omitempty"` + customization CustomizedConfig } // NewSplitterConfig returns default SplitterConfig @@ -383,7 +383,7 @@ func NewSplitterConfig() SplitterConfig { EncodingConfig: NewEncodingConfig(), Multiline: NewMultilineConfig(), Flusher: NewFlusherConfig(), - Customization: NewCustomizedConfig(), + customization: NewCustomizedConfig(), } } @@ -393,14 +393,14 @@ func (c *SplitterConfig) Build(flushAtEOF bool, maxLogSize int) (*Splitter, erro if err != nil { return nil, err } - if c.Customization.Enabled && + if c.customization.Enabled && (c.Multiline.LineStartPattern != "" || c.Multiline.LineEndPattern != "") { return nil, fmt.Errorf("multiline should not be set when using customized splitter") } flusher := c.Flusher.Build() var splitFunc bufio.SplitFunc - if c.Customization.Enabled { - splitFunc, err = c.Customization.Build(flusher) + if c.customization.Enabled { + splitFunc, err = c.customization.Build(flusher) } else { splitFunc, err = c.Multiline.Build(enc.Encoding, flushAtEOF, flusher, maxLogSize) } @@ -414,6 +414,11 @@ func (c *SplitterConfig) Build(flushAtEOF bool, maxLogSize int) (*Splitter, erro }, nil } +func (c *SplitterConfig) SetCustomizedSplitter(splitter bufio.SplitFunc) { + c.customization.Splitter = splitter + c.customization.Enabled = true +} + // Splitter consolidates Flusher and dependent splitFunc type Splitter struct { Encoding Encoding From 21323e0218d8da03c29eed5b34a3a4c9d2e53982 Mon Sep 17 00:00:00 2001 From: lookchen Date: Fri, 30 Sep 2022 20:11:35 +0800 Subject: [PATCH 05/13] update test --- pkg/stanza/operator/helper/multiline_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/stanza/operator/helper/multiline_test.go b/pkg/stanza/operator/helper/multiline_test.go index 6822e55dceaf..dd41d5346c70 100644 --- a/pkg/stanza/operator/helper/multiline_test.go +++ b/pkg/stanza/operator/helper/multiline_test.go @@ -868,7 +868,7 @@ func TestSplitterConfig_Build(t *testing.T) { EncodingConfig: tt.fields.EncodingConfig, Multiline: tt.fields.Multiline, Flusher: tt.fields.Flusher, - Customization: tt.fields.Customization, + customization: tt.fields.Customization, } got, err := c.Build(tt.args.flushAtEOF, tt.args.maxLogSize) if tt.wantErr { From d292de8d94e62d36fa8ce3645a1fa22fd501945e Mon Sep 17 00:00:00 2001 From: lookchen Date: Sat, 1 Oct 2022 22:57:38 +0800 Subject: [PATCH 06/13] remove all usage of Multiline from the fileconsumer package --- pkg/stanza/fileconsumer/config.go | 41 ++-- pkg/stanza/fileconsumer/file.go | 12 ++ pkg/stanza/fileconsumer/reader_factory.go | 8 +- pkg/stanza/fileconsumer/splitter_factory.go | 30 
+++ pkg/stanza/operator/helper/flusher.go | 111 ++++++++++ pkg/stanza/operator/helper/multiline.go | 190 ------------------ pkg/stanza/operator/helper/splitter.go | 44 ++++ pkg/stanza/operator/input/file/config.go | 37 +++- pkg/stanza/operator/input/file/config_test.go | 79 ++++---- pkg/stanza/operator/input/file/file_test.go | 10 +- 10 files changed, 299 insertions(+), 263 deletions(-) create mode 100644 pkg/stanza/fileconsumer/splitter_factory.go create mode 100644 pkg/stanza/operator/helper/flusher.go create mode 100644 pkg/stanza/operator/helper/splitter.go diff --git a/pkg/stanza/fileconsumer/config.go b/pkg/stanza/fileconsumer/config.go index f2aefbadab57..1c081afb21e8 100644 --- a/pkg/stanza/fileconsumer/config.go +++ b/pkg/stanza/fileconsumer/config.go @@ -15,7 +15,6 @@ package fileconsumer // import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer" import ( - "bufio" "fmt" "time" @@ -30,30 +29,20 @@ const ( defaultMaxConcurrentFiles = 1024 ) -type Option func(cfg *Config) - -func WithCustomizedSplitter(splitter bufio.SplitFunc) Option { - return func(cfg *Config) { - cfg.Splitter.SetCustomizedSplitter(splitter) - } -} - // NewConfig creates a new input config with default values -func NewConfig(opts ...Option) *Config { +func NewConfig() *Config { cfg := &Config{ IncludeFileName: true, IncludeFilePath: false, IncludeFileNameResolved: false, IncludeFilePathResolved: false, PollInterval: 200 * time.Millisecond, - Splitter: helper.NewSplitterConfig(), StartAt: "end", FingerprintSize: DefaultFingerprintSize, MaxLogSize: defaultMaxLogSize, MaxConcurrentFiles: defaultMaxConcurrentFiles, - } - for _, op := range opts { - op(cfg) + EncodingConfig: helper.NewEncodingConfig(), + Flusher: helper.NewFlusherConfig(), } return cfg } @@ -70,11 +59,12 @@ type Config struct { FingerprintSize helper.ByteSize `mapstructure:"fingerprint_size,omitempty"` MaxLogSize helper.ByteSize `mapstructure:"max_log_size,omitempty"` MaxConcurrentFiles int `mapstructure:"max_concurrent_files,omitempty"` - Splitter helper.SplitterConfig `mapstructure:",squash,omitempty"` + EncodingConfig helper.EncodingConfig `mapstructure:",squash,omitempty"` + Flusher helper.FlusherConfig `mapstructure:",squash,omitempty"` } // Build will build a file input operator from the supplied configuration -func (c Config) Build(logger *zap.SugaredLogger, emit EmitFunc) (*Manager, error) { +func (c Config) Build(logger *zap.SugaredLogger, emit EmitFunc, opts ...Option) (*Manager, error) { if emit == nil { return nil, fmt.Errorf("must provide emit function") } @@ -113,8 +103,7 @@ func (c Config) Build(logger *zap.SugaredLogger, emit EmitFunc) (*Manager, error return nil, fmt.Errorf("`fingerprint_size` must be at least %d bytes", MinFingerprintSize) } - // Ensure that splitter is buildable - _, err := c.Splitter.Build(false, int(c.MaxLogSize)) + _, err := c.EncodingConfig.Build() if err != nil { return nil, err } @@ -129,7 +118,7 @@ func (c Config) Build(logger *zap.SugaredLogger, emit EmitFunc) (*Manager, error return nil, fmt.Errorf("invalid start_at location '%s'", c.StartAt) } - return &Manager{ + m := &Manager{ SugaredLogger: logger.With("component", "fileconsumer"), cancel: func() {}, readerFactory: readerFactory{ @@ -139,8 +128,12 @@ func (c Config) Build(logger *zap.SugaredLogger, emit EmitFunc) (*Manager, error maxLogSize: int(c.MaxLogSize), emit: emit, }, - fromBeginning: startAtBeginning, - splitterConfig: c.Splitter, + fromBeginning: startAtBeginning, + splitterFactory: splitterFactory{ + 
EncodingConfig: c.EncodingConfig, + Flusher: c.Flusher, + SplitFunc: helper.SplitNone(int(c.MaxLogSize)), + }, }, finder: c.Finder, roller: newRoller(), @@ -148,5 +141,9 @@ func (c Config) Build(logger *zap.SugaredLogger, emit EmitFunc) (*Manager, error maxBatchFiles: c.MaxConcurrentFiles / 2, knownFiles: make([]*Reader, 0, 10), seenPaths: make(map[string]struct{}, 100), - }, nil + } + for _, op := range opts { + op(m) + } + return m, nil } diff --git a/pkg/stanza/fileconsumer/file.go b/pkg/stanza/fileconsumer/file.go index f77f85c8ec4f..749dbb06bba5 100644 --- a/pkg/stanza/fileconsumer/file.go +++ b/pkg/stanza/fileconsumer/file.go @@ -15,6 +15,7 @@ package fileconsumer // import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer" import ( + "bufio" "bytes" "context" "encoding/json" @@ -30,6 +31,17 @@ import ( type EmitFunc func(ctx context.Context, attrs *FileAttributes, token []byte) +type Option func(m *Manager) + +func WithCustomizedSplitter(splitter bufio.SplitFunc) Option { + return func(m *Manager) { + if splitter == nil { + return + } + m.readerFactory.splitterFactory.SplitFunc = splitter + } +} + type Manager struct { *zap.SugaredLogger wg sync.WaitGroup diff --git a/pkg/stanza/fileconsumer/reader_factory.go b/pkg/stanza/fileconsumer/reader_factory.go index 7d48248df9a1..21af609e9ed4 100644 --- a/pkg/stanza/fileconsumer/reader_factory.go +++ b/pkg/stanza/fileconsumer/reader_factory.go @@ -24,9 +24,9 @@ import ( type readerFactory struct { *zap.SugaredLogger - readerConfig *readerConfig - fromBeginning bool - splitterConfig helper.SplitterConfig + readerConfig *readerConfig + fromBeginning bool + splitterFactory splitterFactory } func (f *readerFactory) newReader(file *os.File, fp *Fingerprint) (*Reader, error) { @@ -95,7 +95,7 @@ func (b *readerBuilder) build() (r *Reader, err error) { if b.splitter != nil { r.splitter = b.splitter } else { - r.splitter, err = b.splitterConfig.Build(false, b.readerConfig.maxLogSize) + r.splitter, err = b.splitterFactory.Build() if err != nil { return } diff --git a/pkg/stanza/fileconsumer/splitter_factory.go b/pkg/stanza/fileconsumer/splitter_factory.go new file mode 100644 index 000000000000..4c943e0e1dec --- /dev/null +++ b/pkg/stanza/fileconsumer/splitter_factory.go @@ -0,0 +1,30 @@ +package fileconsumer + +import ( + "bufio" + + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/helper" +) + +type splitterFactory struct { + EncodingConfig helper.EncodingConfig + Flusher helper.FlusherConfig + SplitFunc bufio.SplitFunc +} + +func (factory *splitterFactory) Build() (*helper.Splitter, error) { + enc, err := factory.EncodingConfig.Build() + if err != nil { + return nil, err + } + flusher := factory.Flusher.Build() + splitFunc := factory.SplitFunc + if flusher != nil { + splitFunc = flusher.SplitFunc(splitFunc) + } + return &helper.Splitter{ + Encoding: enc, + Flusher: flusher, + SplitFunc: splitFunc, + }, nil +} diff --git a/pkg/stanza/operator/helper/flusher.go b/pkg/stanza/operator/helper/flusher.go new file mode 100644 index 000000000000..cdcf4d3a2e76 --- /dev/null +++ b/pkg/stanza/operator/helper/flusher.go @@ -0,0 +1,111 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package helper // import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/helper" + +import ( + "bufio" + "time" +) + +// FlusherConfig is a configuration of Flusher helper +type FlusherConfig struct { + Period time.Duration `mapstructure:"force_flush_period"` +} + +// NewFlusherConfig creates a default Flusher config +func NewFlusherConfig() FlusherConfig { + return FlusherConfig{ + // Empty or `0s` means that we will never force flush + Period: time.Millisecond * 500, + } +} + +// Build creates Flusher from configuration +func (c *FlusherConfig) Build() *Flusher { + return &Flusher{ + lastDataChange: time.Now(), + forcePeriod: c.Period, + previousDataLength: 0, + } +} + +// Flusher keeps information about flush state +type Flusher struct { + // forcePeriod defines time from last flush which should pass before setting force to true. + // Never forces if forcePeriod is set to 0 + forcePeriod time.Duration + + // lastDataChange tracks date of last data change (including new data and flushes) + lastDataChange time.Time + + // previousDataLength: + // if previousDataLength = 0 - no new data have been received after flush + // if previousDataLength > 0 - there is data which has not been flushed yet and it doesn't changed since lastDataChange + previousDataLength int +} + +func (f *Flusher) UpdateDataChangeTime(length int) { + // Skip if length is greater than 0 and didn't changed + if length > 0 && length == f.previousDataLength { + return + } + + // update internal properties with new values if data length changed + // because it means that data is flowing and being processed + f.previousDataLength = length + f.lastDataChange = time.Now() +} + +// Flushed reset data length +func (f *Flusher) Flushed() { + f.UpdateDataChangeTime(0) +} + +// ShouldFlush returns true if data should be forcefully flushed +func (f *Flusher) ShouldFlush() bool { + // Returns true if there is f.forcePeriod after f.lastDataChange and data length is greater than 0 + return f.forcePeriod > 0 && time.Since(f.lastDataChange) > f.forcePeriod && f.previousDataLength > 0 +} + +func (f *Flusher) SplitFunc(splitFunc bufio.SplitFunc) bufio.SplitFunc { + return func(data []byte, atEOF bool) (advance int, token []byte, err error) { + advance, token, err = splitFunc(data, atEOF) + + // Return as it is in case of error + if err != nil { + return + } + + // Return token + if token != nil { + // Inform flusher that we just flushed + f.Flushed() + return + } + + // If there is no token, force flush eventually + if f.ShouldFlush() { + // Inform flusher that we just flushed + f.Flushed() + token = trimWhitespaces(data) + advance = len(data) + return + } + + // Inform flusher that we didn't flushed + f.UpdateDataChangeTime(len(data)) + return + } +} diff --git a/pkg/stanza/operator/helper/multiline.go b/pkg/stanza/operator/helper/multiline.go index e068574ed93e..4da253708a71 100644 --- a/pkg/stanza/operator/helper/multiline.go +++ b/pkg/stanza/operator/helper/multiline.go @@ -19,102 +19,10 @@ import ( "bytes" "fmt" "regexp" - "time" "golang.org/x/text/encoding" ) -// FlusherConfig is a 
configuration of Flusher helper -type FlusherConfig struct { - Period time.Duration `mapstructure:"force_flush_period"` -} - -// NewFlusherConfig creates a default Flusher config -func NewFlusherConfig() FlusherConfig { - return FlusherConfig{ - // Empty or `0s` means that we will never force flush - Period: time.Millisecond * 500, - } -} - -// Build creates Flusher from configuration -func (c *FlusherConfig) Build() *Flusher { - return &Flusher{ - lastDataChange: time.Now(), - forcePeriod: c.Period, - previousDataLength: 0, - } -} - -// Flusher keeps information about flush state -type Flusher struct { - // forcePeriod defines time from last flush which should pass before setting force to true. - // Never forces if forcePeriod is set to 0 - forcePeriod time.Duration - - // lastDataChange tracks date of last data change (including new data and flushes) - lastDataChange time.Time - - // previousDataLength: - // if previousDataLength = 0 - no new data have been received after flush - // if previousDataLength > 0 - there is data which has not been flushed yet and it doesn't changed since lastDataChange - previousDataLength int -} - -func (f *Flusher) UpdateDataChangeTime(length int) { - // Skip if length is greater than 0 and didn't changed - if length > 0 && length == f.previousDataLength { - return - } - - // update internal properties with new values if data length changed - // because it means that data is flowing and being processed - f.previousDataLength = length - f.lastDataChange = time.Now() -} - -// Flushed reset data length -func (f *Flusher) Flushed() { - f.UpdateDataChangeTime(0) -} - -// ShouldFlush returns true if data should be forcefully flushed -func (f *Flusher) ShouldFlush() bool { - // Returns true if there is f.forcePeriod after f.lastDataChange and data length is greater than 0 - return f.forcePeriod > 0 && time.Since(f.lastDataChange) > f.forcePeriod && f.previousDataLength > 0 -} - -func (f *Flusher) SplitFunc(splitFunc bufio.SplitFunc) bufio.SplitFunc { - return func(data []byte, atEOF bool) (advance int, token []byte, err error) { - advance, token, err = splitFunc(data, atEOF) - - // Return as it is in case of error - if err != nil { - return - } - - // Return token - if token != nil { - // Inform flusher that we just flushed - f.Flushed() - return - } - - // If there is no token, force flush eventually - if f.ShouldFlush() { - // Inform flusher that we just flushed - f.Flushed() - token = trimWhitespaces(data) - advance = len(data) - return - } - - // Inform flusher that we didn't flushed - f.UpdateDataChangeTime(len(data)) - return - } -} - // Multiline consists of splitFunc and variables needed to perform force flush type Multiline struct { SplitFunc bufio.SplitFunc @@ -240,24 +148,6 @@ func NewLineStartSplitFunc(re *regexp.Regexp, flushAtEOF bool) bufio.SplitFunc { } } -// SplitNone doesn't split any of the bytes, it reads in all of the bytes and returns it all at once. 
This is for when the encoding is nop -func SplitNone(maxLogSize int) bufio.SplitFunc { - return func(data []byte, atEOF bool) (advance int, token []byte, err error) { - if len(data) >= maxLogSize { - return maxLogSize, data[:maxLogSize], nil - } - - if !atEOF { - return 0, nil, nil - } - - if len(data) == 0 { - return 0, nil, nil - } - return len(data), data, nil - } -} - // NewLineEndSplitFunc creates a bufio.SplitFunc that splits an incoming stream into // tokens that end with a match to the regex pattern provided func NewLineEndSplitFunc(re *regexp.Regexp, flushAtEOF bool) bufio.SplitFunc { @@ -345,83 +235,3 @@ func trimWhitespaces(data []byte) []byte { } return token } - -// CustomizedConfig is a configuration of Customized Splitter helper -type CustomizedConfig struct { - Enabled bool - Splitter bufio.SplitFunc -} - -// NewCustomizedConfig creates a new CustomizedConfig -func NewCustomizedConfig() CustomizedConfig { - return CustomizedConfig{Enabled: false} -} - -// Build creates Customized SplitFunc -func (c CustomizedConfig) Build(flusher *Flusher) (bufio.SplitFunc, error) { - if c.Splitter == nil { - return nil, fmt.Errorf("customized splitter is nil") - } - var splitFunc bufio.SplitFunc - if flusher != nil { - splitFunc = flusher.SplitFunc(c.Splitter) - } - return splitFunc, nil -} - -// SplitterConfig consolidates MultilineConfig and FlusherConfig -type SplitterConfig struct { - EncodingConfig EncodingConfig `mapstructure:",squash,omitempty"` - Multiline MultilineConfig `mapstructure:"multiline,omitempty"` - Flusher FlusherConfig `mapstructure:",squash,omitempty"` - customization CustomizedConfig -} - -// NewSplitterConfig returns default SplitterConfig -func NewSplitterConfig() SplitterConfig { - return SplitterConfig{ - EncodingConfig: NewEncodingConfig(), - Multiline: NewMultilineConfig(), - Flusher: NewFlusherConfig(), - customization: NewCustomizedConfig(), - } -} - -// Build builds Splitter struct -func (c *SplitterConfig) Build(flushAtEOF bool, maxLogSize int) (*Splitter, error) { - enc, err := c.EncodingConfig.Build() - if err != nil { - return nil, err - } - if c.customization.Enabled && - (c.Multiline.LineStartPattern != "" || c.Multiline.LineEndPattern != "") { - return nil, fmt.Errorf("multiline should not be set when using customized splitter") - } - flusher := c.Flusher.Build() - var splitFunc bufio.SplitFunc - if c.customization.Enabled { - splitFunc, err = c.customization.Build(flusher) - } else { - splitFunc, err = c.Multiline.Build(enc.Encoding, flushAtEOF, flusher, maxLogSize) - } - if err != nil { - return nil, err - } - return &Splitter{ - Encoding: enc, - Flusher: flusher, - SplitFunc: splitFunc, - }, nil -} - -func (c *SplitterConfig) SetCustomizedSplitter(splitter bufio.SplitFunc) { - c.customization.Splitter = splitter - c.customization.Enabled = true -} - -// Splitter consolidates Flusher and dependent splitFunc -type Splitter struct { - Encoding Encoding - SplitFunc bufio.SplitFunc - Flusher *Flusher -} diff --git a/pkg/stanza/operator/helper/splitter.go b/pkg/stanza/operator/helper/splitter.go new file mode 100644 index 000000000000..d854ac4e9dd6 --- /dev/null +++ b/pkg/stanza/operator/helper/splitter.go @@ -0,0 +1,44 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package helper // import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/helper" + +import ( + "bufio" +) + +// Splitter consolidates Flusher and dependent splitFunc +type Splitter struct { + Encoding Encoding + SplitFunc bufio.SplitFunc + Flusher *Flusher +} + +// SplitNone doesn't split any of the bytes, it reads in all of the bytes and returns it all at once. This is for when the encoding is nop +func SplitNone(maxLogSize int) bufio.SplitFunc { + return func(data []byte, atEOF bool) (advance int, token []byte, err error) { + if len(data) >= maxLogSize { + return maxLogSize, data[:maxLogSize], nil + } + + if !atEOF { + return 0, nil, nil + } + + if len(data) == 0 { + return 0, nil, nil + } + return len(data), data, nil + } +} diff --git a/pkg/stanza/operator/input/file/config.go b/pkg/stanza/operator/input/file/config.go index 3cbba06f7b5b..75db9f8742d6 100644 --- a/pkg/stanza/operator/input/file/config.go +++ b/pkg/stanza/operator/input/file/config.go @@ -15,6 +15,8 @@ package file // import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/input/file" import ( + "bufio" + "go.uber.org/zap" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer" @@ -36,15 +38,17 @@ func NewConfig() *Config { // NewConfigWithID creates a new input config with default values func NewConfigWithID(operatorID string) *Config { return &Config{ - InputConfig: helper.NewInputConfig(operatorID, operatorType), - Config: *fileconsumer.NewConfig(), + InputConfig: helper.NewInputConfig(operatorID, operatorType), + Config: *fileconsumer.NewConfig(), + MultilineConfig: helper.NewMultilineConfig(), } } // Config is the configuration of a file input operator type Config struct { - helper.InputConfig `mapstructure:",squash"` - fileconsumer.Config `mapstructure:",squash"` + helper.InputConfig `mapstructure:",squash"` + fileconsumer.Config `mapstructure:",squash"` + helper.MultilineConfig `mapstructure:"multiline,omitempty"` } // Build will build a file input operator from the supplied configuration @@ -71,7 +75,7 @@ func (c Config) Build(logger *zap.SugaredLogger) (operator.Operator, error) { var toBody toBodyFunc = func(token []byte) interface{} { return string(token) } - if helper.IsNop(c.Config.Splitter.EncodingConfig.Encoding) { + if helper.IsNop(c.Config.EncodingConfig.Encoding) { toBody = func(token []byte) interface{} { return token } @@ -82,11 +86,30 @@ func (c Config) Build(logger *zap.SugaredLogger) (operator.Operator, error) { toBody: toBody, preEmitOptions: preEmitOptions, } - - input.fileConsumer, err = c.Config.Build(logger, input.emit) + var splitter bufio.SplitFunc + if c.LineStartPattern != "" || c.LineEndPattern != "" { + splitter, err = c.buildMultilineSplitter() + if err != nil { + return nil, err + } + } + input.fileConsumer, err = c.Config.Build(logger, input.emit, fileconsumer.WithCustomizedSplitter(splitter)) if err != nil { return nil, err } return input, nil } + +func (c Config) buildMultilineSplitter() (bufio.SplitFunc, error) { + enc, err := c.EncodingConfig.Build() + if err != nil { + 
return nil, err + } + flusher := c.Flusher.Build() + splitter, err := c.MultilineConfig.Build(enc.Encoding, false, flusher, int(c.MaxLogSize)) + if err != nil { + return nil, err + } + return splitter, nil +} diff --git a/pkg/stanza/operator/input/file/config_test.go b/pkg/stanza/operator/input/file/config_test.go index 5976bcd27efd..b94713f26c22 100644 --- a/pkg/stanza/operator/input/file/config_test.go +++ b/pkg/stanza/operator/input/file/config_test.go @@ -327,9 +327,9 @@ func TestUnmarshal(t *testing.T) { ExpectErr: false, Expect: func() *Config { cfg := NewConfig() - newSplit := helper.NewSplitterConfig() - newSplit.Multiline.LineStartPattern = "Start" - cfg.Splitter = newSplit + newMultiline := helper.NewMultilineConfig() + newMultiline.LineStartPattern = "Start" + cfg.MultilineConfig = newMultiline return cfg }(), }, @@ -338,9 +338,9 @@ func TestUnmarshal(t *testing.T) { ExpectErr: false, Expect: func() *Config { cfg := NewConfig() - newSplit := helper.NewSplitterConfig() - newSplit.Multiline.LineStartPattern = "%" - cfg.Splitter = newSplit + newMultiline := helper.NewMultilineConfig() + newMultiline.LineStartPattern = "%" + cfg.MultilineConfig = newMultiline return cfg }(), }, @@ -349,9 +349,9 @@ func TestUnmarshal(t *testing.T) { ExpectErr: false, Expect: func() *Config { cfg := NewConfig() - newSplit := helper.NewSplitterConfig() - newSplit.Multiline.LineEndPattern = "Start" - cfg.Splitter = newSplit + newMultiline := helper.NewMultilineConfig() + newMultiline.LineEndPattern = "Start" + cfg.MultilineConfig = newMultiline return cfg }(), }, @@ -360,9 +360,9 @@ func TestUnmarshal(t *testing.T) { ExpectErr: false, Expect: func() *Config { cfg := NewConfig() - newSplit := helper.NewSplitterConfig() - newSplit.Multiline.LineEndPattern = "%" - cfg.Splitter = newSplit + newMultiline := helper.NewMultilineConfig() + newMultiline.LineEndPattern = "%" + cfg.MultilineConfig = newMultiline return cfg }(), }, @@ -425,7 +425,7 @@ func TestUnmarshal(t *testing.T) { ExpectErr: false, Expect: func() *Config { cfg := NewConfig() - cfg.Splitter.EncodingConfig = helper.EncodingConfig{Encoding: "utf-16le"} + cfg.EncodingConfig = helper.EncodingConfig{Encoding: "utf-16le"} return cfg }(), }, @@ -434,7 +434,7 @@ func TestUnmarshal(t *testing.T) { ExpectErr: false, Expect: func() *Config { cfg := NewConfig() - cfg.Splitter.EncodingConfig = helper.EncodingConfig{Encoding: "UTF-16lE"} + cfg.EncodingConfig = helper.EncodingConfig{Encoding: "UTF-16lE"} return cfg }(), }, @@ -555,9 +555,10 @@ func TestBuild(t *testing.T) { }, { "MultilineConfiguredStartAndEndPatterns", - func(f *Config) { - f.Splitter = helper.NewSplitterConfig() - f.Splitter.Multiline = helper.MultilineConfig{ + func(cfg *Config) { + cfg.EncodingConfig = helper.NewEncodingConfig() + cfg.Flusher = helper.NewFlusherConfig() + cfg.MultilineConfig = helper.MultilineConfig{ LineEndPattern: "Exists", LineStartPattern: "Exists", } @@ -567,9 +568,10 @@ func TestBuild(t *testing.T) { }, { "MultilineConfiguredStartPattern", - func(f *Config) { - f.Splitter = helper.NewSplitterConfig() - f.Splitter.Multiline = helper.MultilineConfig{ + func(cfg *Config) { + cfg.EncodingConfig = helper.NewEncodingConfig() + cfg.Flusher = helper.NewFlusherConfig() + cfg.MultilineConfig = helper.MultilineConfig{ LineStartPattern: "START.*", } }, @@ -578,9 +580,10 @@ func TestBuild(t *testing.T) { }, { "MultilineConfiguredEndPattern", - func(f *Config) { - f.Splitter = helper.NewSplitterConfig() - f.Splitter.Multiline = helper.MultilineConfig{ + func(cfg *Config) { + 
cfg.EncodingConfig = helper.NewEncodingConfig() + cfg.Flusher = helper.NewFlusherConfig() + cfg.MultilineConfig = helper.MultilineConfig{ LineEndPattern: "END.*", } }, @@ -590,16 +593,17 @@ func TestBuild(t *testing.T) { { "InvalidEncoding", func(f *Config) { - f.Splitter.EncodingConfig = helper.EncodingConfig{Encoding: "UTF-3233"} + f.EncodingConfig = helper.EncodingConfig{Encoding: "UTF-3233"} }, require.Error, nil, }, { "LineStartAndEnd", - func(f *Config) { - f.Splitter = helper.NewSplitterConfig() - f.Splitter.Multiline = helper.MultilineConfig{ + func(cfg *Config) { + cfg.EncodingConfig = helper.NewEncodingConfig() + cfg.Flusher = helper.NewFlusherConfig() + cfg.MultilineConfig = helper.MultilineConfig{ LineStartPattern: ".*", LineEndPattern: ".*", } @@ -609,18 +613,20 @@ func TestBuild(t *testing.T) { }, { "NoLineStartOrEnd", - func(f *Config) { - f.Splitter = helper.NewSplitterConfig() - f.Splitter.Multiline = helper.MultilineConfig{} + func(cfg *Config) { + cfg.EncodingConfig = helper.NewEncodingConfig() + cfg.Flusher = helper.NewFlusherConfig() + cfg.MultilineConfig = helper.MultilineConfig{} }, require.NoError, func(t *testing.T, f *Input) {}, }, { "InvalidLineStartRegex", - func(f *Config) { - f.Splitter = helper.NewSplitterConfig() - f.Splitter.Multiline = helper.MultilineConfig{ + func(cfg *Config) { + cfg.EncodingConfig = helper.NewEncodingConfig() + cfg.Flusher = helper.NewFlusherConfig() + cfg.MultilineConfig = helper.MultilineConfig{ LineStartPattern: "(", } }, @@ -629,9 +635,10 @@ func TestBuild(t *testing.T) { }, { "InvalidLineEndRegex", - func(f *Config) { - f.Splitter = helper.NewSplitterConfig() - f.Splitter.Multiline = helper.MultilineConfig{ + func(cfg *Config) { + cfg.EncodingConfig = helper.NewEncodingConfig() + cfg.Flusher = helper.NewFlusherConfig() + cfg.MultilineConfig = helper.MultilineConfig{ LineEndPattern: "(", } }, diff --git a/pkg/stanza/operator/input/file/file_test.go b/pkg/stanza/operator/input/file/file_test.go index 53a8cf356138..88d8eab34aec 100644 --- a/pkg/stanza/operator/input/file/file_test.go +++ b/pkg/stanza/operator/input/file/file_test.go @@ -261,7 +261,7 @@ func TestReadUsingNopEncoding(t *testing.T) { t.Run(tc.testName, func(t *testing.T) { operator, logReceived, tempDir := newTestFileOperator(t, func(cfg *Config) { cfg.MaxLogSize = 8 - cfg.Splitter.EncodingConfig.Encoding = "nop" + cfg.EncodingConfig.Encoding = "nop" }, nil) // Create a file, then start temp := openTemp(t, tempDir) @@ -341,7 +341,7 @@ func TestNopEncodingDifferentLogSizes(t *testing.T) { t.Run(tc.testName, func(t *testing.T) { operator, logReceived, tempDir := newTestFileOperator(t, func(cfg *Config) { cfg.MaxLogSize = tc.maxLogSize - cfg.Splitter.EncodingConfig.Encoding = "nop" + cfg.EncodingConfig.Encoding = "nop" }, nil) // Create a file, then start temp := openTemp(t, tempDir) @@ -461,8 +461,10 @@ func TestStartAtEndNewFile(t *testing.T) { func TestNoNewline(t *testing.T) { t.Parallel() operator, logReceived, tempDir := newTestFileOperator(t, func(cfg *Config) { - cfg.Splitter = helper.NewSplitterConfig() - cfg.Splitter.Flusher.Period = time.Nanosecond + cfg.MultilineConfig = helper.NewMultilineConfig() + cfg.EncodingConfig = helper.NewEncodingConfig() + cfg.Flusher = helper.NewFlusherConfig() + cfg.Flusher.Period = time.Nanosecond }, nil) temp := openTemp(t, tempDir) From ebebecc498b9a2f5ffa92bf6a5280d9cc98021fb Mon Sep 17 00:00:00 2001 From: lookchen Date: Sun, 2 Oct 2022 02:03:59 +0800 Subject: [PATCH 07/13] remove all usage of Multiline from the fileconsumer 
package --- pkg/stanza/fileconsumer/config_test.go | 147 +++++++++----------- pkg/stanza/fileconsumer/file_test.go | 69 ++++----- pkg/stanza/fileconsumer/fingerprint_test.go | 7 +- pkg/stanza/fileconsumer/reader_test.go | 9 ++ pkg/stanza/fileconsumer/rotation_test.go | 19 +-- pkg/stanza/fileconsumer/splitter_factory.go | 16 ++- pkg/stanza/fileconsumer/util_test.go | 24 +++- pkg/stanza/operator/input/file/config.go | 9 +- pkg/stanza/operator/input/file/util_test.go | 2 + 9 files changed, 164 insertions(+), 138 deletions(-) diff --git a/pkg/stanza/fileconsumer/config_test.go b/pkg/stanza/fileconsumer/config_test.go index 0a8ccbd76e3b..0bec9412dda6 100644 --- a/pkg/stanza/fileconsumer/config_test.go +++ b/pkg/stanza/fileconsumer/config_test.go @@ -15,13 +15,11 @@ package fileconsumer import ( - "bufio" "context" "path/filepath" "testing" "time" - "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/helper" @@ -254,40 +252,36 @@ func TestUnmarshal(t *testing.T) { Name: "multiline_line_start_string", Expect: func() *mockOperatorConfig { cfg := NewConfig() - newSplit := helper.NewSplitterConfig() - newSplit.Multiline.LineStartPattern = "Start" - cfg.Splitter = newSplit - return newMockOperatorConfig(cfg) + newMultiline := helper.NewMultilineConfig() + newMultiline.LineStartPattern = "Start" + return newMockOperatorConfigWithMultiline(cfg, newMultiline) }(), }, { Name: "multiline_line_start_special", Expect: func() *mockOperatorConfig { cfg := NewConfig() - newSplit := helper.NewSplitterConfig() - newSplit.Multiline.LineStartPattern = "%" - cfg.Splitter = newSplit - return newMockOperatorConfig(cfg) + newMultiline := helper.NewMultilineConfig() + newMultiline.LineStartPattern = "%" + return newMockOperatorConfigWithMultiline(cfg, newMultiline) }(), }, { Name: "multiline_line_end_string", Expect: func() *mockOperatorConfig { cfg := NewConfig() - newSplit := helper.NewSplitterConfig() - newSplit.Multiline.LineEndPattern = "Start" - cfg.Splitter = newSplit - return newMockOperatorConfig(cfg) + newMultiline := helper.NewMultilineConfig() + newMultiline.LineEndPattern = "Start" + return newMockOperatorConfigWithMultiline(cfg, newMultiline) }(), }, { Name: "multiline_line_end_special", Expect: func() *mockOperatorConfig { cfg := NewConfig() - newSplit := helper.NewSplitterConfig() - newSplit.Multiline.LineEndPattern = "%" - cfg.Splitter = newSplit - return newMockOperatorConfig(cfg) + newMultiline := helper.NewMultilineConfig() + newMultiline.LineEndPattern = "%" + return newMockOperatorConfigWithMultiline(cfg, newMultiline) }(), }, { @@ -342,7 +336,7 @@ func TestUnmarshal(t *testing.T) { Name: "encoding_lower", Expect: func() *mockOperatorConfig { cfg := NewConfig() - cfg.Splitter.EncodingConfig = helper.EncodingConfig{Encoding: "utf-16le"} + cfg.EncodingConfig = helper.EncodingConfig{Encoding: "utf-16le"} return newMockOperatorConfig(cfg) }(), }, @@ -350,7 +344,7 @@ func TestUnmarshal(t *testing.T) { Name: "encoding_upper", Expect: func() *mockOperatorConfig { cfg := NewConfig() - cfg.Splitter.EncodingConfig = helper.EncodingConfig{Encoding: "UTF-16lE"} + cfg.EncodingConfig = helper.EncodingConfig{Encoding: "UTF-16lE"} return newMockOperatorConfig(cfg) }(), }, @@ -372,12 +366,14 @@ func TestBuild(t *testing.T) { cases := []struct { name string modifyBaseConfig func(*Config) + MultilineConfig helper.MultilineConfig errorRequirement require.ErrorAssertionFunc validate func(*testing.T, *Manager) }{ { 
"Basic", func(f *Config) {}, + helper.NewMultilineConfig(), require.NoError, func(t *testing.T, f *Manager) { require.Equal(t, f.finder.Include, []string{"/var/log/testpath.*"}) @@ -389,6 +385,7 @@ func TestBuild(t *testing.T) { func(f *Config) { f.Include = []string{"["} }, + helper.NewMultilineConfig(), require.Error, nil, }, @@ -397,17 +394,19 @@ func TestBuild(t *testing.T) { func(f *Config) { f.Include = []string{"["} }, + helper.NewMultilineConfig(), require.Error, nil, }, { "MultilineConfiguredStartAndEndPatterns", - func(f *Config) { - f.Splitter = helper.NewSplitterConfig() - f.Splitter.Multiline = helper.MultilineConfig{ - LineEndPattern: "Exists", - LineStartPattern: "Exists", - } + func(cfg *Config) { + cfg.EncodingConfig = helper.NewEncodingConfig() + cfg.Flusher = helper.NewFlusherConfig() + }, + helper.MultilineConfig{ + LineEndPattern: "Exists", + LineStartPattern: "Exists", }, require.Error, nil, @@ -415,10 +414,11 @@ func TestBuild(t *testing.T) { { "MultilineConfiguredStartPattern", func(f *Config) { - f.Splitter = helper.NewSplitterConfig() - f.Splitter.Multiline = helper.MultilineConfig{ - LineStartPattern: "START.*", - } + f.EncodingConfig = helper.NewEncodingConfig() + f.Flusher = helper.NewFlusherConfig() + }, + helper.MultilineConfig{ + LineStartPattern: "START.*", }, require.NoError, func(t *testing.T, f *Manager) {}, @@ -426,10 +426,11 @@ func TestBuild(t *testing.T) { { "MultilineConfiguredEndPattern", func(f *Config) { - f.Splitter = helper.NewSplitterConfig() - f.Splitter.Multiline = helper.MultilineConfig{ - LineEndPattern: "END.*", - } + f.EncodingConfig = helper.NewEncodingConfig() + f.Flusher = helper.NewFlusherConfig() + }, + helper.MultilineConfig{ + LineEndPattern: "END.*", }, require.NoError, func(t *testing.T, f *Manager) {}, @@ -437,19 +438,21 @@ func TestBuild(t *testing.T) { { "InvalidEncoding", func(f *Config) { - f.Splitter.EncodingConfig = helper.EncodingConfig{Encoding: "UTF-3233"} + f.EncodingConfig = helper.EncodingConfig{Encoding: "UTF-3233"} }, + helper.NewMultilineConfig(), require.Error, nil, }, { "LineStartAndEnd", func(f *Config) { - f.Splitter = helper.NewSplitterConfig() - f.Splitter.Multiline = helper.MultilineConfig{ - LineStartPattern: ".*", - LineEndPattern: ".*", - } + f.EncodingConfig = helper.NewEncodingConfig() + f.Flusher = helper.NewFlusherConfig() + }, + helper.MultilineConfig{ + LineStartPattern: ".*", + LineEndPattern: ".*", }, require.Error, nil, @@ -457,19 +460,21 @@ func TestBuild(t *testing.T) { { "NoLineStartOrEnd", func(f *Config) { - f.Splitter = helper.NewSplitterConfig() - f.Splitter.Multiline = helper.MultilineConfig{} + f.EncodingConfig = helper.NewEncodingConfig() + f.Flusher = helper.NewFlusherConfig() }, + helper.NewMultilineConfig(), require.NoError, func(t *testing.T, f *Manager) {}, }, { "InvalidLineStartRegex", func(f *Config) { - f.Splitter = helper.NewSplitterConfig() - f.Splitter.Multiline = helper.MultilineConfig{ - LineStartPattern: "(", - } + f.EncodingConfig = helper.NewEncodingConfig() + f.Flusher = helper.NewFlusherConfig() + }, + helper.MultilineConfig{ + LineStartPattern: "(", }, require.Error, nil, @@ -477,10 +482,11 @@ func TestBuild(t *testing.T) { { "InvalidLineEndRegex", func(f *Config) { - f.Splitter = helper.NewSplitterConfig() - f.Splitter.Multiline = helper.MultilineConfig{ - LineEndPattern: "(", - } + f.EncodingConfig = helper.NewEncodingConfig() + f.Flusher = helper.NewFlusherConfig() + }, + helper.MultilineConfig{ + LineEndPattern: "(", }, require.Error, nil, @@ -496,7 +502,18 @@ 
func TestBuild(t *testing.T) { nopEmit := func(_ context.Context, _ *FileAttributes, _ []byte) {} - input, err := cfg.Build(testutil.Logger(t), nopEmit) + enc, err := cfg.EncodingConfig.Build() + if err != nil { + tc.errorRequirement(t, err) + return + } + flusher := cfg.Flusher.Build() + splitter, err := tc.MultilineConfig.Build(enc.Encoding, false, flusher, int(cfg.MaxLogSize)) + if err != nil { + tc.errorRequirement(t, err) + return + } + input, err := cfg.Build(testutil.Logger(t), nopEmit, WithCustomizedSplitter(splitter)) tc.errorRequirement(t, err) if err != nil { return @@ -506,31 +523,3 @@ func TestBuild(t *testing.T) { }) } } - -func TestNewConfigWithOptions(t *testing.T) { - type args struct { - opts []Option - } - tests := []struct { - name string - args args - splitter bufio.SplitFunc - }{ - { - name: "customized splitter", - args: args{ - opts: []Option{ - WithCustomizedSplitter(func(data []byte, atEOF bool) (advance int, token []byte, err error) { - return 0, nil, err - }), - }, - }, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - got := NewConfig(tt.args.opts...) - assert.NotNil(t, got) - }) - } -} diff --git a/pkg/stanza/fileconsumer/file_test.go b/pkg/stanza/fileconsumer/file_test.go index c187dd0c69c6..9f8a72a70c2f 100644 --- a/pkg/stanza/fileconsumer/file_test.go +++ b/pkg/stanza/fileconsumer/file_test.go @@ -42,7 +42,7 @@ See this issue for details: https://github.com/census-instrumentation/opencensus tempDir := t.TempDir() cfg := NewConfig().includeDir(tempDir) cfg.StartAt = "beginning" - operator, _ := buildTestManager(t, cfg) + operator, _ := buildTestManager(t, cfg, helper.NewMultilineConfig()) _ = openTemp(t, tempDir) err := operator.Start(testutil.NewMockPersister("test")) @@ -62,7 +62,7 @@ func TestAddFileFields(t *testing.T) { cfg.StartAt = "beginning" cfg.IncludeFileName = true cfg.IncludeFilePath = true - operator, emitCalls := buildTestManager(t, cfg) + operator, emitCalls := buildTestManager(t, cfg, helper.NewMultilineConfig()) // Create a file, then start temp := openTemp(t, tempDir) @@ -90,7 +90,7 @@ func TestAddFileResolvedFields(t *testing.T) { cfg.IncludeFilePath = true cfg.IncludeFileNameResolved = true cfg.IncludeFilePathResolved = true - operator, emitCalls := buildTestManager(t, cfg) + operator, emitCalls := buildTestManager(t, cfg, helper.NewMultilineConfig()) // Create temp dir with log file dir := t.TempDir() @@ -142,7 +142,7 @@ func TestAddFileResolvedFieldsWithChangeOfSymlinkTarget(t *testing.T) { cfg.IncludeFilePath = true cfg.IncludeFileNameResolved = true cfg.IncludeFilePathResolved = true - operator, emitCalls := buildTestManager(t, cfg) + operator, emitCalls := buildTestManager(t, cfg, helper.NewMultilineConfig()) // Create temp dir with log file dir := t.TempDir() @@ -217,7 +217,7 @@ func TestReadExistingLogs(t *testing.T) { tempDir := t.TempDir() cfg := NewConfig().includeDir(tempDir) cfg.StartAt = "beginning" - operator, emitCalls := buildTestManager(t, cfg) + operator, emitCalls := buildTestManager(t, cfg, helper.NewMultilineConfig()) // Create a file, then start temp := openTemp(t, tempDir) @@ -284,8 +284,8 @@ func TestReadUsingNopEncoding(t *testing.T) { cfg := NewConfig().includeDir(tempDir) cfg.StartAt = "beginning" cfg.MaxLogSize = 8 - cfg.Splitter.EncodingConfig.Encoding = "nop" - operator, emitCalls := buildTestManager(t, cfg) + cfg.EncodingConfig.Encoding = "nop" + operator, emitCalls := buildTestManager(t, cfg, helper.NewMultilineConfig()) // Create a file, then start temp := openTemp(t, tempDir) 
@@ -368,8 +368,8 @@ func TestNopEncodingDifferentLogSizes(t *testing.T) { cfg := NewConfig().includeDir(tempDir) cfg.StartAt = "beginning" cfg.MaxLogSize = tc.maxLogSize - cfg.Splitter.EncodingConfig.Encoding = "nop" - operator, emitCalls := buildTestManager(t, cfg) + cfg.EncodingConfig.Encoding = "nop" + operator, emitCalls := buildTestManager(t, cfg, helper.NewMultilineConfig()) // Create a file, then start temp := openTemp(t, tempDir) @@ -394,7 +394,7 @@ func TestReadNewLogs(t *testing.T) { tempDir := t.TempDir() cfg := NewConfig().includeDir(tempDir) cfg.StartAt = "beginning" - operator, emitCalls := buildTestManager(t, cfg) + operator, emitCalls := buildTestManager(t, cfg, helper.NewMultilineConfig()) operator.persister = testutil.NewMockPersister("test") // Poll once so we know this isn't a new file @@ -422,7 +422,7 @@ func TestReadExistingAndNewLogs(t *testing.T) { tempDir := t.TempDir() cfg := NewConfig().includeDir(tempDir) cfg.StartAt = "beginning" - operator, emitCalls := buildTestManager(t, cfg) + operator, emitCalls := buildTestManager(t, cfg, helper.NewMultilineConfig()) operator.persister = testutil.NewMockPersister("test") // Start with a file with an entry in it, and expect that entry @@ -446,7 +446,7 @@ func TestStartAtEnd(t *testing.T) { tempDir := t.TempDir() cfg := NewConfig().includeDir(tempDir) - operator, emitCalls := buildTestManager(t, cfg) + operator, emitCalls := buildTestManager(t, cfg, helper.NewMultilineConfig()) operator.persister = testutil.NewMockPersister("test") temp := openTemp(t, tempDir) @@ -471,7 +471,7 @@ func TestStartAtEndNewFile(t *testing.T) { tempDir := t.TempDir() cfg := NewConfig().includeDir(tempDir) cfg.StartAt = "beginning" - operator, emitCalls := buildTestManager(t, cfg) + operator, emitCalls := buildTestManager(t, cfg, helper.NewMultilineConfig()) operator.persister = testutil.NewMockPersister("test") operator.poll(context.Background()) @@ -491,9 +491,10 @@ func TestNoNewline(t *testing.T) { tempDir := t.TempDir() cfg := NewConfig().includeDir(tempDir) cfg.StartAt = "beginning" - cfg.Splitter = helper.NewSplitterConfig() - cfg.Splitter.Flusher.Period = time.Nanosecond - operator, emitCalls := buildTestManager(t, cfg) + cfg.EncodingConfig = helper.NewEncodingConfig() + cfg.Flusher = helper.NewFlusherConfig() + cfg.Flusher.Period = time.Nanosecond + operator, emitCalls := buildTestManager(t, cfg, helper.NewMultilineConfig()) temp := openTemp(t, tempDir) writeString(t, temp, "testlog1\ntestlog2") @@ -514,7 +515,7 @@ func TestEmptyLine(t *testing.T) { tempDir := t.TempDir() cfg := NewConfig().includeDir(tempDir) cfg.StartAt = "beginning" - operator, emitCalls := buildTestManager(t, cfg) + operator, emitCalls := buildTestManager(t, cfg, helper.NewMultilineConfig()) temp := openTemp(t, tempDir) writeString(t, temp, "testlog1\n\ntestlog2\n") @@ -537,7 +538,7 @@ func TestMultipleEmpty(t *testing.T) { tempDir := t.TempDir() cfg := NewConfig().includeDir(tempDir) cfg.StartAt = "beginning" - operator, emitCalls := buildTestManager(t, cfg) + operator, emitCalls := buildTestManager(t, cfg, helper.NewMultilineConfig()) temp := openTemp(t, tempDir) writeString(t, temp, "\n\ntestlog1\n\n\ntestlog2\n") @@ -564,7 +565,7 @@ func TestLeadingEmpty(t *testing.T) { tempDir := t.TempDir() cfg := NewConfig().includeDir(tempDir) cfg.StartAt = "beginning" - operator, emitCalls := buildTestManager(t, cfg) + operator, emitCalls := buildTestManager(t, cfg, helper.NewMultilineConfig()) temp := openTemp(t, tempDir) writeString(t, temp, "\ntestlog1\ntestlog2\n") @@ 
-588,7 +589,7 @@ func TestSplitWrite(t *testing.T) { tempDir := t.TempDir() cfg := NewConfig().includeDir(tempDir) cfg.StartAt = "beginning" - operator, emitCalls := buildTestManager(t, cfg) + operator, emitCalls := buildTestManager(t, cfg, helper.NewMultilineConfig()) operator.persister = testutil.NewMockPersister("test") temp := openTemp(t, tempDir) @@ -608,7 +609,7 @@ func TestIgnoreEmptyFiles(t *testing.T) { tempDir := t.TempDir() cfg := NewConfig().includeDir(tempDir) cfg.StartAt = "beginning" - operator, emitCalls := buildTestManager(t, cfg) + operator, emitCalls := buildTestManager(t, cfg, helper.NewMultilineConfig()) operator.persister = testutil.NewMockPersister("test") temp := openTemp(t, tempDir) @@ -635,7 +636,7 @@ func TestDecodeBufferIsResized(t *testing.T) { tempDir := t.TempDir() cfg := NewConfig().includeDir(tempDir) cfg.StartAt = "beginning" - operator, emitCalls := buildTestManager(t, cfg) + operator, emitCalls := buildTestManager(t, cfg, helper.NewMultilineConfig()) require.NoError(t, operator.Start(testutil.NewMockPersister("test"))) defer func() { @@ -655,7 +656,7 @@ func TestMultiFileSimple(t *testing.T) { tempDir := t.TempDir() cfg := NewConfig().includeDir(tempDir) cfg.StartAt = "beginning" - operator, emitCalls := buildTestManager(t, cfg) + operator, emitCalls := buildTestManager(t, cfg, helper.NewMultilineConfig()) temp1 := openTemp(t, tempDir) temp2 := openTemp(t, tempDir) @@ -679,7 +680,7 @@ func TestMultiFileParallel_PreloadedFiles(t *testing.T) { tempDir := t.TempDir() cfg := NewConfig().includeDir(tempDir) cfg.StartAt = "beginning" - operator, emitCalls := buildTestManager(t, cfg) + operator, emitCalls := buildTestManager(t, cfg, helper.NewMultilineConfig()) numFiles := 10 numMessages := 100 @@ -720,7 +721,7 @@ func TestMultiFileParallel_LiveFiles(t *testing.T) { tempDir := t.TempDir() cfg := NewConfig().includeDir(tempDir) cfg.StartAt = "beginning" - operator, emitCalls := buildTestManager(t, cfg) + operator, emitCalls := buildTestManager(t, cfg, helper.NewMultilineConfig()) numFiles := 10 numMessages := 100 @@ -786,7 +787,7 @@ func TestRestartOffsets(t *testing.T) { duringRestart := tokenWithLength(tc.lineLength) during2ndRun := tokenWithLength(tc.lineLength) - operatorOne, emitCallsOne := buildTestManager(t, cfg) + operatorOne, emitCallsOne := buildTestManager(t, cfg, helper.NewMultilineConfig()) writeString(t, logFile, string(before1stRun)+"\n") require.NoError(t, operatorOne.Start(persister)) if tc.startAt == "beginning" { @@ -800,7 +801,7 @@ func TestRestartOffsets(t *testing.T) { writeString(t, logFile, string(duringRestart)+"\n") - operatorTwo, emitCallsTwo := buildTestManager(t, cfg) + operatorTwo, emitCallsTwo := buildTestManager(t, cfg, helper.NewMultilineConfig()) require.NoError(t, operatorTwo.Start(persister)) waitForToken(t, emitCallsTwo, duringRestart) writeString(t, logFile, string(during2ndRun)+"\n") @@ -816,7 +817,7 @@ func TestManyLogsDelivered(t *testing.T) { tempDir := t.TempDir() cfg := NewConfig().includeDir(tempDir) cfg.StartAt = "beginning" - operator, emitCalls := buildTestManager(t, cfg) + operator, emitCalls := buildTestManager(t, cfg, helper.NewMultilineConfig()) count := 1000 expectedTokens := make([]string, 0, count) @@ -859,7 +860,7 @@ func TestFileBatching(t *testing.T) { cfg.StartAt = "beginning" cfg.MaxConcurrentFiles = maxConcurrentFiles emitCalls := make(chan *emitParams, files*linesPerFile) - operator := buildTestManagerWithEmit(t, cfg, emitCalls) + operator := buildTestManagerWithEmit(t, cfg, 
helper.NewMultilineConfig(), emitCalls) operator.persister = testutil.NewMockPersister("test") core, observedLogs := observer.New(zap.DebugLevel) @@ -937,7 +938,7 @@ func TestFileReader_FingerprintUpdated(t *testing.T) { tempDir := t.TempDir() cfg := NewConfig().includeDir(tempDir) cfg.StartAt = "beginning" - operator, emitCalls := buildTestManager(t, cfg) + operator, emitCalls := buildTestManager(t, cfg, helper.NewMultilineConfig()) temp := openTemp(t, tempDir) tempCopy := openFile(t, temp.Name()) @@ -979,7 +980,7 @@ func TestFingerprintGrowsAndStops(t *testing.T) { cfg := NewConfig().includeDir(tempDir) cfg.StartAt = "beginning" cfg.FingerprintSize = helper.ByteSize(maxFP) - operator, _ := buildTestManager(t, cfg) + operator, _ := buildTestManager(t, cfg, helper.NewMultilineConfig()) temp := openTemp(t, tempDir) tempCopy := openFile(t, temp.Name()) @@ -1042,7 +1043,7 @@ func TestFingerprintChangeSize(t *testing.T) { cfg := NewConfig().includeDir(tempDir) cfg.StartAt = "beginning" cfg.FingerprintSize = helper.ByteSize(maxFP) - operator, _ := buildTestManager(t, cfg) + operator, _ := buildTestManager(t, cfg, helper.NewMultilineConfig()) temp := openTemp(t, tempDir) tempCopy := openFile(t, temp.Name()) @@ -1163,8 +1164,8 @@ func TestEncodings(t *testing.T) { tempDir := t.TempDir() cfg := NewConfig().includeDir(tempDir) cfg.StartAt = "beginning" - cfg.Splitter.EncodingConfig = helper.EncodingConfig{Encoding: tc.encoding} - operator, emitCalls := buildTestManager(t, cfg) + cfg.EncodingConfig = helper.EncodingConfig{Encoding: tc.encoding} + operator, emitCalls := buildTestManager(t, cfg, helper.NewMultilineConfig()) // Populate the file temp := openTemp(t, tempDir) diff --git a/pkg/stanza/fileconsumer/fingerprint_test.go b/pkg/stanza/fileconsumer/fingerprint_test.go index 5ebc7a754b38..3b0d83bd1960 100644 --- a/pkg/stanza/fileconsumer/fingerprint_test.go +++ b/pkg/stanza/fileconsumer/fingerprint_test.go @@ -16,6 +16,7 @@ package fileconsumer import ( "fmt" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/helper" "math/rand" "os" "strings" @@ -34,7 +35,7 @@ func TestNewFingerprintDoesNotModifyOffset(t *testing.T) { tempDir := t.TempDir() cfg := NewConfig().includeDir(tempDir) cfg.StartAt = "beginning" - operator, _ := buildTestManager(t, cfg) + operator, _ := buildTestManager(t, cfg, helper.NewMultilineConfig()) operator.readerFactory.readerConfig.fingerprintSize = len(fingerprint) @@ -130,7 +131,7 @@ func TestNewFingerprint(t *testing.T) { tempDir := t.TempDir() cfg := NewConfig().includeDir(tempDir) cfg.StartAt = "beginning" - operator, _ := buildTestManager(t, cfg) + operator, _ := buildTestManager(t, cfg, helper.NewMultilineConfig()) operator.readerFactory.readerConfig.fingerprintSize = tc.fingerprintSize @@ -237,7 +238,7 @@ func TestFingerprintStartsWith_FromFile(t *testing.T) { tempDir := t.TempDir() cfg := NewConfig().includeDir(tempDir) cfg.StartAt = "beginning" - operator, _ := buildTestManager(t, cfg) + operator, _ := buildTestManager(t, cfg, helper.NewMultilineConfig()) operator.readerFactory.readerConfig.fingerprintSize *= 10 diff --git a/pkg/stanza/fileconsumer/reader_test.go b/pkg/stanza/fileconsumer/reader_test.go index 44b0278fba05..4d3bd76c1d34 100644 --- a/pkg/stanza/fileconsumer/reader_test.go +++ b/pkg/stanza/fileconsumer/reader_test.go @@ -21,6 +21,7 @@ import ( "github.com/stretchr/testify/require" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/helper" 
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/testutil" ) @@ -99,6 +100,9 @@ func TestTokenization(t *testing.T) { func testReaderFactory(t *testing.T) (*readerFactory, chan *emitParams) { emitChan := make(chan *emitParams, 100) + encodingConfig := helper.NewEncodingConfig() + enc, _ := encodingConfig.Build() + splitter, _ := helper.NewNewlineSplitFunc(enc.Encoding, false) return &readerFactory{ SugaredLogger: testutil.Logger(t), readerConfig: &readerConfig{ @@ -109,6 +113,11 @@ func testReaderFactory(t *testing.T) (*readerFactory, chan *emitParams) { }, }, fromBeginning: true, + splitterFactory: splitterFactory{ + EncodingConfig: encodingConfig, + Flusher: helper.NewFlusherConfig(), + SplitFunc: splitter, + }, }, emitChan } diff --git a/pkg/stanza/fileconsumer/rotation_test.go b/pkg/stanza/fileconsumer/rotation_test.go index b3d2f2c30990..d51485207920 100644 --- a/pkg/stanza/fileconsumer/rotation_test.go +++ b/pkg/stanza/fileconsumer/rotation_test.go @@ -27,6 +27,7 @@ import ( "github.com/stretchr/testify/require" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/helper" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/testutil" ) @@ -45,7 +46,7 @@ func TestMultiFileRotate(t *testing.T) { tempDir := t.TempDir() cfg := NewConfig().includeDir(tempDir) cfg.StartAt = "beginning" - operator, emitCalls := buildTestManager(t, cfg) + operator, emitCalls := buildTestManager(t, cfg, helper.NewMultilineConfig()) numFiles := 3 numMessages := 3 @@ -103,7 +104,7 @@ func TestMultiFileRotateSlow(t *testing.T) { tempDir := t.TempDir() cfg := NewConfig().includeDir(tempDir) cfg.StartAt = "beginning" - operator, emitCalls := buildTestManager(t, cfg) + operator, emitCalls := buildTestManager(t, cfg, helper.NewMultilineConfig()) getMessage := func(f, k, m int) string { return fmt.Sprintf("file %d-%d, message %d", f, k, m) } fileName := func(f, k int) string { return filepath.Join(tempDir, fmt.Sprintf("file%d.rot%d.log", f, k)) } @@ -154,7 +155,7 @@ func TestMultiCopyTruncateSlow(t *testing.T) { tempDir := t.TempDir() cfg := NewConfig().includeDir(tempDir) cfg.StartAt = "beginning" - operator, emitCalls := buildTestManager(t, cfg) + operator, emitCalls := buildTestManager(t, cfg, helper.NewMultilineConfig()) getMessage := func(f, k, m int) string { return fmt.Sprintf("file %d-%d, message %d", f, k, m) } fileName := func(f, k int) string { return filepath.Join(tempDir, fmt.Sprintf("file%d.rot%d.log", f, k)) } @@ -263,7 +264,7 @@ func (rt rotationTest) run(tc rotationTest, copyTruncate, sequential bool) func( cfg.StartAt = "beginning" cfg.PollInterval = tc.pollInterval emitCalls := make(chan *emitParams, tc.totalLines) - operator := buildTestManagerWithEmit(t, cfg, emitCalls) + operator := buildTestManagerWithEmit(t, cfg, helper.NewMultilineConfig(), emitCalls) logger := getRotatingLogger(t, tempDir, tc.maxLinesPerFile, tc.maxBackupFiles, copyTruncate, sequential) @@ -365,7 +366,7 @@ func TestMoveFile(t *testing.T) { tempDir := t.TempDir() cfg := NewConfig().includeDir(tempDir) cfg.StartAt = "beginning" - operator, emitCalls := buildTestManager(t, cfg) + operator, emitCalls := buildTestManager(t, cfg, helper.NewMultilineConfig()) operator.persister = testutil.NewMockPersister("test") temp1 := openTemp(t, tempDir) @@ -397,7 +398,7 @@ func TestTrackMovedAwayFiles(t *testing.T) { tempDir := t.TempDir() cfg := NewConfig().includeDir(tempDir) cfg.StartAt = "beginning" - operator, emitCalls := buildTestManager(t, cfg) + operator, emitCalls := 
buildTestManager(t, cfg, helper.NewMultilineConfig()) operator.persister = testutil.NewMockPersister("test") temp1 := openTemp(t, tempDir) @@ -438,7 +439,7 @@ func TestTruncateThenWrite(t *testing.T) { tempDir := t.TempDir() cfg := NewConfig().includeDir(tempDir) cfg.StartAt = "beginning" - operator, emitCalls := buildTestManager(t, cfg) + operator, emitCalls := buildTestManager(t, cfg, helper.NewMultilineConfig()) operator.persister = testutil.NewMockPersister("test") temp1 := openTemp(t, tempDir) @@ -472,7 +473,7 @@ func TestCopyTruncateWriteBoth(t *testing.T) { tempDir := t.TempDir() cfg := NewConfig().includeDir(tempDir) cfg.StartAt = "beginning" - operator, emitCalls := buildTestManager(t, cfg) + operator, emitCalls := buildTestManager(t, cfg, helper.NewMultilineConfig()) operator.persister = testutil.NewMockPersister("test") temp1 := openTemp(t, tempDir) @@ -512,7 +513,7 @@ func TestFileMovedWhileOff_BigFiles(t *testing.T) { tempDir := t.TempDir() cfg := NewConfig().includeDir(tempDir) cfg.StartAt = "beginning" - operator, emitCalls := buildTestManager(t, cfg) + operator, emitCalls := buildTestManager(t, cfg, helper.NewMultilineConfig()) persister := testutil.NewMockPersister("test") log1 := tokenWithLength(1000) diff --git a/pkg/stanza/fileconsumer/splitter_factory.go b/pkg/stanza/fileconsumer/splitter_factory.go index 4c943e0e1dec..9de81cde3d1f 100644 --- a/pkg/stanza/fileconsumer/splitter_factory.go +++ b/pkg/stanza/fileconsumer/splitter_factory.go @@ -1,4 +1,18 @@ -package fileconsumer +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package fileconsumer // import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer" import ( "bufio" diff --git a/pkg/stanza/fileconsumer/util_test.go b/pkg/stanza/fileconsumer/util_test.go index 9a4255672323..87a9f22326fe 100644 --- a/pkg/stanza/fileconsumer/util_test.go +++ b/pkg/stanza/fileconsumer/util_test.go @@ -50,15 +50,18 @@ type emitParams struct { token []byte } -func buildTestManager(t *testing.T, cfg *Config) (*Manager, chan *emitParams) { +func buildTestManager(t *testing.T, cfg *Config, multiline helper.MultilineConfig) (*Manager, chan *emitParams) { emitChan := make(chan *emitParams, 100) - return buildTestManagerWithEmit(t, cfg, emitChan), emitChan + return buildTestManagerWithEmit(t, cfg, multiline, emitChan), emitChan } -func buildTestManagerWithEmit(t *testing.T, cfg *Config, emitChan chan *emitParams) *Manager { +func buildTestManagerWithEmit(t *testing.T, cfg *Config, multiline helper.MultilineConfig, emitChan chan *emitParams) *Manager { + enc, _ := cfg.EncodingConfig.Build() + flusher := cfg.Flusher.Build() + splitter, _ := multiline.Build(enc.Encoding, false, flusher, int(cfg.MaxLogSize)) input, err := cfg.Build(testutil.Logger(t), func(_ context.Context, attrs *FileAttributes, token []byte) { emitChan <- &emitParams{attrs, token} - }) + }, WithCustomizedSplitter(splitter)) require.NoError(t, err) return input } @@ -184,8 +187,9 @@ func init() { } type mockOperatorConfig struct { - helper.BasicConfig `mapstructure:",squash"` - *Config `mapstructure:",squash"` + helper.BasicConfig `mapstructure:",squash"` + *Config `mapstructure:",squash"` + helper.MultilineConfig `mapstructure:"multiline,omitempty"` } func newMockOperatorConfig(cfg *Config) *mockOperatorConfig { @@ -195,6 +199,14 @@ func newMockOperatorConfig(cfg *Config) *mockOperatorConfig { } } +func newMockOperatorConfigWithMultiline(cfg *Config, multiline helper.MultilineConfig) *mockOperatorConfig { + return &mockOperatorConfig{ + BasicConfig: helper.NewBasicConfig(mockOperatorType, mockOperatorType), + Config: cfg, + MultilineConfig: multiline, + } +} + // This function is impelmented for compatibility with operatortest // but is not meant to be used directly func (h *mockOperatorConfig) Build(*zap.SugaredLogger) (operator.Operator, error) { diff --git a/pkg/stanza/operator/input/file/config.go b/pkg/stanza/operator/input/file/config.go index 75db9f8742d6..ad04e01edfd8 100644 --- a/pkg/stanza/operator/input/file/config.go +++ b/pkg/stanza/operator/input/file/config.go @@ -86,12 +86,9 @@ func (c Config) Build(logger *zap.SugaredLogger) (operator.Operator, error) { toBody: toBody, preEmitOptions: preEmitOptions, } - var splitter bufio.SplitFunc - if c.LineStartPattern != "" || c.LineEndPattern != "" { - splitter, err = c.buildMultilineSplitter() - if err != nil { - return nil, err - } + splitter, err := c.buildMultilineSplitter() + if err != nil { + return nil, err } input.fileConsumer, err = c.Config.Build(logger, input.emit, fileconsumer.WithCustomizedSplitter(splitter)) if err != nil { diff --git a/pkg/stanza/operator/input/file/util_test.go b/pkg/stanza/operator/input/file/util_test.go index 4f9b34725487..762600713e6d 100644 --- a/pkg/stanza/operator/input/file/util_test.go +++ b/pkg/stanza/operator/input/file/util_test.go @@ -28,6 +28,7 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/entry" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator" + 
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/helper" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/testutil" ) @@ -37,6 +38,7 @@ func newDefaultConfig(tempDir string) *Config { cfg.StartAt = "beginning" cfg.Include = []string{fmt.Sprintf("%s/*", tempDir)} cfg.OutputIDs = []string{"fake"} + cfg.MultilineConfig = helper.NewMultilineConfig() return cfg } From f182b1cadb35ff22dcf5273d0bc9912547ce5aa9 Mon Sep 17 00:00:00 2001 From: lookchen Date: Sun, 2 Oct 2022 02:17:30 +0800 Subject: [PATCH 08/13] remove NewSplitterConfig --- receiver/otlpjsonfilereceiver/file_test.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/receiver/otlpjsonfilereceiver/file_test.go b/receiver/otlpjsonfilereceiver/file_test.go index bbbdf98f2a03..b3c9814fd563 100644 --- a/receiver/otlpjsonfilereceiver/file_test.go +++ b/receiver/otlpjsonfilereceiver/file_test.go @@ -131,11 +131,12 @@ func testdataConfigYamlAsMap() *Config { IncludeFileNameResolved: false, IncludeFilePathResolved: false, PollInterval: 200 * time.Millisecond, - Splitter: helper.NewSplitterConfig(), StartAt: "end", FingerprintSize: 1000, MaxLogSize: 1024 * 1024, MaxConcurrentFiles: 1024, + EncodingConfig: helper.NewEncodingConfig(), + Flusher: helper.NewFlusherConfig(), Finder: fileconsumer.Finder{ Include: []string{"/var/log/*.log"}, Exclude: []string{"/var/log/example.log"}, From 7871d9a832050399829c4ece35b8d85aa70983b0 Mon Sep 17 00:00:00 2001 From: lookchen Date: Sun, 2 Oct 2022 02:38:14 +0800 Subject: [PATCH 09/13] update test --- pkg/stanza/operator/helper/multiline_test.go | 108 ------------------- 1 file changed, 108 deletions(-) diff --git a/pkg/stanza/operator/helper/multiline_test.go b/pkg/stanza/operator/helper/multiline_test.go index dd41d5346c70..5c3287b39656 100644 --- a/pkg/stanza/operator/helper/multiline_test.go +++ b/pkg/stanza/operator/helper/multiline_test.go @@ -773,111 +773,3 @@ func generatedByteSliceOfLength(length int) []byte { } return newSlice } - -func TestSplitterConfig_Build(t *testing.T) { - type fields struct { - EncodingConfig EncodingConfig - Multiline MultilineConfig - Flusher FlusherConfig - Customization CustomizedConfig - } - type args struct { - flushAtEOF bool - maxLogSize int - } - tests := []struct { - name string - fields fields - args args - want *Splitter - wantErr bool - }{ - { - name: "default configuration", - fields: fields{ - EncodingConfig: NewEncodingConfig(), - Multiline: NewMultilineConfig(), - Flusher: NewFlusherConfig(), - Customization: NewCustomizedConfig(), - }, - args: args{ - flushAtEOF: false, - maxLogSize: 10000, - }, - wantErr: false, - }, - { - name: "Customize splitter configuration", - fields: fields{ - EncodingConfig: NewEncodingConfig(), - Multiline: NewMultilineConfig(), - Flusher: NewFlusherConfig(), - Customization: CustomizedConfig{ - Enabled: true, - Splitter: func(data []byte, atEOF bool) (advance int, token []byte, err error) { - return 0, nil, err - }, - }, - }, - args: args{ - flushAtEOF: false, - maxLogSize: 10000, - }, - wantErr: false, - }, - { - name: "Customize splitter configuration:Splitter is nil ", - fields: fields{ - EncodingConfig: NewEncodingConfig(), - Multiline: NewMultilineConfig(), - Flusher: NewFlusherConfig(), - Customization: CustomizedConfig{ - Enabled: true, - Splitter: nil, - }, - }, - args: args{ - flushAtEOF: false, - maxLogSize: 10000, - }, - wantErr: true, - }, - { - name: "Customize splitter configuration:Splitter is nil ", - fields: fields{ - EncodingConfig: 
NewEncodingConfig(), - Multiline: MultilineConfig{LineStartPattern: "START"}, - Flusher: NewFlusherConfig(), - Customization: CustomizedConfig{ - Enabled: true, - Splitter: func(data []byte, atEOF bool) (advance int, token []byte, err error) { - return 0, nil, err - }, - }, - }, - args: args{ - flushAtEOF: false, - maxLogSize: 10000, - }, - wantErr: true, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - c := &SplitterConfig{ - EncodingConfig: tt.fields.EncodingConfig, - Multiline: tt.fields.Multiline, - Flusher: tt.fields.Flusher, - customization: tt.fields.Customization, - } - got, err := c.Build(tt.args.flushAtEOF, tt.args.maxLogSize) - if tt.wantErr { - assert.NotNil(t, err) - assert.Nil(t, got) - } else { - assert.Nil(t, err) - assert.NotNil(t, got) - } - }) - } -} From 28f1d50f12c8a50cc4d62ef60cc218660fccc572 Mon Sep 17 00:00:00 2001 From: lookchen Date: Sun, 2 Oct 2022 11:09:23 +0800 Subject: [PATCH 10/13] update test --- pkg/stanza/fileconsumer/fingerprint_test.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pkg/stanza/fileconsumer/fingerprint_test.go b/pkg/stanza/fileconsumer/fingerprint_test.go index 3b0d83bd1960..3b97eedcc568 100644 --- a/pkg/stanza/fileconsumer/fingerprint_test.go +++ b/pkg/stanza/fileconsumer/fingerprint_test.go @@ -16,13 +16,14 @@ package fileconsumer import ( "fmt" - "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/helper" "math/rand" "os" "strings" "testing" "github.com/stretchr/testify/require" + + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/helper" ) func TestNewFingerprintDoesNotModifyOffset(t *testing.T) { From 4b8444e464c26dcc208f9507b9ba25a45a6eb3aa Mon Sep 17 00:00:00 2001 From: lookchen Date: Sun, 2 Oct 2022 12:05:06 +0800 Subject: [PATCH 11/13] use interface --- pkg/stanza/fileconsumer/config.go | 8 ++- pkg/stanza/fileconsumer/config_test.go | 15 +----- pkg/stanza/fileconsumer/file.go | 21 +++++++- pkg/stanza/fileconsumer/reader_factory.go | 2 +- pkg/stanza/fileconsumer/reader_test.go | 9 ++-- pkg/stanza/fileconsumer/splitter_factory.go | 56 ++++++++++++++++++++- pkg/stanza/fileconsumer/util_test.go | 5 +- pkg/stanza/operator/input/file/config.go | 22 +------- 8 files changed, 88 insertions(+), 50 deletions(-) diff --git a/pkg/stanza/fileconsumer/config.go b/pkg/stanza/fileconsumer/config.go index 1c081afb21e8..a95e52642dc8 100644 --- a/pkg/stanza/fileconsumer/config.go +++ b/pkg/stanza/fileconsumer/config.go @@ -129,10 +129,9 @@ func (c Config) Build(logger *zap.SugaredLogger, emit EmitFunc, opts ...Option) emit: emit, }, fromBeginning: startAtBeginning, - splitterFactory: splitterFactory{ + splitterFactory: &defaultSplitterFactory{ EncodingConfig: c.EncodingConfig, Flusher: c.Flusher, - SplitFunc: helper.SplitNone(int(c.MaxLogSize)), }, }, finder: c.Finder, @@ -145,5 +144,10 @@ func (c Config) Build(logger *zap.SugaredLogger, emit EmitFunc, opts ...Option) for _, op := range opts { op(m) } + + _, err = m.readerFactory.splitterFactory.Build(int(c.MaxLogSize)) + if err != nil { + return nil, err + } return m, nil } diff --git a/pkg/stanza/fileconsumer/config_test.go b/pkg/stanza/fileconsumer/config_test.go index 0bec9412dda6..c665fc40e97c 100644 --- a/pkg/stanza/fileconsumer/config_test.go +++ b/pkg/stanza/fileconsumer/config_test.go @@ -501,19 +501,8 @@ func TestBuild(t *testing.T) { tc.modifyBaseConfig(cfg) nopEmit := func(_ context.Context, _ *FileAttributes, _ []byte) {} - - enc, err := cfg.EncodingConfig.Build() - if err != nil { 
- tc.errorRequirement(t, err) - return - } - flusher := cfg.Flusher.Build() - splitter, err := tc.MultilineConfig.Build(enc.Encoding, false, flusher, int(cfg.MaxLogSize)) - if err != nil { - tc.errorRequirement(t, err) - return - } - input, err := cfg.Build(testutil.Logger(t), nopEmit, WithCustomizedSplitter(splitter)) + input, err := cfg.Build(testutil.Logger(t), nopEmit, + WithMultilineSplitter(cfg.EncodingConfig, cfg.Flusher, tc.MultilineConfig)) tc.errorRequirement(t, err) if err != nil { return diff --git a/pkg/stanza/fileconsumer/file.go b/pkg/stanza/fileconsumer/file.go index 749dbb06bba5..d8e451c60437 100644 --- a/pkg/stanza/fileconsumer/file.go +++ b/pkg/stanza/fileconsumer/file.go @@ -20,6 +20,7 @@ import ( "context" "encoding/json" "fmt" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/helper" "os" "sync" "time" @@ -33,12 +34,28 @@ type EmitFunc func(ctx context.Context, attrs *FileAttributes, token []byte) type Option func(m *Manager) -func WithCustomizedSplitter(splitter bufio.SplitFunc) Option { +func WithCustomizedSplitter( + encodingConfig helper.EncodingConfig, flusher helper.FlusherConfig, splitter bufio.SplitFunc) Option { return func(m *Manager) { if splitter == nil { return } - m.readerFactory.splitterFactory.SplitFunc = splitter + m.readerFactory.splitterFactory = &customizeSplitterFactory{ + EncodingConfig: encodingConfig, + Flusher: flusher, + SplitFunc: splitter, + } + } +} + +func WithMultilineSplitter( + encodingConfig helper.EncodingConfig, flusher helper.FlusherConfig, multiline helper.MultilineConfig) Option { + return func(m *Manager) { + m.readerFactory.splitterFactory = &multilineSplitterFactory{ + EncodingConfig: encodingConfig, + Flusher: flusher, + Multiline: multiline, + } } } diff --git a/pkg/stanza/fileconsumer/reader_factory.go b/pkg/stanza/fileconsumer/reader_factory.go index 21af609e9ed4..f07e5c82eec6 100644 --- a/pkg/stanza/fileconsumer/reader_factory.go +++ b/pkg/stanza/fileconsumer/reader_factory.go @@ -95,7 +95,7 @@ func (b *readerBuilder) build() (r *Reader, err error) { if b.splitter != nil { r.splitter = b.splitter } else { - r.splitter, err = b.splitterFactory.Build() + r.splitter, err = b.splitterFactory.Build(r.maxLogSize) if err != nil { return } diff --git a/pkg/stanza/fileconsumer/reader_test.go b/pkg/stanza/fileconsumer/reader_test.go index 4d3bd76c1d34..cd53cee7e303 100644 --- a/pkg/stanza/fileconsumer/reader_test.go +++ b/pkg/stanza/fileconsumer/reader_test.go @@ -100,9 +100,6 @@ func TestTokenization(t *testing.T) { func testReaderFactory(t *testing.T) (*readerFactory, chan *emitParams) { emitChan := make(chan *emitParams, 100) - encodingConfig := helper.NewEncodingConfig() - enc, _ := encodingConfig.Build() - splitter, _ := helper.NewNewlineSplitFunc(enc.Encoding, false) return &readerFactory{ SugaredLogger: testutil.Logger(t), readerConfig: &readerConfig{ @@ -113,10 +110,10 @@ func testReaderFactory(t *testing.T) (*readerFactory, chan *emitParams) { }, }, fromBeginning: true, - splitterFactory: splitterFactory{ - EncodingConfig: encodingConfig, + splitterFactory: &multilineSplitterFactory{ + EncodingConfig: helper.NewEncodingConfig(), Flusher: helper.NewFlusherConfig(), - SplitFunc: splitter, + Multiline: helper.NewMultilineConfig(), }, }, emitChan } diff --git a/pkg/stanza/fileconsumer/splitter_factory.go b/pkg/stanza/fileconsumer/splitter_factory.go index 9de81cde3d1f..0cb9394be149 100644 --- a/pkg/stanza/fileconsumer/splitter_factory.go +++ b/pkg/stanza/fileconsumer/splitter_factory.go @@ -20,13 
+20,65 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/helper" ) -type splitterFactory struct { +type splitterFactory interface { + Build(maxLogSize int) (*helper.Splitter, error) +} + +type defaultSplitterFactory struct { + EncodingConfig helper.EncodingConfig + Flusher helper.FlusherConfig +} + +func (factory *defaultSplitterFactory) Build(maxLogSize int) (*helper.Splitter, error) { + enc, err := factory.EncodingConfig.Build() + if err != nil { + return nil, err + } + flusher := factory.Flusher.Build() + splitFunc := helper.SplitNone(maxLogSize) + if flusher != nil { + splitFunc = flusher.SplitFunc(splitFunc) + } + return &helper.Splitter{ + Encoding: enc, + Flusher: flusher, + SplitFunc: splitFunc, + }, nil +} + +type multilineSplitterFactory struct { + EncodingConfig helper.EncodingConfig + Flusher helper.FlusherConfig + Multiline helper.MultilineConfig +} + +func (factory *multilineSplitterFactory) Build(maxLogSize int) (*helper.Splitter, error) { + enc, err := factory.EncodingConfig.Build() + if err != nil { + return nil, err + } + flusher := factory.Flusher.Build() + splitter, err := factory.Multiline.Build(enc.Encoding, false, flusher, maxLogSize) + if err != nil { + return nil, err + } + if flusher != nil { + splitter = flusher.SplitFunc(splitter) + } + return &helper.Splitter{ + Encoding: enc, + Flusher: flusher, + SplitFunc: splitter, + }, nil +} + +type customizeSplitterFactory struct { EncodingConfig helper.EncodingConfig Flusher helper.FlusherConfig SplitFunc bufio.SplitFunc } -func (factory *splitterFactory) Build() (*helper.Splitter, error) { +func (factory *customizeSplitterFactory) Build(maxLogSize int) (*helper.Splitter, error) { enc, err := factory.EncodingConfig.Build() if err != nil { return nil, err diff --git a/pkg/stanza/fileconsumer/util_test.go b/pkg/stanza/fileconsumer/util_test.go index 87a9f22326fe..bbe93436ab59 100644 --- a/pkg/stanza/fileconsumer/util_test.go +++ b/pkg/stanza/fileconsumer/util_test.go @@ -56,12 +56,9 @@ func buildTestManager(t *testing.T, cfg *Config, multiline helper.MultilineConfi } func buildTestManagerWithEmit(t *testing.T, cfg *Config, multiline helper.MultilineConfig, emitChan chan *emitParams) *Manager { - enc, _ := cfg.EncodingConfig.Build() - flusher := cfg.Flusher.Build() - splitter, _ := multiline.Build(enc.Encoding, false, flusher, int(cfg.MaxLogSize)) input, err := cfg.Build(testutil.Logger(t), func(_ context.Context, attrs *FileAttributes, token []byte) { emitChan <- &emitParams{attrs, token} - }, WithCustomizedSplitter(splitter)) + }, WithMultilineSplitter(cfg.EncodingConfig, cfg.Flusher, multiline)) require.NoError(t, err) return input } diff --git a/pkg/stanza/operator/input/file/config.go b/pkg/stanza/operator/input/file/config.go index ad04e01edfd8..a983cde8ac18 100644 --- a/pkg/stanza/operator/input/file/config.go +++ b/pkg/stanza/operator/input/file/config.go @@ -15,8 +15,6 @@ package file // import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/input/file" import ( - "bufio" - "go.uber.org/zap" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer" @@ -86,27 +84,11 @@ func (c Config) Build(logger *zap.SugaredLogger) (operator.Operator, error) { toBody: toBody, preEmitOptions: preEmitOptions, } - splitter, err := c.buildMultilineSplitter() - if err != nil { - return nil, err - } - input.fileConsumer, err = c.Config.Build(logger, input.emit, fileconsumer.WithCustomizedSplitter(splitter)) + input.fileConsumer, err = 
c.Config.Build(logger, input.emit,
+		fileconsumer.WithMultilineSplitter(c.EncodingConfig, c.Flusher, c.MultilineConfig))
 	if err != nil {
 		return nil, err
 	}
 
 	return input, nil
 }
-
-func (c Config) buildMultilineSplitter() (bufio.SplitFunc, error) {
-	enc, err := c.EncodingConfig.Build()
-	if err != nil {
-		return nil, err
-	}
-	flusher := c.Flusher.Build()
-	splitter, err := c.MultilineConfig.Build(enc.Encoding, false, flusher, int(c.MaxLogSize))
-	if err != nil {
-		return nil, err
-	}
-	return splitter, nil
-}

From f35c28da230e2e9e176fbb7f82300156436a732a Mon Sep 17 00:00:00 2001
From: lookchen
Date: Sun, 2 Oct 2022 12:20:34 +0800
Subject: [PATCH 12/13] go imports

---
 pkg/stanza/fileconsumer/file.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/pkg/stanza/fileconsumer/file.go b/pkg/stanza/fileconsumer/file.go
index d8e451c60437..cf64b776a3aa 100644
--- a/pkg/stanza/fileconsumer/file.go
+++ b/pkg/stanza/fileconsumer/file.go
@@ -20,7 +20,6 @@ import (
 	"context"
 	"encoding/json"
 	"fmt"
-	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/helper"
 	"os"
 	"sync"
 	"time"
@@ -28,6 +27,7 @@ import (
 	"go.uber.org/zap"
 
 	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator"
+	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/helper"
 )
 
 type EmitFunc func(ctx context.Context, attrs *FileAttributes, token []byte)

From 0ec1ece7c2a634e0fd543785f44759c89daa879e Mon Sep 17 00:00:00 2001
From: lookchen
Date: Sun, 2 Oct 2022 12:37:02 +0800
Subject: [PATCH 13/13] Add code comments

---
 pkg/stanza/fileconsumer/config.go           | 2 +-
 pkg/stanza/fileconsumer/reader_factory.go   | 2 +-
 pkg/stanza/fileconsumer/splitter_factory.go | 3 +++
 3 files changed, 5 insertions(+), 2 deletions(-)

diff --git a/pkg/stanza/fileconsumer/config.go b/pkg/stanza/fileconsumer/config.go
index a95e52642dc8..509490bc0cbe 100644
--- a/pkg/stanza/fileconsumer/config.go
+++ b/pkg/stanza/fileconsumer/config.go
@@ -144,7 +144,7 @@ func (c Config) Build(logger *zap.SugaredLogger, emit EmitFunc, opts ...Option)
 	for _, op := range opts {
 		op(m)
 	}
-
+	// Ensure that the splitter is buildable
 	_, err = m.readerFactory.splitterFactory.Build(int(c.MaxLogSize))
 	if err != nil {
 		return nil, err

diff --git a/pkg/stanza/fileconsumer/reader_factory.go b/pkg/stanza/fileconsumer/reader_factory.go
index f07e5c82eec6..4bb8ae104a5c 100644
--- a/pkg/stanza/fileconsumer/reader_factory.go
+++ b/pkg/stanza/fileconsumer/reader_factory.go
@@ -95,7 +95,7 @@ func (b *readerBuilder) build() (r *Reader, err error) {
 	if b.splitter != nil {
 		r.splitter = b.splitter
 	} else {
-		r.splitter, err = b.splitterFactory.Build(r.maxLogSize)
+		r.splitter, err = b.splitterFactory.Build(b.readerConfig.maxLogSize)
 		if err != nil {
 			return
 		}

diff --git a/pkg/stanza/fileconsumer/splitter_factory.go b/pkg/stanza/fileconsumer/splitter_factory.go
index 0cb9394be149..34f13dbe3c8d 100644
--- a/pkg/stanza/fileconsumer/splitter_factory.go
+++ b/pkg/stanza/fileconsumer/splitter_factory.go
@@ -29,6 +29,7 @@ type defaultSplitterFactory struct {
 	Flusher        helper.FlusherConfig
 }
 
+// Build returns the default Splitter
 func (factory *defaultSplitterFactory) Build(maxLogSize int) (*helper.Splitter, error) {
 	enc, err := factory.EncodingConfig.Build()
 	if err != nil {
@@ -52,6 +53,7 @@ type multilineSplitterFactory struct {
 	Multiline      helper.MultilineConfig
 }
 
+// Build builds the multiline Splitter
 func (factory *multilineSplitterFactory) Build(maxLogSize int) (*helper.Splitter, error) {
 	enc, err := factory.EncodingConfig.Build()
 	if err != nil {
@@ -78,6 +80,7 @@ type customizeSplitterFactory struct {
 	SplitFunc      bufio.SplitFunc
 }
 
+// Build builds the customized Splitter
 func (factory *customizeSplitterFactory) Build(maxLogSize int) (*helper.Splitter, error) {
 	enc, err := factory.EncodingConfig.Build()
 	if err != nil {
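// Editor's note: an illustrative sketch of how a caller would use the option-based
// API that PATCH 11 settles on; it is not part of the diff. It assumes the Config
// fields shown in this series (EncodingConfig, Flusher, Finder) and the option
// signatures WithMultilineSplitter/WithCustomizedSplitter from file.go above; any
// additional validation performed by Config.Build is not covered here, and the
// include glob and regex are placeholders.
package main

import (
	"bufio"

	"go.uber.org/zap"

	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer"
	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/helper"
)

func buildConsumers(logger *zap.SugaredLogger, emit fileconsumer.EmitFunc, custom bufio.SplitFunc) error {
	cfg := fileconsumer.NewConfig()
	cfg.Finder = fileconsumer.Finder{Include: []string{"/var/log/*.log"}} // example glob only

	// Multiline splitting: the encoding, flusher and multiline configs are handed to
	// the option, and the manager builds its splitter via multilineSplitterFactory.
	multiline := helper.NewMultilineConfig()
	multiline.LineStartPattern = `^\d{4}-\d{2}-\d{2}` // e.g. entries that start with a date
	if _, err := cfg.Build(logger, emit,
		fileconsumer.WithMultilineSplitter(cfg.EncodingConfig, cfg.Flusher, multiline)); err != nil {
		return err
	}

	// Fully customized splitting: any bufio.SplitFunc (such as the semicolon example
	// sketched earlier in this series) is wrapped by customizeSplitterFactory.
	if _, err := cfg.Build(logger, emit,
		fileconsumer.WithCustomizedSplitter(cfg.EncodingConfig, cfg.Flusher, custom)); err != nil {
		return err
	}
	return nil
}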