Skip to content
This repository has been archived by the owner on Aug 23, 2019. It is now read-only.

Make switch a state machine #278

Merged
merged 27 commits into from
Oct 19, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
c47ab13
feat: add basic state machine functionality to switch
jacobheun Sep 5, 2018
a97c386
fix: linting
jacobheun Sep 5, 2018
b4d1602
refactor: move connection.js to connection-manager.js
jacobheun Sep 5, 2018
41e4a88
feat: add outgoing connection state machine
jacobheun Sep 11, 2018
5848e6d
feat: functioning incoming connection fsm
jacobheun Sep 25, 2018
45ae65b
fix: linting
jacobheun Sep 25, 2018
78203fd
fix: stats
jacobheun Sep 26, 2018
0b9f5f1
docs: remove notes
jacobheun Sep 26, 2018
ef92ee2
test: bump circuit shutdown timeout
jacobheun Sep 26, 2018
4b62917
fix: node 8 support
jacobheun Sep 26, 2018
7ab2151
feat: add class-is support for connections
jacobheun Sep 26, 2018
674d55c
refactor: clean up some logic and make inc muxed conns FSMs
jacobheun Sep 27, 2018
f4a1806
fix: cleanup todos, logic and event handlers
jacobheun Sep 27, 2018
dd12ad1
refactor: clean up logs
jacobheun Oct 2, 2018
56ea400
feat: add dialFSM to the switch
jacobheun Oct 3, 2018
2e1f7b5
refactor: rename test file
jacobheun Oct 3, 2018
75f1370
feat: add better support for closing connections
jacobheun Oct 3, 2018
0ee8157
test: add tests for some uncovered lines
jacobheun Oct 3, 2018
d921c2d
refactor: do some cleanup
jacobheun Oct 4, 2018
6351365
feat: add additional fsm user support
jacobheun Oct 5, 2018
3d469d0
feat: add warning emitter for muxer upgrade failed
jacobheun Oct 5, 2018
662a58f
refactor: cleanup and add some tests
jacobheun Oct 5, 2018
730c2b6
test: add test for failed muxer upgrade
jacobheun Oct 5, 2018
6dd8f39
test: add more error state tests for connectionfsm
jacobheun Oct 5, 2018
a2e995e
docs: update readme
jacobheun Oct 17, 2018
ef85443
docs: fix readme link
jacobheun Oct 17, 2018
8ff0375
docs: clean up readme and jsdocs
jacobheun Oct 19, 2018
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
142 changes: 87 additions & 55 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,17 @@ libp2p-switch is used by [libp2p](https://github.com/libp2p/js-libp2p) but it ca
- [Usage](#usage)
- [Create a libp2p switch](#create-a-libp2p-switch)
- [API](#api)
- [`switch.dial(peer, protocol, callback)`](#swarmdialpi-protocol-callback)
- [`switch.hangUp(peer, callback)`](#swarmhanguppi-callback)
- [`switch.handle(protocol, handler)`](#swarmhandleprotocol-handler)
- [`switch.unhandle(protocol)`](#swarmunhandleprotocol)
- [`switch.start(callback)`](#swarmlistencallback)
- [`switch.stop(callback)`](#swarmclosecallback)
- [`switch.connection`](#connection)
- [`switch.connection`](#switchconnection)
- [`switch.dial(peer, protocol, callback)`](#switchdialpeer-protocol-callback)
- [`switch.dialFSM(peer, protocol, callback)`](#switchdialfsmpeer-protocol-callback)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

better just mimic js-libp2p and do dial and dialProtocol

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@diasdavid are you suggesting to add dialProtocol in addition to the new dialFSM or as a naming replacement? I think having dial handle calls with and without a protocol is reasonable overloading. Replacing it wouldn't be good as it has a different callback footprint to give users access to the ConnectionFSM for finer control/handling.

I've exposed the same dialFSM method in the libp2p branch

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't need to block this PR for this. What I mean is that given that libp2p (the module) is just an extension to the libp2p-switch (the dialing machine) that adds it other features, it would be cool if they both expose the same dialing/hangup APIs.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That makes sense. Ideally I'd like to deprecate dialProtocol in libp2p and just use dial. Having both feel unnecessary. I'll propose the change in an issue there and if dialProtocol ends up staying we can add that in here.

- [`switch.handle(protocol, handlerFunc, matchFunc)`](#switchhandleprotocol-handlerfunc-matchfunc)
- [`switch.hangUp(peer, callback)`](#switchhanguppeer-callback)
- [`switch.start(callback)`](#switchstartcallback)
- [`switch.stop(callback)`](#switchstopcallback)
- [`switch.stats`](#stats-api)
- [Internal Transports API](#transports)
- [Design Notes](#designnotes)
- [`switch.unhandle(protocol)`](#switchunhandleprotocol)
- [Internal Transports API](#internal-transports-api)
- [Design Notes](#design-notes)
- [Multitransport](#multitransport)
- [Connection upgrades](#connection-upgrades)
- [Identify](#identify)
Expand Down Expand Up @@ -86,6 +87,46 @@ tests]([./test/pnet.node.js]).
- peerInfo is a [PeerInfo](https://github.com/libp2p/js-peer-info) object that has the peer information.
- peerBook is a [PeerBook](https://github.com/libp2p/js-peer-book) object that stores all the known peers.

### `switch.connection`

##### `switch.connection.addUpgrade()`

A connection upgrade must be able to receive and return something that implements the [interface-connection](https://github.com/libp2p/interface-connection) specification.

> **WIP**

##### `switch.connection.addStreamMuxer(muxer)`

Upgrading a connection to use a stream muxer is still considered an upgrade, but a special case since once this connection is applied, the returned obj will implement the [interface-stream-muxer](https://github.com/libp2p/interface-stream-muxer) spec.

- `muxer`

##### `switch.connection.reuse()`

Enable the identify protocol.

##### `switch.connection.crypto([tag, encrypt])`

Enable a specified crypto protocol. By default no encryption is used, aka `plaintext`. If called with no arguments it resets to use `plaintext`.

You can use for example [libp2p-secio](https://github.com/libp2p/js-libp2p-secio) like this

```js
const secio = require('libp2p-secio')
switch.connection.crypto(secio.tag, secio.encrypt)
```

##### `switch.connection.enableCircuitRelay(options, callback)`

Enable circuit relaying.

- `options`
- enabled - activates relay dialing and listening functionality
- hop - an object with two properties
- enabled - enables circuit relaying
- active - is it an active or passive relay (default false)
- `callback`

### `switch.dial(peer, protocol, callback)`

dial uses the best transport (whatever works first, in the future we can have some criteria), and jump starts the connection until the point where we have to negotiate the protocol. If a muxer is available, then drop the muxer onto that connection. Good to warm up connections or to check for connectivity. If we have already a muxer for that peerInfo, then do nothing.
Expand All @@ -94,13 +135,24 @@ dial uses the best transport (whatever works first, in the future we can have so
- `protocol`
- `callback`

### `switch.hangUp(peer, callback)`
### `switch.dialFSM(peer, protocol, callback)`

Hang up the muxed connection we have with the peer.
works like dial, but calls back with a [Connection State Machine](#connection-state-machine)

- `peer`: can be an instance of [PeerInfo][], [PeerId][] or [multiaddr][]
- `callback`
- `protocol`: String that defines the protocol (e.g '/ipfs/bitswap/1.1.0') to be used
- `callback`: Function with signature `function (err, connFSM) {}` where `connFSM` is a [Connection State Machine](#connection-state-machine)

#### Connection State Machine
Connection state machines emit a number of events that can be used to determine the current state of the connection
and to received the underlying connection that can be used to transfer data.

##### Events
- `error`: emitted whenever a fatal error occurs with the connection; the error will be emitted.
- `error:upgrade_failed`: emitted whenever the connection fails to upgrade with a muxer, this is not fatal.
- `error:connection_attempt_failed`: emitted whenever a dial attempt fails for a given transport. An array of errors is emitted.
- `connection`: emitted whenever a useable connection has been established; the underlying [Connection](https://github.com/libp2p/interface-connection) will be emitted.
- `close`: emitted when the connection has closed.

### `switch.handle(protocol, handlerFunc, matchFunc)`

Expand All @@ -110,68 +162,43 @@ Handle a new protocol.
- `handlerFunc` - function called when we receive a dial on `protocol. Signature must be `function (protocol, conn) {}`
- `matchFunc` - matchFunc for multistream-select

### `switch.unhandle(protocol)`

Unhandle a protocol.

- `protocol`

### `switch.on('peer-mux-established', (peer) => {})`

- `peer`: is instance of [PeerInfo][] that has info of the peer we have just established a muxed connection with.

### `switch.on('peer-mux-closed', (peer) => {})`

- `peer`: is instance of [PeerInfo][] that has info of the peer we have just closed a muxed connection.

### `switch.start(callback)`

Start listening on all added transports that are available on the current `peerInfo`.

### `switch.stop(callback)`
### `switch.hangUp(peer, callback)`

Close all the listeners and muxers.
Hang up the muxed connection we have with the peer.

- `peer`: can be an instance of [PeerInfo][], [PeerId][] or [multiaddr][]
- `callback`

### `switch.connection`
### `switch.on('error', (err) => {})`

##### `switch.connection.addUpgrade()`
Emitted when the switch encounters an error.

A connection upgrade must be able to receive and return something that implements the [interface-connection](https://github.com/libp2p/interface-connection) specification.
- `err`: instance of [Error][]

> **WIP**
### `switch.on('peer-mux-established', (peer) => {})`

##### `switch.connection.addStreamMuxer(muxer)`
- `peer`: is instance of [PeerInfo][] that has info of the peer we have just established a muxed connection with.

Upgrading a connection to use a stream muxer is still considered an upgrade, but a special case since once this connection is applied, the returned obj will implement the [interface-stream-muxer](https://github.com/libp2p/interface-stream-muxer) spec.
### `switch.on('peer-mux-closed', (peer) => {})`

- `muxer`
- `peer`: is instance of [PeerInfo][] that has info of the peer we have just closed a muxed connection.

##### `switch.connection.reuse()`
### `switch.on('start', () => {})`

Enable the identify protocol.
Emitted when the switch has successfully started.

##### `switch.connection.crypto([tag, encrypt])`
### `switch.on('stop', () => {})`

Enable a specified crypto protocol. By default no encryption is used, aka `plaintext`. If called with no arguments it resets to use `plaintext`.
Emitted when the switch has successfully stopped.

You can use for example [libp2p-secio](https://github.com/libp2p/js-libp2p-secio) like this
### `switch.start(callback)`

```js
const secio = require('libp2p-secio')
switch.connection.crypto(secio.tag, secio.encrypt)
```
Start listening on all added transports that are available on the current `peerInfo`.

##### `switch.connection.enableCircuitRelay(options, callback)`
### `switch.stop(callback)`

Enable circuit relaying.
Close all the listeners and muxers.

- `options`
- enabled - activates relay dialing and listening functionality
- hop - an object with two properties
- enabled - enables circuit relaying
- active - is it an active or passive relay (default false)
- `callback`

### Stats API
Expand Down Expand Up @@ -278,6 +305,11 @@ Each one of these values is [an exponential moving-average instance](https://git

Stats are not updated in real-time. Instead, measurements are buffered and stats are updated at an interval. The maximum interval can be defined through the `Switch` constructor option `stats.computeThrottleTimeout`, defined in miliseconds.

### `switch.unhandle(protocol)`

Unhandle a protocol.

- `protocol`

### Internal Transports API

Expand Down
4 changes: 4 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
"devDependencies": {
"aegir": "^15.1.0",
"chai": "^4.1.2",
"chai-checkmark": "^1.0.1",
"dirty-chai": "^2.0.1",
"libp2p-mplex": "~0.8.2",
"libp2p-pnet": "~0.1.0",
Expand All @@ -54,7 +55,10 @@
"dependencies": {
"async": "^2.6.1",
"big.js": "^5.1.2",
"class-is": "^1.1.0",
"debug": "^3.1.0",
"err-code": "^1.1.2",
"fsm-event": "^2.1.0",
"hashlru": "^2.2.1",
"interface-connection": "~0.3.2",
"ip-address": "^5.8.9",
Expand Down
103 changes: 103 additions & 0 deletions src/connection/base.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
'use strict'

const EventEmitter = require('events').EventEmitter
const debug = require('debug')
const withIs = require('class-is')

class BaseConnection extends EventEmitter {
constructor ({ _switch, name }) {
super()

this.switch = _switch
this.ourPeerInfo = this.switch._peerInfo
this.log = debug(`libp2p:conn:${name}`)
}

/**
* Gets the current state of the connection
*
* @returns {string} The current state of the connection
*/
getState () {
return this._state._state
}

/**
* Puts the state into encrypting mode
*
* @returns {void}
*/
encrypt () {
this._state('encrypt')
}

/**
* Puts the state into privatizing mode
*
* @returns {void}
*/
protect () {
this._state('privatize')
}

/**
* Puts the state into muxing mode
*
* @returns {void}
*/
upgrade () {
this._state('upgrade')
}

/**
* Event handler for disconnected.
*
* @fires BaseConnection#close
* @returns {void}
*/
_onDisconnected () {
this.log(`disconnected from ${this.theirB58Id}`)
this.emit('close')
this.removeAllListeners()
}

/**
* Event handler for privatized
*
* @fires BaseConnection#private
* @returns {void}
*/
_onPrivatized () {
this.log(`successfully privatized incoming connection`)
this.emit('private', this.conn)
}

/**
* Wraps this.conn with the Switch.protector for private connections
*
* @private
* @fires ConnectionFSM#error
* @returns {void}
*/
_onPrivatizing () {
if (!this.switch.protector) {
return this._state('done')
}

this.conn = this.switch.protector.protect(this.conn, (err) => {
if (err) {
this.emit('error', err)
return this._state('disconnect')
}

this.log(`successfully privatized conn to ${this.theirB58Id}`)
this.conn.setPeerInfo(this.theirPeerInfo)
this._state('done')
})
}
}

module.exports = withIs(BaseConnection, {
className: 'BaseConnection',
symbolName: 'libp2p-switch/BaseConnection'
})
46 changes: 46 additions & 0 deletions src/connection/handler.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
'use strict'

const debug = require('debug')
const IncomingConnection = require('./incoming')
const observeConn = require('../observe-connection')

function listener (_switch) {
const log = debug(`libp2p:switch:listener`)

/**
* Takes a transport key and returns a connection handler function
*
* @param {string} transportKey The key of the transport to handle connections for
* @param {function} handler A custom handler to use
* @returns {function(Connection)} A connection handler function
*/
return (transportKey, handler) => {
/**
* Takes a base connection and manages listening behavior
*
* @param {Connection} conn The connection to manage
* @returns {void}
*/
return (conn) => {
// Add a transport level observer, if needed
const connection = transportKey ? observeConn(transportKey, null, conn, _switch.observer) : conn

log('received incoming connection')
const connFSM = new IncomingConnection({ connection, _switch, transportKey })

connFSM.once('error', (err) => log(err))
connFSM.once('private', (_conn) => {
// Use the custom handler, if it was provided
if (handler) {
return handler(_conn)
}
connFSM.encrypt()
})
connFSM.once('encrypted', () => connFSM.upgrade())

connFSM.protect()
}
}
}

module.exports = listener
Loading