Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add sce upload #527

Merged
merged 10 commits into from
Sep 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/api.v2/constants.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Pipeline names
const QC_PROCESS_NAME = 'qc';
const GEM2S_PROCESS_NAME = 'gem2s';
const SEURAT_PROCESS_NAME = 'seurat';
const OBJ2S_PROCESS_NAME = 'obj2s';
const OLD_QC_NAME_TO_BE_REMOVED = 'pipeline';
const SUBSET_PROCESS_NAME = 'subset';
const COPY_PROCESS_NAME = 'copy';
Expand Down Expand Up @@ -44,7 +44,7 @@ const ACCOUNT_ID = {
module.exports = {
QC_PROCESS_NAME,
GEM2S_PROCESS_NAME,
SEURAT_PROCESS_NAME,
OBJ2S_PROCESS_NAME,
OLD_QC_NAME_TO_BE_REMOVED,
SUBSET_PROCESS_NAME,
COPY_PROCESS_NAME,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
const { runSeurat, handleSeuratResponse } = require('../helpers/pipeline/seurat');
const { runObj2s, handleObj2sResponse } = require('../helpers/pipeline/obj2s');
const { OK } = require('../../utils/responses');
const getLogger = require('../../utils/getLogger');
const parseSNSMessage = require('../../utils/parseSNSMessage');
const snsTopics = require('../../config/snsTopics');

const logger = getLogger('[SeuratController] - ');
const logger = getLogger('[Obj2sController] - ');

const handleResponse = async (req, res) => {
let result;
Expand All @@ -22,10 +22,10 @@ const handleResponse = async (req, res) => {
const isSnsNotification = parsedMessage !== undefined;
if (isSnsNotification) {
try {
await handleSeuratResponse(io, parsedMessage);
await handleObj2sResponse(io, parsedMessage);
} catch (e) {
logger.error(
'seurat pipeline response handler failed with error: ', e,
'obj2s pipeline response handler failed with error: ', e,
);

res.status(200).send('nok');
Expand All @@ -36,16 +36,16 @@ const handleResponse = async (req, res) => {
res.status(200).send('ok');
};

const handleSeuratRequest = async (req, res) => {
const handleObj2sRequest = async (req, res) => {
const params = {
experimentId: req.params.experimentId,
};

await runSeurat(params, req.headers.authorization);
await runObj2s(params, req.headers.authorization);
res.json(OK());
};

module.exports = {
handleSeuratRequest,
handleObj2sRequest,
handleResponse,
};
10 changes: 5 additions & 5 deletions src/api.v2/events/validateAndSubmitWork.js
Original file line number Diff line number Diff line change
Expand Up @@ -33,15 +33,15 @@ const validateAndSubmitWork = async (req) => {
experimentId, pipelineConstants.QC_PROCESS_NAME,
);

const { seurat: { status: seuratPipelineStatus } } = await getPipelineStatus(
experimentId, pipelineConstants.SEURAT_PROCESS_NAME,
const { obj2s: { status: obj2sPipelineStatus } } = await getPipelineStatus(
experimentId, pipelineConstants.OBJ2S_PROCESS_NAME,
);

if (!checkSomeEqualTo([qcPipelineStatus, seuratPipelineStatus], pipelineConstants.SUCCEEDED)) {
throw new Error(`Work request can not be handled because pipeline is ${qcPipelineStatus} or seurat status is ${seuratPipelineStatus}`);
if (!checkSomeEqualTo([qcPipelineStatus, obj2sPipelineStatus], pipelineConstants.SUCCEEDED)) {
throw new Error(`Work request can not be handled because pipeline is ${qcPipelineStatus} or obj2s status is ${obj2sPipelineStatus}`);
}

// add the embedding etag if the work request, needed by trajectory analysis & download seurat object
// add the embedding etag if the work request, needed by trajectory analysis & download obj2s object
workRequest = await addEmbeddingEtag(experimentId, workRequest);

await validateRequest(workRequest, 'WorkRequest.v2.yaml');
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,19 @@ const getPipelineStatus = require('../pipeline/getPipelineStatus');
const getWorkerStatus = require('../worker/getWorkerStatus');

const getExperimentBackendStatus = async (experimentId) => {
const [{ gem2s }, { qc }, { seurat }, { worker }] = await Promise.all(
const [{ gem2s }, { qc }, { obj2s }, { worker }] = await Promise.all(
[
getPipelineStatus(experimentId, constants.GEM2S_PROCESS_NAME),
getPipelineStatus(experimentId, constants.QC_PROCESS_NAME),
getPipelineStatus(experimentId, constants.SEURAT_PROCESS_NAME),
getPipelineStatus(experimentId, constants.OBJ2S_PROCESS_NAME),
getWorkerStatus(experimentId),
],
);

const formattedResponse = {
[constants.OLD_QC_NAME_TO_BE_REMOVED]: qc,
gem2s,
seurat,
obj2s,
worker,
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ const response = {
status: pipelineConstants.NOT_CREATED,
stopDate: null,
},
seurat: {
obj2s: {
completedSteps: [],
startDate: null,
status: pipelineConstants.NOT_CREATED,
Expand Down
2 changes: 1 addition & 1 deletion src/api.v2/helpers/pipeline/__mocks__/getPipelineStatus.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ const responseTemplates = {
status: pipelineConstants.SUCCEEDED,
stopDate: null,
},
seurat: {
obj2s: {
completedSteps: [],
startDate: null,
status: pipelineConstants.NOT_CREATED,
Expand Down
14 changes: 7 additions & 7 deletions src/api.v2/helpers/pipeline/getPipelineStatus.js
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,10 @@ const gem2sPipelineSteps = [
'UploadToAWS',
];

const seuratPipelineSteps = [
'DownloadSeurat',
'ProcessSeurat',
'UploadSeuratToAWS',
const obj2sPipelineSteps = [
'DownloadObj2sFile',
'ProcessObj2s',
'UploadObj2sToAWS',
];

// pipelineStepNames are the names of pipeline steps for which we
Expand Down Expand Up @@ -87,8 +87,8 @@ const buildCompletedStatus = (processName, date, shouldRerun) => {
case pipelineConstants.GEM2S_PROCESS_NAME:
completedSteps = gem2sPipelineSteps;
break;
case pipelineConstants.SEURAT_PROCESS_NAME:
completedSteps = seuratPipelineSteps;
case pipelineConstants.OBJ2S_PROCESS_NAME:
completedSteps = obj2sPipelineSteps;
break;
case pipelineConstants.QC_PROCESS_NAME:
completedSteps = qcPipelineSteps;
Expand Down Expand Up @@ -260,7 +260,7 @@ const getCompletedSteps = async (
.map((rawStepName) => stepNameToBackendStepNames[rawStepName]);

completedSteps = stepsCompletedInPreviousRuns.concat(lastRunExecutedSteps);
} if (processName === 'gem2s' || processName === 'seurat') {
} if (processName === 'gem2s' || processName === 'obj2s') {
completedSteps = lastRunExecutedSteps;
}

Expand Down
4 changes: 2 additions & 2 deletions src/api.v2/helpers/pipeline/hooks/sendNotification.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
const { authenticationMiddlewareSocketIO } = require('../../../middlewares/authMiddlewares');

const {
SUCCEEDED, FAILED, QC_PROCESS_NAME, SEURAT_PROCESS_NAME,
SUCCEEDED, FAILED, QC_PROCESS_NAME, OBJ2S_PROCESS_NAME,
} = require('../../../constants');

const getPipelineStatus = require('../getPipelineStatus');
Expand Down Expand Up @@ -42,7 +42,7 @@ const sendNotification = async (message) => {
}

if (experiment.notifyByEmail
&& (([QC_PROCESS_NAME, SEURAT_PROCESS_NAME].includes(processName) && status === SUCCEEDED)
&& (([QC_PROCESS_NAME, OBJ2S_PROCESS_NAME].includes(processName) && status === SUCCEEDED)
|| status === FAILED)) {
try {
const emailParams = buildPipelineStatusEmailBody(message.experimentId, status, user);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
const _ = require('lodash');

const getPipelineStatus = require('./getPipelineStatus');
const { createSeuratPipeline } = require('./pipelineConstruct');
const { createObj2sPipeline } = require('./pipelineConstruct');

const Sample = require('../../model/Sample');
const Experiment = require('../../model/Experiment');
Expand All @@ -15,9 +15,9 @@ const getLogger = require('../../../utils/getLogger');

const { getGem2sParams, formatSamples } = require('./shouldPipelineRerun');

const logger = getLogger('[SeuratService] - ');
const logger = getLogger('[Obj2sService] - ');

const { SEURAT_PROCESS_NAME } = require('../../constants');
const { OBJ2S_PROCESS_NAME } = require('../../constants');

const hookRunner = new HookRunner();

Expand All @@ -29,7 +29,7 @@ const updateProcessingConfig = async (payload) => {

await new Experiment().updateById(experimentId, { processing_config: item.processingConfig });

logger.log(`Experiment: ${experimentId}. Saved processing config received from seurat`);
logger.log(`Experiment: ${experimentId}. Saved processing config received from obj2s`);
};

/**
Expand Down Expand Up @@ -69,35 +69,35 @@ const setupSubsetSamples = async (payload) => {
};

hookRunner.register('subsetSeurat', [setupSubsetSamples]);
hookRunner.register('uploadSeuratToAWS', [
hookRunner.register('uploadObj2sToAWS', [
updateProcessingConfig,
]);


hookRunner.registerAll([sendNotification]);

const sendUpdateToSubscribed = async (experimentId, message, io) => {
const statusRes = await getPipelineStatus(experimentId, SEURAT_PROCESS_NAME);
const statusRes = await getPipelineStatus(experimentId, OBJ2S_PROCESS_NAME);

// Concatenate into a proper response.
const response = {
...message,
status: statusRes,
type: SEURAT_PROCESS_NAME,
type: OBJ2S_PROCESS_NAME,
};

const { error = null } = message.response || {};
if (error) {
logger.log(`Error in ${SEURAT_PROCESS_NAME} received`);
logger.log(`Error in ${OBJ2S_PROCESS_NAME} received`);
}

logger.log('Sending to all clients subscribed to experiment', experimentId);

io.sockets.emit(`ExperimentUpdates-${experimentId}`, response);
};

const generateSeuratTaskParams = async (experimentId, rawSamples, authJWT) => {
logger.log('Generating seurat params');
const generateObj2sTaskParams = async (experimentId, rawSamples, authJWT) => {
logger.log('Generating obj2s params');
const experiment = await new Experiment().findById(experimentId).first();
const {
sampleTechnology,
Expand Down Expand Up @@ -133,42 +133,42 @@ const generateSeuratTaskParams = async (experimentId, rawSamples, authJWT) => {
};
};

const startSeuratPipeline = async (params, authJWT) => {
logger.log('Creating SEURAT params...');
const startObj2sPipeline = async (params, authJWT) => {
logger.log('Creating OBJ2S params...');
const { experimentId } = params;
const samples = await new Sample().getSamples(experimentId);

const currentSeuratParams = await getGem2sParams(experimentId, samples);
const taskParams = await generateSeuratTaskParams(experimentId, samples, authJWT);
const currentObj2sParams = await getGem2sParams(experimentId, samples);
const taskParams = await generateObj2sTaskParams(experimentId, samples, authJWT);

const {
stateMachineArn,
executionArn,
} = await createSeuratPipeline(experimentId, taskParams, authJWT);
} = await createObj2sPipeline(experimentId, taskParams, authJWT);

logger.log('SEURAT params created.');
logger.log('OBJ2S params created.');

const newExecution = {
last_pipeline_params: currentSeuratParams,
last_pipeline_params: currentObj2sParams,
state_machine_arn: stateMachineArn,
execution_arn: executionArn,
};

// Save execution params
await new ExperimentExecution().updateExecution(
experimentId,
SEURAT_PROCESS_NAME,
OBJ2S_PROCESS_NAME,
newExecution,
params,
);
logger.log('SEURAT params saved.');
logger.log('OBJ2S params saved.');

return newExecution;
};

const handleSeuratResponse = async (io, message) => {
const handleObj2sResponse = async (io, message) => {
// Fail hard if there was an error.
await validateRequest(message, 'SeuratResponse.v2.yaml');
await validateRequest(message, 'OBJ2SResponse.v2.yaml');

await hookRunner.run(message);

Expand All @@ -183,28 +183,28 @@ const handleSeuratResponse = async (io, message) => {
await sendUpdateToSubscribed(experimentId, messageForClient, io);
};

const runSeurat = async (params, authorization) => {
const runObj2s = async (params, authorization) => {
const { experimentId } = params;

logger.log(`Starting seurat for experiment ${experimentId}`);
logger.log(`Starting obj2s for experiment ${experimentId}`);

const { parentExperimentId = null } = await new ExperimentParent()
.find({ experiment_id: experimentId })
.first();

if (parentExperimentId) {
throw new MethodNotAllowedError(`Experiment ${experimentId} can't run seurat`);
throw new MethodNotAllowedError(`Experiment ${experimentId} can't run obj2s`);
}

const newExecution = await startSeuratPipeline(params, authorization);
const newExecution = await startObj2sPipeline(params, authorization);

logger.log(`Started seurat for experiment ${experimentId} successfully, `);
logger.log(`Started obj2s for experiment ${experimentId} successfully, `);
logger.log('New executions data:');
logger.log(JSON.stringify(newExecution));
};

module.exports = {
runSeurat,
startSeuratPipeline,
handleSeuratResponse,
runObj2s,
startObj2sPipeline,
handleObj2sResponse,
};
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
const {
QC_PROCESS_NAME, GEM2S_PROCESS_NAME, SUBSET_PROCESS_NAME, SEURAT_PROCESS_NAME, COPY_PROCESS_NAME,
QC_PROCESS_NAME, GEM2S_PROCESS_NAME, SUBSET_PROCESS_NAME, OBJ2S_PROCESS_NAME, COPY_PROCESS_NAME,
} = require('../../../../constants');
const getGeneralParams = require('./paramsGetters/getGeneralParams');
const getQCParams = require('./paramsGetters/getQCParams');
Expand All @@ -9,7 +9,7 @@ const buildParams = (context, stepArgs) => {

if (context.processName === QC_PROCESS_NAME) {
stepParams = getQCParams(context, stepArgs);
} else if ([GEM2S_PROCESS_NAME, SEURAT_PROCESS_NAME].includes(context.processName)) {
} else if ([GEM2S_PROCESS_NAME, OBJ2S_PROCESS_NAME].includes(context.processName)) {
stepParams = context.taskParams;
} else if ([SUBSET_PROCESS_NAME, COPY_PROCESS_NAME].includes(context.processName)) {
stepParams = context.taskParams[stepArgs.taskName];
Expand Down
Loading
Loading