Fix concurrent harvesters #2541
Conversation
ok := p.outlet.OnEvent(event)
if !ok {
	logp.Info("Prospector outlet closed")
	return
}
There is a change in logic here: this return used to stop the worker. With the return moved into updateState, the worker is no longer stopped and may keep publishing incoming events. Instead, the worker should be dropped, report to the prospector that it needs to shut down, and drain the harvesterChan, so no harvester will be blocked.
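The draining the reviewer suggests can be sketched as follows. This is a minimal, hypothetical illustration, not filebeat's actual code: the channel name harvesterChan is taken from the comment above, while the drain helper and the string event type are invented for the example. The point is that on shutdown, an unbuffered channel must keep being read from, or every harvester blocked on a send stays stuck.

```go
package main

import (
	"fmt"
	"sync"
)

// drain discards everything left on ch until it is closed and returns the
// number of events dropped; any sender blocked on ch is released.
func drain(ch <-chan string) int {
	n := 0
	for range ch {
		n++
	}
	return n
}

func main() {
	harvesterChan := make(chan string) // unbuffered: each send blocks until read

	var wg sync.WaitGroup
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func(id int) { // harvesters still trying to publish during shutdown
			defer wg.Done()
			harvesterChan <- fmt.Sprintf("event-%d", id)
		}(i)
	}
	go func() { // close once every harvester has handed off its event
		wg.Wait()
		close(harvesterChan)
	}()

	fmt.Println("drained events:", drain(harvesterChan))
}
```

Without the drain loop, the three goroutines above would block forever on their sends, which is exactly the "harvester blocked" scenario the comment warns about.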
I added a bool return value to updateState so the loop can check whether the event was sent and return accordingly. The same is needed for the other callers of updateState.
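The pattern described above can be sketched like this. It is a hedged illustration, not filebeat's real types: outlet, prospector, and the string event are stand-ins, and only the OnEvent/updateState names come from the diff. updateState reports via a bool whether the event reached the outlet, so the caller's loop can stop instead of publishing further events after the outlet closed.

```go
package main

import "fmt"

// outlet is a stand-in for the prospector's output channel.
type outlet struct{ open bool }

// OnEvent returns false once the outlet has been closed.
func (o *outlet) OnEvent(event string) bool { return o.open }

type prospector struct{ out *outlet }

// updateState forwards the event and returns whether it was accepted,
// so callers can stop their loop when the outlet is closed.
func (p *prospector) updateState(event string) bool {
	ok := p.out.OnEvent(event)
	if !ok {
		fmt.Println("Prospector outlet closed")
	}
	return ok
}

func main() {
	p := &prospector{out: &outlet{open: true}}
	for _, e := range []string{"a", "b", "c"} {
		if !p.updateState(e) {
			return // stop the worker once the outlet is closed
		}
		fmt.Println("sent", e)
		p.out.open = e != "b" // simulate the outlet closing after "b"
	}
}
```

Every call site of updateState needs this check; a caller that ignores the bool reintroduces the original bug of publishing after close.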
LGTM
In case newly started harvesters did not persist their first state before the next scan started, it could have happened that multiple harvesters were started for the same file. This could have been caused by a large number of files or by the output blocking.
The problem is solved by making the Setup step of the Harvester synchronous and blocking the scan. Part of this is also updating the first state as part of the prospector.
The side effect of this change is that a scan now blocks in case the channel is blocked, which means the output is probably not responding. If the output is not responding, scans will not continue and new files will not be discovered until the output is available again.
The code can be further simplified in the future by merging create/startHarvester. This will be done in a second step to keep the backport commit to a minimum.
See also elastic#2539
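The race and its fix can be sketched as follows. This is a simplified, hypothetical model, not filebeat's implementation: the states map and startHarvester helper are invented for illustration. The essential idea from the commit message is that the file's first state is persisted synchronously, as part of the scan itself, so a subsequent scan sees the state and cannot start a second harvester for the same file.

```go
package main

import "fmt"

// prospector tracks which files already have a harvester. In this sketch
// the state registry is just a map keyed by file path.
type prospector struct {
	states map[string]bool
}

// startHarvester models the now-synchronous Setup step: the first state is
// recorded before the scan continues, so duplicate starts are impossible.
func (p *prospector) startHarvester(path string) bool {
	if p.states[path] {
		return false // a harvester already owns this file
	}
	p.states[path] = true // first state persisted as part of the scan
	return true
}

func main() {
	p := &prospector{states: map[string]bool{}}
	// Two consecutive scans discover the same file; only the first scan
	// starts a harvester.
	for scan := 1; scan <= 2; scan++ {
		started := p.startHarvester("/var/log/app.log")
		fmt.Printf("scan %d started harvester: %v\n", scan, started)
	}
}
```

The trade-off described above follows directly: because state persistence is now part of the scan, a blocked output blocks the scan too, and new files are not discovered until the output responds again.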