Add reference implementation for parallel_block feature #1570

Draft: wants to merge 11 commits into base: master
4 changes: 4 additions & 0 deletions include/oneapi/tbb/detail/_config.h
@@ -534,4 +534,8 @@
#define __TBB_PREVIEW_TASK_GROUP_EXTENSIONS 1
#endif

#if TBB_PREVIEW_PARALLEL_BLOCK || __TBB_BUILD
#define __TBB_PREVIEW_PARALLEL_BLOCK 1
#endif

#endif // __TBB_detail__config_H
134 changes: 118 additions & 16 deletions include/oneapi/tbb/task_arena.h
@@ -95,6 +95,11 @@ TBB_EXPORT void __TBB_EXPORTED_FUNC isolate_within_arena(d1::delegate_base& d, s
TBB_EXPORT void __TBB_EXPORTED_FUNC enqueue(d1::task&, d1::task_arena_base*);
TBB_EXPORT void __TBB_EXPORTED_FUNC enqueue(d1::task&, d1::task_group_context&, d1::task_arena_base*);
TBB_EXPORT void __TBB_EXPORTED_FUNC submit(d1::task&, d1::task_group_context&, arena*, std::uintptr_t);

#if __TBB_PREVIEW_PARALLEL_BLOCK
TBB_EXPORT void __TBB_EXPORTED_FUNC register_parallel_block(d1::task_arena_base&);
TBB_EXPORT void __TBB_EXPORTED_FUNC unregister_parallel_block(d1::task_arena_base&, bool);
Contributor:
Do we anticipate any future extension? In other words, does it make sense to accept an untyped pointer for any future details?

Suggested change:
-   TBB_EXPORT void __TBB_EXPORTED_FUNC unregister_parallel_block(d1::task_arena_base&, bool);
+   TBB_EXPORT void __TBB_EXPORTED_FUNC unregister_parallel_block(d1::task_arena_base&, void* /*supplemental details*/);

Also, I don't think a single bit can be passed; at least a word (or even more?) is used anyway. So it makes sense to accept at least that width and not kill the opportunity for future extensions:

Suggested change:
-   TBB_EXPORT void __TBB_EXPORTED_FUNC unregister_parallel_block(d1::task_arena_base&, bool);
+   TBB_EXPORT void __TBB_EXPORTED_FUNC unregister_parallel_block(d1::task_arena_base&, unsigned short /*supplemental details*/);

Please double-check how much information is actually passed.

Contributor (author):

Yeah, I've also been thinking about that. Perhaps we should pass std::uintptr_t instead of void*?
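
For illustration, a minimal sketch of the extensible variant being discussed, assuming a std::uintptr_t flags word (the enum and flag names below are hypothetical, not part of this PR):

// Hypothetical sketch: pack the one-time fast-leave request into the low
// bit of a std::uintptr_t so later revisions can add flags without
// another ABI change.
enum parallel_block_flags : std::uintptr_t {
    pb_none                = 0,
    pb_one_time_fast_leave = 1u << 0
    // higher bits reserved for future use
};

TBB_EXPORT void __TBB_EXPORTED_FUNC unregister_parallel_block(d1::task_arena_base&, std::uintptr_t /*flags*/);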

#endif
} // namespace r1

namespace d2 {
@@ -122,6 +127,15 @@ class task_arena_base {
normal = 2 * priority_stride,
high = 3 * priority_stride
};

#if __TBB_PREVIEW_PARALLEL_BLOCK
enum class workers_leave : int {
automatic = 0,
fast = 1,
delayed = 2
};
#endif

#if __TBB_ARENA_BINDING
using constraints = tbb::detail::d1::constraints;
#endif /*__TBB_ARENA_BINDING*/
@@ -154,6 +168,11 @@ class task_arena_base {
//! Number of threads per core
int my_max_threads_per_core;

#if __TBB_PREVIEW_PARALLEL_BLOCK
//! Defines the initial behavior of the workers_leave state machine
workers_leave my_workers_leave;
#endif

// Backward compatibility checks.
core_type_id core_type() const {
return (my_version_and_traits & core_type_support_flag) == core_type_support_flag ? my_core_type : automatic;
@@ -167,7 +186,11 @@
, core_type_support_flag = 1
};

task_arena_base(int max_concurrency, unsigned reserved_for_masters, priority a_priority)
task_arena_base(int max_concurrency, unsigned reserved_for_masters, priority a_priority
#if __TBB_PREVIEW_PARALLEL_BLOCK
, workers_leave wl
#endif
)
: my_version_and_traits(default_flags | core_type_support_flag)
, my_initialization_state(do_once_state::uninitialized)
, my_arena(nullptr)
@@ -177,10 +200,17 @@
, my_numa_id(automatic)
, my_core_type(automatic)
, my_max_threads_per_core(automatic)
#if __TBB_PREVIEW_PARALLEL_BLOCK
, my_workers_leave(wl)
#endif
{}

#if __TBB_ARENA_BINDING
task_arena_base(const constraints& constraints_, unsigned reserved_for_masters, priority a_priority)
task_arena_base(const constraints& constraints_, unsigned reserved_for_masters, priority a_priority
#if __TBB_PREVIEW_PARALLEL_BLOCK
, workers_leave wl
#endif
)
: my_version_and_traits(default_flags | core_type_support_flag)
, my_initialization_state(do_once_state::uninitialized)
, my_arena(nullptr)
@@ -190,6 +220,9 @@
, my_numa_id(constraints_.numa_id)
, my_core_type(constraints_.core_type)
, my_max_threads_per_core(constraints_.max_threads_per_core)
#if __TBB_PREVIEW_PARALLEL_BLOCK
, my_workers_leave(wl)
#endif
{}
#endif /*__TBB_ARENA_BINDING*/
public:
@@ -259,31 +292,55 @@ class task_arena : public task_arena_base {
* Value of 1 is default and reflects behavior of implicit arenas.
**/
task_arena(int max_concurrency_ = automatic, unsigned reserved_for_masters = 1,
priority a_priority = priority::normal)
: task_arena_base(max_concurrency_, reserved_for_masters, a_priority)
priority a_priority = priority::normal
#if __TBB_PREVIEW_PARALLEL_BLOCK
, workers_leave wl = workers_leave::automatic
#endif
)
: task_arena_base(max_concurrency_, reserved_for_masters, a_priority
#if __TBB_PREVIEW_PARALLEL_BLOCK
, wl
#endif
)
{}

#if __TBB_ARENA_BINDING
//! Creates task arena pinned to certain NUMA node
task_arena(const constraints& constraints_, unsigned reserved_for_masters = 1,
priority a_priority = priority::normal)
: task_arena_base(constraints_, reserved_for_masters, a_priority)
priority a_priority = priority::normal
#if __TBB_PREVIEW_PARALLEL_BLOCK
, workers_leave wl = workers_leave::automatic
#endif
)
: task_arena_base(constraints_, reserved_for_masters, a_priority
#if __TBB_PREVIEW_PARALLEL_BLOCK
, wl
#endif
)
{}

//! Copies settings from another task_arena
task_arena(const task_arena &s) // copy settings but not the reference or instance
task_arena(const task_arena &a) // copy settings but not the reference or instance
: task_arena_base(
constraints{}
.set_numa_id(s.my_numa_id)
.set_max_concurrency(s.my_max_concurrency)
.set_core_type(s.my_core_type)
.set_max_threads_per_core(s.my_max_threads_per_core)
, s.my_num_reserved_slots, s.my_priority)
.set_numa_id(a.my_numa_id)
.set_max_concurrency(a.my_max_concurrency)
.set_core_type(a.my_core_type)
.set_max_threads_per_core(a.my_max_threads_per_core)
, a.my_num_reserved_slots, a.my_priority
#if __TBB_PREVIEW_PARALLEL_BLOCK
, a.my_workers_leave
#endif
)
{}
#else
//! Copies settings from another task_arena
task_arena(const task_arena& a) // copy settings but not the reference or instance
: task_arena_base(a.my_max_concurrency, a.my_num_reserved_slots, a.my_priority)
: task_arena_base(a.my_max_concurrency, a.my_num_reserved_slots, a.my_priority
#if __TBB_PREVIEW_PARALLEL_BLOCK
, a.my_workers_leave
#endif
)
{}
#endif /*__TBB_ARENA_BINDING*/

@@ -292,7 +349,11 @@ class task_arena : public task_arena_base {

//! Creates an instance of task_arena attached to the current arena of the thread
explicit task_arena( attach )
: task_arena_base(automatic, 1, priority::normal) // use default settings if attach fails
: task_arena_base(automatic, 1, priority::normal
#if __TBB_PREVIEW_PARALLEL_BLOCK
, workers_leave::automatic
#endif
) // use default settings if attach fails
{
if (r1::attach(*this)) {
mark_initialized();
@@ -311,21 +372,32 @@

//! Overrides concurrency level and forces initialization of internal representation
void initialize(int max_concurrency_, unsigned reserved_for_masters = 1,
priority a_priority = priority::normal)
priority a_priority = priority::normal
#if __TBB_PREVIEW_PARALLEL_BLOCK
, workers_leave wl = workers_leave::automatic
#endif
)
{
__TBB_ASSERT(!my_arena.load(std::memory_order_relaxed), "Impossible to modify settings of an already initialized task_arena");
if( !is_active() ) {
my_max_concurrency = max_concurrency_;
my_num_reserved_slots = reserved_for_masters;
my_priority = a_priority;
#if __TBB_PREVIEW_PARALLEL_BLOCK
my_workers_leave = wl;
#endif
r1::initialize(*this);
mark_initialized();
}
}

#if __TBB_ARENA_BINDING
void initialize(constraints constraints_, unsigned reserved_for_masters = 1,
priority a_priority = priority::normal)
priority a_priority = priority::normal
#if __TBB_PREVIEW_PARALLEL_BLOCK
, workers_leave wl = workers_leave::automatic
#endif
)
{
__TBB_ASSERT(!my_arena.load(std::memory_order_relaxed), "Impossible to modify settings of an already initialized task_arena");
if( !is_active() ) {
@@ -335,6 +407,9 @@
my_max_threads_per_core = constraints_.max_threads_per_core;
my_num_reserved_slots = reserved_for_masters;
my_priority = a_priority;
#if __TBB_PREVIEW_PARALLEL_BLOCK
my_workers_leave = wl;
#endif
r1::initialize(*this);
mark_initialized();
}
@@ -404,6 +479,33 @@ class task_arena : public task_arena_base {
return execute_impl<decltype(f())>(f);
}

#if __TBB_PREVIEW_PARALLEL_BLOCK
void start_parallel_block() {
initialize();
r1::register_parallel_block(*this);
// Trigger worker threads to join arena
enqueue([]{});
}
void end_parallel_block(bool set_one_time_fast_leave = false) {
__TBB_ASSERT(my_initialization_state.load(std::memory_order_relaxed) == do_once_state::initialized, nullptr);
r1::unregister_parallel_block(*this, set_one_time_fast_leave);
}

class scoped_parallel_block {
task_arena& arena;
bool one_time_fast_leave;
public:
scoped_parallel_block(task_arena& ta, bool set_one_time_fast_leave = true)
: arena(ta), one_time_fast_leave(set_one_time_fast_leave)
{
arena.start_parallel_block();
}
~scoped_parallel_block() {
arena.end_parallel_block(one_time_fast_leave);
}
};
#endif

#if __TBB_EXTRA_DEBUG
//! Returns my_num_reserved_slots
int debug_reserved_slots() const {
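For context, a minimal usage sketch of the preview API added above. It assumes workers_leave and the nested scoped_parallel_block class are publicly accessible, as the diff suggests, and that the preview macro is defined before including the header:

#define TBB_PREVIEW_PARALLEL_BLOCK 1
#include <oneapi/tbb/task_arena.h>
#include <oneapi/tbb/parallel_for.h>

int main() {
    // Ask workers to linger between bursts of work submitted to this arena.
    tbb::task_arena arena(/*max_concurrency=*/4, /*reserved_for_masters=*/1,
                          tbb::task_arena::priority::normal,
                          tbb::task_arena::workers_leave::delayed);
    {
        // RAII form: start_parallel_block() runs on construction; on
        // destruction, end_parallel_block(true) requests a one-time fast leave.
        tbb::task_arena::scoped_parallel_block block(arena);
        arena.execute([] {
            tbb::parallel_for(0, 1000, [](int) { /* work */ });
        });
    } // workers may be released promptly here
}
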
65 changes: 59 additions & 6 deletions src/tbb/arena.cpp
@@ -241,7 +241,12 @@ void arena::process(thread_data& tls) {
__TBB_ASSERT(tls.my_arena == this, "my_arena is used as a hint when searching the arena to join");
}

arena::arena(threading_control* control, unsigned num_slots, unsigned num_reserved_slots, unsigned priority_level) {
arena::arena(threading_control* control, unsigned num_slots, unsigned num_reserved_slots, unsigned priority_level
#if __TBB_PREVIEW_PARALLEL_BLOCK
, tbb::task_arena::workers_leave wl
#endif
)
{
__TBB_ASSERT( !my_guard, "improperly allocated arena?" );
__TBB_ASSERT( sizeof(my_slots[0]) % cache_line_size()==0, "arena::slot size not multiple of cache line size" );
__TBB_ASSERT( is_aligned(this, cache_line_size()), "arena misaligned" );
@@ -276,10 +281,18 @@ arena::arena(threading_control* control, unsigned num_slots, unsigned num_reserv
my_critical_task_stream.initialize(my_num_slots);
#endif
my_mandatory_requests = 0;

#if __TBB_PREVIEW_PARALLEL_BLOCK
my_thread_leave.set_initial_state(wl);
#endif
}

arena& arena::allocate_arena(threading_control* control, unsigned num_slots, unsigned num_reserved_slots,
unsigned priority_level)
unsigned priority_level
#if __TBB_PREVIEW_PARALLEL_BLOCK
, tbb::task_arena::workers_leave wl
#endif
)
{
__TBB_ASSERT( sizeof(base_type) + sizeof(arena_slot) == sizeof(arena), "All arena data fields must go to arena_base" );
__TBB_ASSERT( sizeof(base_type) % cache_line_size() == 0, "arena slots area misaligned: wrong padding" );
@@ -290,7 +303,11 @@ arena& arena::allocate_arena(threading_control* control, unsigned num_slots, uns
std::memset( storage, 0, n );

return *new( storage + num_arena_slots(num_slots, num_reserved_slots) * sizeof(mail_outbox) )
arena(control, num_slots, num_reserved_slots, priority_level);
arena(control, num_slots, num_reserved_slots, priority_level
#if __TBB_PREVIEW_PARALLEL_BLOCK
, wl
#endif
);
}

void arena::free_arena () {
@@ -340,6 +357,9 @@ bool arena::has_enqueued_tasks() {
}

void arena::request_workers(int mandatory_delta, int workers_delta, bool wakeup_threads) {
#if __TBB_PREVIEW_PARALLEL_BLOCK
my_thread_leave.restore_state_if_needed();
#endif
my_threading_control->adjust_demand(my_tc_client, mandatory_delta, workers_delta);

if (wakeup_threads) {
@@ -443,11 +463,20 @@ void arena::enqueue_task(d1::task& t, d1::task_group_context& ctx, thread_data&
advertise_new_work<work_enqueued>();
}

arena& arena::create(threading_control* control, unsigned num_slots, unsigned num_reserved_slots, unsigned arena_priority_level, d1::constraints constraints) {
arena& arena::create(threading_control* control, unsigned num_slots, unsigned num_reserved_slots, unsigned arena_priority_level, d1::constraints constraints
#if __TBB_PREVIEW_PARALLEL_BLOCK
, tbb::task_arena::workers_leave wl
#endif
)
{
__TBB_ASSERT(num_slots > 0, NULL);
__TBB_ASSERT(num_reserved_slots <= num_slots, NULL);
// Add public market reference for an external thread/task_arena (that adds an internal reference in exchange).
arena& a = arena::allocate_arena(control, num_slots, num_reserved_slots, arena_priority_level);
arena& a = arena::allocate_arena(control, num_slots, num_reserved_slots, arena_priority_level
#if __TBB_PREVIEW_PARALLEL_BLOCK
, wl
#endif
);
a.my_tc_client = control->create_client(a);
// We should not publish arena until all fields are initialized
control->publish_client(a.my_tc_client, constraints);
@@ -500,6 +529,8 @@ struct task_arena_impl {
static int max_concurrency(const d1::task_arena_base*);
static void enqueue(d1::task&, d1::task_group_context*, d1::task_arena_base*);
static d1::slot_id execution_slot(const d1::task_arena_base&);
static void register_parallel_block(d1::task_arena_base&);
static void unregister_parallel_block(d1::task_arena_base&, bool);
};

void __TBB_EXPORTED_FUNC initialize(d1::task_arena_base& ta) {
@@ -534,6 +565,14 @@ d1::slot_id __TBB_EXPORTED_FUNC execution_slot(const d1::task_arena_base& arena)
return task_arena_impl::execution_slot(arena);
}

void __TBB_EXPORTED_FUNC register_parallel_block(d1::task_arena_base& ta) {
task_arena_impl::register_parallel_block(ta);
}

void __TBB_EXPORTED_FUNC unregister_parallel_block(d1::task_arena_base& ta, bool one_time_fast_leave) {
task_arena_impl::unregister_parallel_block(ta, one_time_fast_leave);
}

void task_arena_impl::initialize(d1::task_arena_base& ta) {
// Enforce global market initialization to properly initialize soft limit
(void)governor::get_thread_data();
@@ -568,7 +607,11 @@ void task_arena_impl::initialize(d1::task_arena_base& ta) {
__TBB_ASSERT(ta.my_arena.load(std::memory_order_relaxed) == nullptr, "Arena already initialized");
unsigned priority_level = arena_priority_level(ta.my_priority);
threading_control* thr_control = threading_control::register_public_reference();
arena& a = arena::create(thr_control, unsigned(ta.my_max_concurrency), ta.my_num_reserved_slots, priority_level, arena_constraints);
arena& a = arena::create(thr_control, unsigned(ta.my_max_concurrency), ta.my_num_reserved_slots, priority_level, arena_constraints
#if __TBB_PREVIEW_PARALLEL_BLOCK
, ta.my_workers_leave
#endif
);

ta.my_arena.store(&a, std::memory_order_release);
#if __TBB_CPUBIND_PRESENT
@@ -875,6 +918,16 @@ int task_arena_impl::max_concurrency(const d1::task_arena_base *ta) {
return int(governor::default_num_threads());
}

#if __TBB_PREVIEW_PARALLEL_BLOCK
void task_arena_impl::register_parallel_block(d1::task_arena_base& ta) {
ta.my_arena.load(std::memory_order_relaxed)->my_thread_leave.register_parallel_block();
}

void task_arena_impl::unregister_parallel_block(d1::task_arena_base& ta, bool one_time_fast_leave) {
ta.my_arena.load(std::memory_order_relaxed)->my_thread_leave.unregister_parallel_block(one_time_fast_leave);
}
#endif

void isolate_within_arena(d1::delegate_base& d, std::intptr_t isolation) {
// TODO: Decide what to do if the scheduler is not initialized. Is there a use case for it?
thread_data* tls = governor::get_thread_data();
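The my_thread_leave member used above is defined elsewhere in the PR. Purely for orientation, a hypothetical sketch of the semantics implied by the four calls that appear in this diff (set_initial_state, register_parallel_block, unregister_parallel_block, restore_state_if_needed); none of these internals are taken from the PR:

#define TBB_PREVIEW_PARALLEL_BLOCK 1
#include <oneapi/tbb/task_arena.h>
#include <atomic>

// Hypothetical sketch, not the PR's actual type: a counter of active parallel
// blocks plus a sticky "leave fast once" flag that restore_state_if_needed()
// clears when new worker demand arrives.
class thread_leave_sketch {
    std::atomic<int>  my_active_blocks{0};
    std::atomic<bool> my_fast_leave_once{false};
    tbb::task_arena::workers_leave my_initial_state{tbb::task_arena::workers_leave::automatic};
public:
    void set_initial_state(tbb::task_arena::workers_leave wl) { my_initial_state = wl; }
    void register_parallel_block() {
        my_active_blocks.fetch_add(1, std::memory_order_relaxed);
    }
    void unregister_parallel_block(bool one_time_fast_leave) {
        // When the last active block ends, optionally let workers leave
        // promptly this one time instead of following the initial policy.
        if (my_active_blocks.fetch_sub(1, std::memory_order_relaxed) == 1 && one_time_fast_leave)
            my_fast_leave_once.store(true, std::memory_order_relaxed);
    }
    void restore_state_if_needed() {
        // New demand for workers arrived: revert to the arena's initial policy.
        my_fast_leave_once.store(false, std::memory_order_relaxed);
    }
};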