From 0c942e7c5adedd9b6876ad3940a515b29b7eb0b3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Carlos=20P=C3=A9rez-Aradros=20Herce?= Date: Sun, 5 May 2019 22:43:56 +0200 Subject: [PATCH 1/5] Fix memory leak in Filebeat pipeline acker Before this change, the acker goroutine was kept forever as processed events count was not correctly updated. Filebeat sends an empty event to update file states; this event is not published, but treated as dropped, without updating counters. This change makes sure that `a.events` count gets updated for dropped events also, so the acker gets closed after all ACKs happen. --- libbeat/publisher/pipeline/acker.go | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/libbeat/publisher/pipeline/acker.go b/libbeat/publisher/pipeline/acker.go index 1d4155677aca..979049be99ce 100644 --- a/libbeat/publisher/pipeline/acker.go +++ b/libbeat/publisher/pipeline/acker.go @@ -133,25 +133,31 @@ func (a *gapCountACK) ackLoop() { acks, drop := a.acks, a.drop closing := false + empty := false for { select { case <-a.done: closing = true a.done = nil + if a.events.Load() == 0 { + // stop worker, if all events accounted for have been ACKed + return + } case <-a.pipeline.ackDone: return case n := <-acks: - empty := a.handleACK(n) + empty = a.handleACK(n) if empty && closing && a.events.Load() == 0 { - // stop worker, iff all events accounted for have been ACKed + // stop worker, if all events accounted for have been ACKed return } case <-drop: // TODO: accumulate multiple drop events + flush count with timer + a.events.Sub(1) a.fn(1, 0) } } From 290c4a55303f56456fec8a51cd79996c6bfe54e4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Carlos=20P=C3=A9rez-Aradros=20Herce?= Date: Mon, 6 May 2019 13:34:08 +0200 Subject: [PATCH 2/5] fix `empty` var scope --- libbeat/publisher/pipeline/acker.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/libbeat/publisher/pipeline/acker.go b/libbeat/publisher/pipeline/acker.go index 
979049be99ce..52870fc09930 100644 --- a/libbeat/publisher/pipeline/acker.go +++ b/libbeat/publisher/pipeline/acker.go @@ -133,7 +133,6 @@ func (a *gapCountACK) ackLoop() { acks, drop := a.acks, a.drop closing := false - empty := false for { select { @@ -149,7 +148,7 @@ func (a *gapCountACK) ackLoop() { return case n := <-acks: - empty = a.handleACK(n) + empty := a.handleACK(n) if empty && closing && a.events.Load() == 0 { // stop worker, if all events accounted for have been ACKed return From f830863de1217dc1870332a59da57b8ec77fbdae Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Carlos=20P=C3=A9rez-Aradros=20Herce?= Date: Mon, 6 May 2019 13:37:06 +0200 Subject: [PATCH 3/5] clarify acker closing condition --- libbeat/publisher/pipeline/acker.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/libbeat/publisher/pipeline/acker.go b/libbeat/publisher/pipeline/acker.go index 52870fc09930..43a8d755cbaa 100644 --- a/libbeat/publisher/pipeline/acker.go +++ b/libbeat/publisher/pipeline/acker.go @@ -140,7 +140,9 @@ func (a *gapCountACK) ackLoop() { closing = true a.done = nil if a.events.Load() == 0 { - // stop worker, if all events accounted for have been ACKed + // stop worker, if all events accounted for have been ACKed. + // If new events are added after this acker won't handle them, which may + // result in duplicates return } From 0a06af99eedcf917590fc27b6f96a1d540c0340f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Carlos=20P=C3=A9rez-Aradros=20Herce?= Date: Mon, 6 May 2019 14:42:03 +0200 Subject: [PATCH 4/5] Add CHANGELOG --- CHANGELOG.next.asciidoc | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 11c0a4f98e02..731c6e77541d 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -87,6 +87,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Fix `add_docker_metadata` source matching, using `log.file.path` field now. 
{pull}11577[11577] - Add missing Kubernetes metadata fields to Filebeat CoreDNS module, and fix a documentation error. {pull}11591[11591] - Reduce memory usage if long lines are truncated to fit `max_bytes` limit. The line buffer is copied into a smaller buffer now. This allows the runtime to release unused memory earlier. {pull}11524[11524] +- Fix memory leak in Filebeat pipeline acker. {pull}12063[12063] *Heartbeat* From 899d1b6c14fa9816ad91c42cb5de5cf2c2f1e1e1 Mon Sep 17 00:00:00 2001 From: Jaime Soriano Pastor Date: Mon, 6 May 2019 17:19:57 +0200 Subject: [PATCH 5/5] Update libbeat/publisher/pipeline/acker.go Co-Authored-By: exekias --- libbeat/publisher/pipeline/acker.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/libbeat/publisher/pipeline/acker.go b/libbeat/publisher/pipeline/acker.go index 43a8d755cbaa..5689b86d4458 100644 --- a/libbeat/publisher/pipeline/acker.go +++ b/libbeat/publisher/pipeline/acker.go @@ -152,7 +152,7 @@ func (a *gapCountACK) ackLoop() { case n := <-acks: empty := a.handleACK(n) if empty && closing && a.events.Load() == 0 { - // stop worker, if all events accounted for have been ACKed + // stop worker, if and only if all events accounted for have been ACKed return }