From dff1846ad8b6ff5bb9e5fd8ff71f79df5bf79e4d Mon Sep 17 00:00:00 2001 From: Vasco Santos Date: Tue, 6 Feb 2024 15:53:44 +0100 Subject: [PATCH] feat: add support to prepend pieces while buffering to aggregate (#1301) We may need to, at the implementation level, inject pieces at the start of the Aggregate to overcome requirements of Boost when using a PoDSI-formatted aggregate, where the first bytes can be problematic for the SP to grab at the start of the aggregate. This allows the implementer to easily pass an array within the lambda context to run, and easily drop it when needed without it being hardcoded in the actual lib :) --- packages/filecoin-api/src/aggregator/api.ts | 1 + .../src/aggregator/buffer-reducing.js | 27 ++++--- .../filecoin-api/src/aggregator/events.js | 1 + .../filecoin-api/test/events/aggregator.js | 71 +++++++++++++++++++ 4 files changed, 92 insertions(+), 8 deletions(-) diff --git a/packages/filecoin-api/src/aggregator/api.ts b/packages/filecoin-api/src/aggregator/api.ts index aab00ffb8..8e2f7d37f 100644 --- a/packages/filecoin-api/src/aggregator/api.ts +++ b/packages/filecoin-api/src/aggregator/api.ts @@ -328,6 +328,7 @@ export interface AggregateConfig { maxAggregateSize: number minAggregateSize: number minUtilizationFactor: number + prependBufferedPieces?: BufferedPiece[] } // Enums diff --git a/packages/filecoin-api/src/aggregator/buffer-reducing.js b/packages/filecoin-api/src/aggregator/buffer-reducing.js index c1262d0ab..0bf74d9f9 100644 --- a/packages/filecoin-api/src/aggregator/buffer-reducing.js +++ b/packages/filecoin-api/src/aggregator/buffer-reducing.js @@ -152,12 +152,13 @@ export async function handleBufferReducingWithoutAggregate({ * Attempt to build an aggregate with buffered pieces within ranges. 
* * @param {BufferedPiece[]} bufferedPieces - * @param {object} sizes - * @param {number} sizes.maxAggregateSize - * @param {number} sizes.minAggregateSize - * @param {number} sizes.minUtilizationFactor + * @param {object} config + * @param {number} config.maxAggregateSize + * @param {number} config.minAggregateSize + * @param {number} config.minUtilizationFactor + * @param {BufferedPiece[]} [config.prependBufferedPieces] */ -export function aggregatePieces(bufferedPieces, sizes) { +export function aggregatePieces(bufferedPieces, config) { // Guarantee buffered pieces total size is bigger than the minimum utilization const bufferUtilizationSize = bufferedPieces.reduce((total, p) => { const piece = Piece.fromLink(p.piece) @@ -166,14 +167,14 @@ export function aggregatePieces(bufferedPieces, sizes) { }, 0n) if ( bufferUtilizationSize < - sizes.maxAggregateSize / sizes.minUtilizationFactor + config.maxAggregateSize / config.minUtilizationFactor ) { return } // Create builder with maximum size and try to fill it up const builder = Aggregate.createBuilder({ - size: Aggregate.Size.from(sizes.maxAggregateSize), + size: Aggregate.Size.from(config.maxAggregateSize), }) // add pieces to an aggregate until there is no more space, or no more pieces @@ -182,6 +183,16 @@ export function aggregatePieces(bufferedPieces, sizes) { /** @type {BufferedPiece[]} */ const remainingBufferedPieces = [] + // start by adding prepend buffered pieces if available + for (const bufferedPiece of (config.prependBufferedPieces || [])) { + const p = Piece.fromLink(bufferedPiece.piece) + if (builder.estimate(p).error) { + throw new Error('aggregate builder is not able to create aggregates with only prepend buffered pieces') + } + builder.write(p) + addedBufferedPieces.push(bufferedPiece) + } + for (const bufferedPiece of bufferedPieces) { const p = Piece.fromLink(bufferedPiece.piece) if (builder.estimate(p).error) { @@ -196,7 +207,7 @@ export function aggregatePieces(bufferedPieces, sizes) { 
BigInt(builder.limit) * BigInt(Index.EntrySize) // If not enough space return undefined - if (totalUsedSpace < BigInt(sizes.minAggregateSize)) { + if (totalUsedSpace < BigInt(config.minAggregateSize)) { return } diff --git a/packages/filecoin-api/src/aggregator/events.js b/packages/filecoin-api/src/aggregator/events.js index 71a59f2ee..66b00c911 100644 --- a/packages/filecoin-api/src/aggregator/events.js +++ b/packages/filecoin-api/src/aggregator/events.js @@ -114,6 +114,7 @@ export const handleBufferQueueMessage = async (context, records) => { maxAggregateSize: context.config.maxAggregateSize, minAggregateSize: context.config.minAggregateSize, minUtilizationFactor: context.config.minUtilizationFactor, + prependBufferedPieces: context.config.prependBufferedPieces }) // Store buffered pieces if not enough to do aggregate and re-queue them diff --git a/packages/filecoin-api/test/events/aggregator.js b/packages/filecoin-api/test/events/aggregator.js index d2de21104..d83671367 100644 --- a/packages/filecoin-api/test/events/aggregator.js +++ b/packages/filecoin-api/test/events/aggregator.js @@ -397,6 +397,77 @@ export const test = { message.minPieceInsertedAt ) }, + 'handles buffer queue messages successfully to queue aggregate prepended with a buffer piece': async ( + assert, + context + ) => { + const group = context.id.did() + const { buffers, blocks } = await getBuffers(2, group, { + length: 100, + size: 128, + }) + + const [cargo] = await randomCargo(1, 128) + /** @type {import('../../src/aggregator/api.js').BufferedPiece} */ + const bufferedPiece = { + piece: cargo.link.link(), + policy: 0, + insertedAt: (new Date()).toISOString() + } + + const totalPieces = buffers.reduce((acc, v) => { + acc += v.pieces.length + return acc + }, 0) + + // Store buffers + for (let i = 0; i < blocks.length; i++) { + const putBufferRes = await context.bufferStore.put({ + buffer: buffers[i], + block: blocks[i].cid, + }) + assert.ok(putBufferRes.ok) + } + + // Handle messages + const 
handledMessageRes = await AggregatorEvents.handleBufferQueueMessage( + { + ...context, + config: { + minAggregateSize: 2 ** 19, + minUtilizationFactor: 10e5, + maxAggregateSize: 2 ** 35, + prependBufferedPieces: [bufferedPiece] + }, + }, + blocks.map((b) => ({ + pieces: b.cid, + group, + })) + ) + assert.ok(handledMessageRes.ok) + assert.equal(handledMessageRes.ok?.aggregatedPieces, totalPieces + 1) + + // Validate queue and store + await pWaitFor( + () => + context.queuedMessages.get('aggregateOfferQueue')?.length === 1 + ) + + /** @type {AggregateOfferMessage} */ + // @ts-expect-error cannot infer buffer message + const message = context.queuedMessages.get('aggregateOfferQueue')?.[0] + const bufferGet = await context.bufferStore.get(message.buffer) + assert.ok(bufferGet.ok) + assert.ok(bufferGet.ok?.block.equals(message.buffer)) + assert.equal(bufferGet.ok?.buffer.group, group) + assert.ok(message.aggregate.equals(bufferGet.ok?.buffer.aggregate)) + assert.equal(bufferGet.ok?.buffer.pieces.length, totalPieces + 1) + + // prepended piece + assert.ok(bufferGet.ok?.buffer.pieces.find(p => p.piece.link().equals(bufferedPiece.piece.link()))) + assert.ok(bufferGet.ok?.buffer.pieces[0].piece.link().equals(bufferedPiece.piece.link())) + }, 'handles buffer queue messages successfully to queue aggregate and remaining buffer': async (assert, context) => { const group = context.id.did()