Skip to content

Commit

Permalink
Unrevert sns publishing for merging back to master
Browse files Browse the repository at this point in the history
  • Loading branch information
stephenhand committed Nov 7, 2024
1 parent a499b9f commit 04db2d4
Show file tree
Hide file tree
Showing 5 changed files with 134 additions and 14 deletions.
2 changes: 1 addition & 1 deletion hrm-domain/hrm-core/contact/contactsReindexService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ export const reindexContactsStream = async (
const { MessageId } = await publishContactToSearchIndex({
accountSid,
contact,
operation: 'index',
operation: 'reindex',
});

this.push(
Expand Down
119 changes: 109 additions & 10 deletions hrm-domain/hrm-core/jobs/search/publishToSearchIndex.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,41 @@ import { getSsmParameter } from '../../config/ssmCache';
import { IndexMessage } from '@tech-matters/hrm-search-config';
import { CaseService, Contact } from '@tech-matters/hrm-types';
import { AccountSID, HrmAccountId } from '@tech-matters/types';
import { publishSns, PublishSnsParams } from '@tech-matters/sns-client';
import { SsmParameterNotFound } from '@tech-matters/ssm-cache';

type DeleteNotificationPayload = {
accountSid: HrmAccountId;
operation: 'delete';
id: string;
};

type UpsertCaseNotificationPayload = {
accountSid: HrmAccountId;
operation: 'update' | 'create' | 'reindex';
case: CaseService;
};

type UpsertContactNotificationPayload = {
accountSid: HrmAccountId;
operation: 'update' | 'create' | 'reindex';
contact: Contact;
};

type NotificationPayload =
| DeleteNotificationPayload
| UpsertCaseNotificationPayload
| UpsertContactNotificationPayload;

const PENDING_INDEX_QUEUE_SSM_PATH = `/${process.env.NODE_ENV}/${
process.env.AWS_REGION ?? process.env.AWS_DEFAULT_REGION
}/sqs/jobs/hrm-search-index/queue-url-consumer`;

const getSnsSsmPath = dataType =>
`/${process.env.NODE_ENV}/${
process.env.AWS_REGION ?? process.env.AWS_DEFAULT_REGION
}/sns/hrm/${dataType}/update-notifications-sns-topic`;

const publishToSearchIndex = async ({
message,
messageGroupId,
Expand All @@ -46,14 +76,89 @@ const publishToSearchIndex = async ({
}
};

const publishToSns = async ({
entityType,
payload,
messageGroupId,
}: {
entityType: 'contact' | 'case';
payload: NotificationPayload;
messageGroupId: string;
}) => {
try {
const topicArn = await getSsmParameter(getSnsSsmPath(entityType));
const publishParameters: PublishSnsParams = {
topicArn,
message: JSON.stringify({ ...payload, entityType }),
messageGroupId,
messageAttributes: { operation: payload.operation },
};
console.debug('Publishing HRM entity update:', publishParameters);
return await publishSns(publishParameters);
} catch (err) {
if (err instanceof SsmParameterNotFound) {
console.debug(
`No SNS topic stored in SSM parameter ${getSnsSsmPath(
entityType,
)}. Skipping publish.`,
);
return;
}
console.error(
`Error trying to publish message to SNS topic stored in SSM parameter ${getSnsSsmPath(
entityType,
)}`,
err,
);
}
};

const publishEntityToSearchIndex = async (
accountSid: HrmAccountId,
entityType: 'contact' | 'case',
entity: Contact | CaseService,
operation: IndexMessage['operation'] | 'reindex',
) => {
const messageGroupId = `${accountSid}-${entityType}-${entity.id}`;
if (operation === 'remove') {
await publishToSns({
entityType,
payload: { accountSid, id: entity.id.toString(), operation: 'delete' },
messageGroupId,
});
} else {
const publishOperation = operation === 'reindex' ? 'reindex' : 'update';
await publishToSns({
entityType,
// Update / create are identical for now, will differentiate between the 2 ops in a follow pu refactor PR
payload: {
accountSid,
[entityType]: entity,
operation: publishOperation,
} as NotificationPayload,
messageGroupId,
});
}
const indexOperation = operation === 'reindex' ? 'index' : operation;
return publishToSearchIndex({
message: {
accountSid,
type: entityType,
[entityType]: entity,
operation: indexOperation,
} as IndexMessage,
messageGroupId: `${accountSid}-${entityType}-${entity.id}`,
});
};

export const publishContactToSearchIndex = async ({
accountSid,
contact,
operation,
}: {
accountSid: HrmAccountId;
contact: Contact;
operation: IndexMessage['operation'];
operation: IndexMessage['operation'] | 'reindex';
}) => {
console.info(
`[generalised-search-contacts]: Indexing Started. Account SID: ${accountSid}, Contact ID: ${
Expand All @@ -64,10 +169,7 @@ export const publishContactToSearchIndex = async ({
contact.updatedAt ?? contact.createdAt
}/${operation})`,
);
return publishToSearchIndex({
message: { accountSid, type: 'contact', contact, operation },
messageGroupId: `${accountSid}-contact-${contact.id}`,
});
return publishEntityToSearchIndex(accountSid, 'contact', contact, operation);
};

export const publishCaseToSearchIndex = async ({
Expand All @@ -77,7 +179,7 @@ export const publishCaseToSearchIndex = async ({
}: {
accountSid: AccountSID;
case: CaseService;
operation: IndexMessage['operation'];
operation: IndexMessage['operation'] | 'reindex';
}) => {
console.info(
`[generalised-search-cases]: Indexing Request Started. Account SID: ${accountSid}, Case ID: ${
Expand All @@ -88,8 +190,5 @@ export const publishCaseToSearchIndex = async ({
caseObj.updatedAt ?? caseObj.createdAt
}/${operation})`,
);
return publishToSearchIndex({
message: { accountSid, type: 'case', case: caseObj, operation },
messageGroupId: `${accountSid}-case-${caseObj.id}`,
});
return publishEntityToSearchIndex(accountSid, 'case', caseObj, operation);
};
1 change: 1 addition & 0 deletions hrm-domain/hrm-core/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
"@tech-matters/resources-service": "^1.0.0",
"@tech-matters/s3-client": "^1.0.0",
"@tech-matters/sqs-client": "^1.0.0",
"@tech-matters/sns-client": "^1.0.0",
"@tech-matters/ssm-cache": "^1.0.0",
"@tech-matters/twilio-client": "^1.0.0",
"@tech-matters/twilio-worker-auth": "^1.0.0",
Expand Down
18 changes: 16 additions & 2 deletions packages/sns-client/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,12 @@
* along with this program. If not, see https://www.gnu.org/licenses/.
*/

import { PublishCommand, SNSClient, SNSClientConfig } from '@aws-sdk/client-sns';
import {
MessageAttributeValue,
PublishCommand,
SNSClient,
SNSClientConfig,
} from '@aws-sdk/client-sns';

const convertToEndpoint = (endpointUrl: string) => {
const url: URL = new URL(endpointUrl);
Expand Down Expand Up @@ -46,14 +51,23 @@ export const sns = new SNSClient(getSnsConfig());
export type PublishSnsParams = {
topicArn: string;
message: string;
messageGroupId?: string;
messageAttributes?: Record<string, string>;
};

export const publishSns = async (params: PublishSnsParams) => {
const { topicArn, message } = params;
const { topicArn, message, messageGroupId, messageAttributes } = params;
const messageAttributesPayload: Record<string, MessageAttributeValue> = {};
Object.entries(messageAttributes ?? {}).forEach(
([key, value]) =>
(messageAttributesPayload[key] = { DataType: 'String', StringValue: value }),
);

const command = new PublishCommand({
TopicArn: topicArn,
Message: message,
MessageAttributes: messageAttributesPayload,
...(messageGroupId ? { MessageGroupId: messageGroupId } : {}),
});

return sns.send(command);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7994,7 +7994,13 @@ export const withSites_20240823 = {
},
eligibilityMaxAge: 30,
eligibilityMinAge: 18,
feeStructureSource: [],
feeStructureSource: [
{
en: 'Free',
fr: 'Gratuit',
objectId: '66635363033cc0b95a022076',
},
],
feeStructureSourceFreeTextEn: null,
feeStructureSourceFreeTextFr: null,
howIsServiceOffered: [
Expand Down

0 comments on commit 04db2d4

Please sign in to comment.