Skip to content

Commit

Permalink
Merge pull request #13763 from Microsoft/ben/buffered-process-send
Browse files Browse the repository at this point in the history
Add a queue for process.send
  • Loading branch information
bpasero authored Oct 17, 2016
2 parents 82372ef + 907cc5e commit c613e1b
Show file tree
Hide file tree
Showing 6 changed files with 172 additions and 12 deletions.
48 changes: 43 additions & 5 deletions src/vs/base/node/processes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,8 @@ import * as cp from 'child_process';
import ChildProcess = cp.ChildProcess;
import exec = cp.exec;
import spawn = cp.spawn;

import { PassThrough } from 'stream';

import { fork } from './stdFork';

import { fork } from 'vs/base/node/stdFork';
import nls = require('vs/nls');
import { PPromise, Promise, TPromise, TValueCallback, TProgressCallback, ErrorCallback } from 'vs/base/common/winjs.base';
import * as Types from 'vs/base/common/types';
Expand All @@ -22,7 +19,6 @@ import URI from 'vs/base/common/uri';
import * as Objects from 'vs/base/common/objects';
import * as TPath from 'vs/base/common/paths';
import * as Platform from 'vs/base/common/platform';

import { LineDecoder } from 'vs/base/node/decoder';
import { CommandOptions, ForkOptions, SuccessData, Source, TerminateResponse, TerminateResponseCode, Executable } from 'vs/base/common/processes';
export { CommandOptions, ForkOptions, SuccessData, Source, TerminateResponse, TerminateResponseCode };
Expand Down Expand Up @@ -447,4 +443,46 @@ export class StreamProcess extends AbstractProcess<StreamData> {
pp({ stdin: childProcess.stdin, stdout: childProcess.stdout, stderr: childProcess.stderr });
}
}
}

export interface IQueuedSender {
send: (msg: any) => void;
}

// Wrapper around process.send() that will queue any messages if the internal node.js
// queue is filled with messages and only continue sending messages when the internal
// queue is free again to consume messages.
// On Windows we always wait for the send() method to return before sending the next message
// to workaround https://github.com/nodejs/node/issues/7657 (IPC can freeze process)
export function createQueuedSender(childProcess: ChildProcess | NodeJS.Process): IQueuedSender {
let msgQueue = [];
let useQueue = false;

const send = function (msg: any): void {
if (useQueue) {
msgQueue.push(msg); // add to the queue if the process cannot handle more messages
return;
}

let result = childProcess.send(msg, error => {
if (error) {
console.error(error); // unlikely to happen, best we can do is log this error
}

useQueue = false; // we are good again to send directly without queue

// now send all the messages that we have in our queue and did not send yet
if (msgQueue.length > 0) {
const msgQueueCopy = msgQueue.slice(0);
msgQueue = [];
msgQueueCopy.forEach(entry => send(entry));
}
});

if (!result || Platform.isWindows /* workaround https://github.com/nodejs/node/issues/7657 */) {
useQueue = true;
}
};

return { send };
}
16 changes: 16 additions & 0 deletions src/vs/base/test/node/processes/fixtures/fork.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
/*---------------------------------------------------------------------------------------------
* Copyright (c) Microsoft Corporation. All rights reserved.
* Licensed under the MIT License. See License.txt in the project root for license information.
*--------------------------------------------------------------------------------------------*/

'use strict';

import processes = require('vs/base/node/processes');

const sender = processes.createQueuedSender(process);

process.on('message', msg => {
sender.send(msg);
});

sender.send('ready');
19 changes: 19 additions & 0 deletions src/vs/base/test/node/processes/fixtures/fork_large.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
/*---------------------------------------------------------------------------------------------
* Copyright (c) Microsoft Corporation. All rights reserved.
* Licensed under the MIT License. See License.txt in the project root for license information.
*--------------------------------------------------------------------------------------------*/

'use strict';

import processes = require('vs/base/node/processes');

const sender = processes.createQueuedSender(process);

process.on('message', msg => {
sender.send(msg);
sender.send(msg);
sender.send(msg);
sender.send('done');
});

sender.send('ready');
80 changes: 80 additions & 0 deletions src/vs/base/test/node/processes/processes.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
/*---------------------------------------------------------------------------------------------
* Copyright (c) Microsoft Corporation. All rights reserved.
* Licensed under the MIT License. See License.txt in the project root for license information.
*--------------------------------------------------------------------------------------------*/

'use strict';

import * as assert from 'assert';
import * as cp from 'child_process';
import * as objects from 'vs/base/common/objects';
import URI from 'vs/base/common/uri';
import processes = require('vs/base/node/processes');

function fork(id: string): cp.ChildProcess {
const opts: any = {
env: objects.mixin(objects.clone(process.env), {
AMD_ENTRYPOINT: id,
PIPE_LOGGING: 'true',
VERBOSE_LOGGING: true
})
};

return cp.fork(URI.parse(require.toUrl('bootstrap')).fsPath, ['--type=processTests'], opts);
}

