mpmc_fixed_queue.h

#pragma once

#include "Atomic.h"
#include "heap_allocator.h"
#include "../C/Baselib_Memory.h"

#include <algorithm>

namespace baselib
{
BASELIB_CPP_INTERFACE
{
    // In computer science, a queue is a collection in which the entities in the collection are kept in order and the principal (or only) operations on the
    // collection are the addition of entities to the rear terminal position, known as enqueue, and removal of entities from the front terminal position, known
    // as dequeue. This makes the queue a First-In-First-Out (FIFO) data structure. In a FIFO data structure, the first element added to the queue will be the
    // first one to be removed. This is equivalent to the requirement that once a new element is added, all elements that were added before have to be removed
    // before the new element can be removed. Often a peek or front operation is also entered, returning the value of the front element without dequeuing it.
    // A queue is an example of a linear data structure, or more abstractly a sequential collection.
    //
    // "Queue (abstract data type)", Wikipedia: The Free Encyclopedia
    // https://en.wikipedia.org/w/index.php?title=Queue_(abstract_data_type)&oldid=878671332
    //
    // This implementation is a fixed size queue capable of handling multiple concurrent producers and consumers
    //
    // The implementation is lock-free in the sense that some thread always makes progress, either by inserting an element or by failing to insert an element.
    // Note though that the data structure in itself is not lock-free: in theory, if a thread writing an element gets pre-empted, that thread may block reads
    // from proceeding past that point until the writer thread wakes up and completes its operation.
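    //
    // Example usage (illustrative sketch, not part of the original header; the element type and values are arbitrary):
    //
    //     baselib::mpmc_fixed_queue<int> queue(64);   // room for 64 elements
    //     bool pushed = queue.try_push_back(42);      // false if the queue was full
    //     int value;
    //     bool popped = queue.try_pop_front(value);   // false if the queue was empty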
    template<typename value_type, bool cacheline_aligned = true>
    class mpmc_fixed_queue
    {
    public:
        // Create a new queue instance capable of holding at most `capacity` number of elements.
        // `buffer` is an optional user defined memory block large enough to hold the queue data structure.
        // The size required is obtained by `buffer_size`, alignment requirements by `buffer_alignment`.
        // If `buffer` is not set (default), the queue will internally allocate memory using baselib heap_allocator.
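        //
        // For example, a caller that wants to supply its own storage might do something like the following
        // (illustrative sketch, not part of the original header; `MyElement` and the capacity are arbitrary):
        //
        //     using Queue = baselib::mpmc_fixed_queue<MyElement>;
        //     alignas(Queue::buffer_alignment()) static char storage[Queue::buffer_size(16)];
        //     Queue queue(16, storage);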
        mpmc_fixed_queue(uint32_t capacity, void *buffer = nullptr)
            : m_SlotAllocator()
            , m_Slot(static_cast<Slot*>(buffer ? buffer : m_SlotAllocator.allocate(buffer_size(capacity))))
            , m_UserAllocatedSlots(buffer ? nullptr : m_Slot)
            , m_NumberOfSlots(capacity ? capacity : 2)
            , m_Capacity(capacity)
            , m_ReadPos(0)
            , m_WritePos(0)
        {
            // a zero sized queue uses two slots - the first indicating the queue is empty, the other indicating it is full.
            if (capacity == 0)
            {
                m_Slot[0].checksum.store(WriteableChecksum(0), baselib::memory_order_relaxed);
                m_Slot[1].checksum.store(ReadableChecksumPrevGen(1), baselib::memory_order_relaxed);
                m_WritePos = 1; // Point at the second slot which indicates a full queue
            }
            else
            {
                // fill queue with 'writable' slots
                for (uint32_t pos = 0; pos < capacity; ++pos)
                    m_Slot[pos].checksum.store(WriteableChecksum(pos), baselib::memory_order_relaxed);
            }
            baselib::atomic_thread_fence(baselib::memory_order_seq_cst);
        }

        // Destroy queue, guaranteed to also destroy any elements held by the queue.
        //
        // If there are other threads currently accessing the queue, behavior is undefined.
        ~mpmc_fixed_queue()
        {
            for (;;)
            {
                const uint32_t pos = m_ReadPos.fetch_add(1, baselib::memory_order_relaxed);
                Slot& slot = m_Slot[SlotIndex(pos)];
                if (slot.checksum.load(baselib::memory_order_acquire) != ReadableChecksum(pos))
                    break;
                slot.value.~value_type();
            }
            m_SlotAllocator.deallocate(m_UserAllocatedSlots, buffer_size(static_cast<uint32_t>(m_Capacity)));
            baselib::atomic_thread_fence(baselib::memory_order_seq_cst);
        }

        // Try to pop the front-most element off the queue.
        //
        // Note that if several push operations are executed in parallel, the one returning first might not have pushed the new head.
        // In that case it looks to the user as if there is a new element in the queue, whereas the still missing head will block the removal of any entries.
        //
        // \returns true if an element was popped, false if the queue was empty.
        COMPILER_WARN_UNUSED_RESULT
        bool try_pop_front(value_type& value)
        {
            while (true)
            {
                // Load current position and checksum.
                uint32_t pos = m_ReadPos.load(baselib::memory_order_relaxed);
                Slot* slot = &m_Slot[SlotIndex(pos)];
                uint32_t checksum = slot->checksum.load(baselib::memory_order_acquire);

                // As long as it looks like we can read from this slot.
                while (checksum == ReadableChecksum(pos))
                {
                    // Try to acquire it and read slot on success.
                    if (m_ReadPos.compare_exchange_weak(pos, pos + 1, baselib::memory_order_relaxed, baselib::memory_order_relaxed))
                    {
                        value = std::move(slot->value);
                        slot->value.~value_type();
                        slot->checksum.store(WriteableChecksumNextGen(pos), baselib::memory_order_release);
                        return true;
                    }
                    // Reload checksum and try again (compare_exchange already reloaded the position)
                    else
                    {
                        slot = &m_Slot[SlotIndex(pos)];
                        checksum = slot->checksum.load(baselib::memory_order_acquire);
                    }
                }

                // Is queue empty?
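                // (a slot that is still writable at the read position means no element has been published there yet)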
                if (checksum == WriteableChecksum(pos))
                    return false;
            }
        }

        // Try to append a new element to the end of the queue.
        //
        // Note that if several pop operations are executed in parallel, the one returning first might not have popped the head.
        // In that case it looks to the user as if there is a new free slot in the queue, whereas the still present head will block the addition of new entries.
        //
        // \returns true if the element was appended, false if the queue was full.
        template<class ... Args>
        COMPILER_WARN_UNUSED_RESULT
        bool try_emplace_back(Args&& ... args)
        {
            while (true)
            {
                // Load current position and checksum.
                uint32_t pos = m_WritePos.load(baselib::memory_order_relaxed);
                Slot* slot = &m_Slot[SlotIndex(pos)];
                uint32_t checksum = slot->checksum.load(baselib::memory_order_acquire);

                // As long as it looks like we can write to this slot.
                while (checksum == WriteableChecksum(pos))
                {
                    // Try to acquire it and write slot on success.
                    if (m_WritePos.compare_exchange_weak(pos, pos + 1, baselib::memory_order_relaxed, baselib::memory_order_relaxed))
                    {
                        new(&slot->value) value_type(std::forward<Args>(args)...);
                        slot->checksum.store(ReadableChecksum(pos), baselib::memory_order_release);
                        return true;
                    }
                    // Reload checksum and try again (compare_exchange already reloaded the position)
                    else
                    {
                        slot = &m_Slot[SlotIndex(pos)];
                        checksum = slot->checksum.load(baselib::memory_order_acquire);
                    }
                }

                // Is queue full?
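                // (a slot that still holds the readable checksum from the previous generation means the consumer has not freed it yet)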
                if (checksum == ReadableChecksumPrevGen(pos))
                    return false;
            }
        }

        // Try to push an element to the end of the queue.
        //
        // Note that if several pop operations are executed in parallel, the one returning first might not have popped the head.
        // In that case it looks to the user as if there is a new free slot in the queue, whereas the still present head will block the addition of new entries.
        //
        // \returns true if the element was pushed, false if the queue was full.
        COMPILER_WARN_UNUSED_RESULT
        bool try_push_back(const value_type& value)
        {
            return try_emplace_back(value);
        }

        // Try to push an element to the end of the queue.
        //
        // Note that if several pop operations are executed in parallel, the one returning first might not have popped the head.
        // In that case it looks to the user as if there is a new free slot in the queue, whereas the still present head will block the addition of new entries.
        //
        // \returns true if the element was pushed, false if the queue was full.
        COMPILER_WARN_UNUSED_RESULT
        bool try_push_back(value_type&& value)
        {
            return try_emplace_back(std::forward<value_type>(value));
        }

        // \returns the number of elements that can fit in the queue.
        size_t capacity() const
        {
            return m_Capacity;
        }

        // Calculate the size in bytes of a memory buffer required to hold `capacity` number of elements.
        //
        // \returns Buffer size in bytes.
        static constexpr size_t buffer_size(uint32_t capacity)
        {
            return sizeof(Slot) * (capacity ? capacity : 2);
        }

        // Calculate the required alignment for a memory buffer containing `value_type` elements.
        //
        // \returns Alignment requirement
        static constexpr size_t buffer_alignment()
        {
            return SlotAlignment;
        }

    private:
        static constexpr uint32_t MinTypeAlignment = alignof(value_type) > sizeof(void*) ? alignof(value_type) : sizeof(void*);
        static constexpr uint32_t SlotAlignment = cacheline_aligned && PLATFORM_CACHE_LINE_SIZE > MinTypeAlignment ? PLATFORM_CACHE_LINE_SIZE : MinTypeAlignment;

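        // Each slot carries a "checksum" word that encodes the position the slot expects next together with a readable/writable
        // flag in the top bit: a slot whose checksum equals WriteableChecksum(pos) is free for the producer at position `pos`,
        // and a slot whose checksum equals ReadableChecksum(pos) holds a value published for the consumer at position `pos`.
        // When a value is consumed the checksum advances by m_NumberOfSlots (the next generation), so the same slot index can be
        // reused after the position counters wrap around the slot array.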
        static constexpr uint32_t ReadableBit = (uint32_t)1 << 31;
        static constexpr uint32_t WritableMask = ~ReadableBit;

        static constexpr uint32_t WriteableChecksum(uint32_t pos) { return pos & WritableMask; }
        static constexpr uint32_t ReadableChecksum(uint32_t pos) { return pos | ReadableBit; }
        constexpr uint32_t WriteableChecksumNextGen(uint32_t pos) const { return (pos + m_NumberOfSlots) & WritableMask; }
        constexpr uint32_t ReadableChecksumPrevGen(uint32_t pos) const { return (pos - m_NumberOfSlots) | ReadableBit; }
        constexpr uint32_t SlotIndex(uint32_t pos) const { return pos % m_NumberOfSlots; }

        const baselib::heap_allocator<SlotAlignment> m_SlotAllocator;

        struct alignas(SlotAlignment) Slot
        {
            value_type value;
            baselib::atomic<uint32_t> checksum;
        };

        Slot *const m_Slot;
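        // Deallocated in the destructor: nullptr when the caller supplied `buffer`, otherwise the internally allocated slot array.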
        void *const m_UserAllocatedSlots;

        // benchmarks show using uint32_t gives ~3x perf boost on 64bit platforms compared to size_t (uint64_t)
        const uint32_t m_NumberOfSlots;
        const size_t m_Capacity;

        alignas(PLATFORM_CACHE_LINE_SIZE) baselib::atomic<uint32_t> m_ReadPos;
        alignas(PLATFORM_CACHE_LINE_SIZE) baselib::atomic<uint32_t> m_WritePos;
    };
}
}