From 5724ce45246398ea7507b0d296321656a0439a9e Mon Sep 17 00:00:00 2001 From: daishi Date: Mon, 4 Dec 2023 15:43:24 +0900 Subject: [PATCH 1/7] wip: separate renderRSC --- packages/waku/src/lib/middleware/rsc/utils.ts | 34 --- .../src/lib/middleware/rsc/worker-impl.ts | 3 +- packages/waku/src/lib/rsc/renderer.ts | 281 ++++++++++++++++++ packages/waku/src/lib/utils/form.ts | 33 ++ 4 files changed, 316 insertions(+), 35 deletions(-) create mode 100644 packages/waku/src/lib/rsc/renderer.ts create mode 100644 packages/waku/src/lib/utils/form.ts diff --git a/packages/waku/src/lib/middleware/rsc/utils.ts b/packages/waku/src/lib/middleware/rsc/utils.ts index b5eaaa24d..1c27e9ea9 100644 --- a/packages/waku/src/lib/middleware/rsc/utils.ts +++ b/packages/waku/src/lib/middleware/rsc/utils.ts @@ -56,37 +56,3 @@ export const deepFreeze = (x: unknown): void => { } } }; - -// TODO is this correct? better to use a library? -export const parseFormData = (body: string, contentType: string) => { - const boundary = contentType.split('boundary=')[1]; - const parts = body.split(`--${boundary}`); - const formData = new FormData(); - for (const part of parts) { - if (part.trim() === '' || part === '--') continue; - const [rawHeaders, content] = part.split('\r\n\r\n', 2); - const headers = rawHeaders!.split('\r\n').reduce( - (acc, currentHeader) => { - const [key, value] = currentHeader.split(': '); - acc[key!.toLowerCase()] = value!; - return acc; - }, - {} as Record, - ); - const contentDisposition = headers['content-disposition']; - const nameMatch = /name="([^"]+)"/.exec(contentDisposition!); - const filenameMatch = /filename="([^"]+)"/.exec(contentDisposition!); - if (nameMatch) { - const name = nameMatch[1]; - if (filenameMatch) { - const filename = filenameMatch[1]; - const type = headers['content-type'] || 'application/octet-stream'; - const blob = new Blob([content!], { type }); - formData.append(name!, blob, filename); - } else { - formData.append(name!, content!.trim()); - } - } - } - return formData; -}; diff --git a/packages/waku/src/lib/middleware/rsc/worker-impl.ts b/packages/waku/src/lib/middleware/rsc/worker-impl.ts index 1cda597ea..d7a48b3c2 100644 --- a/packages/waku/src/lib/middleware/rsc/worker-impl.ts +++ b/packages/waku/src/lib/middleware/rsc/worker-impl.ts @@ -10,7 +10,8 @@ import type { RenderContext } from '../../../server.js'; import type { ResolvedConfig } from '../../../config.js'; import { viteInlineConfig } from '../../config.js'; import { normalizePath } from '../../utils/path.js'; -import { hasStatusCode, deepFreeze, parseFormData } from './utils.js'; +import { hasStatusCode, deepFreeze } from './utils.js'; +import { parseFormData } from '../../utils/form.js'; import type { MessageReq, MessageRes, RenderRequest } from './worker-api.js'; let nodeLoaderRegistered = false; diff --git a/packages/waku/src/lib/rsc/renderer.ts b/packages/waku/src/lib/rsc/renderer.ts new file mode 100644 index 000000000..59492c5bf --- /dev/null +++ b/packages/waku/src/lib/rsc/renderer.ts @@ -0,0 +1,281 @@ +import path from 'node:path'; // TODO no node dependency +import url from 'node:url'; // TODO no node dependency +import type { ReactNode } from 'react'; + +import { defineEntries } from '../../server.js'; +import type { RenderContext } from '../../server.js'; +import type { ResolvedConfig } from '../../config.js'; +import { normalizePath } from '../utils/path.js'; +import { parseFormData } from '../utils/form.js'; + +const loadRSDWServer = async ( + config: Omit, + isDev: boolean, +) => { + if (!isDev) { + return ( + await import( + url + .pathToFileURL( + path.join(config.rootDir, config.distDir, 'rsdw-server.js'), + ) + .toString() + ) + ).default; + } + return import('react-server-dom-webpack/server.edge'); +}; + +type Entries = { + default: ReturnType; +}; + +const getEntriesFile = ( + config: Omit, + isDev: boolean, +) => { + const filePath = path.join( + config.rootDir, + isDev ? config.srcDir : config.distDir, + config.entriesJs, + ); + return normalizePath( + isDev ? filePath : url.pathToFileURL(filePath).toString(), + ); +}; + +const resolveClientEntry = ( + filePath: string, + config: Omit, + isDev: boolean, +) => { + filePath = filePath.startsWith('file:///') + ? url.fileURLToPath(filePath) + : filePath; + const root = path.join( + config.rootDir, + isDev ? config.srcDir : config.distDir, + ); + if (!filePath.startsWith(root)) { + if (isDev) { + // HACK this relies on Vite's internal implementation detail. + return normalizePath( + config.basePath + '@fs/' + filePath.replace(/^\//, ''), + ); + } else { + throw new Error( + 'Resolving client module outside root is unsupported for now', + ); + } + } + return normalizePath(config.basePath + path.relative(root, filePath)); +}; + +// HACK Patching stream is very fragile. +const transformRsfId = (prefixToRemove: string) => { + const encoder = new TextEncoder(); + const decoder = new TextDecoder(); + let data = ''; + return new TransformStream({ + transform(chunk, controller) { + if (!(chunk instanceof Uint8Array)) { + throw new Error('Unknown chunk type'); + } + data += decoder.decode(chunk); + if (!data.endsWith('\n')) { + return; + } + const lines = data.split('\n'); + data = ''; + for (let i = 0; i < lines.length; ++i) { + const match = lines[i]!.match( + new RegExp( + `^([0-9]+):{"id":"(?:file:///?)?${prefixToRemove}(.*?)"(.*)$`, + ), + ); + if (match) { + lines[i] = `${match[1]}:{"id":"${match[2]}"${match[3]}`; + } + } + controller.enqueue(encoder.encode(lines.join('\n'))); + }, + }); +}; + +export async function renderRSC( + opts: { + config: Omit; + input: string; + method: 'GET' | 'POST'; + context: unknown; + body?: ReadableStream; + contentType?: string; + moduleIdCallback?: (id: string) => void; + } & ( + | { isDev: false } + | { isDev: true; customImport: (id: string) => Promise } + ), +): Promise { + const { + config, + input, + method, + contentType, + context, + body, + moduleIdCallback, + isDev, + } = opts; + const customImport = isDev ? opts.customImport : (id: string) => import(id); + + const { renderToReadableStream, decodeReply } = await loadRSDWServer( + config, + isDev, + ); + + const entriesFile = getEntriesFile(config, isDev); + const { + default: { renderEntries }, + } = await (customImport(entriesFile) as Promise); + + const rsfPrefix = + path.posix.join(config.rootDir, isDev ? config.srcDir : config.distDir) + + '/'; + + const render = async (renderContext: RenderContext, input: string) => { + const elements = await renderEntries.call(renderContext, input); + if (elements === null) { + const err = new Error('No function component found'); + (err as any).statusCode = 404; // HACK our convention for NotFound + throw err; + } + if (Object.keys(elements).some((key) => key.startsWith('_'))) { + throw new Error('"_" prefix is reserved'); + } + return elements; + }; + + const bundlerConfig = new Proxy( + {}, + { + get(_target, encodedId: string) { + const [filePath, name] = encodedId.split('#') as [string, string]; + const id = resolveClientEntry(filePath, config, isDev); + moduleIdCallback?.(id); + return { id, chunks: [id], name, async: true }; + }, + }, + ); + + if (method === 'POST') { + const rsfId = decodeURIComponent(input); + let args: unknown[] = []; + let bodyStr = ''; + if (body) { + const decoder = new TextDecoder(); + const reader = body.getReader(); + let result: ReadableStreamReadResult; + do { + result = await reader.read(); + if (result.value) { + if (!(result.value instanceof Uint8Array)) { + throw new Error('Unexepected buffer type'); + } + bodyStr += decoder.decode(result.value); + } + } while (!result.done); + } + if ( + typeof contentType === 'string' && + contentType.startsWith('multipart/form-data') + ) { + // XXX This doesn't support streaming unlike busboy + const formData = parseFormData(bodyStr, contentType); + args = await decodeReply(formData); + } else if (bodyStr) { + args = await decodeReply(bodyStr); + } + const [fileId, name] = rsfId.split('#') as [string, string]; + const filePath = fileId.startsWith('/') ? fileId : rsfPrefix + fileId; + const fname = isDev ? filePath : url.pathToFileURL(filePath).toString(); + const mod = await customImport(fname); + const fn = mod[name] || mod; + let elements: Promise> = Promise.resolve({}); + let rendered = false; + const rerender = (input: string) => { + if (rendered) { + throw new Error('already rendered'); + } + const renderContext: RenderContext = { rerender, context }; + elements = Promise.all([elements, render(renderContext, input)]).then( + ([oldElements, newElements]) => ({ ...oldElements, ...newElements }), + ); + }; + const renderContext: RenderContext = { rerender, context }; + const data = await fn.apply(renderContext, args); + const resolvedElements = await elements; + rendered = true; + return renderToReadableStream( + { ...resolvedElements, _value: data }, + bundlerConfig, + ).pipeThrough(transformRsfId(rsfPrefix)); + } + + // rr.method === 'GET' + const renderContext: RenderContext = { + rerender: () => { + throw new Error('Cannot rerender'); + }, + context, + }; + const elements = await render(renderContext, input); + return renderToReadableStream(elements, bundlerConfig).pipeThrough( + transformRsfId(rsfPrefix), + ); +} + +export async function getBuildConfigRSC(opts: { + config: Omit; +}) { + const { config } = opts; + + const entriesFile = getEntriesFile(config, false); + const { + default: { getBuildConfig }, + } = await (import(entriesFile) as Promise); + if (!getBuildConfig) { + console.warn( + "getBuildConfig is undefined. It's recommended for optimization and sometimes required.", + ); + return {}; + } + + const unstable_collectClientModules = async ( + input: string, + ): Promise => { + const idSet = new Set(); + const readable = await renderRSC({ + input, + method: 'GET', + config, + context: null, + moduleIdCallback: (id) => idSet.add(id), + isDev: false, + }); + await new Promise((resolve, reject) => { + const writable = new WritableStream({ + close() { + resolve(); + }, + abort(reason) { + reject(reason); + }, + }); + readable.pipeTo(writable); + }); + return Array.from(idSet); + }; + + const output = await getBuildConfig(unstable_collectClientModules); + return output; +} diff --git a/packages/waku/src/lib/utils/form.ts b/packages/waku/src/lib/utils/form.ts new file mode 100644 index 000000000..a33c9214a --- /dev/null +++ b/packages/waku/src/lib/utils/form.ts @@ -0,0 +1,33 @@ +// TODO is this correct? better to use a library? +export const parseFormData = (body: string, contentType: string) => { + const boundary = contentType.split('boundary=')[1]; + const parts = body.split(`--${boundary}`); + const formData = new FormData(); + for (const part of parts) { + if (part.trim() === '' || part === '--') continue; + const [rawHeaders, content] = part.split('\r\n\r\n', 2); + const headers = rawHeaders!.split('\r\n').reduce( + (acc, currentHeader) => { + const [key, value] = currentHeader.split(': '); + acc[key!.toLowerCase()] = value!; + return acc; + }, + {} as Record, + ); + const contentDisposition = headers['content-disposition']; + const nameMatch = /name="([^"]+)"/.exec(contentDisposition!); + const filenameMatch = /filename="([^"]+)"/.exec(contentDisposition!); + if (nameMatch) { + const name = nameMatch[1]; + if (filenameMatch) { + const filename = filenameMatch[1]; + const type = headers['content-type'] || 'application/octet-stream'; + const blob = new Blob([content!], { type }); + formData.append(name!, blob, filename); + } else { + formData.append(name!, content!.trim()); + } + } + } + return formData; +}; From b23275b0c549621e28b7dbfc6446aad468c20fd9 Mon Sep 17 00:00:00 2001 From: daishi Date: Mon, 4 Dec 2023 15:58:44 +0900 Subject: [PATCH 2/7] wip: use new renderRSC --- .../src/lib/middleware/rsc/worker-impl.ts | 286 ++---------------- 1 file changed, 24 insertions(+), 262 deletions(-) diff --git a/packages/waku/src/lib/middleware/rsc/worker-impl.ts b/packages/waku/src/lib/middleware/rsc/worker-impl.ts index d7a48b3c2..e4fd16def 100644 --- a/packages/waku/src/lib/middleware/rsc/worker-impl.ts +++ b/packages/waku/src/lib/middleware/rsc/worker-impl.ts @@ -1,35 +1,15 @@ -import path from 'node:path'; // TODO no node dependency import url from 'node:url'; // TODO no node dependency import { parentPort } from 'node:worker_threads'; // TODO no node dependency -import type { ReactNode } from 'react'; import type { ViteDevServer } from 'vite'; -import { defineEntries } from '../../../server.js'; -import type { RenderContext } from '../../../server.js'; -import type { ResolvedConfig } from '../../../config.js'; import { viteInlineConfig } from '../../config.js'; -import { normalizePath } from '../../utils/path.js'; import { hasStatusCode, deepFreeze } from './utils.js'; -import { parseFormData } from '../../utils/form.js'; import type { MessageReq, MessageRes, RenderRequest } from './worker-api.js'; +import { renderRSC, getBuildConfigRSC } from '../../rsc/renderer.js'; let nodeLoaderRegistered = false; -const loadRSDWServer = async ( - config: Omit, - command: 'dev' | 'build' | 'start', -) => { - if (command !== 'dev') { - return ( - await import( - url - .pathToFileURL( - path.join(config.rootDir, config.distDir, 'rsdw-server.js'), - ) - .toString() - ) - ).default; - } +const registerNodeLoader = async () => { if (!nodeLoaderRegistered) { nodeLoaderRegistered = true; const IS_NODE_20 = Number(process.versions.node.split('.')[0]) >= 20; @@ -40,18 +20,17 @@ const loadRSDWServer = async ( register('waku/node-loader', url.pathToFileURL('./')); } } - return import('react-server-dom-webpack/server.edge'); }; -type Entries = { - default: ReturnType; -}; const controllerMap = new Map(); const handleRender = async (mesg: MessageReq & { type: 'render' }) => { // eslint-disable-next-line @typescript-eslint/no-unused-vars const { id, type, hasModuleIdCallback, ...rest } = mesg; const rr: RenderRequest = rest; + if (rr.command === 'dev') { + await registerNodeLoader(); + } try { const stream = new ReadableStream({ start(controller) { @@ -65,7 +44,23 @@ const handleRender = async (mesg: MessageReq & { type: 'render' }) => { parentPort!.postMessage(mesg); }; } - const readable = await renderRSC(rr); + const readable = await renderRSC({ + config: rr.config, + input: rr.input, + method: rr.method, + context: rr.context, + body: rr.stream, + contentType: rr.headers['content-type'] as string, + ...(rr.moduleIdCallback ? { moduleIdCallback: rr.moduleIdCallback } : {}), + ...(rr.command === 'dev' + ? { + isDev: true, + customImport: loadServerFile, + } + : { + isDev: false, + }), + }); const mesg: MessageRes = { id, type: 'start', context: rr.context }; parentPort!.postMessage(mesg); deepFreeze(rr.context); @@ -103,7 +98,7 @@ const handleGetBuildConfig = async ( ) => { const { id, config } = mesg; try { - const output = await getBuildConfigRSC(config); + const output = await getBuildConfigRSC({ config }); const mesg: MessageRes = { id, type: 'buildConfig', output }; parentPort!.postMessage(mesg); } catch (err) { @@ -166,13 +161,7 @@ const shutdown = async () => { parentPort!.close(); }; -const loadServerFile = async ( - fname: string, - command: 'dev' | 'build' | 'start', -) => { - if (command !== 'dev') { - return import(fname); - } +const loadServerFile = async (fname: string) => { const vite = await getViteServer(); return vite.ssrLoadModule(fname); }; @@ -197,230 +186,3 @@ parentPort!.on('message', (mesg: MessageReq) => { controller.error(err); } }); - -const getEntriesFile = ( - config: Omit, - command: 'dev' | 'build' | 'start', -) => { - const filePath = path.join( - config.rootDir, - command === 'dev' ? config.srcDir : config.distDir, - config.entriesJs, - ); - return normalizePath( - command === 'dev' ? filePath : url.pathToFileURL(filePath).toString(), - ); -}; - -const resolveClientEntry = ( - filePath: string, - config: Omit, - command: 'dev' | 'build' | 'start', -) => { - filePath = filePath.startsWith('file:///') - ? url.fileURLToPath(filePath) - : filePath; - const root = path.join( - config.rootDir, - command === 'dev' ? config.srcDir : config.distDir, - ); - if (!filePath.startsWith(root)) { - if (command === 'dev') { - // HACK this relies on Vite's internal implementation detail. - return normalizePath( - config.basePath + '@fs/' + filePath.replace(/^\//, ''), - ); - } else { - throw new Error( - 'Resolving client module outside root is unsupported for now', - ); - } - } - return normalizePath(config.basePath + path.relative(root, filePath)); -}; - -// HACK Patching stream is very fragile. -const transformRsfId = (prefixToRemove: string) => { - const encoder = new TextEncoder(); - const decoder = new TextDecoder(); - let data = ''; - return new TransformStream({ - transform(chunk, controller) { - if (!(chunk instanceof Uint8Array)) { - throw new Error('Unknown chunk type'); - } - data += decoder.decode(chunk); - if (!data.endsWith('\n')) { - return; - } - const lines = data.split('\n'); - data = ''; - for (let i = 0; i < lines.length; ++i) { - const match = lines[i]!.match( - new RegExp( - `^([0-9]+):{"id":"(?:file:///?)?${prefixToRemove}(.*?)"(.*)$`, - ), - ); - if (match) { - lines[i] = `${match[1]}:{"id":"${match[2]}"${match[3]}`; - } - } - controller.enqueue(encoder.encode(lines.join('\n'))); - }, - }); -}; - -async function renderRSC(rr: RenderRequest): Promise { - const config = rr.config; - const { renderToReadableStream, decodeReply } = await loadRSDWServer( - config, - rr.command, - ); - - const entriesFile = getEntriesFile(config, rr.command); - const { - default: { renderEntries }, - } = await (loadServerFile(entriesFile, rr.command) as Promise); - - const rsfPrefix = - path.posix.join( - config.rootDir, - rr.command === 'dev' ? config.srcDir : config.distDir, - ) + '/'; - - const render = async (renderContext: RenderContext, input: string) => { - const elements = await renderEntries.call(renderContext, input); - if (elements === null) { - const err = new Error('No function component found'); - (err as any).statusCode = 404; // HACK our convention for NotFound - throw err; - } - if (Object.keys(elements).some((key) => key.startsWith('_'))) { - throw new Error('"_" prefix is reserved'); - } - return elements; - }; - - const bundlerConfig = new Proxy( - {}, - { - get(_target, encodedId: string) { - const [filePath, name] = encodedId.split('#') as [string, string]; - const id = resolveClientEntry(filePath, config, rr.command); - rr?.moduleIdCallback?.(id); - return { id, chunks: [id], name, async: true }; - }, - }, - ); - - if (rr.method === 'POST') { - const rsfId = decodeURIComponent(rr.input); - let args: unknown[] = []; - const contentType = rr.headers['content-type']; - let body = ''; - if (rr.stream) { - const decoder = new TextDecoder(); - const reader = rr.stream.getReader(); - let result: ReadableStreamReadResult; - do { - result = await reader.read(); - if (result.value) { - if (!(result.value instanceof Uint8Array)) { - throw new Error('Unexepected buffer type'); - } - body += decoder.decode(result.value); - } - } while (!result.done); - } - if ( - typeof contentType === 'string' && - contentType.startsWith('multipart/form-data') - ) { - // XXX This doesn't support streaming unlike busboy - const formData = parseFormData(body, contentType); - args = await decodeReply(formData); - } else if (body) { - args = await decodeReply(body); - } - const [fileId, name] = rsfId.split('#') as [string, string]; - const filePath = fileId.startsWith('/') ? fileId : rsfPrefix + fileId; - const fname = - rr.command === 'dev' ? filePath : url.pathToFileURL(filePath).toString(); - const mod = await loadServerFile(fname, rr.command); - const fn = mod[name] || mod; - let elements: Promise> = Promise.resolve({}); - let rendered = false; - const rerender = (input: string) => { - if (rendered) { - throw new Error('already rendered'); - } - const renderContext: RenderContext = { rerender, context: rr.context }; - elements = Promise.all([elements, render(renderContext, input)]).then( - ([oldElements, newElements]) => ({ ...oldElements, ...newElements }), - ); - }; - const renderContext: RenderContext = { rerender, context: rr.context }; - const data = await fn.apply(renderContext, args); - const resolvedElements = await elements; - rendered = true; - return renderToReadableStream( - { ...resolvedElements, _value: data }, - bundlerConfig, - ).pipeThrough(transformRsfId(rsfPrefix)); - } - - // rr.method === 'GET' - const renderContext: RenderContext = { - rerender: () => { - throw new Error('Cannot rerender'); - }, - context: rr.context, - }; - const elements = await render(renderContext, rr.input); - return renderToReadableStream(elements, bundlerConfig).pipeThrough( - transformRsfId(rsfPrefix), - ); -} - -async function getBuildConfigRSC(config: Omit) { - const entriesFile = getEntriesFile(config, 'build'); - const { - default: { getBuildConfig }, - } = await (loadServerFile(entriesFile, 'build') as Promise); - if (!getBuildConfig) { - console.warn( - "getBuildConfig is undefined. It's recommended for optimization and sometimes required.", - ); - return {}; - } - - const unstable_collectClientModules = async ( - input: string, - ): Promise => { - const idSet = new Set(); - const readable = await renderRSC({ - input, - method: 'GET', - headers: {}, - config, - command: 'build', - context: null, - moduleIdCallback: (id) => idSet.add(id), - }); - await new Promise((resolve, reject) => { - const writable = new WritableStream({ - close() { - resolve(); - }, - abort(reason) { - reject(reason); - }, - }); - readable.pipeTo(writable); - }); - return Array.from(idSet); - }; - - const output = await getBuildConfig(unstable_collectClientModules); - return output; -} From 3618f30a4605f38ea45803dba6ddb339c754e663 Mon Sep 17 00:00:00 2001 From: daishi Date: Mon, 4 Dec 2023 16:20:30 +0900 Subject: [PATCH 3/7] use worker only in dev --- packages/waku/src/lib/middleware/rsc.ts | 75 +++++++++++++------ .../src/lib/middleware/rsc/worker-impl.ts | 2 +- packages/waku/src/lib/rsc/renderer.ts | 2 +- 3 files changed, 53 insertions(+), 26 deletions(-) diff --git a/packages/waku/src/lib/middleware/rsc.ts b/packages/waku/src/lib/middleware/rsc.ts index 4669d2591..f430cab13 100644 --- a/packages/waku/src/lib/middleware/rsc.ts +++ b/packages/waku/src/lib/middleware/rsc.ts @@ -6,12 +6,13 @@ import type { Config } from '../../config.js'; import { resolveConfig } from '../config.js'; import { endStream } from '../utils/stream.js'; import { renderHtml } from './rsc/ssr.js'; -import { decodeInput, hasStatusCode } from './rsc/utils.js'; +import { decodeInput, hasStatusCode, deepFreeze } from './rsc/utils.js'; import { registerReloadCallback, registerImportCallback, - renderRSC, + renderRSC as renderRSCWorker, } from './rsc/worker-api.js'; +import { renderRSC } from '../rsc/renderer.js'; import { patchReactRefresh } from '../vite-plugin/patch-react-refresh.js'; import type { BaseReq, BaseRes, Middleware } from './types.js'; @@ -166,30 +167,56 @@ export function rsc< return; } } - if (pathStr.startsWith(basePrefix)) { - const { method, headers } = req; - if (method !== 'GET' && method !== 'POST') { - throw new Error(`Unsupported method '${method}'`); + if (command !== 'dev') { + if (pathStr.startsWith(basePrefix)) { + const { method, headers } = req; + if (method !== 'GET' && method !== 'POST') { + throw new Error(`Unsupported method '${method}'`); + } + try { + const input = decodeInput(pathStr.slice(basePrefix.length)); + const readable = await renderRSC({ + config, + input, + method, + context, + body: req.stream, + contentType: headers['content-type'] as string | undefined, + isDev: false, + }); + unstable_posthook?.(req, res, context as Context); + deepFreeze(context); + readable.pipeTo(res.stream); + } catch (e) { + handleError(e); + } + return; } - try { - const input = decodeInput(pathStr.slice(basePrefix.length)); - const [readable, nextCtx] = await renderRSC({ - input, - method, - headers, - config, - command, - context, - stream: req.stream, - }); - unstable_posthook?.(req, res, nextCtx as Context); - readable.pipeTo(res.stream); - } catch (e) { - handleError(e); + } else { + // command === 'dev' + if (pathStr.startsWith(basePrefix)) { + const { method, headers } = req; + if (method !== 'GET' && method !== 'POST') { + throw new Error(`Unsupported method '${method}'`); + } + try { + const input = decodeInput(pathStr.slice(basePrefix.length)); + const [readable, nextCtx] = await renderRSCWorker({ + input, + method, + headers, + config, + command, + context, + stream: req.stream, + }); + unstable_posthook?.(req, res, nextCtx as Context); + readable.pipeTo(res.stream); + } catch (e) { + handleError(e); + } + return; } - return; - } - if (command === 'dev') { const vite = await getViteServer(); // TODO Do we still need this? // HACK re-export "?v=..." URL to avoid dual module hazard. diff --git a/packages/waku/src/lib/middleware/rsc/worker-impl.ts b/packages/waku/src/lib/middleware/rsc/worker-impl.ts index e4fd16def..61f35211b 100644 --- a/packages/waku/src/lib/middleware/rsc/worker-impl.ts +++ b/packages/waku/src/lib/middleware/rsc/worker-impl.ts @@ -50,7 +50,7 @@ const handleRender = async (mesg: MessageReq & { type: 'render' }) => { method: rr.method, context: rr.context, body: rr.stream, - contentType: rr.headers['content-type'] as string, + contentType: rr.headers['content-type'] as string | undefined, ...(rr.moduleIdCallback ? { moduleIdCallback: rr.moduleIdCallback } : {}), ...(rr.command === 'dev' ? { diff --git a/packages/waku/src/lib/rsc/renderer.ts b/packages/waku/src/lib/rsc/renderer.ts index 59492c5bf..4862f6f2a 100644 --- a/packages/waku/src/lib/rsc/renderer.ts +++ b/packages/waku/src/lib/rsc/renderer.ts @@ -109,7 +109,7 @@ export async function renderRSC( method: 'GET' | 'POST'; context: unknown; body?: ReadableStream; - contentType?: string; + contentType?: string | undefined; moduleIdCallback?: (id: string) => void; } & ( | { isDev: false } From b7106c7a8236129f9d16696854aaef5efdf6c959 Mon Sep 17 00:00:00 2001 From: daishi Date: Mon, 4 Dec 2023 16:42:30 +0900 Subject: [PATCH 4/7] do not use worker unless dev --- packages/waku/src/lib/builder.ts | 14 +++----- packages/waku/src/lib/middleware/rsc/ssr.ts | 33 +++++++++++++------ .../waku/src/lib/middleware/rsc/worker-api.ts | 6 ++-- .../src/lib/middleware/rsc/worker-impl.ts | 1 + 4 files changed, 32 insertions(+), 22 deletions(-) diff --git a/packages/waku/src/lib/builder.ts b/packages/waku/src/lib/builder.ts index 32fe69cf3..2833e6b48 100644 --- a/packages/waku/src/lib/builder.ts +++ b/packages/waku/src/lib/builder.ts @@ -21,11 +21,7 @@ import type { Config, ResolvedConfig } from '../config.js'; import { resolveConfig, viteInlineConfig } from './config.js'; import { normalizePath } from './utils/path.js'; import { encodeInput, generatePrefetchCode } from './middleware/rsc/utils.js'; -import { - shutdown as shutdownRsc, - renderRSC, - getBuildConfigRSC, -} from './middleware/rsc/worker-api.js'; +import { renderRSC, getBuildConfigRSC } from './rsc/renderer.js'; import { rscIndexPlugin } from './vite-plugin/rsc-index-plugin.js'; import { rscAnalyzePlugin } from './vite-plugin/rsc-analyze-plugin.js'; import { rscTransformPlugin } from './vite-plugin/rsc-transform-plugin.js'; @@ -233,7 +229,7 @@ const buildClientBundle = async ( }; const emitRscFiles = async (config: ResolvedConfig) => { - const buildConfig = await getBuildConfigRSC(config); + const buildConfig = await getBuildConfigRSC({ config }); const clientModuleMap = new Map>(); const addClientModule = (input: string, id: string) => { let idSet = clientModuleMap.get(input); @@ -261,14 +257,13 @@ const emitRscFiles = async (config: ResolvedConfig) => { if (!rscFileSet.has(destFile)) { rscFileSet.add(destFile); await fsPromises.mkdir(path.dirname(destFile), { recursive: true }); - const [readable] = await renderRSC({ + const readable = await renderRSC({ input, method: 'GET', - headers: {}, config, - command: 'build', context, moduleIdCallback: (id) => addClientModule(input, id), + isDev: false, }); await pipeline( Readable.fromWeb(readable as any), @@ -493,5 +488,4 @@ export async function build(options: { config: Config; ssr?: boolean }) { emitVercelOutput(config, clientBuildOutput, rscFiles, htmlFiles); await shutdownSsr(); - await shutdownRsc(); } diff --git a/packages/waku/src/lib/middleware/rsc/ssr.ts b/packages/waku/src/lib/middleware/rsc/ssr.ts index 920147fc6..885dab826 100644 --- a/packages/waku/src/lib/middleware/rsc/ssr.ts +++ b/packages/waku/src/lib/middleware/rsc/ssr.ts @@ -9,8 +9,9 @@ import { viteInlineConfig } from '../../config.js'; import { defineEntries } from '../../../server.js'; import { concatUint8Arrays } from '../../utils/stream.js'; import { normalizePath } from '../../utils/path.js'; -import { renderRSC } from './worker-api.js'; -import { hasStatusCode } from './utils.js'; +import { renderRSC as renderRSCWorker } from './worker-api.js'; +import { renderRSC } from '../../rsc/renderer.js'; +import { hasStatusCode, deepFreeze } from './utils.js'; const loadReact = async ( config: ResolvedConfig, @@ -357,14 +358,26 @@ export const renderHtml = async ( let stream: ReadableStream; let nextCtx: Context; try { - [stream, nextCtx] = await renderRSC({ - input: ssrConfig.input, - method: 'GET', - headers: {}, - config, - command, - context, - }); + if (command !== 'dev') { + stream = await renderRSC({ + config, + input: ssrConfig.input, + method: 'GET', + context, + isDev: false, + }); + deepFreeze(context); + nextCtx = context; + } else { + [stream, nextCtx] = await renderRSCWorker({ + input: ssrConfig.input, + method: 'GET', + headers: {}, + config, + command, + context, + }); + } } catch (e) { if (hasStatusCode(e) && e.statusCode === 404) { return null; diff --git a/packages/waku/src/lib/middleware/rsc/worker-api.ts b/packages/waku/src/lib/middleware/rsc/worker-api.ts index f778b33fd..87621a49b 100644 --- a/packages/waku/src/lib/middleware/rsc/worker-api.ts +++ b/packages/waku/src/lib/middleware/rsc/worker-api.ts @@ -8,7 +8,7 @@ export type RenderRequest = { method: 'GET' | 'POST'; headers: Record; config: Omit; - command: 'dev' | 'build' | 'start'; + command: 'dev'; // DEV only context: unknown; stream?: ReadableStream; moduleIdCallback?: (id: string) => void; @@ -20,7 +20,7 @@ export type BuildOutput = { }; export type MessageReq = - | { type: 'shutdown' } + | { type: 'shutdown' } // TODO unused | ({ id: number; type: 'render'; @@ -101,6 +101,7 @@ export function registerImportCallback(fn: (source: string) => void) { return () => worker.off('message', listener); } +// TODO unused export function shutdown(): Promise { const worker = lastWorker; if (!worker) { @@ -213,6 +214,7 @@ export function renderRSC( }); } +// TODO unused export function getBuildConfigRSC( config: ResolvedConfig, ): ReturnType { diff --git a/packages/waku/src/lib/middleware/rsc/worker-impl.ts b/packages/waku/src/lib/middleware/rsc/worker-impl.ts index 61f35211b..d597b87ee 100644 --- a/packages/waku/src/lib/middleware/rsc/worker-impl.ts +++ b/packages/waku/src/lib/middleware/rsc/worker-impl.ts @@ -153,6 +153,7 @@ const getViteServer = async () => { return viteServer; }; +// TODO unused const shutdown = async () => { if (lastViteServer) { await lastViteServer.close(); From 0293ec0635c9b4ddb0f30652b7431bbb90b9f3a7 Mon Sep 17 00:00:00 2001 From: daishi Date: Mon, 4 Dec 2023 16:55:13 +0900 Subject: [PATCH 5/7] dynamic import Worker --- .../waku/src/lib/middleware/rsc/worker-api.ts | 77 ++++++++++--------- 1 file changed, 42 insertions(+), 35 deletions(-) diff --git a/packages/waku/src/lib/middleware/rsc/worker-api.ts b/packages/waku/src/lib/middleware/rsc/worker-api.ts index 87621a49b..46a9811ba 100644 --- a/packages/waku/src/lib/middleware/rsc/worker-api.ts +++ b/packages/waku/src/lib/middleware/rsc/worker-api.ts @@ -1,4 +1,4 @@ -import { Worker } from 'node:worker_threads'; // TODO no node dependency +import type { Worker as WorkerOrig } from 'node:worker_threads'; import type { ResolvedConfig } from '../../../config.js'; import type { GetBuildConfig } from '../../../server.js'; @@ -48,7 +48,7 @@ export type MessageRes = const messageCallbacks = new Map void>(); let lastCommand: 'dev' | 'build' | 'start' | undefined; -let lastWorker: Worker | undefined; +let lastWorker: Promise | undefined; const getWorker = (command: 'dev' | 'build' | 'start') => { if (lastWorker) { if (lastCommand !== command) { @@ -56,31 +56,36 @@ const getWorker = (command: 'dev' | 'build' | 'start') => { } return lastWorker; } - const IS_NODE_18 = Number(process.versions.node.split('.')[0]) < 20; - const worker = new Worker(new URL('worker-impl.js', import.meta.url), { - execArgv: - command !== 'dev' - ? [] - : [ - ...(IS_NODE_18 - ? ['--experimental-loader', 'waku/node-loader'] - : []), - '--conditions', - 'react-server', - ], - }); - worker.on('message', (mesg: MessageRes) => { - if ('id' in mesg) { - messageCallbacks.get(mesg.id)?.(mesg); - } - }); lastCommand = command; - lastWorker = worker; - return worker; + return (lastWorker = new Promise((resolve) => { + import('node:worker_threads').then(({ Worker }) => { + const IS_NODE_18 = Number(process.versions.node.split('.')[0]) < 20; + const worker = new Worker(new URL('worker-impl.js', import.meta.url), { + execArgv: + command !== 'dev' + ? [] + : [ + ...(IS_NODE_18 + ? ['--experimental-loader', 'waku/node-loader'] + : []), + '--conditions', + 'react-server', + ], + }); + worker.on('message', (mesg: MessageRes) => { + if ('id' in mesg) { + messageCallbacks.get(mesg.id)?.(mesg); + } + }); + resolve(worker); + }); + })); }; -export function registerReloadCallback(fn: (type: 'full-reload') => void) { - const worker = getWorker('dev'); +export async function registerReloadCallback( + fn: (type: 'full-reload') => void, +) { + const worker = await getWorker('dev'); const listener = (mesg: MessageRes) => { if (mesg.type === 'full-reload') { fn(mesg.type); @@ -90,8 +95,8 @@ export function registerReloadCallback(fn: (type: 'full-reload') => void) { return () => worker.off('message', listener); } -export function registerImportCallback(fn: (source: string) => void) { - const worker = getWorker('dev'); +export async function registerImportCallback(fn: (source: string) => void) { + const worker = await getWorker('dev'); const listener = (mesg: MessageRes) => { if (mesg.type === 'hot-import') { fn(mesg.source); @@ -103,23 +108,25 @@ export function registerImportCallback(fn: (source: string) => void) { // TODO unused export function shutdown(): Promise { - const worker = lastWorker; - if (!worker) { + const workerPromise = lastWorker; + if (!workerPromise) { throw new Error('No worker to shutdown'); } return new Promise((resolve) => { - worker.on('close', resolve); - const mesg: MessageReq = { type: 'shutdown' }; - worker.postMessage(mesg); + workerPromise.then((worker) => { + worker.on('close', resolve); + const mesg: MessageReq = { type: 'shutdown' }; + worker.postMessage(mesg); + }); }); } let nextId = 1; -export function renderRSC( +export async function renderRSC( rr: RenderRequest, ): Promise { - const worker = getWorker(rr.command); + const worker = await getWorker(rr.command); const id = nextId++; const pipe = async () => { if (rr.stream) { @@ -215,12 +222,12 @@ export function renderRSC( } // TODO unused -export function getBuildConfigRSC( +export async function getBuildConfigRSC( config: ResolvedConfig, ): ReturnType { // eslint-disable-next-line @typescript-eslint/no-unused-vars const { ssr: _removed, ...copiedConfig } = config; - const worker = getWorker('build'); + const worker = await getWorker('build'); return new Promise((resolve, reject) => { const id = nextId++; messageCallbacks.set(id, (mesg) => { From b8c6a5565281b61df11f59a46926227b06531a19 Mon Sep 17 00:00:00 2001 From: daishi Date: Mon, 4 Dec 2023 17:05:36 +0900 Subject: [PATCH 6/7] make worker dev only --- .../waku/src/lib/middleware/rsc/worker-api.ts | 82 +++--------------- .../src/lib/middleware/rsc/worker-impl.ts | 84 ++++--------------- 2 files changed, 27 insertions(+), 139 deletions(-) diff --git a/packages/waku/src/lib/middleware/rsc/worker-api.ts b/packages/waku/src/lib/middleware/rsc/worker-api.ts index 46a9811ba..9fc6dfdea 100644 --- a/packages/waku/src/lib/middleware/rsc/worker-api.ts +++ b/packages/waku/src/lib/middleware/rsc/worker-api.ts @@ -1,7 +1,6 @@ import type { Worker as WorkerOrig } from 'node:worker_threads'; import type { ResolvedConfig } from '../../../config.js'; -import type { GetBuildConfig } from '../../../server.js'; export type RenderRequest = { input: string; @@ -20,7 +19,6 @@ export type BuildOutput = { }; export type MessageReq = - | { type: 'shutdown' } // TODO unused | ({ id: number; type: 'render'; @@ -28,8 +26,7 @@ export type MessageReq = } & Omit) | { id: number; type: 'buf'; buf: ArrayBuffer; offset: number; len: number } | { id: number; type: 'end' } - | { id: number; type: 'err'; err: unknown } - | { id: number; type: 'getBuildConfig'; config: Omit }; + | { id: number; type: 'err'; err: unknown }; export type MessageRes = | { type: 'full-reload' } @@ -38,39 +35,24 @@ export type MessageRes = | { id: number; type: 'buf'; buf: ArrayBuffer; offset: number; len: number } | { id: number; type: 'end' } | { id: number; type: 'err'; err: unknown; statusCode?: number } - | { id: number; type: 'moduleId'; moduleId: string } - | { - id: number; - type: 'buildConfig'; - output: Awaited>; - }; + | { id: number; type: 'moduleId'; moduleId: string }; const messageCallbacks = new Map void>(); -let lastCommand: 'dev' | 'build' | 'start' | undefined; let lastWorker: Promise | undefined; -const getWorker = (command: 'dev' | 'build' | 'start') => { +const getWorker = () => { if (lastWorker) { - if (lastCommand !== command) { - throw new Error('cannot create worker with different command'); - } return lastWorker; } - lastCommand = command; return (lastWorker = new Promise((resolve) => { import('node:worker_threads').then(({ Worker }) => { const IS_NODE_18 = Number(process.versions.node.split('.')[0]) < 20; const worker = new Worker(new URL('worker-impl.js', import.meta.url), { - execArgv: - command !== 'dev' - ? [] - : [ - ...(IS_NODE_18 - ? ['--experimental-loader', 'waku/node-loader'] - : []), - '--conditions', - 'react-server', - ], + execArgv: [ + ...(IS_NODE_18 ? ['--experimental-loader', 'waku/node-loader'] : []), + '--conditions', + 'react-server', + ], }); worker.on('message', (mesg: MessageRes) => { if ('id' in mesg) { @@ -85,7 +67,7 @@ const getWorker = (command: 'dev' | 'build' | 'start') => { export async function registerReloadCallback( fn: (type: 'full-reload') => void, ) { - const worker = await getWorker('dev'); + const worker = await getWorker(); const listener = (mesg: MessageRes) => { if (mesg.type === 'full-reload') { fn(mesg.type); @@ -96,7 +78,7 @@ export async function registerReloadCallback( } export async function registerImportCallback(fn: (source: string) => void) { - const worker = await getWorker('dev'); + const worker = await getWorker(); const listener = (mesg: MessageRes) => { if (mesg.type === 'hot-import') { fn(mesg.source); @@ -106,27 +88,12 @@ export async function registerImportCallback(fn: (source: string) => void) { return () => worker.off('message', listener); } -// TODO unused -export function shutdown(): Promise { - const workerPromise = lastWorker; - if (!workerPromise) { - throw new Error('No worker to shutdown'); - } - return new Promise((resolve) => { - workerPromise.then((worker) => { - worker.on('close', resolve); - const mesg: MessageReq = { type: 'shutdown' }; - worker.postMessage(mesg); - }); - }); -} - let nextId = 1; export async function renderRSC( rr: RenderRequest, ): Promise { - const worker = await getWorker(rr.command); + const worker = await getWorker(); const id = nextId++; const pipe = async () => { if (rr.stream) { @@ -220,30 +187,3 @@ export async function renderRSC( pipe(); }); } - -// TODO unused -export async function getBuildConfigRSC( - config: ResolvedConfig, -): ReturnType { - // eslint-disable-next-line @typescript-eslint/no-unused-vars - const { ssr: _removed, ...copiedConfig } = config; - const worker = await getWorker('build'); - return new Promise((resolve, reject) => { - const id = nextId++; - messageCallbacks.set(id, (mesg) => { - if (mesg.type === 'buildConfig') { - resolve(mesg.output); - messageCallbacks.delete(id); - } else if (mesg.type === 'err') { - reject(mesg.err); - messageCallbacks.delete(id); - } - }); - const mesg: MessageReq = { - id, - type: 'getBuildConfig', - config: copiedConfig, - }; - worker.postMessage(mesg); - }); -} diff --git a/packages/waku/src/lib/middleware/rsc/worker-impl.ts b/packages/waku/src/lib/middleware/rsc/worker-impl.ts index d597b87ee..a9d28f7ea 100644 --- a/packages/waku/src/lib/middleware/rsc/worker-impl.ts +++ b/packages/waku/src/lib/middleware/rsc/worker-impl.ts @@ -1,36 +1,28 @@ -import url from 'node:url'; // TODO no node dependency -import { parentPort } from 'node:worker_threads'; // TODO no node dependency - +import url from 'node:url'; +import { parentPort } from 'node:worker_threads'; +import { register } from 'node:module'; +import { Server } from 'node:http'; +import { createServer as viteCreateServer } from 'vite'; import type { ViteDevServer } from 'vite'; import { viteInlineConfig } from '../../config.js'; import { hasStatusCode, deepFreeze } from './utils.js'; import type { MessageReq, MessageRes, RenderRequest } from './worker-api.js'; -import { renderRSC, getBuildConfigRSC } from '../../rsc/renderer.js'; - -let nodeLoaderRegistered = false; -const registerNodeLoader = async () => { - if (!nodeLoaderRegistered) { - nodeLoaderRegistered = true; - const IS_NODE_20 = Number(process.versions.node.split('.')[0]) >= 20; - if (IS_NODE_20) { - const { - default: { register }, - } = await import('node:module'); - register('waku/node-loader', url.pathToFileURL('./')); - } - } -}; +import { renderRSC } from '../../rsc/renderer.js'; +import { rscTransformPlugin } from '../../vite-plugin/rsc-transform-plugin.js'; +import { rscReloadPlugin } from '../../vite-plugin/rsc-reload-plugin.js'; +import { rscDelegatePlugin } from '../../vite-plugin/rsc-delegate-plugin.js'; +const IS_NODE_20 = Number(process.versions.node.split('.')[0]) >= 20; +if (IS_NODE_20) { + register('waku/node-loader', url.pathToFileURL('./')); +} const controllerMap = new Map(); const handleRender = async (mesg: MessageReq & { type: 'render' }) => { // eslint-disable-next-line @typescript-eslint/no-unused-vars const { id, type, hasModuleIdCallback, ...rest } = mesg; const rr: RenderRequest = rest; - if (rr.command === 'dev') { - await registerNodeLoader(); - } try { const stream = new ReadableStream({ start(controller) { @@ -52,14 +44,8 @@ const handleRender = async (mesg: MessageReq & { type: 'render' }) => { body: rr.stream, contentType: rr.headers['content-type'] as string | undefined, ...(rr.moduleIdCallback ? { moduleIdCallback: rr.moduleIdCallback } : {}), - ...(rr.command === 'dev' - ? { - isDev: true, - customImport: loadServerFile, - } - : { - isDev: false, - }), + isDev: true, + customImport: loadServerFile, }); const mesg: MessageRes = { id, type: 'start', context: rr.context }; parentPort!.postMessage(mesg); @@ -93,37 +79,12 @@ const handleRender = async (mesg: MessageReq & { type: 'render' }) => { } }; -const handleGetBuildConfig = async ( - mesg: MessageReq & { type: 'getBuildConfig' }, -) => { - const { id, config } = mesg; - try { - const output = await getBuildConfigRSC({ config }); - const mesg: MessageRes = { id, type: 'buildConfig', output }; - parentPort!.postMessage(mesg); - } catch (err) { - const mesg: MessageRes = { id, type: 'err', err }; - parentPort!.postMessage(mesg); - } -}; - let lastViteServer: ViteDevServer | undefined; const getViteServer = async () => { if (lastViteServer) { return lastViteServer; } - const { Server } = await import('node:http'); const dummyServer = new Server(); // FIXME we hope to avoid this hack - const { createServer: viteCreateServer } = await import('vite'); - const { rscTransformPlugin } = await import( - '../../vite-plugin/rsc-transform-plugin.js' - ); - const { rscReloadPlugin } = await import( - '../../vite-plugin/rsc-reload-plugin.js' - ); - const { rscDelegatePlugin } = await import( - '../../vite-plugin/rsc-delegate-plugin.js' - ); const viteServer = await viteCreateServer({ ...(await viteInlineConfig()), plugins: [ @@ -153,27 +114,14 @@ const getViteServer = async () => { return viteServer; }; -// TODO unused -const shutdown = async () => { - if (lastViteServer) { - await lastViteServer.close(); - lastViteServer = undefined; - } - parentPort!.close(); -}; - const loadServerFile = async (fname: string) => { const vite = await getViteServer(); return vite.ssrLoadModule(fname); }; parentPort!.on('message', (mesg: MessageReq) => { - if (mesg.type === 'shutdown') { - shutdown(); - } else if (mesg.type === 'render') { + if (mesg.type === 'render') { handleRender(mesg); - } else if (mesg.type === 'getBuildConfig') { - handleGetBuildConfig(mesg); } else if (mesg.type === 'buf') { const controller = controllerMap.get(mesg.id)!; controller.enqueue(new Uint8Array(mesg.buf, mesg.offset, mesg.len)); From 712ff6465c81f840036ef7992186554a34872b6a Mon Sep 17 00:00:00 2001 From: daishi Date: Mon, 4 Dec 2023 18:50:29 +0900 Subject: [PATCH 7/7] revert register import --- packages/waku/src/lib/middleware/rsc/worker-impl.ts | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/packages/waku/src/lib/middleware/rsc/worker-impl.ts b/packages/waku/src/lib/middleware/rsc/worker-impl.ts index a9d28f7ea..cf44d6e5b 100644 --- a/packages/waku/src/lib/middleware/rsc/worker-impl.ts +++ b/packages/waku/src/lib/middleware/rsc/worker-impl.ts @@ -1,6 +1,7 @@ +// This file can depend on Node.js + import url from 'node:url'; import { parentPort } from 'node:worker_threads'; -import { register } from 'node:module'; import { Server } from 'node:http'; import { createServer as viteCreateServer } from 'vite'; import type { ViteDevServer } from 'vite'; @@ -15,6 +16,9 @@ import { rscDelegatePlugin } from '../../vite-plugin/rsc-delegate-plugin.js'; const IS_NODE_20 = Number(process.versions.node.split('.')[0]) >= 20; if (IS_NODE_20) { + const { + default: { register }, + } = await import('node:module'); register('waku/node-loader', url.pathToFileURL('./')); } const controllerMap = new Map();