diff --git a/packages/core/src/job-queue/job-queue.ts b/packages/core/src/job-queue/job-queue.ts index 2e2ac46edc..53b5f46b01 100644 --- a/packages/core/src/job-queue/job-queue.ts +++ b/packages/core/src/job-queue/job-queue.ts @@ -1,6 +1,9 @@ import { JobState } from '@vendure/common/lib/generated-types'; +import { Subject, Subscription } from 'rxjs'; +import { throttleTime } from 'rxjs/operators'; import { JobQueueStrategy } from '../config/job-queue/job-queue-strategy'; +import { Logger } from '../config/logger/vendure-logger'; import { Job } from './job'; import { CreateQueueOptions, JobConfig, JobData } from './types'; @@ -22,6 +25,8 @@ export class JobQueue = {}> { private timer: any; private fooId: number; private running = false; + private errorNotifier$ = new Subject<[string, string]>(); + private subscription: Subscription; get concurrency(): number { return this.options.concurrency; @@ -39,34 +44,49 @@ export class JobQueue = {}> { private options: CreateQueueOptions, private jobQueueStrategy: JobQueueStrategy, private pollInterval: number, - ) {} + ) { + this.subscription = this.errorNotifier$.pipe(throttleTime(3000)).subscribe(([message, stack]) => { + Logger.error(message); + Logger.debug(stack); + }); + } /** @internal */ start() { if (this.running) { return; } + Logger.debug(`Starting JobQueue "${this.options.name}"`); this.running = true; const concurrency = this.options.concurrency; const runNextJobs = async () => { - const runningJobsCount = this.activeJobs.length; - for (let i = runningJobsCount; i < concurrency; i++) { - const nextJob: Job | undefined = await this.jobQueueStrategy.next(this.options.name); - if (nextJob) { - this.activeJobs.push(nextJob); - await this.jobQueueStrategy.update(nextJob); - nextJob.on('complete', job => this.onFailOrComplete(job)); - nextJob.on('progress', job => this.jobQueueStrategy.update(job)); - nextJob.on('fail', job => this.onFailOrComplete(job)); - try { - const returnVal = this.options.process(nextJob); - if (returnVal instanceof Promise) { - returnVal.catch(err => nextJob.fail(err)); + try { + const runningJobsCount = this.activeJobs.length; + for (let i = runningJobsCount; i < concurrency; i++) { + const nextJob: Job | undefined = await this.jobQueueStrategy.next( + this.options.name, + ); + if (nextJob) { + this.activeJobs.push(nextJob); + await this.jobQueueStrategy.update(nextJob); + nextJob.on('complete', job => this.onFailOrComplete(job)); + nextJob.on('progress', job => this.jobQueueStrategy.update(job)); + nextJob.on('fail', job => this.onFailOrComplete(job)); + try { + const returnVal = this.options.process(nextJob); + if (returnVal instanceof Promise) { + returnVal.catch(err => nextJob.fail(err)); + } + } catch (err) { + nextJob.fail(err); } - } catch (err) { - nextJob.fail(err); } } + } catch (e) { + this.errorNotifier$.next([ + `Job queue "${this.options.name}" encountered an error (set log level to Debug for trace): ${e.message}`, + e.stack, + ]); } if (this.running) { this.timer = setTimeout(runNextJobs, this.pollInterval); @@ -78,6 +98,7 @@ export class JobQueue = {}> { /** @internal */ pause() { + Logger.debug(`Pausing JobQueue "${this.options.name}"`); this.running = false; clearTimeout(this.timer); }