Skip to content

Commit

Permalink
Filebeat: Allow stateless and stateful ACKer on the global ack handler
Browse files Browse the repository at this point in the history
This commit introduces a change in how filebat handle ACK by default,
before the ACK was using the private field of the event to retrieve a
state. The updated state was sent to the registrar, and the registrar
was finalizing the ACK.

But with the introduction of the TCP/UDP and the Redis input, the events
don't have any state attached. So in that scenario, Filebeat was not
correctly acking these events to some wait group.

The ACKer was modified to handle both stateless (default) and stateful
events, when stateful is required, the states are sent to the registry
otherwise, the waiting groups are directly updated.

Fixes: elastic#7202
  • Loading branch information
ph committed May 30, 2018
1 parent 86ba4fb commit c614593
Show file tree
Hide file tree
Showing 3 changed files with 94 additions and 7 deletions.
28 changes: 22 additions & 6 deletions filebeat/beater/acker.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,38 +2,54 @@ package beater

import (
"github.com/elastic/beats/filebeat/input/file"
"github.com/elastic/beats/libbeat/logp"
)

// eventAcker handles publisher pipeline ACKs and forwards
// them to the registrar.
// them to the registrar or directly to the stateless logger.
type eventACKer struct {
out successLogger
stateful statefulLogger
stateless statelessLogger
log *logp.Logger
}

type successLogger interface {
type statefulLogger interface {
Published(states []file.State)
}

func newEventACKer(out successLogger) *eventACKer {
return &eventACKer{out: out}
type statelessLogger interface {
Published(c int) bool
}

func newEventACKer(stateless statelessLogger, stateful statefulLogger) *eventACKer {
return &eventACKer{stateless: stateless, stateful: stateful, log: logp.NewLogger("acker")}
}

func (a *eventACKer) ackEvents(data []interface{}) {
stateless := 0
states := make([]file.State, 0, len(data))
for _, datum := range data {
if datum == nil {
stateless++
continue
}

st, ok := datum.(file.State)
if !ok {
stateless++
continue
}

states = append(states, st)
}

if len(states) > 0 {
a.out.Published(states)
a.log.Debugw("stateful ack", "count", len(states))
a.stateful.Published(states)
}

if stateless > 0 {
a.log.Debugw("stateless ack", "count", stateless)
a.stateless.Published(stateless)
}
}
71 changes: 71 additions & 0 deletions filebeat/beater/acker_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package beater

import (
"testing"

"github.com/stretchr/testify/assert"

"github.com/elastic/beats/filebeat/input/file"
)

type mockStatefulLogger struct {
states []file.State
}

func (sf *mockStatefulLogger) Published(states []file.State) {
sf.states = states
}

type mockStatelessLogger struct {
count int
}

func (sl *mockStatelessLogger) Published(count int) bool {
sl.count = count
return true
}

func TestACKer(t *testing.T) {
tests := []struct {
name string
data []interface{}
stateless int
stateful []file.State
}{
{
name: "only stateless",
data: []interface{}{nil, nil},
stateless: 2,
},
{
name: "only stateful",
data: []interface{}{file.State{Source: "-"}, file.State{Source: "-"}},
stateful: []file.State{file.State{Source: "-"}, file.State{Source: "-"}},
stateless: 0,
},
{
name: "both",
data: []interface{}{file.State{Source: "-"}, nil, file.State{Source: "-"}},
stateful: []file.State{file.State{Source: "-"}, file.State{Source: "-"}},
stateless: 1,
},
{
name: "any other Private type",
data: []interface{}{struct{}{}, nil},
stateless: 2,
},
}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
sl := &mockStatelessLogger{}
sf := &mockStatefulLogger{}

h := newEventACKer(sl, sf)

h.ackEvents(test.data)
assert.Equal(t, test.stateless, sl.count)
assert.Equal(t, test.stateful, sf.states)
})
}
}
2 changes: 1 addition & 1 deletion filebeat/beater/filebeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,7 @@ func (fb *Filebeat) Run(b *beat.Beat) error {
registrarChannel := newRegistrarLogger(registrar)

err = b.Publisher.SetACKHandler(beat.PipelineACKHandler{
ACKEvents: newEventACKer(registrarChannel).ackEvents,
ACKEvents: newEventACKer(finishedLogger, registrarChannel).ackEvents,
})
if err != nil {
logp.Err("Failed to install the registry with the publisher pipeline: %v", err)
Expand Down

0 comments on commit c614593

Please sign in to comment.