diff --git a/.travis.yml b/.travis.yml index adcf114..ded6e80 100644 --- a/.travis.yml +++ b/.travis.yml @@ -4,7 +4,6 @@ go: - "1.10" install: - - go get github.com/elastic/beats || true - curl https://raw.githubusercontent.com/golang/dep/master/install.sh | sh - dep ensure diff --git a/Makefile b/Makefile index f79bc87..502c480 100644 --- a/Makefile +++ b/Makefile @@ -14,7 +14,7 @@ BEAT_DOCKER_IMAGE ?= docker.elastic.co/beats/$(BEAT_NAME):$(BEATS_VERSION) GOPATH ?= $(HOME)/go .PHONY: all -all: vars test beats build +all: vars beats test build .PHONY: vars vars: diff --git a/streams/client.go b/streams/client.go index 83053df..44d0e74 100644 --- a/streams/client.go +++ b/streams/client.go @@ -167,6 +167,10 @@ func collectFailedEvents(res *kinesis.PutRecordsOutput, events []publisher.Event logp.Warn("no record returned from kinesis for event: ", events[i]) continue } + if r.ErrorCode == nil { + logp.Warn("skipping failed event with unexpected state: corresponding kinesis record misses error code: ", r) + continue + } if *r.ErrorCode != "" { failedEvents = append(failedEvents, events[i]) } diff --git a/streams/client_test.go b/streams/client_test.go index 26dd394..9d93165 100644 --- a/streams/client_test.go +++ b/streams/client_test.go @@ -180,6 +180,28 @@ func TestPublishEvents(t *testing.T) { } } + { + // Records with nil error codes should be ignored with some log + client.encoder = StubCodec{dat: []byte("boom"), err: nil} + client.streams = StubClient{ + out: &kinesis.PutRecordsOutput{ + Records: []*kinesis.PutRecordsResultEntry{ + &kinesis.PutRecordsResultEntry{ + ErrorCode: nil, + }, + }, + FailedRecordCount: aws.Int64(1), + }, + } + rest, err := client.publishEvents(events) + if err != nil { + t.Errorf("unexpected error: %v", err) + } + if len(rest) != 0 { + t.Errorf("unexpected number of remaining events: %d", len(rest)) + } + } + { // Kinesis received the event but it was not persisted, probably due to underlying infrastructure failure client.encoder = StubCodec{dat: []byte("boom"), err: nil}