Skip to content

Commit

Permalink
feat(lambda-event-sources) add DynamoEventSource (#1563)
Browse files Browse the repository at this point in the history
Add a Dynamo EventSource for consuming a table's stream from Lambda.
  • Loading branch information
Sam Goodwin authored Jan 23, 2019
1 parent 9f8bfa5 commit d8207e3
Show file tree
Hide file tree
Showing 9 changed files with 497 additions and 2 deletions.
43 changes: 43 additions & 0 deletions packages/@aws-cdk/aws-dynamodb/lib/table.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,12 @@ const READ_DATA_ACTIONS = [
'dynamodb:Scan'
];

const READ_STREAM_DATA_ACTIONS = [
"dynamodb:DescribeStream",
"dynamodb:GetRecords",
"dynamodb:GetShardIterator",
];

const WRITE_DATA_ACTIONS = [
'dynamodb:BatchWriteItem',
'dynamodb:PutItem',
Expand Down Expand Up @@ -174,6 +180,18 @@ export interface LocalSecondaryIndexProps extends SecondaryIndexProps {
* Provides a DynamoDB table.
*/
export class Table extends Construct {
/**
* Permits an IAM Principal to list all DynamoDB Streams.
* @param principal The principal (no-op if undefined)
*/
public static grantListStreams(principal?: iam.IPrincipal): void {
if (principal) {
principal.addToPolicy(new iam.PolicyStatement()
.addAction('dynamodb:ListStreams')
.addResource("*"));
}
}

public readonly tableArn: string;
public readonly tableName: string;
public readonly tableStreamArn: string;
Expand Down Expand Up @@ -433,6 +451,21 @@ export class Table extends Construct {
.addActions(...actions));
}

/**
* Adds an IAM policy statement associated with this table's stream to an
* IAM principal's policy.
* @param principal The principal (no-op if undefined)
* @param actions The set of actions to allow (i.e. "dynamodb:DescribeStream", "dynamodb:GetRecords", ...)
*/
public grantStream(principal?: iam.IPrincipal, ...actions: string[]) {
if (!principal) {
return;
}
principal.addToPolicy(new iam.PolicyStatement()
.addResource(this.tableStreamArn)
.addActions(...actions));
}

/**
* Permits an IAM principal all data read operations from this table:
* BatchGetItem, GetRecords, GetShardIterator, Query, GetItem, Scan.
Expand All @@ -442,6 +475,16 @@ export class Table extends Construct {
this.grant(principal, ...READ_DATA_ACTIONS);
}

/**
* Permis an IAM principal all stream data read operations for this
* table's stream:
* DescribeStream, GetRecords, GetShardIterator, ListStreams.
* @param principal The principal to grant access to
*/
public grantStreamRead(principal?: iam.IPrincipal) {
this.grantStream(principal, ...READ_STREAM_DATA_ACTIONS);
}

/**
* Permits an IAM principal all data write operations to this table:
* BatchWriteItem, PutItem, UpdateItem, DeleteItem.
Expand Down
69 changes: 67 additions & 2 deletions packages/@aws-cdk/aws-dynamodb/test/test.dynamodb.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1355,6 +1355,71 @@ export = {
testGrant(test, [ '*' ], (p, t) => t.grantFullAccess(p));
},

'"Table.grantListStreams" allows principal to list all streams'(test: Test) {
// GIVEN
const stack = new Stack();
const user = new iam.User(stack, 'user');

// WHEN
Table.grantListStreams(user);

// THEN
expect(stack).to(haveResource('AWS::IAM::Policy', {
"PolicyDocument": {
"Statement": [
{
"Action": "dynamodb:ListStreams",
"Effect": "Allow",
"Resource": "*"
}
],
"Version": "2012-10-17"
},
"Users": [ { "Ref": "user2C2B57AE" } ]
}));
test.done();
},

'"grantStreamRead" allows principal to read and describe the table stream"'(test: Test) {
// GIVEN
const stack = new Stack();
const table = new Table(stack, 'my-table', {
partitionKey: {
name: 'id',
type: AttributeType.String
},
streamSpecification: StreamViewType.NewImage
});
const user = new iam.User(stack, 'user');

// WHEN
table.grantStreamRead(user);

// THEN
expect(stack).to(haveResource('AWS::IAM::Policy', {
"PolicyDocument": {
"Statement": [
{
"Action": [
"dynamodb:DescribeStream",
"dynamodb:GetRecords",
"dynamodb:GetShardIterator"
],
"Effect": "Allow",
"Resource": {
"Fn::GetAtt": [
"mytable0324D45C",
"StreamArn"
]
}
}
],
"Version": "2012-10-17"
},
"Users": [ { "Ref": "user2C2B57AE" } ]
}));
test.done();
},
'if table has an index grant gives access to the index'(test: Test) {
// GIVEN
const stack = new Stack();
Expand Down Expand Up @@ -1389,10 +1454,10 @@ export = {
],
"Version": "2012-10-17"
},
"Users": [ { "Ref": "user2C2B57AE" } ]
}));
test.done();
},

}
},
};

Expand Down
27 changes: 27 additions & 0 deletions packages/@aws-cdk/aws-lambda-event-sources/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,33 @@ times. After three tries, if Amazon SNS still could not successfully invoke the
Lambda function, then Amazon SNS will send a delivery status failure message to
CloudWatch.
### DynamoDB Streams
You can write Lambda functions to process change events from a DynamoDB Table. An event is emitted to a DynamoDB stream (if configured) whenever a write (Put, Delete, Update)
operation is performed against the table. See [Using AWS Lambda with Amazon DynamoDB](https://docs.aws.amazon.com/lambda/latest/dg/with-ddb.html) for more information.
To process events with a Lambda function, first create or update a DynamoDB table and enable a `streamSpecification` configuration. Then, create a `DynamoEventSource`
and add it to your Lambda function. The following parameters will impact Amazon DynamoDB's polling behavior:
* __batchSize__: Determines how many records are buffered before invoking your lambnda function - could impact your function's memory usage (if too high) and ability to keep up with incoming data velocity (if too low).
* __startingPosition__: Will determine where to being consumption, either at the most recent ('LATEST') record or the oldest record ('TRIM_HORIZON'). 'TRIM_HORIZON' will ensure you process all available data, while 'LATEST' will ignore all reocrds that arrived prior to attaching the event source.
```ts
import dynamodb = require('@aws-cdk/aws-dynamodb');
import lambda = require('@aws-cdk/aws-lambda');
import { DynamoEventSource } from '@aws-cdk/aws-lambda-event-sources';

const table = new dynamodb.Table(..., {
partitionKey: ...,
streamSpecification: dynamodb.StreamViewType.NewImage // make sure stream is configured
});

const function = new lambda.Function(...);
function.addEventSource(new DynamoEventSource(table, {
startingPosition: lambda.StartingPosition.TrimHorizon
}));
```
### Kinesis
You can write Lambda functions to process streaming data in Amazon Kinesis Streams. For more information about Amazon SQS, see [Amazon Kinesis
Expand Down
43 changes: 43 additions & 0 deletions packages/@aws-cdk/aws-lambda-event-sources/lib/dynamodb.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
import dynamodb = require('@aws-cdk/aws-dynamodb');
import lambda = require('@aws-cdk/aws-lambda');

export interface DynamoEventSourceProps {
/**
* The largest number of records that AWS Lambda will retrieve from your event
* source at the time of invoking your function. Your function receives an
* event with all the retrieved records.
*
* Valid Range: Minimum value of 1. Maximum value of 1000.
*
* @default 100
*/
batchSize?: number;

/**
* Where to begin consuming the DynamoDB stream.
*/
startingPosition: lambda.StartingPosition;
}

/**
* Use an Amazon DynamoDB stream as an event source for AWS Lambda.
*/
export class DynamoEventSource implements lambda.IEventSource {
constructor(private readonly table: dynamodb.Table, private readonly props: DynamoEventSourceProps) {
if (this.props.batchSize !== undefined && (this.props.batchSize < 1 || this.props.batchSize > 1000)) {
throw new Error(`Maximum batch size must be between 1 and 1000 inclusive (given ${this.props.batchSize})`);
}
}

public bind(target: lambda.FunctionBase) {
new lambda.EventSourceMapping(target, `DynamoDBEventSource:${this.table.node.uniqueId}`, {
target,
batchSize: this.props.batchSize || 100,
eventSourceArn: this.table.tableStreamArn,
startingPosition: this.props.startingPosition
});

this.table.grantStreamRead(target.role);
dynamodb.Table.grantListStreams(target.role);
}
}
1 change: 1 addition & 0 deletions packages/@aws-cdk/aws-lambda-event-sources/lib/index.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
export * from './dynamodb';
export * from './kinesis';
export * from './sqs';
export * from './s3';
Expand Down
2 changes: 2 additions & 0 deletions packages/@aws-cdk/aws-lambda-event-sources/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
"pkglint": "^0.22.0"
},
"dependencies": {
"@aws-cdk/aws-dynamodb": "^0.22.0",
"@aws-cdk/aws-events": "^0.22.0",
"@aws-cdk/aws-iam": "^0.22.0",
"@aws-cdk/aws-kinesis": "^0.22.0",
Expand All @@ -67,6 +68,7 @@
},
"homepage": "https://github.com/awslabs/aws-cdk",
"peerDependencies": {
"@aws-cdk/aws-dynamodb": "^0.22.0",
"@aws-cdk/aws-kinesis": "^0.22.0",
"@aws-cdk/aws-lambda": "^0.22.0",
"@aws-cdk/aws-s3": "^0.22.0",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
{
"Resources": {
"FServiceRole3AC82EE1": {
"Type": "AWS::IAM::Role",
"Properties": {
"AssumeRolePolicyDocument": {
"Statement": [
{
"Action": "sts:AssumeRole",
"Effect": "Allow",
"Principal": {
"Service": "lambda.amazonaws.com"
}
}
],
"Version": "2012-10-17"
},
"ManagedPolicyArns": [
{
"Fn::Join": [
"",
[
"arn:",
{
"Ref": "AWS::Partition"
},
":iam::aws:policy/service-role/AWSLambdaBasicExecutionRole"
]
]
}
]
}
},
"FServiceRoleDefaultPolicy17A19BFA": {
"Type": "AWS::IAM::Policy",
"Properties": {
"PolicyDocument": {
"Statement": [
{
"Action": [
"dynamodb:DescribeStream",
"dynamodb:GetRecords",
"dynamodb:GetShardIterator"
],
"Effect": "Allow",
"Resource": {
"Fn::GetAtt": [
"TD925BC7E",
"StreamArn"
]
}
},
{
"Action": "dynamodb:ListStreams",
"Effect": "Allow",
"Resource": "*"
}
],
"Version": "2012-10-17"
},
"PolicyName": "FServiceRoleDefaultPolicy17A19BFA",
"Roles": [
{
"Ref": "FServiceRole3AC82EE1"
}
]
}
},
"FC4345940": {
"Type": "AWS::Lambda::Function",
"Properties": {
"Code": {
"ZipFile": "exports.handler = async function handler(event) {\n console.log('event:', JSON.stringify(event, undefined, 2));\n return { event };\n}"
},
"Handler": "index.handler",
"Role": {
"Fn::GetAtt": [
"FServiceRole3AC82EE1",
"Arn"
]
},
"Runtime": "nodejs8.10"
},
"DependsOn": [
"FServiceRole3AC82EE1",
"FServiceRoleDefaultPolicy17A19BFA"
]
},
"FDynamoDBEventSourcelambdaeventsourcedynamodbT7967476AE652DA48": {
"Type": "AWS::Lambda::EventSourceMapping",
"Properties": {
"EventSourceArn": {
"Fn::GetAtt": [
"TD925BC7E",
"StreamArn"
]
},
"FunctionName": {
"Ref": "FC4345940"
},
"BatchSize": 5,
"StartingPosition": "TRIM_HORIZON"
}
},
"TD925BC7E": {
"Type": "AWS::DynamoDB::Table",
"Properties": {
"KeySchema": [
{
"AttributeName": "id",
"KeyType": "HASH"
}
],
"AttributeDefinitions": [
{
"AttributeName": "id",
"AttributeType": "S"
}
],
"ProvisionedThroughput": {
"ReadCapacityUnits": 5,
"WriteCapacityUnits": 5
},
"StreamSpecification": {
"StreamViewType": "NEW_IMAGE"
}
}
}
}
}
Loading

0 comments on commit d8207e3

Please sign in to comment.