Skip to content
This repository has been archived by the owner on Jun 26, 2023. It is now read-only.

Commit

Permalink
feat!: add libp2p events (#373)
Browse files Browse the repository at this point in the history
Removes events from some internal components and adds them to the event bus.

Decouples internal components from each other since they can listen to the bus for events rather than having to know about the component that dispatches them.

Refs: libp2p/js-libp2p#1693
  • Loading branch information
achingbrain authored Apr 21, 2023
1 parent 1e03f2a commit 071c718
Show file tree
Hide file tree
Showing 10 changed files with 116 additions and 96 deletions.
10 changes: 1 addition & 9 deletions packages/interface-address-manager/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,6 @@
import type { Multiaddr } from '@multiformats/multiaddr'
import type { EventEmitter } from '@libp2p/interfaces/events'

export interface AddressManagerEvents {
/**
* Emitted when the current node's addresses change
*/
'change:addresses': CustomEvent
}

export interface AddressManager extends EventEmitter<AddressManagerEvents> {
export interface AddressManager {
/**
* Get peer listen multiaddrs
*/
Expand Down
51 changes: 1 addition & 50 deletions packages/interface-connection-manager/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import type { AbortOptions } from '@libp2p/interfaces'
import type { EventEmitter } from '@libp2p/interfaces/events'
import type { Connection, MultiaddrConnection } from '@libp2p/interface-connection'
import type { PeerId } from '@libp2p/interface-peer-id'
import type { Multiaddr } from '@multiformats/multiaddr'
Expand All @@ -14,55 +13,7 @@ export interface PendingDial {
multiaddrs: Multiaddr[]
}

export interface ConnectionManagerEvents {
/**
* This event will be triggered any time a new Connection is established to another peer.
*
* @example
*
* ```js
* libp2p.connectionManager.addEventListener('peer:connect', (event) => {
* const connection = event.detail
* // ...
* })
* ```
*/
'peer:connect': CustomEvent<Connection>

/**
* This event will be triggered any time we are disconnected from another peer, regardless of
* the circumstances of that disconnection. If we happen to have multiple connections to a
* peer, this event will **only** be triggered when the last connection is closed.
*
* @example
*
* ```js
* libp2p.connectionManager.addEventListener('peer:disconnect', (event) => {
* const connection = event.detail
* // ...
* })
* ```
*/
'peer:disconnect': CustomEvent<Connection>

/**
* This event will be triggered when the connection manager has more connections than the
* configured limit. The event detail contains the list of PeerIds from the connections
* that were closed to bring the node back under the max connections limit.
*
* @example
*
* ```js
* libp2p.connectionManager.addEventListener('peer:prune', (event) => {
* const connection = event.detail
* // ...
* })
* ```
*/
'peer:prune': CustomEvent<PeerId[]>
}

export interface ConnectionManager extends EventEmitter<ConnectionManagerEvents> {
export interface ConnectionManager {
/**
* Return connections, optionally filtering by a PeerId
*
Expand Down
1 change: 1 addition & 0 deletions packages/interface-libp2p/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@
"@libp2p/interface-peer-store": "^2.0.0",
"@libp2p/interface-pubsub": "^4.0.0",
"@libp2p/interface-registrar": "^2.0.0",
"@libp2p/interface-transport": "^2.1.3",
"@libp2p/interfaces": "^3.0.0",
"@multiformats/multiaddr": "^12.0.0"
},
Expand Down
91 changes: 85 additions & 6 deletions packages/interface-libp2p/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import type { EventEmitter } from '@libp2p/interfaces/events'
import type { Startable } from '@libp2p/interfaces/startable'
import type { Multiaddr } from '@multiformats/multiaddr'
import type { DualDHT } from '@libp2p/interface-dht'
import type { PeerStore } from '@libp2p/interface-peer-store'
import type { Address, Peer, PeerStore } from '@libp2p/interface-peer-store'
import type { PeerId } from '@libp2p/interface-peer-id'
import type { Connection, Stream } from '@libp2p/interface-connection'
import type { PeerRouting } from '@libp2p/interface-peer-routing'
Expand All @@ -29,12 +29,34 @@ import type { StreamHandler, StreamHandlerOptions, Topology } from '@libp2p/inte
import type { Metrics } from '@libp2p/interface-metrics'
import type { PeerInfo } from '@libp2p/interface-peer-info'
import type { KeyChain } from '@libp2p/interface-keychain'
import type { Listener } from '@libp2p/interface-transport'

/**
* Once you have a libp2p instance, you can listen to several events it emits, so that you can be notified of relevant network events.
* Used by the connection manager to sort addresses into order before dialling
*/
export interface AddressSorter {
(a: Address, b: Address): -1 | 0 | 1
}

/**
* Event detail emitted when peer data changes
*/
export interface PeerUpdate {
peer: Peer
previous?: Peer
}

/**
* Once you have a libp2p instance, you can listen to several events it emits,
* so that you can be notified of relevant network events.
*
* Event names are `noun:adjective` so the first part is the name of the object
* being acted on and the second is the action.
*/
export interface Libp2pEvents {
/**
* This event is dispatched when a new network peer is discovered.
*
* @example
*
* ```js
Expand All @@ -47,18 +69,18 @@ export interface Libp2pEvents {
'peer:discovery': CustomEvent<PeerInfo>

/**
* This event will be triggered anytime a new Connection is established to another peer.
* This event will be triggered anytime a new peer connects.
*
* @example
*
* ```js
* libp2p.connectionManager.addEventListener('peer:connect', (event) => {
* const connection = event.detail
* const peerId = event.detail
* // ...
* })
* ```
*/
'peer:connect': CustomEvent<Connection>
'peer:connect': CustomEvent<PeerId>

/**
* This event will be triggered anytime we are disconnected from another peer, regardless of
Expand All @@ -74,7 +96,64 @@ export interface Libp2pEvents {
* })
* ```
*/
'peer:disconnect': CustomEvent<Connection>
'peer:disconnect': CustomEvent<PeerId>

/**
* This event is dispatched when the peer store data for a peer has been
* updated - e.g. their multiaddrs, protocols etc have changed.
*
* If they were previously known to this node, the old peer data will be
* set in the `previous` field.
*
* This may be in response to the identify protocol running, a manual
* update or some other event.
*/
'peer:update': CustomEvent<PeerUpdate>

/**
* This event is dispatched when the current node's peer record changes -
* for example a transport started listening on a new address or a new
* protocol handler was registered.
*
* @example
*
* ```js
* libp2p.addEventListener('self:peer:update', (event) => {
* const { peer } = event.detail
* // ...
* })
* ```
*/
'self:peer:update': CustomEvent<PeerUpdate>

/**
* This event is dispatched when a transport begins listening on a new address
*/
'transport:listening': CustomEvent<Listener>

/**
* This event is dispatched when a transport stops listening on an address
*/
'transport:close': CustomEvent<Listener>

/**
* This event is dispatched when the connection manager has more than the
* configured allowable max connections and has closed some connections to
* bring the node back under the limit.
*/
'connection:prune': CustomEvent<Connection[]>

/**
* This event notifies listeners when new incoming or outgoing connections
* are opened.
*/
'connection:open': CustomEvent<Connection>

/**
* This event notifies listeners when incoming or outgoing connections are
* closed.
*/
'connection:close': CustomEvent<Connection>
}

/**
Expand Down
1 change: 1 addition & 0 deletions packages/interface-mocks/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@
"@libp2p/interface-connection-encrypter": "^4.0.0",
"@libp2p/interface-connection-gater": "^3.0.0",
"@libp2p/interface-connection-manager": "^2.0.0",
"@libp2p/interface-libp2p": "^1.0.0",
"@libp2p/interface-metrics": "^4.0.0",
"@libp2p/interface-peer-discovery": "^1.0.0",
"@libp2p/interface-peer-id": "^2.0.0",
Expand Down
15 changes: 8 additions & 7 deletions packages/interface-mocks/src/connection-manager.ts
Original file line number Diff line number Diff line change
@@ -1,21 +1,23 @@
import { EventEmitter } from '@libp2p/interfaces/events'
import type { EventEmitter } from '@libp2p/interfaces/events'
import type { Startable } from '@libp2p/interfaces/startable'
import type { Connection } from '@libp2p/interface-connection'
import { isPeerId, PeerId } from '@libp2p/interface-peer-id'
import type { ConnectionManager, ConnectionManagerEvents, PendingDial } from '@libp2p/interface-connection-manager'
import type { ConnectionManager, PendingDial } from '@libp2p/interface-connection-manager'
import { connectionPair } from './connection.js'
import { CodeError } from '@libp2p/interfaces/errors'
import type { Registrar } from '@libp2p/interface-registrar'
import type { PubSub } from '@libp2p/interface-pubsub'
import { isMultiaddr, Multiaddr } from '@multiformats/multiaddr'
import { peerIdFromString } from '@libp2p/peer-id'
import { PeerMap } from '@libp2p/peer-collections'
import type { Libp2pEvents } from '@libp2p/interface-libp2p'

export interface MockNetworkComponents {
peerId: PeerId
registrar: Registrar
connectionManager: ConnectionManager
pubsub?: PubSub
events: EventEmitter<Libp2pEvents>
}

class MockNetwork {
Expand Down Expand Up @@ -49,16 +51,15 @@ export const mockNetwork = new MockNetwork()
export interface MockConnectionManagerComponents {
peerId: PeerId
registrar: Registrar
events: EventEmitter<Libp2pEvents>
}

class MockConnectionManager extends EventEmitter<ConnectionManagerEvents> implements ConnectionManager, Startable {
class MockConnectionManager implements ConnectionManager, Startable {
private connections: Connection[] = []
private readonly components: MockConnectionManagerComponents
private started = false

constructor (components: MockConnectionManagerComponents) {
super()

this.components = components
}

Expand Down Expand Up @@ -129,7 +130,7 @@ class MockConnectionManager extends EventEmitter<ConnectionManagerEvents> implem
this.connections.push(aToB)
;(componentsB.connectionManager as MockConnectionManager).connections.push(bToA)

this.safeDispatchEvent<Connection>('peer:connect', {
this.components.events.safeDispatchEvent('connection:open', {
detail: aToB
})

Expand All @@ -139,7 +140,7 @@ class MockConnectionManager extends EventEmitter<ConnectionManagerEvents> implem
}
}

componentsB.connectionManager.safeDispatchEvent<Connection>('peer:connect', {
componentsB.events.safeDispatchEvent('connection:open', {
detail: bToA
})

Expand Down
20 changes: 11 additions & 9 deletions packages/interface-mocks/src/upgrader.ts
Original file line number Diff line number Diff line change
@@ -1,20 +1,22 @@
import { mockConnection } from './connection.js'
import type { Upgrader, UpgraderEvents, UpgraderOptions } from '@libp2p/interface-transport'
import type { Upgrader, UpgraderOptions } from '@libp2p/interface-transport'
import type { Connection, MultiaddrConnection } from '@libp2p/interface-connection'
import type { Registrar } from '@libp2p/interface-registrar'
import { CustomEvent, EventEmitter } from '@libp2p/interfaces/events'
import type { EventEmitter } from '@libp2p/interfaces/events'
import type { Libp2pEvents } from '@libp2p/interface-libp2p'

export interface MockUpgraderInit {
registrar?: Registrar
events: EventEmitter<Libp2pEvents>
}

class MockUpgrader extends EventEmitter<UpgraderEvents> implements Upgrader {
class MockUpgrader implements Upgrader {
private readonly registrar?: Registrar
private readonly events: EventEmitter<Libp2pEvents>

constructor (init: MockUpgraderInit = {}) {
super()

constructor (init: MockUpgraderInit) {
this.registrar = init.registrar
this.events = init.events
}

async upgradeOutbound (multiaddrConnection: MultiaddrConnection, opts: UpgraderOptions = {}): Promise<Connection> {
Expand All @@ -24,7 +26,7 @@ class MockUpgrader extends EventEmitter<UpgraderEvents> implements Upgrader {
...opts
})

this.dispatchEvent(new CustomEvent<Connection>('connection', { detail: connection }))
this.events.safeDispatchEvent('connection:open', { detail: connection })

return connection
}
Expand All @@ -36,12 +38,12 @@ class MockUpgrader extends EventEmitter<UpgraderEvents> implements Upgrader {
...opts
})

this.dispatchEvent(new CustomEvent<Connection>('connection', { detail: connection }))
this.events.safeDispatchEvent('connection:open', { detail: connection })

return connection
}
}

export function mockUpgrader (init: MockUpgraderInit = {}): Upgrader {
export function mockUpgrader (init: MockUpgraderInit): Upgrader {
return new MockUpgrader(init)
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import type { Transport, Listener, Upgrader } from '@libp2p/interface-transport'
import type { TransportTestFixtures, Connector } from './index.js'
import type { Multiaddr } from '@multiformats/multiaddr'
import type { Registrar } from '@libp2p/interface-registrar'
import { EventEmitter } from '@libp2p/interfaces/events'

export default (common: TestSetup<TransportTestFixtures>): void => {
describe('dial', () => {
Expand All @@ -25,7 +26,8 @@ export default (common: TestSetup<TransportTestFixtures>): void => {
before(async () => {
registrar = mockRegistrar()
upgrader = mockUpgrader({
registrar
registrar,
events: new EventEmitter()
});

({ addrs, transport, connector } = await common.setup())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import { isValidTick } from '@libp2p/interface-compliance-tests/is-valid-tick'
import { mockUpgrader, mockRegistrar } from '@libp2p/interface-mocks'
import defer from 'p-defer'
import drain from 'it-drain'
import { CustomEvent } from '@libp2p/interfaces/events'
import { CustomEvent, EventEmitter } from '@libp2p/interfaces/events'
import type { TestSetup } from '@libp2p/interface-compliance-tests'
import type { Transport, Upgrader } from '@libp2p/interface-transport'
import type { TransportTestFixtures } from './index.js'
Expand All @@ -26,7 +26,8 @@ export default (common: TestSetup<TransportTestFixtures>): void => {
before(async () => {
registrar = mockRegistrar()
upgrader = mockUpgrader({
registrar
registrar,
events: new EventEmitter()
});

({ transport, addrs } = await common.setup())
Expand Down
Loading

0 comments on commit 071c718

Please sign in to comment.