Skip to content

Commit

Permalink
[WIP] Improve message encoding and cleanup integration
Browse files Browse the repository at this point in the history
- Refactor the message encoders of the new message-rpc protocol to improve overall performance. 
- Align main websocket creation: Implement `ẀebSocketMainChannel` class which is responsible for establishing the main websocket connection between frontend and backend. Use a `IWebSocket` wrapper so that the class can be reused in the frontend and backend runtimes and is independent of the actual library used for creating the websocket.
- Improve rpc protocol error handling. In particular ensure that remote call errors are properly decoded to `ResponseErrors`.
- Ensure that the `RemoteFileSystemProvider` API uses  Uint8Arrays over plain number arrays. This enables direct serialization as buffers and reduces the overhead  of unnecessarily converting from and to Uint8 arrays.
- Refactor terminal widget and terminal backend contribution so that the widgets communicates with the underlying terminal process using the new rpc protocol.
- Rework the IPC bootstrap protocol so that it uses a binary pipe for message transmission instead of the `ipc` pipe which only supports string encoding.

Note: This is a WIP PR that is not ready to be reviewed yet as some concepts are not cleaned up yet or fully implemented (e.g. electron support & debug adapter protocol support)  It's intention is to present the current state of #10684 to interested parties.

Contributed on behalf of STMicroelectronics.
  • Loading branch information
tortmayr committed Apr 1, 2022
1 parent cedee25 commit ff7f4f8
Show file tree
Hide file tree
Showing 31 changed files with 689 additions and 873 deletions.
57 changes: 13 additions & 44 deletions packages/core/src/browser/messaging/ws-connection-provider.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,9 @@
import { decorate, injectable, interfaces, unmanaged } from 'inversify';
import { io, Socket } from 'socket.io-client';
import { Emitter, Event, JsonRpcProxy, JsonRpcProxyFactory } from '../../common';
import { ArrayBufferReadBuffer, ArrayBufferWriteBuffer } from '../../common/message-rpc/array-buffer-message-buffer';
import { Channel, ReadBufferConstructor } from '../../common/message-rpc/channel';
import { WriteBuffer } from '../../common/message-rpc/message-buffer';
import { Channel } from '../../common/message-rpc/channel';
import { AbstractConnectionProvider } from '../../common/messaging/abstract-connection-provider';
import { WebSocketChannel } from '../../common/messaging/web-socket-channel';
import { IWebSocket, WebSocketChannel, WebSocketMainChannel } from '../../common/messaging/web-socket-channel';
import { Endpoint } from '../endpoint';

decorate(injectable(), JsonRpcProxyFactory);
Expand Down Expand Up @@ -56,7 +54,7 @@ export class WebSocketConnectionProvider extends AbstractConnectionProvider<WebS
protected createMainChannel(): Channel {
const url = this.createWebSocketUrl(WebSocketChannel.wsPath);
const socket = this.createWebSocket(url);
const channel = new SocketIOChannel(socket);
const channel = new WebSocketMainChannel(toIWebSocket(socket));
socket.on('connect', () => {
this.fireSocketDidOpen();
});
Expand Down Expand Up @@ -133,43 +131,14 @@ export class WebSocketConnectionProvider extends AbstractConnectionProvider<WebS
}
}

