From c8b5cb03527d0c27a7ffc9ec5ca267f8e06a3a99 Mon Sep 17 00:00:00 2001 From: getlarge Date: Mon, 26 Oct 2020 13:59:13 +0100 Subject: [PATCH] feat: delegate protocol decoding in server-factory and protocol-decoder (#549) * Delegate protocol decoding in server-factory and protocol-decoder * add server-factory reference * add example and collaborator Co-authored-by: Daniel Lando --- README.md | 3 + docs/Aedes.md | 18 -- docs/Examples.md | 28 +++ examples/proxy/index.js | 175 ------------------ examples/proxy/package.json | 17 -- lib/client.js | 11 +- package.json | 2 - test/connect.js | 343 ------------------------------------ 8 files changed, 33 insertions(+), 564 deletions(-) delete mode 100644 examples/proxy/index.js delete mode 100644 examples/proxy/package.json diff --git a/README.md b/README.md index 541988dd..a597b261 100644 --- a/README.md +++ b/README.md @@ -101,6 +101,7 @@ Other info: - [aedes-stats]: Stats for Aedes - [aedes-cli]: Run Aedes MQTT Broker from the CLI - [aedes-protocol-decoder]: Protocol decoder for Aedes MQTT Broker +- [aedes-server-factory]: Create a server instance such as TCP, HTTP, TLS... ## Middleware Plugins @@ -258,6 +259,7 @@ Here is a list of some interesting projects that are using Aedes as MQTT Broker. - [__Behrad Zari__](https://github.com/behrad) - [__Gnought__](https://github.com/gnought) - [__Daniel Lando__](https://github.com/robertsLando) +- [__Getlarge__](https://github.com/getlarge) ## Contribution @@ -297,6 +299,7 @@ Licensed under [MIT](./LICENSE). [aedes-stats]: https://www.npmjs.com/aedes-stats [aedes-cli]: https://www.npmjs.com/aedes-cli [aedes-protocol-decoder]: https://www.npmjs.com/aedes-protocol-decoder +[aedes-server-factory]: https://www.npmjs.com/aedes-server-factory [aedes-persistence]: https://www.npmjs.com/aedes-persistence [aedes-persistence-mongodb]: https://www.npmjs.com/aedes-persistence-mongodb [aedes-persistence-redis]: https://www.npmjs.com/aedes-persistence-redis diff --git a/docs/Aedes.md b/docs/Aedes.md index 3c33f78b..832614cf 100644 --- a/docs/Aedes.md +++ b/docs/Aedes.md @@ -24,7 +24,6 @@ - [aedes.unsubscribe (topic, deliverfunc, callback)](#aedesunsubscribe-topic-deliverfunc-callback) - [aedes.publish (packet, callback)](#aedespublish-packet-callback) - [aedes.close ([callback])](#aedesclose-callback) - - [Handler: decodeProtocol (client, buffer)](#handler-decodeprotocol-client-buffer) - [Handler: preConnect (client, packet, callback)](#handler-preconnect-client-packet-callback) - [Handler: authenticate (client, username, password, callback)](#handler-authenticate-client-username-password-callback) - [Handler: authorizePublish (client, packet, callback)](#handler-authorizepublish-client-packet-callback) @@ -225,23 +224,6 @@ Close aedes server and disconnects all clients. `callback` will be invoked when server is closed. -## Handler: decodeProtocol (client, buffer) - -- client: [``](./Client.md) -- buffer: `` - -Invoked when aedes instance `trustProxy` is `true` - -It targets to decode wrapped protocols (e.g. websocket and PROXY) into plain raw mqtt stream. - -`aedes-protocol-decoder` is an example to parse https headers (x-real-ip | x-forwarded-for) and proxy protocol v1 and v2 to retrieve information in `client.connDetails`. - -```js -aedes.decodeProtocol = function(client, buffer) { - return yourDecoder(client, buffer) -} -``` - ## Handler: preConnect (client, packet, callback) - client: [``](./Client.md) diff --git a/docs/Examples.md b/docs/Examples.md index eb41324e..f391c2ed 100644 --- a/docs/Examples.md +++ b/docs/Examples.md @@ -13,6 +13,20 @@ server.listen(port, function () { }) ``` +## Simple plain MQTT server using server-factory + +```js +const aedes = require('aedes')() +const { createServer } = require('aedes-server-factory') +const port = 1883 + +const server = createServer(aedes) + +server.listen(port, function () { + console.log('server started and listening on port ', port) +}) +``` + ## MQTT over TLS / MQTTS ```js @@ -47,6 +61,20 @@ httpServer.listen(port, function () { }) ``` +## MQTT server over WebSocket using server-factory + +```js +const aedes = require('aedes')() +const { createServer } = require('aedes-server-factory') +const port = 8888 + +const httpServer = createServer(aedes, { ws: true }) + +httpServer.listen(port, function () { + console.log('websocket server listening on port ', port) +}) +``` + ## Clusters In order to use Aedes in clusters you have to choose a persistence and an mqemitter that supports clusters. Tested persistence/mqemitters that works with clusters are: diff --git a/examples/proxy/index.js b/examples/proxy/index.js deleted file mode 100644 index a2f43c6b..00000000 --- a/examples/proxy/index.js +++ /dev/null @@ -1,175 +0,0 @@ -'use strict' - -const aedes = require('../../aedes') -const mqemitter = require('mqemitter') -const persistence = require('aedes-persistence') -const mqttPacket = require('mqtt-packet') -const net = require('net') -const proxyProtocol = require('proxy-protocol-js') - -const brokerPort = 4883 - -// from https://stackoverflow.com/questions/57077161/how-do-i-convert-hex-buffer-to-ipv6-in-javascript -function parseIpV6 (ip) { - return ip.match(/.{1,4}/g) - .map((val) => val.replace(/^0+/, '')) - .join(':') - .replace(/0000:/g, ':') - .replace(/:{2,}/g, '::') -} - -function sendProxyPacket (version = 1, ipFamily = 4) { - const packet = { - cmd: 'connect', - protocolId: 'MQTT', - protocolVersion: 4, - clean: true, - clientId: `my-client-${version}`, - keepalive: 0 - } - const hostIpV4 = '0.0.0.0' - const clientIpV4 = '192.168.1.128' - const hostIpV6 = [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0] - const clientIpV6 = [0, 0, 0, 0, 0, 0, 0, 0, 255, 255, 192, 168, 1, 128] - var protocol - if (version === 1) { - if (ipFamily === 4) { - protocol = new proxyProtocol.V1BinaryProxyProtocol( - proxyProtocol.INETProtocol.TCP4, - new proxyProtocol.Peer(clientIpV4, 12345), - new proxyProtocol.Peer(hostIpV4, brokerPort), - mqttPacket.generate(packet) - ).build() - } else if (ipFamily === 6) { - protocol = new proxyProtocol.V1BinaryProxyProtocol( - proxyProtocol.INETProtocol.TCP6, - new proxyProtocol.Peer(parseIpV6(Buffer.from(clientIpV6).toString('hex')), 12345), - new proxyProtocol.Peer(parseIpV6(Buffer.from(hostIpV6).toString('hex')), brokerPort), - mqttPacket.generate(packet) - ).build() - } - } else if (version === 2) { - if (ipFamily === 4) { - protocol = new proxyProtocol.V2ProxyProtocol( - proxyProtocol.Command.LOCAL, - proxyProtocol.TransportProtocol.STREAM, - new proxyProtocol.IPv4ProxyAddress( - proxyProtocol.IPv4Address.createFrom(clientIpV4.split('.')), - 12346, - proxyProtocol.IPv4Address.createFrom(hostIpV4.split('.')), - brokerPort - ), - mqttPacket.generate(packet) - ).build() - } else if (ipFamily === 6) { - protocol = new proxyProtocol.V2ProxyProtocol( - proxyProtocol.Command.PROXY, - proxyProtocol.TransportProtocol.STREAM, - new proxyProtocol.IPv6ProxyAddress( - proxyProtocol.IPv6Address.createFrom(clientIpV6), - 12346, - proxyProtocol.IPv6Address.createFrom(hostIpV6), - brokerPort - ), - mqttPacket.generate(packet) - ).build() - } - } - - const parsedProto = version === 1 - ? proxyProtocol.V1BinaryProxyProtocol.parse(protocol) - : proxyProtocol.V2ProxyProtocol.parse(protocol) - // console.log(parsedProto) - - const dstPort = version === 1 - ? parsedProto.destination.port - : parsedProto.proxyAddress.destinationPort - - var dstHost - if (version === 1) { - if (ipFamily === 4) { - dstHost = parsedProto.destination.ipAddress - } else if (ipFamily === 6) { - dstHost = parsedProto.destination.ipAddress - // console.log('ipV6 host :', parsedProto.destination.ipAddress) - } - } else if (version === 2) { - if (ipFamily === 4) { - dstHost = parsedProto.proxyAddress.destinationAddress.address.join('.') - } else if (ipFamily === 6) { - // console.log('ipV6 client :', parseIpV6(Buffer.from(clientIpV6).toString('hex'))) - dstHost = parseIpV6(Buffer.from(parsedProto.proxyAddress.destinationAddress.address).toString('hex')) - } - } - - console.log('Connection to :', dstHost, dstPort) - var mqttConn = net.createConnection( - { - port: dstPort, - host: dstHost, - timeout: 150 - } - ) - - const data = protocol - - mqttConn.on('timeout', function () { - mqttConn.end(data) - }) -} - -function startAedes () { - const broker = aedes({ - mq: mqemitter({ - concurrency: 100 - }), - persistence: persistence(), - preConnect: function (client, packet, done) { - console.log('Aedes preConnect check client ip:', client.connDetails) - if (client.connDetails && client.connDetails.ipAddress) { - client.ip = client.connDetails.ipAddress - } - client.close() - return done(null, true) - }, - trustProxy: true - }) - - const server = require('net').createServer(broker.handle) - - server.listen(brokerPort, function () { - console.log('Aedes listening on :', server.address()) - broker.publish({ topic: 'aedes/hello', payload: "I'm broker " + broker.id }) - setTimeout(() => sendProxyPacket(1), 250) - setTimeout(() => sendProxyPacket(1, 6), 500) - setTimeout(() => sendProxyPacket(2), 750) - setTimeout(() => sendProxyPacket(2, 6), 1000) - }) - - broker.on('subscribe', function (subscriptions, client) { - console.log('MQTT client \x1b[32m' + (client ? client.id : client) + - '\x1b[0m subscribed to topics: ' + subscriptions.map(s => s.topic).join('\n'), 'from broker', broker.id) - }) - - broker.on('unsubscribe', function (subscriptions, client) { - console.log('MQTT client \x1b[32m' + (client ? client.id : client) + - '\x1b[0m unsubscribed to topics: ' + subscriptions.join('\n'), 'from broker', broker.id) - }) - - // fired when a client connects - broker.on('client', function (client) { - console.log('Client Connected: \x1b[33m' + (client ? client.id : client) + ' ip ' + (client ? client.ip : null) + '\x1b[0m', 'to broker', broker.id) - }) - - // fired when a client disconnects - broker.on('clientDisconnect', function (client) { - console.log('Client Disconnected: \x1b[31m' + (client ? client.id : client) + '\x1b[0m', 'to broker', broker.id) - }) - - // fired when a message is published - broker.on('publish', async function (packet, client) { - console.log('Client \x1b[31m' + (client ? client.id : 'BROKER_' + broker.id) + '\x1b[0m has published', packet.payload.toString(), 'on', packet.topic, 'to broker', broker.id) - }) -} - -startAedes() diff --git a/examples/proxy/package.json b/examples/proxy/package.json deleted file mode 100644 index af993cb1..00000000 --- a/examples/proxy/package.json +++ /dev/null @@ -1,17 +0,0 @@ -{ - "name": "aedes_proxy", - "version": "1.0.0", - "description": "Testing Aedes Broker behing proxy", - "main": "index.js", - "scripts": { - "test": "echo \"Error: no test specified\" && exit 1" - }, - "author": "getlarge", - "license": "MIT", - "dependencies": { - "aedes": "^0.42.0", - "mqemitter": "^3.0.0", - "mqtt-packet": "^6.3.0", - "proxy-protocol-js": "^4.0.3" - } -} diff --git a/lib/client.js b/lib/client.js index 93646c02..7e9f1e30 100644 --- a/lib/client.js +++ b/lib/client.js @@ -43,7 +43,7 @@ function Client (broker, conn, req) { this._nextId = Math.ceil(Math.random() * 65535) this.req = req - this.connDetails = null + this.connDetails = req ? req.connDetails : null // we use two variables for the will // because we store in _will while @@ -73,14 +73,7 @@ function Client (broker, conn, req) { if (that._parsingBatch <= 0) { that._parsingBatch = 0 var buf = client.conn.read(null) - if (!client.connackSent && client.broker.decodeProtocol && client.broker.trustProxy && buf) { - const { data } = client.broker.decodeProtocol(client, buf) - if (data) { - client._parser.parse(data) - } else { - client._parser.parse(buf) - } - } else if (buf) { + if (buf) { client._parser.parse(buf) } } diff --git a/package.json b/package.json index 0a93458b..5cc26b32 100644 --- a/package.json +++ b/package.json @@ -108,7 +108,6 @@ "dependencies": { "aedes-packet": "^2.3.1", "aedes-persistence": "^8.1.1", - "aedes-protocol-decoder": "^1.0.0", "bulk-write-stream": "^2.0.1", "end-of-stream": "^1.4.4", "fastfall": "^1.5.1", @@ -116,7 +115,6 @@ "fastseries": "^2.0.0", "mqemitter": "^4.2.0", "mqtt-packet": "^6.3.2", - "proxy-protocol-js": "^4.0.4", "readable-stream": "^3.6.0", "retimer": "^2.0.0", "reusify": "^1.0.4", diff --git a/test/connect.js b/test/connect.js index fe662c07..aeee265c 100644 --- a/test/connect.js +++ b/test/connect.js @@ -4,10 +4,6 @@ const { test } = require('tap') const http = require('http') const ws = require('websocket-stream') const mqtt = require('mqtt') -const mqttPacket = require('mqtt-packet') -const net = require('net') -const proxyProtocol = require('proxy-protocol-js') -const { protocolDecoder } = require('aedes-protocol-decoder') const { setup, connect, delay } = require('./helper') const aedes = require('../') @@ -730,342 +726,3 @@ test('websocket clients have access to the request object', function (t) { server.close() }) }) - -// test ipAddress property presence when trustProxy is enabled -test('tcp clients have access to the ipAddress from the socket', function (t) { - t.plan(2) - - const port = 4883 - const broker = aedes({ - preConnect: function (client, packet, done) { - if (client && client.connDetails && client.connDetails.ipAddress) { - client.ip = client.connDetails.ipAddress - t.equal('::ffff:127.0.0.1', client.ip) - } else { - t.fail('no ip address present') - } - done(null, true) - }, - decodeProtocol: protocolDecoder, - trustProxy: true - }) - - const server = net.createServer(broker.handle) - server.listen(port, function (err) { - t.error(err, 'no error') - }) - - const client = mqtt.connect({ - port, - keepalive: 0, - clientId: 'mqtt-client', - clean: false - }) - - t.tearDown(() => { - client.end(true) - broker.close() - server.close() - }) -}) - -test('tcp proxied (protocol v1) clients have access to the ipAddress(v4)', function (t) { - t.plan(2) - - const port = 4883 - const clientIp = '192.168.0.140' - const packet = { - cmd: 'connect', - protocolId: 'MQIsdp', - protocolVersion: 3, - clean: true, - clientId: 'my-client-proxyV1', - keepalive: 0 - } - - const buf = mqttPacket.generate(packet) - const src = new proxyProtocol.Peer(clientIp, 12345) - const dst = new proxyProtocol.Peer('127.0.0.1', port) - const protocol = new proxyProtocol.V1BinaryProxyProtocol( - proxyProtocol.INETProtocol.TCP4, - src, - dst, - buf - ).build() - - const broker = aedes({ - preConnect: function (client, packet, done) { - if (client.connDetails && client.connDetails.ipAddress) { - client.ip = client.connDetails.ipAddress - t.equal(clientIp, client.ip) - } else { - t.fail('no ip address present') - } - done(null, true) - }, - decodeProtocol: protocolDecoder, - trustProxy: true - }) - - const server = net.createServer(broker.handle) - server.listen(port, function (err) { - t.error(err, 'no error') - }) - - const client = net.connect({ - port, - timeout: 0 - }, function () { - client.write(protocol) - }) - - t.tearDown(() => { - client.end() - broker.close() - server.close() - }) -}) - -test('tcp proxied (protocol v2) clients have access to the ipAddress(v4)', function (t) { - t.plan(2) - - const port = 4883 - const clientIp = '192.168.0.140' - const packet = { - cmd: 'connect', - protocolId: 'MQTT', - protocolVersion: 4, - clean: true, - clientId: 'my-client-proxyV2' - } - - const protocol = new proxyProtocol.V2ProxyProtocol( - proxyProtocol.Command.LOCAL, - proxyProtocol.TransportProtocol.DGRAM, - new proxyProtocol.IPv4ProxyAddress( - proxyProtocol.IPv4Address.createFrom(clientIp.split('.')), - 12345, - proxyProtocol.IPv4Address.createFrom([127, 0, 0, 1]), - port - ), - mqttPacket.generate(packet) - ).build() - - const broker = aedes({ - preConnect: function (client, packet, done) { - if (client.connDetails && client.connDetails.ipAddress) { - client.ip = client.connDetails.ipAddress - t.equal(clientIp, client.ip) - } else { - t.fail('no ip address present') - } - done(null, true) - }, - decodeProtocol: protocolDecoder, - trustProxy: true - }) - - const server = net.createServer(broker.handle) - server.listen(port, function (err) { - t.error(err, 'no error') - }) - - const client = net.createConnection( - { - port, - timeout: 0 - }, function () { - client.write(Buffer.from(protocol)) - } - ) - - t.tearDown(() => { - client.end() - broker.close() - server.close() - }) -}) - -test('tcp proxied (protocol v2) clients have access to the ipAddress(v6)', function (t) { - t.plan(2) - - const port = 4883 - const clientIpArray = [0, 0, 0, 0, 0, 0, 0, 0, 255, 255, 192, 168, 1, 128] - const clientIp = '::ffff:c0a8:180:' - const packet = { - cmd: 'connect', - protocolId: 'MQTT', - protocolVersion: 4, - clean: true, - clientId: 'my-client-proxyV2' - } - - const protocol = new proxyProtocol.V2ProxyProtocol( - proxyProtocol.Command.PROXY, - proxyProtocol.TransportProtocol.STREAM, - new proxyProtocol.IPv6ProxyAddress( - proxyProtocol.IPv6Address.createFrom(clientIpArray), - 12345, - proxyProtocol.IPv6Address.createWithEmptyAddress(), - port - ), - mqttPacket.generate(packet) - ).build() - - const broker = aedes({ - preConnect: function (client, packet, done) { - if (client.connDetails && client.connDetails.ipAddress) { - client.ip = client.connDetails.ipAddress - t.equal(clientIp, client.ip) - } else { - t.fail('no ip address present') - } - done(null, true) - }, - decodeProtocol: protocolDecoder, - trustProxy: true - }) - - const server = net.createServer(broker.handle) - server.listen(port, function (err) { - t.error(err, 'no error') - }) - - const client = net.createConnection( - { - port, - timeout: 0 - }, function () { - client.write(Buffer.from(protocol)) - } - ) - - t.tearDown(() => { - client.end() - broker.close() - server.close() - }) -}) - -test('websocket clients have access to the ipAddress from the socket (if no ip header)', function (t) { - t.plan(2) - - const clientIp = '::ffff:127.0.0.1' - const port = 4883 - const broker = aedes({ - preConnect: function (client, packet, done) { - if (client.connDetails && client.connDetails.ipAddress) { - client.ip = client.connDetails.ipAddress - t.equal(clientIp, client.ip) - } else { - t.fail('no ip address present') - } - done(null, true) - }, - decodeProtocol: protocolDecoder, - trustProxy: true - }) - - const server = http.createServer() - ws.createServer({ - server: server - }, broker.handle) - - server.listen(port, function (err) { - t.error(err, 'no error') - }) - - const client = mqtt.connect(`ws://localhost:${port}`) - - t.tearDown(() => { - client.end(true) - broker.close() - server.close() - }) -}) - -test('websocket proxied clients have access to the ipAddress from x-real-ip header', function (t) { - t.plan(2) - - const clientIp = '192.168.0.140' - const port = 4883 - const broker = aedes({ - preConnect: function (client, packet, done) { - if (client.connDetails && client.connDetails.ipAddress) { - client.ip = client.connDetails.ipAddress - t.equal(clientIp, client.ip) - } else { - t.fail('no ip address present') - } - done(null, true) - }, - decodeProtocol: protocolDecoder, - trustProxy: true - }) - - const server = http.createServer() - ws.createServer({ - server: server - }, broker.handle) - - server.listen(port, function (err) { - t.error(err, 'no error') - }) - - const client = mqtt.connect(`ws://localhost:${port}`, { - wsOptions: { - headers: { - 'X-Real-Ip': clientIp - } - } - }) - - t.tearDown(() => { - client.end(true) - broker.close() - server.close() - }) -}) - -test('websocket proxied clients have access to the ipAddress from x-forwarded-for header', function (t) { - t.plan(2) - - const clientIp = '192.168.0.140' - const port = 4883 - const broker = aedes({ - preConnect: function (client, packet, done) { - if (client.connDetails && client.connDetails.ipAddress) { - client.ip = client.connDetails.ipAddress - t.equal(clientIp, client.ip) - } else { - t.fail('no ip address present') - } - done(null, true) - }, - decodeProtocol: protocolDecoder, - trustProxy: true - }) - - const server = http.createServer() - ws.createServer({ - server: server - }, broker.handle) - - server.listen(port, function (err) { - t.error(err, 'no error') - }) - - const client = mqtt.connect(`ws://localhost:${port}`, { - wsOptions: { - headers: { - 'X-Forwarded-For': clientIp - } - } - }) - - t.tearDown(() => { - client.end(true) - broker.close() - server.close() - }) -})