[pkg/stanza/fileconsumer] Add ability to read files asynchronously #23056
Will add a changelog entry. @djaglowski, please review it!
@VihasMakwana, thanks for continuing on this. I still want to be very careful here but this is looking like a much simpler PR than we had before.
pkg/stanza/fileconsumer/file_test.go
Outdated
if useThreadPool.IsEnabled() {
	operator, emitCalls = buildTestManagerWithOptions(t, cfg, withReaderChan())
}
We could probably make better use of the options pattern here, since we are doing this everywhere. Maybe the options can be set in TestMain, and then we can always just call buildTestManager(t, cfg, options...).
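The suggestion above could look roughly like the following sketch. Only withReaderChan comes from the diff; managerConfig, testOption, defaultOptions, and buildConfig are hypothetical names used for illustration, and the option body is assumed.

```go
package main

import "fmt"

// managerConfig is a hypothetical stand-in for the test manager's settings.
type managerConfig struct {
	useReaderChan bool
}

// testOption mutates the hypothetical config before the manager is built.
type testOption func(*managerConfig)

// withReaderChan mirrors the option name used in the diff; its body is assumed.
func withReaderChan() testOption {
	return func(c *managerConfig) { c.useReaderChan = true }
}

// defaultOptions would be populated once, for example from TestMain,
// depending on whether the feature gate is enabled.
var defaultOptions []testOption

// buildConfig applies the shared defaults first, then any per-test options.
func buildConfig(opts ...testOption) managerConfig {
	cfg := managerConfig{}
	all := append(append([]testOption{}, defaultOptions...), opts...)
	for _, opt := range all {
		opt(&cfg)
	}
	return cfg
}

func main() {
	// Simulates what TestMain might do when the gate is enabled.
	defaultOptions = []testOption{withReaderChan()}
	fmt.Println(buildConfig().useReaderChan)
}
```

With the defaults set in one place, individual tests no longer need their own gate check before building the manager.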
package fileconsumer

type Trie struct {
I think we need thorough tests for the trie itself. What do you think about adding the trie and dedicated tests to fileconsumer/internal in a separate PR?
Yeah, we can do that. It will reduce the complexity of this PR.
f.rwLock.Lock()
defer f.rwLock.Unlock()
We'll have to ensure this doesn't impact performance when the gate is not enabled. If benchmarks can show it's not an issue, that's fine but otherwise can we just check the gate?
Sure
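One way to "just check the gate" is sketched below, assuming the gate value is captured once in a boolean field so the synchronous path never touches the mutex. fingerprintStore, concurrent, and add are hypothetical names, not types from the PR.

```go
package main

import (
	"fmt"
	"sync"
)

// fingerprintStore is a hypothetical holder for state shared between readers.
type fingerprintStore struct {
	concurrent bool // assumed to be set from the feature gate at construction
	mu         sync.RWMutex
	seen       map[string]struct{}
}

func newFingerprintStore(concurrent bool) *fingerprintStore {
	return &fingerprintStore{concurrent: concurrent, seen: map[string]struct{}{}}
}

// add takes the lock only when concurrent access is possible, so the
// gate-disabled path pays for a single branch instead of a lock/unlock pair.
func (s *fingerprintStore) add(fp string) {
	if s.concurrent {
		s.mu.Lock()
		defer s.mu.Unlock()
	}
	s.seen[fp] = struct{}{}
}

func main() {
	s := newFingerprintStore(false)
	s.add("abc")
	fmt.Println(len(s.seen))
}
```

Whether the unconditional lock is actually measurable would still need to be shown with benchmarks, as the comment above asks.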
if rt.enableThreadPool {
	t.Cleanup(func() {
		require.NoError(t, featuregate.GlobalRegistry().Set("filelog.useThreadPool", false))
	})
	require.NoError(t, featuregate.GlobalRegistry().Set("filelog.useThreadPool", true))
}
Does this test require its own management of the gate? Isn't it covered in TestMain?
Yes, because they are separate packages. TestMain will only cover the fileconsumer package.
pkg/stanza/fileconsumer/file.go
Outdated
@@ -77,6 +97,13 @@ func (m *Manager) Start(persister operator.Persister) error {
func (m *Manager) Stop() error {
	m.cancel()
	m.wg.Wait()
	if useThreadPool.IsEnabled() {
		close(m.readerChan)
In Start, it's possible that we return before creating the channel, so we need to check if the channel is nil. This can crash the collector from an otherwise recoverable situation.
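The crash comes from Go's rule that closing a nil channel panics. A minimal sketch of the guard, using a hypothetical manager type with an int channel as a stand-in for the real reader channel:

```go
package main

import (
	"errors"
	"fmt"
)

// manager is a hypothetical stand-in for the fileconsumer Manager.
type manager struct {
	readerChan chan int // element type is a placeholder
}

// start may return before the channel is created, as described above.
func (m *manager) start(failEarly bool) error {
	if failEarly {
		return errors.New("start failed before channel creation")
	}
	m.readerChan = make(chan int)
	return nil
}

// stop closes the channel only if it exists. Setting it back to nil also
// makes a second call to stop safe, since closing a closed channel panics too.
func (m *manager) stop() {
	if m.readerChan != nil {
		close(m.readerChan)
		m.readerChan = nil
	}
}

func main() {
	m := &manager{}
	if err := m.start(true); err != nil {
		fmt.Println("start error:", err)
	}
	m.stop() // no panic even though the channel was never created
}
```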
// Get the list of paths on disk
matches := m.finder.FindFiles()
m.consumeConcurrent(ctx, matches)
m.clearCurrentFingerprints()
I think I asked you this before but I can't recall. Why can we do this in an asynchronous situation?
r.ReadToEnd(ctx)
// Delete a file if deleteAfterRead is enabled and we reached the end of the file
if m.deleteAfterRead && r.eof {
	r.Close()
	if err := os.Remove(r.file.Name()); err != nil {
		m.Errorf("could not delete %s", r.file.Name())
	}
} else {
	// Save off any files that were not fully read or if deleteAfterRead is disabled
	m.saveCurrentConcurrent(r)
}
Is it possible to deduplicate this code?
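One possible shape for the deduplication is to move the post-read decision into a single helper and inject the parts that differ between the synchronous and concurrent paths. This is only a sketch: fileReader, finishRead, and the injected remove and save functions are hypothetical names, and closing the reader is omitted.

```go
package main

import "fmt"

// fileReader is a hypothetical stand-in for the reader type in the diff.
type fileReader struct {
	path string
	eof  bool
}

// finishRead holds the post-read decision in one place. The caller passes
// its own remove function (os.Remove in production) and its own save
// function, for example a locked variant on the concurrent path.
func finishRead(r *fileReader, deleteAfterRead bool, remove func(string) error, save func(*fileReader)) error {
	if deleteAfterRead && r.eof {
		if err := remove(r.path); err != nil {
			return fmt.Errorf("could not delete %s: %w", r.path, err)
		}
		return nil
	}
	// Keep any reader that was not fully read, or when deletion is disabled.
	save(r)
	return nil
}

func main() {
	remove := func(p string) error { fmt.Println("removed", p); return nil }
	save := func(r *fileReader) { fmt.Println("saved", r.path) }
	_ = finishRead(&fileReader{path: "a.log", eof: true}, true, remove, save)
	_ = finishRead(&fileReader{path: "b.log", eof: false}, true, remove, save)
}
```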
if _, ok := m.seenPaths[filePath]; !ok {
	if m.readerFactory.fromBeginning {
		m.Infow("Started watching file", "path", filePath)
	} else {
		m.Infow("Started watching file from end. To read preexisting logs, configure the argument 'start_at' to 'beginning'", "path", filePath)
	}
	m.seenPaths[filePath] = struct{}{}
}
file, err := os.Open(filePath) // #nosec - operator must read in files defined by user
if err != nil {
	m.Debugf("Failed to open file", zap.Error(err))
	return nil, nil
}
fp, err := m.readerFactory.newFingerprint(file)
if err != nil {
	m.Errorw("Failed creating fingerprint", zap.Error(err))
	return nil, nil
}
// Exclude any empty fingerprints or duplicate fingerprints to avoid doubling up on copy-truncate files

if len(fp.FirstBytes) == 0 {
	if err = file.Close(); err != nil {
		m.Errorf("problem closing file", "file", file.Name())
	}
	return nil, nil
}
I think all of this is duplicated as well. Can we extract it somehow?
Co-authored-by: Daniel Jaglowski <[email protected]>
4ff27c6 to b501c5d
b501c5d to 935a08f
@VihasMakwana, #23415 is merged, please rebase.
10e0001 to 039a6f2
9696336 to a989864
a989864 to 1fa0744
This PR was marked stale due to lack of activity. It will be closed in 14 days.
Closed as inactive. Feel free to reopen if this PR is still being worked on.
@djaglowski let's keep this one closed. I will open a fresh PR after we merge the trie PR.
Description: Add Trie data structure and keep it separate from PR #23056
Testing: Relevant test cases added
---------
Co-authored-by: Dan Jaglowski <[email protected]>
Description: Added a new feature gate that enables a thread pool mechanism to respect the poll_interval parameter.
Current Scenario: When files take longer than poll_interval to consume, the current implementation blocks until it has consumed them entirely. In other words, it doesn't respect poll_interval.
Improvisation using thread pooling:
Link to tracking Issue: #18908
Testing: No new tests added; existing tests are modified to account for the feature gate.
I will provide benchmarks in the comments.
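The general idea of keeping the poll loop from blocking on slow files can be sketched with a fixed set of workers fed by a buffered channel. This is an illustration under assumptions, not the PR's implementation: startWorkers, trySubmit, the queue size, and the choice to retry at the next poll when the queue is full are all hypothetical.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// startWorkers launches n goroutines that consume paths from a buffered
// queue. It returns the queue and a stop function that closes the queue
// and waits for in-flight work to finish.
func startWorkers(n, queueSize int, consume func(string)) (chan string, func()) {
	queue := make(chan string, queueSize)
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for p := range queue {
				consume(p)
			}
		}()
	}
	return queue, func() {
		close(queue)
		wg.Wait()
	}
}

// trySubmit enqueues path without blocking and reports whether it was accepted.
func trySubmit(queue chan<- string, path string) bool {
	select {
	case queue <- path:
		return true
	default:
		return false
	}
}

func main() {
	queue, stop := startWorkers(2, 4, func(p string) {
		time.Sleep(30 * time.Millisecond) // simulate a slow file
		fmt.Println("consumed", p)
	})
	ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
	defer cancel()
	ticker := time.NewTicker(10 * time.Millisecond) // stand-in for poll_interval
	defer ticker.Stop()
	for polling := true; polling; {
		select {
		case <-ctx.Done():
			polling = false
		case <-ticker.C:
			// A real implementation would also skip files that are
			// already queued or currently being read.
			for _, p := range []string{"a.log", "b.log"} {
				if !trySubmit(queue, p) {
					fmt.Println("queue full, retrying at next poll:", p)
				}
			}
		}
	}
	stop()
}
```

Because the poll loop only enqueues work, each tick returns quickly even when a worker is still busy with a large file.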