diff --git a/src/api/general-services/backend-status.js b/src/api/general-services/backend-status.js index c6933b2a9..4357691fe 100644 --- a/src/api/general-services/backend-status.js +++ b/src/api/general-services/backend-status.js @@ -1,78 +1,14 @@ -const k8s = require('@kubernetes/client-node'); -const crypto = require('crypto'); -const config = require('../../config'); +const pipelineStatus = require('./pipeline-status'); +const workerStatus = require('./worker-status'); -const kc = new k8s.KubeConfig(); -kc.loadFromDefault(); - const getBackendStatus = async (experimentId) => { - const response = { - worker: { - status: 'NotLaunched', - started: false, - ready: false, - restartCount: 0, - }, - }; - - // This will not work in development as we are not in a cluster. - // Always show the worker as 'up'. - if (config.clusterEnv === 'development') { - return { - ...response, - worker: { - ...response.worker, - status: 'Running', - started: true, - ready: true, - }, - }; - } - - const workerHash = crypto - .createHash('sha1') - .update(`${experimentId}-${config.sandboxId}`) - .digest('hex'); - - const coreApi = kc.makeApiClient(k8s.CoreV1Api); - - // Get worker status - const podList = await coreApi.listNamespacedPod( - `worker-${config.sandboxId}`, - undefined, undefined, undefined, undefined, - `job-name=worker-${workerHash}-job`, + const [{ pipeline }, { worker }] = await Promise.all( + [pipelineStatus(experimentId), workerStatus(experimentId)], ); - - const workerDetails = podList.body.items[0]; - - if (!workerDetails) { - return response; - } - - response.worker.status = workerDetails.status.phase; - - let containerStatus = {}; - - if (workerDetails.status.containerStatuses.length >= 0) { - containerStatus = workerDetails.status.containerStatuses.reduce((accumulator, current) => ({ - ...accumulator, - started: accumulator.started && current.started, - ready: accumulator.ready && current.ready, - restartCount: Math.max(accumulator.restartCount, current.restartCount), - }), { - started: true, - ready: true, - restartCount: -111, - }); - } - return { - ...response, - worker: { - ...response.worker, - ...containerStatus, - }, + pipeline, + worker, }; }; diff --git a/src/api/general-services/pipeline-manage/constructors/create-new-job-if-not-exist.js b/src/api/general-services/pipeline-manage/constructors/create-new-job-if-not-exist.js index 0e83514ea..a150f9f47 100644 --- a/src/api/general-services/pipeline-manage/constructors/create-new-job-if-not-exist.js +++ b/src/api/general-services/pipeline-manage/constructors/create-new-job-if-not-exist.js @@ -19,6 +19,13 @@ const createNewJobIfNotExist = (context, step) => { detached: true, }, }, + Catch: [ + { + ErrorEquals: ['States.ALL'], + ResultPath: '$.error-info', + Next: step.XNextOnCatch || step.Next, + }, + ], }; } @@ -70,7 +77,7 @@ const createNewJobIfNotExist = (context, step) => { { ErrorEquals: ['EKS.409'], ResultPath: '$.error-info', - Next: step.Next, + Next: step.XNextOnCatch || step.Next, }, ], }; diff --git a/src/api/general-services/pipeline-manage/constructors/create-new-step.js b/src/api/general-services/pipeline-manage/constructors/create-new-step.js index 2381e67d7..39f3b393e 100644 --- a/src/api/general-services/pipeline-manage/constructors/create-new-step.js +++ b/src/api/general-services/pipeline-manage/constructors/create-new-step.js @@ -28,6 +28,13 @@ const createNewStep = (context, step, args) => { detached: false, }, }, + Catch: [ + { + ErrorEquals: ['States.ALL'], + ResultPath: '$.error-info', + Next: 
step.XNextOnCatch || step.Next, + }, + ], }; } @@ -41,9 +48,8 @@ const createNewStep = (context, step, args) => { ClusterName: clusterInfo.name, CertificateAuthority: clusterInfo.certAuthority, Endpoint: clusterInfo.endpoint, - Method: 'POST', - Path: `/apis/batch/v1/namespaces/${config.workerNamespace}/jobs`, - RequestBody: { + Namespace: config.workerNamespace, + Job: { apiVersion: 'batch/v1', kind: 'Job', metadata: { @@ -82,7 +88,7 @@ const createNewStep = (context, step, args) => { { ErrorEquals: ['EKS.409'], ResultPath: '$.error-info', - Next: step.Next, + Next: step.XNextOnCatch || step.Next, }, ], }; diff --git a/src/api/general-services/pipeline-manage/index.js b/src/api/general-services/pipeline-manage/index.js index dcb12e274..4d46b48e1 100644 --- a/src/api/general-services/pipeline-manage/index.js +++ b/src/api/general-services/pipeline-manage/index.js @@ -100,7 +100,9 @@ const executeStateMachine = async (stateMachineArn) => { const { executionArn } = await stepFunctions.startExecution({ stateMachineArn, - input: '{}', + input: JSON.stringify({ + samples: ['single-branch-in-map-state'], + }), traceHeader: traceId, }).promise(); @@ -143,15 +145,18 @@ const createPipeline = async (experimentId, processingConfigUpdates) => { DeleteCompletedPipelineWorker: { XStepType: 'delete-completed-jobs', Next: 'LaunchNewPipelineWorker', + ResultPath: null, }, LaunchNewPipelineWorker: { XStepType: 'create-new-job-if-not-exist', Next: 'Filters', + ResultPath: null, }, Filters: { - Type: 'Parallel', + Type: 'Map', Next: 'DataIntegration', - Branches: [{ + ItemsPath: '$.samples', + Iterator: { StartAt: 'CellSizeDistributionFilter', States: { CellSizeDistributionFilter: { @@ -187,10 +192,15 @@ const createPipeline = async (experimentId, processingConfigUpdates) => { XConstructorArgs: { taskName: 'doubletScores', }, + XNextOnCatch: 'EndOfMap', + End: true, + }, + EndOfMap: { + Type: 'Pass', End: true, }, }, - }], + }, }, DataIntegration: { XStepType: 'create-new-step', @@ -204,6 +214,11 @@ const createPipeline = async (experimentId, processingConfigUpdates) => { XConstructorArgs: { taskName: 'configureEmbedding', }, + XNextOnCatch: 'EndOfPipeline', + End: true, + }, + EndOfPipeline: { + Type: 'Pass', End: true, }, }, @@ -216,6 +231,7 @@ const createPipeline = async (experimentId, processingConfigUpdates) => { ...constructPipelineStep(context, o), XStepType: undefined, XConstructorArgs: undefined, + XNextOnCatch: undefined, }; } return undefined; diff --git a/src/api/general-services/pipeline-status.js b/src/api/general-services/pipeline-status.js new file mode 100644 index 000000000..210d49241 --- /dev/null +++ b/src/api/general-services/pipeline-status.js @@ -0,0 +1,126 @@ +const AWS = require('../../utils/requireAWS'); +const ExperimentService = require('../route-services/experiment'); +const config = require('../../config'); +const logger = require('../../utils/logging'); + +const getStepsFromExecutionHistory = (history) => { + const { events } = history; + class Branch { + constructor(event, makeRoot) { + this.visited = [event.id]; + this.currentState = ''; + this.completedTasks = []; + this.branches = {}; + this.branchCount = 0; + this.updateCurrentState(event); + if (makeRoot) { + this.visited.push(event.previousEventId); + } + } + + allBranchesStarted() { + return this.branchCount === Object.values(this.branches).length; + } + + updateCurrentState(event) { + if ('stateEnteredEventDetails' in event) { + this.currentState = event.stateEnteredEventDetails.name; + } + } + + nextConsumer(event) { 
+ if (this.visited.includes(event.previousEventId)) { + return this; + } + if (event.type === 'MapStateExited') { + return this; + } + const branches = Object.values(this.branches); + for (let ii = 0; ii < branches.length; ii += 1) { + const candidate = branches[ii]; + const consumer = candidate.nextConsumer(event); + if (consumer) { + return consumer; + } + } + return null; + } + + consume(event) { + if (event.type === 'MapIterationStarted') { + this.branches[event.mapIterationStartedEventDetails.index] = new Branch(event); + } else { + this.visited.push(event.id); + this.updateCurrentState(event); + if (event.type === 'TaskSucceeded') { + this.completedTasks.push(this.currentState); + } else if (event.type === 'MapStateStarted') { + this.branchCount = event.mapStateStartedEventDetails.length; + } + } + } + } + const main = new Branch(events[0], true); + for (let ii = 1; ii < events.length; ii += 1) { + const consumer = main.nextConsumer(events[ii]); + consumer.consume(events[ii]); + } + let shortestCompleted = null; + if (main.allBranchesStarted()) { + const branches = Object.values(main.branches); + for (let ii = 0; ii < branches.length; ii += 1) { + if (!shortestCompleted || branches[ii].completedTasks.length < shortestCompleted.length) { + shortestCompleted = branches[ii].completedTasks; + } + } + } + return shortestCompleted || []; +}; + +/* + * Return the `completedSteps` of the state machine (SM) associated to the `experimentId`'s pipeline + * The code assumes that + * - the relevant states for the steps are defined within a Map of the SM + * - the relevant Map is the first Map in the SM + * - a step is only considered completed if it has been completed for all iteration of the Map + * - steps are returned in the completion order, and are unique in the returned array + */ +const getPipelineStatus = async (experimentId) => { + const { executionArn } = await (new ExperimentService()).getPipelineHandle(experimentId); + let execution = {}; + let completedSteps = []; + if (!executionArn.length) { + execution = { + startDate: null, + stopDate: null, + status: 'NotCreated', + }; + } else { + const stepFunctions = new AWS.StepFunctions({ + region: config.awsRegion, + }); + execution = await stepFunctions.describeExecution({ + executionArn, + }).promise(); + const history = await stepFunctions.getExecutionHistory({ + executionArn, + includeExecutionData: false, + }).promise(); + + completedSteps = getStepsFromExecutionHistory(history); + logger.log(`ExecutionHistory for ARN ${executionArn}: ${history.events.length} events, ${completedSteps.length} completed steps`); + } + + const response = { + pipeline: { + startDate: execution.startDate, + stopDate: execution.stopDate, + status: execution.status, + completedSteps, + }, + }; + return response; +}; + +module.exports = getPipelineStatus; +module.exports.getStepsFromExecutionHistory = getStepsFromExecutionHistory; diff --git a/src/api/general-services/worker-status.js b/src/api/general-services/worker-status.js new file mode 100644 index 000000000..144311aef --- /dev/null +++ b/src/api/general-services/worker-status.js @@ -0,0 +1,79 @@ +const k8s = require('@kubernetes/client-node'); +const crypto = require('crypto'); +const config = require('../../config'); + + +const kc = new k8s.KubeConfig(); +kc.loadFromDefault(); + +const getWorkerStatus = async (experimentId) => { + const response = { + worker: { + status: 'NotLaunched', + started: false, + ready: false, + restartCount: 0, + }, + }; + + // This will not work in development as we are not 
in a cluster. + // Always show the worker as 'up'. + if (config.clusterEnv === 'development') { + return { + ...response, + worker: { + ...response.worker, + status: 'Running', + started: true, + ready: true, + }, + }; + } + + const workerHash = crypto + .createHash('sha1') + .update(`${experimentId}-${config.sandboxId}`) + .digest('hex'); + + const coreApi = kc.makeApiClient(k8s.CoreV1Api); + + // Get worker status + const podList = await coreApi.listNamespacedPod( + `worker-${config.sandboxId}`, + undefined, undefined, undefined, undefined, + `job-name=worker-${workerHash}-job`, + ); + + const workerDetails = podList.body.items[0]; + + if (!workerDetails) { + return response; + } + + response.worker.status = workerDetails.status.phase; + + let containerStatus = {}; + + if (workerDetails.status.containerStatuses.length >= 0) { + containerStatus = workerDetails.status.containerStatuses.reduce((accumulator, current) => ({ + ...accumulator, + started: accumulator.started && current.started, + ready: accumulator.ready && current.ready, + restartCount: Math.max(accumulator.restartCount, current.restartCount), + }), { + started: true, + ready: true, + restartCount: -111, + }); + } + + return { + ...response, + worker: { + ...response.worker, + ...containerStatus, + }, + }; +}; + +module.exports = getWorkerStatus; diff --git a/src/api/route-services/experiment.js b/src/api/route-services/experiment.js index 3301517d5..fb644c08a 100644 --- a/src/api/route-services/experiment.js +++ b/src/api/route-services/experiment.js @@ -5,6 +5,26 @@ const { } = require('../../utils/dynamoDb'); const NotFoundError = require('../../utils/NotFoundError'); +const getExperimentAttributes = async (tableName, experimentId, attributes) => { + const dynamodb = createDynamoDbInstance(); + const key = convertToDynamoDbRecord({ experimentId }); + + const params = { + TableName: tableName, + Key: key, + ProjectionExpression: attributes.join(), + }; + + const data = await dynamodb.getItem(params).promise(); + if (Object.keys(data).length === 0) { + throw new NotFoundError('Experiment does not exist.'); + } + + const prettyData = convertToJsObject(data.Item); + return prettyData; +}; + + class ExperimentService { constructor() { this.tableName = `experiments-${config.clusterEnv}`; @@ -14,46 +34,27 @@ class ExperimentService { } async getExperimentData(experimentId) { - const dynamodb = createDynamoDbInstance(); - let key = { experimentId }; - key = convertToDynamoDbRecord(key); - - const params = { - TableName: this.tableName, - Key: key, - ProjectionExpression: 'experimentId, experimentName', - }; - - const data = await dynamodb.getItem(params).promise(); - - if (Object.keys(data).length === 0) { - throw new NotFoundError('Experiment does not exist.'); - } - - const prettyData = convertToJsObject(data.Item); - return prettyData; + const data = await getExperimentAttributes(this.tableName, experimentId, ['experimentId', 'experimentName']); + return data; } async getCellSets(experimentId) { - const dynamodb = createDynamoDbInstance(); - let key = { experimentId }; - key = convertToDynamoDbRecord(key); - - const params = { - TableName: this.tableName, - Key: key, - ProjectionExpression: 'cellSets', - }; - - const data = await dynamodb.getItem(params).promise(); - - if (Object.keys(data).length === 0) { - throw new NotFoundError('Experiment does not exist.'); - } + const data = await getExperimentAttributes(this.tableName, experimentId, ['cellSets']); + return data; + } - const prettyData = convertToJsObject(data.Item); + 
async getProcessingConfig(experimentId) { + const data = await getExperimentAttributes(this.tableName, experimentId, ['processingConfig']); + return data; + } - return prettyData; + async getPipelineHandle(experimentId) { + const data = await getExperimentAttributes(this.tableName, experimentId, ['meta']); + return { + stateMachineArn: '', + executionArn: '', + ...data.meta.pipeline, + }; } async updateCellSets(experimentId, cellSetData) { @@ -76,28 +77,6 @@ class ExperimentService { return cellSetData; } - async getProcessingConfig(experimentId) { - const dynamodb = createDynamoDbInstance(); - let key = { experimentId }; - key = convertToDynamoDbRecord(key); - - const params = { - TableName: this.tableName, - Key: key, - ProjectionExpression: 'processingConfig', - }; - - const data = await dynamodb.getItem(params).promise(); - - if (Object.keys(data).length === 0) { - throw new NotFoundError('Experiment does not exist.'); - } - - const prettyData = convertToJsObject(data.Item); - - return prettyData; - } - async updateProcessingConfig(experimentId, processingConfig) { const dynamodb = createDynamoDbInstance(); @@ -132,7 +111,27 @@ class ExperimentService { const result = await dynamodb.updateItem(params).promise(); const prettyData = convertToJsObject(result.Attributes); + return prettyData; + } + + async savePipelineHandle(experimentId, handle) { + const dynamodb = createDynamoDbInstance(); + let key = { experimentId }; + + key = convertToDynamoDbRecord(key); + + const data = convertToDynamoDbRecord({ ':x': handle }); + + const params = { + TableName: this.tableName, + Key: key, + UpdateExpression: 'set meta.pipeline = :x', + ExpressionAttributeValues: data, + }; + + const result = await dynamodb.updateItem(params).promise(); + const prettyData = convertToJsObject(result.Attributes); return prettyData; } } diff --git a/src/api/routes/pipelines.js b/src/api/routes/pipelines.js index 0e10d5cdd..12ccdf7c0 100644 --- a/src/api/routes/pipelines.js +++ b/src/api/routes/pipelines.js @@ -1,4 +1,5 @@ const createPipeline = require('../general-services/pipeline-manage'); +const ExperimentService = require('../route-services/experiment'); const getBackendStatus = require('../general-services/backend-status'); const pipelineResponse = require('../route-services/pipeline-response'); const parseSNSMessage = require('../../utils/parse-sns-message'); @@ -14,8 +15,12 @@ module.exports = { 'pipelines#create': (req, res, next) => { const { processingConfig } = req.body; - createPipeline(req.params.experimentId, processingConfig) - .then((data) => res.json(data)) + createPipeline(req.params.experimentId, processingConfig || []) + .then((data) => { + const experimentService = new ExperimentService(); + experimentService.savePipelineHandle(req.params.experimentId, data) + .then(() => res.json(data)); + }) .catch(next); }, diff --git a/tests/api/general-services/__snapshots__/pipeline-manage.test.js.snap b/tests/api/general-services/__snapshots__/pipeline-manage.test.js.snap index 52cc05751..face09ee4 100644 --- a/tests/api/general-services/__snapshots__/pipeline-manage.test.js.snap +++ b/tests/api/general-services/__snapshots__/pipeline-manage.test.js.snap @@ -55,7 +55,7 @@ Array [ Object { "type": "return", "value": Object { - "input": "{}", + "input": "{\\"samples\\":[\\"single-branch-in-map-state\\"]}", "stateMachineArn": "test-machine", "traceHeader": undefined, }, @@ -77,6 +77,7 @@ Array [ "ErrorEquals": Array [ "EKS.409", ], + "Next": "EndOfPipeline", "ResultPath": "$.error-info", }, ], @@ -86,9 +87,7 
@@ Array [ "CertificateAuthority": "AAAAAAAAAAA", "ClusterName": "biomage-test", "Endpoint": "https://test-endpoint.me/fgh", - "Method": "POST", - "Path": "/apis/batch/v1/namespaces/undefined/jobs", - "RequestBody": Object { + "Job": Object { "apiVersion": "batch/v1", "kind": "Job", "metadata": Object { @@ -144,9 +143,7 @@ Array [ "CertificateAuthority": "AAAAAAAAAAA", "ClusterName": "biomage-test", "Endpoint": "https://test-endpoint.me/fgh", - "Method": "POST", - "Path": "/apis/batch/v1/namespaces/undefined/jobs", - "RequestBody": Object { + "Job": Object { "apiVersion": "batch/v1", "kind": "Job", "metadata": Object { @@ -202,307 +199,306 @@ Array [ }, }, "Resource": "arn:aws:states:::eks:call", + "ResultPath": null, "Type": "Task", }, + "EndOfPipeline": Object { + "End": true, + "Type": "Pass", + }, "Filters": Object { - "Branches": Array [ - Object { - "StartAt": "CellSizeDistributionFilter", - "States": Object { - "CellSizeDistributionFilter": Object { - "Catch": Array [ - Object { - "ErrorEquals": Array [ - "EKS.409", - ], - "Next": "MitochondrialContentFilter", - "ResultPath": "$.error-info", + "ItemsPath": "$.samples", + "Iterator": Object { + "StartAt": "CellSizeDistributionFilter", + "States": Object { + "CellSizeDistributionFilter": Object { + "Catch": Array [ + Object { + "ErrorEquals": Array [ + "EKS.409", + ], + "Next": "MitochondrialContentFilter", + "ResultPath": "$.error-info", + }, + ], + "Comment": "Attempts to create a Kubernetes Job for the pipeline server. Will swallow a 409 (already exists) error.", + "Next": "MitochondrialContentFilter", + "Parameters": Object { + "CertificateAuthority": "AAAAAAAAAAA", + "ClusterName": "biomage-test", + "Endpoint": "https://test-endpoint.me/fgh", + "Job": Object { + "apiVersion": "batch/v1", + "kind": "Job", + "metadata": Object { + "name": "remoter-client-testExperimentId", }, - ], - "Comment": "Attempts to create a Kubernetes Job for the pipeline server. 
Will swallow a 409 (already exists) error.", - "Next": "MitochondrialContentFilter", - "Parameters": Object { - "CertificateAuthority": "AAAAAAAAAAA", - "ClusterName": "biomage-test", - "Endpoint": "https://test-endpoint.me/fgh", - "Method": "POST", - "Path": "/apis/batch/v1/namespaces/undefined/jobs", - "RequestBody": Object { - "apiVersion": "batch/v1", - "kind": "Job", - "metadata": Object { - "name": "remoter-client-testExperimentId", - }, - "spec": Object { - "template": Object { - "metadata": Object { - "name": "remoter-client-testExperimentId", - }, - "spec": Object { - "containers": Array [ - Object { - "args": Array [ - "{\\"experimentId\\":\\"testExperimentId\\",\\"taskName\\":\\"cellSizeDistribution\\",\\"config\\":{}}", - ], - "image": "MOCK_IMAGE_PATH", - "name": "remoter-client", - }, - ], - "restartPolicy": "Never", - }, + "spec": Object { + "template": Object { + "metadata": Object { + "name": "remoter-client-testExperimentId", + }, + "spec": Object { + "containers": Array [ + Object { + "args": Array [ + "{\\"experimentId\\":\\"testExperimentId\\",\\"taskName\\":\\"cellSizeDistribution\\",\\"config\\":{}}", + ], + "image": "MOCK_IMAGE_PATH", + "name": "remoter-client", + }, + ], + "restartPolicy": "Never", }, }, }, }, - "Resource": "arn:aws:states:::eks:runJob.sync", - "Retry": Array [ - Object { - "BackoffRate": 2, - "ErrorEquals": Array [ - "EKS.409", - ], - "IntervalSeconds": 1, - "MaxAttempts": 2, - }, - ], - "Type": "Task", }, - "ClassifierFilter": Object { - "Catch": Array [ - Object { - "ErrorEquals": Array [ - "EKS.409", - ], - "Next": "NumGenesVsNumUmisFilter", - "ResultPath": "$.error-info", + "Resource": "arn:aws:states:::eks:runJob.sync", + "Retry": Array [ + Object { + "BackoffRate": 2, + "ErrorEquals": Array [ + "EKS.409", + ], + "IntervalSeconds": 1, + "MaxAttempts": 2, + }, + ], + "Type": "Task", + }, + "ClassifierFilter": Object { + "Catch": Array [ + Object { + "ErrorEquals": Array [ + "EKS.409", + ], + "Next": "NumGenesVsNumUmisFilter", + "ResultPath": "$.error-info", + }, + ], + "Comment": "Attempts to create a Kubernetes Job for the pipeline server. Will swallow a 409 (already exists) error.", + "Next": "NumGenesVsNumUmisFilter", + "Parameters": Object { + "CertificateAuthority": "AAAAAAAAAAA", + "ClusterName": "biomage-test", + "Endpoint": "https://test-endpoint.me/fgh", + "Job": Object { + "apiVersion": "batch/v1", + "kind": "Job", + "metadata": Object { + "name": "remoter-client-testExperimentId", }, - ], - "Comment": "Attempts to create a Kubernetes Job for the pipeline server. 
Will swallow a 409 (already exists) error.", - "Next": "NumGenesVsNumUmisFilter", - "Parameters": Object { - "CertificateAuthority": "AAAAAAAAAAA", - "ClusterName": "biomage-test", - "Endpoint": "https://test-endpoint.me/fgh", - "Method": "POST", - "Path": "/apis/batch/v1/namespaces/undefined/jobs", - "RequestBody": Object { - "apiVersion": "batch/v1", - "kind": "Job", - "metadata": Object { - "name": "remoter-client-testExperimentId", - }, - "spec": Object { - "template": Object { - "metadata": Object { - "name": "remoter-client-testExperimentId", - }, - "spec": Object { - "containers": Array [ - Object { - "args": Array [ - "{\\"experimentId\\":\\"testExperimentId\\",\\"taskName\\":\\"classifier\\",\\"config\\":{}}", - ], - "image": "MOCK_IMAGE_PATH", - "name": "remoter-client", - }, - ], - "restartPolicy": "Never", - }, + "spec": Object { + "template": Object { + "metadata": Object { + "name": "remoter-client-testExperimentId", + }, + "spec": Object { + "containers": Array [ + Object { + "args": Array [ + "{\\"experimentId\\":\\"testExperimentId\\",\\"taskName\\":\\"classifier\\",\\"config\\":{}}", + ], + "image": "MOCK_IMAGE_PATH", + "name": "remoter-client", + }, + ], + "restartPolicy": "Never", }, }, }, }, - "Resource": "arn:aws:states:::eks:runJob.sync", - "Retry": Array [ - Object { - "BackoffRate": 2, - "ErrorEquals": Array [ - "EKS.409", - ], - "IntervalSeconds": 1, - "MaxAttempts": 2, - }, - ], - "Type": "Task", }, - "DoubletScoresFilter": Object { - "Catch": Array [ - Object { - "ErrorEquals": Array [ - "EKS.409", - ], - "ResultPath": "$.error-info", + "Resource": "arn:aws:states:::eks:runJob.sync", + "Retry": Array [ + Object { + "BackoffRate": 2, + "ErrorEquals": Array [ + "EKS.409", + ], + "IntervalSeconds": 1, + "MaxAttempts": 2, + }, + ], + "Type": "Task", + }, + "DoubletScoresFilter": Object { + "Catch": Array [ + Object { + "ErrorEquals": Array [ + "EKS.409", + ], + "Next": "EndOfMap", + "ResultPath": "$.error-info", + }, + ], + "Comment": "Attempts to create a Kubernetes Job for the pipeline server. Will swallow a 409 (already exists) error.", + "End": true, + "Parameters": Object { + "CertificateAuthority": "AAAAAAAAAAA", + "ClusterName": "biomage-test", + "Endpoint": "https://test-endpoint.me/fgh", + "Job": Object { + "apiVersion": "batch/v1", + "kind": "Job", + "metadata": Object { + "name": "remoter-client-testExperimentId", }, - ], - "Comment": "Attempts to create a Kubernetes Job for the pipeline server. 
Will swallow a 409 (already exists) error.", - "End": true, - "Parameters": Object { - "CertificateAuthority": "AAAAAAAAAAA", - "ClusterName": "biomage-test", - "Endpoint": "https://test-endpoint.me/fgh", - "Method": "POST", - "Path": "/apis/batch/v1/namespaces/undefined/jobs", - "RequestBody": Object { - "apiVersion": "batch/v1", - "kind": "Job", - "metadata": Object { - "name": "remoter-client-testExperimentId", - }, - "spec": Object { - "template": Object { - "metadata": Object { - "name": "remoter-client-testExperimentId", - }, - "spec": Object { - "containers": Array [ - Object { - "args": Array [ - "{\\"experimentId\\":\\"testExperimentId\\",\\"taskName\\":\\"doubletScores\\",\\"config\\":{\\"filterSettings\\":{\\"oneSetting\\":1},\\"oneSample\\":{\\"filterSettings\\":{\\"oneSetting\\":7}},\\"otherSample\\":{\\"filterSettings\\":{\\"oneSetting\\":15}}}}", - ], - "image": "MOCK_IMAGE_PATH", - "name": "remoter-client", - }, - ], - "restartPolicy": "Never", - }, + "spec": Object { + "template": Object { + "metadata": Object { + "name": "remoter-client-testExperimentId", + }, + "spec": Object { + "containers": Array [ + Object { + "args": Array [ + "{\\"experimentId\\":\\"testExperimentId\\",\\"taskName\\":\\"doubletScores\\",\\"config\\":{\\"filterSettings\\":{\\"oneSetting\\":1},\\"oneSample\\":{\\"filterSettings\\":{\\"oneSetting\\":7}},\\"otherSample\\":{\\"filterSettings\\":{\\"oneSetting\\":15}}}}", + ], + "image": "MOCK_IMAGE_PATH", + "name": "remoter-client", + }, + ], + "restartPolicy": "Never", }, }, }, }, - "Resource": "arn:aws:states:::eks:runJob.sync", - "Retry": Array [ - Object { - "BackoffRate": 2, - "ErrorEquals": Array [ - "EKS.409", - ], - "IntervalSeconds": 1, - "MaxAttempts": 2, - }, - ], - "Type": "Task", }, - "MitochondrialContentFilter": Object { - "Catch": Array [ - Object { - "ErrorEquals": Array [ - "EKS.409", - ], - "Next": "ClassifierFilter", - "ResultPath": "$.error-info", + "Resource": "arn:aws:states:::eks:runJob.sync", + "Retry": Array [ + Object { + "BackoffRate": 2, + "ErrorEquals": Array [ + "EKS.409", + ], + "IntervalSeconds": 1, + "MaxAttempts": 2, + }, + ], + "Type": "Task", + }, + "EndOfMap": Object { + "End": true, + "Type": "Pass", + }, + "MitochondrialContentFilter": Object { + "Catch": Array [ + Object { + "ErrorEquals": Array [ + "EKS.409", + ], + "Next": "ClassifierFilter", + "ResultPath": "$.error-info", + }, + ], + "Comment": "Attempts to create a Kubernetes Job for the pipeline server. Will swallow a 409 (already exists) error.", + "Next": "ClassifierFilter", + "Parameters": Object { + "CertificateAuthority": "AAAAAAAAAAA", + "ClusterName": "biomage-test", + "Endpoint": "https://test-endpoint.me/fgh", + "Job": Object { + "apiVersion": "batch/v1", + "kind": "Job", + "metadata": Object { + "name": "remoter-client-testExperimentId", }, - ], - "Comment": "Attempts to create a Kubernetes Job for the pipeline server. 
Will swallow a 409 (already exists) error.", - "Next": "ClassifierFilter", - "Parameters": Object { - "CertificateAuthority": "AAAAAAAAAAA", - "ClusterName": "biomage-test", - "Endpoint": "https://test-endpoint.me/fgh", - "Method": "POST", - "Path": "/apis/batch/v1/namespaces/undefined/jobs", - "RequestBody": Object { - "apiVersion": "batch/v1", - "kind": "Job", - "metadata": Object { - "name": "remoter-client-testExperimentId", - }, - "spec": Object { - "template": Object { - "metadata": Object { - "name": "remoter-client-testExperimentId", - }, - "spec": Object { - "containers": Array [ - Object { - "args": Array [ - "{\\"experimentId\\":\\"testExperimentId\\",\\"taskName\\":\\"mitochondrialContent\\",\\"config\\":{}}", - ], - "image": "MOCK_IMAGE_PATH", - "name": "remoter-client", - }, - ], - "restartPolicy": "Never", - }, + "spec": Object { + "template": Object { + "metadata": Object { + "name": "remoter-client-testExperimentId", + }, + "spec": Object { + "containers": Array [ + Object { + "args": Array [ + "{\\"experimentId\\":\\"testExperimentId\\",\\"taskName\\":\\"mitochondrialContent\\",\\"config\\":{}}", + ], + "image": "MOCK_IMAGE_PATH", + "name": "remoter-client", + }, + ], + "restartPolicy": "Never", }, }, }, }, - "Resource": "arn:aws:states:::eks:runJob.sync", - "Retry": Array [ - Object { - "BackoffRate": 2, - "ErrorEquals": Array [ - "EKS.409", - ], - "IntervalSeconds": 1, - "MaxAttempts": 2, - }, - ], - "Type": "Task", }, - "NumGenesVsNumUmisFilter": Object { - "Catch": Array [ - Object { - "ErrorEquals": Array [ - "EKS.409", - ], - "Next": "DoubletScoresFilter", - "ResultPath": "$.error-info", + "Resource": "arn:aws:states:::eks:runJob.sync", + "Retry": Array [ + Object { + "BackoffRate": 2, + "ErrorEquals": Array [ + "EKS.409", + ], + "IntervalSeconds": 1, + "MaxAttempts": 2, + }, + ], + "Type": "Task", + }, + "NumGenesVsNumUmisFilter": Object { + "Catch": Array [ + Object { + "ErrorEquals": Array [ + "EKS.409", + ], + "Next": "DoubletScoresFilter", + "ResultPath": "$.error-info", + }, + ], + "Comment": "Attempts to create a Kubernetes Job for the pipeline server. Will swallow a 409 (already exists) error.", + "Next": "DoubletScoresFilter", + "Parameters": Object { + "CertificateAuthority": "AAAAAAAAAAA", + "ClusterName": "biomage-test", + "Endpoint": "https://test-endpoint.me/fgh", + "Job": Object { + "apiVersion": "batch/v1", + "kind": "Job", + "metadata": Object { + "name": "remoter-client-testExperimentId", }, - ], - "Comment": "Attempts to create a Kubernetes Job for the pipeline server. 
Will swallow a 409 (already exists) error.", - "Next": "DoubletScoresFilter", - "Parameters": Object { - "CertificateAuthority": "AAAAAAAAAAA", - "ClusterName": "biomage-test", - "Endpoint": "https://test-endpoint.me/fgh", - "Method": "POST", - "Path": "/apis/batch/v1/namespaces/undefined/jobs", - "RequestBody": Object { - "apiVersion": "batch/v1", - "kind": "Job", - "metadata": Object { - "name": "remoter-client-testExperimentId", - }, - "spec": Object { - "template": Object { - "metadata": Object { - "name": "remoter-client-testExperimentId", - }, - "spec": Object { - "containers": Array [ - Object { - "args": Array [ - "{\\"experimentId\\":\\"testExperimentId\\",\\"taskName\\":\\"numGenesVsNumUmis\\",\\"config\\":{}}", - ], - "image": "MOCK_IMAGE_PATH", - "name": "remoter-client", - }, - ], - "restartPolicy": "Never", - }, + "spec": Object { + "template": Object { + "metadata": Object { + "name": "remoter-client-testExperimentId", + }, + "spec": Object { + "containers": Array [ + Object { + "args": Array [ + "{\\"experimentId\\":\\"testExperimentId\\",\\"taskName\\":\\"numGenesVsNumUmis\\",\\"config\\":{}}", + ], + "image": "MOCK_IMAGE_PATH", + "name": "remoter-client", + }, + ], + "restartPolicy": "Never", }, }, }, }, - "Resource": "arn:aws:states:::eks:runJob.sync", - "Retry": Array [ - Object { - "BackoffRate": 2, - "ErrorEquals": Array [ - "EKS.409", - ], - "IntervalSeconds": 1, - "MaxAttempts": 2, - }, - ], - "Type": "Task", }, + "Resource": "arn:aws:states:::eks:runJob.sync", + "Retry": Array [ + Object { + "BackoffRate": 2, + "ErrorEquals": Array [ + "EKS.409", + ], + "IntervalSeconds": 1, + "MaxAttempts": 2, + }, + ], + "Type": "Task", }, }, - ], + }, "Next": "DataIntegration", - "Type": "Parallel", + "Type": "Map", }, "LaunchNewPipelineWorker": Object { "Catch": Array [ @@ -547,6 +543,7 @@ Array [ }, }, "Resource": "arn:aws:states:::eks:call", + "ResultPath": null, "Retry": Array [ Object { "BackoffRate": 2, @@ -632,7 +629,7 @@ Array [ Object { "type": "return", "value": Object { - "input": "{}", + "input": "{\\"samples\\":[\\"single-branch-in-map-state\\"]}", "stateMachineArn": "arn:aws:states:eu-west-1:test-account-id:stateMachine:biomage-pipeline-test-af3b5c3acaa6348165d8e5e73c70d93b732bf6db", "traceHeader": undefined, }, diff --git a/tests/api/general-services/pipeline-status.test.js b/tests/api/general-services/pipeline-status.test.js new file mode 100644 index 000000000..daee07180 --- /dev/null +++ b/tests/api/general-services/pipeline-status.test.js @@ -0,0 +1,279 @@ +const pipelineStatus = require('../../../src/api/general-services/pipeline-status'); +const ExperimentService = require('../../../src/api/route-services/experiment'); + +describe('getStepsFromExecutionHistory', () => { + const fullHistory = [ + { + type: 'ExecutionStarted', + id: 1, + previousEventId: 0, + }, + { + type: 'Dummy', + id: 'dummy-state-having-zero-as-previous', + previousEventId: 0, + }, + { + type: 'MapStateEntered', + id: 12, + previousEventId: 'dummy-state-having-zero-as-previous', + stateEnteredEventDetails: { + name: 'Filters', + }, + }, + { + type: 'MapStateStarted', + id: 13, + previousEventId: 12, + mapStateStartedEventDetails: { + length: 2, + }, + }, + { + type: 'MapIterationStarted', + id: 14, + previousEventId: 13, + mapIterationStartedEventDetails: { + name: 'Filters', + index: 0, + }, + }, + { + // Iteration 0 + type: 'TaskStateEntered', + id: 15, + previousEventId: 14, + stateEnteredEventDetails: { + name: 'CellSizeDistributionFilter', + }, + }, + { + // Iteration 0 + type: 
'TaskSucceeded', + id: 16, + previousEventId: 15, + }, + { + // Iteration 0 + type: 'TaskStateExited', + id: 17, + previousEventId: 16, + stateExitedEventDetails: { + name: 'CellSizeDistributionFilter', + }, + }, + { + type: 'MapIterationStarted', + id: 18, + previousEventId: 13, + mapIterationStartedEventDetails: { + name: 'Filters', + index: 1, + }, + }, + { + // Iteration 0 + type: 'TaskStateEntered', + id: '19-before-anything-completed', + previousEventId: 17, + stateEnteredEventDetails: { + name: 'MitochondrialContentFilter', + }, + }, + { + // Iteration 0 + type: 'TaskSucceeded', + id: '20-one-comlpetion-vs-unstarted', + previousEventId: '19-before-anything-completed', + }, + { + // Iteration 1 + type: 'TaskStateEntered', + id: 21, + previousEventId: 18, + stateEnteredEventDetails: { + name: 'CellSizeDistributionFilter', + }, + }, + { + // Iteration 0 + type: 'TaskStateExited', + id: 22, + previousEventId: '20-one-comlpetion-vs-unstarted', + stateExitedEventDetails: { + name: 'MitochondrialContentFilter', + }, + }, + { + // Iteration 1 + type: 'TaskSucceeded', + id: 23, + previousEventId: 21, + }, + { + // Iteration 1 + type: 'TaskStateExited', + id: '24-two-completions-vs-zero', + previousEventId: 23, + stateExitedEventDetails: { + name: 'CellSizeDistributionFilter', + }, + }, + ]; + + const singleIterationHistory = [ + { + type: 'ExecutionStarted', + id: 1, + previousEventId: 0, + }, + { + type: 'Dummy', + id: 'dummy-state-having-zero-as-previous', + previousEventId: 0, + }, + { + type: 'MapStateEntered', + id: 12, + previousEventId: 'dummy-state-having-zero-as-previous', + stateEnteredEventDetails: { + name: 'Filters', + }, + }, + { + type: 'MapStateStarted', + id: 13, + previousEventId: 12, + mapStateStartedEventDetails: { + length: 1, + }, + }, + { + type: 'MapIterationStarted', + id: 14, + previousEventId: 13, + mapIterationStartedEventDetails: { + name: 'Filters', + index: 0, + }, + }, + { + type: 'TaskStateEntered', + id: 15, + previousEventId: 14, + stateEnteredEventDetails: { + name: 'CellSizeDistributionFilter', + }, + }, + { + type: 'TaskSucceeded', + id: 16, + previousEventId: 15, + }, + { + type: 'TaskStateExited', + id: 17, + previousEventId: 16, + stateExitedEventDetails: { + name: 'CellSizeDistributionFilter', + }, + }, + { + type: 'MapIterationSucceeded', + id: 18, + previousEventId: 17, + mapIterationSucceededEventDetails: { + name: 'Filters', + index: 0, + }, + }, + { + type: 'MapStateSucceeded', + id: 19, + previousEventId: 18, + }, + { + type: 'MapStateExited', + id: 20, + previousEventId: 19, + stateExitedEventDetails: { + name: 'Filters', + }, + }, + { + type: 'TaskStateEntered', + id: 21, + previousEventId: 20, + stateEnteredEventDetails: { + name: 'DataIntegration', + }, + }, + { + type: 'TaskSucceeded', + id: 22, + previousEventId: 21, + }, + { + type: 'TaskStateExited', + id: 23, + previousEventId: 22, + stateExitedEventDetails: { + name: 'DataIntegration', + }, + }, + ]; + + + const truncateHistory = (lastEventId) => { + const lastEventIndex = fullHistory.findIndex((element) => element.id === lastEventId); + return fullHistory.slice(0, lastEventIndex + 1); + }; + + it('returns empty array if nothing has been completed', () => { + const events = truncateHistory('19-before-anything-completed'); + const completedSteps = pipelineStatus.getStepsFromExecutionHistory({ events }); + expect(completedSteps).toEqual([]); + }); + + it('returns empty array if any branch has not started', () => { + const events = truncateHistory('20-one-comlpetion-vs-unstarted'); + 
const completedSteps = pipelineStatus.getStepsFromExecutionHistory({ events }); + expect(completedSteps).toEqual([]); + }); + + it('returns steps completed in all branches', () => { + const events = truncateHistory('24-two-completions-vs-zero'); + const completedSteps = pipelineStatus.getStepsFromExecutionHistory({ events }); + expect(completedSteps).toEqual(['CellSizeDistributionFilter']); + }); + + it('returns only the steps contained in the Map for one-element iterations', () => { + const history = { events: singleIterationHistory }; + const completedSteps = pipelineStatus.getStepsFromExecutionHistory(history); + expect(completedSteps).toEqual(['CellSizeDistributionFilter']); + }); +}); + +jest.mock('../../../src/api/route-services/experiment', () => jest.fn().mockImplementation(() => ({ + getPipelineHandle: () => ({ + stateMachineArn: '', + executionArn: '', + }), +}))); + +describe('pipelineStatus', () => { + beforeEach(() => { + ExperimentService.mockClear(); + }); + it('handles properly an empty dynamodb record', async () => { + const status = await pipelineStatus('1234'); + expect(status).toEqual({ + pipeline: { + startDate: null, + stopDate: null, + status: 'NotCreated', + completedSteps: [], + }, + }); + }); +}); diff --git a/tests/api/general-services/backend-status.test.js b/tests/api/general-services/worker-status.test.js similarity index 88% rename from tests/api/general-services/backend-status.test.js rename to tests/api/general-services/worker-status.test.js index 620ed6991..8eba67644 100644 --- a/tests/api/general-services/backend-status.test.js +++ b/tests/api/general-services/worker-status.test.js @@ -1,9 +1,9 @@ const k8s = require('@kubernetes/client-node'); -const getBackendStatus = require('../../../src/api/general-services/backend-status'); +const getWorkerStatus = require('../../../src/api/general-services/worker-status'); -describe('Get backend status path', () => { +describe('Get worker status', () => { beforeEach(() => jest.resetModules()); it('Returns up when the worker is up.', async () => { @@ -31,7 +31,7 @@ describe('Get backend status path', () => { })); - const result = await getBackendStatus('sample-experiment'); + const result = await getWorkerStatus('sample-experiment'); expect(result).toEqual({ worker: { @@ -65,7 +65,7 @@ describe('Get backend status path', () => { })); - const result = await getBackendStatus('sample-experiment'); + const result = await getWorkerStatus('sample-experiment'); expect(result).toEqual({ worker: { @@ -99,7 +99,7 @@ describe('Get backend status path', () => { })); - const result = await getBackendStatus('sample-experiment'); + const result = await getWorkerStatus('sample-experiment'); expect(result).toEqual({ worker: { diff --git a/tests/api/route-services/experiment.test.js b/tests/api/route-services/experiment.test.js index 5988da300..37a385bd8 100644 --- a/tests/api/route-services/experiment.test.js +++ b/tests/api/route-services/experiment.test.js @@ -8,56 +8,51 @@ describe('tests for the experiment service', () => { AWSMock.restore('DynamoDB'); }); - it('Get experiment data works', async (done) => { - const unmarshalledData = { - Item: { - experimentId: { S: '12345' }, - experimentName: { S: 'TGFB1 experiment' }, - }, - }; - - const marshalledData = { - experimentId: '12345', - experimentName: 'TGFB1 experiment', + const mockDynamoGetItem = (jsData) => { + const dynamodbData = { + Item: AWS.DynamoDB.Converter.marshall(jsData), }; - - const e = new ExperimentService(); - const getItemSpy = jest.fn((x) => x); 
AWSMock.setSDKInstance(AWS); AWSMock.mock('DynamoDB', 'getItem', (params, callback) => { getItemSpy(params); - callback(null, unmarshalledData); + callback(null, dynamodbData); + }); + return getItemSpy; + }; + + const mockDynamoUpdateItem = () => { + const updateItemSpy = jest.fn((x) => x); + AWSMock.setSDKInstance(AWS); + AWSMock.mock('DynamoDB', 'updateItem', (params, callback) => { + updateItemSpy(params); + callback(null, {}); // We do not care about the return value here, it is not used. }); + return updateItemSpy; + }; - e.getExperimentData('12345') + it('Get experiment data works', async (done) => { + const jsData = { + experimentId: '12345', + experimentName: 'TGFB1 experiment', + }; + + const getItemSpy = mockDynamoGetItem(jsData); + + (new ExperimentService()).getExperimentData('12345') .then((data) => { - expect(data).toEqual(marshalledData); + expect(data).toEqual(jsData); expect(getItemSpy).toHaveBeenCalledWith({ TableName: 'experiments-test', Key: { experimentId: { S: '12345' } }, - ProjectionExpression: 'experimentId, experimentName', + ProjectionExpression: 'experimentId,experimentName', }); }) .then(() => done()); }); it('Get cell sets works', async (done) => { - const e = new ExperimentService(); - - const unmarshalledData = { - Item: { - cellSets: { - L: [ - { M: { key: { N: 1 }, name: { S: 'set 1' }, color: { S: '#008DA6' } } }, - { M: { key: { N: 2 }, name: { S: 'set 2' }, color: { S: '#008D56' } } }, - { M: { key: { N: 3 }, name: { S: 'set 3' }, rootNode: { BOOL: true } } }, - ], - }, - }, - }; - - const marshalledData = { + const jsData = { cellSets: [ { key: 1, name: 'set 1', color: '#008DA6' }, { key: 2, name: 'set 2', color: '#008D56' }, @@ -65,16 +60,11 @@ describe('tests for the experiment service', () => { ], }; - AWSMock.setSDKInstance(AWS); - const getItemSpy = jest.fn((x) => x); - AWSMock.mock('DynamoDB', 'getItem', (params, callback) => { - getItemSpy(params); - callback(null, unmarshalledData); - }); + const getItemSpy = mockDynamoGetItem(jsData); - e.getCellSets('12345') + (new ExperimentService()).getCellSets('12345') .then((data) => { - expect(data).toEqual(marshalledData); + expect(data).toEqual(jsData); expect(getItemSpy).toHaveBeenCalledWith( { TableName: 'experiments-test', @@ -87,9 +77,7 @@ describe('tests for the experiment service', () => { }); it('Update experiment cell sets works', async (done) => { - const e = new ExperimentService(); - - const testData = [ + const jsTestData = [ { name: 'Empty cluster', key: 'empty', @@ -99,24 +87,18 @@ describe('tests for the experiment service', () => { }, ]; - AWSMock.setSDKInstance(AWS); - const putItemSpy = jest.fn((x) => x); - AWSMock.mock('DynamoDB', 'updateItem', (params, callback) => { - putItemSpy(params); - callback(null, []); // We do not care about the return value here, it is not used. 
- }); - - const marshalledTestData = AWS.DynamoDB.Converter.marshall({ ':x': testData }); + const updateItemSpy = mockDynamoUpdateItem(); + const dynamoTestData = AWS.DynamoDB.Converter.marshall({ ':x': jsTestData }); - e.updateCellSets('12345', testData) + (new ExperimentService()).updateCellSets('12345', jsTestData) .then((returnValue) => { - expect(returnValue).toEqual(testData); - expect(putItemSpy).toHaveBeenCalledWith( + expect(returnValue).toEqual(jsTestData); + expect(updateItemSpy).toHaveBeenCalledWith( { TableName: 'experiments-test', Key: { experimentId: { S: '12345' } }, UpdateExpression: 'set cellSets = :x', - ExpressionAttributeValues: marshalledTestData, + ExpressionAttributeValues: dynamoTestData, }, ); }) @@ -124,40 +106,7 @@ describe('tests for the experiment service', () => { }); it('Get processing config works', async (done) => { - const e = new ExperimentService(); - - const unmarshalledData = { - Item: { - processing: { - M: { - cellSizeDistribution: { - M: { - enabled: { BOOL: true }, - filterSettings: { - M: { - minCellSize: { N: '10800' }, - binStep: { N: '200' }, - }, - }, - }, - }, - classifier: { - M: { - enabled: { BOOL: true }, - filterSettings: { - M: { - minProbabiliy: { N: '0.8' }, - filterThreshold: { N: -1 }, - }, - }, - }, - }, - }, - }, - }, - }; - - const marshalledData = { + const jsData = { processing: { cellSizeDistribution: { enabled: true, @@ -176,16 +125,11 @@ describe('tests for the experiment service', () => { }, }; - AWSMock.setSDKInstance(AWS); - const getItemSpy = jest.fn((x) => x); - AWSMock.mock('DynamoDB', 'getItem', (params, callback) => { - getItemSpy(params); - callback(null, unmarshalledData); - }); + const getItemSpy = mockDynamoGetItem(jsData); - e.getProcessingConfig('12345') + (new ExperimentService()).getProcessingConfig('12345') .then((data) => { - expect(data).toEqual(marshalledData); + expect(data).toEqual(jsData); expect(getItemSpy).toHaveBeenCalledWith( { TableName: 'experiments-test', @@ -198,8 +142,6 @@ describe('tests for the experiment service', () => { }); it('Update processing config works', async (done) => { - const e = new ExperimentService(); - const testData = [ { name: 'classifier', @@ -213,16 +155,11 @@ describe('tests for the experiment service', () => { }, ]; - AWSMock.setSDKInstance(AWS); - const putItemSpy = jest.fn((x) => x); - AWSMock.mock('DynamoDB', 'updateItem', (params, callback) => { - putItemSpy(params); - callback(null, {}); // We do not care about the return value here, it is not used. 
- }); + const updateItemSpy = mockDynamoUpdateItem(); - e.updateProcessingConfig('12345', testData) + (new ExperimentService()).updateProcessingConfig('12345', testData) .then(() => { - expect(putItemSpy).toHaveBeenCalledWith( + expect(updateItemSpy).toHaveBeenCalledWith( { TableName: 'experiments-test', Key: { experimentId: { S: '12345' } }, @@ -249,4 +186,59 @@ describe('tests for the experiment service', () => { }) .then(() => done()); }); + + it('Get Pipeline Handle works', async (done) => { + const handle = { + stateMachineArn: 'STATE-MACHINE-ID', + executionArn: '', + }; + + const jsData = { + meta: { + pipeline: { + stateMachineArn: handle.stateMachineArn, + }, + organism: 'mmusculus', + type: '10x', + }, + }; + + const getItemSpy = mockDynamoGetItem(jsData); + + (new ExperimentService()).getPipelineHandle('12345') + .then((data) => { + expect(data).toEqual(handle); + expect(getItemSpy).toHaveBeenCalledWith( + { + TableName: 'experiments-test', + Key: { experimentId: { S: '12345' } }, + ProjectionExpression: 'meta', + }, + ); + }) + .then(() => done()); + }); + + it('Set Pipeline Handle works', async (done) => { + const jsTestData = { + stateMachineArn: 'STATE-MACHINE-ID', + executionArn: 'EXECUTION-ID', + }; + + const updateItemSpy = mockDynamoUpdateItem(); + const dynamoTestData = AWS.DynamoDB.Converter.marshall({ ':x': jsTestData }); + + (new ExperimentService()).savePipelineHandle('12345', jsTestData) + .then(() => { + expect(updateItemSpy).toHaveBeenCalledWith( + { + TableName: 'experiments-test', + Key: { experimentId: { S: '12345' } }, + UpdateExpression: 'set meta.pipeline = :x', + ExpressionAttributeValues: dynamoTestData, + }, + ); + }) + .then(() => done()); + }); });