Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: keepalive causes a reconnect loop when connection is lost #1779

Merged
merged 2 commits into from
Jan 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 40 additions & 7 deletions example.ts
Original file line number Diff line number Diff line change
@@ -1,25 +1,58 @@
import mqtt from '.'

const client = mqtt.connect('mqtt://test.mosquitto.org')
const client = mqtt.connect('mqtt://test.mosquitto.org', {
keepalive: 10,
reconnectPeriod: 15000,
})

const testTopic = 'presence'

client.subscribe(testTopic, (err) => {
if (!err) {
console.log('subscribed to', testTopic)
client.publish(testTopic, 'Hello mqtt', (err2) => {
function publish() {
client.publish(
testTopic,
`Hello mqtt ${new Date().toISOString()}`,
(err2) => {
if (!err2) {
console.log('message published')
} else {
console.error(err2)
}
})
},
)
}

client.subscribe(testTopic, (err) => {
if (!err) {
console.log('subscribed to', testTopic)
} else {
console.error(err)
}
})

client.on('message', (topic, message) => {
console.log('received message "%s" from topic "%s"', message, topic)
client.end()
setTimeout(() => {
publish()
}, 2000)
})

client.on('error', (err) => {
console.error(err)
})

client.on('connect', () => {
console.log('connected')
publish()
})

client.on('disconnect', () => {
console.log('disconnected')
})

client.on('offline', () => {
console.log('offline')
})

client.on('reconnect', () => {
console.log('reconnect')
})
18 changes: 9 additions & 9 deletions src/lib/PingTimer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,7 @@ export default class PingTimer {
constructor(keepalive: number, checkPing: () => void) {
this.keepalive = keepalive * 1000
this.checkPing = checkPing
this.setup()
}

private setup() {
this.timer = this._setTimeout(() => {
this.checkPing()
this.reschedule()
}, this.keepalive)
this.reschedule()
}

clear() {
Expand All @@ -40,6 +33,13 @@ export default class PingTimer {

reschedule() {
this.clear()
this.setup()
this.timer = this._setTimeout(() => {
this.checkPing()
// prevent possible race condition where the timer is destroyed on _cleauUp
// and recreated here
if (this.timer) {
this.reschedule()
}
}, this.keepalive)
}
}
8 changes: 4 additions & 4 deletions src/lib/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -639,7 +639,7 @@ export default class MqttClient extends TypedEventEmitter<MqttClientEventCallbac
clearTimeout(this.connackTimer)

this.log('close :: clearing ping timer')
if (this.pingTimer !== null) {
if (this.pingTimer) {
this.pingTimer.clear()
this.pingTimer = null
}
Expand Down Expand Up @@ -1752,15 +1752,15 @@ export default class MqttClient extends TypedEventEmitter<MqttClientEventCallbac
})
}

if (!this.disconnecting) {
if (!this.disconnecting && !this.reconnecting) {
this.log(
'_cleanUp :: client not disconnecting. Clearing and resetting reconnect.',
'_cleanUp :: client not disconnecting/reconnecting. Clearing and resetting reconnect.',
)
this._clearReconnect()
this._setupReconnect()
}

if (this.pingTimer !== null) {
if (this.pingTimer) {
this.log('_cleanUp :: clearing pingTimer')
this.pingTimer.clear()
this.pingTimer = null
Expand Down
53 changes: 53 additions & 0 deletions test/pingTimer.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
import { afterEach, beforeEach, describe, it } from 'node:test'
import PingTimer from '../src/lib/PingTimer'
import { assert } from 'chai'
import { useFakeTimers, spy } from 'sinon'

describe('PingTimer', () => {
let clock: sinon.SinonFakeTimers
beforeEach(() => {
clock = useFakeTimers()
})

afterEach(() => {
clock.restore()
})

it('should schedule and clear', () => {
const keepalive = 10 // seconds
const cb = spy()
const pingTimer = new PingTimer(keepalive, cb)

assert.ok(pingTimer['timer'], 'timer should be created automatically')

clock.tick(keepalive * 1000 + 1)
assert.equal(
cb.callCount,
1,
'should trigger the callback after keepalive seconds',
)
clock.tick(keepalive * 1000 + 1)
assert.equal(cb.callCount, 2, 'should reschedule automatically')
pingTimer.clear()
assert.ok(!pingTimer['timer'], 'timer should not exists after clear()')
})

it('should not re-schedule if timer has been cleared in check ping', () => {
const keepalive = 10 // seconds
const cb = spy()
const pingTimer = new PingTimer(keepalive, () => {
pingTimer.clear()
cb()
})

clock.tick(keepalive * 1000 + 1)
assert.equal(
cb.callCount,
1,
'should trigger the callback after keepalive seconds',
)
clock.tick(keepalive * 1000 + 1)
assert.equal(cb.callCount, 1, 'should not re-schedule')
assert.ok(!pingTimer['timer'], 'timer should not exists')
})
})
Loading