Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Kinesis Data Streams support #7

Merged
merged 2 commits into from
May 10, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ script:
fi
- |
FB=$(ls target/filebeat* | tail -1)
FH=$(ls target/firehose* | tail -1)
FH=$(ls target/kinesis* | tail -1)
echo "$FB -plugin $FH -c example/filebeat.yml --once"
$FB -plugin $FH -c example/filebeat.yml --once

Expand Down
12 changes: 9 additions & 3 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,18 @@ AWSBEATS_VERSION ?= "1-snapshot"
all: test beats build

test:
test -z "$$(find . -path ./vendor -prune -type f -o -name '*.go' -exec gofmt -d {} + | tee /dev/stderr)"
go test ./firehose -v -coverprofile=coverage.txt -covermode=atomic
go test ./streams -v -coverprofile=coverage.txt -covermode=atomic

build:
go build -buildmode=plugin ./plugins/firehose
format:
test -z "$$(find . -path ./vendor -prune -type f -o -name '*.go' -exec gofmt -d {} + | tee /dev/stderr)" || \
test -z "$$(find . -path ./vendor -prune -type f -o -name '*.go' -exec gofmt -w {} + | tee /dev/stderr)"

build: format
go build -buildmode=plugin ./plugins/kinesis
@mkdir -p "$(CURDIR)/target"
@mv firehose.so "$(CURDIR)/target/firehose-$(AWSBEATS_VERSION)-$(BEATS_VERSION)-$(GO_VERSION).so"
@mv kinesis.so "$(CURDIR)/target/kinesis-$(AWSBEATS_VERSION)-$(BEATS_VERSION)-$(GO_VERSION).so"

beats:
ifdef BEATS_VERSION
Expand Down
18 changes: 16 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,27 @@ __NOTE: Filebeat and plugin should be built using the same Golang version.__
## Quick start

- Download binary files from https://github.com/s12v/awsbeats/releases

### Firehose

- Add to `filebeats.yml`:
```
output.firehose:
region: eu-central-1
stream_name: test1 # Your delivery stream name
```
- Run filebeat with plugin `./filebeat-v6.1.3-go1.10rc1-linux-amd64 -plugin firehose.so-0.0.3-v6.1.3-go1.10rc1-linux-amd64`
- Run filebeat with plugin `./filebeat-v6.1.3-go1.10rc1-linux-amd64 -plugin kinesis.so-0.0.3-v6.1.3-go1.10rc1-linux-amd64`

### Streams

- Download binary files from https://github.com/s12v/awsbeats/releases
- Add to `filebeats.yml`:
```
output.streams:
region: eu-central-1
stream_name: test1 # Your stream name
```
- Run filebeat with plugin `./filebeat-v6.1.3-go1.10rc1-linux-amd64 -plugin kinesis.so-0.0.3-v6.1.3-go1.10rc1-linux-amd64`

## AWS authentication

Expand All @@ -38,7 +52,7 @@ make BEATS_VERSION=v6.1.3
In `target/` you will find filebeat and plugin, for example:
```
filebeat-v6.1.3-go1.10rc1-linux-amd64
firehose.so-1-snapshot-v6.1.3-go1.10rc1-linux-amd64
kinesis.so-1-snapshot-v6.1.3-go1.10rc1-linux-amd64
```

## Output buffering
Expand Down
8 changes: 4 additions & 4 deletions firehose/client.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
package firehose

import (
"time"
"github.com/aws/aws-sdk-go/service/firehose"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/elastic/beats/libbeat/publisher"
"github.com/aws/aws-sdk-go/service/firehose"
"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/libbeat/outputs"
"github.com/elastic/beats/libbeat/outputs/codec"
"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/libbeat/outputs/codec/json"
"github.com/elastic/beats/libbeat/publisher"
"time"
)

