Skip to content

Commit

Permalink
Add time_key and time_key_format config options to add timestamp to r…
Browse files Browse the repository at this point in the history
…ecords
  • Loading branch information
PettitWesley authored and hossain-rayhan committed Mar 3, 2020
1 parent e2a2962 commit 2ee6017
Show file tree
Hide file tree
Showing 6 changed files with 69 additions and 7 deletions.
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ If you think you’ve found a potential security issue, please do not post it in
* `role_arn`: ARN of an IAM role to assume (for cross account access).
* `endpoint`: Specify a custom endpoint for the Kinesis Streams API.
* `append_newline`: If you set append_newline as true, a newline will be addded after each log record.
* `time_key`: Add the timestamp to the record under this key. By default the timestamp from Fluent Bit will not be added to records sent to Kinesis.
* `time_key_format`: [strftime](http://man7.org/linux/man-pages/man3/strftime.3.html) compliant format string for the timestamp; for example, `%Y-%m-%dT%H:%M:%S%z`. This option is used with `time_key`.

### Permissions

Expand Down
24 changes: 21 additions & 3 deletions fluent-bit-kinesis.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"C"
"fmt"
"strings"
"time"
"unsafe"

"github.com/aws/amazon-kinesis-firehose-for-fluent-bit/plugins"
Expand Down Expand Up @@ -61,6 +62,10 @@ func newKinesisOutput(ctx unsafe.Pointer, pluginID int) (*kinesis.OutputPlugin,
logrus.Infof("[kinesis %d] plugin parameter endpoint = '%s'", pluginID, endpoint)
appendNewline := output.FLBPluginConfigKey(ctx, "append_newline")
logrus.Infof("[kinesis %d] plugin parameter append_newline = %s", pluginID, appendNewline)
timeKey := output.FLBPluginConfigKey(ctx, "time_key")
logrus.Infof("[firehose %d] plugin parameter time_key = '%s'\n", pluginID, timeKey)
timeKeyFmt := output.FLBPluginConfigKey(ctx, "time_key_format")
logrus.Infof("[firehose %d] plugin parameter time_key_format = '%s'\n", pluginID, timeKeyFmt)

if stream == "" || region == "" {
return nil, fmt.Errorf("[kinesis %d] stream and region are required configuration parameters", pluginID)
Expand All @@ -78,7 +83,7 @@ func newKinesisOutput(ctx unsafe.Pointer, pluginID int) (*kinesis.OutputPlugin,
if strings.ToLower(appendNewline) == "true" {
appendNL = true
}
return kinesis.NewOutputPlugin(region, stream, dataKeys, partitionKey, roleARN, endpoint, appendNL, pluginID)
return kinesis.NewOutputPlugin(region, stream, dataKeys, partitionKey, roleARN, endpoint, timeKey, timeKeyFmt, appendNL, pluginID)
}

// The "export" comments have syntactic meaning
Expand All @@ -104,6 +109,8 @@ func FLBPluginInit(ctx unsafe.Pointer) int {
func FLBPluginFlushCtx(ctx, data unsafe.Pointer, length C.int, tag *C.char) int {
var count int
var ret int
var ts interface{}
var timestamp time.Time
var record map[interface{}]interface{}

// Create Fluent Bit decoder
Expand All @@ -115,12 +122,23 @@ func FLBPluginFlushCtx(ctx, data unsafe.Pointer, length C.int, tag *C.char) int

for {
//Extract Record
ret, _, record = output.GetRecord(dec)
ret, ts, record = output.GetRecord(dec)
if ret != 0 {
break
}

retCode := kinesisOutput.AddRecord(record)
switch tts := ts.(type) {
case output.FLBTime:
timestamp = tts.Time
case uint64:
// when ts is of type uint64 it appears to
// be the amount of seconds since unix epoch.
timestamp = time.Unix(int64(tts), 0)
default:
timestamp = time.Now()
}

retCode := kinesisOutput.AddRecord(record, &timestamp)
if retCode != output.FLB_OK {
return retCode
}
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ require (
github.com/fluent/fluent-bit-go v0.0.0-20190925192703-ea13c021720c
github.com/golang/mock v1.3.1
github.com/json-iterator/go v1.1.7
github.com/lestrrat-go/strftime v1.0.1
github.com/sirupsen/logrus v1.4.2
github.com/stretchr/testify v1.3.0
)
6 changes: 6 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,18 @@ github.com/json-iterator/go v1.1.7 h1:KfgG9LzI+pYjr4xvmz/5H4FXjokeP+rlHLhv3iH62F
github.com/json-iterator/go v1.1.7/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4=
github.com/konsorten/go-windows-terminal-sequences v1.0.1 h1:mweAR1A6xJ3oS2pRaGiHgQ4OO8tzTaLawm8vnODuwDk=
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/lestrrat-go/envload v0.0.0-20180220234015-a3eb8ddeffcc h1:RKf14vYWi2ttpEmkA4aQ3j4u9dStX2t4M8UM6qqNsG8=
github.com/lestrrat-go/envload v0.0.0-20180220234015-a3eb8ddeffcc/go.mod h1:kopuH9ugFRkIXf3YoqHKyrJ9YfUFsckUU9S7B+XP+is=
github.com/lestrrat-go/strftime v1.0.1 h1:o7qz5pmLzPDLyGW4lG6JvTKPUfTFXwe+vOamIYWtnVU=
github.com/lestrrat-go/strftime v1.0.1/go.mod h1:E1nN3pCbtMSu1yjSVeyuRFVm/U0xoR76fd03sz+Qz4g=
github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg=
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0=
github.com/modern-go/reflect2 v1.0.1 h1:9f412s+6RmYXLWZSEzVVgPGK7C2PphHj5RJrvfx9AWI=
github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0=
github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/sirupsen/logrus v1.4.2 h1:SPIRibHv4MatM3XXNO2BJeFLZwZ2LvZgfQ5+UNI2im4=
Expand Down
37 changes: 35 additions & 2 deletions kinesis/kinesis.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package kinesis

import (
"bytes"
"fmt"
"math/rand"
"os"
Expand All @@ -31,6 +32,7 @@ import (
"github.com/aws/aws-sdk-go/service/kinesis"
fluentbit "github.com/fluent/fluent-bit-go/output"
jsoniter "github.com/json-iterator/go"
"github.com/lestrrat-go/strftime"
"github.com/sirupsen/logrus"
)

Expand All @@ -47,6 +49,11 @@ const (
partitionKeyMaxLength = 256
)

const (
// We use strftime format specifiers because this will one day be re-written in C
defaultTimeFmt = "%Y-%m-%dT%H:%M:%S"
)

// PutRecordsClient contains the kinesis PutRecords method call
type PutRecordsClient interface {
PutRecords(input *kinesis.PutRecordsInput) (*kinesis.PutRecordsOutput, error)
Expand All @@ -69,6 +76,8 @@ type OutputPlugin struct {
partitionKey string
// Decides whether to append a newline after each data record
appendNewline bool
timeKey string
fmtStrftime *strftime.Strftime
lastInvalidPartitionKeyIndex int
client PutRecordsClient
records []*kinesis.PutRecordsRequestEntry
Expand All @@ -80,7 +89,7 @@ type OutputPlugin struct {
}

// NewOutputPlugin creates an OutputPlugin object
func NewOutputPlugin(region, stream, dataKeys, partitionKey, roleARN, endpoint string, appendNewline bool, pluginID int) (*OutputPlugin, error) {
func NewOutputPlugin(region, stream, dataKeys, partitionKey, roleARN, endpoint, timeKey, timeFmt string, appendNewline bool, pluginID int) (*OutputPlugin, error) {
client, err := newPutRecordsClient(roleARN, region, endpoint)
if err != nil {
return nil, err
Expand All @@ -103,13 +112,27 @@ func NewOutputPlugin(region, stream, dataKeys, partitionKey, roleARN, endpoint s
buffer: make([]byte, 8),
}

var timeFormatter *strftime.Strftime
if timeKey != "" {
if timeFmt == "" {
timeFmt = defaultTimeFmt
}
timeFormatter, err = strftime.New(timeFmt)
if err != nil {
logrus.Errorf("[kinesis %d] Issue with strftime format in 'time_key_format'", pluginID)
return nil, err
}
}

return &OutputPlugin{
stream: stream,
client: client,
records: records,
dataKeys: dataKeys,
partitionKey: partitionKey,
appendNewline: appendNewline,
timeKey: timeKey,
fmtStrftime: timeFormatter,
lastInvalidPartitionKeyIndex: -1,
backoff: plugins.NewBackoff(),
timer: timer,
Expand Down Expand Up @@ -154,7 +177,17 @@ func newPutRecordsClient(roleARN string, awsRegion string, endpoint string) (*ki
// AddRecord accepts a record and adds it to the buffer, flushing the buffer if it is full
// the return value is one of: FLB_OK FLB_RETRY
// API Errors lead to an FLB_RETRY, and data processing errors are logged, the record is discarded and FLB_OK is returned
func (outputPlugin *OutputPlugin) AddRecord(record map[interface{}]interface{}) int {
func (outputPlugin *OutputPlugin) AddRecord(record map[interface{}]interface{}, timeStamp *time.Time) int {
if outputPlugin.timeKey != "" {
buf := new(bytes.Buffer)
err := outputPlugin.fmtStrftime.Format(buf, *timeStamp)
if err != nil {
logrus.Errorf("[kinesis %d] Could not create timestamp %v\n", outputPlugin.PluginID, err)
return fluentbit.FLB_ERROR
}
record[outputPlugin.timeKey] = buf.String()
}

data, err := outputPlugin.processRecord(record)
if err != nil {
logrus.Errorf("[kinesis %d] %v\n", outputPlugin.PluginID, err)
Expand Down
6 changes: 4 additions & 2 deletions kinesis/kinesis_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,8 @@ func TestAddRecord(t *testing.T) {

outputPlugin, _ := newMockOutputPlugin(nil)

retCode := outputPlugin.AddRecord(record)
timeStamp := time.Now()
retCode := outputPlugin.AddRecord(record, &timeStamp)
assert.Equal(t, retCode, fluentbit.FLB_OK, "Expected return code to be FLB_OK")
assert.Len(t, outputPlugin.records, 1, "Expected output to contain 1 record")
}
Expand All @@ -93,7 +94,8 @@ func TestAddRecordAndFlush(t *testing.T) {

outputPlugin, _ := newMockOutputPlugin(mockKinesis)

retCode := outputPlugin.AddRecord(record)
timeStamp := time.Now()
retCode := outputPlugin.AddRecord(record, &timeStamp)
assert.Equal(t, retCode, fluentbit.FLB_OK, "Expected return code to be FLB_OK")

err := outputPlugin.Flush()
Expand Down

0 comments on commit 2ee6017

Please sign in to comment.