From 0738a2b7bd011c421b458e586c49f9a7f2657b90 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Fri, 2 Jul 2021 14:51:30 +0200 Subject: [PATCH] stream: finished should error on errored stream Calling finished before or after a stream has errored or closed should end up with the same behavior. PR-URL: https://github.com/nodejs/node/pull/39235 Reviewed-By: Benjamin Gruenbaum Reviewed-By: Matteo Collina Reviewed-By: Rich Trott --- lib/_http_client.js | 3 ++ lib/_http_incoming.js | 3 ++ lib/internal/streams/end-of-stream.js | 61 +++++++++++++++------------ test/parallel/test-stream-finished.js | 23 ++++++++++ 4 files changed, 64 insertions(+), 26 deletions(-) diff --git a/lib/_http_client.js b/lib/_http_client.js index fde7fde86bbf25..598b585bcfa383 100644 --- a/lib/_http_client.js +++ b/lib/_http_client.js @@ -806,6 +806,9 @@ function onSocketNT(req, socket, err) { socket.emit('free'); } else { finished(socket.destroy(err || req[kError]), (er) => { + if (er?.code === 'ERR_STREAM_PREMATURE_CLOSE') { + er = null; + } _destroy(req, er || err); }); return; diff --git a/lib/_http_incoming.js b/lib/_http_incoming.js index 3961b583de9ddc..a92687ce37bfbc 100644 --- a/lib/_http_incoming.js +++ b/lib/_http_incoming.js @@ -188,6 +188,9 @@ IncomingMessage.prototype._destroy = function _destroy(err, cb) { if (this.socket && !this.socket.destroyed && this.aborted) { this.socket.destroy(err); const cleanup = finished(this.socket, (e) => { + if (e?.code === 'ERR_STREAM_PREMATURE_CLOSE') { + e = null; + } cleanup(); process.nextTick(onError, this, e || err, cb); }); diff --git a/lib/internal/streams/end-of-stream.js b/lib/internal/streams/end-of-stream.js index 318ab4c2e6a8b7..efc2441c51ee39 100644 --- a/lib/internal/streams/end-of-stream.js +++ b/lib/internal/streams/end-of-stream.js @@ -98,8 +98,7 @@ function eos(stream, options, callback) { isWritable(stream) === writable ); - let writableFinished = stream.writableFinished || - (wState && wState.finished); + let writableFinished = stream.writableFinished || wState?.finished; const onfinish = () => { writableFinished = true; // Stream should not be destroyed here. If it is that @@ -111,8 +110,7 @@ function eos(stream, options, callback) { if (!readable || readableEnded) callback.call(stream); }; - let readableEnded = stream.readableEnded || - (rState && rState.endEmitted); + let readableEnded = stream.readableEnded || rState?.endEmitted; const onend = () => { readableEnded = true; // Stream should not be destroyed here. If it is that @@ -128,7 +126,17 @@ function eos(stream, options, callback) { callback.call(stream, err); }; + let closed = wState?.closed || rState?.closed; + const onclose = () => { + closed = true; + + const errored = wState?.errored || rState?.errored; + + if (errored && typeof errored !== 'boolean') { + return callback.call(stream, errored); + } + if (readable && !readableEnded) { if (!isReadableEnded(stream)) return callback.call(stream, @@ -139,6 +147,7 @@ function eos(stream, options, callback) { return callback.call(stream, new ERR_STREAM_PREMATURE_CLOSE()); } + callback.call(stream); }; @@ -168,29 +177,29 @@ function eos(stream, options, callback) { if (options.error !== false) stream.on('error', onerror); stream.on('close', onclose); - // _closed is for OutgoingMessage which is not a proper Writable. - const closed = (!wState && !rState && stream._closed === true) || ( - (wState && wState.closed) || - (rState && rState.closed) || - (wState && wState.errorEmitted) || - (rState && rState.errorEmitted) || - (rState && stream.req && stream.aborted) || - ( - (!writable || (wState && wState.finished)) && - (!readable || (rState && rState.endEmitted)) - ) - ); - if (closed) { - // TODO(ronag): Re-throw error if errorEmitted? - // TODO(ronag): Throw premature close as if finished was called? - // before being closed? i.e. if closed but not errored, ended or finished. - // TODO(ronag): Throw some kind of error? Does it make sense - // to call finished() on a "finished" stream? - // TODO(ronag): willEmitClose? - process.nextTick(() => { - callback(); - }); + process.nextTick(onclose); + } else if (wState?.errorEmitted || rState?.errorEmitted) { + if (!willEmitClose) { + process.nextTick(onclose); + } + } else if ( + !readable && + (!willEmitClose || stream.readable) && + writableFinished + ) { + process.nextTick(onclose); + } else if ( + !writable && + (!willEmitClose || stream.writable) && + readableEnded + ) { + process.nextTick(onclose); + } else if (!wState && !rState && stream._closed === true) { + // _closed is for OutgoingMessage which is not a proper Writable. + process.nextTick(onclose); + } else if ((rState && stream.req && stream.aborted)) { + process.nextTick(onclose); } const cleanup = () => { diff --git a/test/parallel/test-stream-finished.js b/test/parallel/test-stream-finished.js index 43b1e36a547402..8e371911698336 100644 --- a/test/parallel/test-stream-finished.js +++ b/test/parallel/test-stream-finished.js @@ -608,3 +608,26 @@ testClosed((opts) => new Writable({ write() {}, ...opts })); assert.strictEqual(closed, true); })); } + +{ + const w = new Writable(); + const _err = new Error(); + w.destroy(_err); + finished(w, common.mustCall((err) => { + assert.strictEqual(_err, err); + finished(w, common.mustCall((err) => { + assert.strictEqual(_err, err); + })); + })); +} + +{ + const w = new Writable(); + w.destroy(); + finished(w, common.mustCall((err) => { + assert.strictEqual(err.code, 'ERR_STREAM_PREMATURE_CLOSE'); + finished(w, common.mustCall((err) => { + assert.strictEqual(err.code, 'ERR_STREAM_PREMATURE_CLOSE'); + })); + })); +}