diff --git a/packages/core/src/job-queue/in-memory-job-queue-strategy.spec.ts b/packages/core/src/job-queue/in-memory-job-queue-strategy.spec.ts index f4dea4c405..153a77ada9 100644 --- a/packages/core/src/job-queue/in-memory-job-queue-strategy.spec.ts +++ b/packages/core/src/job-queue/in-memory-job-queue-strategy.spec.ts @@ -8,6 +8,16 @@ describe('InMemoryJobQueueStrategy', () => { let strategy: InMemoryJobQueueStrategy; beforeEach(() => { strategy = new InMemoryJobQueueStrategy(); + // init with mock injector & ProcessContext + strategy.init({ + get() { + return { isWorker: false }; + }, + } as any); + }); + + afterEach(async () => { + await strategy.destroy(); }); describe('findMany options', () => { @@ -56,7 +66,7 @@ describe('InMemoryJobQueueStrategy', () => { async function getIdResultsFor(options: JobListOptions): Promise<string[]> { const result = await strategy.findMany(options); - return result.items.map((j) => j.id as string); + return result.items.map(j => j.id as string); } it('take & skip', async () => { diff --git a/packages/core/src/job-queue/in-memory-job-queue-strategy.ts b/packages/core/src/job-queue/in-memory-job-queue-strategy.ts index 4043a4f228..d5f3066fd0 100644 --- a/packages/core/src/job-queue/in-memory-job-queue-strategy.ts +++ b/packages/core/src/job-queue/in-memory-job-queue-strategy.ts @@ -11,7 +11,9 @@ import { ID, PaginatedList } from '@vendure/common/lib/shared-types'; import { notNullOrUndefined } from '@vendure/common/lib/shared-utils'; import { Injector } from '../common'; -import { InspectableJobQueueStrategy } from '../config'; +import { InspectableJobQueueStrategy } from '../config/job-queue/inspectable-job-queue-strategy'; +import { Logger } from '../config/logger/vendure-logger'; +import { ProcessContext } from '../process-context/process-context'; import { Job } from './job'; import { PollingJobQueueStrategy } from './polling-job-queue-strategy'; @@ -21,19 +23,33 @@ import { JobData } from './types'; * @description * An in-memory {@link JobQueueStrategy}. This is the default strategy if not using a dedicated * JobQueue plugin (e.g. {@link DefaultJobQueuePlugin}). Not recommended for production, since - * the queue will be cleared when the server stops. + * the queue will be cleared when the server stops, and can only be used when the JobQueueService is + * started from the main server process: + * + * @example + * ```TypeScript + * bootstrap(config) + * .then(app => app.get(JobQueueService).start()); + * ``` + * + * Attempting to use this strategy when running the worker in a separate process (using `bootstrapWorker()`) + * will result in an error on startup. + * * Completed jobs will be evicted from the store every 2 hours to prevent a memory leak. * * @docsCategory JobQueue */ export class InMemoryJobQueueStrategy extends PollingJobQueueStrategy implements InspectableJobQueueStrategy { protected jobs = new Map<ID, Job>(); - protected unsettledJobs: { [queueName: string]: Job[] } = {}; + protected unsettledJobs: { [queueName: string]: Array<{ job: Job; updatedAt: Date }> } = {}; private timer: any; private evictJobsAfterMs = 1000 * 60 * 60 * 2; // 2 hours + private processContext: ProcessContext; + private processContextChecked = false; init(injector: Injector) { super.init(injector); + this.processContext = injector.get(ProcessContext); this.timer = setTimeout(this.evictSettledJobs, this.evictJobsAfterMs); } @@ -53,7 +69,7 @@ export class InMemoryJobQueueStrategy extends PollingJobQueueStrategy implements if (!this.unsettledJobs[job.queueName]) { this.unsettledJobs[job.queueName] = []; } - this.unsettledJobs[job.queueName].push(job); + this.unsettledJobs[job.queueName].push({ job, updatedAt: new Date() }); return job; } @@ -85,16 +101,25 @@ export class InMemoryJobQueueStrategy extends PollingJobQueueStrategy implements } async next(queueName: string): Promise<Job | undefined> { + this.checkProcessContext(); const next = this.unsettledJobs[queueName]?.shift(); if (next) { - next.start(); - return next; + if (next.job.state === JobState.RETRYING && typeof this.backOffStrategy === 'function') { + const msSinceLastFailure = Date.now() - +next.updatedAt; + const backOffDelayMs = this.backOffStrategy(queueName, next.job.attempts, next.job); + if (msSinceLastFailure < backOffDelayMs) { + this.unsettledJobs[queueName]?.push(next); + return; + } + } + next.job.start(); + return next.job; } } async update(job: Job): Promise<void> { if (job.state === JobState.RETRYING || job.state === JobState.PENDING) { - this.unsettledJobs[job.queueName].unshift(job); + this.unsettledJobs[job.queueName].unshift({ job, updatedAt: new Date() }); } // tslint:disable-next-line:no-non-null-assertion this.jobs.set(job.id!, job); @@ -195,4 +220,16 @@ export class InMemoryJobQueueStrategy extends PollingJobQueueStrategy implements this.removeSettledJobs([], new Date(olderThanMs)); this.timer = setTimeout(this.evictSettledJobs, this.evictJobsAfterMs); }; + + private checkProcessContext() { + if (!this.processContextChecked) { + if (this.processContext.isWorker) { + Logger.error( + `The InMemoryJobQueueStrategy will not work when running job queues outside the main server process!`, + ); + process.kill(process.pid, 'SIGINT'); + } + this.processContextChecked = true; + } + } } diff --git a/packages/core/src/job-queue/job-queue.service.spec.ts b/packages/core/src/job-queue/job-queue.service.spec.ts index e56eed7d42..44bf2fa7bf 100644 --- a/packages/core/src/job-queue/job-queue.service.spec.ts +++ b/packages/core/src/job-queue/job-queue.service.spec.ts @@ -8,13 +8,19 @@ import { take } from 'rxjs/operators'; import { assertFound, Injector } from '../common'; import { ConfigService } from '../config/config.service'; +import { ProcessContext, setProcessContext } from '../process-context/process-context'; import { Job } from './job'; import { JobQueueService } from './job-queue.service'; import { TestingJobQueueStrategy } from './testing-job-queue-strategy'; const queuePollInterval = 10; -const testJobQueueStrategy = new TestingJobQueueStrategy(1, queuePollInterval); +const backoffStrategySpy = jest.fn(); +const testJobQueueStrategy = new TestingJobQueueStrategy({ + concurrency: 1, + pollInterval: queuePollInterval, + backoffStrategy: backoffStrategySpy.mockReturnValue(0), +}); describe('JobQueueService', () => { let jobQueueService: JobQueueService; @@ -26,8 +32,14 @@ describe('JobQueueService', () => { } beforeEach(async () => { + setProcessContext('server'); + module = await Test.createTestingModule({ - providers: [{ provide: ConfigService, useClass: MockConfigService }, JobQueueService], + providers: [ + { provide: ConfigService, useClass: MockConfigService }, + JobQueueService, + ProcessContext, + ], }).compile(); await module.init(); @@ -236,6 +248,7 @@ describe('JobQueueService', () => { }); it('retries', async () => { + backoffStrategySpy.mockClear(); const subject = new Subject<boolean>(); const testQueue = await jobQueueService.createQueue<string>({ name: 'test', @@ -263,12 +276,20 @@ describe('JobQueueService', () => { expect((await getJob(testJob)).isSettled).toBe(false); await tick(queuePollInterval); + + expect(backoffStrategySpy).toHaveBeenCalledTimes(1); + expect(backoffStrategySpy.mock.calls[0]).toEqual(['test', 1, await getJob(testJob)]); + subject.next(false); await tick(); expect((await getJob(testJob)).state).toBe(JobState.RETRYING); expect((await getJob(testJob)).isSettled).toBe(false); await tick(queuePollInterval); + + expect(backoffStrategySpy).toHaveBeenCalledTimes(2); + expect(backoffStrategySpy.mock.calls[1]).toEqual(['test', 2, await getJob(testJob)]); + subject.next(false); await tick(); expect((await getJob(testJob)).state).toBe(JobState.FAILED); diff --git a/packages/core/src/job-queue/polling-job-queue-strategy.ts b/packages/core/src/job-queue/polling-job-queue-strategy.ts index dee2b4ab3b..9a9f3a410b 100644 --- a/packages/core/src/job-queue/polling-job-queue-strategy.ts +++ b/packages/core/src/job-queue/polling-job-queue-strategy.ts @@ -1,5 +1,6 @@ import { JobState } from '@vendure/common/lib/generated-types'; import { ID } from '@vendure/common/lib/shared-types'; +import { isObject } from '@vendure/common/lib/shared-utils'; import { interval, race, Subject, Subscription } from 'rxjs'; import { fromPromise } from 'rxjs/internal-compatibility'; import { filter, switchMap, take, throttleTime } from 'rxjs/operators'; @@ -11,6 +12,22 @@ import { Job } from './job'; import { QueueNameProcessStorage } from './queue-name-process-storage'; import { JobData } from './types'; +/** + * @description + * Defines the backoff strategy used when retrying failed jobs. Returns the delay in + * ms that should pass before the failed job is retried. + * + * @docsCategory JobQueue + * @docsPage types + */ +export type BackoffStrategy = (queueName: string, attemptsMade: number, job: Job) => number; + +export interface PollingJobQueueStrategyConfig { + concurrency?: number; + pollInterval?: number; + backoffStrategy?: BackoffStrategy; +} + const STOP_SIGNAL = Symbol('STOP_SIGNAL'); class ActiveQueue<Data extends JobData<Data> = {}> { @@ -134,12 +151,32 @@ class ActiveQueue<Data extends JobData<Data> = {}> { * @description * This class allows easier implementation of {@link JobQueueStrategy} in a polling style. * Instead of providing {@link JobQueueStrategy} `start()` you should provide a `next` method. + * + * This class should be extended by any strategy which does not support a push-based system + * to notify on new jobs. It is used by the {@link SqlJobQueueStrategy} and {@link InMemoryJobQueueStrategy}. + * + * @docsCategory JobQueue */ export abstract class PollingJobQueueStrategy extends InjectableJobQueueStrategy { + public concurrency: number; + public pollInterval: number; + public backOffStrategy?: BackoffStrategy; + private activeQueues = new QueueNameProcessStorage<ActiveQueue<any>>(); - constructor(public concurrency: number = 1, public pollInterval: number = 200) { + constructor(config?: PollingJobQueueStrategyConfig); + constructor(concurrency?: number, pollInterval?: number); + constructor(concurrencyOrConfig?: number | PollingJobQueueStrategyConfig, maybePollInterval?: number) { super(); + + if (concurrencyOrConfig && isObject(concurrencyOrConfig)) { + this.concurrency = concurrencyOrConfig.concurrency ?? 1; + this.pollInterval = concurrencyOrConfig.pollInterval ?? 200; + this.backOffStrategy = concurrencyOrConfig.backoffStrategy; + } else { + this.concurrency = concurrencyOrConfig ?? 1; + this.pollInterval = maybePollInterval ?? 200; + } } async start<Data extends JobData<Data> = {}>( diff --git a/packages/core/src/plugin/default-job-queue-plugin/default-job-queue-plugin.ts b/packages/core/src/plugin/default-job-queue-plugin/default-job-queue-plugin.ts index b7d0cb421c..1af5954b07 100644 --- a/packages/core/src/plugin/default-job-queue-plugin/default-job-queue-plugin.ts +++ b/packages/core/src/plugin/default-job-queue-plugin/default-job-queue-plugin.ts @@ -1,3 +1,6 @@ +import { Type } from '@vendure/common/lib/shared-types'; + +import { BackoffStrategy } from '../../job-queue/polling-job-queue-strategy'; import { PluginCommonModule } from '../plugin-common.module'; import { VendurePlugin } from '../vendure-plugin'; @@ -26,17 +29,38 @@ import { SqlJobQueueStrategy } from './sql-job-queue-strategy'; * * It is possible to configure the behaviour of the {@link SqlJobQueueStrategy} by passing options to the static `init()` function: * + * ### pollInterval + * The interval in ms between polling for new jobs. The default is 200ms. + * Using a longer interval reduces load on the database but results in a slight + * delay in processing jobs. + * + * ### concurrency + * The number of jobs to process concurrently per worker. Defaults to 1. + * + * ### backoffStrategy + * Defines the backoff strategy used when retrying failed jobs. In other words, if a job fails + * and is configured to be re-tried, how long should we wait before the next attempt? + * + * By default a job will be retried as soon as possible, but in some cases this is not desirable. For example, + * a job may interact with an unreliable 3rd-party API which is sensitive to too many requests. In this case, an + * exponential backoff may be used which progressively increases the delay between each subsequent retry. + * * @example * ```TypeScript * export const config: VendureConfig = { * plugins: [ * DefaultJobQueuePlugin.init({ - * // The interval in ms between polling for new jobs. The default is 200ms. - * // Using a longer interval reduces load on the database but results in a slight - * // delay in processing jobs. * pollInterval: 5000, - * // The number of jobs to process concurrently per worker. Defaults to 1. * concurrency: 2 + * backoffStrategy: (queueName, attemptsMade, job) => { + * if (queueName === 'transcode-video') { + * // exponential backoff example + * return (attemptsMade ** 2) * 1000; + * } + * + * // A default delay for all other queues + * return 1000; + * }, * }), * ], * }; @@ -48,14 +72,24 @@ import { SqlJobQueueStrategy } from './sql-job-queue-strategy'; imports: [PluginCommonModule], entities: [JobRecord], configuration: config => { - const { pollInterval, concurrency } = DefaultJobQueuePlugin.options ?? {}; - config.jobQueueOptions.jobQueueStrategy = new SqlJobQueueStrategy(concurrency, pollInterval); + const { pollInterval, concurrency, backoffStrategy } = DefaultJobQueuePlugin.options ?? {}; + config.jobQueueOptions.jobQueueStrategy = new SqlJobQueueStrategy({ + concurrency, + pollInterval, + backoffStrategy, + }); return config; }, }) export class DefaultJobQueuePlugin { - static options: { pollInterval?: number; concurrency?: number }; - static init(options: { pollInterval?: number; concurrency?: number }) { + /** @internal */ + static options: { pollInterval?: number; concurrency?: number; backoffStrategy?: BackoffStrategy }; + + static init(options: { + pollInterval?: number; + concurrency?: number; + backoffStrategy?: BackoffStrategy; + }): Type<DefaultJobQueuePlugin> { DefaultJobQueuePlugin.options = options; return DefaultJobQueuePlugin; } diff --git a/packages/core/src/plugin/default-job-queue-plugin/sql-job-queue-strategy.ts b/packages/core/src/plugin/default-job-queue-plugin/sql-job-queue-strategy.ts index 4be8c52517..79c693132f 100644 --- a/packages/core/src/plugin/default-job-queue-plugin/sql-job-queue-strategy.ts +++ b/packages/core/src/plugin/default-job-queue-plugin/sql-job-queue-strategy.ts @@ -93,6 +93,13 @@ export class SqlJobQueueStrategy extends PollingJobQueueStrategy implements Insp const record = await qb.getOne(); if (record) { const job = this.fromRecord(record); + if (record.state === JobState.RETRYING && typeof this.backOffStrategy === 'function') { + const msSinceLastFailure = Date.now() - +record.updatedAt; + const backOffDelayMs = this.backOffStrategy(queueName, record.attempts, job); + if (msSinceLastFailure < backOffDelayMs) { + return; + } + } job.start(); record.state = JobState.RUNNING; await manager.getRepository(JobRecord).save(record, { reload: false }); diff --git a/packages/dev-server/test-plugins/job-queue-test/job-queue-test-plugin.ts b/packages/dev-server/test-plugins/job-queue-test/job-queue-test-plugin.ts index a633071155..315e39eb40 100644 --- a/packages/dev-server/test-plugins/job-queue-test/job-queue-test-plugin.ts +++ b/packages/dev-server/test-plugins/job-queue-test/job-queue-test-plugin.ts @@ -6,6 +6,13 @@ import { gql } from 'apollo-server-core'; import { of } from 'rxjs'; import { catchError, map, tap } from 'rxjs/operators'; +interface TaskConfigInput { + intervalMs: number; + shouldFail: boolean; + retries: number; + subscribeToResult: boolean; +} + @Injectable() export class JobQueueTestService implements OnModuleInit { private myQueue: JobQueue<{ intervalMs: number; shouldFail: boolean }>; @@ -34,8 +41,9 @@ export class JobQueueTestService implements OnModuleInit { }); } - async startTask(intervalMs: number, shouldFail: boolean, subscribeToResult: boolean) { - const job = await this.myQueue.add({ intervalMs, shouldFail }, { retries: 0 }); + async startTask(input: TaskConfigInput) { + const { intervalMs, shouldFail, subscribeToResult, retries } = input; + const job = await this.myQueue.add({ intervalMs, shouldFail }, { retries }); if (subscribeToResult) { return job.updates().pipe( map(update => { @@ -60,7 +68,7 @@ export class JobQueueTestResolver { @Mutation() startTask(@Args() args: any) { - return this.service.startTask(args.intervalMs, args.shouldFail, args.subscribeToResult); + return this.service.startTask(args.input); } } @@ -73,8 +81,14 @@ export class JobQueueTestResolver { adminApiExtensions: { resolvers: [JobQueueTestResolver], schema: gql` + input TaskConfigInput { + intervalMs: Int! + shouldFail: Boolean! + retries: Int + subscribeToResult: Boolean + } extend type Mutation { - startTask(intervalMs: Int, shouldFail: Boolean!, subscribeToResult: Boolean!): JSON! + startTask(input: TaskConfigInput!): JSON! } `, },