Skip to content

Commit

Permalink
src: align worker and main thread code with embedder API
Browse files Browse the repository at this point in the history
This addresses some long-standing TODOs by Joyee and me about
making the embedder API more powerful and us less reliant on
internal APIs for creating the main thread and Workers.

Backport-PR-URL: #35241
PR-URL: #30467
Reviewed-By: James M Snell <[email protected]>
Reviewed-By: Gireesh Punathil <[email protected]>
  • Loading branch information
addaleax committed Sep 23, 2020
1 parent e809a5c commit 808dedc
Show file tree
Hide file tree
Showing 11 changed files with 253 additions and 115 deletions.
90 changes: 84 additions & 6 deletions src/api/environment.cc
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@
#include "node_v8_platform-inl.h"
#include "uv.h"

#if HAVE_INSPECTOR
#include "inspector/worker_inspector.h" // ParentInspectorHandle
#endif

namespace node {
using errors::TryCatchScope;
using v8::Array;
Expand Down Expand Up @@ -319,26 +323,40 @@ Environment* CreateEnvironment(IsolateData* isolate_data,
const char* const* argv,
int exec_argc,
const char* const* exec_argv) {
return CreateEnvironment(
isolate_data, context,
std::vector<std::string>(argv, argv + argc),
std::vector<std::string>(exec_argv, exec_argv + exec_argc));
}

Environment* CreateEnvironment(
IsolateData* isolate_data,
Local<Context> context,
const std::vector<std::string>& args,
const std::vector<std::string>& exec_args,
EnvironmentFlags::Flags flags,
ThreadId thread_id) {
Isolate* isolate = context->GetIsolate();
HandleScope handle_scope(isolate);
Context::Scope context_scope(context);
// TODO(addaleax): This is a much better place for parsing per-Environment
// options than the global parse call.
std::vector<std::string> args(argv, argv + argc);
std::vector<std::string> exec_args(exec_argv, exec_argv + exec_argc);
// TODO(addaleax): Provide more sensible flags, in an embedder-accessible way.
Environment* env = new Environment(
isolate_data,
context,
args,
exec_args,
static_cast<Environment::Flags>(Environment::kOwnsProcessState |
Environment::kOwnsInspector));
env->InitializeLibuv(per_process::v8_is_profiling);
flags,
thread_id);
if (flags & EnvironmentFlags::kOwnsProcessState) {
env->set_abort_on_uncaught_exception(false);
}

if (env->RunBootstrapping().IsEmpty()) {
FreeEnvironment(env);
return nullptr;
}

return env;
}

Expand All @@ -363,6 +381,58 @@ void FreeEnvironment(Environment* env) {
delete env;
}

InspectorParentHandle::~InspectorParentHandle() {}

// Hide the internal handle class from the public API.
#if HAVE_INSPECTOR
struct InspectorParentHandleImpl : public InspectorParentHandle {
std::unique_ptr<inspector::ParentInspectorHandle> impl;

explicit InspectorParentHandleImpl(
std::unique_ptr<inspector::ParentInspectorHandle>&& impl)
: impl(std::move(impl)) {}
};
#endif

NODE_EXTERN std::unique_ptr<InspectorParentHandle> GetInspectorParentHandle(
Environment* env,
ThreadId thread_id,
const char* url) {
CHECK_NOT_NULL(env);
CHECK_NE(thread_id.id, static_cast<uint64_t>(-1));
#if HAVE_INSPECTOR
return std::make_unique<InspectorParentHandleImpl>(
env->inspector_agent()->GetParentHandle(thread_id.id, url));
#else
return {};
#endif
}

void LoadEnvironment(Environment* env) {
USE(LoadEnvironment(env, {}));
}

MaybeLocal<Value> LoadEnvironment(
Environment* env,
std::unique_ptr<InspectorParentHandle> inspector_parent_handle) {
env->InitializeLibuv(per_process::v8_is_profiling);
env->InitializeDiagnostics();

#if HAVE_INSPECTOR
if (inspector_parent_handle) {
env->InitializeInspector(
std::move(static_cast<InspectorParentHandleImpl*>(
inspector_parent_handle.get())->impl));
} else {
env->InitializeInspector({});
}
#endif

// TODO(joyeecheung): Allow embedders to customize the entry
// point more directly without using _third_party_main.js
return StartExecution(env);
}

Environment* GetCurrentEnvironment(Local<Context> context) {
return Environment::GetCurrent(context);
}
Expand Down Expand Up @@ -579,4 +649,12 @@ void AddLinkedBinding(Environment* env,
AddLinkedBinding(env, mod);
}

static std::atomic<uint64_t> next_thread_id{0};

