Skip to content

Commit

Permalink
Support FormData in worker
Browse files Browse the repository at this point in the history
  • Loading branch information
Tobbe committed Oct 24, 2023
1 parent 9bca5d3 commit aa764bc
Show file tree
Hide file tree
Showing 7 changed files with 180 additions and 572 deletions.
7 changes: 2 additions & 5 deletions packages/cli/src/commands/serveBothHandler.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,7 @@ export const bothExperimentalServerFileHandler = async () => {

await execa(
'node',
[
'--conditions react-server',
'./node_modules/@redwoodjs/vite/dist/runRscFeServer.js',
],
['./node_modules/@redwoodjs/vite/dist/runRscFeServer.js'],
{
cwd: getPaths().base,
stdio: 'inherit',
Expand Down Expand Up @@ -64,9 +61,9 @@ export const bothRscServerHandler = async (argv) => {
const fePromise = execa(
'node',
[
// TODO (RSC): Do we need these on the worker thread?
'--experimental-loader @redwoodjs/vite/node-loader',
'--experimental-loader @redwoodjs/vite/react-node-loader',
'--conditions react-server',
'./node_modules/@redwoodjs/vite/dist/runRscFeServer.js',
],
{
Expand Down
32 changes: 30 additions & 2 deletions packages/vite/src/rsc/rscRequestHandler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@ import type { Request, Response } from 'express'
import RSDWServer from 'react-server-dom-webpack/server.node.unbundled'

import { hasStatusCode } from '../lib/StatusError'
import { renderRSC } from '../waku-lib/rsc-handler-worker'

import { renderRsc } from './rscWorkerCommunication'

const { decodeReply, decodeReplyFromBusboy } = RSDWServer

Expand Down Expand Up @@ -40,12 +41,39 @@ export function createRscRequestHandler() {
}

if (rsfId) {
console.log('headers', req.headers)
if (req.headers['content-type']?.startsWith('multipart/form-data')) {
const bb = busboy({ headers: req.headers })
const reply = decodeReplyFromBusboy(bb)

req.pipe(bb)
args = await reply

// TODO (RSC): Loop over args (to not only look at args[0])
// TODO (RSC): Verify that this works with node16 (MDN says FormData is
// only supported in node18 and up)
if (args[0] instanceof FormData) {
const serializedFormData: Record<string, any> = {}

for (const [key, value] of args[0]) {
// Several form fields can share the same name. This should be
// represented as an array of the values of all those fields
if (serializedFormData[key] !== undefined) {
if (!Array.isArray(serializedFormData[key])) {
serializedFormData[key] = [serializedFormData[key]]
}

serializedFormData[key].push(value)
} else {
serializedFormData[key] = value
}
}

args[0] = {
__formData__: true,
state: serializedFormData,
}
}
} else {
let body = ''

Expand Down Expand Up @@ -82,7 +110,7 @@ export function createRscRequestHandler() {
}

try {
const pipeable = await renderRSC({ rscId, props, rsfId, args })
const pipeable = await renderRsc({ rscId, props, rsfId, args })
// TODO (RSC): See if we can/need to do more error handling here
// pipeable.on(handleError)
pipeable.pipe(res)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
// TODO (RSC) Take ownership of this file and move it out ouf the waku-lib folder
// import fs from 'node:fs'
// This is a dedicated worker for RSCs.
// It's needed because the main process can't be loaded with
// `--condition react-server`. If we did try to do that the main process
// couldn't do SSR because it would be missing client-side React functions
// like `useState` and `createContext`.
import path from 'node:path'
import type { Writable } from 'node:stream'
import { Writable } from 'node:stream'
import { parentPort } from 'node:worker_threads'

import { createElement } from 'react'
Expand All @@ -21,7 +24,11 @@ import {
} from '../waku-lib/vite-plugin-rsc'

// import type { unstable_GetCustomModules } from '../waku-server'
import type { RenderInput, MessageRes } from './rsc-handler'
import type {
RenderInput,
MessageRes,
MessageReq,
} from './rscWorkerCommunication'
// import type { RenderInput, MessageReq, MessageRes } from './rsc-handler'
// import { transformRsfId, generatePrefetchCode } from './rsc-utils'

Expand All @@ -30,75 +37,77 @@ const { renderToPipeableStream } = RSDWServer
type Entries = { default: ReturnType<typeof defineEntries> }
type PipeableStream = { pipe<T extends Writable>(destination: T): T }

// const handleSetClientEntries = async (
// mesg: MessageReq & { type: 'setClientEntries' }
// ) => {
// const { id, value } = mesg
// try {
// await setClientEntries(value)
const handleSetClientEntries = async ({
id,
value,
}: MessageReq & { type: 'setClientEntries' }) => {
try {
await setClientEntries(value)

// if (!parentPort) {
// throw new Error('parentPort is undefined')
// }
if (!parentPort) {
throw new Error('parentPort is undefined')
}

// const message: MessageRes = { id, type: 'end' }
// parentPort.postMessage(message)
// } catch (err) {
// if (!parentPort) {
// throw new Error('parentPort is undefined')
// }
const message: MessageRes = { id, type: 'end' }
parentPort.postMessage(message)
} catch (err) {
if (!parentPort) {
throw new Error('parentPort is undefined')
}

// const message: MessageRes = { id, type: 'err', err }
// parentPort.postMessage(message)
// }
// }
const message: MessageRes = { id, type: 'err', err }
parentPort.postMessage(message)
}
}

// const handleRender = async (message: MessageReq & { type: 'render' }) => {
// const { id, input } = message
const handleRender = async ({ id, input }: MessageReq & { type: 'render' }) => {
console.log('handleRender', id, input)

try {
const pipeable = await renderRsc(input)

const writable = new Writable({
write(chunk, encoding, callback) {
if (encoding !== ('buffer' as any)) {
throw new Error('Unknown encoding')
}

if (!parentPort) {
throw new Error('parentPort is undefined')
}

const buffer: Buffer = chunk
const message: MessageRes = {
id,
type: 'buf',
buf: buffer.buffer,
offset: buffer.byteOffset,
len: buffer.length,
}
parentPort.postMessage(message, [message.buf])
callback()
},
final(callback) {
if (!parentPort) {
throw new Error('parentPort is undefined')
}

const message: MessageRes = { id, type: 'end' }
parentPort.postMessage(message)
callback()
},
})

// try {
// const pipeable = await renderRSC(input)
// const writable = new Writable({
// write(chunk, encoding, callback) {
// if (encoding !== ('buffer' as any)) {
// throw new Error('Unknown encoding')
// }

// if (!parentPort) {
// throw new Error('parentPort is undefined')
// }

// const buffer: Buffer = chunk
// const msg: MessageRes = {
// id,
// type: 'buf',
// buf: buffer.buffer,
// offset: buffer.byteOffset,
// len: buffer.length,
// }
// parentPort.postMessage(msg, [msg.buf])
// callback()
// },
// final(callback) {
// if (!parentPort) {
// throw new Error('parentPort is undefined')
// }

// const mesg: MessageRes = { id, type: 'end' }
// parentPort.postMessage(mesg)
// callback()
// },
// })
// pipeable.pipe(writable)
// } catch (err) {
// if (!parentPort) {
// throw new Error('parentPort is undefined')
// }
pipeable.pipe(writable)
} catch (err) {
if (!parentPort) {
throw new Error('parentPort is undefined')
}

// const mesg: MessageRes = { id, type: 'err', err }
// parentPort.postMessage(mesg)
// }
// }
const message: MessageRes = { id, type: 'err', err }
parentPort.postMessage(message)
}
}

// const handleGetCustomModules = async (
// mesg: MessageReq & { type: 'getCustomModules' }
Expand Down Expand Up @@ -162,38 +171,42 @@ const vitePromise = createServer({
appType: 'custom',
})

// const shutdown = async () => {
// const vite = await vitePromise
// await vite.close()
// if (!parentPort) {
// throw new Error('parentPort is undefined')
// }
const shutdown = async () => {
const vite = await vitePromise
await vite.close()
if (!parentPort) {
throw new Error('parentPort is undefined')
}

// parentPort.close()
// }
parentPort.close()
}

const loadServerFile = async (fname: string) => {
const vite = await vitePromise
return vite.ssrLoadModule(fname)
}

// if (!parentPort) {
// throw new Error('parentPort is undefined')
// }
if (!parentPort) {
throw new Error('parentPort is undefined')
}

// parentPort.on('message', (mesg: MessageReq) => {
// if (mesg.type === 'shutdown') {
// shutdown()
// } else if (mesg.type === 'setClientEntries') {
// handleSetClientEntries(mesg)
// } else if (mesg.type === 'render') {
// handleRender(mesg)
// } else if (mesg.type === 'getCustomModules') {
// handleGetCustomModules(mesg)
// } else if (mesg.type === 'build') {
// handleBuild(mesg)
// }
// })
parentPort.on('message', (message: MessageReq) => {
console.log('message', message)
console.log('message.input.args', (message as any).input?.args)
console.log('message.input.args[0]', (message as any).input?.args?.[0])

if (message.type === 'shutdown') {
shutdown()
} else if (message.type === 'setClientEntries') {
handleSetClientEntries(message)
} else if (message.type === 'render') {
handleRender(message)
// } else if (message.type === 'getCustomModules') {
// handleGetCustomModules(message)
// } else if (message.type === 'build') {
// handleBuild(message)
}
})

const configPromise = resolveConfig('serve')

Expand Down Expand Up @@ -253,7 +266,7 @@ const resolveClientEntry = (
return clientEntry
}

export async function setClientEntries(
async function setClientEntries(
value: 'load' | Record<string, string>
): Promise<void> {
if (value !== 'load') {
Expand Down Expand Up @@ -286,7 +299,16 @@ export async function setClientEntries(
)
}

export async function renderRSC(input: RenderInput): Promise<PipeableStream> {
interface SerializedFormData {
__formData__: boolean
state: Record<string, string | string[]>
}

function isSerializedFormData(data?: unknown): data is SerializedFormData {
return !!data && (data as SerializedFormData)?.__formData__
}

async function renderRsc(input: RenderInput): Promise<PipeableStream> {
const config = await configPromise
const bundlerConfig = new Proxy(
{},
Expand All @@ -303,13 +325,31 @@ export async function renderRSC(input: RenderInput): Promise<PipeableStream> {
}
)

console.log('renderRSC input', input)
console.log('renderRsc input', input)

if (input.rsfId && input.args) {
const [fileId, name] = input.rsfId.split('#')
const fname = path.join(config.root, fileId)
const mod = await loadServerFile(fname)
const data = await (mod[name] || mod)(...input.args)
console.log('Server Action, fileId', fileId, 'name', name, 'fname', fname)
const module = await loadServerFile(fname)

if (isSerializedFormData(input.args[0])) {
const formData = new FormData()

Object.entries(input.args[0].state).forEach(([key, value]) => {
if (Array.isArray(value)) {
value.forEach((v) => {
formData.append(key, v)
})
} else {
formData.append(key, value)
}
})

input.args[0] = formData
}

const data = await (module[name] || module)(...input.args)
if (!input.rscId) {
return renderToPipeableStream(data, bundlerConfig)
}
Expand Down
Loading

0 comments on commit aa764bc

Please sign in to comment.