Use Fast-DDS Waitsets instead of listeners #619
Conversation
d49bf2e to 8b23ff6 Compare
@clalancette Could you run a CI job against this PR, please?
@@ -191,6 +116,11 @@ class ClientListener : public eprosima::fastdds::dds::DataReaderListener
    info_->response_subscriber_matched_count_.store(publishers_.size());
  }

  size_t get_unread_responses()
  {
    return info_->response_reader_->get_unread_count();
I don't recommend this. This change could count the same samples repeatedly if they are not read between calls.
@richiware Please see this discussion, from which I understand that the correct implementation is to return the total number of unread changes.
It would be great to have some input from @alsora and/or @mauropasse on this, BTW
@MiguelCompany @richiware our previous implementation relied simply on a counter to check unread events, instead of asking the DDS about unread messages. For example:
void on_data_available(eprosima::fastdds::dds::DataReader * reader)
{
  if (on_new_message_cb_) {
    on_new_message_cb_(user_data_, 1);
  } else {
    new_data_unread_count_++;
  }
}
// Then when setting the user callback:
void set_on_new_message_callback(const void * user_data, rmw_event_callback_t callback)
{
  if (new_data_unread_count_) {
    callback(user_data, new_data_unread_count_);
    new_data_unread_count_ = 0;
  }
}
The reason for keeping a separate new_data_unread_count_ instead of relying on the number of unread samples in the DDS queue is that this function (set_on_new_message_callback) is sometimes called twice in very quick succession. With our approach:
First time: callback(user_data, new_data_unread_count_) is invoked.
Second time: the callback is not called, since new_data_unread_count_ has been reset to 0.
With your approach here, calling info_->response_reader_->get_unread_count() could lead to problems if set_on_new_message_callback is called many times in quick succession. In our case, every call of callback(...) pushes events into a queue, which are then processed (messages are read). With your approach, the queue could end up with duplicated (or triplicated, ...) events per unread sample if we didn't have time to process them between calls.
I'd stay with the approach taken in this PR: #625
@mauropasse The thing is that there is no guarantee that on_data_available is called once per sample.
Another thing to consider is the case where you have KEEP_LAST with a depth of 1. As explained in this comment, there is always a possibility that some samples that were notified to the callback will never actually be read by the application layer because of QoS settings (mainly KEEP_LAST history): the samples could be counted but then be pushed out of the cache by newer samples before they can be taken.
Regarding KEEP_LAST, we took that into account and capped new_data_unread_count_ at the QoS history depth.
In the case of multiple events but a single sample in the DDS queue (due to history depth = 1), we can mitigate it with a "bounded queue" on the executor, which checks the QoS depth before pushing new events into the queue. In the normal queue, which does not perform this check, we simply get "no op" takes (no messages taken).
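A minimal sketch of that cap, mirroring the on_data_available snippet above (qos_history_depth_ is a hypothetical member holding the reader's KEEP_LAST depth):

void on_data_available(eprosima::fastdds::dds::DataReader * /* reader */)
{
  if (on_new_message_cb_) {
    on_new_message_cb_(user_data_, 1);
  } else if (new_data_unread_count_ < qos_history_depth_) {
    // Never count more pending samples than the KEEP_LAST history can actually hold.
    new_data_unread_count_++;
  }
}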
With regards to on_data_available not being called once per sample, that issue would also affect the default SingleThreadedExecutor (the waitset is not woken up, since on_data_available is not called), and I don't have an answer for it right now. It has never happened to me before, and I didn't know there was no guarantee that on_data_available is called once per sample.
What about using return info_->response_reader_->get_unread_count(true); (i.e. mark_as_read = true) and keeping a count here?
Does marking samples as read affect how take() works?
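A rough sketch of the combination being asked about (unread_count_ is a hypothetical member; whether mark_as_read interacts with take() is exactly the open question here):

size_t get_unread_responses()
{
  // mark_as_read = true: samples already reported are not counted again,
  // so repeated calls in quick succession should not double-count.
  unread_count_ += info_->response_reader_->get_unread_count(true);
  return unread_count_;
}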
@richiware @MiguelCompany was this issue resolved?
Yes, it was. We did exactly what you mention, see here
    RCPPUTILS_TSA_GUARDED_BY(
      discovery_m_);

  std::mutex discovery_m_;
I would move this up, before the declaration of subscriptions_.
I also think we should make it mutable, in case we need to take it inside a const method in the future.
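A small sketch of the suggested layout (hypothetical wrapper struct with a placeholder element type, assuming the rcpputils thread-safety annotations header):

#include <mutex>
#include <set>

#include "rcpputils/thread_safety_annotations.hpp"

struct DiscoveryStateSketch
{
  // Declared before the members it guards, and mutable so it can be
  // locked from const methods later on.
  mutable std::mutex discovery_m_;
  std::set<int> subscriptions_ RCPPUTILS_TSA_GUARDED_BY(discovery_m_);
};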
Done in ec7a576
  std::set<eprosima::fastrtps::rtps::GUID_t> subscriptions_
  RCPPUTILS_TSA_GUARDED_BY(internalMutex_);
  RCPPUTILS_TSA_GUARDED_BY(
Keep this on a single line. Applies to all the RCPPUTILS_TSA_GUARDED_BY( occurrences below.
Done in ec7a576
  std::atomic_bool deadline_changes_;
  bool deadline_changes_;
Either keep this as an atomic_bool, or mark it with RCPPUTILS_TSA_GUARDED_BY.
Applies to all booleans below.
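A sketch of the two alternatives (hypothetical wrapper struct; internalMutex_ mirrors the listener's existing mutex):

#include <atomic>
#include <mutex>

#include "rcpputils/thread_safety_annotations.hpp"

struct StatusFlagsSketch
{
  std::mutex internalMutex_;

  // Option 1: keep the flag atomic.
  std::atomic_bool deadline_changes_{false};

  // Option 2: plain bool, annotated as guarded by the mutex.
  // bool deadline_changes_ RCPPUTILS_TSA_GUARDED_BY(internalMutex_);
};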
Done in 850eeeb
  subscriptions_set_t subscriptions_ RCPPUTILS_TSA_GUARDED_BY(
    mutex_);
  clients_endpoints_map_t clients_endpoints_ RCPPUTILS_TSA_GUARDED_BY(
    mutex_);
Suggested change:
-  subscriptions_set_t subscriptions_ RCPPUTILS_TSA_GUARDED_BY(
-    mutex_);
-  clients_endpoints_map_t clients_endpoints_ RCPPUTILS_TSA_GUARDED_BY(
-    mutex_);
+  subscriptions_set_t subscriptions_ RCPPUTILS_TSA_GUARDED_BY(mutex_);
+  clients_endpoints_map_t clients_endpoints_ RCPPUTILS_TSA_GUARDED_BY(mutex_);
Done in ec7a576
private:
  CustomSubscriberInfo * subscriber_info_ = nullptr;

  bool deadline_changes_;
Either keep booleans as atomic_bool or add RCPPUTILS_TSA_GUARDED_BY.
Done in 850eeeb
  if (subscriptions) {
    for (size_t i = 0; i < subscriptions->subscriber_count; ++i) {
      void * data = subscriptions->subscribers[i];
      auto custom_subscriber_info = static_cast<CustomSubscriberInfo *>(data);
      custom_subscriber_info->listener_->attachCondition(conditionMutex, conditionVariable);
      no_has_wait |= (0 < custom_subscriber_info->data_reader_->get_unread_count());
I would use RETCODE_OK == data_reader_->get_first_untaken_info(), which means the reader has something to take.
Same for the rest of the get_unread_count calls below.
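A sketch of the suggested check as a drop-in for the quoted loop above (custom_subscriber_info and no_has_wait come from that diff; the exact namespace of ReturnCode_t may vary between Fast-DDS versions):

eprosima::fastdds::dds::SampleInfo sample_info;
// RETCODE_OK here means the reader has at least one sample that can still be taken.
no_has_wait |= (ReturnCode_t::RETCODE_OK ==
  custom_subscriber_info->data_reader_->get_first_untaken_info(&sample_info));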
Done in 5ff6c5e
        if (!changed_statuses.is_active(eprosima::fastdds::dds::StatusMask::data_available())) {
          subscriptions->subscribers[i] = 0;
        }
      } else if (0 == custom_subscriber_info->data_reader_->get_unread_count()) {
Use get_first_untaken_info instead of get_unread_count. Applies to services and clients as well.
Done in 5ff6c5e
          return c == condition;
        }))
      {
        if (!condition->get_trigger_value()) {
No need to check if the condition has been triggered while waiting. Just this check is enough.
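For reference, a minimal hedged sketch of the general Fast-DDS 2.x WaitSet pattern this relates to (not the PR's exact code): wait() already reports the triggered conditions in active_conditions, so a separate get_trigger_value() check after waiting is redundant.

#include <fastdds/dds/core/condition/GuardCondition.hpp>
#include <fastdds/dds/core/condition/WaitSet.hpp>

void wait_sketch(eprosima::fastdds::dds::GuardCondition & condition)
{
  eprosima::fastdds::dds::WaitSet wait_set;
  wait_set.attach_condition(condition);

  // Blocks until an attached condition triggers or the timeout expires.
  eprosima::fastdds::dds::ConditionSeq active_conditions;
  wait_set.wait(active_conditions, eprosima::fastrtps::Duration_t(1, 0));

  for (auto * c : active_conditions) {
    if (c == &condition) {
      // The guard condition was triggered while waiting.
    }
  }
}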
Done in 5ff6c5e
e71cbd2 to 1d8e860 Compare
@jsantiago-eProsima Please fix linters
  eprosima::fastdds::dds::RequestedIncompatibleQosStatus incompatible_qos_status_
    RCPPUTILS_TSA_GUARDED_BY(
      discovery_m_);
  RCPPUTILS_TSA_GUARDED_BY(discovery_m_);
Suggested change:
-  RCPPUTILS_TSA_GUARDED_BY(discovery_m_);
+  RCPPUTILS_TSA_GUARDED_BY(on_new_event_m_);
Done in 5ff6c5e
      RCUTILS_SAFE_FWRITE_TO_STDERR( \
        RCUTILS_STRINGIFY(__FILE__) ":" RCUTILS_STRINGIFY(__function__) ":" \
        RCUTILS_STRINGIFY(__LINE__) RCUTILS_STRINGIFY("failed to create waitset") \
        ": ros discovery info listener thread will shutdown ...\n");
It would be nice to have a TERMINATE_THREAD_WITH_RETURN, or to remove the break; from TERMINATE_THREAD so it can be used both inside and outside the loop. In the latter case I would rename it to something like LOG_THREAD_FATAL_ERROR.
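One possible shape of that suggestion, as a sketch only (header locations assumed; the message wiring is simplified compared to the real macro):

#include "rcutils/error_handling.h"  // RCUTILS_SAFE_FWRITE_TO_STDERR
#include "rcutils/macros.h"          // RCUTILS_STRINGIFY

// Logging-only part, usable both inside and outside the loop.
#define LOG_THREAD_FATAL_ERROR(msg) \
  RCUTILS_SAFE_FWRITE_TO_STDERR( \
    __FILE__ ":" RCUTILS_STRINGIFY(__LINE__) ": " msg \
    ": ros discovery info listener thread will shutdown ...\n")

// Thin wrappers choosing how to leave the current scope.
#define TERMINATE_THREAD(msg) {LOG_THREAD_FATAL_ERROR(msg); break;}
#define TERMINATE_THREAD_WITH_RETURN(msg) {LOG_THREAD_FATAL_ERROR(msg); return;}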
Changes look good to me. Would be great if a maintainer takes another look and launches a CI.
Friendly ping for a CI run @clalancette @nuclearsandwich, I think this is highly relevant. CC: @SteveMacenski
e6ffa30 to bbe51b6 Compare
052b97e to e9aac70 Compare
@alsora According to our local testing, this should now be good to go.
1ed2199 to 3f796a2 Compare
LGTM, but DCO needs to be fixed
Signed-off-by: Ricardo González Moreno <[email protected]>
@fujitatomoya @ivanpauno I think we've addressed all comments.
LGTM with green CI
@richiware thanks! CI:
The failure is unrelated to this pull request (it's a flaky test). In order to test it, we built ros2 rolling from sources in two docker images. Basically, the test launches three talker nodes and waits until all of them are discovered (oddly, it checks discovery twice). The test can be run directly from the docker images doing:
I cannot explain why it appears in two consecutive CI runs.
CI failures are unrelated (see #619 (comment)), I will go ahead and merge this.
@richiware @MiguelCompany should we backport this to humble?
Yes. We should. Let's try: @Mergifyio backport humble
Signed-off-by: Miguel Company <[email protected]>
@fujitatomoya I just created the backport to humble in #633. It is basically a cherry-pick of 1dcfe80 into humble.
Signed-off-by: Miguel Company <[email protected]> Co-authored-by: Ricardo González <[email protected]>
Since version 2.4.0, Fast-DDS supports waitsets. This PR brings the new Fast-DDS Waitsets into rmw_fastrtps. The main motivation is to fix bugs related to the current "waitset" mechanism implemented on top of Fast-DDS listeners, like #613.
List of functionalities to be supported: