Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

http2: fix stream reading resumption #16580

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 12 additions & 14 deletions lib/internal/http2/core.js
Original file line number Diff line number Diff line change
Expand Up @@ -282,8 +282,13 @@ function onSessionRead(nread, buf, handle) {
'report this as a bug in Node.js');
_unrefActive(owner); // Reset the session timeout timer
_unrefActive(stream); // Reset the stream timeout timer
if (nread >= 0 && !stream.destroyed)
return stream.push(buf);
if (nread >= 0 && !stream.destroyed) {
// prevent overflowing the buffer while pause figures out the
// stream needs to actually pause and streamOnPause runs
if (!stream.push(buf))
owner[kHandle].streamReadStop(id);
return;
}

// Last chunk was received. End the readable side.
stream.push(null);
Expand Down Expand Up @@ -1276,8 +1281,6 @@ function onStreamClosed(code) {
}

function streamOnResume() {
if (this._paused)
return this.pause();
if (this[kID] === undefined) {
this.once('ready', streamOnResume);
return;
Expand All @@ -1299,12 +1302,10 @@ function streamOnPause() {
}
}

function streamOnDrain() {
const needPause = 0 > this._writableState.highWaterMark;
if (this._paused && !needPause) {
this._paused = false;
this.resume();
}
function handleFlushData(handle, streamID) {
assert(handle.flushData(streamID) === undefined,
`HTTP/2 Stream ${streamID} does not exist. Please report this as ` +
'a bug in Node.js');
}

function streamOnSessionConnect() {
Expand Down Expand Up @@ -1357,7 +1358,6 @@ class Http2Stream extends Duplex {
this.once('finish', onHandleFinish);
this.on('resume', streamOnResume);
this.on('pause', streamOnPause);
this.on('drain', streamOnDrain);
session.once('close', state.closeHandler);

if (session[kState].connecting) {
Expand Down Expand Up @@ -1507,9 +1507,7 @@ class Http2Stream extends Duplex {
return;
}
_unrefActive(this);
assert(this[kSession][kHandle].flushData(this[kID]) === undefined,
'HTTP/2 Stream #{this[kID]} does not exist. Please report this as ' +
'a bug in Node.js');
process.nextTick(handleFlushData, this[kSession][kHandle], this[kID]);
}

// Submits an RST-STREAM frame to shutdown this stream.
Expand Down
2 changes: 1 addition & 1 deletion src/node_http2.cc
Original file line number Diff line number Diff line change
Expand Up @@ -756,7 +756,7 @@ void Http2Session::FlushData(const FunctionCallbackInfo<Value>& args) {
if (!(stream = session->FindStream(id))) {
return args.GetReturnValue().Set(NGHTTP2_ERR_INVALID_STREAM_ID);
}
stream->FlushDataChunks();
stream->ReadResume();
}

void Http2Session::UpdateChunksSent(const FunctionCallbackInfo<Value>& args) {
Expand Down
10 changes: 9 additions & 1 deletion src/node_http2_core-inl.h
Original file line number Diff line number Diff line change
Expand Up @@ -510,7 +510,7 @@ inline void Nghttp2Session::SendPendingData() {
// the proceed with the rest.
while (srcRemaining > destRemaining) {
DEBUG_HTTP2("Nghttp2Session %s: pushing %d bytes to the socket\n",
TypeName(), destRemaining);
TypeName(), destLength + destRemaining);
memcpy(dest.base + destOffset, src + srcOffset, destRemaining);
destLength += destRemaining;
Send(&dest, destLength);
Expand Down Expand Up @@ -896,6 +896,14 @@ inline void Nghttp2Stream::ReadStart() {
FlushDataChunks();
}

inline void Nghttp2Stream::ReadResume() {
DEBUG_HTTP2("Nghttp2Stream %d: resume reading\n", id_);
flags_ &= ~NGHTTP2_STREAM_FLAG_READ_PAUSED;

// Flush any queued data chunks immediately out to the JS layer
FlushDataChunks();
}

inline void Nghttp2Stream::ReadStop() {
DEBUG_HTTP2("Nghttp2Stream %d: stop reading\n", id_);
if (!IsReading())
Expand Down
3 changes: 3 additions & 0 deletions src/node_http2_core.h
Original file line number Diff line number Diff line change
Expand Up @@ -384,6 +384,9 @@ class Nghttp2Stream {
// the session to be emitted at the JS side
inline void ReadStart();

// Resume Reading
inline void ReadResume();

// Stop/Pause Reading.
inline void ReadStop();

Expand Down
2 changes: 2 additions & 0 deletions test/parallel/parallel.status
Original file line number Diff line number Diff line change
Expand Up @@ -18,5 +18,7 @@ test-npm-install: PASS,FLAKY
[$system==solaris] # Also applies to SmartOS

[$system==freebsd]
test-http2-compat-serverrequest-pipe: PASS,FLAKY
test-http2-pipe: PASS,FLAKY

[$system==aix]
6 changes: 3 additions & 3 deletions test/parallel/test-http2-compat-serverrequest-pipe.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,17 +11,17 @@ const path = require('path');

// piping should work as expected with createWriteStream

const loc = fixtures.path('person.jpg');
const fn = path.join(common.tmpDir, 'http2pipe.jpg');
common.refreshTmpDir();
const loc = fixtures.path('url-tests.js');
const fn = path.join(common.tmpDir, 'http2-url-tests.js');

const server = http2.createServer();

server.on('request', common.mustCall((req, res) => {
const dest = req.pipe(fs.createWriteStream(fn));
dest.on('finish', common.mustCall(() => {
assert.strictEqual(req.complete, true);
assert.deepStrictEqual(fs.readFileSync(loc), fs.readFileSync(fn));
assert.strictEqual(fs.readFileSync(loc).length, fs.readFileSync(fn).length);
fs.unlinkSync(fn);
res.end();
}));
Expand Down
49 changes: 49 additions & 0 deletions test/parallel/test-http2-pipe.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
'use strict';

const common = require('../common');
if (!common.hasCrypto)
common.skip('missing crypto');
const fixtures = require('../common/fixtures');
const assert = require('assert');
const http2 = require('http2');
const fs = require('fs');
const path = require('path');

// piping should work as expected with createWriteStream

common.refreshTmpDir();
const loc = fixtures.path('url-tests.js');
const fn = path.join(common.tmpDir, 'http2-url-tests.js');

const server = http2.createServer();

server.on('stream', common.mustCall((stream) => {
const dest = stream.pipe(fs.createWriteStream(fn));
dest.on('finish', common.mustCall(() => {
assert.strictEqual(fs.readFileSync(loc).length, fs.readFileSync(fn).length);
fs.unlinkSync(fn);
stream.respond();
stream.end();
}));
}));

server.listen(0, common.mustCall(() => {
const port = server.address().port;
const client = http2.connect(`http://localhost:${port}`);

let remaining = 2;
function maybeClose() {
if (--remaining === 0) {
server.close();
client.destroy();
}
}

const req = client.request({ ':method': 'POST' });
req.on('response', common.mustCall());
req.resume();
req.on('end', common.mustCall(maybeClose));
const str = fs.createReadStream(loc);
str.on('end', common.mustCall(maybeClose));
str.pipe(req);
}));