Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add listener callbacks to newly added events #625

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@

#include "rmw/event_callback_type.h"

#include "rmw_fastrtps_shared_cpp/custom_event_info.hpp"
#include "rmw_fastrtps_shared_cpp/TypeSupport.hpp"

class ClientListener;
Expand Down Expand Up @@ -128,13 +129,7 @@ class ClientListener : public eprosima::fastdds::dds::DataReaderListener
list_has_data_.store(true);
}

std::unique_lock<std::mutex> lock_mutex(on_new_response_m_);

if (on_new_response_cb_) {
on_new_response_cb_(user_data_, 1);
} else {
unread_count_++;
}
on_data_available_.call();
}
}
}
Expand Down Expand Up @@ -198,20 +193,7 @@ class ClientListener : public eprosima::fastdds::dds::DataReaderListener
const void * user_data,
rmw_event_callback_t callback)
{
std::unique_lock<std::mutex> lock_mutex(on_new_response_m_);

if (callback) {
// Push events arrived before setting the the executor callback
if (unread_count_) {
callback(user_data, unread_count_);
unread_count_ = 0;
}
user_data_ = user_data;
on_new_response_cb_ = callback;
} else {
user_data_ = nullptr;
on_new_response_cb_ = nullptr;
}
on_data_available_.set_callback(user_data, callback);
}

private:
Expand All @@ -234,10 +216,8 @@ class ClientListener : public eprosima::fastdds::dds::DataReaderListener
std::condition_variable * conditionVariable_ RCPPUTILS_TSA_GUARDED_BY(internalMutex_);
std::set<eprosima::fastrtps::rtps::GUID_t> publishers_;

rmw_event_callback_t on_new_response_cb_{nullptr};
const void * user_data_{nullptr};
std::mutex on_new_response_m_;
uint64_t unread_count_ = 0;
// Callback to call when the listener detects events
EventTypeCallback on_data_available_;
};

class ClientPubListener : public eprosima::fastdds::dds::DataWriterListener
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,10 @@
#ifndef RMW_FASTRTPS_SHARED_CPP__CUSTOM_EVENT_INFO_HPP_
#define RMW_FASTRTPS_SHARED_CPP__CUSTOM_EVENT_INFO_HPP_

#include <algorithm>
#include <atomic>
#include <condition_variable>
#include <limits>
#include <list>
#include <memory>
#include <mutex>
Expand Down Expand Up @@ -62,14 +64,10 @@ class EventListenerInterface

// Provide handlers to perform an action when a
// new event from this listener has ocurred
virtual void set_on_new_event_callback(
virtual rmw_ret_t set_on_new_event_callback(
rmw_event_type_t event_type,
const void * user_data,
rmw_event_callback_t callback) = 0;

rmw_event_callback_t on_new_event_cb_{nullptr};
const void * user_data_{nullptr};
uint64_t unread_events_count_ = 0;
std::mutex on_new_event_m_;
};

class EventListenerInterface::ConditionalScopedLock
Expand Down Expand Up @@ -105,4 +103,51 @@ struct CustomEventInfo
virtual EventListenerInterface * getListener() const = 0;
};

class EventTypeCallback
{
public:
EventTypeCallback() = default;

explicit EventTypeCallback(size_t depth)
{
history_depth_ = (depth > 0) ? depth : std::numeric_limits<size_t>::max();
}

void set_callback(const void * user_data, rmw_event_callback_t callback)
{
std::lock_guard<std::mutex> lock(mutex_);

if (callback) {
if (unread_count_) {
size_t count = std::min(unread_count_, history_depth_);
callback(user_data, count);
unread_count_ = 0;
}
user_data_ = user_data;
callback_ = callback;
} else {
user_data_ = nullptr;
callback_ = nullptr;
}
}

void call()
{
std::lock_guard<std::mutex> lock(mutex_);

if (callback_) {
callback_(user_data_, 1);
} else {
unread_count_++;
}
}

private:
std::mutex mutex_;
rmw_event_callback_t callback_{nullptr};
const void * user_data_{nullptr};
size_t unread_count_{0};
size_t history_depth_ = std::numeric_limits<size_t>::max();
};

#endif // RMW_FASTRTPS_SHARED_CPP__CUSTOM_EVENT_INFO_HPP_
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,9 @@ class PubListener : public EventListenerInterface, public eprosima::fastdds::dds
hasEvent(rmw_event_type_t event_type) const final;

RMW_FASTRTPS_SHARED_CPP_PUBLIC
void set_on_new_event_callback(
rmw_ret_t
set_on_new_event_callback(
rmw_event_type_t event_type,
const void * user_data,
rmw_event_callback_t callback) final;

Expand Down Expand Up @@ -158,6 +160,11 @@ class PubListener : public EventListenerInterface, public eprosima::fastdds::dds

std::mutex * conditionMutex_ RCPPUTILS_TSA_GUARDED_BY(internalMutex_);
std::condition_variable * conditionVariable_ RCPPUTILS_TSA_GUARDED_BY(internalMutex_);

// The callbacks to call when the listener detects events
EventTypeCallback on_liveliness_lost_;
EventTypeCallback on_offered_deadline_missed_;
EventTypeCallback on_offered_incompatible_qos_;
};

#endif // RMW_FASTRTPS_SHARED_CPP__CUSTOM_PUBLISHER_INFO_HPP_
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@

#include "rmw/event_callback_type.h"

#include "rmw_fastrtps_shared_cpp/custom_event_info.hpp"
#include "rmw_fastrtps_shared_cpp/guid_utils.hpp"
#include "rmw_fastrtps_shared_cpp/TypeSupport.hpp"

