From 6f7cc34acda54693fc3ab33cf9f209ecabfc45cb Mon Sep 17 00:00:00 2001 From: Michael Bromley Date: Thu, 23 Sep 2021 21:47:46 +0200 Subject: [PATCH] fix(job-queue-plugin): Add missing logging & backoff settings --- .../src/bullmq/bullmq-job-queue-strategy.ts | 29 +++++++++++++++---- 1 file changed, 24 insertions(+), 5 deletions(-) diff --git a/packages/job-queue-plugin/src/bullmq/bullmq-job-queue-strategy.ts b/packages/job-queue-plugin/src/bullmq/bullmq-job-queue-strategy.ts index 872311bd8e..dc97e718f1 100644 --- a/packages/job-queue-plugin/src/bullmq/bullmq-job-queue-strategy.ts +++ b/packages/job-queue-plugin/src/bullmq/bullmq-job-queue-strategy.ts @@ -51,7 +51,6 @@ export class BullMQJobQueueStrategy implements InspectableJobQueueStrategy { ...options.queueOptions, connection: options.connection, }).on('error', (e: any) => Logger.error(`BullMQ Queue error: ${e.message}`, loggerCtx, e.stack)); - const client = await this.queue.client; if (await this.queue.isPaused()) { await this.queue.resume(); @@ -59,6 +58,11 @@ export class BullMQJobQueueStrategy implements InspectableJobQueueStrategy { this.workerProcessor = async bullJob => { const queueName = bullJob.name; + Logger.debug( + `Job ${bullJob.id} [${queueName}] starting (attempt ${bullJob.attemptsMade + 1} of ${ + bullJob.opts.attempts ?? 1 + })`, + ); const processFn = this.queueNameProcessFnMap.get(queueName); if (processFn) { const job = this.createVendureJob(bullJob); @@ -82,7 +86,11 @@ export class BullMQJobQueueStrategy implements InspectableJobQueueStrategy { async add = {}>(job: Job): Promise> { const bullJob = await this.queue.add(job.queueName, job.data, { - attempts: job.retries, + attempts: job.retries + 1, + backoff: { + delay: 1000, + type: 'exponential', + }, }); return this.createVendureJob(bullJob); } @@ -187,9 +195,20 @@ export class BullMQJobQueueStrategy implements InspectableJobQueueStrategy { ...this.options.workerOptions, connection: this.options.connection, }; - this.worker = new Worker(QUEUE_NAME, this.workerProcessor, options).on('error', (e: any) => - Logger.error(`BullMQ Worker error: ${e.message}`, loggerCtx, e.stack), - ); + this.worker = new Worker(QUEUE_NAME, this.workerProcessor, options) + .on('error', (e: any) => + Logger.error(`BullMQ Worker error: ${e.message}`, loggerCtx, e.stack), + ) + .on('failed', (job: Bull.Job, failedReason: string) => { + Logger.warn( + `Job ${job.id} [${job.name}] failed (attempt ${job.attemptsMade} of ${ + job.opts.attempts ?? 1 + })`, + ); + }) + .on('completed', (job: Bull.Job, failedReason: string) => { + Logger.debug(`Job ${job.id} [${job.name}] completed`); + }); } }