Skip to content

Commit

Permalink
Clusters example #146 (#332)
Browse files Browse the repository at this point in the history
* Clusters example #146

* Fixed standard indentation issues
  • Loading branch information
robertsLando authored and mcollina committed Oct 30, 2019
1 parent 5a82b99 commit bccfca9
Show file tree
Hide file tree
Showing 3 changed files with 93 additions and 0 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ build/Release
# https://www.npmjs.org/doc/misc/npm-faq.html#should-i-check-my-node_modules-folder-into-git-
node_modules

shared-mqemitter

# Users Environment Variables
.lock-wscript

Expand Down
75 changes: 75 additions & 0 deletions examples/clusters/index.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
var cluster = require('cluster')
var mqemitter = require('mqemitter-child-process')
var mongoPersistence = require('aedes-persistence-mongodb')

function startAedes () {
var client = mqemitter.child()
var port = 1883

var aedes = require('aedes')({
mq: client,
persistence: mongoPersistence({
url: 'mongodb://127.0.0.1/aedes-test',
// Optional ttl settings
ttl: {
packets: 300, // Number of seconds
subscriptions: 300
}
})
})

var server = require('net').createServer(aedes.handle)

server.listen(port, function () {
console.log('Aedes listening on port:', port)
aedes.publish({ topic: 'aedes/hello', payload: "I'm broker " + aedes.id })
})

aedes.on('subscribe', function (subscriptions, client) {
console.log('MQTT client \x1b[32m' + (client ? client.id : client) +
'\x1b[0m subscribed to topics: ' + subscriptions.map(s => s.topic).join('\n'), 'from broker', aedes.id)
})

aedes.on('unsubscribe', function (subscriptions, client) {
console.log('MQTT client \x1b[32m' + (client ? client.id : client) +
'\x1b[0m unsubscribed to topics: ' + subscriptions.join('\n'), 'from broker', aedes.id)
})

// fired when a client connects
aedes.on('client', function (client) {
console.log('Client Connected: \x1b[33m' + (client ? client.id : client) + '\x1b[0m', 'to broker', aedes.id)
})

// fired when a client disconnects
aedes.on('clientDisconnect', function (client) {
console.log('Client Disconnected: \x1b[31m' + (client ? client.id : client) + '\x1b[0m', 'to broker', aedes.id)
})

// fired when a message is published
aedes.on('publish', async function (packet, client) {
console.log('Client \x1b[31m' + (client ? client.id : 'BROKER_' + aedes.id) + '\x1b[0m has published', packet.payload.toString(), 'on', packet.topic, 'to broker', aedes.id)
})
}

if (cluster.isMaster) {
mqemitter.start((err) => {
if (err) throw err

var numWorkers = require('os').cpus().length
for (let i = 0; i < numWorkers; i++) {
cluster.fork()
}
})

cluster.on('online', function (worker) {
console.log('Worker ' + worker.process.pid + ' is online')
})

cluster.on('exit', function (worker, code, signal) {
console.log('Worker ' + worker.process.pid + ' died with code: ' + code + ', and signal: ' + signal)
console.log('Starting a new worker')
cluster.fork()
})
} else {
startAedes()
}
16 changes: 16 additions & 0 deletions examples/clusters/package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
{
"name": "aedes_clusters",
"version": "1.0.0",
"description": "Testing Aedes Broker with clusters",
"main": "index.js",
"scripts": {
"test": "echo \"Error: no test specified\" && exit 1"
},
"author": "robertsLando",
"license": "MIT",
"dependencies": {
"aedes": "^0.39.0",
"aedes-persistence-mongodb": "^6.2.0",
"mqemitter-child-process": "^1.0.0"
}
}

0 comments on commit bccfca9

Please sign in to comment.