Skip to content

Commit

Permalink
backport: Add support for listener callbacks (#76) (d4330cc)
Browse files Browse the repository at this point in the history
* Add support for listener callbacks.

* Fix wrong debug assertion when converting DDS_Duration values

* Clarify interactions between count_unread_samples() and take_next()

* Notify on changed matched status

Signed-off-by: Christopher Wecht <[email protected]>
Signed-off-by: Andrea Sorbini <[email protected]>
Signed-off-by: Taxo Rubio <[email protected]>
  • Loading branch information
asorbini authored and trubio-rti committed Aug 5, 2024
1 parent 94bde75 commit 1435ce3
Show file tree
Hide file tree
Showing 8 changed files with 314 additions and 41 deletions.
5 changes: 5 additions & 0 deletions rmw_connextdds_common/include/rmw_connextdds/dds_api.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,11 @@ rmw_ret_t
rmw_connextdds_take_samples(
RMW_Connext_Subscriber * const sub);

rmw_ret_t
rmw_connextdds_count_unread_samples(
RMW_Connext_Subscriber * const sub,
size_t & unread_count);

rmw_ret_t
rmw_connextdds_return_samples(
RMW_Connext_Subscriber * const sub);
Expand Down
18 changes: 9 additions & 9 deletions rmw_connextdds_common/include/rmw_connextdds/rmw_api_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,8 @@ RMW_CONNEXTDDS_PUBLIC
rmw_ret_t
rmw_api_connextdds_event_set_callback(
rmw_event_t * event,
rmw_event_callback_t callback,
const void * user_data);
const rmw_event_callback_t callback,
const void * const user_data);

