Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[PRMT-4402] [hot-fix] #74

Merged
merged 1 commit into from
Mar 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
"express-validator": "^6.14.0",
"express-winston": "^4.2.0",
"helmet": "^6.0.0",
"lodash.chunk": "^4.2.0",
"lodash.clonedeep": "^4.5.0",
"lodash.merge": "^4.6.2",
"moment": "^2.30.1",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,18 @@ import { RecordType } from '../../../models/enums';
import {
cleanupRecordsForTest,
createConversationForTest,
generateMultipleUUID
} from '../../../utilities/integration-test-utilities';
import { buildCore } from '../../../models/core';
import { buildFragmentUpdateParams } from '../../../models/fragment';

// suppress logs
jest.mock('../../../middleware/logging');

describe('EhrTransferTracker', () => {
const testConversationId = uuid();
const testNhsNumber = '9000000001';

beforeEach(async () => {
await createConversationForTest(testConversationId, testNhsNumber);
});

afterEach(async () => {
await cleanupRecordsForTest(testConversationId);
});
Expand All @@ -23,6 +24,7 @@ describe('EhrTransferTracker', () => {
// given
const db = EhrTransferTracker.getInstance();
const testMessageId = uuid();
await createConversationForTest(testConversationId, testNhsNumber);

const ehrCore = buildCore(testConversationId, testMessageId);

Expand All @@ -38,7 +40,52 @@ describe('EhrTransferTracker', () => {
Layer: RecordType.CORE,
ReceivedAt: expect.any(String),
CreatedAt: expect.any(String),
UpdatedAt: expect.any(String),
UpdatedAt: expect.any(String)
});
});

describe('writeItemsInTransaction / updateItemsInTransaction', () => {
it('can write / update multiple items into dynamodb', async () => {
const testSize = 120;
const db = EhrTransferTracker.getInstance();
const fragmentIds = generateMultipleUUID(testSize);
const fragments = fragmentIds.map((fragmentId) => {
return {
InboundConversationId: testConversationId,
Layer: `FRAGMENT#${fragmentId}`,
TestColumn: 'test'
};
});

await db.writeItemsInTransaction(fragments);

const records = await db.queryTableByConversationId(testConversationId);
expect(records).toHaveLength(testSize);
records.forEach((item) => {
expect(item).toMatchObject({
InboundConversationId: testConversationId,
Layer: expect.stringContaining('FRAGMENT#'),
TestColumn: 'test'
});
});

const updates = fragmentIds.map((fragmentId) =>
buildFragmentUpdateParams(testConversationId, fragmentId, {
TransferStatus: 'test update fields'
})
);

await db.updateItemsInTransaction(updates);

const updatedRecords = await db.queryTableByConversationId(testConversationId);
updatedRecords.forEach((item) => {
expect(item).toMatchObject({
InboundConversationId: testConversationId,
Layer: expect.stringContaining('FRAGMENT#'),
TransferStatus: 'test update fields',
TestColumn: 'test'
});
});
});
});
});
72 changes: 48 additions & 24 deletions src/services/database/dynamo-ehr-transfer-tracker.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { TransactWriteCommand, QueryCommand, GetCommand } from '@aws-sdk/lib-dynamodb';
import chunk from 'lodash.chunk';

import { logError, logInfo } from '../../middleware/logging';
import { logError, logInfo, logWarning } from "../../middleware/logging";
import { RecordType } from '../../models/enums';
import { getDynamodbClient } from './dynamodb-client';
import { IS_IN_LOCAL } from '../../utilities/integration-test-utilities';
Expand Down Expand Up @@ -40,38 +41,61 @@ export class EhrTransferTracker {
}

