Rewrite condition variables

Implement helper functions balanced_wait_until and balanced_awaken
They include new path for Windows 8.1+ (WaitOnAddress)

shared_mutex, cond_variable, cond_one, cond_x16 modified to use it
Added helper function utils::popcnt16
Replace most semaphore<> with shared_mutex
This commit is contained in:
Nekotekina 2018-11-26 18:55:22 +03:00
parent f442a8a84c
commit 96cabeadff
23 changed files with 269 additions and 338 deletions

View file

@ -9,110 +9,47 @@
bool cond_variable::imp_wait(u32 _old, u64 _timeout) noexcept
{
verify(HERE), _old != -1; // Very unlikely: it requires 2^32 distinct threads to wait simultaneously
const bool is_inf = _timeout > max_timeout;
verify("cond_variable overflow" HERE), (_old & 0xffff) == 0; // Very unlikely: it requires 65535 distinct threads to wait simultaneously
return balanced_wait_until(m_value, _timeout, [&](u32& value, auto... ret) -> int
{
if (value >> 16)
{
// Success
value -= 0x10001;
return +1;
}
if constexpr (sizeof...(ret))
{
// Retire
value -= 1;
return -1;
}
return 0;
});
#ifdef _WIN32
LARGE_INTEGER timeout;
timeout.QuadPart = _timeout * -10;
if (HRESULT rc = _timeout ? NtWaitForKeyedEvent(nullptr, &m_value, false, is_inf ? nullptr : &timeout) : WAIT_TIMEOUT)
if (_old >= 0x10000 && !OptWaitOnAddress && m_value)
{
verify(HERE), rc == WAIT_TIMEOUT;
// Retire
while (!m_value.try_dec())
{
timeout.QuadPart = 0;
if (HRESULT rc2 = NtWaitForKeyedEvent(nullptr, &m_value, false, &timeout))
{
verify(HERE), rc2 == WAIT_TIMEOUT;
SwitchToThread();
continue;
}
return true;
}
return false;
}
return true;
#else
timespec timeout;
timeout.tv_sec = _timeout / 1000000;
timeout.tv_nsec = (_timeout % 1000000) * 1000;
for (u32 value = _old + 1;; value = m_value)
{
const int err = futex(&m_value, FUTEX_WAIT_PRIVATE, value, is_inf ? nullptr : &timeout) == 0
? 0
: errno;
// Normal or timeout wakeup
if (!err || (!is_inf && err == ETIMEDOUT))
{
// Cleanup (remove waiter)
verify(HERE), m_value--;
return !err;
}
// Not a wakeup
verify(HERE), err == EAGAIN;
// Workaround possibly stolen signal
imp_wake(1);
}
#endif
}
void cond_variable::imp_wake(u32 _count) noexcept
{
#ifdef _WIN32
// Try to subtract required amount of waiters
const u32 count = m_value.atomic_op([=](u32& value)
balanced_awaken(m_value, m_value.atomic_op([&](u32& value) -> u32
{
if (value > _count)
{
value -= _count;
return _count;
}
// Subtract already signaled number from total amount of waiters
const u32 can_sig = (value & 0xffff) - (value >> 16);
const u32 num_sig = std::min<u32>(can_sig, _count);
return std::exchange(value, 0);
});
for (u32 i = count; i > 0; i--)
{
NtReleaseKeyedEvent(nullptr, &m_value, false, nullptr);
}
#else
for (u32 i = _count; i > 0; std::this_thread::yield())
{
const u32 value = m_value;
// Constrain remaining amount with imaginary waiter count
if (i > value)
{
i = value;
}
if (!value || i == 0)
{
// Nothing to do
return;
}
if (const int res = futex(&m_value, FUTEX_WAKE_PRIVATE, i > INT_MAX ? INT_MAX : i))
{
verify(HERE), res >= 0 && (u32)res <= i;
i -= res;
}
if (!m_value || i == 0)
{
// Escape
return;
}
}
#endif
value += num_sig << 16;
return num_sig;
}));
}
bool notifier::imp_try_lock(u32 count)
@ -209,62 +146,29 @@ bool notifier::wait(u64 usec_timeout)
return res;
}
bool cond_one::imp_wait(u32 _old, u64 _timeout) noexcept
bool cond_one::imp_wait(u64 _timeout) noexcept
{
verify(HERE), _old == c_lock;
// State transition: c_sig -> c_lock \ c_lock -> c_wait
const u32 _old = m_value.fetch_sub(1);
if (LIKELY(_old == c_sig))
return true;
const bool is_inf = _timeout > cond_variable::max_timeout;
#ifdef _WIN32
LARGE_INTEGER timeout;
timeout.QuadPart = _timeout * -10;
if (HRESULT rc = _timeout ? NtWaitForKeyedEvent(nullptr, &m_value, false, is_inf ? nullptr : &timeout) : WAIT_TIMEOUT)
return balanced_wait_until(m_value, _timeout, [&](u32& value, auto... ret) -> int
{
verify(HERE), rc == WAIT_TIMEOUT;
// Retire
const bool signaled = m_value.exchange(c_lock) == c_sig;
while (signaled)
if (value == c_sig)
{
timeout.QuadPart = 0;
if (HRESULT rc2 = NtWaitForKeyedEvent(nullptr, &m_value, false, &timeout))
{
verify(HERE), rc2 == WAIT_TIMEOUT;
SwitchToThread();
continue;
}
return true;
value = c_lock;
return +1;
}
return false;
}
#else
timespec timeout;
timeout.tv_sec = _timeout / 1000000;
timeout.tv_nsec = (_timeout % 1000000) * 1000;
for (u32 value = _old - 1; value != c_sig; value = m_value)
{
const int err = futex(&m_value, FUTEX_WAIT_PRIVATE, value, is_inf ? nullptr : &timeout) == 0
? 0
: errno;
// Normal or timeout wakeup
if (!err || (!is_inf && err == ETIMEDOUT))
if constexpr (sizeof...(ret))
{
return m_value.exchange(c_lock) == c_sig;
value = c_lock;
return -1;
}
// Not a wakeup
verify(HERE), err == EAGAIN;
}
#endif
verify(HERE), m_value.exchange(c_lock) == c_sig;
return true;
return 0;
});
}
void cond_one::imp_notify() noexcept
@ -287,79 +191,54 @@ void cond_one::imp_notify() noexcept
return;
}
#ifdef _WIN32
NtReleaseKeyedEvent(nullptr, &m_value, false, nullptr);
#else
futex(&m_value, FUTEX_WAKE_PRIVATE, 1);
#endif
balanced_awaken(m_value, 1);
}
bool cond_x16::imp_wait(u32 _new, u32 slot, u64 _timeout) noexcept
bool cond_x16::imp_wait(u32 slot, u64 _timeout) noexcept
{
const u32 wait_bit = c_wait << slot;
const u32 lock_bit = c_lock << slot;
const bool is_inf = _timeout > cond_variable::max_timeout;
#ifdef _WIN32
LARGE_INTEGER timeout;
timeout.QuadPart = _timeout * -10;
if (HRESULT rc = _timeout ? NtWaitForKeyedEvent(nullptr, &m_cvx16, false, is_inf ? nullptr : &timeout) : WAIT_TIMEOUT)
// Change state from c_lock to c_wait
const u32 old_ = m_cvx16.fetch_op([=](u32& cvx16)
{
verify(HERE), rc == WAIT_TIMEOUT;
// Retire
const bool signaled = this->retire(slot);
while (signaled)
if (cvx16 & wait_bit)
{
timeout.QuadPart = 0;
// c_sig -> c_lock
cvx16 &= ~wait_bit;
}
else
{
cvx16 |= wait_bit;
cvx16 &= ~lock_bit;
}
});
if (HRESULT rc2 = NtWaitForKeyedEvent(nullptr, &m_cvx16, false, &timeout))
{
verify(HERE), rc2 == WAIT_TIMEOUT;
SwitchToThread();
continue;
}
if (old_ & wait_bit)
{
// Already signaled, return without waiting
return true;
}
return true;
return balanced_wait_until(m_cvx16, _timeout, [&](u32& cvx16, auto... ret) -> int
{
if (cvx16 & lock_bit)
{
// c_sig -> c_lock
cvx16 &= ~wait_bit;
return +1;
}
return false;
}
if (!this->retire(slot))
{
// Stolen notification: restore balance
NtReleaseKeyedEvent(nullptr, &m_cvx16, false, nullptr);
}
#else
timespec timeout;
timeout.tv_sec = _timeout / 1000000;
timeout.tv_nsec = (_timeout % 1000000) * 1000;
for (u32 value = _new; ((value >> slot) & c_sig) != c_sig; value = m_cvx16)
{
const int err = futex(&m_cvx16, FUTEX_WAIT_PRIVATE, value, is_inf ? nullptr : &timeout) == 0
? 0
: errno;
// Normal or timeout wakeup
if (!err || (!is_inf && err == ETIMEDOUT))
if constexpr (sizeof...(ret))
{
return this->retire(slot);
// Retire
cvx16 |= lock_bit;
cvx16 &= ~wait_bit;
return -1;
}
// Not a wakeup
verify(HERE), err == EAGAIN;
}
// Convert c_sig to c_lock
m_cvx16 &= ~wait_bit;
#endif
return true;
return 0;
});
}
void cond_x16::imp_notify() noexcept
@ -386,13 +265,5 @@ void cond_x16::imp_notify() noexcept
return;
}
#ifdef _WIN32
for (u32 i = 0; i < 16; i++)
{
if ((wait_mask >> i) & 1)
NtReleaseKeyedEvent(nullptr, &m_cvx16, false, nullptr);
}
#else
futex(&m_cvx16, FUTEX_WAKE_PRIVATE, INT_MAX);
#endif
balanced_awaken(m_cvx16, utils::popcnt16(wait_mask));
}