diff --git a/.changeset/hot-worms-decide.md b/.changeset/hot-worms-decide.md new file mode 100644 index 000000000..e4e42d06c --- /dev/null +++ b/.changeset/hot-worms-decide.md @@ -0,0 +1,7 @@ +--- +'skuba': patch +--- + +template/lambda-sqs-worker-cdk: Align template with Serverless template + +This adds the same boilerplate code available in `lambda-sqs-worker` along with Datadog integration. diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 614a24ddd..85d8c5020 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -501,6 +501,12 @@ importers: '@seek/logger': specifier: ^9.0.0 version: 9.0.0 + datadog-lambda-js: + specifier: ^9.0.0 + version: 9.115.0 + dd-trace: + specifier: ^5.0.0 + version: 5.22.0 skuba-dive: specifier: ^2.0.0 version: 2.0.0 @@ -514,6 +520,9 @@ importers: '@types/aws-lambda': specifier: ^8.10.82 version: 8.10.145 + '@types/chance': + specifier: ^1.1.3 + version: 1.1.6 '@types/node': specifier: ^20.16.5 version: 20.16.7 @@ -523,9 +532,21 @@ importers: aws-cdk-lib: specifier: ^2.109.0 version: 2.160.0(constructs@10.3.0) + aws-sdk-client-mock: + specifier: ^4.0.0 + version: 4.0.2 + aws-sdk-client-mock-jest: + specifier: ^4.0.0 + version: 4.0.2(aws-sdk-client-mock@4.0.2) + chance: + specifier: ^1.1.8 + version: 1.1.12 constructs: specifier: ^10.0.17 version: 10.3.0 + datadog-cdk-constructs-v2: + specifier: ^1.18.0 + version: 1.18.0(aws-cdk-lib@2.160.0(constructs@10.3.0))(constructs@10.3.0) pino-pretty: specifier: ^11.0.0 version: 11.2.2 @@ -3561,6 +3582,15 @@ packages: resolution: {integrity: sha512-t/Ygsytq+R995EJ5PZlD4Cu56sWa8InXySaViRzw9apusqsOO2bQP+SbYzAhR0pFKoB+43lYy8rWban9JSuXnA==} engines: {node: '>= 0.4'} + datadog-cdk-constructs-v2@1.18.0: + resolution: {integrity: sha512-BE1rE4H31AvkmVGAbzaUz7/u+aEd989Wf5z4OmJaHa+xAul6oeXtWaUMyq6JjJukdHnk5E4h5gF5HuBRxaa2VQ==} + engines: {node: '>= 14.15.0'} + peerDependencies: + aws-cdk-lib: ^2.134.0 + constructs: ^10.0.5 + bundledDependencies: + - loglevel + datadog-lambda-js@9.115.0: resolution: {integrity: sha512-0MW83owXDwSMpqsWhIoSkk/FjLpVG2ErVcDdayBAa7Dzyap2aJMJBZbwuC9Z1keVyJNJAoleHgdTzW12EEnSiw==} @@ -12879,6 +12909,11 @@ snapshots: es-errors: 1.3.0 is-data-view: 1.0.1 + datadog-cdk-constructs-v2@1.18.0(aws-cdk-lib@2.160.0(constructs@10.3.0))(constructs@10.3.0): + dependencies: + aws-cdk-lib: 2.160.0(constructs@10.3.0) + constructs: 10.3.0 + datadog-lambda-js@9.115.0: dependencies: '@aws-crypto/sha256-js': 5.2.0 diff --git a/template/lambda-sqs-worker-cdk/.env b/template/lambda-sqs-worker-cdk/.env new file mode 100644 index 000000000..9c53edffc --- /dev/null +++ b/template/lambda-sqs-worker-cdk/.env @@ -0,0 +1 @@ +ENVIRONMENT=local diff --git a/template/lambda-sqs-worker-cdk/README.md b/template/lambda-sqs-worker-cdk/README.md new file mode 100644 index 000000000..5db50567e --- /dev/null +++ b/template/lambda-sqs-worker-cdk/README.md @@ -0,0 +1,145 @@ +# <%- repoName %> + +[![Powered by skuba](https://img.shields.io/badge/🤿%20skuba-powered-009DC4)](https://github.com/seek-oss/skuba) + +Next steps: + +1. [ ] Finish templating if this was skipped earlier: + + ```shell + pnpm exec skuba configure + ``` + +2. [ ] Create a new repository in the appropriate GitHub organisation. +3. [ ] Add the repository to BuildAgency; + see our internal [Buildkite Docs] for more information. +4. [ ] Add Datadog extension, deployment bucket configuration and data classification tags to [infra/config.ts](infra/config.ts). +5. [ ] Push local commits to the upstream GitHub branch. +6. [ ] Configure [GitHub repository settings]. +7. [ ] Delete this checklist 😌. + +[Buildkite Docs]: https://backstage.myseek.xyz/docs/default/component/buildkite-docs +[GitHub repository settings]: https://github.com/<%-orgName%>/<%-repoName%>/settings + +## Design + +<%-repoName %> is a Node.js [Lambda] application built in line with our [Technical Guidelines]. +It is backed by a typical SQS message + dead letter queue configuration and uses common SEEK packages. +Workers enable fault-tolerant asynchronous processing of events. + +The `lambda-sqs-worker-cdk` template is modelled after a hypothetical enricher that scores job advertisements. +It's stubbed out with in-memory [scoring service](src/services/jobScorer.ts). +This would be replaced with internal logic or an external service in production. + +This project is deployed with [AWS CDK]. +The Lambda runtime provisions a single Node.js process per container. +The supplied [infra/appStack.ts](infra/appStack.ts) starts out with a minimal `memorySize` which may require tuning based on workload. +Under load, we autoscale horizontally in terms of container count up to `reservedConcurrency`. + +[@seek/aws-codedeploy-hooks] configures [CodeDeploy] for a blue-green deployment approach. +A smoke test is run against the new version before traffic is switched over, +providing an opportunity to test access and connectivity to online dependencies. +This defaults to an invocation with an empty object `{}`. + +## Development + +### Test + +```shell +# Run Jest tests locally +pnpm test + +# Authenticate to dev account +awsauth + +# Run smoke test against deployed application +ENVIRONMENT=dev pnpm smoke +``` + +### Lint + +```shell +# Fix issues +pnpm format + +# Check for issues +pnpm lint +``` + +### Start + +```shell +# Start a local HTTP server +pnpm start + +# Start with Node.js Inspector enabled +pnpm start:debug +``` + +This serves the Lambda application over HTTP. +For example, to invoke the handler with an empty object `{}` for smoke testing: + +```shell +curl --data '[{}, {"awsRequestId": "local"}]' --include localhost:<%- port %> +``` + +### Deploy + +This project is deployed through a [Buildkite pipeline](.buildkite/pipeline.yml). + +- Commits to a feature branch can be deployed to the dev environment by unblocking a step in the Buildkite UI +- Commits to the default branch are automatically deployed to the dev and prod environments in sequence + +To deploy locally: + +```shell +# Authenticate to dev account +awsauth + +ENVIRONMENT=dev pnpm run deploy +``` + +A hotswap deploy enables faster deployment but come with caveats such as requiring a Lambda to be rebuilt with every build. + +To deploy a [hotswap]: + +```shell +# Authenticate to dev account +awsauth + +ENVIRONMENT=dev pnpm run deploy:hotswap +``` + +To rapidly roll back a change, +retry an individual deployment step from the previous build in Buildkite. +Note that this will introduce drift between the head of the default Git branch and the live environment; +use with caution and always follow up with a proper revert or fix in Git history. + +## Support + +### Dev + +TODO: add support links for the dev environment. + + + +### Prod + +TODO: add support links for the prod environment. + + + +[@seek/aws-codedeploy-hooks]: https://github.com/seek-oss/aws-codedeploy-hooks +[AWS CDK]: https://docs.aws.amazon.com/cdk/v2/guide/home.html +[CodeDeploy]: https://docs.aws.amazon.com/codedeploy +[Hotswap]: https://docs.aws.amazon.com/cdk/v2/guide/ref-cli-cmd-deploy.html#ref-cli-cmd-deploy-options +[Lambda]: https://docs.aws.amazon.com/lambda +[Technical Guidelines]: https://myseek.atlassian.net/wiki/spaces/AA/pages/2358346017/ diff --git a/template/lambda-sqs-worker-cdk/infra/__snapshots__/appStack.test.ts.snap b/template/lambda-sqs-worker-cdk/infra/__snapshots__/appStack.test.ts.snap index bb92918c3..857901057 100644 --- a/template/lambda-sqs-worker-cdk/infra/__snapshots__/appStack.test.ts.snap +++ b/template/lambda-sqs-worker-cdk/infra/__snapshots__/appStack.test.ts.snap @@ -10,6 +10,40 @@ exports[`returns expected CloudFormation stack for dev 1`] = ` }, }, "Resources": { + "datadogapikeysecret046FEF06": { + "DeletionPolicy": "Delete", + "Properties": { + "GenerateSecretString": {}, + }, + "Type": "AWS::SecretsManager::Secret", + "UpdateReplacePolicy": "Delete", + }, + "destinationtopicDCE2E0B8": { + "Properties": { + "KmsMasterKeyId": { + "Fn::Join": [ + "", + [ + "arn:", + { + "Ref": "AWS::Partition", + }, + ":kms:", + { + "Ref": "AWS::Region", + }, + ":", + { + "Ref": "AWS::AccountId", + }, + ":alias/aws/sns", + ], + ], + }, + "TopicName": "serviceName", + }, + "Type": "AWS::SNS::Topic", + }, "kmskey49FBC3B3": { "DeletionPolicy": "Retain", "Properties": { @@ -105,17 +139,6 @@ exports[`returns expected CloudFormation stack for dev 1`] = ` }, "Resource": "*", }, - { - "Action": [ - "kms:Decrypt", - "kms:GenerateDataKey", - ], - "Effect": "Allow", - "Principal": { - "Service": "sns.amazonaws.com", - }, - "Resource": "*", - }, ], "Version": "2012-10-17", }, @@ -135,9 +158,6 @@ exports[`returns expected CloudFormation stack for dev 1`] = ` }, "Type": "AWS::KMS::Alias", }, - "sourcetopic7C3DC892": { - "Type": "AWS::SNS::Topic", - }, "worker28EA3E30": { "DependsOn": [ "workerServiceRoleDefaultPolicyBA498553", @@ -156,6 +176,22 @@ exports[`returns expected CloudFormation stack for dev 1`] = ` "Description": "Updated at 1212-12-12T12:12:12.121Z", "Environment": { "Variables": { + "DD_API_KEY_SECRET_ARN": { + "Ref": "datadogapikeysecret046FEF06", + }, + "DD_CAPTURE_LAMBDA_PAYLOAD": "false", + "DD_FLUSH_TO_LOG": "false", + "DD_LAMBDA_HANDLER": "index.handler", + "DD_LOGS_INJECTION": "false", + "DD_MERGE_XRAY_TRACES": "false", + "DD_SERVERLESS_APPSEC_ENABLED": "false", + "DD_SERVERLESS_LOGS_ENABLED": "false", + "DD_SITE": "datadoghq.com", + "DD_TAGS": "git.commit.sha:xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx,git.repository_url:xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx", + "DD_TRACE_ENABLED": "true", + "DESTINATION_SNS_TOPIC_ARN": { + "Ref": "destinationtopicDCE2E0B8", + }, "ENVIRONMENT": "dev", "NODE_ENV": "production", "NODE_OPTIONS": "--enable-source-maps", @@ -164,13 +200,27 @@ exports[`returns expected CloudFormation stack for dev 1`] = ` }, }, "FunctionName": "serviceName", - "Handler": "index.handler", + "Handler": "node_modules/datadog-lambda-js/dist/handler.handler", "KmsKeyArn": { "Fn::GetAtt": [ "kmskey49FBC3B3", "Arn", ], }, + "Layers": [ + { + "Fn::Join": [ + "", + [ + "arn:aws:lambda:", + { + "Ref": "AWS::Region", + }, + ":464622532012:layer:Datadog-Extension-ARM:x", + ], + ], + }, + ], "ReservedConcurrentExecutions": 2, "Role": { "Fn::GetAtt": [ @@ -184,6 +234,10 @@ exports[`returns expected CloudFormation stack for dev 1`] = ` "Key": "aws-codedeploy-hooks", "Value": "x.x.x-xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx", }, + { + "Key": "dd_cdk_construct", + "Value": "vx.x.x", + }, ], "Timeout": 30, }, @@ -550,6 +604,16 @@ exports[`returns expected CloudFormation stack for dev 1`] = ` "Properties": { "PolicyDocument": { "Statement": [ + { + "Action": [ + "secretsmanager:GetSecretValue", + "secretsmanager:DescribeSecret", + ], + "Effect": "Allow", + "Resource": { + "Ref": "datadogapikeysecret046FEF06", + }, + }, { "Action": [ "sqs:ReceiveMessage", @@ -611,59 +675,6 @@ exports[`returns expected CloudFormation stack for dev 1`] = ` "Type": "AWS::SQS::Queue", "UpdateReplacePolicy": "Delete", }, - "workerqueuePolicy97054CB4": { - "Properties": { - "PolicyDocument": { - "Statement": [ - { - "Action": "sqs:SendMessage", - "Condition": { - "ArnEquals": { - "aws:SourceArn": { - "Ref": "sourcetopic7C3DC892", - }, - }, - }, - "Effect": "Allow", - "Principal": { - "Service": "sns.amazonaws.com", - }, - "Resource": { - "Fn::GetAtt": [ - "workerqueueA05CE5C6", - "Arn", - ], - }, - }, - ], - "Version": "2012-10-17", - }, - "Queues": [ - { - "Ref": "workerqueueA05CE5C6", - }, - ], - }, - "Type": "AWS::SQS::QueuePolicy", - }, - "workerqueueappStacksourcetopic613C6BDBD2F224F5": { - "DependsOn": [ - "workerqueuePolicy97054CB4", - ], - "Properties": { - "Endpoint": { - "Fn::GetAtt": [ - "workerqueueA05CE5C6", - "Arn", - ], - }, - "Protocol": "sqs", - "TopicArn": { - "Ref": "sourcetopic7C3DC892", - }, - }, - "Type": "AWS::SNS::Subscription", - }, "workerqueuedeadletters83F3505C": { "DeletionPolicy": "Delete", "Properties": { @@ -719,6 +730,40 @@ exports[`returns expected CloudFormation stack for prod 1`] = ` }, }, "Resources": { + "datadogapikeysecret046FEF06": { + "DeletionPolicy": "Delete", + "Properties": { + "GenerateSecretString": {}, + }, + "Type": "AWS::SecretsManager::Secret", + "UpdateReplacePolicy": "Delete", + }, + "destinationtopicDCE2E0B8": { + "Properties": { + "KmsMasterKeyId": { + "Fn::Join": [ + "", + [ + "arn:", + { + "Ref": "AWS::Partition", + }, + ":kms:", + { + "Ref": "AWS::Region", + }, + ":", + { + "Ref": "AWS::AccountId", + }, + ":alias/aws/sns", + ], + ], + }, + "TopicName": "serviceName", + }, + "Type": "AWS::SNS::Topic", + }, "kmskey49FBC3B3": { "DeletionPolicy": "Retain", "Properties": { @@ -814,17 +859,6 @@ exports[`returns expected CloudFormation stack for prod 1`] = ` }, "Resource": "*", }, - { - "Action": [ - "kms:Decrypt", - "kms:GenerateDataKey", - ], - "Effect": "Allow", - "Principal": { - "Service": "sns.amazonaws.com", - }, - "Resource": "*", - }, ], "Version": "2012-10-17", }, @@ -844,9 +878,6 @@ exports[`returns expected CloudFormation stack for prod 1`] = ` }, "Type": "AWS::KMS::Alias", }, - "sourcetopic7C3DC892": { - "Type": "AWS::SNS::Topic", - }, "worker28EA3E30": { "DependsOn": [ "workerServiceRoleDefaultPolicyBA498553", @@ -865,6 +896,22 @@ exports[`returns expected CloudFormation stack for prod 1`] = ` "Description": "Updated at 1212-12-12T12:12:12.121Z", "Environment": { "Variables": { + "DD_API_KEY_SECRET_ARN": { + "Ref": "datadogapikeysecret046FEF06", + }, + "DD_CAPTURE_LAMBDA_PAYLOAD": "false", + "DD_FLUSH_TO_LOG": "false", + "DD_LAMBDA_HANDLER": "index.handler", + "DD_LOGS_INJECTION": "false", + "DD_MERGE_XRAY_TRACES": "false", + "DD_SERVERLESS_APPSEC_ENABLED": "false", + "DD_SERVERLESS_LOGS_ENABLED": "false", + "DD_SITE": "datadoghq.com", + "DD_TAGS": "git.commit.sha:xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx,git.repository_url:xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx", + "DD_TRACE_ENABLED": "true", + "DESTINATION_SNS_TOPIC_ARN": { + "Ref": "destinationtopicDCE2E0B8", + }, "ENVIRONMENT": "prod", "NODE_ENV": "production", "NODE_OPTIONS": "--enable-source-maps", @@ -873,13 +920,27 @@ exports[`returns expected CloudFormation stack for prod 1`] = ` }, }, "FunctionName": "serviceName", - "Handler": "index.handler", + "Handler": "node_modules/datadog-lambda-js/dist/handler.handler", "KmsKeyArn": { "Fn::GetAtt": [ "kmskey49FBC3B3", "Arn", ], }, + "Layers": [ + { + "Fn::Join": [ + "", + [ + "arn:aws:lambda:", + { + "Ref": "AWS::Region", + }, + ":464622532012:layer:Datadog-Extension-ARM:x", + ], + ], + }, + ], "ReservedConcurrentExecutions": 20, "Role": { "Fn::GetAtt": [ @@ -893,6 +954,10 @@ exports[`returns expected CloudFormation stack for prod 1`] = ` "Key": "aws-codedeploy-hooks", "Value": "x.x.x-xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx", }, + { + "Key": "dd_cdk_construct", + "Value": "vx.x.x", + }, ], "Timeout": 30, }, @@ -1259,6 +1324,16 @@ exports[`returns expected CloudFormation stack for prod 1`] = ` "Properties": { "PolicyDocument": { "Statement": [ + { + "Action": [ + "secretsmanager:GetSecretValue", + "secretsmanager:DescribeSecret", + ], + "Effect": "Allow", + "Resource": { + "Ref": "datadogapikeysecret046FEF06", + }, + }, { "Action": [ "sqs:ReceiveMessage", @@ -1320,59 +1395,6 @@ exports[`returns expected CloudFormation stack for prod 1`] = ` "Type": "AWS::SQS::Queue", "UpdateReplacePolicy": "Delete", }, - "workerqueuePolicy97054CB4": { - "Properties": { - "PolicyDocument": { - "Statement": [ - { - "Action": "sqs:SendMessage", - "Condition": { - "ArnEquals": { - "aws:SourceArn": { - "Ref": "sourcetopic7C3DC892", - }, - }, - }, - "Effect": "Allow", - "Principal": { - "Service": "sns.amazonaws.com", - }, - "Resource": { - "Fn::GetAtt": [ - "workerqueueA05CE5C6", - "Arn", - ], - }, - }, - ], - "Version": "2012-10-17", - }, - "Queues": [ - { - "Ref": "workerqueueA05CE5C6", - }, - ], - }, - "Type": "AWS::SQS::QueuePolicy", - }, - "workerqueueappStacksourcetopic613C6BDBD2F224F5": { - "DependsOn": [ - "workerqueuePolicy97054CB4", - ], - "Properties": { - "Endpoint": { - "Fn::GetAtt": [ - "workerqueueA05CE5C6", - "Arn", - ], - }, - "Protocol": "sqs", - "TopicArn": { - "Ref": "sourcetopic7C3DC892", - }, - }, - "Type": "AWS::SNS::Subscription", - }, "workerqueuedeadletters83F3505C": { "DeletionPolicy": "Delete", "Properties": { diff --git a/template/lambda-sqs-worker-cdk/infra/appStack.test.ts b/template/lambda-sqs-worker-cdk/infra/appStack.test.ts index cdd0bb5f1..3bedca236 100644 --- a/template/lambda-sqs-worker-cdk/infra/appStack.test.ts +++ b/template/lambda-sqs-worker-cdk/infra/appStack.test.ts @@ -1,4 +1,4 @@ -import { App, aws_sns } from 'aws-cdk-lib'; +import { App, aws_secretsmanager, aws_sns } from 'aws-cdk-lib'; import { Template } from 'aws-cdk-lib/assertions'; const currentDate = '1212-12-12T12:12:12.121Z'; @@ -36,6 +36,12 @@ it.each(['dev', 'prod'])( .spyOn(aws_sns.Topic, 'fromTopicArn') .mockImplementation((scope, id) => new aws_sns.Topic(scope, id)); + jest + .spyOn(aws_secretsmanager.Secret, 'fromSecretPartialArn') + .mockImplementation( + (scope, id) => new aws_secretsmanager.Secret(scope, id), + ); + const app = new App(); const stack = new AppStack(app, 'appStack'); @@ -47,13 +53,23 @@ it.each(['dev', 'prod'])( /"S3Key":"([0-9a-f]+)\.zip"/g, (_, hash) => `"S3Key":"${'x'.repeat(hash.length)}.zip"`, ) - .replaceAll( + .replace( /workerCurrentVersion([0-9a-zA-Z]+)"/g, (_, hash) => `workerCurrentVersion${'x'.repeat(hash.length)}"`, ) .replaceAll( /"Value":"\d+\.\d+\.\d+-([^"]+)"/g, (_, hash) => `"Value": "x.x.x-${'x'.repeat(hash.length)}"`, + ) + .replaceAll(/"Value":"v\d+\.\d+\.\d+"/g, (_) => `"Value": "vx.x.x"`) + .replace( + /"DD_TAGS":"git.commit.sha:([0-9a-f]+),git.repository_url:([^\"]+)"/g, + (_, sha, url) => + `"DD_TAGS":"git.commit.sha:${'x'.repeat(sha.length)},git.repository_url:${'x'.repeat(url.length)}"`, + ) + .replaceAll( + /(layer:Datadog-Extension-.+?:)\d+/g, + (_, layer) => `${layer}x`, ); expect(JSON.parse(json)).toMatchSnapshot(); }, diff --git a/template/lambda-sqs-worker-cdk/infra/appStack.ts b/template/lambda-sqs-worker-cdk/infra/appStack.ts index 8602bc7a0..7d6ef1bf9 100644 --- a/template/lambda-sqs-worker-cdk/infra/appStack.ts +++ b/template/lambda-sqs-worker-cdk/infra/appStack.ts @@ -8,14 +8,18 @@ import { aws_lambda, aws_lambda_event_sources, aws_lambda_nodejs, + aws_secretsmanager, aws_sns, - aws_sns_subscriptions, aws_sqs, } from 'aws-cdk-lib'; import type { Construct } from 'constructs'; +import { Datadog } from 'datadog-cdk-constructs-v2'; import { config } from './config'; +// Updated by https://github.com/seek-oss/rynovate +const DATADOG_EXTENSION_LAYER_VERSION = 64; + export class AppStack extends Stack { constructor(scope: Construct, id: string, props?: StackProps) { super(scope, id, props); @@ -49,47 +53,54 @@ export class AppStack extends Stack { encryptionMasterKey: kmsKey, }); - const topic = aws_sns.Topic.fromTopicArn( + // const topic = aws_sns.Topic.fromTopicArn( + // this, + // 'source-topic', + // config.sourceSnsTopicArn, + // ); + + // topic.addSubscription( + // new aws_sns_subscriptions.SqsSubscription(queue, { + // rawMessageDelivery: true, // Remove this property if you require end to end datadog tracing + // }), + // ); + + const snsKey = aws_kms.Alias.fromAliasName( this, - 'source-topic', - config.sourceSnsTopicArn, + 'alias-aws-sns', + 'alias/aws/sns', ); - topic.addSubscription(new aws_sns_subscriptions.SqsSubscription(queue)); + const destinationTopic = new aws_sns.Topic(this, 'destination-topic', { + masterKey: snsKey, + topicName: '<%- serviceName %>', + }); const architecture = '<%- lambdaCdkArchitecture %>'; - const defaultWorkerConfig: aws_lambda_nodejs.NodejsFunctionProps = { + const worker = new aws_lambda_nodejs.NodejsFunction(this, 'worker', { architecture: aws_lambda.Architecture[architecture], runtime: aws_lambda.Runtime.NODEJS_20_X, environmentEncryption: kmsKey, // aws-sdk-v3 sets this to true by default, so it is not necessary to set the environment variable // https://docs.aws.amazon.com/sdk-for-javascript/v3/developer-guide/node-reusing-connections.html awsSdkConnectionReuse: false, - }; - - const defaultWorkerBundlingConfig: aws_lambda_nodejs.BundlingOptions = { - sourceMap: true, - target: 'node20', - // aws-sdk-v3 is set as an external module by default, but we want it to be bundled with the function - externalModules: [], - }; - - const defaultWorkerEnvironment: Record = { - NODE_ENV: 'production', - // https://nodejs.org/api/cli.html#cli_node_options_options - NODE_OPTIONS: '--enable-source-maps', - }; - - const worker = new aws_lambda_nodejs.NodejsFunction(this, 'worker', { - ...defaultWorkerConfig, entry: './src/app.ts', timeout: Duration.seconds(30), - bundling: defaultWorkerBundlingConfig, + bundling: { + sourceMap: true, + target: 'node20', + // aws-sdk-v3 is set as an external module by default, but we want it to be bundled with the function + externalModules: [], + nodeModules: ['datadog-lambda-js', 'dd-trace'], + }, functionName: '<%- serviceName %>', environment: { - ...defaultWorkerEnvironment, ...config.workerLambda.environment, + NODE_ENV: 'production', + // https://nodejs.org/api/cli.html#cli_node_options_options + NODE_OPTIONS: '--enable-source-maps', + DESTINATION_SNS_TOPIC_ARN: destinationTopic.topicArn, }, // https://github.com/aws/aws-cdk/issues/28237 // This forces the lambda to be updated on every deployment @@ -98,6 +109,22 @@ export class AppStack extends Stack { reservedConcurrentExecutions: config.workerLambda.reservedConcurrency, }); + const datadogSecret = aws_secretsmanager.Secret.fromSecretPartialArn( + this, + 'datadog-api-key-secret', + config.datadogApiKeySecretArn, + ); + + const datadog = new Datadog(this, 'datadog', { + apiKeySecret: datadogSecret, + addLayers: false, + enableDatadogLogs: false, + flushMetricsToLogs: false, + extensionLayerVersion: DATADOG_EXTENSION_LAYER_VERSION, + }); + + datadog.addLambdaFunctions([worker]); + const workerDeployment = new LambdaDeployment(this, 'workerDeployment', { lambdaFunction: worker, }); diff --git a/template/lambda-sqs-worker-cdk/infra/config.ts b/template/lambda-sqs-worker-cdk/infra/config.ts index 73f7dabe7..b178233bb 100644 --- a/template/lambda-sqs-worker-cdk/infra/config.ts +++ b/template/lambda-sqs-worker-cdk/infra/config.ts @@ -16,6 +16,7 @@ interface Config { VERSION: string; }; }; + datadogApiKeySecretArn: string; sourceSnsTopicArn: string; } @@ -30,6 +31,7 @@ const configs: Record = { VERSION: Env.string('VERSION', { default: 'local' }), }, }, + datadogApiKeySecretArn: 'TODO: datadogApiKeySecretArn', sourceSnsTopicArn: 'TODO: sourceSnsTopicArn', }, prod: { @@ -42,6 +44,7 @@ const configs: Record = { VERSION: Env.string('VERSION', { default: 'local' }), }, }, + datadogApiKeySecretArn: 'TODO: datadogApiKeySecretArn', sourceSnsTopicArn: 'TODO: sourceSnsTopicArn', }, }; diff --git a/template/lambda-sqs-worker-cdk/package.json b/template/lambda-sqs-worker-cdk/package.json index cf2f47327..05ea983f0 100644 --- a/template/lambda-sqs-worker-cdk/package.json +++ b/template/lambda-sqs-worker-cdk/package.json @@ -17,16 +17,23 @@ "@aws-sdk/client-lambda": "^3.363.0", "@aws-sdk/client-sns": "^3.363.0", "@seek/logger": "^9.0.0", + "datadog-lambda-js": "^9.0.0", + "dd-trace": "^5.0.0", "skuba-dive": "^2.0.0", "zod": "^3.19.1" }, "devDependencies": { "@seek/aws-codedeploy-infra": "^2.1.0", "@types/aws-lambda": "^8.10.82", + "@types/chance": "^1.1.3", "@types/node": "^20.16.5", "aws-cdk": "^2.109.0", "aws-cdk-lib": "^2.109.0", + "aws-sdk-client-mock": "^4.0.0", + "aws-sdk-client-mock-jest": "^4.0.0", + "chance": "^1.1.8", "constructs": "^10.0.17", + "datadog-cdk-constructs-v2": "^1.18.0", "pino-pretty": "^11.0.0", "skuba": "*" }, diff --git a/template/lambda-sqs-worker-cdk/src/app.test.ts b/template/lambda-sqs-worker-cdk/src/app.test.ts new file mode 100644 index 000000000..869bc943e --- /dev/null +++ b/template/lambda-sqs-worker-cdk/src/app.test.ts @@ -0,0 +1,116 @@ +import { PublishCommand } from '@aws-sdk/client-sns'; + +import { metricsClient } from 'src/framework/metrics'; +import { createCtx, createSqsEvent } from 'src/testing/handler'; +import { logger } from 'src/testing/logging'; +import { scoringService, sns } from 'src/testing/services'; +import { chance, mockJobPublishedEvent } from 'src/testing/types'; + +import * as app from './app'; + +describe('app', () => { + it('exports a handler', () => expect(app).toHaveProperty('handler')); +}); + +describe('handler', () => { + const ctx = createCtx(); + + const jobPublished = mockJobPublishedEvent({ entityId: chance.name() }); + + const score = chance.floating({ max: 1, min: 0 }); + + const distribution = jest + .spyOn(metricsClient, 'distribution') + .mockReturnValue(); + + beforeAll(logger.spy); + beforeAll(scoringService.spy); + + beforeEach(() => { + scoringService.request.mockResolvedValue(score); + sns.publish.resolves({ MessageId: chance.guid({ version: 4 }) }); + }); + + afterEach(() => { + logger.clear(); + distribution.mockClear(); + scoringService.clear(); + sns.clear(); + }); + + it('handles one record', async () => { + const event = createSqsEvent([JSON.stringify(jobPublished)]); + + await expect(app.handler(event, ctx)).resolves.toBeUndefined(); + + expect(scoringService.request).toHaveBeenCalledTimes(1); + + expect(logger.error).not.toHaveBeenCalled(); + + expect(logger.debug.mock.calls).toEqual([ + [{ count: 1 }, 'Received jobs'], + [{ snsMessageId: expect.any(String) }, 'Scored job'], + ['Function succeeded'], + ]); + + expect(distribution.mock.calls).toEqual([ + ['job.received', 1], + ['job.scored', 1], + ]); + + expect(sns.client).toReceiveCommandTimes(PublishCommand, 1); + }); + + it('throws on invalid input', () => { + const event = createSqsEvent(['}']); + + return expect(app.handler(event, ctx)).rejects.toThrow('Function failed'); + }); + + it('bubbles up scoring service error', async () => { + const err = Error(chance.sentence()); + + scoringService.request.mockRejectedValue(err); + + const event = createSqsEvent([JSON.stringify(jobPublished)]); + + await expect(app.handler(event, ctx)).rejects.toThrow('Function failed'); + + expect(logger.error).toHaveBeenCalledWith({ err }, 'Function failed'); + }); + + it('bubbles up SNS error', async () => { + const err = Error(chance.sentence()); + + sns.publish.rejects(err); + + const event = createSqsEvent([JSON.stringify(jobPublished)]); + + await expect(app.handler(event, ctx)).rejects.toThrow('Function failed'); + + expect(logger.error).toHaveBeenCalledWith({ err }, 'Function failed'); + }); + + it('throws on zero records', async () => { + const err = new Error('Received 0 records'); + + const event = createSqsEvent([]); + + await expect(app.handler(event, ctx)).rejects.toThrow('Function failed'); + + expect(logger.error).toHaveBeenCalledWith({ err }, 'Function failed'); + }); + + it('throws on multiple records', async () => { + const err = new Error('Received 2 records'); + + const event = createSqsEvent([ + JSON.stringify(jobPublished), + JSON.stringify(jobPublished), + ]); + + await expect(app.handler(event, ctx)).rejects.toThrow('Function failed'); + + expect(logger.error).toHaveBeenCalledWith({ err }, 'Function failed'); + }); +}); diff --git a/template/lambda-sqs-worker-cdk/src/app.ts b/template/lambda-sqs-worker-cdk/src/app.ts index 3f67e6827..bc0f7a8f7 100644 --- a/template/lambda-sqs-worker-cdk/src/app.ts +++ b/template/lambda-sqs-worker-cdk/src/app.ts @@ -1,35 +1,57 @@ -import createLogger from '@seek/logger'; -import type { SQSEvent, SQSHandler } from 'aws-lambda'; +import 'skuba-dive/register'; -import { config } from './config'; +import type { SQSEvent } from 'aws-lambda'; -export const logger = createLogger({ - base: { - environment: config.environment, - version: config.version, - }, - - level: config.logLevel, - - name: config.name, - - transport: - config.environment === 'local' ? { target: 'pino-pretty' } : undefined, -}); +import { createHandler } from 'src/framework/handler'; +import { logger } from 'src/framework/logging'; +import { metricsClient } from 'src/framework/metrics'; +import { validateJson } from 'src/framework/validation'; +import { scoreJobPublishedEvent, scoringService } from 'src/services/jobScorer'; +import { sendPipelineEvent } from 'src/services/pipelineEventSender'; +import { JobPublishedEventSchema } from 'src/types/pipelineEvents'; /** * Tests connectivity to ensure appropriate access and network configuration. */ -const smokeTest = async () => Promise.resolve(); +const smokeTest = async () => { + await Promise.all([scoringService.smokeTest(), sendPipelineEvent({}, true)]); +}; -export const handler: SQSHandler = (event: SQSEvent) => { +export const handler = createHandler(async (event) => { // Treat an empty object as our smoke test event. if (!Object.keys(event).length) { logger.debug('Received smoke test request'); return smokeTest(); } - logger.info('Hello World!'); + const count = event.Records.length; - return; -}; + if (count !== 1) { + throw Error(`Received ${count} records`); + } + + logger.debug({ count }, 'Received jobs'); + + metricsClient.distribution('job.received', event.Records.length); + + const record = event.Records[0]; + if (!record) { + throw new Error('Malformed SQS event with no records'); + } + + const { body } = record; + + // TODO: this throws an error, which will cause the Lambda function to retry + // the event and eventually send it to your dead-letter queue. If you don't + // trust your source to provide consistently well-formed input, consider + // catching and handling this error in code. + const publishedJob = validateJson(body, JobPublishedEventSchema); + + const scoredJob = await scoreJobPublishedEvent(publishedJob); + + const snsMessageId = await sendPipelineEvent(scoredJob); + + logger.debug({ snsMessageId }, 'Scored job'); + + metricsClient.distribution('job.scored', 1); +}); diff --git a/template/lambda-sqs-worker-cdk/src/config.ts b/template/lambda-sqs-worker-cdk/src/config.ts index 946645108..34cc9ffc2 100644 --- a/template/lambda-sqs-worker-cdk/src/config.ts +++ b/template/lambda-sqs-worker-cdk/src/config.ts @@ -4,8 +4,11 @@ interface Config { environment: Environment; logLevel: string; + metrics: boolean; name: string; version: string; + + destinationSnsTopicArn: string; } type Environment = (typeof environments)[number]; @@ -18,26 +21,38 @@ const environment = Env.oneOf(environments)('ENVIRONMENT'); const configs: Record Omit> = { local: () => ({ logLevel: 'debug', + metrics: false, name: '<%- serviceName %>', version: 'local', + + destinationSnsTopicArn: 'arn:aws:sns:us-east-2:123456789012:destination', }), test: () => ({ logLevel: Env.string('LOG_LEVEL', { default: 'silent' }), + metrics: false, name: '<%- serviceName %>', version: 'test', + + destinationSnsTopicArn: 'arn:aws:sns:us-east-2:123456789012:destination', }), dev: () => ({ logLevel: 'debug', + metrics: true, name: Env.string('SERVICE'), version: Env.string('VERSION'), + + destinationSnsTopicArn: Env.string('DESTINATION_SNS_TOPIC_ARN'), }), prod: () => ({ logLevel: 'info', + metrics: true, name: Env.string('SERVICE'), version: Env.string('VERSION'), + + destinationSnsTopicArn: Env.string('DESTINATION_SNS_TOPIC_ARN'), }), }; diff --git a/template/lambda-sqs-worker-cdk/src/framework/handler.test.ts b/template/lambda-sqs-worker-cdk/src/framework/handler.test.ts new file mode 100644 index 000000000..629829334 --- /dev/null +++ b/template/lambda-sqs-worker-cdk/src/framework/handler.test.ts @@ -0,0 +1,61 @@ +import { createCtx } from 'src/testing/handler'; +import { logger } from 'src/testing/logging'; +import { chance } from 'src/testing/types'; + +import { createHandler } from './handler'; + +describe('createHandler', () => { + const ctx = createCtx(); + const input = chance.paragraph(); + + beforeAll(logger.spy); + + afterEach(logger.clear); + + it('handles happy path', async () => { + const output = chance.paragraph(); + + const handler = createHandler((event) => { + expect(event).toBe(input); + + logger.debug('Handler invoked'); + + return Promise.resolve(output); + }); + + await expect(handler(input, ctx)).resolves.toBe(output); + + expect(logger.error).not.toHaveBeenCalled(); + + expect(logger.debug.mock.calls).toEqual([ + ['Handler invoked'], + ['Function succeeded'], + ]); + }); + + it('handles async error', async () => { + const err = Error(chance.sentence()); + + const handler = createHandler(() => Promise.reject(err)); + + await expect(handler(input, ctx)).rejects.toThrow('Function failed'); + + expect(logger.error).toHaveBeenCalledWith({ err }, 'Function failed'); + + expect(logger.debug).not.toHaveBeenCalled(); + }); + + it('handles sync error', async () => { + const err = Error(chance.sentence()); + + const handler = createHandler(() => { + throw err; + }); + + await expect(handler(input, ctx)).rejects.toThrow('Function failed'); + + expect(logger.error).toHaveBeenCalledWith({ err }, 'Function failed'); + + expect(logger.debug).not.toHaveBeenCalled(); + }); +}); diff --git a/template/lambda-sqs-worker-cdk/src/framework/handler.ts b/template/lambda-sqs-worker-cdk/src/framework/handler.ts new file mode 100644 index 000000000..19a458c4e --- /dev/null +++ b/template/lambda-sqs-worker-cdk/src/framework/handler.ts @@ -0,0 +1,43 @@ +import { datadog } from 'datadog-lambda-js'; + +import { config } from 'src/config'; +import { logger, loggerContext } from 'src/framework/logging'; + +interface LambdaContext { + awsRequestId: string; +} + +type Handler = ( + event: Event, + ctx: LambdaContext, +) => Promise; + +/** + * Conditionally applies the Datadog wrapper to a Lambda handler. + * + * This also "fixes" its broken type definitions. + */ +const withDatadog = ( + fn: Handler, +): Handler => + // istanbul ignore next + config.metrics ? (datadog(fn) as Handler) : fn; + +export const createHandler = ( + fn: (event: Event) => Promise, +) => + withDatadog((event, { awsRequestId }) => + loggerContext.run({ awsRequestId }, async () => { + try { + const output = await fn(event); + + logger.debug('Function succeeded'); + + return output; + } catch (err) { + logger.error({ err }, 'Function failed'); + + throw new Error('Function failed'); + } + }), + ); diff --git a/template/lambda-sqs-worker-cdk/src/framework/logging.ts b/template/lambda-sqs-worker-cdk/src/framework/logging.ts new file mode 100644 index 000000000..c4641a1cf --- /dev/null +++ b/template/lambda-sqs-worker-cdk/src/framework/logging.ts @@ -0,0 +1,27 @@ +import { AsyncLocalStorage } from 'async_hooks'; + +import createLogger from '@seek/logger'; + +import { config } from 'src/config'; + +interface LoggerContext { + awsRequestId: string; +} + +export const loggerContext = new AsyncLocalStorage(); + +export const logger = createLogger({ + base: { + environment: config.environment, + version: config.version, + }, + + level: config.logLevel, + + mixin: () => ({ ...loggerContext.getStore() }), + + name: config.name, + + transport: + config.environment === 'local' ? { target: 'pino-pretty' } : undefined, +}); diff --git a/template/lambda-sqs-worker-cdk/src/framework/metrics.ts b/template/lambda-sqs-worker-cdk/src/framework/metrics.ts new file mode 100644 index 000000000..5c564cd25 --- /dev/null +++ b/template/lambda-sqs-worker-cdk/src/framework/metrics.ts @@ -0,0 +1,14 @@ +import { sendDistributionMetric } from 'datadog-lambda-js'; + +import { config } from 'src/config'; + +const prefix = `${config.name}.`; + +export const metricsClient = { + distribution: ( + ...[name, ...rest]: Parameters + ) => + config.metrics + ? sendDistributionMetric(`${prefix}${name}`, ...rest) + : undefined, +}; diff --git a/template/lambda-sqs-worker-cdk/src/framework/validation.test.ts b/template/lambda-sqs-worker-cdk/src/framework/validation.test.ts new file mode 100644 index 000000000..4b9c7bf74 --- /dev/null +++ b/template/lambda-sqs-worker-cdk/src/framework/validation.test.ts @@ -0,0 +1,84 @@ +import { + IdDescriptionSchema, + chance, + mockIdDescription, +} from 'src/testing/types'; + +import { validateJson } from './validation'; + +describe('validateJson', () => { + const idDescription = mockIdDescription(); + + it('permits valid input', () => { + const input = JSON.stringify(idDescription); + + expect(validateJson(input, IdDescriptionSchema)).toStrictEqual( + idDescription, + ); + }); + + it('filters additional properties', () => { + const input = JSON.stringify({ ...idDescription, hacker: chance.name() }); + + expect(validateJson(input, IdDescriptionSchema)).toStrictEqual( + idDescription, + ); + }); + + it('blocks mistyped prop', () => { + const input = JSON.stringify({ ...idDescription, id: null }); + + expect(() => validateJson(input, IdDescriptionSchema)) + .toThrowErrorMatchingInlineSnapshot(` + "[ + { + "code": "invalid_type", + "expected": "string", + "received": "null", + "path": [ + "id" + ], + "message": "Expected string, received null" + } + ]" + `); + }); + + it('blocks missing prop', () => { + const input = '{}'; + + expect(() => validateJson(input, IdDescriptionSchema)) + .toThrowErrorMatchingInlineSnapshot(` + "[ + { + "code": "invalid_type", + "expected": "string", + "received": "undefined", + "path": [ + "id" + ], + "message": "Required" + }, + { + "code": "invalid_type", + "expected": "string", + "received": "undefined", + "path": [ + "description" + ], + "message": "Required" + } + ]" + `); + }); + + it('blocks invalid JSON', () => { + const input = '}'; + + expect(() => + validateJson(input, IdDescriptionSchema), + ).toThrowErrorMatchingInlineSnapshot( + `"Unexpected token '}', "}" is not valid JSON"`, + ); + }); +}); diff --git a/template/lambda-sqs-worker-cdk/src/framework/validation.ts b/template/lambda-sqs-worker-cdk/src/framework/validation.ts new file mode 100644 index 000000000..27b9058f9 --- /dev/null +++ b/template/lambda-sqs-worker-cdk/src/framework/validation.ts @@ -0,0 +1,10 @@ +import type { z } from 'zod'; + +export const validateJson = < + Output, + Def extends z.ZodTypeDef = z.ZodTypeDef, + Input = Output, +>( + input: string, + schema: z.ZodSchema, +): Output => schema.parse(JSON.parse(input)); diff --git a/template/lambda-sqs-worker-cdk/src/mapping/jobScorer.ts b/template/lambda-sqs-worker-cdk/src/mapping/jobScorer.ts new file mode 100644 index 000000000..e92c486e6 --- /dev/null +++ b/template/lambda-sqs-worker-cdk/src/mapping/jobScorer.ts @@ -0,0 +1,22 @@ +import type { JobScorerInput, JobScorerOutput } from 'src/types/jobScorer'; +import type { + JobPublishedEvent, + JobScoredEvent, +} from 'src/types/pipelineEvents'; + +export const jobPublishedEventToScorerInput = ( + record: JobPublishedEvent, +): JobScorerInput => ({ + details: record.data.details, + id: record.entityId, +}); + +export const jobScorerOutputToScoredEvent = ( + output: JobScorerOutput, +): JobScoredEvent => ({ + data: { + score: output.score, + }, + entityId: output.id, + eventType: 'JobScored', +}); diff --git a/template/lambda-sqs-worker-cdk/src/services/aws.ts b/template/lambda-sqs-worker-cdk/src/services/aws.ts new file mode 100644 index 000000000..44d73728b --- /dev/null +++ b/template/lambda-sqs-worker-cdk/src/services/aws.ts @@ -0,0 +1,5 @@ +import { SNSClient } from '@aws-sdk/client-sns'; + +export const sns = new SNSClient({ + apiVersion: '2010-03-31', +}); diff --git a/template/lambda-sqs-worker-cdk/src/services/jobScorer.test.ts b/template/lambda-sqs-worker-cdk/src/services/jobScorer.test.ts new file mode 100644 index 000000000..08363f0a2 --- /dev/null +++ b/template/lambda-sqs-worker-cdk/src/services/jobScorer.test.ts @@ -0,0 +1,44 @@ +import { scoringService } from 'src/testing/services'; +import { chance, mockJobPublishedEvent } from 'src/testing/types'; + +import * as jobScorer from './jobScorer'; + +describe('scoreJobPublishedEvent', () => { + beforeAll(scoringService.spy); + + afterEach(scoringService.clear); + + it('scores an event', async () => { + const score = chance.floating({ max: 1, min: 0 }); + + scoringService.request.mockResolvedValue(score); + + await expect( + jobScorer.scoreJobPublishedEvent( + mockJobPublishedEvent({ entityId: '1' }), + ), + ).resolves.toStrictEqual({ + data: { + score, + }, + entityId: '1', + eventType: 'JobScored', + }); + + expect(scoringService.request).toHaveBeenCalledTimes(1); + }); + + it('bubbles up scoring service error', async () => { + const err = Error(chance.sentence()); + + scoringService.request.mockRejectedValue(err); + + await expect( + jobScorer.scoreJobPublishedEvent( + mockJobPublishedEvent({ entityId: '1' }), + ), + ).rejects.toThrow(err); + + expect(scoringService.request).toHaveBeenCalledTimes(1); + }); +}); diff --git a/template/lambda-sqs-worker-cdk/src/services/jobScorer.ts b/template/lambda-sqs-worker-cdk/src/services/jobScorer.ts new file mode 100644 index 000000000..0e5fc176b --- /dev/null +++ b/template/lambda-sqs-worker-cdk/src/services/jobScorer.ts @@ -0,0 +1,59 @@ +import { + jobPublishedEventToScorerInput, + jobScorerOutputToScoredEvent, +} from 'src/mapping/jobScorer'; +import { + type JobScorerInput, + type JobScorerOutput, + JobScorerOutputSchema, +} from 'src/types/jobScorer'; +import type { + JobPublishedEvent, + JobScoredEvent, +} from 'src/types/pipelineEvents'; + +/* istanbul ignore next: simulation of an external service */ +export const scoringService = { + request: (details: string): Promise => { + // Networking woes + if (Math.random() < 0.05) { + const err = Error('could not reach scoring service'); + + return Promise.reject(err); + } + + // Unexpected behaviour on certain inputs + if (details.length % 100 === 0) { + return Promise.resolve(null); + } + + return Promise.resolve(Math.random()); + }, + + smokeTest: async (): Promise => { + // A connectivity test + await Promise.resolve(); + }, +}; + +const scoreJob = async ({ + details, + id, +}: JobScorerInput): Promise => { + const score = await scoringService.request(details); + + return JobScorerOutputSchema.parse({ + id, + score, + }); +}; + +export const scoreJobPublishedEvent = async ( + publishedJob: JobPublishedEvent, +): Promise => { + const scorerInput = jobPublishedEventToScorerInput(publishedJob); + + const scorerOutput = await scoreJob(scorerInput); + + return jobScorerOutputToScoredEvent(scorerOutput); +}; diff --git a/template/lambda-sqs-worker-cdk/src/services/pipelineEventSender.test.ts b/template/lambda-sqs-worker-cdk/src/services/pipelineEventSender.test.ts new file mode 100644 index 000000000..761fb5c51 --- /dev/null +++ b/template/lambda-sqs-worker-cdk/src/services/pipelineEventSender.test.ts @@ -0,0 +1,40 @@ +import { PublishCommand } from '@aws-sdk/client-sns'; + +import { sns } from 'src/testing/services'; +import { chance } from 'src/testing/types'; + +import { sendPipelineEvent } from './pipelineEventSender'; + +describe('sendPipelineEvent', () => { + afterEach(() => { + jest.clearAllMocks(); + }); + + it('handles happy path', async () => { + const messageId = chance.guid({ version: 4 }); + + sns.publish.resolves({ MessageId: messageId }); + + await expect(sendPipelineEvent({})).resolves.toBe(messageId); + + expect(sns.client).toReceiveCommandTimes(PublishCommand, 1); + }); + + it('bubbles up SNS error', () => { + const err = Error(chance.sentence()); + + sns.publish.rejects(err); + + return expect(sendPipelineEvent({})).rejects.toThrow(err); + }); + + it('throws on missing message ID', () => { + sns.publish.resolves({}); + + return expect( + sendPipelineEvent({}), + ).rejects.toThrowErrorMatchingInlineSnapshot( + `"SNS did not return a message ID"`, + ); + }); +}); diff --git a/template/lambda-sqs-worker-cdk/src/services/pipelineEventSender.ts b/template/lambda-sqs-worker-cdk/src/services/pipelineEventSender.ts new file mode 100644 index 000000000..95683d45d --- /dev/null +++ b/template/lambda-sqs-worker-cdk/src/services/pipelineEventSender.ts @@ -0,0 +1,33 @@ +import { PublishCommand } from '@aws-sdk/client-sns'; + +import { config } from 'src/config'; + +import { sns } from './aws'; + +export const sendPipelineEvent = async ( + event: unknown, + smokeTest: boolean = false, +): Promise => { + const snsResponse = await sns.send( + new PublishCommand({ + Message: JSON.stringify(event), + ...(smokeTest && { + MessageAttributes: { + // Used for connectivity tests. + // Subscribers should filter out messages containing this attribute. + SmokeTest: { + DataType: 'String', + StringValue: 'true', + }, + }, + }), + TopicArn: config.destinationSnsTopicArn, + }), + ); + + if (snsResponse.MessageId === undefined) { + throw Error('SNS did not return a message ID'); + } + + return snsResponse.MessageId; +}; diff --git a/template/lambda-sqs-worker-cdk/src/testing/handler.ts b/template/lambda-sqs-worker-cdk/src/testing/handler.ts new file mode 100644 index 000000000..a7dd9c99d --- /dev/null +++ b/template/lambda-sqs-worker-cdk/src/testing/handler.ts @@ -0,0 +1,13 @@ +import type { Context, SQSEvent } from 'aws-lambda'; + +import { chance } from './types'; + +export const createCtx = () => + ({ + awsRequestId: chance.guid({ version: 4 }), + }) as Context; + +export const createSqsEvent = (bodies: string[]) => + ({ + Records: bodies.map((body) => ({ body })), + }) as SQSEvent; diff --git a/template/lambda-sqs-worker-cdk/src/testing/logging.ts b/template/lambda-sqs-worker-cdk/src/testing/logging.ts new file mode 100644 index 000000000..87d8dffa9 --- /dev/null +++ b/template/lambda-sqs-worker-cdk/src/testing/logging.ts @@ -0,0 +1,19 @@ +import * as logging from 'src/framework/logging'; + +export const logger = { + error: jest.fn(), + info: jest.fn(), + debug: jest.fn(), + + clear: () => { + logger.error.mockClear(); + logger.info.mockClear(); + logger.debug.mockClear(); + }, + + spy: () => { + jest.spyOn(logging.logger, 'error').mockImplementation(logger.error); + jest.spyOn(logging.logger, 'info').mockImplementation(logger.info); + jest.spyOn(logging.logger, 'debug').mockImplementation(logger.debug); + }, +}; diff --git a/template/lambda-sqs-worker-cdk/src/testing/services.ts b/template/lambda-sqs-worker-cdk/src/testing/services.ts new file mode 100644 index 000000000..f87ef47f4 --- /dev/null +++ b/template/lambda-sqs-worker-cdk/src/testing/services.ts @@ -0,0 +1,28 @@ +import 'aws-sdk-client-mock-jest'; + +import { PublishCommand } from '@aws-sdk/client-sns'; +import { mockClient } from 'aws-sdk-client-mock'; + +import { sns as snsClient } from 'src/services/aws'; +import * as jobScorer from 'src/services/jobScorer'; + +export const scoringService = { + request: jest.fn(), + + clear: () => scoringService.request.mockClear(), + + spy: () => + jest + .spyOn(jobScorer.scoringService, 'request') + .mockImplementation(scoringService.request), +}; + +const snsMock = mockClient(snsClient); + +export const sns = { + publish: snsMock.on(PublishCommand), + + clear: () => snsMock.resetHistory(), + + client: snsMock, +}; diff --git a/template/lambda-sqs-worker-cdk/src/testing/types.ts b/template/lambda-sqs-worker-cdk/src/testing/types.ts new file mode 100644 index 000000000..1dcf36906 --- /dev/null +++ b/template/lambda-sqs-worker-cdk/src/testing/types.ts @@ -0,0 +1,33 @@ +import { Chance } from 'chance'; +import { z } from 'zod'; + +import type { JobPublishedEvent } from 'src/types/pipelineEvents'; + +export type IdDescription = z.infer; + +export const IdDescriptionSchema = z.object({ + id: z.string(), + description: z.string(), +}); + +export const chance = new Chance(); + +export const mockIdDescription = (): IdDescription => ({ + id: chance.guid({ version: 4 }), + description: chance.sentence(), +}); + +export const mockIdDescriptionJson = (): string => + JSON.stringify(mockIdDescription()); + +export const mockJobPublishedEvent = ({ + entityId, +}: { + entityId: string; +}): JobPublishedEvent => ({ + data: { + details: chance.paragraph(), + }, + entityId, + eventType: 'JobPublished', +}); diff --git a/template/lambda-sqs-worker-cdk/src/types/jobScorer.ts b/template/lambda-sqs-worker-cdk/src/types/jobScorer.ts new file mode 100644 index 000000000..ee9d3d119 --- /dev/null +++ b/template/lambda-sqs-worker-cdk/src/types/jobScorer.ts @@ -0,0 +1,15 @@ +import { z } from 'zod'; + +export type JobScorerInput = z.infer; + +export const JobScorerInputSchema = z.object({ + id: z.string(), + details: z.string(), +}); + +export type JobScorerOutput = z.infer; + +export const JobScorerOutputSchema = z.object({ + id: z.string(), + score: z.number(), +}); diff --git a/template/lambda-sqs-worker-cdk/src/types/pipelineEvents.ts b/template/lambda-sqs-worker-cdk/src/types/pipelineEvents.ts new file mode 100644 index 000000000..69101aad4 --- /dev/null +++ b/template/lambda-sqs-worker-cdk/src/types/pipelineEvents.ts @@ -0,0 +1,21 @@ +import { z } from 'zod'; + +export type JobPublishedEvent = z.infer; + +export const JobPublishedEventSchema = z.object({ + data: z.object({ + details: z.string(), + }), + entityId: z.string(), + eventType: z.literal('JobPublished'), +}); + +export type JobScoredEvent = z.infer; + +export const JobScoredEventSchema = z.object({ + data: z.object({ + score: z.number(), + }), + entityId: z.string(), + eventType: z.literal('JobScored'), +}); diff --git a/template/lambda-sqs-worker/serverless.yml b/template/lambda-sqs-worker/serverless.yml index 0b57fc07c..4e9020217 100644 --- a/template/lambda-sqs-worker/serverless.yml +++ b/template/lambda-sqs-worker/serverless.yml @@ -173,7 +173,7 @@ resources: # Properties: # Endpoint: !GetAtt MessageQueue.Arn # Protocol: sqs - # RawMessageDelivery: true + # RawMessageDelivery: true # Remove this property if you require end to end datadog tracing # TopicArn: 'TODO: sourceSnsTopicArn' DestinationTopic: