Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

stream: unify stream utils #39294

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions lib/_http_client.js
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ const {
prepareError,
} = require('_http_common');
const { OutgoingMessage } = require('_http_outgoing');
const { kDestroy } = require('internal/streams/destroy');
const Agent = require('_http_agent');
const { Buffer } = require('buffer');
const { defaultTriggerAsyncIdScope } = require('internal/async_hooks');
Expand Down Expand Up @@ -610,7 +609,6 @@ function parserOnIncomingClient(res, shouldKeepAlive) {
DTRACE_HTTP_CLIENT_RESPONSE(socket, req);
req.res = res;
res.req = req;
res[kDestroy] = null;

// Add our listener first, so that we guarantee socket cleanup
res.on('end', responseOnEnd);
Expand Down
6 changes: 0 additions & 6 deletions lib/_http_incoming.js
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ const {
} = primordials;

const { Readable, finished } = require('stream');
const { kDestroy } = require('internal/streams/destroy');

const kHeaders = Symbol('kHeaders');
const kHeadersCount = Symbol('kHeadersCount');
Expand Down Expand Up @@ -199,11 +198,6 @@ IncomingMessage.prototype._destroy = function _destroy(err, cb) {
}
};

IncomingMessage.prototype[kDestroy] = function(err) {
this.socket = null;
this.destroy(err);
};

