Thread.cpp refinement

Hide thread mutex
Safe notify() method
Other refactoring
Nekotekina 2016-09-07 01:38:52 +03:00
parent da878c36bd
commit a5a2d43d7c
35 changed files with 532 additions and 591 deletions
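
Editor's note: for orientation, the net effect on call sites looks roughly like the sketch below. This is illustrative code written against the thread_ctrl interface from the Thread.h hunks further down, not code taken from the commit: the public thread mutex and lock_notify() disappear, the waiting side uses the static wait()/wait_for() helpers, and notify() alone is now safe even when the condition was changed without holding any lock.

#include <atomic>
#include "Utilities/Thread.h" // header modified by this commit (path assumed)

std::atomic<bool> g_ready{false};

// Runs on the worker thread: previously the thread mutex had to be held
// around wait(); now wait() handles the locking internally and re-checks
// the predicate after every wakeup.
void wait_side()
{
    thread_ctrl::wait([] { return g_ready.load(); });
}

// Runs on any other thread: previously this required lock_notify() (or a
// manual lock/unlock pair) to avoid a lost wakeup; plain notify() now
// performs that serialization itself.
void signal_side(thread_ctrl* worker)
{
    g_ready = true;
    worker->notify();
}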


@ -8,6 +8,7 @@
#ifdef _WIN32
#include <Windows.h>
#include <Psapi.h>
#include <process.h>
#else
#ifdef __APPLE__
#define _XOPEN_SOURCE
@ -16,8 +17,16 @@
#include <errno.h>
#include <signal.h>
#include <ucontext.h>
#include <pthread.h>
#include <sys/time.h>
#include <sys/resource.h>
#endif
#include "sync.h"
thread_local u64 g_tls_fault_rsx = 0;
thread_local u64 g_tls_fault_spu = 0;
static void report_fatal_error(const std::string& msg)
{
std::string _msg = msg + "\n"
@ -1009,6 +1018,7 @@ bool handle_access_violation(u32 addr, bool is_writing, x64_context* context)
{
if (rsx::g_access_violation_handler && rsx::g_access_violation_handler(addr, is_writing))
{
g_tls_fault_rsx++;
return true;
}
@ -1139,6 +1149,7 @@ bool handle_access_violation(u32 addr, bool is_writing, x64_context* context)
// skip processed instruction
RIP(context) += i_size;
g_tls_fault_spu++;
return true;
}
@ -1878,103 +1889,65 @@ const bool s_self_test = []() -> bool
return true;
}();
#include <thread>
#include <mutex>
#include <condition_variable>
#include <exception>
#include <chrono>
thread_local DECLARE(thread_ctrl::g_tls_this_thread) = nullptr;
struct thread_ctrl::internal
{
std::mutex mutex;
std::condition_variable cond;
std::condition_variable jcv; // Allows simultaneous joining
std::condition_variable icv;
task_stack atexit;
std::exception_ptr exception; // Stored exception
std::chrono::high_resolution_clock::time_point time_limit;
#ifdef _WIN32
DWORD thread_id = 0;
#endif
x64_context _context{};
x64_context* const thread_ctx = &this->_context;
atomic_t<void(*)()> interrupt{}; // Interrupt function
};
thread_local thread_ctrl::internal* g_tls_internal = nullptr;
extern std::condition_variable& get_current_thread_cv()
{
return g_tls_internal->cond;
}
// TODO
extern atomic_t<u32> g_thread_count(0);
thread_local DECLARE(thread_ctrl::g_tls_this_thread) = nullptr;
extern thread_local std::string(*g_tls_log_prefix)();
void thread_ctrl::start(const std::shared_ptr<thread_ctrl>& ctrl, task_stack task)
{
reinterpret_cast<std::thread&>(ctrl->m_thread) = std::thread([ctrl, task = std::move(task)]
#ifdef _WIN32
using thread_result = uint;
using thread_type = thread_result(__stdcall*)(void* arg);
#else
using thread_result = void*;
using thread_type = thread_result(*)(void* arg);
#endif
// Thread entry point
const thread_type entry = [](void* arg) -> thread_result
{
// Recover shared_ptr from short-circuited thread_ctrl object pointer
const std::shared_ptr<thread_ctrl> ctrl = static_cast<thread_ctrl*>(arg)->m_self;
try
{
ctrl->initialize();
task.exec();
task_stack{std::move(ctrl->m_task)}.invoke();
}
catch (...)
{
ctrl->m_data->exception = std::current_exception();
// Capture exception
ctrl->finalize(std::current_exception());
return 0;
}
ctrl->finalize();
});
}
ctrl->finalize(nullptr);
return 0;
};
void thread_ctrl::wait_start(u64 timeout)
{
m_data->time_limit = std::chrono::high_resolution_clock::now() + std::chrono::microseconds(timeout);
}
ctrl->m_self = ctrl;
ctrl->m_task = std::move(task);
bool thread_ctrl::wait_wait(u64 timeout)
{
std::unique_lock<std::mutex> lock(m_data->mutex, std::adopt_lock);
// TODO: implement simple thread pool
#ifdef _WIN32
std::uintptr_t thread = _beginthreadex(nullptr, 0, entry, ctrl.get(), 0, nullptr);
verify("thread_ctrl::start" HERE), thread != 0;
#else
pthread_t thread;
verify("thread_ctrl::start" HERE), pthread_create(&thread, nullptr, entry, ctrl.get());
#endif
if (timeout && m_data->cond.wait_until(lock, m_data->time_limit) == std::cv_status::timeout)
{
lock.release();
return false;
}
m_data->cond.wait(lock);
lock.release();
return true;
}
void thread_ctrl::test()
{
if (m_data && m_data->exception)
{
std::rethrow_exception(m_data->exception);
}
// TODO: this is unsafe and must be duplicated in thread_ctrl::initialize
ctrl->m_thread = thread;
}
void thread_ctrl::initialize()
{
// Initialize TLS variable
g_tls_this_thread = this;
g_tls_internal = this->m_data;
#ifdef _WIN32
m_data->thread_id = GetCurrentThreadId();
#endif
g_tls_log_prefix = []
{
@ -1983,8 +1956,7 @@ void thread_ctrl::initialize()
++g_thread_count;
#if defined(_MSC_VER)
#ifdef _MSC_VER
struct THREADNAME_INFO
{
DWORD dwType;
@ -2010,11 +1982,10 @@ void thread_ctrl::initialize()
{
}
}
#endif
}
void thread_ctrl::finalize() noexcept
void thread_ctrl::finalize(std::exception_ptr eptr) noexcept
{
// Disable and discard possible interrupts
interrupt_disable();
@ -2023,135 +1994,213 @@ void thread_ctrl::finalize() noexcept
// TODO
vm::reservation_free();
// Call atexit functions
if (m_data) m_data->atexit.exec();
// Run atexit functions
m_task.invoke();
m_task.reset();
#ifdef _WIN32
ULONG64 cycles{};
QueryThreadCycleTime(GetCurrentThread(), &cycles);
FILETIME ctime, etime, ktime, utime;
GetThreadTimes(GetCurrentThread(), &ctime, &etime, &ktime, &utime);
const u64 time = ((ktime.dwLowDateTime | (u64)ktime.dwHighDateTime << 32) + (utime.dwLowDateTime | (u64)utime.dwHighDateTime << 32)) * 100ull;
#elif __linux__
const u64 cycles = 0; // Not supported
struct ::rusage stats{};
::getrusage(RUSAGE_THREAD, &stats);
const u64 time = (stats.ru_utime.tv_sec + stats.ru_stime.tv_sec) * 1000000000ull + (stats.ru_utime.tv_usec + stats.ru_stime.tv_usec) * 1000ull;
#else
const u64 cycles = 0;
const u64 time = 0;
#endif
LOG_NOTICE(GENERAL, "Thread time: %fs (%fGc); Faults: %u [rsx:%u, spu:%u];",
time / 1000000000.,
cycles / 1000000000.,
vm::g_tls_fault_count,
g_tls_fault_rsx,
g_tls_fault_spu);
--g_thread_count;
#ifdef _WIN32
ULONG64 time;
QueryThreadCycleTime(GetCurrentThread(), &time);
LOG_NOTICE(GENERAL, "Thread time: %f Gc", time / 1000000000.);
#endif
// Untangle circular reference, set exception
semaphore_lock{m_mutex}, m_self.reset(), m_exception = eptr;
// Signal joining waiters
m_jcv.notify_all();
}
void thread_ctrl::push_atexit(task_stack task)
void thread_ctrl::_push(task_stack task)
{
m_data->atexit.push(std::move(task));
g_tls_this_thread->m_task.push(std::move(task));
}
bool thread_ctrl::_wait_for(u64 usec)
{
auto _this = g_tls_this_thread;
struct half_lock
{
semaphore<>& ref;
void lock()
{
// Used to avoid additional lock + unlock
}
void unlock()
{
ref.post();
}
}
_lock{_this->m_mutex};
if (u32 sig = _this->m_signal.load())
{
thread_ctrl::test();
if (sig & 1)
{
_this->m_signal &= ~1;
return true;
}
}
_this->m_mutex.wait();
while (_this->m_cond.wait(_lock, usec))
{
if (u32 sig = _this->m_signal.load())
{
thread_ctrl::test();
if (sig & 1)
{
_this->m_signal &= ~1;
return true;
}
}
if (usec != -1)
{
return false;
}
_this->m_mutex.wait();
if (u32 sig = _this->m_signal.load())
{
if (sig & 2 && _this->m_exception)
{
_this->_throw();
}
if (sig & 1)
{
_this->m_signal &= ~1;
_this->m_mutex.post();
return true;
}
}
}
// Timeout
return false;
}
[[noreturn]] void thread_ctrl::_throw()
{
std::exception_ptr ex = std::exchange(m_exception, std::exception_ptr{});
m_signal &= ~3;
m_mutex.post();
std::rethrow_exception(std::move(ex));
}
void thread_ctrl::_notify(cond_variable thread_ctrl::* ptr)
{
// Optimized lock + unlock
if (!m_mutex.get())
{
m_mutex.wait();
m_mutex.post();
}
(this->*ptr).notify_one();
}
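
Editor's note: _notify() above preserves the guarantee the old lock_notify() provided. By briefly acquiring and releasing m_mutex when it appears taken, the signaller serializes with a waiter that may be between its predicate check and its sleep, so the subsequent notify_one() cannot be lost. The same idea expressed with standard primitives, purely as an illustration (not part of the commit):

#include <atomic>
#include <condition_variable>
#include <mutex>

std::mutex g_m;
std::condition_variable g_cv;
std::atomic<bool> g_flag{false};

void waiter()
{
    std::unique_lock<std::mutex> lock(g_m);
    g_cv.wait(lock, [] { return g_flag.load(); });
}

void signaller()
{
    g_flag = true; // condition changed without holding the lock

    // Lock and immediately unlock: any waiter that already saw the flag as
    // false is now guaranteed to be blocked inside wait(), so the
    // notification below cannot slip between its check and its sleep.
    { std::lock_guard<std::mutex> lock(g_m); }
    g_cv.notify_one();
}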
thread_ctrl::thread_ctrl(std::string&& name)
: m_name(std::move(name))
{
static_assert(sizeof(std::thread) <= sizeof(m_thread), "Small storage");
#pragma push_macro("new")
#undef new
new (&m_thread) std::thread;
#pragma pop_macro("new")
initialize_once();
}
thread_ctrl::~thread_ctrl()
{
if (reinterpret_cast<std::thread&>(m_thread).joinable())
if (m_thread)
{
reinterpret_cast<std::thread&>(m_thread).detach();
#ifdef _WIN32
CloseHandle((HANDLE)m_thread.raw());
#else
pthread_detach(m_thread.raw());
#endif
}
delete m_data;
reinterpret_cast<std::thread&>(m_thread).~thread();
}
void thread_ctrl::initialize_once()
std::exception_ptr thread_ctrl::get_exception() const
{
if (UNLIKELY(!m_data))
{
auto ptr = new thread_ctrl::internal;
semaphore_lock lock(m_mutex);
return m_exception;
}
if (!m_data.compare_and_swap_test(nullptr, ptr))
{
delete ptr;
}
void thread_ctrl::set_exception(std::exception_ptr ptr)
{
semaphore_lock lock(m_mutex);
m_exception = ptr;
if (m_exception)
{
m_signal |= 2;
m_cond.notify_one();
}
else
{
m_signal &= ~2;
}
}
void thread_ctrl::join()
{
// Increase contention counter
const u32 _j = m_joining++;
#ifdef _WIN32
//verify("thread_ctrl::join" HERE), WaitForSingleObjectEx((HANDLE)m_thread.load(), -1, false) == WAIT_OBJECT_0;
#endif
if (LIKELY(_j >= 0x80000000))
{
// Already joined (signal condition)
m_joining = 0x80000000;
}
else if (LIKELY(_j == 0))
{
// Winner joins the thread
reinterpret_cast<std::thread&>(m_thread).join();
semaphore_lock lock(m_mutex);
// Notify others if necessary
if (UNLIKELY(m_joining.exchange(0x80000000) != 1))
{
// Serialize for reliable notification
m_data->mutex.lock();
m_data->mutex.unlock();
m_data->jcv.notify_all();
}
}
else
while (m_self)
{
// Hard way
std::unique_lock<std::mutex> lock(m_data->mutex);
m_data->jcv.wait(lock, [&] { return m_joining >= 0x80000000; });
m_jcv.wait(lock);
}
if (UNLIKELY(m_data && m_data->exception && !std::uncaught_exception()))
if (UNLIKELY(m_exception && !std::uncaught_exception()))
{
std::rethrow_exception(m_data->exception);
std::rethrow_exception(m_exception);
}
}
void thread_ctrl::lock()
{
m_data->mutex.lock();
}
void thread_ctrl::unlock()
{
m_data->mutex.unlock();
}
void thread_ctrl::lock_notify()
{
if (UNLIKELY(g_tls_this_thread == this))
{
return;
}
// Serialize for reliable notification, condition is assumed to be changed externally
m_data->mutex.lock();
m_data->mutex.unlock();
m_data->cond.notify_one();
}
void thread_ctrl::notify()
{
m_data->cond.notify_one();
if (!(m_signal & 1))
{
m_signal |= 1;
_notify(&thread_ctrl::m_cond);
}
}
void thread_ctrl::set_exception(std::exception_ptr e)
{
m_data->exception = e;
}
static thread_local x64_context s_tls_context{};
static void _handle_interrupt(x64_context* ctx)
{
// Copy context for further use (TODO: is it safe on all platforms?)
g_tls_internal->_context = *ctx;
s_tls_context = *ctx;
thread_ctrl::handle_interrupt();
}
@ -2166,7 +2215,7 @@ static thread_local void(*s_tls_handler)() = nullptr;
s_tls_handler();
// Restore context in the case of return
const auto ctx = g_tls_internal->thread_ctx;
const auto ctx = &s_tls_context;
if (s_tls_ret_pos)
{
@ -2188,26 +2237,22 @@ static thread_local void(*s_tls_handler)() = nullptr;
void thread_ctrl::handle_interrupt()
{
const auto _this = g_tls_this_thread;
const auto ctx = g_tls_internal->thread_ctx;
const auto ctx = &s_tls_context;
if (_this->m_guard & 0x80000000)
{
// Discard interrupt if interrupts are disabled
if (g_tls_internal->interrupt.exchange(nullptr))
if (_this->m_iptr.exchange(nullptr))
{
_this->lock();
_this->unlock();
g_tls_internal->icv.notify_one();
_this->_notify(&thread_ctrl::m_icv);
}
}
else if (_this->m_guard == 0)
{
// Set interrupt immediately if no guard set
if (const auto handler = g_tls_internal->interrupt.exchange(nullptr))
if (const auto handler = _this->m_iptr.exchange(nullptr))
{
_this->lock();
_this->unlock();
g_tls_internal->icv.notify_one();
_this->_notify(&thread_ctrl::m_icv);
#ifdef _WIN32
// Install function call
@ -2234,13 +2279,15 @@ void thread_ctrl::handle_interrupt()
void thread_ctrl::interrupt(void(*handler)())
{
semaphore_lock lock(m_mutex);
verify(HERE), this != g_tls_this_thread; // TODO: self-interrupt
verify(HERE), m_data->interrupt.compare_and_swap_test(nullptr, handler); // TODO: multiple interrupts
verify(HERE), m_iptr.compare_and_swap_test(nullptr, handler); // TODO: multiple interrupts
#ifdef _WIN32
const auto ctx = m_data->thread_ctx;
const auto ctx = &s_tls_context;
const HANDLE nt = OpenThread(THREAD_ALL_ACCESS, FALSE, m_data->thread_id);
const HANDLE nt = (HANDLE)m_thread.load();//OpenThread(THREAD_ALL_ACCESS, FALSE, m_data->thread_id);
verify(HERE), nt;
verify(HERE), SuspendThread(nt) != -1;
@ -2254,28 +2301,24 @@ void thread_ctrl::interrupt(void(*handler)())
RIP(ctx) = _rip;
verify(HERE), ResumeThread(nt) != -1;
CloseHandle(nt);
//CloseHandle(nt);
#else
pthread_kill(reinterpret_cast<std::thread&>(m_thread).native_handle(), SIGUSR1);
pthread_kill(m_thread.load(), SIGUSR1);
#endif
std::unique_lock<std::mutex> lock(m_data->mutex, std::adopt_lock);
while (m_data->interrupt)
while (m_iptr)
{
m_data->icv.wait(lock);
m_icv.wait(lock);
}
lock.release();
}
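
Editor's note: with m_iptr and m_icv now private members of thread_ctrl, interrupting a thread no longer requires touching a public mutex. A hedged usage sketch, assuming the interrupt() semantics shown above (the handler runs on the target thread, and the call waits until the pending interrupt has been consumed):

// Handler executed on the interrupted thread at the next safe point
// (immediately if no interrupt guard is active).
static void on_interrupt()
{
    // illustrative only
}

void interrupt_example(thread_ctrl* target)
{
    // Must not be called on 'target' itself (self-interrupt is still a TODO)
    target->interrupt(&on_interrupt);
}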
void thread_ctrl::test_interrupt()
{
if (m_guard & 0x80000000)
{
if (m_data->interrupt.exchange(nullptr))
if (m_iptr.exchange(nullptr))
{
lock(), unlock(), m_data->icv.notify_one();
_notify(&thread_ctrl::m_icv);
}
return;
@ -2286,18 +2329,30 @@ void thread_ctrl::test_interrupt()
m_guard = 0;
// Execute delayed interrupt handler
if (const auto handler = m_data->interrupt.exchange(nullptr))
if (const auto handler = m_iptr.exchange(nullptr))
{
lock(), unlock(), m_data->icv.notify_one();
_notify(&thread_ctrl::m_icv);
return handler();
}
}
}
void thread_ctrl::sleep(u64 useconds)
void thread_ctrl::test()
{
std::this_thread::sleep_for(std::chrono::microseconds(useconds));
const auto _this = g_tls_this_thread;
if (_this->m_signal & 2)
{
_this->m_mutex.wait();
if (_this->m_exception)
{
_this->_throw();
}
_this->m_mutex.post();
}
}
@ -2341,3 +2396,7 @@ void named_thread::start_thread(const std::shared_ptr<void>& _this)
on_exit();
});
}
task_stack::task_base::~task_base()
{
}


@ -7,6 +7,9 @@
#include <string>
#include <memory>
#include "sema.h"
#include "cond.h"
// Will report exception and call std::abort() if put in catch(...)
[[noreturn]] void catch_all_exceptions();
@ -17,19 +20,19 @@ class task_stack
{
std::unique_ptr<task_base> next;
virtual ~task_base() = default;
virtual ~task_base();
virtual void exec()
virtual void invoke()
{
if (next)
{
next->exec();
next->invoke();
}
}
};
template<typename F>
struct task_type : task_base
template <typename F>
struct task_type final : task_base
{
std::remove_reference_t<F> func;
@ -38,10 +41,10 @@ class task_stack
{
}
void exec() override
void invoke() final override
{
func();
task_base::exec();
task_base::invoke();
}
};
@ -50,7 +53,7 @@ class task_stack
public:
task_stack() = default;
template<typename F>
template <typename F>
task_stack(F&& func)
: m_stack(new task_type<F>(std::forward<F>(func)))
{
@ -70,11 +73,11 @@ public:
m_stack.reset();
}
void exec() const
void invoke() const
{
if (m_stack)
{
m_stack->exec();
m_stack->invoke();
}
}
};
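
Editor's note: task_stack, whose exec() is renamed to invoke() here, is a simple intrusive LIFO of callables that thread_ctrl now reuses both for the startup task and for atexit handlers. A minimal usage sketch; push() is part of the existing class and not shown in this diff, and its last-pushed-runs-first order is assumed:

#include <cstdio>
// task_stack is declared in the Thread.h changes shown in this commit

void task_stack_example()
{
    task_stack tasks;

    tasks.push([] { std::puts("registered first, runs last"); });
    tasks.push([] { std::puts("registered last, runs first"); });

    tasks.invoke(); // run every stored task
    tasks.reset();  // discard the chain
}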
@ -82,23 +85,41 @@ public:
// Thread control class
class thread_ctrl final
{
public: // TODO
struct internal;
private:
// Current thread
static thread_local thread_ctrl* g_tls_this_thread;
// Thread handle storage
std::aligned_storage_t<16> m_thread;
// Self pointer
std::shared_ptr<thread_ctrl> m_self;
// Thread join contention counter
atomic_t<u32> m_joining{};
// Thread handle (platform-specific)
atomic_t<std::uintptr_t> m_thread{0};
// Thread mutex
mutable semaphore<> m_mutex;
// Thread condition variable
cond_variable m_cond;
// Thread flags
atomic_t<u32> m_signal{0};
// Thread joining condition variable
cond_variable m_jcv;
// Remotely set or caught exception
std::exception_ptr m_exception;
// Thread initial task or atexit task
task_stack m_task;
// Thread interrupt guard counter
volatile u32 m_guard = 0x80000000;
// Thread internals
atomic_t<internal*> m_data{};
// Thread interrupt condition variable
cond_variable m_icv;
// Interrupt function
atomic_t<void(*)()> m_iptr{nullptr};
// Fixed name
std::string m_name;
@ -110,19 +131,19 @@ private:
void initialize();
// Called at the thread end
void finalize() noexcept;
void finalize(std::exception_ptr) noexcept;
// Get atexit function
void push_atexit(task_stack);
// Add task (atexit)
static void _push(task_stack);
// Start waiting
void wait_start(u64 timeout);
// Internal waiting function, may throw. Infinite value is -1.
static bool _wait_for(u64 usec);
// Proceed waiting
bool wait_wait(u64 timeout);
// Internal throwing function. Mutex must be locked and will be unlocked.
[[noreturn]] void _throw();
// Check exception
void test();
// Internal notification function
void _notify(cond_variable thread_ctrl::*);
public:
thread_ctrl(std::string&& name);
@ -137,63 +158,22 @@ public:
return m_name;
}
// Initialize internal data
void initialize_once();
// Get exception
std::exception_ptr get_exception() const;
// Set exception
void set_exception(std::exception_ptr ptr);
// Get thread result (may throw, simultaneous joining allowed)
void join();
// Lock thread mutex
void lock();
// Lock conditionally (double-checked)
template<typename F>
bool lock_if(F&& pred)
{
if (pred())
{
lock();
try
{
if (LIKELY(pred()))
{
return true;
}
else
{
unlock();
return false;
}
}
catch (...)
{
unlock();
throw;
}
}
else
{
return false;
}
}
// Unlock thread mutex (internal data must be initialized)
void unlock();
// Lock, unlock, notify the thread (required if the condition changed locklessly)
void lock_notify();
// Notify the thread (internal data must be initialized)
// Notify the thread
void notify();
// Set exception (internal data must be initialized, thread mutex must be locked)
void set_exception(std::exception_ptr);
// Internal
static void handle_interrupt();
// Interrupt thread with specified handler call (thread mutex must be locked)
// Interrupt thread with specified handler call
void interrupt(void(*handler)());
// Interrupt guard recursive enter
@ -226,90 +206,45 @@ public:
// Check interrupt if delayed by guard scope
void test_interrupt();
// Current thread sleeps for specified amount of microseconds.
// Wrapper for std::this_thread::sleep, doesn't require valid thread_ctrl.
[[deprecated]] static void sleep(u64 useconds);
// Wait until pred(). Abortable, may throw. Thread must be locked.
// Timeout in microseconds (zero means infinite).
template<typename F>
static inline auto wait_for(u64 useconds, F&& pred)
// Wait once with timeout. Abortable, may throw. May spuriously return false.
static inline bool wait_for(u64 usec)
{
if (useconds)
{
g_tls_this_thread->wait_start(useconds);
}
while (true)
{
g_tls_this_thread->test();
if (auto&& result = pred())
{
return result;
}
else if (!g_tls_this_thread->wait_wait(useconds) && useconds)
{
return result;
}
}
return _wait_for(usec);
}
// Wait once. Abortable, may throw. Thread must be locked.
// Timeout in microseconds (zero means infinite).
static inline bool wait_for(u64 useconds = 0)
{
if (useconds)
{
g_tls_this_thread->wait_start(useconds);
}
g_tls_this_thread->test();
if (!g_tls_this_thread->wait_wait(useconds) && useconds)
{
return false;
}
g_tls_this_thread->test();
return true;
}
// Wait until pred(). Abortable, may throw. Thread must be locked.
template<typename F>
static inline auto wait(F&& pred)
{
while (true)
{
g_tls_this_thread->test();
if (auto&& result = pred())
{
return result;
}
g_tls_this_thread->wait_wait(0);
}
}
// Wait once. Abortable, may throw. Thread must be locked.
// Wait. Abortable, may throw.
static inline void wait()
{
g_tls_this_thread->test();
g_tls_this_thread->wait_wait(0);
g_tls_this_thread->test();
_wait_for(-1);
}
// Wait eternally. Abortable, may throw. Thread must be locked.
// Wait until pred(). Abortable, may throw.
template<typename F, typename RT = std::result_of_t<F()>>
static inline RT wait(F&& pred)
{
while (true)
{
if (RT result = pred())
{
return result;
}
_wait_for(-1);
}
}
// Wait eternally until aborted.
[[noreturn]] static inline void eternalize()
{
while (true)
{
g_tls_this_thread->test();
g_tls_this_thread->wait_wait(0);
_wait_for(-1);
}
}
// Test exception (may throw).
static void test();
// Get current thread (may be nullptr)
static thread_ctrl* get_current()
{
@ -320,14 +255,14 @@ public:
template<typename F>
static inline void atexit(F&& func)
{
return g_tls_this_thread->push_atexit(std::forward<F>(func));
_push(std::forward<F>(func));
}
// Named thread factory
// Create detached named thread
template<typename N, typename F>
static inline void spawn(N&& name, F&& func)
{
auto&& out = std::make_shared<thread_ctrl>(std::forward<N>(name));
auto out = std::make_shared<thread_ctrl>(std::forward<N>(name));
thread_ctrl::start(out, std::forward<F>(func));
}
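
Editor's note: putting the new factory and the atexit hook together, a worker created through spawn() might look like the sketch below. Purely illustrative; the thread name, timeout and log message are made up, and LOG_NOTICE is the project's own logging macro as used elsewhere in this diff.

void spawn_example()
{
    thread_ctrl::spawn("example worker", []
    {
        // Registered on the *current* thread via _push(); runs from finalize()
        thread_ctrl::atexit([]
        {
            LOG_NOTICE(GENERAL, "example worker finished");
        });

        // Sleep up to 1000 us per iteration; wait_for() returns true once
        // another thread calls notify() on this thread_ctrl
        while (!thread_ctrl::wait_for(1000))
        {
            // ... periodic work between wakeups ...
        }
    });
}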
@ -382,7 +317,7 @@ public:
}
// Access thread_ctrl
thread_ctrl* operator->() const
thread_ctrl* get() const
{
return m_thread.get();
}
@ -392,60 +327,12 @@ public:
return m_thread->join();
}
void lock() const
{
return m_thread->lock();
}
void unlock() const
{
return m_thread->unlock();
}
void lock_notify() const
{
return m_thread->lock_notify();
}
void notify() const
{
return m_thread->notify();
}
};
// Simple thread mutex locker
class thread_lock final
{
thread_ctrl* m_thread;
public:
thread_lock(const thread_lock&) = delete;
// Lock specified thread
thread_lock(thread_ctrl* thread)
: m_thread(thread)
{
m_thread->lock();
}
// Lock specified named_thread
thread_lock(named_thread& thread)
: thread_lock(thread.operator->())
{
}
// Lock current thread
thread_lock()
: thread_lock(thread_ctrl::get_current())
{
}
~thread_lock()
{
m_thread->unlock();
}
};
// Interrupt guard scope
class thread_guard final
{
@ -455,24 +342,24 @@ public:
thread_guard(const thread_guard&) = delete;
thread_guard(thread_ctrl* thread)
: m_thread(thread)
//: m_thread(thread)
{
m_thread->guard_enter();
//m_thread->guard_enter();
}
thread_guard(named_thread& thread)
: thread_guard(thread.operator->())
//: thread_guard(thread.get())
{
}
thread_guard()
: thread_guard(thread_ctrl::get_current())
//: thread_guard(thread_ctrl::get_current())
{
}
~thread_guard() noexcept(false)
{
m_thread->guard_leave();
//m_thread->guard_leave();
}
};
@ -498,7 +385,7 @@ public:
}
// Access thread_ctrl
thread_ctrl* operator->() const
thread_ctrl* get() const
{
return m_thread.get();
}


@ -20,13 +20,13 @@ public:
constexpr cond_variable() = default;
// Intrusive wait algorithm for lockable objects
template <typename T, void (T::*Unlock)() = &T::unlock, void (T::*Lock)() = &T::lock>
template <typename T>
explicit_bool_t wait(T& object, u64 usec_timeout = -1)
{
const u32 _old = m_value.fetch_add(1); // Increment waiter counter
(object.*Unlock)();
object.unlock();
const bool res = imp_wait(_old, usec_timeout);
(object.*Lock)();
object.lock();
return res;
}
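
Editor's note: cond_variable::wait() now duck-types on the passed object's lock()/unlock() instead of taking them as template parameters; the reader_lock and semaphore_lock changes further down expose their private lock()/unlock() to it via friendship. A sketch of the intended pattern, assuming a semaphore<> used as a mutex (names are illustrative):

#include <atomic>

semaphore<> g_mutex;
cond_variable g_cond;
std::atomic<bool> g_done{false};

void wait_until_done()
{
    semaphore_lock lock(g_mutex); // scope guard added at the end of this commit

    while (!g_done)
    {
        // Unlocks 'lock', sleeps up to ~1 s, then re-locks before returning
        g_cond.wait(lock, 1000000);
    }
}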


@ -99,27 +99,39 @@ public:
}
};
// Simplified shared (reader) lock implementation, std::shared_lock compatible.
// Simplified shared (reader) lock implementation.
class reader_lock final
{
shared_mutex& m_mutex;
void lock()
{
m_mutex.lock_shared();
}
void unlock()
{
m_mutex.unlock_shared();
}
friend class cond_variable;
public:
reader_lock(const reader_lock&) = delete;
explicit reader_lock(shared_mutex& mutex)
: m_mutex(mutex)
{
m_mutex.lock_shared();
lock();
}
~reader_lock()
{
m_mutex.unlock_shared();
unlock();
}
};
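
Editor's note: reader_lock keeps its RAII behaviour; only the std::shared_lock compatibility claim is dropped, and its lock()/unlock() become private (reserved for cond_variable). Basic usage is unchanged, e.g.:

shared_mutex g_rw_mutex;

void read_path()
{
    reader_lock lock(g_rw_mutex); // lock_shared() here, unlock_shared() on scope exit
    // ... read shared state ...
}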
// Simplified exclusive (writer) lock implementation, std::lock_guard compatible.
// Simplified exclusive (writer) lock implementation.
class writer_lock final
{
shared_mutex& m_mutex;


@ -13,6 +13,8 @@ class semaphore_base
void imp_post(s32 _old);
friend class semaphore_lock;
protected:
explicit constexpr semaphore_base(s32 value)
: m_value{value}
@ -108,3 +110,34 @@ public:
return Max;
}
};
class semaphore_lock
{
semaphore_base& m_base;
void lock()
{
m_base.wait();
}
void unlock()
{
m_base.post(INT32_MAX);
}
friend class cond_variable;
public:
explicit semaphore_lock(const semaphore_lock&) = delete;
semaphore_lock(semaphore_base& sema)
: m_base(sema)
{
lock();
}
~semaphore_lock()
{
unlock();
}
};
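
Editor's note: the new semaphore_lock is the scope guard used throughout the Thread.cpp changes above (e.g. in get_exception(), set_exception() and interrupt()); it simply pairs wait() with post() on a semaphore_base. A minimal sketch with hypothetical members:

#include <exception>

semaphore<> m_mutex;            // hypothetical shared state, for illustration
std::exception_ptr m_exception;

std::exception_ptr get_exception_sketch()
{
    semaphore_lock lock(m_mutex); // wait() now, post() when 'lock' leaves scope
    return m_exception;
}

The comma-expression form used in finalize() above, semaphore_lock{m_mutex}, m_self.reset(), m_exception = eptr;, relies on the temporary guard living until the end of the full expression, so the whole statement executes under the lock.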