mirror of
https://github.com/RPCS3/rpcs3.git
synced 2025-07-07 15:31:26 +12:00
Optimize cpu_thread::suspend_all
Reduce internal thread status polling. Refactor utility functions.
This commit is contained in:
parent
050c3e1d6b
commit
b74c5e04f5
1 changed files with 121 additions and 54 deletions
|
@ -11,6 +11,7 @@
|
||||||
|
|
||||||
#include <thread>
|
#include <thread>
|
||||||
#include <unordered_map>
|
#include <unordered_map>
|
||||||
|
#include <numeric>
|
||||||
#include <map>
|
#include <map>
|
||||||
|
|
||||||
DECLARE(cpu_thread::g_threads_created){0};
|
DECLARE(cpu_thread::g_threads_created){0};
|
||||||
|
@ -20,6 +21,8 @@ DECLARE(cpu_thread::g_suspend_counter){0};
|
||||||
LOG_CHANNEL(profiler);
|
LOG_CHANNEL(profiler);
|
||||||
LOG_CHANNEL(sys_log, "SYS");
|
LOG_CHANNEL(sys_log, "SYS");
|
||||||
|
|
||||||
|
static thread_local u64 s_tls_thread_slot = -1;
|
||||||
|
|
||||||
template <>
|
template <>
|
||||||
void fmt_class_string<cpu_flag>::format(std::string& out, u64 arg)
|
void fmt_class_string<cpu_flag>::format(std::string& out, u64 arg)
|
||||||
{
|
{
|
||||||
|
@ -253,20 +256,24 @@ struct cpu_counter
|
||||||
alignas(64) atomic_t<u32> cpu_array_sema{0};
|
alignas(64) atomic_t<u32> cpu_array_sema{0};
|
||||||
|
|
||||||
// Semaphore subdivision for each array slot (64 x N in total)
|
// Semaphore subdivision for each array slot (64 x N in total)
|
||||||
atomic_t<u64> cpu_array_bits[6]{};
|
alignas(64) atomic_t<u64> cpu_array_bits[3]{};
|
||||||
|
|
||||||
|
// Copy of array bits for internal use
|
||||||
|
alignas(64) u64 cpu_copy_bits[3]{};
|
||||||
|
|
||||||
// All registered threads
|
// All registered threads
|
||||||
atomic_t<cpu_thread*> cpu_array[sizeof(cpu_array_bits) * 8]{};
|
atomic_t<cpu_thread*> cpu_array[sizeof(cpu_array_bits) * 8]{};
|
||||||
|
|
||||||
u64 add(cpu_thread* _this)
|
u64 add(cpu_thread* _this, bool restore = false) noexcept
|
||||||
{
|
{
|
||||||
if (!cpu_array_sema.try_inc(sizeof(cpu_counter::cpu_array_bits) * 8))
|
|
||||||
{
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
u64 array_slot = -1;
|
u64 array_slot = -1;
|
||||||
|
|
||||||
|
if (!restore && !cpu_array_sema.try_inc(sizeof(cpu_counter::cpu_array_bits) * 8))
|
||||||
|
{
|
||||||
|
sys_log.fatal("Too many threads.");
|
||||||
|
return array_slot;
|
||||||
|
}
|
||||||
|
|
||||||
for (u32 i = 0;; i = (i + 1) % ::size32(cpu_array_bits))
|
for (u32 i = 0;; i = (i + 1) % ::size32(cpu_array_bits))
|
||||||
{
|
{
|
||||||
const auto [bits, ok] = cpu_array_bits[i].fetch_op([](u64& bits) -> u64
|
const auto [bits, ok] = cpu_array_bits[i].fetch_op([](u64& bits) -> u64
|
||||||
|
@ -285,44 +292,92 @@ struct cpu_counter
|
||||||
{
|
{
|
||||||
// Get actual slot number
|
// Get actual slot number
|
||||||
array_slot = i * 64 + std::countr_one(bits);
|
array_slot = i * 64 + std::countr_one(bits);
|
||||||
break;
|
|
||||||
|
// Register thread
|
||||||
|
if (cpu_array[array_slot].compare_and_swap_test(nullptr, _this)) [[likely]]
|
||||||
|
{
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
sys_log.fatal("Unexpected slot registration failure (%u).", array_slot);
|
||||||
|
cpu_array_bits[array_slot / 64] &= ~(1ull << (array_slot % 64));
|
||||||
|
continue;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Register and wait if necessary
|
if (!restore)
|
||||||
verify("cpu_counter::add()" HERE), cpu_array[array_slot].exchange(_this) == nullptr;
|
{
|
||||||
|
// First time (thread created)
|
||||||
|
_this->state += cpu_flag::wait;
|
||||||
|
cpu_suspend_lock.lock_unlock();
|
||||||
|
}
|
||||||
|
|
||||||
_this->state += cpu_flag::wait;
|
|
||||||
cpu_suspend_lock.lock_unlock();
|
|
||||||
return array_slot;
|
return array_slot;
|
||||||
}
|
}
|
||||||
|
|
||||||
void remove(cpu_thread* _this, u64 slot)
|
void remove(cpu_thread* _this, u64 slot) noexcept
|
||||||
{
|
{
|
||||||
// Unregister and wait if necessary
|
// Unregister and wait if necessary
|
||||||
_this->state += cpu_flag::wait;
|
_this->state += cpu_flag::wait;
|
||||||
if (cpu_array[slot].exchange(nullptr) != _this)
|
|
||||||
|
std::lock_guard lock(cpu_suspend_lock);
|
||||||
|
|
||||||
|
if (!cpu_array[slot].compare_and_swap_test(_this, nullptr))
|
||||||
|
{
|
||||||
sys_log.fatal("Inconsistency for array slot %u", slot);
|
sys_log.fatal("Inconsistency for array slot %u", slot);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
cpu_array_bits[slot / 64] &= ~(1ull << (slot % 64));
|
cpu_array_bits[slot / 64] &= ~(1ull << (slot % 64));
|
||||||
cpu_array_sema--;
|
cpu_array_sema--;
|
||||||
cpu_suspend_lock.lock_unlock();
|
}
|
||||||
|
|
||||||
|
// Remove temporarily
|
||||||
|
void remove(cpu_thread* _this) noexcept
|
||||||
|
{
|
||||||
|
// Unregister temporarily (called from check_state)
|
||||||
|
const u64 index = s_tls_thread_slot;
|
||||||
|
|
||||||
|
if (index >= std::size(cpu_array))
|
||||||
|
{
|
||||||
|
sys_log.fatal("Index out of bounds (%u).", index);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (cpu_array[index].load() == _this && cpu_array[index].compare_and_swap_test(_this, nullptr))
|
||||||
|
{
|
||||||
|
cpu_array_bits[index / 64] &= ~(1ull << (index % 64));
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
sys_log.fatal("Thread not found in cpu_array (%s).", _this->get_name());
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
template <typename F>
|
template <bool UseCopy = false, typename F>
|
||||||
void for_all_cpu(F func) noexcept
|
void for_all_cpu(F func) noexcept
|
||||||
{
|
{
|
||||||
auto ctr = g_fxo->get<cpu_counter>();
|
const auto ctr = g_fxo->get<cpu_counter>();
|
||||||
|
|
||||||
for (u32 i = 0; i < ::size32(ctr->cpu_array_bits); i++)
|
for (u32 i = 0; i < ::size32(ctr->cpu_array_bits); i++)
|
||||||
{
|
{
|
||||||
for (u64 bits = ctr->cpu_array_bits[i]; bits; bits &= bits - 1)
|
for (u64 bits = (UseCopy ? ctr->cpu_copy_bits[i] : ctr->cpu_array_bits[i].load()); bits; bits &= bits - 1)
|
||||||
{
|
{
|
||||||
const u64 index = i * 64 + std::countr_zero(bits);
|
const u64 index = i * 64 + std::countr_zero(bits);
|
||||||
|
|
||||||
if (cpu_thread* cpu = ctr->cpu_array[index].load())
|
if (cpu_thread* cpu = ctr->cpu_array[index].load())
|
||||||
{
|
{
|
||||||
func(cpu);
|
if constexpr (std::is_invocable_v<F, cpu_thread*, u64>)
|
||||||
|
{
|
||||||
|
func(cpu, index);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
if constexpr (std::is_invocable_v<F, cpu_thread*>)
|
||||||
|
{
|
||||||
|
func(cpu);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -382,23 +437,20 @@ void cpu_thread::operator()()
|
||||||
}
|
}
|
||||||
|
|
||||||
// Register thread in g_cpu_array
|
// Register thread in g_cpu_array
|
||||||
const u64 array_slot = g_fxo->get<cpu_counter>()->add(this);
|
s_tls_thread_slot = g_fxo->get<cpu_counter>()->add(this);
|
||||||
|
|
||||||
if (array_slot == umax)
|
if (s_tls_thread_slot == umax)
|
||||||
{
|
{
|
||||||
sys_log.fatal("Too many threads.");
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
static thread_local struct thread_cleanup_t
|
static thread_local struct thread_cleanup_t
|
||||||
{
|
{
|
||||||
cpu_thread* _this;
|
cpu_thread* _this;
|
||||||
u64 slot;
|
|
||||||
std::string name;
|
std::string name;
|
||||||
|
|
||||||
thread_cleanup_t(cpu_thread* _this, u64 slot)
|
thread_cleanup_t(cpu_thread* _this)
|
||||||
: _this(_this)
|
: _this(_this)
|
||||||
, slot(slot)
|
|
||||||
, name(thread_ctrl::get_name())
|
, name(thread_ctrl::get_name())
|
||||||
{
|
{
|
||||||
}
|
}
|
||||||
|
@ -415,7 +467,7 @@ void cpu_thread::operator()()
|
||||||
ptr->compare_and_swap(_this, nullptr);
|
ptr->compare_and_swap(_this, nullptr);
|
||||||
}
|
}
|
||||||
|
|
||||||
g_fxo->get<cpu_counter>()->remove(_this, slot);
|
g_fxo->get<cpu_counter>()->remove(_this, s_tls_thread_slot);
|
||||||
|
|
||||||
_this = nullptr;
|
_this = nullptr;
|
||||||
}
|
}
|
||||||
|
@ -428,7 +480,7 @@ void cpu_thread::operator()()
|
||||||
cleanup();
|
cleanup();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} cleanup{this, array_slot};
|
} cleanup{this};
|
||||||
|
|
||||||
// Check thread status
|
// Check thread status
|
||||||
while (!(state & (cpu_flag::exit + cpu_flag::dbg_global_stop)) && thread_ctrl::state() != thread_state::aborting)
|
while (!(state & (cpu_flag::exit + cpu_flag::dbg_global_stop)) && thread_ctrl::state() != thread_state::aborting)
|
||||||
|
@ -555,7 +607,8 @@ bool cpu_thread::check_state() noexcept
|
||||||
{
|
{
|
||||||
return retval;
|
return retval;
|
||||||
}
|
}
|
||||||
else if (!cpu_sleep_called && state0 & cpu_flag::suspend)
|
|
||||||
|
if (!cpu_sleep_called && state0 & cpu_flag::suspend)
|
||||||
{
|
{
|
||||||
cpu_sleep();
|
cpu_sleep();
|
||||||
cpu_sleep_called = true;
|
cpu_sleep_called = true;
|
||||||
|
@ -678,8 +731,11 @@ void cpu_thread::suspend_work::push(cpu_thread* _this) noexcept
|
||||||
// Value must be reliable because cpu_flag::wait hasn't been observed only (but not if pause is set)
|
// Value must be reliable because cpu_flag::wait hasn't been observed only (but not if pause is set)
|
||||||
const u64 susp_ctr = g_suspend_counter;
|
const u64 susp_ctr = g_suspend_counter;
|
||||||
|
|
||||||
|
// cpu_counter object
|
||||||
|
const auto ctr = g_fxo->get<cpu_counter>();
|
||||||
|
|
||||||
// Try to push workload
|
// Try to push workload
|
||||||
auto& queue = g_fxo->get<cpu_counter>()->cpu_suspend_work;
|
auto& queue = ctr->cpu_suspend_work;
|
||||||
|
|
||||||
do
|
do
|
||||||
{
|
{
|
||||||
|
@ -689,7 +745,7 @@ void cpu_thread::suspend_work::push(cpu_thread* _this) noexcept
|
||||||
if (!_this && next)
|
if (!_this && next)
|
||||||
{
|
{
|
||||||
// If _this == nullptr, it only works if this is the first workload pushed
|
// If _this == nullptr, it only works if this is the first workload pushed
|
||||||
g_fxo->get<cpu_counter>()->cpu_suspend_lock.lock_unlock();
|
ctr->cpu_suspend_lock.lock_unlock();
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -698,34 +754,42 @@ void cpu_thread::suspend_work::push(cpu_thread* _this) noexcept
|
||||||
if (!next)
|
if (!next)
|
||||||
{
|
{
|
||||||
// First thread to push the work to the workload list pauses all threads and processes it
|
// First thread to push the work to the workload list pauses all threads and processes it
|
||||||
std::lock_guard lock(g_fxo->get<cpu_counter>()->cpu_suspend_lock);
|
std::lock_guard lock(ctr->cpu_suspend_lock);
|
||||||
|
|
||||||
for_all_cpu([&](cpu_thread* cpu)
|
// Copy of thread bits
|
||||||
|
decltype(ctr->cpu_copy_bits) copy2{};
|
||||||
|
|
||||||
|
for (u32 i = 0; i < ::size32(ctr->cpu_copy_bits); i++)
|
||||||
{
|
{
|
||||||
if (!(cpu->state & cpu_flag::pause) && cpu != _this)
|
copy2[i] = ctr->cpu_copy_bits[i] = ctr->cpu_array_bits[i].load();
|
||||||
|
}
|
||||||
|
|
||||||
|
for_all_cpu([&](cpu_thread* cpu, u64 index)
|
||||||
|
{
|
||||||
|
if (cpu == _this || cpu->state.fetch_add(cpu_flag::pause) & cpu_flag::wait)
|
||||||
{
|
{
|
||||||
cpu->state += cpu_flag::pause;
|
// Clear bits as long as wait flag is set
|
||||||
|
ctr->cpu_copy_bits[index / 64] &= ~(1ull << (index % 64));
|
||||||
|
}
|
||||||
|
|
||||||
|
if (cpu == _this)
|
||||||
|
{
|
||||||
|
copy2[index / 64] &= ~(1ull << (index % 64));
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
busy_wait(500);
|
busy_wait(500);
|
||||||
|
|
||||||
while (true)
|
while (std::accumulate(std::begin(ctr->cpu_copy_bits), std::end(ctr->cpu_copy_bits), u64{0}, std::bit_or()))
|
||||||
{
|
{
|
||||||
bool ok = true;
|
// Check only CPUs which haven't acknowledged their waiting state yet
|
||||||
|
for_all_cpu<true>([&](cpu_thread* cpu, u64 index)
|
||||||
for_all_cpu([&](cpu_thread* cpu)
|
|
||||||
{
|
{
|
||||||
if (!(cpu->state & cpu_flag::wait) && cpu != _this)
|
if (cpu->state & cpu_flag::wait)
|
||||||
{
|
{
|
||||||
ok = false;
|
ctr->cpu_copy_bits[index / 64] &= ~(1ull << (index % 64));
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
if (ok) [[likely]]
|
|
||||||
{
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Extract queue and reverse element order (FILO to FIFO) (TODO: maybe leave order as is?)
|
// Extract queue and reverse element order (FILO to FIFO) (TODO: maybe leave order as is?)
|
||||||
|
@ -754,12 +818,12 @@ void cpu_thread::suspend_work::push(cpu_thread* _this) noexcept
|
||||||
// Finalization
|
// Finalization
|
||||||
g_suspend_counter++;
|
g_suspend_counter++;
|
||||||
|
|
||||||
for_all_cpu([&](cpu_thread* cpu)
|
// Exact bitset for flag pause removal
|
||||||
|
std::memcpy(ctr->cpu_copy_bits, copy2, sizeof(copy2));
|
||||||
|
|
||||||
|
for_all_cpu<true>([&](cpu_thread* cpu)
|
||||||
{
|
{
|
||||||
if (cpu != _this)
|
cpu->state -= cpu_flag::pause;
|
||||||
{
|
|
||||||
cpu->state -= cpu_flag::pause;
|
|
||||||
}
|
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
|
@ -792,11 +856,14 @@ void cpu_thread::stop_all() noexcept
|
||||||
{
|
{
|
||||||
std::lock_guard lock(g_fxo->get<cpu_counter>()->cpu_suspend_lock);
|
std::lock_guard lock(g_fxo->get<cpu_counter>()->cpu_suspend_lock);
|
||||||
|
|
||||||
for_all_cpu([](cpu_thread* cpu)
|
auto on_stop = [](u32, cpu_thread& cpu)
|
||||||
{
|
{
|
||||||
cpu->state += cpu_flag::dbg_global_stop;
|
cpu.state += cpu_flag::dbg_global_stop;
|
||||||
cpu->abort();
|
cpu.abort();
|
||||||
});
|
};
|
||||||
|
|
||||||
|
idm::select<named_thread<ppu_thread>>(on_stop);
|
||||||
|
idm::select<named_thread<spu_thread>>(on_stop);
|
||||||
}
|
}
|
||||||
|
|
||||||
sys_log.notice("All CPU threads have been signaled.");
|
sys_log.notice("All CPU threads have been signaled.");
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue