Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(bridge): support incoming mosquitto bridge connections #584

Merged
merged 7 commits into from
Mar 4, 2021
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 15 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ Check Docker docs [here](https://github.com/moscajs/aedes-cli#docker)
- [Dynamic Topics][dynamic_topics] Support
- MQTT Bridge Support between aedes
- [MQTT 5.0][mqttv5] _(not support yet)_
- [Bridge Protocol][bridge_protocol] _(not support yet)_
- [Bridge Protocol][bridge_protocol] _(incoming connections only)_
phil-mitchell marked this conversation as resolved.
Show resolved Hide resolved

## Examples

Expand All @@ -95,6 +95,20 @@ Other info:

- The repo [aedes-tests](https://github.com/moscajs/aedes-tests) is used to test aedes with clusters and different emitters/persistences. Check its source code to have a starting point on how to work with clusters

## Bridge connections

Normally, when publishing a message, the `retain` flag is consumed by Aedes and
then set to `false`. This is done for two reasons:

- MQTT-3.3.1-9 states that it MUST set the RETAIN flag to 0 when a PUBLISH
Packet is sent to a Client because it matches an established subscription
regardless of how the flag was set in the message it received.
- When operating as a cluster, only one Aedes node may store the packet

Brokers that support the [Bridge Protocol][bridge_protocol] can connect to
Aedes. When connecting with this special protocol, subscriptions work as usual
except that the `retain` flag in the packet is propagated as-is.

## Exensions

- [aedes-logging]: Logging module for Aedes, based on Pino
Expand Down
38 changes: 36 additions & 2 deletions aedes.js
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,6 @@ function storeRetained (packet, done) {
}

function emitPacket (packet, done) {
packet.retain = false
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why? This is used to prevent other broker instances to store this packet again when it's emiitted

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unfortunately this cannot be easily removed.

Copy link
Contributor Author

@phil-mitchell phil-mitchell Feb 23, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to keep the retain on the packet until we are able to check if "retain as published" (rap) is true. That's the whole point of the rap flag. The bridge protocol enables retain-as-published. I moved this into the subscription handler. What are the concerns with moving this into the subscription handler?

The tests are passing with my change, so if there is a problem with removing this line, it must not have been covered by the tests. I'm happy to make adjustments, but I'd like to know more about the concerns.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you link it to the block of code that replaces this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mcollina this is what handlePacketSubscription in subscribe.js is doing. Instead of globally setting it to false for all subscriptions, it sets it to false only if the retain-as-published flag is not set on the subscription.

If clusters go through a different (untested?) path, then clearing the retain flag can happen on that path, or I could set up a different emitter for client subscriptions so that clusters always get the flag set to false while bridges and regular clients respect the rap flag.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should verify if this is still needed, e.g. if the packet gets stored multiple times when using a cluster.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This only happens when a user sends a packet using broker.publish function. BTW I think that it could be safely removed by double checking the code, the 'worst' thing that could happen is some useless write to the db

Copy link
Contributor Author

@phil-mitchell phil-mitchell Feb 25, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I had thought of the _stored idea, but wasn't sure whether it would cause other issues. In any case, it becomes the event listener's responsibility to not retain the packet if it's not supposed to. I guess that can be a breaking change for some custom code, so I understand it's a bit dangerous. But I don't see a good way to support bridge protocol (or, going forward, MQTT v5) without doing this.

As I've said, the tests pass, including some new ones I added, so how can I verify this concern?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As I've said, the tests pass, including some new ones I added, so how can I verify this concern?

You should add a test where you do a broker subscribe followed by a broker publish to check if the received message has the retained flag cleared or not

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added the test and, indeed, the retain flag was previously set to false and is now true. I've pushed a change with a proposed solution. The publish function now takes the rap parameter and applies the wrapper function that I had previously placed in subscription.js. This way all callers of subscribe have the retain-as-published functionality available to them but default to the standard behaviour of setting retain to false.

this.broker.mq.emit(packet, done)
}

Expand Down Expand Up @@ -257,11 +256,46 @@ Aedes.prototype.publish = function (packet, client, done) {
this._series(new PublishState(this, client, packet), publishFuncs, p, done)
}

Aedes.prototype.subscribe = function (topic, func, done) {
function createRetainToFalseWrapper (broker, topic, func) {
func._retainToFalseWrappers = func._wrappers || {}
func._retainToFalseWrappers[topic] = func._retainToFalseWrappers[topic] || {}
func._retainToFalseWrappers[topic][broker] = function handlePacketSubscription (_packet, cb) {
_packet = new Packet(_packet, broker)
_packet.retain = false
func(_packet, cb)
}
func._retainToFalseWrappers[topic][broker]._freeWrapper = function freeWrapper () {
delete ((func._retainToFalseWrappers || {})[topic] || {})[broker]
if (Object.keys((func._retainToFalseWrappers || {})[topic] || {}).length === 0) {
delete func._retainToFalseWrappers[topic]
}
if (Object.keys(func._retainToFalseWrappers).length === 0) {
delete func._retainToFalseWrappers
}
}
return func._retainToFalseWrappers[topic][broker]
}

Aedes.prototype.subscribe = function (topic, func, rap, done) {
if (done === undefined && typeof rap === 'function') {
done = rap
rap = false
}
if (!rap) {
func = createRetainToFalseWrapper(this, topic, func)
}
this.mq.on(topic, func, done)
}

Aedes.prototype.unsubscribe = function (topic, func, done) {
if (func) {
func = ((func._retainToFalseWrappers || {})[topic] || {})[this] || func
}

if (func && func._freeWrapper) {
func._freeWrapper()
}

this.mq.removeListener(topic, func, done)
}

Expand Down
7 changes: 5 additions & 2 deletions docs/Aedes.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
- [Event: connackSent](#event-connacksent)
- [Event: closed](#event-closed)
- [aedes.handle (stream)](#aedeshandle-stream)
- [aedes.subscribe (topic, deliverfunc, callback)](#aedessubscribe-topic-deliverfunc-callback)
- [aedes.subscribe (topic, deliverfunc\[, retainAsPublised\], callback)](#aedessubscribe-topic-deliverfunc-callback)
- [aedes.unsubscribe (topic, deliverfunc, callback)](#aedesunsubscribe-topic-deliverfunc-callback)
- [aedes.publish (packet, callback)](#aedespublish-packet-callback)
- [aedes.close ([callback])](#aedesclose-callback)
Expand Down Expand Up @@ -176,16 +176,19 @@ const aedes = require('./aedes')()
const server = require('net').createServer(aedes.handle)
```

## aedes.subscribe (topic, deliverfunc, callback)
## aedes.subscribe (topic, deliverfunc\[, retainAsPublised\], callback)

- topic: `<string>`
- retainAsPublished: `<boolean>`
- deliverfunc: `<Function>` `(packet, cb) => void`
- packet: `<aedes-packet>` & [`PUBLISH`][PUBLISH]
- cb: `<Function>`
- callback: `<Function>`

Directly subscribe a `topic` in server side. Bypass [`authorizeSubscribe`](#handler-authorizesubscribe-client-subscription-callback)

If `retainAsPublished` is `true` (default `false`), the retain flag in the packet is passed to `deliverFunc` as it was published. Otherwise, it is set to `false` before being passed to `deliverFunc`.

The `topic` and `deliverfunc` is a compound key to differentiate the uniqueness of its subscription pool. `topic` could be the one that is existed, in this case `deliverfunc` will be invoked as well as `SUBSCRIBE` does.

`deliverfunc` supports backpressue.
Expand Down
36 changes: 28 additions & 8 deletions lib/handlers/subscribe.js
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,23 @@ function SubAck (packet, granted) {
this.granted = granted
}

function Subscription (qos, func) {
function Subscription (qos, func, rh, rap, nl) {
this.qos = qos
this.func = func

// retain-handling indicates how retained messages should be
// handled when a new subscription is created
// (see [MQTT-3.3.1-9] through [MQTT-3.3.1-11])
this.rh = rh

// retain-as-published indicates whether to leave the retain flag as-is (true)
// or to clear it before sending to subscriptions (false) default false
// (see [MQTT-3.3.1-12] through [MQTT-3.3.1-13])
this.rap = rap

// no-local indicates that a client should not receive its own
// messages (see [MQTT-3.8.3-3])
this.nl = nl
}

function SubscribeState (client, packet, restore, finish) {
Expand All @@ -36,10 +50,13 @@ function SubscribeState (client, packet, restore, finish) {
this.subState = []
}

function SubState (client, packet, granted) {
function SubState (client, packet, granted, rh, rap, nl) {
this.client = client
this.packet = packet
this.granted = granted
this.rh = rh
this.rap = rap
this.nl = nl
}

// if same subscribed topic in subs array, we pick up the last one
Expand All @@ -66,7 +83,7 @@ function handleSubscribe (client, packet, restore, done) {
}

function doSubscribe (sub, done) {
const s = new SubState(this.client, this.packet, sub.qos)
const s = new SubState(this.client, this.packet, sub.qos, sub.rh, sub.rap, sub.nl)
this.subState.push(s)
this.actions.call(s, sub, done)
}
Expand Down Expand Up @@ -119,6 +136,9 @@ function addSubs (sub, done) {
const broker = client.broker
const topic = sub.topic
const qos = sub.qos
const rh = this.rh
const rap = this.rap
const nl = this.nl
let func = qos > 0 ? client.deliverQoS : client.deliver0

// [MQTT-4.7.2-1]
Expand All @@ -127,12 +147,12 @@ function addSubs (sub, done) {
}

if (!client.subscriptions[topic]) {
client.subscriptions[topic] = new Subscription(qos, func)
broker.subscribe(topic, func, done)
} else if (client.subscriptions[topic].qos !== qos) {
client.subscriptions[topic] = new Subscription(qos, func, rh, rap, nl)
broker.subscribe(topic, func, rap, done)
} else if (client.subscriptions[topic].qos !== qos || client.subscriptions[topic].rh !== rh || client.subscriptions[topic].rap !== rap || client.subscriptions[topic].nl !== nl) {
broker.unsubscribe(topic, client.subscriptions[topic].func)
client.subscriptions[topic] = new Subscription(qos, func)
broker.subscribe(topic, func, done)
client.subscriptions[topic] = new Subscription(qos, func, rh, rap, nl)
broker.subscribe(topic, func, rap, done)
} else {
done()
}
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@
"fastseries": "^2.0.0",
"hyperid": "^2.0.5",
"mqemitter": "^4.4.0",
"mqtt-packet": "^6.7.0",
"mqtt-packet": "^6.9.0",
"readable-stream": "^3.6.0",
"retimer": "^2.0.0",
"reusify": "^1.0.4",
Expand Down
83 changes: 83 additions & 0 deletions test/client-pub-sub.js
Original file line number Diff line number Diff line change
Expand Up @@ -451,6 +451,89 @@ test('subscribe a client programmatically', function (t) {
})
})

test('subscribe a client programmatically clears retain', function (t) {
t.plan(3)

const broker = aedes()
t.tearDown(broker.close.bind(broker))

const expected = {
cmd: 'publish',
topic: 'hello',
payload: Buffer.from('world'),
dup: false,
length: 12,
qos: 0,
retain: false
}

broker.on('client', function (client) {
client.subscribe({
topic: 'hello',
qos: 0
}, function (err) {
t.error(err, 'no error')

broker.publish({
topic: 'hello',
payload: Buffer.from('world'),
qos: 0,
retain: true
}, function (err) {
t.error(err, 'no error')
})
})
})

const s = connect(setup(broker))

s.outStream.once('data', function (packet) {
t.deepEqual(packet, expected, 'packet matches')
})
})

test('subscribe a bridge programmatically keeps retain', function (t) {
t.plan(3)

const broker = aedes()
t.tearDown(broker.close.bind(broker))

const expected = {
cmd: 'publish',
topic: 'hello',
payload: Buffer.from('world'),
dup: false,
length: 12,
qos: 0,
retain: true
}

broker.on('client', function (client) {
client.subscribe({
topic: 'hello',
qos: 0,
rap: true
}, function (err) {
t.error(err, 'no error')

broker.publish({
topic: 'hello',
payload: Buffer.from('world'),
qos: 0,
retain: true
}, function (err) {
t.error(err, 'no error')
})
})
})

const s = connect(setup(broker))

s.outStream.once('data', function (packet) {
t.deepEqual(packet, expected, 'packet matches')
})
})

test('subscribe throws error when QoS > 0', function (t) {
t.plan(3)

Expand Down
32 changes: 32 additions & 0 deletions test/retain.js
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,38 @@ test('retain messages', function (t) {
publisher.inStream.write(expected)
})

test('retain messages only once in cluster setup', function (t) {
t.plan(3)

const broker = aedes()
t.tearDown(broker.close.bind(broker))

const expected = {
cmd: 'publish',
topic: 'hello',
payload: Buffer.from('world'),
qos: 0,
dup: false,
length: 12,
retain: true
}

const subscriberFunc = function (packet, cb) {
cb()
setImmediate(function () {
t.equal(packet.retain, false, 'packet must not have retain')
broker.unsubscribe('hello', subscriberFunc, function () {
t.notOk(subscriberFunc._retainToFalseWrappers, 'wrapper function must be freed')
})
})
}

broker.subscribe('hello', subscriberFunc, function () {
t.ok(((subscriberFunc._retainToFalseWrappers || {}).hello || {})[broker], 'wrapper function must be created')
broker.publish(expected)
})
})

test('avoid wrong deduping of retain messages', function (t) {
t.plan(7)

Expand Down