diff --git a/.aegir.cjs b/.aegir.js similarity index 55% rename from .aegir.cjs rename to .aegir.js index feae220..00643ad 100644 --- a/.aegir.cjs +++ b/.aegir.js @@ -1,9 +1,10 @@ -'use strict' +import { createServer } from 'ipfsd-ctl' +import * as ipfsHttpModule from 'ipfs-http-client' +import goIpfsModule from 'go-ipfs' -const { createServer } = require('ipfsd-ctl') let server -module.exports = { +export default { test: { before: async () => { server = createServer({ @@ -11,8 +12,8 @@ module.exports = { port: 57583 }, { type: 'go', - ipfsHttpModule: require('ipfs-http-client'), - ipfsBin: require('go-ipfs').path(), + ipfsHttpModule, + ipfsBin: goIpfsModule.path(), test: true }) diff --git a/package.json b/package.json index 92a43aa..0037ebc 100644 --- a/package.json +++ b/package.json @@ -116,36 +116,39 @@ ] }, "scripts": { + "clean": "aegir clean", "lint": "aegir lint", - "dep-check": "aegir dep-check dist/src/**/*.js dist/test/**/*.js", - "build": "tsc", - "pretest": "npm run build", - "test": "aegir test -f ./dist/test/*.js -f ./dist/test/**/*.js", - "test:chrome": "npm run test -- -t browser --cov", - "test:chrome-webworker": "npm run test -- -t webworker", - "test:firefox": "npm run test -- -t browser -- --browser firefox", - "test:firefox-webworker": "npm run test -- -t webworker -- --browser firefox", - "test:node": "npm run test -- -t node --cov", - "test:electron-main": "npm run test -- -t electron-main", - "release": "semantic-release" + "dep-check": "aegir dep-check", + "build": "aegir build", + "test": "aegir test", + "test:chrome": "aegir test -t browser --cov", + "test:chrome-webworker": "aegir test -t webworker", + "test:firefox": "aegir test -t browser -- --browser firefox", + "test:firefox-webworker": "aegir test -t webworker -- --browser firefox", + "test:node": "aegir test -t node --cov", + "test:electron-main": "aegir test -t electron-main", + "release": "aegir release" }, "dependencies": { - "@libp2p/interfaces": "^1.3.18", - "@libp2p/logger": "^1.1.2", + "@libp2p/interfaces": "^1.3.32", + "@libp2p/logger": "^1.1.4", + "@libp2p/peer-id": "^1.1.10", "@multiformats/multiaddr": "^10.1.7", + "any-signal": "^3.0.1", "err-code": "^3.0.1", "multiformats": "^9.6.3", "p-defer": "^4.0.0", "p-queue": "^7.2.0" }, "devDependencies": { - "@libp2p/peer-id": "^1.1.8", - "@libp2p/peer-id-factory": "^1.0.8", - "aegir": "^36.1.3", + "@libp2p/peer-id-factory": "^1.0.10", + "aegir": "^37.0.15", "go-ipfs": "^0.12.0", - "ipfs-http-client": "^56.0.1", - "ipfsd-ctl": "^10.0.6", + "ipfs-http-client": "^56.0.3", + "ipfsd-ctl": "^11.0.0", "it-all": "^1.0.6", + "it-drain": "^1.0.5", + "uint8arrays": "^3.0.0", "wherearewe": "^1.0.0" }, "browser": { diff --git a/src/index.ts b/src/index.ts index d85d622..aab366b 100644 --- a/src/index.ts +++ b/src/index.ts @@ -5,21 +5,25 @@ import defer from 'p-defer' import errCode from 'err-code' import { Multiaddr } from '@multiformats/multiaddr' import { peerIdFromString } from '@libp2p/peer-id' +import anySignal from 'any-signal' import type { PeerId } from '@libp2p/interfaces/peer-id' import type { IPFSHTTPClient } from 'ipfs-http-client' import type { HTTPClientExtraOptions } from 'ipfs-http-client/types/src/types' import type { AbortOptions } from 'ipfs-core-types/src/utils' import type { PeerRouting } from '@libp2p/interfaces/peer-routing' import type { PeerInfo } from '@libp2p/interfaces/peer-info' +import type { Startable } from '@libp2p/interfaces/startable' const log = logger('libp2p-delegated-peer-routing') const DEFAULT_TIMEOUT = 30e3 // 30 second default const CONCURRENT_HTTP_REQUESTS = 4 -export class DelegatedPeerRouting implements PeerRouting { +export class DelegatedPeerRouting implements PeerRouting, Startable { private readonly client: IPFSHTTPClient private readonly httpQueue: PQueue + private started: boolean + private abortController: AbortController /** * Create a new DelegatedPeerRouting instance @@ -30,6 +34,8 @@ export class DelegatedPeerRouting implements PeerRouting { } this.client = client + this.started = false + this.abortController = new AbortController() // limit concurrency to avoid request flood in web browser // https://github.com/libp2p/js-libp2p-delegated-content-routing/issues/12 @@ -46,12 +52,28 @@ export class DelegatedPeerRouting implements PeerRouting { log(`enabled DelegatedPeerRouting via ${protocol}://${host}:${port}`) } + isStarted () { + return this.started + } + + start () { + this.started = true + } + + stop () { + this.httpQueue.clear() + this.abortController.abort() + this.abortController = new AbortController() + this.started = false + } + /** * Attempts to find the given peer */ async findPeer (id: PeerId, options: HTTPClientExtraOptions & AbortOptions = {}) { log('findPeer starts: %p', id) options.timeout = options.timeout ?? DEFAULT_TIMEOUT + options.signal = anySignal([this.abortController.signal].concat((options.signal != null) ? [options.signal] : [])) const onStart = defer() const onFinish = defer() @@ -64,9 +86,7 @@ export class DelegatedPeerRouting implements PeerRouting { try { await onStart.promise - for await (const event of this.client.dht.findPeer(id.toString(), { - timeout: options.timeout - })) { + for await (const event of this.client.dht.findPeer(id.toString(), options)) { if (event.name === 'FINAL_PEER') { const peerInfo: PeerInfo = { id: peerIdFromString(event.peer.id), @@ -97,6 +117,7 @@ export class DelegatedPeerRouting implements PeerRouting { log('getClosestPeers starts:', keyStr) options.timeout = options.timeout ?? DEFAULT_TIMEOUT + options.signal = anySignal([this.abortController.signal].concat((options.signal != null) ? [options.signal] : [])) const onStart = defer() const onFinish = defer() @@ -109,9 +130,7 @@ export class DelegatedPeerRouting implements PeerRouting { try { await onStart.promise - for await (const event of this.client.dht.query(keyStr, { - timeout: options.timeout - })) { + for await (const event of this.client.dht.query(keyStr, options)) { if (event.name === 'PEER_RESPONSE') { yield * event.closer.map(closer => ({ id: peerIdFromString(closer.id), diff --git a/test/index.spec.ts b/test/index.spec.ts index 047d135..264fe2a 100644 --- a/test/index.spec.ts +++ b/test/index.spec.ts @@ -1,6 +1,6 @@ /* eslint-env mocha */ -import { expect } from 'aegir/utils/chai.js' +import { expect } from 'aegir/chai' import { Controller, createFactory } from 'ipfsd-ctl' import { isNode } from 'wherearewe' import { create } from 'ipfs-http-client' @@ -10,6 +10,9 @@ import goIpfs from 'go-ipfs' import { peerIdFromString } from '@libp2p/peer-id' import { createEd25519PeerId } from '@libp2p/peer-id-factory' import all from 'it-all' +import pDefer from 'p-defer' +import drain from 'it-drain' +import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string' import type { IDResult } from 'ipfs-core-types/src/root' const factory = createFactory({ @@ -214,4 +217,29 @@ describe('DelegatedPeerRouting', function () { }) }) }) + + describe('stop', () => { + it('should cancel in-flight requests when stopping', async () => { + const opts = delegatedNode.apiAddr.toOptions() + const router = new DelegatedPeerRouting(create({ + protocol: 'http', + port: opts.port, + host: opts.host + })) + + const deferred = pDefer() + const peer = uint8ArrayFromString('QmVv4Wz46JaZJeH5PMV4LGbRiiMKEmszPYY3g6fjGnVXBs') + + void drain(router.getClosestPeers(peer)) + .then(() => { + deferred.reject(new Error('Did not abort')) + }) + .catch(err => { + deferred.resolve(err) + }) + + await router.stop() + await expect(deferred.promise).to.eventually.have.property('message').that.matches(/aborted/) + }) + }) })