From 7a3b6f7c24441be326187bf5a3147d86430366fc Mon Sep 17 00:00:00 2001 From: Daniel Lando Date: Wed, 14 Sep 2022 17:43:10 +0200 Subject: [PATCH 1/3] fix: prevent packets leaks in cluster env Fixes #775 --- aedes.js | 22 ++++++++++++++++++++++ test/retain.js | 4 ++-- 2 files changed, 24 insertions(+), 2 deletions(-) diff --git a/aedes.js b/aedes.js index e7de15fa..2aade298 100644 --- a/aedes.js +++ b/aedes.js @@ -79,10 +79,19 @@ function Aedes (opts) { this.brokers = {} const heartbeatTopic = $SYS_PREFIX + that.id + '/heartbeat' + const birthTopic = $SYS_PREFIX + that.id + '/birth' + this._heartbeatInterval = setInterval(heartbeat, opts.heartbeatInterval) const bufId = Buffer.from(that.id, 'utf8') + // in a cluster env this is used to warn other broker instances + // that this broker is alive + that.publish({ + topic: birthTopic, + payload: bufId + }, noop) + function heartbeat () { that.publish({ topic: heartbeatTopic, @@ -143,6 +152,19 @@ function Aedes (opts) { done() }) + this.mq.on($SYS_PREFIX + '+/birth', function brokerBorn (packet, done) { + const brokerId = packet.payload.toString() + + // reset duplicates counter + if (brokerId !== that.id) { + for (const clientId in that.clients) { + delete that.clients[clientId].duplicates[brokerId] + } + } + + done() + }) + this.mq.on($SYS_PREFIX + '+/new/clients', function closeSameClients (packet, done) { const serverId = packet.topic.split('/')[1] const clientId = packet.payload.toString() diff --git a/test/retain.js b/test/retain.js index 62ad687a..4f840d5f 100644 --- a/test/retain.js +++ b/test/retain.js @@ -275,7 +275,7 @@ test('new QoS 0 subscribers receive QoS 0 retained messages when clean', functio }) clock.setTimeout(() => { - t.equal(broker.counter, 8) + t.equal(broker.counter, 9) }, 200) }) @@ -315,7 +315,7 @@ test('new QoS 0 subscribers receive downgraded QoS 1 retained messages when clea }) }) broker.on('closed', function () { - t.equal(broker.counter, 7) + t.equal(broker.counter, 8) }) }) From 23c2bc654a2475b6e9f82416c6156e0ab3bc8626 Mon Sep 17 00:00:00 2001 From: Daniel Lando Date: Wed, 14 Sep 2022 17:54:21 +0200 Subject: [PATCH 2/3] fix: add test for birth --- test/events.js | 23 +++++++++++++++++++++++ 1 file changed, 23 insertions(+) diff --git a/test/events.js b/test/events.js index 48225f5e..67e45c55 100644 --- a/test/events.js +++ b/test/events.js @@ -1,6 +1,7 @@ 'use strict' const { test } = require('tap') +const mqemitter = require('mqemitter') const { setup, connect, subscribe } = require('./helper') const aedes = require('../') @@ -16,9 +17,31 @@ test('publishes an hearbeat', function (t) { const id = message.topic.match(/\$SYS\/([^/]+)\/heartbeat/)[1] t.equal(id, broker.id, 'broker id matches') t.same(message.payload.toString(), id, 'message has id as the payload') + cb() }) }) +test('publishes birth', function (t) { + t.plan(2) + + const mq = mqemitter() + const brokerId = 'test-broker' + + mq.on('$SYS/+/birth', function (message, cb) { + const id = message.topic.match(/\$SYS\/([^/]+)\/birth/)[1] + t.equal(id, brokerId, 'broker id matches') + t.same(message.payload.toString(), id, 'message has id as the payload') + cb() + }) + + const broker = aedes({ + id: brokerId, + mq + }) + + t.teardown(broker.close.bind(broker)) +}) + ;['$mcollina', '$SYS'].forEach(function (topic) { test('does not forward $ prefixed topics to # subscription - ' + topic, function (t) { t.plan(4) From c6e40c2338786aabd63aa90718829f7df950bf7e Mon Sep 17 00:00:00 2001 From: Daniel Lando Date: Thu, 15 Sep 2022 08:55:50 +0200 Subject: [PATCH 3/3] fix: reset test --- test/events.js | 30 ++++++++++++++++++++++++------ 1 file changed, 24 insertions(+), 6 deletions(-) diff --git a/test/events.js b/test/events.js index 67e45c55..40191395 100644 --- a/test/events.js +++ b/test/events.js @@ -22,15 +22,16 @@ test('publishes an hearbeat', function (t) { }) test('publishes birth', function (t) { - t.plan(2) + t.plan(4) const mq = mqemitter() const brokerId = 'test-broker' + const fakeBroker = 'fake-broker' + const clientId = 'test-client' - mq.on('$SYS/+/birth', function (message, cb) { - const id = message.topic.match(/\$SYS\/([^/]+)\/birth/)[1] - t.equal(id, brokerId, 'broker id matches') - t.same(message.payload.toString(), id, 'message has id as the payload') + mq.on(`$SYS/${brokerId}/birth`, (message, cb) => { + t.pass('broker birth received') + t.same(message.payload.toString(), brokerId, 'message has id as the payload') cb() }) @@ -39,7 +40,24 @@ test('publishes birth', function (t) { mq }) - t.teardown(broker.close.bind(broker)) + broker.on('client', (client) => { + t.equal(client.id, clientId, 'client connected') + // set a fake counter on a fake broker + process.nextTick(() => { + broker.clients[clientId].duplicates[fakeBroker] = 42 + mq.emit({ topic: `$SYS/${fakeBroker}/birth`, payload: Buffer.from(fakeBroker) }) + }) + }) + + mq.on(`$SYS/${fakeBroker}/birth`, (message, cb) => { + process.nextTick(() => { + t.equal(!!broker.clients[clientId].duplicates[fakeBroker], false, 'client duplicates has been resetted') + cb() + }) + }) + + const s = connect(setup(broker), { clientId }) + t.teardown(s.broker.close.bind(s.broker)) }) ;['$mcollina', '$SYS'].forEach(function (topic) {