Skip to content

Commit

Permalink
Fix notification lost (#3195)
Browse files Browse the repository at this point in the history
* fix notification lost

Signed-off-by: Chen Lihui <[email protected]>

* add a regression test

Signed-off-by: Chen Lihui <[email protected]>

* rename a variable name and update comments

Signed-off-by: Chen Lihui <[email protected]>

* fix uncrustify issue

Signed-off-by: Chen Lihui <[email protected]>

* make the regression test better

Signed-off-by: Chen Lihui <[email protected]>

* fix uncrustify

Signed-off-by: Chen Lihui <[email protected]>

* Refs #16192. Fix deadlock on WaitSetImpl.

Signed-off-by: Miguel Company <[email protected]>

Signed-off-by: Chen Lihui <[email protected]>
Signed-off-by: Miguel Company <[email protected]>
Co-authored-by: Chen Lihui <[email protected]>
  • Loading branch information
MiguelCompany and Chen Lihui authored Jan 10, 2023
1 parent dfe8204 commit 0cf75bd
Show file tree
Hide file tree
Showing 2 changed files with 82 additions and 4 deletions.
15 changes: 12 additions & 3 deletions src/cpp/fastdds/core/condition/WaitSetImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,16 @@ namespace detail {

WaitSetImpl::~WaitSetImpl()
{
std::lock_guard<std::mutex> guard(mutex_);
for (const Condition* c : entries_)
eprosima::utilities::collections::unordered_vector<const Condition*> old_entries;

{
// We only need to protect access to the collection.
std::lock_guard<std::mutex> guard(mutex_);
old_entries = entries_;
entries_.clear();
}

for (const Condition* c : old_entries)
{
c->get_notifier()->detach_from(this);
}
Expand Down Expand Up @@ -68,7 +76,7 @@ ReturnCode_t WaitSetImpl::attach_condition(
// Should wake_up when adding a new triggered condition
if (is_waiting_ && condition.get_trigger_value())
{
wake_up();
cond_.notify_one();
}
}
}
Expand Down Expand Up @@ -156,6 +164,7 @@ ReturnCode_t WaitSetImpl::get_conditions(

void WaitSetImpl::wake_up()
{
std::lock_guard<std::mutex> guard(mutex_);
cond_.notify_one();
}

Expand Down
71 changes: 70 additions & 1 deletion test/unittest/dds/core/condition/WaitSetImplTests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
// limitations under the License.

#include <algorithm>
#include <future>
#include <thread>

#include <gtest/gtest.h>

Expand All @@ -33,7 +35,7 @@ class TestCondition : public Condition
{
public:

bool trigger_value = false;
volatile bool trigger_value = false;

bool get_trigger_value() const override
{
Expand Down Expand Up @@ -197,6 +199,73 @@ TEST(WaitSetImplTests, wait)
}
}

TEST(WaitSetImplTests, fix_wait_notification_lost)
{
ConditionSeq conditions;
WaitSetImpl wait_set;

// Waiting should return the added connection after the trigger value is updated and the wait_set waken.
{
TestCondition triggered_condition;

// Expecting calls on the notifier of triggered_condition.
auto notifier = triggered_condition.get_notifier();
EXPECT_CALL(*notifier, attach_to(_)).Times(1);
EXPECT_CALL(*notifier, will_be_deleted(_)).Times(1);

class AnotherTestCondition : public Condition
{
public:

bool get_trigger_value() const override
{
// Time to simulate thread context switch or something else
std::this_thread::sleep_for(std::chrono::seconds(2));
return false;
}

}
second_simulator_condition;

// Expecting calls on the notifier of second_simulator_condition.
notifier = second_simulator_condition.get_notifier();
EXPECT_CALL(*notifier, attach_to(_)).Times(1);
EXPECT_CALL(*notifier, will_be_deleted(_)).Times(1);

wait_set.attach_condition(triggered_condition);
wait_set.attach_condition(second_simulator_condition);

std::promise<void> promise;
std::future<void> future = promise.get_future();
ReturnCode_t ret = ReturnCode_t::RETCODE_ERROR;
std::thread wait_conditions([&]()
{
// Not to use `WaitSetImpl::wait` with a timeout value, because the
// `condition_variable::wait_for` could call _Predicate function again.
ret = wait_set.wait(conditions, eprosima::fastrtps::c_TimeInfinite);
promise.set_value();
});

// One second sleep to make the `wait_set.wait` check `triggered_condition` in the above thread
std::this_thread::sleep_for(std::chrono::seconds(1));
triggered_condition.trigger_value = true;
wait_set.wake_up();

// Expecting get notification after wake_up, otherwise output error within 5 seconds.
future.wait_for(std::chrono::seconds(5));
EXPECT_EQ(ReturnCode_t::RETCODE_OK, ret);
EXPECT_EQ(1u, conditions.size());
EXPECT_NE(conditions.cend(), std::find(conditions.cbegin(), conditions.cend(), &triggered_condition));

// Wake up the `wait_set` to make sure the thread exit
wait_set.wake_up();
wait_conditions.join();

wait_set.will_be_deleted(triggered_condition);
wait_set.will_be_deleted(second_simulator_condition);
}
}

int main(
int argc,
char** argv)
Expand Down

0 comments on commit 0cf75bd

Please sign in to comment.