Skip to content

Commit

Permalink
Replace Dots in Key names with other symbols
Browse files Browse the repository at this point in the history
  • Loading branch information
Zhonghui Hu committed Oct 13, 2020
1 parent eb900d9 commit 550c9bb
Show file tree
Hide file tree
Showing 4 changed files with 78 additions and 2 deletions.
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ Run `make` to build `./bin/firehose.so`. Then use with Fluent Bit:
* `sts_endpoint`: Specify a custom endpoint for the STS API; used to assume your custom role provided with `role_arn`.
* `time_key`: Add the timestamp to the record under this key. By default the timestamp from Fluent Bit will not be added to records sent to Kinesis.
* `time_key_format`: [strftime](http://man7.org/linux/man-pages/man3/strftime.3.html) compliant format string for the timestamp; for example, `%Y-%m-%dT%H:%M:%S%z`. This option is used with `time_key`. You can also use `%L` for milliseconds and `%f` for microseconds. If you are using ECS FireLens, make sure you are running Amazon ECS Container Agent v1.42.0 or later, otherwise the timestamps associated with your container logs will only have second precision.
* `replace_dots`: Replace dot characters in key names with the value of this option. For example, if you add replace_dots _ in your config then all occurrences of . will be replaced with an underscore. By default, dots will not be replaced.

### Permissions

Expand Down Expand Up @@ -59,6 +60,7 @@ This plugin has been tested with Fluent Bit 1.2.0+. It may not work with older F
Match *
region us-west-2
delivery_stream my-stream
replace_dots _
```

### AWS for Fluent Bit
Expand Down
28 changes: 27 additions & 1 deletion firehose/firehose.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"bytes"
"fmt"
"os"
"strings"
"time"

"github.com/aws/amazon-kinesis-firehose-for-fluent-bit/plugins"
Expand Down Expand Up @@ -64,10 +65,11 @@ type OutputPlugin struct {
dataLength int
timer *plugins.Timeout
PluginID int
replaceDots string
}

// NewOutputPlugin creates an OutputPlugin object
func NewOutputPlugin(region, deliveryStream, dataKeys, roleARN, firehoseEndpoint, stsEndpoint, timeKey, timeFmt, logKey string, pluginID int) (*OutputPlugin, error) {
func NewOutputPlugin(region, deliveryStream, dataKeys, roleARN, firehoseEndpoint, stsEndpoint, timeKey, timeFmt, logKey, replaceDots string, pluginID int) (*OutputPlugin, error) {
client, err := newPutRecordBatcher(roleARN, region, firehoseEndpoint, stsEndpoint, pluginID)
if err != nil {
return nil, err
Expand Down Expand Up @@ -108,6 +110,7 @@ func NewOutputPlugin(region, deliveryStream, dataKeys, roleARN, firehoseEndpoint
fmtStrftime: timeFormatter,
logKey: logKey,
PluginID: pluginID,
replaceDots: replaceDots,
}, nil
}

Expand Down Expand Up @@ -221,6 +224,25 @@ func (output *OutputPlugin) Flush() int {
return retCode
}

func replaceDots(obj map[interface{}]interface{}, replacement string) map[interface{}]interface{} {
for k, v := range obj {
var curK = k
switch kt := k.(type) {
case string:
curK = strings.ReplaceAll(kt, ".", replacement)
}
delete(obj, k)
switch vt := v.(type) {
case map[interface{}]interface{}:
v = replaceDots(vt, replacement)
}

obj[curK] = v
}

return obj
}

func (output *OutputPlugin) processRecord(record map[interface{}]interface{}) ([]byte, error) {
if output.dataKeys != "" {
record = plugins.DataKeys(output.dataKeys, record)
Expand All @@ -233,6 +255,10 @@ func (output *OutputPlugin) processRecord(record map[interface{}]interface{}) ([
return nil, err
}

if output.replaceDots != "" {
record = replaceDots(record, output.replaceDots)
}

var json = jsoniter.ConfigCompatibleWithStandardLibrary
var data []byte

Expand Down
46 changes: 46 additions & 0 deletions firehose/firehose_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package firehose

import (
"encoding/json"
"os"
"testing"
"time"
Expand Down Expand Up @@ -143,3 +144,48 @@ func TestSendCurrentBatch(t *testing.T) {
assert.Nil(t, err)

}

func TestDotReplace(t *testing.T) {
timer, _ := plugins.NewTimeout(func(d time.Duration) {
logrus.Errorf("[firehose] timeout threshold reached: Failed to send logs for %v\n", d)
logrus.Error("[firehose] Quitting Fluent Bit")
os.Exit(1)
})
output := OutputPlugin{
region: "us-east-1",
deliveryStream: "stream",
dataKeys: "",
client: nil,
records: make([]*firehose.Record, 0, 500),
timer: timer,
replaceDots: "-",
}

record := map[interface{}]interface{}{
"message.key": map[interface{}]interface{}{
"messagevalue": []byte("some.message"),
"message.value/one": []byte("some message"),
"message.value/two": []byte("some message"),
},
"kubernetes": map[interface{}]interface{}{
"app": []byte("test app label"),
"app.kubernetes.io/name": []byte("test key with dots"),
},
}

timeStamp := time.Now()
retCode := output.AddRecord(record, &timeStamp)
assert.Equal(t, retCode, fluentbit.FLB_OK, "Expected return code to be FLB_OK")
assert.Len(t, output.records, 1, "Expected output to contain 1 record")

data := output.records[0].Data

var log map[string]map[string]interface{}
json.Unmarshal(data, &log)

assert.Equal(t, "test app label", log["kubernetes"]["app"])
assert.Equal(t, "test key with dots", log["kubernetes"]["app-kubernetes-io/name"])
assert.Equal(t, "some.message", log["message-key"]["messagevalue"])
assert.Equal(t, "some message", log["message-key"]["message-value/one"])
assert.Equal(t, "some message", log["message-key"]["message-value/two"])
}
4 changes: 3 additions & 1 deletion fluent-bit-firehose.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,12 +76,14 @@ func newFirehoseOutput(ctx unsafe.Pointer, pluginID int) (*firehose.OutputPlugin
logrus.Infof("[firehose %d] plugin parameter time_key_format = '%s'", pluginID, timeKeyFmt)
logKey := output.FLBPluginConfigKey(ctx, "log_key")
logrus.Infof("[firehose %d] plugin parameter log_key = '%s'", pluginID, logKey)
replaceDots := output.FLBPluginConfigKey(ctx, "replace_dots")
logrus.Infof("[firehose %d] plugin parameter replace_dots = '%s'", pluginID, replaceDots)

if deliveryStream == "" || region == "" {
return nil, fmt.Errorf("[firehose %d] delivery_stream and region are required configuration parameters", pluginID)
}

return firehose.NewOutputPlugin(region, deliveryStream, dataKeys, roleARN, firehoseEndpoint, stsEndpoint, timeKey, timeKeyFmt, logKey, pluginID)
return firehose.NewOutputPlugin(region, deliveryStream, dataKeys, roleARN, firehoseEndpoint, stsEndpoint, timeKey, timeKeyFmt, logKey, replaceDots, pluginID)
}

//export FLBPluginInit
Expand Down

0 comments on commit 550c9bb

Please sign in to comment.