Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Transport Agnostic RPC implementation #498

Merged
merged 44 commits into from
Feb 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
44 commits
Select commit Hold shift + click to select a range
cb7a7d6
feat: message separation and parsing
tegefaulkes Jan 3, 2023
6418c58
tests: expanding message aritraries and testing parsers
tegefaulkes Jan 5, 2023
8cf279e
feat: rpc server
tegefaulkes Jan 6, 2023
6405186
fix: renaming Rpc to RPC
tegefaulkes Jan 10, 2023
1764e38
fix: fixing up stream parsing
tegefaulkes Jan 11, 2023
eb60e1f
fix: swapped client and server streaming logic, it was reversed
tegefaulkes Jan 11, 2023
abb6202
tests: creating tests for handlers
tegefaulkes Jan 11, 2023
74f10b8
feat: creating `RPCClient`
tegefaulkes Jan 12, 2023
d2a1aef
fix: updating parser path filtering
tegefaulkes Jan 16, 2023
28f7eba
feat: generic client callers
tegefaulkes Jan 16, 2023
ca524bc
feat: generic stream pair callback
tegefaulkes Jan 18, 2023
f170162
fix: enforcing RPC message size limit
tegefaulkes Jan 18, 2023
b26e86e
fix: switched client and server to `CreateDestroy`
tegefaulkes Jan 18, 2023
a401594
fix: fixing up message types and parsing
tegefaulkes Jan 19, 2023
cf13564
tests: client and server integration tests
tegefaulkes Jan 19, 2023
f20b342
feat: client error handling
tegefaulkes Jan 19, 2023
7cb9040
fix: small bug with uncaught promise
tegefaulkes Jan 20, 2023
c0c9920
fix: changed generic callers to use a ReadableWritablePair as the int…
tegefaulkes Jan 20, 2023
3f12ea8
feat: implementing withXCaller CO style methods
tegefaulkes Jan 23, 2023
e210e48
feat: fleshing out error handling
tegefaulkes Jan 23, 2023
260f23b
feat: updated streams to use `Uint8Array` instead of `Buffer`
tegefaulkes Jan 23, 2023
74450a9
feat: middleware
tegefaulkes Jan 24, 2023
586c990
feat: client handler usage examples
tegefaulkes Jan 27, 2023
fb198ff
fix: refactoring middleware
tegefaulkes Jan 27, 2023
29e61a9
feat: agentUnlock example
tegefaulkes Jan 30, 2023
a487fbb
tests: test fixes
tegefaulkes Jan 31, 2023
05e6c6b
feat: client-client websocket communication
tegefaulkes Jan 31, 2023
5426020
feat: raw handlers
tegefaulkes Feb 2, 2023
ceddf68
feat: client side manifest and typed callers
tegefaulkes Feb 6, 2023
c8ebd74
feat: static registering handlers using manifest
tegefaulkes Feb 7, 2023
fe0a0b7
feat: static middleware for server
tegefaulkes Feb 7, 2023
6cceeda
fix: general cleaning up
tegefaulkes Feb 8, 2023
282913e
feat: class based client and server manifests
tegefaulkes Feb 9, 2023
cad1d07
fix: updated metadata type usage
tegefaulkes Feb 10, 2023
b0cfc7a
feat: client methods refactor
tegefaulkes Feb 10, 2023
534038f
feat: client static middleware registration
tegefaulkes Feb 10, 2023
8210075
fix: converting generators to async iterables
tegefaulkes Feb 10, 2023
427adac
feat: handler class using abstract function
tegefaulkes Feb 10, 2023
632281c
feat: updating metadata wrapper type name
tegefaulkes Feb 10, 2023
ffb8b57
fix: changed client stream caller signature
tegefaulkes Feb 13, 2023
5b36049
tests: expanding error tests
tegefaulkes Feb 13, 2023
89c6c59
fix: removing old test files
tegefaulkes Feb 13, 2023
1d88f5d
fix: extracting middleware to its own files
tegefaulkes Feb 13, 2023
71833a0
feat: updated client caller interfaces
tegefaulkes Feb 14, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 58 additions & 1 deletion package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 5 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@
"@peculiar/webcrypto": "^1.4.0",
"@peculiar/x509": "^1.8.3",
"@scure/bip39": "^1.1.0",
"@types/ws": "^8.5.4",
"ajv": "^7.0.4",
"bip39": "^3.0.3",
"canonicalize": "^1.0.5",
Expand All @@ -118,13 +119,15 @@
"resource-counter": "^1.2.4",
"sodium-native": "^3.4.1",
"threads": "^1.6.5",
"utp-native": "^2.5.3",
"tslib": "^2.4.0",
"tsyringe": "^4.7.0"
"tsyringe": "^4.7.0",
"utp-native": "^2.5.3",
"ws": "^8.12.0"
},
"devDependencies": {
"@babel/preset-env": "^7.13.10",
"@fast-check/jest": "^1.1.0",
"@streamparser/json": "^0.0.12",
"@swc/core": "^1.2.215",
"@types/cross-spawn": "^6.0.2",
"@types/google-protobuf": "^3.7.4",
Expand Down
222 changes: 222 additions & 0 deletions src/RPC/RPCClient.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,222 @@
import type {
HandlerType,
JsonRpcRequestMessage,
StreamPairCreateCallback,
ClientManifest,
} from './types';
import type { JSONValue } from 'types';
import type {
ReadableWritablePair,
WritableStream,
ReadableStream,
} from 'stream/web';
import type {
JsonRpcRequest,
JsonRpcResponse,
MiddlewareFactory,
MapCallers,
} from './types';
import { CreateDestroy, ready } from '@matrixai/async-init/dist/CreateDestroy';
import Logger from '@matrixai/logger';
import * as middlewareUtils from './middleware';
import * as rpcErrors from './errors';
import * as rpcUtils from './utils';
import {
clientInputTransformStream,
clientOutputTransformStream,
} from './utils';
import { never } from '../utils';

// eslint-disable-next-line
interface RPCClient<M extends ClientManifest> extends CreateDestroy {}
@CreateDestroy()
class RPCClient<M extends ClientManifest> {
static async createRPCClient<M extends ClientManifest>({
manifest,
streamPairCreateCallback,
middleware = middlewareUtils.defaultClientMiddlewareWrapper(),
logger = new Logger(this.name),
}: {
manifest: M;
streamPairCreateCallback: StreamPairCreateCallback;
middleware?: MiddlewareFactory<
Uint8Array,
JsonRpcRequest,
JsonRpcResponse,
Uint8Array
>;
logger?: Logger;
}) {
logger.info(`Creating ${this.name}`);
const rpcClient = new this({
manifest,
streamPairCreateCallback,
middleware,
logger,
});
logger.info(`Created ${this.name}`);
return rpcClient;
}

protected logger: Logger;
protected streamPairCreateCallback: StreamPairCreateCallback;
protected middleware: MiddlewareFactory<
Uint8Array,
JsonRpcRequest,
JsonRpcResponse,
Uint8Array
>;
protected callerTypes: Record<string, HandlerType>;
// Method proxies
public readonly methodsProxy = new Proxy(
{},
{
get: (_, method) => {
if (typeof method === 'symbol') throw never();
switch (this.callerTypes[method]) {
case 'UNARY':
return (params) => this.unaryCaller(method, params);
case 'SERVER':
return (params) => this.serverStreamCaller(method, params);
case 'CLIENT':
return () => this.clientStreamCaller(method);
case 'DUPLEX':
return () => this.duplexStreamCaller(method);
case 'RAW':
return (header) => this.rawStreamCaller(method, header);
default:
return;
}
},
},
);

public constructor({
manifest,
streamPairCreateCallback,
middleware,
logger,
}: {
manifest: M;
streamPairCreateCallback: StreamPairCreateCallback;
middleware: MiddlewareFactory<
Uint8Array,
JsonRpcRequest,
JsonRpcResponse,
Uint8Array
>;
logger: Logger;
}) {
this.callerTypes = rpcUtils.getHandlerTypes(manifest);
this.streamPairCreateCallback = streamPairCreateCallback;
this.middleware = middleware;
this.logger = logger;
}

public async destroy(): Promise<void> {
this.logger.info(`Destroying ${this.constructor.name}`);
this.logger.info(`Destroyed ${this.constructor.name}`);
}

@ready(new rpcErrors.ErrorRpcDestroyed())
public get methods(): MapCallers<M> {
return this.methodsProxy as MapCallers<M>;
}

@ready(new rpcErrors.ErrorRpcDestroyed())
public async unaryCaller<I extends JSONValue, O extends JSONValue>(
method: string,
parameters: I,
): Promise<O> {
const callerInterface = await this.duplexStreamCaller<I, O>(method);
const reader = callerInterface.readable.getReader();
const writer = callerInterface.writable.getWriter();
await writer.write(parameters);
const output = await reader.read();
if (output.done) {
throw new rpcErrors.ErrorRpcRemoteError('Stream ended before response');
}
await reader.cancel();
await writer.close();
return output.value;
}

@ready(new rpcErrors.ErrorRpcDestroyed())
public async serverStreamCaller<I extends JSONValue, O extends JSONValue>(
method: string,
parameters: I,
): Promise<ReadableStream<O>> {
const callerInterface = await this.duplexStreamCaller<I, O>(method);
const writer = callerInterface.writable.getWriter();
await writer.write(parameters);
await writer.close();

return callerInterface.readable;
}

@ready(new rpcErrors.ErrorRpcDestroyed())
public async clientStreamCaller<I extends JSONValue, O extends JSONValue>(
method: string,
): Promise<{
output: Promise<O>;
writable: WritableStream<I>;
}> {
const callerInterface = await this.duplexStreamCaller<I, O>(method);
const reader = callerInterface.readable.getReader();
const output = reader.read().then(({ value, done }) => {
if (done) {
throw new rpcErrors.ErrorRpcRemoteError('Stream ended before response');
}
return value;
});
return {
output,
writable: callerInterface.writable,
};
}

@ready(new rpcErrors.ErrorRpcDestroyed())
public async duplexStreamCaller<I extends JSONValue, O extends JSONValue>(
method: string,
): Promise<ReadableWritablePair<O, I>> {
const outputMessageTransformStream = clientOutputTransformStream<O>();
const inputMessageTransformStream = clientInputTransformStream<I>(method);
const middleware = this.middleware();
// Hooking up agnostic stream side
const streamPair = await this.streamPairCreateCallback();
void streamPair.readable
.pipeThrough(middleware.reverse)
.pipeTo(outputMessageTransformStream.writable)
.catch(() => {});
void inputMessageTransformStream.readable
.pipeThrough(middleware.forward)
.pipeTo(streamPair.writable)
.catch(() => {});

// Returning interface
return {
readable: outputMessageTransformStream.readable,
writable: inputMessageTransformStream.writable,
};
}

@ready(new rpcErrors.ErrorRpcDestroyed())
public async rawStreamCaller(
method: string,
headerParams: JSONValue,
): Promise<ReadableWritablePair<Uint8Array, Uint8Array>> {
const streamPair = await this.streamPairCreateCallback();
const tempWriter = streamPair.writable.getWriter();
const header: JsonRpcRequestMessage = {
jsonrpc: '2.0',
method,
params: headerParams,
id: null,
};
await tempWriter.write(Buffer.from(JSON.stringify(header)));
tempWriter.releaseLock();
return streamPair;
}
}

export default RPCClient;
Loading