Skip to content

Commit

Permalink
Refactor search indexing to be notifying, and update indexing lambda …
Browse files Browse the repository at this point in the history
…to accept messages from SNS
  • Loading branch information
stephenhand committed Nov 20, 2024
1 parent aeb33d0 commit f095ebd
Show file tree
Hide file tree
Showing 11 changed files with 91 additions and 80 deletions.
6 changes: 3 additions & 3 deletions hrm-domain/hrm-core/case/caseReindexService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

import { HrmAccountId } from '@tech-matters/types';
import { caseRecordToCase } from './caseService';
import { publishCaseToSearchIndex } from '../jobs/search/publishToSearchIndex';
import { publishCaseChangeNotification } from '../notifications/entityChangeNotify';
import { maxPermissions } from '../permissions';
import formatISO from 'date-fns/formatISO';
import { CaseRecord, streamCasesForReindexing } from './caseDataAccess';
Expand Down Expand Up @@ -61,10 +61,10 @@ export const reindexCasesStream = async (
async transform(caseRecord: CaseRecord, _, callback) {
const caseObj = caseRecordToCase(caseRecord);
try {
const { MessageId } = await publishCaseToSearchIndex({
const { MessageId } = await publishCaseChangeNotification({
accountSid,
case: caseObj,
operation: 'index',
operation: 'reindex',
});

this.push(
Expand Down
8 changes: 4 additions & 4 deletions hrm-domain/hrm-core/case/caseSection/caseSectionService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ import { TwilioUser } from '@tech-matters/twilio-worker-auth';
import { RulesFile, TKConditionsSets } from '../../permissions/rulesMap';
import { ListConfiguration } from '../caseDataAccess';
import { HrmAccountId } from '@tech-matters/types';
import { indexCaseInSearchIndex } from '../caseService';
import { updateCaseNotify } from '../caseService';

const sectionRecordToSection = (
sectionRecord: CaseSectionRecord | undefined,
Expand Down Expand Up @@ -72,7 +72,7 @@ export const createCaseSection = async (

if (!skipSearchIndex) {
// trigger index operation but don't await for it
indexCaseInSearchIndex({ accountSid, caseId: parseInt(caseId, 10) });
updateCaseNotify({ accountSid, caseId: parseInt(caseId, 10) });
}

return sectionRecordToSection(created);
Expand Down Expand Up @@ -105,7 +105,7 @@ export const replaceCaseSection = async (

if (!skipSearchIndex) {
// trigger index operation but don't await for it
indexCaseInSearchIndex({ accountSid, caseId: parseInt(caseId, 10) });
updateCaseNotify({ accountSid, caseId: parseInt(caseId, 10) });
}

return sectionRecordToSection(updated);
Expand Down Expand Up @@ -183,7 +183,7 @@ export const deleteCaseSection = async (

if (!skipSearchIndex) {
// trigger index operation but don't await for it
indexCaseInSearchIndex({ accountSid, caseId: parseInt(caseId, 10) });
updateCaseNotify({ accountSid, caseId: parseInt(caseId, 10) });
}

return sectionRecordToSection(deleted);
Expand Down
23 changes: 12 additions & 11 deletions hrm-domain/hrm-core/case/caseService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,8 @@ import {
DocumentType,
HRM_CASES_INDEX_TYPE,
hrmSearchConfiguration,
type IndexMessage,
} from '@tech-matters/hrm-search-config';
import { publishCaseToSearchIndex } from '../jobs/search/publishToSearchIndex';
import { publishCaseChangeNotification } from '../notifications/entityChangeNotify';
import { enablePublishHrmSearchIndex } from '../featureFlags';
import { getClient } from '@tech-matters/elasticsearch-client';
import {
Expand All @@ -66,6 +65,7 @@ import {
} from './caseSearchIndex';
import { ContactListCondition } from '../contact/contactSearchIndex';
import { maxPermissions } from '../permissions';
import { NotificationOperation } from '@tech-matters/hrm-types/dist/NotificationOperation';

export { WELL_KNOWN_CASE_SECTION_NAMES, CaseService, CaseInfoSection };

Expand Down Expand Up @@ -285,8 +285,8 @@ const mapEssentialData =
};
};

const doCaseInSearchIndexOP =
(operation: IndexMessage['operation']) =>
const doCaseChangeNotification =
(operation: NotificationOperation) =>
async ({
accountSid,
caseId,
Expand All @@ -305,7 +305,7 @@ const doCaseInSearchIndexOP =
caseRecord || (await getById(caseId, accountSid, maxPermissions.user, []));

if (caseObj) {
await publishCaseToSearchIndex({
await publishCaseChangeNotification({
accountSid,
case: caseRecordToCase(caseObj),
operation,
Expand All @@ -319,8 +319,9 @@ const doCaseInSearchIndexOP =
}
};

export const indexCaseInSearchIndex = doCaseInSearchIndexOP('index');
const removeCaseInSearchIndex = doCaseInSearchIndexOP('remove');
export const createCaseNotify = doCaseChangeNotification('create');
export const updateCaseNotify = doCaseChangeNotification('update');
const deleteCaseNotify = doCaseChangeNotification('delete');

export const createCase = async (
body: Partial<CaseService>,
Expand All @@ -347,7 +348,7 @@ export const createCase = async (

if (!skipSearchIndex) {
// trigger index operation but don't await for it
indexCaseInSearchIndex({ accountSid, caseId: created.id });
createCaseNotify({ accountSid, caseId: created.id });
}

// A new case is always initialized with empty connected contacts. No need to apply mapContactTransformations here
Expand Down Expand Up @@ -379,7 +380,7 @@ export const updateCaseStatus = async (

if (!skipSearchIndex) {
// trigger index operation but don't await for it
indexCaseInSearchIndex({ accountSid, caseId: updated.id });
updateCaseNotify({ accountSid, caseId: updated.id });
}

return caseRecordToCase(withTransformedContacts);
Expand All @@ -397,7 +398,7 @@ export const updateCaseOverview = async (

if (!skipSearchIndex) {
// trigger index operation but don't await for it
indexCaseInSearchIndex({ accountSid, caseId: updated.id });
updateCaseNotify({ accountSid, caseId: updated.id });
}

return caseRecordToCase(updated);
Expand Down Expand Up @@ -625,7 +626,7 @@ export const deleteCaseById = async ({
const deleted = await deleteById(caseId, accountSid);

// trigger remove operation but don't await for it
removeCaseInSearchIndex({ accountSid, caseId: deleted?.id, caseRecord: deleted });
deleteCaseNotify({ accountSid, caseId: deleted?.id, caseRecord: deleted });

return deleted;
};
30 changes: 17 additions & 13 deletions hrm-domain/hrm-core/contact/contactService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -78,14 +78,14 @@ import {
isDatabaseUniqueConstraintViolationErrorResult,
} from '../sql';
import { systemUser } from '@tech-matters/twilio-worker-auth';
import { publishContactToSearchIndex } from '../jobs/search/publishToSearchIndex';
import { publishContactChangeNotification } from '../notifications/entityChangeNotify';
import type { RulesFile, TKConditionsSets } from '../permissions/rulesMap';
import type { IndexMessage } from '@tech-matters/hrm-search-config';
import {
ContactListCondition,
generateContactSearchFilters,
generateContactPermissionsFilters,
} from './contactSearchIndex';
import { NotificationOperation } from '@tech-matters/hrm-types/dist/NotificationOperation';

// Re export as is:
export { Contact } from './contactDataAccess';
Expand Down Expand Up @@ -178,8 +178,8 @@ const initProfile = async (
});
};

const doContactInSearchIndexOP =
(operation: IndexMessage['operation']) =>
const doContactChangeNotification =
(operation: NotificationOperation) =>
async ({
accountSid,
contactId,
Expand All @@ -195,7 +195,7 @@ const doContactInSearchIndexOP =
const contact = await getById(accountSid, contactId);

if (contact) {
await publishContactToSearchIndex({ accountSid, contact, operation });
await publishContactChangeNotification({ accountSid, contact, operation });
}
} catch (err) {
console.error(
Expand All @@ -205,8 +205,9 @@ const doContactInSearchIndexOP =
}
};

const indexContactInSearchIndex = doContactInSearchIndexOP('index');
const removeContactInSearchIndex = doContactInSearchIndexOP('remove');
const createContactInSearchIndex = doContactChangeNotification('create');
const updateContactInSearchIndex = doContactChangeNotification('update');
const deleteContactInSearchIndex = doContactChangeNotification('delete');

// Creates a contact with all its related records within a single transaction
export const createContact = async (
Expand Down Expand Up @@ -262,7 +263,7 @@ export const createContact = async (
if (isOk(result)) {
// trigger index operation but don't await for it
if (!skipSearchIndex) {
indexContactInSearchIndex({ accountSid, contactId: result.data.id });
createContactInSearchIndex({ accountSid, contactId: result.data.id });
}
return result.data;
}
Expand Down Expand Up @@ -335,7 +336,7 @@ export const patchContact = async (
// trigger index operation but don't await for it

if (!skipSearchIndex) {
indexContactInSearchIndex({ accountSid, contactId: parseInt(contactId, 10) });
updateContactInSearchIndex({ accountSid, contactId: parseInt(contactId, 10) });
}

return applyTransformations(updated);
Expand All @@ -350,7 +351,7 @@ export const connectContactToCase = async (
): Promise<Contact> => {
if (caseId === null) {
// trigger remove operation, awaiting for it, since we'll lost the information of which is the "old case" otherwise
await removeContactInSearchIndex({ accountSid, contactId: parseInt(contactId, 10) });
await deleteContactInSearchIndex({ accountSid, contactId: parseInt(contactId, 10) });
}

const updated: Contact | undefined = await connectToCase()(
Expand All @@ -367,7 +368,7 @@ export const connectContactToCase = async (

// trigger index operation but don't await for it
if (!skipSearchIndex) {
indexContactInSearchIndex({ accountSid, contactId: parseInt(contactId, 10) });
updateContactInSearchIndex({ accountSid, contactId: parseInt(contactId, 10) });
}

return applyTransformations(updated);
Expand Down Expand Up @@ -415,7 +416,10 @@ export const addConversationMediaToContact = async (

// trigger index operation but don't await for it
if (!skipSearchIndex) {
indexContactInSearchIndex({ accountSid, contactId: parseInt(contactIdString, 10) });
updateContactInSearchIndex({
accountSid,
contactId: parseInt(contactIdString, 10),
});
}

return applyTransformations(updated);
Expand Down Expand Up @@ -599,7 +603,7 @@ export const updateConversationMediaData =

// trigger index operation but don't await for it
if (!skipSearchIndex) {
indexContactInSearchIndex({ accountSid, contactId });
updateContactInSearchIndex({ accountSid, contactId });
}

return result;
Expand Down
4 changes: 2 additions & 2 deletions hrm-domain/hrm-core/contact/contactsReindexService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
*/

import { HrmAccountId } from '@tech-matters/types';
import { publishContactToSearchIndex } from '../jobs/search/publishToSearchIndex';
import { publishContactChangeNotification } from '../notifications/entityChangeNotify';
import { maxPermissions } from '../permissions';
import { Transform } from 'stream';
import { streamContactsForReindexing } from './contactDataAccess';
Expand Down Expand Up @@ -53,7 +53,7 @@ export const reindexContactsStream = async (
highWaterMark,
async transform(contact, _, callback) {
try {
const { MessageId } = await publishContactToSearchIndex({
const { MessageId } = await publishContactChangeNotification({
accountSid,
contact,
operation: 'reindex',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,28 +15,29 @@
*/

import { sendSqsMessage } from '@tech-matters/sqs-client';
import { getSsmParameter } from '../../config/ssmCache';
import { getSsmParameter } from '../config/ssmCache';
import { IndexMessage } from '@tech-matters/hrm-search-config';
import { CaseService, Contact } from '@tech-matters/hrm-types';
import { AccountSID, HrmAccountId } from '@tech-matters/types';
import { publishSns, PublishSnsParams } from '@tech-matters/sns-client';
import { SsmParameterNotFound } from '@tech-matters/ssm-cache';
import { NotificationOperation } from '@tech-matters/hrm-types';

type DeleteNotificationPayload = {
accountSid: HrmAccountId;
operation: 'delete';
operation: NotificationOperation & 'delete';
id: string;
};

type UpsertCaseNotificationPayload = {
accountSid: HrmAccountId;
operation: 'update' | 'create' | 'reindex';
operation: NotificationOperation & ('update' | 'create' | 'reindex');
case: CaseService;
};

type UpsertContactNotificationPayload = {
accountSid: HrmAccountId;
operation: 'update' | 'create' | 'reindex';
operation: NotificationOperation & ('update' | 'create' | 'reindex');
contact: Contact;
};

Expand Down Expand Up @@ -117,29 +118,29 @@ const publishEntityToSearchIndex = async (
accountSid: HrmAccountId,
entityType: 'contact' | 'case',
entity: Contact | CaseService,
operation: IndexMessage['operation'] | 'reindex',
operation: NotificationOperation,
) => {
const messageGroupId = `${accountSid}-${entityType}-${entity.id}`;
if (operation === 'remove') {
if (operation === 'delete') {
await publishToSns({
entityType,
payload: { accountSid, id: entity.id.toString(), operation: 'delete' },
payload: { accountSid, id: entity.id.toString(), operation },
messageGroupId,
});
} else {
const publishOperation = operation === 'reindex' ? 'reindex' : 'update';
await publishToSns({
entityType,
// Update / create are identical for now, will differentiate between the 2 ops in a follow pu refactor PR
payload: {
accountSid,
[entityType]: entity,
operation: publishOperation,
operation,
} as NotificationPayload,
messageGroupId,
});
}
const indexOperation = operation === 'reindex' ? 'index' : operation;
const indexOperation: IndexMessage['operation'] =
operation === 'delete' ? 'index' : 'remove';
return publishToSearchIndex({
message: {
accountSid,
Expand All @@ -151,14 +152,14 @@ const publishEntityToSearchIndex = async (
});
};

export const publishContactToSearchIndex = async ({
export const publishContactChangeNotification = async ({
accountSid,
contact,
operation,
}: {
accountSid: HrmAccountId;
contact: Contact;
operation: IndexMessage['operation'] | 'reindex';
operation: NotificationOperation;
}) => {
console.info(
`[generalised-search-contacts]: Indexing Started. Account SID: ${accountSid}, Contact ID: ${
Expand All @@ -172,14 +173,14 @@ export const publishContactToSearchIndex = async ({
return publishEntityToSearchIndex(accountSid, 'contact', contact, operation);
};

export const publishCaseToSearchIndex = async ({
export const publishCaseChangeNotification = async ({
accountSid,
case: caseObj,
operation,
}: {
accountSid: AccountSID;
case: CaseService;
operation: IndexMessage['operation'] | 'reindex';
operation: NotificationOperation;
}) => {
console.info(
`[generalised-search-cases]: Indexing Request Started. Account SID: ${accountSid}, Case ID: ${
Expand Down
Loading

0 comments on commit f095ebd

Please sign in to comment.