Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fastrtps18 event callbacks policies #275

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions rmw_fastrtps_shared_cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ find_package(rmw REQUIRED)
include_directories(include)

add_library(rmw_fastrtps_shared_cpp
src/custom_publisher_info.cpp
src/custom_subscriber_info.cpp
src/demangle.cpp
src/namespace_prefix.cpp
src/qos.cpp
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
// Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#ifndef RMW_FASTRTPS_SHARED_CPP__CUSTOM_EVENT_INFO_HPP_
#define RMW_FASTRTPS_SHARED_CPP__CUSTOM_EVENT_INFO_HPP_

#include <atomic>
#include <condition_variable>
#include <list>
#include <memory>
#include <mutex>
#include <utility>

#include "fastcdr/FastBuffer.h"

#include "fastrtps/subscriber/SampleInfo.h"
#include "fastrtps/subscriber/Subscriber.h"
#include "fastrtps/subscriber/SubscriberListener.h"
#include "fastrtps/participant/Participant.h"
#include "fastrtps/publisher/Publisher.h"
#include "fastrtps/publisher/PublisherListener.h"

#include "rmw/event.h"

#include "rmw_fastrtps_shared_cpp/TypeSupport.hpp"


class EventListenerInterface
{
protected:
class ConditionalScopedLock;

public:
/// Connect a condition variable so a waiter can be notified of new data.
virtual void attachCondition(
emersonknapp marked this conversation as resolved.
Show resolved Hide resolved
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not clear if we can call that more than once?

std::mutex * conditionMutex,
std::condition_variable * conditionVariable) = 0;

/// Unset the information from attachCondition.
virtual void detachCondition() = 0;

/// Check if there is new data available for a specific event type.
/**
* \param event_type The event type to check on.
* \return `true` if new data is available.
*/
virtual bool hasEvent(rmw_event_type_t event_type) const = 0;

/// Take ready data for an event type.
/**
* \param event_type The event type to get data for.
* \param event_info A preallocated event information (from rmw/types.h) to fill with data
* \return `true` if data was successfully taken.
* \return `false` if data was not available, in this case nothing was written to event_info.
*/
virtual bool takeNextEvent(rmw_event_type_t event_type, void * event_info) = 0;
};
emersonknapp marked this conversation as resolved.
Show resolved Hide resolved

class EventListenerInterface::ConditionalScopedLock
{
public:
ConditionalScopedLock(
std::mutex * mutex,
std::condition_variable * condition_variable = nullptr)
: mutex_(mutex), cv_(condition_variable)
{
if (nullptr != mutex_) {
mutex_->lock();
}
}

~ConditionalScopedLock()
{
if (nullptr != mutex_) {
mutex_->unlock();
if (nullptr != cv_) {
cv_->notify_all();
}
}
}

private:
std::mutex * mutex_;
std::condition_variable * cv_;
};

struct CustomEventInfo
{
virtual EventListenerInterface * getListener() const = 0;
};

#endif // RMW_FASTRTPS_SHARED_CPP__CUSTOM_EVENT_INFO_HPP_
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@
#ifndef RMW_FASTRTPS_SHARED_CPP__CUSTOM_PUBLISHER_INFO_HPP_
#define RMW_FASTRTPS_SHARED_CPP__CUSTOM_PUBLISHER_INFO_HPP_

#include <atomic>
#include <mutex>
#include <condition_variable>
#include <set>

#include "fastrtps/publisher/Publisher.h"
Expand All @@ -25,29 +27,43 @@
#include "rmw/rmw.h"

#include "rmw_fastrtps_shared_cpp/TypeSupport.hpp"
#include "rmw_fastrtps_shared_cpp/custom_event_info.hpp"


class PubListener;

typedef struct CustomPublisherInfo
typedef struct CustomPublisherInfo : public CustomEventInfo
{
virtual ~CustomPublisherInfo() = default;

eprosima::fastrtps::Publisher * publisher_;
PubListener * listener_;
rmw_fastrtps_shared_cpp::TypeSupport * type_support_;
rmw_gid_t publisher_gid;
const char * typesupport_identifier_;

RMW_FASTRTPS_SHARED_CPP_PUBLIC
EventListenerInterface *
getListener() const final;
} CustomPublisherInfo;

class PubListener : public eprosima::fastrtps::PublisherListener
class PubListener : public EventListenerInterface, public eprosima::fastrtps::PublisherListener
emersonknapp marked this conversation as resolved.
Show resolved Hide resolved
{
public:
explicit PubListener(CustomPublisherInfo * info)
: deadline_changes_(false),
liveliness_changes_(false),
conditionMutex_(nullptr),
conditionVariable_(nullptr)
{
(void) info;
}

// PublisherListener implementation
RMW_FASTRTPS_SHARED_CPP_PUBLIC
void
onPublicationMatched(
eprosima::fastrtps::Publisher * pub, eprosima::fastrtps::rtps::MatchingInfo & info)
eprosima::fastrtps::Publisher * pub, eprosima::fastrtps::rtps::MatchingInfo & info) final
{
(void) pub;
std::lock_guard<std::mutex> lock(internalMutex_);
Expand All @@ -58,16 +74,67 @@ class PubListener : public eprosima::fastrtps::PublisherListener
}
}

