diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 0413976eb08..cd521c0c70f 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -87,6 +87,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Fix `add_docker_metadata` source matching, using `log.file.path` field now. {pull}11577[11577] - Add missing Kubernetes metadata fields to Filebeat CoreDNS module, and fix a documentation error. {pull}11591[11591] - Reduce memory usage if long lines are truncated to fit `max_bytes` limit. The line buffer is copied into a smaller buffer now. This allows the runtime to release unused memory earlier. {pull}11524[11524] +- Fix memory leak in Filebeat pipeline acker. {pull}12063[12063] *Heartbeat* diff --git a/libbeat/publisher/pipeline/acker.go b/libbeat/publisher/pipeline/acker.go index 1d4155677ac..5689b86d445 100644 --- a/libbeat/publisher/pipeline/acker.go +++ b/libbeat/publisher/pipeline/acker.go @@ -139,6 +139,12 @@ func (a *gapCountACK) ackLoop() { case <-a.done: closing = true a.done = nil + if a.events.Load() == 0 { + // stop worker, if all events accounted for have been ACKed. + // If new events are added after this acker won't handle them, which may + // result in duplicates + return + } case <-a.pipeline.ackDone: return @@ -146,12 +152,13 @@ func (a *gapCountACK) ackLoop() { case n := <-acks: empty := a.handleACK(n) if empty && closing && a.events.Load() == 0 { - // stop worker, iff all events accounted for have been ACKed + // stop worker, if and only if all events accounted for have been ACKed return } case <-drop: // TODO: accumulate multiple drop events + flush count with timer + a.events.Sub(1) a.fn(1, 0) } }