From 6a8ff2c9e97b4237ce3690483a8a0183e197cc4e Mon Sep 17 00:00:00 2001 From: Jack Hsu Date: Mon, 8 May 2023 11:32:17 -0400 Subject: [PATCH] feat(js): improve @nx/js:node executor to be more resilient to many file change events --- .../generated/packages/js/executors/node.json | 5 + packages/js/package.json | 1 - .../js/src/executors/node/lib/kill-tree.ts | 129 ++++++ packages/js/src/executors/node/node.impl.ts | 403 ++++++++++-------- packages/js/src/executors/node/schema.d.ts | 1 + packages/js/src/executors/node/schema.json | 5 + 6 files changed, 357 insertions(+), 187 deletions(-) create mode 100644 packages/js/src/executors/node/lib/kill-tree.ts diff --git a/docs/generated/packages/js/executors/node.json b/docs/generated/packages/js/executors/node.json index 54aa404a860275..57200acfaabcc2 100644 --- a/docs/generated/packages/js/executors/node.json +++ b/docs/generated/packages/js/executors/node.json @@ -59,6 +59,11 @@ "type": "boolean", "description": "Enable re-building when files change.", "default": true + }, + "debounce": { + "type": "number", + "description": "Delay in milliseconds to wait before restarting. Useful to batch multiple file changes events together. Set to zero (0) to disable.", + "default": 500 } }, "additionalProperties": false, diff --git a/packages/js/package.json b/packages/js/package.json index 297ccd30ce2be4..64881b56cc4c76 100644 --- a/packages/js/package.json +++ b/packages/js/package.json @@ -50,7 +50,6 @@ "js-tokens": "^4.0.0", "minimatch": "3.0.5", "source-map-support": "0.5.19", - "tree-kill": "1.2.2", "tslib": "^2.3.0", "@nx/devkit": "file:../devkit", "@nx/workspace": "file:../workspace" diff --git a/packages/js/src/executors/node/lib/kill-tree.ts b/packages/js/src/executors/node/lib/kill-tree.ts new file mode 100644 index 00000000000000..71e6c24656642d --- /dev/null +++ b/packages/js/src/executors/node/lib/kill-tree.ts @@ -0,0 +1,129 @@ +// Adapted from https://raw.githubusercontent.com/pkrumins/node-tree-kill/deee138/index.js +import { spawn, exec, ExecException } from 'child_process'; + +export async function killTree(pid: number, signal: NodeJS.Signals) { + const tree = {}; + const pidsToProcess = {}; + tree[pid] = []; + pidsToProcess[pid] = 1; + + return new Promise((resolve, reject) => { + const callback = (error: ExecException | null) => { + if (error) { + reject(error); + } else { + resolve(); + } + }; + + switch (process.platform) { + case 'win32': + exec('taskkill /pid ' + pid + ' /T /F', callback); + break; + case 'darwin': + buildProcessTree( + pid, + tree, + pidsToProcess, + function (parentPid) { + return spawn('pgrep', ['-P', parentPid]); + }, + function () { + killAll(tree, signal, callback); + } + ); + break; + default: // Linux + buildProcessTree( + pid, + tree, + pidsToProcess, + function (parentPid) { + return spawn('ps', [ + '-o', + 'pid', + '--no-headers', + '--ppid', + parentPid, + ]); + }, + function () { + killAll(tree, signal, callback); + } + ); + break; + } + }); +} + +function killAll(tree, signal, callback) { + const killed = {}; + try { + Object.keys(tree).forEach(function (pid) { + tree[pid].forEach(function (pidpid) { + if (!killed[pidpid]) { + killPid(pidpid, signal); + killed[pidpid] = 1; + } + }); + if (!killed[pid]) { + killPid(pid, signal); + killed[pid] = 1; + } + }); + } catch (err) { + if (callback) { + return callback(err); + } else { + throw err; + } + } + if (callback) { + return callback(); + } +} + +function killPid(pid, signal) { + try { + process.kill(parseInt(pid, 10), signal); + } catch (err) { + if (err.code !== 'ESRCH') throw err; + } +} + +function buildProcessTree( + parentPid, + tree, + pidsToProcess, + spawnChildProcessesList, + cb +) { + const ps = spawnChildProcessesList(parentPid); + let allData = ''; + ps.stdout.on('data', (data) => { + data = data.toString('ascii'); + allData += data; + }); + + const onClose = function (code) { + delete pidsToProcess[parentPid]; + + if (code != 0) { + // no more parent processes + if (Object.keys(pidsToProcess).length == 0) { + cb(); + } + return; + } + + allData.match(/\d+/g).forEach((_pid) => { + const pid = parseInt(_pid, 10); + tree[parentPid].push(pid); + tree[pid] = []; + pidsToProcess[pid] = 1; + buildProcessTree(pid, tree, pidsToProcess, spawnChildProcessesList, cb); + }); + }; + + ps.on('close', onClose); +} diff --git a/packages/js/src/executors/node/node.impl.ts b/packages/js/src/executors/node/node.impl.ts index 00297681db609f..431dfc36e920ae 100644 --- a/packages/js/src/executors/node/node.impl.ts +++ b/packages/js/src/executors/node/node.impl.ts @@ -1,126 +1,230 @@ +import * as chalk from 'chalk'; +import { ChildProcess, exec, fork } from 'child_process'; import { ExecutorContext, joinPathFragments, logger, parseTargetString, - runExecutor, } from '@nx/devkit'; -import { calculateProjectDependencies } from '../../utils/buildable-libs-utils'; -import { ChildProcess, fork } from 'child_process'; +import { daemonClient } from 'nx/src/daemon/client/client'; import { randomUUID } from 'crypto'; -import { HashingImpl } from 'nx/src/hasher/hashing-impl'; -import * as treeKill from 'tree-kill'; -import { promisify } from 'util'; +import { join } from 'path'; + import { InspectType, NodeExecutorOptions } from './schema'; +import { createAsyncIterable } from '@nx/devkit/src/utils/async-iterable'; +import { calculateProjectDependencies } from '../../utils/buildable-libs-utils'; +import { killTree } from './lib/kill-tree'; -const hasher = new HashingImpl(); -const processMap = new Map(); -const hashedMap = new Map(); +interface ActiveTask { + id: string; + killed: boolean; + promise: Promise; + childProcess: null | ChildProcess; + start: () => Promise; + stop: (signal: NodeJS.Signals) => Promise; +} -export interface ExecutorEvent { - outfile: string; - success: boolean; +function debounce(fn: () => void, wait: number) { + let timeoutId: NodeJS.Timeout; + return () => { + clearTimeout(timeoutId); + timeoutId = setTimeout(fn, wait); + }; } export async function* nodeExecutor( options: NodeExecutorOptions, context: ExecutorContext ) { - const uniqueKey = randomUUID(); - process.on('SIGTERM', async () => { - await killCurrentProcess(uniqueKey, options, 'SIGTERM'); - process.exit(128 + 15); - }); - process.on('SIGINT', async () => { - await killCurrentProcess(uniqueKey, options, 'SIGINT'); - process.exit(128 + 2); - }); - process.on('SIGHUP', async () => { - await killCurrentProcess(uniqueKey, options, 'SIGHUP'); - process.exit(128 + 1); - }); - - if (options.waitUntilTargets && options.waitUntilTargets.length > 0) { - const results = await runWaitUntilTargets(options, context); - for (const [i, result] of results.entries()) { - if (!result.success) { - throw new Error( - `Wait until target failed: ${options.waitUntilTargets[i]}.` - ); - } - } - } + const project = context.projectGraph.nodes[context.projectName]; + const buildTarget = parseTargetString( + options.buildTarget, + context.projectGraph + ); - const mappings = calculateResolveMappings(context, options); - for await (const event of startBuild(options, context)) { - if (!event.success) { - logger.error('There was an error with the build. See above.'); - logger.info(`${event.outfile} was not restarted.`); - } - await handleBuildEvent(uniqueKey, event, options, mappings); - yield event; + const buildOptions = project.data.targets[buildTarget.target]?.options; + if (!buildOptions) { + throw new Error( + `Cannot find build target ${chalk.bold( + options.buildTarget + )} for project ${chalk.bold(context.projectName)}` + ); } -} -function calculateResolveMappings( - context: ExecutorContext, - options: NodeExecutorOptions -) { - const parsed = parseTargetString(options.buildTarget, context.projectGraph); - const { dependencies } = calculateProjectDependencies( - context.projectGraph, + // Re-map buildable workspace projects to their output directory. + const mappings = calculateResolveMappings(context, options); + const fileToRun = join( context.root, - parsed.project, - parsed.target, - parsed.configuration + buildOptions.outputPath, + buildOptions.outputFileName ?? 'main.js' ); - return dependencies.reduce((m, c) => { - if (c.node.type !== 'npm' && c.outputs[0] != null) { - m[c.name] = joinPathFragments(context.root, c.outputs[0]); - } - return m; - }, {}); -} -async function runProcess( - uniqueKey: string, - event: ExecutorEvent, - options: NodeExecutorOptions, - mappings: { [project: string]: string } -) { - const execArgv = getExecArgv(options); - - const hashedKey = JSON.stringify([uniqueKey, ...options.args]); - const hashed = hasher.hashArray(execArgv.concat(hashedKey)); - hashedMap.set(hashedKey, hashed); - - const subProcess = fork( - joinPathFragments(__dirname, 'node-with-require-overrides'), - options.args, - { - execArgv, - stdio: 'inherit', - env: { - ...process.env, - NX_FILE_TO_RUN: event.outfile, - NX_MAPPINGS: JSON.stringify(mappings), - }, - } - ); + const tasks: ActiveTask[] = []; + let currentTask: ActiveTask = null; + + yield* createAsyncIterable<{ success: boolean }>( + async ({ done, next, error }) => { + const processQueue = async () => { + if (tasks.length === 0) return; + + const previousTask = currentTask; + const task = tasks.shift(); + currentTask = task; + await previousTask?.stop('SIGTERM'); + await task.start(); + }; + + const debouncedProcessQueue = debounce( + processQueue, + options.debounce ?? 1_000 + ); + + const addToQueue = async () => { + const task: ActiveTask = { + id: randomUUID(), + killed: false, + childProcess: null, + promise: null, + start: async () => { + // Run the build + task.promise = new Promise(async (resolve, reject) => { + task.childProcess = exec( + `npx nx run ${context.projectName}:${buildTarget.target}${ + buildTarget.configuration + ? `:${buildTarget.configuration}` + : '' + }`, + { + cwd: context.root, + }, + (error, stdout, stderr) => { + if ( + // Build succeeded + !error || + // If process receive termination signal, it means another task has started. + error.signal === 'SIGTERM' + ) { + resolve(); + return; + } - processMap.set(hashed, subProcess); + if (process.env.NX_VERBOSE_LOGGING === 'true') { + logger.error(error); + } - if (!options.watch) { - return new Promise((resolve, reject) => { - subProcess.on('exit', (code) => { - if (code === 0) { - resolve(undefined); - } else { - reject(); + if (options.watch) { + logger.error( + `Build failed, waiting for changes to restart...` + ); + resolve(); // Don't reject because it'll error out and kill the Nx process. + } else { + logger.error(`Build failed. See above for errors.`); + reject(); + } + } + ); + }); + + // Wait for build to finish + await task.promise; + + // Task may have been stopped due to another running task + if (task.killed) return; + + // Run the program + task.promise = new Promise((resolve, reject) => { + task.childProcess = fork( + joinPathFragments(__dirname, 'node-with-require-overrides'), + getExecArgv(options), + { + stdio: [0, 1, 'pipe', 'ipc'], + env: { + ...process.env, + NX_FILE_TO_RUN: fileToRun, + NX_MAPPINGS: JSON.stringify(mappings), + }, + } + ); + + task.childProcess.stderr.on('data', (data) => { + // Don't log out error if task is killed and new one has started. + // This could happen if a new build is triggered while new process is starting, since the operation is not atomic. + if (options.watch && !task.killed) { + logger.error(data.toString()); + } + }); + + task.childProcess.once('exit', (code) => { + if (options.watch && !task.killed) { + logger.info( + `NX Process exited with code ${code}, waiting for changes to restart...` + ); + } + if (!options.watch) done(); + resolve(); + }); + + next({ success: true }); + }); + }, + stop: async (signal = 'SIGTERM') => { + task.killed = true; + // Request termination and wait for process to finish gracefully. + // NOTE: `childProcess` may not have been set yet if the task did not have a chance to start. + // e.g. multiple file change events in a short time (like git checkout). + await killTree(task.childProcess.pid, signal); + await task.promise; + }, + }; + + tasks.push(task); + }; + + const stopWatch = await daemonClient.registerFileWatcher( + { + watchProjects: [context.projectName], + includeDependentProjects: true, + }, + async (err, data) => { + if (err === 'closed') { + logger.error(`Watch error: Daemon closed the connection`); + process.exit(1); + } else if (err) { + logger.error(`Watch error: ${err?.message ?? 'Unknown'}`); + } else { + logger.info(`NX File change detected. Restarting...`); + await addToQueue(); + await debouncedProcessQueue(); + } + } + ); + + const stopAllTasks = (signal: NodeJS.Signals = 'SIGTERM') => { + for (const task of tasks) { + task.stop(signal); } + }; + + process.on('SIGTERM', async () => { + stopWatch(); + stopAllTasks('SIGTERM'); + process.exit(128 + 15); }); - }); - } + process.on('SIGINT', async () => { + stopWatch(); + stopAllTasks('SIGINT'); + process.exit(128 + 2); + }); + process.on('SIGHUP', async () => { + stopWatch(); + stopAllTasks('SIGHUP'); + process.exit(128 + 1); + }); + + await addToQueue(); + await processQueue(); + } + ); } function getExecArgv(options: NodeExecutorOptions) { @@ -141,97 +245,24 @@ function getExecArgv(options: NodeExecutorOptions) { return args; } -async function handleBuildEvent( - uniqueKey: string, - event: ExecutorEvent, - options: NodeExecutorOptions, - mappings: { [project: string]: string } -) { - // Don't kill previous run unless new build is successful. - if (options.watch && event.success) { - await killCurrentProcess(uniqueKey, options); - } - - if (event.success) { - await runProcess(uniqueKey, event, options, mappings); - } -} - -const promisifiedTreeKill: (pid: number, signal: string) => Promise = - promisify(treeKill); - -async function killCurrentProcess( - uniqueKey: string, - options: NodeExecutorOptions, - signal: string = 'SIGTERM' -) { - const hashedKey = JSON.stringify([uniqueKey, ...options.args]); - const currentProcessKey = hashedMap.get(hashedKey); - if (!currentProcessKey) return; - - const currentProcess = processMap.get(currentProcessKey); - if (!currentProcess) return; - - try { - await promisifiedTreeKill(currentProcess.pid, signal); - - // if the currentProcess.killed is false, invoke kill() - // to properly send the signal to the process - if (!currentProcess.killed) { - currentProcess.kill(signal as NodeJS.Signals); - } - } catch (err) { - if (Array.isArray(err) && err[0] && err[2]) { - const errorMessage = err[2]; - logger.error(errorMessage); - } else if (err.message) { - logger.error(err.message); - } - } finally { - processMap.delete(currentProcessKey); - hashedMap.delete(hashedKey); - } -} - -async function* startBuild( - options: NodeExecutorOptions, - context: ExecutorContext +function calculateResolveMappings( + context: ExecutorContext, + options: NodeExecutorOptions ) { - const buildTarget = parseTargetString( - options.buildTarget, - context.projectGraph - ); - - yield* await runExecutor( - buildTarget, - { - ...options.buildTargetOptions, - watch: options.watch, - }, - context - ); -} - -function runWaitUntilTargets( - options: NodeExecutorOptions, - context: ExecutorContext -): Promise<{ success: boolean }[]> { - return Promise.all( - options.waitUntilTargets.map(async (waitUntilTarget) => { - const target = parseTargetString(waitUntilTarget, context.projectGraph); - const output = await runExecutor(target, {}, context); - return new Promise<{ success: boolean }>(async (resolve) => { - let event = await output.next(); - // Resolve after first event - resolve(event.value as { success: boolean }); - - // Continue iterating - while (!event.done) { - event = await output.next(); - } - }); - }) + const parsed = parseTargetString(options.buildTarget, context.projectGraph); + const { dependencies } = calculateProjectDependencies( + context.projectGraph, + context.root, + parsed.project, + parsed.target, + parsed.configuration ); + return dependencies.reduce((m, c) => { + if (c.node.type !== 'npm' && c.outputs[0] != null) { + m[c.name] = joinPathFragments(context.root, c.outputs[0]); + } + return m; + }, {}); } export default nodeExecutor; diff --git a/packages/js/src/executors/node/schema.d.ts b/packages/js/src/executors/node/schema.d.ts index 8ce910c7c70a50..08b8ebb072e8d0 100644 --- a/packages/js/src/executors/node/schema.d.ts +++ b/packages/js/src/executors/node/schema.d.ts @@ -13,4 +13,5 @@ export interface NodeExecutorOptions { host: string; port: number; watch?: boolean; + debounce?: number; } diff --git a/packages/js/src/executors/node/schema.json b/packages/js/src/executors/node/schema.json index 288313b4d609c3..302562492b8e53 100644 --- a/packages/js/src/executors/node/schema.json +++ b/packages/js/src/executors/node/schema.json @@ -67,6 +67,11 @@ "type": "boolean", "description": "Enable re-building when files change.", "default": true + }, + "debounce": { + "type": "number", + "description": "Delay in milliseconds to wait before restarting. Useful to batch multiple file changes events together. Set to zero (0) to disable.", + "default": 500 } }, "additionalProperties": false,