Skip to content

Commit

Permalink
Add listener callbacks to newly added events
Browse files Browse the repository at this point in the history
Signed-off-by: Mauro Passerino <[email protected]>
  • Loading branch information
Mauro Passerino authored and Jeffery Hsu committed Aug 30, 2022
1 parent d388d7d commit 67b20ae
Show file tree
Hide file tree
Showing 9 changed files with 154 additions and 161 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@

#include "rmw/event_callback_type.h"

#include "rmw_fastrtps_shared_cpp/custom_event_info.hpp"
#include "rmw_fastrtps_shared_cpp/TypeSupport.hpp"

class ClientListener;
Expand Down Expand Up @@ -128,13 +129,7 @@ class ClientListener : public eprosima::fastdds::dds::DataReaderListener
list_has_data_.store(true);
}

std::unique_lock<std::mutex> lock_mutex(on_new_response_m_);

if (on_new_response_cb_) {
on_new_response_cb_(user_data_, 1);
} else {
unread_count_++;
}
on_data_available_.call();
}
}
}
Expand Down Expand Up @@ -198,20 +193,7 @@ class ClientListener : public eprosima::fastdds::dds::DataReaderListener
const void * user_data,
rmw_event_callback_t callback)
{
std::unique_lock<std::mutex> lock_mutex(on_new_response_m_);

if (callback) {
// Push events arrived before setting the the executor callback
if (unread_count_) {
callback(user_data, unread_count_);
unread_count_ = 0;
}
user_data_ = user_data;
on_new_response_cb_ = callback;
} else {
user_data_ = nullptr;
on_new_response_cb_ = nullptr;
}
on_data_available_.set_callback(user_data, callback);
}

private:
Expand All @@ -234,10 +216,8 @@ class ClientListener : public eprosima::fastdds::dds::DataReaderListener
std::condition_variable * conditionVariable_ RCPPUTILS_TSA_GUARDED_BY(internalMutex_);
std::set<eprosima::fastrtps::rtps::GUID_t> publishers_;

rmw_event_callback_t on_new_response_cb_{nullptr};
const void * user_data_{nullptr};
std::mutex on_new_response_m_;
uint64_t unread_count_ = 0;
// Callback to call when the listener detects events
EventTypeCallback on_data_available_;
};

class ClientPubListener : public eprosima::fastdds::dds::DataWriterListener
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,14 +62,10 @@ class EventListenerInterface

// Provide handlers to perform an action when a
// new event from this listener has ocurred
virtual void set_on_new_event_callback(
virtual rmw_ret_t set_on_new_event_callback(
rmw_event_type_t event_type,
const void * user_data,
rmw_event_callback_t callback) = 0;

rmw_event_callback_t on_new_event_cb_{nullptr};
const void * user_data_{nullptr};
uint64_t unread_events_count_ = 0;
std::mutex on_new_event_m_;
};

class EventListenerInterface::ConditionalScopedLock
Expand Down Expand Up @@ -105,4 +101,51 @@ struct CustomEventInfo
virtual EventListenerInterface * getListener() const = 0;
};

class EventTypeCallback
{
public:
EventTypeCallback() = default;

EventTypeCallback(size_t depth)
{
history_depth_ = (depth > 0) ? depth : std::numeric_limits<size_t>::max();
}

void set_callback(const void * user_data, rmw_event_callback_t callback)
{
std::lock_guard<std::mutex> lock(mutex_);

if (callback) {
if (unread_count_) {
size_t count = std::min(unread_count_, history_depth_);
callback(user_data, count);
unread_count_ = 0;
}
user_data_ = user_data;
callback_ = callback;
} else {
user_data_ = nullptr;
callback_ = nullptr;
}
}

void call()
{
std::lock_guard<std::mutex> lock(mutex_);

if (callback_) {
callback_(user_data_, 1);
} else {
unread_count_++;
}
}

private:
std::mutex mutex_;
rmw_event_callback_t callback_{nullptr};
const void * user_data_{nullptr};
size_t unread_count_{0};
size_t history_depth_ = std::numeric_limits<size_t>::max();
};

#endif // RMW_FASTRTPS_SHARED_CPP__CUSTOM_EVENT_INFO_HPP_
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,9 @@ class PubListener : public EventListenerInterface, public eprosima::fastdds::dds
hasEvent(rmw_event_type_t event_type) const final;

