Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Custom watch mode + fibers restored #3

Open
wants to merge 7 commits into
base: v18.x
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions deps/v8/src/base/platform/platform.h
Original file line number Diff line number Diff line change
Expand Up @@ -554,7 +554,13 @@ class V8_BASE_EXPORT Thread {
static LocalStorageKey CreateThreadLocalKey();
static void DeleteThreadLocalKey(LocalStorageKey key);
static void* GetThreadLocal(LocalStorageKey key);
static int GetThreadLocalInt(LocalStorageKey key) {
return static_cast<int>(reinterpret_cast<intptr_t>(GetThreadLocal(key)));
}
static void SetThreadLocal(LocalStorageKey key, void* value);
static void SetThreadLocalInt(LocalStorageKey key, int value) {
SetThreadLocal(key, reinterpret_cast<void*>(static_cast<intptr_t>(value)));
}
static bool HasThreadLocal(LocalStorageKey key) {
return GetThreadLocal(key) != nullptr;
}
Expand Down
7 changes: 6 additions & 1 deletion deps/v8/src/execution/thread-id.cc
Original file line number Diff line number Diff line change
Expand Up @@ -11,22 +11,27 @@ namespace internal {

namespace {

thread_local int thread_id = 0;
DEFINE_LAZY_LEAKY_OBJECT_GETTER(base::Thread::LocalStorageKey, GetThreadIdKey,
base::Thread::CreateThreadLocalKey())

std::atomic<int> next_thread_id{1};

} // namespace

// static
ThreadId ThreadId::TryGetCurrent() {
int thread_id = base::Thread::GetThreadLocalInt(*GetThreadIdKey());
return thread_id == 0 ? Invalid() : ThreadId(thread_id);
}

// static
int ThreadId::GetCurrentThreadId() {
auto key = *GetThreadIdKey();
int thread_id = base::Thread::GetThreadLocalInt(key);
if (thread_id == 0) {
thread_id = next_thread_id.fetch_add(1);
CHECK_LE(1, thread_id);
base::Thread::SetThreadLocalInt(key, thread_id);
}
return thread_id;
}
Expand Down
23 changes: 16 additions & 7 deletions lib/internal/main/watch_mode.js
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ function start() {
process.stdout.write(`${red}Failed running ${kCommandStr}${white}\n`);
}
});
return child;
}

async function killAndWait(signal = kKillSignal, force = false) {
Expand Down Expand Up @@ -91,29 +92,37 @@ function reportGracefulTermination() {
};
}

