Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Wip json rpc performance #10808

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
"**/@types/node": "12"
},
"devDependencies": {
"@types/chai": "4.3.0",
"@types/chai-spies": "1.0.3",
"@types/chai-string": "^1.4.0",
"@types/jsdom": "^11.0.4",
"@types/node": "12",
Expand All @@ -20,6 +22,8 @@
"@typescript-eslint/eslint-plugin": "^4.8.1",
"@typescript-eslint/eslint-plugin-tslint": "^4.8.1",
"@typescript-eslint/parser": "^4.8.1",
"chai": "4.3.4",
"chai-spies": "1.0.0",
"chai-string": "^1.4.0",
"chalk": "4.0.0",
"concurrently": "^3.5.0",
Expand Down
84 changes: 57 additions & 27 deletions packages/core/src/browser/messaging/ws-connection-provider.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,15 @@
// SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0
// *****************************************************************************

import { injectable, interfaces, decorate, unmanaged } from 'inversify';
import { JsonRpcProxyFactory, JsonRpcProxy, Emitter, Event } from '../../common';
import { decorate, injectable, interfaces, unmanaged } from 'inversify';
import { io, Socket } from 'socket.io-client';
import { Emitter, Event, JsonRpcProxy, JsonRpcProxyFactory } from '../../common';
import { ArrayBufferReadBuffer, ArrayBufferWriteBuffer } from '../../common/message-rpc/array-buffer-message-buffer';
import { Channel, ReadBufferConstructor } from '../../common/message-rpc/channel';
import { WriteBuffer } from '../../common/message-rpc/message-buffer';
import { AbstractConnectionProvider } from '../../common/messaging/abstract-connection-provider';
import { WebSocketChannel } from '../../common/messaging/web-socket-channel';
import { Endpoint } from '../endpoint';
import { AbstractConnectionProvider } from '../../common/messaging/abstract-connection-provider';
import { io, Socket } from 'socket.io-client';

decorate(injectable(), JsonRpcProxyFactory);
decorate(unmanaged(), JsonRpcProxyFactory, 0);
Expand All @@ -35,6 +38,8 @@ export interface WebSocketOptions {
export class WebSocketConnectionProvider extends AbstractConnectionProvider<WebSocketOptions> {

protected readonly onSocketDidOpenEmitter: Emitter<void> = new Emitter();
// Socket that is used by the main channel
protected socket: Socket;
get onSocketDidOpen(): Event<void> {
return this.onSocketDidOpenEmitter.event;
}
Expand All @@ -48,31 +53,23 @@ export class WebSocketConnectionProvider extends AbstractConnectionProvider<WebS
return container.get(WebSocketConnectionProvider).createProxy<T>(path, arg);
}

protected readonly socket: Socket;

constructor() {
super();
protected createMainChannel(): Channel {
const url = this.createWebSocketUrl(WebSocketChannel.wsPath);
const socket = this.createWebSocket(url);
const channel = new SocketIOChannel(socket);
socket.on('connect', () => {
this.fireSocketDidOpen();
});
socket.on('disconnect', reason => {
for (const channel of [...this.channels.values()]) {
channel.close(undefined, reason);
}
this.fireSocketDidClose();
});
socket.on('message', data => {
this.handleIncomingRawMessage(data);
});
channel.onClose(() => this.fireSocketDidClose());
socket.connect();
this.socket = socket;

return channel;
}

override openChannel(path: string, handler: (channel: WebSocketChannel) => void, options?: WebSocketOptions): void {
override async openChannel(path: string, handler: (channel: Channel) => void, options?: WebSocketOptions): Promise<void> {
if (this.socket.connected) {
super.openChannel(path, handler, options);
return super.openChannel(path, handler, options);
} else {
const openChannel = () => {
this.socket.off('connect', openChannel);
Expand All @@ -82,14 +79,6 @@ export class WebSocketConnectionProvider extends AbstractConnectionProvider<WebS
}
}

protected createChannel(id: number): WebSocketChannel {
return new WebSocketChannel(id, content => {
if (this.socket.connected) {
this.socket.send(content);
}
});
}

/**
* Creates a websocket URL to the current location
*/
Expand Down Expand Up @@ -128,3 +117,44 @@ export class WebSocketConnectionProvider extends AbstractConnectionProvider<WebS
this.onSocketDidCloseEmitter.fire(undefined);
}
}

export class SocketIOChannel implements Channel {
protected readonly onCloseEmitter: Emitter<void> = new Emitter();
get onClose(): Event<void> {
return this.onCloseEmitter.event;
}

protected readonly onMessageEmitter: Emitter<ReadBufferConstructor> = new Emitter();
get onMessage(): Event<ReadBufferConstructor> {
return this.onMessageEmitter.event;
}

protected readonly onErrorEmitter: Emitter<unknown> = new Emitter();
get onError(): Event<unknown> {
return this.onErrorEmitter.event;
}

readonly id: string;

constructor(protected readonly socket: Socket) {
socket.on('error', error => this.onErrorEmitter.fire(error));
socket.on('disconnect', reason => this.onCloseEmitter.fire());
socket.on('message', buffer => this.onMessageEmitter.fire(() => new ArrayBufferReadBuffer(buffer)));
this.id = socket.id;
}

getWriteBuffer(): WriteBuffer {
const result = new ArrayBufferWriteBuffer();
if (this.socket.connected) {
result.onCommit(buffer => {
this.socket.emit('message', buffer);
});
}
return result;
}

close(): void {
this.socket.close();
}

}
10 changes: 10 additions & 0 deletions packages/core/src/common/message-rpc/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
# message-rpc

An attempt to rewrite the theia RPC infrastructure with a couple of changes:

1. "Zero-copy" message writing and reading
2. Support for binary buffers without ever encoding them
3. Separate RPC server from RPC client
4. Use a unified "Channel" interface

A lot of this code is more or less copied from the current Theia code.
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
// *****************************************************************************
// Copyright (C) 2021 Red Hat, Inc. and others.
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License v. 2.0 which is available at
// http://www.eclipse.org/legal/epl-2.0.
//
// This Source Code may also be made available under the following Secondary
// Licenses when the conditions for such availability set forth in the Eclipse
// Public License v. 2.0 are satisfied: GNU General Public License, version 2
// with the GNU Classpath Exception which is available at
// https://www.gnu.org/software/classpath/license.html.
//
// SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0
// *****************************************************************************
import { expect } from 'chai';
import { ArrayBufferReadBuffer, ArrayBufferWriteBuffer } from './array-buffer-message-buffer';

describe('array message buffer tests', () => {
it('basic read write test', () => {
const buffer = new ArrayBuffer(1024);
const writer = new ArrayBufferWriteBuffer(buffer);

writer.writeByte(8);
writer.writeInt(10000);
writer.writeBytes(new Uint8Array([1, 2, 3, 4]));
writer.writeString('this is a string');
writer.writeString('another string');
writer.commit();

const written = writer.getCurrentContents();

const reader = new ArrayBufferReadBuffer(written);

expect(reader.readByte()).equal(8);
expect(reader.readInt()).equal(10000);
expect(reader.readBytes()).deep.equal(new Uint8Array([1, 2, 3, 4]).buffer);
expect(reader.readString()).equal('this is a string');
expect(reader.readString()).equal('another string');
});
});
150 changes: 150 additions & 0 deletions packages/core/src/common/message-rpc/array-buffer-message-buffer.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
// *****************************************************************************
// Copyright (C) 2021 Red Hat, Inc. and others.
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License v. 2.0 which is available at
// http://www.eclipse.org/legal/epl-2.0.
//
// This Source Code may also be made available under the following Secondary
// Licenses when the conditions for such availability set forth in the Eclipse
// Public License v. 2.0 are satisfied: GNU General Public License, version 2
// with the GNU Classpath Exception which is available at
// https://www.gnu.org/software/classpath/license.html.
//
// SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0
// *****************************************************************************
import { Emitter, Event } from '../event';
import { ReadBuffer, WriteBuffer } from './message-buffer';

/**
* Converts the given node {@link Buffer} to an {@link ArrayBuffer}. The node buffer implementation is backed by an `Uint8Array`
* so the conversion can be efficiently achieved by slicing the section that is represented by the `Buffer` from the underlying
* array buffer.
* @param buffer The buffer that should be converted.
* @returns an `ArrayBuffer`representation of the given buffer.
*/
export function toArrayBuffer(buffer: Buffer | ArrayBuffer): ArrayBuffer {
if (buffer instanceof ArrayBuffer) {
return buffer;
}
if (buffer.byteOffset === 0 && buffer.byteLength === buffer.buffer.byteLength) {
return buffer.buffer;
}
return buffer.buffer.slice(buffer.byteOffset, buffer.byteOffset + buffer.byteLength);
}

export class ArrayBufferWriteBuffer implements WriteBuffer {
constructor(private buffer: ArrayBuffer = new ArrayBuffer(1024), private offset: number = 0) {
}

private get msg(): DataView {
return new DataView(this.buffer);
}

ensureCapacity(value: number): WriteBuffer {
let newLength = this.buffer.byteLength;
while (newLength < this.offset + value) {
newLength *= 2;
}
if (newLength !== this.buffer.byteLength) {
const newBuffer = new ArrayBuffer(newLength);
new Uint8Array(newBuffer).set(new Uint8Array(this.buffer));
this.buffer = newBuffer;
}
return this;
}

writeByte(value: number): WriteBuffer {
this.ensureCapacity(1);
this.msg.setUint8(this.offset++, value);
return this;
}

writeInt(value: number): WriteBuffer {
this.ensureCapacity(4);
this.msg.setUint32(this.offset, value);
this.offset += 4;
return this;
}

writeString(value: string): WriteBuffer {
const encoded = this.encodeString(value);
this.writeBytes(encoded);
return this;
}

private encodeString(value: string): Uint8Array {
return new TextEncoder().encode(value);
}

writeBytes(value: ArrayBuffer): WriteBuffer {
this.ensureCapacity(value.byteLength + 4);
this.writeInt(value.byteLength);
new Uint8Array(this.buffer).set(new Uint8Array(value), this.offset);
this.offset += value.byteLength;
return this;
}

private onCommitEmitter = new Emitter<ArrayBuffer>();
get onCommit(): Event<ArrayBuffer> {
return this.onCommitEmitter.event;
}

commit(): void {
this.onCommitEmitter.fire(this.getCurrentContents());
}

getCurrentContents(): ArrayBuffer {
return this.buffer.slice(0, this.offset);
}
}

export class ArrayBufferReadBuffer implements ReadBuffer {
private offset: number = 0;

constructor(private readonly buffer: ArrayBuffer, readPosition = 0) {
this.offset = readPosition;
}

private get msg(): DataView {
return new DataView(this.buffer);
}

readByte(): number {
return this.msg.getUint8(this.offset++);
}

readInt(): number {
try {
const result = this.msg.getInt32(this.offset);
this.offset += 4;
return result;
} catch (err) {
throw err;
}
}

readString(): string {
const len = this.msg.getUint32(this.offset);
this.offset += 4;
const result = this.decodeString(this.buffer.slice(this.offset, this.offset + len));
this.offset += len;
return result;
}

private decodeString(buf: ArrayBuffer): string {
return new TextDecoder().decode(buf);
}

readBytes(): ArrayBuffer {
const length = this.msg.getUint32(this.offset);
this.offset += 4;
const result = this.buffer.slice(this.offset, this.offset + length);
this.offset += length;
return result;
}

copy(): ReadBuffer {
return new ArrayBufferReadBuffer(this.buffer, this.offset);
}
}
Loading