diff --git a/src/node_messaging.cc b/src/node_messaging.cc index 34977557c5bfb8..7ca3ad14d03406 100644 --- a/src/node_messaging.cc +++ b/src/node_messaging.cc @@ -584,13 +584,6 @@ void MessagePort::OnMessage() { // Get the head of the message queue. Mutex::ScopedLock lock(data_->mutex_); - if (stop_event_loop_) { - Debug(this, "MessagePort stops loop as requested"); - CHECK(!data_->receiving_messages_); - uv_stop(env()->event_loop()); - break; - } - Debug(this, "MessagePort has message, receiving = %d", static_cast(data_->receiving_messages_)); @@ -740,15 +733,6 @@ void MessagePort::Stop() { data_->receiving_messages_ = false; } -void MessagePort::StopEventLoop() { - Mutex::ScopedLock lock(data_->mutex_); - data_->receiving_messages_ = false; - stop_event_loop_ = true; - - Debug(this, "Received StopEventLoop request"); - TriggerAsync(); -} - void MessagePort::Start(const FunctionCallbackInfo& args) { Environment* env = Environment::GetCurrent(args); MessagePort* port; diff --git a/src/node_messaging.h b/src/node_messaging.h index cfda69ae7ff16b..9055a8bf961844 100644 --- a/src/node_messaging.h +++ b/src/node_messaging.h @@ -159,9 +159,6 @@ class MessagePort : public HandleWrap { void Start(); // Stop processing messages on this port as a receiving end. void Stop(); - // Stop processing messages on this port as a receiving end, - // and stop the event loop that this port is associated with. - void StopEventLoop(); static void New(const v8::FunctionCallbackInfo& args); static void PostMessage(const v8::FunctionCallbackInfo& args); @@ -206,7 +203,6 @@ class MessagePort : public HandleWrap { inline uv_async_t* async(); std::unique_ptr data_ = nullptr; - bool stop_event_loop_ = false; friend class MessagePortData; }; diff --git a/src/node_worker.cc b/src/node_worker.cc index 2c8222f7f5229c..0f1535074cca6c 100644 --- a/src/node_worker.cc +++ b/src/node_worker.cc @@ -58,6 +58,46 @@ void WaitForWorkerInspectorToStop(Environment* child) { } // anonymous namespace +void AsyncRequest::Install(Environment* env, void* data, uv_async_cb target) { + Mutex::ScopedLock lock(mutex_); + env_ = env; + async_ = new uv_async_t; + if (data != nullptr) async_->data = data; + CHECK_EQ(uv_async_init(env_->event_loop(), async_, target), 0); +} + +void AsyncRequest::Uninstall() { + Mutex::ScopedLock lock(mutex_); + if (async_ != nullptr) + env_->CloseHandle(async_, [](uv_async_t* async) { delete async; }); +} + +void AsyncRequest::Stop() { + Mutex::ScopedLock lock(mutex_); + stop_ = true; + if (async_ != nullptr) uv_async_send(async_); +} + +void AsyncRequest::SetStopped(bool flag) { + Mutex::ScopedLock lock(mutex_); + stop_ = flag; +} + +bool AsyncRequest::IsStopped() const { + Mutex::ScopedLock lock(mutex_); + return stop_; +} + +uv_async_t* AsyncRequest::GetHandle() { + Mutex::ScopedLock lock(mutex_); + return async_; +} + +void AsyncRequest::MemoryInfo(MemoryTracker* tracker) const { + Mutex::ScopedLock lock(mutex_); + if (async_ != nullptr) tracker->TrackField("async_request", *async_); +} + Worker::Worker(Environment* env, Local wrap, const std::string& url, @@ -98,8 +138,7 @@ Worker::Worker(Environment* env, } bool Worker::is_stopped() const { - Mutex::ScopedLock stopped_lock(stopped_mutex_); - return stopped_; + return thread_stopper_.IsStopped(); } // This class contains data that is only relevant to the child thread itself, @@ -207,6 +246,8 @@ void Worker::Run() { Context::Scope context_scope(env_->context()); if (child_port != nullptr) child_port->Close(); + thread_stopper_.Uninstall(); + thread_stopper_.SetStopped(true); env_->stop_sub_worker_contexts(); env_->RunCleanup(); RunAtExit(env_.get()); @@ -215,11 +256,6 @@ void Worker::Run() { WaitForWorkerInspectorToStop(env_.get()); #endif - { - Mutex::ScopedLock stopped_lock(stopped_mutex_); - stopped_ = true; - } - // This call needs to be made while the `Environment` is still alive // because we assume that it is available for async tracking in the // NodePlatform implementation. @@ -227,11 +263,12 @@ void Worker::Run() { } }); + if (thread_stopper_.IsStopped()) return; { HandleScope handle_scope(isolate_); Local context = NewContext(isolate_); - if (is_stopped()) return; + if (thread_stopper_.IsStopped()) return; CHECK(!context.IsEmpty()); Context::Scope context_scope(context); { @@ -253,6 +290,14 @@ void Worker::Run() { Debug(this, "Created Environment for worker with id %llu", thread_id_); if (is_stopped()) return; + thread_stopper_.Install(env_.get(), env_.get(), [](uv_async_t* handle) { + Environment* env_ = static_cast(handle->data); + uv_stop(env_->event_loop()); + }); + uv_unref(reinterpret_cast(thread_stopper_.GetHandle())); + + Debug(this, "Created Environment for worker with id %llu", thread_id_); + if (thread_stopper_.IsStopped()) return; { HandleScope handle_scope(isolate_); Mutex::ScopedLock lock(mutex_); @@ -268,7 +313,7 @@ void Worker::Run() { Debug(this, "Created message port for worker %llu", thread_id_); } - if (is_stopped()) return; + if (thread_stopper_.IsStopped()) return; { #if NODE_USE_V8_PLATFORM && HAVE_INSPECTOR StartWorkerInspector(env_.get(), @@ -289,22 +334,21 @@ void Worker::Run() { Debug(this, "Loaded environment for worker %llu", thread_id_); } - if (is_stopped()) return; + if (thread_stopper_.IsStopped()) return; { SealHandleScope seal(isolate_); bool more; env_->performance_state()->Mark( node::performance::NODE_PERFORMANCE_MILESTONE_LOOP_START); do { - if (is_stopped()) break; + if (thread_stopper_.IsStopped()) break; uv_run(&data.loop_, UV_RUN_DEFAULT); - if (is_stopped()) break; + if (thread_stopper_.IsStopped()) break; platform_->DrainTasks(isolate_); more = uv_loop_alive(&data.loop_); - if (more && !is_stopped()) - continue; + if (more && !thread_stopper_.IsStopped()) continue; EmitBeforeExit(env_.get()); @@ -319,7 +363,7 @@ void Worker::Run() { { int exit_code; - bool stopped = is_stopped(); + bool stopped = thread_stopper_.IsStopped(); if (!stopped) exit_code = EmitExit(env_.get()); Mutex::ScopedLock lock(mutex_); @@ -341,34 +385,11 @@ void Worker::JoinThread() { thread_joined_ = true; env()->remove_sub_worker_context(this); - - if (thread_exit_async_) { - env()->CloseHandle(thread_exit_async_.release(), [](uv_async_t* async) { - delete async; - }); - - if (scheduled_on_thread_stopped_) - OnThreadStopped(); - } + OnThreadStopped(); + on_thread_finished_.Uninstall(); } void Worker::OnThreadStopped() { - { - Mutex::ScopedLock lock(mutex_); - scheduled_on_thread_stopped_ = false; - - Debug(this, "Worker %llu thread stopped", thread_id_); - - { - Mutex::ScopedLock stopped_lock(stopped_mutex_); - CHECK(stopped_); - } - - parent_port_ = nullptr; - } - - JoinThread(); - { HandleScope handle_scope(env()->isolate()); Context::Scope context_scope(env()->context()); @@ -391,7 +412,7 @@ Worker::~Worker() { Mutex::ScopedLock lock(mutex_); JoinThread(); - CHECK(stopped_); + CHECK(thread_stopper_.IsStopped()); CHECK(thread_joined_); // This has most likely already happened within the worker thread -- this @@ -480,16 +501,15 @@ void Worker::StartThread(const FunctionCallbackInfo& args) { Mutex::ScopedLock lock(w->mutex_); w->env()->add_sub_worker_context(w); - w->stopped_ = false; w->thread_joined_ = false; + w->thread_stopper_.SetStopped(false); - w->thread_exit_async_.reset(new uv_async_t); - w->thread_exit_async_->data = w; - CHECK_EQ(uv_async_init(w->env()->event_loop(), - w->thread_exit_async_.get(), - [](uv_async_t* handle) { - static_cast(handle->data)->OnThreadStopped(); - }), 0); + w->on_thread_finished_.Install(w->env(), w, [](uv_async_t* handle) { + Worker* w_ = static_cast(handle->data); + CHECK(w_->thread_stopper_.IsStopped()); + w_->parent_port_ = nullptr; + w_->JoinThread(); + }); uv_thread_options_t thread_options; thread_options.flags = UV_THREAD_HAS_STACK_SIZE; @@ -505,9 +525,7 @@ void Worker::StartThread(const FunctionCallbackInfo& args) { w->Run(); Mutex::ScopedLock lock(w->mutex_); - CHECK(w->thread_exit_async_); - w->scheduled_on_thread_stopped_ = true; - uv_async_send(w->thread_exit_async_.get()); + w->on_thread_finished_.Stop(); }, static_cast(w)), 0); } @@ -523,28 +541,23 @@ void Worker::StopThread(const FunctionCallbackInfo& args) { void Worker::Ref(const FunctionCallbackInfo& args) { Worker* w; ASSIGN_OR_RETURN_UNWRAP(&w, args.This()); - if (w->thread_exit_async_) - uv_ref(reinterpret_cast(w->thread_exit_async_.get())); + uv_ref(reinterpret_cast(w->on_thread_finished_.GetHandle())); } void Worker::Unref(const FunctionCallbackInfo& args) { Worker* w; ASSIGN_OR_RETURN_UNWRAP(&w, args.This()); - if (w->thread_exit_async_) - uv_unref(reinterpret_cast(w->thread_exit_async_.get())); + uv_unref(reinterpret_cast(w->on_thread_finished_.GetHandle())); } void Worker::Exit(int code) { Mutex::ScopedLock lock(mutex_); - Mutex::ScopedLock stopped_lock(stopped_mutex_); Debug(this, "Worker %llu called Exit(%d)", thread_id_, code); - - if (!stopped_) { - stopped_ = true; + if (!thread_stopper_.IsStopped()) { exit_code_ = code; - if (child_port_ != nullptr) - child_port_->StopEventLoop(); + Debug(this, "Received StopEventLoop request"); + thread_stopper_.Stop(); if (isolate_ != nullptr) isolate_->TerminateExecution(); } diff --git a/src/node_worker.h b/src/node_worker.h index dad0713fd92df2..fb94bdc307e8b6 100644 --- a/src/node_worker.h +++ b/src/node_worker.h @@ -3,14 +3,35 @@ #if defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS -#include "node_messaging.h" #include +#include "node_messaging.h" +#include "uv.h" namespace node { namespace worker { class WorkerThreadData; +class AsyncRequest : public MemoryRetainer { + public: + AsyncRequest() {} + void Install(Environment* env, void* data, uv_async_cb target); + void Uninstall(); + void Stop(); + void SetStopped(bool flag); + bool IsStopped() const; + uv_async_t* GetHandle(); + void MemoryInfo(MemoryTracker* tracker) const override; + SET_MEMORY_INFO_NAME(AsyncRequest) + SET_SELF_SIZE(AsyncRequest) + + private: + Environment* env_; + uv_async_t* async_ = nullptr; + mutable Mutex mutex_; + bool stop_ = true; +}; + // A worker thread, as represented in its parent thread. class Worker : public AsyncWrap { public: @@ -31,11 +52,9 @@ class Worker : public AsyncWrap { void JoinThread(); void MemoryInfo(MemoryTracker* tracker) const override { - tracker->TrackFieldWithSize( - "isolate_data", sizeof(IsolateData), "IsolateData"); - tracker->TrackFieldWithSize("env", sizeof(Environment), "Environment"); - tracker->TrackField("thread_exit_async", *thread_exit_async_); tracker->TrackField("parent_port", parent_port_); + tracker->TrackInlineField(&thread_stopper_, "thread_stopper_"); + tracker->TrackInlineField(&on_thread_finished_, "on_thread_finished_"); } SET_MEMORY_INFO_NAME(Worker) @@ -67,16 +86,6 @@ class Worker : public AsyncWrap { // This mutex protects access to all variables listed below it. mutable Mutex mutex_; - // Currently only used for telling the parent thread that the child - // thread exited. - std::unique_ptr thread_exit_async_; - bool scheduled_on_thread_stopped_ = false; - - // This mutex only protects stopped_. If both locks are acquired, this needs - // to be the latter one. - mutable Mutex stopped_mutex_; - bool stopped_ = true; - bool thread_joined_ = true; int exit_code_ = 0; uint64_t thread_id_ = -1; @@ -96,6 +105,9 @@ class Worker : public AsyncWrap { // instance refers to it via its [kPort] property. MessagePort* parent_port_ = nullptr; + AsyncRequest thread_stopper_; + AsyncRequest on_thread_finished_; + friend class WorkerThreadData; }; diff --git a/test/pummel/test-heapdump-worker.js b/test/pummel/test-heapdump-worker.js index 06679964a23a1c..2a3c93a7ad3e68 100644 --- a/test/pummel/test-heapdump-worker.js +++ b/test/pummel/test-heapdump-worker.js @@ -9,8 +9,8 @@ const worker = new Worker('setInterval(() => {}, 100);', { eval: true }); validateSnapshotNodes('Node / Worker', [ { children: [ - { node_name: 'Node / uv_async_t', edge_name: 'thread_exit_async' }, - { node_name: 'Node / Environment', edge_name: 'env' }, + { node_name: 'Node / AsyncRequest', edge_name: 'thread_stopper_' }, + { node_name: 'Node / AsyncRequest', edge_name: 'on_thread_finished_' }, { node_name: 'Node / MessagePort', edge_name: 'parent_port' }, { node_name: 'Worker', edge_name: 'wrapped' } ]