Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Protoype of a faster RPC protocol #10781

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions packages/core/src/common/message-rpc/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
# message-rpc

An attempt to rewrite the theia RPC infrastructure with a couple of changes:

1. "Zero-copy" message writing and reading
2. Support for binary buffers without ever encoding them
3. Separate RPC server from RPC client
4. Use a unified "Channel" interface

A lot of this code is more or less copied from the current Theia code.
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/********************************************************************************
* Copyright (C) 2021 Red Hat, Inc. and others.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v. 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0.
*
* This Source Code may also be made available under the following Secondary
* Licenses when the conditions for such availability set forth in the Eclipse
* Public License v. 2.0 are satisfied: GNU General Public License, version 2
* with the GNU Classpath Exception which is available at
* https://www.gnu.org/software/classpath/license.html.
*
* SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0
********************************************************************************/
import { expect } from 'chai';
import { ArrayBufferReadBuffer, ArrrayBufferWriteBuffer } from './array-buffer-message-buffer';

describe('array message buffer tests', () => {
it('basic read write test', () => {
const buffer = new ArrayBuffer(1024);
const writer = new ArrrayBufferWriteBuffer(buffer);

writer.writeByte(8);
writer.writeInt(10000);
writer.writeBytes(new Uint8Array([1, 2, 3, 4]));
writer.writeString('this is a string');
writer.writeString('another string');
writer.commit();

const written = writer.getCurrentContents();

const reader = new ArrayBufferReadBuffer(written);

expect(reader.readByte()).equal(8);
expect(reader.readInt()).equal(10000);
expect(reader.readBytes()).deep.equal(new Uint8Array([1, 2, 3, 4]).buffer);
expect(reader.readString()).equal('this is a string');
expect(reader.readString()).equal('another string');
});
});
124 changes: 124 additions & 0 deletions packages/core/src/common/message-rpc/array-buffer-message-buffer.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
/********************************************************************************
* Copyright (C) 2021 Red Hat, Inc. and others.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v. 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0.
*
* This Source Code may also be made available under the following Secondary
* Licenses when the conditions for such availability set forth in the Eclipse
* Public License v. 2.0 are satisfied: GNU General Public License, version 2
* with the GNU Classpath Exception which is available at
* https://www.gnu.org/software/classpath/license.html.
*
* SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0
********************************************************************************/
import { Emitter, Event } from '../event';
import { ReadBuffer, WriteBuffer } from './message-buffer';

export class ArrrayBufferWriteBuffer implements WriteBuffer {
constructor(private buffer: ArrayBuffer = new ArrayBuffer(1024), private offset: number = 0) {
}

private get msg(): DataView {
return new DataView(this.buffer);
}

ensureCapacity(value: number): WriteBuffer {
let newLength = this.buffer.byteLength;
while (newLength < this.offset + value) {
newLength *= 2;
}
if (newLength !== this.buffer.byteLength) {
const newBuffer = new ArrayBuffer(newLength);
new Uint8Array(newBuffer).set(new Uint8Array(this.buffer));
this.buffer = newBuffer;
}
return this;
}

writeByte(value: number): WriteBuffer {
this.ensureCapacity(1);
this.msg.setUint8(this.offset++, value);
return this;
}

writeInt(value: number): WriteBuffer {
this.ensureCapacity(4);
this.msg.setUint32(this.offset, value);
this.offset += 4;
return this;
}

writeString(value: string): WriteBuffer {
const encoded = this.encodeString(value);
this.writeBytes(encoded);
return this;
}

private encodeString(value: string): Uint8Array {
return new TextEncoder().encode(value);
}

writeBytes(value: ArrayBuffer): WriteBuffer {
this.ensureCapacity(value.byteLength + 4);
this.writeInt(value.byteLength);
new Uint8Array(this.buffer).set(new Uint8Array(value), this.offset);
this.offset += value.byteLength;
return this;
}

private onCommitEmitter = new Emitter<ArrayBuffer>();
get onCommit(): Event<ArrayBuffer> {
return this.onCommitEmitter.event;
}

commit(): void {
this.onCommitEmitter.fire(this.getCurrentContents());
}

getCurrentContents(): ArrayBuffer {
return this.buffer.slice(0, this.offset);
}
}

export class ArrayBufferReadBuffer implements ReadBuffer {
private offset: number = 0;

constructor(private readonly buffer: ArrayBuffer) {
}

private get msg(): DataView {
return new DataView(this.buffer);
}

readByte(): number {
return this.msg.getUint8(this.offset++);
}

readInt(): number {
const result = this.msg.getInt32(this.offset);
this.offset += 4;
return result;
}

readString(): string {
const len = this.msg.getUint32(this.offset);
this.offset += 4;
const result = this.decodeString(this.buffer.slice(this.offset, this.offset + len));
this.offset += len;
return result;
}

private decodeString(buf: ArrayBuffer): string {
return new TextDecoder().decode(buf);
}

readBytes(): ArrayBuffer {
const length = this.msg.getUint32(this.offset);
this.offset += 4;
const result = this.buffer.slice(this.offset, this.offset + length);
this.offset += length;
return result;
}
}
67 changes: 67 additions & 0 deletions packages/core/src/common/message-rpc/channel.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/********************************************************************************
* Copyright (C) 2021 Red Hat, Inc. and others.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v. 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0.
*
* This Source Code may also be made available under the following Secondary
* Licenses when the conditions for such availability set forth in the Eclipse
* Public License v. 2.0 are satisfied: GNU General Public License, version 2
* with the GNU Classpath Exception which is available at
* https://www.gnu.org/software/classpath/license.html.
*
* SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0
********************************************************************************/
import { assert, expect, spy, use } from 'chai';
import * as spies from 'chai-spies';

import { ChannelMultiplexer, ChannelPipe } from './channel';
import { ReadBuffer } from './message-buffer';

use(spies);

describe('multiplexer test', () => {
it('multiplex message', async () => {
const pipe = new ChannelPipe();

const leftMultiplexer = new ChannelMultiplexer(pipe.left);
const rightMultiplexer = new ChannelMultiplexer(pipe.right);
const openChannelSpy = spy(() => {
});

rightMultiplexer.onDidOpenChannel(openChannelSpy);
leftMultiplexer.onDidOpenChannel(openChannelSpy);

const leftFirst = await leftMultiplexer.open('first');
const leftSecond = await leftMultiplexer.open('second');

const rightFirst = rightMultiplexer.getOpenChannel('first');
const rightSecond = rightMultiplexer.getOpenChannel('second');

assert.isNotNull(rightFirst);
assert.isNotNull(rightSecond);

const leftSecondSpy = spy((buf: ReadBuffer) => {
const message = buf.readString();
expect(message).equal('message for second');
});

leftSecond.onMessage(leftSecondSpy);

const rightFirstSpy = spy((buf: ReadBuffer) => {
const message = buf.readString();
expect(message).equal('message for first');
});

rightFirst!.onMessage(rightFirstSpy);

leftFirst.getWriteBuffer().writeString('message for first').commit();
rightSecond!.getWriteBuffer().writeString('message for second').commit();

expect(leftSecondSpy).to.be.called();
expect(rightFirstSpy).to.be.called();

expect(openChannelSpy).to.be.called.exactly(4);
})
});
Loading