diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index b67330519cb3..616e8c85116a 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -42,6 +42,7 @@ https://github.com/elastic/beats/compare/v7.0.0-rc1...master[Check the HEAD diff *Filebeat* - Don't apply multiline rules in Logstash json logs. {pull}11346[11346] +- Fix goroutine leak happening when harvesters are dynamically stopped. {pull}11263[11263] - Fix panic in add_kubernetes_metadata processor when key `log` does not exist. {issue}11543[11543] {pull}11549[11549] *Heartbeat* diff --git a/filebeat/channel/interface.go b/filebeat/channel/interface.go index 82e5a82af37b..877b818870a8 100644 --- a/filebeat/channel/interface.go +++ b/filebeat/channel/interface.go @@ -32,5 +32,6 @@ type Connector func(*common.Config, *common.MapStrPointer) (Outleter, error) // Outleter is the outlet for an input type Outleter interface { Close() error + Done() <-chan struct{} OnEvent(data *util.Data) bool } diff --git a/filebeat/channel/outlet.go b/filebeat/channel/outlet.go index d130cf9ceeb9..c0fe2b0c9e3a 100644 --- a/filebeat/channel/outlet.go +++ b/filebeat/channel/outlet.go @@ -27,6 +27,7 @@ type outlet struct { wg eventCounter client beat.Client isOpen atomic.Bool + done chan struct{} } func newOutlet(client beat.Client, wg eventCounter) *outlet { @@ -34,6 +35,7 @@ func newOutlet(client beat.Client, wg eventCounter) *outlet { wg: wg, client: client, isOpen: atomic.MakeBool(true), + done: make(chan struct{}), } return o } @@ -41,11 +43,16 @@ func newOutlet(client beat.Client, wg eventCounter) *outlet { func (o *outlet) Close() error { isOpen := o.isOpen.Swap(false) if isOpen { + close(o.done) return o.client.Close() } return nil } +func (o *outlet) Done() <-chan struct{} { + return o.done +} + func (o *outlet) OnEvent(d *util.Data) bool { if !o.isOpen.Load() { return false diff --git a/filebeat/channel/util.go b/filebeat/channel/util.go index 134765c4cd8b..aec2132fa20a 100644 --- a/filebeat/channel/util.go +++ b/filebeat/channel/util.go @@ -71,6 +71,10 @@ func (o *subOutlet) Close() error { return nil } +func (o *subOutlet) Done() <-chan struct{} { + return o.done +} + func (o *subOutlet) OnEvent(d *util.Data) bool { o.mutex.Lock() @@ -114,8 +118,12 @@ func (o *subOutlet) OnEvent(d *util.Data) bool { func CloseOnSignal(outlet Outleter, sig <-chan struct{}) Outleter { if sig != nil { go func() { - <-sig - outlet.Close() + select { + case <-outlet.Done(): + return + case <-sig: + outlet.Close() + } }() } return outlet diff --git a/filebeat/input/log/input_other_test.go b/filebeat/input/log/input_other_test.go index 3a36cb6040d6..bdaba0c7d2ab 100644 --- a/filebeat/input/log/input_other_test.go +++ b/filebeat/input/log/input_other_test.go @@ -169,3 +169,4 @@ type TestOutlet struct{} func (o TestOutlet) OnEvent(event *util.Data) bool { return true } func (o TestOutlet) Close() error { return nil } +func (o TestOutlet) Done() <-chan struct{} { return nil }