Skip to content

Commit

Permalink
feat: Implementing handler and caller timeouts which can override def…
Browse files Browse the repository at this point in the history
…ault server and client timeouts regardless of their default valu

fix: renamed `RPCClient.timeout` and `RPCServer.timeout` to `timeoutTIme`

fix: timeout tests for RPCClient and RPCServer

[ci-skip]
  • Loading branch information
addievo authored and amydevs committed Oct 30, 2023
1 parent 37accfd commit 97819f5
Show file tree
Hide file tree
Showing 6 changed files with 188 additions and 32 deletions.
14 changes: 7 additions & 7 deletions src/RPCClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ class RPCClient<M extends ClientManifest> {
this.onTimeoutCallback = callback;
}
// Method proxies
public readonly streamKeepAliveTimeoutTime: number;
public readonly timeoutTime: number;
public readonly methodsProxy = new Proxy(
{},
{
Expand Down Expand Up @@ -75,7 +75,7 @@ class RPCClient<M extends ClientManifest> {
* The middlewareFactory needs to be a function that creates a pair of
* transform streams that convert `JSONRPCRequest` to `Uint8Array` on the forward
* path and `Uint8Array` to `JSONRPCResponse` on the reverse path.
* @param obj.streamKeepAliveTimeoutTime - Timeout time used if no timeout timer was provided when making a call.
* @param obj.timeoutTime - Timeout time used if no timeout timer was provided when making a call.
* Defaults to 60,000 milliseconds.
* for a client call.
* @param obj.logger
Expand All @@ -84,7 +84,7 @@ class RPCClient<M extends ClientManifest> {
manifest,
streamFactory,
middlewareFactory = middleware.defaultClientMiddlewareWrapper(),
streamKeepAliveTimeoutTime = Infinity,
timeoutTime = Infinity,
logger,
toError = utils.toError,
idGen = () => null,
Expand All @@ -97,7 +97,7 @@ class RPCClient<M extends ClientManifest> {
JSONRPCResponse,
Uint8Array
>;
streamKeepAliveTimeoutTime?: number;
timeoutTime?: number;
logger?: Logger;
idGen?: IdGen;
toError?: ToError;
Expand All @@ -106,7 +106,7 @@ class RPCClient<M extends ClientManifest> {
this.callerTypes = utils.getHandlerTypes(manifest);
this.streamFactory = streamFactory;
this.middlewareFactory = middlewareFactory;
this.streamKeepAliveTimeoutTime = streamKeepAliveTimeoutTime;
this.timeoutTime = timeoutTime;
this.logger = logger ?? new Logger(this.constructor.name);
this.toError = toError;
}
Expand Down Expand Up @@ -253,7 +253,7 @@ class RPCClient<M extends ClientManifest> {
let timer: Timer;
if (!(ctx.timer instanceof Timer)) {
timer = new Timer({
delay: ctx.timer ?? this.streamKeepAliveTimeoutTime,
delay: ctx.timer ?? this.timeoutTime,
});
} else {
timer = ctx.timer;
Expand Down Expand Up @@ -402,7 +402,7 @@ class RPCClient<M extends ClientManifest> {
let timer: Timer;
if (!(ctx.timer instanceof Timer)) {
timer = new Timer({
delay: ctx.timer ?? this.streamKeepAliveTimeoutTime,
delay: ctx.timer ?? this.timeoutTime,
});
} else {
timer = ctx.timer;
Expand Down
17 changes: 7 additions & 10 deletions src/RPCServer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ class RPCServer {
protected logger: Logger;
protected handlerMap: Map<string, RawHandlerImplementation> = new Map();
protected defaultTimeoutMap: Map<string, number | undefined> = new Map();
protected handlerTimeoutTime: number;
protected timeoutTime: number;
protected activeStreams: Set<PromiseCancellable<void>> = new Set();
protected fromError: FromError;
protected replacer?: (key: string, value: any) => any;
Expand All @@ -80,18 +80,15 @@ class RPCServer {
* The middlewareFactory needs to be a function that creates a pair of
* transform streams that convert `Uint8Array` to `JSONRPCRequest` on the forward
* path and `JSONRPCResponse` to `Uint8Array` on the reverse path.
* @param obj.streamKeepAliveTimeoutTime - Time before a connection is cleaned up due to no activity. This is the
* @param obj.timeoutTime - Time before a stream is cleaned up due to no activity. This is the
* value used if the handler doesn't specify its own timeout time. This timeout is advisory and only results in a
* signal sent to the handler. Stream is forced to end after the timeoutForceCloseTime. Defaults to 60,000
* milliseconds.
* @param obj.timeoutForceCloseTime - Time before the stream is forced to end after the initial timeout time.
* The stream will be forced to close after this amount of time after the initial timeout. This is a grace period for
* the handler to handle timeout before it is forced to end. Defaults to 2,000 milliseconds.
* @param obj.logger
*/
public constructor({
middlewareFactory = middleware.defaultServerMiddlewareWrapper(),
handlerTimeoutTime = Infinity,
timeoutTime = Infinity,
logger,
idGen = () => null,
fromError = utils.fromError,
Expand All @@ -103,15 +100,15 @@ class RPCServer {
Uint8Array,
JSONRPCResponseResult
>;
handlerTimeoutTime?: number;
timeoutTime?: number;
logger?: Logger;
idGen?: IdGen;
fromError?: FromError;
replacer?: (key: string, value: any) => any;
}) {
this.idGen = idGen;
this.middlewareFactory = middlewareFactory;
this.handlerTimeoutTime = handlerTimeoutTime;
this.timeoutTime = timeoutTime;
this.fromError = fromError;
this.replacer = replacer;
this.logger = logger ?? new Logger(this.constructor.name);
Expand Down Expand Up @@ -453,7 +450,7 @@ class RPCServer {
const abortController = new AbortController();
// Setting up timeout timer logic
const timer = new Timer({
delay: this.handlerTimeoutTime,
delay: this.timeoutTime,
handler: () => {
abortController.abort(new errors.ErrorRPCTimedOut());
if (this.onTimeoutCallback) {
Expand Down Expand Up @@ -575,7 +572,7 @@ class RPCServer {
// Setting up Timeout logic
const timeout = this.defaultTimeoutMap.get(method);
if (timer.status !== 'settled') {
if (timeout != null && timeout < this.handlerTimeoutTime) {
if (timeout != null) {
// Reset timeout with new delay if it is less than the default
timer.reset(timeout);
} else {
Expand Down
10 changes: 4 additions & 6 deletions tests/RPC.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -581,7 +581,7 @@ describe('RPC', () => {
const rpcServer = new RPCServer({
logger,
idGen,
handlerTimeoutTime: timeout,
timeoutTime: timeout,
});
await rpcServer.start({
manifest: {
Expand Down Expand Up @@ -673,7 +673,7 @@ describe('RPC', () => {
const rpcServer = new RPCServer({
logger,
idGen,
handlerTimeoutTime: 1,
timeoutTime: 1,
});
await rpcServer.start({ manifest: { testMethod: new TestMethod({}) } });
// Register callback
Expand Down Expand Up @@ -746,8 +746,7 @@ describe('RPC', () => {
const rpcServer = new RPCServer({
logger,
idGen,

handlerTimeoutTime: 400,
timeoutTime: 400,
});
await rpcServer.start({
manifest: {
Expand Down Expand Up @@ -811,11 +810,10 @@ describe('RPC', () => {
await abortProm.p;
};
}

const rpcServer = new RPCServer({
logger,
idGen,
handlerTimeoutTime: Infinity,
timeoutTime: Infinity,
});
await rpcServer.start({ manifest: { testMethod: new TestMethod({}) } });
rpcServer.handleStream({ ...serverPair, cancel: () => {} });
Expand Down
74 changes: 70 additions & 4 deletions tests/RPCClient.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -744,7 +744,7 @@ describe(`${RPCClient.name}`, () => {
// Should never reach this when testing
return {} as RPCStream<Uint8Array, Uint8Array>;
},
streamKeepAliveTimeoutTime: 100,
timeoutTime: 100,
logger,
idGen,
});
Expand Down Expand Up @@ -913,7 +913,7 @@ describe(`${RPCClient.name}`, () => {
// Should never reach this when testing
return {} as RPCStream<Uint8Array, Uint8Array>;
},
streamKeepAliveTimeoutTime: 100,
timeoutTime: 100,
logger,
idGen,
});
Expand Down Expand Up @@ -1000,7 +1000,7 @@ describe(`${RPCClient.name}`, () => {
ctx = ctx_;
return streamPair;
},
streamKeepAliveTimeoutTime: 100,
timeoutTime: 100,
logger,
idGen,
});
Expand Down Expand Up @@ -1143,7 +1143,7 @@ describe(`${RPCClient.name}`, () => {
{ numRuns: 5 },
);
testProp(
'Check that ctx is provided to the middleWare and that the middleware can reset the timer',
'Check that ctx is provided to the middleware and that the middleware can reset the timer',
[specificMessageArb],
async (messages) => {
const inputStream = rpcTestUtils.messagesToReadableStream(messages);
Expand Down Expand Up @@ -1191,4 +1191,70 @@ describe(`${RPCClient.name}`, () => {
{ numRuns: 1 },
);
});
describe('timeout priority', () => {
testProp(
'check that call with ctx can override higher timeout of RPCClient',
[rpcTestUtils.timeoutsArb],
async ([lowerTimeoutTime, higherTimeoutTime]) => {
const streamPair: RPCStream<Uint8Array, Uint8Array> = {
cancel: () => {},
meta: undefined,
readable: new ReadableStream(),
writable: new WritableStream(),
};
const { p: ctxP, resolveP: resolveCtxP } = promise<ContextTimed>();
const rpcClient = new RPCClient({
manifest: {},
streamFactory: async (ctx) => {
resolveCtxP(ctx);
return streamPair;
},
logger,
idGen,
timeoutTime: higherTimeoutTime,
});

await rpcClient.duplexStreamCaller<JSONValue, JSONValue>(methodName, {
timer: lowerTimeoutTime,
});

const ctx = await ctxP;
expect(ctx.timer.delay).toBe(lowerTimeoutTime);
ctx.timer.cancel();
await ctx.timer.catch(() => {});
},
);
testProp(
'check that call with ctx can override lower timeout of RPCClient',
[rpcTestUtils.timeoutsArb],
async ([lowerTimeoutTime, higherTimeoutTime]) => {
const streamPair: RPCStream<Uint8Array, Uint8Array> = {
cancel: () => {},
meta: undefined,
readable: new ReadableStream(),
writable: new WritableStream(),
};
const { p: ctxP, resolveP: resolveCtxP } = promise<ContextTimed>();
const rpcClient = new RPCClient({
manifest: {},
streamFactory: async (ctx) => {
resolveCtxP(ctx);
return streamPair;
},
logger,
idGen,
timeoutTime: lowerTimeoutTime,
});

await rpcClient.duplexStreamCaller<JSONValue, JSONValue>(methodName, {
timer: higherTimeoutTime,
});

const ctx = await ctxP;
expect(ctx.timer.delay).toBe(higherTimeoutTime);
ctx.timer.cancel();
await ctx.timer.catch(() => {});
},
);
});
});
Loading

0 comments on commit 97819f5

Please sign in to comment.