SPU/event queue: Atomically resume SPU group

This commit is contained in:
Eladash 2021-07-28 07:36:46 +03:00 committed by Ivan
parent 5816505e61
commit bf61c826d5
2 changed files with 98 additions and 66 deletions

View file

@ -4265,6 +4265,40 @@ bool spu_thread::set_ch_value(u32 ch, u32 value)
fmt::throw_exception("Unknown/illegal channel in WRCH (ch=%d [%s], value=0x%x)", ch, ch < 128 ? spu_ch_name[ch] : "???", value); fmt::throw_exception("Unknown/illegal channel in WRCH (ch=%d [%s], value=0x%x)", ch, ch < 128 ? spu_ch_name[ch] : "???", value);
} }
extern void resume_spu_thread_group_from_waiting(spu_thread& spu)
{
const auto group = spu.group;
std::lock_guard lock(group->mutex);
if (group->run_state == SPU_THREAD_GROUP_STATUS_WAITING)
{
group->run_state = SPU_THREAD_GROUP_STATUS_RUNNING;
}
else if (group->run_state == SPU_THREAD_GROUP_STATUS_WAITING_AND_SUSPENDED)
{
group->run_state = SPU_THREAD_GROUP_STATUS_SUSPENDED;
}
for (auto& thread : group->threads)
{
if (thread)
{
if (thread.get() == &spu)
{
constexpr auto flags = cpu_flag::suspend + cpu_flag::signal;
ensure(((thread->state ^= flags) & flags) == cpu_flag::signal);
}
else
{
thread->state -= cpu_flag::suspend;
}
thread->state.notify_one(cpu_flag::suspend + cpu_flag::signal);
}
}
}
bool spu_thread::stop_and_signal(u32 code) bool spu_thread::stop_and_signal(u32 code)
{ {
spu_log.trace("stop_and_signal(code=0x%x)", code); spu_log.trace("stop_and_signal(code=0x%x)", code);
@ -4293,6 +4327,23 @@ bool spu_thread::stop_and_signal(u32 code)
return true; return true;
} }
auto get_queue = [this](u32 spuq) -> const std::shared_ptr<lv2_event_queue>&
{
for (auto& v : this->spuq)
{
if (spuq == v.first)
{
if (lv2_obj::check(v.second))
{
return v.second;
}
}
}
static const std::shared_ptr<lv2_event_queue> empty;
return empty;
};
switch (code) switch (code)
{ {
case 0x001: case 0x001:
@ -4338,6 +4389,8 @@ bool spu_thread::stop_and_signal(u32 code)
spu_function_logger logger(*this, "sys_spu_thread_receive_event"); spu_function_logger logger(*this, "sys_spu_thread_receive_event");
std::shared_ptr<lv2_event_queue> queue;
while (true) while (true)
{ {
// Check group status, wait if necessary // Check group status, wait if necessary
@ -4355,7 +4408,15 @@ bool spu_thread::stop_and_signal(u32 code)
thread_ctrl::wait_on(state, old);; thread_ctrl::wait_on(state, old);;
} }
std::lock_guard lock(group->mutex); reader_lock{group->mutex}, queue = get_queue(spuq);
if (!queue)
{
return ch_in_mbox.set_values(1, CELL_EINVAL), true;
}
// Lock queue's mutex first, then group's mutex
std::scoped_lock lock(queue->mutex, group->mutex);
if (is_stopped()) if (is_stopped())
{ {
@ -4368,27 +4429,12 @@ bool spu_thread::stop_and_signal(u32 code)
continue; continue;
} }
lv2_event_queue* queue = nullptr; if (queue != get_queue(spuq))
for (auto& v : this->spuq)
{ {
if (spuq == v.first) // Try again
{ continue;
if (lv2_obj::check(v.second))
{
queue = v.second.get();
break;
}
}
} }
if (!queue)
{
return ch_in_mbox.set_values(1, CELL_EINVAL), true;
}
std::lock_guard qlock(queue->mutex);
if (!queue->exists) if (!queue->exists)
{ {
return ch_in_mbox.set_values(1, CELL_EINVAL), true; return ch_in_mbox.set_values(1, CELL_EINVAL), true;
@ -4440,30 +4486,6 @@ bool spu_thread::stop_and_signal(u32 code)
thread_ctrl::wait_on(state, old); thread_ctrl::wait_on(state, old);
} }
std::lock_guard lock(group->mutex);
if (group->run_state == SPU_THREAD_GROUP_STATUS_WAITING)
{
group->run_state = SPU_THREAD_GROUP_STATUS_RUNNING;
}
else if (group->run_state == SPU_THREAD_GROUP_STATUS_WAITING_AND_SUSPENDED)
{
group->run_state = SPU_THREAD_GROUP_STATUS_SUSPENDED;
}
for (auto& thread : group->threads)
{
if (thread)
{
thread->state -= cpu_flag::suspend;
if (thread.get() != this)
{
thread->state.notify_one(cpu_flag::suspend);
}
}
}
return true; return true;
} }
@ -4486,29 +4508,36 @@ bool spu_thread::stop_and_signal(u32 code)
spu_log.trace("sys_spu_thread_tryreceive_event(spuq=0x%x)", spuq); spu_log.trace("sys_spu_thread_tryreceive_event(spuq=0x%x)", spuq);
std::lock_guard lock(group->mutex); std::shared_ptr<lv2_event_queue> queue;
lv2_event_queue* queue = nullptr; reader_lock{group->mutex}, queue = get_queue(spuq);
for (auto& v : this->spuq) std::unique_lock<shared_mutex> qlock, group_lock;
while (true)
{ {
if (spuq == v.first) if (!queue)
{ {
if (lv2_obj::check(v.second)) return ch_in_mbox.set_values(1, CELL_EINVAL), true;
{ }
queue = v.second.get();
break; // Lock queue's mutex first, then group's mutex
} qlock = std::unique_lock{queue->mutex};
group_lock = std::unique_lock{group->mutex};
if (const auto& queue0 = get_queue(spuq); queue != queue0)
{
// Keep atleast one reference of the pointer so mutex unlock can work
const auto old_ref = std::exchange(queue, queue0);
group_lock.unlock();
qlock.unlock();
}
else
{
break;
} }
} }
if (!queue)
{
return ch_in_mbox.set_values(1, CELL_EINVAL), true;
}
std::lock_guard qlock(queue->mutex);
if (!queue->exists) if (!queue->exists)
{ {
return ch_in_mbox.set_values(1, CELL_EINVAL), true; return ch_in_mbox.set_values(1, CELL_EINVAL), true;

View file

@ -32,6 +32,8 @@ std::shared_ptr<lv2_event_queue> lv2_event_queue::find(u64 ipc_key)
return g_fxo->get<ipc_manager<lv2_event_queue, u64>>().get(ipc_key); return g_fxo->get<ipc_manager<lv2_event_queue, u64>>().get(ipc_key);
} }
extern void resume_spu_thread_group_from_waiting(spu_thread& spu);
CellError lv2_event_queue::send(lv2_event event) CellError lv2_event_queue::send(lv2_event event)
{ {
std::lock_guard lock(mutex); std::lock_guard lock(mutex);
@ -74,9 +76,7 @@ CellError lv2_event_queue::send(lv2_event event)
const u32 data2 = static_cast<u32>(std::get<2>(event)); const u32 data2 = static_cast<u32>(std::get<2>(event));
const u32 data3 = static_cast<u32>(std::get<3>(event)); const u32 data3 = static_cast<u32>(std::get<3>(event));
spu.ch_in_mbox.set_values(4, CELL_OK, data1, data2, data3); spu.ch_in_mbox.set_values(4, CELL_OK, data1, data2, data3);
resume_spu_thread_group_from_waiting(spu);
spu.state += cpu_flag::signal;
spu.state.notify_one(cpu_flag::signal);
} }
return {}; return {};
@ -161,11 +161,15 @@ error_code sys_event_queue_destroy(ppu_thread& ppu, u32 equeue_id, s32 mode)
if (mode == SYS_EVENT_QUEUE_DESTROY_FORCE) if (mode == SYS_EVENT_QUEUE_DESTROY_FORCE)
{ {
std::deque<cpu_thread*> sq;
std::lock_guard lock(queue->mutex); std::lock_guard lock(queue->mutex);
sq = std::move(queue->sq);
if (queue->type == SYS_PPU_QUEUE) if (queue->type == SYS_PPU_QUEUE)
{ {
for (auto cpu : queue->sq) for (auto cpu : sq)
{ {
static_cast<ppu_thread&>(*cpu).gpr[3] = CELL_ECANCELED; static_cast<ppu_thread&>(*cpu).gpr[3] = CELL_ECANCELED;
queue->append(cpu); queue->append(cpu);
@ -178,11 +182,10 @@ error_code sys_event_queue_destroy(ppu_thread& ppu, u32 equeue_id, s32 mode)
} }
else else
{ {
for (auto cpu : queue->sq) for (auto cpu : sq)
{ {
static_cast<spu_thread&>(*cpu).ch_in_mbox.set_values(1, CELL_ECANCELED); static_cast<spu_thread&>(*cpu).ch_in_mbox.set_values(1, CELL_ECANCELED);
cpu->state += cpu_flag::signal; resume_spu_thread_group_from_waiting(static_cast<spu_thread&>(*cpu));
cpu->state.notify_one(cpu_flag::signal);
} }
} }
} }