diff --git a/README.md b/README.md index d526608..cbff2da 100644 --- a/README.md +++ b/README.md @@ -34,6 +34,7 @@ Run `make` to build `./bin/cloudwatch.so`. Then use with Fluent Bit: * `log_format`: An optional parameter that can be used to tell CloudWatch the format of the data. A value of `json/emf` enables CloudWatch to extract custom metrics embedded in a JSON payload. See the [Embedded Metric Format](https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/CloudWatch_Embedded_Metric_Format_Specification.html). * `role_arn`: ARN of an IAM role to assume (for cross account access). * `auto_create_group`: Automatically create log groups (and add tags). Valid values are "true" or "false" (case insensitive). Defaults to false. If you use dynamic variables in your log group name, you may need this to be `true`. +* `auto_create_stream`: Automatically create log streams. Valid values are "true" or "false" (case insensitive). Defaults to true. * `new_log_group_tags`: Comma/equal delimited string of tags to include with _auto created_ log groups. Example: `"tag=val,cooltag2=my other value"` * `log_retention_days`: If set to a number greater than zero, and newly create log group's retention policy is set to this many days. * `endpoint`: Specify a custom endpoint for the CloudWatch Logs API. diff --git a/cloudwatch/cloudwatch.go b/cloudwatch/cloudwatch.go index 384c65b..d40bfbf 100644 --- a/cloudwatch/cloudwatch.go +++ b/cloudwatch/cloudwatch.go @@ -97,6 +97,11 @@ type TaskMetadata struct { TaskID string `json:"TaskID,omitempty"` } +type streamDoesntExistError struct { + streamName string + groupName string +} + func (stream *logStream) isExpired() bool { if len(stream.logEvents) == 0 && stream.expiration.Before(time.Now()) { return true @@ -130,6 +135,7 @@ type OutputPlugin struct { logGroupTags map[string]*string logGroupRetention int64 autoCreateGroup bool + autoCreateStream bool bufferPool bytebufferpool.Pool ecsMetadata TaskMetadata runningInECS bool @@ -148,6 +154,7 @@ type OutputPluginConfig struct { LogKey string RoleARN string AutoCreateGroup bool + AutoCreateStream bool NewLogGroupTags string LogRetentionDays int64 CWEndpoint string @@ -227,6 +234,7 @@ func NewOutputPlugin(config OutputPluginConfig) (*OutputPlugin, error) { logGroupTags: tagKeysToMap(config.NewLogGroupTags), logGroupRetention: config.LogRetentionDays, autoCreateGroup: config.AutoCreateGroup, + autoCreateStream: config.AutoCreateStream, groups: make(map[string]struct{}), ecsMetadata: TaskMetadata{}, runningInECS: runningInECS, @@ -431,22 +439,23 @@ func (output *OutputPlugin) cleanUpExpiredLogStreams() { } } +func (err *streamDoesntExistError) Error() string { + return fmt.Sprintf("error: stream %s doesn't exist in log group %s", err.streamName, err.groupName) +} + func (output *OutputPlugin) getLogStream(e *Event) (*logStream, error) { stream, ok := output.streams[e.group+e.stream] if !ok { - // stream doesn't exist, create it - stream, err := output.createStream(e) + // assume the stream exists + stream, err := output.existingLogStream(e) if err != nil { - if awsErr, ok := err.(awserr.Error); ok { - if awsErr.Code() == cloudwatchlogs.ErrCodeResourceAlreadyExistsException { - // existing stream - return output.existingLogStream(e) - } + // if it doesn't then create it + if _, ok := err.(*streamDoesntExistError); ok { + return output.createStream(e) } } return stream, err } - return stream, nil } @@ -478,7 +487,10 @@ func (output *OutputPlugin) existingLogStream(e *Event) (*logStream, error) { } if stream == nil && resp.NextToken == nil { - return nil, fmt.Errorf("error: does not compute: Log Stream %s could not be created, but also could not be found in the log group", e.stream) + return nil, &streamDoesntExistError{ + streamName: e.stream, + groupName: e.group, + } } nextToken = resp.NextToken @@ -545,6 +557,9 @@ func (output *OutputPlugin) setGroupStreamNames(e *Event) { } func (output *OutputPlugin) createStream(e *Event) (*logStream, error) { + if !output.autoCreateStream { + return nil, fmt.Errorf("error: Log Stream %s does not exist in the log group %s and can't be created as autoCreateStream is disabled", e.stream, e.group) + } output.timer.Check() _, err := output.client.CreateLogStream(&cloudwatchlogs.CreateLogStreamInput{ LogGroupName: aws.String(e.group), diff --git a/cloudwatch/cloudwatch_test.go b/cloudwatch/cloudwatch_test.go index afcec93..6ea5d02 100644 --- a/cloudwatch/cloudwatch_test.go +++ b/cloudwatch/cloudwatch_test.go @@ -130,18 +130,24 @@ func TestAddEvent(t *testing.T) { ctrl := gomock.NewController(t) mockCloudWatch := mock_cloudwatch.NewMockLogsClient(ctrl) - mockCloudWatch.EXPECT().CreateLogStream(gomock.Any()).Do(func(input *cloudwatchlogs.CreateLogStreamInput) { - assert.Equal(t, aws.StringValue(input.LogGroupName), testLogGroup, "Expected log group name to match") - assert.Equal(t, aws.StringValue(input.LogStreamName), testLogStreamPrefix+testTag, "Expected log group name to match") - }).Return(&cloudwatchlogs.CreateLogStreamOutput{}, nil) + gomock.InOrder( + mockCloudWatch.EXPECT().DescribeLogStreams(gomock.Any()).Do(func(input *cloudwatchlogs.DescribeLogStreamsInput) { + assert.Equal(t, aws.StringValue(input.LogGroupName), testLogGroup, "Expected log group name to match") + }).Return(&cloudwatchlogs.DescribeLogStreamsOutput{}, nil), + mockCloudWatch.EXPECT().CreateLogStream(gomock.Any()).Do(func(input *cloudwatchlogs.CreateLogStreamInput) { + assert.Equal(t, aws.StringValue(input.LogGroupName), testLogGroup, "Expected log group name to match") + assert.Equal(t, aws.StringValue(input.LogStreamName), testLogStreamPrefix+testTag, "Expected log group name to match") + }).Return(&cloudwatchlogs.CreateLogStreamOutput{}, nil), + ) output := OutputPlugin{ - logGroupName: testTemplate(testLogGroup), - logStreamPrefix: testLogStreamPrefix, - client: mockCloudWatch, - timer: setupTimeout(), - streams: make(map[string]*logStream), - groups: map[string]struct{}{testLogGroup: {}}, + logGroupName: testTemplate(testLogGroup), + logStreamPrefix: testLogStreamPrefix, + client: mockCloudWatch, + timer: setupTimeout(), + streams: make(map[string]*logStream), + groups: map[string]struct{}{testLogGroup: {}}, + autoCreateStream: true, } record := map[interface{}]interface{}{ @@ -156,18 +162,24 @@ func TestTruncateLargeLogEvent(t *testing.T) { ctrl := gomock.NewController(t) mockCloudWatch := mock_cloudwatch.NewMockLogsClient(ctrl) - mockCloudWatch.EXPECT().CreateLogStream(gomock.Any()).Do(func(input *cloudwatchlogs.CreateLogStreamInput) { - assert.Equal(t, aws.StringValue(input.LogGroupName), testLogGroup, "Expected log group name to match") - assert.Equal(t, aws.StringValue(input.LogStreamName), testLogStreamPrefix+testTag, "Expected log group name to match") - }).Return(&cloudwatchlogs.CreateLogStreamOutput{}, nil) + gomock.InOrder( + mockCloudWatch.EXPECT().DescribeLogStreams(gomock.Any()).Do(func(input *cloudwatchlogs.DescribeLogStreamsInput) { + assert.Equal(t, aws.StringValue(input.LogGroupName), testLogGroup, "Expected log group name to match") + }).Return(&cloudwatchlogs.DescribeLogStreamsOutput{}, nil), + mockCloudWatch.EXPECT().CreateLogStream(gomock.Any()).Do(func(input *cloudwatchlogs.CreateLogStreamInput) { + assert.Equal(t, aws.StringValue(input.LogGroupName), testLogGroup, "Expected log group name to match") + assert.Equal(t, aws.StringValue(input.LogStreamName), testLogStreamPrefix+testTag, "Expected log group name to match") + }).Return(&cloudwatchlogs.CreateLogStreamOutput{}, nil), + ) output := OutputPlugin{ - logGroupName: testTemplate(testLogGroup), - logStreamPrefix: testLogStreamPrefix, - client: mockCloudWatch, - timer: setupTimeout(), - streams: make(map[string]*logStream), - groups: map[string]struct{}{testLogGroup: {}}, + logGroupName: testTemplate(testLogGroup), + logStreamPrefix: testLogStreamPrefix, + client: mockCloudWatch, + timer: setupTimeout(), + streams: make(map[string]*logStream), + groups: map[string]struct{}{testLogGroup: {}}, + autoCreateStream: true, } record := map[interface{}]interface{}{ @@ -189,18 +201,24 @@ func TestTruncateLargeLogEventWithSpecialCharacterOneTrailingFragments(t *testin ctrl := gomock.NewController(t) mockCloudWatch := mock_cloudwatch.NewMockLogsClient(ctrl) - mockCloudWatch.EXPECT().CreateLogStream(gomock.Any()).Do(func(input *cloudwatchlogs.CreateLogStreamInput) { - assert.Equal(t, aws.StringValue(input.LogGroupName), testLogGroup, "Expected log group name to match") - assert.Equal(t, aws.StringValue(input.LogStreamName), testLogStreamPrefix+testTag, "Expected log group name to match") - }).Return(&cloudwatchlogs.CreateLogStreamOutput{}, nil) + gomock.InOrder( + mockCloudWatch.EXPECT().DescribeLogStreams(gomock.Any()).Do(func(input *cloudwatchlogs.DescribeLogStreamsInput) { + assert.Equal(t, aws.StringValue(input.LogGroupName), testLogGroup, "Expected log group name to match") + }).Return(&cloudwatchlogs.DescribeLogStreamsOutput{}, nil), + mockCloudWatch.EXPECT().CreateLogStream(gomock.Any()).Do(func(input *cloudwatchlogs.CreateLogStreamInput) { + assert.Equal(t, aws.StringValue(input.LogGroupName), testLogGroup, "Expected log group name to match") + assert.Equal(t, aws.StringValue(input.LogStreamName), testLogStreamPrefix+testTag, "Expected log group name to match") + }).Return(&cloudwatchlogs.CreateLogStreamOutput{}, nil), + ) output := OutputPlugin{ - logGroupName: testTemplate(testLogGroup), - logStreamPrefix: testLogStreamPrefix, - client: mockCloudWatch, - timer: setupTimeout(), - streams: make(map[string]*logStream), - groups: map[string]struct{}{testLogGroup: {}}, + logGroupName: testTemplate(testLogGroup), + logStreamPrefix: testLogStreamPrefix, + client: mockCloudWatch, + timer: setupTimeout(), + streams: make(map[string]*logStream), + groups: map[string]struct{}{testLogGroup: {}}, + autoCreateStream: true, } var b bytes.Buffer @@ -234,19 +252,24 @@ func TestTruncateLargeLogEventWithSpecialCharacterOneTrailingFragments(t *testin func TestTruncateLargeLogEventWithSpecialCharacterTwoTrailingFragments(t *testing.T) { ctrl := gomock.NewController(t) mockCloudWatch := mock_cloudwatch.NewMockLogsClient(ctrl) - - mockCloudWatch.EXPECT().CreateLogStream(gomock.Any()).Do(func(input *cloudwatchlogs.CreateLogStreamInput) { - assert.Equal(t, aws.StringValue(input.LogGroupName), testLogGroup, "Expected log group name to match") - assert.Equal(t, aws.StringValue(input.LogStreamName), testLogStreamPrefix+testTag, "Expected log group name to match") - }).Return(&cloudwatchlogs.CreateLogStreamOutput{}, nil) + gomock.InOrder( + mockCloudWatch.EXPECT().DescribeLogStreams(gomock.Any()).Do(func(input *cloudwatchlogs.DescribeLogStreamsInput) { + assert.Equal(t, aws.StringValue(input.LogGroupName), testLogGroup, "Expected log group name to match") + }).Return(&cloudwatchlogs.DescribeLogStreamsOutput{}, nil), + mockCloudWatch.EXPECT().CreateLogStream(gomock.Any()).Do(func(input *cloudwatchlogs.CreateLogStreamInput) { + assert.Equal(t, aws.StringValue(input.LogGroupName), testLogGroup, "Expected log group name to match") + assert.Equal(t, aws.StringValue(input.LogStreamName), testLogStreamPrefix+testTag, "Expected log group name to match") + }).Return(&cloudwatchlogs.CreateLogStreamOutput{}, nil), + ) output := OutputPlugin{ - logGroupName: testTemplate(testLogGroup), - logStreamPrefix: testLogStreamPrefix, - client: mockCloudWatch, - timer: setupTimeout(), - streams: make(map[string]*logStream), - groups: map[string]struct{}{testLogGroup: {}}, + logGroupName: testTemplate(testLogGroup), + logStreamPrefix: testLogStreamPrefix, + client: mockCloudWatch, + timer: setupTimeout(), + streams: make(map[string]*logStream), + groups: map[string]struct{}{testLogGroup: {}}, + autoCreateStream: true, } var b bytes.Buffer @@ -280,19 +303,24 @@ func TestTruncateLargeLogEventWithSpecialCharacterTwoTrailingFragments(t *testin func TestTruncateLargeLogEventWithSpecialCharacterThreeTrailingFragments(t *testing.T) { ctrl := gomock.NewController(t) mockCloudWatch := mock_cloudwatch.NewMockLogsClient(ctrl) - - mockCloudWatch.EXPECT().CreateLogStream(gomock.Any()).Do(func(input *cloudwatchlogs.CreateLogStreamInput) { - assert.Equal(t, aws.StringValue(input.LogGroupName), testLogGroup, "Expected log group name to match") - assert.Equal(t, aws.StringValue(input.LogStreamName), testLogStreamPrefix+testTag, "Expected log group name to match") - }).Return(&cloudwatchlogs.CreateLogStreamOutput{}, nil) + gomock.InOrder( + mockCloudWatch.EXPECT().DescribeLogStreams(gomock.Any()).Do(func(input *cloudwatchlogs.DescribeLogStreamsInput) { + assert.Equal(t, aws.StringValue(input.LogGroupName), testLogGroup, "Expected log group name to match") + }).Return(&cloudwatchlogs.DescribeLogStreamsOutput{}, nil), + mockCloudWatch.EXPECT().CreateLogStream(gomock.Any()).Do(func(input *cloudwatchlogs.CreateLogStreamInput) { + assert.Equal(t, aws.StringValue(input.LogGroupName), testLogGroup, "Expected log group name to match") + assert.Equal(t, aws.StringValue(input.LogStreamName), testLogStreamPrefix+testTag, "Expected log group name to match") + }).Return(&cloudwatchlogs.CreateLogStreamOutput{}, nil), + ) output := OutputPlugin{ - logGroupName: testTemplate(testLogGroup), - logStreamPrefix: testLogStreamPrefix, - client: mockCloudWatch, - timer: setupTimeout(), - streams: make(map[string]*logStream), - groups: map[string]struct{}{testLogGroup: {}}, + logGroupName: testTemplate(testLogGroup), + logStreamPrefix: testLogStreamPrefix, + client: mockCloudWatch, + timer: setupTimeout(), + streams: make(map[string]*logStream), + groups: map[string]struct{}{testLogGroup: {}}, + autoCreateStream: true, } var b bytes.Buffer @@ -330,6 +358,9 @@ func TestAddEventCreateLogGroup(t *testing.T) { gomock.InOrder( mockCloudWatch.EXPECT().CreateLogGroup(gomock.Any()).Return(&cloudwatchlogs.CreateLogGroupOutput{}, nil), mockCloudWatch.EXPECT().PutRetentionPolicy(gomock.Any()).Return(&cloudwatchlogs.PutRetentionPolicyOutput{}, nil), + mockCloudWatch.EXPECT().DescribeLogStreams(gomock.Any()).Do(func(input *cloudwatchlogs.DescribeLogStreamsInput) { + assert.Equal(t, aws.StringValue(input.LogGroupName), testLogGroup, "Expected log group name to match") + }).Return(&cloudwatchlogs.DescribeLogStreamsOutput{}, nil), mockCloudWatch.EXPECT().CreateLogStream(gomock.Any()).Do(func(input *cloudwatchlogs.CreateLogStreamInput) { assert.Equal(t, aws.StringValue(input.LogGroupName), testLogGroup, "Expected log group name to match") assert.Equal(t, aws.StringValue(input.LogStreamName), testLogStreamPrefix+testTag, "Expected log group name to match") @@ -345,6 +376,7 @@ func TestAddEventCreateLogGroup(t *testing.T) { groups: make(map[string]struct{}), logGroupRetention: 14, autoCreateGroup: true, + autoCreateStream: true, } record := map[interface{}]interface{}{ @@ -362,10 +394,6 @@ func TestAddEventExistingStream(t *testing.T) { mockCloudWatch := mock_cloudwatch.NewMockLogsClient(ctrl) gomock.InOrder( - mockCloudWatch.EXPECT().CreateLogStream(gomock.Any()).Do(func(input *cloudwatchlogs.CreateLogStreamInput) { - assert.Equal(t, aws.StringValue(input.LogGroupName), testLogGroup, "Expected log group name to match") - assert.Equal(t, aws.StringValue(input.LogStreamName), testLogStreamPrefix+testTag, "Expected log group name to match") - }).Return(nil, awserr.New(cloudwatchlogs.ErrCodeResourceAlreadyExistsException, "Log Stream already exists", fmt.Errorf("API Error"))), mockCloudWatch.EXPECT().DescribeLogStreams(gomock.Any()).Do(func(input *cloudwatchlogs.DescribeLogStreamsInput) { assert.Equal(t, aws.StringValue(input.LogGroupName), testLogGroup, "Expected log group name to match") assert.Equal(t, aws.StringValue(input.LogStreamNamePrefix), testLogStreamPrefix+testTag, "Expected log group name to match") @@ -409,15 +437,64 @@ func TestAddEventExistingStream(t *testing.T) { } +func TestAddEventDescribeStreamsException(t *testing.T) { + ctrl := gomock.NewController(t) + mockCloudWatch := mock_cloudwatch.NewMockLogsClient(ctrl) + + mockCloudWatch.EXPECT().DescribeLogStreams(gomock.Any()).Do(func(input *cloudwatchlogs.DescribeLogStreamsInput) { + }).Return(nil, awserr.New(cloudwatchlogs.ErrCodeResourceNotFoundException, "The specified log group does not exist.", fmt.Errorf("API Error"))) + + output := OutputPlugin{ + logGroupName: testTemplate(testLogGroup), + logStreamPrefix: testLogStreamPrefix, + client: mockCloudWatch, + timer: setupTimeout(), + streams: make(map[string]*logStream), + groups: map[string]struct{}{testLogGroup: {}}, + autoCreateStream: true, + } + + record := map[interface{}]interface{}{ + "somekey": []byte("some value"), + } + + retCode := output.AddEvent(&Event{TS: time.Now(), Tag: testTag, Record: record}) + assert.Equal(t, retCode, fluentbit.FLB_RETRY, "Expected return code to FLB_OK") +} + +func TestAddEventAutoCreateDisabled(t *testing.T) { + ctrl := gomock.NewController(t) + mockCloudWatch := mock_cloudwatch.NewMockLogsClient(ctrl) + + mockCloudWatch.EXPECT().DescribeLogStreams(gomock.Any()).Do(func(input *cloudwatchlogs.DescribeLogStreamsInput) { + assert.Equal(t, aws.StringValue(input.LogGroupName), testLogGroup, "Expected log group name to match") + }).Return(&cloudwatchlogs.DescribeLogStreamsOutput{}, nil) + mockCloudWatch.EXPECT().CreateLogStream(gomock.Any()).Do(func(input *cloudwatchlogs.CreateLogStreamInput) { + }).Return(&cloudwatchlogs.CreateLogStreamOutput{}, nil).Times(0) + + output := OutputPlugin{ + logGroupName: testTemplate(testLogGroup), + logStreamPrefix: testLogStreamPrefix, + client: mockCloudWatch, + timer: setupTimeout(), + streams: make(map[string]*logStream), + groups: map[string]struct{}{testLogGroup: {}}, + autoCreateStream: false, + } + + record := map[interface{}]interface{}{ + "somekey": []byte("some value"), + } + + retCode := output.AddEvent(&Event{TS: time.Now(), Tag: testTag, Record: record}) + assert.Equal(t, retCode, fluentbit.FLB_RETRY, "Expected return code to FLB_RETRY") +} + func TestAddEventExistingStreamNotFound(t *testing.T) { ctrl := gomock.NewController(t) mockCloudWatch := mock_cloudwatch.NewMockLogsClient(ctrl) gomock.InOrder( - mockCloudWatch.EXPECT().CreateLogStream(gomock.Any()).Do(func(input *cloudwatchlogs.CreateLogStreamInput) { - assert.Equal(t, aws.StringValue(input.LogGroupName), testLogGroup, "Expected log group name to match") - assert.Equal(t, aws.StringValue(input.LogStreamName), testLogStreamPrefix+testTag, "Expected log group name to match") - }).Return(nil, awserr.New(cloudwatchlogs.ErrCodeResourceAlreadyExistsException, "Log Stream already exists", fmt.Errorf("API Error"))), mockCloudWatch.EXPECT().DescribeLogStreams(gomock.Any()).Do(func(input *cloudwatchlogs.DescribeLogStreamsInput) { assert.Equal(t, aws.StringValue(input.LogGroupName), testLogGroup, "Expected log group name to match") assert.Equal(t, aws.StringValue(input.LogStreamNamePrefix), testLogStreamPrefix+testTag, "Expected log group name to match") @@ -440,15 +517,20 @@ func TestAddEventExistingStreamNotFound(t *testing.T) { }, }, }, nil), + mockCloudWatch.EXPECT().CreateLogStream(gomock.Any()).Do(func(input *cloudwatchlogs.CreateLogStreamInput) { + assert.Equal(t, aws.StringValue(input.LogGroupName), testLogGroup, "Expected log group name to match") + assert.Equal(t, aws.StringValue(input.LogStreamName), testLogStreamPrefix+testTag, "Expected log group name to match") + }).Return(nil, awserr.New(cloudwatchlogs.ErrCodeResourceAlreadyExistsException, "Log Stream already exists", fmt.Errorf("API Error"))), ) output := OutputPlugin{ - logGroupName: testTemplate(testLogGroup), - logStreamPrefix: testLogStreamPrefix, - client: mockCloudWatch, - timer: setupTimeout(), - streams: make(map[string]*logStream), - groups: map[string]struct{}{testLogGroup: {}}, + logGroupName: testTemplate(testLogGroup), + logStreamPrefix: testLogStreamPrefix, + client: mockCloudWatch, + timer: setupTimeout(), + streams: make(map[string]*logStream), + groups: map[string]struct{}{testLogGroup: {}}, + autoCreateStream: true, } record := map[interface{}]interface{}{ @@ -488,6 +570,9 @@ func TestAddEventAndFlush(t *testing.T) { mockCloudWatch := mock_cloudwatch.NewMockLogsClient(ctrl) gomock.InOrder( + mockCloudWatch.EXPECT().DescribeLogStreams(gomock.Any()).Do(func(input *cloudwatchlogs.DescribeLogStreamsInput) { + assert.Equal(t, aws.StringValue(input.LogGroupName), testLogGroup, "Expected log group name to match") + }).Return(&cloudwatchlogs.DescribeLogStreamsOutput{}, nil), mockCloudWatch.EXPECT().CreateLogStream(gomock.Any()).Do(func(input *cloudwatchlogs.CreateLogStreamInput) { assert.Equal(t, aws.StringValue(input.LogGroupName), testLogGroup, "Expected log group name to match") assert.Equal(t, aws.StringValue(input.LogStreamName), testLogStreamPrefix+testTag, "Expected log stream name to match") @@ -501,12 +586,13 @@ func TestAddEventAndFlush(t *testing.T) { ) output := OutputPlugin{ - logGroupName: testTemplate(testLogGroup), - logStreamPrefix: testLogStreamPrefix, - client: mockCloudWatch, - timer: setupTimeout(), - streams: make(map[string]*logStream), - groups: map[string]struct{}{testLogGroup: {}}, + logGroupName: testTemplate(testLogGroup), + logStreamPrefix: testLogStreamPrefix, + client: mockCloudWatch, + timer: setupTimeout(), + streams: make(map[string]*logStream), + groups: map[string]struct{}{testLogGroup: {}}, + autoCreateStream: true, } record := map[interface{}]interface{}{ @@ -623,6 +709,9 @@ func TestAddEventAndFlushDataAlreadyAcceptedException(t *testing.T) { mockCloudWatch := mock_cloudwatch.NewMockLogsClient(ctrl) gomock.InOrder( + mockCloudWatch.EXPECT().DescribeLogStreams(gomock.Any()).Do(func(input *cloudwatchlogs.DescribeLogStreamsInput) { + assert.Equal(t, aws.StringValue(input.LogGroupName), testLogGroup, "Expected log group name to match") + }).Return(&cloudwatchlogs.DescribeLogStreamsOutput{}, nil), mockCloudWatch.EXPECT().CreateLogStream(gomock.Any()).Do(func(input *cloudwatchlogs.CreateLogStreamInput) { assert.Equal(t, aws.StringValue(input.LogGroupName), testLogGroup, "Expected log group name to match") assert.Equal(t, aws.StringValue(input.LogStreamName), testLogStreamPrefix+testTag, "Expected log stream name to match") @@ -634,12 +723,13 @@ func TestAddEventAndFlushDataAlreadyAcceptedException(t *testing.T) { ) output := OutputPlugin{ - logGroupName: testTemplate(testLogGroup), - logStreamPrefix: testLogStreamPrefix, - client: mockCloudWatch, - timer: setupTimeout(), - streams: make(map[string]*logStream), - groups: map[string]struct{}{testLogGroup: {}}, + logGroupName: testTemplate(testLogGroup), + logStreamPrefix: testLogStreamPrefix, + client: mockCloudWatch, + timer: setupTimeout(), + streams: make(map[string]*logStream), + groups: map[string]struct{}{testLogGroup: {}}, + autoCreateStream: true, } record := map[interface{}]interface{}{ @@ -656,6 +746,9 @@ func TestAddEventAndFlushDataInvalidSequenceTokenException(t *testing.T) { mockCloudWatch := mock_cloudwatch.NewMockLogsClient(ctrl) gomock.InOrder( + mockCloudWatch.EXPECT().DescribeLogStreams(gomock.Any()).Do(func(input *cloudwatchlogs.DescribeLogStreamsInput) { + assert.Equal(t, aws.StringValue(input.LogGroupName), testLogGroup, "Expected log group name to match") + }).Return(&cloudwatchlogs.DescribeLogStreamsOutput{}, nil), mockCloudWatch.EXPECT().CreateLogStream(gomock.Any()).Do(func(input *cloudwatchlogs.CreateLogStreamInput) { assert.Equal(t, aws.StringValue(input.LogGroupName), testLogGroup, "Expected log group name to match") assert.Equal(t, aws.StringValue(input.LogStreamName), testLogStreamPrefix+testTag, "Expected log stream name to match") @@ -674,12 +767,13 @@ func TestAddEventAndFlushDataInvalidSequenceTokenException(t *testing.T) { ) output := OutputPlugin{ - logGroupName: testTemplate(testLogGroup), - logStreamPrefix: testLogStreamPrefix, - client: mockCloudWatch, - timer: setupTimeout(), - streams: make(map[string]*logStream), - groups: map[string]struct{}{testLogGroup: {}}, + logGroupName: testTemplate(testLogGroup), + logStreamPrefix: testLogStreamPrefix, + client: mockCloudWatch, + timer: setupTimeout(), + streams: make(map[string]*logStream), + groups: map[string]struct{}{testLogGroup: {}}, + autoCreateStream: true, } record := map[interface{}]interface{}{ @@ -696,6 +790,9 @@ func TestAddEventAndFlushDataInvalidSequenceTokenNextNullException(t *testing.T) mockCloudWatch := mock_cloudwatch.NewMockLogsClient(ctrl) gomock.InOrder( + mockCloudWatch.EXPECT().DescribeLogStreams(gomock.Any()).Do(func(input *cloudwatchlogs.DescribeLogStreamsInput) { + assert.Equal(t, aws.StringValue(input.LogGroupName), testLogGroup, "Expected log group name to match") + }).Return(&cloudwatchlogs.DescribeLogStreamsOutput{}, nil), mockCloudWatch.EXPECT().CreateLogStream(gomock.Any()).Do(func(input *cloudwatchlogs.CreateLogStreamInput) { assert.Equal(t, aws.StringValue(input.LogGroupName), testLogGroup, "Expected log group name to match") assert.Equal(t, aws.StringValue(input.LogStreamName), testLogStreamPrefix+testTag, "Expected log stream name to match") @@ -714,12 +811,13 @@ func TestAddEventAndFlushDataInvalidSequenceTokenNextNullException(t *testing.T) ) output := OutputPlugin{ - logGroupName: testTemplate(testLogGroup), - logStreamPrefix: testLogStreamPrefix, - client: mockCloudWatch, - timer: setupTimeout(), - streams: make(map[string]*logStream), - groups: map[string]struct{}{testLogGroup: {}}, + logGroupName: testTemplate(testLogGroup), + logStreamPrefix: testLogStreamPrefix, + client: mockCloudWatch, + timer: setupTimeout(), + streams: make(map[string]*logStream), + groups: map[string]struct{}{testLogGroup: {}}, + autoCreateStream: true, } record := map[interface{}]interface{}{ @@ -736,6 +834,9 @@ func TestAddEventAndDataResourceNotFoundExceptionWithNoLogGroup(t *testing.T) { mockCloudWatch := mock_cloudwatch.NewMockLogsClient(ctrl) gomock.InOrder( + mockCloudWatch.EXPECT().DescribeLogStreams(gomock.Any()).Do(func(input *cloudwatchlogs.DescribeLogStreamsInput) { + assert.Equal(t, aws.StringValue(input.LogGroupName), testLogGroup, "Expected log group name to match") + }).Return(&cloudwatchlogs.DescribeLogStreamsOutput{}, nil), mockCloudWatch.EXPECT().CreateLogStream(gomock.Any()).Do(func(input *cloudwatchlogs.CreateLogStreamInput) { assert.Equal(t, aws.StringValue(input.LogGroupName), testLogGroup, "Expected log group name to match") assert.Equal(t, aws.StringValue(input.LogStreamName), testLogStreamPrefix+testTag, "Expected log stream name to match") @@ -750,12 +851,13 @@ func TestAddEventAndDataResourceNotFoundExceptionWithNoLogGroup(t *testing.T) { ) output := OutputPlugin{ - logGroupName: testTemplate(testLogGroup), - logStreamPrefix: testLogStreamPrefix, - client: mockCloudWatch, - timer: setupTimeout(), - streams: make(map[string]*logStream), - groups: map[string]struct{}{testLogGroup: {}}, + logGroupName: testTemplate(testLogGroup), + logStreamPrefix: testLogStreamPrefix, + client: mockCloudWatch, + timer: setupTimeout(), + streams: make(map[string]*logStream), + groups: map[string]struct{}{testLogGroup: {}}, + autoCreateStream: true, } record := map[interface{}]interface{}{ @@ -771,6 +873,9 @@ func TestAddEventAndDataResourceNotFoundExceptionWithNoLogStream(t *testing.T) { mockCloudWatch := mock_cloudwatch.NewMockLogsClient(ctrl) gomock.InOrder( + mockCloudWatch.EXPECT().DescribeLogStreams(gomock.Any()).Do(func(input *cloudwatchlogs.DescribeLogStreamsInput) { + assert.Equal(t, aws.StringValue(input.LogGroupName), testLogGroup, "Expected log group name to match") + }).Return(&cloudwatchlogs.DescribeLogStreamsOutput{}, nil), mockCloudWatch.EXPECT().CreateLogStream(gomock.Any()).Do(func(input *cloudwatchlogs.CreateLogStreamInput) { assert.Equal(t, aws.StringValue(input.LogGroupName), testLogGroup, "Expected log group name to match") assert.Equal(t, aws.StringValue(input.LogStreamName), testLogStreamPrefix+testTag, "Expected log stream name to match") @@ -786,12 +891,13 @@ func TestAddEventAndDataResourceNotFoundExceptionWithNoLogStream(t *testing.T) { ) output := OutputPlugin{ - logGroupName: testTemplate(testLogGroup), - logStreamPrefix: testLogStreamPrefix, - client: mockCloudWatch, - timer: setupTimeout(), - streams: make(map[string]*logStream), - groups: map[string]struct{}{testLogGroup: {}}, + logGroupName: testTemplate(testLogGroup), + logStreamPrefix: testLogStreamPrefix, + client: mockCloudWatch, + timer: setupTimeout(), + streams: make(map[string]*logStream), + groups: map[string]struct{}{testLogGroup: {}}, + autoCreateStream: true, } record := map[interface{}]interface{}{ @@ -893,6 +999,9 @@ func setupLimitTestOutput(t *testing.T, times int) OutputPlugin { mockCloudWatch := mock_cloudwatch.NewMockLogsClient(ctrl) gomock.InOrder( + mockCloudWatch.EXPECT().DescribeLogStreams(gomock.Any()).Do(func(input *cloudwatchlogs.DescribeLogStreamsInput) { + assert.Equal(t, aws.StringValue(input.LogGroupName), testLogGroup, "Expected log group name to match") + }).Return(&cloudwatchlogs.DescribeLogStreamsOutput{}, nil), mockCloudWatch.EXPECT().CreateLogStream(gomock.Any()).AnyTimes().Do(func(input *cloudwatchlogs.CreateLogStreamInput) { assert.Equal(t, aws.StringValue(input.LogGroupName), testLogGroup, "Expected log group name to match") assert.Equal(t, aws.StringValue(input.LogStreamName), testLogStreamPrefix+testTag, "Expected log stream name to match") @@ -901,12 +1010,13 @@ func setupLimitTestOutput(t *testing.T, times int) OutputPlugin { ) return OutputPlugin{ - logGroupName: testTemplate(testLogGroup), - logStreamPrefix: testLogStreamPrefix, - client: mockCloudWatch, - timer: setupTimeout(), - streams: make(map[string]*logStream), - groups: map[string]struct{}{testLogGroup: {}}, + logGroupName: testTemplate(testLogGroup), + logStreamPrefix: testLogStreamPrefix, + client: mockCloudWatch, + timer: setupTimeout(), + streams: make(map[string]*logStream), + groups: map[string]struct{}{testLogGroup: {}}, + autoCreateStream: true, } } diff --git a/fluent-bit-cloudwatch.go b/fluent-bit-cloudwatch.go index 47346a5..0f982c2 100644 --- a/fluent-bit-cloudwatch.go +++ b/fluent-bit-cloudwatch.go @@ -103,6 +103,9 @@ func getConfiguration(ctx unsafe.Pointer, pluginID int) cloudwatch.OutputPluginC config.AutoCreateGroup = getBoolParam(ctx, "auto_create_group", false) logrus.Infof("[cloudwatch %d] plugin parameter auto_create_group = '%v'", pluginID, config.AutoCreateGroup) + config.AutoCreateStream = getBoolParam(ctx, "auto_create_stream", true) + logrus.Infof("[cloudwatch %d] plugin parameter auto_create_stream = '%v'", pluginID, config.AutoCreateStream) + config.NewLogGroupTags = output.FLBPluginConfigKey(ctx, "new_log_group_tags") logrus.Infof("[cloudwatch %d] plugin parameter new_log_group_tags = '%s'", pluginID, config.NewLogGroupTags)