Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

@web5/agent DWN + Web5 RPC Clients #433

Merged
merged 13 commits into from
May 1, 2024
14 changes: 14 additions & 0 deletions .changeset/proud-bottles-develop.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
---
"@web5/agent": patch
"@web5/identity-agent": patch
"@web5/proxy-agent": patch
"@web5/user-agent": patch
---

Extend and Test RPC DWN/Web5 Clients to support `http` and `ws`
- move `HttpDwnRpcClient` to `/prototyping` folder
- move `JSON RPC` related files to `/prototyping` folder
- create `WebSocketDwnRpcClient` in `/prototyping` folder
- create `WebSocketWeb5RpcClient` wrapper in `rpc-client`
- does not support `sendDidRequest` via sockets

1 change: 1 addition & 0 deletions packages/agent/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@
"@web5/dids": "1.0.1",
"abstract-level": "1.0.4",
"ed25519-keygen": "0.4.11",
"isomorphic-ws": "^5.0.0",
"level": "8.0.0",
"ms": "2.1.3",
"readable-web-to-node-stream": "3.0.2",
Expand Down
3 changes: 1 addition & 2 deletions packages/agent/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ export * from './did-api.js';
export * from './dwn-api.js';
export * from './hd-identity-vault.js';
export * from './identity-api.js';
export * from './json-rpc.js';
export * from './local-key-manager.js';
export * from './rpc-client.js';
export * from './store-data.js';
Expand All @@ -22,4 +21,4 @@ export * from './store-key.js';
export * from './sync-api.js';
export * from './sync-engine-level.js';
export * from './test-harness.js';
export * from './utils.js';
export * from './utils.js';
55 changes: 55 additions & 0 deletions packages/agent/src/prototyping/clients/dwn-rpc-types.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
import type { MessageEvent, RecordsReadReply, UnionMessageReply } from '@tbd54566975/dwn-sdk-js';

export interface SerializableDwnMessage {
toJSON(): string;
}

export type DwnEventSubscriptionHandler = (event: MessageEvent) => void;

/**
* Interface for communicating with {@link https://github.com/TBD54566975/dwn-server | DWN Servers}
* via JSON-RPC, supporting operations like sending DWN requests.
*/
export interface DwnRpc {
/**
* Lists the transport protocols supported by the DWN RPC client, such as HTTP or HTTPS.
* @returns An array of strings representing the supported transport protocols.
*/
get transportProtocols(): string[]

/**
* Sends a request to a DWN Server using the specified DWN RPC request parameters.
*
* @param request - The DWN RPC request containing the URL, target DID, message, and optional data.
* @returns A promise that resolves to the response from the DWN server.
*/
sendDwnRequest(request: DwnRpcRequest): Promise<DwnRpcResponse>
}


/**
* Represents a JSON RPC request to a DWN server, including the URL, target DID, the message to be
* processed, and optional data.
*/
export type DwnRpcRequest = {
/** Optional data to be sent with the request. */
data?: any;

/** The URL of the DWN server to which the request is sent. */
dwnUrl: string;

/** The message to be processed by the DWN server, which can be a serializable DWN message. */
message: SerializableDwnMessage | any;

/** The DID of the target to which the message is addressed. */
targetDid: string;

/** Optional subscription handler for DWN events. */
subscriptionHandler?: DwnEventSubscriptionHandler;
}

/**
* Represents the JSON RPC response from a DWN server to a request, combining the results of various
* DWN operations.
*/
export type DwnRpcResponse = UnionMessageReply & RecordsReadReply;
68 changes: 68 additions & 0 deletions packages/agent/src/prototyping/clients/http-dwn-rpc-client.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
import type { JsonRpcResponse } from './json-rpc.js';
import type { DwnRpc, DwnRpcRequest, DwnRpcResponse } from './dwn-rpc-types.js';

import { createJsonRpcRequest, parseJson } from './json-rpc.js';
import { utils as cryptoUtils } from '@web5/crypto';

/**
* HTTP client that can be used to communicate with Dwn Servers
*/
export class HttpDwnRpcClient implements DwnRpc {
get transportProtocols() { return ['http:', 'https:']; }

async sendDwnRequest(request: DwnRpcRequest): Promise<DwnRpcResponse> {
const requestId = cryptoUtils.randomUuid();
const jsonRpcRequest = createJsonRpcRequest(requestId, 'dwn.processMessage', {
target : request.targetDid,
message : request.message
});

const fetchOpts = {
method : 'POST',
headers : {
'dwn-request': JSON.stringify(jsonRpcRequest)
}
};

if (request.data) {
// @ts-expect-error TODO: REMOVE
fetchOpts.headers['content-type'] = 'application/octet-stream';
// @ts-expect-error TODO: REMOVE
fetchOpts['body'] = request.data;
}

const resp = await fetch(request.dwnUrl, fetchOpts);
let dwnRpcResponse: JsonRpcResponse;

// check to see if response is in header first. if it is, that means the response is a ReadableStream
let dataStream;
const { headers } = resp;
if (headers.has('dwn-response')) {
// @ts-expect-error TODO: REMOVE
const jsonRpcResponse = parseJson(headers.get('dwn-response')) as JsonRpcResponse;

if (jsonRpcResponse == null) {
throw new Error(`failed to parse json rpc response. dwn url: ${request.dwnUrl}`);
}

dataStream = resp.body;
dwnRpcResponse = jsonRpcResponse;
} else {
// TODO: wonder if i need to try/catch this?
const responseBody = await resp.text();
dwnRpcResponse = JSON.parse(responseBody);
}

if (dwnRpcResponse.error) {
const { code, message } = dwnRpcResponse.error;
throw new Error(`(${code}) - ${message}`);
}

const { reply } = dwnRpcResponse.result;
if (dataStream) {
reply['record']['data'] = dataStream;
}

return reply as DwnRpcResponse;
}
}
169 changes: 169 additions & 0 deletions packages/agent/src/prototyping/clients/json-rpc-socket.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
import { utils as cryptoUtils } from '@web5/crypto';
import IsomorphicWebSocket from 'isomorphic-ws';
import { JsonRpcId, JsonRpcRequest, JsonRpcResponse, createJsonRpcSubscriptionRequest, parseJson } from './json-rpc.js';

