-
Notifications
You must be signed in to change notification settings - Fork 421
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Feature/ipm with rclcpp serialized messages #1931
base: rolling
Are you sure you want to change the base?
Feature/ipm with rclcpp serialized messages #1931
Conversation
const auto & sub_ids = publisher_it->second; | ||
|
||
// check if (de)serialization is needed | ||
if (sub_ids.take_subscriptions[Indicies::ownership_other].size() + |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should the correct spelling be Indices
?
@@ -199,41 +254,40 @@ class IntraProcessManager | |||
"Calling do_intra_process_publish for invalid or no longer existing publisher id"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we need this check also here? Should this function be private so that the user can only call do_intra_process_publish
(which takes care of checking the publisher ID)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the method is needed from the callback, which needs public access
return true; | ||
} else { | ||
// The method is only allowed to be called with valid data (serialized case). | ||
throw std::runtime_error("nullptr provided for serialized inter-process communication"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nitpick: rather than having a huge if-else, I personally prefer to check the failure conditions at the top of the function such that the actual "body" is not within the if-else scope.
if (intraprocess_manager == nullptr || serialized_message == nullptr) {
throw std::runtime_error("nullptr provided for serialized inter-process communication");
return false;
}
..... everything else here ....
} | ||
SubscribedTypeUniquePtr message(ptr, subscribed_type_deleter_); | ||
|
||
detail::do_intra_process_publish_same_type<SubscribedType, ROSMessageType, Alloc, Deleter>( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm sorry, but I have been out of the loop with the recent changes to the intra-process manager, so maybe this question may be unrelated related to this PR.
This class is a subscription buffer: why does it call a "publish" function?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
from comment:
- Helper function to expose method of IntraProcessManager
- for publishing messages with same data type.
- This is needed as the publisher of serialized message is not aware of the subscribed
- data type. While the subscription has all needed information (MessageT, Allocator) to
- cast and deserialize the message. The type information is forwarded by the helper function.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think that this helper function should be modified.
It currently does two separate things:
- create a message of a certain type
- invoke intra-process publish
IMO the helper should just take care of creating the message.
Then it should return it as output such that intra-process publish is invoked elsewhere.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've splitted it into two methods (one for deserialization and one for forwarding). Yet, the message type is only known within this class so it has to stay here (alternative would be generating a lambda function).
@@ -119,6 +123,61 @@ class GenericSubscription : public rclcpp::SubscriptionBase | |||
options.event_callbacks.message_lost_callback, | |||
RCL_SUBSCRIPTION_MESSAGE_LOST); | |||
} | |||
|
|||
// Setup intra process publishing if requested. | |||
if (rclcpp::detail::resolve_use_intra_process(options, *node_base)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
most of this seems identical to what is already done in subscription.hpp
... can we avoid this duplication?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think generic subscprition is not needed after the changes anymore. But I wanted to minimize the code changes for the first PR.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would prefer to have this moved to the SubscriptionBase
class as a protected helper function.
I'm afraid of these going out of sync by the time a new PR is produced.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I removed GenericSubscprition in general to avoid duplicated code, perhaps this breaks with some existing code in other repositories
callback_([callback]( | ||
std::shared_ptr<const rclcpp::SerializedMessage> serialized_message, | ||
const rclcpp::MessageInfo & message_info) mutable { | ||
callback.dispatch(serialized_message, message_info); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is the linter happy with this indentation? it feels strange that the body is on the same line as the arguments.
moreover, why is this mutable?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
mutable is needed to get a copy of "callback" to avoid memory issues
* The NodeT type only needs to have a method called get_node_topics_interface() | ||
* which returns a shared_ptr to a NodeTopicsInterface. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this comment was intended for the new function above.
// Extract the NodeTopicsInterface from the NodeT. | ||
using rclcpp::node_interfaces::get_node_topics_interface; | ||
auto node_topics = get_node_topics_interface(node); | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It would be nice to support the QoS overrides feature here too, via the parameters interface. Is there a reason this was omitted?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
you are right, done
rclcpp/include/rclcpp/experimental/subscription_intra_process_base.hpp
Outdated
Show resolved
Hide resolved
rclcpp/include/rclcpp/experimental/subscription_intra_process_base.hpp
Outdated
Show resolved
Hide resolved
const rclcpp::SerializedMessage * serialized_message, | ||
IntraProcessManager * intraprocess_manager, | ||
uint64_t intra_process_publisher_id, | ||
void * untyped_allocator, | ||
const std::vector<uint64_t> & take_ownership_subscriptions, | ||
const std::vector<uint64_t> & take_shared_subscriptions) = 0; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you document these parameters in the docblock? Thanks :)
if constexpr (std::is_same<SubscribedType, | ||
rclcpp::SerializedMessage>::value || | ||
std::is_same<SubscribedType, rclcpp::SerializedMessage>::value) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We're checking the same condition twice here. Maybe you mean for one of these to check ROSMessageType
?
get_subscription_count() > get_intra_process_subscription_count(); | ||
|
||
if (intra_process_is_enabled_ && inter_process_publish_needed) { | ||
this->do_intra_process_ros_message_publish(std::move(msg)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we need to handle inter-process communication separately from intra-process here (similar to how we publish non-serialized messages)?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think for serialized message it's currently needed
719f26f
to
d08f3dd
Compare
updated to review |
|
||
return true; | ||
|
||
return false; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
double return statement
// The type support library should stay loaded, so it is stored in the GenericSubscription | ||
std::shared_ptr<rcpputils::SharedLibrary> ts_lib_; | ||
}; | ||
typedef rclcpp::Subscription<rclcpp::SerializedMessage> GenericSubscription; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
love this!
nitpick: if you happen to do other changes, please consider to replace typedef
with using
I think I processed all comments |
while testing I found two bugs which I fixed |
@alsora are further steps required from us? thank you |
6df6d15
to
fb485c7
Compare
7793029
to
c9a2120
Compare
What is missing for approval? Thank you 😄 |
907f836
to
60bfac4
Compare
Hi @DensoADAS, this is a 1-year old PR and it looks like it has been completely rebased and/or reworked. I'll try to do it this week. |
@alsora Did you have already time for the review? Thank you. |
any updates? |
Signed-off-by: Joshua Hampp <[email protected]> Signed-off-by: Joshua Hampp <[email protected]>
Signed-off-by: Joshua Hampp <[email protected]> Signed-off-by: Joshua Hampp <[email protected]>
Signed-off-by: Joshua Hampp <[email protected]> Signed-off-by: Joshua Hampp <[email protected]>
Signed-off-by: Joshua Hampp <[email protected]> Signed-off-by: Joshua Hampp <[email protected]>
…base.hpp Signed-off-by: Joshua Hampp <[email protected]> Co-authored-by: Jacob Perron <[email protected]> Signed-off-by: Joshua Hampp <[email protected]>
…base.hpp Signed-off-by: Joshua Hampp <[email protected]> Co-authored-by: Jacob Perron <[email protected]> Signed-off-by: Joshua Hampp <[email protected]>
Signed-off-by: Joshua Hampp <[email protected]> Signed-off-by: Joshua Hampp <[email protected]>
Signed-off-by: Joshua Hampp <[email protected]> Signed-off-by: Joshua Hampp <[email protected]>
Signed-off-by: Joshua Hampp <[email protected]> Signed-off-by: Joshua Hampp <[email protected]>
Signed-off-by: Joshua Hampp <[email protected]> Signed-off-by: Joshua Hampp <[email protected]>
Signed-off-by: Joshua Hampp <[email protected]> Signed-off-by: Joshua Hampp <[email protected]>
Signed-off-by: Joshua Hampp <[email protected]> Signed-off-by: Joshua Hampp <[email protected]>
780f372
to
a8e279c
Compare
Signed-off-by: Joshua Hampp <[email protected]> Signed-off-by: Joshua Hampp <[email protected]>
Signed-off-by: Joshua Hampp <[email protected]> Signed-off-by: Joshua Hampp <[email protected]>
Signed-off-by: Joshua Hampp <[email protected]> Signed-off-by: Joshua Hampp <[email protected]>
Signed-off-by: Joshua Hampp <[email protected]> Signed-off-by: Joshua Hampp <[email protected]>
…age and subscriber has a ros message type for intra process communication Signed-off-by: Joshua Hampp <[email protected]> Signed-off-by: Joshua Hampp <[email protected]>
Signed-off-by: Joshua Hampp <[email protected]> Signed-off-by: Joshua Hampp <[email protected]>
Signed-off-by: Joshua Hampp <[email protected]> Signed-off-by: Joshua Hampp <[email protected]>
Signed-off-by: Joshua Hampp <[email protected]> Signed-off-by: Joshua Hampp <[email protected]>
…mmunication Signed-off-by: Joshua Hampp <[email protected]> Signed-off-by: Joshua Hampp <[email protected]>
Signed-off-by: Joshua Hampp <[email protected]> Signed-off-by: Joshua Hampp <[email protected]>
Signed-off-by: Joshua Hampp <[email protected]>
Signed-off-by: Joshua Hampp <[email protected]>
…with IPM Signed-off-by: Joshua Hampp <[email protected]>
Signed-off-by: Joshua Hampp <[email protected]>
Signed-off-by: Joshua Hampp <[email protected]>
Enabling publishing rclcpp::SerializedMessage for IPM (rcl_serialized_message_t are not supported for IPM).
Generic publisher can be created now with rclcpp::Publisherrclcpp::SerializedMessage. Therefore GenericPublisher is not needed anymore.
uses changes from: #1928