Skip to content

Commit

Permalink
[chore][pkg/stanza] Minor cleanup of fileconsumer setup code (open-te…
Browse files Browse the repository at this point in the history
  • Loading branch information
djaglowski authored and jayasai470 committed Dec 8, 2023
1 parent fcf373a commit 48e1777
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 44 deletions.
84 changes: 42 additions & 42 deletions pkg/stanza/fileconsumer/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,26 +149,27 @@ func (c Config) buildManager(logger *zap.SugaredLogger, emit emit.Callback, spli
return nil, err
}

readerFactory := reader.Factory{
SugaredLogger: logger.With("component", "fileconsumer"),
FromBeginning: startAtBeginning,
FingerprintSize: int(c.FingerprintSize),
MaxLogSize: int(c.MaxLogSize),
Encoding: enc,
SplitFunc: splitFunc,
TrimFunc: trimFunc,
FlushTimeout: c.FlushPeriod,
EmitFunc: emit,
IncludeFileName: c.IncludeFileName,
IncludeFilePath: c.IncludeFilePath,
IncludeFileNameResolved: c.IncludeFileNameResolved,
IncludeFilePathResolved: c.IncludeFilePathResolved,
HeaderConfig: hCfg,
DeleteAtEOF: c.DeleteAfterRead,
}

return &Manager{
SugaredLogger: logger.With("component", "fileconsumer"),
cancel: func() {},
readerFactory: reader.Factory{
SugaredLogger: logger.With("component", "fileconsumer"),
FromBeginning: startAtBeginning,
FingerprintSize: int(c.FingerprintSize),
MaxLogSize: int(c.MaxLogSize),
Encoding: enc,
SplitFunc: splitFunc,
TrimFunc: trimFunc,
FlushTimeout: c.FlushPeriod,
EmitFunc: emit,
IncludeFileName: c.IncludeFileName,
IncludeFilePath: c.IncludeFilePath,
IncludeFileNameResolved: c.IncludeFileNameResolved,
IncludeFilePathResolved: c.IncludeFilePathResolved,
HeaderConfig: hCfg,
DeleteAtEOF: c.DeleteAfterRead,
},
SugaredLogger: logger.With("component", "fileconsumer"),
readerFactory: readerFactory,
fileMatcher: fileMatcher,
pollInterval: c.PollInterval,
maxBatchFiles: c.MaxConcurrentFiles / 2,
Expand All @@ -179,50 +180,49 @@ func (c Config) buildManager(logger *zap.SugaredLogger, emit emit.Callback, spli
}

func (c Config) validate() error {
if c.DeleteAfterRead && !allowFileDeletion.IsEnabled() {
return fmt.Errorf("`delete_after_read` requires feature gate `%s`", allowFileDeletion.ID())
}

if c.Header != nil && !AllowHeaderMetadataParsing.IsEnabled() {
return fmt.Errorf("`header` requires feature gate `%s`", AllowHeaderMetadataParsing.ID())
}

if _, err := matcher.New(c.Criteria); err != nil {
return err
}

if c.MaxLogSize <= 0 {
return fmt.Errorf("`max_log_size` must be positive")
}

if c.MaxConcurrentFiles <= 1 {
return fmt.Errorf("`max_concurrent_files` must be greater than 1")
}

if c.FingerprintSize < fingerprint.MinSize {
return fmt.Errorf("`fingerprint_size` must be at least %d bytes", fingerprint.MinSize)
return fmt.Errorf("'fingerprint_size' must be at least %d bytes", fingerprint.MinSize)
}

if c.DeleteAfterRead && c.StartAt == "end" {
return fmt.Errorf("`delete_after_read` cannot be used with `start_at: end`")
if c.MaxLogSize <= 0 {
return fmt.Errorf("'max_log_size' must be positive")
}

if c.Header != nil && c.StartAt == "end" {
return fmt.Errorf("`header` cannot be specified with `start_at: end`")
if c.MaxConcurrentFiles <= 1 {
return fmt.Errorf("'max_concurrent_files' must be positive")
}

if c.MaxBatches < 0 {
return errors.New("`max_batches` must not be negative")
return errors.New("'max_batches' must not be negative")
}

enc, err := decode.LookupEncoding(c.Encoding)
if err != nil {
return err
}

if c.DeleteAfterRead {
if !allowFileDeletion.IsEnabled() {
return fmt.Errorf("'delete_after_read' requires feature gate '%s'", allowFileDeletion.ID())
}
if c.StartAt == "end" {
return fmt.Errorf("'delete_after_read' cannot be used with 'start_at: end'")
}
}

if c.Header != nil {
if !AllowHeaderMetadataParsing.IsEnabled() {
return fmt.Errorf("'header' requires feature gate '%s'", AllowHeaderMetadataParsing.ID())
}
if c.StartAt == "end" {
return fmt.Errorf("'header' cannot be specified with 'start_at: end'")
}
if _, err := header.NewConfig(c.Header.Pattern, c.Header.MetadataOperators, enc); err != nil {
return fmt.Errorf("invalid config for `header`: %w", err)
return fmt.Errorf("invalid config for 'header': %w", err)
}
}

Expand Down
6 changes: 4 additions & 2 deletions pkg/stanza/fileconsumer/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,15 +85,17 @@ func (m *Manager) closePreviousFiles() {

// Stop will stop the file monitoring process
func (m *Manager) Stop() error {
m.cancel()
if m.cancel != nil {
m.cancel()
m.cancel = nil
}
m.wg.Wait()
m.closePreviousFiles()
if m.persister != nil {
if err := checkpoint.Save(context.Background(), m.persister, m.knownFiles); err != nil {
m.Errorw("save offsets", zap.Error(err))
}
}
m.cancel = nil
return nil
}

Expand Down

0 comments on commit 48e1777

Please sign in to comment.