Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

http: fix stalled pipeline bug #3342

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions lib/_http_common.js
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ var parsers = new FreeList('parsers', 1000, function() {

parser._headers = [];
parser._url = '';
parser._consumed = false;

// Only called in the slow case where slow means
// that the request headers were either fragmented
Expand Down Expand Up @@ -167,6 +168,9 @@ function freeParser(parser, req, socket) {
if (parser) {
parser._headers = [];
parser.onIncoming = null;
if (parser._consumed)
parser.unconsume();
parser._consumed = false;
if (parser.socket)
parser.socket.parser = null;
parser.socket = null;
Expand Down
66 changes: 31 additions & 35 deletions lib/_http_outgoing.js
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ OutgoingMessage.prototype._send = function(data, encoding, callback) {
this.outputEncodings.unshift('binary');
this.outputCallbacks.unshift(null);
this.outputSize += this._header.length;
if (this._onPendingData !== null)
if (typeof this._onPendingData === 'function')
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is checking for typeof function part of a fix, or different cleanup?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is a cleanup.

this._onPendingData(this._header.length);
}
this._headerSent = true;
Expand All @@ -158,22 +158,7 @@ OutgoingMessage.prototype._writeRaw = function(data, encoding, callback) {
// There might be pending data in the this.output buffer.
var outputLength = this.output.length;
if (outputLength > 0) {
var output = this.output;
var outputEncodings = this.outputEncodings;
var outputCallbacks = this.outputCallbacks;
connection.cork();
for (var i = 0; i < outputLength; i++) {
connection.write(output[i], outputEncodings[i],
outputCallbacks[i]);
}
connection.uncork();

this.output = [];
this.outputEncodings = [];
this.outputCallbacks = [];
if (this._onPendingData !== null)
this._onPendingData(-this.outputSize);
this.outputSize = 0;
this._flushOutput(connection);
} else if (data.length === 0) {
if (typeof callback === 'function')
process.nextTick(callback);
Expand All @@ -198,7 +183,7 @@ OutgoingMessage.prototype._buffer = function(data, encoding, callback) {
this.outputEncodings.push(encoding);
this.outputCallbacks.push(callback);
this.outputSize += data.length;
if (this._onPendingData !== null)
if (typeof this._onPendingData === 'function')
this._onPendingData(data.length);
return false;
};
Expand Down Expand Up @@ -644,26 +629,11 @@ OutgoingMessage.prototype._finish = function() {
// to attempt to flush any pending messages out to the socket.
OutgoingMessage.prototype._flush = function() {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

var socket = this.socket;
var outputLength, ret;
var ret;

if (socket && socket.writable) {
// There might be remaining data in this.output; write it out
outputLength = this.output.length;
if (outputLength > 0) {
var output = this.output;
var outputEncodings = this.outputEncodings;
var outputCallbacks = this.outputCallbacks;
socket.cork();
for (var i = 0; i < outputLength; i++) {
ret = socket.write(output[i], outputEncodings[i],
outputCallbacks[i]);
}
socket.uncork();

this.output = [];
this.outputEncodings = [];
this.outputCallbacks = [];
}
ret = this._flushOutput(socket);

if (this.finished) {
// This is a queue to the server or client to bring in the next this.
Expand All @@ -675,6 +645,32 @@ OutgoingMessage.prototype._flush = function() {
}
};

OutgoingMessage.prototype._flushOutput = function _flushOutput(socket) {
var ret;
var outputLength = this.output.length;
if (outputLength <= 0)
return ret;

var output = this.output;
var outputEncodings = this.outputEncodings;
var outputCallbacks = this.outputCallbacks;
socket.cork();
for (var i = 0; i < outputLength; i++) {
ret = socket.write(output[i], outputEncodings[i],
outputCallbacks[i]);
}
socket.uncork();

this.output = [];
this.outputEncodings = [];
this.outputCallbacks = [];
if (typeof this._onPendingData === 'function')
this._onPendingData(-this.outputSize);
this.outputSize = 0;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This part is in OutgoingMessage#_writeRaw() but not in OutgoingMessage#_finish(). Is this already addressed in the git commit message? Don't see any direct reference to why this is necessary for the fix.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You meant _flush?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah oop. Yeah _flush.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it is used there too.


return ret;
};


OutgoingMessage.prototype.flushHeaders = function() {
if (!this._header) {
Expand Down
39 changes: 33 additions & 6 deletions lib/_http_server.js
Original file line number Diff line number Diff line change
Expand Up @@ -343,8 +343,10 @@ function connectionListener(socket) {
socket.on = socketOnWrap;

var external = socket._handle._externalStream;
if (external)
if (external) {
parser._consumed = true;
parser.consume(external);
}
external = null;
parser[kOnExecute] = onParserExecute;

Expand Down Expand Up @@ -382,7 +384,7 @@ function connectionListener(socket) {
socket.removeListener('data', socketOnData);
socket.removeListener('end', socketOnEnd);
socket.removeListener('close', serverSocketCloseListener);
parser.unconsume(socket._handle._externalStream);
unconsume(parser, socket);
parser.finish();
freeParser(parser, req, null);
parser = null;
Expand Down Expand Up @@ -530,13 +532,38 @@ function connectionListener(socket) {
exports._connectionListener = connectionListener;

function onSocketResume() {
if (this._handle)
// It may seem that the socket is resumed, but this is an enemy's trick to
// deceive us! `resume` is emitted asynchronously, and may be called from
// `incoming.readStart()`. Stop the socket again here, just to preserve the
// state.
//
// We don't care about stream semantics for the consumed socket anyway.
if (this._paused) {
this.pause();
return;
}

if (this._handle && !this._handle.reading) {
this._handle.reading = true;
this._handle.readStart();
}
}

function onSocketPause() {
if (this._handle)
if (this._handle && this._handle.reading) {
this._handle.reading = false;
this._handle.readStop();
}
}

function unconsume(parser, socket) {
if (socket._handle) {
if (parser._consumed)
parser.unconsume(socket._handle._externalStream);
parser._consumed = false;
socket.removeListener('pause', onSocketPause);
socket.removeListener('resume', onSocketResume);
}
}

function socketOnWrap(ev, fn) {
Expand All @@ -546,8 +573,8 @@ function socketOnWrap(ev, fn) {
return res;
}

if (this._handle && (ev === 'data' || ev === 'readable'))
this.parser.unconsume(this._handle._externalStream);
if (ev === 'data' || ev === 'readable')
unconsume(this.parser, this);

return res;
}
17 changes: 11 additions & 6 deletions src/node_http_parser.cc
Original file line number Diff line number Diff line change
Expand Up @@ -484,13 +484,18 @@ class Parser : public BaseObject {
if (parser->prev_alloc_cb_.is_empty())
return;

CHECK(args[0]->IsExternal());
Local<External> stream_obj = args[0].As<External>();
StreamBase* stream = static_cast<StreamBase*>(stream_obj->Value());
CHECK_NE(stream, nullptr);
// Restore stream's callbacks
if (args.Length() == 1 && args[0]->IsExternal()) {
Local<External> stream_obj = args[0].As<External>();
StreamBase* stream = static_cast<StreamBase*>(stream_obj->Value());
CHECK_NE(stream, nullptr);

stream->set_alloc_cb(parser->prev_alloc_cb_);
stream->set_read_cb(parser->prev_read_cb_);
}

stream->set_alloc_cb(parser->prev_alloc_cb_);
stream->set_read_cb(parser->prev_read_cb_);
parser->prev_alloc_cb_.clear();
parser->prev_read_cb_.clear();
}


Expand Down
41 changes: 41 additions & 0 deletions test/parallel/test-http-pipeline-regr-3332.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
'use strict';
const common = require('../common');
const assert = require('assert');
const http = require('http');
const net = require('net');

const big = new Buffer(16 * 1024);
big.fill('A');

const COUNT = 1e4;

var received = 0;

var client;
const server = http.createServer(function(req, res) {
res.end(big, function() {
if (++received === COUNT) {
server.close();
client.end();
}
});
}).listen(common.PORT, function() {
var req = new Array(COUNT + 1).join('GET / HTTP/1.1\r\n\r\n');
client = net.connect(common.PORT, function() {
client.write(req);
});
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Won't having no 'data' event cause all the data to buffer? Don't think that's what you're trying to test, and that would accumulate to ~150MB. On our machines that's nothing big, but I'm thinking about execution time. It's not huge, but is longer than the rest:

ok 56 test-http-pipeline-regr-3332.js
  ---
  duration_ms: 42.108

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

client.resume() below ;)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That allows data to be read, but not removed from the internal queue:

$ /usr/bin/time ./node test/parallel/test-http-pipeline-regr-3332.js 
0.43user 0.15system 0:00.54elapsed 107%CPU (0avgtext+0avgdata 123572maxresident)k

~123MB memory usage for this test.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're right. Though this means net.connect() doesn't follow standard streaming behavior. But that's not for now.


// Just let the test terminate instead of hanging
client.on('close', function() {
if (received !== COUNT)
server.close();
});
client.resume();
});

process.on('exit', function() {
// The server should pause connection on pipeline flood, but it shoul still
// resume it and finish processing the requests, when its output queue will
// be empty again.
assert.equal(received, COUNT);
});