Skip to content

Commit

Permalink
Fix harvester shutdown for prospector reloading (#3563) (#3584)
Browse files Browse the repository at this point in the history
* Fix harvester shutdown for prospector reloading

There are two options for stopping a harvester or a prospector. Either the harvester and prospector finish sending all events and stop them self or they are killed because the output is blocking.

In case of shutting down filebeat without using `shutdown_timeout` filebeat is expected to shut down as fast as possible. This means channels are directly closed and the events are not passed through to the registry.

In case of dynamic prospector reloading, prospectors and harvesters must be stopped properly as otherwise no new harvester for the same file can be started. To make this possible the following changes were made:

* Introduce harvester tracking in prospector to better control / manage the harvesters. The implementation is based on a harvester registry which starts and stops the harvesters
* Use an outlet to send events from harvester to prospector. This outlet has an additional signal to have two options on when the outlet should be finished. Like this the outlet can be stopped by the harvester itself or globally through closing beatDone.
* Introduce more done channels in prospector to make shutdown more fine grained
* Add system tests to verify new behaviour

Closes #3546

* review added

(cherry picked from commit 15b32e4)
  • Loading branch information
ruflin authored and tsg committed Feb 14, 2017
1 parent c6ae24f commit 28d2b2c
Show file tree
Hide file tree
Showing 10 changed files with 384 additions and 96 deletions.
2 changes: 1 addition & 1 deletion filebeat/beater/filebeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ func (fb *Filebeat) Run(b *beat.Beat) error {
return err
}

crawler, err := crawler.New(newSpoolerOutlet(fb.done, spooler, wgEvents), config.Prospectors, *once)
crawler, err := crawler.New(newSpoolerOutlet(fb.done, spooler, wgEvents), config.Prospectors, fb.done, *once)
if err != nil {
logp.Err("Could not init crawler: %v", err)
return err
Expand Down
90 changes: 90 additions & 0 deletions filebeat/channel/outlet.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
package channel

import (
"sync"
"sync/atomic"

"github.com/elastic/beats/filebeat/input"
)

// Outlet struct is used to be passed to an object which needs an outlet
//
// The difference between signal and done channel is as following:
// - signal channel can be added through SetSignal and is used to
// interrupt events sent through OnEventSignal-
// - done channel is used to close and stop the outlet
//
// If SetSignal is used, it must be ensure that there is only one event producer.
type Outlet struct {
wg *sync.WaitGroup // Use for counting active events
done <-chan struct{}
signal <-chan struct{}
channel chan *input.Event
isOpen int32 // atomic indicator
}

func NewOutlet(
done <-chan struct{},
c chan *input.Event,
wg *sync.WaitGroup,
) *Outlet {
return &Outlet{
done: done,
channel: c,
wg: wg,
isOpen: 1,
}
}

// SetSignal sets the signal channel for OnEventSignal
// If SetSignal is used, it must be ensure that only one producer exists.
func (o *Outlet) SetSignal(signal <-chan struct{}) {
o.signal = signal
}

func (o *Outlet) OnEvent(event *input.Event) bool {
open := atomic.LoadInt32(&o.isOpen) == 1
if !open {
return false
}

if o.wg != nil {
o.wg.Add(1)
}

select {
case <-o.done:
if o.wg != nil {
o.wg.Done()
}
atomic.StoreInt32(&o.isOpen, 0)
return false
case o.channel <- event:
return true
}
}

// OnEventSignal can be stopped by the signal that is set with SetSignal
// This does not close the outlet. Only OnEvent does close the outlet.
// If OnEventSignal is used, it must be ensured that only one producer is used.
func (o *Outlet) OnEventSignal(event *input.Event) bool {
open := atomic.LoadInt32(&o.isOpen) == 1
if !open {
return false
}

if o.wg != nil {
o.wg.Add(1)
}

select {
case <-o.signal:
if o.wg != nil {
o.wg.Done()
}
o.signal = nil
return false
case o.channel <- event:
return true
}
}
8 changes: 5 additions & 3 deletions filebeat/crawler/crawler.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,17 @@ type Crawler struct {
wg sync.WaitGroup
reloader *cfgfile.Reloader
once bool
beatDone chan struct{}
}

func New(out prospector.Outlet, prospectorConfigs []*common.Config, once bool) (*Crawler, error) {
func New(out prospector.Outlet, prospectorConfigs []*common.Config, beatDone chan struct{}, once bool) (*Crawler, error) {

return &Crawler{
out: out,
prospectors: map[uint64]*prospector.Prospector{},
prospectorConfigs: prospectorConfigs,
once: once,
beatDone: beatDone,
}, nil
}

Expand All @@ -47,7 +49,7 @@ func (c *Crawler) Start(r *registrar.Registrar, reloaderConfig *common.Config) e
logp.Warn("BETA feature dynamic configuration reloading is enabled.")

c.reloader = cfgfile.NewReloader(reloaderConfig)
factory := prospector.NewFactory(c.out, r)
factory := prospector.NewFactory(c.out, r, c.beatDone)
go func() {
c.reloader.Run(factory)
}()
Expand All @@ -62,7 +64,7 @@ func (c *Crawler) startProspector(config *common.Config, states []file.State) er
if !config.Enabled() {
return nil
}
p, err := prospector.NewProspector(config, c.out)
p, err := prospector.NewProspector(config, c.out, c.beatDone)
if err != nil {
return fmt.Errorf("Error in initing prospector: %s", err)
}
Expand Down
26 changes: 17 additions & 9 deletions filebeat/harvester/harvester.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ import (
"fmt"
"sync"

"github.com/satori/go.uuid"

"github.com/elastic/beats/filebeat/channel"
"github.com/elastic/beats/filebeat/config"
"github.com/elastic/beats/filebeat/harvester/encoding"
"github.com/elastic/beats/filebeat/harvester/source"
Expand All @@ -40,24 +43,26 @@ type Harvester struct {
fileReader *LogFile
encodingFactory encoding.EncodingFactory
encoding encoding.Encoding
prospectorDone chan struct{}
once sync.Once
done chan struct{}
stopOnce sync.Once
stopWg *sync.WaitGroup
outlet *channel.Outlet
ID uuid.UUID
}

func NewHarvester(
cfg *common.Config,
state file.State,
prospectorChan chan *input.Event,
done chan struct{},
outlet *channel.Outlet,
) (*Harvester, error) {

h := &Harvester{
config: defaultConfig,
state: state,
prospectorChan: prospectorChan,
prospectorDone: done,
done: make(chan struct{}),
config: defaultConfig,
state: state,
done: make(chan struct{}),
stopWg: &sync.WaitGroup{},
outlet: outlet,
ID: uuid.NewV4(),
}

if err := cfg.Unpack(&h.config); err != nil {
Expand All @@ -70,6 +75,9 @@ func NewHarvester(
}
h.encodingFactory = encodingFactory

// Add outlet signal so harvester can also stop itself
h.outlet.SetSignal(h.done)

return h, nil
}

Expand Down
41 changes: 21 additions & 20 deletions filebeat/harvester/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,13 +52,19 @@ func (h *Harvester) Harvest(r reader.Reader) {

harvesterStarted.Add(1)
harvesterRunning.Add(1)
defer harvesterRunning.Add(-1)

// Makes sure file is properly closed when the harvester is stopped
defer h.close()
h.stopWg.Add(1)
defer func() {
// Channel to stop internal harvester routines
h.stop()
// Makes sure file is properly closed when the harvester is stopped
h.close()

harvesterRunning.Add(-1)

// Channel to stop internal harvester routines
defer h.stop()
// Marks harvester stopping completed
h.stopWg.Done()
}()

// Closes reader after timeout or when done channel is closed
// This routine is also responsible to properly stop the reader
Expand All @@ -74,8 +80,6 @@ func (h *Harvester) Harvest(r reader.Reader) {
// Applies when timeout is reached
case <-closeTimeout:
logp.Info("Closing harvester because close_timeout was reached.")
// Required for shutdown when hanging inside reader
case <-h.prospectorDone:
// Required when reader loop returns and reader finished
case <-h.done:
}
Expand Down Expand Up @@ -156,22 +160,23 @@ func (h *Harvester) Harvest(r reader.Reader) {
}
}

// stop is intended for internal use and closed the done channel to stop execution
func (h *Harvester) stop() {
h.once.Do(func() {
h.stopOnce.Do(func() {
close(h.done)
})
}

// Stop stops harvester and waits for completion
func (h *Harvester) Stop() {
h.stop()
h.stopWg.Wait()
}

// sendEvent sends event to the spooler channel
// Return false if event was not sent
func (h *Harvester) sendEvent(event *input.Event) bool {

select {
case <-h.done:
return false
case h.prospectorChan <- event: // ship the new event downstream
return true
}
return h.outlet.OnEventSignal(event)
}

// sendStateUpdate send an empty event with the current state to update the registry
Expand All @@ -182,11 +187,7 @@ func (h *Harvester) sendEvent(event *input.Event) bool {
func (h *Harvester) sendStateUpdate() {
logp.Debug("harvester", "Update state: %s, offset: %v", h.state.Source, h.state.Offset)
event := input.NewEvent(h.state)

select {
case <-h.prospectorDone:
case h.prospectorChan <- event: // ship the new event downstream
}
h.outlet.OnEvent(event)
}

// shouldExportLine decides if the line is exported or not based on
Expand Down
6 changes: 4 additions & 2 deletions filebeat/prospector/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,18 +10,20 @@ import (
type Factory struct {
outlet Outlet
registrar *registrar.Registrar
beatDone chan struct{}
}

func NewFactory(outlet Outlet, registrar *registrar.Registrar) *Factory {
func NewFactory(outlet Outlet, registrar *registrar.Registrar, beatDone chan struct{}) *Factory {
return &Factory{
outlet: outlet,
registrar: registrar,
beatDone: beatDone,
}
}

func (r *Factory) Create(c *common.Config) (cfgfile.Runner, error) {

p, err := NewProspector(c, r.outlet)
p, err := NewProspector(c, r.outlet, r.beatDone)
if err != nil {
logp.Err("Error creating prospector: %s", err)
return nil, err
Expand Down
Loading

0 comments on commit 28d2b2c

Please sign in to comment.