diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 658b3bba01d3..229e70ed43ff 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -37,6 +37,8 @@ https://github.com/elastic/beats/compare/v6.7.2...6.x[Check the HEAD diff] *Filebeat* +- Fix memory leak in Filebeat pipeline acker. {pull}12063[12063] + *Heartbeat* *Journalbeat* diff --git a/libbeat/publisher/pipeline/acker.go b/libbeat/publisher/pipeline/acker.go index 1d4155677aca..5689b86d4458 100644 --- a/libbeat/publisher/pipeline/acker.go +++ b/libbeat/publisher/pipeline/acker.go @@ -139,6 +139,12 @@ func (a *gapCountACK) ackLoop() { case <-a.done: closing = true a.done = nil + if a.events.Load() == 0 { + // stop worker, if all events accounted for have been ACKed. + // If new events are added after this acker won't handle them, which may + // result in duplicates + return + } case <-a.pipeline.ackDone: return @@ -146,12 +152,13 @@ func (a *gapCountACK) ackLoop() { case n := <-acks: empty := a.handleACK(n) if empty && closing && a.events.Load() == 0 { - // stop worker, iff all events accounted for have been ACKed + // stop worker, if and only if all events accounted for have been ACKed return } case <-drop: // TODO: accumulate multiple drop events + flush count with timer + a.events.Sub(1) a.fn(1, 0) } }