From 9be6e0eb2274f0b41ac087591950404c610862e7 Mon Sep 17 00:00:00 2001 From: Craigory Coppola Date: Wed, 5 Jun 2024 11:34:01 -0400 Subject: [PATCH 01/12] chore(repo): enable isolation locally + in e2e --- .env | 1 + .gitignore | 2 +- e2e/utils/get-env-info.ts | 2 +- 3 files changed, 3 insertions(+), 2 deletions(-) create mode 100644 .env diff --git a/.env b/.env new file mode 100644 index 0000000000000..582460b0d8516 --- /dev/null +++ b/.env @@ -0,0 +1 @@ +NX_ISOLATE_PLUGINS=true \ No newline at end of file diff --git a/.gitignore b/.gitignore index ab215da9f1474..926728d344f16 100644 --- a/.gitignore +++ b/.gitignore @@ -37,7 +37,7 @@ out .angular # Local dev files -.env +.env.local .bashrc .nx diff --git a/e2e/utils/get-env-info.ts b/e2e/utils/get-env-info.ts index 17e8f98a9a5d7..0869cf918225c 100644 --- a/e2e/utils/get-env-info.ts +++ b/e2e/utils/get-env-info.ts @@ -160,7 +160,7 @@ export function getStrippedEnvironmentVariables() { return true; } - const allowedKeys = ['NX_ADD_PLUGINS']; + const allowedKeys = ['NX_ADD_PLUGINS', 'NX_ISOLATE_PLUGINS']; if (key.startsWith('NX_') && !allowedKeys.includes(key)) { return false; From f27cd00c559f4ac4d50ad1f7ef6d8f2e52a7cf54 Mon Sep 17 00:00:00 2001 From: Craigory Coppola Date: Fri, 14 Jun 2024 14:41:08 -0400 Subject: [PATCH 02/12] fix(core): move plugin worker to socket + server --- .../nx/src/command-line/run/command-object.ts | 3 +- packages/nx/src/daemon/socket-utils.ts | 5 + .../plugins/isolation/messaging.ts | 14 +- .../plugins/isolation/plugin-pool.ts | 194 +++++++++++---- .../plugins/isolation/plugin-worker.ts | 230 ++++++++++-------- 5 files changed, 286 insertions(+), 160 deletions(-) diff --git a/packages/nx/src/command-line/run/command-object.ts b/packages/nx/src/command-line/run/command-object.ts index 6b27f0cd3b3ee..120c2093b4588 100644 --- a/packages/nx/src/command-line/run/command-object.ts +++ b/packages/nx/src/command-line/run/command-object.ts @@ -37,7 +37,7 @@ export const yargsNxInfixCommand: CommandModule = { command: '$0 [project] [_..]', describe: 'Run a target for a project', handler: async (args) => { - await handleErrors( + const exitCode = await handleErrors( (args.verbose as boolean) ?? process.env.NX_VERBOSE_LOGGING === 'true', async () => { return (await import('./run-one')).runOne( @@ -46,5 +46,6 @@ export const yargsNxInfixCommand: CommandModule = { ); } ); + process.exit(exitCode); }, }; diff --git a/packages/nx/src/daemon/socket-utils.ts b/packages/nx/src/daemon/socket-utils.ts index be83e1ae9ebfc..3cf26406131a4 100644 --- a/packages/nx/src/daemon/socket-utils.ts +++ b/packages/nx/src/daemon/socket-utils.ts @@ -22,6 +22,11 @@ export const getForkedProcessOsSocketPath = (id: string) => { return isWindows ? '\\\\.\\pipe\\nx\\' + resolve(path) : resolve(path); }; +export const getPluginOsSocketPath = (id: string) => { + let path = resolve(join(getSocketDir(), 'plugin' + id + '.sock')); + return isWindows ? '\\\\.\\pipe\\nx\\' + resolve(path) : resolve(path); +}; + export function killSocketOrPath(): void { try { unlinkSync(getFullOsSocketPath()); diff --git a/packages/nx/src/project-graph/plugins/isolation/messaging.ts b/packages/nx/src/project-graph/plugins/isolation/messaging.ts index ce64fb43aa45f..e764247bba814 100644 --- a/packages/nx/src/project-graph/plugins/isolation/messaging.ts +++ b/packages/nx/src/project-graph/plugins/isolation/messaging.ts @@ -7,9 +7,11 @@ import { CreateDependenciesContext, CreateMetadataContext, CreateNodesContext, + CreateNodesContextV2, } from '../public-api'; import { LoadedNxPlugin } from '../internal-api'; import { Serializable } from 'child_process'; +import { Socket } from 'net'; export interface PluginWorkerLoadMessage { type: 'load'; @@ -42,7 +44,7 @@ export interface PluginWorkerCreateNodesMessage { type: 'createNodes'; payload: { configFiles: string[]; - context: CreateNodesContext; + context: CreateNodesContextV2; tx: string; }; } @@ -192,6 +194,7 @@ type MessageHandlerReturn = export async function consumeMessage< T extends PluginWorkerMessage | PluginWorkerResult >( + socket: Socket, raw: T, handlers: { [K in T['type']]: ( @@ -205,7 +208,14 @@ export async function consumeMessage< if (handler) { const response = await handler(message.payload); if (response) { - process.send!(response); + sendMessageOverSocket(socket, response); } } } + +export function sendMessageOverSocket( + socket: Socket, + message: PluginWorkerMessage | PluginWorkerResult +) { + socket.write(JSON.stringify(message) + String.fromCodePoint(4)); +} diff --git a/packages/nx/src/project-graph/plugins/isolation/plugin-pool.ts b/packages/nx/src/project-graph/plugins/isolation/plugin-pool.ts index ecfc269de746e..545f770ea7b26 100644 --- a/packages/nx/src/project-graph/plugins/isolation/plugin-pool.ts +++ b/packages/nx/src/project-graph/plugins/isolation/plugin-pool.ts @@ -7,7 +7,16 @@ import { PluginConfiguration } from '../../../config/nx-json'; // import { logger } from '../../utils/logger'; import { LoadedNxPlugin, nxPluginCache } from '../internal-api'; -import { consumeMessage, isPluginWorkerResult } from './messaging'; +import { getPluginOsSocketPath } from '../../../daemon/socket-utils'; +import { consumeMessagesFromSocket } from '../../../utils/consume-messages-from-socket'; + +import { + consumeMessage, + isPluginWorkerResult, + sendMessageOverSocket, +} from './messaging'; +import { Socket, connect } from 'net'; +import { unlinkSync } from 'fs'; const cleanupFunctions = new Set<() => void>(); @@ -19,41 +28,18 @@ interface PendingPromise { rejector: (err: any) => void; } -export function loadRemoteNxPlugin( +export async function loadRemoteNxPlugin( plugin: PluginConfiguration, root: string -): [Promise, () => void] { - // this should only really be true when running unit tests within - // the Nx repo. We still need to start the worker in this case, - // but its typescript. - const isWorkerTypescript = path.extname(__filename) === '.ts'; - const workerPath = path.join(__dirname, 'plugin-worker'); +): Promise<[Promise, () => void]> { + const { ipcPath, worker } = await startPluginWorker(plugin); - const env: Record = { - ...process.env, - ...(isWorkerTypescript - ? { - // Ensures that the worker uses the same tsconfig as the main process - TS_NODE_PROJECT: path.join( - __dirname, - '../../../../tsconfig.lib.json' - ), - } - : {}), - }; - - const worker = fork(workerPath, [], { - stdio: ['ignore', 'inherit', 'inherit', 'ipc'], - env, - execArgv: [ - ...process.execArgv, - // If the worker is typescript, we need to register ts-node - ...(isWorkerTypescript ? ['-r', 'ts-node/register'] : []), - ], + const socket = await new Promise((res, rej) => { + const socket = connect(ipcPath, () => { + res(socket); + }); + socket.on('error', rej); }); - worker.send({ type: 'load', payload: { plugin, root } }); - - // logger.verbose(`[plugin-worker] started worker: ${worker.pid}`); const pendingPromises = new Map(); @@ -61,6 +47,7 @@ export function loadRemoteNxPlugin( const cleanupFunction = () => { worker.off('exit', exitHandler); + socket.destroy(); shutdownPluginWorker(worker); }; @@ -68,16 +55,21 @@ export function loadRemoteNxPlugin( return [ new Promise((res, rej) => { - worker.on( - 'message', - createWorkerHandler(worker, pendingPromises, res, rej) + sendMessageOverSocket(socket, { + type: 'load', + payload: { plugin, root }, + }); + // logger.verbose(`[plugin-worker] started worker: ${worker.pid}`); + + socket.on( + 'data', + consumeMessagesFromSocket( + createWorkerHandler(worker, pendingPromises, res, rej, socket) + ) ); worker.on('exit', exitHandler); }), - () => { - cleanupFunction(); - cleanupFunctions.delete(cleanupFunction); - }, + cleanupFunction, ]; } @@ -102,15 +94,18 @@ function createWorkerHandler( worker: ChildProcess, pending: Map, onload: (plugin: LoadedNxPlugin) => void, - onloadError: (err?: unknown) => void + onloadError: (err?: unknown) => void, + socket: Socket ) { let pluginName: string; - return function (message: Serializable) { + return function (raw: string) { + const message = JSON.parse(raw); + if (!isPluginWorkerResult(message)) { return; } - return consumeMessage(message, { + return consumeMessage(socket, message, { 'load-result': (result) => { if (result.success) { const { name, createNodesPattern, include, exclude } = result; @@ -124,9 +119,13 @@ function createWorkerHandler( ? [ createNodesPattern, (configFiles, ctx) => { - const tx = pluginName + ':createNodes:' + performance.now(); + const tx = + pluginName + + worker.pid + + ':createNodes:' + + performance.now(); return registerPendingPromise(tx, pending, () => { - worker.send({ + sendMessageOverSocket(socket, { type: 'createNodes', payload: { configFiles, context: ctx, tx }, }); @@ -137,9 +136,12 @@ function createWorkerHandler( createDependencies: result.hasCreateDependencies ? (ctx) => { const tx = - pluginName + ':createDependencies:' + performance.now(); + pluginName + + worker.pid + + ':createDependencies:' + + performance.now(); return registerPendingPromise(tx, pending, () => { - worker.send({ + sendMessageOverSocket(socket, { type: 'createDependencies', payload: { context: ctx, tx }, }); @@ -149,9 +151,12 @@ function createWorkerHandler( processProjectGraph: result.hasProcessProjectGraph ? (graph, ctx) => { const tx = - pluginName + ':processProjectGraph:' + performance.now(); + pluginName + + worker.pid + + ':processProjectGraph:' + + performance.now(); return registerPendingPromise(tx, pending, () => { - worker.send({ + sendMessageOverSocket(socket, { type: 'processProjectGraph', payload: { graph, ctx, tx }, }); @@ -161,9 +166,12 @@ function createWorkerHandler( createMetadata: result.hasCreateMetadata ? (graph, ctx) => { const tx = - pluginName + ':createMetadata:' + performance.now(); + pluginName + + worker.pid + + ':createMetadata:' + + performance.now(); return registerPendingPromise(tx, pending, () => { - worker.send({ + sendMessageOverSocket(socket, { type: 'createMetadata', payload: { graph, context: ctx, tx }, }); @@ -228,11 +236,18 @@ function createWorkerExitHandler( }; } -process.on('exit', () => { +let cleanedUp = false; +const exitHandler = () => { + if (cleanedUp) return; for (const fn of cleanupFunctions) { fn(); } -}); + cleanedUp = true; +}; + +process.on('exit', exitHandler); +process.on('SIGINT', exitHandler); +process.on('SIGTERM', exitHandler); function registerPendingPromise( tx: string, @@ -258,3 +273,78 @@ function registerPendingPromise( return promise; } + +let workerCount = 0; +async function startPluginWorker(plugin: PluginConfiguration) { + // this should only really be true when running unit tests within + // the Nx repo. We still need to start the worker in this case, + // but its typescript. + const isWorkerTypescript = path.extname(__filename) === '.ts'; + const workerPath = path.join(__dirname, 'plugin-worker'); + + const env: Record = { + ...process.env, + ...(isWorkerTypescript + ? { + // Ensures that the worker uses the same tsconfig as the main process + TS_NODE_PROJECT: path.join( + __dirname, + '../../../../tsconfig.lib.json' + ), + } + : {}), + }; + + const ipcPath = getPluginOsSocketPath([process.pid, workerCount++].join('-')); + + const worker = fork(workerPath, [ipcPath], { + stdio: process.stdout.isTTY ? 'inherit' : 'ignore', + env, + execArgv: [ + ...process.execArgv, + // If the worker is typescript, we need to register ts-node + ...(isWorkerTypescript ? ['-r', 'ts-node/register'] : []), + ], + detached: true, + }); + worker.disconnect(); + worker.unref(); + + let attempts = 0; + return new Promise<{ + worker: ChildProcess; + ipcPath: string; + }>((resolve, reject) => { + const id = setInterval(async () => { + if (await isServerAvailable(ipcPath)) { + clearInterval(id); + resolve({ + worker, + ipcPath, + }); + } else if (attempts > 1000) { + // daemon fails to start, the process probably exited + // we print the logs and exit the client + reject('Failed to start plugin worker.'); + } else { + attempts++; + } + }, 10); + }); +} + +function isServerAvailable(ipcPath: string): Promise { + return new Promise((resolve) => { + try { + const socket = connect(ipcPath, () => { + socket.destroy(); + resolve(true); + }); + socket.once('error', () => { + resolve(false); + }); + } catch (err) { + resolve(false); + } + }); +} diff --git a/packages/nx/src/project-graph/plugins/isolation/plugin-worker.ts b/packages/nx/src/project-graph/plugins/isolation/plugin-worker.ts index 0f61ab620c3ba..ed636665e021f 100644 --- a/packages/nx/src/project-graph/plugins/isolation/plugin-worker.ts +++ b/packages/nx/src/project-graph/plugins/isolation/plugin-worker.ts @@ -1,8 +1,10 @@ import { consumeMessage, isPluginWorkerMessage } from './messaging'; import { LoadedNxPlugin } from '../internal-api'; import { loadNxPlugin } from '../loader'; -import { Serializable } from 'child_process'; import { createSerializableError } from '../../../utils/serializable-error'; +import { createServer } from 'net'; +import { consumeMessagesFromSocket } from '../../../utils/consume-messages-from-socket'; +import { unlinkSync } from 'fs'; if (process.env.NX_PERF_LOGGING === 'true') { require('../../../utils/perf-logging'); @@ -12,109 +14,127 @@ global.NX_GRAPH_CREATION = true; let plugin: LoadedNxPlugin; -process.on('message', async (message: Serializable) => { - if (!isPluginWorkerMessage(message)) { - return; - } - return consumeMessage(message, { - load: async ({ plugin: pluginConfiguration, root }) => { - process.chdir(root); - try { - const [promise] = loadNxPlugin(pluginConfiguration, root); - plugin = await promise; - return { - type: 'load-result', - payload: { - name: plugin.name, - include: plugin.include, - exclude: plugin.exclude, - createNodesPattern: plugin.createNodes?.[0], - hasCreateDependencies: - 'createDependencies' in plugin && !!plugin.createDependencies, - hasProcessProjectGraph: - 'processProjectGraph' in plugin && !!plugin.processProjectGraph, - hasCreateMetadata: - 'createMetadata' in plugin && !!plugin.createMetadata, - success: true, - }, - }; - } catch (e) { - return { - type: 'load-result', - payload: { - success: false, - error: createSerializableError(e), - }, - }; - } - }, - createNodes: async ({ configFiles, context, tx }) => { - try { - const result = await plugin.createNodes[1](configFiles, context); - return { - type: 'createNodesResult', - payload: { result, success: true, tx }, - }; - } catch (e) { - return { - type: 'createNodesResult', - payload: { - success: false, - error: createSerializableError(e), - tx, - }, - }; - } - }, - createDependencies: async ({ context, tx }) => { - try { - const result = await plugin.createDependencies(context); - return { - type: 'createDependenciesResult', - payload: { dependencies: result, success: true, tx }, - }; - } catch (e) { - return { - type: 'createDependenciesResult', - payload: { - success: false, - error: createSerializableError(e), - tx, - }, - }; - } - }, - processProjectGraph: async ({ graph, ctx, tx }) => { - try { - const result = await plugin.processProjectGraph(graph, ctx); - return { - type: 'processProjectGraphResult', - payload: { graph: result, success: true, tx }, - }; - } catch (e) { - return { - type: 'processProjectGraphResult', - payload: { - success: false, - error: createSerializableError(e), - tx, - }, - }; - } - }, - createMetadata: async ({ graph, context, tx }) => { - try { - const result = await plugin.createMetadata(graph, context); - return { - type: 'createMetadataResult', - payload: { metadata: result, success: true, tx }, - }; - } catch (e) { - return { - type: 'createMetadataResult', - payload: { success: false, error: e.stack, tx }, - }; +const socketPath = process.argv[2]; + +const server = createServer((socket) => { + socket.on( + 'data', + consumeMessagesFromSocket((raw) => { + const message = JSON.parse(raw.toString()); + if (!isPluginWorkerMessage(message)) { + return; } - }, - }); + return consumeMessage(socket, message, { + load: async ({ plugin: pluginConfiguration, root }) => { + process.chdir(root); + try { + const [promise] = loadNxPlugin(pluginConfiguration, root); + plugin = await promise; + return { + type: 'load-result', + payload: { + name: plugin.name, + include: plugin.include, + exclude: plugin.exclude, + createNodesPattern: plugin.createNodes?.[0], + hasCreateDependencies: + 'createDependencies' in plugin && !!plugin.createDependencies, + hasProcessProjectGraph: + 'processProjectGraph' in plugin && + !!plugin.processProjectGraph, + hasCreateMetadata: + 'createMetadata' in plugin && !!plugin.createMetadata, + success: true, + }, + }; + } catch (e) { + return { + type: 'load-result', + payload: { + success: false, + error: createSerializableError(e), + }, + }; + } + }, + createNodes: async ({ configFiles, context, tx }) => { + try { + const result = await plugin.createNodes[1](configFiles, context); + return { + type: 'createNodesResult', + payload: { result, success: true, tx }, + }; + } catch (e) { + return { + type: 'createNodesResult', + payload: { + success: false, + error: createSerializableError(e), + tx, + }, + }; + } + }, + createDependencies: async ({ context, tx }) => { + try { + const result = await plugin.createDependencies(context); + return { + type: 'createDependenciesResult', + payload: { dependencies: result, success: true, tx }, + }; + } catch (e) { + return { + type: 'createDependenciesResult', + payload: { + success: false, + error: createSerializableError(e), + tx, + }, + }; + } + }, + processProjectGraph: async ({ graph, ctx, tx }) => { + try { + const result = await plugin.processProjectGraph(graph, ctx); + return { + type: 'processProjectGraphResult', + payload: { graph: result, success: true, tx }, + }; + } catch (e) { + return { + type: 'processProjectGraphResult', + payload: { + success: false, + error: createSerializableError(e), + tx, + }, + }; + } + }, + createMetadata: async ({ graph, context, tx }) => { + try { + const result = await plugin.createMetadata(graph, context); + return { + type: 'createMetadataResult', + payload: { metadata: result, success: true, tx }, + }; + } catch (e) { + return { + type: 'createMetadataResult', + payload: { success: false, error: e.stack, tx }, + }; + } + }, + }); + }) + ); +}); + +server.listen(socketPath); + +process.on('exit', () => { + server.close(); + try { + unlinkSync(socketPath); + } catch (e) {} }); From 1f8a8539c45b1184357e66de9ef677915245c800 Mon Sep 17 00:00:00 2001 From: Craigory Coppola Date: Mon, 24 Jun 2024 13:19:18 -0400 Subject: [PATCH 03/12] chore(core): don't remove promises from pending map in plugin isolation --- .../plugins/isolation/plugin-pool.ts | 25 ++++++------------- 1 file changed, 7 insertions(+), 18 deletions(-) diff --git a/packages/nx/src/project-graph/plugins/isolation/plugin-pool.ts b/packages/nx/src/project-graph/plugins/isolation/plugin-pool.ts index 545f770ea7b26..d29efc6ccccfb 100644 --- a/packages/nx/src/project-graph/plugins/isolation/plugin-pool.ts +++ b/packages/nx/src/project-graph/plugins/isolation/plugin-pool.ts @@ -16,7 +16,6 @@ import { sendMessageOverSocket, } from './messaging'; import { Socket, connect } from 'net'; -import { unlinkSync } from 'fs'; const cleanupFunctions = new Set<() => void>(); @@ -99,6 +98,8 @@ function createWorkerHandler( ) { let pluginName: string; + let txId = 0; + return function (raw: string) { const message = JSON.parse(raw); @@ -120,10 +121,7 @@ function createWorkerHandler( createNodesPattern, (configFiles, ctx) => { const tx = - pluginName + - worker.pid + - ':createNodes:' + - performance.now(); + pluginName + worker.pid + ':createNodes:' + txId++; return registerPendingPromise(tx, pending, () => { sendMessageOverSocket(socket, { type: 'createNodes', @@ -136,10 +134,7 @@ function createWorkerHandler( createDependencies: result.hasCreateDependencies ? (ctx) => { const tx = - pluginName + - worker.pid + - ':createDependencies:' + - performance.now(); + pluginName + worker.pid + ':createDependencies:' + txId++; return registerPendingPromise(tx, pending, () => { sendMessageOverSocket(socket, { type: 'createDependencies', @@ -151,10 +146,7 @@ function createWorkerHandler( processProjectGraph: result.hasProcessProjectGraph ? (graph, ctx) => { const tx = - pluginName + - worker.pid + - ':processProjectGraph:' + - performance.now(); + pluginName + worker.pid + ':processProjectGraph:' + txId++; return registerPendingPromise(tx, pending, () => { sendMessageOverSocket(socket, { type: 'processProjectGraph', @@ -166,10 +158,7 @@ function createWorkerHandler( createMetadata: result.hasCreateMetadata ? (graph, ctx) => { const tx = - pluginName + - worker.pid + - ':createMetadata:' + - performance.now(); + pluginName + worker.pid + ':createMetadata:' + txId++; return registerPendingPromise(tx, pending, () => { sendMessageOverSocket(socket, { type: 'createMetadata', @@ -262,7 +251,7 @@ function registerPendingPromise( callback(); }).finally(() => { - pending.delete(tx); + // pending.delete(tx); }); pending.set(tx, { From 7b378b628dba60baab0b057ca323362a8115717e Mon Sep 17 00:00:00 2001 From: Craigory Coppola Date: Mon, 24 Jun 2024 13:43:45 -0400 Subject: [PATCH 04/12] chore(repo): tests --- .circleci/config.yml | 18 +++--------------- 1 file changed, 3 insertions(+), 15 deletions(-) diff --git a/.circleci/config.yml b/.circleci/config.yml index ba498cd4d15a2..ebe65316f530a 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -86,6 +86,8 @@ jobs: NX_E2E_RUN_E2E: 'true' NX_CI_EXECUTION_ENV: 'linux' NX_CLOUD_DTE_V2: 'true' + NX_VERBOSE_LOGGING: 'true' + NX_E2E_VERBOSE_LOGGING: 'true' steps: - checkout - nx/set-shas: @@ -106,21 +108,7 @@ jobs: name: Run Checks/Lint/Test/Build no_output_timeout: 60m command: | - pids=() - - pnpm nx-cloud record -- nx format:check --base=$NX_BASE --head=$NX_HEAD & - pids+=($!) - - pnpm nx run-many -t check-imports check-commit check-lock-files check-codeowners documentation --parallel=1 --no-dte & - pids+=($!) - - (pnpm nx affected --targets=lint,test,build --base=$NX_BASE --head=$NX_HEAD --parallel=3 && - pnpm nx affected --targets=e2e,e2e-ci --base=$NX_BASE --head=$NX_HEAD --parallel=1) & - pids+=($!) - - for pid in "${pids[@]}"; do - wait "$pid" - done + pnpm nx e2e-ci e2e-react # ------------------------- # JOBS: Main-MacOS # ------------------------- From ac3023e4a5d3819bbb1197346a23d68f2b86d07d Mon Sep 17 00:00:00 2001 From: Craigory Coppola Date: Mon, 24 Jun 2024 17:39:48 -0400 Subject: [PATCH 05/12] fix(core): ensure workers used if multiple instances of plugin pool are created --- .../src/project-graph/plugins/internal-api.ts | 2 +- .../project-graph/plugins/isolation/index.ts | 6 +- .../plugins/isolation/plugin-pool.ts | 55 ++++++++++++------- 3 files changed, 39 insertions(+), 24 deletions(-) diff --git a/packages/nx/src/project-graph/plugins/internal-api.ts b/packages/nx/src/project-graph/plugins/internal-api.ts index 230f27f381a18..68d4163ce04b5 100644 --- a/packages/nx/src/project-graph/plugins/internal-api.ts +++ b/packages/nx/src/project-graph/plugins/internal-api.ts @@ -159,7 +159,7 @@ export async function loadNxPlugins( const cleanupFunctions: Array<() => void> = []; for (const plugin of plugins) { - const [loadedPluginPromise, cleanup] = loadingMethod(plugin, root); + const [loadedPluginPromise, cleanup] = await loadingMethod(plugin, root); result.push(loadedPluginPromise); cleanupFunctions.push(cleanup); } diff --git a/packages/nx/src/project-graph/plugins/isolation/index.ts b/packages/nx/src/project-graph/plugins/isolation/index.ts index 19a5ba5abac9f..7b6e9af11460f 100644 --- a/packages/nx/src/project-graph/plugins/isolation/index.ts +++ b/packages/nx/src/project-graph/plugins/isolation/index.ts @@ -11,17 +11,17 @@ const remotePluginCache = new Map< readonly [Promise, () => void] >(); -export function loadNxPluginInIsolation( +export async function loadNxPluginInIsolation( plugin: PluginConfiguration, root = workspaceRoot -): readonly [Promise, () => void] { +): Promise, () => void]> { const cacheKey = JSON.stringify(plugin); if (remotePluginCache.has(cacheKey)) { return remotePluginCache.get(cacheKey); } - const [loadingPlugin, cleanup] = loadRemoteNxPlugin(plugin, root); + const [loadingPlugin, cleanup] = await loadRemoteNxPlugin(plugin, root); // We clean up plugin workers when Nx process completes. const val = [ loadingPlugin, diff --git a/packages/nx/src/project-graph/plugins/isolation/plugin-pool.ts b/packages/nx/src/project-graph/plugins/isolation/plugin-pool.ts index d29efc6ccccfb..8618ab8909c5d 100644 --- a/packages/nx/src/project-graph/plugins/isolation/plugin-pool.ts +++ b/packages/nx/src/project-graph/plugins/isolation/plugin-pool.ts @@ -27,10 +27,21 @@ interface PendingPromise { rejector: (err: any) => void; } +type NxPluginWorkerCache = Map>; + +const nxPluginWorkerCache: NxPluginWorkerCache = (global[ + 'nxPluginWorkerCache' +] ??= new Map()); + export async function loadRemoteNxPlugin( plugin: PluginConfiguration, root: string ): Promise<[Promise, () => void]> { + const cacheKey = JSON.stringify(plugin); + if (nxPluginWorkerCache.has(cacheKey)) { + return [nxPluginWorkerCache.get(cacheKey), () => {}]; + } + const { ipcPath, worker } = await startPluginWorker(plugin); const socket = await new Promise((res, rej) => { @@ -48,28 +59,30 @@ export async function loadRemoteNxPlugin( worker.off('exit', exitHandler); socket.destroy(); shutdownPluginWorker(worker); + nxPluginWorkerCache.delete(cacheKey); }; cleanupFunctions.add(cleanupFunction); - return [ - new Promise((res, rej) => { - sendMessageOverSocket(socket, { - type: 'load', - payload: { plugin, root }, - }); - // logger.verbose(`[plugin-worker] started worker: ${worker.pid}`); + const pluginPromise = new Promise((res, rej) => { + sendMessageOverSocket(socket, { + type: 'load', + payload: { plugin, root }, + }); + // logger.verbose(`[plugin-worker] started worker: ${worker.pid}`); + + socket.on( + 'data', + consumeMessagesFromSocket( + createWorkerHandler(worker, pendingPromises, res, rej, socket) + ) + ); + worker.on('exit', exitHandler); + }); - socket.on( - 'data', - consumeMessagesFromSocket( - createWorkerHandler(worker, pendingPromises, res, rej, socket) - ) - ); - worker.on('exit', exitHandler); - }), - cleanupFunction, - ]; + nxPluginWorkerCache.set(cacheKey, pluginPromise); + + return [pluginPromise, cleanupFunction]; } function shutdownPluginWorker(worker: ChildProcess) { @@ -251,7 +264,7 @@ function registerPendingPromise( callback(); }).finally(() => { - // pending.delete(tx); + pending.delete(tx); }); pending.set(tx, { @@ -263,7 +276,7 @@ function registerPendingPromise( return promise; } -let workerCount = 0; +global.nxPluginWorkerCount ??= 0; async function startPluginWorker(plugin: PluginConfiguration) { // this should only really be true when running unit tests within // the Nx repo. We still need to start the worker in this case, @@ -284,7 +297,9 @@ async function startPluginWorker(plugin: PluginConfiguration) { : {}), }; - const ipcPath = getPluginOsSocketPath([process.pid, workerCount++].join('-')); + const ipcPath = getPluginOsSocketPath( + [process.pid, global.nxPluginWorkerCount++].join('-') + ); const worker = fork(workerPath, [ipcPath], { stdio: process.stdout.isTTY ? 'inherit' : 'ignore', From 88779a53720bb1b6614bb63e7275ea848d9a2ac1 Mon Sep 17 00:00:00 2001 From: Craigory Coppola Date: Mon, 24 Jun 2024 17:55:37 -0400 Subject: [PATCH 06/12] Revert "chore(repo): tests" This reverts commit 3919a13606232bde9d318c562ce839782d2eb889. --- .circleci/config.yml | 18 +++++++++++++++--- 1 file changed, 15 insertions(+), 3 deletions(-) diff --git a/.circleci/config.yml b/.circleci/config.yml index ebe65316f530a..ba498cd4d15a2 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -86,8 +86,6 @@ jobs: NX_E2E_RUN_E2E: 'true' NX_CI_EXECUTION_ENV: 'linux' NX_CLOUD_DTE_V2: 'true' - NX_VERBOSE_LOGGING: 'true' - NX_E2E_VERBOSE_LOGGING: 'true' steps: - checkout - nx/set-shas: @@ -108,7 +106,21 @@ jobs: name: Run Checks/Lint/Test/Build no_output_timeout: 60m command: | - pnpm nx e2e-ci e2e-react + pids=() + + pnpm nx-cloud record -- nx format:check --base=$NX_BASE --head=$NX_HEAD & + pids+=($!) + + pnpm nx run-many -t check-imports check-commit check-lock-files check-codeowners documentation --parallel=1 --no-dte & + pids+=($!) + + (pnpm nx affected --targets=lint,test,build --base=$NX_BASE --head=$NX_HEAD --parallel=3 && + pnpm nx affected --targets=e2e,e2e-ci --base=$NX_BASE --head=$NX_HEAD --parallel=1) & + pids+=($!) + + for pid in "${pids[@]}"; do + wait "$pid" + done # ------------------------- # JOBS: Main-MacOS # ------------------------- From e13db7dc257016caaf29de539d9130021af2212d Mon Sep 17 00:00:00 2001 From: Craigory Coppola Date: Mon, 24 Jun 2024 18:04:16 -0400 Subject: [PATCH 07/12] chore(core): remove unused var --- nx.json | 2 +- .../nx/src/project-graph/plugins/isolation/plugin-pool.ts | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/nx.json b/nx.json index 3e1d6f5ff825a..4e9ad71484b5a 100644 --- a/nx.json +++ b/nx.json @@ -212,6 +212,6 @@ "nxCloudUrl": "https://staging.nx.app", "parallel": 1, "cacheDirectory": "/tmp/nx-cache", - "bust": 5, + "bust": 6, "defaultBase": "master" } diff --git a/packages/nx/src/project-graph/plugins/isolation/plugin-pool.ts b/packages/nx/src/project-graph/plugins/isolation/plugin-pool.ts index 8618ab8909c5d..232fe0b63d10a 100644 --- a/packages/nx/src/project-graph/plugins/isolation/plugin-pool.ts +++ b/packages/nx/src/project-graph/plugins/isolation/plugin-pool.ts @@ -42,7 +42,7 @@ export async function loadRemoteNxPlugin( return [nxPluginWorkerCache.get(cacheKey), () => {}]; } - const { ipcPath, worker } = await startPluginWorker(plugin); + const { ipcPath, worker } = await startPluginWorker(); const socket = await new Promise((res, rej) => { const socket = connect(ipcPath, () => { @@ -277,7 +277,7 @@ function registerPendingPromise( } global.nxPluginWorkerCount ??= 0; -async function startPluginWorker(plugin: PluginConfiguration) { +async function startPluginWorker() { // this should only really be true when running unit tests within // the Nx repo. We still need to start the worker in this case, // but its typescript. From 1fc5e35d3fea79f4a21870de831cf196188f37cd Mon Sep 17 00:00:00 2001 From: Craigory Coppola Date: Tue, 25 Jun 2024 14:23:18 -0400 Subject: [PATCH 08/12] chore(repo): test --- .circleci/config.yml | 6 +----- nx.json | 2 +- .../project-graph/plugins/isolation/plugin-worker.ts | 11 +++++++++-- 3 files changed, 11 insertions(+), 8 deletions(-) diff --git a/.circleci/config.yml b/.circleci/config.yml index ba498cd4d15a2..ced0e0a694b1a 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -98,10 +98,6 @@ jobs: - browser-tools/install-chrome - run-pnpm-install: os: linux - - run: - name: Check Documentation - command: pnpm nx documentation --no-dte - no_output_timeout: 20m - run: name: Run Checks/Lint/Test/Build no_output_timeout: 60m @@ -114,7 +110,7 @@ jobs: pnpm nx run-many -t check-imports check-commit check-lock-files check-codeowners documentation --parallel=1 --no-dte & pids+=($!) - (pnpm nx affected --targets=lint,test,build --base=$NX_BASE --head=$NX_HEAD --parallel=3 && + (pnpm nx affected --targets=lint,test,build --base=$NX_BASE --head=$NX_HEAD --parallel=3 --exclude nx && pnpm nx affected --targets=e2e,e2e-ci --base=$NX_BASE --head=$NX_HEAD --parallel=1) & pids+=($!) diff --git a/nx.json b/nx.json index 4e9ad71484b5a..24750ba2831a1 100644 --- a/nx.json +++ b/nx.json @@ -212,6 +212,6 @@ "nxCloudUrl": "https://staging.nx.app", "parallel": 1, "cacheDirectory": "/tmp/nx-cache", - "bust": 6, + "bust": 7, "defaultBase": "master" } diff --git a/packages/nx/src/project-graph/plugins/isolation/plugin-worker.ts b/packages/nx/src/project-graph/plugins/isolation/plugin-worker.ts index ed636665e021f..0303021a83da1 100644 --- a/packages/nx/src/project-graph/plugins/isolation/plugin-worker.ts +++ b/packages/nx/src/project-graph/plugins/isolation/plugin-worker.ts @@ -132,9 +132,16 @@ const server = createServer((socket) => { server.listen(socketPath); -process.on('exit', () => { +const exitHandler = (exitCode: number) => () => { server.close(); try { unlinkSync(socketPath); } catch (e) {} -}); + process.exit(exitCode); +}; + +const events = ['SIGINT', 'SIGTERM', 'SIGQUIT', 'exit']; + +events.forEach((event) => process.once(event, exitHandler(0))); +process.once('uncaughtException', exitHandler(1)); +process.once('unhandledRejection', exitHandler(1)); From 4e9c4149cb4551637cb775665d7e8ef604b062b2 Mon Sep 17 00:00:00 2001 From: Craigory Coppola Date: Tue, 25 Jun 2024 16:51:28 -0400 Subject: [PATCH 09/12] fix(core): unref socket and avoid needlessly disposing + reconnecting --- .../affected/npm-packages.spec.ts | 2 +- .../project-graph/plugins/isolation/index.ts | 20 +--------------- .../plugins/isolation/plugin-pool.ts | 24 +++++++------------ 3 files changed, 11 insertions(+), 35 deletions(-) diff --git a/packages/nx/src/plugins/js/project-graph/affected/npm-packages.spec.ts b/packages/nx/src/plugins/js/project-graph/affected/npm-packages.spec.ts index b5c7992a4ed85..a5ea8032ff64d 100644 --- a/packages/nx/src/plugins/js/project-graph/affected/npm-packages.spec.ts +++ b/packages/nx/src/plugins/js/project-graph/affected/npm-packages.spec.ts @@ -296,7 +296,7 @@ describe('getTouchedNpmPackages', () => { }); it('should handle and log workspace package.json changes when the changes are not in `npmPackages` (projectGraph.externalNodes)', () => { - jest.spyOn(logger, 'warn'); + jest.spyOn(logger, 'warn').mockImplementation(() => {}); expect(() => { getTouchedNpmPackages( [ diff --git a/packages/nx/src/project-graph/plugins/isolation/index.ts b/packages/nx/src/project-graph/plugins/isolation/index.ts index 7b6e9af11460f..4bab409265189 100644 --- a/packages/nx/src/project-graph/plugins/isolation/index.ts +++ b/packages/nx/src/project-graph/plugins/isolation/index.ts @@ -3,33 +3,15 @@ import { PluginConfiguration } from '../../../config/nx-json'; import { LoadedNxPlugin } from '../internal-api'; import { loadRemoteNxPlugin } from './plugin-pool'; -/** - * Used to ensure 1 plugin : 1 worker - */ -const remotePluginCache = new Map< - string, - readonly [Promise, () => void] ->(); - export async function loadNxPluginInIsolation( plugin: PluginConfiguration, root = workspaceRoot ): Promise, () => void]> { - const cacheKey = JSON.stringify(plugin); - - if (remotePluginCache.has(cacheKey)) { - return remotePluginCache.get(cacheKey); - } - const [loadingPlugin, cleanup] = await loadRemoteNxPlugin(plugin, root); - // We clean up plugin workers when Nx process completes. - const val = [ + return [ loadingPlugin, () => { cleanup(); - remotePluginCache.delete(cacheKey); }, ] as const; - remotePluginCache.set(cacheKey, val); - return val; } diff --git a/packages/nx/src/project-graph/plugins/isolation/plugin-pool.ts b/packages/nx/src/project-graph/plugins/isolation/plugin-pool.ts index 232fe0b63d10a..df76b0309a193 100644 --- a/packages/nx/src/project-graph/plugins/isolation/plugin-pool.ts +++ b/packages/nx/src/project-graph/plugins/isolation/plugin-pool.ts @@ -37,19 +37,12 @@ export async function loadRemoteNxPlugin( plugin: PluginConfiguration, root: string ): Promise<[Promise, () => void]> { - const cacheKey = JSON.stringify(plugin); + const cacheKey = JSON.stringify({ plugin, root }); if (nxPluginWorkerCache.has(cacheKey)) { return [nxPluginWorkerCache.get(cacheKey), () => {}]; } - const { ipcPath, worker } = await startPluginWorker(); - - const socket = await new Promise((res, rej) => { - const socket = connect(ipcPath, () => { - res(socket); - }); - socket.on('error', rej); - }); + const { worker, socket } = await startPluginWorker(); const pendingPromises = new Map(); @@ -317,14 +310,16 @@ async function startPluginWorker() { let attempts = 0; return new Promise<{ worker: ChildProcess; - ipcPath: string; + socket: Socket; }>((resolve, reject) => { const id = setInterval(async () => { - if (await isServerAvailable(ipcPath)) { + const socket = await isServerAvailable(ipcPath); + if (socket) { + // socket.unref(); clearInterval(id); resolve({ worker, - ipcPath, + socket, }); } else if (attempts > 1000) { // daemon fails to start, the process probably exited @@ -337,12 +332,11 @@ async function startPluginWorker() { }); } -function isServerAvailable(ipcPath: string): Promise { +function isServerAvailable(ipcPath: string): Promise { return new Promise((resolve) => { try { const socket = connect(ipcPath, () => { - socket.destroy(); - resolve(true); + resolve(socket); }); socket.once('error', () => { resolve(false); From 2f2ec2ddcc1568e4c7cae6d6636145bd37406aef Mon Sep 17 00:00:00 2001 From: Craigory Coppola Date: Wed, 26 Jun 2024 10:46:19 -0400 Subject: [PATCH 10/12] fix(core): add timeout on messaging plugin workers to prevent node exiting --- .../plugins/isolation/plugin-pool.ts | 28 ++++++++++++++++--- 1 file changed, 24 insertions(+), 4 deletions(-) diff --git a/packages/nx/src/project-graph/plugins/isolation/plugin-pool.ts b/packages/nx/src/project-graph/plugins/isolation/plugin-pool.ts index df76b0309a193..550b074643432 100644 --- a/packages/nx/src/project-graph/plugins/isolation/plugin-pool.ts +++ b/packages/nx/src/project-graph/plugins/isolation/plugin-pool.ts @@ -21,6 +21,8 @@ const cleanupFunctions = new Set<() => void>(); const pluginNames = new Map(); +const MAX_MESSAGE_WAIT = 1000 * 60 * 5; // 5 minutes + interface PendingPromise { promise: Promise; resolver: (result: any) => void; @@ -64,10 +66,23 @@ export async function loadRemoteNxPlugin( }); // logger.verbose(`[plugin-worker] started worker: ${worker.pid}`); + const loadTimeout = setTimeout(() => { + rej(new Error('Plugin worker timed out when loading plugin:' + plugin)); + }, MAX_MESSAGE_WAIT); + socket.on( 'data', consumeMessagesFromSocket( - createWorkerHandler(worker, pendingPromises, res, rej, socket) + createWorkerHandler( + worker, + pendingPromises, + (val) => { + clearTimeout(loadTimeout); + res(val); + }, + rej, + socket + ) ) ); worker.on('exit', exitHandler); @@ -249,15 +264,20 @@ function registerPendingPromise( pending: Map, callback: () => void ): Promise { - let resolver, rejector; + let resolver, rejector, timeout; const promise = new Promise((res, rej) => { - resolver = res; rejector = rej; + resolver = res; + + timeout = setTimeout(() => { + rej(new Error(`Plugin worker timed out when processing message ${tx}`)); + }, MAX_MESSAGE_WAIT); callback(); }).finally(() => { pending.delete(tx); + clearTimeout(timeout); }); pending.set(tx, { @@ -315,7 +335,7 @@ async function startPluginWorker() { const id = setInterval(async () => { const socket = await isServerAvailable(ipcPath); if (socket) { - // socket.unref(); + socket.unref(); clearInterval(id); resolve({ worker, From 5d1ff97e1758a3da46f109523bf06139349f4e5f Mon Sep 17 00:00:00 2001 From: Craigory Coppola Date: Wed, 26 Jun 2024 12:42:01 -0400 Subject: [PATCH 11/12] chore(repo): reenable unit tests --- .circleci/config.yml | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/.circleci/config.yml b/.circleci/config.yml index ced0e0a694b1a..ba498cd4d15a2 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -98,6 +98,10 @@ jobs: - browser-tools/install-chrome - run-pnpm-install: os: linux + - run: + name: Check Documentation + command: pnpm nx documentation --no-dte + no_output_timeout: 20m - run: name: Run Checks/Lint/Test/Build no_output_timeout: 60m @@ -110,7 +114,7 @@ jobs: pnpm nx run-many -t check-imports check-commit check-lock-files check-codeowners documentation --parallel=1 --no-dte & pids+=($!) - (pnpm nx affected --targets=lint,test,build --base=$NX_BASE --head=$NX_HEAD --parallel=3 --exclude nx && + (pnpm nx affected --targets=lint,test,build --base=$NX_BASE --head=$NX_HEAD --parallel=3 && pnpm nx affected --targets=e2e,e2e-ci --base=$NX_BASE --head=$NX_HEAD --parallel=1) & pids+=($!) From c900c76799d05947db9a9c2f6cca00e00b937581 Mon Sep 17 00:00:00 2001 From: Craigory Coppola Date: Wed, 26 Jun 2024 13:09:58 -0400 Subject: [PATCH 12/12] fix(core): ensure worker handles create metadata --- packages/nx/src/project-graph/plugins/isolation/messaging.ts | 2 ++ 1 file changed, 2 insertions(+) diff --git a/packages/nx/src/project-graph/plugins/isolation/messaging.ts b/packages/nx/src/project-graph/plugins/isolation/messaging.ts index e764247bba814..bafc17fd32551 100644 --- a/packages/nx/src/project-graph/plugins/isolation/messaging.ts +++ b/packages/nx/src/project-graph/plugins/isolation/messaging.ts @@ -161,6 +161,7 @@ export function isPluginWorkerMessage( 'createNodes', 'createDependencies', 'processProjectGraph', + 'createMetadata', ].includes(message.type) ); } @@ -177,6 +178,7 @@ export function isPluginWorkerResult( 'createNodesResult', 'createDependenciesResult', 'processProjectGraphResult', + 'createMetadataResult', ].includes(message.type) ); }