diff --git a/filebeat/channel/util.go b/filebeat/channel/util.go index 8111bc2a011..134765c4cd8 100644 --- a/filebeat/channel/util.go +++ b/filebeat/channel/util.go @@ -18,17 +18,19 @@ package channel import ( + "sync" + "github.com/elastic/beats/filebeat/util" "github.com/elastic/beats/libbeat/beat" "github.com/elastic/beats/libbeat/common" - "github.com/elastic/beats/libbeat/common/atomic" ) type subOutlet struct { - isOpen atomic.Bool - done chan struct{} - ch chan *util.Data - res chan bool + done chan struct{} + ch chan *util.Data + res chan bool + mutex sync.Mutex + closeOnce sync.Once } // ConnectTo creates a new Connector, combining a beat.Pipeline with an outlet Factory. @@ -42,10 +44,9 @@ func ConnectTo(pipeline beat.Pipeline, factory Factory) Connector { // underlying outlet. func SubOutlet(out Outleter) Outleter { s := &subOutlet{ - isOpen: atomic.MakeBool(true), - done: make(chan struct{}), - ch: make(chan *util.Data), - res: make(chan bool, 1), + done: make(chan struct{}), + ch: make(chan *util.Data), + res: make(chan bool, 1), } go func() { @@ -58,21 +59,30 @@ func SubOutlet(out Outleter) Outleter { } func (o *subOutlet) Close() error { - isOpen := o.isOpen.Swap(false) - if isOpen { + o.closeOnce.Do(func() { + // Signal OnEvent() to terminate close(o.done) - } + // This mutex prevents the event channel to be closed if OnEvent is + // still running. + o.mutex.Lock() + defer o.mutex.Unlock() + close(o.ch) + }) return nil } func (o *subOutlet) OnEvent(d *util.Data) bool { - if !o.isOpen.Load() { + + o.mutex.Lock() + defer o.mutex.Unlock() + select { + case <-o.done: return false + default: } select { case <-o.done: - close(o.ch) return false case o.ch <- d: @@ -92,8 +102,6 @@ func (o *subOutlet) OnEvent(d *util.Data) bool { // Once all messages are in the publisher pipeline, in correct order, // it depends on registrar/publisher pipeline if state is finally updated // in the registrar. - - close(o.ch) return true case ret := <-o.res: