From ce390a13260e7ae24ccdbc16d6064f230bfbd07f Mon Sep 17 00:00:00 2001 From: Tudor Golubenco Date: Tue, 5 Jun 2018 05:14:41 -0700 Subject: [PATCH] Filebeat: Allow stateless and stateful ACKer on the global ack handler (#7214) (#7258) * Filebeat: Allow stateless and stateful ACKer on the global ack handler This commit introduces a change in how filebat handle ACK by default, before the ACK was using the private field of the event to retrieve a state. The updated state was sent to the registrar, and the registrar was finalizing the ACK. But with the introduction of the TCP/UDP and the Redis input, the events don't have any state attached. So in that scenario, Filebeat was not correctly acking these events to some wait group. The ACKer was modified to handle both stateless (default) and stateful events, when stateful is required, the states are sent to the registry otherwise, the waiting groups are directly updated. Fixes: #7202 * changelog (cherry picked from commit 31c46c99610876fe386f16d350ca0d26539baaf6) --- CHANGELOG.asciidoc | 1 + filebeat/beater/acker.go | 28 +++++++++++--- filebeat/beater/acker_test.go | 71 +++++++++++++++++++++++++++++++++++ filebeat/beater/filebeat.go | 2 +- 4 files changed, 95 insertions(+), 7 deletions(-) create mode 100644 filebeat/beater/acker_test.go diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index beaafe63ac1..1d3d2f99f50 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -135,6 +135,7 @@ https://github.com/elastic/beats/compare/v6.2.3...v6.3.0[View commits] - Add raw JSON to message field when JSON parsing fails. {issue}6516[6516] - Commit registry writes to stable storage to avoid corrupt registry files. {pull}6877[6877] - Fix a parsing issue in the syslog input for RFC3339 timestamp and time with nanoseconds. {pull}7046[7046] +- Fix an issue with an overflowing wait group when using the TCP input. {issue}7202[7202] *Heartbeat* diff --git a/filebeat/beater/acker.go b/filebeat/beater/acker.go index 3c39c551917..fd1074d0654 100644 --- a/filebeat/beater/acker.go +++ b/filebeat/beater/acker.go @@ -2,31 +2,41 @@ package beater import ( "github.com/elastic/beats/filebeat/input/file" + "github.com/elastic/beats/libbeat/logp" ) // eventAcker handles publisher pipeline ACKs and forwards -// them to the registrar. +// them to the registrar or directly to the stateless logger. type eventACKer struct { - out successLogger + stateful statefulLogger + stateless statelessLogger + log *logp.Logger } -type successLogger interface { +type statefulLogger interface { Published(states []file.State) } -func newEventACKer(out successLogger) *eventACKer { - return &eventACKer{out: out} +type statelessLogger interface { + Published(c int) bool +} + +func newEventACKer(stateless statelessLogger, stateful statefulLogger) *eventACKer { + return &eventACKer{stateless: stateless, stateful: stateful, log: logp.NewLogger("acker")} } func (a *eventACKer) ackEvents(data []interface{}) { + stateless := 0 states := make([]file.State, 0, len(data)) for _, datum := range data { if datum == nil { + stateless++ continue } st, ok := datum.(file.State) if !ok { + stateless++ continue } @@ -34,6 +44,12 @@ func (a *eventACKer) ackEvents(data []interface{}) { } if len(states) > 0 { - a.out.Published(states) + a.log.Debugw("stateful ack", "count", len(states)) + a.stateful.Published(states) + } + + if stateless > 0 { + a.log.Debugw("stateless ack", "count", stateless) + a.stateless.Published(stateless) } } diff --git a/filebeat/beater/acker_test.go b/filebeat/beater/acker_test.go new file mode 100644 index 00000000000..806a9773e80 --- /dev/null +++ b/filebeat/beater/acker_test.go @@ -0,0 +1,71 @@ +package beater + +import ( + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/elastic/beats/filebeat/input/file" +) + +type mockStatefulLogger struct { + states []file.State +} + +func (sf *mockStatefulLogger) Published(states []file.State) { + sf.states = states +} + +type mockStatelessLogger struct { + count int +} + +func (sl *mockStatelessLogger) Published(count int) bool { + sl.count = count + return true +} + +func TestACKer(t *testing.T) { + tests := []struct { + name string + data []interface{} + stateless int + stateful []file.State + }{ + { + name: "only stateless", + data: []interface{}{nil, nil}, + stateless: 2, + }, + { + name: "only stateful", + data: []interface{}{file.State{Source: "-"}, file.State{Source: "-"}}, + stateful: []file.State{file.State{Source: "-"}, file.State{Source: "-"}}, + stateless: 0, + }, + { + name: "both", + data: []interface{}{file.State{Source: "-"}, nil, file.State{Source: "-"}}, + stateful: []file.State{file.State{Source: "-"}, file.State{Source: "-"}}, + stateless: 1, + }, + { + name: "any other Private type", + data: []interface{}{struct{}{}, nil}, + stateless: 2, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + sl := &mockStatelessLogger{} + sf := &mockStatefulLogger{} + + h := newEventACKer(sl, sf) + + h.ackEvents(test.data) + assert.Equal(t, test.stateless, sl.count) + assert.Equal(t, test.stateful, sf.states) + }) + } +} diff --git a/filebeat/beater/filebeat.go b/filebeat/beater/filebeat.go index 7976ba90ecc..cb42d018065 100644 --- a/filebeat/beater/filebeat.go +++ b/filebeat/beater/filebeat.go @@ -303,7 +303,7 @@ func (fb *Filebeat) Run(b *beat.Beat) error { registrarChannel := newRegistrarLogger(registrar) err = b.Publisher.SetACKHandler(beat.PipelineACKHandler{ - ACKEvents: newEventACKer(registrarChannel).ackEvents, + ACKEvents: newEventACKer(finishedLogger, registrarChannel).ackEvents, }) if err != nil { logp.Err("Failed to install the registry with the publisher pipeline: %v", err)