Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

streams: refactor ReadableStream asyncIterator creation and a few fixes #23042

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 6 additions & 4 deletions lib/_stream_readable.js
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ const { emitExperimentalWarning } = require('internal/util');

// Lazy loaded to improve the startup performance.
let StringDecoder;
let ReadableAsyncIterator;
let createReadableStreamAsyncIterator;

util.inherits(Readable, Stream);

Expand Down Expand Up @@ -990,9 +990,11 @@ Readable.prototype.wrap = function(stream) {

Readable.prototype[Symbol.asyncIterator] = function() {
emitExperimentalWarning('Readable[Symbol.asyncIterator]');
if (ReadableAsyncIterator === undefined)
ReadableAsyncIterator = require('internal/streams/async_iterator');
return new ReadableAsyncIterator(this);
if (createReadableStreamAsyncIterator === undefined) {
createReadableStreamAsyncIterator =
require('internal/streams/async_iterator');
}
return createReadableStreamAsyncIterator(this);
};

Object.defineProperty(Readable.prototype, 'readableHighWaterMark', {
Expand Down
94 changes: 50 additions & 44 deletions lib/internal/streams/async_iterator.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,9 @@ const kLastPromise = Symbol('lastPromise');
const kHandlePromise = Symbol('handlePromise');
const kStream = Symbol('stream');

const AsyncIteratorRecord = class AsyncIteratorRecord {
constructor(value, done) {
this.done = done;
this.value = value;
}
};
function createIterResult(value, done) {
return { value, done };
}

function readAndResolve(iter) {
const resolve = iter[kLastResolve];
Expand All @@ -26,7 +23,7 @@ function readAndResolve(iter) {
iter[kLastPromise] = null;
iter[kLastResolve] = null;
iter[kLastReject] = null;
resolve(new AsyncIteratorRecord(data, false));
resolve(createIterResult(data, false));
}
}
}
Expand All @@ -43,7 +40,7 @@ function onEnd(iter) {
iter[kLastPromise] = null;
iter[kLastResolve] = null;
iter[kLastReject] = null;
resolve(new AsyncIteratorRecord(null, true));
resolve(createIterResult(null, true));
}
iter[kEnded] = true;
}
Expand All @@ -69,39 +66,13 @@ function wrapForNext(lastPromise, iter) {
};
}

const ReadableAsyncIterator = class ReadableAsyncIterator {
constructor(stream) {
this[kStream] = stream;
this[kLastResolve] = null;
this[kLastReject] = null;
this[kError] = null;
this[kEnded] = false;
this[kLastPromise] = null;

stream.on('readable', onReadable.bind(null, this));
stream.on('end', onEnd.bind(null, this));
stream.on('error', onError.bind(null, this));

// the function passed to new Promise
// is cached so we avoid allocating a new
// closure at every run
this[kHandlePromise] = (resolve, reject) => {
const data = this[kStream].read();
if (data) {
this[kLastPromise] = null;
this[kLastResolve] = null;
this[kLastReject] = null;
resolve(new AsyncIteratorRecord(data, false));
} else {
this[kLastResolve] = resolve;
this[kLastReject] = reject;
}
};
}
const AsyncIteratorPrototype = Object.getPrototypeOf(
Object.getPrototypeOf(async function* () {}).prototype);

const ReadableStreamAsyncIteratorPrototype = Object.setPrototypeOf({
get stream() {
return this[kStream];
}
},

next() {
// if we have detected an error in the meanwhile
Expand All @@ -112,7 +83,7 @@ const ReadableAsyncIterator = class ReadableAsyncIterator {
}

if (this[kEnded]) {
return Promise.resolve(new AsyncIteratorRecord(null, true));
return Promise.resolve(createIterResult(null, true));
}

// if we have multiple next() calls
Expand All @@ -129,7 +100,7 @@ const ReadableAsyncIterator = class ReadableAsyncIterator {
// without triggering the next() queue
const data = this[kStream].read();
if (data !== null) {
return Promise.resolve(new AsyncIteratorRecord(data, false));
return Promise.resolve(createIterResult(data, false));
}

promise = new Promise(this[kHandlePromise]);
Expand All @@ -138,7 +109,7 @@ const ReadableAsyncIterator = class ReadableAsyncIterator {
this[kLastPromise] = promise;

return promise;
}
},

return() {
// destroy(err, cb) is a private API
Expand All @@ -150,10 +121,45 @@ const ReadableAsyncIterator = class ReadableAsyncIterator {
reject(err);
return;
}
resolve(new AsyncIteratorRecord(null, true));
resolve(createIterResult(null, true));
});
});
}
},
}, AsyncIteratorPrototype);

const createReadableStreamAsyncIterator = (stream) => {
const iterator = Object.create(ReadableStreamAsyncIteratorPrototype, {
[kStream]: { value: stream, writable: true },
[kLastResolve]: { value: null, writable: true },
[kLastReject]: { value: null, writable: true },
[kError]: { value: null, writable: true },
[kEnded]: { value: false, writable: true },
[kLastPromise]: { value: null, writable: true },
// the function passed to new Promise
// is cached so we avoid allocating a new
// closure at every run
[kHandlePromise]: {
value: (resolve, reject) => {
const data = iterator[kStream].read();
if (data) {
iterator[kLastPromise] = null;
iterator[kLastResolve] = null;
iterator[kLastReject] = null;
resolve(createIterResult(data, false));
} else {
iterator[kLastResolve] = resolve;
iterator[kLastReject] = reject;
}
},
writable: true,
},
});

stream.on('readable', onReadable.bind(null, iterator));
stream.on('end', onEnd.bind(null, iterator));
stream.on('error', onError.bind(null, iterator));

return iterator;
};

module.exports = ReadableAsyncIterator;
module.exports = createReadableStreamAsyncIterator;
22 changes: 22 additions & 0 deletions test/parallel/test-stream-readable-async-iterators.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,28 @@ const { Readable } = require('stream');
const assert = require('assert');

async function tests() {
{
const AsyncIteratorPrototype = Object.getPrototypeOf(
Object.getPrototypeOf(async function* () {}).prototype);
const rs = new Readable({});
assert.strictEqual(
Object.getPrototypeOf(Object.getPrototypeOf(rs[Symbol.asyncIterator]())),
AsyncIteratorPrototype);
}

await (async function() {
const readable = new Readable({ objectMode: true, read() {} });
readable.push(0);
readable.push(1);
readable.push(null);

const iter = readable[Symbol.asyncIterator]();
assert.strictEqual((await iter.next()).value, 0);
for await (const d of iter) {
assert.strictEqual(d, 1);
}
})();

await (async function() {
console.log('read without for..await');
const max = 5;
Expand Down