Skip to content

Commit

Permalink
feat(api): add canceling to processing jobs
Browse files Browse the repository at this point in the history
  • Loading branch information
Dyostiq committed Aug 10, 2021
1 parent 8bc1c32 commit 2c4e879
Show file tree
Hide file tree
Showing 9 changed files with 192 additions and 48 deletions.
Original file line number Diff line number Diff line change
@@ -1,10 +1,7 @@
import { ApiProperty } from '@nestjs/swagger';
import { IsIn, IsUUID } from 'class-validator';
import { API_EVENT_KINDS } from '@marxan/api-events';
import {
ScenarioGeofeatureDataV1Alpha,
ScenarioGeofeatureEvents,
} from '../events-data/scenario-geofeature-data-v1-alpha';
import { API_EVENT_KINDS, ScenarioGeofeatureEvents } from '@marxan/api-events';
import { ScenarioGeofeatureDataV1Alpha } from '../events-data/scenario-geofeature-data-v1-alpha';
import { ValuesType } from 'utility-types';

// it's guarded by typing, no mismatch possible
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,6 @@
import { API_EVENT_KINDS } from '@marxan/api-events';
import { ScenarioGeofeatureEvents } from '@marxan/api-events';
import { ValuesType } from 'utility-types';

export type ScenarioGeofeatureEvents = Pick<
typeof API_EVENT_KINDS,
Extract<
keyof typeof API_EVENT_KINDS,
`scenario__geofeature${`Copy` | `Split` | `Stratification`}${string}`
>
>;

