Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Proxy and ip decoder #334

Merged
merged 8 commits into from
Dec 16, 2019
6 changes: 5 additions & 1 deletion aedes.js
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@ var defaultOptions = {
authorizePublish: defaultAuthorizePublish,
authorizeSubscribe: defaultAuthorizeSubscribe,
authorizeForward: defaultAuthorizeForward,
published: defaultPublished
published: defaultPublished,
trustProxy: false,
trustedProxies: []
}

function Aedes (opts) {
Expand Down Expand Up @@ -58,6 +60,8 @@ function Aedes (opts) {
this.authorizeSubscribe = opts.authorizeSubscribe
this.authorizeForward = opts.authorizeForward
this.published = opts.published
this.trustProxy = opts.trustProxy
this.trustedProxies = opts.trustedProxies

this.clients = {}
this.brokers = {}
Expand Down
131 changes: 131 additions & 0 deletions examples/proxy/index.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
var aedes = require('../../aedes')
getlarge marked this conversation as resolved.
Show resolved Hide resolved
var mqemitter = require('mqemitter')
var persistence = require('aedes-persistence')
var mqttPacket = require('mqtt-packet')
var net = require('net')
var proxyProtocol = require('proxy-protocol-js')

function sendProxyPacket (version = 1) {
var packet = {
cmd: 'connect',
protocolId: 'MQTT',
protocolVersion: 4,
clean: true,
clientId: `my-client-${version}`,
keepalive: 0
}

var protocol
if (version === 1) {
var src = new proxyProtocol.Peer('127.0.0.1', 12345)
var dst = new proxyProtocol.Peer('127.0.0.1', 1883)
protocol = new proxyProtocol.V1BinaryProxyProtocol(
proxyProtocol.INETProtocol.TCP4,
src,
dst,
mqttPacket.generate(packet)
).build()
} else if (version === 2) {
protocol = new proxyProtocol.V2ProxyProtocol(
proxyProtocol.Command.LOCAL,
proxyProtocol.TransportProtocol.DGRAM,
new proxyProtocol.IPv4ProxyAddress(
proxyProtocol.IPv4Address.createFrom([127, 0, 0, 1]),
12346,
proxyProtocol.IPv4Address.createFrom([127, 0, 0, 1]),
1883
),
mqttPacket.generate(packet)
).build()
}

var parsedProto = version === 1
? proxyProtocol.V1BinaryProxyProtocol.parse(protocol)
: proxyProtocol.V2ProxyProtocol.parse(protocol)
// console.log(parsedProto)

var dstPort = version === 1
? parsedProto.destination.port
: parsedProto.proxyAddress.destinationPort

var dstHost = version === 1
? parsedProto.destination.ipAddress
: parsedProto.proxyAddress.destinationAddress.address.join('.')

// console.log('Connection to :', dstHost, dstPort)
var mqttConn = net.createConnection(
{
port: dstPort,
host: dstHost,
timeout: 300
}
)

var data
if (version === 2) {
data = Buffer.from(protocol.buffer)
} else {
data = protocol
}

mqttConn.on('timeout', function () {
// console.log("protocol proxy buffer", data)
mqttConn.write(data, () => {
mqttConn.end()
})
})
}

function startAedes () {
var port = 1883

var broker = aedes({
mq: mqemitter({
concurrency: 100
}),
persistence: persistence(),
preConnect: function (client, done) {
console.log('Aedes preConnect check client ip:', client.ipAddress)
client.ip = client.ipAddress
client.close()
return done(null, true)
},
trustProxy: true
})

var server = require('net').createServer(broker.handle)

server.listen(port, function () {
console.log('Aedes listening on port:', port)
broker.publish({ topic: 'aedes/hello', payload: "I'm broker " + broker.id })
setTimeout(() => sendProxyPacket(1), 250)
setTimeout(() => sendProxyPacket(2), 500)
})

broker.on('subscribe', function (subscriptions, client) {
console.log('MQTT client \x1b[32m' + (client ? client.id : client) +
'\x1b[0m subscribed to topics: ' + subscriptions.map(s => s.topic).join('\n'), 'from broker', broker.id)
})

broker.on('unsubscribe', function (subscriptions, client) {
console.log('MQTT client \x1b[32m' + (client ? client.id : client) +
'\x1b[0m unsubscribed to topics: ' + subscriptions.join('\n'), 'from broker', broker.id)
})

// fired when a client connects
broker.on('client', function (client) {
console.log('Client Connected: \x1b[33m' + (client ? client.id : client) + ' ip ' + (client ? client.ip : null) + '\x1b[0m', 'to broker', broker.id)
})

// fired when a client disconnects
broker.on('clientDisconnect', function (client) {
console.log('Client Disconnected: \x1b[31m' + (client ? client.id : client) + '\x1b[0m', 'to broker', broker.id)
})

// fired when a message is published
broker.on('publish', async function (packet, client) {
console.log('Client \x1b[31m' + (client ? client.id : 'BROKER_' + broker.id) + '\x1b[0m has published', packet.payload.toString(), 'on', packet.topic, 'to broker', broker.id)
})
}

startAedes()
17 changes: 17 additions & 0 deletions examples/proxy/package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
{
"name": "aedes_proxy",
"version": "1.0.0",
"description": "Testing Aedes Broker behing proxy",
"main": "index.js",
"scripts": {
"test": "echo \"Error: no test specified\" && exit 1"
},
"author": "getlarge",
"license": "MIT",
"dependencies": {
"aedes": "git+https://[email protected]/getlarge/aedes.git#proxy_and_ip_decoder",
"mqemitter": "^3.0.0",
"mqtt-packet": "^6.2.1",
"proxy-protocol-js": "^4.0.2"
mcollina marked this conversation as resolved.
Show resolved Hide resolved
}
}
90 changes: 88 additions & 2 deletions lib/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ var util = require('util')
var eos = require('end-of-stream')
var empty = Buffer.allocUnsafe(0)
var Packet = require('aedes-packet')
var proxyProtocol = require('proxy-protocol-js')
var write = require('./write')
var QoSPacket = require('./qos-packet')
var handleSubscribe = require('./handlers/subscribe')
Expand Down Expand Up @@ -45,8 +46,89 @@ function Client (broker, conn, req) {

this.disconnected = false

this.ipAddress = null
this.ipFamily = null
this.isWebsocket = false
this.isProxied = false
getlarge marked this conversation as resolved.
Show resolved Hide resolved

this.parser.on('packet', enqueue)

function protocolDecoder (data) {
getlarge marked this conversation as resolved.
Show resolved Hide resolved
var proto = {}
if (!data) return proto
// todo: checkProxiesList(that.conn, that.broker.trustedProxies)
var trustProxy = that.broker.trustProxy
var ipFamily
var conn = that.conn
var socket = conn.socket || conn
var headers = that.req && that.req.headers ? that.req.headers : null
if (trustProxy && headers) {
// console.log('PROXY HTTP HEADERS', headers)
if (headers['x-real-ip']) proto.ipAddress = headers['x-real-ip']
else if (headers['x-forwarded-for']) proto.ipAddress = headers['x-forwarded-for']
// console.log('WEB SOCKET VIA PROXY', proto.ipAddress)
that.isWebsocket = true
that.isProxied = true
} else if (trustProxy) {
var proxyProto
try {
proxyProto = proxyProtocol.V1BinaryProxyProtocol.parse(data)
// console.log('PROXY PROTOCOL V1', proxyProto)
} catch (V1BinaryProxyProtocolE) {
try {
proxyProto = proxyProtocol.V2ProxyProtocol.parse(data)
// console.log('PROXY PROTOCOL V2', proxyProto)
} catch (V2ProxyProtocolE) {
// empty;
}
} finally {
if (proxyProto && proxyProto.source && proxyProto.data) {
ipFamily = proxyProto.inetProtocol
proto.ipAddress = proxyProto.source.ipAddress
proto.data = proxyProto.data
that.isWebsocket = false
that.isProxied = true
// console.log('TCP SOCKET VIA PROXY V1', proto.ipAddress)
} else if (proxyProto && proxyProto.proxyAddress && proxyProto.data) {
if (proxyProto.proxyAddress instanceof proxyProtocol.IPv4ProxyAddress) {
ipFamily = 'IPv4'
} else if (proxyProto.proxyAddress instanceof proxyProtocol.IPv6ProxyAddress) {
ipFamily = 'IPv6'
}
proto.ipAddress = proxyProto.proxyAddress.sourceAddress.address.join('.')
proto.data = proxyProto.data
// proto.data = Buffer.from(proxyProto.data)
that.isWebsocket = false
that.isProxied = true
// console.log('TCP SOCKET VIA PROXY V2', proto.ipAddress)
}
}
}
if (!proto.ipAddress) {
if (socket._socket && socket._socket.address) {
that.isWebsocket = true
that.isProxied = false
proto.ipAddress = socket._socket.remoteAddress
ipFamily = socket._socket.remoteFamily
// console.log('WEBSOCKET ADDRESS', proto.ipAddress)
} else if (socket.address) {
proto.ipAddress = socket.remoteAddress
ipFamily = socket.remoteFamily
that.isWebsocket = false
that.isProxied = false
// console.log('TCPSOCKET ADDRESS', proto.ipAddress)
getlarge marked this conversation as resolved.
Show resolved Hide resolved
}
}
if (ipFamily && ipFamily.endsWith('4')) {
proto.ipFamily = 4
} else if (ipFamily && ipFamily.endsWith('6')) {
proto.ipFamily = 6
} else {
proto.ipFamily = 0
}
return proto
}

function nextBatch (err) {
if (err) {
that.emit('error', err)
Expand All @@ -64,8 +146,12 @@ function Client (broker, conn, req) {
that._parsingBatch = 0
var buf = empty
buf = client.conn.read(null)

if (buf) {
var { ipFamily, ipAddress, data } = protocolDecoder(buf)
getlarge marked this conversation as resolved.
Show resolved Hide resolved
if (ipAddress) client.ipAddress = ipAddress
if (ipFamily) client.ipFamily = ipFamily
if (data) {
client.parser.parse(data)
} else if (buf) {
client.parser.parse(buf)
}
}
Expand Down
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@
"from2": "^2.3.0",
"mqemitter": "^3.0.0",
"mqtt-packet": "^6.2.1",
"proxy-protocol-js": "^4.0.2",
"pump": "^3.0.0",
"retimer": "^2.0.0",
"reusify": "^1.0.4",
Expand Down
Loading