diff --git a/.changeset/rude-timers-matter.md b/.changeset/rude-timers-matter.md new file mode 100644 index 00000000..790beb6a --- /dev/null +++ b/.changeset/rude-timers-matter.md @@ -0,0 +1,6 @@ +--- +'@powersync/service-core': minor +'@powersync/service-image': minor +--- + +Support client_id parameter and User-Agent headers. diff --git a/packages/service-core/src/routes/configure-rsocket.ts b/packages/service-core/src/routes/configure-rsocket.ts index a4535774..93f650a1 100644 --- a/packages/service-core/src/routes/configure-rsocket.ts +++ b/packages/service-core/src/routes/configure-rsocket.ts @@ -23,7 +23,7 @@ export function configureRSocket(router: ReactiveSocketRouter, options: router.applyWebSocketEndpoints(server, { contextProvider: async (data: Buffer) => { - const { token } = RSocketContextMeta.decode(deserialize(data) as any); + const { token, user_agent } = RSocketContextMeta.decode(deserialize(data) as any); if (!token) { throw new errors.AuthorizationError('No token provided'); @@ -38,6 +38,7 @@ export function configureRSocket(router: ReactiveSocketRouter, options: } return { token, + user_agent, ...context, token_errors: token_errors, system diff --git a/packages/service-core/src/routes/endpoints/checkpointing.ts b/packages/service-core/src/routes/endpoints/checkpointing.ts index a2664ed9..cb45c582 100644 --- a/packages/service-core/src/routes/endpoints/checkpointing.ts +++ b/packages/service-core/src/routes/endpoints/checkpointing.ts @@ -5,7 +5,9 @@ import * as util from '../../util/util-index.js'; import { authUser } from '../auth.js'; import { routeDefinition } from '../router.js'; -const WriteCheckpointRequest = t.object({}); +const WriteCheckpointRequest = t.object({ + client_id: t.string.optional() +}); export const writeCheckpoint = routeDefinition({ path: '/write-checkpoint.json', @@ -30,8 +32,10 @@ export const writeCheckpoint2 = routeDefinition({ validator: schema.createTsCodecValidator(WriteCheckpointRequest, { allowAdditional: true }), handler: async (payload) => { const { user_id, system } = payload.context; + const client_id = payload.params.client_id; + const full_user_id = util.checkpointUserId(user_id, client_id); const storage = system.storage; - const write_checkpoint = await util.createWriteCheckpoint(system.requirePgPool(), storage, user_id!); + const write_checkpoint = await util.createWriteCheckpoint(system.requirePgPool(), storage, full_user_id); return { write_checkpoint: String(write_checkpoint) }; diff --git a/packages/service-core/src/routes/endpoints/socket-route.ts b/packages/service-core/src/routes/endpoints/socket-route.ts index 4ae52def..8759ef35 100644 --- a/packages/service-core/src/routes/endpoints/socket-route.ts +++ b/packages/service-core/src/routes/endpoints/socket-route.ts @@ -124,6 +124,7 @@ export const syncStreamReactive: SocketRouteGenerator = (router) => disposer(); logger.info(`Sync stream complete`, { user_id: syncParams.user_id, + user_agent: context.user_agent, operations_synced: tracker.operationsSynced, data_synced_bytes: tracker.dataSyncedBytes }); diff --git a/packages/service-core/src/routes/endpoints/sync-stream.ts b/packages/service-core/src/routes/endpoints/sync-stream.ts index 404bdb26..4e614be8 100644 --- a/packages/service-core/src/routes/endpoints/sync-stream.ts +++ b/packages/service-core/src/routes/endpoints/sync-stream.ts @@ -21,6 +21,9 @@ export const syncStreamed = routeDefinition({ validator: schema.createTsCodecValidator(util.StreamingSyncRequest, { allowAdditional: true }), handler: async (payload) => { const system = payload.context.system; + const headers = payload.request.headers; + const userAgent = headers['x-user-agent'] ?? headers['user-agent']; + const clientId = payload.params.client_id; if (system.closed) { throw new errors.JourneyError({ @@ -92,6 +95,8 @@ export const syncStreamed = routeDefinition({ Metrics.getInstance().concurrent_connections.add(-1); logger.info(`Sync stream complete`, { user_id: syncParams.user_id, + client_id: clientId, + user_agent: userAgent, operations_synced: tracker.operationsSynced, data_synced_bytes: tracker.dataSyncedBytes }); diff --git a/packages/service-core/src/routes/router-socket.ts b/packages/service-core/src/routes/router-socket.ts index 31b8a3ad..5efea66d 100644 --- a/packages/service-core/src/routes/router-socket.ts +++ b/packages/service-core/src/routes/router-socket.ts @@ -9,5 +9,6 @@ import { Context } from './router.js'; export type SocketRouteGenerator = (router: ReactiveSocketRouter) => IReactiveStream; export const RSocketContextMeta = t.object({ - token: t.string + token: t.string, + user_agent: t.string.optional() }); diff --git a/packages/service-core/src/routes/router.ts b/packages/service-core/src/routes/router.ts index ac2a5f52..59a50f49 100644 --- a/packages/service-core/src/routes/router.ts +++ b/packages/service-core/src/routes/router.ts @@ -11,6 +11,10 @@ export type Context = { token_payload?: auth.JwtPayload; token_errors?: string[]; + /** + * Only on websocket endpoints. + */ + user_agent?: string; }; export type BasicRouterRequest = { diff --git a/packages/service-core/src/sync/sync.ts b/packages/service-core/src/sync/sync.ts index 8f98f991..77b31eae 100644 --- a/packages/service-core/src/sync/sync.ts +++ b/packages/service-core/src/sync/sync.ts @@ -93,7 +93,8 @@ async function* streamResponseInner( } } - const stream = storage.watchWriteCheckpoint(syncParams.token_parameters.user_id as string, signal); + const checkpointUserId = util.checkpointUserId(syncParams.token_parameters.user_id as string, params.client_id); + const stream = storage.watchWriteCheckpoint(checkpointUserId, signal); for await (const next of stream) { const { base, writeCheckpoint } = next; const checkpoint = base.checkpoint; diff --git a/packages/service-core/src/util/protocol-types.ts b/packages/service-core/src/util/protocol-types.ts index e577a60f..a4bcbb8c 100644 --- a/packages/service-core/src/util/protocol-types.ts +++ b/packages/service-core/src/util/protocol-types.ts @@ -94,7 +94,12 @@ export const StreamingSyncRequest = t.object({ /** * Client parameters to be passed to the sync rules. */ - parameters: t.record(t.any).optional() + parameters: t.record(t.any).optional(), + + /** + * Unique client id. + */ + client_id: t.string.optional() }); export type StreamingSyncRequest = t.Decoded; diff --git a/packages/service-core/src/util/utils.ts b/packages/service-core/src/util/utils.ts index 5e45fed5..68fd0a80 100644 --- a/packages/service-core/src/util/utils.ts +++ b/packages/service-core/src/util/utils.ts @@ -120,3 +120,13 @@ export async function createWriteCheckpoint( logger.info(`Write checkpoint 2: ${JSON.stringify({ lsn, id: String(id) })}`); return id; } + +export function checkpointUserId(user_id: string | undefined, client_id: string | undefined) { + if (user_id == null) { + throw new Error('user_id is required'); + } + if (client_id == null) { + return user_id; + } + return `${user_id}/${client_id}`; +} diff --git a/service/src/runners/server.ts b/service/src/runners/server.ts index 386d7da4..1c32da9e 100644 --- a/service/src/runners/server.ts +++ b/service/src/runners/server.ts @@ -18,7 +18,7 @@ export async function startServer(runnerConfig: core.utils.RunnerConfig) { server.register(cors, { origin: '*', - allowedHeaders: ['Content-Type', 'Authorization'], + allowedHeaders: ['Content-Type', 'Authorization', 'User-Agent', 'X-User-Agent'], exposedHeaders: ['Content-Type'], // Cache time for preflight response maxAge: 3600