Skip to content

Commit

Permalink
feat: revamp implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
joaosa committed May 2, 2024
1 parent 19a0970 commit b6e213f
Show file tree
Hide file tree
Showing 3 changed files with 182 additions and 62 deletions.
233 changes: 171 additions & 62 deletions packages/upload-client/src/blob.js
Original file line number Diff line number Diff line change
@@ -1,24 +1,127 @@
import { ed25519 } from '@ucanto/principal'
import { conclude } from '@web3-storage/capabilities/ucan'
import * as UCAN from '@web3-storage/capabilities/ucan'
import { CAR } from '@ucanto/transport'
import { Receipt } from '@ucanto/core'
import * as W3sBlobCapabilities from '@web3-storage/capabilities/web3.storage/blob'
import * as BlobCapabilities from '@web3-storage/capabilities/blob'
import * as HTTPCapabilities from '@web3-storage/capabilities/http'
import { SpaceDID } from '@web3-storage/capabilities/utils'
import retry, { AbortError } from 'p-retry'
import retry from 'p-retry'
import { servicePrincipal, connection } from './service.js'
import { REQUEST_RETRIES } from './constants.js'

// FIXME this code has been copied over from upload-api
/**
*
* @param {string} url
* @param {import('./types.js').ProgressFn} handler
* @param {import('@ucanto/interface').Invocation} concludeFx
*/
export function getConcludeReceipt(concludeFx) {
const receiptBlocks = new Map()
for (const block of concludeFx.iterateIPLDBlocks()) {
receiptBlocks.set(`${block.cid}`, block)
}
return Receipt.view({
// @ts-expect-error object of type unknown
root: concludeFx.capabilities[0].nb.receipt,
blocks: receiptBlocks,
})
}

