Skip to content

Commit

Permalink
capicxx-core-runtime 3.1.12.6
Browse files Browse the repository at this point in the history
  • Loading branch information
lutzbichler committed Jul 22, 2019
1 parent 2fd0625 commit 99ebf34
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 57 deletions.
6 changes: 6 additions & 0 deletions CHANGES
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
Changes
=======

v3.1.12.6
- Fix race condition leading to event delivery after unsubscription

v3.1.12.5
- Initialize valueType in copy ctors of Variant class

v3.1.12.4
- Fix calling of registered subscription status handlers for
selective broadcasts
Expand Down
87 changes: 34 additions & 53 deletions include/CommonAPI/Event.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,8 @@ class Event {
ListenersMap pendingSubscriptions_;
SubscriptionsSet pendingUnsubscriptions_;

std::mutex notificationMutex_;
std::mutex subscriptionMutex_;
std::recursive_mutex mutex_;
std::mutex abi_placeholder_;
};

template<typename ... Arguments_>
Expand All @@ -107,13 +107,14 @@ typename Event<Arguments_...>::Subscription Event<Arguments_...>::subscribe(List
bool isFirstListener;
Listeners listeners;

subscriptionMutex_.lock();
subscription = nextSubscription_++;
isFirstListener = (0 == pendingSubscriptions_.size()) && (pendingUnsubscriptions_.size() == subscriptions_.size());
listener = std::move(listener);
listeners = std::make_tuple(listener, std::move(errorListener));
pendingSubscriptions_[subscription] = std::move(listeners);
subscriptionMutex_.unlock();
{
std::lock_guard<std::recursive_mutex> itsLock(mutex_);
subscription = nextSubscription_++;
isFirstListener = (0 == pendingSubscriptions_.size()) && (pendingUnsubscriptions_.size() == subscriptions_.size());
listener = std::move(listener);
listeners = std::make_tuple(listener, std::move(errorListener));
pendingSubscriptions_[subscription] = std::move(listeners);
}

if (isFirstListener) {
if (!pendingUnsubscriptions_.empty())
Expand All @@ -131,30 +132,31 @@ void Event<Arguments_...>::unsubscribe(const Subscription subscription) {
bool hasUnsubscribed(false);
Listener listener;

subscriptionMutex_.lock();
auto listenerIterator = subscriptions_.find(subscription);
if (subscriptions_.end() != listenerIterator) {
if (pendingUnsubscriptions_.end() == pendingUnsubscriptions_.find(subscription)) {
if (0 == pendingSubscriptions_.erase(subscription)) {
pendingUnsubscriptions_.insert(subscription);
listener = std::get<0>(listenerIterator->second);
hasUnsubscribed = true;
{
std::lock_guard<std::recursive_mutex> itsLock(mutex_);
auto listenerIterator = subscriptions_.find(subscription);
if (subscriptions_.end() != listenerIterator) {
if (pendingUnsubscriptions_.end() == pendingUnsubscriptions_.find(subscription)) {
if (0 == pendingSubscriptions_.erase(subscription)) {
pendingUnsubscriptions_.insert(subscription);
listener = std::get<0>(listenerIterator->second);
hasUnsubscribed = true;
}
isLastListener = (pendingUnsubscriptions_.size() == subscriptions_.size());
}
isLastListener = (pendingUnsubscriptions_.size() == subscriptions_.size());
}
}
else {
listenerIterator = pendingSubscriptions_.find(subscription);
if (pendingSubscriptions_.end() != listenerIterator) {
listener = std::get<0>(listenerIterator->second);
if (0 != pendingSubscriptions_.erase(subscription)) {
isLastListener = (pendingUnsubscriptions_.size() == subscriptions_.size());
hasUnsubscribed = true;
else {
listenerIterator = pendingSubscriptions_.find(subscription);
if (pendingSubscriptions_.end() != listenerIterator) {
listener = std::get<0>(listenerIterator->second);
if (0 != pendingSubscriptions_.erase(subscription)) {
isLastListener = (pendingUnsubscriptions_.size() == subscriptions_.size());
hasUnsubscribed = true;
}
}
}
isLastListener = isLastListener && (0 == pendingSubscriptions_.size());
}
isLastListener = isLastListener && (0 == pendingSubscriptions_.size());
subscriptionMutex_.unlock();

if (hasUnsubscribed) {
onListenerRemoved(listener, subscription);
Expand All @@ -166,8 +168,7 @@ void Event<Arguments_...>::unsubscribe(const Subscription subscription) {

template<typename ... Arguments_>
void Event<Arguments_...>::notifyListeners(const Arguments_&... eventArguments) {
subscriptionMutex_.lock();
notificationMutex_.lock();
std::lock_guard<std::recursive_mutex> itsLock(mutex_);
for (auto iterator = pendingUnsubscriptions_.begin();
iterator != pendingUnsubscriptions_.end();
iterator++) {
Expand All @@ -182,18 +183,14 @@ void Event<Arguments_...>::notifyListeners(const Arguments_&... eventArguments)
}
pendingSubscriptions_.clear();

subscriptionMutex_.unlock();
for (auto iterator = subscriptions_.begin(); iterator != subscriptions_.end(); iterator++) {
(std::get<0>(iterator->second))(eventArguments...);
}

notificationMutex_.unlock();
}

template<typename ... Arguments_>
void Event<Arguments_...>::notifySpecificListener(const Subscription subscription, const Arguments_&... eventArguments) {
subscriptionMutex_.lock();
notificationMutex_.lock();
std::lock_guard<std::recursive_mutex> itsLock(mutex_);
for (auto iterator = pendingUnsubscriptions_.begin();
iterator != pendingUnsubscriptions_.end();
iterator++) {
Expand All @@ -209,22 +206,16 @@ void Event<Arguments_...>::notifySpecificListener(const Subscription subscriptio
}
pendingSubscriptions_.clear();


subscriptionMutex_.unlock();
for (auto iterator = subscriptions_.begin(); iterator != subscriptions_.end(); iterator++) {
if (subscription == iterator->first) {
(std::get<0>(iterator->second))(eventArguments...);
}
}

notificationMutex_.unlock();
}

template<typename ... Arguments_>
void Event<Arguments_...>::notifySpecificError(const Subscription subscription, const CallStatus status) {

subscriptionMutex_.lock();
notificationMutex_.lock();
std::lock_guard<std::recursive_mutex> itsLock(mutex_);
for (auto iterator = pendingUnsubscriptions_.begin();
iterator != pendingUnsubscriptions_.end();
iterator++) {
Expand All @@ -239,7 +230,6 @@ void Event<Arguments_...>::notifySpecificError(const Subscription subscription,
}
pendingSubscriptions_.clear();

subscriptionMutex_.unlock();
for (auto iterator = subscriptions_.begin(); iterator != subscriptions_.end(); iterator++) {
if (subscription == iterator->first) {
ErrorListener listener = std::get<1>(iterator->second);
Expand All @@ -249,10 +239,7 @@ void Event<Arguments_...>::notifySpecificError(const Subscription subscription,
}
}

notificationMutex_.unlock();

if (status != CommonAPI::CallStatus::SUCCESS) {
subscriptionMutex_.lock();
auto listenerIterator = subscriptions_.find(subscription);
if (subscriptions_.end() != listenerIterator) {
if (pendingUnsubscriptions_.end() == pendingUnsubscriptions_.find(subscription)) {
Expand All @@ -267,14 +254,12 @@ void Event<Arguments_...>::notifySpecificError(const Subscription subscription,
pendingSubscriptions_.erase(subscription);
}
}
subscriptionMutex_.unlock();
}
}

template<typename ... Arguments_>
void Event<Arguments_...>::notifyErrorListeners(const CallStatus status) {
subscriptionMutex_.lock();
notificationMutex_.lock();
std::lock_guard<std::recursive_mutex> itsLock(mutex_);
for (auto iterator = pendingUnsubscriptions_.begin();
iterator != pendingUnsubscriptions_.end();
iterator++) {
Expand All @@ -289,16 +274,12 @@ void Event<Arguments_...>::notifyErrorListeners(const CallStatus status) {
}
pendingSubscriptions_.clear();

subscriptionMutex_.unlock();

for (auto iterator = subscriptions_.begin(); iterator != subscriptions_.end(); iterator++) {
ErrorListener listener = std::get<1>(iterator->second);
if (listener) {
listener(status);
}
}

notificationMutex_.unlock();
}


Expand Down
15 changes: 11 additions & 4 deletions include/CommonAPI/Variant.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -619,15 +619,18 @@ Variant<Types_...>::Variant()
}

template<typename... Types_>
Variant<Types_...>::Variant(const Variant &_source) {
Variant<Types_...>::Variant(const Variant &_source) :
valueType_(_source.valueType_)
{
AssignmentVisitor<Types_...> visitor(*this, false);
ApplyVoidVisitor<
AssignmentVisitor<Types_...> , Variant<Types_...>, Types_...
>::visit(visitor, _source);
}

template<typename... Types_>
Variant<Types_...>::Variant(Variant &&_source)
Variant<Types_...>::Variant(Variant &&_source) :
valueType_(_source.valueType_)
{
AssignmentVisitor<Types_...> visitor(*this, false);
ApplyVoidVisitor<
Expand Down Expand Up @@ -688,7 +691,9 @@ template<typename Type_>
Variant<Types_...>::Variant(const Type_ &_value,
typename std::enable_if<!std::is_const<Type_>::value>::type*,
typename std::enable_if<!std::is_reference<Type_>::value>::type*,
typename std::enable_if<!std::is_same<Type_, Variant<Types_...>>::value>::type*) {
typename std::enable_if<!std::is_same<Type_, Variant<Types_...>>::value>::type*) :
valueType_(0x0)
{
set<typename TypeSelector<Type_, Types_...>::type>(_value, false);
}

Expand All @@ -697,7 +702,9 @@ template<typename Type_>
Variant<Types_...>::Variant(Type_ &&_value,
typename std::enable_if<!std::is_const<Type_>::value>::type*,
typename std::enable_if<!std::is_reference<Type_>::value>::type*,
typename std::enable_if<!std::is_same<Type_, Variant<Types_...>>::value>::type*) {
typename std::enable_if<!std::is_same<Type_, Variant<Types_...>>::value>::type*) :
valueType_(0x0)
{
set<typename TypeSelector<Type_, Types_...>::type>(std::move(_value), false);
}

Expand Down

0 comments on commit 99ebf34

Please sign in to comment.