From 28d2b2cb5eb94dc34e0ce76e16636d3b3e73f763 Mon Sep 17 00:00:00 2001
From: Nicolas Ruflin
Date: Tue, 14 Feb 2017 09:46:39 +0100
Subject: [PATCH] Fix harvester shutdown for prospector reloading (#3563) (#3584)

* Fix harvester shutdown for prospector reloading

There are two options for stopping a harvester or a prospector: either the harvester and prospector finish sending all events and stop themselves, or they are killed because the output is blocking.

In case of shutting down filebeat without using `shutdown_timeout`, filebeat is expected to shut down as fast as possible. This means channels are directly closed and the events are not passed through to the registry. In case of dynamic prospector reloading, prospectors and harvesters must be stopped properly, as otherwise no new harvester can be started for the same file.

To make this possible, the following changes were made:

* Introduce harvester tracking in the prospector to better control / manage the harvesters. The implementation is based on a harvester registry which starts and stops the harvesters.
* Use an outlet to send events from the harvester to the prospector. This outlet has an additional signal channel, giving two options for when the outlet should be finished. This way the outlet can be stopped by the harvester itself or globally through closing beatDone.
* Introduce more done channels in the prospector to make shutdown more fine-grained.
* Add system tests to verify the new behaviour.

Closes https://github.com/elastic/beats/issues/3546

* review added

(cherry picked from commit 15b32e424159989ea211398aa851384102bb8b87)
---
 filebeat/beater/filebeat.go           |   2 +-
 filebeat/channel/outlet.go            |  90 ++++++++++++++
 filebeat/crawler/crawler.go           |   8 +-
 filebeat/harvester/harvester.go       |  26 ++--
 filebeat/harvester/log.go             |  41 +++---
 filebeat/prospector/factory.go        |   6 +-
 filebeat/prospector/prospector.go     | 173 +++++++++++++++++---------
 filebeat/prospector/prospector_log.go |   2 +-
 filebeat/prospector/registry.go       |  70 +++++++++++
 filebeat/tests/system/test_reload.py  |  62 +++++++++
 10 files changed, 384 insertions(+), 96 deletions(-)
 create mode 100644 filebeat/channel/outlet.go
 create mode 100644 filebeat/prospector/registry.go

diff --git a/filebeat/beater/filebeat.go b/filebeat/beater/filebeat.go
index b33b6cf9efe..878b830a7fc 100644
--- a/filebeat/beater/filebeat.go
+++ b/filebeat/beater/filebeat.go
@@ -133,7 +133,7 @@ func (fb *Filebeat) Run(b *beat.Beat) error {
 		return err
 	}
 
-	crawler, err := crawler.New(newSpoolerOutlet(fb.done, spooler, wgEvents), config.Prospectors, *once)
+	crawler, err := crawler.New(newSpoolerOutlet(fb.done, spooler, wgEvents), config.Prospectors, fb.done, *once)
 	if err != nil {
 		logp.Err("Could not init crawler: %v", err)
 		return err
diff --git a/filebeat/channel/outlet.go b/filebeat/channel/outlet.go
new file mode 100644
index 00000000000..4695b302899
--- /dev/null
+++ b/filebeat/channel/outlet.go
@@ -0,0 +1,90 @@
+package channel
+
+import (
+	"sync"
+	"sync/atomic"
+
+	"github.com/elastic/beats/filebeat/input"
+)
+
+// Outlet is a struct that is passed to an object which needs an outlet
+//
+// The difference between the signal and done channels is as follows:
+// - the signal channel can be set through SetSignal and is used to
+//   interrupt events sent through OnEventSignal
+// - the done channel is used to close and stop the outlet
+//
+// If SetSignal is used, it must be ensured that there is only one event producer.
+type Outlet struct {
+	wg      *sync.WaitGroup // Used for counting active events
+	done    <-chan struct{}
+	signal  <-chan struct{}
+	channel chan *input.Event
+	isOpen  int32 // atomic indicator
+}
+
+func NewOutlet(
+	done <-chan struct{},
+	c chan *input.Event,
+	wg *sync.WaitGroup,
+) *Outlet {
+	return &Outlet{
+		done:    done,
+		channel: c,
+		wg:      wg,
+		isOpen:  1,
+	}
+}
+
+// SetSignal sets the signal channel for OnEventSignal
+// If SetSignal is used, it must be ensured that only one producer exists.
+func (o *Outlet) SetSignal(signal <-chan struct{}) {
+	o.signal = signal
+}
+
+func (o *Outlet) OnEvent(event *input.Event) bool {
+	open := atomic.LoadInt32(&o.isOpen) == 1
+	if !open {
+		return false
+	}
+
+	if o.wg != nil {
+		o.wg.Add(1)
+	}
+
+	select {
+	case <-o.done:
+		if o.wg != nil {
+			o.wg.Done()
+		}
+		atomic.StoreInt32(&o.isOpen, 0)
+		return false
+	case o.channel <- event:
+		return true
+	}
+}
+
+// OnEventSignal can be stopped by the signal that is set with SetSignal
+// This does not close the outlet; only OnEvent closes the outlet.
+// If OnEventSignal is used, it must be ensured that only one producer is used.
+func (o *Outlet) OnEventSignal(event *input.Event) bool {
+	open := atomic.LoadInt32(&o.isOpen) == 1
+	if !open {
+		return false
+	}
+
+	if o.wg != nil {
+		o.wg.Add(1)
+	}
+
+	select {
+	case <-o.signal:
+		if o.wg != nil {
+			o.wg.Done()
+		}
+		o.signal = nil
+		return false
+	case o.channel <- event:
+		return true
+	}
+}
diff --git a/filebeat/crawler/crawler.go b/filebeat/crawler/crawler.go
index cf56423db4f..9baf86f8084 100644
--- a/filebeat/crawler/crawler.go
+++ b/filebeat/crawler/crawler.go
@@ -19,15 +19,17 @@ type Crawler struct {
 	wg                sync.WaitGroup
 	reloader          *cfgfile.Reloader
 	once              bool
+	beatDone          chan struct{}
 }
 
-func New(out prospector.Outlet, prospectorConfigs []*common.Config, once bool) (*Crawler, error) {
+func New(out prospector.Outlet, prospectorConfigs []*common.Config, beatDone chan struct{}, once bool) (*Crawler, error) {
 	return &Crawler{
 		out:               out,
 		prospectors:       map[uint64]*prospector.Prospector{},
 		prospectorConfigs: prospectorConfigs,
 		once:              once,
+		beatDone:          beatDone,
 	}, nil
 }
 
@@ -47,7 +49,7 @@ func (c *Crawler) Start(r *registrar.Registrar, reloaderConfig *common.Config) e
 		logp.Warn("BETA feature dynamic configuration reloading is enabled.")
 
 		c.reloader = cfgfile.NewReloader(reloaderConfig)
-		factory := prospector.NewFactory(c.out, r)
+		factory := prospector.NewFactory(c.out, r, c.beatDone)
 		go func() {
 			c.reloader.Run(factory)
 		}()
@@ -62,7 +64,7 @@ func (c *Crawler) startProspector(config *common.Config, states []file.State) er
 	if !config.Enabled() {
 		return nil
 	}
-	p, err := prospector.NewProspector(config, c.out)
+	p, err := prospector.NewProspector(config, c.out, c.beatDone)
 	if err != nil {
 		return fmt.Errorf("Error in initing prospector: %s", err)
 	}
diff --git a/filebeat/harvester/harvester.go b/filebeat/harvester/harvester.go
index 3e9755f1662..731eb17ace0 100644
--- a/filebeat/harvester/harvester.go
+++ b/filebeat/harvester/harvester.go
@@ -16,6 +16,9 @@ import (
 	"fmt"
 	"sync"
 
+	"github.com/satori/go.uuid"
+
+	"github.com/elastic/beats/filebeat/channel"
 	"github.com/elastic/beats/filebeat/config"
 	"github.com/elastic/beats/filebeat/harvester/encoding"
 	"github.com/elastic/beats/filebeat/harvester/source"
@@ -40,24 +43,26 @@ type Harvester struct {
 	fileReader      *LogFile
 	encodingFactory encoding.EncodingFactory
 	encoding        encoding.Encoding
-	prospectorDone  chan struct{}
-	once            sync.Once
 	done            chan struct{}
+	stopOnce        sync.Once
+	stopWg          *sync.WaitGroup
+	outlet          *channel.Outlet
+	ID              uuid.UUID
 }
 
 func NewHarvester(
 	cfg *common.Config,
 	state file.State,
-	prospectorChan chan *input.Event,
-	done chan struct{},
+	outlet *channel.Outlet,
 ) (*Harvester, error) {
 
 	h := &Harvester{
-		config:         defaultConfig,
-		state:          state,
-		prospectorChan: prospectorChan,
-		prospectorDone: done,
-		done:           make(chan struct{}),
+		config: defaultConfig,
+		state:  state,
+		done:   make(chan struct{}),
+		stopWg: &sync.WaitGroup{},
+		outlet: outlet,
+		ID:     uuid.NewV4(),
 	}
 
 	if err := cfg.Unpack(&h.config); err != nil {
@@ -70,6 +75,9 @@ func NewHarvester(
 	}
 	h.encodingFactory = encodingFactory
 
+	// Add outlet signal so harvester can also stop itself
+	h.outlet.SetSignal(h.done)
+
 	return h, nil
 }
diff --git a/filebeat/harvester/log.go b/filebeat/harvester/log.go
index 95027045d5e..af35b2077ad 100644
--- a/filebeat/harvester/log.go
+++ b/filebeat/harvester/log.go
@@ -52,13 +52,19 @@ func (h *Harvester) Harvest(r reader.Reader) {
 	harvesterStarted.Add(1)
 	harvesterRunning.Add(1)
-	defer harvesterRunning.Add(-1)
 
-	// Makes sure file is properly closed when the harvester is stopped
-	defer h.close()
+	h.stopWg.Add(1)
+	defer func() {
+		// Channel to stop internal harvester routines
+		h.stop()
 
+		// Makes sure file is properly closed when the harvester is stopped
+		h.close()
+
+		harvesterRunning.Add(-1)
 
-	// Channel to stop internal harvester routines
-	defer h.stop()
+		// Marks harvester stopping completed
+		h.stopWg.Done()
+	}()
 
 	// Closes reader after timeout or when done channel is closed
 	// This routine is also responsible to properly stop the reader
@@ -74,8 +80,6 @@ func (h *Harvester) Harvest(r reader.Reader) {
 		// Applies when timeout is reached
 		case <-closeTimeout:
 			logp.Info("Closing harvester because close_timeout was reached.")
-		// Required for shutdown when hanging inside reader
-		case <-h.prospectorDone:
 		// Required when reader loop returns and reader finished
 		case <-h.done:
 		}
@@ -156,22 +160,23 @@ func (h *Harvester) Harvest(r reader.Reader) {
 	}
 }
 
+// stop is intended for internal use and closes the done channel to stop execution
 func (h *Harvester) stop() {
-	h.once.Do(func() {
+	h.stopOnce.Do(func() {
 		close(h.done)
 	})
 }
 
+// Stop stops the harvester and waits for completion
+func (h *Harvester) Stop() {
+	h.stop()
+	h.stopWg.Wait()
+}
+
 // sendEvent sends event to the spooler channel
 // Return false if event was not sent
 func (h *Harvester) sendEvent(event *input.Event) bool {
-
-	select {
-	case <-h.done:
-		return false
-	case h.prospectorChan <- event: // ship the new event downstream
-		return true
-	}
+	return h.outlet.OnEventSignal(event)
 }
 
 // sendStateUpdate send an empty event with the current state to update the registry
@@ -182,11 +187,7 @@ func (h *Harvester) sendEvent(event *input.Event) bool {
 func (h *Harvester) sendStateUpdate() {
 	logp.Debug("harvester", "Update state: %s, offset: %v", h.state.Source, h.state.Offset)
 	event := input.NewEvent(h.state)
-
-	select {
-	case <-h.prospectorDone:
-	case h.prospectorChan <- event: // ship the new event downstream
-	}
+	h.outlet.OnEvent(event)
 }
 
 // shouldExportLine decides if the line is exported or not based on
diff --git a/filebeat/prospector/factory.go b/filebeat/prospector/factory.go
index e04e6213fd6..df4d3e80e97 100644
--- a/filebeat/prospector/factory.go
+++ b/filebeat/prospector/factory.go
@@ -10,18 +10,20 @@ import (
 type Factory struct {
 	outlet    Outlet
 	registrar *registrar.Registrar
+	beatDone  chan struct{}
 }
 
-func NewFactory(outlet Outlet, registrar *registrar.Registrar) *Factory {
+func NewFactory(outlet Outlet, registrar 
*registrar.Registrar, beatDone chan struct{}) *Factory { return &Factory{ outlet: outlet, registrar: registrar, + beatDone: beatDone, } } func (r *Factory) Create(c *common.Config) (cfgfile.Runner, error) { - p, err := NewProspector(c, r.outlet) + p, err := NewProspector(c, r.outlet, r.beatDone) if err != nil { logp.Err("Error creating prospector: %s", err) return nil, err diff --git a/filebeat/prospector/prospector.go b/filebeat/prospector/prospector.go index 31f48684596..eaa08c50aa1 100644 --- a/filebeat/prospector/prospector.go +++ b/filebeat/prospector/prospector.go @@ -5,11 +5,11 @@ import ( "expvar" "fmt" "sync" - "sync/atomic" "time" "github.com/mitchellh/hashstructure" + "github.com/elastic/beats/filebeat/channel" cfg "github.com/elastic/beats/filebeat/config" "github.com/elastic/beats/filebeat/harvester" "github.com/elastic/beats/filebeat/input" @@ -23,19 +23,22 @@ var ( ) type Prospector struct { - // harvesterCounter MUST be first field in struct. See https://github.com/golang/go/issues/599 - harvesterCounter uint64 - cfg *common.Config // Raw config - config prospectorConfig - prospectorer Prospectorer - outlet Outlet - harvesterChan chan *input.Event - done chan struct{} - states *file.States - wg sync.WaitGroup - channelWg sync.WaitGroup // Separate waitgroup for channels as not stopped on completion - id uint64 - Once bool + cfg *common.Config // Raw config + config prospectorConfig + prospectorer Prospectorer + outlet Outlet + harvesterChan chan *input.Event + channelDone chan struct{} + runDone chan struct{} + runWg *sync.WaitGroup + states *file.States + wg *sync.WaitGroup + channelWg *sync.WaitGroup // Separate waitgroup for channels as not stopped on completion + id uint64 + Once bool + registry *harvesterRegistry + beatDone chan struct{} + eventCounter *sync.WaitGroup } type Prospectorer interface { @@ -47,17 +50,22 @@ type Outlet interface { OnEvent(event *input.Event) bool } -func NewProspector(cfg *common.Config, outlet Outlet) (*Prospector, error) { +func NewProspector(cfg *common.Config, outlet Outlet, beatDone chan struct{}) (*Prospector, error) { prospector := &Prospector{ cfg: cfg, config: defaultConfig, outlet: outlet, harvesterChan: make(chan *input.Event), - done: make(chan struct{}), - wg: sync.WaitGroup{}, + channelDone: make(chan struct{}), + wg: &sync.WaitGroup{}, + runDone: make(chan struct{}), + runWg: &sync.WaitGroup{}, states: &file.States{}, - channelWg: sync.WaitGroup{}, + channelWg: &sync.WaitGroup{}, Once: false, + registry: newHarvesterRegistry(), + beatDone: beatDone, + eventCounter: &sync.WaitGroup{}, } var err error @@ -112,25 +120,8 @@ func (p *Prospector) LoadStates(states []file.State) error { } func (p *Prospector) Start() { - logp.Info("Starting prospector of type: %v; id: %v ", p.config.InputType, p.ID()) - - if p.Once { - // If only run once, waiting for completion of prospector / harvesters - defer p.wg.Wait() - } - - // Add waitgroup to make sure prospectors finished p.wg.Add(1) - - go func() { - defer p.wg.Done() - p.Run() - }() - -} - -// Starts scanning through all the file paths and fetch the related files. 
Start a harvester for each file
-func (p *Prospector) Run() {
+	logp.Info("Starting prospector of type: %v; id: %v ", p.config.InputType, p.ID())
 
 	// Open channel to receive events from harvester and forward them to spooler
 	// Here potential filtering can happen
@@ -139,18 +130,42 @@ func (p *Prospector) Start() {
 		defer p.channelWg.Done()
 		for {
 			select {
-			case <-p.done:
+			case <-p.channelDone:
+				logp.Info("Prospector channel stopped")
+				return
+			case <-p.beatDone:
 				logp.Info("Prospector channel stopped")
 				return
 			case event := <-p.harvesterChan:
-				err := p.updateState(event)
-				if err != nil {
-					return
-				}
+				// No stopping on error, because on error it is expected that beatDone is closed
+				// in the next run. If not, this will further drain the channel.
+				p.updateState(event)
+				p.eventCounter.Done()
 			}
 		}
 	}()
 
+	if p.Once {
+		// Makes sure prospectors can complete their first scan before being stopped
+		defer p.runWg.Wait()
+	}
+
+	// Add waitgroup to make sure prospectors finished
+	p.runWg.Add(1)
+	go func() {
+		defer func() {
+			p.runWg.Done()
+			p.stop()
+		}()
+
+		p.Run()
+	}()
+
+}
+
+// Starts scanning through all the file paths, fetches the related files, and starts a harvester for each file
+func (p *Prospector) Run() {
+
 	// Initial prospector run
 	p.prospectorer.Run()
 
@@ -161,7 +176,7 @@
 	for {
 		select {
-		case <-p.done:
+		case <-p.runDone:
 			logp.Info("Prospector ticker stopped")
 			return
 		case <-time.After(p.config.ScanFrequency):
@@ -195,21 +210,71 @@ func (p *Prospector) updateState(event *input.Event) error {
 	return nil
 }
 
+// Stop stops the prospector and with it all harvesters
+//
+// The shutdown order is as follows:
+// - stop run and scanning
+// - wait until the last scan finishes to make sure no new harvesters are added
+// - stop harvesters
+// - wait until all harvesters finished
+// - stop communication channel
+// - wait on internal waitgroup to make sure all prospector go routines are stopped
+// - wait until all events are forwarded to the spooler
 func (p *Prospector) Stop() {
+	// Stop scanning and wait for completion
+	close(p.runDone)
+	p.wg.Wait()
+}
+
+func (p *Prospector) stop() {
+	defer p.wg.Done()
+
 	logp.Info("Stopping Prospector: %v", p.ID())
-	close(p.done)
+
+	// In case of once, wait until harvesters close themselves
+	if p.Once {
+		p.registry.waitForCompletion()
+	}
+
+	// Wait for the running prospectors to finish
+	// This ensures no new harvesters are added. 
+ p.runWg.Wait() + + // Stop all harvesters + // In case the beatDone channel is closed, this will not wait for completion + // Otherwise Stop will wait until output is complete + p.registry.Stop() + + // Waits on stopping all harvesters to make sure all events made it into the channel + p.waitEvents() +} + +// Wait for completion of sending events +func (p *Prospector) waitEvents() { + + done := make(chan struct{}) + go func() { + p.eventCounter.Wait() + close(done) + }() + + select { + case <-done: + close(p.channelDone) + case <-p.beatDone: + } + // Waits until channel go-routine properly stopped p.channelWg.Wait() - p.wg.Wait() } // createHarvester creates a new harvester instance from the given state func (p *Prospector) createHarvester(state file.State) (*harvester.Harvester, error) { + outlet := channel.NewOutlet(p.beatDone, p.harvesterChan, p.eventCounter) h, err := harvester.NewHarvester( p.cfg, state, - p.harvesterChan, - p.done, + outlet, ) return h, err @@ -219,7 +284,7 @@ func (p *Prospector) createHarvester(state file.State) (*harvester.Harvester, er // In case the HarvesterLimit is reached, an error is returned func (p *Prospector) startHarvester(state file.State, offset int64) error { - if p.config.HarvesterLimit > 0 && atomic.LoadUint64(&p.harvesterCounter) >= p.config.HarvesterLimit { + if p.config.HarvesterLimit > 0 && p.registry.len() >= p.config.HarvesterLimit { harvesterSkipped.Add(1) return fmt.Errorf("Harvester limit reached.") } @@ -246,19 +311,7 @@ func (p *Prospector) startHarvester(state file.State, offset int64) error { return err } - p.wg.Add(1) - // startHarvester is not run concurrently, but atomic operations are need for the decrementing of the counter - // inside the following go routine - atomic.AddUint64(&p.harvesterCounter, 1) - go func() { - defer func() { - atomic.AddUint64(&p.harvesterCounter, ^uint64(0)) - p.wg.Done() - }() - - // Starts harvester and picks the right type. 
In case type is not set, set it to defeault (log) - h.Harvest(reader) - }() + p.registry.start(h, reader) return nil } diff --git a/filebeat/prospector/prospector_log.go b/filebeat/prospector/prospector_log.go index c255763f3a7..6e50778fde8 100644 --- a/filebeat/prospector/prospector_log.go +++ b/filebeat/prospector/prospector_log.go @@ -209,7 +209,7 @@ func (p *ProspectorLog) scan() { for path, info := range p.getFiles() { select { - case <-p.Prospector.done: + case <-p.Prospector.runDone: logp.Info("Scan aborted because prospector stopped.") return default: diff --git a/filebeat/prospector/registry.go b/filebeat/prospector/registry.go new file mode 100644 index 00000000000..86c8aa1161d --- /dev/null +++ b/filebeat/prospector/registry.go @@ -0,0 +1,70 @@ +package prospector + +import ( + "sync" + + "github.com/elastic/beats/filebeat/harvester" + "github.com/elastic/beats/filebeat/harvester/reader" + uuid "github.com/satori/go.uuid" +) + +type harvesterRegistry struct { + sync.Mutex + harvesters map[uuid.UUID]*harvester.Harvester + wg sync.WaitGroup +} + +func newHarvesterRegistry() *harvesterRegistry { + return &harvesterRegistry{ + harvesters: map[uuid.UUID]*harvester.Harvester{}, + } +} + +func (hr *harvesterRegistry) add(h *harvester.Harvester) { + hr.Lock() + defer hr.Unlock() + hr.harvesters[h.ID] = h +} + +func (hr *harvesterRegistry) remove(h *harvester.Harvester) { + hr.Lock() + defer hr.Unlock() + delete(hr.harvesters, h.ID) +} + +func (hr *harvesterRegistry) Stop() { + hr.Lock() + for _, hv := range hr.harvesters { + hr.wg.Add(1) + go func(h *harvester.Harvester) { + hr.wg.Done() + h.Stop() + }(hv) + } + hr.Unlock() + hr.waitForCompletion() +} + +func (hr *harvesterRegistry) waitForCompletion() { + hr.wg.Wait() +} + +func (hr *harvesterRegistry) start(h *harvester.Harvester, r reader.Reader) { + + hr.wg.Add(1) + hr.add(h) + go func() { + defer func() { + hr.remove(h) + hr.wg.Done() + }() + // Starts harvester and picks the right type. 
In case type is not set, set it to default (log)
+		h.Harvest(r)
+	}()
+}
+
+func (hr *harvesterRegistry) len() uint64 {
+	hr.Lock()
+	defer hr.Unlock()
+	return uint64(len(hr.harvesters))
+}
diff --git a/filebeat/tests/system/test_reload.py b/filebeat/tests/system/test_reload.py
index 9e6717f3257..02eb211260f 100644
--- a/filebeat/tests/system/test_reload.py
+++ b/filebeat/tests/system/test_reload.py
@@ -139,3 +139,65 @@ def test_start_stop_replace(self):
         assert output[0]["message"] == first_line
         assert output[1]["message"] == second_line
         assert self.output_lines() == 2
+
+    def test_reload_same_prospector(self):
+        """
+        Test reloading same prospector
+        """
+        self.render_config_template(
+            reload=True,
+            reload_path=self.working_dir + "/configs/*.yml",
+            prospectors=False,
+        )
+
+        proc = self.start_beat()
+
+        os.mkdir(self.working_dir + "/logs/")
+        logfile = self.working_dir + "/logs/test.log"
+        os.mkdir(self.working_dir + "/configs/")
+        first_line = "First log file"
+        second_line = "Second log file"
+
+        config = prospectorConfigTemplate.format(self.working_dir + "/logs/test.log")
+        config = config + """
+  close_eof: true
+"""
+        with open(self.working_dir + "/configs/prospector.yml", 'w') as f:
+            f.write(config)
+
+        with open(logfile, 'w') as f:
+            f.write(first_line + "\n")
+
+        self.wait_until(lambda: self.output_lines() == 1)
+
+        # Overwrite prospector with same path but new fields
+        with open(self.working_dir + "/configs/prospector.yml", 'w') as f:
+            config = config + """
+  fields:
+    hello: world
+"""
+            f.write(config)
+
+        # Wait until prospector is stopped
+        self.wait_until(
+            lambda: self.log_contains("Runner stopped:"),
+            max_timeout=15)
+
+        # Update the log file, only 1 change should be picked up
+        with open(logfile, 'a') as f:
+            f.write(second_line + "\n")
+
+        self.wait_until(lambda: self.output_lines() == 2)
+
+        proc.check_kill_and_wait()
+
+        output = self.read_output()
+
+        # Make sure the correct lines were picked up
+        assert self.output_lines() == 2
+        assert output[0]["message"] == first_line
+        # Check no fields exist
+        assert ("fields" in output[0]) == False
+        assert output[1]["message"] == second_line
+        # Assert that fields are added
+        assert output[1]["fields.hello"] == "world"