Skip to content

Commit

Permalink
Fix memory leak in Filebeat pipeline acker (elastic#12063)
Browse files Browse the repository at this point in the history
* Fix memory leak in Filebeat pipeline acker

Before this change acker goroutine was kept forever as processed events
count was not correctly updated.

Filebeat sends an empty event to update file states, this event is not
published, but treated as dropped, without updating counters.

This change makes sures that `a.events` count gets updated for dropped
events also, so the acker gets closed after all ACKs happen.
  • Loading branch information
exekias authored May 7, 2019
1 parent 8eddfbf commit 9dc1f39
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 1 deletion.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Fix `add_docker_metadata` source matching, using `log.file.path` field now. {pull}11577[11577]
- Add missing Kubernetes metadata fields to Filebeat CoreDNS module, and fix a documentation error. {pull}11591[11591]
- Reduce memory usage if long lines are truncated to fit `max_bytes` limit. The line buffer is copied into a smaller buffer now. This allows the runtime to release unused memory earlier. {pull}11524[11524]
- Fix memory leak in Filebeat pipeline acker. {pull}12063[12063]

*Heartbeat*

Expand Down
9 changes: 8 additions & 1 deletion libbeat/publisher/pipeline/acker.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,19 +139,26 @@ func (a *gapCountACK) ackLoop() {
case <-a.done:
closing = true
a.done = nil
if a.events.Load() == 0 {
// stop worker, if all events accounted for have been ACKed.
// If new events are added after this acker won't handle them, which may
// result in duplicates
return
}

case <-a.pipeline.ackDone:
return

case n := <-acks:
empty := a.handleACK(n)
if empty && closing && a.events.Load() == 0 {
// stop worker, iff all events accounted for have been ACKed
// stop worker, if and only if all events accounted for have been ACKed
return
}

case <-drop:
// TODO: accumulate multiple drop events + flush count with timer
a.events.Sub(1)
a.fn(1, 0)
}
}
Expand Down

0 comments on commit 9dc1f39

Please sign in to comment.