Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reapply changes for new message rpc #40

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,17 @@
- `FileSystem`, `FileMoveOptions`, `FileDeleteOptions`, `FileStat`, `FileSystemError`
- [filesystem] updated `FileStatNodeData.fileStat` to use the non-deprecated `FileStat` from `@theia/core/lib/common/files` [#11176](https://github.com/eclipse-theia/theia/pull/1176)

<a name="breaking_changes_1.26.0">[Breaking Changes:](#breaking_changes_1.26.0)</a>

- [core] Refactored the core messaging API. Replaced `vscode-ws-jsonrpc` with a custom RPC protocol that is better suited for handling binary data and enables message tunneling.
This impacts all main concepts of the messaging API. The API no longer exposes a `Connection` object and uses a generic `Channel` implementation instead.
- Replaces usage of `vscode-json-rpc`'s `Connection` with the new generic `Channel`. Affects `AbstractConnectionProvider`, `MessagingService`, `IPCConnectionProvider`, `ElectronMessagingService`
- `MessagingService`: No longer offers the `listen` and `forward` method. Use `wsChannel` instead.
- `RemoteFileSystemServer`: Use `UInt8Array` instead of plain number arrays for all arguments and return type that store binary data
- `DebugAdapter`: Replaced the debug-service internal `Channel` implementation with the newly introduced generic `Channel`.
[#11011](https://github.com/eclipse-theia/theia/pull/11011) - Contributed on behalf of STMicroelectronics.


## v1.25.0 - 4/28/2022

[1.25.0 Milestone](https://github.com/eclipse-theia/theia/milestone/35)
Expand Down
4 changes: 4 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
"**/@types/node": "12"
},
"devDependencies": {
"@types/chai": "4.3.0",
"@types/chai-spies": "1.0.3",
"@types/chai-string": "^1.4.0",
"@types/jsdom": "^11.0.4",
"@types/node": "12",
Expand All @@ -20,6 +22,8 @@
"@typescript-eslint/eslint-plugin": "^4.8.1",
"@typescript-eslint/eslint-plugin-tslint": "^4.8.1",
"@typescript-eslint/parser": "^4.8.1",
"chai": "4.3.4",
"chai-spies": "1.0.0",
"chai-string": "^1.4.0",
"chalk": "4.0.0",
"concurrently": "^3.5.0",
Expand Down
1 change: 0 additions & 1 deletion packages/core/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,6 @@ export class SomeClass {
- `react-virtualized` (from [`react-virtualized@^9.20.0`](https://www.npmjs.com/package/react-virtualized))
- `vscode-languageserver-protocol` (from [`vscode-languageserver-protocol@~3.15.3`](https://www.npmjs.com/package/vscode-languageserver-protocol))
- `vscode-uri` (from [`vscode-uri@^2.1.1`](https://www.npmjs.com/package/vscode-uri))
- `vscode-ws-jsonrpc` (from [`vscode-ws-jsonrpc@^0.2.0`](https://www.npmjs.com/package/vscode-ws-jsonrpc))
- `dompurify` (from [`dompurify@^2.2.9`](https://www.npmjs.com/package/dompurify))
- `express` (from [`express@^4.16.3`](https://www.npmjs.com/package/express))
- `lodash.debounce` (from [`lodash.debounce@^4.0.8`](https://www.npmjs.com/package/lodash.debounce))
Expand Down
5 changes: 1 addition & 4 deletions packages/core/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@
"react-dom": "^16.8.0",
"react-tooltip": "^4.2.21",
"react-virtualized": "^9.20.0",
"reconnecting-websocket": "^4.2.0",
"reflect-metadata": "^0.1.10",
"route-parser": "^0.0.5",
"safer-buffer": "^2.1.2",
Expand All @@ -70,7 +69,6 @@
"uuid": "^8.3.2",
"vscode-languageserver-protocol": "~3.15.3",
"vscode-uri": "^2.1.1",
"vscode-ws-jsonrpc": "^0.2.0",
"ws": "^7.1.2",
"yargs": "^15.3.1"
},
Expand Down Expand Up @@ -113,8 +111,7 @@
"react-dom",
"react-virtualized",
"vscode-languageserver-protocol",
"vscode-uri",
"vscode-ws-jsonrpc"
"vscode-uri"
],
"export =": [
"dompurify as DOMPurify",
Expand Down
1 change: 0 additions & 1 deletion packages/core/shared/vscode-ws-jsonrpc/index.d.ts

This file was deleted.

1 change: 0 additions & 1 deletion packages/core/shared/vscode-ws-jsonrpc/index.js

This file was deleted.

59 changes: 34 additions & 25 deletions packages/core/src/browser/messaging/ws-connection-provider.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,11 @@
// *****************************************************************************

import { injectable, interfaces, decorate, unmanaged } from 'inversify';
import { JsonRpcProxyFactory, JsonRpcProxy, Emitter, Event } from '../../common';
import { WebSocketChannel } from '../../common/messaging/web-socket-channel';
import { JsonRpcProxyFactory, JsonRpcProxy, Emitter, Event, Channel } from '../../common';
import { Endpoint } from '../endpoint';
import { AbstractConnectionProvider } from '../../common/messaging/abstract-connection-provider';
import { io, Socket } from 'socket.io-client';
import { IWebSocket, WebSocketChannel } from '../../common/messaging/web-socket-channel';

decorate(injectable(), JsonRpcProxyFactory);
decorate(unmanaged(), JsonRpcProxyFactory, 0);
Expand Down Expand Up @@ -53,26 +53,42 @@ export class WebSocketConnectionProvider extends AbstractConnectionProvider<WebS
constructor() {
super();
const url = this.createWebSocketUrl(WebSocketChannel.wsPath);
const socket = this.createWebSocket(url);
socket.on('connect', () => {
this.fireSocketDidOpen();
});
socket.on('disconnect', reason => {
for (const channel of [...this.channels.values()]) {
channel.close(undefined, reason);
this.socket = this.createWebSocket(url);
this.socket.on('connect', () => {
this.initializeMultiplexer();
if (this.reconnectChannelOpeners.length > 0) {
this.reconnectChannelOpeners.forEach(opener => opener());
this.reconnectChannelOpeners = [];
}
this.fireSocketDidClose();
});
socket.on('message', data => {
this.handleIncomingRawMessage(data);
this.socket.on('disconnect', () => this.fireSocketDidClose());
this.socket.on('message', () => this.onIncomingMessageActivityEmitter.fire(undefined));
this.fireSocketDidOpen();
});
socket.connect();
this.socket = socket;
this.socket.connect();
}

override openChannel(path: string, handler: (channel: WebSocketChannel) => void, options?: WebSocketOptions): void {
protected createMainChannel(): Channel {
return new WebSocketChannel(this.toIWebSocket(this.socket));
}

protected toIWebSocket(socket: Socket): IWebSocket {
return {
close: () => {
socket.removeAllListeners('disconnect');
socket.removeAllListeners('error');
socket.removeAllListeners('message');
},
isConnected: () => socket.connected,
onClose: cb => socket.on('disconnect', reason => cb(reason)),
onError: cb => socket.on('error', reason => cb(reason)),
onMessage: cb => socket.on('message', data => cb(data)),
send: message => socket.emit('message', message)
};
}

override async openChannel(path: string, handler: (channel: Channel) => void, options?: WebSocketOptions): Promise<void> {
if (this.socket.connected) {
super.openChannel(path, handler, options);
return super.openChannel(path, handler, options);
} else {
const openChannel = () => {
this.socket.off('connect', openChannel);
Expand All @@ -82,14 +98,6 @@ export class WebSocketConnectionProvider extends AbstractConnectionProvider<WebS
}
}

protected createChannel(id: number): WebSocketChannel {
return new WebSocketChannel(id, content => {
if (this.socket.connected) {
this.socket.send(content);
}
});
}

/**
* @param path The handler to reach in the backend.
*/
Expand Down Expand Up @@ -143,3 +151,4 @@ export class WebSocketConnectionProvider extends AbstractConnectionProvider<WebS
this.onSocketDidCloseEmitter.fire(undefined);
}
}

2 changes: 1 addition & 1 deletion packages/core/src/browser/progress-status-bar-item.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
// *****************************************************************************

import { injectable, inject } from 'inversify';
import { CancellationToken } from 'vscode-ws-jsonrpc';
import { CancellationToken } from '../../shared/vscode-languageserver-protocol';
import { ProgressClient, ProgressMessage, ProgressUpdate } from '../common';
import { StatusBar, StatusBarAlignment } from './status-bar';
import { Deferred } from '../common/promise-util';
Expand Down
8 changes: 8 additions & 0 deletions packages/core/src/common/cancellation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,14 @@ export namespace CancellationToken {
isCancellationRequested: true,
onCancellationRequested: shortcutEvent
});

// eslint-disable-next-line @typescript-eslint/no-explicit-any
export function is(value: any): value is CancellationToken {
const candidate = value as CancellationToken;
return candidate && (candidate === CancellationToken.None
|| candidate === CancellationToken.Cancelled
|| (typeof candidate.isCancellationRequested === 'boolean' && !!candidate.onCancellationRequested));
}
}

export class CancellationError extends Error {
Expand Down
1 change: 1 addition & 0 deletions packages/core/src/common/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ export * from './contribution-provider';
export * from './path';
export * from './logger';
export * from './messaging';
export * from './message-rpc';
export * from './message-service';
export * from './message-service-protocol';
export * from './progress-service';
Expand Down
88 changes: 88 additions & 0 deletions packages/core/src/common/message-rpc/channel.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
// *****************************************************************************
// Copyright (C) 2021 Red Hat, Inc. and others.
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License v. 2.0 which is available at
// http://www.eclipse.org/legal/epl-2.0.
//
// This Source Code may also be made available under the following Secondary
// Licenses when the conditions for such availability set forth in the Eclipse
// Public License v. 2.0 are satisfied: GNU General Public License, version 2
// with the GNU Classpath Exception which is available at
// https://www.gnu.org/software/classpath/license.html.
//
// SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0
// *****************************************************************************

import { assert, expect, spy, use } from 'chai';
import * as spies from 'chai-spies';
import { Uint8ArrayReadBuffer, Uint8ArrayWriteBuffer } from './uint8-array-message-buffer';
import { ChannelMultiplexer, ForwardingChannel, MessageProvider } from './channel';

use(spies);

/**
* A pipe with two channels at each end for testing.
*/
export class ChannelPipe {
readonly left: ForwardingChannel = new ForwardingChannel('left', () => this.right.onCloseEmitter.fire({ reason: 'Left channel has been closed' }), () => {
const leftWrite = new Uint8ArrayWriteBuffer();
leftWrite.onCommit(buffer => {
this.right.onMessageEmitter.fire(() => new Uint8ArrayReadBuffer(buffer));
});
return leftWrite;
});
readonly right: ForwardingChannel = new ForwardingChannel('right', () => this.left.onCloseEmitter.fire({ reason: 'Right channel has been closed' }), () => {
const rightWrite = new Uint8ArrayWriteBuffer();
rightWrite.onCommit(buffer => {
this.left.onMessageEmitter.fire(() => new Uint8ArrayReadBuffer(buffer));
});
return rightWrite;
});
}
describe('Message Channel', () => {
describe('Channel multiplexer', () => {
it('should forward messages to intended target channel', async () => {
const pipe = new ChannelPipe();

const leftMultiplexer = new ChannelMultiplexer(pipe.left);
const rightMultiplexer = new ChannelMultiplexer(pipe.right);
const openChannelSpy = spy(() => {
});

rightMultiplexer.onDidOpenChannel(openChannelSpy);
leftMultiplexer.onDidOpenChannel(openChannelSpy);

const leftFirst = await leftMultiplexer.open('first');
const leftSecond = await leftMultiplexer.open('second');

const rightFirst = rightMultiplexer.getOpenChannel('first');
const rightSecond = rightMultiplexer.getOpenChannel('second');

assert.isNotNull(rightFirst);
assert.isNotNull(rightSecond);

const leftSecondSpy = spy((buf: MessageProvider) => {
const message = buf().readString();
expect(message).equal('message for second');
});

leftSecond.onMessage(leftSecondSpy);

const rightFirstSpy = spy((buf: MessageProvider) => {
const message = buf().readString();
expect(message).equal('message for first');
});

rightFirst!.onMessage(rightFirstSpy);

leftFirst.getWriteBuffer().writeString('message for first').commit();
rightSecond!.getWriteBuffer().writeString('message for second').commit();

expect(leftSecondSpy).to.be.called();
expect(rightFirstSpy).to.be.called();

expect(openChannelSpy).to.be.called.exactly(4);
});
});
});
Loading