Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BIOMAGE-989] - Work request throw error if pipeline has not yet succeeded #139

Merged
merged 7 commits into from
Jun 3, 2021
Merged
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 45 additions & 1 deletion src/api/event-services/work-request.js
Original file line number Diff line number Diff line change
@@ -5,12 +5,50 @@ const { cacheGetRequest } = require('../../utils/cache-request');
const { CacheMissError } = require('../../cache/cache-utils');
const { handlePagination } = require('../../utils/handlePagination');
const validateRequest = require('../../utils/schema-validator');
const getPipelineStatus = require('../general-services/pipeline-status');

const pipelineConstants = require('../general-services/pipeline-manage/constants');

class WorkRequestError extends Error {
constructor(message) {
super(message);
this.name = 'WorkRequestError';
}
}

const createWorkResponseError = (message) => ({
response: {
code: 503,
error: message,
},
});


const handleWorkRequest = async (workRequest, socket) => {
const { uuid, pagination } = workRequest;
const { uuid, pagination, experimentId } = workRequest;

// Check if pipeline is runnning
const { qc: { status: qcPipelineStatus } } = await getPipelineStatus(
experimentId, pipelineConstants.QC_PROCESS_NAME,
);

try {
if (qcPipelineStatus === pipelineConstants.NOT_CREATED) {
throw new WorkRequestError('Work request can not be handled as QC pipeline has not been run');
}

if (qcPipelineStatus === pipelineConstants.RUNNING) {
throw new WorkRequestError('Work request can not be handled as a QC pipeline is running');
}

if ([
pipelineConstants.ABORTED,
pipelineConstants.FAILED,
pipelineConstants.TIMED_OUT,
].includes(qcPipelineStatus)) {
throw new WorkRequestError('Work request can not be handled because the previous QC pipeline run had an error.');
}

aerlaut marked this conversation as resolved.
Show resolved Hide resolved
logger.log(`Trying to fetch response to request ${uuid} from cache...`);
const cachedResponse = await cacheGetRequest(workRequest);
logger.log(`We found a cached response for ${uuid}. Checking if pagination is needed...`);
@@ -40,6 +78,12 @@ const handleWorkRequest = async (workRequest, socket) => {

const workSubmitService = new WorkSubmitService(workRequest);
await workSubmitService.submitWork();
} else if (e instanceof WorkRequestError) {
logger.log(e.message);
logger.log('Work request error : ', e.message);

socket.emit(`WorkResponse-${uuid}`, createWorkResponseError(e.message));
logger.log(`Error response sent back to ${uuid}`);
aerlaut marked this conversation as resolved.
Show resolved Hide resolved
} else {
logger.log('Unexpected error happened while trying to process cached response:', e.message);
aerlaut marked this conversation as resolved.
Show resolved Hide resolved
AWSXRay.getSegment().addError(e);
aerlaut marked this conversation as resolved.
Show resolved Hide resolved
8 changes: 5 additions & 3 deletions src/api/general-services/__mocks__/backend-status.js
Original file line number Diff line number Diff line change
@@ -1,21 +1,23 @@
const pipelineConstants = require('../pipeline-manage/constants');

const response = {
gem2s: {
completedSteps: [],
startDate: null,
status: 'NotCreated',
status: pipelineConstants.NOT_CREATED,
stopDate: null,
},
qc: {
completedSteps: [],
startDate: null,
status: 'NotCreated',
status: pipelineConstants.NOT_CREATED,
stopDate: null,
},
worker: {
ready: true,
restartCount: 0,
started: true,
status: 'Running',
status: pipelineConstants.RUNNING,
},
};

27 changes: 25 additions & 2 deletions src/api/general-services/pipeline-manage/constants.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,29 @@

// Pipeline names
const QC_PROCESS_NAME = 'qc';
const GEM2S_PROCESS_NAME = 'gem2s';
const OLD_QC_NAME_TO_BE_REMOVED = 'pipeline';

module.exports = { QC_PROCESS_NAME, GEM2S_PROCESS_NAME, OLD_QC_NAME_TO_BE_REMOVED };
// Pipeline states as defined in
// https://docs.aws.amazon.com/step-functions/latest/apireference/API_DescribeExecution.html
const RUNNING = 'RUNNING';
const FAILED = 'FAILED';
const TIMED_OUT = 'TIMED_OUT';
const ABORTED = 'ABORTED';
const SUCCEEDED = 'SUCCEEDED';

// Custom defined statuses defined in the API
const NOT_CREATED = 'NOT_CREATED';

// Additional custom statuses

module.exports = {
QC_PROCESS_NAME,
GEM2S_PROCESS_NAME,
OLD_QC_NAME_TO_BE_REMOVED,
RUNNING,
FAILED,
TIMED_OUT,
ABORTED,
SUCCEEDED,
NOT_CREATED,
};
3 changes: 2 additions & 1 deletion src/api/general-services/pipeline-status.js
Original file line number Diff line number Diff line change
@@ -3,6 +3,7 @@ const AWS = require('../../utils/requireAWS');
const ExperimentService = require('../route-services/experiment');
const config = require('../../config');
const logger = require('../../utils/logging');
const pipelineConstants = require('./pipeline-manage/constants');


const privateSteps = [
@@ -116,7 +117,7 @@ const getPipelineStatus = async (experimentId, processName) => {
execution = {
startDate: null,
stopDate: null,
status: 'NotCreated',
status: pipelineConstants.NOT_CREATED,
};
} else {
const stepFunctions = new AWS.StepFunctions({
37 changes: 16 additions & 21 deletions tests/api/general-services/pipeline-status.test.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
const constants = require('../../../src/api/general-services/pipeline-manage/constants');
const pipelineConstants = require('../../../src/api/general-services/pipeline-manage/constants');
const pipelineStatus = require('../../../src/api/general-services/pipeline-status');
const ExperimentService = require('../../../src/api/route-services/experiment');

@@ -337,35 +337,30 @@ describe('getStepsFromExecutionHistory', () => {
});
});

jest.mock('../../../src/api/route-services/experiment', () => jest.fn().mockImplementation(() => {
// eslint-disable-next-line global-require
const internalConstants = require('../../../src/api/general-services/pipeline-manage/constants');

return {
getPipelinesHandles: () => ({
[internalConstants.GEM2S_PROCESS_NAME]: {
stateMachineArn: '',
executionArn: '',
},
[internalConstants.QC_PROCESS_NAME]: {
stateMachineArn: '',
executionArn: '',
},
}),
};
}));
jest.mock('../../../src/api/route-services/experiment', () => jest.fn().mockImplementation(() => ({
getPipelinesHandles: () => ({
[pipelineConstants.GEM2S_PROCESS_NAME]: {
stateMachineArn: '',
executionArn: '',
},
[pipelineConstants.QC_PROCESS_NAME]: {
stateMachineArn: '',
executionArn: '',
},
}),
})));

describe('pipelineStatus', () => {
beforeEach(() => {
ExperimentService.mockClear();
});
it('handles properly an empty dynamodb record', async () => {
const status = await pipelineStatus('1234', constants.QC_PROCESS_NAME);
const status = await pipelineStatus('1234', pipelineConstants.QC_PROCESS_NAME);
expect(status).toEqual({
[constants.QC_PROCESS_NAME]: {
[pipelineConstants.QC_PROCESS_NAME]: {
startDate: null,
stopDate: null,
status: 'NotCreated',
status: pipelineConstants.NOT_CREATED,
completedSteps: [],
},
});