Skip to content

Commit

Permalink
wip: RPCClient middleware and test plus clean up
Browse files Browse the repository at this point in the history
Related #500
Related #501
Related #502

[ci skip]
  • Loading branch information
tegefaulkes committed Jan 25, 2023
1 parent 0e70a4c commit 14394d4
Show file tree
Hide file tree
Showing 7 changed files with 260 additions and 134 deletions.
61 changes: 55 additions & 6 deletions src/RPC/RPCClient.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
import type { StreamPairCreateCallback } from './types';
import type { JSONValue, POJO } from 'types';
import type { ReadableWritablePair } from 'stream/web';
import type {
JsonRpcRequest,
JsonRpcResponse,
MiddlewareFactory,
Middleware,
} from './types';
import { CreateDestroy, ready } from '@matrixai/async-init/dist/CreateDestroy';
import Logger from '@matrixai/logger';
import * as rpcErrors from './errors';
Expand Down Expand Up @@ -50,14 +56,24 @@ class RPCClient {
_metadata: POJO,
): Promise<ReadableWritablePair<O, I>> {
const streamPair = await this.streamPairCreateCallback();
const outputStream = streamPair.readable
.pipeThrough(
new rpcUtils.JsonToJsonMessageStream(rpcUtils.parseJsonRpcResponse),
)
.pipeThrough(new rpcUtils.ClientOutputTransformerStream<O>());
let reverseMiddlewareStream = streamPair.readable.pipeThrough(
new rpcUtils.JsonToJsonMessageStream(rpcUtils.parseJsonRpcResponse),
);
for (const middleWare of this.reverseMiddleware) {
const middle = middleWare();
reverseMiddlewareStream = middle(reverseMiddlewareStream);
}
const outputStream = reverseMiddlewareStream.pipeThrough(
new rpcUtils.ClientOutputTransformerStream<O>(),
);
const inputMessageTransformer =
new rpcUtils.ClientInputTransformerStream<I>(method);
void inputMessageTransformer.readable
let forwardMiddlewareStream = inputMessageTransformer.readable;
for (const middleware of this.forwardMiddleWare) {
const middle = middleware();
forwardMiddlewareStream = middle(forwardMiddlewareStream);
}
void forwardMiddlewareStream
.pipeThrough(new rpcUtils.JsonMessageToJsonStream())
.pipeTo(streamPair.writable)
.catch(() => {});
Expand Down Expand Up @@ -188,6 +204,39 @@ class RPCClient {
await writer.close();
return callerInterface.output;
}

protected forwardMiddleWare: Array<
MiddlewareFactory<Middleware<JsonRpcRequest<JSONValue>>>
> = [];
protected reverseMiddleware: Array<
MiddlewareFactory<Middleware<JsonRpcResponse<JSONValue>>>
> = [];

@ready(new rpcErrors.ErrorRpcDestroyed())
public registerForwardMiddleware(
middlewareFactory: MiddlewareFactory<Middleware<JsonRpcRequest<JSONValue>>>,
) {
this.forwardMiddleWare.push(middlewareFactory);
}

@ready(new rpcErrors.ErrorRpcDestroyed())
public clearForwardMiddleware() {
this.reverseMiddleware = [];
}

@ready(new rpcErrors.ErrorRpcDestroyed())
public registerReverseMiddleware(
middlewareFactory: MiddlewareFactory<
Middleware<JsonRpcResponse<JSONValue>>
>,
) {
this.reverseMiddleware.push(middlewareFactory);
}

@ready(new rpcErrors.ErrorRpcDestroyed())
public clearReverseMiddleware() {
this.reverseMiddleware = [];
}
}

