// mpmc_node_queue.h
  1. #pragma once
  2. #include "../C/Baselib_Memory.h"
  3. #include "../C/Baselib_Atomic_LLSC.h"
  4. #include "mpmc_node.h"
  5. namespace baselib
  6. {
  7. BASELIB_CPP_INTERFACE
  8. {
// In computer science, a queue is a collection in which the entities in the collection are kept in order and the principal (or only) operations on the
// collection are the addition of entities to the rear terminal position, known as enqueue, and removal of entities from the front terminal position, known
// as dequeue. This makes the queue a First-In-First-Out (FIFO) data structure. In a FIFO data structure, the first element added to the queue will be the
// first one to be removed. This is equivalent to the requirement that once a new element is added, all elements that were added before have to be removed
// before the new element can be removed. Often a peek or front operation is also entered, returning the value of the front element without dequeuing it.
// A queue is an example of a linear data structure, or more abstractly a sequential collection.
//
// "Queue (abstract data type)", Wikipedia: The Free Encyclopedia
// https://en.wikipedia.org/w/index.php?title=Queue_(abstract_data_type)&oldid=878671332
//
// This implementation is a lockless node queue capable of handling multiple concurrent producers and consumers
//
// Node types are required to inherit the mpmc_node class. No data from the inherited class is modified/copied, so no restrictions apply.
// The node memory is allocated and destroyed by the user (user owned).
// Dequeued nodes may be overwritten/discarded and/or reused.
// Dequeued nodes may not be deleted (released from user space memory) while any consumer thread is in the scope of a dequeue call.
//
// Notes on consumer threads:
// While dequeued nodes may be reused and/or overwritten they must however remain in application readable memory (user space memory) until it can be
// guaranteed no consumer thread is still processing the node i.e. not within the scope of a dequeue call.
// Even though the value is ignored (discarded by version check) any consumer thread may still read the node link information.
// Consumer threads are concurrently attempting to dequeue the front in a DCAS loop and the first to succeed will update the queue front and other
// threads continue processing the next front node in the queue. Threads are guaranteed to progress dequeuing nodes even if another consumer
// thread falls asleep during a dequeue, but may fail to dequeue in the combination of the queue getting pre-emptied and the thread resetting the
// state (reload back) falling asleep while swapping the back (between 2x consecutive CAS operations).
// This is usually an extremely infrequent occurrence due to the combination required (can not happen unless there's exactly one item in the queue).
// Producer threads always progress independently.
//
// Notes on producer threads:
// A producer thread swaps the back and writes the link information in two consecutive atomic operations. If a producer thread falls asleep after the
// swap and before the link information has been written, the consumer thread(s) will not advance past this point since it doesn't have
// the information yet. Therefore the consumer threads' calls will yield null until that particular producer thread wakes back up.
template<typename T>
class alignas(sizeof(intptr_t) * 2)mpmc_node_queue
{
public:
    // Create a new queue instance.
    //
    // Initial state: m_Back is null (queue empty) and the front word/idx is odd (1).
    // Bit 0 of the front acts as a gate: while set, consumers bail out of try_pop_front.
    // The first producer that finds m_Back null clears the gate again (see push_back).
    mpmc_node_queue()
    {
        m_FrontIntPtr = 1;
        m_Front.obj.idx = 1;
        m_Back.obj = 0;
        // Publish the initial state before any other thread may use the queue.
        atomic_thread_fence(memory_order_seq_cst);
    }

    // Returns true if queue is empty.
    //
    // Note: a producer that has swapped m_Back but not yet written the link information
    // already counts as non-empty here, even though try_pop_front may still return null
    // for that node until the link store becomes visible.
    bool empty() const
    {
        return m_Back.load(memory_order_relaxed) == 0;
    }

    // Push a node to the back of the queue.
    //
    // \param node User-owned node (must derive from mpmc_node); its link field is overwritten.
    void push_back(T* node)
    {
        // The new back node terminates the list.
        node->next.store(0, memory_order_relaxed);
        if (T* prev = m_Back.exchange(node, memory_order_release))
        {
            // Queue was non-empty: link the previous back node to the new one.
            // Consumers can not advance past prev until this store is visible (see class notes).
            prev->next.store(node, memory_order_release);
        }
        else
        {
            // Queue was empty, so this producer owns the front reload:
            // store the new front (reload) and add one which will put idx back to an
            // even number, releasing the consumer threads (ptr is always null and idx odd at this point).
            if (PLATFORM_LLSC_NATIVE_SUPPORT)
            {
                // LLSC path gates on bit 0 of the pointer word itself, so storing the node
                // pointer (bit 0 assumed clear by alignment) is enough to release consumers.
                m_FrontPair.ptr.store(node, memory_order_release);
            }
            else
            {
                m_FrontPair.ptr.store(node, memory_order_relaxed);
                m_FrontPair.idx.fetch_add(1, memory_order_release);
            }
        }
    }

    // Push a linked list of nodes to the back of the queue.
    //
    // The chain first_node..last_node must already be linked via the nodes' next fields;
    // only last_node's link is (re)set here. Same algorithm as the single-node push_back.
    void push_back(T* first_node, T* last_node)
    {
        last_node->next.store(0, memory_order_relaxed);
        if (T* prev = m_Back.exchange(last_node, memory_order_release))
        {
            // Queue was non-empty: splice the whole chain after the previous back node.
            prev->next.store(first_node, memory_order_release);
        }
        else
        {
            // Queue was empty: reload the front with the first node of the chain
            // (see single-node push_back for details on the idx / bit 0 handling).
            if (PLATFORM_LLSC_NATIVE_SUPPORT)
            {
                m_FrontPair.ptr.store(first_node, memory_order_release);
            }
            else
            {
                m_FrontPair.ptr.store(first_node, memory_order_relaxed);
                m_FrontPair.idx.fetch_add(1, memory_order_release);
            }
        }
    }

    // Try to pop frontmost node of the queue.
    //
    // Note that if null is returned, there may still be push operations in progress in a producer thread.
    // Use the "empty" function to check if a queue is empty.
    //
    // \returns front node of the queue or null.
    T* try_pop_front()
    {
        T* node, *next;
        if (PLATFORM_LLSC_NATIVE_SUPPORT)
        {
            // Native LL/SC path: the front is a single pointer word whose bit 0 doubles as
            // the "reloading / empty" gate (no separate sequence counter needed — the LLSC
            // monitor detects interference instead).
            intptr_t value;
            Baselib_atomic_llsc_ptr_acquire_release_v(&m_Front, &node, &next,
            {
                // If front bit 0 is set, queue back is being reloaded or queue is empty.
                value = reinterpret_cast<intptr_t>(node);
                if (value & 1)
                {
                    Baselib_atomic_llsc_break();
                    return 0;
                }
                // Fetch next node. If zero, node is the current backnode. LLSC Monitor is internally cleared by subsequent cmpxchg.
                if (!(next = static_cast<T*>(node->next.obj)))
                    goto BackNode;
            });
            // Store-conditional succeeded: front now points at next and we own node.
            return node;
        BackNode:
            // node appears to be the last node in the queue. Claim it in two steps:
            // - filters obsolete nodes
            // - Exclusive access (re-entrant block)
            T * front = node;
            if (!m_FrontPair.ptr.compare_exchange_strong(front, reinterpret_cast<T*>(value | 1), memory_order_acquire, memory_order_relaxed))
                return 0;
            // - filters incomplete nodes
            // - check if node is back == retrigger new back
            if (!m_Back.compare_exchange_strong(front, 0, memory_order_acquire, memory_order_relaxed))
            {
                // Back progressed or node is incomplete, restore access and return 0
                m_FrontIntPtr.fetch_and(~1, memory_order_release);
                return 0;
            }
            // Success, back == front node, back was set to zero above and index / access is restored by producers, so we return the back node.
            // LLSC monitors invalidates any obsolete nodes still in process in other threads.
            return node;
        }
        else
        {
            // Fallback path: DCAS on a {pointer, sequence} pair. The sequence (idx) filters
            // ABA across concurrent pops; its low bit is the same "reloading / empty" gate.
            SequencedFrontPtr front, value;
            // Get front node. The DCAS while operation will update front on retry
            front = m_Front.load(memory_order_acquire);
            do
            {
                // If front idx bit 0 is set, queue back is being reloaded or queue is empty.
                if (front.idx & 1)
                    return 0;
                // Fetch next node. If zero, node is the current backnode
                node = front.ptr;
                if (!(next = static_cast<T*>(node->next.load(memory_order_relaxed))))
                    goto BackNodeDCAS;
                // On success, replace the current with the next node and return node. On fail, retry with updated front.
                value.ptr = next;
                value.idx = front.idx + 2; // +2 bumps the version while keeping idx even (gate open)
            }
            while (!m_Front.compare_exchange_strong(front, value, memory_order_acquire, memory_order_relaxed));
            return node;
        BackNodeDCAS:
            // node appears to be the last node in the queue. Claim it in two steps:
            // - filters obsolete nodes
            // - Exclusive access (re-entrant block)
            value.ptr = front.ptr;
            value.idx = front.idx | 1; // setting bit 0 locks out other consumers
            if (!m_Front.compare_exchange_strong(front, value, memory_order_acquire, memory_order_relaxed))
                return 0;
            // - filters incomplete nodes
            // - check if node is back == retrigger new back
            value.ptr = node;
            if (!m_Back.compare_exchange_strong(value.ptr, 0, memory_order_acquire, memory_order_relaxed))
            {
                // Back progressed or node is incomplete, restore access and return 0
                m_FrontPair.idx.fetch_and(~1, memory_order_release);
                return 0;
            }
            // Success, back == front node, back was set to zero above and index / access is restored by producers, so we return the back node.
            // Version check invalidates any obsolete nodes still in process in other threads.
            return node;
        }
    }
private:
    // Plain {pointer, sequence} pair, read/written as one unit by the DCAS path.
    typedef struct
    {
        T* ptr;
        intptr_t idx;
    } SequencedFrontPtr;
    // Same layout with individually atomic fields, used for the single-word CAS/store ops.
    typedef struct
    {
        atomic<T*> ptr;
        atomic<intptr_t> idx;
    } FrontPair;
    // Space out atomic members to individual cache lines. Required for native LLSC operations on some architectures, others to avoid false sharing
    char _cachelineSpacer0[PLATFORM_CACHE_LINE_SIZE];
    // Three aliased views of the same front storage; which one is used depends on the operation:
    union
    {
        atomic<intptr_t> m_FrontIntPtr;    // front pointer word as an integer (bit 0 = consumer gate)
        FrontPair m_FrontPair;             // field-wise atomic access (ptr / idx separately)
        atomic<SequencedFrontPtr> m_Front; // whole-pair access (DCAS / LLSC)
    };
    char _cachelineSpacer1[PLATFORM_CACHE_LINE_SIZE - sizeof(SequencedFrontPtr)];
    atomic<T*> m_Back;
    char _cachelineSpacer2[PLATFORM_CACHE_LINE_SIZE - sizeof(T*)];
    // FrontPair is atomic reflections of the SequencedFront fields used for CAS vs DCAS ops. They must match in size and layout.
    // Do note that we can not check layout (offsetof) as the template class is incomplete!
    static_assert(sizeof(mpmc_node_queue::m_FrontPair) == sizeof(mpmc_node_queue::m_Front), "SequencedFrontPtr and FrontPair must be of equal size");
    // Verify mpmc_node is base of T
    static_assert(std::is_base_of<baselib::mpmc_node, T>::value, "Node class/struct used with baselib::mpmc_node_queue must derive from baselib::mpmc_node.");
};
  216. }
  217. }