Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optimize Connect handler #301

Merged
merged 11 commits into from
Aug 23, 2019
32 changes: 29 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ server.listen(8883, function () {
* <a href="#subscribe"><code>instance.<b>subscribe()</b></code></a>
* <a href="#publish"><code>instance.<b>publish()</b></code></a>
* <a href="#unsubscribe"><code>instance.<b>unsubscribe()</b></code></a>
* <a href="#preConnect"><code>instance.<b>preConnect()</b></code></a>
* <a href="#authenticate"><code>instance.<b>authenticate()</b></code></a>
* <a href="#authorizePublish"><code>instance.<b>authorizePublish()</b></code></a>
* <a href="#authorizeSubscribe"><code>instance.<b>authorizeSubscribe()</b></code></a>
Expand Down Expand Up @@ -107,6 +108,8 @@ Options:
packet to arrive, defaults to `30000` milliseconds
* `id`: id used to identify this broker instance in `$SYS` messages,
defaults to `shortid()`
* `preConnect`: function called when a valid CONNECT is received, see
[instance.preConnect()](#preConnect)
* `authenticate`: function used to authenticate clients, see
[instance.authenticate()](#authenticate)
* `authorizePublish`: function used to authorize PUBLISH packets, see
Expand All @@ -120,7 +123,9 @@ Options:

Events:

* `client`: when a new [Client](#client) connects, arguments:
* `client`: when a new [Client](#client) successfully connects and register itself to server, [connackSent event will be come after], arguments:
1. `client`
* `clientReady`: when a new [Client](#client) received all its offline messages, it is ready, arguments:
1. `client`
* `clientDisconnect`: when a [Client](#client) disconnects, arguments:
1. `client`
Expand Down Expand Up @@ -152,8 +157,9 @@ packet.
[UNSUBSCRIBE](https://github.com/mqttjs/mqtt-packet#unsubscribe)
packet.
2. `client`
* `connackSent`: when a CONNACK packet is sent to a client [Client](#client) (happens after `'client'`), arguments:
1. `client`
* `connackSent`: when a CONNACK packet is sent to a client, arguments:
1. `packet`
2. `client`
* `closed`: when the broker is closed

-------------------------------------------------------
Expand Down Expand Up @@ -213,6 +219,26 @@ Both `topic` and `payload` can be `Buffer` objects instead of strings.

The reverse of [subscribe](#subscribe).

-------------------------------------------------------
<a name="preConnect"></a>
### instance.preConnect(client, done(err, successful))

It will be called when aedes instance receives a first valid CONNECT packet from client. client object state is in default and its connected state is false. Any values in CONNECT packet (like clientId, clean flag, keepalive) will pass to client object after this call. Override to supply custom preConnect logic.
Some use cases:
1. Rate Limit / Throttle by `client.conn.remoteAddress`
2. Check `instance.connectedClient` to limit maximum connections
3. IP blacklisting
gnought marked this conversation as resolved.
Show resolved Hide resolved

```js
instance.preConnect = function(client, callback) {
callback(null, client.conn.remoteAddress === '::1') {
}
```
```js
instance.preConnect = function(client, callback) {
callback(new Error('connection error'), client.conn.remoteAddress !== '::1') {
}
```
-------------------------------------------------------
<a name="authenticate"></a>
### instance.authenticate(client, username, password, done(err, successful))
Expand Down
5 changes: 5 additions & 0 deletions aedes.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ var defaultOptions = {
concurrency: 100,
heartbeatInterval: 60000, // 1 minute
connectTimeout: 30000, // 30 secs
preConnect: defaultPreConnect,
authenticate: defaultAuthenticate,
authorizePublish: defaultAuthorizePublish,
authorizeSubscribe: defaultAuthorizeSubscribe,
Expand Down Expand Up @@ -50,6 +51,7 @@ function Aedes (opts) {
this._series = series()
this._enqueuers = reusify(DoEnqueues)

this.preConnect = opts.preConnect
this.authenticate = opts.authenticate
this.authorizePublish = opts.authorizePublish
this.authorizeSubscribe = opts.authorizeSubscribe
Expand Down Expand Up @@ -299,6 +301,9 @@ Aedes.prototype.close = function (cb = noop) {

Aedes.prototype.version = require('./package.json').version

function defaultPreConnect (client, callback) {
callback(null, true)
}
function defaultAuthenticate (client, username, password, callback) {
callback(null, true)
}
Expand Down
3 changes: 2 additions & 1 deletion lib/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@ function Client (broker, conn) {

function nextBatch (err) {
if (err) {
return that.emit('error', err)
that.emit('error', err)
return
}

var buf = empty
Expand Down
169 changes: 104 additions & 65 deletions lib/handlers/connect.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,13 @@ var write = require('../write')
var QoSPacket = require('../qos-packet')
var through = require('through2')
var handleSubscribe = require('./subscribe')
var uuid = require('uuid')
var shortid = require('shortid')

function Connack (arg) {
this.cmd = 'connack'
this.returnCode = arg.returnCode
this.sessionPresent = arg.sessionPresent
}

function ClientPacketStatus (client, packet) {
this.client = client
Expand All @@ -15,6 +21,7 @@ function ClientPacketStatus (client, packet) {

var connectActions = [
authenticate,
setKeepAlive,
fetchSubs,
restoreSubs,
storeWill,
Expand All @@ -33,33 +40,57 @@ var errorMessages = [
]

function handleConnect (client, packet, done) {
client.connected = true
client.clean = packet.clean

if (!packet.clientId && packet.protocolVersion === 3) {
client.emit('error', new Error('Empty clientIds are supported only on MQTT 3.1.1'))
return done()
}

client.id = packet.clientId || uuid.v4()
client._will = packet.will

clearTimeout(client._connectTimer)
client._connectTimer = null

if (packet.keepalive > 0) {
client._keepaliveInterval = (packet.keepalive * 1500) + 1
client._keepaliveTimer = retimer(function keepaliveTimeout () {
client.broker.emit('keepaliveTimeout', client)
client.emit('error', new Error('keep alive timeout'))
}, client._keepaliveInterval)
client.broker.preConnect(client, negate)

function negate (err, successful) {
if (!err && successful === true) {
setImmediate(init, client, packet, done)
return
}
if (err) {
client.broker.emit('connectionError', client, err)
}
client.conn.destroy()
}
}

function init (client, packet, done) {
client.connected = true
var clientId = packet.clientId
var returnCode = 0
// [MQTT-3.1.2-2]
if (packet.protocolVersion < 3 || packet.protocolVersion > 4) {
returnCode = 1
}
// MQTT 3.1.0 allows <= 23 client id length
if (packet.protocolVersion === 3 && clientId.length > 23) {
returnCode = 2
}
if (returnCode > 0) {
var error = new Error(errorMessages[returnCode])
error.errorCode = returnCode
client.broker.emit('clientError', client, error)
doConnack(
{ client: client, returnCode: returnCode, sessionPresent: false },
done.bind(this, error))
client.conn.end()
return
}

client.id = clientId || 'aedes_' + shortid()
client.clean = packet.clean
client._will = packet.will

client.broker._series(
new ClientPacketStatus(client, packet),
connectActions, {}, function (err) {
connectActions,
{ returnCode: 0, sessionPresent: false }, // [MQTT-3.1.4-4], [MQTT-3.2.2-4]
function (err) {
this.client.broker.emit('clientReady', client)
this.client.emit('connected')
client.connackSent = true
done(err)
})
}
Expand All @@ -75,53 +106,62 @@ function authenticate (arg, done) {

function negate (err, successful) {
if (!client.connected) {
// a hack, sometimes close happends before authenticate comes back
// a hack, sometimes close() happened before authenticate() comes back
// we stop here for not to register it and deregister it in write()
return
}
var errCode
if (!err && successful) {
return done()
} else if (err) {
if (err.returnCode && (err.returnCode >= 1 && err.returnCode <= 3)) {
errCode = err.returnCode
write(client, {
cmd: 'connack',
returnCode: err.returnCode
}, client.close.bind(client, done.bind(this, err)))
}

if (err) {
var errCode = err.returnCode
if (errCode && (errCode >= 1 && errCode <= 3)) {
arg.returnCode = errCode
} else {
// If errorCode is 4 or not a number
errCode = 4
write(client, {
cmd: 'connack',
returnCode: 4
}, client.close.bind(client, done.bind(this, err)))
arg.returnCode = 4
}
} else {
errCode = 5
write(client, {
cmd: 'connack',
returnCode: 5
}, client.close.bind(client, done.bind(this, new Error(errorMessages[errCode]))))
arg.returnCode = 5
}
var error = new Error(errorMessages[errCode])
error.errorCode = errCode
var error = new Error(errorMessages[arg.returnCode])
error.errorCode = arg.returnCode
client.broker.emit('clientError', client, error)
arg.client = client
doConnack(arg,
// [MQTT-3.2.2-5]
client.close.bind(client, done.bind(this, error)))
}
}

function setKeepAlive (arg, done) {
if (this.packet.keepalive > 0) {
var client = this.client
// [MQTT-3.1.2-24]
client._keepaliveInterval = (this.packet.keepalive * 1500) + 1
client._keepaliveTimer = retimer(function keepaliveTimeout () {
client.broker.emit('keepaliveTimeout', client)
client.emit('error', new Error('keep alive timeout'))
}, client._keepaliveInterval)
}
done()
}

function fetchSubs (arg, done) {
var client = this.client
if (!this.packet.clean) {
this.client.broker.persistence.subscriptionsByClient({
id: this.client.id,
client.broker.persistence.subscriptionsByClient({
id: client.id,
done: done,
arg: arg
}, gotSubs)
} else {
this.client.broker.persistence.cleanSubscriptions(
this.client,
done)
return
}
arg.sessionPresent = false // [MQTT-3.2.2-1]
client.broker.persistence.cleanSubscriptions(
client,
done)
}

function gotSubs (err, subs, client) {
Expand All @@ -135,21 +175,24 @@ function gotSubs (err, subs, client) {
function restoreSubs (arg, done) {
if (arg.subs) {
handleSubscribe(this.client, { subscriptions: arg.subs, restore: true }, done)
} else {
done()
arg.sessionPresent = !!arg.subs // cast to boolean, [MQTT-3.2.2-2]
return
}
arg.sessionPresent = false // [MQTT-3.2.2-1], [MQTT-3.2.2-3]
done()
}

function storeWill (arg, done) {
this.client.will = this.client._will
if (this.client.will) {
this.client.broker.persistence.putWill(
this.client,
this.client.will,
var client = this.client
client.will = client._will
if (client.will) {
client.broker.persistence.putWill(
client,
client.will,
done)
} else {
done()
return
}
done()
}

function registerClient (arg, done) {
Expand All @@ -158,17 +201,12 @@ function registerClient (arg, done) {
done()
}

function Connack (arg) {
this.cmd = 'connack'
this.returnCode = 0
this.sessionPresent = !!arg.subs // cast to boolean
}

function doConnack (arg, done) {
var client = this.client
var client = arg.client || this.client
const connack = new Connack(arg)
write(client, connack, function () {
client.broker.emit('connackSent', client)
client.broker.emit('connackSent', connack, client)
client.connackSent = true
done()
})
}
Expand All @@ -189,7 +227,6 @@ function emptyQueue (arg, done) {

function emptyQueueFilter (err, client, packet) {
var next = packet.writeCallback
var persistence = client.broker.persistence

if (err) {
client.emit('error', err)
Expand All @@ -202,6 +239,8 @@ function emptyQueueFilter (err, client, packet) {
authorized = client.broker.authorizeForward(client, packet)
}

var persistence = client.broker.persistence

if (client.clean || !authorized) {
persistence.outgoingClearMessageId(client, packet, next)
} else {
Expand Down
12 changes: 9 additions & 3 deletions lib/handlers/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,18 @@ var handlePing = require('./ping')

function handle (client, packet, done) {
if (packet.cmd === 'connect') {
// [MQTT-3.1.0-2]
return client.connected ? client.conn.destroy() : handleConnect(client, packet, done)
if (client.connected) {
// [MQTT-3.1.0-2]
client.conn.destroy()
return done(new Error('invalid protocol'))
}
handleConnect(client, packet, done)
return
}
if (!client.connected) {
// [MQTT-3.1.0-1]
return client.conn.destroy()
client.conn.destroy()
return done(new Error('invalid protocol'))
}

switch (packet.cmd) {
Expand Down
3 changes: 2 additions & 1 deletion lib/handlers/pubrec.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ function handlePubrec (client, packet, done) {
function reply (err) {
if (err) {
// TODO is this ok?
return client._onError(err)
client._onError(err)
return
}

write(client, pubrel, done)
Expand Down
Loading