Reused shared memory with PREALLOCATED_WITH_REALLOC #783

Open
mauropasse opened this issue Oct 2, 2024 · 18 comments
@mauropasse
Contributor

In PR ros2/rclcpp#2624 I provide a fix that allows the user to make copies of loaned messages and keep them beyond the subscription callback scope. With the fix, the loaned message is returned to the DDS when the user's copy of the message goes out of scope.

In the PR I add a unit test to verify that memory is not reused until all entities have returned the loan.
The tests fully pass when the history memory policy is set to DYNAMIC_REUSABLE, but fail when it is set to PREALLOCATED_WITH_REALLOC:

[  FAILED  ] 2 tests, listed below:
[  FAILED  ] TestSharedMemory.user_is_last_loan_owner
[  FAILED  ] TestSharedMemory.sub_is_last_loan_owner

The failures mean that a user is provided with loaned memory that was handed out previously and is still in use:

  • Case 1: The user keeps copies (loan not returned)
  • Case 2: A non-spinning subscription keeps copies in its internal buffer.

While a temporary solution would be to use only DYNAMIC_REUSABLE with shared memory, this mode has the issue that an undesired copy is performed on a multi-process system, as I describe in #782.

So in short:

  • PREALLOCATED_WITH_REALLOC: issues shared memory still in use
  • DYNAMIC_REUSABLE: a copy of the message happens on a multi-process system

Do you think this is a bug, or does the use of PREALLOCATED_WITH_REALLOC inherently break proper shared memory management?
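
For context, the basic publisher-side loaned-message pattern that all of this builds on is sketched below using the standard rclcpp API (the message type, topic name and field value are just for illustration; whether the loan actually comes from shared memory depends on the rmw, the QoS and the message being of fixed size):

#include "rclcpp/rclcpp.hpp"
#include "test_msgs/msg/basic_types.hpp"

int main(int argc, char ** argv)
{
  rclcpp::init(argc, argv);
  auto node = rclcpp::Node::make_shared("pub_sub_node");
  auto pub = node->create_publisher<test_msgs::msg::BasicTypes>(
    "topic", rclcpp::QoS(rclcpp::KeepLast(1)));

  if (pub->can_loan_messages()) {
    // The memory for the message is borrowed from the middleware's pool.
    auto loaned_msg = pub->borrow_loaned_message();
    loaned_msg.get().int32_value = 42;
    // Publishing hands ownership of the loan back to the middleware.
    pub->publish(std::move(loaned_msg));
  }

  rclcpp::shutdown();
  return 0;
}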

@fujitatomoya fujitatomoya self-assigned this Oct 4, 2024
@mauropasse
Contributor Author

mauropasse commented Oct 7, 2024

These failures can be explained by the documentation in eProsima's datasharing-delivery-constraints:

Using Data-sharing mechanism, the DataWriter’s history is shared with the DataReaders.
This means that the effective HistoryQos depth on the DataReader is, at most, 
the Datawriter’s HistoryQos depth.

To avoid confusions, set the DataReaders’ history depth to a value equal or less than the DataWriter’s.

In the failure [ FAILED ] TestSharedMemory.sub_is_last_loan_owner, the subscription has a QoS depth larger than the publisher's depth, so the failure seems somewhat expected.

My question is: should the subscription/reader log a warning in this case, like a QoS mismatch event?
[WARNING]: HistoryQos depth on the DataReader should be less than or equal to the DataWriter's.

The goal would be for the user to somehow know that, for shared memory to be handled safely (i.e. stored messages won't silently be overridden), the following must hold:

  • Subscription depth should be <= Publisher depth
  • The number of copies the user keeps should be <= Publisher depth

Maybe it makes sense to simply mention this in the ROS 2 documentation about shared memory and loaned messages.
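
As a rough sketch of that guidance with the standard rclcpp API (depths are arbitrary; the only point is reader depth <= writer depth):

#include "rclcpp/rclcpp.hpp"
#include "test_msgs/msg/basic_types.hpp"

void configure_matching_depths(const rclcpp::Node::SharedPtr & node)
{
  // The writer's history depth sizes the pool of samples available for loans.
  auto pub = node->create_publisher<test_msgs::msg::BasicTypes>(
    "topic", rclcpp::QoS(rclcpp::KeepLast(10)));

  // Reader depth kept <= writer depth, so stored samples are not silently reused.
  auto sub = node->create_subscription<test_msgs::msg::BasicTypes>(
    "topic", rclcpp::QoS(rclcpp::KeepLast(10)),
    [](test_msgs::msg::BasicTypes::ConstSharedPtr msg) {(void)msg;});

  (void)pub;
  (void)sub;
}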

@mauropasse
Contributor Author

Currently the user can still ask for loaned messages even after all the shared memory has been exhausted, e.g. when previous loaned messages haven't been returned (either because the user holds copies, or because subscriptions hold them in their buffers).

In this case, the user is handed memory that is still in use, which will silently overwrite the user's/subscription's stored messages.

I think the right behavior would be for the publisher to throw when the user requests a loan and memory has been exhausted. What do you think?

@fujitatomoya
Collaborator

My question is: should the subscription/reader log a warning in this case, like a QoS mismatch event?
[WARNING]: HistoryQos depth on the DataReader should be less than or equal to the DataWriter's.

IMO, that sounds reasonable if they are doing data-sharing. This is probably a request for Fast-DDS, and we (ROS 2) could then catch the QoS incompatibility event from the rmw implementation?

In this case, the user is handed memory that is still in use, which will silently overwrite the user's/subscription's stored messages.
I think the right behavior would be for the publisher to throw when the user requests a loan and memory has been exhausted.

IMO this depends on the application requirements.

  • If the publisher stops publishing because memory blocks are not returned by the user, all the subscriptions waiting for the next message, and the sub-system, will stall immediately. If a 3rd-party application holds on to a shared memory pointer, it can stop the entire system; to me this is more of a security issue. Because of this, I am not sure that stopping publication, including throwing an exception, would be the right way to go.
  • Maybe the publisher goes on publishing and overwrites the data? But then there is a possibility that the subscriber reads corrupted data, or data already overwritten by the publisher. I think there has been a discussion on an is_valid API to verify that the data is still intact on the subscriber side, but I'm not sure if this comes from DDS or is a vendor-specific API. (I guess it is not part of DDS, but a vendor API can check the SampleInfo.valid_data flag?) I think this is_valid API would be useful for this case.
  • Give the writer some time to wait until the shared memory is released, as in https://fast-dds.docs.eprosima.com/en/latest/fastdds/transport/datasharing.html#blocking-reuse-of-samples-until-acknowledged
  • After all, the safest approach is what is currently implemented: disabling loaning of memory on subscriptions.

CC: @MiguelCompany @wjwwood @clalancette @alsora

@MiguelCompany
Collaborator

@mauropasse Currently, the only way to make the publisher block until the shared payload is no longer in use is to use RELIABLE together with KEEP_ALL.

You can also increase extra_samples in ResourceLimitsQosPolicy to make the pool bigger (for instance, having extra_samples = depth would make the pool twice the depth).
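
For reference, at the raw Fast DDS level those two knobs look roughly like the sketch below (a sketch only; in ROS 2 these values would normally be applied through an XML profile consumed by rmw_fastrtps, and the API shown assumes Fast DDS 2.x):

#include <fastdds/dds/publisher/qos/DataWriterQos.hpp>

eprosima::fastdds::dds::DataWriterQos make_writer_qos()
{
  using namespace eprosima::fastdds::dds;

  DataWriterQos qos = DATAWRITER_QOS_DEFAULT;

  // Option A: RELIABLE + KEEP_ALL makes the writer block until the oldest
  // sample is acknowledged (or max_blocking_time elapses) before reusing it.
  qos.history().kind = KEEP_ALL_HISTORY_QOS;
  qos.reliability().kind = RELIABLE_RELIABILITY_QOS;
  qos.reliability().max_blocking_time = eprosima::fastrtps::Duration_t(1, 0);

  // Option B: enlarge the pool beyond the history depth; extra_samples equal
  // to the depth doubles the number of samples available for loans.
  qos.resource_limits().extra_samples = 10;

  return qos;
}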

One thing that could be done is adding a warning when a sample is marked for reuse without having been acknowledged. There is a callback in the DataWriterListener (on_unacknowledged_sample_removed) that could be of use here.

My question is: should the subscription/reader log a warning in this case, like a QoS mismatch event?
[WARNING]: HistoryQos depth on the DataReader should be less than or equal to the DataWriter's.

The problem with this is that HistoryQos is not transmitted in discovery.

@mauropasse
Contributor Author

Great information @MiguelCompany. I'll explore the options!

@mauropasse
Contributor Author

@MiguelCompany I tested these scenarios based on your suggestions:

  • KEEP_ALL on publisher: Shared memory isn't reused, even if loans were returned by the sub/user (since the publisher still keeps the loans). No memory corruption or duplicates detected, so it serves the purpose (but it might not be the most performant option due to possible dynamic allocations?)
  • extra_samples: Works well; it extends the shared memory pool beyond the pub QoS depth. This option lets subs/users hold additional samples.

One thing that could be done is adding a warning when a sample is marked for reuse without it being acknowledged.

  • I tested RELIABLE with on_unacknowledged_sample_removed: it didn't trigger, presumably because the DDS sub sent the ACK back to the pub even when loans weren't returned? (subscription not spinning, storing messages). I overrode the callback in custom_publisher_info.hpp (CustomDataWriterListener), adding a warning, but it didn't show anything.

If the subscription is not spinning but receives the loaned messages, I guess the ACK is still sent in DDS? So on_unacknowledged_sample_removed wouldn't be useful in this case?
Thanks!

@MiguelCompany
Collaborator

  • KEEP_ALL on publisher: Shared memory isn't reused, even if loans were returned by the sub/user (since the publisher still keeps the loans). No memory corruption or duplicates detected, so it serves the purpose (but it might not be the most performant option due to possible dynamic allocations?)

No dynamic allocations here. The only possible issue is that, since the publisher blocks until the oldest sample is acknowledged (or reliability.max_blocking_time elapses), it could hinder the publication rate.

  • I tested RELIABLE with on_unacknowledged_sample_removed: it didn't trigger, presumably because the DDS sub sent the ACK back to the pub even when loans weren't returned? (subscription not spinning, storing messages). I overrode the callback in custom_publisher_info.hpp (CustomDataWriterListener), adding a warning, but it didn't show anything.

That is strange. Was the reader also RELIABLE? Samples are automatically acknowledged for BEST_EFFORT readers.
Just in case, could you check again with the changes in this commit? I just added an error log for when the writer detects a sample being removed without acknowledgement.

@mauropasse
Contributor Author

That is strange. Was the reader also RELIABLE?

Yes. To clarify, this is the scenario:

Single process: [Pub / Sub_A / Sub_B] = RELIABLE
1. Publisher (depth=1) publishes loaned messages
2. Subscription_A (depth=10) doesn't spin, stores messages
3. Subscription_B spins, the user stores 10 messages

After publishing the 2nd message, the publisher is handed the same memory
used for the 1st message, despite the loans not having been returned. This is where we'd like to see the warning.

When is the ACK usually sent by the subscription? When the subscription is executed, or when the message is received in the DataReader?
What is the relation between this callback and the fact that asking for a loan can return memory that is still in use?

The only case in which I manage to see this callback in action (together with the logs you added in the commit) is when setting:

Pub: best effort
Sub: reliable

[WARN] [1728658509.233333338] [pub_sub_node]: New publisher discovered on topic '/topic',
   offering incompatible QoS. No messages will be sent to it. Last incompatible policy: RELIABILITY_QOS_POLICY

[INFO] [1728658509.243104727] [pub_sub_node]: Publishing: '1'
2024-10-11 14:55:09.243 [RTPS_HISTORY Error] Unacknowledged change removed: test_msgs::msg::dds_::BasicTypes_ -> Function prepare_change
[WARN] [1728658509.243487964] [rmw_fastrtps_shared_cpp]: CustomPublisherInfo: on_unacknowledged_sample_removed!!
memory reused: 0x56141ada4bc0

[INFO] [1728658509.253650552] [pub_sub_node]: Publishing: '2'
[WARN] [1728658509.253678907] [rmw_fastrtps_shared_cpp]: CustomPublisherInfo: on_unacknowledged_sample_removed!!
2024-10-11 14:55:09.253 [RTPS_HISTORY Error] Unacknowledged change removed: test_msgs::msg::dds_::BasicTypes_ -> Function prepare_change
memory reused: 0x56141ada4b50

@mauropasse
Contributor Author

Since my test is single-process, send_datasharing_ack() is not called, but I'm unsure whether it is the same ACK path that causes the on_unacknowledged_sample_removed() callback not to be invoked.

@mauropasse
Contributor Author

I expanded the test to check multi-process behavior (pub/sub in different processes):

  • Subscription not spinning (thus not returning loans):
    • The on_unacknowledged_sample_removed() callback is invoked correctly when memory is reused (since no ACK is sent to the publisher).
    • The user (publisher process) could be warned about memory still in use.
  • Subscription spinning (but the user keeps messages, not returning loans):
    • The callback is not invoked, even though memory is reused, likely because the subscription sent the ACK to the publisher.
  • Single-process:
    • The callback is not invoked, regardless of whether the subscription is spinning.

So, in most cases, the on_unacknowledged_sample_removed() callback doesn't serve to notify the user about memory reuse.

A different approach to verify if memory is still in use could be:

  • If the publisher knows how many same-host subscriptions exist, it could infer if memory is still in use:
    • The loan must be returned X times (where X = N subscriptions + 1 for the transient_local publisher).

Could logic be implemented to track this? What do you think, @MiguelCompany?
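
To make the idea concrete, here is a purely hypothetical sketch of that bookkeeping (nothing like this exists in Fast DDS or rmw_fastrtps today; the class and its names are invented for illustration):

#include <mutex>
#include <unordered_map>

class LoanTracker
{
public:
  // Called when a payload is handed out. 'expected_returns' would be
  // N same-host subscriptions + 1 for the (transient_local) publisher.
  void on_loan(void * payload, int expected_returns)
  {
    std::lock_guard<std::mutex> guard(mtx_);
    outstanding_[payload] = expected_returns;
  }

  // Called on every returned loan; true once the payload is safe to reuse.
  bool on_return(void * payload)
  {
    std::lock_guard<std::mutex> guard(mtx_);
    auto it = outstanding_.find(payload);
    if (it == outstanding_.end()) {
      return true;  // unknown payload, treat as free
    }
    if (--it->second <= 0) {
      outstanding_.erase(it);
      return true;
    }
    return false;
  }

private:
  std::mutex mtx_;
  std::unordered_map<void *, int> outstanding_;
};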

@fujitatomoya
Collaborator

@mauropasse thank you for checking this; this is a really interesting topic for us!

Subscription spinning (but user keeps messages, not returning loans):
The callback is not invoked, even if memory is reused, likely because the subscription sent ACK to the publisher.

this is because the ACK is sent to the corresponding publisher when the message is taken on the subscription, right?

while (ReturnCode_t::RETCODE_OK == info->data_reader_->take(data_values, info_seq, 1)) {

To me, this looks okay by design of on_unacknowledged_sample_removed, because the subscription did send the ACK for that message.
I mean, on_unacknowledged_sample_removed does not tell whether the data is valid or not...

I think we need the is_valid API in the end, so that the application can check the data integrity right before accessing the data...
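
For context, here is a sketch of where such a check would have to sit in the take loop referenced above (Fast DDS 2.x API assumed; data_reader, MessageT and handle_sample are placeholders). Note that SampleInfo::valid_data only says whether the sample carries data at all; by itself it does not detect that a data-sharing payload has since been overwritten, which is exactly why an is_valid-style check is being discussed:

#include <fastdds/dds/core/LoanableSequence.hpp>
#include <fastdds/dds/subscriber/DataReader.hpp>
#include <fastdds/dds/subscriber/SampleInfo.hpp>

template<typename MessageT>
void take_loop(eprosima::fastdds::dds::DataReader * data_reader)
{
  using eprosima::fastrtps::types::ReturnCode_t;

  eprosima::fastdds::dds::LoanableSequence<MessageT> data_seq;
  eprosima::fastdds::dds::SampleInfoSeq info_seq;

  while (ReturnCode_t::RETCODE_OK == data_reader->take(data_seq, info_seq, 1)) {
    if (info_seq[0].valid_data) {
      // An is_valid(data_seq[0], info_seq[0])-style check would go here,
      // right before the application touches the loaned payload.
      handle_sample(data_seq[0]);  // placeholder for the application-side handling
    }
    data_reader->return_loan(data_seq, info_seq);  // hand the payload back to the pool
  }
}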

Just FYI, regarding the previous comment on #783 (comment):

Pub: best effort
Sub: reliable

I do not think these two are going to communicate at all...

offering incompatible QoS. No messages will be sent to it. Last incompatible policy: RELIABILITY_QOS_POLICY

DDS detected the incompatible QoS, and no messages are delivered to the concerned subscription, because DDS cannot guarantee RELIABLE delivery to this subscription.

@MiguelCompany
Collaborator

@mauropasse @fujitatomoya This is in fact getting interesting...

this is because the ACK is sent to the corresponding publisher when the message is taken on the subscription, right?

@fujitatomoya you nailed it! It seems Fast DDS acknowledges the sample when taking it.

So the following table is a summary of when samples are acknowledged:

                          Same process         Inter-process without data-sharing   Inter-process with data-sharing
Sample acknowledged...    ...when published    ...when added to history             ...on take

A different approach to verify if memory is still in use could be:

  • If the publisher knows how many same-host subscriptions exist, it could infer if memory is still in use:

    • The loan must be returned X times (where X = N subscriptions + 1 for the transient_local publisher).

Could logic be implemented to track this? What do you think @MiguelCompany

I think it would be better to extend the current mechanism and make it possible to acknowledge the sample when the loan is returned. This way we could use on_unacknowledged_sample_removed to warn the user.

Either that, or expose an is_valid API as @fujitatomoya suggested:

i think we need the is_valid API in the end, so that application can check the data integrity right before accessing the data...

@MiguelCompany
Collaborator

I thought of another possibility: the user could be warned when an item is added to the LoanManager and the address of the sample already exists there:

void add_item(
  std::unique_ptr<Item> item)
{
  std::lock_guard<std::mutex> guard(mtx);
  items.push_back(std::move(item));
}

@fujitatomoya
Collaborator

I think it would be better to extend the current mechanism and make it possible to acknowledge the sample when the loan is returned. This way we could use on_unacknowledged_sample_removed to warn the user.

From the Fast-DDS side, I think this sounds more robust and reliable? Is there any side effect to this?
Because doing this means the DataReader takes more time to send the ACK to the DataWriter, and I am not sure whether the DataWriter's behavior could change as a result...

Either that or expose is_valid API as @fujitatomoya suggested

IMO, whichever way (above or below) we take, an is_valid API is useful for the user application to check the data validity right before accessing it.
A warning and callback are also useful, but the user application needs to synchronize the state between the accessor and the warning event in the application.

I thought of another possibility. The user could be warned when an item is added to the LoanManager, and the address of the sample already exists there

This sounds reasonable for the ROS 2 RMW implementation.
If something is overwriting the same VA (unique_ptr) that already exists in the items list, we can print the warning.
And if we can have a user callback here, we can notify the user via this callback.

@mauropasse
Contributor Author

mauropasse commented Oct 21, 2024

I've been testing the approaches using add_item and on_unacknowledged_sample_removed to warn the user about their messages being overridden, and they're useful.
So we have two situations:

  • Spinning subscription, user keeps messages (outside the subscription callback) which are overridden:

    • Subscription process: warning via add_item when the subscription is executed
      • Note: the warning happens after the message is overridden and published, and the sub executed
    • Publisher process: on_unacknowledged_... doesn't warn, since the ACK was sent when the message was taken
      • If send_datasharing_ack were sent on rmw_return_loaned_... instead of rmw_take_loaned_..., the publisher process could warn.
      • This warning would happen on rmw_publish, after the message has already been overridden.
  • Not-spinning subscription, receiving and storing a duplicated message which also overrides the 1st one:

    • Subscription process: add_item is not called, since the subscription is not executed. No warning.
      • If add_item were somehow called on receiving the message instead of on executing the subscription, we could have a warning here.
    • Publisher process: on_unacknowledged_... triggers, generating the warning.
      • The warning doesn't trigger if pub/sub are in the same process!

So we have then:

  • User message overridden:
    • Subscription process: warning, when execute_subscription is called
    • Publisher process: could also be warned, but this requires changing when send_datasharing_ack is done.
  • Subscription message overridden:
    • Subscription process: no warning
    • Publisher process: warning (when publish is called), only applies to multi-process pub/sub.

In both cases, the warnings happen after the messages have been overridden.
Ideally the warning would happen in all processes, and when the loan is borrowed (before anything is overridden), but the warnings I tested above seem a step in the right direction!

--- a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_publisher_info.hpp
+++ b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_publisher_info.hpp
 
+  void on_unacknowledged_sample_removed(
+    eprosima::fastdds::dds::DataWriter* datawriter,
+    const eprosima::fastdds::dds::InstanceHandle_t& handle) override
+  {
+    RCUTILS_LOG_WARN_NAMED(
+      "rmw_fastrtps_shared_cpp",
+      "A shared message held by a subscription has been overridden.");
+  }
+
 private:
   RMWPublisherEvent * publisher_event_;
 };

--- a/rmw_fastrtps_shared_cpp/src/rmw_take.cpp
+++ b/rmw_fastrtps_shared_cpp/src/rmw_take.cpp
   void add_item(std::unique_ptr<Item> item)
   {
     std::lock_guard<std::mutex> guard(mtx);
+    // Check if the new item already exists in the list
+    for (const auto & existing_item : items) {
+      if (existing_item->data_seq.buffer()[0] == item->data_seq.buffer()[0]) {
+        RCUTILS_LOG_WARN_NAMED(
+          "rmw_fastrtps_shared_cpp",
+          "Subscription received a message still held by the user (which was overridden)");
+      }
+    }
     items.push_back(std::move(item));

An is_valid API would also be useful. Any suggestions for its implementation?

@fujitatomoya
Collaborator

@mauropasse no major objections; I think this is better for the user application. One concern is that when we use loaned messages, what we usually care about most is performance. Wouldn't the extra check that crawls through the existing-items list be less than ideal performance-wise, especially when we have a large depth in the HistoryCache?

@mauropasse
Contributor Author

mauropasse commented Oct 24, 2024

@fujitatomoya I don't think it would hurt performance much.

Only in an (uncommon?) situation could there be some extra CPU usage: when a non-spinning subscription (with many stored messages) starts spinning but the user doesn't discard the messages.

For each execute_subscription, an item is added to a list and compared against the previous items (which would be zero, since the subscription wasn't spinning before).
If the user discards the message in the callback, the item is removed from the list. However, if the user keeps a copy of the message and doesn't return the loan, the item remains stored.

The overhead then comes from iterating over the messages the user has kept outside of the subscription callback whenever a new message is processed. So in most situations it would be iterating over an empty list?

On another topic, what worries me is that we still don't have a warning for the single-process case, where a subscription keeps duplicated and overridden messages.
Suppose subscription_depth = 100 / publisher_depth = 1 -> the subscription holds 100 overridden messages, all the same!

@fujitatomoya
Collaborator

@mauropasse yeah, the overhead can probably be ignored in most cases, as long as the application returns the memory to the middleware.

On another topic, what worries me is that we still don't have a warning for the single-process case, where a subscription keeps duplicated and overridden messages.
Suppose subscription_depth = 100 / publisher_depth = 1 -> the subscription holds 100 overridden messages, all the same!

Is this really configurable with Fast-DDS? I mean, the History object is shared between the DataWriter and DataReader with data-sharing, which means I think the subscription depth is effectively also managed as 1 in this case?
