
Implementation of the Amazon Kinesis Data Streams Fluent Bit plugin #1

Merged: 12 commits, Oct 17, 2019
16 changes: 6 additions & 10 deletions Makefile
@@ -19,26 +19,22 @@ SCRIPT_PATH := $(ROOT)/scripts/:${PATH}
SOURCES := $(shell find . -name '*.go')
PLUGIN_BINARY := ./bin/kinesis.so

.PHONY: build
build: $(PLUGIN_BINARY)

$(PLUGIN_BINARY): $(SOURCES)
PATH=${PATH} golint ./kinesis
mkdir -p ./bin
go build -buildmode c-shared -o ./bin/kinesis.so ./
@echo "Built Amazon Kinesis Data Streams Fluent Bit Plugin"

.PHONY: release
release:
mkdir -p ./bin
go build -buildmode c-shared -o ./bin/kinesis.so ./
@echo "Built Amazon Kinesis Data Streams Fluent Bit Plugin"

.PHONY: build
build: $(PLUGIN_BINARY) release

$(PLUGIN_BINARY): $(SOURCES)
PATH=${PATH} golint ./kinesis

.PHONY: generate
generate: $(SOURCES)
PATH=$(SCRIPT_PATH) go generate ./...


.PHONY: test
test:
go test -timeout=120s -v -cover ./...
50 changes: 45 additions & 5 deletions README.md
@@ -1,9 +1,49 @@
## Fluent Bit Plugin for Kinesis Streams
## Fluent Bit Plugin for Amazon Kinesis Data Streams

A Fluent Bit output plugin for Kinesis Streams
A Fluent Bit output plugin for Amazon Kinesis Data Streams.

# Under Development
#### Security disclosures

## License
If you think you’ve found a potential security issue, please do not post it in the Issues. Instead, please follow the instructions [here](https://aws.amazon.com/security/vulnerability-reporting/) or email AWS security directly at [[email protected]](mailto:[email protected]).

This library is licensed under the Apache 2.0 License.
### Plugin Options

* `region`: The AWS region that your Kinesis stream is in.
PettitWesley (Contributor, Oct 12, 2019):
Why is it "stream(s)"... you can't have more than one in a single configuration.

* `stream`: The name of the stream that you want log records sent to.
* `partition_key`: A partition key is used to group data by shard within a stream. A Kinesis Data Stream uses the partition key that is associated with each data record to determine which shard a given data record belongs to. For example, if your logs come from Docker containers, you can use container_id as the partition key, and the logs will be grouped and stored on different shards depending upon the id of the container they were generated from. As the data within a shard is coarsely ordered, you will get all your logs from one container in one shard roughly in order. If you don't set a partition key or put an invalid one, a random key will be generated, and the logs will be directed to random shards. If the partition key is invalid, the plugin will print a warning message.
Contributor:
"In case of invalid partition key" => "If the partition key is invalid"

* `data_keys`: By default, the whole log record will be sent to Kinesis. If you specify key name(s) with this option, then only those keys and values will be sent to Kinesis. For example, if you are using the Fluentd Docker log driver, you can specify `data_keys log` and only the log message will be sent to Kinesis. If you specify multiple keys, they should be comma delimited.
Contributor:
"If you specify a key name(s)" => "If you specify key name(s)"

* `role_arn`: ARN of an IAM role to assume (for cross account access).
* `endpoint`: Specify a custom endpoint for the Kinesis Streams API.
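To make the `data_keys` behavior concrete, here is a minimal sketch of how comma-delimited key filtering could work. The helper name and record shape are illustrative assumptions, not the plugin's actual implementation:

```go
package main

import (
	"encoding/json"
	"fmt"
	"strings"
)

// filterDataKeys keeps only the comma-delimited keys named in dataKeys.
// An empty dataKeys means "send the whole record", mirroring the
// documented default behavior. (Illustrative helper, not plugin code.)
func filterDataKeys(record map[string]interface{}, dataKeys string) map[string]interface{} {
	if dataKeys == "" {
		return record
	}
	out := make(map[string]interface{})
	for _, k := range strings.Split(dataKeys, ",") {
		k = strings.TrimSpace(k)
		if v, ok := record[k]; ok {
			out[k] = v
		}
	}
	return out
}

func main() {
	record := map[string]interface{}{
		"log":          "hello world",
		"container_id": "abc123",
	}
	// With `data_keys log`, only the "log" key and value survive.
	b, _ := json.Marshal(filterDataKeys(record, "log"))
	fmt.Println(string(b)) // {"log":"hello world"}
}
```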

### Permissions

The plugin requires `kinesis:PutRecords` permissions.

### Credentials

This plugin uses the AWS SDK for Go, and uses its [default credential provider chain](https://docs.aws.amazon.com/sdk-for-go/v1/developer-guide/configuring-sdk.html). If you are using the plugin on Amazon EC2 or Amazon ECS, the plugin will use your EC2 instance role or ECS Task role permissions. The plugin can also retrieve credentials from a [shared credentials file](https://docs.aws.amazon.com/cli/latest/userguide/cli-configure-files.html), or from the standard `AWS_ACCESS_KEY_ID`, `AWS_SECRET_ACCESS_KEY`, and `AWS_SESSION_TOKEN` environment variables.
Contributor:
"uses the AWS SDK Go" -> "uses the Go AWS SDK"

Contributor (author):
How about "AWS SDK for Go"?

Contributor:
If we are using the default credential provider chain, maybe we should just link to that and not explain it again here.


### Environment Variables

* `FLB_LOG_LEVEL`: Set the log level for the plugin. Valid values are: `debug`, `info`, and `error` (case insensitive). Default is `info`. **Note**: Setting log level in the Fluent Bit Configuration file using the Service key will not affect the plugin log level (because the plugin is external).
* `SEND_FAILURE_TIMEOUT`: Allows you to configure a timeout if the plugin cannot send logs to Kinesis Streams. The timeout is specified as a [Golang duration](https://golang.org/pkg/time/#ParseDuration), for example: `5m30s`. If the plugin has failed to make any progress for the given period of time, then it will exit and kill Fluent Bit. This is useful in scenarios where you want your logging solution to fail fast if it has been misconfigured (e.g. the network or credentials have not been set up to allow it to send to Kinesis Streams).

### Fluent Bit Versions

This plugin has been tested with Fluent Bit 1.2.0+. It may not work with older Fluent Bit versions. We recommend using the latest version of Fluent Bit as it will contain the newest features and bug fixes.

### Example Fluent Bit Config File

```
[INPUT]
Name forward
Listen 0.0.0.0
Port 24224

[OUTPUT]
Name kinesis
Match *
region us-west-2
stream my-kinesis-stream-name
partition_key container_id
```
Contributor:
Also add the Docker Hub and ECR links, same as in the Firehose readme: https://github.com/aws/amazon-kinesis-firehose-for-fluent-bit#docker-hub

Contributor (author):
We will update it once our images are available on Docker Hub and ECR.

25 changes: 19 additions & 6 deletions fluent-bit-kinesis.go
@@ -14,17 +14,16 @@
package main

import (
"fmt"
"C"
"unsafe"
"strings"

"github.com/aws/amazon-kinesis-streams-for-fluent-bit/kinesis"
"github.com/aws/amazon-kinesis-firehose-for-fluent-bit/plugins"
"github.com/fluent/fluent-bit-go/output"
"github.com/sirupsen/logrus"
)
import (
"fmt"
)

var (
pluginInstances []*kinesis.OutputPlugin
@@ -56,11 +55,13 @@ func newKinesisOutput(ctx unsafe.Pointer, pluginID int) (*kinesis.OutputPlugin,
dataKeys := output.FLBPluginConfigKey(ctx, "data_keys")
logrus.Infof("[kinesis %d] plugin parameter data_keys = '%s'", pluginID, dataKeys)
partitionKey := output.FLBPluginConfigKey(ctx, "partition_key")
logrus.Infof("[kinesis %d] plugin parameter partition_key = %s", pluginID, partitionKey)
logrus.Infof("[kinesis %d] plugin parameter partition_key = '%s'", pluginID, partitionKey)
roleARN := output.FLBPluginConfigKey(ctx, "role_arn")
logrus.Infof("[kinesis %d] plugin parameter role_arn = '%s'", pluginID, roleARN)
endpoint := output.FLBPluginConfigKey(ctx, "endpoint")
logrus.Infof("[kinesis %d] plugin parameter endpoint = '%s'", pluginID, endpoint)
appendNewline := output.FLBPluginConfigKey(ctx, "append_newline")
logrus.Infof("[kinesis %d] plugin parameter append_newline = %s", pluginID, appendNewline)

if stream == "" || region == "" {
return nil, fmt.Errorf("[kinesis %d] stream and region are required configuration parameters", pluginID)
@@ -74,7 +75,11 @@
logrus.Infof("[kinesis %d] no partition key provided. A random one will be generated.", pluginID)
}

return kinesis.NewOutputPlugin(region, stream, dataKeys, partitionKey, roleARN, endpoint, pluginID)
appendNL := false
if strings.ToLower(appendNewline) == "true" {
appendNL = true
}
return kinesis.NewOutputPlugin(region, stream, dataKeys, partitionKey, roleARN, endpoint, appendNL, pluginID)
}


@@ -89,7 +94,7 @@ func FLBPluginRegister(ctx unsafe.Pointer) int {
//export FLBPluginInit
func FLBPluginInit(ctx unsafe.Pointer) int {
plugins.SetupLogger()

logrus.Debugf("[kinesis] Debug log level test successful")
err := addPluginInstance(ctx)
if err != nil {
logrus.Errorf("[kinesis] Failed to initialize plugin: %v\n", err)
@@ -136,6 +141,14 @@ func FLBPluginFlushCtx(ctx, data unsafe.Pointer, length C.int, tag *C.char) int

//export FLBPluginExit
func FLBPluginExit() int {
Contributor:
Will Fluent Bit guarantee that all the plugin instances have a chance to flush? Asking because I'm not sure whether this method should be as simple as just returning OK, or whether it should implement some form of graceful shutdown.

hossain-rayhan (Contributor, author, Oct 10, 2019):
Now, before the final exit, calling Flush() for all the instances as the final try. Also, printing the error message if it fails for any plugin instance.

Contributor:
@hossain-rayhan Do you want to create issues on the other Fluent Bit plugin repos to request the same change? All of them should have this.

Contributor (author):
@PettitWesley Good thought. I will create issues for the other fluent-bit plugins.

// Before final exit, call Flush() for all the instances of the Output Plugin
for i := range pluginInstances {
err := pluginInstances[i].Flush()
if err != nil {
logrus.Errorf("[kinesis %d] %v\n", pluginInstances[i].PluginID, err)
}
}

return output.FLB_OK
Contributor:
Should probably return FLB_ERROR if any of the flushes failed, so that Fluent Bit can record the failure.

Contributor (author):
I am not sure, but earlier we decided only to print the error messages in case of failure, and finally return FLB_OK from FLBPluginExit() as the default behaviour.

Contributor:
👍 We should clearly define the behavior and be explicit in the doc.
}
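The thread above debates whether `FLBPluginExit` should surface flush failures. A hedged sketch of the reviewer's suggested alternative, which is not what this PR merged (the merged code always returns `FLB_OK`); the types and constants below are stand-ins for the plugin's and fluent-bit-go's:

```go
package main

import "fmt"

// Stand-ins for fluent-bit-go's output.FLB_OK / output.FLB_ERROR.
const (
	flbOK    = 1
	flbError = 2
)

// instance is a stand-in for *kinesis.OutputPlugin.
type instance struct {
	id        int
	failFlush bool
}

func (p *instance) Flush() error {
	if p.failFlush {
		return fmt.Errorf("send failed")
	}
	return nil
}

// exitCode flushes every instance and reports FLB_ERROR if any flush
// failed, so Fluent Bit can record the failure -- the reviewer's
// proposal, not the merged behavior.
func exitCode(instances []*instance) int {
	ret := flbOK
	for _, p := range instances {
		if err := p.Flush(); err != nil {
			fmt.Printf("[kinesis %d] %v\n", p.id, err)
			ret = flbError
		}
	}
	return ret
}

func main() {
	fmt.Println(exitCode([]*instance{{id: 0}, {id: 1, failFlush: true}})) // prints 2
}
```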

53 changes: 32 additions & 21 deletions kinesis/kinesis.go
@@ -11,7 +11,7 @@
// express or implied. See the License for the specific language governing
// permissions and limitations under the License.

// Package kinesis containers the OutputPlugin which sends log records to Kinesis Stream
// Package kinesis contains the OutputPlugin which sends log records to Kinesis Stream
package kinesis

import (
@@ -39,8 +39,8 @@ const (
const (
// Kinesis API Limit https://docs.aws.amazon.com/sdk-for-go/api/service/kinesis/#Kinesis.PutRecords
maximumRecordsPerPut = 500
maximumPutRecordBatchSize = 5242880 // 5 MB
maximumRecordSize = 1048576// 1 MB
maximumPutRecordBatchSize = 1024 * 1024 * 5 // 5 MB
maximumRecordSize = 1024 * 1024 // 1 MB
)

// PutRecordsClient contains the kinesis PutRecords method call
@@ -55,10 +55,16 @@ type random struct {

// OutputPlugin sends log records to kinesis
type OutputPlugin struct {
region string
// The name of the stream that you want log records sent to
stream string
// If specified, only these keys and values will be send as the log record
dataKeys string
// If specified, the value of that data key will be used as the partition key.
// Otherwise a random string will be used.
// The partition key decides which shard of your stream a given record belongs to
partitionKey string
// Decides whether to append a newline after each data record
appendNewline bool
lastInvalidPartitionKeyIndex int
Contributor:
I'm confused on what this field actually does. Seems like we just use it to log info messages?

Contributor (author):
To provide the error message with a proper description: for which exact data record the partition key was invalid. It will help the customer to debug faster.

client PutRecordsClient
records []*kinesis.PutRecordsRequestEntry
@@ -70,16 +76,12 @@
}

// NewOutputPlugin creates an OutputPlugin object
func NewOutputPlugin(region, stream, dataKeys, partitionKey, roleARN, endpoint string, pluginID int) (*OutputPlugin, error) {
sess, err := session.NewSession(&aws.Config{
Region: aws.String(region),
})
func NewOutputPlugin(region, stream, dataKeys, partitionKey, roleARN, endpoint string, appendNewline bool, pluginID int) (*OutputPlugin, error) {
client, err := newPutRecordsClient(roleARN, region, endpoint)
if err != nil {
return nil, err
}

client := newPutRecordsClient(roleARN, sess, endpoint)

records := make([]*kinesis.PutRecordsRequestEntry, 0, maximumRecordsPerPut)
timer, err := plugins.NewTimeout(func (d time.Duration) {
logrus.Errorf("[kinesis %d] timeout threshold reached: Failed to send logs for %s\n", pluginID, d.String())
@@ -90,20 +92,20 @@ func NewOutputPlugin(region, stream, dataKeys, partitionKey, roleARN, endpoint s
if err != nil {
return nil, err
}

seededRand := rand.New(rand.NewSource(time.Now().UnixNano()))
b := make([]byte, 8)
random := &random{
seededRandom: seededRand,
buffer: b,
buffer: make([]byte, 8),
}

return &OutputPlugin{
region: region,
stream: stream,
client: client,
records: records,
dataKeys: dataKeys,
partitionKey: partitionKey,
appendNewline: appendNewline,
lastInvalidPartitionKeyIndex: -1,
backoff: plugins.NewBackoff(),
timer: timer,
@@ -113,7 +115,14 @@ func NewOutputPlugin(region, stream, dataKeys, partitionKey, roleARN, endpoint s
}

// newPutRecordsClient creates the Kinesis client for calling the PutRecords method
func newPutRecordsClient(roleARN string, sess *session.Session, endpoint string) *kinesis.Kinesis {
func newPutRecordsClient(roleARN string, awsRegion string, endpoint string) (*kinesis.Kinesis, error) {
sess, err := session.NewSession(&aws.Config{
Region: aws.String(awsRegion),
})
if err != nil {
return nil, err
}

svcConfig := &aws.Config{}
if endpoint != "" {
defaultResolver := endpoints.DefaultResolver()
@@ -135,12 +144,12 @@ func newPutRecordsClient(roleARN string, sess *session.Session, endpoint string)

client := kinesis.New(sess, svcConfig)
client.Handlers.Build.PushBackNamed(plugins.CustomUserAgentHandler())
return client
return client, nil
}

// AddRecord accepts a record and adds it to the buffer, flushing the buffer if it is full
// the return value is one of: FLB_OK FLB_RETRY
// API Errors lead to an FLB_RETRY, and all other errors are logged, the record is discarded and FLB_OK is returned
// API Errors lead to an FLB_RETRY, and data processing errors are logged, the record is discarded and FLB_OK is returned
func (outputPlugin *OutputPlugin) AddRecord(record map[interface{}]interface{}) int {
data, err := outputPlugin.processRecord(record)
if err != nil {
@@ -155,7 +164,7 @@ func (outputPlugin *OutputPlugin) AddRecord(record map[interface{}]interface{})
err = outputPlugin.sendCurrentBatch()
if err != nil {
logrus.Errorf("[kinesis %d] %v\n", outputPlugin.PluginID, err)
// send failures are retryable
// If FluentBit fails to send logs, it will retry rather than discarding the logs
return fluentbit.FLB_RETRY
}
}
@@ -169,7 +178,7 @@ func (outputPlugin *OutputPlugin) AddRecord(record map[interface{}]interface{})
return fluentbit.FLB_OK
}

// Flush sends the current buffer of records
// Flush sends the current buffer of log records
func (outputPlugin *OutputPlugin) Flush() error {
return outputPlugin.sendCurrentBatch()
}
@@ -193,8 +202,10 @@ func (outputPlugin *OutputPlugin) processRecord(record map[interface{}]interface
return nil, err
}

// append newline
data = append(data, []byte("\n")...)
// append a newline after each log record
if outputPlugin.appendNewline {
data = append(data, []byte("\n")...)
}

if len(data) > maximumRecordSize {
return nil, fmt.Errorf("Log record greater than max size allowed by Kinesis")
@@ -205,7 +216,7 @@

func (outputPlugin *OutputPlugin) sendCurrentBatch() error {
if outputPlugin.lastInvalidPartitionKeyIndex >= 0 {
logrus.Infof("[kinesis %d] Invalid partition key. Failed to find partition_key %s in log record %s.", outputPlugin.PluginID, outputPlugin.partitionKey, outputPlugin.records[outputPlugin.lastInvalidPartitionKeyIndex].Data)
logrus.Errorf("[kinesis %d] Invalid partition key. Failed to find partition_key %s in log record %s", outputPlugin.PluginID, outputPlugin.partitionKey, outputPlugin.records[outputPlugin.lastInvalidPartitionKeyIndex].Data)
outputPlugin.lastInvalidPartitionKeyIndex = -1
}
outputPlugin.backoff.Wait()
56 changes: 14 additions & 42 deletions kinesis/kinesis_test.go
@@ -34,7 +34,6 @@ func newMockOutputPlugin(client *mock_kinesis.MockPutRecordsClient) (*OutputPlug
}

return &OutputPlugin{
region: "us-west-2",
stream: "stream",
client: client,
records: records,
@@ -48,34 +47,22 @@
}, nil
}

func TestStringOrByteArray(t *testing.T) {
s := stringOrByteArray("testString")
assert.Equal(t, s, "testString", "Expected value to be a non-empty string")

s = stringOrByteArray(2353425)
assert.Equal(t, s, "", "Expected value to be an empty string")

b := []byte{'b', 'y', 't', 'e'}
s = stringOrByteArray(b)
assert.Equal(t, s, "byte", "Expected value to be a non-empty string")
// Test cases for TestStringOrByteArray
var testCases = []struct {
input interface{}
output string
}{
{"testString", "testString"},
{35344, ""},
{[]byte{'b', 'y', 't', 'e'}, "byte"},
{nil, ""},
}

func TestStringOrByteArrayWithNil(t *testing.T) {
s := stringOrByteArray(nil)
assert.Equal(t, s, "", "Expected value to be an empty string")
}

func TestRandomString(t *testing.T) {
randomStringMap := make(map[string]bool)
outputPlugin, _ := newMockOutputPlugin(nil)

for i := 0; i < 10; i++ {
s := outputPlugin.randomString()
if _, isInMap := randomStringMap[s]; isInMap {
t.Errorf("Duplicate value found")
break
}else{
randomStringMap[s] = true
func TestStringOrByteArray(t *testing.T) {
for _, testCase := range testCases {
result := stringOrByteArray(testCase.input)
if result != testCase.output {
t.Errorf("[Test Failed] Expected: %s, Returned: %s", testCase.output, result)
}
}
}
@@ -112,18 +99,3 @@ func TestAddRecordAndFlush(t *testing.T) {
err := outputPlugin.Flush()
assert.NoError(t, err, "Unexpected error calling flush")
}

func BenchmarkRandomString(b *testing.B) {
randomStringMap := make(map[string]bool)
outputPlugin, _ := newMockOutputPlugin(nil)

for i := 0; i < b.N; i++ {
s := outputPlugin.randomString()
if _, isInMap := randomStringMap[s]; isInMap {
b.Errorf("Duplicate value found")
break
}else{
randomStringMap[s] = true
}
}
}