Expand Down Expand Up @@ -249,13 +250,7 @@ class ServiceListener : public eprosima::fastdds::dds::DataReaderListener
list_has_data_.store(true);
}

std::unique_lock<std::mutex> lock_mutex(on_new_request_m_);

if (on_new_request_cb_) {
on_new_request_cb_(user_data_, 1);
} else {
unread_count_++;
}
on_data_available_.call();
}
}
}
Expand Down Expand Up @@ -313,20 +308,7 @@ class ServiceListener : public eprosima::fastdds::dds::DataReaderListener
const void * user_data,
rmw_event_callback_t callback)
{
std::unique_lock<std::mutex> lock_mutex(on_new_request_m_);

if (callback) {
// Push events arrived before setting the the executor callback
if (unread_count_) {
callback(user_data, unread_count_);
unread_count_ = 0;
}
user_data_ = user_data;
on_new_request_cb_ = callback;
} else {
user_data_ = nullptr;
on_new_request_cb_ = nullptr;
}
on_data_available_.set_callback(user_data, callback);
}

private:
Expand All @@ -337,10 +319,8 @@ class ServiceListener : public eprosima::fastdds::dds::DataReaderListener
std::mutex * conditionMutex_ RCPPUTILS_TSA_GUARDED_BY(internalMutex_);
std::condition_variable * conditionVariable_ RCPPUTILS_TSA_GUARDED_BY(internalMutex_);

rmw_event_callback_t on_new_request_cb_{nullptr};
const void * user_data_{nullptr};
std::mutex on_new_request_m_;
uint64_t unread_count_ = 0;
// Callback to call when the listener detects events
EventTypeCallback on_data_available_;
};

#endif // RMW_FASTRTPS_SHARED_CPP__CUSTOM_SERVICE_INFO_HPP_
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
#define RMW_FASTRTPS_SHARED_CPP__CUSTOM_SUBSCRIBER_INFO_HPP_

#include <atomic>
#include <algorithm>
#include <condition_variable>
#include <limits>
#include <memory>
Expand Down Expand Up @@ -91,9 +90,9 @@ class SubListener : public EventListenerInterface, public eprosima::fastdds::dds
sample_lost_changes_(false),
incompatible_qos_changes_(false),
conditionMutex_(nullptr),
conditionVariable_(nullptr)
conditionVariable_(nullptr),
on_data_available_(qos_depth)
{
qos_depth_ = (qos_depth > 0) ? qos_depth : std::numeric_limits<size_t>::max();
// Field is not used right now
(void)info;
}
Expand All @@ -115,19 +114,13 @@ class SubListener : public EventListenerInterface, public eprosima::fastdds::dds
update_has_data(reader);
}

RMW_FASTRTPS_SHARED_CPP_PUBLIC
void
on_data_available(eprosima::fastdds::dds::DataReader * reader) final
{
update_has_data(reader);

std::unique_lock<std::mutex> lock_mutex(on_new_message_m_);
on_data_available(eprosima::fastdds::dds::DataReader * reader) final;

if (on_new_message_cb_) {
on_new_message_cb_(user_data_, 1);
} else {
new_data_unread_count_++;
}
}
RMW_FASTRTPS_SHARED_CPP_PUBLIC
void
set_on_data_available_callback(const void * user_data, rmw_event_callback_t callback);

RMW_FASTRTPS_SHARED_CPP_PUBLIC
void
Expand Down Expand Up @@ -159,7 +152,9 @@ class SubListener : public EventListenerInterface, public eprosima::fastdds::dds
hasEvent(rmw_event_type_t event_type) const final;

RMW_FASTRTPS_SHARED_CPP_PUBLIC
void set_on_new_event_callback(
rmw_ret_t
set_on_new_event_callback(
rmw_event_type_t event_type,
const void * user_data,
rmw_event_callback_t callback) final;

Expand Down Expand Up @@ -209,30 +204,6 @@ class SubListener : public EventListenerInterface, public eprosima::fastdds::dds
return publishers_.size();
}

// Provide handlers to perform an action when a
// new event from this listener has ocurred
void
set_on_new_message_callback(
const void * user_data,
rmw_event_callback_t callback)
{
std::unique_lock<std::mutex> lock_mutex(on_new_message_m_);

if (callback) {
// Push events arrived before setting the executor's callback
if (new_data_unread_count_) {
auto unread_count = std::min(new_data_unread_count_, qos_depth_);
callback(user_data, unread_count);
new_data_unread_count_ = 0;
}
user_data_ = user_data;
on_new_message_cb_ = callback;
} else {
user_data_ = nullptr;
on_new_message_cb_ = nullptr;
}
}

private:
mutable std::mutex internalMutex_;

Expand All @@ -259,10 +230,12 @@ class SubListener : public EventListenerInterface, public eprosima::fastdds::dds

std::set<eprosima::fastrtps::rtps::GUID_t> publishers_ RCPPUTILS_TSA_GUARDED_BY(internalMutex_);

rmw_event_callback_t on_new_message_cb_{nullptr};
std::mutex on_new_message_m_;
size_t qos_depth_;
size_t new_data_unread_count_ = 0;
// The callbacks to call when the listener detects events
EventTypeCallback on_sample_lost_;
EventTypeCallback on_data_available_;
EventTypeCallback on_liveliness_changed_;
EventTypeCallback on_requested_deadline_missed_;
EventTypeCallback on_requested_incompatible_qos_;
};

#endif // RMW_FASTRTPS_SHARED_CPP__CUSTOM_SUBSCRIBER_INFO_HPP_
Loading