Skip to content

Commit

Permalink
Filebeat: Allow stateless and stateful ACKer on the global ack handler (
Browse files Browse the repository at this point in the history
elastic#7214) (elastic#7258)

* Filebeat: Allow stateless and stateful ACKer on the global ack handler

This commit introduces a change in how filebat handle ACK by default,
before the ACK was using the private field of the event to retrieve a
state. The updated state was sent to the registrar, and the registrar
was finalizing the ACK.

But with the introduction of the TCP/UDP and the Redis input, the events
don't have any state attached. So in that scenario, Filebeat was not
correctly acking these events to some wait group.

The ACKer was modified to handle both stateless (default) and stateful
events, when stateful is required, the states are sent to the registry
otherwise, the waiting groups are directly updated.

Fixes: elastic#7202

* changelog

(cherry picked from commit 31c46c9)
  • Loading branch information
tsg authored and Steffen Siering committed Jun 5, 2018
1 parent 9d2f6c0 commit ce390a1
Show file tree
Hide file tree
Showing 4 changed files with 95 additions and 7 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ https://github.com/elastic/beats/compare/v6.2.3...v6.3.0[View commits]
- Add raw JSON to message field when JSON parsing fails. {issue}6516[6516]
- Commit registry writes to stable storage to avoid corrupt registry files. {pull}6877[6877]
- Fix a parsing issue in the syslog input for RFC3339 timestamp and time with nanoseconds. {pull}7046[7046]
- Fix an issue with an overflowing wait group when using the TCP input. {issue}7202[7202]
*Heartbeat*
Expand Down
28 changes: 22 additions & 6 deletions filebeat/beater/acker.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,38 +2,54 @@ package beater

import (
"github.com/elastic/beats/filebeat/input/file"
"github.com/elastic/beats/libbeat/logp"
)

// eventAcker handles publisher pipeline ACKs and forwards
// them to the registrar.
// them to the registrar or directly to the stateless logger.
type eventACKer struct {
out successLogger
stateful statefulLogger
stateless statelessLogger
log *logp.Logger
}

type successLogger interface {
type statefulLogger interface {
Published(states []file.State)
}

func newEventACKer(out successLogger) *eventACKer {
return &eventACKer{out: out}
type statelessLogger interface {
Published(c int) bool
}

func newEventACKer(stateless statelessLogger, stateful statefulLogger) *eventACKer {
return &eventACKer{stateless: stateless, stateful: stateful, log: logp.NewLogger("acker")}
}

func (a *eventACKer) ackEvents(data []interface{}) {
stateless := 0
states := make([]file.State, 0, len(data))
for _, datum := range data {
if datum == nil {
stateless++
continue
}

st, ok := datum.(file.State)
if !ok {
stateless++
continue
}

states = append(states, st)
}

if len(states) > 0 {
a.out.Published(states)
a.log.Debugw("stateful ack", "count", len(states))
a.stateful.Published(states)
}

if stateless > 0 {
a.log.Debugw("stateless ack", "count", stateless)
a.stateless.Published(stateless)
}
}
71 changes: 71 additions & 0 deletions filebeat/beater/acker_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package beater

import (
"testing"

"github.com/stretchr/testify/assert"

"github.com/elastic/beats/filebeat/input/file"
)

type mockStatefulLogger struct {
states []file.State
}

func (sf *mockStatefulLogger) Published(states []file.State) {
sf.states = states
}

type mockStatelessLogger struct {
count int
}

func (sl *mockStatelessLogger) Published(count int) bool {
sl.count = count
return true
}

func TestACKer(t *testing.T) {
tests := []struct {
name string
data []interface{}
stateless int
stateful []file.State
}{
{
name: "only stateless",
data: []interface{}{nil, nil},
stateless: 2,
},
{
name: "only stateful",
data: []interface{}{file.State{Source: "-"}, file.State{Source: "-"}},
stateful: []file.State{file.State{Source: "-"}, file.State{Source: "-"}},
stateless: 0,
},
{
name: "both",
data: []interface{}{file.State{Source: "-"}, nil, file.State{Source: "-"}},
stateful: []file.State{file.State{Source: "-"}, file.State{Source: "-"}},
stateless: 1,
},
{
name: "any other Private type",
data: []interface{}{struct{}{}, nil},
stateless: 2,
},
}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
sl := &mockStatelessLogger{}
sf := &mockStatefulLogger{}

h := newEventACKer(sl, sf)

h.ackEvents(test.data)
assert.Equal(t, test.stateless, sl.count)
assert.Equal(t, test.stateful, sf.states)
})
}
}
2 changes: 1 addition & 1 deletion filebeat/beater/filebeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,7 @@ func (fb *Filebeat) Run(b *beat.Beat) error {
registrarChannel := newRegistrarLogger(registrar)

err = b.Publisher.SetACKHandler(beat.PipelineACKHandler{
ACKEvents: newEventACKer(registrarChannel).ackEvents,
ACKEvents: newEventACKer(finishedLogger, registrarChannel).ackEvents,
})
if err != nil {
logp.Err("Failed to install the registry with the publisher pipeline: %v", err)
Expand Down

0 comments on commit ce390a1

Please sign in to comment.