From 41a4ed5c4ada8fc274e7e254b508879fed78bd4b Mon Sep 17 00:00:00 2001 From: Tobias Ortmayr Date: Thu, 2 Jun 2022 13:38:46 +0200 Subject: [PATCH] Use binary message RPC protocol for plugin API Refactors the plugin RPC protocol to make use of the new message-rpc introduced with #11011/#11228. - Refactor plugin-ext RpcProtocol API to reuse the new message-rpc protocol - Remove custom RPC message encoding and handling reuse message-rpc - Implement `QueuingChannelMultiplexer` that queues messages and sends them accumulated on the next process.tick (replaces the old Multiplexer implementation) - Refactors proxy handlers and remote target handlers - Use `Channel` instead of `MessageConnection` for creating new instances of RPCProtocol - Refactor `RpcMessageEncoder`/`RpcMessageDecoder` to enable overwritting of already registered value encoders/decoders. - Add mode property to base `RpcProtocol` to enable switching from a bidirectional RPC protocol to a client-only or server-only variant. - Implement special message encoders and decoders for the plugin communication. (Replacement for the old `ObjectTransferrer` JSON replacers/revivers) - Adapt `HostedPluginServer` and `HostedPluginClient` API to send/receive messages in binary format instead of strings. This enables direct writethrough of the binary messages received from the hosted plugin process. - Adapt `hosted-plugin-process` and `plugin-host` to directly send binary messages via `IpcChannel`/`BinaryMessagePipe` - Remove incorrect (and unused) notification proxy identifiers and instantiation - NotificationExt was instantiated in the main context - There were unused notification proxy identifiers for main and ext in the wrong contexts Part of #10684 Fixes #9514 Contributed on behalf of STMicroelectronics Co-authored-by: Lucas Koehler --- examples/electron/package.json | 7 +- .../core/src/common/message-rpc/channel.ts | 22 +- .../common/message-rpc/rpc-message-encoder.ts | 26 +- .../src/common/message-rpc/rpc-protocol.ts | 78 ++- packages/core/src/node/messaging/index.ts | 1 + .../plugin-ext/src/common/plugin-api-rpc.ts | 2 - .../plugin-ext/src/common/plugin-protocol.ts | 8 +- .../plugin-ext/src/common/proxy-handler.ts | 116 ++++ .../plugin-ext/src/common/rpc-protocol.ts | 547 +++++------------- .../hosted/browser/hosted-plugin-watcher.ts | 7 +- .../src/hosted/browser/hosted-plugin.ts | 39 +- .../src/hosted/browser/plugin-worker.ts | 33 +- .../src/hosted/browser/worker/worker-main.ts | 38 +- .../src/hosted/node/hosted-plugin-process.ts | 37 +- .../src/hosted/node/hosted-plugin-protocol.ts | 50 ++ .../src/hosted/node/hosted-plugin.ts | 2 +- .../plugin-ext/src/hosted/node/plugin-host.ts | 33 +- .../src/hosted/node/plugin-service.ts | 2 +- .../src/main/browser/main-context.ts | 4 - packages/plugin-ext/src/plugin/types-impl.ts | 13 - 20 files changed, 540 insertions(+), 525 deletions(-) create mode 100644 packages/plugin-ext/src/common/proxy-handler.ts create mode 100644 packages/plugin-ext/src/hosted/node/hosted-plugin-protocol.ts diff --git a/examples/electron/package.json b/examples/electron/package.json index 4120a2eeba9ed..f36dc769c5cfe 100644 --- a/examples/electron/package.json +++ b/examples/electron/package.json @@ -55,7 +55,12 @@ "@theia/userstorage": "1.26.0", "@theia/variable-resolver": "1.26.0", "@theia/vsx-registry": "1.26.0", - "@theia/workspace": "1.26.0" + "@theia/workspace": "1.26.0", + "node-pty": "*", + "nsfw": "*", + "native-keymap": "*", + "find-git-repositories": "*", + "drivelist": "*" }, "scripts": { "build": "yarn -s compile && yarn -s bundle", diff --git a/packages/core/src/common/message-rpc/channel.ts b/packages/core/src/common/message-rpc/channel.ts index 5d9fc6985cc07..bbbdb27c418b2 100644 --- a/packages/core/src/common/message-rpc/channel.ts +++ b/packages/core/src/common/message-rpc/channel.ts @@ -159,6 +159,10 @@ export class ChannelMultiplexer implements Disposable { } + protected getUnderlyingWriteBuffer(): WriteBuffer { + return this.underlyingChannel.getWriteBuffer(); + } + protected handleMessage(buffer: ReadBuffer): void { const type = buffer.readUint8(); const id = buffer.readString(); @@ -185,7 +189,7 @@ export class ChannelMultiplexer implements Disposable { const channel = this.createChannel(id); this.pendingOpen.delete(id); this.openChannels.set(id, channel); - resolve!(channel); + resolve(channel); this.onOpenChannelEmitter.fire({ id, channel }); } } @@ -199,7 +203,7 @@ export class ChannelMultiplexer implements Disposable { // edge case: both side try to open a channel at the same time. resolve(channel); } - this.underlyingChannel.getWriteBuffer().writeUint8(MessageTypes.AckOpen).writeString(id).commit(); + this.getUnderlyingWriteBuffer().writeUint8(MessageTypes.AckOpen).writeString(id).commit(); this.onOpenChannelEmitter.fire({ id, channel }); } } @@ -215,7 +219,9 @@ export class ChannelMultiplexer implements Disposable { protected handleData(id: string, data: ReadBuffer): void { const channel = this.openChannels.get(id); if (channel) { - channel.onMessageEmitter.fire(() => data); + channel.onMessageEmitter.fire(() => data.sliceAtReadPosition()); + } else { + console.warn(`Could not handle data for channel ${id} because it is not open`); } } @@ -226,14 +232,14 @@ export class ChannelMultiplexer implements Disposable { // Prepare the write buffer for the channel with the give, id. The channel id has to be encoded // and written to the buffer before the actual message. protected prepareWriteBuffer(id: string): WriteBuffer { - const underlying = this.underlyingChannel.getWriteBuffer(); + const underlying = this.getUnderlyingWriteBuffer(); underlying.writeUint8(MessageTypes.Data); underlying.writeString(id); return underlying; } protected closeChannel(id: string): void { - this.underlyingChannel.getWriteBuffer() + this.getUnderlyingWriteBuffer() .writeUint8(MessageTypes.Close) .writeString(id) .commit(); @@ -242,10 +248,14 @@ export class ChannelMultiplexer implements Disposable { } open(id: string): Promise { + const existingChannel = this.getOpenChannel(id); + if (existingChannel) { + return Promise.resolve(existingChannel); + } const result = new Promise((resolve, reject) => { this.pendingOpen.set(id, resolve); }); - this.underlyingChannel.getWriteBuffer().writeUint8(MessageTypes.Open).writeString(id).commit(); + this.getUnderlyingWriteBuffer().writeUint8(MessageTypes.Open).writeString(id).commit(); return result; } diff --git a/packages/core/src/common/message-rpc/rpc-message-encoder.ts b/packages/core/src/common/message-rpc/rpc-message-encoder.ts index 8b0c237187747..8a0cdd23dd404 100644 --- a/packages/core/src/common/message-rpc/rpc-message-encoder.ts +++ b/packages/core/src/common/message-rpc/rpc-message-encoder.ts @@ -149,6 +149,10 @@ export class RpcMessageDecoder { protected decoders: Map = new Map(); constructor() { + this.registerDecoders(); + } + + protected registerDecoders(): void { this.registerDecoder(ObjectType.JSON, { read: buf => { const json = buf.readString(); @@ -206,7 +210,6 @@ export class RpcMessageDecoder { this.registerDecoder(ObjectType.Number, { read: buf => buf.readNumber() }); - } /** @@ -215,9 +218,10 @@ export class RpcMessageDecoder { * by retrieving the highest tag value and calculating the required Uint size to store it. * @param tag the tag for which the decoder should be registered. * @param decoder the decoder that should be registered. + * @param overwrite flag to indicate wether an existing registration with the same tag should be overwritten with the new registration. */ - registerDecoder(tag: number, decoder: ValueDecoder): void { - if (this.decoders.has(tag)) { + registerDecoder(tag: number, decoder: ValueDecoder, overwrite = false): void { + if (!overwrite && this.decoders.has(tag)) { throw new Error(`Decoder already registered: ${tag}`); } this.decoders.set(tag, decoder); @@ -435,14 +439,20 @@ export class RpcMessageEncoder { * After the successful registration the {@link tagIntType} is recomputed * by retrieving the highest tag value and calculating the required Uint size to store it. * @param tag the tag for which the encoder should be registered. - * @param decoder the encoder that should be registered. + * @param encoder the encoder that should be registered. + * @param overwrite to indicate wether an existing registration with the same tag should be overwritten with the new registration. */ - registerEncoder(tag: number, encoder: ValueEncoder): void { - if (this.registeredTags.has(tag)) { + registerEncoder(tag: number, encoder: ValueEncoder, overwrite = false): void { + if (!overwrite && this.registeredTags.has(tag)) { throw new Error(`Tag already registered: ${tag}`); } - this.registeredTags.add(tag); - this.encoders.push([tag, encoder]); + if (!overwrite) { + this.registeredTags.add(tag); + this.encoders.push([tag, encoder]); + } else { + const overrideIndex = this.encoders.findIndex(existingEncoder => existingEncoder[0] === tag); + this.encoders[overrideIndex] = [tag, encoder]; + } } cancel(buf: WriteBuffer, requestId: number): void { diff --git a/packages/core/src/common/message-rpc/rpc-protocol.ts b/packages/core/src/common/message-rpc/rpc-protocol.ts index 6e037e6e8befd..059be439d97bd 100644 --- a/packages/core/src/common/message-rpc/rpc-protocol.ts +++ b/packages/core/src/common/message-rpc/rpc-protocol.ts @@ -24,7 +24,7 @@ import { RpcMessage, RpcMessageDecoder, RpcMessageEncoder, RpcMessageType } from import { Uint8ArrayWriteBuffer } from './uint8-array-message-buffer'; /** - * Handles request messages received by the {@link RpcServer}. + * Handles request messages received by the {@link RPCProtocol}. */ export type RequestHandler = (method: string, args: any[]) => Promise; @@ -39,15 +39,20 @@ export interface RpcProtocolOptions { /** * The message decoder that should be used. If `undefined` the default {@link RpcMessageDecoder} will be used. */ - decoder?: RpcMessageDecoder + decoder?: RpcMessageDecoder, + /** + * The runtime mode determines whether the RPC protocol is bi-directional (default) or acts as a client or server only. + */ + mode?: 'default' | 'clientOnly' | 'serverOnly' } /** - * Establish a bi-directional RPC protocol on top of a given channel. Bi-directional means to send - * sends requests and notifications to the remote side as well as receiving requests and notifications from the remote side. + * Establish a RPC protocol on top of a given channel. By default the rpc protocol is bi-directional, meaning it is possible to send + * sends requests and notifications to the remote side (ie. acts as client) as well as receiving requests and notifications from the remote side (i.e. acts as a server). * Clients can get a promise for a remote request result that will be either resolved or * rejected depending on the success of the request. Keeps track of outstanding requests and matches replies to the appropriate request * Currently, there is no timeout handling for long running requests implemented. + * The bi-directional mode can be reconfigured using the {@link RpcProtocolOptions} to construct and RPC protocol instance that acts only as client or server instead. */ export class RpcProtocol { static readonly CANCELLATION_TOKEN_KEY = 'add.cancellation.token'; @@ -58,6 +63,7 @@ export class RpcProtocol { protected readonly encoder: RpcMessageEncoder; protected readonly decoder: RpcMessageDecoder; + protected readonly mode: 'default' | 'clientOnly' | 'serverOnly'; protected readonly onNotificationEmitter: Emitter<{ method: string; args: any[]; }> = new Emitter(); protected readonly cancellationTokenSources = new Map(); @@ -68,37 +74,50 @@ export class RpcProtocol { protected toDispose = new DisposableCollection(); - constructor(public readonly channel: Channel, public readonly requestHandler: RequestHandler, options: RpcProtocolOptions = {}) { + constructor(public readonly channel: Channel, public readonly requestHandler: RequestHandler | undefined, options: RpcProtocolOptions = {}) { this.encoder = options.encoder ?? new RpcMessageEncoder(); this.decoder = options.decoder ?? new RpcMessageDecoder(); this.toDispose.push(this.onNotificationEmitter); - this.toDispose.push(channel.onMessage(readBuffer => this.handleMessage(this.decoder.parse(readBuffer())))); channel.onClose(() => this.toDispose.dispose()); + this.toDispose.push(channel.onMessage(readBuffer => this.handleMessage(this.decoder.parse(readBuffer())))); + this.mode = options.mode ?? 'default'; + + if (this.mode !== 'clientOnly' && requestHandler === undefined) { + console.error('RPCProtocol was initialized without a request handler but was not set to clientOnly mode.'); + } } handleMessage(message: RpcMessage): void { - switch (message.type) { - case RpcMessageType.Cancel: { - this.handleCancel(message.id); - break; - } - case RpcMessageType.Request: { - this.handleRequest(message.id, message.method, message.args); - break; + if (this.mode !== 'clientOnly') { + switch (message.type) { + case RpcMessageType.Cancel: { + this.handleCancel(message.id); + return; + } + case RpcMessageType.Request: { + this.handleRequest(message.id, message.method, message.args); + return; + } + case RpcMessageType.Notification: { + this.handleNotify(message.id, message.method, message.args); + return; + } } - case RpcMessageType.Notification: { - this.handleNotify(message.id, message.method, message.args); - break; - } - case RpcMessageType.Reply: { - this.handleReply(message.id, message.res); - break; - } - case RpcMessageType.ReplyErr: { - this.handleReplyErr(message.id, message.err); - break; + } + if (this.mode !== 'serverOnly') { + switch (message.type) { + case RpcMessageType.Reply: { + this.handleReply(message.id, message.res); + return; + } + case RpcMessageType.ReplyErr: { + this.handleReplyErr(message.id, message.err); + return; + } } } + // If the message was not handled until here, it is incompatible with the mode. + console.warn(`Received message incompatible with this RPCProtocol's mode '${this.mode}'. Type: ${message.type}. ID: ${message.id}.`); } protected handleReply(id: number, value: any): void { @@ -153,6 +172,13 @@ export class RpcProtocol { } sendNotification(method: string, args: any[]): void { + // If the notification supports a CancellationToken, it needs to be treated like a request + // because cancellation does not work with the simplified "fire and forget" approach of simple notifications. + if (args.length && CancellationToken.is(args[args.length - 1])) { + this.sendRequest(method, args); + return; + } + const output = this.channel.getWriteBuffer(); this.encoder.notification(output, this.nextMessageId++, method, args); output.commit(); @@ -191,7 +217,7 @@ export class RpcProtocol { } try { - const result = await this.requestHandler(method, args); + const result = await this.requestHandler!(method, args); this.cancellationTokenSources.delete(id); this.encoder.replyOK(output, id, result); output.commit(); diff --git a/packages/core/src/node/messaging/index.ts b/packages/core/src/node/messaging/index.ts index fd161d93a9df0..23da1fe350586 100644 --- a/packages/core/src/node/messaging/index.ts +++ b/packages/core/src/node/messaging/index.ts @@ -16,3 +16,4 @@ export * from './messaging-service'; export * from './ipc-connection-provider'; +export * from './ipc-channel'; diff --git a/packages/plugin-ext/src/common/plugin-api-rpc.ts b/packages/plugin-ext/src/common/plugin-api-rpc.ts index abff12eabe343..2feb707dbe195 100644 --- a/packages/plugin-ext/src/common/plugin-api-rpc.ts +++ b/packages/plugin-ext/src/common/plugin-api-rpc.ts @@ -1839,7 +1839,6 @@ export const PLUGIN_RPC_CONTEXT = { STATUS_BAR_MESSAGE_REGISTRY_MAIN: >createProxyIdentifier('StatusBarMessageRegistryMain'), ENV_MAIN: createProxyIdentifier('EnvMain'), NOTIFICATION_MAIN: createProxyIdentifier('NotificationMain'), - NOTIFICATION_EXT: createProxyIdentifier('NotificationExt'), TERMINAL_MAIN: createProxyIdentifier('TerminalServiceMain'), TREE_VIEWS_MAIN: createProxyIdentifier('TreeViewsMain'), PREFERENCE_REGISTRY_MAIN: createProxyIdentifier('PreferenceRegistryMain'), @@ -1871,7 +1870,6 @@ export const MAIN_RPC_CONTEXT = { QUICK_OPEN_EXT: createProxyIdentifier('QuickOpenExt'), WINDOW_STATE_EXT: createProxyIdentifier('WindowStateExt'), NOTIFICATION_EXT: createProxyIdentifier('NotificationExt'), - NOTIFICATION_MAIN: createProxyIdentifier('NotificationMain'), WORKSPACE_EXT: createProxyIdentifier('WorkspaceExt'), TEXT_EDITORS_EXT: createProxyIdentifier('TextEditorsExt'), EDITORS_AND_DOCUMENTS_EXT: createProxyIdentifier('EditorsAndDocumentsExt'), diff --git a/packages/plugin-ext/src/common/plugin-protocol.ts b/packages/plugin-ext/src/common/plugin-protocol.ts index 47fab8c9bcaca..8b5f4af6fc369 100644 --- a/packages/plugin-ext/src/common/plugin-protocol.ts +++ b/packages/plugin-ext/src/common/plugin-protocol.ts @@ -813,7 +813,7 @@ export function buildFrontendModuleName(plugin: PluginPackage | PluginModel): st export const HostedPluginClient = Symbol('HostedPluginClient'); export interface HostedPluginClient { - postMessage(pluginHost: string, message: string): Promise; + postMessage(pluginHost: string, buffer: Uint8Array): Promise; log(logPart: LogPart): void; @@ -858,7 +858,7 @@ export interface HostedPluginServer extends JsonRpcServer { getExtPluginAPI(): Promise; - onMessage(targetHost: string, message: string): Promise; + onMessage(targetHost: string, message: Uint8Array): Promise; } @@ -895,9 +895,9 @@ export interface PluginServer { export const ServerPluginRunner = Symbol('ServerPluginRunner'); export interface ServerPluginRunner { // eslint-disable-next-line @typescript-eslint/no-explicit-any - acceptMessage(pluginHostId: string, jsonMessage: string): boolean; + acceptMessage(pluginHostId: string, jsonMessage: Uint8Array): boolean; // eslint-disable-next-line @typescript-eslint/no-explicit-any - onMessage(pluginHostId: string, jsonMessage: string): void; + onMessage(pluginHostId: string, jsonMessage: Uint8Array): void; setClient(client: HostedPluginClient): void; setDefault(defaultRunner: ServerPluginRunner): void; clientClosed(): void; diff --git a/packages/plugin-ext/src/common/proxy-handler.ts b/packages/plugin-ext/src/common/proxy-handler.ts new file mode 100644 index 0000000000000..5d01f0a20c712 --- /dev/null +++ b/packages/plugin-ext/src/common/proxy-handler.ts @@ -0,0 +1,116 @@ +/******************************************************************************** + * Copyright (C) 2022 STMicroelectronics and others. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v. 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0. + * + * This Source Code may also be made available under the following Secondary + * Licenses when the conditions for such availability set forth in the Eclipse + * Public License v. 2.0 are satisfied: GNU General Public License, version 2 + * with the GNU Classpath Exception which is available at + * https://www.gnu.org/software/classpath/license.html. + * + * SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0 + ********************************************************************************/ +/* eslint-disable @typescript-eslint/no-explicit-any */ +import { Channel, MaybePromise, RpcProtocol, RpcProtocolOptions } from '@theia/core/'; +import { Deferred } from '@theia/core/lib/common/promise-util'; +import { RpcMessageDecoder, RpcMessageEncoder } from '@theia/core/lib/common/message-rpc/rpc-message-encoder'; + +export interface RpcMessageCodec { + encoder: RpcMessageEncoder, + decoder: RpcMessageDecoder +} +/** + * A proxy handler that will send any method invocation on the proxied object + * as a rcp protocol message over a channel. + */ +export class ClientProxyHandler implements ProxyHandler { + private rpcDeferred: Deferred = new Deferred(); + private isRpcInitialized = false; + + constructor(protected readonly id: string, protected readonly codec: RpcMessageCodec, protected readonly channelProvider: () => MaybePromise) { } + + private async initializeRpc(): Promise { + const clientOptions: RpcProtocolOptions = { ...this.codec, mode: 'clientOnly' }; + const channel = await this.channelProvider(); + const rpc = new RpcProtocol(channel, undefined, clientOptions); + this.rpcDeferred.resolve(rpc); + this.isRpcInitialized = true; + } + + get(target: any, name: string, receiver: any): any { + if (!this.isRpcInitialized) { + this.initializeRpc(); + } + + if (target[name] || name.charCodeAt(0) !== 36 /* CharCode.DollarSign */) { + return target[name]; + } + const isNotify = this.isNotification(name); + return (...args: any[]) => { + const method = name.toString(); + return this.rpcDeferred.promise.then((connection: RpcProtocol) => + new Promise((resolve, reject) => { + try { + if (isNotify) { + connection.sendNotification(method, args); + resolve(undefined); + } else { + const resultPromise = connection.sendRequest(method, args) as Promise; + resultPromise.then((result: any) => { + resolve(result); + }).catch(e => { + reject(e); + }); + } + } catch (err) { + reject(err); + } + }) + ); + }; + } + + /** + * Return whether the given property represents a notification. If true, + * the promise returned from the invocation will resolve immediately to `undefined` + * + * A property leads to a notification rather than a method call if its name + * begins with `notify` or `on`. + * + * @param p - The property being called on the proxy. + * @return Whether `p` represents a notification. + */ + protected isNotification(p: PropertyKey): boolean { + let propertyString = p.toString(); + if (propertyString.charCodeAt(0) === 36/* CharCode.DollarSign */) { + propertyString = propertyString.substring(1); + } + return propertyString.startsWith('notify') || propertyString.startsWith('on'); + } +} + +export class RpcInvocationHandler { + private rpcDeferred: Deferred = new Deferred(); + + constructor(readonly id: string, readonly target: any, protected readonly codec: RpcMessageCodec) { + } + + listen(channel: Channel): void { + const serverOptions: RpcProtocolOptions = { ...this.codec, mode: 'serverOnly' }; + const server = new RpcProtocol(channel, (method: string, args: any[]) => this.handleRequest(method, args), serverOptions); + server.onNotification((e: { method: string, args: any }) => this.onNotification(e.method, e.args)); + this.rpcDeferred.resolve(server); + } + + protected handleRequest(method: string, args: any[]): Promise { + return this.rpcDeferred.promise.then(() => this.target[method](...args)); + } + + protected onNotification(method: string, args: any[]): void { + this.target[method](...args); + } +} + diff --git a/packages/plugin-ext/src/common/rpc-protocol.ts b/packages/plugin-ext/src/common/rpc-protocol.ts index e46c930a3c325..94ace23337816 100644 --- a/packages/plugin-ext/src/common/rpc-protocol.ts +++ b/packages/plugin-ext/src/common/rpc-protocol.ts @@ -22,13 +22,15 @@ /* eslint-disable @typescript-eslint/no-explicit-any */ -import { Emitter, Event } from '@theia/core/lib/common/event'; -import { DisposableCollection, Disposable } from '@theia/core/lib/common/disposable'; -import { Deferred } from '@theia/core/lib/common/promise-util'; +import { Event } from '@theia/core/lib/common/event'; +import { Channel, Disposable, DisposableCollection, ReadBuffer, WriteBuffer } from '@theia/core'; +import { Uint8ArrayReadBuffer, Uint8ArrayWriteBuffer } from '@theia/core/lib/common/message-rpc/uint8-array-message-buffer'; +import { ChannelMultiplexer } from '@theia/core/lib/common/message-rpc/channel'; +import { ObjectType, RpcMessageDecoder, RpcMessageEncoder } from '@theia/core/lib/common/message-rpc/rpc-message-encoder'; +import { Position, Range } from '../plugin/types-impl'; +import { ClientProxyHandler, RpcInvocationHandler, RpcMessageCodec } from './proxy-handler'; +import TheiaURI from '@theia/core/lib/common/uri'; import { URI as VSCodeURI } from '@theia/core/shared/vscode-uri'; -import URI from '@theia/core/lib/common/uri'; -import { CancellationToken, CancellationTokenSource } from '@theia/core/shared/vscode-languageserver-protocol'; -import { Range, Position } from '../plugin/types-impl'; import { BinaryBuffer } from '@theia/core/lib/common/buffer'; export interface MessageConnection { @@ -76,50 +78,43 @@ export namespace ConnectionClosedError { } } -export class RPCProtocolImpl implements RPCProtocol { +// Start with 101 to avoid clashes with ObjectType from core. +// All values < 255, still fit into Uint8 +export enum PluginObjectType { + TheiaRange = 101, + TheiaUri = 102, + VSCodeUri = 103, + // eslint-disable-next-line @typescript-eslint/no-shadow + BinaryBuffer = 104, +} - private readonly locals = new Map(); +export class RPCProtocolImpl implements RPCProtocol { + private readonly locals = new Map(); private readonly proxies = new Map(); - private lastMessageId = 0; - private readonly cancellationTokenSources = new Map(); - private readonly pendingRPCReplies = new Map>(); - private readonly multiplexer: RPCMultiplexer; - - private replacer: (key: string | undefined, value: any) => any; - private reviver: (key: string | undefined, value: any) => any; + private readonly multiplexer: ChannelMultiplexer; + private messageCodec: RpcMessageCodec; private readonly toDispose = new DisposableCollection( Disposable.create(() => { /* mark as no disposed */ }) ); - constructor(connection: MessageConnection, transformations?: { - replacer?: (key: string | undefined, value: any) => any, - reviver?: (key: string | undefined, value: any) => any - }) { - this.toDispose.push( - this.multiplexer = new RPCMultiplexer(connection) - ); - this.multiplexer.onMessage(msg => this.receiveOneMessage(msg)); - this.toDispose.push(Disposable.create(() => { - this.proxies.clear(); - for (const reply of this.pendingRPCReplies.values()) { - reply.reject(ConnectionClosedError.create()); - } - this.pendingRPCReplies.clear(); - })); - - this.reviver = transformations?.reviver || ObjectsTransferrer.reviver; - this.replacer = transformations?.replacer || ObjectsTransferrer.replacer; - } - - private get isDisposed(): boolean { - return this.toDispose.disposed; + constructor(channel: Channel) { + this.messageCodec = { + encoder: new PluginRpcMessageEncoder(), + decoder: new PluginRpcMessageDecoder(), + }; + this.toDispose.push(this.multiplexer = new QueuingChannelMultiplexer(channel)); + this.toDispose.push(Disposable.create(() => this.proxies.clear())); } dispose(): void { this.toDispose.dispose(); } + protected get isDisposed(): boolean { + return this.toDispose.disposed; + } + getProxy(proxyId: ProxyIdentifier): T { if (this.isDisposed) { throw ConnectionClosedError.create(); @@ -132,415 +127,185 @@ export class RPCProtocolImpl implements RPCProtocol { return proxy; } + protected createProxy(proxyId: string): T { + const handler = new ClientProxyHandler(proxyId, this.messageCodec, () => this.multiplexer.open(proxyId)); + return new Proxy(Object.create(null), handler); + } + set(identifier: ProxyIdentifier, instance: R): R { if (this.isDisposed) { throw ConnectionClosedError.create(); } - this.locals.set(identifier.id, instance); - if (Disposable.is(instance)) { - this.toDispose.push(instance); - } - this.toDispose.push(Disposable.create(() => this.locals.delete(identifier.id))); - return instance; - } + const invocationHandler = this.locals.get(identifier.id); + if (!invocationHandler) { + const handler = new RpcInvocationHandler(identifier.id, instance, this.messageCodec); - private createProxy(proxyId: string): T { - const handler = { - get: (target: any, name: string) => { - if (!target[name] && name.charCodeAt(0) === 36 /* CharCode.DollarSign */) { - target[name] = (...myArgs: any[]) => - this.remoteCall(proxyId, name, myArgs); - } - return target[name]; + const channel = this.multiplexer.getOpenChannel(identifier.id); + if (channel) { + handler.listen(channel); + } else { + const channelOpenListener = this.multiplexer.onDidOpenChannel(event => { + if (event.id === identifier.id) { + handler.listen(event.channel); + channelOpenListener.dispose(); + } + }); } - }; - return new Proxy(Object.create(null), handler); - } - private remoteCall(proxyId: string, methodName: string, args: any[]): Promise { - if (this.isDisposed) { - return Promise.reject(ConnectionClosedError.create()); - } - const cancellationToken: CancellationToken | undefined = args.length && CancellationToken.is(args[args.length - 1]) ? args.pop() : undefined; - if (cancellationToken && cancellationToken.isCancellationRequested) { - return Promise.reject(canceled()); - } - - const callId = String(++this.lastMessageId); - const result = new Deferred(); + this.locals.set(identifier.id, handler); + if (Disposable.is(instance)) { + this.toDispose.push(instance); + } + this.toDispose.push(Disposable.create(() => this.locals.delete(identifier.id))); - if (cancellationToken) { - args.push('add.cancellation.token'); - cancellationToken.onCancellationRequested(() => - this.multiplexer.send(this.cancel(callId)) - ); } - - this.pendingRPCReplies.set(callId, result); - this.multiplexer.send(this.request(callId, proxyId, methodName, args)); - return result.promise; + return instance; } +} - private receiveOneMessage(rawmsg: string): void { - if (this.isDisposed) { - return; - } - try { - const msg = JSON.parse(rawmsg, this.reviver); - - switch (msg.type) { - case MessageType.Request: - this.receiveRequest(msg); - break; - case MessageType.Reply: - this.receiveReply(msg); - break; - case MessageType.ReplyErr: - this.receiveReplyErr(msg); - break; - case MessageType.Cancel: - this.receiveCancel(msg); - break; +export class PluginRpcMessageEncoder extends RpcMessageEncoder { + + protected override registerEncoders(): void { + super.registerEncoders(); + + this.registerEncoder(PluginObjectType.TheiaRange, { + is: value => value instanceof Range, + write: (buf, value) => { + const range = value as Range; + const serializedValue = { + start: { + line: range.start.line, + character: range.start.character + }, + end: { + line: range.end.line, + character: range.end.character + } + }; + buf.writeString(JSON.stringify(serializedValue)); } - } catch (e) { - // exception does not show problematic content: log it! - console.log('failed to parse message: ' + rawmsg); - throw e; - } - - } - - private receiveCancel(msg: CancelMessage): void { - const cancellationTokenSource = this.cancellationTokenSources.get(msg.id); - if (cancellationTokenSource) { - cancellationTokenSource.cancel(); - } - } + }); - private receiveRequest(msg: RequestMessage): void { - const callId = msg.id; - const proxyId = msg.proxyId; - // convert `null` to `undefined`, since we don't use `null` in internal plugin APIs - const args = msg.args.map(arg => arg === null ? undefined : arg); // eslint-disable-line no-null/no-null - - const addToken = args.length && args[args.length - 1] === 'add.cancellation.token' ? args.pop() : false; - if (addToken) { - const tokenSource = new CancellationTokenSource(); - this.cancellationTokenSources.set(callId, tokenSource); - args.push(tokenSource.token); - } - const invocation = this.invokeHandler(proxyId, msg.method, args); - - invocation.then(result => { - this.cancellationTokenSources.delete(callId); - this.multiplexer.send(this.replyOK(callId, result)); - }, error => { - this.cancellationTokenSources.delete(callId); - this.multiplexer.send(this.replyErr(callId, error)); + this.registerEncoder(PluginObjectType.TheiaUri, { + is: value => value instanceof TheiaURI, + write: (buf, value) => { + buf.writeString(value.toString()); + } }); - } - private receiveReply(msg: ReplyMessage): void { - const callId = msg.id; - const pendingReply = this.pendingRPCReplies.get(callId); - if (!pendingReply) { - return; - } - this.pendingRPCReplies.delete(callId); - pendingReply.resolve(msg.res); - } + this.registerEncoder(PluginObjectType.VSCodeUri, { + is: value => value instanceof VSCodeURI, + write: (buf, value) => { + buf.writeString(value.toString()); + } + }); - private receiveReplyErr(msg: ReplyErrMessage): void { - const callId = msg.id; - const pendingReply = this.pendingRPCReplies.get(callId); - if (!pendingReply) { - return; - } - this.pendingRPCReplies.delete(callId); - - let err: Error | undefined = undefined; - if (msg.err && msg.err.$isError) { - err = new Error(); - err.name = msg.err.name; - err.message = msg.err.message; - err.stack = msg.err.stack; - } - pendingReply.reject(err); - } + this.registerEncoder(PluginObjectType.BinaryBuffer, { + is: value => value instanceof BinaryBuffer, + write: (buf, value) => { + const binaryBuffer = value as BinaryBuffer; + buf.writeBytes(binaryBuffer.buffer); + } + }); - private invokeHandler(proxyId: string, methodName: string, args: any[]): Promise { - try { - return Promise.resolve(this.doInvokeHandler(proxyId, methodName, args)); - } catch (err) { - return Promise.reject(err); - } + // We don't want/need special encoding for `ResponseErrors`. Overwrite with no-op encoder. + // The default Error encoder will be used as fallback + this.registerEncoder(ObjectType.ResponseError, { + is: () => false, + write: () => { } + }, true); } +} - private doInvokeHandler(proxyId: string, methodName: string, args: any[]): any { - const actor = this.locals.get(proxyId); - if (!actor) { - throw new Error('Unknown actor ' + proxyId); - } - const method = actor[methodName]; - if (typeof method !== 'function') { - throw new Error('Unknown method ' + methodName + ' on actor ' + proxyId); - } - return method.apply(actor, args); - } +export class PluginRpcMessageDecoder extends RpcMessageDecoder { - private cancel(req: string): string { - return `{"type":${MessageType.Cancel},"id":"${req}"}`; - } + protected override registerDecoders(): void { + super.registerDecoders(); - private request(req: string, rpcId: string, method: string, args: any[]): string { - return `{"type":${MessageType.Request},"id":"${req}","proxyId":"${rpcId}","method":"${method}","args":${JSON.stringify(args, this.replacer)}}`; - } + this.registerDecoder(PluginObjectType.TheiaRange, { + read: buf => { + // eslint-disable-next-line @typescript-eslint/no-explicit-any + const obj: any = JSON.parse(buf.readString()); + const start = new Position(obj.start.line, obj.start.character); + const end = new Position(obj.end.line, obj.end.character); + return new Range(start, end); + } + }); + this.registerDecoder(PluginObjectType.TheiaUri, { + read: buf => new TheiaURI(buf.readString()) + }); - private replyOK(req: string, res: any): string { - if (typeof res === 'undefined') { - return `{"type":${MessageType.Reply},"id":"${req}"}`; - } - return `{"type":${MessageType.Reply},"id":"${req}","res":${safeStringify(res, this.replacer)}}`; - } + this.registerDecoder(PluginObjectType.VSCodeUri, { + read: buf => VSCodeURI.parse(buf.readString()) + }); - private replyErr(req: string, err: any): string { - err = typeof err === 'string' ? new Error(err) : err; - if (err instanceof Error) { - return `{"type":${MessageType.ReplyErr},"id":"${req}","err":${safeStringify(transformErrorForSerialization(err))}}`; - } - return `{"type":${MessageType.ReplyErr},"id":"${req}","err":null}`; + this.registerDecoder(PluginObjectType.BinaryBuffer, { + read: buf => BinaryBuffer.wrap(buf.readBytes()) + }); } } -function canceled(): Error { - const error = new Error('Canceled'); - error.name = error.message; - return error; -} - /** * Sends/Receives multiple messages in one go: * - multiple messages to be sent from one stack get sent in bulk at `process.nextTick`. * - each incoming message is handled in a separate `process.nextTick`. */ -class RPCMultiplexer implements Disposable, MessageConnection { - - private readonly connection: MessageConnection; - private readonly sendAccumulatedBound: () => void; - - private messagesToSend: string[]; - - private readonly messageEmitter = new Emitter(); - private readonly toDispose = new DisposableCollection(); - - constructor(connection: MessageConnection) { - this.connection = connection; - this.sendAccumulatedBound = this.sendAccumulated.bind(this); +export class QueuingChannelMultiplexer extends ChannelMultiplexer { + protected messagesToSend: Uint8Array[] = []; + constructor(underlyingChannel: Channel) { + super(underlyingChannel); this.toDispose.push(Disposable.create(() => this.messagesToSend = [])); - this.toDispose.push(this.connection.onMessage((msg: string) => { - const messages = JSON.parse(msg); - for (const message of messages) { - this.messageEmitter.fire(message); - } - })); - this.toDispose.push(this.messageEmitter); - - this.messagesToSend = []; } - dispose(): void { - this.toDispose.dispose(); - } - - get onMessage(): Event { - return this.messageEmitter.event; + protected override getUnderlyingWriteBuffer(): WriteBuffer { + const writer = new Uint8ArrayWriteBuffer(); + writer.onCommit(buffer => this.commitSingleMessage(buffer)); + return writer; } - private sendAccumulated(): void { - const tmp = this.messagesToSend; - this.messagesToSend = []; - this.connection.send(JSON.stringify(tmp)); - } - - public send(msg: string): void { + protected commitSingleMessage(msg: Uint8Array): void { if (this.toDispose.disposed) { throw ConnectionClosedError.create(); } if (this.messagesToSend.length === 0) { if (typeof setImmediate !== 'undefined') { - setImmediate(this.sendAccumulatedBound); + setImmediate(() => this.sendAccumulated()); } else { - setTimeout(this.sendAccumulatedBound, 0); + setTimeout(() => this.sendAccumulated(), 0); } } this.messagesToSend.push(msg); } -} -/** - * These functions are responsible for correct transferring objects via rpc channel. - * - * To reach that some specific kind of objects is converted to json in some custom way - * and then, after receiving, revived to objects again, - * so there is feeling that object was transferred via rpc channel. - * - * To distinguish between regular and altered objects, field $type is added to altered ones. - * Also value of that field specifies kind of the object. - */ -export namespace ObjectsTransferrer { - - // eslint-disable-next-line @typescript-eslint/no-explicit-any - export function replacer(key: string | undefined, value: any): any { - if (value instanceof URI) { - return { - $type: SerializedObjectType.THEIA_URI, - data: value.toString() - } as SerializedObject; - } else if (value instanceof Range) { - const range = value as Range; - const serializedValue = { - start: { - line: range.start.line, - character: range.start.character - }, - end: { - line: range.end.line, - character: range.end.character - } - }; - return { - $type: SerializedObjectType.THEIA_RANGE, - data: JSON.stringify(serializedValue) - } as SerializedObject; - } else if (value && value['$mid'] === 1) { - // Given value is VSCode URI - // We cannot use instanceof here because VSCode URI has toJSON method which is invoked before this replacer. - const uri = VSCodeURI.revive(value); - return { - $type: SerializedObjectType.VSCODE_URI, - data: uri.toString() - } as SerializedObject; - } else if (value instanceof BinaryBuffer) { - const bytes = [...value.buffer.values()]; - return { - $type: SerializedObjectType.TEXT_BUFFER, - data: JSON.stringify({ bytes }) - }; - } + protected sendAccumulated(): void { + const cachedMessages = this.messagesToSend; + this.messagesToSend = []; + const writer = this.underlyingChannel.getWriteBuffer(); - return value; - } + if (cachedMessages.length > 0) { + writer.writeLength(cachedMessages.length); + cachedMessages.forEach(msg => { + writer.writeBytes(msg); + }); - // eslint-disable-next-line @typescript-eslint/no-explicit-any - export function reviver(key: string | undefined, value: any): any { - if (isSerializedObject(value)) { - switch (value.$type) { - case SerializedObjectType.THEIA_URI: - return new URI(value.data); - case SerializedObjectType.VSCODE_URI: - return VSCodeURI.parse(value.data); - case SerializedObjectType.THEIA_RANGE: - // eslint-disable-next-line @typescript-eslint/no-explicit-any - const obj: any = JSON.parse(value.data); - const start = new Position(obj.start.line, obj.start.character); - const end = new Position(obj.end.line, obj.end.character); - return new Range(start, end); - case SerializedObjectType.TEXT_BUFFER: - const data: { bytes: number[] } = JSON.parse(value.data); - return BinaryBuffer.wrap(Uint8Array.from(data.bytes)); - } } - - return value; + writer.commit(); } -} - -interface SerializedObject { - $type: SerializedObjectType; - data: string; -} - -enum SerializedObjectType { - THEIA_URI, - VSCODE_URI, - THEIA_RANGE, - TEXT_BUFFER -} - -function isSerializedObject(obj: any): obj is SerializedObject { - return obj && obj.$type !== undefined && obj.data !== undefined; -} - -export const enum MessageType { - Request = 1, - Reply = 2, - ReplyErr = 3, - Cancel = 4, - Terminate = 5, - Terminated = 6 -} - -class CancelMessage { - type: MessageType.Cancel; - id: string; -} - -class RequestMessage { - type: MessageType.Request; - id: string; - proxyId: string; - method: string; - args: any[]; -} - -class ReplyMessage { - type: MessageType.Reply; - id: string; - res: any; -} - -class ReplyErrMessage { - type: MessageType.ReplyErr; - id: string; - err: SerializedError; -} - -type RPCMessage = RequestMessage | ReplyMessage | ReplyErrMessage | CancelMessage; - -export interface SerializedError { - readonly $isError: true; - readonly name: string; - readonly message: string; - readonly stack: string; -} + protected override handleMessage(buffer: ReadBuffer): void { + // Read in the list of messages and handle each message individually + const length = buffer.readLength(); + if (length > 0) { + for (let index = 0; index < length; index++) { + const message = buffer.readBytes(); + this.handleSingleMessage(new Uint8ArrayReadBuffer(message)); -export function transformErrorForSerialization(error: Error): SerializedError { - if (error instanceof Error) { - const { name, message } = error; - const stack: string = (error).stacktrace || error.stack; - return { - $isError: true, - name, - message, - stack - }; + } + } } - // return as is - return error; -} - -interface JSONStringifyReplacer { - (key: string, value: any): any; -} - -function safeStringify(obj: any, replacer?: JSONStringifyReplacer): string { - try { - return JSON.stringify(obj, replacer); - } catch (err) { - console.error('error stringifying response: ', err); - return 'null'; + protected handleSingleMessage(buffer: ReadBuffer): void { + return super.handleMessage(buffer); } } diff --git a/packages/plugin-ext/src/hosted/browser/hosted-plugin-watcher.ts b/packages/plugin-ext/src/hosted/browser/hosted-plugin-watcher.ts index 3c80f8d33d410..6fb3a0c55932b 100644 --- a/packages/plugin-ext/src/hosted/browser/hosted-plugin-watcher.ts +++ b/packages/plugin-ext/src/hosted/browser/hosted-plugin-watcher.ts @@ -21,7 +21,8 @@ import { LogPart } from '../../common/types'; @injectable() export class HostedPluginWatcher { - private onPostMessage = new Emitter<{ pluginHostId: string, message: string }>(); + private onPostMessage = new Emitter<{ pluginHostId: string, message: Uint8Array }>(); + private onLogMessage = new Emitter(); private readonly onDidDeployEmitter = new Emitter(); @@ -31,7 +32,7 @@ export class HostedPluginWatcher { const messageEmitter = this.onPostMessage; const logEmitter = this.onLogMessage; return { - postMessage(pluginHostId, message: string): Promise { + postMessage(pluginHostId, message: Uint8Array): Promise { messageEmitter.fire({ pluginHostId, message }); return Promise.resolve(); }, @@ -43,7 +44,7 @@ export class HostedPluginWatcher { }; } - get onPostMessageEvent(): Event<{ pluginHostId: string, message: string }> { + get onPostMessageEvent(): Event<{ pluginHostId: string, message: Uint8Array }> { return this.onPostMessage.event; } diff --git a/packages/plugin-ext/src/hosted/browser/hosted-plugin.ts b/packages/plugin-ext/src/hosted/browser/hosted-plugin.ts index bd69ccf2ac528..63abbfc5c34a1 100644 --- a/packages/plugin-ext/src/hosted/browser/hosted-plugin.ts +++ b/packages/plugin-ext/src/hosted/browser/hosted-plugin.ts @@ -33,7 +33,7 @@ import { RPCProtocol, RPCProtocolImpl } from '../../common/rpc-protocol'; import { Disposable, DisposableCollection, Emitter, isCancelled, ILogger, ContributionProvider, CommandRegistry, WillExecuteCommandEvent, - CancellationTokenSource, JsonRpcProxy, ProgressService, nls + CancellationTokenSource, JsonRpcProxy, ProgressService, nls, ChannelCloseEvent, MessageProvider } from '@theia/core'; import { PreferenceServiceImpl, PreferenceProviderProvider } from '@theia/core/lib/browser/preferences'; import { WorkspaceService } from '@theia/workspace/lib/browser'; @@ -66,6 +66,7 @@ import { StandaloneServices } from '@theia/monaco-editor-core/esm/vs/editor/stan import { ILanguageService } from '@theia/monaco-editor-core/esm/vs/editor/common/languages/language'; import { LanguageService } from '@theia/monaco-editor-core/esm/vs/editor/common/services/languageService'; import { Measurement, Stopwatch } from '@theia/core/lib/common'; +import { Uint8ArrayReadBuffer, Uint8ArrayWriteBuffer } from '@theia/core/lib/common/message-rpc/uint8-array-message-buffer'; export type PluginHost = 'frontend' | string; export type DebugActivationEvent = 'onDebugResolve' | 'onDebugInitialConfigurations' | 'onDebugAdapterProtocolTracker' | 'onDebugDynamicConfigurations'; @@ -521,20 +522,38 @@ export class HostedPluginSupport { } private createServerRpc(pluginHostId: string): RPCProtocol { - const emitter = new Emitter(); + + const onCloseEmitter = new Emitter(); + const onErrorEmitter = new Emitter(); + const onMessageEmitter = new Emitter(); + + // Create RPC protocol before adding the listener to the watcher to receive the watcher's cached messages after the rpc protocol was created. + const rpc = new RPCProtocolImpl({ + close: () => { + onCloseEmitter.dispose(); + onErrorEmitter.dispose(); + onMessageEmitter.dispose(); + }, + getWriteBuffer: () => { + const writer = new Uint8ArrayWriteBuffer(); + writer.onCommit(buffer => { + this.server.onMessage(pluginHostId, buffer); + }); + return writer; + }, + onClose: onCloseEmitter.event, + onError: onErrorEmitter.event, + onMessage: onMessageEmitter.event + }); + this.watcher.onPostMessageEvent(received => { if (pluginHostId === received.pluginHostId) { - emitter.fire(received.message); - } - }); - return new RPCProtocolImpl({ - onMessage: emitter.event, - send: message => { - this.server.onMessage(pluginHostId, message); + onMessageEmitter.fire(() => new Uint8ArrayReadBuffer(received.message)); } }); - } + return rpc; + } private async updateStoragePath(): Promise { const path = await this.getStoragePath(); for (const manager of this.managers.values()) { diff --git a/packages/plugin-ext/src/hosted/browser/plugin-worker.ts b/packages/plugin-ext/src/hosted/browser/plugin-worker.ts index 874ad820448e6..98c222f32a432 100644 --- a/packages/plugin-ext/src/hosted/browser/plugin-worker.ts +++ b/packages/plugin-ext/src/hosted/browser/plugin-worker.ts @@ -16,6 +16,8 @@ import { injectable } from '@theia/core/shared/inversify'; import { Emitter } from '@theia/core/lib/common/event'; import { RPCProtocol, RPCProtocolImpl } from '../../common/rpc-protocol'; +import { Uint8ArrayReadBuffer, Uint8ArrayWriteBuffer } from '@theia/core/lib/common/message-rpc/uint8-array-message-buffer'; +import { ChannelCloseEvent, MessageProvider } from '@theia/core/lib/common/message-rpc'; @injectable() export class PluginWorker { @@ -35,11 +37,34 @@ export class PluginWorker { this.worker.onmessage = m => emitter.fire(m.data); this.worker.onerror = e => console.error(e); + const onCloseEmitter = new Emitter(); + const onMessageEmitter = new Emitter(); + const onErrorEmitter = new Emitter(); + + // eslint-disable-next-line arrow-body-style + this.worker.onmessage = buffer => onMessageEmitter.fire(() => { + return new Uint8ArrayReadBuffer(buffer.data); + }); + + this.worker.onerror = e => onErrorEmitter.fire(e); + onErrorEmitter.event(e => console.error(e)); + this.rpc = new RPCProtocolImpl({ - onMessage: emitter.event, - send: (m: string) => { - this.worker.postMessage(m); - } + close: () => { + onCloseEmitter.dispose(); + onErrorEmitter.dispose(); + onMessageEmitter.dispose(); + }, + getWriteBuffer: () => { + const writer = new Uint8ArrayWriteBuffer(); + writer.onCommit(buffer => { + this.worker.postMessage(buffer); + }); + return writer; + }, + onClose: onCloseEmitter.event, + onError: onErrorEmitter.event, + onMessage: onMessageEmitter.event }); } diff --git a/packages/plugin-ext/src/hosted/browser/worker/worker-main.ts b/packages/plugin-ext/src/hosted/browser/worker/worker-main.ts index 4176d07b499c0..221cdd3c755c2 100644 --- a/packages/plugin-ext/src/hosted/browser/worker/worker-main.ts +++ b/packages/plugin-ext/src/hosted/browser/worker/worker-main.ts @@ -15,7 +15,6 @@ // ***************************************************************************** import { Emitter } from '@theia/core/lib/common/event'; -import { RPCProtocolImpl } from '../../../common/rpc-protocol'; import { PluginManagerExtImpl } from '../../../plugin/plugin-manager'; import { MAIN_RPC_CONTEXT, Plugin, emptyPlugin, TerminalServiceExt } from '../../../common/plugin-api-rpc'; import { createAPIFactory } from '../../../plugin/plugin-context'; @@ -33,8 +32,10 @@ import { KeyValueStorageProxy } from '../../../plugin/plugin-storage'; import { WebviewsExtImpl } from '../../../plugin/webviews'; import { loadManifest } from './plugin-manifest-loader'; import { TerminalServiceExtImpl } from '../../../plugin/terminal-ext'; -import { reviver } from '../../../plugin/types-impl'; import { SecretsExtImpl } from '../../../plugin/secrets-ext'; +import { Uint8ArrayReadBuffer, Uint8ArrayWriteBuffer } from '@theia/core/lib/common/message-rpc/uint8-array-message-buffer'; +import { ChannelCloseEvent, MessageProvider } from '@theia/core'; +import { RPCProtocolImpl } from '../../../common/rpc-protocol'; // eslint-disable-next-line @typescript-eslint/no-explicit-any const ctx = self as any; @@ -42,20 +43,31 @@ const ctx = self as any; const pluginsApiImpl = new Map(); const pluginsModulesNames = new Map(); -const emitter = new Emitter(); -const rpc = new RPCProtocolImpl({ - onMessage: emitter.event, - send: (m: string) => { - ctx.postMessage(m); - }, -}, -{ - reviver: reviver -}); +const onCloseEmitter = new Emitter(); +const onErrorEmitter = new Emitter(); +const onMessageEmitter = new Emitter(); // eslint-disable-next-line @typescript-eslint/no-explicit-any addEventListener('message', (message: any) => { - emitter.fire(message.data); + onMessageEmitter.fire(() => new Uint8ArrayReadBuffer(message.data)); +}); + +const rpc = new RPCProtocolImpl({ + close: () => { + onCloseEmitter.dispose(); + onErrorEmitter.dispose(); + onMessageEmitter.dispose(); + }, + getWriteBuffer: () => { + const writeBuffer = new Uint8ArrayWriteBuffer(); + writeBuffer.onCommit(buffer => { + ctx.postMessage(buffer); + }); + return writeBuffer; + }, + onClose: onCloseEmitter.event, + onError: onErrorEmitter.event, + onMessage: onMessageEmitter.event }); const scripts = new Set(); diff --git a/packages/plugin-ext/src/hosted/node/hosted-plugin-process.ts b/packages/plugin-ext/src/hosted/node/hosted-plugin-process.ts index 9ef8a12350b71..d1f65687fe5c1 100644 --- a/packages/plugin-ext/src/hosted/node/hosted-plugin-process.ts +++ b/packages/plugin-ext/src/hosted/node/hosted-plugin-process.ts @@ -14,16 +14,18 @@ // SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0 // ***************************************************************************** -import * as cp from 'child_process'; -import { injectable, inject, named } from '@theia/core/shared/inversify'; -import { ILogger, ConnectionErrorHandler, ContributionProvider, MessageService } from '@theia/core/lib/common'; +import { ConnectionErrorHandler, ContributionProvider, ILogger, MessageService } from '@theia/core/lib/common'; +import { Deferred } from '@theia/core/lib/common/promise-util'; import { createIpcEnv } from '@theia/core/lib/node/messaging/ipc-protocol'; -import { HostedPluginClient, ServerPluginRunner, PluginHostEnvironmentVariable, DeployedPlugin, PLUGIN_HOST_BACKEND } from '../../common/plugin-protocol'; -import { MessageType } from '../../common/rpc-protocol'; -import { HostedPluginCliContribution } from './hosted-plugin-cli-contribution'; +import { inject, injectable, named } from '@theia/core/shared/inversify'; +import * as cp from 'child_process'; import * as psTree from 'ps-tree'; -import { Deferred } from '@theia/core/lib/common/promise-util'; +import { Duplex } from 'stream'; +import { DeployedPlugin, HostedPluginClient, PluginHostEnvironmentVariable, PLUGIN_HOST_BACKEND, ServerPluginRunner } from '../../common/plugin-protocol'; +import { HostedPluginCliContribution } from './hosted-plugin-cli-contribution'; import { HostedPluginLocalizationService } from './hosted-plugin-localization-service'; +import { ProcessTerminatedMessage, ProcessTerminateMessage } from './hosted-plugin-protocol'; +import { BinaryMessagePipe } from '@theia/core/lib/node/messaging/binary-message-pipe'; export interface IPCConnectionOptions { readonly serverName: string; @@ -60,6 +62,7 @@ export class HostedPluginProcess implements ServerPluginRunner { protected readonly localizationService: HostedPluginLocalizationService; private childProcess: cp.ChildProcess | undefined; + private messagePipe?: BinaryMessagePipe; private client: HostedPluginClient; private terminatingPluginServer = false; @@ -82,14 +85,14 @@ export class HostedPluginProcess implements ServerPluginRunner { } // eslint-disable-next-line @typescript-eslint/no-explicit-any - public acceptMessage(pluginHostId: string, message: string): boolean { + public acceptMessage(pluginHostId: string, message: Uint8Array): boolean { return pluginHostId === 'main'; } // eslint-disable-next-line @typescript-eslint/no-explicit-any - public onMessage(pluginHostId: string, jsonMessage: string): void { - if (this.childProcess) { - this.childProcess.send(jsonMessage); + public onMessage(pluginHostId: string, message: Uint8Array): void { + if (this.messagePipe) { + this.messagePipe.send(message); } } @@ -106,12 +109,12 @@ export class HostedPluginProcess implements ServerPluginRunner { const waitForTerminated = new Deferred(); cp.on('message', message => { const msg = JSON.parse(message); - if ('type' in msg && msg.type === MessageType.Terminated) { + if (ProcessTerminatedMessage.is(msg)) { waitForTerminated.resolve(); } }); const stopTimeout = this.cli.pluginHostStopTimeout; - cp.send(JSON.stringify({ type: MessageType.Terminate, stopTimeout })); + cp.send(JSON.stringify({ type: ProcessTerminateMessage.TYPE, stopTimeout })); const terminateTimeout = this.cli.pluginHostTerminateTimeout; if (terminateTimeout) { @@ -156,9 +159,11 @@ export class HostedPluginProcess implements ServerPluginRunner { logger: this.logger, args: [] }); - this.childProcess.on('message', message => { + + this.messagePipe = new BinaryMessagePipe(this.childProcess.stdio[4] as Duplex); + this.messagePipe.onMessage(buffer => { if (this.client) { - this.client.postMessage(PLUGIN_HOST_BACKEND, message); + this.client.postMessage(PLUGIN_HOST_BACKEND, buffer); } }); } @@ -184,7 +189,7 @@ export class HostedPluginProcess implements ServerPluginRunner { silent: true, env: env, execArgv: [], - stdio: ['pipe', 'pipe', 'pipe', 'ipc'] + stdio: ['pipe', 'pipe', 'pipe', 'ipc', 'pipe'] }; const inspectArgPrefix = `--${options.serverName}-inspect`; const inspectArg = process.argv.find(v => v.startsWith(inspectArgPrefix)); diff --git a/packages/plugin-ext/src/hosted/node/hosted-plugin-protocol.ts b/packages/plugin-ext/src/hosted/node/hosted-plugin-protocol.ts new file mode 100644 index 0000000000000..415c22738d8ae --- /dev/null +++ b/packages/plugin-ext/src/hosted/node/hosted-plugin-protocol.ts @@ -0,0 +1,50 @@ +// ***************************************************************************** +// Copyright (C) 2022 STMicroelectronics and others. +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License v. 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0. +// +// This Source Code may also be made available under the following Secondary +// Licenses when the conditions for such availability set forth in the Eclipse +// Public License v. 2.0 are satisfied: GNU General Public License, version 2 +// with the GNU Classpath Exception which is available at +// https://www.gnu.org/software/classpath/license.html. +// +// SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0 +// ***************************************************************************** + +// Custom message protocol between `HostedPluginProcess` and its `PluginHost` child process. + +/** + * Sent to initiate termination of the counterpart process. + */ +export interface ProcessTerminateMessage { + type: typeof ProcessTerminateMessage.TYPE, + stopTimeout?: number +} + +export namespace ProcessTerminateMessage { + export const TYPE = 0; + // eslint-disable-next-line @typescript-eslint/no-explicit-any + export function is(object: any): object is ProcessTerminateMessage { + return typeof object === 'object' && object.type === TYPE; + } +} + +/** + * Sent to inform the counter part process that the process termination has been completed. + */ + +export interface ProcessTerminatedMessage { + type: typeof ProcessTerminateMessage.TYPE, +} + +export namespace ProcessTerminatedMessage { + export const TYPE = 1; + // eslint-disable-next-line @typescript-eslint/no-explicit-any + export function is(object: any): object is ProcessTerminateMessage { + return typeof object === 'object' && object.type === TYPE; + } +} + diff --git a/packages/plugin-ext/src/hosted/node/hosted-plugin.ts b/packages/plugin-ext/src/hosted/node/hosted-plugin.ts index 937f1a0be7d27..237b93bda41c3 100644 --- a/packages/plugin-ext/src/hosted/node/hosted-plugin.ts +++ b/packages/plugin-ext/src/hosted/node/hosted-plugin.ts @@ -71,7 +71,7 @@ export class HostedPluginSupport { } } - onMessage(pluginHostId: string, message: string): void { + onMessage(pluginHostId: string, message: Uint8Array): void { // need to perform routing // eslint-disable-next-line @typescript-eslint/no-explicit-any if (this.pluginRunners.length > 0) { diff --git a/packages/plugin-ext/src/hosted/node/plugin-host.ts b/packages/plugin-ext/src/hosted/node/plugin-host.ts index fa54f21beb537..24b67a6304b83 100644 --- a/packages/plugin-ext/src/hosted/node/plugin-host.ts +++ b/packages/plugin-ext/src/hosted/node/plugin-host.ts @@ -13,11 +13,12 @@ // // SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0 // ***************************************************************************** - -import { Emitter } from '@theia/core/lib/common/event'; -import { RPCProtocolImpl, MessageType, ConnectionClosedError } from '../../common/rpc-protocol'; +// eslint-disable-next-line import/no-extraneous-dependencies +import 'reflect-metadata'; +import { ConnectionClosedError, RPCProtocolImpl } from '../../common/rpc-protocol'; +import { ProcessTerminatedMessage, ProcessTerminateMessage } from './hosted-plugin-protocol'; import { PluginHostRPC } from './plugin-host-rpc'; -import { reviver } from '../../plugin/types-impl'; +import { IPCChannel } from '@theia/core/lib/node'; console.log('PLUGIN_HOST(' + process.pid + ') starting instance'); @@ -74,18 +75,8 @@ process.on('rejectionHandled', (promise: Promise) => { }); let terminating = false; -const emitter = new Emitter(); -const rpc = new RPCProtocolImpl({ - onMessage: emitter.event, - send: (m: string) => { - if (process.send && !terminating) { - process.send(m); - } - } -}, -{ - reviver: reviver -}); +const channel = new IPCChannel(); +const rpc = new RPCProtocolImpl(channel); process.on('message', async (message: string) => { if (terminating) { @@ -93,10 +84,9 @@ process.on('message', async (message: string) => { } try { const msg = JSON.parse(message); - if ('type' in msg && msg.type === MessageType.Terminate) { + if (ProcessTerminateMessage.is(msg)) { terminating = true; - emitter.dispose(); - if ('stopTimeout' in msg && typeof msg.stopTimeout === 'number' && msg.stopTimeout) { + if (msg.stopTimeout) { await Promise.race([ pluginHostRPC.terminate(), new Promise(resolve => setTimeout(resolve, msg.stopTimeout)) @@ -106,10 +96,9 @@ process.on('message', async (message: string) => { } rpc.dispose(); if (process.send) { - process.send(JSON.stringify({ type: MessageType.Terminated })); + process.send(JSON.stringify({ type: ProcessTerminatedMessage.TYPE })); } - } else { - emitter.fire(message); + } } catch (e) { console.error(e); diff --git a/packages/plugin-ext/src/hosted/node/plugin-service.ts b/packages/plugin-ext/src/hosted/node/plugin-service.ts index 8929a9d155de2..fdca45e49e22d 100644 --- a/packages/plugin-ext/src/hosted/node/plugin-service.ts +++ b/packages/plugin-ext/src/hosted/node/plugin-service.ts @@ -108,7 +108,7 @@ export class HostedPluginServerImpl implements HostedPluginServer { return Promise.all(plugins.map(plugin => this.localizationService.localizePlugin(plugin))); } - onMessage(pluginHostId: string, message: string): Promise { + onMessage(pluginHostId: string, message: Uint8Array): Promise { this.hostedPlugin.onMessage(pluginHostId, message); return Promise.resolve(); } diff --git a/packages/plugin-ext/src/main/browser/main-context.ts b/packages/plugin-ext/src/main/browser/main-context.ts index 92cb03bc8ae23..c5699790b864e 100644 --- a/packages/plugin-ext/src/main/browser/main-context.ts +++ b/packages/plugin-ext/src/main/browser/main-context.ts @@ -57,7 +57,6 @@ import { CustomEditorsMainImpl } from './custom-editors/custom-editors-main'; import { SecretsMainImpl } from './secrets-main'; import { WebviewViewsMainImpl } from './webview-views/webview-views-main'; import { MonacoLanguages } from '@theia/monaco/lib/browser/monaco-languages'; -import { NotificationExtImpl } from '../../plugin/notification'; export function setUpPluginApi(rpc: RPCProtocol, container: interfaces.Container): void { const authenticationMain = new AuthenticationMainImpl(rpc, container); @@ -109,9 +108,6 @@ export function setUpPluginApi(rpc: RPCProtocol, container: interfaces.Container const notificationMain = new NotificationMainImpl(rpc, container); rpc.set(PLUGIN_RPC_CONTEXT.NOTIFICATION_MAIN, notificationMain); - const notificationExt = new NotificationExtImpl(rpc); - rpc.set(MAIN_RPC_CONTEXT.NOTIFICATION_EXT, notificationExt); - const terminalMain = new TerminalServiceMainImpl(rpc, container); rpc.set(PLUGIN_RPC_CONTEXT.TERMINAL_MAIN, terminalMain); diff --git a/packages/plugin-ext/src/plugin/types-impl.ts b/packages/plugin-ext/src/plugin/types-impl.ts index b4cdab5090ad4..3b0dd53da7c85 100644 --- a/packages/plugin-ext/src/plugin/types-impl.ts +++ b/packages/plugin-ext/src/plugin/types-impl.ts @@ -31,21 +31,8 @@ import { startsWithIgnoreCase } from '@theia/core/lib/common/strings'; import { SymbolKind } from '../common/plugin-api-rpc-model'; import { FileSystemProviderErrorCode, markAsFileSystemProviderError } from '@theia/filesystem/lib/common/files'; import * as paths from 'path'; -import { ObjectsTransferrer } from '../common/rpc-protocol'; import { es5ClassCompat } from '../common/types'; -/** - * A reviver that takes URI's transferred via JSON.stringify() and makes - * instances of our local plugin API URI class (below) - */ -export function reviver(key: string | undefined, value: any): any { - const revived = ObjectsTransferrer.reviver(key, value); - if (CodeURI.isUri(revived)) { - return URI.revive(revived); - } - return revived; -} - /** * This is an implementation of #theia.Uri based on vscode-uri. * This is supposed to fix https://github.com/eclipse-theia/theia/issues/8752