Skip to content
This repository has been archived by the owner on Jul 21, 2023. It is now read-only.

fix: re-enable ensuring queries run along disjoint paths #371

Merged
merged 9 commits into from
Sep 29, 2022
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions src/peer-routing/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -235,8 +235,6 @@ export class PeerRouting implements Initializable {
}

for await (const event of this.queryManager.run(key, tablePeers, getCloserPeersQuery, options)) {
yield event
Copy link
Member

@achingbrain achingbrain Sep 12, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why has this been removed? Yielding intermediate events is useful for query diagnostics and tracing.

Copy link
Contributor Author

@zeroxbt zeroxbt Sep 12, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From my understanding, getClosestPeers should return only kBucketSize closest peers. This implementation currently also returns multiple "in between" peers instead coming from events other than "FINAL_PEER".
I removed this so that only final peers are returned. It could be there is a misunderstanding on my part. If that's the case, I can add it back.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would expect this function to yield the kBucketSize sorted peers, but it's yielding a lot more than expected (not all are FINAL_PEER), and not necessarily sorted (because it merges events yielded by both lan and wan dhts).


if (event.name === 'PEER_RESPONSE') {
await Promise.all(event.closer.map(async peerData => await peers.add(peerData.id)))
}
Expand Down
6 changes: 5 additions & 1 deletion src/query/manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,9 @@ export class QueryManager implements Startable, Initializable {
return
}

// make sure we don't get trapped in a loop
const peersSeen = new Set()

// Create query paths from the starting peers
const paths = peersToQuery.map((peer, index) => {
return queryPath({
Expand All @@ -152,7 +155,8 @@ export class QueryManager implements Startable, Initializable {
alpha: this.alpha,
cleanUp,
queryFuncTimeout: options.queryFuncTimeout,
log
log,
peersSeen
})
})

Expand Down
10 changes: 6 additions & 4 deletions src/query/query-path.ts
Original file line number Diff line number Diff line change
Expand Up @@ -71,14 +71,19 @@ export interface QueryPathOptions {
* Query log
*/
log: Logger

/**
* Set of peers seen by this and other paths
*/
peersSeen: Set<PeerId>
}

/**
* Walks a path through the DHT, calling the passed query function for
* every peer encountered that we have not seen before
*/
export async function * queryPath (options: QueryPathOptions) {
const { key, startingPeer, ourPeerId, signal, query, alpha, pathIndex, numPaths, cleanUp, queryFuncTimeout, log } = options
const { key, startingPeer, ourPeerId, signal, query, alpha, pathIndex, numPaths, cleanUp, queryFuncTimeout, log, peersSeen } = options
// Only ALPHA node/value lookups are allowed at any given time for each process
// https://github.com/libp2p/specs/tree/master/kad-dht#alpha-concurrency-parameter-%CE%B1
const queue = new Queue({
Expand All @@ -88,9 +93,6 @@ export async function * queryPath (options: QueryPathOptions) {
// perform lookups on kadId, not the actual value
const kadId = await convertBuffer(key)

// make sure we don't get trapped in a loop
const peersSeen = new Set()

/**
* Adds the passed peer to the query queue if it's not us and no
* other path has passed through this peer
Expand Down