diff --git a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_subscriber_info.hpp b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_subscriber_info.hpp index 84dd6ff38..a1260d356 100644 --- a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_subscriber_info.hpp +++ b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_subscriber_info.hpp @@ -191,7 +191,7 @@ class SubListener : public EventListenerInterface, public eprosima::fastdds::dds RCPPUTILS_TSA_GUARDED_BY(on_new_event_m_); eprosima::fastdds::dds::RequestedIncompatibleQosStatus incompatible_qos_status_ - RCPPUTILS_TSA_GUARDED_BY(discovery_m_); + RCPPUTILS_TSA_GUARDED_BY(on_new_event_m_); std::set publishers_ RCPPUTILS_TSA_GUARDED_BY( discovery_m_); diff --git a/rmw_fastrtps_shared_cpp/src/listener_thread.cpp b/rmw_fastrtps_shared_cpp/src/listener_thread.cpp index f998bfaca..cde9cae51 100644 --- a/rmw_fastrtps_shared_cpp/src/listener_thread.cpp +++ b/rmw_fastrtps_shared_cpp/src/listener_thread.cpp @@ -105,13 +105,12 @@ rmw_fastrtps_shared_cpp::join_listener_thread( return RMW_RET_OK; } -#define TERMINATE_THREAD(msg) \ +#define LOG_THREAD_FATAL_ERROR(msg) \ { \ RCUTILS_SAFE_FWRITE_TO_STDERR( \ RCUTILS_STRINGIFY(__FILE__) ":" RCUTILS_STRINGIFY(__function__) ":" \ RCUTILS_STRINGIFY(__LINE__) RCUTILS_STRINGIFY(msg) \ ": ros discovery info listener thread will shutdown ...\n"); \ - break; \ } void @@ -124,12 +123,10 @@ node_listener( auto common_context = static_cast(context->impl->common); // number of conditions of a subscription is 2 rmw_wait_set_t * wait_set = rmw_fastrtps_shared_cpp::__rmw_create_wait_set( - context->implementation_identifier, context, 2); + context->implementation_identifier, context, 2); if (nullptr == wait_set) { - RCUTILS_SAFE_FWRITE_TO_STDERR( \ - RCUTILS_STRINGIFY(__FILE__) ":" RCUTILS_STRINGIFY(__function__) ":" \ - RCUTILS_STRINGIFY(__LINE__) RCUTILS_STRINGIFY("failed to create waitset") \ - ": ros discovery info listener thread will shutdown ...\n"); + LOG_THREAD_FATAL_ERROR("failed to create waitset"); + return; } while (common_context->thread_is_running.load()) { assert(nullptr != common_context->sub); @@ -152,7 +149,8 @@ node_listener( wait_set, nullptr)) { - TERMINATE_THREAD("rmw_wait failed"); + LOG_THREAD_FATAL_ERROR("rmw_wait failed"); + break; } if (subscriptions_buffer[0]) { rmw_dds_common::msg::ParticipantEntitiesInfo msg; @@ -166,7 +164,8 @@ node_listener( &taken, nullptr)) { - TERMINATE_THREAD("__rmw_take failed"); + LOG_THREAD_FATAL_ERROR("__rmw_take failed"); + break; } if (taken) { if (std::memcmp( @@ -183,11 +182,8 @@ node_listener( } } if (RMW_RET_OK != rmw_fastrtps_shared_cpp::__rmw_destroy_wait_set( - context->implementation_identifier, wait_set)) + context->implementation_identifier, wait_set)) { - RCUTILS_SAFE_FWRITE_TO_STDERR( \ - RCUTILS_STRINGIFY(__FILE__) ":" RCUTILS_STRINGIFY(__function__) ":" \ - RCUTILS_STRINGIFY(__LINE__) RCUTILS_STRINGIFY("failed to destroy waitset") \ - ": ros discovery info listener thread will shutdown ...\n"); + LOG_THREAD_FATAL_ERROR("failed to destroy waitset"); } } diff --git a/rmw_fastrtps_shared_cpp/src/rmw_wait.cpp b/rmw_fastrtps_shared_cpp/src/rmw_wait.cpp index 8f916a419..46dc16369 100644 --- a/rmw_fastrtps_shared_cpp/src/rmw_wait.cpp +++ b/rmw_fastrtps_shared_cpp/src/rmw_wait.cpp @@ -63,7 +63,9 @@ __rmw_wait( void * data = subscriptions->subscribers[i]; auto custom_subscriber_info = static_cast(data); eprosima::fastdds::dds::SampleInfo sample_info; - skip_wait |= (ReturnCode_t::RETCODE_OK == custom_subscriber_info->data_reader_->get_first_untaken_info(&sample_info)); + skip_wait |= + (ReturnCode_t::RETCODE_OK == + custom_subscriber_info->data_reader_->get_first_untaken_info(&sample_info)); fastdds_wait_set->attach_condition( custom_subscriber_info->data_reader_->get_statuscondition()); } @@ -74,7 +76,9 @@ __rmw_wait( void * data = clients->clients[i]; auto custom_client_info = static_cast(data); eprosima::fastdds::dds::SampleInfo sample_info; - skip_wait |= (ReturnCode_t::RETCODE_OK == custom_client_info->response_reader_->get_first_untaken_info(&sample_info)); + skip_wait |= + (ReturnCode_t::RETCODE_OK == + custom_client_info->response_reader_->get_first_untaken_info(&sample_info)); fastdds_wait_set->attach_condition( custom_client_info->response_reader_->get_statuscondition()); } @@ -85,7 +89,9 @@ __rmw_wait( void * data = services->services[i]; auto custom_service_info = static_cast(data); eprosima::fastdds::dds::SampleInfo sample_info; - skip_wait |= (ReturnCode_t::RETCODE_OK == custom_service_info->request_reader_->get_first_untaken_info(&sample_info)); + skip_wait |= + (ReturnCode_t::RETCODE_OK == + custom_service_info->request_reader_->get_first_untaken_info(&sample_info)); fastdds_wait_set->attach_condition( custom_service_info->request_reader_->get_statuscondition()); } @@ -132,7 +138,9 @@ __rmw_wait( custom_subscriber_info->data_reader_->get_statuscondition(); fastdds_wait_set->detach_condition(status_condition); eprosima::fastdds::dds::SampleInfo sample_info; - if (ReturnCode_t::RETCODE_OK != custom_subscriber_info->data_reader_->get_first_untaken_info(&sample_info)) { + if (ReturnCode_t::RETCODE_OK != + custom_subscriber_info->data_reader_->get_first_untaken_info(&sample_info)) + { subscriptions->subscribers[i] = 0; } } @@ -146,7 +154,9 @@ __rmw_wait( custom_client_info->response_reader_->get_statuscondition(); fastdds_wait_set->detach_condition(status_condition); eprosima::fastdds::dds::SampleInfo sample_info; - if (ReturnCode_t::RETCODE_OK != custom_client_info->response_reader_->get_first_untaken_info(&sample_info)) { + if (ReturnCode_t::RETCODE_OK != + custom_client_info->response_reader_->get_first_untaken_info(&sample_info)) + { clients->clients[i] = 0; } } @@ -160,7 +170,9 @@ __rmw_wait( custom_service_info->request_reader_->get_statuscondition(); fastdds_wait_set->detach_condition(status_condition); eprosima::fastdds::dds::SampleInfo sample_info; - if (ReturnCode_t::RETCODE_OK != custom_service_info->request_reader_->get_first_untaken_info(&sample_info)) { + if (ReturnCode_t::RETCODE_OK != + custom_service_info->request_reader_->get_first_untaken_info(&sample_info)) + { services->services[i] = 0; } } @@ -178,8 +190,7 @@ __rmw_wait( &custom_event_info->get_listener()->event_guard[event->event_type]; bool active = false; - if (ReturnCode_t::RETCODE_OK == ret_code) - { + if (ReturnCode_t::RETCODE_OK == ret_code) { if (guard_condition->get_trigger_value()) { active = true; guard_condition->set_trigger_value(false); @@ -198,9 +209,8 @@ __rmw_wait( void * data = guard_conditions->guard_conditions[i]; auto condition = static_cast(data); fastdds_wait_set->detach_condition(*condition); - if (!condition->get_trigger_value()) - { - guard_conditions->guard_conditions[i] = 0; + if (!condition->get_trigger_value()) { + guard_conditions->guard_conditions[i] = 0; } condition->set_trigger_value(false); }