Skip to content

Commit

Permalink
fix(streams): Panic on collecting failed records (#42)
Browse files Browse the repository at this point in the history
* fix(streams): Panic on collecting failed records

Fixes #41

* fix(ci): Fix travis build failures due to recent backward-incompatible changes in libbeat

Fixes https://travis-ci.org/s12v/awsbeats/builds/402503570

* test(streams): Add a test-case for the fix
  • Loading branch information
mumoshu authored Jul 12, 2018
1 parent 62f0234 commit 0b86784
Show file tree
Hide file tree
Showing 4 changed files with 27 additions and 2 deletions.
1 change: 0 additions & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ go:
- "1.10"

install:
- go get github.com/elastic/beats || true
- curl https://raw.githubusercontent.com/golang/dep/master/install.sh | sh
- dep ensure

Expand Down
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ BEAT_DOCKER_IMAGE ?= docker.elastic.co/beats/$(BEAT_NAME):$(BEATS_VERSION)
GOPATH ?= $(HOME)/go

.PHONY: all
all: vars test beats build
all: vars beats test build

.PHONY: vars
vars:
Expand Down
4 changes: 4 additions & 0 deletions streams/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,10 @@ func collectFailedEvents(res *kinesis.PutRecordsOutput, events []publisher.Event
logp.Warn("no record returned from kinesis for event: ", events[i])
continue
}
if r.ErrorCode == nil {
logp.Warn("skipping failed event with unexpected state: corresponding kinesis record misses error code: ", r)
continue
}
if *r.ErrorCode != "" {
failedEvents = append(failedEvents, events[i])
}
Expand Down
22 changes: 22 additions & 0 deletions streams/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,28 @@ func TestPublishEvents(t *testing.T) {
}
}

{
// Records with nil error codes should be ignored with some log
client.encoder = StubCodec{dat: []byte("boom"), err: nil}
client.streams = StubClient{
out: &kinesis.PutRecordsOutput{
Records: []*kinesis.PutRecordsResultEntry{
&kinesis.PutRecordsResultEntry{
ErrorCode: nil,
},
},
FailedRecordCount: aws.Int64(1),
},
}
rest, err := client.publishEvents(events)
if err != nil {
t.Errorf("unexpected error: %v", err)
}
if len(rest) != 0 {
t.Errorf("unexpected number of remaining events: %d", len(rest))
}
}

{
// Kinesis received the event but it was not persisted, probably due to underlying infrastructure failure
client.encoder = StubCodec{dat: []byte("boom"), err: nil}
Expand Down

0 comments on commit 0b86784

Please sign in to comment.