Skip to content

Commit

Permalink
wip: fleshing out error handling
Browse files Browse the repository at this point in the history
Related #500

[ci skip]
  • Loading branch information
tegefaulkes committed Jan 23, 2023
1 parent 3197764 commit 3821392
Show file tree
Hide file tree
Showing 4 changed files with 153 additions and 16 deletions.
59 changes: 43 additions & 16 deletions src/RPC/RPCServer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import type { ReadableWritablePair } from 'stream/web';
import type { JSONValue, POJO } from '../types';
import type { ConnectionInfo } from '../network/types';
import type { UnaryHandler } from './types';
import type { RPCErrorEvent } from './utils';
import { ReadableStream } from 'stream/web';
import { CreateDestroy, ready } from '@matrixai/async-init/dist/CreateDestroy';
import Logger from '@matrixai/logger';
Expand Down Expand Up @@ -42,8 +43,8 @@ class RPCServer {
protected logger: Logger;
protected handlerMap: Map<string, DuplexStreamHandler<JSONValue, JSONValue>> =
new Map();
private activeStreams: Set<PromiseCancellable<void>> = new Set();
private events: EventTarget = new EventTarget();
protected activeStreams: Set<PromiseCancellable<void>> = new Set();
protected events: EventTarget = new EventTarget();

public constructor({
container,
Expand Down Expand Up @@ -164,6 +165,7 @@ class RPCServer {
const container = this.container;
const handlerMap = this.handlerMap;
const ctx = { signal: abortController.signal };
const events = this.events;
const outputGen = async function* (): AsyncGenerator<JsonRpcMessage> {
// Step 1, authentication and establishment
// read the first message, lets assume the first message is always leading
Expand Down Expand Up @@ -207,20 +209,29 @@ class RPCServer {
yield responseMessage;
}
} catch (e) {
// This would be an error from the handler or the streams. We should
// catch this and send an error message back through the stream.
const rpcError: JsonRpcError = {
code: e.exitCode,
message: e.description,
data: rpcUtils.fromError(e),
};
const rpcErrorMessage: JsonRpcResponseError = {
jsonrpc: '2.0',
error: rpcError,
id: null,
};
// TODO: catch this and emit error in the event emitter
yield rpcErrorMessage;
if (rpcUtils.isReturnableError(e)) {
// We want to convert this error to an error message and pass it along
const rpcError: JsonRpcError = {
code: e.exitCode,
message: e.description,
data: rpcUtils.fromError(e),
};
const rpcErrorMessage: JsonRpcResponseError = {
jsonrpc: '2.0',
error: rpcError,
id: null,
};
yield rpcErrorMessage;
} else {
// These errors are emitted to the event system
events.dispatchEvent(
new rpcUtils.RPCErrorEvent({
detail: {
error: e,
},
}),
);
}
}
resolve();
};
Expand All @@ -245,6 +256,22 @@ class RPCServer {
.pipeTo(streamPair.writable)
.catch(() => {});
}

public addEventListener(
type: 'error',
callback: (event: RPCErrorEvent) => void,
options?: boolean | AddEventListenerOptions | undefined,
) {
this.events.addEventListener(type, callback, options);
}

public removeEventListener(
type: 'error',
callback: (event: RPCErrorEvent) => void,
options?: boolean | AddEventListenerOptions | undefined,
) {
this.events.removeEventListener(type, callback, options);
}
}

export default RPCServer;
6 changes: 6 additions & 0 deletions src/RPC/errors.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,11 @@ class ErrorRpcRemoteError<T> extends ErrorRpc<T> {
exitCode = sysexits.UNAVAILABLE;
}

class ErrorRpcPlaceholderConnectionError<T> extends ErrorRpc<T> {
static description = 'placeholder error for connection stream failure';
exitCode = sysexits.UNAVAILABLE;
}

export {
ErrorRpc,
ErrorRpcRunning,
Expand All @@ -58,4 +63,5 @@ export {
ErrorRpcProtocal,
ErrorRpcMessageLength,
ErrorRpcRemoteError,
ErrorRpcPlaceholderConnectionError,
};
24 changes: 24 additions & 0 deletions src/RPC/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -475,6 +475,28 @@ class ClientOutputTransformerStream<
}
}

function isReturnableError(e: Error): boolean {
if (e instanceof rpcErrors.ErrorRpcPlaceholderConnectionError) return false;
return true;
}

class RPCErrorEvent extends Event {
public detail: {
error: any;
};

constructor(
options: EventInit & {
detail: {
error: any;
};
},
) {
super('error', options);
this.detail = options.detail;
}
}

export {
JsonToJsonMessageStream,
JsonMessageToJsonStream,
Expand All @@ -489,4 +511,6 @@ export {
toError,
ClientInputTransformerStream,
ClientOutputTransformerStream,
isReturnableError,
RPCErrorEvent,
};
80 changes: 80 additions & 0 deletions tests/RPC/RPCServer.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import type { ReadableWritablePair } from 'stream/web';
import { testProp, fc } from '@fast-check/jest';
import Logger, { LogLevel, StreamHandler } from '@matrixai/logger';
import RPCServer from '@/RPC/RPCServer';
import * as rpcErrors from '@/RPC/errors';
import * as rpcTestUtils from './utils';

describe(`${RPCServer.name}`, () => {
Expand Down Expand Up @@ -289,6 +290,85 @@ describe(`${RPCServer.name}`, () => {
await rpcServer.destroy();
});

const errorArb = fc.oneof(
fc.constant(new rpcErrors.ErrorRpcParse()),
fc.constant(new rpcErrors.ErrorRpcHandlerMissing()),
fc.constant(new rpcErrors.ErrorRpcProtocal()),
fc.constant(new rpcErrors.ErrorRpcMessageLength()),
fc.constant(new rpcErrors.ErrorRpcRemoteError()),
);
testProp(
'should send error message',
[specificMessageArb, errorArb],
async (messages, error) => {
const stream = rpcTestUtils.jsonRpcStream(messages);
const container = {};
const rpcServer = await RPCServer.createRPCServer({ container, logger });
let resolve, reject;
const errorProm = new Promise((resolve_, reject_) => {
resolve = resolve_;
reject = reject_;
});
rpcServer.addEventListener('error', (thing) => {
resolve(thing);
});
const [outputResult, outputStream] = rpcTestUtils.streamToArray();
const readWriteStream: ReadableWritablePair = {
readable: stream,
writable: outputStream,
};

const duplexHandler: DuplexStreamHandler<JSONValue, JSONValue> =
async function* (_input, _container, _connectionInfo, _ctx) {
throw error;
};

rpcServer.registerDuplexStreamHandler(methodName, duplexHandler);
rpcServer.handleStream(readWriteStream, {} as ConnectionInfo);
const errorMessage = JSON.parse((await outputResult)[0]!.toString());
expect(errorMessage.error.code).toEqual(error.exitCode);
expect(errorMessage.error.message).toEqual(error.description);
reject();
await expect(errorProm).toReject();
await rpcServer.destroy();
},
);
testProp(
'should emit stream error',
[specificMessageArb],
async (messages) => {
const stream = rpcTestUtils.jsonRpcStream(messages);
const container = {};
const rpcServer = await RPCServer.createRPCServer({ container, logger });
let resolve, reject;
const errorProm = new Promise((resolve_, reject_) => {
resolve = resolve_;
reject = reject_;
});
rpcServer.addEventListener('error', (thing) => {
resolve(thing);
});
const [outputResult, outputStream] = rpcTestUtils.streamToArray();
const readWriteStream: ReadableWritablePair = {
readable: stream,
writable: outputStream,
};

const duplexHandler: DuplexStreamHandler<JSONValue, JSONValue> =
async function* (_input, _container, _connectionInfo, _ctx) {
throw new rpcErrors.ErrorRpcPlaceholderConnectionError();
};

rpcServer.registerDuplexStreamHandler(methodName, duplexHandler);
rpcServer.handleStream(readWriteStream, {} as ConnectionInfo);
await outputResult;

await rpcServer.destroy();
reject();
await expect(errorProm).toResolve();
},
);

// TODO:
// - Test odd conditions for handlers, like extra messages where 1 is expected.
// - Expectations can't be inside the handlers otherwise they're caught.
Expand Down

0 comments on commit 3821392

Please sign in to comment.