Skip to content

Commit

Permalink
Address PR comments
Browse files Browse the repository at this point in the history
Signed-off-by: Ivan Santiago Paunovic <[email protected]>
  • Loading branch information
ivanpauno committed Jan 13, 2020
1 parent 5428599 commit a210c96
Show file tree
Hide file tree
Showing 12 changed files with 72 additions and 78 deletions.
40 changes: 18 additions & 22 deletions rmw_fastrtps_cpp/src/listener_thread.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
#include <cstring>
#include <thread>

#include "rcutils/logging_macros.h"
#include "rcutils/macros.h"

#include "rmw/allocators.h"
#include "rmw/error_handling.h"
Expand All @@ -37,8 +37,6 @@

using rmw_dds_common::operator<<;

static const char log_tag[] = "rmw_dds_common";

static
void
node_listener(rmw_context_t * context);
Expand All @@ -64,7 +62,10 @@ rmw_fastrtps_cpp::run_listener_thread(rmw_context_t * context)
common_context->thread_is_running.store(false);
if (common_context->listener_thread_gc) {
if (RMW_RET_OK != rmw_destroy_guard_condition(common_context->listener_thread_gc)) {
RCUTILS_LOG_ERROR_NAMED(log_tag, "Failed to destroy guard condition");
fprintf(
stderr,
RCUTILS_STRINGIFY(__FILE__) ":" RCUTILS_STRINGIFY(__function__) ":"
RCUTILS_STRINGIFY(__LINE__) ": Failed to destroy guard condition");
}
}
return RMW_RET_ERROR;
Expand Down Expand Up @@ -95,13 +96,15 @@ rmw_fastrtps_cpp::join_listener_thread(rmw_context_t * context)
return RMW_RET_OK;
}

static
void
terminate(const char * error_message)
{
RCUTILS_LOG_ERROR_NAMED(log_tag, "%s, terminating ...", error_message);
std::terminate();
}
#define TERMINATE(msg) \
do { \
fprintf( \
stderr, \
RCUTILS_STRINGIFY(__FILE__) ":" RCUTILS_STRINGIFY(__function__) ":" \
RCUTILS_STRINGIFY(__LINE__) RCUTILS_STRINGIFY(msg) ": %s, terminating ...", \
rmw_get_error_string().str); \
std::terminate(); \
} while (0)

