Skip to content
This repository has been archived by the owner on Aug 23, 2019. It is now read-only.

improve connection close management #291

Merged
merged 16 commits into from
Dec 14, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 11 additions & 11 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -40,20 +40,20 @@
"npm": ">=3.0.0"
},
"devDependencies": {
"aegir": "^17.0.1",
"aegir": "^17.1.1",
"chai": "^4.2.0",
"chai-checkmark": "^1.0.1",
"dirty-chai": "^2.0.1",
"libp2p-mplex": "~0.8.4",
"libp2p-pnet": "~0.1.0",
"libp2p-secio": "~0.10.1",
"libp2p-spdy": "~0.13.0",
"libp2p-spdy": "~0.13.1",
"libp2p-tcp": "~0.13.0",
"libp2p-webrtc-star": "~0.15.5",
"libp2p-webrtc-star": "~0.15.6",
"libp2p-websockets": "~0.12.0",
"peer-book": "~0.8.0",
"portfinder": "^1.0.19",
"sinon": "^7.1.1",
"peer-book": "~0.9.0",
"portfinder": "^1.0.20",
"sinon": "^7.2.0",
"webrtcsupport": "^2.2.0"
},
"dependencies": {
Expand All @@ -63,18 +63,18 @@
"debug": "^4.1.0",
"err-code": "^1.1.2",
"fsm-event": "^2.1.0",
"hashlru": "^2.2.1",
"interface-connection": "~0.3.2",
"hashlru": "^2.3.0",
"interface-connection": "~0.3.3",
"ip-address": "^5.8.9",
"libp2p-circuit": "~0.3.0",
"libp2p-circuit": "~0.3.1",
"libp2p-identify": "~0.7.2",
"lodash.includes": "^4.3.0",
"moving-average": "^1.0.0",
"multiaddr": "^5.0.2",
"multiaddr": "^6.0.0",
"multistream-select": "~0.14.3",
"once": "^1.4.0",
"peer-id": "~0.12.0",
"peer-info": "~0.14.1",
"peer-info": "~0.15.0",
"pull-stream": "^3.6.9",
"retimer": "^2.0.0"
},
Expand Down
4 changes: 3 additions & 1 deletion src/connection/incoming.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@ class IncomingConnectionFSM extends BaseConnection {
this.msListener = new multistream.Listener()

this._state = FSM('DIALED', {
DISCONNECTED: { },
DISCONNECTED: {
disconnect: 'DISCONNECTED'
},
DIALED: { // Base connection to peer established
privatize: 'PRIVATIZING',
encrypt: 'ENCRYPTING'
Expand Down
34 changes: 22 additions & 12 deletions src/connection/index.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
'use strict'

const FSM = require('fsm-event')
const setImmediate = require('async/setImmediate')
const Circuit = require('libp2p-circuit')
const multistream = require('multistream-select')
const withIs = require('class-is')
Expand All @@ -15,6 +14,8 @@ const Errors = require('../errors')
* @property {Switch} _switch Our switch instance
* @property {PeerInfo} peerInfo The PeerInfo of the peer to dial
* @property {Muxer} muxer Optional - A muxed connection
* @property {Connection} conn Optional - The base connection
* @property {string} type Optional - identify the connection as incoming or outgoing. Defaults to out.
*/

/**
Expand All @@ -29,16 +30,16 @@ class ConnectionFSM extends BaseConnection {
* @param {ConnectionOptions} param0
* @constructor
*/
constructor ({ _switch, peerInfo, muxer }) {
constructor ({ _switch, peerInfo, muxer, conn, type = 'out' }) {
super({
_switch,
name: `out:${_switch._peerInfo.id.toB58String().slice(0, 8)}`
name: `${type}:${_switch._peerInfo.id.toB58String().slice(0, 8)}`
})

this.theirPeerInfo = peerInfo
this.theirB58Id = this.theirPeerInfo.id.toB58String()

this.conn = null // The base connection
this.conn = conn // The base connection
this.muxer = muxer // The upgraded/muxed connection

let startState = 'DISCONNECTED'
Expand Down Expand Up @@ -114,6 +115,7 @@ class ConnectionFSM extends BaseConnection {
this._state.on('UPGRADING', () => this._onUpgrading())
this._state.on('MUXED', () => {
this.log(`successfully muxed connection to ${this.theirB58Id}`)
delete this.switch.conns[this.theirB58Id]
this.emit('muxed', this.muxer)
})
this._state.on('CONNECTED', () => {
Expand Down Expand Up @@ -166,7 +168,6 @@ class ConnectionFSM extends BaseConnection {
})
}

this.conn.setPeerInfo(this.theirPeerInfo)
this._protocolHandshake(protocol, this.conn, callback)
}

Expand Down Expand Up @@ -266,14 +267,22 @@ class ConnectionFSM extends BaseConnection {
this.muxer.end()
}

delete this.switch.muxedConns[this.theirB58Id]
this.switch.connection.remove(this)

delete this.switch.conns[this.theirB58Id]
delete this.muxer
delete this.conn

this._state('done')

setImmediate(() => this.switch.emit('peer-mux-closed', this.theirPeerInfo))
// If we have the base connection, abort it
if (this.conn) {
this.conn.source(true, () => {
this._state('done')
this.switch.emit('peer-mux-closed', this.theirPeerInfo)
delete this.conn
})
} else {
this._state('done')
this.switch.emit('peer-mux-closed', this.theirPeerInfo)
}
}

/**
Expand Down Expand Up @@ -352,7 +361,8 @@ class ConnectionFSM extends BaseConnection {
const conn = observeConnection(null, key, _conn, this.switch.observer)

this.muxer = this.switch.muxers[key].dialer(conn)
this.switch.muxedConns[this.theirB58Id] = this
// this.switch.muxedConns[this.theirB58Id] = this
this.switch.connection.add(this)

this.muxer.once('close', () => {
this.close()
Expand All @@ -365,7 +375,7 @@ class ConnectionFSM extends BaseConnection {
this.switch.protocolMuxer(null)(conn)
})

setImmediate(() => this.switch.emit('peer-mux-established', this.theirPeerInfo))
this.switch.emit('peer-mux-established', this.theirPeerInfo)

this._didUpgrade(null)
})
Expand Down
105 changes: 93 additions & 12 deletions src/connection/manager.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ const waterfall = require('async/waterfall')
const debug = require('debug')
const log = debug('libp2p:switch:conn-manager')
const once = require('once')
const setImmediate = require('async/setImmediate')
const ConnectionFSM = require('../connection')

const Circuit = require('libp2p-circuit')
Expand All @@ -20,6 +19,92 @@ const plaintext = require('../plaintext')
class ConnectionManager {
constructor (_switch) {
this.switch = _switch
this.connections = {}
}

/**
* Adds the connection for tracking if it's not already added
* @private
* @param {ConnectionFSM} connection
* @returns {void}
*/
add (connection) {
this.connections[connection.theirB58Id] = this.connections[connection.theirB58Id] || []
// Only add it if it's not there
if (!this.get(connection)) {
this.connections[connection.theirB58Id].push(connection)
}
}

/**
* Gets the connection from the list if it exists
* @private
* @param {ConnectionFSM} connection
* @returns {ConnectionFSM|null} The found connection or null
*/
get (connection) {
if (!this.connections[connection.theirB58Id]) return null

for (let i = 0; i < this.connections[connection.theirB58Id].length; i++) {
if (this.connections[connection.theirB58Id][i] === connection) {
return this.connections[connection.theirB58Id][i]
}
}
return null
}

/**
* Gets a connection associated with the given peer
* @private
* @param {string} peerId The peers id
* @returns {ConnectionFSM|null} The found connection or null
*/
getOne (peerId) {
if (this.connections[peerId]) {
// TODO: Maybe select the best?
return this.connections[peerId][0]
}
return null
}

/**
* Removes the connection from tracking
* @private
* @param {ConnectionFSM} connection The connection to remove
* @returns {void}
*/
remove (connection) {
if (!this.connections[connection.theirB58Id]) return

for (let i = 0; i < this.connections[connection.theirB58Id].length; i++) {
if (this.connections[connection.theirB58Id][i] === connection) {
this.connections[connection.theirB58Id].splice(i, 1)
return
}
}
}

/**
* Returns all connections being tracked
* @private
* @returns {ConnectionFSM[]}
*/
getAll () {
let connections = []
for (const conns of Object.values(this.connections)) {
connections = [...connections, ...conns]
}
return connections
}

/**
* Returns all connections being tracked for a given peer id
* @private
* @param {string} peerId Stringified peer id
* @returns {ConnectionFSM[]}
*/
getAllById (peerId) {
return this.connections[peerId] || []
}

/**
Expand Down Expand Up @@ -70,9 +155,6 @@ class ConnectionManager {
], (err, peerInfo) => {
if (err) {
return muxedConn.end(() => {
if (peerInfo) {
setImmediate(() => this.switch.emit('peer-mux-closed', peerInfo))
}
callback(err, null)
})
}
Expand All @@ -91,11 +173,14 @@ class ConnectionManager {
}
const b58Str = peerInfo.id.toB58String()

this.switch.muxedConns[b58Str] = new ConnectionFSM({
const connection = new ConnectionFSM({
_switch: this.switch,
peerInfo,
muxer: muxedConn
muxer: muxedConn,
conn: conn,
type: 'inc'
})
this.switch.connection.add(connection)

if (peerInfo.multiaddrs.size > 0) {
// with incomming conn and through identify, going to pick one
Expand All @@ -111,14 +196,10 @@ class ConnectionManager {
peerInfo = this.switch._peerBook.put(peerInfo)

muxedConn.once('close', () => {
delete this.switch.muxedConns[b58Str]
peerInfo.disconnect()
peerInfo = this.switch._peerBook.put(peerInfo)
log(`closed connection to ${b58Str}`)
setImmediate(() => this.switch.emit('peer-mux-closed', peerInfo))
connection.close()
})

setImmediate(() => this.switch.emit('peer-mux-established', peerInfo))
this.switch.emit('peer-mux-established', peerInfo)
})
})
}
Expand Down
5 changes: 3 additions & 2 deletions src/dialer.js
Original file line number Diff line number Diff line change
Expand Up @@ -54,13 +54,14 @@ function dial (_switch, returnFSM) {

log(`dialing to ${b58Id.slice(0, 8)} with protocol ${protocol || 'unknown'}`)

let connection = _switch.muxedConns[b58Id] || _switch.conns[b58Id]
let connection = _switch.connection.getOne(b58Id)

if (!ConnectionFSM.isConnectionFSM(connection)) {
connection = new ConnectionFSM({
_switch,
peerInfo,
muxer: _switch.muxedConns[b58Id] || null
muxer: null,
conn: null
})
connection.once('error', (err) => callback(err))
connection.once('connected', () => connection.protect())
Expand Down
Loading