From 155844fe25879aca840bd8db51b0bf17a58f59dd Mon Sep 17 00:00:00 2001 From: Craigory Coppola Date: Mon, 18 Nov 2024 17:59:33 -0500 Subject: [PATCH] feat(core): transition daemon from queue to transaction based messaging --- nx.json | 55 +-- packages/angular/index.ts | 1 + packages/angular/src/plugins/plugin.ts | 3 +- packages/nx/src/command-line/graph/graph.ts | 2 +- packages/nx/src/daemon/client/client.ts | 349 ++++++++++-------- .../daemon/client/daemon-socket-messenger.ts | 3 +- .../flush-sync-generator-changes-to-disk.ts | 4 +- .../daemon/message-types/force-shutdown.ts | 4 +- .../message-types/get-context-file-data.ts | 4 +- .../message-types/get-files-in-directory.ts | 4 +- .../message-types/get-nx-workspace-files.ts | 4 +- .../get-registered-sync-generators.ts | 4 +- .../get-sync-generator-changes.ts | 4 +- packages/nx/src/daemon/message-types/glob.ts | 4 +- .../nx/src/daemon/message-types/hash-glob.ts | 4 +- .../nx/src/daemon/message-types/hash-tasks.ts | 23 ++ .../message-types/output-hashes-match.ts | 20 + .../message-types/process-in-background.ts | 21 ++ .../message-types/record-outputs-hash.ts | 20 + .../src/daemon/message-types/task-history.ts | 7 +- .../message-types/update-workspace-context.ts | 4 +- .../file-watching/file-watcher-sockets.ts | 2 +- .../nx/src/daemon/server/handle-hash-tasks.ts | 19 +- .../daemon/server/handle-outputs-tracking.ts | 16 +- .../server/handle-process-in-background.ts | 9 +- packages/nx/src/daemon/server/server.ts | 81 ++-- .../nx/src/daemon/server/shutdown-utils.ts | 48 ++- .../plugins/isolation/plugin-pool.ts | 71 +--- .../plugins/isolation/plugin-worker.ts | 2 +- packages/nx/src/tasks-runner/pseudo-ipc.ts | 2 +- .../src/utils/consume-messages-from-socket.ts | 19 - ...-from-socket.spec.ts => messaging.spec.ts} | 2 +- packages/nx/src/utils/messaging.ts | 59 +++ 33 files changed, 522 insertions(+), 352 deletions(-) create mode 100644 packages/nx/src/daemon/message-types/hash-tasks.ts create mode 100644 packages/nx/src/daemon/message-types/output-hashes-match.ts create mode 100644 packages/nx/src/daemon/message-types/process-in-background.ts create mode 100644 packages/nx/src/daemon/message-types/record-outputs-hash.ts delete mode 100644 packages/nx/src/utils/consume-messages-from-socket.ts rename packages/nx/src/utils/{consume-messages-from-socket.spec.ts => messaging.spec.ts} (95%) create mode 100644 packages/nx/src/utils/messaging.ts diff --git a/nx.json b/nx.json index d1f329741846e..3e17a925058e4 100644 --- a/nx.json +++ b/nx.json @@ -181,66 +181,13 @@ } }, "plugins": [ - "@monodon/rust", - { - "plugin": "@nx/playwright/plugin", - "options": { - "targetName": "pw-e2e", - "ciTargetName": "e2e-ci" - } - }, { "plugin": "@nx/eslint/plugin", "exclude": ["packages/**/__fixtures__/**/*"], "options": { "targetName": "lint" } - }, - { - "plugin": "@nx/jest/plugin", - "exclude": [ - "e2e/**/*", - "packages/**/__fixtures__/**/*", - "jest.config.ts" - ], - "options": { - "targetName": "test" - } - }, - { - "plugin": "@nx/webpack/plugin", - "options": { - "serveTargetName": "serve-base", - "buildTargetName": "build-client" - } - }, - { - "plugin": "@nx/jest/plugin", - "include": ["e2e/**/*"], - "exclude": ["e2e/detox/**/*", "e2e/react-native/**/*", "e2e/expo/**/*"], - "options": { - "targetName": "e2e-local", - "ciTargetName": "e2e-ci" - } - }, - { - "plugin": "@nx/jest/plugin", - "include": ["e2e/detox/**/*", "e2e/react-native/**/*", "e2e/expo/**/*"], - "options": { - "targetName": "e2e-macos-local", - "ciTargetName": "e2e-macos-ci" - } - }, - { - "plugin": "@nx/next/plugin", - "options": { - "startTargetName": "next:start", - "buildTargetName": "next:build", - "devTargetName": "dev", - "serveStaticTargetName": "serve-static" - } - }, - "@nx/powerpack-enterprise-cloud" + } ], "nxCloudId": "62d013ea0852fe0a2df74438", "nxCloudUrl": "https://staging.nx.app", diff --git a/packages/angular/index.ts b/packages/angular/index.ts index 270769c0b94af..1d0ea25c99679 100644 --- a/packages/angular/index.ts +++ b/packages/angular/index.ts @@ -4,3 +4,4 @@ export { optimisticUpdate, pessimisticUpdate, } from './src/runtime/nx/data-persistence'; +// asda diff --git a/packages/angular/src/plugins/plugin.ts b/packages/angular/src/plugins/plugin.ts index 7c41b9647e8f5..f21837fadb408 100644 --- a/packages/angular/src/plugins/plugin.ts +++ b/packages/angular/src/plugins/plugin.ts @@ -67,8 +67,6 @@ const knownExecutors = { test: new Set(['@angular-devkit/build-angular:karma']), }; -const pmc = getPackageManagerCommand(); - function readProjectsCache(cachePath: string): Record { return existsSync(cachePath) ? readJsonFile(cachePath) : {}; } @@ -144,6 +142,7 @@ async function buildAngularProjects( ): Promise { const projects: Record = {}; + const pmc = getPackageManagerCommand(); const absoluteConfigFilePath = join(context.workspaceRoot, configFilePath); const angularJson = readJsonFile(absoluteConfigFilePath); diff --git a/packages/nx/src/command-line/graph/graph.ts b/packages/nx/src/command-line/graph/graph.ts index e355d7d137682..e0ca615a9d8bf 100644 --- a/packages/nx/src/command-line/graph/graph.ts +++ b/packages/nx/src/command-line/graph/graph.ts @@ -560,7 +560,7 @@ async function startServer( } if (watchForChanges && daemonClient.enabled()) { - unregisterFileWatcher = await createFileWatcher(); + // unregisterFileWatcher = await createFileWatcher(); } const { projectGraphClientResponse, sourceMapResponse } = diff --git a/packages/nx/src/daemon/client/client.ts b/packages/nx/src/daemon/client/client.ts index f3d4ca7c02744..01057d09843e2 100644 --- a/packages/nx/src/daemon/client/client.ts +++ b/packages/nx/src/daemon/client/client.ts @@ -77,12 +77,38 @@ import { FLUSH_SYNC_GENERATOR_CHANGES_TO_DISK, type HandleFlushSyncGeneratorChangesToDiskMessage, } from '../message-types/flush-sync-generator-changes-to-disk'; +import { PendingPromise, registerPendingPromise } from '../../utils/messaging'; +import { + HandleRecordOutputsHashMessage, + RECORD_OUTPUTS_HASH, +} from '../message-types/record-outputs-hash'; const DAEMON_ENV_SETTINGS = { NX_PROJECT_GLOB_CACHE: 'false', NX_CACHE_PROJECTS_CONFIG: 'false', }; +const DAEMON_TIMEOUT_HINT_TEXT = + 'As a last resort, you can set NX_DAEMON_NO_TIMEOUTS=true to bypass this timeout.'; + +const MINUTES = 15; + +const MAX_MESSAGE_WAIT = + process.env.NX_DAEMON_NO_TIMEOUTS === 'true' + ? // Registering a timeout prevents the process from exiting + // if the call to a plugin happens to be the only thing + // keeping the process alive. As such, even if timeouts are disabled + // we need to register one. 2147483647 is the max timeout + // that Node.js allows, and is equivalent to 24.8 days.... + // This does mean that the NX_PLUGIN_NO_TIMEOUTS env var + // would still timeout after 24.8 days, but that seems + // like a reasonable compromise. + 2147483647 + : 1000 * 60 * MINUTES; // 10 minutes + +const getTimeoutErrorMessage = (messageType: string) => () => + `The daemon process has not responded to the ${messageType} message within ${MINUTES} minutes. ${DAEMON_TIMEOUT_HINT_TEXT}`; + export type UnregisterCallback = () => void; export type ChangedFile = { path: string; @@ -107,12 +133,14 @@ export class DaemonClient { this.reset(); } - private queue: PromisedBasedQueue; + // private queue: PromisedBasedQueue; private socketMessenger: DaemonSocketMessenger; - private currentMessage; - private currentResolve; - private currentReject; + private pendingPromises: Map = new Map(); + private pendingMessages: Map = new Map(); + // private currentMessage; + // private currentResolve; + // private currentReject; private _enabled: boolean | undefined; private _daemonStatus: DaemonStatus = DaemonStatus.DISCONNECTED; @@ -120,6 +148,11 @@ export class DaemonClient { private _daemonReady: () => void | null = null; private _out: FileHandle = null; private _err: FileHandle = null; + private txId: number = 0; + + getNextTxId(type: string) { + return `${process.pid}:${type}:${this.txId++}`; + } enabled() { if (this._enabled === undefined) { @@ -169,10 +202,11 @@ export class DaemonClient { reset() { this.socketMessenger?.close(); this.socketMessenger = null; - this.queue = new PromisedBasedQueue(); - this.currentMessage = null; - this.currentResolve = null; - this.currentReject = null; + // this.queue = new PromisedBasedQueue(); + // this.currentMessage = null; + // this.currentResolve = null; + // this.currentReject = null; + this.pendingPromises.clear(); this._enabled = undefined; this._out?.close(); @@ -186,33 +220,50 @@ export class DaemonClient { ); } + private sendMessageToDaemonViaTx< + T extends { type: string } = { type: string } + >(message: T) { + const tx = this.getNextTxId(message.type); + this.pendingMessages.set(tx, message); + return registerPendingPromise( + tx, + this.pendingPromises, + () => { + this.sendMessageToDaemon({ ...message, tx }); + }, + getTimeoutErrorMessage(message.type), + MAX_MESSAGE_WAIT + ); + } + async requestShutdown(): Promise { - return this.sendToDaemonViaQueue({ type: 'REQUEST_SHUTDOWN' }); + return this.sendMessageToDaemonViaTx({ type: 'REQUEST_SHUTDOWN' }); } async getProjectGraphAndSourceMaps(): Promise<{ projectGraph: ProjectGraph; sourceMaps: ConfigurationSourceMaps; }> { - try { - const response = await this.sendToDaemonViaQueue({ - type: 'REQUEST_PROJECT_GRAPH', + return this.sendMessageToDaemonViaTx({ + type: 'REQUEST_PROJECT_GRAPH', + }) + .then((response) => { + return { + projectGraph: response.projectGraph, + sourceMaps: response.sourceMaps, + }; + }) + .catch((e) => { + if (e.name === DaemonProjectGraphError.name) { + throw ProjectGraphError.fromDaemonProjectGraphError(e); + } else { + throw e; + } }); - return { - projectGraph: response.projectGraph, - sourceMaps: response.sourceMaps, - }; - } catch (e) { - if (e.name === DaemonProjectGraphError.name) { - throw ProjectGraphError.fromDaemonProjectGraphError(e); - } else { - throw e; - } - } } async getAllFileData(): Promise { - return await this.sendToDaemonViaQueue({ type: 'REQUEST_FILE_DATA' }); + return this.sendMessageToDaemonViaTx({ type: 'REQUEST_ALL_FILE_DATA' }); } hashTasks( @@ -221,7 +272,7 @@ export class DaemonClient { taskGraph: TaskGraph, env: NodeJS.ProcessEnv ): Promise { - return this.sendToDaemonViaQueue({ + return this.sendMessageToDaemonViaTx({ type: 'HASH_TASKS', runnerOptions, env, @@ -244,7 +295,7 @@ export class DaemonClient { changedFiles: ChangedFile[]; } | null ) => void - ): Promise { + ) /*: Promise */ { try { await this.getProjectGraphAndSourceMaps(); } catch (e) { @@ -254,12 +305,9 @@ export class DaemonClient { throw e; } } - let messenger: DaemonSocketMessenger | undefined; - await this.queue.sendToQueue(() => { - messenger = new DaemonSocketMessenger( - connect(getFullOsSocketPath()) - ).listen( + let messenger: DaemonSocketMessenger | undefined = + new DaemonSocketMessenger(connect(getFullOsSocketPath())).listen( (message) => { try { const parsedMessage = JSON.parse(message); @@ -273,8 +321,18 @@ export class DaemonClient { }, (err) => callback(err, null) ); - return messenger.sendMessage({ type: 'REGISTER_FILE_WATCHER', config }); - }); + + const tx = this.getNextTxId('REGISTER_FILE_WATCHER'); + + await registerPendingPromise( + tx, + this.pendingPromises, + () => { + messenger.sendMessage({ type: 'REGISTER_FILE_WATCHER', config, tx }); + }, + getTimeoutErrorMessage('REGISTER_FILE_WATCHER'), + MAX_MESSAGE_WAIT + ); return () => { messenger?.close(); @@ -282,7 +340,7 @@ export class DaemonClient { } processInBackground(requirePath: string, data: any): Promise { - return this.sendToDaemonViaQueue({ + return this.sendMessageToDaemonViaTx({ type: 'PROCESS_IN_BACKGROUND', requirePath, data, @@ -290,8 +348,8 @@ export class DaemonClient { } recordOutputsHash(outputs: string[], hash: string): Promise { - return this.sendToDaemonViaQueue({ - type: 'RECORD_OUTPUTS_HASH', + return this.sendMessageToDaemonViaTx({ + type: RECORD_OUTPUTS_HASH, data: { outputs, hash, @@ -300,7 +358,7 @@ export class DaemonClient { } outputsHashesMatch(outputs: string[], hash: string): Promise { - return this.sendToDaemonViaQueue({ + return this.sendMessageToDaemonViaTx({ type: 'OUTPUTS_HASHES_MATCH', data: { outputs, @@ -310,104 +368,95 @@ export class DaemonClient { } glob(globs: string[], exclude?: string[]): Promise { - const message: HandleGlobMessage = { + return this.sendMessageToDaemonViaTx({ type: 'GLOB', globs, exclude, - }; - return this.sendToDaemonViaQueue(message); + }); } getWorkspaceContextFileData(): Promise { - const message: HandleContextFileDataMessage = { + return this.sendMessageToDaemonViaTx({ type: GET_CONTEXT_FILE_DATA, - }; - return this.sendToDaemonViaQueue(message); + }); } getWorkspaceFiles( projectRootMap: Record ): Promise { - const message: HandleNxWorkspaceFilesMessage = { + return this.sendMessageToDaemonViaTx({ type: GET_NX_WORKSPACE_FILES, projectRootMap, - }; - return this.sendToDaemonViaQueue(message); + }); } getFilesInDirectory(dir: string): Promise { - const message: HandleGetFilesInDirectoryMessage = { + return this.sendMessageToDaemonViaTx({ type: GET_FILES_IN_DIRECTORY, dir, - }; - return this.sendToDaemonViaQueue(message); + }); } hashGlob(globs: string[], exclude?: string[]): Promise { - const message: HandleHashGlobMessage = { + return this.sendMessageToDaemonViaTx({ type: HASH_GLOB, globs, exclude, - }; - return this.sendToDaemonViaQueue(message); + }); } getFlakyTasks(hashes: string[]): Promise { - const message: HandleGetFlakyTasks = { + return this.sendMessageToDaemonViaTx({ type: GET_FLAKY_TASKS, hashes, - }; - - return this.sendToDaemonViaQueue(message); + }); } async getEstimatedTaskTimings( targets: TaskTarget[] ): Promise> { - const message: HandleGetEstimatedTaskTimings = { + return this.sendMessageToDaemonViaTx({ type: GET_ESTIMATED_TASK_TIMINGS, targets, - }; - - return this.sendToDaemonViaQueue(message); + }); } recordTaskRuns(taskRuns: TaskRun[]): Promise { - const message: HandleRecordTaskRunsMessage = { + return this.sendMessageToDaemonViaTx({ type: RECORD_TASK_RUNS, taskRuns, - }; - return this.sendMessageToDaemon(message); + }); } getSyncGeneratorChanges( generators: string[] ): Promise { - const message: HandleGetSyncGeneratorChangesMessage = { + return this.sendMessageToDaemonViaTx({ type: GET_SYNC_GENERATOR_CHANGES, generators, - }; - return this.sendToDaemonViaQueue(message); + }); } flushSyncGeneratorChangesToDisk( generators: string[] ): Promise { - const message: HandleFlushSyncGeneratorChangesToDiskMessage = { - type: FLUSH_SYNC_GENERATOR_CHANGES_TO_DISK, - generators, - }; - return this.sendToDaemonViaQueue(message); + return this.sendMessageToDaemonViaTx( + { + type: FLUSH_SYNC_GENERATOR_CHANGES_TO_DISK, + generators, + } + ); } getRegisteredSyncGenerators(): Promise<{ globalGenerators: string[]; taskGenerators: string[]; }> { - const message: HandleGetRegisteredSyncGeneratorsMessage = { - type: GET_REGISTERED_SYNC_GENERATORS, - }; - return this.sendToDaemonViaQueue(message); + return this.sendMessageToDaemonViaTx( + { + type: GET_REGISTERED_SYNC_GENERATORS, + } + ); } updateWorkspaceContext( @@ -415,13 +464,12 @@ export class DaemonClient { updatedFiles: string[], deletedFiles: string[] ): Promise { - const message: HandleUpdateWorkspaceContextMessage = { + return this.sendMessageToDaemonViaTx({ type: UPDATE_WORKSPACE_CONTEXT, createdFiles, updatedFiles, deletedFiles, - }; - return this.sendToDaemonViaQueue(message); + }); } async isServerAvailable(): Promise { @@ -440,10 +488,11 @@ export class DaemonClient { }); } - private async sendToDaemonViaQueue(messageToDaemon: Message): Promise { - return this.queue.sendToQueue(() => - this.sendMessageToDaemon(messageToDaemon) - ); + private rejectAllPendingPromises(err) { + this.pendingPromises.forEach((pending) => { + pending.rejector(err); + }); + this.pendingPromises.clear(); } private setUpConnection() { @@ -454,7 +503,7 @@ export class DaemonClient { () => { // it's ok for the daemon to terminate if the client doesn't wait on // any messages from the daemon - if (this.queue.isEmpty()) { + if (this.pendingPromises.size === 0) { this.reset(); } else { output.error({ @@ -465,9 +514,11 @@ export class DaemonClient { ], }); this._daemonStatus = DaemonStatus.DISCONNECTED; - this.currentReject?.( - daemonProcessException( - 'Daemon process terminated and closed the connection' + this.pendingPromises.forEach((pending) => + pending.rejector?.( + daemonProcessException( + 'Daemon process terminated and closed the connection' + ) ) ); process.exit(1); @@ -475,19 +526,23 @@ export class DaemonClient { }, (err) => { if (!err.message) { - return this.currentReject(daemonProcessException(err.toString())); - } - - if (err.message.startsWith('LOCK-FILES-CHANGED')) { - // retry the current message - // we cannot send it via the queue because we are in the middle of processing - // a message from the queue - return this.sendMessageToDaemon(this.currentMessage).then( - this.currentResolve, - this.currentReject + return this.rejectAllPendingPromises( + daemonProcessException(err.toString()) ); } + // TODO: @AgentEnder - figure out how to handle this... Not sure this is + // actually even being used. + // if (err.message.startsWith('LOCK-FILES-CHANGED')) { + // // retry the current message + // // we cannot send it via the queue because we are in the middle of processing + // // a message from the queue + // return this.sendMessageToDaemon(this.currentMessage).then( + // this.currentResolve, + // this.currentReject + // ); + // } + let error: any; if (err.message.startsWith('connect ENOENT')) { error = daemonProcessException('The Daemon Server is not running'); @@ -503,7 +558,7 @@ export class DaemonClient { } else { error = daemonProcessException(err.toString()); } - return this.currentReject(error); + return this.rejectAllPendingPromises(error); } ); } @@ -521,60 +576,67 @@ export class DaemonClient { } else if (this._daemonStatus == DaemonStatus.CONNECTING) { await this._waitForDaemonReady; } - // An open promise isn't enough to keep the event loop - // alive, so we set a timeout here and clear it when we hear - // back - const keepAlive = setTimeout(() => {}, 10 * 60 * 1000); - return new Promise((resolve, reject) => { - performance.mark('sendMessageToDaemon-start'); - - this.currentMessage = message; - this.currentResolve = resolve; - this.currentReject = reject; - - this.socketMessenger.sendMessage(message); - }).finally(() => { - clearTimeout(keepAlive); - }); + this.socketMessenger.sendMessage(message); } private handleMessage(serializedResult: string) { try { performance.mark('json-parse-start'); - const parsedResult = JSON.parse(serializedResult); + const { response: parsedResult, tx } = JSON.parse(serializedResult); performance.mark('json-parse-end'); performance.measure( 'deserialize daemon response', 'json-parse-start', 'json-parse-end' ); - if (parsedResult.error) { - this.currentReject(parsedResult.error); + if (tx) { + const pending = this.pendingPromises.get(tx); + if (pending) { + if (parsedResult.error) { + pending.rejector(parsedResult.error); + } else { + pending.resolver(parsedResult); + } + this.pendingPromises.delete(tx); + } else { + output.error({ + title: `Received a response with an unknown transaction ID: ${tx}`, + bodyLines: [ + 'This is likely a bug in Nx. Please report it.', + 'The response will be ignored.', + ], + }); + } } else { - performance.measure( - 'total for sendMessageToDaemon()', - 'sendMessageToDaemon-start', - 'json-parse-end' - ); - return this.currentResolve(parsedResult); + output.error({ + title: `Received a response without a transaction ID: ${JSON.stringify( + parsedResult + )}`, + bodyLines: [ + 'This is likely a bug in Nx. Please report it.', + 'The response will be ignored.', + ], + }); } } catch (e) { const endOfResponse = serializedResult.length > 300 ? serializedResult.substring(serializedResult.length - 300) : serializedResult; - this.currentReject( - daemonProcessException( - [ - 'Could not deserialize response from Nx daemon.', - `Message: ${e.message}`, - '\n', - `Received:`, - endOfResponse, - '\n', - ].join('\n') - ) - ); + this.pendingPromises.forEach((pending) => { + pending.rejector( + daemonProcessException( + [ + 'Could not deserialize response from Nx daemon.', + `Message: ${e.message}`, + '\n', + `Received:`, + endOfResponse, + '\n', + ].join('\n') + ) + ); + }); } } @@ -629,18 +691,17 @@ export class DaemonClient { } async stop(): Promise { - try { - await this.sendMessageToDaemon({ type: FORCE_SHUTDOWN }); - await waitForDaemonToExitAndCleanupProcessJson(); - } catch (err) { - output.error({ - title: - err?.message || - 'Something unexpected went wrong when stopping the daemon server', + return this.sendMessageToDaemonViaTx({ type: 'FORCE_SHUTDOWN' }) + .catch((e) => { + output.error({ + title: + e?.message || + 'Something unexpected went wrong when stopping the daemon server', + }); + }) + .then(() => { + removeSocketDir(); }); - } - - removeSocketDir(); } } diff --git a/packages/nx/src/daemon/client/daemon-socket-messenger.ts b/packages/nx/src/daemon/client/daemon-socket-messenger.ts index 34c00ff12e839..00ebff88b66bf 100644 --- a/packages/nx/src/daemon/client/daemon-socket-messenger.ts +++ b/packages/nx/src/daemon/client/daemon-socket-messenger.ts @@ -1,11 +1,12 @@ import { randomUUID } from 'crypto'; import { Socket } from 'net'; import { performance } from 'perf_hooks'; -import { consumeMessagesFromSocket } from '../../utils/consume-messages-from-socket'; +import { consumeMessagesFromSocket } from '../../utils/messaging'; export interface Message extends Record { type: string; data?: any; + tx?: string; } export class DaemonSocketMessenger { diff --git a/packages/nx/src/daemon/message-types/flush-sync-generator-changes-to-disk.ts b/packages/nx/src/daemon/message-types/flush-sync-generator-changes-to-disk.ts index 57350094b8494..9649ef8d2156e 100644 --- a/packages/nx/src/daemon/message-types/flush-sync-generator-changes-to-disk.ts +++ b/packages/nx/src/daemon/message-types/flush-sync-generator-changes-to-disk.ts @@ -1,7 +1,9 @@ +import { Message } from '../client/daemon-socket-messenger'; + export const FLUSH_SYNC_GENERATOR_CHANGES_TO_DISK = 'CLEAR_CACHED_SYNC_GENERATOR_CHANGES' as const; -export type HandleFlushSyncGeneratorChangesToDiskMessage = { +export type HandleFlushSyncGeneratorChangesToDiskMessage = Message & { type: typeof FLUSH_SYNC_GENERATOR_CHANGES_TO_DISK; generators: string[]; }; diff --git a/packages/nx/src/daemon/message-types/force-shutdown.ts b/packages/nx/src/daemon/message-types/force-shutdown.ts index 537d6f08c2ceb..871acfc67a104 100644 --- a/packages/nx/src/daemon/message-types/force-shutdown.ts +++ b/packages/nx/src/daemon/message-types/force-shutdown.ts @@ -1,6 +1,8 @@ +import { Message } from '../client/daemon-socket-messenger'; + export const FORCE_SHUTDOWN = 'FORCE_SHUTDOWN' as const; -export type HandleForceShutdownMessage = { +export type HandleForceShutdownMessage = Message & { type: typeof FORCE_SHUTDOWN; }; diff --git a/packages/nx/src/daemon/message-types/get-context-file-data.ts b/packages/nx/src/daemon/message-types/get-context-file-data.ts index 935f1cbb87609..75b7075ff0c08 100644 --- a/packages/nx/src/daemon/message-types/get-context-file-data.ts +++ b/packages/nx/src/daemon/message-types/get-context-file-data.ts @@ -1,6 +1,8 @@ +import { Message } from '../client/daemon-socket-messenger'; + export const GET_CONTEXT_FILE_DATA = 'GET_CONTEXT_FILE_DATA' as const; -export type HandleContextFileDataMessage = { +export type HandleContextFileDataMessage = Message & { type: typeof GET_CONTEXT_FILE_DATA; }; diff --git a/packages/nx/src/daemon/message-types/get-files-in-directory.ts b/packages/nx/src/daemon/message-types/get-files-in-directory.ts index 022a2f8756622..16cd05d8f8f94 100644 --- a/packages/nx/src/daemon/message-types/get-files-in-directory.ts +++ b/packages/nx/src/daemon/message-types/get-files-in-directory.ts @@ -1,6 +1,8 @@ +import { Message } from '../client/daemon-socket-messenger'; + export const GET_FILES_IN_DIRECTORY = 'GET_FILES_IN_DIRECTORY' as const; -export type HandleGetFilesInDirectoryMessage = { +export type HandleGetFilesInDirectoryMessage = Message & { type: typeof GET_FILES_IN_DIRECTORY; dir: string; }; diff --git a/packages/nx/src/daemon/message-types/get-nx-workspace-files.ts b/packages/nx/src/daemon/message-types/get-nx-workspace-files.ts index 872157b091f9e..bf73f56a192d8 100644 --- a/packages/nx/src/daemon/message-types/get-nx-workspace-files.ts +++ b/packages/nx/src/daemon/message-types/get-nx-workspace-files.ts @@ -1,6 +1,8 @@ +import { Message } from '../client/daemon-socket-messenger'; + export const GET_NX_WORKSPACE_FILES = 'GET_NX_WORKSPACE_FILES' as const; -export type HandleNxWorkspaceFilesMessage = { +export type HandleNxWorkspaceFilesMessage = Message & { type: typeof GET_NX_WORKSPACE_FILES; projectRootMap: Record; }; diff --git a/packages/nx/src/daemon/message-types/get-registered-sync-generators.ts b/packages/nx/src/daemon/message-types/get-registered-sync-generators.ts index 026ed6a32b499..1f2cf977ed951 100644 --- a/packages/nx/src/daemon/message-types/get-registered-sync-generators.ts +++ b/packages/nx/src/daemon/message-types/get-registered-sync-generators.ts @@ -1,7 +1,9 @@ +import { Message } from '../client/daemon-socket-messenger'; + export const GET_REGISTERED_SYNC_GENERATORS = 'GET_REGISTERED_SYNC_GENERATORS' as const; -export type HandleGetRegisteredSyncGeneratorsMessage = { +export type HandleGetRegisteredSyncGeneratorsMessage = Message & { type: typeof GET_REGISTERED_SYNC_GENERATORS; }; diff --git a/packages/nx/src/daemon/message-types/get-sync-generator-changes.ts b/packages/nx/src/daemon/message-types/get-sync-generator-changes.ts index 2dfe05edac5c3..02a5c56f30288 100644 --- a/packages/nx/src/daemon/message-types/get-sync-generator-changes.ts +++ b/packages/nx/src/daemon/message-types/get-sync-generator-changes.ts @@ -1,6 +1,8 @@ +import { Message } from '../client/daemon-socket-messenger'; + export const GET_SYNC_GENERATOR_CHANGES = 'GET_SYNC_GENERATOR_CHANGES' as const; -export type HandleGetSyncGeneratorChangesMessage = { +export type HandleGetSyncGeneratorChangesMessage = Message & { type: typeof GET_SYNC_GENERATOR_CHANGES; generators: string[]; }; diff --git a/packages/nx/src/daemon/message-types/glob.ts b/packages/nx/src/daemon/message-types/glob.ts index 50fd3071cdded..ef78d74387706 100644 --- a/packages/nx/src/daemon/message-types/glob.ts +++ b/packages/nx/src/daemon/message-types/glob.ts @@ -1,6 +1,8 @@ +import { Message } from '../client/daemon-socket-messenger'; + export const GLOB = 'GLOB' as const; -export type HandleGlobMessage = { +export type HandleGlobMessage = Message & { type: typeof GLOB; globs: string[]; exclude?: string[]; diff --git a/packages/nx/src/daemon/message-types/hash-glob.ts b/packages/nx/src/daemon/message-types/hash-glob.ts index 9bec1f566c14c..50f233a1e6cfa 100644 --- a/packages/nx/src/daemon/message-types/hash-glob.ts +++ b/packages/nx/src/daemon/message-types/hash-glob.ts @@ -1,6 +1,8 @@ +import { Message } from '../client/daemon-socket-messenger'; + export const HASH_GLOB = 'HASH_GLOB' as const; -export type HandleHashGlobMessage = { +export type HandleHashGlobMessage = Message & { type: typeof HASH_GLOB; globs: string[]; exclude?: string[]; diff --git a/packages/nx/src/daemon/message-types/hash-tasks.ts b/packages/nx/src/daemon/message-types/hash-tasks.ts new file mode 100644 index 0000000000000..fe19f68615743 --- /dev/null +++ b/packages/nx/src/daemon/message-types/hash-tasks.ts @@ -0,0 +1,23 @@ +import { Task, TaskGraph } from '../../config/task-graph'; +import { Message } from '../client/daemon-socket-messenger'; + +export const HASH_TASKS = 'HASH_TASKS' as const; + +export type HandleHashTasksMessage = Message & { + type: typeof HASH_TASKS; + runnerOptions: any; + env: any; + tasks: Task[]; + taskGraph: TaskGraph; +}; + +export function isHandleHashTasksMessage( + message: unknown +): message is HandleHashTasksMessage { + return ( + typeof message === 'object' && + message !== null && + 'type' in message && + message['type'] === HASH_TASKS + ); +} diff --git a/packages/nx/src/daemon/message-types/output-hashes-match.ts b/packages/nx/src/daemon/message-types/output-hashes-match.ts new file mode 100644 index 0000000000000..1169403c89e1a --- /dev/null +++ b/packages/nx/src/daemon/message-types/output-hashes-match.ts @@ -0,0 +1,20 @@ +import { Task, TaskGraph } from '../../config/task-graph'; +import { Message } from '../client/daemon-socket-messenger'; + +export const OUTPUTS_HASHES_MATCH = 'OUTPUTS_HASHES_MATCH' as const; + +export type HandleOutputHashesMatchMessage = Message & { + type: typeof OUTPUTS_HASHES_MATCH; + data: { outputs: string[]; hash: string }; +}; + +export function isHandleOutputHashesMatchMessage( + message: unknown +): message is HandleOutputHashesMatchMessage { + return ( + typeof message === 'object' && + message !== null && + 'type' in message && + message['type'] === OUTPUTS_HASHES_MATCH + ); +} diff --git a/packages/nx/src/daemon/message-types/process-in-background.ts b/packages/nx/src/daemon/message-types/process-in-background.ts new file mode 100644 index 0000000000000..3bf060c9284dd --- /dev/null +++ b/packages/nx/src/daemon/message-types/process-in-background.ts @@ -0,0 +1,21 @@ +import { Task, TaskGraph } from '../../config/task-graph'; +import { Message } from '../client/daemon-socket-messenger'; + +export const PROCESS_IN_BACKGROUND = 'PROCESS_IN_BACKGROUND' as const; + +export type HandleProcessInBackgroundMessage = Message & { + type: typeof PROCESS_IN_BACKGROUND; + requirePath: string; + data: any; +}; + +export function isHandleProcessInBackgroundMessageMessage( + message: unknown +): message is HandleProcessInBackgroundMessage { + return ( + typeof message === 'object' && + message !== null && + 'type' in message && + message['type'] === PROCESS_IN_BACKGROUND + ); +} diff --git a/packages/nx/src/daemon/message-types/record-outputs-hash.ts b/packages/nx/src/daemon/message-types/record-outputs-hash.ts new file mode 100644 index 0000000000000..950bec4a928d8 --- /dev/null +++ b/packages/nx/src/daemon/message-types/record-outputs-hash.ts @@ -0,0 +1,20 @@ +import { Task, TaskGraph } from '../../config/task-graph'; +import { Message } from '../client/daemon-socket-messenger'; + +export const RECORD_OUTPUTS_HASH = 'RECORD_OUTPUTS_HASH' as const; + +export type HandleRecordOutputsHashMessage = Message & { + type: typeof RECORD_OUTPUTS_HASH; + data: { outputs: string[]; hash: string }; +}; + +export function isHandleRecordOutputsHashMessage( + message: unknown +): message is HandleRecordOutputsHashMessage { + return ( + typeof message === 'object' && + message !== null && + 'type' in message && + message['type'] === RECORD_OUTPUTS_HASH + ); +} diff --git a/packages/nx/src/daemon/message-types/task-history.ts b/packages/nx/src/daemon/message-types/task-history.ts index 73b812a1c1aea..473aeaa4b94d3 100644 --- a/packages/nx/src/daemon/message-types/task-history.ts +++ b/packages/nx/src/daemon/message-types/task-history.ts @@ -1,20 +1,21 @@ import type { TaskRun, TaskTarget } from '../../native'; +import { Message } from '../client/daemon-socket-messenger'; export const GET_FLAKY_TASKS = 'GET_FLAKY_TASKS' as const; export const GET_ESTIMATED_TASK_TIMINGS = 'GET_ESTIMATED_TASK_TIMINGS' as const; export const RECORD_TASK_RUNS = 'RECORD_TASK_RUNS' as const; -export type HandleGetFlakyTasks = { +export type HandleGetFlakyTasks = Message & { type: typeof GET_FLAKY_TASKS; hashes: string[]; }; -export type HandleGetEstimatedTaskTimings = { +export type HandleGetEstimatedTaskTimings = Message & { type: typeof GET_ESTIMATED_TASK_TIMINGS; targets: TaskTarget[]; }; -export type HandleRecordTaskRunsMessage = { +export type HandleRecordTaskRunsMessage = Message & { type: typeof RECORD_TASK_RUNS; taskRuns: TaskRun[]; }; diff --git a/packages/nx/src/daemon/message-types/update-workspace-context.ts b/packages/nx/src/daemon/message-types/update-workspace-context.ts index d021b8340c626..6d61079480aa3 100644 --- a/packages/nx/src/daemon/message-types/update-workspace-context.ts +++ b/packages/nx/src/daemon/message-types/update-workspace-context.ts @@ -1,6 +1,8 @@ +import { Message } from '../client/daemon-socket-messenger'; + export const UPDATE_WORKSPACE_CONTEXT = 'UPDATE_WORKSPACE_CONTEXT' as const; -export type HandleUpdateWorkspaceContextMessage = { +export type HandleUpdateWorkspaceContextMessage = Message & { type: typeof UPDATE_WORKSPACE_CONTEXT; createdFiles: string[]; updatedFiles: string[]; diff --git a/packages/nx/src/daemon/server/file-watching/file-watcher-sockets.ts b/packages/nx/src/daemon/server/file-watching/file-watcher-sockets.ts index 453010d913bc9..1fd45c9c56fcd 100644 --- a/packages/nx/src/daemon/server/file-watching/file-watcher-sockets.ts +++ b/packages/nx/src/daemon/server/file-watching/file-watcher-sockets.ts @@ -89,7 +89,7 @@ export function notifyFileWatcherSockets( } if (changedProjects.length > 0 || changedFiles.length > 0) { - return handleResult(socket, 'FILE-WATCH-CHANGED', () => + return handleResult(socket, 'FILE-WATCH-CHANGED', null, () => Promise.resolve({ description: 'File watch changed', response: JSON.stringify({ diff --git a/packages/nx/src/daemon/server/handle-hash-tasks.ts b/packages/nx/src/daemon/server/handle-hash-tasks.ts index 211c5f6af576f..a1dcc875bee4c 100644 --- a/packages/nx/src/daemon/server/handle-hash-tasks.ts +++ b/packages/nx/src/daemon/server/handle-hash-tasks.ts @@ -3,6 +3,7 @@ import { getCachedSerializedProjectGraphPromise } from './project-graph-incremen import { InProcessTaskHasher } from '../../hasher/task-hasher'; import { readNxJson } from '../../config/configuration'; import { DaemonProjectGraphError } from '../../project-graph/error-types'; +import { HandleHashTasksMessage } from '../message-types/hash-tasks'; /** * We use this not to recreated hasher for every hash operation @@ -11,12 +12,7 @@ import { DaemonProjectGraphError } from '../../project-graph/error-types'; let storedProjectGraph: any = null; let storedHasher: InProcessTaskHasher | null = null; -export async function handleHashTasks(payload: { - runnerOptions: any; - env: any; - tasks: Task[]; - taskGraph: TaskGraph; -}) { +export async function handleHashTasks(payload: HandleHashTasksMessage) { const { error, projectGraph: _graph, @@ -45,9 +41,14 @@ export async function handleHashTasks(payload: { payload.runnerOptions ); } - const response = JSON.stringify( - await storedHasher.hashTasks(payload.tasks, payload.taskGraph, payload.env) - ); + const response = JSON.stringify({ + ...(await storedHasher.hashTasks( + payload.tasks, + payload.taskGraph, + payload.env + )), + tx: payload.tx, + }); return { response, description: 'handleHashTasks', diff --git a/packages/nx/src/daemon/server/handle-outputs-tracking.ts b/packages/nx/src/daemon/server/handle-outputs-tracking.ts index e5d59b73d9f35..e656b1b2bd5f0 100644 --- a/packages/nx/src/daemon/server/handle-outputs-tracking.ts +++ b/packages/nx/src/daemon/server/handle-outputs-tracking.ts @@ -1,10 +1,11 @@ import { HandlerResult } from './server'; import { outputsHashesMatch, recordOutputsHash } from './outputs-tracking'; +import { HandleRecordOutputsHashMessage } from '../message-types/record-outputs-hash'; +import { HandleOutputHashesMatchMessage } from '../message-types/output-hashes-match'; -export async function handleRecordOutputsHash(payload: { - type: string; - data: { outputs: string[]; hash: string }; -}): Promise { +export async function handleRecordOutputsHash( + payload: HandleRecordOutputsHashMessage +): Promise { try { await recordOutputsHash(payload.data.outputs, payload.data.hash); return { @@ -21,10 +22,9 @@ export async function handleRecordOutputsHash(payload: { } } -export async function handleOutputsHashesMatch(payload: { - type: string; - data: { outputs: string[]; hash: string }; -}): Promise { +export async function handleOutputsHashesMatch( + payload: HandleOutputHashesMatchMessage +): Promise { try { const res = await outputsHashesMatch( payload.data.outputs, diff --git a/packages/nx/src/daemon/server/handle-process-in-background.ts b/packages/nx/src/daemon/server/handle-process-in-background.ts index 6e58fb8ad9518..ed11afa66e37d 100644 --- a/packages/nx/src/daemon/server/handle-process-in-background.ts +++ b/packages/nx/src/daemon/server/handle-process-in-background.ts @@ -1,12 +1,11 @@ import { HandlerResult } from './server'; import { serverLogger } from './logger'; import { getNxRequirePaths } from '../../utils/installation-directory'; +import { HandleProcessInBackgroundMessage } from '../message-types/process-in-background'; -export async function handleProcessInBackground(payload: { - type: string; - requirePath: string; - data: any; -}): Promise { +export async function handleProcessInBackground( + payload: HandleProcessInBackgroundMessage +): Promise { let fn; try { fn = require(require.resolve(payload.requirePath, { diff --git a/packages/nx/src/daemon/server/server.ts b/packages/nx/src/daemon/server/server.ts index 00c3c72ba339c..57ae9357ac509 100644 --- a/packages/nx/src/daemon/server/server.ts +++ b/packages/nx/src/daemon/server/server.ts @@ -4,7 +4,7 @@ import { join } from 'path'; import { PerformanceObserver } from 'perf_hooks'; import { hashArray } from '../../hasher/file-hasher'; import { hashFile } from '../../native'; -import { consumeMessagesFromSocket } from '../../utils/consume-messages-from-socket'; +import { consumeMessagesFromSocket } from '../../utils/messaging'; import { readJsonFile } from '../../utils/fileutils'; import { PackageJson } from '../../utils/package-json'; import { nxVersion } from '../../utils/versions'; @@ -43,6 +43,7 @@ import { handleServerProcessTermination, resetInactivityTimeout, respondToClient, + respondWithError, respondWithErrorAndExit, SERVER_INACTIVITY_TIMEOUT_MS, storeOutputWatcherInstance, @@ -109,6 +110,20 @@ import { isHandleFlushSyncGeneratorChangesToDiskMessage, } from '../message-types/flush-sync-generator-changes-to-disk'; import { handleFlushSyncGeneratorChangesToDisk } from './handle-flush-sync-generator-changes-to-disk'; +import { Message } from '../client/daemon-socket-messenger'; +import { + HASH_TASKS, + isHandleHashTasksMessage, +} from '../message-types/hash-tasks'; +import { + isHandleProcessInBackgroundMessageMessage, + PROCESS_IN_BACKGROUND, +} from '../message-types/process-in-background'; +import { isHandleRecordOutputsHashMessage } from '../message-types/record-outputs-hash'; +import { + isHandleOutputHashesMatchMessage, + OUTPUTS_HASHES_MATCH, +} from '../message-types/output-hashes-match'; let performanceObserver: PerformanceObserver | undefined; let workspaceWatcherError: Error | undefined; @@ -185,7 +200,7 @@ async function handleMessage(socket, data: string) { resetInactivityTimeout(handleInactivityTimeout); const unparsedPayload = data; - let payload; + let payload: Message; try { payload = JSON.parse(unparsedPayload); } catch (e) { @@ -197,83 +212,88 @@ async function handleMessage(socket, data: string) { } if (payload.type === 'PING') { - await handleResult(socket, 'PING', () => + handleResult(socket, 'PING', payload.tx, () => Promise.resolve({ response: JSON.stringify(true), description: 'ping' }) ); } else if (payload.type === 'REQUEST_PROJECT_GRAPH') { - await handleResult(socket, 'REQUEST_PROJECT_GRAPH', () => + handleResult(socket, 'REQUEST_PROJECT_GRAPH', payload.tx, () => handleRequestProjectGraph() ); - } else if (payload.type === 'HASH_TASKS') { - await handleResult(socket, 'HASH_TASKS', () => handleHashTasks(payload)); - } else if (payload.type === 'PROCESS_IN_BACKGROUND') { - await handleResult(socket, 'PROCESS_IN_BACKGROUND', () => + } else if (isHandleHashTasksMessage(payload)) { + handleResult(socket, HASH_TASKS, payload.tx, () => + handleHashTasks(payload) + ); + } else if (isHandleProcessInBackgroundMessageMessage(payload)) { + handleResult(socket, PROCESS_IN_BACKGROUND, payload.tx, () => handleProcessInBackground(payload) ); - } else if (payload.type === 'RECORD_OUTPUTS_HASH') { - await handleResult(socket, 'RECORD_OUTPUTS_HASH', () => + } else if (isHandleRecordOutputsHashMessage(payload)) { + handleResult(socket, 'RECORD_OUTPUTS_HASH', payload.tx, () => handleRecordOutputsHash(payload) ); - } else if (payload.type === 'OUTPUTS_HASHES_MATCH') { - await handleResult(socket, 'OUTPUTS_HASHES_MATCH', () => + } else if (isHandleOutputHashesMatchMessage(payload)) { + handleResult(socket, OUTPUTS_HASHES_MATCH, payload.tx, () => handleOutputsHashesMatch(payload) ); } else if (payload.type === 'REQUEST_SHUTDOWN') { - await handleResult(socket, 'REQUEST_SHUTDOWN', () => + handleResult(socket, 'REQUEST_SHUTDOWN', payload.tx, () => handleRequestShutdown(server, numberOfOpenConnections) ); } else if (payload.type === 'REGISTER_FILE_WATCHER') { registeredFileWatcherSockets.push({ socket, config: payload.config }); } else if (isHandleGlobMessage(payload)) { - await handleResult(socket, GLOB, () => + handleResult(socket, GLOB, payload.tx, () => handleGlob(payload.globs, payload.exclude) ); } else if (isHandleNxWorkspaceFilesMessage(payload)) { - await handleResult(socket, GET_NX_WORKSPACE_FILES, () => + handleResult(socket, GET_NX_WORKSPACE_FILES, payload.tx, () => handleNxWorkspaceFiles(payload.projectRootMap) ); } else if (isHandleGetFilesInDirectoryMessage(payload)) { - await handleResult(socket, GET_FILES_IN_DIRECTORY, () => + handleResult(socket, GET_FILES_IN_DIRECTORY, payload.tx, () => handleGetFilesInDirectory(payload.dir) ); } else if (isHandleContextFileDataMessage(payload)) { - await handleResult(socket, GET_CONTEXT_FILE_DATA, () => + handleResult(socket, GET_CONTEXT_FILE_DATA, payload.tx, () => handleContextFileData() ); } else if (isHandleHashGlobMessage(payload)) { - await handleResult(socket, HASH_GLOB, () => + handleResult(socket, HASH_GLOB, payload.tx, () => handleHashGlob(payload.globs, payload.exclude) ); } else if (isHandleGetFlakyTasksMessage(payload)) { - await handleResult(socket, GET_FLAKY_TASKS, () => + handleResult(socket, GET_FLAKY_TASKS, payload.tx, () => handleGetFlakyTasks(payload.hashes) ); } else if (isHandleGetEstimatedTaskTimings(payload)) { - await handleResult(socket, GET_ESTIMATED_TASK_TIMINGS, () => + handleResult(socket, GET_ESTIMATED_TASK_TIMINGS, payload.tx, () => handleGetEstimatedTaskTimings(payload.targets) ); } else if (isHandleWriteTaskRunsToHistoryMessage(payload)) { - await handleResult(socket, RECORD_TASK_RUNS, () => + handleResult(socket, RECORD_TASK_RUNS, payload.tx, () => handleRecordTaskRuns(payload.taskRuns) ); } else if (isHandleForceShutdownMessage(payload)) { - await handleResult(socket, 'FORCE_SHUTDOWN', () => + handleResult(socket, 'FORCE_SHUTDOWN', payload.tx, () => handleForceShutdown(server) ); } else if (isHandleGetSyncGeneratorChangesMessage(payload)) { - await handleResult(socket, GET_SYNC_GENERATOR_CHANGES, () => + await handleResult(socket, GET_SYNC_GENERATOR_CHANGES, payload.tx, () => handleGetSyncGeneratorChanges(payload.generators) ); } else if (isHandleFlushSyncGeneratorChangesToDiskMessage(payload)) { - await handleResult(socket, FLUSH_SYNC_GENERATOR_CHANGES_TO_DISK, () => - handleFlushSyncGeneratorChangesToDisk(payload.generators) + await handleResult( + socket, + FLUSH_SYNC_GENERATOR_CHANGES_TO_DISK, + payload.tx, + () => handleFlushSyncGeneratorChangesToDisk(payload.generators) ); } else if (isHandleGetRegisteredSyncGeneratorsMessage(payload)) { - await handleResult(socket, GET_REGISTERED_SYNC_GENERATORS, () => + handleResult(socket, GET_REGISTERED_SYNC_GENERATORS, payload.tx, () => handleGetRegisteredSyncGenerators() ); } else if (isHandleUpdateWorkspaceContextMessage(payload)) { - await handleResult(socket, UPDATE_WORKSPACE_CONTEXT, () => + handleResult(socket, UPDATE_WORKSPACE_CONTEXT, payload.tx, () => handleUpdateWorkspaceContext( payload.createdFiles, payload.updatedFiles, @@ -281,7 +301,7 @@ async function handleMessage(socket, data: string) { ) ); } else { - await respondWithErrorAndExit( + respondWithErrorAndExit( socket, `Invalid payload from the client`, new Error(`Unsupported payload sent to daemon server: ${unparsedPayload}`) @@ -292,15 +312,16 @@ async function handleMessage(socket, data: string) { export async function handleResult( socket: Socket, type: string, + tx: string, hrFn: () => Promise ) { const startMark = new Date(); const hr = await hrFn(); const doneHandlingMark = new Date(); if (hr.error) { - await respondWithErrorAndExit(socket, hr.description, hr.error); + await respondWithError(socket, hr.description, hr.error, tx); } else { - await respondToClient(socket, hr.response, hr.description); + await respondToClient(socket, hr.response, hr.description, tx); } const endMark = new Date(); serverLogger.log( diff --git a/packages/nx/src/daemon/server/shutdown-utils.ts b/packages/nx/src/daemon/server/shutdown-utils.ts index fde1e65d936f6..4bf455f5ac7a6 100644 --- a/packages/nx/src/daemon/server/shutdown-utils.ts +++ b/packages/nx/src/daemon/server/shutdown-utils.ts @@ -92,26 +92,54 @@ export function resetInactivityTimeout(cb: () => void): void { export function respondToClient( socket: Socket, response: string, - description: string + description: string, + tx: string ) { return new Promise(async (res) => { if (description) { serverLogger.requestLog(`Responding to the client.`, description); } - socket.write(`${response}${String.fromCodePoint(4)}`, (err) => { - if (err) { - console.error(err); + socket.write( + `{ "tx": "${tx}", "response": ${response}}${String.fromCodePoint(4)}`, + (err) => { + if (err) { + console.error(err); + } + serverLogger.log(`Done responding to the client`, description); + res(null); } - serverLogger.log(`Done responding to the client`, description); - res(null); - }); + ); }); } +export async function respondWithError( + socket: Socket, + description: string, + error: Error, + tx: string +) { + const normalizedError = + error instanceof DaemonProjectGraphError + ? ProjectGraphError.fromDaemonProjectGraphError(error) + : error; + + // print some extra stuff in the error message + serverLogger.requestLog( + `Responding to the client with an error.`, + description, + normalizedError.message + ); + console.error(normalizedError.stack); + + // Respond with the original error + await respondToClient(socket, serializeResult(error, null, null), null, tx); +} + export async function respondWithErrorAndExit( socket: Socket, description: string, - error: Error + error: Error, + tx?: string ) { const normalizedError = error instanceof DaemonProjectGraphError @@ -127,5 +155,7 @@ export async function respondWithErrorAndExit( console.error(normalizedError.stack); // Respond with the original error - await respondToClient(socket, serializeResult(error, null, null), null); + await respondToClient(socket, serializeResult(error, null, null), null, tx); + + process.exit(1); } diff --git a/packages/nx/src/project-graph/plugins/isolation/plugin-pool.ts b/packages/nx/src/project-graph/plugins/isolation/plugin-pool.ts index 740f8562ac68c..bb287ea9c8b31 100644 --- a/packages/nx/src/project-graph/plugins/isolation/plugin-pool.ts +++ b/packages/nx/src/project-graph/plugins/isolation/plugin-pool.ts @@ -9,7 +9,11 @@ import { PluginConfiguration } from '../../../config/nx-json'; import { LoadedNxPlugin } from '../internal-api'; import { getPluginOsSocketPath } from '../../../daemon/socket-utils'; -import { consumeMessagesFromSocket } from '../../../utils/consume-messages-from-socket'; +import { + consumeMessagesFromSocket, + PendingPromise, + registerPendingPromise, +} from '../../../utils/messaging'; import { consumeMessage, @@ -39,12 +43,6 @@ const MAX_MESSAGE_WAIT = 2147483647 : 1000 * 60 * MINUTES; // 10 minutes -interface PendingPromise { - promise: Promise; - resolver: (result: any) => void; - rejector: (err: any) => void; -} - type NxPluginWorkerCache = Map>; const nxPluginWorkerCache: NxPluginWorkerCache = (global[ @@ -113,6 +111,10 @@ export async function loadRemoteNxPlugin( return [pluginPromise, cleanupFunction]; } +const getTimeoutErrorText = + (context: { plugin: string; operation: string }) => (): string => + `${context.plugin} timed out after ${MINUTES} minutes during ${context.operation}. ${PLUGIN_TIMEOUT_HINT_TEXT}`; + /** * Creates a message handler for the given worker. * @param worker Instance of plugin-worker @@ -163,10 +165,11 @@ function createWorkerHandler( payload: { configFiles, context: ctx, tx }, }); }, - { + getTimeoutErrorText({ plugin: pluginName, operation: 'createNodes', - } + }), + MAX_MESSAGE_WAIT ); }, ] @@ -184,10 +187,11 @@ function createWorkerHandler( payload: { context: ctx, tx }, }); }, - { + getTimeoutErrorText({ plugin: pluginName, operation: 'createDependencies', - } + }), + MAX_MESSAGE_WAIT ); } : undefined, @@ -204,10 +208,11 @@ function createWorkerHandler( payload: { graph, context: ctx, tx }, }); }, - { + getTimeoutErrorText({ plugin: pluginName, operation: 'createMetadata', - } + }), + MAX_MESSAGE_WAIT ); } : undefined, @@ -261,46 +266,6 @@ function createWorkerExitHandler( }; } -function registerPendingPromise( - tx: string, - pending: Map, - callback: () => void, - context: { - plugin: string; - operation: string; - } -): Promise { - let resolver: (x: unknown) => void, - rejector: (e: Error | unknown) => void, - timeout: NodeJS.Timeout; - - const promise = new Promise((res, rej) => { - rejector = rej; - resolver = res; - - timeout = setTimeout(() => { - rej( - new Error( - `${context.plugin} timed out after ${MINUTES} minutes during ${context.operation}. ${PLUGIN_TIMEOUT_HINT_TEXT}` - ) - ); - }, MAX_MESSAGE_WAIT); - - callback(); - }).finally(() => { - pending.delete(tx); - if (timeout) clearTimeout(timeout); - }); - - pending.set(tx, { - promise, - resolver, - rejector, - }); - - return promise; -} - global.nxPluginWorkerCount ??= 0; async function startPluginWorker() { // this should only really be true when running unit tests within diff --git a/packages/nx/src/project-graph/plugins/isolation/plugin-worker.ts b/packages/nx/src/project-graph/plugins/isolation/plugin-worker.ts index b4673afe2d052..58ed4b31ac65d 100644 --- a/packages/nx/src/project-graph/plugins/isolation/plugin-worker.ts +++ b/packages/nx/src/project-graph/plugins/isolation/plugin-worker.ts @@ -2,7 +2,7 @@ import { consumeMessage, isPluginWorkerMessage } from './messaging'; import { LoadedNxPlugin } from '../internal-api'; import { loadNxPlugin } from '../loader'; import { createSerializableError } from '../../../utils/serializable-error'; -import { consumeMessagesFromSocket } from '../../../utils/consume-messages-from-socket'; +import { consumeMessagesFromSocket } from '../../../utils/messaging'; import { createServer } from 'net'; import { unlinkSync } from 'fs'; diff --git a/packages/nx/src/tasks-runner/pseudo-ipc.ts b/packages/nx/src/tasks-runner/pseudo-ipc.ts index 1a251ea24e62a..8f2e4377b52e4 100644 --- a/packages/nx/src/tasks-runner/pseudo-ipc.ts +++ b/packages/nx/src/tasks-runner/pseudo-ipc.ts @@ -18,7 +18,7 @@ */ import { connect, Server, Socket } from 'net'; -import { consumeMessagesFromSocket } from '../utils/consume-messages-from-socket'; +import { consumeMessagesFromSocket } from '../utils/messaging'; import { Serializable } from 'child_process'; export interface PseudoIPCMessage { diff --git a/packages/nx/src/utils/consume-messages-from-socket.ts b/packages/nx/src/utils/consume-messages-from-socket.ts deleted file mode 100644 index 1abb4ee014da7..0000000000000 --- a/packages/nx/src/utils/consume-messages-from-socket.ts +++ /dev/null @@ -1,19 +0,0 @@ -export function consumeMessagesFromSocket(callback: (message: string) => void) { - let message = ''; - return (data) => { - const chunk = data.toString(); - if (chunk.codePointAt(chunk.length - 1) === 4) { - message += chunk.substring(0, chunk.length - 1); - - // Server may send multiple messages in one chunk, so splitting by 0x4 - const messages = message.split(''); - for (const splitMessage of messages) { - callback(splitMessage); - } - - message = ''; - } else { - message += chunk; - } - }; -} diff --git a/packages/nx/src/utils/consume-messages-from-socket.spec.ts b/packages/nx/src/utils/messaging.spec.ts similarity index 95% rename from packages/nx/src/utils/consume-messages-from-socket.spec.ts rename to packages/nx/src/utils/messaging.spec.ts index 9188b7fd6dc07..dd690a051e42a 100644 --- a/packages/nx/src/utils/consume-messages-from-socket.spec.ts +++ b/packages/nx/src/utils/messaging.spec.ts @@ -1,4 +1,4 @@ -import { consumeMessagesFromSocket } from './consume-messages-from-socket'; +import { consumeMessagesFromSocket } from './messaging'; describe('consumeMessagesFromSocket', () => { it('should handle messages where every messages is in its own chunk', () => { diff --git a/packages/nx/src/utils/messaging.ts b/packages/nx/src/utils/messaging.ts new file mode 100644 index 0000000000000..482d8cd15cc35 --- /dev/null +++ b/packages/nx/src/utils/messaging.ts @@ -0,0 +1,59 @@ +export function consumeMessagesFromSocket(callback: (message: string) => void) { + let message = ''; + return (data) => { + const chunk = data.toString(); + if (chunk.codePointAt(chunk.length - 1) === 4) { + message += chunk.substring(0, chunk.length - 1); + + // Server may send multiple messages in one chunk, so splitting by 0x4 + const messages = message.split(''); + for (const splitMessage of messages) { + callback(splitMessage); + } + + message = ''; + } else { + message += chunk; + } + }; +} + +export interface PendingPromise { + promise: Promise; + resolver: (result: any) => void; + rejector: (err: any) => void; +} + +export function registerPendingPromise( + tx: string, + pending: Map, + callback: () => void, + timeoutErrorText: () => string, + timeoutMs: number +): Promise { + let resolver: (x: unknown) => void, + rejector: (e: Error | unknown) => void, + timeout: NodeJS.Timeout; + + const promise = new Promise((res, rej) => { + rejector = rej; + resolver = res; + + timeout = setTimeout(() => { + rej(new Error(timeoutErrorText())); + }, timeoutMs); + + callback(); + }).finally(() => { + pending.delete(tx); + if (timeout) clearTimeout(timeout); + }); + + pending.set(tx, { + promise, + resolver, + rejector, + }); + + return promise; +}