From 352f1b9cb287acb62e761c3312afe1f054a33742 Mon Sep 17 00:00:00 2001 From: Mike Lohmeier Date: Mon, 15 Aug 2022 16:38:25 -0500 Subject: [PATCH] feat: Issue 674 added support for up to 100 items in a FHIR transaction --- .../dynamoDbBundleService.test.ts | 286 +++++++++++++++++- src/dataServices/dynamoDbBundleService.ts | 73 +++-- 2 files changed, 329 insertions(+), 30 deletions(-) diff --git a/src/dataServices/dynamoDbBundleService.test.ts b/src/dataServices/dynamoDbBundleService.test.ts index 9473a07..962955c 100644 --- a/src/dataServices/dynamoDbBundleService.test.ts +++ b/src/dataServices/dynamoDbBundleService.test.ts @@ -6,7 +6,7 @@ // eslint-disable-next-line import/no-extraneous-dependencies import * as AWSMock from 'aws-sdk-mock'; -import { QueryInput, TransactWriteItemsInput } from 'aws-sdk/clients/dynamodb'; +import { QueryInput, TransactWriteItemsInput, TransactWriteItem } from 'aws-sdk/clients/dynamodb'; // @ts-ignore import AWS from 'aws-sdk'; import { @@ -15,6 +15,7 @@ import { TypeOperation, ResourceNotFoundError, } from 'fhir-works-on-aws-interface'; +import { range } from 'lodash'; import { DynamoDbBundleService } from './dynamoDbBundleService'; import { DynamoDBConverter } from './dynamoDb'; import { timeFromEpochInMsRegExp, utcTimeRegExp, uuidRegExp } from '../../testUtilities/regExpressions'; @@ -97,6 +98,56 @@ describe('atomicallyReadWriteResources', () => { await runTest(expectedResponse); }); + test('LOCK: One of the DynamoDB transaction fails', async () => { + // BUILD + + // READ items (Success) + AWSMock.mock('DynamoDB', 'query', (params: QueryInput, callback: Function) => { + const queryId = params.ExpressionAttributeValues![':hkey'].S; + callback(null, { + Items: [ + DynamoDBConverter.marshall({ + id: queryId, + vid: '1', + resourceType: 'Patient', + meta: { versionId: '1', lastUpdated: new Date().toISOString() }, + }), + ], + }); + }); + + // transactWriteItems 1/2 calls succeed + const transactWriteItemsStub = sinon.stub(); + transactWriteItemsStub.onCall(0).yields(null, {}); // lock call 1/2 + transactWriteItemsStub.onCall(1).yields('ConditionalCheckFailed', {}); // lock call 2/2 + transactWriteItemsStub.onCall(2).yields(null, {}); // unlock call 1/2 + transactWriteItemsStub.onCall(3).yields(null, {}); // unlock call 2/2 + AWSMock.mock('DynamoDB', 'transactWriteItems', transactWriteItemsStub); + + // OPERATE + const dynamoDb = new AWS.DynamoDB(); + const bundleService = new DynamoDbBundleService(dynamoDb); + const actualResponse = await bundleService.transaction({ + requests: range(0, 26).map((i) => { + return { + operation: 'delete', + resourceType: 'Patient', + id: `${id}-${i}`, + resource: `Patient/bce8411e-c15e-448c-95dd-69155a837405-${i}`, + }; + }), + startTime: new Date(), + }); + + // CHECK + expect(actualResponse).toStrictEqual({ + success: false, + message: 'Failed to lock resources for transaction. Please try again after 35 seconds.', + batchReadWriteResponses: [], + errorType: 'SYSTEM_ERROR', + }); + }); + test('STAGING: Item exist and lock obtained, but failed to stage', async () => { // READ items (Success) AWSMock.mock('DynamoDB', 'query', (params: QueryInput, callback: Function) => { @@ -135,6 +186,61 @@ describe('atomicallyReadWriteResources', () => { await runTest(expectedResponse); }); + + test('STAGING: One of the DynamoDB transaction fails', async () => { + // BUILD + + // READ items (Success) + AWSMock.mock('DynamoDB', 'query', (params: QueryInput, callback: Function) => { + const queryId = params.ExpressionAttributeValues![':hkey'].S; + callback(null, { + Items: [ + DynamoDBConverter.marshall({ + id: queryId, + vid: '1', + resourceType: 'Patient', + meta: { versionId: '1', lastUpdated: new Date().toISOString() }, + }), + ], + }); + }); + + // transactWriteItems 1/2 calls succeed + const transactWriteItemsStub = sinon.stub(); + transactWriteItemsStub.onCall(0).yields(null, {}); // 1/2 lock calls + transactWriteItemsStub.onCall(1).yields(null, {}); // 2/2 lock calls + transactWriteItemsStub.onCall(2).yields(null, {}); // 1/2 staging calls + transactWriteItemsStub.onCall(2).yields('ConditionalCheckFailed', {}); // 2/2 staging calls + transactWriteItemsStub.onCall(3).yields(null, {}); // rollback call 1/2 + transactWriteItemsStub.onCall(4).yields(null, {}); // rollback call 2/2 + transactWriteItemsStub.onCall(5).yields(null, {}); // unlock call 1/2 + transactWriteItemsStub.onCall(6).yields(null, {}); // unlock call 2/2 + AWSMock.mock('DynamoDB', 'transactWriteItems', transactWriteItemsStub); + + // OPERATE + const dynamoDb = new AWS.DynamoDB(); + const bundleService = new DynamoDbBundleService(dynamoDb); + const actualResponse = await bundleService.transaction({ + requests: range(0, 26).map((i) => { + return { + operation: 'delete', + resourceType: 'Patient', + id: `${id}-${i}`, + vid: '1', + resource: `Patient/bce8411e-c15e-448c-95dd-69155a837405-${i}`, + }; + }), + startTime: new Date(), + }); + + // CHECK + expect(actualResponse).toStrictEqual({ + success: false, + message: 'Failed to stage resources for transaction', + batchReadWriteResponses: [], + errorType: 'SYSTEM_ERROR', + }); + }); }); describe('SUCCESS Cases', () => { @@ -300,6 +406,88 @@ describe('atomicallyReadWriteResources', () => { await runCreateTest(true, true); }); + test('CREATING more than 25 resources', async () => { + // BUILD + + // READ items (Success) + AWSMock.mock('DynamoDB', 'query', (params: QueryInput, callback: Function) => { + const queryId = params.ExpressionAttributeValues![':hkey'].S; + callback(null, { + Items: [ + DynamoDBConverter.marshall({ + id: queryId, + vid: '1', + resourceType: 'Patient', + meta: { versionId: '1', lastUpdated: new Date().toISOString() }, + }), + ], + }); + }); + + // transactWriteItems all calls succeed + const transactWriteItemsStub = sinon.stub(); + transactWriteItemsStub.yields(null, {}); + AWSMock.mock('DynamoDB', 'transactWriteItems', transactWriteItemsStub); + + // OPERATE + const dynamoDb = new AWS.DynamoDB(); + const bundleService = new DynamoDbBundleService(dynamoDb); + const actualResponse = await bundleService.transaction({ + requests: range(0, 26).map((i) => { + return { + operation: 'create', + resourceType: 'Patient', + id: `${id}-${i}`, + vid: '1', + resource: { + resourceType: 'Patient', + name: [ + { + family: `${i}`, + given: [`${i}`], + }, + ], + }, + }; + }), + startTime: new Date(), + }); + + // CHECK + expect(transactWriteItemsStub.callCount).toBe(4); + + // make sure item was staged, unlocked and returned in the response + const stageRequestItems = transactWriteItemsStub + .getCall(0) + .args[0].TransactItems.concat(transactWriteItemsStub.getCall(1).args[0].TransactItems); + const unlockRequestItems = transactWriteItemsStub + .getCall(2) + .args[0].TransactItems.concat(transactWriteItemsStub.getCall(3).args[0].TransactItems); + + range(0, 26).forEach((i) => { + expect( + stageRequestItems.some((item: TransactWriteItem) => { + return item.Put!.Item.id.S === `${id}-${i}`; + }), + ).toBeTruthy(); + expect( + unlockRequestItems.some((item: TransactWriteItem) => { + return item.Update!.Key.id.S === `${id}-${i}`; + }), + ).toBeTruthy(); + + // make sure we include the item in the response + expect( + actualResponse.batchReadWriteResponses.some((item) => { + return item.id === `${id}-${i}`; + }), + ).toBeTruthy(); + }); + + // check the response adds up + expect(actualResponse.success).toBeTruthy(); + }); + async function runUpdateTest(shouldReqHasReferences: boolean, useVersionedReferences: boolean = false) { // BUILD const transactWriteItemSpy = sinon.spy(); @@ -513,6 +701,102 @@ describe('atomicallyReadWriteResources', () => { test('UPDATING a resource with references and versioned reference links', async () => { await runUpdateTest(true, true); }); + + test('UPDATING more than 25 resources', async () => { + // BUILD + + // READ items (Success) + AWSMock.mock('DynamoDB', 'query', (params: QueryInput, callback: Function) => { + const queryId = params.ExpressionAttributeValues![':hkey'].S; + callback(null, { + Items: [ + DynamoDBConverter.marshall({ + id: queryId, + vid: 1, + resourceType: 'Patient', + meta: { versionId: 1, lastUpdated: new Date().toISOString() }, + }), + ], + }); + }); + + // transactWriteItems all calls succeed + const transactWriteItemsStub = sinon.stub(); + transactWriteItemsStub.yields(null, {}); // all calls succeed + AWSMock.mock('DynamoDB', 'transactWriteItems', transactWriteItemsStub); + + // OPERATE + const dynamoDb = new AWS.DynamoDB(); + const bundleService = new DynamoDbBundleService(dynamoDb); + const actualResponse = await bundleService.transaction({ + requests: range(0, 26).map((i) => { + return { + operation: 'update', + resourceType: 'Patient', + id: `${id}-${i}`, + resource: { + id: `${id}-${i}`, + resourceType: 'Patient', + name: [ + { + family: `${i}`, + given: [`${i}`], + }, + ], + meta: { versionId: 2, lastUpdated: new Date().toISOString() }, + }, + }; + }), + startTime: new Date(), + }); + + // CHECK + // [0,1] = lock + // [2,3] = pending + // [4,6] = DELETE vid 1 & AVAILABLE vid 2 + expect(transactWriteItemsStub.callCount).toBe(7); + + // there's no guarantee on which ddb transaction the BatchReadWriteRequest is processed in + + // check every item is locked, adds pending & delete/makes available new version + const lockRequestItems = transactWriteItemsStub + .getCall(0) + .args[0].TransactItems.concat(transactWriteItemsStub.getCall(1).args[0].TransactItems); + const stageRequestItems = transactWriteItemsStub + .getCall(2) + .args[0].TransactItems.concat(transactWriteItemsStub.getCall(3).args[0].TransactItems); + const unlockRequestItems = transactWriteItemsStub + .getCall(4) + .args[0].TransactItems.concat(transactWriteItemsStub.getCall(5).args[0].TransactItems) + .concat(transactWriteItemsStub.getCall(6).args[0].TransactItems); + range(0, 26).forEach((i) => { + expect( + lockRequestItems.some((item: TransactWriteItem) => { + return item.Update!.Key.id.S === `${id}-${i}`; + }), + ).toBeTruthy(); + expect( + stageRequestItems.some((item: TransactWriteItem) => { + return item.Put!.Item.id.S === `${id}-${i}`; + }), + ).toBeTruthy(); + expect( + unlockRequestItems.some((item: TransactWriteItem) => { + return item.Update!.Key.id.S === `${id}-${i}`; + }), + ).toBeTruthy(); + + // make sure we include the item in the response + expect( + actualResponse.batchReadWriteResponses.some((item) => { + return item.id === `${id}-${i}`; + }), + ).toBeTruthy(); + }); + + // check the response adds up + expect(actualResponse.success).toBeTruthy(); + }); }); describe('Update as Create Cases', () => { diff --git a/src/dataServices/dynamoDbBundleService.ts b/src/dataServices/dynamoDbBundleService.ts index 55d028c..0be851c 100644 --- a/src/dataServices/dynamoDbBundleService.ts +++ b/src/dataServices/dynamoDbBundleService.ts @@ -18,7 +18,7 @@ import { GenericResponse, } from 'fhir-works-on-aws-interface'; import flatten from 'flat'; -import set from 'lodash/set'; +import { chunk, set } from 'lodash'; import mapValues from 'lodash/mapValues'; import DOCUMENT_STATUS from './documentStatus'; @@ -46,7 +46,7 @@ export class DynamoDbBundleService implements Bundle { readonly enableMultiTenancy: boolean; - private static readonly dynamoDbMaxTransactionBundleSize = 25; + private static readonly dynamoDbMaxTransactionBundleSize = 100; private readonly maxBatchSize: Number; @@ -378,7 +378,14 @@ export class DynamoDbBundleService implements Bundle { let itemsLockedSuccessfully: ItemRequest[] = []; try { if (params.TransactItems.length > 0) { - await this.dynamoDb.transactWriteItems(params).promise(); + const lockRequests = chunk(addLockRequests, this.MAX_TRANSACTION_SIZE).map((items) => { + return this.dynamoDb + .transactWriteItems({ + TransactItems: items, + }) + .promise(); + }); + await Promise.all(lockRequests); itemsLockedSuccessfully = itemsLockedSuccessfully.concat(lockedItems); } logger.info('Finished locking'); @@ -592,10 +599,15 @@ export class DynamoDbBundleService implements Bundle { return newLockedItems; } try { - const params = { - TransactItems: transactionRequests, - }; - await this.dynamoDb.transactWriteItems(params).promise(); + const writeRequests = chunk(transactionRequests, this.MAX_TRANSACTION_SIZE).map((items) => { + // @ts-ignore + return this.dynamoDb + .transactWriteItems({ + TransactItems: items, + }) + .promise(); + }); + await Promise.all(writeRequests); return newLockedItems; } catch (e) { logger.error('Failed to unstage items', e); @@ -640,37 +652,40 @@ export class DynamoDbBundleService implements Bundle { // Order that Bundle specifies // https://www.hl7.org/fhir/http.html#trules const editRequests: any[] = [...deleteRequests, ...createRequests, ...updateRequests]; - const writeParams = - editRequests.length > 0 - ? { - TransactItems: editRequests, - } - : null; - - const readParams = - readRequests.length > 0 - ? { - TransactItems: readRequests, - } - : null; - let batchReadWriteResponses: BatchReadWriteResponse[] = []; let allLockedItems: ItemRequest[] = lockedItems; try { - if (writeParams) { - await this.dynamoDb.transactWriteItems(writeParams).promise(); + if (editRequests.length > 0) { + const writeChunkRequests = chunk(editRequests, this.MAX_TRANSACTION_SIZE).map((items) => { + return this.dynamoDb + .transactWriteItems({ + TransactItems: items, + }) + .promise(); + }); + await Promise.all(writeChunkRequests); } // Keep track of items successfully staged allLockedItems = lockedItems.concat(newLocks); batchReadWriteResponses = batchReadWriteResponses.concat(newStagingResponses); - if (readParams) { - const readResult = await this.dynamoDb.transactGetItems(readParams).promise(); - batchReadWriteResponses = DynamoDbBundleServiceHelper.populateBundleEntryResponseWithReadResult( - batchReadWriteResponses, - readResult, - ); + if (readRequests.length > 0) { + const readChunkRequests = chunk(readRequests, this.MAX_TRANSACTION_SIZE).map((items) => { + // @ts-ignore + return this.dynamoDb + .transactGetItems({ + TransactItems: items, + }) + .promise(); + }); + const readResults = await Promise.all(readChunkRequests); + readResults.forEach((readResult) => { + batchReadWriteResponses = DynamoDbBundleServiceHelper.populateBundleEntryResponseWithReadResult( + batchReadWriteResponses, + readResult, + ); + }); } logger.info('Successfully staged items');