PR #9258 from Nir: Dispatcher exception + high CPU usage fix
(cherry picked from commit d95be8a)
maloel committed Jun 22, 2021
1 parent 3d849e6 commit 156c1bd
Showing 7 changed files with 325 additions and 43 deletions.
41 changes: 26 additions & 15 deletions common/utilities/time/waiting-on.h
@@ -31,6 +31,7 @@ class waiting_on
{
T _value;
std::condition_variable _cv;
std::atomic_bool _valid{ true };

friend class waiting_on;

@@ -58,9 +59,14 @@
{
_cv.notify_one();
}
void signal_all()
// Invalidate the wait_state_t so the user will not use destroyed objects
void invalidate()
{
_cv.notify_all();
if ( _valid )
{
_valid = false;
_cv.notify_all();
}
}
};
private:
@@ -76,23 +82,24 @@
public:
class in_thread_
{
std::weak_ptr< class wait_state_t > const _ptr;
std::weak_ptr< wait_state_t > const _ptr;
// We use an invalidator to invalidate the class when the reference count reaches zero.
std::shared_ptr< std::nullptr_t > const _invalidator;

public:
in_thread_( waiting_on const& local )
in_thread_( waiting_on const & local )
: _ptr( local._ptr )
, _invalidator(
nullptr,
[weak_ptr = std::weak_ptr< wait_state_t >( local._ptr )]( std::nullptr_t * ) {
// We get here when the lambda we're in is destroyed -- so either we've
// already run (and signalled once) or we've never run. We signal anyway
// if anything's waiting they'll get woken up; otherwise nothing'll happen...
if( auto wait_state = weak_ptr.lock() )
wait_state->invalidate();
} )
{
}
#if 0 // TODO this causes major slowdowns! left in here for Eran to break his head against...
~in_thread_()
{
// We get here when the lambda we're in is destroyed -- so either we've already run
// (and signalled once) or we've never run. We signal anyway -- if anything's waiting
// they'll get woken up; otherwise nothing'll happen...
if( auto wait_state = still_alive() )
wait_state->signal_all();
}
#endif

std::shared_ptr< wait_state_t > still_alive() const { return _ptr.lock(); }

@@ -143,7 +150,11 @@ class waiting_on
// The following will trigger this CppCheck warning:
// warning: The lock is ineffective because the mutex is locked at the same scope as the mutex itself. [localMutex]
std::unique_lock< std::mutex > locker( m );
_ptr->_cv.wait_for( locker, timeout, pred );
_ptr->_cv.wait_for( locker, timeout, [&]() -> bool {
if( ! _ptr->_valid )
return true;
return pred();
} );
}
};

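For context on the waiting-on.h change: signal_all() is replaced by invalidate(), the in_thread_ invalidator (the shared_ptr deleter above) wakes any waiter once the last reference goes away, and the wait predicate also returns as soon as the state is no longer valid. Below is a minimal standalone sketch of that pattern; the names (wait_state, done) are illustrative only and are not the actual waiting_on API.

// A minimal sketch of the invalidation pattern (hypothetical names, not the
// librealsense waiting_on API): the waiter's predicate also checks a 'valid'
// flag, so invalidate() can wake it even if the real wake condition never fires.
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <memory>
#include <mutex>
#include <thread>

struct wait_state
{
    std::mutex m;
    std::condition_variable cv;
    std::atomic_bool valid{ true };
    bool done = false;

    void invalidate()
    {
        if( valid.exchange( false ) )
        {
            std::lock_guard< std::mutex > lock( m );  // avoid a missed wakeup
            cv.notify_all();
        }
    }
};

int main()
{
    auto state = std::make_shared< wait_state >();
    std::weak_ptr< wait_state > weak = state;  // what the invalidator would hold

    std::thread waiter( [state]() {
        std::unique_lock< std::mutex > lock( state->m );
        // Wakes when 'done' becomes true, when invalidated, or on timeout
        state->cv.wait_for( lock, std::chrono::seconds( 5 ),
                            [&]() { return ! state->valid || state->done; } );
    } );

    // The producer goes away without ever setting 'done' -- invalidate instead,
    // much like the in_thread_ invalidator does when its last copy is destroyed
    if( auto s = weak.lock() )
        s->invalidate();

    waiter.join();
}

In the sketch, notify_all() is issued while holding the mutex to avoid a missed wakeup between the predicate check and the block; in the diff above any such window is bounded by the wait_for timeout in any case.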
7 changes: 7 additions & 0 deletions src/concurrency.h
@@ -354,8 +354,15 @@ class dispatcher
//
bool flush();


private:
// Return true if the dispatcher has started (within the given timeout),
// false on timeout or if the dispatcher is no longer alive
//
bool _wait_for_start( int timeout_ms );

friend cancellable_timer;

single_consumer_queue<std::function<void(cancellable_timer)>> _queue;
std::thread _thread;

62 changes: 41 additions & 21 deletions src/dispatcher.cpp
@@ -17,25 +17,27 @@ dispatcher::dispatcher( unsigned int cap, std::function< void( action ) > on_dro
int timeout_ms = 5000;
while( _is_alive )
{
std::function< void( cancellable_timer ) > item;

if( _queue.dequeue( &item, timeout_ms ) )
if( _wait_for_start( timeout_ms ) )
{
cancellable_timer time(this);

try
{
// While we're dispatching the item, we cannot stop!
std::lock_guard< std::mutex > lock( _dispatch_mutex );
item( time );
}
catch( const std::exception & e )
std::function< void(cancellable_timer) > item;
if (_queue.dequeue(&item, timeout_ms))
{
LOG_ERROR( "Dispatcher [" << this << "] exception caught: " << e.what() );
}
catch( ... )
{
LOG_ERROR( "Dispatcher [" << this << "] unknown exception caught!" );
cancellable_timer time(this);

try
{
// While we're dispatching the item, we cannot stop!
std::lock_guard< std::mutex > lock(_dispatch_mutex);
item(time);
}
catch (const std::exception& e)
{
LOG_ERROR("Dispatcher [" << this << "] exception caught: " << e.what());
}
catch (...)
{
LOG_ERROR("Dispatcher [" << this << "] unknown exception caught!");
}
}
}
}
@@ -59,10 +61,14 @@ dispatcher::~dispatcher()

void dispatcher::start()
{
std::lock_guard< std::mutex > lock( _was_stopped_mutex );
_was_stopped = false;

{
std::lock_guard< std::mutex > lock(_was_stopped_mutex);
_was_stopped = false;
}
_queue.start();
// Wake up all threads that wait for the dispatcher to start
_was_stopped_cv.notify_all();

}


@@ -84,8 +90,8 @@ void dispatcher::stop()
{
std::lock_guard< std::mutex > lock( _was_stopped_mutex );
_was_stopped = true;
_was_stopped_cv.notify_all();
}
_was_stopped_cv.notify_all();

// Wait until any dispatched is done...
{
@@ -113,3 +119,17 @@ bool dispatcher::flush()
} );
return invoked;
}

// Return true if the dispatcher has started (within the given timeout),
// false on timeout or if the dispatcher is no longer alive
//
bool dispatcher::_wait_for_start( int timeout_ms )
{
// If the dispatcher has not been started, wait for a start event; if no such event arrives within the given timeout, do nothing.
// If the dispatcher's destructor is called during the wait (_is_alive == false), do nothing as well.
std::unique_lock< std::mutex > lock(_was_stopped_mutex);
return _was_stopped_cv.wait_for(lock, std::chrono::milliseconds(timeout_ms), [this]() {
return !_was_stopped.load() || !_is_alive;
} ) && _is_alive;
}
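Taken together, the run loop now parks on a condition variable until start() notifies it (or until the timeout, or destruction), rather than cycling against a stopped queue. A standalone sketch of that gate follows; worker_gate and its members are names assumed for this illustration, not the dispatcher class itself.

// Illustrative start/stop gate (hypothetical names); it mirrors the contract of
// dispatcher::_wait_for_start in the diff above without being that class.
#include <chrono>
#include <condition_variable>
#include <iostream>
#include <mutex>
#include <thread>

class worker_gate
{
    std::mutex _m;
    std::condition_variable _cv;
    bool _stopped = true;
    bool _alive = true;

public:
    void start()
    {
        {
            std::lock_guard< std::mutex > lock( _m );
            _stopped = false;
        }
        _cv.notify_all();  // wake workers parked in wait_for_start()
    }

    void stop()
    {
        std::lock_guard< std::mutex > lock( _m );
        _stopped = true;
    }

    void shut_down()
    {
        {
            std::lock_guard< std::mutex > lock( _m );
            _alive = false;
        }
        _cv.notify_all();
    }

    // True only if a start arrived within the timeout and we are still alive
    bool wait_for_start( std::chrono::milliseconds timeout )
    {
        std::unique_lock< std::mutex > lock( _m );
        bool woke = _cv.wait_for( lock, timeout,
                                  [&]() { return ! _stopped || ! _alive; } );
        return woke && _alive;
    }
};

int main()
{
    worker_gate gate;
    std::thread worker( [&]() {
        // Run-loop sketch: park cheaply until started instead of spinning
        while( ! gate.wait_for_start( std::chrono::milliseconds( 100 ) ) )
            ;  // timed out with no start yet; wait again
        std::cout << "worker observed start\n";  // real code would dequeue items here
    } );

    std::this_thread::sleep_for( std::chrono::milliseconds( 50 ) );
    gate.start();
    worker.join();
}

The test added below suggests the spin happened after stop(); presumably a stopped queue makes dequeue() return immediately, so without the gate the while( _is_alive ) loop iterates as fast as it can. Gating on _wait_for_start() turns that into a bounded condition-variable wait.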

4 changes: 3 additions & 1 deletion src/media/playback/playback_sensor.cpp
@@ -71,7 +71,7 @@ void playback_sensor::open(const stream_profiles& requests)
//Playback can only play the streams that were recorded.
//Go over the requested profiles and see if they are available
LOG_DEBUG("Open Sensor " << m_sensor_id);

std::lock_guard<std::mutex> l(m_mutex);
for (auto&& r : requests)
{
if (std::find_if(std::begin(m_available_profiles),
@@ -94,6 +94,7 @@ void playback_sensor::open(const stream_profiles& requests)
std::make_shared< dispatcher >( _default_queue_size, on_drop_callback ) ) );

m_dispatchers[profile->get_unique_id()]->start();

device_serializer::stream_identifier f{ get_device_index(), m_sensor_id, profile->get_stream_type(), static_cast<uint32_t>(profile->get_stream_index()) };
opened_streams.push_back(f);
}
@@ -104,6 +105,7 @@ void playback_sensor::open(const stream_profiles& requests)
void playback_sensor::close()
{
LOG_DEBUG("Close sensor " << m_sensor_id);
std::lock_guard<std::mutex> l(m_mutex);
std::vector<device_serializer::stream_identifier> closed_streams;
for (auto&& dispatcher : m_dispatchers)
{
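The playback_sensor change is purely about locking: open() and close() now both take m_mutex, so a close from one thread can no longer race an open on another while the per-stream dispatchers are being set up or torn down. A tiny sketch of the idiom follows, using a hypothetical toy_sensor type rather than the playback_sensor class.

// Hypothetical illustration: open() and close() may be called from different
// threads, so both take the same lock before touching shared dispatcher state.
#include <map>
#include <memory>
#include <mutex>
#include <string>
#include <thread>

class toy_sensor
{
    std::mutex _mutex;
    // Stand-in for the per-stream dispatcher bookkeeping
    std::map< int, std::shared_ptr< std::string > > _dispatchers;

public:
    void open( int stream_id )
    {
        std::lock_guard< std::mutex > lock( _mutex );
        _dispatchers[ stream_id ] = std::make_shared< std::string >( "running" );
    }

    void close()
    {
        std::lock_guard< std::mutex > lock( _mutex );
        _dispatchers.clear();  // safe: no concurrent open() can touch the map
    }
};

int main()
{
    toy_sensor sensor;
    std::thread opener( [&]() { sensor.open( 0 ); } );
    std::thread closer( [&]() { sensor.close(); } );
    opener.join();
    closer.join();
}

In the sketch both critical sections are short, so a plain std::lock_guard suffices.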
97 changes: 97 additions & 0 deletions unit-tests/utilities/concurrency/test-dispatcher.cpp
@@ -0,0 +1,97 @@
// License: Apache 2.0. See LICENSE file in root directory.
// Copyright(c) 2021 Intel Corporation. All Rights Reserved.

//#cmake:add-file ../../../src/dispatcher.cpp

#include <unit-tests/test.h>
#include <common/utilities/time/timer.h>
#include <src/concurrency.h>

#include <algorithm>
#include <vector>

using namespace utilities::time;

// We use this function as a CPU stress test function
int fibo( int num )
{
if( num < 2 )
return 1;
return fibo( num - 1 ) + fibo( num - 2 );
}

TEST_CASE( "dispatcher main flow" )
{
dispatcher d(3);
std::atomic_bool run = { false };
auto func = [&](dispatcher::cancellable_timer c)
{
c.try_sleep(std::chrono::seconds(1));
run = true;
};

d.start();
REQUIRE(d.empty());
// We want to make sure that after we invoke some functions, the dispatcher is not empty.
// We add 2 functions that take some time, so that even if the first one pops,
// the second will still be in the queue and it will not be empty
d.invoke(func);
d.invoke(func);
REQUIRE_FALSE(d.empty());
REQUIRE(d.flush());
REQUIRE(run);
d.stop();
}

TEST_CASE( "invoke and wait" )
{
dispatcher d(2);

std::atomic_bool run = { false };
auto func = [&](dispatcher::cancellable_timer c)
{
std::this_thread::sleep_for(std::chrono::seconds(3));
run = true;
};

d.start();
stopwatch sw;
d.invoke_and_wait(func, []() {return false; }, true);
REQUIRE(sw.get_elapsed_ms() > 3000); // verify we get here only after the function call ended
d.stop();
}

TEST_CASE("verify stop() not consuming high CPU usage")
{
// using shared_ptr because no copy constructor is allowed for a dispatcher.
std::vector<std::shared_ptr<dispatcher>> dispatchers;

for (int i = 0 ; i < 32; ++i)
{
dispatchers.push_back(std::make_shared<dispatcher>(10));
}

for (auto &&dispatcher : dispatchers)
{
dispatcher->start();
}

for (auto&& dispatcher : dispatchers)
{
dispatcher->stop();
}


// Allow some time for all threads to do some work
std::this_thread::sleep_for(std::chrono::seconds(5));

stopwatch sw;

// Do some stress work
REQUIRE(fibo(40) == 165580141);
// Verify the stress test did not take too long.
// We had an issue where the stop() call caused high CPU usage and therefore stalled other operations.
// This test took > 9 seconds on an 8-core PC; after the fix it took ~1.5 sec on a single-core run (release configuration).
// We allow 9 seconds to support the debug configuration as well
REQUIRE(sw.get_elapsed_ms() < 9000);
}