diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index beaafe63ac1b..1d3d2f99f50d 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -135,6 +135,7 @@ https://github.com/elastic/beats/compare/v6.2.3...v6.3.0[View commits] - Add raw JSON to message field when JSON parsing fails. {issue}6516[6516] - Commit registry writes to stable storage to avoid corrupt registry files. {pull}6877[6877] - Fix a parsing issue in the syslog input for RFC3339 timestamp and time with nanoseconds. {pull}7046[7046] +- Fix an issue with an overflowing wait group when using the TCP input. {issue}7202[7202] *Heartbeat* diff --git a/filebeat/beater/acker.go b/filebeat/beater/acker.go index 3c39c551917c..fd1074d0654a 100644 --- a/filebeat/beater/acker.go +++ b/filebeat/beater/acker.go @@ -2,31 +2,41 @@ package beater import ( "github.com/elastic/beats/filebeat/input/file" + "github.com/elastic/beats/libbeat/logp" ) // eventAcker handles publisher pipeline ACKs and forwards -// them to the registrar. +// them to the registrar or directly to the stateless logger. type eventACKer struct { - out successLogger + stateful statefulLogger + stateless statelessLogger + log *logp.Logger } -type successLogger interface { +type statefulLogger interface { Published(states []file.State) } -func newEventACKer(out successLogger) *eventACKer { - return &eventACKer{out: out} +type statelessLogger interface { + Published(c int) bool +} + +func newEventACKer(stateless statelessLogger, stateful statefulLogger) *eventACKer { + return &eventACKer{stateless: stateless, stateful: stateful, log: logp.NewLogger("acker")} } func (a *eventACKer) ackEvents(data []interface{}) { + stateless := 0 states := make([]file.State, 0, len(data)) for _, datum := range data { if datum == nil { + stateless++ continue } st, ok := datum.(file.State) if !ok { + stateless++ continue } @@ -34,6 +44,12 @@ func (a *eventACKer) ackEvents(data []interface{}) { } if len(states) > 0 { - a.out.Published(states) + a.log.Debugw("stateful ack", "count", len(states)) + a.stateful.Published(states) + } + + if stateless > 0 { + a.log.Debugw("stateless ack", "count", stateless) + a.stateless.Published(stateless) } } diff --git a/filebeat/beater/acker_test.go b/filebeat/beater/acker_test.go new file mode 100644 index 000000000000..806a9773e80a --- /dev/null +++ b/filebeat/beater/acker_test.go @@ -0,0 +1,71 @@ +package beater + +import ( + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/elastic/beats/filebeat/input/file" +) + +type mockStatefulLogger struct { + states []file.State +} + +func (sf *mockStatefulLogger) Published(states []file.State) { + sf.states = states +} + +type mockStatelessLogger struct { + count int +} + +func (sl *mockStatelessLogger) Published(count int) bool { + sl.count = count + return true +} + +func TestACKer(t *testing.T) { + tests := []struct { + name string + data []interface{} + stateless int + stateful []file.State + }{ + { + name: "only stateless", + data: []interface{}{nil, nil}, + stateless: 2, + }, + { + name: "only stateful", + data: []interface{}{file.State{Source: "-"}, file.State{Source: "-"}}, + stateful: []file.State{file.State{Source: "-"}, file.State{Source: "-"}}, + stateless: 0, + }, + { + name: "both", + data: []interface{}{file.State{Source: "-"}, nil, file.State{Source: "-"}}, + stateful: []file.State{file.State{Source: "-"}, file.State{Source: "-"}}, + stateless: 1, + }, + { + name: "any other Private type", + data: []interface{}{struct{}{}, nil}, + stateless: 2, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + sl := &mockStatelessLogger{} + sf := &mockStatefulLogger{} + + h := newEventACKer(sl, sf) + + h.ackEvents(test.data) + assert.Equal(t, test.stateless, sl.count) + assert.Equal(t, test.stateful, sf.states) + }) + } +} diff --git a/filebeat/beater/filebeat.go b/filebeat/beater/filebeat.go index 7976ba90eccd..cb42d0180653 100644 --- a/filebeat/beater/filebeat.go +++ b/filebeat/beater/filebeat.go @@ -303,7 +303,7 @@ func (fb *Filebeat) Run(b *beat.Beat) error { registrarChannel := newRegistrarLogger(registrar) err = b.Publisher.SetACKHandler(beat.PipelineACKHandler{ - ACKEvents: newEventACKer(registrarChannel).ackEvents, + ACKEvents: newEventACKer(finishedLogger, registrarChannel).ackEvents, }) if err != nil { logp.Err("Failed to install the registry with the publisher pipeline: %v", err)