void
node_listener(rmw_context_t * context)
Expand All @@ -124,7 +127,7 @@ node_listener(rmw_context_t * context)
// number of conditions of a subscription is 2
rmw_wait_set_t * wait_set = rmw_create_wait_set(context, 2);
if (nullptr == wait_set) {
terminate("failed to create wait set");
TERMINATE("failed to create wait set");
}
if (RMW_RET_OK != rmw_wait(
&subscriptions,
Expand All @@ -135,7 +138,7 @@ node_listener(rmw_context_t * context)
wait_set,
nullptr))
{
terminate("rmw_wait failed");
TERMINATE("rmw_wait failed");
}
if (subscriptions_buffer[0]) {
rmw_dds_common::msg::ParticipantEntitiesInfo msg;
Expand All @@ -146,7 +149,7 @@ node_listener(rmw_context_t * context)
&taken,
nullptr))
{
terminate("rmw_take failed");
TERMINATE("rmw_take failed");
}
if (taken) {
if (std::memcmp(
Expand All @@ -158,17 +161,10 @@ node_listener(rmw_context_t * context)
continue;
}
common_context->graph_cache.update_participant_entities(msg);
if (rcutils_logging_logger_is_enabled_for("rmw_dds_common",
RCUTILS_LOG_SEVERITY_DEBUG))
{
std::ostringstream ss;
ss << common_context->graph_cache;
RCUTILS_LOG_DEBUG_NAMED(log_tag, "%s", ss.str().c_str());
}
}
}
if (RMW_RET_OK != rmw_destroy_wait_set(wait_set)) {
terminate("failed to destroy wait set");
TERMINATE("failed to destroy wait set");
}
}
}
2 changes: 1 addition & 1 deletion rmw_fastrtps_cpp/src/rmw_subscription.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ rmw_create_subscription(
std::lock_guard<std::mutex> guard(common_context->node_update_mutex);
rmw_dds_common::msg::ParticipantEntitiesInfo msg =
common_context->graph_cache.associate_reader(
info->subscription_gid, common_context->gid, node->name, node->namespace_);
info->subscription_gid_, common_context->gid, node->name, node->namespace_);
rmw_ret_t rmw_ret = rmw_fastrtps_shared_cpp::__rmw_publish(
eprosima_fastrtps_identifier,
common_context->pub,
Expand Down
2 changes: 1 addition & 1 deletion rmw_fastrtps_cpp/src/subscription.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ create_subscription(
RMW_SET_ERROR_MSG("create_subscriber() could not create subscriber");
goto fail;
}
info->subscription_gid = rmw_fastrtps_shared_cpp::create_rmw_gid(
info->subscription_gid_ = rmw_fastrtps_shared_cpp::create_rmw_gid(
eprosima_fastrtps_identifier, info->subscriber_->getGuid());
rmw_subscription = rmw_subscription_allocate();
if (!rmw_subscription) {
Expand Down
77 changes: 33 additions & 44 deletions rmw_fastrtps_dynamic_cpp/src/listener_thread.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
#include <cstring>
#include <thread>

#include "rcutils/logging_macros.h"
#include "rcutils/macros.h"

#include "rmw/allocators.h"
#include "rmw/error_handling.h"
Expand All @@ -37,8 +37,6 @@

using rmw_dds_common::operator<<;

static const char log_tag[] = "rmw_dds_common";

static
void
node_listener(rmw_context_t * context);
Expand All @@ -49,31 +47,28 @@ rmw_fastrtps_dynamic_cpp::run_listener_thread(rmw_context_t * context)
auto common_context = static_cast<rmw_dds_common::Context *>(context->impl->common);
common_context->thread_is_running.store(true);
common_context->listener_thread_gc = rmw_create_guard_condition(context);
auto clean = [ = ]() {
common_context->thread_is_running.store(false);
if (common_context->listener_thread_gc) {
if (RMW_RET_OK != rmw_destroy_guard_condition(common_context->listener_thread_gc)) {
RCUTILS_LOG_ERROR_NAMED(log_tag, "Failed to destroy guard condition");
}
}
};
if (!common_context->listener_thread_gc) {
if (common_context->listener_thread_gc) {
try {
common_context->listener_thread = std::thread(node_listener, context);
return RMW_RET_OK;
} catch (const std::exception & exc) {
RMW_SET_ERROR_MSG_WITH_FORMAT_STRING("Failed to create std::thread: %s", exc.what());
} catch (...) {
RMW_SET_ERROR_MSG("Failed to create std::thread");
}
} else {
RMW_SET_ERROR_MSG("Failed to create guard condition");
clean();
return RMW_RET_ERROR;
}
try {
common_context->listener_thread = std::thread(node_listener, context);
} catch (const std::exception & exc) {
RMW_SET_ERROR_MSG_WITH_FORMAT_STRING("Failed to create std::thread: %s", exc.what());
clean();
return RMW_RET_ERROR;
} catch (...) {
RMW_SET_ERROR_MSG("Failed to create std::thread");
clean();
return RMW_RET_ERROR;
common_context->thread_is_running.store(false);
if (common_context->listener_thread_gc) {
if (RMW_RET_OK != rmw_destroy_guard_condition(common_context->listener_thread_gc)) {
fprintf(
stderr,
RCUTILS_STRINGIFY(__FILE__) ":" RCUTILS_STRINGIFY(__function__) ":"
RCUTILS_STRINGIFY(__LINE__) ": Failed to destroy guard condition");
}
}
return RMW_RET_OK;
return RMW_RET_ERROR;
}

rmw_ret_t
Expand Down Expand Up @@ -101,13 +96,15 @@ rmw_fastrtps_dynamic_cpp::join_listener_thread(rmw_context_t * context)
return RMW_RET_OK;
}

static
void
terminate(const char * error_message)
{
RCUTILS_LOG_ERROR_NAMED(log_tag, "%s, terminating ...", error_message);
std::terminate();
}
#define TERMINATE(msg) \
do { \
fprintf( \
stderr, \
RCUTILS_STRINGIFY(__FILE__) ":" RCUTILS_STRINGIFY(__function__) ":" \
RCUTILS_STRINGIFY(__LINE__) RCUTILS_STRINGIFY(msg) ": %s, terminating ...", \
rmw_get_error_string().str); \
std::terminate(); \
} while (0)

void
node_listener(rmw_context_t * context)
Expand All @@ -130,7 +127,7 @@ node_listener(rmw_context_t * context)
// number of conditions of a subscription is 2
rmw_wait_set_t * wait_set = rmw_create_wait_set(context, 2);
if (nullptr == wait_set) {
terminate("failed to create wait set");
TERMINATE("failed to create wait set");
}
if (RMW_RET_OK != rmw_wait(
&subscriptions,
Expand All @@ -141,7 +138,7 @@ node_listener(rmw_context_t * context)
wait_set,
nullptr))
{
terminate("rmw_wait failed");
TERMINATE("rmw_wait failed");
}
if (subscriptions_buffer[0]) {
rmw_dds_common::msg::ParticipantEntitiesInfo msg;
Expand All @@ -152,10 +149,9 @@ node_listener(rmw_context_t * context)
&taken,
nullptr))
{
terminate("rmw_take failed");
TERMINATE("rmw_take failed");
}
if (taken) {
// TODO(ivanpauno): Should the program be terminated if taken is false?
if (std::memcmp(
reinterpret_cast<char *>(common_context->gid.data),
reinterpret_cast<char *>(&msg.gid.data),
Expand All @@ -165,17 +161,10 @@ node_listener(rmw_context_t * context)
continue;
}
common_context->graph_cache.update_participant_entities(msg);
if (rcutils_logging_logger_is_enabled_for("rmw_dds_common",
RCUTILS_LOG_SEVERITY_DEBUG))
{
std::ostringstream ss;
ss << common_context->graph_cache;
RCUTILS_LOG_DEBUG_NAMED("rmw_fastrtps_dynamic_cpp", "%s", ss.str().c_str());
}
}
}
if (RMW_RET_OK != rmw_destroy_wait_set(wait_set)) {
terminate("failed to destroy wait set");
TERMINATE("failed to destroy wait set");
}
}
}
2 changes: 1 addition & 1 deletion rmw_fastrtps_dynamic_cpp/src/rmw_subscription.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ rmw_create_subscription(
std::lock_guard<std::mutex> guard(common_context->node_update_mutex);
rmw_dds_common::msg::ParticipantEntitiesInfo msg =
common_context->graph_cache.associate_reader(
info->subscription_gid, common_context->gid, node->name, node->namespace_);
info->subscription_gid_, common_context->gid, node->name, node->namespace_);
rmw_ret_t rmw_ret = rmw_fastrtps_shared_cpp::__rmw_publish(
eprosima_fastrtps_identifier,
common_context->pub,
Expand Down
2 changes: 1 addition & 1 deletion rmw_fastrtps_dynamic_cpp/src/subscription.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ create_subscription(
RMW_SET_ERROR_MSG("create_subscriber() could not create subscriber");
goto fail;
}
info->subscription_gid = rmw_fastrtps_shared_cpp::create_rmw_gid(
info->subscription_gid_ = rmw_fastrtps_shared_cpp::create_rmw_gid(
eprosima_fastrtps_identifier, info->subscriber_->getGuid());
rmw_subscription = rmw_subscription_allocate();
if (!rmw_subscription) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ struct CustomSubscriberInfo : public CustomEventInfo
eprosima::fastrtps::Subscriber * subscriber_;
SubListener * listener_;
rmw_fastrtps_shared_cpp::TypeSupport * type_support_;
rmw_gid_t subscription_gid;
rmw_gid_t subscription_gid_;
const char * typesupport_identifier_;

RMW_FASTRTPS_SHARED_CPP_PUBLIC
Expand Down
6 changes: 6 additions & 0 deletions rmw_fastrtps_shared_cpp/src/demangle.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -157,3 +157,9 @@ _demangle_service_type_only(const std::string & dds_type_name)
std::string type_name = dds_type_name.substr(start, suffix_position - start);
return type_namespace + type_name;
}

std::string
_identity_demangle(const std::string & name)
{
return name;
}
5 changes: 5 additions & 0 deletions rmw_fastrtps_shared_cpp/src/demangle.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,11 @@ _demangle_service_reply_from_topic(const std::string & topic_name);
std::string
_demangle_service_type_only(const std::string & dds_type_name);

/// Used when ros names are not mangled.
std::string
_identity_demangle(const std::string & name);


using DemangleFunction = std::string (*)(const std::string &);
using MangleFunction = DemangleFunction;

Expand Down
5 changes: 2 additions & 3 deletions rmw_fastrtps_shared_cpp/src/rmw_node_info_and_types.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -122,10 +122,9 @@ __rmw_get_topic_names_and_types_by_node(
}
auto common_context = static_cast<rmw_dds_common::Context *>(node->context->impl->common);

DemangleFunction no_op{[](const std::string & x) {return x;}};
if (no_demangle) {
demangle_topic = no_op;
demangle_type = no_op;
demangle_topic = _identity_demangle;
demangle_type = _identity_demangle;
}

return get_names_and_types_by_node(
Expand Down
2 changes: 1 addition & 1 deletion rmw_fastrtps_shared_cpp/src/rmw_subscription.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ __rmw_destroy_subscription(
std::lock_guard<std::mutex> guard(common_context->node_update_mutex);
rmw_dds_common::msg::ParticipantEntitiesInfo msg =
common_context->graph_cache.dissociate_reader(
info->subscription_gid, common_context->gid, node->name, node->namespace_);
info->subscription_gid_, common_context->gid, node->name, node->namespace_);
rmw_ret_t rmw_ret = rmw_fastrtps_shared_cpp::__rmw_publish(
identifier,
common_context->pub,
Expand Down
5 changes: 2 additions & 3 deletions rmw_fastrtps_shared_cpp/src/rmw_topic_names_and_types.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,11 +61,10 @@ __rmw_get_topic_names_and_types(

DemangleFunction demangle_topic = _demangle_ros_topic_from_topic;
DemangleFunction demangle_type = _demangle_if_ros_type;
DemangleFunction no_op{[](const std::string & x) {return x;}};

if (no_demangle) {
demangle_topic = no_op;
demangle_type = no_op;
demangle_topic = _identity_demangle;
demangle_type = _identity_demangle;
}
auto common_context = static_cast<rmw_dds_common::Context *>(node->context->impl->common);

Expand Down

0 comments on commit a210c96

Please sign in to comment.