From f82b8f87d72b2c3f6d016e70b0910bf0a31847ce Mon Sep 17 00:00:00 2001 From: Oldrich Svec Date: Thu, 12 May 2022 16:07:51 +0200 Subject: [PATCH] feat: nolocal flag support for bridges (#737) Co-authored-by: Daniel Lando --- aedes.js | 1 + lib/client.js | 17 +++++++++--- lib/handlers/subscribe.js | 13 +++++---- package.json | 4 +-- test/auth.js | 38 +++++++++++++++++++------- test/basic.js | 12 ++++++--- test/bridge.js | 57 +++++++++++++++++++++++++++++++++++++++ test/client-pub-sub.js | 12 +++++---- test/qos2.js | 6 +++-- 9 files changed, 127 insertions(+), 33 deletions(-) create mode 100644 test/bridge.js diff --git a/aedes.js b/aedes.js index 75d5914d..e7de15fa 100644 --- a/aedes.js +++ b/aedes.js @@ -170,6 +170,7 @@ function storeRetained (packet, done) { } function emitPacket (packet, done) { + if (this.client) packet.clientId = this.client.id this.broker.mq.emit(packet, done) } diff --git a/lib/client.js b/lib/client.js index 2a6221dd..414d8e5f 100644 --- a/lib/client.js +++ b/lib/client.js @@ -89,9 +89,20 @@ function Client (broker, conn, req) { conn.on('end', this.close.bind(this)) this._eos = eos(this.conn, this.close.bind(this)) - this.deliver0 = function deliverQoS0 (_packet, cb) { + const getToForwardPacket = (_packet) => { + // Mqttv5 3.8.3.1: https://docs.oasis-open.org/mqtt/mqtt/v5.0/mqtt-v5.0.html#_Toc3901169 + // prevent to forward messages sent by the same client when no-local flag is set + if (_packet.clientId === that.id && _packet.nl) return + const toForward = dedupe(that, _packet) && that.broker.authorizeForward(that, _packet) + + return toForward + } + + this.deliver0 = function deliverQoS0 (_packet, cb) { + const toForward = getToForwardPacket(_packet) + if (toForward) { // Give nodejs some time to clear stacks, or we will see // "Maximum call stack size exceeded" in a very high load @@ -114,8 +125,8 @@ function Client (broker, conn, req) { that.deliver0(_packet, cb) return } - const toForward = dedupe(that, _packet) && - that.broker.authorizeForward(that, _packet) + const toForward = getToForwardPacket(_packet) + if (toForward) { setImmediate(() => { const packet = new QoSPacket(toForward, that) diff --git a/lib/handlers/subscribe.js b/lib/handlers/subscribe.js index e1e25d67..f93fb9d4 100644 --- a/lib/handlers/subscribe.js +++ b/lib/handlers/subscribe.js @@ -142,13 +142,12 @@ function addSubs (sub, done) { const nl = this.nl let func = qos > 0 ? client.deliverQoS : client.deliver0 - if (!rap) { - const deliverFunc = func - func = function handlePacketSubscription (_packet, cb) { - _packet = new Packet(_packet, broker) - _packet.retain = false - deliverFunc(_packet, cb) - } + const deliverFunc = func + func = function handlePacketSubscription (_packet, cb) { + _packet = new Packet(_packet, broker) + _packet.nl = nl + if (!rap) _packet.retain = false + deliverFunc(_packet, cb) } // [MQTT-4.7.2-1] diff --git a/package.json b/package.json index 19ebfdb2..612f7ebd 100644 --- a/package.json +++ b/package.json @@ -118,8 +118,8 @@ "websocket-stream": "^5.5.2" }, "dependencies": { - "aedes-packet": "^2.3.1", - "aedes-persistence": "^9.0.1", + "aedes-packet": "^3.0.0", + "aedes-persistence": "^9.1.1", "end-of-stream": "^1.4.4", "fastfall": "^1.5.1", "fastparallel": "^2.4.1", diff --git a/test/auth.js b/test/auth.js index 2c57a73a..248657c5 100644 --- a/test/auth.js +++ b/test/auth.js @@ -422,7 +422,7 @@ test('authentication error when non numeric return code is passed', function (t) test('authorize publish', function (t) { t.plan(4) - const s = connect(setup()) + const s = connect(setup(), { clientId: 'my-client-xyz' }) t.teardown(s.broker.close.bind(s.broker)) const expected = { @@ -445,6 +445,7 @@ test('authorize publish', function (t) { t.notOk(Object.prototype.hasOwnProperty.call(packet, 'messageId'), 'should not contain messageId in QoS 0') expected.brokerId = s.broker.id expected.brokerCounter = s.broker.counter + expected.clientId = 'my-client-xyz' delete expected.length t.same(packet, expected, 'packet matches') cb() @@ -460,7 +461,7 @@ test('authorize publish', function (t) { test('authorize waits for authenticate', function (t) { t.plan(6) - const s = setup() + const s = setup(aedes({ clientId: 'my-client-xyz-2' })) t.teardown(s.broker.close.bind(s.broker)) s.broker.authenticate = function (client, username, password, cb) { @@ -485,7 +486,8 @@ test('authorize waits for authenticate', function (t) { qos: 0, retain: false, length: 12, - dup: false + dup: false, + clientId: 'my-client' } s.broker.mq.on('hello', function (packet, cb) { @@ -519,12 +521,13 @@ test('authorize publish from configOptions', function (t) { t.plan(4) const s = connect(setup(aedes({ + clientId: 'my-client-xyz-3', authorizePublish: function (client, packet, cb) { t.ok(client, 'client exists') t.same(packet, expected, 'packet matches') cb() } - }))) + })), { clientId: 'my-client-xyz-3' }) t.teardown(s.broker.close.bind(s.broker)) const expected = { @@ -541,6 +544,7 @@ test('authorize publish from configOptions', function (t) { t.notOk(Object.prototype.hasOwnProperty.call(packet, 'messageId'), 'should not contain messageId in QoS 0') expected.brokerId = s.broker.id expected.brokerCounter = s.broker.counter + expected.clientId = 'my-client-xyz-3' delete expected.length t.same(packet, expected, 'packet matches') cb() @@ -589,7 +593,7 @@ test('do not authorize publish', function (t) { test('modify qos out of range in authorize publish ', function (t) { t.plan(2) - const s = connect(setup()) + const s = connect(setup(), { clientId: 'my-client-xyz-4' }) t.teardown(s.broker.close.bind(s.broker)) const expected = { @@ -599,7 +603,8 @@ test('modify qos out of range in authorize publish ', function (t) { qos: 0, retain: false, length: 12, - dup: false + dup: false, + clientId: 'my-client-xyz-4' } s.broker.authorizePublish = function (client, packet, cb) { @@ -805,12 +810,19 @@ test('negate multiple subscriptions', function (t) { test('negate subscription with correct persistence', function (t) { t.plan(6) + // rh, rap, nl are undefined because mqtt.parser is set to MQTT 3.1.1 and will thus erase these props from s.inStream.write const expected = [{ topic: 'hello', - qos: 0 + qos: 0, + rh: undefined, + rap: undefined, + nl: undefined }, { topic: 'world', - qos: 0 + qos: 0, + rh: undefined, + rap: undefined, + nl: undefined }] const broker = aedes() @@ -839,10 +851,16 @@ test('negate subscription with correct persistence', function (t) { messageId: 24, subscriptions: [{ topic: 'hello', - qos: 0 + qos: 0, + rh: 0, + rap: true, + nl: false }, { topic: 'world', - qos: 0 + qos: 0, + rh: 0, + rap: true, + nl: false }] }) }) diff --git a/test/basic.js b/test/basic.js index 81130a3e..1638847d 100644 --- a/test/basic.js +++ b/test/basic.js @@ -20,7 +20,7 @@ test('test aedes.Server', function (t) { test('publish QoS 0', function (t) { t.plan(2) - const s = connect(setup()) + const s = connect(setup(), { clientId: 'my-client-xyz-5' }) t.teardown(s.broker.close.bind(s.broker)) const expected = { @@ -29,7 +29,8 @@ test('publish QoS 0', function (t) { payload: Buffer.from('world'), qos: 0, retain: false, - dup: false + dup: false, + clientId: 'my-client-xyz-5' } s.broker.mq.on('hello', function (packet, cb) { @@ -128,7 +129,7 @@ test('publish to $SYS topic throws error', function (t) { qos: 0, retain: false } - const expectedSubs = ele.clean ? null : [{ topic: 'hello', qos: ele.qos }] + const expectedSubs = ele.clean ? null : [{ topic: 'hello', qos: ele.qos, rh: undefined, rap: undefined, nl: undefined }] subscribe(t, s, 'hello', ele.qos, function () { s.outStream.once('data', function (packet) { @@ -188,7 +189,10 @@ test('return write errors to callback', function (t) { qos: 0, retain: false } - const subs = [{ topic: 'hello', qos: ele.qos }, { topic: 'world', qos: ele.qos }] + const subs = [ + { topic: 'hello', qos: ele.qos, rh: undefined, rap: undefined, nl: undefined }, + { topic: 'world', qos: ele.qos, rh: undefined, rap: undefined, nl: undefined } + ] const expectedSubs = ele.clean ? null : subs subscribeMultiple(t, s, subs, [ele.qos, ele.qos], function () { diff --git a/test/bridge.js b/test/bridge.js new file mode 100644 index 00000000..7927937e --- /dev/null +++ b/test/bridge.js @@ -0,0 +1,57 @@ +'use strict' + +const { test } = require('tap') +const { setup, connect, subscribe } = require('./helper') + +for (const qos of [0, 1, 2]) { + const packet = { + qos, + cmd: 'publish', + topic: 'hello', + payload: 'world' + } + + if (qos > 0) packet.messageId = 42 + + test('normal client sends a publish message and shall receive it back, qos = ' + qos, function (t) { + const s = connect(setup()) + t.teardown(s.broker.close.bind(s.broker)) + + const handle = setTimeout(() => { + t.fail('did not receive packet back') + t.end() + }, 1000) + + subscribe(t, s, 'hello', qos, function () { + s.outStream.on('data', (packet) => { + if (packet.cmd === 'publish') { + clearTimeout(handle) + t.end() + } else if (packet.cmd === 'pubrec') { + s.inStream.write({ cmd: 'pubrel', messageId: 42 }) + } + }) + + s.inStream.write(packet) + }) + }) + + test('bridge client sends a publish message but shall not receive it back, qos = ' + qos, function (t) { + // protocolVersion 128 + 4 means mqtt 3.1.1 with bridgeMode enabled + // https://github.com/mqttjs/mqtt-packet/blob/7f7c2ed8bcb4b2c582851d120a94e0b4a731f661/parser.js#L171 + const s = connect(setup(), { clientId: 'my-client-bridge-1', protocolVersion: 128 + 4 }) + t.teardown(s.broker.close.bind(s.broker)) + + const handle = setTimeout(() => t.end(), 1000) + + subscribe(t, s, 'hello', qos, function () { + s.outStream.on('data', function () { + clearTimeout(handle) + t.fail('should not receive packet back') + t.end() + }) + + s.inStream.write(packet) + }) + }) +} diff --git a/test/client-pub-sub.js b/test/client-pub-sub.js index 127ccd2b..9f829325 100644 --- a/test/client-pub-sub.js +++ b/test/client-pub-sub.js @@ -905,10 +905,10 @@ test('should not receive a message on negated subscription', function (t) { test('programmatically add custom subscribe', function (t) { t.plan(6) - const broker = aedes() + const broker = aedes({ clientId: 'my-client-xyz-7' }) t.teardown(broker.close.bind(broker)) - const s = connect(setup(broker)) + const s = connect(setup(broker), { clientId: 'my-client-xyz-7' }) const expected = { cmd: 'publish', topic: 'hello', @@ -924,7 +924,8 @@ test('programmatically add custom subscribe', function (t) { payload: Buffer.from('world'), qos: 0, retain: false, - dup: false + dup: false, + clientId: 'my-client-xyz-7' } subscribe(t, s, 'hello', 0, function () { broker.subscribe('hello', deliver, function () { @@ -963,9 +964,10 @@ test('custom function in broker.subscribe', function (t) { qos: 1, retain: false, dup: false, - messageId: undefined + messageId: undefined, + clientId: 'my-client-xyz-6' } - connect(s, {}, function () { + connect(s, { clientId: 'my-client-xyz-6' }, function () { broker.subscribe('hello', deliver, function () { t.pass('subscribed') }) diff --git a/test/qos2.js b/test/qos2.js index e87ac093..00b028ae 100644 --- a/test/qos2.js +++ b/test/qos2.js @@ -239,7 +239,7 @@ test('call published method with client with QoS 2', function (t) { t.teardown(broker.close.bind(broker)) const opts = { clean: cleanSession } - const publisher = connect(setup(broker)) + const publisher = connect(setup(broker), { clientId: 'my-client-xyz-8' }) const subscriber = connect(setup(broker), { ...opts, clientId: 'abcde' }) const forwarded = { cmd: 'publish', @@ -248,7 +248,8 @@ test('call published method with client with QoS 2', function (t) { qos: 2, retain: false, dup: false, - messageId: undefined + messageId: undefined, + clientId: 'my-client-xyz-8' } const expected = { cmd: 'publish', @@ -262,6 +263,7 @@ test('call published method with client with QoS 2', function (t) { broker.authorizeForward = function (client, packet) { forwarded.brokerId = broker.id forwarded.brokerCounter = broker.counter + delete packet.nl t.same(packet, forwarded, 'forwarded packet must match') return packet }