Skip to content

Commit

Permalink
fix: fixes #7
Browse files Browse the repository at this point in the history
* Removes graceTimer and related jests
  • Loading branch information
addievo committed Sep 18, 2023
1 parent cda7b02 commit 6f11b4e
Show file tree
Hide file tree
Showing 2 changed files with 3 additions and 124 deletions.
23 changes: 3 additions & 20 deletions src/RPCServer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,6 @@ class RPCServer extends EventTarget {
middlewareFactory = rpcUtilsMiddleware.defaultServerMiddlewareWrapper(),
sensitive = false,
handlerTimeoutTime = 60_000, // 1 minute
handlerTimeoutGraceTime = 2_000, // 2 seconds
logger = new Logger(this.name),
}: {
manifest: ServerManifest;
Expand All @@ -97,7 +96,6 @@ class RPCServer extends EventTarget {
>;
sensitive?: boolean;
handlerTimeoutTime?: number;
handlerTimeoutGraceTime?: number;
logger?: Logger;
}): Promise<RPCServer> {
logger.info(`Creating ${this.name}`);
Expand All @@ -106,7 +104,6 @@ class RPCServer extends EventTarget {
middlewareFactory,
sensitive,
handlerTimeoutTime,
handlerTimeoutGraceTime,
logger,
});
logger.info(`Created ${this.name}`);
Expand All @@ -116,7 +113,6 @@ class RPCServer extends EventTarget {
protected handlerMap: Map<string, RawHandlerImplementation> = new Map();
protected defaultTimeoutMap: Map<string, number | undefined> = new Map();
protected handlerTimeoutTime: number;
protected handlerTimeoutGraceTime: number;
protected activeStreams: Set<PromiseCancellable<void>> = new Set();
protected sensitive: boolean;
protected middlewareFactory: MiddlewareFactory<
Expand All @@ -131,7 +127,6 @@ class RPCServer extends EventTarget {
middlewareFactory,
sensitive,
handlerTimeoutTime = 60_000, // 1 minuet
handlerTimeoutGraceTime = 2_000, // 2 seconds
logger,
}: {
manifest: ServerManifest;
Expand All @@ -143,7 +138,6 @@ class RPCServer extends EventTarget {
JSONRPCResponseResult
>;
handlerTimeoutTime?: number;
handlerTimeoutGraceTime?: number;
sensitive: boolean;
logger: Logger;
}) {
Expand Down Expand Up @@ -202,7 +196,6 @@ class RPCServer extends EventTarget {
this.middlewareFactory = middlewareFactory;
this.sensitive = sensitive;
this.handlerTimeoutTime = handlerTimeoutTime;
this.handlerTimeoutGraceTime = handlerTimeoutGraceTime;
this.logger = logger;
}

Expand Down Expand Up @@ -450,18 +443,15 @@ class RPCServer extends EventTarget {
},
});

