Skip to content

Commit

Permalink
Proper handling of socketErrors in mqtt client v4 (#593)
Browse files Browse the repository at this point in the history
Co-authored-by: Daniel Lando <[email protected]>
  • Loading branch information
gnought and robertsLando authored Mar 12, 2021
1 parent 1f978aa commit f1af8e7
Showing 1 changed file with 31 additions and 16 deletions.
47 changes: 31 additions & 16 deletions test/not-blocking.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
'use strict'

const { test } = require('tap')
const EventEmitter = require('events')
const mqtt = require('mqtt')
const net = require('net')
const Faketimers = require('@sinonjs/fake-timers')
Expand Down Expand Up @@ -53,6 +54,7 @@ test('connect 200 concurrent clients', function (t) {
test('do not block after a subscription', function (t) {
t.plan(3)

const evt = new EventEmitter()
const broker = aedes()
const server = net.createServer(broker.handle)
const total = 10000
Expand Down Expand Up @@ -97,10 +99,8 @@ test('do not block after a subscription', function (t) {
port: port,
keepalive: 0
}).on('error', function (err) {
if (err.code !== 'ECONNRESET') {
clock.clearTimeout(clockId)
t.fail(err)
}
clock.clearTimeout(clockId)
t.fail(err)
})

subscriber.subscribe('test', publish)
Expand All @@ -112,24 +112,33 @@ test('do not block after a subscription', function (t) {
received++
clock.tick(1)
})
subscriber.on('close', function () {
evt.emit('finish')
})
}

publisher.on('connect', startSubscriber)

function finish () {
subscriber.end()
publisher.end()
publisher.on('close', function () {
evt.emit('finish')
})
evt.on('finish', function () {
if (publisher.connected || subscriber.connected) { return }
broker.close()
server.close()
t.equal(total, sent, 'messages sent')
t.equal(total, received, 'messages received')
})
function finish () {
subscriber.end()
publisher.end()
}
})
})

test('do not block with overlapping subscription', function (t) {
t.plan(3)

const evt = new EventEmitter()
const broker = aedes({ concurrency: 15 })
const server = net.createServer(broker.handle)
const total = 10000
Expand Down Expand Up @@ -174,10 +183,8 @@ test('do not block with overlapping subscription', function (t) {
port: port,
keepalive: 0
}).on('error', function (err) {
if (err.code !== 'ECONNRESET') {
clock.clearTimeout(clockId)
t.fail(err)
}
clock.clearTimeout(clockId)
t.fail(err)
})

subscriber.subscribe('#', function () {
Expand All @@ -193,17 +200,25 @@ test('do not block with overlapping subscription', function (t) {
received++
clock.tick(1)
})
subscriber.on('close', function () {
evt.emit('finish')
})
}

publisher.on('connect', startSubscriber)

function finish () {
subscriber.end()
publisher.end()
publisher.on('close', function () {
evt.emit('finish')
})
evt.on('finish', function () {
if (publisher.connected || subscriber.connected) { return }
broker.close()
server.close()
t.equal(total, sent, 'messages sent')
t.equal(total, received, 'messages received')
})
function finish () {
subscriber.end()
publisher.end()
}
})
})

0 comments on commit f1af8e7

Please sign in to comment.