Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Memory leak on client.js 'connected' event #348 #362

Merged
merged 17 commits into from
Jan 28, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# Logs
logs
*.log
.vscode

# Runtime data
pids
Expand Down
7 changes: 7 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ Barebone MQTT server that can run on any stream server.
* [TODO](#todo)
* [Plugins](#plugins)
* [Collaborators](#collaborators)
* [Contributing](#contributing)
* [Acknowledgements](#acknowledgements)
* [Mosca Vs Aedes](#mosca-vs-aedes)
* [License](#license)
Expand Down Expand Up @@ -126,6 +127,8 @@ Options:
* `persistence`: an instance of [AedesPersistence](http://npm.im/aedes-persistence), check [plugins](#plugins) for more persistence options. It's used to store *QoS > 1*, *retained*, *will* packets and subscriptions in memory or on disk (if not specified default persistence is in memory)
* `concurrency`: the max number of messages delivered concurrently,
defaults to `100`
* `queueLimit`: the max number of messages queued while client is waiting to connect,
defaults to `42`. If the number is exceeded `connectionError` is thrown with error `Client queue limit reached`
* `heartbeatInterval`: the interval at which the broker heartbeat is
emitted, it used by other broker in the cluster, defaults to
`60000` milliseconds
Expand Down Expand Up @@ -528,6 +531,10 @@ This library is born after a lot of discussion with all
production. This addresses your concerns about performance and
stability.

## Contributing

Want to contribute? Check our list of features/bugs [here](https://github.com/moscajs/aedes/projects/1)

## Mosca vs Aedes

Example benchmark test with 1000 clients sending 5000 QoS 1 messsages. Used [mqtt-benchmark](https://github.com/krylovsk/mqtt-benchmark) with command:
Expand Down
4 changes: 3 additions & 1 deletion aedes.js
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ var defaultOptions = {
authorizeForward: defaultAuthorizeForward,
published: defaultPublished,
trustProxy: false,
trustedProxies: []
trustedProxies: [],
queueLimit: 42
}

function Aedes (opts) {
Expand All @@ -42,6 +43,7 @@ function Aedes (opts) {

this.id = opts.id || uuidv4()
this.counter = 0
this.queueLimit = opts.queueLimit
this.connectTimeout = opts.connectTimeout
this.mq = opts.mq || mqemitter(opts)
this.handle = function handle (conn, req) {
Expand Down
29 changes: 22 additions & 7 deletions lib/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,9 @@ function Client (broker, conn, req) {

this._parser = mqtt.parser()
this._parser.client = this
this._parser._queue = [] // queue packets received before client fires 'connect' event. Prevents memory leaks on 'connect' event
this._parser.on('packet', enqueue)
this.once('connected', dequeue)

function nextBatch (err) {
if (err) {
Expand Down Expand Up @@ -82,11 +84,9 @@ function Client (broker, conn, req) {
}
this._nextBatch = nextBatch

this.on('error', onError)

nextBatch()

conn.on('readable', nextBatch)

this.on('error', onError)
conn.on('error', this.emit.bind(this, 'error'))
this._parser.on('error', this.emit.bind(this, 'error'))

Expand Down Expand Up @@ -246,6 +246,8 @@ Client.prototype.close = function (done) {
this._parser.removeAllListeners('packet')
conn.removeAllListeners('readable')

this._parser._queue = null

if (this._keepaliveTimer) {
this._keepaliveTimer.clear()
this._keepaliveInterval = -1
Expand Down Expand Up @@ -338,9 +340,22 @@ function enqueue (packet) {
if (client.connackSent || client._parsingBatch === 1) {
handle(client, packet, client._nextBatch)
} else {
client.on('connected', () => {
handle(client, packet, client._nextBatch)
})
if (this._queue.length < client.broker.queueLimit) {
this._queue.push(packet)
} else {
this.emit('error', new Error('Client queue limit reached'))
}
}
}

function dequeue () {
var q = this._parser._queue
if (q) {
for (var i = 0, len = q.length; i < len; i++) {
handle(this, q[i], this._nextBatch)
}
robertsLando marked this conversation as resolved.
Show resolved Hide resolved

this._parser._queue = null
}
}

Expand Down
78 changes: 78 additions & 0 deletions test/connect.js
Original file line number Diff line number Diff line change
Expand Up @@ -332,6 +332,84 @@ test('reject clients with wrong protocol name', function (t) {
broker.on('closed', t.end.bind(t))
})

test('After first CONNECT Packet, others are queued until \'connect\' event', function (t) {
var queueLimit = 50
var broker = aedes({ queueLimit })

var publishP = {
cmd: 'publish',
topic: 'hello',
payload: Buffer.from('world'),
qos: 0,
retain: false
}

var connectP = {
cmd: 'connect',
protocolId: 'MQTT',
protocolVersion: 4,
clean: true,
clientId: 'abcde',
keepalive: 0
}

var s = setup(broker, false)
s.inStream.write(connectP)

process.once('warning', e => t.fail('Memory leak detected'))

for (let i = 0; i < queueLimit; i++) {
s.inStream.write(publishP)
}

broker.on('client', function (client) {
t.equal(client._parser._queue.length, queueLimit, 'Packets have been queued')

client.once('connected', () => {
t.equal(client._parser._queue, null, 'Queue is empty')
s.conn.destroy()
broker.close(t.end)
})
})
})

test('Test queue limit', function (t) {
var queueLimit = 50
var broker = aedes({ queueLimit })

var publishP = {
cmd: 'publish',
topic: 'hello',
payload: Buffer.from('world'),
qos: 0,
retain: false
}

var connectP = {
cmd: 'connect',
protocolId: 'MQTT',
protocolVersion: 4,
clean: true,
clientId: 'abcde',
keepalive: 0
}

var s = setup(broker, false)
s.inStream.write(connectP)

process.once('warning', e => t.fail('Memory leak detected'))

for (let i = 0; i < queueLimit + 1; i++) {
s.inStream.write(publishP)
}

broker.on('connectionError', function (conn, err) {
t.equal(err.message, 'Client queue limit reached', 'Queue error is thrown')
s.conn.destroy()
broker.close(t.end)
})
})

;[[0, null, false], [1, null, true], [1, new Error('connection banned'), false], [1, new Error('connection banned'), true]].forEach(function (ele) {
var plan = ele[0]
var err = ele[1]
Expand Down
1 change: 1 addition & 0 deletions types/index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ declare namespace aedes {
authorizeSubscribe?: AuthorizeSubscribeCallback
authorizeForward?: AuthorizeForwardCallback
published?: PublishedCallback
queueLimit?: number
}

export interface Aedes extends EventEmitter {
Expand Down