Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

core: improve & refactor rpc protocol #12581

Merged
merged 1 commit into from
Jun 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions packages/core/src/common/message-rpc/rpc-message-encoder.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ export interface RequestMessage {

export interface NotificationMessage {
type: RpcMessageType.Notification;
id: number;
id?: number;
method: string;
args: any[];
}
Expand Down Expand Up @@ -111,7 +111,7 @@ export interface RpcMessageDecoder {
export interface RpcMessageEncoder {
cancel(buf: WriteBuffer, requestId: number): void;

notification(buf: WriteBuffer, requestId: number, method: string, args: any[]): void
notification(buf: WriteBuffer, method: string, args: any[], id?: number): void

request(buf: WriteBuffer, requestId: number, method: string, args: any[]): void

Expand All @@ -130,8 +130,8 @@ export class MsgPackMessageEncoder implements RpcMessageEncoder {
cancel(buf: WriteBuffer, requestId: number): void {
this.encode<CancelMessage>(buf, { type: RpcMessageType.Cancel, id: requestId });
}
notification(buf: WriteBuffer, requestId: number, method: string, args: any[]): void {
this.encode<NotificationMessage>(buf, { type: RpcMessageType.Notification, id: requestId, method, args });
notification(buf: WriteBuffer, method: string, args: any[], id?: number): void {
this.encode<NotificationMessage>(buf, { type: RpcMessageType.Notification, method, args, id });
}
request(buf: WriteBuffer, requestId: number, method: string, args: any[]): void {
this.encode<RequestMessage>(buf, { type: RpcMessageType.Request, id: requestId, method, args });
Expand Down
12 changes: 8 additions & 4 deletions packages/core/src/common/message-rpc/rpc-protocol.ts
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,11 @@ export class RpcProtocol {
this.encoder = options.encoder ?? new MsgPackMessageEncoder();
this.decoder = options.decoder ?? new MsgPackMessageDecoder();
this.toDispose.push(this.onNotificationEmitter);
channel.onClose(() => this.toDispose.dispose());
channel.onClose(event => {
this.pendingRequests.forEach(pending => pending.reject(new Error(event.reason)));
this.pendingRequests.clear();
this.toDispose.dispose();
});
this.toDispose.push(channel.onMessage(readBuffer => this.handleMessage(this.decoder.parse(readBuffer()))));
this.mode = options.mode ?? 'default';

Expand All @@ -98,7 +102,7 @@ export class RpcProtocol {
return;
}
case RpcMessageType.Notification: {
this.handleNotify(message.id, message.method, message.args);
this.handleNotify(message.method, message.args, message.id);
return;
}
}
Expand Down Expand Up @@ -179,7 +183,7 @@ export class RpcProtocol {
}

const output = this.channel.getWriteBuffer();
this.encoder.notification(output, this.nextMessageId++, method, args);
this.encoder.notification(output, method, args, this.nextMessageId++);
output.commit();
}

Expand Down Expand Up @@ -226,7 +230,7 @@ export class RpcProtocol {
}
}

protected async handleNotify(id: number, method: string, args: any[]): Promise<void> {
protected async handleNotify(method: string, args: any[], id?: number): Promise<void> {
if (this.toDispose.disposed) {
return;
}
Expand Down
28 changes: 13 additions & 15 deletions packages/core/src/common/messaging/proxy-factory.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import { Emitter, Event } from '../event';
import { Channel } from '../message-rpc/channel';
import { RequestHandler, RpcProtocol } from '../message-rpc/rpc-protocol';
import { ConnectionHandler } from './handler';
import { Deferred } from '../promise-util';

export type JsonRpcServer<Client> = Disposable & {
/**
Expand Down Expand Up @@ -55,11 +56,11 @@ export class JsonRpcConnectionHandler<T extends object> implements ConnectionHan
}
}
/**
* Factory for creating a new {@link RpcConnection} for a given chanel and {@link RequestHandler}.
* Factory for creating a new {@link RpcProtocol} for a given chanel and {@link RequestHandler}.
*/
export type RpcConnectionFactory = (channel: Channel, requestHandler: RequestHandler) => RpcProtocol;
export type RpcProtocolFactory = (channel: Channel, requestHandler: RequestHandler) => RpcProtocol;

const defaultRPCConnectionFactory: RpcConnectionFactory = (channel, requestHandler) => new RpcProtocol(channel, requestHandler);
const defaultRpcProtocolFactory: RpcProtocolFactory = (channel, requestHandler) => new RpcProtocol(channel, requestHandler);

/**
* Factory for JSON-RPC proxy objects.
Expand Down Expand Up @@ -109,25 +110,22 @@ export class JsonRpcProxyFactory<T extends object> implements ProxyHandler<T> {
protected readonly onDidOpenConnectionEmitter = new Emitter<void>();
protected readonly onDidCloseConnectionEmitter = new Emitter<void>();

protected connectionPromiseResolve: (connection: RpcProtocol) => void;
protected connectionPromise: Promise<RpcProtocol>;
protected rpcDeferred: Deferred<RpcProtocol>;

/**
* Build a new JsonRpcProxyFactory.
*
* @param target - The object to expose to JSON-RPC methods calls. If this
* is omitted, the proxy won't be able to handle requests, only send them.
*/
constructor(public target?: any, protected rpcConnectionFactory = defaultRPCConnectionFactory) {
constructor(public target?: any, protected rpcProtocolFactory = defaultRpcProtocolFactory) {
this.waitForConnection();
}

protected waitForConnection(): void {
this.connectionPromise = new Promise(resolve =>
this.connectionPromiseResolve = resolve
);
this.connectionPromise.then(connection => {
connection.channel.onClose(() => {
this.rpcDeferred = new Deferred<RpcProtocol>();
this.rpcDeferred.promise.then(protocol => {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not exactly the same as before: waitForConnection can be called more than once, but we a single Deferred instance can only be resolved once. I think we have to allocate a new instance here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed

protocol.channel.onClose(() => {
this.onDidCloseConnectionEmitter.fire(undefined);
// Wait for connection in case the backend reconnects
this.waitForConnection();
Expand All @@ -143,10 +141,10 @@ export class JsonRpcProxyFactory<T extends object> implements ProxyHandler<T> {
* response.
*/
listen(channel: Channel): void {
const connection = this.rpcConnectionFactory(channel, (meth, args) => this.onRequest(meth, ...args));
connection.onNotification(event => this.onNotification(event.method, ...event.args));
const protocol = this.rpcProtocolFactory(channel, (meth, args) => this.onRequest(meth, ...args));
protocol.onNotification(event => this.onNotification(event.method, ...event.args));

this.connectionPromiseResolve(connection);
this.rpcDeferred.resolve(protocol);
}

/**
Expand Down Expand Up @@ -249,7 +247,7 @@ export class JsonRpcProxyFactory<T extends object> implements ProxyHandler<T> {
return (...args: any[]) => {
const method = p.toString();
const capturedError = new Error(`Request '${method}' failed`);
return this.connectionPromise.then(connection =>
return this.rpcDeferred.promise.then(connection =>
new Promise<void>((resolve, reject) => {
try {
if (isNotify) {
Expand Down