Skip to content

Commit

Permalink
fix(core): shutdown daemon via message so it can clean itself up
Browse files Browse the repository at this point in the history
  • Loading branch information
AgentEnder committed Aug 8, 2024
1 parent 35a33ae commit 5e43458
Show file tree
Hide file tree
Showing 8 changed files with 102 additions and 25 deletions.
45 changes: 26 additions & 19 deletions packages/nx/src/daemon/cache.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,33 +38,40 @@ export async function writeDaemonJsonProcessCache(
await writeJson(serverProcessJsonPath, daemonJson);
}

export async function waitForDaemonToExitAndCleanupProcessJson(): Promise<void> {
const daemonProcessJson = await readDaemonProcessJsonCache();
if (daemonProcessJson && daemonProcessJson.processId) {
await new Promise<void>((resolve, reject) => {
let count = 0;
const interval = setInterval(() => {
try {
// sending a signal 0 to a process checks if the process is running instead of actually killing it
process.kill(daemonProcessJson.processId, 0);
} catch (e) {
clearInterval(interval);
resolve();
}
if ((count += 1) > 200) {
clearInterval(interval);
reject(
`Daemon process ${daemonProcessJson.processId} didn't exit after 2 seconds.`
);
}
}, 10);
});
deleteDaemonJsonProcessCache();
}
}

export async function safelyCleanUpExistingProcess(): Promise<void> {
const daemonProcessJson = await readDaemonProcessJsonCache();
if (daemonProcessJson && daemonProcessJson.processId) {
try {
process.kill(daemonProcessJson.processId);
// we wait for the process to actually shut down before returning
await new Promise<void>((resolve, reject) => {
let count = 0;
const interval = setInterval(() => {
try {
// sending a signal 0 to a process checks if the process is running instead of actually killing it
process.kill(daemonProcessJson.processId, 0);
} catch (e) {
clearInterval(interval);
resolve();
}
if ((count += 1) > 200) {
clearInterval(interval);
reject(
`Daemon process ${daemonProcessJson.processId} didn't exit after 2 seconds.`
);
}
}, 10);
});
await waitForDaemonToExitAndCleanupProcessJson();
} catch {}
}
deleteDaemonJsonProcessCache();
}

// Must be sync for the help output use case
Expand Down
13 changes: 10 additions & 3 deletions packages/nx/src/daemon/client/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import { hasNxJson, NxJsonConfiguration } from '../../config/nx-json';
import { readNxJson } from '../../config/configuration';
import { PromisedBasedQueue } from '../../utils/promised-based-queue';
import { DaemonSocketMessenger, Message } from './daemon-socket-messenger';
import { safelyCleanUpExistingProcess } from '../cache';
import { waitForDaemonToExitAndCleanupProcessJson } from '../cache';
import { Hash } from '../../hasher/task-hasher';
import { Task, TaskGraph } from '../../config/task-graph';
import { ConfigurationSourceMaps } from '../../project-graph/utils/project-configuration-utils';
Expand Down Expand Up @@ -48,6 +48,7 @@ import {
HandleGetTaskHistoryForHashesMessage,
HandleWriteTaskRunsToHistoryMessage,
} from '../message-types/task-history';
import { FORCE_SHUTDOWN } from '../message-types/force-shutdown';

