Skip to content

Commit

Permalink
chore(worker-api/impl): pass stream to parent (#324)
Browse files Browse the repository at this point in the history
* init

* message callbacks delete

* update

* update

* Update packages/waku/src/lib/handlers/dev-worker-impl.ts

Co-authored-by: Daishi Kato <[email protected]>

* update

* main -> worker

* prettier

* transferlist

* reference lost fix

* Update packages/waku/src/lib/handlers/dev-worker-api.ts

Co-authored-by: Daishi Kato <[email protected]>

* Update packages/waku/src/lib/handlers/dev-worker-api.ts

Co-authored-by: Daishi Kato <[email protected]>

* remove pipe message

* remove async

* ssr fix

* transform stream (removing controllerMap)

* lint

* update

* avoid non null assertion

---------

Co-authored-by: Daishi Kato <[email protected]>
Co-authored-by: daishi <[email protected]>
  • Loading branch information
3 people authored Jan 1, 2024
1 parent c7aed3c commit 58423ef
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 113 deletions.
94 changes: 23 additions & 71 deletions packages/waku/src/lib/handlers/dev-worker-api.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
import type { Worker as WorkerOrig } from 'node:worker_threads';
import type {
TransferListItem,
Worker as WorkerType,
} from 'node:worker_threads';

import type { ResolvedConfig } from '../config.js';
import type { ModuleImportResult } from './types.js';
Expand All @@ -19,34 +22,28 @@ export type BuildOutput = {
htmlFiles: string[];
};

export type MessageReq =
| ({
id: number;
type: 'render';
hasModuleIdCallback: boolean;
} & Omit<RenderRequest, 'stream' | 'moduleIdCallback'>)
| { id: number; type: 'buf'; buf: ArrayBuffer; offset: number; len: number }
| { id: number; type: 'end' }
| { id: number; type: 'err'; err: unknown };
export type MessageReq = {
id: number;
type: 'render';
hasModuleIdCallback: boolean;
} & Omit<RenderRequest, 'moduleIdCallback'>;

export type MessageRes =
| { type: 'full-reload' }
| { type: 'hot-import'; source: string }
| { type: 'module-import'; result: ModuleImportResult }
| { id: number; type: 'start'; context: unknown }
| { id: number; type: 'buf'; buf: ArrayBuffer; offset: number; len: number }
| { id: number; type: 'end' }
| { id: number; type: 'start'; context: unknown; stream: ReadableStream }
| { id: number; type: 'err'; err: unknown; statusCode?: number }
| { id: number; type: 'moduleId'; moduleId: string };

const messageCallbacks = new Map<number, (mesg: MessageRes) => void>();

let lastWorker: Promise<WorkerOrig> | undefined;
let lastWorker: Promise<WorkerType> | undefined;
const getWorker = () => {
if (lastWorker) {
return lastWorker;
}
return (lastWorker = new Promise<WorkerOrig>((resolve, reject) => {
return (lastWorker = new Promise<WorkerType>((resolve, reject) => {
Promise.all([
import('node:worker_threads').catch((e) => {
throw e;
Expand Down Expand Up @@ -129,69 +126,23 @@ export async function renderRscWithWorker<Context>(
): Promise<readonly [ReadableStream, Context]> {
const worker = await getWorker();
const id = nextId++;
const pipe = async () => {
if (rr.stream) {
const reader = rr.stream.getReader();
try {
let result: ReadableStreamReadResult<unknown>;
do {
result = await reader.read();
if (result.value) {
const buf = result.value;
let mesg: MessageReq;
if (buf instanceof ArrayBuffer) {
mesg = { id, type: 'buf', buf, offset: 0, len: buf.byteLength };
} else if (buf instanceof Uint8Array) {
mesg = {
id,
type: 'buf',
buf: buf.buffer,
offset: buf.byteOffset,
len: buf.byteLength,
};
} else {
throw new Error('Unexepected buffer type');
}
worker.postMessage(mesg, [mesg.buf]);
}
} while (!result.done);
} catch (err) {
const mesg: MessageReq = { id, type: 'err', err };
worker.postMessage(mesg);
}
}
const mesg: MessageReq = { id, type: 'end' };
worker.postMessage(mesg);
};
let started = false;
return new Promise((resolve, reject) => {
let controller: ReadableStreamDefaultController<Uint8Array>;
const stream = new ReadableStream({
start(c) {
controller = c;
},
});
messageCallbacks.set(id, (mesg) => {
if (mesg.type === 'start') {
if (!started) {
started = true;
resolve([stream, mesg.context as Context]);
const bridge = new TransformStream({
flush() {
messageCallbacks.delete(id);
},
});
resolve([mesg.stream.pipeThrough(bridge), mesg.context as Context]);
} else {
throw new Error('already started');
}
} else if (mesg.type === 'buf') {
if (!started) {
throw new Error('not yet started');
}
controller.enqueue(new Uint8Array(mesg.buf, mesg.offset, mesg.len));
} else if (mesg.type === 'moduleId') {
rr.moduleIdCallback?.(mesg.moduleId);
} else if (mesg.type === 'end') {
if (!started) {
throw new Error('not yet started');
}
controller.close();
messageCallbacks.delete(id);
} else if (mesg.type === 'err') {
const err =
mesg.err instanceof Error ? mesg.err : new Error(String(mesg.err));
Expand All @@ -200,8 +151,6 @@ export async function renderRscWithWorker<Context>(
}
if (!started) {
reject(err);
} else {
controller.error(err);
}
messageCallbacks.delete(id);
}
Expand All @@ -214,9 +163,12 @@ export async function renderRscWithWorker<Context>(
id,
type: 'render',
hasModuleIdCallback: !!rr.moduleIdCallback,
...(rr.stream ? { stream: rr.stream } : {}),
...copied,
};
worker.postMessage(mesg);
pipe();
worker.postMessage(
mesg,
rr.stream ? [rr.stream as unknown as TransferListItem] : undefined,
);
});
}
52 changes: 10 additions & 42 deletions packages/waku/src/lib/handlers/dev-worker-impl.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,13 @@
import { pathToFileURL } from 'node:url';
import { parentPort } from 'node:worker_threads';
import { Server } from 'node:http';
import type { TransferListItem } from 'node:worker_threads';
import { createServer as createViteServer } from 'vite';

import type { EntriesDev } from '../../server.js';
import type { ResolvedConfig } from '../config.js';
import { joinPath, fileURLToFilePath } from '../utils/path.js';
import { hasStatusCode, deepFreeze } from '../renderers/utils.js';
import { deepFreeze, hasStatusCode } from '../renderers/utils.js';
import type {
MessageReq,
MessageRes,
Expand All @@ -27,7 +28,6 @@ const HAS_MODULE_REGISTER = typeof module.register === 'function';
if (HAS_MODULE_REGISTER) {
module.register('waku/node-loader', pathToFileURL('./'));
}
const controllerMap = new Map<number, ReadableStreamDefaultController>();

(globalThis as any).__WAKU_PRIVATE_ENV__ = JSON.parse(
process.env.__WAKU_PRIVATE_ENV__!,
Expand All @@ -37,12 +37,6 @@ const handleRender = async (mesg: MessageReq & { type: 'render' }) => {
const { id, type: _removed, hasModuleIdCallback, ...rest } = mesg;
const rr: RenderRequest = rest;
try {
const stream = new ReadableStream({
start(controller) {
controllerMap.set(id, controller);
},
});
rr.stream = stream;
if (hasModuleIdCallback) {
rr.moduleIdCallback = (moduleId: string) => {
const mesg: MessageRes = { id, type: 'moduleId', moduleId };
Expand All @@ -55,36 +49,21 @@ const handleRender = async (mesg: MessageReq & { type: 'render' }) => {
searchParams: new URLSearchParams(rr.searchParamsString),
method: rr.method,
context: rr.context,
body: rr.stream,
...(rr.stream ? { body: rr.stream } : {}),
contentType: rr.contentType,
...(rr.moduleIdCallback ? { moduleIdCallback: rr.moduleIdCallback } : {}),
isDev: true,
customImport: loadServerFile,
entries: await loadEntries(rr.config),
});
const mesg: MessageRes = { id, type: 'start', context: rr.context };
parentPort!.postMessage(mesg);
const mesg: MessageRes = {
id,
type: 'start',
context: rr.context,
stream: readable,
};
parentPort!.postMessage(mesg, [readable as unknown as TransferListItem]);
deepFreeze(rr.context);
const writable = new WritableStream({
write(chunk) {
if (!(chunk instanceof Uint8Array)) {
throw new Error('Unknown chunk type');
}
const mesg: MessageRes = {
id,
type: 'buf',
buf: chunk.buffer,
offset: chunk.byteOffset,
len: chunk.byteLength,
};
parentPort!.postMessage(mesg, [mesg.buf]);
},
close() {
const mesg: MessageRes = { id, type: 'end' };
parentPort!.postMessage(mesg);
},
});
readable.pipeTo(writable);
} catch (err) {
const mesg: MessageRes = { id, type: 'err', err };
if (hasStatusCode(err)) {
Expand Down Expand Up @@ -148,16 +127,5 @@ const loadEntries = async (config: ResolvedConfig) => {
parentPort!.on('message', (mesg: MessageReq) => {
if (mesg.type === 'render') {
handleRender(mesg);
} else if (mesg.type === 'buf') {
const controller = controllerMap.get(mesg.id)!;
controller.enqueue(new Uint8Array(mesg.buf, mesg.offset, mesg.len));
} else if (mesg.type === 'end') {
const controller = controllerMap.get(mesg.id)!;
controller.close();
} else if (mesg.type === 'err') {
const controller = controllerMap.get(mesg.id)!;
const err =
mesg.err instanceof Error ? mesg.err : new Error(String(mesg.err));
controller.error(err);
}
});

1 comment on commit 58423ef

@vercel
Copy link

@vercel vercel bot commented on 58423ef Jan 1, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Successfully deployed to the following URLs:

waku – ./

waku-daishi.vercel.app
waku-git-main-daishi.vercel.app
waku.vercel.app
www.waku.gg
waku.gg

Please sign in to comment.