Skip to content

Commit

Permalink
fix(job-queue-plugin): Add missing logging & backoff settings
Browse files Browse the repository at this point in the history
  • Loading branch information
michaelbromley committed Sep 23, 2021
1 parent 423f307 commit 6f7cc34
Showing 1 changed file with 24 additions and 5 deletions.
29 changes: 24 additions & 5 deletions packages/job-queue-plugin/src/bullmq/bullmq-job-queue-strategy.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,14 +51,18 @@ export class BullMQJobQueueStrategy implements InspectableJobQueueStrategy {
...options.queueOptions,
connection: options.connection,
}).on('error', (e: any) => Logger.error(`BullMQ Queue error: ${e.message}`, loggerCtx, e.stack));
const client = await this.queue.client;

if (await this.queue.isPaused()) {
await this.queue.resume();
}

this.workerProcessor = async bullJob => {
const queueName = bullJob.name;
Logger.debug(
`Job ${bullJob.id} [${queueName}] starting (attempt ${bullJob.attemptsMade + 1} of ${
bullJob.opts.attempts ?? 1
})`,
);
const processFn = this.queueNameProcessFnMap.get(queueName);
if (processFn) {
const job = this.createVendureJob(bullJob);
Expand All @@ -82,7 +86,11 @@ export class BullMQJobQueueStrategy implements InspectableJobQueueStrategy {

async add<Data extends JobData<Data> = {}>(job: Job<Data>): Promise<Job<Data>> {
const bullJob = await this.queue.add(job.queueName, job.data, {
attempts: job.retries,
attempts: job.retries + 1,
backoff: {
delay: 1000,
type: 'exponential',
},
});
return this.createVendureJob(bullJob);
}
Expand Down Expand Up @@ -187,9 +195,20 @@ export class BullMQJobQueueStrategy implements InspectableJobQueueStrategy {
...this.options.workerOptions,
connection: this.options.connection,
};
this.worker = new Worker(QUEUE_NAME, this.workerProcessor, options).on('error', (e: any) =>
Logger.error(`BullMQ Worker error: ${e.message}`, loggerCtx, e.stack),
);
this.worker = new Worker(QUEUE_NAME, this.workerProcessor, options)
.on('error', (e: any) =>
Logger.error(`BullMQ Worker error: ${e.message}`, loggerCtx, e.stack),
)
.on('failed', (job: Bull.Job, failedReason: string) => {
Logger.warn(
`Job ${job.id} [${job.name}] failed (attempt ${job.attemptsMade} of ${
job.opts.attempts ?? 1
})`,
);
})
.on('completed', (job: Bull.Job, failedReason: string) => {
Logger.debug(`Job ${job.id} [${job.name}] completed`);
});
}
}

Expand Down

0 comments on commit 6f7cc34

Please sign in to comment.