Skip to content

Commit

Permalink
refactor(xrpc): replace dispatcher for fetchHandler
Browse files Browse the repository at this point in the history
  • Loading branch information
matthieusieben committed May 13, 2024
1 parent 70bfea9 commit 38e8e67
Show file tree
Hide file tree
Showing 17 changed files with 219 additions and 233 deletions.
34 changes: 19 additions & 15 deletions packages/api/src/agent.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
import { AtpClient } from './client'
import { BSKY_LABELER_DID } from './const'
import { AtpDispatcher } from './dispatcher/atp-dispatcher'
import { SessionManager, isSessionManager } from './session/session-manager'
import {
StatelessDispatcher,
StatelessDispatcherOptions,
} from './dispatcher/stateless-dispatcher'
StatelessSessionManager,
StatelessSessionManagerOptions,
} from './session/stateless-session-handler'
import { AtpAgentGlobalOpts, AtprotoServiceType } from './types'

const MAX_LABELERS = 10

export type AtpAgentOptions = SessionManager | StatelessSessionManagerOptions

export class AtpAgent {
/**
* The labelers to be used across all requests with the takedown capability
Expand All @@ -28,19 +30,21 @@ export class AtpAgent {
labelersHeader: string[] = []
proxyHeader?: string

readonly dispatcher: AtpDispatcher
readonly sessionManager: SessionManager

get com() {
return this.api.com
}

constructor(options: AtpDispatcher | StatelessDispatcherOptions) {
this.dispatcher =
options instanceof AtpDispatcher
? options
: new StatelessDispatcher(options)
constructor(options: AtpAgentOptions) {
this.sessionManager = isSessionManager(options)
? options
: new StatelessSessionManager(options)

this.api = new AtpClient(this.dispatcher)
this.api = new AtpClient((...args) =>
// The function needs to be "bound" to the right context
this.sessionManager.fetchHandler(...args),
)
this.api.setHeader('atproto-accept-labelers', () =>
// Make sure to read the static property from the subclass in case it was
// overridden.
Expand All @@ -54,7 +58,7 @@ export class AtpAgent {
}

clone() {
const inst = new AtpAgent(this.dispatcher)
const inst = new AtpAgent(this.sessionManager)
this.copyInto(inst)
return inst
}
Expand All @@ -70,16 +74,16 @@ export class AtpAgent {
return inst
}

/** @deprecated only used for a very particular use-case in the official Bluesky app */
async getServiceUrl(): Promise<URL> {
// Clone to prevent mutation of the original dispatcher's URL
return this.dispatcher.getServiceUrl()
return this.sessionManager.getServiceUrl()
}

/**
* Get the active session's DID
*/
async getDid(): Promise<string> {
return this.dispatcher.getDid()
return this.sessionManager.getDid()
}

/**
Expand Down
2 changes: 1 addition & 1 deletion packages/api/src/bsky-agent.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ declare global {

export class BskyAgent extends AtpAgent {
clone() {
const inst = new BskyAgent(this.dispatcher)
const inst = new BskyAgent(this.sessionManager)
this.copyInto(inst)
return inst
}
Expand Down
8 changes: 2 additions & 6 deletions packages/api/src/client/index.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,7 @@
/**
* GENERATED CODE - DO NOT MODIFY
*/
import {
XrpcClient,
XrpcDispatcher,
XrpcDispatcherOptions,
} from '@atproto/xrpc'
import { XrpcClient, FetchHandler, FetchHandlerOptions } from '@atproto/xrpc'
import { schemas } from './lexicons'
import { CID } from 'multiformats/cid'
import * as ComAtprotoAdminDefs from './types/com/atproto/admin/defs'
Expand Down Expand Up @@ -369,7 +365,7 @@ export class AtpClient extends XrpcClient {
app: AppNS
tools: ToolsNS

constructor(options: XrpcDispatcher | XrpcDispatcherOptions) {
constructor(options: FetchHandler | FetchHandlerOptions) {
super(options, schemas)
this.com = new ComNS(this)
this.app = new AppNS(this)
Expand Down
6 changes: 0 additions & 6 deletions packages/api/src/dispatcher/atp-dispatcher.ts

This file was deleted.

3 changes: 0 additions & 3 deletions packages/api/src/dispatcher/index.ts

This file was deleted.

19 changes: 0 additions & 19 deletions packages/api/src/dispatcher/stateless-dispatcher.ts

This file was deleted.

2 changes: 1 addition & 1 deletion packages/api/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ export * from './types'
export * from './const'
export * from './util'
export * from './client'
export * from './dispatcher'
export * from './session'
export * from './rich-text/rich-text'
export * from './rich-text/sanitization'
export * from './rich-text/unicode'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,24 +16,24 @@ import {
AtpPersistSessionHandler,
AtpSessionData,
} from '../types'
import { AtpDispatcher } from './atp-dispatcher'
import { SessionManager } from './session-manager'

const ReadableStream = globalThis.ReadableStream as
| typeof globalThis.ReadableStream
| undefined

export type Fetch = (this: void, request: Request) => Promise<Response>
export interface SessionDispatcherOptions {
export interface AtpSessionManagerOptions {
service: string | URL
persistSession?: AtpPersistSessionHandler
fetch?: Fetch
}

/**
* An {@link XrpcDispatcher} that uses legacy "com.atproto.server" endpoints to
* A {@link SessionManager} that uses legacy "com.atproto.server" endpoints to
* manage sessions and route XRPC requests.
*/
export class SessionDispatcher extends AtpDispatcher {
export class AtpSessionManager implements SessionManager {
public serviceUrl: URL
public pdsUrl?: URL // The PDS URL, driven by the did doc
public session?: AtpSessionData
Expand All @@ -43,9 +43,7 @@ export class SessionDispatcher extends AtpDispatcher {
private persistSession?: AtpPersistSessionHandler
private refreshSessionPromise: Promise<void> | undefined

constructor(options: SessionDispatcherOptions) {
super((url, init) => this._dispatch(url, init))

constructor(options: AtpSessionManagerOptions) {
this.serviceUrl = new URL(options.service)
this.fetch = options.fetch || globalThis.fetch
this.setPersistSessionHandler(options.persistSession)
Expand All @@ -71,19 +69,15 @@ export class SessionDispatcher extends AtpDispatcher {
}

/**
* Internal fetch method that will be triggered by the XRPC Dispatcher (parent
* class). This method will:
* fetch method that will be triggered by the ApiClient. This method will:
* - Set the proper origin for the request (pds or service)
* - Add the proper auth headers to the request
* - Handle session refreshes
*
* @note We define this as a method on the prototype instead of inlining the
* function in the constructor for readability.
*/
protected async _dispatch(
url: string,
reqInit: RequestInit,
): Promise<Response> {
async fetchHandler(url: string, reqInit: RequestInit): Promise<Response> {
// wait for any active session-refreshes to finish
await this.refreshSessionPromise

Expand Down Expand Up @@ -251,11 +245,12 @@ export class SessionDispatcher extends AtpDispatcher {
): Promise<ComAtprotoServerGetSession.Response> {
try {
this.session = session
// For this particular call, we want this._dispatch() to be used in order
// to refresh the session if needed. To do so, we use a (new) AtpClient
// instance to build the HTTP request, and pass "this" as the dispatcher
// so that this._dispatch() is called.
const res = await new AtpClient(this).com.atproto.server.getSession()
// For this particular call, we want this.fetchHandler() to be used in
// order to refresh the session if needed. So let's create a new client
// instance with the right fetchHandler.
const res = await new AtpClient(
this.fetchHandler,
).com.atproto.server.getSession()
if (res.data.did !== this.session.did) {
throw new XRPCError(
ResponseType.InvalidRequest,
Expand Down
3 changes: 3 additions & 0 deletions packages/api/src/session/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
export * from './session-manager'
export * from './atp-session-manager'
export * from './stateless-session-handler'
20 changes: 20 additions & 0 deletions packages/api/src/session/session-manager.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
export interface SessionManager {
fetchHandler(url: string, reqInit: RequestInit): Promise<Response>
getDid(): string | PromiseLike<string>

/** @deprecated only used for a very particular use-case in the official Bluesky app */
getServiceUrl(): URL | PromiseLike<URL>
}

export function isSessionManager<T>(value: T): value is T & SessionManager {
return (
value !== null &&
typeof value === 'object' &&
'fetchHandler' in value &&
typeof value.fetchHandler === 'function' &&
'getDid' in value &&
typeof value.getDid === 'function' &&
'getServiceUrl' in value &&
typeof value.getServiceUrl === 'function'
)
}
43 changes: 43 additions & 0 deletions packages/api/src/session/stateless-session-handler.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
import { SessionManager } from './session-manager'

export type StatelessSessionManagerOptions = {
service: string | URL
headers?: { [_ in string]?: null | string }
}

export class StatelessSessionManager implements SessionManager {
readonly serviceUrl: URL
readonly headers: Map<string, string>

constructor({ service, headers }: StatelessSessionManagerOptions) {
this.headers = new Map(
headers
? Object.entries(headers).filter(
<T extends [string, unknown]>(
e: T,
): e is T & [T[0], NonNullable<T[1]>] => e[1] != null,
)
: headers,
)
this.serviceUrl = new URL(service)
}

async getServiceUrl(): Promise<URL> {
return this.serviceUrl
}

async getDid(): Promise<string> {
throw new Error('Not logged in')
}

async fetchHandler(url: string, reqInit: RequestInit): Promise<Response> {
const fullUrl = new URL(url, this.serviceUrl)
const headers = new Headers(reqInit.headers)

for (const [key, value] of this.headers) {
if (value != null) headers.set(key, value)
}

return globalThis.fetch(fullUrl.toString(), { ...reqInit, headers })
}
}
28 changes: 14 additions & 14 deletions packages/dev-env/src/agent.ts
Original file line number Diff line number Diff line change
@@ -1,36 +1,36 @@
import AtpAgent, {
SessionDispatcher,
SessionDispatcherOptions,
AtpSessionManager,
AtpSessionManagerOptions,
} from '@atproto/api'
import { EXAMPLE_LABELER } from './const'

export class TestAgent extends AtpAgent {
readonly dispatcher: SessionDispatcher
readonly sessionManager: AtpSessionManager

constructor(options: SessionDispatcherOptions) {
const dispatcher = new SessionDispatcher(options)
super(dispatcher)
this.dispatcher = dispatcher
constructor(options: AtpSessionManagerOptions) {
const sessionManager = new AtpSessionManager(options)
super(sessionManager)
this.sessionManager = sessionManager
this.configureLabelersHeader([EXAMPLE_LABELER])
}

get session() {
return this.dispatcher.session
return this.sessionManager.session
}

get hasSession() {
return this.dispatcher.hasSession
return this.sessionManager.hasSession
}

get service() {
return this.dispatcher.serviceUrl
return this.sessionManager.serviceUrl
}

login(...args: Parameters<SessionDispatcher['login']>) {
return this.dispatcher.login(...args)
login(...args: Parameters<AtpSessionManager['login']>) {
return this.sessionManager.login(...args)
}

createAccount(...args: Parameters<SessionDispatcher['createAccount']>) {
return this.dispatcher.createAccount(...args)
createAccount(...args: Parameters<AtpSessionManager['createAccount']>) {
return this.sessionManager.createAccount(...args)
}
}
10 changes: 5 additions & 5 deletions packages/lex-cli/src/codegen/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -61,14 +61,14 @@ const indexTs = (
nsidTokens: Record<string, string[]>,
) =>
gen(project, '/index.ts', async (file) => {
//= import { XrpcClient, XrpcDispatcher, XrpcDispatcherOptions } from '@atproto/xrpc'
//= import { XrpcClient, FetchHandler, FetchHandlerOptions } from '@atproto/xrpc'
const xrpcImport = file.addImportDeclaration({
moduleSpecifier: '@atproto/xrpc',
})
xrpcImport.addNamedImports([
{ name: 'XrpcClient' },
{ name: 'XrpcDispatcher' },
{ name: 'XrpcDispatcherOptions' },
{ name: 'FetchHandler' },
{ name: 'FetchHandlerOptions' },
])
//= import {schemas} from './lexicons'
file
Expand Down Expand Up @@ -131,13 +131,13 @@ const indexTs = (
})
}

//= constructor (options: XrpcDispatcher | XrpcDispatcherOptions) {
//= constructor (options: FetchHandler | FetchHandlerOptions) {
//= super(options, schemas)
//= {namespace declarations}
//= }
atpClientCls.addConstructor({
parameters: [
{ name: 'options', type: 'XrpcDispatcher | XrpcDispatcherOptions' },
{ name: 'options', type: 'FetchHandler | FetchHandlerOptions' },
],
statements: [
'super(options, schemas)',
Expand Down
Loading

0 comments on commit 38e8e67

Please sign in to comment.