Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(bridge): support incoming mosquitto bridge connections #584

Merged
merged 7 commits into from
Mar 4, 2021
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ Check Docker docs [here](https://github.com/moscajs/aedes-cli#docker)
- [Dynamic Topics][dynamic_topics] Support
- MQTT Bridge Support between aedes
- [MQTT 5.0][mqttv5] _(not support yet)_
- [Bridge Protocol][bridge_protocol] _(not support yet)_
- [Bridge Protocol][bridge_protocol] _(incoming connections only)_
phil-mitchell marked this conversation as resolved.
Show resolved Hide resolved

## Examples

Expand All @@ -95,6 +95,15 @@ Other info:

- The repo [aedes-tests](https://github.com/moscajs/aedes-tests) is used to test aedes with clusters and different emitters/persistences. Check its source code to have a starting point on how to work with clusters

## Bridge connections

Normally, when publishing a message, the `retain` flag is consumed by Aedes and
then set to `false` so that recipients of the message do not retain it as well.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's not really correct, it is set to false because Aedes can be used in a cluster env, in such cases the mqemitter is used to share a message between different broker instances, the retain is set to false to prevent other instances to store it again

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@robertsLando that's correct.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably also true, but my understanding of the MQTT spec is that the retain flag needs to be false when received through a subscribe.

http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html

It MUST set the RETAIN flag to 0 when a PUBLISH Packet is sent to a Client because it matches an established subscription regardless of how the flag was set in the message it received [MQTT-3.3.1-9].

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

First of all I would add this comment there, then sincerly I dunno what's the best way to continue as both seems to have a reson to be accepted. I see no reference on bridge retain handling on specs

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bridge retain handling is outside of the standard, so the only spec about bridge retain handling is here: https://github.com/mqtt/mqtt.github.io/wiki/bridge_protocol

However, MQTT v5 does mention retain handling: https://docs.oasis-open.org/mqtt/mqtt/v5.0/mqtt-v5.0.html

The setting of the RETAIN flag in an Application Message forwarded by the Server from an established connection is controlled by the Retain As Published subscription option. Refer to section 3.8.3.1 for a definition of the Subscription Options.
· If the value of Retain As Published subscription option is set to 0, the Server MUST set the RETAIN flag to 0 when forwarding an Application Message regardless of how the RETAIN flag was set in the received PUBLISH packet [MQTT-3.3.1-12].
· If the value of Retain As Published subscription option is set to 1, the Server MUST set the RETAIN flag equal to the RETAIN flag in the received PUBLISH packet [MQTT-3.3.1-13].


Brokers that support the [Bridge Protocol][bridge_protocol] can connect to
Aedes. When connecting with this special protocol, subscriptions work as usual
excecpt that the `retain` flag in the packet is propagated as-is.

## Exensions

- [aedes-logging]: Logging module for Aedes, based on Pino
Expand Down
1 change: 0 additions & 1 deletion aedes.js
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,6 @@ function storeRetained (packet, done) {
}

function emitPacket (packet, done) {
packet.retain = false
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why? This is used to prevent other broker instances to store this packet again when it's emiitted

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unfortunately this cannot be easily removed.

Copy link
Contributor Author

@phil-mitchell phil-mitchell Feb 23, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to keep the retain on the packet until we are able to check if "retain as published" (rap) is true. That's the whole point of the rap flag. The bridge protocol enables retain-as-published. I moved this into the subscription handler. What are the concerns with moving this into the subscription handler?

The tests are passing with my change, so if there is a problem with removing this line, it must not have been covered by the tests. I'm happy to make adjustments, but I'd like to know more about the concerns.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you link it to the block of code that replaces this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mcollina this is what handlePacketSubscription in subscribe.js is doing. Instead of globally setting it to false for all subscriptions, it sets it to false only if the retain-as-published flag is not set on the subscription.

If clusters go through a different (untested?) path, then clearing the retain flag can happen on that path, or I could set up a different emitter for client subscriptions so that clusters always get the flag set to false while bridges and regular clients respect the rap flag.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should verify if this is still needed, e.g. if the packet gets stored multiple times when using a cluster.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This only happens when a user sends a packet using broker.publish function. BTW I think that it could be safely removed by double checking the code, the 'worst' thing that could happen is some useless write to the db

Copy link
Contributor Author

@phil-mitchell phil-mitchell Feb 25, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I had thought of the _stored idea, but wasn't sure whether it would cause other issues. In any case, it becomes the event listener's responsibility to not retain the packet if it's not supposed to. I guess that can be a breaking change for some custom code, so I understand it's a bit dangerous. But I don't see a good way to support bridge protocol (or, going forward, MQTT v5) without doing this.

As I've said, the tests pass, including some new ones I added, so how can I verify this concern?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As I've said, the tests pass, including some new ones I added, so how can I verify this concern?

You should add a test where you do a broker subscribe followed by a broker publish to check if the received message has the retained flag cleared or not

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added the test and, indeed, the retain flag was previously set to false and is now true. I've pushed a change with a proposed solution. The publish function now takes the rap parameter and applies the wrapper function that I had previously placed in subscription.js. This way all callers of subscribe have the retain-as-published functionality available to them but default to the standard behaviour of setting retain to false.

this.broker.mq.emit(packet, done)
}

Expand Down
41 changes: 35 additions & 6 deletions lib/handlers/subscribe.js
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,23 @@ function SubAck (packet, granted) {
this.granted = granted
}

function Subscription (qos, func) {
function Subscription (qos, func, rh, rap, nl) {
this.qos = qos
this.func = func

// retain-handling indicates how retained messages should be
// handled when a new subscription is created
// (see [MQTT-3.3.1-9] through [MQTT-3.3.1-11])
this.rh = rh

// retain-as-published indicates whether to leave the retain flag as-is (true)
// or to clear it before sending to subscriptions (false) default false
// (see [MQTT-3.3.1-12] through [MQTT-3.3.1-13])
this.rap = rap

// no-local indicates that a client should not receive its own
// messages (see [MQTT-3.8.3-3])
this.nl = nl
}

function SubscribeState (client, packet, restore, finish) {
Expand All @@ -36,10 +50,13 @@ function SubscribeState (client, packet, restore, finish) {
this.subState = []
}

function SubState (client, packet, granted) {
function SubState (client, packet, granted, rh, rap, nl) {
this.client = client
this.packet = packet
this.granted = granted
this.rh = rh
this.rap = rap
this.nl = nl
}

// if same subscribed topic in subs array, we pick up the last one
Expand All @@ -66,7 +83,7 @@ function handleSubscribe (client, packet, restore, done) {
}

function doSubscribe (sub, done) {
const s = new SubState(this.client, this.packet, sub.qos)
const s = new SubState(this.client, this.packet, sub.qos, sub.rh, sub.rap, sub.nl)
this.subState.push(s)
this.actions.call(s, sub, done)
}
Expand Down Expand Up @@ -119,19 +136,31 @@ function addSubs (sub, done) {
const broker = client.broker
const topic = sub.topic
const qos = sub.qos
const rh = this.rh
const rap = this.rap
const nl = this.nl
let func = qos > 0 ? client.deliverQoS : client.deliver0

if (!rap) {
robertsLando marked this conversation as resolved.
Show resolved Hide resolved
const deliverFunc = func
func = function handlePacketSubscription (_packet, cb) {
_packet = new Packet(_packet, broker)
_packet.retain = false
deliverFunc(_packet, cb)
}
}
phil-mitchell marked this conversation as resolved.
Show resolved Hide resolved

// [MQTT-4.7.2-1]
if (isStartsWithWildcard(topic)) {
func = blockDollarSignTopics(func)
}

if (!client.subscriptions[topic]) {
client.subscriptions[topic] = new Subscription(qos, func)
client.subscriptions[topic] = new Subscription(qos, func, rh, rap, nl)
broker.subscribe(topic, func, done)
} else if (client.subscriptions[topic].qos !== qos) {
} else if (client.subscriptions[topic].qos !== qos || client.subscriptions[topic].rh !== rh || client.subscriptions[topic].rap !== rap || client.subscriptions[topic].nl !== nl) {
broker.unsubscribe(topic, client.subscriptions[topic].func)
client.subscriptions[topic] = new Subscription(qos, func)
client.subscriptions[topic] = new Subscription(qos, func, rh, rap, nl)
broker.subscribe(topic, func, done)
} else {
done()
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@
"fastseries": "^2.0.0",
"hyperid": "^2.0.5",
"mqemitter": "^4.4.0",
"mqtt-packet": "^6.7.0",
"mqtt-packet": "^6.9.0",
"readable-stream": "^3.6.0",
"retimer": "^2.0.0",
"reusify": "^1.0.4",
Expand Down
83 changes: 83 additions & 0 deletions test/client-pub-sub.js
Original file line number Diff line number Diff line change
Expand Up @@ -451,6 +451,89 @@ test('subscribe a client programmatically', function (t) {
})
})

test('subscribe a client programmatically clears retain', function (t) {
t.plan(3)

const broker = aedes()
t.tearDown(broker.close.bind(broker))

const expected = {
cmd: 'publish',
topic: 'hello',
payload: Buffer.from('world'),
dup: false,
length: 12,
qos: 0,
retain: false
}

broker.on('client', function (client) {
client.subscribe({
topic: 'hello',
qos: 0
}, function (err) {
t.error(err, 'no error')

broker.publish({
topic: 'hello',
payload: Buffer.from('world'),
qos: 0,
retain: true
}, function (err) {
t.error(err, 'no error')
})
})
})

const s = connect(setup(broker))

s.outStream.once('data', function (packet) {
t.deepEqual(packet, expected, 'packet matches')
})
})

test('subscribe a bridge programmatically keeps retain', function (t) {
t.plan(3)

const broker = aedes()
t.tearDown(broker.close.bind(broker))

const expected = {
cmd: 'publish',
topic: 'hello',
payload: Buffer.from('world'),
dup: false,
length: 12,
qos: 0,
retain: true
}

broker.on('client', function (client) {
client.subscribe({
topic: 'hello',
qos: 0,
rap: true
}, function (err) {
t.error(err, 'no error')

broker.publish({
topic: 'hello',
payload: Buffer.from('world'),
qos: 0,
retain: true
}, function (err) {
t.error(err, 'no error')
})
})
})

const s = connect(setup(broker))

s.outStream.once('data', function (packet) {
t.deepEqual(packet, expected, 'packet matches')
})
})

test('subscribe throws error when QoS > 0', function (t) {
t.plan(3)

Expand Down