// FIXME this code has been copied over from upload-api
/**
* @param {import('@ucanto/interface').Receipt} receipt
*/
function createUploadProgressHandler(url, handler) {
export function parseBlobAddReceiptNext(receipt) {
// Get invocations next
/**
*
* @param {import('./types.js').ProgressStatus} status
* @type {import('@ucanto/interface').Invocation[]}
*/
function onUploadProgress({ total, loaded, lengthComputable }) {
return handler({ total, loaded, lengthComputable, url })
// @ts-expect-error read only effect
const forkInvocations = receipt.fx.fork
const allocateTask = forkInvocations.find(
(fork) => fork.capabilities[0].can === W3sBlobCapabilities.allocate.can
)
const concludefxs = forkInvocations.filter(
(fork) => fork.capabilities[0].can === UCAN.conclude.can
)
const putTask = forkInvocations.find(
(fork) => fork.capabilities[0].can === HTTPCapabilities.put.can
)
const acceptTask = receipt.fx.join
if (!allocateTask || !concludefxs.length || !putTask || !acceptTask) {
throw new Error('mandatory effects not received')
}

// Decode receipts available
const nextReceipts = concludefxs.map((fx) => getConcludeReceipt(fx))
/** @type {import('@ucanto/interface').Receipt<import('./types.js').BlobAllocateSuccess, import('./types.js').BlobAllocateFailure> | undefined} */
// @ts-expect-error types unknown for next
const allocateReceipt = nextReceipts.find((receipt) =>
receipt.ran.link().equals(allocateTask.cid)
)
/** @type {import('@ucanto/interface').Receipt<{}, import('@ucanto/interface').Failure> | undefined} */
// @ts-expect-error types unknown for next
const putReceipt = nextReceipts.find((receipt) =>
receipt.ran.link().equals(putTask.cid)
)
/** @type {import('@ucanto/interface').Receipt<import('./types.js').BlobAcceptSuccess, import('./types.js').BlobAcceptFailure> | undefined} */
// @ts-expect-error types unknown for next
const acceptReceipt = nextReceipts.find((receipt) =>
receipt.ran.link().equals(acceptTask.link())
)

if (!allocateReceipt) {
throw new Error('mandatory effects not received')
}

return {
allocate: {
task: allocateTask,
receipt: allocateReceipt,
},
put: {
task: putTask,
receipt: putReceipt,
},
accept: {
task: acceptTask,
receipt: acceptReceipt,
},
}
}

// FIXME this code has been copied over from upload-api
/**
* @param {import('@ucanto/interface').Signer} id
* @param {import('@ucanto/interface').Verifier} serviceDid
* @param {import('@ucanto/interface').Receipt} receipt
*/
export function createConcludeInvocation(id, serviceDid, receipt) {
const receiptBlocks = []
const receiptCids = []
for (const block of receipt.iterateIPLDBlocks()) {
receiptBlocks.push(block)
receiptCids.push(block.cid)
}
const concludeAllocatefx = conclude.invoke({
issuer: id,
audience: serviceDid,
with: id.toDIDKey(),
nb: {
receipt: receipt.link(),
},
expiration: Infinity,
facts: [
{
...receiptCids,
},
],
})
for (const block of receiptBlocks) {
concludeAllocatefx.attach(block)
}
return onUploadProgress

return concludeAllocatefx
}

/**
Expand Down Expand Up @@ -81,63 +184,69 @@ export async function add(
})
}

// TODO I'm definitely missing something here
// I suppose it's something alike https://github.com/w3s-project/w3up/pull/1421/files#diff-f1d31e4f2617054f785fab0c186ab965b2fdd3a9ed7873a955d3e3c74bb6e186R100
const responseAddUpload = result.out.ok

const fetchWithUploadProgress =
options.fetchWithUploadProgress ||
options.fetch ||
globalThis.fetch.bind(globalThis)
const nextTasks = parseBlobAddReceiptNext(result)

let fetchDidCallUploadProgressCb = false
const res = await retry(
async () => {
try {
const res = await fetchWithUploadProgress(responseAddUpload.url, {
method: 'PUT',
body: car,
headers: responseAddUpload.headers,
signal: options.signal,
onUploadProgress: (status) => {
fetchDidCallUploadProgressCb = true
if (options.onUploadProgress)
createUploadProgressHandler(
responseAddUpload.url,
options.onUploadProgress
)(status)
},
// @ts-expect-error - this is needed by recent versions of node - see https://github.com/bluesky-social/atproto/pull/470 for more info
duplex: 'half',
})
if (res.status >= 400 && res.status < 500) {
throw new AbortError(`upload failed: ${res.status}`)
}
return res
} catch (err) {
if (options.signal?.aborted === true) {
throw new AbortError('upload aborted')
}
throw err
}
},
{
retries: options.retries ?? REQUEST_RETRIES,
}
)
const { receipt } = nextTasks.allocate
if (!receipt.out.ok) {
throw new Error(`failed ${BlobCapabilities.add.can} invocation`, {
cause: receipt.out.error,
})
}

if (!fetchDidCallUploadProgressCb && options.onUploadProgress) {
// the fetch implementation didn't support onUploadProgress
const carBlob = new Blob([car])
options.onUploadProgress({
total: carBlob.size,
loaded: carBlob.size,
lengthComputable: false,
const { address } = receipt.out.ok
if (address) {
const { status } = await fetch(address.url, {
method: 'PUT',
mode: 'cors',
body: bytes,
headers: address.headers,
})
if (status !== 200) throw new Error(`unexpected status: ${status}`)
}

if (!res.ok) {
throw new Error(`upload failed: ${res.status}`)
// Invoke `conclude` with `http/put` receipt
const derivedSigner = ed25519.from(
/** @type {import('@ucanto/interface').SignerArchive<import('@ucanto/interface').DID, typeof ed25519.signatureCode>} */
(nextTasks.put.task.facts[0]['keys'])
)
const httpPut = HTTPCapabilities.put.invoke({
issuer: derivedSigner,
audience: derivedSigner,
with: derivedSigner.toDIDKey(),
nb: {
body: {
digest: bytes,
size: bytes.length,
},
url: {
'ucan/await': ['.out.ok.address.url', nextTasks.allocate.task.cid],
},
headers: {
'ucan/await': [
'.out.ok.address.headers',
nextTasks.allocate.task.cid,
],
},
},
facts: nextTasks.put.task.facts,
expiration: Infinity,
})

const httpPutDelegation = await httpPut.delegate()
const httpPutReceipt = await Receipt.issue({
issuer: derivedSigner,
ran: httpPutDelegation.cid,
result: { ok: {} },
})
// @ts-expect-error object of type unknown
const httpPutConcludeInvocation = createConcludeInvocation(issuer, audience, httpPutReceipt)
// @ts-expect-error object of type unknown
const ucanConclude = await httpPutConcludeInvocation.execute(connection)

if (!ucanConclude.out.ok) {
throw new Error(`failed ${BlobCapabilities.add.can} invocation`, {
cause: result.out.error,
})
}

return link
Expand Down
8 changes: 8 additions & 0 deletions packages/upload-client/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@ import {
BlobAdd,
BlobAddSuccess,
BlobAddFailure,
BlobAllocateSuccess,
BlobAllocateFailure,
BlobAcceptSuccess,
BlobAcceptFailure,
StoreAdd,
StoreAddSuccess,
StoreAddSuccessUpload,
Expand Down Expand Up @@ -62,6 +66,10 @@ type FetchOptions = Override<

export type {
FetchOptions,
BlobAllocateSuccess,
BlobAllocateFailure,
BlobAcceptSuccess,
BlobAcceptFailure,
StoreAdd,
StoreAddSuccess,
StoreAddSuccessUpload,
Expand Down
3 changes: 3 additions & 0 deletions pnpm-lock.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit b6e213f

Please sign in to comment.