Skip to content

Commit

Permalink
fix: updated connection limits to filter for inbound/outbound (libp2p…
Browse files Browse the repository at this point in the history
  • Loading branch information
maschad committed Jan 23, 2023
1 parent 3f20c93 commit 8e11bf7
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 62 deletions.
71 changes: 41 additions & 30 deletions src/connection-manager/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -173,16 +173,9 @@ export class DefaultConnectionManager extends EventEmitter<ConnectionManagerEven

this.opts = mergeOptions.call({ ignoreUndefined: true }, defaultOptions, init)

if (this.opts.maxIncomingConnections < this.opts.minConnections) {
if ((this.opts.maxIncomingConnections + this.opts.maxOutgoingConnections) < this.opts.minConnections) {
throw errCode(
new Error('Connection Manager maxIncomingConnections must be greater than minConnections'),
codes.ERR_INVALID_PARAMETERS
)
}

if (this.opts.maxOutgoingConnections < this.opts.minConnections) {
throw errCode(
new Error('Connection Manager maxOutgoingConnections must be greater than minConnections'),
new Error('Connection Manager maxIncomingConnections + maxOutgoingConnections must be greater than minConnections'),
codes.ERR_INVALID_PARAMETERS
)
}
Expand Down Expand Up @@ -442,10 +435,24 @@ export class DefaultConnectionManager extends EventEmitter<ConnectionManagerEven
await this.components.peerStore.keyBook.set(peerId, peerId.publicKey)
}

const numConnections = this.getConnections().length
const toPrune = numConnections - this.opts.maxOutgoingConnections
let [outboundConnections, inboundConnections] = [0, 0]

const connections = this.getConnections()

connections.forEach(connection => {
if (connection.stat.direction === 'inbound') {
inboundConnections++
} else {
outboundConnections++
}
})

const numIncomingToPrune = inboundConnections - this.opts.maxIncomingConnections
const numOutgoingToPrune = outboundConnections - this.opts.maxOutgoingConnections

await this._checkMaxLimit('maxOutgoingConnections', outboundConnections, numOutgoingToPrune)
await this._checkMaxLimit('maxIncomingConnections', inboundConnections, numIncomingToPrune)

await this._checkMaxLimit('maxOutgoingConnections', numConnections, toPrune)
this.dispatchEvent(new CustomEvent<Connection>('peer:connect', { detail: connection }))
}

Expand Down Expand Up @@ -505,9 +512,12 @@ export class DefaultConnectionManager extends EventEmitter<ConnectionManagerEven
}
}

if ((this.getConnections().length + 1) > this.opts.maxOutgoingConnections) {
const connections = this.getConnections()
const totalOutboundConnections = connections.filter(connection => connection.stat.direction === 'outbound').length

if ((totalOutboundConnections + 1) > this.opts.maxOutgoingConnections) {
throw errCode(
new Error('Connection Manager maxOutgoing connections exceeded'),
new Error('Connection Manager max connections exceeded'),
codes.ERR_CONNECTION_DENIED
)
}
Expand Down Expand Up @@ -548,6 +558,8 @@ export class DefaultConnectionManager extends EventEmitter<ConnectionManagerEven
peerConnections.push(connection)
}

connection.stat.direction = 'outbound'

