Skip to content

Commit

Permalink
Complete events support (#583)
Browse files Browse the repository at this point in the history
* Add SubListener::on_sample_lost empty stub.
* Process data inside listener callback.
* Add RMW_EVENT_MESSAGE_LOST to hasEvent and takeNextEvent.
* Add SubListener::on_requested_incompatible_qos empty stub.
* Process data inside listener callback.
* Add RMW_EVENT_REQUESTED_QOS_INCOMPATIBLE to hasEvent and takeNextEvent.
* Translate policy id.
* Add PubListener::on_offered_incompatible_qos empty stub.
* Process data inside listener callback.
* Add RMW_EVENT_OFFERED_QOS_INCOMPATIBLE to hasEvent and takeNextEvent.
* Add incompatible qos events to supported list.
* Initialize added atomic_bool fields.

Signed-off-by: Miguel Company <[email protected]>
Co-authored-by: Chris Lalancette <[email protected]>
  • Loading branch information
MiguelCompany and clalancette authored Feb 22, 2022
1 parent 33c54b0 commit 66926c7
Show file tree
Hide file tree
Showing 6 changed files with 192 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ class PubListener : public EventListenerInterface, public eprosima::fastdds::dds
explicit PubListener(CustomPublisherInfo * info)
: deadline_changes_(false),
liveliness_changes_(false),
incompatible_qos_changes_(false),
conditionMutex_(nullptr),
conditionVariable_(nullptr)
{
Expand Down Expand Up @@ -94,6 +95,11 @@ class PubListener : public EventListenerInterface, public eprosima::fastdds::dds
eprosima::fastdds::dds::DataWriter * writer,
const eprosima::fastdds::dds::LivelinessLostStatus & status) final;

RMW_FASTRTPS_SHARED_CPP_PUBLIC
void
on_offered_incompatible_qos(
eprosima::fastdds::dds::DataWriter *,
const eprosima::fastdds::dds::OfferedIncompatibleQosStatus &) final;

// EventListenerInterface implementation
RMW_FASTRTPS_SHARED_CPP_PUBLIC
Expand Down Expand Up @@ -141,6 +147,10 @@ class PubListener : public EventListenerInterface, public eprosima::fastdds::dds
eprosima::fastdds::dds::LivelinessLostStatus liveliness_lost_status_
RCPPUTILS_TSA_GUARDED_BY(internalMutex_);

std::atomic_bool incompatible_qos_changes_;
eprosima::fastdds::dds::OfferedIncompatibleQosStatus incompatible_qos_status_
RCPPUTILS_TSA_GUARDED_BY(internalMutex_);

std::mutex * conditionMutex_ RCPPUTILS_TSA_GUARDED_BY(internalMutex_);
std::condition_variable * conditionVariable_ RCPPUTILS_TSA_GUARDED_BY(internalMutex_);
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ class SubListener : public EventListenerInterface, public eprosima::fastdds::dds
: data_(false),
deadline_changes_(false),
liveliness_changes_(false),
sample_lost_changes_(false),
incompatible_qos_changes_(false),
conditionMutex_(nullptr),
conditionVariable_(nullptr)
{
Expand Down Expand Up @@ -112,6 +114,18 @@ class SubListener : public EventListenerInterface, public eprosima::fastdds::dds
eprosima::fastdds::dds::DataReader *,
const eprosima::fastrtps::LivelinessChangedStatus &) final;

RMW_FASTRTPS_SHARED_CPP_PUBLIC
void
on_sample_lost(
eprosima::fastdds::dds::DataReader *,
const eprosima::fastdds::dds::SampleLostStatus &) final;

RMW_FASTRTPS_SHARED_CPP_PUBLIC
void
on_requested_incompatible_qos(
eprosima::fastdds::dds::DataReader *,
const eprosima::fastdds::dds::RequestedIncompatibleQosStatus &) final;

// EventListenerInterface implementation
RMW_FASTRTPS_SHARED_CPP_PUBLIC
bool
Expand Down Expand Up @@ -176,6 +190,14 @@ class SubListener : public EventListenerInterface, public eprosima::fastdds::dds
eprosima::fastdds::dds::LivelinessChangedStatus liveliness_changed_status_
RCPPUTILS_TSA_GUARDED_BY(internalMutex_);

std::atomic_bool sample_lost_changes_;
eprosima::fastdds::dds::SampleLostStatus sample_lost_status_
RCPPUTILS_TSA_GUARDED_BY(internalMutex_);

std::atomic_bool incompatible_qos_changes_;
eprosima::fastdds::dds::RequestedIncompatibleQosStatus incompatible_qos_status_
RCPPUTILS_TSA_GUARDED_BY(internalMutex_);

std::mutex * conditionMutex_ RCPPUTILS_TSA_GUARDED_BY(internalMutex_);
std::condition_variable * conditionVariable_ RCPPUTILS_TSA_GUARDED_BY(internalMutex_);

Expand Down
36 changes: 36 additions & 0 deletions rmw_fastrtps_shared_cpp/src/custom_publisher_info.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,11 @@
// limitations under the License.

#include "rmw_fastrtps_shared_cpp/custom_publisher_info.hpp"

#include "fastdds/dds/core/status/BaseStatus.hpp"
#include "fastdds/dds/core/status/DeadlineMissedStatus.hpp"

#include "event_helpers.hpp"
#include "types/event_types.hpp"

EventListenerInterface *
Expand Down Expand Up @@ -60,6 +63,25 @@ void PubListener::on_liveliness_lost(
liveliness_changes_.store(true, std::memory_order_relaxed);
}

void PubListener::on_offered_incompatible_qos(
eprosima::fastdds::dds::DataWriter * /* writer */,
const eprosima::fastdds::dds::OfferedIncompatibleQosStatus & status)
{
std::lock_guard<std::mutex> lock(internalMutex_);

// the change to incompatible_qos_status_ needs to be mutually exclusive with
// rmw_wait() which checks hasEvent() and decides if wait() needs to be called
ConditionalScopedLock clock(conditionMutex_, conditionVariable_);

// Assign absolute values
incompatible_qos_status_.last_policy_id = status.last_policy_id;
incompatible_qos_status_.total_count = status.total_count;
// Accumulate deltas
incompatible_qos_status_.total_count_change += status.total_count_change;

incompatible_qos_changes_.store(true, std::memory_order_relaxed);
}

bool PubListener::hasEvent(rmw_event_type_t event_type) const
{
assert(rmw_fastrtps_shared_cpp::internal::is_event_supported(event_type));
Expand All @@ -68,6 +90,8 @@ bool PubListener::hasEvent(rmw_event_type_t event_type) const
return liveliness_changes_.load(std::memory_order_relaxed);
case RMW_EVENT_OFFERED_DEADLINE_MISSED:
return deadline_changes_.load(std::memory_order_relaxed);
case RMW_EVENT_OFFERED_QOS_INCOMPATIBLE:
return incompatible_qos_changes_.load(std::memory_order_relaxed);
default:
break;
}
Expand Down Expand Up @@ -97,6 +121,18 @@ bool PubListener::takeNextEvent(rmw_event_type_t event_type, void * event_info)
deadline_changes_.store(false, std::memory_order_relaxed);
}
break;
case RMW_EVENT_OFFERED_QOS_INCOMPATIBLE:
{
auto rmw_data = static_cast<rmw_requested_qos_incompatible_event_status_t *>(event_info);
rmw_data->total_count = incompatible_qos_status_.total_count;
rmw_data->total_count_change = incompatible_qos_status_.total_count_change;
rmw_data->last_policy_kind =
rmw_fastrtps_shared_cpp::internal::dds_qos_policy_to_rmw_qos_policy(
incompatible_qos_status_.last_policy_id);
incompatible_qos_status_.total_count_change = 0;
incompatible_qos_changes_.store(false, std::memory_order_relaxed);
}
break;
default:
return false;
}
Expand Down
63 changes: 63 additions & 0 deletions rmw_fastrtps_shared_cpp/src/custom_subscriber_info.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include "fastdds/dds/core/status/DeadlineMissedStatus.hpp"
#include "fastdds/dds/core/status/LivelinessChangedStatus.hpp"

