diff --git a/README.md b/README.md index a597b261..966559dc 100644 --- a/README.md +++ b/README.md @@ -81,7 +81,7 @@ Check Docker docs [here](https://github.com/moscajs/aedes-cli#docker) - [Dynamic Topics][dynamic_topics] Support - MQTT Bridge Support between aedes - [MQTT 5.0][mqttv5] _(not support yet)_ -- [Bridge Protocol][bridge_protocol] _(not support yet)_ +- [Bridge Protocol][bridge_protocol] _(incoming connections only)_ ## Examples @@ -95,6 +95,20 @@ Other info: - The repo [aedes-tests](https://github.com/moscajs/aedes-tests) is used to test aedes with clusters and different emitters/persistences. Check its source code to have a starting point on how to work with clusters +## Bridge connections + +Normally, when publishing a message, the `retain` flag is consumed by Aedes and +then set to `false`. This is done for two reasons: + +- MQTT-3.3.1-9 states that it MUST set the RETAIN flag to 0 when a PUBLISH + Packet is sent to a Client because it matches an established subscription + regardless of how the flag was set in the message it received. +- When operating as a cluster, only one Aedes node may store the packet + +Brokers that support the [Bridge Protocol][bridge_protocol] can connect to +Aedes. When connecting with this special protocol, subscriptions work as usual +except that the `retain` flag in the packet is propagated as-is. + ## Exensions - [aedes-logging]: Logging module for Aedes, based on Pino diff --git a/aedes.js b/aedes.js index 1c48dea8..36fd056a 100644 --- a/aedes.js +++ b/aedes.js @@ -170,7 +170,6 @@ function storeRetained (packet, done) { } function emitPacket (packet, done) { - packet.retain = false this.broker.mq.emit(packet, done) } diff --git a/lib/handlers/subscribe.js b/lib/handlers/subscribe.js index b976e3c9..8c4685b6 100644 --- a/lib/handlers/subscribe.js +++ b/lib/handlers/subscribe.js @@ -23,9 +23,23 @@ function SubAck (packet, granted) { this.granted = granted } -function Subscription (qos, func) { +function Subscription (qos, func, rh, rap, nl) { this.qos = qos this.func = func + + // retain-handling indicates how retained messages should be + // handled when a new subscription is created + // (see [MQTT-3.3.1-9] through [MQTT-3.3.1-11]) + this.rh = rh + + // retain-as-published indicates whether to leave the retain flag as-is (true) + // or to clear it before sending to subscriptions (false) default false + // (see [MQTT-3.3.1-12] through [MQTT-3.3.1-13]) + this.rap = rap + + // no-local indicates that a client should not receive its own + // messages (see [MQTT-3.8.3-3]) + this.nl = nl } function SubscribeState (client, packet, restore, finish) { @@ -36,10 +50,13 @@ function SubscribeState (client, packet, restore, finish) { this.subState = [] } -function SubState (client, packet, granted) { +function SubState (client, packet, granted, rh, rap, nl) { this.client = client this.packet = packet this.granted = granted + this.rh = rh + this.rap = rap + this.nl = nl } // if same subscribed topic in subs array, we pick up the last one @@ -66,7 +83,7 @@ function handleSubscribe (client, packet, restore, done) { } function doSubscribe (sub, done) { - const s = new SubState(this.client, this.packet, sub.qos) + const s = new SubState(this.client, this.packet, sub.qos, sub.rh, sub.rap, sub.nl) this.subState.push(s) this.actions.call(s, sub, done) } @@ -119,19 +136,31 @@ function addSubs (sub, done) { const broker = client.broker const topic = sub.topic const qos = sub.qos + const rh = this.rh + const rap = this.rap + const nl = this.nl let func = qos > 0 ? client.deliverQoS : client.deliver0 + if (!rap) { + const deliverFunc = func + func = function handlePacketSubscription (_packet, cb) { + _packet = new Packet(_packet, broker) + _packet.retain = false + deliverFunc(_packet, cb) + } + } + // [MQTT-4.7.2-1] if (isStartsWithWildcard(topic)) { func = blockDollarSignTopics(func) } if (!client.subscriptions[topic]) { - client.subscriptions[topic] = new Subscription(qos, func) + client.subscriptions[topic] = new Subscription(qos, func, rh, rap, nl) broker.subscribe(topic, func, done) - } else if (client.subscriptions[topic].qos !== qos) { + } else if (client.subscriptions[topic].qos !== qos || client.subscriptions[topic].rh !== rh || client.subscriptions[topic].rap !== rap || client.subscriptions[topic].nl !== nl) { broker.unsubscribe(topic, client.subscriptions[topic].func) - client.subscriptions[topic] = new Subscription(qos, func) + client.subscriptions[topic] = new Subscription(qos, func, rh, rap, nl) broker.subscribe(topic, func, done) } else { done() diff --git a/package.json b/package.json index 03ac9ad9..868c9b9e 100644 --- a/package.json +++ b/package.json @@ -40,8 +40,8 @@ "test" ], "tsd": { - "directory": "test/types" - }, + "directory": "test/types" + }, "repository": { "type": "git", "url": "https://github.com/moscajs/aedes.git" @@ -121,7 +121,7 @@ "fastseries": "^2.0.0", "hyperid": "^2.0.5", "mqemitter": "^4.4.0", - "mqtt-packet": "^6.7.0", + "mqtt-packet": "^6.9.0", "readable-stream": "^3.6.0", "retimer": "^3.0.0", "reusify": "^1.0.4", diff --git a/test/client-pub-sub.js b/test/client-pub-sub.js index 099998d2..50517c0c 100644 --- a/test/client-pub-sub.js +++ b/test/client-pub-sub.js @@ -451,6 +451,89 @@ test('subscribe a client programmatically', function (t) { }) }) +test('subscribe a client programmatically clears retain', function (t) { + t.plan(3) + + const broker = aedes() + t.tearDown(broker.close.bind(broker)) + + const expected = { + cmd: 'publish', + topic: 'hello', + payload: Buffer.from('world'), + dup: false, + length: 12, + qos: 0, + retain: false + } + + broker.on('client', function (client) { + client.subscribe({ + topic: 'hello', + qos: 0 + }, function (err) { + t.error(err, 'no error') + + broker.publish({ + topic: 'hello', + payload: Buffer.from('world'), + qos: 0, + retain: true + }, function (err) { + t.error(err, 'no error') + }) + }) + }) + + const s = connect(setup(broker)) + + s.outStream.once('data', function (packet) { + t.deepEqual(packet, expected, 'packet matches') + }) +}) + +test('subscribe a bridge programmatically keeps retain', function (t) { + t.plan(3) + + const broker = aedes() + t.tearDown(broker.close.bind(broker)) + + const expected = { + cmd: 'publish', + topic: 'hello', + payload: Buffer.from('world'), + dup: false, + length: 12, + qos: 0, + retain: true + } + + broker.on('client', function (client) { + client.subscribe({ + topic: 'hello', + qos: 0, + rap: true + }, function (err) { + t.error(err, 'no error') + + broker.publish({ + topic: 'hello', + payload: Buffer.from('world'), + qos: 0, + retain: true + }, function (err) { + t.error(err, 'no error') + }) + }) + }) + + const s = connect(setup(broker)) + + s.outStream.once('data', function (packet) { + t.deepEqual(packet, expected, 'packet matches') + }) +}) + test('subscribe throws error when QoS > 0', function (t) { t.plan(3) diff --git a/test/retain.js b/test/retain.js index 55c63bcb..4cce0fb2 100644 --- a/test/retain.js +++ b/test/retain.js @@ -76,6 +76,36 @@ test('retain messages', function (t) { publisher.inStream.write(expected) }) +test('retain messages propagates through broker subscriptions', function (t) { + t.plan(1) + + const broker = aedes() + t.tearDown(broker.close.bind(broker)) + + const expected = { + cmd: 'publish', + topic: 'hello', + payload: Buffer.from('world'), + qos: 0, + dup: false, + retain: true + } + + const subscriberFunc = function (packet, cb) { + packet = Object.assign({}, packet) + delete packet.brokerId + delete packet.brokerCounter + cb() + setImmediate(function () { + t.deepEqual(packet, expected, 'packet must not have been modified') + }) + } + + broker.subscribe('hello', subscriberFunc, function () { + broker.publish(expected) + }) +}) + test('avoid wrong deduping of retain messages', function (t) { t.plan(7)