Threads improved, ID manager improved

This commit is contained in:
Nekotekina 2015-11-26 11:06:29 +03:00
parent 78bfd54ad4
commit ca6783ba9a
48 changed files with 1113 additions and 990 deletions

View file

@ -199,7 +199,7 @@ void LogManager::log(LogMessage msg)
prefix = "E ";
break;
}
if (auto thr = get_current_thread_ctrl())
if (auto thr = thread_ctrl::get_current())
{
prefix += "{" + thr->get_name() + "} ";
}

View file

@ -5,7 +5,7 @@
void sleep_queue_entry_t::add_entry()
{
m_queue.emplace_back(m_thread.shared_from_this());
m_queue.emplace_back(std::static_pointer_cast<CPUThread>(m_thread.shared_from_this()));
}
void sleep_queue_entry_t::remove_entry()
@ -33,7 +33,7 @@ bool sleep_queue_entry_t::find() const
return false;
}
sleep_queue_entry_t::sleep_queue_entry_t(CPUThread& cpu, sleep_queue_t& queue)
sleep_queue_entry_t::sleep_queue_entry_t(sleep_entry_t& cpu, sleep_queue_t& queue)
: m_thread(cpu)
, m_queue(queue)
{
@ -41,7 +41,7 @@ sleep_queue_entry_t::sleep_queue_entry_t(CPUThread& cpu, sleep_queue_t& queue)
cpu.sleep();
}
sleep_queue_entry_t::sleep_queue_entry_t(CPUThread& cpu, sleep_queue_t& queue, const defer_sleep_t&)
sleep_queue_entry_t::sleep_queue_entry_t(sleep_entry_t& cpu, sleep_queue_t& queue, const defer_sleep_t&)
: m_thread(cpu)
, m_queue(queue)
{

View file

@ -1,15 +1,14 @@
#pragma once
class CPUThread;
using sleep_queue_t = std::deque<std::shared_ptr<CPUThread>>;
using sleep_entry_t = class CPUThread;
using sleep_queue_t = std::deque<std::shared_ptr<sleep_entry_t>>;
static struct defer_sleep_t {} const defer_sleep{};
// automatic object handling a thread entry in the sleep queue
class sleep_queue_entry_t final
{
CPUThread& m_thread;
sleep_entry_t& m_thread;
sleep_queue_t& m_queue;
void add_entry();
@ -18,10 +17,10 @@ class sleep_queue_entry_t final
public:
// add specified thread to the sleep queue
sleep_queue_entry_t(CPUThread& cpu, sleep_queue_t& queue);
sleep_queue_entry_t(sleep_entry_t& entry, sleep_queue_t& queue);
// don't add specified thread to the sleep queue
sleep_queue_entry_t(CPUThread& cpu, sleep_queue_t& queue, const defer_sleep_t&);
sleep_queue_entry_t(sleep_entry_t& entry, sleep_queue_t& queue, const defer_sleep_t&);
// removes specified thread from the sleep queue if added
~sleep_queue_entry_t();

View file

@ -19,6 +19,27 @@
#include <ucontext.h>
#endif
static const auto s_terminate_handler_set = std::set_terminate([]()
{
if (std::uncaught_exception())
{
try
{
throw;
}
catch (const std::exception& ex)
{
std::printf("Unhandled exception: %s\n", ex.what());
}
catch (...)
{
std::printf("Unhandled exception of unknown type.\n");
}
}
std::abort();
});
void SetCurrentThreadDebugName(const char* threadName)
{
#if defined(_MSC_VER) // this is VS-specific way to set thread names for the debugger
@ -113,10 +134,11 @@ enum x64_op_t : u32
X64OP_STOS,
X64OP_XCHG,
X64OP_CMPXCHG,
X64OP_LOAD_AND_STORE, // lock and [mem],reg
X64OP_LOAD_OR_STORE, // TODO: lock or [mem], reg
X64OP_INC, // TODO: lock inc [mem]
X64OP_DEC, // TODO: lock dec [mem]
X64OP_LOAD_AND_STORE, // lock and [mem], reg
X64OP_LOAD_OR_STORE, // lock or [mem], reg (TODO)
X64OP_LOAD_XOR_STORE, // lock xor [mem], reg (TODO)
X64OP_INC, // lock inc [mem] (TODO)
X64OP_DEC, // lock dec [mem] (TODO)
};
void decode_x64_reg_op(const u8* code, x64_op_t& out_op, x64_reg_t& out_reg, size_t& out_size, size_t& out_length)
@ -1132,10 +1154,7 @@ const PVOID exception_handler = (atexit([]{ RemoveVectoredExceptionHandler(excep
const u64 addr64 = (u64)pExp->ExceptionRecord->ExceptionInformation[1] - (u64)vm::base(0);
const bool is_writing = pExp->ExceptionRecord->ExceptionInformation[0] != 0;
if (pExp->ExceptionRecord->ExceptionCode == EXCEPTION_ACCESS_VIOLATION &&
(u32)addr64 == addr64 &&
get_current_thread_ctrl() &&
handle_access_violation((u32)addr64, is_writing, pExp->ContextRecord))
if (pExp->ExceptionRecord->ExceptionCode == EXCEPTION_ACCESS_VIOLATION && (u32)addr64 == addr64 && thread_ctrl::get_current() && handle_access_violation((u32)addr64, is_writing, pExp->ContextRecord))
{
return EXCEPTION_CONTINUE_EXECUTION;
}
@ -1164,7 +1183,7 @@ void signal_handler(int sig, siginfo_t* info, void* uct)
const bool is_writing = ((ucontext_t*)uct)->uc_mcontext.gregs[REG_ERR] & 0x2;
#endif
if ((u32)addr64 == addr64 && get_current_thread_ctrl())
if ((u32)addr64 == addr64 && thread_ctrl::get_current())
{
if (handle_access_violation((u32)addr64, is_writing, (ucontext_t*)uct))
{
@ -1191,94 +1210,112 @@ const int sigaction_result = []() -> int
#endif
thread_local thread_ctrl_t* g_tls_this_thread = nullptr;
thread_local thread_ctrl* thread_ctrl::g_tls_this_thread = nullptr;
const thread_ctrl_t* get_current_thread_ctrl()
{
return g_tls_this_thread;
}
// TODO
std::atomic<u32> g_thread_count{ 0 };
std::string thread_ctrl_t::get_name() const
void thread_ctrl::initialize()
{
return m_name();
}
SetCurrentThreadDebugName(g_tls_this_thread->m_name().c_str());
named_thread_t::named_thread_t(std::function<std::string()> name, std::function<void()> func)
{
start(std::move(name), std::move(func));
}
#if defined(_MSC_VER)
_set_se_translator(_se_translator); // not essential, disable if necessary
#endif
named_thread_t::~named_thread_t()
{
if (m_thread)
#ifdef _WIN32
if (!exception_handler || !exception_filter)
#else
if (sigaction_result == -1)
#endif
{
std::printf("Fatal: thread neither joined nor detached\n");
std::printf("Exceptions handlers are not set correctly.\n");
std::terminate();
}
// TODO
g_thread_count++;
}
void thread_ctrl::finalize() noexcept
{
// TODO
vm::reservation_free();
// TODO
g_thread_count--;
// Call atexit functions
for (const auto& func : decltype(m_atexit)(std::move(g_tls_this_thread->m_atexit)))
{
func();
}
}
thread_ctrl::~thread_ctrl()
{
m_thread.detach();
if (m_future.valid())
{
try
{
m_future.get();
}
catch (const std::exception& ex)
{
LOG_ERROR(GENERAL, "Abandoned exception: %s", ex.what());
}
catch (EmulationStopped)
{
}
}
}
std::string thread_ctrl::get_name() const
{
CHECK_ASSERTION(m_name);
return m_name();
}
std::string named_thread_t::get_name() const
{
if (!m_thread)
{
throw EXCEPTION("Invalid thread");
}
if (!m_thread->m_name)
{
throw EXCEPTION("Invalid name getter");
}
return m_thread->m_name();
return fmt::format("('%s') Unnamed Thread", typeid(*this).name());
}
std::atomic<u32> g_thread_count{ 0 };
void named_thread_t::start(std::function<std::string()> name, std::function<void()> func)
void named_thread_t::start()
{
if (m_thread)
CHECK_ASSERTION(m_thread == nullptr);
// Get shared_ptr instance (will throw if called from the constructor or the object has been created incorrectly)
auto ptr = shared_from_this();
// Make name getter
auto name = [wptr = std::weak_ptr<named_thread_t>(ptr), type = &typeid(*this)]()
{
throw EXCEPTION("Thread already exists");
}
// Return actual name if available
if (const auto ptr = wptr.lock())
{
return ptr->get_name();
}
else
{
return fmt::format("('%s') Deleted Thread", type->name());
}
};
// create new thread control variable
m_thread = std::make_shared<thread_ctrl_t>(std::move(name));
// start thread
m_thread->m_thread = std::thread([](std::shared_ptr<thread_ctrl_t> ctrl, std::function<void()> func)
// Run thread
m_thread = thread_ctrl::spawn(std::move(name), [thread = std::move(ptr)]()
{
g_tls_this_thread = ctrl.get();
SetCurrentThreadDebugName(ctrl->get_name().c_str());
#if defined(_MSC_VER)
_set_se_translator(_se_translator);
#endif
#ifdef _WIN32
if (!exception_handler || !exception_filter)
{
LOG_ERROR(GENERAL, "exception_handler not set");
return;
}
#else
if (sigaction_result == -1)
{
printf("sigaction() failed");
exit(EXIT_FAILURE);
}
#endif
try
{
g_thread_count++;
if (rpcs3::config.misc.log.hle_logging.value())
{
LOG_NOTICE(GENERAL, "Thread started");
}
func();
thread->on_task();
if (rpcs3::config.misc.log.hle_logging.value())
{
@ -1295,75 +1332,24 @@ void named_thread_t::start(std::function<std::string()> name, std::function<void
LOG_NOTICE(GENERAL, "Thread aborted");
}
for (auto& func : ctrl->m_atexit)
{
func();
func = nullptr;
}
vm::reservation_free();
g_thread_count--;
}, m_thread, std::move(func));
}
void named_thread_t::detach()
{
if (!m_thread)
{
throw EXCEPTION("Invalid thread");
}
// +clear m_thread
const auto ctrl = std::move(m_thread);
// notify if detached by another thread
if (g_tls_this_thread != m_thread.get())
{
// lock for reliable notification
std::lock_guard<std::mutex> lock(mutex);
cv.notify_one();
}
ctrl->m_thread.detach();
thread->on_exit();
});
}
void named_thread_t::join()
{
if (!m_thread)
CHECK_ASSERTION(m_thread != nullptr);
try
{
throw EXCEPTION("Invalid thread");
m_thread->join();
m_thread.reset();
}
if (g_tls_this_thread == m_thread.get())
catch (...)
{
throw EXCEPTION("Deadlock");
m_thread.reset();
throw;
}
// +clear m_thread
const auto ctrl = std::move(m_thread);
{
// lock for reliable notification
std::lock_guard<std::mutex> lock(mutex);
cv.notify_one();
}
ctrl->m_thread.join();
}
bool named_thread_t::is_current() const
{
if (!m_thread)
{
throw EXCEPTION("Invalid thread");
}
return g_tls_this_thread == m_thread.get();
}
const std::function<bool()> SQUEUE_ALWAYS_EXIT = [](){ return true; };

View file

@ -1,107 +1,171 @@
#pragma once
const class thread_ctrl_t* get_current_thread_ctrl();
// Named thread control class
class thread_ctrl_t final
// Thread control class
class thread_ctrl final
{
friend class named_thread_t;
template<typename T> friend void current_thread_register_atexit(T);
// Thread handler
std::thread m_thread;
static thread_local thread_ctrl* g_tls_this_thread;
// Name getter
const std::function<std::string()> m_name;
std::function<std::string()> m_name;
// Functions executed at thread exit (temporarily)
std::vector<std::function<void()>> m_atexit;
// Thread handle (be careful)
std::thread m_thread;
// Thread result
std::future<void> m_future;
// Functions scheduled at thread exit
std::deque<std::function<void()>> m_atexit;
// Called at the thread start
static void initialize();
// Called at the thread end
static void finalize() noexcept;
public:
thread_ctrl_t(std::function<std::string()> name)
: m_name(std::move(name))
template<typename T>
thread_ctrl(T&& name)
: m_name(std::forward<T>(name))
{
}
thread_ctrl_t(const thread_ctrl_t&) = delete;
// Disable copy/move constructors and operators
thread_ctrl(const thread_ctrl&) = delete;
~thread_ctrl();
// Get thread name
std::string get_name() const;
// Get future result (may throw)
void join()
{
return m_future.get();
}
// Get current thread (may be nullptr)
static const thread_ctrl* get_current()
{
return g_tls_this_thread;
}
// Register function at thread exit (for the current thread)
template<typename T>
static inline void at_exit(T&& func)
{
CHECK_ASSERTION(g_tls_this_thread);
g_tls_this_thread->m_atexit.emplace_front(std::forward<T>(func));
}
// Named thread factory
template<typename N, typename F>
static inline std::shared_ptr<thread_ctrl> spawn(N&& name, F&& func)
{
auto ctrl = std::make_shared<thread_ctrl>(std::forward<N>(name));
std::promise<void> promise;
ctrl->m_future = promise.get_future();
ctrl->m_thread = std::thread([ctrl, task = std::forward<F>(func)](std::promise<void> promise)
{
g_tls_this_thread = ctrl.get();
try
{
initialize();
task();
finalize();
promise.set_value();
}
catch (...)
{
finalize();
promise.set_exception(std::current_exception());
}
}, std::move(promise));
return ctrl;
}
};
// Register function at thread exit (temporarily)
template<typename T> void current_thread_register_atexit(T func)
{
extern thread_local thread_ctrl_t* g_tls_this_thread;
g_tls_this_thread->m_atexit.emplace_back(func);
}
class named_thread_t
class named_thread_t : public std::enable_shared_from_this<named_thread_t>
{
// Pointer to managed resource (shared with actual thread)
std::shared_ptr<thread_ctrl_t> m_thread;
std::shared_ptr<thread_ctrl> m_thread;
public:
// Thread mutex for external use
std::mutex mutex;
// Thread condition variable for external use
// Thread condition variable for external use (this thread waits on it, other threads may notify)
std::condition_variable cv;
// Thread mutex for external use (can be used with `cv`)
std::mutex mutex;
protected:
// Thread task (called in the thread)
virtual void on_task() = 0;
// Thread finalization (called after on_task)
virtual void on_exit() {}
// ID initialization (called through id_aux_initialize)
virtual void on_id_aux_initialize() { start(); }
// ID finalization (called through id_aux_finalize)
virtual void on_id_aux_finalize() { join(); }
public:
// Initialize in empty state
named_thread_t() = default;
// Create named thread
named_thread_t(std::function<std::string()> name, std::function<void()> func);
virtual ~named_thread_t() = default;
// Deleted copy/move constructors + copy/move operators
named_thread_t(const named_thread_t&) = delete;
// Destructor, calls std::terminate if the thread is neither joined nor detached
virtual ~named_thread_t();
public:
// Get thread name
std::string get_name() const;
virtual std::string get_name() const;
// Create named thread (current state must be empty)
void start(std::function<std::string()> name, std::function<void()> func);
// Start thread (cannot be called from the constructor: should throw bad_weak_ptr in such case)
void start();
// Detach thread -> empty state
void detach();
// Join thread -> empty state
// Join thread (get future result)
void join();
// Check whether the thread is not in "empty state"
bool joinable() const { return m_thread.operator bool(); }
bool is_started() const { return m_thread.operator bool(); }
// Check whether it is the currently running thread
bool is_current() const;
// Compare with the current thread
bool is_current() const { CHECK_ASSERTION(m_thread); return thread_ctrl::get_current() == m_thread.get(); }
// Get internal thread pointer
const thread_ctrl_t* get_thread_ctrl() const { return m_thread.get(); }
// Get thread_ctrl
const thread_ctrl* get_thread_ctrl() const { return m_thread.get(); }
friend void id_aux_initialize(named_thread_t* ptr) { ptr->on_id_aux_initialize(); }
friend void id_aux_finalize(named_thread_t* ptr) { ptr->on_id_aux_finalize(); }
};
// Wrapper for named_thread_t, joins automatically in the destructor
class autojoin_thread_t final
// Wrapper for named thread, joins automatically in the destructor, can only be used in function scope
class scope_thread_t final
{
named_thread_t m_thread;
std::shared_ptr<thread_ctrl> m_thread;
public:
autojoin_thread_t(std::function<std::string()> name, std::function<void()> func)
: m_thread(std::move(name), std::move(func))
template<typename N, typename F>
scope_thread_t(N&& name, F&& func)
: m_thread(thread_ctrl::spawn(std::forward<N>(name), std::forward<F>(func)))
{
}
autojoin_thread_t(const autojoin_thread_t&) = delete;
// Deleted copy/move constructors + copy/move operators
scope_thread_t(const scope_thread_t&) = delete;
~autojoin_thread_t() noexcept(false) // Allow exceptions
// Destructor with exceptions allowed
~scope_thread_t() noexcept(false)
{
m_thread.join();
m_thread->join();
}
};