Skip to content

Commit

Permalink
events: add EventEmitter.on to async iterate over events
Browse files Browse the repository at this point in the history
Fixes: #27847
PR-URL: #27994
Reviewed-By: Benjamin Gruenbaum <[email protected]>
Reviewed-By: James M Snell <[email protected]>
Reviewed-By: Gus Caplan <[email protected]>
Reviewed-By: Anna Henningsen <[email protected]>
Reviewed-By: Rich Trott <[email protected]>
  • Loading branch information
mcollina authored and targos committed Jan 14, 2020
1 parent 1d45ba3 commit b6d09e8
Show file tree
Hide file tree
Showing 3 changed files with 362 additions and 0 deletions.
35 changes: 35 additions & 0 deletions doc/api/events.md
Original file line number Diff line number Diff line change
Expand Up @@ -886,6 +886,41 @@ Value: `Symbol.for('nodejs.rejection')`

See how to write a custom [rejection handler][rejection].

## events.on(emitter, eventName)
<!-- YAML
added: REPLACEME
-->

* `emitter` {EventEmitter}
* `eventName` {string|symbol} The name of the event being listened for
* Returns: {AsyncIterator} that iterates `eventName` events emitted by the `emitter`

```js
const { on, EventEmitter } = require('events');

(async () => {
const ee = new EventEmitter();

// Emit later on
process.nextTick(() => {
ee.emit('foo', 'bar');
ee.emit('foo', 42);
});

for await (const event of on(ee, 'foo')) {
// The execution of this inner block is synchronous and it
// processes one event at a time (even with await). Do not use
// if concurrent execution is required.
console.log(event); // prints ['bar'] [42]
}
})();
```

Returns an `AsyncIterator` that iterates `eventName` events. It will throw
if the `EventEmitter` emits `'error'`. It removes all listeners when
exiting the loop. The `value` returned by each iteration is an array
composed of the emitted event arguments.

[WHATWG-EventTarget]: https://dom.spec.whatwg.org/#interface-eventtarget
[`--trace-warnings`]: cli.html#cli_trace_warnings
[`EventEmitter.defaultMaxListeners`]: #events_eventemitter_defaultmaxlisteners
Expand Down
104 changes: 104 additions & 0 deletions lib/events.js
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,16 @@ const {
ObjectCreate,
ObjectDefineProperty,
ObjectGetPrototypeOf,
ObjectSetPrototypeOf,
ObjectKeys,
Promise,
PromiseReject,
PromiseResolve,
ReflectApply,
ReflectOwnKeys,
Symbol,
SymbolFor,
SymbolAsyncIterator
} = primordials;
const kRejection = SymbolFor('nodejs.rejection');

Expand Down Expand Up @@ -63,6 +67,7 @@ function EventEmitter(opts) {
}
module.exports = EventEmitter;
module.exports.once = once;
module.exports.on = on;

// Backwards-compat with node 0.10.x
EventEmitter.EventEmitter = EventEmitter;
Expand Down Expand Up @@ -658,3 +663,102 @@ function once(emitter, name) {
emitter.once(name, eventListener);
});
}

const AsyncIteratorPrototype = ObjectGetPrototypeOf(
ObjectGetPrototypeOf(async function* () {}).prototype);

function createIterResult(value, done) {
return { value, done };
}

function on(emitter, event) {
const unconsumedEvents = [];
const unconsumedPromises = [];
let error = null;
let finished = false;

const iterator = ObjectSetPrototypeOf({
next() {
// First, we consume all unread events
const value = unconsumedEvents.shift();
if (value) {
return PromiseResolve(createIterResult(value, false));
}

// Then we error, if an error happened
// This happens one time if at all, because after 'error'
// we stop listening
if (error) {
const p = PromiseReject(error);
// Only the first element errors
error = null;
return p;
}

// If the iterator is finished, resolve to done
if (finished) {
return PromiseResolve(createIterResult(undefined, true));
}

// Wait until an event happens
return new Promise(function(resolve, reject) {
unconsumedPromises.push({ resolve, reject });
});
},

return() {
emitter.removeListener(event, eventHandler);
emitter.removeListener('error', errorHandler);
finished = true;

for (const promise of unconsumedPromises) {
promise.resolve(createIterResult(undefined, true));
}

return PromiseResolve(createIterResult(undefined, true));
},

throw(err) {
if (!err || !(err instanceof Error)) {
throw new ERR_INVALID_ARG_TYPE('EventEmitter.AsyncIterator',
'Error', err);
}
error = err;
emitter.removeListener(event, eventHandler);
emitter.removeListener('error', errorHandler);
},

[SymbolAsyncIterator]() {
return this;
}
}, AsyncIteratorPrototype);

emitter.on(event, eventHandler);
emitter.on('error', errorHandler);

return iterator;

function eventHandler(...args) {
const promise = unconsumedPromises.shift();
if (promise) {
promise.resolve(createIterResult(args, false));
} else {
unconsumedEvents.push(args);
}
}

function errorHandler(err) {
finished = true;

const toError = unconsumedPromises.shift();

if (toError) {
toError.reject(err);
} else {
// The next time we call next()
error = err;
}

iterator.return();
}
}
223 changes: 223 additions & 0 deletions test/parallel/test-event-on-async-iterator.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,223 @@
'use strict';

