Skip to content

Commit

Permalink
LV2: allocation-free synchronization syscalls
Browse files Browse the repository at this point in the history
* Show waiters' ID in kernel explorer.
* Remove deque dependency from sys_sync.h
  • Loading branch information
elad335 committed Jul 25, 2022
1 parent 5685e4c commit c83eaf3
Show file tree
Hide file tree
Showing 26 changed files with 540 additions and 271 deletions.
18 changes: 18 additions & 0 deletions rpcs3/Emu/CPU/CPUThread.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -946,6 +946,24 @@ u32* cpu_thread::get_pc2()
return nullptr;
}

cpu_thread* cpu_thread::get_next_cpu()
{
switch (id_type())
{
case 1:
{
return static_cast<ppu_thread*>(this)->next_cpu;
}
case 2:
{
return static_cast<spu_thread*>(this)->next_cpu;
}
default: break;
}

return nullptr;
}

std::shared_ptr<CPUDisAsm> make_disasm(const cpu_thread* cpu);

void cpu_thread::dump_all(std::string& ret) const
Expand Down
1 change: 1 addition & 0 deletions rpcs3/Emu/CPU/CPUThread.h
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ class cpu_thread

u32 get_pc() const;
u32* get_pc2(); // Last PC before stepping for the debugger (may be null)
cpu_thread* get_next_cpu(); // Access next_cpu member if the is one

void notify();
cpu_thread& operator=(thread_state);
Expand Down
4 changes: 4 additions & 0 deletions rpcs3/Emu/Cell/PPUThread.h
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,10 @@ class ppu_thread : public cpu_thread
std::shared_ptr<utils::serial> optional_savestate_state;
bool interrupt_thread_executing = false;

atomic_t<ppu_thread*> next_cpu{}; // LV2 sleep queues' node link
atomic_t<ppu_thread*> next_ppu{}; // LV2 PPU running queue's node link
bool ack_suspend = false;

be_t<u64>* get_stack_arg(s32 i, u64 align = alignof(u64));
void exec_task();
void fast_call(u32 addr, u64 rtoc);
Expand Down
2 changes: 1 addition & 1 deletion rpcs3/Emu/Cell/SPUThread.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4832,7 +4832,7 @@ bool spu_thread::stop_and_signal(u32 code)

if (queue->events.empty())
{
queue->sq.emplace_back(this);
lv2_obj::emplace(queue->sq, this);
group->run_state = SPU_THREAD_GROUP_STATUS_WAITING;
group->waiter_spu_index = index;

Expand Down
2 changes: 2 additions & 0 deletions rpcs3/Emu/Cell/SPUThread.h
Original file line number Diff line number Diff line change
Expand Up @@ -815,6 +815,8 @@ class spu_thread : public cpu_thread
const u32 option; // sys_spu_thread_initialize option
const u32 lv2_id; // The actual id that is used by syscalls

atomic_t<spu_thread*> next_cpu{}; // LV2 thread queues' node link

// Thread name
atomic_ptr<std::string> spu_tname;

Expand Down
148 changes: 107 additions & 41 deletions rpcs3/Emu/Cell/lv2/lv2.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
#include "sys_crypto_engine.h"

#include <optional>
#include <deque>

extern std::string ppu_get_syscall_name(u64 code);

Expand Down Expand Up @@ -1187,14 +1188,18 @@ std::string ppu_get_syscall_name(u64 code)
}

DECLARE(lv2_obj::g_mutex);
DECLARE(lv2_obj::g_ppu);
DECLARE(lv2_obj::g_pending);
DECLARE(lv2_obj::g_waiting);
DECLARE(lv2_obj::g_to_sleep);
DECLARE(lv2_obj::g_ppu){};
DECLARE(lv2_obj::g_pending){};

thread_local DECLARE(lv2_obj::g_to_notify){};
thread_local DECLARE(lv2_obj::g_to_awake);

// Scheduler queue for timeouts (wait until -> thread)
static std::deque<std::pair<u64, class cpu_thread*>> g_waiting;

// Threads which must call lv2_obj::sleep before the scheduler starts
static std::deque<class cpu_thread*> g_to_sleep;

