Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

worker: refactor thread life cycle management #26099

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 0 additions & 16 deletions src/node_messaging.cc
Original file line number Diff line number Diff line change
Expand Up @@ -584,13 +584,6 @@ void MessagePort::OnMessage() {
// Get the head of the message queue.
Mutex::ScopedLock lock(data_->mutex_);

if (stop_event_loop_) {
Debug(this, "MessagePort stops loop as requested");
CHECK(!data_->receiving_messages_);
uv_stop(env()->event_loop());
break;
}

Debug(this, "MessagePort has message, receiving = %d",
static_cast<int>(data_->receiving_messages_));

Expand Down Expand Up @@ -740,15 +733,6 @@ void MessagePort::Stop() {
data_->receiving_messages_ = false;
}

void MessagePort::StopEventLoop() {
Mutex::ScopedLock lock(data_->mutex_);
data_->receiving_messages_ = false;
stop_event_loop_ = true;

Debug(this, "Received StopEventLoop request");
TriggerAsync();
}

void MessagePort::Start(const FunctionCallbackInfo<Value>& args) {
Environment* env = Environment::GetCurrent(args);
MessagePort* port;
Expand Down
4 changes: 0 additions & 4 deletions src/node_messaging.h
Original file line number Diff line number Diff line change
Expand Up @@ -159,9 +159,6 @@ class MessagePort : public HandleWrap {
void Start();
// Stop processing messages on this port as a receiving end.
void Stop();
// Stop processing messages on this port as a receiving end,
// and stop the event loop that this port is associated with.
void StopEventLoop();

static void New(const v8::FunctionCallbackInfo<v8::Value>& args);
static void PostMessage(const v8::FunctionCallbackInfo<v8::Value>& args);
Expand Down Expand Up @@ -206,7 +203,6 @@ class MessagePort : public HandleWrap {
inline uv_async_t* async();

std::unique_ptr<MessagePortData> data_ = nullptr;
bool stop_event_loop_ = false;

friend class MessagePortData;
};
Expand Down
137 changes: 75 additions & 62 deletions src/node_worker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,46 @@ void WaitForWorkerInspectorToStop(Environment* child) {

} // anonymous namespace

void AsyncRequest::Install(Environment* env, void* data, uv_async_cb target) {
Mutex::ScopedLock lock(mutex_);
env_ = env;
async_ = new uv_async_t;
if (data != nullptr) async_->data = data;
CHECK_EQ(uv_async_init(env_->event_loop(), async_, target), 0);
}

void AsyncRequest::Uninstall() {
Mutex::ScopedLock lock(mutex_);
if (async_ != nullptr)
env_->CloseHandle(async_, [](uv_async_t* async) { delete async; });
}

void AsyncRequest::Stop() {
Mutex::ScopedLock lock(mutex_);
stop_ = true;
if (async_ != nullptr) uv_async_send(async_);
}

void AsyncRequest::SetStopped(bool flag) {
Mutex::ScopedLock lock(mutex_);
stop_ = flag;
}

bool AsyncRequest::IsStopped() const {
Mutex::ScopedLock lock(mutex_);
return stop_;
}

uv_async_t* AsyncRequest::GetHandle() {
Mutex::ScopedLock lock(mutex_);
return async_;
}

void AsyncRequest::MemoryInfo(MemoryTracker* tracker) const {
Mutex::ScopedLock lock(mutex_);
if (async_ != nullptr) tracker->TrackField("async_request", *async_);
}

Worker::Worker(Environment* env,
Local<Object> wrap,
const std::string& url,
Expand Down Expand Up @@ -98,8 +138,7 @@ Worker::Worker(Environment* env,
}

bool Worker::is_stopped() const {
Mutex::ScopedLock stopped_lock(stopped_mutex_);
return stopped_;
return thread_stopper_.IsStopped();
}

// This class contains data that is only relevant to the child thread itself,
Expand Down Expand Up @@ -207,6 +246,8 @@ void Worker::Run() {
Context::Scope context_scope(env_->context());
if (child_port != nullptr)
child_port->Close();
thread_stopper_.Uninstall();
thread_stopper_.SetStopped(true);
env_->stop_sub_worker_contexts();
env_->RunCleanup();
RunAtExit(env_.get());
Expand All @@ -215,23 +256,19 @@ void Worker::Run() {
WaitForWorkerInspectorToStop(env_.get());
#endif

{
Mutex::ScopedLock stopped_lock(stopped_mutex_);
stopped_ = true;
}

// This call needs to be made while the `Environment` is still alive
// because we assume that it is available for async tracking in the
// NodePlatform implementation.
platform_->DrainTasks(isolate_);
}
});

if (thread_stopper_.IsStopped()) return;
{
HandleScope handle_scope(isolate_);
Local<Context> context = NewContext(isolate_);
if (is_stopped()) return;

if (thread_stopper_.IsStopped()) return;
CHECK(!context.IsEmpty());
Context::Scope context_scope(context);
{
Expand All @@ -253,6 +290,14 @@ void Worker::Run() {
Debug(this, "Created Environment for worker with id %llu", thread_id_);

if (is_stopped()) return;
thread_stopper_.Install(env_.get(), env_.get(), [](uv_async_t* handle) {
Environment* env_ = static_cast<Environment*>(handle->data);
uv_stop(env_->event_loop());
});
uv_unref(reinterpret_cast<uv_handle_t*>(thread_stopper_.GetHandle()));

Debug(this, "Created Environment for worker with id %llu", thread_id_);
if (thread_stopper_.IsStopped()) return;
{
HandleScope handle_scope(isolate_);
Mutex::ScopedLock lock(mutex_);
Expand All @@ -268,7 +313,7 @@ void Worker::Run() {
Debug(this, "Created message port for worker %llu", thread_id_);
}

if (is_stopped()) return;
if (thread_stopper_.IsStopped()) return;
{
#if NODE_USE_V8_PLATFORM && HAVE_INSPECTOR
StartWorkerInspector(env_.get(),
Expand All @@ -289,22 +334,21 @@ void Worker::Run() {
Debug(this, "Loaded environment for worker %llu", thread_id_);
}

if (is_stopped()) return;
if (thread_stopper_.IsStopped()) return;
{
SealHandleScope seal(isolate_);
bool more;
env_->performance_state()->Mark(
node::performance::NODE_PERFORMANCE_MILESTONE_LOOP_START);
do {
if (is_stopped()) break;
if (thread_stopper_.IsStopped()) break;
uv_run(&data.loop_, UV_RUN_DEFAULT);
if (is_stopped()) break;
if (thread_stopper_.IsStopped()) break;

platform_->DrainTasks(isolate_);

more = uv_loop_alive(&data.loop_);
if (more && !is_stopped())
continue;
if (more && !thread_stopper_.IsStopped()) continue;

EmitBeforeExit(env_.get());

Expand All @@ -319,7 +363,7 @@ void Worker::Run() {

{
int exit_code;
bool stopped = is_stopped();
bool stopped = thread_stopper_.IsStopped();
if (!stopped)
exit_code = EmitExit(env_.get());
Mutex::ScopedLock lock(mutex_);
Expand All @@ -341,34 +385,11 @@ void Worker::JoinThread() {
thread_joined_ = true;

env()->remove_sub_worker_context(this);

if (thread_exit_async_) {
env()->CloseHandle(thread_exit_async_.release(), [](uv_async_t* async) {
delete async;
});

if (scheduled_on_thread_stopped_)
OnThreadStopped();
}
OnThreadStopped();
on_thread_finished_.Uninstall();
}

void Worker::OnThreadStopped() {
{
Mutex::ScopedLock lock(mutex_);
scheduled_on_thread_stopped_ = false;

Debug(this, "Worker %llu thread stopped", thread_id_);

{
Mutex::ScopedLock stopped_lock(stopped_mutex_);
CHECK(stopped_);
}

parent_port_ = nullptr;
}

JoinThread();

{
HandleScope handle_scope(env()->isolate());
Context::Scope context_scope(env()->context());
Expand All @@ -391,7 +412,7 @@ Worker::~Worker() {
Mutex::ScopedLock lock(mutex_);
JoinThread();

CHECK(stopped_);
CHECK(thread_stopper_.IsStopped());
CHECK(thread_joined_);

// This has most likely already happened within the worker thread -- this
Expand Down Expand Up @@ -480,16 +501,15 @@ void Worker::StartThread(const FunctionCallbackInfo<Value>& args) {
Mutex::ScopedLock lock(w->mutex_);

w->env()->add_sub_worker_context(w);
w->stopped_ = false;
w->thread_joined_ = false;
w->thread_stopper_.SetStopped(false);

w->thread_exit_async_.reset(new uv_async_t);
w->thread_exit_async_->data = w;
CHECK_EQ(uv_async_init(w->env()->event_loop(),
w->thread_exit_async_.get(),
[](uv_async_t* handle) {
static_cast<Worker*>(handle->data)->OnThreadStopped();
}), 0);
w->on_thread_finished_.Install(w->env(), w, [](uv_async_t* handle) {
Worker* w_ = static_cast<Worker*>(handle->data);
CHECK(w_->thread_stopper_.IsStopped());
w_->parent_port_ = nullptr;
w_->JoinThread();
});

uv_thread_options_t thread_options;
thread_options.flags = UV_THREAD_HAS_STACK_SIZE;
Expand All @@ -505,9 +525,7 @@ void Worker::StartThread(const FunctionCallbackInfo<Value>& args) {
w->Run();

Mutex::ScopedLock lock(w->mutex_);
CHECK(w->thread_exit_async_);
w->scheduled_on_thread_stopped_ = true;
uv_async_send(w->thread_exit_async_.get());
w->on_thread_finished_.Stop();
}, static_cast<void*>(w)), 0);
}

Expand All @@ -523,28 +541,23 @@ void Worker::StopThread(const FunctionCallbackInfo<Value>& args) {
void Worker::Ref(const FunctionCallbackInfo<Value>& args) {
Worker* w;
ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
if (w->thread_exit_async_)
uv_ref(reinterpret_cast<uv_handle_t*>(w->thread_exit_async_.get()));
uv_ref(reinterpret_cast<uv_handle_t*>(w->on_thread_finished_.GetHandle()));
}

void Worker::Unref(const FunctionCallbackInfo<Value>& args) {
Worker* w;
ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
if (w->thread_exit_async_)
uv_unref(reinterpret_cast<uv_handle_t*>(w->thread_exit_async_.get()));
uv_unref(reinterpret_cast<uv_handle_t*>(w->on_thread_finished_.GetHandle()));
}

void Worker::Exit(int code) {
Mutex::ScopedLock lock(mutex_);
Mutex::ScopedLock stopped_lock(stopped_mutex_);

Debug(this, "Worker %llu called Exit(%d)", thread_id_, code);

if (!stopped_) {
stopped_ = true;
if (!thread_stopper_.IsStopped()) {
exit_code_ = code;
if (child_port_ != nullptr)
child_port_->StopEventLoop();
Debug(this, "Received StopEventLoop request");
thread_stopper_.Stop();
if (isolate_ != nullptr)
isolate_->TerminateExecution();
}
Expand Down
42 changes: 27 additions & 15 deletions src/node_worker.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,35 @@

#if defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS

#include "node_messaging.h"
#include <unordered_map>
#include "node_messaging.h"
#include "uv.h"

namespace node {
namespace worker {

class WorkerThreadData;

class AsyncRequest : public MemoryRetainer {
public:
AsyncRequest() {}
void Install(Environment* env, void* data, uv_async_cb target);
void Uninstall();
void Stop();
void SetStopped(bool flag);
bool IsStopped() const;
uv_async_t* GetHandle();
void MemoryInfo(MemoryTracker* tracker) const override;
SET_MEMORY_INFO_NAME(AsyncRequest)
SET_SELF_SIZE(AsyncRequest)

private:
Environment* env_;
uv_async_t* async_ = nullptr;
mutable Mutex mutex_;
bool stop_ = true;
};
addaleax marked this conversation as resolved.
Show resolved Hide resolved

// A worker thread, as represented in its parent thread.
class Worker : public AsyncWrap {
public:
Expand All @@ -31,11 +52,9 @@ class Worker : public AsyncWrap {
void JoinThread();

void MemoryInfo(MemoryTracker* tracker) const override {
tracker->TrackFieldWithSize(
"isolate_data", sizeof(IsolateData), "IsolateData");
tracker->TrackFieldWithSize("env", sizeof(Environment), "Environment");
tracker->TrackField("thread_exit_async", *thread_exit_async_);
addaleax marked this conversation as resolved.
Show resolved Hide resolved
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We’re not tracking the uv_async_t anymore, right? Maybe we should add something like tracker->TrackInlineField() that allows us to keep track of MemoryRetainers that are direct members of the class…

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you mean - the one represented by thread_exit_async_ ? that is replaced withAsyncRequest objects that creates uv_async_t objects, and tracks through the interface method. the async_ field in AsyncRequest is still a pointer, direct member of neither AsyncRequest nor Worker.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Neither *thread_stopper_.async_ nor *on_thread_finished_.async_ are tracked, yes, because we don’t inform the tracker about the existence of the AsyncRequest fields.

Also, side note: I’m just noticing that we have the IsolateData and Environment fields listed here as well, which I’m not sure makes sense given that they are no longer directly allocated by this object…

Copy link
Member Author

@gireeshpunathil gireeshpunathil Feb 17, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sorry, I don't understand. *thread_stopper_.async and *on_thread_finished_.async_ are not tracked through the tracker instance or of worker, but those are tracked through the tracker instance of AsyncRequest object (line 98):

void AsyncRequest::MemoryInfo(MemoryTracker* tracker) const {
  Mutex::ScopedLock lock(mutex_);
  if (async_ != nullptr) tracker->TrackField("async_request", *async_);
}

Isn't it enough? I hope we don't need multiple trackers for the same allocation?

For the IsolateData and Environment: I just removed those from being actively tracked by the worker and pushed in under this PR itself.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@gireeshpunathil The problem is that the memory tracker doesn’t know that it should call AsyncRequest::MemoryInfo. Currently, the way to inform it would be adding tracker->TrackField("thread_stopper_", &thread_stopper_);, but then we would end up tracking the memory for the AsyncRequest itself twice.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@gireeshpunathil Should we change this PR to use TrackInlineField now?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@addaleax - yes. Though I knew this depend on #26161 for a moment I forgot about that!

tracker->TrackField("parent_port", parent_port_);
tracker->TrackInlineField(&thread_stopper_, "thread_stopper_");
tracker->TrackInlineField(&on_thread_finished_, "on_thread_finished_");
}

SET_MEMORY_INFO_NAME(Worker)
Expand Down Expand Up @@ -67,16 +86,6 @@ class Worker : public AsyncWrap {
// This mutex protects access to all variables listed below it.
mutable Mutex mutex_;

// Currently only used for telling the parent thread that the child
// thread exited.
std::unique_ptr<uv_async_t> thread_exit_async_;
bool scheduled_on_thread_stopped_ = false;

// This mutex only protects stopped_. If both locks are acquired, this needs
// to be the latter one.
mutable Mutex stopped_mutex_;
bool stopped_ = true;

bool thread_joined_ = true;
int exit_code_ = 0;
uint64_t thread_id_ = -1;
Expand All @@ -96,6 +105,9 @@ class Worker : public AsyncWrap {
// instance refers to it via its [kPort] property.
MessagePort* parent_port_ = nullptr;

AsyncRequest thread_stopper_;
AsyncRequest on_thread_finished_;

friend class WorkerThreadData;
};

Expand Down
Loading