diff --git a/packages/core/src/common/message-rpc/README.md b/packages/core/src/common/message-rpc/README.md new file mode 100644 index 0000000000000..d94e3170c0906 --- /dev/null +++ b/packages/core/src/common/message-rpc/README.md @@ -0,0 +1,10 @@ +# message-rpc + +An attempt to rewrite the theia RPC infrastructure with a couple of changes: + +1. "Zero-copy" message writing and reading +2. Support for binary buffers without ever encoding them +3. Separate RPC server from RPC client +4. Use a unified "Channel" interface + +A lot of this code is more or less copied from the current Theia code. diff --git a/packages/core/src/common/message-rpc/array-buffer-message-buffer.spec.ts b/packages/core/src/common/message-rpc/array-buffer-message-buffer.spec.ts new file mode 100644 index 0000000000000..9c84a7ba7558a --- /dev/null +++ b/packages/core/src/common/message-rpc/array-buffer-message-buffer.spec.ts @@ -0,0 +1,41 @@ +/******************************************************************************** + * Copyright (C) 2021 Red Hat, Inc. and others. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v. 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0. + * + * This Source Code may also be made available under the following Secondary + * Licenses when the conditions for such availability set forth in the Eclipse + * Public License v. 2.0 are satisfied: GNU General Public License, version 2 + * with the GNU Classpath Exception which is available at + * https://www.gnu.org/software/classpath/license.html. + * + * SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0 + ********************************************************************************/ +import { expect } from 'chai'; +import { ArrayBufferReadBuffer, ArrrayBufferWriteBuffer } from './array-buffer-message-buffer'; + +describe('array message buffer tests', () => { + it('basic read write test', () => { + const buffer = new ArrayBuffer(1024); + const writer = new ArrrayBufferWriteBuffer(buffer); + + writer.writeByte(8); + writer.writeInt(10000); + writer.writeBytes(new Uint8Array([1, 2, 3, 4])); + writer.writeString('this is a string'); + writer.writeString('another string'); + writer.commit(); + + const written = writer.getCurrentContents(); + + const reader = new ArrayBufferReadBuffer(written); + + expect(reader.readByte()).equal(8); + expect(reader.readInt()).equal(10000); + expect(reader.readBytes()).deep.equal(new Uint8Array([1, 2, 3, 4]).buffer); + expect(reader.readString()).equal('this is a string'); + expect(reader.readString()).equal('another string'); + }); +}); diff --git a/packages/core/src/common/message-rpc/array-buffer-message-buffer.ts b/packages/core/src/common/message-rpc/array-buffer-message-buffer.ts new file mode 100644 index 0000000000000..cf5d8832705f5 --- /dev/null +++ b/packages/core/src/common/message-rpc/array-buffer-message-buffer.ts @@ -0,0 +1,124 @@ +/******************************************************************************** + * Copyright (C) 2021 Red Hat, Inc. and others. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v. 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0. + * + * This Source Code may also be made available under the following Secondary + * Licenses when the conditions for such availability set forth in the Eclipse + * Public License v. 2.0 are satisfied: GNU General Public License, version 2 + * with the GNU Classpath Exception which is available at + * https://www.gnu.org/software/classpath/license.html. + * + * SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0 + ********************************************************************************/ +import { Emitter, Event } from '../event'; +import { ReadBuffer, WriteBuffer } from './message-buffer'; + +export class ArrrayBufferWriteBuffer implements WriteBuffer { + constructor(private buffer: ArrayBuffer = new ArrayBuffer(1024), private offset: number = 0) { + } + + private get msg(): DataView { + return new DataView(this.buffer); + } + + ensureCapacity(value: number): WriteBuffer { + let newLength = this.buffer.byteLength; + while (newLength < this.offset + value) { + newLength *= 2; + } + if (newLength !== this.buffer.byteLength) { + const newBuffer = new ArrayBuffer(newLength); + new Uint8Array(newBuffer).set(new Uint8Array(this.buffer)); + this.buffer = newBuffer; + } + return this; + } + + writeByte(value: number): WriteBuffer { + this.ensureCapacity(1); + this.msg.setUint8(this.offset++, value); + return this; + } + + writeInt(value: number): WriteBuffer { + this.ensureCapacity(4); + this.msg.setUint32(this.offset, value); + this.offset += 4; + return this; + } + + writeString(value: string): WriteBuffer { + const encoded = this.encodeString(value); + this.writeBytes(encoded); + return this; + } + + private encodeString(value: string): Uint8Array { + return new TextEncoder().encode(value); + } + + writeBytes(value: ArrayBuffer): WriteBuffer { + this.ensureCapacity(value.byteLength + 4); + this.writeInt(value.byteLength); + new Uint8Array(this.buffer).set(new Uint8Array(value), this.offset); + this.offset += value.byteLength; + return this; + } + + private onCommitEmitter = new Emitter(); + get onCommit(): Event { + return this.onCommitEmitter.event; + } + + commit(): void { + this.onCommitEmitter.fire(this.getCurrentContents()); + } + + getCurrentContents(): ArrayBuffer { + return this.buffer.slice(0, this.offset); + } +} + +export class ArrayBufferReadBuffer implements ReadBuffer { + private offset: number = 0; + + constructor(private readonly buffer: ArrayBuffer) { + } + + private get msg(): DataView { + return new DataView(this.buffer); + } + + readByte(): number { + return this.msg.getUint8(this.offset++); + } + + readInt(): number { + const result = this.msg.getInt32(this.offset); + this.offset += 4; + return result; + } + + readString(): string { + const len = this.msg.getUint32(this.offset); + this.offset += 4; + const result = this.decodeString(this.buffer.slice(this.offset, this.offset + len)); + this.offset += len; + return result; + } + + private decodeString(buf: ArrayBuffer): string { + return new TextDecoder().decode(buf); + } + + readBytes(): ArrayBuffer { + const length = this.msg.getUint32(this.offset); + this.offset += 4; + const result = this.buffer.slice(this.offset, this.offset + length); + this.offset += length; + return result; + } +} diff --git a/packages/core/src/common/message-rpc/channel.spec.ts b/packages/core/src/common/message-rpc/channel.spec.ts new file mode 100644 index 0000000000000..6c372ffb64a06 --- /dev/null +++ b/packages/core/src/common/message-rpc/channel.spec.ts @@ -0,0 +1,67 @@ +/******************************************************************************** + * Copyright (C) 2021 Red Hat, Inc. and others. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v. 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0. + * + * This Source Code may also be made available under the following Secondary + * Licenses when the conditions for such availability set forth in the Eclipse + * Public License v. 2.0 are satisfied: GNU General Public License, version 2 + * with the GNU Classpath Exception which is available at + * https://www.gnu.org/software/classpath/license.html. + * + * SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0 + ********************************************************************************/ +import { assert, expect, spy, use } from 'chai'; +import * as spies from 'chai-spies'; + +import { ChannelMultiplexer, ChannelPipe } from './channel'; +import { ReadBuffer } from './message-buffer'; + +use(spies); + +describe('multiplexer test', () => { + it('multiplex message', async () => { + const pipe = new ChannelPipe(); + + const leftMultiplexer = new ChannelMultiplexer(pipe.left); + const rightMultiplexer = new ChannelMultiplexer(pipe.right); + const openChannelSpy = spy(() => { + }); + + rightMultiplexer.onDidOpenChannel(openChannelSpy); + leftMultiplexer.onDidOpenChannel(openChannelSpy); + + const leftFirst = await leftMultiplexer.open('first'); + const leftSecond = await leftMultiplexer.open('second'); + + const rightFirst = rightMultiplexer.getOpenChannel('first'); + const rightSecond = rightMultiplexer.getOpenChannel('second'); + + assert.isNotNull(rightFirst); + assert.isNotNull(rightSecond); + + const leftSecondSpy = spy((buf: ReadBuffer) => { + const message = buf.readString(); + expect(message).equal('message for second'); + }); + + leftSecond.onMessage(leftSecondSpy); + + const rightFirstSpy = spy((buf: ReadBuffer) => { + const message = buf.readString(); + expect(message).equal('message for first'); + }); + + rightFirst!.onMessage(rightFirstSpy); + + leftFirst.getWriteBuffer().writeString('message for first').commit(); + rightSecond!.getWriteBuffer().writeString('message for second').commit(); + + expect(leftSecondSpy).to.be.called(); + expect(rightFirstSpy).to.be.called(); + + expect(openChannelSpy).to.be.called.exactly(4); + }) +}); diff --git a/packages/core/src/common/message-rpc/channel.ts b/packages/core/src/common/message-rpc/channel.ts new file mode 100644 index 0000000000000..da7342251c291 --- /dev/null +++ b/packages/core/src/common/message-rpc/channel.ts @@ -0,0 +1,215 @@ +/******************************************************************************** + * Copyright (C) 2021 Red Hat, Inc. and others. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v. 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0. + * + * This Source Code may also be made available under the following Secondary + * Licenses when the conditions for such availability set forth in the Eclipse + * Public License v. 2.0 are satisfied: GNU General Public License, version 2 + * with the GNU Classpath Exception which is available at + * https://www.gnu.org/software/classpath/license.html. + * + * SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0 + ********************************************************************************/ +import { ArrayBufferReadBuffer, ArrrayBufferWriteBuffer } from './array-buffer-message-buffer'; +import { Emitter, Event } from '../event'; +import { ReadBuffer, WriteBuffer } from './message-buffer'; + +/** + * A channel is a bidirectinal communications channel with lifecycle and + * error signalling. Note that creation of channels is specific to particular + * implementations and thus not part of the protocol. + */ +export interface Channel { + /** + * The remote side has closed the channel + */ + onClose: Event; + /** + * An error has occurred while writing to or reading from the channel + */ + onError: Event; + /** + * A message has arrived and can be read using the given {@link ReadBuffer} + */ + onMessage: Event; + /** + * Obtain a {@link WriteBuffer} to write a message to the channel. + */ + getWriteBuffer(): WriteBuffer; + /** + * Close this channel. No {@link onClose} event should be sent + */ + close(): void; +} + +enum MessageTypes { + Open = 1, + Close = 2, + AckOpen = 3, + Data = 4 +} + +/** + * Helper class to implement the single channels on a {@link ChannelMultiplexer} + */ +class ForwardingChannel implements Channel { + constructor(private readonly closeHander: () => void, private readonly writeBufferSource: () => WriteBuffer) { + } + + onCloseEmitter: Emitter = new Emitter(); + get onClose(): Event { + return this.onCloseEmitter.event; + }; + onErrorEmitter: Emitter = new Emitter(); + get onError(): Event { + return this.onErrorEmitter.event; + }; + onMessageEmitter: Emitter = new Emitter(); + get onMessage(): Event { + return this.onMessageEmitter.event; + }; + + getWriteBuffer(): WriteBuffer { + return this.writeBufferSource(); + } + + close(): void { + this.closeHander(); + } +} + +/** + * A class to encode/decode multiple channels over a single underlying {@link Channel} + * The write buffers in this implementation immediately write to the underlying + * channel, so we rely on writers to the multiplexed channels to always commit their + * messages and always in one go. + */ +export class ChannelMultiplexer { + protected pendingOpen: Map void> = new Map(); + protected openChannels: Map = new Map(); + + protected readonly onOpenChannelEmitter: Emitter = new Emitter(); + get onDidOpenChannel(): Event { + return this.onOpenChannelEmitter.event; + } + + constructor(protected readonly underlyingChannel: Channel) { + this.underlyingChannel.onMessage(buffer => this.handleMessage(buffer)); + this.underlyingChannel.onClose(() => this.handleClose()); + this.underlyingChannel.onError(error => this.handleError(error)); + } + + protected handleError(error: unknown): void { + this.openChannels.forEach(channel => { + channel.onErrorEmitter.fire(error); + }); + } + + protected handleClose(): void { + this.pendingOpen.clear(); + this.openChannels.forEach(channel => { + channel.close(); + }); + this.openChannels.clear(); + } + + protected handleMessage(buffer: ReadBuffer): void { + const type = buffer.readByte(); + const id = buffer.readString(); + switch (type) { + case MessageTypes.AckOpen: { + // edge case: both side try to open a channel at the same time. + const resolve = this.pendingOpen.get(id); + if (resolve) { + const channel = this.createChannel(id); + this.pendingOpen.delete(id); + this.openChannels.set(id, channel); + resolve!(channel); + this.onOpenChannelEmitter.fire(channel); + } + break; + } + case MessageTypes.Open: { + if (!this.openChannels.has(id)) { + const channel = this.createChannel(id); + this.openChannels.set(id, channel); + const resolve = this.pendingOpen.get(id); + if (resolve) { + // edge case: both side try to open a channel at the same time. + resolve(channel); + } + this.underlyingChannel.getWriteBuffer().writeByte(MessageTypes.AckOpen).writeString(id).commit(); + this.onOpenChannelEmitter.fire(channel); + } + + break; + } + case MessageTypes.Close: { + const channel = this.openChannels.get(id); + if (channel) { + channel.onCloseEmitter.fire(); + this.openChannels.delete(id); + } + break; + } + case MessageTypes.Data: { + const channel = this.openChannels.get(id); + if (channel) { + channel.onMessageEmitter.fire(buffer); + } + break; + } + + } + } + + protected createChannel(id: string): ForwardingChannel { + return new ForwardingChannel(() => this.closeChannel(id), () => { + const underlying = this.underlyingChannel.getWriteBuffer(); + underlying.writeByte(MessageTypes.Data); + underlying.writeString(id); + return underlying; + }); + } + + protected closeChannel(id: string): void { + this.underlyingChannel.getWriteBuffer().writeByte(MessageTypes.Close).writeString(id).commit(); + this.openChannels.get(id)!.onCloseEmitter.fire(); + this.openChannels.delete(id); + } + + open(id: string): Promise { + const result = new Promise((resolve, reject) => { + this.pendingOpen.set(id, resolve); + }); + this.underlyingChannel.getWriteBuffer().writeByte(MessageTypes.Open).writeString(id).commit(); + return result; + } + + getOpenChannel(id: string): Channel | undefined { + return this.openChannels.get(id); + } +} + +/** + * A pipe with two channels at each end for testing. + */ +export class ChannelPipe { + readonly left: ForwardingChannel = new ForwardingChannel(() => this.right.onCloseEmitter.fire(), () => { + const leftWrite = new ArrrayBufferWriteBuffer(); + leftWrite.onCommit(buffer => { + this.right.onMessageEmitter.fire(new ArrayBufferReadBuffer(buffer)); + }); + return leftWrite; + }); + readonly right: ForwardingChannel = new ForwardingChannel(() => this.left.onCloseEmitter.fire(), () => { + const rightWrite = new ArrrayBufferWriteBuffer(); + rightWrite.onCommit(buffer => { + this.left.onMessageEmitter.fire(new ArrayBufferReadBuffer(buffer)); + }); + return rightWrite; + }); +} diff --git a/packages/core/src/common/message-rpc/connection-handler.ts b/packages/core/src/common/message-rpc/connection-handler.ts new file mode 100644 index 0000000000000..d5fbfa277224a --- /dev/null +++ b/packages/core/src/common/message-rpc/connection-handler.ts @@ -0,0 +1,38 @@ +/******************************************************************************** + * Copyright (C) 2021 Red Hat, Inc. and others. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v. 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0. + * + * This Source Code may also be made available under the following Secondary + * Licenses when the conditions for such availability set forth in the Eclipse + * Public License v. 2.0 are satisfied: GNU General Public License, version 2 + * with the GNU Classpath Exception which is available at + * https://www.gnu.org/software/classpath/license.html. + * + * SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0 + ********************************************************************************/ +import { Channel } from './channel'; +import { RpcHandler, RpcProxyHandler } from './rpc-proxy'; + +interface ConnectionHandler { + onConnection(connection: Channel): void; +} + +export class JsonRpcConnectionHandler implements ConnectionHandler { + constructor( + readonly path: string, + readonly targetFactory: (proxy: T) => unknown, + ) { } + + onConnection(connection: Channel): void { + const proxyHandler = new RpcProxyHandler(); + // eslint-disable-next-line no-null/no-null + const proxy = new Proxy(Object.create(null), proxyHandler); + const target = this.targetFactory(proxy); + + new RpcHandler(target).onChannelOpen(connection); + proxyHandler.onChannelOpen(connection); + } +} diff --git a/packages/core/src/common/message-rpc/experiments.ts b/packages/core/src/common/message-rpc/experiments.ts new file mode 100644 index 0000000000000..60285ea3d7907 --- /dev/null +++ b/packages/core/src/common/message-rpc/experiments.ts @@ -0,0 +1,56 @@ +/******************************************************************************** + * Copyright (C) 2021 Red Hat, Inc. and others. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v. 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0. + * + * This Source Code may also be made available under the following Secondary + * Licenses when the conditions for such availability set forth in the Eclipse + * Public License v. 2.0 are satisfied: GNU General Public License, version 2 + * with the GNU Classpath Exception which is available at + * https://www.gnu.org/software/classpath/license.html. + * + * SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0 + ********************************************************************************/ +import { ChannelPipe } from './channel'; +import { RpcHandler, RpcProxyHandler } from './rpc-proxy'; +import * as fs from 'fs'; + +/** + * This file is for fiddling around and testing. Not production code. + */ + +const pipe = new ChannelPipe(); + +interface ReadFile { + read(path: string): Promise; +} + +class Server implements ReadFile { + read(path: string): Promise { + const bytes = fs.readFileSync(path); + const result = new ArrayBuffer(bytes.byteLength); + bytes.copy(new Uint8Array(result)); + return Promise.resolve(result); + } +} + +const handler = new RpcHandler(new Server()); +handler.onChannelOpen(pipe.right); + +const proxyHandler = new RpcProxyHandler(); +// eslint-disable-next-line no-null/no-null +const proxy: ReadFile = new Proxy(Object.create(null), proxyHandler); +proxyHandler.onChannelOpen(pipe.left); + +const t0 = new Date().getTime(); + +proxy.read(process.argv[2]).then(value => { + const t1 = new Date().getTime(); + console.log(`read file of length: ${value.byteLength} in ${t1 - t0}ms`); + console.log(value.slice(0, 20)); +}).catch(e => { + console.log(e); +}); + diff --git a/packages/core/src/common/message-rpc/message-buffer.ts b/packages/core/src/common/message-rpc/message-buffer.ts new file mode 100644 index 0000000000000..79466424512b5 --- /dev/null +++ b/packages/core/src/common/message-rpc/message-buffer.ts @@ -0,0 +1,70 @@ +/******************************************************************************** + * Copyright (C) 2021 Red Hat, Inc. and others. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v. 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0. + * + * This Source Code may also be made available under the following Secondary + * Licenses when the conditions for such availability set forth in the Eclipse + * Public License v. 2.0 are satisfied: GNU General Public License, version 2 + * with the GNU Classpath Exception which is available at + * https://www.gnu.org/software/classpath/license.html. + * + * SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0 + ********************************************************************************/ + +/** + * A buffer maintaining a write position capable of writing primitive values + */ +export interface WriteBuffer { + writeByte(byte: number): WriteBuffer + writeInt(value: number): WriteBuffer; + writeString(value: string): WriteBuffer; + writeBytes(value: ArrayBuffer): WriteBuffer; + + /** + * Makes any writes to the buffer permanent, for example by sending the writes over a channel. + * You must obtain a new write buffer after committing + */ + commit(): void; +} + +export class ForwardingWriteBuffer implements WriteBuffer { + constructor(protected readonly underlying: WriteBuffer) { + } + writeByte(byte: number): WriteBuffer { + this.underlying.writeByte(byte); + return this; + } + + writeInt(value: number): WriteBuffer { + this.underlying.writeInt(value); + return this; + } + + writeString(value: string): WriteBuffer { + this.underlying.writeString(value); + return this; + } + + writeBytes(value: ArrayBuffer): WriteBuffer { + this.underlying.writeBytes(value); + return this; + } + + commit(): void { + this.underlying.commit(); + } +} + +/** + * A buffer maintaining a read position in a buffer containing a received message capable of + * reading primitive values. + */ +export interface ReadBuffer { + readByte(): number; + readInt(): number; + readString(): string; + readBytes(): ArrayBuffer; +} diff --git a/packages/core/src/common/message-rpc/message-encoder.spec.ts b/packages/core/src/common/message-rpc/message-encoder.spec.ts new file mode 100644 index 0000000000000..0f6108052c0a5 --- /dev/null +++ b/packages/core/src/common/message-rpc/message-encoder.spec.ts @@ -0,0 +1,39 @@ +/******************************************************************************** + * Copyright (C) 2021 Red Hat, Inc. and others. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v. 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0. + * + * This Source Code may also be made available under the following Secondary + * Licenses when the conditions for such availability set forth in the Eclipse + * Public License v. 2.0 are satisfied: GNU General Public License, version 2 + * with the GNU Classpath Exception which is available at + * https://www.gnu.org/software/classpath/license.html. + * + * SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0 + ********************************************************************************/ +import { expect } from 'chai'; +import { ArrayBufferReadBuffer, ArrrayBufferWriteBuffer } from './array-buffer-message-buffer'; +import { MessageDecoder, MessageEncoder } from './message-encoder'; + +describe('message buffer test', () => { + it('encode object', () => { + const buffer = new ArrayBuffer(1024); + const writer = new ArrrayBufferWriteBuffer(buffer); + + const encoder = new MessageEncoder(); + const jsonMangled = JSON.parse(JSON.stringify(encoder)); + + encoder.writeTypedValue(writer, encoder); + + const written = writer.getCurrentContents(); + + const reader = new ArrayBufferReadBuffer(written); + + const decoder = new MessageDecoder(); + const decoded = decoder.readTypedValue(reader); + + expect(decoded).deep.equal(jsonMangled); + }); +}); diff --git a/packages/core/src/common/message-rpc/message-encoder.ts b/packages/core/src/common/message-rpc/message-encoder.ts new file mode 100644 index 0000000000000..16e3a55593a30 --- /dev/null +++ b/packages/core/src/common/message-rpc/message-encoder.ts @@ -0,0 +1,400 @@ +/******************************************************************************** + * Copyright (C) 2021 Red Hat, Inc. and others. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v. 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0. + * + * This Source Code may also be made available under the following Secondary + * Licenses when the conditions for such availability set forth in the Eclipse + * Public License v. 2.0 are satisfied: GNU General Public License, version 2 + * with the GNU Classpath Exception which is available at + * https://www.gnu.org/software/classpath/license.html. + * + * SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0 + ********************************************************************************/ +import { ReadBuffer, WriteBuffer } from './message-buffer'; + +/** + * This code lets you encode rpc protocol messages (request/reply/notification/error/cancel) + * into a channel write buffer and decode the same messages from a read buffer. + * Custom encoders/decoders can be registered to specially handling certain types of values + * to be encoded. Clients are responsible for ensuring that the set of tags for encoders + * is distinct and the same at both ends of a channel. + */ + +export interface SerializedError { + readonly $isError: true; + readonly name: string; + readonly message: string; + readonly stack: string; +} + +export const enum MessageType { + Request = 1, + Notification = 2, + Reply = 3, + ReplyErr = 4, + Cancel = 5, +} + +export interface CancelMessage { + type: MessageType.Cancel; + id: number; +} + +export interface RequestMessage { + type: MessageType.Request; + id: number; + method: string; + // eslint-disable-next-line @typescript-eslint/no-explicit-any + args: any[]; +} + +export interface NotificationMessage { + type: MessageType.Notification; + id: number; + method: string; + // eslint-disable-next-line @typescript-eslint/no-explicit-any + args: any[]; +} + +export interface ReplyMessage { + type: MessageType.Reply; + id: number; + // eslint-disable-next-line @typescript-eslint/no-explicit-any + res: any; +} + +export interface ReplyErrMessage { + type: MessageType.ReplyErr; + id: number; + err: SerializedError; +} + +export type RPCMessage = RequestMessage | ReplyMessage | ReplyErrMessage | CancelMessage | NotificationMessage; + +enum ObjectType { + JSON = 0, + ByteArray = 1, + ObjectArray = 2, + Undefined = 3, + Object = 4 +} +/** + * A value encoder writes javascript values to a write buffer. Encoders will be asked + * in turn (ordered by their tag value, descending) whether they can encode a given value + * This means encoders with higher tag values have priority. Since the default encoders + * have tag values from 0-4, they can be easily overridden. + */ +export interface ValueEncoder { + /** + * Returns true if this encoder wants to encode this value. + * @param value the value to be encoded + */ + // eslint-disable-next-line @typescript-eslint/no-explicit-any + is(value: any): boolean; + /** + * Write the given value to the buffer. Will only be called if {@link is(value)} returns true. + * @param buf The buffer to write to + * @param value The value to be written + * @param recursiveEncode A function that will use the encoders registered on the {@link MessageEncoder} + * to write a value to the underlying buffer. This is used mostly to write structures like an array + * without having to know how to encode the values in the array + */ + // eslint-disable-next-line @typescript-eslint/no-explicit-any + write(buf: WriteBuffer, value: any, recursiveEncode: (buf: WriteBuffer, value: any) => void): void; +} + +/** + * Reads javascript values from a read buffer + */ +export interface ValueDecoder { + /** + * Reads a value from a read buffer. This method will be called for the decoder that is + * registered for the tag associated with the value encoder that encoded this value. + * @param buf The read buffer to read from + * @param recursiveDecode A function that will use the decoders registered on the {@link MessageEncoder} + * to read values from the underlying read buffer. This is used mostly to decode structures like an array + * without having to know how to decode the values in the aray. + */ + read(buf: ReadBuffer, recursiveDecode: (buf: ReadBuffer) => unknown): unknown; +} + +/** + * A MessageDecoder parses a ReadBuffer into a RCPMessage + */ + +export class MessageDecoder { + protected decoders: Map = new Map(); + + constructor() { + this.registerDecoder(ObjectType.JSON, { + read: buf => { + const json = buf.readString(); + return JSON.parse(json); + } + }); + this.registerDecoder(ObjectType.ByteArray, { + read: buf => buf.readBytes() + }); + this.registerDecoder(ObjectType.ObjectArray, { + read: buf => this.readArray(buf) + }); + + this.registerDecoder(ObjectType.Undefined, { + read: () => undefined + }); + + this.registerDecoder(ObjectType.Object, { + read: (buf, recursiveRead) => { + const propertyCount = buf.readInt(); + const result = Object.create({}); + for (let i = 0; i < propertyCount; i++) { + const key = buf.readString(); + const value = recursiveRead(buf); + result[key] = value; + } + return result; + } + }); + } + + registerDecoder(tag: number, decoder: ValueDecoder): void { + if (this.decoders.has(tag)) { + throw new Error(`Decoder already registered: ${tag}`); + } + this.decoders.set(tag, decoder); + } + + parse(buf: ReadBuffer): RPCMessage { + try { + const msgType = buf.readByte(); + + switch (msgType) { + case MessageType.Request: + return this.parseRequest(buf); + case MessageType.Notification: + return this.parseNotification(buf); + case MessageType.Reply: + return this.parseReply(buf); + case MessageType.ReplyErr: + return this.parseReplyErr(buf); + case MessageType.Cancel: + return this.parseCancel(buf); + } + throw new Error(`Unknown message type: ${msgType}`); + } catch (e) { + // exception does not show problematic content: log it! + console.log('failed to parse message: ' + buf); + throw e; + } + } + + protected parseCancel(msg: ReadBuffer): CancelMessage { + const callId = msg.readInt(); + return { + type: MessageType.Cancel, + id: callId + }; + } + + protected parseRequest(msg: ReadBuffer): RequestMessage { + const callId = msg.readInt(); + const method = msg.readString(); + let args = this.readArray(msg); + // convert `null` to `undefined`, since we don't use `null` in internal plugin APIs + args = args.map(arg => arg === null ? undefined : arg); // eslint-disable-line no-null/no-null + + return { + type: MessageType.Request, + id: callId, + method: method, + args: args + }; + } + + protected parseNotification(msg: ReadBuffer): NotificationMessage { + const callId = msg.readInt(); + const method = msg.readString(); + let args = this.readArray(msg); + // convert `null` to `undefined`, since we don't use `null` in internal plugin APIs + args = args.map(arg => arg === null ? undefined : arg); // eslint-disable-line no-null/no-null + + return { + type: MessageType.Notification, + id: callId, + method: method, + args: args + }; + } + + parseReply(msg: ReadBuffer): ReplyMessage { + const callId = msg.readInt(); + const value = this.readTypedValue(msg); + return { + type: MessageType.Reply, + id: callId, + res: value + }; + } + + parseReplyErr(msg: ReadBuffer): ReplyErrMessage { + const callId = msg.readInt(); + + // eslint-disable-next-line @typescript-eslint/no-explicit-any + let err: any = this.readTypedValue(msg); + if (err && err.$isError) { + err = new Error(); + err.name = err.name; + err.message = err.message; + err.stack = err.stack; + } + return { + type: MessageType.ReplyErr, + id: callId, + err: err + }; + } + + // eslint-disable-next-line @typescript-eslint/no-explicit-any + readArray(buf: ReadBuffer): any[] { + const length = buf.readInt(); + const result = new Array(length); + for (let i = 0; i < length; i++) { + result[i] = this.readTypedValue(buf); + } + return result; + } + + // eslint-disable-next-line @typescript-eslint/no-explicit-any + readTypedValue(buf: ReadBuffer): any { + const type = buf.readInt(); + const decoder = this.decoders.get(type); + if (!decoder) { + throw new Error(`No decoder for tag ${type}`); + } + return decoder.read(buf, innerBuffer => this.readTypedValue(innerBuffer)); + } +} +/** + * A MessageEncoder writes RCPMessage objects to a WriteBuffer. Note that it is + * up to clients to commit the message. This allows for multiple messages being + * encoded before sending. + */ +export class MessageEncoder { + protected readonly encoders: [number, ValueEncoder][] = []; + protected readonly registeredTags: Set = new Set(); + + constructor() { + // encoders will be consulted in reverse order of registration, so the JSON fallback needs to be last + this.registerEncoder(ObjectType.JSON, { + is: () => true, + write: (buf, value) => { + buf.writeString(JSON.stringify(value)); + } + }); + this.registerEncoder(ObjectType.Object, { + is: value => typeof value === 'object', + write: (buf, object, recursiveEncode) => { + const properties = Object.keys(object); + const relevant = []; + for (const property of properties) { + const value = object[property]; + if (typeof value !== 'function') { + relevant.push([property, value]); + } + } + + buf.writeInt(relevant.length); + for (const [property, value] of relevant) { + buf.writeString(property); + recursiveEncode(buf, value); + } + } + }); + this.registerEncoder(ObjectType.Undefined, { + is: value => (typeof value === 'undefined'), + write: () => { } + }); + + this.registerEncoder(ObjectType.ObjectArray, { + is: value => Array.isArray(value), + write: (buf, value) => { + this.writeArray(buf, value); + } + }); + + this.registerEncoder(ObjectType.ByteArray, { + is: value => value instanceof ArrayBuffer, + write: (buf, value) => { + buf.writeBytes(value); + } + }); + } + + registerEncoder(tag: number, encoder: ValueEncoder): void { + if (this.registeredTags.has(tag)) { + throw new Error(`Tag already registered: ${tag}`); + } + this.registeredTags.add(tag); + this.encoders.push([tag, encoder]); + } + + cancel(buf: WriteBuffer, requestId: number): void { + buf.writeByte(MessageType.Cancel); + buf.writeInt(requestId); + } + + // eslint-disable-next-line @typescript-eslint/no-explicit-any + notification(buf: WriteBuffer, requestId: number, method: string, args: any[]): void { + buf.writeByte(MessageType.Notification); + buf.writeInt(requestId); + buf.writeString(method); + this.writeArray(buf, args); + } + + // eslint-disable-next-line @typescript-eslint/no-explicit-any + request(buf: WriteBuffer, requestId: number, method: string, args: any[]): void { + buf.writeByte(MessageType.Request); + buf.writeInt(requestId); + buf.writeString(method); + this.writeArray(buf, args); + } + + // eslint-disable-next-line @typescript-eslint/no-explicit-any + replyOK(buf: WriteBuffer, requestId: number, res: any): void { + buf.writeByte(MessageType.Reply); + buf.writeInt(requestId); + this.writeTypedValue(buf, res); + } + + // eslint-disable-next-line @typescript-eslint/no-explicit-any + replyErr(buf: WriteBuffer, requestId: number, err: any): void { + buf.writeByte(MessageType.ReplyErr); + buf.writeInt(requestId); + this.writeTypedValue(buf, err); + } + + // eslint-disable-next-line @typescript-eslint/no-explicit-any + writeTypedValue(buf: WriteBuffer, value: any): void { + for (let i: number = this.encoders.length - 1; i >= 0; i--) { + if (this.encoders[i][1].is(value)) { + buf.writeInt(this.encoders[i][0]); + this.encoders[i][1].write(buf, value, (innerBuffer, innerValue) => { + this.writeTypedValue(innerBuffer, innerValue); + }); + return; + } + } + } + + // eslint-disable-next-line @typescript-eslint/no-explicit-any + writeArray(buf: WriteBuffer, value: any[]): void { + buf.writeInt(value.length); + for (let i = 0; i < value.length; i++) { + this.writeTypedValue(buf, value[i]); + } + } + +} diff --git a/packages/core/src/common/message-rpc/rpc-protocol.ts b/packages/core/src/common/message-rpc/rpc-protocol.ts new file mode 100644 index 0000000000000..f9ea1c93cc044 --- /dev/null +++ b/packages/core/src/common/message-rpc/rpc-protocol.ts @@ -0,0 +1,163 @@ +/******************************************************************************** + * Copyright (C) 2021 Red Hat, Inc. and others. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v. 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0. + * + * This Source Code may also be made available under the following Secondary + * Licenses when the conditions for such availability set forth in the Eclipse + * Public License v. 2.0 are satisfied: GNU General Public License, version 2 + * with the GNU Classpath Exception which is available at + * https://www.gnu.org/software/classpath/license.html. + * + * SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0 + ********************************************************************************/ +/* eslint-disable @typescript-eslint/no-explicit-any */ + +import { Emitter, Event } from '../event'; +import { Deferred } from '../promise-util'; +import { Channel } from './channel'; +import { ReadBuffer } from './message-buffer'; +import { MessageDecoder, MessageEncoder, MessageType } from './message-encoder'; +/** + * A RCPServer reads rcp request and notification messages and sends the reply values or + * errors from the request to the channel. + */ +export class RPCServer { + protected readonly encoder: MessageEncoder = new MessageEncoder(); + protected readonly decoder: MessageDecoder = new MessageDecoder(); + protected onNotificationEmitter: Emitter<{ method: string; args: any[]; }> = new Emitter(); + + get onNotification(): Event<{ method: string; args: any[]; }> { + return this.onNotificationEmitter.event; + } + + constructor(protected channel: Channel, public readonly requestHandler: (method: string, args: any[]) => Promise) { + const registration = channel.onMessage((data: ReadBuffer) => this.handleMessage(data)); + channel.onClose(() => registration.dispose()); + } + + handleMessage(data: ReadBuffer): void { + const message = this.decoder.parse(data); + switch (message.type) { + case MessageType.Cancel: { + this.handleCancel(message.id); + break; + } + case MessageType.Request: { + this.handleRequest(message.id, message.method, message.args); + break; + } + case MessageType.Notification: { + this.handleNotify(message.id, message.method, message.args); + break; + } + } + } + + protected handleCancel(id: number): void { + // implement cancellation + /* const token = this.cancellationTokens.get(id); + if (token) { + this.cancellationTokens.delete(id); + token.cancel(); + } else { + console.warn(`cancel: no token for message: ${id}`); + }*/ + } + + protected async handleRequest(id: number, method: string, args: any[]): Promise { + const output = this.channel.getWriteBuffer(); + try { + // console.log(`handling request ${method} with id ${id}`); + const result = await this.requestHandler(method, args); + this.encoder.replyOK(output, id, result); + // console.log(`handled request ${method} with id ${id}`); + } catch (err) { + this.encoder.replyErr(output, id, err); + console.log(`error on request ${method} with id ${id}`); + } + output.commit(); + } + + protected async handleNotify(id: number, method: string, args: any[]): Promise { + // console.log(`handling notification ${method} with id ${id}`); + this.onNotificationEmitter.fire({ method, args }); + } +} + +/** + * An RpcClient sends requests and notifications to a remote server. + * Clients can get a promise for the request result that will be either resolved or + * rejected depending on the success of the request. + * The RpcClient keeps track of outstanding requests and matches replies to the appropriate request + * Currently, there is no timeout handling implemented in the client. + */ +export class RpcClient { + protected readonly pendingRequests: Map> = new Map(); + protected nextMessageId: number = 0; + + protected readonly encoder: MessageEncoder = new MessageEncoder(); + protected readonly decoder: MessageDecoder = new MessageDecoder(); + + constructor(protected channel: Channel) { + const registration = channel.onMessage((data: ReadBuffer) => this.handleMessage(data)); + channel.onClose(() => registration.dispose()); + } + + handleMessage(data: ReadBuffer): void { + const message = this.decoder.parse(data); + switch (message.type) { + case MessageType.Reply: { + this.handleReply(message.id, message.res); + break; + } + case MessageType.ReplyErr: { + this.handleReplyErr(message.id, message.err); + break; + } + } + } + + protected handleReply(id: number, value: any): void { + const replyHandler = this.pendingRequests.get(id); + // console.log(`received reply with id ${id}`); + if (replyHandler) { + this.pendingRequests.delete(id); + replyHandler.resolve(value); + } else { + console.warn(`reply: no handler for message: ${id}`); + } + } + + protected handleReplyErr(id: number, error: any): void { + const replyHandler = this.pendingRequests.get(id); + if (replyHandler) { + this.pendingRequests.delete(id); + // console.log(`received error id ${id}`); + replyHandler.reject(error); + } else { + console.warn(`error: no handler for message: ${id}`); + } + } + + sendRequest(method: string, args: any[]): Promise { + const id = this.nextMessageId++; + const reply = new Deferred(); + // console.log(`sending request ${method} with id ${id}`); + + this.pendingRequests.set(id, reply); + const output = this.channel.getWriteBuffer(); + this.encoder.request(output, id, method, args); + output.commit(); + return reply.promise; + } + + sendNotification(method: string, args: any[]): void { + // console.log(`sending notification ${method} with id ${this.nextMessageId + 1}`); + const output = this.channel.getWriteBuffer(); + this.encoder.notification(output, this.nextMessageId++, method, args); + output.commit(); + } +} diff --git a/packages/core/src/common/message-rpc/rpc-proxy.ts b/packages/core/src/common/message-rpc/rpc-proxy.ts new file mode 100644 index 0000000000000..3578f64560942 --- /dev/null +++ b/packages/core/src/common/message-rpc/rpc-proxy.ts @@ -0,0 +1,93 @@ +/******************************************************************************** + * Copyright (C) 2021 Red Hat, Inc. and others. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v. 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0. + * + * This Source Code may also be made available under the following Secondary + * Licenses when the conditions for such availability set forth in the Eclipse + * Public License v. 2.0 are satisfied: GNU General Public License, version 2 + * with the GNU Classpath Exception which is available at + * https://www.gnu.org/software/classpath/license.html. + * + * SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0 + ********************************************************************************/ +/* eslint-disable @typescript-eslint/no-explicit-any */ +import { Deferred } from '../promise-util'; +import { Channel } from './channel'; +import { RpcClient, RPCServer } from './rpc-protocol'; + +/** + * A proxy handler that will send any method invocation on the proxied object + * as a rcp protocol message over a channel. + */ +export class RpcProxyHandler implements ProxyHandler { + private channelDeferred: Deferred = new Deferred(); + + onChannelOpen(channel: Channel): void { + const client = new RpcClient(channel); + this.channelDeferred.resolve(client); + } + + get?(target: T, p: string | symbol, receiver: any): any { + const isNotify = this.isNotification(p); + return (...args: any[]) => { + const method = p.toString(); + return this.channelDeferred.promise.then((connection: RpcClient) => + new Promise((resolve, reject) => { + try { + if (isNotify) { + // console.info(`Send notification ${method}`); + connection.sendNotification(method, args); + resolve(undefined); + } else { + // console.info(`Send request ${method}`); + const resultPromise = connection.sendRequest(method, args) as Promise; + resultPromise.then((result: any) => { + // console.info(`request succeeded: ${method}`); + resolve(result); + }).catch(e => { + reject(e); + }); + } + } catch (err) { + reject(err); + } + }) + ); + }; + } + + /** + * Return whether the given property represents a notification. If true, + * the promise returned from the invocation will resolve immediatey to `undefined` + * + * A property leads to a notification rather than a method call if its name + * begins with `notify` or `on`. + * + * @param p - The property being called on the proxy. + * @return Whether `p` represents a notification. + */ + protected isNotification(p: PropertyKey): boolean { + return p.toString().startsWith('notify') || p.toString().startsWith('on'); + } +} + +export class RpcHandler { + constructor(readonly target: any) { + } + + onChannelOpen(channel: Channel): void { + const server = new RPCServer(channel, (method: string, args: any[]) => this.handleRequest(method, args)); + server.onNotification((e: { method: string, args: any }) => this.onNotification(e.method, e.args)); + } + + protected async handleRequest(method: string, args: any[]): Promise { + return this.target[method](...args); + } + + protected onNotification(method: string, args: any[]): void { + this.target[method](args); + } +} diff --git a/packages/core/src/common/message-rpc/websocket-client-channel.ts b/packages/core/src/common/message-rpc/websocket-client-channel.ts new file mode 100644 index 0000000000000..bf07088f18448 --- /dev/null +++ b/packages/core/src/common/message-rpc/websocket-client-channel.ts @@ -0,0 +1,229 @@ +/******************************************************************************** + * Copyright (C) 2018 TypeFox and others. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v. 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0. + * + * This Source Code may also be made available under the following Secondary + * Licenses when the conditions for such availability set forth in the Eclipse + * Public License v. 2.0 are satisfied: GNU General Public License, version 2 + * with the GNU Classpath Exception which is available at + * https://www.gnu.org/software/classpath/license.html. + * + * SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0 + ********************************************************************************/ +/* eslint-disable @typescript-eslint/no-explicit-any */ + +import ReconnectingWebSocket from 'reconnecting-websocket'; +import { v4 as uuid } from 'uuid'; +import { Channel } from './channel'; +import { ReadBuffer, WriteBuffer } from './message-buffer'; +import { ArrayBufferReadBuffer, ArrrayBufferWriteBuffer } from './array-buffer-message-buffer'; +import { Deferred } from '../promise-util'; +import { Emitter, Event } from '../event'; +import { Endpoint } from 'src/browser'; + +/** + * An attempt at a channel implementation over a websocket with fallback to http. + */ + +export interface WebSocketOptions { + /** + * True by default. + */ + reconnecting?: boolean; +} + +export const HttpFallbackOptions = Symbol('HttpFallbackOptions'); + +export interface HttpFallbackOptions { + /** Determines whether Theia is allowed to use the http fallback. True by default. */ + allowed: boolean; + /** Number of failed websocket connection attempts before the fallback is triggered. 2 by default. */ + maxAttempts: number; + /** The maximum duration (in ms) after which the http request should timeout. 5000 by default. */ + pollingTimeout: number; + /** The timeout duration (in ms) after a request was answered with an error code. 5000 by default. */ + errorTimeout: number; + /** The minimum timeout duration (in ms) between two http requests. 0 by default. */ + requestTimeout: number; +} + +export const DEFAULT_HTTP_FALLBACK_OPTIONS: HttpFallbackOptions = { + allowed: true, + maxAttempts: 2, + errorTimeout: 5000, + pollingTimeout: 5000, + requestTimeout: 0 +}; + +export class WebSocketClientChannel implements Channel { + + protected readonly readyDeferred: Deferred = new Deferred(); + + protected readonly onCloseEmitter: Emitter = new Emitter(); + get onClose(): Event { + return this.onCloseEmitter.event; + } + + protected readonly onMessageEmitter: Emitter = new Emitter(); + get onMessage(): Event { + return this.onMessageEmitter.event; + } + + protected readonly onErrorEmitter: Emitter = new Emitter(); + get onError(): Event { + return this.onErrorEmitter.event; + } + + protected readonly socket: ReconnectingWebSocket; + protected useHttpFallback = false; + protected websocketErrorCounter = 0; + protected httpFallbackId = uuid(); + protected httpFallbackDisconnected = true; + + constructor(protected readonly httpFallbackOptions: HttpFallbackOptions | undefined) { + const url = this.createWebSocketUrl('/services'); + const socket = this.createWebSocket(url); + socket.onerror = event => this.handleSocketError(event); + socket.onopen = () => { + this.fireSocketDidOpen(); + }; + socket.onclose = ({ code, reason }) => { + this.onCloseEmitter.fire(); + }; + socket.onmessage = ({ data }) => { + this.onMessageEmitter.fire(new ArrayBufferReadBuffer(data)); + }; + this.socket = socket; + window.addEventListener('offline', () => this.tryReconnect()); + window.addEventListener('online', () => this.tryReconnect()); + } + + getWriteBuffer(): WriteBuffer { + const result = new ArrrayBufferWriteBuffer(); + const httpUrl = this.createHttpWebSocketUrl('/services'); + if (this.useHttpFallback) { + result.writeString(this.httpFallbackId); + result.writeString('true'); + result.onCommit(buffer => { + fetch(httpUrl, { + method: 'POST', + headers: { + 'Content-Type': 'application/octet-stream' + }, + body: buffer + }); + }); + + } else if (this.socket.readyState < WebSocket.CLOSING) { + result.onCommit(buffer => { + this.socket.send(buffer); + }); + } + return result; + + } + + close(): void { + this.socket.close(); + } + + get ready(): Promise { + return this.readyDeferred.promise; + } + + handleSocketError(event: unknown): void { + this.websocketErrorCounter += 1; + if (this.httpFallbackOptions?.allowed && this.websocketErrorCounter >= this.httpFallbackOptions?.maxAttempts) { + this.useHttpFallback = true; + this.socket.close(); + const httpUrl = this.createHttpWebSocketUrl('/services'); + this.readyDeferred.resolve(); + this.doLongPolling(httpUrl); + console.warn( + 'Could not establish a websocket connection. The application will be using the HTTP fallback mode. This may affect performance and the behavior of some features.' + ); + } + this.onErrorEmitter.fire(event); + console.error(event); + } + + async doLongPolling(url: string): Promise { + let timeoutDuration = this.httpFallbackOptions?.requestTimeout || 0; + const controller = new AbortController(); + const pollingId = window.setTimeout(() => controller.abort(), this.httpFallbackOptions?.pollingTimeout); + try { + const response = await fetch(url, { + method: 'POST', + headers: { + 'Content-Type': 'application/json' + }, + signal: controller.signal, + keepalive: true, + body: JSON.stringify({ id: this.httpFallbackId, polling: true }) + }); + if (response.status === 200) { + window.clearTimeout(pollingId); + if (this.httpFallbackDisconnected) { + this.fireSocketDidOpen(); + } + const bytes = await response.arrayBuffer(); + this.onMessageEmitter.fire(new ArrayBufferReadBuffer(bytes)); + } else { + timeoutDuration = this.httpFallbackOptions?.errorTimeout || 0; + this.httpFallbackDisconnected = true; + this.onCloseEmitter.fire(); + throw new Error('Response has error code: ' + response.status); + } + } catch (e) { + console.error('Error occurred during long polling', e); + } + setTimeout(() => this.doLongPolling(url), timeoutDuration); + } + + /** + * Creates a websocket URL to the current location + */ + protected createWebSocketUrl(path: string): string { + const endpoint = new Endpoint({ path }); + return endpoint.getWebSocketUrl().toString(); + } + + protected createHttpWebSocketUrl(path: string): string { + const endpoint = new Endpoint({ path }); + return endpoint.getRestUrl().toString(); + } + + /** + * Creates a web socket for the given url + */ + protected createWebSocket(url: string): ReconnectingWebSocket { + const socket = new ReconnectingWebSocket(url, undefined, { + maxReconnectionDelay: 10000, + minReconnectionDelay: 1000, + reconnectionDelayGrowFactor: 1.3, + connectionTimeout: 10000, + maxRetries: Infinity, + debug: false + }); + socket.binaryType = 'arraybuffer'; + return socket; + } + + protected fireSocketDidOpen(): void { + // Once a websocket connection has opened, disable the http fallback + if (this.httpFallbackOptions?.allowed) { + this.httpFallbackOptions.allowed = false; + } + this.readyDeferred.resolve(); + } + + protected tryReconnect(): void { + if (!this.useHttpFallback && this.socket.readyState !== WebSocket.CONNECTING) { + this.socket.reconnect(); + } + } + +} diff --git a/packages/search-in-workspace/src/browser/styles/index.css b/packages/search-in-workspace/src/browser/styles/index.css index 9285cd0bed9ff..e87c60a16f5b0 100644 --- a/packages/search-in-workspace/src/browser/styles/index.css +++ b/packages/search-in-workspace/src/browser/styles/index.css @@ -38,7 +38,6 @@ .t-siw-search-container .theia-input { flex: 1; line-height: var(--theia-content-line-height); - padding-left: 8px; padding: 3px 0 3px 4px; }