From a051ccc835ba20c43c1a59d7ee1860b0f8b771e0 Mon Sep 17 00:00:00 2001 From: Anatoli Papirovski Date: Wed, 25 Oct 2017 19:04:41 -0400 Subject: [PATCH 1/3] http2: correctly reset write timers Currently reset timers on both session & stream when write starts and when it ends. --- lib/internal/http2/core.js | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/lib/internal/http2/core.js b/lib/internal/http2/core.js index 727ca517989fb8..b5b25bded8bb5e 100644 --- a/lib/internal/http2/core.js +++ b/lib/internal/http2/core.js @@ -1200,7 +1200,12 @@ function createWriteReq(req, handle, data, encoding) { } function afterDoStreamWrite(status, handle, req) { - _unrefActive(handle[kOwner]); + const session = handle[kOwner]; + const stream = session[kState].streams.get(req.stream); + _unrefActive(session); + if (stream !== undefined) + _unrefActive(stream); + if (typeof req.callback === 'function') req.callback(); this.handle = undefined; @@ -1396,10 +1401,11 @@ class Http2Stream extends Duplex { this.once('ready', this._write.bind(this, data, encoding, cb)); return; } - _unrefActive(this); if (!this[kState].headersSent) this[kProceed](); const session = this[kSession]; + _unrefActive(this); + _unrefActive(session); const handle = session[kHandle]; const req = new WriteWrap(); req.stream = this[kID]; @@ -1410,7 +1416,6 @@ class Http2Stream extends Duplex { const err = createWriteReq(req, handle, data, encoding); if (err) throw util._errnoException(err, 'write', req.error); - this._bytesDispatched += req.bytes; } _writev(data, cb) { @@ -1418,10 +1423,11 @@ class Http2Stream extends Duplex { this.once('ready', this._writev.bind(this, data, cb)); return; } - _unrefActive(this); if (!this[kState].headersSent) this[kProceed](); const session = this[kSession]; + _unrefActive(this); + _unrefActive(session); const handle = session[kHandle]; const req = new WriteWrap(); req.stream = this[kID]; From 03d7e3f795f001dc6e624fe287a035949151275d Mon Sep 17 00:00:00 2001 From: Anatoli Papirovski Date: Thu, 26 Oct 2017 11:37:20 -0400 Subject: [PATCH 2/3] http2: prevent large writes from timing out When writing a large chunk of data in http2, once the data is handed off to C++, the JS session & stream lose all track of the write and will timeout if the write doesn't complete within the timeout window Fix this issue by tracking whether a write request is ongoing and also tracking how many chunks have been sent since the most recent write started. (Since each write call resets the timer.) --- lib/internal/http2/core.js | 64 ++++++++++++- src/env.h | 1 + src/node_http2.cc | 24 +++++ src/node_http2.h | 4 + .../test-http2-timeout-large-write-file.js | 89 +++++++++++++++++++ .../test-http2-timeout-large-write.js | 84 +++++++++++++++++ 6 files changed, 262 insertions(+), 4 deletions(-) create mode 100644 test/sequential/test-http2-timeout-large-write-file.js create mode 100644 test/sequential/test-http2-timeout-large-write.js diff --git a/lib/internal/http2/core.js b/lib/internal/http2/core.js index b5b25bded8bb5e..7889893dd52e3c 100644 --- a/lib/internal/http2/core.js +++ b/lib/internal/http2/core.js @@ -746,7 +746,8 @@ class Http2Session extends EventEmitter { shutdown: false, shuttingDown: false, pendingAck: 0, - maxPendingAck: Math.max(1, (options.maxPendingAck | 0) || 10) + maxPendingAck: Math.max(1, (options.maxPendingAck | 0) || 10), + writeQueueSize: 0 }; this[kType] = type; @@ -1080,6 +1081,20 @@ class Http2Session extends EventEmitter { } _onTimeout() { + // This checks whether a write is currently in progress and also whether + // that write is actually sending data across the write. The kHandle + // stored `chunksSentSinceLastWrite` is only updated when a timeout event + // happens, meaning that if a write is ongoing it should never equal the + // newly fetched, updated value. + if (this[kState].writeQueueSize > 0) { + const handle = this[kHandle]; + if (handle !== undefined && + handle.chunksSentSinceLastWrite !== handle.updateChunksSent()) { + _unrefActive(this); + return; + } + } + process.nextTick(emit, this, 'timeout'); } } @@ -1199,12 +1214,26 @@ function createWriteReq(req, handle, data, encoding) { } } +function trackWriteState(stream, bytes) { + const session = stream[kSession]; + stream[kState].writeQueueSize += bytes; + session[kState].writeQueueSize += bytes; + session[kHandle].chunksSentSinceLastWrite = 0; +} + function afterDoStreamWrite(status, handle, req) { const session = handle[kOwner]; - const stream = session[kState].streams.get(req.stream); _unrefActive(session); - if (stream !== undefined) + + const state = session[kState]; + const { bytes } = req; + state.writeQueueSize -= bytes; + + const stream = state.streams.get(req.stream); + if (stream !== undefined) { _unrefActive(stream); + stream[kState].writeQueueSize -= bytes; + } if (typeof req.callback === 'function') req.callback(); @@ -1317,7 +1346,8 @@ class Http2Stream extends Duplex { headersSent: false, headRequest: false, aborted: false, - closeHandler: onSessionClose.bind(this) + closeHandler: onSessionClose.bind(this), + writeQueueSize: 0 }; this.once('ready', streamOnceReady); @@ -1364,6 +1394,21 @@ class Http2Stream extends Duplex { } _onTimeout() { + // This checks whether a write is currently in progress and also whether + // that write is actually sending data across the write. The kHandle + // stored `chunksSentSinceLastWrite` is only updated when a timeout event + // happens, meaning that if a write is ongoing it should never equal the + // newly fetched, updated value. + if (this[kState].writeQueueSize > 0) { + const handle = this[kSession][kHandle]; + if (handle !== undefined && + handle.chunksSentSinceLastWrite !== handle.updateChunksSent()) { + _unrefActive(this); + _unrefActive(this[kSession]); + return; + } + } + process.nextTick(emit, this, 'timeout'); } @@ -1416,6 +1461,7 @@ class Http2Stream extends Duplex { const err = createWriteReq(req, handle, data, encoding); if (err) throw util._errnoException(err, 'write', req.error); + trackWriteState(this, req.bytes); } _writev(data, cb) { @@ -1444,6 +1490,7 @@ class Http2Stream extends Duplex { const err = handle.writev(req, chunks); if (err) throw util._errnoException(err, 'write', req.error); + trackWriteState(this, req.bytes); } _read(nread) { @@ -1537,6 +1584,10 @@ class Http2Stream extends Duplex { return; } + const state = this[kState]; + session[kState].writeQueueSize -= state.writeQueueSize; + state.writeQueueSize = 0; + const server = session[kServer]; if (server !== undefined && err) { server.emit('streamError', err, this); @@ -1631,7 +1682,12 @@ function processRespondWithFD(fd, headers, offset = 0, length = -1, if (ret < 0) { err = new NghttpError(ret); process.nextTick(emit, this, 'error', err); + break; } + // exact length of the file doesn't matter here, since the + // stream is closing anyway — just use 1 to signify that + // a write does exist + trackWriteState(this, 1); } } diff --git a/src/env.h b/src/env.h index a1ec05baf0f4fe..d44b18c845b32a 100644 --- a/src/env.h +++ b/src/env.h @@ -111,6 +111,7 @@ class ModuleWrap; V(callback_string, "callback") \ V(change_string, "change") \ V(channel_string, "channel") \ + V(chunks_sent_since_last_write_string, "chunksSentSinceLastWrite") \ V(constants_string, "constants") \ V(oncertcb_string, "oncertcb") \ V(onclose_string, "_onclose") \ diff --git a/src/node_http2.cc b/src/node_http2.cc index 568b59b5b4d722..5cfac991862df0 100644 --- a/src/node_http2.cc +++ b/src/node_http2.cc @@ -603,6 +603,8 @@ void Http2Session::SubmitFile(const FunctionCallbackInfo& args) { return args.GetReturnValue().Set(NGHTTP2_ERR_INVALID_STREAM_ID); } + session->chunks_sent_since_last_write_ = 0; + Headers list(isolate, context, headers); args.GetReturnValue().Set(stream->SubmitFile(fd, *list, list.length(), @@ -757,6 +759,23 @@ void Http2Session::FlushData(const FunctionCallbackInfo& args) { stream->FlushDataChunks(); } +void Http2Session::UpdateChunksSent(const FunctionCallbackInfo& args) { + Http2Session* session; + Environment* env = Environment::GetCurrent(args); + Isolate* isolate = env->isolate(); + ASSIGN_OR_RETURN_UNWRAP(&session, args.Holder()); + + HandleScope scope(isolate); + + uint32_t length = session->chunks_sent_since_last_write_; + + session->object()->Set(env->context(), + env->chunks_sent_since_last_write_string(), + Integer::NewFromUnsigned(isolate, length)).FromJust(); + + args.GetReturnValue().Set(length); +} + void Http2Session::SubmitPushPromise(const FunctionCallbackInfo& args) { Http2Session* session; Environment* env = Environment::GetCurrent(args); @@ -811,6 +830,8 @@ int Http2Session::DoWrite(WriteWrap* req_wrap, } } + chunks_sent_since_last_write_ = 0; + nghttp2_stream_write_t* req = new nghttp2_stream_write_t; req->data = req_wrap; @@ -846,6 +867,7 @@ void Http2Session::Send(uv_buf_t* buf, size_t length) { this, AfterWrite); + chunks_sent_since_last_write_++; uv_buf_t actual = uv_buf_init(buf->base, length); if (stream_->DoWrite(write_req, &actual, 1, nullptr)) { write_req->Dispose(); @@ -1255,6 +1277,8 @@ void Initialize(Local target, Http2Session::DestroyStream); env->SetProtoMethod(session, "flushData", Http2Session::FlushData); + env->SetProtoMethod(session, "updateChunksSent", + Http2Session::UpdateChunksSent); StreamBase::AddMethods(env, session, StreamBase::kFlagHasWritev | StreamBase::kFlagNoShutdown); diff --git a/src/node_http2.h b/src/node_http2.h index 3e90c49cd77b6b..0151558ebccfaf 100644 --- a/src/node_http2.h +++ b/src/node_http2.h @@ -474,6 +474,7 @@ class Http2Session : public AsyncWrap, static void SubmitGoaway(const FunctionCallbackInfo& args); static void DestroyStream(const FunctionCallbackInfo& args); static void FlushData(const FunctionCallbackInfo& args); + static void UpdateChunksSent(const FunctionCallbackInfo& args); template static void GetSettings(const FunctionCallbackInfo& args); @@ -492,6 +493,9 @@ class Http2Session : public AsyncWrap, StreamResource::Callback prev_read_cb_; padding_strategy_type padding_strategy_ = PADDING_STRATEGY_NONE; + // use this to allow timeout tracking during long-lasting writes + uint32_t chunks_sent_since_last_write_ = 0; + char stream_buf_[kAllocBufferSize]; }; diff --git a/test/sequential/test-http2-timeout-large-write-file.js b/test/sequential/test-http2-timeout-large-write-file.js new file mode 100644 index 00000000000000..f52523780dc948 --- /dev/null +++ b/test/sequential/test-http2-timeout-large-write-file.js @@ -0,0 +1,89 @@ +'use strict'; +const common = require('../common'); +if (!common.hasCrypto) + common.skip('missing crypto'); +const assert = require('assert'); +const fixtures = require('../common/fixtures'); +const fs = require('fs'); +const http2 = require('http2'); +const path = require('path'); + +common.refreshTmpDir(); + +// This test assesses whether long-running writes can complete +// or timeout because the session or stream are not aware that the +// backing stream is still writing. +// To simulate a slow client, we write a really large chunk and +// then proceed through the following cycle: +// 1) Receive first 'data' event and record currently written size +// 2) Once we've read up to currently written size recorded above, +// we pause the stream and wait longer than the server timeout +// 3) Socket.prototype._onTimeout triggers and should confirm +// that the backing stream is still active and writing +// 4) Our timer fires, we resume the socket and start at 1) + +const writeSize = 3000000; +const minReadSize = 500000; +const serverTimeout = common.platformTimeout(500); +let offsetTimeout = common.platformTimeout(100); +let didReceiveData = false; + +const content = Buffer.alloc(writeSize, 0x44); +const filepath = path.join(common.tmpDir, 'http2-large-write.tmp'); +fs.writeFileSync(filepath, content, 'binary'); +const fd = fs.openSync(filepath, 'r'); + +const server = http2.createSecureServer({ + key: fixtures.readKey('agent1-key.pem'), + cert: fixtures.readKey('agent1-cert.pem') +}); +server.on('stream', common.mustCall((stream) => { + stream.respondWithFD(fd, { + 'Content-Type': 'application/octet-stream', + 'Content-Length': content.length.toString(), + 'Vary': 'Accept-Encoding' + }); + stream.setTimeout(serverTimeout); + stream.on('timeout', () => { + assert.strictEqual(didReceiveData, false, 'Should not timeout'); + }); + stream.end(); +})); +server.setTimeout(serverTimeout); +server.on('timeout', () => { + assert.strictEqual(didReceiveData, false, 'Should not timeout'); +}); + +server.listen(0, common.mustCall(() => { + const client = http2.connect(`https://localhost:${server.address().port}`, + { rejectUnauthorized: false }); + + const req = client.request({ ':path': '/' }); + req.end(); + + const resume = () => req.resume(); + let receivedBufferLength = 0; + let firstReceivedAt; + req.on('data', common.mustCallAtLeast((buf) => { + if (receivedBufferLength === 0) { + didReceiveData = false; + firstReceivedAt = Date.now(); + } + receivedBufferLength += buf.length; + if (receivedBufferLength >= minReadSize && + receivedBufferLength < writeSize) { + didReceiveData = true; + receivedBufferLength = 0; + req.pause(); + setTimeout( + resume, + serverTimeout + offsetTimeout - (Date.now() - firstReceivedAt) + ); + offsetTimeout = 0; + } + }, 1)); + req.on('end', common.mustCall(() => { + client.destroy(); + server.close(); + })); +})); diff --git a/test/sequential/test-http2-timeout-large-write.js b/test/sequential/test-http2-timeout-large-write.js new file mode 100644 index 00000000000000..f0a11b2e44469e --- /dev/null +++ b/test/sequential/test-http2-timeout-large-write.js @@ -0,0 +1,84 @@ +'use strict'; +const common = require('../common'); +if (!common.hasCrypto) + common.skip('missing crypto'); +const assert = require('assert'); +const fixtures = require('../common/fixtures'); +const http2 = require('http2'); + +// This test assesses whether long-running writes can complete +// or timeout because the session or stream are not aware that the +// backing stream is still writing. +// To simulate a slow client, we write a really large chunk and +// then proceed through the following cycle: +// 1) Receive first 'data' event and record currently written size +// 2) Once we've read up to currently written size recorded above, +// we pause the stream and wait longer than the server timeout +// 3) Socket.prototype._onTimeout triggers and should confirm +// that the backing stream is still active and writing +// 4) Our timer fires, we resume the socket and start at 1) + +const writeSize = 3000000; +const minReadSize = 500000; +const serverTimeout = common.platformTimeout(500); +let offsetTimeout = common.platformTimeout(100); +let didReceiveData = false; + +const server = http2.createSecureServer({ + key: fixtures.readKey('agent1-key.pem'), + cert: fixtures.readKey('agent1-cert.pem') +}); +server.on('stream', common.mustCall((stream) => { + const content = Buffer.alloc(writeSize, 0x44); + + stream.respond({ + 'Content-Type': 'application/octet-stream', + 'Content-Length': content.length.toString(), + 'Vary': 'Accept-Encoding' + }); + + stream.write(content); + stream.setTimeout(serverTimeout); + stream.on('timeout', () => { + assert.strictEqual(didReceiveData, false, 'Should not timeout'); + }); + stream.end(); +})); +server.setTimeout(serverTimeout); +server.on('timeout', () => { + assert.strictEqual(didReceiveData, false, 'Should not timeout'); +}); + +server.listen(0, common.mustCall(() => { + const client = http2.connect(`https://localhost:${server.address().port}`, + { rejectUnauthorized: false }); + + const req = client.request({ ':path': '/' }); + req.end(); + + const resume = () => req.resume(); + let receivedBufferLength = 0; + let firstReceivedAt; + req.on('data', common.mustCallAtLeast((buf) => { + if (receivedBufferLength === 0) { + didReceiveData = false; + firstReceivedAt = Date.now(); + } + receivedBufferLength += buf.length; + if (receivedBufferLength >= minReadSize && + receivedBufferLength < writeSize) { + didReceiveData = true; + receivedBufferLength = 0; + req.pause(); + setTimeout( + resume, + serverTimeout + offsetTimeout - (Date.now() - firstReceivedAt) + ); + offsetTimeout = 0; + } + }, 1)); + req.on('end', common.mustCall(() => { + client.destroy(); + server.close(); + })); +})); From 809bb46a5e3a871a2c2951d1c39bb3772c9f2c7c Mon Sep 17 00:00:00 2001 From: Anatoli Papirovski Date: Sat, 28 Oct 2017 16:33:03 -0400 Subject: [PATCH 3/3] [fixup] store chunksSentSinceLastWrite in var --- lib/internal/http2/core.js | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/lib/internal/http2/core.js b/lib/internal/http2/core.js index 7889893dd52e3c..a667a2e7609237 100644 --- a/lib/internal/http2/core.js +++ b/lib/internal/http2/core.js @@ -1088,8 +1088,10 @@ class Http2Session extends EventEmitter { // newly fetched, updated value. if (this[kState].writeQueueSize > 0) { const handle = this[kHandle]; - if (handle !== undefined && - handle.chunksSentSinceLastWrite !== handle.updateChunksSent()) { + const chunksSentSinceLastWrite = handle !== undefined ? + handle.chunksSentSinceLastWrite : null; + if (chunksSentSinceLastWrite !== null && + chunksSentSinceLastWrite !== handle.updateChunksSent()) { _unrefActive(this); return; } @@ -1401,8 +1403,10 @@ class Http2Stream extends Duplex { // newly fetched, updated value. if (this[kState].writeQueueSize > 0) { const handle = this[kSession][kHandle]; - if (handle !== undefined && - handle.chunksSentSinceLastWrite !== handle.updateChunksSent()) { + const chunksSentSinceLastWrite = handle !== undefined ? + handle.chunksSentSinceLastWrite : null; + if (chunksSentSinceLastWrite !== null && + chunksSentSinceLastWrite !== handle.updateChunksSent()) { _unrefActive(this); _unrefActive(this[kSession]); return;