diff --git a/packages/job-queue-plugin/src/bullmq/bullmq-job-queue-strategy.ts b/packages/job-queue-plugin/src/bullmq/bullmq-job-queue-strategy.ts index 9400c62284..75fec49cf4 100644 --- a/packages/job-queue-plugin/src/bullmq/bullmq-job-queue-strategy.ts +++ b/packages/job-queue-plugin/src/bullmq/bullmq-job-queue-strategy.ts @@ -21,6 +21,8 @@ import Bull, { } from 'bullmq'; import { EventEmitter } from 'events'; import { Cluster, Redis, RedisOptions } from 'ioredis'; +import { Subject } from 'rxjs'; +import { filter, takeUntil } from 'rxjs/operators'; import { ALL_JOB_TYPES, BULLMQ_PLUGIN_OPTIONS, loggerCtx } from './constants'; import { RedisHealthIndicator } from './redis-health-indicator'; @@ -45,6 +47,10 @@ export class BullMQJobQueueStrategy implements InspectableJobQueueStrategy { private workerProcessor: Processor; private options: BullMQPluginOptions; private queueNameProcessFnMap = new Map Promise>(); + private cancellationSub: Redis; + private cancelRunningJob$ = new Subject(); + private readonly CANCEL_JOB_CHANNEL = 'cancel-job'; + private readonly CANCELLED_JOB_LIST_NAME = 'vendure:cancelled-jobs'; async init(injector: Injector): Promise { const options = injector.get(BULLMQ_PLUGIN_OPTIONS); @@ -109,18 +115,38 @@ export class BullMQJobQueueStrategy implements InspectableJobQueueStrategy { const processFn = this.queueNameProcessFnMap.get(queueName); if (processFn) { const job = await this.createVendureJob(bullJob); + const completed$ = new Subject(); try { // eslint-disable-next-line job.on('progress', _job => bullJob.updateProgress(_job.progress)); + + this.cancelRunningJob$ + .pipe( + filter(jobId => jobId === job.id), + takeUntil(completed$), + ) + .subscribe(() => { + Logger.info(`Cancelling job ${job.id ?? ''}`, loggerCtx); + job.cancel(); + }); const result = await processFn(job); + await bullJob.updateProgress(100); return result; } catch (e: any) { throw e; + } finally { + if (job.id) { + await this.redisConnection.srem(this.CANCELLED_JOB_LIST_NAME, job.id?.toString()); + } + completed$.next(); + completed$.complete(); } } throw new InternalServerError(`No processor defined for the queue "${queueName}"`); }; + // Subscription-mode Redis connection for the cancellation messages + this.cancellationSub = new Redis(this.connectionOptions as RedisOptions); } async destroy() { @@ -144,17 +170,18 @@ export class BullMQJobQueueStrategy implements InspectableJobQueueStrategy { const bullJob = await this.queue.getJob(jobId); if (bullJob) { if (await bullJob.isActive()) { - // Not yet possible in BullMQ, see - // https://github.com/taskforcesh/bullmq/issues/632 - throw new InternalServerError('Cannot cancel a running job'); - } - try { - await bullJob.remove(); + await this.setActiveJobAsCancelled(jobId); return this.createVendureJob(bullJob); - } catch (e: any) { - const message = `Error when cancelling job: ${JSON.stringify(e.message)}`; - Logger.error(message, loggerCtx); - throw new InternalServerError(message); + } else { + try { + const job = await this.createVendureJob(bullJob); + await bullJob.remove(); + return job; + } catch (e: any) { + const message = `Error when cancelling job: ${JSON.stringify(e.message)}`; + Logger.error(message, loggerCtx); + throw new InternalServerError(message); + } } } } @@ -262,23 +289,32 @@ export class BullMQJobQueueStrategy implements InspectableJobQueueStrategy { this.worker = new Worker(QUEUE_NAME, this.workerProcessor, options) .on('error', e => Logger.error(`BullMQ Worker error: ${e.message}`, loggerCtx, e.stack)) .on('closing', e => Logger.verbose(`BullMQ Worker closing: ${e}`, loggerCtx)) - .on('closed', () => Logger.verbose('BullMQ Worker closed')) + .on('closed', () => Logger.verbose('BullMQ Worker closed', loggerCtx)) .on('failed', (job: Bull.Job | undefined, error) => { Logger.warn( `Job ${job?.id ?? '(unknown id)'} [${job?.name ?? 'unknown name'}] failed (attempt ${ job?.attemptsMade ?? 'unknown' } of ${job?.opts.attempts ?? 1})`, + loggerCtx, ); }) .on('stalled', (jobId: string) => { Logger.warn(`BullMQ Worker: job ${jobId} stalled`, loggerCtx); }) .on('completed', (job: Bull.Job) => { - Logger.debug(`Job ${job?.id ?? 'unknown id'} [${job.name}] completed`); + Logger.debug(`Job ${job?.id ?? 'unknown id'} [${job.name}] completed`, loggerCtx); }); + await this.cancellationSub.subscribe(this.CANCEL_JOB_CHANNEL); + this.cancellationSub.on('message', this.subscribeToCancellationEvents); } } + private subscribeToCancellationEvents = (channel: string, jobId: string) => { + if (channel === this.CANCEL_JOB_CHANNEL && jobId) { + this.cancelRunningJob$.next(jobId); + } + }; + private stopped = false; async stop = object>( @@ -288,13 +324,44 @@ export class BullMQJobQueueStrategy implements InspectableJobQueueStrategy { if (!this.stopped) { this.stopped = true; try { - await Promise.all([this.queue.close(), this.worker.close()]); + Logger.info(`Closing worker`, loggerCtx); + + let timer: NodeJS.Timeout; + const checkActive = async () => { + const activeCount = await this.queue.getActiveCount(); + if (0 < activeCount) { + const activeJobs = await this.queue.getActive(); + Logger.info( + `Waiting on ${activeCount} active ${ + activeCount > 1 ? 'jobs' : 'job' + } (${activeJobs.map(j => j.id).join(', ')})...`, + loggerCtx, + ); + timer = setTimeout(checkActive, 2000); + } + }; + timer = setTimeout(checkActive, 2000); + + await this.worker.close(); + Logger.info(`Worker closed`, loggerCtx); + await this.queue.close(); + clearTimeout(timer); + Logger.info(`Queue closed`, loggerCtx); + this.cancellationSub.off('message', this.subscribeToCancellationEvents); } catch (e: any) { Logger.error(e, loggerCtx, e.stack); } } } + private async setActiveJobAsCancelled(jobId: string) { + // Not yet possible natively in BullMQ, see + // https://github.com/taskforcesh/bullmq/issues/632 + // So we have our own custom method of marking a job as cancelled. + await this.redisConnection.publish(this.CANCEL_JOB_CHANNEL, jobId); + await this.redisConnection.sadd(this.CANCELLED_JOB_LIST_NAME, jobId.toString()); + } + private async createVendureJob(bullJob: Bull.Job): Promise { const jobJson = bullJob.toJSON(); return new Job({ @@ -314,13 +381,19 @@ export class BullMQJobQueueStrategy implements InspectableJobQueueStrategy { } private async getState(bullJob: Bull.Job): Promise { - const jobJson = bullJob.toJSON(); + const jobId = bullJob.id?.toString(); if ((await bullJob.isWaiting()) || (await bullJob.isWaitingChildren())) { return JobState.PENDING; } if (await bullJob.isActive()) { - return JobState.RUNNING; + const isCancelled = + jobId && (await this.redisConnection.sismember(this.CANCELLED_JOB_LIST_NAME, jobId)); + if (isCancelled) { + return JobState.CANCELLED; + } else { + return JobState.RUNNING; + } } if (await bullJob.isDelayed()) { return JobState.RETRYING; @@ -331,9 +404,6 @@ export class BullMQJobQueueStrategy implements InspectableJobQueueStrategy { if (await bullJob.isCompleted()) { return JobState.COMPLETED; } - if (!jobJson.finishedOn) { - return JobState.CANCELLED; - } throw new InternalServerError('Could not determine job state'); // TODO: how to handle "cancelled" state? Currently when we cancel a job, we simply remove all record of it. } diff --git a/packages/job-queue-plugin/src/bullmq/types.ts b/packages/job-queue-plugin/src/bullmq/types.ts index 2109e44e25..e8d1cf549a 100644 --- a/packages/job-queue-plugin/src/bullmq/types.ts +++ b/packages/job-queue-plugin/src/bullmq/types.ts @@ -26,14 +26,14 @@ export interface BullMQPluginOptions { * Queue instance. * See the [BullMQ QueueOptions docs](https://github.com/taskforcesh/bullmq/blob/master/docs/gitbook/api/bullmq.queueoptions.md) */ - queueOptions?: Exclude; + queueOptions?: Omit; /** * @description * Additional options used when instantiating the BullMQ * Worker instance. * See the [BullMQ WorkerOptions docs](https://github.com/taskforcesh/bullmq/blob/master/docs/gitbook/api/bullmq.workeroptions.md) */ - workerOptions?: Exclude; + workerOptions?: Omit; /** * @description * When a job is added to the JobQueue using `JobQueue.add()`, the calling