From 65bb9af6605ee46cc4b7edb9cd2b47c9e7d399fe Mon Sep 17 00:00:00 2001 From: Jacob Heun Date: Tue, 11 Dec 2018 16:51:24 +0100 Subject: [PATCH 01/16] chore: update deps --- package.json | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/package.json b/package.json index 407a51d..4718a08 100644 --- a/package.json +++ b/package.json @@ -40,20 +40,20 @@ "npm": ">=3.0.0" }, "devDependencies": { - "aegir": "^17.0.1", + "aegir": "^17.1.1", "chai": "^4.2.0", "chai-checkmark": "^1.0.1", "dirty-chai": "^2.0.1", "libp2p-mplex": "~0.8.4", "libp2p-pnet": "~0.1.0", "libp2p-secio": "~0.10.1", - "libp2p-spdy": "~0.13.0", + "libp2p-spdy": "~0.13.1", "libp2p-tcp": "~0.13.0", - "libp2p-webrtc-star": "~0.15.5", + "libp2p-webrtc-star": "~0.15.6", "libp2p-websockets": "~0.12.0", - "peer-book": "~0.8.0", - "portfinder": "^1.0.19", - "sinon": "^7.1.1", + "peer-book": "~0.9.0", + "portfinder": "^1.0.20", + "sinon": "^7.2.0", "webrtcsupport": "^2.2.0" }, "dependencies": { @@ -63,18 +63,18 @@ "debug": "^4.1.0", "err-code": "^1.1.2", "fsm-event": "^2.1.0", - "hashlru": "^2.2.1", - "interface-connection": "~0.3.2", + "hashlru": "^2.3.0", + "interface-connection": "~0.3.3", "ip-address": "^5.8.9", - "libp2p-circuit": "~0.3.0", + "libp2p-circuit": "~0.3.1", "libp2p-identify": "~0.7.2", "lodash.includes": "^4.3.0", "moving-average": "^1.0.0", - "multiaddr": "^5.0.2", + "multiaddr": "^6.0.0", "multistream-select": "~0.14.3", "once": "^1.4.0", "peer-id": "~0.12.0", - "peer-info": "~0.14.1", + "peer-info": "~0.15.0", "pull-stream": "^3.6.9", "retimer": "^2.0.0" }, From 10323a8a32b5b2cbb12947ff8df42711f075662a Mon Sep 17 00:00:00 2001 From: Jacob Heun Date: Wed, 12 Dec 2018 12:32:11 +0100 Subject: [PATCH 02/16] fix: check we have a proper transport before filtering addresses --- src/transport.js | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/transport.js b/src/transport.js index b8321df..32c83c4 100644 --- a/src/transport.js +++ b/src/transport.js @@ -203,6 +203,9 @@ class TransportManager { * @returns {Array} */ static dialables (transport, multiaddrs, peerInfo) { + // If we dont have a proper transport, return no multiaddrs + if (!transport || !transport.filter) return [] + const transportAddrs = transport.filter(multiaddrs) if (!peerInfo) { return transportAddrs From 97425c2a628c9a1fba0d1e2ed26ceeda3761b9a6 Mon Sep 17 00:00:00 2001 From: Jacob Heun Date: Wed, 12 Dec 2018 14:03:22 +0100 Subject: [PATCH 03/16] fix: improve connection close on stop --- src/connection/manager.js | 4 +-- src/dialer.js | 4 +-- src/index.js | 32 ++++++++++-------------- test/dial-fsm.node.js | 51 +++++++++++++++++++++++++++++++++++++-- 4 files changed, 66 insertions(+), 25 deletions(-) diff --git a/src/connection/manager.js b/src/connection/manager.js index 571c5c0..db83fde 100644 --- a/src/connection/manager.js +++ b/src/connection/manager.js @@ -91,7 +91,7 @@ class ConnectionManager { } const b58Str = peerInfo.id.toB58String() - this.switch.muxedConns[b58Str] = new ConnectionFSM({ + this.switch.muxedConnsIn[b58Str] = new ConnectionFSM({ _switch: this.switch, peerInfo, muxer: muxedConn @@ -111,7 +111,7 @@ class ConnectionManager { peerInfo = this.switch._peerBook.put(peerInfo) muxedConn.once('close', () => { - delete this.switch.muxedConns[b58Str] + delete this.switch.muxedConnsIn[b58Str] peerInfo.disconnect() peerInfo = this.switch._peerBook.put(peerInfo) log(`closed connection to ${b58Str}`) diff --git a/src/dialer.js b/src/dialer.js index 1a60cce..169751a 100644 --- a/src/dialer.js +++ b/src/dialer.js @@ -54,13 +54,13 @@ function dial (_switch, returnFSM) { log(`dialing to ${b58Id.slice(0, 8)} with protocol ${protocol || 'unknown'}`) - let connection = _switch.muxedConns[b58Id] || _switch.conns[b58Id] + let connection = _switch.muxedConns[b58Id] || _switch.muxedConnsIn[b58Id] || _switch.conns[b58Id] if (!ConnectionFSM.isConnectionFSM(connection)) { connection = new ConnectionFSM({ _switch, peerInfo, - muxer: _switch.muxedConns[b58Id] || null + muxer: _switch.muxedConns[b58Id] || _switch.muxedConnsIn[b58Id] || null }) connection.once('error', (err) => callback(err)) connection.once('connected', () => connection.protect()) diff --git a/src/index.js b/src/index.js index 7b22d50..9a394ca 100644 --- a/src/index.js +++ b/src/index.js @@ -51,6 +51,7 @@ class Switch extends EventEmitter { // } // } this.muxedConns = {} + this.muxedConnsIn = {} // { protocol: handler } this.protocols = {} @@ -176,16 +177,15 @@ class Switch extends EventEmitter { hangUp (peer, callback) { const peerInfo = getPeerInfo(peer, this.peerBook) const key = peerInfo.id.toB58String() - if (this.muxedConns[key]) { - const conn = this.muxedConns[key] - conn.once('close', () => { - delete this.muxedConns[key] - callback() - }) + let conns = [] + + if (this.muxedConns[key]) conns.push(this.muxedConns[key]) + if (this.muxedConnsIn[key]) conns.push(this.muxedConnsIn[key]) + + each(conns, (conn, cb) => { + conn.once('close', cb) conn.close() - } else { - callback() - } + }, callback) } /** @@ -252,22 +252,16 @@ class Switch extends EventEmitter { _onStopping () { this.stats.stop() series([ - (cb) => each(this.muxedConns, (conn, cb) => { - // If the connection was destroyed while we are hanging up, continue - if (!conn) { - return cb() - } - - conn.once('close', cb) - conn.close() - }, cb), (cb) => { each(this.transports, (transport, cb) => { each(transport.listeners, (listener, cb) => { listener.close(cb) }, cb) }, cb) - } + }, + (cb) => each(this._peerBook.getAllArray(), (peer, cb) => { + this.hangUp(peer, cb) + }, cb) ], (_) => { this.state('done') }) diff --git a/test/dial-fsm.node.js b/test/dial-fsm.node.js index 8c188b5..4c2508c 100644 --- a/test/dial-fsm.node.js +++ b/test/dial-fsm.node.js @@ -12,6 +12,7 @@ const WS = require('libp2p-websockets') const TCP = require('libp2p-tcp') const secio = require('libp2p-secio') const multiplex = require('libp2p-mplex') +const pull = require('pull-stream') const utils = require('./utils') const createInfos = utils.createInfos @@ -121,8 +122,8 @@ describe('dialFSM', () => { switchB.on('peer-mux-established', (peerInfo) => { if (peerInfo.id.toB58String() === peerIdA) { switchB.removeAllListeners('peer-mux-established') - expect(switchB.muxedConns).to.have.property(peerIdA).mark() - switchB.muxedConns[peerIdA].close() + expect(switchB.muxedConnsIn).to.have.property(peerIdA).mark() + switchB.muxedConnsIn[peerIdA].close() } }) @@ -133,4 +134,50 @@ describe('dialFSM', () => { ]).mark() }) }) + + it('parallel dials to one another should disconnect on hangup', (done) => { + switchA.handle('/parallel/1.0.0', (_, conn) => { pull(conn, conn) }) + switchB.handle('/parallel/1.0.0', (_, conn) => { pull(conn, conn) }) + + parallel([ + (cb) => switchA.dialFSM(switchB._peerInfo, '/parallel/1.0.0', cb), + (cb) => switchB.dialFSM(switchA._peerInfo, '/parallel/1.0.0', cb) + ], (err) => { + if (err) return done(err) + + // Hangup and verify the connections are closed + switchA.hangUp(switchB._peerInfo, () => { + setTimeout(() => { + expect(switchB.muxedConns).to.eql({}) + expect(switchB.muxedConnsIn).to.eql({}) + expect(switchA.muxedConns).to.eql({}) + expect(switchA.muxedConnsIn).to.eql({}) + done() + }, 250) + }) + }) + }) + + it('parallel dials to one another should disconnect on hangup', (done) => { + switchA.handle('/parallel/1.0.0', (_, conn) => { pull(conn, conn) }) + switchB.handle('/parallel/1.0.0', (_, conn) => { pull(conn, conn) }) + + parallel([ + (cb) => switchA.dialFSM(switchB._peerInfo, '/parallel/1.0.0', cb), + (cb) => switchB.dialFSM(switchA._peerInfo, '/parallel/1.0.0', cb) + ], (err) => { + if (err) return done(err) + + // Hangup and verify the connections are closed + switchA.stop(() => { + setTimeout(() => { + expect(switchB.muxedConns).to.eql({}) + expect(switchB.muxedConnsIn).to.eql({}) + expect(switchA.muxedConns).to.eql({}) + expect(switchA.muxedConnsIn).to.eql({}) + done() + }, 250) + }) + }) + }) }) From b519caa0f53aacaf05b1b3d20693ca30da093c94 Mon Sep 17 00:00:00 2001 From: Jacob Heun Date: Wed, 12 Dec 2018 14:05:07 +0100 Subject: [PATCH 04/16] fix: improve stat stopping --- src/index.js | 1 + src/stats/index.js | 40 ++++++++++++++++++++++++++++------------ 2 files changed, 29 insertions(+), 12 deletions(-) diff --git a/src/index.js b/src/index.js index 9a394ca..b48639f 100644 --- a/src/index.js +++ b/src/index.js @@ -231,6 +231,7 @@ class Switch extends EventEmitter { * @returns {void} */ _onStarting () { + this.stats.start() eachSeries(this.availableTransports(this._peerInfo), (ts, cb) => { // Listen on the given transport this.transport.listen(ts, {}, null, cb) diff --git a/src/stats/index.js b/src/stats/index.js index bf79f51..64c89fa 100644 --- a/src/stats/index.js +++ b/src/stats/index.js @@ -40,6 +40,7 @@ module.exports = (observer, _options) => { const globalStats = new Stat(initialCounters, options) const stats = Object.assign(new EventEmitter(), { + start: start, stop: stop, global: globalStats, peers: () => Array.from(peerStats.keys()), @@ -59,7 +60,19 @@ module.exports = (observer, _options) => { const transportStats = new Map() const protocolStats = new Map() - observer.on('message', (peerId, transportTag, protocolTag, direction, bufferLength) => { + observer.on('peer:closed', (peerId) => { + const peer = peerStats.get(peerId) + if (peer) { + peer.removeListener('update', propagateChange) + peer.stop() + peerStats.delete(peerId) + oldPeers.set(peerId, peer) + } + }) + + return stats + + function onMessage (peerId, transportTag, protocolTag, direction, bufferLength) { const event = directionToEvent[direction] if (transportTag) { @@ -104,22 +117,25 @@ module.exports = (observer, _options) => { } protocol.push(event, bufferLength) } - }) + } - observer.on('peer:closed', (peerId) => { - const peer = peerStats.get(peerId) - if (peer) { - peer.removeListener('update', propagateChange) - peer.stop() - peerStats.delete(peerId) - oldPeers.set(peerId, peer) - } - }) + function start () { + observer.on('message', onMessage) - return stats + globalStats.start() + + for (let peerStat of peerStats.values()) { + peerStat.start() + } + for (let transportStat of transportStats.values()) { + transportStat.start() + } + } function stop () { + observer.off('message', onMessage) globalStats.stop() + for (let peerStat of peerStats.values()) { peerStat.stop() } From 9341ddafd271592b15f16de8f3e9630039579d59 Mon Sep 17 00:00:00 2001 From: Jacob Heun Date: Wed, 12 Dec 2018 18:59:18 +0100 Subject: [PATCH 05/16] test: fix stats test --- test/stats.node.js | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/test/stats.node.js b/test/stats.node.js index 3469bea..e284f02 100644 --- a/test/stats.node.js +++ b/test/stats.node.js @@ -50,8 +50,8 @@ describe('Stats', () => { switchB.connection.addStreamMuxer(multiplex) parallel([ - (cb) => switchA.transport.listen('tcp', {}, null, cb), - (cb) => switchB.transport.listen('tcp', {}, null, cb) + (cb) => switchA.start(cb), + (cb) => switchB.start(cb) ], (err) => { if (err) { cb(err) From b3e92a2c24479846486db89f5d305911284ff99f Mon Sep 17 00:00:00 2001 From: Jacob Heun Date: Wed, 12 Dec 2018 19:14:35 +0100 Subject: [PATCH 06/16] fix: improve tracking of open connections --- src/connection/incoming.js | 4 +- src/connection/index.js | 8 +++- src/connection/manager.js | 57 +++++++++++++++++++++++- src/dialer.js | 4 +- src/index.js | 23 ++++++---- test/connection.node.js | 1 + test/dial-fsm.node.js | 28 +++++------- test/pnet.node.js | 10 ++--- test/secio.node.js | 10 ++--- test/stream-muxers.node.js | 16 +++---- test/swarm-muxing+webrtc-star.browser.js | 16 +++---- test/swarm-muxing.node.js | 20 ++++----- 12 files changed, 128 insertions(+), 69 deletions(-) diff --git a/src/connection/incoming.js b/src/connection/incoming.js index 3b2f9d0..3b2412f 100644 --- a/src/connection/incoming.js +++ b/src/connection/incoming.js @@ -20,7 +20,9 @@ class IncomingConnectionFSM extends BaseConnection { this.msListener = new multistream.Listener() this._state = FSM('DIALED', { - DISCONNECTED: { }, + DISCONNECTED: { + disconnect: 'DISCONNECTED' + }, DIALED: { // Base connection to peer established privatize: 'PRIVATIZING', encrypt: 'ENCRYPTING' diff --git a/src/connection/index.js b/src/connection/index.js index 5ce3dd9..d50e876 100644 --- a/src/connection/index.js +++ b/src/connection/index.js @@ -114,6 +114,7 @@ class ConnectionFSM extends BaseConnection { this._state.on('UPGRADING', () => this._onUpgrading()) this._state.on('MUXED', () => { this.log(`successfully muxed connection to ${this.theirB58Id}`) + delete this.switch.conns[this.theirB58Id] this.emit('muxed', this.muxer) }) this._state.on('CONNECTED', () => { @@ -166,6 +167,7 @@ class ConnectionFSM extends BaseConnection { }) } + // TODO: probably not necessary this.conn.setPeerInfo(this.theirPeerInfo) this._protocolHandshake(protocol, this.conn, callback) } @@ -266,7 +268,8 @@ class ConnectionFSM extends BaseConnection { this.muxer.end() } - delete this.switch.muxedConns[this.theirB58Id] + this.switch.connection.remove(this) + delete this.switch.conns[this.theirB58Id] delete this.muxer delete this.conn @@ -352,7 +355,8 @@ class ConnectionFSM extends BaseConnection { const conn = observeConnection(null, key, _conn, this.switch.observer) this.muxer = this.switch.muxers[key].dialer(conn) - this.switch.muxedConns[this.theirB58Id] = this + // this.switch.muxedConns[this.theirB58Id] = this + this.switch.connection.add(this) this.muxer.once('close', () => { this.close() diff --git a/src/connection/manager.js b/src/connection/manager.js index db83fde..d7a891e 100644 --- a/src/connection/manager.js +++ b/src/connection/manager.js @@ -20,6 +20,57 @@ const plaintext = require('../plaintext') class ConnectionManager { constructor (_switch) { this.switch = _switch + this.connections = {} + } + + add (connection) { + this.connections[connection.theirB58Id] = this.connections[connection.theirB58Id] || [] + // Only add it if it's not there + if (!this.get(connection)) { + this.connections[connection.theirB58Id].push(connection) + } + } + + get (connection) { + if (!this.connections[connection.theirB58Id]) return null + + for (let i = 0; i < this.connections[connection.theirB58Id].length; i++) { + if (this.connections[connection.theirB58Id][i]) { + return this.connections[connection.theirB58Id][i] + } + } + return null + } + + getOne (peerId) { + if (this.connections[peerId]) { + // TODO: Maybe select the best? + return this.connections[peerId][0] + } + return null + } + + remove (connection) { + if (!this.connections[connection.theirB58Id]) return + + for (let i = 0; i < this.connections[connection.theirB58Id].length; i++) { + if (this.connections[connection.theirB58Id][i]) { + this.connections[connection.theirB58Id].splice(i, 1) + return + } + } + } + + getAll () { + let connections = [] + for (const conns of Object.values(this.connections)) { + connections = [...connections, ...conns] + } + return connections + } + + getAllById (id) { + return this.connections[id] || [] } /** @@ -91,11 +142,12 @@ class ConnectionManager { } const b58Str = peerInfo.id.toB58String() - this.switch.muxedConnsIn[b58Str] = new ConnectionFSM({ + const connection = new ConnectionFSM({ _switch: this.switch, peerInfo, muxer: muxedConn }) + this.switch.connection.add(connection) if (peerInfo.multiaddrs.size > 0) { // with incomming conn and through identify, going to pick one @@ -111,7 +163,8 @@ class ConnectionManager { peerInfo = this.switch._peerBook.put(peerInfo) muxedConn.once('close', () => { - delete this.switch.muxedConnsIn[b58Str] + // delete this.switch.muxedConnsIn[b58Str] + this.switch.connection.remove(connection) peerInfo.disconnect() peerInfo = this.switch._peerBook.put(peerInfo) log(`closed connection to ${b58Str}`) diff --git a/src/dialer.js b/src/dialer.js index 169751a..f4822d7 100644 --- a/src/dialer.js +++ b/src/dialer.js @@ -54,13 +54,13 @@ function dial (_switch, returnFSM) { log(`dialing to ${b58Id.slice(0, 8)} with protocol ${protocol || 'unknown'}`) - let connection = _switch.muxedConns[b58Id] || _switch.muxedConnsIn[b58Id] || _switch.conns[b58Id] + let connection = _switch.connection.getOne(b58Id) if (!ConnectionFSM.isConnectionFSM(connection)) { connection = new ConnectionFSM({ _switch, peerInfo, - muxer: _switch.muxedConns[b58Id] || _switch.muxedConnsIn[b58Id] || null + muxer: null }) connection.once('error', (err) => callback(err)) connection.once('connected', () => connection.protect()) diff --git a/src/index.js b/src/index.js index b48639f..22ecff2 100644 --- a/src/index.js +++ b/src/index.js @@ -50,8 +50,8 @@ class Switch extends EventEmitter { // conn: // to extract info required for the Identify Protocol // } // } - this.muxedConns = {} - this.muxedConnsIn = {} + // this.muxedConns = {} + // this.muxedConnsIn = {} // { protocol: handler } this.protocols = {} @@ -95,7 +95,10 @@ class Switch extends EventEmitter { stop: 'STOPPING', start: 'STARTED' }, - STOPPING: { done: 'STOPPED' } + STOPPING: { + stop: 'STOPPING', + done: 'STOPPED' + } }) this.state.on('STARTING', () => { log('The switch is starting') @@ -177,10 +180,7 @@ class Switch extends EventEmitter { hangUp (peer, callback) { const peerInfo = getPeerInfo(peer, this.peerBook) const key = peerInfo.id.toB58String() - let conns = [] - - if (this.muxedConns[key]) conns.push(this.muxedConns[key]) - if (this.muxedConnsIn[key]) conns.push(this.muxedConnsIn[key]) + const conns = this.connection.getAllById(key) each(conns, (conn, cb) => { conn.once('close', cb) @@ -262,7 +262,14 @@ class Switch extends EventEmitter { }, (cb) => each(this._peerBook.getAllArray(), (peer, cb) => { this.hangUp(peer, cb) - }, cb) + }, cb), + // (cb) => each(this.allConns, (conn, cb) => { + // conn.on('close', cb) + // conn.close() + // }, () => { + // this.allConns = [] + // cb() + // }) ], (_) => { this.state('done') }) diff --git a/test/connection.node.js b/test/connection.node.js index a9bb20c..c014823 100644 --- a/test/connection.node.js +++ b/test/connection.node.js @@ -367,6 +367,7 @@ describe('ConnectionFSM', () => { (cb) => dialerSwitch.stop(cb), (cb) => listenerSwitch.stop(cb) ], () => { + console.log('Stopped') dialerSwitch.protector = new Protector(psk) listenerSwitch.protector = new Protector(psk) diff --git a/test/dial-fsm.node.js b/test/dial-fsm.node.js index 4c2508c..9cd54df 100644 --- a/test/dial-fsm.node.js +++ b/test/dial-fsm.node.js @@ -100,14 +100,12 @@ describe('dialFSM', () => { const connFSM = switchA.dialFSM(switchB._peerInfo, '/closed/1.0.0', (err) => { expect(err).to.not.exist() - expect(switchA.muxedConns).to.have.property(switchB._peerInfo.id.toB58String()) + expect(switchA.connection.getAllById(switchB._peerInfo.id.toB58String())).to.have.length(1) connFSM.close() }) connFSM.once('close', () => { - expect(switchA.muxedConns).to.not.have.any.keys([ - switchB._peerInfo.id.toB58String() - ]) + expect(switchA.connection.getAllById(switchB._peerInfo.id.toB58String())).to.have.length(0) done() }) }) @@ -122,16 +120,14 @@ describe('dialFSM', () => { switchB.on('peer-mux-established', (peerInfo) => { if (peerInfo.id.toB58String() === peerIdA) { switchB.removeAllListeners('peer-mux-established') - expect(switchB.muxedConnsIn).to.have.property(peerIdA).mark() - switchB.muxedConnsIn[peerIdA].close() + expect(switchB.connection.getAllById(peerIdA)).to.have.length(1).mark() + switchB.connection.getOne(peerIdA).close() } }) const connFSM = switchA.dialFSM(switchB._peerInfo, '/closed/1.0.0', () => { }) connFSM.once('close', () => { - expect(switchA.muxedConns).to.not.have.any.keys([ - switchB._peerInfo.id.toB58String() - ]).mark() + expect(switchA.connection.getAllById(switchB._peerInfo.id.toB58String())).to.have.length(0).mark() }) }) @@ -148,17 +144,15 @@ describe('dialFSM', () => { // Hangup and verify the connections are closed switchA.hangUp(switchB._peerInfo, () => { setTimeout(() => { - expect(switchB.muxedConns).to.eql({}) - expect(switchB.muxedConnsIn).to.eql({}) - expect(switchA.muxedConns).to.eql({}) - expect(switchA.muxedConnsIn).to.eql({}) + expect(switchB.connection.getAll()).to.have.length(0) + expect(switchA.connection.getAll()).to.have.length(0) done() }, 250) }) }) }) - it('parallel dials to one another should disconnect on hangup', (done) => { + it('parallel dials to one another should disconnect on stop', (done) => { switchA.handle('/parallel/1.0.0', (_, conn) => { pull(conn, conn) }) switchB.handle('/parallel/1.0.0', (_, conn) => { pull(conn, conn) }) @@ -171,10 +165,8 @@ describe('dialFSM', () => { // Hangup and verify the connections are closed switchA.stop(() => { setTimeout(() => { - expect(switchB.muxedConns).to.eql({}) - expect(switchB.muxedConnsIn).to.eql({}) - expect(switchA.muxedConns).to.eql({}) - expect(switchA.muxedConnsIn).to.eql({}) + expect(switchB.connection.getAll()).to.have.length(0) + expect(switchA.connection.getAll()).to.have.length(0) done() }, 250) }) diff --git a/test/pnet.node.js b/test/pnet.node.js index 036eb80..0c680a2 100644 --- a/test/pnet.node.js +++ b/test/pnet.node.js @@ -98,7 +98,7 @@ describe('Private Network', function () { switchA.dial(switchB._peerInfo, '/abacaxi/1.0.0', (err, conn) => { expect(err).to.not.exist() - expect(Object.keys(switchA.muxedConns).length).to.equal(1) + expect(switchA.connection.getAll()).to.have.length(1) tryEcho(conn, done) }) }) @@ -107,7 +107,7 @@ describe('Private Network', function () { switchB.dial(switchA._peerInfo, (err) => { expect(err).to.not.exist() expect(Object.keys(switchB.conns).length).to.equal(0) - expect(Object.keys(switchB.muxedConns).length).to.equal(1) + expect(switchB.connection.getAll()).to.have.length(1) done() }) }) @@ -118,7 +118,7 @@ describe('Private Network', function () { switchB.dial(switchA._peerInfo, '/papaia/1.0.0', (err, conn) => { expect(err).to.not.exist() expect(Object.keys(switchB.conns).length).to.equal(0) - expect(Object.keys(switchB.muxedConns).length).to.equal(1) + expect(switchB.connection.getAll()).to.have.length(1) tryEcho(conn, done) }) }) @@ -130,8 +130,8 @@ describe('Private Network', function () { switchC.dial(switchA._peerInfo, (err) => { expect(err).to.not.exist() setTimeout(() => { - expect(Object.keys(switchC.muxedConns).length).to.equal(1) - expect(Object.keys(switchA.muxedConns).length).to.equal(2) + expect(switchC.connection.getAll()).to.have.length(1) + expect(switchA.connection.getAll()).to.have.length(2) done() }, 500) }) diff --git a/test/secio.node.js b/test/secio.node.js index 6a710a8..a60c2c3 100644 --- a/test/secio.node.js +++ b/test/secio.node.js @@ -70,7 +70,7 @@ describe('SECIO', () => { switchA.dial(switchB._peerInfo, '/abacaxi/1.0.0', (err, conn) => { expect(err).to.not.exist() - expect(Object.keys(switchA.muxedConns).length).to.equal(1) + expect(switchA.connection.getAll()).to.have.length(1) tryEcho(conn, done) }) }) @@ -79,7 +79,7 @@ describe('SECIO', () => { switchB.dial(switchA._peerInfo, (err) => { expect(err).to.not.exist() expect(Object.keys(switchB.conns).length).to.equal(0) - expect(Object.keys(switchB.muxedConns).length).to.equal(1) + expect(switchB.connection.getAll()).to.have.length(1) done() }) }) @@ -90,7 +90,7 @@ describe('SECIO', () => { switchB.dial(switchA._peerInfo, '/papaia/1.0.0', (err, conn) => { expect(err).to.not.exist() expect(Object.keys(switchB.conns).length).to.equal(0) - expect(Object.keys(switchB.muxedConns).length).to.equal(1) + expect(switchB.connection.getAll()).to.have.length(1) tryEcho(conn, done) }) }) @@ -102,8 +102,8 @@ describe('SECIO', () => { switchC.dial(switchA._peerInfo, (err) => { expect(err).to.not.exist() setTimeout(() => { - expect(Object.keys(switchC.muxedConns).length).to.equal(1) - expect(Object.keys(switchA.muxedConns).length).to.equal(2) + expect(switchC.connection.getAll()).to.have.length(1) + expect(switchA.connection.getAll()).to.have.length(2) done() }, 500) }) diff --git a/test/stream-muxers.node.js b/test/stream-muxers.node.js index b410e60..c1ff0f1 100644 --- a/test/stream-muxers.node.js +++ b/test/stream-muxers.node.js @@ -72,7 +72,7 @@ describe('Stream Multiplexing', () => { switchA.dial(switchB._peerInfo, '/abacaxi/1.0.0', (err, conn) => { expect(err).to.not.exist() - expect(Object.keys(switchA.muxedConns).length).to.equal(1) + expect(switchA.connection.getAll()).to.have.length(1) tryEcho(conn, done) }) @@ -83,7 +83,7 @@ describe('Stream Multiplexing', () => { expect(err).to.not.exist() expect(Object.keys(switchB.conns).length).to.equal(0) - expect(Object.keys(switchB.muxedConns).length).to.equal(1) + expect(switchB.connection.getAll()).to.have.length(1) done() }) }) @@ -94,7 +94,7 @@ describe('Stream Multiplexing', () => { switchB.dial(switchA._peerInfo, '/papaia/1.0.0', (err, conn) => { expect(err).to.not.exist() expect(Object.keys(switchB.conns).length).to.equal(0) - expect(Object.keys(switchB.muxedConns).length).to.equal(1) + expect(switchB.connection.getAll()).to.have.length(1) tryEcho(conn, done) }) @@ -107,8 +107,8 @@ describe('Stream Multiplexing', () => { switchC.dial(switchA._peerInfo, (err) => { expect(err).to.not.exist() setTimeout(() => { - expect(Object.keys(switchC.muxedConns).length).to.equal(1) - expect(Object.keys(switchA.muxedConns).length).to.equal(2) + expect(switchC.connection.getAll()).to.have.length(1) + expect(switchA.connection.getAll()).to.have.length(2) done() }, 500) }) @@ -127,8 +127,8 @@ describe('Stream Multiplexing', () => { switchC.dial(switchA._peerInfo, '/banana/1.0.0', (err, conn) => { expect(err).to.not.exist() setTimeout(() => { - expect(Object.keys(switchC.muxedConns).length).to.equal(1) - expect(Object.keys(switchA.muxedConns).length).to.equal(2) + expect(switchC.connection.getAll()).to.have.length(1) + expect(switchA.connection.getAll()).to.have.length(2) conn.getPeerInfo((err, pi) => { expect(err).to.not.exist() @@ -144,7 +144,7 @@ describe('Stream Multiplexing', () => { expect(err).to.not.exist() setTimeout(() => { - expect(Object.keys(switchA.muxedConns).length).to.equal(1) + expect(switchA.connection.getAll()).to.have.length(1) done() }, 500) }) diff --git a/test/swarm-muxing+webrtc-star.browser.js b/test/swarm-muxing+webrtc-star.browser.js index d5dd645..c01bd86 100644 --- a/test/swarm-muxing+webrtc-star.browser.js +++ b/test/swarm-muxing+webrtc-star.browser.js @@ -86,10 +86,10 @@ describe('Switch (webrtc-star)', () => { it('dial on proto', (done) => { switch1.dial(peer2, '/echo/1.0.0', (err, conn) => { expect(err).to.not.exist() - expect(Object.keys(switch1.muxedConns).length).to.equal(1) + expect(switch1.connection.getAll()).to.have.length(1) tryEcho(conn, () => { - expect(Object.keys(switch2.muxedConns).length).to.equal(1) + expect(switch2.connection.getAll()).to.have.length(1) done() }) }) @@ -104,12 +104,12 @@ describe('Switch (webrtc-star)', () => { function check () { if (++counter === 4) { - const s1n = Object.keys(switch1.muxedConns).length - const s2n = Object.keys(switch2.muxedConns).length - const s3n = Object.keys(switch3.muxedConns).length - expect(s1n).to.equal(2) - expect(s2n).to.equal(2) - expect(s3n).to.equal(2) + const s1n = switchA.connection.getAll() + const s2n = switchA.connection.getAll() + const s3n = switchA.connection.getAll() + expect(s1n).to.have.length(2) + expect(s2n).to.have.length(2) + expect(s3n).to.have.length(2) switch3.stop(done) } if (counter === 3) { diff --git a/test/swarm-muxing.node.js b/test/swarm-muxing.node.js index 063f3f6..47feaf2 100644 --- a/test/swarm-muxing.node.js +++ b/test/swarm-muxing.node.js @@ -118,7 +118,7 @@ describe('Switch (everything all together)', () => { }), (cb) => switchA.dial(switchB._peerInfo, (err) => { expect(err).to.not.exist() - expect(Object.keys(switchA.muxedConns).length).to.equal(1) + expect(switchA.connection.getAll()).to.have.length(1) cb() }) ], done) @@ -127,7 +127,7 @@ describe('Switch (everything all together)', () => { it('warm up a warmed up, from B to A', (done) => { switchB.dial(switchA._peerInfo, (err) => { expect(err).to.not.exist() - expect(Object.keys(switchA.muxedConns).length).to.equal(1) + expect(switchA.connection.getAll()).to.have.length(1) done() }) }) @@ -137,7 +137,7 @@ describe('Switch (everything all together)', () => { switchA.dial(switchB._peerInfo, '/anona/1.0.0', (err, conn) => { expect(err).to.not.exist() - expect(Object.keys(switchA.muxedConns).length).to.equal(1) + expect(switchA.connection.getAll()).to.have.length(1) tryEcho(conn, done) }) }) @@ -145,7 +145,7 @@ describe('Switch (everything all together)', () => { it('dial from ws to ws no proto', (done) => { switchD.dial(switchE._peerInfo, (err) => { expect(err).to.not.exist() - expect(Object.keys(switchD.muxedConns).length).to.equal(1) + expect(switchD.connection.getAll()).to.have.length(1) done() }) }) @@ -155,10 +155,10 @@ describe('Switch (everything all together)', () => { switchD.dial(switchE._peerInfo, '/abacaxi/1.0.0', (err, conn) => { expect(err).to.not.exist() - expect(Object.keys(switchD.muxedConns).length).to.equal(1) + expect(switchD.connection.getAll()).to.have.length(1) tryEcho(conn, () => setTimeout(() => { - expect(Object.keys(switchE.muxedConns).length).to.equal(1) + expect(switchE.connection.getAll()).to.have.length(1) done() }, 1000)) }) @@ -169,7 +169,7 @@ describe('Switch (everything all together)', () => { const conn = switchA.dial(switchB._peerInfo, '/grapes/1.0.0', (err, conn) => { expect(err).to.not.exist() - expect(Object.keys(switchA.muxedConns).length).to.equal(1) + expect(switchA.connection.getAll()).to.have.length(1) }) tryEcho(conn, done) @@ -202,7 +202,7 @@ describe('Switch (everything all together)', () => { check() }) - expect(Object.keys(switchA.muxedConns).length).to.equal(2) + expect(switchA.connection.getAll()).to.have.length(2) expect(switchC._peerInfo.isConnected).to.exist() expect(switchA._peerInfo.isConnected).to.exist() @@ -215,13 +215,13 @@ describe('Switch (everything all together)', () => { const ready = () => ++count === 3 ? done() : null switchB.once('peer-mux-closed', (peerInfo) => { - expect(Object.keys(switchB.muxedConns).length).to.equal(0) + expect(switchB.connection.getAll()).to.have.length(0) expect(switchB._peerInfo.isConnected()).to.not.exist() ready() }) switchA.once('peer-mux-closed', (peerInfo) => { - expect(Object.keys(switchA.muxedConns).length).to.equal(1) + expect(switchA.connection.getAll()).to.have.length(1) expect(switchA._peerInfo.isConnected()).to.not.exist() ready() }) From 47e62fedf174c12ba1483692dcbf71f2b23184c2 Mon Sep 17 00:00:00 2001 From: Jacob Heun Date: Thu, 13 Dec 2018 12:00:18 +0100 Subject: [PATCH 07/16] chore: remove log --- test/connection.node.js | 1 - 1 file changed, 1 deletion(-) diff --git a/test/connection.node.js b/test/connection.node.js index c014823..a9bb20c 100644 --- a/test/connection.node.js +++ b/test/connection.node.js @@ -367,7 +367,6 @@ describe('ConnectionFSM', () => { (cb) => dialerSwitch.stop(cb), (cb) => listenerSwitch.stop(cb) ], () => { - console.log('Stopped') dialerSwitch.protector = new Protector(psk) listenerSwitch.protector = new Protector(psk) From cea657798d771f72fae6baf421571cfb13cc80a2 Mon Sep 17 00:00:00 2001 From: Jacob Heun Date: Thu, 13 Dec 2018 12:39:40 +0100 Subject: [PATCH 08/16] fix: stats stop in browser chore: fix linting and browser tests --- src/index.js | 9 +-------- src/stats/index.js | 2 +- test/swarm-muxing+webrtc-star.browser.js | 8 ++++---- test/t-webrtc-star.browser.js | 4 ++-- 4 files changed, 8 insertions(+), 15 deletions(-) diff --git a/src/index.js b/src/index.js index 22ecff2..73cbd1b 100644 --- a/src/index.js +++ b/src/index.js @@ -262,14 +262,7 @@ class Switch extends EventEmitter { }, (cb) => each(this._peerBook.getAllArray(), (peer, cb) => { this.hangUp(peer, cb) - }, cb), - // (cb) => each(this.allConns, (conn, cb) => { - // conn.on('close', cb) - // conn.close() - // }, () => { - // this.allConns = [] - // cb() - // }) + }, cb) ], (_) => { this.state('done') }) diff --git a/src/stats/index.js b/src/stats/index.js index 64c89fa..0948e05 100644 --- a/src/stats/index.js +++ b/src/stats/index.js @@ -133,7 +133,7 @@ module.exports = (observer, _options) => { } function stop () { - observer.off('message', onMessage) + observer.removeListener('message', onMessage) globalStats.stop() for (let peerStat of peerStats.values()) { diff --git a/test/swarm-muxing+webrtc-star.browser.js b/test/swarm-muxing+webrtc-star.browser.js index c01bd86..6ecf25b 100644 --- a/test/swarm-muxing+webrtc-star.browser.js +++ b/test/swarm-muxing+webrtc-star.browser.js @@ -31,7 +31,7 @@ describe('Switch (webrtc-star)', () => { (cb) => peerId.create((err, id1) => { expect(err).to.not.exist() peer1 = new PeerInfo(id1) - const ma1 = 'ip4/127.0.0.1/tcp/15555/ws/p2p-webrtc-star/ipfs/' + + const ma1 = '/ip4/127.0.0.1/tcp/15555/ws/p2p-webrtc-star/ipfs/' + id1.toB58String() peer1.multiaddrs.add(ma1) cb() @@ -104,9 +104,9 @@ describe('Switch (webrtc-star)', () => { function check () { if (++counter === 4) { - const s1n = switchA.connection.getAll() - const s2n = switchA.connection.getAll() - const s3n = switchA.connection.getAll() + const s1n = switch1.connection.getAll() + const s2n = switch2.connection.getAll() + const s3n = switch3.connection.getAll() expect(s1n).to.have.length(2) expect(s2n).to.have.length(2) expect(s3n).to.have.length(2) diff --git a/test/t-webrtc-star.browser.js b/test/t-webrtc-star.browser.js index 825e2fe..3a6d7f3 100644 --- a/test/t-webrtc-star.browser.js +++ b/test/t-webrtc-star.browser.js @@ -24,13 +24,13 @@ describe('transport - webrtc-star', () => { .createFromB58String('QmcgpsyWgH8Y8ajJz1Cu72KnS5uo2Aa2LpzU7kinSooooA') const peer1 = new PeerInfo(id1) - const ma1 = 'ip4/127.0.0.1/tcp/15555/ws/p2p-webrtc-star/ipfs/QmcgpsyWgH8Y8ajJz1Cu72KnS5uo2Aa2LpzU7kinSooooA' + const ma1 = '/ip4/127.0.0.1/tcp/15555/ws/p2p-webrtc-star/ipfs/QmcgpsyWgH8Y8ajJz1Cu72KnS5uo2Aa2LpzU7kinSooooA' peer1.multiaddrs.add(ma1) const id2 = PeerId .createFromB58String('QmcgpsyWgH8Y8ajJz1Cu72KnS5uo2Aa2LpzU7kinSooooB') const peer2 = new PeerInfo(id2) - const ma2 = 'ip4/127.0.0.1/tcp/15555/ws/p2p-webrtc-star/ipfs/QmcgpsyWgH8Y8ajJz1Cu72KnS5uo2Aa2LpzU7kinSooooB' + const ma2 = '/ip4/127.0.0.1/tcp/15555/ws/p2p-webrtc-star/ipfs/QmcgpsyWgH8Y8ajJz1Cu72KnS5uo2Aa2LpzU7kinSooooB' peer2.multiaddrs.add(ma2) switch1 = new Switch(peer1, new PeerBook()) From c8cab32e54b3b2511d82cf9032668f42a6ea84c5 Mon Sep 17 00:00:00 2001 From: Jacob Heun Date: Thu, 13 Dec 2018 14:45:25 +0100 Subject: [PATCH 09/16] fix: remove uneeded set peer info --- src/connection/index.js | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/connection/index.js b/src/connection/index.js index d50e876..d956a46 100644 --- a/src/connection/index.js +++ b/src/connection/index.js @@ -167,8 +167,6 @@ class ConnectionFSM extends BaseConnection { }) } - // TODO: probably not necessary - this.conn.setPeerInfo(this.theirPeerInfo) this._protocolHandshake(protocol, this.conn, callback) } From 528fee8fd5355c6129af4856306e37a5406a7736 Mon Sep 17 00:00:00 2001 From: Jacob Heun Date: Thu, 13 Dec 2018 14:55:46 +0100 Subject: [PATCH 10/16] fix: abort the base connection on close --- src/connection/index.js | 20 ++++++++++++++------ src/connection/manager.js | 3 ++- src/dialer.js | 3 ++- 3 files changed, 18 insertions(+), 8 deletions(-) diff --git a/src/connection/index.js b/src/connection/index.js index d956a46..d4e245e 100644 --- a/src/connection/index.js +++ b/src/connection/index.js @@ -15,6 +15,7 @@ const Errors = require('../errors') * @property {Switch} _switch Our switch instance * @property {PeerInfo} peerInfo The PeerInfo of the peer to dial * @property {Muxer} muxer Optional - A muxed connection + * @property {Connection} conn Optional - The base connection */ /** @@ -29,7 +30,7 @@ class ConnectionFSM extends BaseConnection { * @param {ConnectionOptions} param0 * @constructor */ - constructor ({ _switch, peerInfo, muxer }) { + constructor ({ _switch, peerInfo, muxer, conn }) { super({ _switch, name: `out:${_switch._peerInfo.id.toB58String().slice(0, 8)}` @@ -38,7 +39,7 @@ class ConnectionFSM extends BaseConnection { this.theirPeerInfo = peerInfo this.theirB58Id = this.theirPeerInfo.id.toB58String() - this.conn = null // The base connection + this.conn = conn // The base connection this.muxer = muxer // The upgraded/muxed connection let startState = 'DISCONNECTED' @@ -270,11 +271,18 @@ class ConnectionFSM extends BaseConnection { delete this.switch.conns[this.theirB58Id] delete this.muxer - delete this.conn - this._state('done') - - setImmediate(() => this.switch.emit('peer-mux-closed', this.theirPeerInfo)) + // If we have the base connection, abort it + if (this.conn) { + this.conn.source(true, () => { + this._state('done') + this.switch.emit('peer-mux-closed', this.theirPeerInfo) + delete this.conn + }) + } else { + this._state('done') + this.switch.emit('peer-mux-closed', this.theirPeerInfo) + } } /** diff --git a/src/connection/manager.js b/src/connection/manager.js index d7a891e..1886ec8 100644 --- a/src/connection/manager.js +++ b/src/connection/manager.js @@ -145,7 +145,8 @@ class ConnectionManager { const connection = new ConnectionFSM({ _switch: this.switch, peerInfo, - muxer: muxedConn + muxer: muxedConn, + conn: conn }) this.switch.connection.add(connection) diff --git a/src/dialer.js b/src/dialer.js index f4822d7..01a7ad6 100644 --- a/src/dialer.js +++ b/src/dialer.js @@ -60,7 +60,8 @@ function dial (_switch, returnFSM) { connection = new ConnectionFSM({ _switch, peerInfo, - muxer: null + muxer: null, + conn: null }) connection.once('error', (err) => callback(err)) connection.once('connected', () => connection.protect()) From def005e91cb25a07181bedb2e69d37f7a2810279 Mon Sep 17 00:00:00 2001 From: Jacob Heun Date: Thu, 13 Dec 2018 17:07:07 +0100 Subject: [PATCH 11/16] fix: catch edge cases of dialTimeout calling back twice --- src/limit-dialer/queue.js | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/limit-dialer/queue.js b/src/limit-dialer/queue.js index bd55340..d003cbd 100644 --- a/src/limit-dialer/queue.js +++ b/src/limit-dialer/queue.js @@ -5,6 +5,7 @@ const pull = require('pull-stream') const timeout = require('async/timeout') const queue = require('async/queue') const debug = require('debug') +const once = require('once') const log = debug('libp2p:switch:dialer:queue') log.error = debug('libp2p:switch:dialer:queue:error') @@ -38,6 +39,7 @@ class DialQueue { * @private */ _doWork (transport, addr, token, callback) { + callback = once(callback) log(`${transport.constructor.name}:work:start`) this._dialWithTimeout(transport, addr, (err, conn) => { if (err) { From ddd61bb04644782ab62bd1d0476846c700c59056 Mon Sep 17 00:00:00 2001 From: Jacob Heun Date: Thu, 13 Dec 2018 20:30:31 +0100 Subject: [PATCH 12/16] fix: close all connections instead of checking peerbook peers --- src/index.js | 14 +++----------- 1 file changed, 3 insertions(+), 11 deletions(-) diff --git a/src/index.js b/src/index.js index 73cbd1b..495e6eb 100644 --- a/src/index.js +++ b/src/index.js @@ -44,15 +44,6 @@ class Switch extends EventEmitter { // { peerIdB58: { conn: }} this.conns = {} - // { - // peerIdB58: { - // muxer: - // conn: // to extract info required for the Identify Protocol - // } - // } - // this.muxedConns = {} - // this.muxedConnsIn = {} - // { protocol: handler } this.protocols = {} @@ -260,8 +251,9 @@ class Switch extends EventEmitter { }, cb) }, cb) }, - (cb) => each(this._peerBook.getAllArray(), (peer, cb) => { - this.hangUp(peer, cb) + (cb) => each(this.connection.getAll(), (conn, cb) => { + conn.once('close', cb) + conn.close() }, cb) ], (_) => { this.state('done') From 98142004ca917ba84a857848034c33283d7e60a2 Mon Sep 17 00:00:00 2001 From: Jacob Heun Date: Thu, 13 Dec 2018 21:08:38 +0100 Subject: [PATCH 13/16] test: update dial fsm test waits --- test/dial-fsm.node.js | 38 ++++++++++++++++++++++++++------------ 1 file changed, 26 insertions(+), 12 deletions(-) diff --git a/test/dial-fsm.node.js b/test/dial-fsm.node.js index 9cd54df..90f25d3 100644 --- a/test/dial-fsm.node.js +++ b/test/dial-fsm.node.js @@ -135,6 +135,17 @@ describe('dialFSM', () => { switchA.handle('/parallel/1.0.0', (_, conn) => { pull(conn, conn) }) switchB.handle('/parallel/1.0.0', (_, conn) => { pull(conn, conn) }) + switchB.on('peer-mux-closed', (peerInfo) => { + switchB.removeAllListeners('peer-mux-closed') + expect(peerInfo.id.toB58String()).to.eql(switchA._peerInfo.id.toB58String()).mark() + }) + + expect(2).checks(() => { + expect(switchA.connection.getAll()).to.have.length(0) + expect(switchB.connection.getAll()).to.have.length(0) + done() + }) + parallel([ (cb) => switchA.dialFSM(switchB._peerInfo, '/parallel/1.0.0', cb), (cb) => switchB.dialFSM(switchA._peerInfo, '/parallel/1.0.0', cb) @@ -142,12 +153,8 @@ describe('dialFSM', () => { if (err) return done(err) // Hangup and verify the connections are closed - switchA.hangUp(switchB._peerInfo, () => { - setTimeout(() => { - expect(switchB.connection.getAll()).to.have.length(0) - expect(switchA.connection.getAll()).to.have.length(0) - done() - }, 250) + switchA.hangUp(switchB._peerInfo, (err) => { + expect(err).to.not.exist().mark() }) }) }) @@ -156,6 +163,17 @@ describe('dialFSM', () => { switchA.handle('/parallel/1.0.0', (_, conn) => { pull(conn, conn) }) switchB.handle('/parallel/1.0.0', (_, conn) => { pull(conn, conn) }) + switchB.on('peer-mux-closed', (peerInfo) => { + switchB.removeAllListeners('peer-mux-closed') + expect(peerInfo.id.toB58String()).to.eql(switchA._peerInfo.id.toB58String()).mark() + }) + + expect(2).checks(() => { + expect(switchA.connection.getAll()).to.have.length(0) + expect(switchB.connection.getAll()).to.have.length(0) + done() + }) + parallel([ (cb) => switchA.dialFSM(switchB._peerInfo, '/parallel/1.0.0', cb), (cb) => switchB.dialFSM(switchA._peerInfo, '/parallel/1.0.0', cb) @@ -163,12 +181,8 @@ describe('dialFSM', () => { if (err) return done(err) // Hangup and verify the connections are closed - switchA.stop(() => { - setTimeout(() => { - expect(switchB.connection.getAll()).to.have.length(0) - expect(switchA.connection.getAll()).to.have.length(0) - done() - }, 250) + switchA.stop((err) => { + expect(err).to.not.exist().mark() }) }) }) From d29f6d7e382d4bd419a80f5b3850fbde95f0108c Mon Sep 17 00:00:00 2001 From: Jacob Heun Date: Fri, 14 Dec 2018 11:58:28 +0100 Subject: [PATCH 14/16] test: make parallel dial tests deterministic fix: improve logic around disconnecting fix: remove duplicate event handling logic --- src/connection/index.js | 7 +++-- src/connection/manager.js | 58 +++++++++++++++++++++++++++---------- src/index.js | 5 ++-- test/dial-fsm.node.js | 60 +++++++++++++++++++++++---------------- 4 files changed, 85 insertions(+), 45 deletions(-) diff --git a/src/connection/index.js b/src/connection/index.js index d4e245e..c251ee3 100644 --- a/src/connection/index.js +++ b/src/connection/index.js @@ -16,6 +16,7 @@ const Errors = require('../errors') * @property {PeerInfo} peerInfo The PeerInfo of the peer to dial * @property {Muxer} muxer Optional - A muxed connection * @property {Connection} conn Optional - The base connection + * @property {string} type Optional - identify the connection as incoming or outgoing. Defaults to out. */ /** @@ -30,10 +31,10 @@ class ConnectionFSM extends BaseConnection { * @param {ConnectionOptions} param0 * @constructor */ - constructor ({ _switch, peerInfo, muxer, conn }) { + constructor ({ _switch, peerInfo, muxer, conn, type = 'out' }) { super({ _switch, - name: `out:${_switch._peerInfo.id.toB58String().slice(0, 8)}` + name: `${type}:${_switch._peerInfo.id.toB58String().slice(0, 8)}` }) this.theirPeerInfo = peerInfo @@ -375,7 +376,7 @@ class ConnectionFSM extends BaseConnection { this.switch.protocolMuxer(null)(conn) }) - setImmediate(() => this.switch.emit('peer-mux-established', this.theirPeerInfo)) + this.switch.emit('peer-mux-established', this.theirPeerInfo) this._didUpgrade(null) }) diff --git a/src/connection/manager.js b/src/connection/manager.js index 1886ec8..e819b28 100644 --- a/src/connection/manager.js +++ b/src/connection/manager.js @@ -23,6 +23,12 @@ class ConnectionManager { this.connections = {} } + /** + * Adds the connection for tracking if it's not already added + * @private + * @param {ConnectionFSM} connection + * @returns {void} + */ add (connection) { this.connections[connection.theirB58Id] = this.connections[connection.theirB58Id] || [] // Only add it if it's not there @@ -31,17 +37,29 @@ class ConnectionManager { } } + /** + * Gets the connection from the list if it exists + * @private + * @param {ConnectionFSM} connection + * @returns {ConnectionFSM|null} The found connection or null + */ get (connection) { if (!this.connections[connection.theirB58Id]) return null for (let i = 0; i < this.connections[connection.theirB58Id].length; i++) { - if (this.connections[connection.theirB58Id][i]) { + if (this.connections[connection.theirB58Id][i] === connection) { return this.connections[connection.theirB58Id][i] } } return null } + /** + * Gets a connection associated with the given peer + * @private + * @param {string} peerId The peers id + * @returns {ConnectionFSM|null} The found connection or null + */ getOne (peerId) { if (this.connections[peerId]) { // TODO: Maybe select the best? @@ -50,17 +68,28 @@ class ConnectionManager { return null } + /** + * Removes the connection from tracking + * @private + * @param {ConnectionFSM} connection The connection to remove + * @returns {void} + */ remove (connection) { if (!this.connections[connection.theirB58Id]) return for (let i = 0; i < this.connections[connection.theirB58Id].length; i++) { - if (this.connections[connection.theirB58Id][i]) { + if (this.connections[connection.theirB58Id][i] === connection) { this.connections[connection.theirB58Id].splice(i, 1) return } } } + /** + * Returns all connections being tracked + * @private + * @returns {ConnectionFSM[]} + */ getAll () { let connections = [] for (const conns of Object.values(this.connections)) { @@ -69,8 +98,14 @@ class ConnectionManager { return connections } - getAllById (id) { - return this.connections[id] || [] + /** + * Returns all connections being tracked for a given peer id + * @private + * @param {string} peerId Stringified peer id + * @returns {ConnectionFSM[]} + */ + getAllById (peerId) { + return this.connections[peerId] || [] } /** @@ -121,9 +156,6 @@ class ConnectionManager { ], (err, peerInfo) => { if (err) { return muxedConn.end(() => { - if (peerInfo) { - setImmediate(() => this.switch.emit('peer-mux-closed', peerInfo)) - } callback(err, null) }) } @@ -146,7 +178,8 @@ class ConnectionManager { _switch: this.switch, peerInfo, muxer: muxedConn, - conn: conn + conn: conn, + type: 'inc' }) this.switch.connection.add(connection) @@ -164,15 +197,10 @@ class ConnectionManager { peerInfo = this.switch._peerBook.put(peerInfo) muxedConn.once('close', () => { - // delete this.switch.muxedConnsIn[b58Str] - this.switch.connection.remove(connection) - peerInfo.disconnect() - peerInfo = this.switch._peerBook.put(peerInfo) - log(`closed connection to ${b58Str}`) - setImmediate(() => this.switch.emit('peer-mux-closed', peerInfo)) + connection.close() }) - setImmediate(() => this.switch.emit('peer-mux-established', peerInfo)) + this.switch.emit('peer-mux-established', peerInfo) }) }) } diff --git a/src/index.js b/src/index.js index 495e6eb..88e94d3 100644 --- a/src/index.js +++ b/src/index.js @@ -171,8 +171,7 @@ class Switch extends EventEmitter { hangUp (peer, callback) { const peerInfo = getPeerInfo(peer, this.peerBook) const key = peerInfo.id.toB58String() - const conns = this.connection.getAllById(key) - + const conns = [...this.connection.getAllById(key)] each(conns, (conn, cb) => { conn.once('close', cb) conn.close() @@ -251,7 +250,7 @@ class Switch extends EventEmitter { }, cb) }, cb) }, - (cb) => each(this.connection.getAll(), (conn, cb) => { + (cb) => each([...this.connection.getAll()], (conn, cb) => { conn.once('close', cb) conn.close() }, cb) diff --git a/test/dial-fsm.node.js b/test/dial-fsm.node.js index 90f25d3..ed9e959 100644 --- a/test/dial-fsm.node.js +++ b/test/dial-fsm.node.js @@ -135,55 +135,67 @@ describe('dialFSM', () => { switchA.handle('/parallel/1.0.0', (_, conn) => { pull(conn, conn) }) switchB.handle('/parallel/1.0.0', (_, conn) => { pull(conn, conn) }) - switchB.on('peer-mux-closed', (peerInfo) => { + // 4 close checks and 1 hangup check + expect(5).checks(() => { + switchA.removeAllListeners('peer-mux-closed') switchB.removeAllListeners('peer-mux-closed') - expect(peerInfo.id.toB58String()).to.eql(switchA._peerInfo.id.toB58String()).mark() - }) - - expect(2).checks(() => { - expect(switchA.connection.getAll()).to.have.length(0) - expect(switchB.connection.getAll()).to.have.length(0) done() }) - parallel([ - (cb) => switchA.dialFSM(switchB._peerInfo, '/parallel/1.0.0', cb), - (cb) => switchB.dialFSM(switchA._peerInfo, '/parallel/1.0.0', cb) - ], (err) => { - if (err) return done(err) + switchA.on('peer-mux-closed', (peerInfo) => { + expect(peerInfo.id.toB58String()).to.eql(switchB._peerInfo.id.toB58String()).mark() + }) + switchB.on('peer-mux-closed', (peerInfo) => { + expect(peerInfo.id.toB58String()).to.eql(switchA._peerInfo.id.toB58String()).mark() + }) + const conn = switchA.dialFSM(switchB._peerInfo, '/parallel/1.0.0', () => { // Hangup and verify the connections are closed switchA.hangUp(switchB._peerInfo, (err) => { expect(err).to.not.exist().mark() }) }) + + // Hold the dial from A, until switch B is done dialing to ensure + // we have both incoming and outgoing connections + conn._state.on('DIALING:enter', (cb) => { + switchB.dialFSM(switchA._peerInfo, '/parallel/1.0.0', () => { + cb() + }) + }) }) it('parallel dials to one another should disconnect on stop', (done) => { switchA.handle('/parallel/1.0.0', (_, conn) => { pull(conn, conn) }) switchB.handle('/parallel/1.0.0', (_, conn) => { pull(conn, conn) }) - switchB.on('peer-mux-closed', (peerInfo) => { + // 4 close checks and 1 hangup check + expect(5).checks(() => { + switchA.removeAllListeners('peer-mux-closed') switchB.removeAllListeners('peer-mux-closed') - expect(peerInfo.id.toB58String()).to.eql(switchA._peerInfo.id.toB58String()).mark() - }) - - expect(2).checks(() => { - expect(switchA.connection.getAll()).to.have.length(0) - expect(switchB.connection.getAll()).to.have.length(0) done() }) - parallel([ - (cb) => switchA.dialFSM(switchB._peerInfo, '/parallel/1.0.0', cb), - (cb) => switchB.dialFSM(switchA._peerInfo, '/parallel/1.0.0', cb) - ], (err) => { - if (err) return done(err) + switchA.on('peer-mux-closed', (peerInfo) => { + expect(peerInfo.id.toB58String()).to.eql(switchB._peerInfo.id.toB58String()).mark() + }) + switchB.on('peer-mux-closed', (peerInfo) => { + expect(peerInfo.id.toB58String()).to.eql(switchA._peerInfo.id.toB58String()).mark() + }) + const conn = switchA.dialFSM(switchB._peerInfo, '/parallel/1.0.0', () => { // Hangup and verify the connections are closed switchA.stop((err) => { expect(err).to.not.exist().mark() }) }) + + // Hold the dial from A, until switch B is done dialing to ensure + // we have both incoming and outgoing connections + conn._state.on('DIALING:enter', (cb) => { + switchB.dialFSM(switchA._peerInfo, '/parallel/1.0.0', () => { + cb() + }) + }) }) }) From 74df2503db71e536c87b1aa356665e2f40ca7091 Mon Sep 17 00:00:00 2001 From: Jacob Heun Date: Fri, 14 Dec 2018 12:13:57 +0100 Subject: [PATCH 15/16] chore: fix lint --- src/connection/index.js | 1 - src/connection/manager.js | 1 - 2 files changed, 2 deletions(-) diff --git a/src/connection/index.js b/src/connection/index.js index c251ee3..f271e97 100644 --- a/src/connection/index.js +++ b/src/connection/index.js @@ -1,7 +1,6 @@ 'use strict' const FSM = require('fsm-event') -const setImmediate = require('async/setImmediate') const Circuit = require('libp2p-circuit') const multistream = require('multistream-select') const withIs = require('class-is') diff --git a/src/connection/manager.js b/src/connection/manager.js index e819b28..43b4ca0 100644 --- a/src/connection/manager.js +++ b/src/connection/manager.js @@ -6,7 +6,6 @@ const waterfall = require('async/waterfall') const debug = require('debug') const log = debug('libp2p:switch:conn-manager') const once = require('once') -const setImmediate = require('async/setImmediate') const ConnectionFSM = require('../connection') const Circuit = require('libp2p-circuit') From 888a63b9fe0bba012e3850b1bd0c28196ca0617b Mon Sep 17 00:00:00 2001 From: Jacob Heun Date: Fri, 14 Dec 2018 12:38:14 +0100 Subject: [PATCH 16/16] test: improve test reliability --- test/circuit-relay.node.js | 4 ++-- test/dial-fsm.node.js | 4 +++- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/test/circuit-relay.node.js b/test/circuit-relay.node.js index 11cf270..9368080 100644 --- a/test/circuit-relay.node.js +++ b/test/circuit-relay.node.js @@ -85,12 +85,12 @@ describe(`circuit`, function () { ], (err) => { expect(err).to.not.exist() expect(swarmA._peerInfo.multiaddrs.toArray().filter((a) => a.toString() - .includes(`/p2p-circuit`)).length).to.equal(3) + .includes(`/p2p-circuit`)).length).to.be.at.least(3) // ensure swarmA has had 0.0.0.0 replaced in the addresses expect(swarmA._peerInfo.multiaddrs.toArray().filter((a) => a.toString() .includes(`/0.0.0.0`)).length).to.equal(0) expect(swarmB._peerInfo.multiaddrs.toArray().filter((a) => a.toString() - .includes(`/p2p-circuit`)).length).to.equal(2) + .includes(`/p2p-circuit`)).length).to.be.at.least(2) done() }) }) diff --git a/test/dial-fsm.node.js b/test/dial-fsm.node.js index ed9e959..7a15aef 100644 --- a/test/dial-fsm.node.js +++ b/test/dial-fsm.node.js @@ -131,7 +131,9 @@ describe('dialFSM', () => { }) }) - it('parallel dials to one another should disconnect on hangup', (done) => { + it('parallel dials to one another should disconnect on hangup', function (done) { + this.timeout(10e3) + switchA.handle('/parallel/1.0.0', (_, conn) => { pull(conn, conn) }) switchB.handle('/parallel/1.0.0', (_, conn) => { pull(conn, conn) })