Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for content-filtered topics #12

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
29475a6
to support a feature of content filtered topic
Mar 23, 2021
e45098c
fix for unscritify
Mar 24, 2021
2d5903e
address review
Mar 25, 2021
f1a4217
continue to address review
Mar 26, 2021
cdea0df
use RMW_CONNEXT_LOG_ERROR* log
Mar 26, 2021
3f093b8
remove semicolon
Mar 26, 2021
e642c50
accessing loan_len in finalize need to be protected by mutex
Mar 29, 2021
7d6717f
make source code style consistent and update code structure
Mar 31, 2021
5571d49
add the lost `this->` and adjust location of a check statement
Apr 1, 2021
2f85b63
Pass pointer arguments as const
asorbini Apr 1, 2021
61a3bd5
Pass pointer arguments as const
asorbini Apr 1, 2021
6d065d1
Pass pointer arguments as const
asorbini Apr 1, 2021
11d653f
Pass pointer arguments as const
asorbini Apr 1, 2021
a297eaa
Pass pointer arguments as const
asorbini Apr 1, 2021
62eae2e
Pass pointer arguments as const
asorbini Apr 1, 2021
1c3ec51
Pass pointer arguments as const
asorbini Apr 1, 2021
d549fb1
Pass pointer arguments as const
asorbini Apr 1, 2021
80b0d9d
Pass pointer arguments as const
asorbini Apr 1, 2021
784bacf
Pass pointer arguments as const
asorbini Apr 1, 2021
f200086
Pass pointer arguments as const
asorbini Apr 1, 2021
1fa0c92
Pass pointer arguments as const
asorbini Apr 1, 2021
10f73bf
Pass pointer arguments as const
asorbini Apr 1, 2021
bcfe6a4
Pass pointer arguments as const
asorbini Apr 1, 2021
9869a73
Pass pointer arguments as const
asorbini Apr 1, 2021
f655944
Pass pointer arguments as const
asorbini Apr 1, 2021
836224a
Pass pointer arguments as const
asorbini Apr 1, 2021
2b72132
Pass pointer arguments as const
asorbini Apr 1, 2021
c56c7f9
Pass pointer arguments as const
asorbini Apr 1, 2021
b342ba6
Pass pointer arguments as const
asorbini Apr 1, 2021
a177eda
Pass pointer arguments as const
asorbini Apr 1, 2021
1d8d20a
Pass pointer arguments as const
asorbini Apr 1, 2021
ac63580
Pass pointer arguments as const
asorbini Apr 1, 2021
3cc9b05
Pass pointer arguments as const
asorbini Apr 1, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions rmw_connextdds/src/rmw_api_impl_ndds.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -637,6 +637,28 @@ rmw_subscription_get_actual_qos(
}


rmw_ret_t
rmw_subscription_set_cft_expression_parameters(
rmw_subscription_t * subscription,
const char * filter_expression,
const rcutils_string_array_t * expression_parameters)
{
return rmw_api_connextdds_subscription_set_cft_expression_parameters(
subscription, filter_expression, expression_parameters);
}


rmw_ret_t
rmw_subscription_get_cft_expression_parameters(
const rmw_subscription_t * subscription,
char ** filter_expression,
rcutils_string_array_t * expression_parameters)
{
return rmw_api_connextdds_subscription_get_cft_expression_parameters(
subscription, filter_expression, expression_parameters);
}


rmw_ret_t
rmw_destroy_subscription(
rmw_node_t * node,
Expand Down
13 changes: 13 additions & 0 deletions rmw_connextdds_common/include/rmw_connextdds/dds_api.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ rmw_connextdds_create_contentfilteredtopic(
DDS_Topic * const base_topic,
const char * const cft_name,
const char * const cft_filter,
const rcutils_string_array_t * const cft_expression_parameters,
DDS_TopicDescription ** const cft_out);

rmw_ret_t
Expand Down Expand Up @@ -207,6 +208,18 @@ rmw_connextdds_enable_security(
rmw_context_impl_t * const ctx,
DDS_DomainParticipantQos * const qos);

rmw_ret_t
rmw_connextdds_set_cft_filter_expression(
DDS_TopicDescription * const topic_desc,
const char * const filter_expression,
const rcutils_string_array_t * const expression_parameters);

rmw_ret_t
rmw_connextdds_get_cft_filter_expression(
DDS_TopicDescription * const topic_desc,
char ** const expr_out,
rcutils_string_array_t * const cft_params_out);

// Define some macro aliases for security-related properties
#ifndef DDS_SECURITY_PROPERTY_PREFIX
#define DDS_SECURITY_PROPERTY_PREFIX \
Expand Down
16 changes: 16 additions & 0 deletions rmw_connextdds_common/include/rmw_connextdds/rmw_api_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -430,6 +430,22 @@ rmw_api_connextdds_subscription_get_actual_qos(
rmw_qos_profile_t * qos);


RMW_CONNEXTDDS_PUBLIC
rmw_ret_t
rmw_api_connextdds_subscription_set_cft_expression_parameters(
rmw_subscription_t * subscription,
const char * filter_expression,
const rcutils_string_array_t * expression_parameters);


RMW_CONNEXTDDS_PUBLIC
rmw_ret_t
rmw_api_connextdds_subscription_get_cft_expression_parameters(
const rmw_subscription_t * subscription,
char ** filter_expression,
rcutils_string_array_t * expression_parameters);


RMW_CONNEXTDDS_PUBLIC
rmw_ret_t
rmw_api_connextdds_destroy_subscription(
Expand Down
55 changes: 50 additions & 5 deletions rmw_connextdds_common/include/rmw_connextdds/rmw_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include <string>
#include <vector>
#include <map>
#include <memory>
#include <mutex>
#include <condition_variable>
#include <atomic>
Expand Down Expand Up @@ -273,6 +274,7 @@ class RMW_Connext_Subscriber
const rmw_qos_profile_t * const qos_policies,
const rmw_subscription_options_t * const subscriber_options,
const bool internal = false,
const rmw_node_t * const node = nullptr,
const RMW_Connext_MessageType msg_type = RMW_CONNEXT_MESSAGE_USERDATA,
const void * const intro_members = nullptr,
const bool intro_members_cpp = false,
Expand All @@ -281,7 +283,7 @@ class RMW_Connext_Subscriber
const char * const cft_filter = nullptr);

rmw_ret_t
finalize();
finalize(const bool reset_cft = false);

DDS_DataReader *
reader() const
Expand Down Expand Up @@ -353,7 +355,7 @@ class RMW_Connext_Subscriber
RMW_Connext_SubscriberStatusCondition *
condition()
{
return &this->status_condition;
return this->status_condition.get();
}

const rmw_gid_t * gid() const
Expand Down Expand Up @@ -396,7 +398,7 @@ class RMW_Connext_Subscriber
}

