Skip to content

Commit

Permalink
PDS: handle S3 upload timeout more gracefully (#2429)
Browse files Browse the repository at this point in the history
* pds: s3 upload timeout config, increase pds default

* pds: handle s3 upload timeout errors

* tidy
  • Loading branch information
devinivy authored Apr 19, 2024
1 parent fbc7e75 commit 1faf634
Show file tree
Hide file tree
Showing 5 changed files with 51 additions and 14 deletions.
38 changes: 29 additions & 9 deletions packages/aws/src/s3.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import { randomStr } from '@atproto/crypto'
import { CID } from 'multiformats/cid'
import stream from 'stream'

export type S3Config = { bucket: string } & Omit<
export type S3Config = { bucket: string; uploadTimeoutMs?: number } & Omit<
aws.S3ClientConfig,
'apiVersion'
>
Expand All @@ -16,13 +16,15 @@ export type S3Config = { bucket: string } & Omit<
export class S3BlobStore implements BlobStore {
private client: aws.S3
private bucket: string
private uploadTimeoutMs: number

constructor(
public did: string,
cfg: S3Config,
) {
const { bucket, ...rest } = cfg
const { bucket, uploadTimeoutMs, ...rest } = cfg
this.bucket = bucket
this.uploadTimeoutMs = uploadTimeoutMs ?? 10000
this.client = new aws.S3({
...rest,
apiVersion: '2006-03-01',
Expand Down Expand Up @@ -53,12 +55,13 @@ export class S3BlobStore implements BlobStore {

async putTemp(bytes: Uint8Array | stream.Readable): Promise<string> {
const key = this.genKey()
// @NOTE abort results in error from aws-sdk "Upload aborted." with name "AbortError"
const abortController = new AbortController()
const timeout = setTimeout(
() => abortController.abort('upload timed out'),
10000,
() => abortController.abort(),
this.uploadTimeoutMs,
)
await new Upload({
const upload = new Upload({
client: this.client,
params: {
Bucket: this.bucket,
Expand All @@ -67,8 +70,12 @@ export class S3BlobStore implements BlobStore {
},
// @ts-ignore native implementation fine in node >=15
abortController,
}).done()
clearTimeout(timeout)
})
try {
await upload.done()
} finally {
clearTimeout(timeout)
}
return key
}

Expand All @@ -89,14 +96,27 @@ export class S3BlobStore implements BlobStore {
cid: CID,
bytes: Uint8Array | stream.Readable,
): Promise<void> {
await new Upload({
// @NOTE abort results in error from aws-sdk "Upload aborted." with name "AbortError"
const abortController = new AbortController()
const timeout = setTimeout(
() => abortController.abort(),
this.uploadTimeoutMs,
)
const upload = new Upload({
client: this.client,
params: {
Bucket: this.bucket,
Body: bytes,
Key: this.getStoredPath(cid),
},
}).done()
// @ts-ignore native implementation fine in node >=15
abortController,
})
try {
await upload.done()
} finally {
clearTimeout(timeout)
}
}

async quarantine(cid: CID): Promise<void> {
Expand Down
22 changes: 17 additions & 5 deletions packages/pds/src/api/com/atproto/repo/uploadBlob.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import { DAY } from '@atproto/common'
import { UpstreamTimeoutError } from '@atproto/xrpc-server'
import { Server } from '../../../../lexicon'
import AppContext from '../../../../context'
import { DAY } from '@atproto/common'
import { BlobMetadata } from '../../../../actor-store/blob/transactor'

export default function (server: Server, ctx: AppContext) {
server.com.atproto.repo.uploadBlob({
Expand All @@ -15,10 +17,20 @@ export default function (server: Server, ctx: AppContext) {
const blob = await ctx.actorStore.writeNoTransaction(
requester,
async (store) => {
const metadata = await store.repo.blob.uploadBlobAndGetMetadata(
input.encoding,
input.body,
)
let metadata: BlobMetadata
try {
metadata = await store.repo.blob.uploadBlobAndGetMetadata(
input.encoding,
input.body,
)
} catch (err) {
if (err?.['name'] === 'AbortError') {
throw new UpstreamTimeoutError(
'Upload timed out, please try again.',
)
}
throw err
}

return store.transact(async (actorTxn) => {
const blobRef =
Expand Down
2 changes: 2 additions & 0 deletions packages/pds/src/config/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ export const envToCfg = (env: ServerEnvironment): ServerConfig => {
blobstoreCfg = {
provider: 's3',
bucket: env.blobstoreS3Bucket,
uploadTimeoutMs: env.blobstoreS3UploadTimeoutMs || 20000,
region: env.blobstoreS3Region,
endpoint: env.blobstoreS3Endpoint,
forcePathStyle: env.blobstoreS3ForcePathStyle,
Expand Down Expand Up @@ -305,6 +306,7 @@ export type S3BlobstoreConfig = {
region?: string
endpoint?: string
forcePathStyle?: boolean
uploadTimeoutMs?: number
credentials?: {
accessKeyId: string
secretAccessKey: string
Expand Down
2 changes: 2 additions & 0 deletions packages/pds/src/config/env.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ export const readEnv = (): ServerEnvironment => {
blobstoreS3ForcePathStyle: envBool('PDS_BLOBSTORE_S3_FORCE_PATH_STYLE'),
blobstoreS3AccessKeyId: envStr('PDS_BLOBSTORE_S3_ACCESS_KEY_ID'),
blobstoreS3SecretAccessKey: envStr('PDS_BLOBSTORE_S3_SECRET_ACCESS_KEY'),
blobstoreS3UploadTimeoutMs: envInt('PDS_BLOBSTORE_S3_UPLOAD_TIMEOUT_MS'),
// disk
blobstoreDiskLocation: envStr('PDS_BLOBSTORE_DISK_LOCATION'),
blobstoreDiskTmpLocation: envStr('PDS_BLOBSTORE_DISK_TMP_LOCATION'),
Expand Down Expand Up @@ -143,6 +144,7 @@ export type ServerEnvironment = {
blobstoreS3ForcePathStyle?: boolean
blobstoreS3AccessKeyId?: string
blobstoreS3SecretAccessKey?: string
blobstoreS3UploadTimeoutMs?: number

// identity
didPlcUrl?: string
Expand Down
1 change: 1 addition & 0 deletions packages/pds/src/context.ts
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ export class AppContext {
endpoint: cfg.blobstore.endpoint,
forcePathStyle: cfg.blobstore.forcePathStyle,
credentials: cfg.blobstore.credentials,
uploadTimeoutMs: cfg.blobstore.uploadTimeoutMs,
})
: DiskBlobStore.creator(
cfg.blobstore.location,
Expand Down

0 comments on commit 1faf634

Please sign in to comment.