Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Seurat upload #411

Merged
merged 27 commits into from
Dec 5, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
0994a2a
add seurat as sample tech
alexvpickering Aug 31, 2022
c23e64e
init adding seurat enum
alexvpickering Aug 31, 2022
5034dd5
add sample_file_type to migration
alexvpickering Sep 6, 2022
b492073
remove double blank line
alexvpickering Sep 6, 2022
36715e7
init seurat pipeline boilerplate
alexvpickering Sep 7, 2022
977e857
add seurat backend status
alexvpickering Sep 7, 2022
9b55959
add seurat as pipeline_type
alexvpickering Sep 7, 2022
c615940
add seurat to pipeline status; seurat pipeline is created
alexvpickering Sep 7, 2022
93ac876
move SEURATResponse yaml
alexvpickering Sep 7, 2022
0fe0575
seurat pipeline sends updates; embedding work request adapted for seurat
alexvpickering Sep 14, 2022
cacc577
fix tests
alexvpickering Sep 23, 2022
663fcbb
multipart upload works for large seurat objects
alexvpickering Sep 26, 2022
5ab2edc
comment out tests to check staging
alexvpickering Oct 31, 2022
72c1d84
try fix out of memory in api
alexvpickering Oct 31, 2022
c756fb4
add logging
alexvpickering Nov 1, 2022
7e6c408
add logging
alexvpickering Nov 1, 2022
c54e0db
add seurat tests
alexvpickering Nov 2, 2022
e0dc827
remove debug messages
alexvpickering Nov 4, 2022
940f021
improve naming
alexvpickering Nov 8, 2022
9dcdef0
adding tests
alexvpickering Nov 14, 2022
6405026
add sampleFile tests for completeMultipartUpload
alexvpickering Nov 15, 2022
c623e30
add seuratController tests
alexvpickering Nov 15, 2022
d9d0584
test seurat pipeline helpers
alexvpickering Nov 15, 2022
cfff3da
add tests; rename completeMultiPartUpload --> completeMultipartUpload
alexvpickering Nov 15, 2022
e074cc2
fix notification emails for seurat; add tests
alexvpickering Nov 17, 2022
e1d2d23
fix returned file name
alexvpickering Nov 28, 2022
28447db
Merge branch 'master' into seurat-upload
alexvpickering Dec 2, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/api.v2/constants.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// Pipeline names
const QC_PROCESS_NAME = 'qc';
const GEM2S_PROCESS_NAME = 'gem2s';
const SEURAT_PROCESS_NAME = 'seurat';
const OLD_QC_NAME_TO_BE_REMOVED = 'pipeline';

