diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 35fa31f..3e81d86 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -4,10 +4,11 @@ on: [push, pull_request] jobs: test: + timeout-minutes: 5 runs-on: ubuntu-latest strategy: matrix: - node-version: [8.x, 10.x, 12.x, 13.x] + node-version: [10.x, 12.x, 14.x] steps: - uses: actions/checkout@v1 @@ -29,7 +30,7 @@ jobs: - name: Coveralls Parallel uses: coverallsapp/github-action@master with: - github-token: ${{ secrets.github_token }} + github-token: ${{ secrets.GITHUB_TOKEN }} parallel: true coverage: @@ -40,4 +41,4 @@ jobs: uses: coverallsapp/github-action@master with: github-token: ${{ secrets.GITHUB_TOKEN }} - parallel-finished: true \ No newline at end of file + parallel-finished: true diff --git a/README.md b/README.md index d79eba7..2d612a8 100644 --- a/README.md +++ b/README.md @@ -14,10 +14,10 @@ Protocol decoder for Aedes MQTT Broker -The purpose of this module is to be used inside [aedes](https://github.com/moscajs/aedes) `decodeProtocol` hook, which is called when aedes instance receives a first valid buffer from client ( before CONNECT packet). The client object state is in default and its connected state is false. -The function extract socket details and if aedes `trustProxy` option is set to true, it will first parse http headers (x-real-ip | x-forwarded-for) and proxy protocol (v1 and v2) to retrieve information in client.connDetails. +The purpose of this module is to be used inside [aedes-server-factory](https://github.com/moscajs/aedes-server-factory) `bindConnection` function, which is called when the server receives a connection from client (before CONNECT packet). The client object state is in default and its connected state is false. +The function extract socket details and if `aedes-server-factory` `trustProxy` option is set to true, it will first parse http headers (x-real-ip | x-forwarded-for) and/or proxy protocol (v1 and v2), then passing the informations to `aedes` that will assign them to `client.connDetails`. -The function `protocolDecoder` returns [ConnectionDetails](./types/index.d.ts), if the object contains data property, it will be parsed as an mqtt-packet. +The function `protocolDecoder` and `extractSocketDetails` returns [ConnectionDetails](./types/index.d.ts), if the object contains `data` property, it will be parsed as an [mqtt-packet](https://github.com/mqttjs/mqtt-packet). ## Install @@ -30,30 +30,22 @@ npm install aedes-protocol-decoder --save ```js var aedes = require('aedes') var { protocolDecoder } = require('aedes-protocol-decoder') -var net = require('net') +var { createServer } = require('aedes-server-factory') var port = 1883 var broker = aedes({ - decodeProtocol: function (client, buffer) { - var proto = protocolDecoder(client, buffer) - return proto - }, - preConnect: function (client, done) { + preConnect: function (client, packet, done) { if (client.connDetails && client.connDetails.ipAddress) { client.ip = client.connDetails.ipAddress } return done(null, true) }, - trustProxy: true }) -var server = net.createServer(broker.handle) - +var server = createServer(broker, { trustProxy: true, protocolDecoder }) server.listen(port, function () { console.log('server listening on port', port) }) - - ``` ## License diff --git a/example.js b/example.js index c23cf39..49dfd25 100644 --- a/example.js +++ b/example.js @@ -4,7 +4,8 @@ var aedes = require('aedes') var mqttPacket = require('mqtt-packet') var net = require('net') var proxyProtocol = require('proxy-protocol-js') -var protocolDecoder = require('./lib/protocol-decoder') +var { createServer } = require('aedes-server-factory') +var { extractSocketDetails, protocolDecoder } = require('./index') var brokerPort = 4883 @@ -119,22 +120,17 @@ function sendProxyPacket (version = 1, ipFamily = 4) { function startAedes () { var broker = aedes({ - decodeProtocol: function (client, buffer) { - var proto = protocolDecoder(client, buffer) - return proto - }, - preConnect: function (client, done) { + preConnect: function (client, packet, done) { console.log('Aedes preConnect check client ip:', client.connDetails) if (client.connDetails && client.connDetails.ipAddress) { client.ip = client.connDetails.ipAddress } client.close() - return done(null, true) - }, - trustProxy: true + done(null, true) + } }) - var server = require('net').createServer(broker.handle) + var server = createServer(broker, { trustProxy: true, extractSocketDetails, protocolDecoder }) server.listen(brokerPort, function () { console.log('Aedes listening on :', server.address()) diff --git a/index.js b/index.js index c2d994d..c182adc 100644 --- a/index.js +++ b/index.js @@ -1,5 +1,132 @@ -var protocolDecoder = require('./lib/protocol-decoder') +'use strict' + +const proxyProtocol = require('proxy-protocol-js') +const forwarded = require('forwarded') + +const v1ProxyProtocolSignature = Buffer.from('PROXY ', 'utf8') +const v2ProxyProtocolSignature = Buffer.from('0d0a0d0a000d0a515549540a', 'hex') + +function isValidV1ProxyProtocol (buffer) { + for (let i = 0; i < v1ProxyProtocolSignature.length; i++) { + if (buffer[i] !== v1ProxyProtocolSignature[i]) { + return false + } + } + return true +} + +function isValidV2ProxyProtocol (buffer) { + for (let i = 0; i < v2ProxyProtocolSignature.length; i++) { + if (buffer[i] !== v2ProxyProtocolSignature[i]) { + return false + } + } + return true +} + +// from https://stackoverflow.com/questions/57077161/how-do-i-convert-hex-buffer-to-ipv6-in-javascript +function parseIpV6Array (ip) { + const ipHex = Buffer.from(ip).toString('hex') + return ipHex.match(/.{1,4}/g) + .map((val) => val.replace(/^0+/, '')) + .join(':') + .replace(/0000:/g, ':') + .replace(/:{2,}/g, '::') +} + +function getProtoIpFamily (ipFamily) { + if (ipFamily && ipFamily.endsWith('4')) { + return 4 + } else if (ipFamily && ipFamily.endsWith('6')) { + return 6 + } + return 0 +} + +function extractHttpDetails (req, socket, proto = {}) { + const headers = req && req.headers ? req.headers : null + if (headers) { + if (headers['x-forwarded-for']) { + const addresses = forwarded(req) + proto.ipAddress = headers['x-real-ip'] ? headers['x-real-ip'] : addresses[addresses.length - 1] + proto.serverIpAddress = addresses[0] + } + if (headers['x-real-ip']) { + proto.ipAddress = headers['x-real-ip'] + } + proto.port = socket._socket.remotePort + proto.ipFamily = getProtoIpFamily(socket._socket.remoteFamily) + proto.isWebsocket = true + } + return proto +} + +function extractProxyDetails (buffer, proto = {}) { + let proxyProto + if (isValidV1ProxyProtocol(buffer)) { + proxyProto = proxyProtocol.V1BinaryProxyProtocol.parse(buffer) + if (proxyProto && proxyProto.source && proxyProto.data) { + proto.ipFamily = getProtoIpFamily(proxyProto.inetProtocol) + proto.ipAddress = proxyProto.source.ipAddress + proto.port = proxyProto.source.port + proto.serverIpAddress = proxyProto.destination.ipAddress + proto.data = proxyProto.data + proto.isProxy = 1 + } + } else if (isValidV2ProxyProtocol(buffer)) { + proxyProto = proxyProtocol.V2ProxyProtocol.parse(buffer) + if (proxyProto && proxyProto.proxyAddress && proxyProto.data) { + if (proxyProto.proxyAddress instanceof proxyProtocol.IPv4ProxyAddress) { + proto.ipAddress = proxyProto.proxyAddress.sourceAddress.address.join('.') + proto.port = proxyProto.proxyAddress.sourcePort + proto.serverIpAddress = proxyProto.proxyAddress.destinationAddress.address.join('.') + proto.ipFamily = 4 + } else if (proxyProto.proxyAddress instanceof proxyProtocol.IPv6ProxyAddress) { + proto.ipAddress = parseIpV6Array(proxyProto.proxyAddress.sourceAddress.address) + proto.port = proxyProto.proxyAddress.sourcePort + proto.serverIpAddress = parseIpV6Array(proxyProto.proxyAddress.destinationAddress.address) + proto.ipFamily = 6 + } + proto.isProxy = 2 + proto.data = Buffer.isBuffer(proxyProto.data) ? proxyProto.data : Buffer.from(proxyProto.data) + } + } + return proto +} + +function extractSocketDetails (socket, proto = {}) { + if (socket._socket && socket._socket.address) { + proto.isWebsocket = true + proto.ipAddress = socket._socket.remoteAddress + proto.port = socket._socket.remotePort + proto.serverIpAddress = socket._socket.address().address + proto.ipFamily = getProtoIpFamily(socket._socket.remoteFamily) + } else if (socket.address) { + proto.ipAddress = socket.remoteAddress + proto.port = socket.remotePort + proto.serverIpAddress = socket.address().address + proto.ipFamily = getProtoIpFamily(socket.remoteFamily) + } + return proto +} + +function protocolDecoder (conn, buffer, req) { + const proto = {} + if (!buffer) return proto + const socket = conn.socket || conn + proto.isProxy = 0 + proto.isWebsocket = false + extractHttpDetails(req, socket, proto) + extractProxyDetails(buffer, proto) + + if (!proto.ipAddress) { + extractSocketDetails(socket, proto) + } + + return proto +} module.exports = { + extractSocketDetails, protocolDecoder } diff --git a/lib/protocol-decoder.js b/lib/protocol-decoder.js deleted file mode 100644 index 13b0863..0000000 --- a/lib/protocol-decoder.js +++ /dev/null @@ -1,145 +0,0 @@ -'use strict' - -var proxyProtocol = require('proxy-protocol-js') -var forwarded = require('forwarded') - -var v1ProxyProtocolSignature = Buffer.from('PROXY ', 'utf8') -var v2ProxyProtocolSignature = Buffer.from([ - 0x0d, - 0x0a, - 0x0d, - 0x0a, - 0x00, - 0x0d, - 0x0a, - 0x51, - 0x55, - 0x49, - 0x54, - 0x0a -]) - -function isValidV1ProxyProtocol (buffer) { - for (var i = 0; i < v1ProxyProtocolSignature.length; i++) { - if (buffer[i] !== v1ProxyProtocolSignature[i]) { - return false - } - } - return true -} - -function isValidV2ProxyProtocol (buffer) { - for (var i = 0; i < v2ProxyProtocolSignature.length; i++) { - if (buffer[i] !== v2ProxyProtocolSignature[i]) { - return false - } - } - return true -} - -// from https://stackoverflow.com/questions/57077161/how-do-i-convert-hex-buffer-to-ipv6-in-javascript -function parseIpV6Array (ip) { - var ipHex = Buffer.from(ip).toString('hex') - return ipHex.match(/.{1,4}/g) - .map((val) => val.replace(/^0+/, '')) - .join(':') - .replace(/0000:/g, ':') - .replace(/:{2,}/g, '::') -} - -function protocolDecoder (client, data) { - var proto = {} - if (!data) return proto - var trustProxy = client.broker.trustProxy - var ipFamily - var conn = client.conn - var socket = conn.socket || conn - proto.isProxy = 0 - proto.isWebsocket = false - if (trustProxy) { - var headers = client.req && client.req.headers ? client.req.headers : null - var proxyProto - if (headers) { - if (headers['x-forwarded-for']) { - var addresses = forwarded(client.req) - proto.ipAddress = headers['x-real-ip'] ? headers['x-real-ip'] : addresses[addresses.length - 1] - proto.serverIpAddress = addresses[0] - } - if (headers['x-real-ip']) { - proto.ipAddress = headers['x-real-ip'] - } - proto.port = socket._socket.remotePort - ipFamily = socket._socket.remoteFamily - proto.isWebsocket = true - } - if (isValidV1ProxyProtocol(data)) { - proxyProto = proxyProtocol.V1BinaryProxyProtocol.parse(data) - if (proxyProto && proxyProto.source && proxyProto.data) { - ipFamily = proxyProto.inetProtocol - proto.ipAddress = proxyProto.source.ipAddress - proto.port = proxyProto.source.port - proto.serverIpAddress = proxyProto.destination.ipAddress - proto.data = proxyProto.data - proto.isProxy = 1 - } - } else if (isValidV2ProxyProtocol(data)) { - proxyProto = proxyProtocol.V2ProxyProtocol.parse(data) - if (proxyProto && proxyProto.proxyAddress && proxyProto.data) { - if (proxyProto.proxyAddress instanceof proxyProtocol.IPv4ProxyAddress) { - proto.ipAddress = proxyProto.proxyAddress.sourceAddress.address.join('.') - proto.port = proxyProto.proxyAddress.sourceAddress.address.port - proto.serverIpAddress = proxyProto.proxyAddress.destinationAddress.address.join('.') - ipFamily = 'IPv4' - } else if (proxyProto.proxyAddress instanceof proxyProtocol.IPv6ProxyAddress) { - proto.ipAddress = parseIpV6Array(proxyProto.proxyAddress.sourceAddress.address) - proto.port = proxyProto.proxyAddress.sourceAddress.address.port - proto.serverIpAddress = parseIpV6Array(proxyProto.proxyAddress.destinationAddress.address) - ipFamily = 'IPv6' - } - proto.isProxy = 2 - if (Buffer.isBuffer(proxyProto.data)) { - proto.data = proxyProto.data - } else { - proto.data = Buffer.from(proxyProto.data) - } - } - } - } - if (!proto.ipAddress) { - if (socket._socket && socket._socket.address) { - proto.isWebsocket = true - proto.ipAddress = socket._socket.remoteAddress - proto.port = socket._socket.remotePort - proto.serverIpAddress = socket._socket.address().address - ipFamily = socket._socket.remoteFamily - } else if (socket.address) { - proto.ipAddress = socket.remoteAddress - proto.port = socket.remotePort - proto.serverIpAddress = socket.address().address - ipFamily = socket.remoteFamily - } - } - if (ipFamily && ipFamily.endsWith('4')) { - proto.ipFamily = 4 - } else if (ipFamily && ipFamily.endsWith('6')) { - proto.ipFamily = 6 - } else { - proto.ipFamily = 0 - } - if (!client.connDetails) client.connDetails = {} - if (proto.ipAddress) { - client.connDetails.ipAddress = proto.ipAddress - } - if (proto.port) { - client.connDetails.port = proto.port - } - if (proto.serverIpAddress) { - client.connDetails.serverIpAddress = proto.serverIpAddress - } - client.connDetails.ipFamily = proto.ipFamily - client.connDetails.isProxy = proto.isProxy - client.connDetails.isWebsocket = proto.isWebsocket - return proto -} - -module.exports = protocolDecoder diff --git a/package.json b/package.json index 8c9acd3..02c8101 100644 --- a/package.json +++ b/package.json @@ -26,7 +26,7 @@ "url": "http://github.com/moscajs/aedes-protocol-decoder/issues" }, "engines": { - "node": ">=8" + "node": ">=10" }, "release-it": { "github": { @@ -64,25 +64,25 @@ ], "license": "MIT", "devDependencies": { - "@types/node": "^12.12.25", - "@typescript-eslint/eslint-plugin": "^2.17.0", - "@typescript-eslint/parser": "^2.17.0", - "aedes": "git+https://git@github.com/moscajs/aedes.git#master", + "@types/node": "^14.0.1", + "@typescript-eslint/eslint-plugin": "^2.30.0", + "@typescript-eslint/parser": "^2.30.0", + "aedes": "^0.44.0", + "aedes-server-factory": "git+https://git@github.com/getlarge/aedes-server-factory.git#create-server-factory", "faucet": "0.0.1", "license-checker": "^25.0.1", - "mqtt": "^3.0.0", - "mqtt-packet": "^6.3.0", + "mqtt": "^4.0.0", + "mqtt-packet": "^6.5.0", "nyc": "^15.0.0", "pre-commit": "^1.2.2", - "release-it": "^12.4.3", + "release-it": "^14.0.2", "snazzy": "^8.0.0", - "standard": "^14.3.1", + "standard": "^14.3.3", "tape": "^4.13.0", - "typescript": "^3.7.5", - "websocket-stream": "^5.5.0" + "typescript": "^4.0.2" }, "dependencies": { "forwarded": "^0.1.2", - "proxy-protocol-js": "^4.0.3" + "proxy-protocol-js": "^4.0.5" } } diff --git a/test.js b/test.js index f58b8d8..5d598da 100644 --- a/test.js +++ b/test.js @@ -2,55 +2,124 @@ var test = require('tape').test var aedes = require('aedes') -var http = require('http') -var ws = require('websocket-stream') +var { createServer } = require('aedes-server-factory') var mqtt = require('mqtt') var mqttPacket = require('mqtt-packet') var net = require('net') var proxyProtocol = require('proxy-protocol-js') -var protocolDecoder = require('./lib/protocol-decoder') +var { extractSocketDetails, protocolDecoder } = require('./index') + +function start (options) { + var broker + var server + var client + + if (options.broker) { + broker = aedes(options.broker) + } + if (options.server) { + server = createServer(broker, options.server) + } + if (options.client) { + client = mqtt.connect(options.client) + } + + return { broker, client, server } +} + +function close ({ broker, client, server }, t) { + if (client) { + client.end(true) + } + if (broker) { + broker.close() + } + if (server) { + server.close() + } + if (t) { + t.end() + } +} + +function generateProxyConnectPacket (clientIp, serverPort, proxyVersion = 1, ipFamily = 4) { + var packet = { + cmd: 'connect', + protocolId: 'MQTT', + protocolVersion: 4, + clean: true, + clientId: 'my-client-proxyV1', + keepalive: 0 + } + + if (proxyVersion === 1 && ipFamily === 4) { + var src = new proxyProtocol.Peer(clientIp, 12345) + var dst = new proxyProtocol.Peer('127.0.0.1', serverPort) + return new proxyProtocol.V1BinaryProxyProtocol( + proxyProtocol.INETProtocol.TCP4, + src, + dst, + mqttPacket.generate(packet) + ).build() + } else if (proxyVersion === 2 && ipFamily === 4) { + return new proxyProtocol.V2ProxyProtocol( + proxyProtocol.Command.LOCAL, + proxyProtocol.TransportProtocol.DGRAM, + new proxyProtocol.IPv4ProxyAddress( + proxyProtocol.IPv4Address.createFrom(clientIp.split('.')), + 12345, + proxyProtocol.IPv4Address.createFrom([127, 0, 0, 1]), + serverPort + ), + mqttPacket.generate(packet) + ).build() + } else if (proxyVersion === 2 && ipFamily === 6) { + return new proxyProtocol.V2ProxyProtocol( + proxyProtocol.Command.PROXY, + proxyProtocol.TransportProtocol.STREAM, + new proxyProtocol.IPv6ProxyAddress( + proxyProtocol.IPv6Address.createFrom(clientIp), + 12345, + proxyProtocol.IPv6Address.createWithEmptyAddress(), + serverPort + ), + mqttPacket.generate(packet) + ).build() + } + return null +} -// test ipAddress property presence when trustProxy is enabled test('tcp clients have access to the ipAddress from the socket', function (t) { t.plan(2) var port = 4883 - var broker = aedes({ - decodeProtocol: function (client, buffer) { - var proto = protocolDecoder(client, buffer) - return proto - }, - preConnect: function (client, done) { - if (client && client.connDetails && client.connDetails.ipAddress) { - client.ip = client.connDetails.ipAddress - t.equal('::ffff:127.0.0.1', client.ip) - } else { - t.fail('no ip address present') + var setup = start({ + broker: { + preConnect: function (client, packet, done) { + if (client && client.connDetails && client.connDetails.ipAddress) { + client.ip = client.connDetails.ipAddress + t.equal('::ffff:127.0.0.1', client.ip) + } else { + t.fail('no ip address present') + } + done(null, true) + close(setup, t) } - done(null, true) - setImmediate(finish) }, - trustProxy: true + server: { + trustProxy: false, extractSocketDetails, protocolDecoder + }, + client: { + port, + keepalive: 0, + clientId: 'mqtt-client', + clean: false + } }) - var server = net.createServer(broker.handle) - server.listen(port, function (err) { + setup.server.listen(port, function (err) { t.error(err, 'no error') }) - - var client = mqtt.connect({ - port, - keepalive: 0, - clientId: 'mqtt-client', - clean: false - }) - - function finish () { - client.end() - broker.close() - server.close() - t.end() - } }) test('tcp proxied (protocol v1) clients have access to the ipAddress(v4)', function (t) { @@ -58,60 +127,40 @@ test('tcp proxied (protocol v1) clients have access to the ipAddress(v4)', funct var port = 4883 var clientIp = '192.168.0.140' - var packet = { - cmd: 'connect', - protocolId: 'MQIsdp', - protocolVersion: 3, - clean: true, - clientId: 'my-client-proxyV1', - keepalive: 0 - } - var buf = mqttPacket.generate(packet) - var src = new proxyProtocol.Peer(clientIp, 12345) - var dst = new proxyProtocol.Peer('127.0.0.1', port) - var protocol = new proxyProtocol.V1BinaryProxyProtocol( - proxyProtocol.INETProtocol.TCP4, - src, - dst, - buf - ).build() - - var broker = aedes({ - decodeProtocol: function (client, buffer) { - var proto = protocolDecoder(client, buffer) - return proto - }, - preConnect: function (client, done) { - if (client.connDetails && client.connDetails.ipAddress) { - client.ip = client.connDetails.ipAddress - t.equal(clientIp, client.ip) - } else { - t.fail('no ip address present') + var setup = start({ + broker: { + preConnect: function (client, packet, done) { + if (client.connDetails && client.connDetails.ipAddress) { + client.ip = client.connDetails.ipAddress + t.equal(clientIp, client.ip) + } else { + t.fail('no ip address present') + } + done(null, true) + finish() } - done(null, true) - setImmediate(finish) }, - trustProxy: true + server: { + trustProxy: true, extractSocketDetails, protocolDecoder + } }) - var server = net.createServer(broker.handle) - server.listen(port, function (err) { + setup.server.listen(port, function (err) { t.error(err, 'no error') }) + var proxyPacket = generateProxyConnectPacket(clientIp, port, 1, 4) var client = net.connect({ port, timeout: 0 }, function () { - client.write(protocol) + client.write(proxyPacket) }) function finish () { client.end() - broker.close() - server.close() - t.end() + close(setup, t) } }) @@ -120,63 +169,42 @@ test('tcp proxied (protocol v2) clients have access to the ipAddress(v4)', funct var port = 4883 var clientIp = '192.168.0.140' - var packet = { - cmd: 'connect', - protocolId: 'MQTT', - protocolVersion: 4, - clean: true, - clientId: 'my-client-proxyV2' - } - var protocol = new proxyProtocol.V2ProxyProtocol( - proxyProtocol.Command.LOCAL, - proxyProtocol.TransportProtocol.DGRAM, - new proxyProtocol.IPv4ProxyAddress( - proxyProtocol.IPv4Address.createFrom(clientIp.split('.')), - 12345, - proxyProtocol.IPv4Address.createFrom([127, 0, 0, 1]), - port - ), - mqttPacket.generate(packet) - ).build() - - var broker = aedes({ - decodeProtocol: function (client, buffer) { - var proto = protocolDecoder(client, buffer) - return proto - }, - preConnect: function (client, done) { - if (client.connDetails && client.connDetails.ipAddress) { - client.ip = client.connDetails.ipAddress - t.equal(clientIp, client.ip) - } else { - t.fail('no ip address present') + var setup = start({ + broker: { + preConnect: function (client, packet, done) { + if (client.connDetails && client.connDetails.ipAddress) { + client.ip = client.connDetails.ipAddress + t.equal(clientIp, client.ip) + } else { + t.fail('no ip address present') + } + done(null, true) + finish() } - done(null, true) - setImmediate(finish) }, - trustProxy: true + server: { + trustProxy: true, extractSocketDetails, protocolDecoder + } }) - var server = net.createServer(broker.handle) - server.listen(port, function (err) { + setup.server.listen(port, function (err) { t.error(err, 'no error') }) + var proxyPacket = generateProxyConnectPacket(clientIp, port, 2, 4) var client = net.createConnection( { port, timeout: 0 }, function () { - client.write(Buffer.from(protocol)) + client.write(proxyPacket) } ) function finish () { client.end() - broker.close() - server.close() - t.end() + close(setup, t) } }) @@ -186,63 +214,42 @@ test('tcp proxied (protocol v2) clients have access to the ipAddress(v6)', funct var port = 4883 var clientIpArray = [0, 0, 0, 0, 0, 0, 0, 0, 255, 255, 192, 168, 1, 128] var clientIp = '::ffff:c0a8:180:' - var packet = { - cmd: 'connect', - protocolId: 'MQTT', - protocolVersion: 4, - clean: true, - clientId: 'my-client-proxyV2' - } - var protocol = new proxyProtocol.V2ProxyProtocol( - proxyProtocol.Command.PROXY, - proxyProtocol.TransportProtocol.STREAM, - new proxyProtocol.IPv6ProxyAddress( - proxyProtocol.IPv6Address.createFrom(clientIpArray), - 12345, - proxyProtocol.IPv6Address.createWithEmptyAddress(), - port - ), - mqttPacket.generate(packet) - ).build() - - var broker = aedes({ - decodeProtocol: function (client, buffer) { - var proto = protocolDecoder(client, buffer) - return proto - }, - preConnect: function (client, done) { - if (client.connDetails && client.connDetails.ipAddress) { - client.ip = client.connDetails.ipAddress - t.equal(clientIp, client.ip) - } else { - t.fail('no ip address present') + var setup = start({ + broker: { + preConnect: function (client, packet, done) { + if (client.connDetails && client.connDetails.ipAddress) { + client.ip = client.connDetails.ipAddress + t.equal(clientIp, client.ip) + } else { + t.fail('no ip address present') + } + done(null, true) + finish() } - done(null, true) - setImmediate(finish) }, - trustProxy: true + server: { + trustProxy: true, extractSocketDetails, protocolDecoder + } }) - var server = net.createServer(broker.handle) - server.listen(port, function (err) { + setup.server.listen(port, function (err) { t.error(err, 'no error') }) + var proxyPacket = generateProxyConnectPacket(clientIpArray, port, 2, 6) var client = net.createConnection( { port, timeout: 0 }, function () { - client.write(Buffer.from(protocol)) + client.write(proxyPacket) } ) function finish () { client.end() - broker.close() - server.close() - t.end() + close(setup, t) } }) @@ -251,41 +258,29 @@ test('websocket clients have access to the ipAddress from the socket (if no ip h var clientIp = '::ffff:127.0.0.1' var port = 4883 - var broker = aedes({ - decodeProtocol: function (client, buffer) { - var proto = protocolDecoder(client, buffer) - return proto - }, - preConnect: function (client, done) { - if (client.connDetails && client.connDetails.ipAddress) { - client.ip = client.connDetails.ipAddress - t.equal(clientIp, client.ip) - } else { - t.fail('no ip address present') + + var setup = start({ + broker: { + preConnect: function (client, packet, done) { + if (client.connDetails && client.connDetails.ipAddress) { + client.ip = client.connDetails.ipAddress + t.equal(clientIp, client.ip) + } else { + t.fail('no ip address present') + } + done(null, true) + close(setup, t) } - done(null, true) - setImmediate(finish) }, - trustProxy: true + server: { + ws: true, trustProxy: false, extractSocketDetails, protocolDecoder + }, + client: `ws://localhost:${port}` }) - var server = http.createServer() - ws.createServer({ - server: server - }, broker.handle) - - server.listen(port, function (err) { + setup.server.listen(port, function (err) { t.error(err, 'no error') }) - - var client = mqtt.connect(`ws://localhost:${port}`) - - function finish () { - broker.close() - server.close() - client.end() - t.end() - } }) test('websocket proxied clients have access to the ipAddress from x-real-ip header', function (t) { @@ -293,47 +288,38 @@ test('websocket proxied clients have access to the ipAddress from x-real-ip head var clientIp = '192.168.0.140' var port = 4883 - var broker = aedes({ - decodeProtocol: function (client, buffer) { - var proto = protocolDecoder(client, buffer) - return proto - }, - preConnect: function (client, done) { - if (client.connDetails && client.connDetails.ipAddress) { - client.ip = client.connDetails.ipAddress - t.equal(clientIp, client.ip) - } else { - t.fail('no ip address present') + + var setup = start({ + broker: { + preConnect: function (client, packet, done) { + if (client.connDetails && client.connDetails.ipAddress) { + client.ip = client.connDetails.ipAddress + t.equal(clientIp, client.ip) + } else { + t.fail('no ip address present') + } + done(null, true) + close(setup, t) } - done(null, true) - setImmediate(finish) }, - trustProxy: true - }) - - var server = http.createServer() - ws.createServer({ - server: server - }, broker.handle) - - server.listen(port, function (err) { - t.error(err, 'no error') - }) - - var client = mqtt.connect(`ws://localhost:${port}`, { - wsOptions: { - headers: { - 'X-Real-Ip': clientIp + server: { + ws: true, trustProxy: true, extractSocketDetails, protocolDecoder + }, + client: { + protocol: 'ws', + host: 'localhost', + port, + wsOptions: { + headers: { + 'X-Real-Ip': clientIp + } } } }) - function finish () { - broker.close() - server.close() - client.end() - t.end() - } + setup.server.listen(port, function (err) { + t.error(err, 'no error') + }) }) test('websocket proxied clients have access to the ipAddress from x-forwarded-for header', function (t) { @@ -341,47 +327,38 @@ test('websocket proxied clients have access to the ipAddress from x-forwarded-fo var clientIp = '192.168.0.140' var port = 4883 - var broker = aedes({ - decodeProtocol: function (client, buffer) { - var proto = protocolDecoder(client, buffer) - return proto - }, - preConnect: function (client, done) { - if (client.connDetails && client.connDetails.ipAddress) { - client.ip = client.connDetails.ipAddress - t.equal(clientIp, client.ip) - } else { - t.fail('no ip address present') + + var setup = start({ + broker: { + preConnect: function (client, packet, done) { + if (client.connDetails && client.connDetails.ipAddress) { + client.ip = client.connDetails.ipAddress + t.equal(clientIp, client.ip) + } else { + t.fail('no ip address present') + } + done(null, true) + close(setup, t) } - done(null, true) - setImmediate(finish) }, - trustProxy: true - }) - - var server = http.createServer() - ws.createServer({ - server: server - }, broker.handle) - - server.listen(port, function (err) { - t.error(err, 'no error') - }) - - var client = mqtt.connect(`ws://localhost:${port}`, { - wsOptions: { - headers: { - 'X-Forwarded-For': clientIp + server: { + ws: true, trustProxy: true, extractSocketDetails, protocolDecoder + }, + client: { + protocol: 'ws', + host: 'localhost', + port, + wsOptions: { + headers: { + 'X-Forwarded-For': clientIp + } } } }) - function finish () { - broker.close() - server.close() - client.end() - t.end() - } + setup.server.listen(port, function (err) { + t.error(err, 'no error') + }) }) test('tcp proxied (protocol v1) clients buffer contains MQTT packet and proxy header', function (t) { @@ -390,43 +367,42 @@ test('tcp proxied (protocol v1) clients buffer contains MQTT packet and proxy he var brokerPort = 4883 var proxyPort = 4884 var clientIp = '192.168.0.140' - var packet = { - cmd: 'connect', - protocolId: 'MQIsdp', - protocolVersion: 3, - clean: true, - clientId: 'my-client-proxyV1', - keepalive: 0 - } - - var buf = mqttPacket.generate(packet) - var src = new proxyProtocol.Peer(clientIp, 12345) - var dst = new proxyProtocol.Peer('127.0.0.1', proxyPort) - var broker = aedes({ - decodeProtocol: function (client, buff) { - var proto = protocolDecoder(client, buff) - if (proto.data) { - t.equal(proto.data.toString(), buf.toString()) - } else { - t.fail('no MQTT packet extracted from TCP buffer') + var setup = start({ + broker: { + preConnect: function (client, packet, done) { + if (client.connDetails.data) { + const parser = mqttPacket.parser({ protocolVersion: 3 }) + parser.on('packet', (parsedPacket) => { + t.equal(JSON.stringify(parsedPacket), JSON.stringify(packet)) + done(null, true) + }) + parser.on('error', () => { + t.fail('no valid MQTT packet extracted from TCP buffer') + done(null, true) + }) + parser.parse(client.connDetails.data) + } else { + t.fail('no MQTT packet extracted from TCP buffer') + done(null, true) + } } - return proto }, - trustProxy: true + server: { + trustProxy: true, extractSocketDetails, protocolDecoder + } }) - broker.on('clientDisconnect', function (client) { - // console.log('onClientDisconnect', client.id) - setImmediate(finish) + setup.broker.on('clientDisconnect', function () { + finish() }) - var server = net.createServer(broker.handle) - server.listen(brokerPort, function (err) { + setup.server.listen(brokerPort, function (err) { t.error(err, 'no error') }) var proxyServer = net.createServer() + proxyServer.listen(proxyPort, function (err) { t.error(err, 'no error') }) @@ -434,17 +410,17 @@ test('tcp proxied (protocol v1) clients buffer contains MQTT packet and proxy he var proxyClient proxyServer.on('connection', function (socket) { - socket.on('end', function (data) { - proxyClient.end(data, function () { - proxyClient.connected = false - }) + socket.on('end', function () { + proxyClient.connected = false }) socket.on('data', function (data) { if (proxyClient && proxyClient.connected) { proxyClient.write(data) } else { - var protocol = new proxyProtocol.V1BinaryProxyProtocol( + var src = new proxyProtocol.Peer(clientIp, 12345) + var dst = new proxyProtocol.Peer('127.0.0.1', proxyPort) + var proxyPacket = new proxyProtocol.V1BinaryProxyProtocol( proxyProtocol.INETProtocol.TCP4, src, dst, @@ -454,7 +430,7 @@ test('tcp proxied (protocol v1) clients buffer contains MQTT packet and proxy he port: brokerPort, timeout: 0 }, function () { - proxyClient.write(protocol, function () { + proxyClient.write(proxyPacket, function () { proxyClient.connected = true }) }) @@ -462,11 +438,20 @@ test('tcp proxied (protocol v1) clients buffer contains MQTT packet and proxy he }) }) + var packet = { + cmd: 'connect', + protocolId: 'MQIsdp', + protocolVersion: 3, + clean: true, + clientId: 'my-client-proxyV1', + keepalive: 0 + } + var client = net.connect({ port: proxyPort, - timeout: 200 + timeout: 250 }, function () { - client.write(buf) + client.write(mqttPacket.generate(packet)) }) client.on('timeout', function () { @@ -474,8 +459,7 @@ test('tcp proxied (protocol v1) clients buffer contains MQTT packet and proxy he }) function finish () { - broker.close() - server.close() + close(setup) proxyServer.close() t.end() } diff --git a/types/index.d.ts b/types/index.d.ts index 7ab31d1..7b29d89 100644 --- a/types/index.d.ts +++ b/types/index.d.ts @@ -1,3 +1,13 @@ + +/* eslint no-unused-vars: 0 */ +/* eslint no-undef: 0 */ +/* eslint space-infix-ops: 0 */ + +/// + +import { Socket } from 'net' +import { Stream } from 'stream' + export interface ConnectionDetails { ipAddress: string port: number @@ -8,4 +18,6 @@ export interface ConnectionDetails { data?: Buffer } -export type ProtocolDecoder = (client: Object, buffer: Buffer) => ConnectionDetails | null +export type ProtocolDecoder = (conn: Stream, buffer: Buffer, req?: any) => ConnectionDetails + +export type ExtractSocketDetails = (socket: Socket) => ConnectionDetails | null