Skip to content

Commit

Permalink
refactor(api): use scenario's blm in run service
Browse files Browse the repository at this point in the history
  • Loading branch information
Dyostiq authored and kgajowy committed Jul 28, 2021
1 parent 93a7d60 commit 2dc03e7
Show file tree
Hide file tree
Showing 10 changed files with 231 additions and 157 deletions.
37 changes: 37 additions & 0 deletions api/apps/api/src/modules/scenarios/marxan-run/cancel.handler.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
import { Inject, Injectable } from '@nestjs/common';
import { Job, Queue } from 'bullmq';
import { Either, left, right } from 'fp-ts/Either';
import { JobData } from '@marxan/scenario-run-queue';
import { isDefined } from '@marxan/utils';
import { runQueueToken } from './tokens';

export const notFound = Symbol('not found');
export type NotFound = typeof notFound;

@Injectable()
export class CancelHandler {
constructor(
@Inject(runQueueToken)
private readonly queue: Queue<JobData>,
) {}

async cancel(scenarioId: string): Promise<Either<NotFound, void>> {
const activeJobs: Job<JobData>[] = await this.queue.getJobs([
'active',
'waiting',
]);
const scenarioJob = activeJobs.find(
(job) => job.data.scenarioId === scenarioId,
);
if (!isDefined(scenarioJob)) return left(notFound);

if (await scenarioJob.isActive())
await scenarioJob.updateProgress({
canceled: true,
scenarioId,
});
else if (await scenarioJob.isWaiting()) await scenarioJob.remove();

return right(void 0);
}
}
86 changes: 86 additions & 0 deletions api/apps/api/src/modules/scenarios/marxan-run/events.handler.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
import { Inject, Injectable } from '@nestjs/common';
import { Job, Queue, QueueEvents } from 'bullmq';
import { JobData, ProgressData } from '@marxan/scenario-run-queue';
import { API_EVENT_KINDS } from '@marxan/api-events';
import { assertDefined } from '@marxan/utils';
import { ApiEventsService } from '@marxan-api/modules/api-events/api-events.service';
import { ScenarioRunProgressV1Alpha1DTO } from '@marxan-api/modules/api-events/dto/scenario-run-progress-v1-alpha-1';
import { runEventsToken, runQueueToken } from './tokens';

@Injectable()
export class EventsHandler {
constructor(
@Inject(runQueueToken)
private readonly queue: Queue<JobData>,
@Inject(runEventsToken)
queueEvents: QueueEvents,
private readonly apiEvents: ApiEventsService,
) {
queueEvents.on(`completed`, ({ jobId }, eventId) =>
this.handleFinished(jobId, eventId),
);
queueEvents.on(`failed`, ({ jobId }, eventId) =>
this.handleFailed(jobId, eventId),
);
queueEvents.on(
`progress`,
async (
{ jobId, data }: { data: ProgressData; jobId: string },
eventId,
) => {
await this.handleProgress(jobId, eventId, data);
},
);
}

private async handleProgress(
jobId: string,
eventId: string,
progress: ProgressData | null,
) {
if (
typeof progress !== 'object' ||
progress === null ||
!('fractionalProgress' in progress)
)
return;
const job = await this.getJob(jobId);
const kind = API_EVENT_KINDS.scenario__run__progress__v1__alpha1;
const eventData: ScenarioRunProgressV1Alpha1DTO = {
kind,
fractionalProgress: progress.fractionalProgress,
};
await this.apiEvents.createIfNotExists({
topic: job.data.scenarioId,
kind,
externalId: eventId,
data: eventData,
});
}

private async handleFinished(jobId: string, eventId: string) {
const job = await this.getJob(jobId);
const kind = API_EVENT_KINDS.scenario__run__finished__v1__alpha1;
await this.apiEvents.createIfNotExists({
topic: job.data.scenarioId,
kind,
externalId: eventId,
});
}

private async handleFailed(jobId: string, eventId: string) {
const job = await this.getJob(jobId);
const kind = API_EVENT_KINDS.scenario__run__failed__v1__alpha1;
await this.apiEvents.createIfNotExists({
topic: job.data.scenarioId,
kind,
externalId: eventId,
});
}

private async getJob(jobId: string): Promise<Job<JobData>> {
const job = await this.queue.getJob(jobId);
assertDefined(job);
return job;
}
}
4 changes: 3 additions & 1 deletion api/apps/api/src/modules/scenarios/marxan-run/index.ts
Original file line number Diff line number Diff line change
@@ -1,2 +1,4 @@
export { RunService, notFound, NotFound } from './run.service';
export { RunService } from './run.service';
export { MarxanRunModule } from './marxan-run.module';
export { NotFound } from './cancel.handler';
export { notFound } from './cancel.handler';
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@ import {
runQueueEventsProvider,
runQueueProvider,
} from './run-service.providers';
import { RunHandler } from './run.handler';
import { CancelHandler } from './cancel.handler';
import { EventsHandler } from './events.handler';

