Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixes #50880 - enable passthrough IPC in watch mode #1

Open
wants to merge 1 commit into
base: v18.x
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 10 additions & 7 deletions lib/internal/main/watch_mode.js
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ function start() {
process.stdout.write(`${red}Failed running ${kCommandStr}${white}\n`);
}
});
return child;
}

async function killAndWait(signal = kKillSignal, force = false) {
Expand Down Expand Up @@ -91,29 +92,31 @@ function reportGracefulTermination() {
};
}

async function stop() {
async function stop(child) {
// without this line, the child process is still able to receive IPC, but is unable to send additional messages
watcher.destroyIPC(child);
watcher.clearFileFilters();
const clearGraceReport = reportGracefulTermination();
await killAndWait();
clearGraceReport();
}

async function restart() {
async function restart(child) {
if (!kPreserveOutput) process.stdout.write(clear);
process.stdout.write(`${green}Restarting ${kCommandStr}${white}\n`);
await stop();
start();
await stop(child);
return start();
}

(async () => {
emitExperimentalWarning('Watch mode');

let child;
try {
start();
child = start();

// eslint-disable-next-line no-unused-vars
for await (const _ of on(watcher, 'changed')) {
await restart();
child = await restart(child);
}
} catch (error) {
triggerUncaughtException(error, true /* fromPromise */);
Expand Down
24 changes: 24 additions & 0 deletions lib/internal/watch_mode/files_watcher.js
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,16 @@ class FilesWatcher extends EventEmitter {
#throttle;
#mode;

#wantsPassthroughIPC = false;

constructor({ throttle = 500, mode = 'filter' } = kEmptyObject) {
super();

validateNumber(throttle, 'options.throttle', 0, TIMEOUT_MAX);
validateOneOf(mode, 'options.mode', ['filter', 'all']);
this.#throttle = throttle;
this.#mode = mode;
this.#wantsPassthroughIPC = !!process.send;
}

#isPathWatched(path) {
Expand Down Expand Up @@ -117,7 +120,28 @@ class FilesWatcher extends EventEmitter {
this.#ownerDependencies.set(owner, dependencies);
}
}


#setupIPC(child) {
child._ipcMessages = {
parentToChild: message => child.send(message),
childToParent: message => process.send(message)
};
process.on("message", child._ipcMessages.parentToChild);
child.on("message", child._ipcMessages.childToParent);
}

destroyIPC(child) {
if (this.#wantsPassthroughIPC) {
process.off("message", child._ipcMessages.parentToChild);
child.off("message", child._ipcMessages.childToParent);
}
}

watchChildProcessModules(child, key = null) {
if (this.#wantsPassthroughIPC) {
this.#setupIPC(child);
}
if (this.#mode !== 'filter') {
return;
}
Expand Down
122 changes: 99 additions & 23 deletions test/sequential/test-watch-mode.mjs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import fs from 'fs/promises';
import * as common from '../common/index.mjs';
import * as fixtures from '../common/fixtures.mjs';
import tmpdir from '../common/tmpdir.js';
Expand Down Expand Up @@ -34,6 +35,8 @@ async function spawnWithRestarts({
watchedFile = file,
restarts = 1,
isReady,
spawnOptions,
returnChild = false
}) {
args ??= [file];
const printedArgs = inspect(args.slice(args.indexOf(file)).join(' '));
Expand All @@ -44,30 +47,36 @@ async function spawnWithRestarts({
let cancelRestarts;

disableRestart = true;
const child = spawn(execPath, ['--watch', '--no-warnings', ...args], { encoding: 'utf8' });
child.stderr.on('data', (data) => {
stderr += data;
});
child.stdout.on('data', async (data) => {
if (data.toString().includes('Restarting')) {
disableRestart = true;
}
stdout += data;
const restartsCount = stdout.match(new RegExp(`Restarting ${printedArgs.replace(/\\/g, '\\\\')}`, 'g'))?.length ?? 0;
if (restarts === 0 || !isReady(data.toString())) {
return;
}
if (restartsCount >= restarts) {
cancelRestarts?.();
child.kill();
return;
}
cancelRestarts ??= restart(watchedFile);
if (isReady(data.toString())) {
disableRestart = false;
}
});
const child = spawn(execPath, ['--watch', '--no-warnings', ...args], { encoding: 'utf8', ...spawnOptions });

if (!returnChild) {
child.stderr.on('data', (data) => {
stderr += data;
});
child.stdout.on('data', async (data) => {
if (data.toString().includes('Restarting')) {
disableRestart = true;
}
stdout += data;
const restartsCount = stdout.match(new RegExp(`Restarting ${printedArgs.replace(/\\/g, '\\\\')}`, 'g'))?.length ?? 0;
if (restarts === 0 || !isReady(data.toString())) {
return;
}
if (restartsCount >= restarts) {
cancelRestarts?.();
child.kill();
return;
}
cancelRestarts ??= restart(watchedFile);
if (isReady(data.toString())) {
disableRestart = false;
}
});
}
else {
// this test is doing it's own thing
return { child };
}
await once(child, 'exit');
cancelRestarts?.();
return { stderr, stdout };
Expand Down Expand Up @@ -248,6 +257,7 @@ describe('watch mode', { concurrency: false, timeout: 60_000 }, () => {
});
});


// TODO: Remove skip after https://github.com/nodejs/node/pull/45271 lands
it('should not watch when running an missing file', {
skip: !supportsRecursive
Expand Down Expand Up @@ -307,4 +317,70 @@ describe('watch mode', { concurrency: false, timeout: 60_000 }, () => {
`Completed running ${inspect(file)}`,
]);
});

it('should pass IPC messages from a spawning parent to the child and back', async () => {
const file = createTmpFile('console.log("running");\nprocess.on("message", (message) => {\n if (message === "exit") {\n process.exit(0);\n } else {\n console.log("Received:", message);\n process.send(message);\n }\n})');
const { child } = await spawnWithRestarts({
file,
args: [file],
spawnOptions: {
stdio: ['pipe', 'pipe', 'pipe', 'ipc'],
},
returnChild: true,
restarts: 2
});

let stderr = '';
let stdout = '';

child.stdout.on("data", data => stdout += data);
child.stderr.on("data", data => stderr += data);
async function waitForEcho(msg) {
const receivedPromise = new Promise((resolve) => {
const fn = (message) => {
if (message === msg) {
child.off("message", fn);
resolve();
}
};
child.on("message", fn);
});
child.send(msg);
await receivedPromise;
}
async function waitForText(text) {
const seenPromise = new Promise((resolve) => {
const fn = (data) => {
if (data.toString().includes(text)) {
resolve();
child.stdout.off("data", fn);
}
}
child.stdout.on("data", fn);
});
await seenPromise;
}

await waitForEcho("first message");
const stopRestarts = restart(file);
await waitForText("running");
stopRestarts();
await waitForEcho("second message");
const exitedPromise = once(child, 'exit');
child.send("exit");
await waitForText("Completed");
child.disconnect();
child.kill();
await exitedPromise;
assert.strictEqual(stderr, '');
const lines = stdout.split(/\r?\n/).filter(Boolean);
assert.deepStrictEqual(lines, [
'running',
'Received: first message',
`Restarting '${file}'`,
'running',
'Received: second message',
`Completed running ${inspect(file)}`,
]);
});
});
Loading