Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow filebeat to only run once #2456

Merged
merged 2 commits into from
Sep 16, 2016
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ https://github.com/elastic/beats/compare/v5.0.0-alpha5...master[Check the HEAD d
*Filebeat*
- Introduce close_timeout harvester options {issue}1926[1926]
- Strip BOM from first message in case of BOM files {issue}2351[2351]
- Add command line option -once to run filebeat only once and then close {pull}2456[2456]


- Add harvester_limit option {pull}2417[2417]
Expand Down
88 changes: 53 additions & 35 deletions filebeat/beater/filebeat.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package beater

import (
"flag"
"fmt"
"sync"

Expand All @@ -15,11 +16,12 @@ import (
"github.com/elastic/beats/filebeat/spooler"
)

var once = flag.Bool("once", false, "Run filebeat only once until all harvesters reach EOF")

// Filebeat is a beater object. Contains all objects needed to run the beat
type Filebeat struct {
config *cfg.Config
sigWait *signalWait
done chan struct{}
config *cfg.Config
done chan struct{}
}

// New creates a new Filebeat pointer instance.
Expand All @@ -33,9 +35,8 @@ func New(b *beat.Beat, rawConfig *common.Config) (beat.Beater, error) {
}

fb := &Filebeat{
done: make(chan struct{}),
sigWait: newSignalWait(),
config: &config,
done: make(chan struct{}),
config: &config,
}
return fb, nil
}
Expand All @@ -45,13 +46,12 @@ func (fb *Filebeat) Run(b *beat.Beat) error {
var err error
config := fb.config

var wgEvents *sync.WaitGroup // count active events for waiting on shutdown
var finishedLogger publisher.SuccessLogger
waitFinished := newSignalWait()
waitEvents := newSignalWait()

if fb.config.ShutdownTimeout > 0 {
wgEvents = &sync.WaitGroup{}
finishedLogger = newFinishedLogger(wgEvents)
}
// count active events for waiting on shutdown
wgEvents := &sync.WaitGroup{}
finishedLogger := newFinishedLogger(wgEvents)

// Setup registrar to persist state
registrar, err := registrar.New(config.RegistryFile, finishedLogger)
Expand All @@ -60,13 +60,14 @@ func (fb *Filebeat) Run(b *beat.Beat) error {
return err
}

// Channel from harvesters to spooler
successLogger := newRegistrarLogger(registrar)
// Make sure all events that were published in
registrarChannel := newRegistrarLogger(registrar)

// Channel from spooler to harvester
publisherChan := newPublisherChannel()

// Publishes event to output
publisher := publisher.New(config.PublishAsync,
publisherChan.ch, successLogger, b.Publisher)
publisher := publisher.New(config.PublishAsync, publisherChan.ch, registrarChannel, b.Publisher)

// Init and Start spooler: Harvesters dump events into the spooler.
spooler, err := spooler.New(config, publisherChan)
Expand All @@ -75,9 +76,7 @@ func (fb *Filebeat) Run(b *beat.Beat) error {
return err
}

crawler, err := crawler.New(
newSpoolerOutlet(fb.done, spooler, wgEvents),
config.Prospectors)
crawler, err := crawler.New(newSpoolerOutlet(fb.done, spooler, wgEvents), config.Prospectors)
if err != nil {
logp.Err("Could not init crawler: %v", err)
return err
Expand All @@ -98,30 +97,45 @@ func (fb *Filebeat) Run(b *beat.Beat) error {
// Start publisher
publisher.Start()
// Stopping publisher (might potentially drop items)
defer publisher.Stop()
defer successLogger.Close()
defer func() {
// Closes first the registrar logger to make sure not more events arrive at the registrar
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe add comment: registrarChannel must be closed first to potentially unblock (pretty unlikely) the publisher.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added

// registrarChannel must be closed first to potentially unblock (pretty unlikely) the publisher
registrarChannel.Close()
publisher.Stop()
}()

// Starting spooler
spooler.Start()

// Stopping spooler will flush items
defer func() {
// With harvesters being stopped, optionally wait for all enqueued events being
// published and written by registrar before continuing shutdown.
fb.sigWait.Wait()
// Wait for all events to be processed or timeout
waitEvents.Wait()

// continue shutdown
// Closes publisher so no further events can be sent
publisherChan.Close()
// Stopping spooler
spooler.Stop()
}()

err = crawler.Start(registrar.GetStates())
err = crawler.Start(registrar.GetStates(), *once)
if err != nil {
return err
}
// Blocks progressing. As soon as channel is closed, all defer statements come into play

<-fb.done
// If run once, add crawler completion check as alternative to done signal
if *once {
runOnce := func() {
logp.Info("Running filebeat once. Waiting for completion ...")
crawler.WaitForCompletion()
logp.Info("All data collection completed. Shutting down.")
}
waitFinished.Add(runOnce)
}

// Add done channel to wait for shutdown signal
waitFinished.AddChan(fb.done)
waitFinished.Wait()

// Stop crawler -> stop prospectors -> stop harvesters
// Note: waiting for crawlers to stop here in order to install wgEvents.Wait
Expand All @@ -130,14 +144,18 @@ func (fb *Filebeat) Run(b *beat.Beat) error {
crawler.Stop()

timeout := fb.config.ShutdownTimeout
if timeout > 0 {
logp.Info("Shutdown output timer started. Waiting for max %v.", timeout)

// Wait for either timeout or all events having been ACKed by outputs.
fb.sigWait.Add(withLog(wgEvents.Wait,
// Checks if on shutdown it should wait for all events to be published
waitPublished := fb.config.ShutdownTimeout > 0 || *once
if waitPublished {
// Wait for registrar to finish writing registry
waitEvents.Add(withLog(wgEvents.Wait,
"Continue shutdown: All enqueued events being published."))
fb.sigWait.Add(withLog(waitDuration(timeout),
"Continue shutdown: Time out waiting for events being published."))
// Wait for either timeout or all events having been ACKed by outputs.
if fb.config.ShutdownTimeout > 0 {
logp.Info("Shutdown output timer started. Waiting for max %v.", timeout)
waitEvents.Add(withLog(waitDuration(timeout),
"Continue shutdown: Time out waiting for events being published."))
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add:

} else {
    waitEvents.AddChan(fb.done)
}

This will ensure CTRL-C will be handled immediately/properly if run-once without shutdown_timeout is configured. If fb.done has been closed already, waitEvents.Wait will return immediately. If fb.done is closed while waiting for wgEvents.Wait, shutdown will continue.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point. Added.

}

return nil
Expand Down
10 changes: 7 additions & 3 deletions filebeat/crawler/crawler.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func New(out prospector.Outlet, prospectorConfigs []*common.Config) (*Crawler, e
}, nil
}

func (c *Crawler) Start(states file.States) error {
func (c *Crawler) Start(states file.States, once bool) error {

logp.Info("Loading Prospectors: %v", len(c.prospectorConfigs))

Expand All @@ -54,7 +54,7 @@ func (c *Crawler) Start(states file.States) error {
logp.Debug("crawler", "Prospector %v stopped", id)
}()
logp.Debug("crawler", "Starting prospector %v", id)
prospector.Run()
prospector.Run(once)
}(i, p)
}

Expand All @@ -76,6 +76,10 @@ func (c *Crawler) Stop() {
c.wg.Add(1)
go stopProspector(p)
}
c.wg.Wait()
c.WaitForCompletion()
logp.Info("Crawler stopped")
}

func (c *Crawler) WaitForCompletion() {
c.wg.Wait()
}
24 changes: 20 additions & 4 deletions filebeat/prospector/prospector.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ type Prospector struct {
done chan struct{}
states *file.States
wg sync.WaitGroup
channelWg sync.WaitGroup // Separate waitgroup for channels as not stopped on completion
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't understand the necessary for channelWg. why not reuse wg?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

channelWg is needed, because when only run once, the prospector does the wait() for the first run to be completed. When the prospector Run returns, channel are not shutdown yet and shouldn't because events are still processed. This only happens when prospector is closed with done.

I see that a similar pattern could apply here that we did with the other channels, but we can do this in a follow up PR.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TBH I still not fully understand why this is required. But resource management just became somewhat more complex :(

What bothers me is adding many async/sync code-blocks(utils) trying to manage something adding more and more complexity.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, complexity currently increases which is not good. I still hope as soon as we have the feature working as we want that we can reduce the complexity again through refactoring.

harvesterCounter uint64
}

Expand All @@ -50,6 +51,7 @@ func NewProspector(cfg *common.Config, states file.States, outlet Outlet) (*Pros
done: make(chan struct{}),
states: states.Copy(),
wg: sync.WaitGroup{},
channelWg: sync.WaitGroup{},
}

if err := cfg.Unpack(&prospector.config); err != nil {
Expand Down Expand Up @@ -101,16 +103,22 @@ func (p *Prospector) Init() error {
}

// Starts scanning through all the file paths and fetch the related files. Start a harvester for each file
func (p *Prospector) Run() {
func (p *Prospector) Run(once bool) {

logp.Info("Starting prospector of type: %v", p.config.InputType)
p.wg.Add(2)
defer p.wg.Done()

// This waitgroup is not needed if run only once
// Waitgroup has to be added here to prevent panic in case Stop is called immediately afterwards
if !once {
// Add waitgroup to make sure prospectors finished
p.wg.Add(1)
defer p.wg.Done()
}
// Open channel to receive events from harvester and forward them to spooler
// Here potential filtering can happen
p.channelWg.Add(1)
go func() {
defer p.wg.Done()
defer p.channelWg.Done()
for {
select {
case <-p.done:
Expand All @@ -128,6 +136,13 @@ func (p *Prospector) Run() {
// Initial prospector run
p.prospectorer.Run()

// Shuts down after the first complete scan of all prospectors
// As all harvesters are part of the prospector waitgroup, this waits for the closing of all harvesters
if once {
p.wg.Wait()
return
}

for {
select {
case <-p.done:
Expand Down Expand Up @@ -162,6 +177,7 @@ func (p *Prospector) updateState(event *input.Event) error {
func (p *Prospector) Stop() {
logp.Info("Stopping Prospector")
close(p.done)
p.channelWg.Wait()
p.wg.Wait()
}

Expand Down
31 changes: 19 additions & 12 deletions filebeat/spooler/spooler.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,12 @@ const channelSize = 16
type Spooler struct {
Channel chan *input.Event // Channel is the input to the Spooler.
config spoolerConfig
exit chan struct{} // Channel used to signal shutdown.
output Output // batch event output on flush
spool []*input.Event // Events being held by the Spooler.
wg sync.WaitGroup // WaitGroup used to control the shutdown.
}

// Output spooler sends event to through Send method
type Output interface {
Send(events []*input.Event) bool
}
Expand All @@ -45,7 +45,6 @@ func New(
idleTimeout: config.IdleTimeout,
spoolSize: config.SpoolSize,
},
exit: make(chan struct{}),
output: out,
spool: make([]*input.Event, 0, config.SpoolSize),
}, nil
Expand All @@ -63,6 +62,7 @@ func (s *Spooler) run() {
logp.Info("Starting spooler: spool_size: %v; idle_timeout: %s",
s.config.spoolSize, s.config.idleTimeout)

defer s.flush()
defer s.wg.Done()

timer := time.NewTimer(s.config.idleTimeout)
Expand Down Expand Up @@ -97,6 +97,7 @@ func (s *Spooler) run() {
// flushed to the publisher. The method should only be invoked one time after
// Start has been invoked.
func (s *Spooler) Stop() {

logp.Info("Stopping spooler")

// Signal to the run method that it should stop.
Expand All @@ -123,16 +124,22 @@ func (s *Spooler) queue(event *input.Event) bool {
}

// flush flushes all events to the publisher.
func (s *Spooler) flush() {
if len(s.spool) > 0 {
// copy buffer
tmpCopy := make([]*input.Event, len(s.spool))
copy(tmpCopy, s.spool)

// clear buffer
s.spool = s.spool[:0]
func (s *Spooler) flush() int {

// send batched events to output
s.output.Send(tmpCopy)
count := len(s.spool)
if count == 0 {
return 0
}

// copy buffer
tmpCopy := make([]*input.Event, count)
copy(tmpCopy, s.spool)

// clear buffer
s.spool = s.spool[:0]

// send batched events to output
s.output.Send(tmpCopy)

return count
}
Loading