diff --git a/src/tbb/arena.cpp b/src/tbb/arena.cpp
index 0e7cf43c3b..013c91ddfd 100644
--- a/src/tbb/arena.cpp
+++ b/src/tbb/arena.cpp
@@ -149,34 +149,31 @@ void arena::on_thread_leaving(unsigned ref_param) {
     }
 }
 
-std::size_t arena::occupy_free_slot_in_range( thread_data& tls, std::size_t lower, std::size_t upper ) {
-    if ( lower >= upper ) return out_of_arena;
+std::size_t arena::occupy_free_slot(thread_data& tls) {
     // Start search for an empty slot from the one we occupied the last time
     std::size_t index = tls.my_arena_index;
-    if ( index < lower || index >= upper ) index = tls.my_random.get() % (upper - lower) + lower;
-    __TBB_ASSERT( index >= lower && index < upper, nullptr);
+    std::size_t locked_slot = out_of_arena;
+    if ( index >= my_num_slots ) index = tls.my_random.get() % my_num_slots;
     // Find a free slot
-    for ( std::size_t i = index; i < upper; ++i )
-        if (my_slots[i].try_occupy()) return i;
-    for ( std::size_t i = lower; i < index; ++i )
-        if (my_slots[i].try_occupy()) return i;
-    return out_of_arena;
-}
-
-template <bool as_worker>
-std::size_t arena::occupy_free_slot(thread_data& tls) {
-    // Firstly, external threads try to occupy reserved slots
-    std::size_t index = as_worker ? out_of_arena : occupy_free_slot_in_range( tls, 0, my_num_reserved_slots );
-    if ( index == out_of_arena ) {
-        // Secondly, all threads try to occupy all non-reserved slots
-        index = occupy_free_slot_in_range(tls, my_num_reserved_slots, my_num_slots );
-        // Likely this arena is already saturated
-        if ( index == out_of_arena )
-            return out_of_arena;
+    for (std::size_t i = index; i < my_num_slots; ++i) {
+        if (my_slots[i].try_occupy()) {
+            locked_slot = i;
+            break;
+        }
+    }
+    if (locked_slot == out_of_arena) {
+        for (std::size_t i = 0; i < index; ++i) {
+            if (my_slots[i].try_occupy()) {
+                locked_slot = i;
+                break;
+            }
+        }
     }
 
-    atomic_update( my_limit, (unsigned)(index + 1), std::less<unsigned>() );
-    return index;
+    if (locked_slot != out_of_arena) {
+        atomic_update( my_limit, (unsigned)(locked_slot + 1), std::less<unsigned>() );
+    }
+    return locked_slot;
 }
 
 std::uintptr_t arena::calculate_stealing_threshold() {
@@ -189,7 +186,7 @@ void arena::process(thread_data& tls) {
     __TBB_ASSERT( is_alive(my_guard), nullptr);
     __TBB_ASSERT( my_num_slots >= 1, nullptr);
 
-    std::size_t index = occupy_free_slot</*as_worker*/true>(tls);
+    std::size_t index = occupy_free_slot(tls);
     if (index == out_of_arena) {
         on_thread_leaving(ref_worker);
         return;
@@ -197,8 +194,8 @@ void arena::process(thread_data& tls) {
 
     my_tc_client.get_pm_client()->register_thread();
 
-    __TBB_ASSERT( index >= my_num_reserved_slots, "Workers cannot occupy reserved slots" );
-    tls.attach_arena(*this, index);
+    // remember that we occupied a slot from workers' quota
+    tls.attach_arena(*this, index, /*is_worker_slot*/ true);
     // worker thread enters the dispatch loop to look for a work
     tls.my_inbox.set_is_idle(true);
     if (tls.my_arena_slot->is_task_pool_published()) {
@@ -396,7 +393,13 @@ bool arena::is_top_priority() const {
 
 bool arena::try_join() {
     if (is_joinable()) {
-        my_references += arena::ref_worker;
+        // check quota for number of workers in arena
+        unsigned curr = my_references.fetch_add(arena::ref_worker) + arena::ref_worker;
+        unsigned workers = curr >> arena::ref_external_bits;
+        if (workers > my_num_slots - my_num_reserved_slots) {
+            my_references -= arena::ref_worker;
+            return false;
+        }
         return true;
     }
     return false;
@@ -626,16 +629,17 @@ void task_arena_impl::enqueue(d1::task& t, d1::task_group_context* c, d1::task_a
 
 class nested_arena_context : no_copy {
 public:
-    nested_arena_context(thread_data& td, arena& nested_arena, std::size_t slot_index)
+    nested_arena_context(thread_data& td, arena& nested_arena, std::size_t slot_index, bool is_worker_slot)
         : m_orig_execute_data_ext(td.my_task_dispatcher->m_execute_data_ext)
     {
         if (td.my_arena != &nested_arena) {
             m_orig_arena = td.my_arena;
             m_orig_slot_index = td.my_arena_index;
+            m_orig_is_worker_slot = td.my_is_workers_slot_occupied;
             m_orig_last_observer = td.my_last_observer;
             td.detach_task_dispatcher();
 
-            td.attach_arena(nested_arena, slot_index);
+            td.attach_arena(nested_arena, slot_index, is_worker_slot);
             if (td.my_inbox.is_idle_state(true))
                 td.my_inbox.set_is_idle(false);
             task_dispatcher& task_disp = td.my_arena_slot->default_task_dispatcher();
@@ -643,7 +647,7 @@ class nested_arena_context : no_copy {
 
             // If the calling thread occupies the slots out of external thread reserve we need to notify the
             // market that this arena requires one worker less.
-            if (td.my_arena_index >= td.my_arena->my_num_reserved_slots) {
+            if (td.my_is_workers_slot_occupied) {
                 td.my_arena->request_workers(/* mandatory_delta = */ 0, /* workers_delta = */ -1);
             }
 
@@ -679,7 +683,7 @@ class nested_arena_context : no_copy {
 
             // Notify the market that this thread releasing a one slot
             // that can be used by a worker thread.
-            if (td.my_arena_index >= td.my_arena->my_num_reserved_slots) {
+            if (td.my_is_workers_slot_occupied) {
                 td.my_arena->request_workers(/* mandatory_delta = */ 0, /* workers_delta = */ 1);
             }
 
@@ -687,7 +691,7 @@ class nested_arena_context : no_copy {
             td.my_arena_slot->release();
             td.my_arena->my_exit_monitors.notify_one(); // do not relax!
 
-            td.attach_arena(*m_orig_arena, m_orig_slot_index);
+            td.attach_arena(*m_orig_arena, m_orig_slot_index, m_orig_is_worker_slot);
             td.attach_task_dispatcher(*m_orig_execute_data_ext.task_disp);
             __TBB_ASSERT(td.my_inbox.is_idle_state(false), nullptr);
         }
@@ -700,6 +704,7 @@ class nested_arena_context : no_copy {
     observer_proxy* m_orig_last_observer{ nullptr };
     task_dispatcher* m_task_dispatcher{ nullptr };
     unsigned m_orig_slot_index{};
+    bool m_orig_is_worker_slot{};
    bool m_orig_fifo_tasks_allowed{};
     bool m_orig_critical_task_allowed{};
 };
@@ -757,8 +762,10 @@ void task_arena_impl::execute(d1::task_arena_base& ta, d1::delegate_base& d) {
 
     bool same_arena = td->my_arena == a;
     std::size_t index1 = td->my_arena_index;
+    bool is_worker_slot = td->my_is_workers_slot_occupied;
     if (!same_arena) {
-        index1 = a->occupy_free_slot</*as_worker*/false>(*td);
+        index1 = a->occupy_free_slot(*td);
+        is_worker_slot = false;
         if (index1 == arena::out_of_arena) {
             concurrent_monitor::thread_context waiter((std::uintptr_t)&d);
             d1::wait_context wo(1);
@@ -774,10 +781,10 @@ void task_arena_impl::execute(d1::task_arena_base& ta, d1::delegate_base& d) {
                     a->my_exit_monitors.cancel_wait(waiter);
                     break;
                 }
-                index2 = a->occupy_free_slot</*as_worker*/false>(*td);
+                index2 = a->occupy_free_slot(*td);
                 if (index2 != arena::out_of_arena) {
                     a->my_exit_monitors.cancel_wait(waiter);
-                    nested_arena_context scope(*td, *a, index2 );
+                    nested_arena_context scope(*td, *a, index2, /*is_worker_slot=*/false);
                     r1::wait(wo, exec_context);
                     __TBB_ASSERT(!exec_context.my_exception.load(std::memory_order_relaxed), nullptr); // exception can be thrown above, not deferred
                     break;
@@ -802,7 +809,7 @@ void task_arena_impl::execute(d1::task_arena_base& ta, d1::delegate_base& d) {
 
     context_guard_helper</*report_tasks=*/false> context_guard;
     context_guard.set_ctx(a->my_default_ctx);
-    nested_arena_context scope(*td, *a, index1);
+    nested_arena_context scope(*td, *a, index1, is_worker_slot);
 #if _WIN64
     try {
 #endif
diff --git a/src/tbb/arena.h b/src/tbb/arena.h
index 1e95f117b2..a5bf22947a 100644
--- a/src/tbb/arena.h
+++ b/src/tbb/arena.h
@@ -366,10 +366,7 @@ class arena: public padded<arena_base>
 
     static const std::size_t out_of_arena = ~size_t(0);
     //! Tries to occupy a slot in the arena. On success, returns the slot index; if no slot is available, returns out_of_arena.
-    template <bool as_worker>
     std::size_t occupy_free_slot(thread_data&);
-    //! Tries to occupy a slot in the specified range.
-    std::size_t occupy_free_slot_in_range(thread_data& tls, std::size_t lower, std::size_t upper);
 
     std::uintptr_t calculate_stealing_threshold();
 
diff --git a/src/tbb/governor.cpp b/src/tbb/governor.cpp
index 218a2bc533..86c3954811 100644
--- a/src/tbb/governor.cpp
+++ b/src/tbb/governor.cpp
@@ -216,7 +216,7 @@ void governor::init_external_thread() {
     arena& a = arena::create(thr_control, num_slots, num_reserved_slots, arena_priority_level);
     // External thread always occupies the first slot
     thread_data& td = *new(cache_aligned_allocate(sizeof(thread_data))) thread_data(0, false);
-    td.attach_arena(a, /*slot index*/ 0);
+    td.attach_arena(a, /*slot index*/ 0, /*is_worker_slot*/ false);
     __TBB_ASSERT(td.my_inbox.is_idle_state(false), nullptr);
 
     stack_size = a.my_threading_control->worker_stack_size();
diff --git a/src/tbb/thread_data.h b/src/tbb/thread_data.h
index 9dfa492a72..c0e51a5c7b 100644
--- a/src/tbb/thread_data.h
+++ b/src/tbb/thread_data.h
@@ -1,5 +1,5 @@
 /*
-    Copyright (c) 2020-2023 Intel Corporation
+    Copyright (c) 2020-2024 Intel Corporation
 
     Licensed under the Apache License, Version 2.0 (the "License");
     you may not use this file except in compliance with the License.
@@ -101,6 +101,7 @@ class thread_data : public ::rml::job
     thread_data(unsigned short index, bool is_worker)
         : my_arena_index{ index }
         , my_is_worker{ is_worker }
+        , my_is_workers_slot_occupied{ false }
         , my_task_dispatcher{ nullptr }
         , my_arena{ nullptr }
         , my_last_client{ nullptr }
@@ -131,7 +132,7 @@ class thread_data : public ::rml::job
 #endif /* __TBB_RESUMABLE_TASKS */
     }
 
-    void attach_arena(arena& a, std::size_t index);
+    void attach_arena(arena& a, std::size_t index, bool is_worker_slot);
     bool is_attached_to(arena*);
     void attach_task_dispatcher(task_dispatcher&);
     void detach_task_dispatcher();
@@ -145,6 +146,9 @@ class thread_data : public ::rml::job
     //! Indicates if the thread is created by RML
     const bool my_is_worker;
 
+    //! Whether the occupied arena slot belongs to the workers' quota
+    bool my_is_workers_slot_occupied;
+
     //! The current task dipsatcher
     task_dispatcher* my_task_dispatcher;
 
@@ -202,9 +206,10 @@ class thread_data : public ::rml::job
     d1::task_group_context my_default_context;
 };
 
-inline void thread_data::attach_arena(arena& a, std::size_t index) {
+inline void thread_data::attach_arena(arena& a, std::size_t index, bool is_worker_slot) {
     my_arena = &a;
     my_arena_index = static_cast<unsigned short>(index);
+    my_is_workers_slot_occupied = is_worker_slot;
     my_arena_slot = a.my_slots + index;
     // Read the current slot mail_outbox and attach it to the mail_inbox (remove inbox later maybe)
     my_inbox.attach(my_arena->mailbox(index));
diff --git a/test/tbb/test_task_arena.cpp b/test/tbb/test_task_arena.cpp
index fd930f1995..9630f4764b 100644
--- a/test/tbb/test_task_arena.cpp
+++ b/test/tbb/test_task_arena.cpp
@@ -1,5 +1,5 @@
 /*
-    Copyright (c) 2005-2023 Intel Corporation
+    Copyright (c) 2005-2024 Intel Corporation
 
     Licensed under the Apache License, Version 2.0 (the "License");
     you may not use this file except in compliance with the License.
@@ -111,11 +111,13 @@ class ArenaObserver : public tbb::task_scheduler_observer {
     int myId; // unique observer/arena id within a test
     int myMaxConcurrency; // concurrency of the associated arena
     int myNumReservedSlots; // reserved slots in the associated arena
+    std::atomic<int> myNumActiveWorkers;
     void on_scheduler_entry( bool is_worker ) override {
         int current_index = tbb::this_task_arena::current_thread_index();
         CHECK(current_index < (myMaxConcurrency > 1 ? myMaxConcurrency : 2));
         if (is_worker) {
-            CHECK(current_index >= myNumReservedSlots);
+            int currNumActiveWorkers = ++myNumActiveWorkers;
+            CHECK(currNumActiveWorkers <= myMaxConcurrency - myNumReservedSlots);
         }
         CHECK_MESSAGE(!old_id.local(), "double call to on_scheduler_entry");
         old_id.local() = local_id.local();
@@ -123,19 +125,23 @@ class ArenaObserver : public tbb::task_scheduler_observer {
         local_id.local() = myId;
         slot_id.local() = current_index;
     }
-    void on_scheduler_exit( bool /*is_worker*/ ) override {
+    void on_scheduler_exit( bool is_worker ) override {
         CHECK_MESSAGE(local_id.local() == myId, "nesting of arenas is broken");
         CHECK(slot_id.local() == tbb::this_task_arena::current_thread_index());
         slot_id.local() = -2;
         local_id.local() = old_id.local();
         old_id.local() = 0;
+        if (is_worker) {
+            --myNumActiveWorkers;
+        }
     }
 public:
     ArenaObserver(tbb::task_arena &a, int maxConcurrency, int numReservedSlots, int id)
         : tbb::task_scheduler_observer(a)
         , myId(id)
         , myMaxConcurrency(maxConcurrency)
-        , myNumReservedSlots(numReservedSlots) {
+        , myNumReservedSlots(numReservedSlots)
+        , myNumActiveWorkers(0) {
         CHECK(myId);
         observe(true);
     }
@@ -433,12 +439,11 @@ void TestArenaEntryConsistency() {
 class TestArenaConcurrencyBody : utils::NoAssign {
     tbb::task_arena &my_a;
     int my_max_concurrency;
-    int my_reserved_slots;
     utils::SpinBarrier *my_barrier;
     utils::SpinBarrier *my_worker_barrier;
 public:
-    TestArenaConcurrencyBody( tbb::task_arena &a, int max_concurrency, int reserved_slots, utils::SpinBarrier *b = nullptr, utils::SpinBarrier *wb = nullptr )
-        : my_a(a), my_max_concurrency(max_concurrency), my_reserved_slots(reserved_slots), my_barrier(b), my_worker_barrier(wb) {}
+    TestArenaConcurrencyBody( tbb::task_arena &a, int max_concurrency, utils::SpinBarrier *b = nullptr, utils::SpinBarrier *wb = nullptr )
+        : my_a(a), my_max_concurrency(max_concurrency), my_barrier(b), my_worker_barrier(wb) {}
     // NativeParallelFor's functor
     void operator()( int ) const {
        CHECK_MESSAGE( local_id.local() == 0, "TLS was not cleaned?" );
@@ -453,12 +458,8 @@ class TestArenaConcurrencyBody : utils::NoAssign {
         int max_arena_concurrency = tbb::this_task_arena::max_concurrency();
         REQUIRE( max_arena_concurrency == my_max_concurrency );
         if ( my_worker_barrier ) {
-            if ( local_id.local() == 1 ) {
-                // External thread in a reserved slot
-                CHECK_MESSAGE( idx < my_reserved_slots, "External threads are supposed to use only reserved slots in this test" );
-            } else {
+            if ( local_id.local() != 1 ) {
                 // Worker thread
-                CHECK( idx >= my_reserved_slots );
                 my_worker_barrier->wait();
             }
         } else if ( my_barrier )
@@ -476,7 +477,7 @@ void TestArenaConcurrency( int p, int reserved = 0, int step = 1) {
         ResetTLS();
         utils::SpinBarrier b( p );
         utils::SpinBarrier wb( p-reserved );
-        TestArenaConcurrencyBody test( a, p, reserved, &b, &wb );
+        TestArenaConcurrencyBody test( a, p, &b, &wb );
         for ( int i = reserved; i < p; ++i ) // requests p-reserved worker threads
             a.enqueue( test );
         if ( reserved==1 )
@@ -487,7 +488,7 @@ void TestArenaConcurrency( int p, int reserved = 0, int step = 1) {
     } { // Check if multiple external threads alone can achieve maximum concurrency.
         ResetTLS();
         utils::SpinBarrier b( p );
-        utils::NativeParallelFor( p, TestArenaConcurrencyBody( a, p, reserved, &b ) );
+        utils::NativeParallelFor( p, TestArenaConcurrencyBody( a, p, &b ) );
         a.debug_wait_until_empty();
     } { // Check oversubscription by external threads.
 #if !_WIN32 || !_WIN64
@@ -500,7 +501,7 @@ void TestArenaConcurrency( int p, int reserved = 0, int step = 1) {
 #endif
     {
         ResetTLS();
-        utils::NativeParallelFor(2 * p, TestArenaConcurrencyBody(a, p, reserved));
+        utils::NativeParallelFor(2 * p, TestArenaConcurrencyBody(a, p));
         a.debug_wait_until_empty();
     }
 }
@@ -1545,20 +1546,26 @@ class simple_observer : public tbb::task_scheduler_observer {
     int my_idx;
     int myMaxConcurrency; // concurrency of the associated arena
     int myNumReservedSlots; // reserved slots in the associated arena
+    std::atomic<int> myNumActiveWorkers;
     void on_scheduler_entry( bool is_worker ) override {
         int current_index = tbb::this_task_arena::current_thread_index();
         CHECK(current_index < (myMaxConcurrency > 1 ? myMaxConcurrency : 2));
         if (is_worker) {
-            CHECK(current_index >= myNumReservedSlots);
+            int currNumActiveWorkers = ++myNumActiveWorkers;
+            CHECK(currNumActiveWorkers <= myMaxConcurrency - myNumReservedSlots);
+        }
+    }
+    void on_scheduler_exit( bool is_worker ) override {
+        if (is_worker) {
+            --myNumActiveWorkers;
         }
     }
-    void on_scheduler_exit( bool /*is_worker*/ ) override
-    {}
 public:
     simple_observer(tbb::task_arena &a, int maxConcurrency, int numReservedSlots)
         : tbb::task_scheduler_observer(a), my_idx(idx_counter++)
         , myMaxConcurrency(maxConcurrency)
-        , myNumReservedSlots(numReservedSlots) {
+        , myNumReservedSlots(numReservedSlots)
+        , myNumActiveWorkers(0) {
         observe(true);
     }
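
Reviewer note: the arena::try_join() change above is an optimistic "reserve, then validate" scheme on the packed reference counter: a worker reference is published first so concurrent joiners observe each other, and is rolled back if the workers' quota turns out to be exceeded. Below is a minimal standalone sketch of that pattern; toy_arena, the quota value, and the 16-bit field split are illustrative stand-ins inferred from the diff, not TBB's actual arena layout.

#include <atomic>
#include <cstdio>

class toy_arena {
    static constexpr unsigned ref_external_bits = 16;               // low bits: external refs (assumed width)
    static constexpr unsigned ref_worker = 1u << ref_external_bits; // one worker reference
    std::atomic<unsigned> my_references{0};
    unsigned my_workers_quota;  // plays the role of my_num_slots - my_num_reserved_slots
public:
    explicit toy_arena(unsigned quota) : my_workers_quota(quota) {}

    bool try_join() {
        // Optimistically publish the worker reference, then validate the quota.
        unsigned curr = my_references.fetch_add(ref_worker) + ref_worker;
        unsigned workers = curr >> ref_external_bits;
        if (workers > my_workers_quota) {
            my_references -= ref_worker;  // roll back: quota exhausted
            return false;
        }
        return true;
    }
    void leave() { my_references -= ref_worker; }
};

int main() {
    toy_arena a(/*quota=*/2);
    bool r1 = a.try_join();  // true: 1 worker within quota
    bool r2 = a.try_join();  // true: 2 workers within quota
    bool r3 = a.try_join();  // false: would exceed quota, reference rolled back
    std::printf("%d %d %d\n", r1, r2, r3);  // prints: 1 1 0
}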
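
The reworked arena::occupy_free_slot() drops the two-range search (reserved slots first for external threads) in favor of a single circular scan over all slots starting from a hint. A minimal sketch of that scan, assuming try_occupy() can be modeled as a compare-exchange on an atomic flag; TBB's real slot locking is richer, and TBB falls back to a random index rather than 0 when the hint is out of range:

#include <atomic>
#include <cstddef>
#include <vector>

struct toy_slot {
    std::atomic<bool> occupied{false};
    bool try_occupy() {
        bool expected = false;
        return occupied.compare_exchange_strong(expected, true);
    }
};

constexpr std::size_t out_of_arena = ~std::size_t(0);

std::size_t occupy_free_slot(std::vector<toy_slot>& slots, std::size_t hint) {
    std::size_t n = slots.size();
    if (hint >= n) hint = 0;                // TBB picks a random index here
    for (std::size_t i = hint; i < n; ++i)  // forward pass from the hint
        if (slots[i].try_occupy()) return i;
    for (std::size_t i = 0; i < hint; ++i)  // wrap-around pass
        if (slots[i].try_occupy()) return i;
    return out_of_arena;                    // arena is saturated
}

int main() {
    std::vector<toy_slot> slots(4);
    std::size_t s = occupy_free_slot(slots, /*hint=*/2);  // occupies slot 2
    return s == 2 ? 0 : 1;
}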
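
Similarly, nested_arena_context now saves and restores the per-thread my_is_workers_slot_occupied flag alongside the slot index, so the request_workers() compensation fires based on how the slot was obtained rather than on its position relative to the reserved range. A simplified RAII sketch of that save/restore discipline; toy_thread_data and nested_scope are hypothetical stand-ins, and the real class also swaps task dispatchers and observers:

#include <cassert>

struct toy_thread_data {
    unsigned slot_index = 0;
    bool is_workers_slot = false;  // mirrors my_is_workers_slot_occupied
};

class nested_scope {
    toy_thread_data& td_;
    unsigned orig_index_;
    bool orig_is_workers_slot_;
public:
    nested_scope(toy_thread_data& td, unsigned index, bool is_workers_slot)
        : td_(td), orig_index_(td.slot_index), orig_is_workers_slot_(td.is_workers_slot) {
        // Attach to the nested arena's slot; external threads entering through
        // task_arena::execute() take a slot outside the workers' quota (false).
        td_.slot_index = index;
        td_.is_workers_slot = is_workers_slot;
    }
    ~nested_scope() {
        // Restore the original attachment on scope exit, as the diff does.
        td_.slot_index = orig_index_;
        td_.is_workers_slot = orig_is_workers_slot_;
    }
};

int main() {
    toy_thread_data td{3, true};  // a worker attached to slot 3
    {
        nested_scope scope(td, 0, false);  // temporarily occupy slot 0
        assert(td.slot_index == 0 && !td.is_workers_slot);
    }
    assert(td.slot_index == 3 && td.is_workers_slot);  // restored
}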