Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Complete events support #583

Merged
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,11 @@ class PubListener : public EventListenerInterface, public eprosima::fastdds::dds
eprosima::fastdds::dds::DataWriter * writer,
const eprosima::fastdds::dds::LivelinessLostStatus & status) final;

RMW_FASTRTPS_SHARED_CPP_PUBLIC
void
on_offered_incompatible_qos(
eprosima::fastdds::dds::DataWriter *,
const eprosima::fastdds::dds::OfferedIncompatibleQosStatus &) final;

// EventListenerInterface implementation
RMW_FASTRTPS_SHARED_CPP_PUBLIC
Expand Down Expand Up @@ -141,6 +146,10 @@ class PubListener : public EventListenerInterface, public eprosima::fastdds::dds
eprosima::fastdds::dds::LivelinessLostStatus liveliness_lost_status_
RCPPUTILS_TSA_GUARDED_BY(internalMutex_);

std::atomic_bool incompatible_qos_changes_;
eprosima::fastdds::dds::OfferedIncompatibleQosStatus incompatible_qos_status_
RCPPUTILS_TSA_GUARDED_BY(internalMutex_);

std::mutex * conditionMutex_ RCPPUTILS_TSA_GUARDED_BY(internalMutex_);
std::condition_variable * conditionVariable_ RCPPUTILS_TSA_GUARDED_BY(internalMutex_);
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,18 @@ class SubListener : public EventListenerInterface, public eprosima::fastdds::dds
eprosima::fastdds::dds::DataReader *,
const eprosima::fastrtps::LivelinessChangedStatus &) final;

RMW_FASTRTPS_SHARED_CPP_PUBLIC
void
on_sample_lost(
eprosima::fastdds::dds::DataReader *,
const eprosima::fastdds::dds::SampleLostStatus &) final;

RMW_FASTRTPS_SHARED_CPP_PUBLIC
void
on_requested_incompatible_qos(
eprosima::fastdds::dds::DataReader *,
const eprosima::fastdds::dds::RequestedIncompatibleQosStatus &) final;

// EventListenerInterface implementation
RMW_FASTRTPS_SHARED_CPP_PUBLIC
bool
Expand Down Expand Up @@ -176,6 +188,14 @@ class SubListener : public EventListenerInterface, public eprosima::fastdds::dds
eprosima::fastdds::dds::LivelinessChangedStatus liveliness_changed_status_
RCPPUTILS_TSA_GUARDED_BY(internalMutex_);

std::atomic_bool sample_lost_changes_;
eprosima::fastdds::dds::SampleLostStatus sample_lost_status_
RCPPUTILS_TSA_GUARDED_BY(internalMutex_);

std::atomic_bool incompatible_qos_changes_;
eprosima::fastdds::dds::RequestedIncompatibleQosStatus incompatible_qos_status_
RCPPUTILS_TSA_GUARDED_BY(internalMutex_);

std::mutex * conditionMutex_ RCPPUTILS_TSA_GUARDED_BY(internalMutex_);
std::condition_variable * conditionVariable_ RCPPUTILS_TSA_GUARDED_BY(internalMutex_);

Expand Down
36 changes: 36 additions & 0 deletions rmw_fastrtps_shared_cpp/src/custom_publisher_info.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,11 @@
// limitations under the License.

#include "rmw_fastrtps_shared_cpp/custom_publisher_info.hpp"

#include "fastdds/dds/core/status/BaseStatus.hpp"
#include "fastdds/dds/core/status/DeadlineMissedStatus.hpp"

#include "event_helpers.hpp"
#include "types/event_types.hpp"

EventListenerInterface *
Expand Down Expand Up @@ -60,6 +63,25 @@ void PubListener::on_liveliness_lost(
liveliness_changes_.store(true, std::memory_order_relaxed);
}

void PubListener::on_offered_incompatible_qos(
eprosima::fastdds::dds::DataWriter * /* writer */,
const eprosima::fastdds::dds::OfferedIncompatibleQosStatus & status)
{
std::lock_guard<std::mutex> lock(internalMutex_);

// the change to incompatible_qos_status_ needs to be mutually exclusive with
// rmw_wait() which checks hasEvent() and decides if wait() needs to be called
ConditionalScopedLock clock(conditionMutex_, conditionVariable_);

// Assign absolute values
incompatible_qos_status_.last_policy_id = status.last_policy_id;
incompatible_qos_status_.total_count = status.total_count;
// Accumulate deltas
incompatible_qos_status_.total_count_change += status.total_count_change;

incompatible_qos_changes_.store(true, std::memory_order_relaxed);
}

