Skip to content

Commit

Permalink
fix: listen on circuit relay addresses that contain unsupported segme…
Browse files Browse the repository at this point in the history
…nts (#1732)

Fixes a bug whereby receiving a circuit relay address would cause js-libp2p to try to listen on the non-relayed parts of the address.  For example if a relay listening on a WebTransport multiaddr was dialled, js-libp2p would then try to listen on the same address due to the relay reservation store calling `transportManager.listen` with the full address.  This would then fail and the relaying would not succeed.

The fix is to call `transportManager.listen` with a multiaddr that just has the peer id and the `p2p-circuit` segments so only the relay transport tries to use the address.

For safety if listening on an address or writing to the peer store fails while creating a relay reservation, remove the reservation from the reservation store to allow us to try again.

Also updates logging to make what is going on a bit clearer.

Closes #1690

---------

Co-authored-by: Chad Nehemiah <[email protected]>
  • Loading branch information
achingbrain and maschad authored May 5, 2023
1 parent 0d46c78 commit 947639f
Show file tree
Hide file tree
Showing 5 changed files with 66 additions and 53 deletions.
4 changes: 4 additions & 0 deletions src/circuit-relay/server/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,10 @@ class CircuitRelayServer extends EventEmitter<RelayServerEvents> implements Star
const addrs = []

for (const relayAddr of this.addressManager.getAddresses()) {
if (relayAddr.toString().includes('/p2p-circuit')) {
continue
}

addrs.push(relayAddr.bytes)
}

Expand Down
4 changes: 2 additions & 2 deletions src/circuit-relay/transport/discovery.ts
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ export class RelayDiscovery extends EventEmitter<RelayDiscoveryEvents> implement
log('searching peer store for relays')
const peers = (await this.peerStore.all())
// filter by a list of peers supporting RELAY_V2_HOP and ones we are not listening on
.filter(({ id, protocols }) => protocols.includes(RELAY_V2_HOP_CODEC))
.filter(({ protocols }) => protocols.includes(RELAY_V2_HOP_CODEC))
.sort(() => Math.random() - 0.5)

for (const peer of peers) {
Expand All @@ -112,11 +112,11 @@ export class RelayDiscovery extends EventEmitter<RelayDiscoveryEvents> implement
const peerId = provider.id

found++
log('found relay peer %p in content routing', peerId)
await this.peerStore.merge(peerId, {
multiaddrs: provider.multiaddrs
})

log('found relay peer %p in content routing', peerId)
this.safeDispatchEvent('relay:discover', { detail: peerId })
}
}
Expand Down
11 changes: 4 additions & 7 deletions src/circuit-relay/transport/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,6 @@ class CircuitRelayTransport implements Transport {
private readonly upgrader: Upgrader
private readonly addressManager: AddressManager
private readonly connectionGater: ConnectionGater
private readonly events: EventEmitter<Libp2pEvents>
private readonly reservationStore: ReservationStore
private started: boolean

Expand All @@ -96,7 +95,6 @@ class CircuitRelayTransport implements Transport {
this.upgrader = components.upgrader
this.addressManager = components.addressManager
this.connectionGater = components.connectionGater
this.events = components.events

if (init.discoverRelays != null && init.discoverRelays > 0) {
this.discovery = new RelayDiscovery(components)
Expand Down Expand Up @@ -170,7 +168,7 @@ class CircuitRelayTransport implements Transport {
const destinationId = destinationAddr.getPeerId()

if (relayId == null || destinationId == null) {
const errMsg = 'Circuit relay dial failed as addresses did not have peer id'
const errMsg = `Circuit relay dial to ${ma.toString()} failed as address did not have peer ids`
log.error(errMsg)
throw new CodeError(errMsg, codes.ERR_RELAYED_DIAL)
}
Expand Down Expand Up @@ -203,7 +201,7 @@ class CircuitRelayTransport implements Transport {
disconnectOnFailure
})
} catch (err: any) {
log.error('Circuit relay dial failed', err)
log.error(`Circuit relay dial to destination ${destinationPeer.toString()} via relay ${relayPeer.toString()} failed`, err)
disconnectOnFailure && await relayConnection.close()
throw err
}
Expand Down Expand Up @@ -244,7 +242,7 @@ class CircuitRelayTransport implements Transport {
log('new outbound connection %s', maConn.remoteAddr)
return await this.upgrader.upgradeOutbound(maConn)
} catch (err) {
log.error('Circuit relay dial failed', err)
log.error(`Circuit relay dial to destination ${destinationPeer.toString()} via relay ${connection.remotePeer.toString()} failed`, err)
disconnectOnFailure && await connection.close()
throw err
}
Expand All @@ -256,8 +254,7 @@ class CircuitRelayTransport implements Transport {
createListener (options: CreateListenerOptions): Listener {
return createListener({
connectionManager: this.connectionManager,
relayStore: this.reservationStore,
events: this.events
relayStore: this.reservationStore
})
}

Expand Down
64 changes: 35 additions & 29 deletions src/circuit-relay/transport/listener.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,79 +7,85 @@ import type { ReservationStore } from './reservation-store.js'
import type { PeerId } from '@libp2p/interface-peer-id'
import { PeerMap } from '@libp2p/peer-collections'
import { logger } from '@libp2p/logger'
import type { Libp2pEvents } from '@libp2p/interface-libp2p'
import { peerIdFromString } from '@libp2p/peer-id'
import type { Connection } from '@libp2p/interface-connection'
import { CodeError } from '@libp2p/interfaces/errors'

const log = logger('libp2p:circuit-relay:transport:listener')

export interface CircuitRelayTransportListenerComponents {
connectionManager: ConnectionManager
relayStore: ReservationStore
events: EventEmitter<Libp2pEvents>
}

class CircuitRelayTransportListener extends EventEmitter<ListenerEvents> implements Listener {
private readonly connectionManager: ConnectionManager
private readonly relayStore: ReservationStore
private readonly listeningAddrs: PeerMap<Multiaddr>
private readonly events: EventEmitter<Libp2pEvents>
private readonly listeningAddrs: PeerMap<Multiaddr[]>

constructor (components: CircuitRelayTransportListenerComponents) {
super()

this.connectionManager = components.connectionManager
this.relayStore = components.relayStore
this.events = components.events
this.listeningAddrs = new PeerMap()

// remove listening addrs when a relay is removed
this.relayStore.addEventListener('relay:removed', (evt) => {
this.#removeRelayPeer(evt.detail)
})

// remove listening addrs when a peer disconnects
this.events.addEventListener('connection:close', (evt) => {
this.#removeRelayPeer(evt.detail.remotePeer)
})
}

async listen (addr: Multiaddr): Promise<void> {
log('listen on %s', addr)

const addrString = addr.toString().split('/p2p-circuit').find(a => a !== '')
const ma = multiaddr(addrString)
const relayConn = await this.connectionManager.openConnection(ma)
const relayPeerStr = addr.getPeerId()
let relayConn: Connection | undefined

// check if we already have a connection to the relay
if (relayPeerStr != null) {
const relayPeer = peerIdFromString(relayPeerStr)
const connections = this.connectionManager.getConnectionsMap().get(relayPeer) ?? []

if (connections.length > 0) {
relayConn = connections[0]
}
}

// open a new connection as we don't already have one
if (relayConn == null) {
const addrString = addr.toString().split('/p2p-circuit').find(a => a !== '')
const ma = multiaddr(addrString)
relayConn = await this.connectionManager.openConnection(ma)
}

if (!this.relayStore.hasReservation(relayConn.remotePeer)) {
// addRelay calls transportManager.listen which calls this listen method
await this.relayStore.addRelay(relayConn.remotePeer, 'configured')
return
}

const reservation = this.relayStore.getReservation(relayConn.remotePeer)

if (reservation == null) {
throw new CodeError('Did not have reservation after making reservation', 'ERR_NO_RESERVATION')
}

if (this.listeningAddrs.has(relayConn.remotePeer)) {
log('already listening on relay %p', relayConn.remotePeer)
return
}

this.listeningAddrs.set(relayConn.remotePeer, addr)
// add all addresses from the relay reservation
this.listeningAddrs.set(relayConn.remotePeer, reservation.addrs.map(buf => {
return multiaddr(buf).encapsulate('/p2p-circuit')
}))

this.safeDispatchEvent('listening', {})
}

/**
* Get fixed up multiaddrs
*
* NOTE: This method will grab the peers multiaddrs and expand them such that:
*
* a) If it's an existing /p2p-circuit address for a specific relay i.e.
* `/ip4/0.0.0.0/tcp/0/ipfs/QmRelay/p2p-circuit` this method will expand the
* address to `/ip4/0.0.0.0/tcp/0/ipfs/QmRelay/p2p-circuit/ipfs/QmPeer` where
* `QmPeer` is this peers id
* b) If it's not a /p2p-circuit address, it will encapsulate the address as a /p2p-circuit
* addr, such when dialing over a relay with this address, it will create the circuit using
* the encapsulated transport address. This is useful when for example, a peer should only
* be dialed over TCP rather than any other transport
*/
getAddrs (): Multiaddr[] {
return [...this.listeningAddrs.values()]
return [...this.listeningAddrs.values()].flat()
}

async close (): Promise<void> {
Expand Down
36 changes: 21 additions & 15 deletions src/circuit-relay/transport/reservation-store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,8 @@ export class ReservationStore extends EventEmitter<ReservationStoreEvents> imple
// When a peer disconnects, if we had a reservation on that peer
// remove the reservation and multiaddr and maybe trigger search
// for new relays
this.events.addEventListener('connection:close', (evt) => {
this.removeRelay(evt.detail.remotePeer)
this.events.addEventListener('peer:disconnect', (evt) => {
this.#removeRelay(evt.detail)
})
}

Expand Down Expand Up @@ -125,15 +125,15 @@ export class ReservationStore extends EventEmitter<ReservationStoreEvents> imple
* on the remote peer
*/
async addRelay (peerId: PeerId, type: RelayType): Promise<void> {
log('add relay', this.reserveQueue.size)
if (this.peerId.equals(peerId)) {
log('not trying to use self as relay')
return
}

log('add relay %p', peerId)

await this.reserveQueue.add(async () => {
try {
if (this.peerId.equals(peerId)) {
log('not trying to use self as relay')
return
}

// allow refresh of an existing reservation if it is about to expire
const existingReservation = this.reservations.get(peerId)

Expand Down Expand Up @@ -181,6 +181,7 @@ export class ReservationStore extends EventEmitter<ReservationStoreEvents> imple
})
}, timeoutDuration)

// we've managed to create a reservation successfully
this.reservations.set(peerId, {
timeout,
reservation,
Expand All @@ -197,13 +198,13 @@ export class ReservationStore extends EventEmitter<ReservationStoreEvents> imple
}
})

await this.transportManager.listen(
reservation.addrs.map(ma => {
return multiaddr(ma).encapsulate('/p2p-circuit')
})
)
// listen on multiaddr that only the circuit transport is listening for
await this.transportManager.listen([multiaddr(`/p2p/${peerId.toString()}/p2p-circuit`)])
} catch (err) {
log.error('could not reserve slot on %p', peerId, err)

// if listening failed, remove the reservation
this.reservations.delete(peerId)
}
})
}
Expand All @@ -212,6 +213,10 @@ export class ReservationStore extends EventEmitter<ReservationStoreEvents> imple
return this.reservations.has(peerId)
}

getReservation (peerId: PeerId): Reservation | undefined {
return this.reservations.get(peerId)?.reservation
}

async #createReservation (connection: Connection): Promise<Reservation> {
log('requesting reservation from %s', connection.remotePeer)
const stream = await connection.newStream(RELAY_V2_HOP_CODEC)
Expand Down Expand Up @@ -243,21 +248,22 @@ export class ReservationStore extends EventEmitter<ReservationStoreEvents> imple
/**
* Remove listen relay
*/
removeRelay (peerId: PeerId): void {
#removeRelay (peerId: PeerId): void {
const existingReservation = this.reservations.get(peerId)

if (existingReservation == null) {
return
}

log('removing relay %p', peerId)
log('connection to relay %p closed, removing reservation from local store', peerId)

clearTimeout(existingReservation.timeout)
this.reservations.delete(peerId)

this.safeDispatchEvent('relay:removed', { detail: peerId })

if (this.reservations.size < this.maxDiscoveredRelays) {
log('not enough relays %d/%d', this.reservations.size, this.maxDiscoveredRelays)
this.safeDispatchEvent('relay:not-enough-relays', {})
}
}
Expand Down

0 comments on commit 947639f

Please sign in to comment.