const ASSIGN_POD_TO_PIPELINE = 'assignPodToPipeline';
Expand Down Expand Up @@ -41,6 +42,7 @@ const ADMIN_SUB = {
module.exports = {
QC_PROCESS_NAME,
GEM2S_PROCESS_NAME,
SEURAT_PROCESS_NAME,
OLD_QC_NAME_TO_BE_REMOVED,
RUNNING,
FAILED,
Expand Down
25 changes: 20 additions & 5 deletions src/api.v2/controllers/sampleFileController.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ const sqlClient = require('../../sql/sqlClient');
const Sample = require('../model/Sample');
const SampleFile = require('../model/SampleFile');

const { getSampleFileUploadUrl, getSampleFileDownloadUrl } = require('../helpers/s3/signedUrl');
const { getSampleFileUploadUrls, getSampleFileDownloadUrl, completeMultipartUpload } = require('../helpers/s3/signedUrl');
const { OK } = require('../../utils/responses');
const getLogger = require('../../utils/getLogger');

Expand All @@ -24,17 +24,19 @@ const createFile = async (req, res) => {
upload_status: 'uploading',
};

let signedUrl;
let uploadUrlParams;

await sqlClient.get().transaction(async (trx) => {
await new SampleFile(trx).create(newSampleFile);
await new Sample(trx).setNewFile(sampleId, sampleFileId, sampleFileType);

signedUrl = getSampleFileUploadUrl(sampleFileId, metadata);
logger.log(`Getting multipart upload urls for ${experimentId}, sample ${sampleId}, sampleFileType ${sampleFileType}`);
uploadUrlParams = await getSampleFileUploadUrls(sampleFileId, metadata, size);
});


logger.log(`Finished creating sample file for experiment ${experimentId}, sample ${sampleId}, sampleFileType ${sampleFileType}`);
res.json(signedUrl);
res.json(uploadUrlParams);
};

const patchFile = async (req, res) => {
Expand All @@ -50,6 +52,19 @@ const patchFile = async (req, res) => {
res.json(OK());
};

/**
 * Express handler that finalises a multipart upload for a sample file.
 *
 * Reads `sampleFileId`, `parts` and `uploadId` from the request body, hands
 * them to the `completeMultipartUpload` helper and replies with `OK()`.
 *
 * @param {object} req - request; `req.body` must carry `sampleFileId`,
 *   `parts` and `uploadId` (presumably the part descriptors and upload id
 *   produced when the upload urls were generated; confirm against the helper).
 * @param {object} res - response; `res.json` is called once with `OK()`.
 * @returns {Promise<void>} rejects if the helper rejects, in which case no
 *   response is written by this handler.
 */
const completeMultipart = async (req, res) => {
  const {
    body: { sampleFileId, parts, uploadId },
  } = req;

  logger.log(`completing multipart upload for sampleFileId ${sampleFileId}`);

  // Await the helper so the "completed" log line and the OK response are only
  // emitted once it has finished, and so a rejection propagates to the caller
  // instead of becoming an unhandled promise rejection.
  await completeMultipartUpload(sampleFileId, parts, uploadId);

  logger.log(`completed multipart upload for sampleFileId ${sampleFileId}`);
  res.json(OK());
};

const getS3DownloadUrl = async (req, res) => {
const { experimentId, sampleId, sampleFileType } = req.params;

Expand All @@ -63,5 +78,5 @@ const getS3DownloadUrl = async (req, res) => {
};

module.exports = {
createFile, patchFile, getS3DownloadUrl,
createFile, patchFile, getS3DownloadUrl, completeMultipart,
};
60 changes: 60 additions & 0 deletions src/api.v2/controllers/seuratController.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
const AWSXRay = require('aws-xray-sdk');

const { createSeuratPipeline, handleSeuratResponse } = require('../helpers/pipeline/seurat');
const { OK } = require('../../utils/responses');
const getLogger = require('../../utils/getLogger');
const parseSNSMessage = require('../../utils/parse-sns-message');

const logger = getLogger('[SeuratController] - ');

/**
 * Express handler that launches the seurat pipeline for an experiment.
 *
 * Passes the experiment id (from the route params), the request body and the
 * `authorization` header to `createSeuratPipeline`, logs the returned
 * execution data and replies with `OK()`.
 *
 * @param {object} req - request with `params.experimentId`, `body` and
 *   `headers.authorization`.
 * @param {object} res - response; `res.json` is called with `OK()`.
 * @returns {Promise<void>}
 */
const runSeurat = async (req, res) => {
  const { experimentId } = req.params;

  logger.log(`Starting seurat for experiment ${experimentId}`);

  const executionData = await createSeuratPipeline(
    experimentId,
    req.body,
    req.headers.authorization,
  );

  logger.log(`Started seurat for experiment ${experimentId} successfully, `);
  logger.log('New executions data:');
  logger.log(JSON.stringify(executionData));

  res.json(OK());
};

/**
 * Express handler for messages coming back from the seurat pipeline.
 *
 * The request is parsed with `parseSNSMessage`; when that yields a
 * `parsedMessage`, it is forwarded (with `io`) to `handleSeuratResponse`.
 * The handler always answers with HTTP 200: body 'ok' on success (including
 * when there is no parsed message), body 'nok' when parsing or handling throws.
 *
 * @param {object} req - incoming request, passed as-is to `parseSNSMessage`.
 * @param {object} res - response used to send the 'ok' / 'nok' reply.
 * @returns {Promise<void>}
 */
const handleResponse = async (req, res) => {
  // Shared failure path: record the error on the current X-Ray segment and
  // reply 'nok' (still with status 200).
  const replyNotOk = (error) => {
    AWSXRay.getSegment().addError(error);
    res.status(200).send('nok');
  };

  let parsed;

  try {
    parsed = await parseSNSMessage(req);
  } catch (parseError) {
    logger.error('Parsing initial SNS message failed:', parseError);
    replyNotOk(parseError);
    return;
  }

  const { io, parsedMessage } = parsed;

  // Only requests that produced a parsed message are forwarded to the handler.
  if (parsedMessage !== undefined) {
    try {
      await handleSeuratResponse(io, parsedMessage);
    } catch (handlerError) {
      logger.error(
        'seurat pipeline response handler failed with error: ', handlerError,
      );
      replyNotOk(handlerError);
      return;
    }
  }

  res.status(200).send('ok');
};

module.exports = {
runSeurat,
handleResponse,
};
11 changes: 9 additions & 2 deletions src/api.v2/events/validateAndSubmitWork.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ const getPipelineStatus = require('../helpers/pipeline/getPipelineStatus');

const pipelineConstants = require('../constants');

/**
 * Tells whether at least one element of `array` is strictly equal (`===`)
 * to `testValue`.
 *
 * @param {Array} array - values to inspect.
 * @param {*} testValue - value to look for.
 * @returns {boolean} true when any element matches, false otherwise
 *   (including for an empty array).
 */
const checkSomeEqualTo = (array, testValue) => {
  const matchesTestValue = (candidate) => candidate === testValue;
  return array.some(matchesTestValue);
};

const validateAndSubmitWork = async (workRequest) => {
const { experimentId } = workRequest;

Expand All @@ -13,8 +15,13 @@ const validateAndSubmitWork = async (workRequest) => {
experimentId, pipelineConstants.QC_PROCESS_NAME,
);

if (qcPipelineStatus !== pipelineConstants.SUCCEEDED) {
const e = new Error(`Work request can not be handled because pipeline is ${qcPipelineStatus}`);

const { seurat: { status: seuratPipelineStatus } } = await getPipelineStatus(
experimentId, pipelineConstants.SEURAT_PROCESS_NAME,
);

if (!checkSomeEqualTo([qcPipelineStatus, seuratPipelineStatus], pipelineConstants.SUCCEEDED)) {
const e = new Error(`Work request can not be handled because pipeline is ${qcPipelineStatus} or seurat status is ${seuratPipelineStatus}`);

AWSXRay.getSegment().addError(e);
throw e;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,19 @@ const getPipelineStatus = require('../pipeline/getPipelineStatus');
const getWorkerStatus = require('../worker/getWorkerStatus');

const getExperimentBackendStatus = async (experimentId) => {
const [{ gem2s }, { qc }, { worker }] = await Promise.all(
const [{ gem2s }, { qc }, { seurat }, { worker }] = await Promise.all(
[
getPipelineStatus(experimentId, constants.GEM2S_PROCESS_NAME),
getPipelineStatus(experimentId, constants.QC_PROCESS_NAME),
getPipelineStatus(experimentId, constants.SEURAT_PROCESS_NAME),
getWorkerStatus(experimentId),
],
);

const formattedResponse = {
[constants.OLD_QC_NAME_TO_BE_REMOVED]: qc,
gem2s,
seurat,
worker,
};

Expand Down
6 changes: 6 additions & 0 deletions src/api.v2/helpers/pipeline/__mocks__/getBackendStatus.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,12 @@ const response = {
status: pipelineConstants.NOT_CREATED,
stopDate: null,
},
seurat: {
completedSteps: [],
startDate: null,
status: pipelineConstants.NOT_CREATED,
stopDate: null,
},
worker: {
ready: true,
restartCount: 0,
Expand Down
6 changes: 6 additions & 0 deletions src/api.v2/helpers/pipeline/__mocks__/getPipelineStatus.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,12 @@ const responseTemplates = {
status: pipelineConstants.SUCCEEDED,
stopDate: null,
},
seurat: {
completedSteps: [],
startDate: null,
status: pipelineConstants.NOT_CREATED,
stopDate: null,
},
qc: {
completedSteps: [],
startDate: null,
Expand Down
13 changes: 12 additions & 1 deletion src/api.v2/helpers/pipeline/getPipelineStatus.js
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,11 @@ const gem2sPipelineSteps = [
'PrepareExperiment',
'UploadToAWS'];

// Step names of the seurat pipeline, in order. The order matters: completed
// steps are derived by slicing this list up to the last executed step.
const seuratPipelineSteps = [
  'DownloadSeurat',
  'ProcessSeurat',
  'UploadSeuratToAWS',
];

// pipelineStepNames are the names of pipeline steps for which we
// want to report the progress back to the user
// does not include steps used to initialize the infrastructure (like pod deletion assignation)
Expand Down Expand Up @@ -77,6 +82,9 @@ const buildCompletedStatus = (processName, date, paramsHash) => {
case pipelineConstants.GEM2S_PROCESS_NAME:
completedSteps = gem2sPipelineSteps;
break;
case pipelineConstants.SEURAT_PROCESS_NAME:
completedSteps = seuratPipelineSteps;
break;
case pipelineConstants.QC_PROCESS_NAME:
completedSteps = qcPipelineSteps;
break;
Expand Down Expand Up @@ -274,17 +282,20 @@ const getPipelineStatus = async (experimentId, processName) => {
}

const events = await getExecutionHistory(stepFunctions, executionArn);

error = checkError(events);
const executedSteps = getStepsFromExecutionHistory(events);
const lastExecuted = executedSteps[executedSteps.length - 1];

switch (processName) {
case pipelineConstants.QC_PROCESS_NAME:
completedSteps = qcPipelineSteps.slice(0, qcPipelineSteps.indexOf(lastExecuted) + 1);
break;
case pipelineConstants.GEM2S_PROCESS_NAME:
completedSteps = gem2sPipelineSteps.slice(0, gem2sPipelineSteps.indexOf(lastExecuted) + 1);
break;
case pipelineConstants.SEURAT_PROCESS_NAME:
completedSteps = seuratPipelineSteps.slice(0, seuratPipelineSteps.indexOf(lastExecuted) + 1);
break;
default:
logger.error(`unknown process name ${processName}`);
}
Expand Down
6 changes: 3 additions & 3 deletions src/api.v2/helpers/pipeline/hooks/sendNotification.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
const { authenticationMiddlewareSocketIO } = require('../../../middlewares/authMiddlewares');

const {
SUCCEEDED, FAILED, QC_PROCESS_NAME,
SUCCEEDED, FAILED, QC_PROCESS_NAME, SEURAT_PROCESS_NAME,
} = require('../../../constants');

const getPipelineStatus = require('../getPipelineStatus');
Expand Down Expand Up @@ -42,8 +42,8 @@ const sendNotification = async (message) => {
}

if (experiment.notifyByEmail
&& ((processName === QC_PROCESS_NAME && status === SUCCEEDED)
|| status === FAILED)) {
&& (([QC_PROCESS_NAME, SEURAT_PROCESS_NAME].includes(processName) && status === SUCCEEDED)
|| status === FAILED)) {
try {
const emailParams = buildPipelineStatusEmailBody(message.experimentId, status, user);
await sendEmail(emailParams);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
const config = require('../../../../../config');
const { QC_PROCESS_NAME, GEM2S_PROCESS_NAME } = require('../../../../constants');
const { QC_PROCESS_NAME, GEM2S_PROCESS_NAME, SEURAT_PROCESS_NAME } = require('../../../../constants');

const createTask = (taskName, context) => {
const {
Expand Down Expand Up @@ -41,6 +41,14 @@ const getGem2SParams = (task, context) => {
};
};

/**
 * Builds the process-specific params for a seurat task.
 *
 * @param {object} task - base task properties.
 * @param {object} context - pipeline context; only `context.taskParams` is read.
 * @returns {object} a new object with the task's properties overlaid by
 *   `context.taskParams` (task params win on key clashes).
 */
const getSeuratParams = (task, context) => ({
  ...task,
  ...context.taskParams,
});


const buildParams = (task, context, stepArgs) => {
let processParams;
Expand All @@ -49,6 +57,8 @@ const buildParams = (task, context, stepArgs) => {
processParams = getQCParams(task, context, stepArgs);
} else if (task.processName === GEM2S_PROCESS_NAME) {
processParams = getGem2SParams(task, context);
} else if (task.processName === SEURAT_PROCESS_NAME) {
processParams = getSeuratParams(task, context);
}

return {
Expand Down
43 changes: 41 additions & 2 deletions src/api.v2/helpers/pipeline/pipelineConstruct/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ const { v4: uuidv4 } = require('uuid');
const util = require('util');

const config = require('../../../../config');
const { QC_PROCESS_NAME, GEM2S_PROCESS_NAME } = require('../../../constants');
const { QC_PROCESS_NAME, GEM2S_PROCESS_NAME, SEURAT_PROCESS_NAME } = require('../../../constants');

const Experiment = require('../../../model/Experiment');
const ExperimentExecution = require('../../../model/ExperimentExecution');
Expand All @@ -18,7 +18,7 @@ const getLogger = require('../../../../utils/getLogger');
const asyncTimer = require('../../../../utils/asyncTimer');

const constructPipelineStep = require('./constructors/constructPipelineStep');
const { getGem2sPipelineSkeleton, getQcPipelineSkeleton } = require('./skeletons');
const { getGem2sPipelineSkeleton, getQcPipelineSkeleton, getSeuratPipelineSkeleton } = require('./skeletons');
const { getQcStepsToRun } = require('./qcHelpers');

const logger = getLogger();
Expand Down Expand Up @@ -314,9 +314,48 @@ const createGem2SPipeline = async (experimentId, taskParams) => {
return { stateMachineArn, executionArn };
};

/**
 * Creates and launches the state machine for the seurat pipeline.
 *
 * Builds the pipeline context, constructs the state machine definition from
 * the seurat skeleton, creates the activity and the state machine, then
 * starts an execution.
 *
 * @param {string} experimentId - experiment the pipeline runs for.
 * @param {object} taskParams - params stored in the context as `taskParams`.
 * @returns {Promise<{stateMachineArn: *, executionArn: *}>} values returned
 *   by `createNewStateMachine` and `executeStateMachine`.
 */
const createSeuratObjectPipeline = async (experimentId, taskParams) => {
  const {
    awsAccountId: accountId, awsRegion, clusterEnv, sandboxId,
  } = config;

  // Same inspect options for every structure dumped to the log below.
  const dump = (value) => util.inspect(value, { showHidden: false, depth: null, colors: false });

  const roleArn = `arn:aws:iam::${accountId}:role/state-machine-role-${clusterEnv}`;
  const plannedActivityArn = `arn:aws:states:${awsRegion}:${accountId}:activity:pipeline-${clusterEnv}-${uuidv4()}`;
  const pipelineArtifacts = await getPipelineArtifacts();
  const clusterInfo = await getClusterInfo();

  // Key order is kept stable because the whole context is logged further down.
  const context = {
    taskParams,
    experimentId,
    accountId,
    roleArn,
    processName: SEURAT_PROCESS_NAME,
    activityArn: plannedActivityArn,
    pipelineArtifacts,
    clusterInfo,
    sandboxId,
    processingConfig: {},
    environment: clusterEnv,
  };

  const skeleton = getSeuratPipelineSkeleton(clusterEnv);
  logger.log('Skeleton constructed, now building state machine definition...');

  const stateMachine = buildStateMachineDefinition(skeleton, context);
  logger.log('State machine definition built, now creating activity if not already present...');

  const createdActivityArn = await createActivity(context);
  logger.log(`Activity with ARN ${createdActivityArn} created, now creating state machine from skeleton...`);

  const stateMachineArn = await createNewStateMachine(context, stateMachine, SEURAT_PROCESS_NAME);
  logger.log(`State machine with ARN ${stateMachineArn} created, launching it...`);
  logger.log('Context:', dump(context));
  logger.log('State machine:', dump(stateMachine));

  const executionArn = await executeStateMachine(stateMachineArn);
  logger.log(`Execution with ARN ${executionArn} created.`);

  return { stateMachineArn, executionArn };
};


module.exports = {
createQCPipeline,
createGem2SPipeline,
createSeuratObjectPipeline,
buildStateMachineDefinition,
};
Loading