Skip to content

Commit

Permalink
Add auto_create_stream
Browse files Browse the repository at this point in the history
  • Loading branch information
usamj committed Jun 7, 2022
1 parent 1be55a9 commit dc33cf2
Show file tree
Hide file tree
Showing 4 changed files with 235 additions and 110 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ Run `make` to build `./bin/cloudwatch.so`. Then use with Fluent Bit:
* `log_format`: An optional parameter that can be used to tell CloudWatch the format of the data. A value of `json/emf` enables CloudWatch to extract custom metrics embedded in a JSON payload. See the [Embedded Metric Format](https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/CloudWatch_Embedded_Metric_Format_Specification.html).
* `role_arn`: ARN of an IAM role to assume (for cross account access).
* `auto_create_group`: Automatically create log groups (and add tags). Valid values are "true" or "false" (case insensitive). Defaults to false. If you use dynamic variables in your log group name, you may need this to be `true`.
* `auto_create_stream`: Automatically creates log streams. Valid values are "true" or "false" (cace insensitive). Defaults to true.
* `new_log_group_tags`: Comma/equal delimited string of tags to include with _auto created_ log groups. Example: `"tag=val,cooltag2=my other value"`
* `log_retention_days`: If set to a number greater than zero, and newly create log group's retention policy is set to this many days.
* `endpoint`: Specify a custom endpoint for the CloudWatch Logs API.
Expand Down
31 changes: 23 additions & 8 deletions cloudwatch/cloudwatch.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ type OutputPlugin struct {
logGroupTags map[string]*string
logGroupRetention int64
autoCreateGroup bool
autoCreateStream bool
bufferPool bytebufferpool.Pool
ecsMetadata TaskMetadata
runningInECS bool
Expand All @@ -148,6 +149,7 @@ type OutputPluginConfig struct {
LogKey string
RoleARN string
AutoCreateGroup bool
AutoCreateStream bool
NewLogGroupTags string
LogRetentionDays int64
CWEndpoint string
Expand Down Expand Up @@ -227,6 +229,7 @@ func NewOutputPlugin(config OutputPluginConfig) (*OutputPlugin, error) {
logGroupTags: tagKeysToMap(config.NewLogGroupTags),
logGroupRetention: config.LogRetentionDays,
autoCreateGroup: config.AutoCreateGroup,
autoCreateStream: config.AutoCreateStream,
groups: make(map[string]struct{}),
ecsMetadata: TaskMetadata{},
runningInECS: runningInECS,
Expand Down Expand Up @@ -431,22 +434,31 @@ func (output *OutputPlugin) cleanUpExpiredLogStreams() {
}
}

type StreamDoesntExistError struct {
streamName string
groupName string
}

func (err *StreamDoesntExistError) Error() string {
return fmt.Sprintf("error: stream %s doesn't exist in log group %s", err.streamName, err.groupName)
}

func (output *OutputPlugin) getLogStream(e *Event) (*logStream, error) {
stream, ok := output.streams[e.group+e.stream]
if !ok {
// stream doesn't exist, create it
stream, err := output.createStream(e)
// check if stream exists
stream, err := output.existingLogStream(e)
if err != nil {
if awsErr, ok := err.(awserr.Error); ok {
if awsErr.Code() == cloudwatchlogs.ErrCodeResourceAlreadyExistsException {
// existing stream
return output.existingLogStream(e)
if _, ok := err.(*StreamDoesntExistError); ok {
if output.autoCreateStream {
// log-stream doesn't exist, create it
return output.createStream(e)
}
return nil, fmt.Errorf("error: Log Stream %s does not exist in the log group %s and can't be created as autoCreateStream is disabld", e.stream, e.group)
}
}
return stream, err
}

return stream, nil
}

Expand Down Expand Up @@ -478,7 +490,10 @@ func (output *OutputPlugin) existingLogStream(e *Event) (*logStream, error) {
}

if stream == nil && resp.NextToken == nil {
return nil, fmt.Errorf("error: does not compute: Log Stream %s could not be created, but also could not be found in the log group", e.stream)
return nil, &StreamDoesntExistError{
streamName: e.stream,
groupName: e.group,
}
}

nextToken = resp.NextToken
Expand Down
Loading

0 comments on commit dc33cf2

Please sign in to comment.