Skip to content
This repository has been archived by the owner on Jul 21, 2023. It is now read-only.

Commit

Permalink
fix: use events to delay before self-query (#478)
Browse files Browse the repository at this point in the history
Instead of debouncing and using timeouts to wait for DHT peers before
running the initial self-query, instead get the routing table to emit
events when peers are added or removed - if the table is empty when
we run the self-query, wait for the `peer:add` event before
continuing.

Improves startup time.

Adds tests for the query-self component.
  • Loading branch information
achingbrain authored May 26, 2023
1 parent b70076a commit 46313a8
Show file tree
Hide file tree
Showing 6 changed files with 236 additions and 92 deletions.
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,7 @@
"it-take": "^3.0.1",
"multiformats": "^11.0.0",
"p-defer": "^4.0.0",
"p-event": "^5.0.1",
"p-queue": "^7.3.4",
"private-ip": "^3.0.0",
"progress-events": "^1.0.0",
Expand Down
10 changes: 6 additions & 4 deletions src/kad-dht.ts
Original file line number Diff line number Diff line change
Expand Up @@ -285,10 +285,11 @@ export class DefaultKadDHT extends EventEmitter<PeerDiscoveryEvents> implements
this.queryManager.start(),
this.network.start(),
this.routingTable.start(),
this.topologyListener.start(),
this.querySelf.start()
this.topologyListener.start()
])

this.querySelf.start()

await this.routingTableRefresh.start()
}

Expand All @@ -299,14 +300,15 @@ export class DefaultKadDHT extends EventEmitter<PeerDiscoveryEvents> implements
async stop (): Promise<void> {
this.running = false

this.querySelf.stop()

await Promise.all([
this.providers.stop(),
this.queryManager.stop(),
this.network.stop(),
this.routingTable.stop(),
this.routingTableRefresh.stop(),
this.topologyListener.stop(),
this.querySelf.stop()
this.topologyListener.stop()
])
}

Expand Down
148 changes: 63 additions & 85 deletions src/query-self.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@ import { anySignal } from 'any-signal'
import length from 'it-length'
import { pipe } from 'it-pipe'
import take from 'it-take'
import pDefer from 'p-defer'
import { pEvent } from 'p-event'
import { QUERY_SELF_INTERVAL, QUERY_SELF_TIMEOUT, K, QUERY_SELF_INITIAL_INTERVAL } from './constants.js'
import type { KadDHTComponents } from './index.js'
import type { PeerRouting } from './peer-routing/index.js'
import type { RoutingTable } from './routing-table/index.js'
import type { PeerId } from '@libp2p/interface-peer-id'
import type { Startable } from '@libp2p/interfaces/startable'
import type { DeferredPromise } from 'p-defer'

Expand All @@ -22,44 +24,33 @@ export interface QuerySelfInit {
initialQuerySelfHasRun: DeferredPromise<void>
}

