From bccfca9612f5d22ad73018bb61d77842836a7a69 Mon Sep 17 00:00:00 2001 From: Daniel Lando Date: Wed, 30 Oct 2019 07:35:41 +0100 Subject: [PATCH] Clusters example #146 (#332) * Clusters example #146 * Fixed standard indentation issues --- .gitignore | 2 + examples/clusters/index.js | 75 ++++++++++++++++++++++++++++++++++ examples/clusters/package.json | 16 ++++++++ 3 files changed, 93 insertions(+) create mode 100644 examples/clusters/index.js create mode 100644 examples/clusters/package.json diff --git a/.gitignore b/.gitignore index 0f2ada3b..5f1abf39 100644 --- a/.gitignore +++ b/.gitignore @@ -25,6 +25,8 @@ build/Release # https://www.npmjs.org/doc/misc/npm-faq.html#should-i-check-my-node_modules-folder-into-git- node_modules +shared-mqemitter + # Users Environment Variables .lock-wscript diff --git a/examples/clusters/index.js b/examples/clusters/index.js new file mode 100644 index 00000000..4c47bbc3 --- /dev/null +++ b/examples/clusters/index.js @@ -0,0 +1,75 @@ +var cluster = require('cluster') +var mqemitter = require('mqemitter-child-process') +var mongoPersistence = require('aedes-persistence-mongodb') + +function startAedes () { + var client = mqemitter.child() + var port = 1883 + + var aedes = require('aedes')({ + mq: client, + persistence: mongoPersistence({ + url: 'mongodb://127.0.0.1/aedes-test', + // Optional ttl settings + ttl: { + packets: 300, // Number of seconds + subscriptions: 300 + } + }) + }) + + var server = require('net').createServer(aedes.handle) + + server.listen(port, function () { + console.log('Aedes listening on port:', port) + aedes.publish({ topic: 'aedes/hello', payload: "I'm broker " + aedes.id }) + }) + + aedes.on('subscribe', function (subscriptions, client) { + console.log('MQTT client \x1b[32m' + (client ? client.id : client) + + '\x1b[0m subscribed to topics: ' + subscriptions.map(s => s.topic).join('\n'), 'from broker', aedes.id) + }) + + aedes.on('unsubscribe', function (subscriptions, client) { + console.log('MQTT client \x1b[32m' + (client ? client.id : client) + + '\x1b[0m unsubscribed to topics: ' + subscriptions.join('\n'), 'from broker', aedes.id) + }) + + // fired when a client connects + aedes.on('client', function (client) { + console.log('Client Connected: \x1b[33m' + (client ? client.id : client) + '\x1b[0m', 'to broker', aedes.id) + }) + + // fired when a client disconnects + aedes.on('clientDisconnect', function (client) { + console.log('Client Disconnected: \x1b[31m' + (client ? client.id : client) + '\x1b[0m', 'to broker', aedes.id) + }) + + // fired when a message is published + aedes.on('publish', async function (packet, client) { + console.log('Client \x1b[31m' + (client ? client.id : 'BROKER_' + aedes.id) + '\x1b[0m has published', packet.payload.toString(), 'on', packet.topic, 'to broker', aedes.id) + }) +} + +if (cluster.isMaster) { + mqemitter.start((err) => { + if (err) throw err + + var numWorkers = require('os').cpus().length + for (let i = 0; i < numWorkers; i++) { + cluster.fork() + } + }) + + cluster.on('online', function (worker) { + console.log('Worker ' + worker.process.pid + ' is online') + }) + + cluster.on('exit', function (worker, code, signal) { + console.log('Worker ' + worker.process.pid + ' died with code: ' + code + ', and signal: ' + signal) + console.log('Starting a new worker') + cluster.fork() + }) +} else { + startAedes() +} diff --git a/examples/clusters/package.json b/examples/clusters/package.json new file mode 100644 index 00000000..c73aa6c3 --- /dev/null +++ b/examples/clusters/package.json @@ -0,0 +1,16 @@ +{ + "name": "aedes_clusters", + "version": "1.0.0", + "description": "Testing Aedes Broker with clusters", + "main": "index.js", + "scripts": { + "test": "echo \"Error: no test specified\" && exit 1" + }, + "author": "robertsLando", + "license": "MIT", + "dependencies": { + "aedes": "^0.39.0", + "aedes-persistence-mongodb": "^6.2.0", + "mqemitter-child-process": "^1.0.0" + } +}