From 5ff6998e6bc8b04e3407bc98c1924c55f632d9b7 Mon Sep 17 00:00:00 2001 From: Alex Potsides Date: Mon, 16 Sep 2024 09:45:46 +0100 Subject: [PATCH] fix!: replace dag walkers with generic CID extraction from blocks (#447) Replace the codec-specific `.dagWalkers` property with a generic dag walker internally that uses the `Block` interface from the `multicodecs` module. - Removes the `.dagWalkers` property from the Helia interface - Adds `getCodec` and `getHasher` to retrieve codecs and hashers by code - Adds `loadCodec` and `loadHasher` options to allow sync or async loading of extra codecs/hashes in addition to staticlly configured ones in the `codecs`/`hashers` keys BREAKING CHANGE: the `.dagWalkers` property has been removed --- packages/block-brokers/src/bitswap.ts | 24 +-- packages/car/src/index.ts | 20 +- packages/car/test/fixtures/dag-walkers.ts | 27 --- packages/car/test/fixtures/get-codec.ts | 17 ++ packages/car/test/index.spec.ts | 16 +- packages/car/test/stream.spec.ts | 4 +- packages/interface/src/errors.ts | 18 ++ packages/interface/src/index.ts | 44 ++-- packages/utils/src/index.ts | 39 ++-- packages/utils/src/pins.ts | 21 +- packages/utils/src/utils/dag-walkers.ts | 198 ------------------ packages/utils/src/utils/default-hashers.ts | 18 -- packages/utils/src/utils/get-codec.ts | 47 +++++ packages/utils/src/utils/get-hasher.ts | 40 ++++ packages/utils/src/utils/is-promise.ts | 3 + packages/utils/src/utils/networked-storage.ts | 29 ++- packages/utils/test/block-broker.spec.ts | 6 +- packages/utils/test/fixtures/create-dag.ts | 70 +++++-- packages/utils/test/fixtures/dag-walker.ts | 14 -- .../utils/test/pins.depth-limited.spec.ts | 11 +- packages/utils/test/pins.recursive.spec.ts | 11 +- packages/utils/test/storage.spec.ts | 3 +- .../test/utils/networked-storage.spec.ts | 4 +- 23 files changed, 291 insertions(+), 393 deletions(-) delete mode 100644 packages/car/test/fixtures/dag-walkers.ts create mode 100644 packages/car/test/fixtures/get-codec.ts delete mode 100644 packages/utils/src/utils/dag-walkers.ts delete mode 100644 packages/utils/src/utils/default-hashers.ts create mode 100644 packages/utils/src/utils/get-codec.ts create mode 100644 packages/utils/src/utils/get-hasher.ts create mode 100644 packages/utils/src/utils/is-promise.ts delete mode 100644 packages/utils/test/fixtures/dag-walker.ts diff --git a/packages/block-brokers/src/bitswap.ts b/packages/block-brokers/src/bitswap.ts index 244c2cda1..03b35ef97 100644 --- a/packages/block-brokers/src/bitswap.ts +++ b/packages/block-brokers/src/bitswap.ts @@ -1,6 +1,6 @@ import { createBitswap } from '@helia/bitswap' import type { BitswapOptions, Bitswap, BitswapWantBlockProgressEvents, BitswapNotifyProgressEvents } from '@helia/bitswap' -import type { BlockAnnounceOptions, BlockBroker, BlockRetrievalOptions, CreateSessionOptions, Routing } from '@helia/interface' +import type { BlockAnnounceOptions, BlockBroker, BlockRetrievalOptions, CreateSessionOptions, Routing, HasherLoader } from '@helia/interface' import type { Libp2p, Startable, ComponentLogger } from '@libp2p/interface' import type { Blockstore } from 'interface-blockstore' import type { CID } from 'multiformats/cid' @@ -9,9 +9,9 @@ import type { MultihashHasher } from 'multiformats/hashes/interface' interface BitswapComponents { libp2p: Libp2p blockstore: Blockstore - hashers: Record routing: Routing logger: ComponentLogger + getHasher: HasherLoader } export interface BitswapInit extends BitswapOptions { @@ -23,26 +23,12 @@ class BitswapBlockBroker implements BlockBroker> => { - let hasher: MultihashHasher | undefined - - if (typeof codecOrName === 'string') { - hasher = Object.values(hashers).find(hasher => { - return hasher.name === codecOrName - }) - } else { - hasher = hashers[codecOrName] - } - - if (hasher != null) { - return hasher - } - - throw new Error(`Could not load hasher for code/name "${codecOrName}"`) + getHasher: async (codecOrName: number): Promise> => { + return getHasher(codecOrName) } }, ...init diff --git a/packages/car/src/index.ts b/packages/car/src/index.ts index f23e1375f..bc2b6bae6 100644 --- a/packages/car/src/index.ts +++ b/packages/car/src/index.ts @@ -61,9 +61,10 @@ import { CarWriter } from '@ipld/car' import drain from 'it-drain' import map from 'it-map' +import { createUnsafe } from 'multiformats/block' import defer from 'p-defer' import PQueue from 'p-queue' -import type { DAGWalker } from '@helia/interface' +import type { CodecLoader } from '@helia/interface' import type { GetBlockProgressEvents, PutManyBlocksProgressEvents } from '@helia/interface/blocks' import type { CarReader } from '@ipld/car' import type { AbortOptions } from '@libp2p/interface' @@ -74,7 +75,7 @@ import type { ProgressOptions } from 'progress-events' export interface CarComponents { blockstore: Blockstore - dagWalkers: Record + getCodec: CodecLoader } interface ExportCarOptions extends AbortOptions, ProgressOptions { @@ -235,18 +236,15 @@ class DefaultCar implements Car { * and update the pin count for them */ async #walkDag (cid: CID, queue: PQueue, withBlock: (cid: CID, block: Uint8Array) => Promise, options?: AbortOptions & ProgressOptions): Promise { - const dagWalker = this.components.dagWalkers[cid.code] + const codec = await this.components.getCodec(cid.code) + const bytes = await this.components.blockstore.get(cid, options) - if (dagWalker == null) { - throw new Error(`No dag walker found for cid codec ${cid.code}`) - } - - const block = await this.components.blockstore.get(cid, options) + await withBlock(cid, bytes) - await withBlock(cid, block) + const block = createUnsafe({ bytes, cid, codec }) // walk dag, ensure all blocks are present - for await (const cid of dagWalker.walk(block)) { + for await (const [,cid] of block.links()) { void queue.add(async () => { await this.#walkDag(cid, queue, withBlock, options) }) @@ -257,6 +255,6 @@ class DefaultCar implements Car { /** * Create a {@link Car} instance for use with {@link https://github.com/ipfs/helia Helia} */ -export function car (helia: { blockstore: Blockstore, dagWalkers: Record }, init: any = {}): Car { +export function car (helia: CarComponents, init: any = {}): Car { return new DefaultCar(helia, init) } diff --git a/packages/car/test/fixtures/dag-walkers.ts b/packages/car/test/fixtures/dag-walkers.ts deleted file mode 100644 index 74faf7dc6..000000000 --- a/packages/car/test/fixtures/dag-walkers.ts +++ /dev/null @@ -1,27 +0,0 @@ -import * as dagPb from '@ipld/dag-pb' -import * as raw from 'multiformats/codecs/raw' -import type { DAGWalker } from '@helia/interface' - -/** - * Dag walker for dag-pb CIDs - */ -const dagPbWalker: DAGWalker = { - codec: dagPb.code, - * walk (block) { - const node = dagPb.decode(block) - - yield * node.Links.map(l => l.Hash) - } -} - -const rawWalker: DAGWalker = { - codec: raw.code, - * walk () { - // no embedded CIDs in a raw block - } -} - -export const dagWalkers = { - [dagPb.code]: dagPbWalker, - [raw.code]: rawWalker -} diff --git a/packages/car/test/fixtures/get-codec.ts b/packages/car/test/fixtures/get-codec.ts new file mode 100644 index 000000000..2e4b2b15c --- /dev/null +++ b/packages/car/test/fixtures/get-codec.ts @@ -0,0 +1,17 @@ +/* eslint-env mocha */ + +import * as dagPb from '@ipld/dag-pb' +import * as raw from 'multiformats/codecs/raw' +import type { BlockCodec } from 'multiformats' + +export function getCodec (code: number): BlockCodec { + if (code === dagPb.code) { + return dagPb + } + + if (code === raw.code) { + return raw + } + + throw new Error(`Unknown codec ${code}`) +} diff --git a/packages/car/test/index.spec.ts b/packages/car/test/index.spec.ts index fabf871f0..4b7ad4209 100644 --- a/packages/car/test/index.spec.ts +++ b/packages/car/test/index.spec.ts @@ -10,8 +10,8 @@ import { MemoryDatastore } from 'datastore-core' import { fixedSize } from 'ipfs-unixfs-importer/chunker' import toBuffer from 'it-to-buffer' import { car, type Car } from '../src/index.js' -import { dagWalkers } from './fixtures/dag-walkers.js' import { largeFile, smallFile } from './fixtures/files.js' +import { getCodec } from './fixtures/get-codec.js' import { memoryCarWriter } from './fixtures/memory-car.js' import type { Blockstore } from 'interface-blockstore' @@ -23,14 +23,14 @@ describe('import/export car file', () => { beforeEach(async () => { blockstore = new MemoryBlockstore() - c = car({ blockstore, dagWalkers }) + c = car({ blockstore, getCodec }) u = unixfs({ blockstore }) }) it('exports and imports a car file', async () => { const otherBlockstore = new MemoryBlockstore() const otherUnixFS = unixfs({ blockstore: otherBlockstore }) - const otherCar = car({ blockstore: otherBlockstore, dagWalkers }) + const otherCar = car({ blockstore: otherBlockstore, getCodec }) const cid = await otherUnixFS.addBytes(smallFile) const writer = memoryCarWriter(cid) @@ -50,7 +50,7 @@ describe('import/export car file', () => { const otherBlockstore = new MemoryBlockstore() const otherUnixFS = unixfs({ blockstore: otherBlockstore }) - const otherCar = car({ blockstore: otherBlockstore, dagWalkers }) + const otherCar = car({ blockstore: otherBlockstore, getCodec }) const cid1 = await otherUnixFS.addBytes(fileData1) const cid2 = await otherUnixFS.addBytes(fileData2) const cid3 = await otherUnixFS.addBytes(fileData3) @@ -70,7 +70,7 @@ describe('import/export car file', () => { it('exports and imports a multiple block car file', async () => { const otherBlockstore = new MemoryBlockstore() const otherUnixFS = unixfs({ blockstore: otherBlockstore }) - const otherCar = car({ blockstore: otherBlockstore, dagWalkers }) + const otherCar = car({ blockstore: otherBlockstore, getCodec }) const cid = await otherUnixFS.addBytes(largeFile) const writer = memoryCarWriter(cid) @@ -90,7 +90,7 @@ describe('import/export car file', () => { const otherBlockstore = new MemoryBlockstore() const otherUnixFS = unixfs({ blockstore: otherBlockstore }) - const otherCar = car({ blockstore: otherBlockstore, dagWalkers }) + const otherCar = car({ blockstore: otherBlockstore, getCodec }) const cid1 = await otherUnixFS.addBytes(fileData1, { chunker: fixedSize({ chunkSize: 2 @@ -124,7 +124,7 @@ describe('import/export car file', () => { const otherUnixFS = unixfs({ blockstore: otherBlockstore }) const otherDatastore = new MemoryDatastore() const otherMFS = mfs({ blockstore: otherBlockstore, datastore: otherDatastore }) - const otherCar = car({ blockstore: otherBlockstore, dagWalkers }) + const otherCar = car({ blockstore: otherBlockstore, getCodec }) await otherMFS.mkdir('/testDups') await otherMFS.mkdir('/testDups/sub') @@ -151,7 +151,7 @@ describe('import/export car file', () => { const otherUnixFS = unixfs({ blockstore: otherBlockstore }) const otherDatastore = new MemoryDatastore() const otherMFS = mfs({ blockstore: otherBlockstore, datastore: otherDatastore }) - const otherCar = car({ blockstore: otherBlockstore, dagWalkers }) + const otherCar = car({ blockstore: otherBlockstore, getCodec }) await otherMFS.mkdir('/testDups') await otherMFS.mkdir('/testDups/sub') diff --git a/packages/car/test/stream.spec.ts b/packages/car/test/stream.spec.ts index 51563817e..822e34c7d 100644 --- a/packages/car/test/stream.spec.ts +++ b/packages/car/test/stream.spec.ts @@ -5,8 +5,8 @@ import { expect } from 'aegir/chai' import { MemoryBlockstore } from 'blockstore-core' import toBuffer from 'it-to-buffer' import { car, type Car } from '../src/index.js' -import { dagWalkers } from './fixtures/dag-walkers.js' import { smallFile } from './fixtures/files.js' +import { getCodec } from './fixtures/get-codec.js' import { memoryCarWriter } from './fixtures/memory-car.js' import type { Blockstore } from 'interface-blockstore' @@ -18,7 +18,7 @@ describe('stream car file', () => { beforeEach(async () => { blockstore = new MemoryBlockstore() - c = car({ blockstore, dagWalkers }) + c = car({ blockstore, getCodec }) u = unixfs({ blockstore }) }) diff --git a/packages/interface/src/errors.ts b/packages/interface/src/errors.ts index c302b06a0..cffe0546f 100644 --- a/packages/interface/src/errors.ts +++ b/packages/interface/src/errors.ts @@ -15,3 +15,21 @@ export class NoRoutersAvailableError extends Error { this.name = 'NoRoutersAvailableError' } } + +export class UnknownHashAlgorithmError extends Error { + static name = 'UnknownHashAlgorithmError' + + constructor (message = 'Unknown hash algorithm') { + super(message) + this.name = 'UnknownHashAlgorithmError' + } +} + +export class UnknownCodecError extends Error { + static name = 'UnknownCodecError' + + constructor (message = 'Unknown codec') { + super(message) + this.name = 'UnknownCodecError' + } +} diff --git a/packages/interface/src/index.ts b/packages/interface/src/index.ts index e915a1e4d..5e0b104a2 100644 --- a/packages/interface/src/index.ts +++ b/packages/interface/src/index.ts @@ -20,12 +20,21 @@ import type { Routing } from './routing.js' import type { AbortOptions, ComponentLogger, Metrics } from '@libp2p/interface' import type { DNS } from '@multiformats/dns' import type { Datastore } from 'interface-datastore' -import type { MultihashHasher } from 'multiformats' +import type { Await } from 'interface-store' +import type { BlockCodec, MultihashHasher } from 'multiformats' import type { CID } from 'multiformats/cid' import type { ProgressEvent, ProgressOptions } from 'progress-events' export type { Await, AwaitIterable } from 'interface-store' +export interface CodecLoader { + (code: Code): Await> +} + +export interface HasherLoader { + (code: number): Await +} + /** * The API presented by a Helia node */ @@ -56,18 +65,6 @@ export interface Helia { */ routing: Routing - /** - * DAGWalkers are codec-specific implementations that know how to yield all - * CIDs contained within a block that corresponds to that codec. - */ - dagWalkers: Record - - /** - * Hashers can be used to hash a piece of data with the specified hashing - * algorithm. - */ - hashers: Record - /** * The DNS property can be used to perform lookups of various record types and * will use a resolver appropriate to the current platform. @@ -94,6 +91,19 @@ export interface Helia { * Remove any unpinned blocks from the blockstore */ gc(options?: GCOptions): Promise + + /** + * Load an IPLD codec. Implementations may return a promise if, for example, + * the codec is being fetched from the network. + */ + getCodec: CodecLoader + + /** + * Hashers can be used to hash a piece of data with the specified hashing + * algorithm. Implementations may return a promise if, for example, + * the hasher is being fetched from the network. + */ + getHasher: HasherLoader } export type GcEvents = @@ -104,14 +114,6 @@ export interface GCOptions extends AbortOptions, ProgressOptions { } -/** - * DAGWalkers take a block and yield CIDs encoded in that block - */ -export interface DAGWalker { - codec: number - walk(block: Uint8Array): Generator -} - export * from './blocks.js' export * from './errors.js' export * from './pins.js' diff --git a/packages/utils/src/index.ts b/packages/utils/src/index.ts index 3b2f367f7..b48d57337 100644 --- a/packages/utils/src/index.ts +++ b/packages/utils/src/index.ts @@ -25,17 +25,18 @@ import { CustomProgressEvent } from 'progress-events' import { PinsImpl } from './pins.js' import { Routing as RoutingClass } from './routing.js' import { BlockStorage } from './storage.js' -import { defaultDagWalkers } from './utils/dag-walkers.js' import { assertDatastoreVersionIsCurrent } from './utils/datastore-version.js' -import { defaultHashers } from './utils/default-hashers.js' +import { getCodec } from './utils/get-codec.js' +import { getHasher } from './utils/get-hasher.js' import { NetworkedStorage } from './utils/networked-storage.js' -import type { DAGWalker, GCOptions, Helia as HeliaInterface, Routing } from '@helia/interface' +import type { Await, CodecLoader, GCOptions, HasherLoader, Helia as HeliaInterface, Routing } from '@helia/interface' import type { BlockBroker } from '@helia/interface/blocks' import type { Pins } from '@helia/interface/pins' import type { ComponentLogger, Logger, Metrics } from '@libp2p/interface' import type { DNS } from '@multiformats/dns' import type { Blockstore } from 'interface-blockstore' import type { Datastore } from 'interface-datastore' +import type { BlockCodec } from 'multiformats' import type { CID } from 'multiformats/cid' import type { MultihashDigest, MultihashHasher } from 'multiformats/hashes/interface' @@ -63,12 +64,24 @@ export interface HeliaInit { */ hashers?: MultihashHasher[] + /** + * An optional function that can load a MultihashHasher on demand. May return + * a promise. + */ + loadHasher?(code: number): Await + /** * In order to pin CIDs that correspond to a DAG, it's necessary to know * how to traverse that DAG. DAGWalkers take a block and yield any CIDs * encoded within that block. */ - dagWalkers?: DAGWalker[] + codecs?: Array> + + /** + * An optional function that can load a BlockCodec on demand. May return a + * promise. + */ + loadCodec?(code: number): Await> /** * A list of strategies used to fetch blocks when they are not present in @@ -142,13 +155,13 @@ export interface HeliaInit { interface Components { blockstore: Blockstore datastore: Datastore - hashers: Record - dagWalkers: Record logger: ComponentLogger blockBrokers: BlockBroker[] routing: Routing dns: DNS metrics?: Metrics + getCodec: CodecLoader + getHasher: HasherLoader } export class Helia implements HeliaInterface { @@ -157,8 +170,8 @@ export class Helia implements HeliaInterface { public pins: Pins public logger: ComponentLogger public routing: Routing - public dagWalkers: Record - public hashers: Record + public getCodec: CodecLoader + public getHasher: HasherLoader public dns: DNS public metrics?: Metrics private readonly log: Logger @@ -166,8 +179,8 @@ export class Helia implements HeliaInterface { constructor (init: HeliaInit) { this.logger = init.logger ?? defaultLogger() this.log = this.logger.forComponent('helia') - this.hashers = defaultHashers(init.hashers) - this.dagWalkers = defaultDagWalkers(init.dagWalkers) + this.getHasher = getHasher(init.hashers, init.loadHasher) + this.getCodec = getCodec(init.codecs, init.loadCodec) this.dns = init.dns ?? dns() this.metrics = init.metrics @@ -175,10 +188,10 @@ export class Helia implements HeliaInterface { const components: Components = { blockstore: init.blockstore, datastore: init.datastore, - hashers: this.hashers, - dagWalkers: this.dagWalkers, logger: this.logger, blockBrokers: [], + getHasher: this.getHasher, + getCodec: this.getCodec, dns: this.dns, metrics: this.metrics, ...(init.components ?? {}) @@ -207,7 +220,7 @@ export class Helia implements HeliaInterface { }) const networkedStorage = new NetworkedStorage(components) - this.pins = new PinsImpl(init.datastore, networkedStorage, this.dagWalkers) + this.pins = new PinsImpl(init.datastore, networkedStorage, this.getCodec) this.blockstore = new BlockStorage(networkedStorage, this.pins, { holdGcLock: init.holdGcLock ?? true }) diff --git a/packages/utils/src/pins.ts b/packages/utils/src/pins.ts index 76f01be9a..ec64083c6 100644 --- a/packages/utils/src/pins.ts +++ b/packages/utils/src/pins.ts @@ -2,10 +2,11 @@ import { Queue } from '@libp2p/utils/queue' import * as cborg from 'cborg' import { type Datastore, Key } from 'interface-datastore' import { base36 } from 'multiformats/bases/base36' +import { createUnsafe } from 'multiformats/block' import { CID, type Version } from 'multiformats/cid' import { CustomProgressEvent, type ProgressOptions } from 'progress-events' import { equals as uint8ArrayEquals } from 'uint8arrays/equals' -import type { DAGWalker } from '@helia/interface' +import type { CodecLoader } from '@helia/interface' import type { GetBlockProgressEvents } from '@helia/interface/blocks' import type { AddOptions, AddPinEvents, IsPinnedOptions, LsOptions, Pin, Pins, RmOptions } from '@helia/interface/pins' import type { AbortOptions } from '@libp2p/interface' @@ -59,12 +60,12 @@ function toDSKey (cid: CID): Key { export class PinsImpl implements Pins { private readonly datastore: Datastore private readonly blockstore: Blockstore - private readonly dagWalkers: Record + private readonly getCodec: CodecLoader - constructor (datastore: Datastore, blockstore: Blockstore, dagWalkers: Record) { + constructor (datastore: Datastore, blockstore: Blockstore, getCodec: CodecLoader) { this.datastore = datastore this.blockstore = blockstore - this.dagWalkers = dagWalkers + this.getCodec = getCodec } async * add (cid: CID, options: AddOptions = {}): AsyncGenerator { @@ -119,18 +120,14 @@ export class PinsImpl implements Pins { return } - const dagWalker = this.dagWalkers[cid.code] - - if (dagWalker == null) { - throw new Error(`No dag walker found for cid codec ${cid.code}`) - } - - const block = await this.blockstore.get(cid, options) + const codec = await this.getCodec(cid.code) + const bytes = await this.blockstore.get(cid, options) + const block = createUnsafe({ bytes, cid, codec }) yield cid // walk dag, ensure all blocks are present - for await (const cid of dagWalker.walk(block)) { + for await (const [,cid] of block.links()) { yield * await queue.add(async () => { return this.#walkDag(cid, queue, { ...options, diff --git a/packages/utils/src/utils/dag-walkers.ts b/packages/utils/src/utils/dag-walkers.ts deleted file mode 100644 index b6f726f5b..000000000 --- a/packages/utils/src/utils/dag-walkers.ts +++ /dev/null @@ -1,198 +0,0 @@ -/* eslint max-depth: ["error", 7] */ - -import * as dagCbor from '@ipld/dag-cbor' -import * as dagJson from '@ipld/dag-json' -import * as dagPb from '@ipld/dag-pb' -import * as cborg from 'cborg' -import { Type, Token } from 'cborg' -import * as cborgJson from 'cborg/json' -import { CID } from 'multiformats' -import { base64 } from 'multiformats/bases/base64' -import * as json from 'multiformats/codecs/json' -import * as raw from 'multiformats/codecs/raw' -import type { DAGWalker } from '@helia/interface' - -/** - * Dag walker for dag-pb CIDs - */ -export const dagPbWalker: DAGWalker = { - codec: dagPb.code, - * walk (block) { - const node = dagPb.decode(block) - - yield * node.Links.map(l => l.Hash) - } -} - -/** - * Dag walker for raw CIDs - */ -export const rawWalker: DAGWalker = { - codec: raw.code, - * walk () { - // no embedded CIDs in a raw block - } -} - -// https://github.com/ipfs/go-ipfs/issues/3570#issuecomment-273931692 -const CID_TAG = 42 - -/** - * Dag walker for dag-cbor CIDs. Does not actually use dag-cbor since - * all we are interested in is extracting the the CIDs from the block - * so we can just use cborg for that. - */ -export const dagCborWalker: DAGWalker = { - codec: dagCbor.code, - * walk (block) { - const cids: CID[] = [] - const tags: cborg.TagDecoder[] = [] - tags[CID_TAG] = (bytes) => { - if (bytes[0] !== 0) { - throw new Error('Invalid CID for CBOR tag 42; expected leading 0x00') - } - - const cid = CID.decode(bytes.subarray(1)) // ignore leading 0x00 - - cids.push(cid) - - return cid - } - - cborg.decode(block, { - tags - }) - - yield * cids - } -} - -/** - * Borrowed from @ipld/dag-json - */ -class DagJsonTokenizer extends cborgJson.Tokenizer { - private readonly tokenBuffer: cborg.Token[] - - constructor (data: Uint8Array, options?: cborg.DecodeOptions) { - super(data, options) - - this.tokenBuffer = [] - } - - done (): boolean { - return this.tokenBuffer.length === 0 && super.done() - } - - _next (): cborg.Token { - if (this.tokenBuffer.length > 0) { - // @ts-expect-error https://github.com/Microsoft/TypeScript/issues/30406 - return this.tokenBuffer.pop() - } - return super.next() - } - - /** - * Implements rules outlined in https://github.com/ipld/specs/pull/356 - */ - next (): cborg.Token { - const token = this._next() - - if (token.type === Type.map) { - const keyToken = this._next() - if (keyToken.type === Type.string && keyToken.value === '/') { - const valueToken = this._next() - if (valueToken.type === Type.string) { // *must* be a CID - const breakToken = this._next() // swallow the end-of-map token - if (breakToken.type !== Type.break) { - throw new Error('Invalid encoded CID form') - } - this.tokenBuffer.push(valueToken) // CID.parse will pick this up after our tag token - return new Token(Type.tag, 42, 0) - } - if (valueToken.type === Type.map) { - const innerKeyToken = this._next() - if (innerKeyToken.type === Type.string && innerKeyToken.value === 'bytes') { - const innerValueToken = this._next() - if (innerValueToken.type === Type.string) { // *must* be Bytes - for (let i = 0; i < 2; i++) { - const breakToken = this._next() // swallow two end-of-map tokens - if (breakToken.type !== Type.break) { - throw new Error('Invalid encoded Bytes form') - } - } - const bytes = base64.decode(`m${innerValueToken.value}`) - return new Token(Type.bytes, bytes, innerValueToken.value.length) - } - this.tokenBuffer.push(innerValueToken) // bail - } - this.tokenBuffer.push(innerKeyToken) // bail - } - this.tokenBuffer.push(valueToken) // bail - } - this.tokenBuffer.push(keyToken) // bail - } - return token - } -} - -/** - * Dag walker for dag-json CIDs. Does not actually use dag-json since - * all we are interested in is extracting the the CIDs from the block - * so we can just use cborg/json for that. - */ -export const dagJsonWalker: DAGWalker = { - codec: dagJson.code, - * walk (block) { - const cids: CID[] = [] - const tags: cborg.TagDecoder[] = [] - tags[CID_TAG] = (string) => { - const cid = CID.parse(string) - - cids.push(cid) - - return cid - } - - cborgJson.decode(block, { - tags, - tokenizer: new DagJsonTokenizer(block, { - tags, - allowIndefinite: true, - allowUndefined: true, - allowNaN: true, - allowInfinity: true, - allowBigInt: true, - strict: false, - rejectDuplicateMapKeys: false - }) - }) - - yield * cids - } -} - -/** - * Dag walker for json CIDs. JSON has no facility for linking to - * external blocks so the walker is a no-op. - */ -export const jsonWalker: DAGWalker = { - codec: json.code, - * walk () {} -} - -export function defaultDagWalkers (walkers: DAGWalker[] = []): Record { - const output: Record = {} - - ;[ - dagPbWalker, - rawWalker, - dagCborWalker, - dagJsonWalker, - jsonWalker, - ...walkers - ].forEach(dagWalker => { - output[dagWalker.codec] = dagWalker - }) - - return output -} diff --git a/packages/utils/src/utils/default-hashers.ts b/packages/utils/src/utils/default-hashers.ts deleted file mode 100644 index 1e5790994..000000000 --- a/packages/utils/src/utils/default-hashers.ts +++ /dev/null @@ -1,18 +0,0 @@ -import { identity } from 'multiformats/hashes/identity' -import { sha256, sha512 } from 'multiformats/hashes/sha2' -import type { MultihashHasher } from 'multiformats/hashes/interface' - -export function defaultHashers (hashers: MultihashHasher[] = []): Record { - const output: Record = {} - - ;[ - sha256, - sha512, - identity, - ...hashers - ].forEach(hasher => { - output[hasher.code] = hasher - }) - - return output -} diff --git a/packages/utils/src/utils/get-codec.ts b/packages/utils/src/utils/get-codec.ts new file mode 100644 index 000000000..ede017351 --- /dev/null +++ b/packages/utils/src/utils/get-codec.ts @@ -0,0 +1,47 @@ +/* eslint max-depth: ["error", 7] */ + +import { UnknownCodecError } from '@helia/interface' +import * as dagCbor from '@ipld/dag-cbor' +import * as dagJson from '@ipld/dag-json' +import * as dagPb from '@ipld/dag-pb' +import * as json from 'multiformats/codecs/json' +import * as raw from 'multiformats/codecs/raw' +import { isPromise } from './is-promise.js' +import type { Await } from '@helia/interface' +import type { BlockCodec } from 'multiformats/codecs/interface' + +export function getCodec (initialCodecs: Array> = [], loadCodec?: (code: number) => Await>): (code: Code) => Await> { + const codecs: Record> = { + [dagPb.code]: dagPb, + [raw.code]: raw, + [dagCbor.code]: dagCbor, + [dagJson.code]: dagJson, + [json.code]: json + } + + initialCodecs.forEach(codec => { + codecs[codec.code] = codec + }) + + return async (code) => { + let codec = codecs[code] + + if (codec == null && loadCodec != null) { + const res = loadCodec(code) + + if (isPromise(res)) { + codec = await res + } else { + codec = res + } + + codecs[codec.code] = codec + } + + if (codec != null) { + return codec + } + + throw new UnknownCodecError(`Could not load codec for ${code}`) + } +} diff --git a/packages/utils/src/utils/get-hasher.ts b/packages/utils/src/utils/get-hasher.ts new file mode 100644 index 000000000..a6229eb71 --- /dev/null +++ b/packages/utils/src/utils/get-hasher.ts @@ -0,0 +1,40 @@ +import { UnknownHashAlgorithmError } from '@helia/interface' +import { identity } from 'multiformats/hashes/identity' +import { sha256, sha512 } from 'multiformats/hashes/sha2' +import { isPromise } from './is-promise.js' +import type { Await } from '@helia/interface' +import type { MultihashHasher } from 'multiformats/hashes/interface' + +export function getHasher (initialHashers: MultihashHasher[] = [], loadHasher?: (code: number) => Await): (code: number) => Await { + const hashers: Record = { + [sha256.code]: sha256, + [sha512.code]: sha512, + [identity.code]: identity + } + + initialHashers.forEach(hasher => { + hashers[hasher.code] = hasher + }) + + return async (code) => { + let hasher = hashers[code] + + if (hasher == null && loadHasher != null) { + const res = loadHasher(code) + + if (isPromise(res)) { + hasher = await res + } else { + hasher = res + } + + hashers[hasher.code] = hasher + } + + if (hasher != null) { + return hasher + } + + throw new UnknownHashAlgorithmError(`No hasher configured for multihash code 0x${code.toString(16)}, please configure one. You can look up which hash this is at https://github.com/multiformats/multicodec/blob/master/table.csv`) + } +} diff --git a/packages/utils/src/utils/is-promise.ts b/packages/utils/src/utils/is-promise.ts new file mode 100644 index 000000000..288a1b842 --- /dev/null +++ b/packages/utils/src/utils/is-promise.ts @@ -0,0 +1,3 @@ +export function isPromise (p?: any): p is Promise { + return p?.then != null +} diff --git a/packages/utils/src/utils/networked-storage.ts b/packages/utils/src/utils/networked-storage.ts index 465b6bd54..41da2f98e 100644 --- a/packages/utils/src/utils/networked-storage.ts +++ b/packages/utils/src/utils/networked-storage.ts @@ -5,12 +5,14 @@ import filter from 'it-filter' import forEach from 'it-foreach' import { CustomProgressEvent, type ProgressOptions } from 'progress-events' import { equals as uint8ArrayEquals } from 'uint8arrays/equals' +import { isPromise } from './is-promise.js' +import type { HasherLoader } from '@helia/interface' import type { BlockBroker, Blocks, Pair, DeleteManyBlocksProgressEvents, DeleteBlockProgressEvents, GetBlockProgressEvents, GetManyBlocksProgressEvents, PutManyBlocksProgressEvents, PutBlockProgressEvents, GetAllBlocksProgressEvents, GetOfflineOptions, BlockRetrievalOptions, CreateSessionOptions, SessionBlockstore } from '@helia/interface/blocks' import type { AbortOptions, ComponentLogger, Logger, LoggerOptions, Startable } from '@libp2p/interface' import type { Blockstore } from 'interface-blockstore' import type { AwaitIterable } from 'interface-store' import type { CID } from 'multiformats/cid' -import type { MultihashHasher } from 'multiformats/hashes/interface' +import type { MultihashDigest, MultihashHasher } from 'multiformats/hashes/interface' export interface GetOptions extends AbortOptions { progress?(evt: Event): void @@ -20,12 +22,12 @@ export interface StorageComponents { blockstore: Blockstore logger: ComponentLogger blockBrokers: BlockBroker[] - hashers: Record + getHasher: HasherLoader } class Storage implements Blockstore { protected readonly child: Blockstore - protected readonly hashers: Record + protected readonly getHasher: HasherLoader protected log: Logger protected readonly logger: ComponentLogger protected readonly components: StorageComponents @@ -38,7 +40,7 @@ class Storage implements Blockstore { this.logger = components.logger this.components = components this.child = new IdentityBlockstore(components.blockstore) - this.hashers = components.hashers ?? {} + this.getHasher = components.getHasher } /** @@ -91,9 +93,11 @@ class Storage implements Blockstore { */ async get (cid: CID, options: GetOfflineOptions & AbortOptions & ProgressOptions = {}): Promise { if (options.offline !== true && !(await this.child.has(cid, options))) { + const hasher = await this.getHasher(cid.multihash.code) + // we do not have the block locally, get it from a block provider options.onProgress?.(new CustomProgressEvent('blocks:get:providers:get', cid)) - const block = await raceBlockRetrievers(cid, this.components.blockBrokers, this.hashers[cid.multihash.code], { + const block = await raceBlockRetrievers(cid, this.components.blockBrokers, hasher, { ...options, log: this.log }) @@ -122,9 +126,11 @@ class Storage implements Blockstore { yield * this.child.getMany(forEach(cids, async (cid): Promise => { if (options.offline !== true && !(await this.child.has(cid, options))) { + const hasher = await this.getHasher(cid.multihash.code) + // we do not have the block locally, get it from a block provider options.onProgress?.(new CustomProgressEvent('blocks:get-many:providers:get', cid)) - const block = await raceBlockRetrievers(cid, this.components.blockBrokers, this.hashers[cid.multihash.code], { + const block = await raceBlockRetrievers(cid, this.components.blockBrokers, hasher, { ...options, log: this.log }) @@ -219,7 +225,7 @@ export class NetworkedStorage extends Storage implements Blocks, Startable { return new SessionStorage({ blockstore: this.child, blockBrokers, - hashers: this.hashers, + getHasher: this.getHasher, logger: this.logger }, { root @@ -395,7 +401,14 @@ export const getCidBlockVerifierFunction = (cid: CID, hasher: MultihashHasher): return async (block: Uint8Array): Promise => { // verify block - const hash = await hasher.digest(block) + let hash: MultihashDigest + const res = hasher.digest(block) + + if (isPromise(res)) { + hash = await res + } else { + hash = res + } if (!uint8ArrayEquals(hash.digest, cid.multihash.digest)) { // if a hash mismatch occurs for a TrustlessGatewayBlockBroker, we should try another gateway diff --git a/packages/utils/test/block-broker.spec.ts b/packages/utils/test/block-broker.spec.ts index 6f75aa301..e513e0af5 100644 --- a/packages/utils/test/block-broker.spec.ts +++ b/packages/utils/test/block-broker.spec.ts @@ -9,7 +9,7 @@ import all from 'it-all' import * as raw from 'multiformats/codecs/raw' import Sinon from 'sinon' import { type StubbedInstance, stubInterface } from 'sinon-ts' -import { defaultHashers } from '../src/utils/default-hashers.js' +import { getHasher } from '../src/utils/get-hasher.js' import { NetworkedStorage } from '../src/utils/networked-storage.js' import { createBlock } from './fixtures/create-block.js' import type { BlockBroker } from '@helia/interface/blocks' @@ -40,7 +40,7 @@ describe('block-broker', () => { bitswapBlockBroker, gatewayBlockBroker ], - hashers: defaultHashers() + getHasher: getHasher() }) }) @@ -128,7 +128,7 @@ describe('block-broker', () => { blockBrokers: [ gatewayBlockBroker ], - hashers: defaultHashers() + getHasher: getHasher() }) gatewayBlockBroker.retrieve.withArgs(cid, Sinon.match.any).resolves(block) diff --git a/packages/utils/test/fixtures/create-dag.ts b/packages/utils/test/fixtures/create-dag.ts index 635d45441..8d9ee094a 100644 --- a/packages/utils/test/fixtures/create-dag.ts +++ b/packages/utils/test/fixtures/create-dag.ts @@ -1,4 +1,4 @@ -import { fromString as uint8arrayFromString } from 'uint8arrays/from-string' +import * as dagCbor from '@ipld/dag-cbor' import { createAndPutBlock } from './create-block.js' import type { Blockstore } from 'interface-blockstore' import type { CID } from 'multiformats/cid' @@ -58,33 +58,67 @@ export interface DAGNode { * } * ``` */ -export async function createDag (codec: number, blocks: Blockstore, depth: number, children: number): Promise> { +export async function createDag (blocks: Blockstore, depth: number, children: number): Promise> { const dag: Record = {} - const root = await createAndPutBlock(codec, uint8arrayFromString('level-0'), blocks) - await addChildren(root, 'level', 0, 0, depth, children, dag, codec, blocks) + interface Parent { + name: string + depth: number + links: Parent[] + } - return dag -} + async function descend (parent: Parent, level: number): Promise { + if (level === -1) { + return + } -async function addChildren (cid: CID, name: string, level: number, index: number, depth: number, children: number, dag: Record, codec: number, blocks: Blockstore): Promise { - if (depth === 0) { - return - } + for (let i = 0; i < children; i++) { + const node: Parent = { + name: `${parent.name}-${i}`, + depth: depth - level, + links: [] + } - name = `${name}-${index}` + parent.links.push(node) - dag[name] = { - level, - cid, + await descend(node, level - 1) + } + } + + const node: Parent = { + name: 'level-0', + depth: 0, links: [] } - for (let i = 0; i < children; i++) { - const subChild = await createAndPutBlock(codec, uint8arrayFromString(`${name}-${i}`), blocks) + await descend(node, depth - 1) + + async function write (parent: Parent): Promise { + const links: CID[] = [] - dag[name].links.push(subChild) + for (const child of parent.links) { + if (child.links.length > 0) { + await write(child) + } - await addChildren(subChild, name, level + 1, index + i, depth - 1, children, dag, codec, blocks) + links.push( + await createAndPutBlock(dagCbor.code, dagCbor.encode(child), blocks) + ) + } + + // @ts-expect-error changing type + parent.links = links + + const cid = await createAndPutBlock(dagCbor.code, dagCbor.encode(parent), blocks) + + dag[parent.name] = { + cid, + level: parent.depth, + links + } } + + await write(node) + + return dag } diff --git a/packages/utils/test/fixtures/dag-walker.ts b/packages/utils/test/fixtures/dag-walker.ts deleted file mode 100644 index 390902540..000000000 --- a/packages/utils/test/fixtures/dag-walker.ts +++ /dev/null @@ -1,14 +0,0 @@ -import { toString as uint8ArrayToString } from 'uint8arrays/to-string' -import type { DAGNode } from './create-dag.js' -import type { DAGWalker } from '@helia/interface' - -export function dagWalker (codec: number, dag: Record): DAGWalker { - return { - codec, - * walk (block) { - const node = dag[uint8ArrayToString(block)] ?? { links: [] } - - yield * node.links - } - } -} diff --git a/packages/utils/test/pins.depth-limited.spec.ts b/packages/utils/test/pins.depth-limited.spec.ts index 2ac034c49..9321dafe4 100644 --- a/packages/utils/test/pins.depth-limited.spec.ts +++ b/packages/utils/test/pins.depth-limited.spec.ts @@ -4,7 +4,6 @@ import { MemoryBlockstore } from 'blockstore-core' import drain from 'it-drain' import { createDag, type DAGNode } from './fixtures/create-dag.js' import { createHelia } from './fixtures/create-helia.js' -import { dagWalker } from './fixtures/dag-walker.js' import type { Helia } from '@helia/interface' const MAX_DEPTH = 3 @@ -16,18 +15,12 @@ describe('pins (depth limited)', () => { beforeEach(async () => { const blockstore = new MemoryBlockstore() - // arbitrary CID codec value - const codec = 7 - // create a DAG, MAX_DEPTH levels deep with each level having three children - dag = await createDag(codec, blockstore, MAX_DEPTH, 3) + dag = await createDag(blockstore, MAX_DEPTH, 3) helia = await createHelia({ blockstore, - blockBrokers: [], - dagWalkers: [ - dagWalker(codec, dag) - ] + blockBrokers: [] }) }) diff --git a/packages/utils/test/pins.recursive.spec.ts b/packages/utils/test/pins.recursive.spec.ts index 712af13f5..a0fc3ef73 100644 --- a/packages/utils/test/pins.recursive.spec.ts +++ b/packages/utils/test/pins.recursive.spec.ts @@ -5,7 +5,6 @@ import all from 'it-all' import drain from 'it-drain' import { createDag, type DAGNode } from './fixtures/create-dag.js' import { createHelia } from './fixtures/create-helia.js' -import { dagWalker } from './fixtures/dag-walker.js' import type { Helia } from '@helia/interface' import type { AddPinEvents } from '@helia/interface/pins' @@ -16,18 +15,12 @@ describe('pins (recursive)', () => { beforeEach(async () => { const blockstore = new MemoryBlockstore() - // arbitrary CID codec value - const codec = 7 - // create a DAG, two levels deep with each level having three children - dag = await createDag(codec, blockstore, 2, 3) + dag = await createDag(blockstore, 2, 3) helia = await createHelia({ blockstore, - blockBrokers: [], - dagWalkers: [ - dagWalker(codec, dag) - ] + blockBrokers: [] }) }) diff --git a/packages/utils/test/storage.spec.ts b/packages/utils/test/storage.spec.ts index 43774a45d..98ccf6a4d 100644 --- a/packages/utils/test/storage.spec.ts +++ b/packages/utils/test/storage.spec.ts @@ -9,6 +9,7 @@ import drain from 'it-drain' import * as raw from 'multiformats/codecs/raw' import { PinsImpl } from '../src/pins.js' import { BlockStorage } from '../src/storage.js' +import { getCodec } from '../src/utils/get-codec.js' import { createBlock } from './fixtures/create-block.js' import type { Blocks, SessionBlockstore } from '@helia/interface' import type { Pins } from '@helia/interface/pins' @@ -36,7 +37,7 @@ describe('storage', () => { const datastore = new MemoryDatastore() blockstore = new MemoryBlocks() - pins = new PinsImpl(datastore, blockstore, []) + pins = new PinsImpl(datastore, blockstore, getCodec()) storage = new BlockStorage(blockstore, pins, { holdGcLock: true }) diff --git a/packages/utils/test/utils/networked-storage.spec.ts b/packages/utils/test/utils/networked-storage.spec.ts index 00ef07938..96e861615 100644 --- a/packages/utils/test/utils/networked-storage.spec.ts +++ b/packages/utils/test/utils/networked-storage.spec.ts @@ -13,7 +13,7 @@ import Sinon from 'sinon' import { type StubbedInstance, stubInterface } from 'sinon-ts' import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string' import { toString as uint8ArrayToString } from 'uint8arrays/to-string' -import { defaultHashers } from '../../src/utils/default-hashers.js' +import { getHasher } from '../../src/utils/get-hasher.js' import { NetworkedStorage } from '../../src/utils/networked-storage.js' import { createBlock } from '../fixtures/create-block.js' import type { NetworkedStorageComponents } from '../../src/utils/networked-storage.js' @@ -42,7 +42,7 @@ describe('networked-storage', () => { blockBrokers: [ bitswap ], - hashers: defaultHashers() + getHasher: getHasher() } storage = new NetworkedStorage(components) })