Skip to content

Commit

Permalink
Merge pull request ros2#5 from alsora/asoragna/add-predicate
Browse files Browse the repository at this point in the history
add missing predicate to event_queue_cv wait
  • Loading branch information
iRobot ROS authored Oct 12, 2020
2 parents e1263ee + c441ea5 commit eb0bb01
Showing 1 changed file with 16 additions and 7 deletions.
23 changes: 16 additions & 7 deletions rclcpp/src/rclcpp/executors/events_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ EventsExecutor::spin()
RCLCPP_SCOPE_EXIT(this->spinning.store(false););

// When condition variable is notified, check this predicate to proceed
auto predicate = [this]() { return !event_queue_.empty(); };
auto has_event_predicate = [this]() {return !event_queue_.empty();};

// Local event queue
std::queue<ExecutorEvent> local_event_queue;
Expand All @@ -93,7 +93,7 @@ EventsExecutor::spin()
{
std::unique_lock<std::mutex> lock(event_queue_mutex_);
// We wait here until something has been pushed to the event queue
event_queue_cv_.wait(lock, predicate);
event_queue_cv_.wait(lock, has_event_predicate);

// We got an event! Swap queues and execute events
std::swap(local_event_queue, event_queue_);
Expand All @@ -118,18 +118,21 @@ EventsExecutor::spin_some(std::chrono::nanoseconds max_duration)
// - A timer triggers
// - An executor event is received and processed

// When condition variable is notified, check this predicate to proceed
auto has_event_predicate = [this]() {return !event_queue_.empty();};

std::queue<ExecutorEvent> local_event_queue;

// Select the smallest between input max_duration and timer timeout
auto next_timer_timeout = timers_manager_->get_head_timeout();
if (next_timer_timeout < max_duration) {
max_duration = next_timer_timeout;
}

std::queue<ExecutorEvent> local_event_queue;

{
// Wait until timeout or event
std::unique_lock<std::mutex> lock(event_queue_mutex_);
event_queue_cv_.wait_for(lock, max_duration);
event_queue_cv_.wait_for(lock, max_duration, has_event_predicate);
std::swap(local_event_queue, event_queue_);
}

Expand All @@ -149,6 +152,9 @@ EventsExecutor::spin_all(std::chrono::nanoseconds max_duration)
}
RCLCPP_SCOPE_EXIT(this->spinning.store(false););

// When condition variable is notified, check this predicate to proceed
auto has_event_predicate = [this]() {return !event_queue_.empty();};

std::queue<ExecutorEvent> local_event_queue;

auto start = std::chrono::steady_clock::now();
Expand All @@ -167,7 +173,7 @@ EventsExecutor::spin_all(std::chrono::nanoseconds max_duration)
{
// Wait until timeout or event
std::unique_lock<std::mutex> lock(event_queue_mutex_);
event_queue_cv_.wait_for(lock, max_duration);
event_queue_cv_.wait_for(lock, max_duration, has_event_predicate);
}

// Keep executing until work available or timeout expired
Expand Down Expand Up @@ -204,13 +210,16 @@ EventsExecutor::spin_once_impl(std::chrono::nanoseconds timeout)
timeout = next_timer_timeout;
}

// When condition variable is notified, check this predicate to proceed
auto has_event_predicate = [this]() {return !event_queue_.empty();};

ExecutorEvent event;
bool has_event = false;

{
// Wait until timeout or event arrives
std::unique_lock<std::mutex> lock(event_queue_mutex_);
event_queue_cv_.wait_for(lock, timeout);
event_queue_cv_.wait_for(lock, timeout, has_event_predicate);

// Grab first event from queue if it exists
has_event = !event_queue_.empty();
Expand Down

0 comments on commit eb0bb01

Please sign in to comment.