From df388700cb02889ba657dbec82cf01bace59800c Mon Sep 17 00:00:00 2001 From: Craigory Coppola Date: Tue, 6 Aug 2024 13:53:16 -0400 Subject: [PATCH 1/3] fix(core): allow isolated plugins to shut themselves down --- .../project-graph/plugins/isolation/messaging.ts | 7 +++++++ .../project-graph/plugins/isolation/plugin-pool.ts | 12 ++++-------- .../plugins/isolation/plugin-worker.ts | 14 ++++++++++++++ 3 files changed, 25 insertions(+), 8 deletions(-) diff --git a/packages/nx/src/project-graph/plugins/isolation/messaging.ts b/packages/nx/src/project-graph/plugins/isolation/messaging.ts index 7b27e92f46a0c..4d24b049feb26 100644 --- a/packages/nx/src/project-graph/plugins/isolation/messaging.ts +++ b/packages/nx/src/project-graph/plugins/isolation/messaging.ts @@ -135,8 +135,14 @@ export interface PluginWorkerProcessProjectGraphResult { }; } +export interface PluginWorkerShutdownMessage { + type: 'shutdown'; + payload: {}; +} + export type PluginWorkerMessage = | PluginWorkerLoadMessage + | PluginWorkerShutdownMessage | PluginWorkerCreateNodesMessage | PluginCreateDependenciesMessage | PluginWorkerProcessProjectGraphMessage @@ -162,6 +168,7 @@ export function isPluginWorkerMessage( 'createDependencies', 'processProjectGraph', 'createMetadata', + 'shutdown', ].includes(message.type) ); } diff --git a/packages/nx/src/project-graph/plugins/isolation/plugin-pool.ts b/packages/nx/src/project-graph/plugins/isolation/plugin-pool.ts index b7a87781b1a2e..876fe79bac717 100644 --- a/packages/nx/src/project-graph/plugins/isolation/plugin-pool.ts +++ b/packages/nx/src/project-graph/plugins/isolation/plugin-pool.ts @@ -53,8 +53,8 @@ export async function loadRemoteNxPlugin( const cleanupFunction = () => { worker.off('exit', exitHandler); + shutdownPluginWorker(socket); socket.destroy(); - shutdownPluginWorker(worker); nxPluginWorkerCache.delete(cacheKey); }; @@ -94,13 +94,8 @@ export async function loadRemoteNxPlugin( return [pluginPromise, cleanupFunction]; } -function shutdownPluginWorker(worker: ChildProcess) { - // Clears the plugin cache so no refs to the workers are held - nxPluginCache.clear(); - - // logger.verbose(`[plugin-pool] starting worker shutdown`); - - worker.kill('SIGINT'); +function shutdownPluginWorker(socket: Socket) { + sendMessageOverSocket(socket, { type: 'shutdown', payload: {} }); } /** @@ -249,6 +244,7 @@ function createWorkerExitHandler( let cleanedUp = false; const exitHandler = () => { + nxPluginCache.clear(); for (const fn of cleanupFunctions) { fn(); } diff --git a/packages/nx/src/project-graph/plugins/isolation/plugin-worker.ts b/packages/nx/src/project-graph/plugins/isolation/plugin-worker.ts index 509b1d0fa9350..52ddf693a2142 100644 --- a/packages/nx/src/project-graph/plugins/isolation/plugin-worker.ts +++ b/packages/nx/src/project-graph/plugins/isolation/plugin-worker.ts @@ -58,6 +58,20 @@ const server = createServer((socket) => { }; } }, + shutdown: async () => { + // Stops accepting new connections, but existing connections are + // not closed immediately. + server.close(() => { + try { + unlinkSync(socketPath); + } catch (e) {} + process.exit(0); + }); + // Closes existing connection. + socket.end(); + // Destroys the socket once it's fully closed. + socket.destroySoon(); + }, createNodes: async ({ configFiles, context, tx }) => { try { const result = await plugin.createNodes[1](configFiles, context); From 35a33ae5f68b4ae1c6260b3df8ccb36041cc3dd1 Mon Sep 17 00:00:00 2001 From: Craigory Coppola Date: Tue, 6 Aug 2024 17:16:54 -0400 Subject: [PATCH 2/3] fix(core): reenable isolation on windows --- .../src/project-graph/plugins/internal-api.ts | 27 ++++++++++++++----- 1 file changed, 20 insertions(+), 7 deletions(-) diff --git a/packages/nx/src/project-graph/plugins/internal-api.ts b/packages/nx/src/project-graph/plugins/internal-api.ts index 6bc38f0a67ae0..33359f5f831fd 100644 --- a/packages/nx/src/project-graph/plugins/internal-api.ts +++ b/packages/nx/src/project-graph/plugins/internal-api.ts @@ -146,19 +146,32 @@ export const nxPluginCache: Map< [Promise, () => void] > = new Map(); +function isIsolationEnabled() { + // Explicitly enabled, regardless of further conditions + if (process.env.NX_ISOLATE_PLUGINS === 'true') { + return true; + } + if ( + // Explicitly disabled + process.env.NX_ISOLATE_PLUGINS === 'false' || + // Isolation is disabled on WASM builds currently. + IS_WASM + ) { + return false; + } + // Default value + return true; +} + export async function loadNxPlugins( plugins: PluginConfiguration[], root = workspaceRoot ): Promise void]> { performance.mark('loadNxPlugins:start'); - const loadingMethod = - process.env.NX_ISOLATE_PLUGINS === 'true' || - (!IS_WASM && - platform() !== 'win32' && - process.env.NX_ISOLATE_PLUGINS !== 'false') - ? loadNxPluginInIsolation - : loadNxPlugin; + const loadingMethod = isIsolationEnabled() + ? loadNxPluginInIsolation + : loadNxPlugin; plugins = await normalizePlugins(plugins, root); From 5e434586d3ae3cb932206204262ddd916aa929be Mon Sep 17 00:00:00 2001 From: AgentEnder Date: Wed, 7 Aug 2024 12:24:17 -0400 Subject: [PATCH 3/3] fix(core): shutdown daemon via message so it can clean itself up --- packages/nx/src/daemon/cache.ts | 45 +++++++++++-------- packages/nx/src/daemon/client/client.ts | 13 ++++-- .../daemon/message-types/force-shutdown.ts | 16 +++++++ .../daemon/server/handle-force-shutdown.ts | 17 +++++++ .../daemon/server/handle-request-shutdown.ts | 2 + packages/nx/src/daemon/server/server.ts | 14 ++++++ .../nx/src/daemon/server/shutdown-utils.ts | 17 +++++-- packages/nx/src/daemon/server/watcher.ts | 3 ++ 8 files changed, 102 insertions(+), 25 deletions(-) create mode 100644 packages/nx/src/daemon/message-types/force-shutdown.ts create mode 100644 packages/nx/src/daemon/server/handle-force-shutdown.ts diff --git a/packages/nx/src/daemon/cache.ts b/packages/nx/src/daemon/cache.ts index b06b2301078e7..f7d21e05e93de 100644 --- a/packages/nx/src/daemon/cache.ts +++ b/packages/nx/src/daemon/cache.ts @@ -38,33 +38,40 @@ export async function writeDaemonJsonProcessCache( await writeJson(serverProcessJsonPath, daemonJson); } +export async function waitForDaemonToExitAndCleanupProcessJson(): Promise { + const daemonProcessJson = await readDaemonProcessJsonCache(); + if (daemonProcessJson && daemonProcessJson.processId) { + await new Promise((resolve, reject) => { + let count = 0; + const interval = setInterval(() => { + try { + // sending a signal 0 to a process checks if the process is running instead of actually killing it + process.kill(daemonProcessJson.processId, 0); + } catch (e) { + clearInterval(interval); + resolve(); + } + if ((count += 1) > 200) { + clearInterval(interval); + reject( + `Daemon process ${daemonProcessJson.processId} didn't exit after 2 seconds.` + ); + } + }, 10); + }); + deleteDaemonJsonProcessCache(); + } +} + export async function safelyCleanUpExistingProcess(): Promise { const daemonProcessJson = await readDaemonProcessJsonCache(); if (daemonProcessJson && daemonProcessJson.processId) { try { process.kill(daemonProcessJson.processId); // we wait for the process to actually shut down before returning - await new Promise((resolve, reject) => { - let count = 0; - const interval = setInterval(() => { - try { - // sending a signal 0 to a process checks if the process is running instead of actually killing it - process.kill(daemonProcessJson.processId, 0); - } catch (e) { - clearInterval(interval); - resolve(); - } - if ((count += 1) > 200) { - clearInterval(interval); - reject( - `Daemon process ${daemonProcessJson.processId} didn't exit after 2 seconds.` - ); - } - }, 10); - }); + await waitForDaemonToExitAndCleanupProcessJson(); } catch {} } - deleteDaemonJsonProcessCache(); } // Must be sync for the help output use case diff --git a/packages/nx/src/daemon/client/client.ts b/packages/nx/src/daemon/client/client.ts index 17b937aa42d4b..2c7dcd7bc9143 100644 --- a/packages/nx/src/daemon/client/client.ts +++ b/packages/nx/src/daemon/client/client.ts @@ -20,7 +20,7 @@ import { hasNxJson, NxJsonConfiguration } from '../../config/nx-json'; import { readNxJson } from '../../config/configuration'; import { PromisedBasedQueue } from '../../utils/promised-based-queue'; import { DaemonSocketMessenger, Message } from './daemon-socket-messenger'; -import { safelyCleanUpExistingProcess } from '../cache'; +import { waitForDaemonToExitAndCleanupProcessJson } from '../cache'; import { Hash } from '../../hasher/task-hasher'; import { Task, TaskGraph } from '../../config/task-graph'; import { ConfigurationSourceMaps } from '../../project-graph/utils/project-configuration-utils'; @@ -48,6 +48,7 @@ import { HandleGetTaskHistoryForHashesMessage, HandleWriteTaskRunsToHistoryMessage, } from '../message-types/task-history'; +import { FORCE_SHUTDOWN } from '../message-types/force-shutdown'; const DAEMON_ENV_SETTINGS = { NX_PROJECT_GLOB_CACHE: 'false', @@ -439,7 +440,10 @@ export class DaemonClient { } else if (this._daemonStatus == DaemonStatus.CONNECTING) { await this._waitForDaemonReady; } - + // An open promise isn't enough to keep the event loop + // alive, so we set a timeout here and clear it when we hear + // back + const keepAlive = setTimeout(() => {}, 10 * 60 * 1000); return new Promise((resolve, reject) => { performance.mark('sendMessageToDaemon-start'); @@ -448,6 +452,8 @@ export class DaemonClient { this.currentReject = reject; this.socketMessenger.sendMessage(message); + }).finally(() => { + clearTimeout(keepAlive); }); } @@ -541,7 +547,8 @@ export class DaemonClient { async stop(): Promise { try { - await safelyCleanUpExistingProcess(); + await this.sendMessageToDaemon({ type: FORCE_SHUTDOWN }); + await waitForDaemonToExitAndCleanupProcessJson(); } catch (err) { output.error({ title: diff --git a/packages/nx/src/daemon/message-types/force-shutdown.ts b/packages/nx/src/daemon/message-types/force-shutdown.ts new file mode 100644 index 0000000000000..537d6f08c2ceb --- /dev/null +++ b/packages/nx/src/daemon/message-types/force-shutdown.ts @@ -0,0 +1,16 @@ +export const FORCE_SHUTDOWN = 'FORCE_SHUTDOWN' as const; + +export type HandleForceShutdownMessage = { + type: typeof FORCE_SHUTDOWN; +}; + +export function isHandleForceShutdownMessage( + message: unknown +): message is HandleForceShutdownMessage { + return ( + typeof message === 'object' && + message !== null && + 'type' in message && + message['type'] === FORCE_SHUTDOWN + ); +} diff --git a/packages/nx/src/daemon/server/handle-force-shutdown.ts b/packages/nx/src/daemon/server/handle-force-shutdown.ts new file mode 100644 index 0000000000000..fe778414c7672 --- /dev/null +++ b/packages/nx/src/daemon/server/handle-force-shutdown.ts @@ -0,0 +1,17 @@ +import { Server } from 'net'; +import { handleServerProcessTermination } from './shutdown-utils'; +import { openSockets } from './server'; + +export async function handleForceShutdown(server: Server) { + setTimeout(async () => { + await handleServerProcessTermination({ + server, + reason: 'Request to shutdown', + sockets: openSockets, + }); + }); + return { + description: 'Shutdown initiated', + response: '{}', + }; +} diff --git a/packages/nx/src/daemon/server/handle-request-shutdown.ts b/packages/nx/src/daemon/server/handle-request-shutdown.ts index a5506d87d2443..ef788dcda05e3 100644 --- a/packages/nx/src/daemon/server/handle-request-shutdown.ts +++ b/packages/nx/src/daemon/server/handle-request-shutdown.ts @@ -1,5 +1,6 @@ import { Server } from 'net'; import { handleServerProcessTermination } from './shutdown-utils'; +import { openSockets } from './server'; export async function handleRequestShutdown( server: Server, @@ -16,6 +17,7 @@ export async function handleRequestShutdown( await handleServerProcessTermination({ server, reason: 'Request to shutdown', + sockets: openSockets, }); }, 0); return { diff --git a/packages/nx/src/daemon/server/server.ts b/packages/nx/src/daemon/server/server.ts index 33b709511119a..4c2b6337a9f08 100644 --- a/packages/nx/src/daemon/server/server.ts +++ b/packages/nx/src/daemon/server/server.ts @@ -76,6 +76,8 @@ import { } from '../message-types/task-history'; import { handleGetTaskHistoryForHashes } from './handle-get-task-history'; import { handleWriteTaskRunsToHistory } from './handle-write-task-runs-to-history'; +import { isHandleForceShutdownMessage } from '../message-types/force-shutdown'; +import { handleForceShutdown } from './handle-force-shutdown'; let performanceObserver: PerformanceObserver | undefined; let workspaceWatcherError: Error | undefined; @@ -90,9 +92,11 @@ export type HandlerResult = { }; let numberOfOpenConnections = 0; +export const openSockets: Set = new Set(); const server = createServer(async (socket) => { numberOfOpenConnections += 1; + openSockets.add(socket); serverLogger.log( `Established a connection. Number of open connections: ${numberOfOpenConnections}` ); @@ -119,6 +123,7 @@ const server = createServer(async (socket) => { socket.on('close', () => { numberOfOpenConnections -= 1; + openSockets.delete(socket); serverLogger.log( `Closed a connection. Number of open connections: ${numberOfOpenConnections}` ); @@ -216,6 +221,10 @@ async function handleMessage(socket, data: string) { await handleResult(socket, 'WRITE_TASK_RUNS_TO_HISTORY', () => handleWriteTaskRunsToHistory(payload.taskRuns) ); + } else if (isHandleForceShutdownMessage(payload)) { + await handleResult(socket, 'FORCE_SHUTDOWN', () => + handleForceShutdown(server) + ); } else { await respondWithErrorAndExit( socket, @@ -256,6 +265,7 @@ function handleInactivityTimeout() { handleServerProcessTermination({ server, reason: `${SERVER_INACTIVITY_TIMEOUT_MS}ms of inactivity`, + sockets: openSockets, }); } } @@ -266,18 +276,21 @@ function registerProcessTerminationListeners() { handleServerProcessTermination({ server, reason: 'received process SIGINT', + sockets: openSockets, }) ) .on('SIGTERM', () => handleServerProcessTermination({ server, reason: 'received process SIGTERM', + sockets: openSockets, }) ) .on('SIGHUP', () => handleServerProcessTermination({ server, reason: 'received process SIGHUP', + sockets: openSockets, }) ); } @@ -352,6 +365,7 @@ const handleWorkspaceChanges: FileWatcherCallback = async ( await handleServerProcessTermination({ server, reason: outdatedReason, + sockets: openSockets, }); return; } diff --git a/packages/nx/src/daemon/server/shutdown-utils.ts b/packages/nx/src/daemon/server/shutdown-utils.ts index 216a968e0ab38..9c82ae714f9ff 100644 --- a/packages/nx/src/daemon/server/shutdown-utils.ts +++ b/packages/nx/src/daemon/server/shutdown-utils.ts @@ -35,16 +35,24 @@ export function getOutputWatcherInstance() { interface HandleServerProcessTerminationParams { server: Server; reason: string; + sockets: Iterable; } export async function handleServerProcessTermination({ server, reason, + sockets, }: HandleServerProcessTerminationParams) { try { - server.close(); - deleteDaemonJsonProcessCache(); - cleanupPlugins(); + await new Promise((res) => { + server.close(() => { + res(null); + }); + + for (const socket of sockets) { + socket.destroy(); + } + }); if (watcherInstance) { await watcherInstance.stop(); @@ -60,6 +68,9 @@ export async function handleServerProcessTermination({ ); } + deleteDaemonJsonProcessCache(); + cleanupPlugins(); + serverLogger.log(`Server stopped because: "${reason}"`); } finally { process.exit(0); diff --git a/packages/nx/src/daemon/server/watcher.ts b/packages/nx/src/daemon/server/watcher.ts index a9d46542deb48..da161c1545ab7 100644 --- a/packages/nx/src/daemon/server/watcher.ts +++ b/packages/nx/src/daemon/server/watcher.ts @@ -12,6 +12,7 @@ import { import { platform } from 'os'; import { getDaemonProcessIdSync, serverProcessJsonPath } from '../cache'; import type { WatchEvent } from '../../native'; +import { openSockets } from './server'; const ALWAYS_IGNORE = [ ...getAlwaysIgnore(workspaceRoot), @@ -44,6 +45,7 @@ export async function watchWorkspace(server: Server, cb: FileWatcherCallback) { handleServerProcessTermination({ server, reason: 'this process is no longer the current daemon (native)', + sockets: openSockets, }); } @@ -53,6 +55,7 @@ export async function watchWorkspace(server: Server, cb: FileWatcherCallback) { server, reason: 'Stopping the daemon the set of ignored files changed (native)', + sockets: openSockets, }); } }