Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Close #90: add Communicator.on() message with event types. #92

Merged
merged 2 commits into from
Dec 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "tgrid",
"version": "1.0.3",
"version": "1.1.0",
"main": "lib/index.js",
"typings": "lib/index.d.ts",
"exports": {
Expand Down
192 changes: 163 additions & 29 deletions src/components/Communicator.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import { ConditionVariable, HashMap, Pair } from "tstl";
import { ConditionVariable, HashMap, HashSet } from "tstl";

import { Driver } from "../typings/Driver";
import { serializeError } from "../utils/internal/serializeError";
import { Invoke } from "./Invoke";
import { InvokeEvent } from "./InvokeEvent";

/**
* The basic communicator.
Expand Down Expand Up @@ -44,7 +45,15 @@ export abstract class Communicator<
/**
* @hidden
*/
private promises_: HashMap<number, Pair<FunctionLike, FunctionLike>>;
private promises_: HashMap<number, IFunctionReservation>;

/**
* @hidden
*/
private event_listeners_: HashMap<
InvokeEvent.Type,
HashSet<(event: InvokeEvent) => void>
>;

/**
* @hidden
Expand Down Expand Up @@ -72,6 +81,50 @@ export abstract class Communicator<
// OTHER MEMBERS
this.promises_ = new HashMap();
this.join_cv_ = new ConditionVariable();
this.event_listeners_ = new HashMap();
}

/**
* Add invoke event listener.
*
* Add an event listener for the invoke event. The event listener would be called
* when some invoke event has been occured; sending, receiving, completing, or returning.
*
* If you change the requesting parameters or returning value in the event listener,
* it would affect to the RPC (Remote Procedure Call) communication. Therefore, you have
* to be careful when modifying the remote function calling.
*
* Of course, you can utilize the event listener just for monitoring the RPC events.
*
* @param type Type of the event
* @param listener The listener function to enroll
*/
public on<Type extends InvokeEvent.Type>(
type: Type,
listener: (event: InvokeEvent.EventMapper[Type]) => void,
): void {
this.event_listeners_
.take(type, () => new HashSet())
.insert(listener as (event: InvokeEvent) => void);
}

/**
* Erase invoke event listener.
*
* Erase an event listener from the invoke event. The event listener would not be
* called anymore when the specific invoke event has been occured.
*
* @param type Type of the event
* @param listener The listener function to erase
*/
public off<Type extends InvokeEvent.Type>(
type: Type,
listener: (event: InvokeEvent.EventMapper[Type]) => void,
): void {
const it = this.event_listeners_.find(type);
if (it.equals(this.event_listeners_.end()) === false)
it.second.erase(listener as (event: InvokeEvent) => void);
if (it.second.empty()) this.event_listeners_.erase(it);
}

/**
Expand All @@ -93,7 +146,7 @@ export abstract class Communicator<
: new Error("Connection has been closed.");

for (const entry of this.promises_) {
const reject: FunctionLike = entry.second.second;
const reject: FunctionLike = entry.second.reject;
reject(rejectError);
}

Expand All @@ -116,7 +169,6 @@ export abstract class Communicator<
*/
private _Proxy_func(name: string): FunctionLike {
const func = (...params: any[]) => this._Call_function(name, ...params);

return new Proxy(func, {
get: ({}, newName: string) => {
if (newName === "bind")
Expand All @@ -125,7 +177,6 @@ export abstract class Communicator<
return (thisArg: any, ...args: any[]) => func.call(thisArg, ...args);
else if (newName === "apply")
return (thisArg: any, args: any[]) => func.apply(thisArg, args);

return this._Proxy_func(`${name}.${newName}`);
},
});
Expand Down Expand Up @@ -155,8 +206,27 @@ export abstract class Communicator<
})),
};

// CALL EVENT LISTENERS
const eventSetIterator = this.event_listeners_.find("send");
if (eventSetIterator.equals(this.event_listeners_.end()) === false) {
const event: InvokeEvent.ISend = {
type: "send",
time: new Date(),
function: invoke,
};
for (const listener of eventSetIterator.second)
try {
listener(event);
} catch {}
}

// DO SEND WITH PROMISE
this.promises_.emplace(invoke.uid, new Pair(resolve, reject));
this.promises_.emplace(invoke.uid, {
function: invoke,
time: new Date(),
resolve,
reject,
});
await this.sendData(invoke);
});
}
Expand Down Expand Up @@ -271,14 +341,15 @@ export abstract class Communicator<
protected replyData(invoke: Invoke): void {
if ((invoke as Invoke.IFunction).listener)
this._Handle_function(invoke as Invoke.IFunction).catch(() => {});
else this._Handle_return(invoke as Invoke.IReturn);
else this._Handle_complete(invoke as Invoke.IReturn);
}

