diff --git a/packages/@aws-cdk/aws-dynamodb/lib/table.ts b/packages/@aws-cdk/aws-dynamodb/lib/table.ts index f857a7d8be470..9fb9d7f887c5b 100644 --- a/packages/@aws-cdk/aws-dynamodb/lib/table.ts +++ b/packages/@aws-cdk/aws-dynamodb/lib/table.ts @@ -18,6 +18,12 @@ const READ_DATA_ACTIONS = [ 'dynamodb:Scan' ]; +const READ_STREAM_DATA_ACTIONS = [ + "dynamodb:DescribeStream", + "dynamodb:GetRecords", + "dynamodb:GetShardIterator", +]; + const WRITE_DATA_ACTIONS = [ 'dynamodb:BatchWriteItem', 'dynamodb:PutItem', @@ -174,6 +180,18 @@ export interface LocalSecondaryIndexProps extends SecondaryIndexProps { * Provides a DynamoDB table. */ export class Table extends Construct { + /** + * Permits an IAM Principal to list all DynamoDB Streams. + * @param principal The principal (no-op if undefined) + */ + public static grantListStreams(principal?: iam.IPrincipal): void { + if (principal) { + principal.addToPolicy(new iam.PolicyStatement() + .addAction('dynamodb:ListStreams') + .addResource("*")); + } + } + public readonly tableArn: string; public readonly tableName: string; public readonly tableStreamArn: string; @@ -433,6 +451,21 @@ export class Table extends Construct { .addActions(...actions)); } + /** + * Adds an IAM policy statement associated with this table's stream to an + * IAM principal's policy. + * @param principal The principal (no-op if undefined) + * @param actions The set of actions to allow (i.e. "dynamodb:DescribeStream", "dynamodb:GetRecords", ...) + */ + public grantStream(principal?: iam.IPrincipal, ...actions: string[]) { + if (!principal) { + return; + } + principal.addToPolicy(new iam.PolicyStatement() + .addResource(this.tableStreamArn) + .addActions(...actions)); + } + /** * Permits an IAM principal all data read operations from this table: * BatchGetItem, GetRecords, GetShardIterator, Query, GetItem, Scan. @@ -442,6 +475,16 @@ export class Table extends Construct { this.grant(principal, ...READ_DATA_ACTIONS); } + /** + * Permis an IAM principal all stream data read operations for this + * table's stream: + * DescribeStream, GetRecords, GetShardIterator, ListStreams. + * @param principal The principal to grant access to + */ + public grantStreamRead(principal?: iam.IPrincipal) { + this.grantStream(principal, ...READ_STREAM_DATA_ACTIONS); + } + /** * Permits an IAM principal all data write operations to this table: * BatchWriteItem, PutItem, UpdateItem, DeleteItem. diff --git a/packages/@aws-cdk/aws-dynamodb/test/test.dynamodb.ts b/packages/@aws-cdk/aws-dynamodb/test/test.dynamodb.ts index 6724f9c89ed38..c487df3d2ca0e 100644 --- a/packages/@aws-cdk/aws-dynamodb/test/test.dynamodb.ts +++ b/packages/@aws-cdk/aws-dynamodb/test/test.dynamodb.ts @@ -1355,6 +1355,71 @@ export = { testGrant(test, [ '*' ], (p, t) => t.grantFullAccess(p)); }, + '"Table.grantListStreams" allows principal to list all streams'(test: Test) { + // GIVEN + const stack = new Stack(); + const user = new iam.User(stack, 'user'); + + // WHEN + Table.grantListStreams(user); + + // THEN + expect(stack).to(haveResource('AWS::IAM::Policy', { + "PolicyDocument": { + "Statement": [ + { + "Action": "dynamodb:ListStreams", + "Effect": "Allow", + "Resource": "*" + } + ], + "Version": "2012-10-17" + }, + "Users": [ { "Ref": "user2C2B57AE" } ] + })); + test.done(); + }, + + '"grantStreamRead" allows principal to read and describe the table stream"'(test: Test) { + // GIVEN + const stack = new Stack(); + const table = new Table(stack, 'my-table', { + partitionKey: { + name: 'id', + type: AttributeType.String + }, + streamSpecification: StreamViewType.NewImage + }); + const user = new iam.User(stack, 'user'); + + // WHEN + table.grantStreamRead(user); + + // THEN + expect(stack).to(haveResource('AWS::IAM::Policy', { + "PolicyDocument": { + "Statement": [ + { + "Action": [ + "dynamodb:DescribeStream", + "dynamodb:GetRecords", + "dynamodb:GetShardIterator" + ], + "Effect": "Allow", + "Resource": { + "Fn::GetAtt": [ + "mytable0324D45C", + "StreamArn" + ] + } + } + ], + "Version": "2012-10-17" + }, + "Users": [ { "Ref": "user2C2B57AE" } ] + })); + test.done(); + }, 'if table has an index grant gives access to the index'(test: Test) { // GIVEN const stack = new Stack(); @@ -1389,10 +1454,10 @@ export = { ], "Version": "2012-10-17" }, + "Users": [ { "Ref": "user2C2B57AE" } ] })); test.done(); - }, - + } }, }; diff --git a/packages/@aws-cdk/aws-lambda-event-sources/README.md b/packages/@aws-cdk/aws-lambda-event-sources/README.md index be2f0dff65393..50c4d018a38f6 100644 --- a/packages/@aws-cdk/aws-lambda-event-sources/README.md +++ b/packages/@aws-cdk/aws-lambda-event-sources/README.md @@ -94,6 +94,33 @@ times. After three tries, if Amazon SNS still could not successfully invoke the Lambda function, then Amazon SNS will send a delivery status failure message to CloudWatch. +### DynamoDB Streams + +You can write Lambda functions to process change events from a DynamoDB Table. An event is emitted to a DynamoDB stream (if configured) whenever a write (Put, Delete, Update) +operation is performed against the table. See [Using AWS Lambda with Amazon DynamoDB](https://docs.aws.amazon.com/lambda/latest/dg/with-ddb.html) for more information. + +To process events with a Lambda function, first create or update a DynamoDB table and enable a `streamSpecification` configuration. Then, create a `DynamoEventSource` +and add it to your Lambda function. The following parameters will impact Amazon DynamoDB's polling behavior: + +* __batchSize__: Determines how many records are buffered before invoking your lambnda function - could impact your function's memory usage (if too high) and ability to keep up with incoming data velocity (if too low). +* __startingPosition__: Will determine where to being consumption, either at the most recent ('LATEST') record or the oldest record ('TRIM_HORIZON'). 'TRIM_HORIZON' will ensure you process all available data, while 'LATEST' will ignore all reocrds that arrived prior to attaching the event source. + +```ts +import dynamodb = require('@aws-cdk/aws-dynamodb'); +import lambda = require('@aws-cdk/aws-lambda'); +import { DynamoEventSource } from '@aws-cdk/aws-lambda-event-sources'; + +const table = new dynamodb.Table(..., { + partitionKey: ..., + streamSpecification: dynamodb.StreamViewType.NewImage // make sure stream is configured +}); + +const function = new lambda.Function(...); +function.addEventSource(new DynamoEventSource(table, { + startingPosition: lambda.StartingPosition.TrimHorizon +})); +``` + ### Kinesis You can write Lambda functions to process streaming data in Amazon Kinesis Streams. For more information about Amazon SQS, see [Amazon Kinesis diff --git a/packages/@aws-cdk/aws-lambda-event-sources/lib/dynamodb.ts b/packages/@aws-cdk/aws-lambda-event-sources/lib/dynamodb.ts new file mode 100644 index 0000000000000..e1b8ac388f3c3 --- /dev/null +++ b/packages/@aws-cdk/aws-lambda-event-sources/lib/dynamodb.ts @@ -0,0 +1,43 @@ +import dynamodb = require('@aws-cdk/aws-dynamodb'); +import lambda = require('@aws-cdk/aws-lambda'); + +export interface DynamoEventSourceProps { + /** + * The largest number of records that AWS Lambda will retrieve from your event + * source at the time of invoking your function. Your function receives an + * event with all the retrieved records. + * + * Valid Range: Minimum value of 1. Maximum value of 1000. + * + * @default 100 + */ + batchSize?: number; + + /** + * Where to begin consuming the DynamoDB stream. + */ + startingPosition: lambda.StartingPosition; +} + +/** + * Use an Amazon DynamoDB stream as an event source for AWS Lambda. + */ +export class DynamoEventSource implements lambda.IEventSource { + constructor(private readonly table: dynamodb.Table, private readonly props: DynamoEventSourceProps) { + if (this.props.batchSize !== undefined && (this.props.batchSize < 1 || this.props.batchSize > 1000)) { + throw new Error(`Maximum batch size must be between 1 and 1000 inclusive (given ${this.props.batchSize})`); + } + } + + public bind(target: lambda.FunctionBase) { + new lambda.EventSourceMapping(target, `DynamoDBEventSource:${this.table.node.uniqueId}`, { + target, + batchSize: this.props.batchSize || 100, + eventSourceArn: this.table.tableStreamArn, + startingPosition: this.props.startingPosition + }); + + this.table.grantStreamRead(target.role); + dynamodb.Table.grantListStreams(target.role); + } +} diff --git a/packages/@aws-cdk/aws-lambda-event-sources/lib/index.ts b/packages/@aws-cdk/aws-lambda-event-sources/lib/index.ts index 7e6f6ab865a2b..2a1a19dc4c07f 100644 --- a/packages/@aws-cdk/aws-lambda-event-sources/lib/index.ts +++ b/packages/@aws-cdk/aws-lambda-event-sources/lib/index.ts @@ -1,3 +1,4 @@ +export * from './dynamodb'; export * from './kinesis'; export * from './sqs'; export * from './s3'; diff --git a/packages/@aws-cdk/aws-lambda-event-sources/package.json b/packages/@aws-cdk/aws-lambda-event-sources/package.json index 759a2ffb626c4..6e54db1d37db3 100644 --- a/packages/@aws-cdk/aws-lambda-event-sources/package.json +++ b/packages/@aws-cdk/aws-lambda-event-sources/package.json @@ -56,6 +56,7 @@ "pkglint": "^0.22.0" }, "dependencies": { + "@aws-cdk/aws-dynamodb": "^0.22.0", "@aws-cdk/aws-events": "^0.22.0", "@aws-cdk/aws-iam": "^0.22.0", "@aws-cdk/aws-kinesis": "^0.22.0", @@ -67,6 +68,7 @@ }, "homepage": "https://github.com/awslabs/aws-cdk", "peerDependencies": { + "@aws-cdk/aws-dynamodb": "^0.22.0", "@aws-cdk/aws-kinesis": "^0.22.0", "@aws-cdk/aws-lambda": "^0.22.0", "@aws-cdk/aws-s3": "^0.22.0", diff --git a/packages/@aws-cdk/aws-lambda-event-sources/test/integ.dynamodb.expected.json b/packages/@aws-cdk/aws-lambda-event-sources/test/integ.dynamodb.expected.json new file mode 100644 index 0000000000000..95f12af72cf00 --- /dev/null +++ b/packages/@aws-cdk/aws-lambda-event-sources/test/integ.dynamodb.expected.json @@ -0,0 +1,130 @@ +{ + "Resources": { + "FServiceRole3AC82EE1": { + "Type": "AWS::IAM::Role", + "Properties": { + "AssumeRolePolicyDocument": { + "Statement": [ + { + "Action": "sts:AssumeRole", + "Effect": "Allow", + "Principal": { + "Service": "lambda.amazonaws.com" + } + } + ], + "Version": "2012-10-17" + }, + "ManagedPolicyArns": [ + { + "Fn::Join": [ + "", + [ + "arn:", + { + "Ref": "AWS::Partition" + }, + ":iam::aws:policy/service-role/AWSLambdaBasicExecutionRole" + ] + ] + } + ] + } + }, + "FServiceRoleDefaultPolicy17A19BFA": { + "Type": "AWS::IAM::Policy", + "Properties": { + "PolicyDocument": { + "Statement": [ + { + "Action": [ + "dynamodb:DescribeStream", + "dynamodb:GetRecords", + "dynamodb:GetShardIterator" + ], + "Effect": "Allow", + "Resource": { + "Fn::GetAtt": [ + "TD925BC7E", + "StreamArn" + ] + } + }, + { + "Action": "dynamodb:ListStreams", + "Effect": "Allow", + "Resource": "*" + } + ], + "Version": "2012-10-17" + }, + "PolicyName": "FServiceRoleDefaultPolicy17A19BFA", + "Roles": [ + { + "Ref": "FServiceRole3AC82EE1" + } + ] + } + }, + "FC4345940": { + "Type": "AWS::Lambda::Function", + "Properties": { + "Code": { + "ZipFile": "exports.handler = async function handler(event) {\n console.log('event:', JSON.stringify(event, undefined, 2));\n return { event };\n}" + }, + "Handler": "index.handler", + "Role": { + "Fn::GetAtt": [ + "FServiceRole3AC82EE1", + "Arn" + ] + }, + "Runtime": "nodejs8.10" + }, + "DependsOn": [ + "FServiceRole3AC82EE1", + "FServiceRoleDefaultPolicy17A19BFA" + ] + }, + "FDynamoDBEventSourcelambdaeventsourcedynamodbT7967476AE652DA48": { + "Type": "AWS::Lambda::EventSourceMapping", + "Properties": { + "EventSourceArn": { + "Fn::GetAtt": [ + "TD925BC7E", + "StreamArn" + ] + }, + "FunctionName": { + "Ref": "FC4345940" + }, + "BatchSize": 5, + "StartingPosition": "TRIM_HORIZON" + } + }, + "TD925BC7E": { + "Type": "AWS::DynamoDB::Table", + "Properties": { + "KeySchema": [ + { + "AttributeName": "id", + "KeyType": "HASH" + } + ], + "AttributeDefinitions": [ + { + "AttributeName": "id", + "AttributeType": "S" + } + ], + "ProvisionedThroughput": { + "ReadCapacityUnits": 5, + "WriteCapacityUnits": 5 + }, + "StreamSpecification": { + "StreamViewType": "NEW_IMAGE" + } + } + } + } +} diff --git a/packages/@aws-cdk/aws-lambda-event-sources/test/integ.dynamodb.ts b/packages/@aws-cdk/aws-lambda-event-sources/test/integ.dynamodb.ts new file mode 100644 index 0000000000000..d50cc58e2cbb5 --- /dev/null +++ b/packages/@aws-cdk/aws-lambda-event-sources/test/integ.dynamodb.ts @@ -0,0 +1,29 @@ +import dynamodb = require('@aws-cdk/aws-dynamodb'); +import lambda = require('@aws-cdk/aws-lambda'); +import cdk = require('@aws-cdk/cdk'); +import { DynamoEventSource } from '../lib'; +import { TestFunction } from './test-function'; + +class DynamoEventSourceTest extends cdk.Stack { + constructor(scope: cdk.App, id: string) { + super(scope, id); + + const fn = new TestFunction(this, 'F'); + const queue = new dynamodb.Table(this, 'T', { + partitionKey: { + name: 'id', + type: dynamodb.AttributeType.String + }, + streamSpecification: dynamodb.StreamViewType.NewImage + }); + + fn.addEventSource(new DynamoEventSource(queue, { + batchSize: 5, + startingPosition: lambda.StartingPosition.TrimHorizon + })); + } +} + +const app = new cdk.App(); +new DynamoEventSourceTest(app, 'lambda-event-source-dynamodb'); +app.run(); \ No newline at end of file diff --git a/packages/@aws-cdk/aws-lambda-event-sources/test/test.dynamo.ts b/packages/@aws-cdk/aws-lambda-event-sources/test/test.dynamo.ts new file mode 100644 index 0000000000000..75e7461c35a7d --- /dev/null +++ b/packages/@aws-cdk/aws-lambda-event-sources/test/test.dynamo.ts @@ -0,0 +1,155 @@ +import { expect, haveResource } from '@aws-cdk/assert'; +import dynamodb = require('@aws-cdk/aws-dynamodb'); +import lambda = require('@aws-cdk/aws-lambda'); +import cdk = require('@aws-cdk/cdk'); +import { Test } from 'nodeunit'; +import sources = require('../lib'); +import { TestFunction } from './test-function'; + +// tslint:disable:object-literal-key-quotes + +export = { + 'sufficiently complex example'(test: Test) { + // GIVEN + const stack = new cdk.Stack(); + const fn = new TestFunction(stack, 'Fn'); + const table = new dynamodb.Table(stack, 'T', { + partitionKey: { + name: 'id', + type: dynamodb.AttributeType.String + }, + streamSpecification: dynamodb.StreamViewType.NewImage + }); + + // WHEN + fn.addEventSource(new sources.DynamoEventSource(table, { + startingPosition: lambda.StartingPosition.TrimHorizon + })); + + // THEN + expect(stack).to(haveResource('AWS::IAM::Policy', { + "PolicyDocument": { + "Statement": [ + { + "Action": [ + "dynamodb:DescribeStream", + "dynamodb:GetRecords", + "dynamodb:GetShardIterator", + ], + "Effect": "Allow", + "Resource": { + "Fn::GetAtt": [ + "TD925BC7E", + "StreamArn" + ] + } + }, + { + "Action": "dynamodb:ListStreams", + "Effect": "Allow", + "Resource": "*" + } + ], + "Version": "2012-10-17" + }, + "PolicyName": "FnServiceRoleDefaultPolicyC6A839BF", + "Roles": [{ + "Ref": "FnServiceRoleB9001A96" + }] + })); + + expect(stack).to(haveResource('AWS::Lambda::EventSourceMapping', { + "EventSourceArn": { + "Fn::GetAtt": [ + "TD925BC7E", + "StreamArn" + ] + }, + "FunctionName": { + "Ref": "Fn9270CBC0" + }, + "BatchSize": 100, + "StartingPosition": "TRIM_HORIZON" + })); + + test.done(); + }, + + 'specific batch size'(test: Test) { + // GIVEN + const stack = new cdk.Stack(); + const fn = new TestFunction(stack, 'Fn'); + const table = new dynamodb.Table(stack, 'T', { + partitionKey: { + name: 'id', + type: dynamodb.AttributeType.String + }, + streamSpecification: dynamodb.StreamViewType.NewImage + }); + + // WHEN + fn.addEventSource(new sources.DynamoEventSource(table, { + batchSize: 50, + startingPosition: lambda.StartingPosition.Latest + })); + + // THEN + expect(stack).to(haveResource('AWS::Lambda::EventSourceMapping', { + "EventSourceArn": { + "Fn::GetAtt": [ + "TD925BC7E", + "StreamArn" + ] + }, + "FunctionName": { + "Ref": "Fn9270CBC0" + }, + "BatchSize": 50, + "StartingPosition": "LATEST" + })); + + test.done(); + }, + + 'fails if batch size < 1'(test: Test) { + // GIVEN + const stack = new cdk.Stack(); + const fn = new TestFunction(stack, 'Fn'); + const table = new dynamodb.Table(stack, 'T', { + partitionKey: { + name: 'id', + type: dynamodb.AttributeType.String + }, + streamSpecification: dynamodb.StreamViewType.NewImage + }); + + // WHEN + test.throws(() => fn.addEventSource(new sources.DynamoEventSource(table, { + batchSize: 0, + startingPosition: lambda.StartingPosition.Latest + })), /Maximum batch size must be between 1 and 1000 inclusive \(given 0\)/); + + test.done(); + }, + + 'fails if batch size > 1000'(test: Test) { + // GIVEN + const stack = new cdk.Stack(); + const fn = new TestFunction(stack, 'Fn'); + const table = new dynamodb.Table(stack, 'T', { + partitionKey: { + name: 'id', + type: dynamodb.AttributeType.String + }, + streamSpecification: dynamodb.StreamViewType.NewImage + }); + + // WHEN + test.throws(() => fn.addEventSource(new sources.DynamoEventSource(table, { + batchSize: 1001, + startingPosition: lambda.StartingPosition.Latest + })), /Maximum batch size must be between 1 and 1000 inclusive \(given 1001\)/); + + test.done(); + }, +};