RMW_FASTRTPS_SHARED_CPP_PUBLIC
void set_on_new_event_callback(
rmw_ret_t
set_on_new_event_callback(
rmw_event_type_t event_type,
const void * user_data,
rmw_event_callback_t callback) final;

Expand Down Expand Up @@ -158,6 +160,11 @@ class PubListener : public EventListenerInterface, public eprosima::fastdds::dds

std::mutex * conditionMutex_ RCPPUTILS_TSA_GUARDED_BY(internalMutex_);
std::condition_variable * conditionVariable_ RCPPUTILS_TSA_GUARDED_BY(internalMutex_);

// The callbacks to call when the listener detects events
EventTypeCallback on_liveliness_lost_;
EventTypeCallback on_offered_deadline_missed_;
EventTypeCallback on_offered_incompatible_qos_;
};

#endif // RMW_FASTRTPS_SHARED_CPP__CUSTOM_PUBLISHER_INFO_HPP_
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@

#include "rmw/event_callback_type.h"

#include "rmw_fastrtps_shared_cpp/custom_event_info.hpp"
#include "rmw_fastrtps_shared_cpp/guid_utils.hpp"
#include "rmw_fastrtps_shared_cpp/TypeSupport.hpp"

Expand Down Expand Up @@ -249,13 +250,7 @@ class ServiceListener : public eprosima::fastdds::dds::DataReaderListener
list_has_data_.store(true);
}

std::unique_lock<std::mutex> lock_mutex(on_new_request_m_);

if (on_new_request_cb_) {
on_new_request_cb_(user_data_, 1);
} else {
unread_count_++;
}
on_data_available_.call();
}
}
}
Expand Down Expand Up @@ -313,20 +308,7 @@ class ServiceListener : public eprosima::fastdds::dds::DataReaderListener
const void * user_data,
rmw_event_callback_t callback)
{
std::unique_lock<std::mutex> lock_mutex(on_new_request_m_);

if (callback) {
// Push events arrived before setting the the executor callback
if (unread_count_) {
callback(user_data, unread_count_);
unread_count_ = 0;
}
user_data_ = user_data;
on_new_request_cb_ = callback;
} else {
user_data_ = nullptr;
on_new_request_cb_ = nullptr;
}
on_data_available_.set_callback(user_data, callback);
}

private:
Expand All @@ -337,10 +319,8 @@ class ServiceListener : public eprosima::fastdds::dds::DataReaderListener
std::mutex * conditionMutex_ RCPPUTILS_TSA_GUARDED_BY(internalMutex_);
std::condition_variable * conditionVariable_ RCPPUTILS_TSA_GUARDED_BY(internalMutex_);

rmw_event_callback_t on_new_request_cb_{nullptr};
const void * user_data_{nullptr};
std::mutex on_new_request_m_;
uint64_t unread_count_ = 0;
// Callback to call when the listener detects events
EventTypeCallback on_data_available_;
};

#endif // RMW_FASTRTPS_SHARED_CPP__CUSTOM_SERVICE_INFO_HPP_
Original file line number Diff line number Diff line change
Expand Up @@ -91,9 +91,9 @@ class SubListener : public EventListenerInterface, public eprosima::fastdds::dds
sample_lost_changes_(false),
incompatible_qos_changes_(false),
conditionMutex_(nullptr),
conditionVariable_(nullptr)
conditionVariable_(nullptr),
on_data_available_(qos_depth)
{
qos_depth_ = (qos_depth > 0) ? qos_depth : std::numeric_limits<size_t>::max();
// Field is not used right now
(void)info;
}
Expand All @@ -115,19 +115,13 @@ class SubListener : public EventListenerInterface, public eprosima::fastdds::dds
update_has_data(reader);
}

RMW_FASTRTPS_SHARED_CPP_PUBLIC
void
on_data_available(eprosima::fastdds::dds::DataReader * reader) final
{
update_has_data(reader);

std::unique_lock<std::mutex> lock_mutex(on_new_message_m_);
on_data_available(eprosima::fastdds::dds::DataReader * reader) final;

if (on_new_message_cb_) {
on_new_message_cb_(user_data_, 1);
} else {
new_data_unread_count_++;
}
}
RMW_FASTRTPS_SHARED_CPP_PUBLIC
void
set_on_data_available_callback(const void * user_data, rmw_event_callback_t callback);

RMW_FASTRTPS_SHARED_CPP_PUBLIC
void
Expand Down Expand Up @@ -159,7 +153,9 @@ class SubListener : public EventListenerInterface, public eprosima::fastdds::dds
hasEvent(rmw_event_type_t event_type) const final;

RMW_FASTRTPS_SHARED_CPP_PUBLIC
void set_on_new_event_callback(
rmw_ret_t
set_on_new_event_callback(
rmw_event_type_t event_type,
const void * user_data,
rmw_event_callback_t callback) final;

Expand Down Expand Up @@ -209,30 +205,6 @@ class SubListener : public EventListenerInterface, public eprosima::fastdds::dds
return publishers_.size();
}

