Skip to content

Commit

Permalink
Add auto_create_stream
Browse files Browse the repository at this point in the history
  • Loading branch information
usamj committed Jun 8, 2022
1 parent dc0cc54 commit c031498
Show file tree
Hide file tree
Showing 4 changed files with 284 additions and 112 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ Run `make` to build `./bin/cloudwatch.so`. Then use with Fluent Bit:
* `log_format`: An optional parameter that can be used to tell CloudWatch the format of the data. A value of `json/emf` enables CloudWatch to extract custom metrics embedded in a JSON payload. See the [Embedded Metric Format](https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/CloudWatch_Embedded_Metric_Format_Specification.html).
* `role_arn`: ARN of an IAM role to assume (for cross account access).
* `auto_create_group`: Automatically create log groups (and add tags). Valid values are "true" or "false" (case insensitive). Defaults to false. If you use dynamic variables in your log group name, you may need this to be `true`.
* `auto_create_stream`: Automatically create log streams. Valid values are "true" or "false" (cace insensitive). Defaults to true.
* `new_log_group_tags`: Comma/equal delimited string of tags to include with _auto created_ log groups. Example: `"tag=val,cooltag2=my other value"`
* `log_retention_days`: If set to a number greater than zero, and newly create log group's retention policy is set to this many days.
* `endpoint`: Specify a custom endpoint for the CloudWatch Logs API.
Expand Down
38 changes: 28 additions & 10 deletions cloudwatch/cloudwatch.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,11 @@ type TaskMetadata struct {
TaskID string `json:"TaskID,omitempty"`
}

type StreamDoesntExistError struct {
streamName string
groupName string
}

func (stream *logStream) isExpired() bool {
if len(stream.logEvents) == 0 && stream.expiration.Before(time.Now()) {
return true
Expand Down Expand Up @@ -130,6 +135,7 @@ type OutputPlugin struct {
logGroupTags map[string]*string
logGroupRetention int64
autoCreateGroup bool
autoCreateStream bool
bufferPool bytebufferpool.Pool
ecsMetadata TaskMetadata
runningInECS bool
Expand All @@ -148,6 +154,7 @@ type OutputPluginConfig struct {
LogKey string
RoleARN string
AutoCreateGroup bool
AutoCreateStream bool
NewLogGroupTags string
LogRetentionDays int64
CWEndpoint string
Expand Down Expand Up @@ -227,6 +234,7 @@ func NewOutputPlugin(config OutputPluginConfig) (*OutputPlugin, error) {
logGroupTags: tagKeysToMap(config.NewLogGroupTags),
logGroupRetention: config.LogRetentionDays,
autoCreateGroup: config.AutoCreateGroup,
autoCreateStream: config.AutoCreateStream,
groups: make(map[string]struct{}),
ecsMetadata: TaskMetadata{},
runningInECS: runningInECS,
Expand Down Expand Up @@ -431,22 +439,22 @@ func (output *OutputPlugin) cleanUpExpiredLogStreams() {
}
}

func (err *StreamDoesntExistError) Error() string {
return fmt.Sprintf("error: stream %s doesn't exist in log group %s", err.streamName, err.groupName)
}

func (output *OutputPlugin) getLogStream(e *Event) (*logStream, error) {
stream, ok := output.streams[e.group+e.stream]
if !ok {
// stream doesn't exist, create it
stream, err := output.createStream(e)
// check if stream exists
stream, err := output.existingLogStream(e)
if err != nil {
if awsErr, ok := err.(awserr.Error); ok {
if awsErr.Code() == cloudwatchlogs.ErrCodeResourceAlreadyExistsException {
// existing stream
return output.existingLogStream(e)
}
if _, ok := err.(*StreamDoesntExistError); ok {
return output.createStream(e)
}
}
return stream, err
}

return stream, nil
}

Expand Down Expand Up @@ -478,7 +486,10 @@ func (output *OutputPlugin) existingLogStream(e *Event) (*logStream, error) {
}

if stream == nil && resp.NextToken == nil {
return nil, fmt.Errorf("error: does not compute: Log Stream %s could not be created, but also could not be found in the log group", e.stream)
return nil, &StreamDoesntExistError{
streamName: e.stream,
groupName: e.group,
}
}

nextToken = resp.NextToken
Expand Down Expand Up @@ -545,6 +556,9 @@ func (output *OutputPlugin) setGroupStreamNames(e *Event) {
}

func (output *OutputPlugin) createStream(e *Event) (*logStream, error) {
if !output.autoCreateStream {
return nil, fmt.Errorf("error: Log Stream %s does not exist in the log group %s and can't be created as autoCreateStream is disabled", e.stream, e.group)
}
output.timer.Check()
_, err := output.client.CreateLogStream(&cloudwatchlogs.CreateLogStreamInput{
LogGroupName: aws.String(e.group),
Expand Down Expand Up @@ -756,7 +770,11 @@ func (output *OutputPlugin) putLogEvents(stream *logStream) error {
} else if awsErr.Code() == cloudwatchlogs.ErrCodeInvalidSequenceTokenException {
// sequence code is bad, grab the correct one and retry
parts := strings.Split(awsErr.Message(), " ")
stream.nextSequenceToken = &parts[len(parts)-1]
nextSequenceToken := &parts[len(parts)-1]
if strings.HasPrefix(*nextSequenceToken, "null") {
nextSequenceToken = nil
}
stream.nextSequenceToken = nextSequenceToken

return output.putLogEvents(stream)
} else if awsErr.Code() == cloudwatchlogs.ErrCodeResourceNotFoundException {
Expand Down
Loading

0 comments on commit c031498

Please sign in to comment.