Skip to content

Commit

Permalink
core: improve rpc protocol (eclipse-theia#12581)
Browse files Browse the repository at this point in the history
- Ensure that pending requests are properly rejected when the underlying service channel is closed.
- Make  id-property from `NotificationMessage`s.  optional. Ids in notification are not strictly required and can be safely omitted.
- Rename `RpcConnectionFactory` to `RpcProtocolFactory` 
- Use a deferred in the `RpcProxyFactory` for protocol initialization

Part of eclipse-theia#12581
  • Loading branch information
tortmayr authored Jun 7, 2023
1 parent 7c47bee commit c42adc2
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 23 deletions.
8 changes: 4 additions & 4 deletions packages/core/src/common/message-rpc/rpc-message-encoder.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ export interface RequestMessage {

export interface NotificationMessage {
type: RpcMessageType.Notification;
id: number;
id?: number;
method: string;
args: any[];
}
Expand Down Expand Up @@ -111,7 +111,7 @@ export interface RpcMessageDecoder {
export interface RpcMessageEncoder {
cancel(buf: WriteBuffer, requestId: number): void;

notification(buf: WriteBuffer, requestId: number, method: string, args: any[]): void
notification(buf: WriteBuffer, method: string, args: any[], id?: number): void

request(buf: WriteBuffer, requestId: number, method: string, args: any[]): void

Expand All @@ -130,8 +130,8 @@ export class MsgPackMessageEncoder implements RpcMessageEncoder {
cancel(buf: WriteBuffer, requestId: number): void {
this.encode<CancelMessage>(buf, { type: RpcMessageType.Cancel, id: requestId });
}
notification(buf: WriteBuffer, requestId: number, method: string, args: any[]): void {
this.encode<NotificationMessage>(buf, { type: RpcMessageType.Notification, id: requestId, method, args });
notification(buf: WriteBuffer, method: string, args: any[], id?: number): void {
this.encode<NotificationMessage>(buf, { type: RpcMessageType.Notification, method, args, id });
}
request(buf: WriteBuffer, requestId: number, method: string, args: any[]): void {
this.encode<RequestMessage>(buf, { type: RpcMessageType.Request, id: requestId, method, args });
Expand Down
12 changes: 8 additions & 4 deletions packages/core/src/common/message-rpc/rpc-protocol.ts
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,11 @@ export class RpcProtocol {
this.encoder = options.encoder ?? new MsgPackMessageEncoder();
this.decoder = options.decoder ?? new MsgPackMessageDecoder();
this.toDispose.push(this.onNotificationEmitter);
channel.onClose(() => this.toDispose.dispose());
channel.onClose(event => {
this.pendingRequests.forEach(pending => pending.reject(new Error(event.reason)));
this.pendingRequests.clear();
this.toDispose.dispose();
});
this.toDispose.push(channel.onMessage(readBuffer => this.handleMessage(this.decoder.parse(readBuffer()))));
this.mode = options.mode ?? 'default';

Expand All @@ -98,7 +102,7 @@ export class RpcProtocol {
return;
}
case RpcMessageType.Notification: {
this.handleNotify(message.id, message.method, message.args);
this.handleNotify(message.method, message.args, message.id);
return;
}
}
Expand Down Expand Up @@ -179,7 +183,7 @@ export class RpcProtocol {
}

const output = this.channel.getWriteBuffer();
this.encoder.notification(output, this.nextMessageId++, method, args);
this.encoder.notification(output, method, args, this.nextMessageId++);
output.commit();
}

Expand Down Expand Up @@ -226,7 +230,7 @@ export class RpcProtocol {
}
}

protected async handleNotify(id: number, method: string, args: any[]): Promise<void> {
protected async handleNotify(method: string, args: any[], id?: number): Promise<void> {
if (this.toDispose.disposed) {
return;
}
Expand Down
28 changes: 13 additions & 15 deletions packages/core/src/common/messaging/proxy-factory.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import { Emitter, Event } from '../event';
import { Channel } from '../message-rpc/channel';
import { RequestHandler, RpcProtocol } from '../message-rpc/rpc-protocol';
import { ConnectionHandler } from './handler';
import { Deferred } from '../promise-util';

export type JsonRpcServer<Client> = Disposable & {
/**
Expand Down Expand Up @@ -55,11 +56,11 @@ export class JsonRpcConnectionHandler<T extends object> implements ConnectionHan
}
}
/**
* Factory for creating a new {@link RpcConnection} for a given chanel and {@link RequestHandler}.
* Factory for creating a new {@link RpcProtocol} for a given chanel and {@link RequestHandler}.
*/
export type RpcConnectionFactory = (channel: Channel, requestHandler: RequestHandler) => RpcProtocol;
export type RpcProtocolFactory = (channel: Channel, requestHandler: RequestHandler) => RpcProtocol;

const defaultRPCConnectionFactory: RpcConnectionFactory = (channel, requestHandler) => new RpcProtocol(channel, requestHandler);
const defaultRpcProtocolFactory: RpcProtocolFactory = (channel, requestHandler) => new RpcProtocol(channel, requestHandler);

/**
* Factory for JSON-RPC proxy objects.
Expand Down Expand Up @@ -109,25 +110,22 @@ export class JsonRpcProxyFactory<T extends object> implements ProxyHandler<T> {
protected readonly onDidOpenConnectionEmitter = new Emitter<void>();
protected readonly onDidCloseConnectionEmitter = new Emitter<void>();

protected connectionPromiseResolve: (connection: RpcProtocol) => void;
protected connectionPromise: Promise<RpcProtocol>;
protected rpcDeferred: Deferred<RpcProtocol>;

/**
* Build a new JsonRpcProxyFactory.
*
* @param target - The object to expose to JSON-RPC methods calls. If this
* is omitted, the proxy won't be able to handle requests, only send them.
*/
constructor(public target?: any, protected rpcConnectionFactory = defaultRPCConnectionFactory) {
constructor(public target?: any, protected rpcProtocolFactory = defaultRpcProtocolFactory) {
this.waitForConnection();
}

protected waitForConnection(): void {
this.connectionPromise = new Promise(resolve =>
this.connectionPromiseResolve = resolve
);
this.connectionPromise.then(connection => {
connection.channel.onClose(() => {
this.rpcDeferred = new Deferred<RpcProtocol>();
this.rpcDeferred.promise.then(protocol => {
protocol.channel.onClose(() => {
this.onDidCloseConnectionEmitter.fire(undefined);
// Wait for connection in case the backend reconnects
this.waitForConnection();
Expand All @@ -143,10 +141,10 @@ export class JsonRpcProxyFactory<T extends object> implements ProxyHandler<T> {
* response.
*/
listen(channel: Channel): void {
const connection = this.rpcConnectionFactory(channel, (meth, args) => this.onRequest(meth, ...args));
connection.onNotification(event => this.onNotification(event.method, ...event.args));
const protocol = this.rpcProtocolFactory(channel, (meth, args) => this.onRequest(meth, ...args));
protocol.onNotification(event => this.onNotification(event.method, ...event.args));

this.connectionPromiseResolve(connection);
this.rpcDeferred.resolve(protocol);
}

/**
Expand Down Expand Up @@ -249,7 +247,7 @@ export class JsonRpcProxyFactory<T extends object> implements ProxyHandler<T> {
return (...args: any[]) => {
const method = p.toString();
const capturedError = new Error(`Request '${method}' failed`);
return this.connectionPromise.then(connection =>
return this.rpcDeferred.promise.then(connection =>
new Promise<void>((resolve, reject) => {
try {
if (isNotify) {
Expand Down

0 comments on commit c42adc2

Please sign in to comment.