Skip to content

Commit

Permalink
wip: first blush raw handlers
Browse files Browse the repository at this point in the history
Looks like it works but the first message is skipping the middleware right now.

- Related #500

[ci skip]
  • Loading branch information
tegefaulkes committed Feb 2, 2023
1 parent 50796d8 commit f184f59
Show file tree
Hide file tree
Showing 5 changed files with 304 additions and 176 deletions.
280 changes: 145 additions & 135 deletions src/RPC/RPCServer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import type {
JsonRpcResponse,
JsonRpcResponseError,
JsonRpcResponseResult,
RawDuplexStreamHandler,
ServerStreamHandler,
UnaryHandler,
} from './types';
Expand Down Expand Up @@ -44,8 +45,7 @@ class RPCServer {
// Properties
protected container: POJO;
protected logger: Logger;
protected handlerMap: Map<string, DuplexStreamHandler<JSONValue, JSONValue>> =
new Map();
protected handlerMap: Map<string, RawDuplexStreamHandler> = new Map();
protected activeStreams: Set<PromiseCancellable<void>> = new Set();
protected events: EventTarget = new EventTarget();

Expand All @@ -72,12 +72,115 @@ class RPCServer {
this.logger.info(`Destroyed ${this.constructor.name}`);
}

@ready(new rpcErrors.ErrorRpcDestroyed())
public registerRawStreamHandler(
method: string,
handler: RawDuplexStreamHandler,
) {
this.handlerMap.set(method, handler);
}

@ready(new rpcErrors.ErrorRpcDestroyed())
public registerDuplexStreamHandler<I extends JSONValue, O extends JSONValue>(
method: string,
handler: DuplexStreamHandler<I, O>,
) {
this.handlerMap.set(method, handler);
// This needs to handle all the message parsing and conversion from
// generators to the raw streams.

const rawSteamHandler: RawDuplexStreamHandler = (
[input, header],
container,
connectionInfo,
ctx,
) => {
// Middleware
const outputTransformStream = new rpcUtils.JsonMessageToJsonStream();
const outputReadableSteam = outputTransformStream.readable;
let forwardStream = input.pipeThrough(
new rpcUtils.JsonToJsonMessageStream(rpcUtils.parseJsonRpcRequest),
);
let reverseStream = outputTransformStream.writable;
for (const middlewareFactory of this.middleware) {
const middleware = middlewareFactory();
forwardStream = forwardStream.pipeThrough(middleware.forward);
void middleware.reverse.readable.pipeTo(reverseStream).catch(() => {});
reverseStream = middleware.reverse.writable;
}
const events = this.events;
const outputGen = async function* (): AsyncGenerator<
JsonRpcResponse<JSONValue>
> {
if (ctx.signal.aborted) throw ctx.signal.reason;
const dataGen = async function* () {
yield header.params as I;
for await (const data of forwardStream) {
yield data.params as I;
}
};
for await (const response of handler(
dataGen(),
container,
connectionInfo,
ctx,
)) {
const responseMessage: JsonRpcResponseResult<JSONValue> = {
jsonrpc: '2.0',
result: response,
id: null,
};
yield responseMessage;
}
};
const outputGenerator = outputGen();
const reverseMiddlewareStream = new ReadableStream<
JsonRpcResponse<JSONValue>
>({
pull: async (controller) => {
try {
const { value, done } = await outputGenerator.next();
if (done) {
controller.close();
return;
}
controller.enqueue(value);
} catch (e) {
if (rpcUtils.isReturnableError(e)) {
// We want to convert this error to an error message and pass it along
const rpcError: JsonRpcError = {
code: e.exitCode ?? sysexits.UNKNOWN,
message: e.description ?? '',
data: rpcUtils.fromError(e),
};
const rpcErrorMessage: JsonRpcResponseError = {
jsonrpc: '2.0',
error: rpcError,
id: null,
};
controller.enqueue(rpcErrorMessage);
} else {
// These errors are emitted to the event system
events.dispatchEvent(
new rpcUtils.RPCErrorEvent({
detail: {
error: e,
},
}),
);
}
controller.close();
}
},
cancel: async (reason) => {
await outputGenerator.throw(reason);
},
});
void reverseMiddlewareStream.pipeTo(reverseStream).catch(() => {});

return outputReadableSteam;
};

this.registerRawStreamHandler(method, rawSteamHandler);
}

@ready(new rpcErrors.ErrorRpcDestroyed())
Expand All @@ -96,7 +199,7 @@ class RPCServer {
break;
}
};
this.handlerMap.set(method, wrapperDuplex);
this.registerDuplexStreamHandler(method, wrapperDuplex);
}

