mpsc_node_queue.h 6.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134
  1. #pragma once
  2. #include "../C/Baselib_Memory.h"
  3. #include "mpsc_node.h"
  4. namespace baselib
  5. {
  6. BASELIB_CPP_INTERFACE
  7. {
  8. // In computer science, a queue is a collection in which the entities in the collection are kept in order and the principal (or only) operations on the
  9. // collection are the addition of entities to the rear terminal position, known as enqueue, and removal of entities from the front terminal position, known
  10. // as dequeue. This makes the queue a First-In-First-Out (FIFO) data structure. In a FIFO data structure, the first element added to the queue will be the
  11. // first one to be removed. This is equivalent to the requirement that once a new element is added, all elements that were added before have to be removed
  12. // before the new element can be removed. Often a peek or front operation is also entered, returning the value of the front element without dequeuing it.
  13. // A queue is an example of a linear data structure, or more abstractly a sequential collection.
  14. //
  15. // "Queue (abstract data type)", Wikipedia: The Free Encyclopedia
  16. // https://en.wikipedia.org/w/index.php?title=Queue_(abstract_data_type)&oldid=878671332
  17. //
  18. // This implementation is a lockless node queue capable of handling multiple producers and a single consumer (exclusive access)
  19. //
  20. // Node types are required to inherit the mpsc_node class. No data from the inherited class is modified/copied, so no restrictions apply.
  21. // The node memory is allocated and destroyed by the user (user owned).
  22. // Dequeued nodes may be deleted, overwritten/discarded and/or reused.
  23. //
  24. // Notes on consumer threads:
  25. // Only one consumer thread will exclusively access the front node. Other consumer threads will always progress, either by failing to dequeue or
  26. // successfully dequeuing the next node once the current thread opens access. As opposed to the parallel consumer implementation,
  27. // this is significantly more performant as no DCAS-operations/loops are involved, but if the consumer thread with current exclusive access falls asleep
  28. // when dequeuing, no other threads will successfully dequeue until the thread wakes up.
  29. // Producer threads always progress independently.
  30. //
  31. // Notes on producer threads:
  32. // A producer thread swaps the back and writes the link information in two consecutive atomic operations. If a producer thread falls asleep after the
  33. // swap and before the link information has been written, the consumer thread(s) will not advance past this point since they don't have
  34. // the information yet. Therefore the consumer threads' calls will yield null until that particular producer thread wakes back up.
  35. //
  36. template<typename T>
  37. class alignas(sizeof(intptr_t) * 2)mpsc_node_queue
  38. {
  39. public:
  40. // Create a new queue instance.
  41. mpsc_node_queue()
  42. {
  43. m_Front.obj = 0;
  44. m_Back.obj = 0;
  45. atomic_thread_fence(memory_order_seq_cst);
  46. }
  47. // Returns true if queue is empty.
  48. bool empty() const
  49. {
  50. return m_Back.load(memory_order_relaxed) == 0;
  51. }
  52. // Push a node to the back of the queue.
  53. void push_back(T* node)
  54. {
  55. node->next.store(0, memory_order_relaxed);
  56. if (T* prev = m_Back.exchange(node, memory_order_release))
  57. prev->next.store(node, memory_order_release);
  58. else
  59. m_Front.store(node, memory_order_release);
  60. }
  61. // Push a linked list of nodes to the back of the queue.
  62. void push_back(T* first_node, T* last_node)
  63. {
  64. last_node->next.store(0, memory_order_relaxed);
  65. if (T* prev = m_Back.exchange(last_node, memory_order_release))
  66. prev->next.store(first_node, memory_order_release);
  67. else
  68. m_Front.store(first_node, memory_order_release);
  69. }
  70. // Try to pop frontmost node of the queue.
  71. //
  72. // Note that if null is returned, there may still be push operations in progress in a producer thread.
  73. // Use the "empty" function to check if a queue is empty.
  74. //
  75. // \returns front node of the queue or null.
  76. T* try_pop_front()
  77. {
  78. T* node, *next, *expected;
  79. // acquire thread exclusive access of front node, return 0 if fail or queue is empty
  80. intptr_t front = m_FrontIntPtr.fetch_or(1, memory_order_acquire);
  81. if ((front & 1) | !(front >> 1))
  82. return 0;
  83. node = (T*)front;
  84. next = static_cast<T*>(node->next.load(memory_order_relaxed));
  85. if (!next)
  86. {
  87. // Set to zero, assuming we got the head. Exclusive access maintained as only producer can write zero.
  88. m_Front.store(0, memory_order_release);
  89. // - filters incomplete nodes
  90. // - check if node is back == retrigger new back
  91. expected = node;
  92. if (!m_Back.compare_exchange_strong(expected, 0, memory_order_acquire, memory_order_relaxed))
  93. {
  94. // Back progressed or node is incomplete, reset front ptr and return 0.
  95. m_Front.store(node, memory_order_release);
  96. return 0;
  97. }
  98. // Successfully got the back, so just return node.
  99. return node;
  100. }
  101. // Store next (clear block) and return node
  102. m_Front.store(next, memory_order_release);
  103. return node;
  104. }
  105. private:
  106. // Space out atomic members to individual cache lines. Required for native LLSC operations on some architectures, others to avoid false sharing
  107. char _cachelineSpacer0[PLATFORM_CACHE_LINE_SIZE];
  108. union
  109. {
  110. atomic<T*> m_Front;
  111. atomic<intptr_t> m_FrontIntPtr;
  112. };
  113. char _cachelineSpacer1[PLATFORM_CACHE_LINE_SIZE - sizeof(T*)];
  114. atomic<T*> m_Back;
  115. char _cachelineSpacer2[PLATFORM_CACHE_LINE_SIZE - sizeof(T*)];
  116. // Verify mpsc_node is base of T
  117. static_assert(std::is_base_of<baselib::mpsc_node, T>::value, "Node class/struct used with baselib::mpsc_node_queue must derive from baselib::mpsc_node.");
  118. };
  119. }
  120. }