// Provide handlers to perform an action when a
// new event from this listener has ocurred
void
set_on_new_message_callback(
const void * user_data,
rmw_event_callback_t callback)
{
std::unique_lock<std::mutex> lock_mutex(on_new_message_m_);

if (callback) {
// Push events arrived before setting the executor's callback
if (new_data_unread_count_) {
auto unread_count = std::min(new_data_unread_count_, qos_depth_);
callback(user_data, unread_count);
new_data_unread_count_ = 0;
}
user_data_ = user_data;
on_new_message_cb_ = callback;
} else {
user_data_ = nullptr;
on_new_message_cb_ = nullptr;
}
}

private:
mutable std::mutex internalMutex_;

Expand All @@ -259,10 +231,12 @@ class SubListener : public EventListenerInterface, public eprosima::fastdds::dds

std::set<eprosima::fastrtps::rtps::GUID_t> publishers_ RCPPUTILS_TSA_GUARDED_BY(internalMutex_);

rmw_event_callback_t on_new_message_cb_{nullptr};
std::mutex on_new_message_m_;
size_t qos_depth_;
size_t new_data_unread_count_ = 0;
// The callbacks to call when the listener detects events
EventTypeCallback on_sample_lost_;
EventTypeCallback on_data_available_;
EventTypeCallback on_liveliness_changed_;
EventTypeCallback on_requested_deadline_missed_;
EventTypeCallback on_requested_incompatible_qos_;
};

#endif // RMW_FASTRTPS_SHARED_CPP__CUSTOM_SUBSCRIBER_INFO_HPP_
52 changes: 24 additions & 28 deletions rmw_fastrtps_shared_cpp/src/custom_publisher_info.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#include "rmw/error_handling.h"

#include "rmw_fastrtps_shared_cpp/custom_publisher_info.hpp"

#include "fastdds/dds/core/status/BaseStatus.hpp"
Expand Down Expand Up @@ -44,13 +46,7 @@ PubListener::on_offered_deadline_missed(

deadline_changes_.store(true, std::memory_order_relaxed);

std::unique_lock<std::mutex> lock_mutex(on_new_event_m_);

if (on_new_event_cb_) {
on_new_event_cb_(user_data_, 1);
} else {
unread_events_count_++;
}
on_offered_deadline_missed_.call();
}

void PubListener::on_liveliness_lost(
Expand All @@ -70,13 +66,7 @@ void PubListener::on_liveliness_lost(

liveliness_changes_.store(true, std::memory_order_relaxed);

std::unique_lock<std::mutex> lock_mutex(on_new_event_m_);

if (on_new_event_cb_) {
on_new_event_cb_(user_data_, 1);
} else {
unread_events_count_++;
}
on_liveliness_lost_.call();
}

void PubListener::on_offered_incompatible_qos(
Expand All @@ -96,6 +86,8 @@ void PubListener::on_offered_incompatible_qos(
incompatible_qos_status_.total_count_change += status.total_count_change;

incompatible_qos_changes_.store(true, std::memory_order_relaxed);

on_offered_incompatible_qos_.call();
}

bool PubListener::hasEvent(rmw_event_type_t event_type) const
Expand All @@ -114,24 +106,28 @@ bool PubListener::hasEvent(rmw_event_type_t event_type) const
return false;
}

void PubListener::set_on_new_event_callback(
rmw_ret_t PubListener::set_on_new_event_callback(
rmw_event_type_t event_type,
const void * user_data,
rmw_event_callback_t callback)
{
std::unique_lock<std::mutex> lock_mutex(on_new_event_m_);

if (callback) {
// Push events arrived before setting the executor's callback
if (unread_events_count_) {
callback(user_data, unread_events_count_);
unread_events_count_ = 0;
}
user_data_ = user_data;
on_new_event_cb_ = callback;
} else {
user_data_ = nullptr;
on_new_event_cb_ = nullptr;
switch (event_type)
{
case RMW_EVENT_LIVELINESS_LOST:
on_liveliness_lost_.set_callback(user_data, callback);
break;
case RMW_EVENT_OFFERED_DEADLINE_MISSED:
on_offered_deadline_missed_.set_callback(user_data, callback);
break;
case RMW_EVENT_OFFERED_QOS_INCOMPATIBLE:
on_offered_incompatible_qos_.set_callback(user_data, callback);
break;
default:
RMW_SET_ERROR_MSG("provided event_type is not supported");
return RMW_RET_UNSUPPORTED;
break;
}
return RMW_RET_OK;
}

bool PubListener::takeNextEvent(rmw_event_type_t event_type, void * event_info)
Expand Down
Loading

0 comments on commit 67b20ae

Please sign in to comment.