From 1f01f017a3f2d6fb6c0124d2f756bbac8d62ee6f Mon Sep 17 00:00:00 2001 From: Jeongho Nam Date: Thu, 28 Mar 2024 00:58:22 +0900 Subject: [PATCH 1/2] Revive `worker-threads`. --- package.json | 3 +- src/components/Communicator.ts | 21 ++++------ src/protocols/workers/WorkerConnector.ts | 13 ++++-- src/protocols/workers/WorkerServer.ts | 10 ++--- .../workers/internal/NodeWorkerCompiler.ts | 1 - .../workers/internal/threads/ThreadPort.ts | 41 +++++++++++++------ src/utils/internal/serializeError.ts | 16 ++++++++ test/node/index.ts | 15 +++---- .../protocols/workers/internal/calculator.ts | 1 + test/node/protocols/workers/internal/join.ts | 3 +- .../workers/test_hierarchical_workers.ts | 1 + .../protocols/workers/test_worker_compiler.ts | 12 +----- .../protocols/workers/test_worker_join.ts | 11 ++--- test/node/utils/test_util_serialize_error.ts | 10 +++++ 14 files changed, 97 insertions(+), 61 deletions(-) create mode 100644 src/utils/internal/serializeError.ts create mode 100644 test/node/utils/test_util_serialize_error.ts diff --git a/package.json b/package.json index 1131984..734771c 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "tgrid", - "version": "0.9.7", + "version": "0.9.8", "main": "lib/index.js", "typings": "lib/index.d.ts", "description": "Grid Computing Framework for TypeScript", @@ -23,7 +23,6 @@ }, "dependencies": { "import2": "^1.0.3", - "serialize-error": "^4.1.0", "tstl": "^2.5.16", "ws": "^7.5.3" }, diff --git a/src/components/Communicator.ts b/src/components/Communicator.ts index 67e3034..300a80c 100644 --- a/src/components/Communicator.ts +++ b/src/components/Communicator.ts @@ -5,11 +5,9 @@ import { Pair } from "tstl/utility/Pair"; import { HashMap } from "tstl/container/HashMap"; import { ConditionVariable } from "tstl/thread/ConditionVariable"; -import { Exception } from "tstl/exception/Exception"; import { DomainError } from "tstl/exception/DomainError"; import { RuntimeError } from "tstl/exception/RuntimeError"; - -import serializeError from "serialize-error"; +import { serializeError } from "../utils/internal/serializeError"; /** * The basic communicator. @@ -371,21 +369,18 @@ export abstract class Communicator { */ private async _Send_return( uid: number, - flag: boolean, - val: any, + success: boolean, + value: any, ): Promise { // SPECIAL LOGIC FOR ERROR -> FOR CLEAR JSON ENCODING - if (flag === false && val instanceof Error) { - if (typeof (val as Exception).toJSON === "function") - val = (val as Exception).toJSON(); - else val = serializeError(val); - } + if (success === false && value instanceof Error) + value = serializeError(value); // RETURNS const ret: Invoke.IReturn = { - uid: uid, - success: flag, - value: val, + uid, + success, + value, }; await this.sendData(ret); } diff --git a/src/protocols/workers/WorkerConnector.ts b/src/protocols/workers/WorkerConnector.ts index 182570d..4d8ea62 100644 --- a/src/protocols/workers/WorkerConnector.ts +++ b/src/protocols/workers/WorkerConnector.ts @@ -45,6 +45,9 @@ export class WorkerConnector extends ConnectorBase implements IWorkerSystem { + /** + * @hidden + */ private readonly compiler_: Singleton>; /** @@ -55,18 +58,22 @@ export class WorkerConnector /** * Initializer Constructor. * + * For reference, you're planning to run a bundled JavaScript file, + * and you're using the NodeJS environment, you can't use the `"thread"` + * mode. You've to use the `"process"` mode instead. + * * @param header An object containing initialization data like activation. * @param provider An object providing features for remote system. - * @param type You can specify the worker mode when NodeJS. Default is "thread". + * @param type You can specify the worker mode when NodeJS. Default is "process". */ public constructor( header: Header, provider: Provider, - type: "thread" | "process" = "thread", + type?: "thread" | "process", ) { super(header, provider); this.compiler_ = new Singleton(() => - is_node() ? NodeWorkerCompiler(type) : WebWorkerCompiler(), + is_node() ? NodeWorkerCompiler(type ?? "process") : WebWorkerCompiler(), ); } diff --git a/src/protocols/workers/WorkerServer.ts b/src/protocols/workers/WorkerServer.ts index f0a490d..b9b2993 100644 --- a/src/protocols/workers/WorkerServer.ts +++ b/src/protocols/workers/WorkerServer.ts @@ -73,13 +73,12 @@ export class WorkerServer */ public constructor() { super(undefined); - this.channel_ = new Singleton(async () => { + // BROWSER CASE if (is_node() === false) return (self) as IFeature; - const threadPort = await ThreadPort(); - return threadPort.is_worker_server() - ? ((threadPort) as IFeature) + return (await ThreadPort.isWorkerThread()) + ? ((await ThreadPort()) as IFeature) : (ProcessChannel as IFeature); }); this.state_ = WorkerServer.State.NONE; @@ -88,7 +87,6 @@ export class WorkerServer const data: string = await this._Handshake("getHeader"); const wrapper: IHeaderWrapper
= JSON.parse(data); - return wrapper.header; }); } @@ -130,7 +128,7 @@ export class WorkerServer // SUCCESS const channel = await this.channel_.get(); - channel.onmessage = this._Handle_message.bind(this); + channel.onmessage = (evt) => this._Handle_message(evt); channel.postMessage(WorkerServer.State.OPEN); this.state_ = WorkerServer.State.OPEN; diff --git a/src/protocols/workers/internal/NodeWorkerCompiler.ts b/src/protocols/workers/internal/NodeWorkerCompiler.ts index fd9b61d..ab88e92 100644 --- a/src/protocols/workers/internal/NodeWorkerCompiler.ts +++ b/src/protocols/workers/internal/NodeWorkerCompiler.ts @@ -28,7 +28,6 @@ export const NodeWorkerCompiler = async ( break; } } - await FileSystem.write(path, content); return path; }, diff --git a/src/protocols/workers/internal/threads/ThreadPort.ts b/src/protocols/workers/internal/threads/ThreadPort.ts index 4b59715..30b79af 100644 --- a/src/protocols/workers/internal/threads/ThreadPort.ts +++ b/src/protocols/workers/internal/threads/ThreadPort.ts @@ -6,19 +6,36 @@ import { NodeModule } from "../../../../utils/internal/NodeModule"; /** * @hidden */ + export async function ThreadPort() { const { parentPort } = await NodeModule.thread.get(); + if (!parentPort) throw new Error("This is not a worker thread."); + const process = NodeModule.process(); - return { - postMessage: (message: any) => parentPort!.postMessage(message), - close: () => process.exit(0), - onmessage: (listener: (event: MessageEvent) => void) => - parentPort!.on("message", (message) => - listener({ data: message } as MessageEvent), - ), - document: parentPort - ? (null! as Document) // NOT WORKER - : undefined, - is_worker_server: () => !!parentPort, - }; + class ThreadPort { + public static postMessage(message: any): void { + parentPort!.postMessage(message); + } + public static close(): void { + process.exit(0); + } + public static set onmessage(listener: (event: MessageEvent) => void) { + parentPort!.on("message", (msg) => { + listener({ data: msg } as MessageEvent); + }); + } + public static get document(): Document { + return null!; + } + public static is_worker_server(): boolean { + return true; + } + } + return ThreadPort; +} +export namespace ThreadPort { + export async function isWorkerThread(): Promise { + const { parentPort } = await NodeModule.thread.get(); + return !!parentPort; + } } diff --git a/src/utils/internal/serializeError.ts b/src/utils/internal/serializeError.ts new file mode 100644 index 0000000..c2a9f2d --- /dev/null +++ b/src/utils/internal/serializeError.ts @@ -0,0 +1,16 @@ +export const serializeError = (error: any) => { + if ( + typeof error === "object" && + error !== null && + typeof error.toJSON === "function" + ) + return error.toJSON(); + else if (error instanceof Error) + return { + ...error, + name: error.name, + stack: error.stack, + message: error.message, + }; + return error; +}; diff --git a/test/node/index.ts b/test/node/index.ts index 811c52a..e219375 100644 --- a/test/node/index.ts +++ b/test/node/index.ts @@ -8,22 +8,23 @@ interface IModule { } async function iterate(path: string): Promise { - const file_list: string[] = fs.readdirSync(path); - for (const file of file_list) { - const current_path: string = path + "/" + file; - const stat: fs.Stats = fs.lstatSync(current_path); + for (const file of await fs.promises.readdir(path)) { + const location: string = path + "/" + file; + const stat: fs.Stats = await fs.promises.lstat(location); + + if (file === "utils") console.log(location, stat.isDirectory()); if (stat.isDirectory() === true && file !== "internal") { - await iterate(current_path); + await iterate(location); continue; } else if ( file.substr(-3) !== ".js" || - current_path === __dirname + "/index.js" + location === __dirname + "/index.js" ) continue; const external: IModule = await import( - current_path.substr(0, current_path.length - 3) + location.substr(0, location.length - 3) ); for (const key in external) if (key.substr(0, 5) === "test_") { diff --git a/test/node/protocols/workers/internal/calculator.ts b/test/node/protocols/workers/internal/calculator.ts index 6eeca95..4508520 100644 --- a/test/node/protocols/workers/internal/calculator.ts +++ b/test/node/protocols/workers/internal/calculator.ts @@ -15,6 +15,7 @@ async function get( const connector: WorkerConnector = new WorkerConnector( null, null, + "process", ); await connector.connect(path); diff --git a/test/node/protocols/workers/internal/join.ts b/test/node/protocols/workers/internal/join.ts index db2095c..63a5ae8 100644 --- a/test/node/protocols/workers/internal/join.ts +++ b/test/node/protocols/workers/internal/join.ts @@ -7,8 +7,7 @@ async function main(): Promise { const server: WorkerServer = new WorkerServer(); await server.open(null); await server.join(); - - await fs.promises.writeFile(FILE_PATH, "WorkerServer.join()", "utf8"); + fs.writeFileSync(FILE_PATH, "WorkerServer.join()", "utf8"); } main().catch((exp) => { console.log(exp); diff --git a/test/node/protocols/workers/test_hierarchical_workers.ts b/test/node/protocols/workers/test_hierarchical_workers.ts index b22c8b1..336cfbe 100644 --- a/test/node/protocols/workers/test_hierarchical_workers.ts +++ b/test/node/protocols/workers/test_hierarchical_workers.ts @@ -5,6 +5,7 @@ export async function test_hierarchical_workers(): Promise { const connector: WorkerConnector = new WorkerConnector( null, null, + "process", ); for (let i: number = 0; i < 5; ++i) { // DO CONNECT diff --git a/test/node/protocols/workers/test_worker_compiler.ts b/test/node/protocols/workers/test_worker_compiler.ts index d42a4b2..72347d7 100644 --- a/test/node/protocols/workers/test_worker_compiler.ts +++ b/test/node/protocols/workers/test_worker_compiler.ts @@ -4,21 +4,13 @@ import cp from "child_process"; import { ICalculator } from "../../../controllers/ICalculator"; import { WorkerConnector } from "tgrid"; -export function test_worker_connect(): Promise { - return _Test_worker( - (worker) => - worker.connect(__dirname + "/../../../browser/worker-server.js"), - "process", - ); -} - -export async function test_worker_compile(): Promise { +export async function test_worker_compiler(): Promise { const PATH = __dirname + "/../../../../../bundle/worker-server.js"; if (fs.existsSync(PATH) === false) cp.execSync("npm run bundle"); await _Test_worker( (worker) => worker.compile(fs.readFileSync(PATH, "utf8")), - "thread", + "process", ); } diff --git a/test/node/protocols/workers/test_worker_join.ts b/test/node/protocols/workers/test_worker_join.ts index db15f99..6570051 100644 --- a/test/node/protocols/workers/test_worker_join.ts +++ b/test/node/protocols/workers/test_worker_join.ts @@ -1,11 +1,11 @@ +import fs from "fs"; import { WorkerConnector } from "tgrid"; -import { FileSystem } from "tgrid/lib/protocols/workers/internal/FileSystem"; import { sleep_for } from "tstl/thread"; const FILE_PATH = __dirname + "/log.dat"; export async function test_worker_join(): Promise { - await FileSystem.write(FILE_PATH, "NOT YET"); + await fs.promises.writeFile(FILE_PATH, "NOT YET", "utf8"); const connector: WorkerConnector = new WorkerConnector( null, @@ -13,13 +13,14 @@ export async function test_worker_join(): Promise { ); await connector.connect(__dirname + "/internal/join.js"); - sleep_for(100) + sleep_for(1_000) .then(() => connector.close()) .catch(() => {}); await connector.join(); - const content: string = await FileSystem.read(FILE_PATH, "utf8"); - await FileSystem.unlink(FILE_PATH); + await sleep_for(50); + const content: string = await fs.promises.readFile(FILE_PATH, "utf8"); + await fs.promises.unlink(FILE_PATH); if (content !== "WorkerServer.join()") throw new Error("Error on WorkerServer.join()"); diff --git a/test/node/utils/test_util_serialize_error.ts b/test/node/utils/test_util_serialize_error.ts new file mode 100644 index 0000000..c66de27 --- /dev/null +++ b/test/node/utils/test_util_serialize_error.ts @@ -0,0 +1,10 @@ +import { serializeError } from "tgrid/lib/utils/internal/serializeError"; + +export async function test_util_serialize_error(): Promise { + const e: TypeError = new TypeError("something wrong"); + const p: object = JSON.parse(JSON.stringify(serializeError(e))); + const keys: string[] = Object.keys(p); + for (const expected of ["name", "message", "stack"]) + if (keys.indexOf(expected) === -1) + throw new Error(`Error object does not have ${expected}`); +} From 8709f602a04e250148d3619446cec2c60b1199c0 Mon Sep 17 00:00:00 2001 From: Jeongho Nam Date: Thu, 28 Mar 2024 01:07:13 +0900 Subject: [PATCH 2/2] Fix github actions bug --- .gitignore | 3 ++- package.json | 1 + test/node/protocols/workers/test_worker_compiler.ts | 4 ++-- 3 files changed, 5 insertions(+), 3 deletions(-) diff --git a/.gitignore b/.gitignore index 8e30fff..0b49aea 100644 --- a/.gitignore +++ b/.gitignore @@ -5,4 +5,5 @@ lib/ node_modules/ bundle/*.js -package-lock.json \ No newline at end of file +package-lock.json +pnpm-lock.yaml \ No newline at end of file diff --git a/package.json b/package.json index 734771c..67550e0 100644 --- a/package.json +++ b/package.json @@ -40,6 +40,7 @@ "mv": "^2.1.1", "prettier": "^3.2.5", "puppeteer": "^22.4.1", + "rimraf": "^5.0.5", "source-map-support": "^0.5.21", "ts-node": "^10.9.2", "ts-patch": "^3.1.2", diff --git a/test/node/protocols/workers/test_worker_compiler.ts b/test/node/protocols/workers/test_worker_compiler.ts index 72347d7..9c1bf51 100644 --- a/test/node/protocols/workers/test_worker_compiler.ts +++ b/test/node/protocols/workers/test_worker_compiler.ts @@ -1,12 +1,12 @@ import fs from "fs"; -import cp from "child_process"; import { ICalculator } from "../../../controllers/ICalculator"; import { WorkerConnector } from "tgrid"; +import { TestBundler } from "../../../browser/TestBundler"; export async function test_worker_compiler(): Promise { const PATH = __dirname + "/../../../../../bundle/worker-server.js"; - if (fs.existsSync(PATH) === false) cp.execSync("npm run bundle"); + if (fs.existsSync(PATH) === false) await TestBundler.execute(); await _Test_worker( (worker) => worker.compile(fs.readFileSync(PATH, "utf8")),