Skip to content

Commit

Permalink
fix: emptyQueue hitting its high water mark (#583)
Browse files Browse the repository at this point in the history
  • Loading branch information
Sebastian Good authored Jan 30, 2021
1 parent 3ca8a49 commit 9b117a0
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 1 deletion.
6 changes: 5 additions & 1 deletion lib/handlers/connect.js
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,11 @@ function emptyQueue (arg, done) {
outgoing,
through(function clearQueue (data, enc, next) {
const packet = new QoSPacket(data, client)
packet.writeCallback = next
// Here we are deliberatly passing only the error
// This is because there is no destination stream so the "client"
// Object filled the buffer up to the highWaterMark preventing stored messages
// being sent
packet.writeCallback = (error, _client) => next(error)
persistence.outgoingUpdate(client, packet, emptyQueueFilter)
}),
done
Expand Down
65 changes: 65 additions & 0 deletions test/qos1.js
Original file line number Diff line number Diff line change
Expand Up @@ -509,6 +509,71 @@ test('resend publish on non-clean reconnect QoS 1', function (t) {
})
})

test('resend many publish on non-clean reconnect QoS 1', function (t) {
t.plan(38)

const broker = aedes()
t.tearDown(broker.close.bind(broker))

const opts = { clean: false, clientId: 'abcde' }
let subscriber = connect(setup(broker), opts)

subscribe(t, subscriber, 'hello', 1, function () {
subscriber.inStream.end()

const publisher = connect(setup(broker))

let count = 0
while (++count <= 17) {
publisher.inStream.write({
cmd: 'publish',
topic: 'hello',
payload: 'message-' + count,
qos: 1,
messageId: 42 + count
})
}
publisher.outStream.once('data', function (packet) {
t.equal(packet.cmd, 'puback')

subscriber = connect(setup(broker), opts)

const expected = [
[43, 'message-1'],
[44, 'message-2'],
[45, 'message-3'],
[46, 'message-4'],
[47, 'message-5'],
[48, 'message-6'],
[49, 'message-7'],
[50, 'message-8'],
[51, 'message-9'],
[52, 'message-10'],
[53, 'message-11'],
[54, 'message-12'],
[55, 'message-13'],
[56, 'message-14'],
[57, 'message-15'],
[58, 'message-16'],
[59, 'message-17']
]

let recievedCount = 0
subscriber.outStream.on('data', function (packet) {
subscriber.inStream.write({
cmd: 'puback',
messageId: packet.messageId
})

const [messageId, payload] = expected[recievedCount++]

t.notEqual(packet.messageId, messageId, 'messageId should not match')
t.deepEqual(packet.payload, Buffer.from(payload), 'payload should match')
})
})
})
})

test('do not resend QoS 1 packets at each reconnect', function (t) {
t.plan(6)

Expand Down

0 comments on commit 9b117a0

Please sign in to comment.