export class SocketIOChannel implements Channel {
protected readonly onCloseEmitter: Emitter<void> = new Emitter();
get onClose(): Event<void> {
return this.onCloseEmitter.event;
}

protected readonly onMessageEmitter: Emitter<ReadBufferConstructor> = new Emitter();
get onMessage(): Event<ReadBufferConstructor> {
return this.onMessageEmitter.event;
}

protected readonly onErrorEmitter: Emitter<unknown> = new Emitter();
get onError(): Event<unknown> {
return this.onErrorEmitter.event;
}

readonly id: string;

constructor(protected readonly socket: Socket) {
socket.on('error', error => this.onErrorEmitter.fire(error));
socket.on('disconnect', reason => this.onCloseEmitter.fire());
socket.on('message', buffer => this.onMessageEmitter.fire(() => new ArrayBufferReadBuffer(buffer)));
this.id = socket.id;
}

getWriteBuffer(): WriteBuffer {
const result = new ArrayBufferWriteBuffer();
if (this.socket.connected) {
result.onCommit(buffer => {
this.socket.emit('message', buffer);
});
}
return result;
}

close(): void {
this.socket.close();
}

function toIWebSocket(socket: Socket): IWebSocket {
return {
close: () => socket.close(),
isConnected: () => socket.connected,
onClose: cb => socket.on('disconnect', () => cb()),
onError: cb => socket.on('error', cb),
onMessage: cb => socket.on('message', data => cb(data)),
send: message => socket.emit('message', message)
};
}

1 change: 1 addition & 0 deletions packages/core/src/common/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ export * from './contribution-provider';
export * from './path';
export * from './logger';
export * from './messaging';
export * from './message-rpc';
export * from './message-service';
export * from './message-service-protocol';
export * from './progress-service';
Expand Down
10 changes: 0 additions & 10 deletions packages/core/src/common/message-rpc/README.md

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ describe('array message buffer tests', () => {
const buffer = new ArrayBuffer(1024);
const writer = new ArrayBufferWriteBuffer(buffer);

writer.writeByte(8);
writer.writeInt(10000);
writer.writeUint8(8);
writer.writeUint32(10000);
writer.writeBytes(new Uint8Array([1, 2, 3, 4]));
writer.writeString('this is a string');
writer.writeString('another string');
Expand All @@ -32,8 +32,8 @@ describe('array message buffer tests', () => {

const reader = new ArrayBufferReadBuffer(written);

expect(reader.readByte()).equal(8);
expect(reader.readInt()).equal(10000);
expect(reader.readUint8()).equal(8);
expect(reader.readUint32()).equal(10000);
expect(reader.readBytes()).deep.equal(new Uint8Array([1, 2, 3, 4]).buffer);
expect(reader.readString()).equal('this is a string');
expect(reader.readString()).equal('another string');
Expand Down
107 changes: 72 additions & 35 deletions packages/core/src/common/message-rpc/array-buffer-message-buffer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,24 +14,7 @@
// SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0
// *****************************************************************************
import { Emitter, Event } from '../event';
import { ReadBuffer, WriteBuffer } from './message-buffer';

/**
* Converts the given node {@link Buffer} to an {@link ArrayBuffer}. The node buffer implementation is backed by an `Uint8Array`
* so the conversion can be efficiently achieved by slicing the section that is represented by the `Buffer` from the underlying
* array buffer.
* @param buffer The buffer that should be converted.
* @returns an `ArrayBuffer`representation of the given buffer.
*/
export function toArrayBuffer(buffer: Buffer | ArrayBuffer): ArrayBuffer {
if (buffer instanceof ArrayBuffer) {
return buffer;
}
if (buffer.byteOffset === 0 && buffer.byteLength === buffer.buffer.byteLength) {
return buffer.buffer;
}
return buffer.buffer.slice(buffer.byteOffset, buffer.byteOffset + buffer.byteLength);
}
import { getUintType, UintType, ReadBuffer, WriteBuffer } from './message-buffer';

export class ArrayBufferWriteBuffer implements WriteBuffer {
constructor(private buffer: ArrayBuffer = new ArrayBuffer(1024), private offset: number = 0) {
Expand All @@ -54,19 +37,42 @@ export class ArrayBufferWriteBuffer implements WriteBuffer {
return this;
}

writeByte(value: number): WriteBuffer {
writeUint8(value: number): WriteBuffer {
this.ensureCapacity(1);
this.msg.setUint8(this.offset++, value);
return this;
}

writeInt(value: number): WriteBuffer {
writeUint16(value: number): WriteBuffer {
this.ensureCapacity(2);
this.msg.setUint16(this.offset, value);
this.offset += 2;
return this;
}

writeUint32(value: number): WriteBuffer {
this.ensureCapacity(4);
this.msg.setUint32(this.offset, value);
this.offset += 4;
return this;
}

writeInteger(value: number): WriteBuffer {
const type = getUintType(value);
this.writeUint8(type);
switch (type) {
case UintType.Uint8:
this.writeUint8(value);
break;
case UintType.Uint16:
this.writeUint16(value);
break;
default:
this.writeUint32(value);
}
return this;
}

writeString(value: string): WriteBuffer {
const encoded = this.encodeString(value);
this.writeBytes(encoded);
Expand All @@ -78,8 +84,8 @@ export class ArrayBufferWriteBuffer implements WriteBuffer {
}

writeBytes(value: ArrayBuffer): WriteBuffer {
this.ensureCapacity(value.byteLength + 4);
this.writeInt(value.byteLength);
this.writeInteger(value.byteLength);
this.ensureCapacity(value.byteLength);
new Uint8Array(this.buffer).set(new Uint8Array(value), this.offset);
this.offset += value.byteLength;
return this;
Expand All @@ -96,6 +102,7 @@ export class ArrayBufferWriteBuffer implements WriteBuffer {

getCurrentContents(): ArrayBuffer {
return this.buffer.slice(0, this.offset);

}
}

Expand All @@ -110,23 +117,36 @@ export class ArrayBufferReadBuffer implements ReadBuffer {
return new DataView(this.buffer);
}

readByte(): number {
readUint8(): number {
return this.msg.getUint8(this.offset++);
}

readInt(): number {
try {
const result = this.msg.getInt32(this.offset);
this.offset += 4;
return result;
} catch (err) {
throw err;
readUint16(): number {
const result = this.msg.getUint16(this.offset);
this.offset += 2;
return result;
}

readUint32(): number {
const result = this.msg.getInt32(this.offset);
this.offset += 4;
return result;
}

readInteger(): number {
const type = this.readUint8();
switch (type) {
case UintType.Uint8:
return this.readUint8();
case UintType.Uint16:
return this.readUint16();
default:
return this.readUint32();
}
}

readString(): string {
const len = this.msg.getUint32(this.offset);
this.offset += 4;
const len = this.readInteger();
const result = this.decodeString(this.buffer.slice(this.offset, this.offset + len));
this.offset += len;
return result;
Expand All @@ -137,14 +157,31 @@ export class ArrayBufferReadBuffer implements ReadBuffer {
}

readBytes(): ArrayBuffer {
const length = this.msg.getUint32(this.offset);
this.offset += 4;
const length = this.readInteger();
const result = this.buffer.slice(this.offset, this.offset + length);
this.offset += length;
return result;
}

copy(): ReadBuffer {
sliceAtCurrentPosition(): ReadBuffer {
return new ArrayBufferReadBuffer(this.buffer, this.offset);
}
}

/**
* Retrieve an {@link ArrayBuffer} view for the given buffer. Some {@link Uint8Array} buffer implementations e.g node's {@link Buffer}
* are using shared memory array buffers under the hood. Therefore we need to check the buffers `byteOffset` and `length` and slice
* the underlying array buffer if needed.
* @param buffer The Uint8Array or ArrayBuffer that should be converted.
* @returns a trimmed `ArrayBuffer` representation for the given buffer.
*/
export function toArrayBuffer(buffer: Uint8Array | ArrayBuffer): ArrayBuffer {
if (buffer instanceof ArrayBuffer) {
return buffer;
}
if (buffer.byteOffset === 0 && buffer.byteLength === buffer.buffer.byteLength) {
return buffer.buffer;
}

return buffer.buffer.slice(buffer.byteOffset, buffer.byteOffset + buffer.byteLength);
}
6 changes: 3 additions & 3 deletions packages/core/src/common/message-rpc/channel.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
// *****************************************************************************
import { assert, expect, spy, use } from 'chai';
import * as spies from 'chai-spies';
import { ChannelMultiplexer, ChannelPipe, ReadBufferConstructor } from './channel';
import { ChannelMultiplexer, ChannelPipe, ReadBufferFactory } from './channel';

use(spies);

Expand All @@ -40,14 +40,14 @@ describe('multiplexer test', () => {
assert.isNotNull(rightFirst);
assert.isNotNull(rightSecond);

const leftSecondSpy = spy((buf: ReadBufferConstructor) => {
const leftSecondSpy = spy((buf: ReadBufferFactory) => {
const message = buf().readString();
expect(message).equal('message for second');
});

leftSecond.onMessage(leftSecondSpy);

const rightFirstSpy = spy((buf: ReadBufferConstructor) => {
const rightFirstSpy = spy((buf: ReadBufferFactory) => {
const message = buf().readString();
expect(message).equal('message for first');
});
Expand Down
Loading

0 comments on commit ff7f4f8

Please sign in to comment.