Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Proxy and ip decoder #334

Merged
merged 8 commits into from
Dec 16, 2019
14 changes: 14 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,8 @@ Options:
packet to arrive, defaults to `30000` milliseconds
* `id`: id used to identify this broker instance in `$SYS` messages,
defaults to `uuidv4()`
* `decodeProtocol`: function called when a valid buffer is received, see
[instance.decodeProtocol()](#decodeProtocol)
* `preConnect`: function called when a valid CONNECT is received, see
[instance.preConnect()](#preConnect)
* `authenticate`: function used to authenticate clients, see
Expand Down Expand Up @@ -220,7 +222,19 @@ Both `topic` and `payload` can be `Buffer` objects instead of strings.
### instance.unsubscribe(topic, func(packet, cb), done)

The reverse of [subscribe](#subscribe).
------------------------------------------------------
<a name="decodeProtocol"></a>
### instance.decodeProtocol(client, buffer)

It will be called when aedes instance trustProxy is true and that it receives a first valid buffer from client. client object state is in default and its connected state is false. A default function parse https headers (x-real-ip | x-forwarded-for) and proxy protocol v1 and v2 to retrieve information in client.connDetails. Override to supply custom protocolDecoder logic, if it returns an object with data property, this property will be parsed as an mqtt-packet.


```js
instance.decodeProtocol = function(client, buffer) {
var protocol = yourDecoder(client, buffer)
return protocol
}
```
-------------------------------------------------------
<a name="preConnect"></a>
### instance.preConnect(client, done(err, successful))
Expand Down
16 changes: 15 additions & 1 deletion aedes.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ var Packet = require('aedes-packet')
var bulk = require('bulk-write-stream')
var reusify = require('reusify')
var Client = require('./lib/client')
var protocolDecoder = require('./lib/protocol-decoder')

module.exports = Aedes
Aedes.Server = Aedes
Expand All @@ -19,12 +20,15 @@ var defaultOptions = {
concurrency: 100,
heartbeatInterval: 60000, // 1 minute
connectTimeout: 30000, // 30 secs
decodeProtocol: defaultDecodeProtocol,
preConnect: defaultPreConnect,
authenticate: defaultAuthenticate,
authorizePublish: defaultAuthorizePublish,
authorizeSubscribe: defaultAuthorizeSubscribe,
authorizeForward: defaultAuthorizeForward,
published: defaultPublished
published: defaultPublished,
trustProxy: false,
trustedProxies: []
}

function Aedes (opts) {
Expand Down Expand Up @@ -59,6 +63,10 @@ function Aedes (opts) {
this.authorizeForward = opts.authorizeForward
this.published = opts.published

this.decodeProtocol = opts.decodeProtocol
this.trustProxy = opts.trustProxy
this.trustedProxies = opts.trustedProxies

this.clients = {}
this.brokers = {}

Expand Down Expand Up @@ -295,9 +303,15 @@ Aedes.prototype.close = function (cb = noop) {

Aedes.prototype.version = require('./package.json').version

function defaultDecodeProtocol (client, buffer) {
var proto = protocolDecoder(client, buffer)
return proto
}

function defaultPreConnect (client, callback) {
callback(null, true)
}

function defaultAuthenticate (client, username, password, callback) {
callback(null, true)
}
Expand Down
135 changes: 135 additions & 0 deletions examples/proxy/index.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
'use strict'

var aedes = require('../../aedes')
getlarge marked this conversation as resolved.
Show resolved Hide resolved
var mqemitter = require('mqemitter')
var persistence = require('aedes-persistence')
var mqttPacket = require('mqtt-packet')
var net = require('net')
var proxyProtocol = require('proxy-protocol-js')

function sendProxyPacket (version = 1) {
var packet = {
cmd: 'connect',
protocolId: 'MQTT',
protocolVersion: 4,
clean: true,
clientId: `my-client-${version}`,
keepalive: 0
}

var protocol
if (version === 1) {
var src = new proxyProtocol.Peer('127.0.0.1', 12345)
var dst = new proxyProtocol.Peer('127.0.0.1', 1883)
protocol = new proxyProtocol.V1BinaryProxyProtocol(
proxyProtocol.INETProtocol.TCP4,
src,
dst,
mqttPacket.generate(packet)
).build()
} else if (version === 2) {
protocol = new proxyProtocol.V2ProxyProtocol(
proxyProtocol.Command.LOCAL,
proxyProtocol.TransportProtocol.DGRAM,
new proxyProtocol.IPv4ProxyAddress(
proxyProtocol.IPv4Address.createFrom([127, 0, 0, 1]),
12346,
proxyProtocol.IPv4Address.createFrom([127, 0, 0, 1]),
1883
),
mqttPacket.generate(packet)
).build()
}

var parsedProto = version === 1
? proxyProtocol.V1BinaryProxyProtocol.parse(protocol)
: proxyProtocol.V2ProxyProtocol.parse(protocol)
// console.log(parsedProto)

var dstPort = version === 1
? parsedProto.destination.port
: parsedProto.proxyAddress.destinationPort

var dstHost = version === 1
? parsedProto.destination.ipAddress
: parsedProto.proxyAddress.destinationAddress.address.join('.')

// console.log('Connection to :', dstHost, dstPort)
var mqttConn = net.createConnection(
{
port: dstPort,
host: dstHost,
timeout: 150
}
)

var data
if (version === 2) {
data = Buffer.from(protocol.buffer)
} else {
data = protocol
}

mqttConn.on('timeout', function () {
// console.log("protocol proxy buffer", data)
mqttConn.write(data, () => {
mqttConn.end()
})
})
}

function startAedes () {
var port = 1883

var broker = aedes({
mq: mqemitter({
concurrency: 100
}),
persistence: persistence(),
preConnect: function (client, done) {
console.log('Aedes preConnect check client ip:', client.connDetails)
if (client.connDetails && client.connDetails.ipAddress) {
client.ip = client.connDetails.ipAddress
}
client.close()
return done(null, true)
},
trustProxy: true
})

var server = require('net').createServer(broker.handle)

server.listen(port, function () {
console.log('Aedes listening on port:', port)
broker.publish({ topic: 'aedes/hello', payload: "I'm broker " + broker.id })
setTimeout(() => sendProxyPacket(1), 250)
setTimeout(() => sendProxyPacket(2), 500)
})

broker.on('subscribe', function (subscriptions, client) {
console.log('MQTT client \x1b[32m' + (client ? client.id : client) +
'\x1b[0m subscribed to topics: ' + subscriptions.map(s => s.topic).join('\n'), 'from broker', broker.id)
})

broker.on('unsubscribe', function (subscriptions, client) {
console.log('MQTT client \x1b[32m' + (client ? client.id : client) +
'\x1b[0m unsubscribed to topics: ' + subscriptions.join('\n'), 'from broker', broker.id)
})

// fired when a client connects
broker.on('client', function (client) {
console.log('Client Connected: \x1b[33m' + (client ? client.id : client) + ' ip ' + (client ? client.ip : null) + '\x1b[0m', 'to broker', broker.id)
})

// fired when a client disconnects
broker.on('clientDisconnect', function (client) {
console.log('Client Disconnected: \x1b[31m' + (client ? client.id : client) + '\x1b[0m', 'to broker', broker.id)
})

// fired when a message is published
broker.on('publish', async function (packet, client) {
console.log('Client \x1b[31m' + (client ? client.id : 'BROKER_' + broker.id) + '\x1b[0m has published', packet.payload.toString(), 'on', packet.topic, 'to broker', broker.id)
})
}

startAedes()
17 changes: 17 additions & 0 deletions examples/proxy/package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
{
"name": "aedes_proxy",
"version": "1.0.0",
"description": "Testing Aedes Broker behing proxy",
"main": "index.js",
"scripts": {
"test": "echo \"Error: no test specified\" && exit 1"
},
"author": "getlarge",
"license": "MIT",
"dependencies": {
"aedes": "git+https://[email protected]/getlarge/aedes.git#proxy_and_ip_decoder",
"mqemitter": "^3.0.0",
"mqtt-packet": "^6.2.1",
"proxy-protocol-js": "^4.0.2"
mcollina marked this conversation as resolved.
Show resolved Hide resolved
}
}
12 changes: 10 additions & 2 deletions lib/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ function Client (broker, conn, req) {

this.disconnected = false

this.connDetails = {}
getlarge marked this conversation as resolved.
Show resolved Hide resolved

this.parser.on('packet', enqueue)

function nextBatch (err) {
Expand All @@ -64,8 +66,14 @@ function Client (broker, conn, req) {
that._parsingBatch = 0
var buf = empty
buf = client.conn.read(null)

if (buf) {
if (!client.connackSent && client.broker.trustProxy && buf) {
var { data } = client.broker.decodeProtocol(client, buf)
if (data) {
client.parser.parse(data)
} else {
client.parser.parse(buf)
}
} else if (buf) {
client.parser.parse(buf)
}
}
Expand Down
84 changes: 84 additions & 0 deletions lib/protocol-decoder.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
'use strict'

var proxyProtocol = require('proxy-protocol-js')

function protocolDecoder (client, data) {
var proto = {}
// var buffer = Buffer.allocUnsafe(0)
// buffer = client.conn.read(null)
getlarge marked this conversation as resolved.
Show resolved Hide resolved
if (!data) return proto
// todo: checkProxiesList(client.conn, client.broker.trustedProxies)
getlarge marked this conversation as resolved.
Show resolved Hide resolved
var trustProxy = client.broker.trustProxy
var ipFamily
var conn = client.conn
var socket = conn.socket || conn
var headers = client.req && client.req.headers ? client.req.headers : null
if (trustProxy && headers) {
if (headers['x-real-ip']) proto.ipAddress = headers['x-real-ip']
else if (headers['x-forwarded-for']) proto.ipAddress = headers['x-forwarded-for']
client.connDetails.isWebsocket = true
client.connDetails.isProxied = true
} else if (trustProxy) {
var proxyProto
try {
proxyProto = proxyProtocol.V1BinaryProxyProtocol.parse(data)
} catch (V1BinaryProxyProtocolE) {
try {
proxyProto = proxyProtocol.V2ProxyProtocol.parse(data)
} catch (V2ProxyProtocolE) {
// empty;
getlarge marked this conversation as resolved.
Show resolved Hide resolved
}
} finally {
if (proxyProto && proxyProto.source && proxyProto.data) {
ipFamily = proxyProto.inetProtocol
proto.ipAddress = proxyProto.source.ipAddress
proto.data = proxyProto.data
client.connDetails.isWebsocket = false
client.connDetails.isProxied = true
} else if (proxyProto && proxyProto.proxyAddress && proxyProto.data) {
if (proxyProto.proxyAddress instanceof proxyProtocol.IPv4ProxyAddress) {
ipFamily = 'IPv4'
} else if (proxyProto.proxyAddress instanceof proxyProtocol.IPv6ProxyAddress) {
ipFamily = 'IPv6'
}
proto.ipAddress = proxyProto.proxyAddress.sourceAddress.address.join('.')
if (Buffer.isBuffer(proxyProto.data)) {
proto.data = proxyProto.data
} else {
proto.data = Buffer.from(proxyProto.data)
}
client.connDetails.isWebsocket = false
client.connDetails.isProxied = true
}
}
}
if (!proto.ipAddress) {
if (socket._socket && socket._socket.address) {
client.connDetails.isWebsocket = true
client.connDetails.isProxied = false
proto.ipAddress = socket._socket.remoteAddress
ipFamily = socket._socket.remoteFamily
} else if (socket.address) {
proto.ipAddress = socket.remoteAddress
ipFamily = socket.remoteFamily
client.connDetails.isWebsocket = false
client.connDetails.isProxied = false
}
}
if (ipFamily && ipFamily.endsWith('4')) {
proto.ipFamily = 4
} else if (ipFamily && ipFamily.endsWith('6')) {
proto.ipFamily = 6
} else {
proto.ipFamily = 0
}
if (proto.ipAddress) {
client.connDetails.ipAddress = proto.ipAddress
}
if (proto.ipFamily !== undefined) {
client.connDetails.ipFamily = proto.ipFamily
}
return proto
}

module.exports = protocolDecoder
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@
"from2": "^2.3.0",
"mqemitter": "^3.0.0",
"mqtt-packet": "^6.2.1",
"proxy-protocol-js": "^4.0.2",
"pump": "^3.0.0",
"retimer": "^2.0.0",
"reusify": "^1.0.4",
Expand Down
Loading