From 97819f5fb5b7602c6de98bbcbddbc91e09b65a9a Mon Sep 17 00:00:00 2001 From: Aditya <38064122+bettercallav@users.noreply.github.com> Date: Fri, 20 Oct 2023 14:12:06 +1100 Subject: [PATCH] feat: Implementing handler and caller timeouts which can override default server and client timeouts regardless of their default valu fix: renamed `RPCClient.timeout` and `RPCServer.timeout` to `timeoutTIme` fix: timeout tests for RPCClient and RPCServer [ci-skip] --- src/RPCClient.ts | 14 +++--- src/RPCServer.ts | 17 +++----- tests/RPC.test.ts | 10 ++--- tests/RPCClient.test.ts | 74 ++++++++++++++++++++++++++++++-- tests/RPCServer.test.ts | 95 ++++++++++++++++++++++++++++++++++++++--- tests/utils.ts | 10 +++++ 6 files changed, 188 insertions(+), 32 deletions(-) diff --git a/src/RPCClient.ts b/src/RPCClient.ts index 4c48d7d..f22d6a2 100644 --- a/src/RPCClient.ts +++ b/src/RPCClient.ts @@ -40,7 +40,7 @@ class RPCClient { this.onTimeoutCallback = callback; } // Method proxies - public readonly streamKeepAliveTimeoutTime: number; + public readonly timeoutTime: number; public readonly methodsProxy = new Proxy( {}, { @@ -75,7 +75,7 @@ class RPCClient { * The middlewareFactory needs to be a function that creates a pair of * transform streams that convert `JSONRPCRequest` to `Uint8Array` on the forward * path and `Uint8Array` to `JSONRPCResponse` on the reverse path. - * @param obj.streamKeepAliveTimeoutTime - Timeout time used if no timeout timer was provided when making a call. + * @param obj.timeoutTime - Timeout time used if no timeout timer was provided when making a call. * Defaults to 60,000 milliseconds. * for a client call. * @param obj.logger @@ -84,7 +84,7 @@ class RPCClient { manifest, streamFactory, middlewareFactory = middleware.defaultClientMiddlewareWrapper(), - streamKeepAliveTimeoutTime = Infinity, + timeoutTime = Infinity, logger, toError = utils.toError, idGen = () => null, @@ -97,7 +97,7 @@ class RPCClient { JSONRPCResponse, Uint8Array >; - streamKeepAliveTimeoutTime?: number; + timeoutTime?: number; logger?: Logger; idGen?: IdGen; toError?: ToError; @@ -106,7 +106,7 @@ class RPCClient { this.callerTypes = utils.getHandlerTypes(manifest); this.streamFactory = streamFactory; this.middlewareFactory = middlewareFactory; - this.streamKeepAliveTimeoutTime = streamKeepAliveTimeoutTime; + this.timeoutTime = timeoutTime; this.logger = logger ?? new Logger(this.constructor.name); this.toError = toError; } @@ -253,7 +253,7 @@ class RPCClient { let timer: Timer; if (!(ctx.timer instanceof Timer)) { timer = new Timer({ - delay: ctx.timer ?? this.streamKeepAliveTimeoutTime, + delay: ctx.timer ?? this.timeoutTime, }); } else { timer = ctx.timer; @@ -402,7 +402,7 @@ class RPCClient { let timer: Timer; if (!(ctx.timer instanceof Timer)) { timer = new Timer({ - delay: ctx.timer ?? this.streamKeepAliveTimeoutTime, + delay: ctx.timer ?? this.timeoutTime, }); } else { timer = ctx.timer; diff --git a/src/RPCServer.ts b/src/RPCServer.ts index 8226009..49769c3 100644 --- a/src/RPCServer.ts +++ b/src/RPCServer.ts @@ -58,7 +58,7 @@ class RPCServer { protected logger: Logger; protected handlerMap: Map = new Map(); protected defaultTimeoutMap: Map = new Map(); - protected handlerTimeoutTime: number; + protected timeoutTime: number; protected activeStreams: Set> = new Set(); protected fromError: FromError; protected replacer?: (key: string, value: any) => any; @@ -80,18 +80,15 @@ class RPCServer { * The middlewareFactory needs to be a function that creates a pair of * transform streams that convert `Uint8Array` to `JSONRPCRequest` on the forward * path and `JSONRPCResponse` to `Uint8Array` on the reverse path. - * @param obj.streamKeepAliveTimeoutTime - Time before a connection is cleaned up due to no activity. This is the + * @param obj.timeoutTime - Time before a stream is cleaned up due to no activity. This is the * value used if the handler doesn't specify its own timeout time. This timeout is advisory and only results in a * signal sent to the handler. Stream is forced to end after the timeoutForceCloseTime. Defaults to 60,000 * milliseconds. - * @param obj.timeoutForceCloseTime - Time before the stream is forced to end after the initial timeout time. - * The stream will be forced to close after this amount of time after the initial timeout. This is a grace period for - * the handler to handle timeout before it is forced to end. Defaults to 2,000 milliseconds. * @param obj.logger */ public constructor({ middlewareFactory = middleware.defaultServerMiddlewareWrapper(), - handlerTimeoutTime = Infinity, + timeoutTime = Infinity, logger, idGen = () => null, fromError = utils.fromError, @@ -103,7 +100,7 @@ class RPCServer { Uint8Array, JSONRPCResponseResult >; - handlerTimeoutTime?: number; + timeoutTime?: number; logger?: Logger; idGen?: IdGen; fromError?: FromError; @@ -111,7 +108,7 @@ class RPCServer { }) { this.idGen = idGen; this.middlewareFactory = middlewareFactory; - this.handlerTimeoutTime = handlerTimeoutTime; + this.timeoutTime = timeoutTime; this.fromError = fromError; this.replacer = replacer; this.logger = logger ?? new Logger(this.constructor.name); @@ -453,7 +450,7 @@ class RPCServer { const abortController = new AbortController(); // Setting up timeout timer logic const timer = new Timer({ - delay: this.handlerTimeoutTime, + delay: this.timeoutTime, handler: () => { abortController.abort(new errors.ErrorRPCTimedOut()); if (this.onTimeoutCallback) { @@ -575,7 +572,7 @@ class RPCServer { // Setting up Timeout logic const timeout = this.defaultTimeoutMap.get(method); if (timer.status !== 'settled') { - if (timeout != null && timeout < this.handlerTimeoutTime) { + if (timeout != null) { // Reset timeout with new delay if it is less than the default timer.reset(timeout); } else { diff --git a/tests/RPC.test.ts b/tests/RPC.test.ts index c7741e2..180047f 100644 --- a/tests/RPC.test.ts +++ b/tests/RPC.test.ts @@ -581,7 +581,7 @@ describe('RPC', () => { const rpcServer = new RPCServer({ logger, idGen, - handlerTimeoutTime: timeout, + timeoutTime: timeout, }); await rpcServer.start({ manifest: { @@ -673,7 +673,7 @@ describe('RPC', () => { const rpcServer = new RPCServer({ logger, idGen, - handlerTimeoutTime: 1, + timeoutTime: 1, }); await rpcServer.start({ manifest: { testMethod: new TestMethod({}) } }); // Register callback @@ -746,8 +746,7 @@ describe('RPC', () => { const rpcServer = new RPCServer({ logger, idGen, - - handlerTimeoutTime: 400, + timeoutTime: 400, }); await rpcServer.start({ manifest: { @@ -811,11 +810,10 @@ describe('RPC', () => { await abortProm.p; }; } - const rpcServer = new RPCServer({ logger, idGen, - handlerTimeoutTime: Infinity, + timeoutTime: Infinity, }); await rpcServer.start({ manifest: { testMethod: new TestMethod({}) } }); rpcServer.handleStream({ ...serverPair, cancel: () => {} }); diff --git a/tests/RPCClient.test.ts b/tests/RPCClient.test.ts index 59c3b13..dcef66c 100644 --- a/tests/RPCClient.test.ts +++ b/tests/RPCClient.test.ts @@ -744,7 +744,7 @@ describe(`${RPCClient.name}`, () => { // Should never reach this when testing return {} as RPCStream; }, - streamKeepAliveTimeoutTime: 100, + timeoutTime: 100, logger, idGen, }); @@ -913,7 +913,7 @@ describe(`${RPCClient.name}`, () => { // Should never reach this when testing return {} as RPCStream; }, - streamKeepAliveTimeoutTime: 100, + timeoutTime: 100, logger, idGen, }); @@ -1000,7 +1000,7 @@ describe(`${RPCClient.name}`, () => { ctx = ctx_; return streamPair; }, - streamKeepAliveTimeoutTime: 100, + timeoutTime: 100, logger, idGen, }); @@ -1143,7 +1143,7 @@ describe(`${RPCClient.name}`, () => { { numRuns: 5 }, ); testProp( - 'Check that ctx is provided to the middleWare and that the middleware can reset the timer', + 'Check that ctx is provided to the middleware and that the middleware can reset the timer', [specificMessageArb], async (messages) => { const inputStream = rpcTestUtils.messagesToReadableStream(messages); @@ -1191,4 +1191,70 @@ describe(`${RPCClient.name}`, () => { { numRuns: 1 }, ); }); + describe('timeout priority', () => { + testProp( + 'check that call with ctx can override higher timeout of RPCClient', + [rpcTestUtils.timeoutsArb], + async ([lowerTimeoutTime, higherTimeoutTime]) => { + const streamPair: RPCStream = { + cancel: () => {}, + meta: undefined, + readable: new ReadableStream(), + writable: new WritableStream(), + }; + const { p: ctxP, resolveP: resolveCtxP } = promise(); + const rpcClient = new RPCClient({ + manifest: {}, + streamFactory: async (ctx) => { + resolveCtxP(ctx); + return streamPair; + }, + logger, + idGen, + timeoutTime: higherTimeoutTime, + }); + + await rpcClient.duplexStreamCaller(methodName, { + timer: lowerTimeoutTime, + }); + + const ctx = await ctxP; + expect(ctx.timer.delay).toBe(lowerTimeoutTime); + ctx.timer.cancel(); + await ctx.timer.catch(() => {}); + }, + ); + testProp( + 'check that call with ctx can override lower timeout of RPCClient', + [rpcTestUtils.timeoutsArb], + async ([lowerTimeoutTime, higherTimeoutTime]) => { + const streamPair: RPCStream = { + cancel: () => {}, + meta: undefined, + readable: new ReadableStream(), + writable: new WritableStream(), + }; + const { p: ctxP, resolveP: resolveCtxP } = promise(); + const rpcClient = new RPCClient({ + manifest: {}, + streamFactory: async (ctx) => { + resolveCtxP(ctx); + return streamPair; + }, + logger, + idGen, + timeoutTime: lowerTimeoutTime, + }); + + await rpcClient.duplexStreamCaller(methodName, { + timer: higherTimeoutTime, + }); + + const ctx = await ctxP; + expect(ctx.timer.delay).toBe(higherTimeoutTime); + ctx.timer.cancel(); + await ctx.timer.catch(() => {}); + }, + ); + }); }); diff --git a/tests/RPCServer.test.ts b/tests/RPCServer.test.ts index bd8d22a..1647b13 100644 --- a/tests/RPCServer.test.ts +++ b/tests/RPCServer.test.ts @@ -55,7 +55,6 @@ describe(`${RPCServer.name}`, () => { data: rpcTestUtils.safeJsonValueArb, }), ); - testProp( 'can stream data with raw duplex stream handler', [specificMessageArb], @@ -876,7 +875,7 @@ describe(`${RPCServer.name}`, () => { } const rpcServer = new RPCServer({ - handlerTimeoutTime: 100, + timeoutTime: 100, logger, idGen, }); @@ -922,7 +921,7 @@ describe(`${RPCServer.name}`, () => { }); test('timeout with default time before handler selected', async () => { const rpcServer = new RPCServer({ - handlerTimeoutTime: 100, + timeoutTime: 100, logger, idGen, }); @@ -981,7 +980,7 @@ describe(`${RPCServer.name}`, () => { }; } const rpcServer = new RPCServer({ - handlerTimeoutTime: 50, + timeoutTime: 50, logger, idGen, }); @@ -1053,7 +1052,7 @@ describe(`${RPCServer.name}`, () => { const rpcServer = new RPCServer({ logger, idGen, - handlerTimeoutTime: 1000, + timeoutTime: 1000, }); await rpcServer.start({ manifest: { @@ -1201,4 +1200,90 @@ describe(`${RPCServer.name}`, () => { expect(ctx.timer.delay).toBe(12345); }, ); + describe('timeout priority', () => { + testProp( + 'check that handler can override higher timeout of RPCServer', + [specificMessageArb, rpcTestUtils.timeoutsArb], + async (messages, [lowerTimeoutTime, higherTimeoutTime]) => { + const stream = rpcTestUtils.messagesToReadableStream(messages); + const { p: ctxP, resolveP: resolveCtxP } = promise(); + class TestMethod extends DuplexHandler { + public timeout = lowerTimeoutTime; + public handle = async function* ( + input: AsyncGenerator, + _cancel: (reason?: any) => void, + _meta: Record | undefined, + ctx: ContextTimed, + ): AsyncGenerator { + resolveCtxP(ctx); + yield* input; + }; + } + const rpcServer = new RPCServer({ + logger, + timeoutTime: higherTimeoutTime, + idGen, + }); + await rpcServer.start({ + manifest: { + testMethod: new TestMethod({}), + }, + }); + const [outputResult, outputStream] = rpcTestUtils.streamToArray(); + const readWriteStream: RPCStream = { + cancel: () => {}, + readable: stream, + writable: outputStream, + }; + rpcServer.handleStream(readWriteStream); + await outputResult; + const ctx = await ctxP; + expect(ctx.timer.delay).toBe(lowerTimeoutTime); + ctx.timer.cancel(); + await ctx.timer.catch(() => {}); + }, + ); + testProp( + 'check that handler can override lower timeout of RPCServer', + [specificMessageArb, rpcTestUtils.timeoutsArb], + async (messages, [lowerTimeoutTime, higherTimeoutTime]) => { + const stream = rpcTestUtils.messagesToReadableStream(messages); + const { p: ctxP, resolveP: resolveCtxP } = promise(); + class TestMethod extends DuplexHandler { + public timeout = higherTimeoutTime; + public handle = async function* ( + input: AsyncGenerator, + _cancel: (reason?: any) => void, + _meta: Record | undefined, + ctx: ContextTimed, + ): AsyncGenerator { + resolveCtxP(ctx); + yield* input; + }; + } + const rpcServer = new RPCServer({ + logger, + timeoutTime: lowerTimeoutTime, + idGen, + }); + await rpcServer.start({ + manifest: { + testMethod: new TestMethod({}), + }, + }); + const [outputResult, outputStream] = rpcTestUtils.streamToArray(); + const readWriteStream: RPCStream = { + cancel: () => {}, + readable: stream, + writable: outputStream, + }; + rpcServer.handleStream(readWriteStream); + await outputResult; + const ctx = await ctxP; + expect(ctx.timer.delay).toBe(higherTimeoutTime); + ctx.timer.cancel(); + await ctx.timer.catch(() => {}); + }, + ); + }); }); diff --git a/tests/utils.ts b/tests/utils.ts index 6694bf3..325a00c 100644 --- a/tests/utils.ts +++ b/tests/utils.ts @@ -269,6 +269,15 @@ const errorArb = ( ), ); +const timeoutsArb = fc + .integer({ min: 0 }) + .chain((lowerTimeoutTime) => + fc.tuple( + fc.constant(lowerTimeoutTime), + fc.integer({ min: lowerTimeoutTime }), + ), + ); + export { binaryStreamToSnippedStream, binaryStreamToNoisyStream, @@ -289,4 +298,5 @@ export { tapTransformStream, createTapPairs, errorArb, + timeoutsArb, };