Skip to content

Commit

Permalink
fix(streams): Fix panicking on a retry
Browse files Browse the repository at this point in the history
Turns out awsbeats was calling `batch.RetryEvents` on different data and timing. Revise the implementation according to the [official beat outputs](https://github.com/elastic/beats/blob/c4af03c51373c1de7daaca660f5d21b3f602771c/libbeat/outputs/elasticsearch/client.go#L234)

Fixes s12v#29
  • Loading branch information
mumoshu committed May 28, 2018
1 parent 124dcd9 commit ce91e04
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 26 deletions.
68 changes: 43 additions & 25 deletions streams/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,42 +57,60 @@ func (client *client) Connect() error {

// Publish maps and submits the batch's events to Kinesis, then signals the
// batch based on the outcome: the batch is ACKed only when every submission
// succeeded; otherwise the leftover events are handed back for retry.
// Ref: https://github.com/elastic/beats/blob/c4af03c51373c1de7daaca660f5d21b3f602771c/libbeat/outputs/elasticsearch/client.go#L232
func (client *client) Publish(batch publisher.Batch) error {
	events := batch.Events()
	rest, err := client.publishEvents(events)
	if len(rest) > 0 {
		// Some events were not accepted — mark exactly those for retry.
		// Ref: https://github.com/elastic/beats/blob/c4af03c51373c1de7daaca660f5d21b3f602771c/libbeat/outputs/elasticsearch/client.go#L234
		batch.RetryEvents(rest)
		return err
	}
	// Nothing left over: the whole submission succeeded, so ACK the batch.
	batch.ACK()
	return err
}

// NOTE(review): this span is rendered diff residue — it interleaves the
// pre-commit and post-commit versions of publishEvents with the +/- markers
// stripped, so it is not valid Go as it stands. The comments below mark
// which version each conflicting line appears to belong to; confirm against
// commit ce91e04 before reconstructing.
//
// publishEvents (post-commit contract): maps and sends the given events,
// returning the subset that must be retried; an empty slice means the whole
// batch can be ACKed by the caller.
func (client *client) publishEvents(events []publisher.Event) ([]publisher.Event, error) {
observer := client.observer
observer.NewBatch(len(events))

logp.Debug("kinesis", "received events: %v", events)
// pre-commit call (two return values) — removed line:
records, dropped := client.mapEvents(events)
// post-commit call (three return values) — only this line survives:
okEvents, records, dropped := client.mapEvents(events)
if dropped > 0 {
logp.Debug("kinesis", "sent %d records: %v", len(records), records)
observer.Dropped(dropped)
observer.Acked(len(okEvents))
}
logp.Debug("kinesis", "mapped to records: %v", records)
// pre-commit send call (old helper name) — removed line:
res, err := client.sendRecords(records)
// post-commit send call — only this line survives:
res, err := client.putKinesisRecords(records)
if err != nil {
// pre-commit error path (drop everything, propagate err) — removed lines:
logp.Critical("Unable to send batch: %v", err)
observer.Dropped(len(events))
return err
// post-commit error path: a nil response means there is no per-record
// status to retry from, so the failure is treated as permanent.
if res == nil {
logp.Critical("permanently failed to send %d records: %v", len(events), err)
return []publisher.Event{}, nil
}
// Otherwise retry only the records Kinesis reported as failed.
failed := collectFailedEvents(res, events)
logp.Info("retrying %d events on error: %v", len(failed), err)
return failed, err
}
// pre-commit success tail (ACK/retry handled inside this function) —
// removed lines; the post-commit code moves ACK/retry up into Publish:
processFailedDeliveries(res, batch)

batch.ACK()
logp.Debug("kinesis", "sent %d records: %v", len(records), records)
observer.Dropped(dropped)
observer.Acked(len(events) - dropped)
return nil
// post-commit success return — nothing to retry:
return []publisher.Event{}, nil
}

func (client *client) mapEvents(events []publisher.Event) ([]*kinesis.PutRecordsRequestEntry, int) {
func (client *client) mapEvents(events []publisher.Event) ([]publisher.Event, []*kinesis.PutRecordsRequestEntry, int) {
dropped := 0
records := make([]*kinesis.PutRecordsRequestEntry, 0, len(events))
okEvents := make([]publisher.Event, 0, len(events))
for i := range events {
event := events[i]
record, err := client.mapEvent(&event)
if err != nil {
logp.Debug("kinesis", "failed to map event(%v): %v", event, err)
dropped++
} else {
okEvents = append(okEvents, event)
records = append(records, record)
}
}

return records, dropped
return okEvents, records, dropped
}

func (client *client) mapEvent(event *publisher.Event) (*kinesis.PutRecordsRequestEntry, error) {
Expand All @@ -116,17 +134,20 @@ func (client *client) mapEvent(event *publisher.Event) (*kinesis.PutRecordsReque

return &kinesis.PutRecordsRequestEntry{Data: buf, PartitionKey: aws.String(partitionKey)}, nil
}
func (client *client) sendRecords(records []*kinesis.PutRecordsRequestEntry) (*kinesis.PutRecordsOutput, error) {
func (client *client) putKinesisRecords(records []*kinesis.PutRecordsRequestEntry) (*kinesis.PutRecordsOutput, error) {
request := kinesis.PutRecordsInput{
StreamName: &client.streamName,
Records: records,
}
return client.streams.PutRecords(&request)
res, err := client.streams.PutRecords(&request)
if err != nil {
return nil, fmt.Errorf("failed to put records: %v", err)
}
return res, nil
}

// NOTE(review): diff residue — pre-commit (processFailedDeliveries) and
// post-commit (collectFailedEvents) lines are interleaved here without
// +/- markers, and the collapsed-region marker below hides part of the
// loop body (presumably the per-record error check — confirm against the
// commit). Not valid Go as it stands.
//
// collectFailedEvents (post-commit contract): returns the subset of events
// whose corresponding records failed, or an empty slice if none failed.
// pre-commit signature — removed line:
func processFailedDeliveries(res *kinesis.PutRecordsOutput, batch publisher.Batch) {
// post-commit signature — only this line survives:
func collectFailedEvents(res *kinesis.PutRecordsOutput, events []publisher.Event) []publisher.Event {
if *res.FailedRecordCount > 0 {
// pre-commit line (events came from the batch) — removed:
events := batch.Events()
failedEvents := make([]publisher.Event, 0)
records := res.Records
for i, r := range records {
// collapsed diff region — the condition selecting failed records is hidden here:
Expand All @@ -139,11 +160,8 @@ func processFailedDeliveries(res *kinesis.PutRecordsOutput, batch publisher.Batc
failedEvents = append(failedEvents, events[i])
}
}

// pre-commit retry-in-place block — removed lines:
if len(failedEvents) > 0 {
logp.Warn("Retrying %d events", len(failedEvents))
batch.RetryEvents(failedEvents)
return
}
// post-commit behavior: just report and return the failures to the caller:
logp.Warn("Retrying %d events", len(failedEvents))
return failedEvents
}
return []publisher.Event{}
}
6 changes: 5 additions & 1 deletion streams/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,12 +77,16 @@ func TestMapEvents(t *testing.T) {
client := client{encoder: MockCodec{}, partitionKeyProvider: provider}
event := publisher.Event{Content: beat.Event{Fields: common.MapStr{fieldForPartitionKey: expectedPartitionKey}}}
events := []publisher.Event{event}
records, _ := client.mapEvents(events)
okEvents, records, _ := client.mapEvents(events)

if len(records) != 1 {
t.Errorf("Expected 1 records, got %v", len(records))
}

if len(okEvents) != 1 {
t.Errorf("Expected 1 ok events, got %v", len(okEvents))
}

if string(records[0].Data) != "boom" {
t.Errorf("Unexpected data %s", records[0].Data)
}
Expand Down

0 comments on commit ce91e04

Please sign in to comment.