Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow retrying proxied requests #2850

Merged
merged 6 commits into from
Oct 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/rare-items-arrive.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@atproto/pds": patch
---

Use a less cryptic error message when proxying fails
5 changes: 5 additions & 0 deletions .changeset/thirty-masks-watch.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@atproto/pds": patch
---

Allow retrying proxied requests
5 changes: 5 additions & 0 deletions packages/pds/src/config/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,10 @@ export const envToCfg = (env: ServerEnvironment): ServerConfig => {
headersTimeout: env.proxyHeadersTimeout ?? 10e3,
bodyTimeout: env.proxyBodyTimeout ?? 30e3,
maxResponseSize: env.proxyMaxResponseSize ?? 10 * 1024 * 1024, // 10mb
maxRetries:
env.proxyMaxRetries != null && env.proxyMaxRetries > 0
? env.proxyMaxRetries
: 0,
preferCompressed: env.proxyPreferCompressed ?? false,
}

Expand Down Expand Up @@ -414,6 +418,7 @@ export type ProxyConfig = {
headersTimeout: number
bodyTimeout: number
maxResponseSize: number
maxRetries: number

/**
* When proxying requests that might get intercepted (for read-after-write) we
Expand Down
2 changes: 2 additions & 0 deletions packages/pds/src/config/env.ts
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ export const readEnv = (): ServerEnvironment => {
proxyHeadersTimeout: envInt('PDS_PROXY_HEADERS_TIMEOUT'),
proxyBodyTimeout: envInt('PDS_PROXY_BODY_TIMEOUT'),
proxyMaxResponseSize: envInt('PDS_PROXY_MAX_RESPONSE_SIZE'),
proxyMaxRetries: envInt('PDS_PROXY_MAX_RETRIES'),
proxyPreferCompressed: envBool('PDS_PROXY_PREFER_COMPRESSED'),
}
}
Expand Down Expand Up @@ -254,5 +255,6 @@ export type ServerEnvironment = {
proxyHeadersTimeout?: number
proxyBodyTimeout?: number
proxyMaxResponseSize?: number
proxyMaxRetries?: number
proxyPreferCompressed?: boolean
}
14 changes: 11 additions & 3 deletions packages/pds/src/context.ts
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ export type AppContextOptions = {
moderationAgent?: AtpAgent
reportingAgent?: AtpAgent
entrywayAgent?: AtpAgent
proxyAgent: undici.Agent
proxyAgent: undici.Dispatcher
safeFetch: Fetch
authProvider?: PdsOAuthProvider
authVerifier: AuthVerifier
Expand All @@ -90,7 +90,7 @@ export class AppContext {
public moderationAgent: AtpAgent | undefined
public reportingAgent: AtpAgent | undefined
public entrywayAgent: AtpAgent | undefined
public proxyAgent: undici.Agent
public proxyAgent: undici.Dispatcher
public safeFetch: Fetch
public authVerifier: AuthVerifier
public authProvider?: PdsOAuthProvider
Expand Down Expand Up @@ -264,7 +264,7 @@ export class AppContext {
})

// An agent for performing HTTP requests based on user provided URLs.
const proxyAgent = new undici.Agent({
const proxyAgentBase = new undici.Agent({
allowH2: cfg.proxy.allowHTTP2, // This is experimental
headersTimeout: cfg.proxy.headersTimeout,
maxResponseSize: cfg.proxy.maxResponseSize,
Expand All @@ -286,6 +286,14 @@ export class AppContext {
lookup: cfg.proxy.disableSsrfProtection ? undefined : unicastLookup,
},
})
const proxyAgent =
cfg.proxy.maxRetries > 0
? new undici.RetryAgent(proxyAgentBase, {
statusCodes: [], // Only retry on socket errors
methods: ['GET', 'HEAD'],
maxRetries: cfg.proxy.maxRetries,
})
: proxyAgentBase

// A fetch() function that protects against SSRF attacks, large responses &
// known bad domains. This function can safely be used to fetch user
Expand Down
25 changes: 13 additions & 12 deletions packages/pds/src/pipethrough.ts
Original file line number Diff line number Diff line change
Expand Up @@ -357,9 +357,9 @@ async function pipethroughRequest(

function handleUpstreamRequestError(
err: unknown,
message = 'pipethrough network error',
message = 'Upstream service unreachable',
): never {
httpLogger.warn({ err }, message)
httpLogger.error({ err }, message)
throw new XRPCServerError(ResponseType.UpstreamFailure, message, undefined, {
cause: err,
})
Expand Down Expand Up @@ -520,18 +520,11 @@ async function tryParsingError(
}
}

export async function bufferUpstreamResponse(
async function bufferUpstreamResponse(
readable: Readable,
contentEncoding?: string | string[],
): Promise<Buffer> {
try {
// Needed for type-safety (should never happen irl)
if (Array.isArray(contentEncoding)) {
throw new TypeError(
'upstream service returned multiple content-encoding headers',
)
}

return await streamToNodeBuffer(decodeStream(readable, contentEncoding))
} catch (err) {
if (!readable.destroyed) readable.destroy()
Expand Down Expand Up @@ -561,7 +554,11 @@ export async function asPipeThroughBuffer(
// Response parsing/forwarding
// -------------------

const RES_HEADERS_TO_FORWARD = ['atproto-repo-rev', 'atproto-content-labelers']
const RES_HEADERS_TO_FORWARD = [
'atproto-repo-rev',
'atproto-content-labelers',
'retry-after',
]

function* responseHeaders(
headers: IncomingHttpHeaders,
Expand All @@ -584,7 +581,11 @@ function* responseHeaders(
for (let i = 0; i < RES_HEADERS_TO_FORWARD.length; i++) {
const name = RES_HEADERS_TO_FORWARD[i]
const val = headers[name]
if (typeof val === 'string') yield [name, val]

if (val != null) {
const value: string = Array.isArray(val) ? val.join(',') : val
yield [name, value]
}
}
}

Expand Down
2 changes: 1 addition & 1 deletion packages/pds/tests/proxied/proxy-catchall.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ describe('proxy header', () => {
for (const lex of lexicons) client.lex.add(lex)

await expect(client.call('com.example.ok')).rejects.toThrow(
'pipethrough network error',
'Upstream service unreachable',
)
})

Expand Down