From 650f163b42583c4347e9eba1f4d0276527064ee0 Mon Sep 17 00:00:00 2001
From: Oli Evans
Date: Wed, 3 Aug 2022 16:56:16 +0100
Subject: [PATCH] feat: add pickupBatch for batch processing (#15)

adds a pickupBatch impl for handleMessageBatch so we can do more than 1
at a time

see: #13

License: (Apache-2.0 AND MIT)
Signed-off-by: Oli Evans
---
 pickup/index.js            |  5 ++++-
 pickup/lib/consumer.js     | 18 +++++-------------
 pickup/lib/pickup.js       | 39 ++++++++++++++++++++++++++++++++++++++
 pickup/test/pickup.test.js | 30 ++++++++++++++++++++++++++++-
 4 files changed, 77 insertions(+), 15 deletions(-)

diff --git a/pickup/index.js b/pickup/index.js
index 1206a98..850ac6f 100644
--- a/pickup/index.js
+++ b/pickup/index.js
@@ -10,7 +10,10 @@ async function start () {
     ipfsApiUrl: IPFS_API_URL,
     queueUrl: SQS_QUEUE_URL
   })
-
+  app.on('message_received', msg => {
+    const { requestid, cid } = JSON.parse(msg.Body)
+    console.log(`Processing req: ${requestid} cid: ${cid}`)
+  })
   app.start()
   console.log(`Pickup subscribed to ${SQS_QUEUE_URL}`)
 }
diff --git a/pickup/lib/consumer.js b/pickup/lib/consumer.js
index f536162..44f4928 100644
--- a/pickup/lib/consumer.js
+++ b/pickup/lib/consumer.js
@@ -1,8 +1,8 @@
 import retry from 'p-retry'
 import { Consumer } from 'sqs-consumer'
 import { createS3Uploader } from './s3.js'
-import { testIpfsApi, repoStat } from './ipfs.js'
-import { pickup } from './pickup.js'
+import { testIpfsApi } from './ipfs.js'
+import { pickupBatch } from './pickup.js'
 
 export async function createConsumer ({ ipfsApiUrl, queueUrl, s3 }) {
   // throws if can't connect
@@ -10,7 +10,7 @@ export async function createConsumer ({ ipfsApiUrl, queueUrl, s3 }) {
 
   const app = Consumer.create({
     queueUrl,
-    batchSize: 1, // 1 to 10
+    batchSize: 2, // 1 to 10
     visibilityTimeout: 20, // seconds, how long to hide message from queue after reading.
     heartbeatInterval: 10, // seconds, must be lower than `visibilityTimeout`. how long before increasing the `visibilityTimeout`
     // allow 4hrs before timeout. 2/3rds of the world can upload faster than
@@ -21,16 +21,8 @@ export async function createConsumer ({ ipfsApiUrl, queueUrl, s3 }) {
     // TODO: enforce 32GiB limit
     // TODO: monitor throughput and bail early if stalled.
     handleMessageTimeout: 4 * 60 * 60 * 1000, // ms, error if processing takes longer than this.
-    handleMessage: async (message) => {
-      const { cid, origins, bucket, key, requestid } = JSON.parse(message.Body)
-      console.log(`Fetching req: ${requestid} cid: ${cid}`)
-      await pickup({
-        upload: createS3Uploader({ bucket, key, client: s3 }),
-        ipfsApiUrl,
-        origins,
-        cid
-      })
-      console.log(await repoStat(ipfsApiUrl))
+    handleMessageBatch: async (messages) => {
+      return pickupBatch(messages, { ipfsApiUrl, createS3Uploader, s3 })
     }
   })
 
diff --git a/pickup/lib/pickup.js b/pickup/lib/pickup.js
index 3c06800..8daed31 100644
--- a/pickup/lib/pickup.js
+++ b/pickup/lib/pickup.js
@@ -12,3 +12,42 @@ export async function pickup ({ upload, ipfsApiUrl, cid, origins }) {
   }
   return { cid, origins }
 }
+
+/**
+ * Fetch CARs for a batch of SQS messages.
+ * @param {import('sqs-consumer').SQSMessage[]} messages
+ * @param {Object} opts
+ * @param {string} opts.ipfsApiUrl
+ * @param {Function} opts.createS3Uploader
+ * @param {import('@aws-sdk/client-s3').S3Client} opts.s3
+ * @returns {Promise<import('sqs-consumer').SQSMessage[]>}
+ */
+export async function pickupBatch (messages, { ipfsApiUrl, createS3Uploader, s3 }) {
+  const jobs = []
+  const allOrigins = []
+  for (const message of messages) {
+    const { cid, origins, bucket, key, requestid } = JSON.parse(message.Body)
+    jobs.push({ message, requestid, cid, upload: createS3Uploader({ bucket, key, client: s3 }) })
+    allOrigins.push(...(origins || []))
+  }
+
+  // Prepare!
+  await Promise.all([
+    waitForGC(ipfsApiUrl),
+    connectTo(allOrigins, ipfsApiUrl)
+  ])
+
+  // Do!
+  const res = await Promise.allSettled(jobs.map(async job => {
+    const { message, cid, upload } = job
+    const body = await fetchCar(cid, ipfsApiUrl)
+    await upload({ body })
+    return message // hand back msg so we can ack all that succeeded
+  }))
+
+  // Clear!
+  await disconnect(allOrigins, ipfsApiUrl)
+
+  // return the set of messages that were handled
+  return res.filter(r => r.status === 'fulfilled').map(r => r.value)
+}
diff --git a/pickup/test/pickup.test.js b/pickup/test/pickup.test.js
index c3d519d..c5e6cb5 100644
--- a/pickup/test/pickup.test.js
+++ b/pickup/test/pickup.test.js
@@ -1,7 +1,7 @@
 import { GetObjectCommand } from '@aws-sdk/client-s3'
 import { unpackStream } from 'ipfs-car/unpack'
 import { createS3Uploader } from '../lib/s3.js'
-import { pickup } from '../lib/pickup.js'
+import { pickup, pickupBatch } from '../lib/pickup.js'
 import { Buffer } from 'buffer'
 import test from 'ava'
 import { compose } from './_compose.js'
@@ -81,6 +81,34 @@ test('with bad origins', async t => {
   t.pass()
 })
 
+test('pickupBatch', async t => {
+  const { s3, createBucket, ipfsApiUrl } = t.context
+  const bucket = await createBucket()
+  const cids = [
+    'bafkreifzjut3te2nhyekklss27nh3k72ysco7y32koao5eei66wof36n5e',
+    'bafkreig6ylslysmsgffjzgsrxpmftynqqg3uc6ebrrj4dhiy233wd5oyaq',
+    'bad'
+  ]
+  const msgs = cids.map((cid, i) => ({
+    Body: JSON.stringify({
+      cid,
+      bucket,
+      key: `batch/${cid}.car`,
+      requestid: `#${i}`
+    })
+  }))
+
+  const res = await pickupBatch(msgs, { createS3Uploader, s3, ipfsApiUrl })
+
+  t.is(res.length, 2)
+  const sorted = res.map(msg => JSON.parse(msg.Body)).sort((a, b) => a.cid.localeCompare(b.cid))
+  for (let i = 0; i < sorted.length; i++) {
+    t.is(sorted[i].cid, cids[i])
+    const res = await s3.send(new GetObjectCommand({ Bucket: bucket, Key: `batch/${cids[i]}.car` }))
+    t.is(res.$metadata.httpStatusCode, 200)
+  }
+})
+
 async function resToFiles (res) {
   const files = []
   for await (const file of unpackStream(res.Body)) {
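
Note on the ack semantics the patch relies on: per the comment in pickupBatch ("hand back msg so we can ack all that succeeded"), only the messages returned from handleMessageBatch are acked (deleted from the queue); anything that rejects inside Promise.allSettled stays on the queue and is redelivered once its visibilityTimeout lapses. Below is a minimal standalone sketch of that partial-success pattern, not part of the patch. `processOne` is a hypothetical stand-in for the real fetchCar + upload step; only the allSettled filtering mirrors pickupBatch.

// sketch.mjs — run with `node sketch.mjs` (ESM, top-level await)
const messages = [
  { Body: JSON.stringify({ cid: 'bafkreifzjut3te2nhyekklss27nh3k72ysco7y32koao5eei66wof36n5e', requestid: '#0' }) },
  { Body: JSON.stringify({ cid: 'bad', requestid: '#1' }) }
]

// hypothetical stand-in for fetchCar(cid, ipfsApiUrl) + upload({ body })
async function processOne ({ cid }) {
  if (!cid.startsWith('baf')) throw new Error(`unparseable cid: ${cid}`)
}

const res = await Promise.allSettled(messages.map(async message => {
  await processOne(JSON.parse(message.Body))
  return message // fulfilled results carry the message so it can be acked
}))

// Only fulfilled messages are handed back to the consumer for deletion;
// rejected ones reappear on the queue after visibilityTimeout.
const handled = res.filter(r => r.status === 'fulfilled').map(r => r.value)
console.log(`acking ${handled.length} of ${messages.length}`) // acking 1 of 2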