namespace cpu_counter
{
void remove(cpu_thread*) noexcept;
Expand Down Expand Up @@ -1260,7 +1265,7 @@ void lv2_obj::sleep_unlocked(cpu_thread& thread, u64 timeout, bool notify_later)

if (auto ppu = thread.try_get<ppu_thread>())
{
ppu_log.trace("sleep() - waiting (%zu)", g_pending.size());
ppu_log.trace("sleep() - waiting (%zu)", g_pending);

const auto [_, ok] = ppu->state.fetch_op([&](bs_t<cpu_flag>& val)
{
Expand All @@ -1280,10 +1285,11 @@ void lv2_obj::sleep_unlocked(cpu_thread& thread, u64 timeout, bool notify_later)
}

// Find and remove the thread
if (!unqueue(g_ppu, ppu))
if (!unqueue(g_ppu, ppu, &ppu_thread::next_ppu))
{
if (unqueue(g_to_sleep, ppu))
if (auto it = std::find(g_to_sleep.begin(), g_to_sleep.end(), ppu); it != g_to_sleep.end())
{
g_to_sleep.erase(it);
ppu->start_time = start_time;
on_to_sleep_update();
}
Expand All @@ -1293,15 +1299,19 @@ void lv2_obj::sleep_unlocked(cpu_thread& thread, u64 timeout, bool notify_later)
return;
}

unqueue(g_pending, ppu);
if (std::exchange(ppu->ack_suspend, false))
{
g_pending--;
}

ppu->raddr = 0; // Clear reservation
ppu->start_time = start_time;
}
else if (auto spu = thread.try_get<spu_thread>())
{
if (unqueue(g_to_sleep, spu))
if (auto it = std::find(g_to_sleep.begin(), g_to_sleep.end(), ppu); it != g_to_sleep.end())
{
g_to_sleep.erase(it);
on_to_sleep_update();
}

Expand Down Expand Up @@ -1344,7 +1354,7 @@ bool lv2_obj::awake_unlocked(cpu_thread* cpu, bool notify_later, s32 prio)
default:
{
// Priority set
if (static_cast<ppu_thread*>(cpu)->prio.exchange(prio) == prio || !unqueue(g_ppu, cpu))
if (static_cast<ppu_thread*>(cpu)->prio.exchange(prio) == prio || !unqueue(g_ppu, static_cast<ppu_thread*>(cpu), &ppu_thread::next_ppu))
{
return true;
}
Expand All @@ -1353,36 +1363,66 @@ bool lv2_obj::awake_unlocked(cpu_thread* cpu, bool notify_later, s32 prio)
}
case yield_cmd:
{
usz i = 0;

// Yield command
for (usz i = 0;; i++)
for (auto ppu_next = &g_ppu;; i++)
{
if (i + 1 >= g_ppu.size())
const auto ppu = +*ppu_next;

if (!ppu)
{
return false;
}

if (const auto ppu = g_ppu[i]; ppu == cpu)
if (ppu == cpu)
{
usz j = i + 1;
auto ppu2_next = &ppu->next_ppu;

if (auto next = +*ppu2_next; !next || next->prio != ppu->prio)
{
return false;
}

for (; j < g_ppu.size(); j++)
for (;; i++)
{
if (g_ppu[j]->prio != ppu->prio)
const auto next = +*ppu2_next;

if (auto next2 = +next->next_ppu; !next2 || next2->prio != ppu->prio)
{
break;
}

ppu2_next = &next->next_ppu;
}

if (j == i + 1)
if (ppu2_next == &ppu->next_ppu)
{
// Empty 'same prio' threads list
return false;
}

auto ppu2 = +*ppu2_next;

// Rotate current thread to the last position of the 'same prio' threads list
std::rotate(g_ppu.begin() + i, g_ppu.begin() + i + 1, g_ppu.begin() + j);
ppu_next->release(ppu2);

// Exchange forward pointers
if (ppu->next_ppu != ppu2)
{
auto ppu2_val = +ppu2->next_ppu;
ppu2->next_ppu.release(+ppu->next_ppu);
ppu->next_ppu.release(ppu2_val);
ppu2_next->release(ppu);
}
else
{
auto ppu2_val = +ppu2->next_ppu;
ppu2->next_ppu.release(ppu);
ppu->next_ppu.release(ppu2_val);
}

if (j <= g_cfg.core.ppu_threads + 0u)
if (i <= g_cfg.core.ppu_threads + 0u)
{
// Threads were rotated, but no context switch was made
return false;
Expand All @@ -1392,7 +1432,10 @@ bool lv2_obj::awake_unlocked(cpu_thread* cpu, bool notify_later, s32 prio)
cpu = nullptr; // Disable current thread enqueing, also enable threads list enqueing
break;
}

ppu_next = &ppu->next_ppu;
}

break;
}
case enqueue_cmd:
Expand All @@ -1403,20 +1446,25 @@ bool lv2_obj::awake_unlocked(cpu_thread* cpu, bool notify_later, s32 prio)

const auto emplace_thread = [](cpu_thread* const cpu)
{
for (auto it = g_ppu.cbegin(), end = g_ppu.cend();; it++)
for (auto it = &g_ppu;;)
{
if (it != end && *it == cpu)
const auto next = +*it;

if (next == cpu)
{
ppu_log.trace("sleep() - suspended (p=%zu)", g_pending.size());
ppu_log.trace("sleep() - suspended (p=%zu)", g_pending);
return false;
}

// Use priority, also preserve FIFO order
if (it == end || (*it)->prio > static_cast<ppu_thread*>(cpu)->prio)
if (!next || next->prio > static_cast<ppu_thread*>(cpu)->prio)
{
g_ppu.insert(it, static_cast<ppu_thread*>(cpu));
it->release(static_cast<ppu_thread*>(cpu));
static_cast<ppu_thread*>(cpu)->next_ppu.release(next);
break;
}

it = &next->next_ppu;
}

// Unregister timeout if necessary
Expand Down Expand Up @@ -1448,20 +1496,28 @@ bool lv2_obj::awake_unlocked(cpu_thread* cpu, bool notify_later, s32 prio)
}

// Remove pending if necessary
if (!g_pending.empty() && ((cpu && cpu == get_current_cpu_thread()) || prio == yield_cmd))
if (g_pending && ((cpu && cpu == get_current_cpu_thread()) || prio == yield_cmd))
{
unqueue(g_pending, get_current_cpu_thread());
if (auto cur = cpu_thread::get_current<ppu_thread>())
{
if (std::exchange(cur->ack_suspend, false))
{
g_pending--;
}
}
}

auto target = +g_ppu;

// Suspend threads if necessary
for (usz i = g_cfg.core.ppu_threads; changed_queue && i < g_ppu.size(); i++)
for (usz i = 0, thread_count = g_cfg.core.ppu_threads; changed_queue && target; target = target->next_ppu, i++)
{
const auto target = g_ppu[i];

if (!target->state.test_and_set(cpu_flag::suspend))
if (i >= thread_count && cpu_flag::suspend - target->state)
{
ppu_log.trace("suspend(): %s", target->id);
g_pending.emplace_back(target);
target->ack_suspend = true;
g_pending++;
ensure(!target->state.test_and_set(cpu_flag::suspend));

if (is_paused(target->state - cpu_flag::suspend))
{
Expand All @@ -1476,23 +1532,23 @@ bool lv2_obj::awake_unlocked(cpu_thread* cpu, bool notify_later, s32 prio)

void lv2_obj::cleanup()
{
g_ppu.clear();
g_pending.clear();
g_waiting.clear();
g_ppu = nullptr;
g_to_sleep.clear();
g_waiting.clear();
g_pending = 0;
}

void lv2_obj::schedule_all(bool notify_later)
{
if (g_pending.empty() && g_to_sleep.empty())
if (!g_pending && g_to_sleep.empty())
{
usz notify_later_idx = notify_later ? 0 : std::size(g_to_notify) - 1;

auto target = +g_ppu;

// Wake up threads
for (usz i = 0, x = std::min<usz>(g_cfg.core.ppu_threads, g_ppu.size()); i < x; i++)
for (usz x = g_cfg.core.ppu_threads; target && x; target = target->next_ppu, x--)
{
const auto target = g_ppu[i];

if (target->state & cpu_flag::suspend)
{
ppu_log.trace("schedule(): %s", target->id);
Expand Down Expand Up @@ -1557,9 +1613,19 @@ ppu_thread_status lv2_obj::ppu_state(ppu_thread* ppu, bool lock_idm, bool lock_l
opt_lock[1].emplace(lv2_obj::g_mutex);
}

const auto it = std::find(g_ppu.begin(), g_ppu.end(), ppu);
usz pos = umax;
usz i = 0;

for (auto target = +g_ppu; target; target = target->next_ppu, i++)
{
if (target == ppu)
{
pos = i;
break;
}
}

if (it == g_ppu.end())
if (pos == umax)
{
if (!ppu->interrupt_thread_executing)
{
Expand All @@ -1569,7 +1635,7 @@ ppu_thread_status lv2_obj::ppu_state(ppu_thread* ppu, bool lock_idm, bool lock_l
return PPU_THREAD_STATUS_SLEEP;
}

if (it - g_ppu.begin() >= g_cfg.core.ppu_threads)
if (pos >= g_cfg.core.ppu_threads + 0u)
{
return PPU_THREAD_STATUS_RUNNABLE;
}
Expand Down
Loading

0 comments on commit c83eaf3

Please sign in to comment.