Skip to content

Commit

Permalink
fix: update tests
Browse files Browse the repository at this point in the history
  • Loading branch information
achingbrain committed Nov 22, 2024
1 parent d7a805d commit 4d08443
Show file tree
Hide file tree
Showing 17 changed files with 96 additions and 71 deletions.
2 changes: 1 addition & 1 deletion packages/kad-dht/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@
"@multiformats/multiaddr": "^12.2.3",
"any-signal": "^4.1.1",
"interface-datastore": "^8.3.0",
"it-all": "^3.0.6",
"it-drain": "^3.0.7",
"it-length": "^3.0.6",
"it-length-prefixed": "^9.0.4",
Expand Down Expand Up @@ -99,7 +100,6 @@
"datastore-core": "^10.0.0",
"delay": "^6.0.0",
"execa": "^9.1.0",
"it-all": "^3.0.6",
"it-filter": "^3.1.0",
"it-last": "^3.0.6",
"it-pair": "^2.0.6",
Expand Down
3 changes: 2 additions & 1 deletion packages/kad-dht/src/content-fetching/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ export interface ContentFetchingInit {
queryManager: QueryManager
network: Network
logPrefix: string
datastorePrefix: string
}

export class ContentFetching {
Expand All @@ -48,7 +49,7 @@ export class ContentFetching {

this.components = components
this.log = components.logger.forComponent(`${logPrefix}:content-fetching`)
this.datastorePrefix = `/${init.logPrefix.replaceAll(':', '/')}/record`
this.datastorePrefix = `${init.datastorePrefix}/record`
this.validators = validators
this.selectors = selectors
this.peerRouting = peerRouting
Expand Down
6 changes: 4 additions & 2 deletions packages/kad-dht/src/kad-dht.ts
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,8 @@ export class KadDHT extends TypedEventEmitter<PeerDiscoveryEvents> implements Ka
peerRouting: this.peerRouting,
queryManager: this.queryManager,
network: this.network,
logPrefix
logPrefix,
datastorePrefix
})
this.contentRouting = new KADDHTContentRouting(components, {
network: this.network,
Expand All @@ -262,6 +263,7 @@ export class KadDHT extends TypedEventEmitter<PeerDiscoveryEvents> implements Ka
validators: this.validators,
logPrefix,
metricsPrefix,
datastorePrefix,
peerInfoMapper: this.peerInfoMapper
})
this.topologyListener = new TopologyListener(components, {
Expand Down Expand Up @@ -315,7 +317,7 @@ export class KadDHT extends TypedEventEmitter<PeerDiscoveryEvents> implements Ka

await this.onPeerConnect(peerData)
}).catch(err => {
this.log.error('could not add %p to routing table', peerId, err)
this.log.error('could not add %p to routing table - %e', peerId, err)
})
})

