Skip to content

Commit

Permalink
Revive worker-threads.
Browse files Browse the repository at this point in the history
  • Loading branch information
samchon committed Mar 27, 2024
1 parent 2881f43 commit 1f01f01
Show file tree
Hide file tree
Showing 14 changed files with 97 additions and 61 deletions.
3 changes: 1 addition & 2 deletions package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "tgrid",
"version": "0.9.7",
"version": "0.9.8",
"main": "lib/index.js",
"typings": "lib/index.d.ts",
"description": "Grid Computing Framework for TypeScript",
Expand All @@ -23,7 +23,6 @@
},
"dependencies": {
"import2": "^1.0.3",
"serialize-error": "^4.1.0",
"tstl": "^2.5.16",
"ws": "^7.5.3"
},
Expand Down
21 changes: 8 additions & 13 deletions src/components/Communicator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,9 @@ import { Pair } from "tstl/utility/Pair";
import { HashMap } from "tstl/container/HashMap";
import { ConditionVariable } from "tstl/thread/ConditionVariable";

import { Exception } from "tstl/exception/Exception";
import { DomainError } from "tstl/exception/DomainError";
import { RuntimeError } from "tstl/exception/RuntimeError";

import serializeError from "serialize-error";
import { serializeError } from "../utils/internal/serializeError";

/**
* The basic communicator.
Expand Down Expand Up @@ -371,21 +369,18 @@ export abstract class Communicator<Provider> {
*/
private async _Send_return(
uid: number,
flag: boolean,
val: any,
success: boolean,
value: any,
): Promise<void> {
// SPECIAL LOGIC FOR ERROR -> FOR CLEAR JSON ENCODING
if (flag === false && val instanceof Error) {
if (typeof (val as Exception).toJSON === "function")
val = (val as Exception).toJSON();
else val = serializeError(val);
}
if (success === false && value instanceof Error)
value = serializeError(value);

// RETURNS
const ret: Invoke.IReturn = {
uid: uid,
success: flag,
value: val,
uid,
success,
value,
};
await this.sendData(ret);
}
Expand Down
13 changes: 10 additions & 3 deletions src/protocols/workers/WorkerConnector.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@ export class WorkerConnector<Header, Provider extends object | null>
extends ConnectorBase<Header, Provider>
implements IWorkerSystem
{
/**
* @hidden
*/
private readonly compiler_: Singleton<Promise<IWorkerCompiler>>;

/**
Expand All @@ -55,18 +58,22 @@ export class WorkerConnector<Header, Provider extends object | null>
/**
* Initializer Constructor.
*
* For reference, you're planning to run a bundled JavaScript file,
* and you're using the NodeJS environment, you can't use the `"thread"`
* mode. You've to use the `"process"` mode instead.
*
* @param header An object containing initialization data like activation.
* @param provider An object providing features for remote system.
* @param type You can specify the worker mode when NodeJS. Default is "thread".
* @param type You can specify the worker mode when NodeJS. Default is "process".
*/
public constructor(
header: Header,
provider: Provider,
type: "thread" | "process" = "thread",
type?: "thread" | "process",
) {
super(header, provider);
this.compiler_ = new Singleton(() =>
is_node() ? NodeWorkerCompiler(type) : WebWorkerCompiler(),
is_node() ? NodeWorkerCompiler(type ?? "process") : WebWorkerCompiler(),
);
}

Expand Down
10 changes: 4 additions & 6 deletions src/protocols/workers/WorkerServer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -73,13 +73,12 @@ export class WorkerServer<Header, Provider extends object | null>
*/
public constructor() {
super(undefined);

this.channel_ = new Singleton(async () => {
// BROWSER CASE
if (is_node() === false) return (<any>self) as IFeature;

const threadPort = await ThreadPort();
return threadPort.is_worker_server()
? ((<any>threadPort) as IFeature)
return (await ThreadPort.isWorkerThread())
? ((await ThreadPort()) as IFeature)
: (ProcessChannel as IFeature);
});
this.state_ = WorkerServer.State.NONE;
Expand All @@ -88,7 +87,6 @@ export class WorkerServer<Header, Provider extends object | null>

const data: string = await this._Handshake("getHeader");
const wrapper: IHeaderWrapper<Header> = JSON.parse(data);

return wrapper.header;
});
}
Expand Down Expand Up @@ -130,7 +128,7 @@ export class WorkerServer<Header, Provider extends object | null>

// SUCCESS
const channel = await this.channel_.get();
channel.onmessage = this._Handle_message.bind(this);
channel.onmessage = (evt) => this._Handle_message(evt);
channel.postMessage(WorkerServer.State.OPEN);

this.state_ = WorkerServer.State.OPEN;
Expand Down
1 change: 0 additions & 1 deletion src/protocols/workers/internal/NodeWorkerCompiler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ export const NodeWorkerCompiler = async (
break;
}
}

await FileSystem.write(path, content);
return path;
},
Expand Down
41 changes: 29 additions & 12 deletions src/protocols/workers/internal/threads/ThreadPort.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,36 @@ import { NodeModule } from "../../../../utils/internal/NodeModule";
/**
* @hidden
*/

export async function ThreadPort() {
const { parentPort } = await NodeModule.thread.get();
if (!parentPort) throw new Error("This is not a worker thread.");

const process = NodeModule.process();
return {
postMessage: (message: any) => parentPort!.postMessage(message),
close: () => process.exit(0),
onmessage: (listener: (event: MessageEvent) => void) =>
parentPort!.on("message", (message) =>
listener({ data: message } as MessageEvent),
),
document: parentPort
? (null! as Document) // NOT WORKER
: undefined,
is_worker_server: () => !!parentPort,
};
class ThreadPort {
public static postMessage(message: any): void {
parentPort!.postMessage(message);
}
public static close(): void {
process.exit(0);
}
public static set onmessage(listener: (event: MessageEvent) => void) {
parentPort!.on("message", (msg) => {
listener({ data: msg } as MessageEvent);
});
}
public static get document(): Document {
return null!;
}
public static is_worker_server(): boolean {
return true;
}
}
return ThreadPort;
}
export namespace ThreadPort {
export async function isWorkerThread(): Promise<boolean> {
const { parentPort } = await NodeModule.thread.get();
return !!parentPort;
}
}
16 changes: 16 additions & 0 deletions src/utils/internal/serializeError.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
export const serializeError = (error: any) => {
if (
typeof error === "object" &&
error !== null &&
typeof error.toJSON === "function"
)
return error.toJSON();
else if (error instanceof Error)
return {
...error,
name: error.name,
stack: error.stack,
message: error.message,
};
return error;
};
15 changes: 8 additions & 7 deletions test/node/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,22 +8,23 @@ interface IModule {
}

async function iterate(path: string): Promise<void> {
const file_list: string[] = fs.readdirSync(path);
for (const file of file_list) {
const current_path: string = path + "/" + file;
const stat: fs.Stats = fs.lstatSync(current_path);
for (const file of await fs.promises.readdir(path)) {
const location: string = path + "/" + file;
const stat: fs.Stats = await fs.promises.lstat(location);

if (file === "utils") console.log(location, stat.isDirectory());

if (stat.isDirectory() === true && file !== "internal") {
await iterate(current_path);
await iterate(location);
continue;
} else if (
file.substr(-3) !== ".js" ||
current_path === __dirname + "/index.js"
location === __dirname + "/index.js"
)
continue;

const external: IModule = await import(
current_path.substr(0, current_path.length - 3)
location.substr(0, location.length - 3)
);
for (const key in external)
if (key.substr(0, 5) === "test_") {
Expand Down
1 change: 1 addition & 0 deletions test/node/protocols/workers/internal/calculator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ async function get<Controller extends object>(
const connector: WorkerConnector<null, null> = new WorkerConnector(
null,
null,
"process",
);
await connector.connect(path);

Expand Down
3 changes: 1 addition & 2 deletions test/node/protocols/workers/internal/join.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,7 @@ async function main(): Promise<void> {
const server: WorkerServer<null, null> = new WorkerServer();
await server.open(null);
await server.join();

await fs.promises.writeFile(FILE_PATH, "WorkerServer.join()", "utf8");
fs.writeFileSync(FILE_PATH, "WorkerServer.join()", "utf8");
}
main().catch((exp) => {
console.log(exp);
Expand Down
1 change: 1 addition & 0 deletions test/node/protocols/workers/test_hierarchical_workers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ export async function test_hierarchical_workers(): Promise<void> {
const connector: WorkerConnector<null, null> = new WorkerConnector(
null,
null,
"process",
);
for (let i: number = 0; i < 5; ++i) {
// DO CONNECT
Expand Down
12 changes: 2 additions & 10 deletions test/node/protocols/workers/test_worker_compiler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,21 +4,13 @@ import cp from "child_process";
import { ICalculator } from "../../../controllers/ICalculator";
import { WorkerConnector } from "tgrid";

export function test_worker_connect(): Promise<void> {
return _Test_worker(
(worker) =>
worker.connect(__dirname + "/../../../browser/worker-server.js"),
"process",
);
}

export async function test_worker_compile(): Promise<void> {
export async function test_worker_compiler(): Promise<void> {
const PATH = __dirname + "/../../../../../bundle/worker-server.js";
if (fs.existsSync(PATH) === false) cp.execSync("npm run bundle");

await _Test_worker(
(worker) => worker.compile(fs.readFileSync(PATH, "utf8")),
"thread",
"process",
);
}

Expand Down
11 changes: 6 additions & 5 deletions test/node/protocols/workers/test_worker_join.ts
Original file line number Diff line number Diff line change
@@ -1,25 +1,26 @@
import fs from "fs";
import { WorkerConnector } from "tgrid";
import { FileSystem } from "tgrid/lib/protocols/workers/internal/FileSystem";
import { sleep_for } from "tstl/thread";

const FILE_PATH = __dirname + "/log.dat";

export async function test_worker_join(): Promise<void> {
await FileSystem.write(FILE_PATH, "NOT YET");
await fs.promises.writeFile(FILE_PATH, "NOT YET", "utf8");

const connector: WorkerConnector<null, null> = new WorkerConnector(
null,
null,
);
await connector.connect(__dirname + "/internal/join.js");

sleep_for(100)
sleep_for(1_000)
.then(() => connector.close())
.catch(() => {});
await connector.join();

const content: string = await FileSystem.read(FILE_PATH, "utf8");
await FileSystem.unlink(FILE_PATH);
await sleep_for(50);
const content: string = await fs.promises.readFile(FILE_PATH, "utf8");
await fs.promises.unlink(FILE_PATH);

if (content !== "WorkerServer.join()")
throw new Error("Error on WorkerServer.join()");
Expand Down
10 changes: 10 additions & 0 deletions test/node/utils/test_util_serialize_error.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
import { serializeError } from "tgrid/lib/utils/internal/serializeError";

export async function test_util_serialize_error(): Promise<void> {
const e: TypeError = new TypeError("something wrong");
const p: object = JSON.parse(JSON.stringify(serializeError(e)));
const keys: string[] = Object.keys(p);
for (const expected of ["name", "message", "stack"])
if (keys.indexOf(expected) === -1)
throw new Error(`Error object does not have ${expected}`);
}

0 comments on commit 1f01f01

Please sign in to comment.