shared_mutex cleanup

Rewrite to use unsigned arithmetic, shrink to u32
Ensure zero default unlocked state (will need it later)
Inline all public methods, rewrite lock_upgrade()
Remove try_lock_degrade(), lock_degrade() methods
Implement lock_downgrade() (now trivial)
Remove is_reading(), added is_free()
Added reader_lock::try_upgrade()
This commit is contained in:
Nekotekina 2018-09-21 20:17:16 +03:00
parent bdf85ed900
commit 9e5b633779
2 changed files with 134 additions and 225 deletions

View file

@ -2,255 +2,148 @@
#include "sync.h" #include "sync.h"
#include <climits> #include <climits>
#include <vector>
#include <algorithm>
void shared_mutex::imp_lock_shared(s64 _old) void shared_mutex::imp_lock_shared(u32 val)
{ {
verify("shared_mutex overflow" HERE), _old <= c_max; verify("shared_mutex underflow" HERE), val < c_err;
for (int i = 0; i < 10; i++) for (int i = 0; i < 10; i++)
{ {
busy_wait(); busy_wait();
const s64 value = m_value.load(); if (try_lock_shared())
if (value >= c_min && m_value.compare_and_swap_test(value, value - c_min))
{ {
return; return;
} }
} }
#ifdef _WIN32 // Acquire writer lock and downgrade
// Acquire writer lock const u32 old = m_value.fetch_add(c_one);
imp_wait(m_value.load());
// Convert to reader lock if (old == 0)
s64 value = m_value.fetch_add(c_one - c_min);
// Proceed exclusively
return;
if (value != 0)
{
imp_unlock(value);
}
// Wait as a reader if necessary
if (value + c_one - c_min < 0)
{
NtWaitForKeyedEvent(nullptr, (int*)&m_value + 1, false, nullptr);
}
#else
// Acquire writer lock
imp_wait(0);
// Convert to reader lock
m_value += c_one - c_min;
// Disabled code
while (false)
{
const s64 value0 = m_value.fetch_op([](s64& value)
{
if (value >= c_min)
{
value -= c_min;
}
});
if (value0 >= c_min)
{ {
lock_downgrade();
return; return;
} }
// Acquire writer lock verify("shared_mutex overflow" HERE), (old % c_sig) + c_one < c_sig;
imp_wait(value0); imp_wait();
lock_downgrade();
// Convert to reader lock
s64 value1 = m_value.fetch_add(c_one - c_min);
if (value1 != 0)
{
imp_unlock(value1);
} }
value1 += c_one - c_min; void shared_mutex::imp_unlock_shared(u32 old)
if (value1 > 0)
{ {
return; verify("shared_mutex underflow" HERE), old - 1 < c_err;
}
// Wait as a reader if necessary
while (futex((int*)&m_value.raw() + IS_LE_MACHINE, FUTEX_WAIT_BITSET_PRIVATE, int(value1 >> 32), nullptr, nullptr, INT_MIN))
{
value1 = m_value.load();
if (value1 >= 0)
{
return;
}
}
// If blocked by writers, release the reader lock and try again
const s64 value2 = m_value.fetch_op([](s64& value)
{
if (value < 0)
{
value += c_min;
}
});
if (value2 >= 0)
{
return;
}
imp_unlock_shared(value2);
}
#endif
}
void shared_mutex::imp_unlock_shared(s64 _old)
{
verify("shared_mutex overflow" HERE), _old + c_min <= c_max;
// Check reader count, notify the writer if necessary // Check reader count, notify the writer if necessary
if ((_old + c_min) % c_one == 0) if ((old - 1) % c_one == 0)
{ {
#ifdef _WIN32 #ifdef _WIN32
NtReleaseKeyedEvent(nullptr, &m_value, false, nullptr); NtReleaseKeyedEvent(nullptr, &m_value, false, nullptr);
#else #else
m_value -= c_sig; m_value += c_sig;
futex((int*)&m_value.raw() + IS_LE_MACHINE, FUTEX_WAKE_BITSET_PRIVATE, 1, nullptr, nullptr, u32(c_sig >> 32)); futex(reinterpret_cast<int*>(&m_value.raw()), FUTEX_WAKE_BITSET_PRIVATE, 1, nullptr, nullptr, c_sig);
#endif #endif
} }
} }
void shared_mutex::imp_wait(s64) void shared_mutex::imp_wait()
{ {
#ifdef _WIN32 #ifdef _WIN32
if (m_value.sub_fetch(c_one))
{
NtWaitForKeyedEvent(nullptr, &m_value, false, nullptr); NtWaitForKeyedEvent(nullptr, &m_value, false, nullptr);
}
#else #else
if (!m_value.sub_fetch(c_one))
{
// Return immediately if locked
return;
}
while (true) while (true)
{ {
// Load new value, try to acquire c_sig // Load new value, try to acquire c_sig
const s64 value = m_value.fetch_op([](s64& value) auto [value, ok] = m_value.fetch_op([](u32& value)
{ {
if (value <= c_one - c_sig) if (value >= c_sig)
{ {
value += c_sig; value -= c_sig;
return true;
} }
return false;
}); });
if (value <= c_one - c_sig) if (ok)
{ {
return; return;
} }
futex((int*)&m_value.raw() + IS_LE_MACHINE, FUTEX_WAIT_BITSET_PRIVATE, int(value >> 32), nullptr, nullptr, u32(c_sig >> 32)); futex(reinterpret_cast<int*>(&m_value.raw()), FUTEX_WAIT_BITSET_PRIVATE, value, nullptr, nullptr, c_sig);
} }
#endif #endif
} }
void shared_mutex::imp_lock(s64 _old) void shared_mutex::imp_lock(u32 val)
{ {
verify("shared_mutex overflow" HERE), _old <= c_max; verify("shared_mutex underflow" HERE), val < c_err;
for (int i = 0; i < 10; i++) for (int i = 0; i < 10; i++)
{ {
busy_wait(); busy_wait();
const s64 value = m_value.load(); if (try_lock())
if (value == c_one && m_value.compare_and_swap_test(c_one, 0))
{ {
return; return;
} }
} }
imp_wait(m_value.load()); const u32 old = m_value.fetch_add(c_one);
if (old == 0)
{
return;
} }
void shared_mutex::imp_unlock(s64 _old) verify("shared_mutex overflow" HERE), (old % c_sig) + c_one < c_sig;
imp_wait();
}
void shared_mutex::imp_unlock(u32 old)
{ {
verify("shared_mutex overflow" HERE), _old + c_one <= c_max; verify("shared_mutex underflow" HERE), old - c_one < c_err;
// 1) Notify the next writer if necessary // 1) Notify the next writer if necessary
// 2) Notify all readers otherwise if necessary // 2) Notify all readers otherwise if necessary (currently indistinguishable from writers)
#ifdef _WIN32 #ifdef _WIN32
if (_old + c_one <= 0) if (old - c_one)
{ {
NtReleaseKeyedEvent(nullptr, &m_value, false, nullptr); NtReleaseKeyedEvent(nullptr, &m_value, false, nullptr);
} }
else if (s64 count = -_old / c_min * 0)
{
// Disabled code
while (count--)
{
NtReleaseKeyedEvent(nullptr, (int*)&m_value + 1, false, nullptr);
}
}
#else #else
if (_old + c_one <= 0) if (old - c_one)
{ {
m_value -= c_sig; m_value += c_sig;
futex((int*)&m_value.raw() + IS_LE_MACHINE, FUTEX_WAKE_BITSET_PRIVATE, 1, nullptr, nullptr, u32(c_sig >> 32)); futex(reinterpret_cast<int*>(&m_value.raw()), FUTEX_WAKE_BITSET_PRIVATE, 1, nullptr, nullptr, c_sig);
}
else if (false)
{
// Disabled code
futex((int*)&m_value.raw() + IS_LE_MACHINE, FUTEX_WAKE_BITSET_PRIVATE, INT_MAX, nullptr, nullptr, INT_MIN);
} }
#endif #endif
} }
void shared_mutex::imp_lock_upgrade() void shared_mutex::imp_lock_upgrade()
{ {
// TODO for (int i = 0; i < 10; i++)
unlock_shared(); {
lock(); busy_wait();
if (try_lock_upgrade())
{
return;
}
} }
void shared_mutex::imp_lock_degrade() // Convert to writer lock
const u32 old = m_value.fetch_add(c_one - 1);
verify("shared_mutex overflow" HERE), (old % c_sig) + c_one - 1 < c_sig;
if (old % c_one == 1)
{ {
// TODO return;
unlock();
lock_shared();
} }
bool shared_mutex::try_lock_shared() imp_wait();
{
// Conditional decrement
return m_value.fetch_dec_sat(c_min - 1, c_min) >= c_min;
}
bool shared_mutex::try_lock()
{
// Conditional decrement (TODO: obtain c_sig)
return m_value.compare_and_swap_test(c_one, 0);
}
bool shared_mutex::try_lock_upgrade()
{
// TODO
return m_value.compare_and_swap_test(c_one - c_min, 0);
}
bool shared_mutex::try_lock_degrade()
{
// TODO
return m_value.compare_and_swap_test(0, c_one - c_min);
} }

