Skip to content

Commit

Permalink
Fix harvester shutdown for prospector reloading
Browse files Browse the repository at this point in the history
There are two options for stopping a harvester or a prospector. Either the harvester and prospector finish sending all events and stop them self or they are killed because the output is blocking.

In case of shutting down filebeat without using `shutdown_timeout` filebeat is expected to shut down as fast as possible. This means channels are directly closed and the events are not passed through to the registry.

In case of dynamic prospector reloading, prospectors and harvesters must be stopped properly as otherwise no new harvester for the same file can be started. To make this possible the following changes were made:

* Introduce harvester tracking in prospector to better control / manage the harvesters. The implementation is based on a harvester registry which starts and stops the harvesters
* Use an outlet to send events from harvester to prospector. This outlet has an additional signal to have two options on when the outlet should be finished. Like this the outlet can be stopped by the harvester itself or globally through closing beatDone.
* Introduce more done channels in prospector to make shutdown more fine grained
* Add system tests to verify new behaviour

Closes elastic#3546
  • Loading branch information
ruflin committed Feb 9, 2017
1 parent f006365 commit 6429786
Show file tree
Hide file tree
Showing 10 changed files with 374 additions and 96 deletions.
2 changes: 1 addition & 1 deletion filebeat/beater/filebeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ func (fb *Filebeat) Run(b *beat.Beat) error {
return err
}

