From 93d535afac470b63c427965b066814e5182107a2 Mon Sep 17 00:00:00 2001 From: Aditya <38064122+bettercallav@users.noreply.github.com> Date: Fri, 20 Oct 2023 14:12:06 +1100 Subject: [PATCH] feat: Implementing handler and caller timeouts which can override default server and client timeouts regardless of their default valu --- src/RPCClient.ts | 8 +-- src/RPCServer.ts | 15 +++--- tests/RPCClient.test.ts | 72 ++++++++++++++++++++++++++ tests/RPCServer.test.ts | 112 +++++++++++++++++++++++++++++++++------- 4 files changed, 176 insertions(+), 31 deletions(-) diff --git a/src/RPCClient.ts b/src/RPCClient.ts index 4c48d7d..5e99618 100644 --- a/src/RPCClient.ts +++ b/src/RPCClient.ts @@ -40,7 +40,7 @@ class RPCClient { this.onTimeoutCallback = callback; } // Method proxies - public readonly streamKeepAliveTimeoutTime: number; + public readonly timeout: number; public readonly methodsProxy = new Proxy( {}, { @@ -106,7 +106,7 @@ class RPCClient { this.callerTypes = utils.getHandlerTypes(manifest); this.streamFactory = streamFactory; this.middlewareFactory = middlewareFactory; - this.streamKeepAliveTimeoutTime = streamKeepAliveTimeoutTime; + this.timeout = streamKeepAliveTimeoutTime; this.logger = logger ?? new Logger(this.constructor.name); this.toError = toError; } @@ -253,7 +253,7 @@ class RPCClient { let timer: Timer; if (!(ctx.timer instanceof Timer)) { timer = new Timer({ - delay: ctx.timer ?? this.streamKeepAliveTimeoutTime, + delay: ctx.timer ?? this.timeout, }); } else { timer = ctx.timer; @@ -402,7 +402,7 @@ class RPCClient { let timer: Timer; if (!(ctx.timer instanceof Timer)) { timer = new Timer({ - delay: ctx.timer ?? this.streamKeepAliveTimeoutTime, + delay: ctx.timer ?? this.timeout, }); } else { timer = ctx.timer; diff --git a/src/RPCServer.ts b/src/RPCServer.ts index fbf9161..bbe0781 100644 --- a/src/RPCServer.ts +++ b/src/RPCServer.ts @@ -58,7 +58,7 @@ class RPCServer { protected logger: Logger; protected handlerMap: Map = new Map(); protected defaultTimeoutMap: Map = new Map(); - protected handlerTimeoutTime: number; + protected timeout: number; protected activeStreams: Set> = new Set(); protected fromError: FromError; protected replacer?: (key: string, value: any) => any; @@ -80,13 +80,10 @@ class RPCServer { * The middlewareFactory needs to be a function that creates a pair of * transform streams that convert `Uint8Array` to `JSONRPCRequest` on the forward * path and `JSONRPCResponse` to `Uint8Array` on the reverse path. - * @param obj.streamKeepAliveTimeoutTime - Time before a connection is cleaned up due to no activity. This is the + * @param obj.handlerTimeoutTime - Time before a connection is cleaned up due to no activity. This is the * value used if the handler doesn't specify its own timeout time. This timeout is advisory and only results in a * signal sent to the handler. Stream is forced to end after the timeoutForceCloseTime. Defaults to 60,000 * milliseconds. - * @param obj.timeoutForceCloseTime - Time before the stream is forced to end after the initial timeout time. - * The stream will be forced to close after this amount of time after the initial timeout. This is a grace period for - * the handler to handle timeout before it is forced to end. Defaults to 2,000 milliseconds. * @param obj.logger */ public constructor({ @@ -111,7 +108,7 @@ class RPCServer { }) { this.idGen = idGen; this.middlewareFactory = middlewareFactory; - this.handlerTimeoutTime = handlerTimeoutTime; + this.timeout = handlerTimeoutTime; this.fromError = fromError; this.replacer = replacer; this.logger = logger ?? new Logger(this.constructor.name); @@ -449,7 +446,7 @@ class RPCServer { const abortController = new AbortController(); // Setting up timeout timer logic const timer = new Timer({ - delay: this.handlerTimeoutTime, + delay: this.timeout, handler: () => { abortController.abort(new errors.ErrorRPCTimedOut()); if (this.onTimeoutCallback) { @@ -570,8 +567,8 @@ class RPCServer { } // Setting up Timeout logic const timeout = this.defaultTimeoutMap.get(method); - if (timeout != null && timeout < this.handlerTimeoutTime) { - // Reset timeout with new delay if it is less than the default + if (timeout != null) { + // Reset Handler.timeout with new delay if it is less than the default timer.reset(timeout); } else { // Otherwise refresh diff --git a/tests/RPCClient.test.ts b/tests/RPCClient.test.ts index d518616..415574c 100644 --- a/tests/RPCClient.test.ts +++ b/tests/RPCClient.test.ts @@ -1179,5 +1179,77 @@ describe(`${RPCClient.name}`, () => { }, { numRuns: 1 }, ); + testProp( + 'caller overrides client timeout - lesser value', + [specificMessageArb], + async (messages) => { + const inputStream = rpcTestUtils.messagesToReadableStream(messages); + const [outputResult, outputStream] = + rpcTestUtils.streamToArray(); + const streamPair: RPCStream = { + cancel: () => {}, + meta: undefined, + readable: inputStream, + writable: outputStream, + }; + const ctxProm = promise(); + const rpcClient = new RPCClient({ + manifest: {}, + streamFactory: async (ctx) => { + ctxProm.resolveP(ctx); + return streamPair; + }, + logger, + idGen, + // Setting timeout here to 150ms + streamKeepAliveTimeoutTime: 150, + }); + + const callerInterface = await rpcClient.duplexStreamCaller< + JSONValue, + JSONValue + >(methodName, { timer: 100 }); + + const ctx = await ctxProm.p; + expect(ctx.timer.delay).toEqual(100); + }, + { numRuns: 5 }, + ); + testProp( + 'caller overrides client timeout - greater value', + [specificMessageArb], + async (messages) => { + const inputStream = rpcTestUtils.messagesToReadableStream(messages); + const [outputResult, outputStream] = + rpcTestUtils.streamToArray(); + const streamPair: RPCStream = { + cancel: () => {}, + meta: undefined, + readable: inputStream, + writable: outputStream, + }; + const ctxProm = promise(); + const rpcClient = new RPCClient({ + manifest: {}, + streamFactory: async (ctx) => { + ctxProm.resolveP(ctx); + return streamPair; + }, + logger, + idGen, + // Setting timeout here to 150ms + streamKeepAliveTimeoutTime: 150, + }); + + const callerInterface = await rpcClient.duplexStreamCaller< + JSONValue, + JSONValue + >(methodName, { timer: 300 }); + + const ctx = await ctxProm.p; + expect(ctx.timer.delay).toEqual(300); + }, + { numRuns: 5 }, + ); }); }); diff --git a/tests/RPCServer.test.ts b/tests/RPCServer.test.ts index acb4ed1..4996af3 100644 --- a/tests/RPCServer.test.ts +++ b/tests/RPCServer.test.ts @@ -55,7 +55,13 @@ describe(`${RPCServer.name}`, () => { data: rpcTestUtils.safeJsonValueArb, }), ); - + const timeoutArb = fc.oneof( + fc.constant(0), + fc.constant(-1), + fc.constant(1000), + fc.constant(-1000), + fc.integer(), + ); testProp( 'can stream data with raw duplex stream handler', [specificMessageArb], @@ -942,11 +948,12 @@ describe(`${RPCServer.name}`, () => { } await rpcServer.stop({ force: true }); }); - test('handler overrides timeout', async () => { + test('handler overrides timeout - lesser value', async () => { { const waitProm = promise(); const ctxShortProm = promise(); class TestMethodShortTimeout extends UnaryHandler { + // This will override the default timeout from 50 to 25, thereby decreasing it. timeout = 25; public handle = async ( input: JSONValue, @@ -959,20 +966,6 @@ describe(`${RPCServer.name}`, () => { return input; }; } - const ctxLongProm = promise(); - class TestMethodLongTimeout extends UnaryHandler { - timeout = 100; - public handle = async ( - input: JSONValue, - _cancel, - _meta, - ctx_, - ): Promise => { - ctxLongProm.resolveP(ctx_); - await waitProm.p; - return input; - }; - } const rpcServer = new RPCServer({ handlerTimeoutTime: 50, logger, @@ -981,7 +974,6 @@ describe(`${RPCServer.name}`, () => { await rpcServer.start({ manifest: { testShort: new TestMethodShortTimeout({}), - testLong: new TestMethodLongTimeout({}), }, }); const streamShort = rpcTestUtils.messagesToReadableStream([ @@ -1000,6 +992,36 @@ describe(`${RPCServer.name}`, () => { // Shorter timeout is updated const ctxShort = await ctxShortProm.p; expect(ctxShort.timer.delay).toEqual(25); + } + }); + test('handler overrides timeout - greater value', async () => { + { + const waitProm = promise(); + const ctxLongProm = promise(); + class TestMethodLongTimeout extends UnaryHandler { + // This will override the default timeout from 50 to 250, thereby increasing it. + timeout = 250; + public handle = async ( + input: JSONValue, + _cancel, + _meta, + ctx_, + ): Promise => { + ctxLongProm.resolveP(ctx_); + await waitProm.p; + return input; + }; + } + const rpcServer = new RPCServer({ + handlerTimeoutTime: 50, + logger, + idGen, + }); + await rpcServer.start({ + manifest: { + testLong: new TestMethodLongTimeout({}), + }, + }); const streamLong = rpcTestUtils.messagesToReadableStream([ { jsonrpc: '2.0', @@ -1016,7 +1038,7 @@ describe(`${RPCServer.name}`, () => { // Longer timeout is set to server's default const ctxLong = await ctxLongProm.p; - expect(ctxLong.timer.delay).toEqual(50); + expect(ctxLong.timer.delay).toEqual(250); waitProm.resolveP(); await rpcServer.stop({ force: true }); } @@ -1147,6 +1169,60 @@ describe(`${RPCServer.name}`, () => { await expect(ctx.timer).toReject(); await rpcServer.stop({ force: true }); }); + testProp( + 'handler overrides timeout - arbitrary value with edge cases', + [timeoutArb, timeoutArb], + async (serverTimeout, handlerTimeout) => { + const waitProm = promise(); + const ctxLongProm = promise(); + + class TestMethodArbitraryTimeout extends UnaryHandler { + timeout = handlerTimeout; + public handle = async ( + input: JSONValue, + _cancel, + _meta, + ctx_, + ): Promise => { + ctxLongProm.resolveP(ctx_); + await waitProm.p; + return input; + }; + } + + const rpcServer = new RPCServer({ + handlerTimeoutTime: serverTimeout, + logger, + idGen, + }); + + await rpcServer.start({ + manifest: { + testArbitrary: new TestMethodArbitraryTimeout({}), + }, + }); + + const streamLong = rpcTestUtils.messagesToReadableStream([ + { + jsonrpc: '2.0', + method: 'testArbitrary', + params: null, + }, + ]); + + const readWriteStreamLong: RPCStream = { + cancel: () => {}, + readable: streamLong, + writable: new WritableStream(), + }; + + rpcServer.handleStream(readWriteStreamLong); + const ctxLong = await ctxLongProm.p; + expect(ctxLong.timer.delay).toEqual(handlerTimeout); + waitProm.resolveP(); + await rpcServer.stop({ force: true }); + }, + ); testProp( 'middleware can update timeout timer', [specificMessageArb],