Skip to content

Commit

Permalink
wip: refactoring middleware
Browse files Browse the repository at this point in the history
Related #500
Related #501
Related #502

[ci skip]
  • Loading branch information
tegefaulkes committed Jan 30, 2023
1 parent d5b1263 commit b9d1d2a
Show file tree
Hide file tree
Showing 4 changed files with 64 additions and 79 deletions.
74 changes: 31 additions & 43 deletions src/RPC/RPCClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import type {
JsonRpcRequest,
JsonRpcResponse,
MiddlewareFactory,
Middleware,
} from './types';
import { CreateDestroy, ready } from '@matrixai/async-init/dist/CreateDestroy';
import Logger from '@matrixai/logger';
Expand Down Expand Up @@ -55,34 +54,37 @@ class RPCClient {
method: string,
_metadata: POJO,
): Promise<ReadableWritablePair<O, I>> {
const streamPair = await this.streamPairCreateCallback();
let reverseMiddlewareStream = streamPair.readable.pipeThrough(
new rpcUtils.JsonToJsonMessageStream(rpcUtils.parseJsonRpcResponse),
);
for (const middleWare of this.reverseMiddleware) {
const middle = middleWare();
reverseMiddlewareStream = middle(reverseMiddlewareStream);
}
const outputStream = reverseMiddlewareStream.pipeThrough(
new rpcUtils.ClientOutputTransformerStream<O>(),
);
const inputMessageTransformer =
// Creating caller side transforms
const outputMessageTransforStream =
new rpcUtils.ClientOutputTransformerStream<O>();
const inputMessageTransformStream =
new rpcUtils.ClientInputTransformerStream<I>(method);
let forwardMiddlewareStream = inputMessageTransformer.readable;
for (const middleware of this.forwardMiddleWare) {
const middle = middleware();
forwardMiddlewareStream = middle(forwardMiddlewareStream);
let reverseStream = outputMessageTransforStream.writable;
let forwardStream = inputMessageTransformStream.readable;
// Setting up middleware chains
for (const middlewareFactory of this.middleware) {
const middleware = middlewareFactory();
forwardStream = forwardStream.pipeThrough(middleware.forward);
void middleware.reverse.readable.pipeTo(reverseStream).catch(() => {});
reverseStream = middleware.reverse.writable;
}
void forwardMiddlewareStream
// Hooking up agnostic stream side
const streamPair = await this.streamPairCreateCallback();
void streamPair.readable
.pipeThrough(
new rpcUtils.JsonToJsonMessageStream(rpcUtils.parseJsonRpcResponse),
)
.pipeTo(reverseStream)
.catch(() => {});
void forwardStream
.pipeThrough(new rpcUtils.JsonMessageToJsonStream())
.pipeTo(streamPair.writable)
.catch(() => {});
const inputStream = inputMessageTransformer.writable;

// Returning interface
return {
readable: outputStream,
writable: inputStream,
readable: outputMessageTransforStream.readable,
writable: inputMessageTransformStream.writable,
};
}

Expand Down Expand Up @@ -205,37 +207,23 @@ class RPCClient {
return callerInterface.output;
}

protected forwardMiddleWare: Array<
MiddlewareFactory<Middleware<JsonRpcRequest<JSONValue>>>
> = [];
protected reverseMiddleware: Array<
MiddlewareFactory<Middleware<JsonRpcResponse<JSONValue>>>
protected middleware: Array<
MiddlewareFactory<JsonRpcRequest<JSONValue>, JsonRpcResponse<JSONValue>>
> = [];

@ready(new rpcErrors.ErrorRpcDestroyed())
public registerForwardMiddleware(
middlewareFactory: MiddlewareFactory<Middleware<JsonRpcRequest<JSONValue>>>,
) {
this.forwardMiddleWare.push(middlewareFactory);
}

@ready(new rpcErrors.ErrorRpcDestroyed())
public clearForwardMiddleware() {
this.reverseMiddleware = [];
}

@ready(new rpcErrors.ErrorRpcDestroyed())
public registerReverseMiddleware(
public registerMiddleware(
middlewareFactory: MiddlewareFactory<
Middleware<JsonRpcResponse<JSONValue>>
JsonRpcRequest<JSONValue>,
JsonRpcResponse<JSONValue>
>,
) {
this.reverseMiddleware.push(middlewareFactory);
this.middleware.push(middlewareFactory);
}

@ready(new rpcErrors.ErrorRpcDestroyed())
public clearReverseMiddleware() {
this.reverseMiddleware = [];
public clearMiddleware() {
this.middleware = [];
}
}

Expand Down
3 changes: 1 addition & 2 deletions src/RPC/RPCServer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ class RPCServer {
void handlerProm
.finally(() => this.activeStreams.delete(handlerProm))
.catch(() => {});
// Setting up forward middleware
// Setting up middleware
let forwardStream = streamPair.readable.pipeThrough(
new rpcUtils.JsonToJsonMessageStream(rpcUtils.parseJsonRpcRequest),
);
Expand All @@ -163,7 +163,6 @@ class RPCServer {
.pipeTo(streamPair.writable)
.catch(() => {});
let reverseStream = outputTransformStream.writable;

for (const middlewareFactory of this.middleware) {
const middleware = middlewareFactory();
forwardStream = forwardStream.pipeThrough(middleware.forward);
Expand Down
4 changes: 1 addition & 3 deletions src/RPC/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -127,11 +127,10 @@ type StreamPairCreateCallback = () => Promise<
ReadableWritablePair<Uint8Array, Uint8Array>
>;

type Middleware<F, R> = {
type MiddlewareFactory<F, R> = () => {
forward: ReadableWritablePair<F, F>;
reverse: ReadableWritablePair<R, R>;
};
type MiddlewareFactory<F, R> = () => Middleware<F, R>;

export type {
JsonRpcRequestMessage,
Expand All @@ -147,6 +146,5 @@ export type {
ClientStreamHandler,
UnaryHandler,
StreamPairCreateCallback,
Middleware,
MiddlewareFactory,
};
62 changes: 31 additions & 31 deletions tests/RPC/RPCClient.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -341,21 +341,21 @@ describe(`${RPCClient.name}`, () => {
logger,
});

rpcClient.registerForwardMiddleware(() => {
return (input) =>
input.pipeThrough(
new TransformStream<
JsonRpcRequest<JSONValue>,
JsonRpcRequest<JSONValue>
>({
transform: (chunk, controller) => {
controller.enqueue({
...chunk,
params: 'one',
});
},
}),
);
rpcClient.registerMiddleware(() => {
return {
forward: new TransformStream<
JsonRpcRequest<JSONValue>,
JsonRpcRequest<JSONValue>
>({
transform: (chunk, controller) => {
controller.enqueue({
...chunk,
params: 'one',
});
},
}),
reverse: new TransformStream(),
};
});
const callerInterface = await rpcClient.duplexStreamCaller<
JSONValue,
Expand Down Expand Up @@ -391,7 +391,7 @@ describe(`${RPCClient.name}`, () => {
await rpcClient.destroy();
},
);
testProp.only(
testProp(
'generic duplex caller with reverse Middleware',
[specificMessageArb],
async (messages) => {
Expand All @@ -407,21 +407,21 @@ describe(`${RPCClient.name}`, () => {
logger,
});

rpcClient.registerReverseMiddleware(() => {
return (input) =>
input.pipeThrough(
new TransformStream<
JsonRpcResponse<JSONValue>,
JsonRpcResponse<JSONValue>
>({
transform: (chunk, controller) => {
controller.enqueue({
...chunk,
result: 'one',
});
},
}),
);
rpcClient.registerMiddleware(() => {
return {
forward: new TransformStream(),
reverse: new TransformStream<
JsonRpcResponse<JSONValue>,
JsonRpcResponse<JSONValue>
>({
transform: (chunk, controller) => {
controller.enqueue({
...chunk,
result: 'one',
});
},
}),
};
});
const callerInterface = await rpcClient.duplexStreamCaller<
JSONValue,
Expand Down

0 comments on commit b9d1d2a

Please sign in to comment.