// Grace timer is triggered with any abort signal.
// If grace timer completes then it will cause the RPCStream to end with
// `RPCStream.cancel(reason)`.
let graceTimer: Timer<void> | undefined;
const handleAbort = () => {
const graceTimer = new Timer({
delay: this.handlerTimeoutGraceTime,
const timer = new Timer({
delay: this.handlerTimeoutTime,
handler: () => {
rpcStream.cancel(abortController.signal.reason);
},
});
void graceTimer
void timer
.catch(() => {}) // Ignore cancellation error
.finally(() => {
abortController.signal.removeEventListener('abort', handleAbort);
Expand Down Expand Up @@ -496,9 +486,7 @@ class RPCServer extends EventTarget {
await rpcStream.writable.abort(reason);
await inputStreamEndProm;
timer.cancel(cleanupReason);
graceTimer?.cancel(cleanupReason);
await timer.catch(() => {});
await graceTimer?.catch(() => {});
};
// Read a single empty value to consume the first message
const reader = headTransformStream.readable.getReader();
Expand All @@ -522,9 +510,7 @@ class RPCServer extends EventTarget {
);
await inputStreamEndProm;
timer.cancel(cleanupReason);
graceTimer?.cancel(cleanupReason);
await timer.catch(() => {});
await graceTimer?.catch(() => {});
this.dispatchEvent(
new rpcEvents.RPCErrorEvent({
detail: new rpcErrors.ErrorRPCOutputStreamError(
Expand Down Expand Up @@ -616,8 +602,6 @@ class RPCServer extends EventTarget {
// Clean up and return
timer.cancel(cleanupReason);
abortController.signal.removeEventListener('abort', handleAbort);
graceTimer?.cancel(cleanupReason);
abortController.abort(new rpcErrors.ErrorRPCStreamEnded());
rpcStream.cancel(Error('TMP header message was an error'));
return;
}
Expand All @@ -641,7 +625,6 @@ class RPCServer extends EventTarget {
// Cleaning up abort and timer
timer.cancel(cleanupReason);
abortController.signal.removeEventListener('abort', handleAbort);
graceTimer?.cancel(cleanupReason);
abortController.abort(new rpcErrors.ErrorRPCStreamEnded());
})();
const handlerProm = PromiseCancellable.from(prom, abortController).finally(
Expand Down
104 changes: 0 additions & 104 deletions tests/RPCServer.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1084,58 +1084,6 @@ describe(`${RPCServer.name}`, () => {
await expect(ctx.timer).toReject();
await rpcServer.destroy();
});
test('Timeout has a grace period before forcing the streams closed', async () => {
const ctxProm = promise<ContextTimed>();
class TestHandler extends RawHandler<ContainerType> {
public handle = async (
input: [JSONRPCRequest<JSONValue>, ReadableStream<Uint8Array>],
cancel: (reason?: any) => void,
meta: Record<string, JSONValue> | undefined,
ctx: ContextTimed,
): Promise<[JSONValue, ReadableStream<Uint8Array>]> => {
ctxProm.resolveP(ctx);

return Promise.resolve([null, new ReadableStream<Uint8Array>()]);
};
}
const rpcServer = await RPCServer.createRPCServer({
manifest: {
testMethod: new TestHandler({}),
},
handlerTimeoutTime: 50,
handlerTimeoutGraceTime: 100,
logger,
});
const [, outputStream] = rpcTestUtils.streamToArray();
const stream = rpcTestUtils.messagesToReadableStream([
{
jsonrpc: '2.0',
method: 'testMethod',
params: null,
},
{
jsonrpc: '2.0',
method: 'testMethod',
params: null,
},
]);
const cancelProm = promise<any>();
const readWriteStream: RPCStream<Uint8Array, Uint8Array> = {
cancel: (reason) => cancelProm.resolveP(reason),
readable: stream,
writable: outputStream,
};
rpcServer.handleStream(readWriteStream);
const ctx = await ctxProm.p;
await ctx.timer;
const then = Date.now();
expect(ctx.signal.reason).toBeInstanceOf(rpcErrors.ErrorRPCTimedOut);
// Should end after grace period
await expect(cancelProm.p).resolves.toBeInstanceOf(
rpcErrors.ErrorRPCTimedOut,
);
expect(Date.now() - then).toBeGreaterThanOrEqual(90);
});
testProp(
'middleware can update timeout timer',
[specificMessageArb],
Expand Down Expand Up @@ -1180,56 +1128,4 @@ describe(`${RPCServer.name}`, () => {
expect(ctx.timer.delay).toBe(12345);
},
);
test('destroying the `RPCServer` sends an abort signal and closes connection', async () => {
const ctxProm = promise<ContextTimed>();
class TestHandler extends RawHandler<ContainerType> {
public handle = async (
input: [JSONRPCRequest<JSONValue>, ReadableStream<Uint8Array>],
_cancel: (reason?: any) => void,
_meta: Record<string, JSONValue> | undefined,
ctx_: ContextTimed,
): Promise<[JSONValue, ReadableStream<Uint8Array>]> => {
return new Promise((resolve) => {
ctxProm.resolveP(ctx_);
// Echo messages
return [null, input[1]];
});
};
}
const rpcServer = await RPCServer.createRPCServer({
manifest: {
testMethod: new TestHandler({}),
},
handlerTimeoutGraceTime: 0,
logger,
});
const [, outputStream] = rpcTestUtils.streamToArray();
const message = Buffer.from(
JSON.stringify({
jsonrpc: '2.0',
method: 'testMethod',
params: null,
}),
);
const forwardStream = new TransformStream<Uint8Array, Uint8Array>();
const cancelProm = promise<any>();
const readWriteStream: RPCStream<Uint8Array, Uint8Array> = {
cancel: (reason) => cancelProm.resolveP(reason),
readable: forwardStream.readable,
writable: outputStream,
};
rpcServer.handleStream(readWriteStream);
const writer = forwardStream.writable.getWriter();
await writer.write(message);
const ctx = await ctxProm.p;
void rpcServer.destroy(true).then(
() => {},
() => {},
);
await expect(cancelProm.p).resolves.toBeInstanceOf(
rpcErrors.ErrorRPCStopping,
);
expect(ctx.signal.reason).toBeInstanceOf(rpcErrors.ErrorRPCStopping);
await writer.close();
});
});

0 comments on commit 6f11b4e

Please sign in to comment.