Skip to content
This repository has been archived by the owner on Feb 12, 2024. It is now read-only.

feat: dht client #3947

Merged
merged 27 commits into from
Dec 3, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
c84cb70
feat: dht client
achingbrain Nov 16, 2021
7b876ba
Merge remote-tracking branch 'origin/master' into feat/dht-client
achingbrain Nov 22, 2021
052c4f3
chore: remove bitswap gh version
achingbrain Nov 22, 2021
535ade2
chore: linting
achingbrain Nov 22, 2021
0caf2e6
chore: tests
achingbrain Nov 22, 2021
795d591
chore: fix unit tests
achingbrain Nov 23, 2021
3e277dc
chore: update repo
achingbrain Nov 23, 2021
aec4243
chore: print events
achingbrain Nov 23, 2021
6a220d0
chore: remove redundant test
achingbrain Nov 23, 2021
f4d2472
chore: fix more tests
achingbrain Nov 24, 2021
32ff0d7
Merge remote-tracking branch 'origin/master' into feat/dht-client
achingbrain Nov 24, 2021
159a8d0
chore: update dht
achingbrain Nov 24, 2021
42587c2
chore: fix up ipns example
achingbrain Nov 25, 2021
01cf501
chore: why is this test failing
achingbrain Nov 25, 2021
0de8185
chore: print addr
achingbrain Nov 25, 2021
69508c5
chore: update dht version
achingbrain Nov 25, 2021
e586a57
chore: ensure nodes are reachable before running dht tests
achingbrain Nov 25, 2021
328a974
chore: ensure we have actually queried target node
achingbrain Nov 25, 2021
0d3b217
chore: ensure nodes can be reached
achingbrain Nov 25, 2021
bae733c
chore: disable debug logs
achingbrain Nov 25, 2021
10819b6
chore: renable all tests
achingbrain Nov 25, 2021
084c51e
chore: no need to start wan dht in server mode
achingbrain Nov 25, 2021
146a659
chore: update libp2p version
achingbrain Nov 25, 2021
8ec62aa
chore: update deps
achingbrain Dec 2, 2021
8a3c79f
chore: linting
achingbrain Dec 2, 2021
eb90a09
chore: use a noise fork until the proper release
achingbrain Dec 3, 2021
de601b9
chore: update iso-random-stream
achingbrain Dec 3, 2021
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
"description": "JavaScript implementation of the IPFS specification",
"scripts": {
"link": "lerna link",
"reset": "lerna run clean && rimraf packages/*/node_modules node_modules",
"reset": "lerna run clean && rimraf packages/*/node_modules node_modules package-lock.json packages/*/package-lock.json",
"test": "lerna run test",
"test:node": "lerna run test:node",
"test:browser": "lerna run test:browser",
Expand Down
10 changes: 5 additions & 5 deletions packages/interface-ipfs-core/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,9 @@
"ipfs-unixfs": "^6.0.3",
"ipfs-unixfs-importer": "^9.0.3",
"ipfs-utils": "^9.0.2",
"ipns": "^0.15.0",
"ipns": "^0.16.0",
"is-ipfs": "^6.0.1",
"iso-random-stream": "^2.0.0",
"iso-random-stream": "^2.0.2",
"it-all": "^1.0.4",
"it-buffer-stream": "^2.0.0",
"it-concat": "^2.0.0",
Expand All @@ -90,7 +90,7 @@
"it-pushable": "^1.4.2",
"it-tar": "^4.0.0",
"it-to-buffer": "^2.0.0",
"libp2p-crypto": "^0.19.7",
"libp2p-crypto": "^0.21.0",
"libp2p-websockets": "^0.16.2",
"multiaddr": "^10.0.0",
"multiformats": "^9.4.13",
Expand All @@ -99,9 +99,9 @@
"p-map": "^4.0.0",
"p-retry": "^4.5.0",
"pako": "^1.0.2",
"peer-id": "^0.15.1",
"peer-id": "^0.16.0",
"readable-stream": "^3.4.0",
"sinon": "^11.1.1",
"sinon": "^12.0.01",
"uint8arrays": "^3.0.0"
},
"devDependencies": {
Expand Down
7 changes: 4 additions & 3 deletions packages/interface-ipfs-core/src/dht/disabled.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

import { expect } from 'aegir/utils/chai.js'
import { getDescribe, getIt } from '../utils/mocha.js'
import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string'
import all from 'it-all'

/**
* @typedef {import('ipfsd-ctl').Factory} Factory
Expand Down Expand Up @@ -42,8 +42,9 @@ export function testDisabled (factory, options) {
after(() => factory.clean())

it('should error when DHT not available', async () => {
await expect(nodeA.dht.get(uint8ArrayFromString('/ipns/Qme6KJdKcp85TYbLxuLV7oQzMiLremD7HMoXLZEmgo6Rnh')))
.to.eventually.be.rejected()
const events = await all(nodeA.dht.get('/ipns/12D3KooWQMSMXmsBvs5YDEQ6tXsaFv9tjuzmDmEvusaiQSFdrJdN'))

expect(events.filter(event => event.name === 'QUERY_ERROR')).to.not.be.empty()
})
})
}
36 changes: 24 additions & 12 deletions packages/interface-ipfs-core/src/dht/find-peer.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@
import { expect } from 'aegir/utils/chai.js'
import { getDescribe, getIt } from '../utils/mocha.js'
import testTimeout from '../utils/test-timeout.js'
import drain from 'it-drain'
import all from 'it-all'
import { ensureReachable } from './utils.js'

/**
* @typedef {import('ipfsd-ctl').Factory} Factory
Expand All @@ -23,41 +26,50 @@ export function testFindPeer (factory, options) {
let nodeA
/** @type {import('ipfs-core-types').IPFS} */
let nodeB
/** @type {import('ipfs-core-types/src/root').IDResult} */
let nodeBId

before(async () => {
nodeA = (await factory.spawn()).api
nodeB = (await factory.spawn()).api
nodeBId = await nodeB.id()

await nodeA.swarm.connect(nodeBId.addresses[0])
await ensureReachable(nodeA, nodeB)
})

after(() => factory.clean())

it('should respect timeout option when finding a peer on the DHT', async () => {
const nodeBId = await nodeB.id()

await testTimeout(() => nodeA.dht.findPeer(nodeBId.id, {
await testTimeout(() => drain(nodeA.dht.findPeer(nodeBId.id, {
timeout: 1
}))
})))
})

it('should find other peers', async () => {
const nodeBId = await nodeB.id()
const res = await nodeA.dht.findPeer(nodeBId.id)
const id = res.id.toString()

const results = await all(nodeA.dht.findPeer(nodeBId.id))
const finalPeer = results.filter(event => event.name === 'FINAL_PEER').pop()

if (!finalPeer || finalPeer.name !== 'FINAL_PEER') {
throw new Error('No finalPeer event received')
}

const id = finalPeer.peer.id
const nodeAddresses = nodeBId.addresses.map((addr) => addr.nodeAddress())
const peerAddresses = res.addrs.map(ma => ma.nodeAddress())
const peerAddresses = finalPeer.peer.multiaddrs.map(ma => ma.nodeAddress())

expect(id).to.be.eql(nodeBId.id)
expect(id).to.equal(nodeBId.id)
expect(peerAddresses).to.deep.include(nodeAddresses[0])
})

it('should fail to find other peer if peer does not exist', () => {
return expect(nodeA.dht.findPeer('Qmd7qZS4T7xXtsNFdRoK1trfMs5zU94EpokQ9WFtxdPxsZ')).to.eventually.be.rejected()
it('should fail to find other peer if peer does not exist', async () => {
const events = await all(nodeA.dht.findPeer('Qmd7qZS4T7xXtsNFdRoK1trfMs5zU94EpokQ9WFtxdPxsZ'))

// no finalPeer events found
expect(events.filter(event => event.name === 'FINAL_PEER')).to.be.empty()

// queryError events found
expect(events.filter(event => event.name === 'QUERY_ERROR')).to.not.be.empty()
})
})
}
60 changes: 14 additions & 46 deletions packages/interface-ipfs-core/src/dht/find-provs.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ import { expect } from 'aegir/utils/chai.js'
import { getDescribe, getIt } from '../utils/mocha.js'
import all from 'it-all'
import drain from 'it-drain'
import { fakeCid } from './utils.js'
import testTimeout from '../utils/test-timeout.js'
import { ensureReachable } from './utils.js'

