Skip to content

Commit

Permalink
Handle gem2s response (#127)
Browse files Browse the repository at this point in the history
* created gem2s endpoint

* added skeleton parameter to state-machine-definition tests

* passing all required params

* fixed qc-pipeline skeleton

* fixed gem2s pipeline skeleton name

* renamed ARN to avoid clashes

* converted gem2s pipeline steps into normal ones instead of maps

* added last steps for gem2s

* removed testing clutter

* fixed some tests

* Save gem2s pipeline handle

* Save response to dynamodb

* Add gem2s handle and status report to status

* Fix dynamodb updates to experiments and other minor things

* Fix tests and minor bugs

* Minor fixes

Co-authored-by: Pol Alvarez <[email protected]>
Co-authored-by: Martin Fosco <[email protected]>
  • Loading branch information
3 people authored May 25, 2021
1 parent c50f429 commit bf1d71b
Show file tree
Hide file tree
Showing 15 changed files with 249 additions and 67 deletions.
14 changes: 9 additions & 5 deletions src/api/general-services/backend-status.js
Original file line number Diff line number Diff line change
@@ -1,13 +1,17 @@
const pipelineStatus = require('./pipeline-status');
const workerStatus = require('./worker-status');

const constants = require('./pipeline-manage/constants');
const getPipelineStatus = require('./pipeline-status');
const getWorkerStatus = require('./worker-status');

const getBackendStatus = async (experimentId) => {
const [{ pipeline }, { worker }] = await Promise.all(
[pipelineStatus(experimentId), workerStatus(experimentId)],
const [{ pipeline }, { gem2s }, { worker }] = await Promise.all(
[
getPipelineStatus(experimentId, constants.QC_PROCESS_NAME),
getPipelineStatus(experimentId, constants.GEM2S_PROCESS_NAME),
getWorkerStatus(experimentId)],
);
return {
pipeline,
gem2s,
worker,
};
};
Expand Down
18 changes: 11 additions & 7 deletions src/api/general-services/pipeline-status.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,10 @@ const config = require('../../config');
const logger = require('../../utils/logging');


const privateSteps = ['DeleteCompletedPipelineWorker', 'LaunchNewPipelineWorker'];
const privateSteps = [
'DeleteCompletedPipelineWorker', 'LaunchNewPipelineWorker',
'DeleteCompletedGem2SWorker', 'LaunchNewGem2SWorker',
];

const getStepsFromExecutionHistory = (events) => {
class Branch {
Expand Down Expand Up @@ -97,13 +100,15 @@ const getStepsFromExecutionHistory = (events) => {
/*
* Return `completedSteps` of the state machine (SM) associated to the `experimentId`'s pipeline
* The code assumes that
* - the relevant states for the steps are defined within a Map of the SM
* - the relevant Map is the first Map in the SM
* - a step is only considered completed if it has been completed for all iteration of the Map
* - steps are returned in the completion order, and are unique in the returned array
*/
const getPipelineStatus = async (experimentId) => {
const { executionArn } = await (new ExperimentService()).getPipelineHandle(experimentId);
const getPipelineStatus = async (experimentId, pipelineType) => {
const experimentService = new ExperimentService();

const pipelinesHandles = await experimentService.getPipelinesHandles(experimentId);
const { executionArn } = pipelinesHandles[pipelineType];

let execution = {};
let completedSteps = [];
if (!executionArn.length) {
Expand All @@ -121,7 +126,6 @@ const getPipelineStatus = async (experimentId) => {
executionArn,
}).promise();


/* eslint-disable no-await-in-loop */
let events = [];
let nextToken;
Expand All @@ -142,7 +146,7 @@ const getPipelineStatus = async (experimentId) => {
}

const response = {
pipeline: {
[pipelineType]: {
startDate: execution.startDate,
stopDate: execution.stopDate,
status: execution.status,
Expand Down
73 changes: 52 additions & 21 deletions src/api/route-services/experiment.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ const AWS = require('../../utils/requireAWS');
const logger = require('../../utils/logging');

const { OK, NotFoundError } = require('../../utils/responses');
const constants = require('../general-services/pipeline-manage/constants');


const {
createDynamoDbInstance, convertToJsObject, convertToDynamoDbRecord, configArrayToUpdateObjs,
Expand Down Expand Up @@ -112,27 +114,48 @@ class ExperimentService {

async updateExperiment(experimentId, body) {
const dynamodb = createDynamoDbInstance();
const key = convertToDynamoDbRecord({ experimentId });

const marshalledData = convertToDynamoDbRecord({
':experimentName': body.name,
':createdAt': body.createdAt,
':lastViewed': body.lastViewed,
':projectId': body.projectUuid,
':description': body.description,
':meta': {},
});
const dataToUpdate = {
experimentName: body.name || body.experimentName,
apiVersion: body.apiVersion,
createdAt: body.createdAt,
lastViewed: body.lastViewed,
projectId: body.projectUuid || body.projectId,
description: body.description,
meta: body.meta,
processingConfig: body.processingConfig,
};

const objectToMarshall = {};
const updateExpression = Object.entries(dataToUpdate).reduce((acc, [key, val]) => {
if (!val) {
return acc;
}

const expressionKey = `:${key}`;
objectToMarshall[expressionKey] = val;

return [...acc, `${key} = ${expressionKey}`];
}, []);

// dataToUpdate = dataToUpdate.filter((attribute) => attribute.value);

// let updateExpression = 'SET ';

// dataToUpdate.forEach(({ key, value }) => {
// const expressionKey = `:${key}`;

// objectToMarshall[expressionKey] = value;
// updateExpression += `${key} = ${expressionKey},`;
// });

// updateExpression = _.trimEnd(updateExpression, ',');

const params = {
TableName: this.experimentsTableName,
Key: key,
UpdateExpression: `SET experimentName = :experimentName,
createdAt = :createdAt,
lastViewed = :lastViewed,
projectId = :projectId,
description = :description,
meta = :meta`,
ExpressionAttributeValues: marshalledData,
Key: convertToDynamoDbRecord({ experimentId }),
UpdateExpression: `SET ${updateExpression.join(', ')}`,
ExpressionAttributeValues: convertToDynamoDbRecord(objectToMarshall),
ReturnValues: 'UPDATED_NEW',
};

Expand All @@ -153,12 +176,20 @@ class ExperimentService {
return data;
}

async getPipelineHandle(experimentId) {
async getPipelinesHandles(experimentId) {
const data = await getExperimentAttributes(this.experimentsTableName, experimentId, ['meta']);

return {
stateMachineArn: '',
executionArn: '',
...data.meta.pipeline,
[constants.QC_PROCESS_NAME]: {
stateMachineArn: '',
executionArn: '',
...data.meta.pipeline,
},
[constants.GEM2S_PROCESS_NAME]: {
stateMachineArn: '',
executionArn: '',
...data.meta.gem2s,
},
};
}

Expand Down
60 changes: 60 additions & 0 deletions src/api/route-services/gem2s-response.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
const AWSXRay = require('aws-xray-sdk');

const constants = require('../general-services/pipeline-manage/constants');
const validateRequest = require('../../utils/schema-validator');
const logger = require('../../utils/logging');

const ExperimentService = require('./experiment');
const SamplesService = require('./samples');

const experimentService = new ExperimentService();
const samplesService = new SamplesService();
const getPipelineStatus = require('../general-services/pipeline-status');

const sendUpdateToSubscribed = async (experimentId, message, io) => {
const statusRes = await getPipelineStatus(experimentId, constants.GEM2S_PROCESS_NAME);

// How do we handle errors? TODO This needs to be handled
// if (statusRes.gem2s) {
// AWSXRay.getSegment().addError(error);
// io.sockets.emit(`ExperimentUpdates-${experimentId}`, message);
// return;
// }

// Concatenate into a proper response.
const response = {
...message,
status: statusRes,
type: 'gem2s',
};

logger.log('Sending to all clients subscribed to experiment', experimentId);
io.sockets.emit(`ExperimentUpdates-${experimentId}`, response);
};

const gem2sResponse = async (io, message) => {
AWSXRay.getSegment().addMetadata('message', message);

// Fail hard if there was an error.
await validateRequest(message, 'GEM2SResponse.v1.yaml');

const { experimentId } = message;

if (!message.table) {
await sendUpdateToSubscribed(experimentId, message, io);
return;
}

const { item, table: tableName } = message;

if (tableName.includes('experiments')) {
await experimentService.updateExperiment(experimentId, item);
} else if (tableName.includes('samples')) {
const { projectUuid } = item;
await samplesService.updateSamples(projectUuid, item);
}

await sendUpdateToSubscribed(experimentId, message, io);
};

module.exports = gem2sResponse;
6 changes: 4 additions & 2 deletions src/api/route-services/pipeline-response.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,16 @@ const AWS = require('../../utils/requireAWS');
const validateRequest = require('../../utils/schema-validator');
const logger = require('../../utils/logging');

const constants = require('../general-services/pipeline-manage/constants');

const ExperimentService = require('./experiment');
const PlotsTablesService = require('./plots-tables');
const PipelineHook = require('../../utils/hookRunner');

const plotsTableService = new PlotsTablesService();
const experimentService = new ExperimentService();

const pipelineStatus = require('../general-services/pipeline-status');
const getPipelineStatus = require('../general-services/pipeline-status');
const embeddingWorkRequest = require('../../utils/hooks/embeddingWorkRequest');
const clusteringWorkRequest = require('../../utils/hooks/clusteringWorkRequest');

Expand Down Expand Up @@ -85,7 +87,7 @@ const pipelineResponse = async (io, message) => {
]);
}

const statusRes = await pipelineStatus(experimentId);
const statusRes = await getPipelineStatus(experimentId, constants.QC_PROCESS_NAME);
pipelineHook.run(taskName, {
experimentId,
output,
Expand Down
7 changes: 6 additions & 1 deletion src/api/route-services/projects.js
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,10 @@ class ProjectsService {
throw new NotFoundError('No projects available!');
}

const projectIds = response.Items.map((entry) => convertToJsObject(entry).projectId);
const projectIds = response.Items.map(
(entry) => convertToJsObject(entry).projectId,
).filter((id) => id);

return this.getProjectsFromIds(new Set(projectIds));
}

Expand All @@ -110,11 +113,13 @@ class ProjectsService {
};

const data = await dynamodb.batchGetItem(params).promise();

const existingProjectIds = new Set(data.Responses[this.tableName].map((entry) => {
const newData = convertToJsObject(entry);
return newData.projects.uuid;
}));


// Build up projects that do not exist in Dynamo yet.
const projects = [...projectIds]
.filter((entry) => (
Expand Down
14 changes: 2 additions & 12 deletions src/api/routes/gem2s.js
Original file line number Diff line number Diff line change
@@ -1,23 +1,13 @@
const AWSXRay = require('aws-xray-sdk');
const { createGem2SPipeline } = require('../general-services/pipeline-manage');
const ExperimentService = require('../route-services/experiment');
const getBackendStatus = require('../general-services/backend-status');
const pipelineResponse = require('../route-services/pipeline-response');
const gem2sResponse = require('../route-services/gem2s-response');
const parseSNSMessage = require('../../utils/parse-sns-message');
const logger = require('../../utils/logging');

const { expressAuthorizationMiddleware } = require('../../utils/authMiddlewares');

module.exports = {
'gem2s#get': [
expressAuthorizationMiddleware,
(req, res, next) => {
getBackendStatus(req.params.experimentId)
.then((data) => res.json(data))
.catch(next);
},
],

'gem2s#create': [
expressAuthorizationMiddleware,
(req, res, next) => {
Expand Down Expand Up @@ -46,7 +36,7 @@ module.exports = {
const { io, parsedMessage } = result;

try {
await pipelineResponse(io, parsedMessage);
await gem2sResponse(io, parsedMessage);
} catch (e) {
logger.error(
'Pipeline response handler failed with error: ', e,
Expand Down
1 change: 1 addition & 0 deletions src/api/routes/pipelines.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ module.exports = {
'pipelines#get': [
expressAuthorizationMiddleware,
(req, res, next) => {
// The changes to add gem2s status will be obsoleted once agi's PR is merged in
getBackendStatus(req.params.experimentId)
.then((data) => res.json(data))
.catch(next);
Expand Down
43 changes: 43 additions & 0 deletions src/specs/api.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,49 @@ paths:
tags:
- work
parameters: []
/gem2sResults:
post:
operationId: receiveGem2sResponse
x-eov-operation-id: 'gem2s#response'
x-eov-operation-handler: routes/gem2s
requestBody:
description: 'The results from the execution of a pipeline step, sent via SNS.'
required: true
content:
text/plain:
schema:
type: string
examples: {}
application/json:
schema:
type: object
properties: {}
examples: {}
responses:
'200':
description: 'A JSON-parseable was received by the server, *irrespective of whether it was correct/acceptable or not*.'
content:
text/plain:
schema:
type: string
pattern: ok
'500':
description: The data sent by the server could not be parsed as JSON.
content:
text/plain:
schema:
type: string
pattern: nok
description: |-
Results from Step Function pipeline steps are relayed to the API through this endpoint.
Note that this endpoint is only exposed to AWS SNS, and since it has a specific communication protocol with limited feedback, the schema defined here is designed to be liberally enforceable. This endpoint is also used by SNS to handle subscribe/unsubscribe events.
The actual JSON passed by SNS is found in the `PipelineResponse` model, which is to be validated by the API.
summary: Retrieve results from pipeline step functions
tags:
- work
parameters: []
/pipelineResults:
post:
operationId: receivePipelineResponse
Expand Down
Loading

0 comments on commit bf1d71b

Please sign in to comment.