IncomingMessage.prototype._addHeaderLines = _addHeaderLines;
function _addHeaderLines(headers, n) {
if (headers && headers.length) {
Expand Down
4 changes: 2 additions & 2 deletions lib/internal/streams/add-abort-signal.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,13 @@ const validateAbortSignal = (signal, name) => {
}
};

function isStream(obj) {
function isNodeStream(obj) {
return !!(obj && typeof obj.pipe === 'function');
}

module.exports.addAbortSignal = function addAbortSignal(signal, stream) {
validateAbortSignal(signal, 'signal');
if (!isStream(stream)) {
if (!isNodeStream(stream)) {
throw new ERR_INVALID_ARG_TYPE('stream', 'stream.Stream', stream);
}
return module.exports.addAbortSignalNoValidate(signal, stream);
Expand Down
29 changes: 10 additions & 19 deletions lib/internal/streams/destroy.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,12 @@ const {
const {
Symbol,
} = primordials;
const {
kDestroyed,
isDestroyed,
isFinished,
isServerRequest
} = require('internal/streams/utils');

const kDestroy = Symbol('kDestroy');
const kConstruct = Symbol('kConstruct');
Expand Down Expand Up @@ -364,8 +370,6 @@ function isRequest(stream) {
return stream && stream.setHeader && typeof stream.abort === 'function';
}

const kDestroyed = Symbol('kDestroyed');

function emitCloseLegacy(stream) {
stream.emit('close');
}
Expand All @@ -375,31 +379,20 @@ function emitErrorCloseLegacy(stream, err) {
process.nextTick(emitCloseLegacy, stream);
}

function isDestroyed(stream) {
return stream.destroyed || stream[kDestroyed];
}

function isReadable(stream) {
return stream.readable && !stream.readableEnded && !isDestroyed(stream);
}

function isWritable(stream) {
return stream.writable && !stream.writableEnded && !isDestroyed(stream);
}

// Normalize destroy for legacy.
function destroyer(stream, err) {
if (isDestroyed(stream)) {
return;
}

if (!err && (isReadable(stream) || isWritable(stream))) {
if (!err && !isFinished(stream)) {
err = new AbortError();
}

// TODO: Remove isRequest branches.
if (typeof stream[kDestroy] === 'function') {
stream[kDestroy](err);
if (isServerRequest(stream)) {
stream.socket = null;

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need to prevent the socket from destroying in this case? Is there any chance to move this logic into IncomingMessage.prototype._destroy to decouple destroy logic a bit?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So that the response is not destroyed. No it’s not possible to decouple to due breaking reasons. Unfortunately this is the only way.

stream.destroy(err);
} else if (isRequest(stream)) {
stream.abort();
} else if (isRequest(stream.req)) {
Expand All @@ -421,8 +414,6 @@ function destroyer(stream, err) {
}

module.exports = {
kDestroy,
isDestroyed,
construct,
destroyer,
destroy,
Expand Down
90 changes: 29 additions & 61 deletions lib/internal/streams/end-of-stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,48 +17,23 @@ const {
validateObject,
} = require('internal/validators');

const {
isClosed,
isReadable,
isReadableNodeStream,
isReadableFinished,
isWritable,
isWritableNodeStream,
isWritableFinished,
willEmitClose: _willEmitClose,
} = require('internal/streams/utils');

function isRequest(stream) {
return stream.setHeader && typeof stream.abort === 'function';
}

function isServerResponse(stream) {
return (
typeof stream._sent100 === 'boolean' &&
typeof stream._removedConnection === 'boolean' &&
typeof stream._removedContLen === 'boolean' &&
typeof stream._removedTE === 'boolean' &&
typeof stream._closed === 'boolean'
);
}

function isReadable(stream) {
return typeof stream.readable === 'boolean' ||
typeof stream.readableEnded === 'boolean' ||
!!stream._readableState;
}

function isWritable(stream) {
return typeof stream.writable === 'boolean' ||
typeof stream.writableEnded === 'boolean' ||
!!stream._writableState;
}

function isWritableFinished(stream) {
if (stream.writableFinished) return true;
const wState = stream._writableState;
if (!wState || wState.errored) return false;
return wState.finished || (wState.ended && wState.length === 0);
}

const nop = () => {};

function isReadableEnded(stream) {
if (stream.readableEnded) return true;
const rState = stream._readableState;
if (!rState || rState.errored) return false;
return rState.endEmitted || (rState.ended && rState.length === 0);
}

function eos(stream, options, callback) {
if (arguments.length === 2) {
callback = options;
Expand All @@ -74,13 +49,12 @@ function eos(stream, options, callback) {
callback = once(callback);

const readable = options.readable ||
(options.readable !== false && isReadable(stream));
(options.readable !== false && isReadableNodeStream(stream));
const writable = options.writable ||
(options.writable !== false && isWritable(stream));
(options.writable !== false && isWritableNodeStream(stream));

const wState = stream._writableState;
const rState = stream._readableState;
const state = wState || rState;

const onlegacyfinish = () => {
if (!stream.writable) onfinish();
Expand All @@ -89,16 +63,13 @@ function eos(stream, options, callback) {
// TODO (ronag): Improve soft detection to include core modules and
// common ecosystem modules that do properly emit 'close' but fail
// this generic check.
let willEmitClose = isServerResponse(stream) || (
state &&
state.autoDestroy &&
state.emitClose &&
state.closed === false &&
isReadable(stream) === readable &&
isWritable(stream) === writable
let willEmitClose = (
_willEmitClose(stream) &&
isReadableNodeStream(stream) === readable &&
isWritableNodeStream(stream) === writable
);
ronag marked this conversation as resolved.
Show resolved Hide resolved

let writableFinished = stream.writableFinished || wState?.finished;
let writableFinished = isWritableFinished(stream, false);
const onfinish = () => {
writableFinished = true;
// Stream should not be destroyed here. If it is that
Expand All @@ -107,12 +78,12 @@ function eos(stream, options, callback) {
if (stream.destroyed) willEmitClose = false;

if (willEmitClose && (!stream.readable || readable)) return;
if (!readable || readableEnded) callback.call(stream);
if (!readable || readableFinished) callback.call(stream);
};

let readableEnded = stream.readableEnded || rState?.endEmitted;
let readableFinished = isReadableFinished(stream, false);
const onend = () => {
readableEnded = true;
readableFinished = true;
// Stream should not be destroyed here. If it is that
// means that user space is doing something differently and
// we cannot trust willEmitClose.
Expand All @@ -126,7 +97,7 @@ function eos(stream, options, callback) {
callback.call(stream, err);
};

let closed = wState?.closed || rState?.closed;
let closed = isClosed(stream);

const onclose = () => {
closed = true;
Expand All @@ -137,13 +108,13 @@ function eos(stream, options, callback) {
return callback.call(stream, errored);
}

if (readable && !readableEnded) {
if (!isReadableEnded(stream))
if (readable && !readableFinished) {
if (!isReadableFinished(stream, false))
return callback.call(stream,
new ERR_STREAM_PREMATURE_CLOSE());
}
if (writable && !writableFinished) {
if (!isWritableFinished(stream))
if (!isWritableFinished(stream, false))
return callback.call(stream,
new ERR_STREAM_PREMATURE_CLOSE());
}
Expand Down Expand Up @@ -185,19 +156,16 @@ function eos(stream, options, callback) {
}
} else if (
!readable &&
(!willEmitClose || stream.readable) &&
writableFinished
(!willEmitClose || isReadable(stream)) &&
(writableFinished || !isWritable(stream))
) {
process.nextTick(onclose);
} else if (
!writable &&
(!willEmitClose || stream.writable) &&
readableEnded
(!willEmitClose || isWritable(stream)) &&
(readableFinished || !isReadable(stream))
) {
process.nextTick(onclose);
} else if (!wState && !rState && stream._closed === true) {
// _closed is for OutgoingMessage which is not a proper Writable.
process.nextTick(onclose);
} else if ((rState && stream.req && stream.aborted)) {
process.nextTick(onclose);
}
Expand Down
14 changes: 7 additions & 7 deletions lib/internal/streams/pipeline.js
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ const { validateCallback } = require('internal/validators');

const {
isIterable,
isReadable,
isStream,
isReadableNodeStream,
isNodeStream,
} = require('internal/streams/utils');

let PassThrough;
Expand Down Expand Up @@ -87,7 +87,7 @@ function popCallback(streams) {
function makeAsyncIterable(val) {
if (isIterable(val)) {
return val;
} else if (isReadable(val)) {
} else if (isReadableNodeStream(val)) {
// Legacy streams are not Iterable.
return fromReadable(val);
}
Expand Down Expand Up @@ -204,7 +204,7 @@ function pipeline(...streams) {
const reading = i < streams.length - 1;
const writing = i > 0;

if (isStream(stream)) {
if (isNodeStream(stream)) {
finishCount++;
destroys.push(destroyer(stream, reading, writing, finish));
}
Expand All @@ -216,7 +216,7 @@ function pipeline(...streams) {
throw new ERR_INVALID_RETURN_VALUE(
'Iterable, AsyncIterable or Stream', 'source', ret);
}
} else if (isIterable(stream) || isReadable(stream)) {
} else if (isIterable(stream) || isReadableNodeStream(stream)) {
ret = stream;
} else {
throw new ERR_INVALID_ARG_TYPE(
Expand Down Expand Up @@ -271,8 +271,8 @@ function pipeline(...streams) {
finishCount++;
destroys.push(destroyer(ret, false, true, finish));
}
} else if (isStream(stream)) {
if (isReadable(ret)) {
} else if (isNodeStream(stream)) {
if (isReadableNodeStream(ret)) {
ret.pipe(stream);

// Compat. Before node v10.12.0 stdio used to throw an error so
Expand Down
Loading