Skip to content

Commit

Permalink
fix(core): Gracefully handle errors in JobQueue
Browse files Browse the repository at this point in the history
Closes #635
  • Loading branch information
michaelbromley committed Jan 14, 2021
1 parent 72ed50c commit 6d1b8c6
Showing 1 changed file with 37 additions and 16 deletions.
53 changes: 37 additions & 16 deletions packages/core/src/job-queue/job-queue.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
import { JobState } from '@vendure/common/lib/generated-types';
import { Subject, Subscription } from 'rxjs';
import { throttleTime } from 'rxjs/operators';

import { JobQueueStrategy } from '../config/job-queue/job-queue-strategy';
import { Logger } from '../config/logger/vendure-logger';

import { Job } from './job';
import { CreateQueueOptions, JobConfig, JobData } from './types';
Expand All @@ -22,6 +25,8 @@ export class JobQueue<Data extends JobData<Data> = {}> {
private timer: any;
private fooId: number;
private running = false;
private errorNotifier$ = new Subject<[string, string]>();
private subscription: Subscription;

get concurrency(): number {
return this.options.concurrency;
Expand All @@ -39,34 +44,49 @@ export class JobQueue<Data extends JobData<Data> = {}> {
private options: CreateQueueOptions<Data>,
private jobQueueStrategy: JobQueueStrategy,
private pollInterval: number,
) {}
) {
this.subscription = this.errorNotifier$.pipe(throttleTime(3000)).subscribe(([message, stack]) => {
Logger.error(message);
Logger.debug(stack);
});
}

/** @internal */
start() {
if (this.running) {
return;
}
Logger.debug(`Starting JobQueue "${this.options.name}"`);
this.running = true;
const concurrency = this.options.concurrency;
const runNextJobs = async () => {
const runningJobsCount = this.activeJobs.length;
for (let i = runningJobsCount; i < concurrency; i++) {
const nextJob: Job<Data> | undefined = await this.jobQueueStrategy.next(this.options.name);
if (nextJob) {
this.activeJobs.push(nextJob);
await this.jobQueueStrategy.update(nextJob);
nextJob.on('complete', job => this.onFailOrComplete(job));
nextJob.on('progress', job => this.jobQueueStrategy.update(job));
nextJob.on('fail', job => this.onFailOrComplete(job));
try {
const returnVal = this.options.process(nextJob);
if (returnVal instanceof Promise) {
returnVal.catch(err => nextJob.fail(err));
try {
const runningJobsCount = this.activeJobs.length;
for (let i = runningJobsCount; i < concurrency; i++) {
const nextJob: Job<Data> | undefined = await this.jobQueueStrategy.next(
this.options.name,
);
if (nextJob) {
this.activeJobs.push(nextJob);
await this.jobQueueStrategy.update(nextJob);
nextJob.on('complete', job => this.onFailOrComplete(job));
nextJob.on('progress', job => this.jobQueueStrategy.update(job));
nextJob.on('fail', job => this.onFailOrComplete(job));
try {
const returnVal = this.options.process(nextJob);
if (returnVal instanceof Promise) {
returnVal.catch(err => nextJob.fail(err));
}
} catch (err) {
nextJob.fail(err);
}
} catch (err) {
nextJob.fail(err);
}
}
} catch (e) {
this.errorNotifier$.next([
`Job queue "${this.options.name}" encountered an error (set log level to Debug for trace): ${e.message}`,
e.stack,
]);
}
if (this.running) {
this.timer = setTimeout(runNextJobs, this.pollInterval);
Expand All @@ -78,6 +98,7 @@ export class JobQueue<Data extends JobData<Data> = {}> {

/** @internal */
pause() {
Logger.debug(`Pausing JobQueue "${this.options.name}"`);
this.running = false;
clearTimeout(this.timer);
}
Expand Down

0 comments on commit 6d1b8c6

Please sign in to comment.