diff --git a/test/not-blocking.js b/test/not-blocking.js index ad13295f..63231461 100644 --- a/test/not-blocking.js +++ b/test/not-blocking.js @@ -1,6 +1,7 @@ 'use strict' const { test } = require('tap') +const EventEmitter = require('events') const mqtt = require('mqtt') const net = require('net') const Faketimers = require('@sinonjs/fake-timers') @@ -53,6 +54,7 @@ test('connect 200 concurrent clients', function (t) { test('do not block after a subscription', function (t) { t.plan(3) + const evt = new EventEmitter() const broker = aedes() const server = net.createServer(broker.handle) const total = 10000 @@ -97,10 +99,8 @@ test('do not block after a subscription', function (t) { port: port, keepalive: 0 }).on('error', function (err) { - if (err.code !== 'ECONNRESET') { - clock.clearTimeout(clockId) - t.fail(err) - } + clock.clearTimeout(clockId) + t.fail(err) }) subscriber.subscribe('test', publish) @@ -112,17 +112,25 @@ test('do not block after a subscription', function (t) { received++ clock.tick(1) }) + subscriber.on('close', function () { + evt.emit('finish') + }) } publisher.on('connect', startSubscriber) - - function finish () { - subscriber.end() - publisher.end() + publisher.on('close', function () { + evt.emit('finish') + }) + evt.on('finish', function () { + if (publisher.connected || subscriber.connected) { return } broker.close() server.close() t.equal(total, sent, 'messages sent') t.equal(total, received, 'messages received') + }) + function finish () { + subscriber.end() + publisher.end() } }) }) @@ -130,6 +138,7 @@ test('do not block after a subscription', function (t) { test('do not block with overlapping subscription', function (t) { t.plan(3) + const evt = new EventEmitter() const broker = aedes({ concurrency: 15 }) const server = net.createServer(broker.handle) const total = 10000 @@ -174,10 +183,8 @@ test('do not block with overlapping subscription', function (t) { port: port, keepalive: 0 }).on('error', function (err) { - if (err.code !== 'ECONNRESET') { - clock.clearTimeout(clockId) - t.fail(err) - } + clock.clearTimeout(clockId) + t.fail(err) }) subscriber.subscribe('#', function () { @@ -193,17 +200,25 @@ test('do not block with overlapping subscription', function (t) { received++ clock.tick(1) }) + subscriber.on('close', function () { + evt.emit('finish') + }) } publisher.on('connect', startSubscriber) - - function finish () { - subscriber.end() - publisher.end() + publisher.on('close', function () { + evt.emit('finish') + }) + evt.on('finish', function () { + if (publisher.connected || subscriber.connected) { return } broker.close() server.close() t.equal(total, sent, 'messages sent') t.equal(total, received, 'messages received') + }) + function finish () { + subscriber.end() + publisher.end() } }) })