diff --git a/packages/core/src/job-queue/polling-job-queue-strategy.ts b/packages/core/src/job-queue/polling-job-queue-strategy.ts index 9a9f3a410b..57b305c780 100644 --- a/packages/core/src/job-queue/polling-job-queue-strategy.ts +++ b/packages/core/src/job-queue/polling-job-queue-strategy.ts @@ -24,7 +24,7 @@ export type BackoffStrategy = (queueName: string, attemptsMade: number, job: Job export interface PollingJobQueueStrategyConfig { concurrency?: number; - pollInterval?: number; + pollInterval?: number | ((queueName: string) => number); backoffStrategy?: BackoffStrategy; } @@ -38,6 +38,7 @@ class ActiveQueue = {}> { private errorNotifier$ = new Subject<[string, string]>(); private queueStopped$ = new Subject(); private subscription: Subscription; + private readonly pollInterval: number; constructor( private readonly queueName: string, @@ -48,6 +49,10 @@ class ActiveQueue = {}> { Logger.error(message); Logger.debug(stack); }); + this.pollInterval = + typeof this.jobQueueStrategy.pollInterval === 'function' + ? this.jobQueueStrategy.pollInterval(queueName) + : this.jobQueueStrategy.pollInterval; } start() { @@ -63,7 +68,7 @@ class ActiveQueue = {}> { await this.jobQueueStrategy.update(nextJob); const onProgress = (job: Job) => this.jobQueueStrategy.update(job); nextJob.on('progress', onProgress); - const cancellationSignal$ = interval(this.jobQueueStrategy.pollInterval * 5).pipe( + const cancellationSignal$ = interval(this.pollInterval * 5).pipe( // tslint:disable-next-line:no-non-null-assertion switchMap(() => this.jobQueueStrategy.findOne(nextJob.id!)), filter(job => job?.state === JobState.CANCELLED), @@ -106,7 +111,7 @@ class ActiveQueue = {}> { ]); } if (this.running) { - this.timer = setTimeout(runNextJobs, this.jobQueueStrategy.pollInterval); + this.timer = setTimeout(runNextJobs, this.pollInterval); } }; @@ -159,7 +164,7 @@ class ActiveQueue = {}> { */ export abstract class PollingJobQueueStrategy extends InjectableJobQueueStrategy { public concurrency: number; - public pollInterval: number; + public pollInterval: number | ((queueName: string) => number); public backOffStrategy?: BackoffStrategy; private activeQueues = new QueueNameProcessStorage>(); diff --git a/packages/core/src/plugin/default-job-queue-plugin/default-job-queue-plugin.ts b/packages/core/src/plugin/default-job-queue-plugin/default-job-queue-plugin.ts index 1af5954b07..8a46061528 100644 --- a/packages/core/src/plugin/default-job-queue-plugin/default-job-queue-plugin.ts +++ b/packages/core/src/plugin/default-job-queue-plugin/default-job-queue-plugin.ts @@ -7,6 +7,20 @@ import { VendurePlugin } from '../vendure-plugin'; import { JobRecord } from './job-record.entity'; import { SqlJobQueueStrategy } from './sql-job-queue-strategy'; +/** + * @description + * Configuration options for the DefaultJobQueuePlugin. These values get passed into the + * {@link SqlJobQueueStrategy}. + * + * @docsCategory JobQueue + * @docsPage DefaultJobQueuePlugin + */ +export interface DefaultJobQueueOptions { + pollInterval?: number | ((queueName: string) => number); + concurrency?: number; + backoffStrategy?: BackoffStrategy; +} + /** * @description * A plugin which configures Vendure to use the SQL database to persist the JobQueue jobs using the {@link SqlJobQueueStrategy}. If you add this @@ -32,8 +46,27 @@ import { SqlJobQueueStrategy } from './sql-job-queue-strategy'; * ### pollInterval * The interval in ms between polling for new jobs. The default is 200ms. * Using a longer interval reduces load on the database but results in a slight - * delay in processing jobs. + * delay in processing jobs. For more control, it is possible to supply a function which can specify + * a pollInterval based on the queue name: * + * @example + * ```TypeScript + * export const config: VendureConfig = { + * plugins: [ + * DefaultJobQueuePlugin.init({ + * pollInterval: queueName => { + * if (queueName === 'cart-recovery-email') { + * // This queue does not need to be polled so frequently, + * // so we set a longer interval in order to reduce load + * // on the database. + * return 10000; + * } + * return 200; + * }, + * }), + * ], + * }; + * ``` * ### concurrency * The number of jobs to process concurrently per worker. Defaults to 1. * @@ -67,6 +100,7 @@ import { SqlJobQueueStrategy } from './sql-job-queue-strategy'; * ``` * * @docsCategory JobQueue + * @docsWeight 0 */ @VendurePlugin({ imports: [PluginCommonModule], @@ -83,13 +117,9 @@ import { SqlJobQueueStrategy } from './sql-job-queue-strategy'; }) export class DefaultJobQueuePlugin { /** @internal */ - static options: { pollInterval?: number; concurrency?: number; backoffStrategy?: BackoffStrategy }; + static options: DefaultJobQueueOptions; - static init(options: { - pollInterval?: number; - concurrency?: number; - backoffStrategy?: BackoffStrategy; - }): Type { + static init(options: DefaultJobQueueOptions): Type { DefaultJobQueuePlugin.options = options; return DefaultJobQueuePlugin; }