Skip to content

Commit

Permalink
Equalize arena slots
Browse files Browse the repository at this point in the history
Previously arena::my_references served only as a performance optimization;
the quota on the number of workers was enforced by the
occupy_free_slot</*as_worker*/true>() logic. With this patch,
arena::my_references is used to enforce the reserved_for_masters
limit.
  • Loading branch information
Alexandr-Konovalov committed Jul 1, 2024
1 parent 04fe912 commit c744fcb
Show file tree
Hide file tree
Showing 5 changed files with 77 additions and 55 deletions.
80 changes: 43 additions & 37 deletions src/tbb/arena.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -149,34 +149,27 @@ void arena::on_thread_leaving(unsigned ref_param) {
}
}

std::size_t arena::occupy_free_slot_in_range( thread_data& tls, std::size_t lower, std::size_t upper ) {
if ( lower >= upper ) return out_of_arena;
std::size_t arena::occupy_free_slot(thread_data& tls) {
// Start search for an empty slot from the one we occupied the last time
std::size_t index = tls.my_arena_index;
if ( index < lower || index >= upper ) index = tls.my_random.get() % (upper - lower) + lower;
__TBB_ASSERT( index >= lower && index < upper, nullptr);
std::size_t locked_slot = out_of_arena;
if ( index >= my_num_slots ) index = tls.my_random.get() % my_num_slots;
// Find a free slot
for ( std::size_t i = index; i < upper; ++i )
if (my_slots[i].try_occupy()) return i;
for ( std::size_t i = lower; i < index; ++i )
if (my_slots[i].try_occupy()) return i;
return out_of_arena;
}

template <bool as_worker>
std::size_t arena::occupy_free_slot(thread_data& tls) {
// Firstly, external threads try to occupy reserved slots
std::size_t index = as_worker ? out_of_arena : occupy_free_slot_in_range( tls, 0, my_num_reserved_slots );
if ( index == out_of_arena ) {
// Secondly, all threads try to occupy all non-reserved slots
index = occupy_free_slot_in_range(tls, my_num_reserved_slots, my_num_slots );
// Likely this arena is already saturated
if ( index == out_of_arena )
return out_of_arena;
}
for ( std::size_t i = index; i < my_num_slots; ++i )
if (my_slots[i].try_occupy()) {
locked_slot = i;
break;
}
if ( locked_slot == out_of_arena )
for ( std::size_t i = 0; i < index; ++i )
if (my_slots[i].try_occupy()) {
locked_slot = i;
break;
}

atomic_update( my_limit, (unsigned)(index + 1), std::less<unsigned>() );
return index;
if ( locked_slot != out_of_arena )
atomic_update( my_limit, (unsigned)(locked_slot + 1), std::less<unsigned>() );
return locked_slot;
}

std::uintptr_t arena::calculate_stealing_threshold() {
Expand All @@ -189,16 +182,16 @@ void arena::process(thread_data& tls) {
__TBB_ASSERT( is_alive(my_guard), nullptr);
__TBB_ASSERT( my_num_slots >= 1, nullptr);

std::size_t index = occupy_free_slot</*as_worker*/true>(tls);
std::size_t index = occupy_free_slot(tls);
if (index == out_of_arena) {
on_thread_leaving(ref_worker);
return;
}

my_tc_client.get_pm_client()->register_thread();

__TBB_ASSERT( index >= my_num_reserved_slots, "Workers cannot occupy reserved slots" );
tls.attach_arena(*this, index);
    // remember that we occupied a slot from the workers' quota
tls.attach_arena(*this, index, /*is_worker_slot*/ true);
    // worker thread enters the dispatch loop to look for work
tls.my_inbox.set_is_idle(true);
if (tls.my_arena_slot->is_task_pool_published()) {
Expand Down Expand Up @@ -396,7 +389,14 @@ bool arena::is_top_priority() const {

bool arena::try_join() {
if (is_joinable()) {
my_references += arena::ref_worker;
// check quota for number of workers in arena
unsigned curr = my_references.fetch_add(arena::ref_worker);
curr += arena::ref_worker;
unsigned workers = curr >> arena::ref_external_bits;
if (workers > my_num_slots - my_num_reserved_slots) {
my_references -= arena::ref_worker;
return false;
}
return true;
}
return false;
Expand Down Expand Up @@ -626,24 +626,26 @@ void task_arena_impl::enqueue(d1::task& t, d1::task_group_context* c, d1::task_a

class nested_arena_context : no_copy {
public:
nested_arena_context(thread_data& td, arena& nested_arena, std::size_t slot_index)
nested_arena_context(thread_data& td, arena& nested_arena, std::size_t slot_index, bool is_worker_slot)
: m_orig_execute_data_ext(td.my_task_dispatcher->m_execute_data_ext)
{
if (td.my_arena != &nested_arena) {
m_orig_arena = td.my_arena;
m_orig_slot_index = td.my_arena_index;
__TBB_ASSERT(td.my_worker_slot_type != thread_data::slot_type::undefined, "Invalid slot type");
m_orig_is_worker_slot = td.my_worker_slot_type == thread_data::slot_type::worker;
m_orig_last_observer = td.my_last_observer;

td.detach_task_dispatcher();
td.attach_arena(nested_arena, slot_index);
td.attach_arena(nested_arena, slot_index, is_worker_slot);
if (td.my_inbox.is_idle_state(true))
td.my_inbox.set_is_idle(false);
task_dispatcher& task_disp = td.my_arena_slot->default_task_dispatcher();
td.enter_task_dispatcher(task_disp, m_orig_execute_data_ext.task_disp->m_stealing_threshold);

            // If the calling thread occupies a slot outside the external thread reserve, we need to
            // notify the market that this arena requires one worker less.
if (td.my_arena_index >= td.my_arena->my_num_reserved_slots) {
if (td.my_worker_slot_type == thread_data::slot_type::worker) {
td.my_arena->request_workers(/* mandatory_delta = */ 0, /* workers_delta = */ -1);
}

Expand Down Expand Up @@ -679,15 +681,15 @@ class nested_arena_context : no_copy {

        // Notify the market that this thread is releasing one slot
        // that can be used by a worker thread.
if (td.my_arena_index >= td.my_arena->my_num_reserved_slots) {
if (td.my_worker_slot_type == thread_data::slot_type::worker) {
td.my_arena->request_workers(/* mandatory_delta = */ 0, /* workers_delta = */ 1);
}

td.leave_task_dispatcher();
td.my_arena_slot->release();
td.my_arena->my_exit_monitors.notify_one(); // do not relax!

td.attach_arena(*m_orig_arena, m_orig_slot_index);
td.attach_arena(*m_orig_arena, m_orig_slot_index, m_orig_is_worker_slot);
td.attach_task_dispatcher(*m_orig_execute_data_ext.task_disp);
__TBB_ASSERT(td.my_inbox.is_idle_state(false), nullptr);
}
Expand All @@ -700,6 +702,7 @@ class nested_arena_context : no_copy {
observer_proxy* m_orig_last_observer{ nullptr };
task_dispatcher* m_task_dispatcher{ nullptr };
unsigned m_orig_slot_index{};
bool m_orig_is_worker_slot{};
bool m_orig_fifo_tasks_allowed{};
bool m_orig_critical_task_allowed{};
};
Expand Down Expand Up @@ -757,8 +760,11 @@ void task_arena_impl::execute(d1::task_arena_base& ta, d1::delegate_base& d) {

bool same_arena = td->my_arena == a;
std::size_t index1 = td->my_arena_index;
__TBB_ASSERT(td->my_worker_slot_type != thread_data::slot_type::undefined, "Invalid slot type");
bool is_worker_slot = td->my_worker_slot_type == thread_data::slot_type::worker;
if (!same_arena) {
index1 = a->occupy_free_slot</*as_worker */false>(*td);
index1 = a->occupy_free_slot(*td);
is_worker_slot = false;
if (index1 == arena::out_of_arena) {
concurrent_monitor::thread_context waiter((std::uintptr_t)&d);
d1::wait_context wo(1);
Expand All @@ -774,10 +780,10 @@ void task_arena_impl::execute(d1::task_arena_base& ta, d1::delegate_base& d) {
a->my_exit_monitors.cancel_wait(waiter);
break;
}
index2 = a->occupy_free_slot</*as_worker*/false>(*td);
index2 = a->occupy_free_slot(*td);
if (index2 != arena::out_of_arena) {
a->my_exit_monitors.cancel_wait(waiter);
nested_arena_context scope(*td, *a, index2 );
nested_arena_context scope(*td, *a, index2, /*is_worker_slot=*/false);
r1::wait(wo, exec_context);
__TBB_ASSERT(!exec_context.my_exception.load(std::memory_order_relaxed), nullptr); // exception can be thrown above, not deferred
break;
Expand All @@ -802,7 +808,7 @@ void task_arena_impl::execute(d1::task_arena_base& ta, d1::delegate_base& d) {

context_guard_helper</*report_tasks=*/false> context_guard;
context_guard.set_ctx(a->my_default_ctx);
nested_arena_context scope(*td, *a, index1);
nested_arena_context scope(*td, *a, index1, is_worker_slot);
#if _WIN64
try {
#endif
Expand Down
3 changes: 0 additions & 3 deletions src/tbb/arena.h
Original file line number Diff line number Diff line change
Expand Up @@ -366,10 +366,7 @@ class arena: public padded<arena_base>

static const std::size_t out_of_arena = ~size_t(0);
//! Tries to occupy a slot in the arena. On success, returns the slot index; if no slot is available, returns out_of_arena.
template <bool as_worker>
std::size_t occupy_free_slot(thread_data&);
//! Tries to occupy a slot in the specified range.
std::size_t occupy_free_slot_in_range(thread_data& tls, std::size_t lower, std::size_t upper);

std::uintptr_t calculate_stealing_threshold();

Expand Down
2 changes: 1 addition & 1 deletion src/tbb/governor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ void governor::init_external_thread() {
arena& a = arena::create(thr_control, num_slots, num_reserved_slots, arena_priority_level);
// External thread always occupies the first slot
thread_data& td = *new(cache_aligned_allocate(sizeof(thread_data))) thread_data(0, false);
td.attach_arena(a, /*slot index*/ 0);
td.attach_arena(a, /*slot index*/ 0, /*is_worker_slot*/ false);
__TBB_ASSERT(td.my_inbox.is_idle_state(false), nullptr);

stack_size = a.my_threading_control->worker_stack_size();
Expand Down
15 changes: 13 additions & 2 deletions src/tbb/thread_data.h
Original file line number Diff line number Diff line change
Expand Up @@ -98,9 +98,16 @@ class thread_data : public ::rml::job
, public d1::intrusive_list_node
, no_copy {
public:
enum class slot_type {
worker,
master,
undefined
};

thread_data(unsigned short index, bool is_worker)
: my_arena_index{ index }
, my_is_worker{ is_worker }
, my_worker_slot_type{ slot_type::undefined }
, my_task_dispatcher{ nullptr }
, my_arena{ nullptr }
, my_last_client{ nullptr }
Expand Down Expand Up @@ -131,7 +138,7 @@ class thread_data : public ::rml::job
#endif /* __TBB_RESUMABLE_TASKS */
}

void attach_arena(arena& a, std::size_t index);
void attach_arena(arena& a, std::size_t index, bool is_worker_slot);
bool is_attached_to(arena*);
void attach_task_dispatcher(task_dispatcher&);
void detach_task_dispatcher();
Expand All @@ -145,6 +152,9 @@ class thread_data : public ::rml::job
//! Indicates if the thread is created by RML
const bool my_is_worker;

    //! Does the slot occupied in the arena belong to the workers' quota?
slot_type my_worker_slot_type;

    //! The current task dispatcher
task_dispatcher* my_task_dispatcher;

Expand Down Expand Up @@ -202,9 +212,10 @@ class thread_data : public ::rml::job
d1::task_group_context my_default_context;
};

inline void thread_data::attach_arena(arena& a, std::size_t index) {
inline void thread_data::attach_arena(arena& a, std::size_t index, bool is_worker_slot) {
my_arena = &a;
my_arena_index = static_cast<unsigned short>(index);
my_worker_slot_type = is_worker_slot? slot_type::worker : slot_type::master;
my_arena_slot = a.my_slots + index;
// Read the current slot mail_outbox and attach it to the mail_inbox (remove inbox later maybe)
my_inbox.attach(my_arena->mailbox(index));
Expand Down
32 changes: 20 additions & 12 deletions test/tbb/test_task_arena.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -111,31 +111,37 @@ class ArenaObserver : public tbb::task_scheduler_observer {
int myId; // unique observer/arena id within a test
int myMaxConcurrency; // concurrency of the associated arena
int myNumReservedSlots; // reserved slots in the associated arena
std::atomic<int> myNumActiveWorkers;
void on_scheduler_entry( bool is_worker ) override {
int current_index = tbb::this_task_arena::current_thread_index();
CHECK(current_index < (myMaxConcurrency > 1 ? myMaxConcurrency : 2));
if (is_worker) {
CHECK(current_index >= myNumReservedSlots);
int currNumActiveWorkers = ++myNumActiveWorkers;
CHECK(currNumActiveWorkers <= myMaxConcurrency - myNumReservedSlots);
}
CHECK_MESSAGE(!old_id.local(), "double call to on_scheduler_entry");
old_id.local() = local_id.local();
CHECK_MESSAGE(old_id.local() != myId, "double entry to the same arena");
local_id.local() = myId;
slot_id.local() = current_index;
}
void on_scheduler_exit( bool /*is_worker*/ ) override {
void on_scheduler_exit( bool is_worker ) override {
CHECK_MESSAGE(local_id.local() == myId, "nesting of arenas is broken");
CHECK(slot_id.local() == tbb::this_task_arena::current_thread_index());
slot_id.local() = -2;
local_id.local() = old_id.local();
old_id.local() = 0;
if (is_worker) {
--myNumActiveWorkers;
}
}
public:
ArenaObserver(tbb::task_arena &a, int maxConcurrency, int numReservedSlots, int id)
: tbb::task_scheduler_observer(a)
, myId(id)
, myMaxConcurrency(maxConcurrency)
, myNumReservedSlots(numReservedSlots) {
, myNumReservedSlots(numReservedSlots)
, myNumActiveWorkers(0) {
CHECK(myId);
observe(true);
}
Expand Down Expand Up @@ -453,12 +459,8 @@ class TestArenaConcurrencyBody : utils::NoAssign {
int max_arena_concurrency = tbb::this_task_arena::max_concurrency();
REQUIRE( max_arena_concurrency == my_max_concurrency );
if ( my_worker_barrier ) {
if ( local_id.local() == 1 ) {
// External thread in a reserved slot
CHECK_MESSAGE( idx < my_reserved_slots, "External threads are supposed to use only reserved slots in this test" );
} else {
if ( local_id.local() != 1 ) {
// Worker thread
CHECK( idx >= my_reserved_slots );
my_worker_barrier->wait();
}
} else if ( my_barrier )
Expand Down Expand Up @@ -1545,20 +1547,26 @@ class simple_observer : public tbb::task_scheduler_observer {
int my_idx;
int myMaxConcurrency; // concurrency of the associated arena
int myNumReservedSlots; // reserved slots in the associated arena
std::atomic<int> myNumActiveWorkers;
void on_scheduler_entry( bool is_worker ) override {
int current_index = tbb::this_task_arena::current_thread_index();
CHECK(current_index < (myMaxConcurrency > 1 ? myMaxConcurrency : 2));
if (is_worker) {
CHECK(current_index >= myNumReservedSlots);
int currNumActiveWorkers = ++myNumActiveWorkers;
CHECK(currNumActiveWorkers <= myMaxConcurrency - myNumReservedSlots);
}
}
void on_scheduler_exit( bool is_worker ) override {
if (is_worker) {
--myNumActiveWorkers;
}
}
void on_scheduler_exit( bool /*is_worker*/ ) override
{}
public:
simple_observer(tbb::task_arena &a, int maxConcurrency, int numReservedSlots)
: tbb::task_scheduler_observer(a), my_idx(idx_counter++)
, myMaxConcurrency(maxConcurrency)
, myNumReservedSlots(numReservedSlots) {
, myNumReservedSlots(numReservedSlots)
, myNumActiveWorkers(0) {
observe(true);
}

Expand Down

0 comments on commit c744fcb

Please sign in to comment.