From 79bff2c158331a1b2c89ac1bbd8135c6aa9ae6b1 Mon Sep 17 00:00:00 2001 From: chad Date: Mon, 5 Dec 2022 15:14:49 -0500 Subject: [PATCH] feat: Added the configuration capabilities on connection manager to seperate incoming and outgoing connection limits (#1508) --- src/config.ts | 3 +- src/connection-manager/auto-dialler.ts | 4 +- src/connection-manager/index.ts | 139 +++++++++++++++---------- 3 files changed, 87 insertions(+), 59 deletions(-) diff --git a/src/config.ts b/src/config.ts index 91e06bbb97..b98c679546 100644 --- a/src/config.ts +++ b/src/config.ts @@ -20,7 +20,8 @@ const DefaultConfig: Partial = { announceFilter: (multiaddrs: Multiaddr[]) => multiaddrs }, connectionManager: { - maxConnections: 300, + maxIncomingConnections: 300, + maxOutgoingConnections: 300, minConnections: 50, autoDial: true, autoDialInterval: 10000, diff --git a/src/connection-manager/auto-dialler.ts b/src/connection-manager/auto-dialler.ts index 2b26b00ad2..136ba73a6f 100644 --- a/src/connection-manager/auto-dialler.ts +++ b/src/connection-manager/auto-dialler.ts @@ -20,7 +20,7 @@ export interface AutoDiallerInit { enabled?: boolean /** - * The minimum number of connections to avoid pruning + * The minimum number of incoming connections to avoid pruning */ minConnections?: number @@ -107,7 +107,7 @@ export class AutoDialler implements Startable { this.autoDialTimeout.clear() } - const minConnections = this.options.minConnections + const { minConnections } = this.options // Already has enough connections if (this.components.connectionManager.getConnections().length >= minConnections) { diff --git a/src/connection-manager/index.ts b/src/connection-manager/index.ts index f3d96f37de..4a467ca1ff 100644 --- a/src/connection-manager/index.ts +++ b/src/connection-manager/index.ts @@ -24,7 +24,8 @@ import { getPeer } from '../get-peer.js' const log = logger('libp2p:connection-manager') const defaultOptions: Partial = { - maxConnections: Infinity, + maxIncomingConnections: Infinity, + maxOutgoingConnections: Infinity, minConnections: 0, maxData: Infinity, maxSentData: Infinity, @@ -41,9 +42,14 @@ const STARTUP_RECONNECT_TIMEOUT = 60000 export interface ConnectionManagerInit { /** - * The maximum number of connections to keep open + * The maximum number of incoming connections to keep open */ - maxConnections: number + maxIncomingConnections: number + + /** + * The maximum number of outgoing connections to keep open + */ + maxOutgoingConnections: number /** * The minimum number of connections to keep open @@ -136,7 +142,7 @@ export interface ConnectionManagerInit { /** * A list of multiaddrs that will always be allowed (except if they are in the - * deny list) to open connections to this node even if we've reached maxConnections + * deny list) to open connections to this node even if we've reached maxIncomingConnections */ allow?: string[] @@ -157,6 +163,12 @@ export interface ConnectionManagerInit { * complete the connection upgrade - e.g. choosing connection encryption, muxer, etc */ maxIncomingPendingConnections?: number + + /** + * The maximum number of parallel outgoing connections allowed that have yet to + * complete the connection upgrade - e.g. choosing connection encryption, muxer, etc + */ + maxOutgoingPendingConnections?: number } export interface ConnectionManagerEvents { @@ -176,26 +188,36 @@ export interface DefaultConnectionManagerComponents { * Responsible for managing known connections. */ export class DefaultConnectionManager extends EventEmitter implements ConnectionManager, Startable { - private readonly components: DefaultConnectionManagerComponents - private readonly opts: Required - private readonly connections: Map - private started: boolean - private readonly latencyMonitor: LatencyMonitor - private readonly startupReconnectTimeout: number - private connectOnStartupController?: TimeoutController - private readonly dialTimeout: number - private readonly allow: Multiaddr[] - private readonly deny: Multiaddr[] - private readonly inboundConnectionRateLimiter: RateLimiterMemory - private incomingPendingConnections: number + private readonly components: DefaultConnectionManagerComponents; + private readonly opts: Required; + private readonly connections: Map; + private started: boolean; + private readonly latencyMonitor: LatencyMonitor; + private readonly startupReconnectTimeout: number; + private connectOnStartupController?: TimeoutController; + private readonly dialTimeout: number; + private readonly allow: Multiaddr[]; + private readonly deny: Multiaddr[]; + private readonly inboundConnectionRateLimiter: RateLimiterMemory; + private incomingPendingConnections: number; constructor (components: DefaultConnectionManagerComponents, init: ConnectionManagerInit) { super() this.opts = mergeOptions.call({ ignoreUndefined: true }, defaultOptions, init) - if (this.opts.maxConnections < this.opts.minConnections) { - throw errCode(new Error('Connection Manager maxConnections must be greater than minConnections'), codes.ERR_INVALID_PARAMETERS) + if (this.opts.maxIncomingConnections < this.opts.minConnections) { + throw errCode( + new Error('Connection Manager maxIncomingConnections must be greater than minConnections'), + codes.ERR_INVALID_PARAMETERS + ) + } + + if (this.opts.maxOutgoingConnections < this.opts.minConnections) { + throw errCode( + new Error('Connection Manager maxOutgoingConnections must be greater than minConnections'), + codes.ERR_INVALID_PARAMETERS + ) } log('options: %o', this.opts) @@ -217,7 +239,7 @@ export class DefaultConnectionManager extends EventEmitter multiaddr(ma)) - this.deny = (init.deny ?? []).map(ma => multiaddr(ma)) + this.allow = (init.allow ?? []).map((ma) => multiaddr(ma)) + this.deny = (init.deny ?? []).map((ma) => multiaddr(ma)) this.inboundConnectionRateLimiter = new RateLimiterMemory({ points: this.opts.inboundConnectionThreshold, @@ -343,7 +365,7 @@ export class DefaultConnectionManager extends EventEmitter tag.name === KEEP_ALIVE).length > 0 + const hasKeepAlive = tags.filter((tag) => tag.name === KEEP_ALIVE).length > 0 if (hasKeepAlive) { keepAlivePeers.push(peer.id) @@ -356,20 +378,19 @@ export class DefaultConnectionManager extends EventEmitter { + keepAlivePeers.map(async (peer) => { await this.openConnection(peer, { signal: this.connectOnStartupController?.signal + }).catch((err) => { + log.error(err) }) - .catch(err => { - log.error(err) - }) }) ) }) - .catch(err => { + .catch((err) => { log.error(err) }) .finally(() => { @@ -404,13 +425,15 @@ export class DefaultConnectionManager extends EventEmitter> = [] for (const connectionList of this.connections.values()) { for (const connection of connectionList) { - tasks.push((async () => { - try { - await connection.close() - } catch (err) { - log.error(err) - } - })()) + tasks.push( + (async () => { + try { + await connection.close() + } catch (err) { + log.error(err) + } + })() + ) } } @@ -420,7 +443,7 @@ export class DefaultConnectionManager extends EventEmitter) { - void this._onConnect(evt).catch(err => { + void this._onConnect(evt).catch((err) => { log.error(err) }) } @@ -452,9 +475,9 @@ export class DefaultConnectionManager extends EventEmitter('peer:connect', { detail: connection })) } @@ -525,7 +548,7 @@ export class DefaultConnectionManager extends EventEmitter { + connections.map(async (connection) => { return await connection.close() }) ) @@ -583,7 +606,7 @@ export class DefaultConnectionManager extends EventEmitter connection.stat.status === STATUS.OPEN) + return connections.filter((connection) => connection.stat.status === STATUS.OPEN) } return [] @@ -595,10 +618,9 @@ export class DefaultConnectionManager extends EventEmitter) { const { detail: summary } = evt - this._checkMaxLimit('maxEventLoopDelay', summary.avgMs, 1) - .catch(err => { - log.error(err) - }) + this._checkMaxLimit('maxEventLoopDelay', summary.avgMs, 1).catch((err) => { + log.error(err) + }) } /** @@ -632,9 +654,12 @@ export class DefaultConnectionManager extends EventEmitter { - return acc + curr.value - }, 0)) + peerValues.set( + remotePeer, + tags.reduce((acc, curr) => { + return acc + curr.value + }, 0) + ) } // sort by value, lowest to highest @@ -667,7 +692,7 @@ export class DefaultConnectionManager extends EventEmitter { + toClose.map(async (connection) => { try { await connection.close() } catch (err) { @@ -675,16 +700,18 @@ export class DefaultConnectionManager extends EventEmitter('connectionEnd', { - detail: connection - })) + this.onDisconnect( + new CustomEvent('connectionEnd', { + detail: connection + }) + ) }) ) } async acceptIncomingConnection (maConn: MultiaddrConnection): Promise { // check deny list - const denyConnection = this.deny.some(ma => { + const denyConnection = this.deny.some((ma) => { return maConn.remoteAddr.toString().startsWith(ma.toString()) }) @@ -694,7 +721,7 @@ export class DefaultConnectionManager extends EventEmitter { + const allowConnection = this.allow.some((ma) => { return maConn.remoteAddr.toString().startsWith(ma.toString()) }) @@ -721,13 +748,13 @@ export class DefaultConnectionManager extends EventEmitter