Skip to content

Commit

Permalink
feat: Added the configuration capabilities on connection manager to s…
Browse files Browse the repository at this point in the history
…eperate incoming and outgoing connection limits (#1508)
  • Loading branch information
maschad committed Dec 16, 2022
1 parent 24f864f commit 79bff2c
Show file tree
Hide file tree
Showing 3 changed files with 87 additions and 59 deletions.
3 changes: 2 additions & 1 deletion src/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ const DefaultConfig: Partial<Libp2pInit> = {
announceFilter: (multiaddrs: Multiaddr[]) => multiaddrs
},
connectionManager: {
maxConnections: 300,
maxIncomingConnections: 300,
maxOutgoingConnections: 300,
minConnections: 50,
autoDial: true,
autoDialInterval: 10000,
Expand Down
4 changes: 2 additions & 2 deletions src/connection-manager/auto-dialler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ export interface AutoDiallerInit {
enabled?: boolean

/**
* The minimum number of connections to avoid pruning
* The minimum number of incoming connections to avoid pruning
*/
minConnections?: number

Expand Down Expand Up @@ -107,7 +107,7 @@ export class AutoDialler implements Startable {
this.autoDialTimeout.clear()
}

const minConnections = this.options.minConnections
const { minConnections } = this.options

// Already has enough connections
if (this.components.connectionManager.getConnections().length >= minConnections) {
Expand Down
139 changes: 83 additions & 56 deletions src/connection-manager/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ import { getPeer } from '../get-peer.js'
const log = logger('libp2p:connection-manager')

const defaultOptions: Partial<ConnectionManagerInit> = {
maxConnections: Infinity,
maxIncomingConnections: Infinity,
maxOutgoingConnections: Infinity,
minConnections: 0,
maxData: Infinity,
maxSentData: Infinity,
Expand All @@ -41,9 +42,14 @@ const STARTUP_RECONNECT_TIMEOUT = 60000

export interface ConnectionManagerInit {
/**
* The maximum number of connections to keep open
* The maximum number of incoming connections to keep open
*/
maxConnections: number
maxIncomingConnections: number

/**
* The maximum number of outgoing connections to keep open
*/
maxOutgoingConnections: number

/**
* The minimum number of connections to keep open
Expand Down Expand Up @@ -136,7 +142,7 @@ export interface ConnectionManagerInit {

/**
* A list of multiaddrs that will always be allowed (except if they are in the
* deny list) to open connections to this node even if we've reached maxConnections
* deny list) to open connections to this node even if we've reached maxIncomingConnections
*/
allow?: string[]

Expand All @@ -157,6 +163,12 @@ export interface ConnectionManagerInit {
* complete the connection upgrade - e.g. choosing connection encryption, muxer, etc
*/
maxIncomingPendingConnections?: number

/**
* The maximum number of parallel outgoing connections allowed that have yet to
* complete the connection upgrade - e.g. choosing connection encryption, muxer, etc
*/
maxOutgoingPendingConnections?: number
}

export interface ConnectionManagerEvents {
Expand All @@ -176,26 +188,36 @@ export interface DefaultConnectionManagerComponents {
* Responsible for managing known connections.
*/
export class DefaultConnectionManager extends EventEmitter<ConnectionManagerEvents> implements ConnectionManager, Startable {
private readonly components: DefaultConnectionManagerComponents
private readonly opts: Required<ConnectionManagerInit>
private readonly connections: Map<string, Connection[]>
private started: boolean
private readonly latencyMonitor: LatencyMonitor
private readonly startupReconnectTimeout: number
private connectOnStartupController?: TimeoutController
private readonly dialTimeout: number
private readonly allow: Multiaddr[]
private readonly deny: Multiaddr[]
private readonly inboundConnectionRateLimiter: RateLimiterMemory
private incomingPendingConnections: number
private readonly components: DefaultConnectionManagerComponents;
private readonly opts: Required<ConnectionManagerInit>;
private readonly connections: Map<string, Connection[]>;
private started: boolean;
private readonly latencyMonitor: LatencyMonitor;
private readonly startupReconnectTimeout: number;
private connectOnStartupController?: TimeoutController;
private readonly dialTimeout: number;
private readonly allow: Multiaddr[];
private readonly deny: Multiaddr[];
private readonly inboundConnectionRateLimiter: RateLimiterMemory;
private incomingPendingConnections: number;

constructor (components: DefaultConnectionManagerComponents, init: ConnectionManagerInit) {
super()

this.opts = mergeOptions.call({ ignoreUndefined: true }, defaultOptions, init)

if (this.opts.maxConnections < this.opts.minConnections) {
throw errCode(new Error('Connection Manager maxConnections must be greater than minConnections'), codes.ERR_INVALID_PARAMETERS)
if (this.opts.maxIncomingConnections < this.opts.minConnections) {
throw errCode(
new Error('Connection Manager maxIncomingConnections must be greater than minConnections'),
codes.ERR_INVALID_PARAMETERS
)
}

if (this.opts.maxOutgoingConnections < this.opts.minConnections) {
throw errCode(
new Error('Connection Manager maxOutgoingConnections must be greater than minConnections'),
codes.ERR_INVALID_PARAMETERS
)
}

log('options: %o', this.opts)
Expand All @@ -217,16 +239,16 @@ export class DefaultConnectionManager extends EventEmitter<ConnectionManagerEven
try {
// This emitter gets listened to a lot
setMaxListeners?.(Infinity, this)
} catch {}
} catch { }

this.onConnect = this.onConnect.bind(this)
this.onDisconnect = this.onDisconnect.bind(this)

this.startupReconnectTimeout = init.startupReconnectTimeout ?? STARTUP_RECONNECT_TIMEOUT
this.dialTimeout = init.dialTimeout ?? 30000

this.allow = (init.allow ?? []).map(ma => multiaddr(ma))
this.deny = (init.deny ?? []).map(ma => multiaddr(ma))
this.allow = (init.allow ?? []).map((ma) => multiaddr(ma))
this.deny = (init.deny ?? []).map((ma) => multiaddr(ma))

this.inboundConnectionRateLimiter = new RateLimiterMemory({
points: this.opts.inboundConnectionThreshold,
Expand Down Expand Up @@ -343,7 +365,7 @@ export class DefaultConnectionManager extends EventEmitter<ConnectionManagerEven

for (const peer of await this.components.peerStore.all()) {
const tags = await this.components.peerStore.getTags(peer.id)
const hasKeepAlive = tags.filter(tag => tag.name === KEEP_ALIVE).length > 0
const hasKeepAlive = tags.filter((tag) => tag.name === KEEP_ALIVE).length > 0

if (hasKeepAlive) {
keepAlivePeers.push(peer.id)
Expand All @@ -356,20 +378,19 @@ export class DefaultConnectionManager extends EventEmitter<ConnectionManagerEven
try {
// fails on node < 15.4
setMaxListeners?.(Infinity, this.connectOnStartupController.signal)
} catch {}
} catch { }

await Promise.all(
keepAlivePeers.map(async peer => {
keepAlivePeers.map(async (peer) => {
await this.openConnection(peer, {
signal: this.connectOnStartupController?.signal
}).catch((err) => {
log.error(err)
})
.catch(err => {
log.error(err)
})
})
)
})
.catch(err => {
.catch((err) => {
log.error(err)
})
.finally(() => {
Expand Down Expand Up @@ -404,13 +425,15 @@ export class DefaultConnectionManager extends EventEmitter<ConnectionManagerEven
const tasks: Array<Promise<void>> = []
for (const connectionList of this.connections.values()) {
for (const connection of connectionList) {
tasks.push((async () => {
try {
await connection.close()
} catch (err) {
log.error(err)
}
})())
tasks.push(
(async () => {
try {
await connection.close()
} catch (err) {
log.error(err)
}
})()
)
}
}

Expand All @@ -420,7 +443,7 @@ export class DefaultConnectionManager extends EventEmitter<ConnectionManagerEven
}

onConnect (evt: CustomEvent<Connection>) {
void this._onConnect(evt).catch(err => {
void this._onConnect(evt).catch((err) => {
log.error(err)
})
}
Expand Down Expand Up @@ -452,9 +475,9 @@ export class DefaultConnectionManager extends EventEmitter<ConnectionManagerEven
}

const numConnections = this.getConnections().length
const toPrune = numConnections - this.opts.maxConnections
const toPrune = numConnections - this.opts.maxIncomingConnections

await this._checkMaxLimit('maxConnections', numConnections, toPrune)
await this._checkMaxLimit('maxIncomingConnections', numConnections, toPrune)
this.dispatchEvent(new CustomEvent<Connection>('peer:connect', { detail: connection }))
}

Expand Down Expand Up @@ -525,7 +548,7 @@ export class DefaultConnectionManager extends EventEmitter<ConnectionManagerEven
try {
// fails on node < 15.4
setMaxListeners?.(Infinity, timeoutController.signal)
} catch {}
} catch { }
}

try {
Expand Down Expand Up @@ -564,7 +587,7 @@ export class DefaultConnectionManager extends EventEmitter<ConnectionManagerEven
const connections = this.connections.get(peerId.toString()) ?? []

await Promise.all(
connections.map(async connection => {
connections.map(async (connection) => {
return await connection.close()
})
)
Expand All @@ -583,7 +606,7 @@ export class DefaultConnectionManager extends EventEmitter<ConnectionManagerEven

// Return all open connections
if (connections != null) {
return connections.filter(connection => connection.stat.status === STATUS.OPEN)
return connections.filter((connection) => connection.stat.status === STATUS.OPEN)
}

return []
Expand All @@ -595,10 +618,9 @@ export class DefaultConnectionManager extends EventEmitter<ConnectionManagerEven
_onLatencyMeasure (evt: CustomEvent<SummaryObject>) {
const { detail: summary } = evt

this._checkMaxLimit('maxEventLoopDelay', summary.avgMs, 1)
.catch(err => {
log.error(err)
})
this._checkMaxLimit('maxEventLoopDelay', summary.avgMs, 1).catch((err) => {
log.error(err)
})
}

/**
Expand Down Expand Up @@ -632,9 +654,12 @@ export class DefaultConnectionManager extends EventEmitter<ConnectionManagerEven
const tags = await this.components.peerStore.getTags(remotePeer)

// sum all tag values
peerValues.set(remotePeer, tags.reduce((acc, curr) => {
return acc + curr.value
}, 0))
peerValues.set(
remotePeer,
tags.reduce((acc, curr) => {
return acc + curr.value
}, 0)
)
}

// sort by value, lowest to highest
Expand Down Expand Up @@ -667,24 +692,26 @@ export class DefaultConnectionManager extends EventEmitter<ConnectionManagerEven

// close connections
await Promise.all(
toClose.map(async connection => {
toClose.map(async (connection) => {
try {
await connection.close()
} catch (err) {
log.error(err)
}

// TODO: should not need to invoke this manually
this.onDisconnect(new CustomEvent<Connection>('connectionEnd', {
detail: connection
}))
this.onDisconnect(
new CustomEvent<Connection>('connectionEnd', {
detail: connection
})
)
})
)
}

async acceptIncomingConnection (maConn: MultiaddrConnection): Promise<boolean> {
// check deny list
const denyConnection = this.deny.some(ma => {
const denyConnection = this.deny.some((ma) => {
return maConn.remoteAddr.toString().startsWith(ma.toString())
})

Expand All @@ -694,7 +721,7 @@ export class DefaultConnectionManager extends EventEmitter<ConnectionManagerEven
}

// check allow list
const allowConnection = this.allow.some(ma => {
const allowConnection = this.allow.some((ma) => {
return maConn.remoteAddr.toString().startsWith(ma.toString())
})

Expand All @@ -721,13 +748,13 @@ export class DefaultConnectionManager extends EventEmitter<ConnectionManagerEven
}
}

if (this.getConnections().length < this.opts.maxConnections) {
if (this.getConnections().length < this.opts.maxIncomingConnections) {
this.incomingPendingConnections++

return true
}

log('connection from %s refused - maxConnections exceeded', maConn.remoteAddr)
log('connection from %s refused - maxIncomingConnections exceeded', maConn.remoteAddr)
return false
}

Expand Down

0 comments on commit 79bff2c

Please sign in to comment.