bool PubListener::hasEvent(rmw_event_type_t event_type) const
{
assert(rmw_fastrtps_shared_cpp::internal::is_event_supported(event_type));
Expand All @@ -68,6 +90,8 @@ bool PubListener::hasEvent(rmw_event_type_t event_type) const
return liveliness_changes_.load(std::memory_order_relaxed);
case RMW_EVENT_OFFERED_DEADLINE_MISSED:
return deadline_changes_.load(std::memory_order_relaxed);
case RMW_EVENT_OFFERED_QOS_INCOMPATIBLE:
return incompatible_qos_changes_.load(std::memory_order_relaxed);
default:
break;
}
Expand Down Expand Up @@ -97,6 +121,18 @@ bool PubListener::takeNextEvent(rmw_event_type_t event_type, void * event_info)
deadline_changes_.store(false, std::memory_order_relaxed);
}
break;
case RMW_EVENT_OFFERED_QOS_INCOMPATIBLE:
{
auto rmw_data = static_cast<rmw_requested_qos_incompatible_event_status_t *>(event_info);
rmw_data->total_count = incompatible_qos_status_.total_count;
rmw_data->total_count_change = incompatible_qos_status_.total_count_change;
rmw_data->last_policy_kind =
rmw_fastrtps_shared_cpp::internal::dds_qos_policy_to_rmw_qos_policy(
incompatible_qos_status_.last_policy_id);
incompatible_qos_status_.total_count_change = 0;
incompatible_qos_changes_.store(false, std::memory_order_relaxed);
}
break;
default:
return false;
}
Expand Down
63 changes: 63 additions & 0 deletions rmw_fastrtps_shared_cpp/src/custom_subscriber_info.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include "fastdds/dds/core/status/DeadlineMissedStatus.hpp"
#include "fastdds/dds/core/status/LivelinessChangedStatus.hpp"

#include "event_helpers.hpp"
#include "types/event_types.hpp"

EventListenerInterface *
Expand Down Expand Up @@ -64,6 +65,43 @@ void SubListener::on_liveliness_changed(
liveliness_changes_.store(true, std::memory_order_relaxed);
}

void SubListener::on_sample_lost(
eprosima::fastdds::dds::DataReader * /* reader */,
const eprosima::fastdds::dds::SampleLostStatus & status)
{
std::lock_guard<std::mutex> lock(internalMutex_);

// the change to sample_lost_status_ needs to be mutually exclusive with
// rmw_wait() which checks hasEvent() and decides if wait() needs to be called
ConditionalScopedLock clock(conditionMutex_, conditionVariable_);

// Assign absolute values
sample_lost_status_.total_count = status.total_count;
// Accumulate deltas
sample_lost_status_.total_count_change += status.total_count_change;

sample_lost_changes_.store(true, std::memory_order_relaxed);
}

void SubListener::on_requested_incompatible_qos(
eprosima::fastdds::dds::DataReader * /* reader */,
const eprosima::fastdds::dds::RequestedIncompatibleQosStatus & status)
{
std::lock_guard<std::mutex> lock(internalMutex_);

// the change to incompatible_qos_status_ needs to be mutually exclusive with
// rmw_wait() which checks hasEvent() and decides if wait() needs to be called
ConditionalScopedLock clock(conditionMutex_, conditionVariable_);

// Assign absolute values
incompatible_qos_status_.last_policy_id = status.last_policy_id;
incompatible_qos_status_.total_count = status.total_count;
// Accumulate deltas
incompatible_qos_status_.total_count_change += status.total_count_change;

incompatible_qos_changes_.store(true, std::memory_order_relaxed);
}

