10 #ifndef __PION_PIONLOCKEDQUEUE_HEADER__
11 #define __PION_PIONLOCKEDQUEUE_HEADER__
14 #include <boost/cstdint.hpp>
15 #include <boost/noncopyable.hpp>
16 #include <boost/thread/thread.hpp>
17 #include <boost/thread/mutex.hpp>
18 #include <boost/thread/condition.hpp>
19 #include <boost/detail/atomic_count.hpp>
20 #include <pion/PionConfig.hpp>
21 #include <pion/PionException.hpp>
22 #if defined(PION_HAVE_LOCKFREE) && !defined(_MSC_VER)
23 #include <boost/lockfree/detail/freelist.hpp>
41 boost::uint32_t MaxSize = 250000,
42 boost::uint32_t SleepMilliSec = 10 >
44 private boost::noncopyable
52 boost::uint32_t version;
57 #if defined(PION_HAVE_LOCKFREE) && !defined(_MSC_VER)
58 return new (m_free_list.allocate())
QueueNode();
66 #if defined(PION_HAVE_LOCKFREE) && !defined(_MSC_VER)
67 node_ptr->~QueueNode();
68 m_free_list.deallocate(node_ptr);
79 m_head_ptr->next = NULL;
80 m_head_ptr->version = 0;
91 inline bool dequeue(T& t, boost::uint32_t& version) {
93 boost::mutex::scoped_lock head_lock(m_head_mutex);
94 QueueNode *new_head_ptr = m_head_ptr->next;
96 version = m_head_ptr->version;
101 version = new_head_ptr->version;
102 t = new_head_ptr->data;
106 m_head_ptr = new_head_ptr;
131 m_wakeup_time(boost::posix_time::not_a_date_time) {}
139 template <
typename DurationType>
141 : m_is_running(true), m_next_ptr(NULL), m_wakeup_time(d)
145 inline bool isRunning(
void)
const {
return m_is_running; }
148 inline void stop(
void) { m_is_running =
false; m_wakeup_event.notify_one(); }
151 inline void reset(
void) { m_is_running =
true; m_next_ptr = NULL; }
154 inline bool hasWakeupTimer(
void)
const {
return !m_wakeup_time.is_not_a_date_time(); }
158 return m_wakeup_time;
166 volatile bool m_is_running;
168 boost::condition m_wakeup_event;
169 boost::posix_time::time_duration m_wakeup_time;
175 : m_head_ptr(NULL), m_tail_ptr(NULL), m_idle_ptr(NULL),
176 m_next_version(1), m_size(0)
188 inline bool empty(
void)
const {
return (m_head_ptr->next == NULL); }
197 boost::mutex::scoped_lock tail_lock(m_tail_mutex);
198 boost::mutex::scoped_lock head_lock(m_head_mutex);
201 m_tail_ptr = m_head_ptr;
202 m_head_ptr = m_head_ptr->next;
218 boost::system_time wakeup_time;
219 while (
size() >= MaxSize) {
220 wakeup_time = boost::get_system_time()
221 + boost::posix_time::millisec(SleepMilliSec);
222 boost::thread::sleep(wakeup_time);
229 node_ptr->next = NULL;
230 node_ptr->version = 0;
233 boost::mutex::scoped_lock tail_lock(m_tail_mutex);
234 node_ptr->version = (m_next_version += 2);
235 m_tail_ptr->next = node_ptr;
238 m_tail_ptr = node_ptr;
246 m_idle_ptr = m_idle_ptr->m_next_ptr;
247 idle_ptr->m_wakeup_event.notify_one();
262 boost::uint32_t last_known_version;
265 if (
dequeue(t, last_known_version) )
269 boost::mutex::scoped_lock tail_lock(m_tail_mutex);
270 if (m_tail_ptr->version == last_known_version) {
272 thread_info.m_next_ptr = m_idle_ptr;
273 m_idle_ptr = & thread_info;
277 const boost::posix_time::ptime wakeup_time(boost::get_system_time() + thread_info.
getWakeupTimer());
278 if (!thread_info.m_wakeup_event.timed_wait(tail_lock, wakeup_time))
282 thread_info.m_wakeup_event.wait(tail_lock);
296 inline bool pop(T& t) { boost::uint32_t version;
return dequeue(t, version); }
301 #if defined(PION_HAVE_LOCKFREE) && !defined(_MSC_VER)
307 boost::mutex m_head_mutex;
310 boost::mutex m_tail_mutex;
313 QueueNode * m_head_ptr;
316 QueueNode * m_tail_ptr;
319 ConsumerThread * m_idle_ptr;
322 boost::uint32_t m_next_version;
325 boost::detail::atomic_count m_size;