diff --git a/src/vs/base/node/processes.ts b/src/vs/base/node/processes.ts index 1fb759c976237..28ea85bff6f4d 100644 --- a/src/vs/base/node/processes.ts +++ b/src/vs/base/node/processes.ts @@ -9,11 +9,8 @@ import * as cp from 'child_process'; import ChildProcess = cp.ChildProcess; import exec = cp.exec; import spawn = cp.spawn; - import { PassThrough } from 'stream'; - -import { fork } from './stdFork'; - +import { fork } from 'vs/base/node/stdFork'; import nls = require('vs/nls'); import { PPromise, Promise, TPromise, TValueCallback, TProgressCallback, ErrorCallback } from 'vs/base/common/winjs.base'; import * as Types from 'vs/base/common/types'; @@ -22,7 +19,6 @@ import URI from 'vs/base/common/uri'; import * as Objects from 'vs/base/common/objects'; import * as TPath from 'vs/base/common/paths'; import * as Platform from 'vs/base/common/platform'; - import { LineDecoder } from 'vs/base/node/decoder'; import { CommandOptions, ForkOptions, SuccessData, Source, TerminateResponse, TerminateResponseCode, Executable } from 'vs/base/common/processes'; export { CommandOptions, ForkOptions, SuccessData, Source, TerminateResponse, TerminateResponseCode }; @@ -447,4 +443,46 @@ export class StreamProcess extends AbstractProcess { pp({ stdin: childProcess.stdin, stdout: childProcess.stdout, stderr: childProcess.stderr }); } } +} + +export interface IQueuedSender { + send: (msg: any) => void; +} + +// Wrapper around process.send() that will queue any messages if the internal node.js +// queue is filled with messages and only continue sending messages when the internal +// queue is free again to consume messages. +// On Windows we always wait for the send() method to return before sending the next message +// to workaround https://github.com/nodejs/node/issues/7657 (IPC can freeze process) +export function createQueuedSender(childProcess: ChildProcess | NodeJS.Process): IQueuedSender { + let msgQueue = []; + let useQueue = false; + + const send = function (msg: any): void { + if (useQueue) { + msgQueue.push(msg); // add to the queue if the process cannot handle more messages + return; + } + + let result = childProcess.send(msg, error => { + if (error) { + console.error(error); // unlikely to happen, best we can do is log this error + } + + useQueue = false; // we are good again to send directly without queue + + // now send all the messages that we have in our queue and did not send yet + if (msgQueue.length > 0) { + const msgQueueCopy = msgQueue.slice(0); + msgQueue = []; + msgQueueCopy.forEach(entry => send(entry)); + } + }); + + if (!result || Platform.isWindows /* workaround https://github.com/nodejs/node/issues/7657 */) { + useQueue = true; + } + }; + + return { send }; } \ No newline at end of file diff --git a/src/vs/base/test/node/processes/fixtures/fork.ts b/src/vs/base/test/node/processes/fixtures/fork.ts new file mode 100644 index 0000000000000..f4ac8978d7743 --- /dev/null +++ b/src/vs/base/test/node/processes/fixtures/fork.ts @@ -0,0 +1,16 @@ +/*--------------------------------------------------------------------------------------------- + * Copyright (c) Microsoft Corporation. All rights reserved. + * Licensed under the MIT License. See License.txt in the project root for license information. + *--------------------------------------------------------------------------------------------*/ + +'use strict'; + +import processes = require('vs/base/node/processes'); + +const sender = processes.createQueuedSender(process); + +process.on('message', msg => { + sender.send(msg); +}); + +sender.send('ready'); \ No newline at end of file diff --git a/src/vs/base/test/node/processes/fixtures/fork_large.ts b/src/vs/base/test/node/processes/fixtures/fork_large.ts new file mode 100644 index 0000000000000..77ab54d8c0141 --- /dev/null +++ b/src/vs/base/test/node/processes/fixtures/fork_large.ts @@ -0,0 +1,19 @@ +/*--------------------------------------------------------------------------------------------- + * Copyright (c) Microsoft Corporation. All rights reserved. + * Licensed under the MIT License. See License.txt in the project root for license information. + *--------------------------------------------------------------------------------------------*/ + +'use strict'; + +import processes = require('vs/base/node/processes'); + +const sender = processes.createQueuedSender(process); + +process.on('message', msg => { + sender.send(msg); + sender.send(msg); + sender.send(msg); + sender.send('done'); +}); + +sender.send('ready'); \ No newline at end of file diff --git a/src/vs/base/test/node/processes/processes.test.ts b/src/vs/base/test/node/processes/processes.test.ts new file mode 100644 index 0000000000000..cc10dfd4ab9a3 --- /dev/null +++ b/src/vs/base/test/node/processes/processes.test.ts @@ -0,0 +1,80 @@ +/*--------------------------------------------------------------------------------------------- + * Copyright (c) Microsoft Corporation. All rights reserved. + * Licensed under the MIT License. See License.txt in the project root for license information. + *--------------------------------------------------------------------------------------------*/ + +'use strict'; + +import * as assert from 'assert'; +import * as cp from 'child_process'; +import * as objects from 'vs/base/common/objects'; +import URI from 'vs/base/common/uri'; +import processes = require('vs/base/node/processes'); + +function fork(id: string): cp.ChildProcess { + const opts: any = { + env: objects.mixin(objects.clone(process.env), { + AMD_ENTRYPOINT: id, + PIPE_LOGGING: 'true', + VERBOSE_LOGGING: true + }) + }; + + return cp.fork(URI.parse(require.toUrl('bootstrap')).fsPath, ['--type=processTests'], opts); +} + +suite('Processes', () => { + test('buffered sending - simple data', function (done: () => void) { + const child = fork('vs/base/test/node/processes/fixtures/fork'); + const sender = processes.createQueuedSender(child); + + let counter = 0; + + const msg1 = 'Hello One'; + const msg2 = 'Hello Two'; + const msg3 = 'Hello Three'; + + child.on('message', msgFromChild => { + if (msgFromChild === 'ready') { + sender.send(msg1); + sender.send(msg2); + sender.send(msg3); + } else { + counter++; + + if (counter === 1) { + assert.equal(msgFromChild, msg1); + } else if (counter === 2) { + assert.equal(msgFromChild, msg2); + } else if (counter === 3) { + assert.equal(msgFromChild, msg3); + + child.kill(); + done(); + } + } + }); + }); + + test('buffered sending - lots of data (potential deadlock on win32)', function (done: () => void) { + const child = fork('vs/base/test/node/processes/fixtures/fork_large'); + const sender = processes.createQueuedSender(child); + + const largeObj = Object.create(null); + for (let i = 0; i < 10000; i++) { + largeObj[i] = 'some data'; + } + + const msg = JSON.stringify(largeObj); + child.on('message', msgFromChild => { + if (msgFromChild === 'ready') { + sender.send(msg); + sender.send(msg); + sender.send(msg); + } else if (msgFromChild === 'done') { + child.kill(); + done(); + } + }); + }); +}); \ No newline at end of file diff --git a/src/vs/workbench/node/extensionHostProcess.ts b/src/vs/workbench/node/extensionHostProcess.ts index bdfe65f36f8d6..d278b1588537d 100644 --- a/src/vs/workbench/node/extensionHostProcess.ts +++ b/src/vs/workbench/node/extensionHostProcess.ts @@ -10,6 +10,7 @@ import { TPromise } from 'vs/base/common/winjs.base'; import { ExtensionHostMain, IInitData, exit } from 'vs/workbench/node/extensionHostMain'; import { create as createIPC, IMainProcessExtHostIPC } from 'vs/platform/extensions/common/ipcRemoteCom'; import marshalling = require('vs/base/common/marshalling'); +import { createQueuedSender } from 'vs/base/node/processes'; interface IRendererConnection { remoteCom: IMainProcessExtHostIPC; @@ -22,6 +23,9 @@ let onTerminate = function () { exit(); }; +// Utility to not flood the process.send() with messages if it is busy catching up +const queuedSender = createQueuedSender(process); + function connectToRenderer(): TPromise { return new TPromise((c, e) => { const stats: number[] = []; @@ -32,7 +36,7 @@ function connectToRenderer(): TPromise { let msg = marshalling.parse(raw); const remoteCom = createIPC(data => { - process.send(data); + queuedSender.send(data); stats.push(data.length); }); @@ -92,13 +96,13 @@ function connectToRenderer(): TPromise { }, 1000); // Tell the outside that we are initialized - process.send('initialized'); + queuedSender.send('initialized'); c({ remoteCom, initData: msg }); }); // Tell the outside that we are ready to receive messages - process.send('ready'); + queuedSender.send('ready'); }); } diff --git a/src/vs/workbench/services/extensions/electron-browser/extensionHost.ts b/src/vs/workbench/services/extensions/electron-browser/extensionHost.ts index 53b94a9c54c55..ea9fec3515cc8 100644 --- a/src/vs/workbench/services/extensions/electron-browser/extensionHost.ts +++ b/src/vs/workbench/services/extensions/electron-browser/extensionHost.ts @@ -29,6 +29,7 @@ import { ExtensionScanner, MessagesCollector } from 'vs/workbench/node/extension import { IMessagePassingProtocol } from 'vs/base/parts/ipc/common/ipc'; import Event, { Emitter } from 'vs/base/common/event'; import { IStorageService, StorageScope } from 'vs/platform/storage/common/storage'; +import { createQueuedSender, IQueuedSender } from 'vs/base/node/processes'; export const EXTENSION_LOG_BROADCAST_CHANNEL = 'vscode:extensionLog'; export const EXTENSION_ATTACH_BROADCAST_CHANNEL = 'vscode:extensionAttach'; @@ -47,6 +48,7 @@ export interface ILogEntry { export class ExtensionHostProcessWorker { private initializeExtensionHostProcess: TPromise; private extensionHostProcessHandle: ChildProcess; + private extensionHostProcessQueuedSender: IQueuedSender; private extensionHostProcessReady: boolean; private initializeTimer: number; @@ -126,6 +128,7 @@ export class ExtensionHostProcessWorker { // Run Extension Host as fork of current process this.extensionHostProcessHandle = fork(URI.parse(require.toUrl('bootstrap')).fsPath, ['--type=extensionHost'], opts); + this.extensionHostProcessQueuedSender = createQueuedSender(this.extensionHostProcessHandle); // Notify debugger that we are ready to attach to the process if we run a development extension if (this.isExtensionDevelopmentHost && port) { @@ -221,7 +224,7 @@ export class ExtensionHostProcessWorker { workspaceStoragePath: this.storageService.getStoragePath(StorageScope.WORKSPACE), extensions: extensionDescriptors }); - this.extensionHostProcessHandle.send(initPayload); + this.extensionHostProcessQueuedSender.send(initPayload); }); } @@ -351,9 +354,9 @@ export class ExtensionHostProcessWorker { public send(msg: any): void { if (this.extensionHostProcessReady) { - this.extensionHostProcessHandle.send(msg); + this.extensionHostProcessQueuedSender.send(msg); } else if (this.initializeExtensionHostProcess) { - this.initializeExtensionHostProcess.done(p => p.send(msg)); + this.initializeExtensionHostProcess.done(() => this.extensionHostProcessQueuedSender.send(msg)); } else { this.unsentMessages.push(msg); } @@ -363,7 +366,7 @@ export class ExtensionHostProcessWorker { this.terminating = true; if (this.extensionHostProcessHandle) { - this.extensionHostProcessHandle.send({ + this.extensionHostProcessQueuedSender.send({ type: '__$terminate' }); }