Skip to content

Commit

Permalink
[chore][pkg/stanza] Adjust length of knownFiles based on number of ma…
Browse files Browse the repository at this point in the history
…tches (#28646)

Follows
#28493

This adjusts the length of `knownFiles` to be roughly 4x the number of
matches per poll cycle. In other words, we will remember files for up to
4 poll cycles.

Resolves
#28567
  • Loading branch information
djaglowski authored Nov 2, 2023
1 parent 327de9f commit 4f5952a
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 7 deletions.
2 changes: 1 addition & 1 deletion pkg/stanza/fileconsumer/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ func (c Config) buildManager(logger *zap.SugaredLogger, emit emit.Callback, spli
maxBatchFiles: c.MaxConcurrentFiles / 2,
maxBatches: c.MaxBatches,
previousPollFiles: make([]*reader.Reader, 0, c.MaxConcurrentFiles/2),
knownFiles: make([]*reader.Metadata, 0, 10*c.MaxConcurrentFiles),
knownFiles: []*reader.Metadata{},
}, nil
}

Expand Down
24 changes: 18 additions & 6 deletions pkg/stanza/fileconsumer/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,26 @@ type Manager struct {

previousPollFiles []*reader.Reader
knownFiles []*reader.Metadata

// This value approximates the expected number of files which we will find in a single poll cycle.
// It is updated each poll cycle using a simple moving average calculation which assigns 20% weight
// to the most recent poll cycle.
// It is used to regulate the size of knownFiles. The goal is to allow knownFiles
// to contain checkpoints from a few previous poll cycles, but not grow unbounded.
movingAverageMatches int
}

func (m *Manager) Start(persister operator.Persister) error {
ctx, cancel := context.WithCancel(context.Background())
m.cancel = cancel

if matches, err := m.fileMatcher.MatchFiles(); err != nil {
m.Warnf("finding files: %v", err)
} else {
m.movingAverageMatches = len(matches)
m.knownFiles = make([]*reader.Metadata, 0, 4*len(matches))
}

if persister != nil {
m.persister = persister
offsets, err := checkpoint.Load(ctx, m.persister)
Expand All @@ -53,19 +67,15 @@ func (m *Manager) Start(persister operator.Persister) error {
}
}

if _, err := m.fileMatcher.MatchFiles(); err != nil {
m.Warnf("finding files: %v", err)
}

// Start polling goroutine
m.startPoller(ctx)

return nil
}

func (m *Manager) closePreviousFiles() {
if forgetNum := len(m.previousPollFiles) + len(m.knownFiles) - cap(m.knownFiles); forgetNum > 0 {
m.knownFiles = m.knownFiles[forgetNum:]
if len(m.knownFiles) > 4*m.movingAverageMatches {
m.knownFiles = m.knownFiles[m.movingAverageMatches:]
}
for _, r := range m.previousPollFiles {
m.knownFiles = append(m.knownFiles, r.Close())
Expand Down Expand Up @@ -116,6 +126,8 @@ func (m *Manager) poll(ctx context.Context) {
matches, err := m.fileMatcher.MatchFiles()
if err != nil {
m.Debugf("finding files: %v", err)
} else {
m.movingAverageMatches = (m.movingAverageMatches*3 + len(matches)) / 4
}
m.Debugf("matched files", zap.Strings("paths", matches))

Expand Down

0 comments on commit 4f5952a

Please sign in to comment.