From cd1aafeb84c3628cc68c7da731d512d447719923 Mon Sep 17 00:00:00 2001 From: Yusuke KUOKA Date: Tue, 22 May 2018 19:21:29 +0900 Subject: [PATCH] fix: Correct records sent to Kinesis Data Streams Fixes #11 `glide up` wasnt necessarily but anyway I verified this to work with the latest version of aws-sdk-go --- glide.lock | 7 +++++-- streams/client.go | 27 +++++++++++++++++---------- 2 files changed, 22 insertions(+), 12 deletions(-) diff --git a/glide.lock b/glide.lock index 4bf5c70..d263ac6 100644 --- a/glide.lock +++ b/glide.lock @@ -1,8 +1,8 @@ hash: 12f47a7eb83aa755e36611aec2e5b4373251bf882052de4840c166c1df263c59 -updated: 2018-02-08T00:11:34.347856133+01:00 +updated: 2018-05-22T17:57:57.395546+09:00 imports: - name: github.com/aws/aws-sdk-go - version: 586c9ba6027a527800564282bb843d7e6e7985c9 + version: cf00d544fc3c02f31e05e8d02444a1325d3a840b subpackages: - aws - aws/awserr @@ -20,6 +20,8 @@ imports: - aws/request - aws/session - aws/signer/v4 + - internal/sdkio + - internal/sdkrand - internal/shareddefaults - private/protocol - private/protocol/json/jsonutil @@ -29,6 +31,7 @@ imports: - private/protocol/rest - private/protocol/xml/xmlutil - service/firehose + - service/kinesis - service/sts - name: github.com/go-ini/ini version: 32e4c1e6bc4e7d0d8451aa6b75200d19e37a536a diff --git a/streams/client.go b/streams/client.go index f658ed4..b9bbad4 100644 --- a/streams/client.go +++ b/streams/client.go @@ -51,17 +51,19 @@ func (client *client) Publish(batch publisher.Batch) error { observer := client.observer observer.NewBatch(len(events)) + logp.Debug("kinesis", "received events: %v", events) records, dropped := client.mapEvents(events) + logp.Debug("kinesis", "mapped to records: %v", records) res, err := client.sendRecords(records) if err != nil { logp.Critical("Unable to send batch: %v", err) observer.Dropped(len(events)) return err } - processFailedDeliveries(res, batch) + batch.ACK() - logp.Debug("kinesis", "Sent %d records", len(events)) + logp.Debug("kinesis", "sent %d records: %v", len(records), records) observer.Dropped(dropped) observer.Acked(len(events) - dropped) return nil @@ -70,9 +72,11 @@ func (client *client) Publish(batch publisher.Batch) error { func (client *client) mapEvents(events []publisher.Event) ([]*kinesis.PutRecordsRequestEntry, int) { dropped := 0 records := make([]*kinesis.PutRecordsRequestEntry, 0, len(events)) - for _, event := range events { + for i := range events { + event := events[i] record, err := client.mapEvent(&event) if err != nil { + logp.Debug("kinesis", "failed to map event(%v): %v", event, err) dropped++ } else { records = append(records, record) @@ -83,14 +87,17 @@ func (client *client) mapEvents(events []publisher.Event) ([]*kinesis.PutRecords } func (client *client) mapEvent(event *publisher.Event) (*kinesis.PutRecordsRequestEntry, error) { - serializedEvent, err := client.encoder.Encode(client.beatName, &event.Content) - if err != nil { - if !event.Guaranteed() { + var buf []byte + { + serializedEvent, err := client.encoder.Encode(client.beatName, &event.Content) + if err != nil { + logp.Critical("Unable to encode event: %v", err) return nil, err } - - logp.Critical("Unable to encode event: %v", err) - return nil, err + // See https://github.com/elastic/beats/blob/5a6630a8bc9b9caf312978f57d1d9193bdab1ac7/libbeat/outputs/kafka/client.go#L163-L164 + // You need to copy the byte data like this. Otherwise you see strange issues like all the records sent in a same batch has the same Data. + buf = make([]byte, len(serializedEvent)) + copy(buf, serializedEvent) } rawPartitionKey, err := event.Content.GetValue(client.partitionKey) @@ -103,7 +110,7 @@ func (client *client) mapEvent(event *publisher.Event) (*kinesis.PutRecordsReque return nil, fmt.Errorf("failed to get partition key: %s(=%v) is found, but not a string", client.partitionKey, rawPartitionKey) } - return &kinesis.PutRecordsRequestEntry{Data: serializedEvent, PartitionKey: aws.String(partitionKey)}, nil + return &kinesis.PutRecordsRequestEntry{Data: buf, PartitionKey: aws.String(partitionKey)}, nil } func (client *client) sendRecords(records []*kinesis.PutRecordsRequestEntry) (*kinesis.PutRecordsOutput, error) { request := kinesis.PutRecordsInput{