Create.cs 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174
  1. using Cysharp.Threading.Tasks.Internal;
  2. using System;
  3. using System.Threading;
  4. namespace Cysharp.Threading.Tasks.Linq
  5. {
  6. public static partial class UniTaskAsyncEnumerable
  7. {
  8. public static IUniTaskAsyncEnumerable<T> Create<T>(Func<IAsyncWriter<T>, CancellationToken, UniTask> create)
  9. {
  10. Error.ThrowArgumentNullException(create, nameof(create));
  11. return new Create<T>(create);
  12. }
  13. }
  14. public interface IAsyncWriter<T>
  15. {
  16. UniTask YieldAsync(T value);
  17. }
  18. internal sealed class Create<T> : IUniTaskAsyncEnumerable<T>
  19. {
  20. readonly Func<IAsyncWriter<T>, CancellationToken, UniTask> create;
  21. public Create(Func<IAsyncWriter<T>, CancellationToken, UniTask> create)
  22. {
  23. this.create = create;
  24. }
  25. public IUniTaskAsyncEnumerator<T> GetAsyncEnumerator(CancellationToken cancellationToken = default)
  26. {
  27. return new _Create(create, cancellationToken);
  28. }
  29. sealed class _Create : MoveNextSource, IUniTaskAsyncEnumerator<T>
  30. {
  31. readonly Func<IAsyncWriter<T>, CancellationToken, UniTask> create;
  32. readonly CancellationToken cancellationToken;
  33. int state = -1;
  34. AsyncWriter writer;
  35. public _Create(Func<IAsyncWriter<T>, CancellationToken, UniTask> create, CancellationToken cancellationToken)
  36. {
  37. this.create = create;
  38. this.cancellationToken = cancellationToken;
  39. TaskTracker.TrackActiveTask(this, 3);
  40. }
  41. public T Current { get; private set; }
  42. public UniTask DisposeAsync()
  43. {
  44. TaskTracker.RemoveTracking(this);
  45. return default;
  46. }
  47. public UniTask<bool> MoveNextAsync()
  48. {
  49. if (state == -2) return default;
  50. completionSource.Reset();
  51. MoveNext();
  52. return new UniTask<bool>(this, completionSource.Version);
  53. }
  54. void MoveNext()
  55. {
  56. try
  57. {
  58. switch (state)
  59. {
  60. case -1: // init
  61. {
  62. writer = new AsyncWriter(this);
  63. RunWriterTask(create(writer, cancellationToken)).Forget();
  64. if (Volatile.Read(ref state) == -2)
  65. {
  66. return; // complete synchronously
  67. }
  68. state = 0; // wait YieldAsync, it set TrySetResult(true)
  69. return;
  70. }
  71. case 0:
  72. writer.SignalWriter();
  73. return;
  74. default:
  75. goto DONE;
  76. }
  77. }
  78. catch (Exception ex)
  79. {
  80. state = -2;
  81. completionSource.TrySetException(ex);
  82. return;
  83. }
  84. DONE:
  85. state = -2;
  86. completionSource.TrySetResult(false);
  87. return;
  88. }
  89. async UniTaskVoid RunWriterTask(UniTask task)
  90. {
  91. try
  92. {
  93. await task;
  94. goto DONE;
  95. }
  96. catch (Exception ex)
  97. {
  98. Volatile.Write(ref state, -2);
  99. completionSource.TrySetException(ex);
  100. return;
  101. }
  102. DONE:
  103. Volatile.Write(ref state, -2);
  104. completionSource.TrySetResult(false);
  105. }
  106. public void SetResult(T value)
  107. {
  108. Current = value;
  109. completionSource.TrySetResult(true);
  110. }
  111. }
  112. sealed class AsyncWriter : IUniTaskSource, IAsyncWriter<T>
  113. {
  114. readonly _Create enumerator;
  115. UniTaskCompletionSourceCore<AsyncUnit> core;
  116. public AsyncWriter(_Create enumerator)
  117. {
  118. this.enumerator = enumerator;
  119. }
  120. public void GetResult(short token)
  121. {
  122. core.GetResult(token);
  123. }
  124. public UniTaskStatus GetStatus(short token)
  125. {
  126. return core.GetStatus(token);
  127. }
  128. public UniTaskStatus UnsafeGetStatus()
  129. {
  130. return core.UnsafeGetStatus();
  131. }
  132. public void OnCompleted(Action<object> continuation, object state, short token)
  133. {
  134. core.OnCompleted(continuation, state, token);
  135. }
  136. public UniTask YieldAsync(T value)
  137. {
  138. core.Reset();
  139. enumerator.SetResult(value);
  140. return new UniTask(this, core.Version);
  141. }
  142. public void SignalWriter()
  143. {
  144. core.TrySetResult(AsyncUnit.Default);
  145. }
  146. }
  147. }
  148. }