diff --git a/api/apps/api/src/modules/scenarios/marxan-run/cancel.handler.ts b/api/apps/api/src/modules/scenarios/marxan-run/cancel.handler.ts new file mode 100644 index 0000000000..ab07695e73 --- /dev/null +++ b/api/apps/api/src/modules/scenarios/marxan-run/cancel.handler.ts @@ -0,0 +1,37 @@ +import { Inject, Injectable } from '@nestjs/common'; +import { Job, Queue } from 'bullmq'; +import { Either, left, right } from 'fp-ts/Either'; +import { JobData } from '@marxan/scenario-run-queue'; +import { isDefined } from '@marxan/utils'; +import { runQueueToken } from './tokens'; + +export const notFound = Symbol('not found'); +export type NotFound = typeof notFound; + +@Injectable() +export class CancelHandler { + constructor( + @Inject(runQueueToken) + private readonly queue: Queue, + ) {} + + async cancel(scenarioId: string): Promise> { + const activeJobs: Job[] = await this.queue.getJobs([ + 'active', + 'waiting', + ]); + const scenarioJob = activeJobs.find( + (job) => job.data.scenarioId === scenarioId, + ); + if (!isDefined(scenarioJob)) return left(notFound); + + if (await scenarioJob.isActive()) + await scenarioJob.updateProgress({ + canceled: true, + scenarioId, + }); + else if (await scenarioJob.isWaiting()) await scenarioJob.remove(); + + return right(void 0); + } +} diff --git a/api/apps/api/src/modules/scenarios/marxan-run/events.handler.ts b/api/apps/api/src/modules/scenarios/marxan-run/events.handler.ts new file mode 100644 index 0000000000..f4bd897506 --- /dev/null +++ b/api/apps/api/src/modules/scenarios/marxan-run/events.handler.ts @@ -0,0 +1,86 @@ +import { Inject, Injectable } from '@nestjs/common'; +import { Job, Queue, QueueEvents } from 'bullmq'; +import { JobData, ProgressData } from '@marxan/scenario-run-queue'; +import { API_EVENT_KINDS } from '@marxan/api-events'; +import { assertDefined } from '@marxan/utils'; +import { ApiEventsService } from '@marxan-api/modules/api-events/api-events.service'; +import { ScenarioRunProgressV1Alpha1DTO } from '@marxan-api/modules/api-events/dto/scenario-run-progress-v1-alpha-1'; +import { runEventsToken, runQueueToken } from './tokens'; + +@Injectable() +export class EventsHandler { + constructor( + @Inject(runQueueToken) + private readonly queue: Queue, + @Inject(runEventsToken) + queueEvents: QueueEvents, + private readonly apiEvents: ApiEventsService, + ) { + queueEvents.on(`completed`, ({ jobId }, eventId) => + this.handleFinished(jobId, eventId), + ); + queueEvents.on(`failed`, ({ jobId }, eventId) => + this.handleFailed(jobId, eventId), + ); + queueEvents.on( + `progress`, + async ( + { jobId, data }: { data: ProgressData; jobId: string }, + eventId, + ) => { + await this.handleProgress(jobId, eventId, data); + }, + ); + } + + private async handleProgress( + jobId: string, + eventId: string, + progress: ProgressData | null, + ) { + if ( + typeof progress !== 'object' || + progress === null || + !('fractionalProgress' in progress) + ) + return; + const job = await this.getJob(jobId); + const kind = API_EVENT_KINDS.scenario__run__progress__v1__alpha1; + const eventData: ScenarioRunProgressV1Alpha1DTO = { + kind, + fractionalProgress: progress.fractionalProgress, + }; + await this.apiEvents.createIfNotExists({ + topic: job.data.scenarioId, + kind, + externalId: eventId, + data: eventData, + }); + } + + private async handleFinished(jobId: string, eventId: string) { + const job = await this.getJob(jobId); + const kind = API_EVENT_KINDS.scenario__run__finished__v1__alpha1; + await this.apiEvents.createIfNotExists({ + topic: job.data.scenarioId, + kind, + externalId: eventId, + }); + } + + private async handleFailed(jobId: string, eventId: string) { + const job = await this.getJob(jobId); + const kind = API_EVENT_KINDS.scenario__run__failed__v1__alpha1; + await this.apiEvents.createIfNotExists({ + topic: job.data.scenarioId, + kind, + externalId: eventId, + }); + } + + private async getJob(jobId: string): Promise> { + const job = await this.queue.getJob(jobId); + assertDefined(job); + return job; + } +} diff --git a/api/apps/api/src/modules/scenarios/marxan-run/index.ts b/api/apps/api/src/modules/scenarios/marxan-run/index.ts index 89fe93b9fd..489809238c 100644 --- a/api/apps/api/src/modules/scenarios/marxan-run/index.ts +++ b/api/apps/api/src/modules/scenarios/marxan-run/index.ts @@ -1,2 +1,4 @@ -export { RunService, notFound, NotFound } from './run.service'; +export { RunService } from './run.service'; export { MarxanRunModule } from './marxan-run.module'; +export { NotFound } from './cancel.handler'; +export { notFound } from './cancel.handler'; diff --git a/api/apps/api/src/modules/scenarios/marxan-run/marxan-run.module.ts b/api/apps/api/src/modules/scenarios/marxan-run/marxan-run.module.ts index 20207f6938..817110cd20 100644 --- a/api/apps/api/src/modules/scenarios/marxan-run/marxan-run.module.ts +++ b/api/apps/api/src/modules/scenarios/marxan-run/marxan-run.module.ts @@ -12,6 +12,9 @@ import { runQueueEventsProvider, runQueueProvider, } from './run-service.providers'; +import { RunHandler } from './run.handler'; +import { CancelHandler } from './cancel.handler'; +import { EventsHandler } from './events.handler'; @Module({ imports: [ @@ -21,6 +24,9 @@ import { InputFilesModule, ], providers: [ + RunHandler, + CancelHandler, + EventsHandler, runQueueProvider, runQueueEventsProvider, blmDefaultProvider, diff --git a/api/apps/api/src/modules/scenarios/marxan-run/run-service.providers.ts b/api/apps/api/src/modules/scenarios/marxan-run/run-service.providers.ts index fa43b8b01c..ab40f9115f 100644 --- a/api/apps/api/src/modules/scenarios/marxan-run/run-service.providers.ts +++ b/api/apps/api/src/modules/scenarios/marxan-run/run-service.providers.ts @@ -1,13 +1,9 @@ import { FactoryProvider } from '@nestjs/common'; import { Queue, QueueEvents } from 'bullmq'; import { JobData, queueName } from '@marxan/scenario-run-queue'; -import { QueueBuilder, QueueEventsBuilder } from '@marxan-api/modules/queue'; -import { - blmDefaultToken, - runEventsToken, - runQueueToken, -} from '@marxan-api/modules/scenarios/marxan-run/run.service'; import { MarxanParametersDefaults } from '@marxan/marxan-input'; +import { QueueBuilder, QueueEventsBuilder } from '@marxan-api/modules/queue'; +import { blmDefaultToken, runEventsToken, runQueueToken } from './tokens'; export const runQueueProvider: FactoryProvider> = { provide: runQueueToken, diff --git a/api/apps/api/src/modules/scenarios/marxan-run/run.handler.ts b/api/apps/api/src/modules/scenarios/marxan-run/run.handler.ts new file mode 100644 index 0000000000..40ed84b9c8 --- /dev/null +++ b/api/apps/api/src/modules/scenarios/marxan-run/run.handler.ts @@ -0,0 +1,52 @@ +import { Inject, Injectable } from '@nestjs/common'; +import { Queue } from 'bullmq'; +import { Repository } from 'typeorm'; +import { InjectRepository } from '@nestjs/typeorm'; + +import { JobData } from '@marxan/scenario-run-queue'; +import { API_EVENT_KINDS } from '@marxan/api-events'; +import { assertDefined } from '@marxan/utils'; +import { ApiEventsService } from '@marxan-api/modules/api-events/api-events.service'; +import { Scenario } from '../scenario.api.entity'; +import { blmDefaultToken, runQueueToken } from './tokens'; +import { AssetsService } from './assets.service'; + +@Injectable() +export class RunHandler { + constructor( + @Inject(runQueueToken) + private readonly queue: Queue, + private readonly apiEvents: ApiEventsService, + @InjectRepository(Scenario) + private readonly scenarios: Repository, + private readonly assets: AssetsService, + @Inject(blmDefaultToken) + private readonly blmDefault: number, + ) {} + + async run( + scenario: { + id: string; + boundaryLengthModifier?: number; + }, + overridingBlm?: number, + ): Promise { + const blm = + overridingBlm ?? scenario.boundaryLengthModifier ?? this.blmDefault; + const assets = await this.assets.forScenario(scenario.id, blm); + assertDefined(assets); + const job = await this.queue.add(`run-scenario`, { + scenarioId: scenario.id, + assets, + }); + await this.scenarios.update(scenario.id, { + ranAtLeastOnce: true, + }); + const kind = API_EVENT_KINDS.scenario__run__submitted__v1__alpha1; + await this.apiEvents.create({ + topic: scenario.id, + kind, + externalId: job.id + kind, + }); + } +} diff --git a/api/apps/api/src/modules/scenarios/marxan-run/run.service.spec.ts b/api/apps/api/src/modules/scenarios/marxan-run/run.service.spec.ts index f4ba2a8fa2..53c6bd6bdf 100644 --- a/api/apps/api/src/modules/scenarios/marxan-run/run.service.spec.ts +++ b/api/apps/api/src/modules/scenarios/marxan-run/run.service.spec.ts @@ -9,14 +9,12 @@ import { ProgressData } from '@marxan/scenario-run-queue'; import { ApiEventsService } from '@marxan-api/modules/api-events/api-events.service'; import { CreateApiEventDTO } from '@marxan-api/modules/api-events/dto/create.api-event.dto'; import { Scenario } from '../scenario.api.entity'; -import { - blmDefaultToken, - notFound, - runEventsToken, - runQueueToken, - RunService, -} from './run.service'; +import { RunService } from './run.service'; import { AssetsService } from './assets.service'; +import { blmDefaultToken, runEventsToken, runQueueToken } from './tokens'; +import { RunHandler } from './run.handler'; +import { CancelHandler, notFound } from './cancel.handler'; +import { EventsHandler } from './events.handler'; let fixtures: PromiseType>; let runService: RunService; @@ -32,7 +30,7 @@ test(`scheduling job`, async () => { fixtures.GivenAssetsAvailable(); // when - await runService.run('scenario-1'); + await runService.run({ id: 'scenario-1' }); // then fixtures.ThenShouldUpdateScenario(); @@ -41,28 +39,43 @@ test(`scheduling job`, async () => { fixtures.ThenShouldUseDefaultBlm(); }); -test(`scheduling job with blm`, async () => { - fixtures.setupMocksForSchedulingJobs(); +test(`scheduling job with overriding blm`, async () => { + fixtures.setupMocksForSchedulingJobs(() => `1234`); // given fixtures.GivenAssetsAvailable(); // when - await runService.run('scenario-1', -123); + await runService.run({ id: 'scenario-1', boundaryLengthModifier: 78 }, -123); // then fixtures.ThenShouldUpdateScenario(); - fixtures.ThenShouldEmitSubmittedEvent(); + fixtures.ThenShouldEmitSubmittedEvent(`1234`); fixtures.ThenShouldAddJob(); fixtures.ThenShouldUseBlm(-123); }); +test(`scheduling job with scenario that has blm`, async () => { + fixtures.setupMocksForSchedulingJobs(() => `1234`); + // given + fixtures.GivenAssetsAvailable(); + + // when + await runService.run({ id: 'scenario-1', boundaryLengthModifier: 78 }); + + // then + fixtures.ThenShouldUpdateScenario(); + fixtures.ThenShouldEmitSubmittedEvent(`1234`); + fixtures.ThenShouldAddJob(); + fixtures.ThenShouldUseBlm(78); +}); + test(`scheduling job for scenario without assets`, async () => { fixtures.setupMocksForSchedulingJobs(() => `12345`); // given fixtures.GivenAssetsNotAvailable(); // when - const result = runService.run('scenario-1'); + const result = runService.run({ id: 'scenario-1' }); // then await expect(result).rejects.toBeDefined(); @@ -174,6 +187,9 @@ async function getFixtures() { }; const testingModule = await Test.createTestingModule({ providers: [ + RunHandler, + CancelHandler, + EventsHandler, { provide: blmDefaultToken, useValue: 42, diff --git a/api/apps/api/src/modules/scenarios/marxan-run/run.service.ts b/api/apps/api/src/modules/scenarios/marxan-run/run.service.ts index 09e032d536..89cc859c86 100644 --- a/api/apps/api/src/modules/scenarios/marxan-run/run.service.ts +++ b/api/apps/api/src/modules/scenarios/marxan-run/run.service.ts @@ -1,141 +1,16 @@ -import { Job, Queue, QueueEvents } from 'bullmq'; -import { Either, left, right } from 'fp-ts/Either'; -import { Inject, Injectable } from '@nestjs/common'; -import { Repository } from 'typeorm'; -import { InjectRepository } from '@nestjs/typeorm'; -import { assertDefined, isDefined } from '@marxan/utils'; -import { JobData, ProgressData } from '@marxan/scenario-run-queue'; -import { ScenarioRunProgressV1Alpha1DTO } from '@marxan-api/modules/api-events/dto/scenario-run-progress-v1-alpha-1'; -import { ApiEventsService } from '@marxan-api/modules/api-events/api-events.service'; -import { API_EVENT_KINDS } from '@marxan/api-events'; -import { Scenario } from '../scenario.api.entity'; -import { AssetsService } from './assets.service'; - -export const runQueueToken = Symbol('run queue token'); -export const runEventsToken = Symbol('run events token'); -export const blmDefaultToken = Symbol('blm default token'); - -export const notFound = Symbol('not found'); -export type NotFound = typeof notFound; +import { Injectable } from '@nestjs/common'; +import { RunHandler } from './run.handler'; +import { CancelHandler } from './cancel.handler'; +import { EventsHandler } from './events.handler'; @Injectable() export class RunService { constructor( - @Inject(runQueueToken) - private readonly queue: Queue, - @Inject(runEventsToken) - queueEvents: QueueEvents, - private readonly apiEvents: ApiEventsService, - @InjectRepository(Scenario) - private readonly scenarios: Repository, - private readonly assets: AssetsService, - @Inject(blmDefaultToken) - private readonly blmDefault: number, - ) { - queueEvents.on(`completed`, ({ jobId }, eventId) => - this.handleFinished(jobId, eventId), - ); - queueEvents.on(`failed`, ({ jobId }, eventId) => - this.handleFailed(jobId, eventId), - ); - queueEvents.on( - `progress`, - async ( - { jobId, data }: { data: ProgressData; jobId: string }, - eventId, - ) => { - await this.handleProgress(jobId, eventId, data); - }, - ); - } - - async run(scenarioId: string, blm?: number): Promise { - blm ??= this.blmDefault; - const assets = await this.assets.forScenario(scenarioId, blm); - assertDefined(assets); - const job = await this.queue.add(`run-scenario`, { - scenarioId, - assets, - }); - await this.scenarios.update(scenarioId, { - ranAtLeastOnce: true, - }); - const kind = API_EVENT_KINDS.scenario__run__submitted__v1__alpha1; - await this.apiEvents.create({ - topic: scenarioId, - kind, - externalId: job.id + kind, - }); - } - - async cancel(scenarioId: string): Promise> { - const activeJobs: Job[] = await this.queue.getJobs([ - 'active', - 'waiting', - ]); - const scenarioJob = activeJobs.find( - (job) => job.data.scenarioId === scenarioId, - ); - if (!isDefined(scenarioJob)) return left(notFound); - - if (await scenarioJob.isActive()) - await scenarioJob.updateProgress({ - canceled: true, - scenarioId, - }); - else if (await scenarioJob.isWaiting()) await scenarioJob.remove(); - - return right(void 0); - } - - private async handleProgress( - jobId: string, - eventId: string, - progress: ProgressData | null, - ) { - if ( - typeof progress !== 'object' || - progress === null || - !('fractionalProgress' in progress) - ) - return; - const job = await this.getJob(jobId); - const kind = API_EVENT_KINDS.scenario__run__progress__v1__alpha1; - const eventData: ScenarioRunProgressV1Alpha1DTO = { - kind, - fractionalProgress: progress.fractionalProgress, - }; - await this.apiEvents.createIfNotExists({ - topic: job.data.scenarioId, - kind, - externalId: eventId, - data: eventData, - }); - } - - private async handleFinished(jobId: string, eventId: string) { - const job = await this.getJob(jobId); - const kind = API_EVENT_KINDS.scenario__run__finished__v1__alpha1; - await this.apiEvents.createIfNotExists({ - topic: job.data.scenarioId, - kind, - externalId: eventId, - }); - } - - private async handleFailed(jobId: string, eventId: string) { - const job = await this.getJob(jobId); - const kind = API_EVENT_KINDS.scenario__run__failed__v1__alpha1; - await this.apiEvents.createIfNotExists({ - topic: job.data.scenarioId, - kind, - externalId: eventId, - }); - } + private readonly runHandler: RunHandler, + private readonly cancelHandler: CancelHandler, + private readonly _eventsHandler: EventsHandler, + ) {} - private async getJob(jobId: string): Promise> { - const job = await this.queue.getJob(jobId); - assertDefined(job); - return job; - } + run = this.runHandler.run.bind(this.runHandler); + cancel = this.cancelHandler.cancel.bind(this.cancelHandler); } diff --git a/api/apps/api/src/modules/scenarios/marxan-run/tokens.ts b/api/apps/api/src/modules/scenarios/marxan-run/tokens.ts new file mode 100644 index 0000000000..401289d31d --- /dev/null +++ b/api/apps/api/src/modules/scenarios/marxan-run/tokens.ts @@ -0,0 +1,3 @@ +export const blmDefaultToken = Symbol('blm default token'); +export const runQueueToken = Symbol('run queue token'); +export const runEventsToken = Symbol('run events token'); diff --git a/api/apps/api/src/modules/scenarios/scenarios.service.ts b/api/apps/api/src/modules/scenarios/scenarios.service.ts index f98be1879c..7d6cd53df8 100644 --- a/api/apps/api/src/modules/scenarios/scenarios.service.ts +++ b/api/apps/api/src/modules/scenarios/scenarios.service.ts @@ -10,6 +10,7 @@ import { FetchSpecification } from 'nestjs-base-service'; import { classToClass } from 'class-transformer'; import * as stream from 'stream'; import { isLeft } from 'fp-ts/Either'; +import { pick } from 'lodash'; import { MarxanInput, MarxanParameters } from '@marxan/marxan-input'; import { AppInfoDTO } from '@marxan-api/dto/info.dto'; @@ -181,8 +182,8 @@ export class ScenariosService { async run(scenarioId: string, blm?: number): Promise { const scenario = await this.assertScenario(scenarioId); await this.runService.run( - scenario.id, - blm ?? scenario.boundaryLengthModifier, + pick(scenario, 'id', 'boundaryLengthModifier'), + blm, ); }