Skip to content

Commit

Permalink
Merge pull request #74 from nhsconnect/PRMT-4402-hotfix
Browse files Browse the repository at this point in the history
[PRMT-4402] [hot-fix]
  • Loading branch information
martin-nhs authored Mar 21, 2024
2 parents fc47d7b + 59395e4 commit 1549922
Show file tree
Hide file tree
Showing 5 changed files with 129 additions and 42 deletions.
11 changes: 11 additions & 0 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
"express-validator": "^6.14.0",
"express-winston": "^4.2.0",
"helmet": "^6.0.0",
"lodash.chunk": "^4.2.0",
"lodash.clonedeep": "^4.5.0",
"lodash.merge": "^4.6.2",
"moment": "^2.30.1",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,18 @@ import { RecordType } from '../../../models/enums';
import {
cleanupRecordsForTest,
createConversationForTest,
generateMultipleUUID
} from '../../../utilities/integration-test-utilities';
import { buildCore } from '../../../models/core';
import { buildFragmentUpdateParams } from '../../../models/fragment';

// suppress logs
jest.mock('../../../middleware/logging');

describe('EhrTransferTracker', () => {
const testConversationId = uuid();
const testNhsNumber = '9000000001';

beforeEach(async () => {
await createConversationForTest(testConversationId, testNhsNumber);
});

afterEach(async () => {
await cleanupRecordsForTest(testConversationId);
});
Expand All @@ -23,6 +24,7 @@ describe('EhrTransferTracker', () => {
// given
const db = EhrTransferTracker.getInstance();
const testMessageId = uuid();
await createConversationForTest(testConversationId, testNhsNumber);

const ehrCore = buildCore(testConversationId, testMessageId);

Expand All @@ -38,7 +40,52 @@ describe('EhrTransferTracker', () => {
Layer: RecordType.CORE,
ReceivedAt: expect.any(String),
CreatedAt: expect.any(String),
UpdatedAt: expect.any(String),
UpdatedAt: expect.any(String)
});
});

describe('writeItemsInTransaction / updateItemsInTransaction', () => {
it('can write / update multiple items into dynamodb', async () => {
const testSize = 120;
const db = EhrTransferTracker.getInstance();
const fragmentIds = generateMultipleUUID(testSize);
const fragments = fragmentIds.map((fragmentId) => {
return {
InboundConversationId: testConversationId,
Layer: `FRAGMENT#${fragmentId}`,
TestColumn: 'test'
};
});

await db.writeItemsInTransaction(fragments);

const records = await db.queryTableByConversationId(testConversationId);
expect(records).toHaveLength(testSize);
records.forEach((item) => {
expect(item).toMatchObject({
InboundConversationId: testConversationId,
Layer: expect.stringContaining('FRAGMENT#'),
TestColumn: 'test'
});
});

const updates = fragmentIds.map((fragmentId) =>
buildFragmentUpdateParams(testConversationId, fragmentId, {
TransferStatus: 'test update fields'
})
);

await db.updateItemsInTransaction(updates);

const updatedRecords = await db.queryTableByConversationId(testConversationId);
updatedRecords.forEach((item) => {
expect(item).toMatchObject({
InboundConversationId: testConversationId,
Layer: expect.stringContaining('FRAGMENT#'),
TransferStatus: 'test update fields',
TestColumn: 'test'
});
});
});
});
});
72 changes: 48 additions & 24 deletions src/services/database/dynamo-ehr-transfer-tracker.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { TransactWriteCommand, QueryCommand, GetCommand } from '@aws-sdk/lib-dynamodb';
import chunk from 'lodash.chunk';

import { logError, logInfo } from '../../middleware/logging';
import { logError, logInfo, logWarning } from "../../middleware/logging";
import { RecordType } from '../../models/enums';
import { getDynamodbClient } from './dynamodb-client';
import { IS_IN_LOCAL } from '../../utilities/integration-test-utilities';
Expand Down Expand Up @@ -40,38 +41,61 @@ export class EhrTransferTracker {
}

