Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Proxy and ip decoder #334

Merged
merged 8 commits into from
Dec 16, 2019
14 changes: 14 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,8 @@ Options:
packet to arrive, defaults to `30000` milliseconds
* `id`: id used to identify this broker instance in `$SYS` messages,
defaults to `uuidv4()`
* `decodeProtocol`: function called when a valid buffer is received, see
[instance.decodeProtocol()](#decodeProtocol)
* `preConnect`: function called when a valid CONNECT is received, see
[instance.preConnect()](#preConnect)
* `authenticate`: function used to authenticate clients, see
Expand Down Expand Up @@ -220,7 +222,19 @@ Both `topic` and `payload` can be `Buffer` objects instead of strings.
### instance.unsubscribe(topic, func(packet, cb), done)

The reverse of [subscribe](#subscribe).
------------------------------------------------------
<a name="decodeProtocol"></a>
### instance.decodeProtocol(client, buffer)

It will be called when aedes instance trustProxy is true and that it receives a first valid buffer from client. client object state is in default and its connected state is false. A default function parse https headers (x-real-ip | x-forwarded-for) and proxy protocol v1 and v2 to retrieve information in client.connDetails. Override to supply custom protocolDecoder logic, if it returns an object with data property, this property will be parsed as an mqtt-packet.


```js
instance.decodeProtocol = function(client, buffer) {
var protocol = yourDecoder(client, buffer)
return protocol
}
```
-------------------------------------------------------
<a name="preConnect"></a>
### instance.preConnect(client, done(err, successful))
Expand Down
16 changes: 15 additions & 1 deletion aedes.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ var Packet = require('aedes-packet')
var bulk = require('bulk-write-stream')
var reusify = require('reusify')
var Client = require('./lib/client')
var protocolDecoder = require('./lib/protocol-decoder')

module.exports = Aedes
Aedes.Server = Aedes
Expand All @@ -19,12 +20,15 @@ var defaultOptions = {
concurrency: 100,
heartbeatInterval: 60000, // 1 minute
connectTimeout: 30000, // 30 secs
decodeProtocol: defaultDecodeProtocol,
preConnect: defaultPreConnect,
authenticate: defaultAuthenticate,
authorizePublish: defaultAuthorizePublish,
authorizeSubscribe: defaultAuthorizeSubscribe,
authorizeForward: defaultAuthorizeForward,
published: defaultPublished
published: defaultPublished,
trustProxy: false,
trustedProxies: []
}

function Aedes (opts) {
Expand Down Expand Up @@ -59,6 +63,10 @@ function Aedes (opts) {
this.authorizeForward = opts.authorizeForward
this.published = opts.published

this.decodeProtocol = opts.decodeProtocol
this.trustProxy = opts.trustProxy
this.trustedProxies = opts.trustedProxies

this.clients = {}
this.brokers = {}

Expand Down Expand Up @@ -295,9 +303,15 @@ Aedes.prototype.close = function (cb = noop) {

Aedes.prototype.version = require('./package.json').version

function defaultDecodeProtocol (client, buffer) {
var proto = protocolDecoder(client, buffer)
return proto
}

function defaultPreConnect (client, callback) {
callback(null, true)
}

function defaultAuthenticate (client, username, password, callback) {
callback(null, true)
}
Expand Down
180 changes: 180 additions & 0 deletions examples/proxy/index.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,180 @@
'use strict'

var aedes = require('../../aedes')
getlarge marked this conversation as resolved.
Show resolved Hide resolved
var mqemitter = require('mqemitter')
var persistence = require('aedes-persistence')
var mqttPacket = require('mqtt-packet')
var net = require('net')
var proxyProtocol = require('proxy-protocol-js')

var brokerPort = 4883

// from https://stackoverflow.com/questions/57077161/how-do-i-convert-hex-buffer-to-ipv6-in-javascript
function parseIpV6 (ip) {
return ip.match(/.{1,4}/g)
.map((val) => val.replace(/^0+/, ''))
.join(':')
.replace(/0000:/g, ':')
.replace(/:{2,}/g, '::')
}

function sendProxyPacket (version = 1, ipFamily = 4) {
var packet = {
cmd: 'connect',
protocolId: 'MQTT',
protocolVersion: 4,
clean: true,
clientId: `my-client-${version}`,
keepalive: 0
}
var hostIpV4 = '0.0.0.0'
var clientIpV4 = '192.168.1.128'
var hostIpV6 = [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
var clientIpV6 = [0, 0, 0, 0, 0, 0, 0, 0, 255, 255, 192, 168, 1, 128]
var protocol
if (version === 1) {
if (ipFamily === 4) {
protocol = new proxyProtocol.V1BinaryProxyProtocol(
proxyProtocol.INETProtocol.TCP4,
new proxyProtocol.Peer(clientIpV4, 12345),
new proxyProtocol.Peer(hostIpV4, brokerPort),
mqttPacket.generate(packet)
).build()
} else if (ipFamily === 6) {
protocol = new proxyProtocol.V1BinaryProxyProtocol(
proxyProtocol.INETProtocol.TCP6,
new proxyProtocol.Peer(parseIpV6(Buffer.from(clientIpV6).toString('hex')), 12345),
new proxyProtocol.Peer(parseIpV6(Buffer.from(hostIpV6).toString('hex')), brokerPort),
mqttPacket.generate(packet)
).build()
}
} else if (version === 2) {
if (ipFamily === 4) {
protocol = new proxyProtocol.V2ProxyProtocol(
proxyProtocol.Command.LOCAL,
proxyProtocol.TransportProtocol.STREAM,
new proxyProtocol.IPv4ProxyAddress(
proxyProtocol.IPv4Address.createFrom(clientIpV4.split('.')),
12346,
proxyProtocol.IPv4Address.createFrom(hostIpV4.split('.')),
brokerPort
),
mqttPacket.generate(packet)
).build()
} else if (ipFamily === 6) {
protocol = new proxyProtocol.V2ProxyProtocol(
proxyProtocol.Command.PROXY,
proxyProtocol.TransportProtocol.STREAM,
new proxyProtocol.IPv6ProxyAddress(
proxyProtocol.IPv6Address.createFrom(clientIpV6),
12346,
proxyProtocol.IPv6Address.createFrom(hostIpV6),
brokerPort
),
mqttPacket.generate(packet)
).build()
}
}

var parsedProto = version === 1
? proxyProtocol.V1BinaryProxyProtocol.parse(protocol)
: proxyProtocol.V2ProxyProtocol.parse(protocol)
// console.log(parsedProto)

var dstPort = version === 1
? parsedProto.destination.port
: parsedProto.proxyAddress.destinationPort

var dstHost
if (version === 1) {
if (ipFamily === 4) {
dstHost = parsedProto.destination.ipAddress
} else if (ipFamily === 6) {
dstHost = parsedProto.destination.ipAddress
// console.log('ipV6 host :', parsedProto.destination.ipAddress)
}
} else if (version === 2) {
if (ipFamily === 4) {
dstHost = parsedProto.proxyAddress.destinationAddress.address.join('.')
} else if (ipFamily === 6) {
// console.log('ipV6 client :', parseIpV6(Buffer.from(clientIpV6).toString('hex')))
dstHost = parseIpV6(Buffer.from(parsedProto.proxyAddress.destinationAddress.address).toString('hex'))
}
}

console.log('Connection to :', dstHost, dstPort)
var mqttConn = net.createConnection(
{
port: dstPort,
host: dstHost,
timeout: 150
}
)

var data = protocol
// if (!Buffer.isBuffer(protocol)) {
// data = Buffer.from(protocol.buffer)
// } else {
// data = protocol
// }
getlarge marked this conversation as resolved.
Show resolved Hide resolved

mqttConn.on('timeout', function () {
mqttConn.end(data)
})
}

function startAedes () {
var broker = aedes({
mq: mqemitter({
concurrency: 100
}),
persistence: persistence(),
preConnect: function (client, done) {
console.log('Aedes preConnect check client ip:', client.connDetails)
if (client.connDetails && client.connDetails.ipAddress) {
client.ip = client.connDetails.ipAddress
}
client.close()
return done(null, true)
},
trustProxy: true
})

var server = require('net').createServer(broker.handle)

server.listen(brokerPort, function () {
console.log('Aedes listening on :', server.address())
broker.publish({ topic: 'aedes/hello', payload: "I'm broker " + broker.id })
setTimeout(() => sendProxyPacket(1), 250)
setTimeout(() => sendProxyPacket(1, 6), 500)
setTimeout(() => sendProxyPacket(2), 750)
setTimeout(() => sendProxyPacket(2, 6), 1000)
})

broker.on('subscribe', function (subscriptions, client) {
console.log('MQTT client \x1b[32m' + (client ? client.id : client) +
'\x1b[0m subscribed to topics: ' + subscriptions.map(s => s.topic).join('\n'), 'from broker', broker.id)
})

broker.on('unsubscribe', function (subscriptions, client) {
console.log('MQTT client \x1b[32m' + (client ? client.id : client) +
'\x1b[0m unsubscribed to topics: ' + subscriptions.join('\n'), 'from broker', broker.id)
})

// fired when a client connects
broker.on('client', function (client) {
console.log('Client Connected: \x1b[33m' + (client ? client.id : client) + ' ip ' + (client ? client.ip : null) + '\x1b[0m', 'to broker', broker.id)
})

// fired when a client disconnects
broker.on('clientDisconnect', function (client) {
console.log('Client Disconnected: \x1b[31m' + (client ? client.id : client) + '\x1b[0m', 'to broker', broker.id)
})

// fired when a message is published
broker.on('publish', async function (packet, client) {
console.log('Client \x1b[31m' + (client ? client.id : 'BROKER_' + broker.id) + '\x1b[0m has published', packet.payload.toString(), 'on', packet.topic, 'to broker', broker.id)
})
}

startAedes()
17 changes: 17 additions & 0 deletions examples/proxy/package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
{
"name": "aedes_proxy",
"version": "1.0.0",
"description": "Testing Aedes Broker behing proxy",
"main": "index.js",
"scripts": {
"test": "echo \"Error: no test specified\" && exit 1"
},
"author": "getlarge",
"license": "MIT",
"dependencies": {
"aedes": "git+https://[email protected]/getlarge/aedes.git#proxy_and_ip_decoder",
"mqemitter": "^3.0.0",
"mqtt-packet": "^6.2.1",
"proxy-protocol-js": "^4.0.2"
mcollina marked this conversation as resolved.
Show resolved Hide resolved
}
}
12 changes: 10 additions & 2 deletions lib/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ function Client (broker, conn, req) {

this.disconnected = false

this.connDetails = {}
getlarge marked this conversation as resolved.
Show resolved Hide resolved

this.parser.on('packet', enqueue)

function nextBatch (err) {
Expand All @@ -64,8 +66,14 @@ function Client (broker, conn, req) {
that._parsingBatch = 0
var buf = empty
buf = client.conn.read(null)

if (buf) {
if (!client.connackSent && client.broker.trustProxy && buf) {
var { data } = client.broker.decodeProtocol(client, buf)
if (data) {
client.parser.parse(data)
} else {
client.parser.parse(buf)
}
} else if (buf) {
client.parser.parse(buf)
}
}
Expand Down
Loading