Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix firehose issues #44

Merged
merged 3 commits into from
Jul 25, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 51 additions & 26 deletions firehose/client.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package firehose

import (
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/firehose"
"github.com/elastic/beats/libbeat/beat"
Expand Down Expand Up @@ -44,38 +45,60 @@ func (client *client) Connect() error {

// Publish delivers the batch to Firehose. The batch is ACKed only when
// every event was submitted successfully; otherwise the undelivered
// events are scheduled for retry. The returned error is always nil.
func (client *client) Publish(batch publisher.Batch) error {
	events := batch.Events()
	pending, _ := client.publishEvents(events)
	if len(pending) > 0 {
		// Some events did not make it — hand them back for a retry.
		// Ref: https://github.com/elastic/beats/blob/c4af03c51373c1de7daaca660f5d21b3f602771c/libbeat/outputs/elasticsearch/client.go#L234
		batch.RetryEvents(pending)
	} else {
		// ACK only once the whole submission succeeded.
		// Ref: https://github.com/elastic/beats/blob/c4af03c51373c1de7daaca660f5d21b3f602771c/libbeat/outputs/elasticsearch/client.go#L232
		batch.ACK()
	}
	// Official beats return nil here rather than an error object.
	// Ref: https://github.com/elastic/beats/blob/c4af03c51373c1de7daaca660f5d21b3f602771c/libbeat/outputs/kafka/client.go#L119
	return nil
}

func (client *client) publishEvents(events []publisher.Event) ([]publisher.Event, error) {
observer := client.observer
observer.NewBatch(len(events))

records, dropped := client.mapEvents(events)
res, err := client.sendRecords(records)
if err != nil {
logp.Critical("Unable to send batch: %v", err)
observer.Dropped(len(events))
return err
}
logp.Debug("firehose", "received events: %v", events)
okEvents, records, dropped := client.mapEvents(events)

processFailedDeliveries(res, batch)
batch.ACK()
logp.Debug("firehose", "Sent %d records", len(events))
logp.Debug("firehose", "sent %d records: %v", len(records), records)
observer.Dropped(dropped)
observer.Acked(len(events) - dropped)
return nil
observer.Acked(len(okEvents))

logp.Debug("firehose", "mapped to records: %v", records)
res, err := client.sendRecords(records)
failed := collectFailedEvents(res, events)
if err != nil && len(failed) == 0 {
failed = events
}
if len(failed) > 0 {
logp.Info("retrying %d events on error: %v", len(failed), err)
}
return failed, err
}

func (client *client) mapEvents(events []publisher.Event) ([]*firehose.Record, int) {
func (client *client) mapEvents(events []publisher.Event) ([]publisher.Event, []*firehose.Record, int) {
dropped := 0
records := make([]*firehose.Record, 0, len(events))
okEvents := make([]publisher.Event, 0, len(events))
for _, event := range events {
record, err := client.mapEvent(&event)
if err != nil {
logp.Debug("firehose", "failed to map event(%v): %v", event, err)
dropped++
} else {
okEvents = append(okEvents, event)
records = append(records, record)
}
}

return records, dropped
return okEvents, records, dropped
}

func (client *client) mapEvent(event *publisher.Event) (*firehose.Record, error) {
Expand All @@ -92,8 +115,15 @@ func (client *client) mapEvent(event *publisher.Event) (*firehose.Record, error)
}
// See https://github.com/elastic/beats/blob/5a6630a8bc9b9caf312978f57d1d9193bdab1ac7/libbeat/outputs/kafka/client.go#L163-L164
// You need to copy the byte data like this. Otherwise you see strange issues like all the records sent in a same batch has the same Data.
buf = make([]byte, len(serializedEvent))
buf = make([]byte, len(serializedEvent)+1)
copy(buf, serializedEvent)
// Firehose doesn't automatically append a trailing newline after each record.
// Without one, a stream->firehose->s3 pipeline produces unusable S3 objects:
// not NDJSON, but a sequence of JSON objects with no separators.
// Fix this by appending a newline to every record.
//
// See https://stackoverflow.com/questions/43010117/writing-properly-formatted-json-to-s3-to-load-in-athena-redshift
buf[len(buf)-1] = byte('\n')
}

return &firehose.Record{Data: buf}, nil
Expand All @@ -106,21 +136,16 @@ func (client *client) sendRecords(records []*firehose.Record) (*firehose.PutReco
return client.firehose.PutRecordBatch(&request)
}

func processFailedDeliveries(res *firehose.PutRecordBatchOutput, batch publisher.Batch) {
if *res.FailedPutCount > 0 {
events := batch.Events()
func collectFailedEvents(res *firehose.PutRecordBatchOutput, events []publisher.Event) []publisher.Event {
if aws.Int64Value(res.FailedPutCount) > 0 {
failedEvents := make([]publisher.Event, 0)
responses := res.RequestResponses
for i, response := range responses {
if *response.ErrorCode != "" {
for i, r := range responses {
if aws.StringValue(r.ErrorCode) != "" {
failedEvents = append(failedEvents, events[i])
}
}

if len(failedEvents) > 0 {
logp.Warn("Retrying %d events", len(failedEvents))
batch.RetryEvents(failedEvents)
return
}
return failedEvents
}
return []publisher.Event{}
}
41 changes: 38 additions & 3 deletions firehose/client_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package firehose

import (
"github.com/aws/aws-sdk-go/service/firehose"
"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/libbeat/publisher"
"testing"
Expand All @@ -17,21 +18,55 @@ func TestMapEvent(t *testing.T) {
client := client{encoder: MockCodec{}}
record, _ := client.mapEvent(&publisher.Event{})

if string(record.Data) != "boom" {
if string(record.Data) != "boom\n" {
t.Errorf("Unexpected data: %s", record.Data)
}
}

func TestMapEvents(t *testing.T) {
client := client{encoder: MockCodec{}}
events := []publisher.Event{{}}
records, _ := client.mapEvents(events)
okEvents, records, _ := client.mapEvents(events)

if len(records) != 1 {
t.Errorf("Expected 1 records, got %v", len(records))
}

if string(records[0].Data) != "boom" {
if len(okEvents) != 1 {
t.Errorf("Expected 1 ok events, got %v", len(okEvents))
}

if string(records[0].Data) != "boom\n" {
t.Errorf("Unexpected data %s", records[0].Data)
}
}

// TestCollectFailedEvents verifies that collectFailedEvents returns no
// events while FailedPutCount is unset, and returns exactly the events
// whose response entries carry a non-empty error code afterwards.
func TestCollectFailedEvents(t *testing.T) {
	c := client{encoder: MockCodec{}}
	okEvents, _, _ := c.mapEvents([]publisher.Event{{}, {}})

	var okEntry, badEntry firehose.PutRecordBatchResponseEntry
	res := firehose.PutRecordBatchOutput{}
	res.SetRequestResponses([]*firehose.PutRecordBatchResponseEntry{&okEntry, &badEntry})

	{
		// No failures reported: nothing should be collected.
		failed := collectFailedEvents(&res, okEvents)

		if len(failed) != 0 {
			t.Errorf("Expected 0 failed, got %v", len(failed))
		}
	}
	{
		// One entry flagged with an error code: exactly that event returns.
		res.SetFailedPutCount(1)
		badEntry.SetErrorCode("boom")

		failed := collectFailedEvents(&res, okEvents)

		if len(failed) != 1 {
			t.Errorf("Expected 1 failed, got %v", len(failed))
		}
	}
}