async writeItemsInTransaction(items) {
logInfo(`Writing ${items.length} items to dynamodb table`);
if (!items || !Array.isArray(items)) {
throw new Error('The given argument `items` is not an array');
}
const command = new TransactWriteCommand({
TransactItems: items.map((item) => ({
Put: {
TableName: this.tableName,
Item: item,
},
})),
});
throw new TypeError('The given argument `items` is not an array');
}
if (items.length > 100) {
logWarning('Cannot write all items in a single transaction due to limitation of dynamodb.');
logWarning('Will write the items in multiple transactions');
logWarning('If failed, a complete rollback cannot be guaranteed.');
}

await this.client.send(command);
const splitItemBy100 = chunk(items, 100);

for (const batch of splitItemBy100) {
const command = new TransactWriteCommand({
TransactItems: batch.map(item => ({
Put: {
TableName: this.tableName,
Item: item
}
}))
});
await this.client.send(command);
}
}

async updateItemsInTransaction(updateParams) {
if (!updateParams || !Array.isArray(updateParams)) {
throw new Error('The given argument `updateParams` is not an array');
}
const command = new TransactWriteCommand({
TransactItems: updateParams.map((params) => ({
Update: {
TableName: this.tableName,
...params,
},
})),
});
throw new TypeError('The given argument `updateParams` is not an array');
}

if (updateParams.length > 100) {
logWarning('Cannot update all items in a single transaction due to limitation of dynamodb.');
logWarning('Will update the items in multiple transactions');
logWarning('If failed, a complete rollback cannot be guaranteed.');
}

await this.client.send(command);
logInfo(`Updating dynamodb record with params: ${JSON.stringify(updateParams)}`);

const splitItemBy100 = chunk(updateParams, 100);

for (const batch of splitItemBy100) {
logInfo(`Updating dynamodb record with params: ${JSON.stringify(batch)}`);
const command = new TransactWriteCommand({
TransactItems: batch.map(params => ({
Update: {
TableName: this.tableName,
...params
}
}))
});
await this.client.send(command);
}
}

async queryTableByNhsNumber(nhsNumber, includeDeletedRecord = false) {
async queryTableByNhsNumber(nhsNumber, includeDeletedRecord = false) {
const params = {
TableName: this.tableName,
IndexName: 'NhsNumberSecondaryIndex',
Expand Down
30 changes: 17 additions & 13 deletions src/utilities/integration-test-utilities.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import { getUKTimestamp } from '../services/time';
import { EhrTransferTracker } from '../services/database/dynamo-ehr-transfer-tracker';
import { TransactWriteCommand } from '@aws-sdk/lib-dynamodb';
import { RecordType } from '../models/enums';
import chunk from 'lodash.chunk';

export const generateRandomNhsNumber = () => (Math.floor(Math.random() * 9e9) + 1e9).toString();

Expand Down Expand Up @@ -31,7 +32,7 @@ export const createConversationForTest = async (conversationId, nhsNumber, overr
NhsNumber: nhsNumber,
CreatedAt: timestamp,
UpdatedAt: timestamp,
...overrides,
...overrides
};

await db.writeItemsInTransaction([item]);
Expand All @@ -46,19 +47,22 @@ export const cleanupRecordsForTest = async (conversationId) => {

const db = EhrTransferTracker.getInstance();
const records = await db.queryTableByConversationId(conversationId, RecordType.ALL, true);
const deleteCommand = new TransactWriteCommand({
TransactItems: records.map((item) => ({
Delete: {
TableName: db.tableName,
Key: {
InboundConversationId: item.InboundConversationId,
Layer: item.Layer,
},
},
})),
});
const splitItemBy100 = chunk(records, 100);

await db.client.send(deleteCommand);
for (const batch of splitItemBy100) {
const deleteCommand = new TransactWriteCommand({
TransactItems: batch.map((item) => ({
Delete: {
TableName: db.tableName,
Key: {
InboundConversationId: item.InboundConversationId,
Layer: item.Layer
}
}
}))
});
await db.client.send(deleteCommand);
}
};

export const cleanupRecordsForTestByNhsNumber = async (nhsNumber) => {
Expand Down

0 comments on commit 1549922

Please sign in to comment.