Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

module: have a single hooks thread for all workers #52706

Merged
merged 13 commits into from
May 8, 2024
2 changes: 2 additions & 0 deletions lib/internal/main/worker_thread.js
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ port.on('message', (message) => {
filename,
hasStdin,
publicPort,
hooksPort,
workerData,
} = message;

Expand All @@ -109,6 +110,7 @@ port.on('message', (message) => {
}

require('internal/worker').assignEnvironmentData(environmentData);
require('internal/worker').hooksPort = hooksPort;

if (SharedArrayBuffer !== undefined) {
// The counter is only passed to the workers created by the main thread,
Expand Down
127 changes: 84 additions & 43 deletions lib/internal/modules/esm/hooks.js
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ const {
const { exitCodes: { kUnsettledTopLevelAwait } } = internalBinding('errors');
const { URL } = require('internal/url');
const { canParse: URLCanParse } = internalBinding('url');
const { receiveMessageOnPort } = require('worker_threads');
const { receiveMessageOnPort, isMainThread } = require('worker_threads');
const {
isAnyArrayBuffer,
isArrayBufferView,
Expand Down Expand Up @@ -482,6 +482,8 @@ class HooksProxy {
*/
#worker;

#portToHooksThread;
dygabo marked this conversation as resolved.
Show resolved Hide resolved

/**
* The last notification ID received from the worker. This is used to detect
* if the worker has already sent a notification before putting the main
Expand All @@ -499,26 +501,38 @@ class HooksProxy {
#isReady = false;

constructor() {
const { InternalWorker } = require('internal/worker');
MessageChannel ??= require('internal/worker/io').MessageChannel;

const { InternalWorker, hooksPort } = require('internal/worker');
const lock = new SharedArrayBuffer(SHARED_MEMORY_BYTE_LENGTH);
this.#lock = new Int32Array(lock);

this.#worker = new InternalWorker(loaderWorkerId, {
stderr: false,
stdin: false,
stdout: false,
trackUnmanagedFds: false,
workerData: {
lock,
},
});
this.#worker.unref(); // ! Allows the process to eventually exit.
this.#worker.on('exit', process.exit);
if (isMainThread) {
// Main thread is the only one that creates the internal single hooks worker
this.#worker = new InternalWorker(loaderWorkerId, {
dygabo marked this conversation as resolved.
Show resolved Hide resolved
stderr: false,
stdin: false,
stdout: false,
trackUnmanagedFds: false,
workerData: {
lock,
},
});
this.#worker.unref(); // ! Allows the process to eventually exit.
this.#worker.on('exit', process.exit);
this.#portToHooksThread = this.#worker;
} else {
this.#portToHooksThread = hooksPort;
}
}

waitForWorker() {
// There is one Hooks instance for each worker thread. But only one of these Hooks instances
// has an InternalWorker. That was the Hooks instance created for the main thread.
// It means for all Hooks instances that are not on the main thread => they are ready because they
// delegate to the single InternalWorker anyway.
if (!isMainThread) {
return;
}

if (!this.#isReady) {
const { kIsOnline } = require('internal/worker');
if (!this.#worker[kIsOnline]) {
Expand All @@ -535,6 +549,37 @@ class HooksProxy {
}
}

#postMessageToWorker(method, type, transferList, args) {
this.waitForWorker();

MessageChannel ??= require('internal/worker/io').MessageChannel;

const {
port1: fromHooksThread,
port2: toHooksThread,
} = new MessageChannel();

// Pass work to the worker.
debug(`post ${type} message to worker`, { method, args, transferList });
const usedTransferList = [toHooksThread];
if (transferList) {
ArrayPrototypePushApply(usedTransferList, transferList);
}

this.#portToHooksThread.postMessage(
{
__proto__: null,
args,
lock: this.#lock,
method,
port: toHooksThread,
},
usedTransferList,
);

return fromHooksThread;
}

/**
* Invoke a remote method asynchronously.
* @param {string} method Method to invoke
Expand All @@ -543,22 +588,7 @@ class HooksProxy {
* @returns {Promise<any>}
*/
async makeAsyncRequest(method, transferList, ...args) {
this.waitForWorker();

MessageChannel ??= require('internal/worker/io').MessageChannel;
const asyncCommChannel = new MessageChannel();

// Pass work to the worker.
debug('post async message to worker', { method, args, transferList });
const finalTransferList = [asyncCommChannel.port2];
if (transferList) {
ArrayPrototypePushApply(finalTransferList, transferList);
}
this.#worker.postMessage({
__proto__: null,
method, args,
port: asyncCommChannel.port2,
}, finalTransferList);
const fromHooksThread = this.#postMessageToWorker(method, 'Async', transferList, args);

if (this.#numberOfPendingAsyncResponses++ === 0) {
// On the next lines, the main thread will await a response from the worker thread that might
Expand All @@ -567,7 +597,11 @@ class HooksProxy {
// However we want to keep the process alive until the worker thread responds (or until the
// event loop of the worker thread is also empty), so we ref the worker until we get all the
// responses back.
this.#worker.ref();
if (this.#worker) {
this.#worker.ref();
} else {
this.#portToHooksThread.ref();
}
}

let response;
Expand All @@ -576,18 +610,26 @@ class HooksProxy {
await AtomicsWaitAsync(this.#lock, WORKER_TO_MAIN_THREAD_NOTIFICATION, this.#workerNotificationLastId).value;
this.#workerNotificationLastId = AtomicsLoad(this.#lock, WORKER_TO_MAIN_THREAD_NOTIFICATION);

response = receiveMessageOnPort(asyncCommChannel.port1);
response = receiveMessageOnPort(fromHooksThread);
} while (response == null);
debug('got async response from worker', { method, args }, this.#lock);

if (--this.#numberOfPendingAsyncResponses === 0) {
// We got all the responses from the worker, its job is done (until next time).
this.#worker.unref();
if (this.#worker) {
this.#worker.unref();
} else {
this.#portToHooksThread.unref();
}
}

if (response.message.status === 'exit') {
process.exit(response.message.body);
}

const body = this.#unwrapMessage(response);
asyncCommChannel.port1.close();
return body;
fromHooksThread.close();

return this.#unwrapMessage(response);
}

/**
Expand All @@ -598,11 +640,7 @@ class HooksProxy {
* @returns {any}
*/
makeSyncRequest(method, transferList, ...args) {
this.waitForWorker();

// Pass work to the worker.
debug('post sync message to worker', { method, args, transferList });
this.#worker.postMessage({ __proto__: null, method, args }, transferList);
const fromHooksThread = this.#postMessageToWorker(method, 'Sync', transferList, args);

let response;
do {
Expand All @@ -611,14 +649,17 @@ class HooksProxy {
AtomicsWait(this.#lock, WORKER_TO_MAIN_THREAD_NOTIFICATION, this.#workerNotificationLastId);
this.#workerNotificationLastId = AtomicsLoad(this.#lock, WORKER_TO_MAIN_THREAD_NOTIFICATION);

response = this.#worker.receiveMessageSync();
response = receiveMessageOnPort(fromHooksThread);
} while (response == null);
debug('got sync response from worker', { method, args });
if (response.message.status === 'never-settle') {
process.exit(kUnsettledTopLevelAwait);
} else if (response.message.status === 'exit') {
process.exit(response.message.body);
}

fromHooksThread.close();
dygabo marked this conversation as resolved.
Show resolved Hide resolved

return this.#unwrapMessage(response);
}

Expand Down
19 changes: 16 additions & 3 deletions lib/internal/modules/esm/loader.js
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ const { ModuleWrap, kEvaluating, kEvaluated } = internalBinding('module_wrap');
const {
urlToFilename,
} = require('internal/modules/helpers');
const { isMainThread } = require('worker_threads');
let defaultResolve, defaultLoad, defaultLoadSync, importMetaInitializer;

/**
Expand Down Expand Up @@ -607,21 +608,26 @@ class CustomizedModuleLoader {
*/
constructor() {
getHooksProxy();
_hasCustomizations = true;
}

/**
* Register some loader specifier.
* Register a loader specifier.
* @param {string} originalSpecifier The specified URL path of the loader to
* be registered.
* @param {string} parentURL The parent URL from where the loader will be
* registered if using it package name as specifier
* @param {any} [data] Arbitrary data to be passed from the custom loader
* (user-land) to the worker.
* @param {any[]} [transferList] Objects in `data` that are changing ownership
* @returns {{ format: string, url: URL['href'] }}
* @returns {{ format: string, url: URL['href'] } | undefined}
*/
register(originalSpecifier, parentURL, data, transferList) {
return hooksProxy.makeSyncRequest('register', transferList, originalSpecifier, parentURL, data);
if (isMainThread) {
// Only the main thread has a Hooks instance with worker thread. All other Worker threads
// delegate their hooks to the HooksThread of the main thread.
return hooksProxy.makeSyncRequest('register', transferList, originalSpecifier, parentURL, data);
}
}

/**
Expand Down Expand Up @@ -719,6 +725,12 @@ function getHooksProxy() {
return hooksProxy;
}

let _hasCustomizations = false;
function hasCustomizations() {
return _hasCustomizations;
}


let cascadedLoader;

/**
Expand Down Expand Up @@ -780,6 +792,7 @@ function register(specifier, parentURL = undefined, options) {

module.exports = {
createModuleLoader,
hasCustomizations,
getHooksProxy,
getOrInitializeCascadedLoader,
register,
Expand Down
Loading