feat: add pickupBatch for batch processing (#15)
adds a pickupBatch implementation for handleMessageBatch so we can process more than one message at a time

see: #13

License: (Apache-2.0 AND MIT)
Signed-off-by: Oli Evans <[email protected]>
olizilla authored Aug 3, 2022
1 parent 215ec79 commit 650f163
Showing 4 changed files with 77 additions and 15 deletions.
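
The heart of the change is in pickup/lib/consumer.js: the per-message handler is swapped for sqs-consumer's batch handler. Below is a condensed, hypothetical sketch of that wiring; the real function is createConsumer (shown in full in the diff further down), and createBatchConsumer here is only an illustrative name.

import { Consumer } from 'sqs-consumer'
import { createS3Uploader } from './s3.js'
import { pickupBatch } from './pickup.js'

// Illustrative wrapper only; the actual code lives in createConsumer below.
export function createBatchConsumer ({ ipfsApiUrl, queueUrl, s3 }) {
  return Consumer.create({
    queueUrl,
    batchSize: 2, // receive up to 2 messages per poll instead of 1
    // One call handles the whole batch; pickupBatch returns only the messages
    // it processed successfully, so those are the ones acknowledged
    // (per the comments in pickup/lib/pickup.js).
    handleMessageBatch: async (messages) => pickupBatch(messages, { ipfsApiUrl, createS3Uploader, s3 })
  })
}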
pickup/index.js: 4 additions & 1 deletion
@@ -10,7 +10,10 @@ async function start () {
ipfsApiUrl: IPFS_API_URL,
queueUrl: SQS_QUEUE_URL
})

app.on('message_received', msg => {
const { requestid, cid } = JSON.parse(msg.Body)
console.log(`Processing req: ${requestid} cid: ${cid}`)
})
app.start()
console.log(`Pickup subscribed to ${SQS_QUEUE_URL}`)
}
pickup/lib/consumer.js: 5 additions & 13 deletions
@@ -1,16 +1,16 @@
import retry from 'p-retry'
import { Consumer } from 'sqs-consumer'
import { createS3Uploader } from './s3.js'
-import { testIpfsApi, repoStat } from './ipfs.js'
-import { pickup } from './pickup.js'
+import { testIpfsApi } from './ipfs.js'
+import { pickupBatch } from './pickup.js'

export async function createConsumer ({ ipfsApiUrl, queueUrl, s3 }) {
// throws if can't connect
await retry(() => testIpfsApi(ipfsApiUrl), { maxRetryTime: 1000 * 5 })

const app = Consumer.create({
queueUrl,
-    batchSize: 1, // 1 to 10
+    batchSize: 2, // 1 to 10
visibilityTimeout: 20, // seconds, how long to hide message from queue after reading.
heartbeatInterval: 10, // seconds, must be lower than `visibilityTimeout`. how long before increasing the `visibilityTimeout`
// allow 4hrs before timeout. 2/3rs of the world can upload faster than
@@ -21,16 +21,8 @@ export async function createConsumer ({ ipfsApiUrl, queueUrl, s3 }) {
// TODO: enforce 32GiB limit
// TODO: monitor throughput and bail early if stalled.
handleMessageTimeout: 4 * 60 * 60 * 1000, // ms, error if processing takes longer than this.
-    handleMessage: async (message) => {
-      const { cid, origins, bucket, key, requestid } = JSON.parse(message.Body)
-      console.log(`Fetching req: ${requestid} cid: ${cid}`)
-      await pickup({
-        upload: createS3Uploader({ bucket, key, client: s3 }),
-        ipfsApiUrl,
-        origins,
-        cid
-      })
-      console.log(await repoStat(ipfsApiUrl))
+    handleMessageBatch: async (messages) => {
+      return pickupBatch(messages, { ipfsApiUrl, createS3Uploader, s3 })
}
})

pickup/lib/pickup.js: 39 additions & 0 deletions
@@ -12,3 +12,42 @@ export async function pickup ({ upload, ipfsApiUrl, cid, origins }) {
}
return { cid, origins }
}

/**
* Fetch CARs for a batch of SQS messages.
* @param {import('sqs-consumer').SQSMessage[]} messages
* @param {Object} opts
* @param {string} opts.ipfsApiUrl
* @param {Function} opts.createS3Uploader
 * @param {import('@aws-sdk/client-s3').S3Client} opts.s3
 * @returns {Promise<import('sqs-consumer').SQSMessage[]>}
*/
export async function pickupBatch (messages, { ipfsApiUrl, createS3Uploader, s3 }) {
const jobs = []
const allOrigins = []
for (const message of messages) {
const { cid, origins, bucket, key, requestid } = JSON.parse(message.Body)
jobs.push({ message, requestid, cid, upload: createS3Uploader({ bucket, key, client: s3 }) })
    if (origins) allOrigins.push(...origins) // origins may be absent from the message body
}

// Prepare!
await Promise.all([
waitForGC(ipfsApiUrl),
connectTo(allOrigins, ipfsApiUrl)
])

// Do!
const res = await Promise.allSettled(jobs.map(async job => {
const { message, cid, upload } = job
const body = await fetchCar(cid, ipfsApiUrl)
await upload({ body })
    return message // hand back msg so we can ack all that succeeded
}))

// Clear!
await disconnect(allOrigins, ipfsApiUrl)

// return the set of messages that were handled
return res.filter(r => r.status === 'fulfilled').map(r => r.value)
}
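
pickupBatch leans on waitForGC, connectTo, fetchCar and disconnect, which are defined elsewhere in pickup/lib and fall outside this hunk. Purely as a sketch of their likely shape, assuming the go-ipfs HTTP API and an available fetch implementation (waitForGC is left out because its behaviour isn't visible from this diff):

// Hypothetical sketches: the real helpers are not shown in this diff.
// Assumes ipfsApiUrl points at a go-ipfs node, e.g. 'http://127.0.0.1:5001'.

// Ask the IPFS node to swarm-connect to each origin multiaddr before fetching.
export async function connectTo (origins = [], ipfsApiUrl) {
  for (const addr of origins) {
    const url = new URL('/api/v0/swarm/connect', ipfsApiUrl)
    url.searchParams.set('arg', addr)
    await fetch(url, { method: 'POST' })
  }
}

// Drop those connections again once the batch is done.
export async function disconnect (origins = [], ipfsApiUrl) {
  for (const addr of origins) {
    const url = new URL('/api/v0/swarm/disconnect', ipfsApiUrl)
    url.searchParams.set('arg', addr)
    await fetch(url, { method: 'POST' })
  }
}

// Export the DAG rooted at cid as a CAR stream, ready to hand to the uploader.
export async function fetchCar (cid, ipfsApiUrl) {
  const url = new URL('/api/v0/dag/export', ipfsApiUrl)
  url.searchParams.set('arg', cid)
  const res = await fetch(url, { method: 'POST' })
  if (!res.ok) throw new Error(`dag export failed for ${cid}: HTTP ${res.status}`)
  return res.body
}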
pickup/test/pickup.test.js: 29 additions & 1 deletion
@@ -1,7 +1,7 @@
import { GetObjectCommand } from '@aws-sdk/client-s3'
import { unpackStream } from 'ipfs-car/unpack'
import { createS3Uploader } from '../lib/s3.js'
-import { pickup } from '../lib/pickup.js'
+import { pickup, pickupBatch } from '../lib/pickup.js'
import { Buffer } from 'buffer'
import test from 'ava'
import { compose } from './_compose.js'
@@ -81,6 +81,34 @@ test('with bad origins', async t => {
t.pass()
})

test('pickupBatch', async t => {
const { s3, createBucket, ipfsApiUrl } = t.context
const bucket = await createBucket()
const cids = [
'bafkreifzjut3te2nhyekklss27nh3k72ysco7y32koao5eei66wof36n5e',
'bafkreig6ylslysmsgffjzgsrxpmftynqqg3uc6ebrrj4dhiy233wd5oyaq',
'bad'
]
const msgs = cids.map((cid, i) => ({
Body: JSON.stringify({
cid,
bucket,
key: `batch/${cid}.car`,
requestid: `#${i}`
})
}))

const res = await pickupBatch(msgs, { createS3Uploader, s3, ipfsApiUrl })

t.is(res.length, 2)
  const sorted = res.map(msg => JSON.parse(msg.Body)).sort((a, b) => a.cid.localeCompare(b.cid))
for (let i = 0; i < sorted.length; i++) {
t.is(sorted[i].cid, cids[i])
const res = await s3.send(new GetObjectCommand({ Bucket: bucket, Key: `batch/${cids[i]}.car` }))
t.is(res.$metadata.httpStatusCode, 200)
}
})

async function resToFiles (res) {
const files = []
for await (const file of unpackStream(res.Body)) {
