Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add idle event #109

Merged
merged 5 commits into from
May 3, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,32 @@ queue.add(() => Promise.resolve());
queue.add(() => Promise.resolve());
queue.add(() => delay(500));
```
#### idle

Emitted every time the queue becomes empty and all promises have completed; `queue.size === 0 && queue.pending === 0`.

```js
const delay = require('delay');
const {default: PQueue} = require('p-queue');

const queue = new PQueue();

queue.on('idle', () => {
console.log(`Queue is idle. Size: ${queue.size} Pending: ${queue.pending}`);
});

const job1 = queue.add(() => delay(2000));
const job2 = queue.add(() => delay(500));

await job1;
await job2;
// => 'Queue is idle. Size: 0 Pending: 0'

await queue.add(() => delay(600));
// => 'Queue is idle. Size: 0 Pending: 0'
```

The `idle` event is emitted every time the queue reaches an idle state. On the other hand, the promise the `onIdle()` function returns resolves once the queue becomes idle instead of every time the queue is idle.

## Advanced example

Expand Down
3 changes: 2 additions & 1 deletion source/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ const timeoutError = new TimeoutError();
/**
Promise queue with concurrency control.
*/
export default class PQueue<QueueType extends Queue<RunFunction, EnqueueOptionsType> = PriorityQueue, EnqueueOptionsType extends QueueAddOptions = DefaultAddOptions> extends EventEmitter<'active'> {
export default class PQueue<QueueType extends Queue<RunFunction, EnqueueOptionsType> = PriorityQueue, EnqueueOptionsType extends QueueAddOptions = DefaultAddOptions> extends EventEmitter<'active' | 'idle'> {
private readonly _carryoverConcurrencyCount: boolean;

private readonly _isIntervalIgnored: boolean;
Expand Down Expand Up @@ -108,6 +108,7 @@ export default class PQueue<QueueType extends Queue<RunFunction, EnqueueOptionsT
if (this._pendingCount === 0) {
this._resolveIdle();
this._resolveIdle = empty;
this.emit('idle');
}
}

Expand Down
41 changes: 40 additions & 1 deletion test/test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ test('.add() - concurrency: 1', async t => {
const end = timeSpan();
const queue = new PQueue({concurrency: 1});

const mapper = async ([value, ms]: readonly number[]) => queue.add(async () => {
const mapper = async ([value, ms]: readonly number[]): Promise<number> => queue.add(async () => {
await delay(ms);
return value;
});
Expand Down Expand Up @@ -814,6 +814,45 @@ test('should emit active event per item', async t => {
t.is(eventCount, items.length);
});

test('should emit idle event when idle', async t => {
const queue = new PQueue({concurrency: 1});

let timesCalled = 0;
queue.on('idle', () => {
timesCalled++;
});

const job1 = queue.add(async () => delay(100));
const job2 = queue.add(async () => delay(100));

t.is(queue.pending, 1);
t.is(queue.size, 1);
t.is(timesCalled, 0);

await job1;

t.is(queue.pending, 1);
t.is(queue.size, 0);
t.is(timesCalled, 0);

await job2;

t.is(queue.pending, 0);
t.is(queue.size, 0);
t.is(timesCalled, 1);

const job3 = queue.add(async () => delay(100));

t.is(queue.pending, 1);
t.is(queue.size, 0);
t.is(timesCalled, 1);

await job3;
t.is(queue.pending, 0);
t.is(queue.size, 0);
t.is(timesCalled, 2);
});

test('should verify timeout overrides passed to add', async t => {
const queue = new PQueue({timeout: 200, throwOnTimeout: true});

Expand Down