diff --git a/lib/_http_client.js b/lib/_http_client.js index 598b585bcfa383..fde7fde86bbf25 100644 --- a/lib/_http_client.js +++ b/lib/_http_client.js @@ -806,9 +806,6 @@ function onSocketNT(req, socket, err) { socket.emit('free'); } else { finished(socket.destroy(err || req[kError]), (er) => { - if (er?.code === 'ERR_STREAM_PREMATURE_CLOSE') { - er = null; - } _destroy(req, er || err); }); return; diff --git a/lib/_http_incoming.js b/lib/_http_incoming.js index a92687ce37bfbc..3961b583de9ddc 100644 --- a/lib/_http_incoming.js +++ b/lib/_http_incoming.js @@ -188,9 +188,6 @@ IncomingMessage.prototype._destroy = function _destroy(err, cb) { if (this.socket && !this.socket.destroyed && this.aborted) { this.socket.destroy(err); const cleanup = finished(this.socket, (e) => { - if (e?.code === 'ERR_STREAM_PREMATURE_CLOSE') { - e = null; - } cleanup(); process.nextTick(onError, this, e || err, cb); }); diff --git a/lib/internal/streams/end-of-stream.js b/lib/internal/streams/end-of-stream.js index efc2441c51ee39..318ab4c2e6a8b7 100644 --- a/lib/internal/streams/end-of-stream.js +++ b/lib/internal/streams/end-of-stream.js @@ -98,7 +98,8 @@ function eos(stream, options, callback) { isWritable(stream) === writable ); - let writableFinished = stream.writableFinished || wState?.finished; + let writableFinished = stream.writableFinished || + (wState && wState.finished); const onfinish = () => { writableFinished = true; // Stream should not be destroyed here. If it is that @@ -110,7 +111,8 @@ function eos(stream, options, callback) { if (!readable || readableEnded) callback.call(stream); }; - let readableEnded = stream.readableEnded || rState?.endEmitted; + let readableEnded = stream.readableEnded || + (rState && rState.endEmitted); const onend = () => { readableEnded = true; // Stream should not be destroyed here. If it is that @@ -126,17 +128,7 @@ function eos(stream, options, callback) { callback.call(stream, err); }; - let closed = wState?.closed || rState?.closed; - const onclose = () => { - closed = true; - - const errored = wState?.errored || rState?.errored; - - if (errored && typeof errored !== 'boolean') { - return callback.call(stream, errored); - } - if (readable && !readableEnded) { if (!isReadableEnded(stream)) return callback.call(stream, @@ -147,7 +139,6 @@ function eos(stream, options, callback) { return callback.call(stream, new ERR_STREAM_PREMATURE_CLOSE()); } - callback.call(stream); }; @@ -177,29 +168,29 @@ function eos(stream, options, callback) { if (options.error !== false) stream.on('error', onerror); stream.on('close', onclose); + // _closed is for OutgoingMessage which is not a proper Writable. + const closed = (!wState && !rState && stream._closed === true) || ( + (wState && wState.closed) || + (rState && rState.closed) || + (wState && wState.errorEmitted) || + (rState && rState.errorEmitted) || + (rState && stream.req && stream.aborted) || + ( + (!writable || (wState && wState.finished)) && + (!readable || (rState && rState.endEmitted)) + ) + ); + if (closed) { - process.nextTick(onclose); - } else if (wState?.errorEmitted || rState?.errorEmitted) { - if (!willEmitClose) { - process.nextTick(onclose); - } - } else if ( - !readable && - (!willEmitClose || stream.readable) && - writableFinished - ) { - process.nextTick(onclose); - } else if ( - !writable && - (!willEmitClose || stream.writable) && - readableEnded - ) { - process.nextTick(onclose); - } else if (!wState && !rState && stream._closed === true) { - // _closed is for OutgoingMessage which is not a proper Writable. - process.nextTick(onclose); - } else if ((rState && stream.req && stream.aborted)) { - process.nextTick(onclose); + // TODO(ronag): Re-throw error if errorEmitted? + // TODO(ronag): Throw premature close as if finished was called? + // before being closed? i.e. if closed but not errored, ended or finished. + // TODO(ronag): Throw some kind of error? Does it make sense + // to call finished() on a "finished" stream? + // TODO(ronag): willEmitClose? + process.nextTick(() => { + callback(); + }); } const cleanup = () => { diff --git a/lib/internal/streams/readable.js b/lib/internal/streams/readable.js index f2e21ed59eec6f..6eb07cdd4a9b14 100644 --- a/lib/internal/streams/readable.js +++ b/lib/internal/streams/readable.js @@ -56,13 +56,12 @@ const { getDefaultHighWaterMark } = require('internal/streams/state'); -const eos = require('internal/streams/end-of-stream'); - const { ERR_INVALID_ARG_TYPE, - ERR_STREAM_PUSH_AFTER_EOF, ERR_METHOD_NOT_IMPLEMENTED, - ERR_STREAM_UNSHIFT_AFTER_END_EVENT + ERR_STREAM_PREMATURE_CLOSE, + ERR_STREAM_PUSH_AFTER_EOF, + ERR_STREAM_UNSHIFT_AFTER_END_EVENT, } = require('internal/errors').codes; const { validateObject } = require('internal/validators'); @@ -1111,18 +1110,23 @@ async function* createAsyncIterator(stream, options) { let error = state.errored; let errorEmitted = state.errorEmitted; let endEmitted = state.endEmitted; + let closeEmitted = state.closeEmitted; - stream.on('readable', next); - - eos(stream, (err) => { - if (err) { - errorEmitted = true; + stream + .on('readable', next) + .on('error', function(err) { error = err; - } - - endEmitted = true; - next.call(stream); - }); + errorEmitted = true; + next.call(this); + }) + .on('end', function() { + endEmitted = true; + next.call(this); + }) + .on('close', function() { + closeEmitted = true; + next.call(this); + }); let errorThrown = false; try { @@ -1134,6 +1138,8 @@ async function* createAsyncIterator(stream, options) { throw error; } else if (endEmitted) { break; + } else if (closeEmitted) { + throw new ERR_STREAM_PREMATURE_CLOSE(); } else { await new Promise(next); } diff --git a/test/parallel/test-stream-finished.js b/test/parallel/test-stream-finished.js index 8e371911698336..43b1e36a547402 100644 --- a/test/parallel/test-stream-finished.js +++ b/test/parallel/test-stream-finished.js @@ -608,26 +608,3 @@ testClosed((opts) => new Writable({ write() {}, ...opts })); assert.strictEqual(closed, true); })); } - -{ - const w = new Writable(); - const _err = new Error(); - w.destroy(_err); - finished(w, common.mustCall((err) => { - assert.strictEqual(_err, err); - finished(w, common.mustCall((err) => { - assert.strictEqual(_err, err); - })); - })); -} - -{ - const w = new Writable(); - w.destroy(); - finished(w, common.mustCall((err) => { - assert.strictEqual(err.code, 'ERR_STREAM_PREMATURE_CLOSE'); - finished(w, common.mustCall((err) => { - assert.strictEqual(err.code, 'ERR_STREAM_PREMATURE_CLOSE'); - })); - })); -}