Skip to content

Commit

Permalink
fix eclipse-theia#7176: gracefully terminate plugin host process with…
Browse files Browse the repository at this point in the history
…out rpc connection

Signed-off-by: Anton Kosyakov <[email protected]>
  • Loading branch information
akosyakov authored and Sean Hellum committed Mar 12, 2020
1 parent 3ff9784 commit b7b7948
Show file tree
Hide file tree
Showing 8 changed files with 186 additions and 50 deletions.
30 changes: 23 additions & 7 deletions packages/plugin-ext/src/common/rpc-protocol.ts
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,20 @@ export function createProxyIdentifier<T>(identifier: string): ProxyIdentifier<T>
return new ProxyIdentifier(false, identifier);
}

export interface ConnectionClosedError extends Error {
code: 'RPC_PROTOCOL_CLOSED'
}
export namespace ConnectionClosedError {
const code: ConnectionClosedError['code'] = 'RPC_PROTOCOL_CLOSED';
export function create(message: string = 'connection is closed'): ConnectionClosedError {
return Object.assign(new Error(message), { code });
}
// eslint-disable-next-line @typescript-eslint/no-explicit-any
export function is(error: any): error is ConnectionClosedError {
return !!error && typeof error === 'object' && 'code' in error && error['code'] === code;
}
}