if (this->internal) {
return this->status_condition.trigger_loan_guard_condition(this->loan_len > 0);
return this->status_condition->trigger_loan_guard_condition(this->loan_len > 0);
}

return RMW_RET_OK;
Expand Down Expand Up @@ -437,6 +439,18 @@ class RMW_Connext_Subscriber
rmw_message_info_t * const message_info,
bool * const taken);

rmw_ret_t
set_cft_expression_parameters(
const char * const filter_expression,
const rcutils_string_array_t * const expression_parameters
);

rmw_ret_t
get_cft_expression_parameters(
char ** const filter_expression,
rcutils_string_array_t * const expression_parameters
);

bool
has_data()
{
Expand Down Expand Up @@ -467,6 +481,17 @@ class RMW_Connext_Subscriber
return this->dds_topic;
}

static std::string get_atomic_id()
{
static std::atomic_uint64_t id;
return std::to_string(id++);
}

bool is_cft_supported()
{
return nullptr != dds_topic_cft;
}

const bool internal;
const bool ignore_local;

Expand All @@ -478,12 +503,16 @@ class RMW_Connext_Subscriber
RMW_Connext_MessageTypeSupport * type_support;
rmw_gid_t ros_gid;
const bool created_topic;
RMW_Connext_SubscriberStatusCondition status_condition;
std::shared_ptr<RMW_Connext_SubscriberStatusCondition> status_condition;
RMW_Connext_UntypedSampleSeq loan_data;
DDS_SampleInfoSeq loan_info;
size_t loan_len;
size_t loan_next;
std::mutex loan_mutex;
std::mutex cft_mutex;
const rmw_node_t * const node;
rmw_qos_profile_t qos_policies;
rmw_subscription_options_t subscriber_options;

RMW_Connext_Subscriber(
rmw_context_impl_t * const ctx,
Expand All @@ -493,7 +522,23 @@ class RMW_Connext_Subscriber
const bool ignore_local,
const bool created_topic,
DDS_TopicDescription * const dds_topic_cft,
const bool internal);
const bool internal,
const rmw_node_t * const node,
const rmw_qos_profile_t * const qos_policies,
const rmw_subscription_options_t * const subscriber_options);

static
DDS_DataReader *
initialize_datareader(
rmw_context_impl_t * const ctx,
DDS_DomainParticipant * const dp,
DDS_Subscriber * const sub,
const std::string & fqtopic_name,
const rmw_qos_profile_t * const qos_policies,
RMW_Connext_MessageTypeSupport * const type_support,
const rmw_subscription_options_t * const subscriber_options,
const bool internal,
DDS_TopicDescription * const sub_topic);

friend class RMW_Connext_SubscriberStatusCondition;
};
Expand Down
16 changes: 10 additions & 6 deletions rmw_connextdds_common/src/common/rmw_event.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -124,12 +124,16 @@ rmw_api_connextdds_take_event(
} else {
condition = RMW_Connext_Event::publisher(event_handle)->condition();
}
rmw_ret_t rc = condition->get_status(event_handle->event_type, event_info);
if (RMW_RET_OK != rc) {
RMW_CONNEXT_LOG_ERROR_SET("failed to get status from DDS entity")
return rc;
if (nullptr != condition) {
rmw_ret_t rc = condition->get_status(event_handle->event_type, event_info);
if (RMW_RET_OK != rc) {
RMW_CONNEXT_LOG_ERROR_SET("failed to get status from DDS entity")
return rc;
}

*taken = true;
} else {
RMW_CONNEXT_LOG_WARNING("condition was reset because of resetting cft")
}

*taken = true;
return RMW_RET_OK;
}
Loading