From c92f5b5bd852c43a8a30700d78c1a02d239d19e4 Mon Sep 17 00:00:00 2001 From: fengmk2 Date: Fri, 8 Nov 2013 17:00:51 +0800 Subject: [PATCH] fix socket does not timeout bug, it will hang on life --- README.md | 5 +- benchmark/proxy.js | 9 ++-- benchmark/start.sh | 2 +- lib/agent.js | 126 ++++++++++++++++++++++++++++++++++++++++++--- package.json | 2 +- test/agent.test.js | 37 ++++++++++++- 6 files changed, 166 insertions(+), 15 deletions(-) diff --git a/README.md b/README.md index 766fcb8..56b0476 100644 --- a/README.md +++ b/README.md @@ -18,8 +18,9 @@ var Agent = require('agentkeepalive'); var keepaliveAgent = new Agent({ maxSockets: 10, - maxKeepAliveRequests: 0, // max requests per keepalive socket, default is 0, no limit. - maxKeepAliveTime: 30000 // keepalive for 30 seconds + maxFreeSockets: 10, + keepAlive: true, + keepAliveMsecs: 30000 // keepalive for 30 seconds }); var options = { diff --git a/benchmark/proxy.js b/benchmark/proxy.js index 7d91766..59ed646 100644 --- a/benchmark/proxy.js +++ b/benchmark/proxy.js @@ -17,7 +17,9 @@ var maxSockets = parseInt(process.argv[2], 10) || 10; var SERVER = process.argv[3] || '127.0.0.1'; var agentKeepalive = new Agent({ + keepAlive: true, maxSockets: maxSockets, + maxFreeSockets: maxSockets, maxKeepAliveTime: 30000, }); var agentHttp = new http.Agent({ @@ -63,14 +65,14 @@ var rtNormals = { setInterval(function () { var name = SERVER + ':1984'; console.log('----------------------------------------------------------------'); - console.log('[proxy.js:%d] keepalive, %d created, %d requestFinished, %d req/socket, %s requests, %s sockets, %s unusedSockets, %d timeout\n%j', + console.log('[proxy.js:%d] keepalive, %d created, %d requestFinished, %d req/socket, %s requests, %s sockets, %s freeSockets, %d timeout\n%j', count, agentKeepalive.createSocketCount, agentKeepalive.requestFinishedCount, (agentKeepalive.requestFinishedCount / agentKeepalive.createSocketCount || 0).toFixed(2), agentKeepalive.requests[name] && agentKeepalive.requests[name].length || 0, agentKeepalive.sockets[name] && agentKeepalive.sockets[name].length || 0, - agentKeepalive.unusedSockets[name] && agentKeepalive.unusedSockets[name].length || 0, + agentKeepalive.freeSockets[name] && agentKeepalive.freeSockets[name].length || 0, agentKeepalive.timeoutSocketCount, rtKeepalives ); @@ -109,6 +111,7 @@ http.createServer(function (req, res) { method: method, agent: agent }; + req.on('data', function () {}); req.on('end', function () { var timer = null; var start = Date.now(); @@ -160,4 +163,4 @@ http.createServer(function (req, res) { }).listen(1985); -console.log('proxy start, listen on 1985'); \ No newline at end of file +console.log('proxy start, listen on 1985'); diff --git a/benchmark/start.sh b/benchmark/start.sh index 05237fe..c501e95 100644 --- a/benchmark/start.sh +++ b/benchmark/start.sh @@ -8,7 +8,7 @@ sudo ulimit -n 100000 sysctl -n machdep.cpu.brand_string SERVER=127.0.0.1 -NUM=1000 +NUM=500 CONCURRENT=60 maxSockets=50 DELAY=5 diff --git a/lib/agent.js b/lib/agent.js index cdf3bd2..79d4654 100644 --- a/lib/agent.js +++ b/lib/agent.js @@ -20,17 +20,131 @@ var http = require('http'); var https = require('https'); var util = require('util'); -if (typeof http.Agent.request === 'function') { - // node >= 0.11, default Agent is keepavlie - module.exports = http.Agent; - module.exports.HttpsAgent = https.Agent; - return; +var debug; +if (process.env.NODE_DEBUG && /agentkeepalive/.test(process.env.NODE_DEBUG)) { + debug = function (x) { + console.error.apply(console, arguments); + }; +} else { + debug = function () { }; } +function Agent(options) { + options = options || {}; + options.keepAliveMsecs = options.keepAliveMsecs || options.maxKeepAliveTime; + http.Agent.call(this, options); + + var self = this; + // max requests per keepalive socket, default is 0, no limit. + self.maxKeepAliveRequests = parseInt(options.maxKeepAliveRequests, 10) || 0; + // max keep alive time, default 60 seconds. + // if set `keepAliveMsecs = 0`, will disable keepalive feature. + self.createSocketCount = 0; + self.timeoutSocketCount = 0; + self.requestFinishedCount = 0; + + // override the `free` event listener + self.removeAllListeners('free'); + self.on('free', function (socket, options) { + self.requestFinishedCount++; + socket._requestCount++; + + var name = self.getName(options); + debug('agent.on(free)', name); + + if (!socket.destroyed && + self.requests[name] && self.requests[name].length) { + self.requests[name].shift().onSocket(socket); + if (self.requests[name].length === 0) { + // don't leak + delete self.requests[name]; + } + } else { + // If there are no pending requests, then put it in + // the freeSockets pool, but only if we're allowed to do so. + var req = socket._httpMessage; + if (req && + req.shouldKeepAlive && + !socket.destroyed && + self.options.keepAlive) { + var freeSockets = self.freeSockets[name]; + var freeLen = freeSockets ? freeSockets.length : 0; + var count = freeLen; + if (self.sockets[name]) + count += self.sockets[name].length; + + if (count >= self.maxSockets || freeLen >= self.maxFreeSockets) { + self.removeSocket(socket, options); + socket.destroy(); + } else { + freeSockets = freeSockets || []; + self.freeSockets[name] = freeSockets; + socket.setKeepAlive(true, self.keepAliveMsecs); + socket.unref && socket.unref(); + socket._httpMessage = null; + self.removeSocket(socket, options); + freeSockets.push(socket); + + // Avoid duplicitive timeout events by removing timeout listeners set on + // socket by previous requests. node does not do this normally because it + // assumes sockets are too short-lived for it to matter. It becomes a + // problem when sockets are being reused. Steps are being taken to fix + // this issue upstream in node v0.10.0. + // + // See https://github.com/joyent/node/commit/451ff1540ab536237e8d751d241d7fc3391a4087 + if (self.keepAliveMsecs && socket._events && Array.isArray(socket._events.timeout)) { + socket.removeAllListeners('timeout'); + // Restore the socket's setTimeout() that was remove as collateral + // damage. + socket.setTimeout(self.keepAliveMsecs, socket._maxKeepAliveTimeout); + } + } + } else { + self.removeSocket(socket, options); + socket.destroy(); + } + } + }); +} -var Agent = require('./_http_agent').Agent; +util.inherits(Agent, http.Agent); module.exports = Agent; +Agent.prototype.createSocket = function (req, options) { + var self = this; + var socket = http.Agent.prototype.createSocket.call(this, req, options); + socket._requestCount = 0; + if (self.keepAliveMsecs) { + socket._maxKeepAliveTimeout = function () { + debug('maxKeepAliveTimeout, socket destroy()'); + socket.destroy(); + self.timeoutSocketCount++; + }; + socket.setTimeout(self.keepAliveMsecs, socket._maxKeepAliveTimeout); + // Disable Nagle's algorithm: http://blog.caustik.com/2012/04/08/scaling-node-js-to-100k-concurrent-connections/ + socket.setNoDelay(true); + } + this.createSocketCount++; + return socket; +}; + +Agent.prototype.removeSocket = function (s, options) { + http.Agent.prototype.removeSocket.call(this, s, options); + var name = this.getName(options); + debug('removeSocket', name, 'destroyed:', s.destroyed); + + if (s.destroyed && this.freeSockets[name]) { + var index = this.freeSockets[name].indexOf(s); + if (index !== -1) { + this.freeSockets[name].splice(index, 1); + if (this.freeSockets[name].length === 0) { + // don't leak + delete this.freeSockets[name]; + } + } + } +}; + function HttpsAgent(options) { Agent.call(this, options); this.defaultPort = 443; diff --git a/package.json b/package.json index 9c9ad94..5124ed6 100644 --- a/package.json +++ b/package.json @@ -32,7 +32,7 @@ "coveralls": "*", "mocha-lcov-reporter": "*" }, - "engines": { "node": ">= 0.10.0" }, + "engines": { "node": ">= 0.11.8" }, "author": "fengmk2 (http://fengmk2.github.com)", "license": "MIT" } diff --git a/test/agent.test.js b/test/agent.test.js index a39a346..18622b2 100644 --- a/test/agent.test.js +++ b/test/agent.test.js @@ -33,6 +33,10 @@ describe('agent.test.js', function () { } else if (req.url === '/hang') { // Wait forever. return; + } else if (req.url === '/remote_close') { + setTimeout(function () { + req.connection.end(); + }, 500); } var info = urlparse(req.url, true); if (info.query.timeout) { @@ -183,7 +187,7 @@ describe('agent.test.js', function () { }).on('error', done); }); - it.skip('should use new socket when hit the max keepalive time: 1000ms', function (done) { + it('should use new socket when hit the max keepalive time: 1000ms', function (done) { // socket._handle will not timeout ... var name = 'localhost:' + port + '::'; agentkeepalive.sockets.should.have.not.key(name); @@ -255,7 +259,6 @@ describe('agent.test.js', function () { it('should not keepalive when client.abort()', function (done) { var name = 'localhost:' + port + '::'; agentkeepalive.sockets.should.have.not.key(name); - agentkeepalive.freeSockets.should.have.not.key(name); var client = agentkeepalive.get({ port: port, path: '/', @@ -329,6 +332,36 @@ describe('agent.test.js', function () { agent.requests[name].should.length(1); }); + it('should request /remote_close 200 status, after 500ms free socket close', function (done) { + var name = 'localhost:' + port + '::'; + agentkeepalive.sockets.should.not.have.key(name); + agentkeepalive.get({ + port: port, + path: '/remote_close' + }, function (res) { + res.should.status(200); + res.on('data', function (data) { + }); + res.on('end', function () { + agentkeepalive.sockets.should.have.key(name); + agentkeepalive.freeSockets.should.not.have.key(name); + setTimeout(function () { + agentkeepalive.sockets.should.not.have.key(name); + agentkeepalive.freeSockets.should.have.key(name); + agentkeepalive.freeSockets[name].should.length(1); + setTimeout(function () { + agentkeepalive.sockets.should.not.have.key(name); + agentkeepalive.freeSockets.should.not.have.key(name); + done(); + }, 510); + }, 10); + }); + }); + agentkeepalive.sockets.should.have.key(name); + agentkeepalive.sockets[name].should.length(1); + agentkeepalive.freeSockets.should.not.have.key(name); + }); + // it('should maxKeepAliveRequests work with 1 and 10', function (done) { // var name = 'localhost:' + port + '::'; // function request(agent, checkCount, callback) {