diff --git a/README.md b/README.md index 25e9621..a2bb0c5 100644 --- a/README.md +++ b/README.md @@ -1,10 +1,9 @@ # @libp2p/delegated-peer-routing [![libp2p.io](https://img.shields.io/badge/project-libp2p-yellow.svg?style=flat-square)](http://libp2p.io/) -[![IRC](https://img.shields.io/badge/freenode-%23libp2p-yellow.svg?style=flat-square)](http://webchat.freenode.net/?channels=%23libp2p) [![Discuss](https://img.shields.io/discourse/https/discuss.libp2p.io/posts.svg?style=flat-square)](https://discuss.libp2p.io) [![codecov](https://img.shields.io/codecov/c/github/libp2p/js-libp2p-delegated-peer-routing.svg?style=flat-square)](https://codecov.io/gh/libp2p/js-libp2p-delegated-peer-routing) -[![CI](https://img.shields.io/github/workflow/status/libp2p/js-libp2p-interfaces/test%20&%20maybe%20release/master?style=flat-square)](https://github.com/libp2p/js-libp2p-delegated-peer-routing/actions/workflows/js-test-and-release.yml) +[![CI](https://img.shields.io/github/workflow/status/libp2p/js-libp2p-delegated-peer-routing/test%20&%20maybe%20release/master?style=flat-square)](https://github.com/libp2p/js-libp2p-delegated-peer-routing/actions/workflows/js-test-and-release.yml) > Leverage other peers in the libp2p network to perform Peer Routing calls. @@ -37,30 +36,28 @@ npm install ipfs-http-client libp2p-delegated-peer-routing ## Example ```js -import { createEd25519PeerId } from '@libp2p/peer-id-factory' -import { DelegatedPeerRouting } from '@libp2p/delegated-peer-routing') +import { createLibp2p } from 'libp2p' +import { delegatedPeerRouting } from '@libp2p/delegated-peer-routing') import { create as createIpfsHttpClient } from 'ipfs-http-client') // default is to use ipfs.io -const routing = new DelegatedPeerRouting(createIpfsHttpClient({ +const client = createIpfsHttpClient({ // use default api settings protocol: 'https', port: 443, host: 'node0.delegate.ipfs.io' -})) - -try { - for await (const event of routing.findPeer('peerid')) { - console.log('query event', event) - } -} catch (err) { - console.error(err) -} - -const peerId = await createEd25519PeerId() -for await (const event of routing.getClosestPeers(peerId.id)) { - console.log('query event', event) -} +}) + +const node = await createLibp2p({ + peerRouting: [ + delegatedPeerRouting(client) + ] + //.. other config +}) +await node.start() + +const peerInfo = await node.peerRouting.findPeer('peerid') +console.log('peerInfo', peerInfo) ``` ## License diff --git a/package.json b/package.json index 3d2cd18..63ba65f 100644 --- a/package.json +++ b/package.json @@ -143,7 +143,7 @@ "@libp2p/peer-id": "^1.1.11", "any-signal": "^3.0.1", "err-code": "^3.0.1", - "multiformats": "^9.6.3", + "multiformats": "^10.0.0", "p-defer": "^4.0.0", "p-queue": "^7.2.0" }, @@ -155,7 +155,8 @@ "ipfsd-ctl": "^12.0.2", "it-all": "^1.0.6", "it-drain": "^1.0.5", - "uint8arrays": "^3.0.0", + "timeout-abort-controller": "^3.0.0", + "uint8arrays": "^4.0.2", "wherearewe": "^2.0.1" }, "browser": { diff --git a/src/index.ts b/src/index.ts index 958eb36..95198ad 100644 --- a/src/index.ts +++ b/src/index.ts @@ -5,7 +5,6 @@ import defer from 'p-defer' import errCode from 'err-code' import anySignal from 'any-signal' import type { PeerId } from '@libp2p/interface-peer-id' -import type { IPFSHTTPClient, HTTPClientExtraOptions } from 'ipfs-http-client' import type { AbortOptions } from 'ipfs-core-types/src/utils' import type { PeerRouting } from '@libp2p/interface-peer-routing' import type { PeerInfo } from '@libp2p/interface-peer-info' @@ -17,8 +16,107 @@ const log = logger('libp2p-delegated-peer-routing') const DEFAULT_TIMEOUT = 30e3 // 30 second default const CONCURRENT_HTTP_REQUESTS = 4 -export class DelegatedPeerRouting implements PeerRouting, Startable { - private readonly client: IPFSHTTPClient +export interface HTTPClientExtraOptions { + headers?: Record + searchParams?: URLSearchParams +} + +export enum EventTypes { + SENDING_QUERY = 0, + PEER_RESPONSE, + FINAL_PEER, + QUERY_ERROR, + PROVIDER, + VALUE, + ADDING_PEER, + DIALING_PEER +} + +/** + * The types of messages set/received during DHT queries + */ +export enum MessageType { + PUT_VALUE = 0, + GET_VALUE, + ADD_PROVIDER, + GET_PROVIDERS, + FIND_NODE, + PING +} + +export type MessageName = keyof typeof MessageType + +export interface DHTRecord { + key: Uint8Array + value: Uint8Array + timeReceived?: Date +} + +export interface SendingQueryEvent { + type: EventTypes.SENDING_QUERY + name: 'SENDING_QUERY' +} + +export interface PeerResponseEvent { + from: PeerId + type: EventTypes.PEER_RESPONSE + name: 'PEER_RESPONSE' + messageType: MessageType + messageName: MessageName + providers: PeerInfo[] + closer: PeerInfo[] + record?: DHTRecord +} + +export interface FinalPeerEvent { + peer: PeerInfo + type: EventTypes.FINAL_PEER + name: 'FINAL_PEER' +} + +export interface QueryErrorEvent { + type: EventTypes.QUERY_ERROR + name: 'QUERY_ERROR' + error: Error +} + +export interface ProviderEvent { + type: EventTypes.PROVIDER + name: 'PROVIDER' + providers: PeerInfo[] +} + +export interface ValueEvent { + type: EventTypes.VALUE + name: 'VALUE' + value: Uint8Array +} + +export interface AddingPeerEvent { + type: EventTypes.ADDING_PEER + name: 'ADDING_PEER' + peer: PeerId +} + +export interface DialingPeerEvent { + peer: PeerId + type: EventTypes.DIALING_PEER + name: 'DIALING_PEER' +} + +export type QueryEvent = SendingQueryEvent | PeerResponseEvent | FinalPeerEvent | QueryErrorEvent | ProviderEvent | ValueEvent | AddingPeerEvent | DialingPeerEvent + +export interface Delegate { + getEndpointConfig: () => { protocol: string, host: string, port: string } + + dht: { + findPeer: (peerId: PeerId, options?: AbortOptions) => AsyncIterable + query: (peerId: PeerId | CID, options?: AbortOptions) => AsyncIterable + } +} + +class DelegatedPeerRouting implements PeerRouting, Startable { + private readonly client: Delegate private readonly httpQueue: PQueue private started: boolean private abortController: AbortController @@ -26,7 +124,7 @@ export class DelegatedPeerRouting implements PeerRouting, Startable { /** * Create a new DelegatedPeerRouting instance */ - constructor (client: IPFSHTTPClient) { + constructor (client: Delegate) { if (client == null) { throw new Error('missing ipfs http client') } @@ -153,3 +251,7 @@ export class DelegatedPeerRouting implements PeerRouting, Startable { } } } + +export function delegatedPeerRouting (client: Delegate): (components?: any) => PeerRouting { + return () => new DelegatedPeerRouting(client) +} diff --git a/test/index.spec.ts b/test/index.spec.ts index 295ac3d..94119e9 100644 --- a/test/index.spec.ts +++ b/test/index.spec.ts @@ -3,8 +3,8 @@ import { expect } from 'aegir/chai' import { Controller, createFactory } from 'ipfsd-ctl' import { isElectronMain, isNode } from 'wherearewe' -import { create } from 'ipfs-http-client' -import { DelegatedPeerRouting } from '../src/index.js' +import { create, Options, CID as IPFSCID } from 'ipfs-http-client' +import { delegatedPeerRouting } from '../src/index.js' // @ts-expect-error no types import goIpfs from 'go-ipfs' import { peerIdFromString } from '@libp2p/peer-id' @@ -14,6 +14,11 @@ import pDefer from 'p-defer' import drain from 'it-drain' import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string' import type { IDResult } from 'ipfs-core-types/src/root' +import { CID } from 'multiformats/cid' +import type { AbortOptions } from '@libp2p/interfaces' +import type { PeerId } from '@libp2p/interface-peer-id' +import { stop } from '@libp2p/interfaces/startable' +import { TimeoutController } from 'timeout-abort-controller' const factory = createFactory({ type: 'go', @@ -41,6 +46,33 @@ async function spawnNode (bootstrap: any[] = []) { } } +function createIpfsClient (opts: Options) { + const client = create(opts) + + return { + getEndpointConfig: () => client.getEndpointConfig(), + block: { + async stat (cid: CID, options?: AbortOptions) { + const result = await client.block.stat(IPFSCID.parse(cid.toString()), options) + + return { + cid: CID.parse(result.cid.toString()), + size: result.size + } + } + }, + dht: { + async * findPeer (peerId: PeerId, options?: AbortOptions) { + yield * client.dht.findPeer(peerId, options) + }, + async * query (peerId: PeerId | CID, options?: AbortOptions) { + // @ts-expect-error CID types can be different + yield * client.dht.query(peerId, options) + } + } + } +} + describe('DelegatedPeerRouting', function () { this.timeout(20 * 1000) // we're spawning daemons, give ci some time @@ -77,16 +109,16 @@ describe('DelegatedPeerRouting', function () { describe('create', () => { it('should require an http api client instance at construction time', () => { // @ts-expect-error invalid parameters - expect(() => new DelegatedPeerRouting()).to.throw() + expect(() => delegatedPeerRouting()()).to.throw() }) it('should accept an http api client instance at construction time', () => { - const client = create({ + const client = createIpfsClient({ protocol: 'http', port: 8000, host: 'localhost' }) - const router = new DelegatedPeerRouting(client) + const router = delegatedPeerRouting(client)() expect(router).to.have.property('client') .that.has.property('getEndpointConfig') @@ -104,11 +136,11 @@ describe('DelegatedPeerRouting', function () { it('should be able to find peers via the delegate with a peer id string', async () => { const opts = delegatedNode.apiAddr.toOptions() - const router = new DelegatedPeerRouting(create({ + const router = delegatedPeerRouting(createIpfsClient({ protocol: 'http', port: opts.port, host: opts.host - })) + }))() const peer = await router.findPeer(peerIdToFind.id) @@ -120,11 +152,11 @@ describe('DelegatedPeerRouting', function () { it('should be able to find peers via the delegate with a peerid', async () => { const opts = delegatedNode.apiAddr.toOptions() - const router = new DelegatedPeerRouting(create({ + const router = delegatedPeerRouting(createIpfsClient({ protocol: 'http', port: opts.port, host: opts.host - })) + }))() const peer = await router.findPeer(peerIdToFind.id) @@ -137,28 +169,31 @@ describe('DelegatedPeerRouting', function () { it('should be able to specify a timeout', async () => { const opts = delegatedNode.apiAddr.toOptions() - const router = new DelegatedPeerRouting(create({ + const router = delegatedPeerRouting(createIpfsClient({ protocol: 'http', port: opts.port, host: opts.host - })) + }))() + const controller = new TimeoutController(5e3) - const peer = await router.findPeer(peerIdToFind.id, { timeout: 2000 }) + const peer = await router.findPeer(peerIdToFind.id, { signal: controller.signal }) const { id, multiaddrs } = peer expect(id).to.exist() expect(multiaddrs).to.exist() expect(id.toString()).to.eql(peerIdToFind.id.toString()) + + controller.clear() }) it('should not be able to find peers not on the network', async () => { const opts = delegatedNode.apiAddr.toOptions() - const router = new DelegatedPeerRouting(create({ + const router = delegatedPeerRouting(createIpfsClient({ protocol: 'http', port: opts.port, host: opts.host - })) + }))() // This is one of the default Bootstrap nodes, but we're not connected to it // so we'll test with it. @@ -171,11 +206,11 @@ describe('DelegatedPeerRouting', function () { it('should be able to query for the closest peers', async () => { const opts = delegatedNode.apiAddr.toOptions() - const router = new DelegatedPeerRouting(create({ + const router = delegatedPeerRouting(createIpfsClient({ protocol: 'http', port: opts.port, host: opts.host - })) + }))() const nodeId = await delegatedNode.api.id() const delegatePeerId = nodeId.id @@ -196,11 +231,11 @@ describe('DelegatedPeerRouting', function () { it('should find closest peers even if the peer does not exist', async () => { const opts = delegatedNode.apiAddr.toOptions() - const router = new DelegatedPeerRouting(create({ + const router = delegatedPeerRouting(createIpfsClient({ protocol: 'http', port: opts.port, host: opts.host - })) + }))() const nodeId = await delegatedNode.api.id() const delegatePeerId = nodeId.id @@ -221,11 +256,11 @@ describe('DelegatedPeerRouting', function () { describe('stop', () => { it('should cancel in-flight requests when stopping', async () => { const opts = delegatedNode.apiAddr.toOptions() - const router = new DelegatedPeerRouting(create({ + const router = delegatedPeerRouting(createIpfsClient({ protocol: 'http', port: opts.port, host: opts.host - })) + }))() const deferred = pDefer() const peer = uint8ArrayFromString('QmVv4Wz46JaZJeH5PMV4LGbRiiMKEmszPYY3g6fjGnVXBs', 'base58btc') @@ -238,7 +273,7 @@ describe('DelegatedPeerRouting', function () { deferred.resolve(err) }) - await router.stop() + await stop(router) await expect(deferred.promise).to.eventually.have.property('message').that.matches(/aborted/) }) })