Skip to content

Commit

Permalink
worker_threads: properly handle env and NODE_OPTIONS in workers
Browse files Browse the repository at this point in the history
  • Loading branch information
lundibundi committed Feb 10, 2020
1 parent 0c87f8c commit 503003e
Show file tree
Hide file tree
Showing 6 changed files with 193 additions and 75 deletions.
4 changes: 2 additions & 2 deletions lib/internal/errors.js
Original file line number Diff line number Diff line change
Expand Up @@ -1358,8 +1358,8 @@ E('ERR_VM_MODULE_NOT_MODULE',
'Provided module is not an instance of Module', Error);
E('ERR_VM_MODULE_STATUS', 'Module status %s', Error);
E('ERR_WASI_ALREADY_STARTED', 'WASI instance has already started', Error);
E('ERR_WORKER_INVALID_EXEC_ARGV', (errors) =>
`Initiated Worker with invalid execArgv flags: ${errors.join(', ')}`,
E('ERR_WORKER_INVALID_EXEC_ARGV', (errors, msg = 'invalid execArgv flags') =>
`Initiated Worker with ${msg}: ${errors.join(', ')}`,
Error);
E('ERR_WORKER_NOT_RUNNING', 'Worker instance not running', Error);
E('ERR_WORKER_OUT_OF_MEMORY', 'Worker terminated due to reaching memory limit',
Expand Down
13 changes: 6 additions & 7 deletions lib/internal/worker.js
Original file line number Diff line number Diff line change
Expand Up @@ -125,17 +125,16 @@ class Worker extends EventEmitter {

const url = options.eval ? null : pathToFileURL(filename);
// Set up the C++ handle for the worker, as well as some internal wiring.
this[kHandle] = new WorkerImpl(url, options.execArgv,
this[kHandle] = new WorkerImpl(url,
env === process.env ? null : env,
options.execArgv,
parseResourceLimits(options.resourceLimits));
if (this[kHandle].invalidExecArgv) {
throw new ERR_WORKER_INVALID_EXEC_ARGV(this[kHandle].invalidExecArgv);
}
if (env === process.env) {
// This may be faster than manually cloning the object in C++, especially
// when recursively spawning Workers.
this[kHandle].cloneParentEnvVars();
} else if (env !== undefined) {
this[kHandle].setEnvVars(env);
if (this[kHandle].invalidNodeOptions) {
throw new ERR_WORKER_INVALID_EXEC_ARGV(
this[kHandle].invalidNodeOptions, 'invalid NODE_OPTIONS env variable');
}
this[kHandle].onexit = (code, customErr) => this[kOnExit](code, customErr);
this[kPort] = this[kHandle].messagePort;
Expand Down
118 changes: 79 additions & 39 deletions src/node_worker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#include <string>
#include <vector>

using node::kAllowedInEnvironment;
using node::kDisallowedInEnvironment;
using v8::Array;
using v8::ArrayBuffer;
Expand Down Expand Up @@ -47,14 +48,15 @@ Worker::Worker(Environment* env,
Local<Object> wrap,
const std::string& url,
std::shared_ptr<PerIsolateOptions> per_isolate_opts,
std::vector<std::string>&& exec_argv)
std::vector<std::string>&& exec_argv,
std::shared_ptr<KVStore> env_vars)
: AsyncWrap(env, wrap, AsyncWrap::PROVIDER_WORKER),
per_isolate_opts_(per_isolate_opts),
exec_argv_(exec_argv),
platform_(env->isolate_data()->platform()),
start_profiler_idle_notifier_(env->profiler_idle_notifier_started()),
thread_id_(Environment::AllocateThreadId()),
env_vars_(env->env_vars()) {
env_vars_(env_vars) {
Debug(this, "Creating new worker instance with thread id %llu", thread_id_);

// Set up everything that needs to be set up in the parent environment.
Expand Down Expand Up @@ -442,6 +444,7 @@ Worker::~Worker() {

void Worker::New(const FunctionCallbackInfo<Value>& args) {
Environment* env = Environment::GetCurrent(args);
Isolate* isolate = args.GetIsolate();

CHECK(args.IsConstructCall());

Expand All @@ -452,24 +455,81 @@ void Worker::New(const FunctionCallbackInfo<Value>& args) {

std::string url;
std::shared_ptr<PerIsolateOptions> per_isolate_opts = nullptr;
std::shared_ptr<KVStore> env_vars = nullptr;

std::vector<std::string> exec_argv_out;
bool has_explicit_exec_argv = false;

CHECK_EQ(args.Length(), 3);
CHECK_EQ(args.Length(), 4);
// Argument might be a string or URL
if (!args[0]->IsNullOrUndefined()) {
Utf8Value value(
args.GetIsolate(),
args[0]->ToString(env->context()).FromMaybe(Local<String>()));
isolate, args[0]->ToString(env->context()).FromMaybe(Local<String>()));
url.append(value.out(), value.length());
}

if (args[1]->IsArray()) {
Local<Array> array = args[1].As<Array>();
if (args[1]->IsNull()) {
// Means worker.env = { ...process.env }.
env_vars = env->env_vars()->Clone(isolate);
} else if (args[1]->IsObject()) {
// User provided env.
env_vars = KVStore::CreateMapKVStore();
env_vars->AssignFromObject(isolate->GetCurrentContext(),
args[1].As<Object>());
} else {
// Env is shared.
env_vars = env->env_vars();
}

if (args[1]->IsObject() || args[2]->IsArray()) {
per_isolate_opts.reset(new PerIsolateOptions());

HandleEnvOptions(
per_isolate_opts->per_env, [isolate, &env_vars](const char* name) {
MaybeLocal<String> value =
env_vars->Get(isolate, OneByteString(isolate, name));
return value.IsEmpty() ? std::string{}
: std::string(*String::Utf8Value(
isolate, value.ToLocalChecked()));
});

#ifndef NODE_WITHOUT_NODE_OPTIONS
MaybeLocal<String> maybe_node_opts =
env_vars->Get(isolate, OneByteString(isolate, "NODE_OPTIONS"));
if (!maybe_node_opts.IsEmpty()) {
std::string node_options(
*String::Utf8Value(isolate, maybe_node_opts.ToLocalChecked()));
std::vector<std::string> errors{};
std::vector<std::string> env_argv =
ParseNodeOptionsEnvVar(node_options, &errors);
// [0] is expected to be the program name, add dummy string.
env_argv.insert(env_argv.begin(), "");
std::vector<std::string> invalid_args{};
options_parser::Parse(&env_argv,
nullptr,
&invalid_args,
per_isolate_opts.get(),
kAllowedInEnvironment,
&errors);
if (errors.size() > 0 && args[1]->IsObject()) {
// Only fail for explicitly provided env, this protects from failures
// when NODE_OPTIONS from parent's env is used (which is the default).
Local<Value> error;
if (!ToV8Value(env->context(), errors).ToLocal(&error)) return;
Local<String> key =
FIXED_ONE_BYTE_STRING(env->isolate(), "invalidNodeOptions");
// Ignore the return value of Set() because exceptions bubble up to JS
// when we return anyway.
USE(args.This()->Set(env->context(), key, error));
return;
}
}
#endif
}

if (args[2]->IsArray()) {
Local<Array> array = args[2].As<Array>();
// The first argument is reserved for program name, but we don't need it
// in workers.
has_explicit_exec_argv = true;
std::vector<std::string> exec_argv = {""};
uint32_t length = array->Length();
for (uint32_t i = 0; i < length; i++) {
Expand All @@ -491,8 +551,6 @@ void Worker::New(const FunctionCallbackInfo<Value>& args) {

std::vector<std::string> invalid_args{};
std::vector<std::string> errors{};
per_isolate_opts.reset(new PerIsolateOptions());

// Using invalid_args as the v8_args argument as it stores unknown
// options for the per isolate parser.
options_parser::Parse(
Expand All @@ -519,40 +577,24 @@ void Worker::New(const FunctionCallbackInfo<Value>& args) {
USE(args.This()->Set(env->context(), key, error));
return;
}
}
if (!has_explicit_exec_argv)
} else {
exec_argv_out = env->exec_argv();
}

Worker* worker =
new Worker(env, args.This(), url, per_isolate_opts,
std::move(exec_argv_out));
Worker* worker = new Worker(env,
args.This(),
url,
per_isolate_opts,
std::move(exec_argv_out),
env_vars);

CHECK(args[2]->IsFloat64Array());
Local<Float64Array> limit_info = args[2].As<Float64Array>();
CHECK(args[3]->IsFloat64Array());
Local<Float64Array> limit_info = args[3].As<Float64Array>();
CHECK_EQ(limit_info->Length(), kTotalResourceLimitCount);
limit_info->CopyContents(worker->resource_limits_,
sizeof(worker->resource_limits_));
}

void Worker::CloneParentEnvVars(const FunctionCallbackInfo<Value>& args) {
Worker* w;
ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
CHECK(w->thread_joined_); // The Worker has not started yet.

w->env_vars_ = w->env()->env_vars()->Clone(args.GetIsolate());
}

void Worker::SetEnvVars(const FunctionCallbackInfo<Value>& args) {
Worker* w;
ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
CHECK(w->thread_joined_); // The Worker has not started yet.

CHECK(args[0]->IsObject());
w->env_vars_ = KVStore::CreateMapKVStore();
w->env_vars_->AssignFromObject(args.GetIsolate()->GetCurrentContext(),
args[0].As<Object>());
}

void Worker::StartThread(const FunctionCallbackInfo<Value>& args) {
Worker* w;
ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
Expand Down Expand Up @@ -724,8 +766,6 @@ void InitWorker(Local<Object> target,
w->InstanceTemplate()->SetInternalFieldCount(1);
w->Inherit(AsyncWrap::GetConstructorTemplate(env));

env->SetProtoMethod(w, "setEnvVars", Worker::SetEnvVars);
env->SetProtoMethod(w, "cloneParentEnvVars", Worker::CloneParentEnvVars);
env->SetProtoMethod(w, "startThread", Worker::StartThread);
env->SetProtoMethod(w, "stopThread", Worker::StopThread);
env->SetProtoMethod(w, "ref", Worker::Ref);
Expand Down
3 changes: 2 additions & 1 deletion src/node_worker.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ class Worker : public AsyncWrap {
v8::Local<v8::Object> wrap,
const std::string& url,
std::shared_ptr<PerIsolateOptions> per_isolate_opts,
std::vector<std::string>&& exec_argv);
std::vector<std::string>&& exec_argv,
std::shared_ptr<KVStore> env_vars);
~Worker() override;

// Run the worker. This is only called from the worker thread.
Expand Down
100 changes: 75 additions & 25 deletions test/parallel/test-cli-node-options.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,37 +7,44 @@ if (process.config.variables.node_without_node_options)

const assert = require('assert');
const exec = require('child_process').execFile;
const { Worker } = require('worker_threads');

const tmpdir = require('../common/tmpdir');
tmpdir.refresh();

const printA = require.resolve('../fixtures/printA.js');
const printSpaceA = require.resolve('../fixtures/print A.js');

expect(` -r ${printA} `, 'A\nB\n');
expect(`-r ${printA}`, 'A\nB\n');
expect(`-r ${JSON.stringify(printA)}`, 'A\nB\n');
expect(`-r ${JSON.stringify(printSpaceA)}`, 'A\nB\n');
expect(`-r ${printA} -r ${printA}`, 'A\nB\n');
expect(` -r ${printA} -r ${printA}`, 'A\nB\n');
expect(` --require ${printA} --require ${printA}`, 'A\nB\n');
expectNoWorker(` -r ${printA} `, 'A\nB\n');
expectNoWorker(`-r ${printA}`, 'A\nB\n');
expectNoWorker(`-r ${JSON.stringify(printA)}`, 'A\nB\n');
expectNoWorker(`-r ${JSON.stringify(printSpaceA)}`, 'A\nB\n');
expectNoWorker(`-r ${printA} -r ${printA}`, 'A\nB\n');
expectNoWorker(` -r ${printA} -r ${printA}`, 'A\nB\n');
expectNoWorker(` --require ${printA} --require ${printA}`, 'A\nB\n');
expect('--no-deprecation', 'B\n');
expect('--no-warnings', 'B\n');
expect('--no_warnings', 'B\n');
expect('--trace-warnings', 'B\n');
expect('--redirect-warnings=_', 'B\n');
expect('--trace-deprecation', 'B\n');
expect('--trace-sync-io', 'B\n');
expect('--trace-events-enabled', 'B\n');
expectNoWorker('--trace-events-enabled', 'B\n');
expect('--track-heap-objects', 'B\n');
expect('--throw-deprecation', 'B\n');
expect('--zero-fill-buffers', 'B\n');
expect('--v8-pool-size=10', 'B\n');
expect('--trace-event-categories node', 'B\n');
// eslint-disable-next-line no-template-curly-in-string
expect('--trace-event-file-pattern {pid}-${rotation}.trace_events', 'B\n');
expect('--throw-deprecation',
/.*DeprecationWarning: Buffer\(\) is deprecated due to security and usability issues.*/,
'new Buffer(42)',
true);
expectNoWorker('--zero-fill-buffers', 'B\n');
expectNoWorker('--v8-pool-size=10', 'B\n');
expectNoWorker('--trace-event-categories node', 'B\n');
expectNoWorker(
// eslint-disable-next-line no-template-curly-in-string
'--trace-event-file-pattern {pid}-${rotation}.trace_events',
'B\n'
);
// eslint-disable-next-line no-template-curly-in-string
expect('--trace-event-file-pattern {pid}-${rotation}.trace_events ' +
expectNoWorker('--trace-event-file-pattern {pid}-${rotation}.trace_events ' +
'--trace-event-categories node.async_hooks', 'B\n');
expect('--unhandled-rejections=none', 'B\n');

Expand All @@ -53,24 +60,30 @@ if (common.isLinux && ['arm', 'x64'].includes(process.arch)) {
}

if (common.hasCrypto) {
expect('--use-openssl-ca', 'B\n');
expect('--use-bundled-ca', 'B\n');
expect('--openssl-config=_ossl_cfg', 'B\n');
expectNoWorker('--use-openssl-ca', 'B\n');
expectNoWorker('--use-bundled-ca', 'B\n');
expectNoWorker('--openssl-config=_ossl_cfg', 'B\n');
}

// V8 options
expect('--abort_on-uncaught_exception', 'B\n');
expect('--disallow-code-generation-from-strings', 'B\n');
expect('--max-old-space-size=0', 'B\n');
expect('--stack-trace-limit=100',
/(\s*at f \(\[eval\]:1:\d*\)\r?\n){100}/,
/(\s*at f \(\[(eval|worker eval)\]:1:\d*\)\r?\n)/,
'(function f() { f(); })();',
true);
// Unsupported on arm. See https://crbug.com/v8/8713.
if (!['arm', 'arm64'].includes(process.arch))
expect('--interpreted-frames-native-stack', 'B\n');

function expect(opt, want, command = 'console.log("B")', wantsError = false) {
function expectNoWorker(opt, want, command, wantsError) {
expect(opt, want, command, wantsError, false);
}

function expect(
opt, want, command = 'console.log("B")', wantsError = false, testWorker = true
) {
const argv = ['-e', command];
const opts = {
cwd: tmpdir.path,
Expand All @@ -79,15 +92,52 @@ function expect(opt, want, command = 'console.log("B")', wantsError = false) {
};
if (typeof want === 'string')
want = new RegExp(want);
exec(process.execPath, argv, opts, common.mustCall((err, stdout, stderr) => {

const test = (type) => common.mustCall((err, stdout) => {
const o = JSON.stringify(opt);
if (wantsError) {
stdout = stderr;
assert.ok(err, `${type}: expected error for ${o}`);
stdout = err.stack;
} else {
assert.ifError(err);
assert.ifError(err, `${type}: failed for ${o}`);
}
if (want.test(stdout)) return;

const o = JSON.stringify(opt);
assert.fail(`For ${o}, failed to find ${want} in: <\n${stdout}\n>`);
assert.fail(
`${type}: for ${o}, failed to find ${want} in: <\n${stdout}\n>`
);
});

exec(process.execPath, argv, opts, test('child process'));
if (testWorker)
workerTest(opts, command, wantsError, test('worker'));
}

async function collectStream(readable) {
readable.setEncoding('utf8');
let data = '';
for await (const chunk of readable) {
data += chunk;
}
return data;
}

function workerTest(opts, command, wantsError, test) {
let workerError = null;
const worker = new Worker(command, {
...opts,
execArgv: [],
eval: true,
stdout: true,
stderr: true
});
worker.on('error', (err) => {
workerError = err;
});
worker.on('exit', common.mustCall((code) => {
assert.strictEqual(code, wantsError ? 1 : 0);
collectStream(worker.stdout).then((stdout) => {
test(workerError, stdout);
});
}));
}
Loading

0 comments on commit 503003e

Please sign in to comment.