diff --git a/filebeat/beater/filebeat.go b/filebeat/beater/filebeat.go
index b33b6cf9efe..878b830a7fc 100644
--- a/filebeat/beater/filebeat.go
+++ b/filebeat/beater/filebeat.go
@@ -133,7 +133,7 @@ func (fb *Filebeat) Run(b *beat.Beat) error {
 		return err
 	}
 
-	crawler, err := crawler.New(newSpoolerOutlet(fb.done, spooler, wgEvents), config.Prospectors, *once)
+	crawler, err := crawler.New(newSpoolerOutlet(fb.done, spooler, wgEvents), config.Prospectors, fb.done, *once)
 	if err != nil {
 		logp.Err("Could not init crawler: %v", err)
 		return err
diff --git a/filebeat/channel/outlet.go b/filebeat/channel/outlet.go
new file mode 100644
index 00000000000..4695b302899
--- /dev/null
+++ b/filebeat/channel/outlet.go
@@ -0,0 +1,90 @@
+package channel
+
+import (
+	"sync"
+	"sync/atomic"
+
+	"github.com/elastic/beats/filebeat/input"
+)
+
+// Outlet is passed to objects which need an outlet to forward events.
+//
+// The difference between the signal and the done channel is as follows:
+// - the signal channel can be set through SetSignal and is used to
+//   interrupt events sent through OnEventSignal
+// - the done channel is used to close and stop the outlet
+//
+// If SetSignal is used, it must be ensured that there is only one event producer.
+type Outlet struct {
+	wg      *sync.WaitGroup // Used for counting active events
+	done    <-chan struct{}
+	signal  <-chan struct{}
+	channel chan *input.Event
+	isOpen  int32 // atomic indicator
+}
+
+func NewOutlet(
+	done <-chan struct{},
+	c chan *input.Event,
+	wg *sync.WaitGroup,
+) *Outlet {
+	return &Outlet{
+		done:    done,
+		channel: c,
+		wg:      wg,
+		isOpen:  1,
+	}
+}
+
+// SetSignal sets the signal channel for OnEventSignal.
+// If SetSignal is used, it must be ensured that only one producer exists.
+func (o *Outlet) SetSignal(signal <-chan struct{}) {
+	o.signal = signal
+}
+
+func (o *Outlet) OnEvent(event *input.Event) bool {
+	open := atomic.LoadInt32(&o.isOpen) == 1
+	if !open {
+		return false
+	}
+
+	if o.wg != nil {
+		o.wg.Add(1)
+	}
+
+	select {
+	case <-o.done:
+		if o.wg != nil {
+			o.wg.Done()
+		}
+		atomic.StoreInt32(&o.isOpen, 0)
+		return false
+	case o.channel <- event:
+		return true
+	}
+}
+
+// OnEventSignal can be interrupted by the signal that is set with SetSignal.
+// This does not close the outlet; only OnEvent closes the outlet.
+// If OnEventSignal is used, it must be ensured that only one producer is used.
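+//
+// Illustrative call site (assumed usage, not part of this change): a producer
+// treats a false return value as a request to stop sending, e.g.
+//
+//	if !outlet.OnEventSignal(event) {
+//		return // interrupted by the signal, or the outlet is closed
+//	}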
+func (o *Outlet) OnEventSignal(event *input.Event) bool {
+	open := atomic.LoadInt32(&o.isOpen) == 1
+	if !open {
+		return false
+	}
+
+	if o.wg != nil {
+		o.wg.Add(1)
+	}
+
+	select {
+	case <-o.signal:
+		if o.wg != nil {
+			o.wg.Done()
+		}
+		o.signal = nil
+		return false
+	case o.channel <- event:
+		return true
+	}
+}
diff --git a/filebeat/crawler/crawler.go b/filebeat/crawler/crawler.go
index cf56423db4f..9baf86f8084 100644
--- a/filebeat/crawler/crawler.go
+++ b/filebeat/crawler/crawler.go
@@ -19,15 +19,17 @@ type Crawler struct {
 	wg                sync.WaitGroup
 	reloader          *cfgfile.Reloader
 	once              bool
+	beatDone          chan struct{}
 }
 
-func New(out prospector.Outlet, prospectorConfigs []*common.Config, once bool) (*Crawler, error) {
+func New(out prospector.Outlet, prospectorConfigs []*common.Config, beatDone chan struct{}, once bool) (*Crawler, error) {
 	return &Crawler{
 		out:               out,
 		prospectors:       map[uint64]*prospector.Prospector{},
 		prospectorConfigs: prospectorConfigs,
 		once:              once,
+		beatDone:          beatDone,
 	}, nil
 }
 
@@ -47,7 +49,7 @@ func (c *Crawler) Start(r *registrar.Registrar, reloaderConfig *common.Config) e
 		logp.Warn("BETA feature dynamic configuration reloading is enabled.")
 
 		c.reloader = cfgfile.NewReloader(reloaderConfig)
-		factory := prospector.NewFactory(c.out, r)
+		factory := prospector.NewFactory(c.out, r, c.beatDone)
 		go func() {
 			c.reloader.Run(factory)
 		}()
@@ -62,7 +64,7 @@ func (c *Crawler) startProspector(config *common.Config, states []file.State) er
 	if !config.Enabled() {
 		return nil
 	}
-	p, err := prospector.NewProspector(config, c.out)
+	p, err := prospector.NewProspector(config, c.out, c.beatDone)
 	if err != nil {
 		return fmt.Errorf("Error in initing prospector: %s", err)
 	}
diff --git a/filebeat/harvester/harvester.go b/filebeat/harvester/harvester.go
index 3e9755f1662..731eb17ace0 100644
--- a/filebeat/harvester/harvester.go
+++ b/filebeat/harvester/harvester.go
@@ -16,6 +16,9 @@ import (
 	"fmt"
 	"sync"
 
+	"github.com/satori/go.uuid"
+
+	"github.com/elastic/beats/filebeat/channel"
 	"github.com/elastic/beats/filebeat/config"
 	"github.com/elastic/beats/filebeat/harvester/encoding"
 	"github.com/elastic/beats/filebeat/harvester/source"
@@ -40,24 +43,26 @@ type Harvester struct {
 	fileReader      *LogFile
 	encodingFactory encoding.EncodingFactory
 	encoding        encoding.Encoding
-	prospectorDone  chan struct{}
-	once            sync.Once
 	done            chan struct{}
+	stopOnce        sync.Once
+	stopWg          *sync.WaitGroup
+	outlet          *channel.Outlet
+	ID              uuid.UUID
 }
 
 func NewHarvester(
 	cfg *common.Config,
 	state file.State,
-	prospectorChan chan *input.Event,
-	done chan struct{},
+	outlet *channel.Outlet,
 ) (*Harvester, error) {
 
 	h := &Harvester{
-		config:         defaultConfig,
-		state:          state,
-		prospectorChan: prospectorChan,
-		prospectorDone: done,
-		done:           make(chan struct{}),
+		config: defaultConfig,
+		state:  state,
+		done:   make(chan struct{}),
+		stopWg: &sync.WaitGroup{},
+		outlet: outlet,
+		ID:     uuid.NewV4(),
 	}
 
 	if err := cfg.Unpack(&h.config); err != nil {
@@ -70,6 +75,9 @@ func NewHarvester(
 	}
 	h.encodingFactory = encodingFactory
 
+	// Add outlet signal so harvester can also stop itself
+	h.outlet.SetSignal(h.done)
+
 	return h, nil
 }
diff --git a/filebeat/harvester/log.go b/filebeat/harvester/log.go
index 95027045d5e..af35b2077ad 100644
--- a/filebeat/harvester/log.go
+++ b/filebeat/harvester/log.go
@@ -52,13 +52,19 @@ func (h *Harvester) Harvest(r reader.Reader) {
 	harvesterStarted.Add(1)
 	harvesterRunning.Add(1)
-	defer harvesterRunning.Add(-1)
 
-	// Makes sure file is properly closed when the harvester is stopped
-	defer h.close()
+	h.stopWg.Add(1)
+	defer func() {
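+		// The steps below run in a fixed order on shutdown: stop the internal
+		// harvester routines, close the file handler, then release stopWg so
+		// a blocking Stop() call can return.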
+		// Channel to stop internal harvester routines
+		h.stop()
+
+		// Makes sure file is properly closed when the harvester is stopped
+		h.close()
+
+		harvesterRunning.Add(-1)
 
-	// Channel to stop internal harvester routines
-	defer h.stop()
+		// Marks harvester stopping completed
+		h.stopWg.Done()
+	}()
 
 	// Closes reader after timeout or when done channel is closed
 	// This routine is also responsible to properly stop the reader
@@ -74,8 +80,6 @@ func (h *Harvester) Harvest(r reader.Reader) {
 		// Applies when timeout is reached
 		case <-closeTimeout:
 			logp.Info("Closing harvester because close_timeout was reached.")
-		// Required for shutdown when hanging inside reader
-		case <-h.prospectorDone:
 		// Required when reader loop returns and reader finished
 		case <-h.done:
 		}
@@ -156,22 +160,23 @@ func (h *Harvester) Harvest(r reader.Reader) {
 	}
 }
 
+// stop is intended for internal use and closes the done channel to stop execution
 func (h *Harvester) stop() {
-	h.once.Do(func() {
+	h.stopOnce.Do(func() {
 		close(h.done)
 	})
 }
 
+// Stop stops the harvester and waits for completion
+func (h *Harvester) Stop() {
+	h.stop()
+	h.stopWg.Wait()
+}
+
 // sendEvent sends event to the spooler channel
 // Return false if event was not sent
 func (h *Harvester) sendEvent(event *input.Event) bool {
-
-	select {
-	case <-h.done:
-		return false
-	case h.prospectorChan <- event: // ship the new event downstream
-		return true
-	}
+	return h.outlet.OnEventSignal(event)
 }
 
 // sendStateUpdate send an empty event with the current state to update the registry
@@ -182,11 +187,7 @@ func (h *Harvester) sendEvent(event *input.Event) bool {
 func (h *Harvester) sendStateUpdate() {
 	logp.Debug("harvester", "Update state: %s, offset: %v", h.state.Source, h.state.Offset)
 	event := input.NewEvent(h.state)
-
-	select {
-	case <-h.prospectorDone:
-	case h.prospectorChan <- event: // ship the new event downstream
-	}
+	h.outlet.OnEvent(event)
 }
 
 // shouldExportLine decides if the line is exported or not based on
diff --git a/filebeat/prospector/factory.go b/filebeat/prospector/factory.go
index e04e6213fd6..df4d3e80e97 100644
--- a/filebeat/prospector/factory.go
+++ b/filebeat/prospector/factory.go
@@ -10,18 +10,20 @@ import (
 type Factory struct {
 	outlet    Outlet
 	registrar *registrar.Registrar
+	beatDone  chan struct{}
 }
 
-func NewFactory(outlet Outlet, registrar *registrar.Registrar) *Factory {
+func NewFactory(outlet Outlet, registrar *registrar.Registrar, beatDone chan struct{}) *Factory {
 	return &Factory{
 		outlet:    outlet,
 		registrar: registrar,
+		beatDone:  beatDone,
 	}
 }
 
 func (r *Factory) Create(c *common.Config) (cfgfile.Runner, error) {
-	p, err := NewProspector(c, r.outlet)
+	p, err := NewProspector(c, r.outlet, r.beatDone)
 	if err != nil {
 		logp.Err("Error creating prospector: %s", err)
 		return nil, err
diff --git a/filebeat/prospector/prospector.go b/filebeat/prospector/prospector.go
index 31f48684596..eaa08c50aa1 100644
--- a/filebeat/prospector/prospector.go
+++ b/filebeat/prospector/prospector.go
@@ -5,11 +5,11 @@ import (
 	"expvar"
 	"fmt"
 	"sync"
-	"sync/atomic"
 	"time"
 
 	"github.com/mitchellh/hashstructure"
 
+	"github.com/elastic/beats/filebeat/channel"
 	cfg "github.com/elastic/beats/filebeat/config"
 	"github.com/elastic/beats/filebeat/harvester"
 	"github.com/elastic/beats/filebeat/input"
@@ -23,19 +23,22 @@ var (
 )
 
 type Prospector struct {
-	// harvesterCounter MUST be first field in struct. See https://github.com/golang/go/issues/599
-	harvesterCounter uint64
-
-	cfg           *common.Config // Raw config
-	config        prospectorConfig
-	prospectorer  Prospectorer
-	outlet        Outlet
-	harvesterChan chan *input.Event
-	done          chan struct{}
-	states        *file.States
-	wg            sync.WaitGroup
-	channelWg     sync.WaitGroup // Separate waitgroup for channels as not stopped on completion
-	id            uint64
-	Once          bool
+	cfg           *common.Config // Raw config
+	config        prospectorConfig
+	prospectorer  Prospectorer
+	outlet        Outlet
+	harvesterChan chan *input.Event
+	channelDone   chan struct{}
+	runDone       chan struct{}
+	runWg         *sync.WaitGroup
+	states        *file.States
+	wg            *sync.WaitGroup
+	channelWg     *sync.WaitGroup // Separate waitgroup for channels as not stopped on completion
+	id            uint64
+	Once          bool
+	registry      *harvesterRegistry
+	beatDone      chan struct{}
+	eventCounter  *sync.WaitGroup
 }
 
 type Prospectorer interface {
@@ -47,17 +50,22 @@ type Outlet interface {
 	OnEvent(event *input.Event) bool
 }
 
-func NewProspector(cfg *common.Config, outlet Outlet) (*Prospector, error) {
+func NewProspector(cfg *common.Config, outlet Outlet, beatDone chan struct{}) (*Prospector, error) {
 	prospector := &Prospector{
 		cfg:           cfg,
 		config:        defaultConfig,
 		outlet:        outlet,
 		harvesterChan: make(chan *input.Event),
-		done:          make(chan struct{}),
-		wg:            sync.WaitGroup{},
+		channelDone:   make(chan struct{}),
+		wg:            &sync.WaitGroup{},
+		runDone:       make(chan struct{}),
+		runWg:         &sync.WaitGroup{},
 		states:        &file.States{},
-		channelWg:     sync.WaitGroup{},
+		channelWg:     &sync.WaitGroup{},
 		Once:          false,
+		registry:      newHarvesterRegistry(),
+		beatDone:      beatDone,
+		eventCounter:  &sync.WaitGroup{},
 	}
 
 	var err error
@@ -112,25 +120,8 @@ func (p *Prospector) LoadStates(states []file.State) error {
 }
 
 func (p *Prospector) Start() {
-	logp.Info("Starting prospector of type: %v; id: %v ", p.config.InputType, p.ID())
-
-	if p.Once {
-		// If only run once, waiting for completion of prospector / harvesters
-		defer p.wg.Wait()
-	}
-
-	// Add waitgroup to make sure prospectors finished
-	p.wg.Add(1)
-
-	go func() {
-		defer p.wg.Done()
-		p.Run()
-	}()
-
-}
-
-// Starts scanning through all the file paths and fetch the related files. Start a harvester for each file
-func (p *Prospector) Run() {
+	p.wg.Add(1)
+	logp.Info("Starting prospector of type: %v; id: %v ", p.config.InputType, p.ID())
 
 	// Open channel to receive events from harvester and forward them to spooler
 	// Here potential filtering can happen
@@ -139,18 +130,42 @@ func (p *Prospector) Run() {
 		defer p.channelWg.Done()
 		for {
 			select {
-			case <-p.done:
+			case <-p.channelDone:
+				logp.Info("Prospector channel stopped")
+				return
+			case <-p.beatDone:
 				logp.Info("Prospector channel stopped")
 				return
 			case event := <-p.harvesterChan:
-				err := p.updateState(event)
-				if err != nil {
-					return
-				}
+				// Do not stop on error: when an error occurs, beatDone is expected
+				// to be closed shortly. Until then, this keeps draining the channel.
+				p.updateState(event)
+				p.eventCounter.Done()
 			}
 		}
 	}()
 
+	if p.Once {
+		// Makes sure prospectors can complete the first scan before being stopped
+		defer p.runWg.Wait()
+	}
+
+	// Add waitgroup to make sure prospectors finished
+	p.runWg.Add(1)
+	go func() {
+		defer func() {
+			p.runWg.Done()
+			p.stop()
+		}()
+
+		p.Run()
+	}()
+
+}
+
+// Run starts scanning through all the file paths, fetches the related files and starts a harvester for each file
+func (p *Prospector) Run() {
+
 	// Initial prospector run
 	p.prospectorer.Run()
 
@@ -161,7 +176,7 @@ func (p *Prospector) Run() {
 
 	for {
 		select {
-		case <-p.done:
+		case <-p.runDone:
 			logp.Info("Prospector ticker stopped")
 			return
 		case <-time.After(p.config.ScanFrequency):
@@ -195,21 +210,71 @@ func (p *Prospector) updateState(event *input.Event) error {
 	return nil
 }
 
+// Stop stops the prospector and with it all harvesters
+//
+// The shutdown order is as follows:
+// - stop run and scanning
+// - wait until the last scan finishes to make sure no new harvesters are added
+// - stop the harvesters
+// - wait until all harvesters have finished
+// - stop the communication channel
+// - wait on the internal waitgroup to make sure all prospector go routines are stopped
+// - wait until all events are forwarded to the spooler
 func (p *Prospector) Stop() {
+	// Stop scanning and wait for completion
+	close(p.runDone)
+	p.wg.Wait()
+}
+
+func (p *Prospector) stop() {
+	defer p.wg.Done()
+
 	logp.Info("Stopping Prospector: %v", p.ID())
-	close(p.done)
+
+	// In case of Once, wait until the harvesters have stopped themselves
+	if p.Once {
+		p.registry.waitForCompletion()
+	}
+
+	// Wait for the running prospectors to finish.
+	// This ensures no new harvesters are added.
+	p.runWg.Wait()
+
+	// Stop all harvesters
+	// In case the beatDone channel is closed, this will not wait for completion
+	// Otherwise Stop will wait until output is complete
+	p.registry.Stop()
+
+	// With all harvesters stopped, wait until all remaining events made it into the channel
+	p.waitEvents()
+}
+
+// waitEvents waits for completion of sending events
+func (p *Prospector) waitEvents() {
+
+	done := make(chan struct{})
+	go func() {
+		p.eventCounter.Wait()
+		close(done)
+	}()
+
+	select {
+	case <-done:
+		close(p.channelDone)
+	case <-p.beatDone:
+	}
+
+	// Waits until channel go-routine properly stopped
 	p.channelWg.Wait()
-	p.wg.Wait()
 }
 
 // createHarvester creates a new harvester instance from the given state
 func (p *Prospector) createHarvester(state file.State) (*harvester.Harvester, error) {
 
+	outlet := channel.NewOutlet(p.beatDone, p.harvesterChan, p.eventCounter)
 	h, err := harvester.NewHarvester(
 		p.cfg,
 		state,
-		p.harvesterChan,
-		p.done,
+		outlet,
 	)
 
 	return h, err
@@ -219,7 +284,7 @@ func (p *Prospector) createHarvester(state file.State) (*harvester.Harvester, er
 // In case the HarvesterLimit is reached, an error is returned
 func (p *Prospector) startHarvester(state file.State, offset int64) error {
 
-	if p.config.HarvesterLimit > 0 && atomic.LoadUint64(&p.harvesterCounter) >= p.config.HarvesterLimit {
+	if p.config.HarvesterLimit > 0 && p.registry.len() >= p.config.HarvesterLimit {
 		harvesterSkipped.Add(1)
 		return fmt.Errorf("Harvester limit reached.")
 	}
@@ -246,19 +311,7 @@ func (p *Prospector) startHarvester(state file.State, offset int64) error {
 		return err
 	}
 
-	p.wg.Add(1)
-	// startHarvester is not run concurrently, but atomic operations are need for the decrementing of the counter
-	// inside the following go routine
-	atomic.AddUint64(&p.harvesterCounter, 1)
-	go func() {
-		defer func() {
-			atomic.AddUint64(&p.harvesterCounter, ^uint64(0))
-			p.wg.Done()
-		}()
-
-		// Starts harvester and picks the right type. In case type is not set, set it to defeault (log)
-		h.Harvest(reader)
-	}()
+	p.registry.start(h, reader)
 
 	return nil
 }
diff --git a/filebeat/prospector/prospector_log.go b/filebeat/prospector/prospector_log.go
index c255763f3a7..6e50778fde8 100644
--- a/filebeat/prospector/prospector_log.go
+++ b/filebeat/prospector/prospector_log.go
@@ -209,7 +209,7 @@ func (p *ProspectorLog) scan() {
 	for path, info := range p.getFiles() {
 
 		select {
-		case <-p.Prospector.done:
+		case <-p.Prospector.runDone:
 			logp.Info("Scan aborted because prospector stopped.")
 			return
 		default:
diff --git a/filebeat/prospector/registry.go b/filebeat/prospector/registry.go
new file mode 100644
index 00000000000..86c8aa1161d
--- /dev/null
+++ b/filebeat/prospector/registry.go
@@ -0,0 +1,70 @@
+package prospector
+
+import (
+	"sync"
+
+	"github.com/elastic/beats/filebeat/harvester"
+	"github.com/elastic/beats/filebeat/harvester/reader"
+	uuid "github.com/satori/go.uuid"
+)
+
+type harvesterRegistry struct {
+	sync.Mutex
+	harvesters map[uuid.UUID]*harvester.Harvester
+	wg         sync.WaitGroup
+}
+
+func newHarvesterRegistry() *harvesterRegistry {
+	return &harvesterRegistry{
+		harvesters: map[uuid.UUID]*harvester.Harvester{},
+	}
+}
+
+func (hr *harvesterRegistry) add(h *harvester.Harvester) {
+	hr.Lock()
+	defer hr.Unlock()
+	hr.harvesters[h.ID] = h
+}
+
+func (hr *harvesterRegistry) remove(h *harvester.Harvester) {
+	hr.Lock()
+	defer hr.Unlock()
+	delete(hr.harvesters, h.ID)
+}
+
+func (hr *harvesterRegistry) Stop() {
+	hr.Lock()
+	for _, hv := range hr.harvesters {
+		hr.wg.Add(1)
+		go func(h *harvester.Harvester) {
+			hr.wg.Done()
+			h.Stop()
+		}(hv)
+	}
+	hr.Unlock()
+	hr.waitForCompletion()
+}
+
+func (hr *harvesterRegistry) waitForCompletion() {
+	hr.wg.Wait()
+}
+
+func (hr *harvesterRegistry) start(h *harvester.Harvester, r reader.Reader) {
+
+	hr.wg.Add(1)
+	hr.add(h)
+	go func() {
+		defer func() {
+			hr.remove(h)
+			hr.wg.Done()
+		}()
+		// Starts harvester and picks the right type. In case type is not set, set it to default (log)
+		h.Harvest(r)
+	}()
+}
+
+func (hr *harvesterRegistry) len() uint64 {
+	hr.Lock()
+	defer hr.Unlock()
+	return uint64(len(hr.harvesters))
+}
diff --git a/filebeat/tests/system/test_reload.py b/filebeat/tests/system/test_reload.py
index 9e6717f3257..02eb211260f 100644
--- a/filebeat/tests/system/test_reload.py
+++ b/filebeat/tests/system/test_reload.py
@@ -139,3 +139,65 @@ def test_start_stop_replace(self):
         assert output[0]["message"] == first_line
         assert output[1]["message"] == second_line
         assert self.output_lines() == 2
+
+    def test_reload_same_prospector(self):
+        """
+        Test reloading the same prospector
+        """
+        self.render_config_template(
+            reload=True,
+            reload_path=self.working_dir + "/configs/*.yml",
+            prospectors=False,
+        )
+
+        proc = self.start_beat()
+
+        os.mkdir(self.working_dir + "/logs/")
+        logfile = self.working_dir + "/logs/test.log"
+        os.mkdir(self.working_dir + "/configs/")
+        first_line = "First log file"
+        second_line = "Second log file"
+
+        config = prospectorConfigTemplate.format(self.working_dir + "/logs/test.log")
+        config = config + """
+  close_eof: true
+"""
+        with open(self.working_dir + "/configs/prospector.yml", 'w') as f:
+            f.write(config)
+
+        with open(logfile, 'w') as f:
+            f.write(first_line + "\n")
+
+        self.wait_until(lambda: self.output_lines() == 1)
+
+        # Overwrite the prospector config with the same path but new fields
+        with open(self.working_dir + "/configs/prospector.yml", 'w') as f:
+            config = config + """
+  fields:
+    hello: world
+"""
+            f.write(config)
+
+        # Wait until the prospector is stopped
+        self.wait_until(
+            lambda: self.log_contains("Runner stopped:"),
+            max_timeout=15)
+
+        # Update the log file; only the new line should be picked up
+        with open(logfile, 'a') as f:
+            f.write(second_line + "\n")
+
+        self.wait_until(lambda: self.output_lines() == 2)
+
+        proc.check_kill_and_wait()
+
+        output = self.read_output()
+
+        # Make sure the correct lines were picked up
+        assert self.output_lines() == 2
+        assert output[0]["message"] == first_line
+        # Check that no fields exist on the first event
+        assert "fields" not in output[0]
+        assert output[1]["message"] == second_line
+        # Assert that fields were added to the second event
+        assert output[1]["fields.hello"] == "world"
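
Reviewer note: the done/signal split in channel.Outlet is the core building block of this change. The standalone sketch below (a trimmed stand-in type carrying strings instead of *input.Event, with the WaitGroup event counting omitted; not code from this patch) shows the intended semantics: closing done shuts the outlet permanently, while closing signal only interrupts the single producer that registered it.

package main

import (
	"fmt"
	"sync/atomic"
)

// outlet is a simplified stand-in for the new channel.Outlet.
type outlet struct {
	done    <-chan struct{} // closing this shuts the outlet for good
	signal  <-chan struct{} // closing this only interrupts onEventSignal
	channel chan string
	isOpen  int32 // atomic indicator, 1 = open
}

func (o *outlet) onEvent(event string) bool {
	if atomic.LoadInt32(&o.isOpen) != 1 {
		return false
	}
	select {
	case <-o.done:
		// done closes the outlet permanently: every later send fails fast.
		atomic.StoreInt32(&o.isOpen, 0)
		return false
	case o.channel <- event:
		return true
	}
}

func (o *outlet) onEventSignal(event string) bool {
	if atomic.LoadInt32(&o.isOpen) != 1 {
		return false
	}
	select {
	case <-o.signal:
		// signal only interrupts this one producer; the outlet stays open.
		o.signal = nil
		return false
	case o.channel <- event:
		return true
	}
}

func main() {
	done := make(chan struct{})
	signal := make(chan struct{})
	o := &outlet{done: done, signal: signal, channel: make(chan string, 1), isOpen: 1}

	fmt.Println(o.onEventSignal("a")) // true: the buffered send succeeds

	close(signal)                     // e.g. the harvester stops itself
	fmt.Println(o.onEventSignal("b")) // false: channel full, signal interrupts

	close(done)                 // the whole beat shuts down
	fmt.Println(o.onEvent("c")) // false: channel still full, done wins
	fmt.Println(o.onEvent("d")) // false: the outlet is now marked closed
}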
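The shutdown path in Prospector.waitEvents relies on a generic idiom: waiting on a sync.WaitGroup with an escape hatch, by wrapping Wait in a goroutine that closes a channel and then selecting between that channel and the shutdown channel. A minimal runnable sketch of that idiom (the helper name waitOrCancel is hypothetical, not part of this patch):

package main

import (
	"fmt"
	"sync"
	"time"
)

// waitOrCancel waits for wg to drain, but gives up early when cancel closes.
// It mirrors the structure of Prospector.waitEvents.
func waitOrCancel(wg *sync.WaitGroup, cancel <-chan struct{}) bool {
	done := make(chan struct{})
	go func() {
		wg.Wait()
		close(done)
	}()
	select {
	case <-done:
		return true // all outstanding work was accounted for
	case <-cancel:
		return false // shutdown requested: stop waiting
	}
}

func main() {
	cancel := make(chan struct{})

	var events sync.WaitGroup
	events.Add(1)
	go func() {
		time.Sleep(10 * time.Millisecond) // simulate an event being processed
		events.Done()
	}()
	fmt.Println(waitOrCancel(&events, cancel)) // true: counter drained first

	var stuck sync.WaitGroup
	stuck.Add(1) // never Done: simulates a blocked output
	go func() {
		time.Sleep(10 * time.Millisecond)
		close(cancel) // like beatDone closing during shutdown
	}()
	fmt.Println(waitOrCancel(&stuck, cancel)) // false: cancelled while waiting
}

One caveat of the idiom: the wrapper goroutine leaks if the WaitGroup never drains. The same trade-off appears acceptable in waitEvents, since the beatDone branch is only taken while the whole beat is exiting.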