Skip to content

Commit

Permalink
http2: fix stream reading resumption
Browse files Browse the repository at this point in the history
_read should always resume the underlying code that is attempting
to push data to a readable stream. Adjust http2 core code to
resume its reading appropriately.

Some other general cleanup around reading, resuming & draining.

PR-URL: #16580
Fixes: #16578
Reviewed-By: Anna Henningsen <[email protected]>
Reviewed-By: James M Snell <[email protected]>
Reviewed-By: Colin Ihrig <[email protected]>
Reviewed-By: Matteo Collina <[email protected]>
  • Loading branch information
apapirovski authored and gibfahn committed Oct 31, 2017
1 parent 528edb2 commit 95a61cb
Show file tree
Hide file tree
Showing 7 changed files with 79 additions and 19 deletions.
26 changes: 12 additions & 14 deletions lib/internal/http2/core.js
Original file line number Diff line number Diff line change
Expand Up @@ -282,8 +282,13 @@ function onSessionRead(nread, buf, handle) {
'report this as a bug in Node.js');
_unrefActive(owner); // Reset the session timeout timer
_unrefActive(stream); // Reset the stream timeout timer
if (nread >= 0 && !stream.destroyed)
return stream.push(buf);
if (nread >= 0 && !stream.destroyed) {
// prevent overflowing the buffer while pause figures out the
// stream needs to actually pause and streamOnPause runs
if (!stream.push(buf))
owner[kHandle].streamReadStop(id);
return;
}

// Last chunk was received. End the readable side.
stream.push(null);
Expand Down Expand Up @@ -1276,8 +1281,6 @@ function onStreamClosed(code) {
}

