Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Integrate new message-rpc prototype into core messaging API (extensions) #10809

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
"**/@types/node": "12"
},
"devDependencies": {
"@types/chai": "4.3.0",
"@types/chai-spies": "1.0.3",
"@types/chai-string": "^1.4.0",
"@types/jsdom": "^11.0.4",
"@types/node": "12",
Expand All @@ -20,6 +22,8 @@
"@typescript-eslint/eslint-plugin": "^4.8.1",
"@typescript-eslint/eslint-plugin-tslint": "^4.8.1",
"@typescript-eslint/parser": "^4.8.1",
"chai": "4.3.4",
"chai-spies": "1.0.0",
"chai-string": "^1.4.0",
"chalk": "4.0.0",
"concurrently": "^3.5.0",
Expand Down
55 changes: 27 additions & 28 deletions packages/core/src/browser/messaging/ws-connection-provider.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,13 @@
// SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0
// *****************************************************************************

import { injectable, interfaces, decorate, unmanaged } from 'inversify';
import { JsonRpcProxyFactory, JsonRpcProxy, Emitter, Event } from '../../common';
import { WebSocketChannel } from '../../common/messaging/web-socket-channel';
import { Endpoint } from '../endpoint';
import { AbstractConnectionProvider } from '../../common/messaging/abstract-connection-provider';
import { decorate, injectable, interfaces, unmanaged } from 'inversify';
import { io, Socket } from 'socket.io-client';
import { Emitter, Event, JsonRpcProxy, JsonRpcProxyFactory } from '../../common';
import { Channel } from '../../common/message-rpc/channel';
import { AbstractConnectionProvider } from '../../common/messaging/abstract-connection-provider';
import { IWebSocket, WebSocketChannel, WebSocketMainChannel } from '../../common/messaging/web-socket-channel';
import { Endpoint } from '../endpoint';

decorate(injectable(), JsonRpcProxyFactory);
decorate(unmanaged(), JsonRpcProxyFactory, 0);
Expand All @@ -35,6 +36,8 @@ export interface WebSocketOptions {
export class WebSocketConnectionProvider extends AbstractConnectionProvider<WebSocketOptions> {

protected readonly onSocketDidOpenEmitter: Emitter<void> = new Emitter();
// Socket that is used by the main channel
protected socket: Socket;
get onSocketDidOpen(): Event<void> {
return this.onSocketDidOpenEmitter.event;
}
Expand All @@ -48,31 +51,23 @@ export class WebSocketConnectionProvider extends AbstractConnectionProvider<WebS
return container.get(WebSocketConnectionProvider).createProxy<T>(path, arg);
}

protected readonly socket: Socket;

constructor() {
super();
protected createMainChannel(): Channel {
const url = this.createWebSocketUrl(WebSocketChannel.wsPath);
const socket = this.createWebSocket(url);
const channel = new WebSocketMainChannel(toIWebSocket(socket));
socket.on('connect', () => {
this.fireSocketDidOpen();
});
socket.on('disconnect', reason => {
for (const channel of [...this.channels.values()]) {
channel.close(undefined, reason);
}
this.fireSocketDidClose();
});
socket.on('message', data => {
this.handleIncomingRawMessage(data);
});
channel.onClose(() => this.fireSocketDidClose());
socket.connect();
this.socket = socket;

return channel;
}

override openChannel(path: string, handler: (channel: WebSocketChannel) => void, options?: WebSocketOptions): void {
override async openChannel(path: string, handler: (channel: Channel) => void, options?: WebSocketOptions): Promise<void> {
if (this.socket.connected) {
super.openChannel(path, handler, options);
return super.openChannel(path, handler, options);
} else {
const openChannel = () => {
this.socket.off('connect', openChannel);
Expand All @@ -82,14 +77,6 @@ export class WebSocketConnectionProvider extends AbstractConnectionProvider<WebS
}
}

protected createChannel(id: number): WebSocketChannel {
return new WebSocketChannel(id, content => {
if (this.socket.connected) {
this.socket.send(content);
}
});
}

/**
* @param path The handler to reach in the backend.
*/
Expand Down Expand Up @@ -143,3 +130,15 @@ export class WebSocketConnectionProvider extends AbstractConnectionProvider<WebS
this.onSocketDidCloseEmitter.fire(undefined);
}
}

function toIWebSocket(socket: Socket): IWebSocket {
return {
close: () => socket.close(),
isConnected: () => socket.connected,
onClose: cb => socket.on('disconnect', () => cb()),
onError: cb => socket.on('error', cb),
onMessage: cb => socket.on('message', data => cb(data)),
send: message => socket.emit('message', message)
};
}

1 change: 1 addition & 0 deletions packages/core/src/common/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ export * from './contribution-provider';
export * from './path';
export * from './logger';
export * from './messaging';
export * from './message-rpc';
export * from './message-service';
export * from './message-service-protocol';
export * from './progress-service';
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
// *****************************************************************************
// Copyright (C) 2021 Red Hat, Inc. and others.
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License v. 2.0 which is available at
// http://www.eclipse.org/legal/epl-2.0.
//
// This Source Code may also be made available under the following Secondary
// Licenses when the conditions for such availability set forth in the Eclipse
// Public License v. 2.0 are satisfied: GNU General Public License, version 2
// with the GNU Classpath Exception which is available at
// https://www.gnu.org/software/classpath/license.html.
//
// SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0
// *****************************************************************************
import { expect } from 'chai';
import { ArrayBufferReadBuffer, ArrayBufferWriteBuffer } from './array-buffer-message-buffer';

describe('array message buffer tests', () => {
it('basic read write test', () => {
const buffer = new ArrayBuffer(1024);
const writer = new ArrayBufferWriteBuffer(buffer);

writer.writeUint8(8);
writer.writeUint32(10000);
writer.writeBytes(new Uint8Array([1, 2, 3, 4]));
writer.writeString('this is a string');
writer.writeString('another string');
writer.commit();

const written = writer.getCurrentContents();

const reader = new ArrayBufferReadBuffer(written);

expect(reader.readUint8()).equal(8);
expect(reader.readUint32()).equal(10000);
expect(reader.readBytes()).deep.equal(new Uint8Array([1, 2, 3, 4]).buffer);
expect(reader.readString()).equal('this is a string');
expect(reader.readString()).equal('another string');
});
});
187 changes: 187 additions & 0 deletions packages/core/src/common/message-rpc/array-buffer-message-buffer.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,187 @@
// *****************************************************************************
// Copyright (C) 2021 Red Hat, Inc. and others.
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License v. 2.0 which is available at
// http://www.eclipse.org/legal/epl-2.0.
//
// This Source Code may also be made available under the following Secondary
// Licenses when the conditions for such availability set forth in the Eclipse
// Public License v. 2.0 are satisfied: GNU General Public License, version 2
// with the GNU Classpath Exception which is available at
// https://www.gnu.org/software/classpath/license.html.
//
// SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0
// *****************************************************************************
import { Emitter, Event } from '../event';
import { getUintType, UintType, ReadBuffer, WriteBuffer } from './message-buffer';

export class ArrayBufferWriteBuffer implements WriteBuffer {
constructor(private buffer: ArrayBuffer = new ArrayBuffer(1024), private offset: number = 0) {
}

private get msg(): DataView {
return new DataView(this.buffer);
}

ensureCapacity(value: number): WriteBuffer {
let newLength = this.buffer.byteLength;
while (newLength < this.offset + value) {
newLength *= 2;
}
if (newLength !== this.buffer.byteLength) {
const newBuffer = new ArrayBuffer(newLength);
new Uint8Array(newBuffer).set(new Uint8Array(this.buffer));
this.buffer = newBuffer;
}
return this;
}

writeUint8(value: number): WriteBuffer {
this.ensureCapacity(1);
this.msg.setUint8(this.offset++, value);
return this;
}

writeUint16(value: number): WriteBuffer {
this.ensureCapacity(2);
this.msg.setUint16(this.offset, value);
this.offset += 2;
return this;
}

writeUint32(value: number): WriteBuffer {
this.ensureCapacity(4);
this.msg.setUint32(this.offset, value);
this.offset += 4;
return this;
}

writeInteger(value: number): WriteBuffer {
const type = getUintType(value);
this.writeUint8(type);
switch (type) {
case UintType.Uint8:
this.writeUint8(value);
break;
case UintType.Uint16:
this.writeUint16(value);
break;
default:
this.writeUint32(value);
}
return this;
}

writeString(value: string): WriteBuffer {
const encoded = this.encodeString(value);
this.writeBytes(encoded);
return this;
}

private encodeString(value: string): Uint8Array {
return new TextEncoder().encode(value);
}

writeBytes(value: ArrayBuffer): WriteBuffer {
this.writeInteger(value.byteLength);
this.ensureCapacity(value.byteLength);
new Uint8Array(this.buffer).set(new Uint8Array(value), this.offset);
this.offset += value.byteLength;
return this;
}

private onCommitEmitter = new Emitter<ArrayBuffer>();
get onCommit(): Event<ArrayBuffer> {
return this.onCommitEmitter.event;
}

commit(): void {
this.onCommitEmitter.fire(this.getCurrentContents());
}

getCurrentContents(): ArrayBuffer {
return this.buffer.slice(0, this.offset);

}
}

export class ArrayBufferReadBuffer implements ReadBuffer {
private offset: number = 0;

constructor(private readonly buffer: ArrayBuffer, readPosition = 0) {
this.offset = readPosition;
}

private get msg(): DataView {
return new DataView(this.buffer);
}

readUint8(): number {
return this.msg.getUint8(this.offset++);
}

readUint16(): number {
const result = this.msg.getUint16(this.offset);
this.offset += 2;
return result;
}

readUint32(): number {
const result = this.msg.getInt32(this.offset);
this.offset += 4;
return result;
}

readInteger(): number {
const type = this.readUint8();
switch (type) {
case UintType.Uint8:
return this.readUint8();
case UintType.Uint16:
return this.readUint16();
default:
return this.readUint32();
}
}

readString(): string {
const len = this.readInteger();
const result = this.decodeString(this.buffer.slice(this.offset, this.offset + len));
this.offset += len;
return result;
}

private decodeString(buf: ArrayBuffer): string {
return new TextDecoder().decode(buf);
}

readBytes(): ArrayBuffer {
const length = this.readInteger();
const result = this.buffer.slice(this.offset, this.offset + length);
this.offset += length;
return result;
}

sliceAtCurrentPosition(): ReadBuffer {
return new ArrayBufferReadBuffer(this.buffer, this.offset);
}
}

/**
* Retrieve an {@link ArrayBuffer} view for the given buffer. Some {@link Uint8Array} buffer implementations e.g node's {@link Buffer}
* are using shared memory array buffers under the hood. Therefore we need to check the buffers `byteOffset` and `length` and slice
* the underlying array buffer if needed.
* @param buffer The Uint8Array or ArrayBuffer that should be converted.
* @returns a trimmed `ArrayBuffer` representation for the given buffer.
*/
export function toArrayBuffer(buffer: Uint8Array | ArrayBuffer): ArrayBuffer {
if (buffer instanceof ArrayBuffer) {
return buffer;
}
if (buffer.byteOffset === 0 && buffer.byteLength === buffer.buffer.byteLength) {
return buffer.buffer;
}

return buffer.buffer.slice(buffer.byteOffset, buffer.byteOffset + buffer.byteLength);
}
Loading