Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

worker: use rwlock for sibling group #38783

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions src/node_messaging.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1331,7 +1331,7 @@ Maybe<bool> SiblingGroup::Dispatch(
std::shared_ptr<Message> message,
std::string* error) {

Mutex::ScopedLock lock(group_mutex_);
RwLock::ScopedReadLock lock(group_mutex_);

// The source MessagePortData is not part of this group.
if (ports_.find(source) == ports_.end()) {
Expand Down Expand Up @@ -1376,7 +1376,7 @@ void SiblingGroup::Entangle(MessagePortData* port) {
}

void SiblingGroup::Entangle(std::initializer_list<MessagePortData*> ports) {
Mutex::ScopedLock lock(group_mutex_);
RwLock::ScopedWriteLock lock(group_mutex_);
for (MessagePortData* data : ports) {
ports_.insert(data);
CHECK(!data->group_);
Expand All @@ -1386,7 +1386,7 @@ void SiblingGroup::Entangle(std::initializer_list<MessagePortData*> ports) {

void SiblingGroup::Disentangle(MessagePortData* data) {
auto self = shared_from_this(); // Keep alive until end of function.
Mutex::ScopedLock lock(group_mutex_);
RwLock::ScopedWriteLock lock(group_mutex_);
ports_.erase(data);
data->group_.reset();

Expand Down
4 changes: 2 additions & 2 deletions src/node_messaging.h
Original file line number Diff line number Diff line change
Expand Up @@ -147,9 +147,9 @@ class SiblingGroup final : public std::enable_shared_from_this<SiblingGroup> {
size_t size() const { return ports_.size(); }

private:
std::string name_;
const std::string name_;
RwLock group_mutex_; // Protects ports_.
std::set<MessagePortData*> ports_;
Mutex group_mutex_;

static void CheckSiblingGroup(const std::string& name);

Expand Down
76 changes: 76 additions & 0 deletions src/node_mutex.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,11 @@ namespace node {
template <typename Traits> class ConditionVariableBase;
template <typename Traits> class MutexBase;
struct LibuvMutexTraits;
struct LibuvRwlockTraits;

using ConditionVariable = ConditionVariableBase<LibuvMutexTraits>;
using Mutex = MutexBase<LibuvMutexTraits>;
using RwLock = MutexBase<LibuvRwlockTraits>;

template <typename T, typename MutexT = Mutex>
class ExclusiveAccess {
Expand Down Expand Up @@ -70,6 +72,8 @@ class MutexBase {
inline ~MutexBase();
inline void Lock();
inline void Unlock();
inline void RdLock();
inline void RdUnlock();

MutexBase(const MutexBase&) = delete;
MutexBase& operator=(const MutexBase&) = delete;
Expand All @@ -92,6 +96,21 @@ class MutexBase {
const MutexBase& mutex_;
};

class ScopedReadLock {
public:
inline explicit ScopedReadLock(const MutexBase& mutex);
inline ~ScopedReadLock();

ScopedReadLock(const ScopedReadLock&) = delete;
ScopedReadLock& operator=(const ScopedReadLock&) = delete;

private:
template <typename> friend class ConditionVariableBase;
const MutexBase& mutex_;
};

using ScopedWriteLock = ScopedLock;

class ScopedUnlock {
public:
inline explicit ScopedUnlock(const ScopedLock& scoped_lock);
Expand Down Expand Up @@ -167,6 +186,42 @@ struct LibuvMutexTraits {
static inline void mutex_unlock(MutexT* mutex) {
uv_mutex_unlock(mutex);
}

static inline void mutex_rdlock(MutexT* mutex) {
uv_mutex_lock(mutex);
}

static inline void mutex_rdunlock(MutexT* mutex) {
uv_mutex_unlock(mutex);
}
};

struct LibuvRwlockTraits {
using MutexT = uv_rwlock_t;

static inline int mutex_init(MutexT* mutex) {
return uv_rwlock_init(mutex);
}

static inline void mutex_destroy(MutexT* mutex) {
uv_rwlock_destroy(mutex);
}

static inline void mutex_lock(MutexT* mutex) {
uv_rwlock_wrlock(mutex);
}

static inline void mutex_unlock(MutexT* mutex) {
uv_rwlock_wrunlock(mutex);
}

static inline void mutex_rdlock(MutexT* mutex) {
uv_rwlock_rdlock(mutex);
}

static inline void mutex_rdunlock(MutexT* mutex) {
uv_rwlock_rdunlock(mutex);
}
};

template <typename Traits>
Expand Down Expand Up @@ -214,6 +269,16 @@ void MutexBase<Traits>::Unlock() {
Traits::mutex_unlock(&mutex_);
}

template <typename Traits>
void MutexBase<Traits>::RdLock() {
Traits::mutex_rdlock(&mutex_);
}

template <typename Traits>
void MutexBase<Traits>::RdUnlock() {
Traits::mutex_rdunlock(&mutex_);
}

template <typename Traits>
MutexBase<Traits>::ScopedLock::ScopedLock(const MutexBase& mutex)
: mutex_(mutex) {
Expand All @@ -229,6 +294,17 @@ MutexBase<Traits>::ScopedLock::~ScopedLock() {
Traits::mutex_unlock(&mutex_.mutex_);
}

template <typename Traits>
MutexBase<Traits>::ScopedReadLock::ScopedReadLock(const MutexBase& mutex)
: mutex_(mutex) {
Traits::mutex_rdlock(&mutex_.mutex_);
}

template <typename Traits>
MutexBase<Traits>::ScopedReadLock::~ScopedReadLock() {
Traits::mutex_rdunlock(&mutex_.mutex_);
}

template <typename Traits>
MutexBase<Traits>::ScopedUnlock::ScopedUnlock(const ScopedLock& scoped_lock)
: mutex_(scoped_lock.mutex_) {
Expand Down