Skip to content

Commit

Permalink
[chore][pkg/stanza] Remove reader config (open-telemetry#29668)
Browse files Browse the repository at this point in the history
`reader.Config` contained many fields which were only relevant to
`reader.Factory`. Once moving these fields to `reader.Factory`, it
became apparent that the `reader.Reader` struct only needs access to a
few specific fields (`fingerprintSize`, `maxLogSize`, `deleteAtEOF`,
`emitFunc`). This also contains some corresponding cleanup of the fields
within the factory struct.
  • Loading branch information
djaglowski authored Dec 6, 2023
1 parent 10a79dc commit d7b1e26
Show file tree
Hide file tree
Showing 6 changed files with 115 additions and 116 deletions.
70 changes: 31 additions & 39 deletions pkg/stanza/fileconsumer/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,9 @@ import (
)

const (
defaultMaxLogSize = 1024 * 1024
defaultMaxConcurrentFiles = 1024
defaultEncoding = "utf-8"
defaultPollInterval = 200 * time.Millisecond
defaultFlushPeriod = 500 * time.Millisecond
)

var allowFileDeletion = featuregate.GlobalRegistry().MustRegister(
Expand All @@ -50,40 +48,36 @@ var AllowHeaderMetadataParsing = featuregate.GlobalRegistry().MustRegister(
// NewConfig creates a new input config with default values
func NewConfig() *Config {
return &Config{
IncludeFileName: true,
IncludeFilePath: false,
IncludeFileNameResolved: false,
IncludeFilePathResolved: false,
PollInterval: defaultPollInterval,
Encoding: defaultEncoding,
StartAt: "end",
FingerprintSize: fingerprint.DefaultSize,
MaxLogSize: defaultMaxLogSize,
MaxConcurrentFiles: defaultMaxConcurrentFiles,
MaxBatches: 0,
FlushPeriod: defaultFlushPeriod,
PollInterval: defaultPollInterval,
MaxConcurrentFiles: defaultMaxConcurrentFiles,
StartAt: "end",
FingerprintSize: fingerprint.DefaultSize,
MaxLogSize: reader.DefaultMaxLogSize,
Encoding: defaultEncoding,
FlushPeriod: reader.DefaultFlushPeriod,
IncludeFileName: true,
}
}

// Config is the configuration of a file input operator
type Config struct {
matcher.Criteria `mapstructure:",squash"`
IncludeFileName bool `mapstructure:"include_file_name,omitempty"`
IncludeFilePath bool `mapstructure:"include_file_path,omitempty"`
IncludeFileNameResolved bool `mapstructure:"include_file_name_resolved,omitempty"`
IncludeFilePathResolved bool `mapstructure:"include_file_path_resolved,omitempty"`
PollInterval time.Duration `mapstructure:"poll_interval,omitempty"`
MaxConcurrentFiles int `mapstructure:"max_concurrent_files,omitempty"`
MaxBatches int `mapstructure:"max_batches,omitempty"`
StartAt string `mapstructure:"start_at,omitempty"`
FingerprintSize helper.ByteSize `mapstructure:"fingerprint_size,omitempty"`
MaxLogSize helper.ByteSize `mapstructure:"max_log_size,omitempty"`
MaxConcurrentFiles int `mapstructure:"max_concurrent_files,omitempty"`
MaxBatches int `mapstructure:"max_batches,omitempty"`
DeleteAfterRead bool `mapstructure:"delete_after_read,omitempty"`
Encoding string `mapstructure:"encoding,omitempty"`
SplitConfig split.Config `mapstructure:"multiline,omitempty"`
TrimConfig trim.Config `mapstructure:",squash,omitempty"`
Encoding string `mapstructure:"encoding,omitempty"`
FlushPeriod time.Duration `mapstructure:"force_flush_period,omitempty"`
IncludeFileName bool `mapstructure:"include_file_name,omitempty"`
IncludeFilePath bool `mapstructure:"include_file_path,omitempty"`
IncludeFileNameResolved bool `mapstructure:"include_file_name_resolved,omitempty"`
IncludeFilePathResolved bool `mapstructure:"include_file_path_resolved,omitempty"`
Header *HeaderConfig `mapstructure:"header,omitempty"`
DeleteAfterRead bool `mapstructure:"delete_after_read,omitempty"`
}

type HeaderConfig struct {
Expand Down Expand Up @@ -159,23 +153,21 @@ func (c Config) buildManager(logger *zap.SugaredLogger, emit emit.Callback, spli
SugaredLogger: logger.With("component", "fileconsumer"),
cancel: func() {},
readerFactory: reader.Factory{
SugaredLogger: logger.With("component", "fileconsumer"),
Config: &reader.Config{
FingerprintSize: int(c.FingerprintSize),
MaxLogSize: int(c.MaxLogSize),
Emit: emit,
IncludeFileName: c.IncludeFileName,
IncludeFilePath: c.IncludeFilePath,
IncludeFileNameResolved: c.IncludeFileNameResolved,
IncludeFilePathResolved: c.IncludeFilePathResolved,
DeleteAtEOF: c.DeleteAfterRead,
FlushTimeout: c.FlushPeriod,
},
FromBeginning: startAtBeginning,
Encoding: enc,
SplitFunc: splitFunc,
TrimFunc: trimFunc,
HeaderConfig: hCfg,
SugaredLogger: logger.With("component", "fileconsumer"),
FromBeginning: startAtBeginning,
FingerprintSize: int(c.FingerprintSize),
MaxLogSize: int(c.MaxLogSize),
Encoding: enc,
SplitFunc: splitFunc,
TrimFunc: trimFunc,
FlushTimeout: c.FlushPeriod,
EmitFunc: emit,
IncludeFileName: c.IncludeFileName,
IncludeFilePath: c.IncludeFilePath,
IncludeFileNameResolved: c.IncludeFileNameResolved,
IncludeFilePathResolved: c.IncludeFilePathResolved,
HeaderConfig: hCfg,
DeleteAtEOF: c.DeleteAfterRead,
},
fileMatcher: fileMatcher,
pollInterval: c.PollInterval,
Expand Down
15 changes: 8 additions & 7 deletions pkg/stanza/fileconsumer/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/emittest"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/fingerprint"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/reader"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/matcher"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/helper"
Expand All @@ -26,17 +27,17 @@ import (

func TestNewConfig(t *testing.T) {
cfg := NewConfig()
assert.Equal(t, 200*time.Millisecond, cfg.PollInterval)
assert.Equal(t, defaultMaxConcurrentFiles, cfg.MaxConcurrentFiles)
assert.Equal(t, "end", cfg.StartAt)
assert.Equal(t, fingerprint.DefaultSize, int(cfg.FingerprintSize))
assert.Equal(t, defaultEncoding, cfg.Encoding)
assert.Equal(t, reader.DefaultMaxLogSize, int(cfg.MaxLogSize))
assert.Equal(t, reader.DefaultFlushPeriod, cfg.FlushPeriod)
assert.True(t, cfg.IncludeFileName)
assert.False(t, cfg.IncludeFilePath)
assert.False(t, cfg.IncludeFileNameResolved)
assert.False(t, cfg.IncludeFilePathResolved)
assert.Equal(t, "end", cfg.StartAt)
assert.Equal(t, 200*time.Millisecond, cfg.PollInterval)
assert.Equal(t, fingerprint.DefaultSize, int(cfg.FingerprintSize))
assert.Equal(t, defaultEncoding, cfg.Encoding)
assert.Equal(t, defaultMaxLogSize, int(cfg.MaxLogSize))
assert.Equal(t, defaultMaxConcurrentFiles, cfg.MaxConcurrentFiles)
assert.Equal(t, defaultFlushPeriod, cfg.FlushPeriod)
}

func TestUnmarshal(t *testing.T) {
Expand Down
7 changes: 4 additions & 3 deletions pkg/stanza/fileconsumer/file_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/attrs"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/emittest"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/filetest"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/reader"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/matcher"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/helper"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/testutil"
Expand Down Expand Up @@ -49,7 +50,7 @@ func TestDefaultBehaviors(t *testing.T) {
}()

// Should not emit the pre-existing token, even after flush period
sink.ExpectNoCallsUntil(t, defaultFlushPeriod)
sink.ExpectNoCallsUntil(t, reader.DefaultFlushPeriod)

// Complete token should be emitted quickly
filetest.WriteString(t, temp, " testlog2 \n")
Expand All @@ -60,8 +61,8 @@ func TestDefaultBehaviors(t *testing.T) {

// Incomplete token should not be emitted until after flush period
filetest.WriteString(t, temp, " testlog3 ")
sink.ExpectNoCallsUntil(t, defaultFlushPeriod/2)
time.Sleep(defaultFlushPeriod)
sink.ExpectNoCallsUntil(t, reader.DefaultFlushPeriod/2)
time.Sleep(reader.DefaultFlushPeriod)

token, attributes = sink.NextCall(t)
assert.Equal(t, []byte("testlog3"), token)
Expand Down
63 changes: 40 additions & 23 deletions pkg/stanza/fileconsumer/internal/reader/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,63 +15,80 @@ import (

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/attrs"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/decode"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/emit"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/fingerprint"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/header"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/flush"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/trim"
)

const (
DefaultMaxLogSize = 1024 * 1024
DefaultFlushPeriod = 500 * time.Millisecond
)

type Factory struct {
*zap.SugaredLogger
Config *Config
FromBeginning bool
Encoding encoding.Encoding
HeaderConfig *header.Config
SplitFunc bufio.SplitFunc
TrimFunc trim.Func
HeaderConfig *header.Config
FromBeginning bool
FingerprintSize int
MaxLogSize int
Encoding encoding.Encoding
SplitFunc bufio.SplitFunc
TrimFunc trim.Func
FlushTimeout time.Duration
EmitFunc emit.Callback
IncludeFileName bool
IncludeFilePath bool
IncludeFileNameResolved bool
IncludeFilePathResolved bool
DeleteAtEOF bool
}

func (f *Factory) NewFingerprint(file *os.File) (*fingerprint.Fingerprint, error) {
return fingerprint.New(file, f.Config.FingerprintSize)
return fingerprint.New(file, f.FingerprintSize)
}

func (f *Factory) NewReader(file *os.File, fp *fingerprint.Fingerprint) (*Reader, error) {
m := &Metadata{Fingerprint: fp, FileAttributes: map[string]any{}}
if f.Config.FlushTimeout > 0 {
if f.FlushTimeout > 0 {
m.FlushState = &flush.State{LastDataChange: time.Now()}
}
return f.NewReaderFromMetadata(file, m)
}

func (f *Factory) NewReaderFromMetadata(file *os.File, m *Metadata) (r *Reader, err error) {
r = &Reader{
Config: f.Config,
Metadata: m,
file: file,
fileName: file.Name(),
logger: f.SugaredLogger.With("path", file.Name()),
decoder: decode.New(f.Encoding),
lineSplitFunc: f.SplitFunc,
Metadata: m,
logger: f.SugaredLogger.With("path", file.Name()),
file: file,
fileName: file.Name(),
fingerprintSize: f.FingerprintSize,
maxLogSize: f.MaxLogSize,
decoder: decode.New(f.Encoding),
lineSplitFunc: f.SplitFunc,
deleteAtEOF: f.DeleteAtEOF,
}

flushFunc := m.FlushState.Func(f.SplitFunc, f.Config.FlushTimeout)
r.lineSplitFunc = trim.WithFunc(trim.ToLength(flushFunc, f.Config.MaxLogSize), f.TrimFunc)
flushFunc := m.FlushState.Func(f.SplitFunc, f.FlushTimeout)
r.lineSplitFunc = trim.WithFunc(trim.ToLength(flushFunc, f.MaxLogSize), f.TrimFunc)

if !f.FromBeginning {
if err = r.offsetToEnd(); err != nil {
return nil, err
}
}

r.emitFunc = f.EmitFunc
if f.HeaderConfig == nil || m.HeaderFinalized {
r.splitFunc = r.lineSplitFunc
r.processFunc = f.Config.Emit
r.processFunc = r.emitFunc
} else {
r.splitFunc = f.HeaderConfig.SplitFunc
r.headerReader, err = header.NewReader(f.SugaredLogger, *f.HeaderConfig)
if err != nil {
return nil, err
}
r.splitFunc = f.HeaderConfig.SplitFunc
r.processFunc = r.headerReader.Process
}

Expand All @@ -91,22 +108,22 @@ func (f *Factory) NewReaderFromMetadata(file *os.File, m *Metadata) (r *Reader,
f.Errorf("resolve abs: %w", err)
}

if f.Config.IncludeFileName {
if f.IncludeFileName {
r.FileAttributes[attrs.LogFileName] = filepath.Base(r.fileName)
} else if r.FileAttributes[attrs.LogFileName] != nil {
delete(r.FileAttributes, attrs.LogFileName)
}
if f.Config.IncludeFilePath {
if f.IncludeFilePath {
r.FileAttributes[attrs.LogFilePath] = r.fileName
} else if r.FileAttributes[attrs.LogFilePath] != nil {
delete(r.FileAttributes, attrs.LogFilePath)
}
if f.Config.IncludeFileNameResolved {
if f.IncludeFileNameResolved {
r.FileAttributes[attrs.LogFileNameResolved] = filepath.Base(abs)
} else if r.FileAttributes[attrs.LogFileNameResolved] != nil {
delete(r.FileAttributes, attrs.LogFileNameResolved)
}
if f.Config.IncludeFilePathResolved {
if f.IncludeFilePathResolved {
r.FileAttributes[attrs.LogFilePathResolved] = abs
} else if r.FileAttributes[attrs.LogFilePathResolved] != nil {
delete(r.FileAttributes, attrs.LogFilePathResolved)
Expand Down
24 changes: 11 additions & 13 deletions pkg/stanza/fileconsumer/internal/reader/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ const (

func testFactory(t *testing.T, opts ...testFactoryOpt) (*Factory, *emittest.Sink) {
cfg := &testFactoryCfg{
fingerprintSize: fingerprint.DefaultSize,
fromBeginning: true,
fingerprintSize: fingerprint.DefaultSize,
maxLogSize: defaultMaxLogSize,
encoding: unicode.UTF8,
trimFunc: trim.Whitespace,
Expand All @@ -42,25 +42,23 @@ func testFactory(t *testing.T, opts ...testFactoryOpt) (*Factory, *emittest.Sink

sink := emittest.NewSink(emittest.WithCallBuffer(cfg.sinkCallBufferSize))
return &Factory{
SugaredLogger: testutil.Logger(t),
Config: &Config{
FingerprintSize: cfg.fingerprintSize,
MaxLogSize: cfg.maxLogSize,
FlushTimeout: cfg.flushPeriod,
Emit: sink.Callback,
},
FromBeginning: cfg.fromBeginning,
Encoding: cfg.encoding,
SplitFunc: splitFunc,
TrimFunc: cfg.trimFunc,
SugaredLogger: testutil.Logger(t),
FromBeginning: cfg.fromBeginning,
FingerprintSize: cfg.fingerprintSize,
MaxLogSize: cfg.maxLogSize,
Encoding: cfg.encoding,
SplitFunc: splitFunc,
TrimFunc: cfg.trimFunc,
FlushTimeout: cfg.flushPeriod,
EmitFunc: sink.Callback,
}, sink
}

type testFactoryOpt func(*testFactoryCfg)

type testFactoryCfg struct {
fingerprintSize int
fromBeginning bool
fingerprintSize int
maxLogSize int
encoding encoding.Encoding
splitCfg split.Config
Expand Down
Loading

0 comments on commit d7b1e26

Please sign in to comment.