Expand Down
8 changes: 5 additions & 3 deletions packages/kad-dht/src/providers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ export class Providers {

constructor (components: ProvidersComponents, init: ProvidersInit) {
this.log = components.logger.forComponent(`${init.logPrefix}:providers`)
this.datastorePrefix = `/${init.datastorePrefix}/provider`
this.datastorePrefix = `${init.datastorePrefix}/provider`
this.datastore = components.datastore
this.lock = init.lock
}
Expand Down Expand Up @@ -70,8 +70,9 @@ export class Providers {
const release = await this.lock.readLock()

try {
this.log('get providers for %s', cid)
this.log('get providers for %c', cid)
const provs = await this.loadProviders(cid)
this.log('got %d providers for %c', provs.size, cid)

return [...provs.keys()]
} finally {
Expand All @@ -94,8 +95,9 @@ export class Providers {
*/
private async loadProviders (cid: CID): Promise<PeerMap<Date>> {
const providers = new PeerMap<Date>()
const key = toProviderKey(this.datastorePrefix, cid)

for await (const entry of this.datastore.query({ prefix: toProviderKey(this.datastorePrefix, cid).toString() })) {
for await (const entry of this.datastore.query({ prefix: key.toString() })) {
const { peerId } = parseProviderKey(entry.key)
providers.set(peerId, readProviderTime(entry.value))
}
Expand Down
2 changes: 1 addition & 1 deletion packages/kad-dht/src/reprovider.ts
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ export class Reprovider extends TypedEventEmitter<ReprovideEvents> {
})
this.datastore = components.datastore
this.addressManager = components.addressManager
this.datastorePrefix = `/${init.datastorePrefix}/provider`
this.datastorePrefix = `${init.datastorePrefix}/provider`
this.reprovideThreshold = init.threshold ?? REPROVIDE_THRESHOLD
this.maxQueueSize = init.maxQueueSize ?? REPROVIDE_MAX_QUEUE_SIZE
this.validity = init.validity ?? PROVIDERS_VALIDITY
Expand Down
25 changes: 18 additions & 7 deletions packages/kad-dht/src/rpc/handlers/add-provider.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,11 @@ import * as Digest from 'multiformats/hashes/digest'
import type { Message } from '../../message/dht.js'
import type { Providers } from '../../providers'
import type { DHTMessageHandler } from '../index.js'
import type { ComponentLogger, Logger, PeerId } from '@libp2p/interface'
import type { ComponentLogger, Logger, PeerId, PeerStore } from '@libp2p/interface'

export interface AddProviderComponents {
peerId: PeerId
peerStore: PeerStore
logger: ComponentLogger
}

Expand All @@ -18,12 +20,16 @@ export interface AddProviderHandlerInit {
}

export class AddProviderHandler implements DHTMessageHandler {
private readonly peerId: PeerId
private readonly providers: Providers
private readonly peerStore: PeerStore
private readonly log: Logger

constructor (components: AddProviderComponents, init: AddProviderHandlerInit) {
this.log = components.logger.forComponent(`${init.logPrefix}:rpc:handlers:add-provider`)
this.peerId = components.peerId
this.providers = init.providers
this.peerStore = components.peerStore
}

async handle (peerId: PeerId, msg: Message): Promise<Message | undefined> {
Expand All @@ -43,12 +49,16 @@ export class AddProviderHandler implements DHTMessageHandler {
this.log.error('no providers found in message')
}

this.log('%p asked us to store provider record for for %c', peerId, cid)
this.log('%p asked us, %p to store provider record for for %c', peerId, this.peerId, cid)

await Promise.all(
msg.providers.map(async (pi) => {
const digest = Digest.decode(pi.id)
const providerId = peerIdFromMultihash(digest)
const providerMultiaddrs = pi.multiaddrs.map(buf => multiaddr(buf))

// Ignore providers not from the originator
if (!peerId.equals(pi.id)) {
if (!peerId.equals(providerId)) {
this.log('invalid provider peer %p from %p', pi.id, peerId)
return
}
Expand All @@ -58,11 +68,12 @@ export class AddProviderHandler implements DHTMessageHandler {
return
}

this.log.trace('received provider %p for %s (addrs %s)', peerId, cid, pi.multiaddrs.map((m) => multiaddr(m).toString()))

const multihash = Digest.decode(pi.id)
this.log.trace('received provider %p for %s (addrs %s)', peerId, cid, providerMultiaddrs)

await this.providers.addProvider(cid, peerIdFromMultihash(multihash))
await this.providers.addProvider(cid, providerId)
await this.peerStore.merge(providerId, {
multiaddrs: providerMultiaddrs
})
})
)

Expand Down
50 changes: 18 additions & 32 deletions packages/kad-dht/src/rpc/handlers/get-providers.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import { InvalidMessageError } from '@libp2p/interface'
import all from 'it-all'
import map from 'it-map'
import { CID } from 'multiformats/cid'
import { MessageType } from '../../message/dht.js'
import type { PeerInfoMapper } from '../../index.js'
Expand All @@ -17,11 +19,13 @@ export interface GetProvidersHandlerInit {
}

export interface GetProvidersHandlerComponents {
peerId: PeerId
peerStore: PeerStore
logger: ComponentLogger
}

export class GetProvidersHandler implements DHTMessageHandler {
private readonly peerId: PeerId
private readonly peerRouting: PeerRouting
private readonly providers: Providers
private readonly peerStore: PeerStore
Expand All @@ -32,6 +36,7 @@ export class GetProvidersHandler implements DHTMessageHandler {
const { peerRouting, providers, logPrefix } = init

this.log = components.logger.forComponent(`${logPrefix}:rpc:handlers:get-providers`)
this.peerId = components.peerId
this.peerStore = components.peerStore
this.peerRouting = peerRouting
this.providers = providers
Expand All @@ -52,27 +57,33 @@ export class GetProvidersHandler implements DHTMessageHandler {

this.log('%p asking for providers for %s', peerId, cid)

const [peers, closer] = await Promise.all([
this.providers.getProviders(cid),
this.peerRouting.getCloserPeersOffline(msg.key, peerId)
const [providerPeers, closerPeers] = await Promise.all([
all(map(await this.providers.getProviders(cid), async (peerId) => {
const peer = await this.peerStore.get(peerId)
const info: PeerInfo = {
id: peer.id,
multiaddrs: peer.addresses.map(({ multiaddr }) => multiaddr)
}

return info
})),
this.peerRouting.getCloserPeersOffline(msg.key, this.peerId)
])

const providerPeers = await this._getPeers(peers)
const closerPeers = await this._getPeers(closer.map(({ id }) => id))
const response: Message = {
type: MessageType.GET_PROVIDERS,
key: msg.key,
clusterLevel: msg.clusterLevel,
closer: closerPeers
.map(this.peerInfoMapper)
.filter(({ multiaddrs }) => multiaddrs.length)
.filter(({ id, multiaddrs }) => !id.equals(peerId) && multiaddrs.length > 0)
.map(peerInfo => ({
id: peerInfo.id.toMultihash().bytes,
multiaddrs: peerInfo.multiaddrs.map(ma => ma.bytes)
})),
providers: providerPeers
.map(this.peerInfoMapper)
.filter(({ multiaddrs }) => multiaddrs.length)
.filter(({ id, multiaddrs }) => !id.equals(peerId) && multiaddrs.length > 0)
.map(peerInfo => ({
id: peerInfo.id.toMultihash().bytes,
multiaddrs: peerInfo.multiaddrs.map(ma => ma.bytes)
Expand All @@ -87,29 +98,4 @@ export class GetProvidersHandler implements DHTMessageHandler {
async _getAddresses (peerId: PeerId): Promise<Multiaddr[]> {
return []
}

async _getPeers (peerIds: PeerId[]): Promise<PeerInfo[]> {
const output: PeerInfo[] = []

for (const peerId of peerIds) {
try {
const peer = await this.peerStore.get(peerId)

const peerAfterFilter = this.peerInfoMapper({
id: peerId,
multiaddrs: peer.addresses.map(({ multiaddr }) => multiaddr)
})

if (peerAfterFilter.multiaddrs.length > 0) {
output.push(peerAfterFilter)
}
} catch (err: any) {
if (err.name !== 'NotFoundError') {
throw err
}
}
}

return output
}
}
3 changes: 2 additions & 1 deletion packages/kad-dht/src/rpc/handlers/get-value.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import type { Datastore } from 'interface-datastore'
export interface GetValueHandlerInit {
peerRouting: PeerRouting
logPrefix: string
datastorePrefix: string
}

export interface GetValueHandlerComponents {
Expand All @@ -32,7 +33,7 @@ export class GetValueHandler implements DHTMessageHandler {

constructor (components: GetValueHandlerComponents, init: GetValueHandlerInit) {
this.log = components.logger.forComponent(`${init.logPrefix}:rpc:handlers:get-value`)
this.datastorePrefix = `/${init.logPrefix.replaceAll(':', '/')}/record`
this.datastorePrefix = `${init.datastorePrefix}/record`
this.peerStore = components.peerStore
this.datastore = components.datastore
this.peerRouting = init.peerRouting
Expand Down
3 changes: 2 additions & 1 deletion packages/kad-dht/src/rpc/handlers/put-value.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import type { Datastore } from 'interface-datastore'
export interface PutValueHandlerInit {
validators: Validators
logPrefix: string
datastorePrefix: string
}

export interface PutValueHandlerComponents {
Expand All @@ -29,7 +30,7 @@ export class PutValueHandler implements DHTMessageHandler {

this.components = components
this.log = components.logger.forComponent(`${init.logPrefix}:rpc:handlers:put-value`)
this.datastorePrefix = `/${init.logPrefix.replaceAll(':', '/')}/record`
this.datastorePrefix = `${init.datastorePrefix}/record`
this.validators = validators
}

Expand Down
1 change: 1 addition & 0 deletions packages/kad-dht/src/rpc/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ export interface RPCInit {
validators: Validators
logPrefix: string
metricsPrefix: string
datastorePrefix: string
peerInfoMapper: PeerInfoMapper
}

Expand Down
26 changes: 13 additions & 13 deletions packages/kad-dht/test/content-routing.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -80,9 +80,6 @@ describe('content routing', () => {
dhts[2].components.peerId.toString()
].sort(), 'did not send ADD_PROVIDER to network peers')

// let all messages be processed
await delay(1000)

// Expect each DHT to find the provider of each value
for (const d of dhts) {
const events = await all(d.findProviders(cid))
Expand All @@ -102,12 +99,12 @@ describe('content routing', () => {
})

it('provides if in server mode', async function () {
const dhts = await Promise.all([
const dhts = await sortDHTs(await Promise.all([
tdht.spawn(),
tdht.spawn(),
tdht.spawn(),
tdht.spawn()
])
]), await kadUtils.convertBuffer(cid.multihash.bytes))

// connect peers
await Promise.all([
Expand All @@ -128,11 +125,11 @@ describe('content routing', () => {
it('find providers', async function () {
this.timeout(20 * 1000)

const dhts = await Promise.all([
const dhts = await sortDHTs(await Promise.all([
tdht.spawn(),
tdht.spawn(),
tdht.spawn()
])
]), await kadUtils.convertBuffer(cid.multihash.bytes))

// Connect
await Promise.all([
Expand Down Expand Up @@ -160,11 +157,11 @@ describe('content routing', () => {
it('find providers from client', async function () {
this.timeout(20 * 1000)

const dhts = await Promise.all([
const dhts = await sortDHTs(await Promise.all([
tdht.spawn(),
tdht.spawn(),
tdht.spawn()
])
]), await kadUtils.convertBuffer(cid.multihash.bytes))
const clientDHT = await tdht.spawn({ clientMode: true })

// Connect
Expand All @@ -174,7 +171,10 @@ describe('content routing', () => {
tdht.connect(dhts[1], dhts[2])
])

await Promise.all(dhts.map(async (dht) => { await drain(dht.provide(cid)) }))
await drain(dhts[2].provide(cid))

// wait for messages to be handled
await delay(1000)

const events = await all(clientDHT.findProviders(cid))

Expand All @@ -188,16 +188,16 @@ describe('content routing', () => {

return acc
}, {}))
expect(provs).to.have.length(3)
expect(provs).to.have.length(1)
})

it('find provider published by client', async function () {
this.timeout(20 * 1000)

const dhts = await Promise.all([
const dhts = await sortDHTs(await Promise.all([
tdht.spawn(),
tdht.spawn()
])
]), await kadUtils.convertBuffer(cid.multihash.bytes))
const clientDHT = await tdht.spawn({ clientMode: true })

// Connect
Expand Down
Loading

0 comments on commit 4d08443

Please sign in to comment.