Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make service wait for response reader #390

Merged
merged 10 commits into from
Jun 17, 2020
1 change: 1 addition & 0 deletions rmw_fastrtps_cpp/src/rmw_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,7 @@ rmw_create_client(
}

info->writer_guid_ = info->request_publisher_->getGuid();
info->reader_guid_ = info->response_subscriber_->getGuid();

rmw_client = rmw_client_allocate();
if (!rmw_client) {
Expand Down
7 changes: 6 additions & 1 deletion rmw_fastrtps_cpp/src/rmw_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -192,8 +192,9 @@ rmw_create_service(
RMW_SET_ERROR_MSG("failed to get datawriter qos");
goto fail;
}
info->pub_listener_ = new ServicePubListener();
info->response_publisher_ =
Domain::createPublisher(participant, publisherParam, nullptr);
Domain::createPublisher(participant, publisherParam, info->pub_listener_);
if (!info->response_publisher_) {
RMW_SET_ERROR_MSG("create_service() could not create publisher");
goto fail;
Expand Down Expand Up @@ -255,6 +256,10 @@ rmw_create_service(
Domain::removePublisher(info->response_publisher_);
}

if (info->pub_listener_) {
delete info->pub_listener_;
}

if (info->request_subscriber_) {
rmw_gid_t gid = rmw_fastrtps_shared_cpp::create_rmw_gid(
eprosima_fastrtps_identifier, info->request_subscriber_->getGuid());
Expand Down
3 changes: 2 additions & 1 deletion rmw_fastrtps_dynamic_cpp/src/rmw_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -217,11 +217,12 @@ rmw_create_client(
info->request_publisher_ =
Domain::createPublisher(participant, publisherParam, info->pub_listener_);
if (!info->request_publisher_) {
RMW_SET_ERROR_MSG("create_publisher() could not create publisher");
RMW_SET_ERROR_MSG("create_client() could not create publisher");
goto fail;
}

info->writer_guid_ = info->request_publisher_->getGuid();
info->reader_guid_ = info->response_subscriber_->getGuid();

rmw_client = rmw_client_allocate();
if (!rmw_client) {
Expand Down
7 changes: 6 additions & 1 deletion rmw_fastrtps_dynamic_cpp/src/rmw_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -222,8 +222,9 @@ rmw_create_service(
RMW_SET_ERROR_MSG("failed to get datawriter qos");
goto fail;
}
info->pub_listener_ = new ServicePubListener();
info->response_publisher_ =
Domain::createPublisher(participant, publisherParam, nullptr);
Domain::createPublisher(participant, publisherParam, info->pub_listener_);
if (!info->response_publisher_) {
RMW_SET_ERROR_MSG("create_publisher() could not create publisher");
goto fail;
Expand Down Expand Up @@ -285,6 +286,10 @@ rmw_create_service(
Domain::removePublisher(info->response_publisher_);
}

if (info->pub_listener_) {
delete info->pub_listener_;
}

if (info->request_subscriber_) {
rmw_gid_t gid = rmw_fastrtps_shared_cpp::create_rmw_gid(
eprosima_fastrtps_identifier, info->request_subscriber_->getGuid());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ typedef struct CustomClientInfo
eprosima::fastrtps::Publisher * request_publisher_;
ClientListener * listener_;
eprosima::fastrtps::rtps::GUID_t writer_guid_;
eprosima::fastrtps::rtps::GUID_t reader_guid_;
eprosima::fastrtps::Participant * participant_;
const char * typesupport_identifier_;
ClientPubListener * pub_listener_;
Expand Down Expand Up @@ -88,7 +89,8 @@ class ClientListener : public eprosima::fastrtps::SubscriberListener
if (eprosima::fastrtps::rtps::ALIVE == response.sample_info_.sampleKind) {
response.sample_identity_ = response.sample_info_.related_sample_identity;

if (response.sample_identity_.writer_guid() == info_->writer_guid_) {
if (response.sample_identity_.writer_guid() == info_->reader_guid_ ||
response.sample_identity_.writer_guid() == info_->writer_guid_) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the linter doesn't accept that style (it the linter is happy, no change needed).

Suggested style:

        if (response.sample_identity_.writer_guid() == info_->reader_guid_ ||
          response.sample_identity_.writer_guid() == info_->writer_guid_)
        {

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed on 13c540b

std::lock_guard<std::mutex> lock(internalMutex_);

if (conditionMutex_ != nullptr) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
#include "rmw_fastrtps_shared_cpp/TypeSupport.hpp"

class ServiceListener;
class ServicePubListener;

typedef struct CustomServiceInfo
{
Expand All @@ -44,6 +45,7 @@ typedef struct CustomServiceInfo
eprosima::fastrtps::Subscriber * request_subscriber_;
eprosima::fastrtps::Publisher * response_publisher_;
ServiceListener * listener_;
ServicePubListener * pub_listener_;
eprosima::fastrtps::Participant * participant_;
const char * typesupport_identifier_;
} CustomServiceInfo;
Expand Down Expand Up @@ -84,6 +86,12 @@ class ServiceListener : public eprosima::fastrtps::SubscriberListener
if (sub->takeNextData(&data, &request.sample_info_)) {
if (eprosima::fastrtps::rtps::ALIVE == request.sample_info_.sampleKind) {
request.sample_identity_ = request.sample_info_.sample_identity;
// Use response subscriber guid (on related_sample_identity) when present.
const eprosima::fastrtps::rtps::GUID_t& reader_guid =
request.sample_info_.related_sample_identity.writer_guid();
if (reader_guid != eprosima::fastrtps::rtps::GUID_t::unknown() ) {
request.sample_identity_.writer_guid() = reader_guid;
}

std::lock_guard<std::mutex> lock(internalMutex_);

Expand Down Expand Up @@ -159,4 +167,45 @@ class ServiceListener : public eprosima::fastrtps::SubscriberListener
std::condition_variable * conditionVariable_ RCPPUTILS_TSA_GUARDED_BY(internalMutex_);
};

class ServicePubListener : public eprosima::fastrtps::PublisherListener
{
public:
explicit ServicePubListener() = default;

template< class Rep, class Period >
bool wait_for_subscription(
const eprosima::fastrtps::rtps::GUID_t & guid,
const std::chrono::duration<Rep, Period> & rel_time)
{
auto guid_is_present = [this, guid]() -> bool
{
return subscriptions_.find(guid) != subscriptions_.end();
};

std::unique_lock<std::mutex> lock(mutex_);
return cv_.wait_for(lock, rel_time, guid_is_present);
}

void onPublicationMatched(
eprosima::fastrtps::Publisher * pub,
eprosima::fastrtps::rtps::MatchingInfo & matchingInfo)
{
(void) pub;
std::lock_guard<std::mutex> lock(mutex_);
if (eprosima::fastrtps::rtps::MATCHED_MATCHING == matchingInfo.status) {
subscriptions_.insert(matchingInfo.remoteEndpointGuid);
} else if (eprosima::fastrtps::rtps::REMOVED_MATCHING == matchingInfo.status) {
subscriptions_.erase(matchingInfo.remoteEndpointGuid);
} else {
return;
}
cv_.notify_all();
}

private:
std::mutex mutex_;
std::set<eprosima::fastrtps::rtps::GUID_t> subscriptions_ RCPPUTILS_TSA_GUARDED_BY(mutex_);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: prefer std::unordered_set

Copy link
Collaborator Author

@MiguelCompany MiguelCompany May 28, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done in 3c48b25. I had to add a hash function for GUID_t.

std::condition_variable cv_;
};

#endif // RMW_FASTRTPS_SHARED_CPP__CUSTOM_SERVICE_INFO_HPP_
1 change: 1 addition & 0 deletions rmw_fastrtps_shared_cpp/src/rmw_request.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ __rmw_send_request(
data.is_cdr_buffer = false;
data.data = const_cast<void *>(ros_request);
data.impl = info->request_type_support_impl_;
wparams.related_sample_identity().writer_guid() = info->reader_guid_;
if (info->request_publisher_->write(&data, wparams)) {
returnedValue = RMW_RET_OK;
*sequence_id = ((int64_t)wparams.sample_identity().sequence_number().high) << 32 |
Expand Down
16 changes: 16 additions & 0 deletions rmw_fastrtps_shared_cpp/src/rmw_response.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,22 @@ __rmw_send_response(
wparams.related_sample_identity().sequence_number().low =
(int32_t)(request_header->sequence_number & 0xFFFFFFFF);

// According to the list of possible entity kinds in section 9.3.1.2 of RTPS
// readers will have this bit on, while writers will not. We use this to know
// if the related guid is the request writer or the response reader.
constexpr uint8_t entity_id_is_reader_bit = 0x04;
Copy link
Contributor

@hidmic hidmic May 28, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@MiguelCompany mind to add a TODO for a proper fix? I opened an #392 to track that work.

const eprosima::fastrtps::rtps::GUID_t & related_guid =
wparams.related_sample_identity().writer_guid();
ivanpauno marked this conversation as resolved.
Show resolved Hide resolved
if((related_guid.entityId.value[3] & entity_id_is_reader_bit) != 0)
{
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this will cause a linter error

Suggested change
if((related_guid.entityId.value[3] & entity_id_is_reader_bit) != 0)
{
if ((related_guid.entityId.value[3] & entity_id_is_reader_bit) != 0) {

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed on 13c540b

// Related guid is a reader, so it is the response subscription guid.
// Wait for the response writer to be matched with it.
if(!info->pub_listener_->wait_for_subscription(related_guid, std::chrono::milliseconds(100))) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
if(!info->pub_listener_->wait_for_subscription(related_guid, std::chrono::milliseconds(100))) {
if (!info->pub_listener_->wait_for_subscription(related_guid, std::chrono::milliseconds(100))) {

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed on 13c540b

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

rmw_send_response was not blocking, and it should be continue being not blocking.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What if we change this to 0 timeout? That way we at least know that it won't work and can react to it.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Based on the conversation below, I assume the code is ok as it is right now.

If this be changed to an indefinite block, please tell me and I'll do it

RMW_SET_ERROR_MSG("client will not receive response");
return RMW_RET_ERROR;
}
}

rmw_fastrtps_shared_cpp::SerializedData data;
data.is_cdr_buffer = false;
data.data = const_cast<void *>(ros_response);
Expand Down
3 changes: 3 additions & 0 deletions rmw_fastrtps_shared_cpp/src/rmw_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,9 @@ __rmw_destroy_service(
if (info->response_publisher_ != nullptr) {
Domain::removePublisher(info->response_publisher_);
}
if (info->pub_listener_ != nullptr) {
delete info->pub_listener_;
}
if (info->listener_ != nullptr) {
delete info->listener_;
}
Expand Down