From b5608556cf62e247cc2fb77e2cd2354c1d26f28a Mon Sep 17 00:00:00 2001 From: KUOKA Yusuke Date: Wed, 23 May 2018 23:01:36 +0900 Subject: [PATCH] Fix firehose data corruption (#19) Fixes #10 --- firehose/client.go | 21 ++++++++++++++------- 1 file changed, 14 insertions(+), 7 deletions(-) diff --git a/firehose/client.go b/firehose/client.go index ff367dd..f41f861 100644 --- a/firehose/client.go +++ b/firehose/client.go @@ -79,17 +79,24 @@ func (client *client) mapEvents(events []publisher.Event) ([]*firehose.Record, i } func (client *client) mapEvent(event *publisher.Event) (*firehose.Record, error) { - serializedEvent, err := client.encoder.Encode(client.beatName, &event.Content) - if err != nil { - if !event.Guaranteed() { + var buf []byte + { + serializedEvent, err := client.encoder.Encode(client.beatName, &event.Content) + if err != nil { + if !event.Guaranteed() { + return nil, err + } + + logp.Critical("Unable to encode event: %v", err) return nil, err } - - logp.Critical("Unable to encode event: %v", err) - return nil, err + // See https://github.com/elastic/beats/blob/5a6630a8bc9b9caf312978f57d1d9193bdab1ac7/libbeat/outputs/kafka/client.go#L163-L164 + // You need to copy the byte data like this. Otherwise you see strange issues like all the records sent in a same batch has the same Data. + buf = make([]byte, len(serializedEvent)) + copy(buf, serializedEvent) } - return &firehose.Record{Data: serializedEvent}, nil + return &firehose.Record{Data: buf}, nil } func (client *client) sendRecords(records []*firehose.Record) (*firehose.PutRecordBatchOutput, error) { request := firehose.PutRecordBatchInput{