Skip to content

Commit

Permalink
Add support for listener callbacks.
Browse files Browse the repository at this point in the history
Signed-off-by: Andrea Sorbini <[email protected]>
  • Loading branch information
asorbini committed Mar 16, 2022
1 parent 35a37aa commit fe2fd49
Show file tree
Hide file tree
Showing 4 changed files with 149 additions and 41 deletions.
18 changes: 9 additions & 9 deletions rmw_connextdds_common/include/rmw_connextdds/rmw_api_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,8 @@ RMW_CONNEXTDDS_PUBLIC
rmw_ret_t
rmw_api_connextdds_event_set_callback(
rmw_event_t * event,
rmw_event_callback_t callback,
const void * user_data);
const rmw_event_callback_t callback,
const void * const user_data);

/*****************************************************************************
* Info API
Expand Down Expand Up @@ -435,15 +435,15 @@ RMW_CONNEXTDDS_PUBLIC
rmw_ret_t
rmw_api_connextdds_service_set_on_new_request_callback(
rmw_service_t * rmw_service,
rmw_event_callback_t callback,
const void * user_data);
const rmw_event_callback_t callback,
const void * const user_data);

RMW_CONNEXTDDS_PUBLIC
rmw_ret_t
rmw_api_connextdds_client_set_on_new_response_callback(
rmw_client_t * rmw_client,
rmw_event_callback_t callback,
const void * user_data);
const rmw_event_callback_t callback,
const void * const user_data);

/*****************************************************************************
* Subscription API
Expand Down Expand Up @@ -560,9 +560,9 @@ rmw_api_connextdds_return_loaned_message_from_subscription(
RMW_CONNEXTDDS_PUBLIC
rmw_ret_t
rmw_api_connextdds_subscription_set_on_new_message_callback(
rmw_subscription_t * rmw_subscription,
rmw_event_callback_t callback,
const void * user_data);
rmw_subscription_t * const rmw_subscription,
const rmw_event_callback_t callback,
const void * const user_data);

/*****************************************************************************
* WaitSet API
Expand Down
72 changes: 72 additions & 0 deletions rmw_connextdds_common/include/rmw_connextdds/rmw_waitset_std.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -333,8 +333,44 @@ class RMW_Connext_StatusCondition : public RMW_Connext_Condition
virtual bool
has_status(const rmw_event_type_t event_type) = 0;

void
notify_new_event()
{
std::unique_lock<std::mutex> lock_mutex(new_event_mutex_);
if (new_event_cb_) {
new_event_cb_(user_data_, 1);
} else {
unread_events_count_++;
}
}

void
set_new_event_callback(
rmw_event_callback_t callback,
const void * user_data)
{
std::unique_lock<std::mutex> lock_mutex(new_event_mutex_);

if (callback) {
// Push events arrived before setting the executor's callback
if (unread_events_count_) {
callback(user_data, unread_events_count_);
unread_events_count_ = 0;
}
user_data_ = user_data;
new_event_cb_ = callback;
} else {
user_data_ = nullptr;
new_event_cb_ = nullptr;
}
}

protected:
DDS_StatusCondition * scond;
std::mutex new_event_mutex_;
rmw_event_callback_t new_event_cb_{nullptr};
const void * user_data_{nullptr};
uint64_t unread_events_count_ = 0;
};

void
Expand Down Expand Up @@ -589,6 +625,9 @@ class RMW_Connext_SubscriberStatusCondition : public RMW_Connext_StatusCondition
update_state(
[this, available]() {
this->triggered_data = available;
if (available) {
this->notify_new_data();
}
}, true /* notify */);

if (nullptr != this->loan_guard_condition) {
Expand Down Expand Up @@ -712,6 +751,34 @@ class RMW_Connext_SubscriberStatusCondition : public RMW_Connext_StatusCondition
return RMW_RET_OK;
}

void set_on_new_data_callback(
const rmw_event_callback_t callback,
const void * const user_data)
{
std::unique_lock<std::mutex> lock(new_data_event_mutex_);
if (callback) {
if (unread_data_events_count_ > 0) {
callback(user_data, unread_data_events_count_);
unread_data_events_count_ = 0;
new_data_event_cb_ = callback;
data_event_user_data_ = user_data;
}
} else {
new_data_event_cb_ = nullptr;
data_event_user_data_ = nullptr;
}
}

void notify_new_data()
{
std::unique_lock<std::mutex> lock_mutex(new_data_event_mutex_);
if (new_data_event_cb_) {
new_data_event_cb_(data_event_user_data_, 1);
} else {
unread_data_events_count_++;
}
}

protected:
void update_status_deadline(
const DDS_RequestedDeadlineMissedStatus * const status);
Expand Down Expand Up @@ -745,6 +812,11 @@ class RMW_Connext_SubscriberStatusCondition : public RMW_Connext_StatusCondition

RMW_Connext_Subscriber * sub;

std::mutex new_data_event_mutex_;
rmw_event_callback_t new_data_event_cb_{nullptr};
const void * data_event_user_data_{nullptr};
uint64_t unread_data_events_count_ = 0;

friend class RMW_Connext_WaitSet;
};

Expand Down
8 changes: 8 additions & 0 deletions rmw_connextdds_common/src/common/rmw_impl_waitset_std.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -706,6 +706,8 @@ RMW_Connext_SubscriberStatusCondition::update_status_deadline(

this->status_deadline.total_count_change = this->status_deadline.total_count;
this->status_deadline.total_count_change -= this->status_deadline_last.total_count;

this->notify_new_event();
}

void
Expand All @@ -720,6 +722,8 @@ RMW_Connext_SubscriberStatusCondition::update_status_liveliness(
this->status_liveliness.alive_count_change -= this->status_liveliness_last.alive_count;
this->status_liveliness.not_alive_count_change -=
this->status_liveliness_last.not_alive_count;

this->notify_new_event();
}

