From 0507faca7a14df4296699261a12bc1a3ca608dc0 Mon Sep 17 00:00:00 2001 From: look Date: Wed, 16 Nov 2022 22:16:02 +0800 Subject: [PATCH] [pkg/stanza] Support to Customize bufio.SplitFunc (#16272) * add a new method BuildWithSplitFunc, this method can directly pass in a user-defined splitFunc --- .../stanza-support-customize-splitter.yaml | 10 ++ pkg/stanza/fileconsumer/config.go | 98 ++++++++++++------- pkg/stanza/fileconsumer/config_test.go | 81 +++++++++++++++ pkg/stanza/fileconsumer/splitter_factory.go | 25 +++++ .../fileconsumer/splitter_factory_test.go | 55 +++++++++++ 5 files changed, 236 insertions(+), 33 deletions(-) create mode 100644 .chloggen/stanza-support-customize-splitter.yaml diff --git a/.chloggen/stanza-support-customize-splitter.yaml b/.chloggen/stanza-support-customize-splitter.yaml new file mode 100644 index 000000000000..7f5369003868 --- /dev/null +++ b/.chloggen/stanza-support-customize-splitter.yaml @@ -0,0 +1,10 @@ +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: pkg/stanza + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: "Support to Customize bufio.SplitFunc" + +# One or more tracking issues related to the change +issues: [14593] \ No newline at end of file diff --git a/pkg/stanza/fileconsumer/config.go b/pkg/stanza/fileconsumer/config.go index 50447783f3e3..f163cd59a7f0 100644 --- a/pkg/stanza/fileconsumer/config.go +++ b/pkg/stanza/fileconsumer/config.go @@ -15,6 +15,7 @@ package fileconsumer // import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer" import ( + "bufio" "fmt" "time" @@ -62,51 +63,43 @@ type Config struct { // Build will build a file input operator from the supplied configuration func (c Config) Build(logger *zap.SugaredLogger, emit EmitFunc) (*Manager, error) { - if emit == nil { - return nil, fmt.Errorf("must provide emit function") - } - - if len(c.Include) == 0 { - return nil, fmt.Errorf("required argument `include` is empty") - } - - // Ensure includes can be parsed as globs - for _, include := range c.Include { - _, err := doublestar.PathMatch(include, "matchstring") - if err != nil { - return nil, fmt.Errorf("parse include glob: %w", err) - } + if err := c.validate(); err != nil { + return nil, err } - // Ensure excludes can be parsed as globs - for _, exclude := range c.Exclude { - _, err := doublestar.PathMatch(exclude, "matchstring") - if err != nil { - return nil, fmt.Errorf("parse exclude glob: %w", err) - } + // Ensure that splitter is buildable + factory := newMultilineSplitterFactory(c.Splitter.EncodingConfig, c.Splitter.Flusher, c.Splitter.Multiline) + if _, err := factory.Build(int(c.MaxLogSize)); err != nil { + return nil, err } - if c.MaxLogSize <= 0 { - return nil, fmt.Errorf("`max_log_size` must be positive") - } + return c.buildManager(logger, emit, factory) +} - if c.MaxConcurrentFiles <= 1 { - return nil, fmt.Errorf("`max_concurrent_files` must be greater than 1") +// BuildWithSplitFunc will build a file input operator with customized splitFunc function +func (c Config) BuildWithSplitFunc( + logger *zap.SugaredLogger, emit EmitFunc, splitFunc bufio.SplitFunc) (*Manager, error) { + if err := c.validate(); err != nil { + return nil, err } - if c.FingerprintSize == 0 { - c.FingerprintSize = DefaultFingerprintSize - } else if c.FingerprintSize < MinFingerprintSize { - return nil, fmt.Errorf("`fingerprint_size` must be at least %d bytes", MinFingerprintSize) + if splitFunc == nil { + return nil, fmt.Errorf("must provide split function") } // Ensure that splitter is buildable - factory := newMultilineSplitterFactory(c.Splitter.EncodingConfig, c.Splitter.Flusher, c.Splitter.Multiline) - _, err := factory.Build(int(c.MaxLogSize)) - if err != nil { + factory := newCustomizeSplitterFactory(c.Splitter.Flusher, splitFunc) + if _, err := factory.Build(int(c.MaxLogSize)); err != nil { return nil, err } + return c.buildManager(logger, emit, factory) +} + +func (c Config) buildManager(logger *zap.SugaredLogger, emit EmitFunc, factory splitterFactory) (*Manager, error) { + if emit == nil { + return nil, fmt.Errorf("must provide emit function") + } var startAtBeginning bool switch c.StartAt { case "beginning": @@ -116,7 +109,6 @@ func (c Config) Build(logger *zap.SugaredLogger, emit EmitFunc) (*Manager, error default: return nil, fmt.Errorf("invalid start_at location '%s'", c.StartAt) } - return &Manager{ SugaredLogger: logger.With("component", "fileconsumer"), cancel: func() {}, @@ -139,3 +131,43 @@ func (c Config) Build(logger *zap.SugaredLogger, emit EmitFunc) (*Manager, error seenPaths: make(map[string]struct{}, 100), }, nil } + +func (c Config) validate() error { + if len(c.Include) == 0 { + return fmt.Errorf("required argument `include` is empty") + } + + // Ensure includes can be parsed as globs + for _, include := range c.Include { + _, err := doublestar.PathMatch(include, "matchstring") + if err != nil { + return fmt.Errorf("parse include glob: %w", err) + } + } + + // Ensure excludes can be parsed as globs + for _, exclude := range c.Exclude { + _, err := doublestar.PathMatch(exclude, "matchstring") + if err != nil { + return fmt.Errorf("parse exclude glob: %w", err) + } + } + + if c.MaxLogSize <= 0 { + return fmt.Errorf("`max_log_size` must be positive") + } + + if c.MaxConcurrentFiles <= 1 { + return fmt.Errorf("`max_concurrent_files` must be greater than 1") + } + + if c.FingerprintSize < MinFingerprintSize { + return fmt.Errorf("`fingerprint_size` must be at least %d bytes", MinFingerprintSize) + } + + _, err := c.Splitter.EncodingConfig.Build() + if err != nil { + return err + } + return nil +} diff --git a/pkg/stanza/fileconsumer/config_test.go b/pkg/stanza/fileconsumer/config_test.go index 9409f74202fb..fdca2c623356 100644 --- a/pkg/stanza/fileconsumer/config_test.go +++ b/pkg/stanza/fileconsumer/config_test.go @@ -504,3 +504,84 @@ func TestBuild(t *testing.T) { }) } } + +func TestBuildWithSplitFunc(t *testing.T) { + t.Parallel() + + basicConfig := func() *Config { + cfg := NewConfig() + cfg.Include = []string{"/var/log/testpath.*"} + cfg.Exclude = []string{"/var/log/testpath.ex*"} + cfg.PollInterval = 10 * time.Millisecond + return cfg + } + + cases := []struct { + name string + modifyBaseConfig func(*Config) + errorRequirement require.ErrorAssertionFunc + validate func(*testing.T, *Manager) + }{ + { + "Basic", + func(f *Config) {}, + require.NoError, + func(t *testing.T, f *Manager) { + require.Equal(t, f.finder.Include, []string{"/var/log/testpath.*"}) + require.Equal(t, f.pollInterval, 10*time.Millisecond) + }, + }, + { + "BadIncludeGlob", + func(f *Config) { + f.Include = []string{"["} + }, + require.Error, + nil, + }, + { + "BadExcludeGlob", + func(f *Config) { + f.Include = []string{"["} + }, + require.Error, + nil, + }, + { + "InvalidEncoding", + func(f *Config) { + f.Splitter.EncodingConfig = helper.EncodingConfig{Encoding: "UTF-3233"} + }, + require.Error, + nil, + }, + } + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + tc := tc + t.Parallel() + cfg := basicConfig() + tc.modifyBaseConfig(cfg) + + nopEmit := func(_ context.Context, _ *FileAttributes, _ []byte) {} + splitNone := func(data []byte, atEOF bool) (advance int, token []byte, err error) { + if !atEOF { + return 0, nil, nil + } + if len(data) == 0 { + return 0, nil, nil + } + return len(data), data, nil + } + + input, err := cfg.BuildWithSplitFunc(testutil.Logger(t), nopEmit, splitNone) + tc.errorRequirement(t, err) + if err != nil { + return + } + + tc.validate(t, input) + }) + } +} diff --git a/pkg/stanza/fileconsumer/splitter_factory.go b/pkg/stanza/fileconsumer/splitter_factory.go index 4691d68beeff..8fb67fce38cc 100644 --- a/pkg/stanza/fileconsumer/splitter_factory.go +++ b/pkg/stanza/fileconsumer/splitter_factory.go @@ -57,3 +57,28 @@ func (factory *multilineSplitterFactory) Build(maxLogSize int) (bufio.SplitFunc, } return splitter, nil } + +type customizeSplitterFactory struct { + Flusher helper.FlusherConfig + Splitter bufio.SplitFunc +} + +var _ splitterFactory = (*customizeSplitterFactory)(nil) + +func newCustomizeSplitterFactory( + flusher helper.FlusherConfig, + splitter bufio.SplitFunc) *customizeSplitterFactory { + return &customizeSplitterFactory{ + Flusher: flusher, + Splitter: splitter, + } +} + +// Build builds Multiline Splitter struct +func (factory *customizeSplitterFactory) Build(maxLogSize int) (bufio.SplitFunc, error) { + flusher := factory.Flusher.Build() + if flusher != nil { + return flusher.SplitFunc(factory.Splitter), nil + } + return factory.Splitter, nil +} diff --git a/pkg/stanza/fileconsumer/splitter_factory_test.go b/pkg/stanza/fileconsumer/splitter_factory_test.go index 70310afd5f8c..31ab3ed84fc6 100644 --- a/pkg/stanza/fileconsumer/splitter_factory_test.go +++ b/pkg/stanza/fileconsumer/splitter_factory_test.go @@ -15,6 +15,7 @@ package fileconsumer // import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer" import ( + "bufio" "testing" "github.com/stretchr/testify/assert" @@ -102,3 +103,57 @@ func Test_newMultilineSplitterFactory(t *testing.T) { splitter := newMultilineSplitterFactory(helper.NewEncodingConfig(), helper.NewFlusherConfig(), helper.NewMultilineConfig()) assert.NotNil(t, splitter) } + +func Test_customizeSplitterFactory_Build(t *testing.T) { + type fields struct { + Flusher helper.FlusherConfig + Splitter bufio.SplitFunc + } + type args struct { + maxLogSize int + } + tests := []struct { + name string + fields fields + args args + wantErr bool + }{ + { + name: "default configuration", + fields: fields{ + Flusher: helper.NewFlusherConfig(), + Splitter: func(data []byte, atEOF bool) (advance int, token []byte, err error) { + return len(data), data, nil + }, + }, + args: args{ + maxLogSize: defaultMaxLogSize, + }, + wantErr: false, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + factory := &customizeSplitterFactory{ + Flusher: tt.fields.Flusher, + Splitter: tt.fields.Splitter, + } + got, err := factory.Build(tt.args.maxLogSize) + if (err != nil) != tt.wantErr { + t.Errorf("Build() error = %v, wantErr %v", err, tt.wantErr) + return + } + if err == nil { + assert.NotNil(t, got) + } + }) + } +} + +func Test_newCustomizeSplitterFactory(t *testing.T) { + splitter := newCustomizeSplitterFactory(helper.NewFlusherConfig(), + func(data []byte, atEOF bool) (advance int, token []byte, err error) { + return len(data), data, nil + }) + assert.NotNil(t, splitter) +}