Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

remove parallel and use outgoingEnqueueCombi #131

Closed
wants to merge 39 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
54c5934
remove parallel and use outgoingEnqueueCombi
behrad Jul 17, 2017
f1f1b0a
Validate PUBLISH topics
mcollina Jul 19, 2017
0606069
Added topic validations for SUBSCRIBE.
mcollina Jul 20, 2017
7abd055
Added test for empty topic
mcollina Jul 20, 2017
9f6d351
Added test for empty topic subscription.
mcollina Jul 20, 2017
dcaf4f7
Correct validations.
mcollina Jul 20, 2017
79e8854
Error on PUBLISH with empty topic
mcollina Jul 21, 2017
98cbfee
Added topic validations for unsubscribe.
mcollina Jul 21, 2017
8226e91
Better error messages for topic validations
mcollina Jul 21, 2017
352adc6
Merge pull request #134 from mcollina/validate-topics
mcollina Jul 21, 2017
b369602
Do not forward $SYS topics for +/# subscriptions.
mcollina Jul 21, 2017
3a3ac38
Merge pull request #136 from mcollina/block-sys-with-plus
mcollina Jul 21, 2017
dc300b9
Bumped v0.29.0.
mcollina Jul 21, 2017
185ab4b
failing test for wrong granted order
mcollina Jul 21, 2017
081027f
update aedes-persistence
behrad Jul 21, 2017
318ad06
Validate PUBLISH topics
mcollina Jul 19, 2017
dbeaf72
Added topic validations for SUBSCRIBE.
mcollina Jul 20, 2017
115266d
Added test for empty topic
mcollina Jul 20, 2017
358e0a6
Added test for empty topic subscription.
mcollina Jul 20, 2017
2f167c8
Correct validations.
mcollina Jul 20, 2017
4a1d48c
Error on PUBLISH with empty topic
mcollina Jul 21, 2017
65a3e76
Added topic validations for unsubscribe.
mcollina Jul 21, 2017
587defb
Better error messages for topic validations
mcollina Jul 21, 2017
545fff7
Do not forward $SYS topics for +/# subscriptions.
mcollina Jul 21, 2017
0ca5999
Bumped v0.29.0.
mcollina Jul 21, 2017
f5da779
Travis badge should display master branch
mcollina Jul 27, 2017
bc3c13d
Remove qlobber from deps (it's used in aedes-persistence, not here)
davedoesdev Jul 31, 2017
b1b51be
Merge pull request #142 from davedoesdev/master
mcollina Aug 2, 2017
25ec7dd
Revert "parallelize topic sub/unsub handles"
mcollina Aug 7, 2017
e4b2259
Merge pull request #140 from mcollina/ordered-granted
mcollina Aug 7, 2017
b4ab8ba
Bumped 0.29.1.
mcollina Aug 7, 2017
d2067cd
Update copyright year and removed TODO list.
mcollina Aug 7, 2017
f084a2b
Bumped 0.29.2.
mcollina Aug 7, 2017
7f1b56b
Clear up drain listeners in the case of an error
mcollina Aug 16, 2017
10ab8e3
Merge pull request #149 from mcollina/drain
mcollina Aug 17, 2017
4560c1f
Bumped 0.29.3.
mcollina Aug 17, 2017
b2eccfe
remove parallel and use outgoingEnqueueCombi
behrad Jul 17, 2017
2162c47
update aedes-persistence
behrad Jul 21, 2017
13cddd2
Merge remote-tracking branch 'origin/outgoingEnqueueCombi' into outgo…
behrad Aug 18, 2017
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion LICENSE
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
Copyright (c) 2015 Matteo Collina, http://matteocollina.com
Copyright (c) 2015-2017 Matteo Collina, http://matteocollina.com

Permission is hereby granted, free of charge, to any person
obtaining a copy of this software and associated documentation
Expand Down
28 changes: 1 addition & 27 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Aedes  [![Build Status](https://travis-ci.org/mcollina/aedes.svg)](https://travis-ci.org/mcollina/aedes) [![Coverage Status](https://coveralls.io/repos/mcollina/aedes/badge.svg?branch=master&service=github)](https://coveralls.io/github/mcollina/aedes?branch=master)
# Aedes  [![Build Status](https://travis-ci.org/mcollina/aedes.svg?branch=master)](https://travis-ci.org/mcollina/aedes) [![Coverage Status](https://coveralls.io/repos/mcollina/aedes/badge.svg?branch=master&service=github)](https://coveralls.io/github/mcollina/aedes?branch=master)

Barebone MQTT server that can run on any stream server.

Expand Down Expand Up @@ -392,32 +392,6 @@ You can subscribe on the following `$SYS` topics to get client presence:
- `$SYS/+/disconnect/clients` - will inform about client disconnections.
The payload will contain the `clientId` of the connected/disconnected client


<a name="todo"></a>
## Todo

* [x] QoS 0 support
* [x] Retain messages support
* [x] QoS 1 support
* [x] QoS 2 support
* [x] clean=false support
* [x] Keep alive support
* [x] Will messages must survive crash
* [x] Authentication
* [x] Events
* [x] Wait a CONNECT packet only for X seconds
* [x] Support a CONNECT packet without a clientId
* [x] Disconnect other clients with the same client.id
* [x] Write docs
* [x] Support counting the number of offline clients and subscriptions
* [x] Performance optimizations for QoS 1 and Qos 2
* [x] Add `client#publish()` and `client#subscribe()`
* [x] move the persistence in a separate module
* [x] mongo persistence ([external module](http://npm.im/aedes-persistence-mongodb))
* [x] redis persistence ([external module](http://npm.im/aedes-persistence-redis))
* [x] leveldb persistence ([external module](http://npm.im/aedes-persistence-level))
* [ ] cluster support (external module)

## Acknowledgements

This library is born after a lot of discussion with all
Expand Down
13 changes: 5 additions & 8 deletions aedes.js
Original file line number Diff line number Diff line change
Expand Up @@ -191,21 +191,18 @@ function DoEnqueues () {
that.complete = null
that.topic = null

broker._parallel(
status,
doEnqueue, subs, complete)
broker.persistence.outgoingEnqueueCombi(subs, status.packet, complete)

broker._enqueuers.release(that)
}
}
}

// + is 43
// # is 35
function removeSharp (sub) {
return sub.topic !== '#'
}

function doEnqueue (sub, done) {
this.broker.persistence.outgoingEnqueue(sub, this.packet, done)
var code = sub.topic.charCodeAt(0)
return code !== 43 && code !== 35
}

function callPublished (_, done) {
Expand Down
4 changes: 4 additions & 0 deletions lib/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,10 @@ Client.prototype.close = function (done) {
var list = (state.getBuffer && state.getBuffer()) || state.buffer
list.forEach(drainRequest)

// clear up the drain event listeners
that.conn.emit('drain')
that.conn.removeAllListeners('drain')

if (conn.destroySoon) {
conn.destroySoon()
} if (conn.destroy) {
Expand Down
16 changes: 16 additions & 0 deletions lib/handlers/publish.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,22 @@ var publishActions = [
enqueuePublish
]
function handlePublish (client, packet, done) {
var topic = packet.topic
var err
if (topic.length === 0) {
err = new Error('empty topic not allowed in PUBLISH')
return done(err)
}
for (var i = 0; i < topic.length; i++) {
switch (topic.charCodeAt(i)) {
case 35:
err = new Error('# is not allowed in PUBLISH')
return done(err)
case 43:
err = new Error('+ is not allowed in PUBLISH')
return done(err)
}
}
client.broker._series(client, publishActions, packet, done)
}

Expand Down
16 changes: 14 additions & 2 deletions lib/handlers/subscribe.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ var write = require('../write')
var fastfall = require('fastfall')
var Packet = require('aedes-packet')
var through = require('through2')
var validateTopic = require('./validations').validateTopic
var topicActions = fastfall([
authorize,
storeSubscriptions,
Expand All @@ -22,7 +23,7 @@ function handleSubscribe (client, packet, done) {
var subs = packet.subscriptions
var granted = []

broker._parallel(
broker._series(
new SubscribeState(client, packet, done, granted),
doSubscribe,
subs,
Expand All @@ -36,6 +37,10 @@ function doSubscribe (sub, done) {

function authorize (sub, done) {
var client = this.client
var err = validateTopic(sub.topic, 'SUBSCRIBE')
if (err) {
return done(err)
}
client.broker.authorizeSubscribe(client, sub, done)
}

Expand Down Expand Up @@ -95,7 +100,7 @@ function subTopic (sub, done) {
break
}

if (sub.topic === '#') {
if (isWildcardThatMatchesSys(sub.topic)) {
func = blockSys(func)
}

Expand All @@ -113,6 +118,13 @@ function subTopic (sub, done) {
}
}

// + is 43
// # is 35
function isWildcardThatMatchesSys (topic) {
var code = topic.charCodeAt(0)
return code === 43 || code === 35
}

function completeSubscribe (err) {
var packet = this.packet
var client = this.client
Expand Down
14 changes: 12 additions & 2 deletions lib/handlers/unsubscribe.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
'use strict'

var write = require('../write')
var validateTopic = require('./validations').validateTopic

function UnsubscribeState (client, packet, finish, granted) {
this.client = client
Expand All @@ -11,9 +12,18 @@ function UnsubscribeState (client, packet, finish, granted) {

function handleUnsubscribe (client, packet, done) {
var broker = client.broker
var unsubscriptions = packet.unsubscriptions
var err

for (var i = 0; i < unsubscriptions.length; i++) {
err = validateTopic(unsubscriptions[i], 'UNSUBSCRIBE')
if (err) {
return done(err)
}
}

if (packet.messageId) {
broker.persistence.removeSubscriptions(client, packet.unsubscriptions, function (err) {
broker.persistence.removeSubscriptions(client, unsubscriptions, function (err) {
if (err) {
return done(err)
}
Expand All @@ -27,7 +37,7 @@ function handleUnsubscribe (client, packet, done) {

function actualUnsubscribe (client, packet, done) {
var broker = client.broker
broker._parallel(
broker._series(
new UnsubscribeState(client, packet, done, null),
doUnsubscribe,
packet.unsubscriptions,
Expand Down
29 changes: 29 additions & 0 deletions lib/handlers/validations.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
'use strict'

function validateTopic (topic, message) {
var end = topic.length - 1
var endMinus = end - 1
var slashInPreEnd = endMinus > 0 && topic.charCodeAt(endMinus) !== 47
if (topic.length === 0) {
return new Error('impossible to ' + message + ' to an empty topic')
}
for (var i = 0; i < topic.length; i++) {
switch (topic.charCodeAt(i)) {
case 35:
var notAtTheEnd = i !== end
if (notAtTheEnd || slashInPreEnd) {
return new Error('# is only allowed in ' + message + ' in the last position')
}
break
case 43:
var pastChar = i < end - 1 && topic.charCodeAt(i + 1) !== 47
var preChar = i > 1 && topic.charCodeAt(i - 1) !== 47
if (pastChar || preChar) {
return new Error('+ is only allowed in ' + message + ' between /')
}
break
}
}
}

module.exports.validateTopic = validateTopic
17 changes: 8 additions & 9 deletions package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "aedes",
"version": "0.28.0",
"version": "0.29.3",
"description": "Stream-based MQTT broker",
"main": "aedes.js",
"scripts": {
Expand Down Expand Up @@ -36,19 +36,19 @@
"concat-stream": "^1.4.7",
"convert-hrtime": "^2.0.0",
"coveralls": "^2.11.6",
"duplexify": "^3.4.1",
"duplexify": "^3.5.1",
"faucet": "0.0.1",
"istanbul": "^0.4.1",
"mqtt": "^2.9.1",
"mqtt": "^2.11.0",
"mqtt-connection": "^3.0.0",
"pre-commit": "^1.0.10",
"standard": "^10.0.0",
"tape": "^4.7.0",
"websocket-stream": "^5.0.0"
"standard": "^10.0.3",
"tape": "^4.8.0",
"websocket-stream": "^5.0.1"
},
"dependencies": {
"aedes-packet": "^1.0.0",
"aedes-persistence": "^4.0.0",
"aedes-persistence": "^5.0.2",
"bulk-write-stream": "^1.0.0",
"end-of-stream": "^1.1.0",
"fastfall": "^1.0.0",
Expand All @@ -58,9 +58,8 @@
"mqemitter": "^2.1.0",
"mqtt-packet": "^5.4.0",
"pump": "^1.0.0",
"qlobber": "^0.8.0",
"retimer": "^1.0.0",
"reusify": "^1.0.0",
"reusify": "^1.0.2",
"safe-buffer": "^5.1.1",
"shortid": "^2.1.3",
"through2": "^2.0.0",
Expand Down
64 changes: 64 additions & 0 deletions test/auth.js
Original file line number Diff line number Diff line change
Expand Up @@ -479,6 +479,70 @@ test('negate subscription', function (t) {
})
})

test('negate multiple subscriptions', function (t) {
t.plan(5)

var s = connect(setup())

s.broker.authorizeSubscribe = function (client, sub, cb) {
t.ok(client, 'client exists')
cb(null, null)
}

s.inStream.write({
cmd: 'subscribe',
messageId: 24,
subscriptions: [{
topic: 'hello',
qos: 0
}, {
topic: 'world',
qos: 0
}]
})

s.outStream.once('data', function (packet) {
t.equal(packet.cmd, 'suback')
t.deepEqual(packet.granted, [128, 128])
t.equal(packet.messageId, 24)
})
})

test('negate multiple subscriptions random times', function (t) {
t.plan(5)

var s = connect(setup())

s.broker.authorizeSubscribe = function (client, sub, cb) {
t.ok(client, 'client exists')
if (sub.topic === 'hello') {
setTimeout(function () {
cb(null, sub)
}, 100)
} else {
cb(null, null)
}
}

s.inStream.write({
cmd: 'subscribe',
messageId: 24,
subscriptions: [{
topic: 'hello',
qos: 0
}, {
topic: 'world',
qos: 0
}]
})

s.outStream.once('data', function (packet) {
t.equal(packet.cmd, 'suback')
t.deepEqual(packet.granted, [0, 128])
t.equal(packet.messageId, 24)
})
})

test('failed authentication does not disconnect other client with same clientId', function (t) {
t.plan(3)

Expand Down
Loading