Skip to content

Commit

Permalink
fix(core): move plugin worker to socket (#26558)
Browse files Browse the repository at this point in the history
<!-- Please make sure you have read the submission guidelines before
posting an PR -->
<!--
https://github.com/nrwl/nx/blob/master/CONTRIBUTING.md#-submitting-a-pr
-->

<!-- Please make sure that your commit message follows our format -->
<!-- Example: `fix(nx): must begin with lowercase` -->

<!-- If this is a particularly complex change or feature addition, you
can request a dedicated Nx release for this pull request branch. Mention
someone from the Nx team or the `@nrwl/nx-pipelines-reviewers` and they
will confirm if the PR warrants its own release for testing purposes,
and generate it for you if appropriate. -->

## Current Behavior
Plugin isolation communicates with workers via built-in node IPC with
forked processes. When doing this, the parent process will not exit
until the child process has exited, in case more messages would be sent.
This requires an explicit call to shut down the plugin workers.

We set this up as a `process.on('exit')` listener, to shutdown the
workers whenever the main Nx process dies. This is "fine", but requires
explicit calls to `process.exit` as node won't exit on its own
otherwise.

## Expected Behavior
To allow plugin workers to clean themselves up on exit, but not require
explicit `process.exit` calls, we need to detach them from the main
process and call `unref`. This only works when IPC is not being used. As
such, we need a different way to communicate with the worker.

This PR updates the communication method to mirror the daemon, and
communicate over a socket. Additionally, this PR enables isolation
during the Nx repo's E2E tests.

## Related Issue(s)
<!-- Please link the issue being fixed so it gets closed when this is
merged. -->

Fixes #
  • Loading branch information
AgentEnder authored and meeroslav committed Jun 27, 2024
1 parent 695702b commit 14f5459
Show file tree
Hide file tree
Showing 12 changed files with 332 additions and 196 deletions.
1 change: 1 addition & 0 deletions .env
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
NX_ISOLATE_PLUGINS=true
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ out
.angular

# Local dev files
.env
.env.local
.bashrc
.nx

Expand Down
2 changes: 1 addition & 1 deletion e2e/utils/get-env-info.ts
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ export function getStrippedEnvironmentVariables() {
return true;
}

const allowedKeys = ['NX_ADD_PLUGINS'];
const allowedKeys = ['NX_ADD_PLUGINS', 'NX_ISOLATE_PLUGINS'];

if (key.startsWith('NX_') && !allowedKeys.includes(key)) {
return false;
Expand Down
2 changes: 1 addition & 1 deletion nx.json
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,6 @@
"nxCloudUrl": "https://staging.nx.app",
"parallel": 1,
"cacheDirectory": "/tmp/nx-cache",
"bust": 5,
"bust": 7,
"defaultBase": "master"
}
3 changes: 2 additions & 1 deletion packages/nx/src/command-line/run/command-object.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ export const yargsNxInfixCommand: CommandModule = {
command: '$0 <target> [project] [_..]',
describe: 'Run a target for a project',
handler: async (args) => {
await handleErrors(
const exitCode = await handleErrors(
(args.verbose as boolean) ?? process.env.NX_VERBOSE_LOGGING === 'true',
async () => {
return (await import('./run-one')).runOne(
Expand All @@ -46,5 +46,6 @@ export const yargsNxInfixCommand: CommandModule = {
);
}
);
process.exit(exitCode);
},
};
5 changes: 5 additions & 0 deletions packages/nx/src/daemon/socket-utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,11 @@ export const getForkedProcessOsSocketPath = (id: string) => {
return isWindows ? '\\\\.\\pipe\\nx\\' + resolve(path) : resolve(path);
};

export const getPluginOsSocketPath = (id: string) => {
let path = resolve(join(getSocketDir(), 'plugin' + id + '.sock'));
return isWindows ? '\\\\.\\pipe\\nx\\' + resolve(path) : resolve(path);
};

export function killSocketOrPath(): void {
try {
unlinkSync(getFullOsSocketPath());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,7 @@ describe('getTouchedNpmPackages', () => {
});

it('should handle and log workspace package.json changes when the changes are not in `npmPackages` (projectGraph.externalNodes)', () => {
jest.spyOn(logger, 'warn');
jest.spyOn(logger, 'warn').mockImplementation(() => {});
expect(() => {
getTouchedNpmPackages(
[
Expand Down
2 changes: 1 addition & 1 deletion packages/nx/src/project-graph/plugins/internal-api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ export async function loadNxPlugins(

const cleanupFunctions: Array<() => void> = [];
for (const plugin of plugins) {
const [loadedPluginPromise, cleanup] = loadingMethod(plugin, root);
const [loadedPluginPromise, cleanup] = await loadingMethod(plugin, root);
result.push(loadedPluginPromise);
cleanupFunctions.push(cleanup);
}
Expand Down
26 changes: 4 additions & 22 deletions packages/nx/src/project-graph/plugins/isolation/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,33 +3,15 @@ import { PluginConfiguration } from '../../../config/nx-json';
import { LoadedNxPlugin } from '../internal-api';
import { loadRemoteNxPlugin } from './plugin-pool';

/**
* Used to ensure 1 plugin : 1 worker
*/
const remotePluginCache = new Map<
string,
readonly [Promise<LoadedNxPlugin>, () => void]
>();

export function loadNxPluginInIsolation(
export async function loadNxPluginInIsolation(
plugin: PluginConfiguration,
root = workspaceRoot
): readonly [Promise<LoadedNxPlugin>, () => void] {
const cacheKey = JSON.stringify(plugin);

if (remotePluginCache.has(cacheKey)) {
return remotePluginCache.get(cacheKey);
}

const [loadingPlugin, cleanup] = loadRemoteNxPlugin(plugin, root);
// We clean up plugin workers when Nx process completes.
const val = [
): Promise<readonly [Promise<LoadedNxPlugin>, () => void]> {
const [loadingPlugin, cleanup] = await loadRemoteNxPlugin(plugin, root);
return [
loadingPlugin,
() => {
cleanup();
remotePluginCache.delete(cacheKey);
},
] as const;
remotePluginCache.set(cacheKey, val);
return val;
}
16 changes: 14 additions & 2 deletions packages/nx/src/project-graph/plugins/isolation/messaging.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,11 @@ import {
CreateDependenciesContext,
CreateMetadataContext,
CreateNodesContext,
CreateNodesContextV2,
} from '../public-api';
import { LoadedNxPlugin } from '../internal-api';
import { Serializable } from 'child_process';
import { Socket } from 'net';

export interface PluginWorkerLoadMessage {
type: 'load';
Expand Down Expand Up @@ -42,7 +44,7 @@ export interface PluginWorkerCreateNodesMessage {
type: 'createNodes';
payload: {
configFiles: string[];
context: CreateNodesContext;
context: CreateNodesContextV2;
tx: string;
};
}
Expand Down Expand Up @@ -159,6 +161,7 @@ export function isPluginWorkerMessage(
'createNodes',
'createDependencies',
'processProjectGraph',
'createMetadata',
].includes(message.type)
);
}
Expand All @@ -175,6 +178,7 @@ export function isPluginWorkerResult(
'createNodesResult',
'createDependenciesResult',
'processProjectGraphResult',
'createMetadataResult',
].includes(message.type)
);
}
Expand All @@ -192,6 +196,7 @@ type MessageHandlerReturn<T extends PluginWorkerMessage | PluginWorkerResult> =
export async function consumeMessage<
T extends PluginWorkerMessage | PluginWorkerResult
>(
socket: Socket,
raw: T,
handlers: {
[K in T['type']]: (
Expand All @@ -205,7 +210,14 @@ export async function consumeMessage<
if (handler) {
const response = await handler(message.payload);
if (response) {
process.send!(response);
sendMessageOverSocket(socket, response);
}
}
}

export function sendMessageOverSocket(
socket: Socket,
message: PluginWorkerMessage | PluginWorkerResult
) {
socket.write(JSON.stringify(message) + String.fromCodePoint(4));
}
Loading

0 comments on commit 14f5459

Please sign in to comment.