Kinesis Data Streams support (#7)
* feat: Kinesis Data Streams support

Makes the following changes to resolve #5:

- Add `streams` package for the Kinesis Data Streams output plugin
- Update `plugins/firehose/main.go` to also register the new Streams plugin
- Rename `plugins/firehose` to `plugins/kinesis`, and `firehose.so` to `kinesis.so`, to match the broader scope of the plugin
- Update README accordingly

* Try to fix the integration test on Travis
mumoshu authored and Sergey Novikov committed May 10, 2018
1 parent 1653a6a commit 7ed4a0e
Showing 13 changed files with 361 additions and 16 deletions.
2 changes: 1 addition & 1 deletion .travis.yml
@@ -18,7 +18,7 @@ script:
fi
- |
FB=$(ls target/filebeat* | tail -1)
FH=$(ls target/firehose* | tail -1)
FH=$(ls target/kinesis* | tail -1)
echo "$FB -plugin $FH -c example/filebeat.yml --once"
$FB -plugin $FH -c example/filebeat.yml --once
12 changes: 9 additions & 3 deletions Makefile
@@ -7,12 +7,18 @@ AWSBEATS_VERSION ?= "1-snapshot"
all: test beats build

test:
test -z "$$(find . -path ./vendor -prune -type f -o -name '*.go' -exec gofmt -d {} + | tee /dev/stderr)"
go test ./firehose -v -coverprofile=coverage.txt -covermode=atomic
go test ./streams -v -coverprofile=coverage.txt -covermode=atomic

build:
go build -buildmode=plugin ./plugins/firehose
format:
test -z "$$(find . -path ./vendor -prune -type f -o -name '*.go' -exec gofmt -d {} + | tee /dev/stderr)" || \
test -z "$$(find . -path ./vendor -prune -type f -o -name '*.go' -exec gofmt -w {} + | tee /dev/stderr)"

build: format
go build -buildmode=plugin ./plugins/kinesis
@mkdir -p "$(CURDIR)/target"
@mv firehose.so "$(CURDIR)/target/firehose-$(AWSBEATS_VERSION)-$(BEATS_VERSION)-$(GO_VERSION).so"
@mv kinesis.so "$(CURDIR)/target/kinesis-$(AWSBEATS_VERSION)-$(BEATS_VERSION)-$(GO_VERSION).so"

beats:
ifdef BEATS_VERSION
18 changes: 16 additions & 2 deletions README.md
@@ -11,13 +11,27 @@ __NOTE: Filebeat and plugin should be built using the same Golang version.__
## Quick start

- Download binary files from https://github.com/s12v/awsbeats/releases

### Firehose

- Add to `filebeat.yml`:
```
output.firehose:
region: eu-central-1
stream_name: test1 # Your delivery stream name
```
- Run filebeat with plugin `./filebeat-v6.1.3-go1.10rc1-linux-amd64 -plugin firehose.so-0.0.3-v6.1.3-go1.10rc1-linux-amd64`
- Run filebeat with plugin `./filebeat-v6.1.3-go1.10rc1-linux-amd64 -plugin kinesis.so-0.0.3-v6.1.3-go1.10rc1-linux-amd64`

### Streams

- Download binary files from https://github.com/s12v/awsbeats/releases
- Add to `filebeat.yml`:
```
output.streams:
region: eu-central-1
stream_name: test1 # Your stream name
```
- Run filebeat with plugin `./filebeat-v6.1.3-go1.10rc1-linux-amd64 -plugin kinesis.so-0.0.3-v6.1.3-go1.10rc1-linux-amd64`

## AWS authentication

@@ -38,7 +52,7 @@ make BEATS_VERSION=v6.1.3
In `target/` you will find filebeat and plugin, for example:
```
filebeat-v6.1.3-go1.10rc1-linux-amd64
firehose.so-1-snapshot-v6.1.3-go1.10rc1-linux-amd64
kinesis.so-1-snapshot-v6.1.3-go1.10rc1-linux-amd64
```

## Output buffering
8 changes: 4 additions & 4 deletions firehose/client.go
@@ -1,15 +1,15 @@
package firehose

import (
"time"
"github.com/aws/aws-sdk-go/service/firehose"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/elastic/beats/libbeat/publisher"
"github.com/aws/aws-sdk-go/service/firehose"
"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/libbeat/outputs"
"github.com/elastic/beats/libbeat/outputs/codec"
"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/libbeat/outputs/codec/json"
"github.com/elastic/beats/libbeat/publisher"
"time"
)

type client struct {
4 changes: 2 additions & 2 deletions firehose/client_test.go
@@ -1,9 +1,9 @@
package firehose

import (
"testing"
"github.com/elastic/beats/libbeat/publisher"
"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/libbeat/publisher"
"testing"
)

type MockCodec struct {
2 changes: 1 addition & 1 deletion firehose/config.go
@@ -1,8 +1,8 @@
package firehose

import (
"time"
"errors"
"time"
)

type FirehoseConfig struct {
6 changes: 3 additions & 3 deletions firehose/firehose.go
@@ -1,11 +1,11 @@
package firehose

import (
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/outputs"
"github.com/elastic/beats/libbeat/beat"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/aws"
)

var (
2 changes: 2 additions & 0 deletions plugins/firehose/main.go → plugins/kinesis/main.go
@@ -4,8 +4,10 @@ import (
"github.com/elastic/beats/libbeat/outputs"
"github.com/elastic/beats/libbeat/plugin"
"github.com/s12v/awsbeats/firehose"
"github.com/s12v/awsbeats/streams"
)

var Bundle = plugin.Bundle(
outputs.Plugin("firehose", firehose.New),
outputs.Plugin("streams", streams.New),
)
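
The `streams.New` factory registered above lives in the `streams` package, in a file that is not expanded in this listing. Below is a minimal sketch of what such a factory might look like, modeled on `firehose.New` and on the `newClient` constructor shown in `streams/client.go`; the actual file contents, and the `BatchSize` default handling, are assumptions.

```
// streams/streams.go (sketch, not the actual file): hypothetical output factory
// wiring config -> AWS session -> client, in the style of the firehose plugin.
package streams

import (
	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/elastic/beats/libbeat/beat"
	"github.com/elastic/beats/libbeat/common"
	"github.com/elastic/beats/libbeat/outputs"
)

func New(beat beat.Info, observer outputs.Observer, cfg *common.Config) (outputs.Group, error) {
	config := defaultConfig
	config.BatchSize = defaultBatchSize // assumption: default applied before unpacking user config
	if err := cfg.Unpack(&config); err != nil {
		return outputs.Fail(err)
	}

	sess, err := session.NewSession(&aws.Config{Region: aws.String(config.Region)})
	if err != nil {
		return outputs.Fail(err)
	}

	client, err := newClient(sess, &config, observer, beat)
	if err != nil {
		return outputs.Fail(err)
	}

	// outputs.Success wraps the client with the configured batching and retry settings.
	return outputs.Success(config.BatchSize, config.MaxRetries, client)
}
```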
133 changes: 133 additions & 0 deletions streams/client.go
@@ -0,0 +1,133 @@
package streams

import (
"fmt"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/kinesis"
"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/libbeat/outputs"
"github.com/elastic/beats/libbeat/outputs/codec"
"github.com/elastic/beats/libbeat/outputs/codec/json"
"github.com/elastic/beats/libbeat/publisher"
"time"
)

type client struct {
streams *kinesis.Kinesis
streamName string
partitionKey string
beatName string
encoder codec.Codec
timeout time.Duration
observer outputs.Observer
}

func newClient(sess *session.Session, config *StreamsConfig, observer outputs.Observer, beat beat.Info) (*client, error) {
client := &client{
streams: kinesis.New(sess),
streamName: config.DeliveryStreamName,
partitionKey: config.PartitionKey,
beatName: beat.Beat,
encoder: json.New(false, beat.Version),
timeout: config.Timeout,
observer: observer,
}

return client, nil
}

func (client *client) Close() error {
return nil
}

func (client *client) Connect() error {
return nil
}

func (client *client) Publish(batch publisher.Batch) error {
events := batch.Events()
observer := client.observer
observer.NewBatch(len(events))

records, dropped := client.mapEvents(events)
res, err := client.sendRecords(records)
if err != nil {
logp.Critical("Unable to send batch: %v", err)
observer.Dropped(len(events))
return err
}

processFailedDeliveries(res, batch)
batch.ACK()
logp.Debug("kinesis", "Sent %d records", len(events))
observer.Dropped(dropped)
observer.Acked(len(events) - dropped)
return nil
}

func (client *client) mapEvents(events []publisher.Event) ([]*kinesis.PutRecordsRequestEntry, int) {
dropped := 0
records := make([]*kinesis.PutRecordsRequestEntry, 0, len(events))
for _, event := range events {
record, err := client.mapEvent(&event)
if err != nil {
dropped++
} else {
records = append(records, record)
}
}

return records, dropped
}

func (client *client) mapEvent(event *publisher.Event) (*kinesis.PutRecordsRequestEntry, error) {
serializedEvent, err := client.encoder.Encode(client.beatName, &event.Content)
if err != nil {
if !event.Guaranteed() {
return nil, err
}

logp.Critical("Unable to encode event: %v", err)
return nil, err
}

rawPartitionKey, err := event.Content.GetValue(client.partitionKey)
if err != nil {
return nil, fmt.Errorf("failed to get parition key: %v", err)
}

partitionKey, ok := rawPartitionKey.(string)
if !ok {
return nil, fmt.Errorf("failed to get partition key: %s(=%v) is found, but not a string", client.partitionKey, rawPartitionKey)
}

return &kinesis.PutRecordsRequestEntry{Data: serializedEvent, PartitionKey: aws.String(partitionKey)}, nil
}
func (client *client) sendRecords(records []*kinesis.PutRecordsRequestEntry) (*kinesis.PutRecordsOutput, error) {
request := kinesis.PutRecordsInput{
StreamName: &client.streamName,
Records: records,
}
return client.streams.PutRecords(&request)
}

func processFailedDeliveries(res *kinesis.PutRecordsOutput, batch publisher.Batch) {
if *res.FailedRecordCount > 0 {
events := batch.Events()
failedEvents := make([]publisher.Event, 0)
records := res.Records
for i, r := range records {
if *r.ErrorCode != "" {
failedEvents = append(failedEvents, events[i])
}
}

if len(failedEvents) > 0 {
logp.Warn("Retrying %d events", len(failedEvents))
batch.RetryEvents(failedEvents)
return
}
}
}
54 changes: 54 additions & 0 deletions streams/client_test.go
@@ -0,0 +1,54 @@
package streams

import (
"github.com/aws/aws-sdk-go/aws"
"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/publisher"
"testing"
)

type MockCodec struct {
}

func (mock MockCodec) Encode(index string, event *beat.Event) ([]byte, error) {
return []byte("boom"), nil
}

func TestMapEvent(t *testing.T) {
fieldForPartitionKey := "mypartitionkey"
expectedPartitionKey := "foobar"
client := client{encoder: MockCodec{}, partitionKey: fieldForPartitionKey}
event := &publisher.Event{Content: beat.Event{Fields: common.MapStr{fieldForPartitionKey: expectedPartitionKey}}}
record, err := client.mapEvent(event)

if err != nil {
t.Fatalf("uenxpected error: %v", err)
}

if string(record.Data) != "boom" {
t.Errorf("Unexpected data: %s", record.Data)
}

actualPartitionKey := aws.StringValue(record.PartitionKey)
if actualPartitionKey != expectedPartitionKey {
t.Errorf("unexpected partition key: %s", actualPartitionKey)
}
}

func TestMapEvents(t *testing.T) {
fieldForPartitionKey := "mypartitionkey"
expectedPartitionKey := "foobar"
client := client{encoder: MockCodec{}, partitionKey: fieldForPartitionKey}
event := publisher.Event{Content: beat.Event{Fields: common.MapStr{fieldForPartitionKey: expectedPartitionKey}}}
events := []publisher.Event{event}
records, _ := client.mapEvents(events)

if len(records) != 1 {
t.Errorf("Expected 1 records, got %v", len(records))
}

if string(records[0].Data) != "boom" {
t.Errorf("Unexpected data %s", records[0].Data)
}
}
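
As a usage illustration of the drop counter returned by `mapEvents`, here is a hedged sketch of an additional test that is not part of this commit: an event missing the configured partition key field should be dropped rather than mapped to a record.

```
// Hypothetical additional test (not in this commit): events without the
// configured partition key field are counted as dropped by mapEvents.
func TestMapEventsDropsEventWithoutPartitionKey(t *testing.T) {
	client := client{encoder: MockCodec{}, partitionKey: "mypartitionkey"}
	event := publisher.Event{Content: beat.Event{Fields: common.MapStr{"other": "value"}}}

	records, dropped := client.mapEvents([]publisher.Event{event})

	if len(records) != 0 {
		t.Errorf("expected 0 records, got %d", len(records))
	}
	if dropped != 1 {
		t.Errorf("expected 1 dropped event, got %d", dropped)
	}
}
```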
54 changes: 54 additions & 0 deletions streams/config.go
@@ -0,0 +1,54 @@
package streams

import (
"errors"
"time"
)

type StreamsConfig struct {
Region string `config:"region"`
DeliveryStreamName string `config:"stream_name"`
PartitionKey string `config:"partition_key"`
BatchSize int `config:"batch_size"`
MaxRetries int `config:"max_retries"`
Timeout time.Duration `config:"timeout"`
Backoff backoff `config:"backoff"`
}

type backoff struct {
Init time.Duration
Max time.Duration
}

const (
defaultBatchSize = 50
// As per https://docs.aws.amazon.com/sdk-for-go/api/service/kinesis/#Kinesis.PutRecords
maxBatchSize = 500
)

var (
defaultConfig = StreamsConfig{
Timeout: 90 * time.Second,
MaxRetries: 3,
Backoff: backoff{
Init: 1 * time.Second,
Max: 60 * time.Second,
},
}
)

func (c *StreamsConfig) Validate() error {
if c.Region == "" {
return errors.New("region is not defined")
}

if c.DeliveryStreamName == "" {
return errors.New("stream_name is not defined")
}

if c.BatchSize > maxBatchSize || c.BatchSize < 1 {
return errors.New("invalid batch size")
}

return nil
}
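
For completeness, a hedged sketch of a test (also not part of this commit) exercising the `Validate` limits above: batch sizes outside the 1..500 range allowed by PutRecords are rejected.

```
// streams/config_validate_test.go (hypothetical, not in this commit).
package streams

import "testing"

func TestValidateRejectsInvalidBatchSize(t *testing.T) {
	cfg := StreamsConfig{
		Region:             "eu-central-1",
		DeliveryStreamName: "test1",
		BatchSize:          maxBatchSize + 1, // above the PutRecords limit of 500
	}
	if err := cfg.Validate(); err == nil {
		t.Error("expected an error for batch_size above 500")
	}

	cfg.BatchSize = 0 // below the minimum of 1
	if err := cfg.Validate(); err == nil {
		t.Error("expected an error for batch_size below 1")
	}
}
```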