Skip to content

Commit

Permalink
Close #90: add Communicator.on() message with event types.
Browse files Browse the repository at this point in the history
  • Loading branch information
samchon committed Dec 2, 2024
1 parent 755cef8 commit ff34b03
Show file tree
Hide file tree
Showing 5 changed files with 372 additions and 34 deletions.
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "tgrid",
"version": "1.0.3",
"version": "1.1.0",
"main": "lib/index.js",
"typings": "lib/index.d.ts",
"exports": {
Expand Down
192 changes: 163 additions & 29 deletions src/components/Communicator.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import { ConditionVariable, HashMap, Pair } from "tstl";
import { ConditionVariable, HashMap, HashSet } from "tstl";

import { Driver } from "../typings/Driver";
import { serializeError } from "../utils/internal/serializeError";
import { Invoke } from "./Invoke";
import { InvokeEvent } from "./InvokeEvent";

/**
* The basic communicator.
Expand Down Expand Up @@ -44,7 +45,15 @@ export abstract class Communicator<
/**
* @hidden
*/
private promises_: HashMap<number, Pair<FunctionLike, FunctionLike>>;
private promises_: HashMap<number, IFunctionReservation>;

/**
* @hidden
*/
private event_listeners_: HashMap<
InvokeEvent.Type,
HashSet<(event: InvokeEvent) => void>
>;

/**
* @hidden
Expand Down Expand Up @@ -72,6 +81,50 @@ export abstract class Communicator<
// OTHER MEMBERS
this.promises_ = new HashMap();
this.join_cv_ = new ConditionVariable();
this.event_listeners_ = new HashMap();
}

/**
* Add invoke event listener.
*
* Add an event listener for the invoke event. The event listener would be called
* when some invoke event has been occured; sending, receiving, completing, or returning.
*
* If you change the requesting parameters or returning value in the event listener,
* it would affect to the RPC (Remote Procedure Call) communication. Therefore, you have
* to be careful when modifying the remote function calling.
*
* Of course, you can utilize the event listener just for monitoring the RPC events.
*
* @param type Type of the event
* @param listener The listener function to enroll
*/
public on<Type extends InvokeEvent.Type>(
type: Type,
listener: (event: InvokeEvent.EventMapper[Type]) => void,
): void {
this.event_listeners_
.take(type, () => new HashSet())
.insert(listener as (event: InvokeEvent) => void);
}

/**
* Erase invoke event listener.
*
* Erase an event listener from the invoke event. The event listener would not be
* called anymore when the specific invoke event has been occured.
*
* @param type Type of the event
* @param listener The listener function to erase
*/
public off<Type extends InvokeEvent.Type>(
type: Type,
listener: (event: InvokeEvent.EventMapper[Type]) => void,
): void {
const it = this.event_listeners_.find(type);
if (it.equals(this.event_listeners_.end()) === false)
it.second.erase(listener as (event: InvokeEvent) => void);
if (it.second.empty()) this.event_listeners_.erase(it);
}

/**
Expand All @@ -93,7 +146,7 @@ export abstract class Communicator<
: new Error("Connection has been closed.");

for (const entry of this.promises_) {
const reject: FunctionLike = entry.second.second;
const reject: FunctionLike = entry.second.reject;
reject(rejectError);
}

Expand All @@ -116,7 +169,6 @@ export abstract class Communicator<
*/
private _Proxy_func(name: string): FunctionLike {
const func = (...params: any[]) => this._Call_function(name, ...params);

return new Proxy(func, {
get: ({}, newName: string) => {
if (newName === "bind")
Expand All @@ -125,7 +177,6 @@ export abstract class Communicator<
return (thisArg: any, ...args: any[]) => func.call(thisArg, ...args);
else if (newName === "apply")
return (thisArg: any, args: any[]) => func.apply(thisArg, args);

return this._Proxy_func(`${name}.${newName}`);
},
});
Expand Down Expand Up @@ -155,8 +206,27 @@ export abstract class Communicator<
})),
};

// CALL EVENT LISTENERS
const eventSetIterator = this.event_listeners_.find("send");
if (eventSetIterator.equals(this.event_listeners_.end()) === false) {
const event: InvokeEvent.ISend = {
type: "send",
time: new Date(),
function: invoke,
};
for (const listener of eventSetIterator.second)
try {
listener(event);
} catch {}
}

