Skip to content

Commit

Permalink
Re-implement the option as a string
Browse files Browse the repository at this point in the history
  • Loading branch information
Zhonghui Hu committed Oct 9, 2020
1 parent e5b41c7 commit 87c4163
Show file tree
Hide file tree
Showing 4 changed files with 24 additions and 20 deletions.
6 changes: 2 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ If you think you’ve found a potential security issue, please do not post it in
* `experimental_concurrency_retries`: Specify a limit to the number of retries concurrent goroutines will attempt. By default `4` retries will be attempted before records are dropped.
* `aggregation`: Setting `aggregation` to `true` will enable KPL aggregation of records sent to Kinesis. This feature isn't compatible with the `partition_key` feature. See the KPL aggregation section below for more details.
* `compression`: Setting `compression` to `zlib` will enable zlib compression of each record. By default this feature is disabled and records are not compressed.
* `replace_dots`: If you set replace_dots as true, all dots in key names will be replaced with underscores.
* `replace_dots`: A string used to replace dots (`.`) in key names. By default this is empty, which means the feature is disabled.

### Permissions

Expand Down Expand Up @@ -58,7 +58,7 @@ This plugin has been tested with Fluent Bit 1.2.0+. It may not work with older F
stream my-kinesis-stream-name
partition_key container_id
append_newline true
replace_dots true
replace_dots _
```

### AWS for Fluent Bit
Expand Down Expand Up @@ -131,7 +131,6 @@ More specifically, increasing the flush value will ensure the most records are a
stream my-kinesis-stream-name
aggregation true
append_newline true
replace_dots true
```