#include "event_helpers.hpp"
#include "types/event_types.hpp"

EventListenerInterface *
Expand Down Expand Up @@ -64,6 +65,43 @@ void SubListener::on_liveliness_changed(
liveliness_changes_.store(true, std::memory_order_relaxed);
}

void SubListener::on_sample_lost(
eprosima::fastdds::dds::DataReader * /* reader */,
const eprosima::fastdds::dds::SampleLostStatus & status)
{
std::lock_guard<std::mutex> lock(internalMutex_);

// the change to sample_lost_status_ needs to be mutually exclusive with
// rmw_wait() which checks hasEvent() and decides if wait() needs to be called
ConditionalScopedLock clock(conditionMutex_, conditionVariable_);

// Assign absolute values
sample_lost_status_.total_count = status.total_count;
// Accumulate deltas
sample_lost_status_.total_count_change += status.total_count_change;

sample_lost_changes_.store(true, std::memory_order_relaxed);
}

void SubListener::on_requested_incompatible_qos(
eprosima::fastdds::dds::DataReader * /* reader */,
const eprosima::fastdds::dds::RequestedIncompatibleQosStatus & status)
{
std::lock_guard<std::mutex> lock(internalMutex_);

// the change to incompatible_qos_status_ needs to be mutually exclusive with
// rmw_wait() which checks hasEvent() and decides if wait() needs to be called
ConditionalScopedLock clock(conditionMutex_, conditionVariable_);

// Assign absolute values
incompatible_qos_status_.last_policy_id = status.last_policy_id;
incompatible_qos_status_.total_count = status.total_count;
// Accumulate deltas
incompatible_qos_status_.total_count_change += status.total_count_change;

incompatible_qos_changes_.store(true, std::memory_order_relaxed);
}

