Skip to content

Commit

Permalink
Filebeat: Allow stateless and stateful ACKer on the global ack handler (
Browse files Browse the repository at this point in the history
elastic#7214)

* Filebeat: Allow stateless and stateful ACKer on the global ack handler

This commit introduces a change in how filebat handle ACK by default,
before the ACK was using the private field of the event to retrieve a
state. The updated state was sent to the registrar, and the registrar
was finalizing the ACK.

But with the introduction of the TCP/UDP and the Redis input, the events
don't have any state attached. So in that scenario, Filebeat was not
correctly acking these events to some wait group.

The ACKer was modified to handle both stateless (default) and stateful
events, when stateful is required, the states are sent to the registry
otherwise, the waiting groups are directly updated.

Fixes: elastic#7202

* changelog
  • Loading branch information
ph authored and tsg committed Jun 4, 2018
1 parent 7f1a5fd commit 31c46c9
Show file tree
Hide file tree
Showing 4 changed files with 95 additions and 7 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ https://github.com/elastic/beats/compare/v6.2.3...master[Check the HEAD diff]
- Fix a data race between stopping and starting of the harverters. {issue}#6879[6879]
- Fix a parsing issue in the syslog input for RFC3339 timestamp and time with nanoseconds. {pull}7046[7046]
- Comply with PostgreSQL database name format {pull}7198[7198]
- Fix an issue with an overflowing wait group when using the TCP input. {issue}7202[7202]

*Heartbeat*
- Fix race due to updates of shared a map, that was not supposed to be shared between multiple go-routines. {issue}6616[6616]
Expand Down
28 changes: 22 additions & 6 deletions filebeat/beater/acker.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,38 +2,54 @@ package beater

import (
"github.com/elastic/beats/filebeat/input/file"
"github.com/elastic/beats/libbeat/logp"
)

// eventAcker handles publisher pipeline ACKs and forwards
// them to the registrar.
// them to the registrar or directly to the stateless logger.
type eventACKer struct {
out successLogger
stateful statefulLogger
stateless statelessLogger
log *logp.Logger
}

type successLogger interface {
type statefulLogger interface {
Published(states []file.State)
}

func newEventACKer(out successLogger) *eventACKer {
return &eventACKer{out: out}
type statelessLogger interface {
Published(c int) bool
}

func newEventACKer(stateless statelessLogger, stateful statefulLogger) *eventACKer {
return &eventACKer{stateless: stateless, stateful: stateful, log: logp.NewLogger("acker")}
}

func (a *eventACKer) ackEvents(data []interface{}) {
stateless := 0
states := make([]file.State, 0, len(data))
for _, datum := range data {
if datum == nil {
stateless++
continue
}

st, ok := datum.(file.State)
if !ok {
stateless++
continue
}

states = append(states, st)
}

if len(states) > 0 {
a.out.Published(states)
a.log.Debugw("stateful ack", "count", len(states))
a.stateful.Published(states)
}

if stateless > 0 {
a.log.Debugw("stateless ack", "count", stateless)
a.stateless.Published(stateless)
}
}
71 changes: 71 additions & 0 deletions filebeat/beater/acker_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package beater

import (
"testing"

"github.com/stretchr/testify/assert"

"github.com/elastic/beats/filebeat/input/file"
)

type mockStatefulLogger struct {
states []file.State
}

func (sf *mockStatefulLogger) Published(states []file.State) {
sf.states = states
}

type mockStatelessLogger struct {
count int
}

func (sl *mockStatelessLogger) Published(count int) bool {
sl.count = count
return true
}

func TestACKer(t *testing.T) {
tests := []struct {
name string
data []interface{}
stateless int
stateful []file.State
}{
{
name: "only stateless",
data: []interface{}{nil, nil},
stateless: 2,
},
{
name: "only stateful",
data: []interface{}{file.State{Source: "-"}, file.State{Source: "-"}},
stateful: []file.State{file.State{Source: "-"}, file.State{Source: "-"}},
stateless: 0,
},
{
name: "both",
data: []interface{}{file.State{Source: "-"}, nil, file.State{Source: "-"}},
stateful: []file.State{file.State{Source: "-"}, file.State{Source: "-"}},
stateless: 1,
},
{
name: "any other Private type",
data: []interface{}{struct{}{}, nil},
stateless: 2,
},
}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
sl := &mockStatelessLogger{}
sf := &mockStatefulLogger{}

h := newEventACKer(sl, sf)

h.ackEvents(test.data)
assert.Equal(t, test.stateless, sl.count)
assert.Equal(t, test.stateful, sf.states)
})
}
}
2 changes: 1 addition & 1 deletion filebeat/beater/filebeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,7 @@ func (fb *Filebeat) Run(b *beat.Beat) error {
registrarChannel := newRegistrarLogger(registrar)

err = b.Publisher.SetACKHandler(beat.PipelineACKHandler{
ACKEvents: newEventACKer(registrarChannel).ackEvents,
ACKEvents: newEventACKer(finishedLogger, registrarChannel).ackEvents,
})
if err != nil {
logp.Err("Failed to install the registry with the publisher pipeline: %v", err)
Expand Down

0 comments on commit 31c46c9

Please sign in to comment.