const common = require('../common');
const assert = require('assert');
const { on, EventEmitter } = require('events');

async function basic() {
const ee = new EventEmitter();
process.nextTick(() => {
ee.emit('foo', 'bar');
// 'bar' is a spurious event, we are testing
// that it does not show up in the iterable
ee.emit('bar', 24);
ee.emit('foo', 42);
});

const iterable = on(ee, 'foo');

const expected = [['bar'], [42]];

for await (const event of iterable) {
const current = expected.shift();

assert.deepStrictEqual(current, event);

if (expected.length === 0) {
break;
}
}
assert.strictEqual(ee.listenerCount('foo'), 0);
assert.strictEqual(ee.listenerCount('error'), 0);
}

async function error() {
const ee = new EventEmitter();
const _err = new Error('kaboom');
process.nextTick(() => {
ee.emit('error', _err);
});

const iterable = on(ee, 'foo');
let looped = false;
let thrown = false;

try {
// eslint-disable-next-line no-unused-vars
for await (const event of iterable) {
looped = true;
}
} catch (err) {
thrown = true;
assert.strictEqual(err, _err);
}
assert.strictEqual(thrown, true);
assert.strictEqual(looped, false);
}

async function errorDelayed() {
const ee = new EventEmitter();
const _err = new Error('kaboom');
process.nextTick(() => {
ee.emit('foo', 42);
ee.emit('error', _err);
});

const iterable = on(ee, 'foo');
const expected = [[42]];
let thrown = false;

try {
for await (const event of iterable) {
const current = expected.shift();
assert.deepStrictEqual(current, event);
}
} catch (err) {
thrown = true;
assert.strictEqual(err, _err);
}
assert.strictEqual(thrown, true);
assert.strictEqual(ee.listenerCount('foo'), 0);
assert.strictEqual(ee.listenerCount('error'), 0);
}

async function throwInLoop() {
const ee = new EventEmitter();
const _err = new Error('kaboom');

process.nextTick(() => {
ee.emit('foo', 42);
});

try {
for await (const event of on(ee, 'foo')) {
assert.deepStrictEqual(event, [42]);
throw _err;
}
} catch (err) {
assert.strictEqual(err, _err);
}

assert.strictEqual(ee.listenerCount('foo'), 0);
assert.strictEqual(ee.listenerCount('error'), 0);
}

async function next() {
const ee = new EventEmitter();
const iterable = on(ee, 'foo');

process.nextTick(function() {
ee.emit('foo', 'bar');
ee.emit('foo', 42);
iterable.return();
});

const results = await Promise.all([
iterable.next(),
iterable.next(),
iterable.next()
]);

assert.deepStrictEqual(results, [{
value: ['bar'],
done: false
}, {
value: [42],
done: false
}, {
value: undefined,
done: true
}]);

assert.deepStrictEqual(await iterable.next(), {
value: undefined,
done: true
});
}

async function nextError() {
const ee = new EventEmitter();
const iterable = on(ee, 'foo');
const _err = new Error('kaboom');
process.nextTick(function() {
ee.emit('error', _err);
});
const results = await Promise.allSettled([
iterable.next(),
iterable.next(),
iterable.next()
]);
assert.deepStrictEqual(results, [{
status: 'rejected',
reason: _err
}, {
status: 'fulfilled',
value: {
value: undefined,
done: true
}
}, {
status: 'fulfilled',
value: {
value: undefined,
done: true
}
}]);
assert.strictEqual(ee.listeners('error').length, 0);
}

async function iterableThrow() {
const ee = new EventEmitter();
const iterable = on(ee, 'foo');

process.nextTick(() => {
ee.emit('foo', 'bar');
ee.emit('foo', 42); // lost in the queue
iterable.throw(_err);
});

const _err = new Error('kaboom');
let thrown = false;

assert.throws(() => {
// No argument
iterable.throw();
}, {
message: 'The "EventEmitter.AsyncIterator" property must be' +
' an instance of Error. Received undefined',
name: 'TypeError'
});

const expected = [['bar'], [42]];

try {
for await (const event of iterable) {
assert.deepStrictEqual(event, expected.shift());
}
} catch (err) {
thrown = true;
assert.strictEqual(err, _err);
}
assert.strictEqual(thrown, true);
assert.strictEqual(expected.length, 0);
assert.strictEqual(ee.listenerCount('foo'), 0);
assert.strictEqual(ee.listenerCount('error'), 0);
}

async function run() {
const funcs = [
basic,
error,
errorDelayed,
throwInLoop,
next,
nextError,
iterableThrow
];

for (const fn of funcs) {
await fn();
}
}

run().then(common.mustCall());

0 comments on commit b6d09e8

Please sign in to comment.