suite('Processes', () => {
test('buffered sending - simple data', function (done: () => void) {
const child = fork('vs/base/test/node/processes/fixtures/fork');
const sender = processes.createQueuedSender(child);

let counter = 0;

const msg1 = 'Hello One';
const msg2 = 'Hello Two';
const msg3 = 'Hello Three';

child.on('message', msgFromChild => {
if (msgFromChild === 'ready') {
sender.send(msg1);
sender.send(msg2);
sender.send(msg3);
} else {
counter++;

if (counter === 1) {
assert.equal(msgFromChild, msg1);
} else if (counter === 2) {
assert.equal(msgFromChild, msg2);
} else if (counter === 3) {
assert.equal(msgFromChild, msg3);

child.kill();
done();
}
}
});
});

test('buffered sending - lots of data (potential deadlock on win32)', function (done: () => void) {
const child = fork('vs/base/test/node/processes/fixtures/fork_large');
const sender = processes.createQueuedSender(child);

const largeObj = Object.create(null);
for (let i = 0; i < 10000; i++) {
largeObj[i] = 'some data';
}

const msg = JSON.stringify(largeObj);
child.on('message', msgFromChild => {
if (msgFromChild === 'ready') {
sender.send(msg);
sender.send(msg);
sender.send(msg);
} else if (msgFromChild === 'done') {
child.kill();
done();
}
});
});
});
10 changes: 7 additions & 3 deletions src/vs/workbench/node/extensionHostProcess.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import { TPromise } from 'vs/base/common/winjs.base';
import { ExtensionHostMain, IInitData, exit } from 'vs/workbench/node/extensionHostMain';
import { create as createIPC, IMainProcessExtHostIPC } from 'vs/platform/extensions/common/ipcRemoteCom';
import marshalling = require('vs/base/common/marshalling');
import { createQueuedSender } from 'vs/base/node/processes';

interface IRendererConnection {
remoteCom: IMainProcessExtHostIPC;
Expand All @@ -22,6 +23,9 @@ let onTerminate = function () {
exit();
};

// Utility to not flood the process.send() with messages if it is busy catching up
const queuedSender = createQueuedSender(process);

function connectToRenderer(): TPromise<IRendererConnection> {
return new TPromise<IRendererConnection>((c, e) => {
const stats: number[] = [];
Expand All @@ -32,7 +36,7 @@ function connectToRenderer(): TPromise<IRendererConnection> {
let msg = marshalling.parse(raw);

const remoteCom = createIPC(data => {
process.send(data);
queuedSender.send(data);
stats.push(data.length);
});

Expand Down Expand Up @@ -92,13 +96,13 @@ function connectToRenderer(): TPromise<IRendererConnection> {
}, 1000);

// Tell the outside that we are initialized
process.send('initialized');
queuedSender.send('initialized');

c({ remoteCom, initData: msg });
});

// Tell the outside that we are ready to receive messages
process.send('ready');
queuedSender.send('ready');
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import { ExtensionScanner, MessagesCollector } from 'vs/workbench/node/extension
import { IMessagePassingProtocol } from 'vs/base/parts/ipc/common/ipc';
import Event, { Emitter } from 'vs/base/common/event';
import { IStorageService, StorageScope } from 'vs/platform/storage/common/storage';
import { createQueuedSender, IQueuedSender } from 'vs/base/node/processes';

export const EXTENSION_LOG_BROADCAST_CHANNEL = 'vscode:extensionLog';
export const EXTENSION_ATTACH_BROADCAST_CHANNEL = 'vscode:extensionAttach';
Expand All @@ -47,6 +48,7 @@ export interface ILogEntry {
export class ExtensionHostProcessWorker {
private initializeExtensionHostProcess: TPromise<ChildProcess>;
private extensionHostProcessHandle: ChildProcess;
private extensionHostProcessQueuedSender: IQueuedSender;
private extensionHostProcessReady: boolean;
private initializeTimer: number;

Expand Down Expand Up @@ -126,6 +128,7 @@ export class ExtensionHostProcessWorker {

// Run Extension Host as fork of current process
this.extensionHostProcessHandle = fork(URI.parse(require.toUrl('bootstrap')).fsPath, ['--type=extensionHost'], opts);
this.extensionHostProcessQueuedSender = createQueuedSender(this.extensionHostProcessHandle);

// Notify debugger that we are ready to attach to the process if we run a development extension
if (this.isExtensionDevelopmentHost && port) {
Expand Down Expand Up @@ -221,7 +224,7 @@ export class ExtensionHostProcessWorker {
workspaceStoragePath: this.storageService.getStoragePath(StorageScope.WORKSPACE),
extensions: extensionDescriptors
});
this.extensionHostProcessHandle.send(initPayload);
this.extensionHostProcessQueuedSender.send(initPayload);
});
}

Expand Down Expand Up @@ -351,9 +354,9 @@ export class ExtensionHostProcessWorker {

public send(msg: any): void {
if (this.extensionHostProcessReady) {
this.extensionHostProcessHandle.send(msg);
this.extensionHostProcessQueuedSender.send(msg);
} else if (this.initializeExtensionHostProcess) {
this.initializeExtensionHostProcess.done(p => p.send(msg));
this.initializeExtensionHostProcess.done(() => this.extensionHostProcessQueuedSender.send(msg));
} else {
this.unsentMessages.push(msg);
}
Expand All @@ -363,7 +366,7 @@ export class ExtensionHostProcessWorker {
this.terminating = true;

if (this.extensionHostProcessHandle) {
this.extensionHostProcessHandle.send({
this.extensionHostProcessQueuedSender.send({
type: '__$terminate'
});
}
Expand Down

0 comments on commit c613e1b

Please sign in to comment.