From 8b9e649a45d82e15c7d1001f5bac2d1c527d130d Mon Sep 17 00:00:00 2001 From: "Rayhan Hossain (Mukla.C)" Date: Wed, 16 Oct 2019 17:20:38 -0700 Subject: [PATCH] Implementation of the amazon kinesis data streams fluent bit plugin (#1) * .gitignore file added * Implementation of the fluent-bit kinesis streams output plugin * unit tests added for the fluent-bit kinesis streams output plugin * config value added for appending newline after each log * Added table driven test, Makefile updated, and addressed some PR feedback * README file updated --- .gitignore | 15 ++ Makefile | 44 +++++ README.md | 52 +++++- fluent-bit-kinesis.go | 156 ++++++++++++++++ go.mod | 13 ++ go.sum | 47 +++++ kinesis/generate_mock.go | 16 ++ kinesis/kinesis.go | 332 +++++++++++++++++++++++++++++++++++ kinesis/kinesis_test.go | 101 +++++++++++ kinesis/mock_kinesis/mock.go | 63 +++++++ scripts/mockgen.sh | 44 +++++ 11 files changed, 878 insertions(+), 5 deletions(-) create mode 100644 .gitignore create mode 100644 Makefile create mode 100644 fluent-bit-kinesis.go create mode 100644 go.mod create mode 100644 go.sum create mode 100644 kinesis/generate_mock.go create mode 100644 kinesis/kinesis.go create mode 100644 kinesis/kinesis_test.go create mode 100644 kinesis/mock_kinesis/mock.go create mode 100755 scripts/mockgen.sh diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..8f41087 --- /dev/null +++ b/.gitignore @@ -0,0 +1,15 @@ +# Binaries for programs and plugins +*.exe +*.exe~ +*.dll +*.so +*.dylib + +# Test binary, built with `go test -c` +*.test + +# build output dir +bin + +# Output of the go coverage tool, specifically when used with LiteIDE +*.out diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..d570bce --- /dev/null +++ b/Makefile @@ -0,0 +1,44 @@ +# Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"). You +# may not use this file except in compliance with the License. A copy of +# the License is located at +# +# http://aws.amazon.com/apache2.0/ +# +# or in the "license" file accompanying this file. This file is +# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF +# ANY KIND, either express or implied. See the License for the specific +# language governing permissions and limitations under the License. + +ROOT := $(shell pwd) + +all: build + +SCRIPT_PATH := $(ROOT)/scripts/:${PATH} +SOURCES := $(shell find . -name '*.go') +PLUGIN_BINARY := ./bin/kinesis.so + +.PHONY: release +release: + mkdir -p ./bin + go build -buildmode c-shared -o ./bin/kinesis.so ./ + @echo "Built Amazon Kinesis Data Streams Fluent Bit Plugin" + +.PHONY: build +build: $(PLUGIN_BINARY) release + +$(PLUGIN_BINARY): $(SOURCES) + PATH=${PATH} golint ./kinesis + +.PHONY: generate +generate: $(SOURCES) + PATH=$(SCRIPT_PATH) go generate ./... + +.PHONY: test +test: + go test -timeout=120s -v -cover ./... + +.PHONY: clean +clean: + rm -rf ./bin/* diff --git a/README.md b/README.md index d0471f3..2794561 100644 --- a/README.md +++ b/README.md @@ -1,9 +1,51 @@ -## Fluent Bit Plugin for Kinesis Streams +## Fluent Bit Plugin for Amazon Kinesis Data Streams -A Fluent Bit output plugin for Kinesis Streams +A Fluent Bit output plugin for Amazon Kinesis Data Streams. -# Under Development +#### Security disclosures -## License +If you think you’ve found a potential security issue, please do not post it in the Issues. 
Instead, please follow the instructions [here](https://aws.amazon.com/security/vulnerability-reporting/) or email AWS security directly at [aws-security@amazon.com](mailto:aws-security@amazon.com). -This library is licensed under the Apache 2.0 License. +### Plugin Options + +* `region`: The AWS region that your Kinesis Data Stream is in. +* `stream`: The name of the Kinesis Data Stream that you want log records sent to. +* `partition_key`: A partition key is used to group data by shard within a stream. A Kinesis Data Stream uses the partition key that is associated with each data record to determine which shard a given data record belongs to. For example, if your logs come from Docker containers, you can use container_id as the partition key, and the logs will be grouped and stored on different shards depending upon the id of the container they were generated from. As the data within a shard are coarsely ordered, you will get all your logs from one container in one shard roughly in order. If you don't set a partition key or set an invalid one, a random key will be generated, and the logs will be directed to random shards. If the partition key is invalid, the plugin will print a warning message. +* `data_keys`: By default, the whole log record will be sent to Kinesis. If you specify key name(s) with this option, then only those keys and values will be sent to Kinesis. For example, if you are using the Fluentd Docker log driver, you can specify `data_keys log` and only the log message will be sent to Kinesis. If you specify multiple keys, they should be comma delimited. +* `role_arn`: ARN of an IAM role to assume (for cross-account access). +* `endpoint`: Specify a custom endpoint for the Kinesis Streams API. +* `append_newline`: If you set append_newline to true, a newline will be added after each log record. + +### Permissions + +The plugin requires `kinesis:PutRecords` permissions. + +### Credentials + +This plugin uses the AWS SDK for Go, and uses its [default credential provider chain](https://docs.aws.amazon.com/sdk-for-go/v1/developer-guide/configuring-sdk.html). If you are using the plugin on Amazon EC2 or Amazon ECS, the plugin will use your EC2 instance role or ECS Task role permissions. The plugin can also retrieve credentials from a [shared credentials file](https://docs.aws.amazon.com/cli/latest/userguide/cli-configure-files.html), or from the standard `AWS_ACCESS_KEY_ID`, `AWS_SECRET_ACCESS_KEY`, and `AWS_SESSION_TOKEN` environment variables. + +### Environment Variables + +* `FLB_LOG_LEVEL`: Set the log level for the plugin. Valid values are: `debug`, `info`, and `error` (case insensitive). Default is `info`. **Note**: Setting the log level in the Fluent Bit configuration file using the Service key will not affect the plugin log level (because the plugin is external). +* `SEND_FAILURE_TIMEOUT`: Allows you to configure a timeout if the plugin cannot send logs to Kinesis Streams. The timeout is specified as a [Golang duration](https://golang.org/pkg/time/#ParseDuration), for example: `5m30s`. If the plugin has failed to make any progress for the given period of time, then it will exit and kill Fluent Bit. This is useful in scenarios where you want your logging solution to fail fast if it has been misconfigured (e.g. the network or credentials have not been set up to allow it to send to Kinesis Streams). + +### Fluent Bit Versions + +This plugin has been tested with Fluent Bit 1.2.0+. It may not work with older Fluent Bit versions.
We recommend using the latest version of Fluent Bit as it will contain the newest features and bug fixes. + +### Example Fluent Bit Config File + +``` +[INPUT] + Name forward + Listen 0.0.0.0 + Port 24224 + +[OUTPUT] + Name kinesis + Match * + region us-west-2 + stream my-kinesis-stream-name + partition_key container_id + append_newline true +``` diff --git a/fluent-bit-kinesis.go b/fluent-bit-kinesis.go new file mode 100644 index 0000000..ce2e7ad --- /dev/null +++ b/fluent-bit-kinesis.go @@ -0,0 +1,156 @@ +// Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"). You may +// not use this file except in compliance with the License. A copy of the +// License is located at +// +// http://aws.amazon.com/apache2.0/ +// +// or in the "license" file accompanying this file. This file is distributed +// on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either +// express or implied. See the License for the specific language governing +// permissions and limitations under the License. + +package main + +import ( + "fmt" + "C" + "unsafe" + "strings" + + "github.com/aws/amazon-kinesis-streams-for-fluent-bit/kinesis" + "github.com/aws/amazon-kinesis-firehose-for-fluent-bit/plugins" + "github.com/fluent/fluent-bit-go/output" + "github.com/sirupsen/logrus" +) + +var ( + pluginInstances []*kinesis.OutputPlugin +) + +func addPluginInstance(ctx unsafe.Pointer) error { + pluginID := len(pluginInstances) + output.FLBPluginSetContext(ctx, pluginID) + instance, err := newKinesisOutput(ctx, pluginID) + if err != nil { + return err + } + + pluginInstances = append(pluginInstances, instance) + return nil +} + +func getPluginInstance(ctx unsafe.Pointer) *kinesis.OutputPlugin { + pluginID := output.FLBPluginGetContext(ctx).(int) + return pluginInstances[pluginID] +} + + +func newKinesisOutput(ctx unsafe.Pointer, pluginID int) (*kinesis.OutputPlugin, error) { + stream := output.FLBPluginConfigKey(ctx, "stream") + logrus.Infof("[kinesis %d] plugin parameter stream = '%s'", pluginID, stream) + region := output.FLBPluginConfigKey(ctx, "region") + logrus.Infof("[kinesis %d] plugin parameter region = '%s'", pluginID, region) + dataKeys := output.FLBPluginConfigKey(ctx, "data_keys") + logrus.Infof("[kinesis %d] plugin parameter data_keys = '%s'", pluginID, dataKeys) + partitionKey := output.FLBPluginConfigKey(ctx, "partition_key") + logrus.Infof("[kinesis %d] plugin parameter partition_key = '%s'", pluginID, partitionKey) + roleARN := output.FLBPluginConfigKey(ctx, "role_arn") + logrus.Infof("[kinesis %d] plugin parameter role_arn = '%s'", pluginID, roleARN) + endpoint := output.FLBPluginConfigKey(ctx, "endpoint") + logrus.Infof("[kinesis %d] plugin parameter endpoint = '%s'", pluginID, endpoint) + appendNewline := output.FLBPluginConfigKey(ctx, "append_newline") + logrus.Infof("[kinesis %d] plugin parameter append_newline = %s", pluginID, appendNewline) + + if stream == "" || region == "" { + return nil, fmt.Errorf("[kinesis %d] stream and region are required configuration parameters", pluginID) + } + + if partitionKey == "log" { + return nil, fmt.Errorf("[kinesis %d] 'log' cannot be set as the partition key", pluginID) + } + + if partitionKey == "" { + logrus.Infof("[kinesis %d] no partition key provided. 
A random one will be generated.", pluginID) + } + + appendNL := false + if strings.ToLower(appendNewline) == "true" { + appendNL = true + } + return kinesis.NewOutputPlugin(region, stream, dataKeys, partitionKey, roleARN, endpoint, appendNL, pluginID) +} + + +// The "export" comments have syntactic meaning +// This is how the compiler knows a function should be callable from the C code + +//export FLBPluginRegister +func FLBPluginRegister(ctx unsafe.Pointer) int { + return output.FLBPluginRegister(ctx, "kinesis", "Amazon Kinesis Data Streams Fluent Bit Plugin.") +} + +//export FLBPluginInit +func FLBPluginInit(ctx unsafe.Pointer) int { + plugins.SetupLogger() + logrus.Debugf("[kinesis] Debug log level test successful") + err := addPluginInstance(ctx) + if err != nil { + logrus.Errorf("[kinesis] Failed to initialize plugin: %v\n", err) + return output.FLB_ERROR + } + return output.FLB_OK +} + +//export FLBPluginFlushCtx +func FLBPluginFlushCtx(ctx, data unsafe.Pointer, length C.int, tag *C.char) int { + var count int + var ret int + var record map[interface{}]interface{} + + // Create Fluent Bit decoder + dec := output.NewDecoder(data, int(length)) + + kinesisOutput := getPluginInstance(ctx) + fluentTag := C.GoString(tag) + logrus.Debugf("[kinesis %d] Found logs with tag: %s\n", kinesisOutput.PluginID, fluentTag) + + for { + //Extract Record + ret, _, record = output.GetRecord(dec) + if ret != 0 { + break + } + + retCode := kinesisOutput.AddRecord(record) + if retCode != output.FLB_OK { + return retCode + } + count++ + } + err := kinesisOutput.Flush() + if err != nil { + logrus.Errorf("[kinesis %d] %v\n", kinesisOutput.PluginID, err) + return output.FLB_ERROR + } + logrus.Debugf("[kinesis %d] Processed %d events with tag %s\n", kinesisOutput.PluginID, count, fluentTag) + + return output.FLB_OK +} + +//export FLBPluginExit +func FLBPluginExit() int { + // Before final exit, call Flush() for all the instances of the Output Plugin + for i := range pluginInstances { + err := pluginInstances[i].Flush() + if err != nil { + logrus.Errorf("[kinesis %d] %v\n", pluginInstances[i].PluginID, err) + } + } + + return output.FLB_OK +} + +func main() { +} diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..284babd --- /dev/null +++ b/go.mod @@ -0,0 +1,13 @@ +module github.com/aws/amazon-kinesis-streams-for-fluent-bit + +go 1.12 + +require ( + github.com/aws/amazon-kinesis-firehose-for-fluent-bit v0.0.0-20190911230620-0883cb76f511 + github.com/aws/aws-sdk-go v1.25.1 + github.com/fluent/fluent-bit-go v0.0.0-20190925192703-ea13c021720c + github.com/golang/mock v1.3.1 + github.com/json-iterator/go v1.1.7 + github.com/sirupsen/logrus v1.4.2 + github.com/stretchr/testify v1.3.0 +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..57a1401 --- /dev/null +++ b/go.sum @@ -0,0 +1,47 @@ +github.com/aws/amazon-kinesis-firehose-for-fluent-bit v0.0.0-20190911230620-0883cb76f511 h1:h4ta9iM29D9zSiLCmr7ybUY5qcv2QgP+GVjipJ1Kx/4= +github.com/aws/amazon-kinesis-firehose-for-fluent-bit v0.0.0-20190911230620-0883cb76f511/go.mod h1:jZjLd+hsaK0oNVotfqbMK9gUsIhXvYEl7Z3tdZTYa54= +github.com/aws/aws-sdk-go v1.20.6/go.mod h1:KmX6BPdI08NWTb3/sm4ZGu5ShLoqVDhKgpiN924inxo= +github.com/aws/aws-sdk-go v1.25.1 h1:d7zDXFT2Tgq/yw7Wku49+lKisE8Xc85erb+8PlE/Shk= +github.com/aws/aws-sdk-go v1.25.1/go.mod h1:KmX6BPdI08NWTb3/sm4ZGu5ShLoqVDhKgpiN924inxo= +github.com/cenkalti/backoff v2.1.1+incompatible h1:tKJnvO2kl0zmb/jA5UKAt4VoEVw1qxKWjE/Bpp46npY= +github.com/cenkalti/backoff v2.1.1+incompatible/go.mod 
h1:90ReRw6GdpyfrHakVjL/QHaoyV4aDUVVkXQJJJ3NXXM= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/fluent/fluent-bit-go v0.0.0-20190614024040-c017a8579953/go.mod h1:WQX+afhrekY9rGK+WT4xvKSlzmia9gDoLYu4GGYGASQ= +github.com/fluent/fluent-bit-go v0.0.0-20190925192703-ea13c021720c h1:QwbffUs/+ptC4kTFPEN9Ej2latTq3bZJ5HO/OwPXYMs= +github.com/fluent/fluent-bit-go v0.0.0-20190925192703-ea13c021720c/go.mod h1:WQX+afhrekY9rGK+WT4xvKSlzmia9gDoLYu4GGYGASQ= +github.com/golang/mock v1.3.1 h1:qGJ6qTW+x6xX/my+8YUVl4WNpX9B7+/l2tRsHGZ7f2s= +github.com/golang/mock v1.3.1/go.mod h1:sBzyDLLjw3U8JLTeZvSv8jJB+tU5PVekmnlKIyFUx0Y= +github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= +github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af h1:pmfjZENx5imkbgOkpRUYLnmbU7UEFbjtDA2hxJ1ichM= +github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af/go.mod h1:Nht3zPeWKUH0NzdCt2Blrr5ys8VGpn0CEB0cQHVjt7k= +github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU= +github.com/json-iterator/go v1.1.7 h1:KfgG9LzI+pYjr4xvmz/5H4FXjokeP+rlHLhv3iH62Fo= +github.com/json-iterator/go v1.1.7/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4= +github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= +github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= +github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg= +github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= +github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0= +github.com/modern-go/reflect2 v1.0.1 h1:9f412s+6RmYXLWZSEzVVgPGK7C2PphHj5RJrvfx9AWI= +github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/sirupsen/logrus v1.4.2 h1:SPIRibHv4MatM3XXNO2BJeFLZwZ2LvZgfQ5+UNI2im4= +github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= +github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/ugorji/go v1.1.4 h1:j4s+tAvLfL3bZyefP2SEWmhBzmuIlH/eqNuPdFPgngw= +github.com/ugorji/go v1.1.4/go.mod h1:uQMGLiO92mf5W77hV/PUCpI3pbzQx3CRekS0kk+RGrc= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod 
h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190422165155-953cdadca894 h1:Cz4ceDQGXuKRnVBDTS23GTn/pU5OE2C0WrNTOYK1Uuc= +golang.org/x/sys v0.0.0-20190422165155-953cdadca894/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/tools v0.0.0-20190425150028-36563e24a262/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q= diff --git a/kinesis/generate_mock.go b/kinesis/generate_mock.go new file mode 100644 index 0000000..2aa8db0 --- /dev/null +++ b/kinesis/generate_mock.go @@ -0,0 +1,16 @@ +// Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"). You may +// not use this file except in compliance with the License. A copy of the +// License is located at +// +// http://aws.amazon.com/apache2.0/ +// +// or in the "license" file accompanying this file. This file is distributed +// on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either +// express or implied. See the License for the specific language governing +// permissions and limitations under the License. + +package kinesis + +//go:generate mockgen.sh github.com/aws/amazon-kinesis-streams-for-fluent-bit/kinesis PutRecordsClient mock_kinesis/mock.go diff --git a/kinesis/kinesis.go b/kinesis/kinesis.go new file mode 100644 index 0000000..073221c --- /dev/null +++ b/kinesis/kinesis.go @@ -0,0 +1,332 @@ +// Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"). You may +// not use this file except in compliance with the License. A copy of the +// License is located at +// +// http://aws.amazon.com/apache2.0/ +// +// or in the "license" file accompanying this file. This file is distributed +// on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either +// express or implied. See the License for the specific language governing +// permissions and limitations under the License. 
+ +// Package kinesis contains the OutputPlugin which sends log records to Kinesis Stream +package kinesis + +import ( + "fmt" + "os" + "time" + "math/rand" + + "github.com/aws/amazon-kinesis-firehose-for-fluent-bit/plugins" + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/awserr" + "github.com/aws/aws-sdk-go/aws/credentials/stscreds" + "github.com/aws/aws-sdk-go/aws/endpoints" + "github.com/aws/aws-sdk-go/aws/session" + "github.com/aws/aws-sdk-go/service/kinesis" + fluentbit "github.com/fluent/fluent-bit-go/output" + jsoniter "github.com/json-iterator/go" + "github.com/sirupsen/logrus" +) + +const ( + partitionKeyCharset = "abcdefghijklmnopqrstuvwxyz" + "ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789" +) + +const ( + // Kinesis API Limit https://docs.aws.amazon.com/sdk-for-go/api/service/kinesis/#Kinesis.PutRecords + maximumRecordsPerPut = 500 + maximumPutRecordBatchSize = 1024 * 1024 * 5 // 5 MB + maximumRecordSize = 1024 * 1024 // 1 MB + + partitionKeyMaxLength = 256 +) + +// PutRecordsClient contains the kinesis PutRecords method call +type PutRecordsClient interface{ + PutRecords(input *kinesis.PutRecordsInput) (*kinesis.PutRecordsOutput, error) +} + +type random struct { + seededRandom *rand.Rand + buffer []byte +} + +// OutputPlugin sends log records to kinesis +type OutputPlugin struct { + // The name of the stream that you want log records sent to + stream string + // If specified, only these keys and values will be send as the log record + dataKeys string + // If specified, the value of that data key will be used as the partition key. + // Otherwise a random string will be used. + // Partition key decides in which shard of your stream the data belongs to + partitionKey string + // Decides whether to append a newline after each data record + appendNewline bool + lastInvalidPartitionKeyIndex int + client PutRecordsClient + records []*kinesis.PutRecordsRequestEntry + dataLength int + backoff *plugins.Backoff + timer *plugins.Timeout + PluginID int + random *random +} + +// NewOutputPlugin creates an OutputPlugin object +func NewOutputPlugin(region, stream, dataKeys, partitionKey, roleARN, endpoint string, appendNewline bool, pluginID int) (*OutputPlugin, error) { + client, err := newPutRecordsClient(roleARN, region, endpoint) + if err != nil { + return nil, err + } + + records := make([]*kinesis.PutRecordsRequestEntry, 0, maximumRecordsPerPut) + timer, err := plugins.NewTimeout(func (d time.Duration) { + logrus.Errorf("[kinesis %d] timeout threshold reached: Failed to send logs for %s\n", pluginID, d.String()) + logrus.Errorf("[kinesis %d] Quitting Fluent Bit", pluginID) + os.Exit(1) + }) + + if err != nil { + return nil, err + } + + seededRand := rand.New(rand.NewSource(time.Now().UnixNano())) + random := &random{ + seededRandom: seededRand, + buffer: make([]byte, 8), + } + + return &OutputPlugin{ + stream: stream, + client: client, + records: records, + dataKeys: dataKeys, + partitionKey: partitionKey, + appendNewline: appendNewline, + lastInvalidPartitionKeyIndex: -1, + backoff: plugins.NewBackoff(), + timer: timer, + PluginID: pluginID, + random: random, + }, nil +} + +// newPutRecordsClient creates the Kinesis client for calling the PutRecords method +func newPutRecordsClient(roleARN string, awsRegion string, endpoint string) (*kinesis.Kinesis, error) { + sess, err := session.NewSession(&aws.Config{ + Region: aws.String(awsRegion), + }) + if err != nil { + return nil, err + } + + svcConfig := &aws.Config{} + if endpoint != "" { + defaultResolver := 
endpoints.DefaultResolver() + cwCustomResolverFn := func(service, region string, optFns ...func(*endpoints.Options)) (endpoints.ResolvedEndpoint, error) { + if service == "kinesis" { + return endpoints.ResolvedEndpoint{ + URL: endpoint, + }, nil + } + return defaultResolver.EndpointFor(service, region, optFns...) + } + svcConfig.EndpointResolver = endpoints.ResolverFunc(cwCustomResolverFn) + } + + if roleARN != "" { + creds := stscreds.NewCredentials(sess, roleARN) + svcConfig.Credentials = creds + } + + client := kinesis.New(sess, svcConfig) + client.Handlers.Build.PushBackNamed(plugins.CustomUserAgentHandler()) + return client, nil +} + +// AddRecord accepts a record and adds it to the buffer, flushing the buffer if it is full +// the return value is one of: FLB_OK FLB_RETRY +// API Errors lead to an FLB_RETRY, and data processing errors are logged, the record is discarded and FLB_OK is returned +func (outputPlugin *OutputPlugin) AddRecord(record map[interface{}]interface{}) int { + data, err := outputPlugin.processRecord(record) + if err != nil { + logrus.Errorf("[kinesis %d] %v\n", outputPlugin.PluginID, err) + // discard this single bad record instead and let the batch continue + return fluentbit.FLB_OK + } + + newDataSize := len(data) + + if len(outputPlugin.records) == maximumRecordsPerPut || (outputPlugin.dataLength+newDataSize) > maximumPutRecordBatchSize { + err = outputPlugin.sendCurrentBatch() + if err != nil { + logrus.Errorf("[kinesis %d] %v\n", outputPlugin.PluginID, err) + // If FluentBit fails to send logs, it will retry rather than discarding the logs + return fluentbit.FLB_RETRY + } + } + + partitionKey := outputPlugin.getPartitionKey(record) + outputPlugin.records = append(outputPlugin.records, &kinesis.PutRecordsRequestEntry{ + Data: data, + PartitionKey: aws.String(partitionKey), + }) + outputPlugin.dataLength += newDataSize + return fluentbit.FLB_OK +} + +// Flush sends the current buffer of log records +func (outputPlugin *OutputPlugin) Flush() error { + return outputPlugin.sendCurrentBatch() +} + +func (outputPlugin *OutputPlugin) processRecord(record map[interface{}]interface{}) ([]byte, error) { + if outputPlugin.dataKeys != "" { + record = plugins.DataKeys(outputPlugin.dataKeys, record) + } + + var err error + record, err = plugins.DecodeMap(record) + if err != nil { + logrus.Debugf("[kinesis %d] Failed to decode record: %v\n", outputPlugin.PluginID, record) + return nil, err + } + + var json = jsoniter.ConfigCompatibleWithStandardLibrary + data, err := json.Marshal(record) + if err != nil { + logrus.Debugf("[kinesis %d] Failed to marshal record: %v\n", outputPlugin.PluginID, record) + return nil, err + } + + // append a newline after each log record + if outputPlugin.appendNewline { + data = append(data, []byte("\n")...) + } + + if len(data) > maximumRecordSize { + return nil, fmt.Errorf("Log record greater than max size allowed by Kinesis") + } + + return data, nil +} + +func (outputPlugin *OutputPlugin) sendCurrentBatch() error { + if outputPlugin.lastInvalidPartitionKeyIndex >= 0 { + logrus.Errorf("[kinesis %d] Invalid partition key. 
Failed to find partition_key %s in log record %s", outputPlugin.PluginID, outputPlugin.partitionKey, outputPlugin.records[outputPlugin.lastInvalidPartitionKeyIndex].Data) + outputPlugin.lastInvalidPartitionKeyIndex = -1 + } + outputPlugin.backoff.Wait() + outputPlugin.timer.Check() + + response, err := outputPlugin.client.PutRecords(&kinesis.PutRecordsInput{ + Records: outputPlugin.records, + StreamName: aws.String(outputPlugin.stream), + }) + if err != nil { + logrus.Errorf("[kinesis %d] PutRecords failed with %v\n", outputPlugin.PluginID, err) + outputPlugin.timer.Start() + if aerr, ok := err.(awserr.Error); ok { + if aerr.Code() == kinesis.ErrCodeProvisionedThroughputExceededException { + logrus.Warnf("[kinesis %d] Throughput limits for the stream may have been exceeded.", outputPlugin.PluginID) + // Backoff and Retry + outputPlugin.backoff.StartBackoff() + } + } + return err + } + logrus.Debugf("[kinesis %d] Sent %d events to Kinesis\n", outputPlugin.PluginID, len(outputPlugin.records)) + + return outputPlugin.processAPIResponse(response) +} + +// processAPIResponse processes the successful and failed records +// it returns an error iff no records succeeded (i.e.) no progress has been made +func (outputPlugin *OutputPlugin) processAPIResponse(response *kinesis.PutRecordsOutput) error { + if aws.Int64Value(response.FailedRecordCount) > 0 { + // start timer if all records failed (no progress has been made) + if aws.Int64Value(response.FailedRecordCount) == int64(len(outputPlugin.records)) { + outputPlugin.timer.Start() + return fmt.Errorf("PutRecords request returned with no records successfully recieved") + } + + logrus.Errorf("[kinesis %d] %d records failed to be delivered\n", outputPlugin.PluginID, aws.Int64Value(response.FailedRecordCount)) + failedRecords := make([]*kinesis.PutRecordsRequestEntry, 0, aws.Int64Value(response.FailedRecordCount)) + // try to resend failed records + for i, record := range response.Records { + if record.ErrorMessage != nil { + logrus.Debugf("[kinesis %d] Record failed to send with error: %s\n", outputPlugin.PluginID, aws.StringValue(record.ErrorMessage)) + failedRecords = append(failedRecords, outputPlugin.records[i]) + } + if aws.StringValue(record.ErrorCode) == kinesis.ErrCodeProvisionedThroughputExceededException { + // Backoff and Retry + outputPlugin.backoff.StartBackoff() + } + } + + outputPlugin.records = outputPlugin.records[:0] + outputPlugin.records = append(outputPlugin.records, failedRecords...) 
+ outputPlugin.dataLength = 0 + for _, record := range outputPlugin.records { + outputPlugin.dataLength += len(record.Data) + } + } else { + //request fully succeeded + for i, record := range response.Records { + logrus.Debugf("[kinesis %d] record- %d:Shard ID: %s", outputPlugin.PluginID, i, aws.StringValue(record.ShardId)) + } + outputPlugin.timer.Reset() + outputPlugin.backoff.Reset() + outputPlugin.records = outputPlugin.records[:0] + outputPlugin.dataLength = 0 + } + return nil +} + +//randomString generates a random string of length 8 +//it uses the math/rand library +func (outputPlugin *OutputPlugin) randomString() string { + for i := range outputPlugin.random.buffer { + outputPlugin.random.buffer[i] = partitionKeyCharset[outputPlugin.random.seededRandom.Intn(len(partitionKeyCharset))] + } + return string(outputPlugin.random.buffer) +} + +//getPartitionKey returns the value for a given valid key +//if the given key is emapty or invalid, it returns a random string +func (outputPlugin *OutputPlugin) getPartitionKey(record map[interface{}]interface{}) string { + partitionKey := outputPlugin.partitionKey + if partitionKey != "" { + for k, v := range record { + dataKey := stringOrByteArray(k) + if(dataKey == partitionKey) { + value := stringOrByteArray(v) + if(value != "") { + if len(value) > partitionKeyMaxLength { + value = value[0:partitionKeyMaxLength] + } + return value + } + } + } + outputPlugin.lastInvalidPartitionKeyIndex = len(outputPlugin.records) + } + return outputPlugin.randomString() +} + +//stringOrByteArray returns the string value if the input is a string or byte array otherwise an empty string +func stringOrByteArray(v interface{}) string { + switch t := v.(type) { + case []byte: + return string(t) + case string: + return t + default: + return "" + } +} diff --git a/kinesis/kinesis_test.go b/kinesis/kinesis_test.go new file mode 100644 index 0000000..f3198f6 --- /dev/null +++ b/kinesis/kinesis_test.go @@ -0,0 +1,101 @@ +package kinesis + +import ( + "os" + "time" + "math/rand" + "testing" + + "github.com/aws/amazon-kinesis-firehose-for-fluent-bit/plugins" + "github.com/aws/aws-sdk-go/service/kinesis" + "github.com/stretchr/testify/assert" + "github.com/sirupsen/logrus" + "github.com/golang/mock/gomock" + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/amazon-kinesis-streams-for-fluent-bit/kinesis/mock_kinesis" + fluentbit "github.com/fluent/fluent-bit-go/output" +) + +// newMockOutputPlugin creates an mock OutputPlugin object +func newMockOutputPlugin(client *mock_kinesis.MockPutRecordsClient) (*OutputPlugin, error) { + records := make([]*kinesis.PutRecordsRequestEntry, 0, 500) + + timer, _ := plugins.NewTimeout(func (d time.Duration) { + logrus.Errorf("[kinesis] timeout threshold reached: Failed to send logs for %v", d) + logrus.Errorf("[kinesis] Quitting Fluent Bit") + os.Exit(1) + }) + + seededRand := rand.New(rand.NewSource(time.Now().UnixNano())) + b := make([]byte, 8) + random := &random{ + seededRandom: seededRand, + buffer: b, + } + + return &OutputPlugin{ + stream: "stream", + client: client, + records: records, + dataKeys: "", + partitionKey: "", + lastInvalidPartitionKeyIndex: -1, + backoff: plugins.NewBackoff(), + timer: timer, + PluginID: 0, + random: random, + }, nil +} + +// Test cases for TestStringOrByteArray +var testCases = []struct { + input interface{} + output string +}{ + {"testString", "testString"}, + {35344, ""}, + {[]byte{'b', 'y', 't', 'e'}, "byte"}, + {nil, ""}, +} + +func TestStringOrByteArray(t *testing.T) { + for _, testCase := range 
testCases { + result := stringOrByteArray(testCase.input) + if result != testCase.output { + t.Errorf("[Test Failed] Expeced: %s, Returned: %s", testCase.output, result) + } + } +} + +func TestAddRecord(t *testing.T) { + record := map[interface{}]interface{} { + "testkey": []byte("test value"), + } + + outputPlugin, _ := newMockOutputPlugin(nil) + + retCode := outputPlugin.AddRecord(record) + assert.Equal(t, retCode, fluentbit.FLB_OK, "Expected return code to be FLB_OK") + assert.Len(t, outputPlugin.records, 1, "Expected output to contain 1 record") +} + +func TestAddRecordAndFlush(t *testing.T) { + record := map[interface{}]interface{} { + "testkey": []byte("test value"), + } + + ctrl := gomock.NewController(t) + mockKinesis := mock_kinesis.NewMockPutRecordsClient(ctrl) + + mockKinesis.EXPECT().PutRecords(gomock.Any()).Return(&kinesis.PutRecordsOutput{ + FailedRecordCount: aws.Int64(0), + }, nil) + + outputPlugin, _ := newMockOutputPlugin(mockKinesis) + + retCode := outputPlugin.AddRecord(record) + assert.Equal(t, retCode, fluentbit.FLB_OK, "Expected return code to be FLB_OK") + + err := outputPlugin.Flush() + assert.NoError(t, err, "Unexpected error calling flush") +} diff --git a/kinesis/mock_kinesis/mock.go b/kinesis/mock_kinesis/mock.go new file mode 100644 index 0000000..5642fdc --- /dev/null +++ b/kinesis/mock_kinesis/mock.go @@ -0,0 +1,63 @@ +// Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"). You may +// not use this file except in compliance with the License. A copy of the +// License is located at +// +// http://aws.amazon.com/apache2.0/ +// +// or in the "license" file accompanying this file. This file is distributed +// on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either +// express or implied. See the License for the specific language governing +// permissions and limitations under the License. + +// Code generated by MockGen. DO NOT EDIT. +// Source: github.com/aws/amazon-kinesis-streams-for-fluent-bit/kinesis (interfaces: PutRecordsClient) + +// Package mock_kinesis is a generated GoMock package. 
+package mock_kinesis + +import ( + reflect "reflect" + + kinesis "github.com/aws/aws-sdk-go/service/kinesis" + gomock "github.com/golang/mock/gomock" +) + +// MockPutRecordsClient is a mock of PutRecordsClient interface +type MockPutRecordsClient struct { + ctrl *gomock.Controller + recorder *MockPutRecordsClientMockRecorder +} + +// MockPutRecordsClientMockRecorder is the mock recorder for MockPutRecordsClient +type MockPutRecordsClientMockRecorder struct { + mock *MockPutRecordsClient +} + +// NewMockPutRecordsClient creates a new mock instance +func NewMockPutRecordsClient(ctrl *gomock.Controller) *MockPutRecordsClient { + mock := &MockPutRecordsClient{ctrl: ctrl} + mock.recorder = &MockPutRecordsClientMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use +func (m *MockPutRecordsClient) EXPECT() *MockPutRecordsClientMockRecorder { + return m.recorder +} + +// PutRecords mocks base method +func (m *MockPutRecordsClient) PutRecords(arg0 *kinesis.PutRecordsInput) (*kinesis.PutRecordsOutput, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "PutRecords", arg0) + ret0, _ := ret[0].(*kinesis.PutRecordsOutput) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// PutRecords indicates an expected call of PutRecords +func (mr *MockPutRecordsClientMockRecorder) PutRecords(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PutRecords", reflect.TypeOf((*MockPutRecordsClient)(nil).PutRecords), arg0) +} diff --git a/scripts/mockgen.sh b/scripts/mockgen.sh new file mode 100755 index 0000000..1ed7e1d --- /dev/null +++ b/scripts/mockgen.sh @@ -0,0 +1,44 @@ +#!/bin/bash +# Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the +# "License"). You may not use this file except in compliance +# with the License. A copy of the License is located at +# +# http://aws.amazon.com/apache2.0/ +# +# or in the "license" file accompanying this file. This file is +# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR +# CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and +# limitations under the License. +# +# This script wraps the mockgen tool and inserts licensing information. + +set -e +package=${1?Must provide package} +interfaces=${2?Must provide interface names} +outputfile=${3?Must provide an output file} + +export PATH="${GOPATH//://bin:}/bin:$PATH" + +data=$( +cat << EOF +// Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"). You may +// not use this file except in compliance with the License. A copy of the +// License is located at +// +// http://aws.amazon.com/apache2.0/ +// +// or in the "license" file accompanying this file. This file is distributed +// on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either +// express or implied. See the License for the specific language governing +// permissions and limitations under the License. + +$(mockgen "$package" "$interfaces") +EOF +) + +echo "$data" | goimports > "${outputfile}"
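
For reference, a sketch of how the mock regeneration pieces above fit together: the `//go:generate` directive in `kinesis/generate_mock.go` invokes `scripts/mockgen.sh`, and the Makefile's `generate` target puts `scripts/` on the `PATH` before running `go generate ./...`. The install commands and the assumption that this is run from the repository root are illustrative, not part of this patch; `mockgen` and `goimports` simply need to be on the `PATH`.

```
# Illustrative only: install the tools the wrapper script expects (assumed versions/paths)
go get github.com/golang/mock/mockgen
go get golang.org/x/tools/cmd/goimports

# Regenerate all mocks via the Makefile target
make generate

# Roughly equivalent direct invocation of the wrapper script from the repository root
./scripts/mockgen.sh github.com/aws/amazon-kinesis-streams-for-fluent-bit/kinesis PutRecordsClient kinesis/mock_kinesis/mock.go
```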
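
And a minimal sketch of building and loading the plugin, assuming Fluent Bit's standard `-e` option for external (Go) plugins and a hypothetical `fluent-bit.conf` containing the example configuration from the README:

```
# Build the shared object; per the Makefile above this produces ./bin/kinesis.so
make release

# Load the plugin into Fluent Bit alongside a configuration file (paths are illustrative)
fluent-bit -e ./bin/kinesis.so -c fluent-bit.conf
```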