From 89329c39577871e22f00f2b40f3d70e8f5c9dc3e Mon Sep 17 00:00:00 2001 From: Daniel Lando Date: Thu, 23 Jan 2020 14:21:47 +0100 Subject: [PATCH 01/16] fix: Memory leak on client.js 'connected' event #348 --- lib/client.js | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/lib/client.js b/lib/client.js index 66784a44..b8835a4c 100644 --- a/lib/client.js +++ b/lib/client.js @@ -48,6 +48,15 @@ function Client (broker, conn, req) { this.connDetails = null + // queue packets received before client fires 'connect' event. Prevents memory leaks on 'connect' event + this.parser._queue = [] + + this.on('connected', () => { + while (this.parser._queue.length > 0) { + handle(this, this.parser._queue.shift(), this._nextBatch) + } + }) + this.parser.on('packet', enqueue) function nextBatch (err) { @@ -330,9 +339,7 @@ function enqueue (packet) { if (client.connackSent || client._parsingBatch === 1) { handle(client, packet, client._nextBatch) } else { - client.on('connected', () => { - handle(client, packet, client._nextBatch) - }) + this._queue.push(packet) } } From ebbec2f9da4ab9727c4f31727c4cded789339ea6 Mon Sep 17 00:00:00 2001 From: Daniel Lando Date: Thu, 23 Jan 2020 14:55:39 +0100 Subject: [PATCH 02/16] fix: Test for connect memory leak --- test/connect.js | 40 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 40 insertions(+) diff --git a/test/connect.js b/test/connect.js index 5eefd9ae..10345204 100644 --- a/test/connect.js +++ b/test/connect.js @@ -332,6 +332,46 @@ test('reject clients with wrong protocol name', function (t) { broker.on('closed', t.end.bind(t)) }) +test.only('After first CONNECT Packet, others are queued until \'connect\' event', function (t) { + t.plan(2) + + var broker = aedes() + + var publishP = { + cmd: 'publish', + topic: 'hello', + payload: Buffer.from('world'), + qos: 0, + retain: false + } + + var connectP = { + cmd: 'connect', + protocolId: 'MQTT', + protocolVersion: 4, + clean: true, + clientId: 'abcde', + keepalive: 0 + } + + var s = setup(broker, false) + s.inStream.write(connectP) + + for (let i = 0; i < 10; i++) { + s.inStream.write(publishP) + } + + broker.on('client', function (client) { + t.equal(client.parser._queue.length, 10, 'Packets have been queued') + + setTimeout(() => { + t.equal(client.parser._queue.length, 0, 'Queue is empty') + s.conn.destroy() + broker.close(t.end) + }, 100) + }) +}) + ;[[0, null, false], [1, null, true], [1, new Error('connection banned'), false], [1, new Error('connection banned'), true]].forEach(function (ele) { var plan = ele[0] var err = ele[1] From 2d833a92253b3a3dbaf9b7b35aaae86f5c3eb248 Mon Sep 17 00:00:00 2001 From: Daniel Lando Date: Thu, 23 Jan 2020 14:56:07 +0100 Subject: [PATCH 03/16] fix: Removed typo --- test/connect.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/connect.js b/test/connect.js index 10345204..686b30a8 100644 --- a/test/connect.js +++ b/test/connect.js @@ -332,7 +332,7 @@ test('reject clients with wrong protocol name', function (t) { broker.on('closed', t.end.bind(t)) }) -test.only('After first CONNECT Packet, others are queued until \'connect\' event', function (t) { +test('After first CONNECT Packet, others are queued until \'connect\' event', function (t) { t.plan(2) var broker = aedes() From f974d4fde86defa7c418024947d00658ee7b647f Mon Sep 17 00:00:00 2001 From: Daniel Lando Date: Thu, 23 Jan 2020 14:57:40 +0100 Subject: [PATCH 04/16] fix: Replaced timout with connected event --- test/connect.js | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/test/connect.js b/test/connect.js index 686b30a8..46fcbb1a 100644 --- a/test/connect.js +++ b/test/connect.js @@ -364,11 +364,11 @@ test('After first CONNECT Packet, others are queued until \'connect\' event', fu broker.on('client', function (client) { t.equal(client.parser._queue.length, 10, 'Packets have been queued') - setTimeout(() => { + client.on('connected', () => { t.equal(client.parser._queue.length, 0, 'Queue is empty') s.conn.destroy() broker.close(t.end) - }, 100) + }) }) }) From d6bb864c62762a365ccfa22cf280faddcb12c408 Mon Sep 17 00:00:00 2001 From: Daniel Lando Date: Thu, 23 Jan 2020 15:22:04 +0100 Subject: [PATCH 05/16] fix: Clean up code --- lib/client.js | 11 ++++++++--- test/connect.js | 4 ++-- 2 files changed, 10 insertions(+), 5 deletions(-) diff --git a/lib/client.js b/lib/client.js index b8835a4c..0d0332e1 100644 --- a/lib/client.js +++ b/lib/client.js @@ -51,11 +51,16 @@ function Client (broker, conn, req) { // queue packets received before client fires 'connect' event. Prevents memory leaks on 'connect' event this.parser._queue = [] - this.on('connected', () => { - while (this.parser._queue.length > 0) { + this.on('connected', dequeue) + + function dequeue () { + var q = this.parser._queue + for (var i = 0, len = q.length; i < len; i++) { handle(this, this.parser._queue.shift(), this._nextBatch) } - }) + + this.parser._queue = null + } this.parser.on('packet', enqueue) diff --git a/test/connect.js b/test/connect.js index 46fcbb1a..d44f7b4b 100644 --- a/test/connect.js +++ b/test/connect.js @@ -364,8 +364,8 @@ test('After first CONNECT Packet, others are queued until \'connect\' event', fu broker.on('client', function (client) { t.equal(client.parser._queue.length, 10, 'Packets have been queued') - client.on('connected', () => { - t.equal(client.parser._queue.length, 0, 'Queue is empty') + client.once('connected', () => { + t.equal(client.parser._queue, null, 'Queue is empty') s.conn.destroy() broker.close(t.end) }) From f9fed5250ebf971fd8437ed617bf593885482c64 Mon Sep 17 00:00:00 2001 From: Daniel Lando Date: Thu, 23 Jan 2020 15:24:01 +0100 Subject: [PATCH 06/16] fix: Removed shift --- lib/client.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/client.js b/lib/client.js index 0d0332e1..113ef0a7 100644 --- a/lib/client.js +++ b/lib/client.js @@ -56,7 +56,7 @@ function Client (broker, conn, req) { function dequeue () { var q = this.parser._queue for (var i = 0, len = q.length; i < len; i++) { - handle(this, this.parser._queue.shift(), this._nextBatch) + handle(this, q[i], this._nextBatch) } this.parser._queue = null From 95efe311d1ec36dc9b1a3588b340645c47ecd217 Mon Sep 17 00:00:00 2001 From: Daniel Lando Date: Thu, 23 Jan 2020 15:50:01 +0100 Subject: [PATCH 07/16] refactor: Code improvments --- lib/client.js | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/lib/client.js b/lib/client.js index 113ef0a7..a3fd6fe4 100644 --- a/lib/client.js +++ b/lib/client.js @@ -51,16 +51,7 @@ function Client (broker, conn, req) { // queue packets received before client fires 'connect' event. Prevents memory leaks on 'connect' event this.parser._queue = [] - this.on('connected', dequeue) - - function dequeue () { - var q = this.parser._queue - for (var i = 0, len = q.length; i < len; i++) { - handle(this, q[i], this._nextBatch) - } - - this.parser._queue = null - } + this.once('connected', dequeue) this.parser.on('packet', enqueue) @@ -348,4 +339,13 @@ function enqueue (packet) { } } +function dequeue () { + var q = this.parser._queue + for (var i = 0, len = q.length; i < len; i++) { + handle(this, q[i], this._nextBatch) + } + + this.parser._queue = null +} + function nop () { } From 7683bec361fca3eff36b46806a22f37c9ed433e7 Mon Sep 17 00:00:00 2001 From: Daniel Lando Date: Thu, 23 Jan 2020 17:01:29 +0100 Subject: [PATCH 08/16] fix: Detect memory leaks --- test/connect.js | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/test/connect.js b/test/connect.js index d44f7b4b..7d997972 100644 --- a/test/connect.js +++ b/test/connect.js @@ -333,8 +333,6 @@ test('reject clients with wrong protocol name', function (t) { }) test('After first CONNECT Packet, others are queued until \'connect\' event', function (t) { - t.plan(2) - var broker = aedes() var publishP = { @@ -357,12 +355,14 @@ test('After first CONNECT Packet, others are queued until \'connect\' event', fu var s = setup(broker, false) s.inStream.write(connectP) - for (let i = 0; i < 10; i++) { + process.once('warning', e => t.fail('Memory leak detected')) + + for (let i = 0; i < 100; i++) { s.inStream.write(publishP) } broker.on('client', function (client) { - t.equal(client.parser._queue.length, 10, 'Packets have been queued') + t.equal(client.parser._queue.length, 100, 'Packets have been queued') client.once('connected', () => { t.equal(client.parser._queue, null, 'Queue is empty') From 517eb862222f97825c6de0920de5da6b2c820d89 Mon Sep 17 00:00:00 2001 From: Daniel Lando Date: Fri, 24 Jan 2020 11:53:26 +0100 Subject: [PATCH 09/16] fix: Free queue on client close --- lib/client.js | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/lib/client.js b/lib/client.js index a3fd6fe4..a8a15da6 100644 --- a/lib/client.js +++ b/lib/client.js @@ -254,6 +254,8 @@ Client.prototype.close = function (done) { this.parser.removeAllListeners('packet') conn.removeAllListeners('readable') + this.parser._queue = null + if (this._keepaliveTimer) { this._keepaliveTimer.clear() this._keepaliveInterval = -1 @@ -340,7 +342,7 @@ function enqueue (packet) { } function dequeue () { - var q = this.parser._queue + var q = this.parser._queue || [] for (var i = 0, len = q.length; i < len; i++) { handle(this, q[i], this._nextBatch) } From a486cfd31e4088d9a9fad45c788bcefb6b2cad34 Mon Sep 17 00:00:00 2001 From: Daniel Lando Date: Fri, 24 Jan 2020 14:22:58 +0100 Subject: [PATCH 10/16] feat: Queue limit --- .gitignore | 1 + aedes.js | 4 +++- lib/client.js | 18 ++++++++++++------ test/connect.js | 44 +++++++++++++++++++++++++++++++++++++++++--- 4 files changed, 57 insertions(+), 10 deletions(-) diff --git a/.gitignore b/.gitignore index 5f1abf39..d6510eb8 100644 --- a/.gitignore +++ b/.gitignore @@ -1,6 +1,7 @@ # Logs logs *.log +.vscode # Runtime data pids diff --git a/aedes.js b/aedes.js index e2d5c268..9bb540f8 100644 --- a/aedes.js +++ b/aedes.js @@ -28,7 +28,8 @@ var defaultOptions = { authorizeForward: defaultAuthorizeForward, published: defaultPublished, trustProxy: false, - trustedProxies: [] + trustedProxies: [], + queueLimit: 42 } function Aedes (opts) { @@ -42,6 +43,7 @@ function Aedes (opts) { this.id = opts.id || uuidv4() this.counter = 0 + this.queueLimit = opts.queueLimit this.connectTimeout = opts.connectTimeout this.mq = opts.mq || mqemitter(opts) this.handle = function handle (conn, req) { diff --git a/lib/client.js b/lib/client.js index a8a15da6..f1496f79 100644 --- a/lib/client.js +++ b/lib/client.js @@ -50,6 +50,7 @@ function Client (broker, conn, req) { // queue packets received before client fires 'connect' event. Prevents memory leaks on 'connect' event this.parser._queue = [] + this.parser.queueLimit = broker.queueLimit this.once('connected', dequeue) @@ -337,17 +338,22 @@ function enqueue (packet) { if (client.connackSent || client._parsingBatch === 1) { handle(client, packet, client._nextBatch) } else { - this._queue.push(packet) + if (this._queue.length < this.queueLimit) { + this._queue.push(packet) + } else { + this.emit('error', new Error('Client queue limit reached')) + } } } function dequeue () { - var q = this.parser._queue || [] - for (var i = 0, len = q.length; i < len; i++) { - handle(this, q[i], this._nextBatch) - } + if (this.parser._queue) { + for (var i = 0, len = this.parser._queue.length; i < len; i++) { + handle(this, this.parser._queue[i], this._nextBatch) + } - this.parser._queue = null + this.parser._queue = null + } } function nop () { } diff --git a/test/connect.js b/test/connect.js index 7d997972..b1ee779d 100644 --- a/test/connect.js +++ b/test/connect.js @@ -333,7 +333,8 @@ test('reject clients with wrong protocol name', function (t) { }) test('After first CONNECT Packet, others are queued until \'connect\' event', function (t) { - var broker = aedes() + var queueLimit = 50 + var broker = aedes({ queueLimit }) var publishP = { cmd: 'publish', @@ -357,12 +358,12 @@ test('After first CONNECT Packet, others are queued until \'connect\' event', fu process.once('warning', e => t.fail('Memory leak detected')) - for (let i = 0; i < 100; i++) { + for (let i = 0; i < queueLimit; i++) { s.inStream.write(publishP) } broker.on('client', function (client) { - t.equal(client.parser._queue.length, 100, 'Packets have been queued') + t.equal(client.parser._queue.length, queueLimit, 'Packets have been queued') client.once('connected', () => { t.equal(client.parser._queue, null, 'Queue is empty') @@ -372,6 +373,43 @@ test('After first CONNECT Packet, others are queued until \'connect\' event', fu }) }) +test('Test queue limit', function (t) { + var queueLimit = 50 + var broker = aedes({ queueLimit }) + + var publishP = { + cmd: 'publish', + topic: 'hello', + payload: Buffer.from('world'), + qos: 0, + retain: false + } + + var connectP = { + cmd: 'connect', + protocolId: 'MQTT', + protocolVersion: 4, + clean: true, + clientId: 'abcde', + keepalive: 0 + } + + var s = setup(broker, false) + s.inStream.write(connectP) + + process.once('warning', e => t.fail('Memory leak detected')) + + for (let i = 0; i < queueLimit + 5; i++) { + s.inStream.write(publishP) + } + + broker.on('connectionError', function (conn, err) { + t.equal(err.message, 'Client queue limit reached', 'Queue error is thrown') + s.conn.destroy() + broker.close(t.end) + }) +}) + ;[[0, null, false], [1, null, true], [1, new Error('connection banned'), false], [1, new Error('connection banned'), true]].forEach(function (ele) { var plan = ele[0] var err = ele[1] From 6b72608b34a11fd83b364f228829b40690565ef4 Mon Sep 17 00:00:00 2001 From: Daniel Lando Date: Fri, 24 Jan 2020 14:29:04 +0100 Subject: [PATCH 11/16] docs: Updated interface and docs about queueLimit --- README.md | 2 ++ types/index.d.ts | 33 +++++++++++++++++---------------- 2 files changed, 19 insertions(+), 16 deletions(-) diff --git a/README.md b/README.md index b783d642..3f40e462 100644 --- a/README.md +++ b/README.md @@ -126,6 +126,8 @@ Options: * `persistence`: an instance of [AedesPersistence](http://npm.im/aedes-persistence), check [plugins](#plugins) for more persistence options. It's used to store *QoS > 1*, *retained*, *will* packets and subscriptions in memory or on disk (if not specified default persistence is in memory) * `concurrency`: the max number of messages delivered concurrently, defaults to `100` +* `queueLimit`: the max number of messages queued while client is waiting to connect, + defaults to `42`. If the number is exceeded `connectionError` is thrown with error `Client queue limit reached` * `heartbeatInterval`: the interval at which the broker heartbeat is emitted, it used by other broker in the cluster, defaults to `60000` milliseconds diff --git a/types/index.d.ts b/types/index.d.ts index 0ebfd6c4..9d5a933b 100644 --- a/types/index.d.ts +++ b/types/index.d.ts @@ -10,7 +10,7 @@ import { Socket } from 'net' import { IncomingMessage } from 'http' import EventEmitter = NodeJS.EventEmitter -declare function aedes (options?: aedes.AedesOptions): aedes.Aedes +declare function aedes(options?: aedes.AedesOptions): aedes.Aedes // eslint-disable-next-line no-redeclare declare namespace aedes { @@ -27,15 +27,15 @@ declare namespace aedes { conn: Socket req?: IncomingMessage - on (event: 'error', cb: (err: Error) => void): this + on(event: 'error', cb: (err: Error) => void): this - publish (message: IPublishPacket, callback?: () => void): void - subscribe ( + publish(message: IPublishPacket, callback?: () => void): void + subscribe( subscriptions: ISubscription | ISubscription[] | ISubscribePacket, callback?: () => void ): void - unsubscribe (topicObjects: ISubscription | ISubscription[], callback?: () => void): void - close (callback?: () => void): void + unsubscribe(topicObjects: ISubscription | ISubscription[], callback?: () => void): void + close(callback?: () => void): void } export type PreConnectCallback = (client: Client, done: (err: Error | null, success: boolean) => void) => void @@ -69,6 +69,7 @@ declare namespace aedes { authorizeSubscribe?: AuthorizeSubscribeCallback authorizeForward?: AuthorizeForwardCallback published?: PublishedCallback + queueLimit?: number } export interface Aedes extends EventEmitter { @@ -81,23 +82,23 @@ declare namespace aedes { authorizeForward: AuthorizeForwardCallback published: PublishedCallback - on (event: 'closed', cb: () => void): this - on (event: 'client' | 'clientReady' | 'clientDisconnect' | 'keepaliveTimeout' | 'connackSent', cb: (client: Client) => void): this - on (event: 'clientError' | 'connectionError', cb: (client: Client, error: Error) => void): this - on (event: 'ping' | 'publish' | 'ack', cb: (packet: any, client: Client) => void): this - on (event: 'subscribe' | 'unsubscribe', cb: (subscriptions: ISubscription | ISubscription[] | ISubscribePacket, client: Client) => void): this + on(event: 'closed', cb: () => void): this + on(event: 'client' | 'clientReady' | 'clientDisconnect' | 'keepaliveTimeout' | 'connackSent', cb: (client: Client) => void): this + on(event: 'clientError' | 'connectionError', cb: (client: Client, error: Error) => void): this + on(event: 'ping' | 'publish' | 'ack', cb: (packet: any, client: Client) => void): this + on(event: 'subscribe' | 'unsubscribe', cb: (subscriptions: ISubscription | ISubscription[] | ISubscribePacket, client: Client) => void): this - publish (packet: IPublishPacket & { topic: string | Buffer }, done: () => void): void - subscribe (topic: string, callback: (packet: ISubscribePacket, cb: () => void) => void, done: () => void): void - unsubscribe ( + publish(packet: IPublishPacket & { topic: string | Buffer }, done: () => void): void + subscribe(topic: string, callback: (packet: ISubscribePacket, cb: () => void) => void, done: () => void): void + unsubscribe( topic: string, callback: (packet: IUnsubscribePacket, cb: () => void) => void, done: () => void ): void - close (callback?: () => void): void + close(callback?: () => void): void } - export function Server (options?: aedes.AedesOptions): aedes.Aedes + export function Server(options?: aedes.AedesOptions): aedes.Aedes } export = aedes From fad8e18f89332c0677248458714f6a7337c8fe06 Mon Sep 17 00:00:00 2001 From: Daniel Lando Date: Fri, 24 Jan 2020 14:33:45 +0100 Subject: [PATCH 12/16] fix: Typo --- test/connect.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/connect.js b/test/connect.js index b1ee779d..5840f6f6 100644 --- a/test/connect.js +++ b/test/connect.js @@ -399,7 +399,7 @@ test('Test queue limit', function (t) { process.once('warning', e => t.fail('Memory leak detected')) - for (let i = 0; i < queueLimit + 5; i++) { + for (let i = 0; i < queueLimit + 1; i++) { s.inStream.write(publishP) } From 5f639f04cbd84a72eff3ecc5df79da93ca8fa974 Mon Sep 17 00:00:00 2001 From: Daniel Lando Date: Fri, 24 Jan 2020 14:42:26 +0100 Subject: [PATCH 13/16] docs: contributing --- README.md | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/README.md b/README.md index 3f40e462..4361a12e 100644 --- a/README.md +++ b/README.md @@ -22,6 +22,7 @@ Barebone MQTT server that can run on any stream server. * [TODO](#todo) * [Plugins](#plugins) * [Collaborators](#collaborators) +* [Contributing](#contributing) * [Acknowledgements](#acknowledgements) * [Mosca Vs Aedes](#mosca-vs-aedes) * [License](#license) @@ -530,6 +531,10 @@ This library is born after a lot of discussion with all production. This addresses your concerns about performance and stability. +## Contributing + +Want to contribute? Check our list of features/bugs [here](https://github.com/moscajs/aedes/projects/1) + ## Mosca vs Aedes Example benchmark test with 1000 clients sending 5000 QoS 1 messsages. Used [mqtt-benchmark](https://github.com/krylovsk/mqtt-benchmark) with command: From ef52695be9f13614625b4296783c7b9850a1253f Mon Sep 17 00:00:00 2001 From: Daniel Lando Date: Tue, 28 Jan 2020 09:32:12 +0100 Subject: [PATCH 14/16] fix: code refactoring --- lib/client.js | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/lib/client.js b/lib/client.js index f1496f79..c835cdf3 100644 --- a/lib/client.js +++ b/lib/client.js @@ -50,7 +50,6 @@ function Client (broker, conn, req) { // queue packets received before client fires 'connect' event. Prevents memory leaks on 'connect' event this.parser._queue = [] - this.parser.queueLimit = broker.queueLimit this.once('connected', dequeue) @@ -338,7 +337,7 @@ function enqueue (packet) { if (client.connackSent || client._parsingBatch === 1) { handle(client, packet, client._nextBatch) } else { - if (this._queue.length < this.queueLimit) { + if (this._queue.length < client.broker.queueLimit) { this._queue.push(packet) } else { this.emit('error', new Error('Client queue limit reached')) @@ -347,9 +346,10 @@ function enqueue (packet) { } function dequeue () { - if (this.parser._queue) { - for (var i = 0, len = this.parser._queue.length; i < len; i++) { - handle(this, this.parser._queue[i], this._nextBatch) + var q = this.parser._queue + if (q) { + for (var i = 0, len = q.length; i < len; i++) { + handle(this, q[i], this._nextBatch) } this.parser._queue = null From bae627e26989f6b75a0d48b1bab7c673eb2e4547 Mon Sep 17 00:00:00 2001 From: Daniel Lando Date: Tue, 28 Jan 2020 09:36:46 +0100 Subject: [PATCH 15/16] fix: Removed unused 'nextBach' call in client #363 --- lib/client.js | 2 -- 1 file changed, 2 deletions(-) diff --git a/lib/client.js b/lib/client.js index c835cdf3..f536eea4 100644 --- a/lib/client.js +++ b/lib/client.js @@ -88,8 +88,6 @@ function Client (broker, conn, req) { this.on('error', onError) - nextBatch() - conn.on('readable', nextBatch) conn.on('error', this.emit.bind(this, 'error')) this.parser.on('error', this.emit.bind(this, 'error')) From 3bdd9af1988362e9d28cd7356dde834bea8e114e Mon Sep 17 00:00:00 2001 From: Daniel Lando Date: Tue, 28 Jan 2020 11:26:35 +0100 Subject: [PATCH 16/16] refactor: code formatting --- types/index.d.ts | 32 ++++++++++++++++---------------- 1 file changed, 16 insertions(+), 16 deletions(-) diff --git a/types/index.d.ts b/types/index.d.ts index 9d5a933b..ea0ff9e2 100644 --- a/types/index.d.ts +++ b/types/index.d.ts @@ -10,7 +10,7 @@ import { Socket } from 'net' import { IncomingMessage } from 'http' import EventEmitter = NodeJS.EventEmitter -declare function aedes(options?: aedes.AedesOptions): aedes.Aedes +declare function aedes (options?: aedes.AedesOptions): aedes.Aedes // eslint-disable-next-line no-redeclare declare namespace aedes { @@ -27,15 +27,15 @@ declare namespace aedes { conn: Socket req?: IncomingMessage - on(event: 'error', cb: (err: Error) => void): this + on (event: 'error', cb: (err: Error) => void): this - publish(message: IPublishPacket, callback?: () => void): void - subscribe( + publish (message: IPublishPacket, callback?: () => void): void + subscribe ( subscriptions: ISubscription | ISubscription[] | ISubscribePacket, callback?: () => void ): void - unsubscribe(topicObjects: ISubscription | ISubscription[], callback?: () => void): void - close(callback?: () => void): void + unsubscribe (topicObjects: ISubscription | ISubscription[], callback?: () => void): void + close (callback?: () => void): void } export type PreConnectCallback = (client: Client, done: (err: Error | null, success: boolean) => void) => void @@ -82,23 +82,23 @@ declare namespace aedes { authorizeForward: AuthorizeForwardCallback published: PublishedCallback - on(event: 'closed', cb: () => void): this - on(event: 'client' | 'clientReady' | 'clientDisconnect' | 'keepaliveTimeout' | 'connackSent', cb: (client: Client) => void): this - on(event: 'clientError' | 'connectionError', cb: (client: Client, error: Error) => void): this - on(event: 'ping' | 'publish' | 'ack', cb: (packet: any, client: Client) => void): this - on(event: 'subscribe' | 'unsubscribe', cb: (subscriptions: ISubscription | ISubscription[] | ISubscribePacket, client: Client) => void): this + on (event: 'closed', cb: () => void): this + on (event: 'client' | 'clientReady' | 'clientDisconnect' | 'keepaliveTimeout' | 'connackSent', cb: (client: Client) => void): this + on (event: 'clientError' | 'connectionError', cb: (client: Client, error: Error) => void): this + on (event: 'ping' | 'publish' | 'ack', cb: (packet: any, client: Client) => void): this + on (event: 'subscribe' | 'unsubscribe', cb: (subscriptions: ISubscription | ISubscription[] | ISubscribePacket, client: Client) => void): this - publish(packet: IPublishPacket & { topic: string | Buffer }, done: () => void): void - subscribe(topic: string, callback: (packet: ISubscribePacket, cb: () => void) => void, done: () => void): void - unsubscribe( + publish (packet: IPublishPacket & { topic: string | Buffer }, done: () => void): void + subscribe (topic: string, callback: (packet: ISubscribePacket, cb: () => void) => void, done: () => void): void + unsubscribe ( topic: string, callback: (packet: IUnsubscribePacket, cb: () => void) => void, done: () => void ): void - close(callback?: () => void): void + close (callback?: () => void): void } - export function Server(options?: aedes.AedesOptions): aedes.Aedes + export function Server (options?: aedes.AedesOptions): aedes.Aedes } export = aedes