Skip to content

Commit

Permalink
chore: refactor connection manager and registrar
Browse files Browse the repository at this point in the history
  • Loading branch information
vasco-santos committed Apr 27, 2020
1 parent 9e0097c commit 49bfb49
Show file tree
Hide file tree
Showing 18 changed files with 424 additions and 411 deletions.
48 changes: 43 additions & 5 deletions doc/API.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,12 @@
* [`handle`](#handle)
* [`unhandle`](#unhandle)
* [`ping`](#ping)
* [`peerRouting.findPeer`](#peerroutingfindpeer)
* [`contentRouting.findProviders`](#contentroutingfindproviders)
* [`contentRouting.provide`](#contentroutingprovide)
* [`contentRouting.put`](#contentroutingput)
* [`contentRouting.get`](#contentroutingget)
* [`contentRouting.getMany`](#contentroutinggetmany)
* [`peerRouting.findPeer`](#peerroutingfindpeer)
* [`peerStore.addressBook.add`](#peerstoreaddressbookadd)
* [`peerStore.addressBook.delete`](#peerstoreaddressbookdelete)
* [`peerStore.addressBook.get`](#peerstoreaddressbookget)
Expand All @@ -34,14 +34,17 @@
* [`pubsub.publish`](#pubsubpublish)
* [`pubsub.subscribe`](#pubsubsubscribe)
* [`pubsub.unsubscribe`](#pubsubunsubscribe)
* [`connectionManager.get`](#connectionmanagerget)
* [`connectionManager.setPeerValue`](#connectionmanagersetpeervalue)
* [`connectionManager.size`](#connectionmanagersize)
* [`metrics.global`](#metricsglobal)
* [`metrics.peers`](#metricspeers)
* [`metrics.protocols`](#metricsprotocols)
* [`metrics.forPeer`](#metricsforpeer)
* [`metrics.forProtocol`](#metricsforprotocol)
* [Events](#events)
* [`libp2p`](#libp2p)
* [`libp2p.connectionManager`](#libp2pconnectionmanager)
* [`libp2p.peerStore`](#libp2ppeerStore)
* [Types](#types)
* [`Stats`](#stats)
Expand Down Expand Up @@ -999,6 +1002,28 @@ const handler = (msg) => {
libp2p.pubsub.unsubscribe(topic, handler)
```

### connectionManager.get

Get a connection with a given peer, if it exists.

#### Parameters

| Name | Type | Description |
|------|------|-------------|
| peerId | [`PeerId`][peer-id] | The peer to find |

#### Returns

| Type | Description |
|------|-------------|
| [`Connection`][connection] | Connection with the given peer |

#### Example

```js
libp2p.connectionManager.get(peerId)
```

### connectionManager.setPeerValue

Enables users to change the value of certain peers in a range of 0 to 1. Peers with the lowest values will have their Connections pruned first, if any Connection Manager limits are exceeded. See [./CONFIGURATION.md#configuring-connection-manager](./CONFIGURATION.md#configuring-connection-manager) for details on how to configure these limits.
Expand All @@ -1025,6 +1050,17 @@ libp2p.connectionManager.setPeerValue(highPriorityPeerId, 1)
libp2p.connectionManager.setPeerValue(lowPriorityPeerId, 0)
```

### connectionManager.size

Getter for obtaining the current number of open connections.

#### Example

```js
libp2p.connectionManager.size
// 10
```

### metrics.global

A [`Stats`](#stats) object of tracking the global bandwidth of the libp2p node.
Expand Down Expand Up @@ -1126,21 +1162,23 @@ unless they are performing a specific action. See [peer discovery and auto dial]

- `peer`: instance of [`PeerId`][peer-id]

### libp2p.connectionManager

#### A new connection to a peer has been opened

This event will be triggered anytime a new Connection is established to another peer.

`libp2p.on('peer:connect', (peer) => {})`
`libp2p.on('peer:connect', (connection) => {})`

- `peer`: instance of [`PeerId`][peer-id]
- `connection`: instance of [`Connection`][connection]

#### An existing connection to a peer has been closed

This event will be triggered anytime we are disconnected from another peer, regardless of the circumstances of that disconnection. If we happen to have multiple connections to a peer, this event will **only** be triggered when the last connection is closed.

`libp2p.on('peer:disconnect', (peer) => {})`
`libp2p.on('peer:disconnect', (connection) => {})`

- `peer`: instance of [`PeerId`][peer-id]
- `connection`: instance of [`Connection`][connection]

### libp2p.peerStore

Expand Down
2 changes: 1 addition & 1 deletion src/circuit/circuit/hop.js
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ module.exports.handleHop = async function handleHop ({
// Get the connection to the destination (stop) peer
const destinationPeer = new PeerId(request.dstPeer.id)

const destinationConnection = circuit._registrar.getConnection(destinationPeer)
const destinationConnection = circuit._connectionManager.get(destinationPeer)
if (!destinationConnection && !circuit._options.hop.active) {
log('HOP request received but we are not connected to the destination peer')
return streamHandler.end({
Expand Down
3 changes: 2 additions & 1 deletion src/circuit/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ class Circuit {
constructor ({ libp2p, upgrader }) {
this._dialer = libp2p.dialer
this._registrar = libp2p.registrar
this._connectionManager = libp2p.connectionManager
this._upgrader = upgrader
this._options = libp2p._config.relay
this.addresses = libp2p.addresses
Expand Down Expand Up @@ -107,7 +108,7 @@ class Circuit {
const destinationPeer = PeerId.createFromCID(destinationAddr.getPeerId())

let disconnectOnFailure = false
let relayConnection = this._registrar.getConnection(relayPeer)
let relayConnection = this._connectionManager.get(relayPeer)
if (!relayConnection) {
relayConnection = await this._dialer.connectToPeer(relayAddr, options)
disconnectOnFailure = true
Expand Down
122 changes: 107 additions & 15 deletions src/connection-manager/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,11 @@ const LatencyMonitor = require('latency-monitor').default
const debug = require('debug')('libp2p:connection-manager')
const retimer = require('retimer')

const { EventEmitter } = require('events')

const PeerId = require('peer-id')
const { Connection } = require('libp2p-interfaces/src/connection')

const {
ERR_INVALID_PARAMETERS
} = require('../errors')
Expand All @@ -22,7 +27,12 @@ const defaultOptions = {
defaultPeerValue: 1
}

class ConnectionManager {
/**
* Responsible for managing known connections.
* @fires ConnectionManager#peer:connect Emitted when a new peer is connected.
* @fires ConnectionManager#peer:disconnect Emitted when a peer is disconnected.
*/
class ConnectionManager extends EventEmitter {
/**
* @constructor
* @param {Libp2p} libp2p
Expand All @@ -38,30 +48,50 @@ class ConnectionManager {
* @param {Number} options.defaultPeerValue The value of the peer. Default=1
*/
constructor (libp2p, options) {
super()

this._libp2p = libp2p
this._registrar = libp2p.registrar
this._peerId = libp2p.peerId.toB58String()

this._options = mergeOptions.call({ ignoreUndefined: true }, defaultOptions, options)
if (this._options.maxConnections < this._options.minConnections) {
throw errcode(new Error('Connection Manager maxConnections must be greater than minConnections'), ERR_INVALID_PARAMETERS)
}

debug('options: %j', this._options)

this._metrics = libp2p.metrics
this._libp2p = libp2p

/**
* Map of peer identifiers to their peer value for pruning connections.
* @type {Map<string, number>}
*/
this._peerValues = new Map()
this._connections = new Map()

/**
* Map of connections per peer
* @type {Map<string, Array<conn>>}
*/
this.connections = new Map()

this._timer = null
this._checkMetrics = this._checkMetrics.bind(this)
}

/**
* Get current number of open connections.
*/
get size () {
return Array.from(this.connections.values())
.reduce((accumulator, value) => accumulator + value.length, 0)
}

/**
* Starts the Connection Manager. If Metrics are not enabled on libp2p
* only event loop and connection limits will be monitored.
*/
start () {
if (this._metrics) {
if (this._libp2p.metrics) {
this._timer = this._timer || retimer(this._checkMetrics, this._options.pollInterval)
}

Expand All @@ -77,13 +107,33 @@ class ConnectionManager {

/**
* Stops the Connection Manager
* @async
*/
stop () {
async stop () {
this._timer && this._timer.clear()
this._latencyMonitor && this._latencyMonitor.removeListener('data', this._onLatencyMeasure)

await this._close()
debug('stopped')
}

/**
* Cleans up the connections
* @async
*/
async _close () {
// Close all connections we're tracking
const tasks = []
for (const connectionList of this.connections.values()) {
for (const connection of connectionList) {
tasks.push(connection.close())
}
}

await tasks
this.connections.clear()
}

/**
* Sets the value of the given peer. Peers with lower values
* will be disconnected first.
Expand All @@ -106,7 +156,7 @@ class ConnectionManager {
* @private
*/
_checkMetrics () {
const movingAverages = this._metrics.global.movingAverages
const movingAverages = this._libp2p.metrics.global.movingAverages
const received = movingAverages.dataReceived[this._options.movingAverageInterval].movingAverage()
this._checkLimit('maxReceivedData', received)
const sent = movingAverages.dataSent[this._options.movingAverageInterval].movingAverage()
Expand All @@ -122,21 +172,63 @@ class ConnectionManager {
* @param {Connection} connection
*/
onConnect (connection) {
if (!Connection.isConnection(connection)) {
throw errcode(new Error('conn must be an instance of interface-connection'), ERR_INVALID_PARAMETERS)
}

const peerId = connection.remotePeer.toB58String()
this._connections.set(connection.id, connection)
const storedConn = this.connections.get(peerId)

if (storedConn) {
storedConn.push(connection)
} else {
this.connections.set(peerId, [connection])
this.emit('peer:connect', connection)
}

if (!this._peerValues.has(peerId)) {
this._peerValues.set(peerId, this._options.defaultPeerValue)
}
this._checkLimit('maxConnections', this._connections.size)

this._checkLimit('maxConnections', this.size)
}

/**
* Removes the connection from tracking
* @param {Connection} connection
*/
onDisconnect (connection) {
this._connections.delete(connection.id)
this._peerValues.delete(connection.remotePeer.toB58String())
const peerId = connection.remotePeer.toB58String()
let storedConn = this.connections.get(peerId)

if (storedConn && storedConn.length > 1) {
storedConn = storedConn.filter((conn) => conn.id !== connection.id)
this.connections.set(peerId, storedConn)
} else if (storedConn) {
this.connections.delete(peerId)
this._peerValues.delete(connection.remotePeer.toB58String())
this.emit('peer:disconnect', connection)
}
}

/**
* Get a connection with a peer.
* @param {PeerId} peerId
* @returns {Connection}
*/
get (peerId) {
if (!PeerId.isPeerId(peerId)) {
throw errcode(new Error('peerId must be an instance of peer-id'), ERR_INVALID_PARAMETERS)
}

const id = peerId.toB58String()
const connections = this.connections.get(id)

// Return the first, open connection
if (connections) {
return connections.find(connection => connection.stat.status === 'open')
}
return null
}

/**
Expand Down Expand Up @@ -169,17 +261,17 @@ class ConnectionManager {
* @private
*/
_maybeDisconnectOne () {
if (this._options.minConnections < this._connections.size) {
if (this._options.minConnections < this.connections.size) {
const peerValues = Array.from(this._peerValues).sort(byPeerValue)
debug('%s: sorted peer values: %j', this._peerId, peerValues)
const disconnectPeer = peerValues[0]
if (disconnectPeer) {
const peerId = disconnectPeer[0]
debug('%s: lowest value peer is %s', this._peerId, peerId)
debug('%s: closing a connection to %j', this._peerId, peerId)
for (const connection of this._connections.values()) {
if (connection.remotePeer.toB58String() === peerId) {
connection.close()
for (const connections of this.connections.values()) {
if (connections[0].remotePeer.toB58String() === peerId) {
connections[0].close()
break
}
}
Expand Down
Loading

0 comments on commit 49bfb49

Please sign in to comment.