Skip to content

Commit

Permalink
fix: aggregate offer invocation cid wrong because not pieces cid bloc…
Browse files Browse the repository at this point in the history
…k being used but buffer block
  • Loading branch information
vasco-santos committed Nov 3, 2023
1 parent dfb46d8 commit 49ed90f
Show file tree
Hide file tree
Showing 6 changed files with 55 additions and 15 deletions.
8 changes: 8 additions & 0 deletions packages/filecoin-api/src/aggregator/api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,10 @@ export interface AggregateRecord {
* `bagy...aggregate` Piece CID of an aggregate
*/
aggregate: PieceLink
/**
* `bafy...cbor` as CID of dag-cbor block with buffer of pieces in an aggregate.
*/
buffer: Link
/**
* `bafy...cbor` as CID of dag-cbor block with list of pieces in an aggregate.
*/
Expand Down Expand Up @@ -298,6 +302,10 @@ export interface AggregateOfferMessage {
* List of pieces in an aggregate.
*/
pieces: Link
/**
* `bafy...cbor` as CID of dag-cbor block with buffer of pieces in an aggregate.
*/
buffer: Link
/**
* Grouping information for submitted piece.
*/
Expand Down
4 changes: 3 additions & 1 deletion packages/filecoin-api/src/aggregator/buffer-reducing.js
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ export async function handleBufferReducingWithAggregate({
pieces: aggregateInfo.addedBufferedPieces,
group,
}
const piecesBlock = await CBOR.write(aggregateInfo.addedBufferedPieces.map(bf => bf.piece))
const aggregateBlock = await CBOR.write(aggregateReducedBuffer)

// Store buffered pieces for aggregate
Expand All @@ -59,7 +60,8 @@ export async function handleBufferReducingWithAggregate({
// Propagate message for aggregate offer queue
const aggregateOfferQueueAdd = await aggregateOfferQueue.add({
aggregate: aggregateInfo.aggregate.link,
pieces: aggregateBlock.cid,
buffer: aggregateBlock.cid,
pieces: piecesBlock.cid,
group,
})
if (aggregateOfferQueueAdd.error) {
Expand Down
7 changes: 4 additions & 3 deletions packages/filecoin-api/src/aggregator/events.js
Original file line number Diff line number Diff line change
Expand Up @@ -168,12 +168,13 @@ export const handleBufferQueueMessage = async (context, records) => {
* @param {import('./api.js').AggregateOfferMessage} message
*/
export const handleAggregateOfferMessage = async (context, message) => {
const { pieces, aggregate, group } = message
const { pieces, aggregate, buffer, group } = message

// Store aggregate information into the store. Store events MAY be used to propagate aggregate over
const putRes = await context.aggregateStore.put({
pieces,
aggregate,
buffer,
group,
insertedAt: new Date().toISOString(),
})
Expand All @@ -196,7 +197,7 @@ export const handleAggregateInsertToPieceAcceptQueue = async (
context,
record
) => {
const bufferStoreRes = await context.bufferStore.get(record.pieces)
const bufferStoreRes = await context.bufferStore.get(record.buffer)
if (bufferStoreRes.error) {
return bufferStoreRes
}
Expand Down Expand Up @@ -331,7 +332,7 @@ export const handleAggregateInsertToAggregateOffer = async (
context,
record
) => {
const bufferStoreRes = await context.bufferStore.get(record.pieces)
const bufferStoreRes = await context.bufferStore.get(record.buffer)
if (bufferStoreRes.error) {
return {
error: bufferStoreRes.error,
Expand Down
1 change: 1 addition & 0 deletions packages/filecoin-api/src/aggregator/service.js
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ export const pieceAccept = async ({ capability }, context) => {
}
}

// Get buffered pieces
const [{ aggregate, inclusion }] = getInclusionRes.ok
const getAggregateRes = await context.aggregateStore.get({ aggregate })
if (getAggregateRes.error) {
Expand Down
38 changes: 27 additions & 11 deletions packages/filecoin-api/test/events/aggregator.js
Original file line number Diff line number Diff line change
Expand Up @@ -353,9 +353,9 @@ export const test = {
// @ts-expect-error cannot infer buffer message
const message = context.queuedMessages.get('aggregateOfferQueue')?.[0]

const bufferGet = await context.bufferStore.get(message.pieces)
const bufferGet = await context.bufferStore.get(message.buffer)
assert.ok(bufferGet.ok)
assert.ok(bufferGet.ok?.block.equals(message.pieces))
assert.ok(bufferGet.ok?.block.equals(message.buffer))
assert.equal(bufferGet.ok?.buffer.group, group)
assert.ok(message.aggregate.equals(bufferGet.ok?.buffer.aggregate))
assert.equal(bufferGet.ok?.buffer.pieces.length, totalPieces)
Expand Down Expand Up @@ -414,7 +414,7 @@ export const test = {
const bufferMessage = context.queuedMessages.get('bufferQueue')?.[0]

const aggregateBufferGet = await context.bufferStore.get(
aggregateOfferMessage.pieces
aggregateOfferMessage.buffer
)
assert.ok(aggregateBufferGet.ok)
const remainingBufferGet = await context.bufferStore.get(
Expand Down Expand Up @@ -554,11 +554,13 @@ export const test = {
group,
}
const block = await CBOR.write(buffer)
const piecesBlock = await CBOR.write(pieces.map((p) => p.link))

/** @type {AggregateOfferMessage} */
const message = {
aggregate: aggregate.link,
pieces: block.cid,
pieces: piecesBlock.cid,
buffer: block.cid,
group,
}

Expand All @@ -573,7 +575,8 @@ export const test = {
})
assert.ok(hasStoredAggregate.ok)
assert.ok(hasStoredAggregate.ok?.aggregate.equals(aggregate.link))
assert.ok(hasStoredAggregate.ok?.pieces.equals(block.cid))
assert.ok(hasStoredAggregate.ok?.buffer.equals(block.cid))
assert.ok(hasStoredAggregate.ok?.pieces.equals(piecesBlock.cid))
assert.equal(hasStoredAggregate.ok?.group, group)
assert.ok(hasStoredAggregate.ok?.insertedAt)
},
Expand All @@ -593,11 +596,13 @@ export const test = {
group,
}
const block = await CBOR.write(buffer)
const piecesBlock = await CBOR.write(pieces.map((p) => p.link))

/** @type {AggregateOfferMessage} */
const message = {
aggregate: aggregate.link,
pieces: block.cid,
buffer: block.cid,
pieces: piecesBlock.cid,
group,
}

Expand Down Expand Up @@ -631,6 +636,7 @@ export const test = {
group,
}
const block = await CBOR.write(buffer)
const piecesBlock = await CBOR.write(pieces.map((p) => p.link))

// Put buffer record
const putBufferRes = await context.bufferStore.put({
Expand All @@ -641,7 +647,8 @@ export const test = {

// Put aggregate record
const aggregateRecord = {
pieces: block.cid,
buffer: block.cid,
pieces: piecesBlock.cid,
aggregate: aggregate.link,
group,
insertedAt: new Date().toISOString(),
Expand Down Expand Up @@ -697,11 +704,13 @@ export const test = {
group,
}
const block = await CBOR.write(buffer)
const piecesBlock = await CBOR.write(pieces.map((p) => p.link))

// Put aggregate record
const aggregateRecord = {
pieces: block.cid,
buffer: block.cid,
aggregate: aggregate.link,
pieces: piecesBlock.cid,
group,
insertedAt: new Date().toISOString(),
}
Expand Down Expand Up @@ -745,6 +754,8 @@ export const test = {
group,
}
const block = await CBOR.write(buffer)
const piecesBlock = await CBOR.write(pieces.map((p) => p.link))

// Put buffer record
const putBufferRes = await context.bufferStore.put({
buffer,
Expand All @@ -754,8 +765,9 @@ export const test = {

// Put aggregate record
const aggregateRecord = {
pieces: block.cid,
buffer: block.cid,
aggregate: aggregate.link,
pieces: piecesBlock.cid,
group,
insertedAt: new Date().toISOString(),
}
Expand Down Expand Up @@ -1114,8 +1126,9 @@ export const test = {

// Put aggregate record
const aggregateRecord = {
pieces: blockBuffer.cid,
buffer: blockBuffer.cid,
aggregate: aggregate.link,
pieces: blockPieces.cid,
group,
insertedAt: new Date().toISOString(),
}
Expand All @@ -1128,6 +1141,7 @@ export const test = {
context,
aggregateRecord
)
console.log('xxx', handledAggregateInsertsRes.error)
assert.ok(handledAggregateInsertsRes.ok)

// Verify invocation
Expand Down Expand Up @@ -1163,11 +1177,13 @@ export const test = {
group,
}
const blockBuffer = await CBOR.write(buffer)
const piecesBlock = await CBOR.write(pieces.map((p) => p.link))

// Put aggregate record
const aggregateRecord = {
pieces: blockBuffer.cid,
buffer: blockBuffer.cid,
aggregate: aggregate.link,
pieces: piecesBlock.cid,
group,
insertedAt: new Date().toISOString(),
}
Expand Down
12 changes: 12 additions & 0 deletions packages/filecoin-api/test/services/aggregator.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import {
/**
* @typedef {import('@web3-storage/data-segment').PieceLink} PieceLink
* @typedef {import('@ucanto/interface').Link} Link
* @typedef {import('../../src/aggregator/api.js').Buffer} Buffer
* @typedef {import('../../src/aggregator/api.js').PieceRecord} PieceRecord
* @typedef {import('../../src/aggregator/api.js').PieceRecordKey} PieceRecordKey
* @typedef {import('../../src/aggregator/api.js').BufferRecord} BufferRecord
Expand Down Expand Up @@ -234,13 +235,24 @@ export const test = {
const group = storefront.did()
const { pieces, aggregate } = await randomAggregate(100, 128)
const piece = pieces[0].link
/** @type {Buffer} */
const buffer = {
pieces: pieces.map((p) => ({
piece: p.link,
insertedAt: new Date().toISOString(),
policy: 0,
})),
group,
}
const block = await CBOR.write(buffer)

// Store aggregate record into store
const offer = pieces.map((p) => p.link)
const piecesBlock = await CBOR.write(offer)
const aggregatePutRes = await context.aggregateStore.put({
aggregate: aggregate.link,
pieces: piecesBlock.cid,
buffer: block.cid,
group,
insertedAt: new Date().toISOString(),
})
Expand Down

0 comments on commit 49ed90f

Please sign in to comment.