Skip to content

Commit

Permalink
Add empty event
Browse files Browse the repository at this point in the history
Signed-off-by: Richie Bendall <[email protected]>
  • Loading branch information
Richienb committed Jul 29, 2022
1 parent 345e553 commit 5362aa7
Show file tree
Hide file tree
Showing 4 changed files with 74 additions and 54 deletions.
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
"delay": "^5.0.0",
"in-range": "^3.0.0",
"nyc": "^15.1.0",
"p-defer": "^4.0.0",
"random-int": "^3.0.0",
"time-span": "^5.0.0",
"ts-node": "^10.4.0",
Expand Down
8 changes: 8 additions & 0 deletions readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -308,10 +308,18 @@ queue.on('error', error => {
queue.add(() => Promise.reject(new Error('error')));
```

#### empty

Emitted every time the queue becomes empty.

Useful if you for example add additional items at a later time.

#### idle

Emitted every time the queue becomes empty and all promises have completed; `queue.size === 0 && queue.pending === 0`.

The difference with `empty` is that `idle` guarantees that all work from the queue has finished. `empty` merely signals that the queue is empty, but it could mean that some promises haven't completed yet.

```js
import delay from 'delay';
import PQueue from 'p-queue';
Expand Down
86 changes: 32 additions & 54 deletions source/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,26 +4,23 @@ import {Queue, RunFunction} from './queue.js';
import PriorityQueue from './priority-queue.js';
import {QueueAddOptions, Options, TaskOptions} from './options.js';

type ResolveFunction<T = void> = (value?: T | PromiseLike<T>) => void;

type Task<TaskResultType> =
| ((options: TaskOptions) => PromiseLike<TaskResultType>)
| ((options: TaskOptions) => TaskResultType);

// eslint-disable-next-line @typescript-eslint/no-empty-function
const empty = (): void => {};

const timeoutError = new TimeoutError();

/**
The error thrown by `queue.add()` when a job is aborted before it is run. See `signal`.
*/
export class AbortError extends Error {}

type EventName = 'active' | 'idle' | 'empty' | 'add' | 'next' | 'completed' | 'error';

/**
Promise queue with concurrency control.
*/
export default class PQueue<QueueType extends Queue<RunFunction, EnqueueOptionsType> = PriorityQueue, EnqueueOptionsType extends QueueAddOptions = QueueAddOptions> extends EventEmitter<'active' | 'idle' | 'add' | 'next' | 'completed' | 'error'> {
export default class PQueue<QueueType extends Queue<RunFunction, EnqueueOptionsType> = PriorityQueue, EnqueueOptionsType extends QueueAddOptions = QueueAddOptions> extends EventEmitter<EventName> {
readonly #carryoverConcurrencyCount: boolean;

readonly #isIntervalIgnored: boolean;
Expand Down Expand Up @@ -51,13 +48,14 @@ export default class PQueue<QueueType extends Queue<RunFunction, EnqueueOptionsT

#isPaused: boolean;

#resolveEmpty: ResolveFunction = empty;

#resolveIdle: ResolveFunction = empty;
readonly #throwOnTimeout: boolean;

#timeout?: number;
/**
Per-operation timeout in milliseconds. Operations fulfill once `timeout` elapses if they haven't already.
readonly #throwOnTimeout: boolean;
Applies to each future operation.
*/
timeout?: number;

constructor(options?: Options<QueueType, EnqueueOptionsType>) {
super();
Expand Down Expand Up @@ -88,7 +86,7 @@ export default class PQueue<QueueType extends Queue<RunFunction, EnqueueOptionsT
this.#queue = new options.queueClass!();
this.#queueClass = options.queueClass!;
this.concurrency = options.concurrency!;
this.#timeout = options.timeout;
this.timeout = options.timeout;
this.#throwOnTimeout = options.throwOnTimeout === true;
this.#isPaused = options.autoStart === false;
}
Expand All @@ -107,13 +105,10 @@ export default class PQueue<QueueType extends Queue<RunFunction, EnqueueOptionsT
this.emit('next');
}

#resolvePromises(): void {
this.#resolveEmpty();
this.#resolveEmpty = empty;
#emitEvents(): void {
this.emit('empty');

if (this.#pendingCount === 0) {
this.#resolveIdle();
this.#resolveIdle = empty;
this.emit('idle');
}
}
Expand All @@ -124,7 +119,7 @@ export default class PQueue<QueueType extends Queue<RunFunction, EnqueueOptionsT
this.#timeoutId = undefined;
}

#isIntervalPaused(): boolean {
get #isIntervalPaused(): boolean {
const now = Date.now();

if (this.#intervalId === undefined) {
Expand Down Expand Up @@ -161,13 +156,13 @@ export default class PQueue<QueueType extends Queue<RunFunction, EnqueueOptionsT

this.#intervalId = undefined;

this.#resolvePromises();
this.#emitEvents();

return false;
}

if (!this.#isPaused) {
const canInitializeInterval = !this.#isIntervalPaused();
const canInitializeInterval = !this.#isIntervalPaused;
if (this.#doesIntervalAllowAnother && this.#doesConcurrentAllowAnother) {
const job = this.#queue.dequeue();
if (!job) {
Expand Down Expand Up @@ -251,9 +246,9 @@ export default class PQueue<QueueType extends Queue<RunFunction, EnqueueOptionsT
return;
}

const operation = (this.#timeout === undefined && options.timeout === undefined) ? fn({signal: options.signal}) : pTimeout(
const operation = (this.timeout === undefined && options.timeout === undefined) ? fn({signal: options.signal}) : pTimeout(
Promise.resolve(fn({signal: options.signal})),
(options.timeout === undefined ? this.#timeout : options.timeout)!,
(options.timeout === undefined ? this.timeout : options.timeout)!,
() => {
if (options.throwOnTimeout === undefined ? this.#throwOnTimeout : options.throwOnTimeout) {
reject(timeoutError);
Expand Down Expand Up @@ -331,13 +326,7 @@ export default class PQueue<QueueType extends Queue<RunFunction, EnqueueOptionsT
return;
}

return new Promise<void>(resolve => {
const existingResolve = this.#resolveEmpty;
this.#resolveEmpty = () => {
existingResolve();
resolve();
};
});
await this.#onEvent('empty');
}

/**
Expand All @@ -353,16 +342,7 @@ export default class PQueue<QueueType extends Queue<RunFunction, EnqueueOptionsT
return;
}

return new Promise<void>(resolve => {
const listener = () => {
if (this.#queue.size < limit) {
this.removeListener('next', listener);
resolve();
}
};

this.on('next', listener);
});
await this.#onEvent('next', () => this.#queue.size < limit);
}

/**
Expand All @@ -376,12 +356,21 @@ export default class PQueue<QueueType extends Queue<RunFunction, EnqueueOptionsT
return;
}

return new Promise<void>(resolve => {
const existingResolve = this.#resolveIdle;
this.#resolveIdle = () => {
existingResolve();
await this.#onEvent('idle');
}

async #onEvent(event: EventName, filter?: () => boolean): Promise<void> {
return new Promise(resolve => {
const listener = () => {
if (filter && !filter()) {
return;
}

this.off(event, listener);
resolve();
};

this.on(event, listener);
});
}

Expand Down Expand Up @@ -415,17 +404,6 @@ export default class PQueue<QueueType extends Queue<RunFunction, EnqueueOptionsT
get isPaused(): boolean {
return this.#isPaused;
}

get timeout(): number | undefined {
return this.#timeout;
}

/**
Set the timeout for future operations.
*/
set timeout(milliseconds: number | undefined) {
this.#timeout = milliseconds;
}
}

// TODO: Rename `DefaultAddOptions` to `QueueAddOptions` in next major version
Expand Down
33 changes: 33 additions & 0 deletions test/test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import delay from 'delay';
import inRange from 'in-range';
import timeSpan from 'time-span';
import randomInt from 'random-int';
import pDefer from 'p-defer';
import PQueue, {AbortError} from '../source/index.js';

const fixture = Symbol('fixture');
Expand Down Expand Up @@ -892,6 +893,38 @@ test('should emit idle event when idle', async t => {
t.is(timesCalled, 2);
});

test('should emit empty event when empty', async t => {
const queue = new PQueue({concurrency: 1});

let timesCalled = 0;
queue.on('empty', () => {
timesCalled++;
});

const {resolve: resolveJob1, promise: job1Promise} = pDefer();
const {resolve: resolveJob2, promise: job2Promise} = pDefer();

const job1 = queue.add(async () => job1Promise);
const job2 = queue.add(async () => job2Promise);
t.is(queue.size, 1);
t.is(queue.pending, 1);
t.is(timesCalled, 0);

resolveJob1();
await job1;

t.is(queue.size, 0);
t.is(queue.pending, 1);
t.is(timesCalled, 0);

resolveJob2();
await job2;

t.is(queue.size, 0);
t.is(queue.pending, 0);
t.is(timesCalled, 1);
});

test('should emit add event when adding task', async t => {
const queue = new PQueue({concurrency: 1});

Expand Down

1 comment on commit 5362aa7

@Richienb
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This commit fixes fragmentation where an onEmpty function exists but the corresponding event does not.

Please sign in to comment.