diff --git a/cloudwatch/cloudwatch.go b/cloudwatch/cloudwatch.go index 8f1bc2e..384c65b 100644 --- a/cloudwatch/cloudwatch.go +++ b/cloudwatch/cloudwatch.go @@ -756,7 +756,12 @@ func (output *OutputPlugin) putLogEvents(stream *logStream) error { } else if awsErr.Code() == cloudwatchlogs.ErrCodeInvalidSequenceTokenException { // sequence code is bad, grab the correct one and retry parts := strings.Split(awsErr.Message(), " ") - stream.nextSequenceToken = &parts[len(parts)-1] + nextSequenceToken := &parts[len(parts)-1] + // If this is a new stream then the error will end like "The next expected sequenceToken is: null" and sequenceToken should be nil + if strings.HasPrefix(*nextSequenceToken, "null") { + nextSequenceToken = nil + } + stream.nextSequenceToken = nextSequenceToken return output.putLogEvents(stream) } else if awsErr.Code() == cloudwatchlogs.ErrCodeResourceNotFoundException { diff --git a/cloudwatch/cloudwatch_test.go b/cloudwatch/cloudwatch_test.go index fbc0bbc..afcec93 100644 --- a/cloudwatch/cloudwatch_test.go +++ b/cloudwatch/cloudwatch_test.go @@ -691,6 +691,46 @@ func TestAddEventAndFlushDataInvalidSequenceTokenException(t *testing.T) { output.Flush() } +func TestAddEventAndFlushDataInvalidSequenceTokenNextNullException(t *testing.T) { + ctrl := gomock.NewController(t) + mockCloudWatch := mock_cloudwatch.NewMockLogsClient(ctrl) + + gomock.InOrder( + mockCloudWatch.EXPECT().CreateLogStream(gomock.Any()).Do(func(input *cloudwatchlogs.CreateLogStreamInput) { + assert.Equal(t, aws.StringValue(input.LogGroupName), testLogGroup, "Expected log group name to match") + assert.Equal(t, aws.StringValue(input.LogStreamName), testLogStreamPrefix+testTag, "Expected log stream name to match") + }).Return(&cloudwatchlogs.CreateLogStreamOutput{}, nil), + mockCloudWatch.EXPECT().PutLogEvents(gomock.Any()).Do(func(input *cloudwatchlogs.PutLogEventsInput) { + assert.Equal(t, aws.StringValue(input.LogGroupName), testLogGroup, "Expected log group name to match") + assert.Equal(t, aws.StringValue(input.LogStreamName), testLogStreamPrefix+testTag, "Expected log stream name to match") + }).Return(nil, awserr.New(cloudwatchlogs.ErrCodeInvalidSequenceTokenException, "The given sequenceToken is invalid; The next expected sequenceToken is: null", fmt.Errorf("API Error"))), + mockCloudWatch.EXPECT().PutLogEvents(gomock.Any()).Do(func(input *cloudwatchlogs.PutLogEventsInput) { + assert.Equal(t, aws.StringValue(input.LogGroupName), testLogGroup, "Expected log group name to match") + assert.Equal(t, aws.StringValue(input.LogStreamName), testLogStreamPrefix+testTag, "Expected log stream name to match") + assert.Nil(t, input.SequenceToken, "Expected sequence token to be nil") + }).Return(&cloudwatchlogs.PutLogEventsOutput{ + NextSequenceToken: aws.String("token"), + }, nil), + ) + + output := OutputPlugin{ + logGroupName: testTemplate(testLogGroup), + logStreamPrefix: testLogStreamPrefix, + client: mockCloudWatch, + timer: setupTimeout(), + streams: make(map[string]*logStream), + groups: map[string]struct{}{testLogGroup: {}}, + } + + record := map[interface{}]interface{}{ + "somekey": []byte("some value"), + } + + retCode := output.AddEvent(&Event{TS: time.Now(), Tag: testTag, Record: record}) + assert.Equal(t, retCode, fluentbit.FLB_OK, "Expected return code to FLB_OK") + output.Flush() +} + func TestAddEventAndDataResourceNotFoundExceptionWithNoLogGroup(t *testing.T) { ctrl := gomock.NewController(t) mockCloudWatch := mock_cloudwatch.NewMockLogsClient(ctrl)