Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BIOMAGE-989] - Work request throw error if pipeline has not yet succeeded #139

Merged
merged 7 commits into from
Jun 3, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 15 additions & 1 deletion src/api/event-services/work-request.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,24 @@ const { cacheGetRequest } = require('../../utils/cache-request');
const { CacheMissError } = require('../../cache/cache-utils');
const { handlePagination } = require('../../utils/handlePagination');
const validateRequest = require('../../utils/schema-validator');
const getPipelineStatus = require('../general-services/pipeline-status');

const pipelineConstants = require('../general-services/pipeline-manage/constants');

const handleWorkRequest = async (workRequest, socket) => {
const { uuid, pagination } = workRequest;
const { uuid, pagination, experimentId } = workRequest;

// Check if pipeline is runnning
const { qc: { status: qcPipelineStatus } } = await getPipelineStatus(
experimentId, pipelineConstants.QC_PROCESS_NAME,
);

if (qcPipelineStatus !== pipelineConstants.SUCCEEDED) {
const e = new Error(`Work request can not be handled because pipeline is ${qcPipelineStatus}`);

AWSXRay.getSegment().addError(e);
throw e;
}

try {
logger.log(`Trying to fetch response to request ${uuid} from cache...`);
Expand Down
8 changes: 5 additions & 3 deletions src/api/general-services/__mocks__/backend-status.js
Original file line number Diff line number Diff line change
@@ -1,21 +1,23 @@
const pipelineConstants = require('../pipeline-manage/constants');

const response = {
gem2s: {
completedSteps: [],
startDate: null,
status: 'NotCreated',
status: pipelineConstants.NOT_CREATED,
stopDate: null,
},
qc: {
completedSteps: [],
startDate: null,
status: 'NotCreated',
status: pipelineConstants.NOT_CREATED,
stopDate: null,
},
worker: {
ready: true,
restartCount: 0,
started: true,
status: 'Running',
status: pipelineConstants.RUNNING,
},
};

Expand Down
40 changes: 40 additions & 0 deletions src/api/general-services/__mocks__/pipeline-status.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
const pipelineConstants = require('../pipeline-manage/constants');

const responseTemplates = {
gem2s: {
completedSteps: [],
startDate: null,
status: pipelineConstants.SUCCEEDED,
stopDate: null,
},
qc: {
completedSteps: [],
startDate: null,
status: pipelineConstants.SUCCEEDED,
stopDate: null,
},
worker: {
ready: true,
restartCount: 0,
started: true,
status: pipelineConstants.RUNNING,
},
};

const mockGetPipelineStatus = jest.fn(
(experimentId, processName) => new Promise((resolve, reject) => {
if (experimentId === 'nonExistentId') {
const err = new Error('Unkonwn project or sample');
err.status = 404;

reject(err);
}

const response = { [processName]: responseTemplates[processName] };

resolve(response);
}),
);

module.exports = mockGetPipelineStatus;
module.exports.responseTemplates = responseTemplates;
27 changes: 25 additions & 2 deletions src/api/general-services/pipeline-manage/constants.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,29 @@

// Pipeline names
const QC_PROCESS_NAME = 'qc';
const GEM2S_PROCESS_NAME = 'gem2s';
const OLD_QC_NAME_TO_BE_REMOVED = 'pipeline';

module.exports = { QC_PROCESS_NAME, GEM2S_PROCESS_NAME, OLD_QC_NAME_TO_BE_REMOVED };
// Pipeline states as defined in
// https://docs.aws.amazon.com/step-functions/latest/apireference/API_DescribeExecution.html
const RUNNING = 'RUNNING';
const FAILED = 'FAILED';
const TIMED_OUT = 'TIMED_OUT';
const ABORTED = 'ABORTED';
const SUCCEEDED = 'SUCCEEDED';

// Custom defined statuses defined in the API
const NOT_CREATED = 'NOT_CREATED';

// Additional custom statuses

module.exports = {
QC_PROCESS_NAME,
GEM2S_PROCESS_NAME,
OLD_QC_NAME_TO_BE_REMOVED,
RUNNING,
FAILED,
TIMED_OUT,
ABORTED,
SUCCEEDED,
NOT_CREATED,
};
3 changes: 2 additions & 1 deletion src/api/general-services/pipeline-status.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ const AWS = require('../../utils/requireAWS');
const ExperimentService = require('../route-services/experiment');
const config = require('../../config');
const logger = require('../../utils/logging');
const pipelineConstants = require('./pipeline-manage/constants');


const privateSteps = [
Expand Down Expand Up @@ -116,7 +117,7 @@ const getPipelineStatus = async (experimentId, processName) => {
execution = {
startDate: null,
stopDate: null,
status: 'NotCreated',
status: pipelineConstants.NOT_CREATED,
};
} else {
const stepFunctions = new AWS.StepFunctions({
Expand Down
34 changes: 34 additions & 0 deletions tests/api/event-services/work-request.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@ const MockSocket = require('socket.io-mock');
const handleWorkRequest = require('../../../src/api/event-services/work-request');
const handlePagination = require('../../../src/utils/handlePagination');
const CacheSingleton = require('../../../src/cache');
const getPipelineStatus = require('../../../src/api/general-services/pipeline-status');
const pipelineConstants = require('../../../src/api/general-services/pipeline-manage/constants');

jest.mock('../../../src/api/general-services/pipeline-status');
jest.mock('../../../src/utils/handlePagination');
jest.mock('../../../src/cache');

Expand Down Expand Up @@ -129,6 +132,37 @@ describe('handleWorkRequest', () => {
}
});

it('Throws if pipeline is not yet done or have failed', async () => {
// Initialize with an empty cache so a worker hit will be encountered.
CacheSingleton.createMock({});
expect.assertions(1);

const workRequest = {
uuid: '12345',
socketId: '6789',
experimentId: 'my-experiment',
timeout: '2099-01-01T00:00:00Z',
body: { name: 'GetEmbedding', type: 'umap', config: { distanceMetric: 'euclidean' } },
};

getPipelineStatus.mockImplementationOnce(() => ({
qc: {
...getPipelineStatus.responseTemplates.qc,
status: pipelineConstants.RUNNING,
},
}));

try {
await handleWorkRequest(workRequest, socket);
} catch (e) {
console.log(e.message);

expect(e.message).toMatch(
'Work request can not be handled because pipeline is RUNNING',
);
}
});

it('Triggers pagination when pagination is specified and result is cached already.', async () => {
CacheSingleton.createMock({
'4029461266b19b22d8753895e51c1a8a': { // pragma: allowlist secret
Expand Down
8 changes: 4 additions & 4 deletions tests/api/general-services/pipeline-status.test.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
const constants = require('../../../src/api/general-services/pipeline-manage/constants');
const pipelineConstants = require('../../../src/api/general-services/pipeline-manage/constants');
const pipelineStatus = require('../../../src/api/general-services/pipeline-status');
const ExperimentService = require('../../../src/api/route-services/experiment');

Expand Down Expand Up @@ -360,12 +360,12 @@ describe('pipelineStatus', () => {
ExperimentService.mockClear();
});
it('handles properly an empty dynamodb record', async () => {
const status = await pipelineStatus('1234', constants.QC_PROCESS_NAME);
const status = await pipelineStatus('1234', pipelineConstants.QC_PROCESS_NAME);
expect(status).toEqual({
[constants.QC_PROCESS_NAME]: {
[pipelineConstants.QC_PROCESS_NAME]: {
startDate: null,
stopDate: null,
status: 'NotCreated',
status: pipelineConstants.NOT_CREATED,
completedSteps: [],
},
});
Expand Down