From 156c1bd163fbc735d07a98970a504408d5a6a9e3 Mon Sep 17 00:00:00 2001 From: Eran Date: Tue, 22 Jun 2021 12:51:10 +0300 Subject: [PATCH] PR #9258 from Nir: Dispatcher exception + high CPU usage fix (cherry picked from commit d95be8ae5a06b1b4b8e15017ae2fefc52e1c692e) --- common/utilities/time/waiting-on.h | 41 +++-- src/concurrency.h | 7 + src/dispatcher.cpp | 62 +++++--- src/media/playback/playback_sensor.cpp | 4 +- .../utilities/concurrency/test-dispatcher.cpp | 97 ++++++++++++ unit-tests/utilities/concurrency/test-scq.cpp | 149 ++++++++++++++++++ unit-tests/utilities/time/test-waiting-on.cpp | 8 +- 7 files changed, 325 insertions(+), 43 deletions(-) create mode 100644 unit-tests/utilities/concurrency/test-dispatcher.cpp create mode 100644 unit-tests/utilities/concurrency/test-scq.cpp diff --git a/common/utilities/time/waiting-on.h b/common/utilities/time/waiting-on.h index 85b410d2db..7c155e64f9 100644 --- a/common/utilities/time/waiting-on.h +++ b/common/utilities/time/waiting-on.h @@ -31,6 +31,7 @@ class waiting_on { T _value; std::condition_variable _cv; + std::atomic_bool _valid{ true }; friend class waiting_on; @@ -58,9 +59,14 @@ class waiting_on { _cv.notify_one(); } - void signal_all() + // Invalidate the wait_state_t so the user will not use destroyed objects + void invalidate() { - _cv.notify_all(); + if ( _valid ) + { + _valid = false; + _cv.notify_all(); + } } }; private: @@ -76,23 +82,24 @@ class waiting_on public: class in_thread_ { - std::weak_ptr< class wait_state_t > const _ptr; + std::weak_ptr< wait_state_t > const _ptr; + // We use an invalidator for invalidating the class when reference count is equal to Zero. + std::shared_ptr< std::nullptr_t > const _invalidator; public: - in_thread_( waiting_on const& local ) + in_thread_( waiting_on const & local ) : _ptr( local._ptr ) + , _invalidator( + nullptr, + [weak_ptr = std::weak_ptr< wait_state_t >( local._ptr )]( std::nullptr_t * ) { + // We get here when the lambda we're in is destroyed -- so either we've + // already run (and signalled once) or we've never run. We signal anyway + // if anything's waiting they'll get woken up; otherwise nothing'll happen... + if( auto wait_state = weak_ptr.lock() ) + wait_state->invalidate(); + } ) { } -#if 0 // TODO this causes major slowdowns! left in here for Eran to break his head against... - ~in_thread_() - { - // We get here when the lambda we're in is destroyed -- so either we've already run - // (and signalled once) or we've never run. We signal anyway -- if anything's waiting - // they'll get woken up; otherwise nothing'll happen... - if( auto wait_state = still_alive() ) - wait_state->signal_all(); - } -#endif std::shared_ptr< wait_state_t > still_alive() const { return _ptr.lock(); } @@ -143,7 +150,11 @@ class waiting_on // Following will issue (from CppCheck): // warning: The lock is ineffective because the mutex is locked at the same scope as the mutex itself. [localMutex] std::unique_lock< std::mutex > locker( m ); - _ptr->_cv.wait_for( locker, timeout, pred ); + _ptr->_cv.wait_for( locker, timeout, [&]() -> bool { + if( ! _ptr->_valid ) + return true; + return pred(); + } ); } }; diff --git a/src/concurrency.h b/src/concurrency.h index bba872c262..5e2599fbe0 100644 --- a/src/concurrency.h +++ b/src/concurrency.h @@ -354,8 +354,15 @@ class dispatcher // bool flush(); + private: + // Return true if dispatcher is started (within a timeout). + // false if not or the dispatcher is no longer alive + // + bool _wait_for_start( int timeout_ms ); + friend cancellable_timer; + single_consumer_queue> _queue; std::thread _thread; diff --git a/src/dispatcher.cpp b/src/dispatcher.cpp index b466c5de9f..9da0dfef38 100644 --- a/src/dispatcher.cpp +++ b/src/dispatcher.cpp @@ -17,25 +17,27 @@ dispatcher::dispatcher( unsigned int cap, std::function< void( action ) > on_dro int timeout_ms = 5000; while( _is_alive ) { - std::function< void( cancellable_timer ) > item; - - if( _queue.dequeue( &item, timeout_ms ) ) + if( _wait_for_start( timeout_ms ) ) { - cancellable_timer time(this); - - try - { - // While we're dispatching the item, we cannot stop! - std::lock_guard< std::mutex > lock( _dispatch_mutex ); - item( time ); - } - catch( const std::exception & e ) + std::function< void(cancellable_timer) > item; + if (_queue.dequeue(&item, timeout_ms)) { - LOG_ERROR( "Dispatcher [" << this << "] exception caught: " << e.what() ); - } - catch( ... ) - { - LOG_ERROR( "Dispatcher [" << this << "] unknown exception caught!" ); + cancellable_timer time(this); + + try + { + // While we're dispatching the item, we cannot stop! + std::lock_guard< std::mutex > lock(_dispatch_mutex); + item(time); + } + catch (const std::exception& e) + { + LOG_ERROR("Dispatcher [" << this << "] exception caught: " << e.what()); + } + catch (...) + { + LOG_ERROR("Dispatcher [" << this << "] unknown exception caught!"); + } } } } @@ -59,10 +61,14 @@ dispatcher::~dispatcher() void dispatcher::start() { - std::lock_guard< std::mutex > lock( _was_stopped_mutex ); - _was_stopped = false; - + { + std::lock_guard< std::mutex > lock(_was_stopped_mutex); + _was_stopped = false; + } _queue.start(); + // Wake up all threads that wait for the dispatcher to start + _was_stopped_cv.notify_all(); + } @@ -84,8 +90,8 @@ void dispatcher::stop() { std::lock_guard< std::mutex > lock( _was_stopped_mutex ); _was_stopped = true; - _was_stopped_cv.notify_all(); } + _was_stopped_cv.notify_all(); // Wait until any dispatched is done... { @@ -113,3 +119,17 @@ bool dispatcher::flush() } ); return invoked; } + +// Return true if dispatcher is started (within a timeout). +// false if not or the dispatcher is no longer alive +// +bool dispatcher::_wait_for_start( int timeout_ms ) +{ + // If the dispatcher is not started wait for a start event, if not such event within given timeout do nothing. + // If during the wait the thread destructor is called (_is_aliva = false) do nothing as well. + std::unique_lock< std::mutex > lock(_was_stopped_mutex); + return _was_stopped_cv.wait_for(lock, std::chrono::milliseconds(timeout_ms), [this]() { + return !_was_stopped.load() || !_is_alive; + } ) && _is_alive; +} + diff --git a/src/media/playback/playback_sensor.cpp b/src/media/playback/playback_sensor.cpp index 55c9c15fd5..3346d1c6c4 100644 --- a/src/media/playback/playback_sensor.cpp +++ b/src/media/playback/playback_sensor.cpp @@ -71,7 +71,7 @@ void playback_sensor::open(const stream_profiles& requests) //Playback can only play the streams that were recorded. //Go over the requested profiles and see if they are available LOG_DEBUG("Open Sensor " << m_sensor_id); - + std::lock_guard l(m_mutex); for (auto&& r : requests) { if (std::find_if(std::begin(m_available_profiles), @@ -94,6 +94,7 @@ void playback_sensor::open(const stream_profiles& requests) std::make_shared< dispatcher >( _default_queue_size, on_drop_callback ) ) ); m_dispatchers[profile->get_unique_id()]->start(); + device_serializer::stream_identifier f{ get_device_index(), m_sensor_id, profile->get_stream_type(), static_cast(profile->get_stream_index()) }; opened_streams.push_back(f); } @@ -104,6 +105,7 @@ void playback_sensor::open(const stream_profiles& requests) void playback_sensor::close() { LOG_DEBUG("Close sensor " << m_sensor_id); + std::lock_guard l(m_mutex); std::vector closed_streams; for (auto&& dispatcher : m_dispatchers) { diff --git a/unit-tests/utilities/concurrency/test-dispatcher.cpp b/unit-tests/utilities/concurrency/test-dispatcher.cpp new file mode 100644 index 0000000000..a7f22e2e6e --- /dev/null +++ b/unit-tests/utilities/concurrency/test-dispatcher.cpp @@ -0,0 +1,97 @@ +// License: Apache 2.0. See LICENSE file in root directory. +// Copyright(c) 2021 Intel Corporation. All Rights Reserved. + +//#cmake:add-file ../../../src/dispatcher.cpp + +#include +#include +#include + +#include +#include + +using namespace utilities::time; + +// We use this function as a CPU stress test function +int fibo( int num ) +{ + if( num < 2 ) + return 1; + return fibo( num - 1 ) + fibo( num - 2 ); +} + +TEST_CASE( "dispatcher main flow" ) +{ + dispatcher d(3); + std::atomic_bool run = { false }; + auto func = [&](dispatcher::cancellable_timer c) + { + c.try_sleep(std::chrono::seconds(1)); + run = true; + }; + + d.start(); + REQUIRE(d.empty()); + // We want to make sure that if we invoke some functions, the dispatcher is not empty. + // We add 2 functions that take some time so that even if the first one pop, + // the second will still be in the queue and it will not be empty + d.invoke(func); + d.invoke(func); + REQUIRE_FALSE(d.empty()); + REQUIRE(d.flush()); + REQUIRE(run); + d.stop(); +} + +TEST_CASE( "invoke and wait" ) +{ + dispatcher d(2); + + std::atomic_bool run = { false }; + auto func = [&](dispatcher::cancellable_timer c) + { + std::this_thread::sleep_for(std::chrono::seconds(3)); + run = true; + }; + + d.start(); + stopwatch sw; + d.invoke_and_wait(func, []() {return false; }, true); + REQUIRE(sw.get_elapsed_ms() > 3000); // verify we get here only after the function call ended + d.stop(); +} + +TEST_CASE("verify stop() not consuming high CPU usage") +{ + // using shared_ptr because no copy constructor is allowed for a dispatcher. + std::vector> dispatchers; + + for (int i = 0 ; i < 32; ++i) + { + dispatchers.push_back(std::make_shared(10)); + } + + for (auto &&dispatcher : dispatchers) + { + dispatcher->start(); + } + + for (auto&& dispatcher : dispatchers) + { + dispatcher->stop(); + } + + + // Allow some time for all threads to do some work + std::this_thread::sleep_for(std::chrono::seconds(5)); + + stopwatch sw; + + // Do some stress work + REQUIRE(fibo(40) == 165580141); + // Verify the stress test did not take too long. + // We had an issue that stop() call cause a high CPU usage and therefore other operations stall, + // This test took > 9 seconds on an 8 cores PC, after the fix it took ~1.5 sec on 1 core run (on release configuration). + // We allow 9 seconds to support debug configuration as well + REQUIRE(sw.get_elapsed_ms() < 9000); +} diff --git a/unit-tests/utilities/concurrency/test-scq.cpp b/unit-tests/utilities/concurrency/test-scq.cpp new file mode 100644 index 0000000000..72bb16fc28 --- /dev/null +++ b/unit-tests/utilities/concurrency/test-scq.cpp @@ -0,0 +1,149 @@ +// License: Apache 2.0. See LICENSE file in root directory. +// Copyright(c) 2021 Intel Corporation. All Rights Reserved. + +//#cmake:add-file ../../../src/dispatcher.cpp + +#include +#include +#include + +#include +#include + +using namespace utilities::time; + +TEST_CASE( "dequeue doesn't wait after stop" ) +{ + single_consumer_queue< std::function< void( void ) > > scq; + std::function< void( void ) > f; + std::function< void( void ) > * f_ptr = &f; + + scq.enqueue( []() {} ); + REQUIRE( scq.size() == 1 ); + REQUIRE( scq.peek( &f_ptr ) ); + + REQUIRE( scq.started() ); + REQUIRE_FALSE( scq.stopped() ); + + scq.stop(); + + REQUIRE_FALSE( scq.peek( &f_ptr ) ); + REQUIRE( scq.stopped() ); + REQUIRE_FALSE( scq.started() ); + + REQUIRE( scq.empty() ); + + timer t( std::chrono::seconds( 1 ) ); + t.start(); + scq.dequeue( &f, 2000 ); + REQUIRE_FALSE( t.has_expired() ); // Verify no timeout, dequeue return in less than 10 seconds +} + + +TEST_CASE( "dequeue doesn't wait when queue is not empty" ) +{ + single_consumer_queue< std::function< void( void ) > > scq; + timer t( std::chrono::seconds( 1 ) ); + std::function< void( void ) > f; + + scq.enqueue( []() {} ); + t.start(); + scq.dequeue( &f, 3000 ); + REQUIRE_FALSE( t.has_expired() ); // Verify no timeout, dequeue return in less than 1 seconds + REQUIRE( scq.empty() ); +} + +TEST_CASE( "dequeue wait when queue is empty" ) +{ + single_consumer_queue< std::function< void( void ) > > scq; + timer t( std::chrono::milliseconds( 2900 ) ); + + std::function< void( void ) > f; + + t.start(); + REQUIRE_FALSE( scq.dequeue( &f, 3000 ) ); + REQUIRE( t.has_expired() ); // Verify timeout, dequeue return after >= 3 seconds +} + +TEST_CASE( "try dequeue" ) +{ + single_consumer_queue< std::function< void( void ) > > scq; + std::function< void( void ) > f; + + REQUIRE_FALSE( scq.try_dequeue( &f ) ); // nothing on queue + scq.enqueue( []() {} ); + REQUIRE( scq.try_dequeue( &f ) ); // 1 item on queue + REQUIRE_FALSE( scq.try_dequeue( &f ) ); // 0 items on queue +} + +TEST_CASE( "blocking enqueue" ) +{ + single_consumer_queue< std::function< void( void ) > > scq; + std::function< void( void ) > f; + stopwatch sw; + + sw.reset(); + + // dequeue an item after 5 seconds, we wait for the blocking call and verify we return after this dequeue call ~ 5 seconds + std::thread dequeue_thread( [&]() { + std::this_thread::sleep_for( std::chrono::seconds( 5 ) ); + scq.dequeue( &f, 1000 ); + } ); + + REQUIRE( sw.get_elapsed_ms() < 1000 ); + + for( int i = 0; i < 10; ++i ) + { + scq.blocking_enqueue( []() {} ); + } + REQUIRE( sw.get_elapsed_ms() < 1000 ); + REQUIRE( scq.size() == 10 ); // verify queue is full (default capacity is 10) + scq.blocking_enqueue( []() {} ); // add a blocking call (item 11) + REQUIRE( sw.get_elapsed_ms() > 5000 ); // verify the blocking call return on dequeue time + REQUIRE( sw.get_elapsed_ms() < 6000 ); // verify the blocking call return on dequeue time + + dequeue_thread.join(); // verify clean exit with no threads alive. +} + +TEST_CASE("verify mutex protection") +{ + single_consumer_queue< int > scq; + stopwatch sw; + + const int MAX_SIZE_FOR_THREAD = 20; + std::thread enqueue_thread1( [&]() { + for( int i = 0; i < MAX_SIZE_FOR_THREAD; ++i ) + { + std::this_thread::sleep_for( std::chrono::milliseconds( 100 ) ); + scq.blocking_enqueue(std::move(i)); + } + } ); + + std::thread enqueue_thread2( [&]() { + for( int i = 0; i < MAX_SIZE_FOR_THREAD; ++i ) + { + std::this_thread::sleep_for( std::chrono::milliseconds( 100 ) ); + scq.blocking_enqueue(std::move(20 + i)); + } + } ); + + std::vector all_values; + for( int i = 0; i < MAX_SIZE_FOR_THREAD * 2; ++i ) + { + int val; + scq.dequeue( &val, 1000 ); + all_values.push_back(val); + } + + REQUIRE(all_values.size() == MAX_SIZE_FOR_THREAD * 2); + + std::sort(all_values.begin(), all_values.end()); + + for (int i = 0; i < all_values.size(); ++i) + { + REQUIRE(all_values[i] == i); + } + + enqueue_thread1.join(); + enqueue_thread2.join(); +} diff --git a/unit-tests/utilities/time/test-waiting-on.cpp b/unit-tests/utilities/time/test-waiting-on.cpp index 34d91fce21..f901c5e77f 100644 --- a/unit-tests/utilities/time/test-waiting-on.cpp +++ b/unit-tests/utilities/time/test-waiting-on.cpp @@ -18,6 +18,7 @@ using utilities::time::waiting_on; bool invoke( size_t delay_in_thread, size_t timeout ) { waiting_on< bool > invoked( false ); + auto lambda = [delay_in_thread, invoked = invoked.in_thread()]() { //std::cout << "In thread" << std::endl; std::this_thread::sleep_for( std::chrono::seconds( delay_in_thread )); @@ -120,12 +121,7 @@ TEST_CASE( "Not invoked but still notified" ) } ); auto wait_end = std::chrono::high_resolution_clock::now(); auto waited_ms = std::chrono::duration_cast( wait_end - wait_start ).count(); -#if 1 - // TODO: the requires below depend on the commented-out ~waiting_on::in_frame_(), but it also - // causes unintended slowdowns when the playback is done - REQUIRE( waited_ms > 4990 ); -#else + REQUIRE( waited_ms > 1990 ); REQUIRE( waited_ms < 3000 ); // Up to a second buffer -#endif }