@Module({
imports: [
Expand All @@ -21,6 +24,9 @@ import {
InputFilesModule,
],
providers: [
RunHandler,
CancelHandler,
EventsHandler,
runQueueProvider,
runQueueEventsProvider,
blmDefaultProvider,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,9 @@
import { FactoryProvider } from '@nestjs/common';
import { Queue, QueueEvents } from 'bullmq';
import { JobData, queueName } from '@marxan/scenario-run-queue';
import { QueueBuilder, QueueEventsBuilder } from '@marxan-api/modules/queue';
import {
blmDefaultToken,
runEventsToken,
runQueueToken,
} from '@marxan-api/modules/scenarios/marxan-run/run.service';
import { MarxanParametersDefaults } from '@marxan/marxan-input';
import { QueueBuilder, QueueEventsBuilder } from '@marxan-api/modules/queue';
import { blmDefaultToken, runEventsToken, runQueueToken } from './tokens';

export const runQueueProvider: FactoryProvider<Queue<JobData>> = {
provide: runQueueToken,
Expand Down
52 changes: 52 additions & 0 deletions api/apps/api/src/modules/scenarios/marxan-run/run.handler.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
import { Inject, Injectable } from '@nestjs/common';
import { Queue } from 'bullmq';
import { Repository } from 'typeorm';
import { InjectRepository } from '@nestjs/typeorm';

import { JobData } from '@marxan/scenario-run-queue';
import { API_EVENT_KINDS } from '@marxan/api-events';
import { assertDefined } from '@marxan/utils';
import { ApiEventsService } from '@marxan-api/modules/api-events/api-events.service';
import { Scenario } from '../scenario.api.entity';
import { blmDefaultToken, runQueueToken } from './tokens';
import { AssetsService } from './assets.service';

@Injectable()
export class RunHandler {
constructor(
@Inject(runQueueToken)
private readonly queue: Queue<JobData>,
private readonly apiEvents: ApiEventsService,
@InjectRepository(Scenario)
private readonly scenarios: Repository<Scenario>,
private readonly assets: AssetsService,
@Inject(blmDefaultToken)
private readonly blmDefault: number,
) {}

async run(
scenario: {
id: string;
boundaryLengthModifier?: number;
},
overridingBlm?: number,
): Promise<void> {
const blm =
overridingBlm ?? scenario.boundaryLengthModifier ?? this.blmDefault;
const assets = await this.assets.forScenario(scenario.id, blm);
assertDefined(assets);
const job = await this.queue.add(`run-scenario`, {
scenarioId: scenario.id,
assets,
});
await this.scenarios.update(scenario.id, {
ranAtLeastOnce: true,
});
const kind = API_EVENT_KINDS.scenario__run__submitted__v1__alpha1;
await this.apiEvents.create({
topic: scenario.id,
kind,
externalId: job.id + kind,
});
}
}
42 changes: 29 additions & 13 deletions api/apps/api/src/modules/scenarios/marxan-run/run.service.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,12 @@ import { ProgressData } from '@marxan/scenario-run-queue';
import { ApiEventsService } from '@marxan-api/modules/api-events/api-events.service';
import { CreateApiEventDTO } from '@marxan-api/modules/api-events/dto/create.api-event.dto';
import { Scenario } from '../scenario.api.entity';
import {
blmDefaultToken,
notFound,
runEventsToken,
runQueueToken,
RunService,
} from './run.service';
import { RunService } from './run.service';
import { AssetsService } from './assets.service';
import { blmDefaultToken, runEventsToken, runQueueToken } from './tokens';
import { RunHandler } from './run.handler';
import { CancelHandler, notFound } from './cancel.handler';
import { EventsHandler } from './events.handler';

let fixtures: PromiseType<ReturnType<typeof getFixtures>>;
let runService: RunService;
Expand All @@ -32,7 +30,7 @@ test(`scheduling job`, async () => {
fixtures.GivenAssetsAvailable();

// when
await runService.run('scenario-1');
await runService.run({ id: 'scenario-1' });

// then
fixtures.ThenShouldUpdateScenario();
Expand All @@ -41,28 +39,43 @@ test(`scheduling job`, async () => {
fixtures.ThenShouldUseDefaultBlm();
});

test(`scheduling job with blm`, async () => {
fixtures.setupMocksForSchedulingJobs();
test(`scheduling job with overriding blm`, async () => {
fixtures.setupMocksForSchedulingJobs(() => `1234`);
// given
fixtures.GivenAssetsAvailable();

// when
await runService.run('scenario-1', -123);
await runService.run({ id: 'scenario-1', boundaryLengthModifier: 78 }, -123);

// then
fixtures.ThenShouldUpdateScenario();
fixtures.ThenShouldEmitSubmittedEvent();
fixtures.ThenShouldEmitSubmittedEvent(`1234`);
fixtures.ThenShouldAddJob();
fixtures.ThenShouldUseBlm(-123);
});

test(`scheduling job with scenario that has blm`, async () => {
fixtures.setupMocksForSchedulingJobs(() => `1234`);
// given
fixtures.GivenAssetsAvailable();

// when
await runService.run({ id: 'scenario-1', boundaryLengthModifier: 78 });

// then
fixtures.ThenShouldUpdateScenario();
fixtures.ThenShouldEmitSubmittedEvent(`1234`);
fixtures.ThenShouldAddJob();
fixtures.ThenShouldUseBlm(78);
});

test(`scheduling job for scenario without assets`, async () => {
fixtures.setupMocksForSchedulingJobs(() => `12345`);
// given
fixtures.GivenAssetsNotAvailable();

// when
const result = runService.run('scenario-1');
const result = runService.run({ id: 'scenario-1' });

// then
await expect(result).rejects.toBeDefined();
Expand Down Expand Up @@ -174,6 +187,9 @@ async function getFixtures() {
};
const testingModule = await Test.createTestingModule({
providers: [
RunHandler,
CancelHandler,
EventsHandler,
{
provide: blmDefaultToken,
useValue: 42,
Expand Down
Loading

0 comments on commit 2dc03e7

Please sign in to comment.