diff --git a/aedes.js b/aedes.js index b9c1d024..213e7726 100644 --- a/aedes.js +++ b/aedes.js @@ -79,10 +79,19 @@ function Aedes (opts) { this.brokers = {} const heartbeatTopic = $SYS_PREFIX + that.id + '/heartbeat' + const birthTopic = $SYS_PREFIX + that.id + '/birth' + this._heartbeatInterval = setInterval(heartbeat, opts.heartbeatInterval) const bufId = Buffer.from(that.id, 'utf8') + // in a cluster env this is used to warn other broker instances + // that this broker is alive + that.publish({ + topic: birthTopic, + payload: bufId + }, noop) + function heartbeat () { that.publish({ topic: heartbeatTopic, @@ -141,6 +150,19 @@ function Aedes (opts) { done() }) + this.mq.on($SYS_PREFIX + '+/birth', function brokerBorn (packet, done) { + const brokerId = packet.payload.toString() + + // reset duplicates counter + if (brokerId !== that.id) { + for (const clientId in that.clients) { + delete that.clients[clientId].duplicates[brokerId] + } + } + + done() + }) + this.mq.on($SYS_PREFIX + '+/new/clients', function closeSameClients (packet, done) { const serverId = packet.topic.split('/')[1] const clientId = packet.payload.toString() diff --git a/test/events.js b/test/events.js index 48225f5e..40191395 100644 --- a/test/events.js +++ b/test/events.js @@ -1,6 +1,7 @@ 'use strict' const { test } = require('tap') +const mqemitter = require('mqemitter') const { setup, connect, subscribe } = require('./helper') const aedes = require('../') @@ -16,9 +17,49 @@ test('publishes an hearbeat', function (t) { const id = message.topic.match(/\$SYS\/([^/]+)\/heartbeat/)[1] t.equal(id, broker.id, 'broker id matches') t.same(message.payload.toString(), id, 'message has id as the payload') + cb() }) }) +test('publishes birth', function (t) { + t.plan(4) + + const mq = mqemitter() + const brokerId = 'test-broker' + const fakeBroker = 'fake-broker' + const clientId = 'test-client' + + mq.on(`$SYS/${brokerId}/birth`, (message, cb) => { + t.pass('broker birth received') + t.same(message.payload.toString(), brokerId, 'message has id as the payload') + cb() + }) + + const broker = aedes({ + id: brokerId, + mq + }) + + broker.on('client', (client) => { + t.equal(client.id, clientId, 'client connected') + // set a fake counter on a fake broker + process.nextTick(() => { + broker.clients[clientId].duplicates[fakeBroker] = 42 + mq.emit({ topic: `$SYS/${fakeBroker}/birth`, payload: Buffer.from(fakeBroker) }) + }) + }) + + mq.on(`$SYS/${fakeBroker}/birth`, (message, cb) => { + process.nextTick(() => { + t.equal(!!broker.clients[clientId].duplicates[fakeBroker], false, 'client duplicates has been resetted') + cb() + }) + }) + + const s = connect(setup(broker), { clientId }) + t.teardown(s.broker.close.bind(s.broker)) +}) + ;['$mcollina', '$SYS'].forEach(function (topic) { test('does not forward $ prefixed topics to # subscription - ' + topic, function (t) { t.plan(4) diff --git a/test/retain.js b/test/retain.js index 62ad687a..4f840d5f 100644 --- a/test/retain.js +++ b/test/retain.js @@ -275,7 +275,7 @@ test('new QoS 0 subscribers receive QoS 0 retained messages when clean', functio }) clock.setTimeout(() => { - t.equal(broker.counter, 8) + t.equal(broker.counter, 9) }, 200) }) @@ -315,7 +315,7 @@ test('new QoS 0 subscribers receive downgraded QoS 1 retained messages when clea }) }) broker.on('closed', function () { - t.equal(broker.counter, 7) + t.equal(broker.counter, 8) }) })