View file

@ -4,39 +4,41 @@
#include "types.h" #include "types.h"
#include "Atomic.h" #include "Atomic.h"
// Shared mutex. // Shared mutex with small size (u32).
class shared_mutex final class shared_mutex final
{ {
enum : s64 enum : u32
{ {
c_one = 1ull << 31, // Fixed-point 1.0 value (one writer) c_one = 1u << 7, // Fixed-point 1.0 value (one writer, 1.0/(max_readers+1) is 1)
c_min = 0x00000001, // Fixed-point 1.0/max_readers value c_sig = 1u << 30,
c_sig = 1ull << 62, c_err = 1u << 31,
c_max = c_one
}; };
atomic_t<s64> m_value{c_one}; // Semaphore-alike counter atomic_t<u32> m_value{};
void imp_lock_shared(s64 _old);
void imp_unlock_shared(s64 _old);
void imp_wait(s64 _old);
void imp_lock(s64 _old);
void imp_unlock(s64 _old);
void imp_lock_shared(u32 val);
void imp_unlock_shared(u32 old);
void imp_wait();
void imp_lock(u32 val);
void imp_unlock(u32 old);
void imp_lock_upgrade(); void imp_lock_upgrade();
void imp_lock_degrade();
public: public:
constexpr shared_mutex() = default; constexpr shared_mutex() = default;
bool try_lock_shared(); bool try_lock_shared()
{
const u32 value = m_value.load();
// Conditional increment
return value < c_one - 1 && m_value.compare_and_swap_test(value, value + 1);
}
void lock_shared() void lock_shared()
{ {
const s64 value = m_value.load(); const u32 value = m_value.load();
// Fast path: decrement if positive if (UNLIKELY(value >= c_one - 1 || !m_value.compare_and_swap_test(value, value + 1)))
if (UNLIKELY(value < c_min || value > c_one || !m_value.compare_and_swap_test(value, value - c_min)))
{ {
imp_lock_shared(value); imp_lock_shared(value);
} }
@ -44,23 +46,25 @@ public:
void unlock_shared() void unlock_shared()
{ {
// Unconditional increment // Unconditional decrement (can result in broken state)
const s64 value = m_value.fetch_add(c_min); const u32 value = m_value.fetch_sub(1);
if (value < 0 || value > c_one - c_min) if (UNLIKELY(value >= c_one))
{ {
imp_unlock_shared(value); imp_unlock_shared(value);
} }
} }
bool try_lock(); bool try_lock()
{
return m_value == 0 && m_value.compare_and_swap_test(0, c_one);
}
void lock() void lock()
{ {
// Try to lock const u32 value = m_value.compare_and_swap(0, c_one);
const s64 value = m_value.compare_and_swap(c_one, 0);
if (value != c_one) if (UNLIKELY(value))
{ {
imp_lock(value); imp_lock(value);
} }
@ -68,43 +72,47 @@ public:
void unlock() void unlock()
{ {
// Unconditional increment // Unconditional decrement (can result in broken state)
const s64 value = m_value.fetch_add(c_one); const u32 value = m_value.fetch_sub(c_one);
if (value != 0) if (UNLIKELY(value != c_one))
{ {
imp_unlock(value); imp_unlock(value);
} }
} }
bool try_lock_upgrade(); bool try_lock_upgrade()
{
const u32 value = m_value.load();
// Conditional increment, try to convert a single reader into a writer, ignoring other writers
return (value + c_one - 1) % c_one == 0 && m_value.compare_and_swap_test(value, value + c_one - 1);
}
void lock_upgrade() void lock_upgrade()
{ {
if (!m_value.compare_and_swap_test(c_one - c_min, 0)) if (UNLIKELY(!try_lock_upgrade()))
{ {
imp_lock_upgrade(); imp_lock_upgrade();
} }
} }
bool try_lock_degrade(); void lock_downgrade()
void lock_degrade()
{ {
if (!m_value.compare_and_swap_test(0, c_one - c_min)) // Convert to reader lock (can result in broken state)
{ m_value -= c_one - 1;
imp_lock_degrade();
}
} }
bool is_reading() const // Check whether can immediately obtain an exclusive (writer) lock
bool is_free() const
{ {
return (m_value.load() % c_one) != 0; return m_value.load() == 0;
} }
// Check whether can immediately obtain a shared (reader) lock
bool is_lockable() const bool is_lockable() const
{ {
return m_value.load() >= c_min; return m_value.load() < c_one - 1;
} }
}; };
@ -117,13 +125,15 @@ class reader_lock final
public: public:
reader_lock(const reader_lock&) = delete; reader_lock(const reader_lock&) = delete;
reader_lock& operator=(const reader_lock&) = delete;
explicit reader_lock(shared_mutex& mutex) explicit reader_lock(shared_mutex& mutex)
: m_mutex(mutex) : m_mutex(mutex)
{ {
m_mutex.lock_shared(); m_mutex.lock_shared();
} }
// One-way lock upgrade // One-way lock upgrade; note that the observed state could have been changed
void upgrade() void upgrade()
{ {
if (!m_upgraded) if (!m_upgraded)
@ -133,6 +143,12 @@ public:
} }
} }
// Try to upgrade; if it succeeds, the observed state has NOT been changed
bool try_upgrade()
{
return m_upgraded || (m_upgraded = m_mutex.try_lock_upgrade());
}
~reader_lock() ~reader_lock()
{ {
m_upgraded ? m_mutex.unlock() : m_mutex.unlock_shared(); m_upgraded ? m_mutex.unlock() : m_mutex.unlock_shared();