Add support for listener callbacks #76

Merged · 8 commits · Jun 14, 2023
5 changes: 5 additions & 0 deletions rmw_connextdds_common/include/rmw_connextdds/dds_api.hpp
@@ -138,6 +138,11 @@ rmw_ret_t
rmw_connextdds_take_samples(
RMW_Connext_Subscriber * const sub);

rmw_ret_t
rmw_connextdds_count_unread_samples(
RMW_Connext_Subscriber * const sub,
size_t & unread_count);

rmw_ret_t
rmw_connextdds_return_samples(
RMW_Connext_Subscriber * const sub);
18 changes: 9 additions & 9 deletions rmw_connextdds_common/include/rmw_connextdds/rmw_api_impl.hpp
@@ -92,8 +92,8 @@ RMW_CONNEXTDDS_PUBLIC
rmw_ret_t
rmw_api_connextdds_event_set_callback(
rmw_event_t * event,
rmw_event_callback_t callback,
const void * user_data);
const rmw_event_callback_t callback,
const void * const user_data);

/*****************************************************************************
* Info API
@@ -443,15 +443,15 @@ RMW_CONNEXTDDS_PUBLIC
rmw_ret_t
rmw_api_connextdds_service_set_on_new_request_callback(
rmw_service_t * rmw_service,
rmw_event_callback_t callback,
const void * user_data);
const rmw_event_callback_t callback,
const void * const user_data);

RMW_CONNEXTDDS_PUBLIC
rmw_ret_t
rmw_api_connextdds_client_set_on_new_response_callback(
rmw_client_t * rmw_client,
rmw_event_callback_t callback,
const void * user_data);
const rmw_event_callback_t callback,
const void * const user_data);

/*****************************************************************************
* Subscription API
@@ -580,9 +580,9 @@ rmw_api_connextdds_return_loaned_message_from_subscription(
RMW_CONNEXTDDS_PUBLIC
rmw_ret_t
rmw_api_connextdds_subscription_set_on_new_message_callback(
rmw_subscription_t * rmw_subscription,
rmw_event_callback_t callback,
const void * user_data);
rmw_subscription_t * const rmw_subscription,
const rmw_event_callback_t callback,
const void * const user_data);

/*****************************************************************************
* WaitSet API
24 changes: 24 additions & 0 deletions rmw_connextdds_common/include/rmw_connextdds/rmw_impl.hpp
@@ -462,6 +462,30 @@ class RMW_Connext_Subscriber
return has_data;
}

rmw_ret_t
count_unread_samples(size_t & unread_count)
{
// The action of counting unread samples is not currently mutually exclusive
// with the action of taking samples out of the reader cache. Unfortunately
// we cannot use a mutex to synchronize calls between
// count_unread_samples() and take_next(), because we would run the risk
// of a deadlock: count_unread_samples() is supposed to be
// called from within the reader's "exclusive area" (since it is
// executed within a listener callback), while take_next() is usually called
// from an executor/application thread and must acquire the reader's
// "exclusive area" before taking samples.
// This means that an application which relies on data callbacks and the
// count provided by this function, and which at the same time polls data
// on the subscription, might end up missing some samples in the total count
// notified to it via callback, because the samples were taken out of the
// cache before they could be notified to the listener.
// Fortunately, in Connext the listener callback is notified right after a
// sample is added to the reader cache and before any mutex is released,
// which should allow this function to always report a correct number, as
// long as it is only called within a (DDS) listener callback.
return rmw_connextdds_count_unread_samples(this, unread_count);
}

DDS_Subscriber * dds_subscriber() const
{
return DDS_DataReader_get_subscriber(this->dds_reader);
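To make the locking constraint described in count_unread_samples() concrete, here is a minimal, self-contained sketch of the lock-ordering deadlock that a shared mutex would introduce. This is not rmw_connextdds code; all names are hypothetical, with exclusive_area standing in for the reader lock that Connext holds while invoking listener callbacks:

#include <mutex>

std::mutex exclusive_area;  // reader lock held by Connext around listener callbacks
std::mutex rmw_mutex;       // hypothetical mutex shared by the two call paths

// Listener path (middleware receive thread): exclusive_area is already held
// when the callback runs, so the acquisition order is exclusive_area -> rmw_mutex.
void on_data_available()
{
  std::lock_guard<std::mutex> lk(rmw_mutex);
  // ... count unread samples in the reader cache ...
}

// Executor path: the RMW would lock rmw_mutex first, and the DDS take()
// then acquires exclusive_area, i.e. the order rmw_mutex -> exclusive_area.
void take_next()
{
  std::lock_guard<std::mutex> lk(rmw_mutex);
  std::lock_guard<std::mutex> reader_lock(exclusive_area);
  // ... take samples out of the reader cache ...
}

// Two threads taking the same two locks in opposite order can deadlock, which
// is why the two paths are left unsynchronized and the implementation relies
// on Connext invoking the listener before the reader lock is released.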
74 changes: 74 additions & 0 deletions rmw_connextdds_common/include/rmw_connextdds/rmw_waitset_std.hpp
@@ -121,6 +121,22 @@ class RMW_Connext_Condition
}
}

template<typename FunctorT, typename FunctorA>
void
perform_action_and_update_state(FunctorA && action, FunctorT && update_condition)
{
std::lock_guard<std::mutex> internal_lock(this->mutex_internal);

action();

if (nullptr != this->waitset_mutex) {
std::lock_guard<std::mutex> lock(*this->waitset_mutex);
update_condition();
} else {
update_condition();
}
}

protected:
std::mutex mutex_internal;
std::mutex * waitset_mutex;
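For context on the helper added above: perform_action_and_update_state() runs action() while holding the condition's internal mutex, then runs update_condition() while additionally holding the attached waitset's mutex (if any), so the state observed by waitsets is never updated unguarded. A sketch of the intended call pattern, with hypothetical lambdas (the real usage is in RMW_Connext_SubscriberStatusCondition::notify_new_data() further down):

condition.perform_action_and_update_state(
  [&]() {
    // query the middleware, e.g. count unread samples in the reader cache
  },
  [&]() {
    // update state observed by waitsets, e.g. set a "data available" flag
  });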
@@ -335,6 +351,36 @@ class RMW_Connext_StatusCondition : public RMW_Connext_Condition
has_status(const rmw_event_type_t event_type) = 0;

void
notify_new_event(rmw_event_type_t event_type)
{
std::unique_lock<std::mutex> lock_mutex(new_event_mutex_);
if (new_event_cb_[event_type]) {
new_event_cb_[event_type](user_data_[event_type], 1);
} else {
unread_events_count_[event_type]++;
}
}

void
set_new_event_callback(
rmw_event_type_t event_type,
rmw_event_callback_t callback,
const void * user_data)
{
std::unique_lock<std::mutex> lock_mutex(new_event_mutex_);

if (callback) {
// Push events arrived before setting the executor's callback
if (unread_events_count_[event_type] > 0) {
callback(user_data, unread_events_count_[event_type]);
unread_events_count_[event_type] = 0;
}
user_data_[event_type] = user_data;
new_event_cb_[event_type] = callback;
} else {
user_data_[event_type] = nullptr;
new_event_cb_[event_type] = nullptr;
}
}

void
on_inconsistent_topic(const struct DDS_InconsistentTopicStatus * status);

void
@@ -358,6 +404,10 @@ class RMW_Connext_StatusCondition : public RMW_Connext_Condition

protected:
DDS_StatusCondition * scond;
std::mutex new_event_mutex_;
rmw_event_callback_t new_event_cb_[RMW_EVENT_INVALID] = {};
const void * user_data_[RMW_EVENT_INVALID] = {};
uint64_t unread_events_count_[RMW_EVENT_INVALID] = {0};

bool triggered_inconsistent_topic{false};

@@ -779,6 +829,25 @@ class RMW_Connext_SubscriberStatusCondition : public RMW_Connext_StatusCondition
return RMW_RET_OK;
}

void set_on_new_data_callback(
const rmw_event_callback_t callback,
const void * const user_data)
{
std::unique_lock<std::mutex> lock(new_data_event_mutex_);
if (callback) {
if (unread_data_events_count_ > 0) {
callback(user_data, unread_data_events_count_);
unread_data_events_count_ = 0;
}
new_data_event_cb_ = callback;
data_event_user_data_ = user_data;
} else {
new_data_event_cb_ = nullptr;
data_event_user_data_ = nullptr;
}
}

void notify_new_data();

inline rmw_ret_t
get_matched_status(rmw_matched_status_t * const status)
{
@@ -838,6 +907,11 @@ class RMW_Connext_SubscriberStatusCondition : public RMW_Connext_StatusCondition

RMW_Connext_Subscriber * sub;

std::mutex new_data_event_mutex_;
rmw_event_callback_t new_data_event_cb_{nullptr};
const void * data_event_user_data_{nullptr};
uint64_t unread_data_events_count_ = 0;

friend class RMW_Connext_WaitSet;
};

37 changes: 37 additions & 0 deletions rmw_connextdds_common/src/common/rmw_impl_waitset_std.cpp
@@ -135,6 +135,7 @@ RMW_Connext_DataReaderListener_on_data_available(

UNUSED_ARG(reader);

self->notify_new_data();
self->set_data_available(true);
}

@@ -799,6 +800,8 @@ RMW_Connext_SubscriberStatusCondition::update_status_deadline(

this->status_deadline.total_count_change = this->status_deadline.total_count;
this->status_deadline.total_count_change -= this->status_deadline_last.total_count;

this->notify_new_event(RMW_EVENT_REQUESTED_DEADLINE_MISSED);
}

void
@@ -813,6 +816,8 @@
this->status_liveliness.alive_count_change -= this->status_liveliness_last.alive_count;
this->status_liveliness.not_alive_count_change -=
this->status_liveliness_last.not_alive_count;

this->notify_new_event(RMW_EVENT_LIVELINESS_CHANGED);
}

void
@@ -824,6 +829,7 @@

this->status_qos.total_count_change = this->status_qos.total_count;
this->status_qos.total_count_change -= this->status_qos_last.total_count;
this->notify_new_event(RMW_EVENT_REQUESTED_QOS_INCOMPATIBLE);
}

void
@@ -836,6 +842,31 @@
this->status_sample_lost.total_count_change = this->status_sample_lost.total_count;
this->status_sample_lost.total_count_change -=
this->status_sample_lost_last.total_count;
this->notify_new_event(RMW_EVENT_MESSAGE_LOST);
}

void
RMW_Connext_SubscriberStatusCondition::notify_new_data()
{
size_t unread_samples = 0;
std::unique_lock<std::mutex> lock_mutex(new_data_event_mutex_);
perform_action_and_update_state(
[this, &unread_samples]() {
const rmw_ret_t rc = this->sub->count_unread_samples(unread_samples);
Contributor commented:

I just want to double check here, as I don't really know the underlying connext APIs.
When is a sample marked as read?

If this notify_new_data function is invoked whenever the DDS receives a message, I would expect unread_samples to always be 1.

Collaborator (author) replied:

> I just want to double check here, as I don't really know the underlying connext APIs.
> When is a sample marked as read?

In general in DDS, samples have a "sample state" associated with them, which is "NOT_READ" when they are added to a DataReader's cache, and which transitions to "READ" once they have been returned at least once by a call to DataReader::read() or DataReader::take() (in the case of take() the "READ" state cannot really be observed, because the sample is removed from the cache, but theoretically the sample transitions...).

Both read() and take() take a "sample state" argument to filter the samples in the cache (in addition to two more state masks, "view state" and "instance state"). So count_unread_samples() takes advantage of this to scan the reader's cache and only ever receive samples that haven't yet been observed by the "application layer" (i.e. the RMW/ROS 2).

The RMW then take()s all the samples out of the cache (when the upper layers call rmw_take_message()), and it does so without constraints on the "sample state".

> If this notify_new_data function is invoked whenever the DDS receives a message, I would expect unread_samples to always be 1.

There is no guarantee in DDS that the callback will be invoked for every sample. In fact, the protocol explicitly allows an implementation to coalesce multiple events into a single notification. This is true for every observable event, not just DATA_AVAILABLE.

Short of explicit guarantees from an implementation, it is not correct to assume that a DDS listener will be notified for every received sample. The listener must actually inspect the reader cache to determine how many samples are available.
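As an illustration of the mechanism described above, a sketch of counting unread samples with the classic DDS C++ PSM (names per the OMG DDS specification, with a hypothetical Foo type; the actual rmw_connextdds implementation uses the Connext C API and differs in detail):

// Returns in unread_count the number of samples not yet observed by the
// application layer; read() also marks them as "READ", so a later call only
// counts samples that arrived in between.
DDS::ReturnCode_t count_unread(FooDataReader * reader, size_t & unread_count)
{
  FooSeq data;
  DDS::SampleInfoSeq infos;
  DDS::ReturnCode_t rc = reader->read(
    data, infos, DDS::LENGTH_UNLIMITED,
    DDS::NOT_READ_SAMPLE_STATE, DDS::ANY_VIEW_STATE, DDS::ANY_INSTANCE_STATE);
  if (DDS::RETCODE_NO_DATA == rc) {
    unread_count = 0;
    return DDS::RETCODE_OK;
  }
  if (DDS::RETCODE_OK != rc) {
    return rc;
  }
  unread_count = infos.length();
  return reader->return_loan(data, infos);  // samples stay in the cache for take()
}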

Collaborator (author) added:

Btw, I ended up with this implementation, which counts the messages in the reader queue upon DATA_AVAILABLE, because the implementation from rmw_fastrtps_cpp, which assumes exactly 1 new sample per notification, didn't work for Connext (sometimes the listener would receive only one notification while having multiple samples in the queue).

Contributor replied:

Thank you for the detailed description, this makes sense!

Collaborator (author) added:

One thing I'd like to point out about this implementation is that there is always a possibility that some samples that were notified to the callback will not actually be read by the application layer because of QoS settings (mainly KEEP_LAST history). The samples could be counted but then be pushed out of the cache by newer samples before they can be take()n out of it.
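For reference, the QoS interaction mentioned here, sketched with the classic DDS C++ PSM (hypothetical setup code, assuming an existing subscriber): with KEEP_LAST history the reader cache retains at most history.depth samples per instance, so a sample that was already counted and notified can be evicted by a newer one before the application take()s it.

DDS::DataReaderQos reader_qos;
subscriber->get_default_datareader_qos(reader_qos);
reader_qos.history.kind = DDS::KEEP_LAST_HISTORY_QOS;
reader_qos.history.depth = 5;  // only the 5 newest samples per instance are
                               // kept; older counted-but-untaken samples are
                               // silently replaced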

Contributor asked:

Is this true for both Connext DDS variations, Pro and Micro? I'm just asking because the call to count_unread_samples adds quite some overhead here. I don't remember the exact numbers, but I think it was at least 1 µs. It's not that much, but it is overhead which can add up quite quickly for latency-sensitive applications with "sufficiently large" chains of subscriptions.

if (RMW_RET_OK != rc) {
RMW_CONNEXT_LOG_ERROR("failed to count unread samples on DDS Reader")
}
},
[this, &unread_samples]() {
if (unread_samples == 0) {
return;
}
if (new_data_event_cb_) {
new_data_event_cb_(data_event_user_data_, unread_samples);
} else {
unread_data_events_count_ += unread_samples;
}
});
}

void
@@ -998,6 +1029,8 @@ RMW_Connext_PublisherStatusCondition::update_status_deadline(

this->status_deadline.total_count_change = this->status_deadline.total_count;
this->status_deadline.total_count_change -= this->status_deadline_last.total_count;

this->notify_new_event(RMW_EVENT_OFFERED_DEADLINE_MISSED);
}

void
Expand All @@ -1009,6 +1042,8 @@ RMW_Connext_PublisherStatusCondition::update_status_liveliness(

this->status_liveliness.total_count_change = this->status_liveliness.total_count;
this->status_liveliness.total_count_change -= this->status_liveliness_last.total_count;

this->notify_new_event(RMW_EVENT_LIVELINESS_CHANGED);
}

void
@@ -1020,6 +1055,8 @@

this->status_qos.total_count_change = this->status_qos.total_count;
this->status_qos.total_count_change -= this->status_qos_last.total_count;

this->notify_new_event(RMW_EVENT_OFFERED_QOS_INCOMPATIBLE);
}

void