Skip to content
This repository has been archived by the owner on Aug 23, 2019. It is now read-only.

Commit

Permalink
fix: improve connection tracking (#318)
Browse files Browse the repository at this point in the history
* fix: centralize connection events and peer connects

* fix: remove unneeded peerBook put
  • Loading branch information
jacobheun authored Apr 3, 2019
1 parent 4a543cb commit 828e685
Show file tree
Hide file tree
Showing 6 changed files with 40 additions and 40 deletions.
3 changes: 0 additions & 3 deletions src/connection/incoming.js
Original file line number Diff line number Diff line change
Expand Up @@ -68,9 +68,6 @@ class IncomingConnectionFSM extends BaseConnection {
this.emit('muxed', this.conn)
})
this._state.on('DISCONNECTING', () => {
if (this.theirPeerInfo) {
this.theirPeerInfo.disconnect()
}
this._state('done')
})
}
Expand Down
13 changes: 3 additions & 10 deletions src/connection/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -268,11 +268,6 @@ class ConnectionFSM extends BaseConnection {
_onDisconnecting () {
this.log('disconnecting from %s', this.theirB58Id, Boolean(this.muxer))

// Issue disconnects on both Peers
if (this.theirPeerInfo) {
this.theirPeerInfo.disconnect()
}

this.switch.connection.remove(this)

delete this.switch.conns[this.theirB58Id]
Expand All @@ -284,7 +279,6 @@ class ConnectionFSM extends BaseConnection {
tasks.push((cb) => {
this.muxer.end(() => {
delete this.muxer
this.switch.emit('peer-mux-closed', this.theirPeerInfo)
cb()
})
})
Expand Down Expand Up @@ -325,13 +319,13 @@ class ConnectionFSM extends BaseConnection {
return this.close(maybeUnexpectedEnd(err))
}

const conn = observeConnection(null, this.switch.crypto.tag, _conn, this.switch.observer)

this.conn = this.switch.crypto.encrypt(this.ourPeerInfo.id, conn, this.theirPeerInfo.id, (err) => {
const observedConn = observeConnection(null, this.switch.crypto.tag, _conn, this.switch.observer)
const encryptedConn = this.switch.crypto.encrypt(this.ourPeerInfo.id, observedConn, this.theirPeerInfo.id, (err) => {
if (err) {
return this.close(err)
}

this.conn = encryptedConn
this.conn.setPeerInfo(this.theirPeerInfo)
this._state('done')
})
Expand Down Expand Up @@ -392,7 +386,6 @@ class ConnectionFSM extends BaseConnection {
this.switch.protocolMuxer(null)(conn)
})

this.switch.emit('peer-mux-established', this.theirPeerInfo)
this._didUpgrade(null)

// Run identify on the connection
Expand Down
44 changes: 29 additions & 15 deletions src/connection/manager.js
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ class ConnectionManager {
// Only add it if it's not there
if (!this.get(connection)) {
this.connections[connection.theirB58Id].push(connection)
this.switch.emit('peer-mux-established', connection.theirPeerInfo)
}
}

Expand Down Expand Up @@ -78,14 +79,26 @@ class ConnectionManager {
* @returns {void}
*/
remove (connection) {
if (!this.connections[connection.theirB58Id]) return
// No record of the peer, disconnect it
if (!this.connections[connection.theirB58Id]) {
connection.theirPeerInfo.disconnect()
this.switch.emit('peer-mux-closed', connection.theirPeerInfo)
return
}

for (let i = 0; i < this.connections[connection.theirB58Id].length; i++) {
if (this.connections[connection.theirB58Id][i] === connection) {
this.connections[connection.theirB58Id].splice(i, 1)
return
break
}
}

// The peer is fully disconnected
if (this.connections[connection.theirB58Id].length === 0) {
delete this.connections[connection.theirB58Id]
connection.theirPeerInfo.disconnect()
this.switch.emit('peer-mux-closed', connection.theirPeerInfo)
}
}

/**
Expand Down Expand Up @@ -175,6 +188,7 @@ class ConnectionManager {
return log('identify not successful')
}
const b58Str = peerInfo.id.toB58String()
peerInfo = this.switch._peerBook.put(peerInfo)

const connection = new ConnectionFSM({
_switch: this.switch,
Expand All @@ -185,24 +199,24 @@ class ConnectionManager {
})
this.switch.connection.add(connection)

if (peerInfo.multiaddrs.size > 0) {
// with incomming conn and through identify, going to pick one
// of the available multiaddrs from the other peer as the one
// I'm connected to as we really can't be sure at the moment
// TODO add this consideration to the connection abstraction!
peerInfo.connect(peerInfo.multiaddrs.toArray()[0])
} else {
// for the case of websockets in the browser, where peers have
// no addr, use just their IPFS id
peerInfo.connect(`/ipfs/${b58Str}`)
// Only update if it's not already connected
if (!peerInfo.isConnected()) {
if (peerInfo.multiaddrs.size > 0) {
// with incomming conn and through identify, going to pick one
// of the available multiaddrs from the other peer as the one
// I'm connected to as we really can't be sure at the moment
// TODO add this consideration to the connection abstraction!
peerInfo.connect(peerInfo.multiaddrs.toArray()[0])
} else {
// for the case of websockets in the browser, where peers have
// no addr, use just their IPFS id
peerInfo.connect(`/ipfs/${b58Str}`)
}
}
peerInfo = this.switch._peerBook.put(peerInfo)

muxedConn.once('close', () => {
connection.close()
})

this.switch.emit('peer-mux-established', peerInfo)
})
})
}
Expand Down
11 changes: 4 additions & 7 deletions src/dialer/queue.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

const ConnectionFSM = require('../connection')
const { DIAL_ABORTED, ERR_BLACKLISTED } = require('../errors')
const Connection = require('interface-connection').Connection
const nextTick = require('async/nextTick')
const once = require('once')
const debug = require('debug')
Expand Down Expand Up @@ -45,10 +44,8 @@ function createConnectionWithProtocol ({ protocol, connection, callback }) {
return callback(err)
}

const proxyConnection = new Connection()
proxyConnection.setPeerInfo(connection.theirPeerInfo)
proxyConnection.setInnerConn(conn)
callback(null, proxyConnection)
conn.setPeerInfo(connection.theirPeerInfo)
callback(null, conn)
})
}

Expand Down Expand Up @@ -192,6 +189,8 @@ class Queue {
conn: null
})

this.switch.connection.add(connectionFSM)

// Add control events and start the dialer
connectionFSM.once('connected', () => connectionFSM.protect())
connectionFSM.once('private', () => connectionFSM.encrypt())
Expand Down Expand Up @@ -252,15 +251,13 @@ class Queue {
// If we're not muxed yet, add listeners
connectionFSM.once('muxed', () => {
this.blackListCount = 0 // reset blacklisting on good connections
this.switch.connection.add(connectionFSM)
queuedDial.connection = connectionFSM
createConnectionWithProtocol(queuedDial)
next()
})

connectionFSM.once('unmuxed', () => {
this.blackListCount = 0
this.switch.connection.add(connectionFSM)
queuedDial.connection = connectionFSM
createConnectionWithProtocol(queuedDial)
next()
Expand Down
1 change: 0 additions & 1 deletion src/transport.js
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,6 @@ class TransportManager {
}

peerInfo.connect(success.multiaddr)
this.switch._peerBook.put(peerInfo)
callback(null, success.conn)
})
}
Expand Down
8 changes: 4 additions & 4 deletions test/dial-fsm.node.js
Original file line number Diff line number Diff line change
Expand Up @@ -240,8 +240,8 @@ describe('dialFSM', () => {

// Expect 4 `peer-mux-established` events
expect(4).checks(() => {
// Expect 4 `peer-mux-closed`, plus 1 hangup
expect(5).checks(() => {
// Expect 2 `peer-mux-closed`, plus 1 hangup
expect(3).checks(() => {
switchA.removeAllListeners('peer-mux-closed')
switchB.removeAllListeners('peer-mux-closed')
switchA.removeAllListeners('peer-mux-established')
Expand Down Expand Up @@ -286,8 +286,8 @@ describe('dialFSM', () => {
switchA.handle(protocol, (_, conn) => { pull(conn, conn) })
switchB.handle(protocol, (_, conn) => { pull(conn, conn) })

// 4 close checks and 1 hangup check
expect(5).checks(() => {
// 2 close checks and 1 hangup check
expect(2).checks(() => {
switchA.removeAllListeners('peer-mux-closed')
switchB.removeAllListeners('peer-mux-closed')
// restart the node for subsequent tests
Expand Down

0 comments on commit 828e685

Please sign in to comment.