Backport rmw callbacks implementation to Humble [ros2-73] #157

Merged (3 commits, Aug 20, 2024)

5 changes: 5 additions & 0 deletions rmw_connextdds_common/include/rmw_connextdds/dds_api.hpp
@@ -138,6 +138,11 @@ rmw_ret_t
 rmw_connextdds_take_samples(
   RMW_Connext_Subscriber * const sub);
 
+rmw_ret_t
+rmw_connextdds_count_unread_samples(
+  RMW_Connext_Subscriber * const sub,
+  size_t & unread_count);
+
 rmw_ret_t
 rmw_connextdds_return_samples(
   RMW_Connext_Subscriber * const sub);
18 changes: 9 additions & 9 deletions rmw_connextdds_common/include/rmw_connextdds/rmw_api_impl.hpp
@@ -92,8 +92,8 @@ RMW_CONNEXTDDS_PUBLIC
 rmw_ret_t
 rmw_api_connextdds_event_set_callback(
   rmw_event_t * event,
-  rmw_event_callback_t callback,
-  const void * user_data);
+  const rmw_event_callback_t callback,
+  const void * const user_data);
 
 /*****************************************************************************
  * Info API
@@ -437,15 +437,15 @@ RMW_CONNEXTDDS_PUBLIC
 rmw_ret_t
 rmw_api_connextdds_service_set_on_new_request_callback(
   rmw_service_t * rmw_service,
-  rmw_event_callback_t callback,
-  const void * user_data);
+  const rmw_event_callback_t callback,
+  const void * const user_data);
 
 RMW_CONNEXTDDS_PUBLIC
 rmw_ret_t
 rmw_api_connextdds_client_set_on_new_response_callback(
   rmw_client_t * rmw_client,
-  rmw_event_callback_t callback,
-  const void * user_data);
+  const rmw_event_callback_t callback,
+  const void * const user_data);
 
 /*****************************************************************************
  * Subscription API
@@ -574,9 +574,9 @@ rmw_api_connextdds_return_loaned_message_from_subscription(
 RMW_CONNEXTDDS_PUBLIC
 rmw_ret_t
 rmw_api_connextdds_subscription_set_on_new_message_callback(
-  rmw_subscription_t * rmw_subscription,
-  rmw_event_callback_t callback,
-  const void * user_data);
+  rmw_subscription_t * const rmw_subscription,
+  const rmw_event_callback_t callback,
+  const void * const user_data);
 
 /*****************************************************************************
  * WaitSet API
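
For context on how these signatures are exercised, here is a minimal usage sketch of registering a new-message callback through the generic rmw entry point, which in this RMW dispatches to rmw_api_connextdds_subscription_set_on_new_message_callback. The executor-side ListenerState type and the attach/detach helpers are illustrative assumptions, not part of this PR:

// Minimal sketch (assumption, not part of this diff): registering a
// new-message callback from an executor through the generic rmw API.
#include <atomic>
#include <cstddef>

#include "rmw/rmw.h"

// Hypothetical executor-side state; any pointer passed as user_data works.
struct ListenerState
{
  std::atomic<std::size_t> pending{0};
};

// Matches rmw_event_callback_t: (const void * user_data, size_t number_of_events).
void on_new_message(const void * user_data, std::size_t number_of_events)
{
  auto * state = static_cast<ListenerState *>(const_cast<void *>(user_data));
  state->pending += number_of_events;  // a real executor would also wake up here
}

void attach_listener(rmw_subscription_t * sub, ListenerState * state)
{
  // Messages that arrived before registration are reported in this first call
  // (see set_on_new_data_callback further down in this diff).
  rmw_ret_t ret = rmw_subscription_set_on_new_message_callback(sub, on_new_message, state);
  (void)ret;  // a real caller would check for RMW_RET_OK
}

void detach_listener(rmw_subscription_t * sub)
{
  // A null callback clears the registration again.
  (void)rmw_subscription_set_on_new_message_callback(sub, nullptr, nullptr);
}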
24 changes: 24 additions & 0 deletions rmw_connextdds_common/include/rmw_connextdds/rmw_impl.hpp
@@ -462,6 +462,30 @@ class RMW_Connext_Subscriber
     return has_data;
   }
 
+  rmw_ret_t
+  count_unread_samples(size_t & unread_count)
+  {
+    // The action of counting unread samples is not currently mutually exclusive
+    // with the action of taking samples out of the reader cache. Unfortunately
+    // we cannot use a mutex to synchronize calls between
+    // count_unread_samples() and take_next() because we would run the risk
+    // of a deadlock. This is because count_unread_samples() is supposed to be
+    // called from within the reader's "exclusive area" (since it is
+    // executed within a listener callback), while take_next() is usually called
+    // from an executor/application thread and it must acquire the reader's
+    // "exclusive area" before taking samples.
+    // This might mean that an application which relies on data callbacks and the
+    // count provided by this function, and which at the same time polls data
+    // on the subscription, might end up missing some samples in the total count
+    // notified to it via callback because the samples were taken out of the
+    // cache before they could be notified to the listener.
+    // Fortunately, in Connext the listener callback is notified right after a
+    // sample is added to the reader cache and before any mutex is released,
+    // which should allow this function to always report a correct number, as
+    // long as it is only called within a (DDS) listener callback.
+    return rmw_connextdds_count_unread_samples(this, unread_count);
+  }
+
   DDS_Subscriber * dds_subscriber() const
   {
     return DDS_DataReader_get_subscriber(this->dds_reader);
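
A generic illustration of the lock-ordering hazard described in the comment above; the mutex names and helper functions are stand-ins for illustration, not Connext or RMW symbols:

#include <mutex>

std::mutex reader_exclusive_area;  // stands in for the reader's internal "exclusive area"
std::mutex rmw_count_mutex;        // the hypothetical extra mutex the comment rules out

// Listener thread: the middleware already holds the exclusive area when the
// data-available callback runs, and counting would then need the rmw mutex.
void listener_path()
{
  std::lock_guard<std::mutex> ea(reader_exclusive_area);  // lock order: EA, then rmw
  std::lock_guard<std::mutex> count(rmw_count_mutex);
  // count unread samples...
}

// Executor thread: take_next() would grab the rmw mutex first, while the
// underlying take() needs the exclusive area -- the opposite order, so the
// two threads can deadlock when interleaved.
void executor_path()
{
  std::lock_guard<std::mutex> count(rmw_count_mutex);     // lock order: rmw, then EA
  std::lock_guard<std::mutex> ea(reader_exclusive_area);
  // take samples...
}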
89 changes: 89 additions & 0 deletions rmw_connextdds_common/include/rmw_connextdds/rmw_waitset_std.hpp
@@ -16,6 +16,7 @@
 #define RMW_CONNEXTDDS__RMW_WAITSET_STD_HPP_
 
 #include "rmw_connextdds/context.hpp"
+#include <condition_variable>
 
 /******************************************************************************
  * Alternative implementation of WaitSets and Conditions using C++ std
@@ -121,6 +122,22 @@ class RMW_Connext_Condition
     }
   }
 
+  template<typename FunctorT, typename FunctorA>
+  void
+  perform_action_and_update_state(FunctorA && action, FunctorT && update_condition)
+  {
+    std::lock_guard<std::mutex> internal_lock(this->mutex_internal);
+
+    action();
+
+    if (nullptr != this->waitset_mutex) {
+      std::lock_guard<std::mutex> lock(*this->waitset_mutex);
+      update_condition();
+    } else {
+      update_condition();
+    }
+  }
+
 protected:
   std::mutex mutex_internal;
   std::mutex * waitset_mutex;
@@ -333,8 +350,55 @@ class RMW_Connext_StatusCondition : public RMW_Connext_Condition
   virtual bool
   has_status(const rmw_event_type_t event_type) = 0;
 
+  void
+  notify_new_event(rmw_event_type_t event_type)
+  {
+    std::unique_lock<std::mutex> lock_mutex(new_event_mutex_);
+    if (new_event_cb_[event_type]) {
+      new_event_cb_[event_type](user_data_[event_type], 1);
+    } else {
+      unread_events_count_[event_type]++;
+    }
+  }
+
+  void
+  set_new_event_callback(
+    rmw_event_type_t event_type,
+    rmw_event_callback_t callback,
+    const void * user_data)
+  {
+    std::unique_lock<std::mutex> lock_mutex(new_event_mutex_);
+
+    if (callback) {
+      // Push events that arrived before the executor's callback was set
+      if (unread_events_count_[event_type] > 0) {
+        callback(user_data, unread_events_count_[event_type]);
+        unread_events_count_[event_type] = 0;
+      }
+      user_data_[event_type] = user_data;
+      new_event_cb_[event_type] = callback;
+    } else {
+      user_data_[event_type] = nullptr;
+      new_event_cb_[event_type] = nullptr;
+    }
+  }
+
+  void
+  on_inconsistent_topic(const struct DDS_InconsistentTopicStatus * status);
+
+  void
+  update_status_inconsistent_topic(const struct DDS_InconsistentTopicStatus * status);
+
 protected:
   DDS_StatusCondition * scond;
+  std::mutex new_event_mutex_;
+  rmw_event_callback_t new_event_cb_[RMW_EVENT_INVALID] = {};
+  const void * user_data_[RMW_EVENT_INVALID] = {};
+  uint64_t unread_events_count_[RMW_EVENT_INVALID] = {0};
+
+  bool triggered_inconsistent_topic{false};
+
+  struct DDS_InconsistentTopicStatus status_inconsistent_topic;
Comment on lines +394 to +401

Collaborator:
These changes obviously break ABI compatibility; are we allowed to do that at this point in time?

Contributor:
> These changes obviously break ABI compatibility; are we allowed to do that at this point in time?

It's true that this breaks ABI. In this case, I approved it anyway because the ABI is between rmw_connextdds_common and rmw_connextdds; it seems doubtful someone is relying on that. Also, for some other RMWs we have broken ABI in this way before when it was "internal" to the RMW.

But it is a conversation worth having; should we allow this kind of ABI breakage in stable distributions?

Collaborator:
I think it is okay to break the ABI in this case; I do not expect anyone to be relying on rmw implementation APIs...

> But it is a conversation worth having; should we allow this kind of ABI breakage in stable distributions?

IMO, an rmw implementation can break its internal ABI in stable distributions. As long as the RMW interface is compatible, there should be no problem? Agreed that this is worth bringing up for discussion.

Collaborator (Author):
Would you be OK with this change then? I prefer to be sure rather than to revert.

As I see it, this would only break compatibility for programs that directly call the Connext RMW API, which would defeat the purpose of the RMW in the first place. The ABI of the rmw_connextdds and rmw packages is still stable, so it shouldn't be an issue.

Contributor:
> Would you be OK with this change then? I prefer to be sure rather than to revert.

Let's hold off until next week; we'll have a discussion in the ROS PMC meeting about it.

 };
 
 void
@@ -712,6 +776,26 @@ class RMW_Connext_SubscriberStatusCondition : public RMW_Connext_StatusCondition
     return RMW_RET_OK;
   }
 
+  void set_on_new_data_callback(
+    const rmw_event_callback_t callback,
+    const void * const user_data)
+  {
+    std::unique_lock<std::mutex> lock(new_data_event_mutex_);
+    if (callback) {
+      if (unread_data_events_count_ > 0) {
+        callback(user_data, unread_data_events_count_);
+        unread_data_events_count_ = 0;
+      }
+      new_data_event_cb_ = callback;
+      data_event_user_data_ = user_data;
+    } else {
+      new_data_event_cb_ = nullptr;
+      data_event_user_data_ = nullptr;
+    }
+  }
+
+  void notify_new_data();
+
 protected:
   void update_status_deadline(
     const DDS_RequestedDeadlineMissedStatus * const status);
@@ -745,6 +829,11 @@ class RMW_Connext_SubscriberStatusCondition : public RMW_Connext_StatusCondition
 
   RMW_Connext_Subscriber * sub;
 
+  std::mutex new_data_event_mutex_;
+  rmw_event_callback_t new_data_event_cb_{nullptr};
+  const void * data_event_user_data_{nullptr};
+  uint64_t unread_data_events_count_ = 0;
+
   friend class RMW_Connext_WaitSet;
 };
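
The notify_new_event() / set_new_event_callback() pair above follows a buffer-then-flush pattern: events that fire before an executor registers its callback are counted, and the accumulated count is delivered in a single call at registration time. Below is a self-contained sketch of that same pattern; the class and type names are illustrative and not part of the RMW API:

#include <cstddef>
#include <cstdint>
#include <cstdio>
#include <mutex>

// Illustrative stand-in for rmw_event_callback_t.
using event_callback_t = void (*)(const void * user_data, std::size_t count);

class BufferedEventSource
{
public:
  // Called from the middleware side (cf. notify_new_event()).
  void notify()
  {
    std::lock_guard<std::mutex> lock(mutex_);
    if (callback_) {
      callback_(user_data_, 1);
    } else {
      unread_++;  // no listener registered yet: remember the event
    }
  }

  // Called from the executor side (cf. set_new_event_callback()).
  void set_callback(event_callback_t cb, const void * user_data)
  {
    std::lock_guard<std::mutex> lock(mutex_);
    if (cb && unread_ > 0) {
      cb(user_data, static_cast<std::size_t>(unread_));  // flush pre-registration events
      unread_ = 0;
    }
    callback_ = cb;
    user_data_ = cb ? user_data : nullptr;
  }

private:
  std::mutex mutex_;
  event_callback_t callback_{nullptr};
  const void * user_data_{nullptr};
  std::uint64_t unread_{0};
};

int main()
{
  BufferedEventSource src;
  src.notify();
  src.notify();
  // Registering now reports the two buffered events in a single callback invocation.
  src.set_callback(
    [](const void *, std::size_t count) {std::printf("%zu new events\n", count);},
    nullptr);
  return 0;
}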
37 changes: 37 additions & 0 deletions rmw_connextdds_common/src/common/rmw_impl_waitset_std.cpp
@@ -121,6 +121,7 @@ RMW_Connext_DataReaderListener_on_data_available(
 
   UNUSED_ARG(reader);
 
+  self->notify_new_data();
   self->set_data_available(true);
 }
 
@@ -706,6 +707,8 @@ RMW_Connext_SubscriberStatusCondition::update_status_deadline(
 
   this->status_deadline.total_count_change = this->status_deadline.total_count;
   this->status_deadline.total_count_change -= this->status_deadline_last.total_count;
+
+  this->notify_new_event(RMW_EVENT_REQUESTED_DEADLINE_MISSED);
 }
 
 void
@@ -720,6 +723,8 @@ RMW_Connext_SubscriberStatusCondition::update_status_liveliness(
   this->status_liveliness.alive_count_change -= this->status_liveliness_last.alive_count;
   this->status_liveliness.not_alive_count_change -=
     this->status_liveliness_last.not_alive_count;
+
+  this->notify_new_event(RMW_EVENT_LIVELINESS_CHANGED);
 }
 
 void
@@ -731,6 +736,7 @@ RMW_Connext_SubscriberStatusCondition::update_status_qos(
 
   this->status_qos.total_count_change = this->status_qos.total_count;
   this->status_qos.total_count_change -= this->status_qos_last.total_count;
+  this->notify_new_event(RMW_EVENT_REQUESTED_QOS_INCOMPATIBLE);
 }
 
 void
@@ -743,6 +749,31 @@ RMW_Connext_SubscriberStatusCondition::update_status_sample_lost(
   this->status_sample_lost.total_count_change = this->status_sample_lost.total_count;
   this->status_sample_lost.total_count_change -=
     this->status_sample_lost_last.total_count;
+  this->notify_new_event(RMW_EVENT_MESSAGE_LOST);
 }
 
+void
+RMW_Connext_SubscriberStatusCondition::notify_new_data()
+{
+  size_t unread_samples = 0;
+  std::unique_lock<std::mutex> lock_mutex(new_data_event_mutex_);
+  perform_action_and_update_state(
+    [this, &unread_samples]() {
+      const rmw_ret_t rc = this->sub->count_unread_samples(unread_samples);
+      if (RMW_RET_OK != rc) {
+        RMW_CONNEXT_LOG_ERROR("failed to count unread samples on DDS Reader")
+      }
+    },
+    [this, &unread_samples]() {
+      if (unread_samples == 0) {
+        return;
+      }
+      if (new_data_event_cb_) {
+        new_data_event_cb_(data_event_user_data_, unread_samples);
+      } else {
+        unread_data_events_count_ += unread_samples;
+      }
+    });
+}
+
 rmw_ret_t
@@ -852,6 +883,8 @@ RMW_Connext_PublisherStatusCondition::update_status_deadline(
 
   this->status_deadline.total_count_change = this->status_deadline.total_count;
   this->status_deadline.total_count_change -= this->status_deadline_last.total_count;
+
+  this->notify_new_event(RMW_EVENT_OFFERED_DEADLINE_MISSED);
 }
 
 void
@@ -863,6 +896,8 @@ RMW_Connext_PublisherStatusCondition::update_status_liveliness(
 
   this->status_liveliness.total_count_change = this->status_liveliness.total_count;
   this->status_liveliness.total_count_change -= this->status_liveliness_last.total_count;
+
+  this->notify_new_event(RMW_EVENT_LIVELINESS_CHANGED);
 }
 
 void
@@ -874,4 +909,6 @@ RMW_Connext_PublisherStatusCondition::update_status_qos(
 
   this->status_qos.total_count_change = this->status_qos.total_count;
   this->status_qos.total_count_change -= this->status_qos_last.total_count;
+
+  this->notify_new_event(RMW_EVENT_OFFERED_QOS_INCOMPATIBLE);
 }