Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[pkg/stanza] Support to Customize bufio.SplitFunc #14605

11 changes: 11 additions & 0 deletions .chloggen/fileconsume-customize-splitfunc.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: pkg/stanza

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Support customizing the bufio.SplitFunc

# One or more tracking issues related to the change
issues: [14593]
15 changes: 15 additions & 0 deletions pkg/stanza/docs/operators/file_input.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,21 @@ use `force_flush_period` option.

Also refer to [recombine](../operators/recombine.md) operator for merging events with greater control.

#### Customised Splitter

In some cases, logs cannot be split by line or by a regular-expression pattern. In such cases, the user can supply a custom function for splitting log entries.

```go
cfg := fileconsumer.NewConfig(fileconsumer.WithCustomizedSplitter(
func(data []byte, atEOF bool) (advance int, token []byte, err error) {
// split log
...
return
}))
```

A customized split function cannot be used together with the `multiline` configuration.

### File rotation

When files are rotated and their new names are no longer captured by the `include` pattern (i.e. tailing symlink files), it could result in data loss.
Expand Down
34 changes: 24 additions & 10 deletions pkg/stanza/fileconsumer/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,18 +31,20 @@ const (

// NewConfig creates a new input config with default values
func NewConfig() *Config {
return &Config{
cfg := &Config{
IncludeFileName: true,
IncludeFilePath: false,
IncludeFileNameResolved: false,
IncludeFilePathResolved: false,
PollInterval: 200 * time.Millisecond,
Splitter: helper.NewSplitterConfig(),
StartAt: "end",
FingerprintSize: DefaultFingerprintSize,
MaxLogSize: defaultMaxLogSize,
MaxConcurrentFiles: defaultMaxConcurrentFiles,
EncodingConfig: helper.NewEncodingConfig(),
Flusher: helper.NewFlusherConfig(),
}
return cfg
}

// Config is the configuration of a file input operator
Expand All @@ -57,11 +59,12 @@ type Config struct {
FingerprintSize helper.ByteSize `mapstructure:"fingerprint_size,omitempty"`
MaxLogSize helper.ByteSize `mapstructure:"max_log_size,omitempty"`
MaxConcurrentFiles int `mapstructure:"max_concurrent_files,omitempty"`
Splitter helper.SplitterConfig `mapstructure:",squash,omitempty"`
EncodingConfig helper.EncodingConfig `mapstructure:",squash,omitempty"`
Flusher helper.FlusherConfig `mapstructure:",squash,omitempty"`
}

// Build will build a file input operator from the supplied configuration
func (c Config) Build(logger *zap.SugaredLogger, emit EmitFunc) (*Manager, error) {
func (c Config) Build(logger *zap.SugaredLogger, emit EmitFunc, opts ...Option) (*Manager, error) {
if emit == nil {
return nil, fmt.Errorf("must provide emit function")
}
Expand Down Expand Up @@ -100,8 +103,7 @@ func (c Config) Build(logger *zap.SugaredLogger, emit EmitFunc) (*Manager, error
return nil, fmt.Errorf("`fingerprint_size` must be at least %d bytes", MinFingerprintSize)
}

// Ensure that splitter is buildable
_, err := c.Splitter.Build(false, int(c.MaxLogSize))
_, err := c.EncodingConfig.Build()
if err != nil {
return nil, err
}
Expand All @@ -116,7 +118,7 @@ func (c Config) Build(logger *zap.SugaredLogger, emit EmitFunc) (*Manager, error
return nil, fmt.Errorf("invalid start_at location '%s'", c.StartAt)
}

return &Manager{
m := &Manager{
SugaredLogger: logger.With("component", "fileconsumer"),
cancel: func() {},
readerFactory: readerFactory{
Expand All @@ -126,14 +128,26 @@ func (c Config) Build(logger *zap.SugaredLogger, emit EmitFunc) (*Manager, error
maxLogSize: int(c.MaxLogSize),
emit: emit,
},
fromBeginning: startAtBeginning,
splitterConfig: c.Splitter,
fromBeginning: startAtBeginning,
splitterFactory: &defaultSplitterFactory{
EncodingConfig: c.EncodingConfig,
Flusher: c.Flusher,
},
},
finder: c.Finder,
roller: newRoller(),
pollInterval: c.PollInterval,
maxBatchFiles: c.MaxConcurrentFiles / 2,
knownFiles: make([]*Reader, 0, 10),
seenPaths: make(map[string]struct{}, 100),
}, nil
}
for _, op := range opts {
op(m)
}
// Ensure that splitter is buildable
_, err = m.readerFactory.splitterFactory.Build(int(c.MaxLogSize))
if err != nil {
return nil, err
}
return m, nil
}
108 changes: 58 additions & 50 deletions pkg/stanza/fileconsumer/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -252,40 +252,36 @@ func TestUnmarshal(t *testing.T) {
Name: "multiline_line_start_string",
Expect: func() *mockOperatorConfig {
cfg := NewConfig()
newSplit := helper.NewSplitterConfig()
newSplit.Multiline.LineStartPattern = "Start"
cfg.Splitter = newSplit
return newMockOperatorConfig(cfg)
newMultiline := helper.NewMultilineConfig()
newMultiline.LineStartPattern = "Start"
return newMockOperatorConfigWithMultiline(cfg, newMultiline)
}(),
},
{
Name: "multiline_line_start_special",
Expect: func() *mockOperatorConfig {
cfg := NewConfig()
newSplit := helper.NewSplitterConfig()
newSplit.Multiline.LineStartPattern = "%"
cfg.Splitter = newSplit
return newMockOperatorConfig(cfg)
newMultiline := helper.NewMultilineConfig()
newMultiline.LineStartPattern = "%"
return newMockOperatorConfigWithMultiline(cfg, newMultiline)
}(),
},
{
Name: "multiline_line_end_string",
Expect: func() *mockOperatorConfig {
cfg := NewConfig()
newSplit := helper.NewSplitterConfig()
newSplit.Multiline.LineEndPattern = "Start"
cfg.Splitter = newSplit
return newMockOperatorConfig(cfg)
newMultiline := helper.NewMultilineConfig()
newMultiline.LineEndPattern = "Start"
return newMockOperatorConfigWithMultiline(cfg, newMultiline)
}(),
},
{
Name: "multiline_line_end_special",
Expect: func() *mockOperatorConfig {
cfg := NewConfig()
newSplit := helper.NewSplitterConfig()
newSplit.Multiline.LineEndPattern = "%"
cfg.Splitter = newSplit
return newMockOperatorConfig(cfg)
newMultiline := helper.NewMultilineConfig()
newMultiline.LineEndPattern = "%"
return newMockOperatorConfigWithMultiline(cfg, newMultiline)
}(),
},
{
Expand Down Expand Up @@ -340,15 +336,15 @@ func TestUnmarshal(t *testing.T) {
Name: "encoding_lower",
Expect: func() *mockOperatorConfig {
cfg := NewConfig()
cfg.Splitter.EncodingConfig = helper.EncodingConfig{Encoding: "utf-16le"}
cfg.EncodingConfig = helper.EncodingConfig{Encoding: "utf-16le"}
return newMockOperatorConfig(cfg)
}(),
},
{
Name: "encoding_upper",
Expect: func() *mockOperatorConfig {
cfg := NewConfig()
cfg.Splitter.EncodingConfig = helper.EncodingConfig{Encoding: "UTF-16lE"}
cfg.EncodingConfig = helper.EncodingConfig{Encoding: "UTF-16lE"}
return newMockOperatorConfig(cfg)
}(),
},
Expand All @@ -370,12 +366,14 @@ func TestBuild(t *testing.T) {
cases := []struct {
name string
modifyBaseConfig func(*Config)
MultilineConfig helper.MultilineConfig
errorRequirement require.ErrorAssertionFunc
validate func(*testing.T, *Manager)
}{
{
"Basic",
func(f *Config) {},
helper.NewMultilineConfig(),
require.NoError,
func(t *testing.T, f *Manager) {
require.Equal(t, f.finder.Include, []string{"/var/log/testpath.*"})
Expand All @@ -387,6 +385,7 @@ func TestBuild(t *testing.T) {
func(f *Config) {
f.Include = []string{"["}
},
helper.NewMultilineConfig(),
require.Error,
nil,
},
Expand All @@ -395,90 +394,99 @@ func TestBuild(t *testing.T) {
func(f *Config) {
f.Include = []string{"["}
},
helper.NewMultilineConfig(),
require.Error,
nil,
},
{
"MultilineConfiguredStartAndEndPatterns",
func(f *Config) {
f.Splitter = helper.NewSplitterConfig()
f.Splitter.Multiline = helper.MultilineConfig{
LineEndPattern: "Exists",
LineStartPattern: "Exists",
}
func(cfg *Config) {
cfg.EncodingConfig = helper.NewEncodingConfig()
cfg.Flusher = helper.NewFlusherConfig()
},
helper.MultilineConfig{
LineEndPattern: "Exists",
LineStartPattern: "Exists",
},
require.Error,
nil,
},
{
"MultilineConfiguredStartPattern",
func(f *Config) {
f.Splitter = helper.NewSplitterConfig()
f.Splitter.Multiline = helper.MultilineConfig{
LineStartPattern: "START.*",
}
f.EncodingConfig = helper.NewEncodingConfig()
f.Flusher = helper.NewFlusherConfig()
},
helper.MultilineConfig{
LineStartPattern: "START.*",
},
require.NoError,
func(t *testing.T, f *Manager) {},
},
{
"MultilineConfiguredEndPattern",
func(f *Config) {
f.Splitter = helper.NewSplitterConfig()
f.Splitter.Multiline = helper.MultilineConfig{
LineEndPattern: "END.*",
}
f.EncodingConfig = helper.NewEncodingConfig()
f.Flusher = helper.NewFlusherConfig()
},
helper.MultilineConfig{
LineEndPattern: "END.*",
},
require.NoError,
func(t *testing.T, f *Manager) {},
},
{
"InvalidEncoding",
func(f *Config) {
f.Splitter.EncodingConfig = helper.EncodingConfig{Encoding: "UTF-3233"}
f.EncodingConfig = helper.EncodingConfig{Encoding: "UTF-3233"}
},
helper.NewMultilineConfig(),
require.Error,
nil,
},
{
"LineStartAndEnd",
func(f *Config) {
f.Splitter = helper.NewSplitterConfig()
f.Splitter.Multiline = helper.MultilineConfig{
LineStartPattern: ".*",
LineEndPattern: ".*",
}
f.EncodingConfig = helper.NewEncodingConfig()
f.Flusher = helper.NewFlusherConfig()
},
helper.MultilineConfig{
LineStartPattern: ".*",
LineEndPattern: ".*",
},
require.Error,
nil,
},
{
"NoLineStartOrEnd",
func(f *Config) {
f.Splitter = helper.NewSplitterConfig()
f.Splitter.Multiline = helper.MultilineConfig{}
f.EncodingConfig = helper.NewEncodingConfig()
f.Flusher = helper.NewFlusherConfig()
},
helper.NewMultilineConfig(),
require.NoError,
func(t *testing.T, f *Manager) {},
},
{
"InvalidLineStartRegex",
func(f *Config) {
f.Splitter = helper.NewSplitterConfig()
f.Splitter.Multiline = helper.MultilineConfig{
LineStartPattern: "(",
}
f.EncodingConfig = helper.NewEncodingConfig()
f.Flusher = helper.NewFlusherConfig()
},
helper.MultilineConfig{
LineStartPattern: "(",
},
require.Error,
nil,
},
{
"InvalidLineEndRegex",
func(f *Config) {
f.Splitter = helper.NewSplitterConfig()
f.Splitter.Multiline = helper.MultilineConfig{
LineEndPattern: "(",
}
f.EncodingConfig = helper.NewEncodingConfig()
f.Flusher = helper.NewFlusherConfig()
},
helper.MultilineConfig{
LineEndPattern: "(",
},
require.Error,
nil,
Expand All @@ -493,8 +501,8 @@ func TestBuild(t *testing.T) {
tc.modifyBaseConfig(cfg)

nopEmit := func(_ context.Context, _ *FileAttributes, _ []byte) {}

input, err := cfg.Build(testutil.Logger(t), nopEmit)
input, err := cfg.Build(testutil.Logger(t), nopEmit,
WithMultilineSplitter(cfg.EncodingConfig, cfg.Flusher, tc.MultilineConfig))
tc.errorRequirement(t, err)
if err != nil {
return
Expand Down
Loading