ThreadId AllocateEnvironmentThreadId() {
ThreadId ret;
ret.id = next_thread_id++;
return ret;
}

} // namespace node
11 changes: 7 additions & 4 deletions src/env-inl.h
Original file line number Diff line number Diff line change
Expand Up @@ -813,8 +813,9 @@ void Environment::SetImmediateThreadsafe(Fn&& cb, CallbackFlags::Flags flags) {
{
Mutex::ScopedLock lock(native_immediates_threadsafe_mutex_);
native_immediates_threadsafe_.Push(std::move(callback));
if (task_queues_async_initialized_)
uv_async_send(&task_queues_async_);
}
uv_async_send(&task_queues_async_);
}

template <typename Fn>
Expand All @@ -824,8 +825,9 @@ void Environment::RequestInterrupt(Fn&& cb) {
{
Mutex::ScopedLock lock(native_immediates_threadsafe_mutex_);
native_immediates_interrupts_.Push(std::move(callback));
if (task_queues_async_initialized_)
uv_async_send(&task_queues_async_);
}
uv_async_send(&task_queues_async_);
RequestInterruptFromV8();
}

Expand Down Expand Up @@ -858,11 +860,11 @@ inline bool Environment::is_main_thread() const {
}

inline bool Environment::owns_process_state() const {
return flags_ & kOwnsProcessState;
return flags_ & EnvironmentFlags::kOwnsProcessState;
}

inline bool Environment::owns_inspector() const {
return flags_ & kOwnsInspector;
return flags_ & EnvironmentFlags::kOwnsInspector;
}