/**
* @typedef {import('ipfsd-ctl').Factory} Factory
Expand All @@ -20,34 +20,22 @@ export function testFindProvs (factory, options) {
const it = getIt(options)

describe('.dht.findProvs', function () {
this.timeout(20000)
this.timeout(80 * 1000)

/** @type {import('ipfs-core-types').IPFS} */
let nodeA
/** @type {import('ipfs-core-types').IPFS} */
let nodeB
/** @type {import('ipfs-core-types').IPFS} */
let nodeC
/** @type {import('ipfs-core-types/src/root').IDResult} */
let nodeAId
/** @type {import('ipfs-core-types/src/root').IDResult} */
let nodeBId
/** @type {import('ipfs-core-types/src/root').IDResult} */
let nodeCId

before(async () => {
nodeA = (await factory.spawn()).api
nodeB = (await factory.spawn()).api
nodeC = (await factory.spawn()).api

nodeAId = await nodeA.id()
nodeBId = await nodeB.id()
nodeCId = await nodeC.id()

await Promise.all([
nodeB.swarm.connect(nodeAId.addresses[0]),
nodeC.swarm.connect(nodeBId.addresses[0])
])
await ensureReachable(nodeB, nodeA)
await ensureReachable(nodeC, nodeB)
})

after(() => factory.clean())
Expand All @@ -57,8 +45,6 @@ export function testFindProvs (factory, options) {
*/
let providedCid
before('add providers for the same cid', async function () {
this.timeout(10 * 1000)

const cids = await Promise.all([
nodeB.object.new('unixfs-dir'),
nodeC.object.new('unixfs-dir')
Expand All @@ -79,38 +65,20 @@ export function testFindProvs (factory, options) {
})

it('should be able to find providers', async function () {
// @ts-ignore this is mocha
this.timeout(20 * 1000)

const provs = await all(nodeA.dht.findProvs(providedCid, { numProviders: 2 }))
const providerIds = provs.map((p) => p.id.toString())
/** @type {string[]} */
const providerIds = []

expect(providerIds).to.have.members([
nodeBId.id,
nodeCId.id
])
})

it('should take options to override timeout config', async function () {
const options = {
timeout: 1
for await (const event of nodeA.dht.findProvs(providedCid)) {
if (event.name === 'PROVIDER') {
providerIds.push(...event.providers.map(prov => prov.id))
}
}

const cidV0 = await fakeCid()
const start = Date.now()
let res

try {
res = await all(nodeA.dht.findProvs(cidV0, options))
} catch (/** @type {any} */ err) {
// rejected by http client
expect(err).to.have.property('name', 'TimeoutError')
return
}
const nodeBId = await nodeB.id()
const nodeCId = await nodeC.id()

// rejected by the server, errors don't work over http - https://github.com/ipfs/js-ipfs/issues/2519
expect(res).to.be.an('array').with.lengthOf(0)
expect(Date.now() - start).to.be.lessThan(100)
expect(providerIds).to.include(nodeBId.id)
expect(providerIds).to.include(nodeCId.id)
})
})
}
37 changes: 24 additions & 13 deletions packages/interface-ipfs-core/src/dht/get.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@
import { expect } from 'aegir/utils/chai.js'
import { getDescribe, getIt } from '../utils/mocha.js'
import testTimeout from '../utils/test-timeout.js'
import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string'
import { toString as uint8ArrayToString } from 'uint8arrays/to-string'
import drain from 'it-drain'
import all from 'it-all'
import { ensureReachable } from './utils.js'

/**
* @typedef {import('ipfsd-ctl').Factory} Factory
Expand All @@ -19,19 +21,18 @@ export function testGet (factory, options) {
const it = getIt(options)

describe('.dht.get', function () {
this.timeout(80 * 1000)

/** @type {import('ipfs-core-types').IPFS} */
let nodeA
/** @type {import('ipfs-core-types').IPFS} */
let nodeB
/** @type {import('ipfs-core-types/src/root').IDResult} */
let nodeBId

before(async () => {
nodeA = (await factory.spawn()).api
nodeB = (await factory.spawn()).api
nodeBId = await nodeB.id()

await nodeA.swarm.connect(nodeBId.addresses[0])
await ensureReachable(nodeA, nodeB)
})

after(() => factory.clean())
Expand All @@ -40,23 +41,33 @@ export function testGet (factory, options) {
const data = await nodeA.add('should put a value to the DHT')
const publish = await nodeA.name.publish(data.cid)

await testTimeout(() => nodeB.dht.get(uint8ArrayFromString(`/ipns/${publish.name}`), {
await testTimeout(() => drain(nodeB.dht.get(`/ipns/${publish.name}`, {
timeout: 1
}))
})))
})

it('should error when getting a non-existent key from the DHT', () => {
return expect(nodeA.dht.get(uint8ArrayFromString('non-existing'), { timeout: 100 }))
.to.eventually.be.rejected
.and.be.an.instanceOf(Error)
it('should error when getting a non-existent key from the DHT', async () => {
const key = '/ipns/k51qzi5uqu5dl0dbfddy2wb42nvbc6anyxnkrguy5l0h0bv9kaih6j6vqdskqk'
const events = await all(nodeA.dht.get(key))

// no value events found
expect(events.filter(event => event.name === 'VALUE')).to.be.empty()

// queryError events found
expect(events.filter(event => event.name === 'QUERY_ERROR')).to.not.be.empty()
})

it('should get a value after it was put on another node', async () => {
const data = await nodeA.add('should put a value to the DHT')
const publish = await nodeA.name.publish(data.cid)
const record = await nodeA.dht.get(uint8ArrayFromString(`/ipns/${publish.name}`))
const events = await all(nodeA.dht.get(`/ipns/${publish.name}`))
const valueEvent = events.filter(event => event.name === 'VALUE').pop()

if (!valueEvent || valueEvent.name !== 'VALUE') {
throw new Error('Value event not found')
}

expect(uint8ArrayToString(record)).to.contain(data.cid.toString())
expect(uint8ArrayToString(valueEvent.value)).to.contain(data.cid.toString())
})
})
}
21 changes: 4 additions & 17 deletions packages/interface-ipfs-core/src/dht/provide.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import { CID } from 'multiformats/cid'
import all from 'it-all'
import { expect } from 'aegir/utils/chai.js'
import { getDescribe, getIt } from '../utils/mocha.js'
import { ensureReachable } from './utils.js'

