Skip to content

Commit

Permalink
Fix critical bugs
Browse files Browse the repository at this point in the history
- eclipse-theiaGH-11196 Remove dev/debug logoutput from IPCChannel
- eclipse-theiaGH-11199 Refactor connection provider and channel multiplexer to properly handle a reconnecting backend
   -  Remove console log in websocket-channel if the socket is not connected. Otherwise we end up in an endless loop.
   -  Ensure that channels & RpcProtocol instances proplery dispose all resources if the backend disconnects.
   -  Ensure that all previously open channels and RpcProtocol instances are properly restored once the backend reconnects.

- eclipse-theia#11203 Ensure that debugging is handled gracefully (implicitly fixed with the fix for eclipse-theia#11199)

- Remove dependency to `reconnecting-websocket` which is no longer needed since the swap to socket.io

Fixes eclipse-theia#11196
Fixes eclipse-theia#11199

Contributed on behalf of STMicroelectronics
  • Loading branch information
tortmayr committed May 31, 2022
1 parent b62e17e commit ea5e5a4
Show file tree
Hide file tree
Showing 13 changed files with 100 additions and 71 deletions.
1 change: 0 additions & 1 deletion packages/core/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@
"react-dom": "^16.8.0",
"react-tooltip": "^4.2.21",
"react-virtualized": "^9.20.0",
"reconnecting-websocket": "^4.2.0",
"reflect-metadata": "^0.1.10",
"route-parser": "^0.0.5",
"safer-buffer": "^2.1.2",
Expand Down
27 changes: 17 additions & 10 deletions packages/core/src/browser/messaging/ws-connection-provider.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,6 @@ export interface WebSocketOptions {
export class WebSocketConnectionProvider extends AbstractConnectionProvider<WebSocketOptions> {

protected readonly onSocketDidOpenEmitter: Emitter<void> = new Emitter();
// Socket that is used by the main channel
protected socket: Socket;
get onSocketDidOpen(): Event<void> {
return this.onSocketDidOpenEmitter.event;
}
Expand All @@ -50,17 +48,27 @@ export class WebSocketConnectionProvider extends AbstractConnectionProvider<WebS
return container.get(WebSocketConnectionProvider).createProxy<T>(path, arg);
}

protected createMainChannel(): Channel {
protected readonly socket: Socket;

constructor() {
super();
const url = this.createWebSocketUrl(WebSocketChannel.wsPath);
const socket = this.createWebSocket(url);
const channel = new WebSocketChannel(this.toIWebSocket(socket));
socket.on('connect', () => {
this.socket = this.createWebSocket(url);
this.socket.on('connect', () => {
this.initializeMultiplexer();
if (this.reconnectChannelOpeners.length > 0) {
this.reconnectChannelOpeners.forEach(opener => opener());
this.reconnectChannelOpeners = [];
}
this.fireSocketDidOpen();
this.socket.on('disconnect', () => this.fireSocketDidClose());
});
channel.onClose(() => this.fireSocketDidClose());
socket.connect();
this.socket = socket;
this.socket.connect();
}

protected createMainChannel(): Channel {
const channel = new WebSocketChannel(this.toIWebSocket(this.socket));
channel.onMessage(() => this.onIncomingMessageActivityEmitter.fire());
return channel;
}

Expand All @@ -70,7 +78,6 @@ export class WebSocketConnectionProvider extends AbstractConnectionProvider<WebS
socket.removeAllListeners('disconnect');
socket.removeAllListeners('error');
socket.removeAllListeners('message');
socket.close();
},
isConnected: () => socket.connected,
onClose: cb => socket.on('disconnect', reason => cb(reason)),
Expand Down
46 changes: 29 additions & 17 deletions packages/core/src/common/message-rpc/channel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
// SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0
// *****************************************************************************

import { Disposable, DisposableCollection } from '../disposable';
import { Emitter, Event } from '../event';
import { ReadBuffer, WriteBuffer } from './message-buffer';

Expand Down Expand Up @@ -72,7 +73,9 @@ export type MessageProvider = () => ReadBuffer;
*/
export class ForwardingChannel implements Channel {

protected toDispose = new DisposableCollection();
constructor(readonly id: string, protected readonly closeHandler: () => void, protected readonly writeBufferSource: () => WriteBuffer) {
this.toDispose.pushAll([this.onCloseEmitter, this.onErrorEmitter, this.onMessageEmitter]);
}

onCloseEmitter: Emitter<ChannelCloseEvent> = new Emitter();
Expand All @@ -94,14 +97,9 @@ export class ForwardingChannel implements Channel {
return this.writeBufferSource();
}

send(message: Uint8Array): void {
const writeBuffer = this.getWriteBuffer();
writeBuffer.writeBytes(message);
writeBuffer.commit();
}

close(): void {
this.closeHandler();
this.toDispose.dispose();
}
}

Expand All @@ -120,7 +118,7 @@ export enum MessageTypes {
* channel, so we rely on writers to the multiplexed channels to always commit their
* messages and always in one go.
*/
export class ChannelMultiplexer {
export class ChannelMultiplexer implements Disposable {
protected pendingOpen: Map<string, (channel: ForwardingChannel) => void> = new Map();
protected openChannels: Map<string, ForwardingChannel> = new Map();

Expand All @@ -129,10 +127,15 @@ export class ChannelMultiplexer {
return this.onOpenChannelEmitter.event;
}

protected toDispose = new DisposableCollection();

constructor(protected readonly underlyingChannel: Channel) {
this.underlyingChannel.onMessage(buffer => this.handleMessage(buffer()));
this.underlyingChannel.onClose(event => this.closeUnderlyingChannel(event));
this.underlyingChannel.onError(error => this.handleError(error));
this.toDispose.pushAll([
this.underlyingChannel.onMessage(buffer => this.handleMessage(buffer())),
this.underlyingChannel.onClose(event => this.onUnderlyingChannelClose(event)),
this.underlyingChannel.onError(error => this.handleError(error)),
this.onOpenChannelEmitter
]);
}

protected handleError(error: unknown): void {
Expand All @@ -141,14 +144,19 @@ export class ChannelMultiplexer {
});
}

closeUnderlyingChannel(event?: ChannelCloseEvent): void {

this.pendingOpen.clear();
this.openChannels.forEach(channel => {
channel.onCloseEmitter.fire(event ?? { reason: 'Multiplexer main channel has been closed from the remote side!' });
});
onUnderlyingChannelClose(event?: ChannelCloseEvent): void {
if (!this.toDispose.disposed) {
this.toDispose.push(Disposable.create(() => {
this.pendingOpen.clear();
this.openChannels.forEach(channel => {
channel.onCloseEmitter.fire(event ?? { reason: 'Multiplexer main channel has been closed from the remote side!' });
});

this.openChannels.clear();
}));
this.dispose();
}

this.openChannels.clear();
}

protected handleMessage(buffer: ReadBuffer): void {
Expand Down Expand Up @@ -244,5 +252,9 @@ export class ChannelMultiplexer {
getOpenChannel(id: string): Channel | undefined {
return this.openChannels.get(id);
}

dispose(): void {
this.toDispose.dispose();
}
}

19 changes: 12 additions & 7 deletions packages/core/src/common/message-rpc/rpc-protocol.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
/* eslint-disable @typescript-eslint/no-explicit-any */

import { CancellationToken, CancellationTokenSource } from '../cancellation';
import { DisposableCollection } from '../disposable';
import { Emitter, Event } from '../event';
import { Deferred } from '../promise-util';
import { Channel } from './channel';
Expand All @@ -40,6 +41,7 @@ export interface RpcProtocolOptions {
*/
decoder?: RpcMessageDecoder
}

/**
* Establish a bi-directional RPC protocol on top of a given channel. Bi-directional means to send
* sends requests and notifications to the remote side as well as receiving requests and notifications from the remote side.
Expand All @@ -64,12 +66,14 @@ export class RpcProtocol {
return this.onNotificationEmitter.event;
}

protected toDispose = new DisposableCollection();

constructor(public readonly channel: Channel, public readonly requestHandler: RequestHandler, options: RpcProtocolOptions = {}) {
this.encoder = options.encoder ?? new RpcMessageEncoder();
this.decoder = options.decoder ?? new RpcMessageDecoder();
const registration = channel.onMessage(readBuffer => this.handleMessage(this.decoder.parse(readBuffer())));
channel.onClose(() => registration.dispose());

this.toDispose.push(this.onNotificationEmitter);
this.toDispose.push(channel.onMessage(readBuffer => this.handleMessage(this.decoder.parse(readBuffer()))));
channel.onClose(() => this.toDispose.dispose());
}

handleMessage(message: RpcMessage): void {
Expand Down Expand Up @@ -103,7 +107,7 @@ export class RpcProtocol {
this.pendingRequests.delete(id);
replyHandler.resolve(value);
} else {
console.warn(`reply: no handler for message: ${id}`);
throw new Error(`No reply handler for reply with id: ${id}`);
}
}

Expand All @@ -114,15 +118,14 @@ export class RpcProtocol {
this.pendingRequests.delete(id);
replyHandler.reject(error);
} else {
console.warn(`error: no handler for message: ${id}`);
throw new Error(`No reply handler for error reply with id: ${id}`);
}
} catch (err) {
throw err;
}
}

sendRequest<T>(method: string, args: any[]): Promise<T> {

const id = this.nextMessageId++;
const reply = new Deferred<T>();

Expand Down Expand Up @@ -176,7 +179,6 @@ export class RpcProtocol {
}

protected async handleRequest(id: number, method: string, args: any[]): Promise<void> {

const output = this.channel.getWriteBuffer();

// Check if the last argument of the received args is the key for indicating that a cancellation token should be used
Expand Down Expand Up @@ -207,6 +209,9 @@ export class RpcProtocol {
}

protected async handleNotify(id: number, method: string, args: any[]): Promise<void> {
if (this.toDispose.disposed) {
return;
}
this.onNotificationEmitter.fire({ method, args });
}
}
22 changes: 14 additions & 8 deletions packages/core/src/common/messaging/abstract-connection-provider.ts
Original file line number Diff line number Diff line change
Expand Up @@ -71,14 +71,16 @@ export abstract class AbstractConnectionProvider<AbstractOptions extends object>
return factory.createProxy();
}

protected channelMultiPlexer: ChannelMultiplexer;
protected channelMultiPlexer?: ChannelMultiplexer;

constructor() {
this.channelMultiPlexer = this.createMultiplexer();
}
// A set of channel opening functions that are executed if the backend reconnects to restore the
// the channels that were open before the disconnect occurred.
protected reconnectChannelOpeners: Array<() => Promise<void>> = [];

protected createMultiplexer(): ChannelMultiplexer {
return new ChannelMultiplexer(this.createMainChannel());
protected initializeMultiplexer(): void {
const mainChannel = this.createMainChannel();
mainChannel.onMessage(() => this.onIncomingMessageActivityEmitter.fire());
this.channelMultiPlexer = new ChannelMultiplexer(mainChannel);
}

/**
Expand All @@ -91,18 +93,22 @@ export abstract class AbstractConnectionProvider<AbstractOptions extends object>
}

async openChannel(path: string, handler: (channel: Channel) => void, options?: AbstractOptions): Promise<void> {
if (!this.channelMultiPlexer) {
throw new Error('The channel multiplexer has not been initialized yet!');
}
const newChannel = await this.channelMultiPlexer.open(path);
newChannel.onClose(() => {
const { reconnecting } = { reconnecting: true, ...options };
if (reconnecting) {
this.openChannel(path, handler, options);
this.reconnectChannelOpeners.push(() => this.openChannel(path, handler, options));
}
});

handler(newChannel);
}

/**
* Create the main connection that is used for multiplexing all channels.
* Create the main connection that is used for multiplexing all service channels.
*/
protected abstract createMainChannel(): Channel;

Expand Down
10 changes: 7 additions & 3 deletions packages/core/src/common/messaging/proxy-factory.ts
Original file line number Diff line number Diff line change
Expand Up @@ -127,9 +127,13 @@ export class JsonRpcProxyFactory<T extends object> implements ProxyHandler<T> {
this.connectionPromiseResolve = resolve
);
this.connectionPromise.then(connection => {
connection.channel.onClose(() =>
this.onDidCloseConnectionEmitter.fire(undefined)
);
connection.channel.onClose(() => {
this.onDidCloseConnectionEmitter.fire(undefined);
// Reset the connectionPromise to be in a clean state if the backed reconnects.
this.connectionPromise = new Promise(resolve =>
this.connectionPromiseResolve = resolve
);
});
this.onDidOpenConnectionEmitter.fire(undefined);
});
}
Expand Down
11 changes: 6 additions & 5 deletions packages/core/src/common/messaging/web-socket-channel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import { Emitter, Event } from '../event';
import { WriteBuffer } from '../message-rpc';
import { Uint8ArrayReadBuffer, Uint8ArrayWriteBuffer } from '../message-rpc/uint8-array-message-buffer';
import { Channel, MessageProvider, ChannelCloseEvent } from '../message-rpc/channel';
import { DisposableCollection } from '../disposable';

/**
* A channel that manages the main websocket connection between frontend and backend. All service channels
Expand All @@ -44,8 +45,12 @@ export class WebSocketChannel implements Channel {
return this.onErrorEmitter.event;
}

protected toDispose = new DisposableCollection();

constructor(protected readonly socket: IWebSocket) {
this.toDispose.pushAll([this.onCloseEmitter, this.onMessageEmitter, this.onErrorEmitter]);
socket.onClose((reason, code) => this.onCloseEmitter.fire({ reason, code }));
socket.onClose(() => this.close());
socket.onError(error => this.onErrorEmitter.fire(error));
// eslint-disable-next-line arrow-body-style
socket.onMessage(data => this.onMessageEmitter.fire(() => {
Expand All @@ -62,19 +67,15 @@ export class WebSocketChannel implements Channel {
result.onCommit(buffer => {
if (this.socket.isConnected()) {
this.socket.send(buffer);
} else {
console.warn('Could not send message. Websocket is not connected');
}
});

return result;
}

close(): void {
this.toDispose.dispose();
this.socket.close();
this.onCloseEmitter.dispose();
this.onMessageEmitter.dispose();
this.onErrorEmitter.dispose();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,11 @@ export class ElectronIpcConnectionProvider extends AbstractConnectionProvider<El
return container.get(ElectronIpcConnectionProvider).createProxy<T>(path, arg);
}

constructor() {
super();
this.initializeMultiplexer();
}

protected createMainChannel(): Channel {
const onMessageEmitter = new Emitter<MessageProvider>();
ipcRenderer.on(THEIA_ELECTRON_IPC_CHANNEL_NAME, (_event: ElectronEvent, data: Uint8Array) => {
Expand All @@ -46,7 +51,6 @@ export class ElectronIpcConnectionProvider extends AbstractConnectionProvider<El
getWriteBuffer: () => {
const writer = new Uint8ArrayWriteBuffer();
writer.onCommit(buffer =>
// The ipcRenderer cannot handle ArrayBuffers directly=> we have to convert to Uint8Array.
ipcRenderer.send(THEIA_ELECTRON_IPC_CHANNEL_NAME, buffer)
);
return writer;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ export class ElectronWebSocketConnectionProvider extends WebSocketConnectionProv
// Manually close the websocket connections `onStop`. Otherwise, the channels will be closed with 30 sec (`MessagingContribution#checkAliveTimeout`) delay.
// https://github.com/eclipse-theia/theia/issues/6499
// `1001` indicates that an endpoint is "going away", such as a server going down or a browser having navigated away from a page.
this.channelMultiPlexer.closeUnderlyingChannel({ reason: 'The frontend is "going away"', code: 1001 });
this.channelMultiPlexer?.onUnderlyingChannelClose({ reason: 'The frontend is "going away"', code: 1001 });
}

override async openChannel(path: string, handler: (channel: Channel) => void, options?: WebSocketOptions): Promise<void> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,8 @@ export class ElectronMessagingContribution implements ElectronMainApplicationCon
}
});

sender.once('did-navigate', () => multiPlexer.closeUnderlyingChannel({ reason: 'Window was refreshed' })); // When refreshing the browser window.
sender.once('destroyed', () => multiPlexer.closeUnderlyingChannel({ reason: 'Window was closed' })); // When closing the browser window.
sender.once('did-navigate', () => multiPlexer.onUnderlyingChannelClose({ reason: 'Window was refreshed' })); // When refreshing the browser window.
sender.once('destroyed', () => multiPlexer.onUnderlyingChannelClose({ reason: 'Window was closed' })); // When closing the browser window.
const data = { channel: mainChannel, multiPlexer };
this.windowChannelMultiplexer.set(sender.id, data);
return data;
Expand Down
1 change: 0 additions & 1 deletion packages/core/src/node/messaging/ipc-channel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@ export class IPCChannel implements Channel {
this.setupProcess();
}
this.messagePipe.onMessage(message => {
console.log(`IPChannel: fire on message with ${message.length}`);
this.onMessageEmitter.fire(() => new Uint8ArrayReadBuffer(message));
});
this.toDispose.pushAll([this.onCloseEmitter, this.onMessageEmitter, this.onErrorEmitter]);
Expand Down
1 change: 0 additions & 1 deletion packages/terminal/src/browser/terminal-widget-impl.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import { terminalsPath } from '../common/terminal-protocol';
import { IBaseTerminalServer, TerminalProcessInfo } from '../common/base-terminal-protocol';
import { TerminalWatcher } from '../common/terminal-watcher';
import { TerminalWidgetOptions, TerminalWidget, TerminalDimensions, TerminalExitStatus } from './base/terminal-widget';
import { MessageConnection } from '@theia/core/shared/vscode-ws-jsonrpc';
import { Deferred } from '@theia/core/lib/common/promise-util';
import { TerminalPreferences, TerminalRendererType, isTerminalRendererType, DEFAULT_TERMINAL_RENDERER_TYPE, CursorStyle } from './terminal-preferences';
import { TerminalContribution } from './terminal-contribution';
Expand Down
Loading

0 comments on commit ea5e5a4

Please sign in to comment.