diff --git a/api/apps/api/src/modules/api-events/dto/scenario-geofeature-data-v1-alpha.ts b/api/apps/api/src/modules/api-events/dto/scenario-geofeature-data-v1-alpha.ts index 244d7a481e..0193870979 100644 --- a/api/apps/api/src/modules/api-events/dto/scenario-geofeature-data-v1-alpha.ts +++ b/api/apps/api/src/modules/api-events/dto/scenario-geofeature-data-v1-alpha.ts @@ -1,10 +1,7 @@ import { ApiProperty } from '@nestjs/swagger'; import { IsIn, IsUUID } from 'class-validator'; -import { API_EVENT_KINDS } from '@marxan/api-events'; -import { - ScenarioGeofeatureDataV1Alpha, - ScenarioGeofeatureEvents, -} from '../events-data/scenario-geofeature-data-v1-alpha'; +import { API_EVENT_KINDS, ScenarioGeofeatureEvents } from '@marxan/api-events'; +import { ScenarioGeofeatureDataV1Alpha } from '../events-data/scenario-geofeature-data-v1-alpha'; import { ValuesType } from 'utility-types'; // it's guarded by typing, no mismatch possible diff --git a/api/apps/api/src/modules/api-events/events-data/scenario-geofeature-data-v1-alpha.ts b/api/apps/api/src/modules/api-events/events-data/scenario-geofeature-data-v1-alpha.ts index 48783948e8..b60ab8a4b8 100644 --- a/api/apps/api/src/modules/api-events/events-data/scenario-geofeature-data-v1-alpha.ts +++ b/api/apps/api/src/modules/api-events/events-data/scenario-geofeature-data-v1-alpha.ts @@ -1,14 +1,6 @@ -import { API_EVENT_KINDS } from '@marxan/api-events'; +import { ScenarioGeofeatureEvents } from '@marxan/api-events'; import { ValuesType } from 'utility-types'; -export type ScenarioGeofeatureEvents = Pick< - typeof API_EVENT_KINDS, - Extract< - keyof typeof API_EVENT_KINDS, - `scenario__geofeature${`Copy` | `Split` | `Stratification`}${string}` - > ->; - export class ScenarioGeofeatureDataV1Alpha { kind!: ValuesType; featureId!: string; diff --git a/api/apps/api/src/modules/geo-features/processing/run.service.ts b/api/apps/api/src/modules/geo-features/processing/run.service.ts index 460932b005..141911d34c 100644 --- a/api/apps/api/src/modules/geo-features/processing/run.service.ts +++ b/api/apps/api/src/modules/geo-features/processing/run.service.ts @@ -1,17 +1,26 @@ import { Inject } from '@nestjs/common'; -import { Queue } from 'bullmq'; +import { Job, Queue } from 'bullmq'; import { CopyJobData, + FeaturesJobData, + FeaturesJobProgress, SplitJobData, StratificationJobData, } from '@marxan/geofeature-calculations'; -import { API_EVENT_KINDS } from '@marxan/api-events'; +import { + API_EVENT_KINDS, + ScenarioGeofeatureEventValues, +} from '@marxan/api-events'; import { ApiEventsService } from '@marxan-api/modules/api-events/api-events.service'; import { copyQueueToken, splitQueueToken, stratificationQueueToken, } from './queue-providers'; +import { isDefined } from '@marxan/utils'; +import { left, right, Either } from 'fp-ts/Either'; +export const notFound = Symbol('not found'); +export type NotFound = typeof notFound; export class RunService { constructor( @@ -25,39 +34,49 @@ export class RunService { ) {} async runCopy(data: CopyJobData) { - const job = await this.copyQueue.add(`run`, data); - const kind = - API_EVENT_KINDS.scenario__geofeatureCopy__submitted__v1__alpha1; - await this.apiEvents.create({ - externalId: job.id + kind, - kind, - topic: data.scenarioId, - data: { - kind, - featureId: data.featureId, - }, - }); + await this.run( + this.copyQueue, + data, + API_EVENT_KINDS.scenario__geofeatureCopy__submitted__v1__alpha1, + ); + } + + async cancelCopy(data: FeaturesJobData): Promise> { + return await this.cancel(this.copyQueue, data); } async runSplit(data: SplitJobData) { - const job = await this.splitQueue.add(`run`, data); - const kind = - API_EVENT_KINDS.scenario__geofeatureSplit__submitted__v1__alpha1; - await this.apiEvents.create({ - externalId: job.id + kind, - kind, - topic: data.scenarioId, - data: { - kind, - featureId: data.featureId, - }, - }); + await this.run( + this.splitQueue, + data, + API_EVENT_KINDS.scenario__geofeatureSplit__submitted__v1__alpha1, + ); + } + + async cancelSplit(data: FeaturesJobData): Promise> { + return await this.cancel(this.splitQueue, data); } async runStratification(data: StratificationJobData) { - const job = await this.stratificationQueue.add(`run`, data); - const kind = - API_EVENT_KINDS.scenario__geofeatureStratification__submitted__v1__alpha1; + await this.run( + this.stratificationQueue, + data, + API_EVENT_KINDS.scenario__geofeatureStratification__submitted__v1__alpha1, + ); + } + + async cancelStratification( + data: FeaturesJobData, + ): Promise> { + return await this.cancel(this.stratificationQueue, data); + } + + private async run( + queue: Queue, + data: FeaturesJobData, + kind: ScenarioGeofeatureEventValues, + ) { + const job = await queue.add(`run`, data); await this.apiEvents.create({ externalId: job.id + kind, kind, @@ -68,4 +87,33 @@ export class RunService { }, }); } + + private async cancel( + queue: Queue, + data: FeaturesJobData, + ): Promise> { + const activeJobs: Job[] = await queue.getJobs([ + 'active', + 'waiting', + ]); + const job = activeJobs.find( + (job) => + job.data.featureId === data.featureId && + job.data.scenarioId === data.scenarioId, + ); + if (!isDefined(job)) return left(notFound); + + if (await job.isWaiting()) await job.remove(); + if (await job.isActive()) { + const cancellingProgress: FeaturesJobProgress = { + type: 'canceled', + canceled: true, + featureId: data.featureId, + scenarioId: data.scenarioId, + }; + await job.updateProgress(cancellingProgress); + } + + return right(void 0); + } } diff --git a/api/apps/geoprocessing/src/modules/features/features.module.ts b/api/apps/geoprocessing/src/modules/features/features.module.ts index 1af0383a10..865bf67c6a 100644 --- a/api/apps/geoprocessing/src/modules/features/features.module.ts +++ b/api/apps/geoprocessing/src/modules/features/features.module.ts @@ -9,10 +9,13 @@ import { WorkerModule } from '@marxan-geoprocessing/modules/worker'; import { copyQueueNameProvider, copyWorkerBuilderProvider, + copyQueueEventsProvider, splitQueueNameProvider, splitWorkerBuilderProvider, + splitQueueEventsProvider, stratificationQueueNameProvider, stratificationWorkerBuilderProvider, + stratificationQueueEventsProvider, } from './processing/worker-builder.providers'; import { ProcessingWorker } from './processing/processing.worker'; @@ -26,10 +29,13 @@ import { ProcessingWorker } from './processing/processing.worker'; FeatureService, copyQueueNameProvider, copyWorkerBuilderProvider, + copyQueueEventsProvider, splitQueueNameProvider, splitWorkerBuilderProvider, + splitQueueEventsProvider, stratificationQueueNameProvider, stratificationWorkerBuilderProvider, + stratificationQueueEventsProvider, ProcessingWorker, ], controllers: [FeaturesController], diff --git a/api/apps/geoprocessing/src/modules/features/processing/processing.worker.ts b/api/apps/geoprocessing/src/modules/features/processing/processing.worker.ts index d51ab747aa..1298d95836 100644 --- a/api/apps/geoprocessing/src/modules/features/processing/processing.worker.ts +++ b/api/apps/geoprocessing/src/modules/features/processing/processing.worker.ts @@ -1,16 +1,21 @@ import { Inject, Injectable } from '@nestjs/common'; -import { Job, Worker } from 'bullmq'; +import { Job, QueueEvents, Worker } from 'bullmq'; import { CopyJobData, + FeaturesJobCancelProgress, + FeaturesJobProgress, SplitJobData, StratificationJobData, } from '@marxan/geofeature-calculations'; import { WorkerBuilder } from '@marxan-geoprocessing/modules/worker'; import { + copyQueueEventsToken, copyQueueNameToken, copyWorkerBuilderToken, + splitQueueEventsToken, splitQueueNameToken, splitWorkerBuilderToken, + stratificationQueueEventsToken, stratificationQueueNameToken, stratificationWorkerBuilderToken, } from './worker-builder.providers'; @@ -31,6 +36,12 @@ export class ProcessingWorker { stratificationQueueName: string, @Inject(stratificationWorkerBuilderToken) stratificationWorkerBuilder: WorkerBuilder, + @Inject(copyQueueEventsToken) + copyQueueEvents: QueueEvents, + @Inject(splitQueueEventsToken) + splitQueueEvents: QueueEvents, + @Inject(stratificationQueueEventsToken) + stratificationQueueEvents: QueueEvents, ) { this.copyWorker = copyWorkerBuilder.build( copyQueueName, @@ -38,29 +49,69 @@ export class ProcessingWorker { process: this.copyProcess.bind(this), }, ); + copyQueueEvents.on( + `progress`, + async ({ data }: { data: FeaturesJobProgress }) => { + if (this.isCancel(data)) await this.cancelCopy(data); + }, + ); this.splitWorker = splitWorkerBuilder.build( splitQueueName, { process: this.splitProcess.bind(this), }, ); + splitQueueEvents.on( + `progress`, + async ({ data }: { data: FeaturesJobProgress }) => { + if (this.isCancel(data)) await this.cancelSplit(data); + }, + ); this.stratificationWorker = stratificationWorkerBuilder.build< StratificationJobData, void >(stratificationQueueName, { process: this.stratificationProcess.bind(this), }); + stratificationQueueEvents.on( + `progress`, + async ({ data }: { data: FeaturesJobProgress }) => { + if (this.isCancel(data)) await this.cancelStratification(data); + }, + ); + } + + private async copyProcess(_job: Job): Promise { + // } - async copyProcess(_job: Job): Promise { + private async splitProcess(_job: Job): Promise { // } - async splitProcess(_job: Job): Promise { + private async stratificationProcess( + _job: Job, + ): Promise { // } - async stratificationProcess(_job: Job): Promise { + private async cancelCopy(_data: FeaturesJobCancelProgress): Promise { // } + + private async cancelSplit(_data: FeaturesJobCancelProgress): Promise { + // + } + + private async cancelStratification( + _data: FeaturesJobCancelProgress, + ): Promise { + // + } + + private isCancel( + data: FeaturesJobProgress, + ): data is FeaturesJobCancelProgress { + return data.type === 'canceled' && data.canceled; + } } diff --git a/api/apps/geoprocessing/src/modules/features/processing/worker-builder.providers.ts b/api/apps/geoprocessing/src/modules/features/processing/worker-builder.providers.ts index 792a331c88..6628329e31 100644 --- a/api/apps/geoprocessing/src/modules/features/processing/worker-builder.providers.ts +++ b/api/apps/geoprocessing/src/modules/features/processing/worker-builder.providers.ts @@ -5,6 +5,8 @@ import { stratificationQueueName, } from '@marxan/geofeature-calculations'; import { WorkerBuilder } from '@marxan-geoprocessing/modules/worker'; +import { QueueEvents } from 'bullmq'; +import { QueueEventsBuilder } from '@marxan-api/modules/queue'; export const copyQueueNameToken = Symbol('copy queue token'); export const copyQueueNameProvider: ValueProvider = { @@ -18,6 +20,14 @@ export const copyWorkerBuilderProvider: FactoryProvider = { inject: [WorkerBuilder], }; +export const copyQueueEventsToken = Symbol('copy queue events token'); +export const copyQueueEventsProvider: FactoryProvider = { + provide: copyQueueEventsToken, + useFactory: (queueEventsBuilder: QueueEventsBuilder, queueName: string) => + queueEventsBuilder.buildQueueEvents(queueName), + inject: [QueueEventsBuilder, copyQueueNameToken], +}; + export const splitQueueNameToken = Symbol('split queue token'); export const splitQueueNameProvider: ValueProvider = { provide: splitQueueNameToken, @@ -30,6 +40,14 @@ export const splitWorkerBuilderProvider: FactoryProvider = { inject: [WorkerBuilder], }; +export const splitQueueEventsToken = Symbol('split queue events token'); +export const splitQueueEventsProvider: FactoryProvider = { + provide: splitQueueEventsToken, + useFactory: (queueEventsBuilder: QueueEventsBuilder, queueName: string) => + queueEventsBuilder.buildQueueEvents(queueName), + inject: [QueueEventsBuilder, splitQueueNameToken], +}; + export const stratificationQueueNameToken = Symbol( 'stratification queue token', ); @@ -45,3 +63,13 @@ export const stratificationWorkerBuilderProvider: FactoryProvider useFactory: (builder: WorkerBuilder) => builder, inject: [WorkerBuilder], }; + +export const stratificationQueueEventsToken = Symbol( + 'stratification queue events token', +); +export const stratificationQueueEventsProvider: FactoryProvider = { + provide: stratificationQueueEventsToken, + useFactory: (queueEventsBuilder: QueueEventsBuilder, queueName: string) => + queueEventsBuilder.buildQueueEvents(queueName), + inject: [QueueEventsBuilder, stratificationQueueNameToken], +}; diff --git a/api/libs/api-events/src/api-event-kinds.enum.ts b/api/libs/api-events/src/api-event-kinds.enum.ts index 030caabef1..e3d27709f8 100644 --- a/api/libs/api-events/src/api-event-kinds.enum.ts +++ b/api/libs/api-events/src/api-event-kinds.enum.ts @@ -1,3 +1,5 @@ +import { ValuesType } from 'utility-types'; + export enum API_EVENT_KINDS { user__signedUp__v1alpha1 = 'user.signedUp/v1alpha1', user__accountActivationTokenGenerated__v1alpha1 = 'user.accountActivationTokenGenerated/v1alpha1', @@ -38,3 +40,13 @@ export type ScenarioEvents = Pick< typeof API_EVENT_KINDS, Extract >; + +export type ScenarioGeoFeatureEventKeys = Extract< + keyof typeof API_EVENT_KINDS, + `scenario__geofeature${`Copy` | `Split` | `Stratification`}${string}` +>; +export type ScenarioGeofeatureEvents = Pick< + typeof API_EVENT_KINDS, + ScenarioGeoFeatureEventKeys +>; +export type ScenarioGeofeatureEventValues = ValuesType; diff --git a/api/libs/api-events/src/index.ts b/api/libs/api-events/src/index.ts index ba0e594b6e..0693236354 100644 --- a/api/libs/api-events/src/index.ts +++ b/api/libs/api-events/src/index.ts @@ -1 +1 @@ -export { API_EVENT_KINDS } from './api-event-kinds.enum'; +export * from './api-event-kinds.enum'; diff --git a/api/libs/geofeature-calculations/src/index.ts b/api/libs/geofeature-calculations/src/index.ts index 9a41fc546d..d4e5030cc3 100644 --- a/api/libs/geofeature-calculations/src/index.ts +++ b/api/libs/geofeature-calculations/src/index.ts @@ -7,6 +7,16 @@ export interface FeaturesJobData { scenarioId: string; } +export type FeaturesJobCancelProgress = { + type: 'canceled'; + canceled: boolean; +} & FeaturesJobData; +export type FeaturesJobProgress = + | { + type: 'empty'; + } + | FeaturesJobCancelProgress; + export type CopyJobData = FeaturesJobData; export type SplitJobData = FeaturesJobData; export type StratificationJobData = FeaturesJobData;