Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(node): worker_threads #1151

Merged
merged 10 commits into from
Mar 22, 2022
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion node/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ Deno standard library as it's a compatibility module.
- [x] vm _partly_
- [x] wasi
- [ ] webcrypto
- [ ] worker_threads
- [x] worker_threads
- [ ] zlib

* [x] node globals _partly_
Expand Down
2 changes: 1 addition & 1 deletion node/module_all.ts
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,6 @@ export default {
v8,
vm,
wasi,
workerThreads,
"worker_threads": workerThreads,
zlib,
} as Record<string, unknown>;
29 changes: 29 additions & 0 deletions node/testdata/worker_threads.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
import {
getEnvironmentData,
isMainThread,
parentPort,
threadId,
workerData,
} from "../worker_threads.ts";
import { once } from "../events.ts";

async function message(expectedMessage: string) {
const [message] = await once(parentPort, "message");
if (message !== expectedMessage) {
// fail test
parentPort.close();
}
}

await message("Hello, how are you my thread?");
parentPort.postMessage("I'm fine!");

parentPort.postMessage({
isMainThread,
threadId,
workerData: Array.isArray(workerData) &&
workerData[workerData.length - 1] instanceof MessagePort
? workerData.slice(0, -1)
: workerData,
envData: [getEnvironmentData("test"), getEnvironmentData(1)],
});
261 changes: 216 additions & 45 deletions node/worker_threads.ts
Original file line number Diff line number Diff line change
@@ -1,63 +1,234 @@
// Copyright 2018-2022 the Deno authors. All rights reserved. MIT license.
// Copyright Joyent and Node contributors. All rights reserved. MIT license.

import { resolve, toFileUrl } from "../path/mod.ts";
import { notImplemented } from "./_utils.ts";
import { EventEmitter, once } from "./events.ts";

