Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BIOMAGE-1036] - Run QC after GEM2S #134

Merged
merged 3 commits into from
Jun 1, 2021
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 17 additions & 20 deletions src/api/route-services/gem2s-response.js
Original file line number Diff line number Diff line change
@@ -1,15 +1,19 @@
const AWSXRay = require('aws-xray-sdk');

const getPipelineStatus = require('../general-services/pipeline-status');
const constants = require('../general-services/pipeline-manage/constants');
const validateRequest = require('../../utils/schema-validator');
const saveProcessingConfigFromGem2s = require('../../utils/hooks/saveProcessingConfigFromGem2s');
const runQCPipeline = require('../../utils/hooks/runQCPipeline');

const logger = require('../../utils/logging');
const validateRequest = require('../../utils/schema-validator');

const ExperimentService = require('./experiment');
const SamplesService = require('./samples');
const PipelineHook = require('../../utils/hookRunner');

const experimentService = new ExperimentService();
const samplesService = new SamplesService();
const getPipelineStatus = require('../general-services/pipeline-status');
const pipelineHook = new PipelineHook();


pipelineHook.register('uploadToAWS', [saveProcessingConfigFromGem2s, runQCPipeline]);

const sendUpdateToSubscribed = async (experimentId, message, io) => {
const statusRes = await getPipelineStatus(experimentId, constants.GEM2S_PROCESS_NAME);
Expand Down Expand Up @@ -38,21 +42,14 @@ const gem2sResponse = async (io, message) => {
// Fail hard if there was an error.
await validateRequest(message, 'GEM2SResponse.v1.yaml');

const { experimentId } = message;

if (!message.table) {
await sendUpdateToSubscribed(experimentId, message, io);
return;
}

const { item, table: tableName } = message;
const {
experimentId, taskName, item,
} = message;

if (tableName.includes('experiments')) {
await experimentService.updateExperiment(experimentId, item);
} else if (tableName.includes('samples')) {
const { projectUuid } = item;
await samplesService.updateSamples(projectUuid, item);
}
pipelineHook.run(taskName, {
experimentId,
item,
});
aerlaut marked this conversation as resolved.
Show resolved Hide resolved

await sendUpdateToSubscribed(experimentId, message, io);
};
Expand Down
11 changes: 9 additions & 2 deletions src/utils/hookRunner.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,13 @@ class hookRunner {

register(taskName, callback) {
if (this.hooks[taskName] === undefined) this.hooks[taskName] = [];
this.hooks[taskName].push(callback);

if (Array.isArray(callback)) {
this.hooks[taskName].push(...callback);
} else {
this.hooks[taskName].push(callback);
}

this.results[taskName] = [];
}

Expand All @@ -15,7 +21,8 @@ class hookRunner {
|| this.hooks[taskName].length === 0
) { return null; }

// Manual looping is done to prevent passing function in hooks[taskName] into a callback
// Manual looping is done to prevent passing function in hooks[taskName] into a callback,
// which might cause scoping issues
for (let idx = 0; idx < this.hooks[taskName].length; idx += 1) {
if (this.hooks[taskName][idx].constructor.name === 'AsyncFunction') {
aerlaut marked this conversation as resolved.
Show resolved Hide resolved
// eslint-disable-next-line no-await-in-loop
Expand Down
14 changes: 8 additions & 6 deletions src/utils/hooks/clusteringWorkRequest.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,23 +2,25 @@ const constants = require('../../api/general-services/pipeline-manage/constants'
const workRequestBuilder = require('../workRequestBuilder');
aerlaut marked this conversation as resolved.
Show resolved Hide resolved

const clusteringWorkRequest = async (payload) => {
const { experimentId, output, statusRes } = payload;

// Run work request for cell clustering
const clusteringWorkConfig = {
experimentId: payload.experimentId,
experimentId,
body: {
name: 'ClusterCells',
cellSetName: 'Louvain clusters',
cellSetKey: 'louvain',
type: payload.output.config.clusteringSettings.method,
config: payload.output.config.clusteringSettings.methodSettings[
payload.output.config.clusteringSettings.method
type: output.config.clusteringSettings.method,
config: output.config.clusteringSettings.methodSettings[
output.config.clusteringSettings.method
],
},
PipelineRunETag: payload.statusRes[constants.OLD_QC_NAME_TO_BE_REMOVED].startDate,
PipelineRunETag: statusRes[constants.OLD_QC_NAME_TO_BE_REMOVED].startDate,
};

const workRequest = await workRequestBuilder('ClusterCells', clusteringWorkConfig);
return workRequest;
workRequest.submitWork();
};

module.exports = clusteringWorkRequest;
12 changes: 7 additions & 5 deletions src/utils/hooks/embeddingWorkRequest.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,19 @@ const constants = require('../../api/general-services/pipeline-manage/constants'
const workRequestBuilder = require('../workRequestBuilder');

const embeddingWorkRequest = async (payload) => {
const { experimentId, output, statusRes } = payload;

// Run work request for embedding
const embeddingWorkConfig = {
experimentId: payload.experimentId,
experimentId,
body: {
name: 'GetEmbedding',
type: payload.output.config.embeddingSettings.method,
config: payload.output.config.embeddingSettings.methodSettings[
payload.output.config.embeddingSettings.method
type: output.config.embeddingSettings.method,
config: output.config.embeddingSettings.methodSettings[
output.config.embeddingSettings.method
],
},
PipelineRunETag: payload.statusRes[constants.OLD_QC_NAME_TO_BE_REMOVED].startDate,
PipelineRunETag: statusRes[constants.OLD_QC_NAME_TO_BE_REMOVED].startDate,
};

const workRequest = await workRequestBuilder('GetEmbedding', embeddingWorkConfig);
Expand Down
14 changes: 14 additions & 0 deletions src/utils/hooks/runQCPipeline.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
const ExperimentService = require('../../api/route-services/experiment');

const { createQCPipeline } = require('../../api/general-services/pipeline-manage');

const experimentService = new ExperimentService();

const runQCPipeline = async (payload) => {
const { experimentId } = payload;

const qcHandle = await createQCPipeline(experimentId);
await experimentService.saveQCHandle(experimentId, qcHandle);
};

module.exports = runQCPipeline;
13 changes: 13 additions & 0 deletions src/utils/hooks/saveProcessingConfigFromGem2s.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
const ExperimentService = require('../../api/route-services/experiment');

const experimentService = new ExperimentService();

const saveProcessingConfigFromGem2s = async (payload) => {
const { experimentId, item } = payload;

if (!item) return;

await experimentService.updateExperiment(experimentId, item);
};

module.exports = saveProcessingConfigFromGem2s;
14 changes: 13 additions & 1 deletion tests/utils/hookRunner.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ describe('HookRunner', () => {
expect(runner.hooks).toEqual({});
});

it('should register hooks properly', () => {
it('should register single hooks properly', () => {
const testFn = () => true;

const runner = new HookRunner();
Expand All @@ -19,6 +19,18 @@ describe('HookRunner', () => {
expect(runner.hooks.test.length).toEqual(2);
});

it('should register array of hooks properly', () => {
const testFn1 = () => true;
const testFn2 = () => true;

const runner = new HookRunner();

runner.register('test', [testFn1, testFn2]);

expect(Object.keys(runner.hooks).length).toEqual(1);
expect(runner.hooks.test.length).toEqual(2);
});


it('should run hooks registered to the right event', async () => {
const fn1 = () => 1;
Expand Down