Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: stream car file bytes from @helia/car #444

Merged
merged 8 commits into from
Feb 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 40 additions & 2 deletions packages/car/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -58,13 +58,14 @@
* ```
*/

import { CarWriter } from '@ipld/car'
import drain from 'it-drain'
import map from 'it-map'
import defer from 'p-defer'
import PQueue from 'p-queue'
import type { DAGWalker } from '@helia/interface'
import type { GetBlockProgressEvents, PutManyBlocksProgressEvents } from '@helia/interface/blocks'
import type { CarReader, CarWriter } from '@ipld/car'
import type { CarReader } from '@ipld/car'
import type { AbortOptions } from '@libp2p/interfaces'
import type { Blockstore } from 'interface-blockstore'
import type { CID } from 'multiformats/cid'
Expand Down Expand Up @@ -129,6 +130,28 @@
* ```
*/
export(root: CID | CID[], writer: Pick<CarWriter, 'put' | 'close'>, options?: AbortOptions & ProgressOptions<GetBlockProgressEvents>): Promise<void>

/**
* Returns an AsyncGenerator that yields CAR file bytes.
*
* The generator is asynchronous so it must be consumed with `for await`.
*
* @example
*
* ```typescript
* import { createHelia } from 'helia'
* import { car } from '@helia/car'
* import { CID } from 'multiformats/cid'
*
* const helia = await createHelia()
* const cid = CID.parse('QmFoo...')
*
* const c = car(helia)
*
* for await (const buf of c.stream(cid)) {
* // store or send `buf` somewhere
* }
* ```
*/
stream(root: CID | CID[], options?: AbortOptions & ProgressOptions<GetBlockProgressEvents>): AsyncGenerator<Uint8Array>
}

const DAG_WALK_QUEUE_CONCURRENCY = 1
Expand All @@ -148,7 +171,7 @@
}

async export (root: CID | CID[], writer: Pick<CarWriter, 'put' | 'close'>, options?: AbortOptions & ProgressOptions<GetBlockProgressEvents>): Promise<void> {
const deferred = defer()
const deferred = defer<Error | undefined>()
const roots = Array.isArray(root) ? root : [root]

// use a queue to walk the DAG instead of recursion so we can traverse very large DAGs
Expand All @@ -159,6 +182,7 @@
deferred.resolve()
})
queue.on('error', (err) => {
queue.clear()

Check warning on line 185 in packages/car/src/index.ts

View check run for this annotation

Codecov / codecov/patch

packages/car/src/index.ts#L185

Added line #L185 was not covered by tests
deferred.reject(err)
})

Expand All @@ -168,6 +192,7 @@
await writer.put({ cid, bytes })
}, options)
})
.catch(() => {})
}

// wait for the writer to end
Expand All @@ -178,6 +203,19 @@
}
}

/**
 * Yields the bytes of a CAR file containing the DAG(s) behind `root`.
 *
 * The export is started without being awaited so that blocks are written to
 * `writer` while the resulting bytes are read from `out`.
 *
 * @param root - the CID or CIDs to use as the CAR file root(s)
 * @param options - abort signal and progress handler, passed through to `export`
 * @throws whatever `export` rejected with, after all bytes produced before
 * the failure have been yielded - previously such failures were swallowed and
 * the consumer received a truncated CAR file with no indication of a problem
 */
async * stream (root: CID | CID[], options?: AbortOptions & ProgressOptions<GetBlockProgressEvents>): AsyncGenerator<Uint8Array, void, undefined> {
  const { writer, out } = CarWriter.create(root)

  // has to be done async so we write to `writer` and read from `out` at the
  // same time. The rejection is captured instead of discarded - wrapping it
  // in an object means this promise never rejects (so no unhandled rejection)
  // and a rejection value of `undefined` is still detectable
  const exporting = this.export(root, writer, options)
    .then(() => undefined, (err: unknown) => ({ err }))

  // NOTE(review): as before, this loop only ends once `writer` is closed -
  // this assumes `export` closes the writer on failure as well as on success
  for await (const buf of out) {
    yield buf
  }

  // `out` has ended so the export has finished or is about to - wait for it
  // to settle so a failure cannot be missed
  const failure = await exporting

  if (failure != null) {
    throw failure.err
  }
}

