From 6806e3d5c73e472b497140dff9fc5058976c4997 Mon Sep 17 00:00:00 2001 From: Nekotekina Date: Mon, 26 Oct 2020 04:02:39 +0300 Subject: [PATCH] atomic.cpp: implement notify callback Notification can be very heavy, especially if we need to wake many threads. Callback is set for cpu_thread in order to set wait flag accordingly. --- Utilities/Thread.cpp | 2 +- rpcs3/Emu/CPU/CPUThread.cpp | 25 +++++++++++++ rpcs3/util/atomic.cpp | 70 +++++++++++++++++++++++++++++++++++-- rpcs3/util/atomic.hpp | 1 + 4 files changed, 94 insertions(+), 4 deletions(-) diff --git a/Utilities/Thread.cpp b/Utilities/Thread.cpp index 47fc3b3edb..8b32f895a1 100644 --- a/Utilities/Thread.cpp +++ b/Utilities/Thread.cpp @@ -1970,7 +1970,7 @@ bool thread_base::finalize(thread_state result_state) noexcept void thread_base::finalize() noexcept { - atomic_storage_futex::set_wait_callback([](const void*){ return true; }); + atomic_storage_futex::set_wait_callback(nullptr); g_tls_log_prefix = []() -> std::string { return {}; }; thread_ctrl::g_tls_this_thread = nullptr; } diff --git a/rpcs3/Emu/CPU/CPUThread.cpp b/rpcs3/Emu/CPU/CPUThread.cpp index 1c142110fc..97376f3e23 100644 --- a/rpcs3/Emu/CPU/CPUThread.cpp +++ b/rpcs3/Emu/CPU/CPUThread.cpp @@ -446,6 +446,29 @@ void cpu_thread::operator()() return; } + atomic_storage_futex::set_notify_callback([](const void*, u64 progress) + { + static thread_local bool wait_set = false; + + cpu_thread* _cpu = get_current_cpu_thread(); + + // Wait flag isn't set asynchronously so this should be thread-safe + if (progress == 0 && !(_cpu->state & cpu_flag::wait)) + { + // Operation just started and syscall is imminent + _cpu->state += cpu_flag::wait + cpu_flag::temp; + wait_set = true; + return; + } + + if (progress == umax && std::exchange(wait_set, false)) + { + // Operation finished: need to clean wait flag + verify(HERE), !_cpu->check_state(); + return; + } + }); + static thread_local struct thread_cleanup_t { cpu_thread* _this; @@ -469,6 +492,8 @@ void cpu_thread::operator()() ptr->compare_and_swap(_this, nullptr); } + atomic_storage_futex::set_notify_callback(nullptr); + g_fxo->get()->remove(_this, s_tls_thread_slot); _this = nullptr; diff --git a/rpcs3/util/atomic.cpp b/rpcs3/util/atomic.cpp index b5d48b3f12..67eefbf27f 100644 --- a/rpcs3/util/atomic.cpp +++ b/rpcs3/util/atomic.cpp @@ -41,6 +41,9 @@ static constexpr u64 one_v = Mask & (0 - Mask); // Callback for wait() function, returns false if wait should return static thread_local bool(*s_tls_wait_cb)(const void* data) = [](const void*){ return true; }; +// Callback for notification functions for optimizations +static thread_local void(*s_tls_notify_cb)(const void* data, u64 progress) = [](const void*, u64){}; + // Compare data in memory with old value, and return true if they are equal template static inline bool ptr_cmp(const void* data, std::size_t size, u64 old_value, u64 mask) @@ -682,11 +685,17 @@ SAFE_BUFFERS void atomic_storage_futex::wait(const void* data, std::size_t size, } // Platform specific wake-up function -static inline bool alert_sema(atomic_t* sema) +static inline bool alert_sema(atomic_t* sema, const void* data, u64 progress) { #ifdef USE_FUTEX if (sema->load() == 1 && sema->compare_and_swap_test(1, 2)) { + if (!progress) + { + // Imminent notification + s_tls_notify_cb(data, 0); + } + // Use "wake all" arg for robustness, only 1 thread is expected futex(sema, FUTEX_WAKE_PRIVATE, 0x7fff'ffff); return true; @@ -713,6 +722,12 @@ static inline bool alert_sema(atomic_t* sema) { if (auto cond = cond_get(cond_id)) { + if (!progress) + { + // Imminent notification + s_tls_notify_cb(data, 0); + } + // Not super efficient: locking is required to avoid lost notifications cond->mtx.lock(); cond->mtx.unlock(); @@ -738,8 +753,15 @@ static inline bool alert_sema(atomic_t* sema) // Check if tid is neither 0 nor -1 if (tid + 1 > 1 && sema->compare_and_swap_test(tid, -1)) { + if (!progress) + { + // Imminent notification + s_tls_notify_cb(data, 0); + } + if (NtAlertThreadByThreadId(tid) == NTSTATUS_SUCCESS) { + // Could be some dead thread otherwise return true; } } @@ -749,6 +771,12 @@ static inline bool alert_sema(atomic_t* sema) if (sema->load() == 1 && sema->compare_and_swap_test(1, 2)) { + if (!progress) + { + // Imminent notification + s_tls_notify_cb(data, 0); + } + // Can wait in rare cases, which is its annoying weakness NtReleaseKeyedEvent(nullptr, sema, 1, nullptr); return true; @@ -764,6 +792,22 @@ void atomic_storage_futex::set_wait_callback(bool(*cb)(const void* data)) { s_tls_wait_cb = cb; } + else + { + s_tls_wait_cb = [](const void*){ return true; }; + } +} + +void atomic_storage_futex::set_notify_callback(void(*cb)(const void*, u64)) +{ + if (cb) + { + s_tls_notify_cb = cb; + } + else + { + s_tls_notify_cb = [](const void*, u64){}; + } } void atomic_storage_futex::raw_notify(const void* data) @@ -785,15 +829,20 @@ void atomic_storage_futex::notify_one(const void* data) return; } + u64 progress = 0; + for (u64 bits = slot->sema_bits; bits; bits &= bits - 1) { const auto sema = &slot->sema_data[std::countr_zero(bits)]; - if (alert_sema(sema)) + if (alert_sema(sema, data, progress)) { + s_tls_notify_cb(data, ++progress); break; } } + + s_tls_notify_cb(data, -1); } void atomic_storage_futex::notify_all(const void* data) @@ -807,6 +856,8 @@ void atomic_storage_futex::notify_all(const void* data) return; } + u64 progress = 0; + #if defined(_WIN32) && !defined(USE_FUTEX) if (!NtAlertThreadByThreadId) { @@ -825,6 +876,12 @@ void atomic_storage_futex::notify_all(const void* data) if (sema->load() == 1 && sema->compare_and_swap_test(1, 2)) { // Waiters locked for notification + if (bits == copy) + { + // Notify imminent notification + s_tls_notify_cb(data, 0); + } + continue; } @@ -847,6 +904,8 @@ void atomic_storage_futex::notify_all(const void* data) continue; } + s_tls_notify_cb(data, ++progress); + // Remove the bit from next stage copy &= ~(1ull << id); } @@ -856,8 +915,10 @@ void atomic_storage_futex::notify_all(const void* data) for (u64 bits = copy; bits; bits &= bits - 1) { NtReleaseKeyedEvent(nullptr, &slot->sema_data[std::countr_zero(bits)], 1, nullptr); + s_tls_notify_cb(data, ++progress); } + s_tls_notify_cb(data, -1); return; } #endif @@ -866,9 +927,12 @@ void atomic_storage_futex::notify_all(const void* data) { const auto sema = &slot->sema_data[std::countr_zero(bits)]; - if (alert_sema(sema)) + if (alert_sema(sema, data, progress)) { + s_tls_notify_cb(data, ++progress); continue; } } + + s_tls_notify_cb(data, -1); } diff --git a/rpcs3/util/atomic.hpp b/rpcs3/util/atomic.hpp index 73395057b5..205e002bff 100644 --- a/rpcs3/util/atomic.hpp +++ b/rpcs3/util/atomic.hpp @@ -27,6 +27,7 @@ private: public: static void set_wait_callback(bool(*cb)(const void* data)); + static void set_notify_callback(void(*cb)(const void* data, u64 progress)); static void raw_notify(const void* data); };