From 2d57ae308085059a8cc7636147caa0b87202c016 Mon Sep 17 00:00:00 2001 From: chad Date: Thu, 9 Feb 2023 13:07:30 -0500 Subject: [PATCH 1/5] chore: linting src files --- src/content-fetching/index.ts | 18 +++--- src/content-routing/index.ts | 8 +-- src/dual-kad-dht.ts | 28 ++++----- src/kad-dht.ts | 32 +++++----- src/message/dht.ts | 4 +- src/message/index.ts | 10 +-- src/network.ts | 15 ++--- src/peer-list/index.ts | 10 +-- src/peer-list/peer-distance-list.ts | 8 +-- src/peer-routing/index.ts | 20 +++--- src/providers.ts | 38 +++++++----- src/query-self.ts | 8 +-- src/query/manager.ts | 10 +-- src/query/query-path.ts | 95 ++++++++++++----------------- src/routing-table/index.ts | 20 +++--- src/routing-table/refresh.ts | 21 ++++--- src/rpc/handlers/add-provider.ts | 2 +- src/rpc/handlers/find-node.ts | 2 +- src/rpc/handlers/get-providers.ts | 7 ++- src/rpc/handlers/get-value.ts | 6 +- src/rpc/handlers/ping.ts | 2 +- src/rpc/handlers/put-value.ts | 2 +- src/rpc/index.ts | 4 +- src/topology-listener.ts | 6 +- src/utils.ts | 24 ++++---- 25 files changed, 195 insertions(+), 205 deletions(-) diff --git a/src/content-fetching/index.ts b/src/content-fetching/index.ts index dd144d5e..31f42530 100644 --- a/src/content-fetching/index.ts +++ b/src/content-fetching/index.ts @@ -16,7 +16,7 @@ import { } from '../constants.js' import { createPutRecord, convertBuffer, bufferToRecordKey } from '../utils.js' import { logger } from '@libp2p/logger' -import type { Validators, Selectors, ValueEvent, QueryOptions } from '@libp2p/interface-dht' +import type { Validators, Selectors, ValueEvent, QueryOptions, DialingPeerEvent, PeerResponseEvent, QueryErrorEvent, SendingQueryEvent, QueryEvent } from '@libp2p/interface-dht' import type { PeerRouting } from '../peer-routing/index.js' import type { QueryManager } from '../query/manager.js' import type { RoutingTable } from '../routing-table/index.js' @@ -59,7 +59,7 @@ export class ContentFetching { this.network = network } - async putLocal (key: Uint8Array, rec: Uint8Array) { // eslint-disable-line require-await + async putLocal (key: Uint8Array, rec: Uint8Array): Promise { const dsKey = bufferToRecordKey(key) await this.components.datastore.put(dsKey, rec) } @@ -68,7 +68,7 @@ export class ContentFetching { * Attempt to retrieve the value for the given key from * the local datastore */ - async getLocal (key: Uint8Array) { + async getLocal (key: Uint8Array): Promise { this.log('getLocal %b', key) const dsKey = bufferToRecordKey(key) @@ -88,9 +88,9 @@ export class ContentFetching { /** * Send the best record found to any peers that have an out of date record */ - async * sendCorrectionRecord (key: Uint8Array, vals: ValueEvent[], best: Uint8Array, options: AbortOptions = {}) { + async * sendCorrectionRecord (key: Uint8Array, vals: ValueEvent[], best: Uint8Array, options: AbortOptions = {}): AsyncGenerator { this.log('sendCorrection for %b', key) - const fixupRec = await createPutRecord(key, best) + const fixupRec = createPutRecord(key, best) for (const { value, from } of vals) { // no need to do anything @@ -136,11 +136,11 @@ export class ContentFetching { /** * Store the given key/value pair in the DHT */ - async * put (key: Uint8Array, value: Uint8Array, options: AbortOptions = {}) { + async * put (key: Uint8Array, value: Uint8Array, options: AbortOptions = {}): AsyncGenerator { this.log('put key %b value %b', key, value) // create record in the dht format - const record = await createPutRecord(key, value) + const record = createPutRecord(key, value) // store the record locally const dsKey = bufferToRecordKey(key) @@ -192,7 +192,7 @@ export class ContentFetching { /** * Get the value to the given key */ - async * get (key: Uint8Array, options: QueryOptions = {}) { + async * get (key: Uint8Array, options: QueryOptions = {}): AsyncGenerator { this.log('get %b', key) const vals: ValueEvent[] = [] @@ -236,7 +236,7 @@ export class ContentFetching { /** * Get the `n` values to the given key without sorting */ - async * getMany (key: Uint8Array, options: QueryOptions = {}) { + async * getMany (key: Uint8Array, options: QueryOptions = {}): AsyncGenerator { this.log('getMany values for %b', key) try { diff --git a/src/content-routing/index.ts b/src/content-routing/index.ts index 8a2a0f01..20074e3d 100644 --- a/src/content-routing/index.ts +++ b/src/content-routing/index.ts @@ -10,7 +10,7 @@ import { providerEvent } from '../query/events.js' import { logger } from '@libp2p/logger' -import type { QueryEvent, QueryOptions } from '@libp2p/interface-dht' +import type { PeerResponseEvent, ProviderEvent, QueryEvent, QueryOptions } from '@libp2p/interface-dht' import type { PeerRouting } from '../peer-routing/index.js' import type { QueryManager } from '../query/manager.js' import type { RoutingTable } from '../routing-table/index.js' @@ -58,7 +58,7 @@ export class ContentRouting { * Announce to the network that we can provide the value for a given key and * are contactable on the given multiaddrs */ - async * provide (key: CID, multiaddrs: Multiaddr[], options: AbortOptions = {}) { + async * provide (key: CID, multiaddrs: Multiaddr[], options: AbortOptions = {}): AsyncGenerator { this.log('provide %s', key) // Add peer as provider @@ -124,7 +124,7 @@ export class ContentRouting { /** * Search the dht for up to `K` providers of the given CID. */ - async * findProviders (key: CID, options: QueryOptions) { + async * findProviders (key: CID, options: QueryOptions): AsyncGenerator { const toFind = this.routingTable.kBucketSize const target = key.multihash.bytes const id = await convertBuffer(target) @@ -147,7 +147,7 @@ export class ContentRouting { } yield peerResponseEvent({ from: this.components.peerId, messageType: MESSAGE_TYPE.GET_PROVIDERS, providers }) - yield providerEvent({ from: this.components.peerId, providers: providers }) + yield providerEvent({ from: this.components.peerId, providers }) } // All done diff --git a/src/dual-kad-dht.ts b/src/dual-kad-dht.ts index 69f3e66d..af6d15ef 100644 --- a/src/dual-kad-dht.ts +++ b/src/dual-kad-dht.ts @@ -3,7 +3,7 @@ import { CodeError } from '@libp2p/interfaces/errors' import merge from 'it-merge' import { queryErrorEvent } from './query/events.js' import type { KadDHT } from './kad-dht.js' -import type { DualDHT, QueryOptions } from '@libp2p/interface-dht' +import type { DualDHT, QueryEvent, QueryOptions } from '@libp2p/interface-dht' import type { AbortOptions } from '@libp2p/interfaces' import { EventEmitter, CustomEvent } from '@libp2p/interfaces/events' import type { CID } from 'multiformats' @@ -47,35 +47,35 @@ export class DualKadDHT extends EventEmitter implements Dua return true } - get [Symbol.toStringTag] () { + get [Symbol.toStringTag] (): '@libp2p/dual-kad-dht' { return '@libp2p/dual-kad-dht' } /** * Is this DHT running. */ - isStarted () { + isStarted (): boolean { return this.wan.isStarted() && this.lan.isStarted() } /** * If 'server' this node will respond to DHT queries, if 'client' this node will not */ - async getMode () { + async getMode (): Promise<'client' | 'server'> { return await this.wan.getMode() } /** * If 'server' this node will respond to DHT queries, if 'client' this node will not */ - async setMode (mode: 'client' | 'server') { + async setMode (mode: 'client' | 'server'): Promise { await this.wan.setMode(mode) } /** * Start listening to incoming connections. */ - async start () { + async start (): Promise { await Promise.all([ this.lan.start(), this.wan.start() @@ -86,7 +86,7 @@ export class DualKadDHT extends EventEmitter implements Dua * Stop accepting incoming connections and sending outgoing * messages. */ - async stop () { + async stop (): Promise { await Promise.all([ this.lan.stop(), this.wan.stop() @@ -96,7 +96,7 @@ export class DualKadDHT extends EventEmitter implements Dua /** * Store the given key/value pair in the DHT */ - async * put (key: Uint8Array, value: Uint8Array, options: QueryOptions = {}) { + async * put (key: Uint8Array, value: Uint8Array, options: QueryOptions = {}): AsyncGenerator { for await (const event of merge( this.lan.put(key, value, options), this.wan.put(key, value, options) @@ -108,7 +108,7 @@ export class DualKadDHT extends EventEmitter implements Dua /** * Get the value that corresponds to the passed key */ - async * get (key: Uint8Array, options: QueryOptions = {}) { // eslint-disable-line require-await + async * get (key: Uint8Array, options: QueryOptions = {}): AsyncGenerator { // eslint-disable-line require-await let queriedPeers = false let foundValue = false @@ -152,7 +152,7 @@ export class DualKadDHT extends EventEmitter implements Dua /** * Announce to the network that we can provide given key's value */ - async * provide (key: CID, options: AbortOptions = {}) { // eslint-disable-line require-await + async * provide (key: CID, options: AbortOptions = {}): AsyncGenerator { let sent = 0 let success = 0 const errors = [] @@ -194,7 +194,7 @@ export class DualKadDHT extends EventEmitter implements Dua /** * Search the dht for up to `K` providers of the given CID */ - async * findProviders (key: CID, options: QueryOptions = {}) { + async * findProviders (key: CID, options: QueryOptions = {}): AsyncGenerator { yield * merge( this.lan.findProviders(key, options), this.wan.findProviders(key, options) @@ -206,7 +206,7 @@ export class DualKadDHT extends EventEmitter implements Dua /** * Search for a peer with the given ID */ - async * findPeer (id: PeerId, options: QueryOptions = {}) { + async * findPeer (id: PeerId, options: QueryOptions = {}): AsyncGenerator { let queriedPeers = false for await (const event of merge( @@ -228,14 +228,14 @@ export class DualKadDHT extends EventEmitter implements Dua /** * Kademlia 'node lookup' operation */ - async * getClosestPeers (key: Uint8Array, options: QueryOptions = {}) { + async * getClosestPeers (key: Uint8Array, options: QueryOptions = {}): AsyncGenerator { yield * merge( this.lan.getClosestPeers(key, options), this.wan.getClosestPeers(key, options) ) } - async refreshRoutingTable () { + async refreshRoutingTable (): Promise { await Promise.all([ this.lan.refreshRoutingTable(), this.wan.refreshRoutingTable() diff --git a/src/kad-dht.ts b/src/kad-dht.ts index 3108bc80..2660e462 100644 --- a/src/kad-dht.ts +++ b/src/kad-dht.ts @@ -14,7 +14,7 @@ import { removePublicAddresses } from './utils.js' import { Logger, logger } from '@libp2p/logger' -import type { QueryOptions, Validators, Selectors, DHT } from '@libp2p/interface-dht' +import type { QueryOptions, Validators, Selectors, DHT, QueryEvent } from '@libp2p/interface-dht' import type { PeerInfo } from '@libp2p/interface-peer-info' import { CustomEvent, EventEmitter } from '@libp2p/interfaces/events' import type { PeerId } from '@libp2p/interface-peer-id' @@ -207,11 +207,11 @@ export class KadDHT extends EventEmitter implements DHT { return true } - get [Symbol.toStringTag] () { + get [Symbol.toStringTag] (): '@libp2p/kad-dht' { return '@libp2p/kad-dht' } - async onPeerConnect (peerData: PeerInfo) { + async onPeerConnect (peerData: PeerInfo): Promise { this.log('peer %p connected with protocols %s', peerData.id, peerData.protocols) if (this.lan) { @@ -235,21 +235,21 @@ export class KadDHT extends EventEmitter implements DHT { /** * Is this DHT running. */ - isStarted () { + isStarted (): boolean { return this.running } /** * If 'server' this node will respond to DHT queries, if 'client' this node will not */ - async getMode () { + async getMode (): Promise<'client' | 'server'> { return this.clientMode ? 'client' : 'server' } /** * If 'server' this node will respond to DHT queries, if 'client' this node will not */ - async setMode (mode: 'client' | 'server') { + async setMode (mode: 'client' | 'server'): Promise { await this.components.registrar.unhandle(this.protocol) if (mode === 'client') { @@ -268,7 +268,7 @@ export class KadDHT extends EventEmitter implements DHT { /** * Start listening to incoming connections. */ - async start () { + async start (): Promise { this.running = true // Only respond to queries when not in client mode @@ -290,7 +290,7 @@ export class KadDHT extends EventEmitter implements DHT { * Stop accepting incoming connections and sending outgoing * messages. */ - async stop () { + async stop (): Promise { this.running = false await Promise.all([ @@ -307,14 +307,14 @@ export class KadDHT extends EventEmitter implements DHT { /** * Store the given key/value pair in the DHT */ - async * put (key: Uint8Array, value: Uint8Array, options: QueryOptions = {}) { // eslint-disable-line require-await + async * put (key: Uint8Array, value: Uint8Array, options: QueryOptions = {}): AsyncGenerator { yield * this.contentFetching.put(key, value, options) } /** * Get the value that corresponds to the passed key */ - async * get (key: Uint8Array, options: QueryOptions = {}) { // eslint-disable-line require-await + async * get (key: Uint8Array, options: QueryOptions = {}): AsyncGenerator { yield * this.contentFetching.get(key, options) } @@ -323,14 +323,14 @@ export class KadDHT extends EventEmitter implements DHT { /** * Announce to the network that we can provide given key's value */ - async * provide (key: CID, options: QueryOptions = {}) { // eslint-disable-line require-await + async * provide (key: CID, options: QueryOptions = {}): AsyncGenerator { yield * this.contentRouting.provide(key, this.components.addressManager.getAddresses(), options) } /** * Search the dht for providers of the given CID */ - async * findProviders (key: CID, options: QueryOptions = {}) { + async * findProviders (key: CID, options: QueryOptions = {}): AsyncGenerator { yield * this.contentRouting.findProviders(key, options) } @@ -339,18 +339,18 @@ export class KadDHT extends EventEmitter implements DHT { /** * Search for a peer with the given ID */ - async * findPeer (id: PeerId, options: QueryOptions = {}) { // eslint-disable-line require-await + async * findPeer (id: PeerId, options: QueryOptions = {}): AsyncGenerator { yield * this.peerRouting.findPeer(id, options) } /** * Kademlia 'node lookup' operation */ - async * getClosestPeers (key: Uint8Array, options: QueryOptions = {}) { + async * getClosestPeers (key: Uint8Array, options: QueryOptions = {}): AsyncGenerator { yield * this.peerRouting.getClosestPeers(key, options) } - async refreshRoutingTable () { - await this.routingTableRefresh.refreshTable(true) + async refreshRoutingTable (): Promise { + this.routingTableRefresh.refreshTable(true) } } diff --git a/src/message/dht.ts b/src/message/dht.ts index afbe4e20..a88b7f4e 100644 --- a/src/message/dht.ts +++ b/src/message/dht.ts @@ -126,7 +126,7 @@ export namespace Message { } export namespace MessageType { - export const codec = () => { + export const codec = (): Codec => { return enumeration(__MessageTypeValues) } } @@ -146,7 +146,7 @@ export namespace Message { } export namespace ConnectionType { - export const codec = () => { + export const codec = (): Codec => { return enumeration(__ConnectionTypeValues) } } diff --git a/src/message/index.ts b/src/message/index.ts index 1d898cd4..203f0d51 100644 --- a/src/message/index.ts +++ b/src/message/index.ts @@ -42,7 +42,7 @@ export class Message { /** * @type {number} */ - get clusterLevel () { + get clusterLevel (): number { const level = this.clusterLevelRaw - 1 if (level < 0) { return 0 @@ -58,7 +58,7 @@ export class Message { /** * Encode into protobuf */ - serialize () { + serialize (): Uint8Array { return PBMessage.encode({ key: this.key, type: this.type, @@ -72,7 +72,7 @@ export class Message { /** * Decode from protobuf */ - static deserialize (raw: Uint8ArrayList | Uint8Array) { + static deserialize (raw: Uint8ArrayList | Uint8Array): Message { const dec = PBMessage.decode(raw) const msg = new Message(dec.type ?? PBMessage.MessageType.PUT_VALUE, dec.key ?? Uint8Array.from([]), dec.clusterLevelRaw ?? 0) @@ -87,7 +87,7 @@ export class Message { } } -function toPbPeer (peer: PeerInfo) { +function toPbPeer (peer: PeerInfo): PBPeer { const output: PBPeer = { id: peer.id.toBytes(), addrs: (peer.multiaddrs ?? []).map((m) => m.bytes), @@ -97,7 +97,7 @@ function toPbPeer (peer: PeerInfo) { return output } -function fromPbPeer (peer: PBMessage.Peer) { +function fromPbPeer (peer: PBMessage.Peer): PeerInfo { if (peer.id == null) { throw new Error('Invalid peer in message') } diff --git a/src/network.ts b/src/network.ts index 5652b29b..e12ea2ac 100644 --- a/src/network.ts +++ b/src/network.ts @@ -22,6 +22,7 @@ import type { Stream } from '@libp2p/interface-connection' import { abortableDuplex } from 'abortable-iterator' import type { Uint8ArrayList } from 'uint8arraylist' import type { KadDHTComponents } from './index.js' +import type { QueryEvent } from '@libp2p/interface-dht' export interface NetworkInit { protocol: string @@ -57,7 +58,7 @@ export class Network extends EventEmitter implements Startable { /** * Start the network */ - async start () { + async start (): Promise { if (this.running) { return } @@ -68,21 +69,21 @@ export class Network extends EventEmitter implements Startable { /** * Stop all network activity */ - async stop () { + async stop (): Promise { this.running = false } /** * Is the network online? */ - isStarted () { + isStarted (): boolean { return this.running } /** * Send a request and record RTT for latency measurements */ - async * sendRequest (to: PeerId, msg: Message, options: AbortOptions = {}) { + async * sendRequest (to: PeerId, msg: Message, options: AbortOptions = {}): AsyncGenerator { if (!this.running) { return } @@ -118,7 +119,7 @@ export class Network extends EventEmitter implements Startable { /** * Sends a message without expecting an answer */ - async * sendMessage (to: PeerId, msg: Message, options: AbortOptions = {}) { + async * sendMessage (to: PeerId, msg: Message, options: AbortOptions = {}): AsyncGenerator { if (!this.running) { return } @@ -148,7 +149,7 @@ export class Network extends EventEmitter implements Startable { /** * Write a message to the given stream */ - async _writeMessage (stream: Duplex, msg: Uint8Array | Uint8ArrayList, options: AbortOptions) { + async _writeMessage (stream: Duplex, msg: Uint8Array | Uint8ArrayList, options: AbortOptions): Promise { if (options.signal != null) { stream = abortableDuplex(stream, options.signal) } @@ -166,7 +167,7 @@ export class Network extends EventEmitter implements Startable { * If no response is received after the specified timeout * this will error out. */ - async _writeReadMessage (stream: Duplex, msg: Uint8Array | Uint8ArrayList, options: AbortOptions) { + async _writeReadMessage (stream: Duplex, msg: Uint8Array | Uint8ArrayList, options: AbortOptions): Promise { if (options.signal != null) { stream = abortableDuplex(stream, options.signal) } diff --git a/src/peer-list/index.ts b/src/peer-list/index.ts index 92eb5a6f..7298a70c 100644 --- a/src/peer-list/index.ts +++ b/src/peer-list/index.ts @@ -13,7 +13,7 @@ export class PeerList { /** * Add a new peer. Returns `true` if it was a new one */ - push (peerId: PeerId) { + push (peerId: PeerId): boolean { if (!this.has(peerId)) { this.list.push(peerId) @@ -26,7 +26,7 @@ export class PeerList { /** * Check if this PeerInfo is already in here */ - has (peerId: PeerId) { + has (peerId: PeerId): boolean { const match = this.list.find((i) => i.equals(peerId)) return Boolean(match) } @@ -34,21 +34,21 @@ export class PeerList { /** * Get the list as an array */ - toArray () { + toArray (): PeerId[] { return this.list.slice() } /** * Remove the last element */ - pop () { + pop (): PeerId | undefined { return this.list.pop() } /** * The length of the list */ - get length () { + get length (): number { return this.list.length } } diff --git a/src/peer-list/peer-distance-list.ts b/src/peer-list/peer-distance-list.ts index d925adcf..072e655d 100644 --- a/src/peer-list/peer-distance-list.ts +++ b/src/peer-list/peer-distance-list.ts @@ -33,21 +33,21 @@ export class PeerDistanceList { /** * The length of the list */ - get length () { + get length (): number { return this.peerDistances.length } /** * The peerIds in the list, in order of distance from the origin key */ - get peers () { + get peers (): PeerId[] { return this.peerDistances.map(pd => pd.peerId) } /** * Add a peerId to the list. */ - async add (peerId: PeerId) { + async add (peerId: PeerId): Promise { if (this.peerDistances.find(pd => pd.peerId.equals(peerId)) != null) { return } @@ -67,7 +67,7 @@ export class PeerDistanceList { * Indicates whether any of the peerIds passed as a parameter are closer * to the origin key than the furthest peerId in the PeerDistanceList. */ - async anyCloser (peerIds: PeerId[]) { + async anyCloser (peerIds: PeerId[]): Promise { if (peerIds.length === 0) { return false } diff --git a/src/peer-routing/index.ts b/src/peer-routing/index.ts index 4b6eeba1..98837057 100644 --- a/src/peer-routing/index.ts +++ b/src/peer-routing/index.ts @@ -13,7 +13,7 @@ import { Libp2pRecord } from '@libp2p/record' import { logger } from '@libp2p/logger' import { keys } from '@libp2p/crypto' import { peerIdFromKeys } from '@libp2p/peer-id' -import type { DHTRecord, QueryOptions, Validators } from '@libp2p/interface-dht' +import type { DHTRecord, DialingPeerEvent, FinalPeerEvent, PeerResponseEvent, QueryErrorEvent, QueryEvent, QueryOptions, SendingQueryEvent, Validators, ValueEvent } from '@libp2p/interface-dht' import type { RoutingTable } from '../routing-table/index.js' import type { QueryManager } from '../query/manager.js' import type { Network } from '../network.js' @@ -97,7 +97,7 @@ export class PeerRouting { /** * Get a value via rpc call for the given parameters */ - async * _getValueSingle (peer: PeerId, key: Uint8Array, options: AbortOptions = {}) { // eslint-disable-line require-await + async * _getValueSingle (peer: PeerId, key: Uint8Array, options: AbortOptions = {}): AsyncGenerator { const msg = new Message(MESSAGE_TYPE.GET_VALUE, key, 0) yield * this.network.sendRequest(peer, msg, options) } @@ -105,7 +105,7 @@ export class PeerRouting { /** * Get the public key directly from a node */ - async * getPublicKeyFromNode (peer: PeerId, options: AbortOptions = {}) { + async * getPublicKeyFromNode (peer: PeerId, options: AbortOptions = {}): AsyncGenerator { const pkKey = utils.keyForPublicKey(peer) for await (const event of this._getValueSingle(peer, pkKey, options)) { @@ -133,7 +133,7 @@ export class PeerRouting { /** * Search for a peer with the given ID */ - async * findPeer (id: PeerId, options: QueryOptions = {}) { + async * findPeer (id: PeerId, options: QueryOptions = {}): AsyncGenerator { this.log('findPeer %p', id) // Try to find locally @@ -215,14 +215,14 @@ export class PeerRouting { * Kademlia 'node lookup' operation on a key, which could be a the * bytes from a multihash or a peer ID */ - async * getClosestPeers (key: Uint8Array, options: QueryOptions = {}) { + async * getClosestPeers (key: Uint8Array, options: QueryOptions = {}): AsyncGenerator { this.log('getClosestPeers to %b', key) const id = await utils.convertBuffer(key) const tablePeers = this.routingTable.closestPeers(id) const self = this // eslint-disable-line @typescript-eslint/no-this-alias const peers = new PeerDistanceList(id, this.routingTable.kBucketSize) - await Promise.all(tablePeers.map(async peer => await peers.add(peer))) + await Promise.all(tablePeers.map(async peer => { await peers.add(peer) })) const getCloserPeersQuery: QueryFunc = async function * ({ peer, signal }) { self.log('closerPeersSingle %s from %p', uint8ArrayToString(key, 'base32'), peer) @@ -235,7 +235,7 @@ export class PeerRouting { yield event if (event.name === 'PEER_RESPONSE') { - await Promise.all(event.closer.map(async peerData => await peers.add(peerData.id))) + await Promise.all(event.closer.map(async peerData => { await peers.add(peerData.id) })) } } @@ -259,7 +259,7 @@ export class PeerRouting { * * Note: The peerStore is updated with new addresses found for the given peer. */ - async * getValueOrPeers (peer: PeerId, key: Uint8Array, options: AbortOptions = {}) { + async * getValueOrPeers (peer: PeerId, key: Uint8Array, options: AbortOptions = {}): AsyncGenerator { for await (const event of this._getValueSingle(peer, key, options)) { if (event.name === 'PEER_RESPONSE') { if (event.record != null) { @@ -284,7 +284,7 @@ export class PeerRouting { * Verify a record, fetching missing public keys from the network. * Throws an error if the record is invalid. */ - async _verifyRecordOnline (record: DHTRecord) { + async _verifyRecordOnline (record: DHTRecord): Promise { if (record.timeReceived == null) { throw new CodeError('invalid record received', 'ERR_INVALID_RECORD') } @@ -296,7 +296,7 @@ export class PeerRouting { * Get the nearest peers to the given query, but if closer * than self */ - async getCloserPeersOffline (key: Uint8Array, closerThan: PeerId) { + async getCloserPeersOffline (key: Uint8Array, closerThan: PeerId): Promise { const id = await utils.convertBuffer(key) const ids = this.routingTable.closestPeers(id) const output: PeerInfo[] = [] diff --git a/src/providers.ts b/src/providers.ts index c0675e29..14ffb57b 100644 --- a/src/providers.ts +++ b/src/providers.ts @@ -66,14 +66,14 @@ export class Providers implements Startable { this.started = false } - isStarted () { + isStarted (): boolean { return this.started } /** * Start the provider cleanup service */ - async start () { + async start (): Promise { if (this.started) { return } @@ -93,7 +93,7 @@ export class Providers implements Startable { /** * Release any resources. */ - async stop () { + async stop (): Promise { this.started = false if (this.cleaner != null) { @@ -105,8 +105,8 @@ export class Providers implements Startable { /** * Check all providers if they are still valid, and if not delete them */ - async _cleanup () { - return await this.syncQueue.add(async () => { + async _cleanup (): Promise { + await this.syncQueue.add(async () => { const start = Date.now() let count = 0 @@ -174,7 +174,7 @@ export class Providers implements Startable { /** * Get the currently known provider peer ids for a given CID */ - async _getProvidersMap (cid: CID) { + async _getProvidersMap (cid: CID): Promise> { const cacheKey = makeProviderKey(cid) let provs: Map = this.cache.get(cacheKey) @@ -189,8 +189,8 @@ export class Providers implements Startable { /** * Add a new provider for the given CID */ - async addProvider (cid: CID, provider: PeerId) { - return await this.syncQueue.add(async () => { + async addProvider (cid: CID, provider: PeerId): Promise { + await this.syncQueue.add(async () => { log('%p provides %s', provider, cid) const provs = await this._getProvidersMap(cid) @@ -209,21 +209,27 @@ export class Providers implements Startable { * Get a list of providers for the given CID */ async getProviders (cid: CID): Promise { - return await this.syncQueue.add(async () => { + let provs: Map = new Map() + + await this.syncQueue.add(async () => { log('get providers for %s', cid) - const provs = await this._getProvidersMap(cid) + provs = await this._getProvidersMap(cid) return [...provs.keys()].map(peerIdStr => { return peerIdFromString(peerIdStr) }) }) + + return [...provs.keys()].map(peerIdStr => { + return peerIdFromString(peerIdStr) + }) } } /** * Encode the given key its matching datastore key */ -function makeProviderKey (cid: CID | string) { +function makeProviderKey (cid: CID | string): string { const cidStr = typeof cid === 'string' ? cid : uint8ArrayToString(cid.multihash.bytes, 'base32') return `${PROVIDER_KEY_PREFIX}/${cidStr}` @@ -232,7 +238,7 @@ function makeProviderKey (cid: CID | string) { /** * Write a provider into the given store */ -async function writeProviderEntry (store: Datastore, cid: CID, peer: PeerId, time: Date) { // eslint-disable-line require-await +async function writeProviderEntry (store: Datastore, cid: CID, peer: PeerId, time: Date): Promise { const dsKey = [ makeProviderKey(cid), '/', @@ -242,13 +248,13 @@ async function writeProviderEntry (store: Datastore, cid: CID, peer: PeerId, tim const key = new Key(dsKey) const buffer = Uint8Array.from(varint.encode(time.getTime())) - return await store.put(key, buffer) + await store.put(key, buffer) } /** * Parse the CID and provider peer id from the key */ -function parseProviderKey (key: Key) { +function parseProviderKey (key: Key): { cid: string, peerId: string } { const parts = key.toString().split('/') if (parts.length !== 5) { @@ -264,7 +270,7 @@ function parseProviderKey (key: Key) { /** * Load providers for the given CID from the store */ -async function loadProviders (store: Datastore, cid: CID) { +async function loadProviders (store: Datastore, cid: CID): Promise> { const providers = new Map() const query = store.query({ prefix: makeProviderKey(cid) }) @@ -276,6 +282,6 @@ async function loadProviders (store: Datastore, cid: CID) { return providers } -function readTime (buf: Uint8Array) { +function readTime (buf: Uint8Array): Date { return new Date(varint.decode(buf)) } diff --git a/src/query-self.ts b/src/query-self.ts index b66bf9fe..a5ae17c6 100644 --- a/src/query-self.ts +++ b/src/query-self.ts @@ -44,11 +44,11 @@ export class QuerySelf implements Startable { this.queryTimeout = queryTimeout ?? QUERY_SELF_TIMEOUT } - isStarted () { + isStarted (): boolean { return this.running } - async start () { + async start (): Promise { if (this.running) { return } @@ -57,7 +57,7 @@ export class QuerySelf implements Startable { this._querySelf() } - async stop () { + async stop (): Promise { this.running = false if (this.timeoutId != null) { @@ -69,7 +69,7 @@ export class QuerySelf implements Startable { } } - _querySelf () { + _querySelf (): void { Promise.resolve().then(async () => { const timeoutController = new TimeoutController(this.queryTimeout) diff --git a/src/query/manager.ts b/src/query/manager.ts index a09ecbd6..e2559070 100644 --- a/src/query/manager.ts +++ b/src/query/manager.ts @@ -12,7 +12,7 @@ import { logger } from '@libp2p/logger' import type { PeerId } from '@libp2p/interface-peer-id' import type { Startable } from '@libp2p/interfaces/startable' import type { QueryFunc } from './types.js' -import type { QueryOptions } from '@libp2p/interface-dht' +import type { QueryEvent, QueryOptions } from '@libp2p/interface-dht' import { PeerSet } from '@libp2p/peer-collections' import type { Metric, Metrics } from '@libp2p/interface-metrics' @@ -59,14 +59,14 @@ export class QueryManager implements Startable { this.queries = 0 } - isStarted () { + isStarted (): boolean { return this.running } /** * Starts the query manager */ - async start () { + async start (): Promise { this.running = true if (this.components.metrics != null && this.metrics == null) { @@ -80,7 +80,7 @@ export class QueryManager implements Startable { /** * Stops all queries */ - async stop () { + async stop (): Promise { this.running = false for (const controller of this.controllers) { @@ -90,7 +90,7 @@ export class QueryManager implements Startable { this.controllers.clear() } - async * run (key: Uint8Array, peers: PeerId[], queryFunc: QueryFunc, options: QueryOptions = {}) { + async * run (key: Uint8Array, peers: PeerId[], queryFunc: QueryFunc, options: QueryOptions = {}): AsyncGenerator { if (!this.running) { throw new Error('QueryManager not started') } diff --git a/src/query/query-path.ts b/src/query/query-path.ts index c60148dc..8e416627 100644 --- a/src/query/query-path.ts +++ b/src/query/query-path.ts @@ -6,7 +6,6 @@ import { CodeError } from '@libp2p/interfaces/errors' import { convertPeerId, convertBuffer } from '../utils.js' import { TimeoutController } from 'timeout-abort-controller' import { anySignal } from 'any-signal' -import { queryErrorEvent } from './events.js' import type { PeerId } from '@libp2p/interface-peer-id' import type { EventEmitter } from '@libp2p/interfaces/events' import type { CleanUpEvents } from './manager.js' @@ -83,7 +82,7 @@ export interface QueryPathOptions { * Walks a path through the DHT, calling the passed query function for * every peer encountered that we have not seen before */ -export async function * queryPath (options: QueryPathOptions) { +export async function * queryPath (options: QueryPathOptions): AsyncGenerator { const { key, startingPeer, ourPeerId, signal, query, alpha, pathIndex, numPaths, cleanUp, queryFuncTimeout, log, peersSeen } = options // Only ALPHA node/value lookups are allowed at any given time for each process // https://github.com/libp2p/specs/tree/master/kad-dht#alpha-concurrency-parameter-%CE%B1 @@ -98,7 +97,7 @@ export async function * queryPath (options: QueryPathOptions) { * Adds the passed peer to the query queue if it's not us and no * other path has passed through this peer */ - function queryPeer (peer: PeerId, peerKadId: Uint8Array) { + function queryPeer (peer: PeerId, peerKadId: Uint8Array): void { if (peer == null) { return } @@ -118,64 +117,46 @@ export async function * queryPath (options: QueryPathOptions) { const compoundSignal = anySignal(signals) - try { - for await (const event of query({ - key, - peer, - signal: compoundSignal, - pathIndex, - numPaths - })) { - if (compoundSignal.aborted) { - return - } + for await (const event of query({ + key, + peer, + signal: compoundSignal, + pathIndex, + numPaths + })) { + if (compoundSignal.aborted) { + return + } - // if there are closer peers and the query has not completed, continue the query - if (event.name === 'PEER_RESPONSE') { - for (const closerPeer of event.closer) { - if (peersSeen.has(closerPeer.id)) { // eslint-disable-line max-depth - log('already seen %p in query', closerPeer.id) - continue - } - - if (ourPeerId.equals(closerPeer.id)) { // eslint-disable-line max-depth - log('not querying ourselves') - continue - } - - const closerPeerKadId = await convertPeerId(closerPeer.id) - const closerPeerXor = BigInt('0x' + toString(xor(closerPeerKadId, kadId), 'base16')) - - // only continue query if closer peer is actually closer - if (closerPeerXor > peerXor) { // eslint-disable-line max-depth - log('skipping %p as they are not closer to %b than %p', closerPeer.id, key, peer) - continue - } - - log('querying closer peer %p', closerPeer.id) - queryPeer(closerPeer.id, closerPeerKadId) + // if there are closer peers and the query has not completed, continue the query + if (event.name === 'PEER_RESPONSE') { + for (const closerPeer of event.closer) { + if (peersSeen.has(closerPeer.id)) { // eslint-disable-line max-depth + log('already seen %p in query', closerPeer.id) + continue } - } - // TODO: we have upgraded to p-queue@7, this should no longer be necessary - queue.emit('completed', event) - } + if (ourPeerId.equals(closerPeer.id)) { // eslint-disable-line max-depth + log('not querying ourselves') + continue + } - timeout?.clear() - } catch (err: any) { - if (signal.aborted) { - // TODO: we have upgraded to p-queue@7, this should no longer be necessary - queue.emit('error', err) - } else { - // TODO: we have upgraded to p-queue@7, this should no longer be necessary - queue.emit('completed', queryErrorEvent({ - from: peer, - error: err - })) + const closerPeerKadId = await convertPeerId(closerPeer.id) + const closerPeerXor = BigInt('0x' + toString(xor(closerPeerKadId, kadId), 'base16')) + + // only continue query if closer peer is actually closer + if (closerPeerXor > peerXor) { // eslint-disable-line max-depth + log('skipping %p as they are not closer to %b than %p', closerPeer.id, key, peer) + continue + } + + log('querying closer peer %p', closerPeer.id) + queryPeer(closerPeer.id, closerPeerKadId) + } } - } finally { - timeout?.clear() } + + timeout?.clear() }, { // use xor value as the queue priority - closer peers should execute first // subtract it from MAX_XOR because higher priority values execute sooner @@ -196,12 +177,12 @@ export async function * queryPath (options: QueryPathOptions) { yield * toGenerator(queue, signal, cleanUp, log) } -async function * toGenerator (queue: Queue, signal: AbortSignal, cleanUp: EventEmitter, log: Logger) { +async function * toGenerator (queue: Queue, signal: AbortSignal, cleanUp: EventEmitter, log: Logger): AsyncGenerator { let deferred = defer() let running = true const results: QueryEvent[] = [] - const cleanup = () => { + const cleanup = (): void => { if (!running) { return } diff --git a/src/routing-table/index.ts b/src/routing-table/index.ts index 47a9b574..dbd815f2 100644 --- a/src/routing-table/index.ts +++ b/src/routing-table/index.ts @@ -109,7 +109,7 @@ export class RoutingTable implements Startable { this.tagName = tagName ?? KAD_CLOSE_TAG_NAME this.tagValue = tagValue ?? KAD_CLOSE_TAG_VALUE - const updatePingQueueSizeMetric = () => { + const updatePingQueueSizeMetric = (): void => { this.metrics?.pingQueueSize.update(this.pingQueue.size) this.metrics?.pingRunning.update(this.pingQueue.pending) } @@ -121,11 +121,11 @@ export class RoutingTable implements Startable { this._onPing = this._onPing.bind(this) } - isStarted () { + isStarted (): boolean { return this.running } - async start () { + async start (): Promise { this.running = true if (this.components.metrics != null) { @@ -150,7 +150,7 @@ export class RoutingTable implements Startable { this._tagPeers(kBuck) } - async stop () { + async stop (): Promise { this.running = false this.pingQueue.clear() this.kb = undefined @@ -161,7 +161,7 @@ export class RoutingTable implements Startable { * - this will lower the chances that connections to them get closed when * we reach connection limits */ - _tagPeers (kBuck: KBucketTree) { + _tagPeers (kBuck: KBucketTree): void { let kClosest = new PeerSet() const updatePeerTags = utils.debounce(() => { @@ -208,7 +208,7 @@ export class RoutingTable implements Startable { * `oldContacts` will not be empty and is the list of contacts that * have not been contacted for the longest. */ - _onPing (oldContacts: KBucketPeer[], newContact: KBucketPeer) { + _onPing (oldContacts: KBucketPeer[], newContact: KBucketPeer): void { // add to a queue so multiple ping requests do not overlap and we don't // flood the network with ping requests if lots of newContact requests // are received @@ -272,7 +272,7 @@ export class RoutingTable implements Startable { /** * Amount of currently stored peers */ - get size () { + get size (): number { if (this.kb == null) { return 0 } @@ -323,14 +323,14 @@ export class RoutingTable implements Startable { /** * Add or update the routing table with the given peer */ - async add (peer: PeerId) { + async add (peer: PeerId): Promise { if (this.kb == null) { throw new Error('RoutingTable is not started') } const id = await utils.convertPeerId(peer) - this.kb.add({ id: id, peer: peer }) + this.kb.add({ id, peer }) this.log('added %p with kad id %b', peer, id) @@ -340,7 +340,7 @@ export class RoutingTable implements Startable { /** * Remove a given peer from the table */ - async remove (peer: PeerId) { + async remove (peer: PeerId): Promise { if (this.kb == null) { throw new Error('RoutingTable is not started') } diff --git a/src/routing-table/refresh.ts b/src/routing-table/refresh.ts index b3d9d07f..367f3e55 100644 --- a/src/routing-table/refresh.ts +++ b/src/routing-table/refresh.ts @@ -10,6 +10,7 @@ import { TABLE_REFRESH_INTERVAL, TABLE_REFRESH_QUERY_TIMEOUT } from '../constant import type { RoutingTable } from './index.js' import type { Logger } from '@libp2p/logger' import type { PeerRouting } from '../peer-routing/index.js' +import type { PeerId } from '@libp2p/interface-peer-id' /** * Cannot generate random KadIds longer than this + 1 @@ -49,12 +50,12 @@ export class RoutingTableRefresh { this.refreshTable = this.refreshTable.bind(this) } - async start () { + async start (): Promise { this.log(`refreshing routing table every ${this.refreshInterval}ms`) this.refreshTable(true) } - async stop () { + async stop (): Promise { if (this.refreshTimeoutId != null) { clearTimeout(this.refreshTimeoutId) } @@ -66,7 +67,7 @@ export class RoutingTableRefresh { * that is close to the requested peer ID and query that, then network * peers will tell us who they know who is close to the fake ID */ - refreshTable (force: boolean = false) { + refreshTable (force: boolean = false): void { this.log('refreshing routing table') const prefixLength = this._maxCommonPrefix() @@ -123,7 +124,7 @@ export class RoutingTableRefresh { }) } - async _refreshCommonPrefixLength (cpl: number, lastRefresh: Date, force: boolean) { + async _refreshCommonPrefixLength (cpl: number, lastRefresh: Date, force: boolean): Promise { if (!force && lastRefresh.getTime() > (Date.now() - this.refreshInterval)) { this.log('not running refresh for cpl %s as time since last refresh not above interval', cpl) return @@ -146,7 +147,7 @@ export class RoutingTableRefresh { } } - _getTrackedCommonPrefixLengthsForRefresh (maxCommonPrefix: number) { + _getTrackedCommonPrefixLengthsForRefresh (maxCommonPrefix: number): Date[] { if (maxCommonPrefix > MAX_COMMON_PREFIX_LENGTH) { maxCommonPrefix = MAX_COMMON_PREFIX_LENGTH } @@ -161,7 +162,7 @@ export class RoutingTableRefresh { return dates } - async _generateRandomPeerId (targetCommonPrefixLength: number) { + async _generateRandomPeerId (targetCommonPrefixLength: number): Promise { if (this.routingTable.kb == null) { throw new Error('Routing table not started') } @@ -174,7 +175,7 @@ export class RoutingTableRefresh { return peerIdFromBytes(key) } - async _makePeerId (localKadId: Uint8Array, randomPrefix: number, targetCommonPrefixLength: number) { + async _makePeerId (localKadId: Uint8Array, randomPrefix: number, targetCommonPrefixLength: number): Promise { if (targetCommonPrefixLength > MAX_COMMON_PREFIX_LENGTH) { throw new Error(`Cannot generate peer ID for common prefix length greater than ${MAX_COMMON_PREFIX_LENGTH}`) } @@ -208,7 +209,7 @@ export class RoutingTableRefresh { * returns the maximum common prefix length between any peer in the table * and the current peer */ - _maxCommonPrefix () { + _maxCommonPrefix (): number { // xor our KadId with every KadId in the k-bucket tree, // return the longest id prefix that is the same let prefixLength = 0 @@ -225,7 +226,7 @@ export class RoutingTableRefresh { /** * Returns the number of peers in the table with a given prefix length */ - _numPeersForCpl (prefixLength: number) { + _numPeersForCpl (prefixLength: number): number { let count = 0 for (const length of this._prefixLengths()) { @@ -240,7 +241,7 @@ export class RoutingTableRefresh { /** * Yields the common prefix length of every peer in the table */ - * _prefixLengths () { + * _prefixLengths (): Generator { if (this.routingTable.kb == null) { return } diff --git a/src/rpc/handlers/add-provider.ts b/src/rpc/handlers/add-provider.ts index 6141a3c0..347276f4 100644 --- a/src/rpc/handlers/add-provider.ts +++ b/src/rpc/handlers/add-provider.ts @@ -20,7 +20,7 @@ export class AddProviderHandler implements DHTMessageHandler { this.providers = providers } - async handle (peerId: PeerId, msg: Message) { + async handle (peerId: PeerId, msg: Message): Promise { log('start') if (msg.key == null || msg.key.length === 0) { diff --git a/src/rpc/handlers/find-node.ts b/src/rpc/handlers/find-node.ts index d884d0d9..f15d6c1d 100644 --- a/src/rpc/handlers/find-node.ts +++ b/src/rpc/handlers/find-node.ts @@ -40,7 +40,7 @@ export class FindNodeHandler implements DHTMessageHandler { /** * Process `FindNode` DHT messages */ - async handle (peerId: PeerId, msg: Message) { + async handle (peerId: PeerId, msg: Message): Promise { log('incoming request from %p for peers closer to %b', peerId, msg.key) let closer: PeerInfo[] = [] diff --git a/src/rpc/handlers/get-providers.ts b/src/rpc/handlers/get-providers.ts index 3cb9d4e0..65ad0c63 100644 --- a/src/rpc/handlers/get-providers.ts +++ b/src/rpc/handlers/get-providers.ts @@ -12,6 +12,7 @@ import type { PeerRouting } from '../../peer-routing/index.js' import type { PeerId } from '@libp2p/interface-peer-id' import type { PeerInfo } from '@libp2p/interface-peer-info' import type { PeerStore } from '@libp2p/interface-peer-store' +import type { Multiaddr } from '@multiformats/multiaddr' const log = logger('libp2p:kad-dht:rpc:handlers:get-providers') @@ -40,7 +41,7 @@ export class GetProvidersHandler implements DHTMessageHandler { this.lan = Boolean(lan) } - async handle (peerId: PeerId, msg: Message) { + async handle (peerId: PeerId, msg: Message): Promise { let cid try { cid = CID.decode(msg.key) @@ -71,13 +72,13 @@ export class GetProvidersHandler implements DHTMessageHandler { return response } - async _getAddresses (peerId: PeerId) { + async _getAddresses (peerId: PeerId): Promise { const addrs = await this.components.peerStore.addressBook.get(peerId) return addrs.map(address => address.multiaddr) } - async _getPeers (peerIds: PeerId[]) { + async _getPeers (peerIds: PeerId[]): Promise { const output: PeerInfo[] = [] const addrFilter = this.lan ? removePublicAddresses : removePrivateAddresses diff --git a/src/rpc/handlers/get-value.ts b/src/rpc/handlers/get-value.ts index 6fced6e6..b828f06c 100644 --- a/src/rpc/handlers/get-value.ts +++ b/src/rpc/handlers/get-value.ts @@ -34,7 +34,7 @@ export class GetValueHandler implements DHTMessageHandler { this.peerRouting = peerRouting } - async handle (peerId: PeerId, msg: Message) { + async handle (peerId: PeerId, msg: Message): Promise { const key = msg.key log('%p asked for key %b', peerId, key) @@ -91,11 +91,11 @@ export class GetValueHandler implements DHTMessageHandler { /** * Try to fetch a given record by from the local datastore. - * Returns the record iff it is still valid, meaning + * Returns the record if it is still valid, meaning * - it was either authored by this node, or * - it was received less than `MAX_RECORD_AGE` ago. */ - async _checkLocalDatastore (key: Uint8Array) { + async _checkLocalDatastore (key: Uint8Array): Promise { log('checkLocalDatastore looking for %b', key) const dsKey = bufferToRecordKey(key) diff --git a/src/rpc/handlers/ping.ts b/src/rpc/handlers/ping.ts index ed63d1eb..09d569c0 100644 --- a/src/rpc/handlers/ping.ts +++ b/src/rpc/handlers/ping.ts @@ -6,7 +6,7 @@ import type { PeerId } from '@libp2p/interface-peer-id' const log = logger('libp2p:kad-dht:rpc:handlers:ping') export class PingHandler implements DHTMessageHandler { - async handle (peerId: PeerId, msg: Message) { + async handle (peerId: PeerId, msg: Message): Promise { log('ping from %p', peerId) return msg } diff --git a/src/rpc/handlers/put-value.ts b/src/rpc/handlers/put-value.ts index 0bceb71e..0dcb9607 100644 --- a/src/rpc/handlers/put-value.ts +++ b/src/rpc/handlers/put-value.ts @@ -29,7 +29,7 @@ export class PutValueHandler implements DHTMessageHandler { this.validators = validators } - async handle (peerId: PeerId, msg: Message) { + async handle (peerId: PeerId, msg: Message): Promise { const key = msg.key this.log('%p asked us to store value for key %b', peerId, key) diff --git a/src/rpc/index.ts b/src/rpc/index.ts index bcc420b0..108faf10 100644 --- a/src/rpc/index.ts +++ b/src/rpc/index.ts @@ -54,7 +54,7 @@ export class RPC { /** * Process incoming DHT messages */ - async handleMessage (peerId: PeerId, msg: Message) { + async handleMessage (peerId: PeerId, msg: Message): Promise { try { await this.routingTable.add(peerId) } catch (err: any) { @@ -75,7 +75,7 @@ export class RPC { /** * Handle incoming streams on the dht protocol */ - onIncomingStream (data: IncomingStreamData) { + onIncomingStream (data: IncomingStreamData): void { Promise.resolve().then(async () => { const { stream, connection } = data const peerId = connection.remotePeer diff --git a/src/topology-listener.ts b/src/topology-listener.ts index 4899f5a0..0758409f 100644 --- a/src/topology-listener.ts +++ b/src/topology-listener.ts @@ -36,14 +36,14 @@ export class TopologyListener extends EventEmitter imple this.protocol = protocol } - isStarted () { + isStarted (): boolean { return this.running } /** * Start the network */ - async start () { + async start (): Promise { if (this.running) { return } @@ -65,7 +65,7 @@ export class TopologyListener extends EventEmitter imple /** * Stop all network activity */ - stop () { + async stop (): Promise { this.running = false // unregister protocol and handlers diff --git a/src/utils.ts b/src/utils.ts index 058d919f..1bfeaf18 100644 --- a/src/utils.ts +++ b/src/utils.ts @@ -82,7 +82,7 @@ export function removePublicAddresses (peer: PeerInfo): PeerInfo { /** * Creates a DHT ID by hashing a given Uint8Array */ -export async function convertBuffer (buf: Uint8Array) { +export async function convertBuffer (buf: Uint8Array): Promise { const multihash = await sha256.digest(buf) return multihash.digest @@ -91,61 +91,61 @@ export async function convertBuffer (buf: Uint8Array) { /** * Creates a DHT ID by hashing a Peer ID */ -export async function convertPeerId (peerId: PeerId) { +export async function convertPeerId (peerId: PeerId): Promise { return await convertBuffer(peerId.toBytes()) } /** * Convert a Uint8Array to their SHA2-256 hash */ -export function bufferToKey (buf: Uint8Array) { +export function bufferToKey (buf: Uint8Array): Key { return new Key('/' + uint8ArrayToString(buf, 'base32'), false) } /** * Convert a Uint8Array to their SHA2-256 hash */ -export function bufferToRecordKey (buf: Uint8Array) { +export function bufferToRecordKey (buf: Uint8Array): Key { return new Key(`${RECORD_KEY_PREFIX}/${uint8ArrayToString(buf, 'base32')}`, false) } /** * Generate the key for a public key. */ -export function keyForPublicKey (peer: PeerId) { +export function keyForPublicKey (peer: PeerId): Uint8Array { return uint8ArrayConcat([ PK_PREFIX, peer.toBytes() ]) } -export function isPublicKeyKey (key: Uint8Array) { +export function isPublicKeyKey (key: Uint8Array): boolean { return uint8ArrayToString(key.subarray(0, 4)) === '/pk/' } -export function isIPNSKey (key: Uint8Array) { +export function isIPNSKey (key: Uint8Array): boolean { return uint8ArrayToString(key.subarray(0, 4)) === '/ipns/' } -export function fromPublicKeyKey (key: Uint8Array) { +export function fromPublicKeyKey (key: Uint8Array): PeerId { return peerIdFromBytes(key.subarray(4)) } /** * Create a new put record, encodes and signs it if enabled */ -export function createPutRecord (key: Uint8Array, value: Uint8Array) { +export function createPutRecord (key: Uint8Array, value: Uint8Array): Uint8Array { const timeReceived = new Date() const rec = new Libp2pRecord(key, value, timeReceived) return rec.serialize() } -export function debounce (callback: () => void, wait: number = 100) { +export function debounce (callback: () => void, wait: number = 100): () => void { let timeout: ReturnType - return () => { + return (): void => { clearTimeout(timeout) - timeout = setTimeout(() => callback(), wait) + timeout = setTimeout(() => { callback() }, wait) } } From ceee3a02fe05dd210e01fff5d2d7ab51f45de1a0 Mon Sep 17 00:00:00 2001 From: chad Date: Thu, 9 Feb 2023 13:07:50 -0500 Subject: [PATCH 2/5] chore: linting test files --- test/generate-peers/generate-peers.node.ts | 2 +- test/kad-dht.spec.ts | 16 ++++++---------- test/message.spec.ts | 1 - test/multiple-nodes.spec.ts | 2 +- test/network.spec.ts | 5 +++-- test/providers.spec.ts | 2 +- test/query.spec.ts | 11 ++++++++--- test/routing-table.spec.ts | 8 ++++---- test/rpc/index.node.ts | 2 +- test/utils/create-peer-id.ts | 5 +++-- test/utils/index.ts | 2 +- test/utils/sort-closest-peers.ts | 4 ++-- test/utils/test-dht.ts | 13 +++++++------ 13 files changed, 38 insertions(+), 35 deletions(-) diff --git a/test/generate-peers/generate-peers.node.ts b/test/generate-peers/generate-peers.node.ts index f87914aa..a7a1d388 100644 --- a/test/generate-peers/generate-peers.node.ts +++ b/test/generate-peers/generate-peers.node.ts @@ -17,7 +17,7 @@ import type { PeerStore } from '@libp2p/interface-peer-store' const dirname = path.dirname(fileURLToPath(import.meta.url)) -async function fromGo (targetCpl: number, randPrefix: number, localKadId: string) { +async function fromGo (targetCpl: number, randPrefix: number, localKadId: string): Promise { const { stdout } = await execa('./generate-peer', [targetCpl.toString(), randPrefix.toString(), localKadId], { cwd: dirname }) diff --git a/test/kad-dht.spec.ts b/test/kad-dht.spec.ts index de5f096a..935c7fa0 100644 --- a/test/kad-dht.spec.ts +++ b/test/kad-dht.spec.ts @@ -28,9 +28,9 @@ import type { DualKadDHT } from '../src/dual-kad-dht.js' import { pipe } from 'it-pipe' import map from 'it-map' -async function findEvent (events: AsyncIterable, name: 'FINAL_PEER'): Promise +async function findEvent (events: AsyncIterable, name: 'FINAL_PEER'): Promise async function findEvent (events: AsyncIterable, name: 'VALUE'): Promise -async function findEvent (events: AsyncIterable, name: string) { +async function findEvent (events: AsyncIterable, name: string): Promise { const eventTypes = new Set() const event = await last( @@ -424,7 +424,7 @@ describe('KadDHT', () => { ]) // provide values - await Promise.all(values.map(async (value) => await drain(dhts[3].provide(value.cid)))) + await Promise.all(values.map(async (value) => { await drain(dhts[3].provide(value.cid)) })) // Expect an ADD_PROVIDER message to be sent to each peer for each value const fn = dhts[3].lan.network.sendMessage @@ -528,7 +528,7 @@ describe('KadDHT', () => { tdht.connect(dhts[1], dhts[2]) ]) - await Promise.all(dhts.map(async (dht) => await drain(dht.provide(val.cid)))) + await Promise.all(dhts.map(async (dht) => { await drain(dht.provide(val.cid)) })) const events = await all(dhts[0].findProviders(val.cid)) @@ -564,7 +564,7 @@ describe('KadDHT', () => { tdht.connect(dhts[1], dhts[2]) ]) - await Promise.all(dhts.map(async (dht) => await drain(dht.provide(val.cid)))) + await Promise.all(dhts.map(async (dht) => { await drain(dht.provide(val.cid)) })) const events = await all(dhts[0].findProviders(val.cid)) @@ -713,7 +713,7 @@ describe('KadDHT', () => { throw new Error('Could not find DHT') } - return await tdht.connect(dhtA, dhtB) + await tdht.connect(dhtA, dhtB) })) // Get the alpha (3) closest peers to the key from the origin's @@ -784,13 +784,9 @@ describe('KadDHT', () => { const dht = await tdht.spawn() - // TODO: Switch not closing well, but it will be removed - // (invalid transition: STOPPED -> done) await delay(100) await expect(all(dht.get(uint8ArrayFromString('/v/hello')))).to.eventually.be.rejected().property('code', 'ERR_NO_PEERS_IN_ROUTING_TABLE') - - // TODO: after error switch }) it('get should handle correctly an unexpected error', async function () { diff --git a/test/message.spec.ts b/test/message.spec.ts index 47672e48..e360b013 100644 --- a/test/message.spec.ts +++ b/test/message.spec.ts @@ -15,7 +15,6 @@ describe('Message', () => { expect(msg).to.have.property('type', 'PING') expect(msg).to.have.property('key').eql(uint8ArrayFromString('hello')) - // TODO: confirm this works as expected expect(msg).to.have.property('clusterLevelRaw', 5) expect(msg).to.have.property('clusterLevel', 4) }) diff --git a/test/multiple-nodes.spec.ts b/test/multiple-nodes.spec.ts index d87790f4..bc734774 100644 --- a/test/multiple-nodes.spec.ts +++ b/test/multiple-nodes.spec.ts @@ -26,7 +26,7 @@ describe('multiple nodes', function () { const range = Array.from(Array(n - 1).keys()) // connect the last one with the others one by one - return await Promise.all(range.map(async (i) => await tdht.connect(dhts[n - 1], dhts[i]))) + return await Promise.all(range.map(async (i) => { await tdht.connect(dhts[n - 1], dhts[i]) })) }) afterEach(async function () { diff --git a/test/network.spec.ts b/test/network.spec.ts index 47a6dbe8..978a1cc1 100644 --- a/test/network.spec.ts +++ b/test/network.spec.ts @@ -28,7 +28,7 @@ describe('Network', () => { }) }) - after(async () => await tdht.teardown()) + after(async () => { await tdht.teardown() }) describe('sendRequest', () => { it('send and response echo', async () => { @@ -44,7 +44,7 @@ describe('Network', () => { it('send and response different messages', async () => { const defer = pDefer() let i = 0 - const finish = () => { + const finish = (): void => { if (i++ === 1) { defer.resolve() } @@ -53,6 +53,7 @@ describe('Network', () => { const msg = new Message(MESSAGE_TYPE.PING, uint8ArrayFromString('hello'), 0) // mock it + // @ts-expect-error incomplete implementation dht.components.connectionManager.openConnection = async (peer: PeerId) => { // @ts-expect-error incomplete implementation const connection: Connection = { diff --git a/test/providers.spec.ts b/test/providers.spec.ts index cc3eab64..46562866 100644 --- a/test/providers.spec.ts +++ b/test/providers.spec.ts @@ -74,7 +74,7 @@ describe('Providers', () => { const cids = hashes.map((h) => CID.createV0(h)) - await Promise.all(cids.map(async cid => await providers.addProvider(cid, peers[0]))) + await Promise.all(cids.map(async cid => { await providers.addProvider(cid, peers[0]) })) const provs = await Promise.all(cids.map(async cid => await providers.getProviders(cid))) expect(provs).to.have.length(100) diff --git a/test/query.spec.ts b/test/query.spec.ts index 538dc20b..0ed51766 100644 --- a/test/query.spec.ts +++ b/test/query.spec.ts @@ -26,13 +26,18 @@ interface TopologyEntry { closerPeers?: number[] event: QueryEvent } +type Topology = Record describe('QueryManager', () => { let ourPeerId: PeerId let peers: PeerId[] let key: Uint8Array - function createTopology (opts: Record) { + function createTopology (opts: Record): Topology { const topology: Record = {} Object.keys(opts).forEach(key => { @@ -72,7 +77,7 @@ describe('QueryManager', () => { return topology } - function createQueryFunction (topology: Record) { + function createQueryFunction (topology: Record): QueryFunc { const queryFunc: QueryFunc = async function * ({ peer }) { const res = topology[peer.toString()] @@ -561,7 +566,7 @@ describe('QueryManager', () => { const queryFunc: QueryFunc = async function * ({ peer }) { // eslint-disable-line require-await visited.push(peer) - const getResult = async () => { + const getResult = async (): Promise => { const res = topology[peer.toString()] // this delay is necessary so `dhtA.stop` has time to stop the // requests before they all complete diff --git a/test/routing-table.spec.ts b/test/routing-table.spec.ts index 896edd3e..5d95e104 100644 --- a/test/routing-table.spec.ts +++ b/test/routing-table.spec.ts @@ -60,7 +60,7 @@ describe('Routing Table', () => { const ids = await createPeerIds(20) await Promise.all( - Array.from({ length: 1000 }).map(async () => await table.add(ids[random(ids.length - 1)])) + Array.from({ length: 1000 }).map(async () => { await table.add(ids[random(ids.length - 1)]) }) ) await Promise.all( @@ -78,7 +78,7 @@ describe('Routing Table', () => { this.timeout(20 * 1000) const peers = await createPeerIds(10) - await Promise.all(peers.map(async (peer) => await table.add(peer))) + await Promise.all(peers.map(async (peer) => { await table.add(peer) })) const key = await kadUtils.convertPeerId(peers[2]) expect(table.closestPeers(key, 10)).to.have.length(10) @@ -92,7 +92,7 @@ describe('Routing Table', () => { this.timeout(10 * 1000) const peers = await createPeerIds(4) - await Promise.all(peers.map(async (peer) => await table.add(peer))) + await Promise.all(peers.map(async (peer) => { await table.add(peer) })) const id = peers[2] const key = await kadUtils.convertPeerId(id) @@ -103,7 +103,7 @@ describe('Routing Table', () => { this.timeout(20 * 1000) const peers = await createPeerIds(18) - await Promise.all(peers.map(async (peer) => await table.add(peer))) + await Promise.all(peers.map(async (peer) => { await table.add(peer) })) const key = await kadUtils.convertPeerId(peers[2]) expect(table.closestPeers(key, 15)).to.have.length(15) diff --git a/test/rpc/index.node.ts b/test/rpc/index.node.ts index 02fbf7ef..674c20ee 100644 --- a/test/rpc/index.node.ts +++ b/test/rpc/index.node.ts @@ -69,7 +69,7 @@ describe('rpc', () => { const defer = pDefer() const msg = new Message(MESSAGE_TYPE.GET_VALUE, uint8ArrayFromString('hello'), 5) - const validateMessage = (res: Uint8ArrayList[]) => { + const validateMessage = (res: Uint8ArrayList[]): void => { const msg = Message.deserialize(res[0]) expect(msg).to.have.property('key').eql(uint8ArrayFromString('hello')) expect(msg).to.have.property('closerPeers').eql([]) diff --git a/test/utils/create-peer-id.ts b/test/utils/create-peer-id.ts index d5ebb661..c4c3e69b 100644 --- a/test/utils/create-peer-id.ts +++ b/test/utils/create-peer-id.ts @@ -1,9 +1,10 @@ +import type { Ed25519PeerId } from '@libp2p/interface-peer-id' import { createEd25519PeerId } from '@libp2p/peer-id-factory' /** * Creates multiple PeerIds */ -export async function createPeerIds (length: number) { +export async function createPeerIds (length: number): Promise { return await Promise.all( new Array(length).fill(0).map(async () => await createEd25519PeerId()) ) @@ -12,7 +13,7 @@ export async function createPeerIds (length: number) { /** * Creates a PeerId */ -export async function createPeerId () { +export async function createPeerId (): Promise { const ids = await createPeerIds(1) return ids[0] diff --git a/test/utils/index.ts b/test/utils/index.ts index 29469048..933d9e7c 100644 --- a/test/utils/index.ts +++ b/test/utils/index.ts @@ -3,7 +3,7 @@ import type { PeerId } from '@libp2p/interface-peer-id' /** * Count how many peers are in b but are not in a */ -export function countDiffPeers (a: PeerId[], b: PeerId[]) { +export function countDiffPeers (a: PeerId[], b: PeerId[]): number { const s = new Set() a.forEach((p) => s.add(p.toString())) diff --git a/test/utils/sort-closest-peers.ts b/test/utils/sort-closest-peers.ts index 1cdca77c..ee0ad2a6 100644 --- a/test/utils/sort-closest-peers.ts +++ b/test/utils/sort-closest-peers.ts @@ -8,13 +8,13 @@ import type { PeerId } from '@libp2p/interface-peer-id' /** * Sort peers by distance to the given `kadId` */ -export async function sortClosestPeers (peers: PeerId[], kadId: Uint8Array) { +export async function sortClosestPeers (peers: PeerId[], kadId: Uint8Array): Promise { const distances = await all( map(peers, async (peer) => { const id = await convertPeerId(peer) return { - peer: peer, + peer, distance: uint8ArrayXor(id, kadId) } }) diff --git a/test/utils/test-dht.ts b/test/utils/test-dht.ts index ba8e993c..7395e4d7 100644 --- a/test/utils/test-dht.ts +++ b/test/utils/test-dht.ts @@ -15,17 +15,18 @@ import { start } from '@libp2p/interfaces/startable' import delay from 'delay' import type { ConnectionManager } from '@libp2p/interface-connection-manager' import type { PeerStore } from '@libp2p/interface-peer-store' +import type { PeerId } from '@libp2p/interface-peer-id' const log = logger('libp2p:kad-dht:test-dht') export class TestDHT { - private readonly peers: Map + private readonly peers: Map constructor () { this.peers = new Map() } - async spawn (options: Partial = {}, autoStart = true) { + async spawn (options: Partial = {}, autoStart = true): Promise { const components: KadDHTComponents = { peerId: await createPeerId(), datastore: new MemoryDatastore(), @@ -93,7 +94,7 @@ export class TestDHT { Promise.all([ components.peerStore.addressBook.add(peerData.id, peerData.multiaddrs), components.peerStore.protoBook.set(peerData.id, peerData.protocols) - ]).catch(err => log.error(err)) + ]).catch(err => { log.error(err) }) }) if (autoStart) { @@ -108,7 +109,7 @@ export class TestDHT { return dht } - async connect (dhtA: DualKadDHT, dhtB: DualKadDHT) { + async connect (dhtA: DualKadDHT, dhtB: DualKadDHT): Promise { // need addresses in the address book otherwise we won't know whether to add // the peer to the public or private DHT and will do nothing await dhtA.components.peerStore.addressBook.add(dhtB.components.peerId, dhtB.components.addressManager.getAddresses()) @@ -124,7 +125,7 @@ export class TestDHT { await checkConnected(dhtA.wan, dhtB.wan) } - async function checkConnected (a: KadDHT, b: KadDHT) { + async function checkConnected (a: KadDHT, b: KadDHT): Promise { const routingTableChecks = [] routingTableChecks.push(async () => { @@ -161,7 +162,7 @@ export class TestDHT { async teardown (): Promise { await Promise.all( - Array.from(this.peers.entries()).map(async ([_, { dht }]) => await dht.stop()) + Array.from(this.peers.entries()).map(async ([_, { dht }]) => { await dht.stop() }) ) this.peers.clear() } From 562b69c1bc6e75205ffe912ac2f5e76a0480b9a1 Mon Sep 17 00:00:00 2001 From: chad Date: Thu, 9 Feb 2023 13:08:22 -0500 Subject: [PATCH 3/5] chore: upgrade aegir to 38.1.2 --- package.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/package.json b/package.json index 55c6ea02..ed047449 100644 --- a/package.json +++ b/package.json @@ -196,7 +196,7 @@ "@types/lodash.range": "^3.2.6", "@types/varint": "^6.0.0", "@types/which": "^2.0.1", - "aegir": "^37.7.7", + "aegir": "^38.1.2", "datastore-level": "^9.0.0", "delay": "^5.0.0", "execa": "^6.0.0", From ef0b7e00b16b0557e8954920c39a0e58f5891f09 Mon Sep 17 00:00:00 2001 From: chad Date: Thu, 9 Feb 2023 18:41:20 -0500 Subject: [PATCH 4/5] chore: linting remove unused imports + updated types --- src/content-fetching/index.ts | 4 ++-- src/dual-kad-dht.ts | 4 ++-- src/peer-routing/index.ts | 6 +++--- test/kad-dht.spec.ts | 2 +- test/routing-table.spec.ts | 4 ++-- 5 files changed, 10 insertions(+), 10 deletions(-) diff --git a/src/content-fetching/index.ts b/src/content-fetching/index.ts index 31f42530..20ca8633 100644 --- a/src/content-fetching/index.ts +++ b/src/content-fetching/index.ts @@ -16,7 +16,7 @@ import { } from '../constants.js' import { createPutRecord, convertBuffer, bufferToRecordKey } from '../utils.js' import { logger } from '@libp2p/logger' -import type { Validators, Selectors, ValueEvent, QueryOptions, DialingPeerEvent, PeerResponseEvent, QueryErrorEvent, SendingQueryEvent, QueryEvent } from '@libp2p/interface-dht' +import type { Validators, Selectors, ValueEvent, QueryOptions, QueryEvent } from '@libp2p/interface-dht' import type { PeerRouting } from '../peer-routing/index.js' import type { QueryManager } from '../query/manager.js' import type { RoutingTable } from '../routing-table/index.js' @@ -88,7 +88,7 @@ export class ContentFetching { /** * Send the best record found to any peers that have an out of date record */ - async * sendCorrectionRecord (key: Uint8Array, vals: ValueEvent[], best: Uint8Array, options: AbortOptions = {}): AsyncGenerator { + async * sendCorrectionRecord (key: Uint8Array, vals: ValueEvent[], best: Uint8Array, options: AbortOptions = {}): AsyncGenerator { this.log('sendCorrection for %b', key) const fixupRec = createPutRecord(key, best) diff --git a/src/dual-kad-dht.ts b/src/dual-kad-dht.ts index af6d15ef..f4fce412 100644 --- a/src/dual-kad-dht.ts +++ b/src/dual-kad-dht.ts @@ -96,7 +96,7 @@ export class DualKadDHT extends EventEmitter implements Dua /** * Store the given key/value pair in the DHT */ - async * put (key: Uint8Array, value: Uint8Array, options: QueryOptions = {}): AsyncGenerator { + async * put (key: Uint8Array, value: Uint8Array, options: QueryOptions = {}): AsyncGenerator { for await (const event of merge( this.lan.put(key, value, options), this.wan.put(key, value, options) @@ -108,7 +108,7 @@ export class DualKadDHT extends EventEmitter implements Dua /** * Get the value that corresponds to the passed key */ - async * get (key: Uint8Array, options: QueryOptions = {}): AsyncGenerator { // eslint-disable-line require-await + async * get (key: Uint8Array, options: QueryOptions = {}): AsyncGenerator { let queriedPeers = false let foundValue = false diff --git a/src/peer-routing/index.ts b/src/peer-routing/index.ts index 98837057..2a8545ff 100644 --- a/src/peer-routing/index.ts +++ b/src/peer-routing/index.ts @@ -13,7 +13,7 @@ import { Libp2pRecord } from '@libp2p/record' import { logger } from '@libp2p/logger' import { keys } from '@libp2p/crypto' import { peerIdFromKeys } from '@libp2p/peer-id' -import type { DHTRecord, DialingPeerEvent, FinalPeerEvent, PeerResponseEvent, QueryErrorEvent, QueryEvent, QueryOptions, SendingQueryEvent, Validators, ValueEvent } from '@libp2p/interface-dht' +import type { DHTRecord, DialingPeerEvent, FinalPeerEvent, QueryEvent, QueryOptions, Validators } from '@libp2p/interface-dht' import type { RoutingTable } from '../routing-table/index.js' import type { QueryManager } from '../query/manager.js' import type { Network } from '../network.js' @@ -97,7 +97,7 @@ export class PeerRouting { /** * Get a value via rpc call for the given parameters */ - async * _getValueSingle (peer: PeerId, key: Uint8Array, options: AbortOptions = {}): AsyncGenerator { + async * _getValueSingle (peer: PeerId, key: Uint8Array, options: AbortOptions = {}): AsyncGenerator { const msg = new Message(MESSAGE_TYPE.GET_VALUE, key, 0) yield * this.network.sendRequest(peer, msg, options) } @@ -105,7 +105,7 @@ export class PeerRouting { /** * Get the public key directly from a node */ - async * getPublicKeyFromNode (peer: PeerId, options: AbortOptions = {}): AsyncGenerator { + async * getPublicKeyFromNode (peer: PeerId, options: AbortOptions = {}): AsyncGenerator { const pkKey = utils.keyForPublicKey(peer) for await (const event of this._getValueSingle(peer, pkKey, options)) { diff --git a/test/kad-dht.spec.ts b/test/kad-dht.spec.ts index 935c7fa0..229d49aa 100644 --- a/test/kad-dht.spec.ts +++ b/test/kad-dht.spec.ts @@ -28,7 +28,7 @@ import type { DualKadDHT } from '../src/dual-kad-dht.js' import { pipe } from 'it-pipe' import map from 'it-map' -async function findEvent (events: AsyncIterable, name: 'FINAL_PEER'): Promise +async function findEvent (events: AsyncIterable, name: 'FINAL_PEER'): Promise async function findEvent (events: AsyncIterable, name: 'VALUE'): Promise async function findEvent (events: AsyncIterable, name: string): Promise { const eventTypes = new Set() diff --git a/test/routing-table.spec.ts b/test/routing-table.spec.ts index 5d95e104..18d2646d 100644 --- a/test/routing-table.spec.ts +++ b/test/routing-table.spec.ts @@ -113,8 +113,8 @@ describe('Routing Table', () => { let fn: (() => Promise) | undefined // execute queued functions immediately + // @ts-expect-error table.pingQueue = { - // @ts-expect-error add: async (f: () => Promise) => { fn = f }, @@ -175,8 +175,8 @@ describe('Routing Table', () => { let fn: (() => Promise) | undefined // execute queued functions immediately + // @ts-expect-error table.pingQueue = { - // @ts-expect-error add: async (f: () => Promise) => { fn = f }, From f0fe367869ddb8a7ccde5970142f4fa7828eea11 Mon Sep 17 00:00:00 2001 From: achingbrain Date: Fri, 10 Mar 2023 09:40:49 +0100 Subject: [PATCH 5/5] chore: fix up tests --- package.json | 4 +- src/providers.ts | 10 +---- src/query/query-path.ts | 87 +++++++++++++++++++++++++---------------- 3 files changed, 56 insertions(+), 45 deletions(-) diff --git a/package.json b/package.json index ed047449..6b6a0fed 100644 --- a/package.json +++ b/package.json @@ -137,7 +137,7 @@ "test:chrome-webworker": "aegir test -t webworker", "test:firefox": "aegir test -t browser -- --browser firefox", "test:firefox-webworker": "aegir test -t webworker -- --browser firefox", - "dep-check": "aegir dep-check", + "dep-check": "aegir dep-check -i protons", "release": "aegir release", "docs": "aegir docs" }, @@ -163,7 +163,6 @@ "abortable-iterator": "^4.0.2", "any-signal": "^3.0.0", "datastore-core": "^8.0.1", - "events": "^3.3.0", "hashlru": "^2.3.0", "interface-datastore": "^7.0.0", "it-all": "^2.0.0", @@ -202,7 +201,6 @@ "execa": "^6.0.0", "it-filter": "^2.0.0", "it-last": "^2.0.0", - "it-pair": "^2.0.2", "lodash.random": "^3.2.0", "lodash.range": "^3.2.0", "p-retry": "^5.0.0", diff --git a/src/providers.ts b/src/providers.ts index e3bcc20b..f5121f9d 100644 --- a/src/providers.ts +++ b/src/providers.ts @@ -209,11 +209,9 @@ export class Providers implements Startable { * Get a list of providers for the given CID */ async getProviders (cid: CID): Promise { - let provs: Map = new Map() - - await this.syncQueue.add(async () => { + return await this.syncQueue.add(async () => { log('get providers for %s', cid) - provs = await this._getProvidersMap(cid) + const provs = await this._getProvidersMap(cid) return [...provs.keys()].map(peerIdStr => { return peerIdFromString(peerIdStr) @@ -224,10 +222,6 @@ export class Providers implements Startable { // type since p-queue@7.3.4 throwOnTimeout: true }) - - return [...provs.keys()].map(peerIdStr => { - return peerIdFromString(peerIdStr) - }) } } diff --git a/src/query/query-path.ts b/src/query/query-path.ts index 8e416627..4c34f400 100644 --- a/src/query/query-path.ts +++ b/src/query/query-path.ts @@ -6,6 +6,7 @@ import { CodeError } from '@libp2p/interfaces/errors' import { convertPeerId, convertBuffer } from '../utils.js' import { TimeoutController } from 'timeout-abort-controller' import { anySignal } from 'any-signal' +import { queryErrorEvent } from './events.js' import type { PeerId } from '@libp2p/interface-peer-id' import type { EventEmitter } from '@libp2p/interfaces/events' import type { CleanUpEvents } from './manager.js' @@ -117,46 +118,64 @@ export async function * queryPath (options: QueryPathOptions): AsyncGenerator peerXor) { // eslint-disable-line max-depth + log('skipping %p as they are not closer to %b than %p', closerPeer.id, key, peer) + continue + } + + log('querying closer peer %p', closerPeer.id) + queryPeer(closerPeer.id, closerPeerKadId) } + } - const closerPeerKadId = await convertPeerId(closerPeer.id) - const closerPeerXor = BigInt('0x' + toString(xor(closerPeerKadId, kadId), 'base16')) - - // only continue query if closer peer is actually closer - if (closerPeerXor > peerXor) { // eslint-disable-line max-depth - log('skipping %p as they are not closer to %b than %p', closerPeer.id, key, peer) - continue - } + // TODO: we have upgraded to p-queue@7, this should no longer be necessary + queue.emit('completed', event) + } - log('querying closer peer %p', closerPeer.id) - queryPeer(closerPeer.id, closerPeerKadId) - } + timeout?.clear() + } catch (err: any) { + if (signal.aborted) { + // TODO: we have upgraded to p-queue@7, this should no longer be necessary + queue.emit('error', err) + } else { + // TODO: we have upgraded to p-queue@7, this should no longer be necessary + queue.emit('completed', queryErrorEvent({ + from: peer, + error: err + })) } + } finally { + timeout?.clear() } - - timeout?.clear() }, { // use xor value as the queue priority - closer peers should execute first // subtract it from MAX_XOR because higher priority values execute sooner