export class ScenarioGeofeatureDataV1Alpha {
kind!: ValuesType<ScenarioGeofeatureEvents>;
featureId!: string;
Expand Down
106 changes: 77 additions & 29 deletions api/apps/api/src/modules/geo-features/processing/run.service.ts
Original file line number Diff line number Diff line change
@@ -1,17 +1,26 @@
import { Inject } from '@nestjs/common';
import { Queue } from 'bullmq';
import { Job, Queue } from 'bullmq';
import {
CopyJobData,
FeaturesJobData,
FeaturesJobProgress,
SplitJobData,
StratificationJobData,
} from '@marxan/geofeature-calculations';
import { API_EVENT_KINDS } from '@marxan/api-events';
import {
API_EVENT_KINDS,
ScenarioGeofeatureEventValues,
} from '@marxan/api-events';
import { ApiEventsService } from '@marxan-api/modules/api-events/api-events.service';
import {
copyQueueToken,
splitQueueToken,
stratificationQueueToken,
} from './queue-providers';
import { isDefined } from '@marxan/utils';
import { left, right, Either } from 'fp-ts/Either';
export const notFound = Symbol('not found');
export type NotFound = typeof notFound;

export class RunService {
constructor(
Expand All @@ -25,39 +34,49 @@ export class RunService {
) {}

async runCopy(data: CopyJobData) {
const job = await this.copyQueue.add(`run`, data);
const kind =
API_EVENT_KINDS.scenario__geofeatureCopy__submitted__v1__alpha1;
await this.apiEvents.create({
externalId: job.id + kind,
kind,
topic: data.scenarioId,
data: {
kind,
featureId: data.featureId,
},
});
await this.run(
this.copyQueue,
data,
API_EVENT_KINDS.scenario__geofeatureCopy__submitted__v1__alpha1,
);
}

async cancelCopy(data: FeaturesJobData): Promise<Either<NotFound, void>> {
return await this.cancel(this.copyQueue, data);
}

async runSplit(data: SplitJobData) {
const job = await this.splitQueue.add(`run`, data);
const kind =
API_EVENT_KINDS.scenario__geofeatureSplit__submitted__v1__alpha1;
await this.apiEvents.create({
externalId: job.id + kind,
kind,
topic: data.scenarioId,
data: {
kind,
featureId: data.featureId,
},
});
await this.run(
this.splitQueue,
data,
API_EVENT_KINDS.scenario__geofeatureSplit__submitted__v1__alpha1,
);
}

async cancelSplit(data: FeaturesJobData): Promise<Either<NotFound, void>> {
return await this.cancel(this.splitQueue, data);
}

async runStratification(data: StratificationJobData) {
const job = await this.stratificationQueue.add(`run`, data);
const kind =
API_EVENT_KINDS.scenario__geofeatureStratification__submitted__v1__alpha1;
await this.run(
this.stratificationQueue,
data,
API_EVENT_KINDS.scenario__geofeatureStratification__submitted__v1__alpha1,
);
}

async cancelStratification(
data: FeaturesJobData,
): Promise<Either<NotFound, void>> {
return await this.cancel(this.stratificationQueue, data);
}

private async run(
queue: Queue<FeaturesJobData>,
data: FeaturesJobData,
kind: ScenarioGeofeatureEventValues,
) {
const job = await queue.add(`run`, data);
await this.apiEvents.create({
externalId: job.id + kind,
kind,
Expand All @@ -68,4 +87,33 @@ export class RunService {
},
});
}

private async cancel(
queue: Queue<FeaturesJobData>,
data: FeaturesJobData,
): Promise<Either<NotFound, void>> {
const activeJobs: Job<FeaturesJobData>[] = await queue.getJobs([
'active',
'waiting',
]);
const job = activeJobs.find(
(job) =>
job.data.featureId === data.featureId &&
job.data.scenarioId === data.scenarioId,
);
if (!isDefined(job)) return left(notFound);

if (await job.isWaiting()) await job.remove();
if (await job.isActive()) {
const cancellingProgress: FeaturesJobProgress = {
type: 'canceled',
canceled: true,
featureId: data.featureId,
scenarioId: data.scenarioId,
};
await job.updateProgress(cancellingProgress);
}

return right(void 0);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,13 @@ import { WorkerModule } from '@marxan-geoprocessing/modules/worker';
import {
copyQueueNameProvider,
copyWorkerBuilderProvider,
copyQueueEventsProvider,
splitQueueNameProvider,
splitWorkerBuilderProvider,
splitQueueEventsProvider,
stratificationQueueNameProvider,
stratificationWorkerBuilderProvider,
stratificationQueueEventsProvider,
} from './processing/worker-builder.providers';
import { ProcessingWorker } from './processing/processing.worker';

Expand All @@ -26,10 +29,13 @@ import { ProcessingWorker } from './processing/processing.worker';
FeatureService,
copyQueueNameProvider,
copyWorkerBuilderProvider,
copyQueueEventsProvider,
splitQueueNameProvider,
splitWorkerBuilderProvider,
splitQueueEventsProvider,
stratificationQueueNameProvider,
stratificationWorkerBuilderProvider,
stratificationQueueEventsProvider,
ProcessingWorker,
],
controllers: [FeaturesController],
Expand Down
Original file line number Diff line number Diff line change
@@ -1,16 +1,21 @@
import { Inject, Injectable } from '@nestjs/common';
import { Job, Worker } from 'bullmq';
import { Job, QueueEvents, Worker } from 'bullmq';
import {
CopyJobData,
FeaturesJobCancelProgress,
FeaturesJobProgress,
SplitJobData,
StratificationJobData,
} from '@marxan/geofeature-calculations';
import { WorkerBuilder } from '@marxan-geoprocessing/modules/worker';
import {
copyQueueEventsToken,
copyQueueNameToken,
copyWorkerBuilderToken,
splitQueueEventsToken,
splitQueueNameToken,
splitWorkerBuilderToken,
stratificationQueueEventsToken,
stratificationQueueNameToken,
stratificationWorkerBuilderToken,
} from './worker-builder.providers';
Expand All @@ -31,36 +36,82 @@ export class ProcessingWorker {
stratificationQueueName: string,
@Inject(stratificationWorkerBuilderToken)
stratificationWorkerBuilder: WorkerBuilder,
@Inject(copyQueueEventsToken)
copyQueueEvents: QueueEvents,
@Inject(splitQueueEventsToken)
splitQueueEvents: QueueEvents,
@Inject(stratificationQueueEventsToken)
stratificationQueueEvents: QueueEvents,
) {
this.copyWorker = copyWorkerBuilder.build<CopyJobData, void>(
copyQueueName,
{
process: this.copyProcess.bind(this),
},
);
copyQueueEvents.on(
`progress`,
async ({ data }: { data: FeaturesJobProgress }) => {
if (this.isCancel(data)) await this.cancelCopy(data);
},
);
this.splitWorker = splitWorkerBuilder.build<SplitJobData, void>(
splitQueueName,
{
process: this.splitProcess.bind(this),
},
);
splitQueueEvents.on(
`progress`,
async ({ data }: { data: FeaturesJobProgress }) => {
if (this.isCancel(data)) await this.cancelSplit(data);
},
);
this.stratificationWorker = stratificationWorkerBuilder.build<
StratificationJobData,
void
>(stratificationQueueName, {
process: this.stratificationProcess.bind(this),
});
stratificationQueueEvents.on(
`progress`,
async ({ data }: { data: FeaturesJobProgress }) => {
if (this.isCancel(data)) await this.cancelStratification(data);
},
);
}

private async copyProcess(_job: Job<CopyJobData>): Promise<void> {
//
}

async copyProcess(_job: Job<CopyJobData>): Promise<void> {
private async splitProcess(_job: Job<SplitJobData>): Promise<void> {
//
}

async splitProcess(_job: Job<SplitJobData>): Promise<void> {
private async stratificationProcess(
_job: Job<StratificationJobData>,
): Promise<void> {
//
}

async stratificationProcess(_job: Job<StratificationJobData>): Promise<void> {
private async cancelCopy(_data: FeaturesJobCancelProgress): Promise<void> {
//
}

private async cancelSplit(_data: FeaturesJobCancelProgress): Promise<void> {
//
}

private async cancelStratification(
_data: FeaturesJobCancelProgress,
): Promise<void> {
//
}

private isCancel(
data: FeaturesJobProgress,
): data is FeaturesJobCancelProgress {
return data.type === 'canceled' && data.canceled;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import {
stratificationQueueName,
} from '@marxan/geofeature-calculations';
import { WorkerBuilder } from '@marxan-geoprocessing/modules/worker';
import { QueueEvents } from 'bullmq';
import { QueueEventsBuilder } from '@marxan-api/modules/queue';

export const copyQueueNameToken = Symbol('copy queue token');
export const copyQueueNameProvider: ValueProvider<string> = {
Expand All @@ -18,6 +20,14 @@ export const copyWorkerBuilderProvider: FactoryProvider<WorkerBuilder> = {
inject: [WorkerBuilder],
};

export const copyQueueEventsToken = Symbol('copy queue events token');
export const copyQueueEventsProvider: FactoryProvider<QueueEvents> = {
provide: copyQueueEventsToken,
useFactory: (queueEventsBuilder: QueueEventsBuilder, queueName: string) =>
queueEventsBuilder.buildQueueEvents(queueName),
inject: [QueueEventsBuilder, copyQueueNameToken],
};

export const splitQueueNameToken = Symbol('split queue token');
export const splitQueueNameProvider: ValueProvider<string> = {
provide: splitQueueNameToken,
Expand All @@ -30,6 +40,14 @@ export const splitWorkerBuilderProvider: FactoryProvider<WorkerBuilder> = {
inject: [WorkerBuilder],
};

export const splitQueueEventsToken = Symbol('split queue events token');
export const splitQueueEventsProvider: FactoryProvider<QueueEvents> = {
provide: splitQueueEventsToken,
useFactory: (queueEventsBuilder: QueueEventsBuilder, queueName: string) =>
queueEventsBuilder.buildQueueEvents(queueName),
inject: [QueueEventsBuilder, splitQueueNameToken],
};

export const stratificationQueueNameToken = Symbol(
'stratification queue token',
);
Expand All @@ -45,3 +63,13 @@ export const stratificationWorkerBuilderProvider: FactoryProvider<WorkerBuilder>
useFactory: (builder: WorkerBuilder) => builder,
inject: [WorkerBuilder],
};

export const stratificationQueueEventsToken = Symbol(
'stratification queue events token',
);
export const stratificationQueueEventsProvider: FactoryProvider<QueueEvents> = {
provide: stratificationQueueEventsToken,
useFactory: (queueEventsBuilder: QueueEventsBuilder, queueName: string) =>
queueEventsBuilder.buildQueueEvents(queueName),
inject: [QueueEventsBuilder, stratificationQueueNameToken],
};
12 changes: 12 additions & 0 deletions api/libs/api-events/src/api-event-kinds.enum.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import { ValuesType } from 'utility-types';

export enum API_EVENT_KINDS {
user__signedUp__v1alpha1 = 'user.signedUp/v1alpha1',
user__accountActivationTokenGenerated__v1alpha1 = 'user.accountActivationTokenGenerated/v1alpha1',
Expand Down Expand Up @@ -38,3 +40,13 @@ export type ScenarioEvents = Pick<
typeof API_EVENT_KINDS,
Extract<keyof typeof API_EVENT_KINDS, `scenario__${string}`>
>;

export type ScenarioGeoFeatureEventKeys = Extract<
keyof typeof API_EVENT_KINDS,
`scenario__geofeature${`Copy` | `Split` | `Stratification`}${string}`
>;
export type ScenarioGeofeatureEvents = Pick<
typeof API_EVENT_KINDS,
ScenarioGeoFeatureEventKeys
>;
export type ScenarioGeofeatureEventValues = ValuesType<ScenarioGeofeatureEvents>;
2 changes: 1 addition & 1 deletion api/libs/api-events/src/index.ts
Original file line number Diff line number Diff line change
@@ -1 +1 @@
export { API_EVENT_KINDS } from './api-event-kinds.enum';
export * from './api-event-kinds.enum';
10 changes: 10 additions & 0 deletions api/libs/geofeature-calculations/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,16 @@ export interface FeaturesJobData {
scenarioId: string;
}

export type FeaturesJobCancelProgress = {
type: 'canceled';
canceled: boolean;
} & FeaturesJobData;
export type FeaturesJobProgress =
| {
type: 'empty';
}
| FeaturesJobCancelProgress;

export type CopyJobData = FeaturesJobData;
export type SplitJobData = FeaturesJobData;
export type StratificationJobData = FeaturesJobData;

0 comments on commit 2c4e879

Please sign in to comment.