Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: abort signals are respected #26

Merged
merged 9 commits into from
Mar 21, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import type { ComponentLogger } from '@libp2p/interface'
/**
* Converts an async iterator of Uint8Array bytes to a stream and returns the first chunk of bytes
*/
export async function getStreamFromAsyncIterable (iterator: AsyncIterable<Uint8Array>, path: string, logger: ComponentLogger, options?: Pick<VerifiedFetchInit, 'onProgress'>): Promise<{ stream: ReadableStream<Uint8Array>, firstChunk: Uint8Array }> {
export async function getStreamFromAsyncIterable (iterator: AsyncIterable<Uint8Array>, path: string, logger: ComponentLogger, options?: Pick<VerifiedFetchInit, 'onProgress' | 'signal'>): Promise<{ stream: ReadableStream<Uint8Array>, firstChunk: Uint8Array }> {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

another part of the bug fix... we need to be passing progressOptions and signal through. I just realized I wasn't checking for signal abort here... the iterator should handle this.. but i'll update just in case.

const log = logger.forComponent('helia:verified-fetch:get-stream-from-async-iterable')
const reader = iterator[Symbol.asyncIterator]()
const { value: firstChunk, done } = await reader.next()
Expand Down
6 changes: 3 additions & 3 deletions packages/verified-fetch/src/utils/parse-resource.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,15 @@ import { parseUrlString } from './parse-url-string.js'
import type { ParsedUrlStringResults } from './parse-url-string.js'
import type { Resource } from '../index.js'
import type { IPNS, IPNSRoutingEvents, ResolveDNSLinkProgressEvents, ResolveProgressEvents } from '@helia/ipns'
import type { ComponentLogger } from '@libp2p/interface'
import type { AbortOptions, ComponentLogger } from '@libp2p/interface'
import type { ProgressOptions } from 'progress-events'

export interface ParseResourceComponents {
ipns: IPNS
logger: ComponentLogger
}

export interface ParseResourceOptions extends ProgressOptions<ResolveProgressEvents | IPNSRoutingEvents | ResolveDNSLinkProgressEvents> {
export interface ParseResourceOptions extends ProgressOptions<ResolveProgressEvents | IPNSRoutingEvents | ResolveDNSLinkProgressEvents>, AbortOptions {

}
/**
Expand All @@ -21,7 +21,7 @@ export interface ParseResourceOptions extends ProgressOptions<ResolveProgressEve
*/
export async function parseResource (resource: Resource, { ipns, logger }: ParseResourceComponents, options?: ParseResourceOptions): Promise<ParsedUrlStringResults> {
if (typeof resource === 'string') {
return parseUrlString({ urlString: resource, ipns, logger }, { onProgress: options?.onProgress })
return parseUrlString({ urlString: resource, ipns, logger }, options)
}

const cid = CID.asCID(resource)
Expand Down
14 changes: 9 additions & 5 deletions packages/verified-fetch/src/utils/parse-url-string.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
import { peerIdFromString } from '@libp2p/peer-id'
import { CID } from 'multiformats/cid'
import { TLRU } from './tlru.js'
import type { ParseResourceOptions } from './parse-resource.js'
import type { RequestFormatShorthand } from '../types.js'
import type { IPNS, ResolveDNSLinkProgressEvents, ResolveResult } from '@helia/ipns'
import type { IPNS, ResolveResult } from '@helia/ipns'
import type { ComponentLogger } from '@libp2p/interface'
import type { ProgressOptions } from 'progress-events'

const ipnsCache = new TLRU<ResolveResult>(1000)

Expand All @@ -13,7 +13,7 @@ export interface ParseUrlStringInput {
ipns: IPNS
logger: ComponentLogger
}
export interface ParseUrlStringOptions extends ProgressOptions<ResolveDNSLinkProgressEvents> {
export interface ParseUrlStringOptions extends ParseResourceOptions {

}

Expand Down Expand Up @@ -110,7 +110,8 @@ export async function parseUrlString ({ urlString, ipns, logger }: ParseUrlStrin
let peerId = null
try {
peerId = peerIdFromString(cidOrPeerIdOrDnsLink)
resolveResult = await ipns.resolve(peerId, { onProgress: options?.onProgress })
options?.signal?.throwIfAborted()
resolveResult = await ipns.resolve(peerId, options)
cid = resolveResult?.cid
resolvedPath = resolveResult?.path
log.trace('resolved %s to %c', cidOrPeerIdOrDnsLink, cid)
Expand All @@ -134,7 +135,8 @@ export async function parseUrlString ({ urlString, ipns, logger }: ParseUrlStrin
log.trace('Attempting to resolve DNSLink for %s', decodedDnsLinkLabel)

try {
resolveResult = await ipns.resolveDNSLink(decodedDnsLinkLabel, { onProgress: options?.onProgress })
options?.signal?.throwIfAborted()
resolveResult = await ipns.resolveDNSLink(decodedDnsLinkLabel, options)
cid = resolveResult?.cid
resolvedPath = resolveResult?.path
log.trace('resolved %s to %c', decodedDnsLinkLabel, cid)
Expand All @@ -147,6 +149,8 @@ export async function parseUrlString ({ urlString, ipns, logger }: ParseUrlStrin
}
}

options?.signal?.throwIfAborted()
SgtPooki marked this conversation as resolved.
Show resolved Hide resolved

if (cid == null) {
if (errors.length === 1) {
throw errors[0]
Expand Down
8 changes: 7 additions & 1 deletion packages/verified-fetch/src/utils/responses.ts
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,13 @@
return response
}

export function badRequestResponse (url: string, body?: SupportedBodyTypes, init?: ResponseInit): Response {
/**
* if body is an Error, it will be converted to a string containing the error message.
*/
export function badRequestResponse (url: string, body?: SupportedBodyTypes | Error, init?: ResponseInit): Response {
if (body instanceof Error) {
body = body.message
}

Check warning on line 94 in packages/verified-fetch/src/utils/responses.ts

View check run for this annotation

Codecov / codecov/patch

packages/verified-fetch/src/utils/responses.ts#L93-L94

Added lines #L93 - L94 were not covered by tests
const response = new Response(body, {
...(init ?? {}),
status: 400,
Expand Down
50 changes: 44 additions & 6 deletions packages/verified-fetch/src/verified-fetch.ts
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,10 @@
let signal: AbortSignal | undefined
if (options?.signal === null) {
signal = undefined
} else {
signal = options?.signal
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the main part that fixes the bug mentioned by #25

}

return {
...options,
signal
Expand Down Expand Up @@ -283,10 +286,18 @@
const pathDetails = await walkPath(this.helia.blockstore, `${cid.toString()}/${path}`, options)
ipfsRoots = pathDetails.ipfsRoots
terminalElement = pathDetails.terminalElement
} catch (err) {
} catch (err: any) {
this.log.error('error walking path %s', path, err)
/**
* TODO: do this better. We should be able to distinguish between an abortError and a regular error
* However, `helia:networked-storage:error` throws an AggregateError with the abort error inside it
*/
const abortError = err.errors?.find((e: Error) => e.message.includes('abort'))
if (abortError != null) {
return badRequestResponse(resource.toString(), abortError.message)
}
SgtPooki marked this conversation as resolved.
Show resolved Hide resolved

return badGatewayResponse('Error walking path')
return badGatewayResponse(resource.toString(), 'Error walking path')
}

let resolvedCID = terminalElement?.cid ?? cid
Expand Down Expand Up @@ -346,7 +357,8 @@

try {
const { stream, firstChunk } = await getStreamFromAsyncIterable(asyncIter, path ?? '', this.helia.logger, {
onProgress: options?.onProgress
onProgress: options?.onProgress,
signal: options?.signal
})
byteRangeContext.setBody(stream)
// if not a valid range request, okRangeRequest will call okResponse
Expand All @@ -366,7 +378,7 @@
if (byteRangeContext.isRangeRequest && err.code === 'ERR_INVALID_PARAMS') {
return badRangeResponse(resource)
}
return badGatewayResponse('Unable to stream content')
return badGatewayResponse(resource.toString(), 'Unable to stream content')

Check warning on line 381 in packages/verified-fetch/src/verified-fetch.ts

View check run for this annotation

Codecov / codecov/patch

packages/verified-fetch/src/verified-fetch.ts#L381

Added line #L381 was not covered by tests
}
}

Expand Down Expand Up @@ -430,11 +442,35 @@
[identity.code]: this.handleRaw
}

/**
*
* TODO: Should we use 400, 408, 418, or 425, or throw and not even return a response?
Comment on lines +442 to +443
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we should discuss this.

*/
private async abortHandler (opController: AbortController): Promise<void> {
this.log.error('signal aborted by user')
opController.abort('signal aborted by user')
await this.stop()
SgtPooki marked this conversation as resolved.
Show resolved Hide resolved
}

/**
* We're starting to get to the point where we need a queue or pipeline of
* operations to perform and a single place to handle errors.
*
* TODO: move operations called by fetch to a queue of operations where we can
* always exit early (and cleanly) if a given signal is aborted
*/
async fetch (resource: Resource, opts?: VerifiedFetchOptions): Promise<Response> {
this.log('fetch %s', resource)

const options = convertOptions(opts)

const opController = new AbortController()
if (options?.signal != null) {
options.signal.onabort = this.abortHandler.bind(this, opController)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we override any handlers they have on their signal.. they should wrap any signals they want to provide

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we can change this later if we want.. but it would be good to be aware of this. @achingbrain

options.signal = opController.signal
}
this.log('options %o', options)
SgtPooki marked this conversation as resolved.
Show resolved Hide resolved

options?.onProgress?.(new CustomProgressEvent<CIDDetail>('verified-fetch:request:start', { resource }))

// resolve the CID/path from the requested resource
Expand All @@ -446,10 +482,11 @@
cid = result.cid
path = result.path
query = result.query
} catch (err) {
options?.signal?.throwIfAborted()
} catch (err: any) {
this.log.error('error parsing resource %s', resource, err)

return badRequestResponse('Invalid resource')
return badRequestResponse(resource.toString(), err)
}

options?.onProgress?.(new CustomProgressEvent<CIDDetail>('verified-fetch:request:resolve', { cid, path }))
Expand Down Expand Up @@ -512,6 +549,7 @@
}
this.log.trace('calling handler "%s"', codecHandler.name)

options?.signal?.throwIfAborted()
response = await codecHandler.call(this, handlerArgs)
}

Expand Down
116 changes: 116 additions & 0 deletions packages/verified-fetch/test/abort-handling.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
import { stop, type ComponentLogger, type Logger } from '@libp2p/interface'
import { prefixLogger, logger as libp2pLogger } from '@libp2p/logger'
import { expect } from 'aegir/chai'
import { CID } from 'multiformats/cid'
import pDefer from 'p-defer'
import Sinon from 'sinon'
import { stubInterface, type StubbedInstance } from 'sinon-ts'
import { VerifiedFetch } from '../src/verified-fetch.js'
import { createHelia } from './fixtures/create-offline-helia.js'
import type { BlockRetriever, Helia } from '@helia/interface'

async function makeAbortedRequest (verifiedFetch: VerifiedFetch, [resource, options = {}]: Parameters<typeof verifiedFetch.fetch>, promise: Promise<any>): Promise<Response> {
const controller = new AbortController()
const resultPromise = verifiedFetch.fetch(resource, {
...options,
signal: controller.signal
})

void promise.then(() => {
controller.abort()
})
return resultPromise
}

describe('abort-handling', () => {
let helia: Helia
let heliaStopSpy: Sinon.SinonSpy<[], Promise<void>>
const sandbox = Sinon.createSandbox()
let logger: ComponentLogger
let componentLoggers: Logger[] = []
const notPublishedCid = CID.parse('bafybeichqiz32cw5c3vdpvh2xtfgl42veqbsr6sw2g6c7ffz6atvh2vise')
let superSlowBlockRetriever: StubbedInstance<BlockRetriever>

let blockBrokerRetrieveCalled: ReturnType<typeof pDefer>
let dnsResolverCalled: ReturnType<typeof pDefer>

beforeEach(async () => {
blockBrokerRetrieveCalled = pDefer()
dnsResolverCalled = pDefer()
superSlowBlockRetriever = stubInterface<BlockRetriever>({
retrieve: Sinon.stub().callsFake(async (cid, options) => {
blockBrokerRetrieveCalled.resolve()

return new Promise((resolve, reject) => {
const timeoutId = setTimeout(() => {
reject(new Error('timeout while resolving'))

Check warning on line 46 in packages/verified-fetch/test/abort-handling.spec.ts

View check run for this annotation

Codecov / codecov/patch

packages/verified-fetch/test/abort-handling.spec.ts#L46

Added line #L46 was not covered by tests
}, 10000)

/**
* we need to emulate signal handling (blockBrokers should handle abort signals too)
* this is a simplified version of what the blockBroker should do, and the
* tests in this file verify how verified-fetch would handle the failure
*/
options.signal?.addEventListener('abort', () => {
clearTimeout(timeoutId)
reject(new Error('aborted'))
})
})
})
})

helia = await createHelia({ blockBrokers: [() => superSlowBlockRetriever] })
logger = prefixLogger('helia:verified-fetch-test:abort-handling')

sandbox.stub(logger, 'forComponent').callsFake((name) => {
const newLogger = libp2pLogger(`helia:verified-fetch-test:abort-handling:child-logger-${componentLoggers.length}:${name}`)
componentLoggers.push(sandbox.stub(newLogger))
return newLogger
})
helia.logger = logger
heliaStopSpy = sandbox.spy(helia, 'stop')
})

afterEach(async () => {
await stop(helia)
componentLoggers = []
sandbox.restore()
})

it('should abort a request before dns resolution', async function () {
this.timeout(1000)

const customDnsResolver = Sinon.stub().resolves(new Promise((resolve, reject) => {
dnsResolverCalled.resolve()
setTimeout(() => { reject(new Error('timeout while resolving')) }, 5000)
}))
const verifiedFetch = new VerifiedFetch({
helia
}, {
dnsResolvers: [customDnsResolver]
})
const abortedResult = await makeAbortedRequest(verifiedFetch, ['ipns://this-doesnt-matter'], Promise.resolve())

expect(superSlowBlockRetriever.retrieve.called).to.be.false()
expect(abortedResult).to.be.ok()
expect(abortedResult.status).to.equal(400)
expect(abortedResult.statusText).to.equal('Bad Request')
await expect(abortedResult.text()).to.eventually.contain('aborted')
expect(heliaStopSpy.calledOnce).to.be.true()
})

it('should abort a request while looking for cid', async function () {
const verifiedFetch = new VerifiedFetch({
helia
})

const abortedResult = await makeAbortedRequest(verifiedFetch, [notPublishedCid, { headers: { accept: 'application/octet-stream' } }], blockBrokerRetrieveCalled.promise)

expect(superSlowBlockRetriever.retrieve.called).to.be.true()
expect(abortedResult).to.be.ok()
expect(abortedResult.status).to.equal(400)
expect(abortedResult.statusText).to.equal('Bad Request')
await expect(abortedResult.text()).to.eventually.contain('aborted')
expect(heliaStopSpy.calledOnce).to.be.true()
})
})
2 changes: 0 additions & 2 deletions packages/verified-fetch/test/custom-dns-resolvers.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ describe('custom dns-resolvers', () => {

expect(customDnsResolver.callCount).to.equal(1)
expect(customDnsResolver.getCall(0).args).to.deep.equal(['_dnslink.some-non-cached-domain.com', {
onProgress: undefined,
types: [
RecordType.TXT
]
Expand Down Expand Up @@ -68,7 +67,6 @@ describe('custom dns-resolvers', () => {

expect(customDnsResolver.callCount).to.equal(1)
expect(customDnsResolver.getCall(0).args).to.deep.equal(['_dnslink.some-non-cached-domain2.com', {
onProgress: undefined,
types: [
RecordType.TXT
]
Expand Down
13 changes: 13 additions & 0 deletions packages/verified-fetch/test/fixtures/dns-answer-fake.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
import { stubInterface } from 'sinon-ts'
import type { DNSResponse } from '@multiformats/dns'

export function answerFake (data: string, TTL: number, name: string, type: number): DNSResponse {
SgtPooki marked this conversation as resolved.
Show resolved Hide resolved
const fake = stubInterface<DNSResponse>()
fake.Answer = [{
data,
TTL,
name,
type
}]
return fake
}
Loading