Fix firehose issues (#44)
These are similar fixes to what has been done in the streams plugin.

Please see commit d0db8d4; it looks like the same problem as #41, but I think this is a slightly neater way of handling it.

We've been running this branch for 3 weeks now, and it's gone from crashing very often to not crashing at all.

Closes #39

Changelog:

* Properly format JSON for Firehose

This was already done for streams in a086eea. (A short sketch of the underlying problem follows this changelog.)

* Fix panic on Firehose ack/retry

We're not entirely sure what the problem is, but we've seen panics from multiple places in the code. This mostly copies the changes that were made to the streams client in ce91e04, in the hope that it helps.

* Fix nil dereference in Firehose failed responses

The test fails with the old function and passes with the new one. I haven't seen the actual responses from the API, but I suspect that when some records fail and others succeed, only the failed ones carry an ErrorCode. So the code needs to check `r.ErrorCode != nil` before checking the value. The `aws.StringValue` helper does exactly that, and it also removes the need for the other nil check. (The second sketch after this changelog illustrates the difference.)
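
For background on the first changelog item, here is a minimal, self-contained sketch (not part of this commit; the record payloads are made up) of what goes wrong without the newline: Firehose concatenates record payloads verbatim when delivering to S3, so without a per-record separator the resulting object is a run of JSON values rather than NDJSON.

```go
package main

import "fmt"

func main() {
	// Two serialized events, roughly as mapEvent would produce them.
	records := [][]byte{[]byte(`{"a":1}`), []byte(`{"b":2}`)}

	// Without separators, Firehose delivers {"a":1}{"b":2} to S3,
	// which Athena/Redshift cannot split into rows. Appending '\n'
	// to each record yields valid NDJSON instead.
	var s3Object []byte
	for _, r := range records {
		s3Object = append(s3Object, r...)
		s3Object = append(s3Object, '\n')
	}
	fmt.Print(string(s3Object))
}
```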
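
And for the last item, a minimal sketch (again illustrative only, not from the diff) of the nil dereference: in a mixed batch, successful entries come back with a nil ErrorCode, so dereferencing it unconditionally panics, whereas `aws.StringValue` treats a nil `*string` as the empty string.

```go
package main

import (
	"fmt"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/service/firehose"
)

func main() {
	responses := []*firehose.PutRecordBatchResponseEntry{
		{}, // succeeded: ErrorCode is nil
		{ErrorCode: aws.String("ServiceUnavailableException")}, // failed
	}
	for i, r := range responses {
		// Old check: *r.ErrorCode != "" -- panics on the nil ErrorCode above.
		// New check: aws.StringValue returns "" for a nil *string, so no
		// separate nil check is needed.
		fmt.Printf("record %d failed: %v\n", i, aws.StringValue(r.ErrorCode) != "")
	}
}
```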
raymondbutcher authored and mumoshu committed Jul 25, 2018
1 parent 0b86784 commit 3f6c0fb
Showing 2 changed files with 89 additions and 29 deletions.
77 changes: 51 additions & 26 deletions firehose/client.go
@@ -1,6 +1,7 @@
 package firehose
 
 import (
+	"github.com/aws/aws-sdk-go/aws"
 	"github.com/aws/aws-sdk-go/aws/session"
 	"github.com/aws/aws-sdk-go/service/firehose"
 	"github.com/elastic/beats/libbeat/beat"
@@ -44,38 +45,60 @@ func (client *client) Connect() error {
 
 func (client *client) Publish(batch publisher.Batch) error {
 	events := batch.Events()
+	rest, _ := client.publishEvents(events)
+	if len(rest) == 0 {
+		// We have to ACK only when all of the submissions succeeded
+		// Ref: https://github.com/elastic/beats/blob/c4af03c51373c1de7daaca660f5d21b3f602771c/libbeat/outputs/elasticsearch/client.go#L232
+		batch.ACK()
+	} else {
+		// Mark the failed events for retry
+		// Ref: https://github.com/elastic/beats/blob/c4af03c51373c1de7daaca660f5d21b3f602771c/libbeat/outputs/elasticsearch/client.go#L234
+		batch.RetryEvents(rest)
+	}
+	// This shouldn't be an error object, according to other official beats' implementations
+	// Ref: https://github.com/elastic/beats/blob/c4af03c51373c1de7daaca660f5d21b3f602771c/libbeat/outputs/kafka/client.go#L119
+	return nil
+}
+
+func (client *client) publishEvents(events []publisher.Event) ([]publisher.Event, error) {
 	observer := client.observer
 	observer.NewBatch(len(events))
 
-	records, dropped := client.mapEvents(events)
-	res, err := client.sendRecords(records)
-	if err != nil {
-		logp.Critical("Unable to send batch: %v", err)
-		observer.Dropped(len(events))
-		return err
-	}
+	logp.Debug("firehose", "received events: %v", events)
+	okEvents, records, dropped := client.mapEvents(events)
 
-	processFailedDeliveries(res, batch)
-	batch.ACK()
-	logp.Debug("firehose", "Sent %d records", len(events))
+	logp.Debug("firehose", "sent %d records: %v", len(records), records)
 	observer.Dropped(dropped)
-	observer.Acked(len(events) - dropped)
-	return nil
+	observer.Acked(len(okEvents))
+
+	logp.Debug("firehose", "mapped to records: %v", records)
+	res, err := client.sendRecords(records)
+	failed := collectFailedEvents(res, events)
+	if err != nil && len(failed) == 0 {
+		failed = events
+	}
+	if len(failed) > 0 {
+		logp.Info("retrying %d events on error: %v", len(failed), err)
+	}
+	return failed, err
 }
 
-func (client *client) mapEvents(events []publisher.Event) ([]*firehose.Record, int) {
+func (client *client) mapEvents(events []publisher.Event) ([]publisher.Event, []*firehose.Record, int) {
 	dropped := 0
 	records := make([]*firehose.Record, 0, len(events))
+	okEvents := make([]publisher.Event, 0, len(events))
 	for _, event := range events {
 		record, err := client.mapEvent(&event)
 		if err != nil {
+			logp.Debug("firehose", "failed to map event(%v): %v", event, err)
 			dropped++
 		} else {
+			okEvents = append(okEvents, event)
 			records = append(records, record)
 		}
 	}
 
-	return records, dropped
+	return okEvents, records, dropped
 }
 
 func (client *client) mapEvent(event *publisher.Event) (*firehose.Record, error) {
@@ -92,8 +115,15 @@ func (client *client) mapEvent(event *publisher.Event) (*firehose.Record, error)
 	}
 	// See https://github.com/elastic/beats/blob/5a6630a8bc9b9caf312978f57d1d9193bdab1ac7/libbeat/outputs/kafka/client.go#L163-L164
 	// You need to copy the byte data like this. Otherwise you see strange issues, like all the records sent in the same batch having the same Data.
-	buf = make([]byte, len(serializedEvent))
+	buf = make([]byte, len(serializedEvent)+1)
 	copy(buf, serializedEvent)
+	// Firehose doesn't automatically append a trailing newline to each record,
+	// so a stream->firehose->s3 pipeline produces unusable S3 objects:
+	// not NDJSON, but a sequence of JSON objects without separators.
+	// Fix that by appending a newline to every record.
+	//
+	// See https://stackoverflow.com/questions/43010117/writing-properly-formatted-json-to-s3-to-load-in-athena-redshift
+	buf[len(buf)-1] = byte('\n')
 	}
 
 	return &firehose.Record{Data: buf}, nil
@@ -106,21 +136,16 @@ func (client *client) sendRecords(records []*firehose.Record) (*firehose.PutRecordBatchOutput, error) {
 	return client.firehose.PutRecordBatch(&request)
 }
 
-func processFailedDeliveries(res *firehose.PutRecordBatchOutput, batch publisher.Batch) {
-	if *res.FailedPutCount > 0 {
-		events := batch.Events()
+func collectFailedEvents(res *firehose.PutRecordBatchOutput, events []publisher.Event) []publisher.Event {
+	if aws.Int64Value(res.FailedPutCount) > 0 {
 		failedEvents := make([]publisher.Event, 0)
 		responses := res.RequestResponses
-		for i, response := range responses {
-			if *response.ErrorCode != "" {
+		for i, r := range responses {
+			if aws.StringValue(r.ErrorCode) != "" {
 				failedEvents = append(failedEvents, events[i])
 			}
 		}
-
-		if len(failedEvents) > 0 {
-			logp.Warn("Retrying %d events", len(failedEvents))
-			batch.RetryEvents(failedEvents)
-			return
-		}
+		return failedEvents
 	}
+	return []publisher.Event{}
 }
41 changes: 38 additions & 3 deletions firehose/client_test.go
@@ -1,6 +1,7 @@
 package firehose
 
 import (
+	"github.com/aws/aws-sdk-go/service/firehose"
 	"github.com/elastic/beats/libbeat/beat"
 	"github.com/elastic/beats/libbeat/publisher"
 	"testing"
@@ -17,21 +18,55 @@ func TestMapEvent(t *testing.T) {
 	client := client{encoder: MockCodec{}}
 	record, _ := client.mapEvent(&publisher.Event{})
 
-	if string(record.Data) != "boom" {
+	if string(record.Data) != "boom\n" {
 		t.Errorf("Unexpected data: %s", record.Data)
 	}
 }
 
 func TestMapEvents(t *testing.T) {
 	client := client{encoder: MockCodec{}}
 	events := []publisher.Event{{}}
-	records, _ := client.mapEvents(events)
+	okEvents, records, _ := client.mapEvents(events)
 
 	if len(records) != 1 {
 		t.Errorf("Expected 1 records, got %v", len(records))
 	}
 
-	if string(records[0].Data) != "boom" {
+	if len(okEvents) != 1 {
+		t.Errorf("Expected 1 ok events, got %v", len(okEvents))
+	}
+
+	if string(records[0].Data) != "boom\n" {
 		t.Errorf("Unexpected data %s", records[0].Data)
 	}
 }
+
+func TestCollectFailedEvents(t *testing.T) {
+	client := client{encoder: MockCodec{}}
+	events := []publisher.Event{{}, {}}
+	okEvents, _, _ := client.mapEvents(events)
+
+	res := firehose.PutRecordBatchOutput{}
+	entry1 := firehose.PutRecordBatchResponseEntry{}
+	entry2 := firehose.PutRecordBatchResponseEntry{}
+	responses := []*firehose.PutRecordBatchResponseEntry{&entry1, &entry2}
+	res.SetRequestResponses(responses)
+
+	{
+		failed := collectFailedEvents(&res, okEvents)
+
+		if len(failed) != 0 {
+			t.Errorf("Expected 0 failed, got %v", len(failed))
+		}
+	}
+	{
+		res.SetFailedPutCount(1)
+		entry2.SetErrorCode("boom")
+
+		failed := collectFailedEvents(&res, okEvents)
+
+		if len(failed) != 1 {
+			t.Errorf("Expected 1 failed, got %v", len(failed))
+		}
+	}
+}
