Skip to content

Commit

Permalink
feat(job-queue-plugin): Implement cancellation mechanism
Browse files Browse the repository at this point in the history
Relates to #1127, relates to #2650. This commit adds a mechanism to
track cancellation of jobs in Redis using pub/sub on a custom
set which tracks the IDs which have been cancelled.

The job process functions will still continue to execute unless
there is specific logic to check the job state and throw an
error on cancellation.
  • Loading branch information
michaelbromley committed Feb 2, 2024
1 parent 6860b43 commit d0e97ca
Show file tree
Hide file tree
Showing 2 changed files with 90 additions and 20 deletions.
106 changes: 88 additions & 18 deletions packages/job-queue-plugin/src/bullmq/bullmq-job-queue-strategy.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import Bull, {
} from 'bullmq';
import { EventEmitter } from 'events';
import { Cluster, Redis, RedisOptions } from 'ioredis';
import { Subject } from 'rxjs';
import { filter, takeUntil } from 'rxjs/operators';

import { ALL_JOB_TYPES, BULLMQ_PLUGIN_OPTIONS, loggerCtx } from './constants';
import { RedisHealthIndicator } from './redis-health-indicator';
Expand All @@ -45,6 +47,10 @@ export class BullMQJobQueueStrategy implements InspectableJobQueueStrategy {
private workerProcessor: Processor;
private options: BullMQPluginOptions;
private queueNameProcessFnMap = new Map<string, (job: Job) => Promise<any>>();
private cancellationSub: Redis;
private cancelRunningJob$ = new Subject<string>();
private readonly CANCEL_JOB_CHANNEL = 'cancel-job';
private readonly CANCELLED_JOB_LIST_NAME = 'vendure:cancelled-jobs';

async init(injector: Injector): Promise<void> {
const options = injector.get<BullMQPluginOptions>(BULLMQ_PLUGIN_OPTIONS);
Expand Down Expand Up @@ -109,18 +115,38 @@ export class BullMQJobQueueStrategy implements InspectableJobQueueStrategy {
const processFn = this.queueNameProcessFnMap.get(queueName);
if (processFn) {
const job = await this.createVendureJob(bullJob);
const completed$ = new Subject<void>();
try {
// eslint-disable-next-line
job.on('progress', _job => bullJob.updateProgress(_job.progress));

this.cancelRunningJob$
.pipe(
filter(jobId => jobId === job.id),
takeUntil(completed$),
)
.subscribe(() => {
Logger.info(`Cancelling job ${job.id ?? ''}`, loggerCtx);
job.cancel();
});
const result = await processFn(job);

await bullJob.updateProgress(100);
return result;
} catch (e: any) {
throw e;
} finally {
if (job.id) {
await this.redisConnection.srem(this.CANCELLED_JOB_LIST_NAME, job.id?.toString());
}
completed$.next();
completed$.complete();
}
}
throw new InternalServerError(`No processor defined for the queue "${queueName}"`);
};
// Subscription-mode Redis connection for the cancellation messages
this.cancellationSub = new Redis(this.connectionOptions as RedisOptions);
}

async destroy() {
Expand All @@ -144,17 +170,18 @@ export class BullMQJobQueueStrategy implements InspectableJobQueueStrategy {
const bullJob = await this.queue.getJob(jobId);
if (bullJob) {
if (await bullJob.isActive()) {
// Not yet possible in BullMQ, see
// https://github.com/taskforcesh/bullmq/issues/632
throw new InternalServerError('Cannot cancel a running job');
}
try {
await bullJob.remove();
await this.setActiveJobAsCancelled(jobId);
return this.createVendureJob(bullJob);
} catch (e: any) {
const message = `Error when cancelling job: ${JSON.stringify(e.message)}`;
Logger.error(message, loggerCtx);
throw new InternalServerError(message);
} else {
try {
const job = await this.createVendureJob(bullJob);
await bullJob.remove();
return job;
} catch (e: any) {
const message = `Error when cancelling job: ${JSON.stringify(e.message)}`;
Logger.error(message, loggerCtx);
throw new InternalServerError(message);
}
}
}
}
Expand Down Expand Up @@ -262,23 +289,32 @@ export class BullMQJobQueueStrategy implements InspectableJobQueueStrategy {
this.worker = new Worker(QUEUE_NAME, this.workerProcessor, options)
.on('error', e => Logger.error(`BullMQ Worker error: ${e.message}`, loggerCtx, e.stack))
.on('closing', e => Logger.verbose(`BullMQ Worker closing: ${e}`, loggerCtx))
.on('closed', () => Logger.verbose('BullMQ Worker closed'))
.on('closed', () => Logger.verbose('BullMQ Worker closed', loggerCtx))
.on('failed', (job: Bull.Job | undefined, error) => {
Logger.warn(
`Job ${job?.id ?? '(unknown id)'} [${job?.name ?? 'unknown name'}] failed (attempt ${
job?.attemptsMade ?? 'unknown'
} of ${job?.opts.attempts ?? 1})`,
loggerCtx,
);
})
.on('stalled', (jobId: string) => {
Logger.warn(`BullMQ Worker: job ${jobId} stalled`, loggerCtx);
})
.on('completed', (job: Bull.Job) => {
Logger.debug(`Job ${job?.id ?? 'unknown id'} [${job.name}] completed`);
Logger.debug(`Job ${job?.id ?? 'unknown id'} [${job.name}] completed`, loggerCtx);
});
await this.cancellationSub.subscribe(this.CANCEL_JOB_CHANNEL);
this.cancellationSub.on('message', this.subscribeToCancellationEvents);
}
}

private subscribeToCancellationEvents = (channel: string, jobId: string) => {
if (channel === this.CANCEL_JOB_CHANNEL && jobId) {
this.cancelRunningJob$.next(jobId);
}
};

private stopped = false;

async stop<Data extends JobData<Data> = object>(
Expand All @@ -288,13 +324,44 @@ export class BullMQJobQueueStrategy implements InspectableJobQueueStrategy {
if (!this.stopped) {
this.stopped = true;
try {
await Promise.all([this.queue.close(), this.worker.close()]);
Logger.info(`Closing worker`, loggerCtx);

let timer: NodeJS.Timeout;
const checkActive = async () => {
const activeCount = await this.queue.getActiveCount();
if (0 < activeCount) {
const activeJobs = await this.queue.getActive();
Logger.info(
`Waiting on ${activeCount} active ${
activeCount > 1 ? 'jobs' : 'job'
} (${activeJobs.map(j => j.id).join(', ')})...`,
loggerCtx,
);
timer = setTimeout(checkActive, 2000);
}
};
timer = setTimeout(checkActive, 2000);

await this.worker.close();
Logger.info(`Worker closed`, loggerCtx);
await this.queue.close();
clearTimeout(timer);
Logger.info(`Queue closed`, loggerCtx);
this.cancellationSub.off('message', this.subscribeToCancellationEvents);
} catch (e: any) {
Logger.error(e, loggerCtx, e.stack);
}
}
}

private async setActiveJobAsCancelled(jobId: string) {
// Not yet possible natively in BullMQ, see
// https://github.com/taskforcesh/bullmq/issues/632
// So we have our own custom method of marking a job as cancelled.
await this.redisConnection.publish(this.CANCEL_JOB_CHANNEL, jobId);
await this.redisConnection.sadd(this.CANCELLED_JOB_LIST_NAME, jobId.toString());
}

private async createVendureJob(bullJob: Bull.Job): Promise<Job> {
const jobJson = bullJob.toJSON();
return new Job({
Expand All @@ -314,13 +381,19 @@ export class BullMQJobQueueStrategy implements InspectableJobQueueStrategy {
}

private async getState(bullJob: Bull.Job): Promise<JobState> {
const jobJson = bullJob.toJSON();
const jobId = bullJob.id?.toString();

if ((await bullJob.isWaiting()) || (await bullJob.isWaitingChildren())) {
return JobState.PENDING;
}
if (await bullJob.isActive()) {
return JobState.RUNNING;
const isCancelled =
jobId && (await this.redisConnection.sismember(this.CANCELLED_JOB_LIST_NAME, jobId));
if (isCancelled) {
return JobState.CANCELLED;
} else {
return JobState.RUNNING;
}
}
if (await bullJob.isDelayed()) {
return JobState.RETRYING;
Expand All @@ -331,9 +404,6 @@ export class BullMQJobQueueStrategy implements InspectableJobQueueStrategy {
if (await bullJob.isCompleted()) {
return JobState.COMPLETED;
}
if (!jobJson.finishedOn) {
return JobState.CANCELLED;
}
throw new InternalServerError('Could not determine job state');
// TODO: how to handle "cancelled" state? Currently when we cancel a job, we simply remove all record of it.
}
Expand Down
4 changes: 2 additions & 2 deletions packages/job-queue-plugin/src/bullmq/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,14 @@ export interface BullMQPluginOptions {
* Queue instance.
* See the [BullMQ QueueOptions docs](https://github.com/taskforcesh/bullmq/blob/master/docs/gitbook/api/bullmq.queueoptions.md)
*/
queueOptions?: Exclude<QueueOptions, 'connection'>;
queueOptions?: Omit<QueueOptions, 'connection'>;
/**
* @description
* Additional options used when instantiating the BullMQ
* Worker instance.
* See the [BullMQ WorkerOptions docs](https://github.com/taskforcesh/bullmq/blob/master/docs/gitbook/api/bullmq.workeroptions.md)
*/
workerOptions?: Exclude<WorkerOptions, 'connection'>;
workerOptions?: Omit<WorkerOptions, 'connection'>;
/**
* @description
* When a job is added to the JobQueue using `JobQueue.add()`, the calling
Expand Down

0 comments on commit d0e97ca

Please sign in to comment.