Skip to content
This repository has been archived by the owner on Jul 21, 2023. It is now read-only.

fix: re-enable ensuring queries run along disjoint paths #371

Merged
merged 9 commits into from
Sep 29, 2022
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@
"@libp2p/interface-registrar": "^2.0.3",
"@libp2p/interfaces": "^3.0.3",
"@libp2p/logger": "^2.0.1",
"@libp2p/peer-collections": "^2.2.0",
"@libp2p/peer-id": "^1.1.15",
"@libp2p/record": "^2.0.2",
"@libp2p/topology": "^3.0.0",
Expand Down
7 changes: 6 additions & 1 deletion src/query/manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import type { Startable } from '@libp2p/interfaces/startable'
import type { QueryFunc } from './types.js'
import type { QueryOptions } from '@libp2p/interface-dht'
import { Components, Initializable } from '@libp2p/components'
import { PeerSet } from '@libp2p/peer-collections'

const METRIC_RUNNING_QUERIES = 'running-queries'

Expand Down Expand Up @@ -139,6 +140,9 @@ export class QueryManager implements Startable, Initializable {
return
}

// make sure we don't get trapped in a loop
const peersSeen = new PeerSet
achingbrain marked this conversation as resolved.
Show resolved Hide resolved

// Create query paths from the starting peers
const paths = peersToQuery.map((peer, index) => {
return queryPath({
Expand All @@ -152,7 +156,8 @@ export class QueryManager implements Startable, Initializable {
alpha: this.alpha,
cleanUp,
queryFuncTimeout: options.queryFuncTimeout,
log
log,
peersSeen
})
})

Expand Down
15 changes: 9 additions & 6 deletions src/query/query-path.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import type { CleanUpEvents } from './manager.js'
import type { Logger } from '@libp2p/logger'
import type { QueryFunc } from '../query/types.js'
import type { QueryEvent } from '@libp2p/interface-dht'
import type { PeerSet } from '@libp2p/peer-collections'

const MAX_XOR = BigInt('0xFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF')

Expand Down Expand Up @@ -71,14 +72,19 @@ export interface QueryPathOptions {
* Query log
*/
log: Logger

/**
* Set of peers seen by this and other paths
*/
peersSeen: PeerSet
}

/**
* Walks a path through the DHT, calling the passed query function for
* every peer encountered that we have not seen before
*/
export async function * queryPath (options: QueryPathOptions) {
const { key, startingPeer, ourPeerId, signal, query, alpha, pathIndex, numPaths, cleanUp, queryFuncTimeout, log } = options
const { key, startingPeer, ourPeerId, signal, query, alpha, pathIndex, numPaths, cleanUp, queryFuncTimeout, log, peersSeen } = options
// Only ALPHA node/value lookups are allowed at any given time for each process
// https://github.com/libp2p/specs/tree/master/kad-dht#alpha-concurrency-parameter-%CE%B1
const queue = new Queue({
Expand All @@ -88,9 +94,6 @@ export async function * queryPath (options: QueryPathOptions) {
// perform lookups on kadId, not the actual value
const kadId = await convertBuffer(key)

// make sure we don't get trapped in a loop
const peersSeen = new Set()

/**
* Adds the passed peer to the query queue if it's not us and no
* other path has passed through this peer
Expand All @@ -100,7 +103,7 @@ export async function * queryPath (options: QueryPathOptions) {
return
}

peersSeen.add(peer.toString())
peersSeen.add(peer)

const peerXor = BigInt('0x' + toString(xor(peerKadId, kadId), 'base16'))

Expand Down Expand Up @@ -130,7 +133,7 @@ export async function * queryPath (options: QueryPathOptions) {
// if there are closer peers and the query has not completed, continue the query
if (event.name === 'PEER_RESPONSE') {
for (const closerPeer of event.closer) {
if (peersSeen.has(closerPeer.id.toString())) { // eslint-disable-line max-depth
if (peersSeen.has(closerPeer.id)) { // eslint-disable-line max-depth
log('already seen %p in query', closerPeer.id)
continue
}
Expand Down
32 changes: 32 additions & 0 deletions test/query.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -427,6 +427,38 @@ describe('QueryManager', () => {
await manager.stop()
})

it('should stop when passing through the same node twice', async () => {
const manager = new QueryManager({ disjointPaths: 20, alpha: 1 })
manager.init(new Components({
peerId: ourPeerId
}))
await manager.start()

const topology = createTopology({
6: { closerPeers: [2] },
5: { closerPeers: [4] },
4: { closerPeers: [3] },
3: { closerPeers: [2] },
2: { closerPeers: [1] },
1: { closerPeers: [0] },
0: { value: uint8ArrayFromString('hello world') }
})

const results = await all(manager.run(key, [peers[6], peers[5]], createQueryFunction(topology)))
const traversedPeers = results
.map(event => {
if (event.type !== EventTypes.PEER_RESPONSE && event.type !== EventTypes.VALUE) {
throw new Error(`Unexpected query event type ${event.type}`)
}

return event.from
})

expect(traversedPeers).lengthOf(7)

await manager.stop()
})

it('only closerPeers', async () => {
const manager = new QueryManager({ disjointPaths: 1, alpha: 1 })
manager.init(new Components({
Expand Down