From b71cdba9f91c7d30d30b7d098921252b1ca498fa Mon Sep 17 00:00:00 2001 From: Vasco Santos Date: Mon, 27 Nov 2023 14:18:21 +0100 Subject: [PATCH 1/2] feat: aggregator keeping oldest piece ts --- packages/filecoin-api/src/aggregator/api.ts | 8 +++++++ .../src/aggregator/buffer-reducing.js | 10 ++++++++ .../filecoin-api/src/aggregator/events.js | 3 ++- .../filecoin-api/test/events/aggregator.js | 24 +++++++++++++++++-- .../filecoin-api/test/services/aggregator.js | 1 + 5 files changed, 43 insertions(+), 3 deletions(-) diff --git a/packages/filecoin-api/src/aggregator/api.ts b/packages/filecoin-api/src/aggregator/api.ts index 7a6b1ae01..1bb1b7b2d 100644 --- a/packages/filecoin-api/src/aggregator/api.ts +++ b/packages/filecoin-api/src/aggregator/api.ts @@ -187,6 +187,10 @@ export interface AggregateRecord { * Insertion date ISO string. */ insertedAt: string + /** + * ISO string date of oldest piece in the pipeline included into the aggregate. + */ + oldestPieceInsertedAt: string } // TODO: probably group should also be key! @@ -314,6 +318,10 @@ export interface AggregateOfferMessage { * Grouping information for submitted piece. */ group: string + /** + * ISO string date of oldest piece in the pipeline included into the aggregate. + */ + oldestPieceInsertedAt: string } export interface AggregateConfig { diff --git a/packages/filecoin-api/src/aggregator/buffer-reducing.js b/packages/filecoin-api/src/aggregator/buffer-reducing.js index e7de30d45..124f90885 100644 --- a/packages/filecoin-api/src/aggregator/buffer-reducing.js +++ b/packages/filecoin-api/src/aggregator/buffer-reducing.js @@ -50,6 +50,15 @@ export async function handleBufferReducingWithAggregate({ ) const aggregateBlock = await CBOR.write(aggregateReducedBuffer) + // Get timestamp of oldest piece in the pipeline included in the aggregate + const oldestPieceInsertedAtDate = new Date( + Math.min( + ...aggregateInfo.addedBufferedPieces.map((bf) => + new Date(bf.insertedAt).getTime() + ) + ) + ) + // Store buffered pieces for aggregate const bufferStoreAggregatePut = await bufferStore.put({ buffer: aggregateReducedBuffer, @@ -65,6 +74,7 @@ export async function handleBufferReducingWithAggregate({ buffer: aggregateBlock.cid, pieces: piecesBlock.cid, group, + oldestPieceInsertedAt: oldestPieceInsertedAtDate.toISOString(), }) if (aggregateOfferQueueAdd.error) { return aggregateOfferQueueAdd diff --git a/packages/filecoin-api/src/aggregator/events.js b/packages/filecoin-api/src/aggregator/events.js index d8b27c96d..bc8895ea6 100644 --- a/packages/filecoin-api/src/aggregator/events.js +++ b/packages/filecoin-api/src/aggregator/events.js @@ -168,7 +168,7 @@ export const handleBufferQueueMessage = async (context, records) => { * @param {import('./api.js').AggregateOfferMessage} message */ export const handleAggregateOfferMessage = async (context, message) => { - const { pieces, aggregate, buffer, group } = message + const { pieces, aggregate, buffer, group, oldestPieceInsertedAt } = message // Store aggregate information into the store. Store events MAY be used to propagate aggregate over const putRes = await context.aggregateStore.put({ @@ -176,6 +176,7 @@ export const handleAggregateOfferMessage = async (context, message) => { aggregate, buffer, group, + oldestPieceInsertedAt, insertedAt: new Date().toISOString(), }) diff --git a/packages/filecoin-api/test/events/aggregator.js b/packages/filecoin-api/test/events/aggregator.js index 04ca2ec0d..d490c497c 100644 --- a/packages/filecoin-api/test/events/aggregator.js +++ b/packages/filecoin-api/test/events/aggregator.js @@ -352,13 +352,26 @@ export const test = { /** @type {AggregateOfferMessage} */ // @ts-expect-error cannot infer buffer message const message = context.queuedMessages.get('aggregateOfferQueue')?.[0] - const bufferGet = await context.bufferStore.get(message.buffer) assert.ok(bufferGet.ok) assert.ok(bufferGet.ok?.block.equals(message.buffer)) assert.equal(bufferGet.ok?.buffer.group, group) assert.ok(message.aggregate.equals(bufferGet.ok?.buffer.aggregate)) assert.equal(bufferGet.ok?.buffer.pieces.length, totalPieces) + // Validate oldest piece date + assert.ok(message.oldestPieceInsertedAt) + + const oldestPieceInsertedAtDate = new Date( + Math.min( + ...(bufferGet.ok?.buffer.pieces?.map((bf) => + new Date(bf.insertedAt).getTime() + ) || []) + ) + ) + assert.equal( + oldestPieceInsertedAtDate.toISOString(), + message.oldestPieceInsertedAt + ) }, 'handles buffer queue messages successfully to queue aggregate and remaining buffer': async (assert, context) => { @@ -562,6 +575,7 @@ export const test = { pieces: piecesBlock.cid, buffer: block.cid, group, + oldestPieceInsertedAt: new Date().toISOString(), } // Handle message @@ -604,6 +618,7 @@ export const test = { buffer: block.cid, pieces: piecesBlock.cid, group, + oldestPieceInsertedAt: new Date().toISOString(), } // Handle message @@ -652,6 +667,7 @@ export const test = { aggregate: aggregate.link, group, insertedAt: new Date().toISOString(), + oldestPieceInsertedAt: new Date().toISOString(), } const putAggregateRes = await context.aggregateStore.put(aggregateRecord) assert.ok(putAggregateRes.ok) @@ -719,9 +735,10 @@ export const test = { pieces: piecesBlock.cid, group, insertedAt: new Date().toISOString(), + oldestPieceInsertedAt: new Date().toISOString(), } const putAggregateRes = await context.aggregateStore.put( - aggregateRecord + aggregateRecord, ) assert.ok(putAggregateRes.ok) @@ -776,6 +793,7 @@ export const test = { pieces: piecesBlock.cid, group, insertedAt: new Date().toISOString(), + oldestPieceInsertedAt: new Date().toISOString(), } const putAggregateRes = await context.aggregateStore.put( aggregateRecord @@ -1149,6 +1167,7 @@ export const test = { pieces: blockPieces.cid, group, insertedAt: new Date().toISOString(), + oldestPieceInsertedAt: new Date().toISOString(), } const putAggregateRes = await context.aggregateStore.put(aggregateRecord) assert.ok(putAggregateRes.ok) @@ -1203,6 +1222,7 @@ export const test = { pieces: piecesBlock.cid, group, insertedAt: new Date().toISOString(), + oldestPieceInsertedAt: new Date().toISOString(), } const putAggregateRes = await context.aggregateStore.put( aggregateRecord diff --git a/packages/filecoin-api/test/services/aggregator.js b/packages/filecoin-api/test/services/aggregator.js index 6160e7e26..7d45eabae 100644 --- a/packages/filecoin-api/test/services/aggregator.js +++ b/packages/filecoin-api/test/services/aggregator.js @@ -255,6 +255,7 @@ export const test = { buffer: block.cid, group, insertedAt: new Date().toISOString(), + oldestPieceInsertedAt: new Date().toISOString(), }) assert.ok(aggregatePutRes.ok) From 6c1a2fce32a24fa30dc477e3a6951a850a9f6f5a Mon Sep 17 00:00:00 2001 From: Vasco Santos Date: Tue, 28 Nov 2023 13:58:45 +0100 Subject: [PATCH 2/2] fix: address review comments --- packages/filecoin-api/src/aggregator/api.ts | 4 ++-- .../src/aggregator/buffer-reducing.js | 4 ++-- .../filecoin-api/src/aggregator/events.js | 4 ++-- .../filecoin-api/test/events/aggregator.js | 24 +++++++++---------- .../filecoin-api/test/services/aggregator.js | 2 +- 5 files changed, 19 insertions(+), 19 deletions(-) diff --git a/packages/filecoin-api/src/aggregator/api.ts b/packages/filecoin-api/src/aggregator/api.ts index 1bb1b7b2d..aab00ffb8 100644 --- a/packages/filecoin-api/src/aggregator/api.ts +++ b/packages/filecoin-api/src/aggregator/api.ts @@ -190,7 +190,7 @@ export interface AggregateRecord { /** * ISO string date of oldest piece in the pipeline included into the aggregate. */ - oldestPieceInsertedAt: string + minPieceInsertedAt: string } // TODO: probably group should also be key! @@ -321,7 +321,7 @@ export interface AggregateOfferMessage { /** * ISO string date of oldest piece in the pipeline included into the aggregate. */ - oldestPieceInsertedAt: string + minPieceInsertedAt: string } export interface AggregateConfig { diff --git a/packages/filecoin-api/src/aggregator/buffer-reducing.js b/packages/filecoin-api/src/aggregator/buffer-reducing.js index 124f90885..fd134c541 100644 --- a/packages/filecoin-api/src/aggregator/buffer-reducing.js +++ b/packages/filecoin-api/src/aggregator/buffer-reducing.js @@ -51,7 +51,7 @@ export async function handleBufferReducingWithAggregate({ const aggregateBlock = await CBOR.write(aggregateReducedBuffer) // Get timestamp of oldest piece in the pipeline included in the aggregate - const oldestPieceInsertedAtDate = new Date( + const minPieceInsertedAtDate = new Date( Math.min( ...aggregateInfo.addedBufferedPieces.map((bf) => new Date(bf.insertedAt).getTime() @@ -74,7 +74,7 @@ export async function handleBufferReducingWithAggregate({ buffer: aggregateBlock.cid, pieces: piecesBlock.cid, group, - oldestPieceInsertedAt: oldestPieceInsertedAtDate.toISOString(), + minPieceInsertedAt: minPieceInsertedAtDate.toISOString(), }) if (aggregateOfferQueueAdd.error) { return aggregateOfferQueueAdd diff --git a/packages/filecoin-api/src/aggregator/events.js b/packages/filecoin-api/src/aggregator/events.js index bc8895ea6..71a59f2ee 100644 --- a/packages/filecoin-api/src/aggregator/events.js +++ b/packages/filecoin-api/src/aggregator/events.js @@ -168,7 +168,7 @@ export const handleBufferQueueMessage = async (context, records) => { * @param {import('./api.js').AggregateOfferMessage} message */ export const handleAggregateOfferMessage = async (context, message) => { - const { pieces, aggregate, buffer, group, oldestPieceInsertedAt } = message + const { pieces, aggregate, buffer, group, minPieceInsertedAt } = message // Store aggregate information into the store. Store events MAY be used to propagate aggregate over const putRes = await context.aggregateStore.put({ @@ -176,7 +176,7 @@ export const handleAggregateOfferMessage = async (context, message) => { aggregate, buffer, group, - oldestPieceInsertedAt, + minPieceInsertedAt, insertedAt: new Date().toISOString(), }) diff --git a/packages/filecoin-api/test/events/aggregator.js b/packages/filecoin-api/test/events/aggregator.js index d490c497c..4e092cb03 100644 --- a/packages/filecoin-api/test/events/aggregator.js +++ b/packages/filecoin-api/test/events/aggregator.js @@ -358,10 +358,10 @@ export const test = { assert.equal(bufferGet.ok?.buffer.group, group) assert.ok(message.aggregate.equals(bufferGet.ok?.buffer.aggregate)) assert.equal(bufferGet.ok?.buffer.pieces.length, totalPieces) - // Validate oldest piece date - assert.ok(message.oldestPieceInsertedAt) + // Validate min piece date + assert.ok(message.minPieceInsertedAt) - const oldestPieceInsertedAtDate = new Date( + const minPieceInsertedAtDate = new Date( Math.min( ...(bufferGet.ok?.buffer.pieces?.map((bf) => new Date(bf.insertedAt).getTime() @@ -369,8 +369,8 @@ export const test = { ) ) assert.equal( - oldestPieceInsertedAtDate.toISOString(), - message.oldestPieceInsertedAt + minPieceInsertedAtDate.toISOString(), + message.minPieceInsertedAt ) }, 'handles buffer queue messages successfully to queue aggregate and remaining buffer': @@ -575,7 +575,7 @@ export const test = { pieces: piecesBlock.cid, buffer: block.cid, group, - oldestPieceInsertedAt: new Date().toISOString(), + minPieceInsertedAt: new Date().toISOString(), } // Handle message @@ -618,7 +618,7 @@ export const test = { buffer: block.cid, pieces: piecesBlock.cid, group, - oldestPieceInsertedAt: new Date().toISOString(), + minPieceInsertedAt: new Date().toISOString(), } // Handle message @@ -667,7 +667,7 @@ export const test = { aggregate: aggregate.link, group, insertedAt: new Date().toISOString(), - oldestPieceInsertedAt: new Date().toISOString(), + minPieceInsertedAt: new Date().toISOString(), } const putAggregateRes = await context.aggregateStore.put(aggregateRecord) assert.ok(putAggregateRes.ok) @@ -735,7 +735,7 @@ export const test = { pieces: piecesBlock.cid, group, insertedAt: new Date().toISOString(), - oldestPieceInsertedAt: new Date().toISOString(), + minPieceInsertedAt: new Date().toISOString(), } const putAggregateRes = await context.aggregateStore.put( aggregateRecord, @@ -793,7 +793,7 @@ export const test = { pieces: piecesBlock.cid, group, insertedAt: new Date().toISOString(), - oldestPieceInsertedAt: new Date().toISOString(), + minPieceInsertedAt: new Date().toISOString(), } const putAggregateRes = await context.aggregateStore.put( aggregateRecord @@ -1167,7 +1167,7 @@ export const test = { pieces: blockPieces.cid, group, insertedAt: new Date().toISOString(), - oldestPieceInsertedAt: new Date().toISOString(), + minPieceInsertedAt: new Date().toISOString(), } const putAggregateRes = await context.aggregateStore.put(aggregateRecord) assert.ok(putAggregateRes.ok) @@ -1222,7 +1222,7 @@ export const test = { pieces: piecesBlock.cid, group, insertedAt: new Date().toISOString(), - oldestPieceInsertedAt: new Date().toISOString(), + minPieceInsertedAt: new Date().toISOString(), } const putAggregateRes = await context.aggregateStore.put( aggregateRecord diff --git a/packages/filecoin-api/test/services/aggregator.js b/packages/filecoin-api/test/services/aggregator.js index 7d45eabae..8ee6de30f 100644 --- a/packages/filecoin-api/test/services/aggregator.js +++ b/packages/filecoin-api/test/services/aggregator.js @@ -255,7 +255,7 @@ export const test = { buffer: block.cid, group, insertedAt: new Date().toISOString(), - oldestPieceInsertedAt: new Date().toISOString(), + minPieceInsertedAt: new Date().toISOString(), }) assert.ok(aggregatePutRes.ok)