Fix harvester shutdown for prospector reloading #3563
Conversation
There are two options for stopping a harvester or a prospector: either the harvester and prospector finish sending all events and stop themselves, or they are killed because the output is blocking. When shutting down filebeat without `shutdown_timeout`, filebeat is expected to shut down as fast as possible. This means channels are closed directly and the events are not passed through to the registry. In the case of dynamic prospector reloading, prospectors and harvesters must be stopped properly, as otherwise no new harvester for the same file can be started. To make this possible, the following changes were made:

* Introduce harvester tracking in the prospector to better control / manage the harvesters. The implementation is based on a harvester registry which starts and stops the harvesters.
* Use an outlet to send events from harvester to prospector. This outlet has an additional signal to provide two options for when the outlet should be finished. Like this, the outlet can be stopped by the harvester itself or globally through closing beatDone.
* Introduce more done channels in the prospector to make shutdown more fine-grained.
* Add system tests to verify the new behaviour.

Closes elastic#3546
As channel.Outlet is very specific to the harvester, let's move it into the harvester module. That also makes it clear we're going to have one producer only.
filebeat/channel/outlet.go (outdated):

```go
// SetSignal sets the signal channel for OnEventSignal
func (o *Outlet) SetSignal(signal <-chan struct{}) {
	o.signal = signal
}
```
Why do we need SetSignal? This way the outlet cannot be reused by multiple harvesters. In this case, please add a note (godoc) to Outlet that it requires a single producer.
We need it to distinguish between the harvester stopping itself, in which case state.Finished must always be reported, and beater.done closing, which means it should shut down directly.
I added a note in the godocs.
filebeat/channel/outlet.go (outdated):

```go
type Outlet struct {
	wg     *sync.WaitGroup
	done   <-chan struct{}
	signal <-chan struct{}
```
Could you make the difference between signal and done clear as godoc here?
Adding a comment.
filebeat/channel/outlet.go (outdated):

```go
)

type Outlet struct {
	wg *sync.WaitGroup
```
Is wg used for counting active events?
Yes, added a comment.
```go
}

// createHarvester creates a new harvester instance from the given state
func (p *Prospector) createHarvester(state file.State) (*harvester.Harvester, error) {

	outlet := channel.NewOutlet(p.beatDone, p.harvesterChan, p.eventCounter)
```
Ah, I see... one outlet passed to the harvester. If NewHarvester got an interface instead of channel.Outlet, the wrapping could be done in the harvester. That is, the logic of OnEvent and OnEventSignal would become more local to the harvester (no need for a public definition).
I'm thinking of doing that in a second step, where I would also like to unify the outlet used for prospector -> spooler and the new harvester -> prospector one.
```go
	done := make(chan struct{})
	go func() {
		p.eventCounter.Wait()
```
When calling Wait here, are we sure no harvesters are running? One MUST not call Add after Wait has been invoked.
p.run.Wait() ensures that all scanning is finished and no new harvesters are added. Then p.registry.Stop() ensures that all harvesters are stopped before continuing. As waitEvents happens after that, this should not happen.
```go
		return false
	case o.channel <- event:
		return true
	}
```
If the outlet is used by multiple producers this can race, I think. If it's used by one single producer only, there is no need for the atomics. Also, setting a channel to nil has a similar effect, as a nil channel blocks forever and a closed channel always receives. e.g.:

```go
type Outlet struct {
	stateUpdate, event cancelChannel
}

type cancelChannel struct {
	done chan struct{}
	ch   chan *input.Event
}

func (ch *cancelChannel) OnEvent(evt *input.Event) bool {
	select {
	case <-ch.done:
		ch.ch = nil
		return false
	case ch.ch <- evt:
		return true
	}
}

func (o *Outlet) OnEvent(event *input.Event) bool {
	return o.event.OnEvent(event)
}

func (o *Outlet) OnStateUpdate(event *input.Event) bool {
	return o.stateUpdate.OnEvent(event)
}
```
I would prefer to keep the current implementation to make it possible to use the same Outlet also for prospector to spooler in the future. So we only have one implementation which supports both use cases.
it's time to get rid of the spooler...