diff --git a/aedes.js b/aedes.js index 36fd056a..5f74455c 100644 --- a/aedes.js +++ b/aedes.js @@ -12,6 +12,7 @@ const Packet = require('aedes-packet') const memory = require('aedes-persistence') const mqemitter = require('mqemitter') const Client = require('./lib/client') +const { $SYS_PREFIX } = require('./lib/utils') module.exports = Aedes.Server = Aedes @@ -78,7 +79,7 @@ function Aedes (opts) { this.clients = {} this.brokers = {} - const heartbeatTopic = '$SYS/' + that.id + '/heartbeat' + const heartbeatTopic = $SYS_PREFIX + that.id + '/heartbeat' this._heartbeatInterval = setInterval(heartbeat, opts.heartbeatInterval) const bufId = Buffer.from(that.id, 'utf8') @@ -138,12 +139,12 @@ function Aedes (opts) { } } - this.mq.on('$SYS/+/heartbeat', function storeBroker (packet, done) { + this.mq.on($SYS_PREFIX + '+/heartbeat', function storeBroker (packet, done) { that.brokers[packet.payload.toString()] = Date.now() done() }) - this.mq.on('$SYS/+/new/clients', function closeSameClients (packet, done) { + this.mq.on($SYS_PREFIX + '+/new/clients', function closeSameClients (packet, done) { const serverId = packet.topic.split('/')[1] const clientId = packet.payload.toString() @@ -206,7 +207,7 @@ function DoEnqueues () { return } - if (that.topic.indexOf('$SYS') === 0) { + if (that.topic.indexOf($SYS_PREFIX) === 0) { subs = subs.filter(removeSharp) } @@ -281,7 +282,7 @@ Aedes.prototype._finishRegisterClient = function (client) { this.clients[client.id] = client this.emit('client', client) this.publish({ - topic: '$SYS/' + this.id + '/new/clients', + topic: $SYS_PREFIX + this.id + '/new/clients', payload: Buffer.from(client.id, 'utf8') }, noop) } @@ -291,7 +292,7 @@ Aedes.prototype.unregisterClient = function (client) { delete this.clients[client.id] this.emit('clientDisconnect', client) this.publish({ - topic: '$SYS/' + this.id + '/disconnect/clients', + topic: $SYS_PREFIX + this.id + '/disconnect/clients', payload: Buffer.from(client.id, 'utf8') }, noop) } @@ -326,6 +327,9 @@ function defaultAuthenticate (client, username, password, callback) { } function defaultAuthorizePublish (client, packet, callback) { + if (packet.topic.startsWith($SYS_PREFIX)) { + return callback(new Error($SYS_PREFIX + ' topic is reserved')) + } callback(null) } diff --git a/docs/Aedes.md b/docs/Aedes.md index afeaa0e8..6662819f 100644 --- a/docs/Aedes.md +++ b/docs/Aedes.md @@ -315,6 +315,17 @@ aedes.authorizePublish = function (client, packet, callback) { } ``` +By default `authorizePublish` throws error in case a client publish to topics with `$SYS/` prefix to prevent possible DoS (see [#597](https://github.com/moscajs/aedes/issues/597)). If you write your own implementation of `authorizePublish` we suggest you to add a check for this. Default implementation: + +```js +function defaultAuthorizePublish (client, packet, callback) { + if (packet.topic.startsWith($SYS_PREFIX)) { + return callback(new Error($SYS_PREFIX + ' topic is reserved')) + } + callback(null) +} +``` + ## Handler: authorizeSubscribe (client, subscription, callback) - client: [``](./Client.md) diff --git a/lib/handlers/subscribe.js b/lib/handlers/subscribe.js index 8c4685b6..9387623f 100644 --- a/lib/handlers/subscribe.js +++ b/lib/handlers/subscribe.js @@ -3,7 +3,7 @@ const fastfall = require('fastfall') const Packet = require('aedes-packet') const { through } = require('../utils') -const { validateTopic } = require('../utils') +const { validateTopic, $SYS_PREFIX } = require('../utils') const write = require('../write') const subscribeTopicActions = fastfall([ @@ -207,7 +207,7 @@ function completeSubscribe (err) { broker.emit('subscribe', subs, client) broker.publish({ - topic: '$SYS/' + broker.id + '/new/subscribes', + topic: $SYS_PREFIX + broker.id + '/new/subscribes', payload: Buffer.from(JSON.stringify({ clientId: client.id, subs: subs diff --git a/lib/handlers/unsubscribe.js b/lib/handlers/unsubscribe.js index de8b4cb7..2ade3458 100644 --- a/lib/handlers/unsubscribe.js +++ b/lib/handlers/unsubscribe.js @@ -1,7 +1,7 @@ 'use strict' const write = require('../write') -const { validateTopic } = require('../utils') +const { validateTopic, $SYS_PREFIX } = require('../utils') function UnSubAck (packet) { this.cmd = 'unsuback' @@ -90,7 +90,7 @@ function completeUnsubscribe (err) { if ((!client.closed || client.clean === true) && packet.unsubscriptions.length > 0) { client.broker.emit('unsubscribe', packet.unsubscriptions, client) client.broker.publish({ - topic: '$SYS/' + client.broker.id + '/new/unsubscribes', + topic: $SYS_PREFIX + client.broker.id + '/new/unsubscribes', payload: Buffer.from(JSON.stringify({ clientId: client.id, subs: packet.unsubscriptions diff --git a/lib/utils.js b/lib/utils.js index 313647e0..870c47e3 100644 --- a/lib/utils.js +++ b/lib/utils.js @@ -39,5 +39,6 @@ function through (transform) { module.exports = { validateTopic, - through + through, + $SYS_PREFIX: '$SYS/' } diff --git a/test/basic.js b/test/basic.js index 0807c483..92b25797 100644 --- a/test/basic.js +++ b/test/basic.js @@ -95,6 +95,23 @@ test('publish empty topic throws error', function (t) { }) }) +test('publish to $SYS topic throws error', function (t) { + t.plan(1) + + const s = connect(setup()) + t.teardown(s.broker.close.bind(s.broker)) + + s.inStream.write({ + cmd: 'publish', + topic: '$SYS/not/allowed', + payload: 'world' + }) + + s.broker.on('clientError', function (client, err) { + t.pass('should emit error') + }) +}) + ;[{ qos: 0, clean: false }, { qos: 0, clean: true }, { qos: 1, clean: false }, { qos: 1, clean: true }].forEach(function (ele) { test('subscribe a single topic in QoS ' + ele.qos + ' [clean=' + ele.clean + ']', function (t) { t.plan(5)