// These were arbitrarily chosen, but can be modified via connect options
const CONNECT_TIMEOUT = 3_000;
const RESPONSE_TIMEOUT = 30_000;

export interface JsonRpcSocketOptions {
/** socket connection timeout in milliseconds */
connectTimeout?: number;
/** response timeout for rpc requests in milliseconds */
responseTimeout?: number;
/** optional connection close handler */
onclose?: () => void;
/** optional socket error handler */
onerror?: (error?: any) => void;
}

/**
* JSON RPC Socket Client for WebSocket request/response and long-running subscriptions.
*
* NOTE: This is temporarily copied over from https://github.com/TBD54566975/dwn-server/blob/main/src/json-rpc-socket.ts
* This was done in order to avoid taking a dependency on the `dwn-server`, until a future time when there will be a `clients` package.
*/
export class JsonRpcSocket {
private messageHandlers: Map<JsonRpcId, (event: { data: any }) => void> = new Map();

private constructor(private socket: IsomorphicWebSocket, private responseTimeout: number) {}

static async connect(url: string, options: JsonRpcSocketOptions = {}): Promise<JsonRpcSocket> {
const { connectTimeout = CONNECT_TIMEOUT, responseTimeout = RESPONSE_TIMEOUT, onclose, onerror } = options;

const socket = new IsomorphicWebSocket(url);

if (!onclose) {
socket.onclose = ():void => {
console.info(`JSON RPC Socket close ${url}`);
};
} else {
socket.onclose = onclose;
}

if (!onerror) {
socket.onerror = (error?: any):void => {
console.error(`JSON RPC Socket error ${url}`, error);
};
} else {
socket.onerror = onerror;
}

return new Promise<JsonRpcSocket>((resolve, reject) => {
socket.addEventListener('open', () => {
const jsonRpcSocket = new JsonRpcSocket(socket, responseTimeout);

socket.addEventListener('message', (event: { data: any }) => {
const jsonRpcResponse = parseJson(event.data) as JsonRpcResponse;
const handler = jsonRpcSocket.messageHandlers.get(jsonRpcResponse.id);
if (handler) {
handler(event);
}
});

resolve(jsonRpcSocket);
});

socket.addEventListener('error', (error: any) => {
reject(error);
});

setTimeout(() => reject, connectTimeout);
});
}

close(): void {
this.socket.close();
}

/**
* Sends a JSON-RPC request through the socket and waits for a single response.
*/
async request(request: JsonRpcRequest): Promise<JsonRpcResponse> {
return new Promise((resolve, reject) => {
request.id ??= cryptoUtils.randomUuid();

const handleResponse = (event: { data: any }):void => {
const jsonRpsResponse = parseJson(event.data) as JsonRpcResponse;
if (jsonRpsResponse.id === request.id) {
// if the incoming response id matches the request id, we will remove the listener and resolve the response
this.messageHandlers.delete(request.id);
return resolve(jsonRpsResponse);
}
};

// add the listener to the map of message handlers
this.messageHandlers.set(request.id, handleResponse);
this.send(request);

// reject this promise if we don't receive any response back within the timeout period
setTimeout(() => {
this.messageHandlers.delete(request.id!);
reject(new Error('request timed out'));
}, this.responseTimeout);
});
}

/**
* Sends a JSON-RPC request through the socket and keeps a listener open to read associated responses as they arrive.
* Returns a close method to clean up the listener.
*/
async subscribe(request: JsonRpcRequest, listener: (response: JsonRpcResponse) => void): Promise<{
response: JsonRpcResponse;
close?: () => Promise<void>;
}> {

if (!request.method.startsWith('rpc.subscribe.')) {
throw new Error('subscribe rpc requests must include the `rpc.subscribe` prefix');
}

if (!request.subscription) {
throw new Error('subscribe rpc requests must include subscribe options');
}

const subscriptionId = request.subscription.id;
const socketEventListener = (event: { data: any }):void => {
const jsonRpcResponse = parseJson(event.data.toString()) as JsonRpcResponse;
if (jsonRpcResponse.id === subscriptionId) {
if (jsonRpcResponse.error !== undefined) {
// remove the event listener upon receipt of a JSON RPC Error.
this.messageHandlers.delete(subscriptionId);
this.closeSubscription(subscriptionId);
}
listener(jsonRpcResponse);
}
};

this.messageHandlers.set(subscriptionId, socketEventListener);

const response = await this.request(request);
if (response.error) {
this.messageHandlers.delete(subscriptionId);
return { response };
}

// clean up listener and create a `rpc.subscribe.close` message to use when closing this JSON RPC subscription
const close = async (): Promise<void> => {
this.messageHandlers.delete(subscriptionId);
await this.closeSubscription(subscriptionId);
};

return {
response,
close
};
}

private closeSubscription(id: JsonRpcId): Promise<JsonRpcResponse> {
const requestId = cryptoUtils.randomUuid();
const request = createJsonRpcSubscriptionRequest(requestId, 'close', id, {});
return this.request(request);
}

/**
* Sends a JSON-RPC request through the socket. You must subscribe to a message listener separately to capture the response.
*/
send(request: JsonRpcRequest):void {
this.socket.send(JSON.stringify(request));
}
}
Loading
Loading