diff --git a/CHANGELOG.md b/CHANGELOG.md index 7765aeca41716..35cf3cfc706c6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,17 @@ - [plugin] Introduce `DebugSession#workspaceFolder` [#11090](https://github.com/eclipse-theia/theia/pull/11090) - Contributed on behalf of STMicroelectronics +[Breaking Changes:](#breaking_changes_1.26.0) + +- [core] Refactored the core messaging API. Replaced `vscode-ws-jsonrpc` with a custom RPC protocol that is better suited for handling binary data and enables message tunneling. + This impacts all main concepts of the messaging API. The API no longer exposes a `Connection` object and uses a generic `Channel` implementation instead. + - Replaces usage of `vscode-json-rpc`'s `Connection` with the new generic `Channel`. Affects `AbstractConnectionProvider`, `MessagingService`, `IPCConnectionProvider`, `ElectronMessagingService` + - `MessagingService`: No longer offers the `listen` and `forward` method. Use `wsChannel` instead. + - `RemoteFileSystemServer`: Use `UInt8Array` instead of plain number arrays for all arguments and return type that store binary data + - `DebugAdapter`: Replaced the debug-service internal `Channel` implementation with the newly introduced generic `Channel`. + [#11011](https://github.com/eclipse-theia/theia/pull/11011) - Contributed on behalf of STMicroelectronics. + + ## v1.25.0 - 4/28/2022 [1.25.0 Milestone](https://github.com/eclipse-theia/theia/milestone/35) diff --git a/package.json b/package.json index 950701b6136a3..6b294c3fed0c2 100644 --- a/package.json +++ b/package.json @@ -10,6 +10,8 @@ "**/@types/node": "12" }, "devDependencies": { + "@types/chai": "4.3.0", + "@types/chai-spies": "1.0.3", "@types/chai-string": "^1.4.0", "@types/jsdom": "^11.0.4", "@types/node": "12", @@ -20,6 +22,8 @@ "@typescript-eslint/eslint-plugin": "^4.8.1", "@typescript-eslint/eslint-plugin-tslint": "^4.8.1", "@typescript-eslint/parser": "^4.8.1", + "chai": "4.3.4", + "chai-spies": "1.0.0", "chai-string": "^1.4.0", "chalk": "4.0.0", "concurrently": "^3.5.0", diff --git a/packages/core/README.md b/packages/core/README.md index 64537785cf2f5..9ddef5f7b5b30 100644 --- a/packages/core/README.md +++ b/packages/core/README.md @@ -96,7 +96,6 @@ export class SomeClass { - `react-virtualized` (from [`react-virtualized@^9.20.0`](https://www.npmjs.com/package/react-virtualized)) - `vscode-languageserver-protocol` (from [`vscode-languageserver-protocol@~3.15.3`](https://www.npmjs.com/package/vscode-languageserver-protocol)) - `vscode-uri` (from [`vscode-uri@^2.1.1`](https://www.npmjs.com/package/vscode-uri)) - - `vscode-ws-jsonrpc` (from [`vscode-ws-jsonrpc@^0.2.0`](https://www.npmjs.com/package/vscode-ws-jsonrpc)) - `dompurify` (from [`dompurify@^2.2.9`](https://www.npmjs.com/package/dompurify)) - `express` (from [`express@^4.16.3`](https://www.npmjs.com/package/express)) - `lodash.debounce` (from [`lodash.debounce@^4.0.8`](https://www.npmjs.com/package/lodash.debounce)) diff --git a/packages/core/package.json b/packages/core/package.json index 01c2c52c39c8d..df71d5dfcc1ee 100644 --- a/packages/core/package.json +++ b/packages/core/package.json @@ -68,7 +68,6 @@ "uuid": "^8.3.2", "vscode-languageserver-protocol": "~3.15.3", "vscode-uri": "^2.1.1", - "vscode-ws-jsonrpc": "^0.2.0", "ws": "^7.1.2", "yargs": "^15.3.1" }, @@ -108,8 +107,7 @@ "react-dom", "react-virtualized", "vscode-languageserver-protocol", - "vscode-uri", - "vscode-ws-jsonrpc" + "vscode-uri" ], "export =": [ "dompurify as DOMPurify", diff --git a/packages/core/shared/vscode-ws-jsonrpc/index.d.ts b/packages/core/shared/vscode-ws-jsonrpc/index.d.ts deleted file mode 100644 index b11ff897103ed..0000000000000 --- a/packages/core/shared/vscode-ws-jsonrpc/index.d.ts +++ /dev/null @@ -1 +0,0 @@ -export * from 'vscode-ws-jsonrpc'; diff --git a/packages/core/shared/vscode-ws-jsonrpc/index.js b/packages/core/shared/vscode-ws-jsonrpc/index.js deleted file mode 100644 index 3aed560b82d62..0000000000000 --- a/packages/core/shared/vscode-ws-jsonrpc/index.js +++ /dev/null @@ -1 +0,0 @@ -module.exports = require('vscode-ws-jsonrpc'); diff --git a/packages/core/src/browser/messaging/ws-connection-provider.ts b/packages/core/src/browser/messaging/ws-connection-provider.ts index f83aabda22826..d5ec2dce20c4d 100644 --- a/packages/core/src/browser/messaging/ws-connection-provider.ts +++ b/packages/core/src/browser/messaging/ws-connection-provider.ts @@ -15,11 +15,11 @@ // ***************************************************************************** import { injectable, interfaces, decorate, unmanaged } from 'inversify'; -import { JsonRpcProxyFactory, JsonRpcProxy, Emitter, Event } from '../../common'; -import { WebSocketChannel } from '../../common/messaging/web-socket-channel'; +import { JsonRpcProxyFactory, JsonRpcProxy, Emitter, Event, Channel } from '../../common'; import { Endpoint } from '../endpoint'; import { AbstractConnectionProvider } from '../../common/messaging/abstract-connection-provider'; import { io, Socket } from 'socket.io-client'; +import { IWebSocket, WebSocketChannel } from '../../common/messaging/web-socket-channel'; decorate(injectable(), JsonRpcProxyFactory); decorate(unmanaged(), JsonRpcProxyFactory, 0); @@ -35,6 +35,8 @@ export interface WebSocketOptions { export class WebSocketConnectionProvider extends AbstractConnectionProvider { protected readonly onSocketDidOpenEmitter: Emitter = new Emitter(); + // Socket that is used by the main channel + protected socket: Socket; get onSocketDidOpen(): Event { return this.onSocketDidOpenEmitter.event; } @@ -48,31 +50,23 @@ export class WebSocketConnectionProvider extends AbstractConnectionProvider(path, arg); } - protected readonly socket: Socket; - - constructor() { - super(); + protected createMainChannel(): Channel { const url = this.createWebSocketUrl(WebSocketChannel.wsPath); const socket = this.createWebSocket(url); + const channel = new WebSocketChannel(toIWebSocket(socket)); socket.on('connect', () => { this.fireSocketDidOpen(); }); - socket.on('disconnect', reason => { - for (const channel of [...this.channels.values()]) { - channel.close(undefined, reason); - } - this.fireSocketDidClose(); - }); - socket.on('message', data => { - this.handleIncomingRawMessage(data); - }); + channel.onClose(() => this.fireSocketDidClose()); socket.connect(); this.socket = socket; + + return channel; } - override openChannel(path: string, handler: (channel: WebSocketChannel) => void, options?: WebSocketOptions): void { + override async openChannel(path: string, handler: (channel: Channel) => void, options?: WebSocketOptions): Promise { if (this.socket.connected) { - super.openChannel(path, handler, options); + return super.openChannel(path, handler, options); } else { const openChannel = () => { this.socket.off('connect', openChannel); @@ -82,14 +76,6 @@ export class WebSocketConnectionProvider extends AbstractConnectionProvider { - if (this.socket.connected) { - this.socket.send(content); - } - }); - } - /** * @param path The handler to reach in the backend. */ @@ -143,3 +129,20 @@ export class WebSocketConnectionProvider extends AbstractConnectionProvider { + socket.removeAllListeners('disconnect'); + socket.removeAllListeners('error'); + socket.removeAllListeners('message'); + socket.close(); + }, + isConnected: () => socket.connected, + onClose: cb => socket.on('disconnect', reason => cb(reason)), + onError: cb => socket.on('error', reason => cb(reason)), + onMessage: cb => socket.on('message', data => cb(data)), + send: message => socket.emit('message', message) + }; +} + diff --git a/packages/core/src/browser/progress-status-bar-item.ts b/packages/core/src/browser/progress-status-bar-item.ts index 6c882bdb90de9..d63134edb146a 100644 --- a/packages/core/src/browser/progress-status-bar-item.ts +++ b/packages/core/src/browser/progress-status-bar-item.ts @@ -15,7 +15,7 @@ // ***************************************************************************** import { injectable, inject } from 'inversify'; -import { CancellationToken } from 'vscode-ws-jsonrpc'; +import { CancellationToken } from '../../shared/vscode-languageserver-protocol'; import { ProgressClient, ProgressMessage, ProgressUpdate } from '../common'; import { StatusBar, StatusBarAlignment } from './status-bar'; import { Deferred } from '../common/promise-util'; diff --git a/packages/core/src/common/index.ts b/packages/core/src/common/index.ts index 5c944c157087a..e82ecddfa1268 100644 --- a/packages/core/src/common/index.ts +++ b/packages/core/src/common/index.ts @@ -29,6 +29,7 @@ export * from './contribution-provider'; export * from './path'; export * from './logger'; export * from './messaging'; +export * from './message-rpc'; export * from './message-service'; export * from './message-service-protocol'; export * from './progress-service'; diff --git a/packages/core/src/common/message-rpc/channel.spec.ts b/packages/core/src/common/message-rpc/channel.spec.ts new file mode 100644 index 0000000000000..43579ec957c6c --- /dev/null +++ b/packages/core/src/common/message-rpc/channel.spec.ts @@ -0,0 +1,88 @@ +// ***************************************************************************** +// Copyright (C) 2021 Red Hat, Inc. and others. +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License v. 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0. +// +// This Source Code may also be made available under the following Secondary +// Licenses when the conditions for such availability set forth in the Eclipse +// Public License v. 2.0 are satisfied: GNU General Public License, version 2 +// with the GNU Classpath Exception which is available at +// https://www.gnu.org/software/classpath/license.html. +// +// SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0 +// ***************************************************************************** + +import { assert, expect, spy, use } from 'chai'; +import * as spies from 'chai-spies'; +import { Uint8ArrayReadBuffer, Uint8ArrayWriteBuffer } from './uint8-array-message-buffer'; +import { ChannelMultiplexer, ForwardingChannel, MessageProvider } from './channel'; + +use(spies); + +/** + * A pipe with two channels at each end for testing. + */ +export class ChannelPipe { + readonly left: ForwardingChannel = new ForwardingChannel('left', () => this.right.onCloseEmitter.fire({ reason: 'Left channel has been closed' }), () => { + const leftWrite = new Uint8ArrayWriteBuffer(); + leftWrite.onCommit(buffer => { + this.right.onMessageEmitter.fire(() => new Uint8ArrayReadBuffer(buffer)); + }); + return leftWrite; + }); + readonly right: ForwardingChannel = new ForwardingChannel('right', () => this.left.onCloseEmitter.fire({ reason: 'Right channel has been closed' }), () => { + const rightWrite = new Uint8ArrayWriteBuffer(); + rightWrite.onCommit(buffer => { + this.left.onMessageEmitter.fire(() => new Uint8ArrayReadBuffer(buffer)); + }); + return rightWrite; + }); +} +describe('Message Channel', () => { + describe('Channel multiplexer', () => { + it('should forward messages to intended target channel', async () => { + const pipe = new ChannelPipe(); + + const leftMultiplexer = new ChannelMultiplexer(pipe.left); + const rightMultiplexer = new ChannelMultiplexer(pipe.right); + const openChannelSpy = spy(() => { + }); + + rightMultiplexer.onDidOpenChannel(openChannelSpy); + leftMultiplexer.onDidOpenChannel(openChannelSpy); + + const leftFirst = await leftMultiplexer.open('first'); + const leftSecond = await leftMultiplexer.open('second'); + + const rightFirst = rightMultiplexer.getOpenChannel('first'); + const rightSecond = rightMultiplexer.getOpenChannel('second'); + + assert.isNotNull(rightFirst); + assert.isNotNull(rightSecond); + + const leftSecondSpy = spy((buf: MessageProvider) => { + const message = buf().readString(); + expect(message).equal('message for second'); + }); + + leftSecond.onMessage(leftSecondSpy); + + const rightFirstSpy = spy((buf: MessageProvider) => { + const message = buf().readString(); + expect(message).equal('message for first'); + }); + + rightFirst!.onMessage(rightFirstSpy); + + leftFirst.getWriteBuffer().writeString('message for first').commit(); + rightSecond!.getWriteBuffer().writeString('message for second').commit(); + + expect(leftSecondSpy).to.be.called(); + expect(rightFirstSpy).to.be.called(); + + expect(openChannelSpy).to.be.called.exactly(4); + }); + }); +}); diff --git a/packages/core/src/common/message-rpc/channel.ts b/packages/core/src/common/message-rpc/channel.ts new file mode 100644 index 0000000000000..e4279b43d5173 --- /dev/null +++ b/packages/core/src/common/message-rpc/channel.ts @@ -0,0 +1,248 @@ +// ***************************************************************************** +// Copyright (C) 2021 Red Hat, Inc. and others. +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License v. 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0. +// +// This Source Code may also be made available under the following Secondary +// Licenses when the conditions for such availability set forth in the Eclipse +// Public License v. 2.0 are satisfied: GNU General Public License, version 2 +// with the GNU Classpath Exception which is available at +// https://www.gnu.org/software/classpath/license.html. +// +// SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0 +// ***************************************************************************** + +import { Emitter, Event } from '../event'; +import { ReadBuffer, WriteBuffer } from './message-buffer'; + +/** + * A channel is a bidirectional communications channel with lifecycle and + * error signalling. Note that creation of channels is specific to particular + * implementations and thus not part of the protocol. + */ +export interface Channel { + + /** + * The remote side has closed the channel + */ + onClose: Event; + + /** + * An error has occurred while writing to or reading from the channel + */ + onError: Event; + + /** + * A message has arrived and can be read by listeners using a {@link MessageProvider}. + */ + onMessage: Event; + + /** + * Obtain a {@link WriteBuffer} to write a message to the channel. + */ + getWriteBuffer(): WriteBuffer; + + /** + * Close this channel. No {@link onClose} event should be sent + */ + close(): void; +} + +/** + * The event that is emitted when a channel is closed from the remote side. + */ +export interface ChannelCloseEvent { + reason: string, + code?: number +}; + +/** + * The `MessageProvider` is emitted when a channel receives a new message. + * Listeners can invoke the provider to obtain a new {@link ReadBuffer} for the received message. + * This ensures that each listener has its own isolated {@link ReadBuffer} instance. + * + */ +export type MessageProvider = () => ReadBuffer; + +/** + * Helper class to implement the single channels on a {@link ChannelMultiplexer}. Simply forwards write requests to + * the given write buffer source i.e. the main channel of the {@link ChannelMultiplexer}. + */ +export class ForwardingChannel implements Channel { + + constructor(readonly id: string, protected readonly closeHandler: () => void, protected readonly writeBufferSource: () => WriteBuffer) { + } + + onCloseEmitter: Emitter = new Emitter(); + get onClose(): Event { + return this.onCloseEmitter.event; + }; + + onErrorEmitter: Emitter = new Emitter(); + get onError(): Event { + return this.onErrorEmitter.event; + }; + + onMessageEmitter: Emitter = new Emitter(); + get onMessage(): Event { + return this.onMessageEmitter.event; + }; + + getWriteBuffer(): WriteBuffer { + return this.writeBufferSource(); + } + + send(message: Uint8Array): void { + const writeBuffer = this.getWriteBuffer(); + writeBuffer.writeBytes(message); + writeBuffer.commit(); + } + + close(): void { + this.closeHandler(); + } +} + +/** + * The different message types used in the messaging protocol of the {@link ChannelMultiplexer} + */ +export enum MessageTypes { + Open = 1, + Close = 2, + AckOpen = 3, + Data = 4 +} + +/** + * The write buffers in this implementation immediately write to the underlying + * channel, so we rely on writers to the multiplexed channels to always commit their + * messages and always in one go. + */ +export class ChannelMultiplexer { + protected pendingOpen: Map void> = new Map(); + protected openChannels: Map = new Map(); + + protected readonly onOpenChannelEmitter = new Emitter<{ id: string, channel: Channel }>(); + get onDidOpenChannel(): Event<{ id: string, channel: Channel }> { + return this.onOpenChannelEmitter.event; + } + + constructor(protected readonly underlyingChannel: Channel) { + this.underlyingChannel.onMessage(buffer => this.handleMessage(buffer())); + this.underlyingChannel.onClose(event => this.closeUnderlyingChannel(event)); + this.underlyingChannel.onError(error => this.handleError(error)); + } + + protected handleError(error: unknown): void { + this.openChannels.forEach(channel => { + channel.onErrorEmitter.fire(error); + }); + } + + closeUnderlyingChannel(event?: ChannelCloseEvent): void { + + this.pendingOpen.clear(); + this.openChannels.forEach(channel => { + channel.onCloseEmitter.fire(event ?? { reason: 'Multiplexer main channel has been closed from the remote side!' }); + }); + + this.openChannels.clear(); + } + + protected handleMessage(buffer: ReadBuffer): void { + const type = buffer.readUint8(); + const id = buffer.readString(); + switch (type) { + case MessageTypes.AckOpen: { + return this.handleAckOpen(id); + } + case MessageTypes.Open: { + return this.handleOpen(id); + } + case MessageTypes.Close: { + return this.handleClose(id); + } + case MessageTypes.Data: { + return this.handleData(id, buffer.sliceAtReadPosition()); + } + } + } + + protected handleAckOpen(id: string): void { + // edge case: both side try to open a channel at the same time. + const resolve = this.pendingOpen.get(id); + if (resolve) { + const channel = this.createChannel(id); + this.pendingOpen.delete(id); + this.openChannels.set(id, channel); + resolve!(channel); + this.onOpenChannelEmitter.fire({ id, channel }); + } + } + + protected handleOpen(id: string): void { + if (!this.openChannels.has(id)) { + const channel = this.createChannel(id); + this.openChannels.set(id, channel); + const resolve = this.pendingOpen.get(id); + if (resolve) { + // edge case: both side try to open a channel at the same time. + resolve(channel); + } + this.underlyingChannel.getWriteBuffer().writeUint8(MessageTypes.AckOpen).writeString(id).commit(); + this.onOpenChannelEmitter.fire({ id, channel }); + } + } + + protected handleClose(id: string): void { + const channel = this.openChannels.get(id); + if (channel) { + channel.onCloseEmitter.fire({ reason: 'Channel has been closed from the remote side' }); + this.openChannels.delete(id); + } + } + + protected handleData(id: string, data: ReadBuffer): void { + const channel = this.openChannels.get(id); + if (channel) { + channel.onMessageEmitter.fire(() => data); + } + } + + protected createChannel(id: string): ForwardingChannel { + return new ForwardingChannel(id, () => this.closeChannel(id), () => this.prepareWriteBuffer(id)); + } + + // Prepare the write buffer for the channel with the give, id. The channel id has to be encoded + // and written to the buffer before the actual message. + protected prepareWriteBuffer(id: string): WriteBuffer { + const underlying = this.underlyingChannel.getWriteBuffer(); + underlying.writeUint8(MessageTypes.Data); + underlying.writeString(id); + return underlying; + } + + protected closeChannel(id: string): void { + this.underlyingChannel.getWriteBuffer() + .writeUint8(MessageTypes.Close) + .writeString(id) + .commit(); + + this.openChannels.delete(id); + } + + open(id: string): Promise { + const result = new Promise((resolve, reject) => { + this.pendingOpen.set(id, resolve); + }); + this.underlyingChannel.getWriteBuffer().writeUint8(MessageTypes.Open).writeString(id).commit(); + return result; + } + + getOpenChannel(id: string): Channel | undefined { + return this.openChannels.get(id); + } +} + diff --git a/packages/core/src/node/messaging/logger.ts b/packages/core/src/common/message-rpc/index.ts similarity index 64% rename from packages/core/src/node/messaging/logger.ts rename to packages/core/src/common/message-rpc/index.ts index 45229f3f47423..671540a31c42b 100644 --- a/packages/core/src/node/messaging/logger.ts +++ b/packages/core/src/common/message-rpc/index.ts @@ -1,5 +1,5 @@ // ***************************************************************************** -// Copyright (C) 2017 TypeFox and others. +// Copyright (C) 2022 STMicroelectronics and others. // // This program and the accompanying materials are made available under the // terms of the Eclipse Public License v. 2.0 which is available at @@ -13,25 +13,6 @@ // // SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0 // ***************************************************************************** - -import { Logger } from 'vscode-ws-jsonrpc'; - -export class ConsoleLogger implements Logger { - - error(message: string): void { - console.log(message); - } - - warn(message: string): void { - console.log(message); - } - - info(message: string): void { - console.log(message); - } - - log(message: string): void { - console.log(message); - } - -} +export { RequestHandler, RpcProtocol, RpcProtocolOptions } from './rpc-protocol'; +export { Channel, ChannelCloseEvent, MessageProvider } from './channel'; +export { ReadBuffer, WriteBuffer } from './message-buffer'; diff --git a/packages/core/src/common/message-rpc/message-buffer.ts b/packages/core/src/common/message-rpc/message-buffer.ts new file mode 100644 index 0000000000000..b7ea2dd85965a --- /dev/null +++ b/packages/core/src/common/message-rpc/message-buffer.ts @@ -0,0 +1,99 @@ +// ***************************************************************************** +// Copyright (C) 2021 Red Hat, Inc. and others. +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License v. 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0. +// +// This Source Code may also be made available under the following Secondary +// Licenses when the conditions for such availability set forth in the Eclipse +// Public License v. 2.0 are satisfied: GNU General Public License, version 2 +// with the GNU Classpath Exception which is available at +// https://www.gnu.org/software/classpath/license.html. +// +// SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0 +// ***************************************************************************** + +/** + * A buffer maintaining a write position capable of writing primitive values + */ +export interface WriteBuffer { + writeUint8(byte: number): WriteBuffer + writeUint16(value: number): WriteBuffer + writeUint32(value: number): WriteBuffer + writeString(value: string): WriteBuffer + writeBytes(value: Uint8Array): WriteBuffer + writeNumber(value: number): WriteBuffer + writeLength(value: number): WriteBuffer + /** + * Makes any writes to the buffer permanent, for example by sending the writes over a channel. + * You must obtain a new write buffer after committing + */ + commit(): void; +} + +export class ForwardingWriteBuffer implements WriteBuffer { + constructor(protected readonly underlying: WriteBuffer) { + } + + writeUint8(byte: number): WriteBuffer { + this.underlying.writeUint8(byte); + return this; + } + + writeUint16(value: number): WriteBuffer { + this.underlying.writeUint16(value); + return this; + } + + writeUint32(value: number): WriteBuffer { + this.underlying.writeUint32(value); + return this; + } + + writeLength(value: number): WriteBuffer { + this.underlying.writeLength(value); + return this; + } + + writeString(value: string): WriteBuffer { + this.underlying.writeString(value); + return this; + } + + writeBytes(value: Uint8Array): WriteBuffer { + this.underlying.writeBytes(value); + return this; + } + + writeNumber(value: number): WriteBuffer { + this.underlying.writeNumber(value); + return this; + } + + commit(): void { + this.underlying.commit(); + } +} + +/** + * A buffer maintaining a read position in a buffer containing a received message capable of + * reading primitive values. + */ +export interface ReadBuffer { + readUint8(): number; + readUint16(): number; + readUint32(): number; + readString(): string; + readNumber(): number, + readLength(): number, + readBytes(): Uint8Array; + + /** + * Returns a new read buffer whose starting read position is the current read position of this buffer. + * This is useful to create read buffers sub messages. + * (e.g. when using a multiplexer the beginning of the message might contain some protocol overhead which should not be part + * of the message reader that is sent to the target channel) + */ + sliceAtReadPosition(): ReadBuffer +} diff --git a/packages/core/src/common/message-rpc/rpc-message-encoder.spec.ts b/packages/core/src/common/message-rpc/rpc-message-encoder.spec.ts new file mode 100644 index 0000000000000..01a8aa2099ffb --- /dev/null +++ b/packages/core/src/common/message-rpc/rpc-message-encoder.spec.ts @@ -0,0 +1,42 @@ +// ***************************************************************************** +// Copyright (C) 2021 Red Hat, Inc. and others. +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License v. 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0. +// +// This Source Code may also be made available under the following Secondary +// Licenses when the conditions for such availability set forth in the Eclipse +// Public License v. 2.0 are satisfied: GNU General Public License, version 2 +// with the GNU Classpath Exception which is available at +// https://www.gnu.org/software/classpath/license.html. +// +// SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0 +// ***************************************************************************** + +import { expect } from 'chai'; +import { Uint8ArrayReadBuffer, Uint8ArrayWriteBuffer } from './uint8-array-message-buffer'; +import { RpcMessageDecoder, RpcMessageEncoder } from './rpc-message-encoder'; + +describe('PPC Message Codex', () => { + describe('RPC Message Encoder & Decoder', () => { + it('should encode object into binary message and decode the message back into the original object', () => { + const buffer = new Uint8Array(1024); + const writer = new Uint8ArrayWriteBuffer(buffer); + + const encoder = new RpcMessageEncoder(); + const jsonMangled = JSON.parse(JSON.stringify(encoder)); + + encoder.writeTypedValue(writer, encoder); + + const written = writer.getCurrentContents(); + + const reader = new Uint8ArrayReadBuffer(written); + + const decoder = new RpcMessageDecoder(); + const decoded = decoder.readTypedValue(reader); + + expect(decoded).deep.equal(jsonMangled); + }); + }); +}); diff --git a/packages/core/src/common/message-rpc/rpc-message-encoder.ts b/packages/core/src/common/message-rpc/rpc-message-encoder.ts new file mode 100644 index 0000000000000..070c050a2855b --- /dev/null +++ b/packages/core/src/common/message-rpc/rpc-message-encoder.ts @@ -0,0 +1,487 @@ +// ***************************************************************************** +// Copyright (C) 2022 Red Hat, Inc. and others. +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License v. 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0. +// +// This Source Code may also be made available under the following Secondary +// Licenses when the conditions for such availability set forth in the Eclipse +// Public License v. 2.0 are satisfied: GNU General Public License, version 2 +// with the GNU Classpath Exception which is available at +// https://www.gnu.org/software/classpath/license.html. +// +// SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0 +// ***************************************************************************** +/* eslint-disable @typescript-eslint/no-explicit-any */ + +import { ResponseError } from 'vscode-languageserver-protocol'; +import { ReadBuffer, WriteBuffer } from './message-buffer'; + +/** + * This code lets you encode rpc protocol messages (request/reply/notification/error/cancel) + * into a channel write buffer and decode the same messages from a read buffer. + * Custom encoders/decoders can be registered to specially handling certain types of values + * to be encoded. Clients are responsible for ensuring that the set of tags for encoders + * is distinct and the same at both ends of a channel. + */ + +export type RpcMessage = RequestMessage | ReplyMessage | ReplyErrMessage | CancelMessage | NotificationMessage; + +export const enum RpcMessageType { + Request = 1, + Notification = 2, + Reply = 3, + ReplyErr = 4, + Cancel = 5, +} + +export interface CancelMessage { + type: RpcMessageType.Cancel; + id: number; +} + +export interface RequestMessage { + type: RpcMessageType.Request; + id: number; + method: string; + args: any[]; +} + +export interface NotificationMessage { + type: RpcMessageType.Notification; + id: number; + method: string; + args: any[]; +} + +export interface ReplyMessage { + type: RpcMessageType.Reply; + id: number; + res: any; +} + +export interface ReplyErrMessage { + type: RpcMessageType.ReplyErr; + id: number; + err: any; +} + +export interface SerializedError { + readonly $isError: true; + readonly name: string; + readonly message: string; + readonly stack: string; +} + +/** + * The tag values for the default {@link ValueEncoder}s & {@link ValueDecoder}s + */ + +export enum ObjectType { + JSON = 0, + ByteArray = 1, + ObjectArray = 2, + Undefined = 3, + Object = 4, + String = 5, + Boolean = 6, + Number = 7, + // eslint-disable-next-line @typescript-eslint/no-shadow + ResponseError = 8, + Error = 9 + +} + +/** + * A value encoder writes javascript values to a write buffer. Encoders will be asked + * in turn (ordered by their tag value, descending) whether they can encode a given value + * This means encoders with higher tag values have priority. Since the default encoders + * have tag values from 1-7, they can be easily overridden. + */ +export interface ValueEncoder { + /** + * Returns true if this encoder wants to encode this value. + * @param value the value to be encoded + */ + is(value: any): boolean; + /** + * Write the given value to the buffer. Will only be called if {@link is(value)} returns true. + * @param buf The buffer to write to + * @param value The value to be written + * @param recursiveEncode A function that will use the encoders registered on the {@link MessageEncoder} + * to write a value to the underlying buffer. This is used mostly to write structures like an array + * without having to know how to encode the values in the array + */ + write(buf: WriteBuffer, value: any, recursiveEncode?: (buf: WriteBuffer, value: any) => void): void; +} + +/** + * Reads javascript values from a read buffer + */ +export interface ValueDecoder { + /** + * Reads a value from a read buffer. This method will be called for the decoder that is + * registered for the tag associated with the value encoder that encoded this value. + * @param buf The read buffer to read from + * @param recursiveDecode A function that will use the decoders registered on the {@link RpcMessageDecoder} + * to read values from the underlying read buffer. This is used mostly to decode structures like an array + * without having to know how to decode the values in the array. + */ + read(buf: ReadBuffer, recursiveDecode: (buf: ReadBuffer) => unknown): unknown; +} + +/** + * A `RpcMessageDecoder` parses a a binary message received via {@link ReadBuffer} into a {@link RpcMessage} + */ +export class RpcMessageDecoder { + + protected decoders: Map = new Map(); + + constructor() { + this.registerDecoder(ObjectType.JSON, { + read: buf => { + const json = buf.readString(); + return JSON.parse(json); + } + }); + this.registerDecoder(ObjectType.Error, { + read: buf => { + const serializedError: SerializedError = JSON.parse(buf.readString()); + const error = new Error(serializedError.message); + Object.assign(error, serializedError); + return error; + } + }); + + this.registerDecoder(ObjectType.ResponseError, { + read: buf => { + const error = JSON.parse(buf.readString()); + return new ResponseError(error.code, error.message, error.data); + } + }); + this.registerDecoder(ObjectType.ByteArray, { + read: buf => buf.readBytes() + }); + + this.registerDecoder(ObjectType.ObjectArray, { + read: buf => this.readArray(buf) + }); + + this.registerDecoder(ObjectType.Undefined, { + read: () => undefined + }); + + this.registerDecoder(ObjectType.Object, { + read: (buf, recursiveRead) => { + const propertyCount = buf.readLength(); + const result = Object.create({}); + for (let i = 0; i < propertyCount; i++) { + const key = buf.readString(); + const value = recursiveRead(buf); + result[key] = value; + } + return result; + } + }); + + this.registerDecoder(ObjectType.String, { + read: (buf, recursiveRead) => buf.readString() + }); + + this.registerDecoder(ObjectType.Boolean, { + read: buf => buf.readUint8() === 1 + }); + + this.registerDecoder(ObjectType.Number, { + read: buf => buf.readNumber() + }); + + } + + /** + * Registers a new {@link ValueDecoder} for the given tag. + * After the successful registration the {@link tagIntType} is recomputed + * by retrieving the highest tag value and calculating the required Uint size to store it. + * @param tag the tag for which the decoder should be registered. + * @param decoder the decoder that should be registered. + */ + registerDecoder(tag: number, decoder: ValueDecoder): void { + if (this.decoders.has(tag)) { + throw new Error(`Decoder already registered: ${tag}`); + } + this.decoders.set(tag, decoder); + } + parse(buf: ReadBuffer): RpcMessage { + try { + const msgType = buf.readUint8(); + + switch (msgType) { + case RpcMessageType.Request: + return this.parseRequest(buf); + case RpcMessageType.Notification: + return this.parseNotification(buf); + case RpcMessageType.Reply: + return this.parseReply(buf); + case RpcMessageType.ReplyErr: + return this.parseReplyErr(buf); + case RpcMessageType.Cancel: + return this.parseCancel(buf); + } + throw new Error(`Unknown message type: ${msgType}`); + } catch (e) { + // exception does not show problematic content: log it! + console.log('failed to parse message: ' + buf); + throw e; + } + } + + protected parseCancel(msg: ReadBuffer): CancelMessage { + const callId = msg.readUint32(); + return { + type: RpcMessageType.Cancel, + id: callId + }; + } + + protected parseRequest(msg: ReadBuffer): RequestMessage { + const callId = msg.readUint32(); + const method = msg.readString(); + let args = this.readArray(msg); + // convert `null` to `undefined`, since we don't use `null` in internal plugin APIs + args = args.map(arg => arg === null ? undefined : arg); // eslint-disable-line no-null/no-null + + return { + type: RpcMessageType.Request, + id: callId, + method: method, + args: args + }; + } + + protected parseNotification(msg: ReadBuffer): NotificationMessage { + const callId = msg.readUint32(); + const method = msg.readString(); + let args = this.readArray(msg); + // convert `null` to `undefined`, since we don't use `null` in internal plugin APIs + args = args.map(arg => arg === null ? undefined : arg); // eslint-disable-line no-null/no-null + + return { + type: RpcMessageType.Notification, + id: callId, + method: method, + args: args + }; + } + + parseReply(msg: ReadBuffer): ReplyMessage { + const callId = msg.readUint32(); + const value = this.readTypedValue(msg); + return { + type: RpcMessageType.Reply, + id: callId, + res: value + }; + } + + parseReplyErr(msg: ReadBuffer): ReplyErrMessage { + const callId = msg.readUint32(); + + const err = this.readTypedValue(msg); + + return { + type: RpcMessageType.ReplyErr, + id: callId, + err + }; + } + + readArray(buf: ReadBuffer): any[] { + const length = buf.readLength(); + const result = new Array(length); + for (let i = 0; i < length; i++) { + result[i] = this.readTypedValue(buf); + } + return result; + } + + readTypedValue(buf: ReadBuffer): any { + const type = buf.readUint8(); + const decoder = this.decoders.get(type); + if (!decoder) { + throw new Error(`No decoder for tag ${type}`); + } + return decoder.read(buf, innerBuffer => this.readTypedValue(innerBuffer)); + } +} + +/** + * A `RpcMessageEncoder` writes {@link RpcMessage} objects to a {@link WriteBuffer}. Note that it is + * up to clients to commit the message. This allows for multiple messages being + * encoded before sending. + */ +export class RpcMessageEncoder { + + protected readonly encoders: [number, ValueEncoder][] = []; + protected readonly registeredTags: Set = new Set(); + + constructor() { + this.registerEncoders(); + } + + protected registerEncoders(): void { + // encoders will be consulted in reverse order of registration, so the JSON fallback needs to be last + this.registerEncoder(ObjectType.JSON, { + is: () => true, + write: (buf, value) => { + buf.writeString(JSON.stringify(value)); + } + }); + + this.registerEncoder(ObjectType.Object, { + is: value => typeof value === 'object', + write: (buf, object, recursiveEncode) => { + const properties = Object.keys(object); + const relevant = []; + for (const property of properties) { + const value = object[property]; + if (typeof value !== 'function') { + relevant.push([property, value]); + } + } + + buf.writeLength(relevant.length); + for (const [property, value] of relevant) { + buf.writeString(property); + recursiveEncode?.(buf, value); + } + } + }); + + this.registerEncoder(ObjectType.Error, { + is: value => value instanceof Error, + write: (buf, error: Error) => { + const { name, message } = error; + const stack: string = (error).stacktrace || error.stack; + const serializedError = { + $isError: true, + name, + message, + stack + }; + buf.writeString(JSON.stringify(serializedError)); + } + }); + + this.registerEncoder(ObjectType.ResponseError, { + is: value => value instanceof ResponseError, + write: (buf, value) => buf.writeString(JSON.stringify(value)) + }); + + this.registerEncoder(ObjectType.Undefined, { + // eslint-disable-next-line no-null/no-null + is: value => value == null, + write: () => { } + }); + + this.registerEncoder(ObjectType.ObjectArray, { + is: value => Array.isArray(value), + write: (buf, value) => { + this.writeArray(buf, value); + } + }); + + this.registerEncoder(ObjectType.ByteArray, { + is: value => value instanceof Uint8Array, + write: (buf, value) => { + buf.writeBytes(value); + } + }); + + this.registerEncoder(ObjectType.String, { + is: value => typeof value === 'string', + write: (buf, value) => { + buf.writeString(value); + } + }); + + this.registerEncoder(ObjectType.Boolean, { + is: value => typeof value === 'boolean', + write: (buf, value) => { + buf.writeUint8(value === true ? 1 : 0); + } + }); + + this.registerEncoder(ObjectType.Number, { + is: value => typeof value === 'number', + write: (buf, value) => { + buf.writeNumber(value); + } + }); + } + + /** + * Registers a new {@link ValueEncoder} for the given tag. + * After the successful registration the {@link tagIntType} is recomputed + * by retrieving the highest tag value and calculating the required Uint size to store it. + * @param tag the tag for which the encoder should be registered. + * @param decoder the encoder that should be registered. + */ + registerEncoder(tag: number, encoder: ValueEncoder): void { + if (this.registeredTags.has(tag)) { + throw new Error(`Tag already registered: ${tag}`); + } + this.registeredTags.add(tag); + this.encoders.push([tag, encoder]); + } + + cancel(buf: WriteBuffer, requestId: number): void { + buf.writeUint8(RpcMessageType.Cancel); + buf.writeUint32(requestId); + } + + notification(buf: WriteBuffer, requestId: number, method: string, args: any[]): void { + buf.writeUint8(RpcMessageType.Notification); + buf.writeUint32(requestId); + buf.writeString(method); + this.writeArray(buf, args); + } + + request(buf: WriteBuffer, requestId: number, method: string, args: any[]): void { + buf.writeUint8(RpcMessageType.Request); + buf.writeUint32(requestId); + buf.writeString(method); + this.writeArray(buf, args); + } + + replyOK(buf: WriteBuffer, requestId: number, res: any): void { + buf.writeUint8(RpcMessageType.Reply); + buf.writeUint32(requestId); + this.writeTypedValue(buf, res); + } + + replyErr(buf: WriteBuffer, requestId: number, err: any): void { + buf.writeUint8(RpcMessageType.ReplyErr); + buf.writeUint32(requestId); + this.writeTypedValue(buf, err); + } + + writeTypedValue(buf: WriteBuffer, value: any): void { + for (let i: number = this.encoders.length - 1; i >= 0; i--) { + if (this.encoders[i][1].is(value)) { + buf.writeUint8(this.encoders[i][0]); + this.encoders[i][1].write(buf, value, (innerBuffer, innerValue) => { + this.writeTypedValue(innerBuffer, innerValue); + }); + return; + } + } + } + + writeArray(buf: WriteBuffer, value: any[]): void { + buf.writeLength(value.length); + for (let i = 0; i < value.length; i++) { + this.writeTypedValue(buf, value[i]); + } + } +} diff --git a/packages/core/src/common/message-rpc/rpc-protocol.ts b/packages/core/src/common/message-rpc/rpc-protocol.ts new file mode 100644 index 0000000000000..4d145c80b4c7b --- /dev/null +++ b/packages/core/src/common/message-rpc/rpc-protocol.ts @@ -0,0 +1,212 @@ +// ***************************************************************************** +// Copyright (C) 2021 Red Hat, Inc. and others. +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License v. 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0. +// +// This Source Code may also be made available under the following Secondary +// Licenses when the conditions for such availability set forth in the Eclipse +// Public License v. 2.0 are satisfied: GNU General Public License, version 2 +// with the GNU Classpath Exception which is available at +// https://www.gnu.org/software/classpath/license.html. +// +// SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0 +// ***************************************************************************** +/* eslint-disable @typescript-eslint/no-explicit-any */ + +import { CancellationToken, CancellationTokenSource } from '../../../shared/vscode-languageserver-protocol'; +import { Emitter, Event } from '../event'; +import { Deferred } from '../promise-util'; +import { Channel } from './channel'; +import { RpcMessage, RpcMessageDecoder, RpcMessageEncoder, RpcMessageType } from './rpc-message-encoder'; +import { Uint8ArrayWriteBuffer } from './uint8-array-message-buffer'; + +/** + * Handles request messages received by the {@link RpcServer}. + */ +export type RequestHandler = (method: string, args: any[]) => Promise; + +/** + * Initialization options for a {@link RpcProtocol}. + */ +export interface RpcProtocolOptions { + /** + * The message encoder that should be used. If `undefined` the default {@link RpcMessageEncoder} will be used. + */ + encoder?: RpcMessageEncoder, + /** + * The message decoder that should be used. If `undefined` the default {@link RpcMessageDecoder} will be used. + */ + decoder?: RpcMessageDecoder +} +/** + * Establish a bi-directional RPC protocol on top of a given channel. Bi-directional means to send + * sends requests and notifications to the remote side as well as receiving requests and notifications from the remote side. + * Clients can get a promise for a remote request result that will be either resolved or + * rejected depending on the success of the request. Keeps track of outstanding requests and matches replies to the appropriate request + * Currently, there is no timeout handling for long running requests implemented. + */ +export class RpcProtocol { + static readonly CANCELLATION_TOKEN_KEY = 'add.cancellation.token'; + + protected readonly pendingRequests: Map> = new Map(); + + protected nextMessageId: number = 0; + + protected readonly encoder: RpcMessageEncoder; + protected readonly decoder: RpcMessageDecoder; + + protected readonly onNotificationEmitter: Emitter<{ method: string; args: any[]; }> = new Emitter(); + protected readonly cancellationTokenSources = new Map(); + + get onNotification(): Event<{ method: string; args: any[]; }> { + return this.onNotificationEmitter.event; + } + + constructor(public readonly channel: Channel, public readonly requestHandler: RequestHandler, options: RpcProtocolOptions = {}) { + this.encoder = options.encoder ?? new RpcMessageEncoder(); + this.decoder = options.decoder ?? new RpcMessageDecoder(); + const registration = channel.onMessage(readBuffer => this.handleMessage(this.decoder.parse(readBuffer()))); + channel.onClose(() => registration.dispose()); + + } + + handleMessage(message: RpcMessage): void { + switch (message.type) { + case RpcMessageType.Cancel: { + this.handleCancel(message.id); + break; + } + case RpcMessageType.Request: { + this.handleRequest(message.id, message.method, message.args); + break; + } + case RpcMessageType.Notification: { + this.handleNotify(message.id, message.method, message.args); + break; + } + case RpcMessageType.Reply: { + this.handleReply(message.id, message.res); + break; + } + case RpcMessageType.ReplyErr: { + this.handleReplyErr(message.id, message.err); + break; + } + } + } + + protected handleReply(id: number, value: any): void { + const replyHandler = this.pendingRequests.get(id); + if (replyHandler) { + this.pendingRequests.delete(id); + replyHandler.resolve(value); + } else { + console.warn(`reply: no handler for message: ${id}`); + } + } + + protected handleReplyErr(id: number, error: any): void { + try { + const replyHandler = this.pendingRequests.get(id); + if (replyHandler) { + this.pendingRequests.delete(id); + replyHandler.reject(error); + } else { + console.warn(`error: no handler for message: ${id}`); + } + } catch (err) { + throw err; + } + } + + sendRequest(method: string, args: any[]): Promise { + + const id = this.nextMessageId++; + const reply = new Deferred(); + + // The last element of the request args might be a cancellation token. As these tokens are not serializable we have to remove it from the + // args array and the `CANCELLATION_TOKEN_KEY` string instead. + const cancellationToken: CancellationToken | undefined = args.length && CancellationToken.is(args[args.length - 1]) ? args.pop() : undefined; + if (cancellationToken && cancellationToken.isCancellationRequested) { + return Promise.reject(this.cancelError()); + } + + if (cancellationToken) { + args.push(RpcProtocol.CANCELLATION_TOKEN_KEY); + cancellationToken.onCancellationRequested(() => { + this.sendCancel(id); + this.pendingRequests.get(id)?.reject(this.cancelError()); + } + ); + } + this.pendingRequests.set(id, reply); + + const output = this.channel.getWriteBuffer(); + this.encoder.request(output, id, method, args); + output.commit(); + return reply.promise; + } + + sendNotification(method: string, args: any[]): void { + const output = this.channel.getWriteBuffer(); + this.encoder.notification(output, this.nextMessageId++, method, args); + output.commit(); + } + + sendCancel(requestId: number): void { + const output = this.channel.getWriteBuffer(); + this.encoder.cancel(output, requestId); + output.commit(); + } + + cancelError(): Error { + const error = new Error('"Request has already been canceled by the sender"'); + error.name = 'Cancel'; + return error; + } + + protected handleCancel(id: number): void { + const cancellationTokenSource = this.cancellationTokenSources.get(id); + if (cancellationTokenSource) { + this.cancellationTokenSources.delete(id); + cancellationTokenSource.cancel(); + } + } + + protected async handleRequest(id: number, method: string, args: any[]): Promise { + + const output = this.channel.getWriteBuffer(); + + // Check if the last argument of the received args is the key for indicating that a cancellation token should be used + // If so remove the key from the args and create a new cancellation token. + const addToken = args.length && args[args.length - 1] === RpcProtocol.CANCELLATION_TOKEN_KEY ? args.pop() : false; + if (addToken) { + const tokenSource = new CancellationTokenSource(); + this.cancellationTokenSources.set(id, tokenSource); + args.push(tokenSource.token); + } + + try { + const result = await this.requestHandler(method, args); + this.cancellationTokenSources.delete(id); + this.encoder.replyOK(output, id, result); + output.commit(); + } catch (err) { + // In case of an error the output buffer might already contains parts of an message. + // => Dispose the current buffer and retrieve a new, clean one for writing the response error. + if (output instanceof Uint8ArrayWriteBuffer) { + output.dispose(); + } + const errorOutput = this.channel.getWriteBuffer(); + this.cancellationTokenSources.delete(id); + this.encoder.replyErr(errorOutput, id, err); + errorOutput.commit(); + } + } + + protected async handleNotify(id: number, method: string, args: any[]): Promise { + this.onNotificationEmitter.fire({ method, args }); + } +} diff --git a/packages/core/src/common/message-rpc/uint8-array-message-buffer.spec.ts b/packages/core/src/common/message-rpc/uint8-array-message-buffer.spec.ts new file mode 100644 index 0000000000000..59cccbf90a605 --- /dev/null +++ b/packages/core/src/common/message-rpc/uint8-array-message-buffer.spec.ts @@ -0,0 +1,41 @@ +// ***************************************************************************** +// Copyright (C) 2021 Red Hat, Inc. and others. +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License v. 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0. +// +// This Source Code may also be made available under the following Secondary +// Licenses when the conditions for such availability set forth in the Eclipse +// Public License v. 2.0 are satisfied: GNU General Public License, version 2 +// with the GNU Classpath Exception which is available at +// https://www.gnu.org/software/classpath/license.html. +// +// SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0 +// ***************************************************************************** +import { expect } from 'chai'; +import { Uint8ArrayReadBuffer, Uint8ArrayWriteBuffer } from './uint8-array-message-buffer'; + +describe('array message buffer tests', () => { + it('basic read write test', () => { + const buffer = new Uint8Array(1024); + const writer = new Uint8ArrayWriteBuffer(buffer); + + writer.writeUint8(8); + writer.writeUint32(10000); + writer.writeBytes(new Uint8Array([1, 2, 3, 4])); + writer.writeString('this is a string'); + writer.writeString('another string'); + writer.commit(); + + const written = writer.getCurrentContents(); + + const reader = new Uint8ArrayReadBuffer(written); + + expect(reader.readUint8()).equal(8); + expect(reader.readUint32()).equal(10000); + expect(reader.readBytes()).deep.equal(new Uint8Array([1, 2, 3, 4])); + expect(reader.readString()).equal('this is a string'); + expect(reader.readString()).equal('another string'); + }); +}); diff --git a/packages/core/src/common/message-rpc/uint8-array-message-buffer.ts b/packages/core/src/common/message-rpc/uint8-array-message-buffer.ts new file mode 100644 index 0000000000000..691fa2b6860d3 --- /dev/null +++ b/packages/core/src/common/message-rpc/uint8-array-message-buffer.ts @@ -0,0 +1,206 @@ +// ***************************************************************************** +// Copyright (C) 2021 Red Hat, Inc. and others. +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License v. 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0. +// +// This Source Code may also be made available under the following Secondary +// Licenses when the conditions for such availability set forth in the Eclipse +// Public License v. 2.0 are satisfied: GNU General Public License, version 2 +// with the GNU Classpath Exception which is available at +// https://www.gnu.org/software/classpath/license.html. +// +// SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0 +// ***************************************************************************** +import { Disposable } from 'vscode-languageserver-protocol'; +import { Emitter, Event } from '../event'; +import { ReadBuffer, WriteBuffer } from './message-buffer'; + +/** + * The default {@link WriteBuffer} implementation. Uses a {@link Uint8Array} for buffering. + * The {@link Uint8ArrayWriteBuffer.onCommit} hook can be used to rect to on-commit events. + * After the {@link Uint8ArrayWriteBuffer.commit} method has been called the buffer is disposed + * and can no longer be used for writing data. If the writer buffer is no longer needed but the message + * has not been committed yet it has to be disposed manually. + */ +export class Uint8ArrayWriteBuffer implements WriteBuffer, Disposable { + + private encoder = new TextEncoder(); + private msg: DataView; + private isDisposed = false; + private offset: number; + + constructor(private buffer: Uint8Array = new Uint8Array(1024), writePosition: number = 0) { + this.offset = buffer.byteOffset + writePosition; + this.msg = new DataView(buffer.buffer); + } + + ensureCapacity(value: number): WriteBuffer { + let newLength = this.buffer.byteLength; + while (newLength < this.offset + value) { + newLength *= 2; + } + if (newLength !== this.buffer.byteLength) { + const newBuffer = new Uint8Array(newLength); + newBuffer.set(this.buffer); + this.buffer = newBuffer; + this.msg = new DataView(this.buffer.buffer); + } + return this; + } + + writeLength(length: number): WriteBuffer { + if (length < 0 || (length % 1) !== 0) { + throw new Error(`Could not write the given length value. '${length}' is not an integer >= 0`); + } + if (length < 127) { + this.writeUint8(length); + } else { + this.writeUint8(128 + (length & 127)); + this.writeLength(length >> 7); + } + return this; + } + + writeNumber(value: number): WriteBuffer { + this.ensureCapacity(8); + this.msg.setFloat64(this.offset, value); + this.offset += 8; + return this; + } + + writeUint8(value: number): WriteBuffer { + this.ensureCapacity(1); + this.buffer[this.offset++] = value; + return this; + } + + writeUint16(value: number): WriteBuffer { + this.ensureCapacity(2); + this.msg.setUint16(this.offset, value); + this.offset += 2; + return this; + } + + writeUint32(value: number): WriteBuffer { + this.ensureCapacity(4); + this.msg.setUint32(this.offset, value); + this.offset += 4; + return this; + } + + writeString(value: string): WriteBuffer { + this.ensureCapacity(4 * value.length); + const result = this.encoder.encodeInto(value, this.buffer.subarray(this.offset + 4)); + this.msg.setUint32(this.offset, result.written!); + this.offset += 4 + result.written!; + return this; + } + + writeBytes(value: Uint8Array): WriteBuffer { + this.writeLength(value.byteLength); + this.ensureCapacity(value.length); + this.buffer.set(value, this.offset); + this.offset += value.length; + return this; + } + + private onCommitEmitter = new Emitter(); + get onCommit(): Event { + return this.onCommitEmitter.event; + } + + commit(): void { + if (this.isDisposed) { + throw new Error("Could not invoke 'commit'. The WriteBuffer is already disposed."); + } + this.onCommitEmitter.fire(this.getCurrentContents()); + this.dispose(); + } + + getCurrentContents(): Uint8Array { + return this.buffer.slice(this.buffer.byteOffset, this.offset); + } + + dispose(): void { + if (!this.isDisposed) { + this.onCommitEmitter.dispose(); + this.isDisposed = true; + } + } + +} +/** + * The default {@link ReadBuffer} implementation. Uses a {@link Uint8Array} for buffering. + * Is for single message read. A message can only be read once. + */ +export class Uint8ArrayReadBuffer implements ReadBuffer { + private offset: number = 0; + private msg: DataView; + private decoder = new TextDecoder(); + + constructor(private readonly buffer: Uint8Array, readPosition = 0) { + this.offset = buffer.byteOffset + readPosition; + this.msg = new DataView(buffer.buffer); + } + + readUint8(): number { + return this.msg.getUint8(this.offset++); + } + + readUint16(): number { + const result = this.msg.getUint16(this.offset); + this.offset += 2; + return result; + } + + readUint32(): number { + const result = this.msg.getUint32(this.offset); + this.offset += 4; + return result; + } + + readLength(): number { + let shift = 0; + let byte = this.readUint8(); + let value = (byte & 127) << shift; + while (byte > 127) { + shift += 7; + byte = this.readUint8(); + value = value + ((byte & 127) << shift); + } + return value; + } + + readNumber(): number { + const result = this.msg.getFloat64(this.offset); + this.offset += 8; + return result; + } + + readString(): string { + const len = this.readUint32(); + const sliceOffset = this.offset - this.buffer.byteOffset; + const result = this.decodeString(this.buffer.slice(sliceOffset, sliceOffset + len)); + this.offset += len; + return result; + } + + private decodeString(buf: Uint8Array): string { + return this.decoder.decode(buf); + } + + readBytes(): Uint8Array { + const length = this.readLength(); + const sliceOffset = this.offset - this.buffer.byteOffset; + const result = this.buffer.slice(sliceOffset, sliceOffset + length); + this.offset += length; + return result; + } + + sliceAtReadPosition(): ReadBuffer { + const sliceOffset = this.offset - this.buffer.byteOffset; + return new Uint8ArrayReadBuffer(this.buffer, sliceOffset); + } +} diff --git a/packages/core/src/common/messaging/abstract-connection-provider.ts b/packages/core/src/common/messaging/abstract-connection-provider.ts index d4c5c3bd3aaf3..d003fe86fc683 100644 --- a/packages/core/src/common/messaging/abstract-connection-provider.ts +++ b/packages/core/src/common/messaging/abstract-connection-provider.ts @@ -15,11 +15,10 @@ // ***************************************************************************** import { injectable, interfaces } from 'inversify'; -import { ConsoleLogger, createWebSocketConnection, Logger } from 'vscode-ws-jsonrpc'; import { Emitter, Event } from '../event'; import { ConnectionHandler } from './handler'; import { JsonRpcProxy, JsonRpcProxyFactory } from './proxy-factory'; -import { WebSocketChannel } from './web-socket-channel'; +import { Channel, ChannelMultiplexer } from '../message-rpc/channel'; /** * Factor common logic according to `ElectronIpcConnectionProvider` and @@ -45,9 +44,6 @@ export abstract class AbstractConnectionProvider throw new Error('abstract'); } - protected channelIdSeq = 0; - protected readonly channels = new Map(); - protected readonly onIncomingMessageActivityEmitter: Emitter = new Emitter(); get onIncomingMessageActivity(): Event { return this.onIncomingMessageActivityEmitter.event; @@ -75,50 +71,39 @@ export abstract class AbstractConnectionProvider return factory.createProxy(); } + protected channelMultiPlexer: ChannelMultiplexer; + + constructor() { + this.channelMultiPlexer = this.createMultiplexer(); + } + + protected createMultiplexer(): ChannelMultiplexer { + return new ChannelMultiplexer(this.createMainChannel()); + } + /** * Install a connection handler for the given path. */ listen(handler: ConnectionHandler, options?: AbstractOptions): void { this.openChannel(handler.path, channel => { - const connection = createWebSocketConnection(channel, this.createLogger()); - connection.onDispose(() => channel.close()); - handler.onConnection(connection); + handler.onConnection(channel); }, options); } - openChannel(path: string, handler: (channel: WebSocketChannel) => void, options?: AbstractOptions): void { - const id = this.channelIdSeq++; - const channel = this.createChannel(id); - this.channels.set(id, channel); - channel.onClose(() => { - if (this.channels.delete(channel.id)) { - const { reconnecting } = { reconnecting: true, ...options }; - if (reconnecting) { - this.openChannel(path, handler, options); - } - } else { - console.error('The ws channel does not exist', channel.id); + async openChannel(path: string, handler: (channel: Channel) => void, options?: AbstractOptions): Promise { + const newChannel = await this.channelMultiPlexer.open(path); + newChannel.onClose(() => { + const { reconnecting } = { reconnecting: true, ...options }; + if (reconnecting) { + this.openChannel(path, handler, options); } }); - channel.onOpen(() => handler(channel)); - channel.open(path); + handler(newChannel); } - protected abstract createChannel(id: number): WebSocketChannel; - - protected handleIncomingRawMessage(data: string): void { - const message: WebSocketChannel.Message = JSON.parse(data); - const channel = this.channels.get(message.id); - if (channel) { - channel.handleMessage(message); - } else { - console.error('The ws channel does not exist', message.id); - } - this.onIncomingMessageActivityEmitter.fire(undefined); - } - - protected createLogger(): Logger { - return new ConsoleLogger(); - } + /** + * Create the main connection that is used for multiplexing all channels. + */ + protected abstract createMainChannel(): Channel; } diff --git a/packages/core/src/common/messaging/connection-error-handler.ts b/packages/core/src/common/messaging/connection-error-handler.ts index aecfe68901e24..a5306865158fa 100644 --- a/packages/core/src/common/messaging/connection-error-handler.ts +++ b/packages/core/src/common/messaging/connection-error-handler.ts @@ -14,7 +14,7 @@ // SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0 // ***************************************************************************** -import { Message } from 'vscode-ws-jsonrpc'; +import { Message } from '../../../shared/vscode-languageserver-protocol'; import { ILogger } from '../../common'; export interface ResolvedConnectionErrorHandlerOptions { diff --git a/packages/core/src/common/messaging/handler.ts b/packages/core/src/common/messaging/handler.ts index ed03d9d331206..1e790d38aeec3 100644 --- a/packages/core/src/common/messaging/handler.ts +++ b/packages/core/src/common/messaging/handler.ts @@ -14,11 +14,11 @@ // SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0 // ***************************************************************************** -import { MessageConnection } from 'vscode-ws-jsonrpc'; +import { Channel } from '../message-rpc/channel'; export const ConnectionHandler = Symbol('ConnectionHandler'); export interface ConnectionHandler { readonly path: string; - onConnection(connection: MessageConnection): void; + onConnection(connection: Channel): void; } diff --git a/packages/core/src/common/messaging/proxy-factory.spec.ts b/packages/core/src/common/messaging/proxy-factory.spec.ts index 2fd0700a41034..37280e4dbfdaa 100644 --- a/packages/core/src/common/messaging/proxy-factory.spec.ts +++ b/packages/core/src/common/messaging/proxy-factory.spec.ts @@ -15,21 +15,11 @@ // ***************************************************************************** import * as chai from 'chai'; -import { ConsoleLogger } from '../../node/messaging/logger'; import { JsonRpcProxyFactory, JsonRpcProxy } from './proxy-factory'; -import { createMessageConnection } from 'vscode-jsonrpc/lib/main'; -import * as stream from 'stream'; +import { ChannelPipe } from '../message-rpc/channel.spec'; const expect = chai.expect; -class NoTransform extends stream.Transform { - - // eslint-disable-next-line @typescript-eslint/no-explicit-any - override _transform(chunk: any, encoding: string, callback: Function): void { - callback(undefined, chunk); - } -} - class TestServer { requests: string[] = []; doStuff(arg: string): Promise { @@ -102,15 +92,12 @@ function getSetup(): { const server = new TestServer(); const serverProxyFactory = new JsonRpcProxyFactory(client); - const client2server = new NoTransform(); - const server2client = new NoTransform(); - const serverConnection = createMessageConnection(server2client, client2server, new ConsoleLogger()); - serverProxyFactory.listen(serverConnection); + const pipe = new ChannelPipe(); + serverProxyFactory.listen(pipe.right); const serverProxy = serverProxyFactory.createProxy(); const clientProxyFactory = new JsonRpcProxyFactory(server); - const clientConnection = createMessageConnection(client2server, server2client, new ConsoleLogger()); - clientProxyFactory.listen(clientConnection); + clientProxyFactory.listen(pipe.left); const clientProxy = clientProxyFactory.createProxy(); return { client, diff --git a/packages/core/src/common/messaging/proxy-factory.ts b/packages/core/src/common/messaging/proxy-factory.ts index f8869449eae94..5b35615b1f2a8 100644 --- a/packages/core/src/common/messaging/proxy-factory.ts +++ b/packages/core/src/common/messaging/proxy-factory.ts @@ -16,10 +16,12 @@ /* eslint-disable @typescript-eslint/no-explicit-any */ -import { MessageConnection, ResponseError } from 'vscode-ws-jsonrpc'; +import { ResponseError } from '../../../shared/vscode-languageserver-protocol'; import { ApplicationError } from '../application-error'; -import { Event, Emitter } from '../event'; import { Disposable } from '../disposable'; +import { Emitter, Event } from '../event'; +import { Channel } from '../message-rpc/channel'; +import { RequestHandler, RpcProtocol } from '../message-rpc/rpc-protocol'; import { ConnectionHandler } from './handler'; export type JsonRpcServer = Disposable & { @@ -45,13 +47,19 @@ export class JsonRpcConnectionHandler implements ConnectionHan readonly factoryConstructor: new () => JsonRpcProxyFactory = JsonRpcProxyFactory ) { } - onConnection(connection: MessageConnection): void { + onConnection(connection: Channel): void { const factory = new this.factoryConstructor(); const proxy = factory.createProxy(); factory.target = this.targetFactory(proxy); factory.listen(connection); } } +/** + * Factory for creating a new {@link RpcConnection} for a given chanel and {@link RequestHandler}. + */ +export type RpcConnectionFactory = (channel: Channel, requestHandler: RequestHandler) => RpcProtocol; + +const defaultRPCConnectionFactory: RpcConnectionFactory = (channel, requestHandler) => new RpcProtocol(channel, requestHandler); /** * Factory for JSON-RPC proxy objects. @@ -95,13 +103,14 @@ export class JsonRpcConnectionHandler implements ConnectionHan * * @param - The type of the object to expose to JSON-RPC. */ + export class JsonRpcProxyFactory implements ProxyHandler { protected readonly onDidOpenConnectionEmitter = new Emitter(); protected readonly onDidCloseConnectionEmitter = new Emitter(); - protected connectionPromiseResolve: (connection: MessageConnection) => void; - protected connectionPromise: Promise; + protected connectionPromiseResolve: (connection: RpcProtocol) => void; + protected connectionPromise: Promise; /** * Build a new JsonRpcProxyFactory. @@ -109,7 +118,7 @@ export class JsonRpcProxyFactory implements ProxyHandler { * @param target - The object to expose to JSON-RPC methods calls. If this * is omitted, the proxy won't be able to handle requests, only send them. */ - constructor(public target?: any) { + constructor(public target?: any, protected rpcConnectionFactory = defaultRPCConnectionFactory) { this.waitForConnection(); } @@ -118,7 +127,7 @@ export class JsonRpcProxyFactory implements ProxyHandler { this.connectionPromiseResolve = resolve ); this.connectionPromise.then(connection => { - connection.onClose(() => + connection.channel.onClose(() => this.onDidCloseConnectionEmitter.fire(undefined) ); this.onDidOpenConnectionEmitter.fire(undefined); @@ -131,11 +140,10 @@ export class JsonRpcProxyFactory implements ProxyHandler { * This connection will be used to send/receive JSON-RPC requests and * response. */ - listen(connection: MessageConnection): void { - connection.onRequest((prop, ...args) => this.onRequest(prop, ...args)); - connection.onNotification((prop, ...args) => this.onNotification(prop, ...args)); - connection.onDispose(() => this.waitForConnection()); - connection.listen(); + listen(channel: Channel): void { + const connection = this.rpcConnectionFactory(channel, (meth, args) => this.onRequest(meth, ...args)); + connection.onNotification(event => this.onNotification(event.method, ...event.args)); + this.connectionPromiseResolve(connection); } @@ -239,10 +247,10 @@ export class JsonRpcProxyFactory implements ProxyHandler { new Promise((resolve, reject) => { try { if (isNotify) { - connection.sendNotification(method, ...args); + connection.sendNotification(method, args); resolve(undefined); } else { - const resultPromise = connection.sendRequest(method, ...args) as Promise; + const resultPromise = connection.sendRequest(method, args) as Promise; resultPromise .catch((err: any) => reject(this.deserializeError(capturedError, err))) .then((result: any) => resolve(result)); @@ -293,3 +301,4 @@ export class JsonRpcProxyFactory implements ProxyHandler { } } + diff --git a/packages/core/src/common/messaging/web-socket-channel.ts b/packages/core/src/common/messaging/web-socket-channel.ts index 28dff9400068a..f43f33e6f7644 100644 --- a/packages/core/src/common/messaging/web-socket-channel.ts +++ b/packages/core/src/common/messaging/web-socket-channel.ts @@ -16,157 +16,98 @@ /* eslint-disable @typescript-eslint/no-explicit-any */ -import { IWebSocket } from 'vscode-ws-jsonrpc/lib/socket/socket'; -import { Disposable, DisposableCollection } from '../disposable'; -import { Emitter } from '../event'; - -export class WebSocketChannel implements IWebSocket { - +import { Emitter, Event } from '../event'; +import { WriteBuffer } from '../message-rpc'; +import { Uint8ArrayReadBuffer, Uint8ArrayWriteBuffer } from '../message-rpc/uint8-array-message-buffer'; +import { Channel, MessageProvider, ChannelCloseEvent } from '../message-rpc/channel'; + +/** + * A channel that manages the main websocket connection between frontend and backend. All service channels + * are reusing this main channel. (multiplexing). An {@link IWebSocket} abstraction is used to keep the implementation + * independent of the actual websocket implementation and its execution context (backend vs. frontend). + */ +export class WebSocketChannel implements Channel { static wsPath = '/services'; - protected readonly closeEmitter = new Emitter<[number, string]>(); - protected readonly toDispose = new DisposableCollection(this.closeEmitter); - - constructor( - readonly id: number, - protected readonly doSend: (content: string) => void - ) { } - - dispose(): void { - this.toDispose.dispose(); - } - - protected checkNotDisposed(): void { - if (this.toDispose.disposed) { - throw new Error('The channel has been disposed.'); - } - } - - handleMessage(message: WebSocketChannel.Message): void { - if (message.kind === 'ready') { - this.fireOpen(); - } else if (message.kind === 'data') { - this.fireMessage(message.content); - } else if (message.kind === 'close') { - this.fireClose(message.code, message.reason); - } - } - - open(path: string): void { - this.checkNotDisposed(); - this.doSend(JSON.stringify({ - kind: 'open', - id: this.id, - path - })); - } - - ready(): void { - this.checkNotDisposed(); - this.doSend(JSON.stringify({ - kind: 'ready', - id: this.id - })); + protected readonly onCloseEmitter: Emitter = new Emitter(); + get onClose(): Event { + return this.onCloseEmitter.event; } - send(content: string): void { - this.checkNotDisposed(); - this.doSend(JSON.stringify({ - kind: 'data', - id: this.id, - content - })); + protected readonly onMessageEmitter: Emitter = new Emitter(); + get onMessage(): Event { + return this.onMessageEmitter.event; } - close(code: number = 1000, reason: string = ''): void { - if (this.closing) { - // Do not try to close the channel if it is already closing. - return; - } - this.checkNotDisposed(); - this.doSend(JSON.stringify({ - kind: 'close', - id: this.id, - code, - reason - })); - this.fireClose(code, reason); + protected readonly onErrorEmitter: Emitter = new Emitter(); + get onError(): Event { + return this.onErrorEmitter.event; } - tryClose(code: number = 1000, reason: string = ''): void { - if (this.closing || this.toDispose.disposed) { - // Do not try to close the channel if it is already closing or disposed. - return; - } - this.doSend(JSON.stringify({ - kind: 'close', - id: this.id, - code, - reason + constructor(protected readonly socket: IWebSocket) { + socket.onClose((reason, code) => this.onCloseEmitter.fire({ reason, code })); + socket.onError(error => this.onErrorEmitter.fire(error)); + // eslint-disable-next-line arrow-body-style + socket.onMessage(data => this.onMessageEmitter.fire(() => { + // In the browser context socketIO receives binary messages as ArrayBuffers. + // So we have to convert them to a Uint8Array before delegating the message to the read buffer. + const buffer = data instanceof ArrayBuffer ? new Uint8Array(data) : data; + return new Uint8ArrayReadBuffer(buffer); })); - this.fireClose(code, reason); } - protected fireOpen: () => void = () => { }; - onOpen(cb: () => void): void { - this.checkNotDisposed(); - this.fireOpen = cb; - this.toDispose.push(Disposable.create(() => this.fireOpen = () => { })); - } + getWriteBuffer(): WriteBuffer { + const result = new Uint8ArrayWriteBuffer(); - protected fireMessage: (data: any) => void = () => { }; - onMessage(cb: (data: any) => void): void { - this.checkNotDisposed(); - this.fireMessage = cb; - this.toDispose.push(Disposable.create(() => this.fireMessage = () => { })); - } + result.onCommit(buffer => { + if (this.socket.isConnected()) { + this.socket.send(buffer); + } + }); - fireError: (reason: any) => void = () => { }; - onError(cb: (reason: any) => void): void { - this.checkNotDisposed(); - this.fireError = cb; - this.toDispose.push(Disposable.create(() => this.fireError = () => { })); + return result; } - protected closing = false; - protected fireClose(code: number, reason: string): void { - if (this.closing) { - return; - } - this.closing = true; - try { - this.closeEmitter.fire([code, reason]); - } finally { - this.closing = false; - } - this.dispose(); - } - onClose(cb: (code: number, reason: string) => void): Disposable { - this.checkNotDisposed(); - return this.closeEmitter.event(([code, reason]) => cb(code, reason)); + close(): void { + this.socket.close(); + this.onCloseEmitter.dispose(); + this.onMessageEmitter.dispose(); + this.onErrorEmitter.dispose(); } - } -export namespace WebSocketChannel { - export interface OpenMessage { - kind: 'open' - id: number - path: string - } - export interface ReadyMessage { - kind: 'ready' - id: number - } - export interface DataMessage { - kind: 'data' - id: number - content: string - } - export interface CloseMessage { - kind: 'close' - id: number - code: number - reason: string - } - export type Message = OpenMessage | ReadyMessage | DataMessage | CloseMessage; + +/** + * An abstraction that enables reuse of the `{@link WebSocketChannel} class in the frontend and backend + * independent of the actual underlying socket implementation. + */ +export interface IWebSocket { + /** + * Sends the given message over the web socket in binary format. + * @param message The binary message. + */ + send(message: Uint8Array): void; + /** + * Closes the websocket from the local side. + */ + close(): void; + /** + * The connection state of the web socket. + */ + isConnected(): boolean; + /** + * Listener callback to handle incoming messages. + * @param cb The callback. + */ + onMessage(cb: (message: Uint8Array) => void): void; + /** + * Listener callback to handle socket errors. + * @param cb The callback. + */ + onError(cb: (reason: any) => void): void; + /** + * Listener callback to handle close events (Remote side). + * @param cb The callback. + */ + onClose(cb: (reason: string, code?: number) => void): void; } + diff --git a/packages/core/src/electron-browser/messaging/electron-ipc-connection-provider.ts b/packages/core/src/electron-browser/messaging/electron-ipc-connection-provider.ts index b3d8ce8ab9415..6aa06861d2e93 100644 --- a/packages/core/src/electron-browser/messaging/electron-ipc-connection-provider.ts +++ b/packages/core/src/electron-browser/messaging/electron-ipc-connection-provider.ts @@ -17,9 +17,11 @@ import { Event as ElectronEvent, ipcRenderer } from '@theia/electron/shared/electron'; import { injectable, interfaces } from 'inversify'; import { JsonRpcProxy } from '../../common/messaging'; -import { WebSocketChannel } from '../../common/messaging/web-socket-channel'; import { AbstractConnectionProvider } from '../../common/messaging/abstract-connection-provider'; import { THEIA_ELECTRON_IPC_CHANNEL_NAME } from '../../electron-common/messaging/electron-connection-handler'; +import { Emitter, Event } from '../../common'; +import { Uint8ArrayReadBuffer, Uint8ArrayWriteBuffer } from '../../common/message-rpc/uint8-array-message-buffer'; +import { Channel, MessageProvider } from '../../common/message-rpc/channel'; export interface ElectronIpcOptions { } @@ -34,17 +36,25 @@ export class ElectronIpcConnectionProvider extends AbstractConnectionProvider(path, arg); } - constructor() { - super(); - ipcRenderer.on(THEIA_ELECTRON_IPC_CHANNEL_NAME, (event: ElectronEvent, data: string) => { - this.handleIncomingRawMessage(data); - }); - } - - protected createChannel(id: number): WebSocketChannel { - return new WebSocketChannel(id, content => { - ipcRenderer.send(THEIA_ELECTRON_IPC_CHANNEL_NAME, content); + protected createMainChannel(): Channel { + const onMessageEmitter = new Emitter(); + ipcRenderer.on(THEIA_ELECTRON_IPC_CHANNEL_NAME, (_event: ElectronEvent, data: Uint8Array) => { + onMessageEmitter.fire(() => new Uint8ArrayReadBuffer(data)); }); + return { + close: () => Event.None, + getWriteBuffer: () => { + const writer = new Uint8ArrayWriteBuffer(); + writer.onCommit(buffer => + // The ipcRenderer cannot handle ArrayBuffers directly=> we have to convert to Uint8Array. + ipcRenderer.send(THEIA_ELECTRON_IPC_CHANNEL_NAME, buffer) + ); + return writer; + }, + onClose: Event.None, + onError: Event.None, + onMessage: onMessageEmitter.event + }; } } diff --git a/packages/core/src/electron-browser/messaging/electron-ws-connection-provider.ts b/packages/core/src/electron-browser/messaging/electron-ws-connection-provider.ts index 6f75ea31d0dae..ac99ee85bbab8 100644 --- a/packages/core/src/electron-browser/messaging/electron-ws-connection-provider.ts +++ b/packages/core/src/electron-browser/messaging/electron-ws-connection-provider.ts @@ -15,9 +15,9 @@ // ***************************************************************************** import { injectable } from 'inversify'; -import { WebSocketChannel } from '../../common/messaging/web-socket-channel'; import { WebSocketConnectionProvider, WebSocketOptions } from '../../browser/messaging/ws-connection-provider'; import { FrontendApplicationContribution } from '../../browser/frontend-application'; +import { Channel } from '../../common'; /** * Customized connection provider between the frontend and the backend in electron environment. @@ -34,16 +34,13 @@ export class ElectronWebSocketConnectionProvider extends WebSocketConnectionProv onStop(): void { this.stopping = true; - // Close the websocket connection `onStop`. Otherwise, the channels will be closed with 30 sec (`MessagingContribution#checkAliveTimeout`) delay. + // Manually close the websocket connections `onStop`. Otherwise, the channels will be closed with 30 sec (`MessagingContribution#checkAliveTimeout`) delay. // https://github.com/eclipse-theia/theia/issues/6499 - for (const channel of [...this.channels.values()]) { - // `1001` indicates that an endpoint is "going away", such as a server going down or a browser having navigated away from a page. - // But we cannot use `1001`: https://github.com/TypeFox/vscode-ws-jsonrpc/issues/15 - channel.close(1000, 'The frontend is "going away"...'); - } + // `1001` indicates that an endpoint is "going away", such as a server going down or a browser having navigated away from a page. + this.channelMultiPlexer.closeUnderlyingChannel({ reason: 'The frontend is "going away"', code: 1001 }); } - override openChannel(path: string, handler: (channel: WebSocketChannel) => void, options?: WebSocketOptions): void { + override async openChannel(path: string, handler: (channel: Channel) => void, options?: WebSocketOptions): Promise { if (!this.stopping) { super.openChannel(path, handler, options); } diff --git a/packages/core/src/electron-main/messaging/electron-messaging-contribution.ts b/packages/core/src/electron-main/messaging/electron-messaging-contribution.ts index 071796c5cf0ca..dfd8f6115c8ce 100644 --- a/packages/core/src/electron-main/messaging/electron-messaging-contribution.ts +++ b/packages/core/src/electron-main/messaging/electron-messaging-contribution.ts @@ -16,24 +16,23 @@ import { IpcMainEvent, ipcMain, WebContents } from '@theia/electron/shared/electron'; import { inject, injectable, named, postConstruct } from 'inversify'; -import { MessageConnection } from 'vscode-ws-jsonrpc'; -import { createWebSocketConnection } from 'vscode-ws-jsonrpc/lib/socket/connection'; import { ContributionProvider } from '../../common/contribution-provider'; -import { WebSocketChannel } from '../../common/messaging/web-socket-channel'; import { MessagingContribution } from '../../node/messaging/messaging-contribution'; -import { ConsoleLogger } from '../../node/messaging/logger'; import { ElectronConnectionHandler, THEIA_ELECTRON_IPC_CHANNEL_NAME } from '../../electron-common/messaging/electron-connection-handler'; import { ElectronMainApplicationContribution } from '../electron-main-application'; import { ElectronMessagingService } from './electron-messaging-service'; +import { Channel, ChannelCloseEvent, ChannelMultiplexer, MessageProvider } from '../../common/message-rpc/channel'; +import { Emitter, Event, WriteBuffer } from '../../common'; +import { Uint8ArrayReadBuffer, Uint8ArrayWriteBuffer } from '../../common/message-rpc/uint8-array-message-buffer'; /** * This component replicates the role filled by `MessagingContribution` but for Electron. * Unlike the WebSocket based implementation, we do not expect to receive * connection events. Instead, we'll create channels based on incoming `open` * events on the `ipcMain` channel. - * * This component allows communication between renderer process (frontend) and electron main process. */ + @injectable() export class ElectronMessagingContribution implements ElectronMainApplicationContribution, ElectronMessagingService { @@ -43,89 +42,112 @@ export class ElectronMessagingContribution implements ElectronMainApplicationCon @inject(ContributionProvider) @named(ElectronConnectionHandler) protected readonly connectionHandlers: ContributionProvider; - protected readonly channelHandlers = new MessagingContribution.ConnectionHandlers(); - protected readonly windowChannels = new Map>(); + protected readonly channelHandlers = new MessagingContribution.ConnectionHandlers(); + /** + * Each electron window has a main chanel and its own multiplexer to route multiple client messages the same IPC connection. + */ + protected readonly windowChannelMultiplexer = new Map(); @postConstruct() protected init(): void { - ipcMain.on(THEIA_ELECTRON_IPC_CHANNEL_NAME, (event: IpcMainEvent, data: string) => { - this.handleIpcMessage(event, data); + ipcMain.on(THEIA_ELECTRON_IPC_CHANNEL_NAME, (event: IpcMainEvent, data: Uint8Array) => { + this.handleIpcEvent(event, data); }); } + protected handleIpcEvent(event: IpcMainEvent, data: Uint8Array): void { + const sender = event.sender; + // Get the multiplexer for a given window id + try { + const windowChannelData = this.windowChannelMultiplexer.get(sender.id) ?? this.createWindowChannelData(sender); + windowChannelData!.channel.onMessageEmitter.fire(() => new Uint8ArrayReadBuffer(data)); + } catch (error) { + console.error('IPC: Failed to handle message', { error, data }); + } + } + + // Creates a new multiplexer for a given sender/window + protected createWindowChannelData(sender: Electron.WebContents): { channel: ElectronWebContentChannel, multiPlexer: ChannelMultiplexer } { + const mainChannel = this.createWindowMainChannel(sender); + const multiPlexer = new ChannelMultiplexer(mainChannel); + multiPlexer.onDidOpenChannel(openEvent => { + const { channel, id } = openEvent; + if (this.channelHandlers.route(id, channel)) { + console.debug(`Opening channel for service path '${id}'.`); + channel.onClose(() => console.debug(`Closing channel on service path '${id}'.`)); + } + }); + + sender.once('did-navigate', () => multiPlexer.closeUnderlyingChannel({ reason: 'Window was refreshed' })); // When refreshing the browser window. + sender.once('destroyed', () => multiPlexer.closeUnderlyingChannel({ reason: 'Window was closed' })); // When closing the browser window. + const data = { channel: mainChannel, multiPlexer }; + this.windowChannelMultiplexer.set(sender.id, data); + return data; + } + + /** + * Creates the main channel to a window. + * @param sender The window that the channel should be established to. + */ + protected createWindowMainChannel(sender: WebContents): ElectronWebContentChannel { + return new ElectronWebContentChannel(sender); + } + onStart(): void { for (const contribution of this.messagingContributions.getContributions()) { contribution.configure(this); } for (const connectionHandler of this.connectionHandlers.getContributions()) { this.channelHandlers.push(connectionHandler.path, (params, channel) => { - const connection = createWebSocketConnection(channel, new ConsoleLogger()); - connectionHandler.onConnection(connection); + connectionHandler.onConnection(channel); }); } } - listen(spec: string, callback: (params: ElectronMessagingService.PathParams, connection: MessageConnection) => void): void { - this.ipcChannel(spec, (params, channel) => { - const connection = createWebSocketConnection(channel, new ConsoleLogger()); - callback(params, connection); - }); - } - // eslint-disable-next-line @typescript-eslint/no-explicit-any - ipcChannel(spec: string, callback: (params: any, channel: WebSocketChannel) => void): void { + ipcChannel(spec: string, callback: (params: any, channel: Channel) => void): void { this.channelHandlers.push(spec, callback); } +} + +/** + * Used to establish a connection between the ipcMain and the Electron frontend (window). + * Messages a transferred via electron IPC. + */ +export class ElectronWebContentChannel implements Channel { + protected readonly onCloseEmitter: Emitter = new Emitter(); + get onClose(): Event { + return this.onCloseEmitter.event; + } - protected handleIpcMessage(event: IpcMainEvent, data: string): void { - const sender = event.sender; - try { - // Get the channel map for a given window id - let channels = this.windowChannels.get(sender.id)!; - if (!channels) { - this.windowChannels.set(sender.id, channels = new Map()); - } - // Start parsing the message to extract the channel id and route - const message: WebSocketChannel.Message = JSON.parse(data.toString()); - // Someone wants to open a logical channel - if (message.kind === 'open') { - const { id, path } = message; - const channel = this.createChannel(id, sender); - if (this.channelHandlers.route(path, channel)) { - channel.ready(); - channels.set(id, channel); - channel.onClose(() => channels.delete(id)); - } else { - console.error('Cannot find a service for the path: ' + path); - } - } else { - const { id } = message; - const channel = channels.get(id); - if (channel) { - channel.handleMessage(message); - } else { - console.error('The ipc channel does not exist', id); - } - } - const close = () => { - for (const channel of Array.from(channels.values())) { - channel.close(undefined, 'webContent destroyed'); - } - channels.clear(); - }; - sender.once('did-navigate', close); // When refreshing the browser window. - sender.once('destroyed', close); // When closing the browser window. - } catch (error) { - console.error('IPC: Failed to handle message', { error, data }); - } + // Make the message emitter public so that we can easily forward messages received from the ipcMain. + readonly onMessageEmitter: Emitter = new Emitter(); + get onMessage(): Event { + return this.onMessageEmitter.event; + } + + protected readonly onErrorEmitter: Emitter = new Emitter(); + get onError(): Event { + return this.onErrorEmitter.event; + } + + constructor(protected readonly sender: Electron.WebContents) { } - protected createChannel(id: number, sender: WebContents): WebSocketChannel { - return new WebSocketChannel(id, content => { - if (!sender.isDestroyed()) { - sender.send(THEIA_ELECTRON_IPC_CHANNEL_NAME, content); + getWriteBuffer(): WriteBuffer { + const writer = new Uint8ArrayWriteBuffer(); + + writer.onCommit(buffer => { + if (!this.sender.isDestroyed()) { + this.sender.send(THEIA_ELECTRON_IPC_CHANNEL_NAME, buffer); } }); - } + return writer; + } + close(): void { + this.onCloseEmitter.dispose(); + this.onMessageEmitter.dispose(); + this.onErrorEmitter.dispose(); + } } diff --git a/packages/core/src/electron-main/messaging/electron-messaging-service.ts b/packages/core/src/electron-main/messaging/electron-messaging-service.ts index dde3fdde1d181..874d51237b4fd 100644 --- a/packages/core/src/electron-main/messaging/electron-messaging-service.ts +++ b/packages/core/src/electron-main/messaging/electron-messaging-service.ts @@ -14,20 +14,14 @@ // SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0 // ***************************************************************************** -import type { MessageConnection } from 'vscode-jsonrpc'; -import type { WebSocketChannel } from '../../common/messaging/web-socket-channel'; +import { Channel } from '../../common/message-rpc/channel'; export interface ElectronMessagingService { - /** - * Accept a JSON-RPC connection on the given path. - * A path supports the route syntax: https://github.com/rcs/route-parser#what-can-i-use-in-my-routes. - */ - listen(path: string, callback: (params: ElectronMessagingService.PathParams, connection: MessageConnection) => void): void; /** * Accept an ipc channel on the given path. * A path supports the route syntax: https://github.com/rcs/route-parser#what-can-i-use-in-my-routes. */ - ipcChannel(path: string, callback: (params: ElectronMessagingService.PathParams, socket: WebSocketChannel) => void): void; + ipcChannel(path: string, callback: (params: ElectronMessagingService.PathParams, socket: Channel) => void): void; } export namespace ElectronMessagingService { export interface PathParams { diff --git a/packages/core/src/node/messaging/ipc-bootstrap.ts b/packages/core/src/node/messaging/ipc-bootstrap.ts index 0bac13bb163b8..cfc24f9d58f19 100644 --- a/packages/core/src/node/messaging/ipc-bootstrap.ts +++ b/packages/core/src/node/messaging/ipc-bootstrap.ts @@ -16,20 +16,44 @@ import 'reflect-metadata'; import { dynamicRequire } from '../dynamic-require'; -import { ConsoleLogger } from 'vscode-ws-jsonrpc/lib/logger'; -import { createMessageConnection, IPCMessageReader, IPCMessageWriter, Trace } from 'vscode-ws-jsonrpc'; import { checkParentAlive, IPCEntryPoint } from './ipc-protocol'; +import { Socket } from 'net'; +import { Channel, ChannelCloseEvent, Emitter, MessageProvider } from '../../common'; +import { Uint8ArrayReadBuffer, Uint8ArrayWriteBuffer } from '../../common/message-rpc/uint8-array-message-buffer'; checkParentAlive(); const entryPoint = IPCEntryPoint.getScriptFromEnv(); -const reader = new IPCMessageReader(process); -const writer = new IPCMessageWriter(process); -const logger = new ConsoleLogger(); -const connection = createMessageConnection(reader, writer, logger); -connection.trace(Trace.Off, { - // eslint-disable-next-line @typescript-eslint/no-explicit-any - log: (message: any, data?: string) => console.log(message, data) -}); - -dynamicRequire<{ default: IPCEntryPoint }>(entryPoint).default(connection); + +dynamicRequire<{ default: IPCEntryPoint }>(entryPoint).default(createChannel()); + +function createChannel(): Channel { + const pipe = new Socket({ + fd: 4 + }); + + const onCloseEmitter = new Emitter(); + const onMessageEmitter = new Emitter(); + const onErrorEmitter = new Emitter(); + const eventEmitter: NodeJS.EventEmitter = process; + eventEmitter.on('error', error => onErrorEmitter.fire(error)); + eventEmitter.on('close', () => onCloseEmitter.fire({ reason: 'Process has been closed from remote site (parent)' })); + pipe.on('data', (data: Uint8Array) => { + onMessageEmitter.fire(() => new Uint8ArrayReadBuffer(data)); + }); + + return { + close: () => process.exit(), + onClose: onCloseEmitter.event, + onError: onErrorEmitter.event, + onMessage: onMessageEmitter.event, + getWriteBuffer: () => { + const result = new Uint8ArrayWriteBuffer(); + result.onCommit(buffer => { + pipe.write(buffer); + }); + + return result; + } + }; +} diff --git a/packages/core/src/node/messaging/ipc-connection-provider.ts b/packages/core/src/node/messaging/ipc-connection-provider.ts index 84c256997257a..d5b96de7a15e6 100644 --- a/packages/core/src/node/messaging/ipc-connection-provider.ts +++ b/packages/core/src/node/messaging/ipc-connection-provider.ts @@ -15,10 +15,12 @@ // ***************************************************************************** import * as cp from 'child_process'; +import { inject, injectable } from 'inversify'; import * as path from 'path'; -import { injectable, inject } from 'inversify'; -import { Trace, Tracer, IPCMessageReader, IPCMessageWriter, createMessageConnection, MessageConnection, Message } from 'vscode-ws-jsonrpc'; -import { ILogger, ConnectionErrorHandler, DisposableCollection, Disposable } from '../../common'; +import { Writable } from 'stream'; +import { Message } from '../../../shared/vscode-languageserver-protocol'; +import { Channel, ChannelCloseEvent, ConnectionErrorHandler, Disposable, DisposableCollection, Emitter, ILogger, MessageProvider } from '../../common'; +import { Uint8ArrayReadBuffer, Uint8ArrayWriteBuffer } from '../../common/message-rpc/uint8-array-message-buffer'; import { createIpcEnv } from './ipc-protocol'; export interface ResolvedIPCConnectionOptions { @@ -40,7 +42,7 @@ export class IPCConnectionProvider { @inject(ILogger) protected readonly logger: ILogger; - listen(options: IPCConnectionOptions, acceptor: (connection: MessageConnection) => void): Disposable { + listen(options: IPCConnectionOptions, acceptor: (connection: Channel) => void): Disposable { return this.doListen({ logger: this.logger, args: [], @@ -48,7 +50,7 @@ export class IPCConnectionProvider { }, acceptor); } - protected doListen(options: ResolvedIPCConnectionOptions, acceptor: (connection: MessageConnection) => void): Disposable { + protected doListen(options: ResolvedIPCConnectionOptions, acceptor: (connection: Channel) => void): Disposable { const childProcess = this.fork(options); const connection = this.createConnection(childProcess, options); const toStop = new DisposableCollection(); @@ -74,32 +76,41 @@ export class IPCConnectionProvider { return toStop; } - protected createConnection(childProcess: cp.ChildProcess, options: ResolvedIPCConnectionOptions): MessageConnection { - const reader = new IPCMessageReader(childProcess); - const writer = new IPCMessageWriter(childProcess); - const connection = createMessageConnection(reader, writer, { - error: (message: string) => this.logger.error(`[${options.serverName}: ${childProcess.pid}] ${message}`), - warn: (message: string) => this.logger.warn(`[${options.serverName}: ${childProcess.pid}] ${message}`), - info: (message: string) => this.logger.info(`[${options.serverName}: ${childProcess.pid}] ${message}`), - log: (message: string) => this.logger.info(`[${options.serverName}: ${childProcess.pid}] ${message}`) + protected createConnection(childProcess: cp.ChildProcess, options?: ResolvedIPCConnectionOptions): Channel { + + const onCloseEmitter = new Emitter(); + const onMessageEmitter = new Emitter(); + const onErrorEmitter = new Emitter(); + const pipe = childProcess.stdio[4] as Writable; + + pipe.on('data', (data: Uint8Array) => { + onMessageEmitter.fire(() => new Uint8ArrayReadBuffer(data)); }); - const tracer: Tracer = { - log: (message: unknown, data?: string) => this.logger.debug(`[${options.serverName}: ${childProcess.pid}] ${message}` + (typeof data === 'string' ? ' ' + data : '')) - }; - connection.trace(Trace.Verbose, tracer); - this.logger.isDebug().then(isDebug => { - if (!isDebug) { - connection.trace(Trace.Off, tracer); + + childProcess.on('error', err => onErrorEmitter.fire(err)); + childProcess.on('exit', code => onCloseEmitter.fire({ reason: 'Child process been terminated', code: code ?? undefined })); + + return { + close: () => { }, + onClose: onCloseEmitter.event, + onError: onErrorEmitter.event, + onMessage: onMessageEmitter.event, + getWriteBuffer: () => { + const result = new Uint8ArrayWriteBuffer(); + result.onCommit(buffer => { + pipe.write(buffer); + }); + + return result; } - }); - return connection; + }; } protected fork(options: ResolvedIPCConnectionOptions): cp.ChildProcess { const forkOptions: cp.ForkOptions = { - silent: true, env: createIpcEnv(options), - execArgv: [] + execArgv: [], + stdio: ['pipe', 'pipe', 'pipe', 'ipc', 'pipe'] }; const inspectArgPrefix = `--${options.serverName}-inspect`; const inspectArg = process.argv.find(v => v.startsWith(inspectArgPrefix)); @@ -108,7 +119,9 @@ export class IPCConnectionProvider { } const childProcess = cp.fork(path.join(__dirname, 'ipc-bootstrap'), options.args, forkOptions); - childProcess.stdout!.on('data', data => this.logger.info(`[${options.serverName}: ${childProcess.pid}] ${data.toString().trim()}`)); + childProcess.stdout!.on('data', data => { + this.logger.info(`[${options.serverName}: ${childProcess.pid}] ${data.toString().trim()}`); + }); childProcess.stderr!.on('data', data => this.logger.error(`[${options.serverName}: ${childProcess.pid}] ${data.toString().trim()}`)); this.logger.debug(`[${options.serverName}: ${childProcess.pid}] IPC started`); diff --git a/packages/core/src/node/messaging/ipc-protocol.ts b/packages/core/src/node/messaging/ipc-protocol.ts index de9a77394b03e..03aa3944521c3 100644 --- a/packages/core/src/node/messaging/ipc-protocol.ts +++ b/packages/core/src/node/messaging/ipc-protocol.ts @@ -15,14 +15,14 @@ // SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0 // ***************************************************************************** -import { MessageConnection } from 'vscode-ws-jsonrpc'; +import { Channel } from '../../common/message-rpc/channel'; const THEIA_PARENT_PID = 'THEIA_PARENT_PID'; const THEIA_ENTRY_POINT = 'THEIA_ENTRY_POINT'; export const ipcEntryPoint: string | undefined = process.env[THEIA_ENTRY_POINT]; -export type IPCEntryPoint = (connection: MessageConnection) => void; +export type IPCEntryPoint = (connection: Channel) => void; export namespace IPCEntryPoint { /** * Throws if `THEIA_ENTRY_POINT` is undefined or empty. diff --git a/packages/core/src/node/messaging/messaging-contribution.ts b/packages/core/src/node/messaging/messaging-contribution.ts index 2ee8764854780..afa78063d6d73 100644 --- a/packages/core/src/node/messaging/messaging-contribution.ts +++ b/packages/core/src/node/messaging/messaging-contribution.ts @@ -18,19 +18,15 @@ import * as http from 'http'; import * as https from 'https'; import { Server, Socket } from 'socket.io'; import { injectable, inject, named, postConstruct, interfaces, Container } from 'inversify'; -import { MessageConnection } from 'vscode-ws-jsonrpc'; -import { createWebSocketConnection } from 'vscode-ws-jsonrpc/lib/socket/connection'; -import { IConnection } from 'vscode-ws-jsonrpc/lib/server/connection'; -import * as launch from 'vscode-ws-jsonrpc/lib/server/launch'; import { ContributionProvider, ConnectionHandler, bindContributionProvider } from '../../common'; -import { WebSocketChannel } from '../../common/messaging/web-socket-channel'; +import { IWebSocket, WebSocketChannel } from '../../common/messaging/web-socket-channel'; import { BackendApplicationContribution } from '../backend-application'; -import { MessagingService, WebSocketChannelConnection } from './messaging-service'; -import { ConsoleLogger } from './logger'; +import { MessagingService } from './messaging-service'; import { ConnectionContainerModule } from './connection-container-module'; import Route = require('route-parser'); import { WsRequestValidator } from '../ws-request-validators'; import { MessagingListener } from './messaging-listeners'; +import { Channel, ChannelMultiplexer } from '../../common/message-rpc/channel'; export const MessagingContainer = Symbol('MessagingContainer'); @@ -53,7 +49,7 @@ export class MessagingContribution implements BackendApplicationContribution, Me protected readonly messagingListener: MessagingListener; protected readonly wsHandlers = new MessagingContribution.ConnectionHandlers(); - protected readonly channelHandlers = new MessagingContribution.ConnectionHandlers(); + protected readonly channelHandlers = new MessagingContribution.ConnectionHandlers(); @postConstruct() protected init(): void { @@ -63,21 +59,7 @@ export class MessagingContribution implements BackendApplicationContribution, Me } } - listen(spec: string, callback: (params: MessagingService.PathParams, connection: MessageConnection) => void): void { - this.wsChannel(spec, (params, channel) => { - const connection = createWebSocketConnection(channel, new ConsoleLogger()); - callback(params, connection); - }); - } - - forward(spec: string, callback: (params: MessagingService.PathParams, connection: IConnection) => void): void { - this.wsChannel(spec, (params, channel) => { - const connection = launch.createWebSocketConnection(channel); - callback(params, WebSocketChannelConnection.create(connection, channel)); - }); - } - - wsChannel(spec: string, callback: (params: MessagingService.PathParams, channel: WebSocketChannel) => void): void { + wsChannel(spec: string, callback: (params: MessagingService.PathParams, channel: Channel) => void): void { this.channelHandlers.push(spec, (params, channel) => callback(params, channel)); } @@ -125,49 +107,15 @@ export class MessagingContribution implements BackendApplicationContribution, Me } protected handleChannels(socket: Socket): void { + const socketChannel = new WebSocketChannel(toIWebSocket(socket)); + const mulitplexer = new ChannelMultiplexer(socketChannel); const channelHandlers = this.getConnectionChannelHandlers(socket); - const channels = new Map(); - socket.on('message', data => { - try { - const message: WebSocketChannel.Message = JSON.parse(data.toString()); - if (message.kind === 'open') { - const { id, path } = message; - const channel = this.createChannel(id, socket); - if (channelHandlers.route(path, channel)) { - channel.ready(); - console.debug(`Opening channel for service path '${path}'. [ID: ${id}]`); - channels.set(id, channel); - channel.onClose(() => { - console.debug(`Closing channel on service path '${path}'. [ID: ${id}]`); - channels.delete(id); - }); - } else { - console.error('Cannot find a service for the path: ' + path); - } - } else { - const { id } = message; - const channel = channels.get(id); - if (channel) { - channel.handleMessage(message); - } else { - console.error('The ws channel does not exist', id); - } - } - } catch (error) { - console.error('Failed to handle message', { error, data }); + mulitplexer.onDidOpenChannel(event => { + if (channelHandlers.route(event.id, event.channel)) { + console.debug(`Opening channel for service path '${event.id}'.`); + event.channel.onClose(() => console.debug(`Closing channel on service path '${event.id}'.`)); } }); - socket.on('error', err => { - for (const channel of channels.values()) { - channel.fireError(err); - } - }); - socket.on('disconnect', reason => { - for (const channel of channels.values()) { - channel.close(undefined, reason); - } - channels.clear(); - }); } protected createSocketContainer(socket: Socket): Container { @@ -176,7 +124,7 @@ export class MessagingContribution implements BackendApplicationContribution, Me return connectionContainer; } - protected getConnectionChannelHandlers(socket: Socket): MessagingContribution.ConnectionHandlers { + protected getConnectionChannelHandlers(socket: Socket): MessagingContribution.ConnectionHandlers { const connectionContainer = this.createSocketContainer(socket); bindContributionProvider(connectionContainer, ConnectionHandler); connectionContainer.load(...this.connectionModules.getContributions()); @@ -184,21 +132,25 @@ export class MessagingContribution implements BackendApplicationContribution, Me const connectionHandlers = connectionContainer.getNamed>(ContributionProvider, ConnectionHandler); for (const connectionHandler of connectionHandlers.getContributions(true)) { connectionChannelHandlers.push(connectionHandler.path, (_, channel) => { - const connection = createWebSocketConnection(channel, new ConsoleLogger()); - connectionHandler.onConnection(connection); + connectionHandler.onConnection(channel); }); } return connectionChannelHandlers; } - protected createChannel(id: number, socket: Socket): WebSocketChannel { - return new WebSocketChannel(id, content => { - if (socket.connected) { - socket.send(content); - } - }); - } +} +function toIWebSocket(socket: Socket): IWebSocket { + return { + close: () => { + socket.disconnect(); + }, + isConnected: () => socket.connected, + onClose: cb => socket.on('disconnect', reason => cb(reason)), + onError: cb => socket.on('error', error => cb(error)), + onMessage: cb => socket.on('message', data => cb(data)), + send: message => socket.emit('message', message) + }; } export namespace MessagingContribution { diff --git a/packages/core/src/node/messaging/messaging-service.ts b/packages/core/src/node/messaging/messaging-service.ts index 087f6d5850def..276b58734bcff 100644 --- a/packages/core/src/node/messaging/messaging-service.ts +++ b/packages/core/src/node/messaging/messaging-service.ts @@ -15,33 +15,21 @@ // ***************************************************************************** import { Socket } from 'socket.io'; -import { MessageConnection } from 'vscode-ws-jsonrpc'; -import { IConnection } from 'vscode-ws-jsonrpc/lib/server/connection'; -import { WebSocketChannel } from '../../common/messaging/web-socket-channel'; +import { Channel } from '../../common/message-rpc/channel'; export interface MessagingService { - /** - * Accept a JSON-RPC connection on the given path. - * A path supports the route syntax: https://github.com/rcs/route-parser#what-can-i-use-in-my-routes. - */ - listen(path: string, callback: (params: MessagingService.PathParams, connection: MessageConnection) => void): void; - /** - * Accept a raw JSON-RPC connection on the given path. - * A path supports the route syntax: https://github.com/rcs/route-parser#what-can-i-use-in-my-routes. - */ - forward(path: string, callback: (params: MessagingService.PathParams, connection: IConnection) => void): void; /** * Accept a web socket channel on the given path. * A path supports the route syntax: https://github.com/rcs/route-parser#what-can-i-use-in-my-routes. */ - wsChannel(path: string, callback: (params: MessagingService.PathParams, socket: WebSocketChannel) => void): void; + wsChannel(path: string, callback: (params: MessagingService.PathParams, channel: Channel) => void): void; /** * Accept a web socket connection on the given path. * A path supports the route syntax: https://github.com/rcs/route-parser#what-can-i-use-in-my-routes. * * #### Important - * Prefer JSON-RPC connections or web socket channels over web sockets. Clients can handle only limited amount of web sockets - * and excessive amount can cause performance degradation. All JSON-RPC connections and web socket channels share the single web socket connection. + * Prefer using web socket channels over establishing new web socket connection. Clients can handle only limited amount of web sockets + * and excessive amount can cause performance degradation. All web socket channels share a single web socket connection. */ ws(path: string, callback: (params: MessagingService.PathParams, socket: Socket) => void): void; } @@ -56,18 +44,3 @@ export namespace MessagingService { configure(service: MessagingService): void; } } - -export interface WebSocketChannelConnection extends IConnection { - channel: WebSocketChannel; -} -export namespace WebSocketChannelConnection { - export function is(connection: IConnection): connection is WebSocketChannelConnection { - return (connection as WebSocketChannelConnection).channel instanceof WebSocketChannel; - } - - export function create(connection: IConnection, channel: WebSocketChannel): WebSocketChannelConnection { - const result = connection as WebSocketChannelConnection; - result.channel = channel; - return result; - } -} diff --git a/packages/core/src/node/messaging/test/test-web-socket-channel.ts b/packages/core/src/node/messaging/test/test-web-socket-channel.ts index 2fbb17c9aa8ec..4fa11a11a468a 100644 --- a/packages/core/src/node/messaging/test/test-web-socket-channel.ts +++ b/packages/core/src/node/messaging/test/test-web-socket-channel.ts @@ -16,32 +16,41 @@ import * as http from 'http'; import * as https from 'https'; -import { WebSocketChannel } from '../../../common/messaging/web-socket-channel'; -import { Disposable } from '../../../common/disposable'; import { AddressInfo } from 'net'; -import { io } from 'socket.io-client'; +import { io, Socket } from 'socket.io-client'; +import { Channel, ChannelMultiplexer } from '../../../common/message-rpc/channel'; +import { IWebSocket, WebSocketChannel } from '../../../common/messaging/web-socket-channel'; -export class TestWebSocketChannel extends WebSocketChannel { +export class TestWebSocketChannelSetup { + public readonly multiPlexer: ChannelMultiplexer; + public readonly channel: Channel; constructor({ server, path }: { server: http.Server | https.Server, path: string }) { - super(0, content => socket.send(content)); const socket = io(`ws://localhost:${(server.address() as AddressInfo).port}${WebSocketChannel.wsPath}`); - socket.on('error', error => - this.fireError(error) - ); - socket.on('disconnect', reason => - this.fireClose(0, reason) - ); - socket.on('message', data => { - this.handleMessage(JSON.parse(data.toString())); + this.channel = new WebSocketChannel(toIWebSocket(socket)); + this.multiPlexer = new ChannelMultiplexer(this.channel); + socket.on('connect', () => { + this.multiPlexer.open(path); }); - socket.on('connect', () => - this.open(path) - ); - this.toDispose.push(Disposable.create(() => socket.close())); + socket.connect(); } +} +function toIWebSocket(socket: Socket): IWebSocket { + return { + close: () => { + socket.removeAllListeners('disconnect'); + socket.removeAllListeners('error'); + socket.removeAllListeners('message'); + socket.close(); + }, + isConnected: () => socket.connected, + onClose: cb => socket.on('disconnect', reason => cb(reason)), + onError: cb => socket.on('error', reason => cb(reason)), + onMessage: cb => socket.on('message', data => cb(data)), + send: message => socket.emit('message', message) + }; } diff --git a/packages/debug/src/browser/debug-session-connection.ts b/packages/debug/src/browser/debug-session-connection.ts index 4ef9db7818a74..f309094ed8cbe 100644 --- a/packages/debug/src/browser/debug-session-connection.ts +++ b/packages/debug/src/browser/debug-session-connection.ts @@ -18,13 +18,9 @@ import { DebugProtocol } from 'vscode-debugprotocol'; import { Deferred } from '@theia/core/lib/common/promise-util'; -import { Event, Emitter, DisposableCollection, Disposable, MaybePromise } from '@theia/core'; +import { Event, Emitter, DisposableCollection, Disposable, MaybePromise, Channel } from '@theia/core'; import { OutputChannel } from '@theia/output/lib/browser/output-channel'; -import { Channel } from '../common/debug-service'; - -export type DebugRequestHandler = (request: DebugProtocol.Request) => MaybePromise; - export interface DebugRequestTypes { 'attach': [DebugProtocol.AttachRequestArguments, DebugProtocol.AttachResponse] 'breakpointLocations': [DebugProtocol.BreakpointLocationsArguments, DebugProtocol.BreakpointLocationsResponse] @@ -116,6 +112,8 @@ const standardDebugEvents = new Set([ 'thread' ]); +export type DebugRequestHandler = (request: DebugProtocol.Request) => MaybePromise; + export class DebugSessionConnection implements Disposable { private sequence = 1; @@ -168,7 +166,7 @@ export class DebugSessionConnection implements Disposable { this.cancelPendingRequests(); this.onDidCloseEmitter.fire(); }); - connection.onMessage(data => this.handleMessage(data)); + connection.onMessage(data => this.handleMessage(data().readString())); return connection; } @@ -247,7 +245,7 @@ export class DebugSessionConnection implements Disposable { const dateStr = `${now.toLocaleString(undefined, { hour12: false })}.${now.getMilliseconds()}`; this.traceOutputChannel.appendLine(`${this.sessionId.substring(0, 8)} ${dateStr} theia -> adapter: ${JSON.stringify(message, undefined, 4)}`); } - connection.send(messageStr); + connection.getWriteBuffer().writeString(messageStr).commit(); } protected handleMessage(data: string): void { diff --git a/packages/debug/src/browser/debug-session-contribution.ts b/packages/debug/src/browser/debug-session-contribution.ts index cf8b5e7424cc0..a14db324cdf81 100644 --- a/packages/debug/src/browser/debug-session-contribution.ts +++ b/packages/debug/src/browser/debug-session-contribution.ts @@ -26,10 +26,11 @@ import { DebugSessionOptions } from './debug-session-options'; import { OutputChannelManager, OutputChannel } from '@theia/output/lib/browser/output-channel'; import { DebugPreferences } from './debug-preferences'; import { DebugSessionConnection } from './debug-session-connection'; -import { Channel, DebugAdapterPath } from '../common/debug-service'; +import { DebugAdapterPath } from '../common/debug-service'; import { ContributionProvider } from '@theia/core/lib/common/contribution-provider'; import { FileService } from '@theia/filesystem/lib/browser/file-service'; import { DebugContribution } from './debug-contribution'; +import { Channel } from '@theia/core/lib/common/message-rpc/channel'; import { WorkspaceService } from '@theia/workspace/lib/browser'; /** diff --git a/packages/debug/src/node/debug-adapter-session.ts b/packages/debug/src/node/debug-adapter-session.ts index 03ff950d38a90..b47e552ae6344 100644 --- a/packages/debug/src/node/debug-adapter-session.ts +++ b/packages/debug/src/node/debug-adapter-session.ts @@ -26,7 +26,7 @@ import { DebugAdapterSession } from './debug-model'; import { DebugProtocol } from 'vscode-debugprotocol'; -import { Channel } from '../common/debug-service'; +import { Channel } from '@theia/core'; /** * [DebugAdapterSession](#DebugAdapterSession) implementation. @@ -53,7 +53,7 @@ export class DebugAdapterSessionImpl implements DebugAdapterSession { throw new Error('The session has already been started, id: ' + this.id); } this.channel = channel; - this.channel.onMessage((message: string) => this.write(message)); + this.channel.onMessage(message => this.write(message().readString())); this.channel.onClose(() => this.channel = undefined); } @@ -80,7 +80,7 @@ export class DebugAdapterSessionImpl implements DebugAdapterSession { protected send(message: string): void { if (this.channel) { - this.channel.send(message); + this.channel.getWriteBuffer().writeString(message); } } diff --git a/packages/debug/src/node/debug-model.ts b/packages/debug/src/node/debug-model.ts index a39352fabbddf..3e6de9ce68fc6 100644 --- a/packages/debug/src/node/debug-model.ts +++ b/packages/debug/src/node/debug-model.ts @@ -25,8 +25,7 @@ import { DebugConfiguration } from '../common/debug-configuration'; import { IJSONSchema, IJSONSchemaSnippet } from '@theia/core/lib/common/json-schema'; import { MaybePromise } from '@theia/core/lib/common/types'; -import { Event } from '@theia/core/lib/common/event'; -import { Channel } from '../common/debug-service'; +import { Channel, Event } from '@theia/core'; // FIXME: break down this file to debug adapter and debug adapter contribution (see Theia file naming conventions) diff --git a/packages/filesystem/src/common/files.ts b/packages/filesystem/src/common/files.ts index 95ee67c57d8de..63f789b9da2a3 100644 --- a/packages/filesystem/src/common/files.ts +++ b/packages/filesystem/src/common/files.ts @@ -846,7 +846,7 @@ export function hasOpenReadWriteCloseCapability(provider: FileSystemProvider): p */ export interface FileSystemProviderWithFileReadStreamCapability extends FileSystemProvider { /** - * Read the contents of the given file as stream. + * Read the contents of the given file as stream. * @param resource The `URI` of the file. * * @return The `ReadableStreamEvents` for the readable stream of the given file. diff --git a/packages/filesystem/src/common/remote-file-system-provider.ts b/packages/filesystem/src/common/remote-file-system-provider.ts index 5edb5dbbad9e7..f67e198db75f7 100644 --- a/packages/filesystem/src/common/remote-file-system-provider.ts +++ b/packages/filesystem/src/common/remote-file-system-provider.ts @@ -42,11 +42,11 @@ export interface RemoteFileSystemServer extends JsonRpcServer; open(resource: string, opts: FileOpenOptions): Promise; close(fd: number): Promise; - read(fd: number, pos: number, length: number): Promise<{ bytes: number[]; bytesRead: number; }>; + read(fd: number, pos: number, length: number): Promise<{ bytes: Uint8Array; bytesRead: number; }>; readFileStream(resource: string, opts: FileReadStreamOptions, token: CancellationToken): Promise; - readFile(resource: string): Promise; - write(fd: number, pos: number, data: number[], offset: number, length: number): Promise; - writeFile(resource: string, content: number[], opts: FileWriteOptions): Promise; + readFile(resource: string): Promise; + write(fd: number, pos: number, data: Uint8Array, offset: number, length: number): Promise; + writeFile(resource: string, content: Uint8Array, opts: FileWriteOptions): Promise; delete(resource: string, opts: FileDeleteOptions): Promise; mkdir(resource: string): Promise; readdir(resource: string): Promise<[string, FileType][]>; @@ -70,7 +70,7 @@ export interface RemoteFileSystemClient { notifyDidChangeFile(event: { changes: RemoteFileChange[] }): void; notifyFileWatchError(): void; notifyDidChangeCapabilities(capabilities: FileSystemProviderCapabilities): void; - onFileStreamData(handle: number, data: number[]): void; + onFileStreamData(handle: number, data: Uint8Array): void; onFileStreamEnd(handle: number, error: RemoteFileStreamError | undefined): void; } @@ -169,7 +169,7 @@ export class RemoteFileSystemProvider implements Required, D this.onFileWatchErrorEmitter.fire(); }, notifyDidChangeCapabilities: capabilities => this.setCapabilities(capabilities), - onFileStreamData: (handle, data) => this.onFileStreamDataEmitter.fire([handle, Uint8Array.from(data)]), + onFileStreamData: (handle, data) => this.onFileStreamDataEmitter.fire([handle, data]), onFileStreamEnd: (handle, error) => this.onFileStreamEndEmitter.fire([handle, error]) }); const onInitialized = this.server.onDidOpenConnection(() => { @@ -224,7 +224,7 @@ export class RemoteFileSystemProvider implements Required, D async readFile(resource: URI): Promise { const bytes = await this.server.readFile(resource.toString()); - return Uint8Array.from(bytes); + return bytes; } readFileStream(resource: URI, opts: FileReadStreamOptions, token: CancellationToken): ReadableStreamEvents { @@ -264,11 +264,11 @@ export class RemoteFileSystemProvider implements Required, D } write(fd: number, pos: number, data: Uint8Array, offset: number, length: number): Promise { - return this.server.write(fd, pos, [...data.values()], offset, length); + return this.server.write(fd, pos, data, offset, length); } writeFile(resource: URI, content: Uint8Array, opts: FileWriteOptions): Promise { - return this.server.writeFile(resource.toString(), [...content.values()], opts); + return this.server.writeFile(resource.toString(), content, opts); } delete(resource: URI, opts: FileDeleteOptions): Promise { @@ -412,34 +412,33 @@ export class FileSystemProviderServer implements RemoteFileSystemServer { throw new Error('not supported'); } - async read(fd: number, pos: number, length: number): Promise<{ bytes: number[]; bytesRead: number; }> { + async read(fd: number, pos: number, length: number): Promise<{ bytes: Uint8Array; bytesRead: number; }> { if (hasOpenReadWriteCloseCapability(this.provider)) { const buffer = BinaryBuffer.alloc(this.BUFFER_SIZE); const bytes = buffer.buffer; const bytesRead = await this.provider.read(fd, pos, bytes, 0, length); - return { bytes: [...bytes.values()], bytesRead }; + return { bytes, bytesRead }; } throw new Error('not supported'); } - write(fd: number, pos: number, data: number[], offset: number, length: number): Promise { + write(fd: number, pos: number, data: Uint8Array, offset: number, length: number): Promise { if (hasOpenReadWriteCloseCapability(this.provider)) { - return this.provider.write(fd, pos, Uint8Array.from(data), offset, length); + return this.provider.write(fd, pos, data, offset, length); } throw new Error('not supported'); } - async readFile(resource: string): Promise { + async readFile(resource: string): Promise { if (hasReadWriteCapability(this.provider)) { - const buffer = await this.provider.readFile(new URI(resource)); - return [...buffer.values()]; + return this.provider.readFile(new URI(resource)); } throw new Error('not supported'); } - writeFile(resource: string, content: number[], opts: FileWriteOptions): Promise { + writeFile(resource: string, content: Uint8Array, opts: FileWriteOptions): Promise { if (hasReadWriteCapability(this.provider)) { - return this.provider.writeFile(new URI(resource), Uint8Array.from(content), opts); + return this.provider.writeFile(new URI(resource), content, opts); } throw new Error('not supported'); } @@ -497,7 +496,7 @@ export class FileSystemProviderServer implements RemoteFileSystemServer { if (hasFileReadStreamCapability(this.provider)) { const handle = this.readFileStreamSeq++; const stream = this.provider.readFileStream(new URI(resource), opts, token); - stream.on('data', data => this.client?.onFileStreamData(handle, [...data.values()])); + stream.on('data', data => this.client?.onFileStreamData(handle, data)); stream.on('error', error => { const code = error instanceof FileSystemProviderError ? error.code : undefined; const { name, message, stack } = error; diff --git a/packages/plugin-ext/src/common/connection.ts b/packages/plugin-ext/src/common/connection.ts index 48ae3adb36363..4139d3314c0e9 100644 --- a/packages/plugin-ext/src/common/connection.ts +++ b/packages/plugin-ext/src/common/connection.ts @@ -13,27 +13,38 @@ // // SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0 // ***************************************************************************** -import { Channel } from '@theia/debug/lib/common/debug-service'; import { ConnectionExt, ConnectionMain } from './plugin-api-rpc'; -import { Emitter } from '@theia/core/lib/common/event'; +import { Emitter, Event } from '@theia/core/lib/common/event'; +import { ChannelCloseEvent, MessageProvider } from '@theia/core/lib/common/message-rpc/channel'; +import { WriteBuffer, Channel } from '@theia/core'; +import { Uint8ArrayReadBuffer, Uint8ArrayWriteBuffer } from '@theia/core/lib/common/message-rpc/uint8-array-message-buffer'; /** * A channel communicating with a counterpart in a plugin host. */ export class PluginChannel implements Channel { - private messageEmitter: Emitter = new Emitter(); + private messageEmitter: Emitter = new Emitter(); private errorEmitter: Emitter = new Emitter(); - private closedEmitter: Emitter = new Emitter(); + private closedEmitter: Emitter = new Emitter(); constructor( - protected readonly id: string, + readonly id: string, protected readonly connection: ConnectionExt | ConnectionMain) { } + getWriteBuffer(): WriteBuffer { + const result = new Uint8ArrayWriteBuffer(); + result.onCommit(buffer => { + this.connection.$sendMessage(this.id, new Uint8ArrayReadBuffer(buffer).readString()); + }); + + return result; + } + send(content: string): void { this.connection.$sendMessage(this.id, content); } - fireMessageReceived(msg: string): void { + fireMessageReceived(msg: MessageProvider): void { this.messageEmitter.fire(msg); } @@ -42,21 +53,19 @@ export class PluginChannel implements Channel { } fireClosed(): void { - this.closedEmitter.fire(); + this.closedEmitter.fire({ reason: 'Plugin channel has been closed from the extension side' }); } - // eslint-disable-next-line @typescript-eslint/no-explicit-any - onMessage(cb: (data: any) => void): void { - this.messageEmitter.event(cb); + get onMessage(): Event { + return this.messageEmitter.event; } - // eslint-disable-next-line @typescript-eslint/no-explicit-any - onError(cb: (reason: any) => void): void { - this.errorEmitter.event(cb); + get onError(): Event { + return this.errorEmitter.event; } - onClose(cb: (code: number, reason: string) => void): void { - this.closedEmitter.event(() => cb(-1, 'closed')); + get onClose(): Event { + return this.closedEmitter.event; } close(): void { @@ -80,7 +89,10 @@ export class ConnectionImpl implements ConnectionMain, ConnectionExt { */ async $sendMessage(id: string, message: string): Promise { if (this.connections.has(id)) { - this.connections.get(id)!.fireMessageReceived(message); + const writer = new Uint8ArrayWriteBuffer(); + writer.writeString(message); + const reader = new Uint8ArrayReadBuffer(writer.getCurrentContents()); + this.connections.get(id)!.fireMessageReceived(() => reader); } else { console.warn(`Received message for unknown connection: ${id}`); } diff --git a/packages/plugin-ext/src/main/browser/debug/plugin-debug-session-factory.ts b/packages/plugin-ext/src/main/browser/debug/plugin-debug-session-factory.ts index b16892dccc727..176fdb246a693 100644 --- a/packages/plugin-ext/src/main/browser/debug/plugin-debug-session-factory.ts +++ b/packages/plugin-ext/src/main/browser/debug/plugin-debug-session-factory.ts @@ -30,7 +30,7 @@ import { TerminalOptionsExt } from '../../../common/plugin-api-rpc'; import { FileService } from '@theia/filesystem/lib/browser/file-service'; import { DebugContribution } from '@theia/debug/lib/browser/debug-contribution'; import { ContributionProvider } from '@theia/core/lib/common/contribution-provider'; -import { Channel } from '@theia/debug/lib/common/debug-service'; +import { Channel } from '@theia/core/lib/common/message-rpc/channel'; import { WorkspaceService } from '@theia/workspace/lib/browser'; export class PluginDebugSession extends DebugSession { diff --git a/packages/plugin-ext/src/plugin/node/debug/plugin-debug-adapter-session.ts b/packages/plugin-ext/src/plugin/node/debug/plugin-debug-adapter-session.ts index b05859750198d..b34a73ae201be 100644 --- a/packages/plugin-ext/src/plugin/node/debug/plugin-debug-adapter-session.ts +++ b/packages/plugin-ext/src/plugin/node/debug/plugin-debug-adapter-session.ts @@ -17,7 +17,7 @@ import { DebugAdapterSessionImpl } from '@theia/debug/lib/node/debug-adapter-session'; import * as theia from '@theia/plugin'; import { DebugAdapter } from '@theia/debug/lib/node/debug-model'; -import { Channel } from '@theia/debug/lib/common/debug-service'; +import { Channel } from '@theia/core/lib/common/message-rpc/channel'; /* eslint-disable @typescript-eslint/no-explicit-any */ diff --git a/packages/task/src/node/task-server.slow-spec.ts b/packages/task/src/node/task-server.slow-spec.ts index fbe968348d9d2..c387a985a8e2c 100644 --- a/packages/task/src/node/task-server.slow-spec.ts +++ b/packages/task/src/node/task-server.slow-spec.ts @@ -28,9 +28,10 @@ import { isWindows, isOSX } from '@theia/core/lib/common/os'; import { FileUri } from '@theia/core/lib/node'; import { terminalsPath } from '@theia/terminal/lib/common/terminal-protocol'; import { expectThrowsAsync } from '@theia/core/lib/common/test/expect'; -import { TestWebSocketChannel } from '@theia/core/lib/node/messaging/test/test-web-socket-channel'; +import { TestWebSocketChannelSetup } from '@theia/core/lib/node/messaging/test/test-web-socket-channel'; import { expect } from 'chai'; import URI from '@theia/core/lib/common/uri'; +import { RpcProtocol } from '@theia/core'; // test scripts that we bundle with tasks const commandShortRunning = './task'; @@ -106,26 +107,37 @@ describe('Task server / back-end', function (): void { // hook-up to terminal's ws and confirm that it outputs expected tasks' output await new Promise((resolve, reject) => { - const channel = new TestWebSocketChannel({ server, path: `${terminalsPath}/${terminalId}` }); - channel.onError(reject); - channel.onClose((code, reason) => reject(new Error(`channel is closed with '${code}' code and '${reason}' reason`))); - channel.onMessage(msg => { - // check output of task on terminal is what we expect - const expected = `${isOSX ? 'tasking osx' : 'tasking'}... ${someString}`; - // Instead of waiting for one message from the terminal, we wait for several ones as the very first message can be something unexpected. - // For instance: `nvm is not compatible with the \"PREFIX\" environment variable: currently set to \"/usr/local\"\r\n` - const currentMessage = msg.toString(); - messages.unshift(currentMessage); - if (currentMessage.indexOf(expected) !== -1) { - resolve(); - channel.close(); - return; - } - if (messages.length >= messagesToWaitFor) { - reject(new Error(`expected sub-string not found in terminal output. Expected: "${expected}" vs Actual messages: ${JSON.stringify(messages)}`)); - channel.close(); - } + const setup = new TestWebSocketChannelSetup({ server, path: `${terminalsPath}/${terminalId}` }); + setup.multiPlexer.onDidOpenChannel(event => { + const channel = event.channel; + const connection = new RpcProtocol(channel, (method, args) => { + reject(`Received unexpected request: ${method} with args: ${args} `); + return Promise.reject(); + }); + channel.onError(reject); + channel.onClose(() => reject(new Error('Channel has been closed'))); + connection.onNotification(not => { + // check output of task on terminal is what we expect + const expected = `${isOSX ? 'tasking osx' : 'tasking'}... ${someString}`; + // Instead of waiting for one message from the terminal, we wait for several ones as the very first message can be something unexpected. + // For instance: `nvm is not compatible with the \"PREFIX\" environment variable: currently set to \"/usr/local\"\r\n` + const currentMessage = not.args[0]; + messages.unshift(currentMessage); + if (currentMessage.indexOf(expected) !== -1) { + resolve(); + channel.close(); + return; + } + if (messages.length >= messagesToWaitFor) { + reject(new Error(`expected sub-string not found in terminal output. Expected: "${expected}" vs Actual messages: ${JSON.stringify(messages)}`)); + channel.close(); + } + }); + channel.onMessage(reader => { + + }); }); + }); }); diff --git a/packages/terminal/src/browser/terminal-widget-impl.ts b/packages/terminal/src/browser/terminal-widget-impl.ts index 311b9122e3659..040831c26c88c 100644 --- a/packages/terminal/src/browser/terminal-widget-impl.ts +++ b/packages/terminal/src/browser/terminal-widget-impl.ts @@ -17,7 +17,7 @@ import { Terminal, RendererType } from 'xterm'; import { FitAddon } from 'xterm-addon-fit'; import { inject, injectable, named, postConstruct } from '@theia/core/shared/inversify'; -import { ContributionProvider, Disposable, Event, Emitter, ILogger, DisposableCollection } from '@theia/core'; +import { ContributionProvider, Disposable, Event, Emitter, ILogger, DisposableCollection, RpcProtocol, RequestHandler } from '@theia/core'; import { Widget, Message, WebSocketConnectionProvider, StatefulWidget, isFirefox, MessageLoop, KeyCode, codicon } from '@theia/core/lib/browser'; import { isOSX } from '@theia/core/lib/common'; import { WorkspaceService } from '@theia/workspace/lib/browser'; @@ -26,7 +26,6 @@ import { terminalsPath } from '../common/terminal-protocol'; import { IBaseTerminalServer, TerminalProcessInfo } from '../common/base-terminal-protocol'; import { TerminalWatcher } from '../common/terminal-watcher'; import { TerminalWidgetOptions, TerminalWidget, TerminalDimensions } from './base/terminal-widget'; -import { MessageConnection } from '@theia/core/shared/vscode-ws-jsonrpc'; import { Deferred } from '@theia/core/lib/common/promise-util'; import { TerminalPreferences, TerminalRendererType, isTerminalRendererType, DEFAULT_TERMINAL_RENDERER_TYPE, CursorStyle } from './terminal-preferences'; import { TerminalContribution } from './terminal-contribution'; @@ -58,7 +57,7 @@ export class TerminalWidgetImpl extends TerminalWidget implements StatefulWidget protected searchBox: TerminalSearchWidget; protected restored = false; protected closeOnDispose = true; - protected waitForConnection: Deferred | undefined; + protected waitForConnection: Deferred | undefined; protected hoverMessage: HTMLDivElement; protected lastTouchEnd: TouchEvent | undefined; protected isAttachedCloseListener: boolean = false; @@ -507,16 +506,23 @@ export class TerminalWidgetImpl extends TerminalWidget implements StatefulWidget } this.toDisposeOnConnect.dispose(); this.toDispose.push(this.toDisposeOnConnect); - const waitForConnection = this.waitForConnection = new Deferred(); + const waitForConnection = this.waitForConnection = new Deferred(); this.webSocketConnectionProvider.listen({ path: `${terminalsPath}/${this.terminalId}`, onConnection: connection => { - connection.onNotification('onData', (data: string) => this.write(data)); + const requestHandler: RequestHandler = _method => this.logger.warn('Received an unhandled RPC request from the terminal process'); + + const rpc = new RpcProtocol(connection, requestHandler); + rpc.onNotification(event => { + if (event.method === 'onData') { + this.write(event.args[0]); + } + }); // Excludes the device status code emitted by Xterm.js const sendData = (data?: string) => { if (data && !this.deviceStatusCodes.has(data) && !this.disableEnterWhenAttachCloseListener()) { - return connection.sendRequest('write', data); + return rpc.sendRequest('write', [data]); } }; @@ -524,12 +530,10 @@ export class TerminalWidgetImpl extends TerminalWidget implements StatefulWidget disposable.push(this.term.onData(sendData)); disposable.push(this.term.onBinary(sendData)); - connection.onDispose(() => disposable.dispose()); + connection.onClose(() => disposable.dispose()); - this.toDisposeOnConnect.push(connection); - connection.listen(); if (waitForConnection) { - waitForConnection.resolve(connection); + waitForConnection.resolve(rpc); } } }, { reconnecting: false }); @@ -579,7 +583,7 @@ export class TerminalWidgetImpl extends TerminalWidget implements StatefulWidget sendText(text: string): void { if (this.waitForConnection) { this.waitForConnection.promise.then(connection => - connection.sendRequest('write', text) + connection.sendRequest('write', [text]) ); } } diff --git a/packages/terminal/src/node/terminal-backend-contribution.slow-spec.ts b/packages/terminal/src/node/terminal-backend-contribution.slow-spec.ts index 6d39ddd973f20..3a76e6e785ff0 100644 --- a/packages/terminal/src/node/terminal-backend-contribution.slow-spec.ts +++ b/packages/terminal/src/node/terminal-backend-contribution.slow-spec.ts @@ -20,7 +20,7 @@ import { IShellTerminalServer } from '../common/shell-terminal-protocol'; import * as http from 'http'; import * as https from 'https'; import { terminalsPath } from '../common/terminal-protocol'; -import { TestWebSocketChannel } from '@theia/core/lib/node/messaging/test/test-web-socket-channel'; +import { TestWebSocketChannelSetup } from '@theia/core/lib/node/messaging/test/test-web-socket-channel'; describe('Terminal Backend Contribution', function (): void { @@ -45,13 +45,19 @@ describe('Terminal Backend Contribution', function (): void { it('is data received from the terminal ws server', async () => { const terminalId = await shellTerminalServer.create({}); await new Promise((resolve, reject) => { - const channel = new TestWebSocketChannel({ server, path: `${terminalsPath}/${terminalId}` }); + const path = `${terminalsPath}/${terminalId}`; + const { channel, multiPlexer } = new TestWebSocketChannelSetup({ server, path }); channel.onError(reject); - channel.onClose((code, reason) => reject(new Error(`channel is closed with '${code}' code and '${reason}' reason`))); - channel.onOpen(() => { - resolve(); - channel.close(); + channel.onClose(event => reject(new Error(`channel is closed with '${event.code}' code and '${event.reason}' reason}`))); + + multiPlexer.onDidOpenChannel(event => { + if (event.id === path) { + resolve(); + channel.close(); + } }); + }); }); + }); diff --git a/packages/terminal/src/node/terminal-backend-contribution.ts b/packages/terminal/src/node/terminal-backend-contribution.ts index 4675b7a32290c..ae55fe138d73e 100644 --- a/packages/terminal/src/node/terminal-backend-contribution.ts +++ b/packages/terminal/src/node/terminal-backend-contribution.ts @@ -15,10 +15,11 @@ // ***************************************************************************** import { injectable, inject, named } from '@theia/core/shared/inversify'; -import { ILogger } from '@theia/core/lib/common'; +import { ILogger, RequestHandler } from '@theia/core/lib/common'; import { TerminalProcess, ProcessManager } from '@theia/process/lib/node'; import { terminalsPath } from '../common/terminal-protocol'; import { MessagingService } from '@theia/core/lib/node/messaging/messaging-service'; +import { RpcProtocol } from '@theia/core/'; @injectable() export class TerminalBackendContribution implements MessagingService.Contribution { @@ -30,19 +31,28 @@ export class TerminalBackendContribution implements MessagingService.Contributio protected readonly logger: ILogger; configure(service: MessagingService): void { - service.listen(`${terminalsPath}/:id`, (params: { id: string }, connection) => { + service.wsChannel(`${terminalsPath}/:id`, (params: { id: string }, channel) => { const id = parseInt(params.id, 10); const termProcess = this.processManager.get(id); if (termProcess instanceof TerminalProcess) { const output = termProcess.createOutputStream(); - output.on('data', data => connection.sendNotification('onData', data.toString())); - connection.onRequest('write', (data: string) => termProcess.write(data)); - connection.onClose(() => output.dispose()); - connection.listen(); - } else { - connection.dispose(); + // Create a RPC connection to the terminal process + // eslint-disable-next-line @typescript-eslint/no-explicit-any + const requestHandler: RequestHandler = async (method: string, args: any[]) => { + if (method === 'write' && args[0]) { + termProcess.write(args[0]); + } else { + this.logger.warn('Terminal process received a request with an unsupported method or argument', { method, args }); + } + }; + + const rpc = new RpcProtocol(channel, requestHandler); + output.on('data', data => { + rpc.sendNotification('onData', [data]); + }); + channel.onClose(() => output.dispose()); } }); } - } + diff --git a/yarn.lock b/yarn.lock index 90f9595550e63..6e26c479ec832 100644 --- a/yarn.lock +++ b/yarn.lock @@ -2184,6 +2184,13 @@ resolved "https://registry.yarnpkg.com/@types/caseless/-/caseless-0.12.2.tgz#f65d3d6389e01eeb458bd54dc8f52b95a9463bc8" integrity sha512-6ckxMjBBD8URvjB6J3NcnuAn5Pkl7t3TizAg+xdlzzQGSPSmBcXf8KoIH0ua/i+tio+ZRUHEXp0HEmvaR4kt0w== +"@types/chai-spies@1.0.3": + version "1.0.3" + resolved "https://registry.yarnpkg.com/@types%2fchai-spies/-/chai-spies-1.0.3.tgz#a52dc61af3853ec9b80965040811d15dfd401542" + integrity sha512-RBZjhVuK7vrg4rWMt04UF5zHYwfHnpk5mIWu3nQvU3AKGDixXzSjZ6v0zke6pBcaJqMv3IBZ5ibLWPMRDL0sLw== + dependencies: + "@types/chai" "*" + "@types/chai-string@^1.4.0": version "1.4.2" resolved "https://registry.yarnpkg.com/@types/chai-string/-/chai-string-1.4.2.tgz#0f116504a666b6c6a3c42becf86634316c9a19ac" @@ -2196,6 +2203,11 @@ resolved "https://registry.yarnpkg.com/@types/chai/-/chai-4.3.1.tgz#e2c6e73e0bdeb2521d00756d099218e9f5d90a04" integrity sha512-/zPMqDkzSZ8t3VtxOa4KPq7uzzW978M9Tvh+j7GHKuo6k6GTLxPJ4J5gE5cjfJ26pnXst0N5Hax8Sr0T2Mi9zQ== +"@types/chai@4.3.0": + version "4.3.0" + resolved "https://registry.yarnpkg.com/@types/chai/-/chai-4.3.0.tgz#23509ebc1fa32f1b4d50d6a66c4032d5b8eaabdc" + integrity sha512-/ceqdqeRraGolFTcfoXNiqjyQhZzbINDngeoAq9GoHa8PPK1yNzTaxWjA6BFWp5Ua9JpXEMSS4s5i9tS0hOJtw== + "@types/component-emitter@^1.2.10": version "1.2.11" resolved "https://registry.yarnpkg.com/@types/component-emitter/-/component-emitter-1.2.11.tgz#50d47d42b347253817a39709fef03ce66a108506" @@ -3907,11 +3919,28 @@ caseless@~0.12.0: resolved "https://registry.yarnpkg.com/caseless/-/caseless-0.12.0.tgz#1b681c21ff84033c826543090689420d187151dc" integrity sha1-G2gcIf+EAzyCZUMJBolCDRhxUdw= +chai-spies@1.0.0: + version "1.0.0" + resolved "https://registry.yarnpkg.com/chai-spies/-/chai-spies-1.0.0.tgz#d16b39336fb316d03abf8c375feb23c0c8bb163d" + integrity sha512-elF2ZUczBsFoP07qCfMO/zeggs8pqCf3fZGyK5+2X4AndS8jycZYID91ztD9oQ7d/0tnS963dPkd0frQEThDsg== + chai-string@^1.4.0: version "1.5.0" resolved "https://registry.yarnpkg.com/chai-string/-/chai-string-1.5.0.tgz#0bdb2d8a5f1dbe90bc78ec493c1c1c180dd4d3d2" integrity sha512-sydDC3S3pNAQMYwJrs6dQX0oBQ6KfIPuOZ78n7rocW0eJJlsHPh2t3kwW7xfwYA/1Bf6/arGtSUo16rxR2JFlw== +chai@4.3.4: + version "4.3.4" + resolved "https://registry.yarnpkg.com/chai/-/chai-4.3.4.tgz#b55e655b31e1eac7099be4c08c21964fce2e6c49" + integrity sha512-yS5H68VYOCtN1cjfwumDSuzn/9c+yza4f3reKXlE5rUg7SFcCEy90gJvydNgOYtblyf4Zi6jIWRnXOgErta0KA== + dependencies: + assertion-error "^1.1.0" + check-error "^1.0.2" + deep-eql "^3.0.1" + get-func-name "^2.0.0" + pathval "^1.1.1" + type-detect "^4.0.5" + chai@^4.2.0: version "4.3.6" resolved "https://registry.yarnpkg.com/chai/-/chai-4.3.6.tgz#ffe4ba2d9fa9d6680cc0b370adae709ec9011e9c" @@ -11606,7 +11635,7 @@ vscode-debugprotocol@^1.32.0: resolved "https://registry.yarnpkg.com/vscode-debugprotocol/-/vscode-debugprotocol-1.51.0.tgz#c03168dac778b6c24ce17b3511cb61e89c11b2df" integrity sha512-dzKWTMMyebIMPF1VYMuuQj7gGFq7guR8AFya0mKacu+ayptJfaRuM0mdHCqiOth4FnRP8mPhEroFPx6Ift8wHA== -vscode-jsonrpc@^5.0.0, vscode-jsonrpc@^5.0.1: +vscode-jsonrpc@^5.0.1: version "5.0.1" resolved "https://registry.yarnpkg.com/vscode-jsonrpc/-/vscode-jsonrpc-5.0.1.tgz#9bab9c330d89f43fc8c1e8702b5c36e058a01794" integrity sha512-JvONPptw3GAQGXlVV2utDcHx0BiY34FupW/kI6mZ5x06ER5DdPG/tXWMVHjTNULF5uKPOUUD0SaXg5QaubJL0A== @@ -11659,13 +11688,6 @@ vscode-uri@^2.1.1: resolved "https://registry.yarnpkg.com/vscode-uri/-/vscode-uri-2.1.2.tgz#c8d40de93eb57af31f3c715dd650e2ca2c096f1c" integrity sha512-8TEXQxlldWAuIODdukIb+TR5s+9Ds40eSJrw+1iDDA9IFORPjMELarNQE3myz5XIkWWpdprmJjm1/SxMlWOC8A== -vscode-ws-jsonrpc@^0.2.0: - version "0.2.0" - resolved "https://registry.yarnpkg.com/vscode-ws-jsonrpc/-/vscode-ws-jsonrpc-0.2.0.tgz#5e9c26e10da54a1a235da7d59e74508bbcb8edd9" - integrity sha512-NE9HNRgPjCaPyTJvIudcpyIWPImxwRDtuTX16yks7SAiZgSXigxAiZOvSvVBGmD1G/OMfrFo6BblOtjVR9DdVA== - dependencies: - vscode-jsonrpc "^5.0.0" - w3c-hr-time@^1.0.1: version "1.0.2" resolved "https://registry.yarnpkg.com/w3c-hr-time/-/w3c-hr-time-1.0.2.tgz#0a89cdf5cc15822df9c360543676963e0cc308cd"