diff --git a/README.md b/README.md index 143f470..4a086a1 100644 --- a/README.md +++ b/README.md @@ -28,6 +28,7 @@ Run `make` to build `./bin/firehose.so`. Then use with Fluent Bit: * `sts_endpoint`: Specify a custom endpoint for the STS API; used to assume your custom role provided with `role_arn`. * `time_key`: Add the timestamp to the record under this key. By default the timestamp from Fluent Bit will not be added to records sent to Kinesis. * `time_key_format`: [strftime](http://man7.org/linux/man-pages/man3/strftime.3.html) compliant format string for the timestamp; for example, `%Y-%m-%dT%H:%M:%S%z`. This option is used with `time_key`. You can also use `%L` for milliseconds and `%f` for microseconds. If you are using ECS FireLens, make sure you are running Amazon ECS Container Agent v1.42.0 or later, otherwise the timestamps associated with your container logs will only have second precision. +* `replace_dots`: Replace dot characters in key names with the value of this option. For example, if you add `replace_dots _` in your config then all occurrences of `.` will be replaced with an underscore. By default, dots will not be replaced. ### Permissions @@ -59,6 +60,7 @@ This plugin has been tested with Fluent Bit 1.2.0+. It may not work with older F Match * region us-west-2 delivery_stream my-stream + replace_dots _ ``` ### AWS for Fluent Bit diff --git a/firehose/firehose.go b/firehose/firehose.go index 0946d56..21aada5 100644 --- a/firehose/firehose.go +++ b/firehose/firehose.go @@ -18,6 +18,7 @@ import ( "bytes" "fmt" "os" + "strings" "time" "github.com/aws/amazon-kinesis-firehose-for-fluent-bit/plugins" @@ -64,10 +65,11 @@ type OutputPlugin struct { dataLength int timer *plugins.Timeout PluginID int + replaceDots string } // NewOutputPlugin creates an OutputPlugin object -func NewOutputPlugin(region, deliveryStream, dataKeys, roleARN, firehoseEndpoint, stsEndpoint, timeKey, timeFmt, logKey string, pluginID int) (*OutputPlugin, error) { +func NewOutputPlugin(region, deliveryStream, dataKeys, roleARN, firehoseEndpoint, stsEndpoint, timeKey, timeFmt, logKey, replaceDots string, pluginID int) (*OutputPlugin, error) { client, err := newPutRecordBatcher(roleARN, region, firehoseEndpoint, stsEndpoint, pluginID) if err != nil { return nil, err @@ -108,6 +110,7 @@ func NewOutputPlugin(region, deliveryStream, dataKeys, roleARN, firehoseEndpoint fmtStrftime: timeFormatter, logKey: logKey, PluginID: pluginID, + replaceDots: replaceDots, }, nil } @@ -221,6 +224,25 @@ func (output *OutputPlugin) Flush() int { return retCode } +func replaceDots(obj map[interface{}]interface{}, replacement string) map[interface{}]interface{} { + for k, v := range obj { + var curK = k + switch kt := k.(type) { + case string: + curK = strings.ReplaceAll(kt, ".", replacement) + } + delete(obj, k) + switch vt := v.(type) { + case map[interface{}]interface{}: + v = replaceDots(vt, replacement) + } + + obj[curK] = v + } + + return obj +} + func (output *OutputPlugin) processRecord(record map[interface{}]interface{}) ([]byte, error) { if output.dataKeys != "" { record = plugins.DataKeys(output.dataKeys, record) @@ -233,6 +255,10 @@ func (output *OutputPlugin) processRecord(record map[interface{}]interface{}) ([ return nil, err } + if output.replaceDots != "" { + record = replaceDots(record, output.replaceDots) + } + var json = jsoniter.ConfigCompatibleWithStandardLibrary var data []byte diff --git a/firehose/firehose_test.go b/firehose/firehose_test.go index 38cd22e..befc9d5 100644 --- a/firehose/firehose_test.go +++ b/firehose/firehose_test.go @@ -14,6 +14,7 @@ package firehose import ( + "encoding/json" "os" "testing" "time" @@ -143,3 +144,48 @@ func TestSendCurrentBatch(t *testing.T) { assert.Nil(t, err) } + +func TestDotReplace(t *testing.T) { + timer, _ := plugins.NewTimeout(func(d time.Duration) { + logrus.Errorf("[firehose] timeout threshold reached: Failed to send logs for %v\n", d) + logrus.Error("[firehose] Quitting Fluent Bit") + os.Exit(1) + }) + output := OutputPlugin{ + region: "us-east-1", + deliveryStream: "stream", + dataKeys: "", + client: nil, + records: make([]*firehose.Record, 0, 500), + timer: timer, + replaceDots: "-", + } + + record := map[interface{}]interface{}{ + "message.key": map[interface{}]interface{}{ + "messagevalue": []byte("some.message"), + "message.value/one": []byte("some message"), + "message.value/two": []byte("some message"), + }, + "kubernetes": map[interface{}]interface{}{ + "app": []byte("test app label"), + "app.kubernetes.io/name": []byte("test key with dots"), + }, + } + + timeStamp := time.Now() + retCode := output.AddRecord(record, &timeStamp) + assert.Equal(t, retCode, fluentbit.FLB_OK, "Expected return code to be FLB_OK") + assert.Len(t, output.records, 1, "Expected output to contain 1 record") + + data := output.records[0].Data + + var log map[string]map[string]interface{} + json.Unmarshal(data, &log) + + assert.Equal(t, "test app label", log["kubernetes"]["app"]) + assert.Equal(t, "test key with dots", log["kubernetes"]["app-kubernetes-io/name"]) + assert.Equal(t, "some.message", log["message-key"]["messagevalue"]) + assert.Equal(t, "some message", log["message-key"]["message-value/one"]) + assert.Equal(t, "some message", log["message-key"]["message-value/two"]) +} diff --git a/fluent-bit-firehose.go b/fluent-bit-firehose.go index a7f3a46..b13b3f8 100644 --- a/fluent-bit-firehose.go +++ b/fluent-bit-firehose.go @@ -76,12 +76,14 @@ func newFirehoseOutput(ctx unsafe.Pointer, pluginID int) (*firehose.OutputPlugin logrus.Infof("[firehose %d] plugin parameter time_key_format = '%s'", pluginID, timeKeyFmt) logKey := output.FLBPluginConfigKey(ctx, "log_key") logrus.Infof("[firehose %d] plugin parameter log_key = '%s'", pluginID, logKey) + replaceDots := output.FLBPluginConfigKey(ctx, "replace_dots") + logrus.Infof("[firehose %d] plugin parameter replace_dots = '%s'", pluginID, replaceDots) if deliveryStream == "" || region == "" { return nil, fmt.Errorf("[firehose %d] delivery_stream and region are required configuration parameters", pluginID) } - return firehose.NewOutputPlugin(region, deliveryStream, dataKeys, roleARN, firehoseEndpoint, stsEndpoint, timeKey, timeKeyFmt, logKey, pluginID) + return firehose.NewOutputPlugin(region, deliveryStream, dataKeys, roleARN, firehoseEndpoint, stsEndpoint, timeKey, timeKeyFmt, logKey, replaceDots, pluginID) } //export FLBPluginInit