diff --git a/.changeset/rare-items-arrive.md b/.changeset/rare-items-arrive.md new file mode 100644 index 00000000000..8e0583a9d55 --- /dev/null +++ b/.changeset/rare-items-arrive.md @@ -0,0 +1,5 @@ +--- +"@atproto/pds": patch +--- + +Use a less cryptic error message when proxying fails diff --git a/.changeset/thirty-masks-watch.md b/.changeset/thirty-masks-watch.md new file mode 100644 index 00000000000..b8c486986d3 --- /dev/null +++ b/.changeset/thirty-masks-watch.md @@ -0,0 +1,5 @@ +--- +"@atproto/pds": patch +--- + +Allow retrying proxied requests diff --git a/packages/pds/src/config/config.ts b/packages/pds/src/config/config.ts index 2d901f58ec6..263d987577e 100644 --- a/packages/pds/src/config/config.ts +++ b/packages/pds/src/config/config.ts @@ -246,6 +246,10 @@ export const envToCfg = (env: ServerEnvironment): ServerConfig => { headersTimeout: env.proxyHeadersTimeout ?? 10e3, bodyTimeout: env.proxyBodyTimeout ?? 30e3, maxResponseSize: env.proxyMaxResponseSize ?? 10 * 1024 * 1024, // 10mb + maxRetries: + env.proxyMaxRetries != null && env.proxyMaxRetries > 0 + ? env.proxyMaxRetries + : 0, preferCompressed: env.proxyPreferCompressed ?? false, } @@ -414,6 +418,7 @@ export type ProxyConfig = { headersTimeout: number bodyTimeout: number maxResponseSize: number + maxRetries: number /** * When proxying requests that might get intercepted (for read-after-write) we diff --git a/packages/pds/src/config/env.ts b/packages/pds/src/config/env.ts index 0c44e1fdc8d..1d26d4c06a6 100644 --- a/packages/pds/src/config/env.ts +++ b/packages/pds/src/config/env.ts @@ -128,6 +128,7 @@ export const readEnv = (): ServerEnvironment => { proxyHeadersTimeout: envInt('PDS_PROXY_HEADERS_TIMEOUT'), proxyBodyTimeout: envInt('PDS_PROXY_BODY_TIMEOUT'), proxyMaxResponseSize: envInt('PDS_PROXY_MAX_RESPONSE_SIZE'), + proxyMaxRetries: envInt('PDS_PROXY_MAX_RETRIES'), proxyPreferCompressed: envBool('PDS_PROXY_PREFER_COMPRESSED'), } } @@ -254,5 +255,6 @@ export type ServerEnvironment = { proxyHeadersTimeout?: number proxyBodyTimeout?: number proxyMaxResponseSize?: number + proxyMaxRetries?: number proxyPreferCompressed?: boolean } diff --git a/packages/pds/src/context.ts b/packages/pds/src/context.ts index 49f55ed6bd5..633a48b46dc 100644 --- a/packages/pds/src/context.ts +++ b/packages/pds/src/context.ts @@ -63,7 +63,7 @@ export type AppContextOptions = { moderationAgent?: AtpAgent reportingAgent?: AtpAgent entrywayAgent?: AtpAgent - proxyAgent: undici.Agent + proxyAgent: undici.Dispatcher safeFetch: Fetch authProvider?: PdsOAuthProvider authVerifier: AuthVerifier @@ -90,7 +90,7 @@ export class AppContext { public moderationAgent: AtpAgent | undefined public reportingAgent: AtpAgent | undefined public entrywayAgent: AtpAgent | undefined - public proxyAgent: undici.Agent + public proxyAgent: undici.Dispatcher public safeFetch: Fetch public authVerifier: AuthVerifier public authProvider?: PdsOAuthProvider @@ -264,7 +264,7 @@ export class AppContext { }) // An agent for performing HTTP requests based on user provided URLs. - const proxyAgent = new undici.Agent({ + const proxyAgentBase = new undici.Agent({ allowH2: cfg.proxy.allowHTTP2, // This is experimental headersTimeout: cfg.proxy.headersTimeout, maxResponseSize: cfg.proxy.maxResponseSize, @@ -286,6 +286,14 @@ export class AppContext { lookup: cfg.proxy.disableSsrfProtection ? undefined : unicastLookup, }, }) + const proxyAgent = + cfg.proxy.maxRetries > 0 + ? new undici.RetryAgent(proxyAgentBase, { + statusCodes: [], // Only retry on socket errors + methods: ['GET', 'HEAD'], + maxRetries: cfg.proxy.maxRetries, + }) + : proxyAgentBase // A fetch() function that protects against SSRF attacks, large responses & // known bad domains. This function can safely be used to fetch user diff --git a/packages/pds/src/pipethrough.ts b/packages/pds/src/pipethrough.ts index 285320d22c6..b326d5fba8b 100644 --- a/packages/pds/src/pipethrough.ts +++ b/packages/pds/src/pipethrough.ts @@ -357,9 +357,9 @@ async function pipethroughRequest( function handleUpstreamRequestError( err: unknown, - message = 'pipethrough network error', + message = 'Upstream service unreachable', ): never { - httpLogger.warn({ err }, message) + httpLogger.error({ err }, message) throw new XRPCServerError(ResponseType.UpstreamFailure, message, undefined, { cause: err, }) @@ -520,18 +520,11 @@ async function tryParsingError( } } -export async function bufferUpstreamResponse( +async function bufferUpstreamResponse( readable: Readable, contentEncoding?: string | string[], ): Promise { try { - // Needed for type-safety (should never happen irl) - if (Array.isArray(contentEncoding)) { - throw new TypeError( - 'upstream service returned multiple content-encoding headers', - ) - } - return await streamToNodeBuffer(decodeStream(readable, contentEncoding)) } catch (err) { if (!readable.destroyed) readable.destroy() @@ -561,7 +554,11 @@ export async function asPipeThroughBuffer( // Response parsing/forwarding // ------------------- -const RES_HEADERS_TO_FORWARD = ['atproto-repo-rev', 'atproto-content-labelers'] +const RES_HEADERS_TO_FORWARD = [ + 'atproto-repo-rev', + 'atproto-content-labelers', + 'retry-after', +] function* responseHeaders( headers: IncomingHttpHeaders, @@ -584,7 +581,11 @@ function* responseHeaders( for (let i = 0; i < RES_HEADERS_TO_FORWARD.length; i++) { const name = RES_HEADERS_TO_FORWARD[i] const val = headers[name] - if (typeof val === 'string') yield [name, val] + + if (val != null) { + const value: string = Array.isArray(val) ? val.join(',') : val + yield [name, value] + } } } diff --git a/packages/pds/tests/proxied/proxy-catchall.test.ts b/packages/pds/tests/proxied/proxy-catchall.test.ts index e6726d79813..7ed68d1ae30 100644 --- a/packages/pds/tests/proxied/proxy-catchall.test.ts +++ b/packages/pds/tests/proxied/proxy-catchall.test.ts @@ -116,7 +116,7 @@ describe('proxy header', () => { for (const lex of lexicons) client.lex.add(lex) await expect(client.call('com.example.ok')).rejects.toThrow( - 'pipethrough network error', + 'Upstream service unreachable', ) })