From f7608fa2fdbafed218d71d60d5e2361d7dcf79ed Mon Sep 17 00:00:00 2001 From: Yusuke KUOKA Date: Wed, 2 May 2018 22:00:24 +0900 Subject: [PATCH 1/2] feat: Kinesis Data Streams support Makes following changes to resolve #5 - Add `streams` package for the Kinesis Data Streams output plugin - Updated `plugins/firehose/main.go` to also include the brand-new Streams plugin - Rename `plugins/firehose` to `plugins/kinesis`, and `firehose.so` to `kinesis.so` according to the scope of the plugin - Update README accordingly --- Makefile | 12 ++- README.md | 18 +++- firehose/client.go | 8 +- firehose/client_test.go | 4 +- firehose/config.go | 2 +- firehose/firehose.go | 6 +- plugins/{firehose => kinesis}/main.go | 2 + streams/client.go | 133 ++++++++++++++++++++++++++ streams/client_test.go | 54 +++++++++++ streams/config.go | 54 +++++++++++ streams/config_test.go | 43 +++++++++ streams/streams.go | 39 ++++++++ 12 files changed, 360 insertions(+), 15 deletions(-) rename plugins/{firehose => kinesis}/main.go (74%) create mode 100644 streams/client.go create mode 100644 streams/client_test.go create mode 100644 streams/config.go create mode 100644 streams/config_test.go create mode 100644 streams/streams.go diff --git a/Makefile b/Makefile index 9a098ab..e390ea2 100644 --- a/Makefile +++ b/Makefile @@ -7,12 +7,18 @@ AWSBEATS_VERSION ?= "1-snapshot" all: test beats build test: + test -z "$$(find . -path ./vendor -prune -type f -o -name '*.go' -exec gofmt -d {} + | tee /dev/stderr)" go test ./firehose -v -coverprofile=coverage.txt -covermode=atomic + go test ./streams -v -coverprofile=coverage.txt -covermode=atomic -build: - go build -buildmode=plugin ./plugins/firehose +format: + test -z "$$(find . -path ./vendor -prune -type f -o -name '*.go' -exec gofmt -d {} + | tee /dev/stderr)" || \ + test -z "$$(find . -path ./vendor -prune -type f -o -name '*.go' -exec gofmt -w {} + | tee /dev/stderr)" + +build: format + go build -buildmode=plugin ./plugins/kinesis @mkdir -p "$(CURDIR)/target" - @mv firehose.so "$(CURDIR)/target/firehose-$(AWSBEATS_VERSION)-$(BEATS_VERSION)-$(GO_VERSION).so" + @mv kinesis.so "$(CURDIR)/target/kinesis-$(AWSBEATS_VERSION)-$(BEATS_VERSION)-$(GO_VERSION).so" beats: ifdef BEATS_VERSION diff --git a/README.md b/README.md index ee7e338..c43c840 100644 --- a/README.md +++ b/README.md @@ -11,13 +11,27 @@ __NOTE: Filebeat and plugin should be built using the same Golang version.__ ## Quick start - Download binary files from https://github.com/s12v/awsbeats/releases + +### Firehose + - Add to `filebeats.yml`: ``` output.firehose: region: eu-central-1 stream_name: test1 # Your delivery stream name ``` -- Run filebeat with plugin `./filebeat-v6.1.3-go1.10rc1-linux-amd64 -plugin firehose.so-0.0.3-v6.1.3-go1.10rc1-linux-amd64` +- Run filebeat with plugin `./filebeat-v6.1.3-go1.10rc1-linux-amd64 -plugin kinesis.so-0.0.3-v6.1.3-go1.10rc1-linux-amd64` + +### Streams + +- Download binary files from https://github.com/s12v/awsbeats/releases +- Add to `filebeats.yml`: +``` +output.streams: + region: eu-central-1 + stream_name: test1 # Your stream name +``` +- Run filebeat with plugin `./filebeat-v6.1.3-go1.10rc1-linux-amd64 -plugin kinesis.so-0.0.3-v6.1.3-go1.10rc1-linux-amd64` ## AWS authentication @@ -38,7 +52,7 @@ make BEATS_VERSION=v6.1.3 In `target/` you will find filebeat and plugin, for example: ``` filebeat-v6.1.3-go1.10rc1-linux-amd64 -firehose.so-1-snapshot-v6.1.3-go1.10rc1-linux-amd64 +kinesis.so-1-snapshot-v6.1.3-go1.10rc1-linux-amd64 ``` ## Output buffering diff --git a/firehose/client.go b/firehose/client.go index 7345310..ff367dd 100644 --- a/firehose/client.go +++ b/firehose/client.go @@ -1,15 +1,15 @@ package firehose import ( - "time" - "github.com/aws/aws-sdk-go/service/firehose" "github.com/aws/aws-sdk-go/aws/session" - "github.com/elastic/beats/libbeat/publisher" + "github.com/aws/aws-sdk-go/service/firehose" + "github.com/elastic/beats/libbeat/beat" "github.com/elastic/beats/libbeat/logp" "github.com/elastic/beats/libbeat/outputs" "github.com/elastic/beats/libbeat/outputs/codec" - "github.com/elastic/beats/libbeat/beat" "github.com/elastic/beats/libbeat/outputs/codec/json" + "github.com/elastic/beats/libbeat/publisher" + "time" ) type client struct { diff --git a/firehose/client_test.go b/firehose/client_test.go index 40d4d26..eaf1f56 100644 --- a/firehose/client_test.go +++ b/firehose/client_test.go @@ -1,9 +1,9 @@ package firehose import ( - "testing" - "github.com/elastic/beats/libbeat/publisher" "github.com/elastic/beats/libbeat/beat" + "github.com/elastic/beats/libbeat/publisher" + "testing" ) type MockCodec struct { diff --git a/firehose/config.go b/firehose/config.go index 3572892..afcb84d 100644 --- a/firehose/config.go +++ b/firehose/config.go @@ -1,8 +1,8 @@ package firehose import ( - "time" "errors" + "time" ) type FirehoseConfig struct { diff --git a/firehose/firehose.go b/firehose/firehose.go index 25f37dd..36e3fd4 100644 --- a/firehose/firehose.go +++ b/firehose/firehose.go @@ -1,11 +1,11 @@ package firehose import ( + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/session" + "github.com/elastic/beats/libbeat/beat" "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/outputs" - "github.com/elastic/beats/libbeat/beat" - "github.com/aws/aws-sdk-go/aws/session" - "github.com/aws/aws-sdk-go/aws" ) var ( diff --git a/plugins/firehose/main.go b/plugins/kinesis/main.go similarity index 74% rename from plugins/firehose/main.go rename to plugins/kinesis/main.go index e20ca93..272dfc5 100644 --- a/plugins/firehose/main.go +++ b/plugins/kinesis/main.go @@ -4,8 +4,10 @@ import ( "github.com/elastic/beats/libbeat/outputs" "github.com/elastic/beats/libbeat/plugin" "github.com/s12v/awsbeats/firehose" + "github.com/s12v/awsbeats/streams" ) var Bundle = plugin.Bundle( outputs.Plugin("firehose", firehose.New), + outputs.Plugin("streams", streams.New), ) diff --git a/streams/client.go b/streams/client.go new file mode 100644 index 0000000..f658ed4 --- /dev/null +++ b/streams/client.go @@ -0,0 +1,133 @@ +package streams + +import ( + "fmt" + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/session" + "github.com/aws/aws-sdk-go/service/kinesis" + "github.com/elastic/beats/libbeat/beat" + "github.com/elastic/beats/libbeat/logp" + "github.com/elastic/beats/libbeat/outputs" + "github.com/elastic/beats/libbeat/outputs/codec" + "github.com/elastic/beats/libbeat/outputs/codec/json" + "github.com/elastic/beats/libbeat/publisher" + "time" +) + +type client struct { + streams *kinesis.Kinesis + streamName string + partitionKey string + beatName string + encoder codec.Codec + timeout time.Duration + observer outputs.Observer +} + +func newClient(sess *session.Session, config *StreamsConfig, observer outputs.Observer, beat beat.Info) (*client, error) { + client := &client{ + streams: kinesis.New(sess), + streamName: config.DeliveryStreamName, + partitionKey: config.PartitionKey, + beatName: beat.Beat, + encoder: json.New(false, beat.Version), + timeout: config.Timeout, + observer: observer, + } + + return client, nil +} + +func (client *client) Close() error { + return nil +} + +func (client *client) Connect() error { + return nil +} + +func (client *client) Publish(batch publisher.Batch) error { + events := batch.Events() + observer := client.observer + observer.NewBatch(len(events)) + + records, dropped := client.mapEvents(events) + res, err := client.sendRecords(records) + if err != nil { + logp.Critical("Unable to send batch: %v", err) + observer.Dropped(len(events)) + return err + } + + processFailedDeliveries(res, batch) + batch.ACK() + logp.Debug("kinesis", "Sent %d records", len(events)) + observer.Dropped(dropped) + observer.Acked(len(events) - dropped) + return nil +} + +func (client *client) mapEvents(events []publisher.Event) ([]*kinesis.PutRecordsRequestEntry, int) { + dropped := 0 + records := make([]*kinesis.PutRecordsRequestEntry, 0, len(events)) + for _, event := range events { + record, err := client.mapEvent(&event) + if err != nil { + dropped++ + } else { + records = append(records, record) + } + } + + return records, dropped +} + +func (client *client) mapEvent(event *publisher.Event) (*kinesis.PutRecordsRequestEntry, error) { + serializedEvent, err := client.encoder.Encode(client.beatName, &event.Content) + if err != nil { + if !event.Guaranteed() { + return nil, err + } + + logp.Critical("Unable to encode event: %v", err) + return nil, err + } + + rawPartitionKey, err := event.Content.GetValue(client.partitionKey) + if err != nil { + return nil, fmt.Errorf("failed to get parition key: %v", err) + } + + partitionKey, ok := rawPartitionKey.(string) + if !ok { + return nil, fmt.Errorf("failed to get partition key: %s(=%v) is found, but not a string", client.partitionKey, rawPartitionKey) + } + + return &kinesis.PutRecordsRequestEntry{Data: serializedEvent, PartitionKey: aws.String(partitionKey)}, nil +} +func (client *client) sendRecords(records []*kinesis.PutRecordsRequestEntry) (*kinesis.PutRecordsOutput, error) { + request := kinesis.PutRecordsInput{ + StreamName: &client.streamName, + Records: records, + } + return client.streams.PutRecords(&request) +} + +func processFailedDeliveries(res *kinesis.PutRecordsOutput, batch publisher.Batch) { + if *res.FailedRecordCount > 0 { + events := batch.Events() + failedEvents := make([]publisher.Event, 0) + records := res.Records + for i, r := range records { + if *r.ErrorCode != "" { + failedEvents = append(failedEvents, events[i]) + } + } + + if len(failedEvents) > 0 { + logp.Warn("Retrying %d events", len(failedEvents)) + batch.RetryEvents(failedEvents) + return + } + } +} diff --git a/streams/client_test.go b/streams/client_test.go new file mode 100644 index 0000000..a5fc678 --- /dev/null +++ b/streams/client_test.go @@ -0,0 +1,54 @@ +package streams + +import ( + "github.com/aws/aws-sdk-go/aws" + "github.com/elastic/beats/libbeat/beat" + "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/publisher" + "testing" +) + +type MockCodec struct { +} + +func (mock MockCodec) Encode(index string, event *beat.Event) ([]byte, error) { + return []byte("boom"), nil +} + +func TestMapEvent(t *testing.T) { + fieldForPartitionKey := "mypartitionkey" + expectedPartitionKey := "foobar" + client := client{encoder: MockCodec{}, partitionKey: fieldForPartitionKey} + event := &publisher.Event{Content: beat.Event{Fields: common.MapStr{fieldForPartitionKey: expectedPartitionKey}}} + record, err := client.mapEvent(event) + + if err != nil { + t.Fatalf("uenxpected error: %v", err) + } + + if string(record.Data) != "boom" { + t.Errorf("Unexpected data: %s", record.Data) + } + + actualPartitionKey := aws.StringValue(record.PartitionKey) + if actualPartitionKey != expectedPartitionKey { + t.Errorf("unexpected partition key: %s", actualPartitionKey) + } +} + +func TestMapEvents(t *testing.T) { + fieldForPartitionKey := "mypartitionkey" + expectedPartitionKey := "foobar" + client := client{encoder: MockCodec{}, partitionKey: fieldForPartitionKey} + event := publisher.Event{Content: beat.Event{Fields: common.MapStr{fieldForPartitionKey: expectedPartitionKey}}} + events := []publisher.Event{event} + records, _ := client.mapEvents(events) + + if len(records) != 1 { + t.Errorf("Expected 1 records, got %v", len(records)) + } + + if string(records[0].Data) != "boom" { + t.Errorf("Unexpected data %s", records[0].Data) + } +} diff --git a/streams/config.go b/streams/config.go new file mode 100644 index 0000000..d4651a8 --- /dev/null +++ b/streams/config.go @@ -0,0 +1,54 @@ +package streams + +import ( + "errors" + "time" +) + +type StreamsConfig struct { + Region string `config:"region"` + DeliveryStreamName string `config:"stream_name"` + PartitionKey string `config:"partition_key"` + BatchSize int `config:"batch_size"` + MaxRetries int `config:"max_retries"` + Timeout time.Duration `config:"timeout"` + Backoff backoff `config:"backoff"` +} + +type backoff struct { + Init time.Duration + Max time.Duration +} + +const ( + defaultBatchSize = 50 + // As per https://docs.aws.amazon.com/sdk-for-go/api/service/kinesis/#Kinesis.PutRecords + maxBatchSize = 500 +) + +var ( + defaultConfig = StreamsConfig{ + Timeout: 90 * time.Second, + MaxRetries: 3, + Backoff: backoff{ + Init: 1 * time.Second, + Max: 60 * time.Second, + }, + } +) + +func (c *StreamsConfig) Validate() error { + if c.Region == "" { + return errors.New("region is not defined") + } + + if c.DeliveryStreamName == "" { + return errors.New("stream_name is not defined") + } + + if c.BatchSize > maxBatchSize || c.BatchSize < 1 { + return errors.New("invalid batch size") + } + + return nil +} diff --git a/streams/config_test.go b/streams/config_test.go new file mode 100644 index 0000000..22ecea9 --- /dev/null +++ b/streams/config_test.go @@ -0,0 +1,43 @@ +package streams + +import "testing" + +func TestValidate(t *testing.T) { + config := &StreamsConfig{} + err := config.Validate() + if err == nil { + t.Errorf("Expected an error") + } +} + +func TestValidateWithRegion(t *testing.T) { + config := &StreamsConfig{Region: "eu-central-1"} + err := config.Validate() + if err == nil { + t.Errorf("Expected an error") + } +} + +func TestValidateWithRegionAndStreamNameAndBatchSize(t *testing.T) { + config := &StreamsConfig{Region: "eu-central-1", DeliveryStreamName: "foo", BatchSize: 50} + err := config.Validate() + if err != nil { + t.Errorf("Unexpected error: %v", err) + } +} + +func TestValidateWithRegionAndStreamNameAndInvalidBatchSize501(t *testing.T) { + config := &StreamsConfig{Region: "eu-central-1", DeliveryStreamName: "foo", BatchSize: 501} + err := config.Validate() + if err == nil { + t.Errorf("Expected an error") + } +} + +func TestValidateWithRegionAndStreamNameAndInvalidBatchSize0(t *testing.T) { + config := &StreamsConfig{Region: "eu-central-1", DeliveryStreamName: "foo", BatchSize: 0} + err := config.Validate() + if err == nil { + t.Errorf("Expected an error") + } +} diff --git a/streams/streams.go b/streams/streams.go new file mode 100644 index 0000000..43bcfc3 --- /dev/null +++ b/streams/streams.go @@ -0,0 +1,39 @@ +package streams + +import ( + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/session" + "github.com/elastic/beats/libbeat/beat" + "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/outputs" +) + +var ( + newClientFunc = newClient + awsNewSession = session.NewSession +) + +func New( + beat beat.Info, + stats outputs.Observer, + cfg *common.Config, +) (outputs.Group, error) { + if !cfg.HasField("batch_size") { + cfg.SetInt("batch_size", -1, defaultBatchSize) + } + + config := defaultConfig + if err := cfg.Unpack(&config); err != nil { + return outputs.Fail(err) + } + + var client outputs.NetworkClient + sess := session.Must(awsNewSession(&aws.Config{Region: aws.String(config.Region)})) + client, err := newClientFunc(sess, &config, stats, beat) + if err != nil { + return outputs.Fail(err) + } + + client = outputs.WithBackoff(client, config.Backoff.Init, config.Backoff.Max) + return outputs.Success(config.BatchSize, config.MaxRetries, client) +} From 457c55747eb2346e2be9ecd0421dc3f75c720416 Mon Sep 17 00:00:00 2001 From: Yusuke KUOKA Date: Wed, 2 May 2018 22:11:28 +0900 Subject: [PATCH 2/2] Try to fix the integration test on Travis --- .travis.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.travis.yml b/.travis.yml index 159ca29..b5f251a 100644 --- a/.travis.yml +++ b/.travis.yml @@ -18,7 +18,7 @@ script: fi - | FB=$(ls target/filebeat* | tail -1) - FH=$(ls target/firehose* | tail -1) + FH=$(ls target/kinesis* | tail -1) echo "$FB -plugin $FH -c example/filebeat.yml --once" $FB -plugin $FH -c example/filebeat.yml --once