Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Allow for alternative connection manager #1594

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 3 additions & 6 deletions src/circuit/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ export interface RelayConfig extends StreamHandlerOptions {
advertise: RelayAdvertiseConfig
hop: HopConfig
autoRelay: AutoRelayConfig
addressSorter?: AddressSorter
}

export interface HopConfig {
Expand All @@ -48,10 +49,6 @@ export interface AutoRelayConfig {
maxListeners: number
}

export interface RelayInit extends RelayConfig {
addressSorter?: AddressSorter
}

export interface RelayComponents {
peerId: PeerId
contentRouting: ContentRouting
Expand All @@ -62,7 +59,7 @@ export interface RelayComponents {

export class Relay implements Startable {
private readonly components: RelayComponents
private readonly init: RelayInit
private readonly init: RelayConfig
// @ts-expect-error this field isn't used anywhere?
private readonly autoRelay?: AutoRelay
private timeout?: any
Expand All @@ -71,7 +68,7 @@ export class Relay implements Startable {
/**
* Creates an instance of Relay
*/
constructor (components: RelayComponents, init: RelayInit) {
constructor (components: RelayComponents, init: RelayConfig) {
this.components = components
// Create autoRelay if enabled
this.autoRelay = init.autoRelay?.enabled !== false
Expand Down
18 changes: 10 additions & 8 deletions src/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,26 +12,28 @@ import errCode from 'err-code'
import type { RecursivePartial } from '@libp2p/interfaces'
import { isNode, isBrowser, isWebWorker, isElectronMain, isElectronRenderer, isReactNative } from 'wherearewe'

const DefaultConfig: Partial<Libp2pInit> = {
export const DefaultConfig: Partial<Libp2pInit> = {
addresses: {
listen: [],
announce: [],
noAnnounce: [],
announceFilter: (multiaddrs: Multiaddr[]) => multiaddrs
},
connectionManager: {
connectionManagerConfig: {
maxConnections: 300,
minConnections: 50,
resolvers: {
dnsaddr: dnsaddrResolver
}
},
dialer: {
addressSorter: publicAddressesFirst,
inboundUpgradeTimeout: Constants.INBOUND_UPGRADE_TIMEOUT,
autoDial: true,
autoDialInterval: 10000,
maxParallelDials: Constants.MAX_PARALLEL_DIALS,
maxDialsPerPeer: Constants.MAX_PER_PEER_DIALS,
dialTimeout: Constants.DIAL_TIMEOUT,
inboundUpgradeTimeout: Constants.INBOUND_UPGRADE_TIMEOUT,
resolvers: {
dnsaddr: dnsaddrResolver
},
addressSorter: publicAddressesFirst
dialTimeout: Constants.DIAL_TIMEOUT
},
connectionGater: {},
transportManager: {
Expand Down
39 changes: 28 additions & 11 deletions src/connection-manager/dialer/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -50,20 +50,37 @@ export interface DialerInit {
addressSorter?: AddressSorter

/**
* Number of max concurrent dials
* If true, try to connect to all discovered peers up to the connection manager limit
*/
maxParallelDials?: number
autoDial?: boolean

/**
* Number of max addresses to dial for a given peer
* How long to wait between attempting to keep our number of concurrent connections
* above minConnections
*/
maxAddrsToDial?: number
autoDialInterval?: number

/**
* How long a dial attempt is allowed to take
*/
dialTimeout?: number

/**
* When a new inbound connection is opened, the upgrade process (e.g. protect,
* encrypt, multiplex etc) must complete within this number of ms.
*/
inboundUpgradeTimeout?: number

/**
* Number of max concurrent dials
*/
maxParallelDials?: number

/**
* Number of max addresses to dial for a given peer
*/
maxAddrsToDial?: number

/**
* Number of max concurrent dials per peer
*/
Expand Down Expand Up @@ -94,13 +111,13 @@ export class DefaultDialer implements Startable, Dialer {
public pendingDialTargets: Map<string, AbortController>
private started: boolean

constructor (components: DefaultDialerComponents, init: DialerInit = {}) {
constructor (components: DefaultDialerComponents, dialerInit?: DialerInit) {
this.started = false
this.addressSorter = init.addressSorter ?? publicAddressesFirst
this.maxAddrsToDial = init.maxAddrsToDial ?? MAX_ADDRS_TO_DIAL
this.timeout = init.dialTimeout ?? DIAL_TIMEOUT
this.maxDialsPerPeer = init.maxDialsPerPeer ?? MAX_PER_PEER_DIALS
this.tokens = [...new Array(init.maxParallelDials ?? MAX_PARALLEL_DIALS)].map((_, index) => index)
this.addressSorter = dialerInit?.addressSorter ?? publicAddressesFirst
this.maxAddrsToDial = dialerInit?.maxAddrsToDial ?? MAX_ADDRS_TO_DIAL
this.timeout = dialerInit?.dialTimeout ?? DIAL_TIMEOUT
this.maxDialsPerPeer = dialerInit?.maxDialsPerPeer ?? MAX_PER_PEER_DIALS
this.tokens = [...new Array(dialerInit?.maxParallelDials ?? MAX_PARALLEL_DIALS)].map((_, index) => index)
this.components = components
this.pendingDials = trackedMap({
name: 'libp2p_dialler_pending_dials',
Expand All @@ -111,7 +128,7 @@ export class DefaultDialer implements Startable, Dialer {
metrics: components.metrics
})

for (const [key, value] of Object.entries(init.resolvers ?? {})) {
for (const [key, value] of Object.entries(dialerInit?.resolvers ?? {})) {
resolvers.set(key, value)
}
}
Expand Down
51 changes: 6 additions & 45 deletions src/connection-manager/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import { setMaxListeners } from 'events'
import type { Connection, MultiaddrConnection } from '@libp2p/interface-connection'
import type { ConnectionManager, ConnectionManagerEvents, Dialer } from '@libp2p/interface-connection-manager'
import * as STATUS from '@libp2p/interface-connection/status'
import type { AddressSorter, PeerStore } from '@libp2p/interface-peer-store'
import type { PeerStore } from '@libp2p/interface-peer-store'
import { multiaddr, Multiaddr, Resolver } from '@multiformats/multiaddr'
import { PeerMap } from '@libp2p/peer-collections'
import { TimeoutController } from 'timeout-abort-controller'
Expand Down Expand Up @@ -45,47 +45,9 @@ export interface ConnectionManagerConfig {
pollInterval?: number

/**
* If true, try to connect to all discovered peers up to the connection manager limit
* The abort signal to use for timeouts when opening connections to peers
*/
autoDial?: boolean

/**
* How long to wait between attempting to keep our number of concurrent connections
* above minConnections
*/
autoDialInterval: number

/**
* Sort the known addresses of a peer before trying to dial
*/
addressSorter?: AddressSorter

/**
* Number of max concurrent dials
*/
maxParallelDials?: number

/**
* Number of max addresses to dial for a given peer
*/
maxAddrsToDial?: number

/**
* How long a dial attempt is allowed to take, including DNS resolution
* of the multiaddr, opening a socket and upgrading it to a Connection.
*/
dialTimeout?: number

/**
* When a new inbound connection is opened, the upgrade process (e.g. protect,
* encrypt, multiplex etc) must complete within this number of ms.
*/
inboundUpgradeTimeout: number

/**
* Number of max concurrent dials per peer
*/
maxDialsPerPeer?: number
outgoingDialTimeout?: number

/**
* Multiaddr resolvers to use when dialing
Expand Down Expand Up @@ -128,7 +90,6 @@ const defaultOptions: Partial<ConnectionManagerConfig> = {
minConnections: 0,
maxEventLoopDelay: Infinity,
pollInterval: 2000,
autoDialInterval: 10000,
inboundConnectionThreshold: 5,
maxIncomingPendingConnections: 10
}
Expand Down Expand Up @@ -156,7 +117,7 @@ export class DefaultConnectionManager extends EventEmitter<ConnectionManagerEven
private readonly latencyMonitor?: LatencyMonitor
private readonly startupReconnectTimeout: number
private connectOnStartupController?: TimeoutController
private readonly dialTimeout: number
private readonly outgoingDialTimeout: number
private readonly allow: Multiaddr[]
private readonly deny: Multiaddr[]
private readonly inboundConnectionRateLimiter: RateLimiterMemory
Expand Down Expand Up @@ -198,7 +159,7 @@ export class DefaultConnectionManager extends EventEmitter<ConnectionManagerEven
this.onDisconnect = this.onDisconnect.bind(this)

this.startupReconnectTimeout = init.startupReconnectTimeout ?? STARTUP_RECONNECT_TIMEOUT
this.dialTimeout = init.dialTimeout ?? 30000
this.outgoingDialTimeout = init.outgoingDialTimeout ?? 30000

this.allow = (init.allow ?? []).map(ma => multiaddr(ma))
this.deny = (init.deny ?? []).map(ma => multiaddr(ma))
Expand Down Expand Up @@ -496,7 +457,7 @@ export class DefaultConnectionManager extends EventEmitter<ConnectionManagerEven
let timeoutController: TimeoutController | undefined

if (options?.signal == null) {
timeoutController = new TimeoutController(this.dialTimeout)
timeoutController = new TimeoutController(this.outgoingDialTimeout)
options.signal = timeoutController.signal

try {
Expand Down
14 changes: 13 additions & 1 deletion src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,9 @@ import type { KeyChainInit } from '@libp2p/keychain'
import type { NatManagerInit } from './nat-manager.js'
import type { AddressManagerInit } from './address-manager/index.js'
import type { PeerRoutingInit } from './peer-routing.js'
import type { ConnectionManager } from '@libp2p/interface-connection-manager'
import type { ConnectionManagerInit } from './connection-manager/index.js'
import type { DialerInit } from './connection-manager/dialer/index.js'

/**
* For Libp2p configurations and modules details read the [Configuration Document](./CONFIGURATION.md).
Expand All @@ -60,7 +62,12 @@ export interface Libp2pInit {
/**
* libp2p Connection Manager configuration
*/
connectionManager: ConnectionManagerInit
connectionManagerConfig: ConnectionManagerInit

/**
* libp2p dialer configuration
*/
dialer: DialerInit

/**
* A connection gater can deny new connections based on user criteria
Expand Down Expand Up @@ -149,6 +156,11 @@ export interface Libp2pInit {
* A ConnectionProtector can be used to create a secure overlay on top of the network using pre-shared keys
*/
connectionProtector?: (components: Components) => ConnectionProtector

/**
* Pass a custom ConnectionManager implementation to enable custom connection management
*/
connectionManager?: (components: Components) => ConnectionManager
}

/**
Expand Down
18 changes: 9 additions & 9 deletions src/libp2p.ts
Original file line number Diff line number Diff line change
Expand Up @@ -122,18 +122,19 @@ export class Libp2pNode extends EventEmitter<Libp2pEvents> implements Libp2p {
this.components.connectionProtector = init.connectionProtector(components)
}

// Create the Connection Manager
// if a connection manager is not provided, create a default one
this.connectionManager = (init.connectionManager != null) ? init.connectionManager(this.components) : this.components.connectionManager = new DefaultConnectionManager(this.components, init.connectionManagerConfig)

// Set up the Upgrader
this.components.upgrader = new DefaultUpgrader(this.components, {
connectionEncryption: (init.connectionEncryption ?? []).map(fn => this.configureComponent(fn(this.components))),
muxers: (init.streamMuxers ?? []).map(fn => this.configureComponent(fn(this.components))),
inboundUpgradeTimeout: init.connectionManager.inboundUpgradeTimeout
inboundUpgradeTimeout: init.dialer.inboundUpgradeTimeout ?? 10000
})

// Create the dialer
this.components.dialer = new DefaultDialer(this.components, init.connectionManager)

// Create the Connection Manager
this.connectionManager = this.components.connectionManager = new DefaultConnectionManager(this.components, init.connectionManager)
this.components.dialer = new DefaultDialer(this.components, init.dialer)

// forward connection manager events
this.components.connectionManager.addEventListener('peer:disconnect', (event) => {
Expand All @@ -156,9 +157,9 @@ export class Libp2pNode extends EventEmitter<Libp2pEvents> implements Libp2p {
this.configureComponent(new PeerRecordUpdater(this.components))

this.configureComponent(new AutoDialler(this.components, {
enabled: init.connectionManager.autoDial,
minConnections: init.connectionManager.minConnections,
autoDialInterval: init.connectionManager.autoDialInterval
enabled: init.dialer.autoDial,
minConnections: init.connectionManagerConfig.minConnections,
autoDialInterval: init.dialer.autoDialInterval
}))

// Create keychain
Expand Down Expand Up @@ -230,7 +231,6 @@ export class Libp2pNode extends EventEmitter<Libp2pEvents> implements Libp2p {
this.components.transportManager.add(this.configureComponent(new Circuit(this.components, init.relay)))

this.configureComponent(new Relay(this.components, {
addressSorter: init.connectionManager.addressSorter,
...init.relay
}))
}
Expand Down
Loading