Skip to content

Commit

Permalink
feat: class based client and server manifests
Browse files Browse the repository at this point in the history
- Related #500
- Related #501

[ci skip]
  • Loading branch information
tegefaulkes committed Feb 14, 2023
1 parent 6cceeda commit 282913e
Show file tree
Hide file tree
Showing 14 changed files with 543 additions and 482 deletions.
20 changes: 10 additions & 10 deletions src/RPC/RPCClient.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import type {
HandlerType,
JsonRpcRequestMessage,
Manifest,
MapWithHandlers,
StreamPairCreateCallback,
ClientManifest,
MapWithCallers,
} from './types';
import type { JSONValue } from 'types';
import type {
Expand All @@ -15,18 +15,18 @@ import type {
JsonRpcRequest,
JsonRpcResponse,
MiddlewareFactory,
MapHandlers,
MapCallers,
} from './types';
import { CreateDestroy, ready } from '@matrixai/async-init/dist/CreateDestroy';
import Logger from '@matrixai/logger';
import * as rpcErrors from './errors';
import * as rpcUtils from './utils';

// eslint-disable-next-line
interface RPCClient<M extends Manifest> extends CreateDestroy {}
interface RPCClient<M extends ClientManifest> extends CreateDestroy {}
@CreateDestroy()
class RPCClient<M extends Manifest> {
static async createRPCClient<M extends Manifest>({
class RPCClient<M extends ClientManifest> {
static async createRPCClient<M extends ClientManifest>({
manifest,
streamPairCreateCallback,
logger = new Logger(this.name),
Expand Down Expand Up @@ -113,13 +113,13 @@ class RPCClient<M extends Manifest> {
}

@ready(new rpcErrors.ErrorRpcDestroyed())
public get methods(): MapHandlers<M> {
return this.methodsProxy as MapHandlers<M>;
public get methods(): MapCallers<M> {
return this.methodsProxy as MapCallers<M>;
}

@ready(new rpcErrors.ErrorRpcDestroyed())
public get withMethods(): MapWithHandlers<M> {
return this.withMethodsProxy as MapWithHandlers<M>;
public get withMethods(): MapWithCallers<M> {
return this.withMethodsProxy as MapWithCallers<M>;
}

@ready(new rpcErrors.ErrorRpcDestroyed())
Expand Down
112 changes: 54 additions & 58 deletions src/RPC/RPCServer.ts
Original file line number Diff line number Diff line change
@@ -1,25 +1,32 @@
import type {
ClientStreamHandler,
DuplexStreamHandler,
ClientHandlerImplementation,
DuplexHandlerImplementation,
JsonRpcError,
JsonRpcRequest,
JsonRpcResponse,
JsonRpcResponseError,
JsonRpcResponseResult,
Manifest,
RawDuplexStreamHandler,
ServerStreamHandler,
UnaryHandler,
ServerManifest,
RawHandlerImplementation,
ServerHandlerImplementation,
UnaryHandlerImplementation,
} from './types';
import type { ReadableWritablePair } from 'stream/web';
import type { JSONValue, POJO } from '../types';
import type { JSONValue } from '../types';
import type { ConnectionInfo } from '../network/types';
import type { RPCErrorEvent } from './utils';
import type { MiddlewareFactory } from './types';
import { ReadableStream } from 'stream/web';
import { CreateDestroy, ready } from '@matrixai/async-init/dist/CreateDestroy';
import Logger from '@matrixai/logger';
import { PromiseCancellable } from '@matrixai/async-cancellable';
import {
ClientHandler,
DuplexHandler,
RawHandler,
ServerHandler,
UnaryHandler,
} from './handlers';
import * as rpcUtils from './utils';
import * as rpcErrors from './errors';
import { never } from '../utils/utils';
Expand All @@ -30,12 +37,10 @@ interface RPCServer extends CreateDestroy {}
class RPCServer {
static async createRPCServer({
manifest,
container,
middleware = rpcUtils.defaultMiddlewareWrapper(),
logger = new Logger(this.name),
}: {
manifest: Manifest;
container: POJO;
manifest: ServerManifest;
middleware?: MiddlewareFactory<
JsonRpcRequest,
Uint8Array,
Expand All @@ -47,7 +52,6 @@ class RPCServer {
logger.info(`Creating ${this.name}`);
const rpcServer = new this({
manifest,
container,
middleware,
logger,
});
Expand All @@ -56,9 +60,8 @@ class RPCServer {
}

// Properties
protected container: POJO;
protected logger: Logger;
protected handlerMap: Map<string, RawDuplexStreamHandler> = new Map();
protected handlerMap: Map<string, RawHandlerImplementation> = new Map();
protected activeStreams: Set<PromiseCancellable<void>> = new Set();
protected events: EventTarget = new EventTarget();
protected middleware: MiddlewareFactory<
Expand All @@ -70,12 +73,10 @@ class RPCServer {

public constructor({
manifest,
container,
middleware,
logger,
}: {
manifest: Manifest;
container: POJO;
manifest: ServerManifest;
middleware: MiddlewareFactory<
JsonRpcRequest,
Uint8Array,
Expand All @@ -85,27 +86,32 @@ class RPCServer {
logger: Logger;
}) {
for (const [key, manifestItem] of Object.entries(manifest)) {
switch (manifestItem.type) {
case 'RAW':
this.registerRawStreamHandler(key, manifestItem.handler);
continue;
case 'DUPLEX':
this.registerDuplexStreamHandler(key, manifestItem.handler);
continue;
case 'SERVER':
this.registerServerStreamHandler(key, manifestItem.handler);
continue;
case 'CLIENT':
this.registerClientStreamHandler(key, manifestItem.handler);
continue;
case 'UNARY':
this.registerUnaryHandler(key, manifestItem.handler);
continue;
default:
never();
if (manifestItem instanceof RawHandler) {
this.registerRawStreamHandler(key, manifestItem.handle);
continue;
}
if (manifestItem instanceof DuplexHandler) {
this.registerDuplexStreamHandler(key, manifestItem.handle);
continue;
}
if (manifestItem instanceof ServerHandler) {
this.registerServerStreamHandler(key, manifestItem.handle);
continue;
}
if (manifestItem instanceof ClientHandler) {
this.registerClientStreamHandler(key, manifestItem.handle);
continue;
}
if (manifestItem instanceof ClientHandler) {
this.registerClientStreamHandler(key, manifestItem.handle);
continue;
}
if (manifestItem instanceof UnaryHandler) {
this.registerUnaryHandler(key, manifestItem.handle);
continue;
}
never();
}
this.container = container;
this.middleware = middleware;
this.logger = logger;
}
Expand All @@ -124,21 +130,20 @@ class RPCServer {

protected registerRawStreamHandler(
method: string,
handler: RawDuplexStreamHandler,
handler: RawHandlerImplementation,
) {
this.handlerMap.set(method, handler);
}

protected registerDuplexStreamHandler<
I extends JSONValue,
O extends JSONValue,
>(method: string, handler: DuplexStreamHandler<I, O>) {
>(method: string, handler: DuplexHandlerImplementation<I, O>) {
// This needs to handle all the message parsing and conversion from
// generators to the raw streams.

const rawSteamHandler: RawDuplexStreamHandler = (
const rawSteamHandler: RawHandlerImplementation = (
[input, header],
container,
connectionInfo,
ctx,
) => {
Expand All @@ -154,12 +159,7 @@ class RPCServer {
yield data.params as I;
}
};
for await (const response of handler(
dataGen(),
container,
connectionInfo,
ctx,
)) {
for await (const response of handler(dataGen(), connectionInfo, ctx)) {
const responseMessage: JsonRpcResponseResult = {
jsonrpc: '2.0',
result: response,
Expand Down Expand Up @@ -222,16 +222,15 @@ class RPCServer {

protected registerUnaryHandler<I extends JSONValue, O extends JSONValue>(
method: string,
handler: UnaryHandler<I, O>,
handler: UnaryHandlerImplementation<I, O>,
) {
const wrapperDuplex: DuplexStreamHandler<I, O> = async function* (
const wrapperDuplex: DuplexHandlerImplementation<I, O> = async function* (
input,
container,
connectionInfo,
ctx,
) {
for await (const inputVal of input) {
yield handler(inputVal, container, connectionInfo, ctx);
yield handler(inputVal, connectionInfo, ctx);
break;
}
};
Expand All @@ -241,15 +240,14 @@ class RPCServer {
protected registerServerStreamHandler<
I extends JSONValue,
O extends JSONValue,
>(method: string, handler: ServerStreamHandler<I, O>) {
const wrapperDuplex: DuplexStreamHandler<I, O> = async function* (
>(method: string, handler: ServerHandlerImplementation<I, O>) {
const wrapperDuplex: DuplexHandlerImplementation<I, O> = async function* (
input,
container,
connectionInfo,
ctx,
) {
for await (const inputVal of input) {
yield* handler(inputVal, container, connectionInfo, ctx);
yield* handler(inputVal, connectionInfo, ctx);
break;
}
};
Expand All @@ -259,14 +257,13 @@ class RPCServer {
protected registerClientStreamHandler<
I extends JSONValue,
O extends JSONValue,
>(method: string, handler: ClientStreamHandler<I, O>) {
const wrapperDuplex: DuplexStreamHandler<I, O> = async function* (
>(method: string, handler: ClientHandlerImplementation<I, O>) {
const wrapperDuplex: DuplexHandlerImplementation<I, O> = async function* (
input,
container,
connectionInfo,
ctx,
) {
yield handler(input, container, connectionInfo, ctx);
yield handler(input, connectionInfo, ctx);
};
this.registerDuplexStreamHandler(method, wrapperDuplex);
}
Expand Down Expand Up @@ -321,7 +318,6 @@ class RPCServer {
}
const outputStream = handler(
[inputStream, leadingMetadataMessage],
this.container,
connectionInfo,
{ signal: abortController.signal },
);
Expand Down
53 changes: 53 additions & 0 deletions src/RPC/callers.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
import type { JSONValue } from 'types';
import type { HandlerType } from './types';

abstract class Caller<
Input extends JSONValue = JSONValue,
Output extends JSONValue = JSONValue,
> {
protected _inputType: Input;
protected _outputType: Output;
// Need this to distinguish the classes when inferring types
abstract type: HandlerType;
}

class RawCaller extends Caller {
public type: 'RAW' = 'RAW' as const;
}

class DuplexCaller<
Input extends JSONValue = JSONValue,
Output extends JSONValue = JSONValue,
> extends Caller<Input, Output> {
public type: 'DUPLEX' = 'DUPLEX' as const;
}

class ServerCaller<
Input extends JSONValue = JSONValue,
Output extends JSONValue = JSONValue,
> extends Caller<Input, Output> {
public type: 'SERVER' = 'SERVER' as const;
}

class ClientCaller<
Input extends JSONValue = JSONValue,
Output extends JSONValue = JSONValue,
> extends Caller<Input, Output> {
public type: 'CLIENT' = 'CLIENT' as const;
}

class UnaryCaller<
Input extends JSONValue = JSONValue,
Output extends JSONValue = JSONValue,
> extends Caller<Input, Output> {
public type: 'UNARY' = 'UNARY' as const;
}

export {
Caller,
RawCaller,
DuplexCaller,
ServerCaller,
ClientCaller,
UnaryCaller,
};
Loading

0 comments on commit 282913e

Please sign in to comment.