Skip to content

Commit

Permalink
[pkg/stanza] Support to Customize bufio.SplitFunc (open-telemetry#16272)
Browse files Browse the repository at this point in the history
* Add a new method, BuildWithSplitFunc, which accepts a user-defined splitFunc directly.
  • Loading branch information
atingchen authored and JaredTan95 committed Nov 21, 2022
1 parent 7226fd6 commit f0edfb4
Show file tree
Hide file tree
Showing 5 changed files with 236 additions and 33 deletions.
10 changes: 10 additions & 0 deletions .chloggen/stanza-support-customize-splitter.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: pkg/stanza

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: "Support to Customize bufio.SplitFunc"

# One or more tracking issues related to the change
issues: [14593]
98 changes: 65 additions & 33 deletions pkg/stanza/fileconsumer/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package fileconsumer // import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer"

import (
"bufio"
"fmt"
"time"

Expand Down Expand Up @@ -62,51 +63,43 @@ type Config struct {

// Build will build a file input operator from the supplied configuration
func (c Config) Build(logger *zap.SugaredLogger, emit EmitFunc) (*Manager, error) {
if emit == nil {
return nil, fmt.Errorf("must provide emit function")
}

if len(c.Include) == 0 {
return nil, fmt.Errorf("required argument `include` is empty")
}

// Ensure includes can be parsed as globs
for _, include := range c.Include {
_, err := doublestar.PathMatch(include, "matchstring")
if err != nil {
return nil, fmt.Errorf("parse include glob: %w", err)
}
if err := c.validate(); err != nil {
return nil, err
}

// Ensure excludes can be parsed as globs
for _, exclude := range c.Exclude {
_, err := doublestar.PathMatch(exclude, "matchstring")
if err != nil {
return nil, fmt.Errorf("parse exclude glob: %w", err)
}
// Ensure that splitter is buildable
factory := newMultilineSplitterFactory(c.Splitter.EncodingConfig, c.Splitter.Flusher, c.Splitter.Multiline)
if _, err := factory.Build(int(c.MaxLogSize)); err != nil {
return nil, err
}

if c.MaxLogSize <= 0 {
return nil, fmt.Errorf("`max_log_size` must be positive")
}
return c.buildManager(logger, emit, factory)
}

if c.MaxConcurrentFiles <= 1 {
return nil, fmt.Errorf("`max_concurrent_files` must be greater than 1")
// BuildWithSplitFunc will build a file input operator that uses the caller-supplied splitFunc.
func (c Config) BuildWithSplitFunc(
logger *zap.SugaredLogger, emit EmitFunc, splitFunc bufio.SplitFunc) (*Manager, error) {
if err := c.validate(); err != nil {
return nil, err
}

if c.FingerprintSize == 0 {
c.FingerprintSize = DefaultFingerprintSize
} else if c.FingerprintSize < MinFingerprintSize {
return nil, fmt.Errorf("`fingerprint_size` must be at least %d bytes", MinFingerprintSize)
if splitFunc == nil {
return nil, fmt.Errorf("must provide split function")
}

// Ensure that splitter is buildable
factory := newMultilineSplitterFactory(c.Splitter.EncodingConfig, c.Splitter.Flusher, c.Splitter.Multiline)
_, err := factory.Build(int(c.MaxLogSize))
if err != nil {
factory := newCustomizeSplitterFactory(c.Splitter.Flusher, splitFunc)
if _, err := factory.Build(int(c.MaxLogSize)); err != nil {
return nil, err
}

return c.buildManager(logger, emit, factory)
}

func (c Config) buildManager(logger *zap.SugaredLogger, emit EmitFunc, factory splitterFactory) (*Manager, error) {
if emit == nil {
return nil, fmt.Errorf("must provide emit function")
}
var startAtBeginning bool
switch c.StartAt {
case "beginning":
Expand All @@ -116,7 +109,6 @@ func (c Config) Build(logger *zap.SugaredLogger, emit EmitFunc) (*Manager, error
default:
return nil, fmt.Errorf("invalid start_at location '%s'", c.StartAt)
}

return &Manager{
SugaredLogger: logger.With("component", "fileconsumer"),
cancel: func() {},
Expand All @@ -139,3 +131,43 @@ func (c Config) Build(logger *zap.SugaredLogger, emit EmitFunc) (*Manager, error
seenPaths: make(map[string]struct{}, 100),
}, nil
}

// validate checks the include/exclude globs, the size limits and the encoding
// of the configuration. It returns the first problem found, or nil when every
// check passes.
func (c Config) validate() error {
	if len(c.Include) == 0 {
		return fmt.Errorf("required argument `include` is empty")
	}

	// Every include pattern must be a well-formed glob.
	for _, pattern := range c.Include {
		if _, err := doublestar.PathMatch(pattern, "matchstring"); err != nil {
			return fmt.Errorf("parse include glob: %w", err)
		}
	}

	// Every exclude pattern must be a well-formed glob.
	for _, pattern := range c.Exclude {
		if _, err := doublestar.PathMatch(pattern, "matchstring"); err != nil {
			return fmt.Errorf("parse exclude glob: %w", err)
		}
	}

	// Numeric limits are checked in a fixed order so the first violation wins.
	switch {
	case c.MaxLogSize <= 0:
		return fmt.Errorf("`max_log_size` must be positive")
	case c.MaxConcurrentFiles <= 1:
		return fmt.Errorf("`max_concurrent_files` must be greater than 1")
	case c.FingerprintSize < MinFingerprintSize:
		return fmt.Errorf("`fingerprint_size` must be at least %d bytes", MinFingerprintSize)
	}

	// The configured encoding must be buildable.
	if _, err := c.Splitter.EncodingConfig.Build(); err != nil {
		return err
	}
	return nil
}
81 changes: 81 additions & 0 deletions pkg/stanza/fileconsumer/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -504,3 +504,84 @@ func TestBuild(t *testing.T) {
})
}
}

// TestBuildWithSplitFunc verifies that Config.BuildWithSplitFunc builds a
// Manager from a valid configuration plus a custom split function, and that it
// rejects malformed include globs, malformed exclude globs and unknown
// encodings.
func TestBuildWithSplitFunc(t *testing.T) {
	t.Parallel()

	// basicConfig returns a fresh, valid configuration for each subtest.
	basicConfig := func() *Config {
		cfg := NewConfig()
		cfg.Include = []string{"/var/log/testpath.*"}
		cfg.Exclude = []string{"/var/log/testpath.ex*"}
		cfg.PollInterval = 10 * time.Millisecond
		return cfg
	}

	cases := []struct {
		name             string
		modifyBaseConfig func(*Config)
		errorRequirement require.ErrorAssertionFunc
		validate         func(*testing.T, *Manager)
	}{
		{
			"Basic",
			func(f *Config) {},
			require.NoError,
			func(t *testing.T, f *Manager) {
				// testify expects (expected, actual) so failure output is labelled correctly.
				require.Equal(t, []string{"/var/log/testpath.*"}, f.finder.Include)
				require.Equal(t, 10*time.Millisecond, f.pollInterval)
			},
		},
		{
			"BadIncludeGlob",
			func(f *Config) {
				f.Include = []string{"["}
			},
			require.Error,
			nil,
		},
		{
			"BadExcludeGlob",
			func(f *Config) {
				// Only the exclude pattern is malformed, so this case exercises
				// exclude validation instead of repeating BadIncludeGlob.
				f.Exclude = []string{"["}
			},
			require.Error,
			nil,
		},
		{
			"InvalidEncoding",
			func(f *Config) {
				f.Splitter.EncodingConfig = helper.EncodingConfig{Encoding: "UTF-3233"}
			},
			require.Error,
			nil,
		},
	}

	// The emit and split functions are stateless, so all subtests share them.
	nopEmit := func(_ context.Context, _ *FileAttributes, _ []byte) {}
	// splitNone emits the whole input as a single token once EOF is reached.
	splitNone := func(data []byte, atEOF bool) (advance int, token []byte, err error) {
		if !atEOF {
			return 0, nil, nil
		}
		if len(data) == 0 {
			return 0, nil, nil
		}
		return len(data), data, nil
	}

	for _, tc := range cases {
		tc := tc // capture the loop variable for the parallel subtest
		t.Run(tc.name, func(t *testing.T) {
			t.Parallel()
			cfg := basicConfig()
			tc.modifyBaseConfig(cfg)

			input, err := cfg.BuildWithSplitFunc(testutil.Logger(t), nopEmit, splitNone)
			tc.errorRequirement(t, err)
			if err != nil {
				// Error cases carry no validate callback.
				return
			}

			tc.validate(t, input)
		})
	}
}
25 changes: 25 additions & 0 deletions pkg/stanza/fileconsumer/splitter_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,3 +57,28 @@ func (factory *multilineSplitterFactory) Build(maxLogSize int) (bufio.SplitFunc,
}
return splitter, nil
}

// customizeSplitterFactory is a splitterFactory whose Build method hands back a
// caller-supplied split function instead of constructing one from configuration.
type customizeSplitterFactory struct {
// Flusher is the flusher configuration; when it builds a non-nil flusher, Build wraps Splitter with it.
Flusher helper.FlusherConfig
// Splitter is the split function returned (possibly wrapped) by Build.
Splitter bufio.SplitFunc
}

// Compile-time check that *customizeSplitterFactory satisfies splitterFactory.
var _ splitterFactory = (*customizeSplitterFactory)(nil)

// newCustomizeSplitterFactory returns a factory that stores the given flusher
// configuration and the caller-supplied split function.
func newCustomizeSplitterFactory(flusher helper.FlusherConfig, splitter bufio.SplitFunc) *customizeSplitterFactory {
	factory := new(customizeSplitterFactory)
	factory.Flusher = flusher
	factory.Splitter = splitter
	return factory
}

// Build returns the split function held by the factory. When the flusher
// configuration yields a non-nil flusher, the split function is wrapped by that
// flusher; otherwise it is returned unchanged. maxLogSize is not used by this
// implementation, and the returned error is always nil.
func (factory *customizeSplitterFactory) Build(maxLogSize int) (bufio.SplitFunc, error) {
	if flusher := factory.Flusher.Build(); flusher != nil {
		return flusher.SplitFunc(factory.Splitter), nil
	}
	return factory.Splitter, nil
}
55 changes: 55 additions & 0 deletions pkg/stanza/fileconsumer/splitter_factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package fileconsumer // import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer"

import (
"bufio"
"testing"

"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -102,3 +103,57 @@ func Test_newMultilineSplitterFactory(t *testing.T) {
splitter := newMultilineSplitterFactory(helper.NewEncodingConfig(), helper.NewFlusherConfig(), helper.NewMultilineConfig())
assert.NotNil(t, splitter)
}

// Test_customizeSplitterFactory_Build checks that Build on a
// customizeSplitterFactory succeeds and yields a non-nil split function.
func Test_customizeSplitterFactory_Build(t *testing.T) {
	cases := []struct {
		name       string
		flusher    helper.FlusherConfig
		splitter   bufio.SplitFunc
		maxLogSize int
		wantErr    bool
	}{
		{
			name:    "default configuration",
			flusher: helper.NewFlusherConfig(),
			// Pass-through splitter: consumes and emits everything it is given.
			splitter: func(data []byte, atEOF bool) (advance int, token []byte, err error) {
				return len(data), data, nil
			},
			maxLogSize: defaultMaxLogSize,
			wantErr:    false,
		},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			factory := &customizeSplitterFactory{
				Flusher:  tc.flusher,
				Splitter: tc.splitter,
			}
			got, err := factory.Build(tc.maxLogSize)

			switch {
			case (err != nil) != tc.wantErr:
				// Unexpected error state; nothing further to check.
				t.Errorf("Build() error = %v, wantErr %v", err, tc.wantErr)
			case err == nil:
				assert.NotNil(t, got)
			}
		})
	}
}

// Test_newCustomizeSplitterFactory checks that the constructor returns a
// non-nil factory.
func Test_newCustomizeSplitterFactory(t *testing.T) {
	// Pass-through splitter: consumes and emits everything it is given.
	passthrough := func(data []byte, atEOF bool) (advance int, token []byte, err error) {
		return len(data), data, nil
	}

	factory := newCustomizeSplitterFactory(helper.NewFlusherConfig(), passthrough)
	assert.NotNil(t, factory)
}

0 comments on commit f0edfb4

Please sign in to comment.