Skip to content

Commit

Permalink
fix(core): unref socket and avoid needlessly disposing + reconnecting
Browse files Browse the repository at this point in the history
  • Loading branch information
AgentEnder committed Jun 25, 2024
1 parent 1fc5e35 commit 4e9c414
Show file tree
Hide file tree
Showing 3 changed files with 11 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,7 @@ describe('getTouchedNpmPackages', () => {
});

it('should handle and log workspace package.json changes when the changes are not in `npmPackages` (projectGraph.externalNodes)', () => {
jest.spyOn(logger, 'warn');
jest.spyOn(logger, 'warn').mockImplementation(() => {});
expect(() => {
getTouchedNpmPackages(
[
Expand Down
20 changes: 1 addition & 19 deletions packages/nx/src/project-graph/plugins/isolation/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,33 +3,15 @@ import { PluginConfiguration } from '../../../config/nx-json';
import { LoadedNxPlugin } from '../internal-api';
import { loadRemoteNxPlugin } from './plugin-pool';

/**
* Used to ensure 1 plugin : 1 worker
*/
const remotePluginCache = new Map<
string,
readonly [Promise<LoadedNxPlugin>, () => void]
>();

export async function loadNxPluginInIsolation(
plugin: PluginConfiguration,
root = workspaceRoot
): Promise<readonly [Promise<LoadedNxPlugin>, () => void]> {
const cacheKey = JSON.stringify(plugin);

if (remotePluginCache.has(cacheKey)) {
return remotePluginCache.get(cacheKey);
}

const [loadingPlugin, cleanup] = await loadRemoteNxPlugin(plugin, root);
// We clean up plugin workers when Nx process completes.
const val = [
return [
loadingPlugin,
() => {
cleanup();
remotePluginCache.delete(cacheKey);
},
] as const;
remotePluginCache.set(cacheKey, val);
return val;
}
24 changes: 9 additions & 15 deletions packages/nx/src/project-graph/plugins/isolation/plugin-pool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,19 +37,12 @@ export async function loadRemoteNxPlugin(
plugin: PluginConfiguration,
root: string
): Promise<[Promise<LoadedNxPlugin>, () => void]> {
const cacheKey = JSON.stringify(plugin);
const cacheKey = JSON.stringify({ plugin, root });
if (nxPluginWorkerCache.has(cacheKey)) {
return [nxPluginWorkerCache.get(cacheKey), () => {}];
}

const { ipcPath, worker } = await startPluginWorker();

const socket = await new Promise<Socket>((res, rej) => {
const socket = connect(ipcPath, () => {
res(socket);
});
socket.on('error', rej);
});
const { worker, socket } = await startPluginWorker();

const pendingPromises = new Map<string, PendingPromise>();

Expand Down Expand Up @@ -317,14 +310,16 @@ async function startPluginWorker() {
let attempts = 0;
return new Promise<{
worker: ChildProcess;
ipcPath: string;
socket: Socket;
}>((resolve, reject) => {
const id = setInterval(async () => {
if (await isServerAvailable(ipcPath)) {
const socket = await isServerAvailable(ipcPath);
if (socket) {
// socket.unref();
clearInterval(id);
resolve({
worker,
ipcPath,
socket,
});
} else if (attempts > 1000) {
// daemon fails to start, the process probably exited
Expand All @@ -337,12 +332,11 @@ async function startPluginWorker() {
});
}

function isServerAvailable(ipcPath: string): Promise<boolean> {
function isServerAvailable(ipcPath: string): Promise<Socket | false> {
return new Promise((resolve) => {
try {
const socket = connect(ipcPath, () => {
socket.destroy();
resolve(true);
resolve(socket);
});
socket.once('error', () => {
resolve(false);
Expand Down

0 comments on commit 4e9c414

Please sign in to comment.