Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: replace p-queue with less restrictive queue #2339

Merged
merged 8 commits into from
Jan 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions packages/interface/src/errors.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,22 @@ export class CodeError<T extends Record<string, any> = Record<string, never>> ex
}
}

export class AggregateCodeError<T extends Record<string, any> = Record<string, never>> extends AggregateError {
public readonly props: T

constructor (
errors: Error[],
message: string,
public readonly code: string,
props?: T
) {
super(errors, message)

this.name = props?.name ?? 'AggregateCodeError'
this.props = props ?? {} as T // eslint-disable-line @typescript-eslint/consistent-type-assertions
}
}

export class UnexpectedPeerError extends Error {
public code: string

Expand Down
2 changes: 1 addition & 1 deletion packages/interface/src/event-target.ts
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ export class TypedEventEmitter<EventMap extends Record<string, any>> extends Eve
return result
}

safeDispatchEvent<Detail>(type: keyof EventMap, detail: CustomEventInit<Detail>): boolean {
safeDispatchEvent<Detail>(type: keyof EventMap, detail: CustomEventInit<Detail> = {}): boolean {
return this.dispatchEvent(new CustomEvent<Detail>(type as string, detail))
}
}
Expand Down
33 changes: 14 additions & 19 deletions packages/kad-dht/src/routing-table/index.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { CodeError, TypedEventEmitter } from '@libp2p/interface'
import { PeerSet } from '@libp2p/peer-collections'
import { PeerJobQueue } from '@libp2p/utils/peer-job-queue'
import { PeerQueue } from '@libp2p/utils/peer-queue'
import { pbStream } from 'it-protobuf-stream'
import { Message, MessageType } from '../message/dht.js'
import * as utils from '../utils.js'
Expand Down Expand Up @@ -44,7 +44,7 @@
export class RoutingTable extends TypedEventEmitter<RoutingTableEvents> implements Startable {
public kBucketSize: number
public kb?: KBucket
public pingQueue: PeerJobQueue
public pingQueue: PeerQueue<boolean>

private readonly log: Logger
private readonly components: RoutingTableComponents
Expand All @@ -56,8 +56,6 @@
private readonly tagValue: number
private readonly metrics?: {
routingTableSize: Metric
pingQueueSize: Metric
pingRunning: Metric
}

constructor (components: RoutingTableComponents, init: RoutingTableInit) {
Expand All @@ -75,23 +73,18 @@
this.tagName = tagName ?? KAD_CLOSE_TAG_NAME
this.tagValue = tagValue ?? KAD_CLOSE_TAG_VALUE

const updatePingQueueSizeMetric = (): void => {
this.metrics?.pingQueueSize.update(this.pingQueue.size)
this.metrics?.pingRunning.update(this.pingQueue.pending)
}

this.pingQueue = new PeerJobQueue({ concurrency: this.pingConcurrency })
this.pingQueue.addListener('add', updatePingQueueSizeMetric)
this.pingQueue.addListener('next', updatePingQueueSizeMetric)
this.pingQueue.addListener('error', err => {
this.log.error('error pinging peer', err)
this.pingQueue = new PeerQueue({
concurrency: this.pingConcurrency,
metricName: `${logPrefix.replaceAll(':', '_')}_ping_queue`,
metrics: this.components.metrics
})
this.pingQueue.addEventListener('error', evt => {
this.log.error('error pinging peer', evt.detail)

Check warning on line 82 in packages/kad-dht/src/routing-table/index.ts

View check run for this annotation

Codecov / codecov/patch

packages/kad-dht/src/routing-table/index.ts#L82

Added line #L82 was not covered by tests
})

if (this.components.metrics != null) {
this.metrics = {
routingTableSize: this.components.metrics.registerMetric(`${logPrefix.replaceAll(':', '_')}_routing_table_size`),
pingQueueSize: this.components.metrics.registerMetric(`${logPrefix.replaceAll(':', '_')}_ping_queue_size`),
pingRunning: this.components.metrics.registerMetric(`${logPrefix.replaceAll(':', '_')}_ping_running`)
routingTableSize: this.components.metrics.registerMetric(`${logPrefix.replaceAll(':', '_')}_routing_table_size`)

Check warning on line 87 in packages/kad-dht/src/routing-table/index.ts

View check run for this annotation

Codecov / codecov/patch

packages/kad-dht/src/routing-table/index.ts#L87

Added line #L87 was not covered by tests
}
}
}
Expand Down Expand Up @@ -204,8 +197,10 @@
const results = await Promise.all(
oldContacts.map(async oldContact => {
// if a previous ping wants us to ping this contact, re-use the result
if (this.pingQueue.hasJob(oldContact.peer)) {
return this.pingQueue.joinJob(oldContact.peer)
const pingJob = this.pingQueue.find(oldContact.peer)

if (pingJob != null) {
return pingJob.join()
}

return this.pingQueue.add(async () => {
Expand Down
1 change: 0 additions & 1 deletion packages/libp2p/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,6 @@
"merge-options": "^3.0.4",
"multiformats": "^13.0.0",
"p-defer": "^4.0.0",
"p-queue": "^8.0.0",
"private-ip": "^3.0.1",
"rate-limiter-flexible": "^4.0.0",
"uint8arraylist": "^2.4.3",
Expand Down
19 changes: 11 additions & 8 deletions packages/libp2p/src/connection-manager/auto-dial.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import { PeerMap, PeerSet } from '@libp2p/peer-collections'
import { PeerJobQueue } from '@libp2p/utils/peer-job-queue'
import { PeerQueue } from '@libp2p/utils/peer-queue'
import { toString as uint8ArrayToString } from 'uint8arrays/to-string'
import { AUTO_DIAL_CONCURRENCY, AUTO_DIAL_DISCOVERED_PEERS_DEBOUNCE, AUTO_DIAL_INTERVAL, AUTO_DIAL_MAX_QUEUE_LENGTH, AUTO_DIAL_PEER_RETRY_THRESHOLD, AUTO_DIAL_PRIORITY, LAST_DIAL_FAILURE_KEY, MIN_CONNECTIONS } from './constants.js'
import type { Libp2pEvents, Logger, ComponentLogger, TypedEventTarget, PeerStore, Startable } from '@libp2p/interface'
import type { Libp2pEvents, Logger, ComponentLogger, TypedEventTarget, PeerStore, Startable, Metrics } from '@libp2p/interface'
import type { ConnectionManager } from '@libp2p/interface-internal'

interface AutoDialInit {
Expand All @@ -20,6 +20,7 @@
peerStore: PeerStore
events: TypedEventTarget<Libp2pEvents>
logger: ComponentLogger
metrics?: Metrics
}

const defaultOptions = {
Expand All @@ -35,7 +36,7 @@
export class AutoDial implements Startable {
private readonly connectionManager: ConnectionManager
private readonly peerStore: PeerStore
private readonly queue: PeerJobQueue
private readonly queue: PeerQueue<void>
private readonly minConnections: number
private readonly autoDialPriority: number
private readonly autoDialIntervalMs: number
Expand Down Expand Up @@ -64,11 +65,13 @@
this.log = components.logger.forComponent('libp2p:connection-manager:auto-dial')
this.started = false
this.running = false
this.queue = new PeerJobQueue({
concurrency: init.autoDialConcurrency ?? defaultOptions.autoDialConcurrency
this.queue = new PeerQueue({
concurrency: init.autoDialConcurrency ?? defaultOptions.autoDialConcurrency,
metricName: 'libp2p_autodial_queue',
metrics: components.metrics
})
this.queue.addListener('error', (err) => {
this.log.error('error during auto-dial', err)
this.queue.addEventListener('error', (evt) => {
this.log.error('error during auto-dial', evt.detail)

Check warning on line 74 in packages/libp2p/src/connection-manager/auto-dial.ts

View check run for this annotation

Codecov / codecov/patch

packages/libp2p/src/connection-manager/auto-dial.ts#L74

Added line #L74 was not covered by tests
})

// check the min connection limit whenever a peer disconnects
Expand Down Expand Up @@ -179,7 +182,7 @@
}

// remove peers already in the autodial queue
if (this.queue.hasJob(peer.id)) {
if (this.queue.has(peer.id)) {
this.log.trace('not autodialing %p because they are already being autodialed', peer.id)
return false
}
Expand Down
Loading
Loading