Skip to content

Commit

Permalink
fix: allow dialing a peer when we only have transient connections (#2187
Browse files Browse the repository at this point in the history
)

If we dial a peer that we already have a connection to, we return that
connection instead of dialing.

This creates a problem when we are trying to make a direct connection
to a peer that we already have a relay connection to since the relay
connection is returned.

The fix here is to allow additional dials to peers when we only have
relay connections.

If a direct connection exists, we will still return it instead of dialing.
  • Loading branch information
achingbrain authored Nov 2, 2023
1 parent 16a8707 commit dd400cd
Show file tree
Hide file tree
Showing 4 changed files with 90 additions and 27 deletions.
24 changes: 3 additions & 21 deletions packages/libp2p/src/circuit-relay/transport/listener.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,8 @@ import { CodeError } from '@libp2p/interface/errors'
import { TypedEventEmitter } from '@libp2p/interface/events'
import { logger } from '@libp2p/logger'
import { PeerMap } from '@libp2p/peer-collections'
import { peerIdFromString } from '@libp2p/peer-id'
import { multiaddr } from '@multiformats/multiaddr'
import type { ReservationStore } from './reservation-store.js'
import type { Connection } from '@libp2p/interface/connection'
import type { PeerId } from '@libp2p/interface/peer-id'
import type { Listener, ListenerEvents } from '@libp2p/interface/transport'
import type { ConnectionManager } from '@libp2p/interface-internal/connection-manager'
Expand Down Expand Up @@ -41,25 +39,9 @@ class CircuitRelayTransportListener extends TypedEventEmitter<ListenerEvents> im
async listen (addr: Multiaddr): Promise<void> {
log('listen on %a', addr)

const relayPeerStr = addr.getPeerId()
let relayConn: Connection | undefined

// check if we already have a connection to the relay
if (relayPeerStr != null) {
const relayPeer = peerIdFromString(relayPeerStr)
const connections = this.connectionManager.getConnectionsMap().get(relayPeer) ?? []

if (connections.length > 0) {
relayConn = connections[0]
}
}

// open a new connection as we don't already have one
if (relayConn == null) {
const addrString = addr.toString().split('/p2p-circuit').find(a => a !== '')
const ma = multiaddr(addrString)
relayConn = await this.connectionManager.openConnection(ma)
}
// remove the circuit part to get the peer id of the relay
const relayAddr = addr.decapsulate('/p2p-circuit')
const relayConn = await this.connectionManager.openConnection(relayAddr)

if (!this.relayStore.hasReservation(relayConn.remotePeer)) {
// addRelay calls transportManager.listen which calls this listen method
Expand Down
45 changes: 44 additions & 1 deletion packages/libp2p/src/connection-manager/dial-queue.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { AbortError, CodeError } from '@libp2p/interface/errors'
import { setMaxListeners } from '@libp2p/interface/events'
import { logger } from '@libp2p/logger'
import { PeerMap } from '@libp2p/peer-collections'
import { defaultAddressSort } from '@libp2p/utils/address-sort'
import { type Multiaddr, type Resolver, resolvers } from '@multiformats/multiaddr'
import { dnsaddrResolver } from '@multiformats/multiaddr/resolvers'
Expand Down Expand Up @@ -35,6 +36,7 @@ export interface PendingDialTarget {

export interface DialOptions extends AbortOptions {
priority?: number
force?: boolean
}

interface PendingDialInternal extends PendingDial {
Expand All @@ -48,6 +50,7 @@ interface DialerInit {
maxParallelDialsPerPeer?: number
dialTimeout?: number
resolvers?: Record<string, Resolver>
connections?: PeerMap<Connection[]>
}

const defaultOptions = {
Expand Down Expand Up @@ -83,12 +86,14 @@ export class DialQueue {
private readonly inProgressDialCount?: Metric
private readonly pendingDialCount?: Metric
private readonly shutDownController: AbortController
private readonly connections: PeerMap<Connection[]>

constructor (components: DialQueueComponents, init: DialerInit = {}) {
this.addressSorter = init.addressSorter ?? defaultOptions.addressSorter
this.maxPeerAddrsToDial = init.maxPeerAddrsToDial ?? defaultOptions.maxPeerAddrsToDial
this.maxParallelDialsPerPeer = init.maxParallelDialsPerPeer ?? defaultOptions.maxParallelDialsPerPeer
this.dialTimeout = init.dialTimeout ?? defaultOptions.dialTimeout
this.connections = init.connections ?? new PeerMap()

this.peerId = components.peerId
this.peerStore = components.peerStore
Expand Down Expand Up @@ -187,6 +192,23 @@ export class DialQueue {
throw err
}

// make sure we don't have an existing connection to any of the addresses we
// are about to dial
let existingConnection = Array.from(this.connections.values()).flat().find(conn => {
if (options.force === true) {
return false
}

return addrsToDial.find(addr => {
return addr.multiaddr.equals(conn.remoteAddr)
})
})

if (existingConnection != null) {
log('already connected to %a', existingConnection.remoteAddr)
return existingConnection
}

// ready to dial, all async work finished - make sure we don't have any
// pending dials in progress for this peer or set of multiaddrs
const existingDial = this.pendingDials.find(dial => {
Expand Down Expand Up @@ -257,7 +279,28 @@ export class DialQueue {
// let other dials join this one
this.pendingDials.push(pendingDial)

return pendingDial.promise
const connection = await pendingDial.promise

// we may have been dialing a multiaddr without a peer id attached but by
// this point we have upgraded the connection so the remote peer information
// should be available - check again that we don't already have a connection
// to the remote multiaddr
existingConnection = Array.from(this.connections.values()).flat().find(conn => {
if (options.force === true) {
return false
}

return conn.id !== connection.id && conn.remoteAddr.equals(connection.remoteAddr)
})

if (existingConnection != null) {
log('already connected to %a', existingConnection.remoteAddr)
await connection.close()
return existingConnection
}

log('connection opened to %a', connection.remoteAddr)
return connection
}

private createDialAbortControllers (userSignal?: AbortSignal): ClearableSignal {
Expand Down
12 changes: 7 additions & 5 deletions packages/libp2p/src/connection-manager/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,8 @@ export class DefaultConnectionManager implements ConnectionManager, Startable {
dialTimeout: init.dialTimeout ?? DIAL_TIMEOUT,
resolvers: init.resolvers ?? {
dnsaddr: dnsaddrResolver
}
},
connections: this.connections
})
}

Expand Down Expand Up @@ -505,12 +506,13 @@ export class DefaultConnectionManager implements ConnectionManager, Startable {

if (peerId != null && options.force !== true) {
log('dial %p', peerId)
const existingConnections = this.getConnections(peerId)
const existingConnection = this.getConnections(peerId)
.find(conn => !conn.transient)

if (existingConnections.length > 0) {
log('had an existing connection to %p', peerId)
if (existingConnection != null) {
log('had an existing non-transient connection to %p', peerId)

return existingConnections[0]
return existingConnection
}
}

Expand Down
36 changes: 36 additions & 0 deletions packages/libp2p/test/connection-manager/index.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -536,4 +536,40 @@ describe('Connection Manager', () => {
await expect(connectionManager.acceptIncomingConnection(maConn2))
.to.eventually.be.true()
})

it('should allow dialing peers when an existing transient connection exists', async () => {
connectionManager = new DefaultConnectionManager({
peerId: await createEd25519PeerId(),
peerStore: stubInterface<PeerStore>(),
transportManager: stubInterface<TransportManager>(),
connectionGater: stubInterface<ConnectionGater>(),
events: new TypedEventEmitter()
}, {
...defaultOptions,
maxIncomingPendingConnections: 1
})
await connectionManager.start()

const targetPeer = await createEd25519PeerId()
const addr = multiaddr(`/ip4/123.123.123.123/tcp/123/p2p/${targetPeer}`)

const existingConnection = stubInterface<Connection>({
transient: true
})
const newConnection = stubInterface<Connection>()

sinon.stub(connectionManager.dialQueue, 'dial')
.withArgs(addr)
.resolves(newConnection)

// we have an existing transient connection
const map = connectionManager.getConnectionsMap()
map.set(targetPeer, [
existingConnection
])

const conn = await connectionManager.openConnection(addr)

expect(conn).to.equal(newConnection)
})
})

0 comments on commit dd400cd

Please sign in to comment.