Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

stream: improve Readable#from perf #50359

Merged
merged 1 commit into from
Oct 26, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
131 changes: 113 additions & 18 deletions lib/internal/streams/from.js
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ function from(Readable, iterable, opts) {
throw new ERR_INVALID_ARG_TYPE('iterable', ['Iterable'], iterable);
}


const readable = new Readable({
objectMode: true,
highWaterMark: 1,
Expand All @@ -46,11 +47,19 @@ function from(Readable, iterable, opts) {
// Flag to protect against _read
// being called before last iteration completion.
let reading = false;
let isAsyncValues = false;

readable._read = function() {
if (!reading) {
reading = true;
next();

if (isAsync) {
nextAsync();
} else if (isAsyncValues) {
nextSyncWithAsyncValues();
} else {
nextSyncWithSyncValues();
}
}
};

Expand Down Expand Up @@ -78,29 +87,115 @@ function from(Readable, iterable, opts) {
}
}

async function next() {
// There are a lot of duplication here, it's done on purpose for performance
// reasons - avoid await when not needed.

function nextSyncWithSyncValues() {
for (;;) {
try {
const { value, done } = iterator.next();

if (done) {
readable.push(null);
return;
}

if (value &&
typeof value.then === 'function') {
return changeToAsyncValues(value);
}

if (value === null) {
reading = false;
throw new ERR_STREAM_NULL_VALUES();
}

if (readable.push(value)) {
continue;
}

reading = false;
} catch (err) {
readable.destroy(err);
}
break;
}
}

async function changeToAsyncValues(value) {
isAsyncValues = true;

try {
const res = await value;

if (res === null) {
reading = false;
throw new ERR_STREAM_NULL_VALUES();
}

if (readable.push(res)) {
nextSyncWithAsyncValues();
return;
}

reading = false;
} catch (err) {
readable.destroy(err);
}
}

async function nextSyncWithAsyncValues() {
for (;;) {
try {
const { value, done } = isAsync ?
await iterator.next() :
iterator.next();
const { value, done } = iterator.next();

if (done) {
readable.push(null);
} else {
const res = (value &&
typeof value.then === 'function') ?
await value :
value;
if (res === null) {
reading = false;
throw new ERR_STREAM_NULL_VALUES();
} else if (readable.push(res)) {
continue;
} else {
reading = false;
}
return;
}

const res = (value &&
typeof value.then === 'function') ?
await value :
value;

if (res === null) {
reading = false;
throw new ERR_STREAM_NULL_VALUES();
}

if (readable.push(res)) {
continue;
}

reading = false;
} catch (err) {
readable.destroy(err);
}
break;
}
}

async function nextAsync() {
for (;;) {
try {
const { value, done } = await iterator.next();

if (done) {
readable.push(null);
return;
}

if (value === null) {
reading = false;
throw new ERR_STREAM_NULL_VALUES();
}

if (readable.push(value)) {
continue;
}

reading = false;
} catch (err) {
readable.destroy(err);
}
Expand Down