Skip to content

Commit

Permalink
wip: fixing up message types and parsing
Browse files Browse the repository at this point in the history
There is now a reasonably enforced hierarchy of `message` => `request` | `response` => `requestMessage` | `requestNotification` | `responseResult` | `responseError`.

Related #500
Related #501

[ci skip]
  • Loading branch information
tegefaulkes committed Jan 19, 2023
1 parent 4be338a commit 1e89eba
Show file tree
Hide file tree
Showing 8 changed files with 288 additions and 253 deletions.
17 changes: 8 additions & 9 deletions src/RPC/RPCClient.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import type {
ClientCallerInterface,
DuplexCallerInterface,
JsonRpcRequest,
JsonRpcRequestMessage,
ServerCallerInterface,
StreamPairCreateCallback,
} from './types';
Expand Down Expand Up @@ -64,7 +64,7 @@ class RPCClient {
): Promise<DuplexCallerInterface<I, O>> {
const streamPair = await this.streamPairCreateCallback();
const inputStream = streamPair.readable.pipeThrough(
new rpcUtils.JsonToJsonMessageStream(),
new rpcUtils.JsonToJsonMessageStream(rpcUtils.parseJsonRpcResponse),
);
const outputTransform = new rpcUtils.JsonMessageToJsonStream();
void outputTransform.readable.pipeTo(streamPair.writable);
Expand All @@ -75,9 +75,8 @@ class RPCClient {
try {
while (true) {
value = yield;
const message: JsonRpcRequest<I> = {
const message: JsonRpcRequestMessage<I> = {
method,
type: 'JsonRpcRequest',
jsonrpc: '2.0',
id: null,
params: value,
Expand All @@ -96,12 +95,12 @@ class RPCClient {
while (true) {
const { value, done } = await reader.read();
if (done) break;
if (
value?.type === 'JsonRpcRequest' ||
value?.type === 'JsonRpcNotification'
) {
yield value.params as O;
if ('error' in value) {
throw Error('TMP message was an error message', {
cause: value.error,
});
}
yield value.result as O;
}
};
const output = outputGen();
Expand Down
17 changes: 3 additions & 14 deletions src/RPC/RPCServer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import Logger from '@matrixai/logger';
import { PromiseCancellable } from '@matrixai/async-cancellable';
import * as rpcErrors from './errors';
import * as rpcUtils from './utils';
import * as grpcUtils from '../grpc/utils';

interface RPCServer extends CreateDestroy {}
@CreateDestroy()
Expand Down Expand Up @@ -151,18 +150,10 @@ class RPCServer {
// a generator.
const inputGen = async function* () {
const pojoStream = streamPair.readable.pipeThrough(
new rpcUtils.JsonToJsonMessageStream(),
new rpcUtils.JsonToJsonMessageStream(rpcUtils.parseJsonRpcRequest),
);
for await (const dataMessage of pojoStream) {
// FIXME: don't bother filtering, we should assume all input messages are request or notification.
// These should be checked by parsing, no need for a type field.
// Filtering for request and notification messages
if (
dataMessage.type === 'JsonRpcRequest' ||
dataMessage.type === 'JsonRpcNotification'
) {
yield dataMessage;
}
yield dataMessage;
}
};
const container = this.container;
Expand Down Expand Up @@ -203,7 +194,6 @@ class RPCServer {
ctx,
)) {
const responseMessage: JsonRpcResponseResult<JSONValue> = {
type: 'JsonRpcResponseResult',
jsonrpc: '2.0',
result: response,
id: null,
Expand All @@ -216,10 +206,9 @@ class RPCServer {
const rpcError: JsonRpcError = {
code: e.exitCode,
message: e.description,
data: grpcUtils.fromError(e),
data: rpcUtils.fromError(e),
};
const rpcErrorMessage: JsonRpcResponseError = {
type: 'JsonRpcResponseError',
jsonrpc: '2.0',
error: rpcError,
id: null,
Expand Down
36 changes: 17 additions & 19 deletions src/RPC/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,7 @@ import type { ReadableWritablePair } from 'stream/web';
/**
* This is the JSON RPC request object. this is the generic message type used for the RPC.
*/
type JsonRpcRequest<T extends JSONValue | unknown = unknown> = {
type: 'JsonRpcRequest';
type JsonRpcRequestMessage<T extends JSONValue | unknown = unknown> = {
// A String specifying the version of the JSON-RPC protocol. MUST be exactly "2.0"
jsonrpc: '2.0';
// A String containing the name of the method to be invoked. Method names that begin with the word rpc followed by a
Expand All @@ -23,8 +22,7 @@ type JsonRpcRequest<T extends JSONValue | unknown = unknown> = {
id: string | number | null;
};

type JsonRpcNotification<T extends JSONValue | unknown = unknown> = {
type: 'JsonRpcNotification';
type JsonRpcRequestNotification<T extends JSONValue | unknown = unknown> = {
// A String specifying the version of the JSON-RPC protocol. MUST be exactly "2.0"
jsonrpc: '2.0';
// A String containing the name of the method to be invoked. Method names that begin with the word rpc followed by a
Expand All @@ -37,7 +35,6 @@ type JsonRpcNotification<T extends JSONValue | unknown = unknown> = {
};

type JsonRpcResponseResult<T extends JSONValue | unknown = unknown> = {
type: 'JsonRpcResponseResult';
// A String specifying the version of the JSON-RPC protocol. MUST be exactly "2.0".
jsonrpc: '2.0';
// This member is REQUIRED on success.
Expand All @@ -51,14 +48,13 @@ type JsonRpcResponseResult<T extends JSONValue | unknown = unknown> = {
id: string | number | null;
};

type JsonRpcResponseError<T extends JSONValue | unknown = unknown> = {
type: 'JsonRpcResponseError';
type JsonRpcResponseError = {
// A String specifying the version of the JSON-RPC protocol. MUST be exactly "2.0".
jsonrpc: '2.0';
// This member is REQUIRED on error.
// This member MUST NOT exist if there was no error triggered during invocation.
// The value for this member MUST be an Object as defined in section 5.1.
error: JsonRpcError<T>;
error: JsonRpcError;
// This member is REQUIRED.
// It MUST be the same as the value of the id member in the Request Object.
// If there was an error in detecting the id in the Request object (e.g. Parse error/Invalid Request),
Expand All @@ -78,7 +74,7 @@ type JsonRpcResponseError<T extends JSONValue | unknown = unknown> = {
// -32603 Internal error Internal JSON-RPC error.
// -32000 to -32099

type JsonRpcError<T extends JSONValue | unknown = unknown> = {
type JsonRpcError = {
// A Number that indicates the error type that occurred.
// This MUST be an integer.
code: number;
Expand All @@ -88,19 +84,20 @@ type JsonRpcError<T extends JSONValue | unknown = unknown> = {
// A Primitive or Structured value that contains additional information about the error.
// This may be omitted.
// The value of this member is defined by the Server (e.g. detailed error information, nested errors etc.).
data?: T;
data?: JSONValue;
};

type JsonRpcResponse<
T extends JSONValue | unknown = unknown,
K extends JSONValue | unknown = unknown,
> = JsonRpcResponseResult<T> | JsonRpcResponseError<K>;
type JsonRpcRequest<T extends JSONValue | unknown = unknown> =
| JsonRpcRequestMessage<T>
| JsonRpcRequestNotification<T>;

type JsonRpcResponse<T extends JSONValue | unknown = unknown> =
| JsonRpcResponseResult<T>
| JsonRpcResponseError;

type JsonRpcMessage<T extends JSONValue | unknown = unknown> =
| JsonRpcRequest<T>
| JsonRpcNotification<T>
| JsonRpcResponseResult<T>
| JsonRpcResponseError<T>;
| JsonRpcResponse<T>;

// Handler types
type Handler<I, O> = (
Expand Down Expand Up @@ -165,11 +162,12 @@ type StreamPairCreateCallback = () => Promise<
>;

export type {
JsonRpcRequest,
JsonRpcNotification,
JsonRpcRequestMessage,
JsonRpcRequestNotification,
JsonRpcResponseResult,
JsonRpcResponseError,
JsonRpcError,
JsonRpcRequest,
JsonRpcResponse,
JsonRpcMessage,
DuplexStreamHandler,
Expand Down
Loading

0 comments on commit 1e89eba

Please sign in to comment.