Skip to content

Commit

Permalink
fix!: replace dag walkers with generic CID extraction from blocks (#447)
Browse files Browse the repository at this point in the history
Replace the codec-specific `.dagWalkers` property with a generic dag walker internally that uses the `Block` interface from the `multicodecs` module.

- Removes the `.dagWalkers` property from the Helia interface
- Adds `getCodec` and `getHasher` to retrieve codecs and hashers by code
- Adds `loadCodec` and `loadHasher` options to allow sync or async loading of extra codecs/hashes in addition to staticlly configured ones in the `codecs`/`hashers` keys

BREAKING CHANGE: the `.dagWalkers` property has been removed
  • Loading branch information
achingbrain authored Sep 16, 2024
1 parent 8805202 commit 5ff6998
Show file tree
Hide file tree
Showing 23 changed files with 291 additions and 393 deletions.
24 changes: 5 additions & 19 deletions packages/block-brokers/src/bitswap.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { createBitswap } from '@helia/bitswap'
import type { BitswapOptions, Bitswap, BitswapWantBlockProgressEvents, BitswapNotifyProgressEvents } from '@helia/bitswap'
import type { BlockAnnounceOptions, BlockBroker, BlockRetrievalOptions, CreateSessionOptions, Routing } from '@helia/interface'
import type { BlockAnnounceOptions, BlockBroker, BlockRetrievalOptions, CreateSessionOptions, Routing, HasherLoader } from '@helia/interface'
import type { Libp2p, Startable, ComponentLogger } from '@libp2p/interface'
import type { Blockstore } from 'interface-blockstore'
import type { CID } from 'multiformats/cid'
Expand All @@ -9,9 +9,9 @@ import type { MultihashHasher } from 'multiformats/hashes/interface'
interface BitswapComponents {
libp2p: Libp2p
blockstore: Blockstore
hashers: Record<string, MultihashHasher>
routing: Routing
logger: ComponentLogger
getHasher: HasherLoader
}

export interface BitswapInit extends BitswapOptions {
Expand All @@ -23,26 +23,12 @@ class BitswapBlockBroker implements BlockBroker<BitswapWantBlockProgressEvents,
private started: boolean

constructor (components: BitswapComponents, init: BitswapInit = {}) {
const { hashers } = components
const { getHasher } = components

this.bitswap = createBitswap(components, {
hashLoader: {
getHasher: async (codecOrName: string | number): Promise<MultihashHasher<number>> => {
let hasher: MultihashHasher | undefined

if (typeof codecOrName === 'string') {
hasher = Object.values(hashers).find(hasher => {
return hasher.name === codecOrName
})
} else {
hasher = hashers[codecOrName]
}

if (hasher != null) {
return hasher
}

throw new Error(`Could not load hasher for code/name "${codecOrName}"`)
getHasher: async (codecOrName: number): Promise<MultihashHasher<number>> => {
return getHasher(codecOrName)
}
},
...init
Expand Down
20 changes: 9 additions & 11 deletions packages/car/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,10 @@
import { CarWriter } from '@ipld/car'
import drain from 'it-drain'
import map from 'it-map'
import { createUnsafe } from 'multiformats/block'
import defer from 'p-defer'
import PQueue from 'p-queue'
import type { DAGWalker } from '@helia/interface'
import type { CodecLoader } from '@helia/interface'
import type { GetBlockProgressEvents, PutManyBlocksProgressEvents } from '@helia/interface/blocks'
import type { CarReader } from '@ipld/car'
import type { AbortOptions } from '@libp2p/interface'
Expand All @@ -74,7 +75,7 @@ import type { ProgressOptions } from 'progress-events'

export interface CarComponents {
blockstore: Blockstore
dagWalkers: Record<number, DAGWalker>
getCodec: CodecLoader
}

interface ExportCarOptions extends AbortOptions, ProgressOptions<GetBlockProgressEvents> {
Expand Down Expand Up @@ -235,18 +236,15 @@ class DefaultCar implements Car {
* and update the pin count for them
*/
async #walkDag (cid: CID, queue: PQueue, withBlock: (cid: CID, block: Uint8Array) => Promise<void>, options?: AbortOptions & ProgressOptions<GetBlockProgressEvents>): Promise<void> {
const dagWalker = this.components.dagWalkers[cid.code]
const codec = await this.components.getCodec(cid.code)
const bytes = await this.components.blockstore.get(cid, options)

if (dagWalker == null) {
throw new Error(`No dag walker found for cid codec ${cid.code}`)
}

const block = await this.components.blockstore.get(cid, options)
await withBlock(cid, bytes)

await withBlock(cid, block)
const block = createUnsafe({ bytes, cid, codec })

// walk dag, ensure all blocks are present
for await (const cid of dagWalker.walk(block)) {
for await (const [,cid] of block.links()) {
void queue.add(async () => {
await this.#walkDag(cid, queue, withBlock, options)
})
Expand All @@ -257,6 +255,6 @@ class DefaultCar implements Car {
/**
* Create a {@link Car} instance for use with {@link https://github.com/ipfs/helia Helia}
*/
export function car (helia: { blockstore: Blockstore, dagWalkers: Record<number, DAGWalker> }, init: any = {}): Car {
export function car (helia: CarComponents, init: any = {}): Car {
return new DefaultCar(helia, init)
}
27 changes: 0 additions & 27 deletions packages/car/test/fixtures/dag-walkers.ts

This file was deleted.

17 changes: 17 additions & 0 deletions packages/car/test/fixtures/get-codec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
/* eslint-env mocha */

import * as dagPb from '@ipld/dag-pb'
import * as raw from 'multiformats/codecs/raw'
import type { BlockCodec } from 'multiformats'

export function getCodec (code: number): BlockCodec<any, any> {
if (code === dagPb.code) {
return dagPb
}

if (code === raw.code) {
return raw
}

throw new Error(`Unknown codec ${code}`)
}
16 changes: 8 additions & 8 deletions packages/car/test/index.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ import { MemoryDatastore } from 'datastore-core'
import { fixedSize } from 'ipfs-unixfs-importer/chunker'
import toBuffer from 'it-to-buffer'
import { car, type Car } from '../src/index.js'
import { dagWalkers } from './fixtures/dag-walkers.js'
import { largeFile, smallFile } from './fixtures/files.js'
import { getCodec } from './fixtures/get-codec.js'
import { memoryCarWriter } from './fixtures/memory-car.js'
import type { Blockstore } from 'interface-blockstore'

Expand All @@ -23,14 +23,14 @@ describe('import/export car file', () => {
beforeEach(async () => {
blockstore = new MemoryBlockstore()

c = car({ blockstore, dagWalkers })
c = car({ blockstore, getCodec })
u = unixfs({ blockstore })
})

it('exports and imports a car file', async () => {
const otherBlockstore = new MemoryBlockstore()
const otherUnixFS = unixfs({ blockstore: otherBlockstore })
const otherCar = car({ blockstore: otherBlockstore, dagWalkers })
const otherCar = car({ blockstore: otherBlockstore, getCodec })
const cid = await otherUnixFS.addBytes(smallFile)

const writer = memoryCarWriter(cid)
Expand All @@ -50,7 +50,7 @@ describe('import/export car file', () => {

const otherBlockstore = new MemoryBlockstore()
const otherUnixFS = unixfs({ blockstore: otherBlockstore })
const otherCar = car({ blockstore: otherBlockstore, dagWalkers })
const otherCar = car({ blockstore: otherBlockstore, getCodec })
const cid1 = await otherUnixFS.addBytes(fileData1)
const cid2 = await otherUnixFS.addBytes(fileData2)
const cid3 = await otherUnixFS.addBytes(fileData3)
Expand All @@ -70,7 +70,7 @@ describe('import/export car file', () => {
it('exports and imports a multiple block car file', async () => {
const otherBlockstore = new MemoryBlockstore()
const otherUnixFS = unixfs({ blockstore: otherBlockstore })
const otherCar = car({ blockstore: otherBlockstore, dagWalkers })
const otherCar = car({ blockstore: otherBlockstore, getCodec })
const cid = await otherUnixFS.addBytes(largeFile)

const writer = memoryCarWriter(cid)
Expand All @@ -90,7 +90,7 @@ describe('import/export car file', () => {

const otherBlockstore = new MemoryBlockstore()
const otherUnixFS = unixfs({ blockstore: otherBlockstore })
const otherCar = car({ blockstore: otherBlockstore, dagWalkers })
const otherCar = car({ blockstore: otherBlockstore, getCodec })
const cid1 = await otherUnixFS.addBytes(fileData1, {
chunker: fixedSize({
chunkSize: 2
Expand Down Expand Up @@ -124,7 +124,7 @@ describe('import/export car file', () => {
const otherUnixFS = unixfs({ blockstore: otherBlockstore })
const otherDatastore = new MemoryDatastore()
const otherMFS = mfs({ blockstore: otherBlockstore, datastore: otherDatastore })
const otherCar = car({ blockstore: otherBlockstore, dagWalkers })
const otherCar = car({ blockstore: otherBlockstore, getCodec })

await otherMFS.mkdir('/testDups')
await otherMFS.mkdir('/testDups/sub')
Expand All @@ -151,7 +151,7 @@ describe('import/export car file', () => {
const otherUnixFS = unixfs({ blockstore: otherBlockstore })
const otherDatastore = new MemoryDatastore()
const otherMFS = mfs({ blockstore: otherBlockstore, datastore: otherDatastore })
const otherCar = car({ blockstore: otherBlockstore, dagWalkers })
const otherCar = car({ blockstore: otherBlockstore, getCodec })

await otherMFS.mkdir('/testDups')
await otherMFS.mkdir('/testDups/sub')
Expand Down
4 changes: 2 additions & 2 deletions packages/car/test/stream.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ import { expect } from 'aegir/chai'
import { MemoryBlockstore } from 'blockstore-core'
import toBuffer from 'it-to-buffer'
import { car, type Car } from '../src/index.js'
import { dagWalkers } from './fixtures/dag-walkers.js'
import { smallFile } from './fixtures/files.js'
import { getCodec } from './fixtures/get-codec.js'
import { memoryCarWriter } from './fixtures/memory-car.js'
import type { Blockstore } from 'interface-blockstore'

Expand All @@ -18,7 +18,7 @@ describe('stream car file', () => {
beforeEach(async () => {
blockstore = new MemoryBlockstore()

c = car({ blockstore, dagWalkers })
c = car({ blockstore, getCodec })
u = unixfs({ blockstore })
})

Expand Down
18 changes: 18 additions & 0 deletions packages/interface/src/errors.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,21 @@ export class NoRoutersAvailableError extends Error {
this.name = 'NoRoutersAvailableError'
}
}

export class UnknownHashAlgorithmError extends Error {
static name = 'UnknownHashAlgorithmError'

constructor (message = 'Unknown hash algorithm') {
super(message)
this.name = 'UnknownHashAlgorithmError'
}
}

export class UnknownCodecError extends Error {
static name = 'UnknownCodecError'

constructor (message = 'Unknown codec') {
super(message)
this.name = 'UnknownCodecError'
}
}
44 changes: 23 additions & 21 deletions packages/interface/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,21 @@ import type { Routing } from './routing.js'
import type { AbortOptions, ComponentLogger, Metrics } from '@libp2p/interface'
import type { DNS } from '@multiformats/dns'
import type { Datastore } from 'interface-datastore'
import type { MultihashHasher } from 'multiformats'
import type { Await } from 'interface-store'
import type { BlockCodec, MultihashHasher } from 'multiformats'
import type { CID } from 'multiformats/cid'
import type { ProgressEvent, ProgressOptions } from 'progress-events'

export type { Await, AwaitIterable } from 'interface-store'

export interface CodecLoader {
<T = any, Code extends number = any>(code: Code): Await<BlockCodec<Code, T>>
}

export interface HasherLoader {
(code: number): Await<MultihashHasher>
}

/**
* The API presented by a Helia node
*/
Expand Down Expand Up @@ -56,18 +65,6 @@ export interface Helia {
*/
routing: Routing

/**
* DAGWalkers are codec-specific implementations that know how to yield all
* CIDs contained within a block that corresponds to that codec.
*/
dagWalkers: Record<number, DAGWalker>

/**
* Hashers can be used to hash a piece of data with the specified hashing
* algorithm.
*/
hashers: Record<number, MultihashHasher>

/**
* The DNS property can be used to perform lookups of various record types and
* will use a resolver appropriate to the current platform.
Expand All @@ -94,6 +91,19 @@ export interface Helia {
* Remove any unpinned blocks from the blockstore
*/
gc(options?: GCOptions): Promise<void>

/**
* Load an IPLD codec. Implementations may return a promise if, for example,
* the codec is being fetched from the network.
*/
getCodec: CodecLoader

/**
* Hashers can be used to hash a piece of data with the specified hashing
* algorithm. Implementations may return a promise if, for example,
* the hasher is being fetched from the network.
*/
getHasher: HasherLoader
}

export type GcEvents =
Expand All @@ -104,14 +114,6 @@ export interface GCOptions extends AbortOptions, ProgressOptions<GcEvents> {

}

/**
* DAGWalkers take a block and yield CIDs encoded in that block
*/
export interface DAGWalker {
codec: number
walk(block: Uint8Array): Generator<CID, void, undefined>
}

export * from './blocks.js'
export * from './errors.js'
export * from './pins.js'
Expand Down
Loading

0 comments on commit 5ff6998

Please sign in to comment.