Skip to content

Commit

Permalink
Extract all the fields for a Kinesis Record and not just the metadata (
Browse files Browse the repository at this point in the history
…elastic#11141)

Add data field which contains raw data of the events and information
about the partition, the sequence and the schema.

Add a test to ensure the tranformation is OK from a KinesisRecord to a
beat.Event.

(cherry picked from commit 19c7699)
  • Loading branch information
ph committed Mar 8, 2019
1 parent 6b678be commit b6ea17c
Show file tree
Hide file tree
Showing 3 changed files with 68 additions and 8 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,10 @@ https://github.com/elastic/beats/compare/v6.6.0...6.x[Check the HEAD diff]

- The CLI will now log CloudFormation Stack events. {issue}8912[8912]
- Mark Functionbeat as GA. {pull}10564[10564]

- Correctly normalize Cloudformation resource name. {issue}10087[10087]
- Functionbeat can now deploy a function for Kinesis. {10116}10116[10116]
- Allow functionbeat to use the keystore. {issue}9009[9009]
- Correctly extract Kinesis Data field from the Kinesis Record. {pull}11141[11141]

==== Bugfixes

Expand Down
19 changes: 12 additions & 7 deletions x-pack/functionbeat/provider/aws/transformer/transformer.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,19 +58,24 @@ func APIGatewayProxyRequest(request events.APIGatewayProxyRequest) beat.Event {
}

// KinesisEvent takes a kinesis event and create multiples beat events.
// DOCS: https://docs.aws.amazon.com/lambda/latest/dg/with-kinesis.html
func KinesisEvent(request events.KinesisEvent) []beat.Event {
events := make([]beat.Event, len(request.Records))
for idx, record := range request.Records {
events[idx] = beat.Event{
Timestamp: time.Now(),
Fields: common.MapStr{
"event_id": record.EventID,
"event_name": record.EventName,
"event_source": record.EventSource,
"event_source_arn": record.EventSourceArn,
"event_version": record.EventVersion,
"aws_region": record.AwsRegion,
// TODO: more meta data at KinesisRecord, need to check doc
"event_id": record.EventID,
"event_name": record.EventName,
"event_source": record.EventSource,
"event_source_arn": record.EventSourceArn,
"event_version": record.EventVersion,
"aws_region": record.AwsRegion,
"message": string(record.Kinesis.Data),
"kinesis_partition_key": record.Kinesis.PartitionKey,
"kinesis_schema_version": record.Kinesis.KinesisSchemaVersion,
"kinesis_sequence_number": record.Kinesis.SequenceNumber,
"kinesis_encryption_type": record.Kinesis.EncryptionType,
},
}
}
Expand Down
55 changes: 55 additions & 0 deletions x-pack/functionbeat/provider/aws/transformer/transformer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package transformer

import (
"testing"

"github.com/aws/aws-lambda-go/events"
"github.com/stretchr/testify/assert"

"github.com/elastic/beats/libbeat/common"
)

func TestKinesis(t *testing.T) {
request := events.KinesisEvent{
Records: []events.KinesisEventRecord{
events.KinesisEventRecord{
AwsRegion: "us-east-1",
EventID: "1234",
EventName: "connect",
EventSource: "web",
EventVersion: "1.0",
EventSourceArn: "arn:aws:iam::00000000:role/functionbeat",
Kinesis: events.KinesisRecord{
Data: []byte("hello world"),
PartitionKey: "abc123",
SequenceNumber: "12345",
KinesisSchemaVersion: "1.0",
EncryptionType: "test",
},
},
},
}

events := KinesisEvent(request)
assert.Equal(t, 1, len(events))

fields := common.MapStr{
"event_id": "1234",
"event_name": "connect",
"event_source": "web",
"event_source_arn": "arn:aws:iam::00000000:role/functionbeat",
"event_version": "1.0",
"aws_region": "us-east-1",
"message": "hello world",
"kinesis_partition_key": "abc123",
"kinesis_schema_version": "1.0",
"kinesis_sequence_number": "12345",
"kinesis_encryption_type": "test",
}

assert.Equal(t, fields, events[0].Fields)
}

0 comments on commit b6ea17c

Please sign in to comment.