From cb435f4440f2a62ca6d853e4860c121aa4117e1a Mon Sep 17 00:00:00 2001 From: Paul Marechal Date: Thu, 21 Jul 2022 13:09:45 -0400 Subject: [PATCH 1/3] terminal: buffer the output before sending In order to not overload the websocket channel opened between the frontend and the backend we should chunk and buffer the output of terminals. Add `BufferingStream` that will buffer data for some time before emitting it. --- .../src/node/buffering-stream.spec.ts | 46 +++++++++++ .../terminal/src/node/buffering-stream.ts | 77 +++++++++++++++++++ .../src/node/terminal-backend-contribution.ts | 10 ++- 3 files changed, 130 insertions(+), 3 deletions(-) create mode 100644 packages/terminal/src/node/buffering-stream.spec.ts create mode 100644 packages/terminal/src/node/buffering-stream.ts diff --git a/packages/terminal/src/node/buffering-stream.spec.ts b/packages/terminal/src/node/buffering-stream.spec.ts new file mode 100644 index 0000000000000..8332143490b17 --- /dev/null +++ b/packages/terminal/src/node/buffering-stream.spec.ts @@ -0,0 +1,46 @@ +// ***************************************************************************** +// Copyright (C) 2022 Ericsson and others. +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License v. 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0. +// +// This Source Code may also be made available under the following Secondary +// Licenses when the conditions for such availability set forth in the Eclipse +// Public License v. 2.0 are satisfied: GNU General Public License, version 2 +// with the GNU Classpath Exception which is available at +// https://www.gnu.org/software/classpath/license.html. +// +// SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0 +// ***************************************************************************** + +import { wait } from '@theia/core/lib/common/promise-util'; +import { expect } from 'chai'; +import { BufferingStream } from './buffering-stream'; + +describe('BufferringStream', () => { + + it('should emit whatever data was buffered before the timeout', async () => { + const buffer = new BufferingStream({ emitInterval: 1000 }); + const chunkPromise = waitForChunk(buffer); + buffer.push(Buffer.from([0])); + await wait(100); + buffer.push(Buffer.from([1])); + await wait(100); + buffer.push(Buffer.from([2, 3, 4])); + const chunk = await chunkPromise; + expect(chunk).deep.eq(Buffer.from([0, 1, 2, 3, 4])); + }); + + it('should not emit chunks bigger than maxChunkSize', async () => { + const buffer = new BufferingStream({ maxChunkSize: 2 }); + buffer.push(Buffer.from([0, 1, 2, 3, 4, 5])); + expect(await waitForChunk(buffer)).deep.eq(Buffer.from([0, 1])); + expect(await waitForChunk(buffer)).deep.eq(Buffer.from([2, 3])); + expect(await waitForChunk(buffer)).deep.eq(Buffer.from([4, 5])); + }); + + function waitForChunk(buffer: BufferingStream): Promise { + return new Promise(resolve => buffer.onData(resolve)); + } +}); diff --git a/packages/terminal/src/node/buffering-stream.ts b/packages/terminal/src/node/buffering-stream.ts new file mode 100644 index 0000000000000..7c7a8b0d2d134 --- /dev/null +++ b/packages/terminal/src/node/buffering-stream.ts @@ -0,0 +1,77 @@ +// ***************************************************************************** +// Copyright (C) 2022 Ericsson and others. +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License v. 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0. +// +// This Source Code may also be made available under the following Secondary +// Licenses when the conditions for such availability set forth in the Eclipse +// Public License v. 2.0 are satisfied: GNU General Public License, version 2 +// with the GNU Classpath Exception which is available at +// https://www.gnu.org/software/classpath/license.html. +// +// SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0 +// ***************************************************************************** + +import { Emitter, Event } from '@theia/core/lib/common/event'; + +export interface BufferingStreamOptions { + /** + * Max size of the chunks being emitted. + */ + maxChunkSize?: number + /** + * Amount of time to wait between the moment we start buffering data + * and when we emit the buffered chunk. + */ + emitInterval?: number +} + +/** + * This component will buffer whatever is pushed to it and emit chunks back + * every {@link BufferingStreamOptions.emitInterval}. It will also ensure that + * the emitted chunks never exceed {@link BufferingStreamOptions.maxChunkSize}. + */ +export class BufferingStream { + + protected buffer?: Buffer; + protected timeout?: NodeJS.Timeout; + protected maxChunkSize: number; + protected emitInterval: number; + + protected onDataEmitter = new Emitter(); + + constructor(options?: BufferingStreamOptions) { + this.emitInterval = options?.emitInterval ?? 16; // ms + this.maxChunkSize = options?.maxChunkSize ?? 16384; // bytes + } + + get onData(): Event { + return this.onDataEmitter.event; + } + + push(chunk: Buffer): void { + if (this.buffer) { + this.buffer = Buffer.concat([this.buffer, chunk]); + } else { + this.buffer = chunk; + this.timeout = setTimeout(() => this.emitBufferedChunk(), this.emitInterval); + } + } + + dispose(): void { + clearTimeout(this.timeout); + this.buffer = undefined; + } + + protected emitBufferedChunk(): void { + this.onDataEmitter.fire(this.buffer!.slice(0, this.maxChunkSize)); + if (this.buffer!.byteLength <= this.maxChunkSize) { + this.buffer = undefined; + } else { + this.buffer = this.buffer!.slice(this.maxChunkSize); + this.timeout = setTimeout(() => this.emitBufferedChunk(), this.emitInterval); + } + } +} diff --git a/packages/terminal/src/node/terminal-backend-contribution.ts b/packages/terminal/src/node/terminal-backend-contribution.ts index dea4504e0ffea..fb2c0fe0ddce5 100644 --- a/packages/terminal/src/node/terminal-backend-contribution.ts +++ b/packages/terminal/src/node/terminal-backend-contribution.ts @@ -20,6 +20,7 @@ import { TerminalProcess, ProcessManager } from '@theia/process/lib/node'; import { terminalsPath } from '../common/terminal-protocol'; import { MessagingService } from '@theia/core/lib/node/messaging/messaging-service'; import { RpcProtocol } from '@theia/core/'; +import { BufferingStream } from './buffering-stream'; @injectable() export class TerminalBackendContribution implements MessagingService.Contribution { @@ -47,10 +48,13 @@ export class TerminalBackendContribution implements MessagingService.Contributio }; const rpc = new RpcProtocol(channel, requestHandler); - output.on('data', data => { - rpc.sendNotification('onData', [data]); + const buffer = new BufferingStream(); + buffer.onData(chunk => rpc.sendNotification('onData', [chunk.toString('utf8')])); + output.on('data', chunk => buffer.push(Buffer.from(chunk, 'utf8'))); + channel.onClose(() => { + buffer.dispose(); + output.dispose(); }); - channel.onClose(() => output.dispose()); } }); } From 7939cbf6ada4ab278ea8d5016ca3f29233ad1d77 Mon Sep 17 00:00:00 2001 From: Paul Marechal Date: Thu, 21 Jul 2022 13:55:59 -0400 Subject: [PATCH 2/3] dispose onDataEmitter --- packages/terminal/src/node/buffering-stream.ts | 1 + 1 file changed, 1 insertion(+) diff --git a/packages/terminal/src/node/buffering-stream.ts b/packages/terminal/src/node/buffering-stream.ts index 7c7a8b0d2d134..67261a8f6b063 100644 --- a/packages/terminal/src/node/buffering-stream.ts +++ b/packages/terminal/src/node/buffering-stream.ts @@ -63,6 +63,7 @@ export class BufferingStream { dispose(): void { clearTimeout(this.timeout); this.buffer = undefined; + this.onDataEmitter.dispose(); } protected emitBufferedChunk(): void { From 60ebeca9f74ce83c41d8ea807bd4a2499307ff18 Mon Sep 17 00:00:00 2001 From: Paul Marechal Date: Fri, 22 Jul 2022 11:35:08 -0400 Subject: [PATCH 3/3] increase default `maxChunkSize` --- packages/terminal/src/node/buffering-stream.ts | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/packages/terminal/src/node/buffering-stream.ts b/packages/terminal/src/node/buffering-stream.ts index 67261a8f6b063..0a9d1c30688ac 100644 --- a/packages/terminal/src/node/buffering-stream.ts +++ b/packages/terminal/src/node/buffering-stream.ts @@ -18,12 +18,12 @@ import { Emitter, Event } from '@theia/core/lib/common/event'; export interface BufferingStreamOptions { /** - * Max size of the chunks being emitted. + * Max size in bytes of the chunks being emitted. */ maxChunkSize?: number /** - * Amount of time to wait between the moment we start buffering data - * and when we emit the buffered chunk. + * Amount of time in milliseconds to wait between the moment we start + * buffering data and when we emit the buffered chunk. */ emitInterval?: number } @@ -44,7 +44,7 @@ export class BufferingStream { constructor(options?: BufferingStreamOptions) { this.emitInterval = options?.emitInterval ?? 16; // ms - this.maxChunkSize = options?.maxChunkSize ?? 16384; // bytes + this.maxChunkSize = options?.maxChunkSize ?? (256 * 1024); // bytes } get onData(): Event {