@ready(new rpcErrors.ErrorRpcDestroyed())
Expand All @@ -115,7 +218,7 @@ class RPCServer {
break;
}
};
this.handlerMap.set(method, wrapperDuplex);
this.registerDuplexStreamHandler(method, wrapperDuplex);
}

@ready(new rpcErrors.ErrorRpcDestroyed())
Expand All @@ -131,7 +234,7 @@ class RPCServer {
) {
yield handler(input, container, connectionInfo, ctx);
};
this.handlerMap.set(method, wrapperDuplex);
this.registerDuplexStreamHandler(method, wrapperDuplex);
}

@ready(new rpcErrors.ErrorRpcDestroyed())
Expand All @@ -142,143 +245,50 @@ class RPCServer {
// This will take a buffer stream of json messages and set up service
// handling for it.
// Constructing the PromiseCancellable for tracking the active stream
let resolve: (value: void | PromiseLike<void>) => void;
const abortController = new AbortController();
const handlerProm: PromiseCancellable<void> = new PromiseCancellable(
(resolve_) => {
resolve = resolve_;
(resolve, reject, signal) => {
const prom = (async () => {
const { firstMessageProm, headTransformStream } =
rpcUtils.extractFirstMessageTransform(rpcUtils.parseJsonRpcRequest);
const inputStreamEndProm = streamPair.readable.pipeTo(
headTransformStream.writable,
);
const inputStream = headTransformStream.readable;
// Read a single empty value to consume the first message
const reader = inputStream.getReader();
await reader.read();
reader.releaseLock();
const leadingMetadataMessage = await firstMessageProm;
// If the stream ends early then we just stop processing
if (leadingMetadataMessage == null) {
await streamPair.writable.close();
return;
}
const method = leadingMetadataMessage.method;
const handler = this.handlerMap.get(method);
if (handler == null) {
return;
}
if (signal.aborted) return;
const outputStream = handler(
[inputStream, leadingMetadataMessage],
this.container,
connectionInfo,
{ signal },
);
await Promise.allSettled([
inputStreamEndProm,
outputStream.pipeTo(streamPair.writable),
]);
})();
prom.then(resolve, reject);
},
abortController,
);
// Putting the PromiseCancellable into the active streams map
this.activeStreams.add(handlerProm);
void handlerProm
.finally(() => this.activeStreams.delete(handlerProm))
.catch(() => {});
// Setting up middleware
let forwardStream = streamPair.readable.pipeThrough(
new rpcUtils.JsonToJsonMessageStream(rpcUtils.parseJsonRpcRequest),
);
const outputTransformStream = new rpcUtils.JsonMessageToJsonStream();
void outputTransformStream.readable
.pipeTo(streamPair.writable)
.catch(() => {});
let reverseStream = outputTransformStream.writable;
for (const middlewareFactory of this.middleware) {
const middleware = middlewareFactory();
forwardStream = forwardStream.pipeThrough(middleware.forward);
void middleware.reverse.readable.pipeTo(reverseStream).catch(() => {});
reverseStream = middleware.reverse.writable;
}
// While ReadableStream can be converted to AsyncIterable, we want it as
// a generator.
const inputGen = async function* () {
for await (const dataMessage of forwardStream) {
yield dataMessage;
}
};
const container = this.container;
const handlerMap = this.handlerMap;
const ctx = { signal: abortController.signal };
const events = this.events;
const outputGen = async function* (): AsyncGenerator<
JsonRpcResponse<JSONValue>
> {
// Step 1, authentication and establishment
// read the first message, lets assume the first message is always leading
// metadata.
const input = inputGen();
if (ctx.signal.aborted) throw ctx.signal.reason;
const leadingMetadataMessage = await input.next();
// If the stream ends early then we just stop processing
if (leadingMetadataMessage.done === true) return;
const method = leadingMetadataMessage.value.method;
const initialParams = leadingMetadataMessage.value.params;
const dataGen = async function* () {
yield initialParams as JSONValue;
for await (const data of input) {
yield data.params as JSONValue;
}
};
const handler = handlerMap.get(method);
if (handler == null) {
// Failed to find handler, this is an error. We should respond with
// an error message.
throw new rpcErrors.ErrorRpcHandlerMissing(
`No handler registered for method: ${method}`,
);
}
if (ctx.signal.aborted) throw ctx.signal.reason;
for await (const response of handler(
dataGen(),
container,
connectionInfo,
ctx,
)) {
const responseMessage: JsonRpcResponseResult<JSONValue> = {
jsonrpc: '2.0',
result: response,
id: null,
};
yield responseMessage;
}
};

const outputGenerator = outputGen();

const reverseMiddlewareStream = new ReadableStream<
JsonRpcResponse<JSONValue>
>({
pull: async (controller) => {
try {
const { value, done } = await outputGenerator.next();
if (done) {
try {
controller.close();
} catch {
// Ignore already closed error
}
resolve();
return;
}
controller.enqueue(value);
} catch (e) {
if (rpcUtils.isReturnableError(e)) {
// We want to convert this error to an error message and pass it along
const rpcError: JsonRpcError = {
code: e.exitCode ?? sysexits.UNKNOWN,
message: e.description ?? '',
data: rpcUtils.fromError(e),
};
const rpcErrorMessage: JsonRpcResponseError = {
jsonrpc: '2.0',
error: rpcError,
id: null,
};
controller.enqueue(rpcErrorMessage);
} else {
// These errors are emitted to the event system
events.dispatchEvent(
new rpcUtils.RPCErrorEvent({
detail: {
error: e,
},
}),
);
}
try {
controller.close();
} catch {
// Ignore already closed error
}
resolve();
}
},
cancel: async (reason) => {
await outputGenerator.throw(reason);
},
});
void reverseMiddlewareStream.pipeTo(reverseStream).catch(() => {});
}

@ready(new rpcErrors.ErrorRpcDestroyed())
Expand Down
7 changes: 6 additions & 1 deletion src/RPC/types.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import type { JSONValue, POJO } from '../types';
import type { ConnectionInfo } from '../network/types';
import type { ContextCancellable } from '../contexts/types';
import type { ReadableWritablePair } from 'stream/web';
import type { ReadableStream, ReadableWritablePair } from 'stream/web';

/**
* This is the JSON RPC request object. this is the generic message type used for the RPC.
Expand Down Expand Up @@ -106,6 +106,10 @@ type Handler<I, O> = (
connectionInfo: ConnectionInfo,
ctx: ContextCancellable,
) => O;
type RawDuplexStreamHandler = Handler<
[ReadableStream<Uint8Array>, JsonRpcRequest<JSONValue>],
ReadableStream<Uint8Array>
>;
type DuplexStreamHandler<I extends JSONValue, O extends JSONValue> = Handler<
AsyncGenerator<I>,
AsyncGenerator<O>
Expand Down Expand Up @@ -141,6 +145,7 @@ export type {
JsonRpcRequest,
JsonRpcResponse,
JsonRpcMessage,
RawDuplexStreamHandler,
DuplexStreamHandler,
ServerStreamHandler,
ClientStreamHandler,
Expand Down
Loading

0 comments on commit f184f59

Please sign in to comment.