diff --git a/.gitignore b/.gitignore index 5f1abf39..d6510eb8 100644 --- a/.gitignore +++ b/.gitignore @@ -1,6 +1,7 @@ # Logs logs *.log +.vscode # Runtime data pids diff --git a/README.md b/README.md index c5e4ce19..0004a80d 100644 --- a/README.md +++ b/README.md @@ -22,6 +22,7 @@ Barebone MQTT server that can run on any stream server. * [TODO](#todo) * [Plugins](#plugins) * [Collaborators](#collaborators) +* [Contributing](#contributing) * [Acknowledgements](#acknowledgements) * [Mosca Vs Aedes](#mosca-vs-aedes) * [License](#license) @@ -126,6 +127,8 @@ Options: * `persistence`: an instance of [AedesPersistence](http://npm.im/aedes-persistence), check [plugins](#plugins) for more persistence options. It's used to store *QoS > 1*, *retained*, *will* packets and subscriptions in memory or on disk (if not specified default persistence is in memory) * `concurrency`: the max number of messages delivered concurrently, defaults to `100` +* `queueLimit`: the max number of messages queued while client is waiting to connect, + defaults to `42`. If the number is exceeded `connectionError` is thrown with error `Client queue limit reached` * `heartbeatInterval`: the interval at which the broker heartbeat is emitted, it used by other broker in the cluster, defaults to `60000` milliseconds @@ -528,6 +531,10 @@ This library is born after a lot of discussion with all production. This addresses your concerns about performance and stability. +## Contributing + +Want to contribute? Check our list of features/bugs [here](https://github.com/moscajs/aedes/projects/1) + ## Mosca vs Aedes Example benchmark test with 1000 clients sending 5000 QoS 1 messsages. Used [mqtt-benchmark](https://github.com/krylovsk/mqtt-benchmark) with command: diff --git a/aedes.js b/aedes.js index e2d5c268..9bb540f8 100644 --- a/aedes.js +++ b/aedes.js @@ -28,7 +28,8 @@ var defaultOptions = { authorizeForward: defaultAuthorizeForward, published: defaultPublished, trustProxy: false, - trustedProxies: [] + trustedProxies: [], + queueLimit: 42 } function Aedes (opts) { @@ -42,6 +43,7 @@ function Aedes (opts) { this.id = opts.id || uuidv4() this.counter = 0 + this.queueLimit = opts.queueLimit this.connectTimeout = opts.connectTimeout this.mq = opts.mq || mqemitter(opts) this.handle = function handle (conn, req) { diff --git a/lib/client.js b/lib/client.js index fabb068c..ea14a363 100644 --- a/lib/client.js +++ b/lib/client.js @@ -49,7 +49,9 @@ function Client (broker, conn, req) { this._parser = mqtt.parser() this._parser.client = this + this._parser._queue = [] // queue packets received before client fires 'connect' event. Prevents memory leaks on 'connect' event this._parser.on('packet', enqueue) + this.once('connected', dequeue) function nextBatch (err) { if (err) { @@ -82,11 +84,9 @@ function Client (broker, conn, req) { } this._nextBatch = nextBatch - this.on('error', onError) - - nextBatch() - conn.on('readable', nextBatch) + + this.on('error', onError) conn.on('error', this.emit.bind(this, 'error')) this._parser.on('error', this.emit.bind(this, 'error')) @@ -246,6 +246,8 @@ Client.prototype.close = function (done) { this._parser.removeAllListeners('packet') conn.removeAllListeners('readable') + this._parser._queue = null + if (this._keepaliveTimer) { this._keepaliveTimer.clear() this._keepaliveInterval = -1 @@ -338,9 +340,22 @@ function enqueue (packet) { if (client.connackSent || client._parsingBatch === 1) { handle(client, packet, client._nextBatch) } else { - client.on('connected', () => { - handle(client, packet, client._nextBatch) - }) + if (this._queue.length < client.broker.queueLimit) { + this._queue.push(packet) + } else { + this.emit('error', new Error('Client queue limit reached')) + } + } +} + +function dequeue () { + var q = this._parser._queue + if (q) { + for (var i = 0, len = q.length; i < len; i++) { + handle(this, q[i], this._nextBatch) + } + + this._parser._queue = null } } diff --git a/test/connect.js b/test/connect.js index 5eefd9ae..04077997 100644 --- a/test/connect.js +++ b/test/connect.js @@ -332,6 +332,84 @@ test('reject clients with wrong protocol name', function (t) { broker.on('closed', t.end.bind(t)) }) +test('After first CONNECT Packet, others are queued until \'connect\' event', function (t) { + var queueLimit = 50 + var broker = aedes({ queueLimit }) + + var publishP = { + cmd: 'publish', + topic: 'hello', + payload: Buffer.from('world'), + qos: 0, + retain: false + } + + var connectP = { + cmd: 'connect', + protocolId: 'MQTT', + protocolVersion: 4, + clean: true, + clientId: 'abcde', + keepalive: 0 + } + + var s = setup(broker, false) + s.inStream.write(connectP) + + process.once('warning', e => t.fail('Memory leak detected')) + + for (let i = 0; i < queueLimit; i++) { + s.inStream.write(publishP) + } + + broker.on('client', function (client) { + t.equal(client._parser._queue.length, queueLimit, 'Packets have been queued') + + client.once('connected', () => { + t.equal(client._parser._queue, null, 'Queue is empty') + s.conn.destroy() + broker.close(t.end) + }) + }) +}) + +test('Test queue limit', function (t) { + var queueLimit = 50 + var broker = aedes({ queueLimit }) + + var publishP = { + cmd: 'publish', + topic: 'hello', + payload: Buffer.from('world'), + qos: 0, + retain: false + } + + var connectP = { + cmd: 'connect', + protocolId: 'MQTT', + protocolVersion: 4, + clean: true, + clientId: 'abcde', + keepalive: 0 + } + + var s = setup(broker, false) + s.inStream.write(connectP) + + process.once('warning', e => t.fail('Memory leak detected')) + + for (let i = 0; i < queueLimit + 1; i++) { + s.inStream.write(publishP) + } + + broker.on('connectionError', function (conn, err) { + t.equal(err.message, 'Client queue limit reached', 'Queue error is thrown') + s.conn.destroy() + broker.close(t.end) + }) +}) + ;[[0, null, false], [1, null, true], [1, new Error('connection banned'), false], [1, new Error('connection banned'), true]].forEach(function (ele) { var plan = ele[0] var err = ele[1] diff --git a/types/index.d.ts b/types/index.d.ts index 0ebfd6c4..ea0ff9e2 100644 --- a/types/index.d.ts +++ b/types/index.d.ts @@ -69,6 +69,7 @@ declare namespace aedes { authorizeSubscribe?: AuthorizeSubscribeCallback authorizeForward?: AuthorizeForwardCallback published?: PublishedCallback + queueLimit?: number } export interface Aedes extends EventEmitter {