### ZLIB Compression
Expand Down Expand Up @@ -167,5 +166,4 @@ Example config:
stream my-kinesis-stream-name
compression zlib
append_newline true
replace_dots true
```
9 changes: 4 additions & 5 deletions fluent-bit-kinesis.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ func newKinesisOutput(ctx unsafe.Pointer, pluginID int) (*kinesis.OutputPlugin,
compression := output.FLBPluginConfigKey(ctx, "compression")
logrus.Infof("[kinesis %d] plugin parameter compression = '%s'", pluginID, compression)
replaceDots := output.FLBPluginConfigKey(ctx, "replace_dots")
logrus.Infof("[kinesis %d] plugin parameter replace_dots = %s", pluginID, replaceDots)
logrus.Infof("[kinesis %d] plugin parameter replace_dots = '%s'", pluginID, replaceDots)

if stream == "" || region == "" {
return nil, fmt.Errorf("[kinesis %d] stream and region are required configuration parameters", pluginID)
Expand All @@ -107,9 +107,8 @@ func newKinesisOutput(ctx unsafe.Pointer, pluginID int) (*kinesis.OutputPlugin,
appendNL = true
}

replaceDotsInKeys := false
if strings.ToLower(replaceDots) == "true" {
replaceDotsInKeys = true
if replaceDots == "" {
logrus.Infof("[kinesis %d] No replacement provided. Do not need to replace dots with other symbols.", pluginID)
}

isAggregate := false
Expand Down Expand Up @@ -163,7 +162,7 @@ func newKinesisOutput(ctx unsafe.Pointer, pluginID int) (*kinesis.OutputPlugin,
return nil, fmt.Errorf("[kinesis %d] Invalid 'compression' value (%s) specified, must be 'zlib', 'none', or undefined", pluginID, compression)
}

return kinesis.NewOutputPlugin(region, stream, dataKeys, partitionKey, roleARN, kinesisEndpoint, stsEndpoint, timeKey, timeKeyFmt, logKey, concurrencyInt, concurrencyRetriesInt, isAggregate, appendNL, replaceDotsInKeys, comp, pluginID)
return kinesis.NewOutputPlugin(region, stream, dataKeys, partitionKey, roleARN, kinesisEndpoint, stsEndpoint, timeKey, timeKeyFmt, logKey, replaceDots, concurrencyInt, concurrencyRetriesInt, isAggregate, appendNL, comp, pluginID)
}

// The "export" comments have syntactic meaning
Expand Down
16 changes: 8 additions & 8 deletions kinesis/kinesis.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,12 +109,12 @@ type OutputPlugin struct {
aggregator *aggregate.Aggregator
aggregatePartitionKey string
compression CompressionType
// Decide whether dots in key names should be replaced with underscores
replaceDots bool
// If specified, dots in key names should be replaced with other symbols
replaceDots string
}

// NewOutputPlugin creates an OutputPlugin object
func NewOutputPlugin(region, stream, dataKeys, partitionKey, roleARN, kinesisEndpoint, stsEndpoint, timeKey, timeFmt, logKey string, concurrency, retryLimit int, isAggregate, appendNewline, replaceDots bool, compression CompressionType, pluginID int) (*OutputPlugin, error) {
func NewOutputPlugin(region, stream, dataKeys, partitionKey, roleARN, kinesisEndpoint, stsEndpoint, timeKey, timeFmt, logKey, replaceDots string, concurrency, retryLimit int, isAggregate, appendNewline bool, compression CompressionType, pluginID int) (*OutputPlugin, error) {
client, err := newPutRecordsClient(roleARN, region, kinesisEndpoint, stsEndpoint)
if err != nil {
return nil, err
Expand Down Expand Up @@ -379,17 +379,17 @@ func (outputPlugin *OutputPlugin) FlushConcurrent(count int, records []*kinesis.

}

func replaceDots(obj map[interface{}]interface{}) map[interface{}]interface{} {
func replaceDots(obj map[interface{}]interface{}, replacement string) map[interface{}]interface{} {
for k, v := range obj {
var curK = k
switch kt := k.(type) {
case string:
curK = strings.ReplaceAll(kt, ".", "_")
curK = strings.ReplaceAll(kt, ".", replacement)
}
delete(obj, k)
switch vt := v.(type) {
case map[interface{}]interface{}:
v = replaceDots(vt)
v = replaceDots(vt, replacement)
}

obj[curK] = v
Expand All @@ -410,8 +410,8 @@ func (outputPlugin *OutputPlugin) processRecord(record map[interface{}]interface
return nil, err
}

if outputPlugin.replaceDots {
record = replaceDots(record)
if outputPlugin.replaceDots != "" {
record = replaceDots(record, outputPlugin.replaceDots)
}

var json = jsoniter.ConfigCompatibleWithStandardLibrary
Expand Down
13 changes: 10 additions & 3 deletions kinesis/kinesis_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func newMockOutputPlugin(client *mock_kinesis.MockPutRecordsClient, isAggregate
concurrencyRetryLimit: concurrencyRetryLimit,
isAggregate: isAggregate,
aggregator: aggregator,
replaceDots: true,
replaceDots: "-",
}, nil
}

Expand Down Expand Up @@ -210,7 +210,11 @@ func TestZlibCompressionEmpty(t *testing.T) {
func TestDotReplace(t *testing.T) {
records := make([]*kinesis.PutRecordsRequestEntry, 0, 500)
record := map[interface{}]interface{}{
"testkey": []byte("test value"),
"message.key": map[interface{}]interface{}{
"messagevalue": []byte("some.message"),
"message.value/one": []byte("some message"),
"message.value/two": []byte("some message"),
},
"kubernetes": map[interface{}]interface{}{
"app": []byte("test app label"),
"app.kubernetes.io/name": []byte("test key with dots"),
Expand All @@ -230,5 +234,8 @@ func TestDotReplace(t *testing.T) {
json.Unmarshal(data, &log)

assert.Equal(t, "test app label", log["kubernetes"]["app"])
assert.Equal(t, "test key with dots", log["kubernetes"]["app_kubernetes_io/name"])
assert.Equal(t, "test key with dots", log["kubernetes"]["app-kubernetes-io/name"])
assert.Equal(t, "some.message", log["message-key"]["messagevalue"])
assert.Equal(t, "some message", log["message-key"]["message-value/one"])
assert.Equal(t, "some message", log["message-key"]["message-value/two"])
}

0 comments on commit 87c4163

Please sign in to comment.