export default RPCClient;
12 changes: 6 additions & 6 deletions src/RPC/RPCServer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ import type { ConnectionInfo } from '../network/types';
import type { RPCErrorEvent } from './utils';
import type {
MiddlewareFactory,
MiddlewareForward,
MiddlewareReverse,
MiddlewareShort,
Middleware,
} from 'tokens/types';
import { ReadableStream } from 'stream/web';
import { CreateDestroy, ready } from '@matrixai/async-init/dist/CreateDestroy';
Expand Down Expand Up @@ -303,17 +303,17 @@ class RPCServer {

protected forwardMiddleWare: Array<
MiddlewareFactory<
MiddlewareForward<JsonRpcRequest<JSONValue>, JsonRpcResponse<JSONValue>>
MiddlewareShort<JsonRpcRequest<JSONValue>, JsonRpcResponse<JSONValue>>
>
> = [];
protected reverseMiddleware: Array<
MiddlewareFactory<MiddlewareReverse<JsonRpcResponse<JSONValue>>>
MiddlewareFactory<Middleware<JsonRpcResponse<JSONValue>>>
> = [];

@ready(new rpcErrors.ErrorRpcDestroyed())
public registerForwardMiddleware(
middlewareFactory: MiddlewareFactory<
MiddlewareForward<JsonRpcRequest<JSONValue>, JsonRpcResponse<JSONValue>>
MiddlewareShort<JsonRpcRequest<JSONValue>, JsonRpcResponse<JSONValue>>
>,
) {
this.forwardMiddleWare.push(middlewareFactory);
Expand All @@ -327,7 +327,7 @@ class RPCServer {
@ready(new rpcErrors.ErrorRpcDestroyed())
public registerReverseMiddleware(
middlewareFactory: MiddlewareFactory<
MiddlewareReverse<JsonRpcResponse<JSONValue>>
Middleware<JsonRpcResponse<JSONValue>>
>,
) {
this.reverseMiddleware.push(middlewareFactory);
Expand Down
11 changes: 11 additions & 0 deletions src/RPC/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import type { JSONValue, POJO } from '../types';
import type { ConnectionInfo } from '../network/types';
import type { ContextCancellable } from '../contexts/types';
import type { ReadableWritablePair } from 'stream/web';
import type { ReadableStream } from 'stream/web';

/**
* This is the JSON RPC request object. this is the generic message type used for the RPC.
Expand Down Expand Up @@ -127,6 +128,13 @@ type StreamPairCreateCallback = () => Promise<
ReadableWritablePair<Uint8Array, Uint8Array>
>;

type MiddlewareShort<T, K> = (
input: ReadableStream<T>,
short: (value: K) => void,
) => ReadableStream<T>;
type Middleware<T> = (input: ReadableStream<T>) => ReadableStream<T>;
type MiddlewareFactory<T> = () => T;

export type {
JsonRpcRequestMessage,
JsonRpcRequestNotification,
Expand All @@ -141,4 +149,7 @@ export type {
ClientStreamHandler,
UnaryHandler,
StreamPairCreateCallback,
MiddlewareShort,
Middleware,
MiddlewareFactory,
};
25 changes: 12 additions & 13 deletions src/RPC/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import type {
JsonRpcResponse,
} from 'RPC/types';
import type { JSONValue } from '../types';
import type { JsonValue } from 'fast-check';
import { TransformStream } from 'stream/web';
import { AbstractError } from '@matrixai/errors';
import * as rpcErrors from './errors';
Expand Down Expand Up @@ -431,27 +432,25 @@ function toError(errorData) {
}

class ClientInputTransformer<I extends JSONValue>
implements Transformer<I, JsonRpcRequestMessage>
implements Transformer<I, JsonRpcRequest<JsonValue>>
{
constructor(protected method: string) {}

transform: TransformerTransformCallback<I, JsonRpcRequestMessage<I>> = async (
chunk,
controller,
) => {
const message: JsonRpcRequestMessage<I> = {
method: this.method,
jsonrpc: '2.0',
id: null,
params: chunk,
transform: TransformerTransformCallback<I, JsonRpcRequest<JsonValue>> =
async (chunk, controller) => {
const message: JsonRpcRequest<JsonValue> = {
method: this.method,
jsonrpc: '2.0',
id: null,
params: chunk,
};
controller.enqueue(message);
};
controller.enqueue(message);
};
}

class ClientInputTransformerStream<I extends JSONValue> extends TransformStream<
I,
JsonRpcRequestMessage<I>
JsonRpcRequest<JSONValue>
> {
constructor(method: string) {
super(new ClientInputTransformer<I>(method));
Expand Down
11 changes: 0 additions & 11 deletions src/tokens/types.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import type { Opaque, JSONValue } from '../types';
import type { Signature, MAC } from '../keys/types';
import type { NodeIdEncoded } from '../ids/types';
import type { ReadableStream } from 'stream/web';

/**
* Token based on JWT specification.
Expand Down Expand Up @@ -119,13 +118,6 @@ type SignedTokenEncoded = {
signatures: Array<TokenHeaderSignatureEncoded>;
};

type MiddlewareForward<T, K> = (
input: ReadableStream<T>,
short: (value: K) => void,
) => ReadableStream<T>;
type MiddlewareReverse<T> = (input: ReadableStream<T>) => ReadableStream<T>;
type MiddlewareFactory<T> = () => T;

export type {
TokenPayload,
TokenPayloadEncoded,
Expand All @@ -140,7 +132,4 @@ export type {
SignedToken,
SignedTokenJSON,
SignedTokenEncoded,
MiddlewareForward,
MiddlewareReverse,
MiddlewareFactory,
};
125 changes: 124 additions & 1 deletion tests/RPC/RPCClient.test.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
import type { ReadableWritablePair } from 'stream/web';
import type { JSONValue } from '@/types';
import type { JsonRpcRequestMessage } from '@/RPC/types';
import type {
JsonRpcRequest,
JsonRpcRequestMessage,
JsonRpcResponse,
} from '@/RPC/types';
import { TransformStream } from 'stream/web';
import Logger, { LogLevel, StreamHandler } from '@matrixai/logger';
import { testProp, fc } from '@fast-check/jest';
import RPCClient from '@/RPC/RPCClient';
Expand Down Expand Up @@ -320,4 +325,122 @@ describe(`${RPCClient.name}`, () => {
await rpcClient.destroy();
},
);
testProp(
'generic duplex caller with forward Middleware',
[specificMessageArb],
async (messages) => {
const inputStream = rpcTestUtils.jsonRpcStream(messages);
const [outputResult, outputStream] =
rpcTestUtils.streamToArray<Uint8Array>();
const streamPair: ReadableWritablePair = {
readable: inputStream,
writable: outputStream,
};
const rpcClient = await RPCClient.createRPCClient({
streamPairCreateCallback: async () => streamPair,
logger,
});

rpcClient.registerForwardMiddleware(() => {
return (input) =>
input.pipeThrough(
new TransformStream<
JsonRpcRequest<JSONValue>,
JsonRpcRequest<JSONValue>
>({
transform: (chunk, controller) => {
controller.enqueue({
...chunk,
params: 'one',
});
},
}),
);
});
const callerInterface = await rpcClient.duplexStreamCaller<
JSONValue,
JSONValue
>(methodName, { hello: 'world' });
const reader = callerInterface.readable.getReader();
const writer = callerInterface.writable.getWriter();
while (true) {
const { value, done } = await reader.read();
if (done) {
// We have to end the writer otherwise the stream never closes
await writer.close();
break;
}
await writer.write(value);
}

const expectedMessages: Array<JsonRpcRequestMessage> = messages.map(
() => {
const request: JsonRpcRequestMessage = {
jsonrpc: '2.0',
method: methodName,
id: null,
params: 'one',
};
return request;
},
);
const outputMessages = (await outputResult).map((v) =>
JSON.parse(v.toString()),
);
expect(outputMessages).toStrictEqual(expectedMessages);
await rpcClient.destroy();
},
);
testProp.only(
'generic duplex caller with reverse Middleware',
[specificMessageArb],
async (messages) => {
const inputStream = rpcTestUtils.jsonRpcStream(messages);
const [outputResult, outputStream] =
rpcTestUtils.streamToArray<Uint8Array>();
const streamPair: ReadableWritablePair = {
readable: inputStream,
writable: outputStream,
};
const rpcClient = await RPCClient.createRPCClient({
streamPairCreateCallback: async () => streamPair,
logger,
});

rpcClient.registerReverseMiddleware(() => {
return (input) =>
input.pipeThrough(
new TransformStream<
JsonRpcResponse<JSONValue>,
JsonRpcResponse<JSONValue>
>({
transform: (chunk, controller) => {
controller.enqueue({
...chunk,
result: 'one',
});
},
}),
);
});
const callerInterface = await rpcClient.duplexStreamCaller<
JSONValue,
JSONValue
>(methodName, { hello: 'world' });
const reader = callerInterface.readable.getReader();
const writer = callerInterface.writable.getWriter();
while (true) {
const { value, done } = await reader.read();
if (done) {
// We have to end the writer otherwise the stream never closes
await writer.close();
break;
}
expect(value).toBe('one');
await writer.write(value);
}
await outputResult;
await rpcClient.destroy();
},
);
});
Loading

0 comments on commit 14394d4

Please sign in to comment.