Skip to content

Commit

Permalink
feat(core): Configurable backoff strategy for DefaultJobQueuePlugin
Browse files Browse the repository at this point in the history
Closes #813
  • Loading branch information
michaelbromley committed Apr 7, 2021
1 parent 0241ade commit be0a27d
Show file tree
Hide file tree
Showing 7 changed files with 183 additions and 23 deletions.
12 changes: 11 additions & 1 deletion packages/core/src/job-queue/in-memory-job-queue-strategy.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,16 @@ describe('InMemoryJobQueueStrategy', () => {
let strategy: InMemoryJobQueueStrategy;
beforeEach(() => {
strategy = new InMemoryJobQueueStrategy();
// init with mock injector & ProcessContext
strategy.init({
get() {
return { isWorker: false };
},
} as any);
});

afterEach(async () => {
await strategy.destroy();
});

describe('findMany options', () => {
Expand Down Expand Up @@ -56,7 +66,7 @@ describe('InMemoryJobQueueStrategy', () => {

async function getIdResultsFor(options: JobListOptions): Promise<string[]> {
const result = await strategy.findMany(options);
return result.items.map((j) => j.id as string);
return result.items.map(j => j.id as string);
}

it('take & skip', async () => {
Expand Down
51 changes: 44 additions & 7 deletions packages/core/src/job-queue/in-memory-job-queue-strategy.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@ import { ID, PaginatedList } from '@vendure/common/lib/shared-types';
import { notNullOrUndefined } from '@vendure/common/lib/shared-utils';

import { Injector } from '../common';
import { InspectableJobQueueStrategy } from '../config';
import { InspectableJobQueueStrategy } from '../config/job-queue/inspectable-job-queue-strategy';
import { Logger } from '../config/logger/vendure-logger';
import { ProcessContext } from '../process-context/process-context';

import { Job } from './job';
import { PollingJobQueueStrategy } from './polling-job-queue-strategy';
Expand All @@ -21,19 +23,33 @@ import { JobData } from './types';
* @description
* An in-memory {@link JobQueueStrategy}. This is the default strategy if not using a dedicated
* JobQueue plugin (e.g. {@link DefaultJobQueuePlugin}). Not recommended for production, since
* the queue will be cleared when the server stops.
* the queue will be cleared when the server stops, and can only be used when the JobQueueService is
* started from the main server process:
*
* @example
* ```TypeScript
* bootstrap(config)
* .then(app => app.get(JobQueueService).start());
* ```
*
* Attempting to use this strategy when running the worker in a separate process (using `bootstrapWorker()`)
* will result in an error on startup.
*
* Completed jobs will be evicted from the store every 2 hours to prevent a memory leak.
*
* @docsCategory JobQueue
*/
export class InMemoryJobQueueStrategy extends PollingJobQueueStrategy implements InspectableJobQueueStrategy {
protected jobs = new Map<ID, Job>();
protected unsettledJobs: { [queueName: string]: Job[] } = {};
protected unsettledJobs: { [queueName: string]: Array<{ job: Job; updatedAt: Date }> } = {};
private timer: any;
private evictJobsAfterMs = 1000 * 60 * 60 * 2; // 2 hours
private processContext: ProcessContext;
private processContextChecked = false;

init(injector: Injector) {
super.init(injector);
this.processContext = injector.get(ProcessContext);
this.timer = setTimeout(this.evictSettledJobs, this.evictJobsAfterMs);
}

Expand All @@ -53,7 +69,7 @@ export class InMemoryJobQueueStrategy extends PollingJobQueueStrategy implements
if (!this.unsettledJobs[job.queueName]) {
this.unsettledJobs[job.queueName] = [];
}
this.unsettledJobs[job.queueName].push(job);
this.unsettledJobs[job.queueName].push({ job, updatedAt: new Date() });
return job;
}

Expand Down Expand Up @@ -85,16 +101,25 @@ export class InMemoryJobQueueStrategy extends PollingJobQueueStrategy implements
}

async next(queueName: string): Promise<Job | undefined> {
this.checkProcessContext();
const next = this.unsettledJobs[queueName]?.shift();
if (next) {
next.start();
return next;
if (next.job.state === JobState.RETRYING && typeof this.backOffStrategy === 'function') {
const msSinceLastFailure = Date.now() - +next.updatedAt;
const backOffDelayMs = this.backOffStrategy(queueName, next.job.attempts, next.job);
if (msSinceLastFailure < backOffDelayMs) {
this.unsettledJobs[queueName]?.push(next);
return;
}
}
next.job.start();
return next.job;
}
}

async update(job: Job): Promise<void> {
if (job.state === JobState.RETRYING || job.state === JobState.PENDING) {
this.unsettledJobs[job.queueName].unshift(job);
this.unsettledJobs[job.queueName].unshift({ job, updatedAt: new Date() });
}
// tslint:disable-next-line:no-non-null-assertion
this.jobs.set(job.id!, job);
Expand Down Expand Up @@ -195,4 +220,16 @@ export class InMemoryJobQueueStrategy extends PollingJobQueueStrategy implements
this.removeSettledJobs([], new Date(olderThanMs));
this.timer = setTimeout(this.evictSettledJobs, this.evictJobsAfterMs);
};

private checkProcessContext() {
if (!this.processContextChecked) {
if (this.processContext.isWorker) {
Logger.error(
`The InMemoryJobQueueStrategy will not work when running job queues outside the main server process!`,
);
process.kill(process.pid, 'SIGINT');
}
this.processContextChecked = true;
}
}
}
25 changes: 23 additions & 2 deletions packages/core/src/job-queue/job-queue.service.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,19 @@ import { take } from 'rxjs/operators';

import { assertFound, Injector } from '../common';
import { ConfigService } from '../config/config.service';
import { ProcessContext, setProcessContext } from '../process-context/process-context';

import { Job } from './job';
import { JobQueueService } from './job-queue.service';
import { TestingJobQueueStrategy } from './testing-job-queue-strategy';

const queuePollInterval = 10;
const testJobQueueStrategy = new TestingJobQueueStrategy(1, queuePollInterval);
const backoffStrategySpy = jest.fn();
const testJobQueueStrategy = new TestingJobQueueStrategy({
concurrency: 1,
pollInterval: queuePollInterval,
backoffStrategy: backoffStrategySpy.mockReturnValue(0),
});

describe('JobQueueService', () => {
let jobQueueService: JobQueueService;
Expand All @@ -26,8 +32,14 @@ describe('JobQueueService', () => {
}

beforeEach(async () => {
setProcessContext('server');

module = await Test.createTestingModule({
providers: [{ provide: ConfigService, useClass: MockConfigService }, JobQueueService],
providers: [
{ provide: ConfigService, useClass: MockConfigService },
JobQueueService,
ProcessContext,
],
}).compile();
await module.init();

Expand Down Expand Up @@ -236,6 +248,7 @@ describe('JobQueueService', () => {
});

it('retries', async () => {
backoffStrategySpy.mockClear();
const subject = new Subject<boolean>();
const testQueue = await jobQueueService.createQueue<string>({
name: 'test',
Expand Down Expand Up @@ -263,12 +276,20 @@ describe('JobQueueService', () => {
expect((await getJob(testJob)).isSettled).toBe(false);

await tick(queuePollInterval);

expect(backoffStrategySpy).toHaveBeenCalledTimes(1);
expect(backoffStrategySpy.mock.calls[0]).toEqual(['test', 1, await getJob(testJob)]);

subject.next(false);
await tick();
expect((await getJob(testJob)).state).toBe(JobState.RETRYING);
expect((await getJob(testJob)).isSettled).toBe(false);

await tick(queuePollInterval);

expect(backoffStrategySpy).toHaveBeenCalledTimes(2);
expect(backoffStrategySpy.mock.calls[1]).toEqual(['test', 2, await getJob(testJob)]);

subject.next(false);
await tick();
expect((await getJob(testJob)).state).toBe(JobState.FAILED);
Expand Down
39 changes: 38 additions & 1 deletion packages/core/src/job-queue/polling-job-queue-strategy.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { JobState } from '@vendure/common/lib/generated-types';
import { ID } from '@vendure/common/lib/shared-types';
import { isObject } from '@vendure/common/lib/shared-utils';
import { interval, race, Subject, Subscription } from 'rxjs';
import { fromPromise } from 'rxjs/internal-compatibility';
import { filter, switchMap, take, throttleTime } from 'rxjs/operators';
Expand All @@ -11,6 +12,22 @@ import { Job } from './job';
import { QueueNameProcessStorage } from './queue-name-process-storage';
import { JobData } from './types';

/**
* @description
* Defines the backoff strategy used when retrying failed jobs. Returns the delay in
* ms that should pass before the failed job is retried.
*
* @docsCategory JobQueue
* @docsPage types
*/
export type BackoffStrategy = (queueName: string, attemptsMade: number, job: Job) => number;

export interface PollingJobQueueStrategyConfig {
concurrency?: number;
pollInterval?: number;
backoffStrategy?: BackoffStrategy;
}

const STOP_SIGNAL = Symbol('STOP_SIGNAL');

class ActiveQueue<Data extends JobData<Data> = {}> {
Expand Down Expand Up @@ -134,12 +151,32 @@ class ActiveQueue<Data extends JobData<Data> = {}> {
* @description
* This class allows easier implementation of {@link JobQueueStrategy} in a polling style.
* Instead of providing {@link JobQueueStrategy} `start()` you should provide a `next` method.
*
* This class should be extended by any strategy which does not support a push-based system
* to notify on new jobs. It is used by the {@link SqlJobQueueStrategy} and {@link InMemoryJobQueueStrategy}.
*
* @docsCategory JobQueue
*/
export abstract class PollingJobQueueStrategy extends InjectableJobQueueStrategy {
public concurrency: number;
public pollInterval: number;
public backOffStrategy?: BackoffStrategy;

private activeQueues = new QueueNameProcessStorage<ActiveQueue<any>>();

constructor(public concurrency: number = 1, public pollInterval: number = 200) {
constructor(config?: PollingJobQueueStrategyConfig);
constructor(concurrency?: number, pollInterval?: number);
constructor(concurrencyOrConfig?: number | PollingJobQueueStrategyConfig, maybePollInterval?: number) {
super();

if (concurrencyOrConfig && isObject(concurrencyOrConfig)) {
this.concurrency = concurrencyOrConfig.concurrency ?? 1;
this.pollInterval = concurrencyOrConfig.pollInterval ?? 200;
this.backOffStrategy = concurrencyOrConfig.backoffStrategy;
} else {
this.concurrency = concurrencyOrConfig ?? 1;
this.pollInterval = maybePollInterval ?? 200;
}
}

async start<Data extends JobData<Data> = {}>(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
import { Type } from '@vendure/common/lib/shared-types';

import { BackoffStrategy } from '../../job-queue/polling-job-queue-strategy';
import { PluginCommonModule } from '../plugin-common.module';
import { VendurePlugin } from '../vendure-plugin';

Expand Down Expand Up @@ -26,17 +29,38 @@ import { SqlJobQueueStrategy } from './sql-job-queue-strategy';
*
* It is possible to configure the behaviour of the {@link SqlJobQueueStrategy} by passing options to the static `init()` function:
*
* ### pollInterval
* The interval in ms between polling for new jobs. The default is 200ms.
* Using a longer interval reduces load on the database but results in a slight
* delay in processing jobs.
*
* ### concurrency
* The number of jobs to process concurrently per worker. Defaults to 1.
*
* ### backoffStrategy
* Defines the backoff strategy used when retrying failed jobs. In other words, if a job fails
* and is configured to be re-tried, how long should we wait before the next attempt?
*
* By default a job will be retried as soon as possible, but in some cases this is not desirable. For example,
* a job may interact with an unreliable 3rd-party API which is sensitive to too many requests. In this case, an
* exponential backoff may be used which progressively increases the delay between each subsequent retry.
*
* @example
* ```TypeScript
* export const config: VendureConfig = {
* plugins: [
* DefaultJobQueuePlugin.init({
* // The interval in ms between polling for new jobs. The default is 200ms.
* // Using a longer interval reduces load on the database but results in a slight
* // delay in processing jobs.
* pollInterval: 5000,
* // The number of jobs to process concurrently per worker. Defaults to 1.
* concurrency: 2
* backoffStrategy: (queueName, attemptsMade, job) => {
* if (queueName === 'transcode-video') {
* // exponential backoff example
* return (attemptsMade ** 2) * 1000;
* }
*
* // A default delay for all other queues
* return 1000;
* },
* }),
* ],
* };
Expand All @@ -48,14 +72,24 @@ import { SqlJobQueueStrategy } from './sql-job-queue-strategy';
imports: [PluginCommonModule],
entities: [JobRecord],
configuration: config => {
const { pollInterval, concurrency } = DefaultJobQueuePlugin.options ?? {};
config.jobQueueOptions.jobQueueStrategy = new SqlJobQueueStrategy(concurrency, pollInterval);
const { pollInterval, concurrency, backoffStrategy } = DefaultJobQueuePlugin.options ?? {};
config.jobQueueOptions.jobQueueStrategy = new SqlJobQueueStrategy({
concurrency,
pollInterval,
backoffStrategy,
});
return config;
},
})
export class DefaultJobQueuePlugin {
static options: { pollInterval?: number; concurrency?: number };
static init(options: { pollInterval?: number; concurrency?: number }) {
/** @internal */
static options: { pollInterval?: number; concurrency?: number; backoffStrategy?: BackoffStrategy };

static init(options: {
pollInterval?: number;
concurrency?: number;
backoffStrategy?: BackoffStrategy;
}): Type<DefaultJobQueuePlugin> {
DefaultJobQueuePlugin.options = options;
return DefaultJobQueuePlugin;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,13 @@ export class SqlJobQueueStrategy extends PollingJobQueueStrategy implements Insp
const record = await qb.getOne();
if (record) {
const job = this.fromRecord(record);
if (record.state === JobState.RETRYING && typeof this.backOffStrategy === 'function') {
const msSinceLastFailure = Date.now() - +record.updatedAt;
const backOffDelayMs = this.backOffStrategy(queueName, record.attempts, job);
if (msSinceLastFailure < backOffDelayMs) {
return;
}
}
job.start();
record.state = JobState.RUNNING;
await manager.getRepository(JobRecord).save(record, { reload: false });
Expand Down
Loading

0 comments on commit be0a27d

Please sign in to comment.