Skip to content

Commit

Permalink
fix(streams): Various fixes to correctly retry only on Kinesis failur…
Browse files Browse the repository at this point in the history
…e, plus Stream+Firehose+S3 support (s12v#31)

* fix(streams): Handle nil record(s) returned by aws-sdk-go

Ref s12v#27

* fix(doc,example): Filebeat docker example was referring to an invalid tag

* fix(streams): Fix panicking on a retry

Turns out awsbeats was calling `batch.RetryEvents` on different data and timing. Revise the implementation according to the [official beat outputs](https://github.com/elastic/beats/blob/c4af03c51373c1de7daaca660f5d21b3f602771c/libbeat/outputs/elasticsearch/client.go#L234)

Fixes s12v#29

* Support for Kinesis DataStreams->Firehose->S3 pipelines

* Improve error propagation and not give up retrying too aggressively

* Add tests

* More fixes to actually trigger retry on failure. Tested with invalid AWS credentials
  • Loading branch information
mumoshu authored May 31, 2018
1 parent 028b7c1 commit a086eea
Show file tree
Hide file tree
Showing 5 changed files with 197 additions and 38 deletions.
4 changes: 4 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -52,3 +52,7 @@ endif
.PHONY: dockerimage
dockerimage:
docker build --build-arg AWSBEATS_VERSION=$(AWSBEATS_VERSION) --build-arg GO_VERSION=$(GO_VERSION) --build-arg BEATS_VERSION=$(BEATS_VERSION) --build-arg BEAT_NAME=$(BEAT_NAME) -t $(DOCKER_IMAGE):$(DOCKER_TAG) .

.PHONY: filebeat-image
filebeat-image:
@bash -c 'make dockerimage BEATS_VERSION=6.2.4 GO_VERSION=1.10.2 BEAT_NAME=filebeat AWSBEATS_VERSION=$(ref=$(git rev-parse HEAD); ref=${ref:0:7}; echo $ref) GOPATH=$HOME/go'
6 changes: 6 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,12 @@ To build a docker image for awsbeats, run `make dockerimage`.
make dockerimage BEATS_VERSION=6.2.4 GO_VERSION=1.10.2 GOPATH=$HOME/go
```

There is also a convenient make target `filebeat-image` with sane defaults:

```console
make filebeat-image
```

**metricbeat**:

```
Expand Down
2 changes: 1 addition & 1 deletion hack/containerized-filebeat
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ docker run \
-v $(pwd)/filebeat.yml:/etc/filebeat/filebeat.yml \
-e AWS_ACCESS_KEY_ID=${AWS_ACCESS_KEY_ID} \
-e AWS_SECRET_ACCESS_KEY=${AWS_SECRET_ACCESS_KEY} \
s12v/awsbeats:canary \
s12v/awsbeats:filebeat-canary \
filebeat \
--plugin kinesis.so \
-e \
Expand Down
92 changes: 63 additions & 29 deletions streams/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (
)

type client struct {
streams *kinesis.Kinesis
streams kinesisStreamsClient
streamName string
partitionKeyProvider PartitionKeyProvider
beatName string
Expand All @@ -24,6 +24,10 @@ type client struct {
observer outputs.Observer
}

type kinesisStreamsClient interface {
PutRecords(input *kinesis.PutRecordsInput) (*kinesis.PutRecordsOutput, error)
}

func newClient(sess *session.Session, config *StreamsConfig, observer outputs.Observer, beat beat.Info) (*client, error) {
partitionKeyProvider := createPartitionKeyProvider(config)
client := &client{
Expand Down Expand Up @@ -57,42 +61,60 @@ func (client *client) Connect() error {

func (client *client) Publish(batch publisher.Batch) error {
events := batch.Events()
rest, _ := client.publishEvents(events)
if len(rest) == 0 {
// We have to ACK only when all the submission succeeded
// Ref: https://github.com/elastic/beats/blob/c4af03c51373c1de7daaca660f5d21b3f602771c/libbeat/outputs/elasticsearch/client.go#L232
batch.ACK()
} else {
// Mark the failed events to retry
// Ref: https://github.com/elastic/beats/blob/c4af03c51373c1de7daaca660f5d21b3f602771c/libbeat/outputs/elasticsearch/client.go#L234
batch.RetryEvents(rest)
}
// This shouldn't be an error object according to other official beats' implementations
// Ref: https://github.com/elastic/beats/blob/c4af03c51373c1de7daaca660f5d21b3f602771c/libbeat/outputs/kafka/client.go#L119
return nil
}

func (client *client) publishEvents(events []publisher.Event) ([]publisher.Event, error) {
observer := client.observer
observer.NewBatch(len(events))

logp.Debug("kinesis", "received events: %v", events)
records, dropped := client.mapEvents(events)
okEvents, records, dropped := client.mapEvents(events)
if dropped > 0 {
logp.Debug("kinesis", "sent %d records: %v", len(records), records)
observer.Dropped(dropped)
observer.Acked(len(okEvents))
}
logp.Debug("kinesis", "mapped to records: %v", records)
res, err := client.sendRecords(records)
if err != nil {
logp.Critical("Unable to send batch: %v", err)
observer.Dropped(len(events))
return err
res, err := client.putKinesisRecords(records)
failed := collectFailedEvents(res, events)
if err != nil && len(failed) == 0 {
failed = events
}
processFailedDeliveries(res, batch)

batch.ACK()
logp.Debug("kinesis", "sent %d records: %v", len(records), records)
observer.Dropped(dropped)
observer.Acked(len(events) - dropped)
return nil
if len(failed) > 0 {
logp.Info("retrying %d events on error: %v", len(failed), err)
}
return failed, err
}

func (client *client) mapEvents(events []publisher.Event) ([]*kinesis.PutRecordsRequestEntry, int) {
func (client *client) mapEvents(events []publisher.Event) ([]publisher.Event, []*kinesis.PutRecordsRequestEntry, int) {
dropped := 0
records := make([]*kinesis.PutRecordsRequestEntry, 0, len(events))
okEvents := make([]publisher.Event, 0, len(events))
for i := range events {
event := events[i]
record, err := client.mapEvent(&event)
if err != nil {
logp.Debug("kinesis", "failed to map event(%v): %v", event, err)
dropped++
} else {
okEvents = append(okEvents, event)
records = append(records, record)
}
}

return records, dropped
return okEvents, records, dropped
}

func (client *client) mapEvent(event *publisher.Event) (*kinesis.PutRecordsRequestEntry, error) {
Expand All @@ -105,8 +127,15 @@ func (client *client) mapEvent(event *publisher.Event) (*kinesis.PutRecordsReque
}
// See https://github.com/elastic/beats/blob/5a6630a8bc9b9caf312978f57d1d9193bdab1ac7/libbeat/outputs/kafka/client.go#L163-L164
// You need to copy the byte data like this. Otherwise you see strange issues like all the records sent in a same batch has the same Data.
buf = make([]byte, len(serializedEvent))
buf = make([]byte, len(serializedEvent)+1)
copy(buf, serializedEvent)
// Firehose doesn't automatically add a trailing new-line after each record.
// This causes a stream->firehose->s3 pipeline to produce useless S3 objects:
// no ndjson, but a sequence of JSON objects without separators...
// Fix it by simply appending a new-line.
//
// See https://stackoverflow.com/questions/43010117/writing-properly-formatted-json-to-s3-to-load-in-athena-redshift
buf[len(buf)-1] = byte('\n')
}

partitionKey, err := client.partitionKeyProvider.PartitionKeyFor(event)
Expand All @@ -116,29 +145,34 @@ func (client *client) mapEvent(event *publisher.Event) (*kinesis.PutRecordsReque

return &kinesis.PutRecordsRequestEntry{Data: buf, PartitionKey: aws.String(partitionKey)}, nil
}
func (client *client) sendRecords(records []*kinesis.PutRecordsRequestEntry) (*kinesis.PutRecordsOutput, error) {
func (client *client) putKinesisRecords(records []*kinesis.PutRecordsRequestEntry) (*kinesis.PutRecordsOutput, error) {
request := kinesis.PutRecordsInput{
StreamName: &client.streamName,
Records: records,
}
return client.streams.PutRecords(&request)
res, err := client.streams.PutRecords(&request)
if err != nil {
return res, fmt.Errorf("failed to put records: %v", err)
}
return res, nil
}

func processFailedDeliveries(res *kinesis.PutRecordsOutput, batch publisher.Batch) {
if *res.FailedRecordCount > 0 {
events := batch.Events()
func collectFailedEvents(res *kinesis.PutRecordsOutput, events []publisher.Event) []publisher.Event {
if res.FailedRecordCount != nil && *res.FailedRecordCount > 0 {
failedEvents := make([]publisher.Event, 0)
records := res.Records
for i, r := range records {
if r == nil {
// See https://github.com/s12v/awsbeats/issues/27 for more info
logp.Warn("no record returned from kinesis for event: ", events[i])
continue
}
if *r.ErrorCode != "" {
failedEvents = append(failedEvents, events[i])
}
}

if len(failedEvents) > 0 {
logp.Warn("Retrying %d events", len(failedEvents))
batch.RetryEvents(failedEvents)
return
}
logp.Warn("Retrying %d events", len(failedEvents))
return failedEvents
}
return []publisher.Event{}
}
131 changes: 123 additions & 8 deletions streams/client_test.go
Original file line number Diff line number Diff line change
@@ -1,18 +1,32 @@
package streams

import (
"fmt"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/kinesis"
"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/outputs"
"github.com/elastic/beats/libbeat/publisher"
"testing"
)

type MockCodec struct {
type StubCodec struct {
dat []byte
err error
}

func (mock MockCodec) Encode(index string, event *beat.Event) ([]byte, error) {
return []byte("boom"), nil
func (c StubCodec) Encode(index string, event *beat.Event) ([]byte, error) {
return c.dat, c.err
}

type StubClient struct {
out *kinesis.PutRecordsOutput
err error
}

func (c StubClient) PutRecords(input *kinesis.PutRecordsInput) (*kinesis.PutRecordsOutput, error) {
return c.out, c.err
}

func TestCreateXidPartitionKeyProvider(t *testing.T) {
Expand Down Expand Up @@ -51,15 +65,15 @@ func TestMapEvent(t *testing.T) {
fieldForPartitionKey := "mypartitionkey"
expectedPartitionKey := "foobar"
provider := newFieldPartitionKeyProvider(fieldForPartitionKey)
client := client{encoder: MockCodec{}, partitionKeyProvider: provider}
client := client{encoder: StubCodec{dat: []byte("boom")}, partitionKeyProvider: provider}
event := &publisher.Event{Content: beat.Event{Fields: common.MapStr{fieldForPartitionKey: expectedPartitionKey}}}
record, err := client.mapEvent(event)

if err != nil {
t.Fatalf("uenxpected error: %v", err)
}

if string(record.Data) != "boom" {
if string(record.Data) != "boom\n" {
t.Errorf("Unexpected data: %s", record.Data)
}

Expand All @@ -74,16 +88,117 @@ func TestMapEvents(t *testing.T) {
expectedPartitionKey := "foobar"
provider := newFieldPartitionKeyProvider(fieldForPartitionKey)

client := client{encoder: MockCodec{}, partitionKeyProvider: provider}
client := client{encoder: StubCodec{dat: []byte("boom")}, partitionKeyProvider: provider}
event := publisher.Event{Content: beat.Event{Fields: common.MapStr{fieldForPartitionKey: expectedPartitionKey}}}
events := []publisher.Event{event}
records, _ := client.mapEvents(events)
okEvents, records, _ := client.mapEvents(events)

if len(records) != 1 {
t.Errorf("Expected 1 records, got %v", len(records))
}

if string(records[0].Data) != "boom" {
if len(okEvents) != 1 {
t.Errorf("Expected 1 ok events, got %v", len(okEvents))
}

if string(records[0].Data) != "boom\n" {
t.Errorf("Unexpected data %s", records[0].Data)
}
}

func TestPublishEvents(t *testing.T) {
fieldForPartitionKey := "mypartitionkey"
expectedPartitionKey := "foobar"
provider := newFieldPartitionKeyProvider(fieldForPartitionKey)
client := client{
partitionKeyProvider: provider,
observer: outputs.NewNilObserver(),
}
event := publisher.Event{Content: beat.Event{Fields: common.MapStr{fieldForPartitionKey: expectedPartitionKey}}}
events := []publisher.Event{event}

{
client.encoder = StubCodec{dat: []byte("boom"), err: nil}
client.streams = StubClient{
out: &kinesis.PutRecordsOutput{
Records: []*kinesis.PutRecordsResultEntry{
&kinesis.PutRecordsResultEntry{
ErrorCode: aws.String(""),
},
},
FailedRecordCount: aws.Int64(0),
},
}
rest, err := client.publishEvents(events)
if err != nil {
t.Errorf("unexpected error: %v", err)
}
if len(rest) != 0 {
t.Errorf("unexpected number of remaining events: %d", len(rest))
}
}

{
// An event that can't be encoded should be ignored without any error, but with some log.
client.encoder = StubCodec{dat: []byte(""), err: fmt.Errorf("failed to encode")}
client.streams = StubClient{
out: &kinesis.PutRecordsOutput{
Records: []*kinesis.PutRecordsResultEntry{
&kinesis.PutRecordsResultEntry{
ErrorCode: aws.String(""),
},
},
FailedRecordCount: aws.Int64(0),
},
}
rest, err := client.publishEvents(events)
if err != nil {
t.Errorf("unexpected error: %v", err)
}
if len(rest) != 0 {
t.Errorf("unexpected number of remaining events: %d", len(rest))
}
}

{
// Nil records returned by Kinesis should be ignored with some log
client.encoder = StubCodec{dat: []byte("boom"), err: nil}
client.streams = StubClient{
out: &kinesis.PutRecordsOutput{
Records: []*kinesis.PutRecordsResultEntry{
nil,
},
FailedRecordCount: aws.Int64(1),
},
}
rest, err := client.publishEvents(events)
if err != nil {
t.Errorf("unexpected error: %v", err)
}
if len(rest) != 0 {
t.Errorf("unexpected number of remaining events: %d", len(rest))
}
}

{
// Kinesis received the event but it was not persisted, probably due to underlying infrastructure failure
client.encoder = StubCodec{dat: []byte("boom"), err: nil}
client.streams = StubClient{
out: &kinesis.PutRecordsOutput{
Records: []*kinesis.PutRecordsResultEntry{
&kinesis.PutRecordsResultEntry{
ErrorCode: aws.String("simulated_error"),
},
},
FailedRecordCount: aws.Int64(1),
},
}
rest, err := client.publishEvents(events)
if err != nil {
t.Errorf("unexpected error: %v", err)
}
if len(rest) != 1 {
t.Errorf("unexpected number of remaining events: %d", len(rest))
}
}
}

0 comments on commit a086eea

Please sign in to comment.