Skip to content

Commit

Permalink
events: allow use of AbortController with on
Browse files Browse the repository at this point in the history
Signed-off-by: James M Snell <[email protected]>

PR-URL: nodejs#34912
Reviewed-By: Matteo Collina <[email protected]>
Reviewed-By: Benjamin Gruenbaum <[email protected]>
Reviewed-By: Denys Otrishko <[email protected]>
  • Loading branch information
jasnell authored and targos committed Apr 26, 2021
1 parent 7f46ec7 commit 7cfb6bd
Show file tree
Hide file tree
Showing 3 changed files with 177 additions and 4 deletions.
32 changes: 31 additions & 1 deletion doc/api/events.md
Original file line number Diff line number Diff line change
Expand Up @@ -1004,7 +1004,7 @@ Value: `Symbol.for('nodejs.rejection')`

See how to write a custom [rejection handler][rejection].

## `events.on(emitter, eventName)`
## `events.on(emitter, eventName[, options])`
<!-- YAML
added:
- v13.6.0
Expand All @@ -1013,6 +1013,9 @@ added:

* `emitter` {EventEmitter}
* `eventName` {string|symbol} The name of the event being listened for
* `options` {Object}
* `signal` {AbortSignal} An {AbortSignal} that can be used to cancel awaiting
events.
* Returns: {AsyncIterator} that iterates `eventName` events emitted by the `emitter`

```js
Expand Down Expand Up @@ -1042,6 +1045,33 @@ if the `EventEmitter` emits `'error'`. It removes all listeners when
exiting the loop. The `value` returned by each iteration is an array
composed of the emitted event arguments.

An {AbortSignal} may be used to cancel waiting on events:

```js
const { on, EventEmitter } = require('events');
const ac = new AbortController();

(async () => {
const ee = new EventEmitter();

// Emit later on
process.nextTick(() => {
ee.emit('foo', 'bar');
ee.emit('foo', 42);
});

for await (const event of on(ee, 'foo', { signal: ac.signal })) {
// The execution of this inner block is synchronous and it
// processes one event at a time (even with await). Do not use
// if concurrent execution is required.
console.log(event); // prints ['bar'] [42]
}
// Unreachable here
})();

process.nextTick(() => ac.abort());
```

## `EventTarget` and `Event` API
<!-- YAML
added: v14.5.0
Expand Down
28 changes: 27 additions & 1 deletion lib/events.js
Original file line number Diff line number Diff line change
Expand Up @@ -731,7 +731,13 @@ function eventTargetAgnosticAddListener(emitter, name, listener, flags) {
}
}

function on(emitter, event) {
function on(emitter, event, options) {
const { signal } = { ...options };
validateAbortSignal(signal, 'options.signal');
if (signal && signal.aborted) {
throw lazyDOMException('The operation was aborted', 'AbortError');
}

const unconsumedEvents = [];
const unconsumedPromises = [];
let error = null;
Expand Down Expand Up @@ -769,6 +775,15 @@ function on(emitter, event) {
return() {
eventTargetAgnosticRemoveListener(emitter, event, eventHandler);
eventTargetAgnosticRemoveListener(emitter, 'error', errorHandler);

if (signal) {
eventTargetAgnosticRemoveListener(
signal,
'abort',
abortListener,
{ once: true });
}

finished = true;

for (const promise of unconsumedPromises) {
Expand Down Expand Up @@ -798,9 +813,20 @@ function on(emitter, event) {
addErrorHandlerIfEventEmitter(emitter, errorHandler);
}

if (signal) {
eventTargetAgnosticAddListener(
signal,
'abort',
abortListener,
{ once: true });
}

return iterator;

function abortListener() {
errorHandler(lazyDOMException('The operation was aborted', 'AbortError'));
}

function eventHandler(...args) {
const promise = unconsumedPromises.shift();
if (promise) {
Expand Down
121 changes: 119 additions & 2 deletions test/parallel/test-event-on-async-iterator.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Flags: --expose-internals
// Flags: --expose-internals --no-warnings --experimental-abortcontroller
'use strict';

const common = require('../common');
Expand Down Expand Up @@ -248,6 +248,117 @@ async function nodeEventTarget() {
clearInterval(interval);
}

async function abortableOnBefore() {
const ee = new EventEmitter();
const ac = new AbortController();
ac.abort();
[1, {}, null, false, 'hi'].forEach((signal) => {
assert.throws(() => on(ee, 'foo', { signal }), {
code: 'ERR_INVALID_ARG_TYPE'
});
});
assert.throws(() => on(ee, 'foo', { signal: ac.signal }), {
name: 'AbortError'
});
}

async function eventTargetAbortableOnBefore() {
const et = new EventTarget();
const ac = new AbortController();
ac.abort();
[1, {}, null, false, 'hi'].forEach((signal) => {
assert.throws(() => on(et, 'foo', { signal }), {
code: 'ERR_INVALID_ARG_TYPE'
});
});
assert.throws(() => on(et, 'foo', { signal: ac.signal }), {
name: 'AbortError'
});
}

async function abortableOnAfter() {
const ee = new EventEmitter();
const ac = new AbortController();

const i = setInterval(() => ee.emit('foo', 'foo'), 10);

async function foo() {
for await (const f of on(ee, 'foo', { signal: ac.signal })) {
assert.strictEqual(f, 'foo');
}
}

foo().catch(common.mustCall((error) => {
assert.strictEqual(error.name, 'AbortError');
})).finally(() => {
clearInterval(i);
});

process.nextTick(() => ac.abort());
}

async function eventTargetAbortableOnAfter() {
const et = new EventTarget();
const ac = new AbortController();

const i = setInterval(() => et.dispatchEvent(new Event('foo')), 10);

async function foo() {
for await (const f of on(et, 'foo', { signal: ac.signal })) {
assert(f);
}
}

foo().catch(common.mustCall((error) => {
assert.strictEqual(error.name, 'AbortError');
})).finally(() => {
clearInterval(i);
});

process.nextTick(() => ac.abort());
}

async function eventTargetAbortableOnAfter2() {
const et = new EventTarget();
const ac = new AbortController();

const i = setInterval(() => et.dispatchEvent(new Event('foo')), 10);

async function foo() {
for await (const f of on(et, 'foo', { signal: ac.signal })) {
assert(f);
// Cancel after a single event has been triggered.
ac.abort();
}
}

foo().catch(common.mustCall((error) => {
assert.strictEqual(error.name, 'AbortError');
})).finally(() => {
clearInterval(i);
});
}

async function abortableOnAfterDone() {
const ee = new EventEmitter();
const ac = new AbortController();

const i = setInterval(() => ee.emit('foo', 'foo'), 1);
let count = 0;

async function foo() {
for await (const f of on(ee, 'foo', { signal: ac.signal })) {
assert.strictEqual(f[0], 'foo');
if (++count === 5)
break;
}
ac.abort(); // No error will occur
}

foo().finally(() => {
clearInterval(i);
});
}

async function run() {
const funcs = [
Expand All @@ -260,7 +371,13 @@ async function run() {
iterableThrow,
eventTarget,
errorListenerCount,
nodeEventTarget
nodeEventTarget,
abortableOnBefore,
abortableOnAfter,
eventTargetAbortableOnBefore,
eventTargetAbortableOnAfter,
eventTargetAbortableOnAfter2,
abortableOnAfterDone
];

for (const fn of funcs) {
Expand Down

0 comments on commit 7cfb6bd

Please sign in to comment.