From 04db2d4aa5c420cdc8d69522b1ab693e803a9b6b Mon Sep 17 00:00:00 2001 From: Stephen Hand Date: Thu, 7 Nov 2024 11:11:24 +0000 Subject: [PATCH] Unrevert sns publishing for merging back to master --- .../contact/contactsReindexService.ts | 2 +- .../jobs/search/publishToSearchIndex.ts | 119 ++++++++++++++++-- hrm-domain/hrm-core/package.json | 1 + packages/sns-client/index.ts | 18 ++- .../tests/fixtures/sampleResources.ts | 8 +- 5 files changed, 134 insertions(+), 14 deletions(-) diff --git a/hrm-domain/hrm-core/contact/contactsReindexService.ts b/hrm-domain/hrm-core/contact/contactsReindexService.ts index 491b6e85a..44f722ea4 100644 --- a/hrm-domain/hrm-core/contact/contactsReindexService.ts +++ b/hrm-domain/hrm-core/contact/contactsReindexService.ts @@ -56,7 +56,7 @@ export const reindexContactsStream = async ( const { MessageId } = await publishContactToSearchIndex({ accountSid, contact, - operation: 'index', + operation: 'reindex', }); this.push( diff --git a/hrm-domain/hrm-core/jobs/search/publishToSearchIndex.ts b/hrm-domain/hrm-core/jobs/search/publishToSearchIndex.ts index a4c9a850b..903cd67fe 100644 --- a/hrm-domain/hrm-core/jobs/search/publishToSearchIndex.ts +++ b/hrm-domain/hrm-core/jobs/search/publishToSearchIndex.ts @@ -19,11 +19,41 @@ import { getSsmParameter } from '../../config/ssmCache'; import { IndexMessage } from '@tech-matters/hrm-search-config'; import { CaseService, Contact } from '@tech-matters/hrm-types'; import { AccountSID, HrmAccountId } from '@tech-matters/types'; +import { publishSns, PublishSnsParams } from '@tech-matters/sns-client'; +import { SsmParameterNotFound } from '@tech-matters/ssm-cache'; + +type DeleteNotificationPayload = { + accountSid: HrmAccountId; + operation: 'delete'; + id: string; +}; + +type UpsertCaseNotificationPayload = { + accountSid: HrmAccountId; + operation: 'update' | 'create' | 'reindex'; + case: CaseService; +}; + +type UpsertContactNotificationPayload = { + accountSid: HrmAccountId; + operation: 'update' | 'create' | 'reindex'; + contact: Contact; +}; + +type NotificationPayload = + | DeleteNotificationPayload + | UpsertCaseNotificationPayload + | UpsertContactNotificationPayload; const PENDING_INDEX_QUEUE_SSM_PATH = `/${process.env.NODE_ENV}/${ process.env.AWS_REGION ?? process.env.AWS_DEFAULT_REGION }/sqs/jobs/hrm-search-index/queue-url-consumer`; +const getSnsSsmPath = dataType => + `/${process.env.NODE_ENV}/${ + process.env.AWS_REGION ?? process.env.AWS_DEFAULT_REGION + }/sns/hrm/${dataType}/update-notifications-sns-topic`; + const publishToSearchIndex = async ({ message, messageGroupId, @@ -46,6 +76,81 @@ const publishToSearchIndex = async ({ } }; +const publishToSns = async ({ + entityType, + payload, + messageGroupId, +}: { + entityType: 'contact' | 'case'; + payload: NotificationPayload; + messageGroupId: string; +}) => { + try { + const topicArn = await getSsmParameter(getSnsSsmPath(entityType)); + const publishParameters: PublishSnsParams = { + topicArn, + message: JSON.stringify({ ...payload, entityType }), + messageGroupId, + messageAttributes: { operation: payload.operation }, + }; + console.debug('Publishing HRM entity update:', publishParameters); + return await publishSns(publishParameters); + } catch (err) { + if (err instanceof SsmParameterNotFound) { + console.debug( + `No SNS topic stored in SSM parameter ${getSnsSsmPath( + entityType, + )}. Skipping publish.`, + ); + return; + } + console.error( + `Error trying to publish message to SNS topic stored in SSM parameter ${getSnsSsmPath( + entityType, + )}`, + err, + ); + } +}; + +const publishEntityToSearchIndex = async ( + accountSid: HrmAccountId, + entityType: 'contact' | 'case', + entity: Contact | CaseService, + operation: IndexMessage['operation'] | 'reindex', +) => { + const messageGroupId = `${accountSid}-${entityType}-${entity.id}`; + if (operation === 'remove') { + await publishToSns({ + entityType, + payload: { accountSid, id: entity.id.toString(), operation: 'delete' }, + messageGroupId, + }); + } else { + const publishOperation = operation === 'reindex' ? 'reindex' : 'update'; + await publishToSns({ + entityType, + // Update / create are identical for now, will differentiate between the 2 ops in a follow pu refactor PR + payload: { + accountSid, + [entityType]: entity, + operation: publishOperation, + } as NotificationPayload, + messageGroupId, + }); + } + const indexOperation = operation === 'reindex' ? 'index' : operation; + return publishToSearchIndex({ + message: { + accountSid, + type: entityType, + [entityType]: entity, + operation: indexOperation, + } as IndexMessage, + messageGroupId: `${accountSid}-${entityType}-${entity.id}`, + }); +}; + export const publishContactToSearchIndex = async ({ accountSid, contact, @@ -53,7 +158,7 @@ export const publishContactToSearchIndex = async ({ }: { accountSid: HrmAccountId; contact: Contact; - operation: IndexMessage['operation']; + operation: IndexMessage['operation'] | 'reindex'; }) => { console.info( `[generalised-search-contacts]: Indexing Started. Account SID: ${accountSid}, Contact ID: ${ @@ -64,10 +169,7 @@ export const publishContactToSearchIndex = async ({ contact.updatedAt ?? contact.createdAt }/${operation})`, ); - return publishToSearchIndex({ - message: { accountSid, type: 'contact', contact, operation }, - messageGroupId: `${accountSid}-contact-${contact.id}`, - }); + return publishEntityToSearchIndex(accountSid, 'contact', contact, operation); }; export const publishCaseToSearchIndex = async ({ @@ -77,7 +179,7 @@ export const publishCaseToSearchIndex = async ({ }: { accountSid: AccountSID; case: CaseService; - operation: IndexMessage['operation']; + operation: IndexMessage['operation'] | 'reindex'; }) => { console.info( `[generalised-search-cases]: Indexing Request Started. Account SID: ${accountSid}, Case ID: ${ @@ -88,8 +190,5 @@ export const publishCaseToSearchIndex = async ({ caseObj.updatedAt ?? caseObj.createdAt }/${operation})`, ); - return publishToSearchIndex({ - message: { accountSid, type: 'case', case: caseObj, operation }, - messageGroupId: `${accountSid}-case-${caseObj.id}`, - }); + return publishEntityToSearchIndex(accountSid, 'case', caseObj, operation); }; diff --git a/hrm-domain/hrm-core/package.json b/hrm-domain/hrm-core/package.json index d425e02e1..20e6b1afe 100644 --- a/hrm-domain/hrm-core/package.json +++ b/hrm-domain/hrm-core/package.json @@ -30,6 +30,7 @@ "@tech-matters/resources-service": "^1.0.0", "@tech-matters/s3-client": "^1.0.0", "@tech-matters/sqs-client": "^1.0.0", + "@tech-matters/sns-client": "^1.0.0", "@tech-matters/ssm-cache": "^1.0.0", "@tech-matters/twilio-client": "^1.0.0", "@tech-matters/twilio-worker-auth": "^1.0.0", diff --git a/packages/sns-client/index.ts b/packages/sns-client/index.ts index 7d3a2d5f3..94b8f5e01 100644 --- a/packages/sns-client/index.ts +++ b/packages/sns-client/index.ts @@ -14,7 +14,12 @@ * along with this program. If not, see https://www.gnu.org/licenses/. */ -import { PublishCommand, SNSClient, SNSClientConfig } from '@aws-sdk/client-sns'; +import { + MessageAttributeValue, + PublishCommand, + SNSClient, + SNSClientConfig, +} from '@aws-sdk/client-sns'; const convertToEndpoint = (endpointUrl: string) => { const url: URL = new URL(endpointUrl); @@ -46,14 +51,23 @@ export const sns = new SNSClient(getSnsConfig()); export type PublishSnsParams = { topicArn: string; message: string; + messageGroupId?: string; + messageAttributes?: Record; }; export const publishSns = async (params: PublishSnsParams) => { - const { topicArn, message } = params; + const { topicArn, message, messageGroupId, messageAttributes } = params; + const messageAttributesPayload: Record = {}; + Object.entries(messageAttributes ?? {}).forEach( + ([key, value]) => + (messageAttributesPayload[key] = { DataType: 'String', StringValue: value }), + ); const command = new PublishCommand({ TopicArn: topicArn, Message: message, + MessageAttributes: messageAttributesPayload, + ...(messageGroupId ? { MessageGroupId: messageGroupId } : {}), }); return sns.send(command); diff --git a/resources-domain/lambdas/import-producer/tests/fixtures/sampleResources.ts b/resources-domain/lambdas/import-producer/tests/fixtures/sampleResources.ts index 0183ae112..ff4276451 100644 --- a/resources-domain/lambdas/import-producer/tests/fixtures/sampleResources.ts +++ b/resources-domain/lambdas/import-producer/tests/fixtures/sampleResources.ts @@ -7994,7 +7994,13 @@ export const withSites_20240823 = { }, eligibilityMaxAge: 30, eligibilityMinAge: 18, - feeStructureSource: [], + feeStructureSource: [ + { + en: 'Free', + fr: 'Gratuit', + objectId: '66635363033cc0b95a022076', + }, + ], feeStructureSourceFreeTextEn: null, feeStructureSourceFreeTextFr: null, howIsServiceOffered: [