Skip to content

Commit

Permalink
fix: hooks for marker & embedding requests using correct ETag
Browse files Browse the repository at this point in the history
Signed-off-by: Pol Alvarez <[email protected]>
  • Loading branch information
kafkasl committed Nov 29, 2023
1 parent 381fd6e commit 5879ba8
Show file tree
Hide file tree
Showing 12 changed files with 107 additions and 140 deletions.
20 changes: 16 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,13 @@ API
======

A nodejs service that sits in between the UI, the Cellenics backends and the Data store and does the following:
- Authorizes and validates requests.

- Authorizes and validates requests.
- Creates and starts gem2s and qc state machines for data processing tasks.
- Creates a SQS queue and assigns available worker to an experiement that needs one for data analysis tasks.
- Deletes worker pods that are no longer needed.
- Listens to broadcasted messages by the worker on Redis and processes the message that's relevant to it (based on the value of `socketId` in the message body).
See the [Emitters](https://socket.io/docs/v4/redis-adapter/#emitter) section in the Redis adapter documention for more details about how the worker-API communication happens.
See the [Emitters](https://socket.io/docs/v4/redis-adapter/#emitter) section in the Redis adapter documention for more details about how the worker-API communication happens.
- Communicates to the UI via socket connections to send the status of worker requests and details about where they can be found.
- Contains HTTP endpoints that allow programmatic access and modification to experiment data by authorized users.

Expand Down Expand Up @@ -66,6 +67,7 @@ If you haven't completed step 1 and run Inframock locally, you should see the fo
[2021-01-03T11:51:36.310Z] Server listening on port: 3000
[2021-01-03T11:51:36.312Z] redis:reader An error occurred: connect ECONNREFUSED 127.0.0.1:6379
```

The reason for the Redis error is that the API is not connected with the rest of the platform.

If you did complete step 1 and are runnig Inframock locally, you should see the following output on your terminal:
Expand All @@ -90,6 +92,7 @@ If you did complete step 1 and are runnig Inframock locally, you should see the
```

#### Step 3. Run the UI locally

This is required to run the API with a local version of the UI.
Go to the [UI repo](https://github.com/hms-dbmi-cellenics/ui) and follow the instructions in the README to set it up and start it on a separate terminal.
After the UI service is started, any request from the UI will be automatically forwarded to the API and vice versa.
Expand All @@ -108,9 +111,18 @@ After the worker service is started, any request from the API will be automatica

## Deployment

The API is deployed as a Helm chart to an AWS-managed Kubernetes cluster. The deployment is handled by the cluster Helm operator and the [API Github Actions workflow](https://github.com/hms-dbmi-cellenics/api/blob/master/.github/workflows/ci.yaml).
The API is deployed as a Helm chart to an AWS-managed Kubernetes cluster. The deployment is handled by the cluster Helm operator and the [API Github Actions workflow](https://github.com/hms-dbmi-cellenics/api/blob/master/.github/workflows/ci.yaml).

During a deployment, API Github Actions workflow does the following:

- It pushes new API images to ECR.
- Adds API-specific configurations to the [nodejs Helm chart](https://github.com/hms-dbmi-cellenics/iac/tree/master/charts/nodejs), that is used for the deployment of the API.
- Adds API-specific configurations to the [nodejs Helm chart](https://github.com/hms-dbmi-cellenics/iac/tree/master/charts/nodejs), that is used for the deployment of the API.
- Pushes the API-specific configuration changes into the [releases/](https://github.com/hms-dbmi-cellenics/iac/tree/master/releases) folder in iac, under the relevant environment.

## Cache

By default development and staging will run without cache to make development faster. The exception is `GetEmbedding` tasks because other tasks like download seurat object or trajectory analysis depend on the original embedding ETag to work.

If you want to enable cache locally you can do so by setting the flag `USE_CACHE=true`. E.g. run the api with:

`USE_CACHE=true make run`
5 changes: 5 additions & 0 deletions src/api.v2/helpers/worker/generateEtag.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,11 @@ const DISABLE_UNIQUE_KEYS = [
// however trajectory & download rds depend on the embedding's ETag so we have to
// disable the unique keys for the embeddings task
const getCacheUniquenessKey = (taskName) => {
// allow people to enable cache in development by setting USE_CACHE=true
if (process.env.USE_CACHE === 'true') {
return null;
}

if (config.clusterEnv !== 'production' && !DISABLE_UNIQUE_KEYS.includes(taskName)) {
return Math.random();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
const getCellSetsAffectingDownsampling = async (_experimentId, body, cellSets) => {
// If not downsampling, then there's no dependency set by this getter
if (!body.downsampleSettings) return '';

const { selectedCellSet, groupedTracks } = body.downsampleSettings;

const selectedCellSetKeys = cellSets
.find(({ key }) => key === selectedCellSet)
.children.map(({ key }) => key);

const groupedCellSetKeys = cellSets
.filter(({ key }) => groupedTracks.includes(key))
.flatMap(({ children }) => children)
.map(({ key }) => key);

// Keep them in separate lists, they each represent different changes in the settings
return [selectedCellSetKeys, groupedCellSetKeys];
};

module.exports = getCellSetsAffectingDownsampling;
22 changes: 2 additions & 20 deletions src/api.v2/helpers/worker/workSubmit/getExtraDependencies.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ const Experiment = require('../../../model/Experiment');
const getLastModified = require('../../s3/getLastModified');
const bucketNames = require('../../../../config/bucketNames');
const getS3Object = require('../../s3/getObject');
const getCellSetsAffectingDownsampling = require('./getCellSetsAffectingDownsampling');

const getClusteringSettings = async (experimentId) => {
const {
Expand Down Expand Up @@ -41,25 +42,6 @@ const getCellSetsLastVersion = async (experimentId) => {
};


const getCellSetsThatAffectDownsampling = async (_experimentId, body, cellSets) => {
// If not downsampling, then there's no dependency set by this getter
if (!body.downsampleSettings) return '';

const { selectedCellSet, groupedTracks } = body.downsampleSettings;

const selectedCellSetKeys = cellSets
.find(({ key }) => key === selectedCellSet)
.children.map(({ key }) => key);

const groupedCellSetKeys = cellSets
.filter(({ key }) => groupedTracks.includes(key))
.flatMap(({ children }) => children)
.map(({ key }) => key);

// Keep them in separate lists, they each represent different changes in the settings
return [selectedCellSetKeys, groupedCellSetKeys];
};

const dependencyGetters = {
ClusterCells: [],
ScTypeAnnotate: [],
Expand All @@ -76,7 +58,7 @@ const dependencyGetters = {
GetNGenes: [],
GetNUmis: [],
MarkerHeatmap: [
getClusteringSettings, getCellSetsThatAffectDownsampling,
getClusteringSettings, getCellSetsAffectingDownsampling,
],
GetTrajectoryAnalysisStartingNodes: [getClusteringSettings],
GetTrajectoryAnalysisPseudoTime: [getClusteringSettings, getEmbeddingSettings],
Expand Down
12 changes: 3 additions & 9 deletions src/api.v2/helpers/worker/workSubmit/submitEmbeddingWork.js
Original file line number Diff line number Diff line change
@@ -1,27 +1,21 @@
const submitWork = require('./submitWork');
const submitWorkForHook = require('./submitWorkForHook');

const submitEmbeddingWork = async (message) => {
const {
experimentId, input:
{ authJWT, config: { embeddingSettings: { method, methodSettings, useSaved } } },
{ authJWT, config: { embeddingSettings: { method, methodSettings } } },
} = message;

const embeddingConfig = methodSettings[method];

const body = {
name: 'GetEmbedding',
type: method,
useSaved,
config: embeddingConfig,
};

// these values need to match explicitly the default ones defined in the UI at
// src/utils/work/fetchWork.js when calling the function generateETag if this file
// or the one in the UI has any default changed, the pre-computing of embeddings/marker heatmp
// will stop working as the ETags will no longer match.
const extraDependencies = [];

const ETag = await submitWork(experimentId, authJWT, body, extraDependencies);
const ETag = await submitWorkForHook(experimentId, authJWT, body);

// explicitly return ETag to make it stand out more in tests and so harder to break
return ETag;
Expand Down
17 changes: 13 additions & 4 deletions src/api.v2/helpers/worker/workSubmit/submitMarkerHeatmapWork.js
Original file line number Diff line number Diff line change
@@ -1,9 +1,16 @@
const getExtraDependencies = require('./getExtraDependencies');
const submitWork = require('./submitWork');
const getCellSetsAffectingDownsampling = require('./getCellSetsAffectingDownsampling');
const submitWorkForHook = require('./submitWorkForHook');
const bucketNames = require('../../../../config/bucketNames');
const getObject = require('../../s3/getObject');

const submitMarkerHeatmapWork = async (message) => {
const { experimentId, input: { authJWT } } = message;

const { cellSets } = JSON.parse(await getObject({
Bucket: bucketNames.CELL_SETS,
Key: experimentId,
}));

const body = {
name: 'MarkerHeatmap',
nGenes: 5,
Expand All @@ -15,9 +22,11 @@ const submitMarkerHeatmapWork = async (message) => {
},
};

const extraDependencies = await getExtraDependencies(experimentId, body.name, body);
const cs = await getCellSetsAffectingDownsampling(experimentId, body, cellSets);

body.downsampleSettings.cellSets = cs;

const ETag = await submitWork(experimentId, authJWT, body, extraDependencies);
const ETag = await submitWorkForHook(experimentId, authJWT, body);

// explicitly return ETag to make it stand out more in tests and so harder to break
return ETag;
Expand Down
48 changes: 0 additions & 48 deletions src/api.v2/helpers/worker/workSubmit/submitWork.js

This file was deleted.

33 changes: 33 additions & 0 deletions src/api.v2/helpers/worker/workSubmit/submitWorkForHook.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
const validateAndSubmitWork = require('../../../events/validateAndSubmitWork');
const generateETag = require('../generateEtag');


const submitWorkForHook = async (experimentId, authJWT, body) => {
const extras = undefined;

const data = {
experimentId,
body,
extras,
};

const ETag = await generateETag(data);
const now = new Date();
const timeout = 15 * 60 * 1000; // 15min in ms
const timeoutDate = new Date(now.getTime() + timeout);
const request = {
ETag,
socketId: 'randomID',
experimentId,
authJWT,
timeout: timeoutDate.toISOString(),
body,
};

await validateAndSubmitWork(request);

// explicitly return ETag to make it stand out more in tests and so harder to break
return ETag;
};

module.exports = submitWorkForHook;
Original file line number Diff line number Diff line change
@@ -1,27 +1,5 @@
// Jest Snapshot v1, https://goo.gl/fbAQLP

exports[`submitWorkEmbedding submits the work and the ETag / params are correct 1`] = `
Array [
Array [
Object {
"body": Object {
"config": Object {
"distanceMetric": "cosine",
"minimumDistance": 0.3,
},
"name": "GetEmbedding",
"type": "umap",
"useSaved": undefined,
},
"cacheUniquenessKey": null,
"experimentId": "6463cb35-3e08-4e94-a181-6d155a5ca570",
"extraDependencies": Array [],
"extras": undefined,
"qcPipelineStartDate": "2016-03-22T04:00:00.000Z",
"workerVersion": 4,
},
],
]
`;
exports[`submitWorkEmbedding submits the work and the ETag / params are correct 1`] = `Array []`;

exports[`submitWorkEmbedding submits the work and the ETag / params are correct 2`] = `"5c144d6e44aa4e09497a4bc5b12a285c"`;
exports[`submitWorkEmbedding submits the work and the ETag / params are correct 2`] = `"163301323ab0ee82902d1574a5dcd061"`;
Original file line number Diff line number Diff line change
@@ -1,31 +1,5 @@
// Jest Snapshot v1, https://goo.gl/fbAQLP

exports[`submitWorkEmbedding submits the work and the ETag / params are correct 1`] = `
Array [
Array [
Object {
"body": Object {
"downsampleSettings": Object {
"groupedTracks": Array [
"louvain",
"sample",
],
"hiddenCellSets": Array [],
"selectedCellSet": "louvain",
"selectedPoints": "All",
},
"nGenes": 5,
"name": "MarkerHeatmap",
},
"cacheUniquenessKey": null,
"experimentId": "6463cb35-3e08-4e94-a181-6d155a5ca570",
"extraDependencies": undefined,
"extras": undefined,
"qcPipelineStartDate": "2016-03-22T04:00:00.000Z",
"workerVersion": 4,
},
],
]
`;
exports[`submitWorkEmbedding submits the work and the ETag / params are correct 1`] = `Array []`;

exports[`submitWorkEmbedding submits the work and the ETag / params are correct 2`] = `"31a417d5acf6614a1c041bbfefab49cb"`;
exports[`submitWorkEmbedding submits the work and the ETag / params are correct 2`] = `"b37d4df92a6f1d2a3dbb6ec29960d612"`;
11 changes: 10 additions & 1 deletion tests/api.v2/helpers/worker/submitEmbeddingWork.test.js
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
const AWSMock = require('aws-sdk-mock');

const createObjectHash = require('../../../../src/api.v2/helpers/worker/createObjectHash');
const submitEmbeddingWork = require('../../../../src/api.v2/helpers/worker/workSubmit/submitEmbeddingWork');
const validateAndSubmitWork = require('../../../../src/api.v2/events/validateAndSubmitWork');
const { mockS3GetObject } = require('../../../test-utils/mockAWSServices');


jest.mock('../../../../src/api.v2/helpers/worker/createObjectHash');
jest.mock('../../../../src/api.v2/helpers/pipeline/getPipelineStatus');
jest.mock('../../../../src/api.v2/helpers/worker/getWorkerStatus');
jest.mock('../../../../src/api.v2/events/validateAndSubmitWork');


const message = {
experimentId: '6463cb35-3e08-4e94-a181-6d155a5ca570',
taskName: 'configureEmbedding',
Expand Down Expand Up @@ -44,7 +46,14 @@ const message = {
};

describe('submitWorkEmbedding', () => {
beforeEach(() => {
AWSMock.restore();
process.env.USE_CACHE = 'true';
});

it('submits the work and the ETag / params are correct', async () => {
mockS3GetObject({ Body: '{}' });

const ETag = await submitEmbeddingWork(message);

expect(createObjectHash.mock.calls).toMatchSnapshot();
Expand Down
3 changes: 1 addition & 2 deletions tests/api.v2/helpers/worker/submitMarkerHeatmapWork.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@ const { mockS3GetObject } = require('../../../test-utils/mockAWSServices');


jest.mock('../../../../src/api.v2/helpers/worker/workSubmit/getExtraDependencies');


jest.mock('../../../../src/api.v2/helpers/worker/createObjectHash');
jest.mock('../../../../src/api.v2/helpers/pipeline/getPipelineStatus');
jest.mock('../../../../src/api.v2/helpers/worker/getWorkerStatus');
Expand Down Expand Up @@ -74,6 +72,7 @@ const mockCellSets = {
describe('submitWorkEmbedding', () => {
beforeEach(() => {
AWSMock.restore();
process.env.USE_CACHE = 'true';
});

it('submits the work and the ETag / params are correct', async () => {
Expand Down

0 comments on commit 5879ba8

Please sign in to comment.