Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RSC: Implement RSC worker #9331

Merged
merged 5 commits into from
Oct 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 2 additions & 5 deletions packages/cli/src/commands/serveBothHandler.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,7 @@ export const bothExperimentalServerFileHandler = async () => {

await execa(
'node',
[
'--conditions react-server',
'./node_modules/@redwoodjs/vite/dist/runRscFeServer.js',
],
['./node_modules/@redwoodjs/vite/dist/runRscFeServer.js'],
{
cwd: getPaths().base,
stdio: 'inherit',
Expand Down Expand Up @@ -64,9 +61,9 @@ export const bothRscServerHandler = async (argv) => {
const fePromise = execa(
'node',
[
// TODO (RSC): Do we need these on the worker thread?
'--experimental-loader @redwoodjs/vite/node-loader',
'--experimental-loader @redwoodjs/vite/react-node-loader',
'--conditions react-server',
'./node_modules/@redwoodjs/vite/dist/runRscFeServer.js',
],
{
Expand Down
31 changes: 29 additions & 2 deletions packages/vite/src/rsc/rscRequestHandler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@ import type { Request, Response } from 'express'
import RSDWServer from 'react-server-dom-webpack/server.node.unbundled'

import { hasStatusCode } from '../lib/StatusError'
import { renderRSC } from '../waku-lib/rsc-handler-worker'

import { renderRsc } from './rscWorkerCommunication'

const { decodeReply, decodeReplyFromBusboy } = RSDWServer

Expand Down Expand Up @@ -46,6 +47,32 @@ export function createRscRequestHandler() {

req.pipe(bb)
args = await reply

// TODO (RSC): Loop over args (to not only look at args[0])
// TODO (RSC): Verify that this works with node16 (MDN says FormData is
// only supported in node18 and up)
if (args[0] instanceof FormData) {
const serializedFormData: Record<string, any> = {}

for (const [key, value] of args[0]) {
// Several form fields can share the same name. This should be
// represented as an array of the values of all those fields
if (serializedFormData[key] !== undefined) {
if (!Array.isArray(serializedFormData[key])) {
serializedFormData[key] = [serializedFormData[key]]
}

serializedFormData[key].push(value)
} else {
serializedFormData[key] = value
}
}

args[0] = {
__formData__: true,
state: serializedFormData,
}
}
} else {
let body = ''

Expand Down Expand Up @@ -82,7 +109,7 @@ export function createRscRequestHandler() {
}

try {
const pipeable = await renderRSC({ rscId, props, rsfId, args })
const pipeable = await renderRsc({ rscId, props, rsfId, args })
// TODO (RSC): See if we can/need to do more error handling here
// pipeable.on(handleError)
pipeable.pipe(res)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
// TODO (RSC) Take ownership of this file and move it out ouf the waku-lib folder
// import fs from 'node:fs'
// This is a dedicated worker for RSCs.
// It's needed because the main process can't be loaded with
// `--condition react-server`. If we did try to do that the main process
// couldn't do SSR because it would be missing client-side React functions
// like `useState` and `createContext`.
import path from 'node:path'
import type { Writable } from 'node:stream'
import { Writable } from 'node:stream'
import { parentPort } from 'node:worker_threads'

import { createElement } from 'react'
Expand All @@ -13,89 +16,98 @@ import { getPaths } from '@redwoodjs/project-config'

import type { defineEntries } from '../entries'
import { StatusError } from '../lib/StatusError'
// import type { unstable_GetCustomModules } from '../waku-server'
import { configFileConfig, resolveConfig } from '../waku-lib/config'
import { transformRsfId } from '../waku-lib/rsc-utils'
import {
rscTransformPlugin,
rscReloadPlugin,
} from '../waku-lib/vite-plugin-rsc'

import { configFileConfig, resolveConfig } from './config'
// import type { unstable_GetCustomModules } from '../waku-server'
import type {
RenderInput,
MessageRes,
MessageReq,
} from './rscWorkerCommunication'
// import type { RenderInput, MessageReq, MessageRes } from './rsc-handler'
import type { RenderInput, MessageRes } from './rsc-handler'
// import { transformRsfId, generatePrefetchCode } from './rsc-utils'
import { transformRsfId } from './rsc-utils'
import { rscTransformPlugin, rscReloadPlugin } from './vite-plugin-rsc'

const { renderToPipeableStream } = RSDWServer

type Entries = { default: ReturnType<typeof defineEntries> }
type PipeableStream = { pipe<T extends Writable>(destination: T): T }

// const handleSetClientEntries = async (
// mesg: MessageReq & { type: 'setClientEntries' }
// ) => {
// const { id, value } = mesg
// try {
// await setClientEntries(value)
const handleSetClientEntries = async ({
id,
value,
}: MessageReq & { type: 'setClientEntries' }) => {
try {
await setClientEntries(value)

// if (!parentPort) {
// throw new Error('parentPort is undefined')
// }
if (!parentPort) {
throw new Error('parentPort is undefined')
}

// const message: MessageRes = { id, type: 'end' }
// parentPort.postMessage(message)
// } catch (err) {
// if (!parentPort) {
// throw new Error('parentPort is undefined')
// }
const message: MessageRes = { id, type: 'end' }
parentPort.postMessage(message)
} catch (err) {
if (!parentPort) {
throw new Error('parentPort is undefined')
}

// const message: MessageRes = { id, type: 'err', err }
// parentPort.postMessage(message)
// }
// }
const message: MessageRes = { id, type: 'err', err }
parentPort.postMessage(message)
}
}

// const handleRender = async (message: MessageReq & { type: 'render' }) => {
// const { id, input } = message
const handleRender = async ({ id, input }: MessageReq & { type: 'render' }) => {
console.log('handleRender', id, input)

try {
const pipeable = await renderRsc(input)

const writable = new Writable({
write(chunk, encoding, callback) {
if (encoding !== ('buffer' as any)) {
throw new Error('Unknown encoding')
}

if (!parentPort) {
throw new Error('parentPort is undefined')
}

const buffer: Buffer = chunk
const message: MessageRes = {
id,
type: 'buf',
buf: buffer.buffer,
offset: buffer.byteOffset,
len: buffer.length,
}
parentPort.postMessage(message, [message.buf])
callback()
},
final(callback) {
if (!parentPort) {
throw new Error('parentPort is undefined')
}

const message: MessageRes = { id, type: 'end' }
parentPort.postMessage(message)
callback()
},
})

// try {
// const pipeable = await renderRSC(input)
// const writable = new Writable({
// write(chunk, encoding, callback) {
// if (encoding !== ('buffer' as any)) {
// throw new Error('Unknown encoding')
// }

// if (!parentPort) {
// throw new Error('parentPort is undefined')
// }

// const buffer: Buffer = chunk
// const msg: MessageRes = {
// id,
// type: 'buf',
// buf: buffer.buffer,
// offset: buffer.byteOffset,
// len: buffer.length,
// }
// parentPort.postMessage(msg, [msg.buf])
// callback()
// },
// final(callback) {
// if (!parentPort) {
// throw new Error('parentPort is undefined')
// }

// const mesg: MessageRes = { id, type: 'end' }
// parentPort.postMessage(mesg)
// callback()
// },
// })
// pipeable.pipe(writable)
// } catch (err) {
// if (!parentPort) {
// throw new Error('parentPort is undefined')
// }
pipeable.pipe(writable)
} catch (err) {
if (!parentPort) {
throw new Error('parentPort is undefined')
}

// const mesg: MessageRes = { id, type: 'err', err }
// parentPort.postMessage(mesg)
// }
// }
const message: MessageRes = { id, type: 'err', err }
parentPort.postMessage(message)
}
}

// const handleGetCustomModules = async (
// mesg: MessageReq & { type: 'getCustomModules' }
Expand Down Expand Up @@ -149,8 +161,8 @@ const vitePromise = createServer({
throw new Error('parentPort is undefined')
}

const mesg: MessageRes = { type }
parentPort.postMessage(mesg)
const message: MessageRes = { type }
parentPort.postMessage(message)
}),
],
resolve: {
Expand All @@ -159,38 +171,40 @@ const vitePromise = createServer({
appType: 'custom',
})

// const shutdown = async () => {
// const vite = await vitePromise
// await vite.close()
// if (!parentPort) {
// throw new Error('parentPort is undefined')
// }
const shutdown = async () => {
const vite = await vitePromise
await vite.close()
if (!parentPort) {
throw new Error('parentPort is undefined')
}

// parentPort.close()
// }
parentPort.close()
}

const loadServerFile = async (fname: string) => {
const vite = await vitePromise
return vite.ssrLoadModule(fname)
}

// if (!parentPort) {
// throw new Error('parentPort is undefined')
// }
if (!parentPort) {
throw new Error('parentPort is undefined')
}

// parentPort.on('message', (mesg: MessageReq) => {
// if (mesg.type === 'shutdown') {
// shutdown()
// } else if (mesg.type === 'setClientEntries') {
// handleSetClientEntries(mesg)
// } else if (mesg.type === 'render') {
// handleRender(mesg)
// } else if (mesg.type === 'getCustomModules') {
// handleGetCustomModules(mesg)
// } else if (mesg.type === 'build') {
// handleBuild(mesg)
// }
// })
parentPort.on('message', (message: MessageReq) => {
console.log('message', message)

if (message.type === 'shutdown') {
shutdown()
} else if (message.type === 'setClientEntries') {
handleSetClientEntries(message)
} else if (message.type === 'render') {
handleRender(message)
// } else if (message.type === 'getCustomModules') {
// handleGetCustomModules(message)
// } else if (message.type === 'build') {
// handleBuild(message)
}
})

const configPromise = resolveConfig('serve')

Expand Down Expand Up @@ -250,7 +264,7 @@ const resolveClientEntry = (
return clientEntry
}

export async function setClientEntries(
async function setClientEntries(
value: 'load' | Record<string, string>
): Promise<void> {
if (value !== 'load') {
Expand Down Expand Up @@ -283,7 +297,16 @@ export async function setClientEntries(
)
}

export async function renderRSC(input: RenderInput): Promise<PipeableStream> {
interface SerializedFormData {
__formData__: boolean
state: Record<string, string | string[]>
}

function isSerializedFormData(data?: unknown): data is SerializedFormData {
return !!data && (data as SerializedFormData)?.__formData__
}

async function renderRsc(input: RenderInput): Promise<PipeableStream> {
const config = await configPromise
const bundlerConfig = new Proxy(
{},
Expand All @@ -300,13 +323,31 @@ export async function renderRSC(input: RenderInput): Promise<PipeableStream> {
}
)

console.log('renderRSC input', input)
console.log('renderRsc input', input)

if (input.rsfId && input.args) {
const [fileId, name] = input.rsfId.split('#')
const fname = path.join(config.root, fileId)
const mod = await loadServerFile(fname)
const data = await (mod[name] || mod)(...input.args)
console.log('Server Action, fileId', fileId, 'name', name, 'fname', fname)
const module = await loadServerFile(fname)

if (isSerializedFormData(input.args[0])) {
const formData = new FormData()

Object.entries(input.args[0].state).forEach(([key, value]) => {
if (Array.isArray(value)) {
value.forEach((v) => {
formData.append(key, v)
})
} else {
formData.append(key, value)
}
})

input.args[0] = formData
}

const data = await (module[name] || module)(...input.args)
if (!input.rscId) {
return renderToPipeableStream(data, bundlerConfig)
}
Expand Down
Loading
Loading