inline uint64_t Environment::thread_id() const {
Expand Down Expand Up @@ -1176,6 +1178,7 @@ void Environment::RemoveCleanupHook(void (*fn)(void*), void* arg) {
inline void Environment::RegisterFinalizationGroupForCleanup(
v8::Local<v8::FinalizationGroup> group) {
cleanup_finalization_groups_.emplace_back(isolate(), group);
DCHECK(task_queues_async_initialized_);
uv_async_send(&task_queues_async_);
}

Expand Down
47 changes: 34 additions & 13 deletions src/env.cc
Original file line number Diff line number Diff line change
Expand Up @@ -257,12 +257,6 @@ void TrackingTraceStateObserver::UpdateTraceCategoryState() {
USE(cb->Call(env_->context(), Undefined(isolate), arraysize(args), args));
}

static std::atomic<uint64_t> next_thread_id{0};

uint64_t Environment::AllocateThreadId() {
return next_thread_id++;
}

void Environment::CreateProperties() {
HandleScope handle_scope(isolate_);
Local<Context> ctx = context();
Expand Down Expand Up @@ -319,8 +313,8 @@ Environment::Environment(IsolateData* isolate_data,
Local<Context> context,
const std::vector<std::string>& args,
const std::vector<std::string>& exec_args,
Flags flags,
uint64_t thread_id)
EnvironmentFlags::Flags flags,
ThreadId thread_id)
: isolate_(context->GetIsolate()),
isolate_data_(isolate_data),
immediate_info_(context->GetIsolate()),
Expand All @@ -332,14 +326,23 @@ Environment::Environment(IsolateData* isolate_data,
should_abort_on_uncaught_toggle_(isolate_, 1),
stream_base_state_(isolate_, StreamBase::kNumStreamBaseStateFields),
flags_(flags),
thread_id_(thread_id == kNoThreadId ? AllocateThreadId() : thread_id),
thread_id_(thread_id.id == static_cast<uint64_t>(-1) ?
AllocateEnvironmentThreadId().id : thread_id.id),
fs_stats_field_array_(isolate_, kFsStatsBufferLength),
fs_stats_field_bigint_array_(isolate_, kFsStatsBufferLength),
context_(context->GetIsolate(), context) {
// We'll be creating new objects so make sure we've entered the context.
HandleScope handle_scope(isolate());
Context::Scope context_scope(context);

// Set some flags if only kDefaultFlags was passed. This can make API version
// transitions easier for embedders.
if (flags_ & EnvironmentFlags::kDefaultFlags) {
flags_ = flags_ |
EnvironmentFlags::kOwnsProcessState |
EnvironmentFlags::kOwnsInspector;
}

set_env_vars(per_process::system_environment);
enabled_debug_list_.Parse(this);

Expand All @@ -358,6 +361,10 @@ Environment::Environment(IsolateData* isolate_data,

AssignToContext(context, ContextInfo(""));

static uv_once_t init_once = UV_ONCE_INIT;
uv_once(&init_once, InitThreadLocalOnce);
uv_key_set(&thread_local_env, this);

if (tracing::AgentWriterHandle* writer = GetTracingAgentWriter()) {
trace_state_observer_ = std::make_unique<TrackingTraceStateObserver>(this);
if (TracingController* tracing_controller = writer->GetTracingController())
Expand Down Expand Up @@ -407,6 +414,9 @@ Environment::Environment(IsolateData* isolate_data,
Environment::~Environment() {
if (interrupt_data_ != nullptr) *interrupt_data_ = nullptr;

// FreeEnvironment() should have set this.
CHECK(is_stopping());

isolate()->GetHeapProfiler()->RemoveBuildEmbedderGraphCallback(
BuildEmbedderGraph, this);

Expand Down Expand Up @@ -493,6 +503,15 @@ void Environment::InitializeLibuv(bool start_profiler_idle_notifier) {
uv_unref(reinterpret_cast<uv_handle_t*>(&idle_check_handle_));
uv_unref(reinterpret_cast<uv_handle_t*>(&task_queues_async_));

{
Mutex::ScopedLock lock(native_immediates_threadsafe_mutex_);
task_queues_async_initialized_ = true;
if (native_immediates_threadsafe_.size() > 0 ||
native_immediates_interrupts_.size() > 0) {
uv_async_send(&task_queues_async_);
}
}

// Register clean-up cb to be called to clean up the handles
// when the environment is freed, note that they are not cleaned in
// the one environment per process setup, but will be called in
Expand All @@ -502,10 +521,6 @@ void Environment::InitializeLibuv(bool start_profiler_idle_notifier) {
if (start_profiler_idle_notifier) {
StartProfilerIdleNotifier();
}

static uv_once_t init_once = UV_ONCE_INIT;
uv_once(&init_once, InitThreadLocalOnce);
uv_key_set(&thread_local_env, this);
}

void Environment::ExitEnv() {
Expand Down Expand Up @@ -539,6 +554,11 @@ void Environment::RegisterHandleCleanups() {
}

void Environment::CleanupHandles() {
{
Mutex::ScopedLock lock(native_immediates_threadsafe_mutex_);
task_queues_async_initialized_ = false;
}

Isolate::DisallowJavascriptExecutionScope disallow_js(isolate(),
Isolate::DisallowJavascriptExecutionScope::THROW_ON_FAILURE);

Expand Down Expand Up @@ -1103,6 +1123,7 @@ void Environment::CleanupFinalizationGroups() {
if (try_catch.HasCaught() && !try_catch.HasTerminated())
errors::TriggerUncaughtException(isolate(), try_catch);
// Re-schedule the execution of the remainder of the queue.
CHECK(task_queues_async_initialized_);
uv_async_send(&task_queues_async_);
return;
}
Expand Down
20 changes: 8 additions & 12 deletions src/env.h
Original file line number Diff line number Diff line change
Expand Up @@ -874,12 +874,6 @@ class Environment : public MemoryRetainer {
inline void PushAsyncCallbackScope();
inline void PopAsyncCallbackScope();

enum Flags {
kNoFlags = 0,
kOwnsProcessState = 1 << 1,
kOwnsInspector = 1 << 2,
};

static inline Environment* GetCurrent(v8::Isolate* isolate);
static inline Environment* GetCurrent(v8::Local<v8::Context> context);
static inline Environment* GetCurrent(
Expand All @@ -898,8 +892,8 @@ class Environment : public MemoryRetainer {
v8::Local<v8::Context> context,
const std::vector<std::string>& args,
const std::vector<std::string>& exec_args,
Flags flags = Flags(),
uint64_t thread_id = kNoThreadId);
EnvironmentFlags::Flags flags,
ThreadId thread_id);
~Environment() override;

void InitializeLibuv(bool start_profiler_idle_notifier);
Expand Down Expand Up @@ -1068,9 +1062,6 @@ class Environment : public MemoryRetainer {
inline bool has_serialized_options() const;
inline void set_has_serialized_options(bool has_serialized_options);

static uint64_t AllocateThreadId();
static constexpr uint64_t kNoThreadId = -1;

inline bool is_main_thread() const;
inline bool owns_process_state() const;
inline bool owns_inspector() const;
Expand Down Expand Up @@ -1350,7 +1341,7 @@ class Environment : public MemoryRetainer {
bool has_serialized_options_ = false;

std::atomic_bool can_call_into_js_ { true };
Flags flags_;
uint64_t flags_;
uint64_t thread_id_;
std::unordered_set<worker::Worker*> sub_worker_contexts_;

Expand Down Expand Up @@ -1409,6 +1400,11 @@ class Environment : public MemoryRetainer {
Mutex native_immediates_threadsafe_mutex_;
NativeImmediateQueue native_immediates_threadsafe_;
NativeImmediateQueue native_immediates_interrupts_;
// Also guarded by native_immediates_threadsafe_mutex_. This can be used when
// trying to post tasks from other threads to an Environment, as the libuv
// handle for the immediate queues (task_queues_async_) may not be initialized
// yet or already have been destroyed.
bool task_queues_async_initialized_ = false;

void RunAndClearNativeImmediates(bool only_refed = false);
void RunAndClearInterrupts();
Expand Down
Loading

0 comments on commit 808dedc

Please sign in to comment.