From 05f926e13ea2beb3635c7426b9b165d034e8a96c Mon Sep 17 00:00:00 2001 From: Damien Arrachequesne Date: Thu, 11 May 2017 01:28:19 +0200 Subject: [PATCH] [perf] Use pattern matching at the namespace level (#217) This follows #46. Each node will now listen to only three channels: - `socket.io##*`: used when broadcasting - `socket.io-request##`: used for requesting information (ex: get every room in the cluster) - `socket.io-response##`: used for responding to requests We keep the benefits of #46 since: - messages from other namespaces are ignored - when emitting to a single room, the message is sent to `socket.io##`, so listeners can check whether they have the room before unpacking the message (which is CPU consuming). But there is no need to subscribe / unsubscribe every time a socket joins or leaves a room (which is also CPU consuming when there are thousands of subscriptions). --- .travis.yml | 2 - README.md | 2 - index.js | 146 ++++++++++---------------------------------------- package.json | 7 ++- test/index.js | 34 +++++++++--- 5 files changed, 58 insertions(+), 133 deletions(-) diff --git a/.travis.yml b/.travis.yml index 5285b2f..72c68e8 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,8 +1,6 @@ language: node_js sudo: false node_js: - - "0.10" - - "0.12" - "4" - "6" - "node" diff --git a/README.md b/README.md index 8d1a934..c729c26 100644 --- a/README.md +++ b/README.md @@ -32,11 +32,9 @@ The following options are allowed: - `key`: the name of the key to pub/sub events on as prefix (`socket.io`) - `host`: host to connect to redis on (`localhost`) - `port`: port to connect to redis on (`6379`) -- `subEvent`: optional, the redis client event name to subscribe to (`messageBuffer`) - `pubClient`: optional, the redis client to publish events on - `subClient`: optional, the redis client to subscribe to events on - `requestsTimeout`: optional, after this timeout the adapter will stop waiting from responses to request (`1000ms`) -- `withChannelMultiplexing`: optional, whether channel multiplexing is enabled (a new subscription will be trigggered for each room) (`true`) If you decide to supply `pubClient` and `subClient`, make sure you use [node_redis](https://github.com/mranney/node_redis) as a client or one diff --git a/index.js b/index.js index d98943a..d6b36f7 100644 --- a/index.js +++ b/index.js @@ -8,7 +8,6 @@ var redis = require('redis').createClient; var msgpack = require('msgpack-lite'); var Adapter = require('socket.io-adapter'); var debug = require('debug')('socket.io-redis'); -var async = require('async'); /** * Module exports. @@ -50,11 +49,8 @@ function adapter(uri, opts) { // opts var pub = opts.pubClient; var sub = opts.subClient; - var prefix = opts.key || 'socket.io'; - var subEvent = opts.subEvent || 'messageBuffer'; var requestsTimeout = opts.requestsTimeout || 1000; - var withChannelMultiplexing = false !== opts.withChannelMultiplexing; // init clients if needed function createClient() { @@ -85,7 +81,6 @@ function adapter(uri, opts) { this.uid = uid; this.prefix = prefix; this.requestsTimeout = requestsTimeout; - this.withChannelMultiplexing = withChannelMultiplexing; this.channel = prefix + '#' + nsp.name + '#'; this.requestChannel = prefix + '-request#' + this.nsp.name + '#'; @@ -107,11 +102,17 @@ function adapter(uri, opts) { var self = this; - sub.subscribe([this.channel, this.requestChannel, this.responseChannel], function(err){ + sub.psubscribe(this.channel + '*', function(err){ + if (err) self.emit('error', err); + }); + + sub.on('pmessageBuffer', this.onmessage.bind(this)); + + sub.subscribe([this.requestChannel, this.responseChannel], function(err){ if (err) self.emit('error', err); }); - sub.on(subEvent, this.onmessage.bind(this)); + sub.on('messageBuffer', this.onrequest.bind(this)); function onError(err) { self.emit('error', err); @@ -132,21 +133,22 @@ function adapter(uri, opts) { * @api private */ - Redis.prototype.onmessage = function(channel, msg){ + Redis.prototype.onmessage = function(pattern, channel, msg){ channel = channel.toString(); - if (this.channelMatches(channel, this.requestChannel)) { - return this.onrequest(channel, msg); - } else if (this.channelMatches(channel, this.responseChannel)) { - return this.onresponse(channel, msg); - } else if (!this.channelMatches(channel, this.channel)) { + if (!this.channelMatches(channel, this.channel)) { return debug('ignore different channel'); } + var room = channel.substring(this.channel.length); + if (room !== '' && !this.rooms.hasOwnProperty(room)) { + return debug('ignore unknown room %s', room); + } + var args = msgpack.decode(msg); var packet; - if (uid == args.shift()) return debug('ignore same uid'); + if (uid === args.shift()) return debug('ignore same uid'); packet = args[0]; @@ -170,6 +172,14 @@ function adapter(uri, opts) { */ Redis.prototype.onrequest = function(channel, msg){ + channel = channel.toString(); + + if (this.channelMatches(channel, this.responseChannel)) { + return this.onresponse(channel, msg); + } else if (!this.channelMatches(channel, this.requestChannel)) { + return debug('ignore different channel'); + } + var self = this; var request; @@ -394,8 +404,8 @@ function adapter(uri, opts) { packet.nsp = this.nsp.name; if (!(remote || (opts && opts.flags && opts.flags.local))) { var msg = msgpack.encode([uid, packet, opts]); - if (this.withChannelMultiplexing && opts.rooms && opts.rooms.length === 1) { - pub.publish(this.channel + opts.rooms[0] + '#', msg); + if (opts.rooms && opts.rooms.length === 1) { + pub.publish(this.channel + opts.rooms[0], msg); } else { pub.publish(this.channel, msg); } @@ -403,107 +413,6 @@ function adapter(uri, opts) { Adapter.prototype.broadcast.call(this, packet, opts); }; - /** - * Subscribe client to room messages. - * - * @param {String} client id - * @param {String} room - * @param {Function} callback (optional) - * @api public - */ - - Redis.prototype.add = function(id, room, fn){ - debug('adding %s to %s ', id, room); - var self = this; - // subscribe only once per room - var alreadyHasRoom = this.rooms.hasOwnProperty(room); - Adapter.prototype.add.call(this, id, room); - - if (!this.withChannelMultiplexing || alreadyHasRoom) { - if (fn) fn(null); - return; - } - - var channel = this.channel + room + '#'; - - function onSubscribe(err) { - if (err) { - self.emit('error', err); - if (fn) fn(err); - return; - } - if (fn) fn(null); - } - - sub.subscribe(channel, onSubscribe); - }; - - /** - * Unsubscribe client from room messages. - * - * @param {String} session id - * @param {String} room id - * @param {Function} callback (optional) - * @api public - */ - - Redis.prototype.del = function(id, room, fn){ - debug('removing %s from %s', id, room); - - var self = this; - var hasRoom = this.rooms.hasOwnProperty(room); - Adapter.prototype.del.call(this, id, room); - - if (this.withChannelMultiplexing && hasRoom && !this.rooms[room]) { - var channel = this.channel + room + '#'; - - function onUnsubscribe(err) { - if (err) { - self.emit('error', err); - if (fn) fn(err); - return; - } - if (fn) fn(null); - } - - sub.unsubscribe(channel, onUnsubscribe); - } else { - if (fn) process.nextTick(fn.bind(null, null)); - } - }; - - /** - * Unsubscribe client completely. - * - * @param {String} client id - * @param {Function} callback (optional) - * @api public - */ - - Redis.prototype.delAll = function(id, fn){ - debug('removing %s from all rooms', id); - - var self = this; - var rooms = this.sids[id]; - - if (!rooms) { - if (fn) process.nextTick(fn.bind(null, null)); - return; - } - - async.each(Object.keys(rooms), function(room, next){ - self.del(id, room, next); - }, function(err){ - if (err) { - self.emit('error', err); - if (fn) fn(err); - return; - } - delete self.sids[id]; - if (fn) fn(null); - }); - }; - /** * Gets a list of clients by sid. * @@ -531,6 +440,7 @@ function adapter(uri, opts) { } numsub = parseInt(numsub[1], 10); + debug('waiting for %d responses to "clients" request', numsub); var request = JSON.stringify({ requestid : requestid, @@ -619,6 +529,7 @@ function adapter(uri, opts) { } numsub = parseInt(numsub[1], 10); + debug('waiting for %d responses to "allRooms" request', numsub); var request = JSON.stringify({ requestid : requestid, @@ -794,6 +705,7 @@ function adapter(uri, opts) { } numsub = parseInt(numsub[1], 10); + debug('waiting for %d responses to "customRequest" request', numsub); var request = JSON.stringify({ requestid : requestid, diff --git a/package.json b/package.json index b597dad..4e1d5e4 100644 --- a/package.json +++ b/package.json @@ -14,18 +14,17 @@ "test": "mocha" }, "dependencies": { - "async": "2.1.4", "debug": "2.3.3", "msgpack-lite": "0.1.26", "redis": "2.6.3", - "socket.io-adapter": "0.5.0", + "socket.io-adapter": "~1.1.0", "uid2": "0.0.3" }, "devDependencies": { "expect.js": "0.3.1", "ioredis": "2.5.0", "mocha": "3.2.0", - "socket.io": "1.7.x", - "socket.io-client": "1.7.x" + "socket.io": "latest", + "socket.io-client": "latest" } } diff --git a/test/index.js b/test/index.js index a144f63..9bbbbbe 100644 --- a/test/index.js +++ b/test/index.js @@ -15,12 +15,6 @@ var socket1, socket2, socket3; { name: 'socket.io-redis' }, - { - name: 'socket.io-redis without channel multiplexing', - options: { - withChannelMultiplexing: false - } - }, { name: 'socket.io-redis with ioredis', options: function () { @@ -152,7 +146,7 @@ var socket1, socket2, socket3; it('deletes rooms upon disconnection', function(done){ socket1.join('woot'); socket1.on('disconnect', function() { - expect(socket1.adapter.sids[socket1.id]).to.be.empty(); + expect(socket1.adapter.sids[socket1.id]).to.be(undefined); expect(socket1.adapter.rooms).to.be.empty(); client1.disconnect(); done(); @@ -175,6 +169,26 @@ var socket1, socket2, socket3; }); }); + it('ignores messages from unknown channels', function(done){ + namespace1.adapter.subClient.psubscribe('f?o', function () { + namespace3.adapter.pubClient.publish('foo', 'bar'); + }); + + namespace1.adapter.subClient.on('pmessageBuffer', function () { + setTimeout(done, 50); + }); + }); + + it('ignores messages from unknown channels (2)', function(done){ + namespace1.adapter.subClient.subscribe('woot', function () { + namespace3.adapter.pubClient.publish('woot', 'toow'); + }); + + namespace1.adapter.subClient.on('messageBuffer', function () { + setTimeout(done, 50); + }); + }); + describe('rooms', function () { it('returns rooms of a given client', function(done){ socket1.join('woot1', function () { @@ -200,6 +214,7 @@ var socket1, socket2, socket3; it('returns all rooms accross several nodes', function(done){ socket1.join('woot1', function () { namespace1.adapter.allRooms(function(err, rooms){ + expect(err).to.be(null); expect(rooms).to.have.length(4); expect(rooms).to.contain(socket1.id); expect(rooms).to.contain(socket2.id); @@ -212,6 +227,7 @@ var socket1, socket2, socket3; it('makes a given socket join a room', function(done){ namespace3.adapter.remoteJoin(socket1.id, 'woot3', function(err){ + expect(err).to.be(null); var rooms = Object.keys(socket1.rooms); expect(rooms).to.have.length(2); expect(rooms).to.contain('woot3'); @@ -222,6 +238,7 @@ var socket1, socket2, socket3; it('makes a given socket leave a room', function(done){ socket1.join('woot3', function(){ namespace3.adapter.remoteLeave(socket1.id, 'woot3', function(err){ + expect(err).to.be(null); var rooms = Object.keys(socket1.rooms); expect(rooms).to.have.length(1); expect(rooms).not.to.contain('woot3'); @@ -237,6 +254,7 @@ var socket1, socket2, socket3; } namespace3.adapter.customRequest('hello', function(err, replies){ + expect(err).to.be(null); expect(replies).to.have.length(3); expect(replies).to.contain(namespace1.adapter.uid); done(); @@ -297,7 +315,7 @@ function init(options){ socket1 = _socket1; socket2 = _socket2; socket3 = _socket3; - done(); + setTimeout(done, 100); }); }); });