ConditionVariable_FutexBased.inl.h 2.9 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586
  1. #pragma once
  2. #include "../CountdownTimer.h"
  3. #include "../../C/Baselib_SystemFutex.h"
  4. #include "../../C/Baselib_Thread.h"
  5. #if !PLATFORM_FUTEX_NATIVE_SUPPORT
  6. #error "Only use this implementation on top of a proper futex, in all other situations us ConditionVariable_SemaphoreBased.inl.h"
  7. #endif
  8. namespace baselib
  9. {
  10. BASELIB_CPP_INTERFACE
  11. {
  12. inline void ConditionVariable::Wait()
  13. {
  14. m_Data.waiters.fetch_add(1, memory_order_relaxed);
  15. m_Lock.Release();
  16. while (!m_Data.TryConsumeWakeup())
  17. {
  18. Baselib_SystemFutex_Wait(&m_Data.wakeups.obj, 0, std::numeric_limits<uint32_t>::max());
  19. }
  20. m_Lock.Acquire();
  21. }
  22. inline bool ConditionVariable::TimedWait(const timeout_ms timeoutInMilliseconds)
  23. {
  24. m_Data.waiters.fetch_add(1, memory_order_relaxed);
  25. m_Lock.Release();
  26. uint32_t timeLeft = timeoutInMilliseconds.count();
  27. auto timer = CountdownTimer::StartNew(timeoutInMilliseconds);
  28. do
  29. {
  30. Baselib_SystemFutex_Wait(&m_Data.wakeups.obj, 0, timeLeft);
  31. if (m_Data.TryConsumeWakeup())
  32. {
  33. m_Lock.Acquire();
  34. return true;
  35. }
  36. timeLeft = timer.GetTimeLeftInMilliseconds().count();
  37. }
  38. while (timeLeft);
  39. do
  40. {
  41. int32_t waiters = m_Data.waiters.load(memory_order_relaxed);
  42. while (waiters > 0)
  43. {
  44. if (m_Data.waiters.compare_exchange_weak(waiters, waiters - 1, memory_order_relaxed, memory_order_relaxed))
  45. {
  46. m_Lock.Acquire();
  47. return false;
  48. }
  49. }
  50. Baselib_Thread_YieldExecution();
  51. }
  52. while (!m_Data.TryConsumeWakeup());
  53. m_Lock.Acquire();
  54. return true;
  55. }
  56. inline void ConditionVariable::Notify(uint16_t count)
  57. {
  58. int32_t waitingThreads = m_Data.waiters.load(memory_order_acquire);
  59. do
  60. {
  61. int32_t threadsToWakeup = count < waitingThreads ? count : waitingThreads;
  62. if (threadsToWakeup == 0)
  63. {
  64. atomic_thread_fence(memory_order_release);
  65. return;
  66. }
  67. if (m_Data.waiters.compare_exchange_weak(waitingThreads, waitingThreads - threadsToWakeup, memory_order_relaxed, memory_order_relaxed))
  68. {
  69. m_Data.wakeups.fetch_add(threadsToWakeup, memory_order_release);
  70. Baselib_SystemFutex_Notify(&m_Data.wakeups.obj, threadsToWakeup, Baselib_WakeupFallbackStrategy_OneByOne);
  71. return;
  72. }
  73. }
  74. while (waitingThreads > 0);
  75. }
  76. }
  77. }