function debounce (func: () => void, wait: number): () => void {
let timeout: ReturnType<typeof setTimeout> | undefined

return function () {
const later = function (): void {
timeout = undefined
func()
}

clearTimeout(timeout)
timeout = setTimeout(later, wait)
}
export interface QuerySelfComponents {
peerId: PeerId
}

/**
* Receives notifications of new peers joining the network that support the DHT protocol
*/
export class QuerySelf implements Startable {
private readonly log: Logger
private readonly components: KadDHTComponents
private readonly components: QuerySelfComponents
private readonly peerRouting: PeerRouting
private readonly routingTable: RoutingTable
private readonly count: number
private readonly interval: number
private readonly initialInterval: number
private readonly queryTimeout: number
private started: boolean
private running: boolean
private timeoutId?: NodeJS.Timer
private controller?: AbortController
private initialQuerySelfHasRun?: DeferredPromise<void>
private querySelfPromise?: DeferredPromise<void>

constructor (components: KadDHTComponents, init: QuerySelfInit) {
constructor (components: QuerySelfComponents, init: QuerySelfInit) {
const { peerRouting, lan, count, interval, queryTimeout, routingTable } = init

this.components = components
this.log = logger(`libp2p:kad-dht:${lan ? 'lan' : 'wan'}:query-self`)
this.running = false
this.started = false
this.peerRouting = peerRouting
this.routingTable = routingTable
Expand All @@ -68,25 +59,28 @@ export class QuerySelf implements Startable {
this.initialInterval = init.initialInterval ?? QUERY_SELF_INITIAL_INTERVAL
this.queryTimeout = queryTimeout ?? QUERY_SELF_TIMEOUT
this.initialQuerySelfHasRun = init.initialQuerySelfHasRun

this.querySelf = debounce(this.querySelf.bind(this), 100)
}

isStarted (): boolean {
return this.started
}

async start (): Promise<void> {
start (): void {
if (this.started) {
return
}

this.started = true
clearTimeout(this.timeoutId)
this.timeoutId = setTimeout(this.querySelf.bind(this), this.initialInterval)
this.timeoutId = setTimeout(() => {
this.querySelf()
.catch(err => {
this.log.error('error running self-query', err)
})
}, this.initialInterval)
}

async stop (): Promise<void> {
stop (): void {
this.started = false

if (this.timeoutId != null) {
Expand All @@ -98,84 +92,68 @@ export class QuerySelf implements Startable {
}
}

querySelf (): void {
async querySelf (): Promise<void> {
if (!this.started) {
this.log('skip self-query because we are not started')
return
}

if (this.running) {
this.log('skip self-query because we are already running, will run again in %dms', this.interval)
return
if (this.querySelfPromise != null) {
this.log('joining existing self query')
return this.querySelfPromise.promise
}

if (this.routingTable.size === 0) {
let nextInterval = this.interval
this.querySelfPromise = pDefer()

if (this.initialQuerySelfHasRun != null) {
// if we've not yet run the first self query, shorten the interval until we try again
nextInterval = this.initialInterval
}

this.log('skip self-query because routing table is empty, will run again in %dms', nextInterval)
clearTimeout(this.timeoutId)
this.timeoutId = setTimeout(this.querySelf.bind(this), nextInterval)
return
if (this.routingTable.size === 0) {
// wait to discover at least one DHT peer
await pEvent(this.routingTable, 'peer:add')
}

this.running = true
if (this.started) {
this.controller = new AbortController()
const signal = anySignal([this.controller.signal, AbortSignal.timeout(this.queryTimeout)])

Promise.resolve()
.then(async () => {
if (!this.started) {
this.log('not running self-query - node stopped before query started')
return
// this controller will get used for lots of dial attempts so make sure we don't cause warnings to be logged
try {
if (setMaxListeners != null) {
setMaxListeners(Infinity, signal)
}
} catch {} // fails on node < 15.4

this.controller = new AbortController()
const signal = anySignal([this.controller.signal, AbortSignal.timeout(this.queryTimeout)])

// this controller will get used for lots of dial attempts so make sure we don't cause warnings to be logged
try {
if (setMaxListeners != null) {
setMaxListeners(Infinity, signal)
}
} catch {} // fails on node < 15.4

try {
this.log('run self-query, look for %d peers timing out after %dms', this.count, this.queryTimeout)

const found = await pipe(
this.peerRouting.getClosestPeers(this.components.peerId.toBytes(), {
signal,
isSelfQuery: true
}),
(source) => take(source, this.count),
async (source) => length(source)
)

this.log('self-query ran successfully - found %d peers', found)

if (this.initialQuerySelfHasRun != null) {
this.initialQuerySelfHasRun.resolve()
this.initialQuerySelfHasRun = undefined
}
} catch (err: any) {
this.log.error('self-query error', err)
} finally {
signal.clear()
}
}).catch(err => {
this.log('self-query error', err)
}).finally(() => {
this.running = false
try {
this.log('run self-query, look for %d peers timing out after %dms', this.count, this.queryTimeout)

const found = await pipe(
this.peerRouting.getClosestPeers(this.components.peerId.toBytes(), {
signal,
isSelfQuery: true
}),
(source) => take(source, this.count),
async (source) => length(source)
)

clearTimeout(this.timeoutId)
this.log('self-query ran successfully - found %d peers', found)

if (this.started) {
this.log('running self-query again in %dms', this.interval)
this.timeoutId = setTimeout(this.querySelf.bind(this), this.interval)
if (this.initialQuerySelfHasRun != null) {
this.initialQuerySelfHasRun.resolve()
this.initialQuerySelfHasRun = undefined
}
})
} catch (err: any) {
this.log.error('self-query error', err)
} finally {
signal.clear()
}
}

this.querySelfPromise.resolve()
this.querySelfPromise = undefined

this.timeoutId = setTimeout(() => {
this.querySelf()
.catch(err => {
this.log.error('error running self-query', err)
})
}, this.interval)
}
}
18 changes: 15 additions & 3 deletions src/routing-table/index.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { EventEmitter } from '@libp2p/interfaces/events'
import { logger } from '@libp2p/logger'
import { PeerSet } from '@libp2p/peer-collections'
import Queue from 'p-queue'
Expand Down Expand Up @@ -33,11 +34,16 @@ export interface RoutingTableComponents {
metrics?: Metrics
}

export interface RoutingTableEvents {
'peer:add': CustomEvent<PeerId>
'peer:remove': CustomEvent<PeerId>
}

/**
* A wrapper around `k-bucket`, to provide easy store and
* retrieval for peers.
*/
export class RoutingTable implements Startable {
export class RoutingTable extends EventEmitter<RoutingTableEvents> implements Startable {
public kBucketSize: number
public kb?: KBucket
public pingQueue: Queue
Expand All @@ -58,6 +64,8 @@ export class RoutingTable implements Startable {
}

constructor (components: RoutingTableComponents, init: RoutingTableInit) {
super()

const { kBucketSize, pingTimeout, lan, pingConcurrency, protocol, tagName, tagValue } = init

this.components = components
Expand Down Expand Up @@ -160,11 +168,15 @@ export class RoutingTable implements Startable {
kClosest = newClosest
})

kBuck.addEventListener('added', () => {
kBuck.addEventListener('added', (evt) => {
updatePeerTags()

this.safeDispatchEvent('peer:add', { detail: evt.detail.peer })
})
kBuck.addEventListener('removed', () => {
kBuck.addEventListener('removed', (evt) => {
updatePeerTags()

this.safeDispatchEvent('peer:remove', { detail: evt.detail.peer })
})
}

Expand Down
Loading

0 comments on commit 46313a8

Please sign in to comment.