Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(core): allow isolated plugins to shut themselves down #27317

Merged
merged 3 commits into from
Aug 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 26 additions & 19 deletions packages/nx/src/daemon/cache.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,33 +38,40 @@ export async function writeDaemonJsonProcessCache(
await writeJson(serverProcessJsonPath, daemonJson);
}

export async function waitForDaemonToExitAndCleanupProcessJson(): Promise<void> {
const daemonProcessJson = await readDaemonProcessJsonCache();
if (daemonProcessJson && daemonProcessJson.processId) {
await new Promise<void>((resolve, reject) => {
let count = 0;
const interval = setInterval(() => {
try {
// sending a signal 0 to a process checks if the process is running instead of actually killing it
process.kill(daemonProcessJson.processId, 0);
} catch (e) {
clearInterval(interval);
resolve();
}
if ((count += 1) > 200) {
clearInterval(interval);
reject(
`Daemon process ${daemonProcessJson.processId} didn't exit after 2 seconds.`
);
}
}, 10);
});
deleteDaemonJsonProcessCache();
}
}

export async function safelyCleanUpExistingProcess(): Promise<void> {
const daemonProcessJson = await readDaemonProcessJsonCache();
if (daemonProcessJson && daemonProcessJson.processId) {
try {
process.kill(daemonProcessJson.processId);
// we wait for the process to actually shut down before returning
await new Promise<void>((resolve, reject) => {
let count = 0;
const interval = setInterval(() => {
try {
// sending a signal 0 to a process checks if the process is running instead of actually killing it
process.kill(daemonProcessJson.processId, 0);
} catch (e) {
clearInterval(interval);
resolve();
}
if ((count += 1) > 200) {
clearInterval(interval);
reject(
`Daemon process ${daemonProcessJson.processId} didn't exit after 2 seconds.`
);
}
}, 10);
});
await waitForDaemonToExitAndCleanupProcessJson();
} catch {}
}
deleteDaemonJsonProcessCache();
}

// Must be sync for the help output use case
Expand Down
13 changes: 10 additions & 3 deletions packages/nx/src/daemon/client/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import { hasNxJson, NxJsonConfiguration } from '../../config/nx-json';
import { readNxJson } from '../../config/configuration';
import { PromisedBasedQueue } from '../../utils/promised-based-queue';
import { DaemonSocketMessenger, Message } from './daemon-socket-messenger';
import { safelyCleanUpExistingProcess } from '../cache';
import { waitForDaemonToExitAndCleanupProcessJson } from '../cache';
import { Hash } from '../../hasher/task-hasher';
import { Task, TaskGraph } from '../../config/task-graph';
import { ConfigurationSourceMaps } from '../../project-graph/utils/project-configuration-utils';
Expand Down Expand Up @@ -48,6 +48,7 @@ import {
HandleGetTaskHistoryForHashesMessage,
HandleWriteTaskRunsToHistoryMessage,
} from '../message-types/task-history';
import { FORCE_SHUTDOWN } from '../message-types/force-shutdown';

