Skip to content

Commit

Permalink
feat: negative timeoutTime paramaters will now throw errors
Browse files Browse the repository at this point in the history
chore: add tests to test negative timeout values.

fix: 0 `timeoutTime` value no longer throws in `RPCClient`

fix: `RPCServer.start` now throws when passed a handler with a negative `timeout`

[ci-skip]
  • Loading branch information
addievo authored and amydevs committed Oct 30, 2023
1 parent 88f4254 commit 6f1a568
Show file tree
Hide file tree
Showing 5 changed files with 184 additions and 78 deletions.
3 changes: 3 additions & 0 deletions src/RPCClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,9 @@ class RPCClient<M extends ClientManifest> {
idGen?: IdGen;
toError?: ToError;
}) {
if (timeoutTime < 0) {
throw new errors.ErrorRPCInvalidTimeout();
}
this.idGen = idGen;
this.callerTypes = utils.getHandlerTypes(manifest);
this.streamFactory = streamFactory;
Expand Down
115 changes: 64 additions & 51 deletions src/RPCServer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,9 @@ class RPCServer {
fromError?: FromError;
replacer?: (key: string, value: any) => any;
}) {
if (timeoutTime < 0) {
throw new errors.ErrorRPCInvalidTimeout();
}
this.idGen = idGen;
this.middlewareFactory = middlewareFactory;
this.timeoutTime = timeoutTime;
Expand All @@ -126,58 +129,68 @@ class RPCServer {
manifest: ServerManifest;
}): Promise<void> {
this.logger.info(`Start ${this.constructor.name}`);
for (const [key, manifestItem] of Object.entries(manifest)) {
if (manifestItem instanceof RawHandler) {
this.registerRawStreamHandler(
key,
manifestItem.handle,
manifestItem.timeout,
);
continue;
}
if (manifestItem instanceof DuplexHandler) {
this.registerDuplexStreamHandler(
key,
// Bind the `this` to the generator handler to make the container available
manifestItem.handle.bind(manifestItem),
manifestItem.timeout,
);
continue;
}
if (manifestItem instanceof ServerHandler) {
this.registerServerStreamHandler(
key,
// Bind the `this` to the generator handler to make the container available
manifestItem.handle.bind(manifestItem),
manifestItem.timeout,
);
continue;
}
if (manifestItem instanceof ClientHandler) {
this.registerClientStreamHandler(
key,
manifestItem.handle,
manifestItem.timeout,
);
continue;
}
if (manifestItem instanceof ClientHandler) {
this.registerClientStreamHandler(
key,
manifestItem.handle,
manifestItem.timeout,
);
continue;
}
if (manifestItem instanceof UnaryHandler) {
this.registerUnaryHandler(
key,
manifestItem.handle,
manifestItem.timeout,
);
continue;
try {
for (const [key, manifestItem] of Object.entries(manifest)) {
if (manifestItem.timeout != null && manifestItem.timeout < 0) {
throw new errors.ErrorRPCInvalidHandlerTimeout();
}
if (manifestItem instanceof RawHandler) {
this.registerRawStreamHandler(
key,
manifestItem.handle,
manifestItem.timeout,
);
continue;
}
if (manifestItem instanceof DuplexHandler) {
this.registerDuplexStreamHandler(
key,
// Bind the `this` to the generator handler to make the container available
manifestItem.handle.bind(manifestItem),
manifestItem.timeout,
);
continue;
}
if (manifestItem instanceof ServerHandler) {
this.registerServerStreamHandler(
key,
// Bind the `this` to the generator handler to make the container available
manifestItem.handle.bind(manifestItem),
manifestItem.timeout,
);
continue;
}
if (manifestItem instanceof ClientHandler) {
this.registerClientStreamHandler(
key,
manifestItem.handle,
manifestItem.timeout,
);
continue;
}
if (manifestItem instanceof ClientHandler) {
this.registerClientStreamHandler(
key,
manifestItem.handle,
manifestItem.timeout,
);
continue;
}
if (manifestItem instanceof UnaryHandler) {
this.registerUnaryHandler(
key,
manifestItem.handle,
manifestItem.timeout,
);
continue;
}
utils.never();
}
utils.never();
} catch (e) {
// No need to clean up streams, as streams can only be handled after RPCServer has been started.
this.handlerMap.clear();
this.defaultTimeoutMap.clear();
throw e;
}
this.logger.info(`Started ${this.constructor.name}`);
}
Expand Down
10 changes: 10 additions & 0 deletions src/errors.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,14 @@ class ErrorRPCCallerFailed<T> extends ErrorRPC<T> {
static description = 'Failed to call stream';
}

class ErrorRPCInvalidTimeout<T> extends ErrorRPC<T> {
static description = 'Invalid timeout provided';
}

class ErrorRPCInvalidHandlerTimeout<T> extends ErrorRPC<T> {
static description = 'Invalid handler timeout provided';
}

abstract class ErrorRPCProtocol<T> extends ErrorRPC<T> {
static error = 'RPC Protocol Error';
code: number;
Expand Down Expand Up @@ -257,6 +265,8 @@ export {
ErrorRPCConnectionLocal,
ErrorRPCConnectionPeer,
ErrorRPCConnectionKeepAliveTimeOut,
ErrorRPCInvalidTimeout,
ErrorRPCInvalidHandlerTimeout,
ErrorRPCConnectionInternal,
ErrorMissingHeader,
ErrorHandlerAborted,
Expand Down
22 changes: 22 additions & 0 deletions tests/RPCClient.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1142,6 +1142,28 @@ describe(`${RPCClient.name}`, () => {
},
{ numRuns: 5 },
);
testProp(
'RPCClient constructor should throw when passed a negative timeoutTime',
[fc.integer({ max: -1 })],
async (timeoutTime) => {
const streamPair: RPCStream<Uint8Array, Uint8Array> = {
cancel: () => {},
meta: undefined,
readable: new ReadableStream(),
writable: new WritableStream(),
};
const constructorF = () =>
new RPCClient({
timeoutTime,
streamFactory: () => Promise.resolve(streamPair),
manifest: {},
logger,
idGen,
});

expect(constructorF).toThrowError(rpcErrors.ErrorRPCInvalidTimeout);
},
);
testProp(
'Check that ctx is provided to the middleware and that the middleware can reset the timer',
[specificMessageArb],
Expand Down
112 changes: 85 additions & 27 deletions tests/RPCServer.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -953,7 +953,6 @@ describe(`${RPCServer.name}`, () => {
const waitProm = promise();
const ctxShortProm = promise<ContextTimed>();
class TestMethodShortTimeout extends UnaryHandler {
timeout = 25;
public handle = async (
input: JSONValue,
_cancel,
Expand All @@ -965,29 +964,59 @@ describe(`${RPCServer.name}`, () => {
return input;
};
}
const ctxLongProm = promise<ContextTimed>();
class TestMethodLongTimeout extends UnaryHandler {
timeout = 100;
const rpcServer = new RPCServer({
timeoutTime: Infinity,
logger,
idGen,
});
await rpcServer.start({
manifest: {
testShort: new TestMethodShortTimeout({}),
},
});
const streamShort = rpcTestUtils.messagesToReadableStream([
{
jsonrpc: '2.0',
method: 'testShort',
params: null,
},
]);
const readWriteStreamShort: RPCStream<Uint8Array, Uint8Array> = {
cancel: () => {},
readable: streamShort,
writable: new WritableStream(),
};
rpcServer.handleStream(readWriteStreamShort);
// Shorter timeout is updated
const ctxShort = await ctxShortProm.p;
expect(ctxShort.timer.delay).toEqual(Infinity);
}
});
test('handler overrides the server with Infinite timeout', async () => {
{
const waitProm = promise();
const ctxShortProm = promise<ContextTimed>();
class TestMethodShortTimeout extends UnaryHandler {
timeout = Infinity;
public handle = async (
input: JSONValue,
_cancel,
_meta,
ctx_,
): Promise<JSONValue> => {
ctxLongProm.resolveP(ctx_);
ctxShortProm.resolveP(ctx_);
await waitProm.p;
return input;
};
}
const rpcServer = new RPCServer({
timeoutTime: 50,
timeoutTime: 1000,
logger,
idGen,
});
await rpcServer.start({
manifest: {
testShort: new TestMethodShortTimeout({}),
testLong: new TestMethodLongTimeout({}),
},
});
const streamShort = rpcTestUtils.messagesToReadableStream([
Expand All @@ -1005,26 +1034,7 @@ describe(`${RPCServer.name}`, () => {
rpcServer.handleStream(readWriteStreamShort);
// Shorter timeout is updated
const ctxShort = await ctxShortProm.p;
expect(ctxShort.timer.delay).toEqual(25);
const streamLong = rpcTestUtils.messagesToReadableStream([
{
jsonrpc: '2.0',
method: 'testLong',
params: null,
},
]);
const readWriteStreamLong: RPCStream<Uint8Array, Uint8Array> = {
cancel: () => {},
readable: streamLong,
writable: new WritableStream(),
};
rpcServer.handleStream(readWriteStreamLong);

// Longer timeout is set to server's default
const ctxLong = await ctxLongProm.p;
expect(ctxLong.timer.delay).toEqual(50);
waitProm.resolveP();
await rpcServer.stop({ force: true });
expect(ctxShort.timer.delay).toEqual(Infinity);
}
});
test('duplex handler refreshes timeout when messages are sent', async () => {
Expand Down Expand Up @@ -1153,6 +1163,54 @@ describe(`${RPCServer.name}`, () => {
await expect(ctx.timer).toReject();
await rpcServer.stop({ force: true });
});
testProp(
'RPCServer constructor should throw when passed a negative timeoutTime',
[fc.integer({ max: -1 })],
async (timeoutTime) => {
const constructorF = () =>
new RPCServer({
timeoutTime,
logger,
idGen,
});

expect(constructorF).toThrowError(rpcErrors.ErrorRPCInvalidTimeout);
},
);
testProp(
'RPCServer.start should throw when passed a handler with negative timeout',
[fc.integer({ max: -1 })],
async (timeoutTime) => {
const waitProm = promise();
const ctxLongProm = promise<ContextTimed>();

class TestMethodArbitraryTimeout extends UnaryHandler {
timeout = timeoutTime;
public handle = async (
input: JSONValue,
_cancel,
_meta,
ctx_,
): Promise<JSONValue> => {
ctxLongProm.resolveP(ctx_);
await waitProm.p;
return input;
};
}
const rpcServer = new RPCServer({
logger,
idGen,
});

await expect(
rpcServer.start({
manifest: {
testArbitrary: new TestMethodArbitraryTimeout({}),
},
}),
).rejects.toBeInstanceOf(rpcErrors.ErrorRPCInvalidHandlerTimeout);
},
);
testProp(
'middleware can update timeout timer',
[specificMessageArb],
Expand Down

0 comments on commit 6f1a568

Please sign in to comment.