Skip to content

Commit

Permalink
feat(core): JobQueueStrategy pollInterval accepts function
Browse files Browse the repository at this point in the history
Allows finer control over polling intervals based on queue type.
  • Loading branch information
michaelbromley committed Apr 20, 2021
1 parent 709cdff commit c2701b9
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 11 deletions.
13 changes: 9 additions & 4 deletions packages/core/src/job-queue/polling-job-queue-strategy.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ export type BackoffStrategy = (queueName: string, attemptsMade: number, job: Job

export interface PollingJobQueueStrategyConfig {
concurrency?: number;
pollInterval?: number;
pollInterval?: number | ((queueName: string) => number);
backoffStrategy?: BackoffStrategy;
}

Expand All @@ -38,6 +38,7 @@ class ActiveQueue<Data extends JobData<Data> = {}> {
private errorNotifier$ = new Subject<[string, string]>();
private queueStopped$ = new Subject<typeof STOP_SIGNAL>();
private subscription: Subscription;
private readonly pollInterval: number;

constructor(
private readonly queueName: string,
Expand All @@ -48,6 +49,10 @@ class ActiveQueue<Data extends JobData<Data> = {}> {
Logger.error(message);
Logger.debug(stack);
});
this.pollInterval =
typeof this.jobQueueStrategy.pollInterval === 'function'
? this.jobQueueStrategy.pollInterval(queueName)
: this.jobQueueStrategy.pollInterval;
}

start() {
Expand All @@ -63,7 +68,7 @@ class ActiveQueue<Data extends JobData<Data> = {}> {
await this.jobQueueStrategy.update(nextJob);
const onProgress = (job: Job) => this.jobQueueStrategy.update(job);
nextJob.on('progress', onProgress);
const cancellationSignal$ = interval(this.jobQueueStrategy.pollInterval * 5).pipe(
const cancellationSignal$ = interval(this.pollInterval * 5).pipe(
// tslint:disable-next-line:no-non-null-assertion
switchMap(() => this.jobQueueStrategy.findOne(nextJob.id!)),
filter(job => job?.state === JobState.CANCELLED),
Expand Down Expand Up @@ -106,7 +111,7 @@ class ActiveQueue<Data extends JobData<Data> = {}> {
]);
}
if (this.running) {
this.timer = setTimeout(runNextJobs, this.jobQueueStrategy.pollInterval);
this.timer = setTimeout(runNextJobs, this.pollInterval);
}
};

Expand Down Expand Up @@ -159,7 +164,7 @@ class ActiveQueue<Data extends JobData<Data> = {}> {
*/
export abstract class PollingJobQueueStrategy extends InjectableJobQueueStrategy {
public concurrency: number;
public pollInterval: number;
public pollInterval: number | ((queueName: string) => number);
public backOffStrategy?: BackoffStrategy;

private activeQueues = new QueueNameProcessStorage<ActiveQueue<any>>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,20 @@ import { VendurePlugin } from '../vendure-plugin';
import { JobRecord } from './job-record.entity';
import { SqlJobQueueStrategy } from './sql-job-queue-strategy';

/**
* @description
* Configuration options for the DefaultJobQueuePlugin. These values get passed into the
* {@link SqlJobQueueStrategy}.
*
* @docsCategory JobQueue
* @docsPage DefaultJobQueuePlugin
*/
export interface DefaultJobQueueOptions {
pollInterval?: number | ((queueName: string) => number);
concurrency?: number;
backoffStrategy?: BackoffStrategy;
}

/**
* @description
* A plugin which configures Vendure to use the SQL database to persist the JobQueue jobs using the {@link SqlJobQueueStrategy}. If you add this
Expand All @@ -32,8 +46,27 @@ import { SqlJobQueueStrategy } from './sql-job-queue-strategy';
* ### pollInterval
* The interval in ms between polling for new jobs. The default is 200ms.
* Using a longer interval reduces load on the database but results in a slight
* delay in processing jobs.
* delay in processing jobs. For more control, it is possible to supply a function which can specify
* a pollInterval based on the queue name:
*
* @example
* ```TypeScript
* export const config: VendureConfig = {
* plugins: [
* DefaultJobQueuePlugin.init({
* pollInterval: queueName => {
* if (queueName === 'cart-recovery-email') {
* // This queue does not need to be polled so frequently,
* // so we set a longer interval in order to reduce load
* // on the database.
* return 10000;
* }
* return 200;
* },
* }),
* ],
* };
* ```
* ### concurrency
* The number of jobs to process concurrently per worker. Defaults to 1.
*
Expand Down Expand Up @@ -67,6 +100,7 @@ import { SqlJobQueueStrategy } from './sql-job-queue-strategy';
* ```
*
* @docsCategory JobQueue
* @docsWeight 0
*/
@VendurePlugin({
imports: [PluginCommonModule],
Expand All @@ -83,13 +117,9 @@ import { SqlJobQueueStrategy } from './sql-job-queue-strategy';
})
export class DefaultJobQueuePlugin {
/** @internal */
static options: { pollInterval?: number; concurrency?: number; backoffStrategy?: BackoffStrategy };
static options: DefaultJobQueueOptions;

static init(options: {
pollInterval?: number;
concurrency?: number;
backoffStrategy?: BackoffStrategy;
}): Type<DefaultJobQueuePlugin> {
static init(options: DefaultJobQueueOptions): Type<DefaultJobQueuePlugin> {
DefaultJobQueuePlugin.options = options;
return DefaultJobQueuePlugin;
}
Expand Down

0 comments on commit c2701b9

Please sign in to comment.