Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: abort in flight requests when shutting down #68

Merged
merged 2 commits into from
May 11, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 6 additions & 5 deletions .aegir.cjs → .aegir.js
Original file line number Diff line number Diff line change
@@ -1,18 +1,19 @@
'use strict'
import { createServer } from 'ipfsd-ctl'
import * as ipfsHttpModule from 'ipfs-http-client'
import goIpfsModule from 'go-ipfs'

const { createServer } = require('ipfsd-ctl')
let server

module.exports = {
export default {
test: {
before: async () => {
server = createServer({
host: '127.0.0.1',
port: 57583
}, {
type: 'go',
ipfsHttpModule: require('ipfs-http-client'),
ipfsBin: require('go-ipfs').path(),
ipfsHttpModule,
ipfsBin: goIpfsModule.path(),
test: true
})

Expand Down
39 changes: 21 additions & 18 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -116,36 +116,39 @@
]
},
"scripts": {
"clean": "aegir clean",
"lint": "aegir lint",
"dep-check": "aegir dep-check dist/src/**/*.js dist/test/**/*.js",
"build": "tsc",
"pretest": "npm run build",
"test": "aegir test -f ./dist/test/*.js -f ./dist/test/**/*.js",
"test:chrome": "npm run test -- -t browser --cov",
"test:chrome-webworker": "npm run test -- -t webworker",
"test:firefox": "npm run test -- -t browser -- --browser firefox",
"test:firefox-webworker": "npm run test -- -t webworker -- --browser firefox",
"test:node": "npm run test -- -t node --cov",
"test:electron-main": "npm run test -- -t electron-main",
"release": "semantic-release"
"dep-check": "aegir dep-check",
"build": "aegir build",
"test": "aegir test",
"test:chrome": "aegir test -t browser --cov",
"test:chrome-webworker": "aegir test -t webworker",
"test:firefox": "aegir test -t browser -- --browser firefox",
"test:firefox-webworker": "aegir test -t webworker -- --browser firefox",
"test:node": "aegir test -t node --cov",
"test:electron-main": "aegir test -t electron-main",
"release": "aegir release"
},
"dependencies": {
"@libp2p/interfaces": "^1.3.18",
"@libp2p/logger": "^1.1.2",
"@libp2p/interfaces": "^1.3.32",
"@libp2p/logger": "^1.1.4",
"@libp2p/peer-id": "^1.1.10",
"@multiformats/multiaddr": "^10.1.7",
"any-signal": "^3.0.1",
"err-code": "^3.0.1",
"multiformats": "^9.6.3",
"p-defer": "^4.0.0",
"p-queue": "^7.2.0"
},
"devDependencies": {
"@libp2p/peer-id": "^1.1.8",
"@libp2p/peer-id-factory": "^1.0.8",
"aegir": "^36.1.3",
"@libp2p/peer-id-factory": "^1.0.10",
"aegir": "^37.0.15",
"go-ipfs": "^0.12.0",
"ipfs-http-client": "^56.0.1",
"ipfsd-ctl": "^10.0.6",
"ipfs-http-client": "^56.0.3",
"ipfsd-ctl": "^11.0.0",
"it-all": "^1.0.6",
"it-drain": "^1.0.5",
"uint8arrays": "^3.0.0",
"wherearewe": "^1.0.0"
},
"browser": {
Expand Down
33 changes: 26 additions & 7 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,21 +5,25 @@ import defer from 'p-defer'
import errCode from 'err-code'
import { Multiaddr } from '@multiformats/multiaddr'
import { peerIdFromString } from '@libp2p/peer-id'
import anySignal from 'any-signal'
import type { PeerId } from '@libp2p/interfaces/peer-id'
import type { IPFSHTTPClient } from 'ipfs-http-client'
import type { HTTPClientExtraOptions } from 'ipfs-http-client/types/src/types'
import type { AbortOptions } from 'ipfs-core-types/src/utils'
import type { PeerRouting } from '@libp2p/interfaces/peer-routing'
import type { PeerInfo } from '@libp2p/interfaces/peer-info'
import type { Startable } from '@libp2p/interfaces/startable'

const log = logger('libp2p-delegated-peer-routing')

const DEFAULT_TIMEOUT = 30e3 // 30 second default
const CONCURRENT_HTTP_REQUESTS = 4

export class DelegatedPeerRouting implements PeerRouting {
export class DelegatedPeerRouting implements PeerRouting, Startable {
private readonly client: IPFSHTTPClient
private readonly httpQueue: PQueue
private started: boolean
private abortController: AbortController

/**
* Create a new DelegatedPeerRouting instance
Expand All @@ -30,6 +34,8 @@ export class DelegatedPeerRouting implements PeerRouting {
}

this.client = client
this.started = false
this.abortController = new AbortController()

// limit concurrency to avoid request flood in web browser
// https://github.com/libp2p/js-libp2p-delegated-content-routing/issues/12
Expand All @@ -46,12 +52,28 @@ export class DelegatedPeerRouting implements PeerRouting {
log(`enabled DelegatedPeerRouting via ${protocol}://${host}:${port}`)
}

isStarted () {
return this.started
}

start () {
this.started = true
}

stop () {
this.httpQueue.clear()
this.abortController.abort()
this.abortController = new AbortController()
this.started = false
}

/**
* Attempts to find the given peer
*/
async findPeer (id: PeerId, options: HTTPClientExtraOptions & AbortOptions = {}) {
log('findPeer starts: %p', id)
options.timeout = options.timeout ?? DEFAULT_TIMEOUT
options.signal = anySignal([this.abortController.signal].concat((options.signal != null) ? [options.signal] : []))

const onStart = defer()
const onFinish = defer()
Expand All @@ -64,9 +86,7 @@ export class DelegatedPeerRouting implements PeerRouting {
try {
await onStart.promise

for await (const event of this.client.dht.findPeer(id.toString(), {
timeout: options.timeout
})) {
for await (const event of this.client.dht.findPeer(id.toString(), options)) {
if (event.name === 'FINAL_PEER') {
const peerInfo: PeerInfo = {
id: peerIdFromString(event.peer.id),
Expand Down Expand Up @@ -97,6 +117,7 @@ export class DelegatedPeerRouting implements PeerRouting {

log('getClosestPeers starts:', keyStr)
options.timeout = options.timeout ?? DEFAULT_TIMEOUT
options.signal = anySignal([this.abortController.signal].concat((options.signal != null) ? [options.signal] : []))

const onStart = defer()
const onFinish = defer()
Expand All @@ -109,9 +130,7 @@ export class DelegatedPeerRouting implements PeerRouting {
try {
await onStart.promise

for await (const event of this.client.dht.query(keyStr, {
timeout: options.timeout
})) {
for await (const event of this.client.dht.query(keyStr, options)) {
if (event.name === 'PEER_RESPONSE') {
yield * event.closer.map(closer => ({
id: peerIdFromString(closer.id),
Expand Down
30 changes: 29 additions & 1 deletion test/index.spec.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/* eslint-env mocha */

import { expect } from 'aegir/utils/chai.js'
import { expect } from 'aegir/chai'
import { Controller, createFactory } from 'ipfsd-ctl'
import { isNode } from 'wherearewe'
import { create } from 'ipfs-http-client'
Expand All @@ -10,6 +10,9 @@ import goIpfs from 'go-ipfs'
import { peerIdFromString } from '@libp2p/peer-id'
import { createEd25519PeerId } from '@libp2p/peer-id-factory'
import all from 'it-all'
import pDefer from 'p-defer'
import drain from 'it-drain'
import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string'
import type { IDResult } from 'ipfs-core-types/src/root'

const factory = createFactory({
Expand Down Expand Up @@ -214,4 +217,29 @@ describe('DelegatedPeerRouting', function () {
})
})
})

describe('stop', () => {
it('should cancel in-flight requests when stopping', async () => {
const opts = delegatedNode.apiAddr.toOptions()
const router = new DelegatedPeerRouting(create({
protocol: 'http',
port: opts.port,
host: opts.host
}))

const deferred = pDefer<Error>()
const peer = uint8ArrayFromString('QmVv4Wz46JaZJeH5PMV4LGbRiiMKEmszPYY3g6fjGnVXBs')

void drain(router.getClosestPeers(peer))
.then(() => {
deferred.reject(new Error('Did not abort'))
})
.catch(err => {
deferred.resolve(err)
})

await router.stop()
await expect(deferred.promise).to.eventually.have.property('message').that.matches(/aborted/)
})
})
})