From 54c5934de5a37fb73db64d1acc8bd026103d4dc3 Mon Sep 17 00:00:00 2001 From: Behrad Date: Tue, 18 Jul 2017 01:13:49 +0430 Subject: [PATCH 01/33] remove parallel and use outgoingEnqueueCombi --- aedes.js | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/aedes.js b/aedes.js index 4e6bf1a1..a2df0b42 100644 --- a/aedes.js +++ b/aedes.js @@ -191,9 +191,7 @@ function DoEnqueues () { that.complete = null that.topic = null - broker._parallel( - status, - doEnqueue, subs, complete) + broker.persistence.outgoingEnqueueCombi(subs, status.packet, complete) broker._enqueuers.release(that) } @@ -204,10 +202,6 @@ function removeSharp (sub) { return sub.topic !== '#' } -function doEnqueue (sub, done) { - this.broker.persistence.outgoingEnqueue(sub, this.packet, done) -} - function callPublished (_, done) { this.broker.published(this.packet, this.client, done) this.broker.emit('publish', this.packet, this.client) From f1f1b0adc5a06d4ca6f360c981bedd2ae95a614a Mon Sep 17 00:00:00 2001 From: Matteo Collina Date: Wed, 19 Jul 2017 19:16:02 +0200 Subject: [PATCH 02/33] Validate PUBLISH topics --- lib/handlers/publish.js | 12 ++++++++ test/basic.js | 65 +++++++++++++++++++++++++++++++++++++++++ 2 files changed, 77 insertions(+) diff --git a/lib/handlers/publish.js b/lib/handlers/publish.js index 10ea62fc..4c224aaa 100644 --- a/lib/handlers/publish.js +++ b/lib/handlers/publish.js @@ -17,6 +17,18 @@ var publishActions = [ enqueuePublish ] function handlePublish (client, packet, done) { + var topic = packet.topic + var err + for (var i = 0; i < topic.length; i++) { + switch (topic.charCodeAt(i)) { + case 35: + err = new Error('# is not allowed in PUBLISH') + return done(err) + case 43: + err = new Error('+ is not allowed in PUBLISH') + return done(err) + } + } client.broker._series(client, publishActions, packet, done) } diff --git a/test/basic.js b/test/basic.js index f8a6e835..79419216 100644 --- a/test/basic.js +++ b/test/basic.js @@ -486,3 +486,68 @@ test('avoid wrong deduping of retain messages', function (t) { publisher.inStream.write(expected) }) + +test('publish invalid topic with #', function (t) { + var s = connect(setup()) + + subscribe(t, s, '#', 0, function () { + s.outStream.once('data', function (packet) { + t.fail('no packet') + t.end() + }) + + s.inStream.write({ + cmd: 'publish', + topic: 'hello/#', + payload: 'world' + }) + }) + + eos(s.conn, function () { + t.equal(s.broker.connectedClients, 0, 'no connected clients') + t.end() + }) +}) + +test('publish invalid topic with +', function (t) { + var s = connect(setup()) + + subscribe(t, s, '#', 0, function () { + s.outStream.once('data', function (packet) { + t.fail('no packet') + }) + + s.inStream.write({ + cmd: 'publish', + topic: 'hello/+/eee', + payload: 'world' + }) + }) + + eos(s.conn, function () { + t.equal(s.broker.connectedClients, 0, 'no connected clients') + t.end() + }) +}) + +test('subscribe to invalid topic with hello/+foo', function (t) { + var s = connect(setup()) + + subscribe(t, s, 'hello/+foo', 0, function () { + s.outStream.once('data', function (packet) { + t.fail('no packet') + t.end() + }) + + s.inStream.write({ + cmd: 'publish', + topic: 'hello/#', + payload: 'world' + }) + }) + + eos(s.conn, function () { + t.equal(s.broker.connectedClients, 0, 'no connected clients') + t.end() + }) +}) From 0606069dc3572602344cd73e7baa45faa3f42189 Mon Sep 17 00:00:00 2001 From: Matteo Collina Date: Thu, 20 Jul 2017 10:00:36 +0200 Subject: [PATCH 03/33] Added topic validations for SUBSCRIBE. --- lib/handlers/subscribe.js | 21 +++++++++++++++++++++ test/basic.js | 28 +++++++++++++--------------- 2 files changed, 34 insertions(+), 15 deletions(-) diff --git a/lib/handlers/subscribe.js b/lib/handlers/subscribe.js index 1796f3ce..1cc77343 100644 --- a/lib/handlers/subscribe.js +++ b/lib/handlers/subscribe.js @@ -36,6 +36,27 @@ function doSubscribe (sub, done) { function authorize (sub, done) { var client = this.client + var topic = sub.topic + var end = topic.length - 1 + var err + for (var i = 0; i < topic.length; i++) { + switch (topic.charCodeAt(i)) { + case 35: + if (i !== end) { + err = new Error('# is only allowed in SUBSCRIBE in the last position') + client.emit('error', err) + return done(err) + } + break + case 43: + if (i < end - 1 && topic.charCodeAt(i + 1) !== 47) { + err = new Error('+ is only allowed in SUBSCRIBE between /') + client.emit('error', err) + return done(err) + } + break + } + } client.broker.authorizeSubscribe(client, sub, done) } diff --git a/test/basic.js b/test/basic.js index 79419216..938c28de 100644 --- a/test/basic.js +++ b/test/basic.js @@ -530,24 +530,22 @@ test('publish invalid topic with +', function (t) { }) }) -test('subscribe to invalid topic with hello/+foo', function (t) { - var s = connect(setup()) - - subscribe(t, s, 'hello/+foo', 0, function () { - s.outStream.once('data', function (packet) { - t.fail('no packet') - t.end() - }) +;['base/#/sub', 'base/#sub', 'base/+xyz/sub'].forEach(function (topic) { + test('subscribe to invalid topic with ' + topic, function (t) { + var s = connect(setup()) s.inStream.write({ - cmd: 'publish', - topic: 'hello/#', - payload: 'world' + cmd: 'subscribe', + messageId: 24, + subscriptions: [{ + topic: topic, + qos: 0 + }] }) - }) - eos(s.conn, function () { - t.equal(s.broker.connectedClients, 0, 'no connected clients') - t.end() + eos(s.conn, function () { + t.equal(s.broker.connectedClients, 0, 'no connected clients') + t.end() + }) }) }) From 7abd055e810b7bcd8e0f852a22801d4f2c0c2520 Mon Sep 17 00:00:00 2001 From: Matteo Collina Date: Thu, 20 Jul 2017 16:43:06 +0200 Subject: [PATCH 04/33] Added test for empty topic --- test/basic.js | 22 ++++++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/test/basic.js b/test/basic.js index 938c28de..f9f385a0 100644 --- a/test/basic.js +++ b/test/basic.js @@ -487,6 +487,28 @@ test('avoid wrong deduping of retain messages', function (t) { publisher.inStream.write(expected) }) +test('publish empty topic', function (t) { + var s = connect(setup()) + + subscribe(t, s, '#', 0, function () { + s.outStream.once('data', function (packet) { + t.fail('no packet') + t.end() + }) + + s.inStream.write({ + cmd: 'publish', + topic: '', + payload: 'world' + }) + }) + + eos(s.conn, function () { + t.equal(s.broker.connectedClients, 0, 'no connected clients') + t.end() + }) +}) + test('publish invalid topic with #', function (t) { var s = connect(setup()) From 9f6d3512a5a931c5d7ba536340ee05a140433577 Mon Sep 17 00:00:00 2001 From: Matteo Collina Date: Thu, 20 Jul 2017 16:44:25 +0200 Subject: [PATCH 05/33] Added test for empty topic subscription. --- test/basic.js | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/test/basic.js b/test/basic.js index f9f385a0..2982f94d 100644 --- a/test/basic.js +++ b/test/basic.js @@ -552,8 +552,8 @@ test('publish invalid topic with +', function (t) { }) }) -;['base/#/sub', 'base/#sub', 'base/+xyz/sub'].forEach(function (topic) { - test('subscribe to invalid topic with ' + topic, function (t) { +;['base/#/sub', 'base/#sub', 'base/+xyz/sub', ''].forEach(function (topic) { + test('subscribe to invalid topic with "' + topic + '"', function (t) { var s = connect(setup()) s.inStream.write({ From dcaf4f72001e50de63c4502267f792e928530f8d Mon Sep 17 00:00:00 2001 From: Matteo Collina Date: Thu, 20 Jul 2017 17:15:09 +0200 Subject: [PATCH 06/33] Correct validations. --- lib/handlers/subscribe.js | 14 ++++++++++---- test/basic.js | 17 +++++++---------- 2 files changed, 17 insertions(+), 14 deletions(-) diff --git a/lib/handlers/subscribe.js b/lib/handlers/subscribe.js index 1cc77343..0dc801f2 100644 --- a/lib/handlers/subscribe.js +++ b/lib/handlers/subscribe.js @@ -38,20 +38,26 @@ function authorize (sub, done) { var client = this.client var topic = sub.topic var end = topic.length - 1 + var endMinus = end - 1 var err + var slashInPreEnd = endMinus > 0 && topic.charCodeAt(endMinus) !== 47 + if (topic.length === 0) { + return done(new Error('impossible to subscribe to an empty topic')) + } for (var i = 0; i < topic.length; i++) { switch (topic.charCodeAt(i)) { case 35: - if (i !== end) { + var notAtTheEnd = i !== end + if (notAtTheEnd || slashInPreEnd) { err = new Error('# is only allowed in SUBSCRIBE in the last position') - client.emit('error', err) return done(err) } break case 43: - if (i < end - 1 && topic.charCodeAt(i + 1) !== 47) { + var pastChar = i < end - 1 && topic.charCodeAt(i + 1) !== 47 + var preChar = i > 1 && topic.charCodeAt(i - 1) !== 47 + if (pastChar || preChar) { err = new Error('+ is only allowed in SUBSCRIBE between /') - client.emit('error', err) return done(err) } break diff --git a/test/basic.js b/test/basic.js index 2982f94d..6105141e 100644 --- a/test/basic.js +++ b/test/basic.js @@ -525,8 +525,7 @@ test('publish invalid topic with #', function (t) { }) }) - eos(s.conn, function () { - t.equal(s.broker.connectedClients, 0, 'no connected clients') + s.broker.on('clientError', function () { t.end() }) }) @@ -546,16 +545,19 @@ test('publish invalid topic with +', function (t) { }) }) - eos(s.conn, function () { - t.equal(s.broker.connectedClients, 0, 'no connected clients') + s.broker.on('clientError', function () { t.end() }) }) -;['base/#/sub', 'base/#sub', 'base/+xyz/sub', ''].forEach(function (topic) { +;['base/#/sub', 'base/#sub', 'base/sub#', 'base/xyz+/sub', 'base/+xyz/sub'].forEach(function (topic) { test('subscribe to invalid topic with "' + topic + '"', function (t) { var s = connect(setup()) + s.broker.on('clientError', function () { + t.end() + }) + s.inStream.write({ cmd: 'subscribe', messageId: 24, @@ -564,10 +566,5 @@ test('publish invalid topic with +', function (t) { qos: 0 }] }) - - eos(s.conn, function () { - t.equal(s.broker.connectedClients, 0, 'no connected clients') - t.end() - }) }) }) From 79e88546ca098554c2f3267debef3abcf3543234 Mon Sep 17 00:00:00 2001 From: Matteo Collina Date: Fri, 21 Jul 2017 09:07:35 +0200 Subject: [PATCH 07/33] Error on PUBLISH with empty topic --- lib/handlers/publish.js | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/lib/handlers/publish.js b/lib/handlers/publish.js index 4c224aaa..f3c8cd87 100644 --- a/lib/handlers/publish.js +++ b/lib/handlers/publish.js @@ -19,6 +19,10 @@ var publishActions = [ function handlePublish (client, packet, done) { var topic = packet.topic var err + if (topic.length === 0) { + err = new Error('empty topic not allowed in PUBLISH') + return done(err) + } for (var i = 0; i < topic.length; i++) { switch (topic.charCodeAt(i)) { case 35: From 98cbfee36b58c1ffcc882a8d881bacbc4427e0ca Mon Sep 17 00:00:00 2001 From: Matteo Collina Date: Fri, 21 Jul 2017 09:54:13 +0200 Subject: [PATCH 08/33] Added topic validations for unsubscribe. --- lib/handlers/subscribe.js | 30 ++++-------------------------- lib/handlers/unsubscribe.js | 12 +++++++++++- lib/handlers/validations.js | 29 +++++++++++++++++++++++++++++ test/basic.js | 14 ++++++++++++++ 4 files changed, 58 insertions(+), 27 deletions(-) create mode 100644 lib/handlers/validations.js diff --git a/lib/handlers/subscribe.js b/lib/handlers/subscribe.js index 0dc801f2..bec22df7 100644 --- a/lib/handlers/subscribe.js +++ b/lib/handlers/subscribe.js @@ -4,6 +4,7 @@ var write = require('../write') var fastfall = require('fastfall') var Packet = require('aedes-packet') var through = require('through2') +var validateSubscribeTopic = require('./validations').validateSubscribeTopic var topicActions = fastfall([ authorize, storeSubscriptions, @@ -36,32 +37,9 @@ function doSubscribe (sub, done) { function authorize (sub, done) { var client = this.client - var topic = sub.topic - var end = topic.length - 1 - var endMinus = end - 1 - var err - var slashInPreEnd = endMinus > 0 && topic.charCodeAt(endMinus) !== 47 - if (topic.length === 0) { - return done(new Error('impossible to subscribe to an empty topic')) - } - for (var i = 0; i < topic.length; i++) { - switch (topic.charCodeAt(i)) { - case 35: - var notAtTheEnd = i !== end - if (notAtTheEnd || slashInPreEnd) { - err = new Error('# is only allowed in SUBSCRIBE in the last position') - return done(err) - } - break - case 43: - var pastChar = i < end - 1 && topic.charCodeAt(i + 1) !== 47 - var preChar = i > 1 && topic.charCodeAt(i - 1) !== 47 - if (pastChar || preChar) { - err = new Error('+ is only allowed in SUBSCRIBE between /') - return done(err) - } - break - } + var err = validateSubscribeTopic(sub.topic) + if (err) { + return done(err) } client.broker.authorizeSubscribe(client, sub, done) } diff --git a/lib/handlers/unsubscribe.js b/lib/handlers/unsubscribe.js index 927eda52..15918485 100644 --- a/lib/handlers/unsubscribe.js +++ b/lib/handlers/unsubscribe.js @@ -1,6 +1,7 @@ 'use strict' var write = require('../write') +var validateSubscribeTopic = require('./validations').validateSubscribeTopic function UnsubscribeState (client, packet, finish, granted) { this.client = client @@ -11,9 +12,18 @@ function UnsubscribeState (client, packet, finish, granted) { function handleUnsubscribe (client, packet, done) { var broker = client.broker + var unsubscriptions = packet.unsubscriptions + var err + + for (var i = 0; i < unsubscriptions.length; i++) { + err = validateSubscribeTopic(unsubscriptions[i]) + if (err) { + return done(err) + } + } if (packet.messageId) { - broker.persistence.removeSubscriptions(client, packet.unsubscriptions, function (err) { + broker.persistence.removeSubscriptions(client, unsubscriptions, function (err) { if (err) { return done(err) } diff --git a/lib/handlers/validations.js b/lib/handlers/validations.js new file mode 100644 index 00000000..0c830b8a --- /dev/null +++ b/lib/handlers/validations.js @@ -0,0 +1,29 @@ +'use strict' + +function validateSubscribeTopic (topic) { + var end = topic.length - 1 + var endMinus = end - 1 + var slashInPreEnd = endMinus > 0 && topic.charCodeAt(endMinus) !== 47 + if (topic.length === 0) { + return new Error('impossible to subscribe to an empty topic') + } + for (var i = 0; i < topic.length; i++) { + switch (topic.charCodeAt(i)) { + case 35: + var notAtTheEnd = i !== end + if (notAtTheEnd || slashInPreEnd) { + return new Error('# is only allowed in SUBSCRIBE in the last position') + } + break + case 43: + var pastChar = i < end - 1 && topic.charCodeAt(i + 1) !== 47 + var preChar = i > 1 && topic.charCodeAt(i - 1) !== 47 + if (pastChar || preChar) { + return new Error('+ is only allowed in SUBSCRIBE between /') + } + break + } + } +} + +module.exports.validateSubscribeTopic = validateSubscribeTopic diff --git a/test/basic.js b/test/basic.js index 6105141e..fb0ee60b 100644 --- a/test/basic.js +++ b/test/basic.js @@ -567,4 +567,18 @@ test('publish invalid topic with +', function (t) { }] }) }) + + test('unsubscribe to invalid topic with "' + topic + '"', function (t) { + var s = connect(setup()) + + s.broker.on('clientError', function () { + t.end() + }) + + s.inStream.write({ + cmd: 'unsubscribe', + messageId: 24, + unsubscriptions: [topic] + }) + }) }) From 8226e91bf85f642b8a9fcc70a1bba6235ed291b8 Mon Sep 17 00:00:00 2001 From: Matteo Collina Date: Fri, 21 Jul 2017 10:09:35 +0200 Subject: [PATCH 09/33] Better error messages for topic validations --- lib/handlers/subscribe.js | 4 ++-- lib/handlers/unsubscribe.js | 4 ++-- lib/handlers/validations.js | 10 +++++----- 3 files changed, 9 insertions(+), 9 deletions(-) diff --git a/lib/handlers/subscribe.js b/lib/handlers/subscribe.js index bec22df7..ece4cb0f 100644 --- a/lib/handlers/subscribe.js +++ b/lib/handlers/subscribe.js @@ -4,7 +4,7 @@ var write = require('../write') var fastfall = require('fastfall') var Packet = require('aedes-packet') var through = require('through2') -var validateSubscribeTopic = require('./validations').validateSubscribeTopic +var validateTopic = require('./validations').validateTopic var topicActions = fastfall([ authorize, storeSubscriptions, @@ -37,7 +37,7 @@ function doSubscribe (sub, done) { function authorize (sub, done) { var client = this.client - var err = validateSubscribeTopic(sub.topic) + var err = validateTopic(sub.topic, 'SUBSCRIBE') if (err) { return done(err) } diff --git a/lib/handlers/unsubscribe.js b/lib/handlers/unsubscribe.js index 15918485..8b4700a8 100644 --- a/lib/handlers/unsubscribe.js +++ b/lib/handlers/unsubscribe.js @@ -1,7 +1,7 @@ 'use strict' var write = require('../write') -var validateSubscribeTopic = require('./validations').validateSubscribeTopic +var validateTopic = require('./validations').validateTopic function UnsubscribeState (client, packet, finish, granted) { this.client = client @@ -16,7 +16,7 @@ function handleUnsubscribe (client, packet, done) { var err for (var i = 0; i < unsubscriptions.length; i++) { - err = validateSubscribeTopic(unsubscriptions[i]) + err = validateTopic(unsubscriptions[i], 'UNSUBSCRIBE') if (err) { return done(err) } diff --git a/lib/handlers/validations.js b/lib/handlers/validations.js index 0c830b8a..16d9922b 100644 --- a/lib/handlers/validations.js +++ b/lib/handlers/validations.js @@ -1,29 +1,29 @@ 'use strict' -function validateSubscribeTopic (topic) { +function validateTopic (topic, message) { var end = topic.length - 1 var endMinus = end - 1 var slashInPreEnd = endMinus > 0 && topic.charCodeAt(endMinus) !== 47 if (topic.length === 0) { - return new Error('impossible to subscribe to an empty topic') + return new Error('impossible to ' + message + ' to an empty topic') } for (var i = 0; i < topic.length; i++) { switch (topic.charCodeAt(i)) { case 35: var notAtTheEnd = i !== end if (notAtTheEnd || slashInPreEnd) { - return new Error('# is only allowed in SUBSCRIBE in the last position') + return new Error('# is only allowed in ' + message + ' in the last position') } break case 43: var pastChar = i < end - 1 && topic.charCodeAt(i + 1) !== 47 var preChar = i > 1 && topic.charCodeAt(i - 1) !== 47 if (pastChar || preChar) { - return new Error('+ is only allowed in SUBSCRIBE between /') + return new Error('+ is only allowed in ' + message + ' between /') } break } } } -module.exports.validateSubscribeTopic = validateSubscribeTopic +module.exports.validateTopic = validateTopic From b369602872d07eff4159db8c483fbf7908f03036 Mon Sep 17 00:00:00 2001 From: Matteo Collina Date: Fri, 21 Jul 2017 10:25:24 +0200 Subject: [PATCH 10/33] Do not forward $SYS topics for +/# subscriptions. Fixes: https://github.com/mcollina/aedes/issues/135 --- aedes.js | 5 ++++- lib/handlers/subscribe.js | 9 ++++++++- test/events.js | 19 +++++++++++++++++++ 3 files changed, 31 insertions(+), 2 deletions(-) diff --git a/aedes.js b/aedes.js index 4e6bf1a1..cc5125d3 100644 --- a/aedes.js +++ b/aedes.js @@ -200,8 +200,11 @@ function DoEnqueues () { } } +// + is 43 +// # is 35 function removeSharp (sub) { - return sub.topic !== '#' + var code = sub.topic.charCodeAt(0) + return code !== 43 && code !== 35 } function doEnqueue (sub, done) { diff --git a/lib/handlers/subscribe.js b/lib/handlers/subscribe.js index ece4cb0f..e0848805 100644 --- a/lib/handlers/subscribe.js +++ b/lib/handlers/subscribe.js @@ -100,7 +100,7 @@ function subTopic (sub, done) { break } - if (sub.topic === '#') { + if (isWildcardThatMatchesSys(sub.topic)) { func = blockSys(func) } @@ -118,6 +118,13 @@ function subTopic (sub, done) { } } +// + is 43 +// # is 35 +function isWildcardThatMatchesSys (topic) { + var code = topic.charCodeAt(0) + return code === 43 || code === 35 +} + function completeSubscribe (err) { var packet = this.packet var client = this.client diff --git a/test/events.js b/test/events.js index 0c4286d8..25df8ea3 100644 --- a/test/events.js +++ b/test/events.js @@ -40,6 +40,25 @@ test('does not forward $SYS topics to # subscription', function (t) { }) }) +test('does not forward $SYS topics to +/# subscription', function (t) { + t.plan(4) + var s = connect(setup()) + + subscribe(t, s, '+/#', 0, function () { + s.outStream.once('data', function (packet) { + t.fail('no packet should be received') + }) + + s.broker.mq.emit({ + cmd: 'publish', + topic: '$SYS/hello', + payload: 'world' + }, function () { + t.pass('nothing happened') + }) + }) +}) + test('does not store $SYS topics to QoS 1 # subscription', function (t) { t.plan(3) From dc300b9df69a065b6143961974cbbaf1cd32f7ee Mon Sep 17 00:00:00 2001 From: Matteo Collina Date: Fri, 21 Jul 2017 11:01:53 +0200 Subject: [PATCH 11/33] Bumped v0.29.0. --- package.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/package.json b/package.json index 21aa6804..bf9767ac 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "aedes", - "version": "0.28.0", + "version": "0.29.0", "description": "Stream-based MQTT broker", "main": "aedes.js", "scripts": { From 185ab4baf909b5d72c7f4eec6eab8dbfd9597d5a Mon Sep 17 00:00:00 2001 From: Matteo Collina Date: Fri, 21 Jul 2017 15:13:33 +0200 Subject: [PATCH 12/33] failing test for wrong granted order --- test/auth.js | 65 ++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 65 insertions(+) diff --git a/test/auth.js b/test/auth.js index d41fc521..bfcb186e 100644 --- a/test/auth.js +++ b/test/auth.js @@ -479,6 +479,71 @@ test('negate subscription', function (t) { }) }) +test('negate multiple subscriptions', function (t) { + t.plan(5) + + var s = connect(setup()) + + s.broker.authorizeSubscribe = function (client, sub, cb) { + t.ok(client, 'client exists') + cb(null, null) + } + + s.inStream.write({ + cmd: 'subscribe', + messageId: 24, + subscriptions: [{ + topic: 'hello', + qos: 0 + }, { + topic: 'world', + qos: 0 + }] + }) + + s.outStream.once('data', function (packet) { + t.equal(packet.cmd, 'suback') + t.deepEqual(packet.granted, [128, 128]) + t.equal(packet.messageId, 24) + }) +}) + +test('negate multiple subscriptions random times', function (t) { + t.plan(5) + + var s = connect(setup()) + + s.broker.authorizeSubscribe = function (client, sub, cb) { + t.ok(client, 'client exists') + if (sub.topic === 'hello') { + setTimeout(function () { + cb(null, sub) + }, 100) + } else { + cb(null, null) + } + } + + s.inStream.write({ + cmd: 'subscribe', + messageId: 24, + subscriptions: [{ + topic: 'hello', + qos: 0 + }, { + topic: 'world', + qos: 0 + }] + }) + + s.outStream.once('data', function (packet) { + t.equal(packet.cmd, 'suback') + t.deepEqual(packet.granted, [0, 128]) + t.equal(packet.messageId, 24) + }) +}) + + test('failed authentication does not disconnect other client with same clientId', function (t) { t.plan(3) From 081027fe6f57267d6b73ca345ca095e1bb9e5652 Mon Sep 17 00:00:00 2001 From: Behrad Date: Fri, 21 Jul 2017 19:52:38 +0430 Subject: [PATCH 13/33] update aedes-persistence --- package.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/package.json b/package.json index 21aa6804..3fec36f3 100644 --- a/package.json +++ b/package.json @@ -48,7 +48,7 @@ }, "dependencies": { "aedes-packet": "^1.0.0", - "aedes-persistence": "^4.0.0", + "aedes-persistence": "^5.0.2", "bulk-write-stream": "^1.0.0", "end-of-stream": "^1.1.0", "fastfall": "^1.0.0", From 318ad06a88d73ea617b00e2f9e83437148452ad2 Mon Sep 17 00:00:00 2001 From: Matteo Collina Date: Wed, 19 Jul 2017 19:16:02 +0200 Subject: [PATCH 14/33] Validate PUBLISH topics --- lib/handlers/publish.js | 12 ++++++++ test/basic.js | 65 +++++++++++++++++++++++++++++++++++++++++ 2 files changed, 77 insertions(+) diff --git a/lib/handlers/publish.js b/lib/handlers/publish.js index 10ea62fc..4c224aaa 100644 --- a/lib/handlers/publish.js +++ b/lib/handlers/publish.js @@ -17,6 +17,18 @@ var publishActions = [ enqueuePublish ] function handlePublish (client, packet, done) { + var topic = packet.topic + var err + for (var i = 0; i < topic.length; i++) { + switch (topic.charCodeAt(i)) { + case 35: + err = new Error('# is not allowed in PUBLISH') + return done(err) + case 43: + err = new Error('+ is not allowed in PUBLISH') + return done(err) + } + } client.broker._series(client, publishActions, packet, done) } diff --git a/test/basic.js b/test/basic.js index f8a6e835..79419216 100644 --- a/test/basic.js +++ b/test/basic.js @@ -486,3 +486,68 @@ test('avoid wrong deduping of retain messages', function (t) { publisher.inStream.write(expected) }) + +test('publish invalid topic with #', function (t) { + var s = connect(setup()) + + subscribe(t, s, '#', 0, function () { + s.outStream.once('data', function (packet) { + t.fail('no packet') + t.end() + }) + + s.inStream.write({ + cmd: 'publish', + topic: 'hello/#', + payload: 'world' + }) + }) + + eos(s.conn, function () { + t.equal(s.broker.connectedClients, 0, 'no connected clients') + t.end() + }) +}) + +test('publish invalid topic with +', function (t) { + var s = connect(setup()) + + subscribe(t, s, '#', 0, function () { + s.outStream.once('data', function (packet) { + t.fail('no packet') + }) + + s.inStream.write({ + cmd: 'publish', + topic: 'hello/+/eee', + payload: 'world' + }) + }) + + eos(s.conn, function () { + t.equal(s.broker.connectedClients, 0, 'no connected clients') + t.end() + }) +}) + +test('subscribe to invalid topic with hello/+foo', function (t) { + var s = connect(setup()) + + subscribe(t, s, 'hello/+foo', 0, function () { + s.outStream.once('data', function (packet) { + t.fail('no packet') + t.end() + }) + + s.inStream.write({ + cmd: 'publish', + topic: 'hello/#', + payload: 'world' + }) + }) + + eos(s.conn, function () { + t.equal(s.broker.connectedClients, 0, 'no connected clients') + t.end() + }) +}) From dbeaf72d8651410695c9f413cdf4dde96e509a36 Mon Sep 17 00:00:00 2001 From: Matteo Collina Date: Thu, 20 Jul 2017 10:00:36 +0200 Subject: [PATCH 15/33] Added topic validations for SUBSCRIBE. --- lib/handlers/subscribe.js | 21 +++++++++++++++++++++ test/basic.js | 28 +++++++++++++--------------- 2 files changed, 34 insertions(+), 15 deletions(-) diff --git a/lib/handlers/subscribe.js b/lib/handlers/subscribe.js index 1796f3ce..1cc77343 100644 --- a/lib/handlers/subscribe.js +++ b/lib/handlers/subscribe.js @@ -36,6 +36,27 @@ function doSubscribe (sub, done) { function authorize (sub, done) { var client = this.client + var topic = sub.topic + var end = topic.length - 1 + var err + for (var i = 0; i < topic.length; i++) { + switch (topic.charCodeAt(i)) { + case 35: + if (i !== end) { + err = new Error('# is only allowed in SUBSCRIBE in the last position') + client.emit('error', err) + return done(err) + } + break + case 43: + if (i < end - 1 && topic.charCodeAt(i + 1) !== 47) { + err = new Error('+ is only allowed in SUBSCRIBE between /') + client.emit('error', err) + return done(err) + } + break + } + } client.broker.authorizeSubscribe(client, sub, done) } diff --git a/test/basic.js b/test/basic.js index 79419216..938c28de 100644 --- a/test/basic.js +++ b/test/basic.js @@ -530,24 +530,22 @@ test('publish invalid topic with +', function (t) { }) }) -test('subscribe to invalid topic with hello/+foo', function (t) { - var s = connect(setup()) - - subscribe(t, s, 'hello/+foo', 0, function () { - s.outStream.once('data', function (packet) { - t.fail('no packet') - t.end() - }) +;['base/#/sub', 'base/#sub', 'base/+xyz/sub'].forEach(function (topic) { + test('subscribe to invalid topic with ' + topic, function (t) { + var s = connect(setup()) s.inStream.write({ - cmd: 'publish', - topic: 'hello/#', - payload: 'world' + cmd: 'subscribe', + messageId: 24, + subscriptions: [{ + topic: topic, + qos: 0 + }] }) - }) - eos(s.conn, function () { - t.equal(s.broker.connectedClients, 0, 'no connected clients') - t.end() + eos(s.conn, function () { + t.equal(s.broker.connectedClients, 0, 'no connected clients') + t.end() + }) }) }) From 115266dd23baff6307f69438ee91805843c6de84 Mon Sep 17 00:00:00 2001 From: Matteo Collina Date: Thu, 20 Jul 2017 16:43:06 +0200 Subject: [PATCH 16/33] Added test for empty topic --- test/basic.js | 22 ++++++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/test/basic.js b/test/basic.js index 938c28de..f9f385a0 100644 --- a/test/basic.js +++ b/test/basic.js @@ -487,6 +487,28 @@ test('avoid wrong deduping of retain messages', function (t) { publisher.inStream.write(expected) }) +test('publish empty topic', function (t) { + var s = connect(setup()) + + subscribe(t, s, '#', 0, function () { + s.outStream.once('data', function (packet) { + t.fail('no packet') + t.end() + }) + + s.inStream.write({ + cmd: 'publish', + topic: '', + payload: 'world' + }) + }) + + eos(s.conn, function () { + t.equal(s.broker.connectedClients, 0, 'no connected clients') + t.end() + }) +}) + test('publish invalid topic with #', function (t) { var s = connect(setup()) From 358e0a67135f8d5ce412365181b62fa81c8fdb46 Mon Sep 17 00:00:00 2001 From: Matteo Collina Date: Thu, 20 Jul 2017 16:44:25 +0200 Subject: [PATCH 17/33] Added test for empty topic subscription. --- test/basic.js | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/test/basic.js b/test/basic.js index f9f385a0..2982f94d 100644 --- a/test/basic.js +++ b/test/basic.js @@ -552,8 +552,8 @@ test('publish invalid topic with +', function (t) { }) }) -;['base/#/sub', 'base/#sub', 'base/+xyz/sub'].forEach(function (topic) { - test('subscribe to invalid topic with ' + topic, function (t) { +;['base/#/sub', 'base/#sub', 'base/+xyz/sub', ''].forEach(function (topic) { + test('subscribe to invalid topic with "' + topic + '"', function (t) { var s = connect(setup()) s.inStream.write({ From 2f167c8d48d4c8fb83f473fb8f2c0050d463a8db Mon Sep 17 00:00:00 2001 From: Matteo Collina Date: Thu, 20 Jul 2017 17:15:09 +0200 Subject: [PATCH 18/33] Correct validations. --- lib/handlers/subscribe.js | 14 ++++++++++---- test/basic.js | 17 +++++++---------- 2 files changed, 17 insertions(+), 14 deletions(-) diff --git a/lib/handlers/subscribe.js b/lib/handlers/subscribe.js index 1cc77343..0dc801f2 100644 --- a/lib/handlers/subscribe.js +++ b/lib/handlers/subscribe.js @@ -38,20 +38,26 @@ function authorize (sub, done) { var client = this.client var topic = sub.topic var end = topic.length - 1 + var endMinus = end - 1 var err + var slashInPreEnd = endMinus > 0 && topic.charCodeAt(endMinus) !== 47 + if (topic.length === 0) { + return done(new Error('impossible to subscribe to an empty topic')) + } for (var i = 0; i < topic.length; i++) { switch (topic.charCodeAt(i)) { case 35: - if (i !== end) { + var notAtTheEnd = i !== end + if (notAtTheEnd || slashInPreEnd) { err = new Error('# is only allowed in SUBSCRIBE in the last position') - client.emit('error', err) return done(err) } break case 43: - if (i < end - 1 && topic.charCodeAt(i + 1) !== 47) { + var pastChar = i < end - 1 && topic.charCodeAt(i + 1) !== 47 + var preChar = i > 1 && topic.charCodeAt(i - 1) !== 47 + if (pastChar || preChar) { err = new Error('+ is only allowed in SUBSCRIBE between /') - client.emit('error', err) return done(err) } break diff --git a/test/basic.js b/test/basic.js index 2982f94d..6105141e 100644 --- a/test/basic.js +++ b/test/basic.js @@ -525,8 +525,7 @@ test('publish invalid topic with #', function (t) { }) }) - eos(s.conn, function () { - t.equal(s.broker.connectedClients, 0, 'no connected clients') + s.broker.on('clientError', function () { t.end() }) }) @@ -546,16 +545,19 @@ test('publish invalid topic with +', function (t) { }) }) - eos(s.conn, function () { - t.equal(s.broker.connectedClients, 0, 'no connected clients') + s.broker.on('clientError', function () { t.end() }) }) -;['base/#/sub', 'base/#sub', 'base/+xyz/sub', ''].forEach(function (topic) { +;['base/#/sub', 'base/#sub', 'base/sub#', 'base/xyz+/sub', 'base/+xyz/sub'].forEach(function (topic) { test('subscribe to invalid topic with "' + topic + '"', function (t) { var s = connect(setup()) + s.broker.on('clientError', function () { + t.end() + }) + s.inStream.write({ cmd: 'subscribe', messageId: 24, @@ -564,10 +566,5 @@ test('publish invalid topic with +', function (t) { qos: 0 }] }) - - eos(s.conn, function () { - t.equal(s.broker.connectedClients, 0, 'no connected clients') - t.end() - }) }) }) From 4a1d48ca64a7490083a8e6924abd9adc6c4e1efc Mon Sep 17 00:00:00 2001 From: Matteo Collina Date: Fri, 21 Jul 2017 09:07:35 +0200 Subject: [PATCH 19/33] Error on PUBLISH with empty topic --- lib/handlers/publish.js | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/lib/handlers/publish.js b/lib/handlers/publish.js index 4c224aaa..f3c8cd87 100644 --- a/lib/handlers/publish.js +++ b/lib/handlers/publish.js @@ -19,6 +19,10 @@ var publishActions = [ function handlePublish (client, packet, done) { var topic = packet.topic var err + if (topic.length === 0) { + err = new Error('empty topic not allowed in PUBLISH') + return done(err) + } for (var i = 0; i < topic.length; i++) { switch (topic.charCodeAt(i)) { case 35: From 65a3e76d2febf39bc8dc0c83e43f8ad3ac78558d Mon Sep 17 00:00:00 2001 From: Matteo Collina Date: Fri, 21 Jul 2017 09:54:13 +0200 Subject: [PATCH 20/33] Added topic validations for unsubscribe. --- lib/handlers/subscribe.js | 30 ++++-------------------------- lib/handlers/unsubscribe.js | 12 +++++++++++- lib/handlers/validations.js | 29 +++++++++++++++++++++++++++++ test/basic.js | 14 ++++++++++++++ 4 files changed, 58 insertions(+), 27 deletions(-) create mode 100644 lib/handlers/validations.js diff --git a/lib/handlers/subscribe.js b/lib/handlers/subscribe.js index 0dc801f2..bec22df7 100644 --- a/lib/handlers/subscribe.js +++ b/lib/handlers/subscribe.js @@ -4,6 +4,7 @@ var write = require('../write') var fastfall = require('fastfall') var Packet = require('aedes-packet') var through = require('through2') +var validateSubscribeTopic = require('./validations').validateSubscribeTopic var topicActions = fastfall([ authorize, storeSubscriptions, @@ -36,32 +37,9 @@ function doSubscribe (sub, done) { function authorize (sub, done) { var client = this.client - var topic = sub.topic - var end = topic.length - 1 - var endMinus = end - 1 - var err - var slashInPreEnd = endMinus > 0 && topic.charCodeAt(endMinus) !== 47 - if (topic.length === 0) { - return done(new Error('impossible to subscribe to an empty topic')) - } - for (var i = 0; i < topic.length; i++) { - switch (topic.charCodeAt(i)) { - case 35: - var notAtTheEnd = i !== end - if (notAtTheEnd || slashInPreEnd) { - err = new Error('# is only allowed in SUBSCRIBE in the last position') - return done(err) - } - break - case 43: - var pastChar = i < end - 1 && topic.charCodeAt(i + 1) !== 47 - var preChar = i > 1 && topic.charCodeAt(i - 1) !== 47 - if (pastChar || preChar) { - err = new Error('+ is only allowed in SUBSCRIBE between /') - return done(err) - } - break - } + var err = validateSubscribeTopic(sub.topic) + if (err) { + return done(err) } client.broker.authorizeSubscribe(client, sub, done) } diff --git a/lib/handlers/unsubscribe.js b/lib/handlers/unsubscribe.js index 927eda52..15918485 100644 --- a/lib/handlers/unsubscribe.js +++ b/lib/handlers/unsubscribe.js @@ -1,6 +1,7 @@ 'use strict' var write = require('../write') +var validateSubscribeTopic = require('./validations').validateSubscribeTopic function UnsubscribeState (client, packet, finish, granted) { this.client = client @@ -11,9 +12,18 @@ function UnsubscribeState (client, packet, finish, granted) { function handleUnsubscribe (client, packet, done) { var broker = client.broker + var unsubscriptions = packet.unsubscriptions + var err + + for (var i = 0; i < unsubscriptions.length; i++) { + err = validateSubscribeTopic(unsubscriptions[i]) + if (err) { + return done(err) + } + } if (packet.messageId) { - broker.persistence.removeSubscriptions(client, packet.unsubscriptions, function (err) { + broker.persistence.removeSubscriptions(client, unsubscriptions, function (err) { if (err) { return done(err) } diff --git a/lib/handlers/validations.js b/lib/handlers/validations.js new file mode 100644 index 00000000..0c830b8a --- /dev/null +++ b/lib/handlers/validations.js @@ -0,0 +1,29 @@ +'use strict' + +function validateSubscribeTopic (topic) { + var end = topic.length - 1 + var endMinus = end - 1 + var slashInPreEnd = endMinus > 0 && topic.charCodeAt(endMinus) !== 47 + if (topic.length === 0) { + return new Error('impossible to subscribe to an empty topic') + } + for (var i = 0; i < topic.length; i++) { + switch (topic.charCodeAt(i)) { + case 35: + var notAtTheEnd = i !== end + if (notAtTheEnd || slashInPreEnd) { + return new Error('# is only allowed in SUBSCRIBE in the last position') + } + break + case 43: + var pastChar = i < end - 1 && topic.charCodeAt(i + 1) !== 47 + var preChar = i > 1 && topic.charCodeAt(i - 1) !== 47 + if (pastChar || preChar) { + return new Error('+ is only allowed in SUBSCRIBE between /') + } + break + } + } +} + +module.exports.validateSubscribeTopic = validateSubscribeTopic diff --git a/test/basic.js b/test/basic.js index 6105141e..fb0ee60b 100644 --- a/test/basic.js +++ b/test/basic.js @@ -567,4 +567,18 @@ test('publish invalid topic with +', function (t) { }] }) }) + + test('unsubscribe to invalid topic with "' + topic + '"', function (t) { + var s = connect(setup()) + + s.broker.on('clientError', function () { + t.end() + }) + + s.inStream.write({ + cmd: 'unsubscribe', + messageId: 24, + unsubscriptions: [topic] + }) + }) }) From 587defb22f142fbe2d4008661054cdfc4a776bf9 Mon Sep 17 00:00:00 2001 From: Matteo Collina Date: Fri, 21 Jul 2017 10:09:35 +0200 Subject: [PATCH 21/33] Better error messages for topic validations --- lib/handlers/subscribe.js | 4 ++-- lib/handlers/unsubscribe.js | 4 ++-- lib/handlers/validations.js | 10 +++++----- 3 files changed, 9 insertions(+), 9 deletions(-) diff --git a/lib/handlers/subscribe.js b/lib/handlers/subscribe.js index bec22df7..ece4cb0f 100644 --- a/lib/handlers/subscribe.js +++ b/lib/handlers/subscribe.js @@ -4,7 +4,7 @@ var write = require('../write') var fastfall = require('fastfall') var Packet = require('aedes-packet') var through = require('through2') -var validateSubscribeTopic = require('./validations').validateSubscribeTopic +var validateTopic = require('./validations').validateTopic var topicActions = fastfall([ authorize, storeSubscriptions, @@ -37,7 +37,7 @@ function doSubscribe (sub, done) { function authorize (sub, done) { var client = this.client - var err = validateSubscribeTopic(sub.topic) + var err = validateTopic(sub.topic, 'SUBSCRIBE') if (err) { return done(err) } diff --git a/lib/handlers/unsubscribe.js b/lib/handlers/unsubscribe.js index 15918485..8b4700a8 100644 --- a/lib/handlers/unsubscribe.js +++ b/lib/handlers/unsubscribe.js @@ -1,7 +1,7 @@ 'use strict' var write = require('../write') -var validateSubscribeTopic = require('./validations').validateSubscribeTopic +var validateTopic = require('./validations').validateTopic function UnsubscribeState (client, packet, finish, granted) { this.client = client @@ -16,7 +16,7 @@ function handleUnsubscribe (client, packet, done) { var err for (var i = 0; i < unsubscriptions.length; i++) { - err = validateSubscribeTopic(unsubscriptions[i]) + err = validateTopic(unsubscriptions[i], 'UNSUBSCRIBE') if (err) { return done(err) } diff --git a/lib/handlers/validations.js b/lib/handlers/validations.js index 0c830b8a..16d9922b 100644 --- a/lib/handlers/validations.js +++ b/lib/handlers/validations.js @@ -1,29 +1,29 @@ 'use strict' -function validateSubscribeTopic (topic) { +function validateTopic (topic, message) { var end = topic.length - 1 var endMinus = end - 1 var slashInPreEnd = endMinus > 0 && topic.charCodeAt(endMinus) !== 47 if (topic.length === 0) { - return new Error('impossible to subscribe to an empty topic') + return new Error('impossible to ' + message + ' to an empty topic') } for (var i = 0; i < topic.length; i++) { switch (topic.charCodeAt(i)) { case 35: var notAtTheEnd = i !== end if (notAtTheEnd || slashInPreEnd) { - return new Error('# is only allowed in SUBSCRIBE in the last position') + return new Error('# is only allowed in ' + message + ' in the last position') } break case 43: var pastChar = i < end - 1 && topic.charCodeAt(i + 1) !== 47 var preChar = i > 1 && topic.charCodeAt(i - 1) !== 47 if (pastChar || preChar) { - return new Error('+ is only allowed in SUBSCRIBE between /') + return new Error('+ is only allowed in ' + message + ' between /') } break } } } -module.exports.validateSubscribeTopic = validateSubscribeTopic +module.exports.validateTopic = validateTopic From 545fff7091d715716101325a2a3fca3cd7aa0fd8 Mon Sep 17 00:00:00 2001 From: Matteo Collina Date: Fri, 21 Jul 2017 10:25:24 +0200 Subject: [PATCH 22/33] Do not forward $SYS topics for +/# subscriptions. Fixes: https://github.com/mcollina/aedes/issues/135 --- aedes.js | 5 ++++- lib/handlers/subscribe.js | 9 ++++++++- test/events.js | 19 +++++++++++++++++++ 3 files changed, 31 insertions(+), 2 deletions(-) diff --git a/aedes.js b/aedes.js index a2df0b42..364f1d4a 100644 --- a/aedes.js +++ b/aedes.js @@ -198,8 +198,11 @@ function DoEnqueues () { } } +// + is 43 +// # is 35 function removeSharp (sub) { - return sub.topic !== '#' + var code = sub.topic.charCodeAt(0) + return code !== 43 && code !== 35 } function callPublished (_, done) { diff --git a/lib/handlers/subscribe.js b/lib/handlers/subscribe.js index ece4cb0f..e0848805 100644 --- a/lib/handlers/subscribe.js +++ b/lib/handlers/subscribe.js @@ -100,7 +100,7 @@ function subTopic (sub, done) { break } - if (sub.topic === '#') { + if (isWildcardThatMatchesSys(sub.topic)) { func = blockSys(func) } @@ -118,6 +118,13 @@ function subTopic (sub, done) { } } +// + is 43 +// # is 35 +function isWildcardThatMatchesSys (topic) { + var code = topic.charCodeAt(0) + return code === 43 || code === 35 +} + function completeSubscribe (err) { var packet = this.packet var client = this.client diff --git a/test/events.js b/test/events.js index 0c4286d8..25df8ea3 100644 --- a/test/events.js +++ b/test/events.js @@ -40,6 +40,25 @@ test('does not forward $SYS topics to # subscription', function (t) { }) }) +test('does not forward $SYS topics to +/# subscription', function (t) { + t.plan(4) + var s = connect(setup()) + + subscribe(t, s, '+/#', 0, function () { + s.outStream.once('data', function (packet) { + t.fail('no packet should be received') + }) + + s.broker.mq.emit({ + cmd: 'publish', + topic: '$SYS/hello', + payload: 'world' + }, function () { + t.pass('nothing happened') + }) + }) +}) + test('does not store $SYS topics to QoS 1 # subscription', function (t) { t.plan(3) From 0ca599911ce0b5883c80c9c2ef485ebcbb0db604 Mon Sep 17 00:00:00 2001 From: Matteo Collina Date: Fri, 21 Jul 2017 11:01:53 +0200 Subject: [PATCH 23/33] Bumped v0.29.0. --- package.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/package.json b/package.json index 3fec36f3..9d599c60 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "aedes", - "version": "0.28.0", + "version": "0.29.0", "description": "Stream-based MQTT broker", "main": "aedes.js", "scripts": { From f5da7799428bea3dc018dc4a4399763122a3b269 Mon Sep 17 00:00:00 2001 From: Matteo Collina Date: Thu, 27 Jul 2017 07:45:36 -0700 Subject: [PATCH 24/33] Travis badge should display master branch --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 48b23f1e..520e440f 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,4 @@ -# Aedes  [![Build Status](https://travis-ci.org/mcollina/aedes.svg)](https://travis-ci.org/mcollina/aedes) [![Coverage Status](https://coveralls.io/repos/mcollina/aedes/badge.svg?branch=master&service=github)](https://coveralls.io/github/mcollina/aedes?branch=master) +# Aedes  [![Build Status](https://travis-ci.org/mcollina/aedes.svg?branch=master)](https://travis-ci.org/mcollina/aedes) [![Coverage Status](https://coveralls.io/repos/mcollina/aedes/badge.svg?branch=master&service=github)](https://coveralls.io/github/mcollina/aedes?branch=master) Barebone MQTT server that can run on any stream server. From bc3c13dec2dd5d5dfee2facad8807194a13560cc Mon Sep 17 00:00:00 2001 From: David Halls Date: Mon, 31 Jul 2017 07:59:53 +0100 Subject: [PATCH 25/33] Remove qlobber from deps (it's used in aedes-persistence, not here) --- package.json | 1 - 1 file changed, 1 deletion(-) diff --git a/package.json b/package.json index bf9767ac..7b562252 100644 --- a/package.json +++ b/package.json @@ -58,7 +58,6 @@ "mqemitter": "^2.1.0", "mqtt-packet": "^5.4.0", "pump": "^1.0.0", - "qlobber": "^0.8.0", "retimer": "^1.0.0", "reusify": "^1.0.0", "safe-buffer": "^5.1.1", From 25ec7dd5335e71bc7a72102bc9c5378b6686ee0e Mon Sep 17 00:00:00 2001 From: Matteo Collina Date: Mon, 7 Aug 2017 09:55:23 +0200 Subject: [PATCH 26/33] Revert "parallelize topic sub/unsub handles" This reverts commit e5d9c8af59c8689893de77acd077738132f021a3. --- lib/handlers/subscribe.js | 2 +- lib/handlers/unsubscribe.js | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/handlers/subscribe.js b/lib/handlers/subscribe.js index e0848805..86d24b5c 100644 --- a/lib/handlers/subscribe.js +++ b/lib/handlers/subscribe.js @@ -23,7 +23,7 @@ function handleSubscribe (client, packet, done) { var subs = packet.subscriptions var granted = [] - broker._parallel( + broker._series( new SubscribeState(client, packet, done, granted), doSubscribe, subs, diff --git a/lib/handlers/unsubscribe.js b/lib/handlers/unsubscribe.js index 8b4700a8..620b5256 100644 --- a/lib/handlers/unsubscribe.js +++ b/lib/handlers/unsubscribe.js @@ -37,7 +37,7 @@ function handleUnsubscribe (client, packet, done) { function actualUnsubscribe (client, packet, done) { var broker = client.broker - broker._parallel( + broker._series( new UnsubscribeState(client, packet, done, null), doUnsubscribe, packet.unsubscriptions, From b4ab8ba8eb7b8f910592a2646fd39c9a16fa3476 Mon Sep 17 00:00:00 2001 From: Matteo Collina Date: Mon, 7 Aug 2017 10:00:50 +0200 Subject: [PATCH 27/33] Bumped 0.29.1. --- package.json | 14 +++++++------- test/auth.js | 1 - 2 files changed, 7 insertions(+), 8 deletions(-) diff --git a/package.json b/package.json index 7b562252..41e46a16 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "aedes", - "version": "0.29.0", + "version": "0.29.1", "description": "Stream-based MQTT broker", "main": "aedes.js", "scripts": { @@ -36,15 +36,15 @@ "concat-stream": "^1.4.7", "convert-hrtime": "^2.0.0", "coveralls": "^2.11.6", - "duplexify": "^3.4.1", + "duplexify": "^3.5.1", "faucet": "0.0.1", "istanbul": "^0.4.1", - "mqtt": "^2.9.1", + "mqtt": "^2.11.0", "mqtt-connection": "^3.0.0", "pre-commit": "^1.0.10", - "standard": "^10.0.0", - "tape": "^4.7.0", - "websocket-stream": "^5.0.0" + "standard": "^10.0.3", + "tape": "^4.8.0", + "websocket-stream": "^5.0.1" }, "dependencies": { "aedes-packet": "^1.0.0", @@ -59,7 +59,7 @@ "mqtt-packet": "^5.4.0", "pump": "^1.0.0", "retimer": "^1.0.0", - "reusify": "^1.0.0", + "reusify": "^1.0.2", "safe-buffer": "^5.1.1", "shortid": "^2.1.3", "through2": "^2.0.0", diff --git a/test/auth.js b/test/auth.js index bfcb186e..0077d86e 100644 --- a/test/auth.js +++ b/test/auth.js @@ -543,7 +543,6 @@ test('negate multiple subscriptions random times', function (t) { }) }) - test('failed authentication does not disconnect other client with same clientId', function (t) { t.plan(3) From d2067cd7fae42771755fd71a713afc24a6a85098 Mon Sep 17 00:00:00 2001 From: Matteo Collina Date: Mon, 7 Aug 2017 12:03:54 +0200 Subject: [PATCH 28/33] Update copyright year and removed TODO list. --- LICENSE | 2 +- README.md | 26 -------------------------- 2 files changed, 1 insertion(+), 27 deletions(-) diff --git a/LICENSE b/LICENSE index 02845407..08205b16 100644 --- a/LICENSE +++ b/LICENSE @@ -1,4 +1,4 @@ -Copyright (c) 2015 Matteo Collina, http://matteocollina.com +Copyright (c) 2015-2017 Matteo Collina, http://matteocollina.com Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation diff --git a/README.md b/README.md index 520e440f..6f4d820d 100644 --- a/README.md +++ b/README.md @@ -392,32 +392,6 @@ You can subscribe on the following `$SYS` topics to get client presence: - `$SYS/+/disconnect/clients` - will inform about client disconnections. The payload will contain the `clientId` of the connected/disconnected client - - -## Todo - -* [x] QoS 0 support -* [x] Retain messages support -* [x] QoS 1 support -* [x] QoS 2 support -* [x] clean=false support -* [x] Keep alive support -* [x] Will messages must survive crash -* [x] Authentication -* [x] Events -* [x] Wait a CONNECT packet only for X seconds -* [x] Support a CONNECT packet without a clientId -* [x] Disconnect other clients with the same client.id -* [x] Write docs -* [x] Support counting the number of offline clients and subscriptions -* [x] Performance optimizations for QoS 1 and Qos 2 -* [x] Add `client#publish()` and `client#subscribe()` -* [x] move the persistence in a separate module -* [x] mongo persistence ([external module](http://npm.im/aedes-persistence-mongodb)) -* [x] redis persistence ([external module](http://npm.im/aedes-persistence-redis)) -* [x] leveldb persistence ([external module](http://npm.im/aedes-persistence-level)) -* [ ] cluster support (external module) - ## Acknowledgements This library is born after a lot of discussion with all From f084a2b96e7ca6593a12dd382331ea7a64583dc2 Mon Sep 17 00:00:00 2001 From: Matteo Collina Date: Mon, 7 Aug 2017 12:06:36 +0200 Subject: [PATCH 29/33] Bumped 0.29.2. --- package.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/package.json b/package.json index 41e46a16..430d902d 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "aedes", - "version": "0.29.1", + "version": "0.29.2", "description": "Stream-based MQTT broker", "main": "aedes.js", "scripts": { From 7f1b56b165db6ebaf6b165f9471799410cf1c137 Mon Sep 17 00:00:00 2001 From: Matteo Collina Date: Wed, 16 Aug 2017 17:15:47 +0200 Subject: [PATCH 30/33] Clear up drain listeners in the case of an error Fixes #148. --- lib/client.js | 4 ++++ test/basic.js | 23 +++++++++++++++++++++++ 2 files changed, 27 insertions(+) diff --git a/lib/client.js b/lib/client.js index 3abfd86f..e41bbf02 100644 --- a/lib/client.js +++ b/lib/client.js @@ -251,6 +251,10 @@ Client.prototype.close = function (done) { var list = (state.getBuffer && state.getBuffer()) || state.buffer list.forEach(drainRequest) + // clear up the drain event listeners + that.conn.emit('drain') + that.conn.removeAllListeners('drain') + if (conn.destroySoon) { conn.destroySoon() } if (conn.destroy) { diff --git a/test/basic.js b/test/basic.js index fb0ee60b..00b1dbe0 100644 --- a/test/basic.js +++ b/test/basic.js @@ -582,3 +582,26 @@ test('publish invalid topic with +', function (t) { }) }) }) + +test('clear drain', function (t) { + t.plan(4) + + var s = connect(setup()) + + subscribe(t, s, 'hello', 0, function () { + // fake a busy socket + s.conn.write = function (chunk, enc, cb) { + return false + } + + s.broker.publish({ + cmd: 'publish', + topic: 'hello', + payload: 'world' + }, function () { + t.pass('callback called') + }) + + s.conn.destroy() + }) +}) From 4560c1f8631d5e60e3c8a4487f7f8ccf7bbc73f2 Mon Sep 17 00:00:00 2001 From: Matteo Collina Date: Thu, 17 Aug 2017 10:19:51 +0200 Subject: [PATCH 31/33] Bumped 0.29.3. --- package.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/package.json b/package.json index 430d902d..8208a0e2 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "aedes", - "version": "0.29.2", + "version": "0.29.3", "description": "Stream-based MQTT broker", "main": "aedes.js", "scripts": { From b2eccfe3decbbe941d798cfac35b56fde072aa69 Mon Sep 17 00:00:00 2001 From: Behrad Date: Tue, 18 Jul 2017 01:13:49 +0430 Subject: [PATCH 32/33] remove parallel and use outgoingEnqueueCombi --- aedes.js | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/aedes.js b/aedes.js index cc5125d3..364f1d4a 100644 --- a/aedes.js +++ b/aedes.js @@ -191,9 +191,7 @@ function DoEnqueues () { that.complete = null that.topic = null - broker._parallel( - status, - doEnqueue, subs, complete) + broker.persistence.outgoingEnqueueCombi(subs, status.packet, complete) broker._enqueuers.release(that) } @@ -207,10 +205,6 @@ function removeSharp (sub) { return code !== 43 && code !== 35 } -function doEnqueue (sub, done) { - this.broker.persistence.outgoingEnqueue(sub, this.packet, done) -} - function callPublished (_, done) { this.broker.published(this.packet, this.client, done) this.broker.emit('publish', this.packet, this.client) From 2162c476db774ac4ef484b682ffb990993bb51c9 Mon Sep 17 00:00:00 2001 From: Behrad Date: Fri, 21 Jul 2017 19:52:38 +0430 Subject: [PATCH 33/33] update aedes-persistence --- package.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/package.json b/package.json index 8208a0e2..20904e75 100644 --- a/package.json +++ b/package.json @@ -48,7 +48,7 @@ }, "dependencies": { "aedes-packet": "^1.0.0", - "aedes-persistence": "^4.0.0", + "aedes-persistence": "^5.0.2", "bulk-write-stream": "^1.0.0", "end-of-stream": "^1.1.0", "fastfall": "^1.0.0",