Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix harvester shutdown for prospector reloading #3563

Merged
merged 2 commits into from
Feb 13, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion filebeat/beater/filebeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ func (fb *Filebeat) Run(b *beat.Beat) error {
return err
}

crawler, err := crawler.New(newSpoolerOutlet(fb.done, spooler, wgEvents), config.Prospectors, *once)
crawler, err := crawler.New(newSpoolerOutlet(fb.done, spooler, wgEvents), config.Prospectors, fb.done, *once)
if err != nil {
logp.Err("Could not init crawler: %v", err)
return err
Expand Down
90 changes: 90 additions & 0 deletions filebeat/channel/outlet.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
package channel

import (
"sync"
"sync/atomic"

"github.com/elastic/beats/filebeat/input"
)

// Outlet struct is used to be passed to an object which needs an outlet
//
// The difference between signal and done channel is as following:
// - signal channel can be added through SetSignal and is used to
// interrupt events sent through OnEventSignal-
// - done channel is used to close and stop the outlet
//
// If SetSignal is used, it must be ensure that there is only one event producer.
type Outlet struct {
wg *sync.WaitGroup // Use for counting active events
done <-chan struct{}
signal <-chan struct{}
channel chan *input.Event
isOpen int32 // atomic indicator
}

func NewOutlet(
done <-chan struct{},
c chan *input.Event,
wg *sync.WaitGroup,
) *Outlet {
return &Outlet{
done: done,
channel: c,
wg: wg,
isOpen: 1,
}
}

// SetSignal sets the signal channel for OnEventSignal
// If SetSignal is used, it must be ensure that only one producer exists.
func (o *Outlet) SetSignal(signal <-chan struct{}) {
o.signal = signal
}

func (o *Outlet) OnEvent(event *input.Event) bool {
open := atomic.LoadInt32(&o.isOpen) == 1
if !open {
return false
}

if o.wg != nil {
o.wg.Add(1)
}

select {
case <-o.done:
if o.wg != nil {
o.wg.Done()
}
atomic.StoreInt32(&o.isOpen, 0)
return false
case o.channel <- event:
return true
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if outlet is used by multiple producers this can race I think. If it's used by one single producer only, there is no need for the atomics. Also, setting a channel to nil has a similar effect as a nil channel blocks forever and a closed channel always receives. e.g.:

type Outlet struct {
  stateUpdate, event cancelChannel
}

type cancelChannel struct {
  done chan struct{}
  ch chan *input.Event
}

func (ch *cancelChannel) OnEvent(evt *input.Event) bool {
  select {
  case <-ch.done:
    ch.ch = nil
    return false
  case ch.ch <- event:
    return true
  }    
}

func (o *Outlet) OnEvent(event *input.Event) bool {
  return o.event(event)
}

func (o *Outlet) OnStateUpdate(event *input.Event) bool {
  return o.stateUpdate.event(event)
}

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would prefer to keep the current implementation to make it possible to use the same Outlet also for prospector to spooler in the future. So we only have one implementation which supports both use cases.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's time to get rid of the spooler...

}

// OnEventSignal can be stopped by the signal that is set with SetSignal
// This does not close the outlet. Only OnEvent does close the outlet.
// If OnEventSignal is used, it must be ensured that only one producer is used.
func (o *Outlet) OnEventSignal(event *input.Event) bool {
open := atomic.LoadInt32(&o.isOpen) == 1
if !open {
return false
}

if o.wg != nil {
o.wg.Add(1)
}

select {
case <-o.signal:
if o.wg != nil {
o.wg.Done()
}
o.signal = nil
return false
case o.channel <- event:
return true
}
}
8 changes: 5 additions & 3 deletions filebeat/crawler/crawler.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,17 @@ type Crawler struct {
wg sync.WaitGroup
reloader *cfgfile.Reloader
once bool
beatDone chan struct{}
}

func New(out prospector.Outlet, prospectorConfigs []*common.Config, once bool) (*Crawler, error) {
func New(out prospector.Outlet, prospectorConfigs []*common.Config, beatDone chan struct{}, once bool) (*Crawler, error) {

return &Crawler{
out: out,
prospectors: map[uint64]*prospector.Prospector{},
prospectorConfigs: prospectorConfigs,
once: once,
beatDone: beatDone,
}, nil
}

Expand All @@ -47,7 +49,7 @@ func (c *Crawler) Start(r *registrar.Registrar, reloaderConfig *common.Config) e
logp.Warn("BETA feature dynamic configuration reloading is enabled.")

c.reloader = cfgfile.NewReloader(reloaderConfig)
factory := prospector.NewFactory(c.out, r)
factory := prospector.NewFactory(c.out, r, c.beatDone)
go func() {
c.reloader.Run(factory)
}()
Expand All @@ -62,7 +64,7 @@ func (c *Crawler) startProspector(config *common.Config, states []file.State) er
if !config.Enabled() {
return nil
}
p, err := prospector.NewProspector(config, c.out)
p, err := prospector.NewProspector(config, c.out, c.beatDone)
if err != nil {
return fmt.Errorf("Error in initing prospector: %s", err)
}
Expand Down
26 changes: 17 additions & 9 deletions filebeat/harvester/harvester.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ import (
"fmt"
"sync"

"github.com/satori/go.uuid"

"github.com/elastic/beats/filebeat/channel"
"github.com/elastic/beats/filebeat/config"
"github.com/elastic/beats/filebeat/harvester/encoding"
"github.com/elastic/beats/filebeat/harvester/source"
Expand All @@ -40,24 +43,26 @@ type Harvester struct {
fileReader *LogFile
encodingFactory encoding.EncodingFactory
encoding encoding.Encoding
prospectorDone chan struct{}
once sync.Once
done chan struct{}
stopOnce sync.Once
stopWg *sync.WaitGroup
outlet *channel.Outlet
ID uuid.UUID
}

func NewHarvester(
cfg *common.Config,
state file.State,
prospectorChan chan *input.Event,
done chan struct{},
outlet *channel.Outlet,
) (*Harvester, error) {

h := &Harvester{
config: defaultConfig,
state: state,
prospectorChan: prospectorChan,
prospectorDone: done,
done: make(chan struct{}),
config: defaultConfig,
state: state,
done: make(chan struct{}),
stopWg: &sync.WaitGroup{},
outlet: outlet,
ID: uuid.NewV4(),
}

if err := cfg.Unpack(&h.config); err != nil {
Expand All @@ -70,6 +75,9 @@ func NewHarvester(
}
h.encodingFactory = encodingFactory

// Add outlet signal so harvester can also stop itself
h.outlet.SetSignal(h.done)

return h, nil
}

Expand Down
41 changes: 21 additions & 20 deletions filebeat/harvester/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,13 +52,19 @@ func (h *Harvester) Harvest(r reader.Reader) {

harvesterStarted.Add(1)
harvesterRunning.Add(1)
defer harvesterRunning.Add(-1)

// Makes sure file is properly closed when the harvester is stopped
defer h.close()
h.stopWg.Add(1)
defer func() {
// Channel to stop internal harvester routines
h.stop()
// Makes sure file is properly closed when the harvester is stopped
h.close()

harvesterRunning.Add(-1)

// Channel to stop internal harvester routines
defer h.stop()
// Marks harvester stopping completed
h.stopWg.Done()
}()

// Closes reader after timeout or when done channel is closed
// This routine is also responsible to properly stop the reader
Expand All @@ -74,8 +80,6 @@ func (h *Harvester) Harvest(r reader.Reader) {
// Applies when timeout is reached
case <-closeTimeout:
logp.Info("Closing harvester because close_timeout was reached.")
// Required for shutdown when hanging inside reader
case <-h.prospectorDone:
// Required when reader loop returns and reader finished
case <-h.done:
}
Expand Down Expand Up @@ -156,22 +160,23 @@ func (h *Harvester) Harvest(r reader.Reader) {
}
}

// stop is intended for internal use and closed the done channel to stop execution
func (h *Harvester) stop() {
h.once.Do(func() {
h.stopOnce.Do(func() {
close(h.done)
})
}

// Stop stops harvester and waits for completion
func (h *Harvester) Stop() {
h.stop()
h.stopWg.Wait()
}

// sendEvent sends event to the spooler channel
// Return false if event was not sent
func (h *Harvester) sendEvent(event *input.Event) bool {

select {
case <-h.done:
return false
case h.prospectorChan <- event: // ship the new event downstream
return true
}
return h.outlet.OnEventSignal(event)
}

// sendStateUpdate send an empty event with the current state to update the registry
Expand All @@ -182,11 +187,7 @@ func (h *Harvester) sendEvent(event *input.Event) bool {
func (h *Harvester) sendStateUpdate() {
logp.Debug("harvester", "Update state: %s, offset: %v", h.state.Source, h.state.Offset)
event := input.NewEvent(h.state)

select {
case <-h.prospectorDone:
case h.prospectorChan <- event: // ship the new event downstream
}
h.outlet.OnEvent(event)
}

// shouldExportLine decides if the line is exported or not based on
Expand Down
6 changes: 4 additions & 2 deletions filebeat/prospector/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,18 +10,20 @@ import (
type Factory struct {
outlet Outlet
registrar *registrar.Registrar
beatDone chan struct{}
}

func NewFactory(outlet Outlet, registrar *registrar.Registrar) *Factory {
func NewFactory(outlet Outlet, registrar *registrar.Registrar, beatDone chan struct{}) *Factory {
return &Factory{
outlet: outlet,
registrar: registrar,
beatDone: beatDone,
}
}

func (r *Factory) Create(c *common.Config) (cfgfile.Runner, error) {

p, err := NewProspector(c, r.outlet)
p, err := NewProspector(c, r.outlet, r.beatDone)
if err != nil {
logp.Err("Error creating prospector: %s", err)
return nil, err
Expand Down
Loading