diff --git a/packages/aws/src/s3.ts b/packages/aws/src/s3.ts
index 46d82c0966e..81014d63f7d 100644
--- a/packages/aws/src/s3.ts
+++ b/packages/aws/src/s3.ts
@@ -5,7 +5,7 @@ import { randomStr } from '@atproto/crypto'
 import { CID } from 'multiformats/cid'
 import stream from 'stream'
 
-export type S3Config = { bucket: string } & Omit<
+export type S3Config = { bucket: string; uploadTimeoutMs?: number } & Omit<
   aws.S3ClientConfig,
   'apiVersion'
 >
@@ -16,13 +16,15 @@ export type S3Config = { bucket: string } & Omit<
 export class S3BlobStore implements BlobStore {
   private client: aws.S3
   private bucket: string
+  private uploadTimeoutMs: number
 
   constructor(
     public did: string,
     cfg: S3Config,
   ) {
-    const { bucket, ...rest } = cfg
+    const { bucket, uploadTimeoutMs, ...rest } = cfg
     this.bucket = bucket
+    this.uploadTimeoutMs = uploadTimeoutMs ?? 10000
     this.client = new aws.S3({
       ...rest,
       apiVersion: '2006-03-01',
@@ -53,12 +55,13 @@ export class S3BlobStore implements BlobStore {
 
   async putTemp(bytes: Uint8Array | stream.Readable): Promise<string> {
     const key = this.genKey()
+    // @NOTE abort results in error from aws-sdk "Upload aborted." with name "AbortError"
     const abortController = new AbortController()
     const timeout = setTimeout(
-      () => abortController.abort('upload timed out'),
-      10000,
+      () => abortController.abort(),
+      this.uploadTimeoutMs,
     )
-    await new Upload({
+    const upload = new Upload({
       client: this.client,
       params: {
         Bucket: this.bucket,
@@ -67,8 +70,12 @@ export class S3BlobStore implements BlobStore {
       },
       // @ts-ignore native implementation fine in node >=15
       abortController,
-    }).done()
-    clearTimeout(timeout)
+    })
+    try {
+      await upload.done()
+    } finally {
+      clearTimeout(timeout)
+    }
     return key
   }
 
@@ -89,14 +96,27 @@ export class S3BlobStore implements BlobStore {
     cid: CID,
     bytes: Uint8Array | stream.Readable,
   ): Promise<void> {
-    await new Upload({
+    // @NOTE abort results in error from aws-sdk "Upload aborted." with name "AbortError"
+    const abortController = new AbortController()
+    const timeout = setTimeout(
+      () => abortController.abort(),
+      this.uploadTimeoutMs,
+    )
+    const upload = new Upload({
       client: this.client,
       params: {
         Bucket: this.bucket,
         Body: bytes,
         Key: this.getStoredPath(cid),
       },
-    }).done()
+      // @ts-ignore native implementation fine in node >=15
+      abortController,
+    })
+    try {
+      await upload.done()
+    } finally {
+      clearTimeout(timeout)
+    }
   }
 
   async quarantine(cid: CID): Promise<void> {
diff --git a/packages/pds/src/api/com/atproto/repo/uploadBlob.ts b/packages/pds/src/api/com/atproto/repo/uploadBlob.ts
index 494a9fe0196..94b717381ca 100644
--- a/packages/pds/src/api/com/atproto/repo/uploadBlob.ts
+++ b/packages/pds/src/api/com/atproto/repo/uploadBlob.ts
@@ -1,6 +1,8 @@
+import { DAY } from '@atproto/common'
+import { UpstreamTimeoutError } from '@atproto/xrpc-server'
 import { Server } from '../../../../lexicon'
 import AppContext from '../../../../context'
-import { DAY } from '@atproto/common'
+import { BlobMetadata } from '../../../../actor-store/blob/transactor'
 
 export default function (server: Server, ctx: AppContext) {
   server.com.atproto.repo.uploadBlob({
@@ -15,10 +17,20 @@ export default function (server: Server, ctx: AppContext) {
       const blob = await ctx.actorStore.writeNoTransaction(
         requester,
         async (store) => {
-          const metadata = await store.repo.blob.uploadBlobAndGetMetadata(
-            input.encoding,
-            input.body,
-          )
+          let metadata: BlobMetadata
+          try {
+            metadata = await store.repo.blob.uploadBlobAndGetMetadata(
+              input.encoding,
+              input.body,
+            )
+          } catch (err) {
+            if (err?.['name'] === 'AbortError') {
+              throw new UpstreamTimeoutError(
+                'Upload timed out, please try again.',
+              )
+            }
+            throw err
+          }
 
           return store.transact(async (actorTxn) => {
             const blobRef =
diff --git a/packages/pds/src/config/config.ts b/packages/pds/src/config/config.ts
index 355b02cab7d..fb6efbebbc6 100644
--- a/packages/pds/src/config/config.ts
+++ b/packages/pds/src/config/config.ts
@@ -55,6 +55,7 @@ export const envToCfg = (env: ServerEnvironment): ServerConfig => {
     blobstoreCfg = {
       provider: 's3',
       bucket: env.blobstoreS3Bucket,
+      uploadTimeoutMs: env.blobstoreS3UploadTimeoutMs || 20000,
       region: env.blobstoreS3Region,
       endpoint: env.blobstoreS3Endpoint,
       forcePathStyle: env.blobstoreS3ForcePathStyle,
@@ -305,6 +306,7 @@ export type S3BlobstoreConfig = {
   region?: string
   endpoint?: string
   forcePathStyle?: boolean
+  uploadTimeoutMs?: number
   credentials?: {
     accessKeyId: string
     secretAccessKey: string
diff --git a/packages/pds/src/config/env.ts b/packages/pds/src/config/env.ts
index f7b9849a1a9..e94c84baeb9 100644
--- a/packages/pds/src/config/env.ts
+++ b/packages/pds/src/config/env.ts
@@ -33,6 +33,7 @@ export const readEnv = (): ServerEnvironment => {
     blobstoreS3ForcePathStyle: envBool('PDS_BLOBSTORE_S3_FORCE_PATH_STYLE'),
     blobstoreS3AccessKeyId: envStr('PDS_BLOBSTORE_S3_ACCESS_KEY_ID'),
     blobstoreS3SecretAccessKey: envStr('PDS_BLOBSTORE_S3_SECRET_ACCESS_KEY'),
+    blobstoreS3UploadTimeoutMs: envInt('PDS_BLOBSTORE_S3_UPLOAD_TIMEOUT_MS'),
     // disk
     blobstoreDiskLocation: envStr('PDS_BLOBSTORE_DISK_LOCATION'),
     blobstoreDiskTmpLocation: envStr('PDS_BLOBSTORE_DISK_TMP_LOCATION'),
@@ -143,6 +144,7 @@ export type ServerEnvironment = {
   blobstoreS3ForcePathStyle?: boolean
   blobstoreS3AccessKeyId?: string
   blobstoreS3SecretAccessKey?: string
+  blobstoreS3UploadTimeoutMs?: number
 
   // identity
   didPlcUrl?: string
diff --git a/packages/pds/src/context.ts b/packages/pds/src/context.ts
index 7da2ac980ea..021fbe2e39a 100644
--- a/packages/pds/src/context.ts
+++ b/packages/pds/src/context.ts
@@ -107,6 +107,7 @@ export class AppContext {
           endpoint: cfg.blobstore.endpoint,
           forcePathStyle: cfg.blobstore.forcePathStyle,
           credentials: cfg.blobstore.credentials,
+          uploadTimeoutMs: cfg.blobstore.uploadTimeoutMs,
         })
       : DiskBlobStore.creator(
           cfg.blobstore.location,
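
For reference, a minimal usage sketch of the new option (not part of the patch). It assumes S3BlobStore is exported from the @atproto/aws package; the DID, bucket, region, and 30s value are placeholders.

import { S3BlobStore } from '@atproto/aws'

async function main() {
  // Illustrative values only: the DID, bucket, and region are placeholders.
  const store = new S3BlobStore('did:plc:example', {
    bucket: 'my-pds-blobs',
    region: 'us-east-1',
    // Abort uploads running longer than 30s. Per the diff above, S3BlobStore
    // falls back to 10000ms when unset, and the PDS config layer defaults
    // PDS_BLOBSTORE_S3_UPLOAD_TIMEOUT_MS to 20000ms before passing it through.
    uploadTimeoutMs: 30_000,
  })

  // A timed-out upload rejects with the aws-sdk "Upload aborted." AbortError,
  // which the uploadBlob handler above maps to an UpstreamTimeoutError.
  const key = await store.putTemp(new Uint8Array([1, 2, 3]))
  console.log('stored temp blob at', key)
}

main()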