BIOMAGE-513 Report back state of pipeline from an API endpoint (#60)
* Make room for pipeline status

* Fix typo in method call

* Support ExperimentService.getPipelineHandle()

* Store state machine execution arn

* Save execution arn in the right place

* Retrieve execution arn from the right place

* Report the SM execution status

* Use Map for State Machine parallelization

* Inspect execution history to get completed steps

* Allow GETing uninitialized pipelines

* Fix SM definition for non-local

* Cleaner definition of next steps on Catch

- Fixed local SM definition, broken in previous commit

* CompleteSteps need to belong to the Map
xverges authored Mar 12, 2021
1 parent 6995912 commit d205e4a
Showing 12 changed files with 966 additions and 524 deletions.
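For orientation, the combined payload that getBackendStatus() returns after this change looks roughly like the sketch below. The pipeline fields come from the new pipeline-status.js in this commit; the worker sub-object is produced by the separate worker-status service, which is not shown in the visible diff, so its fields are assumed from the previous backend-status.js implementation. All values are placeholders.

// Hypothetical example of the combined backend status payload.
const exampleBackendStatus = {
  pipeline: {
    startDate: '2021-03-12T10:00:00.000Z', // from StepFunctions describeExecution
    stopDate: null,
    status: 'RUNNING', // 'NotCreated' when no execution ARN is stored yet
    completedSteps: ['CellSizeDistributionFilter'], // from the execution history
  },
  worker: {
    status: 'Running',
    started: true,
    ready: true,
    restartCount: 0,
  },
};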
76 changes: 6 additions & 70 deletions src/api/general-services/backend-status.js
@@ -1,78 +1,14 @@
-const k8s = require('@kubernetes/client-node');
-const crypto = require('crypto');
-const config = require('../../config');
+const pipelineStatus = require('./pipeline-status');
+const workerStatus = require('./worker-status');

-const kc = new k8s.KubeConfig();
-kc.loadFromDefault();
-
 const getBackendStatus = async (experimentId) => {
-  const response = {
-    worker: {
-      status: 'NotLaunched',
-      started: false,
-      ready: false,
-      restartCount: 0,
-    },
-  };
-
-  // This will not work in development as we are not in a cluster.
-  // Always show the worker as 'up'.
-  if (config.clusterEnv === 'development') {
-    return {
-      ...response,
-      worker: {
-        ...response.worker,
-        status: 'Running',
-        started: true,
-        ready: true,
-      },
-    };
-  }
-
-  const workerHash = crypto
-    .createHash('sha1')
-    .update(`${experimentId}-${config.sandboxId}`)
-    .digest('hex');
-
-  const coreApi = kc.makeApiClient(k8s.CoreV1Api);
-
-  // Get worker status
-  const podList = await coreApi.listNamespacedPod(
-    `worker-${config.sandboxId}`,
-    undefined, undefined, undefined, undefined,
-    `job-name=worker-${workerHash}-job`,
+  const [{ pipeline }, { worker }] = await Promise.all(
+    [pipelineStatus(experimentId), workerStatus(experimentId)],
   );

-  const workerDetails = podList.body.items[0];
-
-  if (!workerDetails) {
-    return response;
-  }
-
-  response.worker.status = workerDetails.status.phase;
-
-  let containerStatus = {};
-
-  if (workerDetails.status.containerStatuses.length >= 0) {
-    containerStatus = workerDetails.status.containerStatuses.reduce((accumulator, current) => ({
-      ...accumulator,
-      started: accumulator.started && current.started,
-      ready: accumulator.ready && current.ready,
-      restartCount: Math.max(accumulator.restartCount, current.restartCount),
-    }), {
-      started: true,
-      ready: true,
-      restartCount: -111,
-    });
-  }
-
   return {
-    ...response,
-    worker: {
-      ...response.worker,
-      ...containerStatus,
-    },
+    pipeline,
+    worker,
   };
 };

@@ -19,6 +19,13 @@ const createNewJobIfNotExist = (context, step) => {
           detached: true,
         },
       },
+      Catch: [
+        {
+          ErrorEquals: ['States.ALL'],
+          ResultPath: '$.error-info',
+          Next: step.XNextOnCatch || step.Next,
+        },
+      ],
     };
   }

@@ -70,7 +77,7 @@ const createNewJobIfNotExist = (context, step) => {
       {
         ErrorEquals: ['EKS.409'],
         ResultPath: '$.error-info',
-        Next: step.Next,
+        Next: step.XNextOnCatch || step.Next,
       },
     ],
   };
@@ -28,6 +28,13 @@ const createNewStep = (context, step, args) => {
           detached: false,
         },
       },
+      Catch: [
+        {
+          ErrorEquals: ['States.ALL'],
+          ResultPath: '$.error-info',
+          Next: step.XNextOnCatch || step.Next,
+        },
+      ],
     };
   }

@@ -41,9 +48,8 @@ const createNewStep = (context, step, args) => {
       ClusterName: clusterInfo.name,
       CertificateAuthority: clusterInfo.certAuthority,
       Endpoint: clusterInfo.endpoint,
-      Method: 'POST',
-      Path: `/apis/batch/v1/namespaces/${config.workerNamespace}/jobs`,
-      RequestBody: {
+      Namespace: config.workerNamespace,
+      Job: {
         apiVersion: 'batch/v1',
         kind: 'Job',
         metadata: {
@@ -82,7 +88,7 @@ const createNewStep = (context, step, args) => {
       {
         ErrorEquals: ['EKS.409'],
         ResultPath: '$.error-info',
-        Next: step.Next,
+        Next: step.XNextOnCatch || step.Next,
       },
     ],
   };
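Both constructors above now attach a Catch clause and route failures to the step's XNextOnCatch target when one is set, falling back to the normal Next transition otherwise. A minimal sketch of that resolution, with illustrative step values (the names below are hypothetical, not taken from the diff):

// Hypothetical step definition; XNextOnCatch is the new hook added in this commit.
const step = {
  Next: 'DataIntegration',
  XNextOnCatch: 'EndOfMap', // optional: where to transition when the step errors
};

// Catch clause as generated by the constructors: error details are stored under
// $.error-info and execution continues at the catch target.
const catchClause = [
  {
    ErrorEquals: ['States.ALL'],
    ResultPath: '$.error-info',
    Next: step.XNextOnCatch || step.Next, // falls back to the normal transition
  },
];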
24 changes: 20 additions & 4 deletions src/api/general-services/pipeline-manage/index.js
Expand Up @@ -100,7 +100,9 @@ const executeStateMachine = async (stateMachineArn) => {

const { executionArn } = await stepFunctions.startExecution({
stateMachineArn,
input: '{}',
input: JSON.stringify({
samples: ['single-branch-in-map-state'],
}),
traceHeader: traceId,
}).promise();

@@ -143,15 +145,18 @@ const createPipeline = async (experimentId, processingConfigUpdates) => {
     DeleteCompletedPipelineWorker: {
       XStepType: 'delete-completed-jobs',
       Next: 'LaunchNewPipelineWorker',
+      ResultPath: null,
     },
     LaunchNewPipelineWorker: {
       XStepType: 'create-new-job-if-not-exist',
       Next: 'Filters',
+      ResultPath: null,
     },
     Filters: {
-      Type: 'Parallel',
+      Type: 'Map',
       Next: 'DataIntegration',
-      Branches: [{
+      ItemsPath: '$.samples',
+      Iterator: {
         StartAt: 'CellSizeDistributionFilter',
         States: {
           CellSizeDistributionFilter: {
@@ -187,10 +192,15 @@ const createPipeline = async (experimentId, processingConfigUpdates) => {
             XConstructorArgs: {
               taskName: 'doubletScores',
             },
+            XNextOnCatch: 'EndOfMap',
             End: true,
           },
+          EndOfMap: {
+            Type: 'Pass',
+            End: true,
+          },
         },
-      }],
+      },
     },
     DataIntegration: {
       XStepType: 'create-new-step',
@@ -204,6 +214,11 @@ const createPipeline = async (experimentId, processingConfigUpdates) => {
       XConstructorArgs: {
         taskName: 'configureEmbedding',
       },
+      XNextOnCatch: 'EndOfPipeline',
       End: true,
     },
+    EndOfPipeline: {
+      Type: 'Pass',
+      End: true,
+    },
   },
@@ -216,6 +231,7 @@ const createPipeline = async (experimentId, processingConfigUpdates) => {
       ...constructPipelineStep(context, o),
       XStepType: undefined,
       XConstructorArgs: undefined,
+      XNextOnCatch: undefined,
     };
   }
   return undefined;
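Condensed, the pipeline's Filters state is now a Map that fans out over the samples array supplied as execution input (see the executeStateMachine() change above), running the filter steps once per sample before moving on to DataIntegration. A rough sketch, with the per-step bodies omitted because they are generated by the XStepType constructors:

// Approximate shape of the Filters state after this commit (values taken from the diff above).
const filters = {
  Type: 'Map',
  Next: 'DataIntegration',
  ItemsPath: '$.samples', // one Map iteration per entry in the execution input
  Iterator: {
    StartAt: 'CellSizeDistributionFilter',
    States: {
      // ...CellSizeDistributionFilter and the following filter steps...
      DoubletScoresFilter: { XNextOnCatch: 'EndOfMap', End: true /* ... */ },
      EndOfMap: { Type: 'Pass', End: true }, // terminal Pass state inside each iteration
    },
  },
};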
126 changes: 126 additions & 0 deletions src/api/general-services/pipeline-status.js
@@ -0,0 +1,126 @@
const AWS = require('../../utils/requireAWS');
const ExperimentService = require('../route-services/experiment');
const config = require('../../config');
const logger = require('../../utils/logging');

const getStepsFromExecutionHistory = (history) => {
  const { events } = history;
  class Branch {
    constructor(event, makeRoot) {
      this.visited = [event.id];
      this.currentState = '';
      this.completedTasks = [];
      this.branches = {};
      this.branchCount = 0;
      this.updateCurrentState(event);
      if (makeRoot) {
        this.visited.push(event.previousEventId);
      }
    }

    allBranchesStarted() {
      return this.branchCount === Object.values(this.branches).length;
    }

    updateCurrentState(event) {
      if ('stateEnteredEventDetails' in event) {
        this.currentState = event.stateEnteredEventDetails.name;
      }
    }

    nextConsumer(event) {
      if (this.visited.includes(event.previousEventId)) {
        return this;
      }
      if (event.type === 'MapStateExited') {
        return this;
      }
      const branches = Object.values(this.branches);
      for (let ii = 0; ii < branches.length; ii += 1) {
        const candidate = branches[ii];
        const consumer = candidate.nextConsumer(event);
        if (consumer) {
          return consumer;
        }
      }
      return null;
    }

    consume(event) {
      if (event.type === 'MapIterationStarted') {
        this.branches[event.mapIterationStartedEventDetails.index] = new Branch(event);
      } else {
        this.visited.push(event.id);
        this.updateCurrentState(event);
        if (event.type === 'TaskSucceeded') {
          this.completedTasks.push(this.currentState);
        } else if (event.type === 'MapStateStarted') {
          this.branchCount = event.mapStateStartedEventDetails.length;
        }
      }
    }
  }

  const main = new Branch(events[0], true);
  for (let ii = 1; ii < events.length; ii += 1) {
    const consumer = main.nextConsumer(events[ii]);
    consumer.consume(events[ii]);
  }

  let shortestCompleted = null;
  if (main.allBranchesStarted()) {
    const branches = Object.values(main.branches);
    for (let ii = 0; ii < branches.length; ii += 1) {
      if (!shortestCompleted || branches[ii].completedTasks.length < shortestCompleted.length) {
        shortestCompleted = branches[ii].completedTasks;
      }
    }
  }

  return shortestCompleted || [];
};
/*
 * Return the `completedSteps` of the state machine (SM) associated with the `experimentId`'s pipeline.
 * The code assumes that:
 * - the relevant states for the steps are defined within a Map of the SM
 * - the relevant Map is the first Map in the SM
 * - a step is only considered completed if it has been completed for all iterations of the Map
 * - steps are returned in completion order, and are unique in the returned array
 */
const getPipelineStatus = async (experimentId) => {
  const { executionArn } = await (new ExperimentService()).getPipelineHandle(experimentId);
  let execution = {};
  let completedSteps = [];

  if (!executionArn.length) {
    execution = {
      startDate: null,
      stopDate: null,
      status: 'NotCreated',
    };
  } else {
    const stepFunctions = new AWS.StepFunctions({
      region: config.awsRegion,
    });
    execution = await stepFunctions.describeExecution({
      executionArn,
    }).promise();

    const history = await stepFunctions.getExecutionHistory({
      executionArn,
      includeExecutionData: false,
    }).promise();

    completedSteps = getStepsFromExecutionHistory(history);
    logger.log(`ExecutionHistory for ARN ${executionArn}: ${history.events.length} events, ${completedSteps.length} completed steps`);
  }

  const response = {
    pipeline: {
      startDate: execution.startDate,
      stopDate: execution.stopDate,
      status: execution.status,
      completedSteps,
    },
  };
  return response;
};

module.exports = getPipelineStatus;
module.exports.getStepsFromExecutionHistory = getStepsFromExecutionHistory;
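As a rough illustration of how getStepsFromExecutionHistory() walks the history, the hypothetical, trimmed-down event list below (only the fields the parser reads; a real getExecutionHistory response carries many more) describes one Map iteration in which CellSizeDistributionFilter has succeeded, and yields a single completed step:

// Hypothetical, trimmed execution history for illustration only.
const history = {
  events: [
    { id: 1, previousEventId: 0, type: 'ExecutionStarted' },
    { id: 2, previousEventId: 1, type: 'MapStateEntered', stateEnteredEventDetails: { name: 'Filters' } },
    { id: 3, previousEventId: 2, type: 'MapStateStarted', mapStateStartedEventDetails: { length: 1 } },
    { id: 4, previousEventId: 3, type: 'MapIterationStarted', mapIterationStartedEventDetails: { index: 0 } },
    { id: 5, previousEventId: 4, type: 'TaskStateEntered', stateEnteredEventDetails: { name: 'CellSizeDistributionFilter' } },
    { id: 6, previousEventId: 5, type: 'TaskSucceeded' },
  ],
};

const { getStepsFromExecutionHistory } = require('./pipeline-status');
console.log(getStepsFromExecutionHistory(history));
// => ['CellSizeDistributionFilter']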