From 5d9bf1004ba76098d4ae315fa7a4b44a9d26750b Mon Sep 17 00:00:00 2001 From: Daniel Lando Date: Mon, 26 Feb 2024 16:24:02 +0100 Subject: [PATCH] feat: emit `Keepalive timeout` error and speed up tests using fake timers (#1798) * chore: speed up tests using fake timers * fix: typo * fix: improve some other tests * fix: improve flush test * fix: remove clock --- src/lib/client.ts | 1 + test/abstract_client.ts | 140 ++++++++++++++++++++++++++++++++-------- test/client.ts | 63 +++++++++++------- 3 files changed, 153 insertions(+), 51 deletions(-) diff --git a/src/lib/client.ts b/src/lib/client.ts index 2378e0fd6..be16f8d81 100644 --- a/src/lib/client.ts +++ b/src/lib/client.ts @@ -2100,6 +2100,7 @@ export default class MqttClient extends TypedEventEmitter { @@ -1342,7 +1343,7 @@ export default function abstractTest(server, config, ports) { const qosTests = [0, 1, 2] qosTests.forEach((qos) => { - it(`should publish 10 QoS ${qos}and receive them only when \`handleMessage\` finishes`, function _test(t, done) { + it(`should publish 10 QoS ${qos} and receive them only when \`handleMessage\` finishes`, function _test(t, done) { testQosHandleMessage(qos, done) }) }) @@ -2048,40 +2049,83 @@ export default function abstractTest(server, config, ports) { }) it( - 'should reconnect if pingresp is not sent', + 'should reconnect on keepalive timeout', { - timeout: 4000, + timeout: 10000, }, function _test(t, done) { - const client = connect({ keepalive: 1, reconnectPeriod: 100 }) + const clock = sinon.useFakeTimers() - // Fake no pingresp being send by stubbing the _handlePingresp function - client.on('packetreceive', (packet) => { - if (packet.cmd === 'pingresp') { - setImmediate(() => { - client.pingResp = false - }) + t.after(() => { + clock.restore() + if (client) { + client.end(true) + throw new Error('Test timed out') } }) + let client = connect({ + keepalive: 60, + reconnectPeriod: 5000, + }) + client.once('connect', () => { - client.once('connect', () => { - client.end(true, done) + client.pingResp = false + + client.once('error', (err) => { + assert.equal(err.message, 'Keepalive timeout') + client.once('connect', () => { + client.end(true, done) + client = null + }) + }) + + client.once('close', () => { + // Wait for the reconnect to happen + clock.tick(client.options.reconnectPeriod) }) + + clock.tick(client.options.keepalive * 1000) }) }, ) - it('should not reconnect if pingresp is successful', function _test(t, done) { - const client = connect({ keepalive: 100 }) - client.once('close', () => { - done(new Error('Client closed connection')) - }) - setTimeout(() => { - client.removeAllListeners('close') - client.end(true, done) - }, 1000) - }) + it( + 'should not reconnect if pingresp is successful', + { timeout: 1000 }, + function _test(t, done) { + const clock = sinon.useFakeTimers() + + t.after(() => { + clock.restore() + if (client) { + client.end(true) + } + }) + + let client = connect({ keepalive: 10 }) + client.once('close', () => { + done(new Error('Client closed connection')) + }) + + client.once('connect', () => { + setImmediate(() => { + // make keepalive check trigger + clock.tick(client.options.keepalive * 1000) + }) + + client.on('packetsend', (packet) => { + if (packet.cmd === 'pingreq') { + client.removeAllListeners('close') + client.end(true, done) + client = null + } + }) + + clock.tick(1) + }) + }, + ) it('should defer the next ping when sending a control packet', function _test(t, done) { const client = connect({ keepalive: 1 }) @@ -2866,13 +2910,22 @@ export default function abstractTest(server, config, ports) { }) it('should reconnect after stream disconnect', function _test(t, done) { - const client = connect() + const clock = sinon.useFakeTimers() + + t.after(() => { + clock.restore() + }) + + const client = connect({ reconnectPeriod: 1000 }) let tryReconnect = true client.on('connect', () => { if (tryReconnect) { client.stream.end() + client.once('close', () => { + clock.tick(client.options.reconnectPeriod) + }) tryReconnect = false } else { client.end(true, done) @@ -2881,7 +2934,15 @@ export default function abstractTest(server, config, ports) { }) it("should emit 'reconnect' when reconnecting", function _test(t, done) { - const client = connect() + const clock = sinon.useFakeTimers() + + t.after(() => { + clock.restore() + }) + + const client = connect({ + reconnectPeriod: 1000, + }) let tryReconnect = true let reconnectEvent = false @@ -2892,6 +2953,9 @@ export default function abstractTest(server, config, ports) { client.on('connect', () => { if (tryReconnect) { client.stream.end() + client.once('close', () => { + clock.tick(client.options.reconnectPeriod) + }) tryReconnect = false } else { assert.isTrue(reconnectEvent) @@ -2901,7 +2965,14 @@ export default function abstractTest(server, config, ports) { }) it("should emit 'offline' after going offline", function _test(t, done) { - const client = connect() + const clock = sinon.useFakeTimers() + + t.after(() => { + clock.restore() + }) + const client = connect({ + reconnectPeriod: 1000, + }) let tryReconnect = true let offlineEvent = false @@ -2914,6 +2985,9 @@ export default function abstractTest(server, config, ports) { if (tryReconnect) { client.stream.end() tryReconnect = false + client.once('close', () => { + clock.tick(client.options.reconnectPeriod) + }) } else { assert.isTrue(offlineEvent) client.end(true, done) @@ -2956,18 +3030,28 @@ export default function abstractTest(server, config, ports) { timeout: 10000, }, function _test(t, done) { + const clock = sinon.useFakeTimers() + + t.after(() => { + clock.restore() + }) + let end const reconnectSlushTime = 200 const client = connect({ reconnectPeriod: test.period }) let reconnect = false - const start = Date.now() + const start = clock.now client.on('connect', () => { if (!reconnect) { client.stream.end() + client.once('close', () => { + // ensure the tick is done after the reconnect timer is setup (on close) + clock.tick(test.period) + }) reconnect = true } else { - end = Date.now() + end = clock.now client.end(() => { const reconnectPeriodDuringTest = end - start if ( diff --git a/test/client.ts b/test/client.ts index 706219be8..07413f1de 100644 --- a/test/client.ts +++ b/test/client.ts @@ -1,3 +1,4 @@ +import { useFakeTimers } from 'sinon' import mqtt from '../src' import { assert } from 'chai' import { fork } from 'child_process' @@ -181,7 +182,7 @@ describe('MqttClient', () => { host: 'localhost', keepalive: 1, connectTimeout: 350, - reconnectPeriod: 0, + reconnectPeriod: 0, // disable reconnect }) client.once('connect', () => { client.publish( @@ -189,26 +190,36 @@ describe('MqttClient', () => { 'fakeMessage', { qos: 1 }, (err) => { + // connection closed assert.exists(err) pubCallbackCalled = true }, ) client.unsubscribe('fakeTopic', (err, result) => { + // connection closed assert.exists(err) unsubscribeCallbackCalled = true }) - setTimeout(() => { - client.end((err1) => { - assert.strictEqual( - pubCallbackCalled && unsubscribeCallbackCalled, - true, - 'callbacks not invoked', - ) - server2.close((err2) => { - done(err1 || err2) + + client.once('error', (err) => { + assert.equal(err.message, 'Keepalive timeout') + const originalFLush = client['_flush'] + // flush will be called on _cleanUp because of keepalive timeout + client['_flush'] = function _flush() { + originalFLush.call(client) + client.end((err1) => { + assert.strictEqual( + pubCallbackCalled && + unsubscribeCallbackCalled, + true, + 'callbacks should be invoked with error', + ) + server2.close((err2) => { + done(err1 || err2) + }) }) - }) - }, 5000) + } + }) }) }, ) @@ -218,7 +229,7 @@ describe('MqttClient', () => { it( 'should attempt to reconnect once server is down', { - timeout: 30000, + timeout: 5000, }, function _test(t, done) { const args = ['-r', 'ts-node/register'] @@ -344,7 +355,7 @@ describe('MqttClient', () => { it( 'should not keep requeueing the first message when offline', { - timeout: 2500, + timeout: 1000, }, function _test(t, done) { const server2 = serverBuilder('mqtt').listen(ports.PORTAND45) @@ -365,16 +376,22 @@ describe('MqttClient', () => { }) }) - setTimeout(() => { - if (client.queue.length === 0) { - debug('calling final client.end()') - client.end(true, (err) => done(err)) - } else { - debug('calling client.end()') - // Do not call done. We want to trigger a reconnect here. - client.end(true) + let reconnections = 0 + + client.on('reconnect', () => { + reconnections++ + if (reconnections === 2) { + if (client.queue.length === 0) { + debug('calling final client.end()') + client.end(true, (err) => done(err)) + } else { + debug('calling client.end()') + // Do not call done. We want to trigger a reconnect here. + client.end(true) + done(Error('client queue not empty')) + } } - }, 2000) + }) }, )