From 805d416b80e2bd2eac317f5f72e9c3c5f5daff94 Mon Sep 17 00:00:00 2001 From: gnought <1684105+gnought@users.noreply.github.com> Date: Fri, 24 Jan 2020 16:58:32 +0800 Subject: [PATCH] Make tests that after connected/clientReady event. We assumed that client connection is ready when writing mqtt packets, we strengthen tests not to have race conditions. --- test/auth.js | 152 ++++--- test/basic.js | 202 +++++---- test/close_socket_by_other_party.js | 32 +- test/events.js | 79 ++-- test/helper.js | 2 +- test/keep-alive.js | 21 +- test/meta.js | 78 ++-- test/qos1.js | 637 +++++++++++++++------------- test/qos2.js | 252 ++++++----- test/regr-21.js | 4 +- test/retain.js | 621 +++++++++++++++------------ test/topics.js | 150 ++++--- 12 files changed, 1233 insertions(+), 997 deletions(-) diff --git a/test/auth.js b/test/auth.js index efb8f353..481bf077 100644 --- a/test/auth.js +++ b/test/auth.js @@ -339,10 +339,12 @@ test('authorize publish', function (t) { cb() }) - s.inStream.write({ - cmd: 'publish', - topic: 'hello', - payload: 'world' + s.broker.once('clientReady', () => { + s.inStream.write({ + cmd: 'publish', + topic: 'hello', + payload: 'world' + }) }) }) @@ -397,10 +399,12 @@ test('authorize waits for authenticate', function (t) { keepalive: 0 }) - s.inStream.write({ - cmd: 'publish', - topic: 'hello', - payload: 'world' + s.broker.once('clientReady', () => { + s.inStream.write({ + cmd: 'publish', + topic: 'hello', + payload: 'world' + }) }) }) @@ -434,11 +438,12 @@ test('authorize publish from configOptions', function (t) { t.deepEqual(packet, expected, 'packet matches') cb() }) - - s.inStream.write({ - cmd: 'publish', - topic: 'hello', - payload: 'world' + s.broker.once('clientReady', () => { + s.inStream.write({ + cmd: 'publish', + topic: 'hello', + payload: 'world' + }) }) }) @@ -466,10 +471,12 @@ test('do not authorize publish', function (t) { t.pass('ended') }) - s.inStream.write({ - cmd: 'publish', - topic: 'hello', - payload: 'world' + s.broker.once('clientReady', () => { + s.inStream.write({ + cmd: 'publish', + topic: 'hello', + payload: 'world' + }) }) }) @@ -486,8 +493,9 @@ test('authorize subscribe', function (t) { }, 'topic matches') cb(null, sub) } - - subscribe(t, s, 'hello', 0) + s.broker.once('clientReady', () => { + subscribe(t, s, 'hello', 0) + }) }) test('authorize subscribe multiple same topics with same qos', function (t) { @@ -502,8 +510,9 @@ test('authorize subscribe multiple same topics with same qos', function (t) { }, 'topic matches') cb(null, sub) } - - subscribeMultiple(t, s, [{ topic: 'hello', qos: 0 }, { topic: 'hello', qos: 0 }], [0]) + s.broker.once('clientReady', () => { + subscribeMultiple(t, s, [{ topic: 'hello', qos: 0 }, { topic: 'hello', qos: 0 }], [0]) + }) }) test('authorize subscribe multiple same topics with different qos', function (t) { @@ -518,8 +527,9 @@ test('authorize subscribe multiple same topics with different qos', function (t) }, 'topic matches') cb(null, sub) } - - subscribeMultiple(t, s, [{ topic: 'hello', qos: 0 }, { topic: 'hello', qos: 1 }], [1]) + s.broker.once('clientReady', () => { + subscribeMultiple(t, s, [{ topic: 'hello', qos: 0 }, { topic: 'hello', qos: 1 }], [1]) + }) }) test('authorize subscribe multiple different topics', function (t) { @@ -542,8 +552,9 @@ test('authorize subscribe multiple different topics', function (t) { } cb(null, sub) } - - subscribeMultiple(t, s, [{ topic: 'hello', qos: 0 }, { topic: 'foo', qos: 0 }], [0, 0]) + s.broker.once('clientReady', () => { + subscribeMultiple(t, s, [{ topic: 'hello', qos: 0 }, { topic: 'foo', qos: 0 }], [0, 0]) + }) }) test('authorize subscribe from config options', function (t) { @@ -559,8 +570,9 @@ test('authorize subscribe from config options', function (t) { cb(null, sub) } }))) - - subscribe(t, s, 'hello', 0) + s.broker.once('clientReady', () => { + subscribe(t, s, 'hello', 0) + }) }) test('negate subscription', function (t) { @@ -576,14 +588,15 @@ test('negate subscription', function (t) { }, 'topic matches') cb(null, null) } - - s.inStream.write({ - cmd: 'subscribe', - messageId: 24, - subscriptions: [{ - topic: 'hello', - qos: 0 - }] + s.broker.once('clientReady', () => { + s.inStream.write({ + cmd: 'subscribe', + messageId: 24, + subscriptions: [{ + topic: 'hello', + qos: 0 + }] + }) }) s.outStream.once('data', function (packet) { @@ -602,17 +615,18 @@ test('negate multiple subscriptions', function (t) { t.ok(client, 'client exists') cb(null, null) } - - s.inStream.write({ - cmd: 'subscribe', - messageId: 24, - subscriptions: [{ - topic: 'hello', - qos: 0 - }, { - topic: 'world', - qos: 0 - }] + s.broker.once('clientReady', () => { + s.inStream.write({ + cmd: 'subscribe', + messageId: 24, + subscriptions: [{ + topic: 'hello', + qos: 0 + }, { + topic: 'world', + qos: 0 + }] + }) }) s.outStream.once('data', function (packet) { @@ -651,17 +665,18 @@ test('negate subscription with correct persistence', function (t) { }) t.equal(packet.messageId, 24) }) - - s.inStream.write({ - cmd: 'subscribe', - messageId: 24, - subscriptions: [{ - topic: 'hello', - qos: 0 - }, { - topic: 'world', - qos: 0 - }] + s.broker.once('clientReady', () => { + s.inStream.write({ + cmd: 'subscribe', + messageId: 24, + subscriptions: [{ + topic: 'hello', + qos: 0 + }, { + topic: 'world', + qos: 0 + }] + }) }) }) @@ -680,17 +695,18 @@ test('negate multiple subscriptions random times', function (t) { cb(null, null) } } - - s.inStream.write({ - cmd: 'subscribe', - messageId: 24, - subscriptions: [{ - topic: 'hello', - qos: 0 - }, { - topic: 'world', - qos: 0 - }] + s.broker.once('clientReady', () => { + s.inStream.write({ + cmd: 'subscribe', + messageId: 24, + subscriptions: [{ + topic: 'hello', + qos: 0 + }, { + topic: 'world', + qos: 0 + }] + }) }) s.outStream.once('data', function (packet) { diff --git a/test/basic.js b/test/basic.js index 0f48e50f..97328399 100644 --- a/test/basic.js +++ b/test/basic.js @@ -29,11 +29,12 @@ test('publish QoS 0', function (t) { cb() t.end() }) - - s.inStream.write({ - cmd: 'publish', - topic: 'hello', - payload: 'world' + s.broker.once('clientReady', () => { + s.inStream.write({ + cmd: 'publish', + topic: 'hello', + payload: 'world' + }) }) }) @@ -50,17 +51,18 @@ test('subscribe QoS 0', function (t) { qos: 0, retain: false } + s.broker.once('clientReady', () => { + subscribe(t, s, 'hello', 0, function () { + s.outStream.once('data', function (packet) { + t.deepEqual(packet, expected, 'packet matches') + t.end() + }) - subscribe(t, s, 'hello', 0, function () { - s.outStream.once('data', function (packet) { - t.deepEqual(packet, expected, 'packet matches') - t.end() - }) - - s.broker.publish({ - cmd: 'publish', - topic: 'hello', - payload: 'world' + s.broker.publish({ + cmd: 'publish', + topic: 'hello', + payload: 'world' + }) }) }) }) @@ -70,13 +72,15 @@ test('does not die badly on connection error', function (t) { var s = connect(setup()) - s.inStream.write({ - cmd: 'subscribe', - messageId: 42, - subscriptions: [{ - topic: 'hello', - qos: 0 - }] + s.broker.once('clientReady', () => { + s.inStream.write({ + cmd: 'subscribe', + messageId: 42, + subscriptions: [{ + topic: 'hello', + qos: 0 + }] + }) }) s.broker.on('clientError', function (client, err) { @@ -101,33 +105,35 @@ test('unsubscribe', function (t) { var s = noError(connect(setup()), t) - subscribe(t, s, 'hello', 0, function () { - s.inStream.write({ - cmd: 'unsubscribe', - messageId: 43, - unsubscriptions: ['hello'] - }) - - s.outStream.once('data', function (packet) { - t.deepEqual(packet, { - cmd: 'unsuback', + s.broker.once('clientReady', () => { + subscribe(t, s, 'hello', 0, function () { + s.inStream.write({ + cmd: 'unsubscribe', messageId: 43, - dup: false, - length: 2, - qos: 0, - retain: false - }, 'packet matches') - - s.outStream.on('data', function (packet) { - t.fail('packet received') + unsubscriptions: ['hello'] }) - s.broker.publish({ - cmd: 'publish', - topic: 'hello', - payload: 'world' - }, function () { - t.pass('publish finished') + s.outStream.once('data', function (packet) { + t.deepEqual(packet, { + cmd: 'unsuback', + messageId: 43, + dup: false, + length: 2, + qos: 0, + retain: false + }, 'packet matches') + + s.outStream.on('data', function (packet) { + t.fail('packet received') + }) + + s.broker.publish({ + cmd: 'publish', + topic: 'hello', + payload: 'world' + }, function () { + t.pass('publish finished') + }) }) }) }) @@ -138,11 +144,13 @@ test('unsubscribe without subscribe', function (t) { var s = noError(connect(setup()), t) - s.inStream.write({ - cmd: 'unsubscribe', - messageId: 43, - unsubscriptions: ['hello'] - }) + s.broker.once('clientReady', () => + s.inStream.write({ + cmd: 'unsubscribe', + messageId: 43, + unsubscriptions: ['hello'] + }) + ) s.outStream.once('data', function (packet) { t.deepEqual(packet, { @@ -162,28 +170,30 @@ test('unsubscribe on disconnect for a clean=true client', function (t) { var opts = { clean: true } var s = connect(setup(), opts) - subscribe(t, s, 'hello', 0, function () { - s.conn.destroy(null, function () { - t.pass('closed streams') - }) - s.outStream.on('data', function () { - t.fail('should not receive any more messages') - }) - s.broker.once('unsubscribe', function () { - t.pass('should emit unsubscribe') - }) - s.broker.once('closed', function () { - t.ok(true) - t.end() - }) - s.broker.publish({ - cmd: 'publish', - topic: 'hello', - payload: Buffer.from('world') - }, function () { - t.pass('calls the callback') + s.broker.once('clientReady', () => + subscribe(t, s, 'hello', 0, function () { + s.conn.destroy(null, function () { + t.pass('closed streams') + }) + s.outStream.on('data', function () { + t.fail('should not receive any more messages') + }) + s.broker.once('unsubscribe', function () { + t.pass('should emit unsubscribe') + }) + s.broker.once('closed', function () { + t.ok(true) + t.end() + }) + s.broker.publish({ + cmd: 'publish', + topic: 'hello', + payload: Buffer.from('world') + }, function () { + t.pass('calls the callback') + }) }) - }) + ) }) test('unsubscribe on disconnect for a clean=false client', function (t) { @@ -192,26 +202,28 @@ test('unsubscribe on disconnect for a clean=false client', function (t) { var opts = { clean: false } var s = connect(setup(), opts) - subscribe(t, s, 'hello', 0, function () { - s.conn.destroy(null, function () { - t.pass('closed streams') - }) - s.outStream.on('data', function () { - t.fail('should not receive any more messages') - }) - s.broker.once('unsubscribe', function () { - t.fail('should not emit unsubscribe') - }) - s.broker.once('closed', function () { - t.ok(true) - t.end() - }) - s.broker.publish({ - cmd: 'publish', - topic: 'hello', - payload: Buffer.from('world') - }, function () { - t.pass('calls the callback') + s.broker.once('clientReady', () => { + subscribe(t, s, 'hello', 0, function () { + s.conn.destroy(null, function () { + t.pass('closed streams') + }) + s.outStream.on('data', function () { + t.fail('should not receive any more messages') + }) + s.broker.once('unsubscribe', function () { + t.fail('should not emit unsubscribe') + }) + s.broker.once('closed', function () { + t.ok(true) + t.end() + }) + s.broker.publish({ + cmd: 'publish', + topic: 'hello', + payload: Buffer.from('world') + }, function () { + t.pass('calls the callback') + }) }) }) }) @@ -225,8 +237,10 @@ test('disconnect', function (t) { t.end() }) - s.inStream.write({ - cmd: 'disconnect' + s.broker.once('clientReady', () => { + s.inStream.write({ + cmd: 'disconnect' + }) }) }) diff --git a/test/close_socket_by_other_party.js b/test/close_socket_by_other_party.js index 7d7a2c18..2402d56c 100644 --- a/test/close_socket_by_other_party.js +++ b/test/close_socket_by_other_party.js @@ -66,13 +66,15 @@ test('client is closed before authorizePublish returns', function (t) { }) var s = connect(setup(broker, false)) - s.inStream.write({ - cmd: 'publish', - topic: 'hello', - payload: 'world', - qos: 1, - messageId: 10, - retain: false + s.broker.once('clientReady', () => { + s.inStream.write({ + cmd: 'publish', + topic: 'hello', + payload: 'world', + qos: 1, + messageId: 10, + retain: false + }) }) evt.on('AuthorizePublishBegin', function (client) { @@ -92,14 +94,16 @@ test('close client when its socket is closed', function (t) { t.plan(4) var broker = aedes() - var subscriber = connect(setup(broker, false)) + var s = connect(setup(broker, false)) - subscribe(t, subscriber, 'hello', 1, function () { - subscriber.inStream.end() - subscriber.conn.on('close', function () { - t.equal(broker.connectedClients, 0, 'no connected client') - broker.close() - t.end() + s.broker.once('clientReady', () => { + subscribe(t, s, 'hello', 1, function () { + s.inStream.end() + s.conn.on('close', function () { + t.equal(broker.connectedClients, 0, 'no connected client') + broker.close() + t.end() + }) }) }) }) diff --git a/test/events.js b/test/events.js index 98f20e8a..0f7b020d 100644 --- a/test/events.js +++ b/test/events.js @@ -28,17 +28,19 @@ test('publishes an hearbeat', function (t) { var s = connect(setup()) - subscribe(t, s, '#', 0, function () { - s.outStream.once('data', function (packet) { - t.fail('no packet should be received') - }) - - s.broker.mq.emit({ - cmd: 'publish', - topic: topic + '/hello', - payload: 'world' - }, function () { - t.pass('nothing happened') + s.broker.once('clientReady', () => { + subscribe(t, s, '#', 0, function () { + s.outStream.once('data', function (packet) { + t.fail('no packet should be received') + }) + + s.broker.mq.emit({ + cmd: 'publish', + topic: topic + '/hello', + payload: 'world' + }, function () { + t.pass('nothing happened') + }) }) }) }) @@ -48,45 +50,44 @@ test('publishes an hearbeat', function (t) { var s = connect(setup()) - subscribe(t, s, '+/#', 0, function () { - s.outStream.once('data', function (packet) { - t.fail('no packet should be received') - }) - - s.broker.mq.emit({ - cmd: 'publish', - topic: topic + '/hello', - payload: 'world' - }, function () { - t.pass('nothing happened') + s.broker.once('clientReady', () => { + subscribe(t, s, '+/#', 0, function () { + s.outStream.once('data', function (packet) { + t.fail('no packet should be received') + }) + + s.broker.mq.emit({ + cmd: 'publish', + topic: topic + '/hello', + payload: 'world' + }, function () { + t.pass('nothing happened') + }) }) }) }) }) test('does not store $SYS topics to QoS 1 # subscription', function (t) { - t.plan(3) + t.plan(4) var broker = aedes() var opts = { clean: false, clientId: 'abcde' } var s = connect(setup(broker), opts) - subscribe(t, s, '#', 1, function () { - s.inStream.end() - - s.broker.publish({ - cmd: 'publish', - topic: '$SYS/hello', - payload: 'world', - qos: 1 - }, function () { - s = connect(setup(broker), { clean: false, clientId: 'abcde' }, function () { - t.end() - }) - + s.broker.once('clientReady', () => { + subscribe(t, s, '#', 1, function () { s.outStream.once('data', function (packet) { t.fail('no packet should be received') }) + s.broker.mq.emit({ + cmd: 'publish', + topic: '$SYS/hello', + payload: 'world', + qos: 1 + }, function () { + t.pass('nothing happened') + }) }) }) }) @@ -111,8 +112,10 @@ test('Emit event when receives a ping', function (t) { var s = connect(setup(broker), { clientId: 'abcde' }) - s.inStream.write({ - cmd: 'pingreq' + s.broker.once('clientReady', () => { + s.inStream.write({ + cmd: 'pingreq' + }) }) }) diff --git a/test/helper.js b/test/helper.js index d7540dd0..9c768260 100644 --- a/test/helper.js +++ b/test/helper.js @@ -20,7 +20,7 @@ function setup (broker, autoClose) { if (autoClose === undefined || autoClose) { setTimeout(function () { broker.close() - }, autoClose || 200) + }, autoClose || 250) } return { diff --git a/test/keep-alive.js b/test/keep-alive.js index fc0bc257..39155e03 100644 --- a/test/keep-alive.js +++ b/test/keep-alive.js @@ -13,10 +13,11 @@ test('supports pingreq/pingresp', function (t) { var s = noError(connect(setup())) - s.inStream.write({ - cmd: 'pingreq' + s.broker.once('clientReady', () => { + s.inStream.write({ + cmd: 'pingreq' + }) }) - s.outStream.on('data', function (packet) { t.equal(packet.cmd, 'pingresp', 'the response is a pingresp') }) @@ -42,12 +43,14 @@ test('supports keep alive disconnections after a pingreq', function (t) { var s = connect(setup(null, 3000), { keepalive: 1 }) var start - setTimeout(function () { - start = Date.now() - s.inStream.write({ - cmd: 'pingreq' - }) - }, 1000) + s.broker.once('clientReady', () => { + setTimeout(function () { + start = Date.now() + s.inStream.write({ + cmd: 'pingreq' + }) + }, 1000) + }) eos(s.conn, function () { t.ok(Date.now() >= start + 1500, 'waits 1 and a half the keepalive timeout') diff --git a/test/meta.js b/test/meta.js index 4c335e68..2131d75d 100644 --- a/test/meta.js +++ b/test/meta.js @@ -73,12 +73,14 @@ test('call published method with client', function (t) { var s = connect(setup(broker)) - s.inStream.write({ - cmd: 'publish', - topic: 'hello', - payload: Buffer.from('world'), - qos: 1, - messageId: 42 + s.broker.once('clientReady', () => { + s.inStream.write({ + cmd: 'publish', + topic: 'hello', + payload: Buffer.from('world'), + qos: 1, + messageId: 42 + }) }) }) @@ -99,11 +101,13 @@ test('emit publish event with client - QoS 0', function (t) { var s = connect(setup(broker)) - s.inStream.write({ - cmd: 'publish', - topic: 'hello', - payload: Buffer.from('world'), - qos: 0 + s.broker.once('clientReady', () => { + s.inStream.write({ + cmd: 'publish', + topic: 'hello', + payload: Buffer.from('world'), + qos: 0 + }) }) }) @@ -125,12 +129,14 @@ test('emit publish event with client - QoS 1', function (t) { var s = connect(setup(broker)) - s.inStream.write({ - cmd: 'publish', - topic: 'hello', - payload: Buffer.from('world'), - qos: 1, - messageId: 42 + s.broker.once('clientReady', () => { + s.inStream.write({ + cmd: 'publish', + topic: 'hello', + payload: Buffer.from('world'), + qos: 1, + messageId: 42 + }) }) }) @@ -148,8 +154,10 @@ test('emit subscribe event', function (t) { t.equal(client.id, 'abcde', 'client matches') }) - subscribe(t, s, 'hello', 0, function () { - t.pass('subscribe completed') + s.broker.once('clientReady', () => { + subscribe(t, s, 'hello', 0, function () { + t.pass('subscribe completed') + }) }) }) @@ -166,15 +174,17 @@ test('emit unsubscribe event', function (t) { t.equal(client.id, 'abcde', 'client matches') }) - subscribe(t, s, 'hello', 0, function () { - s.inStream.write({ - cmd: 'unsubscribe', - messageId: 43, - unsubscriptions: ['hello'] - }) + s.broker.once('clientReady', () => { + subscribe(t, s, 'hello', 0, function () { + s.inStream.write({ + cmd: 'unsubscribe', + messageId: 43, + unsubscriptions: ['hello'] + }) - s.outStream.once('data', function (packet) { - t.pass('subscribe completed') + s.outStream.once('data', function (packet) { + t.pass('subscribe completed') + }) }) }) }) @@ -189,12 +199,14 @@ test('dont emit unsubscribe event on client close', function (t) { t.error('unsubscribe should not be emitted') }) - subscribe(t, s, 'hello', 0, function () { - s.inStream.end({ - cmd: 'disconnect' - }) - s.outStream.once('data', function (packet) { - t.pass('unsubscribe completed') + s.broker.once('clientReady', () => { + subscribe(t, s, 'hello', 0, function () { + s.inStream.end({ + cmd: 'disconnect' + }) + s.outStream.once('data', function (packet) { + t.pass('unsubscribe completed') + }) }) }) }) diff --git a/test/qos1.js b/test/qos1.js index 8a6f9613..7a00c5bc 100644 --- a/test/qos1.js +++ b/test/qos1.js @@ -21,12 +21,14 @@ test('publish QoS 1', function (t) { retain: false } - s.inStream.write({ - cmd: 'publish', - topic: 'hello', - payload: 'world', - qos: 1, - messageId: 42 + s.broker.once('clientReady', () => { + s.inStream.write({ + cmd: 'publish', + topic: 'hello', + payload: 'world', + qos: 1, + messageId: 42 + }) }) s.outStream.on('data', function (packet) { @@ -68,40 +70,46 @@ test('publish QoS 1 and check offline queue', function (t) { retain: false } var queue = [] - subscribe(t, subscriber, 'hello', 1, function () { - publisher.outStream.on('data', function (packet) { - t.deepEqual(packet, expectedAck, 'ack packet must patch') - }) - subscriber.outStream.on('data', function (packet) { - queue.push(packet) - delete packet.payload - delete packet.length - t.notEqual(packet.messageId, undefined, 'messageId is assigned a value') - t.notEqual(packet.messageId, 10, 'messageId should be unique') - expected.messageId = packet.messageId - t.deepEqual(packet, expected, 'publish packet must patch') - if (queue.length === 2) { - setImmediate(() => { - for (var i = 0; i < queue.length; i++) { - broker.persistence.outgoingClearMessageId(subscriberClient, queue[i], function (_, origPacket) { - if (origPacket) { - delete origPacket.brokerId - delete origPacket.brokerCounter - delete origPacket.payload - delete origPacket.messageId - delete sent.payload - delete sent.messageId - t.deepEqual(origPacket, sent, 'origPacket must match') + + subscriber.broker.once('clientReady', () => { + publisher.broker.once('clientReady', () => { + subscribe(t, subscriber, 'hello', 1, function () { + publisher.outStream.on('data', function (packet) { + t.deepEqual(packet, expectedAck, 'ack packet must patch') + }) + subscriber.outStream.on('data', function (packet) { + queue.push(packet) + delete packet.payload + delete packet.length + t.notEqual(packet.messageId, undefined, 'messageId is assigned a value') + t.notEqual(packet.messageId, 10, 'messageId should be unique') + expected.messageId = packet.messageId + t.deepEqual(packet, expected, 'publish packet must patch') + if (queue.length === 2) { + setImmediate(() => { + for (var i = 0; i < queue.length; i++) { + broker.persistence.outgoingClearMessageId(subscriberClient, queue[i], function (_, origPacket) { + if (origPacket) { + delete origPacket.brokerId + delete origPacket.brokerCounter + delete origPacket.payload + delete origPacket.messageId + delete sent.payload + delete sent.messageId + t.deepEqual(origPacket, sent, 'origPacket must match') + } + }) } + t.end() }) } - t.end() }) - } + }) + + publisher.inStream.write(sent) + sent.payload = 'world2world' + publisher.inStream.write(sent) }) - publisher.inStream.write(sent) - sent.payload = 'world2world' - publisher.inStream.write(sent) }) }) @@ -121,24 +129,28 @@ test('subscribe QoS 1', function (t) { retain: false } - subscribe(t, subscriber, 'hello', 1, function () { - subscriber.outStream.once('data', function (packet) { - subscriber.inStream.write({ - cmd: 'puback', - messageId: packet.messageId - }) - t.notEqual(packet.messageId, 42, 'messageId must differ') - delete packet.messageId - t.deepEqual(packet, expected, 'packet must match') - t.end() - }) + subscriber.broker.once('clientReady', () => { + publisher.broker.once('clientReady', () => { + subscribe(t, subscriber, 'hello', 1, function () { + subscriber.outStream.once('data', function (packet) { + subscriber.inStream.write({ + cmd: 'puback', + messageId: packet.messageId + }) + t.notEqual(packet.messageId, 42, 'messageId must differ') + delete packet.messageId + t.deepEqual(packet, expected, 'packet must match') + t.end() + }) - publisher.inStream.write({ - cmd: 'publish', - topic: 'hello', - payload: 'world', - qos: 1, - messageId: 42 + publisher.inStream.write({ + cmd: 'publish', + topic: 'hello', + payload: 'world', + qos: 1, + messageId: 42 + }) + }) }) }) }) @@ -158,29 +170,36 @@ test('subscribe QoS 0, but publish QoS 1', function (t) { length: 12, retain: false } + subscriber.broker.once('clientReady', () => { + publisher.broker.once('clientReady', () => { + subscribe(t, subscriber, 'hello', 0, function () { + subscriber.outStream.once('data', function (packet) { + t.deepEqual(packet, expected, 'packet must match') + t.end() + }) - subscribe(t, subscriber, 'hello', 0, function () { - subscriber.outStream.once('data', function (packet) { - t.deepEqual(packet, expected, 'packet must match') - t.end() - }) - - publisher.inStream.write({ - cmd: 'publish', - topic: 'hello', - payload: 'world', - qos: 1, - messageId: 42 + publisher.inStream.write({ + cmd: 'publish', + topic: 'hello', + payload: 'world', + qos: 1, + messageId: 42 + }) + }) }) }) }) test('restore QoS 1 subscriptions not clean', function (t) { - t.plan(7) + // t.plan(7) var broker = aedes() var publisher var subscriber = connect(setup(broker), { clean: false, clientId: 'abcde' }) + // subscriber = connect(setup(broker), { clean: false, clientId: 'abcde' }) + // var subscriber = null + // broker.close() + // t.end() var expected = { cmd: 'publish', topic: 'hello', @@ -191,35 +210,39 @@ test('restore QoS 1 subscriptions not clean', function (t) { retain: false } - subscribe(t, subscriber, 'hello', 1, function () { - subscriber.inStream.end() + subscriber.broker.once('clientReady', () => { + subscribe(t, subscriber, 'hello', 1, function () { + subscriber.inStream.end() - publisher = connect(setup(broker)) + publisher = connect(setup(broker)) - subscriber = connect(setup(broker), { clean: false, clientId: 'abcde' }, function (connect) { - t.equal(connect.sessionPresent, true, 'session present is set to true') - publisher.inStream.write({ - cmd: 'publish', - topic: 'hello', - payload: 'world', - qos: 1, - messageId: 42 - }) - }) + publisher.broker.once('clientReady', () => { + subscriber = connect(setup(broker), { clean: false, clientId: 'abcde' }, function (connect) { + t.equal(connect.sessionPresent, true, 'session present is set to true') + publisher.inStream.write({ + cmd: 'publish', + topic: 'hello', + payload: 'world', + qos: 1, + messageId: 42 + }) + }) - publisher.outStream.once('data', function (packet) { - t.equal(packet.cmd, 'puback') - }) + publisher.outStream.once('data', function (packet) { + t.equal(packet.cmd, 'puback') + }) - subscriber.outStream.once('data', function (packet) { - subscriber.inStream.write({ - cmd: 'puback', - messageId: packet.messageId + subscriber.outStream.once('data', function (packet) { + subscriber.inStream.write({ + cmd: 'puback', + messageId: packet.messageId + }) + t.notEqual(packet.messageId, 42, 'messageId must differ') + delete packet.messageId + t.deepEqual(packet, expected, 'packet must match') + t.end() + }) }) - t.notEqual(packet.messageId, 42, 'messageId must differ') - delete packet.messageId - t.deepEqual(packet, expected, 'packet must match') - t.end() }) }) }) @@ -231,43 +254,47 @@ test('remove stored subscriptions if connected with clean=true', function (t) { var publisher var subscriber = connect(setup(broker), { clean: false, clientId: 'abcde' }) - subscribe(t, subscriber, 'hello', 1, function () { - subscriber.inStream.end() + subscriber.broker.once('clientReady', () => { + subscribe(t, subscriber, 'hello', 1, function () { + subscriber.inStream.end() - publisher = connect(setup(broker)) + publisher = connect(setup(broker)) - subscriber = connect(setup(broker), { clean: true, clientId: 'abcde' }, function (packet) { - t.equal(packet.sessionPresent, false, 'session present is set to false') - publisher.inStream.write({ - cmd: 'publish', - topic: 'hello', - payload: 'world', - qos: 1, - messageId: 42 - }) + publisher.broker.once('clientReady', () => { + subscriber = connect(setup(broker), { clean: true, clientId: 'abcde' }, function (packet) { + t.equal(packet.sessionPresent, false, 'session present is set to false') + publisher.inStream.write({ + cmd: 'publish', + topic: 'hello', + payload: 'world', + qos: 1, + messageId: 42 + }) - subscriber.inStream.end() + subscriber.inStream.end() - subscriber = connect(setup(broker), { clean: false, clientId: 'abcde' }, function (connect) { - t.equal(connect.sessionPresent, false, 'session present is set to false') - publisher.inStream.write({ - cmd: 'publish', - topic: 'hello', - payload: 'world', - qos: 1, - messageId: 43 - }) + subscriber = connect(setup(broker), { clean: false, clientId: 'abcde' }, function (connect) { + t.equal(connect.sessionPresent, false, 'session present is set to false') + publisher.inStream.write({ + cmd: 'publish', + topic: 'hello', + payload: 'world', + qos: 1, + messageId: 43 + }) - t.end() - }) + t.end() + }) - subscriber.outStream.once('data', function (packet) { - t.fail('publish received') - }) - }) + subscriber.outStream.once('data', function (packet) { + t.fail('publish received') + }) + }) - subscriber.outStream.once('data', function (packet) { - t.fail('publish received') + subscriber.outStream.once('data', function (packet) { + t.fail('publish received') + }) + }) }) }) }) @@ -292,44 +319,49 @@ test('resend publish on non-clean reconnect QoS 1', function (t) { retain: false } - subscribe(t, subscriber, 'hello', 1, function () { - subscriber.inStream.end() + subscriber.broker.once('clientReady', () => { + subscribe(t, subscriber, 'hello', 1, function () { + subscriber.inStream.end() - publisher = connect(setup(broker)) + publisher = connect(setup(broker)) - publisher.inStream.write({ - cmd: 'publish', - topic: 'hello', - payload: 'world', - qos: 1, - messageId: 42 - }) - publisher.inStream.write({ - cmd: 'publish', - topic: 'hello', - payload: 'world world', - qos: 1, - messageId: 42 - }) - publisher.outStream.once('data', function (packet) { - t.equal(packet.cmd, 'puback') + publisher.broker.once('clientReady', () => { + publisher.inStream.write({ + cmd: 'publish', + topic: 'hello', + payload: 'world', + qos: 1, + messageId: 42 + }) + publisher.inStream.write({ + cmd: 'publish', + topic: 'hello', + payload: 'world world', + qos: 1, + messageId: 42 + }) + }) - subscriber = connect(setup(broker), opts) + publisher.outStream.once('data', function (packet) { + t.equal(packet.cmd, 'puback') - subscriber.outStream.once('data', function (packet) { - subscriber.inStream.write({ - cmd: 'puback', - messageId: packet.messageId - }) - t.notEqual(packet.messageId, 42, 'messageId must differ') - delete packet.messageId - t.deepEqual(packet, expected, 'packet must match') - setImmediate(() => { - var stream = broker.persistence.outgoingStream(subscriberClient) - stream.pipe(concat(function (list) { - t.equal(list.length, 1, 'should remain one item in queue') - t.deepEqual(list[0].payload, Buffer.from('world world'), 'packet must match') - })) + subscriber = connect(setup(broker), opts) + + subscriber.outStream.once('data', function (packet) { + subscriber.inStream.write({ + cmd: 'puback', + messageId: packet.messageId + }) + t.notEqual(packet.messageId, 42, 'messageId must differ') + delete packet.messageId + t.deepEqual(packet, expected, 'packet must match') + setImmediate(() => { + var stream = broker.persistence.outgoingStream(subscriberClient) + stream.pipe(concat(function (list) { + t.equal(list.length, 1, 'should remain one item in queue') + t.deepEqual(list[0].payload, Buffer.from('world world'), 'packet must match') + })) + }) }) }) }) @@ -352,38 +384,42 @@ test('do not resend QoS 1 packets at each reconnect', function (t) { retain: false } - subscribe(t, subscriber, 'hello', 1, function () { - subscriber.inStream.end() + subscriber.broker.once('clientReady', () => { + subscribe(t, subscriber, 'hello', 1, function () { + subscriber.inStream.end() - publisher = connect(setup(broker)) + publisher = connect(setup(broker)) - publisher.inStream.write({ - cmd: 'publish', - topic: 'hello', - payload: 'world', - qos: 1, - messageId: 42 - }) + publisher.broker.once('clientReady', () => { + publisher.inStream.write({ + cmd: 'publish', + topic: 'hello', + payload: 'world', + qos: 1, + messageId: 42 + }) + }) - publisher.outStream.once('data', function (packet) { - t.equal(packet.cmd, 'puback') + publisher.outStream.once('data', function (packet) { + t.equal(packet.cmd, 'puback') - subscriber = connect(setup(broker), { clean: false, clientId: 'abcde' }) + subscriber = connect(setup(broker), { clean: false, clientId: 'abcde' }) - subscriber.outStream.once('data', function (packet) { - subscriber.inStream.end({ - cmd: 'puback', - messageId: packet.messageId - }) + subscriber.outStream.once('data', function (packet) { + subscriber.inStream.end({ + cmd: 'puback', + messageId: packet.messageId + }) - t.notEqual(packet.messageId, 42, 'messageId must differ') - delete packet.messageId - t.deepEqual(packet, expected, 'packet must match') + t.notEqual(packet.messageId, 42, 'messageId must differ') + delete packet.messageId + t.deepEqual(packet, expected, 'packet must match') - var subscriber2 = connect(setup(broker), { clean: false, clientId: 'abcde' }) + var subscriber2 = connect(setup(broker), { clean: false, clientId: 'abcde' }) - subscriber2.outStream.once('data', function (packet) { - t.fail('this should never happen') + subscriber2.outStream.once('data', function (packet) { + t.fail('this should never happen') + }) }) }) }) @@ -398,26 +434,30 @@ test('do not resend QoS 1 packets if reconnect is clean', function (t) { var publisher var subscriber = connect(setup(broker), { clean: false, clientId: 'abcde' }) - subscribe(t, subscriber, 'hello', 1, function () { - subscriber.inStream.end() + subscriber.broker.once('clientReady', () => { + subscribe(t, subscriber, 'hello', 1, function () { + subscriber.inStream.end() - publisher = connect(setup(broker)) + publisher = connect(setup(broker)) - publisher.inStream.write({ - cmd: 'publish', - topic: 'hello', - payload: 'world', - qos: 1, - messageId: 42 - }) + publisher.broker.once('clientReady', () => { + publisher.inStream.write({ + cmd: 'publish', + topic: 'hello', + payload: 'world', + qos: 1, + messageId: 42 + }) + }) - publisher.outStream.once('data', function (packet) { - t.equal(packet.cmd, 'puback') + publisher.outStream.once('data', function (packet) { + t.equal(packet.cmd, 'puback') - subscriber = connect(setup(broker), { clean: true, clientId: 'abcde' }) + subscriber = connect(setup(broker), { clean: true, clientId: 'abcde' }) - subscriber.outStream.once('data', function (packet) { - t.fail('this should never happen') + subscriber.outStream.once('data', function (packet) { + t.fail('this should never happen') + }) }) }) }) @@ -440,34 +480,38 @@ test('do not resend QoS 1 packets at reconnect if puback was received', function retain: false } - subscribe(t, subscriber, 'hello', 1, function () { - publisher = connect(setup(broker)) - - publisher.inStream.write({ - cmd: 'publish', - topic: 'hello', - payload: 'world', - qos: 1, - messageId: 42 - }) + subscriber.broker.once('clientReady', () => { + subscribe(t, subscriber, 'hello', 1, function () { + publisher = connect(setup(broker)) - publisher.outStream.once('data', function (packet) { - t.equal(packet.cmd, 'puback') - }) + publisher.broker.once('clientReady', () => { + publisher.inStream.write({ + cmd: 'publish', + topic: 'hello', + payload: 'world', + qos: 1, + messageId: 42 + }) + }) - subscriber.outStream.once('data', function (packet) { - subscriber.inStream.end({ - cmd: 'puback', - messageId: packet.messageId + publisher.outStream.once('data', function (packet) { + t.equal(packet.cmd, 'puback') }) - delete packet.messageId - t.deepEqual(packet, expected, 'packet must match') + subscriber.outStream.once('data', function (packet) { + subscriber.inStream.end({ + cmd: 'puback', + messageId: packet.messageId + }) + + delete packet.messageId + t.deepEqual(packet, expected, 'packet must match') - subscriber = connect(setup(broker), { clean: false, clientId: 'abcde' }) + subscriber = connect(setup(broker), { clean: false, clientId: 'abcde' }) - subscriber.outStream.once('data', function (packet) { - t.fail('this should never happen') + subscriber.outStream.once('data', function (packet) { + t.fail('this should never happen') + }) }) }) }) @@ -481,56 +525,59 @@ test('remove stored subscriptions after unsubscribe', function (t) { var publisher var subscriber = connect(setup(broker), { clean: false, clientId: 'abcde' }) - subscribe(t, subscriber, 'hello', 1, function () { - subscriber.inStream.write({ - cmd: 'unsubscribe', - messageId: 43, - unsubscriptions: ['hello'] - }) - - subscriber.outStream.once('data', function (packet) { - t.deepEqual(packet, { - cmd: 'unsuback', + subscriber.broker.once('clientReady', () => { + subscribe(t, subscriber, 'hello', 1, function () { + subscriber.inStream.write({ + cmd: 'unsubscribe', messageId: 43, - dup: false, - length: 2, - qos: 0, - retain: false - }, 'packet matches') - - subscriber.inStream.end() + unsubscriptions: ['hello'] + }) - publisher = connect(setup(broker)) + subscriber.outStream.once('data', function (packet) { + t.deepEqual(packet, { + cmd: 'unsuback', + messageId: 43, + dup: false, + length: 2, + qos: 0, + retain: false + }, 'packet matches') + + subscriber.inStream.end() + + publisher = connect(setup(broker)) + + publisher.broker.once('clientReady', () => { + subscriber = connect(setup(broker), { clean: false, clientId: 'abcde' }, function (packet) { + t.equal(packet.sessionPresent, false, 'session present is set to false') + publisher.inStream.write({ + cmd: 'publish', + topic: 'hello', + payload: 'world', + qos: 1, + messageId: 42 + }) - subscriber = connect(setup(broker), { clean: false, clientId: 'abcde' }, function (packet) { - t.equal(packet.sessionPresent, false, 'session present is set to false') - publisher.inStream.write({ - cmd: 'publish', - topic: 'hello', - payload: 'world', - qos: 1, - messageId: 42 - }) + publisher.inStream.write({ + cmd: 'publish', + topic: 'hello', + payload: 'world', + qos: 1, + messageId: 43 + }, function () { + subscriber.inStream.end() + t.end() + }) - publisher.inStream.write({ - cmd: 'publish', - topic: 'hello', - payload: 'world', - qos: 1, - messageId: 43 - }, function () { - subscriber.inStream.end() - t.end() + subscriber.outStream.once('data', function (packet) { + t.fail('publish received') + }) + }) }) - subscriber.outStream.once('data', function (packet) { t.fail('publish received') }) }) - - subscriber.outStream.once('data', function (packet) { - t.fail('publish received') - }) }) }) }) @@ -549,20 +596,22 @@ test('upgrade a QoS 0 subscription to QoS 1', function (t) { dup: false } - subscribe(t, s, 'hello', 0, function () { - subscribe(t, s, 'hello', 1, function () { - s.outStream.once('data', function (packet) { - t.ok(packet.messageId, 'has messageId') - delete packet.messageId - t.deepEqual(packet, expected, 'packet matches') - t.end() - }) + s.broker.once('clientReady', () => { + subscribe(t, s, 'hello', 0, function () { + subscribe(t, s, 'hello', 1, function () { + s.outStream.once('data', function (packet) { + t.ok(packet.messageId, 'has messageId') + delete packet.messageId + t.deepEqual(packet, expected, 'packet matches') + t.end() + }) - s.broker.publish({ - cmd: 'publish', - topic: 'hello', - payload: 'world', - qos: 1 + s.broker.publish({ + cmd: 'publish', + topic: 'hello', + payload: 'world', + qos: 1 + }) }) }) }) @@ -582,16 +631,18 @@ test('downgrade QoS 0 publish on QoS 1 subsciption', function (t) { dup: false } - subscribe(t, s, 'hello', 1, function () { - s.outStream.once('data', function (packet) { - t.deepEqual(packet, expected, 'packet matches') - t.end() - }) - s.broker.publish({ - cmd: 'publish', - topic: 'hello', - payload: 'world', - qos: 0 + s.broker.once('clientReady', () => { + subscribe(t, s, 'hello', 1, function () { + s.outStream.once('data', function (packet) { + t.deepEqual(packet, expected, 'packet matches') + t.end() + }) + s.broker.publish({ + cmd: 'publish', + topic: 'hello', + payload: 'world', + qos: 0 + }) }) }) }) @@ -635,20 +686,22 @@ test('subscribe and publish QoS 1 in parallel', function (t) { }) }) - s.inStream.write({ - cmd: 'subscribe', - messageId: 24, - subscriptions: [{ - topic: 'hello', - qos: 1 - }] - }) + s.broker.once('clientReady', () => { + s.inStream.write({ + cmd: 'subscribe', + messageId: 24, + subscriptions: [{ + topic: 'hello', + qos: 1 + }] + }) - s.inStream.write({ - cmd: 'publish', - topic: 'hello', - payload: 'world', - qos: 1, - messageId: 42 + s.inStream.write({ + cmd: 'publish', + topic: 'hello', + payload: 'world', + qos: 1, + messageId: 42 + }) }) }) diff --git a/test/qos2.js b/test/qos2.js index 343cfc30..2c5d0985 100644 --- a/test/qos2.js +++ b/test/qos2.js @@ -7,12 +7,12 @@ var setup = helper.setup var connect = helper.connect var subscribe = helper.subscribe -function publish (t, s, packet, done) { +function publish (t, publisher, packet, done) { var msgId = packet.messageId - s.inStream.write(packet) + publisher.inStream.write(packet) - s.outStream.once('data', function (packet) { + publisher.outStream.once('data', function (packet) { t.deepEqual(packet, { cmd: 'pubrec', messageId: msgId, @@ -22,12 +22,12 @@ function publish (t, s, packet, done) { qos: 0 }, 'pubrec must match') - s.inStream.write({ + publisher.inStream.write({ cmd: 'pubrel', messageId: msgId }) - s.outStream.once('data', function (packet) { + publisher.outStream.once('data', function (packet) { t.deepEqual(packet, { cmd: 'pubcomp', messageId: msgId, @@ -80,7 +80,6 @@ function receive (t, subscriber, expected, done) { test('publish QoS 2', function (t) { t.plan(2) - var s = connect(setup()) var packet = { cmd: 'publish', topic: 'hello', @@ -88,7 +87,10 @@ test('publish QoS 2', function (t) { qos: 2, messageId: 42 } - publish(t, s, packet, t.end.bind(t)) + + var p = connect(setup(), {}, () => { + publish(t, p, packet, t.end.bind(t)) + }) }) test('subscribe QoS 2', function (t) { @@ -108,10 +110,14 @@ test('subscribe QoS 2', function (t) { retain: false } - subscribe(t, subscriber, 'hello', 2, function () { - publish(t, publisher, toPublish) + subscriber.broker.once('clientReady', () => { + publisher.broker.once('clientReady', () => { + subscribe(t, subscriber, 'hello', 2, function () { + publish(t, publisher, toPublish) - receive(t, subscriber, toPublish, t.end.bind(t)) + receive(t, subscriber, toPublish, t.end.bind(t)) + }) + }) }) }) @@ -139,13 +145,13 @@ test('client.publish with clean=true subscribption QoS 2', function (t) { }) }) - var subscriber = connect(setup(broker), { clean: true }) - - subscribe(t, subscriber, 'hello', 2, function () { - t.pass('subscribed') - receive(t, subscriber, toPublish, t.end.bind(t)) - brokerClient.publish(toPublish, function (err) { - t.error(err) + var subscriber = connect(setup(broker), { clean: true }, () => { + subscribe(t, subscriber, 'hello', 2, function () { + t.pass('subscribed') + receive(t, subscriber, toPublish, t.end.bind(t)) + brokerClient.publish(toPublish, function (err) { + t.error(err) + }) }) }) }) @@ -175,10 +181,14 @@ test('call published method with client with QoS 2', function (t) { } } - subscribe(t, subscriber, 'hello', 2, function () { - publish(t, publisher, toPublish) + subscriber.broker.once('clientReady', () => { + publisher.broker.once('clientReady', () => { + subscribe(t, subscriber, 'hello', 2, function () { + publish(t, publisher, toPublish) - receive(t, subscriber, toPublish, t.pass.bind(t)) + receive(t, subscriber, toPublish, t.pass.bind(t)) + }) + }) }) }) @@ -198,19 +208,23 @@ test('subscribe QoS 0, but publish QoS 2', function (t) { retain: false } - subscribe(t, subscriber, 'hello', 0, function () { - subscriber.outStream.once('data', function (packet) { - t.deepEqual(packet, expected, 'packet must match') - }) + subscriber.broker.once('clientReady', () => { + publisher.broker.once('clientReady', () => { + subscribe(t, subscriber, 'hello', 0, function () { + subscriber.outStream.once('data', function (packet) { + t.deepEqual(packet, expected, 'packet must match') + }) - publish(t, publisher, { - cmd: 'publish', - topic: 'hello', - payload: Buffer.from('world'), - qos: 2, - retain: false, - messageId: 42, - dup: false + publish(t, publisher, { + cmd: 'publish', + topic: 'hello', + payload: Buffer.from('world'), + qos: 2, + retain: false, + messageId: 42, + dup: false + }) + }) }) }) broker.on('closed', t.end.bind(t)) @@ -232,20 +246,24 @@ test('subscribe QoS 1, but publish QoS 2', function (t) { retain: false } - subscribe(t, subscriber, 'hello', 1, function () { - subscriber.outStream.once('data', function (packet) { - delete packet.messageId - t.deepEqual(packet, expected, 'packet must match') - }) + subscriber.broker.once('clientReady', () => { + publisher.broker.once('clientReady', () => { + subscribe(t, subscriber, 'hello', 1, function () { + subscriber.outStream.once('data', function (packet) { + delete packet.messageId + t.deepEqual(packet, expected, 'packet must match') + }) - publish(t, publisher, { - cmd: 'publish', - topic: 'hello', - payload: Buffer.from('world'), - qos: 2, - retain: false, - messageId: 42, - dup: false + publish(t, publisher, { + cmd: 'publish', + topic: 'hello', + payload: Buffer.from('world'), + qos: 2, + retain: false, + messageId: 42, + dup: false + }) + }) }) }) broker.on('closed', t.end.bind(t)) @@ -268,17 +286,21 @@ test('restore QoS 2 subscriptions not clean', function (t) { retain: false } - subscribe(t, subscriber, 'hello', 2, function () { - subscriber.inStream.end() + subscriber.broker.once('clientReady', () => { + subscribe(t, subscriber, 'hello', 2, function () { + subscriber.inStream.end() - publisher = connect(setup(broker)) + publisher = connect(setup(broker)) - subscriber = connect(setup(broker), { clean: false, clientId: 'abcde' }, function (connect) { - t.equal(connect.sessionPresent, true, 'session present is set to true') - publish(t, publisher, expected) - }) + publisher.broker.once('clientReady', () => { + subscriber = connect(setup(broker), { clean: false, clientId: 'abcde' }, function (connect) { + t.equal(connect.sessionPresent, true, 'session present is set to true') + publish(t, publisher, expected) + }) - receive(t, subscriber, expected, t.end.bind(t)) + receive(t, subscriber, expected, t.end.bind(t)) + }) + }) }) }) @@ -300,15 +322,19 @@ test('resend publish on non-clean reconnect QoS 2', function (t) { retain: false } - subscribe(t, subscriber, 'hello', 2, function () { - subscriber.inStream.end() + subscriber.broker.once('clientReady', () => { + subscribe(t, subscriber, 'hello', 2, function () { + subscriber.inStream.end() - publisher = connect(setup(broker)) + publisher = connect(setup(broker)) - publish(t, publisher, expected, function () { - subscriber = connect(setup(broker), opts) + publisher.broker.once('clientReady', () => { + publish(t, publisher, expected, function () { + subscriber = connect(setup(broker), opts) - receive(t, subscriber, expected, t.end.bind(t)) + receive(t, subscriber, expected, t.end.bind(t)) + }) + }) }) }) }) @@ -331,57 +357,61 @@ test('resend pubrel on non-clean reconnect QoS 2', function (t) { retain: false } - subscribe(t, subscriber, 'hello', 2, function () { - subscriber.inStream.end() - - publisher = connect(setup(broker)) - - publish(t, publisher, expected, function () { - subscriber = connect(setup(broker), opts) - - subscriber.outStream.once('data', function (packet) { - t.notEqual(packet.messageId, expected.messageId, 'messageId must differ') - - var msgId = packet.messageId - delete packet.messageId - delete expected.messageId - t.deepEqual(packet, expected, 'packet must match') + subscriber.broker.once('clientReady', () => { + subscribe(t, subscriber, 'hello', 2, function () { + subscriber.inStream.end() - subscriber.inStream.write({ - cmd: 'pubrec', - messageId: msgId - }) - - subscriber.outStream.once('data', function (packet) { - t.deepEqual(packet, { - cmd: 'pubrel', - messageId: msgId, - length: 2, - qos: 1, - retain: false, - dup: false - }, 'pubrel must match') - - subscriber.inStream.end() + publisher = connect(setup(broker)) + publisher.broker.once('clientReady', () => { + publish(t, publisher, expected, function () { subscriber = connect(setup(broker), opts) subscriber.outStream.once('data', function (packet) { - t.deepEqual(packet, { - cmd: 'pubrel', - messageId: msgId, - length: 2, - qos: 1, - retain: false, - dup: false - }, 'pubrel must match') + t.notEqual(packet.messageId, expected.messageId, 'messageId must differ') + + var msgId = packet.messageId + delete packet.messageId + delete expected.messageId + t.deepEqual(packet, expected, 'packet must match') subscriber.inStream.write({ - cmd: 'pubcomp', + cmd: 'pubrec', messageId: msgId }) - t.end() + subscriber.outStream.once('data', function (packet) { + t.deepEqual(packet, { + cmd: 'pubrel', + messageId: msgId, + length: 2, + qos: 1, + retain: false, + dup: false + }, 'pubrel must match') + + subscriber.inStream.end() + + subscriber = connect(setup(broker), opts) + + subscriber.outStream.once('data', function (packet) { + t.deepEqual(packet, { + cmd: 'pubrel', + messageId: msgId, + length: 2, + qos: 1, + retain: false, + dup: false + }, 'pubrel must match') + + subscriber.inStream.write({ + cmd: 'pubcomp', + messageId: msgId + }) + + t.end() + }) + }) }) }) }) @@ -416,11 +446,15 @@ test('publish after disconnection', function (t) { retain: false } - subscribe(t, subscriber, 'hello', 2, function () { - publish(t, publisher, toPublish) + subscriber.broker.once('clientReady', () => { + publisher.broker.once('clientReady', () => { + subscribe(t, subscriber, 'hello', 2, function () { + publish(t, publisher, toPublish) - receive(t, subscriber, toPublish, function () { - publish(t, publisher, toPublish2, t.end.bind(t)) + receive(t, subscriber, toPublish, function () { + publish(t, publisher, toPublish2, t.end.bind(t)) + }) + }) }) }) }) @@ -442,10 +476,12 @@ test('multiple publish and store one', function (t) { messageId: 42 } - var count = 5 - while (--count) { - s.inStream.write(toPublish) - } + s.broker.once('clientReady', () => { + var count = 5 + while (--count) { + s.inStream.write(toPublish) + } + }) broker.on('closed', function () { broker.persistence.incomingGetPacket(sid, toPublish, function (err, origPacket) { delete origPacket.brokerId diff --git a/test/regr-21.js b/test/regr-21.js index cc71b10c..2c98175c 100644 --- a/test/regr-21.js +++ b/test/regr-21.js @@ -30,5 +30,7 @@ test('after an error, outstanding packets are discarded', function (t) { s.broker.mq.on('foo', function (msg, cb) { t.fail('msg received') }) - s.inStream.write(packet) + s.broker.once('clientReady', () => { + s.inStream.write(packet) + }) }) diff --git a/test/retain.js b/test/retain.js index 59c8706d..973c8448 100644 --- a/test/retain.js +++ b/test/retain.js @@ -24,21 +24,23 @@ test('live retain packets', function (t) { var s = noError(connect(setup()), t) - subscribe(t, s, 'hello', 0, function () { - s.outStream.on('data', function (packet) { - t.deepEqual(packet, expected) - }) + s.broker.once('clientReady', () => { + subscribe(t, s, 'hello', 0, function () { + s.outStream.on('data', function (packet) { + t.deepEqual(packet, expected) + }) - s.broker.publish({ - cmd: 'publish', - topic: 'hello', - payload: Buffer.from('world'), - retain: true, - dup: false, - length: 12, - qos: 0 - }, function () { - t.pass('publish finished') + s.broker.publish({ + cmd: 'publish', + topic: 'hello', + payload: Buffer.from('world'), + retain: true, + dup: false, + length: 12, + qos: 0 + }, function () { + t.pass('publish finished') + }) }) }) }) @@ -59,22 +61,26 @@ test('retain messages', function (t) { retain: true } - broker.subscribe('hello', function (packet, cb) { - cb() - - // defer this or it will receive the message which - // is being published - setImmediate(function () { - subscribe(t, subscriber, 'hello', 0, function () { - subscriber.outStream.once('data', function (packet) { - t.deepEqual(packet, expected, 'packet must match') - t.end() + subscriber.broker.once('clientReady', () => { + publisher.broker.once('clientReady', () => { + broker.subscribe('hello', function (packet, cb) { + cb() + + // defer this or it will receive the message which + // is being published + setImmediate(function () { + subscribe(t, subscriber, 'hello', 0, function () { + subscriber.outStream.once('data', function (packet) { + t.deepEqual(packet, expected, 'packet must match') + t.end() + }) + }) }) }) + + publisher.inStream.write(expected) }) }) - - publisher.inStream.write(expected) }) test('avoid wrong deduping of retain messages', function (t) { @@ -93,32 +99,36 @@ test('avoid wrong deduping of retain messages', function (t) { retain: true } - broker.subscribe('hello', function (packet, cb) { - cb() - // subscribe and publish another topic - subscribe(t, subscriber, 'hello2', 0, function () { - cb() - - publisher.inStream.write({ - cmd: 'publish', - topic: 'hello2', - payload: Buffer.from('world'), - qos: 0, - dup: false - }) + subscriber.broker.once('clientReady', () => { + publisher.broker.once('clientReady', () => { + broker.subscribe('hello', function (packet, cb) { + cb() + // subscribe and publish another topic + subscribe(t, subscriber, 'hello2', 0, function () { + cb() + + publisher.inStream.write({ + cmd: 'publish', + topic: 'hello2', + payload: Buffer.from('world'), + qos: 0, + dup: false + }) - subscriber.outStream.once('data', function (packet) { - subscribe(t, subscriber, 'hello', 0, function () { subscriber.outStream.once('data', function (packet) { - t.deepEqual(packet, expected, 'packet must match') - t.end() + subscribe(t, subscriber, 'hello', 0, function () { + subscriber.outStream.once('data', function (packet) { + t.deepEqual(packet, expected, 'packet must match') + t.end() + }) + }) }) }) }) + + publisher.inStream.write(expected) }) }) - - publisher.inStream.write(expected) }) test('reconnected subscriber will not receive retained messages when QoS 0 and clean', function (t) { @@ -136,27 +146,32 @@ test('reconnected subscriber will not receive retained messages when QoS 0 and c dup: false, length: 12 } - subscribe(t, subscriber, 'hello', 0, function () { - publisher.inStream.write({ - cmd: 'publish', - topic: 'hello', - payload: 'world', - qos: 0, - retain: false - }) - subscriber.outStream.once('data', function (packet) { - t.deepEqual(packet, expected, 'packet must match') - subscriber.inStream.end() - publisher.inStream.write({ - cmd: 'publish', - topic: 'hello', - payload: 'foo', - qos: 0, - retain: true - }) - subscriber = connect(setup(broker, false), { clean: true }) - subscriber.outStream.on('data', function (packet) { - t.fail('should not received retain message') + + subscriber.broker.once('clientReady', () => { + publisher.broker.once('clientReady', () => { + subscribe(t, subscriber, 'hello', 0, function () { + publisher.inStream.write({ + cmd: 'publish', + topic: 'hello', + payload: 'world', + qos: 0, + retain: false + }) + subscriber.outStream.once('data', function (packet) { + t.deepEqual(packet, expected, 'packet must match') + subscriber.inStream.end() + publisher.inStream.write({ + cmd: 'publish', + topic: 'hello', + payload: 'foo', + qos: 0, + retain: true + }) + subscriber = connect(setup(broker, false), { clean: true }) + subscriber.outStream.on('data', function (packet) { + t.fail('should not received retain message') + }) + }) }) }) }) @@ -169,7 +184,6 @@ test('new QoS 0 subscribers receive QoS 0 retained messages when clean', functio t.plan(9) var broker = aedes() - var publisher = connect(setup(broker), { clean: true }) var expected = { cmd: 'publish', topic: 'hello/world', @@ -179,25 +193,38 @@ test('new QoS 0 subscribers receive QoS 0 retained messages when clean', functio dup: false, length: 26 } - publisher.inStream.write({ - cmd: 'publish', - topic: 'hello/world', - payload: 'big big world', - qos: 0, - retain: true - }) - var subscriber1 = connect(setup(broker, false), { clean: true }) - subscribe(t, subscriber1, 'hello/world', 0, function () { - subscriber1.outStream.on('data', function (packet) { - t.deepEqual(packet, expected, 'packet must match') + + var publisher = connect(setup(broker), { clean: true }) + publisher.broker.once('clientReady', () => { + console.log('publisher') + publisher.inStream.write({ + cmd: 'publish', + topic: 'hello/world', + payload: 'big big world', + qos: 0, + retain: true }) - }) - var subscriber2 = connect(setup(broker, false), { clean: true }) - subscribe(t, subscriber2, 'hello/+', 0, function () { - subscriber2.outStream.on('data', function (packet) { - t.deepEqual(packet, expected, 'packet must match') + + var subscriber1 = connect(setup(broker, false), { clean: true }) + subscriber1.broker.once('clientReady', () => { + console.log('subscriber1') + subscribe(t, subscriber1, 'hello/world', 0, function () { + subscriber1.outStream.on('data', function (packet) { + t.deepEqual(packet, expected, 'packet must match') + }) + }) + }) + var subscriber2 = connect(setup(broker, false), { clean: true }) + subscriber2.broker.once('clientReady', () => { + console.log('subscriber2') + subscribe(t, subscriber2, 'hello/+', 0, function () { + subscriber2.outStream.on('data', function (packet) { + t.deepEqual(packet, expected, 'packet must match') + }) + }) }) }) + broker.on('closed', function () { t.equal(broker.counter, 9) t.end() @@ -219,24 +246,30 @@ test('new QoS 0 subscribers receive downgraded QoS 1 retained messages when clea dup: false, length: 12 } - publisher.inStream.write({ - cmd: 'publish', - topic: 'hello', - payload: 'world', - qos: 1, - retain: true, - messageId: 42 - }) - publisher.outStream.on('data', function (packet) { - var subscriber = connect(setup(broker, false), { clean: true }) - subscribe(t, subscriber, 'hello', 0, function () { - subscriber.outStream.on('data', function (packet) { - t.notEqual(packet.messageId, 42, 'messageId should not be the same') - delete packet.messageId - t.deepEqual(packet, expected, 'packet must match') + + publisher.broker.once('clientReady', () => { + publisher.inStream.write({ + cmd: 'publish', + topic: 'hello', + payload: 'world', + qos: 1, + retain: true, + messageId: 42 + }) + publisher.outStream.on('data', function (packet) { + var subscriber = connect(setup(broker, false), { clean: true }) + subscriber.broker.once('clientReady', () => { + subscribe(t, subscriber, 'hello', 0, function () { + subscriber.outStream.on('data', function (packet) { + t.notEqual(packet.messageId, 42, 'messageId should not be the same') + delete packet.messageId + t.deepEqual(packet, expected, 'packet must match') + }) + }) }) }) }) + broker.on('closed', function () { t.equal(broker.counter, 6) t.end() @@ -249,24 +282,29 @@ test('clean retained messages', function (t) { var broker = aedes() var publisher = connect(setup(broker), { clean: true }) - publisher.inStream.write({ - cmd: 'publish', - topic: 'hello', - payload: 'world', - qos: 0, - retain: true - }) - publisher.inStream.write({ - cmd: 'publish', - topic: 'hello', - payload: '', - qos: 0, - retain: true - }) - var subscriber = connect(setup(broker, false), { clean: true }) - subscribe(t, subscriber, 'hello', 0, function () { - subscriber.outStream.once('data', function (packet) { - t.fail('should not received retain message') + + publisher.broker.once('clientReady', () => { + publisher.inStream.write({ + cmd: 'publish', + topic: 'hello', + payload: 'world', + qos: 0, + retain: true + }) + publisher.inStream.write({ + cmd: 'publish', + topic: 'hello', + payload: '', + qos: 0, + retain: true + }) + var subscriber = connect(setup(broker, false), { clean: true }) + subscriber.broker.once('clientReady', () => { + subscribe(t, subscriber, 'hello', 0, function () { + subscriber.outStream.once('data', function (packet) { + t.fail('should not received retain message') + }) + }) }) }) broker.on('closed', t.end.bind(t)) @@ -279,11 +317,13 @@ test('broker not store zero-byte retained messages', function (t) { var broker = aedes() var s = connect(setup(broker)) - s.inStream.write({ - cmd: 'publish', - topic: 'hello', - payload: '', - retain: true + s.broker.once('clientReady', () => { + s.inStream.write({ + cmd: 'publish', + topic: 'hello', + payload: '', + retain: true + }) }) s.broker.on('publish', function (packet, client) { if (packet.topic.startsWith('$SYS/')) { @@ -311,24 +351,29 @@ test('fail to clean retained messages without retain flag', function (t) { dup: false, length: 12 } - publisher.inStream.write({ - cmd: 'publish', - topic: 'hello', - payload: 'world', - qos: 0, - retain: true - }) - publisher.inStream.write({ - cmd: 'publish', - topic: 'hello', - payload: '', - qos: 0, - retain: false - }) - var subscriber = connect(setup(broker, false), { clean: true }) - subscribe(t, subscriber, 'hello', 0, function () { - subscriber.outStream.on('data', function (packet) { - t.deepEqual(packet, expected, 'packet must match') + + publisher.broker.once('clientReady', () => { + publisher.inStream.write({ + cmd: 'publish', + topic: 'hello', + payload: 'world', + qos: 0, + retain: true + }) + publisher.inStream.write({ + cmd: 'publish', + topic: 'hello', + payload: '', + qos: 0, + retain: false + }) + var subscriber = connect(setup(broker, false), { clean: true }) + subscriber.broker.once('clientReady', () => { + subscribe(t, subscriber, 'hello', 0, function () { + subscriber.outStream.on('data', function (packet) { + t.deepEqual(packet, expected, 'packet must match') + }) + }) }) }) broker.on('closed', t.end.bind(t)) @@ -348,24 +393,29 @@ test('only get the last retained messages in same topic', function (t) { dup: false, length: 10 } - publisher.inStream.write({ - cmd: 'publish', - topic: 'hello', - payload: 'world', - qos: 0, - retain: true - }) - publisher.inStream.write({ - cmd: 'publish', - topic: 'hello', - payload: 'foo', - qos: 0, - retain: true - }) - var subscriber = connect(setup(broker, false), { clean: true }) - subscribe(t, subscriber, 'hello', 0, function () { - subscriber.outStream.on('data', function (packet) { - t.deepEqual(packet, expected, 'packet must match') + + publisher.broker.once('clientReady', () => { + publisher.inStream.write({ + cmd: 'publish', + topic: 'hello', + payload: 'world', + qos: 0, + retain: true + }) + publisher.inStream.write({ + cmd: 'publish', + topic: 'hello', + payload: 'foo', + qos: 0, + retain: true + }) + var subscriber = connect(setup(broker, false), { clean: true }) + subscriber.broker.once('clientReady', () => { + subscribe(t, subscriber, 'hello', 0, function () { + subscriber.outStream.on('data', function (packet) { + t.deepEqual(packet, expected, 'packet must match') + }) + }) }) }) broker.on('closed', t.end.bind(t)) @@ -376,7 +426,6 @@ test('deliver QoS 1 retained messages to new subscriptions', function (t) { var broker = aedes() var publisher = connect(setup(broker)) - var subscriber = connect(setup(broker)) var expected = { cmd: 'publish', topic: 'hello', @@ -387,21 +436,26 @@ test('deliver QoS 1 retained messages to new subscriptions', function (t) { retain: true } - publisher.inStream.write({ - cmd: 'publish', - topic: 'hello', - payload: 'world', - qos: 1, - messageId: 42, - retain: true + publisher.broker.once('clientReady', () => { + publisher.inStream.write({ + cmd: 'publish', + topic: 'hello', + payload: 'world', + qos: 1, + messageId: 42, + retain: true + }) }) publisher.outStream.on('data', function (packet) { - subscribe(t, subscriber, 'hello', 1, function () { - subscriber.outStream.once('data', function (packet) { - delete packet.messageId - t.deepEqual(packet, expected, 'packet must match') - t.end() + var subscriber = connect(setup(broker)) + subscriber.broker.once('clientReady', () => { + subscribe(t, subscriber, 'hello', 1, function () { + subscriber.outStream.once('data', function (packet) { + delete packet.messageId + t.deepEqual(packet, expected, 'packet must match') + t.end() + }) }) }) }) @@ -411,7 +465,6 @@ test('deliver QoS 1 retained messages to established subscriptions', function (t t.plan(4) var broker = aedes() - var publisher = connect(setup(broker)) var subscriber = connect(setup(broker)) var expected = { cmd: 'publish', @@ -423,19 +476,25 @@ test('deliver QoS 1 retained messages to established subscriptions', function (t retain: false } - subscribe(t, subscriber, 'hello', 1, function () { - subscriber.outStream.once('data', function (packet) { - delete packet.messageId - t.deepEqual(packet, expected, 'packet must match') - t.end() - }) - publisher.inStream.write({ - cmd: 'publish', - topic: 'hello', - payload: 'world', - qos: 1, - messageId: 42, - retain: true + subscriber.broker.once('clientReady', () => { + subscribe(t, subscriber, 'hello', 1, function () { + subscriber.outStream.once('data', function (packet) { + delete packet.messageId + t.deepEqual(packet, expected, 'packet must match') + t.end() + }) + var publisher = connect(setup(broker)) + publisher.broker.once('clientReady', () => { + console.log('test') + publisher.inStream.write({ + cmd: 'publish', + topic: 'hello', + payload: 'world', + qos: 1, + messageId: 42, + retain: true + }) + }) }) }) }) @@ -459,9 +518,7 @@ test('deliver QoS 0 retained message with QoS 1 subscription', function (t) { broker.mq.on('hello', function (msg, cb) { cb() - // defer this or it will receive the message which - // is being published - setImmediate(function () { + subscriber.broker.once('clientReady', () => { subscribe(t, subscriber, 'hello', 1, function () { subscriber.outStream.once('data', function (packet) { t.deepEqual(packet, expected, 'packet must match') @@ -471,13 +528,15 @@ test('deliver QoS 0 retained message with QoS 1 subscription', function (t) { }) }) - publisher.inStream.write({ - cmd: 'publish', - topic: 'hello', - payload: Buffer.from('world'), - qos: 0, - messageId: 42, - retain: true + publisher.broker.once('clientReady', () => { + publisher.inStream.write({ + cmd: 'publish', + topic: 'hello', + payload: Buffer.from('world'), + qos: 0, + messageId: 42, + retain: true + }) }) }) @@ -497,47 +556,51 @@ test('disconnect and retain messages with QoS 1 [clean=false]', function (t) { retain: true } - subscribe(t, subscriber, 'hello', 1, function () { - subscriber.inStream.write({ - cmd: 'disconnect' - }) - - subscriber.outStream.on('data', function (packet) { - console.log('original', packet) - }) - - publisher = connect(setup(broker)) + subscriber.broker.once('clientReady', () => { + subscribe(t, subscriber, 'hello', 1, function () { + subscriber.inStream.write({ + cmd: 'disconnect' + }) - publisher.inStream.write({ - cmd: 'publish', - topic: 'hello', - payload: 'world', - qos: 1, - messageId: 42, - retain: true - }) + subscriber.outStream.on('data', function (packet) { + console.log('original', packet) + }) - publisher.outStream.once('data', function (packet) { - t.equal(packet.cmd, 'puback') + publisher = connect(setup(broker)) - broker.on('clientError', function (client, err) { - t.equal(err.message, 'connection closed') + publisher.broker.once('clientReady', () => { + publisher.inStream.write({ + cmd: 'publish', + topic: 'hello', + payload: 'world', + qos: 1, + messageId: 42, + retain: true + }) }) - subscriber = connect(setup(broker), { clean: false, clientId: 'abcde' }, function (connect) { - t.equal(connect.sessionPresent, true, 'session present is set to true') - }) + publisher.outStream.once('data', function (packet) { + t.equal(packet.cmd, 'puback') - subscriber.outStream.once('data', function (packet) { - // receive any queued messages (no matter they are retained messages) at the disconnected time - t.notEqual(packet.messageId, 42, 'messageId must differ') - delete packet.messageId - packet.length = 14 - t.deepEqual(packet, expected, 'packet must match') + broker.on('clientError', function (client, err) { + t.equal(err.message, 'connection closed') + }) + + subscriber = connect(setup(broker), { clean: false, clientId: 'abcde' }, function (connect) { + t.equal(connect.sessionPresent, true, 'session present is set to true') + }) - // there should be no messages come from restored subscriptions subscriber.outStream.once('data', function (packet) { - t.fail('should not receive any more messages') + // receive any queued messages (no matter they are retained messages) at the disconnected time + t.notEqual(packet.messageId, 42, 'messageId must differ') + delete packet.messageId + packet.length = 14 + t.deepEqual(packet, expected, 'packet must match') + + // there should be no messages come from restored subscriptions + subscriber.outStream.once('data', function (packet) { + t.fail('should not receive any more messages') + }) }) }) }) @@ -560,73 +623,77 @@ test('disconnect and two retain messages with QoS 1 [clean=false]', function (t) retain: true } - subscribe(t, subscriber, 'hello', 1, function () { - subscriber.inStream.write({ - cmd: 'disconnect' - }) - - subscriber.outStream.on('data', function (packet) { - console.log('original', packet) - }) - - publisher = connect(setup(broker)) + subscriber.broker.once('clientReady', () => { + subscribe(t, subscriber, 'hello', 1, function () { + subscriber.inStream.write({ + cmd: 'disconnect' + }) - publisher.inStream.write({ - cmd: 'publish', - topic: 'hello', - payload: 'world', - qos: 1, - messageId: 41, - retain: true - }) + subscriber.outStream.on('data', function (packet) { + console.log('original', packet) + }) - publisher.outStream.once('data', function (packet) { - t.equal(packet.cmd, 'puback') + publisher = connect(setup(broker)) - publisher.inStream.write({ - cmd: 'publish', - topic: 'hello', - payload: 'world2', - qos: 1, - messageId: 42, - retain: true + publisher.broker.once('clientReady', () => { + publisher.inStream.write({ + cmd: 'publish', + topic: 'hello', + payload: 'world', + qos: 1, + messageId: 41, + retain: true + }) }) publisher.outStream.once('data', function (packet) { t.equal(packet.cmd, 'puback') - broker.on('clientError', function (client, err) { - t.equal(err.message, 'connection closed') + publisher.inStream.write({ + cmd: 'publish', + topic: 'hello', + payload: 'world2', + qos: 1, + messageId: 42, + retain: true }) - subscriber = connect(setup(broker), { clean: false, clientId: 'abcde' }, function (connect) { - t.equal(connect.sessionPresent, true, 'session present is set to true') - }) + publisher.outStream.once('data', function (packet) { + t.equal(packet.cmd, 'puback') - subscriber.outStream.once('data', function (packet) { - // receive any queued messages (included retained messages) at the disconnected time - t.notEqual(packet.messageId, 41, 'messageId must differ') - delete packet.messageId - packet.length = 14 - expected.payload = Buffer.from('world') - t.deepEqual(packet, expected, 'packet must match') + broker.on('clientError', function (client, err) { + t.equal(err.message, 'connection closed') + }) + + subscriber = connect(setup(broker), { clean: false, clientId: 'abcde' }, function (connect) { + t.equal(connect.sessionPresent, true, 'session present is set to true') + }) - // receive any queued messages (included retained messages) at the disconnected time subscriber.outStream.once('data', function (packet) { - t.notEqual(packet.messageId, 42, 'messageId must differ') + // receive any queued messages (included retained messages) at the disconnected time + t.notEqual(packet.messageId, 41, 'messageId must differ') delete packet.messageId packet.length = 14 - expected.payload = Buffer.from('world2') + expected.payload = Buffer.from('world') t.deepEqual(packet, expected, 'packet must match') - // should get the last retained message when we do a subscribe - subscribe(t, subscriber, 'hello', 1, function () { - subscriber.outStream.on('data', function (packet) { - t.notEqual(packet.messageId, 42, 'messageId must differ') - delete packet.messageId - packet.length = 14 - expected.payload = Buffer.from('world2') - t.deepEqual(packet, expected, 'packet must match') + // receive any queued messages (included retained messages) at the disconnected time + subscriber.outStream.once('data', function (packet) { + t.notEqual(packet.messageId, 42, 'messageId must differ') + delete packet.messageId + packet.length = 14 + expected.payload = Buffer.from('world2') + t.deepEqual(packet, expected, 'packet must match') + + // should get the last retained message when we do a subscribe + subscribe(t, subscriber, 'hello', 1, function () { + subscriber.outStream.on('data', function (packet) { + t.notEqual(packet.messageId, 42, 'messageId must differ') + delete packet.messageId + packet.length = 14 + expected.payload = Buffer.from('world2') + t.deepEqual(packet, expected, 'packet must match') + }) }) }) }) diff --git a/test/topics.js b/test/topics.js index 5c46f574..53815a19 100644 --- a/test/topics.js +++ b/test/topics.js @@ -14,16 +14,18 @@ test('publish empty topic', function (t) { var s = connect(setup()) - subscribe(t, s, '#', 0, function () { - s.outStream.once('data', function (packet) { - t.fail('no packet') - t.end() - }) + s.broker.once('clientReady', () => { + subscribe(t, s, '#', 0, function () { + s.outStream.once('data', function (packet) { + t.fail('no packet') + t.end() + }) - s.inStream.write({ - cmd: 'publish', - topic: '', - payload: 'world' + s.inStream.write({ + cmd: 'publish', + topic: '', + payload: 'world' + }) }) }) @@ -38,16 +40,18 @@ test('publish invalid topic with #', function (t) { var s = connect(setup()) - subscribe(t, s, '#', 0, function () { - s.outStream.once('data', function (packet) { - t.fail('no packet') - t.end() - }) + s.broker.once('clientReady', () => { + subscribe(t, s, '#', 0, function () { + s.outStream.once('data', function (packet) { + t.fail('no packet') + t.end() + }) - s.inStream.write({ - cmd: 'publish', - topic: 'hello/#', - payload: 'world' + s.inStream.write({ + cmd: 'publish', + topic: 'hello/#', + payload: 'world' + }) }) }) @@ -61,15 +65,17 @@ test('publish invalid topic with +', function (t) { var s = connect(setup()) - subscribe(t, s, '#', 0, function () { - s.outStream.once('data', function (packet) { - t.fail('no packet') - }) + s.broker.once('clientReady', () => { + subscribe(t, s, '#', 0, function () { + s.outStream.once('data', function (packet) { + t.fail('no packet') + }) - s.inStream.write({ - cmd: 'publish', - topic: 'hello/+/eee', - payload: 'world' + s.inStream.write({ + cmd: 'publish', + topic: 'hello/+/eee', + payload: 'world' + }) }) }) @@ -88,13 +94,15 @@ test('publish invalid topic with +', function (t) { t.end() }) - s.inStream.write({ - cmd: 'subscribe', - messageId: 24, - subscriptions: [{ - topic: topic, - qos: 0 - }] + s.broker.once('clientReady', () => { + s.inStream.write({ + cmd: 'subscribe', + messageId: 24, + subscriptions: [{ + topic: topic, + qos: 0 + }] + }) }) }) @@ -107,10 +115,12 @@ test('publish invalid topic with +', function (t) { t.end() }) - s.inStream.write({ - cmd: 'unsubscribe', - messageId: 24, - unsubscriptions: [topic] + s.broker.once('clientReady', () => { + s.inStream.write({ + cmd: 'unsubscribe', + messageId: 24, + unsubscriptions: [topic] + }) }) }) }) @@ -131,17 +141,21 @@ test('topics are case-sensitive', function (t) { retain: false } - subscribe(t, subscriber, 'hello', 0, function () { - subscriber.outStream.on('data', function (packet) { - t.deepEqual(packet, expected, 'packet mush match') - }) - ;['hello', 'HELLO', 'heLLo', 'HELLO/#', 'hello/+'].forEach(function (topic) { - publisher.inStream.write({ - cmd: 'publish', - topic: topic, - payload: 'world', - qos: 0, - retain: false + subscriber.broker.once('clientReady', () => { + publisher.broker.once('clientReady', () => { + subscribe(t, subscriber, 'hello', 0, function () { + subscriber.outStream.on('data', function (packet) { + t.deepEqual(packet, expected, 'packet mush match') + }) + ;['hello', 'HELLO', 'heLLo', 'HELLO/#', 'hello/+'].forEach(function (topic) { + publisher.inStream.write({ + cmd: 'publish', + topic: topic, + payload: 'world', + qos: 0, + retain: false + }) + }) }) }) }) @@ -192,10 +206,13 @@ test('Overlapped topics with same QoS', function (t) { var sub = [ { topic: 'hello/world', qos: 1 }, { topic: 'hello/#', qos: 1 }] - subscribeMultipleTopics(t, broker, 1, subscriber, sub, function () { - subscriber.outStream.on('data', function (packet) { - delete packet.messageId - t.deepEqual(packet, expected, 'packet must match') + + subscriber.broker.once('clientReady', () => { + subscribeMultipleTopics(t, broker, 1, subscriber, sub, function () { + subscriber.outStream.on('data', function (packet) { + delete packet.messageId + t.deepEqual(packet, expected, 'packet must match') + }) }) }) broker.on('closed', t.end.bind(t)) @@ -219,10 +236,13 @@ test('deliver overlapped topics respecting the maximum QoS of all the matching s var sub = [ { topic: 'hello/world', qos: 0 }, { topic: 'hello/#', qos: 2 }] - subscribeMultipleTopics(t, broker, 0, subscriber, sub, function () { - subscriber.outStream.on('data', function (packet) { - delete packet.messageId - t.deepEqual(packet, expected, 'packet must match') + + subscriber.broker.once('clientReady', () => { + subscribeMultipleTopics(t, broker, 0, subscriber, sub, function () { + subscriber.outStream.on('data', function (packet) { + delete packet.messageId + t.deepEqual(packet, expected, 'packet must match') + }) }) }) broker.on('closed', t.end.bind(t)) @@ -238,9 +258,12 @@ test('deliver overlapped topics respecting the maximum QoS of all the matching s var sub = [ { topic: 'hello/world', qos: 0 }, { topic: 'hello/#', qos: 2 }] - subscribeMultipleTopics(t, broker, 2, subscriber, sub, function () { - subscriber.outStream.on('data', function () { - t.fail('should receive messages with the maximum QoS') + + subscriber.broker.once('clientReady', () => { + subscribeMultipleTopics(t, broker, 2, subscriber, sub, function () { + subscriber.outStream.on('data', function () { + t.fail('should receive messages with the maximum QoS') + }) }) }) broker.on('closed', t.end.bind(t)) @@ -263,9 +286,12 @@ test('Overlapped topics with QoS downgrade', function (t) { var sub = [ { topic: 'hello/world', qos: 1 }, { topic: 'hello/#', qos: 1 }] - subscribeMultipleTopics(t, broker, 0, subscriber, sub, function () { - subscriber.outStream.on('data', function (packet) { - t.deepEqual(packet, expected, 'packet must match') + + subscriber.broker.once('clientReady', () => { + subscribeMultipleTopics(t, broker, 0, subscriber, sub, function () { + subscriber.outStream.on('data', function (packet) { + t.deepEqual(packet, expected, 'packet must match') + }) }) }) broker.on('closed', t.end.bind(t))