function streamOnResume() {
if (this._paused)
return this.pause();
if (this[kID] === undefined) {
this.once('ready', streamOnResume);
return;
Expand All @@ -1299,12 +1302,10 @@ function streamOnPause() {
}
}

function streamOnDrain() {
const needPause = 0 > this._writableState.highWaterMark;
if (this._paused && !needPause) {
this._paused = false;
this.resume();
}
function handleFlushData(handle, streamID) {
assert(handle.flushData(streamID) === undefined,
`HTTP/2 Stream ${streamID} does not exist. Please report this as ` +
'a bug in Node.js');
}

function streamOnSessionConnect() {
Expand Down Expand Up @@ -1357,7 +1358,6 @@ class Http2Stream extends Duplex {
this.once('finish', onHandleFinish);
this.on('resume', streamOnResume);
this.on('pause', streamOnPause);
this.on('drain', streamOnDrain);
session.once('close', state.closeHandler);

if (session[kState].connecting) {
Expand Down Expand Up @@ -1507,9 +1507,7 @@ class Http2Stream extends Duplex {
return;
}
_unrefActive(this);
assert(this[kSession][kHandle].flushData(this[kID]) === undefined,
'HTTP/2 Stream #{this[kID]} does not exist. Please report this as ' +
'a bug in Node.js');
process.nextTick(handleFlushData, this[kSession][kHandle], this[kID]);
}

// Submits an RST-STREAM frame to shutdown this stream.
Expand Down
2 changes: 1 addition & 1 deletion src/node_http2.cc
Original file line number Diff line number Diff line change
Expand Up @@ -756,7 +756,7 @@ void Http2Session::FlushData(const FunctionCallbackInfo<Value>& args) {
if (!(stream = session->FindStream(id))) {
return args.GetReturnValue().Set(NGHTTP2_ERR_INVALID_STREAM_ID);
}
stream->FlushDataChunks();
stream->ReadResume();
}

void Http2Session::UpdateChunksSent(const FunctionCallbackInfo<Value>& args) {
Expand Down
10 changes: 9 additions & 1 deletion src/node_http2_core-inl.h
Original file line number Diff line number Diff line change
Expand Up @@ -510,7 +510,7 @@ inline void Nghttp2Session::SendPendingData() {
// the proceed with the rest.
while (srcRemaining > destRemaining) {
DEBUG_HTTP2("Nghttp2Session %s: pushing %d bytes to the socket\n",
TypeName(), destRemaining);
TypeName(), destLength + destRemaining);
memcpy(dest.base + destOffset, src + srcOffset, destRemaining);
destLength += destRemaining;
Send(&dest, destLength);
Expand Down Expand Up @@ -896,6 +896,14 @@ inline void Nghttp2Stream::ReadStart() {
FlushDataChunks();
}

inline void Nghttp2Stream::ReadResume() {
DEBUG_HTTP2("Nghttp2Stream %d: resume reading\n", id_);
flags_ &= ~NGHTTP2_STREAM_FLAG_READ_PAUSED;

// Flush any queued data chunks immediately out to the JS layer
FlushDataChunks();
}

inline void Nghttp2Stream::ReadStop() {
DEBUG_HTTP2("Nghttp2Stream %d: stop reading\n", id_);
if (!IsReading())
Expand Down
3 changes: 3 additions & 0 deletions src/node_http2_core.h
Original file line number Diff line number Diff line change
Expand Up @@ -384,6 +384,9 @@ class Nghttp2Stream {
// the session to be emitted at the JS side
inline void ReadStart();

// Resume Reading
inline void ReadResume();

// Stop/Pause Reading.
inline void ReadStop();

Expand Down
2 changes: 2 additions & 0 deletions test/parallel/parallel.status
Original file line number Diff line number Diff line change
Expand Up @@ -18,5 +18,7 @@ test-npm-install: PASS,FLAKY
[$system==solaris] # Also applies to SmartOS

[$system==freebsd]
test-http2-compat-serverrequest-pipe: PASS,FLAKY
test-http2-pipe: PASS,FLAKY

[$system==aix]
6 changes: 3 additions & 3 deletions test/parallel/test-http2-compat-serverrequest-pipe.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,17 +11,17 @@ const path = require('path');

// piping should work as expected with createWriteStream

const loc = fixtures.path('person.jpg');
const fn = path.join(common.tmpDir, 'http2pipe.jpg');
common.refreshTmpDir();
const loc = fixtures.path('url-tests.js');
const fn = path.join(common.tmpDir, 'http2-url-tests.js');

const server = http2.createServer();

server.on('request', common.mustCall((req, res) => {
const dest = req.pipe(fs.createWriteStream(fn));
dest.on('finish', common.mustCall(() => {
assert.strictEqual(req.complete, true);
assert.deepStrictEqual(fs.readFileSync(loc), fs.readFileSync(fn));
assert.strictEqual(fs.readFileSync(loc).length, fs.readFileSync(fn).length);
fs.unlinkSync(fn);
res.end();
}));
Expand Down
49 changes: 49 additions & 0 deletions test/parallel/test-http2-pipe.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
'use strict';

const common = require('../common');
if (!common.hasCrypto)
common.skip('missing crypto');
const fixtures = require('../common/fixtures');
const assert = require('assert');
const http2 = require('http2');
const fs = require('fs');
const path = require('path');

// piping should work as expected with createWriteStream

common.refreshTmpDir();
const loc = fixtures.path('url-tests.js');
const fn = path.join(common.tmpDir, 'http2-url-tests.js');

const server = http2.createServer();

server.on('stream', common.mustCall((stream) => {
const dest = stream.pipe(fs.createWriteStream(fn));
dest.on('finish', common.mustCall(() => {
assert.strictEqual(fs.readFileSync(loc).length, fs.readFileSync(fn).length);
fs.unlinkSync(fn);
stream.respond();
stream.end();
}));
}));

server.listen(0, common.mustCall(() => {
const port = server.address().port;
const client = http2.connect(`http://localhost:${port}`);

let remaining = 2;
function maybeClose() {
if (--remaining === 0) {
server.close();
client.destroy();
}
}

const req = client.request({ ':method': 'POST' });
req.on('response', common.mustCall());
req.resume();
req.on('end', common.mustCall(maybeClose));
const str = fs.createReadStream(loc);
str.on('end', common.mustCall(maybeClose));
str.pipe(req);
}));

0 comments on commit 95a61cb

Please sign in to comment.