Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BIOMAGE-1044] Add a needsRunning flag to gem2s pipeline status #147

Merged
merged 8 commits into from
Jun 11, 2021
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions src/api/general-services/pipeline-manage/constants.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,6 @@ const SUCCEEDED = 'SUCCEEDED';
// Custom defined statuses defined in the API
const NOT_CREATED = 'NOT_CREATED';

// Additional custom statuses

module.exports = {
QC_PROCESS_NAME,
GEM2S_PROCESS_NAME,
Expand Down
31 changes: 2 additions & 29 deletions src/api/general-services/pipeline-manage/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -231,34 +231,7 @@ const createQCPipeline = async (experimentId, processingConfigUpdates) => {
const createGem2SPipeline = async (experimentId) => {
const accountId = await config.awsAccountIdPromise;
const roleArn = `arn:aws:iam::${accountId}:role/state-machine-role-${config.clusterEnv}`;

const experiment = await experimentService.getExperimentData(experimentId);
const { samples } = await samplesService.getSamplesByExperimentId(experimentId);
const { metadataKeys } = await projectService.getProject(experiment.projectId);

const defaultMetadataValue = 'N.A.';

const taskParams = {
projectId: experiment.projectId,
experimentName: experiment.experimentName,
organism: experiment.meta.organism,
input: { type: experiment.meta.type },
sampleIds: samples.ids,
sampleNames: samples.ids.map((id) => samples[id].name),
};

if (metadataKeys.length) {
taskParams.metadata = metadataKeys.reduce((acc, key) => {
// Make sure the key does not contain '-' as it will cause failure in GEM2S
const sanitizedKey = key.replace(/-+/g, '_');

acc[sanitizedKey] = samples.ids.map(
// Fetch using unsanitized key as it is the key used to store metadata in sample
(sampleUuid) => samples[sampleUuid].metadata[key] || defaultMetadataValue,
);
return acc;
}, {});
}
const taskParams = await projectService.getGem2sParams(experimentId);

const context = {
taskParams,
Expand All @@ -285,7 +258,7 @@ const createGem2SPipeline = async (experimentId) => {
const executionArn = await executeStateMachine(stateMachineArn);
logger.log(`Execution with ARN ${executionArn} created.`);

return { stateMachineArn, executionArn };
return { stateMachineArn, executionArn, paramsHash: taskParams.paramsHash };
};


Expand Down
27 changes: 27 additions & 0 deletions src/api/general-services/pipeline-status.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
const _ = require('lodash');
const AWS = require('../../utils/requireAWS');
const ExperimentService = require('../route-services/experiment');
const ProjectService = require('../route-services/projects');
const config = require('../../config');
const logger = require('../../utils/logging');
const pipelineConstants = require('./pipeline-manage/constants');
Expand Down Expand Up @@ -74,6 +75,10 @@ const getStepsFromExecutionHistory = (events) => {
}
}

if (!events.length) {
return [];
}

const main = new Branch(events[0], true);
for (let ii = 1; ii < events.length; ii += 1) {
const consumer = main.nextConsumer(events[ii]);
Expand All @@ -98,6 +103,21 @@ const getStepsFromExecutionHistory = (events) => {
return shortestCompletedToReport || [];
};

/*
 * Decide whether the gem2s pipeline has to be (re)run.
 *
 * - RUNNING: never needs running.
 * - SUCCEEDED: needs running unless a params hash was stored with the execution handle
 *   and it equals the hash of the current gem2s params.
 * - any other status: needs running.
 */
const gem2sNeedsRunning = async (handles, experimentId, processName, response) => {
  const { status } = response[processName];

  if (status === pipelineConstants.RUNNING) {
    return false;
  }

  if (status !== pipelineConstants.SUCCEEDED) {
    return true;
  }

  // Handles without a stored hash cannot be compared against, so they always need a run
  const { paramsHash: storedHash } = handles[processName];
  if (!storedHash) {
    return true;
  }

  const { paramsHash: currentHash } = await new ProjectService().getGem2sParams(experimentId);
  return currentHash !== storedHash;
};

/*
* Return `completedSteps` of the state machine (SM) associated to the `experimentId`'s pipeline
* The code assumes that
Expand Down Expand Up @@ -156,6 +176,13 @@ const getPipelineStatus = async (experimentId, processName) => {
completedSteps,
},
};

if (processName === pipelineConstants.GEM2S_PROCESS_NAME) {
response[processName].needsRunning = await gem2sNeedsRunning(
pipelinesHandles, experimentId, processName, response,
);
}

return response;
};

Expand Down
37 changes: 37 additions & 0 deletions src/api/route-services/projects.js
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
const crypto = require('crypto');
const config = require('../../config');
const {
createDynamoDbInstance, convertToDynamoDbRecord, convertToJsObject,
Expand Down Expand Up @@ -211,6 +212,42 @@ class ProjectsService {
throw e;
}
}

async getGem2sParams(experimentId) {
const experiment = await experimentService.getExperimentData(experimentId);
const { samples } = await samplesService.getSamplesByExperimentId(experimentId);
const { metadataKeys } = await this.getProject(experiment.projectId);

const defaultMetadataValue = 'N.A.';

const taskParams = {
projectId: experiment.projectId,
experimentName: experiment.experimentName,
organism: experiment.meta.organism,
input: { type: experiment.meta.type },
sampleIds: samples.ids,
sampleNames: samples.ids.map((id) => samples[id].name),
};

if (metadataKeys.length) {
taskParams.metadata = metadataKeys.reduce((acc, key) => {
// Make sure the key does not contain '-' as it will cause failure in GEM2S
const sanitizedKey = key.replace(/-+/g, '_');

acc[sanitizedKey] = samples.ids.map(
// Fetch using unsanitized key as it is the key used to store metadata in sample
(sampleUuid) => samples[sampleUuid].metadata[key] || defaultMetadataValue,
);
return acc;
}, {});
}
taskParams.paramsHash = crypto
.createHash('sha1')
.update(JSON.stringify(taskParams))
.digest('hex');

return taskParams;
}
}


Expand Down
158 changes: 141 additions & 17 deletions tests/api/general-services/pipeline-status.test.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,16 @@
const pipelineConstants = require('../../../src/api/general-services/pipeline-manage/constants');
const AWSMock = require('aws-sdk-mock');
const AWS = require('../../../src/utils/requireAWS');
const constants = require('../../../src/api/general-services/pipeline-manage/constants');
const experimentHelpers = require('../../../src/api/route-services/experimentHelpers');

const getExperimentAttributesSpy = jest.spyOn(experimentHelpers, 'getExperimentAttributes');

const pipelineStatus = require('../../../src/api/general-services/pipeline-status');
const ExperimentService = require('../../../src/api/route-services/experiment');
const ProjectService = require('../../../src/api/route-services/projects');

const {
RUNNING, SUCCEEDED, NOT_CREATED, FAILED, TIMED_OUT, ABORTED, GEM2S_PROCESS_NAME, QC_PROCESS_NAME,
} = constants;

describe('getStepsFromExecutionHistory', () => {
const fullHistory = [
Expand Down Expand Up @@ -337,37 +347,151 @@ describe('getStepsFromExecutionHistory', () => {
});
});

jest.mock('../../../src/api/route-services/experiment', () => jest.fn().mockImplementation(() => {
// eslint-disable-next-line global-require
const internalConstants = require('../../../src/api/general-services/pipeline-manage/constants');

return {
getPipelinesHandles: () => ({
[internalConstants.GEM2S_PROCESS_NAME]: {
describe('pipelineStatus', () => {
const mockNotRunResponse = {
meta: {
[GEM2S_PROCESS_NAME]: {
stateMachineArn: '',
executionArn: '',
},
[internalConstants.QC_PROCESS_NAME]: {
[QC_PROCESS_NAME]: {
stateMachineArn: '',
executionArn: '',
},
}),
},
};
}));

describe('pipelineStatus', () => {
const mockHasBeenRun = {
meta: {
[GEM2S_PROCESS_NAME]: {
stateMachineArn: 'arnSM_gem2s',
executionArn: 'arnE_sem2s',
paramsHash: 'hashOfTheSamplesData',
},
[QC_PROCESS_NAME]: {
stateMachineArn: 'arnSM_qc',
executionArn: 'arnE_qc',
},
},
};

const mockHasBeenRunLegacy = {
meta: {
[GEM2S_PROCESS_NAME]: {
stateMachineArn: 'arnSM_gem2s',
executionArn: 'arnE_sem2s',
},
[QC_PROCESS_NAME]: {
stateMachineArn: 'arnSM_qc',
executionArn: 'arnE_qc',
},
},
};

const mockDescribeExecution = jest.fn();

const mockDynamoGetItem = jest.fn().mockImplementation(() => ({
Item: AWS.DynamoDB.Converter.marshall({
// Dumb meaningless payload to prevent crashes
meta: {},
samples: {
ids: [],
},
}),
}));

beforeEach(() => {
ExperimentService.mockClear();
getExperimentAttributesSpy.mockClear();
AWSMock.setSDKInstance(AWS);

AWSMock.mock('StepFunctions', 'describeExecution', (params, callback) => {
callback(null, mockDescribeExecution(params));
});

AWSMock.mock('StepFunctions', 'getExecutionHistory', (params, callback) => {
callback(null, { events: [] });
});

AWSMock.mock('DynamoDB', 'getItem', (params, callback) => {
callback(null, mockDynamoGetItem(params));
});
});

it('handles properly an empty dynamodb record', async () => {
const status = await pipelineStatus('1234', pipelineConstants.QC_PROCESS_NAME);
getExperimentAttributesSpy.mockReturnValueOnce(mockNotRunResponse);
const status = await pipelineStatus('1234', QC_PROCESS_NAME);

expect(status).toEqual({
[pipelineConstants.QC_PROCESS_NAME]: {
[QC_PROCESS_NAME]: {
startDate: null,
stopDate: null,
status: pipelineConstants.NOT_CREATED,
status: constants.NOT_CREATED,
completedSteps: [],
},
});

expect(mockDynamoGetItem).not.toHaveBeenCalled();
});

it('returns a true "needsRunning" attribute for failed/not started gem2s pipelines', async () => {
  getExperimentAttributesSpy.mockReturnValue(mockHasBeenRun);

  const states = [NOT_CREATED, FAILED, TIMED_OUT, ABORTED];

  // A plain loop is used on purpose: Array.prototype.forEach discards the promises returned
  // by an async callback, so the test would resolve before any assertion had run. The states
  // must also be checked one at a time because they share the same describeExecution mock.
  for (let ii = 0; ii < states.length; ii += 1) {
    mockDescribeExecution.mockReturnValue({
      startDate: null, stopDate: null, status: states[ii],
    });
    // eslint-disable-next-line no-await-in-loop
    const status = await pipelineStatus('1234', GEM2S_PROCESS_NAME);
    expect(status[GEM2S_PROCESS_NAME].needsRunning).toBe(true);
    expect(mockDynamoGetItem).not.toHaveBeenCalled();
  }
});

it('returns a false "needsRunning" attribute for running gem2s pipelines', async () => {
getExperimentAttributesSpy.mockReturnValue(mockHasBeenRun);
mockDescribeExecution.mockReturnValue(
{ startDate: null, stopDate: null, status: RUNNING },
);

const status = await pipelineStatus('1234', GEM2S_PROCESS_NAME);

expect(status[GEM2S_PROCESS_NAME].needsRunning).toBe(false);
expect(mockDynamoGetItem).not.toHaveBeenCalled();
});

it('returns a true "needsRunning" attribute for sucesful gem2s pipelines without hash', async () => {
getExperimentAttributesSpy.mockReturnValue(mockHasBeenRunLegacy);
mockDescribeExecution.mockReturnValue(
{ startDate: null, stopDate: null, status: SUCCEEDED },
);

const status = await pipelineStatus('1234', GEM2S_PROCESS_NAME);

expect(status[GEM2S_PROCESS_NAME].needsRunning).toBe(true);
expect(mockDynamoGetItem).not.toHaveBeenCalled();
});

it('uses dynamodb to determine "needsRunning" attribute for succesful gem2s pipelines', async () => {
getExperimentAttributesSpy.mockReturnValue(mockHasBeenRun);
mockDescribeExecution.mockReturnValue(
{ startDate: null, stopDate: null, status: SUCCEEDED },
);
jest.spyOn(ProjectService.prototype, 'getProject').mockReturnValue({ metadataKeys: [] });

const status = await pipelineStatus('1234', GEM2S_PROCESS_NAME);

expect(status[GEM2S_PROCESS_NAME].needsRunning).toBe(true);
expect(mockDynamoGetItem).toHaveBeenCalled();
});

it('returns a false "needsRunning" attribute for succesful gem2s pipelines with matching hashes', async () => {
getExperimentAttributesSpy.mockReturnValue(mockHasBeenRun);
mockDescribeExecution.mockReturnValue(
{ startDate: null, stopDate: null, status: SUCCEEDED },
);
const gem2sParamsSpy = jest.spyOn(ProjectService.prototype, 'getGem2sParams').mockReturnValue({ paramsHash: 'hashOfTheSamplesData' });

const status = await pipelineStatus('1234', GEM2S_PROCESS_NAME);

expect(status[GEM2S_PROCESS_NAME].needsRunning).toBe(false);
expect(gem2sParamsSpy).toHaveBeenCalled();
});
});