Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Not send retained messages when restoring subscriptions for clean=false clients #320

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions lib/handlers/connect.js
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,8 @@ function doConnack (arg, done) {
})
}

// push any queued messages (included retained messages) at the disconnected time
// when QoS > 0 and session is true
function emptyQueue (arg, done) {
var client = this.client
var persistence = client.broker.persistence
Expand Down
39 changes: 22 additions & 17 deletions lib/handlers/subscribe.js
Original file line number Diff line number Diff line change
Expand Up @@ -172,24 +172,29 @@ function completeSubscribe (err) {
done()
}

var persistence = broker.persistence
var topics = []
for (var i = 0; i < subs.length; i++) {
topics.push(subs[i].topic)
// Conform to MQTT 3.1.1 section 3.1.2.4
// Restored sessions should not contain any retained message.
// Retained message should be only fetched from SUBSCRIBE.
if (!packet.restore) {
var persistence = broker.persistence
var topics = []
for (var i = 0; i < subs.length; i++) {
topics.push(subs[i].topic)
}
var stream = persistence.createRetainedStreamCombi(topics)
stream.pipe(through.obj(function sendRetained (packet, enc, cb) {
packet = new Packet({
cmd: packet.cmd,
qos: packet.qos,
topic: packet.topic,
payload: packet.payload,
retain: true
}, broker)
// this should not be deduped
packet.brokerId = null
client.deliverQoS(packet, cb)
}))
}
var stream = persistence.createRetainedStreamCombi(topics)
stream.pipe(through.obj(function sendRetained (packet, enc, cb) {
packet = new Packet({
cmd: packet.cmd,
qos: packet.qos,
topic: packet.topic,
payload: packet.payload,
retain: true
}, broker)
// this should not be deduped
packet.brokerId = null
client.deliverQoS(packet, cb)
}))
}

function nop () {}
Expand Down
109 changes: 97 additions & 12 deletions test/retain.js
Original file line number Diff line number Diff line change
Expand Up @@ -481,8 +481,8 @@ test('deliver QoS 0 retained message with QoS 1 subscription', function (t) {
})
})

test('not clean and retain messages with QoS 1', function (t) {
t.plan(10)
test('disconnect and retain messages with QoS 1 [clean=false]', function (t) {
t.plan(8)

var broker = aedes()
var publisher
Expand Down Expand Up @@ -529,22 +529,107 @@ test('not clean and retain messages with QoS 1', function (t) {
})

subscriber.outStream.once('data', function (packet) {
// receive any queued messages (no matter they are retained messages) at the disconnected time
t.notEqual(packet.messageId, 42, 'messageId must differ')
var prevId = packet.messageId
delete packet.messageId
packet.length = 14
t.deepEqual(packet, expected, 'packet must match')

// message is duplicated
subscriber.outStream.once('data', function (packet2) {
var curId = packet2.messageId
t.notOk(curId === prevId, 'messageId must differ')
subscriber.inStream.write({
cmd: 'puback',
messageId: curId
})
delete packet2.messageId
// there should be no messages come from restored subscriptions
subscriber.outStream.once('data', function (packet) {
t.fail('should not receive any more messages')
})
})
})
})
broker.on('closed', t.end.bind(t))
})

test('disconnect and two retain messages with QoS 1 [clean=false]', function (t) {
t.plan(17)

var broker = aedes()
var publisher
var subscriber = connect(setup(broker), { clean: false, clientId: 'abcde' })
var expected = {
cmd: 'publish',
topic: 'hello',
qos: 1,
dup: false,
length: 14,
retain: true
}

subscribe(t, subscriber, 'hello', 1, function () {
subscriber.inStream.write({
cmd: 'disconnect'
})

subscriber.outStream.on('data', function (packet) {
console.log('original', packet)
})

publisher = connect(setup(broker))

publisher.inStream.write({
cmd: 'publish',
topic: 'hello',
payload: 'world',
qos: 1,
messageId: 41,
retain: true
})

publisher.outStream.once('data', function (packet) {
t.equal(packet.cmd, 'puback')

publisher.inStream.write({
cmd: 'publish',
topic: 'hello',
payload: 'world2',
qos: 1,
messageId: 42,
retain: true
})

publisher.outStream.once('data', function (packet) {
t.equal(packet.cmd, 'puback')

broker.on('clientError', function (client, err) {
t.equal(err.message, 'connection closed')
})

subscriber = connect(setup(broker), { clean: false, clientId: 'abcde' }, function (connect) {
t.equal(connect.sessionPresent, true, 'session present is set to true')
})

subscriber.outStream.once('data', function (packet) {
// receive any queued messages (included retained messages) at the disconnected time
t.notEqual(packet.messageId, 41, 'messageId must differ')
delete packet.messageId
packet.length = 14
expected.payload = Buffer.from('world')
t.deepEqual(packet, expected, 'packet must match')

// receive any queued messages (included retained messages) at the disconnected time
subscriber.outStream.once('data', function (packet) {
t.notEqual(packet.messageId, 42, 'messageId must differ')
delete packet.messageId
packet.length = 14
expected.payload = Buffer.from('world2')
t.deepEqual(packet, expected, 'packet must match')

// should get the last retained message when we do a subscribe
subscribe(t, subscriber, 'hello', 1, function () {
subscriber.outStream.on('data', function (packet) {
t.notEqual(packet.messageId, 42, 'messageId must differ')
delete packet.messageId
packet.length = 14
expected.payload = Buffer.from('world2')
t.deepEqual(packet, expected, 'packet must match')
})
})
})
})
})
})
Expand Down