diff --git a/rpcs3/Emu/Cell/SPUThread.cpp b/rpcs3/Emu/Cell/SPUThread.cpp index 6a99a1b1b7..eeca00fa97 100644 --- a/rpcs3/Emu/Cell/SPUThread.cpp +++ b/rpcs3/Emu/Cell/SPUThread.cpp @@ -1316,9 +1316,9 @@ void SPUThread::stop_and_signal(u32 code) { group->state = SPU_THREAD_GROUP_STATUS_WAITING; - for (auto& t : group->threads) + for (auto& thread : group->threads) { - if (t) t->sleep(); // trigger status check + if (thread) thread->sleep(); // trigger status check } } else @@ -1326,35 +1326,37 @@ void SPUThread::stop_and_signal(u32 code) throw EXCEPTION("Unexpected SPU Thread Group state (%d)", group->state); } - // protocol is ignored in current implementation - queue->waiters++; - - // wait on the event queue - while (queue->events.empty() && !queue->cancelled) + if (queue->events.empty() || !queue->sq.empty()) { - CHECK_EMU_STATUS; + // add waiter; protocol is ignored in current implementation + sleep_queue_entry_t waiter(*this, queue->sq); + + // wait on the event queue + while (!unsignal()) + { + CHECK_EMU_STATUS; if (is_stopped()) throw CPUThreadStop{}; - queue->cv.wait_for(lv2_lock, std::chrono::milliseconds(1)); + cv.wait(lv2_lock); + } } - if (queue->cancelled) + if (queue->events.empty()) { + if (Emu.GetIdManager().check_id(queue->id)) + { + throw EXCEPTION("Unexpected"); + } + ch_in_mbox.set_values(1, CELL_ECANCELED); } else { auto& event = queue->events.front(); - ch_in_mbox.set_values(4, CELL_OK, static_cast(event.data1), static_cast(event.data2), static_cast(event.data3)); + ch_in_mbox.set_values(4, CELL_OK, static_cast(std::get<1>(event)), static_cast(std::get<2>(event)), static_cast(std::get<3>(event))); queue->events.pop_front(); - queue->waiters--; - - if (queue->events.size()) - { - queue->cv.notify_one(); - } } // restore thread group status @@ -1371,9 +1373,9 @@ void SPUThread::stop_and_signal(u32 code) throw EXCEPTION("Unexpected SPU Thread Group state (%d)", group->state); } - for (auto& t : group->threads) + for (auto& thread : group->threads) { - if (t) t->awake(); // untrigger status check + if (thread) thread->awake(); // untrigger status check } group->cv.notify_all(); @@ -1408,11 +1410,11 @@ void SPUThread::stop_and_signal(u32 code) throw EXCEPTION("Invalid SPU Thread Group"); } - for (auto t : group->threads) + for (auto thread : group->threads) { - if (t && t.get() != this) + if (thread && thread.get() != this) { - t->stop(); + thread->stop(); } } diff --git a/rpcs3/Emu/SysCalls/lv2/sleep_queue.h b/rpcs3/Emu/SysCalls/lv2/sleep_queue.h index c26c61cb6b..59d6a68d2f 100644 --- a/rpcs3/Emu/SysCalls/lv2/sleep_queue.h +++ b/rpcs3/Emu/SysCalls/lv2/sleep_queue.h @@ -43,7 +43,7 @@ enum using sleep_queue_t = std::deque>; -static struct defer_sleep_t{} const defer_sleep; +static struct defer_sleep_t{} const defer_sleep{}; // automatic object handling a thread entry in the sleep queue class sleep_queue_entry_t final diff --git a/rpcs3/Emu/SysCalls/lv2/sys_event.cpp b/rpcs3/Emu/SysCalls/lv2/sys_event.cpp index 2aaa833447..0c59923a43 100644 --- a/rpcs3/Emu/SysCalls/lv2/sys_event.cpp +++ b/rpcs3/Emu/SysCalls/lv2/sys_event.cpp @@ -6,7 +6,6 @@ #include "Emu/Cell/PPUThread.h" #include "Emu/Event.h" -#include "sleep_queue.h" #include "sys_process.h" #include "sys_event.h" @@ -21,11 +20,25 @@ lv2_event_queue_t::lv2_event_queue_t(u32 protocol, s32 type, u64 name, u64 key, , name(name) , key(key) , size(size) - , cancelled(false) - , waiters(0) { } +void lv2_event_queue_t::push(lv2_lock_t& lv2_lock, u64 source, u64 data1, u64 data2, u64 data3) +{ + CHECK_LV2_LOCK(lv2_lock); + + events.emplace_back(source, data1, data2, data3); + + // notify waiter; protocol is ignored in current implementation + for (auto& waiter : sq) + { + if (waiter->signal()) + { + return; + } + } +} + s32 sys_event_queue_create(vm::ptr equeue_id, vm::ptr attr, u64 event_queue_key, s32 size) { sys_event.Warning("sys_event_queue_create(equeue_id=*0x%x, attr=*0x%x, event_queue_key=0x%llx, size=%d)", equeue_id, attr, event_queue_key, size); @@ -37,20 +50,18 @@ s32 sys_event_queue_create(vm::ptr equeue_id, vm::ptrprotocol; - switch (protocol) + if (protocol != SYS_SYNC_FIFO && protocol != SYS_SYNC_PRIORITY) { - case SYS_SYNC_FIFO: break; - case SYS_SYNC_PRIORITY: break; - default: sys_event.Error("sys_event_queue_create(): unknown protocol (0x%x)", protocol); return CELL_EINVAL; + sys_event.Error("sys_event_queue_create(): unknown protocol (0x%x)", protocol); + return CELL_EINVAL; } const u32 type = attr->type; - switch (type) + if (type != SYS_PPU_QUEUE && type != SYS_SPU_QUEUE) { - case SYS_PPU_QUEUE: break; - case SYS_SPU_QUEUE: break; - default: sys_event.Error("sys_event_queue_create(): unknown type (0x%x)", type); return CELL_EINVAL; + sys_event.Error("sys_event_queue_create(): unknown type (0x%x)", type); + return CELL_EINVAL; } const auto queue = Emu.GetEventManager().MakeEventQueue(event_queue_key, protocol, type, attr->name_u64, event_queue_key, size); @@ -83,24 +94,21 @@ s32 sys_event_queue_destroy(u32 equeue_id, s32 mode) return CELL_EINVAL; } - if (!mode && queue->waiters) + if (!mode && queue->sq.size()) { return CELL_EBUSY; } - if (queue->cancelled.exchange(true)) - { - throw EXCEPTION("Unexpected value"); - } - - if (queue->waiters) - { - queue->cv.notify_all(); - } - + // cleanup Emu.GetEventManager().UnregisterKey(queue->key); Emu.GetIdManager().remove(equeue_id); + // signal all threads to return CELL_ECANCELED + for (auto& thread : queue->sq) + { + thread->signal(); + } + return CELL_OK; } @@ -129,15 +137,11 @@ s32 sys_event_queue_tryreceive(u32 equeue_id, vm::ptr event_array, s32 count = 0; - while (!queue->waiters && count < size && queue->events.size()) + while (queue->sq.empty() && count < size && queue->events.size()) { auto& dest = event_array[count++]; - auto& event = queue->events.front(); - dest.source = event.source; - dest.data1 = event.data1; - dest.data2 = event.data2; - dest.data3 = event.data3; + std::tie(dest.source, dest.data1, dest.data2, dest.data3) = queue->events.front(); queue->events.pop_front(); } @@ -147,9 +151,9 @@ s32 sys_event_queue_tryreceive(u32 equeue_id, vm::ptr event_array, return CELL_OK; } -s32 sys_event_queue_receive(PPUThread& CPU, u32 equeue_id, vm::ptr dummy_event, u64 timeout) +s32 sys_event_queue_receive(PPUThread& ppu, u32 equeue_id, vm::ptr dummy_event, u64 timeout) { - sys_event.Log("sys_event_queue_receive(equeue_id=0x%x, event=*0x%x, timeout=0x%llx)", equeue_id, dummy_event, timeout); + sys_event.Log("sys_event_queue_receive(equeue_id=0x%x, *0x%x, timeout=0x%llx)", equeue_id, dummy_event, timeout); const u64 start_time = get_system_time(); @@ -167,36 +171,47 @@ s32 sys_event_queue_receive(PPUThread& CPU, u32 equeue_id, vm::ptr return CELL_EINVAL; } - // protocol is ignored in current implementation - queue->waiters++; - - while (queue->events.empty()) + if (queue->events.empty() || !queue->sq.empty()) { - CHECK_EMU_STATUS; + // add waiter; protocol is ignored in current implementation + sleep_queue_entry_t waiter(ppu, queue->sq); - if (queue->cancelled) + while (!ppu.unsignal()) { - return CELL_ECANCELED; - } + CHECK_EMU_STATUS; - if (timeout && get_system_time() - start_time > timeout) - { - queue->waiters--; - return CELL_ETIMEDOUT; - } + if (timeout) + { + const u64 passed = get_system_time() - start_time; - queue->cv.wait_for(lv2_lock, std::chrono::milliseconds(1)); + if (passed >= timeout) + { + return CELL_ETIMEDOUT; + } + + ppu.cv.wait_for(lv2_lock, std::chrono::microseconds(timeout - passed)); + } + else + { + ppu.cv.wait(lv2_lock); + } + } } - // event data is returned in registers (second arg is not used) - auto& event = queue->events.front(); - CPU.GPR[4] = event.source; - CPU.GPR[5] = event.data1; - CPU.GPR[6] = event.data2; - CPU.GPR[7] = event.data3; + if (queue->events.empty()) + { + if (Emu.GetIdManager().check_id(equeue_id)) + { + throw EXCEPTION("Unexpected"); + } + + return CELL_ECANCELED; + } + + // event data is returned in registers (dummy_event is not used) + std::tie(ppu.GPR[4], ppu.GPR[5], ppu.GPR[6], ppu.GPR[7]) = queue->events.front(); queue->events.pop_front(); - queue->waiters--; return CELL_OK; } @@ -225,7 +240,7 @@ s32 sys_event_port_create(vm::ptr eport_id, s32 port_type, u64 name) if (port_type != SYS_EVENT_PORT_LOCAL) { - sys_event.Error("sys_event_port_create(): invalid port_type (%d)", port_type); + sys_event.Error("sys_event_port_create(): unknown port type (%d)", port_type); return CELL_EINVAL; } diff --git a/rpcs3/Emu/SysCalls/lv2/sys_event.h b/rpcs3/Emu/SysCalls/lv2/sys_event.h index 96bc38e3ed..d8504b9c21 100644 --- a/rpcs3/Emu/SysCalls/lv2/sys_event.h +++ b/rpcs3/Emu/SysCalls/lv2/sys_event.h @@ -1,5 +1,7 @@ #pragma once +#include "sleep_queue.h" + namespace vm { using namespace ps3; } // Event Queue Type @@ -68,22 +70,6 @@ struct sys_event_t be_t data3; }; -struct lv2_event_t -{ - const u64 source; - const u64 data1; - const u64 data2; - const u64 data3; - - lv2_event_t(u64 source, u64 data1, u64 data2, u64 data3) - : source(source) - , data1(data1) - , data2(data2) - , data3(data3) - { - } -}; - struct lv2_event_queue_t { const u32 id; @@ -93,26 +79,14 @@ struct lv2_event_queue_t const u64 key; const s32 size; - std::deque events; - std::atomic cancelled; + // tuple elements: source, data1, data2, data3 + std::deque> events; - // TODO: use sleep queue, possibly remove condition variable - std::condition_variable cv; - std::atomic waiters; + sleep_queue_t sq; lv2_event_queue_t(u32 protocol, s32 type, u64 name, u64 key, s32 size); - void push(lv2_lock_t& lv2_lock, u64 source, u64 data1, u64 data2, u64 data3) - { - CHECK_LV2_LOCK(lv2_lock); - - events.emplace_back(source, data1, data2, data3); - - if (waiters) - { - cv.notify_one(); - } - } + void push(lv2_lock_t& lv2_lock, u64 source, u64 data1, u64 data2, u64 data3); }; REG_ID_TYPE(lv2_event_queue_t, 0x8D); // SYS_EVENT_QUEUE_OBJECT @@ -121,6 +95,7 @@ struct lv2_event_port_t { const s32 type; // port type, must be SYS_EVENT_PORT_LOCAL const u64 name; // passed as event source (generated from id and process id if not set) + std::weak_ptr queue; // event queue this port is connected to lv2_event_port_t(s32 type, u64 name) @@ -137,7 +112,7 @@ class PPUThread; // SysCalls s32 sys_event_queue_create(vm::ptr equeue_id, vm::ptr attr, u64 event_queue_key, s32 size); s32 sys_event_queue_destroy(u32 equeue_id, s32 mode); -s32 sys_event_queue_receive(PPUThread& CPU, u32 equeue_id, vm::ptr dummy_event, u64 timeout); +s32 sys_event_queue_receive(PPUThread& ppu, u32 equeue_id, vm::ptr dummy_event, u64 timeout); s32 sys_event_queue_tryreceive(u32 equeue_id, vm::ptr event_array, s32 size, vm::ptr number); s32 sys_event_queue_drain(u32 event_queue_id); diff --git a/rpcs3/Emu/SysCalls/lv2/sys_time.cpp b/rpcs3/Emu/SysCalls/lv2/sys_time.cpp index 3ed358e199..b9b4101cf8 100644 --- a/rpcs3/Emu/SysCalls/lv2/sys_time.cpp +++ b/rpcs3/Emu/SysCalls/lv2/sys_time.cpp @@ -55,7 +55,7 @@ u64 get_timebased_time() LARGE_INTEGER count; if (!QueryPerformanceCounter(&count)) { - throw EXCEPTION("System error 0x%x", GetLastError()); + throw EXCEPTION("Unexpected"); } const u64 time = count.QuadPart; @@ -82,7 +82,7 @@ u64 get_system_time() LARGE_INTEGER count; if (!QueryPerformanceCounter(&count)) { - throw EXCEPTION("System error 0x%x", GetLastError()); + throw EXCEPTION("Unexpected"); } const u64 time = count.QuadPart; @@ -122,7 +122,7 @@ s32 sys_time_get_current_time(vm::ptr sec, vm::ptr nsec) LARGE_INTEGER count; if (!QueryPerformanceCounter(&count)) { - throw EXCEPTION("System error 0x%x", GetLastError()); + throw EXCEPTION("Unexpected"); } // get time difference in nanoseconds