Skip to content

Commit

Permalink
Merge pull request #359 from hms-dbmi-cellenics/1877-implement-run-qc-for-api-v2
Browse files (browse the repository at this point in the history)

[BIOMAGE-1877] Implement run qc for api v2
  • Loading branch information
cosa65 authored Jun 6, 2022
2 parents f3d0f86 + 1b97049 commit cdbfa22
Show file tree
Hide file tree
Showing 43 changed files with 1,798 additions and 190 deletions.
21 changes: 4 additions & 17 deletions src/api.v2/controllers/experimentController.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,7 @@ const getLogger = require('../../utils/getLogger');
const { OK, NotFoundError } = require('../../utils/responses');
const sqlClient = require('../../sql/sqlClient');

const constants = require('../helpers/pipeline/constants');
const getPipelineStatus = require('../helpers/pipeline/getPipelineStatus');
const getWorkerStatus = require('../helpers/worker/getWorkerStatus');
const getExperimentBackendStatus = require('../helpers/backendStatus/getExperimentBackendStatus');

const logger = getLogger('[ExperimentController] - ');

Expand All @@ -36,6 +34,7 @@ const getExperiment = async (req, res) => {
const createExperiment = async (req, res) => {
const { params: { experimentId }, user, body } = req;
const { name, description } = body;

logger.log('Creating experiment');

await sqlClient.get().transaction(async (trx) => {
Expand Down Expand Up @@ -118,22 +117,10 @@ const getBackendStatus = async (req, res) => {
const { experimentId } = req.params;
logger.log(`Getting backend status for experiment ${experimentId}`);

const [{ gem2s }, { qc }, { worker }] = await Promise.all(
[
getPipelineStatus(experimentId, constants.GEM2S_PROCESS_NAME),
getPipelineStatus(experimentId, constants.QC_PROCESS_NAME),
getWorkerStatus(experimentId),
],
);

const formattedResponse = {
[constants.OLD_QC_NAME_TO_BE_REMOVED]: qc,
gem2s,
worker,
};
const response = await getExperimentBackendStatus(experimentId);

logger.log(`Finished getting backend status for experiment ${experimentId} successfully`);
res.json(formattedResponse);
res.json(response);
};

const downloadData = async (req, res) => {
Expand Down
6 changes: 3 additions & 3 deletions src/api.v2/controllers/gem2sController.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
const AWSXRay = require('aws-xray-sdk');

const { gem2sCreate, gem2sResponse } = require('../helpers/pipeline/gem2s');
const { createGem2sPipeline, handleGem2sResponse } = require('../helpers/pipeline/gem2s');
const { OK } = require('../../utils/responses');
const getLogger = require('../../utils/getLogger');
const parseSNSMessage = require('../../utils/parse-sns-message');
Expand All @@ -12,7 +12,7 @@ const runGem2s = async (req, res) => {

logger.log(`Starting gem2s for experiment ${experimentId}`);

const newExecution = await gem2sCreate(experimentId, req.body, req.headers.authorization);
const newExecution = await createGem2sPipeline(experimentId, req.body, req.headers.authorization);

logger.log(`Started gem2s for experiment ${experimentId} successfully, `);
logger.log('New executions data:');
Expand All @@ -38,7 +38,7 @@ const handleResponse = async (req, res) => {
const isSnsNotification = parsedMessage !== undefined;
if (isSnsNotification) {
try {
await gem2sResponse(io, parsedMessage);
await handleGem2sResponse(io, parsedMessage);
} catch (e) {
logger.error(
'gem2s pipeline response handler failed with error: ', e,
Expand Down
66 changes: 66 additions & 0 deletions src/api.v2/controllers/qcController.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
// const { OK } = require('../../utils/responses');
const AWSXRay = require('aws-xray-sdk');

const { OK } = require('../../utils/responses');

const { createQCPipeline } = require('../helpers/pipeline/pipelineConstruct');
const handleQCResponse = require('../helpers/pipeline/handleQCResponse');

const getLogger = require('../../utils/getLogger');
const parseSNSMessage = require('../../utils/parse-sns-message');

const logger = getLogger('[QCController] - ');

/**
 * Express handler that starts a QC pipeline run for an experiment.
 *
 * Takes the experiment id from the route params and the optional
 * `processingConfig` from the request body, forwards both together with the
 * request's Authorization header to `createQCPipeline`, and answers with the
 * standard OK payload once the pipeline has been created.
 *
 * @param {object} req - Express request (`params.experimentId`, `body`, `headers`).
 * @param {object} res - Express response; receives `OK()` as JSON.
 */
const runQC = async (req, res) => {
  const { params: { experimentId }, body, headers } = req;
  const { processingConfig } = body;

  logger.log(`Starting qc for experiment ${experimentId}`);

  // An absent processing config is passed on as an empty list.
  const requestedConfig = processingConfig || [];
  await createQCPipeline(experimentId, requestedConfig, headers.authorization);

  logger.log(`Started qc for experiment ${experimentId} successfully, `);

  res.json(OK());
};

/**
 * Express handler for QC pipeline responses delivered through SNS.
 *
 * The request is parsed with `parseSNSMessage`; when that yields a
 * `parsedMessage`, it is handed to `handleQCResponse`. The HTTP status is 200
 * in every outcome: the body is 'ok' on success and 'nok' when either parsing
 * or handling throws (the error is logged and attached to the X-Ray segment).
 *
 * @param {object} req - Express request carrying the SNS payload.
 * @param {object} res - Express response.
 */
const handleResponse = async (req, res) => {
  // Shared failure path: log, record on the X-Ray segment, acknowledge with 'nok'.
  const replyNotOk = (logMessage, err) => {
    logger.error(logMessage, err);
    AWSXRay.getSegment().addError(err);
    res.status(200).send('nok');
  };

  let parsed;
  try {
    parsed = await parseSNSMessage(req);
  } catch (err) {
    replyNotOk('Parsing initial SNS message failed:', err);
    return;
  }

  const { io, parsedMessage } = parsed;

  // Requests without a parsed message skip the handler and are acknowledged as-is.
  if (parsedMessage !== undefined) {
    try {
      await handleQCResponse(io, parsedMessage);
    } catch (err) {
      replyNotOk('qc pipeline response handler failed with error: ', err);
      return;
    }
  }

  res.status(200).send('ok');
};

// Handlers exposed by this controller: `runQC` starts a QC pipeline run and
// `handleResponse` processes the SNS-delivered pipeline responses.
module.exports = {
runQC,
handleResponse,
};
23 changes: 23 additions & 0 deletions src/api.v2/helpers/backendStatus/getExperimentBackendStatus.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
const constants = require('../pipeline/constants');
const getPipelineStatus = require('../pipeline/getPipelineStatus');
const getWorkerStatus = require('../worker/getWorkerStatus');

/**
 * Collects the gem2s, qc and worker statuses of an experiment.
 *
 * The three lookups are started together and awaited as a group. The qc status
 * is returned under the key held in `constants.OLD_QC_NAME_TO_BE_REMOVED`.
 *
 * @param {string} experimentId - Experiment whose backend status is requested.
 * @returns {Promise<object>} Object with the qc status (under the old key name),
 *   `gem2s` and `worker`.
 */
const getExperimentBackendStatus = async (experimentId) => {
  const statusLookups = [
    getPipelineStatus(experimentId, constants.GEM2S_PROCESS_NAME),
    getPipelineStatus(experimentId, constants.QC_PROCESS_NAME),
    getWorkerStatus(experimentId),
  ];

  const [gem2sResult, qcResult, workerResult] = await Promise.all(statusLookups);

  const { gem2s } = gem2sResult;
  const { qc } = qcResult;
  const { worker } = workerResult;

  return {
    [constants.OLD_QC_NAME_TO_BE_REMOVED]: qc,
    gem2s,
    worker,
  };
};

// The helper is the module's only export, so callers `require` the function directly.
module.exports = getExperimentBackendStatus;
8 changes: 4 additions & 4 deletions src/api.v2/helpers/pipeline/__mocks__/gem2s.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
const gem2sCreate = jest.fn();
const gem2sResponse = jest.fn();
const createGem2sPipeline = jest.fn();
const handleGem2sResponse = jest.fn();

module.exports = {
gem2sCreate,
gem2sResponse,
createGem2sPipeline,
handleGem2sResponse,
};
31 changes: 19 additions & 12 deletions src/api.v2/helpers/pipeline/gem2s.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ const AWSXRay = require('aws-xray-sdk');

const constants = require('./constants');
const getPipelineStatus = require('./getPipelineStatus');
const { createGem2SPipeline } = require('./pipelineConstruct');
const { createGem2SPipeline, createQCPipeline } = require('./pipelineConstruct');

const Sample = require('../../model/Sample');
const Experiment = require('../../model/Experiment');
Expand All @@ -19,17 +19,24 @@ const logger = getLogger('[Gem2sService] - ');

const hookRunner = new HookRunner();

const saveProcessingConfigFromGem2s = ({ experimentId, item }) => {
logger.log('Saving processing config for gem2s');
new Experiment().updateById(experimentId, { processing_config: item });
logger.log('Finished saving processing config for gem2s');
};
const continueToQC = async (payload) => {
const { experimentId, item } = payload;

await new Experiment().updateById(experimentId, { processing_config: item.processingConfig });

logger.log(`Experiment: ${experimentId}. Saved processing config received from gem2s`);

logger.log(`Experiment: ${experimentId}. Starting qc run because gem2s finished successfully`);

// we need to change this once we rework the pipeline message response
const authJWT = payload.authJWT || payload.input.authJWT;

await createQCPipeline(experimentId, [], authJWT);

const continueToQC = () => {
logger.log('Started qc successfully');
};

hookRunner.register('uploadToAWS', [
saveProcessingConfigFromGem2s,
continueToQC,
]);

Expand Down Expand Up @@ -119,7 +126,7 @@ const generateGem2sParams = async (experimentId, authJWT) => {
return taskParams;
};

const gem2sCreate = async (experimentId, body, authJWT) => {
const createGem2sPipeline = async (experimentId, body, authJWT) => {
logger.log('Creating GEM2S params...');
const { paramsHash } = body;

Expand Down Expand Up @@ -148,7 +155,7 @@ const gem2sCreate = async (experimentId, body, authJWT) => {
return newExecution;
};

const gem2sResponse = async (io, message) => {
const handleGem2sResponse = async (io, message) => {
AWSXRay.getSegment().addMetadata('message', message);

// Fail hard if there was an error.
Expand All @@ -168,6 +175,6 @@ const gem2sResponse = async (io, message) => {
};

module.exports = {
gem2sCreate,
gem2sResponse,
createGem2sPipeline,
handleGem2sResponse,
};
147 changes: 147 additions & 0 deletions src/api.v2/helpers/pipeline/handleQCResponse.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
const AWSXRay = require('aws-xray-sdk');
const validateRequest = require('../../../utils/schema-validator');
const AWS = require('../../../utils/requireAWS');
const getLogger = require('../../../utils/getLogger');


const HookRunner = require('./hooks/HookRunner');
const assignPodToPipeline = require('./hooks/assignPodToPipeline');
const { cleanupPods } = require('./hooks/podCleanup');
const sendNotification = require('./hooks/sendNotification');

const constants = require('./constants');
const getPipelineStatus = require('./getPipelineStatus');

const Experiment = require('../../model/Experiment');
const Plot = require('../../model/Plot');

// Logger for this module. No prefix argument is passed here.
const logger = getLogger();

// Module-level hook runner; `handleQCResponse` runs it on every incoming message.
const hookRunner = new HookRunner();

// Hook registrations happen once, when this module is loaded.
// NOTE(review): presumably `register(name, hooks)` ties hooks to one task name
// and `registerAll(hooks)` applies them to every message — confirm in HookRunner.
hookRunner.register(constants.ASSIGN_POD_TO_PIPELINE, [assignPodToPipeline]);
hookRunner.registerAll([sendNotification]);
hookRunner.register('configureEmbedding', [cleanupPods]);

/**
 * Downloads and parses the JSON output a QC step wrote to S3.
 *
 * @param {object} message - Pipeline message; `message.output.bucket` and
 *   `message.output.key` locate the object.
 * @returns {Promise<*>} The object body parsed as JSON.
 */
const getOutputFromS3 = async (message) => {
  const { output: location } = message;
  const { bucket, key } = location;

  const s3Client = new AWS.S3();
  const download = s3Client.getObject({ Bucket: bucket, Key: key });
  const { Body: rawBody } = await download.promise();

  return JSON.parse(rawBody.toString());
};

/**
 * Stores the S3 data key of every plot reported by a QC step.
 *
 * Does nothing when `output` is falsy or has no `plotDataKeys`. Otherwise one
 * `Plot().upsert` is issued per `plotUuid -> s3DataKey` entry. A failing upsert
 * is logged and does not prevent the remaining ones from being awaited.
 *
 * @param {string} taskName - QC step name, used in the log line.
 * @param {string} experimentId - Experiment the plots belong to.
 * @param {object} output - Step output, optionally holding `plotDataKeys`.
 */
const updatePlotDataKeys = async (taskName, experimentId, output) => {
  if (!output || !output.plotDataKeys) return;

  // Every upsert is started here, before anything is awaited.
  const pendingUpserts = Object.entries(output.plotDataKeys).map(
    async ([plotUuid, s3DataKey]) => {
      const plot = new Plot();

      return plot.upsert(
        { id: plotUuid, experiment_id: experimentId },
        { s3_data_key: s3DataKey },
      );
    },
  );

  logger.log(`Uploading plot data s3 keys for ${taskName} to sql`);

  // Promise.all rejects on the first failure, so each upsert gets its own
  // handler: a failed one is logged and the others are still awaited.
  const guardedUpserts = pendingUpserts.map(
    (pending) => pending.catch((err) => logger.error(err)),
  );

  await Promise.all(guardedUpserts);
};

/**
 * Saves the configuration produced by one QC step into the experiment's
 * processing config.
 *
 * - When `sampleUuid` is the empty string, `output.config` is passed as the
 *   body of the `taskName` step.
 * - Otherwise the step's previously stored body is extended with this sample's
 *   config, keyed by `sampleUuid`.
 *
 * @param {string} taskName - Name of the QC step the output belongs to.
 * @param {string} experimentId - Experiment to update.
 * @param {object} output - Step output; `output.config` holds the step settings.
 *   NOTE: when `output.config.auto` is truthy, `defaultFilterSettings` is
 *   written onto `output.config` itself, so the caller's object changes too.
 * @param {string} sampleUuid - Sample the output belongs to, or '' if none.
 * @throws {Error} When no experiment with `experimentId` is found.
 */
const updateProcessingConfigWithQCStep = async (taskName, experimentId, output, sampleUuid) => {
  // TODO the processing config validation was not being enforced because the 'anyOf' requirement
  // was not being correctly applied. This needs to be refactored together with the
  // pipeline and ideally while unifying the qc & gem2s responses.
  // await validateRequest(output, 'ProcessingConfigBodies.v1.yaml');

  const experiment = new Experiment();

  const storedExperiment = await experiment.findById(experimentId).first();

  // A response can arrive for an experiment that no longer exists. Fail with a
  // descriptive error rather than an opaque destructuring TypeError.
  if (!storedExperiment) {
    throw new Error(
      `Experiment ${experimentId} not found, can not update processing config for ${taskName}`,
    );
  }

  const { processingConfig: previousConfig } = storedExperiment;

  if (sampleUuid !== '') {
    const { auto } = output.config;

    // This is a temporary fix to save defaultFilterSettings calculated in the QC pipeline
    // to patch for old experiments with hardcoded defaultFilterSettings.
    // Remove this once we're done migrating to the new experiment schema with
    // defaultFilterSettings
    //
    // `sampleOutput` is the same object as `output` (not a copy), so the
    // assignment below is also visible to whoever passed `output` in.
    const sampleOutput = output;

    if (auto) sampleOutput.config.defaultFilterSettings = output.config.filterSettings;

    await experiment.updateProcessingConfig(experimentId, [
      {
        name: taskName,
        body: {
          // Keep the entries already stored for other samples of this step.
          ...previousConfig[taskName],
          [sampleUuid]: { ...sampleOutput.config },
        },
      },
    ]);
  } else {
    await experiment.updateProcessingConfig(experimentId, [
      {
        name: taskName,
        body: output.config,
      },
    ]);
  }
};

/**
 * Emits a QC update for an experiment on `ExperimentUpdates-<experimentId>`.
 *
 * The payload is the incoming message extended with the current QC pipeline
 * status (under `status.pipeline`), the QC process name as `type` and, unless
 * `output` is null, the step output. A truthy `error` is logged and recorded on
 * the X-Ray segment; the update is emitted either way.
 *
 * @param {string} experimentId - Experiment the update is about.
 * @param {object} message - Pipeline message to forward.
 * @param {object|null} output - Parsed step output, or null when there is none.
 * @param {*} error - Error indicator taken from the pipeline response.
 * @param {object} io - Socket server; `io.sockets.emit` is used to emit.
 */
const sendUpdateToSubscribed = async (experimentId, message, output, error, io) => {
  const processName = constants.QC_PROCESS_NAME;

  const {
    [processName]: pipelineStatus,
  } = await getPipelineStatus(experimentId, processName);

  // Concatenate into a proper response.
  const update = {
    ...message,
    status: { pipeline: pipelineStatus },
    type: processName,
    ...(output !== null ? { output } : {}),
  };

  if (error) {
    logger.log(`Error in ${processName} received`);
    AWSXRay.getSegment().addError(error);
  }

  logger.log('Sending to all clients subscribed to experiment', experimentId);
  io.sockets.emit(`ExperimentUpdates-${experimentId}`, update);
};

/**
 * Processes one QC pipeline response message.
 *
 * Steps, in order: record the message on the X-Ray segment, validate it against
 * 'PipelineResponse.v1.yaml', run the registered hooks, and — only when the
 * response carries no error and the message has an `output` key — fetch the
 * step output from S3 and persist its plot data keys and processing config.
 * Finally the update is broadcast to subscribers, with or without output.
 *
 * @param {object} io - Socket server used for the broadcast.
 * @param {object} message - Pipeline response message.
 */
const handleQCResponse = async (io, message) => {
  AWSXRay.getSegment().addMetadata('message', message);

  await validateRequest(message, 'PipelineResponse.v1.yaml');

  await hookRunner.run(message);

  const { experimentId, response: pipelineResponse } = message;
  const { error = false } = pipelineResponse || {};

  // Stays null when the step failed or the message has no output.
  let qcStepOutput = null;

  const hasOutputToApply = !error && 'output' in message;
  if (hasOutputToApply) {
    const { sampleUuid, taskName } = message.input;

    qcStepOutput = await getOutputFromS3(message);

    await updatePlotDataKeys(taskName, experimentId, qcStepOutput);
    await updateProcessingConfigWithQCStep(taskName, experimentId, qcStepOutput, sampleUuid);
  }

  // Subscribers are notified in the successful and in the error case alike.
  await sendUpdateToSubscribed(experimentId, message, qcStepOutput, error, io);
};

module.exports = handleQCResponse;
Loading

0 comments on commit cdbfa22

Please sign in to comment.