feat: handle messages concurrently in pickup worker (#119)
- switch to an SQS lib that polls for new messages concurrently rather
than in batches. **This is rad** as now we'll make better use of each
container!
- treat timeouts as a regular failure. Let the message go back on the
queue for another node to try. After 3 attempts it'll go to the dead-letter
queue and be marked as failed (see the redrive-policy sketch below). This
is fine, and it simplifies the pickup worker a lot, as it no longer needs
to talk to DynamoDB or determine the cause of an error.
- rewrite pickup worker so we can compose it out of
single-responsibility pieces instead of having to pass through the giant
config ball. _It's so much simpler now!_ You can figure out what it does
from its parts! `sqsPoller` + `carFetcher` + `s3Uploader`

```js
const pickup = createPickup({
  // poll the queue, handling up to BATCH_SIZE messages concurrently
  sqsPoller: createSqsPoller({
    queueUrl: SQS_QUEUE_URL,
    maxInFlight: BATCH_SIZE
  }),
  // fetch each CAR from the IPFS node, bounded by the fetch timeout
  carFetcher: new CarFetcher({
    ipfsApiUrl: IPFS_API_URL,
    fetchTimeoutMs: TIMEOUT_FETCH
  }),
  // write fetched CARs to the validation bucket
  s3Uploader: new S3Uploader({
    bucket: VALIDATION_BUCKET
  })
})
```
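
The "after 3 attempts" behaviour is plain SQS redrive configuration rather than pickup logic. A hypothetical sketch (`DEAD_LETTER_QUEUE_ARN` is an illustrative name; the real value comes from the infra config):

```js
// hypothetical: a queue's RedrivePolicy attribute is a JSON string
const redrivePolicy = JSON.stringify({
  deadLetterTargetArn: DEAD_LETTER_QUEUE_ARN, // illustrative
  maxReceiveCount: 3 // after 3 failed receives, SQS dead-letters the message
})
```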

see: https://github.com/PruvoNet/squiss-ts/
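
For reference, squiss-ts emits each message as an event as soon as an in-flight slot frees up, rather than waiting to assemble a full batch. A minimal sketch of that model (options trimmed; `handlePinRequest` is a hypothetical handler):

```js
import { Squiss } from 'squiss-ts'

const poller = new Squiss({
  queueUrl: SQS_QUEUE_URL,
  maxInFlight: BATCH_SIZE // cap on messages being processed at once
})

poller.on('message', async (msg) => {
  try {
    await handlePinRequest(msg.body) // hypothetical per-message work
    await msg.del() // remove the message from the queue on success
  } catch {
    // on failure, do nothing: the visibility timeout puts the message
    // back on the queue for another node to try
  }
})

poller.start()
```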

fixes #13 
fixes #116 
fixes #101 

License: MIT

---------

Signed-off-by: Oli Evans <[email protected]>
olizilla authored Mar 14, 2023
1 parent 36f2c3a commit 92935f8
Showing 22 changed files with 1,824 additions and 1,078 deletions.
2 changes: 1 addition & 1 deletion Dockerfile
```diff
@@ -7,4 +7,4 @@ COPY package*.json ./
 COPY pickup/package.json ./pickup/package.json
 RUN npm ci -w pickup --no-audit
 COPY . .
-CMD [ "npm", "start", "-w", "pickup" ]
+CMD [ "npm", "start", "-w", "pickup", "--silent"]
```
35 changes: 35 additions & 0 deletions README.md
@@ -74,6 +74,41 @@ Find the status of a pin

## Environment

Set the following in the pickup worker env to tune its behavior

### `MAX_CAR_BYTES`

Maximum size in bytes of a CAR that pickup will fetch. Caps the amount of data we will pull in a single job.

**default: 31 GiB** _(33,285,996,544 bytes)_

### `FETCH_TIMEOUT_MS`

How long to wait for fetching a CAR before failing the job. Caps the amount of time we spend on a job.

**default: 4 hrs**

_2/3rds of home internet users can upload faster than 20 Mbit/s (fixed broadband), at which rate 32 GiB would transfer in about 3.5 hrs._

see: https://www.speedtest.net/global-index
see: https://www.omnicalculator.com/other/download-time?c=GBP&v=fileSize:32!gigabyte,downloadSpeed:5!megabit
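
To make the bound concrete: the timeout caps one long-running request. A hypothetical sketch, assuming the CAR is exported from a Kubo-style IPFS node (the endpoint and variable names are illustrative):

```js
// illustrative: bound the whole CAR fetch with an overall deadline
const res = await fetch(`${IPFS_API_URL}/api/v0/dag/export?arg=${cid}`, {
  method: 'POST',
  signal: AbortSignal.timeout(FETCH_TIMEOUT_MS) // abort the job after the deadline
})
```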

### `FETCH_CHUNK_TIMEOUT_MS`

How long to wait between chunks of data before failing a CAR. Limits the amount of time we spend waiting on a stalled fetch.

**default: 2 min**
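
A minimal sketch of the idea, assuming a web `ReadableStream` source (an illustration of the technique, not pickup's actual code):

```js
// Race each chunk read against a stall timer; reject if no data
// arrives within chunkTimeoutMs.
async function * withChunkTimeout (source, chunkTimeoutMs) {
  const reader = source.getReader()
  while (true) {
    let timer
    const stall = new Promise((_, reject) => {
      timer = setTimeout(() => reject(new Error(`no data for ${chunkTimeoutMs}ms`)), chunkTimeoutMs)
    })
    try {
      const { done, value } = await Promise.race([reader.read(), stall])
      if (done) return
      yield value
    } finally {
      clearTimeout(timer)
    }
  }
}
```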

### `BATCH_SIZE`

How many pin requests to handle concurrently per worker.

Used to set both the concurrency per worker *and* the max number of messages each worker fetches from the queue in a single batch.

**default: 10**
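
For example (values are illustrative, not recommendations), a worker tuned for smaller CARs might set:

```sh
MAX_CAR_BYTES=10737418240      # 10 GiB
FETCH_TIMEOUT_MS=7200000       # 2 hrs
FETCH_CHUNK_TIMEOUT_MS=120000  # 2 min (the default)
BATCH_SIZE=5                   # 5 concurrent pin requests per worker
```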

## Getting Started

PRs are deployed automatically to `https://<pr#>.pickup.dag.haus`. The `main` branch is deployed to https://staging.pickup.dag.haus and staging builds are promoted to prod manually via the UI at https://console.seed.run/dag-house/pickup