From fe57e5685ddf568b47076b125d6f5c067a7e9763 Mon Sep 17 00:00:00 2001 From: theanarkh Date: Sun, 24 Jul 2022 18:18:33 +0800 Subject: [PATCH] cluster: send connection to other server when worker drop it PR-URL: https://github.com/nodejs/node/pull/43747 Reviewed-By: Matteo Collina Reviewed-By: Paolo Insogna --- lib/internal/cluster/child.js | 9 ++- ...test-cluster-net-server-drop-connection.js | 64 +++++++++++++++++++ 2 files changed, 72 insertions(+), 1 deletion(-) create mode 100644 test/parallel/test-cluster-net-server-drop-connection.js diff --git a/lib/internal/cluster/child.js b/lib/internal/cluster/child.js index 3224b3bac4b2c3..f960878a70aca3 100644 --- a/lib/internal/cluster/child.js +++ b/lib/internal/cluster/child.js @@ -208,7 +208,14 @@ function rr(message, { indexesKey, index }, cb) { function onconnection(message, handle) { const key = message.key; const server = handles.get(key); - const accepted = server !== undefined; + let accepted = server !== undefined; + + if (accepted && server[owner_symbol]) { + const self = server[owner_symbol]; + if (self.maxConnections && self._connections >= self.maxConnections) { + accepted = false; + } + } send({ ack: message.seq, accepted }); diff --git a/test/parallel/test-cluster-net-server-drop-connection.js b/test/parallel/test-cluster-net-server-drop-connection.js new file mode 100644 index 00000000000000..5df62a9a630885 --- /dev/null +++ b/test/parallel/test-cluster-net-server-drop-connection.js @@ -0,0 +1,64 @@ +'use strict'; +const common = require('../common'); +const assert = require('assert'); +const net = require('net'); +const cluster = require('cluster'); +const tmpdir = require('../common/tmpdir'); + +// The core has bug in handling pipe handle by ipc when platform is win32, +// it can be triggered on win32. I will fix it in another pr. +if (common.isWindows) + common.skip('no setSimultaneousAccepts on pipe handle'); + +let connectionCount = 0; +let listenCount = 0; +let worker1; +let worker2; + +function request(path) { + for (let i = 0; i < 10; i++) { + net.connect(path); + } +} + +function handleMessage(message) { + assert.match(message.action, /listen|connection/); + if (message.action === 'listen') { + if (++listenCount === 2) { + request(common.PIPE); + } + } else if (message.action === 'connection') { + if (++connectionCount === 10) { + worker1.send({ action: 'disconnect' }); + worker2.send({ action: 'disconnect' }); + } + } +} + +if (cluster.isPrimary) { + cluster.schedulingPolicy = cluster.SCHED_RR; + tmpdir.refresh(); + worker1 = cluster.fork({ maxConnections: 1, pipePath: common.PIPE }); + worker2 = cluster.fork({ maxConnections: 9, pipePath: common.PIPE }); + worker1.on('message', common.mustCall((message) => { + handleMessage(message); + }, 2)); + worker2.on('message', common.mustCall((message) => { + handleMessage(message); + }, 10)); +} else { + const server = net.createServer(common.mustCall((socket) => { + process.send({ action: 'connection' }); + }, +process.env.maxConnections)); + + server.listen(process.env.pipePath, common.mustCall(() => { + process.send({ action: 'listen' }); + })); + + server.maxConnections = +process.env.maxConnections; + + process.on('message', common.mustCall((message) => { + assert.strictEqual(message.action, 'disconnect'); + process.disconnect(); + })); +}