diff --git a/.gitignore b/.gitignore index 6704566..cab290e 100644 --- a/.gitignore +++ b/.gitignore @@ -102,3 +102,5 @@ dist # TernJS port file .tern-port + +package-lock.json diff --git a/README.md b/README.md index 03e3950..5968ef5 100644 --- a/README.md +++ b/README.md @@ -1,2 +1,175 @@ # aedes-server-factory -Aedes server factory. Supports tcp, tls, http, https, http2, ws, wss and proxy decoders + +Aedes server factory. Supports TCP, HTTP, HTTP2, WS, and PROXY decoders. + +Work In Progress: TLS, HTTPS, HTTP2, WSS. + +## Install + +To install aedes-server-factory : + +```sh +npm install aedes-server-factory +``` + +## Usage + +1 - Per default, if no options are set, `createServer` will return a TCP server instance, from which the stream will be bound to [aedes] (via `aedes.handle`). + +2 - To bind a Websocket stream to Aedes, set `ws` to true, and eventually configure the HTTP server instance by using one of `http` or `http2` options. + +Additionaly, you can add `https` option to create a secure HTTP | HTTP2 server. + +`createServer` will then return an HTTP | HTTP2 server instance. + +## Options + +### tcp + +[`Object`](https://nodejs.org/api/net.html#net_net_createserver_options_connectionlistener) used to create and configure a TCP server. + +### tls + +[`Object`](https://nodejs.org/api/tls.html#tls_tls_createserver_options_secureconnectionlistener) used to create and configure a TCP over TLS server. + +Default to null, if defined, will be used to create TCP secure server. + +### ws + +boolean used to wrap a http | http2 | https server in a websocket server. +[`Object`](https://nodejs.org/api/http.html#http_http_createserver_options_requestlistener) used to create and configure HTTP server. + +`createServer` will return the HTTP server if no HTTP options is defined. + +### http2 + +[`Object`](https://nodejs.org/api/http2.html#http2_http2_createserver_options_onrequesthandler) used to create and configure HTTP2 server. + +Default to null, if defined, will be used to create HTTP2 server instance + +### https + +https://nodejs.org/api/http2.html#http2_http2_createsecureserver_options_onrequesthandler + +[`Object`](https://nodejs.org/api/https.html#https_https_createserver_options_requestlistener) used to create and configure HTTPS server, can be used in combination of HTTP | HTTP2 option. + +Default to null, if defined, will be used to create HTTP | HTTP2 secure server. + +### trustProxy + +`boolean` to indicates that `aedes-server-factory` should retrieve information from the Proxy server ( HTTP headers and/or Proxy protocol header ). + +Default to false. + + +### extractSocketDetails (socket) + +- socket: [``] + +Invoked when `options.trustProxy` is `false` | `undefined`. + +[aedes-protocol-decoder] `extractSocketDetails` function is used as default value. + +```js +const aedes = require('aedes')(); +const { createServer } = require('aedes-server-factory'); +const yourDecoder = require('./path-to-your-decoder'); + +const options = { trustProxy: false }; +options.extractSocketDetails = function (socket) { + return yourDecoder(socket); +}; + +const server = createServer(aedes, options); +``` + +### protocolDecoder (conn, buffer, req) + +- conn: `` +- buffer: `` +- req?: `` + +Invoked when `options.trustProxy` is `true`. + +[aedes-protocol-decoder] `protocolDecoder` function is used as default value. + +```js +const aedes = require('aedes')(); +const { createServer } = require('aedes-server-factory'); +const yourDecoder = require('./path-to-your-decoder'); + +const options = { trustProxy: true }; +options.protocolDecoder = function (conn, buffer) { + return yourDecoder(conn, buffer); +}; + +const server = createServer(aedes, options); +``` + +### serverFactory (aedes, options) + +- aedes: [``](https://github.com/moscajs/aedes/blob/master/docs/Aedes.md) +- options: `` + +Use to override `createServer` behavior and create your own server instance. + +```js +const aedes = require('aedes')(); +const { createServer } = require('aedes-server-factory'); + +const options = { trustProxy: false }; +options.serverFactory = function (aedes, options) { + return net.createServer((conn) => { + aedes.handle(conn); + }); +}; + +const server = createServer(aedes, options); +``` + +## Examples + +### TCP server + +```js +const aedes = require('aedes')(); +const server = require('aedes-server-factory').createServer(aedes); + +const port = 1883; + +server.listen(port, function () { + console.log('server started and listening on port ', port); +}); +``` + +### TCP server behind proxy + +```js +const aedes = require('aedes')(); +const server = require('aedes-server-factory').createServer(aedes, { + trustProxy: true, +}); + +const port = 1883; + +server.listen(port, function () { + console.log('server started and listening on port ', port); +}); +``` + +### HTTP server for WS + +```js +const aedes = require('aedes')(); +const server = require('aedes-server-factory').createServer(aedes, { + ws: true, +}); +const port = 1883; + +server.listen(port, function () { + console.log('server started and listening on port ', port); +}); +``` + +[aedes]: https://www.npmjs.com/aedes +[aedes-protocol-decoder]: https://www.npmjs.com/aedes-protocol-decoder diff --git a/example.js b/example.js new file mode 100644 index 0000000..8bb9ba4 --- /dev/null +++ b/example.js @@ -0,0 +1,298 @@ +'use strict' + +var aedes = require('aedes') +var mqtt = require('mqtt') +var mqttPacket = require('mqtt-packet') +var net = require('net') +var proxyProtocol = require('proxy-protocol-js') +var { createServer } = require('./index') + +var brokerPort = 4883 +var wsBrokerPort = 4884 +var messageId = 1 + +// from https://stackoverflow.com/questions/57077161/how-do-i-convert-hex-buffer-to-ipv6-in-javascript +function parseIpV6 (ip) { + return ip + .match(/.{1,4}/g) + .map((val) => val.replace(/^0+/, '')) + .join(':') + .replace(/0000:/g, ':') + .replace(/:{2,}/g, '::') +} + +function generateProxyPacket (version = 1, ipFamily = 4, serverPort, packet) { + var hostIpV4 = '0.0.0.0' + var clientIpV4 = '192.168.1.128' + var hostIpV6 = [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0] + var clientIpV6 = [0, 0, 0, 0, 0, 0, 0, 0, 255, 255, 192, 168, 1, 128] + + var proxyPacket + if (version === 1) { + if (ipFamily === 4) { + proxyPacket = new proxyProtocol.V1BinaryProxyProtocol( + proxyProtocol.INETProtocol.TCP4, + new proxyProtocol.Peer(clientIpV4, 12345), + new proxyProtocol.Peer(hostIpV4, serverPort), + mqttPacket.generate(packet) + ).build() + } else if (ipFamily === 6) { + proxyPacket = new proxyProtocol.V1BinaryProxyProtocol( + proxyProtocol.INETProtocol.TCP6, + new proxyProtocol.Peer( + parseIpV6(Buffer.from(clientIpV6).toString('hex')), + 12345 + ), + new proxyProtocol.Peer( + parseIpV6(Buffer.from(hostIpV6).toString('hex')), + serverPort + ), + mqttPacket.generate(packet) + ).build() + } + } else if (version === 2) { + if (ipFamily === 4) { + proxyPacket = new proxyProtocol.V2ProxyProtocol( + proxyProtocol.Command.LOCAL, + proxyProtocol.TransportProtocol.STREAM, + new proxyProtocol.IPv4ProxyAddress( + proxyProtocol.IPv4Address.createFrom(clientIpV4.split('.')), + 12346, + proxyProtocol.IPv4Address.createFrom(hostIpV4.split('.')), + serverPort + ), + mqttPacket.generate(packet) + ).build() + } else if (ipFamily === 6) { + proxyPacket = new proxyProtocol.V2ProxyProtocol( + proxyProtocol.Command.PROXY, + proxyProtocol.TransportProtocol.STREAM, + new proxyProtocol.IPv6ProxyAddress( + proxyProtocol.IPv6Address.createFrom(clientIpV6), + 12346, + proxyProtocol.IPv6Address.createFrom(hostIpV6), + serverPort + ), + mqttPacket.generate(packet) + ).build() + } + } + return proxyPacket +} + +function sendPackets (conn) { + setTimeout(function () { + conn.write(mqttPacket.generate({ + cmd: 'subscribe', + messageId: messageId += 1, + subscriptions: [{ + topic: 'test', + qos: 0 + }] + })) + }, 150) + + setTimeout(function () { + conn.write(mqttPacket.generate({ + cmd: 'publish', + messageId: messageId += 1, + retain: false, + qos: 0, + dup: false, + length: 10, + topic: 'test', + payload: 'test' + })) + }, 300) + + setTimeout(function () { + conn.end(mqttPacket.generate({ + cmd: 'disconnect', + protocolId: 'MQTT', + protocolVersion: 4 + })) + }, 450) +} + +function sendProxyPacket (version = 1, ipFamily = 4, serverPort) { + var packet = { + cmd: 'connect', + protocolId: 'MQTT', + protocolVersion: 4, + clean: true, + clientId: `tcp-proxy-client-${version}-${ipFamily}`, + keepalive: 0 + } + + var proxyPacket = generateProxyPacket(version, ipFamily, serverPort, packet) + var parsedProto = + version === 1 + ? proxyProtocol.V1BinaryProxyProtocol.parse(proxyPacket) + : proxyProtocol.V2ProxyProtocol.parse(proxyPacket) + // console.log(parsedProto) + + var dstPort = + version === 1 + ? parsedProto.destination.port + : parsedProto.proxyAddress.destinationPort + + var dstHost + if (version === 1) { + if (ipFamily === 4) { + dstHost = parsedProto.destination.ipAddress + } else if (ipFamily === 6) { + dstHost = parsedProto.destination.ipAddress + } + } else if (version === 2) { + if (ipFamily === 4) { + dstHost = parsedProto.proxyAddress.destinationAddress.address.join('.') + } else if (ipFamily === 6) { + dstHost = parseIpV6( + Buffer.from( + parsedProto.proxyAddress.destinationAddress.address + ).toString('hex') + ) + } + } + + console.log('Connection to :', dstHost, dstPort) + var mqttConn = net.createConnection({ + port: dstPort, + host: dstHost + }, function () { + this.write(proxyPacket) + }) + + sendPackets(mqttConn) +} + +function sendTcpPacket (serverPort) { + var packet = { + cmd: 'connect', + protocolId: 'MQIsdp', + protocolVersion: 3, + clean: true, + clientId: 'tcp-client', + keepalive: 0 + } + + console.log('Connection to :', '0.0.0.0', serverPort) + + var tcpConn = net.createConnection({ + port: serverPort, + host: '0.0.0.0' + }, function () { + this.write(mqttPacket.generate(packet)) + }) + + sendPackets(tcpConn) +} + +function sendWsPacket (serverPort) { + var clientIpV4 = '192.168.1.128' + var client = mqtt.connect(`ws://localhost:${serverPort}`, { + clientId: 'ws-client', + wsOptions: { + headers: { + 'X-Real-Ip': clientIpV4 + } + } + }) + + setTimeout(() => client.subscribe('test'), 150) + setTimeout(() => client.publish('test', 'test'), 300) + setTimeout(() => client.end(true), 450) +} + +function startAedes () { + var delay = 500 + + var broker = aedes({ + preConnect: function (client, packet, done) { + // console.log('Aedes preConnect : ', { connDetails: client.connDetails, packet }) + client.ip = client.connDetails.ipAddress + done(null, true) + } + }) + + var server = createServer(broker, { trustProxy: true }) + var httpServer = createServer(broker, { trustProxy: true, ws: true, http: null }) + + server.listen(brokerPort, function () { + console.log('Aedes listening on TCP :', server.address()) + setTimeout(() => sendProxyPacket(1, 4, brokerPort), delay) + setTimeout(() => sendProxyPacket(1, 6, brokerPort), delay * 2) + setTimeout(() => sendProxyPacket(2, 4, brokerPort), delay * 3) + setTimeout(() => sendProxyPacket(2, 6, brokerPort), delay * 4) + setTimeout(() => sendTcpPacket(brokerPort), delay * 5) + }) + + httpServer.listen(wsBrokerPort, function () { + console.log('Aedes listening on HTTP :', httpServer.address()) + setTimeout(() => sendWsPacket(wsBrokerPort), delay * 6) + }) + + broker.on('subscribe', function (subscriptions, client) { + console.log( + 'MQTT client \x1b[32m' + + (client ? client.id : client) + + '\x1b[0m subscribed to topics: ' + + subscriptions.map((s) => s.topic).join('\n'), + 'from broker', + broker.id + ) + }) + + broker.on('unsubscribe', function (subscriptions, client) { + console.log( + 'MQTT client \x1b[32m' + + (client ? client.id : client) + + '\x1b[0m unsubscribed to topics: ' + + subscriptions.join('\n'), + 'from broker', + broker.id + ) + }) + + // fired when a client connects + broker.on('client', function (client) { + console.log( + 'Client Connected: \x1b[33m' + + (client ? client.id : client) + + ' ip ' + + (client ? client.ip : null) + + '\x1b[0m', + 'to broker', + broker.id + ) + }) + + // fired when a client disconnects + broker.on('clientDisconnect', function (client) { + console.log( + 'Client Disconnected: \x1b[31m' + + (client ? client.id : client) + + '\x1b[0m', + 'to broker', + broker.id + ) + }) + + // fired when a message is published + broker.on('publish', function (packet, client) { + console.log( + 'Client \x1b[31m' + + (client ? client.id : 'BROKER_' + broker.id) + + '\x1b[0m has published', + packet.payload.toString(), + 'on', + packet.topic, + 'to broker', + broker.id + ) + }) +} + +(function () { + startAedes() +})() diff --git a/index.js b/index.js new file mode 100644 index 0000000..d5f3987 --- /dev/null +++ b/index.js @@ -0,0 +1,113 @@ +'use strict' + +const { + extractSocketDetails, + protocolDecoder +} = require('aedes-protocol-decoder') +const http = require('http') +const https = require('https') +const http2 = require('http2') +const net = require('net') +const tls = require('tls') +const WebSocket = require('ws') + +const defaultOptions = { + ws: null, + http: null, + https: null, + http2: null, + tls: null, + trustProxy: false, + serverFactory: null, + protocolDecoder: protocolDecoder, + extractSocketDetails: extractSocketDetails +} + +const createServer = (aedes, options) => { + if (!aedes || !aedes.handle) { + throw new Error('Missing aedes handler') + } + + options = Object.assign({}, defaultOptions, options) + + let server = null + if (options.serverFactory) { + server = options.serverFactory(aedes, options) + } else if (options.tls) { + server = tls.createServer(options.tls, (conn) => { + bindConnection(aedes, options, conn) + }) + } else if (options.ws) { + if (options.https) { + if (options.http2) { + server = http2.createSecureServer({ + ...options.http2, + ...options.https + }) + } else { + server = https.createServer({ + ...(options.http || {}), + ...options.https + }) + } + } else { + if (options.http2) { + server = http2.createServer(options.http2) + } else { + server = http.createServer(options.http || {}) + } + } + const ws = new WebSocket.Server({ server }) + ws.on('connection', (conn, req) => { + const stream = WebSocket.createWebSocketStream(conn) + // the _socket object is needed in bindConnection to retrieve info from the stream + // before passing it to aedes.handle + stream._socket = conn._socket + bindConnection(aedes, options, stream, req) + }) + } else { + server = net.createServer((conn) => { + bindConnection(aedes, options, conn) + }) + } + return server +} + +const bindConnection = (aedes, options, conn, req = {}) => { + if (options.trustProxy) { + extractConnectionDetails(aedes, options, conn, req) + } else { + req.connDetails = options.extractSocketDetails(conn.socket || conn) + aedes.handle(conn, req) + } +} + +const extractConnectionDetails = (aedes, options, conn, req = {}) => { + const onReadable = () => { + // buffer should contain the whole proxy header if any + // see https://www.haproxy.org/download/1.8/doc/proxy-protocol.txt + const buffer = conn.read(null) + if (buffer) { + const protocol = options.protocolDecoder(conn, buffer, req) + req.connDetails = protocol + conn.removeListener('readable', onReadable) + conn.removeListener('error', onError) + conn.pause() + conn.unshift(protocol.data || buffer) + aedes.handle(conn, req) + } + } + + const onError = (error) => { + conn.removeListener('readable', onReadable) + conn.removeListener('error', onError) + aedes.emit('error', error) + } + + conn.on('readable', onReadable) + conn.on('error', onError) +} + +module.exports = { + createServer +} diff --git a/package.json b/package.json new file mode 100644 index 0000000..29fa630 --- /dev/null +++ b/package.json @@ -0,0 +1,77 @@ +{ + "name": "aedes-server-factory", + "version": "0.0.1", + "description": "Aedes helper to create a server and bind its connection to Aedes", + "main": "index.js", + "types": "types/index.d.ts", + "scripts": { + "lint": "npm run lint:standard && npm run lint:typescript", + "lint:standard": "standard --verbose | snazzy", + "lint:typescript": "standard --parser @typescript-eslint/parser --plugin @typescript-eslint/eslint-plugin types/**/*.d.ts", + "unit": "tape test.js | faucet", + "test": "npm run lint && npm run unit", + "coverage": "nyc --reporter=lcov tape test.js", + "test:ci": "npm run lint && npm run coverage", + "license-checker": "license-checker --production --onlyAllow='MIT;ISC;BSD-3-Clause;BSD-2-Clause'", + "release": "read -p 'GITHUB_TOKEN: ' GITHUB_TOKEN && export GITHUB_TOKEN=$GITHUB_TOKEN && release-it" + }, + "pre-commit": [ + "test" + ], + "repository": { + "type": "git", + "url": "https://github.com/moscajs/aedes-server-factory.git" + }, + "bugs": { + "url": "http://github.com/moscajs/aedes-server-factory/issues" + }, + "engines": { + "node": ">=10" + }, + "release-it": { + "github": { + "release": true + }, + "git": { + "tagName": "v${version}" + }, + "hooks": { + "before:init": [ + "npm run test" + ] + }, + "npm": { + "publish": true + } + }, + "keywords": [ + "mqtt", + "proxy", + "server", + "http", + "tcp", + "ws" + ], + "license": "MIT", + "devDependencies": { + "@types/node": "^14.0.1", + "@typescript-eslint/eslint-plugin": "^2.30.0", + "@typescript-eslint/parser": "^2.30.0", + "aedes": "^0.44.0", + "faucet": "0.0.1", + "license-checker": "^25.0.1", + "mqtt": "^4.0.0", + "mqtt-packet": "^6.5.0", + "nyc": "^15.0.0", + "pre-commit": "^1.2.2", + "release-it": "^14.0.2", + "snazzy": "^8.0.0", + "standard": "^14.3.3", + "tape": "^4.13.0", + "typescript": "^4.0.2" + }, + "dependencies": { + "aedes-protocol-decoder": "^2.0.0", + "ws": "^7.3.1" + } +} diff --git a/test.js b/test.js new file mode 100644 index 0000000..0d7262a --- /dev/null +++ b/test.js @@ -0,0 +1,182 @@ +'use strict' + +var test = require('tape').test +var aedes = require('aedes') +var mqtt = require('mqtt') +var mqttPacket = require('mqtt-packet') +var net = require('net') +var proxyProtocol = require('proxy-protocol-js') +var { createServer } = require('./index') + +test('tcp clients have access to the connection details from the socket', function (t) { + t.plan(3) + + var port = 4883 + var broker = aedes({ + preConnect: function (client, packet, done) { + if (client && client.connDetails && client.connDetails.ipAddress) { + client.ip = client.connDetails.ipAddress + t.equal('::ffff:127.0.0.1', client.ip) + t.equal(packet.cmd, 'connect') + } else { + t.fail('no ip address present') + } + done(null, true) + setImmediate(finish) + } + }) + + var server = createServer(broker, { trustProxy: false }) + server.listen(port, function (err) { + t.error(err, 'no error') + }) + + var client = mqtt.connect({ + port, + keepalive: 0, + clientId: 'mqtt-client', + clean: false + }) + + function finish () { + client.end() + broker.close() + server.close() + t.end() + } +}) + +test('tcp proxied clients have access to the connection details from the proxy header', function (t) { + t.plan(3) + + var port = 4883 + var clientIp = '192.168.0.140' + var connectPacket = { + cmd: 'connect', + protocolId: 'MQIsdp', + protocolVersion: 3, + clean: true, + clientId: 'my-client-proxyV1', + keepalive: 0 + } + + var buf = mqttPacket.generate(connectPacket) + var src = new proxyProtocol.Peer(clientIp, 12345) + var dst = new proxyProtocol.Peer('127.0.0.1', port) + var protocol = new proxyProtocol.V1BinaryProxyProtocol( + proxyProtocol.INETProtocol.TCP4, + src, + dst, + buf + ).build() + + var broker = aedes({ + preConnect: function (client, packet, done) { + if (client.connDetails && client.connDetails.ipAddress) { + client.ip = client.connDetails.ipAddress + t.equal(clientIp, client.ip) + t.equal(packet.cmd, 'connect') + } else { + t.fail('no ip address present') + } + done(null, true) + setImmediate(finish) + } + }) + + var server = createServer(broker, { trustProxy: true }) + server.listen(port, function (err) { + t.error(err, 'no error') + }) + + var client = net.connect( + { + port, + timeout: 0 + }, + function () { + client.write(protocol) + } + ) + + function finish () { + client.end() + broker.close() + server.close() + t.end() + } +}) + +test('websocket clients have access to the connection details from the socket', function (t) { + t.plan(3) + + var clientIp = '::ffff:127.0.0.1' + var port = 4883 + var broker = aedes({ + preConnect: function (client, packet, done) { + if (client.connDetails && client.connDetails.ipAddress) { + client.ip = client.connDetails.ipAddress + t.equal(clientIp, client.ip) + t.equal(packet.cmd, 'connect') + } else { + t.fail('no ip address present') + } + done(null, true) + setImmediate(finish) + } + }) + + var server = createServer(broker, { trustProxy: false, ws: true }) + server.listen(port, function (err) { + t.error(err, 'no error') + }) + + var client = mqtt.connect(`ws://localhost:${port}`) + + function finish () { + client.end(true) + broker.close() + server.close() + t.end() + } +}) + +test('websocket proxied clients have access to the connection details', function (t) { + t.plan(3) + + var clientIp = '192.168.0.140' + var port = 4883 + var broker = aedes({ + preConnect: function (client, packet, done) { + if (client.connDetails && client.connDetails.ipAddress) { + client.ip = client.connDetails.ipAddress + t.equal(clientIp, client.ip) + t.equal(packet.cmd, 'connect') + } else { + t.fail('no ip address present') + } + done(null, true) + setImmediate(finish) + } + }) + + var server = createServer(broker, { trustProxy: true, ws: true }) + server.listen(port, function (err) { + t.error(err, 'no error') + }) + + var client = mqtt.connect(`ws://localhost:${port}`, { + wsOptions: { + headers: { + 'X-Real-Ip': clientIp + } + } + }) + + function finish () { + client.end(true) + broker.close() + server.close() + t.end() + } +}) diff --git a/types/index.d.ts b/types/index.d.ts new file mode 100644 index 0000000..909be5f --- /dev/null +++ b/types/index.d.ts @@ -0,0 +1,38 @@ +/* eslint no-unused-vars: 0 */ +/* eslint no-undef: 0 */ +/* eslint space-infix-ops: 0 */ + +/// + +import { ProtocolDecoder, ExtractSocketDetails } from 'aedes-protocol-decoder' +import { Server as HttpServer, ServerOptions as HttpServerOptions } from 'http' +import { ServerOptions as HttpSecureServerOptions } from 'https' +import { Server as NetServer } from 'net' +import { + Http2Server, + Http2SecureServer, + ServerOptions as Http2ServerOptions, + SecureServerOptions as Http2SecureServerOptions +} from 'http2' +import { Aedes } from 'aedes' +import { TlsOptions } from 'tls' + +export interface ServerFactoryOptions { + ws?: boolean; + http?: HttpServerOptions; + https?: HttpSecureServerOptions | Http2SecureServerOptions; + http2?: Http2ServerOptions; + tls?: TlsOptions; + tcp?: { allowHalfOpen?: boolean; pauseOnConnect?: boolean }; + serverFactory?: ServerFactory; + protocolDecoder?: ProtocolDecoder; + extractSocketDetails?: ExtractSocketDetails; + trustProxy?: boolean; +} + +type Server = NetServer | HttpServer | Http2Server | Http2SecureServer; + +export type ServerFactory = ( + broker: Aedes, + options: ServerFactoryOptions +) => Server;