Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Equalize arena slots #1436

Draft
wants to merge 3 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 43 additions & 37 deletions src/tbb/arena.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -149,34 +149,27 @@ void arena::on_thread_leaving(unsigned ref_param) {
}
}

std::size_t arena::occupy_free_slot_in_range( thread_data& tls, std::size_t lower, std::size_t upper ) {
if ( lower >= upper ) return out_of_arena;
std::size_t arena::occupy_free_slot(thread_data& tls) {
// Start search for an empty slot from the one we occupied the last time
std::size_t index = tls.my_arena_index;
if ( index < lower || index >= upper ) index = tls.my_random.get() % (upper - lower) + lower;
__TBB_ASSERT( index >= lower && index < upper, nullptr);
std::size_t locked_slot = out_of_arena;
if ( index >= my_num_slots ) index = tls.my_random.get() % my_num_slots;
// Find a free slot
for ( std::size_t i = index; i < upper; ++i )
if (my_slots[i].try_occupy()) return i;
for ( std::size_t i = lower; i < index; ++i )
if (my_slots[i].try_occupy()) return i;
return out_of_arena;
}

template <bool as_worker>
std::size_t arena::occupy_free_slot(thread_data& tls) {
// Firstly, external threads try to occupy reserved slots
std::size_t index = as_worker ? out_of_arena : occupy_free_slot_in_range( tls, 0, my_num_reserved_slots );
if ( index == out_of_arena ) {
// Secondly, all threads try to occupy all non-reserved slots
index = occupy_free_slot_in_range(tls, my_num_reserved_slots, my_num_slots );
// Likely this arena is already saturated
if ( index == out_of_arena )
return out_of_arena;
}
for ( std::size_t i = index; i < my_num_slots; ++i )
if (my_slots[i].try_occupy()) {
locked_slot = i;
break;
}
pavelkumbrasev marked this conversation as resolved.
Show resolved Hide resolved
if ( locked_slot == out_of_arena )
for ( std::size_t i = 0; i < index; ++i )
if (my_slots[i].try_occupy()) {
locked_slot = i;
break;
}
pavelkumbrasev marked this conversation as resolved.
Show resolved Hide resolved

atomic_update( my_limit, (unsigned)(index + 1), std::less<unsigned>() );
return index;
if ( locked_slot != out_of_arena )
atomic_update( my_limit, (unsigned)(locked_slot + 1), std::less<unsigned>() );
pavelkumbrasev marked this conversation as resolved.
Show resolved Hide resolved
return locked_slot;
}

std::uintptr_t arena::calculate_stealing_threshold() {
Expand All @@ -189,16 +182,16 @@ void arena::process(thread_data& tls) {
__TBB_ASSERT( is_alive(my_guard), nullptr);
__TBB_ASSERT( my_num_slots >= 1, nullptr);

std::size_t index = occupy_free_slot</*as_worker*/true>(tls);
std::size_t index = occupy_free_slot(tls);
if (index == out_of_arena) {
on_thread_leaving(ref_worker);
return;
}

my_tc_client.get_pm_client()->register_thread();

__TBB_ASSERT( index >= my_num_reserved_slots, "Workers cannot occupy reserved slots" );
tls.attach_arena(*this, index);
// remember that we occupied a slot from workers' quota
tls.attach_arena(*this, index, /*is_worker_slot*/ true);
// worker thread enters the dispatch loop to look for a work
tls.my_inbox.set_is_idle(true);
if (tls.my_arena_slot->is_task_pool_published()) {
Expand Down Expand Up @@ -396,7 +389,14 @@ bool arena::is_top_priority() const {

bool arena::try_join() {
if (is_joinable()) {
my_references += arena::ref_worker;
// check quota for number of workers in arena
unsigned curr = my_references.fetch_add(arena::ref_worker);
curr += arena::ref_worker;
pavelkumbrasev marked this conversation as resolved.
Show resolved Hide resolved
unsigned workers = curr >> arena::ref_external_bits;
if (workers > my_num_slots - my_num_reserved_slots) {
my_references -= arena::ref_worker;
return false;
}
return true;
}
return false;
Expand Down Expand Up @@ -626,24 +626,26 @@ void task_arena_impl::enqueue(d1::task& t, d1::task_group_context* c, d1::task_a

class nested_arena_context : no_copy {
public:
nested_arena_context(thread_data& td, arena& nested_arena, std::size_t slot_index)
nested_arena_context(thread_data& td, arena& nested_arena, std::size_t slot_index, bool is_worker_slot)
: m_orig_execute_data_ext(td.my_task_dispatcher->m_execute_data_ext)
{
if (td.my_arena != &nested_arena) {
m_orig_arena = td.my_arena;
m_orig_slot_index = td.my_arena_index;
__TBB_ASSERT(td.my_worker_slot_type != thread_data::slot_type::undefined, "Invalid slot type");
m_orig_is_worker_slot = td.my_worker_slot_type == thread_data::slot_type::worker;
m_orig_last_observer = td.my_last_observer;

td.detach_task_dispatcher();
td.attach_arena(nested_arena, slot_index);
td.attach_arena(nested_arena, slot_index, is_worker_slot);
if (td.my_inbox.is_idle_state(true))
td.my_inbox.set_is_idle(false);
task_dispatcher& task_disp = td.my_arena_slot->default_task_dispatcher();
td.enter_task_dispatcher(task_disp, m_orig_execute_data_ext.task_disp->m_stealing_threshold);

// If the calling thread occupies the slots out of external thread reserve we need to notify the
// market that this arena requires one worker less.
if (td.my_arena_index >= td.my_arena->my_num_reserved_slots) {
if (td.my_worker_slot_type == thread_data::slot_type::worker) {
td.my_arena->request_workers(/* mandatory_delta = */ 0, /* workers_delta = */ -1);
}

Expand Down Expand Up @@ -679,15 +681,15 @@ class nested_arena_context : no_copy {

// Notify the market that this thread releasing a one slot
// that can be used by a worker thread.
if (td.my_arena_index >= td.my_arena->my_num_reserved_slots) {
if (td.my_worker_slot_type == thread_data::slot_type::worker) {
td.my_arena->request_workers(/* mandatory_delta = */ 0, /* workers_delta = */ 1);
}

td.leave_task_dispatcher();
td.my_arena_slot->release();
td.my_arena->my_exit_monitors.notify_one(); // do not relax!

td.attach_arena(*m_orig_arena, m_orig_slot_index);
td.attach_arena(*m_orig_arena, m_orig_slot_index, m_orig_is_worker_slot);
td.attach_task_dispatcher(*m_orig_execute_data_ext.task_disp);
__TBB_ASSERT(td.my_inbox.is_idle_state(false), nullptr);
}
Expand All @@ -700,6 +702,7 @@ class nested_arena_context : no_copy {
observer_proxy* m_orig_last_observer{ nullptr };
task_dispatcher* m_task_dispatcher{ nullptr };
unsigned m_orig_slot_index{};
bool m_orig_is_worker_slot{};
bool m_orig_fifo_tasks_allowed{};
bool m_orig_critical_task_allowed{};
};
Expand Down Expand Up @@ -757,8 +760,11 @@ void task_arena_impl::execute(d1::task_arena_base& ta, d1::delegate_base& d) {

bool same_arena = td->my_arena == a;
std::size_t index1 = td->my_arena_index;
__TBB_ASSERT(td->my_worker_slot_type != thread_data::slot_type::undefined, "Invalid slot type");
bool is_worker_slot = td->my_worker_slot_type == thread_data::slot_type::worker;
if (!same_arena) {
index1 = a->occupy_free_slot</*as_worker */false>(*td);
index1 = a->occupy_free_slot(*td);
is_worker_slot = false;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why we unset the flag?
I did not get the idea: how we distinguish if external thread occupied the workers slot.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point, here can be a problem place. Logic behind is_worker_slot = false; is that the calling thread got index1 slot as a master, so not consuming from quota of workers slots. Is it the invalid reasoning?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What if there are more external threads than reserved slots then these external threads will take slots from workers quota.
Am I missing something?

if (index1 == arena::out_of_arena) {
concurrent_monitor::thread_context waiter((std::uintptr_t)&d);
d1::wait_context wo(1);
Expand All @@ -774,10 +780,10 @@ void task_arena_impl::execute(d1::task_arena_base& ta, d1::delegate_base& d) {
a->my_exit_monitors.cancel_wait(waiter);
break;
}
index2 = a->occupy_free_slot</*as_worker*/false>(*td);
index2 = a->occupy_free_slot(*td);
if (index2 != arena::out_of_arena) {
a->my_exit_monitors.cancel_wait(waiter);
nested_arena_context scope(*td, *a, index2 );
nested_arena_context scope(*td, *a, index2, /*is_worker_slot=*/false);
r1::wait(wo, exec_context);
__TBB_ASSERT(!exec_context.my_exception.load(std::memory_order_relaxed), nullptr); // exception can be thrown above, not deferred
break;
Expand All @@ -802,7 +808,7 @@ void task_arena_impl::execute(d1::task_arena_base& ta, d1::delegate_base& d) {

context_guard_helper</*report_tasks=*/false> context_guard;
context_guard.set_ctx(a->my_default_ctx);
nested_arena_context scope(*td, *a, index1);
nested_arena_context scope(*td, *a, index1, is_worker_slot);
#if _WIN64
try {
#endif
Expand Down
3 changes: 0 additions & 3 deletions src/tbb/arena.h
Original file line number Diff line number Diff line change
Expand Up @@ -366,10 +366,7 @@ class arena: public padded<arena_base>

static const std::size_t out_of_arena = ~size_t(0);
//! Tries to occupy a slot in the arena. On success, returns the slot index; if no slot is available, returns out_of_arena.
template <bool as_worker>
std::size_t occupy_free_slot(thread_data&);
//! Tries to occupy a slot in the specified range.
std::size_t occupy_free_slot_in_range(thread_data& tls, std::size_t lower, std::size_t upper);

std::uintptr_t calculate_stealing_threshold();

Expand Down
2 changes: 1 addition & 1 deletion src/tbb/governor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ void governor::init_external_thread() {
arena& a = arena::create(thr_control, num_slots, num_reserved_slots, arena_priority_level);
// External thread always occupies the first slot
thread_data& td = *new(cache_aligned_allocate(sizeof(thread_data))) thread_data(0, false);
td.attach_arena(a, /*slot index*/ 0);
td.attach_arena(a, /*slot index*/ 0, /*is_worker_slot*/ false);
__TBB_ASSERT(td.my_inbox.is_idle_state(false), nullptr);

stack_size = a.my_threading_control->worker_stack_size();
Expand Down
15 changes: 13 additions & 2 deletions src/tbb/thread_data.h
Original file line number Diff line number Diff line change
Expand Up @@ -98,9 +98,16 @@ class thread_data : public ::rml::job
, public d1::intrusive_list_node
, no_copy {
public:
enum class slot_type {
worker,
master,
undefined
};

thread_data(unsigned short index, bool is_worker)
: my_arena_index{ index }
, my_is_worker{ is_worker }
, my_worker_slot_type{ slot_type::undefined }
, my_task_dispatcher{ nullptr }
, my_arena{ nullptr }
, my_last_client{ nullptr }
Expand Down Expand Up @@ -131,7 +138,7 @@ class thread_data : public ::rml::job
#endif /* __TBB_RESUMABLE_TASKS */
}

void attach_arena(arena& a, std::size_t index);
void attach_arena(arena& a, std::size_t index, bool is_worker_slot);
bool is_attached_to(arena*);
void attach_task_dispatcher(task_dispatcher&);
void detach_task_dispatcher();
Expand All @@ -145,6 +152,9 @@ class thread_data : public ::rml::job
//! Indicates if the thread is created by RML
const bool my_is_worker;

//! Does the slot occupied in the arena belong to the workers' quota?
slot_type my_worker_slot_type;
pavelkumbrasev marked this conversation as resolved.
Show resolved Hide resolved

//! The current task dispatcher
task_dispatcher* my_task_dispatcher;

Expand Down Expand Up @@ -202,9 +212,10 @@ class thread_data : public ::rml::job
d1::task_group_context my_default_context;
};

inline void thread_data::attach_arena(arena& a, std::size_t index) {
inline void thread_data::attach_arena(arena& a, std::size_t index, bool is_worker_slot) {
my_arena = &a;
my_arena_index = static_cast<unsigned short>(index);
my_worker_slot_type = is_worker_slot? slot_type::worker : slot_type::master;
my_arena_slot = a.my_slots + index;
// Read the current slot mail_outbox and attach it to the mail_inbox (remove inbox later maybe)
my_inbox.attach(my_arena->mailbox(index));
Expand Down
32 changes: 20 additions & 12 deletions test/tbb/test_task_arena.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -111,31 +111,37 @@ class ArenaObserver : public tbb::task_scheduler_observer {
int myId; // unique observer/arena id within a test
int myMaxConcurrency; // concurrency of the associated arena
int myNumReservedSlots; // reserved slots in the associated arena
std::atomic<int> myNumActiveWorkers;
void on_scheduler_entry( bool is_worker ) override {
int current_index = tbb::this_task_arena::current_thread_index();
CHECK(current_index < (myMaxConcurrency > 1 ? myMaxConcurrency : 2));
if (is_worker) {
CHECK(current_index >= myNumReservedSlots);
int currNumActiveWorkers = ++myNumActiveWorkers;
CHECK(currNumActiveWorkers <= myMaxConcurrency - myNumReservedSlots);
}
CHECK_MESSAGE(!old_id.local(), "double call to on_scheduler_entry");
old_id.local() = local_id.local();
CHECK_MESSAGE(old_id.local() != myId, "double entry to the same arena");
local_id.local() = myId;
slot_id.local() = current_index;
}
void on_scheduler_exit( bool /*is_worker*/ ) override {
void on_scheduler_exit( bool is_worker ) override {
CHECK_MESSAGE(local_id.local() == myId, "nesting of arenas is broken");
CHECK(slot_id.local() == tbb::this_task_arena::current_thread_index());
slot_id.local() = -2;
local_id.local() = old_id.local();
old_id.local() = 0;
if (is_worker) {
--myNumActiveWorkers;
}
}
public:
ArenaObserver(tbb::task_arena &a, int maxConcurrency, int numReservedSlots, int id)
: tbb::task_scheduler_observer(a)
, myId(id)
, myMaxConcurrency(maxConcurrency)
, myNumReservedSlots(numReservedSlots) {
, myNumReservedSlots(numReservedSlots)
, myNumActiveWorkers(0) {
CHECK(myId);
observe(true);
}
Expand Down Expand Up @@ -453,12 +459,8 @@ class TestArenaConcurrencyBody : utils::NoAssign {
int max_arena_concurrency = tbb::this_task_arena::max_concurrency();
REQUIRE( max_arena_concurrency == my_max_concurrency );
if ( my_worker_barrier ) {
if ( local_id.local() == 1 ) {
// External thread in a reserved slot
CHECK_MESSAGE( idx < my_reserved_slots, "External threads are supposed to use only reserved slots in this test" );
} else {
if ( local_id.local() != 1 ) {
// Worker thread
CHECK( idx >= my_reserved_slots );
my_worker_barrier->wait();
}
} else if ( my_barrier )
Expand Down Expand Up @@ -1545,20 +1547,26 @@ class simple_observer : public tbb::task_scheduler_observer {
int my_idx;
int myMaxConcurrency; // concurrency of the associated arena
int myNumReservedSlots; // reserved slots in the associated arena
std::atomic<int> myNumActiveWorkers;
void on_scheduler_entry( bool is_worker ) override {
int current_index = tbb::this_task_arena::current_thread_index();
CHECK(current_index < (myMaxConcurrency > 1 ? myMaxConcurrency : 2));
if (is_worker) {
CHECK(current_index >= myNumReservedSlots);
int currNumActiveWorkers = ++myNumActiveWorkers;
CHECK(currNumActiveWorkers <= myMaxConcurrency - myNumReservedSlots);
}
}
void on_scheduler_exit( bool is_worker ) override {
if (is_worker) {
--myNumActiveWorkers;
}
}
void on_scheduler_exit( bool /*is_worker*/ ) override
{}
public:
simple_observer(tbb::task_arena &a, int maxConcurrency, int numReservedSlots)
: tbb::task_scheduler_observer(a), my_idx(idx_counter++)
, myMaxConcurrency(maxConcurrency)
, myNumReservedSlots(numReservedSlots) {
, myNumReservedSlots(numReservedSlots)
, myNumActiveWorkers(0) {
observe(true);
}

Expand Down
Loading