RMW_FASTRTPS_SHARED_CPP_PUBLIC
void
on_offered_deadline_missed(
eprosima::fastrtps::Publisher * publisher,
const eprosima::fastrtps::OfferedDeadlineMissedStatus & status) final;

RMW_FASTRTPS_SHARED_CPP_PUBLIC
void
on_liveliness_lost(
eprosima::fastrtps::Publisher * publisher,
const eprosima::fastrtps::LivelinessLostStatus & status) final;


// EventListenerInterface implementation
RMW_FASTRTPS_SHARED_CPP_PUBLIC
bool
hasEvent(rmw_event_type_t event_type) const final;

RMW_FASTRTPS_SHARED_CPP_PUBLIC
bool
takeNextEvent(rmw_event_type_t event_type, void * event_info) final;

// PubListener API
size_t subscriptionCount()
{
std::lock_guard<std::mutex> lock(internalMutex_);
return subscriptions_.size();
}

void
attachCondition(std::mutex * conditionMutex, std::condition_variable * conditionVariable)
{
std::lock_guard<std::mutex> lock(internalMutex_);
conditionMutex_ = conditionMutex;
conditionVariable_ = conditionVariable;
}

void
detachCondition()
{
std::lock_guard<std::mutex> lock(internalMutex_);
conditionMutex_ = nullptr;
conditionVariable_ = nullptr;
}

private:
std::mutex internalMutex_;
std::set<eprosima::fastrtps::rtps::GUID_t>
subscriptions_ RCPPUTILS_TSA_GUARDED_BY(internalMutex_);
mutable std::mutex internalMutex_;

std::set<eprosima::fastrtps::rtps::GUID_t> subscriptions_
RCPPUTILS_TSA_GUARDED_BY(internalMutex_);

std::atomic_bool deadline_changes_;
eprosima::fastrtps::OfferedDeadlineMissedStatus offered_deadline_missed_status_
RCPPUTILS_TSA_GUARDED_BY(internalMutex_);

std::atomic_bool liveliness_changes_;
eprosima::fastrtps::LivelinessLostStatus liveliness_lost_status_
RCPPUTILS_TSA_GUARDED_BY(internalMutex_);

std::mutex * conditionMutex_ RCPPUTILS_TSA_GUARDED_BY(internalMutex_);
emersonknapp marked this conversation as resolved.
Show resolved Hide resolved
std::condition_variable * conditionVariable_ RCPPUTILS_TSA_GUARDED_BY(internalMutex_);
};

#endif // RMW_FASTRTPS_SHARED_CPP__CUSTOM_PUBLISHER_INFO_HPP_
Original file line number Diff line number Diff line change
Expand Up @@ -26,35 +26,47 @@

#include "rcpputils/thread_safety_annotations.hpp"

#include "rmw/impl/cpp/macros.hpp"

#include "rmw_fastrtps_shared_cpp/TypeSupport.hpp"
#include "rmw_fastrtps_shared_cpp/custom_event_info.hpp"


class SubListener;

typedef struct CustomSubscriberInfo
typedef struct CustomSubscriberInfo : public CustomEventInfo
{
virtual ~CustomSubscriberInfo() = default;

eprosima::fastrtps::Subscriber * subscriber_;
SubListener * listener_;
rmw_fastrtps_shared_cpp::TypeSupport * type_support_;
const char * typesupport_identifier_;

RMW_FASTRTPS_SHARED_CPP_PUBLIC
EventListenerInterface *
getListener() const final;
} CustomSubscriberInfo;

