From 67da7fd22e072b3ab0070eb884f9689a9297e877 Mon Sep 17 00:00:00 2001 From: pavelkumbrasev Date: Wed, 13 Nov 2024 10:56:04 +0000 Subject: [PATCH 01/11] Add initial impl of Parallel Block Signed-off-by: pavelkumbrasev --- include/oneapi/tbb/detail/_config.h | 4 ++ include/oneapi/tbb/task_arena.h | 101 +++++++++++++++++++++++----- src/tbb/arena.cpp | 45 +++++++++++-- src/tbb/arena.h | 86 ++++++++++++++++++++++- src/tbb/waiters.h | 14 +++- test/common/config.h | 3 + test/tbb/test_task_arena.cpp | 38 +++++++++++ 7 files changed, 264 insertions(+), 27 deletions(-) diff --git a/include/oneapi/tbb/detail/_config.h b/include/oneapi/tbb/detail/_config.h index e676b1558b..bf2c085609 100644 --- a/include/oneapi/tbb/detail/_config.h +++ b/include/oneapi/tbb/detail/_config.h @@ -534,4 +534,8 @@ #define __TBB_PREVIEW_TASK_GROUP_EXTENSIONS 1 #endif +#if TBB_PREVIEW_PARALLEL_BLOCK || __TBB_BUILD +#define __TBB_PREVIEW_PARALLEL_BLOCK 1 +#endif + #endif // __TBB_detail__config_H diff --git a/include/oneapi/tbb/task_arena.h b/include/oneapi/tbb/task_arena.h index 5ce41d99c9..17f7fdb4a0 100644 --- a/include/oneapi/tbb/task_arena.h +++ b/include/oneapi/tbb/task_arena.h @@ -122,6 +122,14 @@ class task_arena_base { normal = 2 * priority_stride, high = 3 * priority_stride }; + +#if __TBB_PREVIEW_PARALLEL_BLOCK + enum class workers_leave : int { + fast = 1, + delayed = 2 + }; +#endif + #if __TBB_ARENA_BINDING using constraints = tbb::detail::d1::constraints; #endif /*__TBB_ARENA_BINDING*/ @@ -154,6 +162,11 @@ class task_arena_base { //! Number of threads per core int my_max_threads_per_core; +#if __TBB_PREVIEW_PARALLEL_BLOCK + //! Defines the initial behavior of the workers_leave state machine + workers_leave my_workers_leave; +#endif + // Backward compatibility checks. core_type_id core_type() const { return (my_version_and_traits & core_type_support_flag) == core_type_support_flag ? 
my_core_type : automatic; @@ -167,7 +180,11 @@ class task_arena_base { , core_type_support_flag = 1 }; - task_arena_base(int max_concurrency, unsigned reserved_for_masters, priority a_priority) + task_arena_base(int max_concurrency, unsigned reserved_for_masters, priority a_priority +#if __TBB_PREVIEW_PARALLEL_BLOCK + , workers_leave wl +#endif + ) : my_version_and_traits(default_flags | core_type_support_flag) , my_initialization_state(do_once_state::uninitialized) , my_arena(nullptr) @@ -177,10 +194,17 @@ class task_arena_base { , my_numa_id(automatic) , my_core_type(automatic) , my_max_threads_per_core(automatic) +#if __TBB_PREVIEW_PARALLEL_BLOCK + , my_workers_leave(wl) +#endif {} #if __TBB_ARENA_BINDING - task_arena_base(const constraints& constraints_, unsigned reserved_for_masters, priority a_priority) + task_arena_base(const constraints& constraints_, unsigned reserved_for_masters, priority a_priority +#if __TBB_PREVIEW_PARALLEL_BLOCK + , workers_leave wl +#endif + ) : my_version_and_traits(default_flags | core_type_support_flag) , my_initialization_state(do_once_state::uninitialized) , my_arena(nullptr) @@ -190,6 +214,9 @@ class task_arena_base { , my_numa_id(constraints_.numa_id) , my_core_type(constraints_.core_type) , my_max_threads_per_core(constraints_.max_threads_per_core) +#if __TBB_PREVIEW_PARALLEL_BLOCK + , my_workers_leave(wl) +#endif {} #endif /*__TBB_ARENA_BINDING*/ public: @@ -259,31 +286,55 @@ class task_arena : public task_arena_base { * Value of 1 is default and reflects behavior of implicit arenas. 
**/ task_arena(int max_concurrency_ = automatic, unsigned reserved_for_masters = 1, - priority a_priority = priority::normal) - : task_arena_base(max_concurrency_, reserved_for_masters, a_priority) + priority a_priority = priority::normal +#if __TBB_PREVIEW_PARALLEL_BLOCK + , workers_leave wl = workers_leave::delayed +#endif + ) + : task_arena_base(max_concurrency_, reserved_for_masters, a_priority +#if __TBB_PREVIEW_PARALLEL_BLOCK + , wl +#endif + ) {} #if __TBB_ARENA_BINDING //! Creates task arena pinned to certain NUMA node task_arena(const constraints& constraints_, unsigned reserved_for_masters = 1, - priority a_priority = priority::normal) - : task_arena_base(constraints_, reserved_for_masters, a_priority) + priority a_priority = priority::normal +#if __TBB_PREVIEW_PARALLEL_BLOCK + , workers_leave wl = workers_leave::delayed +#endif + ) + : task_arena_base(constraints_, reserved_for_masters, a_priority +#if __TBB_PREVIEW_PARALLEL_BLOCK + , wl +#endif + ) {} //! Copies settings from another task_arena - task_arena(const task_arena &s) // copy settings but not the reference or instance + task_arena(const task_arena &a) // copy settings but not the reference or instance : task_arena_base( constraints{} - .set_numa_id(s.my_numa_id) - .set_max_concurrency(s.my_max_concurrency) - .set_core_type(s.my_core_type) - .set_max_threads_per_core(s.my_max_threads_per_core) - , s.my_num_reserved_slots, s.my_priority) + .set_numa_id(a.my_numa_id) + .set_max_concurrency(a.my_max_concurrency) + .set_core_type(a.my_core_type) + .set_max_threads_per_core(a.my_max_threads_per_core) + , a.my_num_reserved_slots, a.my_priority +#if __TBB_PREVIEW_PARALLEL_BLOCK + , a.my_workers_leave +#endif + ) {} #else //! 
Copies settings from another task_arena task_arena(const task_arena& a) // copy settings but not the reference or instance - : task_arena_base(a.my_max_concurrency, a.my_num_reserved_slots, a.my_priority) + : task_arena_base(a.my_max_concurrency, a.my_num_reserved_slots, a.my_priority +#if __TBB_PREVIEW_PARALLEL_BLOCK + , a.my_workers_leave +#endif + ) {} #endif /*__TBB_ARENA_BINDING*/ @@ -292,7 +343,11 @@ class task_arena : public task_arena_base { //! Creates an instance of task_arena attached to the current arena of the thread explicit task_arena( attach ) - : task_arena_base(automatic, 1, priority::normal) // use default settings if attach fails + : task_arena_base(automatic, 1, priority::normal +#if __TBB_PREVIEW_PARALLEL_BLOCK + , workers_leave::delayed +#endif + ) // use default settings if attach fails { if (r1::attach(*this)) { mark_initialized(); @@ -311,13 +366,20 @@ class task_arena : public task_arena_base { //! Overrides concurrency level and forces initialization of internal representation void initialize(int max_concurrency_, unsigned reserved_for_masters = 1, - priority a_priority = priority::normal) + priority a_priority = priority::normal +#if __TBB_PREVIEW_PARALLEL_BLOCK + , workers_leave wl = workers_leave::delayed +#endif + ) { __TBB_ASSERT(!my_arena.load(std::memory_order_relaxed), "Impossible to modify settings of an already initialized task_arena"); if( !is_active() ) { my_max_concurrency = max_concurrency_; my_num_reserved_slots = reserved_for_masters; my_priority = a_priority; +#if __TBB_PREVIEW_PARALLEL_BLOCK + my_workers_leave = wl; +#endif r1::initialize(*this); mark_initialized(); } @@ -325,7 +387,11 @@ class task_arena : public task_arena_base { #if __TBB_ARENA_BINDING void initialize(constraints constraints_, unsigned reserved_for_masters = 1, - priority a_priority = priority::normal) + priority a_priority = priority::normal +#if __TBB_PREVIEW_PARALLEL_BLOCK + , workers_leave wl = workers_leave::delayed +#endif + ) { 
__TBB_ASSERT(!my_arena.load(std::memory_order_relaxed), "Impossible to modify settings of an already initialized task_arena"); if( !is_active() ) { @@ -335,6 +401,9 @@ class task_arena : public task_arena_base { my_max_threads_per_core = constraints_.max_threads_per_core; my_num_reserved_slots = reserved_for_masters; my_priority = a_priority; +#if __TBB_PREVIEW_PARALLEL_BLOCK + my_workers_leave = wl; +#endif r1::initialize(*this); mark_initialized(); } diff --git a/src/tbb/arena.cpp b/src/tbb/arena.cpp index 6ca062d02f..ae8113a139 100644 --- a/src/tbb/arena.cpp +++ b/src/tbb/arena.cpp @@ -241,7 +241,12 @@ void arena::process(thread_data& tls) { __TBB_ASSERT(tls.my_arena == this, "my_arena is used as a hint when searching the arena to join"); } -arena::arena(threading_control* control, unsigned num_slots, unsigned num_reserved_slots, unsigned priority_level) { +arena::arena(threading_control* control, unsigned num_slots, unsigned num_reserved_slots, unsigned priority_level +#if __TBB_PREVIEW_PARALLEL_BLOCK + , tbb::task_arena::workers_leave wl +#endif +) +{ __TBB_ASSERT( !my_guard, "improperly allocated arena?" 
); __TBB_ASSERT( sizeof(my_slots[0]) % cache_line_size()==0, "arena::slot size not multiple of cache line size" ); __TBB_ASSERT( is_aligned(this, cache_line_size()), "arena misaligned" ); @@ -276,10 +281,18 @@ arena::arena(threading_control* control, unsigned num_slots, unsigned num_reserv my_critical_task_stream.initialize(my_num_slots); #endif my_mandatory_requests = 0; + +#if __TBB_PREVIEW_PARALLEL_BLOCK + my_thread_leave.set_initial_state(wl); +#endif } arena& arena::allocate_arena(threading_control* control, unsigned num_slots, unsigned num_reserved_slots, - unsigned priority_level) + unsigned priority_level +#if __TBB_PREVIEW_PARALLEL_BLOCK + , tbb::task_arena::workers_leave wl +#endif +) { __TBB_ASSERT( sizeof(base_type) + sizeof(arena_slot) == sizeof(arena), "All arena data fields must go to arena_base" ); __TBB_ASSERT( sizeof(base_type) % cache_line_size() == 0, "arena slots area misaligned: wrong padding" ); @@ -290,7 +303,11 @@ arena& arena::allocate_arena(threading_control* control, unsigned num_slots, uns std::memset( storage, 0, n ); return *new( storage + num_arena_slots(num_slots, num_reserved_slots) * sizeof(mail_outbox) ) - arena(control, num_slots, num_reserved_slots, priority_level); + arena(control, num_slots, num_reserved_slots, priority_level +#if __TBB_PREVIEW_PARALLEL_BLOCK + , wl +#endif + ); } void arena::free_arena () { @@ -340,6 +357,9 @@ bool arena::has_enqueued_tasks() { } void arena::request_workers(int mandatory_delta, int workers_delta, bool wakeup_threads) { +#if __TBB_PREVIEW_PARALLEL_BLOCK + my_thread_leave.restore_state_if_needed(); +#endif my_threading_control->adjust_demand(my_tc_client, mandatory_delta, workers_delta); if (wakeup_threads) { @@ -443,11 +463,20 @@ void arena::enqueue_task(d1::task& t, d1::task_group_context& ctx, thread_data& advertise_new_work(); } -arena& arena::create(threading_control* control, unsigned num_slots, unsigned num_reserved_slots, unsigned arena_priority_level, d1::constraints constraints) { 
+arena& arena::create(threading_control* control, unsigned num_slots, unsigned num_reserved_slots, unsigned arena_priority_level, d1::constraints constraints +#if __TBB_PREVIEW_PARALLEL_BLOCK + , tbb::task_arena::workers_leave wl +#endif +) +{ __TBB_ASSERT(num_slots > 0, NULL); __TBB_ASSERT(num_reserved_slots <= num_slots, NULL); // Add public market reference for an external thread/task_arena (that adds an internal reference in exchange). - arena& a = arena::allocate_arena(control, num_slots, num_reserved_slots, arena_priority_level); + arena& a = arena::allocate_arena(control, num_slots, num_reserved_slots, arena_priority_level +#if __TBB_PREVIEW_PARALLEL_BLOCK + , wl +#endif + ); a.my_tc_client = control->create_client(a); // We should not publish arena until all fields are initialized control->publish_client(a.my_tc_client, constraints); @@ -568,7 +597,11 @@ void task_arena_impl::initialize(d1::task_arena_base& ta) { __TBB_ASSERT(ta.my_arena.load(std::memory_order_relaxed) == nullptr, "Arena already initialized"); unsigned priority_level = arena_priority_level(ta.my_priority); threading_control* thr_control = threading_control::register_public_reference(); - arena& a = arena::create(thr_control, unsigned(ta.my_max_concurrency), ta.my_num_reserved_slots, priority_level, arena_constraints); + arena& a = arena::create(thr_control, unsigned(ta.my_max_concurrency), ta.my_num_reserved_slots, priority_level, arena_constraints +#if __TBB_PREVIEW_PARALLEL_BLOCK + , ta.my_workers_leave +#endif + ); ta.my_arena.store(&a, std::memory_order_release); #if __TBB_CPUBIND_PRESENT diff --git a/src/tbb/arena.h b/src/tbb/arena.h index 1e95f117b2..d28ce3f354 100644 --- a/src/tbb/arena.h +++ b/src/tbb/arena.h @@ -179,6 +179,69 @@ class atomic_flag { } }; +#if __TBB_PREVIEW_PARALLEL_BLOCK +class thread_leave_manager { + static const std::uintptr_t FAST_LEAVE = 1; + static const std::uintptr_t ONE_TIME_FAST_LEAVE = 1 << 1; + static const std::uintptr_t DELAYED_LEAVE = 1 << 2; + static 
const std::uintptr_t PARALLEL_BLOCK = 1 << 3; + + std::atomic my_state{0}; +public: + void set_initial_state(tbb::task_arena::workers_leave wl) { + if (wl == tbb::task_arena::workers_leave::delayed) { + my_state.store(DELAYED_LEAVE, std::memory_order_relaxed); + } else { + my_state.store(FAST_LEAVE, std::memory_order_relaxed); + } + } + + void restore_state_if_needed() { + std::uintptr_t curr = ONE_TIME_FAST_LEAVE; + if (my_state.load(std::memory_order_relaxed) == curr) { + my_state.compare_exchange_strong(curr, DELAYED_LEAVE); + } + } + + void register_parallel_block() { + __TBB_ASSERT(my_state.load(std::memory_order_relaxed) != 0, "The initial state was not set"); + + std::uintptr_t prev = my_state.load(std::memory_order_relaxed); + std::uintptr_t desired{}; + do { + if (prev & PARALLEL_BLOCK) { + desired = PARALLEL_BLOCK + prev; + } else if (prev == ONE_TIME_FAST_LEAVE) { + desired = PARALLEL_BLOCK | DELAYED_LEAVE; + } else { + desired = PARALLEL_BLOCK | prev; + } + } while (!my_state.compare_exchange_strong(prev, desired)); + } + + void register_parallel_block(bool enable_fast_leave) { + __TBB_ASSERT(my_state.load(std::memory_order_relaxed) != 0, "The initial state was not set"); + + std::uintptr_t prev = my_state.load(std::memory_order_relaxed); + std::uintptr_t desired{}; + do { + if (((prev - PARALLEL_BLOCK) & PARALLEL_BLOCK) != 0) { + desired = PARALLEL_BLOCK - prev; + } else { + desired = enable_fast_leave ? ONE_TIME_FAST_LEAVE : prev - PARALLEL_BLOCK; + } + } while (!my_state.compare_exchange_strong(prev, desired)); + } + + bool should_leave() { + __TBB_ASSERT(my_state.load(std::memory_order_relaxed) != 0, "The initial state was not set"); + + std::uintptr_t curr = my_state.load(std::memory_order_relaxed); + return curr == FAST_LEAVE || curr == ONE_TIME_FAST_LEAVE; + } +}; +#endif /* __TBB_PREVIEW_PARALLEL_BLOCK */ + //! The structure of an arena, except the array of slots. /** Separated in order to simplify padding. 
Intrusive list node base class is used by market to form a list of arenas. **/ @@ -245,6 +308,11 @@ struct arena_base : padded { //! Waiting object for external threads that cannot join the arena. concurrent_monitor my_exit_monitors; +#if __TBB_PREVIEW_PARALLEL_BLOCK + //! Manages state of thread_leave state machine + thread_leave_manager my_thread_leave; +#endif + //! Coroutines (task_dispathers) cache buffer arena_co_cache my_co_cache; @@ -281,13 +349,25 @@ class arena: public padded }; //! Constructor - arena(threading_control* control, unsigned max_num_workers, unsigned num_reserved_slots, unsigned priority_level); + arena(threading_control* control, unsigned max_num_workers, unsigned num_reserved_slots, unsigned priority_level +#if __TBB_PREVIEW_PARALLEL_BLOCK + , tbb::task_arena::workers_leave wl +#endif + ); //! Allocate an instance of arena. static arena& allocate_arena(threading_control* control, unsigned num_slots, unsigned num_reserved_slots, - unsigned priority_level); + unsigned priority_level +#if __TBB_PREVIEW_PARALLEL_BLOCK + , tbb::task_arena::workers_leave wl +#endif + ); - static arena& create(threading_control* control, unsigned num_slots, unsigned num_reserved_slots, unsigned arena_priority_level, d1::constraints constraints = d1::constraints{}); + static arena& create(threading_control* control, unsigned num_slots, unsigned num_reserved_slots, unsigned arena_priority_level, d1::constraints constraints = d1::constraints{} +#if __TBB_PREVIEW_PARALLEL_BLOCK + , tbb::task_arena::workers_leave wl = tbb::task_arena::workers_leave::delayed +#endif + ); static int unsigned num_arena_slots ( unsigned num_slots, unsigned num_reserved_slots ) { return num_reserved_slots == 0 ? 
num_slots : max(2u, num_slots); diff --git a/src/tbb/waiters.h b/src/tbb/waiters.h index 8ed431f857..c27a6a4749 100644 --- a/src/tbb/waiters.h +++ b/src/tbb/waiters.h @@ -58,7 +58,12 @@ class outermost_worker_waiter : public waiter_base { __TBB_ASSERT(t == nullptr, nullptr); if (is_worker_should_leave(slot)) { - if (!governor::hybrid_cpu()) { + if (!governor::hybrid_cpu() +#if __TBB_PREVIEW_PARALLEL_BLOCK + && !my_arena.my_thread_leave.should_leave() +#endif + ) + { static constexpr std::chrono::microseconds worker_wait_leave_duration(1000); static_assert(worker_wait_leave_duration > std::chrono::steady_clock::duration(1), "Clock resolution is not enough for measured interval."); @@ -70,7 +75,12 @@ class outermost_worker_waiter : public waiter_base { return true; } - if (my_arena.my_threading_control->is_any_other_client_active()) { + if (my_arena.my_threading_control->is_any_other_client_active() +#if __TBB_PREVIEW_PARALLEL_BLOCK + || my_arena.my_thread_leave.should_leave() +#endif + ) + { break; } d0::yield(); diff --git a/test/common/config.h b/test/common/config.h index c7ff8ba63a..c83fbf2f40 100644 --- a/test/common/config.h +++ b/test/common/config.h @@ -39,6 +39,9 @@ #ifndef TBB_PREVIEW_ISOLATED_TASK_GROUP #define TBB_PREVIEW_ISOLATED_TASK_GROUP 1 #endif +#ifndef TBB_PREVIEW_PARALLEL_BLOCK +#define TBB_PREVIEW_PARALLEL_BLOCK 1 +#endif #endif #include "oneapi/tbb/detail/_config.h" diff --git a/test/tbb/test_task_arena.cpp b/test/tbb/test_task_arena.cpp index 6bd93d4c0e..8f6d424089 100644 --- a/test/tbb/test_task_arena.cpp +++ b/test/tbb/test_task_arena.cpp @@ -2068,3 +2068,41 @@ TEST_CASE("worker threads occupy slots in correct range") { while (counter < 42) { utils::yield(); } } + +#if TBB_PREVIEW_PARALLEL_BLOCK + +//! 
\brief \ref interface \ref requirement +TEST_CASE("Check that leave faster with workers_leave::fast") { + std::vector start_times_for_delayed_leave; + std::vector start_times_for_fast_leave; + + std::vector start_times(utils::get_platform_max_threads()); + utils::SpinBarrier barrier(utils::get_platform_max_threads()); + auto measure_start_time = [&] { + start_times[tbb::this_task_arena::current_thread_index()] = std::chrono::steady_clock::now(); + barrier.wait(); + }; + + auto get_longest_start = [&] (std::chrono::steady_clock::time_point start_time) { + std::size_t longest_time = 0; + for (auto& time : start_times) { + longest_time = std::max(longest_time, std::chrono::duration_cast(time - start_time).count()); + } + return longest_time; + }; + + // Measure delayed leave + { + tbb::task_arena a(utils::get_platform_max_threads(), 1, tbb::task_arena::priority::normal, tbb::task_arena::workers_leave::delayed); + + for (int i = 0; i < 1000; ++i) { + a.execute([] { + auto start_time = std::chono::steady_clock::now(); + tbb::parallel_for(0, utils::get_platform_max_threads(), measure_start_time, tbb::static_partitioner{}); + }); + start_times_for_delayed_leave.push_back(get_longest_start(start_time)); + } + } +} + +#endif From 1557e7934df352e7178df64961e3834ff7718cfa Mon Sep 17 00:00:00 2001 From: pavelkumbrasev Date: Mon, 18 Nov 2024 15:56:52 +0000 Subject: [PATCH 02/11] Add test for workers_leave Signed-off-by: pavelkumbrasev --- src/tbb/arena.h | 2 +- test/tbb/test_task_arena.cpp | 45 +++++++++++++++++++++++++++--------- 2 files changed, 35 insertions(+), 12 deletions(-) diff --git a/src/tbb/arena.h b/src/tbb/arena.h index d28ce3f354..fcb0d089e6 100644 --- a/src/tbb/arena.h +++ b/src/tbb/arena.h @@ -219,7 +219,7 @@ class thread_leave_manager { } while (!my_state.compare_exchange_strong(prev, desired)); } - void register_parallel_block(bool enable_fast_leave) { + void unregister_parallel_block(bool enable_fast_leave) { 
__TBB_ASSERT(my_state.load(std::memory_order_relaxed) != 0, "The initial state was not set"); std::uintptr_t prev = my_state.load(std::memory_order_relaxed); diff --git a/test/tbb/test_task_arena.cpp b/test/tbb/test_task_arena.cpp index 8f6d424089..b9e0dbfef6 100644 --- a/test/tbb/test_task_arena.cpp +++ b/test/tbb/test_task_arena.cpp @@ -2073,12 +2073,16 @@ TEST_CASE("worker threads occupy slots in correct range") { //! \brief \ref interface \ref requirement TEST_CASE("Check that leave faster with workers_leave::fast") { + std::size_t num_threads = utils::get_platform_max_threads(); + std::size_t num_runs = 1000; std::vector start_times_for_delayed_leave; + start_times_for_delayed_leave.reserve(num_runs); std::vector start_times_for_fast_leave; + start_times_for_fast_leave.reserve(num_runs); - std::vector start_times(utils::get_platform_max_threads()); - utils::SpinBarrier barrier(utils::get_platform_max_threads()); - auto measure_start_time = [&] { + std::vector start_times(num_threads); + utils::SpinBarrier barrier(num_threads); + auto measure_start_time = [&] (std::size_t) { start_times[tbb::this_task_arena::current_thread_index()] = std::chrono::steady_clock::now(); barrier.wait(); }; @@ -2086,23 +2090,42 @@ TEST_CASE("Check that leave faster with workers_leave::fast") { auto get_longest_start = [&] (std::chrono::steady_clock::time_point start_time) { std::size_t longest_time = 0; for (auto& time : start_times) { - longest_time = std::max(longest_time, std::chrono::duration_cast(time - start_time).count()); + longest_time = std::max(longest_time, (std::size_t)std::chrono::duration_cast(time - start_time).count()); } return longest_time; }; - // Measure delayed leave + std::size_t avg_start_time_delayed = 0; + { + tbb::task_arena a(num_threads, 1, tbb::task_arena::priority::normal, tbb::task_arena::workers_leave::delayed); + + for (std::size_t i = 0; i < num_runs; ++i) { + a.execute([&] { + auto start_time = std::chrono::steady_clock::now(); + 
tbb::parallel_for(std::size_t(0), num_threads, measure_start_time, tbb::static_partitioner{}); + start_times_for_delayed_leave.push_back(get_longest_start(start_time)); + }); + std::this_thread::sleep_for(std::chrono::microseconds(i)); + } + avg_start_time_delayed = std::accumulate(start_times_for_delayed_leave.begin(), start_times_for_delayed_leave.end(), 0) / num_runs; + } + + std::size_t avg_start_time_fast = 0; { - tbb::task_arena a(utils::get_platform_max_threads(), 1, tbb::task_arena::priority::normal, tbb::task_arena::workers_leave::delayed); + tbb::task_arena a(num_threads, 1, tbb::task_arena::priority::normal, tbb::task_arena::workers_leave::fast); - for (int i = 0; i < 1000; ++i) { - a.execute([] { - auto start_time = std::chono::steady_clock::now(); - tbb::parallel_for(0, utils::get_platform_max_threads(), measure_start_time, tbb::static_partitioner{}); + for (std::size_t i = 0; i < num_runs; ++i) { + a.execute([&] { + auto start_time = std::chrono::steady_clock::now(); + tbb::parallel_for(std::size_t(0), num_threads, measure_start_time, tbb::static_partitioner{}); + start_times_for_fast_leave.push_back(get_longest_start(start_time)); }); - start_times_for_delayed_leave.push_back(get_longest_start(start_time)); + std::this_thread::sleep_for(std::chrono::microseconds(i)); } + avg_start_time_fast = std::accumulate(start_times_for_fast_leave.begin(), start_times_for_fast_leave.end(), 0) / num_runs; } + + REQUIRE(avg_start_time_delayed < avg_start_time_fast); } #endif From 4edb96af4fceac7f7fd94323472c50316c90bfa7 Mon Sep 17 00:00:00 2001 From: pavelkumbrasev Date: Wed, 20 Nov 2024 14:35:59 +0000 Subject: [PATCH 03/11] Fix thread_leave_manager Signed-off-by: pavelkumbrasev --- src/tbb/arena.h | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/src/tbb/arena.h b/src/tbb/arena.h index fcb0d089e6..cb921fab6a 100644 --- a/src/tbb/arena.h +++ b/src/tbb/arena.h @@ -185,6 +185,7 @@ class thread_leave_manager { static const std::uintptr_t 
ONE_TIME_FAST_LEAVE = 1 << 1; static const std::uintptr_t DELAYED_LEAVE = 1 << 2; static const std::uintptr_t PARALLEL_BLOCK = 1 << 3; + static const std::uintptr_t PARALLEL_BLOCK_MASK = ~((1LLU << 32) - 1) & ~(0x7); std::atomic my_state{0}; public: @@ -199,6 +200,8 @@ void restore_state_if_needed() { std::uintptr_t curr = ONE_TIME_FAST_LEAVE; if (my_state.load(std::memory_order_relaxed) == curr) { + // Potentially can override decision of the parallel block from future epoch + // but it is not a problem because it does not violate the correctness my_state.compare_exchange_strong(curr, DELAYED_LEAVE); } } @@ -209,7 +212,7 @@ void register_parallel_block() { std::uintptr_t prev = my_state.load(std::memory_order_relaxed); std::uintptr_t desired{}; do { - if (prev & PARALLEL_BLOCK) { + if (prev & PARALLEL_BLOCK_MASK) { desired = PARALLEL_BLOCK + prev; } else if (prev == ONE_TIME_FAST_LEAVE) { desired = PARALLEL_BLOCK | DELAYED_LEAVE; @@ -225,10 +228,10 @@ void register_parallel_block() { std::uintptr_t prev = my_state.load(std::memory_order_relaxed); std::uintptr_t desired{}; do { - if (((prev - PARALLEL_BLOCK) & PARALLEL_BLOCK) != 0) { - desired = PARALLEL_BLOCK - prev; + if (((prev - PARALLEL_BLOCK) & PARALLEL_BLOCK_MASK) != 0) { + desired = prev - PARALLEL_BLOCK; } else { - desired = enable_fast_leave ? ONE_TIME_FAST_LEAVE : prev - PARALLEL_BLOCK; + desired = enable_fast_leave && (prev - PARALLEL_BLOCK == DELAYED_LEAVE) ?
ONE_TIME_FAST_LEAVE : prev - PARALLEL_BLOCK; } } while (!my_state.compare_exchange_strong(prev, desired)); } From 37864ddf7cd5dd3064682e498c90a2d59f5187a1 Mon Sep 17 00:00:00 2001 From: "Isaev, Ilya" Date: Wed, 20 Nov 2024 17:15:00 +0100 Subject: [PATCH 04/11] Add parallel_block API, improve tests Signed-off-by: Isaev, Ilya --- include/oneapi/tbb/task_arena.h | 17 ++++++ src/tbb/arena.cpp | 20 ++++++++ src/tbb/def/lin64-tbb.def | 2 + test/tbb/test_task_arena.cpp | 91 +++++++++++++++++++++------------ 4 files changed, 98 insertions(+), 32 deletions(-) diff --git a/include/oneapi/tbb/task_arena.h b/include/oneapi/tbb/task_arena.h index 17f7fdb4a0..11c3e6dca3 100644 --- a/include/oneapi/tbb/task_arena.h +++ b/include/oneapi/tbb/task_arena.h @@ -95,6 +95,11 @@ TBB_EXPORT void __TBB_EXPORTED_FUNC isolate_within_arena(d1::delegate_base& d, s TBB_EXPORT void __TBB_EXPORTED_FUNC enqueue(d1::task&, d1::task_arena_base*); TBB_EXPORT void __TBB_EXPORTED_FUNC enqueue(d1::task&, d1::task_group_context&, d1::task_arena_base*); TBB_EXPORT void __TBB_EXPORTED_FUNC submit(d1::task&, d1::task_group_context&, arena*, std::uintptr_t); + +#if __TBB_PREVIEW_PARALLEL_BLOCK +TBB_EXPORT void __TBB_EXPORTED_FUNC register_parallel_block(d1::task_arena_base&); +TBB_EXPORT void __TBB_EXPORTED_FUNC unregister_parallel_block(d1::task_arena_base&, bool); +#endif } // namespace r1 namespace d2 { @@ -473,6 +478,18 @@ class task_arena : public task_arena_base { return execute_impl(f); } +#if __TBB_PREVIEW_PARALLEL_BLOCK + void parallel_block_start() { + initialize(); + r1::register_parallel_block(*this); + enqueue([]{}); + } + void parallel_block_end(bool set_one_time_fast_leave = false) { + __TBB_ASSERT(my_initialization_state.load(std::memory_order_relaxed) == do_once_state::initialized, nullptr); + r1::unregister_parallel_block(*this, set_one_time_fast_leave); + } +#endif + #if __TBB_EXTRA_DEBUG //! 
Returns my_num_reserved_slots int debug_reserved_slots() const { diff --git a/src/tbb/arena.cpp b/src/tbb/arena.cpp index ae8113a139..2b267a3682 100644 --- a/src/tbb/arena.cpp +++ b/src/tbb/arena.cpp @@ -529,6 +529,8 @@ struct task_arena_impl { static int max_concurrency(const d1::task_arena_base*); static void enqueue(d1::task&, d1::task_group_context*, d1::task_arena_base*); static d1::slot_id execution_slot(const d1::task_arena_base&); + static void register_parallel_block(d1::task_arena_base&); + static void unregister_parallel_block(d1::task_arena_base&, bool); }; void __TBB_EXPORTED_FUNC initialize(d1::task_arena_base& ta) { @@ -563,6 +565,14 @@ d1::slot_id __TBB_EXPORTED_FUNC execution_slot(const d1::task_arena_base& arena) return task_arena_impl::execution_slot(arena); } +void __TBB_EXPORTED_FUNC register_parallel_block(d1::task_arena_base& ta) { + task_arena_impl::register_parallel_block(ta); +} + +void __TBB_EXPORTED_FUNC unregister_parallel_block(d1::task_arena_base& ta, bool one_time_fast_leave) { + task_arena_impl::unregister_parallel_block(ta, one_time_fast_leave); +} + void task_arena_impl::initialize(d1::task_arena_base& ta) { // Enforce global market initialization to properly initialize soft limit (void)governor::get_thread_data(); @@ -908,6 +918,16 @@ int task_arena_impl::max_concurrency(const d1::task_arena_base *ta) { return int(governor::default_num_threads()); } +#if __TBB_PREVIEW_PARALLEL_BLOCK +void task_arena_impl::register_parallel_block(d1::task_arena_base& ta) { + ta.my_arena.load(std::memory_order_relaxed)->my_thread_leave.register_parallel_block(); +} + +void task_arena_impl::unregister_parallel_block(d1::task_arena_base& ta, bool one_time_fast_leave) { + ta.my_arena.load(std::memory_order_relaxed)->my_thread_leave.unregister_parallel_block(one_time_fast_leave); +} +#endif + void isolate_within_arena(d1::delegate_base& d, std::intptr_t isolation) { // TODO: Decide what to do if the scheduler is not initialized. 
Is there a use case for it? thread_data* tls = governor::get_thread_data(); diff --git a/src/tbb/def/lin64-tbb.def b/src/tbb/def/lin64-tbb.def index 41aca2e932..afb9c2e8e0 100644 --- a/src/tbb/def/lin64-tbb.def +++ b/src/tbb/def/lin64-tbb.def @@ -107,6 +107,8 @@ _ZN3tbb6detail2r17enqueueERNS0_2d14taskEPNS2_15task_arena_baseE; _ZN3tbb6detail2r17enqueueERNS0_2d14taskERNS2_18task_group_contextEPNS2_15task_arena_baseE; _ZN3tbb6detail2r14waitERNS0_2d115task_arena_baseE; _ZN3tbb6detail2r114execution_slotERKNS0_2d115task_arena_baseE; +_ZN3tbb6detail2r123register_parallel_blockERNS0_2d115task_arena_baseE; +_ZN3tbb6detail2r125unregister_parallel_blockERNS0_2d115task_arena_baseEb; /* System topology parsing and threads pinning (governor.cpp) */ _ZN3tbb6detail2r115numa_node_countEv; diff --git a/test/tbb/test_task_arena.cpp b/test/tbb/test_task_arena.cpp index b9e0dbfef6..567ed5e468 100644 --- a/test/tbb/test_task_arena.cpp +++ b/test/tbb/test_task_arena.cpp @@ -2071,14 +2071,12 @@ TEST_CASE("worker threads occupy slots in correct range") { #if TBB_PREVIEW_PARALLEL_BLOCK -//! 
\brief \ref interface \ref requirement -TEST_CASE("Check that leave faster with workers_leave::fast") { +template +std::size_t measure_avg_start_time(tbb::task_arena& ta, const F& ) { std::size_t num_threads = utils::get_platform_max_threads(); - std::size_t num_runs = 1000; - std::vector start_times_for_delayed_leave; - start_times_for_delayed_leave.reserve(num_runs); - std::vector start_times_for_fast_leave; - start_times_for_fast_leave.reserve(num_runs); + std::size_t num_runs = 100; + std::vector longest_start_times; + longest_start_times.reserve(num_runs); std::vector start_times(num_threads); utils::SpinBarrier barrier(num_threads); @@ -2095,37 +2093,66 @@ TEST_CASE("Check that leave faster with workers_leave::fast") { return longest_time; }; - std::size_t avg_start_time_delayed = 0; - { - tbb::task_arena a(num_threads, 1, tbb::task_arena::priority::normal, tbb::task_arena::workers_leave::delayed); - - for (std::size_t i = 0; i < num_runs; ++i) { - a.execute([&] { - auto start_time = std::chrono::steady_clock::now(); - tbb::parallel_for(std::size_t(0), num_threads, measure_start_time, tbb::static_partitioner{}); - start_times_for_delayed_leave.push_back(get_longest_start(start_time)); - }); - std::this_thread::sleep_for(std::chrono::microseconds(i)); - } - avg_start_time_delayed = std::accumulate(start_times_for_delayed_leave.begin(), start_times_for_delayed_leave.end(), 0) / num_runs; + for (std::size_t i = 0; i < num_runs; ++i) { + ta.execute([&] { + auto start_time = std::chrono::steady_clock::now(); + tbb::parallel_for(std::size_t(0), num_threads, measure_start_time, tbb::static_partitioner{}); + longest_start_times.push_back(get_longest_start(start_time)); + }); + std::this_thread::sleep_for(std::chrono::microseconds(500)); } + return std::accumulate(longest_start_times.begin(), longest_start_times.end(), 0) / num_runs; +} - std::size_t avg_start_time_fast = 0; - { - tbb::task_arena a(num_threads, 1, tbb::task_arena::priority::normal, 
tbb::task_arena::workers_leave::fast); +//! \brief \ref interface \ref requirement +TEST_CASE("Check that workers leave faster with workers_leave::fast") { + std::size_t num_threads = utils::get_platform_max_threads(); + std::size_t successes = 0; + std::size_t num_trials = 100; + double required_success_rate = 0.8; + for (std::size_t i = 0; i < num_trials; ++i) { + std::size_t avg_start_time_delayed = 0; + { + tbb::task_arena ta(num_threads, 1, tbb::task_arena::priority::normal, tbb::task_arena::workers_leave::delayed); + avg_start_time_delayed = measure_avg_start_time(ta, []{}); + } - for (std::size_t i = 0; i < num_runs; ++i) { - a.execute([&] { - auto start_time = std::chrono::steady_clock::now(); - tbb::parallel_for(std::size_t(0), num_threads, measure_start_time, tbb::static_partitioner{}); - start_times_for_fast_leave.push_back(get_longest_start(start_time)); - }); - std::this_thread::sleep_for(std::chrono::microseconds(i)); + std::size_t avg_start_time_fast = 0; + { + tbb::task_arena ta(num_threads, 1, tbb::task_arena::priority::normal, tbb::task_arena::workers_leave::fast); + avg_start_time_fast = measure_avg_start_time(ta, []{}); } - avg_start_time_fast = std::accumulate(start_times_for_fast_leave.begin(), start_times_for_fast_leave.end(), 0) / num_runs; + successes += avg_start_time_delayed < avg_start_time_fast ? 
1 : 0; } + REQUIRE(successes >= num_trials * required_success_rate); +} - REQUIRE(avg_start_time_delayed < avg_start_time_fast); +TEST_CASE("Check that parallel_block sticks threads to task_arena") { + std::size_t num_threads = utils::get_platform_max_threads(); + std::size_t successes = 0; + std::size_t num_trials = 100; + double required_success_rate = 0.8; + for (std::size_t i = 0; i < num_trials; ++i) { + std::size_t avg_start_time_delayed = 0; + { + tbb::task_arena ta(num_threads, 1, tbb::task_arena::priority::normal, tbb::task_arena::workers_leave::delayed); + avg_start_time_delayed = measure_avg_start_time(ta, []{}); + } + std::this_thread::sleep_for(std::chrono::microseconds(1000)); + std::size_t avg_start_time_fast = 0; + { + tbb::task_arena ta(num_threads, 1, tbb::task_arena::priority::normal, tbb::task_arena::workers_leave::fast); + // ta.parallel_block_start(); + avg_start_time_fast = measure_avg_start_time(ta, []{}); + // ta.parallel_block_end(true); + } + std::cout << avg_start_time_delayed << " " << avg_start_time_fast << std::endl; + successes += (avg_start_time_delayed < avg_start_time_fast ? + (double)avg_start_time_delayed / avg_start_time_fast >= 0.7 : + (double)avg_start_time_fast / avg_start_time_delayed >= 0.7) ? 
1 : 0; + } + REQUIRE(successes >= num_trials * required_success_rate); } + #endif From 448695942c582efd701c5b9bab2227f9ebfc0d82 Mon Sep 17 00:00:00 2001 From: "Isaev, Ilya" Date: Mon, 25 Nov 2024 16:12:22 +0100 Subject: [PATCH 05/11] Improve tests for parallel_block Signed-off-by: Isaev, Ilya --- include/oneapi/tbb/task_arena.h | 19 ++++- src/tbb/waiters.h | 2 +- test/tbb/test_task_arena.cpp | 125 +++++++++++++++++++++----------- 3 files changed, 102 insertions(+), 44 deletions(-) diff --git a/include/oneapi/tbb/task_arena.h b/include/oneapi/tbb/task_arena.h index 11c3e6dca3..f7d3b2ae92 100644 --- a/include/oneapi/tbb/task_arena.h +++ b/include/oneapi/tbb/task_arena.h @@ -479,15 +479,30 @@ class task_arena : public task_arena_base { } #if __TBB_PREVIEW_PARALLEL_BLOCK - void parallel_block_start() { + void start_parallel_block() { initialize(); r1::register_parallel_block(*this); + // Trigger worker threads to join arena enqueue([]{}); } - void parallel_block_end(bool set_one_time_fast_leave = false) { + void end_parallel_block(bool set_one_time_fast_leave = false) { __TBB_ASSERT(my_initialization_state.load(std::memory_order_relaxed) == do_once_state::initialized, nullptr); r1::unregister_parallel_block(*this, set_one_time_fast_leave); } + + class scoped_parallel_block { + task_arena& arena; + bool one_time_fast_leave; + public: + scoped_parallel_block(task_arena& ta, bool set_one_time_fast_leave = true) + : arena(ta), one_time_fast_leave(set_one_time_fast_leave) + { + arena.start_parallel_block(); + } + ~scoped_parallel_block() { + arena.end_parallel_block(one_time_fast_leave); + } + }; #endif #if __TBB_EXTRA_DEBUG diff --git a/src/tbb/waiters.h b/src/tbb/waiters.h index c27a6a4749..16ab9c5b07 100644 --- a/src/tbb/waiters.h +++ b/src/tbb/waiters.h @@ -54,7 +54,7 @@ class outermost_worker_waiter : public waiter_base { public: using waiter_base::waiter_base; - bool continue_execution(arena_slot& slot, d1::task*& t) const { + bool continue_execution(arena_slot& slot, 
d1::task*& t) { __TBB_ASSERT(t == nullptr, nullptr); if (is_worker_should_leave(slot)) { diff --git a/test/tbb/test_task_arena.cpp b/test/tbb/test_task_arena.cpp index 567ed5e468..1185c0212a 100644 --- a/test/tbb/test_task_arena.cpp +++ b/test/tbb/test_task_arena.cpp @@ -2071,10 +2071,15 @@ TEST_CASE("worker threads occupy slots in correct range") { #if TBB_PREVIEW_PARALLEL_BLOCK -template -std::size_t measure_avg_start_time(tbb::task_arena& ta, const F& ) { +struct dummy_func { + void operator()() const { + } +}; + +template +std::size_t measure_avg_start_time(tbb::task_arena& ta, const F1& start_block = F1{}, const F2& end_block = F2{}) { std::size_t num_threads = utils::get_platform_max_threads(); - std::size_t num_runs = 100; + std::size_t num_runs = 1000; std::vector longest_start_times; longest_start_times.reserve(num_runs); @@ -2096,63 +2101,101 @@ std::size_t measure_avg_start_time(tbb::task_arena& ta, const F& ) { for (std::size_t i = 0; i < num_runs; ++i) { ta.execute([&] { auto start_time = std::chrono::steady_clock::now(); + start_block(); tbb::parallel_for(std::size_t(0), num_threads, measure_start_time, tbb::static_partitioner{}); + end_block(); longest_start_times.push_back(get_longest_start(start_time)); }); - std::this_thread::sleep_for(std::chrono::microseconds(500)); + std::this_thread::sleep_for(std::chrono::microseconds(i)); } - return std::accumulate(longest_start_times.begin(), longest_start_times.end(), 0) / num_runs; + // return std::accumulate(longest_start_times.begin(), longest_start_times.end(), 0) / num_runs; + return utils::median(longest_start_times.begin(), longest_start_times.end()); } //! 
\brief \ref interface \ref requirement TEST_CASE("Check that workers leave faster with workers_leave::fast") { std::size_t num_threads = utils::get_platform_max_threads(); - std::size_t successes = 0; - std::size_t num_trials = 100; - double required_success_rate = 0.8; - for (std::size_t i = 0; i < num_trials; ++i) { - std::size_t avg_start_time_delayed = 0; - { - tbb::task_arena ta(num_threads, 1, tbb::task_arena::priority::normal, tbb::task_arena::workers_leave::delayed); - avg_start_time_delayed = measure_avg_start_time(ta, []{}); + std::size_t num_trials = 20; + std::vector avg_start_time_delayed(num_trials); + { + tbb::task_arena ta(num_threads, 1, tbb::task_arena::priority::normal, tbb::task_arena::workers_leave::delayed); + for (std::size_t i = 0; i < num_trials; ++i) { + auto avg_start_time = measure_avg_start_time(ta); + avg_start_time_delayed[i] = avg_start_time; } - - std::size_t avg_start_time_fast = 0; - { - tbb::task_arena ta(num_threads, 1, tbb::task_arena::priority::normal, tbb::task_arena::workers_leave::fast); - avg_start_time_fast = measure_avg_start_time(ta, []{}); + } + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + std::vector avg_start_time_fast(num_trials); + { + tbb::task_arena ta(num_threads, 1, tbb::task_arena::priority::normal, tbb::task_arena::workers_leave::fast); + for (std::size_t i = 0; i < num_trials; ++i) { + auto avg_start_time = measure_avg_start_time(ta); + avg_start_time_fast[i] = avg_start_time; } - successes += avg_start_time_delayed < avg_start_time_fast ? 
1 : 0; } - REQUIRE(successes >= num_trials * required_success_rate); + std::sort(avg_start_time_delayed.begin(), avg_start_time_delayed.end()); + std::sort(avg_start_time_fast.begin(), avg_start_time_fast.end()); + auto delayed_avg = std::accumulate(avg_start_time_delayed.begin(), avg_start_time_delayed.begin() + num_trials * 0.9, 0) / num_trials; + auto fast_avg = std::accumulate(avg_start_time_fast.begin(), avg_start_time_fast.begin() + num_trials * 0.9, 0) / num_trials; + REQUIRE(delayed_avg < fast_avg); } -TEST_CASE("Check that parallel_block sticks threads to task_arena") { +TEST_CASE("parallel_block retains workers in task_arena") { std::size_t num_threads = utils::get_platform_max_threads(); - std::size_t successes = 0; - std::size_t num_trials = 100; - double required_success_rate = 0.8; - for (std::size_t i = 0; i < num_trials; ++i) { - std::size_t avg_start_time_delayed = 0; - { - tbb::task_arena ta(num_threads, 1, tbb::task_arena::priority::normal, tbb::task_arena::workers_leave::delayed); - avg_start_time_delayed = measure_avg_start_time(ta, []{}); + std::size_t num_trials = 20; + std::vector avg_start_time_delayed(num_trials); + { + tbb::task_arena ta(num_threads, 1, tbb::task_arena::priority::normal, tbb::task_arena::workers_leave::fast); + for (std::size_t i = 0; i < num_trials; ++i) { + ta.start_parallel_block(); + auto avg_start_time = measure_avg_start_time(ta); + ta.end_parallel_block(true); + avg_start_time_delayed[i] = avg_start_time; } - std::this_thread::sleep_for(std::chrono::microseconds(1000)); - std::size_t avg_start_time_fast = 0; - { - tbb::task_arena ta(num_threads, 1, tbb::task_arena::priority::normal, tbb::task_arena::workers_leave::fast); - // ta.parallel_block_start(); - avg_start_time_fast = measure_avg_start_time(ta, []{}); - // ta.parallel_block_end(true); + } + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + std::vector avg_start_time_fast(num_trials); + { + tbb::task_arena ta(num_threads, 1, 
tbb::task_arena::priority::normal, tbb::task_arena::workers_leave::fast); + for (std::size_t i = 0; i < num_trials; ++i) { + auto avg_start_time = measure_avg_start_time(ta); + avg_start_time_fast[i] = avg_start_time; } - std::cout << avg_start_time_delayed << " " << avg_start_time_fast << std::endl; - successes += (avg_start_time_delayed < avg_start_time_fast ? - (double)avg_start_time_delayed / avg_start_time_fast >= 0.7 : - (double)avg_start_time_fast / avg_start_time_delayed >= 0.7) ? 1 : 0; } - REQUIRE(successes >= num_trials * required_success_rate); + std::sort(avg_start_time_delayed.begin(), avg_start_time_delayed.end()); + std::sort(avg_start_time_fast.begin(), avg_start_time_fast.end()); + auto delayed_avg = std::accumulate(avg_start_time_delayed.begin(), avg_start_time_delayed.begin() + num_trials * 0.9, 0) / num_trials; + auto fast_avg = std::accumulate(avg_start_time_fast.begin(), avg_start_time_fast.begin() + num_trials * 0.9, 0) / num_trials; + REQUIRE(delayed_avg < fast_avg); } +TEST_CASE("Test one-time fast leave") { + std::size_t num_threads = utils::get_platform_max_threads(); + std::size_t num_trials = 20; + std::vector avg_start_time_delayed(num_trials); + { + tbb::task_arena ta(num_threads, 1, tbb::task_arena::priority::normal, tbb::task_arena::workers_leave::delayed); + for (std::size_t i = 0; i < num_trials; ++i) { + auto avg_start_time = measure_avg_start_time(ta); + avg_start_time_delayed[i] = avg_start_time; + } + } + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + std::vector avg_start_time_fast(num_trials); + { + tbb::task_arena ta(num_threads, 1, tbb::task_arena::priority::normal, tbb::task_arena::workers_leave::delayed); + auto start_block = [&ta] { ta.start_parallel_block(); }; + auto end_block = [&ta] { ta.end_parallel_block(true); }; + for (std::size_t i = 0; i < num_trials; ++i) { + auto avg_start_time = measure_avg_start_time(ta, start_block, end_block); + avg_start_time_fast[i] = avg_start_time; + } + } + 
std::sort(avg_start_time_delayed.begin(), avg_start_time_delayed.end()); + std::sort(avg_start_time_fast.begin(), avg_start_time_fast.end()); + auto delayed_avg = std::accumulate(avg_start_time_delayed.begin(), avg_start_time_delayed.begin() + num_trials * 0.9, 0) / num_trials; + auto fast_avg = std::accumulate(avg_start_time_fast.begin(), avg_start_time_fast.begin() + num_trials * 0.9, 0) / num_trials; + REQUIRE(delayed_avg < fast_avg); +} #endif From a7483eea31eeab0f10bc31a22f6a9b7557230fd9 Mon Sep 17 00:00:00 2001 From: "Isaev, Ilya" Date: Tue, 26 Nov 2024 12:59:23 +0100 Subject: [PATCH 06/11] Add workers_leave::automatic, let hybrid systems take advantage of parallel_block Signed-off-by: Isaev, Ilya --- include/oneapi/tbb/task_arena.h | 11 ++++++----- src/tbb/arena.h | 28 ++++++++++++++++------------ src/tbb/def/lin32-tbb.def | 2 ++ src/tbb/waiters.h | 11 +++++++---- test/tbb/test_task_arena.cpp | 6 +++--- 5 files changed, 34 insertions(+), 24 deletions(-) diff --git a/include/oneapi/tbb/task_arena.h b/include/oneapi/tbb/task_arena.h index f7d3b2ae92..69241b73c2 100644 --- a/include/oneapi/tbb/task_arena.h +++ b/include/oneapi/tbb/task_arena.h @@ -130,6 +130,7 @@ class task_arena_base { #if __TBB_PREVIEW_PARALLEL_BLOCK enum class workers_leave : int { + automatic = 0, fast = 1, delayed = 2 }; @@ -293,7 +294,7 @@ class task_arena : public task_arena_base { task_arena(int max_concurrency_ = automatic, unsigned reserved_for_masters = 1, priority a_priority = priority::normal #if __TBB_PREVIEW_PARALLEL_BLOCK - , workers_leave wl = workers_leave::delayed + , workers_leave wl = workers_leave::automatic #endif ) : task_arena_base(max_concurrency_, reserved_for_masters, a_priority @@ -308,7 +309,7 @@ class task_arena : public task_arena_base { task_arena(const constraints& constraints_, unsigned reserved_for_masters = 1, priority a_priority = priority::normal #if __TBB_PREVIEW_PARALLEL_BLOCK - , workers_leave wl = 
workers_leave::automatic #endif ) : task_arena_base(constraints_, reserved_for_masters, a_priority @@ -350,7 +351,7 @@ class task_arena : public task_arena_base { explicit task_arena( attach ) : task_arena_base(automatic, 1, priority::normal #if __TBB_PREVIEW_PARALLEL_BLOCK - , workers_leave::delayed + , workers_leave::automatic #endif ) // use default settings if attach fails { @@ -373,7 +374,7 @@ class task_arena : public task_arena_base { void initialize(int max_concurrency_, unsigned reserved_for_masters = 1, priority a_priority = priority::normal #if __TBB_PREVIEW_PARALLEL_BLOCK - , workers_leave wl = workers_leave::delayed + , workers_leave wl = workers_leave::automatic #endif ) { @@ -394,7 +395,7 @@ class task_arena : public task_arena_base { void initialize(constraints constraints_, unsigned reserved_for_masters = 1, priority a_priority = priority::normal #if __TBB_PREVIEW_PARALLEL_BLOCK - , workers_leave wl = workers_leave::delayed + , workers_leave wl = workers_leave::automatic #endif ) { diff --git a/src/tbb/arena.h b/src/tbb/arena.h index cb921fab6a..cfb26067d5 100644 --- a/src/tbb/arena.h +++ b/src/tbb/arena.h @@ -181,15 +181,19 @@ class atomic_flag { #if __TBB_PREVIEW_PARALLEL_BLOCK class thread_leave_manager { - static const std::uintptr_t FAST_LEAVE = 1; - static const std::uintptr_t ONE_TIME_FAST_LEAVE = 1 << 1; - static const std::uintptr_t DELAYED_LEAVE = 1 << 2; - static const std::uintptr_t PARALLEL_BLOCK = 1 << 3; - static const std::uintptr_t PARALLEL_BLOCK_MASK = ~((1LLU << 32) - 1) & ~(0x7); + static const std::uint64_t FAST_LEAVE = 1; + static const std::uint64_t ONE_TIME_FAST_LEAVE = 1 << 1; + static const std::uint64_t DELAYED_LEAVE = 1 << 2; + static const std::uint64_t PARALLEL_BLOCK = 1 << 3; + static const std::uint64_t PARALLEL_BLOCK_MASK = ~((1LLU << 32) - 1) & ~(0x7); - std::atomic my_state{0}; + std::atomic my_state{0}; public: void set_initial_state(tbb::task_arena::workers_leave wl) { + if (wl == 
tbb::task_arena::workers_leave::automatic) { + wl = governor::hybrid_cpu() ? tbb::task_arena::workers_leave::fast + : tbb::task_arena::workers_leave::delayed; + } if (wl == tbb::task_arena::workers_leave::delayed) { my_state.store(DELAYED_LEAVE, std::memory_order_relaxed); } else { @@ -198,7 +202,7 @@ class thread_leave_manager { } void restore_state_if_needed() { - std::uintptr_t curr = ONE_TIME_FAST_LEAVE; + std::uint64_t curr = ONE_TIME_FAST_LEAVE; if (my_state.load(std::memory_order_relaxed) == curr) { // Potentially can override desicion of the parallel block from future epoch // but it is not a problem because it does not violate the correctness @@ -209,8 +213,8 @@ class thread_leave_manager { void register_parallel_block() { __TBB_ASSERT(my_state.load(std::memory_order_relaxed) != 0, "The initial state was not set"); - std::uintptr_t prev = my_state.load(std::memory_order_relaxed); - std::uintptr_t desired{}; + std::uint64_t prev = my_state.load(std::memory_order_relaxed); + std::uint64_t desired{}; do { if (prev & PARALLEL_BLOCK_MASK) { desired = PARALLEL_BLOCK + prev; @@ -225,8 +229,8 @@ class thread_leave_manager { void unregister_parallel_block(bool enable_fast_leave) { __TBB_ASSERT(my_state.load(std::memory_order_relaxed) != 0, "The initial state was not set"); - std::uintptr_t prev = my_state.load(std::memory_order_relaxed); - std::uintptr_t desired{}; + std::uint64_t prev = my_state.load(std::memory_order_relaxed); + std::uint64_t desired{}; do { if (((prev - PARALLEL_BLOCK) & PARALLEL_BLOCK_MASK) != 0) { desired = prev - PARALLEL_BLOCK; @@ -239,7 +243,7 @@ class thread_leave_manager { bool should_leave() { __TBB_ASSERT(my_state.load(std::memory_order_relaxed) != 0, "The initial state was not set"); - std::uintptr_t curr = my_state.load(std::memory_order_relaxed); + std::uint64_t curr = my_state.load(std::memory_order_relaxed); return curr == FAST_LEAVE || curr == ONE_TIME_FAST_LEAVE; } }; diff --git a/src/tbb/def/lin32-tbb.def 
b/src/tbb/def/lin32-tbb.def index 737e8ec2af..a33fabbf16 100644 --- a/src/tbb/def/lin32-tbb.def +++ b/src/tbb/def/lin32-tbb.def @@ -107,6 +107,8 @@ _ZN3tbb6detail2r17enqueueERNS0_2d14taskEPNS2_15task_arena_baseE; _ZN3tbb6detail2r17enqueueERNS0_2d14taskERNS2_18task_group_contextEPNS2_15task_arena_baseE; _ZN3tbb6detail2r14waitERNS0_2d115task_arena_baseE; _ZN3tbb6detail2r114execution_slotERKNS0_2d115task_arena_baseE; +_ZN3tbb6detail2r123register_parallel_blockERNS0_2d115task_arena_baseE; +_ZN3tbb6detail2r125unregister_parallel_blockERNS0_2d115task_arena_baseEb; /* System topology parsing and threads pinning (governor.cpp) */ _ZN3tbb6detail2r115numa_node_countEv; diff --git a/src/tbb/waiters.h b/src/tbb/waiters.h index 16ab9c5b07..e7a38db2de 100644 --- a/src/tbb/waiters.h +++ b/src/tbb/waiters.h @@ -58,9 +58,11 @@ class outermost_worker_waiter : public waiter_base { __TBB_ASSERT(t == nullptr, nullptr); if (is_worker_should_leave(slot)) { - if (!governor::hybrid_cpu() + if ( #if __TBB_PREVIEW_PARALLEL_BLOCK - && !my_arena.my_thread_leave.should_leave() + !my_arena.my_thread_leave.should_leave() +#else + !governor::hybrid_cpu() #endif ) { @@ -75,10 +77,11 @@ class outermost_worker_waiter : public waiter_base { return true; } - if (my_arena.my_threading_control->is_any_other_client_active() + if ( #if __TBB_PREVIEW_PARALLEL_BLOCK - || my_arena.my_thread_leave.should_leave() + my_arena.my_thread_leave.should_leave() || #endif + my_arena.my_threading_control->is_any_other_client_active() ) { break; diff --git a/test/tbb/test_task_arena.cpp b/test/tbb/test_task_arena.cpp index 1185c0212a..9007b5c2df 100644 --- a/test/tbb/test_task_arena.cpp +++ b/test/tbb/test_task_arena.cpp @@ -2137,7 +2137,7 @@ TEST_CASE("Check that workers leave faster with workers_leave::fast") { std::sort(avg_start_time_fast.begin(), avg_start_time_fast.end()); auto delayed_avg = std::accumulate(avg_start_time_delayed.begin(), avg_start_time_delayed.begin() + num_trials * 0.9, 0) / num_trials; auto 
fast_avg = std::accumulate(avg_start_time_fast.begin(), avg_start_time_fast.begin() + num_trials * 0.9, 0) / num_trials; - REQUIRE(delayed_avg < fast_avg); + WARN_MESSAGE(delayed_avg < fast_avg, "Expected workers start new work faster with delayed leave"); } TEST_CASE("parallel_block retains workers in task_arena") { @@ -2166,7 +2166,7 @@ TEST_CASE("parallel_block retains workers in task_arena") { std::sort(avg_start_time_fast.begin(), avg_start_time_fast.end()); auto delayed_avg = std::accumulate(avg_start_time_delayed.begin(), avg_start_time_delayed.begin() + num_trials * 0.9, 0) / num_trials; auto fast_avg = std::accumulate(avg_start_time_fast.begin(), avg_start_time_fast.begin() + num_trials * 0.9, 0) / num_trials; - REQUIRE(delayed_avg < fast_avg); + WARN_MESSAGE(delayed_avg < fast_avg, "Expected workers start new work faster when using parallel_block"); } TEST_CASE("Test one-time fast leave") { @@ -2195,7 +2195,7 @@ TEST_CASE("Test one-time fast leave") { std::sort(avg_start_time_fast.begin(), avg_start_time_fast.end()); auto delayed_avg = std::accumulate(avg_start_time_delayed.begin(), avg_start_time_delayed.begin() + num_trials * 0.9, 0) / num_trials; auto fast_avg = std::accumulate(avg_start_time_fast.begin(), avg_start_time_fast.begin() + num_trials * 0.9, 0) / num_trials; - REQUIRE(delayed_avg < fast_avg); + WARN_MESSAGE(delayed_avg < fast_avg, "Expected one-time fast leave setting to slow workers to start new work"); } #endif From 755e74f788e76d8cb95489a4a10277b7f667b16f Mon Sep 17 00:00:00 2001 From: "Isaev, Ilya" Date: Tue, 26 Nov 2024 17:30:07 +0100 Subject: [PATCH 07/11] Fix Windows compilation Signed-off-by: Isaev, Ilya --- src/tbb/def/win64-tbb.def | 2 ++ test/tbb/test_task_arena.cpp | 31 +++++++++++++++++-------------- 2 files changed, 19 insertions(+), 14 deletions(-) diff --git a/src/tbb/def/win64-tbb.def b/src/tbb/def/win64-tbb.def index 96bafc0163..b68a46da2a 100644 --- a/src/tbb/def/win64-tbb.def +++ b/src/tbb/def/win64-tbb.def @@ -101,6 
+101,8 @@ EXPORTS ?enqueue@r1@detail@tbb@@YAXAEAVtask@d1@23@PEAVtask_arena_base@523@@Z ?enqueue@r1@detail@tbb@@YAXAEAVtask@d1@23@AEAVtask_group_context@523@PEAVtask_arena_base@523@@Z ?execution_slot@r1@detail@tbb@@YAGAEBVtask_arena_base@d1@23@@Z +?register_parallel_block@r1@detail@tbb@@YAXAEAVtask_arena_base@d1@23@@Z +?unregister_parallel_block@r1@detail@tbb@@YAXAEAVtask_arena_base@d1@23@_N@Z ; System topology parsing and threads pinning (governor.cpp) ?numa_node_count@r1@detail@tbb@@YAIXZ diff --git a/test/tbb/test_task_arena.cpp b/test/tbb/test_task_arena.cpp index 9007b5c2df..ef7a9b791d 100644 --- a/test/tbb/test_task_arena.cpp +++ b/test/tbb/test_task_arena.cpp @@ -14,6 +14,7 @@ limitations under the License. */ +#include "common/dummy_body.h" #include "common/test.h" #define __TBB_EXTRA_DEBUG 1 @@ -41,6 +42,7 @@ #include #include #include +#include //#include "harness_fp.h" @@ -2099,6 +2101,7 @@ std::size_t measure_avg_start_time(tbb::task_arena& ta, const F1& start_block = }; for (std::size_t i = 0; i < num_runs; ++i) { + // std::cout << i << std::endl; ta.execute([&] { auto start_time = std::chrono::steady_clock::now(); start_block(); @@ -2106,9 +2109,9 @@ std::size_t measure_avg_start_time(tbb::task_arena& ta, const F1& start_block = end_block(); longest_start_times.push_back(get_longest_start(start_time)); }); - std::this_thread::sleep_for(std::chrono::microseconds(i)); + // std::this_thread::sleep_for(std::chrono::microseconds(500)); + utils::doDummyWork(i*100); } - // return std::accumulate(longest_start_times.begin(), longest_start_times.end(), 0) / num_runs; return utils::median(longest_start_times.begin(), longest_start_times.end()); } @@ -2118,7 +2121,7 @@ TEST_CASE("Check that workers leave faster with workers_leave::fast") { std::size_t num_trials = 20; std::vector avg_start_time_delayed(num_trials); { - tbb::task_arena ta(num_threads, 1, tbb::task_arena::priority::normal, tbb::task_arena::workers_leave::delayed); + tbb::task_arena 
ta((int)num_threads, 1, tbb::task_arena::priority::normal, tbb::task_arena::workers_leave::delayed); for (std::size_t i = 0; i < num_trials; ++i) { auto avg_start_time = measure_avg_start_time(ta); avg_start_time_delayed[i] = avg_start_time; @@ -2127,7 +2130,7 @@ TEST_CASE("Check that workers leave faster with workers_leave::fast") { std::this_thread::sleep_for(std::chrono::milliseconds(10)); std::vector avg_start_time_fast(num_trials); { - tbb::task_arena ta(num_threads, 1, tbb::task_arena::priority::normal, tbb::task_arena::workers_leave::fast); + tbb::task_arena ta((int)num_threads, 1, tbb::task_arena::priority::normal, tbb::task_arena::workers_leave::fast); for (std::size_t i = 0; i < num_trials; ++i) { auto avg_start_time = measure_avg_start_time(ta); avg_start_time_fast[i] = avg_start_time; @@ -2135,8 +2138,8 @@ TEST_CASE("Check that workers leave faster with workers_leave::fast") { } std::sort(avg_start_time_delayed.begin(), avg_start_time_delayed.end()); std::sort(avg_start_time_fast.begin(), avg_start_time_fast.end()); - auto delayed_avg = std::accumulate(avg_start_time_delayed.begin(), avg_start_time_delayed.begin() + num_trials * 0.9, 0) / num_trials; - auto fast_avg = std::accumulate(avg_start_time_fast.begin(), avg_start_time_fast.begin() + num_trials * 0.9, 0) / num_trials; + auto delayed_avg = std::accumulate(avg_start_time_delayed.begin(), avg_start_time_delayed.begin() + int(num_trials * 0.9), 0ull) / num_trials; + auto fast_avg = std::accumulate(avg_start_time_fast.begin(), avg_start_time_fast.begin() + int(num_trials * 0.9), 0ull) / num_trials; WARN_MESSAGE(delayed_avg < fast_avg, "Expected workers start new work faster with delayed leave"); } @@ -2145,7 +2148,7 @@ TEST_CASE("parallel_block retains workers in task_arena") { std::size_t num_trials = 20; std::vector avg_start_time_delayed(num_trials); { - tbb::task_arena ta(num_threads, 1, tbb::task_arena::priority::normal, tbb::task_arena::workers_leave::fast); + tbb::task_arena 
ta((int)num_threads, 1, tbb::task_arena::priority::normal, tbb::task_arena::workers_leave::fast); for (std::size_t i = 0; i < num_trials; ++i) { ta.start_parallel_block(); auto avg_start_time = measure_avg_start_time(ta); @@ -2156,7 +2159,7 @@ TEST_CASE("parallel_block retains workers in task_arena") { std::this_thread::sleep_for(std::chrono::milliseconds(10)); std::vector avg_start_time_fast(num_trials); { - tbb::task_arena ta(num_threads, 1, tbb::task_arena::priority::normal, tbb::task_arena::workers_leave::fast); + tbb::task_arena ta((int)num_threads, 1, tbb::task_arena::priority::normal, tbb::task_arena::workers_leave::fast); for (std::size_t i = 0; i < num_trials; ++i) { auto avg_start_time = measure_avg_start_time(ta); avg_start_time_fast[i] = avg_start_time; @@ -2164,8 +2167,8 @@ TEST_CASE("parallel_block retains workers in task_arena") { } std::sort(avg_start_time_delayed.begin(), avg_start_time_delayed.end()); std::sort(avg_start_time_fast.begin(), avg_start_time_fast.end()); - auto delayed_avg = std::accumulate(avg_start_time_delayed.begin(), avg_start_time_delayed.begin() + num_trials * 0.9, 0) / num_trials; - auto fast_avg = std::accumulate(avg_start_time_fast.begin(), avg_start_time_fast.begin() + num_trials * 0.9, 0) / num_trials; + auto delayed_avg = std::accumulate(avg_start_time_delayed.begin(), avg_start_time_delayed.begin() + int(num_trials * 0.9), 0ull) / num_trials; + auto fast_avg = std::accumulate(avg_start_time_fast.begin(), avg_start_time_fast.begin() + int(num_trials * 0.9), 0ull) / num_trials; WARN_MESSAGE(delayed_avg < fast_avg, "Expected workers start new work faster when using parallel_block"); } @@ -2174,7 +2177,7 @@ TEST_CASE("Test one-time fast leave") { std::size_t num_trials = 20; std::vector avg_start_time_delayed(num_trials); { - tbb::task_arena ta(num_threads, 1, tbb::task_arena::priority::normal, tbb::task_arena::workers_leave::delayed); + tbb::task_arena ta((int)num_threads, 1, tbb::task_arena::priority::normal, 
tbb::task_arena::workers_leave::delayed); for (std::size_t i = 0; i < num_trials; ++i) { auto avg_start_time = measure_avg_start_time(ta); avg_start_time_delayed[i] = avg_start_time; @@ -2183,7 +2186,7 @@ TEST_CASE("Test one-time fast leave") { std::this_thread::sleep_for(std::chrono::milliseconds(10)); std::vector avg_start_time_fast(num_trials); { - tbb::task_arena ta(num_threads, 1, tbb::task_arena::priority::normal, tbb::task_arena::workers_leave::delayed); + tbb::task_arena ta((int)num_threads, 1, tbb::task_arena::priority::normal, tbb::task_arena::workers_leave::delayed); auto start_block = [&ta] { ta.start_parallel_block(); }; auto end_block = [&ta] { ta.end_parallel_block(true); }; for (std::size_t i = 0; i < num_trials; ++i) { @@ -2193,8 +2196,8 @@ TEST_CASE("Test one-time fast leave") { } std::sort(avg_start_time_delayed.begin(), avg_start_time_delayed.end()); std::sort(avg_start_time_fast.begin(), avg_start_time_fast.end()); - auto delayed_avg = std::accumulate(avg_start_time_delayed.begin(), avg_start_time_delayed.begin() + num_trials * 0.9, 0) / num_trials; - auto fast_avg = std::accumulate(avg_start_time_fast.begin(), avg_start_time_fast.begin() + num_trials * 0.9, 0) / num_trials; + auto delayed_avg = std::accumulate(avg_start_time_delayed.begin(), avg_start_time_delayed.begin() + int(num_trials * 0.9), 0ull) / num_trials; + auto fast_avg = std::accumulate(avg_start_time_fast.begin(), avg_start_time_fast.begin() + int(num_trials * 0.9), 0ull) / num_trials; WARN_MESSAGE(delayed_avg < fast_avg, "Expected one-time fast leave setting to slow workers to start new work"); } From 68d7a52123071a58ab019bb61cab2743ca802d77 Mon Sep 17 00:00:00 2001 From: "Isaev, Ilya" Date: Wed, 27 Nov 2024 17:13:56 +0100 Subject: [PATCH 08/11] Add win32 exports Signed-off-by: Isaev, Ilya --- src/tbb/def/win32-tbb.def | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/tbb/def/win32-tbb.def b/src/tbb/def/win32-tbb.def index 94b5441701..0b2695c80a 100644 --- 
a/src/tbb/def/win32-tbb.def +++ b/src/tbb/def/win32-tbb.def @@ -101,6 +101,8 @@ EXPORTS ?wait@r1@detail@tbb@@YAXAAVtask_arena_base@d1@23@@Z ?enqueue@r1@detail@tbb@@YAXAAVtask@d1@23@AAVtask_group_context@523@PAVtask_arena_base@523@@Z ?execution_slot@r1@detail@tbb@@YAGABVtask_arena_base@d1@23@@Z +?register_parallel_block@r1@detail@tbb@@YAXAAVtask_arena_base@d1@23@@Z +?unregister_parallel_block@r1@detail@tbb@@YAXAAVtask_arena_base@d1@23@_N@Z ; System topology parsing and threads pinning (governor.cpp) ?numa_node_count@r1@detail@tbb@@YAIXZ From 13aae5526cbd8e852a6426a7f0ae75c32e484b43 Mon Sep 17 00:00:00 2001 From: "Isaev, Ilya" Date: Wed, 27 Nov 2024 17:39:49 +0100 Subject: [PATCH 09/11] Add mac64 symbols Signed-off-by: Isaev, Ilya --- src/tbb/def/mac64-tbb.def | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/tbb/def/mac64-tbb.def b/src/tbb/def/mac64-tbb.def index 38bc48d30e..403259eec1 100644 --- a/src/tbb/def/mac64-tbb.def +++ b/src/tbb/def/mac64-tbb.def @@ -109,6 +109,8 @@ __ZN3tbb6detail2r17enqueueERNS0_2d14taskEPNS2_15task_arena_baseE __ZN3tbb6detail2r17enqueueERNS0_2d14taskERNS2_18task_group_contextEPNS2_15task_arena_baseE __ZN3tbb6detail2r14waitERNS0_2d115task_arena_baseE __ZN3tbb6detail2r114execution_slotERKNS0_2d115task_arena_baseE +__ZN3tbb6detail2r123register_parallel_blockERNS0_2d115task_arena_baseE +__ZN3tbb6detail2r125unregister_parallel_blockERNS0_2d115task_arena_baseEb # System topology parsing and threads pinning (governor.cpp) __ZN3tbb6detail2r115numa_node_countEv From 6538ecaf6140afcf1eb29c119e5c0629ab3760e3 Mon Sep 17 00:00:00 2001 From: "Isaev, Ilya" Date: Thu, 28 Nov 2024 16:31:15 +0100 Subject: [PATCH 10/11] Correct mask, add a clarifying comment to it Signed-off-by: Isaev, Ilya --- src/tbb/arena.h | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/tbb/arena.h b/src/tbb/arena.h index cfb26067d5..04f26fb7ed 100644 --- a/src/tbb/arena.h +++ b/src/tbb/arena.h @@ -185,7 +185,9 @@ class thread_leave_manager { static const 
std::uint64_t ONE_TIME_FAST_LEAVE = 1 << 1; static const std::uint64_t DELAYED_LEAVE = 1 << 2; static const std::uint64_t PARALLEL_BLOCK = 1 << 3; - static const std::uint64_t PARALLEL_BLOCK_MASK = ~((1LLU << 32) - 1) & ~(0x7); + // Use 29 bits for the parallel block state + reference counter, + // reserve 32 most significant bits. + static const std::uint64_t PARALLEL_BLOCK_MASK = ((1LLU << 32) - 1) & (PARALLEL_BLOCK - 1); std::atomic my_state{0}; public: From fa41197428ace09d98b61eb73a2f4905cebe5f01 Mon Sep 17 00:00:00 2001 From: Ilya Isaev Date: Thu, 28 Nov 2024 16:33:48 +0100 Subject: [PATCH 11/11] Apply suggested typo fix Co-authored-by: Aleksei Fedotov --- src/tbb/arena.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/tbb/arena.h b/src/tbb/arena.h index 04f26fb7ed..698cac68dd 100644 --- a/src/tbb/arena.h +++ b/src/tbb/arena.h @@ -206,7 +206,7 @@ class thread_leave_manager { void restore_state_if_needed() { std::uint64_t curr = ONE_TIME_FAST_LEAVE; if (my_state.load(std::memory_order_relaxed) == curr) { - // Potentially can override desicion of the parallel block from future epoch + // Potentially can override decision of the parallel block from future epoch // but it is not a problem because it does not violate the correctness my_state.compare_exchange_strong(curr, DELAYED_LEAVE); }