type client struct {
Expand Down
4 changes: 2 additions & 2 deletions firehose/client_test.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
package firehose

import (
"testing"
"github.com/elastic/beats/libbeat/publisher"
"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/libbeat/publisher"
"testing"
)

type MockCodec struct {
Expand Down
2 changes: 1 addition & 1 deletion firehose/config.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package firehose

import (
"time"
"errors"
"time"
)

type FirehoseConfig struct {
Expand Down
6 changes: 3 additions & 3 deletions firehose/firehose.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
package firehose

import (
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/outputs"
"github.com/elastic/beats/libbeat/beat"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/aws"
)

var (
Expand Down
2 changes: 2 additions & 0 deletions plugins/firehose/main.go → plugins/kinesis/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@ import (
"github.com/elastic/beats/libbeat/outputs"
"github.com/elastic/beats/libbeat/plugin"
"github.com/s12v/awsbeats/firehose"
"github.com/s12v/awsbeats/streams"
)

var Bundle = plugin.Bundle(
outputs.Plugin("firehose", firehose.New),
outputs.Plugin("streams", streams.New),
)
133 changes: 133 additions & 0 deletions streams/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
package streams

import (
"fmt"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/kinesis"
"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/libbeat/outputs"
"github.com/elastic/beats/libbeat/outputs/codec"
"github.com/elastic/beats/libbeat/outputs/codec/json"
"github.com/elastic/beats/libbeat/publisher"
"time"
)

type client struct {
streams *kinesis.Kinesis
streamName string
partitionKey string
beatName string
encoder codec.Codec
timeout time.Duration
observer outputs.Observer
}

func newClient(sess *session.Session, config *StreamsConfig, observer outputs.Observer, beat beat.Info) (*client, error) {
client := &client{
streams: kinesis.New(sess),
streamName: config.DeliveryStreamName,
partitionKey: config.PartitionKey,
beatName: beat.Beat,
encoder: json.New(false, beat.Version),
timeout: config.Timeout,
observer: observer,
}

return client, nil
}

func (client *client) Close() error {
return nil
}

func (client *client) Connect() error {
return nil
}

func (client *client) Publish(batch publisher.Batch) error {
events := batch.Events()
observer := client.observer
observer.NewBatch(len(events))

records, dropped := client.mapEvents(events)
res, err := client.sendRecords(records)
if err != nil {
logp.Critical("Unable to send batch: %v", err)
observer.Dropped(len(events))
return err
}

processFailedDeliveries(res, batch)
batch.ACK()
logp.Debug("kinesis", "Sent %d records", len(events))
observer.Dropped(dropped)
observer.Acked(len(events) - dropped)
return nil
}

func (client *client) mapEvents(events []publisher.Event) ([]*kinesis.PutRecordsRequestEntry, int) {
dropped := 0
records := make([]*kinesis.PutRecordsRequestEntry, 0, len(events))
for _, event := range events {
record, err := client.mapEvent(&event)
if err != nil {
dropped++
} else {
records = append(records, record)
}
}

return records, dropped
}

func (client *client) mapEvent(event *publisher.Event) (*kinesis.PutRecordsRequestEntry, error) {
serializedEvent, err := client.encoder.Encode(client.beatName, &event.Content)
if err != nil {
if !event.Guaranteed() {
return nil, err
}

logp.Critical("Unable to encode event: %v", err)
return nil, err
}

rawPartitionKey, err := event.Content.GetValue(client.partitionKey)
if err != nil {
return nil, fmt.Errorf("failed to get parition key: %v", err)
}

partitionKey, ok := rawPartitionKey.(string)
if !ok {
return nil, fmt.Errorf("failed to get partition key: %s(=%v) is found, but not a string", client.partitionKey, rawPartitionKey)
}

return &kinesis.PutRecordsRequestEntry{Data: serializedEvent, PartitionKey: aws.String(partitionKey)}, nil
}
func (client *client) sendRecords(records []*kinesis.PutRecordsRequestEntry) (*kinesis.PutRecordsOutput, error) {
request := kinesis.PutRecordsInput{
StreamName: &client.streamName,
Records: records,
}
return client.streams.PutRecords(&request)
}

func processFailedDeliveries(res *kinesis.PutRecordsOutput, batch publisher.Batch) {
if *res.FailedRecordCount > 0 {
events := batch.Events()
failedEvents := make([]publisher.Event, 0)
records := res.Records
for i, r := range records {
if *r.ErrorCode != "" {
failedEvents = append(failedEvents, events[i])
}
}

if len(failedEvents) > 0 {
logp.Warn("Retrying %d events", len(failedEvents))
batch.RetryEvents(failedEvents)
return
}
}
}
54 changes: 54 additions & 0 deletions streams/client_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package streams

import (
"github.com/aws/aws-sdk-go/aws"
"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/publisher"
"testing"
)

type MockCodec struct {
}

func (mock MockCodec) Encode(index string, event *beat.Event) ([]byte, error) {
return []byte("boom"), nil
}

func TestMapEvent(t *testing.T) {
fieldForPartitionKey := "mypartitionkey"
expectedPartitionKey := "foobar"
client := client{encoder: MockCodec{}, partitionKey: fieldForPartitionKey}
event := &publisher.Event{Content: beat.Event{Fields: common.MapStr{fieldForPartitionKey: expectedPartitionKey}}}
record, err := client.mapEvent(event)

if err != nil {
t.Fatalf("uenxpected error: %v", err)
}

if string(record.Data) != "boom" {
t.Errorf("Unexpected data: %s", record.Data)
}

actualPartitionKey := aws.StringValue(record.PartitionKey)
if actualPartitionKey != expectedPartitionKey {
t.Errorf("unexpected partition key: %s", actualPartitionKey)
}
}

func TestMapEvents(t *testing.T) {
fieldForPartitionKey := "mypartitionkey"
expectedPartitionKey := "foobar"
client := client{encoder: MockCodec{}, partitionKey: fieldForPartitionKey}
event := publisher.Event{Content: beat.Event{Fields: common.MapStr{fieldForPartitionKey: expectedPartitionKey}}}
events := []publisher.Event{event}
records, _ := client.mapEvents(events)

if len(records) != 1 {
t.Errorf("Expected 1 records, got %v", len(records))
}

if string(records[0].Data) != "boom" {
t.Errorf("Unexpected data %s", records[0].Data)
}
}
54 changes: 54 additions & 0 deletions streams/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package streams

import (
"errors"
"time"
)

type StreamsConfig struct {
Region string `config:"region"`
DeliveryStreamName string `config:"stream_name"`
PartitionKey string `config:"partition_key"`
BatchSize int `config:"batch_size"`
MaxRetries int `config:"max_retries"`
Timeout time.Duration `config:"timeout"`
Backoff backoff `config:"backoff"`
}

type backoff struct {
Init time.Duration
Max time.Duration
}

const (
defaultBatchSize = 50
// As per https://docs.aws.amazon.com/sdk-for-go/api/service/kinesis/#Kinesis.PutRecords
maxBatchSize = 500
)

var (
defaultConfig = StreamsConfig{
Timeout: 90 * time.Second,
MaxRetries: 3,
Backoff: backoff{
Init: 1 * time.Second,
Max: 60 * time.Second,
},
}
)

func (c *StreamsConfig) Validate() error {
if c.Region == "" {
return errors.New("region is not defined")
}

if c.DeliveryStreamName == "" {
return errors.New("stream_name is not defined")
}

if c.BatchSize > maxBatchSize || c.BatchSize < 1 {
return errors.New("invalid batch size")
}

return nil
}
Loading