Skip to content

Commit

Permalink
fix: ensure dht query is aborted on early exit (#2341)
Browse files Browse the repository at this point in the history
If query results are consumed from a `for await..of`-style loop,
and that loop is exited from before the results are complete, ensure
we abort any running sub-queries.
  • Loading branch information
achingbrain authored Jan 5, 2024
1 parent ba70899 commit 388d02b
Show file tree
Hide file tree
Showing 3 changed files with 76 additions and 22 deletions.
1 change: 1 addition & 0 deletions packages/kad-dht/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@
"private-ip": "^3.0.1",
"progress-events": "^1.0.0",
"protons-runtime": "^5.0.0",
"race-signal": "^1.0.2",
"uint8-varint": "^2.0.0",
"uint8arraylist": "^2.4.3",
"uint8arrays": "^5.0.0"
Expand Down
37 changes: 24 additions & 13 deletions packages/kad-dht/src/query/manager.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import { AbortError, TypedEventEmitter, CustomEvent, setMaxListeners } from '@libp2p/interface'
import { TypedEventEmitter, CustomEvent, setMaxListeners } from '@libp2p/interface'
import { PeerSet } from '@libp2p/peer-collections'
import { anySignal } from 'any-signal'
import merge from 'it-merge'
import { raceSignal } from 'race-signal'
import { toString as uint8ArrayToString } from 'uint8arrays/to-string'
import {
ALPHA, K, DEFAULT_QUERY_TIMEOUT
Expand Down Expand Up @@ -127,7 +128,16 @@ export class QueryManager implements Startable {
}
}

const signal = anySignal([this.shutDownController.signal, options.signal])
// if the user breaks out of a for..await of loop iterating over query
// results we need to cancel any in-flight network requests
const queryEarlyExitController = new AbortController()
setMaxListeners(Infinity, queryEarlyExitController.signal)

const signal = anySignal([
this.shutDownController.signal,
queryEarlyExitController.signal,
options.signal
])

// this signal will get listened to for every invocation of queryFunc
// so make sure we don't make a lot of noise in the logs
Expand All @@ -138,19 +148,13 @@ export class QueryManager implements Startable {
// query a subset of peers up to `kBucketSize / 2` in length
const startTime = Date.now()
const cleanUp = new TypedEventEmitter<CleanUpEvents>()
let queryFinished = false

try {
if (options.isSelfQuery !== true && this.initialQuerySelfHasRun != null) {
log('waiting for initial query-self query before continuing')

await Promise.race([
new Promise((resolve, reject) => {
signal.addEventListener('abort', () => {
reject(new AbortError('Query was aborted before self-query ran'))
})
}),
this.initialQuerySelfHasRun.promise
])
await raceSignal(this.initialQuerySelfHasRun.promise, signal)

this.initialQuerySelfHasRun = undefined
}
Expand Down Expand Up @@ -192,19 +196,26 @@ export class QueryManager implements Startable {

// Execute the query along each disjoint path and yield their results as they become available
for await (const event of merge(...paths)) {
yield event

if (event.name === 'QUERY_ERROR') {
log('error', event.error)
log.error('query error', event.error)
}

yield event
}

queryFinished = true
} catch (err: any) {
if (!this.running && err.code === 'ERR_QUERY_ABORTED') {
// ignore query aborted errors that were thrown during query manager shutdown
} else {
throw err
}
} finally {
if (!queryFinished) {
log('query exited early')
queryEarlyExitController.abort()
}

signal.clear()

this.queries--
Expand Down
60 changes: 51 additions & 9 deletions packages/kad-dht/test/query.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import { QueryManager, type QueryManagerInit } from '../src/query/manager.js'
import { convertBuffer } from '../src/utils.js'
import { createPeerId, createPeerIds } from './utils/create-peer-id.js'
import { sortClosestPeers } from './utils/sort-closest-peers.js'
import type { QueryFunc } from '../src/query/types.js'
import type { QueryContext, QueryFunc } from '../src/query/types.js'
import type { RoutingTable } from '../src/routing-table/index.js'
import type { PeerId } from '@libp2p/interface'

Expand All @@ -29,12 +29,9 @@ interface TopologyEntry {
value?: Uint8Array
closerPeers?: number[]
event: QueryEvent
context?: QueryContext
}
type Topology = Record<string, {
delay?: number | undefined
error?: Error | undefined
event: QueryEvent
}>
type Topology = Record<string, TopologyEntry>

describe('QueryManager', () => {
let ourPeerId: PeerId
Expand All @@ -55,7 +52,7 @@ describe('QueryManager', () => {
}

function createTopology (opts: Record<number, { delay?: number, error?: Error, value?: Uint8Array, closerPeers?: number[] }>): Topology {
const topology: Record<string, { delay?: number, error?: Error, event: QueryEvent }> = {}
const topology: Topology = {}

Object.keys(opts).forEach(key => {
const id = parseInt(key)
Expand Down Expand Up @@ -94,9 +91,12 @@ describe('QueryManager', () => {
return topology
}

function createQueryFunction (topology: Record<string, { delay?: number, event: QueryEvent }>): QueryFunc {
const queryFunc: QueryFunc = async function * ({ peer }) {
function createQueryFunction (topology: Topology): QueryFunc {
const queryFunc: QueryFunc = async function * (context) {
const { peer } = context

const res = topology[peer.toString()]
res.context = context

if (res.delay != null) {
await delay(res.delay)
Expand Down Expand Up @@ -870,4 +870,46 @@ describe('QueryManager', () => {

await manager.stop()
})

it('should abort the query if we break out of the loop early', async () => {
const manager = new QueryManager({
peerId: ourPeerId,
logger: defaultLogger()
}, {
...defaultInit(),
disjointPaths: 2
})
await manager.start()

// 1 -> 0 [pathComplete]
// 4 -> 3 [delay] -> 2 [pathComplete]
const topology = createTopology({
// quick value path
0: { delay: 10, value: uint8ArrayFromString('true') },
1: { closerPeers: [0] },
// slow value path
2: { value: uint8ArrayFromString('true') },
3: { delay: 1000, closerPeers: [2] },
4: { closerPeers: [3] }
})

routingTable.closestPeers.returns([peers[1], peers[4]])

for await (const event of manager.run(key, createQueryFunction(topology))) {
if (event.name === 'VALUE') {
expect(event.from.toString()).to.equal(peers[0].toString())

// break out of loop early
break
}
}

// should have aborted query on slow path
expect(topology[peers[3].toString()]).to.have.nested.property('context.signal.aborted', true)

// should not have visited the next peer on the slow path
expect(topology[peers[4].toString()]).to.not.have.property('context', true)

await manager.stop()
})
})

0 comments on commit 388d02b

Please sign in to comment.