SQueue renamed and moved

This commit is contained in:
Nekotekina 2014-12-25 01:24:17 +03:00
parent 69388f032e
commit 7613d749ec
23 changed files with 311 additions and 254 deletions

View file

@ -1,4 +1,5 @@
#pragma once
#include "Emu/Memory/atomic_type.h"
static std::thread::id main_thread;
@ -71,33 +72,6 @@ public:
bool joinable() const;
};
class s_mutex_t
{
};
class s_shared_mutex_t
{
};
class s_cond_var_t
{
//public:
// s_cond_var_t();
// ~s_cond_var_t();
//
// s_cond_var_t(s_cond_var_t& right) = delete;
// s_cond_var_t& operator = (s_cond_var_t& right) = delete;
//
// void wait();
// void wait_for();
//
// void notify();
// void notify_all();
};
class slw_mutex_t
{
@ -173,3 +147,227 @@ public:
// signal all threads waiting on waiter_op() with the same signal_id (signaling only hints those threads that corresponding conditions are *probably* met)
void notify(u64 signal_id);
};
template<typename T, u32 sq_size = 256>
class squeue_t
{
struct squeue_sync_var_t
{
struct
{
u32 position : 31;
u32 read_lock : 1;
};
struct
{
u32 count : 31;
u32 write_lock : 1;
};
};
atomic_le_t<squeue_sync_var_t> m_sync;
mutable std::mutex m_rcv_mutex, m_wcv_mutex;
mutable std::condition_variable m_rcv, m_wcv;
T m_data[sq_size];
public:
squeue_t()
{
m_sync.write_relaxed({});
}
u32 get_max_size() const
{
return sq_size;
}
bool is_full() const volatile
{
return m_sync.read_relaxed().count == sq_size;
}
bool push(const T& data, const volatile bool* do_exit = nullptr)
{
u32 pos = 0;
while (!m_sync.atomic_op_sync(true, [&pos](squeue_sync_var_t& sync) -> bool
{
assert(sync.count <= sq_size);
assert(sync.position < sq_size);
if (sync.write_lock || sync.count == sq_size)
{
return false;
}
sync.write_lock = 1;
pos = sync.position + sync.count;
return true;
}))
{
if (Emu.IsStopped() || (do_exit && *do_exit))
{
return false;
}
std::unique_lock<std::mutex> wcv_lock(m_wcv_mutex);
m_wcv.wait_for(wcv_lock, std::chrono::milliseconds(1));
}
m_data[pos >= sq_size ? pos - sq_size : pos] = data;
m_sync.atomic_op([](squeue_sync_var_t& sync)
{
assert(sync.count <= sq_size);
assert(sync.position < sq_size);
assert(sync.write_lock);
sync.write_lock = 0;
sync.count++;
});
m_rcv.notify_one();
m_wcv.notify_one();
return true;
}
bool try_push(const T& data)
{
static const volatile bool no_wait = true;
return push(data, &no_wait);
}
bool pop(T& data, const volatile bool* do_exit = nullptr)
{
u32 pos = 0;
while (!m_sync.atomic_op_sync(true, [&pos](squeue_sync_var_t& sync) -> bool
{
assert(sync.count <= sq_size);
assert(sync.position < sq_size);
if (sync.read_lock || !sync.count)
{
return false;
}
sync.read_lock = 1;
pos = sync.position;
return true;
}))
{
if (Emu.IsStopped() || (do_exit && *do_exit))
{
return false;
}
std::unique_lock<std::mutex> rcv_lock(m_rcv_mutex);
m_rcv.wait_for(rcv_lock, std::chrono::milliseconds(1));
}
data = m_data[pos];
m_sync.atomic_op([](squeue_sync_var_t& sync)
{
assert(sync.count <= sq_size);
assert(sync.position < sq_size);
assert(sync.read_lock);
sync.read_lock = 0;
sync.position++;
sync.count--;
if (sync.position == sq_size)
{
sync.position = 0;
}
});
m_rcv.notify_one();
m_wcv.notify_one();
return true;
}
bool try_pop(T& data)
{
static const volatile bool no_wait = true;
return pop(data, &no_wait);
}
void clear()
{
while (!m_sync.atomic_op_sync(true, [](squeue_sync_var_t& sync) -> bool
{
assert(sync.count <= sq_size);
assert(sync.position < sq_size);
if (sync.read_lock || sync.write_lock)
{
return false;
}
sync.read_lock = 1;
sync.write_lock = 1;
return true;
}))
{
std::unique_lock<std::mutex> rcv_lock(m_rcv_mutex);
m_rcv.wait_for(rcv_lock, std::chrono::milliseconds(1));
}
m_sync.exchange({});
m_wcv.notify_one();
m_rcv.notify_one();
}
bool peek(T& data, u32 start_pos = 0, const volatile bool* do_exit = nullptr)
{
assert(start_pos < sq_size);
u32 pos = 0;
while (!m_sync.atomic_op_sync(true, [&pos, start_pos](squeue_sync_var_t& sync) -> bool
{
assert(sync.count <= sq_size);
assert(sync.position < sq_size);
if (sync.read_lock || sync.count <= start_pos)
{
return false;
}
sync.read_lock = 1;
pos = sync.position + start_pos;
return true;
}))
{
if (Emu.IsStopped() || (do_exit && *do_exit))
{
return false;
}
std::unique_lock<std::mutex> rcv_lock(m_rcv_mutex);
m_rcv.wait_for(rcv_lock, std::chrono::milliseconds(1));
}
data = m_data[pos >= sq_size ? pos - sq_size : pos];
m_sync.atomic_op([](squeue_sync_var_t& sync)
{
assert(sync.count <= sq_size);
assert(sync.position < sq_size);
assert(sync.read_lock);
sync.read_lock = 0;
});
m_rcv.notify_one();
return true;
}
bool try_peek(T& data, u32 start_pos = 0)
{
static const volatile bool no_wait = true;
return peek(data, start_pos, &no_wait);
}
};