async writeItemsInTransaction(items) {
logInfo(`Writing ${items.length} items to dynamodb table`);
if (!items || !Array.isArray(items)) {
throw new Error('The given argument `items` is not an array');
}
const command = new TransactWriteCommand({
TransactItems: items.map((item) => ({
Put: {
TableName: this.tableName,
Item: item,
},
})),
});
throw new TypeError('The given argument `items` is not an array');
}
if (items.length > 100) {
logWarning('Cannot write all items in a single transaction due to limitation of dynamodb.');
logWarning('Will write the items in multiple transactions');
logWarning('If failed, a complete rollback cannot be guaranteed.');
}

await this.client.send(command);
const splitItemBy100 = chunk(items, 100);

for (const batch of splitItemBy100) {
const command = new TransactWriteCommand({
TransactItems: batch.map(item => ({
Put: {
TableName: this.tableName,
Item: item
}
}))
});
await this.client.send(command);
}
}

async updateItemsInTransaction(updateParams) {
if (!updateParams || !Array.isArray(updateParams)) {
throw new Error('The given argument `updateParams` is not an array');
}
const command = new TransactWriteCommand({
TransactItems: updateParams.map((params) => ({
Update: {
TableName: this.tableName,
...params,
},
})),
});
throw new TypeError('The given argument `updateParams` is not an array');
}

if (updateParams.length > 100) {
logWarning('Cannot update all items in a single transaction due to limitation of dynamodb.');
logWarning('Will update the items in multiple transactions');
logWarning('If failed, a complete rollback cannot be guaranteed.');
}

await this.client.send(command);
logInfo(`Updating dynamodb record with params: ${JSON.stringify(updateParams)}`);

const splitItemBy100 = chunk(updateParams, 100);

for (const batch of splitItemBy100) {
logInfo(`Updating dynamodb record with params: ${JSON.stringify(batch)}`);
const command = new TransactWriteCommand({
TransactItems: batch.map(params => ({
Update: {
TableName: this.tableName,
...params
}
}))
});
await this.client.send(command);
}
}

async queryTableByNhsNumber(nhsNumber, includeDeletedRecord = false) {
async queryTableByNhsNumber(nhsNumber, includeDeletedRecord = false) {
const params = {
TableName: this.tableName,
IndexName: 'NhsNumberSecondaryIndex',
Expand Down
30 changes: 17 additions & 13 deletions src/utilities/integration-test-utilities.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import { getUKTimestamp } from '../services/time';
import { EhrTransferTracker } from '../services/database/dynamo-ehr-transfer-tracker';
import { TransactWriteCommand } from '@aws-sdk/lib-dynamodb';
import { RecordType } from '../models/enums';
import chunk from 'lodash.chunk';

export const generateRandomNhsNumber = () => (Math.floor(Math.random() * 9e9) + 1e9).toString();

Expand Down Expand Up @@ -31,7 +32,7 @@ export const createConversationForTest = async (conversationId, nhsNumber, overr
NhsNumber: nhsNumber,
CreatedAt: timestamp,
UpdatedAt: timestamp,
...overrides,
...overrides
};

await db.writeItemsInTransaction([item]);
Expand All @@ -46,19 +47,22 @@ export const cleanupRecordsForTest = async (conversationId) => {

const db = EhrTransferTracker.getInstance();
const records = await db.queryTableByConversationId(conversationId, RecordType.ALL, true);
const deleteCommand = new TransactWriteCommand({
TransactItems: records.map((item) => ({
Delete: {
TableName: db.tableName,
Key: {
InboundConversationId: item.InboundConversationId,
Layer: item.Layer,
},
},
})),
});
const splitItemBy100 = chunk(records, 100);

await db.client.send(deleteCommand);
for (const batch of splitItemBy100) {
const deleteCommand = new TransactWriteCommand({
TransactItems: batch.map((item) => ({
Delete: {
TableName: db.tableName,
Key: {
InboundConversationId: item.InboundConversationId,
Layer: item.Layer
}
}
}))
});
await db.client.send(deleteCommand);
}
};

export const cleanupRecordsForTestByNhsNumber = async (nhsNumber) => {
Expand Down
Loading