event_queue_t, event_port_t rewritten

This commit is contained in:
Nekotekina 2015-03-04 07:42:04 +03:00
parent 78c37ff8b6
commit ad38e9f0fe
22 changed files with 386 additions and 584 deletions

View file

@ -10,6 +10,7 @@
#include "Emu/SysCalls/ErrorCodes.h"
#include "Emu/SysCalls/lv2/sys_spu.h"
#include "Emu/SysCalls/lv2/sys_event_flag.h"
#include "Emu/SysCalls/lv2/sys_event.h"
#include "Emu/SysCalls/lv2/sys_time.h"
#include "Emu/Cell/SPUDisAsm.h"
@ -589,24 +590,26 @@ void SPUThread::set_ch_value(u32 ch, u32 value)
LOG_NOTICE(SPU, "sys_spu_thread_send_event(spup=%d, data0=0x%x, data1=0x%x)", spup, value & 0x00ffffff, data);
}
std::shared_ptr<EventPort> port;// = SPUPs[spup];
LV2_LOCK;
std::lock_guard<std::mutex> lock(port->m_mutex);
std::shared_ptr<event_queue_t> queue = this->spup[spup].lock();
if (!port->eq)
if (!queue)
{
LOG_WARNING(SPU, "sys_spu_thread_send_event(spup=%d, data0=0x%x, data1=0x%x): event queue not connected", spup, (value & 0x00ffffff), data);
ch_in_mbox.push_uncond(CELL_ENOTCONN); // TODO: check error passing
return;
}
if (!port->eq->events.push(SYS_SPU_THREAD_EVENT_USER_KEY, GetId(), ((u64)spup << 32) | (value & 0x00ffffff), data))
if (queue->events.size() >= queue->size)
{
ch_in_mbox.push_uncond(CELL_EBUSY);
return;
}
queue->events.emplace_back(SYS_SPU_THREAD_EVENT_USER_KEY, GetId(), ((u64)spup << 32) | (value & 0x00ffffff), data);
ch_in_mbox.push_uncond(CELL_OK);
queue->cv.notify_one();
return;
}
else if (code < 128)
@ -627,23 +630,25 @@ void SPUThread::set_ch_value(u32 ch, u32 value)
LOG_WARNING(SPU, "sys_spu_thread_throw_event(spup=%d, data0=0x%x, data1=0x%x)", spup, value & 0x00ffffff, data);
}
std::shared_ptr<EventPort> port;// = SPUPs[spup];
LV2_LOCK;
std::lock_guard<std::mutex> lock(port->m_mutex);
std::shared_ptr<event_queue_t> queue = this->spup[spup].lock();
if (!port->eq)
if (!queue)
{
LOG_WARNING(SPU, "sys_spu_thread_throw_event(spup=%d, data0=0x%x, data1=0x%x): event queue not connected", spup, (value & 0x00ffffff), data);
return;
}
// TODO: check passing spup value
if (!port->eq->events.push(SYS_SPU_THREAD_EVENT_USER_KEY, GetId(), ((u64)spup << 32) | (value & 0x00ffffff), data))
if (queue->events.size() >= queue->size)
{
LOG_WARNING(SPU, "sys_spu_thread_throw_event(spup=%d, data0=0x%x, data1=0x%x) failed (queue is full)", spup, (value & 0x00ffffff), data);
return;
}
queue->events.emplace_back(SYS_SPU_THREAD_EVENT_USER_KEY, GetId(), ((u64)spup << 32) | (value & 0x00ffffff), data);
queue->cv.notify_one();
return;
}
else if (code == 128)
@ -954,74 +959,46 @@ void SPUThread::stop_and_signal(u32 code)
LOG_NOTICE(SPU, "sys_spu_thread_receive_event(spuq=0x%x)", spuq);
}
std::shared_ptr<EventQueue> eq;
//if (!SPUQs.GetEventQueue(FIX_SPUQ(spuq), eq))
//{
// ch_in_mbox.push_uncond(CELL_EINVAL); // TODO: check error value
// return;
//}
LV2_LOCK;
u32 tid = GetId();
eq->sq.push(tid, eq->protocol); // add thread to sleep queue
while (true)
auto found = this->spuq.find(spuq);
if (found == this->spuq.end())
{
u32 old_owner = eq->owner.compare_and_swap(0, tid);
ch_in_mbox.push_uncond(CELL_EINVAL); // TODO: check error value
return;
}
std::shared_ptr<event_queue_t> queue = found->second;
switch (s32 res = old_owner ? (old_owner == tid ? 1 : 2) : 0)
{
case 0:
{
const u32 next = eq->events.count() ? eq->sq.signal(eq->protocol) : 0;
if (next != tid)
{
if (!eq->owner.compare_and_swap_test(tid, next))
{
assert(!"sys_spu_thread_receive_event() failed (I)");
}
break;
}
// fallthrough
}
case 1:
{
sys_event_data event;
eq->events.pop(event);
if (!eq->owner.compare_and_swap_test(tid, 0))
{
assert(!"sys_spu_thread_receive_event() failed (II)");
}
ch_in_mbox.push_uncond(CELL_OK);
ch_in_mbox.push_uncond((u32)event.data1);
ch_in_mbox.push_uncond((u32)event.data2);
ch_in_mbox.push_uncond((u32)event.data3);
if (!eq->sq.invalidate(tid, eq->protocol) && !eq->sq.pop(tid, eq->protocol))
{
assert(!"sys_spu_thread_receive_event() failed (receiving)");
}
return;
}
}
// protocol is ignored in current implementation
queue->waiters++;
if (!~old_owner)
while (queue->events.empty())
{
if (queue->waiters < 0)
{
if (!eq->sq.invalidate(tid, eq->protocol))
{
assert(!"sys_spu_thread_receive_event() failed (cancelling)");
}
ch_in_mbox.push_uncond(CELL_ECANCELED);
queue->waiters--;
return;
}
std::this_thread::sleep_for(std::chrono::milliseconds(1)); // hack
if (Emu.IsStopped())
{
LOG_WARNING(SPU, "sys_spu_thread_receive_event(spuq=0x%x) aborted", spuq);
return;
}
queue->cv.wait_for(lv2_lock, std::chrono::milliseconds(1));
}
auto& event = queue->events.front();
ch_in_mbox.push_uncond(CELL_OK);
ch_in_mbox.push_uncond((u32)event.data1);
ch_in_mbox.push_uncond((u32)event.data2);
ch_in_mbox.push_uncond((u32)event.data3);
queue->events.pop_front();
queue->waiters--;
return;
}