From 4e29479764738327ad75f00c5ef0848f63fabdd9 Mon Sep 17 00:00:00 2001 From: Aditya <38064122+bettercallav@users.noreply.github.com> Date: Fri, 27 Oct 2023 12:55:41 +1100 Subject: [PATCH] feat: negative `timeoutTime` paramaters will now throw errors chore: add tests to test negative timeout values. fix: 0 `timeoutTime` value no longer throws in `RPCClient` fix: `RPCServer.start` now throws when passed a handler with a negative `timeout` [ci-skip] --- src/RPCClient.ts | 3 ++ src/RPCServer.ts | 115 ++++++++++++++++++++++------------------ src/errors.ts | 10 ++++ tests/RPCClient.test.ts | 22 ++++++++ tests/RPCServer.test.ts | 112 ++++++++++++++++++++++++++++---------- 5 files changed, 184 insertions(+), 78 deletions(-) diff --git a/src/RPCClient.ts b/src/RPCClient.ts index f22d6a2..7c412fb 100644 --- a/src/RPCClient.ts +++ b/src/RPCClient.ts @@ -102,6 +102,9 @@ class RPCClient { idGen?: IdGen; toError?: ToError; }) { + if (timeoutTime < 0) { + throw new errors.ErrorRPCInvalidTimeout(); + } this.idGen = idGen; this.callerTypes = utils.getHandlerTypes(manifest); this.streamFactory = streamFactory; diff --git a/src/RPCServer.ts b/src/RPCServer.ts index 49769c3..1a98294 100644 --- a/src/RPCServer.ts +++ b/src/RPCServer.ts @@ -106,6 +106,9 @@ class RPCServer { fromError?: FromError; replacer?: (key: string, value: any) => any; }) { + if (timeoutTime < 0) { + throw new errors.ErrorRPCInvalidTimeout(); + } this.idGen = idGen; this.middlewareFactory = middlewareFactory; this.timeoutTime = timeoutTime; @@ -126,58 +129,68 @@ class RPCServer { manifest: ServerManifest; }): Promise { this.logger.info(`Start ${this.constructor.name}`); - for (const [key, manifestItem] of Object.entries(manifest)) { - if (manifestItem instanceof RawHandler) { - this.registerRawStreamHandler( - key, - manifestItem.handle, - manifestItem.timeout, - ); - continue; - } - if (manifestItem instanceof DuplexHandler) { - this.registerDuplexStreamHandler( - key, - // Bind the `this` to the generator handler to make the container available - manifestItem.handle.bind(manifestItem), - manifestItem.timeout, - ); - continue; - } - if (manifestItem instanceof ServerHandler) { - this.registerServerStreamHandler( - key, - // Bind the `this` to the generator handler to make the container available - manifestItem.handle.bind(manifestItem), - manifestItem.timeout, - ); - continue; - } - if (manifestItem instanceof ClientHandler) { - this.registerClientStreamHandler( - key, - manifestItem.handle, - manifestItem.timeout, - ); - continue; - } - if (manifestItem instanceof ClientHandler) { - this.registerClientStreamHandler( - key, - manifestItem.handle, - manifestItem.timeout, - ); - continue; - } - if (manifestItem instanceof UnaryHandler) { - this.registerUnaryHandler( - key, - manifestItem.handle, - manifestItem.timeout, - ); - continue; + try { + for (const [key, manifestItem] of Object.entries(manifest)) { + if (manifestItem.timeout != null && manifestItem.timeout < 0) { + throw new errors.ErrorRPCInvalidHandlerTimeout(); + } + if (manifestItem instanceof RawHandler) { + this.registerRawStreamHandler( + key, + manifestItem.handle, + manifestItem.timeout, + ); + continue; + } + if (manifestItem instanceof DuplexHandler) { + this.registerDuplexStreamHandler( + key, + // Bind the `this` to the generator handler to make the container available + manifestItem.handle.bind(manifestItem), + manifestItem.timeout, + ); + continue; + } + if (manifestItem instanceof ServerHandler) { + this.registerServerStreamHandler( + key, + // Bind the `this` to the generator handler to make the container available + manifestItem.handle.bind(manifestItem), + manifestItem.timeout, + ); + continue; + } + if (manifestItem instanceof ClientHandler) { + this.registerClientStreamHandler( + key, + manifestItem.handle, + manifestItem.timeout, + ); + continue; + } + if (manifestItem instanceof ClientHandler) { + this.registerClientStreamHandler( + key, + manifestItem.handle, + manifestItem.timeout, + ); + continue; + } + if (manifestItem instanceof UnaryHandler) { + this.registerUnaryHandler( + key, + manifestItem.handle, + manifestItem.timeout, + ); + continue; + } + utils.never(); } - utils.never(); + } catch (e) { + // No need to clean up streams, as streams can only be handled after RPCServer has been started. + this.handlerMap.clear(); + this.defaultTimeoutMap.clear(); + throw e; } this.logger.info(`Started ${this.constructor.name}`); } diff --git a/src/errors.ts b/src/errors.ts index 4a1fd4b..69c3ce3 100644 --- a/src/errors.ts +++ b/src/errors.ts @@ -27,6 +27,14 @@ class ErrorRPCCallerFailed extends ErrorRPC { static description = 'Failed to call stream'; } +class ErrorRPCInvalidTimeout extends ErrorRPC { + static description = 'Invalid timeout provided'; +} + +class ErrorRPCInvalidHandlerTimeout extends ErrorRPC { + static description = 'Invalid handler timeout provided'; +} + abstract class ErrorRPCProtocol extends ErrorRPC { static error = 'RPC Protocol Error'; code: number; @@ -257,6 +265,8 @@ export { ErrorRPCConnectionLocal, ErrorRPCConnectionPeer, ErrorRPCConnectionKeepAliveTimeOut, + ErrorRPCInvalidTimeout, + ErrorRPCInvalidHandlerTimeout, ErrorRPCConnectionInternal, ErrorMissingHeader, ErrorHandlerAborted, diff --git a/tests/RPCClient.test.ts b/tests/RPCClient.test.ts index dcef66c..e8060c6 100644 --- a/tests/RPCClient.test.ts +++ b/tests/RPCClient.test.ts @@ -1142,6 +1142,28 @@ describe(`${RPCClient.name}`, () => { }, { numRuns: 5 }, ); + testProp( + 'RPCClient constructor should throw when passed a negative timeoutTime', + [fc.integer({ max: -1 })], + async (timeoutTime) => { + const streamPair: RPCStream = { + cancel: () => {}, + meta: undefined, + readable: new ReadableStream(), + writable: new WritableStream(), + }; + const constructorF = () => + new RPCClient({ + timeoutTime, + streamFactory: () => Promise.resolve(streamPair), + manifest: {}, + logger, + idGen, + }); + + expect(constructorF).toThrowError(rpcErrors.ErrorRPCInvalidTimeout); + }, + ); testProp( 'Check that ctx is provided to the middleware and that the middleware can reset the timer', [specificMessageArb], diff --git a/tests/RPCServer.test.ts b/tests/RPCServer.test.ts index 1647b13..877f6b4 100644 --- a/tests/RPCServer.test.ts +++ b/tests/RPCServer.test.ts @@ -953,7 +953,6 @@ describe(`${RPCServer.name}`, () => { const waitProm = promise(); const ctxShortProm = promise(); class TestMethodShortTimeout extends UnaryHandler { - timeout = 25; public handle = async ( input: JSONValue, _cancel, @@ -965,29 +964,59 @@ describe(`${RPCServer.name}`, () => { return input; }; } - const ctxLongProm = promise(); - class TestMethodLongTimeout extends UnaryHandler { - timeout = 100; + const rpcServer = new RPCServer({ + timeoutTime: Infinity, + logger, + idGen, + }); + await rpcServer.start({ + manifest: { + testShort: new TestMethodShortTimeout({}), + }, + }); + const streamShort = rpcTestUtils.messagesToReadableStream([ + { + jsonrpc: '2.0', + method: 'testShort', + params: null, + }, + ]); + const readWriteStreamShort: RPCStream = { + cancel: () => {}, + readable: streamShort, + writable: new WritableStream(), + }; + rpcServer.handleStream(readWriteStreamShort); + // Shorter timeout is updated + const ctxShort = await ctxShortProm.p; + expect(ctxShort.timer.delay).toEqual(Infinity); + } + }); + test('handler overrides the server with Infinite timeout', async () => { + { + const waitProm = promise(); + const ctxShortProm = promise(); + class TestMethodShortTimeout extends UnaryHandler { + timeout = Infinity; public handle = async ( input: JSONValue, _cancel, _meta, ctx_, ): Promise => { - ctxLongProm.resolveP(ctx_); + ctxShortProm.resolveP(ctx_); await waitProm.p; return input; }; } const rpcServer = new RPCServer({ - timeoutTime: 50, + timeoutTime: 1000, logger, idGen, }); await rpcServer.start({ manifest: { testShort: new TestMethodShortTimeout({}), - testLong: new TestMethodLongTimeout({}), }, }); const streamShort = rpcTestUtils.messagesToReadableStream([ @@ -1005,26 +1034,7 @@ describe(`${RPCServer.name}`, () => { rpcServer.handleStream(readWriteStreamShort); // Shorter timeout is updated const ctxShort = await ctxShortProm.p; - expect(ctxShort.timer.delay).toEqual(25); - const streamLong = rpcTestUtils.messagesToReadableStream([ - { - jsonrpc: '2.0', - method: 'testLong', - params: null, - }, - ]); - const readWriteStreamLong: RPCStream = { - cancel: () => {}, - readable: streamLong, - writable: new WritableStream(), - }; - rpcServer.handleStream(readWriteStreamLong); - - // Longer timeout is set to server's default - const ctxLong = await ctxLongProm.p; - expect(ctxLong.timer.delay).toEqual(50); - waitProm.resolveP(); - await rpcServer.stop({ force: true }); + expect(ctxShort.timer.delay).toEqual(Infinity); } }); test('duplex handler refreshes timeout when messages are sent', async () => { @@ -1153,6 +1163,54 @@ describe(`${RPCServer.name}`, () => { await expect(ctx.timer).toReject(); await rpcServer.stop({ force: true }); }); + testProp( + 'RPCServer constructor should throw when passed a negative timeoutTime', + [fc.integer({ max: -1 })], + async (timeoutTime) => { + const constructorF = () => + new RPCServer({ + timeoutTime, + logger, + idGen, + }); + + expect(constructorF).toThrowError(rpcErrors.ErrorRPCInvalidTimeout); + }, + ); + testProp( + 'RPCServer.start should throw when passed a handler with negative timeout', + [fc.integer({ max: -1 })], + async (timeoutTime) => { + const waitProm = promise(); + const ctxLongProm = promise(); + + class TestMethodArbitraryTimeout extends UnaryHandler { + timeout = timeoutTime; + public handle = async ( + input: JSONValue, + _cancel, + _meta, + ctx_, + ): Promise => { + ctxLongProm.resolveP(ctx_); + await waitProm.p; + return input; + }; + } + const rpcServer = new RPCServer({ + logger, + idGen, + }); + + await expect( + rpcServer.start({ + manifest: { + testArbitrary: new TestMethodArbitraryTimeout({}), + }, + }), + ).rejects.toBeInstanceOf(rpcErrors.ErrorRPCInvalidHandlerTimeout); + }, + ); testProp( 'middleware can update timeout timer', [specificMessageArb],