Skip to content

Commit

Permalink
feat: Implementing handler and caller timeouts which can override def…
Browse files Browse the repository at this point in the history
…ault server and client timeouts regardless of their default value
  • Loading branch information
addievo committed Oct 25, 2023
1 parent 8434c70 commit b1c1608
Show file tree
Hide file tree
Showing 3 changed files with 115 additions and 27 deletions.
20 changes: 10 additions & 10 deletions src/RPCServer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ class RPCServer {
Uint8Array,
JSONRPCResponseResult
>;
// Function to register a callback for timeout
// Function to register a callback for Handler.timeout
public registerOnTimeoutCallback(callback: () => void) {
this.onTimeoutCallback = callback;
}
Expand All @@ -81,12 +81,12 @@ class RPCServer {
* transform streams that convert `Uint8Array` to `JSONRPCRequest` on the forward
* path and `JSONRPCResponse` to `Uint8Array` on the reverse path.
* @param obj.streamKeepAliveTimeoutTime - Time before a connection is cleaned up due to no activity. This is the
* value used if the handler doesn't specify its own timeout time. This timeout is advisory and only results in a
* value used if the handler doesn't specify its own Handler.timeout time. This Handler.timeout is advisory and only results in a
* signal sent to the handler. Stream is forced to end after the timeoutForceCloseTime. Defaults to 60,000
* milliseconds.
* @param obj.timeoutForceCloseTime - Time before the stream is forced to end after the initial timeout time.
* The stream will be forced to close after this amount of time after the initial timeout. This is a grace period for
* the handler to handle timeout before it is forced to end. Defaults to 2,000 milliseconds.
* @param obj.timeoutForceCloseTime - Time before the stream is forced to end after the initial Handler.timeout time.
* The stream will be forced to close after this amount of time after the initial Handler.timeout. This is a grace period for
* the handler to handle Handler.timeout before it is forced to end. Defaults to 2,000 milliseconds.
* @param obj.logger
*/
public constructor({
Expand Down Expand Up @@ -214,7 +214,7 @@ class RPCServer {
/**
* Registers a raw stream handler. This is the basis for all handlers as
* handling the streams is done with raw streams only.
* The raw streams do not automatically refresh the timeout timer when
* The raw streams do not automatically refresh the Handler.timeout timer when
* messages are sent or received.
*/
protected registerRawStreamHandler(
Expand Down Expand Up @@ -447,7 +447,7 @@ class RPCServer {
// handling for it.
// Constructing the PromiseCancellable for tracking the active stream
const abortController = new AbortController();
// Setting up timeout timer logic
// Setting up Handler.timeout timer logic
const timer = new Timer({
delay: this.handlerTimeoutTime,
handler: () => {
Expand Down Expand Up @@ -524,7 +524,7 @@ class RPCServer {
// Downgrade back to the raw stream
await reader.cancel();
// There are 2 conditions where we just end here
// 1. The timeout timer resolves before the first message
// 1. The Handler.timeout timer resolves before the first message
// 2. the stream ends before the first message
if (headerMessage == null) {
const newErr = new errors.ErrorRPCTimedOut(
Expand Down Expand Up @@ -570,8 +570,8 @@ class RPCServer {
}
// Setting up Timeout logic
const timeout = this.defaultTimeoutMap.get(method);
if (timeout != null && timeout < this.handlerTimeoutTime) {
// Reset timeout with new delay if it is less than the default
if (timeout != null) {
// Reset Handler.timeout with new delay if it is less than the default
timer.reset(timeout);
} else {
// Otherwise refresh
Expand Down
72 changes: 72 additions & 0 deletions tests/RPCClient.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1179,5 +1179,77 @@ describe(`${RPCClient.name}`, () => {
},
{ numRuns: 1 },
);
testProp(
'handler overrides client timeout - lesser value',
[specificMessageArb],
async (messages) => {
const inputStream = rpcTestUtils.messagesToReadableStream(messages);
const [outputResult, outputStream] =
rpcTestUtils.streamToArray<Uint8Array>();
const streamPair: RPCStream<Uint8Array, Uint8Array> = {
cancel: () => {},
meta: undefined,
readable: inputStream,
writable: outputStream,
};
const ctxProm = promise<ContextTimed>();
const rpcClient = new RPCClient({
manifest: {},
streamFactory: async (ctx) => {
ctxProm.resolveP(ctx);
return streamPair;
},
logger,
idGen,
// Setting timeout here to 150ms
streamKeepAliveTimeoutTime: 150,
});

const callerInterface = await rpcClient.duplexStreamCaller<
JSONValue,
JSONValue
>(methodName, { timer: 100 });

const ctx = await ctxProm.p;
expect(ctx.timer.delay).toEqual(100);
},
{ numRuns: 5 },
);
testProp(
'handler overrides client timeout - greater value',
[specificMessageArb],
async (messages) => {
const inputStream = rpcTestUtils.messagesToReadableStream(messages);
const [outputResult, outputStream] =
rpcTestUtils.streamToArray<Uint8Array>();
const streamPair: RPCStream<Uint8Array, Uint8Array> = {
cancel: () => {},
meta: undefined,
readable: inputStream,
writable: outputStream,
};
const ctxProm = promise<ContextTimed>();
const rpcClient = new RPCClient({
manifest: {},
streamFactory: async (ctx) => {
ctxProm.resolveP(ctx);
return streamPair;
},
logger,
idGen,
// Setting timeout here to 150ms
streamKeepAliveTimeoutTime: 150,
});

const callerInterface = await rpcClient.duplexStreamCaller<
JSONValue,
JSONValue
>(methodName, { timer: 300 });

const ctx = await ctxProm.p;
expect(ctx.timer.delay).toEqual(300);
},
{ numRuns: 5 },
);
});
});
50 changes: 33 additions & 17 deletions tests/RPCServer.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -942,11 +942,12 @@ describe(`${RPCServer.name}`, () => {
}
await rpcServer.stop({ force: true });
});
test('handler overrides timeout', async () => {
test('handler overrides timeout - lesser value', async () => {
{
const waitProm = promise();
const ctxShortProm = promise<ContextTimed>();
class TestMethodShortTimeout extends UnaryHandler {
// This will override the default timeout from 50 to 25, thereby decreasing it.
timeout = 25;
public handle = async (
input: JSONValue,
Expand All @@ -959,20 +960,6 @@ describe(`${RPCServer.name}`, () => {
return input;
};
}
const ctxLongProm = promise<ContextTimed>();
class TestMethodLongTimeout extends UnaryHandler {
timeout = 100;
public handle = async (
input: JSONValue,
_cancel,
_meta,
ctx_,
): Promise<JSONValue> => {
ctxLongProm.resolveP(ctx_);
await waitProm.p;
return input;
};
}
const rpcServer = new RPCServer({
handlerTimeoutTime: 50,
logger,
Expand All @@ -981,7 +968,6 @@ describe(`${RPCServer.name}`, () => {
await rpcServer.start({
manifest: {
testShort: new TestMethodShortTimeout({}),
testLong: new TestMethodLongTimeout({}),
},
});
const streamShort = rpcTestUtils.messagesToReadableStream([
Expand All @@ -1000,6 +986,36 @@ describe(`${RPCServer.name}`, () => {
// Shorter timeout is updated
const ctxShort = await ctxShortProm.p;
expect(ctxShort.timer.delay).toEqual(25);
}
});
test('handler overrides timeout - greater value', async () => {
{
const waitProm = promise();
const ctxLongProm = promise<ContextTimed>();
class TestMethodLongTimeout extends UnaryHandler {
// This will override the default timeout from 50 to 250, thereby increasing it.
timeout = 250;
public handle = async (
input: JSONValue,
_cancel,
_meta,
ctx_,
): Promise<JSONValue> => {
ctxLongProm.resolveP(ctx_);
await waitProm.p;
return input;
};
}
const rpcServer = new RPCServer({
handlerTimeoutTime: 50,
logger,
idGen,
});
await rpcServer.start({
manifest: {
testLong: new TestMethodLongTimeout({}),
},
});
const streamLong = rpcTestUtils.messagesToReadableStream([
{
jsonrpc: '2.0',
Expand All @@ -1016,7 +1032,7 @@ describe(`${RPCServer.name}`, () => {

// Longer timeout is set to server's default
const ctxLong = await ctxLongProm.p;
expect(ctxLong.timer.delay).toEqual(50);
expect(ctxLong.timer.delay).toEqual(250);
waitProm.resolveP();
await rpcServer.stop({ force: true });
}
Expand Down

0 comments on commit b1c1608

Please sign in to comment.