Implement cpu_thread::suspend_all

Remove Accurate PUTLLC option.
Implement fallback path for SPU transactions.
Nekotekina 2019-06-06 21:32:35 +03:00
parent 17d0dcb7a2
commit 5d45a3e47d
18 changed files with 843 additions and 362 deletions
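The "fallback path for SPU transactions" means that when the hardware transaction behind PUTLLC keeps aborting, the store can now complete by suspending every cpu_thread instead of spinning forever (which is why the separate Accurate PUTLLC option can go). A hedged sketch of that shape follows; try_putllc_tx and store_and_check are hypothetical stand-ins and this will not compile outside the RPCS3 tree, but cpu_thread::suspend_all is the RAII guard this commit adds.

extern bool try_putllc_tx();   // hypothetical: one hardware-TSX attempt
extern bool store_and_check(); // hypothetical: plain reservation store

bool putllc_with_fallback(cpu_thread* _this)
{
	for (int i = 0; i < 16; i++)
	{
		if (try_putllc_tx())
		{
			return true; // transaction committed
		}
	}

	// Fallback: bring every other cpu_thread to a safe point inside
	// check_state(), then perform the store without a transaction.
	cpu_thread::suspend_all pause(_this);
	return store_and_check();
}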

@@ -19,10 +19,13 @@ void fmt_class_string<cpu_flag>::format(std::string& out, u64 arg)
{
case cpu_flag::stop: return "STOP";
case cpu_flag::exit: return "EXIT";
case cpu_flag::wait: return "w";
case cpu_flag::pause: return "p";
case cpu_flag::suspend: return "s";
case cpu_flag::ret: return "ret";
case cpu_flag::signal: return "sig";
case cpu_flag::memory: return "mem";
case cpu_flag::jit_return: return "JIT";
case cpu_flag::dbg_global_pause: return "G-PAUSE";
case cpu_flag::dbg_global_stop: return "G-EXIT";
case cpu_flag::dbg_pause: return "PAUSE";
@@ -42,10 +45,43 @@ void fmt_class_string<bs_t<cpu_flag>>::format(std::string& out, u64 arg)
thread_local cpu_thread* g_tls_current_cpu_thread = nullptr;
// For coordination and notification
alignas(64) shared_cond g_cpu_array_lock;
// For cpu_flag::pause bit setting/removing
alignas(64) shared_mutex g_cpu_pause_lock;
// For cpu_flag::pause
alignas(64) atomic_t<u64> g_cpu_pause_ctr{0};
// Semaphore for global thread array (global counter)
alignas(64) atomic_t<u32> g_cpu_array_sema{0};
// Semaphore subdivision for each array slot (64 x N in total)
atomic_t<u64> g_cpu_array_bits[6]{};
// All registered threads
atomic_t<cpu_thread*> g_cpu_array[sizeof(g_cpu_array_bits) * 8]{};
template <typename F>
void for_all_cpu(F&& func) noexcept
{
for (u32 i = 0; i < ::size32(g_cpu_array_bits); i++)
{
for (u64 bits = g_cpu_array_bits[i]; bits; bits &= bits - 1)
{
const u64 index = i * 64 + utils::cnttz64(bits, true);
if (cpu_thread* cpu = g_cpu_array[index].load())
{
func(cpu);
}
}
}
}
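for_all_cpu walks the occupancy bitmap rather than scanning the whole array: bits &= bits - 1 clears the lowest set bit on each step, and counting trailing zeros yields that bit's index. A standalone illustration of the same trick, with GCC/Clang's __builtin_ctzll standing in for utils::cnttz64:

#include <cstdint>
#include <cstdio>

int main()
{
	std::uint64_t bits = 0b101001;
	for (; bits; bits &= bits - 1)
	{
		// Lowest set bit = index of the next occupied slot
		std::printf("slot %d is occupied\n", __builtin_ctzll(bits));
	}
	// Prints slots 0, 3 and 5.
}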
void cpu_thread::operator()()
{
state -= cpu_flag::exit;
g_tls_current_cpu_thread = this;
if (g_cfg.core.thread_scheduler_enabled)
@@ -58,6 +94,48 @@ void cpu_thread::operator()()
thread_ctrl::set_native_priority(-1);
}
// Register thread in g_cpu_array
if (!g_cpu_array_sema.try_inc(sizeof(g_cpu_array_bits) * 8))
{
LOG_FATAL(GENERAL, "Too many threads");
Emu.Pause();
return;
}
u64 array_slot = -1;
for (u32 i = 0;; i = (i + 1) % ::size32(g_cpu_array_bits))
{
if (LIKELY(~g_cpu_array_bits[i]))
{
const u64 found = g_cpu_array_bits[i].atomic_op([](u64& bits) -> u64
{
// Find empty array slot and set its bit
if (LIKELY(~bits))
{
const u64 bit = utils::cnttz64(~bits, true);
bits |= 1ull << bit;
return bit;
}
return 64;
});
if (LIKELY(found < 64))
{
// Found a free bit: compute the global slot index
array_slot = i * 64 + found;
break;
}
}
}
// Register and wait if necessary
verify("g_cpu_array[...] -> this" HERE), g_cpu_array[array_slot].exchange(this) == nullptr;
state += cpu_flag::wait;
g_cpu_array_lock.wait_all();
// Check thread status
while (!(state & (cpu_flag::exit + cpu_flag::dbg_global_stop)))
{
@@ -86,6 +164,13 @@ void cpu_thread::operator()()
thread_ctrl::wait();
}
// Unregister and wait if necessary
state += cpu_flag::wait;
verify("g_cpu_array[...] -> null" HERE), g_cpu_array[array_slot].exchange(nullptr) == this;
g_cpu_array_bits[array_slot / 64] &= ~(1ull << (array_slot % 64));
g_cpu_array_sema--;
g_cpu_array_lock.wait_all();
}
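The registration path above claims a free slot by flipping one bit of g_cpu_array_bits inside a single atomic operation. A standalone analogue of that claim, assuming a plain std::atomic bitmap with compare_exchange in place of atomic_t::atomic_op:

#include <atomic>
#include <cstdint>

// Returns the claimed slot index, or -1 if all 64 slots are taken.
int claim_slot(std::atomic<std::uint64_t>& bits)
{
	std::uint64_t old = bits.load();
	while (true)
	{
		if (!~old)
		{
			return -1; // bitmap full
		}
		const int bit = __builtin_ctzll(~old); // lowest clear bit
		if (bits.compare_exchange_weak(old, old | (1ull << bit)))
		{
			return bit; // bit set atomically: the slot is ours
		}
	}
}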
void cpu_thread::on_abort()
@@ -105,7 +190,7 @@ cpu_thread::cpu_thread(u32 id)
g_threads_created++;
}
-bool cpu_thread::check_state()
+bool cpu_thread::check_state() noexcept
{
#ifdef WITH_GDB_DEBUGGER
if (state & cpu_flag::dbg_pause)
@@ -117,6 +202,11 @@ bool cpu_thread::check_state()
bool cpu_sleep_called = false;
bool cpu_flag_memory = false;
if (!(state & cpu_flag::wait))
{
state += cpu_flag::wait;
}
while (true)
{
if (state & cpu_flag::memory)
@@ -131,8 +221,9 @@ bool cpu_thread::check_state()
state -= cpu_flag::memory;
}
-if (state & cpu_flag::exit + cpu_flag::jit_return + cpu_flag::dbg_global_stop)
+if (state & (cpu_flag::exit + cpu_flag::jit_return + cpu_flag::dbg_global_stop))
{
state += cpu_flag::wait;
return true;
}
@@ -141,7 +232,24 @@ bool cpu_thread::check_state()
cpu_sleep_called = false;
}
-if (!is_paused())
+const auto [state0, escape] = state.fetch_op([&](bs_t<cpu_flag>& flags)
{
// Check pause flags which hold the thread inside check_state
if (flags & (cpu_flag::pause + cpu_flag::suspend + cpu_flag::dbg_global_pause + cpu_flag::dbg_pause))
{
return false;
}
// Atomically clear the wait flag and escape
if (!(flags & (cpu_flag::exit + cpu_flag::jit_return + cpu_flag::dbg_global_stop + cpu_flag::ret + cpu_flag::stop)))
{
flags -= cpu_flag::wait;
}
return true;
});
if (escape)
{
if (cpu_flag_memory)
{
@@ -150,14 +258,43 @@ bool cpu_thread::check_state()
break;
}
-else if (!cpu_sleep_called && state & cpu_flag::suspend)
+else if (!cpu_sleep_called && state0 & cpu_flag::suspend)
{
cpu_sleep();
cpu_sleep_called = true;
continue;
}
-thread_ctrl::wait();
if (state & cpu_flag::wait)
{
// Spin-wait briefly before resorting to thread_ctrl::wait
for (u32 i = 0; i < 10; i++)
{
if (state0 & (cpu_flag::pause + cpu_flag::suspend))
{
busy_wait(500);
}
else
{
break;
}
}
if (!(state0 & (cpu_flag::pause + cpu_flag::suspend)))
{
continue;
}
}
if (state0 & (cpu_flag::suspend + cpu_flag::dbg_global_pause + cpu_flag::dbg_pause))
{
thread_ctrl::wait();
}
else
{
// If only cpu_flag::pause was set, notification won't arrive
g_cpu_array_lock.wait_all();
}
}
const auto state_ = state.load();
@@ -196,3 +333,90 @@ std::string cpu_thread::dump() const
{
return fmt::format("Type: %s\n" "State: %s\n", typeid(*this).name(), state.load());
}
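check_state above now leaves the wait state with a single read-modify-write: state.fetch_op clears cpu_flag::wait only when no stop-class flag remains, and the returned pair tells the caller whether it escaped. A standalone analogue under simplified flags, with a plain atomic word and compare_exchange standing in for atomic_t::fetch_op:

#include <atomic>

enum : unsigned { f_wait = 1, f_pause = 2, f_stop = 4 };

// Returns true if the thread may leave check_state; clears f_wait in
// the same atomic step unless a stop-class flag is still set.
bool try_escape(std::atomic<unsigned>& state)
{
	unsigned old = state.load();
	while (true)
	{
		if (old & f_pause)
		{
			return false; // pause-class flag: stay inside check_state
		}
		unsigned desired = old;
		if (!(old & f_stop))
		{
			desired &= ~f_wait;
		}
		if (state.compare_exchange_weak(old, desired))
		{
			return true;
		}
	}
}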
cpu_thread::suspend_all::suspend_all(cpu_thread* _this) noexcept
: m_lock(g_cpu_array_lock.try_shared_lock())
, m_this(_this)
{
// TODO
if (!m_lock)
{
LOG_FATAL(GENERAL, "g_cpu_array_lock: too many concurrent accesses");
Emu.Pause();
return;
}
if (m_this)
{
m_this->state += cpu_flag::wait;
}
g_cpu_pause_ctr++;
reader_lock lock(g_cpu_pause_lock);
for_all_cpu([](cpu_thread* cpu)
{
cpu->state += cpu_flag::pause;
});
busy_wait(500);
while (true)
{
bool ok = true;
for_all_cpu([&](cpu_thread* cpu)
{
if (!(cpu->state & cpu_flag::wait))
{
ok = false;
}
});
if (LIKELY(ok))
{
break;
}
busy_wait(500);
}
}
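The constructor performs a two-phase handshake: publish cpu_flag::pause to every registered thread, then spin until each one acknowledges by showing cpu_flag::wait. A minimal standalone analogue of that pattern, assuming plain boolean atomics instead of the flag bitset:

#include <atomic>
#include <thread>
#include <vector>

struct worker
{
	std::atomic<bool> pause{false};   // request, set by the suspender
	std::atomic<bool> waiting{false}; // acknowledgement, set by the worker
};

void suspend_all_workers(std::vector<worker>& workers)
{
	for (auto& w : workers)
	{
		w.pause = true; // phase 1: request a pause
	}
	for (auto& w : workers)
	{
		while (!w.waiting) // phase 2: wait for each acknowledgement
		{
			std::this_thread::yield();
		}
	}
}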
cpu_thread::suspend_all::~suspend_all()
{
// Make sure the last finishing thread does the cleanup and notifies the others
u64 pause_ctr = 0;
while ((pause_ctr = g_cpu_pause_ctr), !g_cpu_array_lock.wait_all(m_lock))
{
if (pause_ctr)
{
std::lock_guard lock(g_cpu_pause_lock);
// Detect possible unfortunate reordering of flag clearing after suspend_all's reader lock
if (g_cpu_pause_ctr != pause_ctr)
{
continue;
}
for_all_cpu([&](cpu_thread* cpu)
{
if (g_cpu_pause_ctr == pause_ctr)
{
cpu->state -= cpu_flag::pause;
}
});
}
if (g_cpu_array_lock.notify_all(m_lock))
{
break;
}
}
if (m_this)
{
m_this->check_state();
}
}
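The destructor guards its cleanup with a generation counter: it only clears cpu_flag::pause for the generation it observed, so a suspend_all that started in the meantime keeps its pause request. A minimal sketch of that check, assuming a plain mutex and atomics in place of g_cpu_pause_lock and the flag bitset:

#include <atomic>
#include <cstdint>
#include <mutex>

std::atomic<std::uint64_t> g_generation{0};
std::mutex g_cleanup_lock;

// Clear the pause request only if no newer suspender has arrived since
// we sampled the generation; returns false if cleanup was skipped.
bool cleanup_if_current(std::uint64_t seen, std::atomic<bool>& pause_flag)
{
	std::lock_guard lock(g_cleanup_lock);
	if (g_generation.load() != seen)
	{
		return false; // a newer suspend_all owns the pause flag now
	}
	pause_flag = false;
	return true;
}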