async function stop() {
async function stop(child) {
// without this line, the child process is still able to receive IPC, but is unable to send additional messages
watcher.destroyIPC(child);
watcher.clearFileFilters();
const clearGraceReport = reportGracefulTermination();
await killAndWait();
clearGraceReport();
}

async function restart() {
async function restart(child) {
if (!kPreserveOutput) process.stdout.write(clear);
process.stdout.write(`${green}Restarting ${kCommandStr}${white}\n`);
await stop();
start();
if (process.send) {
// if there is a parent process, there's a good chance it would like to know that the process is restarting
process.send({
['watch:restarting']: {}
});
}
await stop(child);
return start();
}

(async () => {
emitExperimentalWarning('Watch mode');

let child;
try {
start();
child = start();

// eslint-disable-next-line no-unused-vars
for await (const _ of on(watcher, 'changed')) {
await restart();
child = await restart(child);
}
} catch (error) {
triggerUncaughtException(error, true /* fromPromise */);
Expand Down
83 changes: 76 additions & 7 deletions lib/internal/watch_mode/files_watcher.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,10 @@ const { kEmptyObject } = require('internal/util');
const { TIMEOUT_MAX } = require('internal/timers');

const EventEmitter = require('events');
const { watch } = require('fs');
const { fileURLToPath } = require('url');
const { watch, existsSync } = require('fs');
const { fileURLToPath } = require('internal/url');
const { resolve, dirname } = require('path');
const { setTimeout } = require('timers');
const { setTimeout, clearTimeout, setInterval, clearInterval } = require('timers');


const supportsRecursiveWatching = process.platform === 'win32' ||
Expand All @@ -29,15 +29,21 @@ class FilesWatcher extends EventEmitter {
#depencencyOwners = new SafeMap();
#ownerDependencies = new SafeMap();
#throttle;
#renameInterval;
#renameTimeout;
#mode;
#wantsPassthroughIPC = false;

constructor({ throttle = 500, mode = 'filter' } = kEmptyObject) {
constructor({ throttle = 200, mode = 'filter', renameInterval = 1000, renameTimeout = 60_000 } = kEmptyObject) {
super();

validateNumber(throttle, 'options.throttle', 0, TIMEOUT_MAX);
validateOneOf(mode, 'options.mode', ['filter', 'all']);
this.#throttle = throttle;
this.#mode = mode;
this.#renameInterval = renameInterval;
this.#renameTimeout = renameTimeout;
this.#wantsPassthroughIPC = !!process.send;
}

#isPathWatched(path) {
Expand Down Expand Up @@ -68,7 +74,10 @@ class FilesWatcher extends EventEmitter {
watcher.handle.close();
}

#onChange(trigger) {
#onChange(eventType, trigger, recursive) {
if (eventType === 'rename' && !recursive) {
return this.#rewatch(trigger);
}
if (this.#throttling.has(trigger)) {
return;
}
Expand All @@ -81,6 +90,34 @@ class FilesWatcher extends EventEmitter {
setTimeout(() => this.#throttling.delete(trigger), this.#throttle).unref();
}

// When a file is removed, wait for it to be re-added.
// Often this re-add is immediate - some editors (e.g., gedit) and some docker mount modes do this.
#rewatch(path) {
if (this.#isPathWatched(path)) {
this.#unwatch(this.#watchers.get(path));
this.#watchers.delete(path);
if (existsSync(path)) {
this.watchPath(path, false);
return;
}
let timeout;

// Wait for the file to exist - check every `renameInterval` ms
const interval = setInterval(async () => {
if (existsSync(path)) {
clearInterval(interval);
clearTimeout(timeout);
this.watchPath(path, false);
}
}, this.#renameInterval);

// Don't wait forever - after `renameTimeout` ms, stop trying
timeout = setTimeout(() => {
clearInterval(interval);
}, this.#renameTimeout);
}
}

get watchedPaths() {
return [...this.#watchers.keys()];
}
Expand All @@ -91,8 +128,8 @@ class FilesWatcher extends EventEmitter {
}
const watcher = watch(path, { recursive });
watcher.on('change', (eventType, fileName) => this
.#onChange(recursive ? resolve(path, fileName) : path));
this.#watchers.set(path, { handle: watcher, recursive });
.#onChange(eventType, recursive ? resolve(path, fileName) : path, recursive));
this.#watchers.set(path, { path, handle: watcher, recursive });
if (recursive) {
this.#removeWatchedChildren(path);
}
Expand All @@ -117,17 +154,49 @@ class FilesWatcher extends EventEmitter {
this.#ownerDependencies.set(owner, dependencies);
}
}


#setupIPC(child) {
child._ipcMessages = {
parentToChild: message => child.send(message),
childToParent: message => process.send(message)
};
process.on("message", child._ipcMessages.parentToChild);
child.on("message", child._ipcMessages.childToParent);
}

destroyIPC(child) {
if (this.#wantsPassthroughIPC) {
process.off("message", child._ipcMessages.parentToChild);
child.off("message", child._ipcMessages.childToParent);
}
}

watchChildProcessModules(child, key = null) {
if (this.#wantsPassthroughIPC) {
this.#setupIPC(child);
}
if (this.#mode !== 'filter') {
return;
}
let sentInitial = false;
child.on('message', (message) => {
try {
let sendInitial = false;
if (ArrayIsArray(message['watch:require'])) {
ArrayPrototypeForEach(message['watch:require'], (file) => this.filterFile(file, key));
sendInitial = true;
}
if (ArrayIsArray(message['watch:import'])) {
ArrayPrototypeForEach(message['watch:import'], (file) => this.filterFile(fileURLToPath(file), key));
sendInitial = true;
}
if (sendInitial && process.send) {
sentInitial = true;
// if there is a parent process, there's a good chance it would like to know that the process is restarted (or is in the process of coming up)
process.send({
['watch:restarted']: {}
});
}
} catch {
// Failed watching file. ignore
Expand Down
122 changes: 99 additions & 23 deletions test/sequential/test-watch-mode.mjs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import fs from 'fs/promises';
import * as common from '../common/index.mjs';
import * as fixtures from '../common/fixtures.mjs';
import tmpdir from '../common/tmpdir.js';
Expand Down Expand Up @@ -34,6 +35,8 @@ async function spawnWithRestarts({
watchedFile = file,
restarts = 1,
isReady,
spawnOptions,
returnChild = false
}) {
args ??= [file];
const printedArgs = inspect(args.slice(args.indexOf(file)).join(' '));
Expand All @@ -44,30 +47,36 @@ async function spawnWithRestarts({
let cancelRestarts;

disableRestart = true;
const child = spawn(execPath, ['--watch', '--no-warnings', ...args], { encoding: 'utf8' });
child.stderr.on('data', (data) => {
stderr += data;
});
child.stdout.on('data', async (data) => {
if (data.toString().includes('Restarting')) {
disableRestart = true;
}
stdout += data;
const restartsCount = stdout.match(new RegExp(`Restarting ${printedArgs.replace(/\\/g, '\\\\')}`, 'g'))?.length ?? 0;
if (restarts === 0 || !isReady(data.toString())) {
return;
}
if (restartsCount >= restarts) {
cancelRestarts?.();
child.kill();
return;
}
cancelRestarts ??= restart(watchedFile);
if (isReady(data.toString())) {
disableRestart = false;
}
});
const child = spawn(execPath, ['--watch', '--no-warnings', ...args], { encoding: 'utf8', ...spawnOptions });

if (!returnChild) {
child.stderr.on('data', (data) => {
stderr += data;
});
child.stdout.on('data', async (data) => {
if (data.toString().includes('Restarting')) {
disableRestart = true;
}
stdout += data;
const restartsCount = stdout.match(new RegExp(`Restarting ${printedArgs.replace(/\\/g, '\\\\')}`, 'g'))?.length ?? 0;
if (restarts === 0 || !isReady(data.toString())) {
return;
}
if (restartsCount >= restarts) {
cancelRestarts?.();
child.kill();
return;
}
cancelRestarts ??= restart(watchedFile);
if (isReady(data.toString())) {
disableRestart = false;
}
});
}
else {
// this test is doing it's own thing
return { child };
}
await once(child, 'exit');
cancelRestarts?.();
return { stderr, stdout };
Expand Down Expand Up @@ -248,6 +257,7 @@ describe('watch mode', { concurrency: false, timeout: 60_000 }, () => {
});
});


// TODO: Remove skip after https://github.com/nodejs/node/pull/45271 lands
it('should not watch when running an missing file', {
skip: !supportsRecursive
Expand Down Expand Up @@ -307,4 +317,70 @@ describe('watch mode', { concurrency: false, timeout: 60_000 }, () => {
`Completed running ${inspect(file)}`,
]);
});

it('should pass IPC messages from a spawning parent to the child and back', async () => {
const file = createTmpFile('console.log("running");\nprocess.on("message", (message) => {\n if (message === "exit") {\n process.exit(0);\n } else {\n console.log("Received:", message);\n process.send(message);\n }\n})');
const { child } = await spawnWithRestarts({
file,
args: [file],
spawnOptions: {
stdio: ['pipe', 'pipe', 'pipe', 'ipc'],
},
returnChild: true,
restarts: 2
});

let stderr = '';
let stdout = '';

child.stdout.on("data", data => stdout += data);
child.stderr.on("data", data => stderr += data);
async function waitForEcho(msg) {
const receivedPromise = new Promise((resolve) => {
const fn = (message) => {
if (message === msg) {
child.off("message", fn);
resolve();
}
};
child.on("message", fn);
});
child.send(msg);
await receivedPromise;
}
async function waitForText(text) {
const seenPromise = new Promise((resolve) => {
const fn = (data) => {
if (data.toString().includes(text)) {
resolve();
child.stdout.off("data", fn);
}
}
child.stdout.on("data", fn);
});
await seenPromise;
}

await waitForEcho("first message");
const stopRestarts = restart(file);
await waitForText("running");
stopRestarts();
await waitForEcho("second message");
const exitedPromise = once(child, 'exit');
child.send("exit");
await waitForText("Completed");
child.disconnect();
child.kill();
await exitedPromise;
assert.strictEqual(stderr, '');
const lines = stdout.split(/\r?\n/).filter(Boolean);
assert.deepStrictEqual(lines, [
'running',
'Received: first message',
`Restarting '${file}'`,
'running',
'Received: second message',
`Completed running ${inspect(file)}`,
]);
});
});