void
Expand Down Expand Up @@ -852,6 +856,8 @@ RMW_Connext_PublisherStatusCondition::update_status_deadline(

this->status_deadline.total_count_change = this->status_deadline.total_count;
this->status_deadline.total_count_change -= this->status_deadline_last.total_count;

this->notify_new_event();
}

void
Expand All @@ -863,6 +869,8 @@ RMW_Connext_PublisherStatusCondition::update_status_liveliness(

this->status_liveliness.total_count_change = this->status_liveliness.total_count;
this->status_liveliness.total_count_change -= this->status_liveliness_last.total_count;

this->notify_new_event();
}

void
Expand Down
92 changes: 60 additions & 32 deletions rmw_connextdds_common/src/common/rmw_listener.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,58 +19,86 @@
******************************************************************************/
rmw_ret_t
rmw_api_connextdds_event_set_callback(
rmw_event_t * event,
rmw_event_callback_t callback,
const void * user_data)
rmw_event_t * const event,
const rmw_event_callback_t callback,
const void * const user_data)
{
UNUSED_ARG(event);
UNUSED_ARG(callback);
UNUSED_ARG(user_data);
RMW_CONNEXT_LOG_ERROR_SET("rmw_event_set_callback not implemented")
return RMW_RET_UNSUPPORTED;
RMW_CHECK_ARGUMENT_FOR_NULL(event, RMW_RET_INVALID_ARGUMENT);
RMW_CHECK_TYPE_IDENTIFIERS_MATCH(
event,
event->implementation_identifier,
RMW_CONNEXTDDS_ID,
return RMW_RET_INVALID_ARGUMENT);

RMW_Connext_StatusCondition * condition = nullptr;
if (RMW_Connext_Event::reader_event(event)) {
condition = RMW_Connext_Event::subscriber(event)->condition();
} else {
condition = RMW_Connext_Event::publisher(event)->condition();
}
condition->set_new_event_callback(callback, user_data);
return RMW_RET_OK;
}

/******************************************************************************
* Service Listener API
******************************************************************************/
rmw_ret_t
rmw_api_connextdds_service_set_on_new_request_callback(
rmw_service_t * rmw_service,
rmw_event_callback_t callback,
const void * user_data)
rmw_service_t * const service,
const rmw_event_callback_t callback,
const void * const user_data)
{
UNUSED_ARG(rmw_service);
UNUSED_ARG(callback);
UNUSED_ARG(user_data);
RMW_CONNEXT_LOG_ERROR_SET("rmw_service_set_on_new_request_callback not implemented")
return RMW_RET_UNSUPPORTED;
RMW_CHECK_ARGUMENT_FOR_NULL(service, RMW_RET_INVALID_ARGUMENT);
RMW_CHECK_TYPE_IDENTIFIERS_MATCH(
service,
service->implementation_identifier,
RMW_CONNEXTDDS_ID,
return RMW_RET_INCORRECT_RMW_IMPLEMENTATION);

RMW_Connext_Service * const svc_impl =
reinterpret_cast<RMW_Connext_Service *>(service->data);
svc_impl->subscriber()->condition()->set_on_new_data_callback(callback, user_data);
return RMW_RET_OK;
}

rmw_ret_t
rmw_api_connextdds_client_set_on_new_response_callback(
rmw_client_t * rmw_client,
rmw_event_callback_t callback,
const void * user_data)
rmw_client_t * const client,
const rmw_event_callback_t callback,
const void * const user_data)
{
UNUSED_ARG(rmw_client);
UNUSED_ARG(callback);
UNUSED_ARG(user_data);
RMW_CONNEXT_LOG_ERROR_SET("rmw_client_set_on_new_response_callback not implemented")
return RMW_RET_UNSUPPORTED;
RMW_CHECK_ARGUMENT_FOR_NULL(client, RMW_RET_INVALID_ARGUMENT);
RMW_CHECK_TYPE_IDENTIFIERS_MATCH(
client,
client->implementation_identifier,
RMW_CONNEXTDDS_ID,
return RMW_RET_INCORRECT_RMW_IMPLEMENTATION);

RMW_Connext_Client * const client_impl =
reinterpret_cast<RMW_Connext_Client *>(client->data);
client_impl->subscriber()->condition()->set_on_new_data_callback(callback, user_data);
return RMW_RET_OK;
}

/******************************************************************************
* Subscription Listener API
******************************************************************************/
rmw_ret_t
rmw_api_connextdds_subscription_set_on_new_message_callback(
rmw_subscription_t * rmw_subscription,
rmw_event_callback_t callback,
const void * user_data)
rmw_subscription_t * const subscription,
const rmw_event_callback_t callback,
const void * const user_data)
{
UNUSED_ARG(rmw_subscription);
UNUSED_ARG(callback);
UNUSED_ARG(user_data);
RMW_CONNEXT_LOG_ERROR_SET("rmw_subscription_set_on_new_message_callback not implemented")
return RMW_RET_UNSUPPORTED;
RMW_CHECK_ARGUMENT_FOR_NULL(subscription, RMW_RET_INVALID_ARGUMENT);
RMW_CHECK_TYPE_IDENTIFIERS_MATCH(
subscription,
subscription->implementation_identifier,
RMW_CONNEXTDDS_ID,
return RMW_RET_INCORRECT_RMW_IMPLEMENTATION);

RMW_Connext_Subscriber * const sub_impl =
reinterpret_cast<RMW_Connext_Subscriber *>(subscription->data);
sub_impl->condition()->set_on_new_data_callback(callback, user_data);
return RMW_RET_OK;
}

0 comments on commit fe2fd49

Please sign in to comment.