diff --git a/lib/handlers/connect.js b/lib/handlers/connect.js index 0f1a9723..14865f36 100644 --- a/lib/handlers/connect.js +++ b/lib/handlers/connect.js @@ -232,7 +232,11 @@ function emptyQueue (arg, done) { outgoing, through(function clearQueue (data, enc, next) { const packet = new QoSPacket(data, client) - packet.writeCallback = next + // Here we are deliberatly passing only the error + // This is because there is no destination stream so the "client" + // Object filled the buffer up to the highWaterMark preventing stored messages + // being sent + packet.writeCallback = (error, _client) => next(error) persistence.outgoingUpdate(client, packet, emptyQueueFilter) }), done diff --git a/test/qos1.js b/test/qos1.js index 8963701a..baac1dce 100644 --- a/test/qos1.js +++ b/test/qos1.js @@ -509,6 +509,71 @@ test('resend publish on non-clean reconnect QoS 1', function (t) { }) }) +test('resend many publish on non-clean reconnect QoS 1', function (t) { + t.plan(38) + + const broker = aedes() + t.tearDown(broker.close.bind(broker)) + + const opts = { clean: false, clientId: 'abcde' } + let subscriber = connect(setup(broker), opts) + + subscribe(t, subscriber, 'hello', 1, function () { + subscriber.inStream.end() + + const publisher = connect(setup(broker)) + + let count = 0 + while (++count <= 17) { + publisher.inStream.write({ + cmd: 'publish', + topic: 'hello', + payload: 'message-' + count, + qos: 1, + messageId: 42 + count + }) + } + publisher.outStream.once('data', function (packet) { + t.equal(packet.cmd, 'puback') + + subscriber = connect(setup(broker), opts) + + const expected = [ + [43, 'message-1'], + [44, 'message-2'], + [45, 'message-3'], + [46, 'message-4'], + [47, 'message-5'], + [48, 'message-6'], + [49, 'message-7'], + [50, 'message-8'], + [51, 'message-9'], + [52, 'message-10'], + [53, 'message-11'], + [54, 'message-12'], + [55, 'message-13'], + [56, 'message-14'], + [57, 'message-15'], + [58, 'message-16'], + [59, 'message-17'] + ] + + let recievedCount = 0 + subscriber.outStream.on('data', function (packet) { + subscriber.inStream.write({ + cmd: 'puback', + messageId: packet.messageId + }) + + const [messageId, payload] = expected[recievedCount++] + + t.notEqual(packet.messageId, messageId, 'messageId should not match') + t.deepEqual(packet.payload, Buffer.from(payload), 'payload should match') + }) + }) + }) +}) + test('do not resend QoS 1 packets at each reconnect', function (t) { t.plan(6)