diff --git a/packages/nx/src/daemon/cache.ts b/packages/nx/src/daemon/cache.ts index b06b2301078e7..f7d21e05e93de 100644 --- a/packages/nx/src/daemon/cache.ts +++ b/packages/nx/src/daemon/cache.ts @@ -38,33 +38,40 @@ export async function writeDaemonJsonProcessCache( await writeJson(serverProcessJsonPath, daemonJson); } +export async function waitForDaemonToExitAndCleanupProcessJson(): Promise { + const daemonProcessJson = await readDaemonProcessJsonCache(); + if (daemonProcessJson && daemonProcessJson.processId) { + await new Promise((resolve, reject) => { + let count = 0; + const interval = setInterval(() => { + try { + // sending a signal 0 to a process checks if the process is running instead of actually killing it + process.kill(daemonProcessJson.processId, 0); + } catch (e) { + clearInterval(interval); + resolve(); + } + if ((count += 1) > 200) { + clearInterval(interval); + reject( + `Daemon process ${daemonProcessJson.processId} didn't exit after 2 seconds.` + ); + } + }, 10); + }); + deleteDaemonJsonProcessCache(); + } +} + export async function safelyCleanUpExistingProcess(): Promise { const daemonProcessJson = await readDaemonProcessJsonCache(); if (daemonProcessJson && daemonProcessJson.processId) { try { process.kill(daemonProcessJson.processId); // we wait for the process to actually shut down before returning - await new Promise((resolve, reject) => { - let count = 0; - const interval = setInterval(() => { - try { - // sending a signal 0 to a process checks if the process is running instead of actually killing it - process.kill(daemonProcessJson.processId, 0); - } catch (e) { - clearInterval(interval); - resolve(); - } - if ((count += 1) > 200) { - clearInterval(interval); - reject( - `Daemon process ${daemonProcessJson.processId} didn't exit after 2 seconds.` - ); - } - }, 10); - }); + await waitForDaemonToExitAndCleanupProcessJson(); } catch {} } - deleteDaemonJsonProcessCache(); } // Must be sync for the help output use case diff --git a/packages/nx/src/daemon/client/client.ts b/packages/nx/src/daemon/client/client.ts index 17b937aa42d4b..2c7dcd7bc9143 100644 --- a/packages/nx/src/daemon/client/client.ts +++ b/packages/nx/src/daemon/client/client.ts @@ -20,7 +20,7 @@ import { hasNxJson, NxJsonConfiguration } from '../../config/nx-json'; import { readNxJson } from '../../config/configuration'; import { PromisedBasedQueue } from '../../utils/promised-based-queue'; import { DaemonSocketMessenger, Message } from './daemon-socket-messenger'; -import { safelyCleanUpExistingProcess } from '../cache'; +import { waitForDaemonToExitAndCleanupProcessJson } from '../cache'; import { Hash } from '../../hasher/task-hasher'; import { Task, TaskGraph } from '../../config/task-graph'; import { ConfigurationSourceMaps } from '../../project-graph/utils/project-configuration-utils'; @@ -48,6 +48,7 @@ import { HandleGetTaskHistoryForHashesMessage, HandleWriteTaskRunsToHistoryMessage, } from '../message-types/task-history'; +import { FORCE_SHUTDOWN } from '../message-types/force-shutdown'; const DAEMON_ENV_SETTINGS = { NX_PROJECT_GLOB_CACHE: 'false', @@ -439,7 +440,10 @@ export class DaemonClient { } else if (this._daemonStatus == DaemonStatus.CONNECTING) { await this._waitForDaemonReady; } - + // An open promise isn't enough to keep the event loop + // alive, so we set a timeout here and clear it when we hear + // back + const keepAlive = setTimeout(() => {}, 10 * 60 * 1000); return new Promise((resolve, reject) => { performance.mark('sendMessageToDaemon-start'); @@ -448,6 +452,8 @@ export class DaemonClient { this.currentReject = reject; this.socketMessenger.sendMessage(message); + }).finally(() => { + clearTimeout(keepAlive); }); } @@ -541,7 +547,8 @@ export class DaemonClient { async stop(): Promise { try { - await safelyCleanUpExistingProcess(); + await this.sendMessageToDaemon({ type: FORCE_SHUTDOWN }); + await waitForDaemonToExitAndCleanupProcessJson(); } catch (err) { output.error({ title: diff --git a/packages/nx/src/daemon/message-types/force-shutdown.ts b/packages/nx/src/daemon/message-types/force-shutdown.ts new file mode 100644 index 0000000000000..537d6f08c2ceb --- /dev/null +++ b/packages/nx/src/daemon/message-types/force-shutdown.ts @@ -0,0 +1,16 @@ +export const FORCE_SHUTDOWN = 'FORCE_SHUTDOWN' as const; + +export type HandleForceShutdownMessage = { + type: typeof FORCE_SHUTDOWN; +}; + +export function isHandleForceShutdownMessage( + message: unknown +): message is HandleForceShutdownMessage { + return ( + typeof message === 'object' && + message !== null && + 'type' in message && + message['type'] === FORCE_SHUTDOWN + ); +} diff --git a/packages/nx/src/daemon/server/handle-force-shutdown.ts b/packages/nx/src/daemon/server/handle-force-shutdown.ts new file mode 100644 index 0000000000000..fe778414c7672 --- /dev/null +++ b/packages/nx/src/daemon/server/handle-force-shutdown.ts @@ -0,0 +1,17 @@ +import { Server } from 'net'; +import { handleServerProcessTermination } from './shutdown-utils'; +import { openSockets } from './server'; + +export async function handleForceShutdown(server: Server) { + setTimeout(async () => { + await handleServerProcessTermination({ + server, + reason: 'Request to shutdown', + sockets: openSockets, + }); + }); + return { + description: 'Shutdown initiated', + response: '{}', + }; +} diff --git a/packages/nx/src/daemon/server/handle-request-shutdown.ts b/packages/nx/src/daemon/server/handle-request-shutdown.ts index a5506d87d2443..ef788dcda05e3 100644 --- a/packages/nx/src/daemon/server/handle-request-shutdown.ts +++ b/packages/nx/src/daemon/server/handle-request-shutdown.ts @@ -1,5 +1,6 @@ import { Server } from 'net'; import { handleServerProcessTermination } from './shutdown-utils'; +import { openSockets } from './server'; export async function handleRequestShutdown( server: Server, @@ -16,6 +17,7 @@ export async function handleRequestShutdown( await handleServerProcessTermination({ server, reason: 'Request to shutdown', + sockets: openSockets, }); }, 0); return { diff --git a/packages/nx/src/daemon/server/server.ts b/packages/nx/src/daemon/server/server.ts index 33b709511119a..4c2b6337a9f08 100644 --- a/packages/nx/src/daemon/server/server.ts +++ b/packages/nx/src/daemon/server/server.ts @@ -76,6 +76,8 @@ import { } from '../message-types/task-history'; import { handleGetTaskHistoryForHashes } from './handle-get-task-history'; import { handleWriteTaskRunsToHistory } from './handle-write-task-runs-to-history'; +import { isHandleForceShutdownMessage } from '../message-types/force-shutdown'; +import { handleForceShutdown } from './handle-force-shutdown'; let performanceObserver: PerformanceObserver | undefined; let workspaceWatcherError: Error | undefined; @@ -90,9 +92,11 @@ export type HandlerResult = { }; let numberOfOpenConnections = 0; +export const openSockets: Set = new Set(); const server = createServer(async (socket) => { numberOfOpenConnections += 1; + openSockets.add(socket); serverLogger.log( `Established a connection. Number of open connections: ${numberOfOpenConnections}` ); @@ -119,6 +123,7 @@ const server = createServer(async (socket) => { socket.on('close', () => { numberOfOpenConnections -= 1; + openSockets.delete(socket); serverLogger.log( `Closed a connection. Number of open connections: ${numberOfOpenConnections}` ); @@ -216,6 +221,10 @@ async function handleMessage(socket, data: string) { await handleResult(socket, 'WRITE_TASK_RUNS_TO_HISTORY', () => handleWriteTaskRunsToHistory(payload.taskRuns) ); + } else if (isHandleForceShutdownMessage(payload)) { + await handleResult(socket, 'FORCE_SHUTDOWN', () => + handleForceShutdown(server) + ); } else { await respondWithErrorAndExit( socket, @@ -256,6 +265,7 @@ function handleInactivityTimeout() { handleServerProcessTermination({ server, reason: `${SERVER_INACTIVITY_TIMEOUT_MS}ms of inactivity`, + sockets: openSockets, }); } } @@ -266,18 +276,21 @@ function registerProcessTerminationListeners() { handleServerProcessTermination({ server, reason: 'received process SIGINT', + sockets: openSockets, }) ) .on('SIGTERM', () => handleServerProcessTermination({ server, reason: 'received process SIGTERM', + sockets: openSockets, }) ) .on('SIGHUP', () => handleServerProcessTermination({ server, reason: 'received process SIGHUP', + sockets: openSockets, }) ); } @@ -352,6 +365,7 @@ const handleWorkspaceChanges: FileWatcherCallback = async ( await handleServerProcessTermination({ server, reason: outdatedReason, + sockets: openSockets, }); return; } diff --git a/packages/nx/src/daemon/server/shutdown-utils.ts b/packages/nx/src/daemon/server/shutdown-utils.ts index 216a968e0ab38..9c82ae714f9ff 100644 --- a/packages/nx/src/daemon/server/shutdown-utils.ts +++ b/packages/nx/src/daemon/server/shutdown-utils.ts @@ -35,16 +35,24 @@ export function getOutputWatcherInstance() { interface HandleServerProcessTerminationParams { server: Server; reason: string; + sockets: Iterable; } export async function handleServerProcessTermination({ server, reason, + sockets, }: HandleServerProcessTerminationParams) { try { - server.close(); - deleteDaemonJsonProcessCache(); - cleanupPlugins(); + await new Promise((res) => { + server.close(() => { + res(null); + }); + + for (const socket of sockets) { + socket.destroy(); + } + }); if (watcherInstance) { await watcherInstance.stop(); @@ -60,6 +68,9 @@ export async function handleServerProcessTermination({ ); } + deleteDaemonJsonProcessCache(); + cleanupPlugins(); + serverLogger.log(`Server stopped because: "${reason}"`); } finally { process.exit(0); diff --git a/packages/nx/src/daemon/server/watcher.ts b/packages/nx/src/daemon/server/watcher.ts index a9d46542deb48..da161c1545ab7 100644 --- a/packages/nx/src/daemon/server/watcher.ts +++ b/packages/nx/src/daemon/server/watcher.ts @@ -12,6 +12,7 @@ import { import { platform } from 'os'; import { getDaemonProcessIdSync, serverProcessJsonPath } from '../cache'; import type { WatchEvent } from '../../native'; +import { openSockets } from './server'; const ALWAYS_IGNORE = [ ...getAlwaysIgnore(workspaceRoot), @@ -44,6 +45,7 @@ export async function watchWorkspace(server: Server, cb: FileWatcherCallback) { handleServerProcessTermination({ server, reason: 'this process is no longer the current daemon (native)', + sockets: openSockets, }); } @@ -53,6 +55,7 @@ export async function watchWorkspace(server: Server, cb: FileWatcherCallback) { server, reason: 'Stopping the daemon the set of ignored files changed (native)', + sockets: openSockets, }); } }