diff --git a/pkg/stanza/fileconsumer/config.go b/pkg/stanza/fileconsumer/config.go index 70f5175e9d20..20313ee33b12 100644 --- a/pkg/stanza/fileconsumer/config.go +++ b/pkg/stanza/fileconsumer/config.go @@ -149,26 +149,27 @@ func (c Config) buildManager(logger *zap.SugaredLogger, emit emit.Callback, spli return nil, err } + readerFactory := reader.Factory{ + SugaredLogger: logger.With("component", "fileconsumer"), + FromBeginning: startAtBeginning, + FingerprintSize: int(c.FingerprintSize), + MaxLogSize: int(c.MaxLogSize), + Encoding: enc, + SplitFunc: splitFunc, + TrimFunc: trimFunc, + FlushTimeout: c.FlushPeriod, + EmitFunc: emit, + IncludeFileName: c.IncludeFileName, + IncludeFilePath: c.IncludeFilePath, + IncludeFileNameResolved: c.IncludeFileNameResolved, + IncludeFilePathResolved: c.IncludeFilePathResolved, + HeaderConfig: hCfg, + DeleteAtEOF: c.DeleteAfterRead, + } + return &Manager{ - SugaredLogger: logger.With("component", "fileconsumer"), - cancel: func() {}, - readerFactory: reader.Factory{ - SugaredLogger: logger.With("component", "fileconsumer"), - FromBeginning: startAtBeginning, - FingerprintSize: int(c.FingerprintSize), - MaxLogSize: int(c.MaxLogSize), - Encoding: enc, - SplitFunc: splitFunc, - TrimFunc: trimFunc, - FlushTimeout: c.FlushPeriod, - EmitFunc: emit, - IncludeFileName: c.IncludeFileName, - IncludeFilePath: c.IncludeFilePath, - IncludeFileNameResolved: c.IncludeFileNameResolved, - IncludeFilePathResolved: c.IncludeFilePathResolved, - HeaderConfig: hCfg, - DeleteAtEOF: c.DeleteAfterRead, - }, + SugaredLogger: logger.With("component", "fileconsumer"), + readerFactory: readerFactory, fileMatcher: fileMatcher, pollInterval: c.PollInterval, maxBatchFiles: c.MaxConcurrentFiles / 2, @@ -179,40 +180,24 @@ func (c Config) buildManager(logger *zap.SugaredLogger, emit emit.Callback, spli } func (c Config) validate() error { - if c.DeleteAfterRead && !allowFileDeletion.IsEnabled() { - return fmt.Errorf("`delete_after_read` requires feature gate `%s`", allowFileDeletion.ID()) - } - - if c.Header != nil && !AllowHeaderMetadataParsing.IsEnabled() { - return fmt.Errorf("`header` requires feature gate `%s`", AllowHeaderMetadataParsing.ID()) - } - if _, err := matcher.New(c.Criteria); err != nil { return err } - if c.MaxLogSize <= 0 { - return fmt.Errorf("`max_log_size` must be positive") - } - - if c.MaxConcurrentFiles <= 1 { - return fmt.Errorf("`max_concurrent_files` must be greater than 1") - } - if c.FingerprintSize < fingerprint.MinSize { - return fmt.Errorf("`fingerprint_size` must be at least %d bytes", fingerprint.MinSize) + return fmt.Errorf("'fingerprint_size' must be at least %d bytes", fingerprint.MinSize) } - if c.DeleteAfterRead && c.StartAt == "end" { - return fmt.Errorf("`delete_after_read` cannot be used with `start_at: end`") + if c.MaxLogSize <= 0 { + return fmt.Errorf("'max_log_size' must be positive") } - if c.Header != nil && c.StartAt == "end" { - return fmt.Errorf("`header` cannot be specified with `start_at: end`") + if c.MaxConcurrentFiles <= 1 { + return fmt.Errorf("'max_concurrent_files' must be positive") } if c.MaxBatches < 0 { - return errors.New("`max_batches` must not be negative") + return errors.New("'max_batches' must not be negative") } enc, err := decode.LookupEncoding(c.Encoding) @@ -220,9 +205,24 @@ func (c Config) validate() error { return err } + if c.DeleteAfterRead { + if !allowFileDeletion.IsEnabled() { + return fmt.Errorf("'delete_after_read' requires feature gate '%s'", allowFileDeletion.ID()) + } + if c.StartAt == "end" { + return fmt.Errorf("'delete_after_read' cannot be used with 'start_at: end'") + } + } + if c.Header != nil { + if !AllowHeaderMetadataParsing.IsEnabled() { + return fmt.Errorf("'header' requires feature gate '%s'", AllowHeaderMetadataParsing.ID()) + } + if c.StartAt == "end" { + return fmt.Errorf("'header' cannot be specified with 'start_at: end'") + } if _, err := header.NewConfig(c.Header.Pattern, c.Header.MetadataOperators, enc); err != nil { - return fmt.Errorf("invalid config for `header`: %w", err) + return fmt.Errorf("invalid config for 'header': %w", err) } } diff --git a/pkg/stanza/fileconsumer/file.go b/pkg/stanza/fileconsumer/file.go index 8e284d3d6950..085389685017 100644 --- a/pkg/stanza/fileconsumer/file.go +++ b/pkg/stanza/fileconsumer/file.go @@ -85,7 +85,10 @@ func (m *Manager) closePreviousFiles() { // Stop will stop the file monitoring process func (m *Manager) Stop() error { - m.cancel() + if m.cancel != nil { + m.cancel() + m.cancel = nil + } m.wg.Wait() m.closePreviousFiles() if m.persister != nil { @@ -93,7 +96,6 @@ func (m *Manager) Stop() error { m.Errorw("save offsets", zap.Error(err)) } } - m.cancel = nil return nil }