bool SubListener::hasEvent(rmw_event_type_t event_type) const
{
assert(rmw_fastrtps_shared_cpp::internal::is_event_supported(event_type));
Expand All @@ -72,6 +110,10 @@ bool SubListener::hasEvent(rmw_event_type_t event_type) const
return liveliness_changes_.load(std::memory_order_relaxed);
case RMW_EVENT_REQUESTED_DEADLINE_MISSED:
return deadline_changes_.load(std::memory_order_relaxed);
case RMW_EVENT_MESSAGE_LOST:
return sample_lost_changes_.load(std::memory_order_relaxed);
case RMW_EVENT_REQUESTED_QOS_INCOMPATIBLE:
return incompatible_qos_changes_.load(std::memory_order_relaxed);
default:
break;
}
Expand Down Expand Up @@ -104,6 +146,27 @@ bool SubListener::takeNextEvent(rmw_event_type_t event_type, void * event_info)
deadline_changes_.store(false, std::memory_order_relaxed);
}
break;
case RMW_EVENT_MESSAGE_LOST:
{
auto rmw_data = static_cast<rmw_message_lost_status_t *>(event_info);
rmw_data->total_count = sample_lost_status_.total_count;
rmw_data->total_count_change = sample_lost_status_.total_count_change;
sample_lost_status_.total_count_change = 0;
sample_lost_changes_.store(false, std::memory_order_relaxed);
}
break;
case RMW_EVENT_REQUESTED_QOS_INCOMPATIBLE:
{
auto rmw_data = static_cast<rmw_requested_qos_incompatible_event_status_t *>(event_info);
rmw_data->total_count = incompatible_qos_status_.total_count;
rmw_data->total_count_change = incompatible_qos_status_.total_count_change;
rmw_data->last_policy_kind =
rmw_fastrtps_shared_cpp::internal::dds_qos_policy_to_rmw_qos_policy(
incompatible_qos_status_.last_policy_id);
incompatible_qos_status_.total_count_change = 0;
incompatible_qos_changes_.store(false, std::memory_order_relaxed);
}
break;
default:
return false;
}
Expand Down
33 changes: 33 additions & 0 deletions rmw_fastrtps_shared_cpp/src/event_helpers.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
// Copyright 2022 Open Source Robotics Foundation, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#ifndef EVENT_HELPERS_HPP_
#define EVENT_HELPERS_HPP_

#include "fastdds/dds/core/policy/QosPolicies.hpp"

#include "rmw/qos_policy_kind.h"

namespace rmw_fastrtps_shared_cpp
{
namespace internal
{

rmw_qos_policy_kind_t dds_qos_policy_to_rmw_qos_policy(
eprosima::fastdds::dds::QosPolicyId_t policy_id);

} // namespace internal
} // namespace rmw_fastrtps_shared_cpp

#endif // EVENT_HELPERS_HPP_
29 changes: 28 additions & 1 deletion rmw_fastrtps_shared_cpp/src/rmw_event.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,17 @@
#include "rmw/impl/cpp/macros.hpp"

#include "rmw_fastrtps_shared_cpp/rmw_common.hpp"
#include "event_helpers.hpp"
#include "types/event_types.hpp"

static const std::unordered_set<rmw_event_type_t> g_rmw_event_type_set{
RMW_EVENT_LIVELINESS_CHANGED,
RMW_EVENT_REQUESTED_DEADLINE_MISSED,
RMW_EVENT_LIVELINESS_LOST,
RMW_EVENT_OFFERED_DEADLINE_MISSED
RMW_EVENT_OFFERED_DEADLINE_MISSED,
RMW_EVENT_MESSAGE_LOST,
RMW_EVENT_OFFERED_QOS_INCOMPATIBLE,
RMW_EVENT_REQUESTED_QOS_INCOMPATIBLE
};

namespace rmw_fastrtps_shared_cpp
Expand All @@ -36,6 +40,29 @@ bool is_event_supported(rmw_event_type_t event_type)
return g_rmw_event_type_set.count(event_type) == 1;
}

rmw_qos_policy_kind_t dds_qos_policy_to_rmw_qos_policy(
eprosima::fastdds::dds::QosPolicyId_t policy_id)
{
using eprosima::fastdds::dds::QosPolicyId_t;

switch (policy_id) {
case QosPolicyId_t::DURABILITY_QOS_POLICY_ID:
return RMW_QOS_POLICY_DURABILITY;
case QosPolicyId_t::DEADLINE_QOS_POLICY_ID:
return RMW_QOS_POLICY_DEADLINE;
case QosPolicyId_t::LIVELINESS_QOS_POLICY_ID:
return RMW_QOS_POLICY_LIVELINESS;
case QosPolicyId_t::RELIABILITY_QOS_POLICY_ID:
return RMW_QOS_POLICY_RELIABILITY;
case QosPolicyId_t::HISTORY_QOS_POLICY_ID:
return RMW_QOS_POLICY_HISTORY;
case QosPolicyId_t::LIFESPAN_QOS_POLICY_ID:
return RMW_QOS_POLICY_LIFESPAN;
default:
return RMW_QOS_POLICY_INVALID;
}
}

} // namespace internal

rmw_ret_t
Expand Down

0 comments on commit 66926c7

Please sign in to comment.