const DAEMON_ENV_SETTINGS = {
NX_PROJECT_GLOB_CACHE: 'false',
Expand Down Expand Up @@ -439,7 +440,10 @@ export class DaemonClient {
} else if (this._daemonStatus == DaemonStatus.CONNECTING) {
await this._waitForDaemonReady;
}

// An open promise isn't enough to keep the event loop
// alive, so we set a timeout here and clear it when we hear
// back
const keepAlive = setTimeout(() => {}, 10 * 60 * 1000);
return new Promise((resolve, reject) => {
performance.mark('sendMessageToDaemon-start');

Expand All @@ -448,6 +452,8 @@ export class DaemonClient {
this.currentReject = reject;

this.socketMessenger.sendMessage(message);
}).finally(() => {
clearTimeout(keepAlive);
});
}

Expand Down Expand Up @@ -541,7 +547,8 @@ export class DaemonClient {

async stop(): Promise<void> {
try {
await safelyCleanUpExistingProcess();
await this.sendMessageToDaemon({ type: FORCE_SHUTDOWN });
await waitForDaemonToExitAndCleanupProcessJson();
} catch (err) {
output.error({
title:
Expand Down
16 changes: 16 additions & 0 deletions packages/nx/src/daemon/message-types/force-shutdown.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
export const FORCE_SHUTDOWN = 'FORCE_SHUTDOWN' as const;

export type HandleForceShutdownMessage = {
type: typeof FORCE_SHUTDOWN;
};

export function isHandleForceShutdownMessage(
message: unknown
): message is HandleForceShutdownMessage {
return (
typeof message === 'object' &&
message !== null &&
'type' in message &&
message['type'] === FORCE_SHUTDOWN
);
}
17 changes: 17 additions & 0 deletions packages/nx/src/daemon/server/handle-force-shutdown.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
import { Server } from 'net';
import { handleServerProcessTermination } from './shutdown-utils';
import { openSockets } from './server';

export async function handleForceShutdown(server: Server) {
setTimeout(async () => {
await handleServerProcessTermination({
server,
reason: 'Request to shutdown',
sockets: openSockets,
});
});
return {
description: 'Shutdown initiated',
response: '{}',
};
}
2 changes: 2 additions & 0 deletions packages/nx/src/daemon/server/handle-request-shutdown.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { Server } from 'net';
import { handleServerProcessTermination } from './shutdown-utils';
import { openSockets } from './server';

export async function handleRequestShutdown(
server: Server,
Expand All @@ -16,6 +17,7 @@ export async function handleRequestShutdown(
await handleServerProcessTermination({
server,
reason: 'Request to shutdown',
sockets: openSockets,
});
}, 0);
return {
Expand Down
14 changes: 14 additions & 0 deletions packages/nx/src/daemon/server/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,8 @@ import {
} from '../message-types/task-history';
import { handleGetTaskHistoryForHashes } from './handle-get-task-history';
import { handleWriteTaskRunsToHistory } from './handle-write-task-runs-to-history';
import { isHandleForceShutdownMessage } from '../message-types/force-shutdown';
import { handleForceShutdown } from './handle-force-shutdown';

let performanceObserver: PerformanceObserver | undefined;
let workspaceWatcherError: Error | undefined;
Expand All @@ -90,9 +92,11 @@ export type HandlerResult = {
};

let numberOfOpenConnections = 0;
export const openSockets: Set<Socket> = new Set();

const server = createServer(async (socket) => {
numberOfOpenConnections += 1;
openSockets.add(socket);
serverLogger.log(
`Established a connection. Number of open connections: ${numberOfOpenConnections}`
);
Expand All @@ -119,6 +123,7 @@ const server = createServer(async (socket) => {

socket.on('close', () => {
numberOfOpenConnections -= 1;
openSockets.delete(socket);
serverLogger.log(
`Closed a connection. Number of open connections: ${numberOfOpenConnections}`
);
Expand Down Expand Up @@ -216,6 +221,10 @@ async function handleMessage(socket, data: string) {
await handleResult(socket, 'WRITE_TASK_RUNS_TO_HISTORY', () =>
handleWriteTaskRunsToHistory(payload.taskRuns)
);
} else if (isHandleForceShutdownMessage(payload)) {
await handleResult(socket, 'FORCE_SHUTDOWN', () =>
handleForceShutdown(server)
);
} else {
await respondWithErrorAndExit(
socket,
Expand Down Expand Up @@ -256,6 +265,7 @@ function handleInactivityTimeout() {
handleServerProcessTermination({
server,
reason: `${SERVER_INACTIVITY_TIMEOUT_MS}ms of inactivity`,
sockets: openSockets,
});
}
}
Expand All @@ -266,18 +276,21 @@ function registerProcessTerminationListeners() {
handleServerProcessTermination({
server,
reason: 'received process SIGINT',
sockets: openSockets,
})
)
.on('SIGTERM', () =>
handleServerProcessTermination({
server,
reason: 'received process SIGTERM',
sockets: openSockets,
})
)
.on('SIGHUP', () =>
handleServerProcessTermination({
server,
reason: 'received process SIGHUP',
sockets: openSockets,
})
);
}
Expand Down Expand Up @@ -352,6 +365,7 @@ const handleWorkspaceChanges: FileWatcherCallback = async (
await handleServerProcessTermination({
server,
reason: outdatedReason,
sockets: openSockets,
});
return;
}
Expand Down
17 changes: 14 additions & 3 deletions packages/nx/src/daemon/server/shutdown-utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,16 +35,24 @@ export function getOutputWatcherInstance() {
interface HandleServerProcessTerminationParams {
server: Server;
reason: string;
sockets: Iterable<Socket>;
}

export async function handleServerProcessTermination({
server,
reason,
sockets,
}: HandleServerProcessTerminationParams) {
try {
server.close();
deleteDaemonJsonProcessCache();
cleanupPlugins();
await new Promise((res) => {
server.close(() => {
res(null);
});

for (const socket of sockets) {
socket.destroy();
}
});

if (watcherInstance) {
await watcherInstance.stop();
Expand All @@ -60,6 +68,9 @@ export async function handleServerProcessTermination({
);
}

deleteDaemonJsonProcessCache();
cleanupPlugins();

serverLogger.log(`Server stopped because: "${reason}"`);
} finally {
process.exit(0);
Expand Down
3 changes: 3 additions & 0 deletions packages/nx/src/daemon/server/watcher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import {
import { platform } from 'os';
import { getDaemonProcessIdSync, serverProcessJsonPath } from '../cache';
import type { WatchEvent } from '../../native';
import { openSockets } from './server';

const ALWAYS_IGNORE = [
...getAlwaysIgnore(workspaceRoot),
Expand Down Expand Up @@ -44,6 +45,7 @@ export async function watchWorkspace(server: Server, cb: FileWatcherCallback) {
handleServerProcessTermination({
server,
reason: 'this process is no longer the current daemon (native)',
sockets: openSockets,
});
}

Expand All @@ -53,6 +55,7 @@ export async function watchWorkspace(server: Server, cb: FileWatcherCallback) {
server,
reason:
'Stopping the daemon the set of ignored files changed (native)',
sockets: openSockets,
});
}
}
Expand Down
27 changes: 20 additions & 7 deletions packages/nx/src/project-graph/plugins/internal-api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -146,19 +146,32 @@ export const nxPluginCache: Map<
[Promise<LoadedNxPlugin>, () => void]
> = new Map();

function isIsolationEnabled() {
// Explicitly enabled, regardless of further conditions
if (process.env.NX_ISOLATE_PLUGINS === 'true') {
return true;
}
if (
// Explicitly disabled
process.env.NX_ISOLATE_PLUGINS === 'false' ||
// Isolation is disabled on WASM builds currently.
IS_WASM
) {
return false;
}
// Default value
return true;
}

export async function loadNxPlugins(
plugins: PluginConfiguration[],
root = workspaceRoot
): Promise<readonly [LoadedNxPlugin[], () => void]> {
performance.mark('loadNxPlugins:start');

const loadingMethod =
process.env.NX_ISOLATE_PLUGINS === 'true' ||
(!IS_WASM &&
platform() !== 'win32' &&
process.env.NX_ISOLATE_PLUGINS !== 'false')
? loadNxPluginInIsolation
: loadNxPlugin;
const loadingMethod = isIsolationEnabled()
? loadNxPluginInIsolation
: loadNxPlugin;

plugins = await normalizePlugins(plugins, root);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,8 +135,14 @@ export interface PluginWorkerProcessProjectGraphResult {
};
}

export interface PluginWorkerShutdownMessage {
type: 'shutdown';
payload: {};
}

export type PluginWorkerMessage =
| PluginWorkerLoadMessage
| PluginWorkerShutdownMessage
| PluginWorkerCreateNodesMessage
| PluginCreateDependenciesMessage
| PluginWorkerProcessProjectGraphMessage
Expand All @@ -162,6 +168,7 @@ export function isPluginWorkerMessage(
'createDependencies',
'processProjectGraph',
'createMetadata',
'shutdown',
].includes(message.type)
);
}
Expand Down
12 changes: 4 additions & 8 deletions packages/nx/src/project-graph/plugins/isolation/plugin-pool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,8 @@ export async function loadRemoteNxPlugin(

const cleanupFunction = () => {
worker.off('exit', exitHandler);
shutdownPluginWorker(socket);
socket.destroy();
shutdownPluginWorker(worker);
nxPluginWorkerCache.delete(cacheKey);
};

Expand Down Expand Up @@ -94,13 +94,8 @@ export async function loadRemoteNxPlugin(
return [pluginPromise, cleanupFunction];
}

function shutdownPluginWorker(worker: ChildProcess) {
// Clears the plugin cache so no refs to the workers are held
nxPluginCache.clear();

// logger.verbose(`[plugin-pool] starting worker shutdown`);

worker.kill('SIGINT');
function shutdownPluginWorker(socket: Socket) {
sendMessageOverSocket(socket, { type: 'shutdown', payload: {} });
}

/**
Expand Down Expand Up @@ -249,6 +244,7 @@ function createWorkerExitHandler(

let cleanedUp = false;
const exitHandler = () => {
nxPluginCache.clear();
for (const fn of cleanupFunctions) {
fn();
}
Expand Down
Loading