Skip to content

Commit

Permalink
Add auto_create_stream
Browse files Browse the repository at this point in the history
  • Loading branch information
usamj committed Jun 9, 2022
1 parent 107f11f commit 7aaf4ec
Show file tree
Hide file tree
Showing 4 changed files with 246 additions and 117 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ Run `make` to build `./bin/cloudwatch.so`. Then use with Fluent Bit:
* `log_format`: An optional parameter that can be used to tell CloudWatch the format of the data. A value of `json/emf` enables CloudWatch to extract custom metrics embedded in a JSON payload. See the [Embedded Metric Format](https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/CloudWatch_Embedded_Metric_Format_Specification.html).
* `role_arn`: ARN of an IAM role to assume (for cross account access).
* `auto_create_group`: Automatically create log groups (and add tags). Valid values are "true" or "false" (case insensitive). Defaults to false. If you use dynamic variables in your log group name, you may need this to be `true`.
* `auto_create_stream`: Automatically create log streams. Valid values are "true" or "false" (case insensitive). Defaults to true.
* `new_log_group_tags`: Comma/equal delimited string of tags to include with _auto created_ log groups. Example: `"tag=val,cooltag2=my other value"`
* `log_retention_days`: If set to a number greater than zero, and newly create log group's retention policy is set to this many days.
* `endpoint`: Specify a custom endpoint for the CloudWatch Logs API.
Expand Down
33 changes: 24 additions & 9 deletions cloudwatch/cloudwatch.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,11 @@ type TaskMetadata struct {
TaskID string `json:"TaskID,omitempty"`
}

type streamDoesntExistError struct {
streamName string
groupName string
}

func (stream *logStream) isExpired() bool {
if len(stream.logEvents) == 0 && stream.expiration.Before(time.Now()) {
return true
Expand Down Expand Up @@ -130,6 +135,7 @@ type OutputPlugin struct {
logGroupTags map[string]*string
logGroupRetention int64
autoCreateGroup bool
autoCreateStream bool
bufferPool bytebufferpool.Pool
ecsMetadata TaskMetadata
runningInECS bool
Expand All @@ -148,6 +154,7 @@ type OutputPluginConfig struct {
LogKey string
RoleARN string
AutoCreateGroup bool
AutoCreateStream bool
NewLogGroupTags string
LogRetentionDays int64
CWEndpoint string
Expand Down Expand Up @@ -227,6 +234,7 @@ func NewOutputPlugin(config OutputPluginConfig) (*OutputPlugin, error) {
logGroupTags: tagKeysToMap(config.NewLogGroupTags),
logGroupRetention: config.LogRetentionDays,
autoCreateGroup: config.AutoCreateGroup,
autoCreateStream: config.AutoCreateStream,
groups: make(map[string]struct{}),
ecsMetadata: TaskMetadata{},
runningInECS: runningInECS,
Expand Down Expand Up @@ -431,22 +439,23 @@ func (output *OutputPlugin) cleanUpExpiredLogStreams() {
}
}

func (err *streamDoesntExistError) Error() string {
return fmt.Sprintf("error: stream %s doesn't exist in log group %s", err.streamName, err.groupName)
}

func (output *OutputPlugin) getLogStream(e *Event) (*logStream, error) {
stream, ok := output.streams[e.group+e.stream]
if !ok {
// stream doesn't exist, create it
stream, err := output.createStream(e)
// assume the stream exists
stream, err := output.existingLogStream(e)
if err != nil {
if awsErr, ok := err.(awserr.Error); ok {
if awsErr.Code() == cloudwatchlogs.ErrCodeResourceAlreadyExistsException {
// existing stream
return output.existingLogStream(e)
}
// if it doesn't then create it
if _, ok := err.(*streamDoesntExistError); ok {
return output.createStream(e)
}
}
return stream, err
}

return stream, nil
}

Expand Down Expand Up @@ -478,7 +487,10 @@ func (output *OutputPlugin) existingLogStream(e *Event) (*logStream, error) {
}

if stream == nil && resp.NextToken == nil {
return nil, fmt.Errorf("error: does not compute: Log Stream %s could not be created, but also could not be found in the log group", e.stream)
return nil, &streamDoesntExistError{
streamName: e.stream,
groupName: e.group,
}
}

nextToken = resp.NextToken
Expand Down Expand Up @@ -545,6 +557,9 @@ func (output *OutputPlugin) setGroupStreamNames(e *Event) {
}

func (output *OutputPlugin) createStream(e *Event) (*logStream, error) {
if !output.autoCreateStream {
return nil, fmt.Errorf("error: Log Stream %s does not exist in the log group %s and can't be created as autoCreateStream is disabled", e.stream, e.group)
}
output.timer.Check()
_, err := output.client.CreateLogStream(&cloudwatchlogs.CreateLogStreamInput{
LogGroupName: aws.String(e.group),
Expand Down
Loading

0 comments on commit 7aaf4ec

Please sign in to comment.