Skip to content

Commit

Permalink
Fix firehose data corruption (#19)
Browse files Browse the repository at this point in the history
Fixes #10
  • Loading branch information
mumoshu authored and Sergey Novikov committed May 23, 2018
1 parent 01068c8 commit b560855
Showing 1 changed file with 14 additions and 7 deletions.
21 changes: 14 additions & 7 deletions firehose/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,17 +79,24 @@ func (client *client) mapEvents(events []publisher.Event) ([]*firehose.Record, i
}

func (client *client) mapEvent(event *publisher.Event) (*firehose.Record, error) {
serializedEvent, err := client.encoder.Encode(client.beatName, &event.Content)
if err != nil {
if !event.Guaranteed() {
var buf []byte
{
serializedEvent, err := client.encoder.Encode(client.beatName, &event.Content)
if err != nil {
if !event.Guaranteed() {
return nil, err
}

logp.Critical("Unable to encode event: %v", err)
return nil, err
}

logp.Critical("Unable to encode event: %v", err)
return nil, err
// See https://github.com/elastic/beats/blob/5a6630a8bc9b9caf312978f57d1d9193bdab1ac7/libbeat/outputs/kafka/client.go#L163-L164
// You need to copy the byte data like this. Otherwise you see strange issues like all the records sent in a same batch has the same Data.
buf = make([]byte, len(serializedEvent))
copy(buf, serializedEvent)
}

return &firehose.Record{Data: serializedEvent}, nil
return &firehose.Record{Data: buf}, nil
}
func (client *client) sendRecords(records []*firehose.Record) (*firehose.PutRecordBatchOutput, error) {
request := firehose.PutRecordBatchInput{
Expand Down

0 comments on commit b560855

Please sign in to comment.