class SubListener : public eprosima::fastrtps::SubscriberListener
class SubListener : public EventListenerInterface, public eprosima::fastrtps::SubscriberListener
{
public:
explicit SubListener(CustomSubscriberInfo * info)
: data_(0),
conditionMutex_(nullptr), conditionVariable_(nullptr)
deadline_changes_(false),
liveliness_changes_(false),
conditionMutex_(nullptr),
conditionVariable_(nullptr)
{
// Field is not used right now
(void)info;
}

// SubscriberListener implementation
void
onSubscriptionMatched(
eprosima::fastrtps::Subscriber * sub, eprosima::fastrtps::rtps::MatchingInfo & info)
eprosima::fastrtps::Subscriber * /*sub*/, eprosima::fastrtps::rtps::MatchingInfo & info) final
{
(void)sub;

std::lock_guard<std::mutex> lock(internalMutex_);
if (eprosima::fastrtps::rtps::MATCHED_MATCHING == info.status) {
publishers_.insert(info.remoteEndpointGuid);
Expand All @@ -64,23 +76,39 @@ class SubListener : public eprosima::fastrtps::SubscriberListener
}

void
onNewDataMessage(eprosima::fastrtps::Subscriber * sub)
onNewDataMessage(eprosima::fastrtps::Subscriber * sub) final
{
(void)sub;
std::lock_guard<std::mutex> lock(internalMutex_);

if (conditionMutex_ != nullptr) {
std::unique_lock<std::mutex> clock(*conditionMutex_);
// the change to data_ needs to be mutually exclusive with rmw_wait()
// which checks hasData() and decides if wait() needs to be called
data_ = sub->getUnreadCount();
clock.unlock();
conditionVariable_->notify_one();
} else {
data_ = sub->getUnreadCount();
}
// the change to liveliness_lost_count_ needs to be mutually exclusive with
// rmw_wait() which checks hasEvent() and decides if wait() needs to be called
ConditionalScopedLock clock(conditionMutex_, conditionVariable_);

data_.store(sub->getUnreadCount(), std::memory_order_relaxed);
}

RMW_FASTRTPS_SHARED_CPP_PUBLIC
void
on_requested_deadline_missed(
eprosima::fastrtps::Subscriber *,
const eprosima::fastrtps::RequestedDeadlineMissedStatus &) final;

RMW_FASTRTPS_SHARED_CPP_PUBLIC
void
on_liveliness_changed(
eprosima::fastrtps::Subscriber *,
const eprosima::fastrtps::LivelinessChangedStatus &) final;

// EventListenerInterface implementation
RMW_FASTRTPS_SHARED_CPP_PUBLIC
bool
hasEvent(rmw_event_type_t event_type) const final;

RMW_FASTRTPS_SHARED_CPP_PUBLIC
bool
takeNextEvent(rmw_event_type_t event_type, void * event_info) final;

// SubListener API
void
attachCondition(std::mutex * conditionMutex, std::condition_variable * conditionVariable)
{
Expand All @@ -98,22 +126,17 @@ class SubListener : public eprosima::fastrtps::SubscriberListener
}

bool
hasData()
hasData() const
{
return data_ > 0;
return data_.load(std::memory_order_relaxed) > 0;
}

void
data_taken(eprosima::fastrtps::Subscriber * sub)
{
std::lock_guard<std::mutex> lock(internalMutex_);

if (conditionMutex_ != nullptr) {
std::unique_lock<std::mutex> clock(*conditionMutex_);
data_ = sub->getUnreadCount();
} else {
data_ = sub->getUnreadCount();
}
ConditionalScopedLock clock(conditionMutex_);
data_.store(sub->getUnreadCount(), std::memory_order_relaxed);
}

size_t publisherCount()
Expand All @@ -123,8 +146,18 @@ class SubListener : public eprosima::fastrtps::SubscriberListener
}

private:
std::mutex internalMutex_;
mutable std::mutex internalMutex_;

std::atomic_size_t data_;

std::atomic_bool deadline_changes_;
eprosima::fastrtps::RequestedDeadlineMissedStatus requested_deadline_missed_status_
RCPPUTILS_TSA_GUARDED_BY(internalMutex_);

std::atomic_bool liveliness_changes_;
eprosima::fastrtps::LivelinessChangedStatus liveliness_changed_status_
RCPPUTILS_TSA_GUARDED_BY(internalMutex_);

std::mutex * conditionMutex_ RCPPUTILS_TSA_GUARDED_BY(internalMutex_);
std::condition_variable * conditionVariable_ RCPPUTILS_TSA_GUARDED_BY(internalMutex_);

Expand Down
Loading