From 3e0be92e36249f8bfa172b121c9dcf587f72d1a3 Mon Sep 17 00:00:00 2001 From: Jeremy Tuloup Date: Wed, 14 Aug 2024 18:16:37 +0200 Subject: [PATCH 01/12] Add comlink dependency --- .gitignore | 3 ++- package.json | 3 ++- yarn.lock | 3 ++- 3 files changed, 6 insertions(+), 3 deletions(-) diff --git a/.gitignore b/.gitignore index c080c6f..99b3fd0 100644 --- a/.gitignore +++ b/.gitignore @@ -128,5 +128,6 @@ dmypy.json experiment.sh env.yml -# Jupyterlite cache +# JupyterLite .jupyterlite.doit.db +_output diff --git a/package.json b/package.json index 8159edf..9de20ef 100644 --- a/package.json +++ b/package.json @@ -60,7 +60,8 @@ "@jupyterlite/server": "^0.2.0 || ^0.3.0 || ^0.4.0", "@lumino/coreutils": "^2", "@lumino/signaling": "^2", - "coincident": "^1.2.3" + "coincident": "^1.2.3", + "comlink": "^4.4.1" }, "devDependencies": { "@jupyterlab/builder": "^4.1.0", diff --git a/yarn.lock b/yarn.lock index 73011db..c3e0bd0 100644 --- a/yarn.lock +++ b/yarn.lock @@ -473,6 +473,7 @@ __metadata: "@typescript-eslint/eslint-plugin": ^6.1.0 "@typescript-eslint/parser": ^6.1.0 coincident: ^1.2.3 + comlink: ^4.4.1 css-loader: ^6.7.1 eslint: ^8.36.0 eslint-config-prettier: ^8.8.0 @@ -1579,7 +1580,7 @@ __metadata: languageName: node linkType: hard -"comlink@npm:^4.3.1": +"comlink@npm:^4.3.1, comlink@npm:^4.4.1": version: 4.4.1 resolution: "comlink@npm:4.4.1" checksum: 16d58a8f590087fc45432e31d6c138308dfd4b75b89aec0b7f7bb97ad33d810381bd2b1e608a1fb2cf05979af9cbfcdcaf1715996d5fcf77aeb013b6da3260af From a2d38c3f1f5d203c63e254b92193b3e47bb9254c Mon Sep 17 00:00:00 2001 From: Jeremy Tuloup Date: Fri, 16 Aug 2024 10:55:33 +0200 Subject: [PATCH 02/12] Use either coincident or comlink --- src/coincident.worker.ts | 78 +++++++++++ src/comlink.worker.ts | 65 +++++++++ src/index.ts | 2 +- src/tokens.ts | 83 ++++++++++++ src/web_worker_kernel.ts | 119 +++++++++++----- src/worker.ts | 285 ++++++++++++++++++--------------------- worker.webpack.config.js | 7 +- 7 files changed, 448 insertions(+), 191 deletions(-) create mode 100644 src/coincident.worker.ts create mode 100644 src/comlink.worker.ts create mode 100644 src/tokens.ts diff --git a/src/coincident.worker.ts b/src/coincident.worker.ts new file mode 100644 index 0000000..c0da12c --- /dev/null +++ b/src/coincident.worker.ts @@ -0,0 +1,78 @@ +// Copyright (c) Thorsten Beier +// Copyright (c) JupyterLite Contributors +// Distributed under the terms of the Modified BSD License. + +import coincident from 'coincident'; + +import { + ContentsAPI, + DriveFS, + TDriveRequest, + TDriveMethod, + TDriveResponse +} from '@jupyterlite/contents'; + +import { IXeusWorkerKernel } from './tokens'; +import { XeusRemoteKernel } from './worker'; + +const workerAPI = coincident(self) as IXeusWorkerKernel; + +/** + * An Emscripten-compatible synchronous Contents API using shared array buffers. + */ +export class SharedBufferContentsAPI extends ContentsAPI { + request(data: TDriveRequest): TDriveResponse { + return workerAPI.processDriveRequest(data); + } +} + +/** + * A custom drive implementation which uses shared array buffers (via coincident) if available + */ +class XeusDriveFS extends DriveFS { + createAPI(options: DriveFS.IOptions): ContentsAPI { + return new SharedBufferContentsAPI( + options.driveName, + options.mountpoint, + options.FS, + options.ERRNO_CODES + ); + } +} + +export class XeusCoincidentKernel extends XeusRemoteKernel { + /** + * Setup custom Emscripten FileSystem + */ + protected async initFilesystem( + options: IXeusWorkerKernel.IOptions + ): Promise { + if (options.mountDrive) { + const mountpoint = '/drive'; + const { FS, PATH, ERRNO_CODES } = globalThis.Module; + const { baseUrl } = options; + + const driveFS = new XeusDriveFS({ + FS, + PATH, + ERRNO_CODES, + baseUrl, + driveName: this._driveName, + mountpoint + }); + FS.mkdir(mountpoint); + FS.mount(driveFS, {}, mountpoint); + FS.chdir(mountpoint); + this._driveFS = driveFS; + } + } +} + +const worker = new XeusCoincidentKernel(); + +const sendWorkerMessage = workerAPI.processWorkerMessage.bind(workerAPI); +worker.registerCallback(sendWorkerMessage); + +workerAPI.initialize = worker.initialize.bind(worker); +workerAPI.cd = worker.cd.bind(worker); +workerAPI.isDir = worker.isDir.bind(worker); diff --git a/src/comlink.worker.ts b/src/comlink.worker.ts new file mode 100644 index 0000000..b56d3c4 --- /dev/null +++ b/src/comlink.worker.ts @@ -0,0 +1,65 @@ +// Copyright (c) Jupyter Development Team. +// Distributed under the terms of the Modified BSD License. + +/** + * A WebWorker entrypoint that uses comlink to handle postMessage details + */ + +import { expose } from 'comlink'; + +import { + ContentsAPI, + DriveFS, + ServiceWorkerContentsAPI +} from '@jupyterlite/contents'; + +import { IXeusWorkerKernel } from './tokens'; + +import { XeusRemoteKernel } from './worker'; + +/** + * A custom drive implementation which uses the service worker + */ +class XeusDriveFS extends DriveFS { + createAPI(options: DriveFS.IOptions): ContentsAPI { + return new ServiceWorkerContentsAPI( + options.baseUrl, + options.driveName, + options.mountpoint, + options.FS, + options.ERRNO_CODES + ); + } +} + +export class XeusComlinkKernel extends XeusRemoteKernel { + /** + * Setup custom Emscripten FileSystem + */ + protected async initFilesystem( + options: IXeusWorkerKernel.IOptions + ): Promise { + if (options.mountDrive) { + const mountpoint = '/drive'; + const { FS, PATH, ERRNO_CODES } = globalThis.Module; + const { baseUrl } = options; + + const driveFS = new XeusDriveFS({ + FS, + PATH, + ERRNO_CODES, + baseUrl, + driveName: this._driveName, + mountpoint + }); + FS.mkdir(mountpoint); + FS.mount(driveFS, {}, mountpoint); + FS.chdir(mountpoint); + this._driveFS = driveFS; + } + } +} + +const worker = new XeusComlinkKernel(); + +expose(worker); diff --git a/src/index.ts b/src/index.ts index ff90264..44d2864 100644 --- a/src/index.ts +++ b/src/index.ts @@ -76,7 +76,7 @@ const plugins = kernel_list.map((kernel): JupyterLiteServerPlugin => { ...options, contentsManager, mountDrive, - kernelspec + kernelSpec: kernelspec }); } }); diff --git a/src/tokens.ts b/src/tokens.ts new file mode 100644 index 0000000..b0609a6 --- /dev/null +++ b/src/tokens.ts @@ -0,0 +1,83 @@ +// Copyright (c) Jupyter Development Team. +// Distributed under the terms of the Modified BSD License. + +/** + * Definitions for the Pyodide kernel. + */ + +import { + TDriveMethod, + TDriveRequest, + TDriveResponse +} from '@jupyterlite/contents'; + +import { IWorkerKernel } from '@jupyterlite/kernel'; + +/** + * An interface for Xeus workers. + */ +export interface IXeusWorkerKernel extends IWorkerKernel { + /** + * Handle any lazy initialization activities. + */ + initialize(options: IXeusWorkerKernel.IOptions): Promise; + + /** + * Process drive request + * @param data + */ + processDriveRequest( + data: TDriveRequest + ): TDriveResponse; + + /** + * Process worker message + * @param msg + */ + processWorkerMessage(msg: any): void; + + /** + * Register a callback for handling messages from the worker. + */ + registerCallback(callback: (msg: any) => void): void; + + /** + * Whether the kernel is ready. + * @returns a promise that resolves when the kernel is ready. + */ + ready(): Promise; + + /** + * Mount a drive + * @param driveName The name of the drive + * @param mountpoint The mountpoint of the drive + * @param baseUrl The base URL of the server + */ + mount(driveName: string, mountpoint: string, baseUrl: string): Promise; + + /** + * Change the current working directory + * @param path The path to change to + */ + cd(path: string): Promise; + + /** + * Check if a path is a directory + * @param path The path to check + */ + isDir(path: string): Promise; +} + +/** + * An namespace for Xeus workers. + */ +export namespace IXeusWorkerKernel { + /** + * Initialization options for a worker. + */ + export interface IOptions extends IWorkerKernel.IOptions { + baseUrl: string; + kernelSpec: any; + mountDrive: boolean; + } +} diff --git a/src/web_worker_kernel.ts b/src/web_worker_kernel.ts index e11e2ce..1e0a3cd 100644 --- a/src/web_worker_kernel.ts +++ b/src/web_worker_kernel.ts @@ -4,6 +4,8 @@ import coincident from 'coincident'; +import { Remote, proxy, wrap } from 'comlink'; + import { ISignal, Signal } from '@lumino/signaling'; import { PromiseDelegate } from '@lumino/coreutils'; @@ -17,21 +19,7 @@ import { TDriveRequest } from '@jupyterlite/contents'; -interface IXeusKernel { - initialize(kernel_spec: any, base_url: string): Promise; - - ready(): Promise; - - mount(driveName: string, mountpoint: string, baseUrl: string): Promise; - - cd(path: string): Promise; - - isDir(path: string): Promise; - - processMessage(msg: any): Promise; - - processDriveRequest(data: any): void; -} +import { IXeusWorkerKernel } from './tokens'; export class WebWorkerKernel implements IKernel { /** @@ -40,27 +28,78 @@ export class WebWorkerKernel implements IKernel { * @param options The instantiation options for a new WebWorkerKernel */ constructor(options: WebWorkerKernel.IOptions) { - const { id, name, sendMessage, location, kernelspec, contentsManager } = + const { id, name, sendMessage, location, kernelSpec, contentsManager } = options; this._id = id; this._name = name; this._location = location; - this._kernelspec = kernelspec; + this._kernelSpec = kernelSpec; this._contentsManager = contentsManager; this._sendMessage = sendMessage; - this._worker = new Worker(new URL('./worker.js', import.meta.url), { - type: 'module' - }); + this._worker = this.initWorker(options); + this._remoteKernel = this.initRemote(options); - this._worker.onmessage = this._processWorkerMessage.bind(this); + this.setupFilesystemAPIs(); - this._remote = coincident(this._worker) as IXeusKernel; + this.initFileSystem(options); + } - this.setupFilesystemAPIs(); + /** + * Load the worker. + */ + protected initWorker(options: WebWorkerKernel.IOptions): Worker { + if (crossOriginIsolated) { + return new Worker(new URL('./coincident.worker.js', import.meta.url), { + type: 'module' + }); + } else { + return new Worker(new URL('./comlink.worker.js', import.meta.url), { + type: 'module' + }); + } + } - this._remote.initialize(this._kernelspec, PageConfig.getBaseUrl()); + /** + * Initialize the remote kernel. + * Use coincident if crossOriginIsolated, comlink otherwise + * See the two following issues for more context: + * - https://github.com/jupyterlite/jupyterlite/issues/1424 + * - https://github.com/jupyterlite/pyodide-kernel/pull/126 + */ + protected initRemote(options: WebWorkerKernel.IOptions): IXeusWorkerKernel { + let remote: IXeusWorkerKernel; + if (crossOriginIsolated) { + remote = coincident(this._worker) as IXeusWorkerKernel; + remote.processWorkerMessage = this._processWorkerMessage.bind(this); + // The coincident worker uses its own filesystem API: + (remote.processDriveRequest as any) = async ( + data: TDriveRequest + ) => { + if (!DriveContentsProcessor) { + throw new Error( + 'File system calls over Atomics.wait is only supported with jupyterlite>=0.4.0a3' + ); + } + + if (this._contentsProcessor === undefined) { + this._contentsProcessor = new DriveContentsProcessor({ + contentsManager: this._contentsManager + }); + } + + return await this._contentsProcessor.processDriveRequest(data); + }; + } else { + remote = wrap(this._worker) as IXeusWorkerKernel; + remote.registerCallback(proxy(this._processWorkerMessage.bind(this))); + } + remote.initialize({ + kernelSpec: this._kernelSpec, + baseUrl: PageConfig.getBaseUrl(), + mountDrive: options.mountDrive + }); - this.initFileSystem(options); + return remote; } async handleMessage(msg: KernelMessage.IMessage): Promise { @@ -75,7 +114,7 @@ export class WebWorkerKernel implements IKernel { this._executeDelegate = new PromiseDelegate(); } - await this._remote.processMessage({ msg, parent: this.parent }); + this._remoteKernel.processWorkerMessage({ msg, parent: this.parent }); if (msg.header.msg_type !== 'input_reply') { return await this._executeDelegate.promise; } @@ -105,7 +144,7 @@ export class WebWorkerKernel implements IKernel { } /** - * Process a message coming from the pyodide web worker. + * Process a message coming from the xeus web worker. * * @param msg The worker message to process. */ @@ -157,7 +196,7 @@ export class WebWorkerKernel implements IKernel { } this._worker.terminate(); (this._worker as any) = null; - (this._remote as any) = null; + (this._remoteKernel as any) = null; this._isDisposed = true; this._disposed.emit(void 0); } @@ -177,7 +216,9 @@ export class WebWorkerKernel implements IKernel { } private setupFilesystemAPIs() { - this._remote.processDriveRequest = async ( + (this._remoteKernel as any).processDriveRequest = async < + T extends TDriveMethod + >( data: TDriveRequest ) => { if (!DriveContentsProcessor) { @@ -210,24 +251,28 @@ export class WebWorkerKernel implements IKernel { localPath = options.location; } - await this._remote.ready(); + await this._remoteKernel.ready(); - await this._remote.mount(driveName, '/drive', PageConfig.getBaseUrl()); + await this._remoteKernel.mount( + driveName, + '/drive', + PageConfig.getBaseUrl() + ); - if (await this._remote.isDir('/files')) { - await this._remote.cd('/files'); + if (await this._remoteKernel.isDir('/files')) { + await this._remoteKernel.cd('/files'); } else { - await this._remote.cd(localPath); + await this._remoteKernel.cd(localPath); } } - private _kernelspec: any; + private _kernelSpec: any; private _id: string; private _name: string; private _location: string; private _contentsManager: Contents.IManager; private _contentsProcessor: DriveContentsProcessor | undefined = undefined; - private _remote: IXeusKernel; + private _remoteKernel: IXeusWorkerKernel | Remote; private _isDisposed = false; private _disposed = new Signal(this); private _worker: Worker; @@ -249,6 +294,6 @@ export namespace WebWorkerKernel { export interface IOptions extends IKernel.IOptions { contentsManager: Contents.IManager; mountDrive: boolean; - kernelspec: any; + kernelSpec: any; } } diff --git a/src/worker.ts b/src/worker.ts index 7180b15..0059d0e 100644 --- a/src/worker.ts +++ b/src/worker.ts @@ -1,56 +1,13 @@ -// Copyright (c) Thorsten Beier // Copyright (c) JupyterLite Contributors // Distributed under the terms of the Modified BSD License. -import coincident from 'coincident'; - -import { - ContentsAPI, - DriveFS, - TDriveRequest, - TDriveMethod, - TDriveResponse, - ServiceWorkerContentsAPI -} from '@jupyterlite/contents'; - import { URLExt } from '@jupyterlab/coreutils'; -declare function createXeusModule(options: any): any; +import type { DriveFS } from '@jupyterlite/contents'; +import { IXeusWorkerKernel } from './tokens'; globalThis.Module = {}; -const workerAPI = coincident(self) as typeof globalThis; - -/** - * An Emscripten-compatible synchronous Contents API using shared array buffers. - */ -export class SharedBufferContentsAPI extends ContentsAPI { - request(data: TDriveRequest): TDriveResponse { - return workerAPI.processDriveRequest(data); - } -} - -class XeusDriveFS extends DriveFS { - createAPI(options: DriveFS.IOptions): ContentsAPI { - if (crossOriginIsolated) { - return new SharedBufferContentsAPI( - options.driveName, - options.mountpoint, - options.FS, - options.ERRNO_CODES - ); - } else { - return new ServiceWorkerContentsAPI( - options.baseUrl, - options.driveName, - options.mountpoint, - options.FS, - options.ERRNO_CODES - ); - } - } -} - // when a toplevel cell uses an await, the cell is implicitly // wrapped in a async function. Since the webloop - eventloop // implementation does not support `eventloop.run_until_complete(f)` @@ -62,8 +19,9 @@ class XeusDriveFS extends DriveFS { globalThis.toplevel_promise = null; globalThis.toplevel_promise_py_proxy = null; +declare function createXeusModule(options: any): any; + let resolveInputReply: any; -let drive: XeusDriveFS; let kernelReady: (value: unknown) => void; let rawXKernel: any; let rawXServer: any; @@ -97,127 +55,152 @@ globalThis.ready = new Promise(resolve => { kernelReady = resolve; }); -workerAPI.mount = ( - driveName: string, - mountpoint: string, - baseUrl: string -): void => { - const { FS, PATH, ERRNO_CODES } = globalThis.Module; +export class XeusRemoteKernel { + constructor(options: XeusRemoteKernel.IOptions = {}) {} - if (!FS) { - return; + async ready(): Promise { + return await globalThis.ready; } - drive = new XeusDriveFS({ - FS, - PATH, - ERRNO_CODES, - baseUrl, - driveName, - mountpoint - }); - - FS.mkdir(mountpoint); - FS.mount(drive, {}, mountpoint); - FS.chdir(mountpoint); -}; - -workerAPI.ready = async (): Promise => { - return await globalThis.ready; -}; + async cd(path: string): Promise { + if (!path || !globalThis.Module.FS) { + return; + } -workerAPI.cd = (path: string) => { - if (!path || !globalThis.Module.FS) { - return; + globalThis.Module.FS.chdir(path); } - globalThis.Module.FS.chdir(path); -}; - -workerAPI.isDir = (path: string) => { - try { - const lookup = globalThis.Module.FS.lookupPath(path); - return globalThis.Module.FS.isDir(lookup.node.mode); - } catch (e) { - return false; + async isDir(path: string): Promise { + try { + const lookup = globalThis.Module.FS.lookupPath(path); + return globalThis.Module.FS.isDir(lookup.node.mode); + } catch (e) { + return false; + } } -}; -workerAPI.processMessage = async (event: any): Promise => { - const msg_type = event.msg.header.msg_type; + async processMessage(event: any): Promise { + const msg_type = event.msg.header.msg_type; - await globalThis.ready; + await globalThis.ready; - if ( - globalThis.toplevel_promise !== null && - globalThis.toplevel_promise_py_proxy !== null - ) { - await globalThis.toplevel_promise; - globalThis.toplevel_promise_py_proxy.delete(); - globalThis.toplevel_promise_py_proxy = null; - globalThis.toplevel_promise = null; - } + if ( + globalThis.toplevel_promise !== null && + globalThis.toplevel_promise_py_proxy !== null + ) { + await globalThis.toplevel_promise; + globalThis.toplevel_promise_py_proxy.delete(); + globalThis.toplevel_promise_py_proxy = null; + globalThis.toplevel_promise = null; + } - if (msg_type === 'input_reply') { - resolveInputReply(event.msg); - } else { - rawXServer.notify_listener(event.msg); + if (msg_type === 'input_reply') { + resolveInputReply(event.msg); + } else { + rawXServer.notify_listener(event.msg); + } } -}; - -workerAPI.initialize = async (kernel_spec: any, base_url: string) => { - // location of the kernel binary on the server - const binary_js = URLExt.join(base_url, kernel_spec.argv[0]); - const binary_wasm = binary_js.replace('.js', '.wasm'); - - importScripts(binary_js); - globalThis.Module = await createXeusModule({ - locateFile: (file: string) => { - if (file.endsWith('.wasm')) { - return binary_wasm; + + async initialize(options: IXeusWorkerKernel.IOptions): Promise { + const { baseUrl, kernelSpec } = options; + // location of the kernel binary on the server + const binary_js = URLExt.join(baseUrl, kernelSpec.argv[0]); + const binary_wasm = binary_js.replace('.js', '.wasm'); + + importScripts(binary_js); + globalThis.Module = await createXeusModule({ + locateFile: (file: string) => { + if (file.endsWith('.wasm')) { + return binary_wasm; + } + return file; + } + }); + try { + await waitRunDependency(); + + // each kernel can have a `async_init` function + // which can do kernel specific **async** initialization + // This function is usually implemented in the pre/post.js + // in the emscripten build of that kernel + if (globalThis.Module['async_init'] !== undefined) { + const kernel_root_url = URLExt.join( + baseUrl, + `xeus/kernels/${kernelSpec.dir}` + ); + const pkg_root_url = URLExt.join(baseUrl, 'xeus/kernel_packages'); + const verbose = true; + await globalThis.Module['async_init']( + kernel_root_url, + pkg_root_url, + verbose + ); } - return file; - } - }); - try { - await waitRunDependency(); - - // each kernel can have a `async_init` function - // which can do kernel specific **async** initialization - // This function is usually implemented in the pre/post.js - // in the emscripten build of that kernel - if (globalThis.Module['async_init'] !== undefined) { - const kernel_root_url = URLExt.join( - base_url, - `xeus/kernels/${kernel_spec.dir}` - ); - const pkg_root_url = URLExt.join(base_url, 'xeus/kernel_packages'); - const verbose = true; - await globalThis.Module['async_init']( - kernel_root_url, - pkg_root_url, - verbose - ); - } - await waitRunDependency(); + await waitRunDependency(); - rawXKernel = new globalThis.Module.xkernel(); - rawXServer = rawXKernel.get_server(); - if (!rawXServer) { - console.error('Failed to start kernel!'); + rawXKernel = new globalThis.Module.xkernel(); + rawXServer = rawXKernel.get_server(); + if (!rawXServer) { + console.error('Failed to start kernel!'); + } + rawXKernel.start(); + } catch (e) { + if (typeof e === 'number') { + const msg = globalThis.Module.get_exception_message(e); + console.error(msg); + throw new Error(msg); + } else { + console.error(e); + throw e; + } } - rawXKernel.start(); - } catch (e) { - if (typeof e === 'number') { - const msg = globalThis.Module.get_exception_message(e); - console.error(msg); - throw new Error(msg); - } else { - console.error(e); - throw e; + + await this.initFilesystem(options); + + kernelReady(1); + } + + /** + * Setup custom Emscripten FileSystem + */ + protected async initFilesystem( + options: IXeusWorkerKernel.IOptions + ): Promise { + if (options.mountDrive) { + const mountpoint = '/drive'; + const { FS, PATH, ERRNO_CODES } = globalThis.Module; + const { baseUrl } = options; + const { DriveFS } = await import('@jupyterlite/contents'); + + const driveFS = new DriveFS({ + FS, + PATH, + ERRNO_CODES, + baseUrl, + driveName: this._driveName, + mountpoint + }); + FS.mkdir(mountpoint); + FS.mount(driveFS, {}, mountpoint); + FS.chdir(mountpoint); + this._driveFS = driveFS; } } - kernelReady(1); -}; + /** + * Register the callback function to send messages from the worker back to the main thread. + * @param callback the callback to register + */ + registerCallback(callback: (msg: any) => void): void { + this._sendWorkerMessage = callback; + } + + protected _driveName = ''; + protected _driveFS: DriveFS | null = null; + protected _sendWorkerMessage: (msg: any) => void = () => {}; +} + +export namespace XeusRemoteKernel { + export interface IOptions {} +} diff --git a/worker.webpack.config.js b/worker.webpack.config.js index ab382c3..a3e1da8 100644 --- a/worker.webpack.config.js +++ b/worker.webpack.config.js @@ -18,9 +18,12 @@ const resolve = { module.exports = [ { - entry: './lib/worker.js', + entry: { + ['coincident.worker']: './lib/coincident.worker.js', + ['comlink.worker']: './lib/comlink.worker.js', + }, output: { - filename: 'worker.js', + filename: '[name].js', path: path.resolve(__dirname, 'lib'), libraryTarget: 'amd' }, From fd62f14afe51b2e399bef04c408825c4339953f7 Mon Sep 17 00:00:00 2001 From: Jeremy Tuloup Date: Fri, 16 Aug 2024 10:59:32 +0200 Subject: [PATCH 03/12] typos --- src/tokens.ts | 2 +- src/web_worker_kernel.ts | 2 +- worker.webpack.config.js | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/tokens.ts b/src/tokens.ts index b0609a6..69a2169 100644 --- a/src/tokens.ts +++ b/src/tokens.ts @@ -2,7 +2,7 @@ // Distributed under the terms of the Modified BSD License. /** - * Definitions for the Pyodide kernel. + * Definitions for the Xeus kernel. */ import { diff --git a/src/web_worker_kernel.ts b/src/web_worker_kernel.ts index 1e0a3cd..12480bc 100644 --- a/src/web_worker_kernel.ts +++ b/src/web_worker_kernel.ts @@ -64,7 +64,7 @@ export class WebWorkerKernel implements IKernel { * Use coincident if crossOriginIsolated, comlink otherwise * See the two following issues for more context: * - https://github.com/jupyterlite/jupyterlite/issues/1424 - * - https://github.com/jupyterlite/pyodide-kernel/pull/126 + * - https://github.com/jupyterlite/xeus/issues/102 */ protected initRemote(options: WebWorkerKernel.IOptions): IXeusWorkerKernel { let remote: IXeusWorkerKernel; diff --git a/worker.webpack.config.js b/worker.webpack.config.js index a3e1da8..59e064a 100644 --- a/worker.webpack.config.js +++ b/worker.webpack.config.js @@ -20,7 +20,7 @@ module.exports = [ { entry: { ['coincident.worker']: './lib/coincident.worker.js', - ['comlink.worker']: './lib/comlink.worker.js', + ['comlink.worker']: './lib/comlink.worker.js' }, output: { filename: '[name].js', From c0467613f9fb4444f70da34106b6883b8efdce55 Mon Sep 17 00:00:00 2001 From: Jeremy Tuloup Date: Fri, 16 Aug 2024 11:40:21 +0200 Subject: [PATCH 04/12] cleanup --- src/web_worker_kernel.ts | 25 ------------------------- 1 file changed, 25 deletions(-) diff --git a/src/web_worker_kernel.ts b/src/web_worker_kernel.ts index 12480bc..96515f7 100644 --- a/src/web_worker_kernel.ts +++ b/src/web_worker_kernel.ts @@ -39,8 +39,6 @@ export class WebWorkerKernel implements IKernel { this._worker = this.initWorker(options); this._remoteKernel = this.initRemote(options); - this.setupFilesystemAPIs(); - this.initFileSystem(options); } @@ -215,29 +213,6 @@ export class WebWorkerKernel implements IKernel { return this._name; } - private setupFilesystemAPIs() { - (this._remoteKernel as any).processDriveRequest = async < - T extends TDriveMethod - >( - data: TDriveRequest - ) => { - if (!DriveContentsProcessor) { - console.error( - 'File system calls over Atomics.wait is only supported with jupyterlite>=0.4.0a3' - ); - return; - } - - if (this._contentsProcessor === undefined) { - this._contentsProcessor = new DriveContentsProcessor({ - contentsManager: this._contentsManager - }); - } - - return await this._contentsProcessor.processDriveRequest(data); - }; - } - private async initFileSystem(options: WebWorkerKernel.IOptions) { let driveName: string; let localPath: string; From d62c5eb5d5c3395352950429ff07a0873b105a11 Mon Sep 17 00:00:00 2001 From: Jeremy Tuloup Date: Fri, 16 Aug 2024 13:22:15 +0200 Subject: [PATCH 05/12] fixes --- src/web_worker_kernel.ts | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/src/web_worker_kernel.ts b/src/web_worker_kernel.ts index 96515f7..30870d4 100644 --- a/src/web_worker_kernel.ts +++ b/src/web_worker_kernel.ts @@ -64,8 +64,10 @@ export class WebWorkerKernel implements IKernel { * - https://github.com/jupyterlite/jupyterlite/issues/1424 * - https://github.com/jupyterlite/xeus/issues/102 */ - protected initRemote(options: WebWorkerKernel.IOptions): IXeusWorkerKernel { - let remote: IXeusWorkerKernel; + protected initRemote( + options: WebWorkerKernel.IOptions + ): IXeusWorkerKernel | Remote { + let remote: IXeusWorkerKernel | Remote; if (crossOriginIsolated) { remote = coincident(this._worker) as IXeusWorkerKernel; remote.processWorkerMessage = this._processWorkerMessage.bind(this); @@ -88,7 +90,7 @@ export class WebWorkerKernel implements IKernel { return await this._contentsProcessor.processDriveRequest(data); }; } else { - remote = wrap(this._worker) as IXeusWorkerKernel; + remote = wrap(this._worker) as Remote; remote.registerCallback(proxy(this._processWorkerMessage.bind(this))); } remote.initialize({ From 7ad60a6de0c55d1c304c3a16b6c0e3ef3695c3eb Mon Sep 17 00:00:00 2001 From: Jeremy Tuloup Date: Fri, 16 Aug 2024 13:31:21 +0200 Subject: [PATCH 06/12] ready --- src/web_worker_kernel.ts | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/src/web_worker_kernel.ts b/src/web_worker_kernel.ts index 30870d4..0db9253 100644 --- a/src/web_worker_kernel.ts +++ b/src/web_worker_kernel.ts @@ -93,11 +93,13 @@ export class WebWorkerKernel implements IKernel { remote = wrap(this._worker) as Remote; remote.registerCallback(proxy(this._processWorkerMessage.bind(this))); } - remote.initialize({ - kernelSpec: this._kernelSpec, - baseUrl: PageConfig.getBaseUrl(), - mountDrive: options.mountDrive - }); + remote + .initialize({ + kernelSpec: this._kernelSpec, + baseUrl: PageConfig.getBaseUrl(), + mountDrive: options.mountDrive + }) + .then(this._ready.resolve.bind(this._ready)); return remote; } @@ -170,7 +172,7 @@ export class WebWorkerKernel implements IKernel { * A promise that is fulfilled when the kernel is ready. */ get ready(): Promise { - return Promise.resolve(); + return this._ready.promise; } /** @@ -259,6 +261,7 @@ export class WebWorkerKernel implements IKernel { | KernelMessage.IHeader | undefined = undefined; private _parent: KernelMessage.IMessage | undefined = undefined; + private _ready = new PromiseDelegate(); } /** From 668d0e41d3c8e809af29d9dd6b7fc0f9b10b6b01 Mon Sep 17 00:00:00 2001 From: Jeremy Tuloup Date: Fri, 16 Aug 2024 14:39:03 +0200 Subject: [PATCH 07/12] fixes --- src/tokens.ts | 6 ++++++ src/web_worker_kernel.ts | 4 ++-- src/worker.ts | 1 + 3 files changed, 9 insertions(+), 2 deletions(-) diff --git a/src/tokens.ts b/src/tokens.ts index 69a2169..bf41bdc 100644 --- a/src/tokens.ts +++ b/src/tokens.ts @@ -30,6 +30,12 @@ export interface IXeusWorkerKernel extends IWorkerKernel { data: TDriveRequest ): TDriveResponse; + /** + * Process a message sent from the main thread to the worker. + * @param msg + */ + processMessage(msg: any): void; + /** * Process worker message * @param msg diff --git a/src/web_worker_kernel.ts b/src/web_worker_kernel.ts index 0db9253..cf65afc 100644 --- a/src/web_worker_kernel.ts +++ b/src/web_worker_kernel.ts @@ -116,7 +116,7 @@ export class WebWorkerKernel implements IKernel { this._executeDelegate = new PromiseDelegate(); } - this._remoteKernel.processWorkerMessage({ msg, parent: this.parent }); + await this._remoteKernel.processMessage({ msg, parent: this.parent }); if (msg.header.msg_type !== 'input_reply') { return await this._executeDelegate.promise; } @@ -151,7 +151,7 @@ export class WebWorkerKernel implements IKernel { * @param msg The worker message to process. */ private _processWorkerMessage(msg: any): void { - if (!msg.data.header) { + if (!msg.data?.header) { return; } diff --git a/src/worker.ts b/src/worker.ts index 0059d0e..5859c12 100644 --- a/src/worker.ts +++ b/src/worker.ts @@ -1,3 +1,4 @@ +// Copyright (c) Thorsten Beier // Copyright (c) JupyterLite Contributors // Distributed under the terms of the Modified BSD License. From cb707f6d94274ef3f11f5873feb89d82cac9d852 Mon Sep 17 00:00:00 2001 From: martinRenou Date: Fri, 6 Sep 2024 13:17:47 +0200 Subject: [PATCH 08/12] Fix mounting logic --- src/coincident.worker.ts | 38 ++++++++++++++++++++------------------ src/comlink.worker.ts | 40 ++++++++++++++++++++-------------------- src/web_worker_kernel.ts | 7 +++++-- src/worker.ts | 29 ----------------------------- 4 files changed, 45 insertions(+), 69 deletions(-) diff --git a/src/coincident.worker.ts b/src/coincident.worker.ts index c0da12c..9b25e97 100644 --- a/src/coincident.worker.ts +++ b/src/coincident.worker.ts @@ -44,27 +44,29 @@ export class XeusCoincidentKernel extends XeusRemoteKernel { /** * Setup custom Emscripten FileSystem */ - protected async initFilesystem( - options: IXeusWorkerKernel.IOptions + async mount( + driveName: string, + mountpoint: string, + baseUrl: string ): Promise { - if (options.mountDrive) { - const mountpoint = '/drive'; - const { FS, PATH, ERRNO_CODES } = globalThis.Module; - const { baseUrl } = options; + const { FS, PATH, ERRNO_CODES } = globalThis.Module; - const driveFS = new XeusDriveFS({ - FS, - PATH, - ERRNO_CODES, - baseUrl, - driveName: this._driveName, - mountpoint - }); - FS.mkdir(mountpoint); - FS.mount(driveFS, {}, mountpoint); - FS.chdir(mountpoint); - this._driveFS = driveFS; + if (!FS) { + return; } + + const drive = new XeusDriveFS({ + FS, + PATH, + ERRNO_CODES, + baseUrl, + driveName, + mountpoint + }); + + FS.mkdir(mountpoint); + FS.mount(drive, {}, mountpoint); + FS.chdir(mountpoint); } } diff --git a/src/comlink.worker.ts b/src/comlink.worker.ts index b56d3c4..ca62442 100644 --- a/src/comlink.worker.ts +++ b/src/comlink.worker.ts @@ -13,8 +13,6 @@ import { ServiceWorkerContentsAPI } from '@jupyterlite/contents'; -import { IXeusWorkerKernel } from './tokens'; - import { XeusRemoteKernel } from './worker'; /** @@ -36,27 +34,29 @@ export class XeusComlinkKernel extends XeusRemoteKernel { /** * Setup custom Emscripten FileSystem */ - protected async initFilesystem( - options: IXeusWorkerKernel.IOptions + async mount( + driveName: string, + mountpoint: string, + baseUrl: string ): Promise { - if (options.mountDrive) { - const mountpoint = '/drive'; - const { FS, PATH, ERRNO_CODES } = globalThis.Module; - const { baseUrl } = options; + const { FS, PATH, ERRNO_CODES } = globalThis.Module; - const driveFS = new XeusDriveFS({ - FS, - PATH, - ERRNO_CODES, - baseUrl, - driveName: this._driveName, - mountpoint - }); - FS.mkdir(mountpoint); - FS.mount(driveFS, {}, mountpoint); - FS.chdir(mountpoint); - this._driveFS = driveFS; + if (!FS) { + return; } + + const drive = new XeusDriveFS({ + FS, + PATH, + ERRNO_CODES, + baseUrl, + driveName, + mountpoint + }); + + FS.mkdir(mountpoint); + FS.mount(drive, {}, mountpoint); + FS.chdir(mountpoint); } } diff --git a/src/web_worker_kernel.ts b/src/web_worker_kernel.ts index cf65afc..7901923 100644 --- a/src/web_worker_kernel.ts +++ b/src/web_worker_kernel.ts @@ -4,7 +4,8 @@ import coincident from 'coincident'; -import { Remote, proxy, wrap } from 'comlink'; +import { wrap } from 'comlink'; +import type { Remote } from 'comlink'; import { ISignal, Signal } from '@lumino/signaling'; import { PromiseDelegate } from '@lumino/coreutils'; @@ -90,8 +91,10 @@ export class WebWorkerKernel implements IKernel { return await this._contentsProcessor.processDriveRequest(data); }; } else { + this._worker.onmessage = e => { + this._processWorkerMessage(e.data); + }; remote = wrap(this._worker) as Remote; - remote.registerCallback(proxy(this._processWorkerMessage.bind(this))); } remote .initialize({ diff --git a/src/worker.ts b/src/worker.ts index 5859c12..60c91b4 100644 --- a/src/worker.ts +++ b/src/worker.ts @@ -157,38 +157,9 @@ export class XeusRemoteKernel { } } - await this.initFilesystem(options); - kernelReady(1); } - /** - * Setup custom Emscripten FileSystem - */ - protected async initFilesystem( - options: IXeusWorkerKernel.IOptions - ): Promise { - if (options.mountDrive) { - const mountpoint = '/drive'; - const { FS, PATH, ERRNO_CODES } = globalThis.Module; - const { baseUrl } = options; - const { DriveFS } = await import('@jupyterlite/contents'); - - const driveFS = new DriveFS({ - FS, - PATH, - ERRNO_CODES, - baseUrl, - driveName: this._driveName, - mountpoint - }); - FS.mkdir(mountpoint); - FS.mount(driveFS, {}, mountpoint); - FS.chdir(mountpoint); - this._driveFS = driveFS; - } - } - /** * Register the callback function to send messages from the worker back to the main thread. * @param callback the callback to register From 7cd6298892a80c65794140b7ee5be28de87018a6 Mon Sep 17 00:00:00 2001 From: martinRenou Date: Mon, 9 Sep 2024 10:43:01 +0200 Subject: [PATCH 09/12] Fix communication with comlink worker --- src/web_worker_kernel.ts | 32 +++++++++++++++++++++++++++----- 1 file changed, 27 insertions(+), 5 deletions(-) diff --git a/src/web_worker_kernel.ts b/src/web_worker_kernel.ts index 7901923..4c19fc2 100644 --- a/src/web_worker_kernel.ts +++ b/src/web_worker_kernel.ts @@ -71,7 +71,7 @@ export class WebWorkerKernel implements IKernel { let remote: IXeusWorkerKernel | Remote; if (crossOriginIsolated) { remote = coincident(this._worker) as IXeusWorkerKernel; - remote.processWorkerMessage = this._processWorkerMessage.bind(this); + remote.processWorkerMessage = this._processCoincidentWorkerMessage.bind(this); // The coincident worker uses its own filesystem API: (remote.processDriveRequest as any) = async ( data: TDriveRequest @@ -92,7 +92,7 @@ export class WebWorkerKernel implements IKernel { }; } else { this._worker.onmessage = e => { - this._processWorkerMessage(e.data); + this._processComlinkWorkerMessage(e.data); }; remote = wrap(this._worker) as Remote; } @@ -114,7 +114,6 @@ export class WebWorkerKernel implements IKernel { } private async _sendMessageToWorker(msg: any): Promise { - // TODO Remove this?? if (msg.header.msg_type !== 'input_reply') { this._executeDelegate = new PromiseDelegate(); } @@ -149,11 +148,11 @@ export class WebWorkerKernel implements IKernel { } /** - * Process a message coming from the xeus web worker. + * Process a message coming from the coincident web worker. * * @param msg The worker message to process. */ - private _processWorkerMessage(msg: any): void { + private _processCoincidentWorkerMessage(msg: any): void { if (!msg.data?.header) { return; } @@ -171,6 +170,29 @@ export class WebWorkerKernel implements IKernel { } } + /** + * Process a message coming from the comlink web worker. + * + * @param msg The worker message to process. + */ + private _processComlinkWorkerMessage(msg: any): void { + if (!msg.header) { + return; + } + + msg.header.session = this._parentHeader?.session ?? ''; + msg.session = this._parentHeader?.session ?? ''; + this._sendMessage(msg); + + // resolve promise + if ( + msg.header.msg_type === 'status' && + msg.content.execution_state === 'idle' + ) { + this._executeDelegate.resolve(); + } + } + /** * A promise that is fulfilled when the kernel is ready. */ From 8143149685cdd834825b263b853406d2a6edee81 Mon Sep 17 00:00:00 2001 From: martinRenou Date: Mon, 9 Sep 2024 10:43:40 +0200 Subject: [PATCH 10/12] Linting --- src/web_worker_kernel.ts | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/web_worker_kernel.ts b/src/web_worker_kernel.ts index 4c19fc2..480cd37 100644 --- a/src/web_worker_kernel.ts +++ b/src/web_worker_kernel.ts @@ -71,7 +71,8 @@ export class WebWorkerKernel implements IKernel { let remote: IXeusWorkerKernel | Remote; if (crossOriginIsolated) { remote = coincident(this._worker) as IXeusWorkerKernel; - remote.processWorkerMessage = this._processCoincidentWorkerMessage.bind(this); + remote.processWorkerMessage = + this._processCoincidentWorkerMessage.bind(this); // The coincident worker uses its own filesystem API: (remote.processDriveRequest as any) = async ( data: TDriveRequest From b38ba605e1e97f494c9934dd9470060ea2c0ba9c Mon Sep 17 00:00:00 2001 From: martinRenou Date: Mon, 9 Sep 2024 13:35:16 +0200 Subject: [PATCH 11/12] Back to a working coincident setup --- src/coincident.worker.ts | 6 +++--- src/web_worker_kernel.ts | 4 ++-- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/src/coincident.worker.ts b/src/coincident.worker.ts index 9b25e97..245ceb5 100644 --- a/src/coincident.worker.ts +++ b/src/coincident.worker.ts @@ -72,9 +72,9 @@ export class XeusCoincidentKernel extends XeusRemoteKernel { const worker = new XeusCoincidentKernel(); -const sendWorkerMessage = workerAPI.processWorkerMessage.bind(workerAPI); -worker.registerCallback(sendWorkerMessage); - workerAPI.initialize = worker.initialize.bind(worker); +workerAPI.mount = worker.mount.bind(worker); +workerAPI.ready = worker.ready.bind(worker); workerAPI.cd = worker.cd.bind(worker); workerAPI.isDir = worker.isDir.bind(worker); +workerAPI.processMessage = worker.processMessage.bind(worker); diff --git a/src/web_worker_kernel.ts b/src/web_worker_kernel.ts index 480cd37..befb0d5 100644 --- a/src/web_worker_kernel.ts +++ b/src/web_worker_kernel.ts @@ -70,9 +70,9 @@ export class WebWorkerKernel implements IKernel { ): IXeusWorkerKernel | Remote { let remote: IXeusWorkerKernel | Remote; if (crossOriginIsolated) { + this._worker.onmessage = this._processCoincidentWorkerMessage.bind(this); + remote = coincident(this._worker) as IXeusWorkerKernel; - remote.processWorkerMessage = - this._processCoincidentWorkerMessage.bind(this); // The coincident worker uses its own filesystem API: (remote.processDriveRequest as any) = async ( data: TDriveRequest From 7e4f931ffa8a904aa0c98e8fbcf7915d5bee33f6 Mon Sep 17 00:00:00 2001 From: martinRenou Date: Mon, 9 Sep 2024 16:31:35 +0200 Subject: [PATCH 12/12] Add comment as per review --- src/web_worker_kernel.ts | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/web_worker_kernel.ts b/src/web_worker_kernel.ts index befb0d5..c0b34cd 100644 --- a/src/web_worker_kernel.ts +++ b/src/web_worker_kernel.ts @@ -70,6 +70,8 @@ export class WebWorkerKernel implements IKernel { ): IXeusWorkerKernel | Remote { let remote: IXeusWorkerKernel | Remote; if (crossOriginIsolated) { + // We directly forward messages to xeus, which will dispatch them properly + // See discussion in https://github.com/jupyterlite/xeus/pull/108#discussion_r1750143661 this._worker.onmessage = this._processCoincidentWorkerMessage.bind(this); remote = coincident(this._worker) as IXeusWorkerKernel;