From 08d4b601ca0a123ee3accd7701599eaf1f3793ae Mon Sep 17 00:00:00 2001 From: Zhonghui Hu <“hzhonghu@amazon.com”> Date: Mon, 12 Oct 2020 14:25:43 -0700 Subject: [PATCH] Replace Dots in Key names with other symbols --- README.md | 2 ++ firehose/firehose.go | 28 +++++++++++++++++++++++- firehose/firehose_test.go | 46 +++++++++++++++++++++++++++++++++++++++ fluent-bit-firehose.go | 8 ++++++- 4 files changed, 82 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index 143f470..8f22b3c 100644 --- a/README.md +++ b/README.md @@ -28,6 +28,7 @@ Run `make` to build `./bin/firehose.so`. Then use with Fluent Bit: * `sts_endpoint`: Specify a custom endpoint for the STS API; used to assume your custom role provided with `role_arn`. * `time_key`: Add the timestamp to the record under this key. By default the timestamp from Fluent Bit will not be added to records sent to Kinesis. * `time_key_format`: [strftime](http://man7.org/linux/man-pages/man3/strftime.3.html) compliant format string for the timestamp; for example, `%Y-%m-%dT%H:%M:%S%z`. This option is used with `time_key`. You can also use `%L` for milliseconds and `%f` for microseconds. If you are using ECS FireLens, make sure you are running Amazon ECS Container Agent v1.42.0 or later, otherwise the timestamps associated with your container logs will only have second precision. +* `replace_dots`: The string that replaces every dot (`.`) in record key names, including the keys of nested maps. By default it is empty, which disables the feature. ### Permissions @@ -59,6 +60,7 @@ This plugin has been tested with Fluent Bit 1.2.0+. 
// replaceDots rewrites, in place, every string key of obj that contains a dot
// so that each "." is replaced by replacement, and returns obj.
//
// Values of type map[interface{}]interface{} are processed recursively. Keys
// that are not strings, and values of any other type (slices included), are
// left untouched.
//
// The map is never grown or shrunk while it is being ranged over: the Go spec
// leaves it unspecified whether an entry inserted during iteration is visited,
// so renaming inside the loop could process a freshly renamed key a second
// time. That is harmless only when replacement contains no dot. Renames are
// therefore collected first and applied afterwards, which guarantees that each
// original key is rewritten exactly once.
//
// If two keys map to the same new name, one of them overwrites the other; a
// renamed key always wins over a pre-existing key that had no dot.
func replaceDots(obj map[interface{}]interface{}, replacement string) map[interface{}]interface{} {
	type renamedEntry struct {
		oldKey interface{}
		newKey string
		value  interface{}
	}
	var renamed []renamedEntry

	for k, v := range obj {
		// Nested maps are rewritten in place, so v keeps pointing at the
		// already-processed map.
		if nested, ok := v.(map[interface{}]interface{}); ok {
			replaceDots(nested, replacement)
		}

		name, ok := k.(string)
		if !ok || !strings.Contains(name, ".") {
			continue
		}
		renamed = append(renamed, renamedEntry{
			oldKey: k,
			newKey: strings.ReplaceAll(name, ".", replacement),
			value:  v,
		})
	}

	// Remove every old key before inserting any new one, so that a new name
	// equal to another entry's old name is not deleted again by a later step.
	for _, e := range renamed {
		delete(obj, e.oldKey)
	}
	for _, e := range renamed {
		obj[e.newKey] = e.value
	}

	return obj
}
map[interface{}]interface{}) ([ return nil, err } + if output.replaceDots != "" { + record = replaceDots(record, output.replaceDots) + } + var json = jsoniter.ConfigCompatibleWithStandardLibrary var data []byte diff --git a/firehose/firehose_test.go b/firehose/firehose_test.go index 38cd22e..befc9d5 100644 --- a/firehose/firehose_test.go +++ b/firehose/firehose_test.go @@ -14,6 +14,7 @@ package firehose import ( + "encoding/json" "os" "testing" "time" @@ -143,3 +144,48 @@ func TestSendCurrentBatch(t *testing.T) { assert.Nil(t, err) } + +func TestDotReplace(t *testing.T) { + timer, _ := plugins.NewTimeout(func(d time.Duration) { + logrus.Errorf("[firehose] timeout threshold reached: Failed to send logs for %v\n", d) + logrus.Error("[firehose] Quitting Fluent Bit") + os.Exit(1) + }) + output := OutputPlugin{ + region: "us-east-1", + deliveryStream: "stream", + dataKeys: "", + client: nil, + records: make([]*firehose.Record, 0, 500), + timer: timer, + replaceDots: "-", + } + + record := map[interface{}]interface{}{ + "message.key": map[interface{}]interface{}{ + "messagevalue": []byte("some.message"), + "message.value/one": []byte("some message"), + "message.value/two": []byte("some message"), + }, + "kubernetes": map[interface{}]interface{}{ + "app": []byte("test app label"), + "app.kubernetes.io/name": []byte("test key with dots"), + }, + } + + timeStamp := time.Now() + retCode := output.AddRecord(record, &timeStamp) + assert.Equal(t, retCode, fluentbit.FLB_OK, "Expected return code to be FLB_OK") + assert.Len(t, output.records, 1, "Expected output to contain 1 record") + + data := output.records[0].Data + + var log map[string]map[string]interface{} + json.Unmarshal(data, &log) + + assert.Equal(t, "test app label", log["kubernetes"]["app"]) + assert.Equal(t, "test key with dots", log["kubernetes"]["app-kubernetes-io/name"]) + assert.Equal(t, "some.message", log["message-key"]["messagevalue"]) + assert.Equal(t, "some message", log["message-key"]["message-value/one"]) 
+ assert.Equal(t, "some message", log["message-key"]["message-value/two"]) +} diff --git a/fluent-bit-firehose.go b/fluent-bit-firehose.go index a7f3a46..5e02316 100644 --- a/fluent-bit-firehose.go +++ b/fluent-bit-firehose.go @@ -76,12 +76,18 @@ func newFirehoseOutput(ctx unsafe.Pointer, pluginID int) (*firehose.OutputPlugin logrus.Infof("[firehose %d] plugin parameter time_key_format = '%s'", pluginID, timeKeyFmt) logKey := output.FLBPluginConfigKey(ctx, "log_key") logrus.Infof("[firehose %d] plugin parameter log_key = '%s'", pluginID, logKey) + replaceDots := output.FLBPluginConfigKey(ctx, "replace_dots") + logrus.Infof("[firehose %d] plugin parameter replace_dots = '%s'", pluginID, replaceDots) if deliveryStream == "" || region == "" { return nil, fmt.Errorf("[firehose %d] delivery_stream and region are required configuration parameters", pluginID) } - return firehose.NewOutputPlugin(region, deliveryStream, dataKeys, roleARN, firehoseEndpoint, stsEndpoint, timeKey, timeKeyFmt, logKey, pluginID) + if replaceDots == "" { + logrus.Infof("[firehose %d] No replacement provided. Do not need to replace dots with other symbols.", pluginID) + } + + return firehose.NewOutputPlugin(region, deliveryStream, dataKeys, roleARN, firehoseEndpoint, stsEndpoint, timeKey, timeKeyFmt, logKey, replaceDots, pluginID) } //export FLBPluginInit