diff --git a/src/api/general-services/backend-status.js b/src/api/general-services/backend-status.js index 4357691fe..f105e10e6 100644 --- a/src/api/general-services/backend-status.js +++ b/src/api/general-services/backend-status.js @@ -1,13 +1,17 @@ -const pipelineStatus = require('./pipeline-status'); -const workerStatus = require('./worker-status'); - +const constants = require('./pipeline-manage/constants'); +const getPipelineStatus = require('./pipeline-status'); +const getWorkerStatus = require('./worker-status'); const getBackendStatus = async (experimentId) => { - const [{ pipeline }, { worker }] = await Promise.all( - [pipelineStatus(experimentId), workerStatus(experimentId)], + const [{ pipeline }, { gem2s }, { worker }] = await Promise.all( + [ + getPipelineStatus(experimentId, constants.QC_PROCESS_NAME), + getPipelineStatus(experimentId, constants.GEM2S_PROCESS_NAME), + getWorkerStatus(experimentId)], ); return { pipeline, + gem2s, worker, }; }; diff --git a/src/api/general-services/pipeline-status.js b/src/api/general-services/pipeline-status.js index b329c3c60..0e7ffae07 100644 --- a/src/api/general-services/pipeline-status.js +++ b/src/api/general-services/pipeline-status.js @@ -5,7 +5,10 @@ const config = require('../../config'); const logger = require('../../utils/logging'); -const privateSteps = ['DeleteCompletedPipelineWorker', 'LaunchNewPipelineWorker']; +const privateSteps = [ + 'DeleteCompletedPipelineWorker', 'LaunchNewPipelineWorker', + 'DeleteCompletedGem2SWorker', 'LaunchNewGem2SWorker', +]; const getStepsFromExecutionHistory = (events) => { class Branch { @@ -97,13 +100,15 @@ const getStepsFromExecutionHistory = (events) => { /* * Return `completedSteps` of the state machine (SM) associated to the `experimentId`'s pipeline * The code assumes that - * - the relevant states for the steps are defined within a Map of the SM - * - the relevant Map is the first Map in the SM * - a step is only considered completed if it has been completed for all iteration of the Map * - steps are returned in the completion order, and are unique in the returned array */ -const getPipelineStatus = async (experimentId) => { - const { executionArn } = await (new ExperimentService()).getPipelineHandle(experimentId); +const getPipelineStatus = async (experimentId, pipelineType) => { + const experimentService = new ExperimentService(); + + const pipelinesHandles = await experimentService.getPipelinesHandles(experimentId); + const { executionArn } = pipelinesHandles[pipelineType]; + let execution = {}; let completedSteps = []; if (!executionArn.length) { @@ -121,7 +126,6 @@ const getPipelineStatus = async (experimentId) => { executionArn, }).promise(); - /* eslint-disable no-await-in-loop */ let events = []; let nextToken; @@ -142,7 +146,7 @@ const getPipelineStatus = async (experimentId) => { } const response = { - pipeline: { + [pipelineType]: { startDate: execution.startDate, stopDate: execution.stopDate, status: execution.status, diff --git a/src/api/route-services/experiment.js b/src/api/route-services/experiment.js index 57f05ae89..49ae16c0d 100644 --- a/src/api/route-services/experiment.js +++ b/src/api/route-services/experiment.js @@ -5,6 +5,8 @@ const AWS = require('../../utils/requireAWS'); const logger = require('../../utils/logging'); const { OK, NotFoundError } = require('../../utils/responses'); +const constants = require('../general-services/pipeline-manage/constants'); + const { createDynamoDbInstance, convertToJsObject, convertToDynamoDbRecord, configArrayToUpdateObjs, @@ -112,27 +114,48 @@ class ExperimentService { async updateExperiment(experimentId, body) { const dynamodb = createDynamoDbInstance(); - const key = convertToDynamoDbRecord({ experimentId }); - const marshalledData = convertToDynamoDbRecord({ - ':experimentName': body.name, - ':createdAt': body.createdAt, - ':lastViewed': body.lastViewed, - ':projectId': body.projectUuid, - ':description': body.description, - ':meta': {}, - }); + const dataToUpdate = { + experimentName: body.name || body.experimentName, + apiVersion: body.apiVersion, + createdAt: body.createdAt, + lastViewed: body.lastViewed, + projectId: body.projectUuid || body.projectId, + description: body.description, + meta: body.meta, + processingConfig: body.processingConfig, + }; + + const objectToMarshall = {}; + const updateExpression = Object.entries(dataToUpdate).reduce((acc, [key, val]) => { + if (!val) { + return acc; + } + + const expressionKey = `:${key}`; + objectToMarshall[expressionKey] = val; + + return [...acc, `${key} = ${expressionKey}`]; + }, []); + + // dataToUpdate = dataToUpdate.filter((attribute) => attribute.value); + + // let updateExpression = 'SET '; + + // dataToUpdate.forEach(({ key, value }) => { + // const expressionKey = `:${key}`; + + // objectToMarshall[expressionKey] = value; + // updateExpression += `${key} = ${expressionKey},`; + // }); + + // updateExpression = _.trimEnd(updateExpression, ','); const params = { TableName: this.experimentsTableName, - Key: key, - UpdateExpression: `SET experimentName = :experimentName, - createdAt = :createdAt, - lastViewed = :lastViewed, - projectId = :projectId, - description = :description, - meta = :meta`, - ExpressionAttributeValues: marshalledData, + Key: convertToDynamoDbRecord({ experimentId }), + UpdateExpression: `SET ${updateExpression.join(', ')}`, + ExpressionAttributeValues: convertToDynamoDbRecord(objectToMarshall), ReturnValues: 'UPDATED_NEW', }; @@ -153,12 +176,20 @@ class ExperimentService { return data; } - async getPipelineHandle(experimentId) { + async getPipelinesHandles(experimentId) { const data = await getExperimentAttributes(this.experimentsTableName, experimentId, ['meta']); + return { - stateMachineArn: '', - executionArn: '', - ...data.meta.pipeline, + [constants.QC_PROCESS_NAME]: { + stateMachineArn: '', + executionArn: '', + ...data.meta.pipeline, + }, + [constants.GEM2S_PROCESS_NAME]: { + stateMachineArn: '', + executionArn: '', + ...data.meta.gem2s, + }, }; } diff --git a/src/api/route-services/gem2s-response.js b/src/api/route-services/gem2s-response.js new file mode 100644 index 000000000..97128012d --- /dev/null +++ b/src/api/route-services/gem2s-response.js @@ -0,0 +1,60 @@ +const AWSXRay = require('aws-xray-sdk'); + +const constants = require('../general-services/pipeline-manage/constants'); +const validateRequest = require('../../utils/schema-validator'); +const logger = require('../../utils/logging'); + +const ExperimentService = require('./experiment'); +const SamplesService = require('./samples'); + +const experimentService = new ExperimentService(); +const samplesService = new SamplesService(); +const getPipelineStatus = require('../general-services/pipeline-status'); + +const sendUpdateToSubscribed = async (experimentId, message, io) => { + const statusRes = await getPipelineStatus(experimentId, constants.GEM2S_PROCESS_NAME); + + // How do we handle errors? TODO This needs to be handled + // if (statusRes.gem2s) { + // AWSXRay.getSegment().addError(error); + // io.sockets.emit(`ExperimentUpdates-${experimentId}`, message); + // return; + // } + + // Concatenate into a proper response. + const response = { + ...message, + status: statusRes, + type: 'gem2s', + }; + + logger.log('Sending to all clients subscribed to experiment', experimentId); + io.sockets.emit(`ExperimentUpdates-${experimentId}`, response); +}; + +const gem2sResponse = async (io, message) => { + AWSXRay.getSegment().addMetadata('message', message); + + // Fail hard if there was an error. + await validateRequest(message, 'GEM2SResponse.v1.yaml'); + + const { experimentId } = message; + + if (!message.table) { + await sendUpdateToSubscribed(experimentId, message, io); + return; + } + + const { item, table: tableName } = message; + + if (tableName.includes('experiments')) { + await experimentService.updateExperiment(experimentId, item); + } else if (tableName.includes('samples')) { + const { projectUuid } = item; + await samplesService.updateSamples(projectUuid, item); + } + + await sendUpdateToSubscribed(experimentId, message, io); +}; + +module.exports = gem2sResponse; diff --git a/src/api/route-services/pipeline-response.js b/src/api/route-services/pipeline-response.js index a474b351f..dc2e3ad1a 100644 --- a/src/api/route-services/pipeline-response.js +++ b/src/api/route-services/pipeline-response.js @@ -3,6 +3,8 @@ const AWS = require('../../utils/requireAWS'); const validateRequest = require('../../utils/schema-validator'); const logger = require('../../utils/logging'); +const constants = require('../general-services/pipeline-manage/constants'); + const ExperimentService = require('./experiment'); const PlotsTablesService = require('./plots-tables'); const PipelineHook = require('../../utils/hookRunner'); @@ -10,7 +12,7 @@ const PipelineHook = require('../../utils/hookRunner'); const plotsTableService = new PlotsTablesService(); const experimentService = new ExperimentService(); -const pipelineStatus = require('../general-services/pipeline-status'); +const getPipelineStatus = require('../general-services/pipeline-status'); const embeddingWorkRequest = require('../../utils/hooks/embeddingWorkRequest'); const clusteringWorkRequest = require('../../utils/hooks/clusteringWorkRequest'); @@ -85,7 +87,7 @@ const pipelineResponse = async (io, message) => { ]); } - const statusRes = await pipelineStatus(experimentId); + const statusRes = await getPipelineStatus(experimentId, constants.QC_PROCESS_NAME); pipelineHook.run(taskName, { experimentId, output, diff --git a/src/api/route-services/projects.js b/src/api/route-services/projects.js index 7d5544c2e..51d6b2c9a 100644 --- a/src/api/route-services/projects.js +++ b/src/api/route-services/projects.js @@ -89,7 +89,10 @@ class ProjectsService { throw new NotFoundError('No projects available!'); } - const projectIds = response.Items.map((entry) => convertToJsObject(entry).projectId); + const projectIds = response.Items.map( + (entry) => convertToJsObject(entry).projectId, + ).filter((id) => id); + return this.getProjectsFromIds(new Set(projectIds)); } @@ -110,11 +113,13 @@ class ProjectsService { }; const data = await dynamodb.batchGetItem(params).promise(); + const existingProjectIds = new Set(data.Responses[this.tableName].map((entry) => { const newData = convertToJsObject(entry); return newData.projects.uuid; })); + // Build up projects that do not exist in Dynamo yet. const projects = [...projectIds] .filter((entry) => ( diff --git a/src/api/routes/gem2s.js b/src/api/routes/gem2s.js index 4e18d5695..7ee8ec4a9 100644 --- a/src/api/routes/gem2s.js +++ b/src/api/routes/gem2s.js @@ -1,23 +1,13 @@ const AWSXRay = require('aws-xray-sdk'); const { createGem2SPipeline } = require('../general-services/pipeline-manage'); const ExperimentService = require('../route-services/experiment'); -const getBackendStatus = require('../general-services/backend-status'); -const pipelineResponse = require('../route-services/pipeline-response'); +const gem2sResponse = require('../route-services/gem2s-response'); const parseSNSMessage = require('../../utils/parse-sns-message'); const logger = require('../../utils/logging'); const { expressAuthorizationMiddleware } = require('../../utils/authMiddlewares'); module.exports = { - 'gem2s#get': [ - expressAuthorizationMiddleware, - (req, res, next) => { - getBackendStatus(req.params.experimentId) - .then((data) => res.json(data)) - .catch(next); - }, - ], - 'gem2s#create': [ expressAuthorizationMiddleware, (req, res, next) => { @@ -46,7 +36,7 @@ module.exports = { const { io, parsedMessage } = result; try { - await pipelineResponse(io, parsedMessage); + await gem2sResponse(io, parsedMessage); } catch (e) { logger.error( 'Pipeline response handler failed with error: ', e, diff --git a/src/api/routes/pipelines.js b/src/api/routes/pipelines.js index 961e2eb72..3f4b3ea7b 100644 --- a/src/api/routes/pipelines.js +++ b/src/api/routes/pipelines.js @@ -11,6 +11,7 @@ module.exports = { 'pipelines#get': [ expressAuthorizationMiddleware, (req, res, next) => { + // The changes to add gem2s status will be obsoleted once agi's PR is merged in getBackendStatus(req.params.experimentId) .then((data) => res.json(data)) .catch(next); diff --git a/src/specs/api.yaml b/src/specs/api.yaml index 7bd9acbe0..508f9653f 100644 --- a/src/specs/api.yaml +++ b/src/specs/api.yaml @@ -101,6 +101,49 @@ paths: tags: - work parameters: [] + /gem2sResults: + post: + operationId: receiveGem2sResponse + x-eov-operation-id: 'gem2s#response' + x-eov-operation-handler: routes/gem2s + requestBody: + description: 'The results from the execution of a pipeline step, sent via SNS.' + required: true + content: + text/plain: + schema: + type: string + examples: {} + application/json: + schema: + type: object + properties: {} + examples: {} + responses: + '200': + description: 'A JSON-parseable was received by the server, *irrespective of whether it was correct/acceptable or not*.' + content: + text/plain: + schema: + type: string + pattern: ok + '500': + description: The data sent by the server could not be parsed as JSON. + content: + text/plain: + schema: + type: string + pattern: nok + description: |- + Results from Step Function pipeline steps are relayed to the API through this endpoint. + + Note that this endpoint is only exposed to AWS SNS, and since it has a specific communication protocol with limited feedback, the schema defined here is designed to be liberally enforceable. This endpoint is also used by SNS to handle subscribe/unsubscribe events. + + The actual JSON passed by SNS is found in the `PipelineResponse` model, which is to be validated by the API. + summary: Retrieve results from pipeline step functions + tags: + - work + parameters: [] /pipelineResults: post: operationId: receivePipelineResponse diff --git a/src/specs/models/GEM2SResponse.v1.yaml b/src/specs/models/GEM2SResponse.v1.yaml new file mode 100644 index 000000000..5e844b61f --- /dev/null +++ b/src/specs/models/GEM2SResponse.v1.yaml @@ -0,0 +1,23 @@ +title: Pipeline response +description: This is the format the gem2s clients communicate the result of a gem2s run. +type: object +properties: + experimentId: + type: string + status: + type: string + pattern: OK + oneOf: + - item: + $ref: ./api-body-schemas/Experiment.v1.yaml + table: + type: string + pattern: 'experiments' + - item: + $ref: ./api-body-schemas/Samples.v1.yaml + table: + type: string + pattern: 'samples' +required: + - experimentId + - status \ No newline at end of file diff --git a/src/specs/models/api-body-schemas/Experiment.v1.yaml b/src/specs/models/api-body-schemas/Experiment.v1.yaml index 8f8b3a561..bbb17f965 100644 --- a/src/specs/models/api-body-schemas/Experiment.v1.yaml +++ b/src/specs/models/api-body-schemas/Experiment.v1.yaml @@ -1,6 +1,6 @@ title: Experiment +description: 'An experiment object as stored in dynamodb' type: object properties: experimentId: - type: string -description: '' + type: string \ No newline at end of file diff --git a/src/specs/models/api-body-schemas/Samples.v1.yaml b/src/specs/models/api-body-schemas/Samples.v1.yaml index 41932e2bf..50c8e88f9 100644 --- a/src/specs/models/api-body-schemas/Samples.v1.yaml +++ b/src/specs/models/api-body-schemas/Samples.v1.yaml @@ -32,4 +32,6 @@ additionalProperties: items: type: string files: - type: object \ No newline at end of file + type: object +required: + - ids \ No newline at end of file diff --git a/src/utils/schema-validator.js b/src/utils/schema-validator.js index 3c9e8be22..bb8bf4159 100644 --- a/src/utils/schema-validator.js +++ b/src/utils/schema-validator.js @@ -11,10 +11,6 @@ const logger = require('./logging'); const validateRequest = async (request, schemaPath) => { const specPath = path.resolve(__dirname, '..', 'specs', 'models', schemaPath); - console.log('received validation request'); - console.log({ request }); - console.log({ schemaPath }); - console.log({ specPath }); // Create a custom Swagger client and 'HTTP fetcher' mock so we can load in // our spec spread across multiple local files. The result is the entire spec // fully resolved across all refs and imports. diff --git a/tests/api/general-services/pipeline-status.test.js b/tests/api/general-services/pipeline-status.test.js index 9dda4834b..593b4e1a4 100644 --- a/tests/api/general-services/pipeline-status.test.js +++ b/tests/api/general-services/pipeline-status.test.js @@ -1,3 +1,4 @@ +const constants = require('../../../src/api/general-services/pipeline-manage/constants'); const pipelineStatus = require('../../../src/api/general-services/pipeline-status'); const ExperimentService = require('../../../src/api/route-services/experiment'); @@ -336,21 +337,32 @@ describe('getStepsFromExecutionHistory', () => { }); }); -jest.mock('../../../src/api/route-services/experiment', () => jest.fn().mockImplementation(() => ({ - getPipelineHandle: () => ({ - stateMachineArn: '', - executionArn: '', - }), -}))); +jest.mock('../../../src/api/route-services/experiment', () => jest.fn().mockImplementation(() => { + // eslint-disable-next-line global-require + const internalConstants = require('../../../src/api/general-services/pipeline-manage/constants'); + + return { + getPipelinesHandles: () => ({ + [internalConstants.GEM2S_PROCESS_NAME]: { + stateMachineArn: '', + executionArn: '', + }, + [internalConstants.QC_PROCESS_NAME]: { + stateMachineArn: '', + executionArn: '', + }, + }), + }; +})); describe('pipelineStatus', () => { beforeEach(() => { ExperimentService.mockClear(); }); it('handles properly an empty dynamodb record', async () => { - const status = await pipelineStatus('1234'); + const status = await pipelineStatus('1234', constants.QC_PROCESS_NAME); expect(status).toEqual({ - pipeline: { + [constants.QC_PROCESS_NAME]: { startDate: null, stopDate: null, status: 'NotCreated', diff --git a/tests/api/route-services/experiment.test.js b/tests/api/route-services/experiment.test.js index 2b646bb54..29c80ab1c 100644 --- a/tests/api/route-services/experiment.test.js +++ b/tests/api/route-services/experiment.test.js @@ -1,6 +1,8 @@ const AWSMock = require('aws-sdk-mock'); const AWS = require('../../../src/utils/requireAWS'); +const constants = require('../../../src/api/general-services/pipeline-manage/constants'); + const ExperimentService = require('../../../src/api/route-services/experiment'); const { mockDynamoGetItem, @@ -197,14 +199,21 @@ describe('tests for the experiment service', () => { it('Get Pipeline Handle works', async (done) => { const handle = { - stateMachineArn: 'STATE-MACHINE-ID', - executionArn: '', + [constants.GEM2S_PROCESS_NAME]: { + executionArn: '', + stateMachineArn: '', + }, + [constants.QC_PROCESS_NAME]: { + executionArn: '', + stateMachineArn: 'STATE-MACHINE-ID', + }, }; + const jsData = { meta: { pipeline: { - stateMachineArn: handle.stateMachineArn, + stateMachineArn: handle[constants.QC_PROCESS_NAME].stateMachineArn, }, organism: 'mmusculus', type: '10x', @@ -213,7 +222,7 @@ describe('tests for the experiment service', () => { const getItemSpy = mockDynamoGetItem(jsData); - (new ExperimentService()).getPipelineHandle('12345') + (new ExperimentService()).getPipelinesHandles('12345') .then((data) => { expect(data).toEqual(handle); expect(getItemSpy).toHaveBeenCalledWith(