Skip to content

Commit

Permalink
Add error & completed events (sindresorhus#130)
Browse files Browse the repository at this point in the history
Co-authored-by: Sindre Sorhus <[email protected]>
  • Loading branch information
2 people authored and Totenfluch committed Jul 18, 2021
1 parent 2ceda3b commit eb0467e
Show file tree
Hide file tree
Showing 5 changed files with 106 additions and 9 deletions.
4 changes: 2 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "p-queue",
"version": "6.6.2",
"name": "p-aggregate-queue",
"version": "8.0.4",
"description": "Promise queue with concurrency control",
"license": "MIT",
"repository": "sindresorhus/p-queue",
Expand Down
41 changes: 40 additions & 1 deletion readme.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
# p-queue

> Promise queue with concurrency control
> Same as p-queue but exposes promise: https://github.com/sindresorhus/p-queue/pull/105
> Loads as a normal module..............
Useful for rate-limiting async (or sync) operations. For example, when interacting with a REST API or when doing CPU/memory intensive tasks.

Expand Down Expand Up @@ -112,6 +114,8 @@ If `true`, specifies that any [pending](https://developer.mozilla.org/en-US/docs

Adds a sync or async task to the queue. Always returns a promise.

Note: If your items can potentially throw an exception, you must handle those errors from the returned Promise or they may be reported as an unhandled Promise rejection and potentially cause your process to exit immediately.

##### fn

Type: `Function`
Expand Down Expand Up @@ -226,6 +230,41 @@ queue.add(() => Promise.resolve());
queue.add(() => Promise.resolve());
queue.add(() => delay(500));
```

#### completed

Emitted when an item completes without error.

```js
import delay from 'delay';
import PQueue from 'p-queue';

const queue = new PQueue({concurrency: 2});

queue.on('completed', result => {
console.log(result);
});

queue.add(() => Promise.resolve('hello, world!'));
```

#### error

Emitted if an item throws an error.

```js
import delay from 'delay';
import PQueue from 'p-queue';

const queue = new PQueue({concurrency: 2});

queue.on('error', error => {
console.error(error);
});

queue.add(() => Promise.reject(new Error('error')));
```

#### idle

Emitted every time the queue becomes empty and all promises have completed; `queue.size === 0 && queue.pending === 0`.
Expand Down Expand Up @@ -259,7 +298,7 @@ Emitted every time the add method is called and the number of pending or queued

#### next

Emitted every time a task is completed and the number of pending or queued tasks is decreased.
Emitted every time a task is completed and the number of pending or queued tasks is decreased. This is emitted regardless of whether the task completed normally or with an error.

```js
const delay = require('delay');
Expand Down
11 changes: 6 additions & 5 deletions source/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ const timeoutError = new TimeoutError();
/**
Promise queue with concurrency control.
*/
export default class PQueue<QueueType extends Queue<RunFunction, EnqueueOptionsType> = PriorityQueue, EnqueueOptionsType extends QueueAddOptions = DefaultAddOptions> extends EventEmitter<'active' | 'idle' | 'add' | 'next'> {
export default class PQueue<QueueType extends Queue<RunFunction, EnqueueOptionsType> = PriorityQueue, EnqueueOptionsType extends QueueAddOptions = DefaultAddOptions> extends EventEmitter<'active' | 'idle' | 'add' | 'next' | 'completed' | 'error'> {
private readonly _carryoverConcurrencyCount: boolean;

private readonly _isIntervalIgnored: boolean;
Expand Down Expand Up @@ -252,17 +252,18 @@ export default class PQueue<QueueType extends Queue<RunFunction, EnqueueOptionsT
}
);

// TODO: Fix this ignore.
// @ts-expect-error
resolve(await operation);
const result = await operation;
resolve(result!);
this.emit('completed', result);
} catch (error: unknown) {
reject(error);
this.emit('error', error);
}

this._next();
};

this._queue.enqueue(run, options);
this._queue.enqueue(run, options, resolve);
this._tryToStartAnother();
this.emit('add');
});
Expand Down
2 changes: 1 addition & 1 deletion source/queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,5 @@ export interface Queue<Element, Options> {
size: number;
filter: (options: Partial<Options>) => Element[];
dequeue: () => Element | undefined;
enqueue: (run: Element, options?: Partial<Options>) => void;
enqueue: (run: Element, options?: Partial<Options>, resolve?: any) => void;
}
57 changes: 57 additions & 0 deletions test/test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -967,6 +967,63 @@ test('should emit next event when completing task', async t => {
t.is(timesCalled, 3);
});

test('should emit completed / error events', async t => {
const queue = new PQueue({concurrency: 1});

let errorEvents = 0;
let completedEvents = 0;
queue.on('error', () => {
errorEvents++;
});
queue.on('completed', () => {
completedEvents++;
});

const job1 = queue.add(async () => delay(100));

t.is(queue.pending, 1);
t.is(queue.size, 0);
t.is(errorEvents, 0);
t.is(completedEvents, 0);

const job2 = queue.add(async () => {
await delay(1);
throw new Error('failure');
});

t.is(queue.pending, 1);
t.is(queue.size, 1);
t.is(errorEvents, 0);
t.is(completedEvents, 0);

await job1;

t.is(queue.pending, 1);
t.is(queue.size, 0);
t.is(errorEvents, 0);
t.is(completedEvents, 1);

await t.throwsAsync(job2);

t.is(queue.pending, 0);
t.is(queue.size, 0);
t.is(errorEvents, 1);
t.is(completedEvents, 1);

const job3 = queue.add(async () => delay(100));

t.is(queue.pending, 1);
t.is(queue.size, 0);
t.is(errorEvents, 1);
t.is(completedEvents, 1);

await job3;
t.is(queue.pending, 0);
t.is(queue.size, 0);
t.is(errorEvents, 1);
t.is(completedEvents, 2);
});

test('should verify timeout overrides passed to add', async t => {
const queue = new PQueue({timeout: 200, throwOnTimeout: true});

Expand Down

0 comments on commit eb0467e

Please sign in to comment.