diff --git a/packages/@aws-cdk-testing/framework-integ/test/aws-lambda-event-sources/test/integ.kafka-selfmanaged.js.snapshot/LambdaEventSourceKafkaSelfManagedTestDefaultTestDeployAssertAF78BD0F.assets.json b/packages/@aws-cdk-testing/framework-integ/test/aws-lambda-event-sources/test/integ.kafka-selfmanaged.js.snapshot/LambdaEventSourceKafkaSelfManagedTestDefaultTestDeployAssertAF78BD0F.assets.json index a01ffb4d5f4c8..18daf0934754d 100644 --- a/packages/@aws-cdk-testing/framework-integ/test/aws-lambda-event-sources/test/integ.kafka-selfmanaged.js.snapshot/LambdaEventSourceKafkaSelfManagedTestDefaultTestDeployAssertAF78BD0F.assets.json +++ b/packages/@aws-cdk-testing/framework-integ/test/aws-lambda-event-sources/test/integ.kafka-selfmanaged.js.snapshot/LambdaEventSourceKafkaSelfManagedTestDefaultTestDeployAssertAF78BD0F.assets.json @@ -1,5 +1,5 @@ { - "version": "36.0.0", + "version": "38.0.1", "files": { "21fbb51d7b23f6a6c262b46a9caee79d744a3ac019fd45422d988b96d44b2a22": { "source": { diff --git a/packages/@aws-cdk-testing/framework-integ/test/aws-lambda-event-sources/test/integ.kafka-selfmanaged.js.snapshot/cdk.out b/packages/@aws-cdk-testing/framework-integ/test/aws-lambda-event-sources/test/integ.kafka-selfmanaged.js.snapshot/cdk.out index 1f0068d32659a..c6e612584e352 100644 --- a/packages/@aws-cdk-testing/framework-integ/test/aws-lambda-event-sources/test/integ.kafka-selfmanaged.js.snapshot/cdk.out +++ b/packages/@aws-cdk-testing/framework-integ/test/aws-lambda-event-sources/test/integ.kafka-selfmanaged.js.snapshot/cdk.out @@ -1 +1 @@ -{"version":"36.0.0"} \ No newline at end of file +{"version":"38.0.1"} \ No newline at end of file diff --git a/packages/@aws-cdk-testing/framework-integ/test/aws-lambda-event-sources/test/integ.kafka-selfmanaged.js.snapshot/integ.json b/packages/@aws-cdk-testing/framework-integ/test/aws-lambda-event-sources/test/integ.kafka-selfmanaged.js.snapshot/integ.json index eb53722c5afaf..b2142e0e738f8 100644 --- a/packages/@aws-cdk-testing/framework-integ/test/aws-lambda-event-sources/test/integ.kafka-selfmanaged.js.snapshot/integ.json +++ b/packages/@aws-cdk-testing/framework-integ/test/aws-lambda-event-sources/test/integ.kafka-selfmanaged.js.snapshot/integ.json @@ -1,5 +1,5 @@ { - "version": "36.0.0", + "version": "38.0.1", "testCases": { "LambdaEventSourceKafkaSelfManagedTest/DefaultTest": { "stacks": [ diff --git a/packages/@aws-cdk-testing/framework-integ/test/aws-lambda-event-sources/test/integ.kafka-selfmanaged.js.snapshot/lambda-event-source-kafka-self-managed.assets.json b/packages/@aws-cdk-testing/framework-integ/test/aws-lambda-event-sources/test/integ.kafka-selfmanaged.js.snapshot/lambda-event-source-kafka-self-managed.assets.json index cb4ec6e990114..983f0b88ef58f 100644 --- a/packages/@aws-cdk-testing/framework-integ/test/aws-lambda-event-sources/test/integ.kafka-selfmanaged.js.snapshot/lambda-event-source-kafka-self-managed.assets.json +++ b/packages/@aws-cdk-testing/framework-integ/test/aws-lambda-event-sources/test/integ.kafka-selfmanaged.js.snapshot/lambda-event-source-kafka-self-managed.assets.json @@ -1,7 +1,7 @@ { - "version": "36.0.0", + "version": "38.0.1", "files": { - "4bf07b5cad381e52a796b0a42748934cce430e155ffe31f0366eef200d40356f": { + "cb2d7d48eaf634edd0acc6e6475e904f57e491fce9fa3ac839e93cba823616c1": { "source": { "path": "lambda-event-source-kafka-self-managed.template.json", "packaging": "file" @@ -9,7 +9,7 @@ "destinations": { "current_account-current_region": { "bucketName": "cdk-hnb659fds-assets-${AWS::AccountId}-${AWS::Region}", - "objectKey": "4bf07b5cad381e52a796b0a42748934cce430e155ffe31f0366eef200d40356f.json", + "objectKey": "cb2d7d48eaf634edd0acc6e6475e904f57e491fce9fa3ac839e93cba823616c1.json", "assumeRoleArn": "arn:${AWS::Partition}:iam::${AWS::AccountId}:role/cdk-hnb659fds-file-publishing-role-${AWS::AccountId}-${AWS::Region}" } } diff --git a/packages/@aws-cdk-testing/framework-integ/test/aws-lambda-event-sources/test/integ.kafka-selfmanaged.js.snapshot/lambda-event-source-kafka-self-managed.template.json b/packages/@aws-cdk-testing/framework-integ/test/aws-lambda-event-sources/test/integ.kafka-selfmanaged.js.snapshot/lambda-event-source-kafka-self-managed.template.json index dd921a80f1344..5a5f9daa01865 100644 --- a/packages/@aws-cdk-testing/framework-integ/test/aws-lambda-event-sources/test/integ.kafka-selfmanaged.js.snapshot/lambda-event-source-kafka-self-managed.template.json +++ b/packages/@aws-cdk-testing/framework-integ/test/aws-lambda-event-sources/test/integ.kafka-selfmanaged.js.snapshot/lambda-event-source-kafka-self-managed.template.json @@ -322,6 +322,138 @@ "my-test-topic2" ] } + }, + "F3ServiceRole2F65FFC0": { + "Type": "AWS::IAM::Role", + "Properties": { + "AssumeRolePolicyDocument": { + "Statement": [ + { + "Action": "sts:AssumeRole", + "Effect": "Allow", + "Principal": { + "Service": "lambda.amazonaws.com" + } + } + ], + "Version": "2012-10-17" + }, + "ManagedPolicyArns": [ + { + "Fn::Join": [ + "", + [ + "arn:", + { + "Ref": "AWS::Partition" + }, + ":iam::aws:policy/service-role/AWSLambdaBasicExecutionRole" + ] + ] + } + ] + } + }, + "F3ServiceRoleDefaultPolicy1C0463D1": { + "Type": "AWS::IAM::Policy", + "Properties": { + "PolicyDocument": { + "Statement": [ + { + "Action": [ + "secretsmanager:DescribeSecret", + "secretsmanager:GetSecretValue" + ], + "Effect": "Allow", + "Resource": [ + { + "Ref": "S509448A1" + }, + { + "Ref": "SC0855C491" + } + ] + } + ], + "Version": "2012-10-17" + }, + "PolicyName": "F3ServiceRoleDefaultPolicy1C0463D1", + "Roles": [ + { + "Ref": "F3ServiceRole2F65FFC0" + } + ] + } + }, + "F38FF9B13A": { + "Type": "AWS::Lambda::Function", + "Properties": { + "Code": { + "ZipFile": "exports.handler = async function handler(event) {\n console.log('event:', JSON.stringify(event, undefined, 2));\n return { event };\n}" + }, + "Handler": "index.handler", + "Role": { + "Fn::GetAtt": [ + "F3ServiceRole2F65FFC0", + "Arn" + ] + }, + "Runtime": "nodejs18.x" + }, + "DependsOn": [ + "F3ServiceRoleDefaultPolicy1C0463D1", + "F3ServiceRole2F65FFC0" + ] + }, + "F3KafkaEventSource838c4d5ff3c99c1a617120adfca83e5bmytesttopic3ED015F25": { + "Type": "AWS::Lambda::EventSourceMapping", + "Properties": { + "BatchSize": 100, + "FilterCriteria": { + "Filters": [ + { + "Pattern": "{\"numericEquals\":[{\"numeric\":[\"=\",1]}]}" + } + ] + }, + "FunctionName": { + "Ref": "F38FF9B13A" + }, + "ProvisionedPollerConfig": { + "MaximumPollers": 3, + "MinimumPollers": 1 + }, + "SelfManagedEventSource": { + "Endpoints": { + "KafkaBootstrapServers": [ + "my-self-hosted-kafka-broker-1:9092", + "my-self-hosted-kafka-broker-2:9092", + "my-self-hosted-kafka-broker-3:9092" + ] + } + }, + "SelfManagedKafkaEventSourceConfig": { + "ConsumerGroupId": "myTestConsumerGroup3" + }, + "SourceAccessConfigurations": [ + { + "Type": "CLIENT_CERTIFICATE_TLS_AUTH", + "URI": { + "Ref": "SC0855C491" + } + }, + { + "Type": "SERVER_ROOT_CA_CERTIFICATE", + "URI": { + "Ref": "S509448A1" + } + } + ], + "StartingPosition": "TRIM_HORIZON", + "Topics": [ + "my-test-topic3" + ] + } } }, "Parameters": { diff --git a/packages/@aws-cdk-testing/framework-integ/test/aws-lambda-event-sources/test/integ.kafka-selfmanaged.js.snapshot/manifest.json b/packages/@aws-cdk-testing/framework-integ/test/aws-lambda-event-sources/test/integ.kafka-selfmanaged.js.snapshot/manifest.json index 06655a65cd8cb..d41cc1cc8586e 100644 --- a/packages/@aws-cdk-testing/framework-integ/test/aws-lambda-event-sources/test/integ.kafka-selfmanaged.js.snapshot/manifest.json +++ b/packages/@aws-cdk-testing/framework-integ/test/aws-lambda-event-sources/test/integ.kafka-selfmanaged.js.snapshot/manifest.json @@ -1,5 +1,5 @@ { - "version": "36.0.0", + "version": "38.0.1", "artifacts": { "lambda-event-source-kafka-self-managed.assets": { "type": "cdk:asset-manifest", @@ -16,9 +16,10 @@ "templateFile": "lambda-event-source-kafka-self-managed.template.json", "terminationProtection": false, "validateOnSynth": false, + "notificationArns": [], "assumeRoleArn": "arn:${AWS::Partition}:iam::${AWS::AccountId}:role/cdk-hnb659fds-deploy-role-${AWS::AccountId}-${AWS::Region}", "cloudFormationExecutionRoleArn": "arn:${AWS::Partition}:iam::${AWS::AccountId}:role/cdk-hnb659fds-cfn-exec-role-${AWS::AccountId}-${AWS::Region}", - "stackTemplateAssetObjectUrl": "s3://cdk-hnb659fds-assets-${AWS::AccountId}-${AWS::Region}/4bf07b5cad381e52a796b0a42748934cce430e155ffe31f0366eef200d40356f.json", + "stackTemplateAssetObjectUrl": "s3://cdk-hnb659fds-assets-${AWS::AccountId}-${AWS::Region}/cb2d7d48eaf634edd0acc6e6475e904f57e491fce9fa3ac839e93cba823616c1.json", "requiresBootstrapStackVersion": 6, "bootstrapStackVersionSsmParameter": "/cdk-bootstrap/hnb659fds/version", "additionalDependencies": [ @@ -100,6 +101,30 @@ "data": "F2KafkaEventSource838c4d5ff3c99c1a617120adfca83e5bmytesttopic20A678189" } ], + "/lambda-event-source-kafka-self-managed/F3/ServiceRole/Resource": [ + { + "type": "aws:cdk:logicalId", + "data": "F3ServiceRole2F65FFC0" + } + ], + "/lambda-event-source-kafka-self-managed/F3/ServiceRole/DefaultPolicy/Resource": [ + { + "type": "aws:cdk:logicalId", + "data": "F3ServiceRoleDefaultPolicy1C0463D1" + } + ], + "/lambda-event-source-kafka-self-managed/F3/Resource": [ + { + "type": "aws:cdk:logicalId", + "data": "F38FF9B13A" + } + ], + "/lambda-event-source-kafka-self-managed/F3/KafkaEventSource:838c4d5ff3c99c1a617120adfca83e5b:my-test-topic3/Resource": [ + { + "type": "aws:cdk:logicalId", + "data": "F3KafkaEventSource838c4d5ff3c99c1a617120adfca83e5bmytesttopic3ED015F25" + } + ], "/lambda-event-source-kafka-self-managed/BootstrapVersion": [ { "type": "aws:cdk:logicalId", @@ -130,6 +155,7 @@ "templateFile": "LambdaEventSourceKafkaSelfManagedTestDefaultTestDeployAssertAF78BD0F.template.json", "terminationProtection": false, "validateOnSynth": false, + "notificationArns": [], "assumeRoleArn": "arn:${AWS::Partition}:iam::${AWS::AccountId}:role/cdk-hnb659fds-deploy-role-${AWS::AccountId}-${AWS::Region}", "cloudFormationExecutionRoleArn": "arn:${AWS::Partition}:iam::${AWS::AccountId}:role/cdk-hnb659fds-cfn-exec-role-${AWS::AccountId}-${AWS::Region}", "stackTemplateAssetObjectUrl": "s3://cdk-hnb659fds-assets-${AWS::AccountId}-${AWS::Region}/21fbb51d7b23f6a6c262b46a9caee79d744a3ac019fd45422d988b96d44b2a22.json", diff --git a/packages/@aws-cdk-testing/framework-integ/test/aws-lambda-event-sources/test/integ.kafka-selfmanaged.js.snapshot/tree.json b/packages/@aws-cdk-testing/framework-integ/test/aws-lambda-event-sources/test/integ.kafka-selfmanaged.js.snapshot/tree.json index 54543d8610b3a..168d1caad8aee 100644 --- a/packages/@aws-cdk-testing/framework-integ/test/aws-lambda-event-sources/test/integ.kafka-selfmanaged.js.snapshot/tree.json +++ b/packages/@aws-cdk-testing/framework-integ/test/aws-lambda-event-sources/test/integ.kafka-selfmanaged.js.snapshot/tree.json @@ -531,6 +531,214 @@ "version": "0.0.0" } }, + "F3": { + "id": "F3", + "path": "lambda-event-source-kafka-self-managed/F3", + "children": { + "ServiceRole": { + "id": "ServiceRole", + "path": "lambda-event-source-kafka-self-managed/F3/ServiceRole", + "children": { + "ImportServiceRole": { + "id": "ImportServiceRole", + "path": "lambda-event-source-kafka-self-managed/F3/ServiceRole/ImportServiceRole", + "constructInfo": { + "fqn": "aws-cdk-lib.Resource", + "version": "0.0.0" + } + }, + "Resource": { + "id": "Resource", + "path": "lambda-event-source-kafka-self-managed/F3/ServiceRole/Resource", + "attributes": { + "aws:cdk:cloudformation:type": "AWS::IAM::Role", + "aws:cdk:cloudformation:props": { + "assumeRolePolicyDocument": { + "Statement": [ + { + "Action": "sts:AssumeRole", + "Effect": "Allow", + "Principal": { + "Service": "lambda.amazonaws.com" + } + } + ], + "Version": "2012-10-17" + }, + "managedPolicyArns": [ + { + "Fn::Join": [ + "", + [ + "arn:", + { + "Ref": "AWS::Partition" + }, + ":iam::aws:policy/service-role/AWSLambdaBasicExecutionRole" + ] + ] + } + ] + } + }, + "constructInfo": { + "fqn": "aws-cdk-lib.aws_iam.CfnRole", + "version": "0.0.0" + } + }, + "DefaultPolicy": { + "id": "DefaultPolicy", + "path": "lambda-event-source-kafka-self-managed/F3/ServiceRole/DefaultPolicy", + "children": { + "Resource": { + "id": "Resource", + "path": "lambda-event-source-kafka-self-managed/F3/ServiceRole/DefaultPolicy/Resource", + "attributes": { + "aws:cdk:cloudformation:type": "AWS::IAM::Policy", + "aws:cdk:cloudformation:props": { + "policyDocument": { + "Statement": [ + { + "Action": [ + "secretsmanager:DescribeSecret", + "secretsmanager:GetSecretValue" + ], + "Effect": "Allow", + "Resource": [ + { + "Ref": "S509448A1" + }, + { + "Ref": "SC0855C491" + } + ] + } + ], + "Version": "2012-10-17" + }, + "policyName": "F3ServiceRoleDefaultPolicy1C0463D1", + "roles": [ + { + "Ref": "F3ServiceRole2F65FFC0" + } + ] + } + }, + "constructInfo": { + "fqn": "aws-cdk-lib.aws_iam.CfnPolicy", + "version": "0.0.0" + } + } + }, + "constructInfo": { + "fqn": "aws-cdk-lib.aws_iam.Policy", + "version": "0.0.0" + } + } + }, + "constructInfo": { + "fqn": "aws-cdk-lib.aws_iam.Role", + "version": "0.0.0" + } + }, + "Resource": { + "id": "Resource", + "path": "lambda-event-source-kafka-self-managed/F3/Resource", + "attributes": { + "aws:cdk:cloudformation:type": "AWS::Lambda::Function", + "aws:cdk:cloudformation:props": { + "code": { + "zipFile": "exports.handler = async function handler(event) {\n console.log('event:', JSON.stringify(event, undefined, 2));\n return { event };\n}" + }, + "handler": "index.handler", + "role": { + "Fn::GetAtt": [ + "F3ServiceRole2F65FFC0", + "Arn" + ] + }, + "runtime": "nodejs18.x" + } + }, + "constructInfo": { + "fqn": "aws-cdk-lib.aws_lambda.CfnFunction", + "version": "0.0.0" + } + }, + "KafkaEventSource:838c4d5ff3c99c1a617120adfca83e5b:my-test-topic3": { + "id": "KafkaEventSource:838c4d5ff3c99c1a617120adfca83e5b:my-test-topic3", + "path": "lambda-event-source-kafka-self-managed/F3/KafkaEventSource:838c4d5ff3c99c1a617120adfca83e5b:my-test-topic3", + "children": { + "Resource": { + "id": "Resource", + "path": "lambda-event-source-kafka-self-managed/F3/KafkaEventSource:838c4d5ff3c99c1a617120adfca83e5b:my-test-topic3/Resource", + "attributes": { + "aws:cdk:cloudformation:type": "AWS::Lambda::EventSourceMapping", + "aws:cdk:cloudformation:props": { + "batchSize": 100, + "filterCriteria": { + "filters": [ + { + "pattern": "{\"numericEquals\":[{\"numeric\":[\"=\",1]}]}" + } + ] + }, + "functionName": { + "Ref": "F38FF9B13A" + }, + "provisionedPollerConfig": { + "minimumPollers": 1, + "maximumPollers": 3 + }, + "selfManagedEventSource": { + "endpoints": { + "kafkaBootstrapServers": [ + "my-self-hosted-kafka-broker-1:9092", + "my-self-hosted-kafka-broker-2:9092", + "my-self-hosted-kafka-broker-3:9092" + ] + } + }, + "selfManagedKafkaEventSourceConfig": { + "consumerGroupId": "myTestConsumerGroup3" + }, + "sourceAccessConfigurations": [ + { + "type": "CLIENT_CERTIFICATE_TLS_AUTH", + "uri": { + "Ref": "SC0855C491" + } + }, + { + "type": "SERVER_ROOT_CA_CERTIFICATE", + "uri": { + "Ref": "S509448A1" + } + } + ], + "startingPosition": "TRIM_HORIZON", + "topics": [ + "my-test-topic3" + ] + } + }, + "constructInfo": { + "fqn": "aws-cdk-lib.aws_lambda.CfnEventSourceMapping", + "version": "0.0.0" + } + } + }, + "constructInfo": { + "fqn": "aws-cdk-lib.aws_lambda.EventSourceMapping", + "version": "0.0.0" + } + } + }, + "constructInfo": { + "fqn": "aws-cdk-lib.aws_lambda.Function", + "version": "0.0.0" + } + }, "BootstrapVersion": { "id": "BootstrapVersion", "path": "lambda-event-source-kafka-self-managed/BootstrapVersion", @@ -566,7 +774,7 @@ "path": "LambdaEventSourceKafkaSelfManagedTest/DefaultTest/Default", "constructInfo": { "fqn": "constructs.Construct", - "version": "10.3.0" + "version": "10.4.2" } }, "DeployAssert": { @@ -612,7 +820,7 @@ "path": "Tree", "constructInfo": { "fqn": "constructs.Construct", - "version": "10.3.0" + "version": "10.4.2" } } }, diff --git a/packages/@aws-cdk-testing/framework-integ/test/aws-lambda-event-sources/test/integ.kafka-selfmanaged.ts b/packages/@aws-cdk-testing/framework-integ/test/aws-lambda-event-sources/test/integ.kafka-selfmanaged.ts index 3af619c6f8bc2..a9a8b266a2029 100644 --- a/packages/@aws-cdk-testing/framework-integ/test/aws-lambda-event-sources/test/integ.kafka-selfmanaged.ts +++ b/packages/@aws-cdk-testing/framework-integ/test/aws-lambda-event-sources/test/integ.kafka-selfmanaged.ts @@ -87,6 +87,29 @@ zp2mwJn2NYB7AZ7+imp0azDZb+8YG2aUCiyqb6PnnA== ], filterEncryption: myKey, })); + + const fn3 = new TestFunction(this, 'F3'); + rootCASecret.grantRead(fn3); + clientCertificatesSecret.grantRead(fn3); + + fn3.addEventSource(new SelfManagedKafkaEventSource({ + bootstrapServers, + topic: 'my-test-topic3', + consumerGroupId: 'myTestConsumerGroup3', + secret: clientCertificatesSecret, + authenticationMethod: AuthenticationMethod.CLIENT_CERTIFICATE_TLS_AUTH, + rootCACertificate: rootCASecret, + startingPosition: lambda.StartingPosition.TRIM_HORIZON, + filters: [ + lambda.FilterCriteria.filter({ + numericEquals: lambda.FilterRule.isEqual(1), + }), + ], + provisionedPollerConfig: { + minimumPollers: 1, + maximumPollers: 3, + }, + })); } } diff --git a/packages/aws-cdk-lib/aws-lambda-event-sources/README.md b/packages/aws-cdk-lib/aws-lambda-event-sources/README.md index 57aa2b3fe3e5c..1d4f12937a812 100644 --- a/packages/aws-cdk-lib/aws-lambda-event-sources/README.md +++ b/packages/aws-cdk-lib/aws-lambda-event-sources/README.md @@ -396,6 +396,29 @@ myFunction.addEventSource(new ManagedKafkaEventSource({ })); ``` +Set configuration for provisioned pollers that read from the event source. + +```ts +import { ManagedKafkaEventSource } from 'aws-cdk-lib/aws-lambda-event-sources'; + +// Your MSK cluster arn +declare const clusterArn: string + +// The Kafka topic you want to subscribe to +const topic = 'some-cool-topic'; + +declare const myFunction: lambda.Function; +myFunction.addEventSource(new ManagedKafkaEventSource({ + clusterArn, + topic, + startingPosition: lambda.StartingPosition.TRIM_HORIZON, + provisionedPollerConfig: { + minimumPollers: 1, + maximumPollers: 3, + }, +})); +``` + ## Roadmap Eventually, this module will support all the event sources described under diff --git a/packages/aws-cdk-lib/aws-lambda-event-sources/lib/kafka.ts b/packages/aws-cdk-lib/aws-lambda-event-sources/lib/kafka.ts index 3c75a45a51447..c7a837b23ff16 100644 --- a/packages/aws-cdk-lib/aws-lambda-event-sources/lib/kafka.ts +++ b/packages/aws-cdk-lib/aws-lambda-event-sources/lib/kafka.ts @@ -164,6 +164,7 @@ export class ManagedKafkaEventSource extends StreamEventSource { kafkaConsumerGroupId: this.innerProps.consumerGroupId, onFailure: this.innerProps.onFailure, supportS3OnFailureDestination: true, + provisionedPollerConfig: this.innerProps.provisionedPollerConfig, }), ); @@ -240,6 +241,7 @@ export class SelfManagedKafkaEventSource extends StreamEventSource { throw new Error('secret must be set if Kafka brokers accessed over Internet'); } this.innerProps = props; + } public bind(target: lambda.IFunction) { @@ -256,6 +258,7 @@ export class SelfManagedKafkaEventSource extends StreamEventSource { sourceAccessConfigurations: this.sourceAccessConfigurations(), onFailure: this.innerProps.onFailure, supportS3OnFailureDestination: true, + provisionedPollerConfig: this.innerProps.provisionedPollerConfig, }), ); diff --git a/packages/aws-cdk-lib/aws-lambda-event-sources/lib/stream.ts b/packages/aws-cdk-lib/aws-lambda-event-sources/lib/stream.ts index 40b765da22c63..f65cb6a9852a1 100644 --- a/packages/aws-cdk-lib/aws-lambda-event-sources/lib/stream.ts +++ b/packages/aws-cdk-lib/aws-lambda-event-sources/lib/stream.ts @@ -44,6 +44,29 @@ export interface BaseStreamEventSourceProps{ * @default true */ readonly enabled?: boolean; + + /** + * Configuration for provisioned pollers that read from the event source. + * When specified, allows control over the minimum and maximum number of pollers + * that can be provisioned to process events from the source. + * @default - no provisioned pollers + */ + readonly provisionedPollerConfig?: ProvisionedPollerConfig; +} + +export interface ProvisionedPollerConfig { + /** + * The minimum number of pollers that should be provisioned. + * + * @default 1 + */ + readonly minimumPollers: number; + /** + * The maximum number of pollers that can be provisioned. + * + * @default 200 + */ + readonly maximumPollers: number; } /** @@ -151,6 +174,24 @@ export interface StreamEventSourceProps extends BaseStreamEventSourceProps { */ export abstract class StreamEventSource implements lambda.IEventSource { protected constructor(protected readonly props: StreamEventSourceProps) { + if (props.provisionedPollerConfig) { + const { minimumPollers, maximumPollers } = props.provisionedPollerConfig; + if (minimumPollers != undefined) { + if (minimumPollers < 1 || minimumPollers > 200) { + throw new Error('Minimum provisioned pollers must be between 1 and 200 inclusive'); + } + } + if (maximumPollers != undefined) { + if (maximumPollers < 1 || maximumPollers > 2000) { + throw new Error('Maximum provisioned pollers must be between 1 and 2000 inclusive'); + } + } + if (minimumPollers != undefined && maximumPollers != undefined) { + if (minimumPollers > maximumPollers) { + throw new Error('Minimum provisioned pollers must be less than or equal to maximum provisioned pollers'); + } + } + } } public abstract bind(_target: lambda.IFunction): void; diff --git a/packages/aws-cdk-lib/aws-lambda-event-sources/test/kafka.test.ts b/packages/aws-cdk-lib/aws-lambda-event-sources/test/kafka.test.ts index d5b1df2c9a657..5775a8ad1507f 100644 --- a/packages/aws-cdk-lib/aws-lambda-event-sources/test/kafka.test.ts +++ b/packages/aws-cdk-lib/aws-lambda-event-sources/test/kafka.test.ts @@ -308,6 +308,105 @@ describe('KafkaEventSource', () => { }); }); + test('with provisioned pollers', () => { + // GIVEN + const stack = new cdk.Stack(); + const testLambdaFunction = new TestFunction(stack, 'Fn'); + const clusterArn = 'some-arn'; + const kafkaTopic = 'some-topic'; + const bucket = Bucket.fromBucketName(stack, 'BucketByName', 'my-bucket'); + const s3OnFailureDestination = new sources.S3OnFailureDestination(bucket); + // WHEN + testLambdaFunction.addEventSource(new sources.ManagedKafkaEventSource( + { + clusterArn, + topic: kafkaTopic, + startingPosition: lambda.StartingPosition.TRIM_HORIZON, + onFailure: s3OnFailureDestination, + provisionedPollerConfig: { + minimumPollers: 1, + maximumPollers: 3, + }, + })); + + // THEN + Template.fromStack(stack).hasResourceProperties('AWS::Lambda::EventSourceMapping', { + DestinationConfig: { + OnFailure: { + Destination: { + 'Fn::Join': ['', ['arn:', { Ref: 'AWS::Partition' }, ':s3:::my-bucket']], + }, + }, + }, + ProvisionedPollerConfig: { + MinimumPollers: 1, + MaximumPollers: 3, + }, + }); + }); + + test('maximum provisioned poller is out of limit', () => { + const stack = new cdk.Stack(); + const testLambdaFunction = new TestFunction(stack, 'Fn'); + const clusterArn = 'some-arn'; + const kafkaTopic = 'some-topic'; + const bucket = Bucket.fromBucketName(stack, 'BucketByName', 'my-bucket'); + const s3OnFailureDestination = new sources.S3OnFailureDestination(bucket); + + expect(() => testLambdaFunction.addEventSource(new sources.ManagedKafkaEventSource( + { + clusterArn, + topic: kafkaTopic, + startingPosition: lambda.StartingPosition.TRIM_HORIZON, + onFailure: s3OnFailureDestination, + provisionedPollerConfig: { + minimumPollers: 1, + maximumPollers: 2001, + }, + }))).toThrow(/Maximum provisioned pollers must be between 1 and 2000 inclusive/); + }); + + test('minimum provisioned poller is out of limit', () => { + const stack = new cdk.Stack(); + const testLambdaFunction = new TestFunction(stack, 'Fn'); + const clusterArn = 'some-arn'; + const kafkaTopic = 'some-topic'; + const bucket = Bucket.fromBucketName(stack, 'BucketByName', 'my-bucket'); + const s3OnFailureDestination = new sources.S3OnFailureDestination(bucket); + + expect(() => testLambdaFunction.addEventSource(new sources.ManagedKafkaEventSource( + { + clusterArn, + topic: kafkaTopic, + startingPosition: lambda.StartingPosition.TRIM_HORIZON, + onFailure: s3OnFailureDestination, + provisionedPollerConfig: { + minimumPollers: 0, + maximumPollers: 3, + }, + }))).toThrow(/Minimum provisioned pollers must be between 1 and 200 inclusive/); + }); + + test('Minimum provisioned poller greater than maximum provisioned poller', () => { + const stack = new cdk.Stack(); + const testLambdaFunction = new TestFunction(stack, 'Fn'); + const clusterArn = 'some-arn'; + const kafkaTopic = 'some-topic'; + const bucket = Bucket.fromBucketName(stack, 'BucketByName', 'my-bucket'); + const s3OnFailureDestination = new sources.S3OnFailureDestination(bucket); + + expect(() => testLambdaFunction.addEventSource(new sources.ManagedKafkaEventSource( + { + clusterArn, + topic: kafkaTopic, + startingPosition: lambda.StartingPosition.TRIM_HORIZON, + onFailure: s3OnFailureDestination, + provisionedPollerConfig: { + minimumPollers: 3, + maximumPollers: 1, + }, + }))).toThrow(/Minimum provisioned pollers must be less than or equal to maximum provisioned pollers/); + }); }); describe('self-managed kafka', () => { @@ -998,5 +1097,94 @@ describe('KafkaEventSource', () => { expect(mskEventMapping.eventSourceMappingId).toBeDefined(); expect(mskEventMapping.eventSourceMappingArn).toBeDefined(); }); + + test('with provisioned pollers', () => { + // GIVEN + const stack = new cdk.Stack(); + const fn = new TestFunction(stack, 'Fn'); + const clusterArn = 'some-arn'; + const kafkaTopic = 'some-topic'; + + const mskEventMapping = new sources.ManagedKafkaEventSource( + { + clusterArn, + topic: kafkaTopic, + startingPosition: lambda.StartingPosition.TRIM_HORIZON, + provisionedPollerConfig: { + minimumPollers: 1, + maximumPollers: 3, + }, + }); + + // WHEN + fn.addEventSource(mskEventMapping); + expect(mskEventMapping.eventSourceMappingId).toBeDefined(); + expect(mskEventMapping.eventSourceMappingArn).toBeDefined(); + + const template = Template.fromStack(stack); + template.hasResourceProperties('AWS::Lambda::EventSourceMapping', { + ProvisionedPollerConfig: { + MinimumPollers: 1, + MaximumPollers: 3, + }, + }); + }); + + test('maximum provisioned poller is out of limit', () => { + // GIVEN + const stack = new cdk.Stack(); + const fn = new TestFunction(stack, 'Fn'); + const clusterArn = 'some-arn'; + const kafkaTopic = 'some-topic'; + + expect(() => fn.addEventSource(new sources.ManagedKafkaEventSource( + { + clusterArn, + topic: kafkaTopic, + startingPosition: lambda.StartingPosition.TRIM_HORIZON, + provisionedPollerConfig: { + minimumPollers: 1, + maximumPollers: 2001, + }, + }))).toThrow(/Maximum provisioned pollers must be between 1 and 2000 inclusive/); + }); + + test('minimum provisioned poller is out of limit', () => { + // GIVEN + const stack = new cdk.Stack(); + const fn = new TestFunction(stack, 'Fn'); + const clusterArn = 'some-arn'; + const kafkaTopic = 'some-topic'; + + expect(() => fn.addEventSource(new sources.ManagedKafkaEventSource( + { + clusterArn, + topic: kafkaTopic, + startingPosition: lambda.StartingPosition.TRIM_HORIZON, + provisionedPollerConfig: { + minimumPollers: 0, + maximumPollers: 3, + }, + }))).toThrow(/Minimum provisioned pollers must be between 1 and 200 inclusive/); + }); + + test('Minimum provisioned poller greater than maximum provisioned poller', () => { + // GIVEN + const stack = new cdk.Stack(); + const fn = new TestFunction(stack, 'Fn'); + const clusterArn = 'some-arn'; + const kafkaTopic = 'some-topic'; + + expect(() => fn.addEventSource(new sources.ManagedKafkaEventSource( + { + clusterArn, + topic: kafkaTopic, + startingPosition: lambda.StartingPosition.TRIM_HORIZON, + provisionedPollerConfig: { + minimumPollers: 3, + maximumPollers: 1, + }, + }))).toThrow(/Minimum provisioned pollers must be less than or equal to maximum provisioned pollers/); + }); }); }); diff --git a/packages/aws-cdk-lib/aws-lambda/lib/event-source-mapping.ts b/packages/aws-cdk-lib/aws-lambda/lib/event-source-mapping.ts index 842cdeb66fecc..fe4f4cc78b88f 100644 --- a/packages/aws-cdk-lib/aws-lambda/lib/event-source-mapping.ts +++ b/packages/aws-cdk-lib/aws-lambda/lib/event-source-mapping.ts @@ -81,6 +81,21 @@ export interface SourceAccessConfiguration { readonly uri: string; } +export interface ProvisionedPollerConfig { + /** + * The minimum number of pollers that should be provisioned. + * + * @default - 1 + */ + readonly minimumPollers?: number; + /** + * The maximum number of pollers that can be provisioned. + * + * @default - 200 + */ + readonly maximumPollers?: number; +} + export interface EventSourceMappingOptions { /** * The Amazon Resource Name (ARN) of the event source. Any record added to @@ -270,6 +285,14 @@ export interface EventSourceMappingOptions { */ readonly supportS3OnFailureDestination?: boolean; + /** + * Configuration for provisioned pollers that read from the event source. + * When specified, allows control over the minimum and maximum number of pollers + * that can be provisioned to process events from the source. + * @default - no provisioned pollers + */ + readonly provisionedPollerConfig?: ProvisionedPollerConfig; + /** * Configuration for enhanced monitoring metrics collection * When specified, enables collection of additional metrics for the stream event source @@ -383,6 +406,25 @@ export class EventSourceMapping extends cdk.Resource implements IEventSourceMapp throw new Error('eventSourceArn and kafkaBootstrapServers are mutually exclusive'); } + if (props.provisionedPollerConfig) { + const { minimumPollers, maximumPollers } = props.provisionedPollerConfig; + if (minimumPollers != undefined) { + if (minimumPollers < 1 || minimumPollers > 200) { + throw new Error('Minimum provisioned pollers must be between 1 and 200 inclusive'); + } + } + if (maximumPollers != undefined) { + if (maximumPollers < 1 || maximumPollers > 2000) { + throw new Error('Maximum provisioned pollers must be between 1 and 2000 inclusive'); + } + } + if (minimumPollers != undefined && maximumPollers != undefined) { + if (minimumPollers > maximumPollers) { + throw new Error('Minimum provisioned pollers must be less than or equal to maximum provisioned pollers'); + } + } + } + if (props.kafkaBootstrapServers && (props.kafkaBootstrapServers?.length < 1)) { throw new Error('kafkaBootStrapServers must not be empty if set'); } @@ -482,6 +524,7 @@ export class EventSourceMapping extends cdk.Resource implements IEventSourceMapp kmsKeyArn: props.filterEncryption?.keyArn, selfManagedKafkaEventSourceConfig: props.kafkaBootstrapServers ? consumerGroupConfig : undefined, amazonManagedKafkaEventSourceConfig: props.eventSourceArn ? consumerGroupConfig : undefined, + provisionedPollerConfig: props.provisionedPollerConfig, metricsConfig: props.metricsConfig, }); this.eventSourceMappingId = cfnEventSourceMapping.ref; diff --git a/packages/aws-cdk-lib/aws-lambda/test/event-source-mapping.test.ts b/packages/aws-cdk-lib/aws-lambda/test/event-source-mapping.test.ts index 315074ec62210..b8baaa24d22dd 100644 --- a/packages/aws-cdk-lib/aws-lambda/test/event-source-mapping.test.ts +++ b/packages/aws-cdk-lib/aws-lambda/test/event-source-mapping.test.ts @@ -2,7 +2,7 @@ import { Match, Template } from '../../assertions'; import { Key } from '../../aws-kms'; import * as cdk from '../../core'; import * as lambda from '../lib'; -import { Code, EventSourceMapping, Function, Runtime, Alias, StartingPosition, FilterRule, FilterCriteria } from '../lib'; +import { Code, EventSourceMapping, Function, Alias, StartingPosition, FilterRule, FilterCriteria } from '../lib'; let stack: cdk.Stack; let fn: Function; @@ -532,4 +532,75 @@ describe('event source mapping', () => { }, }); }); + + test('provisioned pollers is set', () => { + new EventSourceMapping(stack, 'test', { + target: fn, + eventSourceArn: '', + startingPosition: StartingPosition.AT_TIMESTAMP, + startingPositionTimestamp: 1640995200, + provisionedPollerConfig: { + minimumPollers: 1, + maximumPollers: 3, + }, + }); + Template.fromStack(stack).hasResourceProperties('AWS::Lambda::EventSourceMapping', { + StartingPosition: 'AT_TIMESTAMP', + StartingPositionTimestamp: 1640995200, + ProvisionedPollerConfig: { + MinimumPollers: 1, + MaximumPollers: 3, + }, + }); + }); + + test('minimum provisioned poller is out of limit', () => { + expect(() => new EventSourceMapping(stack, 'test', { + target: fn, + eventSourceArn: '', + startingPosition: StartingPosition.AT_TIMESTAMP, + startingPositionTimestamp: 1640995200, + provisionedPollerConfig: { + minimumPollers: 0, + }, + })).toThrow(/Minimum provisioned pollers must be between 1 and 200 inclusive/); + }); + + test('maximum provisioned poller is out of limit', () => { + expect(() => new EventSourceMapping(stack, 'test', { + target: fn, + eventSourceArn: '', + startingPosition: StartingPosition.AT_TIMESTAMP, + startingPositionTimestamp: 1640995200, + provisionedPollerConfig: { + maximumPollers: 2001, + }, + })).toThrow(/Maximum provisioned pollers must be between 1 and 2000 inclusive/); + }); + + test('only maximum provisioned poller is out of limit', () => { + expect(() => new EventSourceMapping(stack, 'test', { + target: fn, + eventSourceArn: '', + startingPosition: StartingPosition.AT_TIMESTAMP, + startingPositionTimestamp: 1640995200, + provisionedPollerConfig: { + minimumPollers: 1, + maximumPollers: 2001, + }, + })).toThrow(/Maximum provisioned pollers must be between 1 and 2000 inclusive/); + }); + + test('Minimum provisioned poller greater than maximum provisioned poller', () => { + expect(() => new EventSourceMapping(stack, 'test', { + target: fn, + eventSourceArn: '', + startingPosition: StartingPosition.AT_TIMESTAMP, + startingPositionTimestamp: 1640995200, + provisionedPollerConfig: { + minimumPollers: 3, + maximumPollers: 2, + }, + })).toThrow(/Minimum provisioned pollers must be less than or equal to maximum provisioned pollers/); + }); }); diff --git a/packages/aws-cdk-lib/awslint.json b/packages/aws-cdk-lib/awslint.json index 528f143682a16..465f57aceeeef 100644 --- a/packages/aws-cdk-lib/awslint.json +++ b/packages/aws-cdk-lib/awslint.json @@ -405,6 +405,18 @@ "docs-public-apis:aws-cdk-lib.aws_lambda_event_sources.S3EventSource.bucket", "docs-public-apis:aws-cdk-lib.aws_lambda_event_sources.SnsEventSource.topic", "docs-public-apis:aws-cdk-lib.aws_lambda_event_sources.SqsEventSource.queue", + "docs-public-apis:aws-cdk-lib.aws_lambda.EventSourceMappingOptions.provisionedPollerConfig", + "props-default-doc:aws-cdk-lib.aws_lambda.EventSourceMappingOptions.provisionedPollerConfig", + "docs-public-apis:aws-cdk-lib.aws_lambda.ProvisionedPollerConfig", + "docs-public-apis:aws-cdk-lib.aws_lambda.ProvisionedPollerConfig.maximumPollers", + "docs-public-apis:aws-cdk-lib.aws_lambda.ProvisionedPollerConfig.minimumPollers", + "props-default-doc:aws-cdk-lib.aws_lambda.ProvisionedPollerConfig.maximumPollers", + "props-default-doc:aws-cdk-lib.aws_lambda.ProvisionedPollerConfig.minimumPollers", + "docs-public-apis:aws-cdk-lib.aws_lambda_event_sources.KafkaEventSourceProps.provisionedPollerConfig", + "props-default-doc:aws-cdk-lib.aws_lambda_event_sources.KafkaEventSourceProps.provisionedPollerConfig", + "docs-public-apis:aws-cdk-lib.aws_lambda_event_sources.ProvisionedPollerConfig", + "docs-public-apis:aws-cdk-lib.aws_lambda_event_sources.ProvisionedPollerConfig.maximumPollers", + "docs-public-apis:aws-cdk-lib.aws_lambda_event_sources.ProvisionedPollerConfig.minimumPollers", "docs-public-apis:aws-cdk-lib.aws_logs.CrossAccountDestination.addToPolicy", "docs-public-apis:aws-cdk-lib.aws_logs.DataIdentifier.*", "docs-public-apis:aws-cdk-lib.aws_logs.JsonPattern.jsonPatternString",