Skip to content

Commit

Permalink
feat: aggregator keeping oldest piece ts
Browse files Browse the repository at this point in the history
  • Loading branch information
vasco-santos committed Nov 27, 2023
1 parent e8bffe2 commit b71cdba
Show file tree
Hide file tree
Showing 5 changed files with 43 additions and 3 deletions.
8 changes: 8 additions & 0 deletions packages/filecoin-api/src/aggregator/api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,10 @@ export interface AggregateRecord {
* Insertion date ISO string.
*/
insertedAt: string
/**
* ISO string date of oldest piece in the pipeline included into the aggregate.
*/
oldestPieceInsertedAt: string
}

// TODO: probably group should also be key!
Expand Down Expand Up @@ -314,6 +318,10 @@ export interface AggregateOfferMessage {
* Grouping information for submitted piece.
*/
group: string
/**
* ISO string date of oldest piece in the pipeline included into the aggregate.
*/
oldestPieceInsertedAt: string
}

export interface AggregateConfig {
Expand Down
10 changes: 10 additions & 0 deletions packages/filecoin-api/src/aggregator/buffer-reducing.js
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,15 @@ export async function handleBufferReducingWithAggregate({
)
const aggregateBlock = await CBOR.write(aggregateReducedBuffer)

// Get timestamp of oldest piece in the pipeline included in the aggregate
const oldestPieceInsertedAtDate = new Date(
Math.min(
...aggregateInfo.addedBufferedPieces.map((bf) =>
new Date(bf.insertedAt).getTime()
)
)
)

// Store buffered pieces for aggregate
const bufferStoreAggregatePut = await bufferStore.put({
buffer: aggregateReducedBuffer,
Expand All @@ -65,6 +74,7 @@ export async function handleBufferReducingWithAggregate({
buffer: aggregateBlock.cid,
pieces: piecesBlock.cid,
group,
oldestPieceInsertedAt: oldestPieceInsertedAtDate.toISOString(),
})
if (aggregateOfferQueueAdd.error) {
return aggregateOfferQueueAdd
Expand Down
3 changes: 2 additions & 1 deletion packages/filecoin-api/src/aggregator/events.js
Original file line number Diff line number Diff line change
Expand Up @@ -168,14 +168,15 @@ export const handleBufferQueueMessage = async (context, records) => {
* @param {import('./api.js').AggregateOfferMessage} message
*/
export const handleAggregateOfferMessage = async (context, message) => {
const { pieces, aggregate, buffer, group } = message
const { pieces, aggregate, buffer, group, oldestPieceInsertedAt } = message

// Store aggregate information into the store. Store events MAY be used to propagate aggregate over
const putRes = await context.aggregateStore.put({
pieces,
aggregate,
buffer,
group,
oldestPieceInsertedAt,
insertedAt: new Date().toISOString(),
})

Expand Down
24 changes: 22 additions & 2 deletions packages/filecoin-api/test/events/aggregator.js
Original file line number Diff line number Diff line change
Expand Up @@ -352,13 +352,26 @@ export const test = {
/** @type {AggregateOfferMessage} */
// @ts-expect-error cannot infer buffer message
const message = context.queuedMessages.get('aggregateOfferQueue')?.[0]

const bufferGet = await context.bufferStore.get(message.buffer)
assert.ok(bufferGet.ok)
assert.ok(bufferGet.ok?.block.equals(message.buffer))
assert.equal(bufferGet.ok?.buffer.group, group)
assert.ok(message.aggregate.equals(bufferGet.ok?.buffer.aggregate))
assert.equal(bufferGet.ok?.buffer.pieces.length, totalPieces)
// Validate oldest piece date
assert.ok(message.oldestPieceInsertedAt)

const oldestPieceInsertedAtDate = new Date(
Math.min(
...(bufferGet.ok?.buffer.pieces?.map((bf) =>
new Date(bf.insertedAt).getTime()
) || [])
)
)
assert.equal(
oldestPieceInsertedAtDate.toISOString(),
message.oldestPieceInsertedAt
)
},
'handles buffer queue messages successfully to queue aggregate and remaining buffer':
async (assert, context) => {
Expand Down Expand Up @@ -562,6 +575,7 @@ export const test = {
pieces: piecesBlock.cid,
buffer: block.cid,
group,
oldestPieceInsertedAt: new Date().toISOString(),
}

// Handle message
Expand Down Expand Up @@ -604,6 +618,7 @@ export const test = {
buffer: block.cid,
pieces: piecesBlock.cid,
group,
oldestPieceInsertedAt: new Date().toISOString(),
}

// Handle message
Expand Down Expand Up @@ -652,6 +667,7 @@ export const test = {
aggregate: aggregate.link,
group,
insertedAt: new Date().toISOString(),
oldestPieceInsertedAt: new Date().toISOString(),
}
const putAggregateRes = await context.aggregateStore.put(aggregateRecord)
assert.ok(putAggregateRes.ok)
Expand Down Expand Up @@ -719,9 +735,10 @@ export const test = {
pieces: piecesBlock.cid,
group,
insertedAt: new Date().toISOString(),
oldestPieceInsertedAt: new Date().toISOString(),
}
const putAggregateRes = await context.aggregateStore.put(
aggregateRecord
aggregateRecord,
)
assert.ok(putAggregateRes.ok)

Expand Down Expand Up @@ -776,6 +793,7 @@ export const test = {
pieces: piecesBlock.cid,
group,
insertedAt: new Date().toISOString(),
oldestPieceInsertedAt: new Date().toISOString(),
}
const putAggregateRes = await context.aggregateStore.put(
aggregateRecord
Expand Down Expand Up @@ -1149,6 +1167,7 @@ export const test = {
pieces: blockPieces.cid,
group,
insertedAt: new Date().toISOString(),
oldestPieceInsertedAt: new Date().toISOString(),
}
const putAggregateRes = await context.aggregateStore.put(aggregateRecord)
assert.ok(putAggregateRes.ok)
Expand Down Expand Up @@ -1203,6 +1222,7 @@ export const test = {
pieces: piecesBlock.cid,
group,
insertedAt: new Date().toISOString(),
oldestPieceInsertedAt: new Date().toISOString(),
}
const putAggregateRes = await context.aggregateStore.put(
aggregateRecord
Expand Down
1 change: 1 addition & 0 deletions packages/filecoin-api/test/services/aggregator.js
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,7 @@ export const test = {
buffer: block.cid,
group,
insertedAt: new Date().toISOString(),
oldestPieceInsertedAt: new Date().toISOString(),
})
assert.ok(aggregatePutRes.ok)

Expand Down

0 comments on commit b71cdba

Please sign in to comment.