From c42adc25db04d38e9d300a56b9f91ba9f46601f3 Mon Sep 17 00:00:00 2001 From: Tobias Ortmayr Date: Wed, 7 Jun 2023 15:38:00 +0200 Subject: [PATCH] core: improve rpc protocol (#12581) - Ensure that pending requests are properly rejected when the underlying service channel is closed. - Make id-property from `NotificationMessage`s. optional. Ids in notification are not strictly required and can be safely omitted. - Rename `RpcConnectionFactory` to `RpcProtocolFactory` - Use a deferred in the `RpcProxyFactory` for protocol initialization Part of #12581 --- .../common/message-rpc/rpc-message-encoder.ts | 8 +++--- .../src/common/message-rpc/rpc-protocol.ts | 12 +++++--- .../src/common/messaging/proxy-factory.ts | 28 +++++++++---------- 3 files changed, 25 insertions(+), 23 deletions(-) diff --git a/packages/core/src/common/message-rpc/rpc-message-encoder.ts b/packages/core/src/common/message-rpc/rpc-message-encoder.ts index b6886d908f002..dc68a34653e80 100644 --- a/packages/core/src/common/message-rpc/rpc-message-encoder.ts +++ b/packages/core/src/common/message-rpc/rpc-message-encoder.ts @@ -51,7 +51,7 @@ export interface RequestMessage { export interface NotificationMessage { type: RpcMessageType.Notification; - id: number; + id?: number; method: string; args: any[]; } @@ -111,7 +111,7 @@ export interface RpcMessageDecoder { export interface RpcMessageEncoder { cancel(buf: WriteBuffer, requestId: number): void; - notification(buf: WriteBuffer, requestId: number, method: string, args: any[]): void + notification(buf: WriteBuffer, method: string, args: any[], id?: number): void request(buf: WriteBuffer, requestId: number, method: string, args: any[]): void @@ -130,8 +130,8 @@ export class MsgPackMessageEncoder implements RpcMessageEncoder { cancel(buf: WriteBuffer, requestId: number): void { this.encode(buf, { type: RpcMessageType.Cancel, id: requestId }); } - notification(buf: WriteBuffer, requestId: number, method: string, args: any[]): void { - this.encode(buf, { type: RpcMessageType.Notification, id: requestId, method, args }); + notification(buf: WriteBuffer, method: string, args: any[], id?: number): void { + this.encode(buf, { type: RpcMessageType.Notification, method, args, id }); } request(buf: WriteBuffer, requestId: number, method: string, args: any[]): void { this.encode(buf, { type: RpcMessageType.Request, id: requestId, method, args }); diff --git a/packages/core/src/common/message-rpc/rpc-protocol.ts b/packages/core/src/common/message-rpc/rpc-protocol.ts index 4dbf422e0b877..4bcc08043f80f 100644 --- a/packages/core/src/common/message-rpc/rpc-protocol.ts +++ b/packages/core/src/common/message-rpc/rpc-protocol.ts @@ -77,7 +77,11 @@ export class RpcProtocol { this.encoder = options.encoder ?? new MsgPackMessageEncoder(); this.decoder = options.decoder ?? new MsgPackMessageDecoder(); this.toDispose.push(this.onNotificationEmitter); - channel.onClose(() => this.toDispose.dispose()); + channel.onClose(event => { + this.pendingRequests.forEach(pending => pending.reject(new Error(event.reason))); + this.pendingRequests.clear(); + this.toDispose.dispose(); + }); this.toDispose.push(channel.onMessage(readBuffer => this.handleMessage(this.decoder.parse(readBuffer())))); this.mode = options.mode ?? 'default'; @@ -98,7 +102,7 @@ export class RpcProtocol { return; } case RpcMessageType.Notification: { - this.handleNotify(message.id, message.method, message.args); + this.handleNotify(message.method, message.args, message.id); return; } } @@ -179,7 +183,7 @@ export class RpcProtocol { } const output = this.channel.getWriteBuffer(); - this.encoder.notification(output, this.nextMessageId++, method, args); + this.encoder.notification(output, method, args, this.nextMessageId++); output.commit(); } @@ -226,7 +230,7 @@ export class RpcProtocol { } } - protected async handleNotify(id: number, method: string, args: any[]): Promise { + protected async handleNotify(method: string, args: any[], id?: number): Promise { if (this.toDispose.disposed) { return; } diff --git a/packages/core/src/common/messaging/proxy-factory.ts b/packages/core/src/common/messaging/proxy-factory.ts index a18f54305af96..7adb97f4b033a 100644 --- a/packages/core/src/common/messaging/proxy-factory.ts +++ b/packages/core/src/common/messaging/proxy-factory.ts @@ -23,6 +23,7 @@ import { Emitter, Event } from '../event'; import { Channel } from '../message-rpc/channel'; import { RequestHandler, RpcProtocol } from '../message-rpc/rpc-protocol'; import { ConnectionHandler } from './handler'; +import { Deferred } from '../promise-util'; export type JsonRpcServer = Disposable & { /** @@ -55,11 +56,11 @@ export class JsonRpcConnectionHandler implements ConnectionHan } } /** - * Factory for creating a new {@link RpcConnection} for a given chanel and {@link RequestHandler}. + * Factory for creating a new {@link RpcProtocol} for a given chanel and {@link RequestHandler}. */ -export type RpcConnectionFactory = (channel: Channel, requestHandler: RequestHandler) => RpcProtocol; +export type RpcProtocolFactory = (channel: Channel, requestHandler: RequestHandler) => RpcProtocol; -const defaultRPCConnectionFactory: RpcConnectionFactory = (channel, requestHandler) => new RpcProtocol(channel, requestHandler); +const defaultRpcProtocolFactory: RpcProtocolFactory = (channel, requestHandler) => new RpcProtocol(channel, requestHandler); /** * Factory for JSON-RPC proxy objects. @@ -109,8 +110,7 @@ export class JsonRpcProxyFactory implements ProxyHandler { protected readonly onDidOpenConnectionEmitter = new Emitter(); protected readonly onDidCloseConnectionEmitter = new Emitter(); - protected connectionPromiseResolve: (connection: RpcProtocol) => void; - protected connectionPromise: Promise; + protected rpcDeferred: Deferred; /** * Build a new JsonRpcProxyFactory. @@ -118,16 +118,14 @@ export class JsonRpcProxyFactory implements ProxyHandler { * @param target - The object to expose to JSON-RPC methods calls. If this * is omitted, the proxy won't be able to handle requests, only send them. */ - constructor(public target?: any, protected rpcConnectionFactory = defaultRPCConnectionFactory) { + constructor(public target?: any, protected rpcProtocolFactory = defaultRpcProtocolFactory) { this.waitForConnection(); } protected waitForConnection(): void { - this.connectionPromise = new Promise(resolve => - this.connectionPromiseResolve = resolve - ); - this.connectionPromise.then(connection => { - connection.channel.onClose(() => { + this.rpcDeferred = new Deferred(); + this.rpcDeferred.promise.then(protocol => { + protocol.channel.onClose(() => { this.onDidCloseConnectionEmitter.fire(undefined); // Wait for connection in case the backend reconnects this.waitForConnection(); @@ -143,10 +141,10 @@ export class JsonRpcProxyFactory implements ProxyHandler { * response. */ listen(channel: Channel): void { - const connection = this.rpcConnectionFactory(channel, (meth, args) => this.onRequest(meth, ...args)); - connection.onNotification(event => this.onNotification(event.method, ...event.args)); + const protocol = this.rpcProtocolFactory(channel, (meth, args) => this.onRequest(meth, ...args)); + protocol.onNotification(event => this.onNotification(event.method, ...event.args)); - this.connectionPromiseResolve(connection); + this.rpcDeferred.resolve(protocol); } /** @@ -249,7 +247,7 @@ export class JsonRpcProxyFactory implements ProxyHandler { return (...args: any[]) => { const method = p.toString(); const capturedError = new Error(`Request '${method}' failed`); - return this.connectionPromise.then(connection => + return this.rpcDeferred.promise.then(connection => new Promise((resolve, reject) => { try { if (isNotify) {