Skip to content

Commit

Permalink
fix: http blockbroker loads gateways from routing (#519)
Browse files Browse the repository at this point in the history
Use the new http gateway block broker to load gateways from the routing.

If we can find http gateways for a block we now use it, falling back to the default list of gateways if the routing is slow or it gives providers that fail to supply the block.

One change here is that unless you are using sessions, trustless gateways become disposable so we can't sort them by reliability any more.
  • Loading branch information
achingbrain authored Apr 26, 2024
1 parent 2d070b9 commit 6a62d1c
Show file tree
Hide file tree
Showing 11 changed files with 191 additions and 218 deletions.
43 changes: 34 additions & 9 deletions packages/block-brokers/.aegir.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,19 @@ import polka from 'polka'
const options = {
test: {
async before (options) {
const server = polka({
const goodGateway = polka({
port: 0,
host: '127.0.0.1'
})
server.use(cors())
server.all('/ipfs/bafkreiefnkxuhnq3536qo2i2w3tazvifek4mbbzb6zlq3ouhprjce5c3aq', (req, res) => {
goodGateway.use(cors())
goodGateway.all('/ipfs/bafkreiefnkxuhnq3536qo2i2w3tazvifek4mbbzb6zlq3ouhprjce5c3aq', (req, res) => {
res.writeHead(200, {
'content-type': 'application/octet-stream',
'content-length': 4
})
res.end(Uint8Array.from([0, 1, 2, 0]))
})
server.all('/ipfs/bafkqabtimvwgy3yk', async (req, res) => {
goodGateway.all('/ipfs/bafkqabtimvwgy3yk', async (req, res) => {
// delay the response
await new Promise((resolve) => setTimeout(resolve, 500))

Expand All @@ -29,18 +29,43 @@ const options = {
res.end(Uint8Array.from([104, 101, 108, 108, 111]))
})

await server.listen()
const { port } = server.server.address()
await goodGateway.listen()
const { port: goodGatewayPort } = goodGateway.server.address()

const badGateway = polka({
port: 0,
host: '127.0.0.1'
})
badGateway.use(cors())
badGateway.all('/ipfs/bafkreiefnkxuhnq3536qo2i2w3tazvifek4mbbzb6zlq3ouhprjce5c3aq', (req, res) => {
res.writeHead(200, {
'content-type': 'application/octet-stream',
'content-length': 4
})
// fails validation
res.end(Uint8Array.from([0, 1, 2, 1]))
})
badGateway.all('/ipfs/*', (req, res) => {
// fails
res.writeHead(500)
res.end()
})

await badGateway.listen()
const { port: badGatewayPort } = badGateway.server.address()

return {
server,
goodGateway,
badGateway,
env: {
TRUSTLESS_GATEWAY: `http://127.0.0.1:${port}`
TRUSTLESS_GATEWAY: `http://127.0.0.1:${goodGatewayPort}`,
BAD_TRUSTLESS_GATEWAY: `http://127.0.0.1:${badGatewayPort}`
}
}
},
async after (options, before) {
await before.server.server.close()
await before.goodGateway.server.close()
await before.badGateway.server.close()
}
}
}
Expand Down
3 changes: 2 additions & 1 deletion packages/block-brokers/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@
"cors": "^2.8.5",
"polka": "^0.5.2",
"sinon": "^17.0.1",
"sinon-ts": "^2.0.0"
"sinon-ts": "^2.0.0",
"uint8arrays": "^5.0.3"
}
}
33 changes: 13 additions & 20 deletions packages/block-brokers/src/trustless-gateway/broker.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { createTrustlessGatewaySession } from './session.js'
import { TrustlessGateway } from './trustless-gateway.js'
import { DEFAULT_TRUSTLESS_GATEWAYS } from './index.js'
import { findHttpGatewayProviders } from './utils.js'
import { DEFAULT_ALLOW_INSECURE, DEFAULT_ALLOW_LOCAL } from './index.js'
import type { TrustlessGatewayBlockBrokerInit, TrustlessGatewayComponents, TrustlessGatewayGetBlockProgressEvents } from './index.js'
import type { Routing, BlockRetrievalOptions, BlockBroker, CreateSessionOptions } from '@helia/interface'
import type { ComponentLogger, Logger } from '@libp2p/interface'
Expand Down Expand Up @@ -29,55 +29,48 @@ export interface CreateTrustlessGatewaySessionOptions extends CreateSessionOptio
* for blocks.
*/
export class TrustlessGatewayBlockBroker implements BlockBroker<TrustlessGatewayGetBlockProgressEvents> {
private readonly components: TrustlessGatewayComponents
private readonly gateways: TrustlessGateway[]
private readonly allowInsecure: boolean
private readonly allowLocal: boolean
private readonly routing: Routing
private readonly log: Logger
private readonly logger: ComponentLogger

constructor (components: TrustlessGatewayComponents, init: TrustlessGatewayBlockBrokerInit = {}) {
this.components = components
this.log = components.logger.forComponent('helia:trustless-gateway-block-broker')
this.logger = components.logger
this.routing = components.routing
this.gateways = (init.gateways ?? DEFAULT_TRUSTLESS_GATEWAYS)
.map((gatewayOrUrl) => {
return new TrustlessGateway(gatewayOrUrl, components.logger)
})
}

addGateway (gatewayOrUrl: string): void {
this.gateways.push(new TrustlessGateway(gatewayOrUrl, this.components.logger))
this.allowInsecure = init.allowInsecure ?? DEFAULT_ALLOW_INSECURE
this.allowLocal = init.allowLocal ?? DEFAULT_ALLOW_LOCAL
}

async retrieve (cid: CID, options: BlockRetrievalOptions<TrustlessGatewayGetBlockProgressEvents> = {}): Promise<Uint8Array> {
// Loop through the gateways until we get a block or run out of gateways
// TODO: switch to toSorted when support is better
const sortedGateways = this.gateways.sort((a, b) => b.reliability() - a.reliability())
const aggregateErrors: Error[] = []

for (const gateway of sortedGateways) {
for await (const gateway of findHttpGatewayProviders(cid, this.routing, this.logger, this.allowInsecure, this.allowLocal, options)) {
this.log('getting block for %c from %s', cid, gateway.url)

try {
const block = await gateway.getRawBlock(cid, options.signal)
this.log.trace('got block for %c from %s', cid, gateway.url)

try {
await options.validateFn?.(block)
} catch (err) {
this.log.error('failed to validate block for %c from %s', cid, gateway.url, err)
gateway.incrementInvalidBlocks()

throw new Error(`Block for CID ${cid} from gateway ${gateway.url} failed validation`)
// try another gateway
continue
}

return block
} catch (err: unknown) {
this.log.error('failed to get block for %c from %s', cid, gateway.url, err)

if (err instanceof Error) {
aggregateErrors.push(err)
} else {
aggregateErrors.push(new Error(`Unable to fetch raw block for CID ${cid} from gateway ${gateway.url}`))
}

// if signal was aborted, exit the loop
if (options.signal?.aborted === true) {
this.log.trace('request aborted while fetching raw block for CID %c from gateway %s', cid, gateway.url)
Expand Down
28 changes: 17 additions & 11 deletions packages/block-brokers/src/trustless-gateway/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,22 +3,28 @@ import type { Routing, BlockBroker } from '@helia/interface'
import type { ComponentLogger } from '@libp2p/interface'
import type { ProgressEvent } from 'progress-events'

export const DEFAULT_TRUSTLESS_GATEWAYS = [
// 2023-10-03: IPNS, Origin, and Block/CAR support from https://ipfs-public-gateway-checker.on.fleek.co/
'https://trustless-gateway.link',

// 2023-10-03: IPNS, Origin, and Block/CAR support from https://ipfs-public-gateway-checker.on.fleek.co/
'https://cloudflare-ipfs.com',

// 2023-10-03: IPNS, Origin, and Block/CAR support from https://ipfs-public-gateway-checker.on.fleek.co/
'https://4everland.io'
]
export const DEFAULT_ALLOW_INSECURE = false
export const DEFAULT_ALLOW_LOCAL = false

export type TrustlessGatewayGetBlockProgressEvents =
ProgressEvent<'trustless-gateway:get-block:fetch', URL>

export interface TrustlessGatewayBlockBrokerInit {
gateways?: Array<string | URL>
/**
* By default we will only connect to peers with HTTPS addresses, pass true
* to also connect to HTTP addresses.
*
* @default false
*/
allowInsecure?: boolean

/**
* By default we will only connect to peers with public or DNS addresses, pass
* true to also connect to private addresses.
*
* @default false
*/
allowLocal?: boolean
}

export interface TrustlessGatewayComponents {
Expand Down
47 changes: 4 additions & 43 deletions packages/block-brokers/src/trustless-gateway/session.ts
Original file line number Diff line number Diff line change
@@ -1,19 +1,14 @@
import { AbstractSession } from '@helia/utils'
import { isPrivateIp } from '@libp2p/utils/private-ip'
import { DNS, HTTP, HTTPS } from '@multiformats/multiaddr-matcher'
import { multiaddrToUri } from '@multiformats/multiaddr-to-uri'
import { TrustlessGateway } from './trustless-gateway.js'
import { findHttpGatewayProviders } from './utils.js'
import { DEFAULT_ALLOW_INSECURE, DEFAULT_ALLOW_LOCAL } from './index.js'
import type { CreateTrustlessGatewaySessionOptions } from './broker.js'
import type { TrustlessGatewayGetBlockProgressEvents } from './index.js'
import type { TrustlessGateway } from './trustless-gateway.js'
import type { BlockRetrievalOptions, Routing } from '@helia/interface'
import type { ComponentLogger } from '@libp2p/interface'
import type { Multiaddr } from '@multiformats/multiaddr'
import type { AbortOptions } from 'interface-store'
import type { CID } from 'multiformats/cid'

const DEFAULT_ALLOW_INSECURE = false
const DEFAULT_ALLOW_LOCAL = false

export interface TrustlessGatewaySessionComponents {
logger: ComponentLogger
routing: Routing
Expand Down Expand Up @@ -47,23 +42,7 @@ class TrustlessGatewaySession extends AbstractSession<TrustlessGateway, Trustles
}

async * findNewProviders (cid: CID, options: AbortOptions = {}): AsyncGenerator<TrustlessGateway> {
for await (const provider of this.routing.findProviders(cid, options)) {
// require http(s) addresses
const httpAddresses = filterMultiaddrs(provider.multiaddrs, this.allowInsecure, this.allowLocal)

if (httpAddresses.length === 0) {
continue
}

// take first address?
// /ip4/x.x.x.x/tcp/31337/http
// /ip4/x.x.x.x/tcp/31337/https
// etc
const uri = multiaddrToUri(httpAddresses[0])

this.log('found http-gateway provider %p %s for cid %c', provider.id, uri, cid)
yield new TrustlessGateway(uri, this.logger)
}
yield * findHttpGatewayProviders(cid, this.routing, this.logger, this.allowInsecure, this.allowLocal, options)
}

toEvictionKey (provider: TrustlessGateway): Uint8Array | string {
Expand All @@ -75,24 +54,6 @@ class TrustlessGatewaySession extends AbstractSession<TrustlessGateway, Trustles
}
}

function filterMultiaddrs (multiaddrs: Multiaddr[], allowInsecure: boolean, allowLocal: boolean): Multiaddr[] {
return multiaddrs.filter(ma => {
if (HTTPS.matches(ma) || (allowInsecure && HTTP.matches(ma))) {
if (allowLocal) {
return true
}

if (DNS.matches(ma)) {
return true
}

return isPrivateIp(ma.toOptions().host) === false
}

return false
})
}

export function createTrustlessGatewaySession (components: TrustlessGatewaySessionComponents, init: CreateTrustlessGatewaySessionOptions): TrustlessGatewaySession {
return new TrustlessGatewaySession(components, init)
}
45 changes: 45 additions & 0 deletions packages/block-brokers/src/trustless-gateway/utils.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
import { isPrivateIp } from '@libp2p/utils/private-ip'
import { DNS, HTTP, HTTPS } from '@multiformats/multiaddr-matcher'
import { multiaddrToUri } from '@multiformats/multiaddr-to-uri'
import { TrustlessGateway } from './trustless-gateway.js'
import type { Routing } from '@helia/interface'
import type { ComponentLogger } from '@libp2p/interface'
import type { AbortOptions, Multiaddr } from '@multiformats/multiaddr'
import type { CID } from 'multiformats/cid'

export function filterNonHTTPMultiaddrs (multiaddrs: Multiaddr[], allowInsecure: boolean, allowLocal: boolean): Multiaddr[] {
return multiaddrs.filter(ma => {
if (HTTPS.matches(ma) || (allowInsecure && HTTP.matches(ma))) {
if (allowLocal) {
return true
}

if (DNS.matches(ma)) {
return true
}

return isPrivateIp(ma.toOptions().host) === false
}

return false
})
}

export async function * findHttpGatewayProviders (cid: CID, routing: Routing, logger: ComponentLogger, allowInsecure: boolean, allowLocal: boolean, options?: AbortOptions): AsyncGenerator<TrustlessGateway> {
for await (const provider of routing.findProviders(cid, options)) {
// require http(s) addresses
const httpAddresses = filterNonHTTPMultiaddrs(provider.multiaddrs, allowInsecure, allowLocal)

if (httpAddresses.length === 0) {
continue
}

// take first address?
// /ip4/x.x.x.x/tcp/31337/http
// /ip4/x.x.x.x/tcp/31337/https
// etc
const uri = multiaddrToUri(httpAddresses[0])

yield new TrustlessGateway(uri, logger)
}
}
18 changes: 0 additions & 18 deletions packages/block-brokers/test/fixtures/create-block.ts

This file was deleted.

Loading

0 comments on commit 6a62d1c

Please sign in to comment.