export function getEnvironmentData() {
notImplemented();
}
export const isMainThread = undefined;
export function markAsUntransferable() {
notImplemented();
}
export function moveMessagePortToContext() {
notImplemented();
}
export const parentPort = undefined;
export function receiveMessageOnPort() {
notImplemented();
}
export const resourceLimits = undefined;
export const SHARE_ENV = undefined;
export function setEnvironmentData() {
notImplemented();
let environmentData = new Map();
let threads = 0;

export interface WorkerOptions {
// only for typings
argv?: unknown[];
env?: Record<string, unknown>;
execArgv?: string[];
stdin?: boolean;
stdout?: boolean;
stderr?: boolean;
trackUnmanagedFds?: boolean;
resourceLimits?: {
maxYoungGenerationSizeMb?: number;
maxOldGenerationSizeMb?: number;
codeRangeSizeMb?: number;
stackSizeMb?: number;
};

eval?: boolean;
transferList?: Transferable[];
workerData?: unknown;
}
export const threadId = undefined;
export const workerData = undefined;
export class BroadcastChannel {
constructor() {
notImplemented();

const kHandle = Symbol("kHandle");
class _Worker extends EventEmitter {
readonly threadId: number;
readonly resourceLimits: Required<
NonNullable<WorkerOptions["resourceLimits"]>
> = {
maxYoungGenerationSizeMb: -1,
maxOldGenerationSizeMb: -1,
codeRangeSizeMb: -1,
stackSizeMb: 4,
};
private readonly [kHandle]: Worker;

postMessage: Worker["postMessage"];

constructor(specifier: URL | string, options?: WorkerOptions) {
super();
if (options?.eval === true) {
specifier = `data:text/javascript,${specifier}`;
} else if (typeof specifier === "string") {
specifier = toFileUrl(resolve(specifier));
}
const handle = this[kHandle] = new Worker(
specifier,
{
...(options || {}),
type: "module",
// unstable
deno: { namespace: true },
},
);
handle.addEventListener(
"error",
(event) => this.emit("error", event.error || event.message),
);
handle.addEventListener(
"messageerror",
(event) => this.emit("messageerror", event.data),
);
handle.addEventListener(
"message",
(event) => this.emit("message", event.data),
);
handle.postMessage({
environmentData,
threadId: (this.threadId = ++threads),
workerData: options?.workerData,
}, options?.transferList || []);
this.postMessage = handle.postMessage.bind(handle);
this.emit("online");
}
}
export class MessageChannel {
constructor() {
notImplemented();

terminate() {
this[kHandle].terminate();
this.emit("exit", 0);
}

readonly getHeapSnapshot = notImplemented;
// fake performance
readonly performance = globalThis.performance;
}
export class MessagePort {
constructor() {
notImplemented();
}

export const isMainThread =
// deno-lint-ignore no-explicit-any
typeof (globalThis as any).DedicatedWorkerGlobalScope === "undefined" ||
// deno-lint-ignore no-explicit-any
self instanceof (globalThis as any).DedicatedWorkerGlobalScope === false;

// fake resourceLimits
export const resourceLimits = isMainThread ? {} : {
maxYoungGenerationSizeMb: 48,
maxOldGenerationSizeMb: 2048,
codeRangeSizeMb: 0,
stackSizeMb: 4,
};

let threadId = 0;
let workerData: unknown = null;

// Like https://github.com/nodejs/node/blob/48655e17e1d84ba5021d7a94b4b88823f7c9c6cf/lib/internal/event_target.js#L611
interface NodeEventTarget extends
Pick<
EventEmitter,
"eventNames" | "listenerCount" | "emit" | "removeAllListeners"
> {
setMaxListeners(n: number): void;
getMaxListeners(): number;
// deno-lint-ignore no-explicit-any
off(eventName: string, listener: (...args: any[]) => void): NodeEventTarget;
// deno-lint-ignore no-explicit-any
on(eventName: string, listener: (...args: any[]) => void): NodeEventTarget;
// deno-lint-ignore no-explicit-any
once(eventName: string, listener: (...args: any[]) => void): NodeEventTarget;
addListener: NodeEventTarget["on"];
removeListener: NodeEventTarget["off"];
}

type ParentPort = typeof self & NodeEventTarget;

// deno-lint-ignore no-explicit-any
let parentPort: ParentPort = null as any;

if (!isMainThread) {
// deno-lint-ignore no-explicit-any
const listeners = new WeakMap<(...args: any[]) => void, (ev: any) => any>();

parentPort = self as ParentPort;
parentPort.off = parentPort.removeListener = function (
this: ParentPort,
name,
listener,
) {
this.removeEventListener(name, listeners.get(listener)!);
listeners.delete(listener);
return this;
};
parentPort.on = parentPort.addListener = function (
this: ParentPort,
name,
listener,
) {
// deno-lint-ignore no-explicit-any
const _listener = (ev: any) => listener(ev.data);
listeners.set(listener, _listener);
this.addEventListener(name, _listener);
return this;
};
parentPort.once = function (this: ParentPort, name, listener) {
// deno-lint-ignore no-explicit-any
const _listener = (ev: any) => listener(ev.data);
listeners.set(listener, _listener);
this.addEventListener(name, _listener);
return this;
};

// mocks
parentPort.setMaxListeners = () => {};
parentPort.getMaxListeners = () => Infinity;
parentPort.eventNames = () => [""];
parentPort.listenerCount = () => 0;

parentPort.emit = () => notImplemented();
parentPort.removeAllListeners = () => notImplemented();

[{ threadId, workerData, environmentData }] = await once(
parentPort,
"message",
);
bartlomieju marked this conversation as resolved.
Show resolved Hide resolved

// alias
parentPort.addEventListener("offline", () => {
parentPort.emit("close");
});
}

export function getEnvironmentData(key: unknown) {
return environmentData.get(key);
}
export class Worker {
constructor() {
notImplemented();

export function setEnvironmentData(key: unknown, value?: unknown) {
if (value === undefined) {
environmentData.delete(key);
} else {
environmentData.set(key, value);
}
}

// deno-lint-ignore no-explicit-any
const _MessagePort: typeof MessagePort = (globalThis as any).MessagePort;
const _MessageChannel: typeof MessageChannel =
// deno-lint-ignore no-explicit-any
(globalThis as any).MessageChannel;
export const BroadcastChannel = globalThis.BroadcastChannel;
export const SHARE_ENV = Symbol.for("nodejs.worker_threads.SHARE_ENV");
export {
_MessageChannel as MessageChannel,
_MessagePort as MessagePort,
_Worker as Worker,
notImplemented as markAsUntransferable,
notImplemented as moveMessagePortToContext,
notImplemented as receiveMessageOnPort,
parentPort,
threadId,
workerData,
};

export default {
markAsUntransferable: notImplemented,
moveMessagePortToContext: notImplemented,
receiveMessageOnPort: notImplemented,
MessagePort: _MessagePort,
MessageChannel: _MessageChannel,
BroadcastChannel,
Worker: _Worker,
getEnvironmentData,
isMainThread,
markAsUntransferable,
moveMessagePortToContext,
parentPort,
receiveMessageOnPort,
resourceLimits,
SHARE_ENV,
setEnvironmentData,
SHARE_ENV,
threadId,
workerData,
BroadcastChannel,
MessageChannel,
MessagePort,
Worker,
resourceLimits,
parentPort,
isMainThread,
};
Loading