Rewrite cpu_thread::suspend_all

It is now a higher-order function.
Only one thread now does the hard work of pausing the other threads.
Nekotekina 2020-10-09 20:33:12 +03:00
parent 6d83c9cc0e
commit 050c3e1d6b
10 changed files with 299 additions and 415 deletions
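
In isolation, the pattern the new code implements is: every caller pushes its workload onto a lock-free intrusive list, and only the thread that pushed the first item (the "combiner") pauses everything and runs the whole batch, while the other pushers merely wait for completion. Below is a minimal standalone sketch of that pattern, not the rpcs3 code itself: the names (work_item, g_queue, run_batched) are invented for illustration, C++20 std::atomic wait/notify stands in for rpcs3's atomic_t, and the thread-pausing and cpu_suspend_lock serialization that the real suspend_work::push performs are only hinted at in comments.

#include <atomic>
#include <functional>
#include <utility>

struct work_item
{
	std::function<void()> func;
	work_item* next = nullptr;
	std::atomic<bool> done{false};
};

inline std::atomic<work_item*> g_queue{nullptr};

void run_batched(std::function<void()> func)
{
	work_item item;
	item.func = std::move(func);

	// Push onto the intrusive list (newest element becomes the head)
	work_item* head = g_queue.load();
	do
	{
		item.next = head;
	}
	while (!g_queue.compare_exchange_weak(head, &item));

	if (head)
	{
		// Not the first pusher: another thread is the combiner and will run
		// this item; just wait for it (the loop guards against spurious wakeups)
		while (!item.done.load())
		{
			item.done.wait(false);
		}

		return;
	}

	// First pusher becomes the combiner
	// (the real code pauses every cpu_thread here and waits until all of them
	// reach cpu_flag::wait before touching the queue)

	// Take the whole batch and reverse it so items run in push order (FILO -> FIFO)
	work_item* batch = g_queue.exchange(nullptr);
	work_item* ordered = nullptr;

	while (batch)
	{
		work_item* following = batch->next;
		batch->next = ordered;
		ordered = batch;
		batch = following;
	}

	// Execute every queued item, then wake its owner; 'next' is read before
	// 'done' is set because the owner may destroy its item right afterwards
	while (ordered)
	{
		work_item* following = ordered->next;
		ordered->func();
		ordered->done.store(true);
		ordered->done.notify_one();
		ordered = following;
	}

	// (The real code instead increments g_suspend_counter, clears cpu_flag::pause
	// on the paused threads and calls notify_all on the counter.)
}

A caller would simply write run_batched([]{ /* work that must run with all threads paused */ });. In the actual commit the higher-order wrapper is cpu_thread::suspend_all, which packs the callable into a suspend_work (func_ptr, res_buf, exec) and hands it to push(), as the diff below suggests.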


@@ -15,6 +15,7 @@
DECLARE(cpu_thread::g_threads_created){0};
DECLARE(cpu_thread::g_threads_deleted){0};
DECLARE(cpu_thread::g_suspend_counter){0};
LOG_CHANNEL(profiler);
LOG_CHANNEL(sys_log, "SYS");
@@ -245,6 +246,9 @@ struct cpu_counter
// For synchronizing suspend_all operation
alignas(64) shared_mutex cpu_suspend_lock;
// Workload linked list
alignas(64) atomic_t<cpu_thread::suspend_work*> cpu_suspend_work{};
// Semaphore for global thread array (global counter)
alignas(64) atomic_t<u32> cpu_array_sema{0};
@@ -306,7 +310,7 @@ struct cpu_counter
};
template <typename F>
void for_all_cpu(F&& func) noexcept
void for_all_cpu(F func) noexcept
{
auto ctr = g_fxo->get<cpu_counter>();
@@ -475,6 +479,7 @@ bool cpu_thread::check_state() noexcept
bool cpu_sleep_called = false;
bool escape, retval;
u64 susp_ctr = -1;
while (true)
{
@@ -483,6 +488,16 @@ bool cpu_thread::check_state() noexcept
{
bool store = false;
// Easy way to obtain the suspend counter
if (flags & cpu_flag::pause && !(flags & cpu_flag::wait))
{
susp_ctr = g_suspend_counter;
}
else
{
susp_ctr = -1;
}
if (flags & cpu_flag::signal)
{
flags -= cpu_flag::signal;
@@ -559,8 +574,22 @@ bool cpu_thread::check_state() noexcept
continue;
}
// If only cpu_flag::pause was set, notification won't arrive
g_fxo->get<cpu_counter>()->cpu_suspend_lock.lock_unlock();
// If only cpu_flag::pause was set, wait on suspend counter instead
if (state0 & cpu_flag::pause)
{
// Hard way
if (susp_ctr == umax)
{
g_fxo->get<cpu_counter>()->cpu_suspend_lock.lock_unlock();
continue;
}
// Wait for current suspend_all operation
while (busy_wait(), g_suspend_counter == susp_ctr)
{
g_suspend_counter.wait(susp_ctr);
}
}
}
}
}
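
The new wait at the end of this hunk is a futex-style "park on a generation counter" idiom: snapshot the counter while it is still safe to do so, then sleep only as long as it still holds that value. A rough standalone equivalent in standard C++20 (rpcs3 uses its own atomic_t wrapper and a busy_wait() before each sleep; the names here are purely illustrative):

#include <atomic>
#include <cstdint>

inline std::atomic<std::uint64_t> generation{0}; // stand-in for g_suspend_counter

// Called by a paused thread that read 'observed' from the counter earlier
void wait_for_next_generation(std::uint64_t observed)
{
	// wait() may return spuriously, so the value is always re-checked
	while (generation.load() == observed)
	{
		generation.wait(observed);
	}
}

// Called once by the thread that finished processing the suspend_all batch
void publish_next_generation()
{
	generation.fetch_add(1);
	generation.notify_all(); // wake every thread blocked in wait()
}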
@@ -641,69 +670,114 @@ std::string cpu_thread::dump_misc() const
return fmt::format("Type: %s\n" "State: %s\n", typeid(*this).name(), state.load());
}
cpu_thread::suspend_all::suspend_all(cpu_thread* _this) noexcept
: m_this(_this)
void cpu_thread::suspend_work::push(cpu_thread* _this) noexcept
{
if (m_this)
{
m_this->state += cpu_flag::wait;
}
// Can't allow pre-set wait bit (it'd be a problem)
verify(HERE), !_this || !(_this->state & cpu_flag::wait);
g_fxo->get<cpu_counter>()->cpu_suspend_lock.lock_vip();
// Value read here is reliable because our cpu_flag::wait hasn't been observed yet (this doesn't hold if pause is already set)
const u64 susp_ctr = g_suspend_counter;
for_all_cpu([](cpu_thread* cpu)
// Try to push workload
auto& queue = g_fxo->get<cpu_counter>()->cpu_suspend_work;
do
{
// Should be atomic
if (!(cpu->state & cpu_flag::pause))
// Load current head
next = queue.load();
if (!_this && next)
{
cpu->state += cpu_flag::pause;
// If _this == nullptr, it only works if this is the first workload pushed
g_fxo->get<cpu_counter>()->cpu_suspend_lock.lock_unlock();
continue;
}
});
}
while (!queue.compare_and_swap_test(next, this));
busy_wait(500);
while (true)
if (!next)
{
bool ok = true;
// First thread to push the work to the workload list pauses all threads and processes it
std::lock_guard lock(g_fxo->get<cpu_counter>()->cpu_suspend_lock);
for_all_cpu([&](cpu_thread* cpu)
{
if (!(cpu->state & cpu_flag::wait))
if (!(cpu->state & cpu_flag::pause) && cpu != _this)
{
ok = false;
cpu->state += cpu_flag::pause;
}
});
if (ok) [[likely]]
busy_wait(500);
while (true)
{
break;
bool ok = true;
for_all_cpu([&](cpu_thread* cpu)
{
if (!(cpu->state & cpu_flag::wait) && cpu != _this)
{
ok = false;
}
});
if (ok) [[likely]]
{
break;
}
}
busy_wait(500);
}
}
// Extract queue and reverse element order (FILO to FIFO) (TODO: maybe leave order as is?)
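// For illustration: items pushed in order a, b, c leave the list as c -> b -> a, and the reversal below makes them execute as a, b, c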
auto* head = queue.exchange(nullptr);
if (auto* prev = head->next)
{
head->next = nullptr;
do
{
auto* pre2 = prev->next;
prev->next = head;
head = std::exchange(prev, pre2);
}
while (prev);
}
// Execute all stored workload
for (; head; head = head->next)
{
head->exec(head->func_ptr, head->res_buf);
}
// Finalization
g_suspend_counter++;
cpu_thread::suspend_all::~suspend_all()
{
// Make sure the latest thread does the cleanup and notifies others
if (g_fxo->get<cpu_counter>()->cpu_suspend_lock.downgrade_unique_vip_lock_to_low_or_unlock())
{
for_all_cpu([&](cpu_thread* cpu)
{
cpu->state -= cpu_flag::pause;
if (cpu != _this)
{
cpu->state -= cpu_flag::pause;
}
});
g_fxo->get<cpu_counter>()->cpu_suspend_lock.unlock_low();
}
else
{
g_fxo->get<cpu_counter>()->cpu_suspend_lock.lock_unlock();
// Seems safe to set pause on self because wait flag hasn't been observed yet
_this->state += cpu_flag::pause + cpu_flag::wait;
// Subscribe for notification broadcast
while (busy_wait(), g_suspend_counter == susp_ctr)
{
g_suspend_counter.wait(susp_ctr);
}
_this->check_state();
return;
}
if (m_this)
{
m_this->check_state();
}
g_suspend_counter.notify_all();
}
void cpu_thread::stop_all() noexcept
@@ -716,7 +790,7 @@ void cpu_thread::stop_all() noexcept
}
else
{
::vip_lock lock(g_fxo->get<cpu_counter>()->cpu_suspend_lock);
std::lock_guard lock(g_fxo->get<cpu_counter>()->cpu_suspend_lock);
for_all_cpu([](cpu_thread* cpu)
{