Skip to content

Commit

Permalink
Update the LoanManager to do internal locking. (#552)
Browse files Browse the repository at this point in the history
The original motivation to do this is to please the clang
thread safety analysis, which can't quite figure out the
locking regime with an external mutex.  Instead, move major
functionality into the LoanManager class, and then mark
the item list as private.

One thing to note is that the logic of
__rmw_take_loaned_message_internal is slightly changed here.
Before it would emplace an Item onto the LoanManager's list,
then look through the data_reader to see if it could take an
item.  If it could, it would just fill that Item in and return.
If it couldn't, it would pop that item out of the list.  This
commit changes it so that we instantiate a temporary Item.
If we can take an item, we then push that Item into the LoanManager
list.  We never pop it.  This should be equivalent (and
slightly more performant), but is a change from what we had
before.

Signed-off-by: Chris Lalancette <[email protected]>
  • Loading branch information
clalancette authored Aug 16, 2021
1 parent 1fd0702 commit ba50353
Showing 1 changed file with 39 additions and 19 deletions.
58 changes: 39 additions & 19 deletions rmw_fastrtps_shared_cpp/src/rmw_take.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.

#include <memory>
#include <utility>

#include "rmw/allocators.h"
#include "rmw/error_handling.h"
Expand Down Expand Up @@ -401,8 +402,32 @@ struct LoanManager
{
}

void add_item(std::unique_ptr<Item> item)
{
std::lock_guard<std::mutex> guard(mtx);
items.push_back(std::move(item));
}

std::unique_ptr<Item> erase_item(void * loaned_message)
{
std::unique_ptr<Item> ret{nullptr};

std::lock_guard<std::mutex> guard(mtx);
for (auto it = items.begin(); it != items.end(); ++it) {
if (loaned_message == (*it)->data_seq.buffer()[0]) {
ret = std::move(*it);
items.erase(it);
break;
}
}

return ret;
}

private:
std::mutex mtx;
eprosima::fastrtps::ResourceLimitedVector<Item> items RCPPUTILS_TSA_GUARDED_BY(mtx);
using ItemVector = eprosima::fastrtps::ResourceLimitedVector<std::unique_ptr<Item>>;
ItemVector items RCPPUTILS_TSA_GUARDED_BY(mtx);
};

void
Expand Down Expand Up @@ -440,13 +465,8 @@ __rmw_take_loaned_message_internal(
RMW_CHECK_ARGUMENT_FOR_NULL(taken, RMW_RET_INVALID_ARGUMENT);

auto info = static_cast<CustomSubscriberInfo *>(subscription->data);
auto loan_mgr = info->loan_manager_;
std::unique_lock<std::mutex> guard(loan_mgr->mtx);
auto item = loan_mgr->items.emplace_back();
if (nullptr == item) {
RMW_SET_ERROR_MSG("Out of resources for loaned message info");
return RMW_RET_ERROR;
}

auto item = std::make_unique<rmw_fastrtps_shared_cpp::LoanManager::Item>();

while (ReturnCode_t::RETCODE_OK == info->data_reader_->take(item->data_seq, item->info_seq, 1)) {
if (item->info_seq[0].valid_data) {
Expand All @@ -456,6 +476,9 @@ __rmw_take_loaned_message_internal(
*loaned_message = item->data_seq.buffer()[0];
*taken = true;
info->listener_->update_has_data(info->data_reader_);

info->loan_manager_->add_item(std::move(item));

return RMW_RET_OK;
}

Expand All @@ -464,7 +487,6 @@ __rmw_take_loaned_message_internal(
}

// No data available, return loan information.
loan_mgr->items.pop_back();
*taken = false;
info->listener_->update_has_data(info->data_reader_);
return RMW_RET_OK;
Expand All @@ -487,17 +509,15 @@ __rmw_return_loaned_message_from_subscription(
RMW_CHECK_ARGUMENT_FOR_NULL(loaned_message, RMW_RET_INVALID_ARGUMENT);

auto info = static_cast<CustomSubscriberInfo *>(subscription->data);
auto loan_mgr = info->loan_manager_;
std::lock_guard<std::mutex> guard(loan_mgr->mtx);
for (auto it = loan_mgr->items.begin(); it != loan_mgr->items.end(); ++it) {
if (loaned_message == it->data_seq.buffer()[0]) {
if (!info->data_reader_->return_loan(it->data_seq, it->info_seq)) {
RMW_SET_ERROR_MSG("Error returning loan");
return RMW_RET_ERROR;
}
loan_mgr->items.erase(it);
return RMW_RET_OK;
std::unique_ptr<rmw_fastrtps_shared_cpp::LoanManager::Item> item;
item = info->loan_manager_->erase_item(loaned_message);
if (item != nullptr) {
if (!info->data_reader_->return_loan(item->data_seq, item->info_seq)) {
RMW_SET_ERROR_MSG("Error returning loan");
return RMW_RET_ERROR;
}

return RMW_RET_OK;
}

RMW_SET_ERROR_MSG("Trying to return message not loaned by this subscription");
Expand Down

0 comments on commit ba50353

Please sign in to comment.