// DO SEND WITH PROMISE
this.promises_.emplace(invoke.uid, new Pair(resolve, reject));
this.promises_.emplace(invoke.uid, {
function: invoke,
time: new Date(),
resolve,
reject,
});
await this.sendData(invoke);
});
}
Expand Down Expand Up @@ -271,14 +341,15 @@ export abstract class Communicator<
protected replyData(invoke: Invoke): void {
if ((invoke as Invoke.IFunction).listener)
this._Handle_function(invoke as Invoke.IFunction).catch(() => {});
else this._Handle_return(invoke as Invoke.IReturn);
else this._Handle_complete(invoke as Invoke.IReturn);
}

/**
* @hidden
*/
private async _Handle_function(invoke: Invoke.IFunction): Promise<void> {
const uid: number = invoke.uid;
const time: Date = new Date();

try {
//----
Expand Down Expand Up @@ -323,33 +394,80 @@ export abstract class Communicator<
}
func = func.bind(thisArg);

// CALL EVENT LISTENERS
const eventSetIterator: HashMap.Iterator<
InvokeEvent.Type,
HashSet<(event: InvokeEvent) => void>
> = this.event_listeners_.find("receive");
if (eventSetIterator.equals(this.event_listeners_.end()) === false) {
const event: InvokeEvent.IReceive = {
type: "receive",
time,
function: invoke,
};
for (const closure of eventSetIterator.second)
try {
closure(event);
} catch {}
}

//----
// RETURN VALUE
//----
// CALL FUNCTION
const parameters: any[] = invoke.parameters.map((p) => p.value);
const ret: any = await func(...parameters);

await this._Send_return(uid, true, ret);
const result: any = await func(...parameters);
await this._Send_return({
invoke,
time,
return: {
uid,
success: true,
value: result,
},
});
} catch (exp) {
await this._Send_return(uid, false, exp);
await this._Send_return({
invoke,
time,
return: {
uid,
success: false,
value: exp,
},
});
}
}

/**
* @hidden
*/
private _Handle_return(invoke: Invoke.IReturn): void {
// GET THE PROMISE OBJECT
private _Handle_complete(invoke: Invoke.IReturn): void {
// FIND TARGET FUNCTION CALL
const it = this.promises_.find(invoke.uid);
if (it.equals(this.promises_.end())) return;

// CALL EVENT LISTENERS
const eventSetIterator = this.event_listeners_.find("complete");
if (eventSetIterator.equals(this.event_listeners_.end()) === false) {
const event: InvokeEvent.IComplete = {
type: "complete",
function: it.second.function,
return: invoke,
requested_at: it.second.time,
completed_at: new Date(),
};
for (const closure of eventSetIterator.second)
try {
closure(event);
} catch {}
}

// RETURNS
const func: FunctionLike = invoke.success
? it.second.first
: it.second.second;
? it.second.resolve
: it.second.reject;
this.promises_.erase(it);

func(invoke.value);
}

Expand All @@ -366,23 +484,39 @@ export abstract class Communicator<
/**
* @hidden
*/
private async _Send_return(
uid: number,
success: boolean,
value: any,
): Promise<void> {
private async _Send_return(props: {
invoke: Invoke.IFunction;
return: Invoke.IReturn;
time: Date;
}): Promise<void> {
const eventSet = this.event_listeners_.find("return");
if (eventSet.equals(this.event_listeners_.end()) === false) {
const event: InvokeEvent.IReturn = {
type: "return",
function: props.invoke,
return: props.return,
requested_at: props.time,
completed_at: new Date(),
};
for (const closure of eventSet.second)
try {
closure(event);
} catch {}
}

// SPECIAL LOGIC FOR ERROR -> FOR CLEAR JSON ENCODING
if (success === false && value instanceof Error)
value = serializeError(value);
if (props.return.success === false && props.return.value instanceof Error)
props.return.value = serializeError(props.return.value);

// RETURNS
const ret: Invoke.IReturn = {
uid,
success,
value,
};
await this.sendData(ret);
await this.sendData(props.return);
}
}

type FunctionLike = (...args: any[]) => any;
interface IFunctionReservation {
function: Invoke.IFunction;
time: Date;
resolve: FunctionLike;
reject: FunctionLike;
}
8 changes: 4 additions & 4 deletions src/components/Invoke.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,12 @@ export namespace Invoke {
/**
* Unique identifier.
*/
uid: number;
readonly uid: number;

/**
* Target function (sometimes calsuled in objects) to call.
*/
listener: string;
readonly listener: string;

/**
* Parameters for the function call.
Expand Down Expand Up @@ -49,12 +49,12 @@ export namespace Invoke {
/**
* Unique identifier.
*/
uid: number;
readonly uid: number;

/**
* `true` -> return, `false` -> exception.
*/
success: boolean;
readonly success: boolean;

/**
* Returned value or thrown exception.
Expand Down
Loading

0 comments on commit ff34b03

Please sign in to comment.