diff --git a/src/api/event-services/work-request.js b/src/api/event-services/work-request.js index cd4a736fa..ac4002928 100644 --- a/src/api/event-services/work-request.js +++ b/src/api/event-services/work-request.js @@ -5,10 +5,24 @@ const { cacheGetRequest } = require('../../utils/cache-request'); const { CacheMissError } = require('../../cache/cache-utils'); const { handlePagination } = require('../../utils/handlePagination'); const validateRequest = require('../../utils/schema-validator'); +const getPipelineStatus = require('../general-services/pipeline-status'); +const pipelineConstants = require('../general-services/pipeline-manage/constants'); const handleWorkRequest = async (workRequest, socket) => { - const { uuid, pagination } = workRequest; + const { uuid, pagination, experimentId } = workRequest; + + // Check if pipeline is runnning + const { qc: { status: qcPipelineStatus } } = await getPipelineStatus( + experimentId, pipelineConstants.QC_PROCESS_NAME, + ); + + if (qcPipelineStatus !== pipelineConstants.SUCCEEDED) { + const e = new Error(`Work request can not be handled because pipeline is ${qcPipelineStatus}`); + + AWSXRay.getSegment().addError(e); + throw e; + } try { logger.log(`Trying to fetch response to request ${uuid} from cache...`); diff --git a/src/api/general-services/__mocks__/backend-status.js b/src/api/general-services/__mocks__/backend-status.js index b6f056ba2..589f37538 100644 --- a/src/api/general-services/__mocks__/backend-status.js +++ b/src/api/general-services/__mocks__/backend-status.js @@ -1,21 +1,23 @@ +const pipelineConstants = require('../pipeline-manage/constants'); + const response = { gem2s: { completedSteps: [], startDate: null, - status: 'NotCreated', + status: pipelineConstants.NOT_CREATED, stopDate: null, }, qc: { completedSteps: [], startDate: null, - status: 'NotCreated', + status: pipelineConstants.NOT_CREATED, stopDate: null, }, worker: { ready: true, restartCount: 0, started: true, - status: 'Running', + status: pipelineConstants.RUNNING, }, }; diff --git a/src/api/general-services/__mocks__/pipeline-status.js b/src/api/general-services/__mocks__/pipeline-status.js new file mode 100644 index 000000000..ba51c81ac --- /dev/null +++ b/src/api/general-services/__mocks__/pipeline-status.js @@ -0,0 +1,40 @@ +const pipelineConstants = require('../pipeline-manage/constants'); + +const responseTemplates = { + gem2s: { + completedSteps: [], + startDate: null, + status: pipelineConstants.SUCCEEDED, + stopDate: null, + }, + qc: { + completedSteps: [], + startDate: null, + status: pipelineConstants.SUCCEEDED, + stopDate: null, + }, + worker: { + ready: true, + restartCount: 0, + started: true, + status: pipelineConstants.RUNNING, + }, +}; + +const mockGetPipelineStatus = jest.fn( + (experimentId, processName) => new Promise((resolve, reject) => { + if (experimentId === 'nonExistentId') { + const err = new Error('Unkonwn project or sample'); + err.status = 404; + + reject(err); + } + + const response = { [processName]: responseTemplates[processName] }; + + resolve(response); + }), +); + +module.exports = mockGetPipelineStatus; +module.exports.responseTemplates = responseTemplates; diff --git a/src/api/general-services/pipeline-manage/constants.js b/src/api/general-services/pipeline-manage/constants.js index 8f9687075..696b02c1a 100644 --- a/src/api/general-services/pipeline-manage/constants.js +++ b/src/api/general-services/pipeline-manage/constants.js @@ -1,6 +1,29 @@ - +// Pipeline names const QC_PROCESS_NAME = 'qc'; const GEM2S_PROCESS_NAME = 'gem2s'; const OLD_QC_NAME_TO_BE_REMOVED = 'pipeline'; -module.exports = { QC_PROCESS_NAME, GEM2S_PROCESS_NAME, OLD_QC_NAME_TO_BE_REMOVED }; +// Pipeline states as defined in +// https://docs.aws.amazon.com/step-functions/latest/apireference/API_DescribeExecution.html +const RUNNING = 'RUNNING'; +const FAILED = 'FAILED'; +const TIMED_OUT = 'TIMED_OUT'; +const ABORTED = 'ABORTED'; +const SUCCEEDED = 'SUCCEEDED'; + +// Custom defined statuses defined in the API +const NOT_CREATED = 'NOT_CREATED'; + +// Additional custom statuses + +module.exports = { + QC_PROCESS_NAME, + GEM2S_PROCESS_NAME, + OLD_QC_NAME_TO_BE_REMOVED, + RUNNING, + FAILED, + TIMED_OUT, + ABORTED, + SUCCEEDED, + NOT_CREATED, +}; diff --git a/src/api/general-services/pipeline-status.js b/src/api/general-services/pipeline-status.js index 9e3d08d5e..602952dc1 100644 --- a/src/api/general-services/pipeline-status.js +++ b/src/api/general-services/pipeline-status.js @@ -3,6 +3,7 @@ const AWS = require('../../utils/requireAWS'); const ExperimentService = require('../route-services/experiment'); const config = require('../../config'); const logger = require('../../utils/logging'); +const pipelineConstants = require('./pipeline-manage/constants'); const privateSteps = [ @@ -116,7 +117,7 @@ const getPipelineStatus = async (experimentId, processName) => { execution = { startDate: null, stopDate: null, - status: 'NotCreated', + status: pipelineConstants.NOT_CREATED, }; } else { const stepFunctions = new AWS.StepFunctions({ diff --git a/tests/api/event-services/work-request.test.js b/tests/api/event-services/work-request.test.js index dc411cbe9..f1cca58d1 100644 --- a/tests/api/event-services/work-request.test.js +++ b/tests/api/event-services/work-request.test.js @@ -2,7 +2,10 @@ const MockSocket = require('socket.io-mock'); const handleWorkRequest = require('../../../src/api/event-services/work-request'); const handlePagination = require('../../../src/utils/handlePagination'); const CacheSingleton = require('../../../src/cache'); +const getPipelineStatus = require('../../../src/api/general-services/pipeline-status'); +const pipelineConstants = require('../../../src/api/general-services/pipeline-manage/constants'); +jest.mock('../../../src/api/general-services/pipeline-status'); jest.mock('../../../src/utils/handlePagination'); jest.mock('../../../src/cache'); @@ -129,6 +132,37 @@ describe('handleWorkRequest', () => { } }); + it('Throws if pipeline is not yet done or have failed', async () => { + // Initialize with an empty cache so a worker hit will be encountered. + CacheSingleton.createMock({}); + expect.assertions(1); + + const workRequest = { + uuid: '12345', + socketId: '6789', + experimentId: 'my-experiment', + timeout: '2099-01-01T00:00:00Z', + body: { name: 'GetEmbedding', type: 'umap', config: { distanceMetric: 'euclidean' } }, + }; + + getPipelineStatus.mockImplementationOnce(() => ({ + qc: { + ...getPipelineStatus.responseTemplates.qc, + status: pipelineConstants.RUNNING, + }, + })); + + try { + await handleWorkRequest(workRequest, socket); + } catch (e) { + console.log(e.message); + + expect(e.message).toMatch( + 'Work request can not be handled because pipeline is RUNNING', + ); + } + }); + it('Triggers pagination when pagination is specified and result is cached already.', async () => { CacheSingleton.createMock({ '4029461266b19b22d8753895e51c1a8a': { // pragma: allowlist secret diff --git a/tests/api/general-services/pipeline-status.test.js b/tests/api/general-services/pipeline-status.test.js index 593b4e1a4..0c08973e2 100644 --- a/tests/api/general-services/pipeline-status.test.js +++ b/tests/api/general-services/pipeline-status.test.js @@ -1,4 +1,4 @@ -const constants = require('../../../src/api/general-services/pipeline-manage/constants'); +const pipelineConstants = require('../../../src/api/general-services/pipeline-manage/constants'); const pipelineStatus = require('../../../src/api/general-services/pipeline-status'); const ExperimentService = require('../../../src/api/route-services/experiment'); @@ -360,12 +360,12 @@ describe('pipelineStatus', () => { ExperimentService.mockClear(); }); it('handles properly an empty dynamodb record', async () => { - const status = await pipelineStatus('1234', constants.QC_PROCESS_NAME); + const status = await pipelineStatus('1234', pipelineConstants.QC_PROCESS_NAME); expect(status).toEqual({ - [constants.QC_PROCESS_NAME]: { + [pipelineConstants.QC_PROCESS_NAME]: { startDate: null, stopDate: null, - status: 'NotCreated', + status: pipelineConstants.NOT_CREATED, completedSteps: [], }, });