Skip to content
This repository has been archived by the owner on Apr 13, 2023. It is now read-only.

Commit

Permalink
feat: add subscriptionsMatcher Lambda (#559)
Browse files Browse the repository at this point in the history
Also fixed a bunch of small things on the reaper lambda

Co-authored-by: zheyanyu <[email protected]>
  • Loading branch information
carvantes and zheyanyu authored Feb 17, 2022
1 parent 56cf3f8 commit 1ef9172
Show file tree
Hide file tree
Showing 14 changed files with 333 additions and 112 deletions.
1 change: 0 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ dist
.idea
yarn-error.log


auditLogMover/.serverless
auditLogMover/node_modules

Expand Down
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
"fhir-works-on-aws-search-es": "3.9.2",
"lodash": "^4.17.21",
"serverless-http": "^2.7.0",
"tslib": "^2.3.1",
"yargs": "^16.2.0"
},
"devDependencies": {
Expand Down
119 changes: 113 additions & 6 deletions serverless.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,23 @@ functions:
maximumRetryAttempts: 3
startingPosition: LATEST

subscriptionsMatcher:
timeout: 60
runtime: nodejs14.x
description: 'Match ddb events against active Subscriptions and emit notifications'
role: SubscriptionsMatcherLambdaRole
handler: src/subscriptions/matcherLambda/index.handler
environment:
SUBSCRIPTIONS_TOPIC: !Ref SubscriptionsTopic
events:
- stream:
type: dynamodb
arn: !GetAtt ResourceDynamoDBTableV2.StreamArn
batchSize: 15 # Lambda payload size limit ~6MB; DDB row limit ~400KB = 15 items
maximumRetryAttempts: 3
startingPosition: LATEST
enabled: ${self:custom.enableSubscriptions} # will only run if opted into subscription feature

startExportJob:
timeout: 30
memorySize: 192
Expand Down Expand Up @@ -197,19 +214,18 @@ functions:
runtime: nodejs14.x
description: 'Scheduled Lambda to remove expired Subscriptions'
role: SubscriptionReaperRole
handler: src/subscriptions/index.reaperHandler
handler: src/subscriptions/reaperLambda/index.handler
events:
- schedule: rate(5 minutes)
- enable: ${self:custom.enableSubscriptions} # will only run if opted into subscription feature
environment:
ENABLE_MULTI_TENANCY: !Ref EnableMultiTenancy
- schedule:
rate: rate(5 minutes)
enabled: ${self:custom.enableSubscriptions} # will only run if opted into subscription feature

subscriptionsRestHook:
timeout: 20
runtime: nodejs14.x
description: 'Send rest-hook notification for subscription'
role: RestHookLambdaRole
handler: src/subscriptions/index.handler
handler: src/subscriptions/restHookLambda/index.handler
events:
- sqs:
arn:
Expand Down Expand Up @@ -727,6 +743,91 @@ resources:
Resource:
- !GetAtt DynamodbKMSKey.Arn
- !GetAtt ElasticSearchKMSKey.Arn

SubscriptionsMatcherLambdaRole:
Type: AWS::IAM::Role
Metadata:
cfn_nag:
rules_to_suppress:
- id: W11
reason: '* only applies to X-Ray statement which does not define a group or sampling-rule'
Properties:
AssumeRolePolicyDocument:
Version: '2012-10-17'
Statement:
- Effect: 'Allow'
Principal:
Service: 'lambda.amazonaws.com'
Action: 'sts:AssumeRole'
Policies:
- PolicyName: 'SubscriptionsMatcherLambdaPolicy'
PolicyDocument:
Version: '2012-10-17'
Statement:
- Effect: Allow
Action:
- logs:CreateLogStream
- logs:CreateLogGroup
- logs:PutLogEvents
Resource: !Sub 'arn:${AWS::Partition}:logs:${AWS::Region}:*:*'
- Effect: Allow
Action:
- dynamodb:GetShardIterator
- dynamodb:DescribeStream
- dynamodb:ListStreams
- dynamodb:GetRecords
Resource:
- !GetAtt ResourceDynamoDBTableV2.StreamArn
- Effect: Allow
Action:
- 'dynamodb:Query'
- 'dynamodb:Scan'
- 'dynamodb:GetItem'
Resource:
- !GetAtt ResourceDynamoDBTableV2.Arn
- Effect: Allow
Action:
- 'dynamodb:Query'
Resource:
- !Join [ '', [ !GetAtt ResourceDynamoDBTableV2.Arn, '/index/*' ] ]
- Effect: Allow
Action:
- xray:PutTraceSegments
- xray:PutTelemetryRecords
Resource:
- '*'
- PolicyName: 'KMSPolicy'
PolicyDocument:
Version: '2012-10-17'
Statement:
- Effect: Allow
Action:
- 'kms:Describe*'
- 'kms:Get*'
- 'kms:List*'
- 'kms:Encrypt'
- 'kms:Decrypt'
- 'kms:ReEncrypt*'
- 'kms:GenerateDataKey'
- 'kms:GenerateDataKeyWithoutPlaintext'
Resource:
- !GetAtt DynamodbKMSKey.Arn
- PolicyName: 'PublishToSNSPolicy'
PolicyDocument:
Version: '2012-10-17'
Statement:
- Effect: Allow
Action:
- 'kms:GenerateDataKey'
- 'kms:Decrypt'
Resource:
- !GetAtt SubscriptionsKey.Arn
- Effect: Allow
Action:
- 'sns:Publish'
Resource:
- !Ref SubscriptionsTopic

UpdateSearchMappingsLambdaRole:
Type: AWS::IAM::Role
Metadata:
Expand Down Expand Up @@ -804,6 +905,12 @@ resources:
Resource:
- !Join ['', [!GetAtt ResourceDynamoDBTableV2.Arn, '/index/*']]
- !GetAtt ResourceDynamoDBTableV2.Arn
- Effect: Allow
Action:
- logs:CreateLogStream
- logs:CreateLogGroup
- logs:PutLogEvents
Resource: !Sub 'arn:${AWS::Partition}:logs:${AWS::Region}:*:*'
- PolicyName: 'KMSPolicy'
PolicyDocument:
Version: '2012-10-17'
Expand Down
24 changes: 24 additions & 0 deletions src/subscriptions/matcherLambda/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0
*
*/

import { StreamSubscriptionMatcher } from 'fhir-works-on-aws-search-es';

import { DynamoDb, DynamoDbDataService } from 'fhir-works-on-aws-persistence-ddb';
import { fhirVersion } from '../../config';
import { loadImplementationGuides } from '../../implementationGuides/loadCompiledIGs';

const dynamoDbDataService = new DynamoDbDataService(DynamoDb);

const topicArn = process.env.SUBSCRIPTIONS_TOPIC as string;

const streamSubscriptionMatcher = new StreamSubscriptionMatcher(dynamoDbDataService, topicArn, {
fhirVersion,
compiledImplementationGuides: loadImplementationGuides('fhir-works-on-aws-search-es'),
});

exports.handler = async (event: any) => {
await streamSubscriptionMatcher.match(event);
};
6 changes: 6 additions & 0 deletions src/subscriptions/reaperLambda/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
import reaperHandler from './subscriptionReaper';

/**
* Custom lambda handler that handles deleting expired subscriptions.
*/
exports.handler = reaperHandler;
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,13 @@ const reaperHandler = async (event: any) => {
// filter out subscriptions without a defined end time.
// check if subscription is past its end date (ISO format)
// example format of subscriptions: https://www.hl7.org/fhir/subscription-example.json.html
console.log(subscriptions);
return Promise.all(
subscriptions
.filter((s: Record<string, any>) => {
if (!s.end) {
return false;
}
const date = new Date(s.end);
if (date.toString() === 'Invalid Date') {
console.log(`Skipping subscription ${s.id} since the end date is not in a valid format: ${s.end}`);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { SubscriptionEndpoint } from 'fhir-works-on-aws-routing/lib/router/validation/subscriptionValidator';
import { groupBy } from 'lodash';
import { makeLogger } from 'fhir-works-on-aws-interface';
import getAllowListedSubscriptionEndpoints from './allowList';
import getAllowListedSubscriptionEndpoints from '../allowList';

const SINGLE_TENANT_ALLOW_LIST_KEY = 'SINGLE_TENANT_ALLOW_LIST_KEY';
const logger = makeLogger({ component: 'subscriptions' });
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
*
*/

import reaperHandler from './subscriptionReaper';
import RestHookHandler from './restHook';
import { AllowListInfo, getAllowListInfo } from './allowListUtil';

Expand All @@ -19,8 +18,3 @@ const restHookHandler = new RestHookHandler({ enableMultitenancy });
exports.handler = async (event: any) => {
return restHookHandler.sendRestHookNotification(event, allowListPromise);
};

/**
* Custom lambda handler that handles deleting expired subscriptions.
*/
exports.reaperHandler = reaperHandler;
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import { AllowListInfo, getAllowListInfo } from './allowListUtil';
jest.mock('axios');
// This mock only works on the file level for once
// Separating multi-tenant tests to a separate file to use other mock value
jest.mock('./allowList', () => ({
jest.mock('../allowList', () => ({
__esModule: true,
default: async () => [
{
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import axios from 'axios';
import { makeLogger } from 'fhir-works-on-aws-interface';
import { SQSEvent } from 'aws-lambda';
import { SubscriptionMatchMessage } from './types';
import ensureAsyncInit from '../index';
import { SubscriptionNotification } from 'fhir-works-on-aws-search-es';
import ensureAsyncInit from '../../index';
import { AllowListInfo, getAllowListHeaders } from './allowListUtil';

const logger = makeLogger({ component: 'subscriptions' });
Expand Down Expand Up @@ -44,7 +44,7 @@ export default class RestHookHandler {
const notificationPromises = event.Records.map((record: any) => {
const body = JSON.parse(record.body);
logger.debug(body);
const message: SubscriptionMatchMessage = JSON.parse(body.Message);
const message: SubscriptionNotification = JSON.parse(body.Message);
const { endpoint, channelHeader, channelPayload, matchedResource, tenantId } = message;
const allowListHeaders = getAllowListHeaders(allowList, endpoint, {
enableMultitenancy: this.enableMultitenancy,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import RestHookHandler from './restHook';
import { AllowListInfo, getAllowListInfo } from './allowListUtil';

jest.mock('axios');
jest.mock('./allowList', () => ({
jest.mock('../allowList', () => ({
__esModule: true,
default: async () => [
{
Expand Down
14 changes: 0 additions & 14 deletions src/subscriptions/types.ts

This file was deleted.

Loading

0 comments on commit 1ef9172

Please sign in to comment.