Skip to content

Commit

Permalink
fix socket does not timeout bug, it will hang on life
Browse files Browse the repository at this point in the history
  • Loading branch information
fengmk2 committed Nov 8, 2013
1 parent 13eb5b7 commit c92f5b5
Show file tree
Hide file tree
Showing 6 changed files with 166 additions and 15 deletions.
5 changes: 3 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,9 @@ var Agent = require('agentkeepalive');

var keepaliveAgent = new Agent({
maxSockets: 10,
maxKeepAliveRequests: 0, // max requests per keepalive socket, default is 0, no limit.
maxKeepAliveTime: 30000 // keepalive for 30 seconds
maxFreeSockets: 10,
keepAlive: true,
keepAliveMsecs: 30000 // keepalive for 30 seconds
});

var options = {
Expand Down
9 changes: 6 additions & 3 deletions benchmark/proxy.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@ var maxSockets = parseInt(process.argv[2], 10) || 10;
var SERVER = process.argv[3] || '127.0.0.1';

var agentKeepalive = new Agent({
keepAlive: true,
maxSockets: maxSockets,
maxFreeSockets: maxSockets,
maxKeepAliveTime: 30000,
});
var agentHttp = new http.Agent({
Expand Down Expand Up @@ -63,14 +65,14 @@ var rtNormals = {
setInterval(function () {
var name = SERVER + ':1984';
console.log('----------------------------------------------------------------');
console.log('[proxy.js:%d] keepalive, %d created, %d requestFinished, %d req/socket, %s requests, %s sockets, %s unusedSockets, %d timeout\n%j',
console.log('[proxy.js:%d] keepalive, %d created, %d requestFinished, %d req/socket, %s requests, %s sockets, %s freeSockets, %d timeout\n%j',
count,
agentKeepalive.createSocketCount,
agentKeepalive.requestFinishedCount,
(agentKeepalive.requestFinishedCount / agentKeepalive.createSocketCount || 0).toFixed(2),
agentKeepalive.requests[name] && agentKeepalive.requests[name].length || 0,
agentKeepalive.sockets[name] && agentKeepalive.sockets[name].length || 0,
agentKeepalive.unusedSockets[name] && agentKeepalive.unusedSockets[name].length || 0,
agentKeepalive.freeSockets[name] && agentKeepalive.freeSockets[name].length || 0,
agentKeepalive.timeoutSocketCount,
rtKeepalives
);
Expand Down Expand Up @@ -109,6 +111,7 @@ http.createServer(function (req, res) {
method: method,
agent: agent
};
req.on('data', function () {});
req.on('end', function () {
var timer = null;
var start = Date.now();
Expand Down Expand Up @@ -160,4 +163,4 @@ http.createServer(function (req, res) {

}).listen(1985);

console.log('proxy start, listen on 1985');
console.log('proxy start, listen on 1985');
2 changes: 1 addition & 1 deletion benchmark/start.sh
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ sudo ulimit -n 100000
sysctl -n machdep.cpu.brand_string

SERVER=127.0.0.1
NUM=1000
NUM=500
CONCURRENT=60
maxSockets=50
DELAY=5
Expand Down
126 changes: 120 additions & 6 deletions lib/agent.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,131 @@ var http = require('http');
var https = require('https');
var util = require('util');

if (typeof http.Agent.request === 'function') {
// node >= 0.11, default Agent is keepavlie
module.exports = http.Agent;
module.exports.HttpsAgent = https.Agent;
return;
var debug;
if (process.env.NODE_DEBUG && /agentkeepalive/.test(process.env.NODE_DEBUG)) {
debug = function (x) {
console.error.apply(console, arguments);
};
} else {
debug = function () { };
}

function Agent(options) {
options = options || {};
options.keepAliveMsecs = options.keepAliveMsecs || options.maxKeepAliveTime;
http.Agent.call(this, options);

var self = this;
// max requests per keepalive socket, default is 0, no limit.
self.maxKeepAliveRequests = parseInt(options.maxKeepAliveRequests, 10) || 0;
// max keep alive time, default 60 seconds.
// if set `keepAliveMsecs = 0`, will disable keepalive feature.
self.createSocketCount = 0;
self.timeoutSocketCount = 0;
self.requestFinishedCount = 0;

// override the `free` event listener
self.removeAllListeners('free');
self.on('free', function (socket, options) {
self.requestFinishedCount++;
socket._requestCount++;

var name = self.getName(options);
debug('agent.on(free)', name);

if (!socket.destroyed &&
self.requests[name] && self.requests[name].length) {
self.requests[name].shift().onSocket(socket);
if (self.requests[name].length === 0) {
// don't leak
delete self.requests[name];
}
} else {
// If there are no pending requests, then put it in
// the freeSockets pool, but only if we're allowed to do so.
var req = socket._httpMessage;
if (req &&
req.shouldKeepAlive &&
!socket.destroyed &&
self.options.keepAlive) {
var freeSockets = self.freeSockets[name];
var freeLen = freeSockets ? freeSockets.length : 0;
var count = freeLen;
if (self.sockets[name])
count += self.sockets[name].length;

if (count >= self.maxSockets || freeLen >= self.maxFreeSockets) {
self.removeSocket(socket, options);
socket.destroy();
} else {
freeSockets = freeSockets || [];
self.freeSockets[name] = freeSockets;
socket.setKeepAlive(true, self.keepAliveMsecs);
socket.unref && socket.unref();
socket._httpMessage = null;
self.removeSocket(socket, options);
freeSockets.push(socket);

// Avoid duplicitive timeout events by removing timeout listeners set on
// socket by previous requests. node does not do this normally because it
// assumes sockets are too short-lived for it to matter. It becomes a
// problem when sockets are being reused. Steps are being taken to fix
// this issue upstream in node v0.10.0.
//
// See https://github.com/joyent/node/commit/451ff1540ab536237e8d751d241d7fc3391a4087
if (self.keepAliveMsecs && socket._events && Array.isArray(socket._events.timeout)) {
socket.removeAllListeners('timeout');
// Restore the socket's setTimeout() that was remove as collateral
// damage.
socket.setTimeout(self.keepAliveMsecs, socket._maxKeepAliveTimeout);
}
}
} else {
self.removeSocket(socket, options);
socket.destroy();
}
}
});
}

var Agent = require('./_http_agent').Agent;
util.inherits(Agent, http.Agent);
module.exports = Agent;

Agent.prototype.createSocket = function (req, options) {
var self = this;
var socket = http.Agent.prototype.createSocket.call(this, req, options);
socket._requestCount = 0;
if (self.keepAliveMsecs) {
socket._maxKeepAliveTimeout = function () {
debug('maxKeepAliveTimeout, socket destroy()');
socket.destroy();
self.timeoutSocketCount++;
};
socket.setTimeout(self.keepAliveMsecs, socket._maxKeepAliveTimeout);
// Disable Nagle's algorithm: http://blog.caustik.com/2012/04/08/scaling-node-js-to-100k-concurrent-connections/
socket.setNoDelay(true);
}
this.createSocketCount++;
return socket;
};

Agent.prototype.removeSocket = function (s, options) {
http.Agent.prototype.removeSocket.call(this, s, options);
var name = this.getName(options);
debug('removeSocket', name, 'destroyed:', s.destroyed);

if (s.destroyed && this.freeSockets[name]) {
var index = this.freeSockets[name].indexOf(s);
if (index !== -1) {
this.freeSockets[name].splice(index, 1);
if (this.freeSockets[name].length === 0) {
// don't leak
delete this.freeSockets[name];
}
}
}
};

function HttpsAgent(options) {
Agent.call(this, options);
this.defaultPort = 443;
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
"coveralls": "*",
"mocha-lcov-reporter": "*"
},
"engines": { "node": ">= 0.10.0" },
"engines": { "node": ">= 0.11.8" },
"author": "fengmk2 <[email protected]> (http://fengmk2.github.com)",
"license": "MIT"
}
37 changes: 35 additions & 2 deletions test/agent.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@ describe('agent.test.js', function () {
} else if (req.url === '/hang') {
// Wait forever.
return;
} else if (req.url === '/remote_close') {
setTimeout(function () {
req.connection.end();
}, 500);
}
var info = urlparse(req.url, true);
if (info.query.timeout) {
Expand Down Expand Up @@ -183,7 +187,7 @@ describe('agent.test.js', function () {
}).on('error', done);
});

it.skip('should use new socket when hit the max keepalive time: 1000ms', function (done) {
it('should use new socket when hit the max keepalive time: 1000ms', function (done) {
// socket._handle will not timeout ...
var name = 'localhost:' + port + '::';
agentkeepalive.sockets.should.have.not.key(name);
Expand Down Expand Up @@ -255,7 +259,6 @@ describe('agent.test.js', function () {
it('should not keepalive when client.abort()', function (done) {
var name = 'localhost:' + port + '::';
agentkeepalive.sockets.should.have.not.key(name);
agentkeepalive.freeSockets.should.have.not.key(name);
var client = agentkeepalive.get({
port: port,
path: '/',
Expand Down Expand Up @@ -329,6 +332,36 @@ describe('agent.test.js', function () {
agent.requests[name].should.length(1);
});

it('should request /remote_close 200 status, after 500ms free socket close', function (done) {
var name = 'localhost:' + port + '::';
agentkeepalive.sockets.should.not.have.key(name);
agentkeepalive.get({
port: port,
path: '/remote_close'
}, function (res) {
res.should.status(200);
res.on('data', function (data) {
});
res.on('end', function () {
agentkeepalive.sockets.should.have.key(name);
agentkeepalive.freeSockets.should.not.have.key(name);
setTimeout(function () {
agentkeepalive.sockets.should.not.have.key(name);
agentkeepalive.freeSockets.should.have.key(name);
agentkeepalive.freeSockets[name].should.length(1);
setTimeout(function () {
agentkeepalive.sockets.should.not.have.key(name);
agentkeepalive.freeSockets.should.not.have.key(name);
done();
}, 510);
}, 10);
});
});
agentkeepalive.sockets.should.have.key(name);
agentkeepalive.sockets[name].should.length(1);
agentkeepalive.freeSockets.should.not.have.key(name);
});

// it('should maxKeepAliveRequests work with 1 and 10', function (done) {
// var name = 'localhost:' + port + '::';
// function request(agent, checkCount, callback) {
Expand Down

0 comments on commit c92f5b5

Please sign in to comment.