Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BIOMAGE-1027] Add safeBatchGetItem that divides batchGetItem in groups of requests of 100 keys #138

Merged
merged 6 commits into from
Jun 3, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions src/api/route-services/experiment.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,9 @@ const mockData = require('./mock-data.json');

const AWS = require('../../utils/requireAWS');
const logger = require('../../utils/logging');

const { OK, NotFoundError } = require('../../utils/responses');
const safeBatchGetItem = require('../../utils/safeBatchGetItem');

const constants = require('../general-services/pipeline-manage/constants');

const {
Expand Down Expand Up @@ -47,7 +48,7 @@ class ExperimentService {
};

try {
const response = await dynamodb.batchGetItem(params).promise();
const response = await safeBatchGetItem(dynamodb, params);

return response.Responses[this.experimentsTableName].map(
(experiment) => convertToJsObject(experiment),
Expand Down
3 changes: 2 additions & 1 deletion src/api/route-services/projects.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ const {
const logger = require('../../utils/logging');

const { OK, NotFoundError } = require('../../utils/responses');
const safeBatchGetItem = require('../../utils/safeBatchGetItem');

const SamplesService = require('./samples');
const ExperimentService = require('./experiment');
Expand Down Expand Up @@ -119,7 +120,7 @@ class ProjectsService {
},
};

const data = await dynamodb.batchGetItem(params).promise();
const data = await safeBatchGetItem(dynamodb, params);

const existingProjectIds = new Set(data.Responses[this.tableName].map((entry) => {
const newData = convertToJsObject(entry);
Expand Down
109 changes: 109 additions & 0 deletions src/utils/safeBatchGetItem.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
const _ = require('lodash');

// DO NOT MODIFY, this value is specified in dynamodb docs
// (batchGetItem accepts at most 100 keys per request)
const maxKeys = 100;

/**
 * Merge `keys` for `tableName` into a partially-filled batch keys object,
 * respecting the 100-key-per-request limit of batchGetItem.
 *
 * @param {Array} keys Keys to add (at most 100; callers pre-chunk with _.chunk).
 * @param {string} tableName Table the keys belong to.
 * @param {Object} batchKeysObject Map of tableName -> keys already assigned to
 *  the batch being filled. Not mutated.
 * @returns {Array<Object>} One batch keys object if everything fits, otherwise
 *  two: the first filled up to 100 keys, the second holding the overflow.
 */
const mergeIntoBatchKeysObject = (keys, tableName, batchKeysObject) => {
  // Slots already taken in this batch by keys of previously-processed tables.
  // Native reduce instead of lodash's _.sumBy — no library needed for a sum.
  const spaceOccupied = Object.values(batchKeysObject)
    .reduce((total, tableKeys) => total + tableKeys.length, 0);
  const freeSpace = maxKeys - spaceOccupied;

  // If the new keys fit in the current object then just return it with the new entry
  if (keys.length <= freeSpace) {
    return [{ ...batchKeysObject, [tableName]: keys }];
  }

  const firstBatchObj = { ...batchKeysObject };

  // Put as many keys as we can fit in the old object (if any room is left)
  if (freeSpace) firstBatchObj[tableName] = keys.slice(0, freeSpace);

  // Create a new object for all the ones that we couldn't put in the first one
  const secondBatchObj = { [tableName]: keys.slice(freeSpace) };

  return [firstBatchObj, secondBatchObj];
};

/**
 * Fire one batchGetItem call for a single batch of keys.
 *
 * Rebuilds RequestItems so each table keeps its original per-table options
 * (ProjectionExpression, ConsistentRead, ...) but gets only the key subset
 * assigned to this batch. Only tables present in batchKeysObject are sent.
 *
 * @param {Object} batchKeysObject Map of tableName -> keys for this batch.
 * @param {Object} allParams Original batchGetItem params (source of per-table options).
 * @param {*} dynamodb dynamodb sdk client.
 * @returns {Promise} The pending batchGetItem request.
 */
const sendBatchGetItemRequest = (batchKeysObject, allParams, dynamodb) => {
  const requestItems = Object.fromEntries(
    Object.entries(batchKeysObject).map(([tableName, keys]) => {
      // Spread the table's original params, then override Keys with this batch's subset
      const tableParams = allParams.RequestItems[tableName];
      return [tableName, { ...tableParams, Keys: keys }];
    }),
  );

  return dynamodb.batchGetItem({ ...allParams, RequestItems: requestItems }).promise();
};

/**
 * Customizer for lodash's mergeWith: concatenates two arrays instead of
 * letting mergeWith overwrite one with the other.
 *
 * @param {*} objValue Current value in the destination object.
 * @param {*} srcValue Incoming value from the source object.
 * @returns {Array|undefined} Concatenated array, or undefined to tell
 *  mergeWith to fall back to its default merge behavior.
 */
const concatIfArray = (objValue, srcValue) => {
  // Native Array.isArray instead of lodash's _.isArray wrapper
  if (Array.isArray(objValue) && Array.isArray(srcValue)) {
    return [...objValue, ...srcValue];
  }

  return undefined;
};

/**
 * A wrapper for dynamodb's batchGetItem.
 * This wrapper handles requests of more than 100 items at the same time
 * by splitting them into separate batchGetItems and merging the responses
 * back into a single batchGetItem-shaped result.
 *
 * @param {*} dynamodb dynamodb sdk client
 * @param {*} params params as would be passed to batchGetItem
 * @returns The result as would be returned by batchGetItem
 */
const safeBatchGetItem = async (dynamodb, params) => {
  let batchGetKeys = [{}];

  // Fill up batchGetKeys with the keys for each table for each batchGet
  Object.entries(params.RequestItems).forEach(([tableName, { Keys: keys }]) => {
    // Nothing to fetch for this table. Also guards the .pop() below:
    // _.chunk([], maxKeys) is [], so lastKeyPartition would be undefined
    // and mergeIntoBatchKeysObject would crash on undefined.length.
    if (!keys || keys.length === 0) return;

    const keyPartitions = _.chunk(keys, maxKeys);

    // Take out last element of keyPartitions (that might still have not reached 100)
    const lastKeyPartition = keyPartitions.pop();
    // Take out last element of batchGetKeys (that might still have not reached 100)
    const lastBatchGetObj = batchGetKeys.pop();

    // Combine these two objects into one if possible
    // (or into one with 100 and the other with the rest)
    const lastBatchKeysObjects = mergeIntoBatchKeysObject(
      lastKeyPartition,
      tableName,
      lastBatchGetObj,
    );

    // Convert all other keys that we know are groups of 100 into batchGetKeys objects
    const newBatchGets = keyPartitions.map((partition) => ({ [tableName]: partition }));

    // Concat all objects
    batchGetKeys = [...batchGetKeys, ...newBatchGets, ...lastBatchKeysObjects];
  });

  // Skip batches that ended up with no tables (e.g. when RequestItems is
  // empty or every table had no keys): DynamoDB rejects an empty RequestItems map.
  const requestPromises = batchGetKeys
    .filter((batchGetObject) => Object.keys(batchGetObject).length > 0)
    .map((batchGetObject) => sendBatchGetItemRequest(batchGetObject, params, dynamodb));

  // Wait for all batchGetItems to resolve
  const getResults = await Promise.all(requestPromises);

  // Merge all results into one; concatIfArray appends per-table item arrays
  // instead of letting mergeWith overwrite them
  const flattenedResult = getResults.reduce((acc, curr) => {
    _.mergeWith(acc, curr, concatIfArray);
    return acc;
  }, {});

  return flattenedResult;
};

module.exports = safeBatchGetItem;
Loading