/**
* Walk the DAG behind the passed CID, ensure all blocks are present in the blockstore
* and update the pin count for them
Expand Down
27 changes: 27 additions & 0 deletions packages/car/test/fixtures/dag-walkers.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
import * as dagPb from '@ipld/dag-pb'
import * as raw from 'multiformats/codecs/raw'
import type { DAGWalker } from '@helia/interface'

/**
 * DAG walker for dag-pb blocks - decodes the block and yields the CID of
 * every link found in the node
 */
const dagPbWalker: DAGWalker = {
  codec: dagPb.code,
  * walk (block) {
    const { Links: links } = dagPb.decode(block)

    for (const link of links) {
      yield link.Hash
    }
  }
}

/**
 * Dag walker for raw CIDs - never yields anything
 */
const rawWalker: DAGWalker = {
codec: raw.code,
* walk () {
// no embedded CIDs in a raw block
}
}

/**
 * The dag-pb and raw walkers, each stored under the code of the codec it
 * handles
 */
export const dagWalkers = {
[dagPb.code]: dagPbWalker,
[raw.code]: rawWalker
}
30 changes: 2 additions & 28 deletions packages/car/test/index.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,49 +2,23 @@

import { type UnixFS, unixfs } from '@helia/unixfs'
import { CarReader } from '@ipld/car'
import * as dagPb from '@ipld/dag-pb'
import { expect } from 'aegir/chai'
import { MemoryBlockstore } from 'blockstore-core'
import { fixedSize } from 'ipfs-unixfs-importer/chunker'
import toBuffer from 'it-to-buffer'
import * as raw from 'multiformats/codecs/raw'
import { car, type Car } from '../src/index.js'
import { dagWalkers } from './fixtures/dag-walkers.js'
import { largeFile, smallFile } from './fixtures/files.js'
import { memoryCarWriter } from './fixtures/memory-car.js'
import type { DAGWalker } from '@helia/interface'
import type { Blockstore } from 'interface-blockstore'

/**
 * DAG walker for dag-pb blocks - decodes the block and yields the CID of
 * every link found in the node
 */
const dagPbWalker: DAGWalker = {
  codec: dagPb.code,
  * walk (block) {
    const { Links: links } = dagPb.decode(block)

    for (const link of links) {
      yield link.Hash
    }
  }
}

/**
 * Dag walker for raw CIDs - never yields anything
 */
const rawWalker: DAGWalker = {
codec: raw.code,
* walk () {
// no embedded CIDs in a raw block
}
}

describe('import', () => {
describe('import/export car file', () => {
let blockstore: Blockstore
let c: Car
let u: UnixFS
let dagWalkers: Record<number, DAGWalker>

beforeEach(async () => {
blockstore = new MemoryBlockstore()
dagWalkers = {
[dagPb.code]: dagPbWalker,
[raw.code]: rawWalker
}

c = car({ blockstore, dagWalkers })
u = unixfs({ blockstore })
Expand Down
37 changes: 37 additions & 0 deletions packages/car/test/stream.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/* eslint-env mocha */

import { type UnixFS, unixfs } from '@helia/unixfs'
import { expect } from 'aegir/chai'
import { MemoryBlockstore } from 'blockstore-core'
import toBuffer from 'it-to-buffer'
import { car, type Car } from '../src/index.js'
import { dagWalkers } from './fixtures/dag-walkers.js'
import { smallFile } from './fixtures/files.js'
import { memoryCarWriter } from './fixtures/memory-car.js'
import type { Blockstore } from 'interface-blockstore'

describe('stream car file', () => {
  let blockstore: Blockstore
  let carExporter: Car
  let fs: UnixFS

  beforeEach(async () => {
    // a fresh blockstore per test, shared by the car and unixfs helpers
    blockstore = new MemoryBlockstore()

    carExporter = car({ blockstore, dagWalkers })
    fs = unixfs({ blockstore })
  })

  it('streams car file', async () => {
    const rootCid = await fs.addBytes(smallFile)

    // reference bytes: export the same root through the memory car writer
    const memoryWriter = memoryCarWriter(rootCid)
    await carExporter.export(rootCid, memoryWriter)
    const exportedBytes = await memoryWriter.bytes()

    // bytes under test: collect everything yielded by `stream`
    const streamedBytes = await toBuffer(carExporter.stream(rootCid))

    expect(exportedBytes).to.equalBytes(streamedBytes)
  })
})
Loading