const DAEMON_ENV_SETTINGS = {
NX_PROJECT_GLOB_CACHE: 'false',
Expand Down Expand Up @@ -439,7 +440,10 @@ export class DaemonClient {
} else if (this._daemonStatus == DaemonStatus.CONNECTING) {
await this._waitForDaemonReady;
}

// An open promise isn't enough to keep the event loop
// alive, so we set a timeout here and clear it when we hear
// back
const keepAlive = setTimeout(() => {}, 10 * 60 * 1000);
return new Promise((resolve, reject) => {
performance.mark('sendMessageToDaemon-start');

Expand All @@ -448,6 +452,8 @@ export class DaemonClient {
this.currentReject = reject;

this.socketMessenger.sendMessage(message);
}).finally(() => {
clearTimeout(keepAlive);
});
}

Expand Down Expand Up @@ -541,7 +547,8 @@ export class DaemonClient {

async stop(): Promise<void> {
try {
await safelyCleanUpExistingProcess();
await this.sendMessageToDaemon({ type: FORCE_SHUTDOWN });
await waitForDaemonToExitAndCleanupProcessJson();
} catch (err) {
output.error({
title:
Expand Down
16 changes: 16 additions & 0 deletions packages/nx/src/daemon/message-types/force-shutdown.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
export const FORCE_SHUTDOWN = 'FORCE_SHUTDOWN' as const;

export type HandleForceShutdownMessage = {
type: typeof FORCE_SHUTDOWN;
};

export function isHandleForceShutdownMessage(
message: unknown
): message is HandleForceShutdownMessage {
return (
typeof message === 'object' &&
message !== null &&
'type' in message &&
message['type'] === FORCE_SHUTDOWN
);
}
17 changes: 17 additions & 0 deletions packages/nx/src/daemon/server/handle-force-shutdown.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
import { Server } from 'net';
import { handleServerProcessTermination } from './shutdown-utils';
import { openSockets } from './server';

export async function handleForceShutdown(server: Server) {
setTimeout(async () => {
await handleServerProcessTermination({
server,
reason: 'Request to shutdown',
sockets: openSockets,
});
});
return {
description: 'Shutdown initiated',
response: '{}',
};
}
2 changes: 2 additions & 0 deletions packages/nx/src/daemon/server/handle-request-shutdown.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { Server } from 'net';
import { handleServerProcessTermination } from './shutdown-utils';
import { openSockets } from './server';

export async function handleRequestShutdown(
server: Server,
Expand All @@ -16,6 +17,7 @@ export async function handleRequestShutdown(
await handleServerProcessTermination({
server,
reason: 'Request to shutdown',
sockets: openSockets,
});
}, 0);
return {
Expand Down
14 changes: 14 additions & 0 deletions packages/nx/src/daemon/server/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,8 @@ import {
} from '../message-types/task-history';
import { handleGetTaskHistoryForHashes } from './handle-get-task-history';
import { handleWriteTaskRunsToHistory } from './handle-write-task-runs-to-history';
import { isHandleForceShutdownMessage } from '../message-types/force-shutdown';
import { handleForceShutdown } from './handle-force-shutdown';

let performanceObserver: PerformanceObserver | undefined;
let workspaceWatcherError: Error | undefined;
Expand All @@ -90,9 +92,11 @@ export type HandlerResult = {
};

let numberOfOpenConnections = 0;
export const openSockets: Set<Socket> = new Set();

const server = createServer(async (socket) => {
numberOfOpenConnections += 1;
openSockets.add(socket);
serverLogger.log(
`Established a connection. Number of open connections: ${numberOfOpenConnections}`
);
Expand All @@ -119,6 +123,7 @@ const server = createServer(async (socket) => {

socket.on('close', () => {
numberOfOpenConnections -= 1;
openSockets.delete(socket);
serverLogger.log(
`Closed a connection. Number of open connections: ${numberOfOpenConnections}`
);
Expand Down Expand Up @@ -216,6 +221,10 @@ async function handleMessage(socket, data: string) {
await handleResult(socket, 'WRITE_TASK_RUNS_TO_HISTORY', () =>
handleWriteTaskRunsToHistory(payload.taskRuns)
);
} else if (isHandleForceShutdownMessage(payload)) {
await handleResult(socket, 'FORCE_SHUTDOWN', () =>
handleForceShutdown(server)
);
} else {
await respondWithErrorAndExit(
socket,
Expand Down Expand Up @@ -256,6 +265,7 @@ function handleInactivityTimeout() {
handleServerProcessTermination({
server,
reason: `${SERVER_INACTIVITY_TIMEOUT_MS}ms of inactivity`,
sockets: openSockets,
});
}
}
Expand All @@ -266,18 +276,21 @@ function registerProcessTerminationListeners() {
handleServerProcessTermination({
server,
reason: 'received process SIGINT',
sockets: openSockets,
})
)
.on('SIGTERM', () =>
handleServerProcessTermination({
server,
reason: 'received process SIGTERM',
sockets: openSockets,
})
)
.on('SIGHUP', () =>
handleServerProcessTermination({
server,
reason: 'received process SIGHUP',
sockets: openSockets,
})
);
}
Expand Down Expand Up @@ -352,6 +365,7 @@ const handleWorkspaceChanges: FileWatcherCallback = async (
await handleServerProcessTermination({
server,
reason: outdatedReason,
sockets: openSockets,
});
return;
}
Expand Down
17 changes: 14 additions & 3 deletions packages/nx/src/daemon/server/shutdown-utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,16 +35,24 @@ export function getOutputWatcherInstance() {
interface HandleServerProcessTerminationParams {
server: Server;
reason: string;
sockets: Iterable<Socket>;
}

export async function handleServerProcessTermination({
server,
reason,
sockets,
}: HandleServerProcessTerminationParams) {
try {
server.close();
deleteDaemonJsonProcessCache();
cleanupPlugins();
await new Promise((res) => {
server.close(() => {
res(null);
});

for (const socket of sockets) {
socket.destroy();
}
});

if (watcherInstance) {
await watcherInstance.stop();
Expand All @@ -60,6 +68,9 @@ export async function handleServerProcessTermination({
);
}

deleteDaemonJsonProcessCache();
cleanupPlugins();

serverLogger.log(`Server stopped because: "${reason}"`);
} finally {
process.exit(0);
Expand Down
3 changes: 3 additions & 0 deletions packages/nx/src/daemon/server/watcher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import {
import { platform } from 'os';
import { getDaemonProcessIdSync, serverProcessJsonPath } from '../cache';
import type { WatchEvent } from '../../native';
import { openSockets } from './server';

const ALWAYS_IGNORE = [
...getAlwaysIgnore(workspaceRoot),
Expand Down Expand Up @@ -44,6 +45,7 @@ export async function watchWorkspace(server: Server, cb: FileWatcherCallback) {
handleServerProcessTermination({
server,
reason: 'this process is no longer the current daemon (native)',
sockets: openSockets,
});
}

Expand All @@ -53,6 +55,7 @@ export async function watchWorkspace(server: Server, cb: FileWatcherCallback) {
server,
reason:
'Stopping the daemon the set of ignored files changed (native)',
sockets: openSockets,
});
}
}
Expand Down

0 comments on commit 5e43458

Please sign in to comment.