From aa86307adbd529e568421b0cd838c64c7036ffde Mon Sep 17 00:00:00 2001 From: Jacob Heun Date: Fri, 14 Dec 2018 16:29:24 +0100 Subject: [PATCH] fix: improve connection tracking and closing (#291) * chore: update deps * fix: check we have a proper transport before filtering addresses * fix: improve connection close on stop * fix: improve stat stopping * test: fix stats test * fix: improve tracking of open connections * chore: remove log * fix: stats stop in browser chore: fix linting and browser tests * fix: remove uneeded set peer info * fix: abort the base connection on close * fix: catch edge cases of dialTimeout calling back twice * fix: close all connections instead of checking peerbook peers * test: update dial fsm test waits * test: make parallel dial tests deterministic fix: improve logic around disconnecting fix: remove duplicate event handling logic * chore: fix lint * test: improve test reliability --- package.json | 22 ++--- src/connection/incoming.js | 4 +- src/connection/index.js | 34 +++++--- src/connection/manager.js | 105 ++++++++++++++++++++--- src/dialer.js | 5 +- src/index.js | 42 +++------ src/limit-dialer/queue.js | 2 + src/stats/index.js | 40 ++++++--- src/transport.js | 3 + test/circuit-relay.node.js | 4 +- test/dial-fsm.node.js | 85 ++++++++++++++++-- test/pnet.node.js | 10 +-- test/secio.node.js | 10 +-- test/stats.node.js | 4 +- test/stream-muxers.node.js | 16 ++-- test/swarm-muxing+webrtc-star.browser.js | 18 ++-- test/swarm-muxing.node.js | 20 ++--- test/t-webrtc-star.browser.js | 4 +- 18 files changed, 298 insertions(+), 130 deletions(-) diff --git a/package.json b/package.json index 407a51d..4718a08 100644 --- a/package.json +++ b/package.json @@ -40,20 +40,20 @@ "npm": ">=3.0.0" }, "devDependencies": { - "aegir": "^17.0.1", + "aegir": "^17.1.1", "chai": "^4.2.0", "chai-checkmark": "^1.0.1", "dirty-chai": "^2.0.1", "libp2p-mplex": "~0.8.4", "libp2p-pnet": "~0.1.0", "libp2p-secio": "~0.10.1", - "libp2p-spdy": "~0.13.0", + "libp2p-spdy": "~0.13.1", "libp2p-tcp": "~0.13.0", - "libp2p-webrtc-star": "~0.15.5", + "libp2p-webrtc-star": "~0.15.6", "libp2p-websockets": "~0.12.0", - "peer-book": "~0.8.0", - "portfinder": "^1.0.19", - "sinon": "^7.1.1", + "peer-book": "~0.9.0", + "portfinder": "^1.0.20", + "sinon": "^7.2.0", "webrtcsupport": "^2.2.0" }, "dependencies": { @@ -63,18 +63,18 @@ "debug": "^4.1.0", "err-code": "^1.1.2", "fsm-event": "^2.1.0", - "hashlru": "^2.2.1", - "interface-connection": "~0.3.2", + "hashlru": "^2.3.0", + "interface-connection": "~0.3.3", "ip-address": "^5.8.9", - "libp2p-circuit": "~0.3.0", + "libp2p-circuit": "~0.3.1", "libp2p-identify": "~0.7.2", "lodash.includes": "^4.3.0", "moving-average": "^1.0.0", - "multiaddr": "^5.0.2", + "multiaddr": "^6.0.0", "multistream-select": "~0.14.3", "once": "^1.4.0", "peer-id": "~0.12.0", - "peer-info": "~0.14.1", + "peer-info": "~0.15.0", "pull-stream": "^3.6.9", "retimer": "^2.0.0" }, diff --git a/src/connection/incoming.js b/src/connection/incoming.js index 3b2f9d0..3b2412f 100644 --- a/src/connection/incoming.js +++ b/src/connection/incoming.js @@ -20,7 +20,9 @@ class IncomingConnectionFSM extends BaseConnection { this.msListener = new multistream.Listener() this._state = FSM('DIALED', { - DISCONNECTED: { }, + DISCONNECTED: { + disconnect: 'DISCONNECTED' + }, DIALED: { // Base connection to peer established privatize: 'PRIVATIZING', encrypt: 'ENCRYPTING' diff --git a/src/connection/index.js b/src/connection/index.js index 5ce3dd9..f271e97 100644 --- a/src/connection/index.js +++ b/src/connection/index.js @@ -1,7 +1,6 @@ 'use strict' const FSM = require('fsm-event') -const setImmediate = require('async/setImmediate') const Circuit = require('libp2p-circuit') const multistream = require('multistream-select') const withIs = require('class-is') @@ -15,6 +14,8 @@ const Errors = require('../errors') * @property {Switch} _switch Our switch instance * @property {PeerInfo} peerInfo The PeerInfo of the peer to dial * @property {Muxer} muxer Optional - A muxed connection + * @property {Connection} conn Optional - The base connection + * @property {string} type Optional - identify the connection as incoming or outgoing. Defaults to out. */ /** @@ -29,16 +30,16 @@ class ConnectionFSM extends BaseConnection { * @param {ConnectionOptions} param0 * @constructor */ - constructor ({ _switch, peerInfo, muxer }) { + constructor ({ _switch, peerInfo, muxer, conn, type = 'out' }) { super({ _switch, - name: `out:${_switch._peerInfo.id.toB58String().slice(0, 8)}` + name: `${type}:${_switch._peerInfo.id.toB58String().slice(0, 8)}` }) this.theirPeerInfo = peerInfo this.theirB58Id = this.theirPeerInfo.id.toB58String() - this.conn = null // The base connection + this.conn = conn // The base connection this.muxer = muxer // The upgraded/muxed connection let startState = 'DISCONNECTED' @@ -114,6 +115,7 @@ class ConnectionFSM extends BaseConnection { this._state.on('UPGRADING', () => this._onUpgrading()) this._state.on('MUXED', () => { this.log(`successfully muxed connection to ${this.theirB58Id}`) + delete this.switch.conns[this.theirB58Id] this.emit('muxed', this.muxer) }) this._state.on('CONNECTED', () => { @@ -166,7 +168,6 @@ class ConnectionFSM extends BaseConnection { }) } - this.conn.setPeerInfo(this.theirPeerInfo) this._protocolHandshake(protocol, this.conn, callback) } @@ -266,14 +267,22 @@ class ConnectionFSM extends BaseConnection { this.muxer.end() } - delete this.switch.muxedConns[this.theirB58Id] + this.switch.connection.remove(this) + delete this.switch.conns[this.theirB58Id] delete this.muxer - delete this.conn - - this._state('done') - setImmediate(() => this.switch.emit('peer-mux-closed', this.theirPeerInfo)) + // If we have the base connection, abort it + if (this.conn) { + this.conn.source(true, () => { + this._state('done') + this.switch.emit('peer-mux-closed', this.theirPeerInfo) + delete this.conn + }) + } else { + this._state('done') + this.switch.emit('peer-mux-closed', this.theirPeerInfo) + } } /** @@ -352,7 +361,8 @@ class ConnectionFSM extends BaseConnection { const conn = observeConnection(null, key, _conn, this.switch.observer) this.muxer = this.switch.muxers[key].dialer(conn) - this.switch.muxedConns[this.theirB58Id] = this + // this.switch.muxedConns[this.theirB58Id] = this + this.switch.connection.add(this) this.muxer.once('close', () => { this.close() @@ -365,7 +375,7 @@ class ConnectionFSM extends BaseConnection { this.switch.protocolMuxer(null)(conn) }) - setImmediate(() => this.switch.emit('peer-mux-established', this.theirPeerInfo)) + this.switch.emit('peer-mux-established', this.theirPeerInfo) this._didUpgrade(null) }) diff --git a/src/connection/manager.js b/src/connection/manager.js index 571c5c0..43b4ca0 100644 --- a/src/connection/manager.js +++ b/src/connection/manager.js @@ -6,7 +6,6 @@ const waterfall = require('async/waterfall') const debug = require('debug') const log = debug('libp2p:switch:conn-manager') const once = require('once') -const setImmediate = require('async/setImmediate') const ConnectionFSM = require('../connection') const Circuit = require('libp2p-circuit') @@ -20,6 +19,92 @@ const plaintext = require('../plaintext') class ConnectionManager { constructor (_switch) { this.switch = _switch + this.connections = {} + } + + /** + * Adds the connection for tracking if it's not already added + * @private + * @param {ConnectionFSM} connection + * @returns {void} + */ + add (connection) { + this.connections[connection.theirB58Id] = this.connections[connection.theirB58Id] || [] + // Only add it if it's not there + if (!this.get(connection)) { + this.connections[connection.theirB58Id].push(connection) + } + } + + /** + * Gets the connection from the list if it exists + * @private + * @param {ConnectionFSM} connection + * @returns {ConnectionFSM|null} The found connection or null + */ + get (connection) { + if (!this.connections[connection.theirB58Id]) return null + + for (let i = 0; i < this.connections[connection.theirB58Id].length; i++) { + if (this.connections[connection.theirB58Id][i] === connection) { + return this.connections[connection.theirB58Id][i] + } + } + return null + } + + /** + * Gets a connection associated with the given peer + * @private + * @param {string} peerId The peers id + * @returns {ConnectionFSM|null} The found connection or null + */ + getOne (peerId) { + if (this.connections[peerId]) { + // TODO: Maybe select the best? + return this.connections[peerId][0] + } + return null + } + + /** + * Removes the connection from tracking + * @private + * @param {ConnectionFSM} connection The connection to remove + * @returns {void} + */ + remove (connection) { + if (!this.connections[connection.theirB58Id]) return + + for (let i = 0; i < this.connections[connection.theirB58Id].length; i++) { + if (this.connections[connection.theirB58Id][i] === connection) { + this.connections[connection.theirB58Id].splice(i, 1) + return + } + } + } + + /** + * Returns all connections being tracked + * @private + * @returns {ConnectionFSM[]} + */ + getAll () { + let connections = [] + for (const conns of Object.values(this.connections)) { + connections = [...connections, ...conns] + } + return connections + } + + /** + * Returns all connections being tracked for a given peer id + * @private + * @param {string} peerId Stringified peer id + * @returns {ConnectionFSM[]} + */ + getAllById (peerId) { + return this.connections[peerId] || [] } /** @@ -70,9 +155,6 @@ class ConnectionManager { ], (err, peerInfo) => { if (err) { return muxedConn.end(() => { - if (peerInfo) { - setImmediate(() => this.switch.emit('peer-mux-closed', peerInfo)) - } callback(err, null) }) } @@ -91,11 +173,14 @@ class ConnectionManager { } const b58Str = peerInfo.id.toB58String() - this.switch.muxedConns[b58Str] = new ConnectionFSM({ + const connection = new ConnectionFSM({ _switch: this.switch, peerInfo, - muxer: muxedConn + muxer: muxedConn, + conn: conn, + type: 'inc' }) + this.switch.connection.add(connection) if (peerInfo.multiaddrs.size > 0) { // with incomming conn and through identify, going to pick one @@ -111,14 +196,10 @@ class ConnectionManager { peerInfo = this.switch._peerBook.put(peerInfo) muxedConn.once('close', () => { - delete this.switch.muxedConns[b58Str] - peerInfo.disconnect() - peerInfo = this.switch._peerBook.put(peerInfo) - log(`closed connection to ${b58Str}`) - setImmediate(() => this.switch.emit('peer-mux-closed', peerInfo)) + connection.close() }) - setImmediate(() => this.switch.emit('peer-mux-established', peerInfo)) + this.switch.emit('peer-mux-established', peerInfo) }) }) } diff --git a/src/dialer.js b/src/dialer.js index 1a60cce..01a7ad6 100644 --- a/src/dialer.js +++ b/src/dialer.js @@ -54,13 +54,14 @@ function dial (_switch, returnFSM) { log(`dialing to ${b58Id.slice(0, 8)} with protocol ${protocol || 'unknown'}`) - let connection = _switch.muxedConns[b58Id] || _switch.conns[b58Id] + let connection = _switch.connection.getOne(b58Id) if (!ConnectionFSM.isConnectionFSM(connection)) { connection = new ConnectionFSM({ _switch, peerInfo, - muxer: _switch.muxedConns[b58Id] || null + muxer: null, + conn: null }) connection.once('error', (err) => callback(err)) connection.once('connected', () => connection.protect()) diff --git a/src/index.js b/src/index.js index 7b22d50..88e94d3 100644 --- a/src/index.js +++ b/src/index.js @@ -44,14 +44,6 @@ class Switch extends EventEmitter { // { peerIdB58: { conn: }} this.conns = {} - // { - // peerIdB58: { - // muxer: - // conn: // to extract info required for the Identify Protocol - // } - // } - this.muxedConns = {} - // { protocol: handler } this.protocols = {} @@ -94,7 +86,10 @@ class Switch extends EventEmitter { stop: 'STOPPING', start: 'STARTED' }, - STOPPING: { done: 'STOPPED' } + STOPPING: { + stop: 'STOPPING', + done: 'STOPPED' + } }) this.state.on('STARTING', () => { log('The switch is starting') @@ -176,16 +171,11 @@ class Switch extends EventEmitter { hangUp (peer, callback) { const peerInfo = getPeerInfo(peer, this.peerBook) const key = peerInfo.id.toB58String() - if (this.muxedConns[key]) { - const conn = this.muxedConns[key] - conn.once('close', () => { - delete this.muxedConns[key] - callback() - }) + const conns = [...this.connection.getAllById(key)] + each(conns, (conn, cb) => { + conn.once('close', cb) conn.close() - } else { - callback() - } + }, callback) } /** @@ -231,6 +221,7 @@ class Switch extends EventEmitter { * @returns {void} */ _onStarting () { + this.stats.start() eachSeries(this.availableTransports(this._peerInfo), (ts, cb) => { // Listen on the given transport this.transport.listen(ts, {}, null, cb) @@ -252,22 +243,17 @@ class Switch extends EventEmitter { _onStopping () { this.stats.stop() series([ - (cb) => each(this.muxedConns, (conn, cb) => { - // If the connection was destroyed while we are hanging up, continue - if (!conn) { - return cb() - } - - conn.once('close', cb) - conn.close() - }, cb), (cb) => { each(this.transports, (transport, cb) => { each(transport.listeners, (listener, cb) => { listener.close(cb) }, cb) }, cb) - } + }, + (cb) => each([...this.connection.getAll()], (conn, cb) => { + conn.once('close', cb) + conn.close() + }, cb) ], (_) => { this.state('done') }) diff --git a/src/limit-dialer/queue.js b/src/limit-dialer/queue.js index bd55340..d003cbd 100644 --- a/src/limit-dialer/queue.js +++ b/src/limit-dialer/queue.js @@ -5,6 +5,7 @@ const pull = require('pull-stream') const timeout = require('async/timeout') const queue = require('async/queue') const debug = require('debug') +const once = require('once') const log = debug('libp2p:switch:dialer:queue') log.error = debug('libp2p:switch:dialer:queue:error') @@ -38,6 +39,7 @@ class DialQueue { * @private */ _doWork (transport, addr, token, callback) { + callback = once(callback) log(`${transport.constructor.name}:work:start`) this._dialWithTimeout(transport, addr, (err, conn) => { if (err) { diff --git a/src/stats/index.js b/src/stats/index.js index bf79f51..0948e05 100644 --- a/src/stats/index.js +++ b/src/stats/index.js @@ -40,6 +40,7 @@ module.exports = (observer, _options) => { const globalStats = new Stat(initialCounters, options) const stats = Object.assign(new EventEmitter(), { + start: start, stop: stop, global: globalStats, peers: () => Array.from(peerStats.keys()), @@ -59,7 +60,19 @@ module.exports = (observer, _options) => { const transportStats = new Map() const protocolStats = new Map() - observer.on('message', (peerId, transportTag, protocolTag, direction, bufferLength) => { + observer.on('peer:closed', (peerId) => { + const peer = peerStats.get(peerId) + if (peer) { + peer.removeListener('update', propagateChange) + peer.stop() + peerStats.delete(peerId) + oldPeers.set(peerId, peer) + } + }) + + return stats + + function onMessage (peerId, transportTag, protocolTag, direction, bufferLength) { const event = directionToEvent[direction] if (transportTag) { @@ -104,22 +117,25 @@ module.exports = (observer, _options) => { } protocol.push(event, bufferLength) } - }) + } - observer.on('peer:closed', (peerId) => { - const peer = peerStats.get(peerId) - if (peer) { - peer.removeListener('update', propagateChange) - peer.stop() - peerStats.delete(peerId) - oldPeers.set(peerId, peer) - } - }) + function start () { + observer.on('message', onMessage) - return stats + globalStats.start() + + for (let peerStat of peerStats.values()) { + peerStat.start() + } + for (let transportStat of transportStats.values()) { + transportStat.start() + } + } function stop () { + observer.removeListener('message', onMessage) globalStats.stop() + for (let peerStat of peerStats.values()) { peerStat.stop() } diff --git a/src/transport.js b/src/transport.js index b8321df..32c83c4 100644 --- a/src/transport.js +++ b/src/transport.js @@ -203,6 +203,9 @@ class TransportManager { * @returns {Array} */ static dialables (transport, multiaddrs, peerInfo) { + // If we dont have a proper transport, return no multiaddrs + if (!transport || !transport.filter) return [] + const transportAddrs = transport.filter(multiaddrs) if (!peerInfo) { return transportAddrs diff --git a/test/circuit-relay.node.js b/test/circuit-relay.node.js index 11cf270..9368080 100644 --- a/test/circuit-relay.node.js +++ b/test/circuit-relay.node.js @@ -85,12 +85,12 @@ describe(`circuit`, function () { ], (err) => { expect(err).to.not.exist() expect(swarmA._peerInfo.multiaddrs.toArray().filter((a) => a.toString() - .includes(`/p2p-circuit`)).length).to.equal(3) + .includes(`/p2p-circuit`)).length).to.be.at.least(3) // ensure swarmA has had 0.0.0.0 replaced in the addresses expect(swarmA._peerInfo.multiaddrs.toArray().filter((a) => a.toString() .includes(`/0.0.0.0`)).length).to.equal(0) expect(swarmB._peerInfo.multiaddrs.toArray().filter((a) => a.toString() - .includes(`/p2p-circuit`)).length).to.equal(2) + .includes(`/p2p-circuit`)).length).to.be.at.least(2) done() }) }) diff --git a/test/dial-fsm.node.js b/test/dial-fsm.node.js index 8c188b5..7a15aef 100644 --- a/test/dial-fsm.node.js +++ b/test/dial-fsm.node.js @@ -12,6 +12,7 @@ const WS = require('libp2p-websockets') const TCP = require('libp2p-tcp') const secio = require('libp2p-secio') const multiplex = require('libp2p-mplex') +const pull = require('pull-stream') const utils = require('./utils') const createInfos = utils.createInfos @@ -99,14 +100,12 @@ describe('dialFSM', () => { const connFSM = switchA.dialFSM(switchB._peerInfo, '/closed/1.0.0', (err) => { expect(err).to.not.exist() - expect(switchA.muxedConns).to.have.property(switchB._peerInfo.id.toB58String()) + expect(switchA.connection.getAllById(switchB._peerInfo.id.toB58String())).to.have.length(1) connFSM.close() }) connFSM.once('close', () => { - expect(switchA.muxedConns).to.not.have.any.keys([ - switchB._peerInfo.id.toB58String() - ]) + expect(switchA.connection.getAllById(switchB._peerInfo.id.toB58String())).to.have.length(0) done() }) }) @@ -121,16 +120,84 @@ describe('dialFSM', () => { switchB.on('peer-mux-established', (peerInfo) => { if (peerInfo.id.toB58String() === peerIdA) { switchB.removeAllListeners('peer-mux-established') - expect(switchB.muxedConns).to.have.property(peerIdA).mark() - switchB.muxedConns[peerIdA].close() + expect(switchB.connection.getAllById(peerIdA)).to.have.length(1).mark() + switchB.connection.getOne(peerIdA).close() } }) const connFSM = switchA.dialFSM(switchB._peerInfo, '/closed/1.0.0', () => { }) connFSM.once('close', () => { - expect(switchA.muxedConns).to.not.have.any.keys([ - switchB._peerInfo.id.toB58String() - ]).mark() + expect(switchA.connection.getAllById(switchB._peerInfo.id.toB58String())).to.have.length(0).mark() + }) + }) + + it('parallel dials to one another should disconnect on hangup', function (done) { + this.timeout(10e3) + + switchA.handle('/parallel/1.0.0', (_, conn) => { pull(conn, conn) }) + switchB.handle('/parallel/1.0.0', (_, conn) => { pull(conn, conn) }) + + // 4 close checks and 1 hangup check + expect(5).checks(() => { + switchA.removeAllListeners('peer-mux-closed') + switchB.removeAllListeners('peer-mux-closed') + done() + }) + + switchA.on('peer-mux-closed', (peerInfo) => { + expect(peerInfo.id.toB58String()).to.eql(switchB._peerInfo.id.toB58String()).mark() + }) + switchB.on('peer-mux-closed', (peerInfo) => { + expect(peerInfo.id.toB58String()).to.eql(switchA._peerInfo.id.toB58String()).mark() + }) + + const conn = switchA.dialFSM(switchB._peerInfo, '/parallel/1.0.0', () => { + // Hangup and verify the connections are closed + switchA.hangUp(switchB._peerInfo, (err) => { + expect(err).to.not.exist().mark() + }) + }) + + // Hold the dial from A, until switch B is done dialing to ensure + // we have both incoming and outgoing connections + conn._state.on('DIALING:enter', (cb) => { + switchB.dialFSM(switchA._peerInfo, '/parallel/1.0.0', () => { + cb() + }) + }) + }) + + it('parallel dials to one another should disconnect on stop', (done) => { + switchA.handle('/parallel/1.0.0', (_, conn) => { pull(conn, conn) }) + switchB.handle('/parallel/1.0.0', (_, conn) => { pull(conn, conn) }) + + // 4 close checks and 1 hangup check + expect(5).checks(() => { + switchA.removeAllListeners('peer-mux-closed') + switchB.removeAllListeners('peer-mux-closed') + done() + }) + + switchA.on('peer-mux-closed', (peerInfo) => { + expect(peerInfo.id.toB58String()).to.eql(switchB._peerInfo.id.toB58String()).mark() + }) + switchB.on('peer-mux-closed', (peerInfo) => { + expect(peerInfo.id.toB58String()).to.eql(switchA._peerInfo.id.toB58String()).mark() + }) + + const conn = switchA.dialFSM(switchB._peerInfo, '/parallel/1.0.0', () => { + // Hangup and verify the connections are closed + switchA.stop((err) => { + expect(err).to.not.exist().mark() + }) + }) + + // Hold the dial from A, until switch B is done dialing to ensure + // we have both incoming and outgoing connections + conn._state.on('DIALING:enter', (cb) => { + switchB.dialFSM(switchA._peerInfo, '/parallel/1.0.0', () => { + cb() + }) }) }) }) diff --git a/test/pnet.node.js b/test/pnet.node.js index 036eb80..0c680a2 100644 --- a/test/pnet.node.js +++ b/test/pnet.node.js @@ -98,7 +98,7 @@ describe('Private Network', function () { switchA.dial(switchB._peerInfo, '/abacaxi/1.0.0', (err, conn) => { expect(err).to.not.exist() - expect(Object.keys(switchA.muxedConns).length).to.equal(1) + expect(switchA.connection.getAll()).to.have.length(1) tryEcho(conn, done) }) }) @@ -107,7 +107,7 @@ describe('Private Network', function () { switchB.dial(switchA._peerInfo, (err) => { expect(err).to.not.exist() expect(Object.keys(switchB.conns).length).to.equal(0) - expect(Object.keys(switchB.muxedConns).length).to.equal(1) + expect(switchB.connection.getAll()).to.have.length(1) done() }) }) @@ -118,7 +118,7 @@ describe('Private Network', function () { switchB.dial(switchA._peerInfo, '/papaia/1.0.0', (err, conn) => { expect(err).to.not.exist() expect(Object.keys(switchB.conns).length).to.equal(0) - expect(Object.keys(switchB.muxedConns).length).to.equal(1) + expect(switchB.connection.getAll()).to.have.length(1) tryEcho(conn, done) }) }) @@ -130,8 +130,8 @@ describe('Private Network', function () { switchC.dial(switchA._peerInfo, (err) => { expect(err).to.not.exist() setTimeout(() => { - expect(Object.keys(switchC.muxedConns).length).to.equal(1) - expect(Object.keys(switchA.muxedConns).length).to.equal(2) + expect(switchC.connection.getAll()).to.have.length(1) + expect(switchA.connection.getAll()).to.have.length(2) done() }, 500) }) diff --git a/test/secio.node.js b/test/secio.node.js index 6a710a8..a60c2c3 100644 --- a/test/secio.node.js +++ b/test/secio.node.js @@ -70,7 +70,7 @@ describe('SECIO', () => { switchA.dial(switchB._peerInfo, '/abacaxi/1.0.0', (err, conn) => { expect(err).to.not.exist() - expect(Object.keys(switchA.muxedConns).length).to.equal(1) + expect(switchA.connection.getAll()).to.have.length(1) tryEcho(conn, done) }) }) @@ -79,7 +79,7 @@ describe('SECIO', () => { switchB.dial(switchA._peerInfo, (err) => { expect(err).to.not.exist() expect(Object.keys(switchB.conns).length).to.equal(0) - expect(Object.keys(switchB.muxedConns).length).to.equal(1) + expect(switchB.connection.getAll()).to.have.length(1) done() }) }) @@ -90,7 +90,7 @@ describe('SECIO', () => { switchB.dial(switchA._peerInfo, '/papaia/1.0.0', (err, conn) => { expect(err).to.not.exist() expect(Object.keys(switchB.conns).length).to.equal(0) - expect(Object.keys(switchB.muxedConns).length).to.equal(1) + expect(switchB.connection.getAll()).to.have.length(1) tryEcho(conn, done) }) }) @@ -102,8 +102,8 @@ describe('SECIO', () => { switchC.dial(switchA._peerInfo, (err) => { expect(err).to.not.exist() setTimeout(() => { - expect(Object.keys(switchC.muxedConns).length).to.equal(1) - expect(Object.keys(switchA.muxedConns).length).to.equal(2) + expect(switchC.connection.getAll()).to.have.length(1) + expect(switchA.connection.getAll()).to.have.length(2) done() }, 500) }) diff --git a/test/stats.node.js b/test/stats.node.js index 3469bea..e284f02 100644 --- a/test/stats.node.js +++ b/test/stats.node.js @@ -50,8 +50,8 @@ describe('Stats', () => { switchB.connection.addStreamMuxer(multiplex) parallel([ - (cb) => switchA.transport.listen('tcp', {}, null, cb), - (cb) => switchB.transport.listen('tcp', {}, null, cb) + (cb) => switchA.start(cb), + (cb) => switchB.start(cb) ], (err) => { if (err) { cb(err) diff --git a/test/stream-muxers.node.js b/test/stream-muxers.node.js index b410e60..c1ff0f1 100644 --- a/test/stream-muxers.node.js +++ b/test/stream-muxers.node.js @@ -72,7 +72,7 @@ describe('Stream Multiplexing', () => { switchA.dial(switchB._peerInfo, '/abacaxi/1.0.0', (err, conn) => { expect(err).to.not.exist() - expect(Object.keys(switchA.muxedConns).length).to.equal(1) + expect(switchA.connection.getAll()).to.have.length(1) tryEcho(conn, done) }) @@ -83,7 +83,7 @@ describe('Stream Multiplexing', () => { expect(err).to.not.exist() expect(Object.keys(switchB.conns).length).to.equal(0) - expect(Object.keys(switchB.muxedConns).length).to.equal(1) + expect(switchB.connection.getAll()).to.have.length(1) done() }) }) @@ -94,7 +94,7 @@ describe('Stream Multiplexing', () => { switchB.dial(switchA._peerInfo, '/papaia/1.0.0', (err, conn) => { expect(err).to.not.exist() expect(Object.keys(switchB.conns).length).to.equal(0) - expect(Object.keys(switchB.muxedConns).length).to.equal(1) + expect(switchB.connection.getAll()).to.have.length(1) tryEcho(conn, done) }) @@ -107,8 +107,8 @@ describe('Stream Multiplexing', () => { switchC.dial(switchA._peerInfo, (err) => { expect(err).to.not.exist() setTimeout(() => { - expect(Object.keys(switchC.muxedConns).length).to.equal(1) - expect(Object.keys(switchA.muxedConns).length).to.equal(2) + expect(switchC.connection.getAll()).to.have.length(1) + expect(switchA.connection.getAll()).to.have.length(2) done() }, 500) }) @@ -127,8 +127,8 @@ describe('Stream Multiplexing', () => { switchC.dial(switchA._peerInfo, '/banana/1.0.0', (err, conn) => { expect(err).to.not.exist() setTimeout(() => { - expect(Object.keys(switchC.muxedConns).length).to.equal(1) - expect(Object.keys(switchA.muxedConns).length).to.equal(2) + expect(switchC.connection.getAll()).to.have.length(1) + expect(switchA.connection.getAll()).to.have.length(2) conn.getPeerInfo((err, pi) => { expect(err).to.not.exist() @@ -144,7 +144,7 @@ describe('Stream Multiplexing', () => { expect(err).to.not.exist() setTimeout(() => { - expect(Object.keys(switchA.muxedConns).length).to.equal(1) + expect(switchA.connection.getAll()).to.have.length(1) done() }, 500) }) diff --git a/test/swarm-muxing+webrtc-star.browser.js b/test/swarm-muxing+webrtc-star.browser.js index d5dd645..6ecf25b 100644 --- a/test/swarm-muxing+webrtc-star.browser.js +++ b/test/swarm-muxing+webrtc-star.browser.js @@ -31,7 +31,7 @@ describe('Switch (webrtc-star)', () => { (cb) => peerId.create((err, id1) => { expect(err).to.not.exist() peer1 = new PeerInfo(id1) - const ma1 = 'ip4/127.0.0.1/tcp/15555/ws/p2p-webrtc-star/ipfs/' + + const ma1 = '/ip4/127.0.0.1/tcp/15555/ws/p2p-webrtc-star/ipfs/' + id1.toB58String() peer1.multiaddrs.add(ma1) cb() @@ -86,10 +86,10 @@ describe('Switch (webrtc-star)', () => { it('dial on proto', (done) => { switch1.dial(peer2, '/echo/1.0.0', (err, conn) => { expect(err).to.not.exist() - expect(Object.keys(switch1.muxedConns).length).to.equal(1) + expect(switch1.connection.getAll()).to.have.length(1) tryEcho(conn, () => { - expect(Object.keys(switch2.muxedConns).length).to.equal(1) + expect(switch2.connection.getAll()).to.have.length(1) done() }) }) @@ -104,12 +104,12 @@ describe('Switch (webrtc-star)', () => { function check () { if (++counter === 4) { - const s1n = Object.keys(switch1.muxedConns).length - const s2n = Object.keys(switch2.muxedConns).length - const s3n = Object.keys(switch3.muxedConns).length - expect(s1n).to.equal(2) - expect(s2n).to.equal(2) - expect(s3n).to.equal(2) + const s1n = switch1.connection.getAll() + const s2n = switch2.connection.getAll() + const s3n = switch3.connection.getAll() + expect(s1n).to.have.length(2) + expect(s2n).to.have.length(2) + expect(s3n).to.have.length(2) switch3.stop(done) } if (counter === 3) { diff --git a/test/swarm-muxing.node.js b/test/swarm-muxing.node.js index 063f3f6..47feaf2 100644 --- a/test/swarm-muxing.node.js +++ b/test/swarm-muxing.node.js @@ -118,7 +118,7 @@ describe('Switch (everything all together)', () => { }), (cb) => switchA.dial(switchB._peerInfo, (err) => { expect(err).to.not.exist() - expect(Object.keys(switchA.muxedConns).length).to.equal(1) + expect(switchA.connection.getAll()).to.have.length(1) cb() }) ], done) @@ -127,7 +127,7 @@ describe('Switch (everything all together)', () => { it('warm up a warmed up, from B to A', (done) => { switchB.dial(switchA._peerInfo, (err) => { expect(err).to.not.exist() - expect(Object.keys(switchA.muxedConns).length).to.equal(1) + expect(switchA.connection.getAll()).to.have.length(1) done() }) }) @@ -137,7 +137,7 @@ describe('Switch (everything all together)', () => { switchA.dial(switchB._peerInfo, '/anona/1.0.0', (err, conn) => { expect(err).to.not.exist() - expect(Object.keys(switchA.muxedConns).length).to.equal(1) + expect(switchA.connection.getAll()).to.have.length(1) tryEcho(conn, done) }) }) @@ -145,7 +145,7 @@ describe('Switch (everything all together)', () => { it('dial from ws to ws no proto', (done) => { switchD.dial(switchE._peerInfo, (err) => { expect(err).to.not.exist() - expect(Object.keys(switchD.muxedConns).length).to.equal(1) + expect(switchD.connection.getAll()).to.have.length(1) done() }) }) @@ -155,10 +155,10 @@ describe('Switch (everything all together)', () => { switchD.dial(switchE._peerInfo, '/abacaxi/1.0.0', (err, conn) => { expect(err).to.not.exist() - expect(Object.keys(switchD.muxedConns).length).to.equal(1) + expect(switchD.connection.getAll()).to.have.length(1) tryEcho(conn, () => setTimeout(() => { - expect(Object.keys(switchE.muxedConns).length).to.equal(1) + expect(switchE.connection.getAll()).to.have.length(1) done() }, 1000)) }) @@ -169,7 +169,7 @@ describe('Switch (everything all together)', () => { const conn = switchA.dial(switchB._peerInfo, '/grapes/1.0.0', (err, conn) => { expect(err).to.not.exist() - expect(Object.keys(switchA.muxedConns).length).to.equal(1) + expect(switchA.connection.getAll()).to.have.length(1) }) tryEcho(conn, done) @@ -202,7 +202,7 @@ describe('Switch (everything all together)', () => { check() }) - expect(Object.keys(switchA.muxedConns).length).to.equal(2) + expect(switchA.connection.getAll()).to.have.length(2) expect(switchC._peerInfo.isConnected).to.exist() expect(switchA._peerInfo.isConnected).to.exist() @@ -215,13 +215,13 @@ describe('Switch (everything all together)', () => { const ready = () => ++count === 3 ? done() : null switchB.once('peer-mux-closed', (peerInfo) => { - expect(Object.keys(switchB.muxedConns).length).to.equal(0) + expect(switchB.connection.getAll()).to.have.length(0) expect(switchB._peerInfo.isConnected()).to.not.exist() ready() }) switchA.once('peer-mux-closed', (peerInfo) => { - expect(Object.keys(switchA.muxedConns).length).to.equal(1) + expect(switchA.connection.getAll()).to.have.length(1) expect(switchA._peerInfo.isConnected()).to.not.exist() ready() }) diff --git a/test/t-webrtc-star.browser.js b/test/t-webrtc-star.browser.js index 825e2fe..3a6d7f3 100644 --- a/test/t-webrtc-star.browser.js +++ b/test/t-webrtc-star.browser.js @@ -24,13 +24,13 @@ describe('transport - webrtc-star', () => { .createFromB58String('QmcgpsyWgH8Y8ajJz1Cu72KnS5uo2Aa2LpzU7kinSooooA') const peer1 = new PeerInfo(id1) - const ma1 = 'ip4/127.0.0.1/tcp/15555/ws/p2p-webrtc-star/ipfs/QmcgpsyWgH8Y8ajJz1Cu72KnS5uo2Aa2LpzU7kinSooooA' + const ma1 = '/ip4/127.0.0.1/tcp/15555/ws/p2p-webrtc-star/ipfs/QmcgpsyWgH8Y8ajJz1Cu72KnS5uo2Aa2LpzU7kinSooooA' peer1.multiaddrs.add(ma1) const id2 = PeerId .createFromB58String('QmcgpsyWgH8Y8ajJz1Cu72KnS5uo2Aa2LpzU7kinSooooB') const peer2 = new PeerInfo(id2) - const ma2 = 'ip4/127.0.0.1/tcp/15555/ws/p2p-webrtc-star/ipfs/QmcgpsyWgH8Y8ajJz1Cu72KnS5uo2Aa2LpzU7kinSooooB' + const ma2 = '/ip4/127.0.0.1/tcp/15555/ws/p2p-webrtc-star/ipfs/QmcgpsyWgH8Y8ajJz1Cu72KnS5uo2Aa2LpzU7kinSooooB' peer2.multiaddrs.add(ma2) switch1 = new Switch(peer1, new PeerBook())