fix: Correct records sent to Kinesis Data Streams
Fixes #11

`glide up` wasn't strictly necessary, but I verified that this works with the latest version of aws-sdk-go anyway.
mumoshu committed May 22, 2018
1 parent f0aad1c commit cd1aafe
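
To illustrate the bug being fixed, here is a minimal, self-contained Go sketch. It is not the project's code: fakeEncoder is a hypothetical stand-in for libbeat's codec, which, per the comment added in mapEvent below, appears to reuse an internal buffer across Encode calls. Storing the returned slice directly in each batch entry leaves every record pointing at the last event's bytes; copying the bytes first, as this commit does, keeps each record's Data distinct.

package main

import "fmt"

// fakeEncoder stands in for a codec that reuses one internal buffer
// across Encode calls (an assumption based on the comment in the diff).
type fakeEncoder struct {
    buf []byte
}

func (e *fakeEncoder) Encode(msg string) []byte {
    e.buf = e.buf[:0]             // reuse the same backing array
    e.buf = append(e.buf, msg...) // overwrite the previous contents
    return e.buf                  // the caller gets an alias, not a copy
}

func main() {
    events := []string{"event-1", "event-2", "event-3"}
    enc := &fakeEncoder{buf: make([]byte, 0, 64)}

    // Buggy: keep the returned slice as-is, like the old mapEvent did.
    var buggy [][]byte
    for _, msg := range events {
        buggy = append(buggy, enc.Encode(msg))
    }

    // Fixed: copy the bytes before batching, as this commit does.
    var fixed [][]byte
    for _, msg := range events {
        b := enc.Encode(msg)
        cp := make([]byte, len(b))
        copy(cp, b)
        fixed = append(fixed, cp)
    }

    fmt.Println("buggy:", asStrings(buggy)) // every entry shows the last event
    fmt.Println("fixed:", asStrings(fixed)) // each entry keeps its own data
}

func asStrings(bs [][]byte) []string {
    out := make([]string, len(bs))
    for i, b := range bs {
        out[i] = string(b)
    }
    return out
}

Running the sketch prints buggy: [event-3 event-3 event-3] and fixed: [event-1 event-2 event-3], which matches the "all the records sent in a same batch has the same Data" symptom described in the diff.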
Showing 2 changed files with 22 additions and 12 deletions.
glide.lock: 7 changes (5 additions, 2 deletions)

Some generated files are not rendered by default.

streams/client.go: 27 changes (17 additions, 10 deletions)
@@ -51,17 +51,19 @@ func (client *client) Publish(batch publisher.Batch) error {
 	observer := client.observer
 	observer.NewBatch(len(events))
 
+	logp.Debug("kinesis", "received events: %v", events)
 	records, dropped := client.mapEvents(events)
+	logp.Debug("kinesis", "mapped to records: %v", records)
 	res, err := client.sendRecords(records)
 	if err != nil {
 		logp.Critical("Unable to send batch: %v", err)
 		observer.Dropped(len(events))
 		return err
 	}
 
 	processFailedDeliveries(res, batch)
 
 	batch.ACK()
-	logp.Debug("kinesis", "Sent %d records", len(events))
+	logp.Debug("kinesis", "sent %d records: %v", len(records), records)
 	observer.Dropped(dropped)
 	observer.Acked(len(events) - dropped)
 	return nil
@@ -70,9 +72,11 @@ func (client *client) Publish(batch publisher.Batch) error {
 func (client *client) mapEvents(events []publisher.Event) ([]*kinesis.PutRecordsRequestEntry, int) {
 	dropped := 0
 	records := make([]*kinesis.PutRecordsRequestEntry, 0, len(events))
-	for _, event := range events {
+	for i := range events {
+		event := events[i]
 		record, err := client.mapEvent(&event)
 		if err != nil {
+			logp.Debug("kinesis", "failed to map event(%v): %v", event, err)
 			dropped++
 		} else {
 			records = append(records, record)
@@ -83,14 +87,17 @@ func (client *client) mapEvents(events []publisher.Event) ([]*kinesis.PutRecords
 }
 
 func (client *client) mapEvent(event *publisher.Event) (*kinesis.PutRecordsRequestEntry, error) {
-	serializedEvent, err := client.encoder.Encode(client.beatName, &event.Content)
-	if err != nil {
-		if !event.Guaranteed() {
+	var buf []byte
+	{
+		serializedEvent, err := client.encoder.Encode(client.beatName, &event.Content)
+		if err != nil {
+			logp.Critical("Unable to encode event: %v", err)
 			return nil, err
 		}
-
-		logp.Critical("Unable to encode event: %v", err)
-		return nil, err
+		// See https://github.com/elastic/beats/blob/5a6630a8bc9b9caf312978f57d1d9193bdab1ac7/libbeat/outputs/kafka/client.go#L163-L164
+		// You need to copy the byte data like this. Otherwise you see strange issues like all the records sent in a same batch has the same Data.
+		buf = make([]byte, len(serializedEvent))
+		copy(buf, serializedEvent)
 	}
 
 	rawPartitionKey, err := event.Content.GetValue(client.partitionKey)
@@ -103,7 +110,7 @@ func (client *client) mapEvent(event *publisher.Event) (*kinesis.PutRecordsReque
 		return nil, fmt.Errorf("failed to get partition key: %s(=%v) is found, but not a string", client.partitionKey, rawPartitionKey)
 	}
 
-	return &kinesis.PutRecordsRequestEntry{Data: serializedEvent, PartitionKey: aws.String(partitionKey)}, nil
+	return &kinesis.PutRecordsRequestEntry{Data: buf, PartitionKey: aws.String(partitionKey)}, nil
 }
 func (client *client) sendRecords(records []*kinesis.PutRecordsRequestEntry) (*kinesis.PutRecordsOutput, error) {
 	request := kinesis.PutRecordsInput{
