Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add default fallback log group and stream names #111

Merged
merged 4 commits into from
Oct 30, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,17 @@ jobs:
runs-on: ubuntu-latest
steps:

- name: Set up Go 1.12
- name: Set up Go 1.15
uses: actions/setup-go@v2
with:
go-version: 1.12
go-version: 1.15
id: go

- name: Check out code into the Go module directory
uses: actions/checkout@v2

- name: golint
run: go get -u golang.org/x/lint/golint

- name: Build
run: make build test
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ Run `make` to build `./bin/cloudwatch.so`. Then use with Fluent Bit:
* `region`: The AWS region.
* `log_group_name`: The name of the CloudWatch Log Group that you want log records sent to. This value allows a template in the form of `$(variable)`. See `log_stream_name` description for more. The app will attempt to create missing log groups, and will throw an error if it does not have access.
* `log_stream_name`: The name of the CloudWatch Log Stream that you want log records sent to. This value allows a template in the form of `$(variable)` where
* `default_log_group_name`: This required variable is the fallback in case any variables in `log_group_name` fails to parse. Defaults to `fluentbit-default`
* `default_log_stream_name`: This required variable is the fallback in case any variables in `log_stream_name` fails to parse. Defaults to `/fluentbit-default`
`variable` is a map key name in the log message. To access sub-values in the map
use the form `$(variable['subkey'])`. Special values: `$(tag)` references the full tag name, `$(tag[0])` and `$(tag[1])` are the first and second values of log tag split on periods. You may access any member by index, 0 through 9.
* `log_stream_prefix`: (deprecated) Prefix for the Log Stream name. Setting this to `prefix-` is the same as setting `log_stream_name = prefix-$(tag)`.
Expand Down
66 changes: 34 additions & 32 deletions cloudwatch/cloudwatch.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,8 +103,10 @@ type fastTemplate struct {
// OutputPlugin is the CloudWatch Logs Fluent Bit output plugin
type OutputPlugin struct {
logGroupName *fastTemplate
defaultLogGroupName string
logStreamPrefix string
logStreamName *fastTemplate
defaultLogStreamName string
logKey string
client LogsClient
streams map[string]*logStream
Expand All @@ -120,20 +122,22 @@ type OutputPlugin struct {

// OutputPluginConfig is the input information used by NewOutputPlugin to create a new OutputPlugin
type OutputPluginConfig struct {
Region string
LogGroupName string
LogStreamPrefix string
LogStreamName string
LogKey string
RoleARN string
AutoCreateGroup bool
NewLogGroupTags string
LogRetentionDays int64
CWEndpoint string
STSEndpoint string
CredsEndpoint string
PluginInstanceID int
LogFormat string
Region string
LogGroupName string
DefaultLogGroupName string
LogStreamPrefix string
LogStreamName string
DefaultLogStreamName string
LogKey string
RoleARN string
AutoCreateGroup bool
NewLogGroupTags string
LogRetentionDays int64
CWEndpoint string
STSEndpoint string
CredsEndpoint string
PluginInstanceID int
LogFormat string
}

// Validate checks the configuration input for an OutputPlugin instances
Expand Down Expand Up @@ -183,6 +187,8 @@ func NewOutputPlugin(config OutputPluginConfig) (*OutputPlugin, error) {
logGroupName: logGroupTemplate,
logStreamName: logStreamTemplate,
logStreamPrefix: config.LogStreamPrefix,
defaultLogGroupName: config.DefaultLogGroupName,
defaultLogStreamName: config.DefaultLogStreamName,
logKey: config.LogKey,
client: client,
timer: timer,
Expand Down Expand Up @@ -440,19 +446,16 @@ func (output *OutputPlugin) setGroupStreamNames(e *Event) {
s := &sanitizer{sanitize: sanitizeGroup, buf: output.bufferPool.Get()}

if _, err := parseDataMapTags(e, logTagSplit, output.logGroupName, s); err != nil {
logrus.Errorf("[cloudwatch %d] parsing log_group_name template: %v", output.PluginInstanceID, err)
}

if e.group = s.buf.String(); len(e.group) == 0 {
e.group = output.logGroupName.String
}

if len(e.group) > maxGroupStreamLength {
e.group = output.defaultLogGroupName
logrus.Errorf("[cloudwatch %d] parsing log_group_name template '%s' "+
"(using value of default_log_group_name instead): %v",
output.PluginInstanceID, output.logGroupName.String, err)
} else if e.group = s.buf.String(); len(e.group) == 0 {
e.group = output.defaultLogGroupName
} else if len(e.group) > maxGroupStreamLength {
e.group = e.group[:maxGroupStreamLength]
}

s.buf.Reset()

if output.logStreamPrefix != "" {
e.stream = output.logStreamPrefix + e.Tag
output.bufferPool.Put(s.buf)
Expand All @@ -461,16 +464,15 @@ func (output *OutputPlugin) setGroupStreamNames(e *Event) {
}

s.sanitize = sanitizeStream
s.buf.Reset()

if _, err := parseDataMapTags(e, logTagSplit, output.logStreamName, s); err != nil {
logrus.Errorf("[cloudwatch %d] parsing log_stream_name template: %v", output.PluginInstanceID, err)
}

if e.stream = s.buf.String(); len(e.stream) == 0 {
e.stream = output.logStreamName.String
}

if len(e.stream) > maxGroupStreamLength {
e.stream = output.defaultLogStreamName
logrus.Errorf("[cloudwatch %d] parsing log_stream_name template '%s': %v",
output.PluginInstanceID, output.logStreamName.String, err)
} else if e.stream = s.buf.String(); len(e.stream) == 0 {
e.stream = output.defaultLogStreamName
} else if len(e.stream) > maxGroupStreamLength {
e.stream = e.stream[:maxGroupStreamLength]
}

Expand Down
28 changes: 25 additions & 3 deletions cloudwatch/cloudwatch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -334,28 +334,50 @@ func TestSetGroupStreamNames(t *testing.T) {

// Test against non-template name.
output := OutputPlugin{
logStreamName: testTemplate("/aws/ecs/test-stream-name"),
logGroupName: testTemplate(""),
logStreamName: testTemplate("/aws/ecs/test-stream-name"),
logGroupName: testTemplate(""),
defaultLogGroupName: "fluentbit-default",
defaultLogStreamName: "/fluentbit-default",
}

output.setGroupStreamNames(e)
assert.Equal(t, "/aws/ecs/test-stream-name", e.stream,
"The provided stream name must be returned exactly, without modifications.")

output.logStreamName = testTemplate("")
output.setGroupStreamNames(e)
assert.Equal(t, output.defaultLogStreamName, e.stream,
"The default stream name must be set when no stream name is provided.")

// Test against a simple log stream prefix.
output.logStreamPrefix = "/aws/ecs/test-stream-prefix/"
output.setGroupStreamNames(e)
assert.Equal(t, output.logStreamPrefix+"syslog.0", e.stream,
"The provided stream prefix must be prefixed to the provided tag name.")

// Test replacing items from template variables.
output.logStreamPrefix = ""
output.logStreamName = testTemplate("/aws/ecs/$(tag[0])/$(tag[1])/$(details['region'])/$(details['az'])/$(ident)")
output.setGroupStreamNames(e)
assert.Equal(t, "/aws/ecs/syslog/0/us-west-2/a/cron", e.stream,
"The stream name template was not correctly parsed.")
assert.Equal(t, output.defaultLogGroupName, e.group,
"The default log group name must be set when no log group is provided.")

// Test another bad template ] missing.
output.logStreamName = testTemplate("/aws/ecs/$(details['region')")
output.setGroupStreamNames(e)
assert.Equal(t, "/aws/ecs/['region'", e.stream,
"The provided stream name must match when parsing fails.")
"The provided stream name must match when the tag is incomplete.")

// Make sure we get default group and stream names when their variables cannot be parsed.
output.logStreamName = testTemplate("/aws/ecs/$(details['activity'])")
output.logGroupName = testTemplate("$(details['activity'])")
output.setGroupStreamNames(e)
assert.Equal(t, output.defaultLogStreamName, e.stream,
"The default stream name must return when elements are missing.")
assert.Equal(t, output.defaultLogGroupName, e.group,
"The default group name must return when elements are missing.")

// Test that log stream and log group names get truncated to the maximum allowed.
b := make([]byte, maxGroupStreamLength*2)
Expand Down
16 changes: 12 additions & 4 deletions cloudwatch/helpers.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package cloudwatch

import (
"fmt"
"io"
"strconv"
"strings"
Expand All @@ -9,9 +10,17 @@ import (
"github.com/valyala/fasttemplate"
)

// Errors output by the help procedures.
var (
ErrNoTagValue = fmt.Errorf("not enough dots in the tag to satisfy the index position")
ErrMissingTagName = fmt.Errorf("tag name not found")
ErrMissingSubName = fmt.Errorf("sub-tag name not found")
)

// newTemplate is the only place you'll find the template start and end tags.
func newTemplate(template string) (*fastTemplate, error) {
t, err := fasttemplate.NewTemplate(template, "$(", ")")

return &fastTemplate{Template: t, String: template}, err
}

Expand Down Expand Up @@ -63,7 +72,7 @@ func parseKeysTemplate(data map[interface{}]interface{}, keys string, w io.Write
data = val // drill down another level.
return 0, nil
default: // missing
return w.Write([]byte(tag))
return 0, fmt.Errorf("%s: %w", tag, ErrMissingSubName)
}
})
}
Expand All @@ -86,8 +95,7 @@ func parseDataMapTags(e *Event, logTags []string, t *fastTemplate, w io.Writer)
case len(tag) >= 5: // input string is at least "tag[x" where x is hopefully an integer 0-9.
// The index value is always in the same position: 4:5 (this is why supporting more than 0-9 is rough)
if v, _ = strconv.Atoi(tag[4:5]); len(logTags) <= v {
// not enough dots the tag to satisfy the index position, so return whatever the input string was.
return w.Write([]byte("tag" + tag[4:5]))
return 0, fmt.Errorf("%s: %w", tag, ErrNoTagValue)
}

return w.Write([]byte(logTags[v]))
Expand All @@ -105,7 +113,7 @@ func parseDataMapTags(e *Event, logTags []string, t *fastTemplate, w io.Writer)
// we should never land here because the interface{} map should have already been converted to strings.
return w.Write(val)
default: // missing
return w.Write([]byte(tag))
return 0, fmt.Errorf("%s: %w", tag, ErrMissingTagName)
}
})
}
Expand Down
31 changes: 25 additions & 6 deletions cloudwatch/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ func TestTagKeysToMap(t *testing.T) {
func TestParseDataMapTags(t *testing.T) {
t.Parallel()

template := "$(missing).$(tag).$(pam['item2']['subitem2']['more']).$(pam['item']).$(pam['item2'])." +
"$(pam['item2']['subitem'])-$(pam['item2']['subitem55'])-$(pam['item2']['subitem2']['more'])-$(tag[1])-$(tag[6])"
template := testTemplate("$(tag).$(pam['item2']['subitem2']['more']).$(pam['item']).$(pam['item2'])." +
"$(pam['item2']['subitem'])-$(pam['item2']['subitem2']['more'])-$(tag[1])")
data := map[interface{}]interface{}{
"pam": map[interface{}]interface{}{
"item": "soup",
Expand All @@ -38,10 +38,29 @@ func TestParseDataMapTags(t *testing.T) {
s := &sanitizer{buf: bytebufferpool.Get(), sanitize: sanitizeGroup}
defer bytebufferpool.Put(s.buf)

_, err := parseDataMapTags(&Event{Record: data, Tag: "syslog.0"}, []string{"syslog", "0"}, testTemplate(template), s)

assert.Nil(t, err)
assert.Equal(t, "missing.syslog.0.final.soup..SubIt3m-subitem55-final-0-tag6", s.buf.String(), "Rendered string is incorrect.")
_, err := parseDataMapTags(&Event{Record: data, Tag: "syslog.0"}, []string{"syslog", "0"}, template, s)

assert.Nil(t, err, err)
assert.Equal(t, "syslog.0.final.soup..SubIt3m-final-0", s.buf.String(), "Rendered string is incorrect.")

// Test missing variables. These should always return an error and an empty string.
s.buf.Reset()
template = testTemplate("$(missing-variable).stuff")
_, err = parseDataMapTags(&Event{Record: data, Tag: "syslog.0"}, []string{"syslog", "0"}, template, s)
assert.EqualError(t, err, "missing-variable: "+ErrMissingTagName.Error(), "the wrong error was returned")
assert.Empty(t, s.buf.String())

s.buf.Reset()
template = testTemplate("$(pam['item6']).stuff")
_, err = parseDataMapTags(&Event{Record: data, Tag: "syslog.0"}, []string{"syslog", "0"}, template, s)
assert.EqualError(t, err, "item6: "+ErrMissingSubName.Error(), "the wrong error was returned")
assert.Empty(t, s.buf.String())

s.buf.Reset()
template = testTemplate("$(tag[9]).stuff")
_, err = parseDataMapTags(&Event{Record: data, Tag: "syslog.0"}, []string{"syslog", "0"}, template, s)
assert.EqualError(t, err, "tag[9]: "+ErrNoTagValue.Error(), "the wrong error was returned")
assert.Empty(t, s.buf.String())
}

func TestSanitizeGroup(t *testing.T) {
Expand Down
14 changes: 14 additions & 0 deletions fluent-bit-cloudwatch.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,12 +71,26 @@ func getConfiguration(ctx unsafe.Pointer, pluginID int) cloudwatch.OutputPluginC
config.LogGroupName = output.FLBPluginConfigKey(ctx, "log_group_name")
logrus.Infof("[cloudwatch %d] plugin parameter log_group_name = '%s'", pluginID, config.LogGroupName)

config.DefaultLogGroupName = output.FLBPluginConfigKey(ctx, "default_log_group_name")
if config.DefaultLogGroupName == "" {
config.DefaultLogGroupName = "fluentbit-default"
}

logrus.Infof("[cloudwatch %d] plugin parameter default_log_group_name = '%s'", pluginID, config.DefaultLogGroupName)

config.LogStreamPrefix = output.FLBPluginConfigKey(ctx, "log_stream_prefix")
logrus.Infof("[cloudwatch %d] plugin parameter log_stream_prefix = '%s'", pluginID, config.LogStreamPrefix)

config.LogStreamName = output.FLBPluginConfigKey(ctx, "log_stream_name")
logrus.Infof("[cloudwatch %d] plugin parameter log_stream_name = '%s'", pluginID, config.LogStreamName)

config.DefaultLogStreamName = output.FLBPluginConfigKey(ctx, "default_log_stream_name")
if config.DefaultLogStreamName == "" {
config.DefaultLogStreamName = "/fluentbit-default"
}

logrus.Infof("[cloudwatch %d] plugin parameter default_log_stream_name = '%s'", pluginID, config.DefaultLogStreamName)

config.Region = output.FLBPluginConfigKey(ctx, "region")
logrus.Infof("[cloudwatch %d] plugin parameter region = '%s'", pluginID, config.Region)

Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ go 1.12

require (
github.com/aws/amazon-kinesis-firehose-for-fluent-bit v1.4.2
github.com/aws/aws-sdk-go v1.35.14
github.com/aws/aws-sdk-go v1.34.27
github.com/fluent/fluent-bit-go v0.0.0-20200707230002-2a28684e2382
github.com/golang/mock v1.4.4
github.com/json-iterator/go v1.1.10
Expand Down
18 changes: 16 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,8 +1,22 @@
github.com/aws/amazon-kinesis-firehose-for-fluent-bit v1.4.1 h1:Vq+RfIAuyzw5t1J4sts7rZdBLRWm2SB0r79OQsvdk40=
github.com/aws/amazon-kinesis-firehose-for-fluent-bit v1.4.1/go.mod h1:2hixxBh6Xygvxe6x/PSUTphh8dZ0WptVty98ftZIhAU=
github.com/aws/amazon-kinesis-firehose-for-fluent-bit v1.4.2 h1:cxszZVyfBNmT5fgi5ZAILh5TjJke602txB3X7AcWoBc=
github.com/aws/amazon-kinesis-firehose-for-fluent-bit v1.4.2/go.mod h1:xKkwx++juNoJMl0mm2V1+CJ+cKK22lOUEvxIwebe6hc=
github.com/aws/aws-sdk-go v1.33.14 h1:ucjyVEvtIdtn4acf+RKsgk6ybAYeMLXpGZeqoVvi7Kk=
github.com/aws/aws-sdk-go v1.33.14/go.mod h1:5zCpMtNQVjRREroY7sYe8lOMRSxkhG6MZveU8YkpAk0=
github.com/aws/aws-sdk-go v1.33.17 h1:vngPRchZs603qLtJH7lh2pBCDqiFxA9+9nDWJ5WYJ5A=
github.com/aws/aws-sdk-go v1.33.17/go.mod h1:5zCpMtNQVjRREroY7sYe8lOMRSxkhG6MZveU8YkpAk0=
github.com/aws/aws-sdk-go v1.34.0 h1:brux2dRrlwCF5JhTL7MUT3WUwo9zfDHZZp3+g3Mvlmo=
github.com/aws/aws-sdk-go v1.34.0/go.mod h1:5zCpMtNQVjRREroY7sYe8lOMRSxkhG6MZveU8YkpAk0=
github.com/aws/aws-sdk-go v1.34.6 h1:2aPXQGkR6xeheN5dns13mSoDWeUlj4wDmfZ+8ZDHauw=
github.com/aws/aws-sdk-go v1.34.6/go.mod h1:5zCpMtNQVjRREroY7sYe8lOMRSxkhG6MZveU8YkpAk0=
github.com/aws/aws-sdk-go v1.34.12 h1:7UbBEYDUa4uW0YmRnOd806MS1yoJMcaodBWDzvBShAI=
github.com/aws/aws-sdk-go v1.34.12/go.mod h1:5zCpMtNQVjRREroY7sYe8lOMRSxkhG6MZveU8YkpAk0=
github.com/aws/aws-sdk-go v1.34.13 h1:wwNWSUh4FGJxXVOVVNj2lWI8wTe5hK8sGWlK7ziEcgg=
github.com/aws/aws-sdk-go v1.34.13/go.mod h1:5zCpMtNQVjRREroY7sYe8lOMRSxkhG6MZveU8YkpAk0=
github.com/aws/aws-sdk-go v1.34.22/go.mod h1:5zCpMtNQVjRREroY7sYe8lOMRSxkhG6MZveU8YkpAk0=
github.com/aws/aws-sdk-go v1.35.14 h1:nucVVXXjAr9UkmYCBWxQWRuYa5KOlaXjuJGg2ulW0K0=
github.com/aws/aws-sdk-go v1.35.14/go.mod h1:tlPOdRjfxPBpNIwqDj61rmsnA85v9jc0Ps9+muhnW+k=
github.com/aws/aws-sdk-go v1.34.27 h1:qBqccUrlz43Zermh0U1O502bHYZsgMlBm+LUVabzBPA=
github.com/aws/aws-sdk-go v1.34.27/go.mod h1:5zCpMtNQVjRREroY7sYe8lOMRSxkhG6MZveU8YkpAk0=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
Expand Down