Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: listen on circuit relay addresses that contain unsupported segments #1732

Merged
merged 4 commits into from
May 5, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions src/circuit-relay/server/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,10 @@ class CircuitRelayServer extends EventEmitter<RelayServerEvents> implements Star
const addrs = []

for (const relayAddr of this.addressManager.getAddresses()) {
if (relayAddr.toString().includes('/p2p-circuit')) {
continue
}

addrs.push(relayAddr.bytes)
}

Expand Down
4 changes: 2 additions & 2 deletions src/circuit-relay/transport/discovery.ts
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ export class RelayDiscovery extends EventEmitter<RelayDiscoveryEvents> implement
log('searching peer store for relays')
const peers = (await this.peerStore.all())
// filter by a list of peers supporting RELAY_V2_HOP and ones we are not listening on
.filter(({ id, protocols }) => protocols.includes(RELAY_V2_HOP_CODEC))
.filter(({ protocols }) => protocols.includes(RELAY_V2_HOP_CODEC))
.sort(() => Math.random() - 0.5)

for (const peer of peers) {
Expand All @@ -112,11 +112,11 @@ export class RelayDiscovery extends EventEmitter<RelayDiscoveryEvents> implement
const peerId = provider.id

found++
log('found relay peer %p in content routing', peerId)
await this.peerStore.merge(peerId, {
multiaddrs: provider.multiaddrs
})

log('found relay peer %p in content routing', peerId)
this.safeDispatchEvent('relay:discover', { detail: peerId })
}
}
Expand Down
11 changes: 4 additions & 7 deletions src/circuit-relay/transport/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,6 @@ class CircuitRelayTransport implements Transport {
private readonly upgrader: Upgrader
private readonly addressManager: AddressManager
private readonly connectionGater: ConnectionGater
private readonly events: EventEmitter<Libp2pEvents>
private readonly reservationStore: ReservationStore
private started: boolean

Expand All @@ -96,7 +95,6 @@ class CircuitRelayTransport implements Transport {
this.upgrader = components.upgrader
this.addressManager = components.addressManager
this.connectionGater = components.connectionGater
this.events = components.events

if (init.discoverRelays != null && init.discoverRelays > 0) {
this.discovery = new RelayDiscovery(components)
Expand Down Expand Up @@ -170,7 +168,7 @@ class CircuitRelayTransport implements Transport {
const destinationId = destinationAddr.getPeerId()

if (relayId == null || destinationId == null) {
const errMsg = 'Circuit relay dial failed as addresses did not have peer id'
const errMsg = `Circuit relay dial to ${ma.toString()} failed as address did not have peer ids`
log.error(errMsg)
throw new CodeError(errMsg, codes.ERR_RELAYED_DIAL)
}
Expand Down Expand Up @@ -203,7 +201,7 @@ class CircuitRelayTransport implements Transport {
disconnectOnFailure
})
} catch (err: any) {
log.error('Circuit relay dial failed', err)
log.error(`Circuit relay dial to destination ${destinationPeer.toString()} via relay ${relayPeer.toString()} failed`, err)
disconnectOnFailure && await relayConnection.close()
throw err
}
Expand Down Expand Up @@ -244,7 +242,7 @@ class CircuitRelayTransport implements Transport {
log('new outbound connection %s', maConn.remoteAddr)
return await this.upgrader.upgradeOutbound(maConn)
} catch (err) {
log.error('Circuit relay dial failed', err)
log.error(`Circuit relay dial to destination ${destinationPeer.toString()} via relay ${connection.remotePeer.toString()} failed`, err)
disconnectOnFailure && await connection.close()
throw err
}
Expand All @@ -256,8 +254,7 @@ class CircuitRelayTransport implements Transport {
createListener (options: CreateListenerOptions): Listener {
return createListener({
connectionManager: this.connectionManager,
relayStore: this.reservationStore,
events: this.events
relayStore: this.reservationStore
})
}

Expand Down
64 changes: 35 additions & 29 deletions src/circuit-relay/transport/listener.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,79 +7,85 @@ import type { ReservationStore } from './reservation-store.js'
import type { PeerId } from '@libp2p/interface-peer-id'
import { PeerMap } from '@libp2p/peer-collections'
import { logger } from '@libp2p/logger'
import type { Libp2pEvents } from '@libp2p/interface-libp2p'
import { peerIdFromString } from '@libp2p/peer-id'
import type { Connection } from '@libp2p/interface-connection'
import { CodeError } from '@libp2p/interfaces/errors'

const log = logger('libp2p:circuit-relay:transport:listener')

export interface CircuitRelayTransportListenerComponents {
connectionManager: ConnectionManager
relayStore: ReservationStore
events: EventEmitter<Libp2pEvents>
}

class CircuitRelayTransportListener extends EventEmitter<ListenerEvents> implements Listener {
private readonly connectionManager: ConnectionManager
private readonly relayStore: ReservationStore
private readonly listeningAddrs: PeerMap<Multiaddr>
private readonly events: EventEmitter<Libp2pEvents>
private readonly listeningAddrs: PeerMap<Multiaddr[]>

constructor (components: CircuitRelayTransportListenerComponents) {
super()

this.connectionManager = components.connectionManager
this.relayStore = components.relayStore
this.events = components.events
this.listeningAddrs = new PeerMap()

// remove listening addrs when a relay is removed
this.relayStore.addEventListener('relay:removed', (evt) => {
this.#removeRelayPeer(evt.detail)
})

// remove listening addrs when a peer disconnects
this.events.addEventListener('connection:close', (evt) => {
this.#removeRelayPeer(evt.detail.remotePeer)
})
}

async listen (addr: Multiaddr): Promise<void> {
log('listen on %s', addr)

const addrString = addr.toString().split('/p2p-circuit').find(a => a !== '')
const ma = multiaddr(addrString)
const relayConn = await this.connectionManager.openConnection(ma)
const relayPeerStr = addr.getPeerId()
let relayConn: Connection | undefined

// check if we already have a connection to the relay
if (relayPeerStr != null) {
const relayPeer = peerIdFromString(relayPeerStr)
const connections = this.connectionManager.getConnectionsMap().get(relayPeer) ?? []

if (connections.length > 0) {
relayConn = connections[0]
}
}

// open a new connection as we don't already have one
if (relayConn == null) {
const addrString = addr.toString().split('/p2p-circuit').find(a => a !== '')
const ma = multiaddr(addrString)
relayConn = await this.connectionManager.openConnection(ma)
}

if (!this.relayStore.hasReservation(relayConn.remotePeer)) {
// addRelay calls transportManager.listen which calls this listen method
await this.relayStore.addRelay(relayConn.remotePeer, 'configured')
return
}

const reservation = this.relayStore.getReservation(relayConn.remotePeer)

if (reservation == null) {
throw new CodeError('Did not have reservation after making reservation', 'ERR_NO_RESERVATION')
}

if (this.listeningAddrs.has(relayConn.remotePeer)) {
log('already listening on relay %p', relayConn.remotePeer)
return
}

this.listeningAddrs.set(relayConn.remotePeer, addr)
// add all addresses from the relay reservation
this.listeningAddrs.set(relayConn.remotePeer, reservation.addrs.map(buf => {
return multiaddr(buf).encapsulate('/p2p-circuit')
}))

this.safeDispatchEvent('listening', {})
}

/**
* Get fixed up multiaddrs
*
* NOTE: This method will grab the peers multiaddrs and expand them such that:
*
* a) If it's an existing /p2p-circuit address for a specific relay i.e.
* `/ip4/0.0.0.0/tcp/0/ipfs/QmRelay/p2p-circuit` this method will expand the
* address to `/ip4/0.0.0.0/tcp/0/ipfs/QmRelay/p2p-circuit/ipfs/QmPeer` where
* `QmPeer` is this peers id
* b) If it's not a /p2p-circuit address, it will encapsulate the address as a /p2p-circuit
* addr, such when dialing over a relay with this address, it will create the circuit using
* the encapsulated transport address. This is useful when for example, a peer should only
* be dialed over TCP rather than any other transport
*/
getAddrs (): Multiaddr[] {
return [...this.listeningAddrs.values()]
return [...this.listeningAddrs.values()].flat()
}

async close (): Promise<void> {
Expand Down
36 changes: 21 additions & 15 deletions src/circuit-relay/transport/reservation-store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,8 @@ export class ReservationStore extends EventEmitter<ReservationStoreEvents> imple
// When a peer disconnects, if we had a reservation on that peer
// remove the reservation and multiaddr and maybe trigger search
// for new relays
this.events.addEventListener('connection:close', (evt) => {
this.removeRelay(evt.detail.remotePeer)
this.events.addEventListener('peer:disconnect', (evt) => {
this.#removeRelay(evt.detail)
})
}

Expand Down Expand Up @@ -125,15 +125,15 @@ export class ReservationStore extends EventEmitter<ReservationStoreEvents> imple
* on the remote peer
*/
async addRelay (peerId: PeerId, type: RelayType): Promise<void> {
log('add relay', this.reserveQueue.size)
if (this.peerId.equals(peerId)) {
log('not trying to use self as relay')
return
}

log('add relay %p', peerId)

await this.reserveQueue.add(async () => {
try {
if (this.peerId.equals(peerId)) {
log('not trying to use self as relay')
return
}

// allow refresh of an existing reservation if it is about to expire
const existingReservation = this.reservations.get(peerId)

Expand Down Expand Up @@ -181,6 +181,7 @@ export class ReservationStore extends EventEmitter<ReservationStoreEvents> imple
})
}, timeoutDuration)

// we've managed to create a reservation successfully
this.reservations.set(peerId, {
timeout,
reservation,
Expand All @@ -197,13 +198,13 @@ export class ReservationStore extends EventEmitter<ReservationStoreEvents> imple
}
})

await this.transportManager.listen(
reservation.addrs.map(ma => {
return multiaddr(ma).encapsulate('/p2p-circuit')
})
)
// listen on multiaddr that only the circuit transport is listening for
await this.transportManager.listen([multiaddr(`/p2p/${peerId.toString()}/p2p-circuit`)])
} catch (err) {
log.error('could not reserve slot on %p', peerId, err)

// if listening failed, remove the reservation
this.reservations.delete(peerId)
}
})
}
Expand All @@ -212,6 +213,10 @@ export class ReservationStore extends EventEmitter<ReservationStoreEvents> imple
return this.reservations.has(peerId)
}

getReservation (peerId: PeerId): Reservation | undefined {
return this.reservations.get(peerId)?.reservation
}

async #createReservation (connection: Connection): Promise<Reservation> {
log('requesting reservation from %s', connection.remotePeer)
const stream = await connection.newStream(RELAY_V2_HOP_CODEC)
Expand Down Expand Up @@ -243,21 +248,22 @@ export class ReservationStore extends EventEmitter<ReservationStoreEvents> imple
/**
* Remove listen relay
*/
removeRelay (peerId: PeerId): void {
#removeRelay (peerId: PeerId): void {
const existingReservation = this.reservations.get(peerId)

if (existingReservation == null) {
return
}

log('removing relay %p', peerId)
log('connection to relay %p closed, removing reservation from local store', peerId)

clearTimeout(existingReservation.timeout)
this.reservations.delete(peerId)

this.safeDispatchEvent('relay:removed', { detail: peerId })

if (this.reservations.size < this.maxDiscoveredRelays) {
log('not enough relays %d/%d', this.reservations.size, this.maxDiscoveredRelays)
this.safeDispatchEvent('relay:not-enough-relays', {})
}
}
Expand Down