Skip to content

Commit

Permalink
feat: negative timeoutTime paramaters will now throw errors
Browse files Browse the repository at this point in the history
chore: add tests to test negative timeout values.

fix: 0 `timeoutTime` value no longer throws in `RPCClient`

fix: `RPCServer.start` now throws when passed a handler with a negative `timeout`

[ci-skip]
  • Loading branch information
addievo authored and amydevs committed Oct 30, 2023
1 parent 669c00b commit 1ebf53c
Show file tree
Hide file tree
Showing 5 changed files with 153 additions and 63 deletions.
3 changes: 3 additions & 0 deletions src/RPCClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,9 @@ class RPCClient<M extends ClientManifest> {
idGen?: IdGen;
toError?: ToError;
}) {
if (timeoutTime < 0) {
throw new errors.ErrorRPCInvalidTimeout();
}
this.idGen = idGen;
this.callerTypes = utils.getHandlerTypes(manifest);
this.streamFactory = streamFactory;
Expand Down
115 changes: 64 additions & 51 deletions src/RPCServer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,9 @@ class RPCServer {
fromError?: FromError;
replacer?: (key: string, value: any) => any;
}) {
if (timeoutTime < 0) {
throw new errors.ErrorRPCInvalidTimeout();
}
this.idGen = idGen;
this.middlewareFactory = middlewareFactory;
this.timeoutTime = timeoutTime;
Expand All @@ -126,58 +129,68 @@ class RPCServer {
manifest: ServerManifest;
}): Promise<void> {
this.logger.info(`Start ${this.constructor.name}`);
for (const [key, manifestItem] of Object.entries(manifest)) {
if (manifestItem instanceof RawHandler) {
this.registerRawStreamHandler(
key,
manifestItem.handle,
manifestItem.timeout,
);
continue;
}
if (manifestItem instanceof DuplexHandler) {
this.registerDuplexStreamHandler(
key,
// Bind the `this` to the generator handler to make the container available
manifestItem.handle.bind(manifestItem),
manifestItem.timeout,
);
continue;
}
if (manifestItem instanceof ServerHandler) {
this.registerServerStreamHandler(
key,
// Bind the `this` to the generator handler to make the container available
manifestItem.handle.bind(manifestItem),
manifestItem.timeout,
);
continue;
}
if (manifestItem instanceof ClientHandler) {
this.registerClientStreamHandler(
key,
manifestItem.handle,
manifestItem.timeout,
);
continue;
}
if (manifestItem instanceof ClientHandler) {
this.registerClientStreamHandler(
key,
manifestItem.handle,
manifestItem.timeout,
);
continue;
}
if (manifestItem instanceof UnaryHandler) {
this.registerUnaryHandler(
key,
manifestItem.handle,
manifestItem.timeout,
);
continue;
try {
for (const [key, manifestItem] of Object.entries(manifest)) {
if (manifestItem.timeout != null && manifestItem.timeout < 0) {
throw new errors.ErrorRPCInvalidHandlerTimeout();
}
if (manifestItem instanceof RawHandler) {
this.registerRawStreamHandler(
key,
manifestItem.handle,
manifestItem.timeout,
);
continue;
}
if (manifestItem instanceof DuplexHandler) {
this.registerDuplexStreamHandler(
key,
// Bind the `this` to the generator handler to make the container available
manifestItem.handle.bind(manifestItem),
manifestItem.timeout,
);
continue;
}
if (manifestItem instanceof ServerHandler) {
this.registerServerStreamHandler(
key,
// Bind the `this` to the generator handler to make the container available
manifestItem.handle.bind(manifestItem),
manifestItem.timeout,
);
continue;
}
if (manifestItem instanceof ClientHandler) {
this.registerClientStreamHandler(
key,
manifestItem.handle,
manifestItem.timeout,
);
continue;
}
if (manifestItem instanceof ClientHandler) {
this.registerClientStreamHandler(
key,
manifestItem.handle,
manifestItem.timeout,
);
continue;
}
if (manifestItem instanceof UnaryHandler) {
this.registerUnaryHandler(
key,
manifestItem.handle,
manifestItem.timeout,
);
continue;
}
utils.never();
}
utils.never();
} catch (e) {
// No need to clean up streams, as streams can only be handled after RPCServer has been started.
this.handlerMap.clear();
this.defaultTimeoutMap.clear();
throw e;
}
this.logger.info(`Started ${this.constructor.name}`);
}
Expand Down
10 changes: 10 additions & 0 deletions src/errors.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,14 @@ class ErrorRPCCallerFailed<T> extends ErrorRPC<T> {
static description = 'Failed to call stream';
}

class ErrorRPCInvalidTimeout<T> extends ErrorRPC<T> {
static description = 'Invalid timeout provided';
}

class ErrorRPCInvalidHandlerTimeout<T> extends ErrorRPC<T> {
static description = 'Invalid handler timeout provided';
}

abstract class ErrorRPCProtocol<T> extends ErrorRPC<T> {
static error = 'RPC Protocol Error';
code: number;
Expand Down Expand Up @@ -257,6 +265,8 @@ export {
ErrorRPCConnectionLocal,
ErrorRPCConnectionPeer,
ErrorRPCConnectionKeepAliveTimeOut,
ErrorRPCInvalidTimeout,
ErrorRPCInvalidHandlerTimeout,
ErrorRPCConnectionInternal,
ErrorMissingHeader,
ErrorHandlerAborted,
Expand Down
40 changes: 28 additions & 12 deletions tests/RPCClient.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1148,6 +1148,28 @@ describe(`${RPCClient.name}`, () => {
},
{ numRuns: 5 },
);
testProp(
'RPCClient constructor should throw when passed a negative timeoutTime',
[fc.integer({ max: -1 })],
async (timeoutTime) => {
const streamPair: RPCStream<Uint8Array, Uint8Array> = {
cancel: () => {},
meta: undefined,
readable: new ReadableStream(),
writable: new WritableStream(),
};
const constructorF = () =>
new RPCClient({
timeoutTime,
streamFactory: () => Promise.resolve(streamPair),
manifest: {},
logger,
idGen,
});

expect(constructorF).toThrowError(rpcErrors.ErrorRPCInvalidTimeout);
},
);
testProp(
'Check that ctx is provided to the middleware and that the middleware can reset the timer',
[specificMessageArb],
Expand Down Expand Up @@ -1220,12 +1242,9 @@ describe(`${RPCClient.name}`, () => {
timeoutTime: higherTimeoutTime,
});

await rpcClient.duplexStreamCaller<JSONRPCParams, JSONRPCResult>(
methodName,
{
timer: lowerTimeoutTime,
},
);
await rpcClient.duplexStreamCaller<JSONRPCParams, JSONRPCResult>(methodName, {
timer: lowerTimeoutTime,
});

const ctx = await ctxP;
expect(ctx.timer.delay).toBe(lowerTimeoutTime);
Expand Down Expand Up @@ -1255,12 +1274,9 @@ describe(`${RPCClient.name}`, () => {
timeoutTime: lowerTimeoutTime,
});

await rpcClient.duplexStreamCaller<JSONRPCParams, JSONRPCResult>(
methodName,
{
timer: higherTimeoutTime,
},
);
await rpcClient.duplexStreamCaller<JSONRPCParams, JSONRPCResult>(methodName, {
timer: higherTimeoutTime,
});

const ctx = await ctxP;
expect(ctx.timer.delay).toBe(higherTimeoutTime);
Expand Down
48 changes: 48 additions & 0 deletions tests/RPCServer.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1082,6 +1082,54 @@ describe(`${RPCServer.name}`, () => {
await expect(ctx.timer).toReject();
await rpcServer.stop({ force: true });
});
testProp(
'RPCServer constructor should throw when passed a negative timeoutTime',
[fc.integer({ max: -1 })],
async (timeoutTime) => {
const constructorF = () =>
new RPCServer({
timeoutTime,
logger,
idGen,
});

expect(constructorF).toThrowError(rpcErrors.ErrorRPCInvalidTimeout);
},
);
testProp(
'RPCServer.start should throw when passed a handler with negative timeout',
[fc.integer({ max: -1 })],
async (timeoutTime) => {
const waitProm = promise();
const ctxLongProm = promise<ContextTimed>();

class TestMethodArbitraryTimeout extends UnaryHandler {
timeout = timeoutTime;
public handle = async (
input: JSONRPCParams,
_cancel,
_meta,
ctx_,
): Promise<JSONRPCResult> => {
ctxLongProm.resolveP(ctx_);
await waitProm.p;
return input;
};
}
const rpcServer = new RPCServer({
logger,
idGen,
});

await expect(
rpcServer.start({
manifest: {
testArbitrary: new TestMethodArbitraryTimeout({}),
},
}),
).rejects.toBeInstanceOf(rpcErrors.ErrorRPCInvalidHandlerTimeout);
},
);
testProp(
'middleware can update timeout timer',
[specificMessageArb],
Expand Down

0 comments on commit 1ebf53c

Please sign in to comment.