
Commit

Merge pull request #430 from hms-dbmi-cellenics/biomage-changes-3
Merging changes
alexvpickering authored Mar 28, 2023
2 parents ebefb8a + b3c8edd commit 75c6819
Showing 66 changed files with 1,385 additions and 243 deletions.
5 changes: 3 additions & 2 deletions .flux.yaml
@@ -16,16 +16,17 @@ spec:
   ref:
     branch: FILLED_IN_BY_CI
 ---
-apiVersion: image.toolkit.fluxcd.io/v1beta1
+apiVersion: image.toolkit.fluxcd.io/v1beta2
 kind: ImageRepository
 metadata:
   name: FILLED_IN_BY_CI
   namespace: FILLED_IN_BY_CI
 spec:
   image: FILLED_IN_BY_CI
   interval: 2m0s
+  provider: aws
 ---
-apiVersion: image.toolkit.fluxcd.io/v1beta1
+apiVersion: image.toolkit.fluxcd.io/v1beta2
 kind: ImagePolicy
 metadata:
   name: FILLED_IN_BY_CI
6 changes: 3 additions & 3 deletions .github/pull_request_template.md
@@ -13,7 +13,7 @@
 #### Link to staging deployment URL (or set N/A)
 <!---
 Delete this comment and include the URL of the staging environment for this pull request.
-Refer to https://github.com/hms-dbmi-cellenics/biomage-utils#stage on how to stage a staging environment.
+Refer to https://github.com/hms-dbmi-cellenics/cellenics-utils#stage on how to stage a staging environment.
 If a staging environment for testing is not necessary for this PR, replace this comment with N/A
 and explain why a staging environment is not required for this PR.
@@ -54,9 +54,9 @@ Have best practices and ongoing refactors being observed in this PR
 - [ ] Unit tests written **or** no unit tests required for change, e.g. documentation update.

 <!---
-Download the latest production data using `biomage experiment pull`.
+Download the latest production data using `cellenics experiment pull`.
 To set up easy local testing with inframock, follow the instructions here: https://github.com/hms-dbmi-cellenics/inframock
-To deploy to the staging environment, follow the instructions here: https://github.com/hms-dbmi-cellenics/biomage-utils
+To deploy to the staging environment, follow the instructions here: https://github.com/hms-dbmi-cellenics/cellenics-utils
 -->

 ### Integration testing
2 changes: 1 addition & 1 deletion .secrets.baseline
@@ -1,7 +1,7 @@
 {
   "custom_plugin_paths": [],
   "exclude": {
-    "lines": ".*integrity.*",
+    "lines": ".*integrity.*|\"datadogApiKey\": \"\",",
    "files": "package-lock.json"
   },
   "generated_at": "2020-10-27T12:47:37Z",
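
Note: the new `exclude.lines` value is one regex with two alternatives, so detect-secrets now also skips the empty `datadogApiKey` placeholder. A quick sanity check of the alternation (a sketch only; detect-secrets applies this pattern in Python, the JavaScript below just mirrors it):

```js
// Mirror of the baseline's exclude.lines pattern, for illustration.
const excludeLines = /.*integrity.*|"datadogApiKey": "",/;

console.log(excludeLines.test('"integrity": "sha512-..."'));  // true (ignored)
console.log(excludeLines.test('"datadogApiKey": "",'));       // true (ignored)
console.log(excludeLines.test('"datadogApiKey": "secret",')); // false (still scanned)
```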
18 changes: 17 additions & 1 deletion src/api.v2/controllers/experimentController.js
@@ -12,9 +12,21 @@ const Sample = require('../model/Sample');
 const invalidatePlotsForEvent = require('../../utils/plotConfigInvalidation/invalidatePlotsForEvent');
 const events = require('../../utils/plotConfigInvalidation/events');
 const getAdminSub = require('../../utils/getAdminSub');
+const config = require('../../config');

 const logger = getLogger('[ExperimentController] - ');

+const getDefaultCPUMem = (env) => {
+  switch (env) {
+    case 'development':
+      return { podCPUs: null, podMemory: null };
+    case 'staging':
+      return { podCPUs: 1, podMemory: 14000 };
+    default:
+      return { podCPUs: 2, podMemory: 28000 };
+  }
+};
+
 const getAllExperiments = async (req, res) => {
   const { user: { sub: userId } } = req;
   logger.log(`Getting all experiments for user: ${userId}`);
@@ -49,8 +61,12 @@ const createExperiment = async (req, res) => {
   const { name, description } = body;
   logger.log('Creating experiment');

+  const { podCPUs, podMemory } = getDefaultCPUMem(config.clusterEnv);
+
   await sqlClient.get().transaction(async (trx) => {
-    await new Experiment(trx).create({ id: experimentId, name, description });
+    await new Experiment(trx).create({
+      id: experimentId, name, description, pod_cpus: podCPUs, pod_memory: podMemory,
+    });
     await new UserAccess(trx).createNewExperimentPermissions(user.sub, experimentId);
   });
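
Note: the new helper gives experiments environment-dependent pod sizing at creation time: no explicit requests in development, 1 CPU / 14000 memory units in staging, 2 CPUs / 28000 otherwise (the memory unit is not stated in the diff; MB is a reasonable guess). A minimal sketch of the resulting defaults (mirrors `getDefaultCPUMem` above, for illustration):

```js
// 'production' falls through to the default branch.
const getDefaultCPUMem = (env) => {
  switch (env) {
    case 'development': return { podCPUs: null, podMemory: null };
    case 'staging': return { podCPUs: 1, podMemory: 14000 };
    default: return { podCPUs: 2, podMemory: 28000 };
  }
};

console.log(getDefaultCPUMem('development')); // { podCPUs: null, podMemory: null }
console.log(getDefaultCPUMem('production'));  // { podCPUs: 2, podMemory: 28000 }
```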
24 changes: 20 additions & 4 deletions src/api.v2/controllers/metadataTrackController.js
@@ -82,26 +82,41 @@ const patchValueForSample = async (req, res) => {
 const parseMetadataFromTSV = (data, sampleNameToId) => {
   const invalidLines = [];
   const invalidSamples = new Set();
+  const invalidDuplicates = [];

+  const sampleMetadataPairCounts = {};
+
   const result = data.trim().split('\n').map((line, index) => {
     // check that there are 3 elements per line
-    const elements = line.split('\t');
+    const elements = line.trim().split('\t');
     if (elements.length !== 3) {
       invalidLines.push(index + 1);
     }

+    const sampleName = elements[0].trim();
+    const metadataKey = elements[1].trim().replace(/\s+/, '_');
+    const metadataValue = elements[2].trim();
+
     // check that the sample name exists in the experiment
-    const [sampleName, metadataKey, metadataValue] = elements;
     if (!(sampleName in sampleNameToId)) {
       invalidSamples.add(sampleName);
     }

+    // Check for multiple metadata assignment to the same sample and track
+    if (!Object.prototype.hasOwnProperty.call(sampleMetadataPairCounts, `${sampleName}@${metadataKey}`)) {
+      sampleMetadataPairCounts[`${sampleName}@${metadataKey}`] = index + 1;
+    } else {
+      const duplicateLine = sampleMetadataPairCounts[`${sampleName}@${metadataKey}`];
+      invalidDuplicates.push(`${duplicateLine} & ${index + 1}`);
+    }
+
     return { sampleId: sampleNameToId[sampleName], metadataKey, metadataValue };
   });

   const errors = [];
-  if (invalidSamples.size > 0) errors.push(`Invalid sample names: ${Array.from(invalidSamples).join(', ')}`);
-  if (invalidLines.length > 0) errors.push(`Invalid lines: ${invalidLines.join(', ')}`);
+  if (invalidSamples.size > 0) errors.push(`Invalid sample names on line(s): ${Array.from(invalidSamples).join(', ')}`);
+  if (invalidLines.length > 0) errors.push(`Invalid line(s): ${invalidLines.join(', ')}`);
+  if (invalidDuplicates.length > 0) errors.push(`Multiple assignments to the same entry on lines: ${invalidDuplicates.join(', ')}`);
   if (errors.length > 0) throw new BadRequestError(errors.join('\n'));

   return result;
@@ -136,4 +151,5 @@ module.exports = {
   deleteMetadataTrack,
   patchValueForSample,
   createMetadataFromFile,
+  parseMetadataFromTSV,
 };
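
Note: the duplicate check keys on the sample/track pair, so a second value for the same pair is reported with both line numbers. A worked example (hypothetical input; assumes both sample names exist in `sampleNameToId`):

```js
const tsv = [
  'sample1\tcondition\tcontrol', // line 1: first assignment to sample1/condition
  'sample2\tcondition\ttreated', // line 2: fine
  'sample1\tcondition\ttreated', // line 3: duplicate of line 1's pair
].join('\n');

const sampleNameToId = { sample1: 'id-1', sample2: 'id-2' };

// parseMetadataFromTSV(tsv, sampleNameToId) throws:
//   BadRequestError('Multiple assignments to the same entry on lines: 1 & 3')
```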
1 change: 1 addition & 0 deletions src/api.v2/events/validateAndSubmitWork.js
@@ -10,6 +10,7 @@ const checkSomeEqualTo = (array, testValue) => array.some((item) => item === testValue);
 const validateAndSubmitWork = async (workRequest) => {
   const { experimentId } = workRequest;

+
   // Check if pipeline is running
   const { qc: { status: qcPipelineStatus } } = await getPipelineStatus(
     experimentId, pipelineConstants.QC_PROCESS_NAME,
5 changes: 3 additions & 2 deletions src/api.v2/helpers/pipeline/__mocks__/getPipelineStatus.js
@@ -1,9 +1,10 @@
 const pipelineConstants = require('../../../constants');

+const date = new Date(1458619200000);
 const responseTemplates = {
   gem2s: {
     completedSteps: [],
-    startDate: null,
+    startDate: date,
     status: pipelineConstants.SUCCEEDED,
     stopDate: null,
   },
@@ -15,7 +16,7 @@ const responseTemplates = {
   },
   qc: {
     completedSteps: [],
-    startDate: null,
+    startDate: date,
     status: pipelineConstants.SUCCEEDED,
     stopDate: null,
   },
82 changes: 60 additions & 22 deletions src/api.v2/helpers/pipeline/getPipelineStatus.js
@@ -9,6 +9,7 @@ const getLogger = require('../../../utils/getLogger');
 const pipelineConstants = require('../../constants');
 const { getPipelineStepNames } = require('./pipelineConstruct/skeletons');
 const shouldPipelineRerun = require('./shouldPipelineRerun');
+const { qcStepNames, stepNameToBackendStepNames } = require('./pipelineConstruct/constructors/qcStepNameTranslations');

 const logger = getLogger();

@@ -20,7 +21,8 @@ const qcPipelineSteps = [
   'NumGenesVsNumUmisFilter',
   'DoubletScoresFilter',
   'DataIntegration',
-  'ConfigureEmbedding'];
+  'ConfigureEmbedding',
+];

 const gem2sPipelineSteps = [
   'DownloadGem',
@@ -29,12 +31,14 @@ const gem2sPipelineSteps = [
   'DoubletScores',
   'CreateSeurat',
   'PrepareExperiment',
-  'UploadToAWS'];
+  'UploadToAWS',
+];

 const seuratPipelineSteps = [
   'DownloadSeurat',
   'ProcessSeurat',
-  'UploadSeuratToAWS'];
+  'UploadSeuratToAWS',
+];

 // pipelineStepNames are the names of pipeline steps for which we
 // want to report the progress back to the user
@@ -218,6 +222,51 @@ const getStepsFromExecutionHistory = (events) => {
   return shortestCompletedToReport || [];
 };

+/**
+ *
+ * @param {*} processName The name of the pipeline to get the steps for,
+ * currently either qc or gem2s
+ * @param {*} stateMachineArn
+ * @param {*} lastRunExecutedSteps The steps that were executed in the last run
+ * @param {*} stepFunctions stepFunctions client
+ * @returns array of steps that can be considered completed
+ *
+ * If processName = gem2s, it returns executedSteps: we don't support partial reruns, so
+ * all executedSteps can be assumed to be completed steps.
+ *
+ * If processName = qc, it returns lastRunExecutedSteps + stepsCompletedInPreviousRuns.
+ * stepsCompletedInPreviousRuns is all the steps that weren't scheduled to run in the last run.
+ * The only reason we don't schedule a step is that we consider it completed,
+ * so we can keep considering it completed for future runs as well.
+ */
+const getCompletedSteps = async (
+  processName, stateMachineArn, lastRunExecutedSteps, stepFunctions,
+) => {
+  let completedSteps;
+
+  if (processName === 'qc') {
+    const stateMachine = await stepFunctions.describeStateMachine({
+      stateMachineArn,
+    }).promise();
+
+    // Get all the steps that were scheduled to be run in the last execution
+    const lastScheduledSteps = Object.keys(JSON.parse(stateMachine.definition).States);
+
+    // Remove from all qc steps the ones that were scheduled for execution in the last run.
+    // We are left with all the qc steps that the last run didn't consider necessary to rerun.
+    // This means these steps were considered completed in the last run, so
+    // we can still consider them completed.
+    const stepsCompletedInPreviousRuns = _.difference(qcStepNames, lastScheduledSteps)
+      .map((rawStepName) => stepNameToBackendStepNames[rawStepName]);
+
+    completedSteps = stepsCompletedInPreviousRuns.concat(lastRunExecutedSteps);
+  } else if (processName === 'gem2s' || processName === 'seurat') {
+    completedSteps = lastRunExecutedSteps;
+  }
+
+  return completedSteps;
+};
+
 /*
  * Return `completedSteps` of the state machine (SM) associated to the `experimentId`'s pipeline
  * The code assumes that
@@ -239,36 +288,25 @@ const getPipelineStatus = async (experimentId, processName) => {
   });

   let execution = {};
-  let completedSteps = [];
   let error = false;
-  let response;
+  let response = null;

-  const { executionArn = null, lastStatusResponse } = pipelineExecution;
-  const shouldRerun = await shouldPipelineRerun(experimentId, processName);
+  const { executionArn = null, stateMachineArn = null, lastStatusResponse } = pipelineExecution;
+  const shouldRerun = await shouldPipelineRerun(experimentId, processName);

   try {
     execution = await stepFunctions.describeExecution({
       executionArn,
     }).promise();

     const events = await getExecutionHistory(stepFunctions, executionArn);

     error = checkError(events);

     const executedSteps = getStepsFromExecutionHistory(events);
-    const lastExecuted = executedSteps[executedSteps.length - 1];
-    switch (processName) {
-      case pipelineConstants.QC_PROCESS_NAME:
-        completedSteps = qcPipelineSteps.slice(0, qcPipelineSteps.indexOf(lastExecuted) + 1);
-        break;
-      case pipelineConstants.GEM2S_PROCESS_NAME:
-        completedSteps = gem2sPipelineSteps.slice(0, gem2sPipelineSteps.indexOf(lastExecuted) + 1);
-        break;
-      case pipelineConstants.SEURAT_PROCESS_NAME:
-        completedSteps = seuratPipelineSteps.slice(0, seuratPipelineSteps.indexOf(lastExecuted) + 1);
-        break;
-      default:
-        logger.error(`unknown process name ${processName}`);
-    }
-
+    const completedSteps = await getCompletedSteps(
+      processName, stateMachineArn, executedSteps, stepFunctions,
+    );

     response = buildResponse(processName, execution, shouldRerun, error, completedSteps);
   } catch (e) {
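
Note: the key move in `getCompletedSteps` is a set difference: whatever the last state-machine definition did not schedule is treated as already complete. A standalone sketch of that step (illustrative values; step names come from the new qcStepNameTranslations module added below):

```js
const _ = require('lodash');

// All QC steps, in state-machine naming.
const qcStepNames = [
  'ClassifierFilterMap',
  'CellSizeDistributionFilterMap',
  'MitochondrialContentFilterMap',
  'NumGenesVsNumUmisFilterMap',
  'DoubletScoresFilterMap',
  'DataIntegration',
  'ConfigureEmbedding',
];

// Suppose the last run's definition only scheduled the final two steps...
const lastScheduledSteps = ['DataIntegration', 'ConfigureEmbedding'];

// ...then the other five were deemed complete before that run and stay complete.
console.log(_.difference(qcStepNames, lastScheduledSteps));
// ['ClassifierFilterMap', 'CellSizeDistributionFilterMap',
//  'MitochondrialContentFilterMap', 'NumGenesVsNumUmisFilterMap',
//  'DoubletScoresFilterMap']
```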
4 changes: 3 additions & 1 deletion src/api.v2/helpers/pipeline/handleQCResponse.js
@@ -17,14 +17,16 @@ const getPipelineStatus = require('./getPipelineStatus');

 const Experiment = require('../../model/Experiment');
 const Plot = require('../../model/Plot');
+const submitEmbeddingWork = require('../worker/workSubmit/submitEmbeddingWork');
+const submitMarkerHeatmapWork = require('../worker/workSubmit/submitMarkerHeatmapWork');

 const logger = getLogger();

 const hookRunner = new HookRunner();

 hookRunner.register(constants.ASSIGN_POD_TO_PIPELINE, [assignPodToPipeline]);
 hookRunner.registerAll([sendNotification]);
-hookRunner.register('configureEmbedding', [cleanupPods, updatePipelineVersion]);
+hookRunner.register('configureEmbedding', [cleanupPods, updatePipelineVersion, submitEmbeddingWork, submitMarkerHeatmapWork]);

 const getOutputFromS3 = async (message) => {
   const { output: { bucket, key } } = message;
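
Note: with this change, finishing configureEmbedding (the last QC step) also warms the worker: the embedding and marker-heatmap work requests are submitted as soon as QC completes rather than on first user request. The same registration, expanded for readability (the per-hook comments reflect the handler names, not documented behavior):

```js
// Hooks fire in registration order when the pipeline reports the named task.
hookRunner.register('configureEmbedding', [
  cleanupPods,             // existing: tear down the pipeline pods
  updatePipelineVersion,   // existing: record the pipeline version
  submitEmbeddingWork,     // new: precompute the embedding in the worker
  submitMarkerHeatmapWork, // new: precompute the marker heatmap
]);
```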
@@ -16,6 +16,7 @@ const getGeneralParams = (taskName, context) => {
     taskName,
     processName,
     server: remoterServer,
+    ignoreSslCert: config.pipelineIgnoreSSLCertificate,
   };

   return params;
44 changes: 44 additions & 0 deletions src/api.v2/helpers/pipeline/pipelineConstruct/constructors/qcStepNameTranslations.js
@@ -0,0 +1,44 @@
+
+const filterToStepName = {
+  classifier: 'ClassifierFilterMap',
+  cellSizeDistribution: 'CellSizeDistributionFilterMap',
+  mitochondrialContent: 'MitochondrialContentFilterMap',
+  numGenesVsNumUmis: 'NumGenesVsNumUmisFilterMap',
+  doubletScores: 'DoubletScoresFilterMap',
+  dataIntegration: 'DataIntegration',
+  configureEmbedding: 'ConfigureEmbedding',
+};
+
+const qcStepNames = [
+  'ClassifierFilterMap',
+  'CellSizeDistributionFilterMap',
+  'MitochondrialContentFilterMap',
+  'NumGenesVsNumUmisFilterMap',
+  'DoubletScoresFilterMap',
+  'DataIntegration',
+  'ConfigureEmbedding',
+];
+
+const backendStepNamesToStepName = {
+  ClassifierFilter: 'ClassifierFilterMap',
+  CellSizeDistributionFilter: 'CellSizeDistributionFilterMap',
+  MitochondrialContentFilter: 'MitochondrialContentFilterMap',
+  NumGenesVsNumUmisFilter: 'NumGenesVsNumUmisFilterMap',
+  DoubletScoresFilter: 'DoubletScoresFilterMap',
+  DataIntegration: 'DataIntegration',
+  ConfigureEmbedding: 'ConfigureEmbedding',
+};
+
+const stepNameToBackendStepNames = {
+  ClassifierFilterMap: 'ClassifierFilter',
+  CellSizeDistributionFilterMap: 'CellSizeDistributionFilter',
+  MitochondrialContentFilterMap: 'MitochondrialContentFilter',
+  NumGenesVsNumUmisFilterMap: 'NumGenesVsNumUmisFilter',
+  DoubletScoresFilterMap: 'DoubletScoresFilter',
+  DataIntegration: 'DataIntegration',
+  ConfigureEmbedding: 'ConfigureEmbedding',
+};
+
+module.exports = {
+  stepNameToBackendStepNames, backendStepNamesToStepName, qcStepNames, filterToStepName,
+};
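
Note: `stepNameToBackendStepNames` and `backendStepNamesToStepName` are inverse lookups between the Step Functions state names (the `...Map` variants) and the step names the backend reports to clients. A round-trip sketch (assuming the module is required from a sibling file):

```js
const {
  backendStepNamesToStepName,
  stepNameToBackendStepNames,
} = require('./qcStepNameTranslations');

const backendName = stepNameToBackendStepNames.ClassifierFilterMap; // 'ClassifierFilter'
const stateName = backendStepNamesToStepName[backendName];          // 'ClassifierFilterMap'
console.log(backendName, stateName);
```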
@@ -16,7 +16,7 @@ const submitBatchJob = (context, step) => {
     Type: 'Task',
     Resource: 'arn:aws:states:::batch:submitJob',
     Parameters: {
-      JobDefinition: `job-pipeline-${environment}`,
+      JobDefinition: `job-pipeline-${environment}-${config.sandboxId}`,
       JobName: `${environment}-${experimentId}-${processName}`, // changing the name will break job termination when a new one is submitted
       JobQueue: `queue-pipeline-${environment}`,
       ContainerOverrides: {
@@ -49,10 +49,6 @@
         {
           Name: 'BATCH',
           Value: 'true',
         },
-        {
-          Name: 'IGNORE_SSL_CERTIFICATE',
-          Value: `${config.awsBatchIgnoreSSLCertificate}`,
-        },
         {
           Name: 'DOMAIN_NAME',
           Value: `${config.domainName}`,
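
Note: appending the sandbox ID gives each staging sandbox its own AWS Batch job definition instead of one shared per environment; the removed IGNORE_SSL_CERTIFICATE container variable appears to be superseded by the `ignoreSslCert` work-request param added earlier in this commit. A naming sketch (values illustrative):

```js
const environment = 'staging';
const sandboxId = 'my-sandbox'; // hypothetical sandbox ID from config
console.log(`job-pipeline-${environment}-${sandboxId}`);
// -> 'job-pipeline-staging-my-sandbox'
```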