From b3f4d51a4915e4b5892567abbe8cdddc59b4f913 Mon Sep 17 00:00:00 2001 From: Benjamin Pasero Date: Fri, 14 Oct 2016 16:17:58 +0200 Subject: [PATCH 1/6] buffer process.send --- src/vs/base/node/processes.ts | 41 +++++++++++ .../base/test/node/processes/fixtures/fork.ts | 16 +++++ .../node/processes/fixtures/fork_large.ts | 16 +++++ .../test/node/processes/processes.test.ts | 72 +++++++++++++++++++ 4 files changed, 145 insertions(+) create mode 100644 src/vs/base/test/node/processes/fixtures/fork.ts create mode 100644 src/vs/base/test/node/processes/fixtures/fork_large.ts create mode 100644 src/vs/base/test/node/processes/processes.test.ts diff --git a/src/vs/base/node/processes.ts b/src/vs/base/node/processes.ts index 1fb759c976237..c48653e5eab29 100644 --- a/src/vs/base/node/processes.ts +++ b/src/vs/base/node/processes.ts @@ -447,4 +447,45 @@ export class StreamProcess extends AbstractProcess { pp({ stdin: childProcess.stdin, stdout: childProcess.stdout, stderr: childProcess.stderr }); } } +} + +export interface ISender { + send: (msg: any) => void; +} + +// Wrapper around process.send() that will buffer any messages if the internal node.js +// buffer is filled with messages and only continue sending messages when the internal +// buffer is free again to consume messages. +// Workaround for https://github.com/nodejs/node/issues/7657 (IPC can freeze process) +export function createBufferedSender(childProcess: ChildProcess | NodeJS.Process): ISender { + let msgBuffer = []; + let useBuffer = false; + + const send = function (msg: any): void { + if (useBuffer) { + msgBuffer.push(msg); // add to the buffer if the process cannot handle more messages + return; + } + + let result = childProcess.send(msg, error => { + if (error) { + console.error(error); // unlikely to happen, best we can do is log this error + } + + useBuffer = false; // we are good again to send directly without buffer + + // now send all the messages that we have in our buffer and did not send yet + if (msgBuffer.length > 0) { + const msgBufferCopy = msgBuffer.slice(0); + msgBuffer = []; + msgBufferCopy.forEach(entry => send(entry)); + } + }); + + if (!result) { + useBuffer = true; // the process indicates that it could not directly consume the message so we need to use our buffer next time + } + }; + + return { send }; } \ No newline at end of file diff --git a/src/vs/base/test/node/processes/fixtures/fork.ts b/src/vs/base/test/node/processes/fixtures/fork.ts new file mode 100644 index 0000000000000..b7220975577bb --- /dev/null +++ b/src/vs/base/test/node/processes/fixtures/fork.ts @@ -0,0 +1,16 @@ +/*--------------------------------------------------------------------------------------------- + * Copyright (c) Microsoft Corporation. All rights reserved. + * Licensed under the MIT License. See License.txt in the project root for license information. + *--------------------------------------------------------------------------------------------*/ + +'use strict'; + +import processes = require('vs/base/node/processes'); + +const sender = processes.createBufferedSender(process); + +process.on('message', msg => { + sender.send(msg); +}); + +sender.send('ready'); \ No newline at end of file diff --git a/src/vs/base/test/node/processes/fixtures/fork_large.ts b/src/vs/base/test/node/processes/fixtures/fork_large.ts new file mode 100644 index 0000000000000..b7220975577bb --- /dev/null +++ b/src/vs/base/test/node/processes/fixtures/fork_large.ts @@ -0,0 +1,16 @@ +/*--------------------------------------------------------------------------------------------- + * Copyright (c) Microsoft Corporation. All rights reserved. + * Licensed under the MIT License. See License.txt in the project root for license information. + *--------------------------------------------------------------------------------------------*/ + +'use strict'; + +import processes = require('vs/base/node/processes'); + +const sender = processes.createBufferedSender(process); + +process.on('message', msg => { + sender.send(msg); +}); + +sender.send('ready'); \ No newline at end of file diff --git a/src/vs/base/test/node/processes/processes.test.ts b/src/vs/base/test/node/processes/processes.test.ts new file mode 100644 index 0000000000000..d27a3396511fd --- /dev/null +++ b/src/vs/base/test/node/processes/processes.test.ts @@ -0,0 +1,72 @@ +/*--------------------------------------------------------------------------------------------- + * Copyright (c) Microsoft Corporation. All rights reserved. + * Licensed under the MIT License. See License.txt in the project root for license information. + *--------------------------------------------------------------------------------------------*/ + +'use strict'; + +import * as assert from 'assert'; +import * as cp from 'child_process'; +import * as objects from 'vs/base/common/objects'; +import URI from 'vs/base/common/uri'; +import processes = require('vs/base/node/processes'); + +function fork(id: string): cp.ChildProcess { + const opts: any = { + env: objects.mixin(objects.clone(process.env), { + AMD_ENTRYPOINT: id, + PIPE_LOGGING: 'true', + VERBOSE_LOGGING: true + }) + }; + + return cp.fork(URI.parse(require.toUrl('bootstrap')).fsPath, ['--type=processTests'], opts); +} + +suite('Processes', () => { + test('buffered sending - simple data', function (done: () => void) { + const child = fork('vs/base/test/node/processes/fixtures/fork'); + const sender = processes.createBufferedSender(child); + + const msg = 'Hello Child'; + child.on('message', msgFromChild => { + if (msgFromChild === 'ready') { + sender.send(msg); + } else { + assert.equal(msgFromChild, msg); + + child.kill(); + done(); + } + }); + }); + + test('buffered sending - lots of data (potential deadlock on windows)', function (done: () => void) { + const child = fork('vs/base/test/node/processes/fixtures/fork_large'); + const sender = processes.createBufferedSender(child); + + const largeObj = Object.create(null); + for (let i = 0; i < 10000; i++) { + largeObj[i] = 'some data'; + } + + let counter = 0; + + const msg = JSON.stringify(largeObj); + child.on('message', msgFromChild => { + if (msgFromChild === 'ready') { + sender.send(msg); + sender.send(msg); + sender.send(msg); + } else { + assert.equal(msgFromChild, msg); + counter++; + + if (counter === 3) { + child.kill(); + done(); + } + } + }); + }); +}); \ No newline at end of file From 7abb34cdf03052c31266e1b6e2979d1f70c45bcb Mon Sep 17 00:00:00 2001 From: Benjamin Pasero Date: Fri, 14 Oct 2016 16:57:09 +0200 Subject: [PATCH 2/6] improve large test to simulate real deadlock behaviour --- .../base/test/node/processes/fixtures/fork_large.ts | 3 +++ src/vs/base/test/node/processes/processes.test.ts | 11 +++-------- 2 files changed, 6 insertions(+), 8 deletions(-) diff --git a/src/vs/base/test/node/processes/fixtures/fork_large.ts b/src/vs/base/test/node/processes/fixtures/fork_large.ts index b7220975577bb..82886c05b6ebb 100644 --- a/src/vs/base/test/node/processes/fixtures/fork_large.ts +++ b/src/vs/base/test/node/processes/fixtures/fork_large.ts @@ -11,6 +11,9 @@ const sender = processes.createBufferedSender(process); process.on('message', msg => { sender.send(msg); + sender.send(msg); + sender.send(msg); + sender.send('done'); }); sender.send('ready'); \ No newline at end of file diff --git a/src/vs/base/test/node/processes/processes.test.ts b/src/vs/base/test/node/processes/processes.test.ts index d27a3396511fd..4d2c372ec27db 100644 --- a/src/vs/base/test/node/processes/processes.test.ts +++ b/src/vs/base/test/node/processes/processes.test.ts @@ -58,14 +58,9 @@ suite('Processes', () => { sender.send(msg); sender.send(msg); sender.send(msg); - } else { - assert.equal(msgFromChild, msg); - counter++; - - if (counter === 3) { - child.kill(); - done(); - } + } else if (msgFromChild === 'done') { + child.kill(); + done(); } }); }); From 844484aa1aab43af4eb0d06b5a4f8b7de116d59b Mon Sep 17 00:00:00 2001 From: Benjamin Pasero Date: Sun, 16 Oct 2016 09:42:03 +0200 Subject: [PATCH 3/6] clean up --- src/vs/base/node/processes.ts | 43 +++++++++---------- .../base/test/node/processes/fixtures/fork.ts | 2 +- .../node/processes/fixtures/fork_large.ts | 2 +- .../test/node/processes/processes.test.ts | 6 +-- 4 files changed, 24 insertions(+), 29 deletions(-) diff --git a/src/vs/base/node/processes.ts b/src/vs/base/node/processes.ts index c48653e5eab29..28ea85bff6f4d 100644 --- a/src/vs/base/node/processes.ts +++ b/src/vs/base/node/processes.ts @@ -9,11 +9,8 @@ import * as cp from 'child_process'; import ChildProcess = cp.ChildProcess; import exec = cp.exec; import spawn = cp.spawn; - import { PassThrough } from 'stream'; - -import { fork } from './stdFork'; - +import { fork } from 'vs/base/node/stdFork'; import nls = require('vs/nls'); import { PPromise, Promise, TPromise, TValueCallback, TProgressCallback, ErrorCallback } from 'vs/base/common/winjs.base'; import * as Types from 'vs/base/common/types'; @@ -22,7 +19,6 @@ import URI from 'vs/base/common/uri'; import * as Objects from 'vs/base/common/objects'; import * as TPath from 'vs/base/common/paths'; import * as Platform from 'vs/base/common/platform'; - import { LineDecoder } from 'vs/base/node/decoder'; import { CommandOptions, ForkOptions, SuccessData, Source, TerminateResponse, TerminateResponseCode, Executable } from 'vs/base/common/processes'; export { CommandOptions, ForkOptions, SuccessData, Source, TerminateResponse, TerminateResponseCode }; @@ -449,21 +445,22 @@ export class StreamProcess extends AbstractProcess { } } -export interface ISender { +export interface IQueuedSender { send: (msg: any) => void; } -// Wrapper around process.send() that will buffer any messages if the internal node.js -// buffer is filled with messages and only continue sending messages when the internal -// buffer is free again to consume messages. -// Workaround for https://github.com/nodejs/node/issues/7657 (IPC can freeze process) -export function createBufferedSender(childProcess: ChildProcess | NodeJS.Process): ISender { - let msgBuffer = []; - let useBuffer = false; +// Wrapper around process.send() that will queue any messages if the internal node.js +// queue is filled with messages and only continue sending messages when the internal +// queue is free again to consume messages. +// On Windows we always wait for the send() method to return before sending the next message +// to workaround https://github.com/nodejs/node/issues/7657 (IPC can freeze process) +export function createQueuedSender(childProcess: ChildProcess | NodeJS.Process): IQueuedSender { + let msgQueue = []; + let useQueue = false; const send = function (msg: any): void { - if (useBuffer) { - msgBuffer.push(msg); // add to the buffer if the process cannot handle more messages + if (useQueue) { + msgQueue.push(msg); // add to the queue if the process cannot handle more messages return; } @@ -472,18 +469,18 @@ export function createBufferedSender(childProcess: ChildProcess | NodeJS.Process console.error(error); // unlikely to happen, best we can do is log this error } - useBuffer = false; // we are good again to send directly without buffer + useQueue = false; // we are good again to send directly without queue - // now send all the messages that we have in our buffer and did not send yet - if (msgBuffer.length > 0) { - const msgBufferCopy = msgBuffer.slice(0); - msgBuffer = []; - msgBufferCopy.forEach(entry => send(entry)); + // now send all the messages that we have in our queue and did not send yet + if (msgQueue.length > 0) { + const msgQueueCopy = msgQueue.slice(0); + msgQueue = []; + msgQueueCopy.forEach(entry => send(entry)); } }); - if (!result) { - useBuffer = true; // the process indicates that it could not directly consume the message so we need to use our buffer next time + if (!result || Platform.isWindows /* workaround https://github.com/nodejs/node/issues/7657 */) { + useQueue = true; } }; diff --git a/src/vs/base/test/node/processes/fixtures/fork.ts b/src/vs/base/test/node/processes/fixtures/fork.ts index b7220975577bb..f4ac8978d7743 100644 --- a/src/vs/base/test/node/processes/fixtures/fork.ts +++ b/src/vs/base/test/node/processes/fixtures/fork.ts @@ -7,7 +7,7 @@ import processes = require('vs/base/node/processes'); -const sender = processes.createBufferedSender(process); +const sender = processes.createQueuedSender(process); process.on('message', msg => { sender.send(msg); diff --git a/src/vs/base/test/node/processes/fixtures/fork_large.ts b/src/vs/base/test/node/processes/fixtures/fork_large.ts index 82886c05b6ebb..77ab54d8c0141 100644 --- a/src/vs/base/test/node/processes/fixtures/fork_large.ts +++ b/src/vs/base/test/node/processes/fixtures/fork_large.ts @@ -7,7 +7,7 @@ import processes = require('vs/base/node/processes'); -const sender = processes.createBufferedSender(process); +const sender = processes.createQueuedSender(process); process.on('message', msg => { sender.send(msg); diff --git a/src/vs/base/test/node/processes/processes.test.ts b/src/vs/base/test/node/processes/processes.test.ts index 4d2c372ec27db..ea0b1567dd397 100644 --- a/src/vs/base/test/node/processes/processes.test.ts +++ b/src/vs/base/test/node/processes/processes.test.ts @@ -26,7 +26,7 @@ function fork(id: string): cp.ChildProcess { suite('Processes', () => { test('buffered sending - simple data', function (done: () => void) { const child = fork('vs/base/test/node/processes/fixtures/fork'); - const sender = processes.createBufferedSender(child); + const sender = processes.createQueuedSender(child); const msg = 'Hello Child'; child.on('message', msgFromChild => { @@ -43,15 +43,13 @@ suite('Processes', () => { test('buffered sending - lots of data (potential deadlock on windows)', function (done: () => void) { const child = fork('vs/base/test/node/processes/fixtures/fork_large'); - const sender = processes.createBufferedSender(child); + const sender = processes.createQueuedSender(child); const largeObj = Object.create(null); for (let i = 0; i < 10000; i++) { largeObj[i] = 'some data'; } - let counter = 0; - const msg = JSON.stringify(largeObj); child.on('message', msgFromChild => { if (msgFromChild === 'ready') { From ae6a270ab054c0fde76188bb54ad6b5207a2d647 Mon Sep 17 00:00:00 2001 From: Benjamin Pasero Date: Sun, 16 Oct 2016 09:45:33 +0200 Subject: [PATCH 4/6] use queued sender for ext host communication --- src/vs/workbench/node/extensionHostProcess.ts | 10 +++++++--- .../extensions/electron-browser/extensionHost.ts | 11 +++++++---- 2 files changed, 14 insertions(+), 7 deletions(-) diff --git a/src/vs/workbench/node/extensionHostProcess.ts b/src/vs/workbench/node/extensionHostProcess.ts index bdfe65f36f8d6..d278b1588537d 100644 --- a/src/vs/workbench/node/extensionHostProcess.ts +++ b/src/vs/workbench/node/extensionHostProcess.ts @@ -10,6 +10,7 @@ import { TPromise } from 'vs/base/common/winjs.base'; import { ExtensionHostMain, IInitData, exit } from 'vs/workbench/node/extensionHostMain'; import { create as createIPC, IMainProcessExtHostIPC } from 'vs/platform/extensions/common/ipcRemoteCom'; import marshalling = require('vs/base/common/marshalling'); +import { createQueuedSender } from 'vs/base/node/processes'; interface IRendererConnection { remoteCom: IMainProcessExtHostIPC; @@ -22,6 +23,9 @@ let onTerminate = function () { exit(); }; +// Utility to not flood the process.send() with messages if it is busy catching up +const queuedSender = createQueuedSender(process); + function connectToRenderer(): TPromise { return new TPromise((c, e) => { const stats: number[] = []; @@ -32,7 +36,7 @@ function connectToRenderer(): TPromise { let msg = marshalling.parse(raw); const remoteCom = createIPC(data => { - process.send(data); + queuedSender.send(data); stats.push(data.length); }); @@ -92,13 +96,13 @@ function connectToRenderer(): TPromise { }, 1000); // Tell the outside that we are initialized - process.send('initialized'); + queuedSender.send('initialized'); c({ remoteCom, initData: msg }); }); // Tell the outside that we are ready to receive messages - process.send('ready'); + queuedSender.send('ready'); }); } diff --git a/src/vs/workbench/services/extensions/electron-browser/extensionHost.ts b/src/vs/workbench/services/extensions/electron-browser/extensionHost.ts index 53b94a9c54c55..ea9fec3515cc8 100644 --- a/src/vs/workbench/services/extensions/electron-browser/extensionHost.ts +++ b/src/vs/workbench/services/extensions/electron-browser/extensionHost.ts @@ -29,6 +29,7 @@ import { ExtensionScanner, MessagesCollector } from 'vs/workbench/node/extension import { IMessagePassingProtocol } from 'vs/base/parts/ipc/common/ipc'; import Event, { Emitter } from 'vs/base/common/event'; import { IStorageService, StorageScope } from 'vs/platform/storage/common/storage'; +import { createQueuedSender, IQueuedSender } from 'vs/base/node/processes'; export const EXTENSION_LOG_BROADCAST_CHANNEL = 'vscode:extensionLog'; export const EXTENSION_ATTACH_BROADCAST_CHANNEL = 'vscode:extensionAttach'; @@ -47,6 +48,7 @@ export interface ILogEntry { export class ExtensionHostProcessWorker { private initializeExtensionHostProcess: TPromise; private extensionHostProcessHandle: ChildProcess; + private extensionHostProcessQueuedSender: IQueuedSender; private extensionHostProcessReady: boolean; private initializeTimer: number; @@ -126,6 +128,7 @@ export class ExtensionHostProcessWorker { // Run Extension Host as fork of current process this.extensionHostProcessHandle = fork(URI.parse(require.toUrl('bootstrap')).fsPath, ['--type=extensionHost'], opts); + this.extensionHostProcessQueuedSender = createQueuedSender(this.extensionHostProcessHandle); // Notify debugger that we are ready to attach to the process if we run a development extension if (this.isExtensionDevelopmentHost && port) { @@ -221,7 +224,7 @@ export class ExtensionHostProcessWorker { workspaceStoragePath: this.storageService.getStoragePath(StorageScope.WORKSPACE), extensions: extensionDescriptors }); - this.extensionHostProcessHandle.send(initPayload); + this.extensionHostProcessQueuedSender.send(initPayload); }); } @@ -351,9 +354,9 @@ export class ExtensionHostProcessWorker { public send(msg: any): void { if (this.extensionHostProcessReady) { - this.extensionHostProcessHandle.send(msg); + this.extensionHostProcessQueuedSender.send(msg); } else if (this.initializeExtensionHostProcess) { - this.initializeExtensionHostProcess.done(p => p.send(msg)); + this.initializeExtensionHostProcess.done(() => this.extensionHostProcessQueuedSender.send(msg)); } else { this.unsentMessages.push(msg); } @@ -363,7 +366,7 @@ export class ExtensionHostProcessWorker { this.terminating = true; if (this.extensionHostProcessHandle) { - this.extensionHostProcessHandle.send({ + this.extensionHostProcessQueuedSender.send({ type: '__$terminate' }); } From feacc72e72f11cf6666a96393bc9bfbf38e8959f Mon Sep 17 00:00:00 2001 From: Benjamin Pasero Date: Sun, 16 Oct 2016 10:03:00 +0200 Subject: [PATCH 5/6] trigger tests again --- src/vs/base/test/node/processes/processes.test.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/vs/base/test/node/processes/processes.test.ts b/src/vs/base/test/node/processes/processes.test.ts index ea0b1567dd397..dee8137b3a2ec 100644 --- a/src/vs/base/test/node/processes/processes.test.ts +++ b/src/vs/base/test/node/processes/processes.test.ts @@ -41,7 +41,7 @@ suite('Processes', () => { }); }); - test('buffered sending - lots of data (potential deadlock on windows)', function (done: () => void) { + test('buffered sending - lots of data (potential deadlock on win32)', function (done: () => void) { const child = fork('vs/base/test/node/processes/fixtures/fork_large'); const sender = processes.createQueuedSender(child); From 907cc5e7a47d77fbcf276bce6b12c8ab49bbb7dd Mon Sep 17 00:00:00 2001 From: Benjamin Pasero Date: Mon, 17 Oct 2016 07:47:38 +0200 Subject: [PATCH 6/6] more tests --- .../test/node/processes/processes.test.ts | 25 +++++++++++++++---- 1 file changed, 20 insertions(+), 5 deletions(-) diff --git a/src/vs/base/test/node/processes/processes.test.ts b/src/vs/base/test/node/processes/processes.test.ts index dee8137b3a2ec..cc10dfd4ab9a3 100644 --- a/src/vs/base/test/node/processes/processes.test.ts +++ b/src/vs/base/test/node/processes/processes.test.ts @@ -28,15 +28,30 @@ suite('Processes', () => { const child = fork('vs/base/test/node/processes/fixtures/fork'); const sender = processes.createQueuedSender(child); - const msg = 'Hello Child'; + let counter = 0; + + const msg1 = 'Hello One'; + const msg2 = 'Hello Two'; + const msg3 = 'Hello Three'; + child.on('message', msgFromChild => { if (msgFromChild === 'ready') { - sender.send(msg); + sender.send(msg1); + sender.send(msg2); + sender.send(msg3); } else { - assert.equal(msgFromChild, msg); + counter++; - child.kill(); - done(); + if (counter === 1) { + assert.equal(msgFromChild, msg1); + } else if (counter === 2) { + assert.equal(msgFromChild, msg2); + } else if (counter === 3) { + assert.equal(msgFromChild, msg3); + + child.kill(); + done(); + } } }); });