/*****************************************************************************
* Info API
Expand Down Expand Up @@ -437,15 +437,15 @@ RMW_CONNEXTDDS_PUBLIC
rmw_ret_t
rmw_api_connextdds_service_set_on_new_request_callback(
rmw_service_t * rmw_service,
rmw_event_callback_t callback,
const void * user_data);
const rmw_event_callback_t callback,
const void * const user_data);

RMW_CONNEXTDDS_PUBLIC
rmw_ret_t
rmw_api_connextdds_client_set_on_new_response_callback(
rmw_client_t * rmw_client,
rmw_event_callback_t callback,
const void * user_data);
const rmw_event_callback_t callback,
const void * const user_data);

/*****************************************************************************
* Subscription API
Expand Down Expand Up @@ -574,9 +574,9 @@ rmw_api_connextdds_return_loaned_message_from_subscription(
RMW_CONNEXTDDS_PUBLIC
rmw_ret_t
rmw_api_connextdds_subscription_set_on_new_message_callback(
rmw_subscription_t * rmw_subscription,
rmw_event_callback_t callback,
const void * user_data);
rmw_subscription_t * const rmw_subscription,
const rmw_event_callback_t callback,
const void * const user_data);

/*****************************************************************************
* WaitSet API
Expand Down
24 changes: 24 additions & 0 deletions rmw_connextdds_common/include/rmw_connextdds/rmw_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -462,6 +462,30 @@ class RMW_Connext_Subscriber
return has_data;
}

rmw_ret_t
count_unread_samples(size_t & unread_count)
{
// The action of counting unread samples is not currently mutually exclusive
// with the action of taking samples out of the reader cache. Unfortunately
// we cannot use a mutex to synchronize calls between
// count_unread_samples() and and take_next() because we would run the risk
// of a deadlock. This is because count_unread_samples() is supposed to be
// called from within the reader's "exclusive area" (since it is
// executed within a listener callback), while take_next() is usually called
// from an executor/application thread and it must acquire the reader's
// "exclusive area" before taking samples.
// This might mean that an application which relies on data callbacks and the
// count provided by this function, and which at the same time polls data
// on the subscription, might end up missing some samples in the total count
// notified to it via callback because the samples were taken out of the
// cache before they could be notified to the listener.
// Fortunately, in Connext the listener callback is notified right after a
// sample is added to the reader cache and before any mutex is released,
// which should allow this function to always report a correct number, as
// long as it is only called within a (DDS) listener callback.
return rmw_connextdds_count_unread_samples(this, unread_count);
}

DDS_Subscriber * dds_subscriber() const
{
return DDS_DataReader_get_subscriber(this->dds_reader);
Expand Down
89 changes: 89 additions & 0 deletions rmw_connextdds_common/include/rmw_connextdds/rmw_waitset_std.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#define RMW_CONNEXTDDS__RMW_WAITSET_STD_HPP_

#include "rmw_connextdds/context.hpp"
#include <condition_variable>

/******************************************************************************
* Alternative implementation of WaitSets and Conditions using C++ std
Expand Down Expand Up @@ -121,6 +122,22 @@ class RMW_Connext_Condition
}
}

template<typename FunctorT, typename FunctorA>
void
perform_action_and_update_state(FunctorA && action, FunctorT && update_condition)
{
std::lock_guard<std::mutex> internal_lock(this->mutex_internal);

action();

if (nullptr != this->waitset_mutex) {
std::lock_guard<std::mutex> lock(*this->waitset_mutex);
update_condition();
} else {
update_condition();
}
}

protected:
std::mutex mutex_internal;
std::mutex * waitset_mutex;
Expand Down Expand Up @@ -333,8 +350,55 @@ class RMW_Connext_StatusCondition : public RMW_Connext_Condition
virtual bool
has_status(const rmw_event_type_t event_type) = 0;

void
notify_new_event(rmw_event_type_t event_type)
{
std::unique_lock<std::mutex> lock_mutex(new_event_mutex_);
if (new_event_cb_[event_type]) {
new_event_cb_[event_type](user_data_[event_type], 1);
} else {
unread_events_count_[event_type]++;
}
}

void
set_new_event_callback(
rmw_event_type_t event_type,
rmw_event_callback_t callback,
const void * user_data)
{
std::unique_lock<std::mutex> lock_mutex(new_event_mutex_);

if (callback) {
// Push events arrived before setting the executor's callback
if (unread_events_count_[event_type] > 0) {
callback(user_data, unread_events_count_[event_type]);
unread_events_count_[event_type] = 0;
}
user_data_[event_type] = user_data;
new_event_cb_[event_type] = callback;
} else {
user_data_[event_type] = nullptr;
new_event_cb_[event_type] = nullptr;
}
}

void
on_inconsistent_topic(const struct DDS_InconsistentTopicStatus * status);

void
update_status_inconsistent_topic(const struct DDS_InconsistentTopicStatus * status);

protected:
DDS_StatusCondition * scond;
std::mutex new_event_mutex_;
rmw_event_callback_t new_event_cb_[RMW_EVENT_INVALID] = {};
const void * user_data_[RMW_EVENT_INVALID] = {};
uint64_t unread_events_count_[RMW_EVENT_INVALID] = {0};

bool triggered_inconsistent_topic{false};

struct DDS_InconsistentTopicStatus status_inconsistent_topic;
};

void
Expand Down Expand Up @@ -712,6 +776,26 @@ class RMW_Connext_SubscriberStatusCondition : public RMW_Connext_StatusCondition
return RMW_RET_OK;
}

void set_on_new_data_callback(
const rmw_event_callback_t callback,
const void * const user_data)
{
std::unique_lock<std::mutex> lock(new_data_event_mutex_);
if (callback) {
if (unread_data_events_count_ > 0) {
callback(user_data, unread_data_events_count_);
unread_data_events_count_ = 0;
}
new_data_event_cb_ = callback;
data_event_user_data_ = user_data;
} else {
new_data_event_cb_ = nullptr;
data_event_user_data_ = nullptr;
}
}

void notify_new_data();

protected:
void update_status_deadline(
const DDS_RequestedDeadlineMissedStatus * const status);
Expand Down Expand Up @@ -745,6 +829,11 @@ class RMW_Connext_SubscriberStatusCondition : public RMW_Connext_StatusCondition

RMW_Connext_Subscriber * sub;

std::mutex new_data_event_mutex_;
rmw_event_callback_t new_data_event_cb_{nullptr};
const void * data_event_user_data_{nullptr};
uint64_t unread_data_events_count_ = 0;

friend class RMW_Connext_WaitSet;
};

Expand Down
37 changes: 37 additions & 0 deletions rmw_connextdds_common/src/common/rmw_impl_waitset_std.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ RMW_Connext_DataReaderListener_on_data_available(

UNUSED_ARG(reader);

self->notify_new_data();
self->set_data_available(true);
}

Expand Down Expand Up @@ -706,6 +707,8 @@ RMW_Connext_SubscriberStatusCondition::update_status_deadline(

this->status_deadline.total_count_change = this->status_deadline.total_count;
this->status_deadline.total_count_change -= this->status_deadline_last.total_count;

this->notify_new_event(RMW_EVENT_REQUESTED_DEADLINE_MISSED);
}

void
Expand All @@ -720,6 +723,8 @@ RMW_Connext_SubscriberStatusCondition::update_status_liveliness(
this->status_liveliness.alive_count_change -= this->status_liveliness_last.alive_count;
this->status_liveliness.not_alive_count_change -=
this->status_liveliness_last.not_alive_count;

this->notify_new_event(RMW_EVENT_LIVELINESS_CHANGED);
}

void
Expand All @@ -731,6 +736,7 @@ RMW_Connext_SubscriberStatusCondition::update_status_qos(

this->status_qos.total_count_change = this->status_qos.total_count;
this->status_qos.total_count_change -= this->status_qos_last.total_count;
this->notify_new_event(RMW_EVENT_REQUESTED_QOS_INCOMPATIBLE);
}

void
Expand All @@ -743,6 +749,31 @@ RMW_Connext_SubscriberStatusCondition::update_status_sample_lost(
this->status_sample_lost.total_count_change = this->status_sample_lost.total_count;
this->status_sample_lost.total_count_change -=
this->status_sample_lost_last.total_count;
this->notify_new_event(RMW_EVENT_MESSAGE_LOST);
}

void
RMW_Connext_SubscriberStatusCondition::notify_new_data()
{
size_t unread_samples = 0;
std::unique_lock<std::mutex> lock_mutex(new_data_event_mutex_);
perform_action_and_update_state(
[this, &unread_samples]() {
const rmw_ret_t rc = this->sub->count_unread_samples(unread_samples);
if (RMW_RET_OK != rc) {
RMW_CONNEXT_LOG_ERROR("failed to count unread samples on DDS Reader")
}
},
[this, &unread_samples]() {
if (unread_samples == 0) {
return;
}
if (new_data_event_cb_) {
new_data_event_cb_(data_event_user_data_, unread_samples);
} else {
unread_data_events_count_ += unread_samples;
}
});
}

rmw_ret_t
Expand Down Expand Up @@ -852,6 +883,8 @@ RMW_Connext_PublisherStatusCondition::update_status_deadline(

this->status_deadline.total_count_change = this->status_deadline.total_count;
this->status_deadline.total_count_change -= this->status_deadline_last.total_count;

this->notify_new_event(RMW_EVENT_OFFERED_DEADLINE_MISSED);
}

void
Expand All @@ -863,6 +896,8 @@ RMW_Connext_PublisherStatusCondition::update_status_liveliness(

this->status_liveliness.total_count_change = this->status_liveliness.total_count;
this->status_liveliness.total_count_change -= this->status_liveliness_last.total_count;

this->notify_new_event(RMW_EVENT_LIVELINESS_CHANGED);
}

void
Expand All @@ -874,4 +909,6 @@ RMW_Connext_PublisherStatusCondition::update_status_qos(

this->status_qos.total_count_change = this->status_qos.total_count;
this->status_qos.total_count_change -= this->status_qos_last.total_count;

this->notify_new_event(RMW_EVENT_OFFERED_QOS_INCOMPATIBLE);
}
Loading

0 comments on commit 1435ce3

Please sign in to comment.