export class RPCProtocolImpl implements RPCProtocol {

private readonly locals = new Map<string, any>();
Expand All @@ -82,7 +96,7 @@ export class RPCProtocolImpl implements RPCProtocol {
this.toDispose.push(Disposable.create(() => {
this.proxies.clear();
for (const reply of this.pendingRPCReplies.values()) {
reply.reject(new Error('connection is closed'));
reply.reject(ConnectionClosedError.create());
}
this.pendingRPCReplies.clear();
}));
Expand All @@ -98,7 +112,7 @@ export class RPCProtocolImpl implements RPCProtocol {

getProxy<T>(proxyId: ProxyIdentifier<T>): T {
if (this.isDisposed) {
throw new Error('connection is closed');
throw ConnectionClosedError.create();
}
let proxy = this.proxies.get(proxyId.id);
if (!proxy) {
Expand All @@ -110,7 +124,7 @@ export class RPCProtocolImpl implements RPCProtocol {

set<T, R extends T>(identifier: ProxyIdentifier<T>, instance: R): R {
if (this.isDisposed) {
throw new Error('connection is closed');
throw ConnectionClosedError.create();
}
this.locals.set(identifier.id, instance);
if (Disposable.is(instance)) {
Expand All @@ -135,7 +149,7 @@ export class RPCProtocolImpl implements RPCProtocol {

private remoteCall(proxyId: string, methodName: string, args: any[]): Promise<any> {
if (this.isDisposed) {
return Promise.reject(new Error('connection is closed'));
return Promise.reject(ConnectionClosedError.create());
}
const cancellationToken: CancellationToken | undefined = args.length && CancellationToken.is(args[args.length - 1]) ? args.pop() : undefined;
if (cancellationToken && cancellationToken.isCancellationRequested) {
Expand Down Expand Up @@ -320,7 +334,7 @@ class RPCMultiplexer implements Disposable {

public send(msg: string): void {
if (this.toDispose.disposed) {
throw new Error('connection is closed');
throw ConnectionClosedError.create();
}
if (this.messagesToSend.length === 0) {
if (typeof setImmediate !== 'undefined') {
Expand Down Expand Up @@ -460,11 +474,13 @@ function isSerializedObject(obj: any): obj is SerializedObject {
return obj && obj.$type !== undefined && obj.data !== undefined;
}

const enum MessageType {
export const enum MessageType {
Request = 1,
Reply = 2,
ReplyErr = 3,
Cancel = 4
Cancel = 4,
Terminate = 5,
Terminated = 6
}

class CancelMessage {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,24 +18,58 @@ import { injectable } from 'inversify';
import { Argv, Arguments } from 'yargs';
import { CliContribution } from '@theia/core/lib/node';

let pluginHostTerminateTimeout = 10 * 1000;
if (process.env.PLUGIN_HOST_TERMINATE_TIMEOUT) {
pluginHostTerminateTimeout = Number.parseInt(process.env.PLUGIN_HOST_TERMINATE_TIMEOUT);
}

let pluginHostStopTimeout = 4 * 1000;
if (process.env.PLUGIN_HOST_STOP_TIMEOUT) {
pluginHostStopTimeout = Number.parseInt(process.env.PLUGIN_HOST_STOP_TIMEOUT);
}

@injectable()
export class HostedPluginCliContribution implements CliContribution {

static EXTENSION_TESTS_PATH = 'extensionTestsPath';
static PLUGIN_HOST_TERMINATE_TIMEOUT = 'pluginHostTerminateTimeout';
static PLUGIN_HOST_STOP_TIMEOUT = 'pluginHostStopTimeout';

protected _extensionTestsPath: string | undefined;
get extensionTestsPath(): string | undefined {
return this._extensionTestsPath;
}

protected _pluginHostTerminateTimeout = pluginHostTerminateTimeout;
get pluginHostTerminateTimeout(): number {
return this._pluginHostTerminateTimeout;
}

protected _pluginHostStopTimeout = pluginHostStopTimeout;
get pluginHostStopTimeout(): number {
return this._pluginHostStopTimeout;
}

configure(conf: Argv): void {
conf.option(HostedPluginCliContribution.EXTENSION_TESTS_PATH, {
type: 'string'
});
conf.option(HostedPluginCliContribution.PLUGIN_HOST_TERMINATE_TIMEOUT, {
type: 'number',
default: pluginHostTerminateTimeout,
description: 'Timeout in milliseconds to wait for the plugin host process to terminate before killing it. Use 0 for no timeout.'
});
conf.option(HostedPluginCliContribution.PLUGIN_HOST_STOP_TIMEOUT, {
type: 'number',
default: pluginHostStopTimeout,
description: 'Timeout in milliseconds to wait for the plugin host process to stop internal services. Use 0 for no timeout.'
});
}

setArguments(args: Arguments): void {
this._extensionTestsPath = args[HostedPluginCliContribution.EXTENSION_TESTS_PATH];
this._pluginHostTerminateTimeout = args[HostedPluginCliContribution.PLUGIN_HOST_TERMINATE_TIMEOUT];
this._pluginHostStopTimeout = args[HostedPluginCliContribution.PLUGIN_HOST_STOP_TIMEOUT];
}

}
60 changes: 37 additions & 23 deletions packages/plugin-ext/src/hosted/node/hosted-plugin-process.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,12 @@ import * as path from 'path';
import * as cp from 'child_process';
import { injectable, inject, named } from 'inversify';
import { ILogger, ConnectionErrorHandler, ContributionProvider, MessageService } from '@theia/core/lib/common';
import { Emitter } from '@theia/core/lib/common/event';
import { createIpcEnv } from '@theia/core/lib/node/messaging/ipc-protocol';
import { HostedPluginClient, ServerPluginRunner, PluginHostEnvironmentVariable, DeployedPlugin } from '../../common/plugin-protocol';
import { RPCProtocolImpl } from '../../common/rpc-protocol';
import { MAIN_RPC_CONTEXT } from '../../common/plugin-api-rpc';
import { MessageType } from '../../common/rpc-protocol';
import { HostedPluginCliContribution } from './hosted-plugin-cli-contribution';
import * as psTree from 'ps-tree';
import { Deferred } from '@theia/core/lib/common/promise-util';

export interface IPCConnectionOptions {
readonly serverName: string;
Expand Down Expand Up @@ -83,7 +82,7 @@ export class HostedPluginProcess implements ServerPluginRunner {
}
}

public terminatePluginServer(): void {
async terminatePluginServer(): Promise<void> {
if (this.childProcess === undefined) {
return;
}
Expand All @@ -93,34 +92,49 @@ export class HostedPluginProcess implements ServerPluginRunner {
const cp = this.childProcess;
this.childProcess = undefined;

const emitter = new Emitter();
const waitForTerminated = new Deferred<void>();
cp.on('message', message => {
emitter.fire(JSON.parse(message));
});
const rpc = new RPCProtocolImpl({
onMessage: emitter.event,
send: (m: {}) => {
if (cp.send) {
cp.send(JSON.stringify(m));
}
const msg = JSON.parse(message);
if ('type' in msg && msg.type === MessageType.Terminated) {
waitForTerminated.resolve();
}
});
const hostedPluginManager = rpc.getProxy(MAIN_RPC_CONTEXT.HOSTED_PLUGIN_MANAGER_EXT);
hostedPluginManager.$stop().then(() => {
emitter.dispose();
this.killProcessTree(cp.pid);
});
const stopTimeout = this.cli.pluginHostStopTimeout;
cp.send(JSON.stringify({ type: MessageType.Terminate, stopTimeout }));

const terminateTimeout = this.cli.pluginHostTerminateTimeout;
if (terminateTimeout) {
await Promise.race([
waitForTerminated.promise,
new Promise(resolve => setTimeout(resolve, terminateTimeout))
]);
} else {
await waitForTerminated.promise;
}

this.killProcessTree(cp.pid);
}

private killProcessTree(parentPid: number): void {
psTree(parentPid, (err: Error, childProcesses: Array<psTree.PS>) => {
childProcesses.forEach((p: psTree.PS) => {
process.kill(parseInt(p.PID));
});
process.kill(parentPid);
psTree(parentPid, (_, childProcesses) => {
childProcesses.forEach(childProcess =>
this.killProcess(parseInt(childProcess.PID))
);
this.killProcess(parentPid);
});
}

protected killProcess(pid: number): void {
try {
process.kill(pid);
} catch (e) {
if (e && 'code' in e && e.code === 'ESRCH') {
return;
}
this.logger.error(`[${pid}] failed to kill`, e);
}
}

public runPluginServer(): void {
if (this.childProcess) {
this.terminatePluginServer();
Expand Down
4 changes: 4 additions & 0 deletions packages/plugin-ext/src/hosted/node/plugin-host-rpc.ts
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,10 @@ export class PluginHostRPC {
);
}

async terminate(): Promise<void> {
await this.pluginManager.terminate();
}

// eslint-disable-next-line @typescript-eslint/no-explicit-any
initContext(contextPath: string, plugin: Plugin): any {
const { name, version } = plugin.rawModel;
Expand Down
34 changes: 30 additions & 4 deletions packages/plugin-ext/src/hosted/node/plugin-host.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
********************************************************************************/

import { Emitter } from '@theia/core/lib/common/event';
import { RPCProtocolImpl } from '../../common/rpc-protocol';
import { RPCProtocolImpl, MessageType, ConnectionClosedError } from '../../common/rpc-protocol';
import { PluginHostRPC } from './plugin-host-rpc';
console.log('PLUGIN_HOST(' + process.pid + ') starting instance');

Expand Down Expand Up @@ -50,6 +50,10 @@ process.on('unhandledRejection', (reason: any, promise: Promise<any>) => {
if (index >= 0) {
promise.catch(err => {
unhandledPromises.splice(index, 1);
if (terminating && (ConnectionClosedError.is(err) || ConnectionClosedError.is(reason))) {
// during termination it is expected that pending rpc requerst are rejected
return;
}
console.error(`Promise rejection not handled in one second: ${err} , reason: ${reason}`);
if (err && err.stack) {
console.error(`With stack trace: ${err.stack}`);
Expand All @@ -67,19 +71,41 @@ process.on('rejectionHandled', (promise: Promise<any>) => {
}
});

let terminating = false;
const emitter = new Emitter();
const rpc = new RPCProtocolImpl({
onMessage: emitter.event,
send: (m: {}) => {
if (process.send) {
if (process.send && !terminating) {
process.send(JSON.stringify(m));
}
}
});

process.on('message', (message: string) => {
process.on('message', async (message: string) => {
if (terminating) {
return;
}
try {
emitter.fire(JSON.parse(message));
const msg = JSON.parse(message);
if ('type' in msg && msg.type === MessageType.Terminate) {
terminating = true;
emitter.dispose();
if ('stopTimeout' in msg && typeof msg.stopTimeout === 'number' && msg.stopTimeout) {
await Promise.race([
pluginHostRPC.terminate(),
new Promise(resolve => setTimeout(resolve, msg.stopTimeout))
]);
} else {
await pluginHostRPC.terminate();
}
rpc.dispose();
if (process.send) {
process.send(JSON.stringify({ type: MessageType.Terminated }));
}
} else {
emitter.fire(msg);
}
} catch (e) {
console.error(e);
}
Expand Down
4 changes: 0 additions & 4 deletions packages/plugin-ext/src/plugin/command-registry.ts
Original file line number Diff line number Diff line change
Expand Up @@ -85,10 +85,6 @@ export class CommandRegistryImpl implements CommandRegistryExt {
});
}

dispose(): void {
throw new Error('Method not implemented.');
}

// eslint-disable-next-line @typescript-eslint/no-explicit-any
$executeCommand<T>(id: string, ...args: any[]): PromiseLike<T | undefined> {
if (this.handlers.has(id)) {
Expand Down
Loading

0 comments on commit b7b7948

Please sign in to comment.