From 11cf0ed39c8f0ddb4c13f4ab0d1b1a2e40dfa9a2 Mon Sep 17 00:00:00 2001 From: Dobes Vandermeer Date: Tue, 30 Mar 2021 22:33:13 -0700 Subject: [PATCH] Add `error` & `completed` events (#130) Co-authored-by: Sindre Sorhus # Conflicts: # readme.md --- readme.md | 39 ++++++++++++++++++++++++++++++++- source/index.ts | 9 ++++---- test/test.ts | 57 +++++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 100 insertions(+), 5 deletions(-) diff --git a/readme.md b/readme.md index 877e204..624bb43 100644 --- a/readme.md +++ b/readme.md @@ -112,6 +112,8 @@ If `true`, specifies that any [pending](https://developer.mozilla.org/en-US/docs Adds a sync or async task to the queue. Always returns a promise. +Note: If your items can potentially throw an exception, you must handle those errors from the returned Promise or they may be reported as an unhandled Promise rejection and potentially cause your process to exit immediately. + ##### fn Type: `Function` @@ -226,6 +228,41 @@ queue.add(() => Promise.resolve()); queue.add(() => Promise.resolve()); queue.add(() => delay(500)); ``` + +#### completed + +Emitted when an item completes without error. + +```js +const delay = require('delay'); +const {default: PQueue} = require('p-queue'); + +const queue = new PQueue({concurrency: 2}); + +queue.on('completed', result => { + console.log(result); +}); + +queue.add(() => Promise.resolve('hello, world!')); +``` + +#### error + +Emitted if an item throws an error. + +```js +const delay = require('delay'); +const {default: PQueue} = require('p-queue'); + +const queue = new PQueue({concurrency: 2}); + +queue.on('error', error => { + console.error(error); +}); + +queue.add(() => Promise.reject(new Error('error'))); +``` + #### idle Emitted every time the queue becomes empty and all promises have completed; `queue.size === 0 && queue.pending === 0`. @@ -259,7 +296,7 @@ Emitted every time the add method is called and the number of pending or queued #### next -Emitted every time a task is completed and the number of pending or queued tasks is decreased. +Emitted every time a task is completed and the number of pending or queued tasks is decreased. This is emitted regardless of whether the task completed normally or with an error. ```js const delay = require('delay'); diff --git a/source/index.ts b/source/index.ts index 6ecd2ec..966c196 100644 --- a/source/index.ts +++ b/source/index.ts @@ -18,7 +18,7 @@ const timeoutError = new TimeoutError(); /** Promise queue with concurrency control. */ -export default class PQueue = PriorityQueue, EnqueueOptionsType extends QueueAddOptions = DefaultAddOptions> extends EventEmitter<'active' | 'idle' | 'add' | 'next'> { +export default class PQueue = PriorityQueue, EnqueueOptionsType extends QueueAddOptions = DefaultAddOptions> extends EventEmitter<'active' | 'idle' | 'add' | 'next' | 'completed' | 'error'> { private readonly _carryoverConcurrencyCount: boolean; private readonly _isIntervalIgnored: boolean; @@ -252,11 +252,12 @@ export default class PQueue { t.is(timesCalled, 3); }); +test('should emit completed / error events', async t => { + const queue = new PQueue({concurrency: 1}); + + let errorEvents = 0; + let completedEvents = 0; + queue.on('error', () => { + errorEvents++; + }); + queue.on('completed', () => { + completedEvents++; + }); + + const job1 = queue.add(async () => delay(100)); + + t.is(queue.pending, 1); + t.is(queue.size, 0); + t.is(errorEvents, 0); + t.is(completedEvents, 0); + + const job2 = queue.add(async () => { + await delay(1); + throw new Error('failure'); + }); + + t.is(queue.pending, 1); + t.is(queue.size, 1); + t.is(errorEvents, 0); + t.is(completedEvents, 0); + + await job1; + + t.is(queue.pending, 1); + t.is(queue.size, 0); + t.is(errorEvents, 0); + t.is(completedEvents, 1); + + await t.throwsAsync(job2); + + t.is(queue.pending, 0); + t.is(queue.size, 0); + t.is(errorEvents, 1); + t.is(completedEvents, 1); + + const job3 = queue.add(async () => delay(100)); + + t.is(queue.pending, 1); + t.is(queue.size, 0); + t.is(errorEvents, 1); + t.is(completedEvents, 1); + + await job3; + t.is(queue.pending, 0); + t.is(queue.size, 0); + t.is(errorEvents, 1); + t.is(completedEvents, 2); +}); + test('should verify timeout overrides passed to add', async t => { const queue = new PQueue({timeout: 200, throwOnTimeout: true});