Skip to content

Commit

Permalink
process: move worker bootstrap code into worker_thread_only.js
Browse files Browse the repository at this point in the history
Move worker bootstrap code into worker_thread_only.js from
internal/worker.js since they are only run once during bootstrap.
  • Loading branch information
joyeecheung committed Dec 24, 2018
1 parent 4765b70 commit 1c52afd
Show file tree
Hide file tree
Showing 2 changed files with 90 additions and 83 deletions.
97 changes: 89 additions & 8 deletions lib/internal/process/worker_thread_only.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,21 @@ const {
threadId
} = internalBinding('worker');

const debug = require('util').debuglog('worker');

const {
messageTypes,
kStdioWantsMoreDataCallback,
kWaitingStreams,
ReadableWorkerStdio,
WritableWorkerStdio
} = require('internal/worker/io');

const {
createMessageHandler,
createWorkerFatalExeception
} = require('internal/worker');
let debuglog;
function debug(...args) {
if (!debuglog) {
debuglog = require('util').debuglog('worker');
}
return debuglog(...args);
}

const workerStdio = {};

Expand All @@ -36,12 +39,90 @@ function initializeWorkerStdio() {
};
}

function createMessageHandler(port) {
const publicWorker = require('worker_threads');

return function(message) {
if (message.type === messageTypes.LOAD_SCRIPT) {
const { filename, doEval, workerData, publicPort, hasStdin } = message;
publicWorker.parentPort = publicPort;
publicWorker.workerData = workerData;

if (!hasStdin)
workerStdio.stdin.push(null);

debug(`[${threadId}] starts worker script ${filename} ` +
`(eval = ${eval}) at cwd = ${process.cwd()}`);
port.unref();
port.postMessage({ type: messageTypes.UP_AND_RUNNING });
if (doEval) {
const { evalScript } = require('internal/process/execution');
evalScript('[worker eval]', filename);
} else {
process.argv[1] = filename; // script filename
require('module').runMain();
}
return;
} else if (message.type === messageTypes.STDIO_PAYLOAD) {
const { stream, chunk, encoding } = message;
workerStdio[stream].push(chunk, encoding);
return;
} else if (message.type === messageTypes.STDIO_WANTS_MORE_DATA) {
const { stream } = message;
workerStdio[stream][kStdioWantsMoreDataCallback]();
return;
}

require('assert').fail(`Unknown worker message type ${message.type}`);
};
}

// XXX(joyeecheung): this has to be returned as an anonymous function
// wrapped in a closure, see the comment of the original
// process._fatalException in lib/internal/process/execution.js
function createWorkerFatalExeception(port) {
const {
fatalException: originalFatalException
} = require('internal/process/execution');

return (error) => {
debug(`[${threadId}] gets fatal exception`);
let caught = false;
try {
caught = originalFatalException.call(this, error);
} catch (e) {
error = e;
}
debug(`[${threadId}] fatal exception caught = ${caught}`);

if (!caught) {
let serialized;
try {
const { serializeError } = require('internal/error-serdes');
serialized = serializeError(error);
} catch {}
debug(`[${threadId}] fatal exception serialized = ${!!serialized}`);
if (serialized)
port.postMessage({
type: messageTypes.ERROR_MESSAGE,
error: serialized
});
else
port.postMessage({ type: messageTypes.COULD_NOT_SERIALIZE_ERROR });

const { clearAsyncIdStack } = require('internal/async_hooks');
clearAsyncIdStack();

process.exit();
}
};
}

function setup() {
debug(`[${threadId}] is setting up worker child environment`);

const port = getEnvMessagePort();
const publicWorker = require('worker_threads');
port.on('message', createMessageHandler(publicWorker, port, workerStdio));
port.on('message', createMessageHandler(port));
port.start();

return {
Expand Down
76 changes: 1 addition & 75 deletions lib/internal/worker.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ const {
ERR_WORKER_UNSUPPORTED_EXTENSION,
} = require('internal/errors').codes;
const { validateString } = require('internal/validators');
const { clearAsyncIdStack } = require('internal/async_hooks');

const {
drainMessagePort,
Expand All @@ -24,7 +23,7 @@ const {
ReadableWorkerStdio,
WritableWorkerStdio,
} = require('internal/worker/io');
const { serializeError, deserializeError } = require('internal/error-serdes');
const { deserializeError } = require('internal/error-serdes');
const { pathToFileURL } = require('url');

const {
Expand Down Expand Up @@ -219,77 +218,6 @@ class Worker extends EventEmitter {
}
}

function createMessageHandler(publicWorker, port, workerStdio) {
return function(message) {
if (message.type === messageTypes.LOAD_SCRIPT) {
const { filename, doEval, workerData, publicPort, hasStdin } = message;
publicWorker.parentPort = publicPort;
publicWorker.workerData = workerData;

if (!hasStdin)
workerStdio.stdin.push(null);

debug(`[${threadId}] starts worker script ${filename} ` +
`(eval = ${eval}) at cwd = ${process.cwd()}`);
port.unref();
port.postMessage({ type: messageTypes.UP_AND_RUNNING });
if (doEval) {
const { evalScript } = require('internal/process/execution');
evalScript('[worker eval]', filename);
} else {
process.argv[1] = filename; // script filename
require('module').runMain();
}
return;
} else if (message.type === messageTypes.STDIO_PAYLOAD) {
const { stream, chunk, encoding } = message;
workerStdio[stream].push(chunk, encoding);
return;
} else if (message.type === messageTypes.STDIO_WANTS_MORE_DATA) {
const { stream } = message;
workerStdio[stream][kStdioWantsMoreDataCallback]();
return;
}

assert.fail(`Unknown worker message type ${message.type}`);
};
}

function createWorkerFatalExeception(port) {
const {
fatalException: originalFatalException
} = require('internal/process/execution');

return function(error) {
debug(`[${threadId}] gets fatal exception`);
let caught = false;
try {
caught = originalFatalException.call(this, error);
} catch (e) {
error = e;
}
debug(`[${threadId}] fatal exception caught = ${caught}`);

if (!caught) {
let serialized;
try {
serialized = serializeError(error);
} catch {}
debug(`[${threadId}] fatal exception serialized = ${!!serialized}`);
if (serialized)
port.postMessage({
type: messageTypes.ERROR_MESSAGE,
error: serialized
});
else
port.postMessage({ type: messageTypes.COULD_NOT_SERIALIZE_ERROR });
clearAsyncIdStack();

process.exit();
}
};
}

function pipeWithoutWarning(source, dest) {
const sourceMaxListeners = source._maxListeners;
const destMaxListeners = dest._maxListeners;
Expand All @@ -303,8 +231,6 @@ function pipeWithoutWarning(source, dest) {
}

module.exports = {
createMessageHandler,
createWorkerFatalExeception,
threadId,
Worker,
isMainThread
Expand Down

0 comments on commit 1c52afd

Please sign in to comment.