Skip to content

Commit

Permalink
fix: prevent packets leaks in cluster env
Browse files Browse the repository at this point in the history
Fixes #775
  • Loading branch information
robertsLando committed Sep 14, 2022
1 parent 4ba1aac commit 7a3b6f7
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 2 deletions.
22 changes: 22 additions & 0 deletions aedes.js
Original file line number Diff line number Diff line change
Expand Up @@ -79,10 +79,19 @@ function Aedes (opts) {
this.brokers = {}

const heartbeatTopic = $SYS_PREFIX + that.id + '/heartbeat'
const birthTopic = $SYS_PREFIX + that.id + '/birth'

this._heartbeatInterval = setInterval(heartbeat, opts.heartbeatInterval)

const bufId = Buffer.from(that.id, 'utf8')

// in a cluster env this is used to warn other broker instances
// that this broker is alive
that.publish({
topic: birthTopic,
payload: bufId
}, noop)

function heartbeat () {
that.publish({
topic: heartbeatTopic,
Expand Down Expand Up @@ -143,6 +152,19 @@ function Aedes (opts) {
done()
})

this.mq.on($SYS_PREFIX + '+/birth', function brokerBorn (packet, done) {
const brokerId = packet.payload.toString()

// reset duplicates counter
if (brokerId !== that.id) {
for (const clientId in that.clients) {
delete that.clients[clientId].duplicates[brokerId]
}
}

done()
})

this.mq.on($SYS_PREFIX + '+/new/clients', function closeSameClients (packet, done) {
const serverId = packet.topic.split('/')[1]
const clientId = packet.payload.toString()
Expand Down
4 changes: 2 additions & 2 deletions test/retain.js
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,7 @@ test('new QoS 0 subscribers receive QoS 0 retained messages when clean', functio
})

clock.setTimeout(() => {
t.equal(broker.counter, 8)
t.equal(broker.counter, 9)
}, 200)
})

Expand Down Expand Up @@ -315,7 +315,7 @@ test('new QoS 0 subscribers receive downgraded QoS 1 retained messages when clea
})
})
broker.on('closed', function () {
t.equal(broker.counter, 7)
t.equal(broker.counter, 8)
})
})

Expand Down

0 comments on commit 7a3b6f7

Please sign in to comment.