/**
* @typedef {import('ipfsd-ctl').Factory} Factory
Expand All @@ -27,8 +28,8 @@ export function testProvide (factory, options) {
before(async () => {
ipfs = (await factory.spawn()).api
const nodeB = (await factory.spawn()).api
const nodeBId = await nodeB.id()
await ipfs.swarm.connect(nodeBId.addresses[0])

await ensureReachable(ipfs, nodeB)
})

after(() => factory.clean())
Expand All @@ -48,28 +49,14 @@ export function testProvide (factory, options) {
.that.include('not found locally')
})

it('should allow multiple CIDs to be passed', async () => {
const res = await all(ipfs.addAll([
{ content: uint8ArrayFromString('t0') },
{ content: uint8ArrayFromString('t1') }
]))

await all(ipfs.dht.provide(res.map(f => f.cid)))
})

it('should provide a CIDv1', async () => {
const res = await ipfs.add(uint8ArrayFromString('test'), { cidVersion: 1 })
await all(ipfs.dht.provide(res.cid))
})

it('should error on non CID arg', () => {
it('should error on non CID arg', async () => {
// @ts-expect-error invalid arg
return expect(all(ipfs.dht.provide({}))).to.eventually.be.rejected()
})

it('should error on array containing non CID arg', () => {
// @ts-expect-error invalid arg
return expect(all(ipfs.dht.provide([{}]))).to.eventually.be.rejected()
})
})
}
Loading