From 731028bbcbf13fbaf2a589e3bc561511e3e077f4 Mon Sep 17 00:00:00 2001 From: gnought <1684105+gnought@users.noreply.github.com> Date: Tue, 3 Sep 2019 02:02:45 +0800 Subject: [PATCH] Not send retained msg in restored subscriptions --- lib/handlers/connect.js | 2 + lib/handlers/subscribe.js | 39 ++++++++------ test/retain.js | 109 +++++++++++++++++++++++++++++++++----- 3 files changed, 121 insertions(+), 29 deletions(-) diff --git a/lib/handlers/connect.js b/lib/handlers/connect.js index ead7dd3c..ffe242c2 100644 --- a/lib/handlers/connect.js +++ b/lib/handlers/connect.js @@ -211,6 +211,8 @@ function doConnack (arg, done) { }) } +// push any queued messages (included retained messages) at the disconnected time +// when QoS > 0 and session is true function emptyQueue (arg, done) { var client = this.client var persistence = client.broker.persistence diff --git a/lib/handlers/subscribe.js b/lib/handlers/subscribe.js index 91849da8..781e15b6 100644 --- a/lib/handlers/subscribe.js +++ b/lib/handlers/subscribe.js @@ -172,24 +172,29 @@ function completeSubscribe (err) { done() } - var persistence = broker.persistence - var topics = [] - for (var i = 0; i < subs.length; i++) { - topics.push(subs[i].topic) + // Conform to MQTT 3.1.1 section 3.1.2.4 + // Restored sessions should not contain any retained message. + // Retained message should be only fetched from SUBSCRIBE. + if (!packet.restore) { + var persistence = broker.persistence + var topics = [] + for (var i = 0; i < subs.length; i++) { + topics.push(subs[i].topic) + } + var stream = persistence.createRetainedStreamCombi(topics) + stream.pipe(through.obj(function sendRetained (packet, enc, cb) { + packet = new Packet({ + cmd: packet.cmd, + qos: packet.qos, + topic: packet.topic, + payload: packet.payload, + retain: true + }, broker) + // this should not be deduped + packet.brokerId = null + client.deliverQoS(packet, cb) + })) } - var stream = persistence.createRetainedStreamCombi(topics) - stream.pipe(through.obj(function sendRetained (packet, enc, cb) { - packet = new Packet({ - cmd: packet.cmd, - qos: packet.qos, - topic: packet.topic, - payload: packet.payload, - retain: true - }, broker) - // this should not be deduped - packet.brokerId = null - client.deliverQoS(packet, cb) - })) } function nop () {} diff --git a/test/retain.js b/test/retain.js index 9a2d4376..59c8706d 100644 --- a/test/retain.js +++ b/test/retain.js @@ -481,8 +481,8 @@ test('deliver QoS 0 retained message with QoS 1 subscription', function (t) { }) }) -test('not clean and retain messages with QoS 1', function (t) { - t.plan(10) +test('disconnect and retain messages with QoS 1 [clean=false]', function (t) { + t.plan(8) var broker = aedes() var publisher @@ -529,22 +529,107 @@ test('not clean and retain messages with QoS 1', function (t) { }) subscriber.outStream.once('data', function (packet) { + // receive any queued messages (no matter they are retained messages) at the disconnected time t.notEqual(packet.messageId, 42, 'messageId must differ') - var prevId = packet.messageId delete packet.messageId packet.length = 14 t.deepEqual(packet, expected, 'packet must match') - // message is duplicated - subscriber.outStream.once('data', function (packet2) { - var curId = packet2.messageId - t.notOk(curId === prevId, 'messageId must differ') - subscriber.inStream.write({ - cmd: 'puback', - messageId: curId - }) - delete packet2.messageId + // there should be no messages come from restored subscriptions + subscriber.outStream.once('data', function (packet) { + t.fail('should not receive any more messages') + }) + }) + }) + }) + broker.on('closed', t.end.bind(t)) +}) + +test('disconnect and two retain messages with QoS 1 [clean=false]', function (t) { + t.plan(17) + + var broker = aedes() + var publisher + var subscriber = connect(setup(broker), { clean: false, clientId: 'abcde' }) + var expected = { + cmd: 'publish', + topic: 'hello', + qos: 1, + dup: false, + length: 14, + retain: true + } + + subscribe(t, subscriber, 'hello', 1, function () { + subscriber.inStream.write({ + cmd: 'disconnect' + }) + + subscriber.outStream.on('data', function (packet) { + console.log('original', packet) + }) + + publisher = connect(setup(broker)) + + publisher.inStream.write({ + cmd: 'publish', + topic: 'hello', + payload: 'world', + qos: 1, + messageId: 41, + retain: true + }) + + publisher.outStream.once('data', function (packet) { + t.equal(packet.cmd, 'puback') + + publisher.inStream.write({ + cmd: 'publish', + topic: 'hello', + payload: 'world2', + qos: 1, + messageId: 42, + retain: true + }) + + publisher.outStream.once('data', function (packet) { + t.equal(packet.cmd, 'puback') + + broker.on('clientError', function (client, err) { + t.equal(err.message, 'connection closed') + }) + + subscriber = connect(setup(broker), { clean: false, clientId: 'abcde' }, function (connect) { + t.equal(connect.sessionPresent, true, 'session present is set to true') + }) + + subscriber.outStream.once('data', function (packet) { + // receive any queued messages (included retained messages) at the disconnected time + t.notEqual(packet.messageId, 41, 'messageId must differ') + delete packet.messageId + packet.length = 14 + expected.payload = Buffer.from('world') t.deepEqual(packet, expected, 'packet must match') + + // receive any queued messages (included retained messages) at the disconnected time + subscriber.outStream.once('data', function (packet) { + t.notEqual(packet.messageId, 42, 'messageId must differ') + delete packet.messageId + packet.length = 14 + expected.payload = Buffer.from('world2') + t.deepEqual(packet, expected, 'packet must match') + + // should get the last retained message when we do a subscribe + subscribe(t, subscriber, 'hello', 1, function () { + subscriber.outStream.on('data', function (packet) { + t.notEqual(packet.messageId, 42, 'messageId must differ') + delete packet.messageId + packet.length = 14 + expected.payload = Buffer.from('world2') + t.deepEqual(packet, expected, 'packet must match') + }) + }) + }) }) }) })