Skip to content

Commit

Permalink
fix: aggregate offer invocation cid wrong because not pieces cid bloc…
Browse files Browse the repository at this point in the history
…k being used but buffer block
  • Loading branch information
vasco-santos committed Nov 3, 2023
1 parent dfb46d8 commit b88ed5d
Show file tree
Hide file tree
Showing 5 changed files with 38 additions and 13 deletions.
8 changes: 8 additions & 0 deletions packages/filecoin-api/src/aggregator/api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,10 @@ export interface AggregateRecord {
* `bagy...aggregate` Piece CID of an aggregate
*/
aggregate: PieceLink
/**
* `bafy...cbor` as CID of dag-cbor block with buffer of pieces in an aggregate.
*/
buffer: Link
/**
* `bafy...cbor` as CID of dag-cbor block with list of pieces in an aggregate.
*/
Expand Down Expand Up @@ -298,6 +302,10 @@ export interface AggregateOfferMessage {
* List of pieces in an aggregate.
*/
pieces: Link
/**
* `bafy...cbor` as CID of dag-cbor block with buffer of pieces in an aggregate.
*/
buffer: Link
/**
* Grouping information for submitted piece.
*/
Expand Down
4 changes: 3 additions & 1 deletion packages/filecoin-api/src/aggregator/buffer-reducing.js
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ export async function handleBufferReducingWithAggregate({
pieces: aggregateInfo.addedBufferedPieces,
group,
}
const piecesBlock = await CBOR.write(aggregateInfo.addedBufferedPieces.map(bf => bf.piece))
const aggregateBlock = await CBOR.write(aggregateReducedBuffer)

// Store buffered pieces for aggregate
Expand All @@ -59,7 +60,8 @@ export async function handleBufferReducingWithAggregate({
// Propagate message for aggregate offer queue
const aggregateOfferQueueAdd = await aggregateOfferQueue.add({
aggregate: aggregateInfo.aggregate.link,
pieces: aggregateBlock.cid,
buffer: aggregateBlock.cid,
pieces: piecesBlock.cid,
group,
})
if (aggregateOfferQueueAdd.error) {
Expand Down
7 changes: 4 additions & 3 deletions packages/filecoin-api/src/aggregator/events.js
Original file line number Diff line number Diff line change
Expand Up @@ -168,12 +168,13 @@ export const handleBufferQueueMessage = async (context, records) => {
* @param {import('./api.js').AggregateOfferMessage} message
*/
export const handleAggregateOfferMessage = async (context, message) => {
const { pieces, aggregate, group } = message
const { pieces, aggregate, buffer, group } = message

// Store aggregate information into the store. Store events MAY be used to propagate aggregate over
const putRes = await context.aggregateStore.put({
pieces,
aggregate,
buffer,
group,
insertedAt: new Date().toISOString(),
})
Expand All @@ -196,7 +197,7 @@ export const handleAggregateInsertToPieceAcceptQueue = async (
context,
record
) => {
const bufferStoreRes = await context.bufferStore.get(record.pieces)
const bufferStoreRes = await context.bufferStore.get(record.buffer)
if (bufferStoreRes.error) {
return bufferStoreRes
}
Expand Down Expand Up @@ -331,7 +332,7 @@ export const handleAggregateInsertToAggregateOffer = async (
context,
record
) => {
const bufferStoreRes = await context.bufferStore.get(record.pieces)
const bufferStoreRes = await context.bufferStore.get(record.buffer)
if (bufferStoreRes.error) {
return {
error: bufferStoreRes.error,
Expand Down
1 change: 1 addition & 0 deletions packages/filecoin-api/src/aggregator/service.js
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ export const pieceAccept = async ({ capability }, context) => {
}
}

// Get buffered pieces
const [{ aggregate, inclusion }] = getInclusionRes.ok
const getAggregateRes = await context.aggregateStore.get({ aggregate })
if (getAggregateRes.error) {
Expand Down
31 changes: 22 additions & 9 deletions packages/filecoin-api/test/events/aggregator.js
Original file line number Diff line number Diff line change
Expand Up @@ -353,9 +353,9 @@ export const test = {
// @ts-expect-error cannot infer buffer message
const message = context.queuedMessages.get('aggregateOfferQueue')?.[0]

const bufferGet = await context.bufferStore.get(message.pieces)
const bufferGet = await context.bufferStore.get(message.buffer)
assert.ok(bufferGet.ok)
assert.ok(bufferGet.ok?.block.equals(message.pieces))
assert.ok(bufferGet.ok?.block.equals(message.buffer))
assert.equal(bufferGet.ok?.buffer.group, group)
assert.ok(message.aggregate.equals(bufferGet.ok?.buffer.aggregate))
assert.equal(bufferGet.ok?.buffer.pieces.length, totalPieces)
Expand Down Expand Up @@ -414,7 +414,7 @@ export const test = {
const bufferMessage = context.queuedMessages.get('bufferQueue')?.[0]

const aggregateBufferGet = await context.bufferStore.get(
aggregateOfferMessage.pieces
aggregateOfferMessage.buffer
)
assert.ok(aggregateBufferGet.ok)
const remainingBufferGet = await context.bufferStore.get(
Expand Down Expand Up @@ -593,11 +593,13 @@ export const test = {
group,
}
const block = await CBOR.write(buffer)
const piecesBlock = await CBOR.write(pieces.map((p) => p.link))

/** @type {AggregateOfferMessage} */
const message = {
aggregate: aggregate.link,
pieces: block.cid,
buffer: block.cid,
pieces: piecesBlock.cid,
group,
}

Expand Down Expand Up @@ -631,6 +633,7 @@ export const test = {
group,
}
const block = await CBOR.write(buffer)
const piecesBlock = await CBOR.write(pieces.map((p) => p.link))

// Put buffer record
const putBufferRes = await context.bufferStore.put({
Expand All @@ -641,7 +644,8 @@ export const test = {

// Put aggregate record
const aggregateRecord = {
pieces: block.cid,
buffer: block.cid,
pieces: piecesBlock.cid,
aggregate: aggregate.link,
group,
insertedAt: new Date().toISOString(),
Expand Down Expand Up @@ -697,11 +701,13 @@ export const test = {
group,
}
const block = await CBOR.write(buffer)
const piecesBlock = await CBOR.write(pieces.map((p) => p.link))

// Put aggregate record
const aggregateRecord = {
pieces: block.cid,
buffer: block.cid,
aggregate: aggregate.link,
pieces: piecesBlock.cid,
group,
insertedAt: new Date().toISOString(),
}
Expand Down Expand Up @@ -745,6 +751,8 @@ export const test = {
group,
}
const block = await CBOR.write(buffer)
const piecesBlock = await CBOR.write(pieces.map((p) => p.link))

// Put buffer record
const putBufferRes = await context.bufferStore.put({
buffer,
Expand All @@ -754,8 +762,9 @@ export const test = {

// Put aggregate record
const aggregateRecord = {
pieces: block.cid,
buffer: block.cid,
aggregate: aggregate.link,
pieces: piecesBlock.cid,
group,
insertedAt: new Date().toISOString(),
}
Expand Down Expand Up @@ -1114,8 +1123,9 @@ export const test = {

// Put aggregate record
const aggregateRecord = {
pieces: blockBuffer.cid,
buffer: blockBuffer.cid,
aggregate: aggregate.link,
pieces: blockPieces.cid,
group,
insertedAt: new Date().toISOString(),
}
Expand All @@ -1128,6 +1138,7 @@ export const test = {
context,
aggregateRecord
)
console.log('xxx', handledAggregateInsertsRes.error)
assert.ok(handledAggregateInsertsRes.ok)

// Verify invocation
Expand Down Expand Up @@ -1163,11 +1174,13 @@ export const test = {
group,
}
const blockBuffer = await CBOR.write(buffer)
const piecesBlock = await CBOR.write(pieces.map((p) => p.link))

// Put aggregate record
const aggregateRecord = {
pieces: blockBuffer.cid,
buffer: blockBuffer.cid,
aggregate: aggregate.link,
pieces: piecesBlock.cid,
group,
insertedAt: new Date().toISOString(),
}
Expand Down

0 comments on commit b88ed5d

Please sign in to comment.