bool SubListener::hasEvent(rmw_event_type_t event_type) const
{
assert(rmw_fastrtps_shared_cpp::internal::is_event_supported(event_type));
Expand All @@ -72,6 +110,10 @@ bool SubListener::hasEvent(rmw_event_type_t event_type) const
return liveliness_changes_.load(std::memory_order_relaxed);
case RMW_EVENT_REQUESTED_DEADLINE_MISSED:
return deadline_changes_.load(std::memory_order_relaxed);
case RMW_EVENT_MESSAGE_LOST:
return sample_lost_changes_.load(std::memory_order_relaxed);
case RMW_EVENT_REQUESTED_QOS_INCOMPATIBLE:
return incompatible_qos_changes_.load(std::memory_order_relaxed);
default:
break;
}
Expand Down Expand Up @@ -104,6 +146,27 @@ bool SubListener::takeNextEvent(rmw_event_type_t event_type, void * event_info)
deadline_changes_.store(false, std::memory_order_relaxed);
}
break;
case RMW_EVENT_MESSAGE_LOST:
{
auto rmw_data = static_cast<rmw_message_lost_status_t *>(event_info);
rmw_data->total_count = sample_lost_status_.total_count;
rmw_data->total_count_change = sample_lost_status_.total_count_change;
sample_lost_status_.total_count_change = 0;
sample_lost_changes_.store(false, std::memory_order_relaxed);
}
break;
case RMW_EVENT_REQUESTED_QOS_INCOMPATIBLE:
{
auto rmw_data = static_cast<rmw_requested_qos_incompatible_event_status_t *>(event_info);
rmw_data->total_count = incompatible_qos_status_.total_count;
rmw_data->total_count_change = incompatible_qos_status_.total_count_change;
rmw_data->last_policy_kind =
rmw_fastrtps_shared_cpp::internal::dds_qos_policy_to_rmw_qos_policy(
incompatible_qos_status_.last_policy_id);
incompatible_qos_status_.total_count_change = 0;
incompatible_qos_changes_.store(false, std::memory_order_relaxed);
}
break;
default:
return false;
}
Expand Down
33 changes: 33 additions & 0 deletions rmw_fastrtps_shared_cpp/src/event_helpers.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
// Copyright 2022 Open Source Robotics Foundation, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#ifndef EVENT_HELPERS_HPP_
#define EVENT_HELPERS_HPP_

#include "fastdds/dds/core/policy/QosPolicies.hpp"

#include "rmw/qos_policy_kind.h"

namespace rmw_fastrtps_shared_cpp
{
namespace internal
{

rmw_qos_policy_kind_t dds_qos_policy_to_rmw_qos_policy(
eprosima::fastdds::dds::QosPolicyId_t policy_id);

} // namespace internal
} // namespace rmw_fastrtps_shared_cpp

#endif // EVENT_HELPERS_HPP_
29 changes: 28 additions & 1 deletion rmw_fastrtps_shared_cpp/src/rmw_event.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,17 @@
#include "rmw/impl/cpp/macros.hpp"

#include "rmw_fastrtps_shared_cpp/rmw_common.hpp"
#include "event_helpers.hpp"
#include "types/event_types.hpp"

static const std::unordered_set<rmw_event_type_t> g_rmw_event_type_set{
RMW_EVENT_LIVELINESS_CHANGED,
RMW_EVENT_REQUESTED_DEADLINE_MISSED,
RMW_EVENT_LIVELINESS_LOST,
RMW_EVENT_OFFERED_DEADLINE_MISSED
RMW_EVENT_OFFERED_DEADLINE_MISSED,
RMW_EVENT_MESSAGE_LOST,
RMW_EVENT_OFFERED_QOS_INCOMPATIBLE,
RMW_EVENT_REQUESTED_QOS_INCOMPATIBLE
};

namespace rmw_fastrtps_shared_cpp
Expand All @@ -36,6 +40,29 @@ bool is_event_supported(rmw_event_type_t event_type)
return g_rmw_event_type_set.count(event_type) == 1;
}

rmw_qos_policy_kind_t dds_qos_policy_to_rmw_qos_policy(
eprosima::fastdds::dds::QosPolicyId_t policy_id)
{
using eprosima::fastdds::dds::QosPolicyId_t;

switch (policy_id) {
case QosPolicyId_t::DURABILITY_QOS_POLICY_ID:
return RMW_QOS_POLICY_DURABILITY;
case QosPolicyId_t::DEADLINE_QOS_POLICY_ID:
return RMW_QOS_POLICY_DEADLINE;
case QosPolicyId_t::LIVELINESS_QOS_POLICY_ID:
return RMW_QOS_POLICY_LIVELINESS;
case QosPolicyId_t::RELIABILITY_QOS_POLICY_ID:
return RMW_QOS_POLICY_RELIABILITY;
case QosPolicyId_t::HISTORY_QOS_POLICY_ID:
return RMW_QOS_POLICY_HISTORY;
case QosPolicyId_t::LIFESPAN_QOS_POLICY_ID:
return RMW_QOS_POLICY_LIFESPAN;
default:
return RMW_QOS_POLICY_INVALID;
}
}

} // namespace internal

rmw_ret_t
Expand Down