Skip to content

Commit

Permalink
Heap-of-heaps attempt #1 (segfaults)
Browse files Browse the repository at this point in the history
  • Loading branch information
nightduck committed Apr 8, 2022
1 parent 01f7946 commit 8d3fd55
Showing 1 changed file with 99 additions and 7 deletions.
106 changes: 99 additions & 7 deletions rclcpp/include/rclcpp/executors/fixed_prio_executor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,20 @@ namespace rclcpp

namespace experimental {

class ComparePrio {
public:
using eq = std::shared_ptr<std::deque<std::pair<int,std::shared_ptr<rclcpp::AnyExecutable>>>>;
bool operator()(const eq &lhs, const eq &rhs) const {
if (lhs->empty()) {
return false;
} else if (rhs->empty()) {
return true;
} else {
return lhs->begin()->first > rhs->begin()->first;
}
}
};

class CBG_Work {
public:
RCLCPP_SMART_PTR_DEFINITIONS(CBG_Work)
Expand All @@ -48,8 +62,41 @@ class CBG_Work {
bool add_work(rclcpp::AnyExecutable& exec, int prio) {
{
std::lock_guard<std::mutex> lk(mux);
heap.emplace(std::pair<int, std::shared_ptr<rclcpp::AnyExecutable>>
(prio, std::make_shared<rclcpp::AnyExecutable>(exec)));

std::shared_ptr<std::deque<std::pair<int,std::shared_ptr<rclcpp::AnyExecutable>>>> q = NULL;
if (exec.subscription != NULL) {
// Check if entry exist in dict
auto h = sub_dict.find(exec.subscription);
if (h == sub_dict.end()) {
// If not, create empty deque, put it in dict
auto ret = sub_dict.emplace(exec.subscription,
std::make_shared<std::deque<std::pair<int,std::shared_ptr<rclcpp::AnyExecutable>>>>());
q = ret.first->second;
}
} else if (exec.timer != NULL) {
// Check if entry exist in dict, if not, create an entry for it
auto h = tmr_dict.find(exec.timer);
if (h == tmr_dict.end()) {
// If not, create empty deque, put it in dict
auto ret = tmr_dict.emplace(exec.timer,
std::make_shared<std::deque<std::pair<int,std::shared_ptr<rclcpp::AnyExecutable>>>>());
q = ret.first->second;
}
} else {
assert(false);
}

// If an entry for this exists (or was just created above), add this message to it
q->push_back({prio, std::make_shared<rclcpp::AnyExecutable>(exec)});

// NOTE/TODO: Priority inheritance for subscriptions? If they have to be executed in
// order, then they should be guaranteed to have uniform priority, or older messages
// should inherit priority of previous messages, maybe iterate through deque with a max
// operation?

// If the queue was empty before now, put it in the heap
if (q->size() == 1)
heap.insert(q);

priority = std::max(prio, priority);
}
Expand All @@ -63,16 +110,49 @@ class CBG_Work {
// If no work is available, sleep on conditional lock until there is
std::shared_ptr<rclcpp::AnyExecutable> get_work() {
std::unique_lock<std::mutex> lk(mux);

cond.wait(
lk, [this]{return !heap.empty();}
);

auto ret = heap.top();
heap.pop();
priority = heap.begin()->get()->front().first;
running = heap.begin()->get()->front().second;

// Remove running from the heap
heap.begin()->get()->pop_front();
heap.erase(heap.begin());

// Update queues
if (running->subscription != NULL) {
auto h = sub_dict.find(running->subscription);
assert(h != sub_dict.end()); // If subscription has message in heap, then it should have at least an empty deque in the sub dictionary

if (h->second->empty()) {
// If deque is empty, remove it, to indicate there's no pending or running work on this subscription
sub_dict.erase(h->first);
} else {
// If deque isn't empty, put it back in heap. It won't necessarily go back to the top.
// This means all max prio executables get to run in round-robin
heap.insert(h->second);
}
} else if (running->timer != NULL) {
auto h = tmr_dict.find(running->timer);
assert(h != tmr_dict.end()); // If subscription has message in heap, then it should have at least an empty deque in the sub dictionary

if (h->second->empty()) {
// If deque is empty, remove it, to indicate there's no pending or running work on this subscription
tmr_dict.erase(h->first);
} else {
// If deque isn't empty, put it back in heap. It won't necessarily go back to the top.
// This means all max prio executables get to run in round-robin
heap.insert(h->second);
}
} else {
assert(false);
}

lk.unlock();

priority = ret.first;
running = ret.second;
return running;
}

Expand All @@ -85,7 +165,19 @@ class CBG_Work {
std::shared_ptr<rclcpp::AnyExecutable> running;

// TODO: In future, this will be rbtree of min-max heaps, to enforce queue sizes on subs
std::priority_queue<std::pair<int,std::shared_ptr<rclcpp::AnyExecutable>>> heap;
//std::priority_queue<std::pair<int,std::shared_ptr<rclcpp::AnyExecutable>>> heap;

// TODO: This is a binary search tree. It won't always be balanced, but is necessary because
// rbtrees don't return equivalent nodes in order, ruining round-robin between subscriptions of
// equivalent priority. Find a data structure that maintains the benefits of BSTs and RBTs
std::multiset<std::shared_ptr<std::deque<std::pair<int,std::shared_ptr<rclcpp::AnyExecutable>>>>,
ComparePrio> heap;

// Dictionaries of deques, representing backlogged work.
std::unordered_map<rclcpp::SubscriptionBase::SharedPtr,
std::shared_ptr<std::deque<std::pair<int,std::shared_ptr<rclcpp::AnyExecutable>>>>> sub_dict;
std::unordered_map<rclcpp::TimerBase::SharedPtr,
std::shared_ptr<std::deque<std::pair<int,std::shared_ptr<rclcpp::AnyExecutable>>>>> tmr_dict;
};
} // namespace experimental

Expand Down

0 comments on commit 8d3fd55

Please sign in to comment.