Concat.cs 5.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164
  1. using Cysharp.Threading.Tasks.Internal;
  2. using System;
  3. using System.Threading;
  4. namespace Cysharp.Threading.Tasks.Linq
  5. {
  public static partial class UniTaskAsyncEnumerable
  {
      /// <summary>
      /// Creates a sequence that, when enumerated, produces the elements of
      /// <paramref name="first"/> followed by the elements of <paramref name="second"/>.
      /// </summary>
      /// <typeparam name="TSource">Element type of both sequences.</typeparam>
      /// <param name="first">Sequence whose elements come first.</param>
      /// <param name="second">Sequence whose elements come after <paramref name="first"/> is exhausted.</param>
      /// <returns>
      /// A wrapper holding both sources; neither source is enumerated by this call itself,
      /// the wrapper only stores the two references.
      /// </returns>
      /// <remarks>
      /// Both arguments are checked through <c>Error.ThrowArgumentNullException</c>, first
      /// <paramref name="first"/> and then <paramref name="second"/>
      /// (presumably raising <see cref="ArgumentNullException"/> on null — verify against the helper).
      /// </remarks>
      public static IUniTaskAsyncEnumerable<TSource> Concat<TSource>(this IUniTaskAsyncEnumerable<TSource> first, IUniTaskAsyncEnumerable<TSource> second)
      {
          // Validation order is significant: a null `first` is reported before a null `second`.
          Error.ThrowArgumentNullException(first, nameof(first));
          Error.ThrowArgumentNullException(second, nameof(second));

          IUniTaskAsyncEnumerable<TSource> concatenated = new Concat<TSource>(first, second);
          return concatenated;
      }
  }
  /// <summary>
  /// Async sequence that yields every element of <c>first</c> and then every element of <c>second</c>.
  /// The sources are only stored here; enumeration starts when an enumerator obtained from
  /// <see cref="GetAsyncEnumerator"/> is advanced.
  /// </summary>
  /// <typeparam name="TSource">Element type of both sequences.</typeparam>
  internal sealed class Concat<TSource> : IUniTaskAsyncEnumerable<TSource>
  {
      readonly IUniTaskAsyncEnumerable<TSource> first;
      readonly IUniTaskAsyncEnumerable<TSource> second;

      /// <summary>Stores the two sources to be enumerated back to back.</summary>
      /// <param name="first">Sequence enumerated first.</param>
      /// <param name="second">Sequence enumerated once <paramref name="first"/> reports completion.</param>
      public Concat(IUniTaskAsyncEnumerable<TSource> first, IUniTaskAsyncEnumerable<TSource> second)
      {
          this.first = first;
          this.second = second;
      }

      /// <summary>Creates a new, independent enumerator over the concatenation.</summary>
      /// <param name="cancellationToken">
      /// Token checked at the start of every <c>MoveNextAsync</c> call and forwarded to both source enumerators.
      /// </param>
      public IUniTaskAsyncEnumerator<TSource> GetAsyncEnumerator(CancellationToken cancellationToken = default)
      {
          return new _Concat(first, second, cancellationToken);
      }

      /// <summary>
      /// Enumerator that walks <c>first</c>, disposes its enumerator, and then walks <c>second</c>.
      /// The result of each <c>MoveNextAsync</c> is published through <c>completionSource</c>
      /// (not declared in this class; presumably inherited from <c>MoveNextSource</c>).
      /// </summary>
      sealed class _Concat : MoveNextSource, IUniTaskAsyncEnumerator<TSource>
      {
          // Cached delegate so that registering the continuation does not allocate a new delegate per call.
          static readonly Action<object> MoveNextCoreDelegate = MoveNextCore;

          enum IteratingState
          {
              IteratingFirst,
              IteratingSecond,
              Complete
          }

          readonly IUniTaskAsyncEnumerable<TSource> first;
          readonly IUniTaskAsyncEnumerable<TSource> second;
          CancellationToken cancellationToken;

          IteratingState iteratingState;

          // Enumerator of the source currently being walked; null until first use and
          // again between disposing the first enumerator and creating the second.
          IUniTaskAsyncEnumerator<TSource> enumerator;
          UniTask<bool>.Awaiter awaiter;

          public _Concat(IUniTaskAsyncEnumerable<TSource> first, IUniTaskAsyncEnumerable<TSource> second, CancellationToken cancellationToken)
          {
              this.first = first;
              this.second = second;
              this.cancellationToken = cancellationToken;
              this.iteratingState = IteratingState.IteratingFirst;
              // NOTE(review): the literal 3 is passed through unchanged; presumably a stack-frame skip count — confirm against TaskTracker.
              TaskTracker.TrackActiveTask(this, 3);
          }

          /// <summary>Element produced by the most recent successful <see cref="MoveNextAsync"/>.</summary>
          public TSource Current { get; private set; }

          /// <summary>
          /// Advances to the next element, switching from the first source to the second when the first is exhausted.
          /// </summary>
          /// <returns>
          /// A task yielding true when <see cref="Current"/> holds a new element and false once both sources are exhausted.
          /// Failures raised while obtaining or advancing a source enumerator fault the returned task.
          /// </returns>
          /// <exception cref="OperationCanceledException">Thrown synchronously when the token is already canceled.</exception>
          public UniTask<bool> MoveNextAsync()
          {
              cancellationToken.ThrowIfCancellationRequested();

              // Once both sources finished, keep answering false without touching any enumerator.
              if (iteratingState == IteratingState.Complete) return CompletedTasks.False;

              completionSource.Reset();
              StartIterate();
              return new UniTask<bool>(this, completionSource.Version);
          }

          /// <summary>
          /// Ensures an enumerator exists for the current state, starts its <c>MoveNextAsync</c>,
          /// and routes the outcome to <see cref="MoveNextCore"/>.
          /// </summary>
          void StartIterate()
          {
              try
              {
                  // Enumerator creation is inside the try on purpose: this method is also reached from
                  // the fire-and-forget RunSecondAfterDisposeAsync, where an escaping exception would
                  // leave completionSource uncompleted and the awaiting consumer would never resume.
                  if (enumerator == null)
                  {
                      if (iteratingState == IteratingState.IteratingFirst)
                      {
                          enumerator = first.GetAsyncEnumerator(cancellationToken);
                      }
                      else if (iteratingState == IteratingState.IteratingSecond)
                      {
                          enumerator = second.GetAsyncEnumerator(cancellationToken);
                      }
                  }

                  awaiter = enumerator.MoveNextAsync().GetAwaiter();
              }
              catch (Exception ex)
              {
                  completionSource.TrySetException(ex);
                  return;
              }

              if (awaiter.IsCompleted)
              {
                  // Already finished: handle the result inline instead of scheduling a continuation.
                  MoveNextCoreDelegate(this);
              }
              else
              {
                  awaiter.SourceOnCompleted(MoveNextCoreDelegate, this);
              }
          }

          /// <summary>
          /// Continuation invoked once the inner <c>MoveNextAsync</c> has completed.
          /// </summary>
          /// <param name="state">The <see cref="_Concat"/> instance that started the operation.</param>
          static void MoveNextCore(object state)
          {
              var self = (_Concat)state;

              // TryGetResult is not defined in this class (presumably inherited from MoveNextSource);
              // when it returns false nothing further is done here — presumably it has already
              // reported the failure to completionSource; verify against MoveNextSource.
              if (self.TryGetResult(self.awaiter, out var result))
              {
                  if (result)
                  {
                      self.Current = self.enumerator.Current;
                      self.completionSource.TrySetResult(true);
                  }
                  else
                  {
                      if (self.iteratingState == IteratingState.IteratingFirst)
                      {
                          // First source exhausted: dispose it and continue with the second one.
                          // The pending MoveNextAsync stays open until the second source answers.
                          self.RunSecondAfterDisposeAsync().Forget();
                          return;
                      }

                      self.iteratingState = IteratingState.Complete;
                      self.completionSource.TrySetResult(false);
                  }
              }
          }

          /// <summary>
          /// Disposes the first source's enumerator and then starts iterating the second source.
          /// If disposal fails, the pending <c>MoveNextAsync</c> is faulted and iteration is not restarted.
          /// </summary>
          async UniTaskVoid RunSecondAfterDisposeAsync()
          {
              try
              {
                  await enumerator.DisposeAsync();
                  enumerator = null;
                  awaiter = default;
                  iteratingState = IteratingState.IteratingSecond;
              }
              catch (Exception ex)
              {
                  completionSource.TrySetException(ex);
                  // Stop here: the pending operation has been faulted, and the state still points at
                  // the first enumerator, so restarting would call MoveNextAsync on it again.
                  return;
              }

              StartIterate();
          }

          /// <summary>
          /// Removes this enumerator from task tracking and disposes the currently held source enumerator, if any.
          /// </summary>
          public UniTask DisposeAsync()
          {
              TaskTracker.RemoveTracking(this);
              if (enumerator != null)
              {
                  return enumerator.DisposeAsync();
              }

              return default;
          }
      }
  }
  139. }