/**
* @hidden
*/
private async _Handle_function(invoke: Invoke.IFunction): Promise<void> {
const uid: number = invoke.uid;
const time: Date = new Date();

try {
//----
Expand Down Expand Up @@ -323,33 +394,80 @@ export abstract class Communicator<
}
func = func.bind(thisArg);

// CALL EVENT LISTENERS
const eventSetIterator: HashMap.Iterator<
InvokeEvent.Type,
HashSet<(event: InvokeEvent) => void>
> = this.event_listeners_.find("receive");
if (eventSetIterator.equals(this.event_listeners_.end()) === false) {
const event: InvokeEvent.IReceive = {
type: "receive",
time,
function: invoke,
};
for (const closure of eventSetIterator.second)
try {
closure(event);
} catch {}
}

//----
// RETURN VALUE
//----
// CALL FUNCTION
const parameters: any[] = invoke.parameters.map((p) => p.value);
const ret: any = await func(...parameters);

await this._Send_return(uid, true, ret);
const result: any = await func(...parameters);
await this._Send_return({
invoke,
time,
return: {
uid,
success: true,
value: result,
},
});
} catch (exp) {
await this._Send_return(uid, false, exp);
await this._Send_return({
invoke,
time,
return: {
uid,
success: false,
value: exp,
},
});
}
}

/**
* @hidden
*/
private _Handle_return(invoke: Invoke.IReturn): void {
// GET THE PROMISE OBJECT
private _Handle_complete(invoke: Invoke.IReturn): void {
// FIND TARGET FUNCTION CALL
const it = this.promises_.find(invoke.uid);
if (it.equals(this.promises_.end())) return;

// CALL EVENT LISTENERS
const eventSetIterator = this.event_listeners_.find("complete");
if (eventSetIterator.equals(this.event_listeners_.end()) === false) {
const event: InvokeEvent.IComplete = {
type: "complete",
function: it.second.function,
return: invoke,
requested_at: it.second.time,
completed_at: new Date(),
};
for (const closure of eventSetIterator.second)
try {
closure(event);
} catch {}
}

// RETURNS
const func: FunctionLike = invoke.success
? it.second.first
: it.second.second;
? it.second.resolve
: it.second.reject;
this.promises_.erase(it);

func(invoke.value);
}

Expand All @@ -366,23 +484,39 @@ export abstract class Communicator<
/**
* @hidden
*/
private async _Send_return(
uid: number,
success: boolean,
value: any,
): Promise<void> {
private async _Send_return(props: {
invoke: Invoke.IFunction;
return: Invoke.IReturn;
time: Date;
}): Promise<void> {
const eventSet = this.event_listeners_.find("return");
if (eventSet.equals(this.event_listeners_.end()) === false) {
const event: InvokeEvent.IReturn = {
type: "return",
function: props.invoke,
return: props.return,
requested_at: props.time,
completed_at: new Date(),
};
for (const closure of eventSet.second)
try {
closure(event);
} catch {}
}

// SPECIAL LOGIC FOR ERROR -> FOR CLEAR JSON ENCODING
if (success === false && value instanceof Error)
value = serializeError(value);
if (props.return.success === false && props.return.value instanceof Error)
props.return.value = serializeError(props.return.value);

// RETURNS
const ret: Invoke.IReturn = {
uid,
success,
value,
};
await this.sendData(ret);
await this.sendData(props.return);
}
}

type FunctionLike = (...args: any[]) => any;
interface IFunctionReservation {
function: Invoke.IFunction;
time: Date;
resolve: FunctionLike;
reject: FunctionLike;
}
8 changes: 4 additions & 4 deletions src/components/Invoke.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,12 @@ export namespace Invoke {
/**
* Unique identifier.
*/
uid: number;
readonly uid: number;

/**
* Target function (sometimes capsuled in objects) to call.
*/
listener: string;
readonly listener: string;

/**
* Parameters for the function call.
Expand Down Expand Up @@ -49,12 +49,12 @@ export namespace Invoke {
/**
* Unique identifier.
*/
uid: number;
readonly uid: number;

/**
* `true` -> return, `false` -> exception.
*/
success: boolean;
readonly success: boolean;

/**
* Returned value or thrown exception.
Expand Down
Loading