diff --git a/src/RPC/RPCServer.ts b/src/RPC/RPCServer.ts index 5d83d3134..b374de488 100644 --- a/src/RPC/RPCServer.ts +++ b/src/RPC/RPCServer.ts @@ -11,6 +11,7 @@ import type { ReadableWritablePair } from 'stream/web'; import type { JSONValue, POJO } from '../types'; import type { ConnectionInfo } from '../network/types'; import type { UnaryHandler } from './types'; +import type { RPCErrorEvent } from './utils'; import { ReadableStream } from 'stream/web'; import { CreateDestroy, ready } from '@matrixai/async-init/dist/CreateDestroy'; import Logger from '@matrixai/logger'; @@ -42,8 +43,8 @@ class RPCServer { protected logger: Logger; protected handlerMap: Map> = new Map(); - private activeStreams: Set> = new Set(); - private events: EventTarget = new EventTarget(); + protected activeStreams: Set> = new Set(); + protected events: EventTarget = new EventTarget(); public constructor({ container, @@ -164,6 +165,7 @@ class RPCServer { const container = this.container; const handlerMap = this.handlerMap; const ctx = { signal: abortController.signal }; + const events = this.events; const outputGen = async function* (): AsyncGenerator { // Step 1, authentication and establishment // read the first message, lets assume the first message is always leading @@ -207,20 +209,29 @@ class RPCServer { yield responseMessage; } } catch (e) { - // This would be an error from the handler or the streams. We should - // catch this and send an error message back through the stream. - const rpcError: JsonRpcError = { - code: e.exitCode, - message: e.description, - data: rpcUtils.fromError(e), - }; - const rpcErrorMessage: JsonRpcResponseError = { - jsonrpc: '2.0', - error: rpcError, - id: null, - }; - // TODO: catch this and emit error in the event emitter - yield rpcErrorMessage; + if (rpcUtils.isReturnableError(e)) { + // We want to convert this error to an error message and pass it along + const rpcError: JsonRpcError = { + code: e.exitCode, + message: e.description, + data: rpcUtils.fromError(e), + }; + const rpcErrorMessage: JsonRpcResponseError = { + jsonrpc: '2.0', + error: rpcError, + id: null, + }; + yield rpcErrorMessage; + } else { + // These errors are emitted to the event system + events.dispatchEvent( + new rpcUtils.RPCErrorEvent({ + detail: { + error: e, + }, + }), + ); + } } resolve(); }; @@ -245,6 +256,22 @@ class RPCServer { .pipeTo(streamPair.writable) .catch(() => {}); } + + public addEventListener( + type: 'error', + callback: (event: RPCErrorEvent) => void, + options?: boolean | AddEventListenerOptions | undefined, + ) { + this.events.addEventListener(type, callback, options); + } + + public removeEventListener( + type: 'error', + callback: (event: RPCErrorEvent) => void, + options?: boolean | AddEventListenerOptions | undefined, + ) { + this.events.removeEventListener(type, callback, options); + } } export default RPCServer; diff --git a/src/RPC/errors.ts b/src/RPC/errors.ts index e47722205..a5ba14ae9 100644 --- a/src/RPC/errors.ts +++ b/src/RPC/errors.ts @@ -47,6 +47,11 @@ class ErrorRpcRemoteError extends ErrorRpc { exitCode = sysexits.UNAVAILABLE; } +class ErrorRpcPlaceholderConnectionError extends ErrorRpc { + static description = 'placeholder error for connection stream failure'; + exitCode = sysexits.UNAVAILABLE; +} + export { ErrorRpc, ErrorRpcRunning, @@ -58,4 +63,5 @@ export { ErrorRpcProtocal, ErrorRpcMessageLength, ErrorRpcRemoteError, + ErrorRpcPlaceholderConnectionError, }; diff --git a/src/RPC/utils.ts b/src/RPC/utils.ts index 3c6acdcd6..8e5ef2df1 100644 --- a/src/RPC/utils.ts +++ b/src/RPC/utils.ts @@ -475,6 +475,28 @@ class ClientOutputTransformerStream< } } +function isReturnableError(e: Error): boolean { + if (e instanceof rpcErrors.ErrorRpcPlaceholderConnectionError) return false; + return true; +} + +class RPCErrorEvent extends Event { + public detail: { + error: any; + }; + + constructor( + options: EventInit & { + detail: { + error: any; + }; + }, + ) { + super('error', options); + this.detail = options.detail; + } +} + export { JsonToJsonMessageStream, JsonMessageToJsonStream, @@ -489,4 +511,6 @@ export { toError, ClientInputTransformerStream, ClientOutputTransformerStream, + isReturnableError, + RPCErrorEvent, }; diff --git a/tests/RPC/RPCServer.test.ts b/tests/RPC/RPCServer.test.ts index 86e224afa..4481d5cc6 100644 --- a/tests/RPC/RPCServer.test.ts +++ b/tests/RPC/RPCServer.test.ts @@ -12,6 +12,7 @@ import type { ReadableWritablePair } from 'stream/web'; import { testProp, fc } from '@fast-check/jest'; import Logger, { LogLevel, StreamHandler } from '@matrixai/logger'; import RPCServer from '@/RPC/RPCServer'; +import * as rpcErrors from '@/RPC/errors'; import * as rpcTestUtils from './utils'; describe(`${RPCServer.name}`, () => { @@ -289,6 +290,85 @@ describe(`${RPCServer.name}`, () => { await rpcServer.destroy(); }); + const errorArb = fc.oneof( + fc.constant(new rpcErrors.ErrorRpcParse()), + fc.constant(new rpcErrors.ErrorRpcHandlerMissing()), + fc.constant(new rpcErrors.ErrorRpcProtocal()), + fc.constant(new rpcErrors.ErrorRpcMessageLength()), + fc.constant(new rpcErrors.ErrorRpcRemoteError()), + ); + testProp( + 'should send error message', + [specificMessageArb, errorArb], + async (messages, error) => { + const stream = rpcTestUtils.jsonRpcStream(messages); + const container = {}; + const rpcServer = await RPCServer.createRPCServer({ container, logger }); + let resolve, reject; + const errorProm = new Promise((resolve_, reject_) => { + resolve = resolve_; + reject = reject_; + }); + rpcServer.addEventListener('error', (thing) => { + resolve(thing); + }); + const [outputResult, outputStream] = rpcTestUtils.streamToArray(); + const readWriteStream: ReadableWritablePair = { + readable: stream, + writable: outputStream, + }; + + const duplexHandler: DuplexStreamHandler = + async function* (_input, _container, _connectionInfo, _ctx) { + throw error; + }; + + rpcServer.registerDuplexStreamHandler(methodName, duplexHandler); + rpcServer.handleStream(readWriteStream, {} as ConnectionInfo); + const errorMessage = JSON.parse((await outputResult)[0]!.toString()); + expect(errorMessage.error.code).toEqual(error.exitCode); + expect(errorMessage.error.message).toEqual(error.description); + reject(); + await expect(errorProm).toReject(); + await rpcServer.destroy(); + }, + ); + testProp( + 'should emit stream error', + [specificMessageArb], + async (messages) => { + const stream = rpcTestUtils.jsonRpcStream(messages); + const container = {}; + const rpcServer = await RPCServer.createRPCServer({ container, logger }); + let resolve, reject; + const errorProm = new Promise((resolve_, reject_) => { + resolve = resolve_; + reject = reject_; + }); + rpcServer.addEventListener('error', (thing) => { + resolve(thing); + }); + const [outputResult, outputStream] = rpcTestUtils.streamToArray(); + const readWriteStream: ReadableWritablePair = { + readable: stream, + writable: outputStream, + }; + + const duplexHandler: DuplexStreamHandler = + async function* (_input, _container, _connectionInfo, _ctx) { + throw new rpcErrors.ErrorRpcPlaceholderConnectionError(); + }; + + rpcServer.registerDuplexStreamHandler(methodName, duplexHandler); + rpcServer.handleStream(readWriteStream, {} as ConnectionInfo); + await outputResult; + + await rpcServer.destroy(); + reject(); + await expect(errorProm).toResolve(); + }, + ); + // TODO: // - Test odd conditions for handlers, like extra messages where 1 is expected. // - Expectations can't be inside the handlers otherwise they're caught.