Sync primitives reworked

(rwlock rewritten)
This commit is contained in:
Nekotekina 2015-01-02 02:41:29 +03:00
parent ac75b62f4d
commit f3cd908d5c
22 changed files with 641 additions and 503 deletions

View file

@ -314,32 +314,6 @@ public:
return pop(data, &no_wait);
}
void clear()
{
while (m_sync.atomic_op_sync(SQSVR_OK, [](squeue_sync_var_t& sync) -> u32
{
assert(sync.count <= sq_size);
assert(sync.position < sq_size);
if (sync.pop_lock || sync.push_lock)
{
return SQSVR_LOCKED;
}
sync.pop_lock = 1;
sync.push_lock = 1;
return SQSVR_OK;
}))
{
std::unique_lock<std::mutex> rcv_lock(m_rcv_mutex);
m_rcv.wait_for(rcv_lock, std::chrono::milliseconds(1));
}
m_sync.exchange({});
m_wcv.notify_one();
m_rcv.notify_one();
}
bool peek(T& data, u32 start_pos = 0, const volatile bool* do_exit = nullptr)
{
assert(start_pos < sq_size);
@ -393,4 +367,93 @@ public:
return peek(data, start_pos, &no_wait);
}
class squeue_data_t
{
T* const m_data;
const u32 m_pos;
const u32 m_count;
squeue_data_t(T* data, u32 pos, u32 count)
: m_data(data)
, m_pos(pos)
, m_count(count)
{
}
public:
T& operator [] (u32 index)
{
assert(index < m_count);
index += m_pos;
index = index < sq_size ? index : index - sq_size;
return m_data[index];
}
};
void process(void(*proc)(squeue_data_t data))
{
u32 pos, count;
while (m_sync.atomic_op_sync(SQSVR_OK, [&pos, &count](squeue_sync_var_t& sync) -> u32
{
assert(sync.count <= sq_size);
assert(sync.position < sq_size);
if (sync.pop_lock || sync.push_lock)
{
return SQSVR_LOCKED;
}
pos = sync.position;
count = sync.count;
sync.pop_lock = 1;
sync.push_lock = 1;
return SQSVR_OK;
}))
{
std::unique_lock<std::mutex> rcv_lock(m_rcv_mutex);
m_rcv.wait_for(rcv_lock, std::chrono::milliseconds(1));
}
proc(squeue_data_t(m_data, pos, count));
m_sync.atomic_op([](squeue_sync_var_t& sync)
{
assert(sync.count <= sq_size);
assert(sync.position < sq_size);
assert(sync.pop_lock && sync.push_lock);
sync.pop_lock = 0;
sync.push_lock = 0;
});
m_wcv.notify_one();
m_rcv.notify_one();
}
void clear()
{
while (m_sync.atomic_op_sync(SQSVR_OK, [](squeue_sync_var_t& sync) -> u32
{
assert(sync.count <= sq_size);
assert(sync.position < sq_size);
if (sync.pop_lock || sync.push_lock)
{
return SQSVR_LOCKED;
}
sync.pop_lock = 1;
sync.push_lock = 1;
return SQSVR_OK;
}))
{
std::unique_lock<std::mutex> rcv_lock(m_rcv_mutex);
m_rcv.wait_for(rcv_lock, std::chrono::milliseconds(1));
}
m_sync.exchange({});
m_wcv.notify_one();
m_rcv.notify_one();
}
};