Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

http2: fix graceful session close #30854

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
109 changes: 71 additions & 38 deletions lib/internal/http2/core.js
Original file line number Diff line number Diff line change
Expand Up @@ -486,7 +486,10 @@ function onStreamClose(code) {
if (!stream || stream.destroyed)
return false;

debugStreamObj(stream, 'closed with code %d', code);
debugStreamObj(
stream, 'closed with code %d, closed %s, readable %s',
code, stream.closed, stream.readable
);

if (!stream.closed)
closeStream(stream, code, kNoRstStream);
Expand All @@ -495,7 +498,7 @@ function onStreamClose(code) {
// Defer destroy we actually emit end.
if (!stream.readable || code !== NGHTTP2_NO_ERROR) {
// If errored or ended, we can destroy immediately.
stream[kMaybeDestroy](code);
stream.destroy();
lundibundi marked this conversation as resolved.
Show resolved Hide resolved
} else {
// Wait for end to destroy.
stream.on('end', stream[kMaybeDestroy]);
Expand Down Expand Up @@ -983,20 +986,76 @@ function emitClose(self, error) {
self.emit('close');
}

function finishSessionDestroy(session, error) {
function cleanupSession(session) {
const socket = session[kSocket];
if (!socket.destroyed)
socket.destroy(error);

const handle = session[kHandle];
session[kProxySocket] = undefined;
session[kSocket] = undefined;
session[kHandle] = undefined;
session[kNativeFields] = new Uint8Array(kSessionUint8FieldCount);
socket[kSession] = undefined;
socket[kServer] = undefined;
if (handle)
handle.ondone = null;
if (socket) {
socket[kSession] = undefined;
socket[kServer] = undefined;
}
}

// Finally, emit the close and error events (if necessary) on next tick.
process.nextTick(emitClose, session, error);
function finishSessionClose(session, error) {
debugSessionObj(session, 'finishSessionClose');

const socket = session[kSocket];
cleanupSession(session);

if (socket && !socket.destroyed) {
// Always wait for writable side to finish.
socket.end((err) => {
debugSessionObj(session, 'finishSessionClose socket end', err);
// Due to the way the underlying stream is handled in Http2Session we
// won't get graceful Readable end from the other side even if it was sent
// as the stream is already considered closed and will neither be read
// from nor keep the event loop alive.
// Therefore destroy the socket immediately.
// Fixing this would require some heavy juggling of ReadStart/ReadStop
// mostly on Windows as on Unix it will be fine with just ReadStart
// after this 'ondone' callback.
socket.destroy(error);
emitClose(session, error);
});
} else {
process.nextTick(emitClose, session, error);
}
}

function closeSession(session, code, error) {
debugSessionObj(session, 'start closing/destroying');

const state = session[kState];
state.flags |= SESSION_FLAGS_DESTROYED;
state.destroyCode = code;

// Clear timeout and remove timeout listeners.
session.setTimeout(0);
session.removeAllListeners('timeout');

// Destroy any pending and open streams
if (state.pendingStreams.size > 0 || state.streams.size > 0) {
const cancel = new ERR_HTTP2_STREAM_CANCEL(error);
state.pendingStreams.forEach((stream) => stream.destroy(cancel));
state.streams.forEach((stream) => stream.destroy(error));
}

// Disassociate from the socket and server.
const socket = session[kSocket];
const handle = session[kHandle];

// Destroy the handle if it exists at this point.
if (handle !== undefined) {
handle.ondone = finishSessionClose.bind(null, session, error);
handle.destroy(code, socket.destroyed);
} else {
finishSessionClose(session, error);
}
}

// Upon creation, the Http2Session takes ownership of the socket. The session
Expand Down Expand Up @@ -1323,6 +1382,7 @@ class Http2Session extends EventEmitter {
destroy(error = NGHTTP2_NO_ERROR, code) {
if (this.destroyed)
return;

debugSessionObj(this, 'destroying');

if (typeof error === 'number') {
Expand All @@ -1334,34 +1394,7 @@ class Http2Session extends EventEmitter {
if (code === undefined && error != null)
code = NGHTTP2_INTERNAL_ERROR;

const state = this[kState];
state.flags |= SESSION_FLAGS_DESTROYED;
state.destroyCode = code;

// Clear timeout and remove timeout listeners
this.setTimeout(0);
this.removeAllListeners('timeout');

// Destroy any pending and open streams
const cancel = new ERR_HTTP2_STREAM_CANCEL(error);
state.pendingStreams.forEach((stream) => stream.destroy(cancel));
state.streams.forEach((stream) => stream.destroy(error));

// Disassociate from the socket and server
const socket = this[kSocket];
const handle = this[kHandle];

// Destroy the handle if it exists at this point
if (handle !== undefined)
handle.destroy(code, socket.destroyed);

// If the socket is alive, use setImmediate to destroy the session on the
// next iteration of the event loop in order to give data time to transmit.
// Otherwise, destroy immediately.
if (!socket.destroyed)
setImmediate(finishSessionDestroy, this, error);
else
finishSessionDestroy(this, error);
closeSession(this, code, error);
}

// Closing the session will:
Expand Down
15 changes: 14 additions & 1 deletion src/node_http2.cc
Original file line number Diff line number Diff line change
Expand Up @@ -763,6 +763,13 @@ void Http2Session::Close(uint32_t code, bool socket_closed) {

flags_ |= SESSION_STATE_CLOSED;

// If we are writing we will get to make the callback in OnStreamAfterWrite.
if ((flags_ & SESSION_STATE_WRITE_IN_PROGRESS) == 0) {
Debug(this, "make done session callback");
HandleScope scope(env()->isolate());
MakeCallback(env()->ondone_string(), 0, nullptr);
}

// If there are outstanding pings, those will need to be canceled, do
// so on the next iteration of the event loop to avoid calling out into
// javascript since this may be called during garbage collection.
Expand Down Expand Up @@ -1565,6 +1572,12 @@ void Http2Session::OnStreamAfterWrite(WriteWrap* w, int status) {
stream_->ReadStart();
}

if ((flags_ & SESSION_STATE_CLOSED) != 0) {
HandleScope scope(env()->isolate());
MakeCallback(env()->ondone_string(), 0, nullptr);
return;
}

// If there is more incoming data queued up, consume it.
if (stream_buf_offset_ > 0) {
ConsumeHTTP2Data();
Expand Down Expand Up @@ -1849,7 +1862,7 @@ void Http2Session::OnStreamRead(ssize_t nread, const uv_buf_t& buf_) {
Context::Scope context_scope(env()->context());
Http2Scope h2scope(this);
CHECK_NOT_NULL(stream_);
Debug(this, "receiving %d bytes", nread);
Debug(this, "receiving %d bytes, offset %d", nread, stream_buf_offset_);
AllocatedBuffer buf(env(), buf_);

// Only pass data on if nread > 0
Expand Down
2 changes: 1 addition & 1 deletion test/parallel/test-http2-capture-rejection.js
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,6 @@ events.captureRejections = true;
}));
}


{
// Test error thrown in 'request' event

Expand Down Expand Up @@ -136,6 +135,7 @@ events.captureRejections = true;
const session = connect(`http://localhost:${port}`);

const req = session.request();
req.resume();

session.on('stream', common.mustCall(async (stream) => {
session.close();
Expand Down
1 change: 1 addition & 0 deletions test/parallel/test-http2-client-destroy.js
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@ const Countdown = require('../common/countdown');
server.on('stream', common.mustNotCall());
server.listen(0, common.mustCall(() => {
const client = h2.connect(`http://localhost:${server.address().port}`);
client.on('close', common.mustCall());
const socket = client[kSocket];
socket.on('close', common.mustCall(() => {
assert(socket.destroyed);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ if (!common.hasCrypto)
const assert = require('assert');
const h2 = require('http2');
const NGHTTP2_INTERNAL_ERROR = h2.constants.NGHTTP2_INTERNAL_ERROR;
const Countdown = require('../common/countdown');

const server = h2.createServer();

Expand All @@ -27,6 +28,11 @@ server.on('stream', (stream) => {

server.listen(0, common.mustCall(() => {
const client = h2.connect(`http://localhost:${server.address().port}`);
const countdown = new Countdown(2, () => {
server.close();
client.close();
});
client.on('connect', () => countdown.dec());

const req = client.request();
req.destroy(new Error('test'));
Expand All @@ -39,8 +45,7 @@ server.listen(0, common.mustCall(() => {
req.on('close', common.mustCall(() => {
assert.strictEqual(req.rstCode, NGHTTP2_INTERNAL_ERROR);
assert.strictEqual(req.rstCode, NGHTTP2_INTERNAL_ERROR);
server.close();
client.close();
countdown.dec();
}));

req.on('response', common.mustNotCall());
Expand Down
2 changes: 2 additions & 0 deletions test/parallel/test-http2-compat-client-upload-reject.js
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,11 @@ fs.readFile(loc, common.mustCall((err, data) => {
res.end();
});
}));
server.on('close', common.mustCall());

server.listen(0, common.mustCall(() => {
const client = http2.connect(`http://localhost:${server.address().port}`);
client.on('close', common.mustCall());

const req = client.request({ ':method': 'POST' });
req.on('response', common.mustCall((headers) => {
Expand Down
4 changes: 3 additions & 1 deletion test/parallel/test-http2-create-client-connect.js
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,13 @@ const URL = url.URL;
const client =
h2.connect.apply(null, i)
.on('connect', common.mustCall(() => maybeClose(client)));
client.on('close', common.mustCall());
});

// Will fail because protocol does not match the server.
h2.connect({ port: port, protocol: 'https:' })
const client = h2.connect({ port: port, protocol: 'https:' })
.on('error', common.mustCall(() => serverClose.dec()));
client.on('close', common.mustCall());
}));
}

Expand Down
8 changes: 6 additions & 2 deletions test/parallel/test-http2-goaway-opaquedata.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,20 +8,24 @@ const http2 = require('http2');

const server = http2.createServer();
const data = Buffer.from([0x1, 0x2, 0x3, 0x4, 0x5]);
let session;

server.on('stream', common.mustCall((stream) => {
stream.session.goaway(0, 0, data);
session = stream.session;
session.on('close', common.mustCall());
session.goaway(0, 0, data);
stream.respond();
stream.end();
}));
server.on('close', common.mustCall());

server.listen(0, () => {

const client = http2.connect(`http://localhost:${server.address().port}`);
client.once('goaway', common.mustCall((code, lastStreamID, buf) => {
assert.deepStrictEqual(code, 0);
assert.deepStrictEqual(lastStreamID, 1);
assert.deepStrictEqual(data, buf);
session.close();
server.close();
}));
const req = client.request();
Expand Down
9 changes: 7 additions & 2 deletions test/parallel/test-http2-ping-settings-heapdump.js
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,12 @@ for (const variant of ['ping', 'settings']) {
}));

server.listen(0, common.mustCall(() => {
http2.connect(`http://localhost:${server.address().port}`,
common.mustCall());
const client = http2.connect(`http://localhost:${server.address().port}`,
common.mustCall());
client.on('error', (err) => {
// We destroy the session so it's possible to get ECONNRESET here.
if (err.code !== 'ECONNRESET')
throw err;
});
}));
}
2 changes: 2 additions & 0 deletions test/parallel/test-http2-server-push-stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ server.listen(0, common.mustCall(() => {
assert.strictEqual(headers['x-push-data'], 'pushed by server');
}));
stream.on('aborted', common.mustNotCall());
// We have to read the data of the push stream to end gracefully.
stream.resume();
}));

let data = '';
Expand Down