Skip to content

Commit

Permalink
create a map of message mandlers instead of listening to the socket e…
Browse files Browse the repository at this point in the history
…vent `message` many times
  • Loading branch information
LiranCohen committed Apr 24, 2024
1 parent 4f36a93 commit 58c028a
Showing 1 changed file with 24 additions and 10 deletions.
34 changes: 24 additions & 10 deletions packages/agent/src/prototyping/clients/json-rpc-socket.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ export interface JsonRpcSocketOptions {
* This was done in order to avoid taking a dependency on the `dwn-server`, until a future time when there will be a `clients` package.
*/
export class JsonRpcSocket {
private messageHandlers: Map<JsonRpcId, (event: { data: any }) => void> = new Map();

private constructor(private socket: IsomorphicWebSocket, private responseTimeout: number) {}

static async connect(url: string, options: JsonRpcSocketOptions = {}): Promise<JsonRpcSocket> {
Expand All @@ -49,10 +51,20 @@ export class JsonRpcSocket {

return new Promise<JsonRpcSocket>((resolve, reject) => {
socket.addEventListener('open', () => {
resolve(new JsonRpcSocket(socket, responseTimeout));
const jsonRpcSocket = new JsonRpcSocket(socket, responseTimeout);

socket.addEventListener('message', (event: { data: any }) => {
const jsonRpcResponse = parseJson(event.data) as JsonRpcResponse;
const handler = jsonRpcSocket.messageHandlers.get(jsonRpcResponse.id);
if (handler) {
handler(event);
}
});

resolve(jsonRpcSocket);
});

socket.addEventListener('error', (error) => {
socket.addEventListener('error', (error: any) => {
reject(error);
});

Expand All @@ -75,17 +87,18 @@ export class JsonRpcSocket {
const jsonRpsResponse = parseJson(event.data) as JsonRpcResponse;
if (jsonRpsResponse.id === request.id) {
// if the incoming response id matches the request id, we will remove the listener and resolve the response
this.socket.removeEventListener('message', handleResponse);
this.messageHandlers.delete(request.id);
return resolve(jsonRpsResponse);
}
};
// subscribe to the listener before sending the request
this.socket.addEventListener('message', handleResponse);

// add the listener to the map of message handlers
this.messageHandlers.set(request.id, handleResponse);
this.send(request);

// reject this promise if we don't receive any response back within the timeout period
setTimeout(() => {
this.socket.removeEventListener('message', handleResponse);
this.messageHandlers.delete(request.id!);
reject(new Error('request timed out'));
}, this.responseTimeout);
});
Expand Down Expand Up @@ -114,23 +127,24 @@ export class JsonRpcSocket {
if (jsonRpcResponse.id === subscriptionId) {
if (jsonRpcResponse.error !== undefined) {
// remove the event listener upon receipt of a JSON RPC Error.
this.socket.removeEventListener('message', socketEventListener);
this.messageHandlers.delete(subscriptionId);
this.closeSubscription(subscriptionId);
}
listener(jsonRpcResponse);
}
};
this.socket.addEventListener('message', socketEventListener);

this.messageHandlers.set(subscriptionId, socketEventListener);

const response = await this.request(request);
if (response.error) {
this.socket.removeEventListener('message', socketEventListener);
this.messageHandlers.delete(subscriptionId);
return { response };
}

// clean up listener and create a `rpc.subscribe.close` message to use when closing this JSON RPC subscription
const close = async (): Promise<void> => {
this.socket.removeEventListener('message', socketEventListener);
this.messageHandlers.delete(subscriptionId);
await this.closeSubscription(subscriptionId);
};

Expand Down

0 comments on commit 58c028a

Please sign in to comment.