return connection
} finally {
if (timeoutController != null) {
Expand Down Expand Up @@ -707,23 +719,27 @@ export class DefaultConnectionManager extends EventEmitter<ConnectionManagerEven
return false
}

// check pending connections
if (this.incomingPendingConnections === this.opts.maxIncomingPendingConnections) {
log('connection from %s refused - incomingPendingConnections exceeded by peer %s', maConn.remoteAddr)
return false
}

const connections = this.getConnections()

const totalIncomingConnections = connections.filter(connection => connection.stat.direction === 'inbound').length

// check allow list
const allowConnection = this.allow.some(ma => {
return maConn.remoteAddr.toString().startsWith(ma.toString())
})

if (allowConnection) {
this.incomingPendingConnections++

return true
}

// check pending connections
if (this.incomingPendingConnections === this.opts.maxIncomingPendingConnections) {
log('connection from %s refused - incomingPendingConnections exceeded by peer %s', maConn.remoteAddr)
if ((totalIncomingConnections + 1 > this.opts.maxIncomingConnections) && !allowConnection) {
log('connection from %s refused - maxIncomingConnections exceeded', maConn.remoteAddr)
return false
}

// Check the rate limiter
if (maConn.remoteAddr.isThinWaistAddress()) {
const host = maConn.remoteAddr.nodeAddress().address

Expand All @@ -735,14 +751,9 @@ export class DefaultConnectionManager extends EventEmitter<ConnectionManagerEven
}
}

if (this.getConnections().length < this.opts.maxIncomingConnections) {
this.incomingPendingConnections++

return true
}
this.incomingPendingConnections++

log('connection from %s refused - maxConnections exceeded', maConn.remoteAddr)
return false
return true
}

afterUpgradeInbound () {
Expand Down
56 changes: 24 additions & 32 deletions test/connection-manager/index.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -265,30 +265,19 @@ describe('Connection Manager', () => {
expect(spy).to.have.property('callCount', 1)
})

it('should fail if the connection manager has mismatched incoming connection limit options', async () => {
it('should fail if the connection manager has mismatched incoming + outgoing connection limit options', async () => {
await expect(createNode({
config: createBaseOptions({
connectionManager: {
maxIncomingConnections: 5,
minConnections: 6
maxOutgoingConnections: 1,
minConnections: 7
}
}),
started: false
})).to.eventually.rejected('maxIncomingConnections must be greater')
})

it('should fail if the connection manager has mismatched outgoing connection limit options', async () => {
await expect(createNode({
config: createBaseOptions({
connectionManager: {
maxOutgoingConnections: 5,
minConnections: 6
}
}),
started: false
})).to.eventually.rejected('maxOutgoingConnections must be greater')
})

it('should reconnect to important peers on startup', async () => {
const peerId = await createEd25519PeerId()

Expand Down Expand Up @@ -343,31 +332,34 @@ describe('Connection Manager', () => {
})

it('should deny connections when maxIncomingConnections is exceeded', async () => {
const dialer = stubInterface<Dialer>()
dialer.dial.resolves(stubInterface<Connection>())

const connectionManager = new DefaultConnectionManager({
peerId: libp2p.peerId,
upgrader: stubInterface<Upgrader>(),
peerStore: stubInterface<PeerStore>(),
dialer
}, {
...defaultOptions,
maxIncomingConnections: 1
libp2p = await createNode({
config: createBaseOptions({
connectionManager: {
maxIncomingConnections: 1
}
}),
started: false
})

// max out the connection limit
await connectionManager.openConnection(await createEd25519PeerId())
await libp2p.start()

const connectionManager = libp2p.connectionManager as DefaultConnectionManager

// max out the connection limit by having an inbound connection already
const connection = mockConnection(mockMultiaddrConnection(mockDuplex(), await createEd25519PeerId()))
connection.stat.direction = 'inbound'
await connectionManager._onConnect(new CustomEvent('connection', { detail: connection }))

expect(connectionManager.getConnections()).to.have.lengthOf(1)

// an inbound connection is opened
const remotePeer = await createEd25519PeerId()
const maConn = mockMultiaddrConnection({
// another inbound connection is opened
const remotePeer2 = await createEd25519PeerId()
const maConn2 = mockMultiaddrConnection({
source: [],
sink: async () => {}
}, remotePeer)
}, remotePeer2)

await expect(connectionManager.acceptIncomingConnection(maConn))
await expect(connectionManager.acceptIncomingConnection(maConn2))
.to.eventually.be.false()
})

Expand Down

0 comments on commit 8e11bf7

Please sign in to comment.