Skip to content

Commit

Permalink
stream: fix end-of-stream for HTTP/2
Browse files Browse the repository at this point in the history
HTTP/2 streams call `.end()` on themselves from their
`.destroy()` method, which might be queued (e.g. due to network
congestion) and not processed before the stream itself is destroyed.

In that case, the `_writableState.ended` property could be set before
the stream emits its `'close'` event, and never actually emits the
`'finished'` event, confusing the end-of-stream implementation so
that it wouldn’t call its callback.

This can be fixed by watching for the end events themselves using the
existing `'finish'` and `'end'` listeners rather than relying on the
`.ended` properties of the `_...State` objects.

These properties still need to be checked to know whether stream
closure was premature – My understanding is that ideally, streams
should not emit `'close'` before `'end'` and/or `'finished'`, so this
might be another bug, but changing this would require modifying tests
and almost certainly be a breaking change.

Fixes: #24456

PR-URL: #24926
Reviewed-By: Rich Trott <[email protected]>
Reviewed-By: Luigi Pinca <[email protected]>
Reviewed-By: Matteo Collina <[email protected]>
Reviewed-By: Franziska Hinkelmann <[email protected]>
  • Loading branch information
addaleax authored and BethGriggs committed Dec 17, 2018
1 parent b2e6cbd commit 39af61f
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 9 deletions.
21 changes: 14 additions & 7 deletions lib/internal/streams/end-of-stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -36,22 +36,24 @@ function eos(stream, opts, callback) {

callback = once(callback);

const ws = stream._writableState;
const rs = stream._readableState;
let readable = opts.readable || (opts.readable !== false && stream.readable);
let writable = opts.writable || (opts.writable !== false && stream.writable);

const onlegacyfinish = () => {
if (!stream.writable) onfinish();
};

var writableEnded = stream._writableState && stream._writableState.finished;
const onfinish = () => {
writable = false;
writableEnded = true;
if (!readable) callback.call(stream);
};

var readableEnded = stream._readableState && stream._readableState.endEmitted;
const onend = () => {
readable = false;
readableEnded = true;
if (!writable) callback.call(stream);
};

Expand All @@ -60,11 +62,16 @@ function eos(stream, opts, callback) {
};

const onclose = () => {
if (readable && !(rs && rs.ended)) {
return callback.call(stream, new ERR_STREAM_PREMATURE_CLOSE());
let err;
if (readable && !readableEnded) {
if (!stream._readableState || !stream._readableState.ended)
err = new ERR_STREAM_PREMATURE_CLOSE();
return callback.call(stream, err);
}
if (writable && !(ws && ws.ended)) {
return callback.call(stream, new ERR_STREAM_PREMATURE_CLOSE());
if (writable && !writableEnded) {
if (!stream._writableState || !stream._writableState.ended)
err = new ERR_STREAM_PREMATURE_CLOSE();
return callback.call(stream, err);
}
};

Expand All @@ -77,7 +84,7 @@ function eos(stream, opts, callback) {
stream.on('abort', onclose);
if (stream.req) onrequest();
else stream.on('request', onrequest);
} else if (writable && !ws) { // legacy streams
} else if (writable && !stream._writableState) { // legacy streams
stream.on('end', onlegacyfinish);
stream.on('close', onlegacyfinish);
}
Expand Down
2 changes: 0 additions & 2 deletions test/parallel/parallel.status
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@ test-net-connect-options-port: PASS,FLAKY
test-http2-pipe: PASS,FLAKY
test-worker-syntax-error: PASS,FLAKY
test-worker-syntax-error-file: PASS,FLAKY
# https://github.com/nodejs/node/issues/24456
test-stream-pipeline-http2: PASS,FLAKY

[$system==linux]

Expand Down
39 changes: 39 additions & 0 deletions test/parallel/test-stream-pipeline-queued-end-in-destroy.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
'use strict';
const common = require('../common');
const assert = require('assert');
const { Readable, Duplex, pipeline } = require('stream');

// Test that the callback for pipeline() is called even when the ._destroy()
// method of the stream places an .end() request to itself that does not
// get processed before the destruction of the stream (i.e. the 'close' event).
// Refs: https://github.com/nodejs/node/issues/24456

const readable = new Readable({
read: common.mustCall(() => {})
});

const duplex = new Duplex({
write(chunk, enc, cb) {
// Simulate messages queueing up.
},
read() {},
destroy(err, cb) {
// Call end() from inside the destroy() method, like HTTP/2 streams
// do at the time of writing.
this.end();
cb(err);
}
});

duplex.on('finished', common.mustNotCall());

pipeline(readable, duplex, common.mustCall((err) => {
assert.strictEqual(err.code, 'ERR_STREAM_PREMATURE_CLOSE');
}));

// Write one chunk of data, and destroy the stream later.
// That should trigger the pipeline destruction.
readable.push('foo');
setImmediate(() => {
readable.destroy();
});

0 comments on commit 39af61f

Please sign in to comment.