crawler, err := crawler.New(newSpoolerOutlet(fb.done, spooler, wgEvents), config.Prospectors, *once)
crawler, err := crawler.New(newSpoolerOutlet(fb.done, spooler, wgEvents), config.Prospectors, fb.done, *once)
if err != nil {
logp.Err("Could not init crawler: %v", err)
return err
Expand Down
80 changes: 80 additions & 0 deletions filebeat/channel/outlet.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package channel

import (
"sync"
"sync/atomic"

"github.com/elastic/beats/filebeat/input"
)

type Outlet struct {
wg *sync.WaitGroup
done <-chan struct{}
signal <-chan struct{}
channel chan *input.Event
isOpen int32 // atomic indicator
}

func NewOutlet(
done <-chan struct{},
c chan *input.Event,
wg *sync.WaitGroup,
) *Outlet {
return &Outlet{
done: done,
channel: c,
wg: wg,
isOpen: 1,
}
}

// SetSignal sets the signal channel for OnEventSignal
func (o *Outlet) SetSignal(signal <-chan struct{}) {
o.signal = signal
}

func (o *Outlet) OnEvent(event *input.Event) bool {
open := atomic.LoadInt32(&o.isOpen) == 1
if !open {
return false
}

if o.wg != nil {
o.wg.Add(1)
}

select {
case <-o.done:
if o.wg != nil {
o.wg.Done()
}
atomic.StoreInt32(&o.isOpen, 0)
return false
case o.channel <- event:
return true
}
}

// OnEventSignal can be stopped by the signal that is set with SetSignal
// This does not close the outlet. Only OnEvent does close the outlet.
func (o *Outlet) OnEventSignal(event *input.Event) bool {
open := atomic.LoadInt32(&o.isOpen) == 1
if !open {
return false
}

if o.wg != nil {
o.wg.Add(1)
}

select {
case <-o.signal:
if o.wg != nil {
o.wg.Done()
}
o.signal = nil
return false
case o.channel <- event:
return true
}
}
8 changes: 5 additions & 3 deletions filebeat/crawler/crawler.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,17 @@ type Crawler struct {
wg sync.WaitGroup
reloader *cfgfile.Reloader
once bool
beatDone chan struct{}
}

func New(out prospector.Outlet, prospectorConfigs []*common.Config, once bool) (*Crawler, error) {
func New(out prospector.Outlet, prospectorConfigs []*common.Config, beatDone chan struct{}, once bool) (*Crawler, error) {

return &Crawler{
out: out,
prospectors: map[uint64]*prospector.Prospector{},
prospectorConfigs: prospectorConfigs,
once: once,
beatDone: beatDone,
}, nil
}

Expand All @@ -47,7 +49,7 @@ func (c *Crawler) Start(r *registrar.Registrar, reloaderConfig *common.Config) e
logp.Warn("BETA feature dynamic configuration reloading is enabled.")

c.reloader = cfgfile.NewReloader(reloaderConfig)
factory := prospector.NewFactory(c.out, r)
factory := prospector.NewFactory(c.out, r, c.beatDone)
go func() {
c.reloader.Run(factory)
}()
Expand All @@ -62,7 +64,7 @@ func (c *Crawler) startProspector(config *common.Config, states []file.State) er
if !config.Enabled() {
return nil
}
p, err := prospector.NewProspector(config, c.out)
p, err := prospector.NewProspector(config, c.out, c.beatDone)
if err != nil {
return fmt.Errorf("Error in initing prospector: %s", err)
}
Expand Down
26 changes: 17 additions & 9 deletions filebeat/harvester/harvester.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ import (
"fmt"
"sync"

"github.com/satori/go.uuid"

"github.com/elastic/beats/filebeat/channel"
"github.com/elastic/beats/filebeat/config"
"github.com/elastic/beats/filebeat/harvester/encoding"
"github.com/elastic/beats/filebeat/harvester/source"
Expand All @@ -40,24 +43,26 @@ type Harvester struct {
fileReader *LogFile
encodingFactory encoding.EncodingFactory
encoding encoding.Encoding
prospectorDone chan struct{}
once sync.Once
done chan struct{}
stopOnce sync.Once
stopWg *sync.WaitGroup
outlet *channel.Outlet
ID uuid.UUID
}

func NewHarvester(
cfg *common.Config,
state file.State,
prospectorChan chan *input.Event,
done chan struct{},
outlet *channel.Outlet,
) (*Harvester, error) {

h := &Harvester{
config: defaultConfig,
state: state,
prospectorChan: prospectorChan,
prospectorDone: done,
done: make(chan struct{}),
config: defaultConfig,
state: state,
done: make(chan struct{}),
stopWg: &sync.WaitGroup{},
outlet: outlet,
ID: uuid.NewV4(),
}

if err := cfg.Unpack(&h.config); err != nil {
Expand All @@ -70,6 +75,9 @@ func NewHarvester(
}
h.encodingFactory = encodingFactory

// Add outlet signal so harvester can also stop itself
h.outlet.SetSignal(h.done)

return h, nil
}

Expand Down
41 changes: 21 additions & 20 deletions filebeat/harvester/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,13 +52,19 @@ func (h *Harvester) Harvest(r reader.Reader) {

harvesterStarted.Add(1)
harvesterRunning.Add(1)
defer harvesterRunning.Add(-1)

// Makes sure file is properly closed when the harvester is stopped
defer h.close()
h.stopWg.Add(1)
defer func() {
// Channel to stop internal harvester routines
h.stop()
// Makes sure file is properly closed when the harvester is stopped
h.close()

harvesterRunning.Add(-1)

// Channel to stop internal harvester routines
defer h.stop()
// Marks harvester stopping completed
h.stopWg.Done()
}()

// Closes reader after timeout or when done channel is closed
// This routine is also responsible to properly stop the reader
Expand All @@ -74,8 +80,6 @@ func (h *Harvester) Harvest(r reader.Reader) {
// Applies when timeout is reached
case <-closeTimeout:
logp.Info("Closing harvester because close_timeout was reached.")
// Required for shutdown when hanging inside reader
case <-h.prospectorDone:
// Required when reader loop returns and reader finished
case <-h.done:
}
Expand Down Expand Up @@ -156,22 +160,23 @@ func (h *Harvester) Harvest(r reader.Reader) {
}
}

// stop is intended for internal use and closed the done channel to stop execution
func (h *Harvester) stop() {
h.once.Do(func() {
h.stopOnce.Do(func() {
close(h.done)
})
}

// Stop stops harvester and waits for completion
func (h *Harvester) Stop() {
h.stop()
h.stopWg.Wait()
}

// sendEvent sends event to the spooler channel
// Return false if event was not sent
func (h *Harvester) sendEvent(event *input.Event) bool {

select {
case <-h.done:
return false
case h.prospectorChan <- event: // ship the new event downstream
return true
}
return h.outlet.OnEventSignal(event)
}

// sendStateUpdate send an empty event with the current state to update the registry
Expand All @@ -182,11 +187,7 @@ func (h *Harvester) sendEvent(event *input.Event) bool {
func (h *Harvester) sendStateUpdate() {
logp.Debug("harvester", "Update state: %s, offset: %v", h.state.Source, h.state.Offset)
event := input.NewEvent(h.state)

select {
case <-h.prospectorDone:
case h.prospectorChan <- event: // ship the new event downstream
}
h.outlet.OnEvent(event)
}

// shouldExportLine decides if the line is exported or not based on
Expand Down
6 changes: 4 additions & 2 deletions filebeat/prospector/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,18 +10,20 @@ import (
type Factory struct {
outlet Outlet
registrar *registrar.Registrar
beatDone chan struct{}
}

func NewFactory(outlet Outlet, registrar *registrar.Registrar) *Factory {
func NewFactory(outlet Outlet, registrar *registrar.Registrar, beatDone chan struct{}) *Factory {
return &Factory{
outlet: outlet,
registrar: registrar,
beatDone: beatDone,
}
}

func (r *Factory) Create(c *common.Config) (cfgfile.Runner, error) {

p, err := NewProspector(c, r.outlet)
p, err := NewProspector(c, r.outlet, r.beatDone)
if err != nil {
logp.Err("Error creating prospector: %s", err)
return nil, err
Expand Down
Loading

0 comments on commit 6429786

Please sign in to comment.