SkipLast.cs 4.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159
  1. using Cysharp.Threading.Tasks.Internal;
  2. using System;
  3. using System.Collections.Generic;
  4. using System.Threading;
  5. namespace Cysharp.Threading.Tasks.Linq
  6. {
  public static partial class UniTaskAsyncEnumerable
  {
      /// <summary>
      /// Produces a sequence that omits the final <paramref name="count"/> elements of
      /// <paramref name="source"/>.
      /// </summary>
      /// <typeparam name="TSource">Element type of the sequence.</typeparam>
      /// <param name="source">
      /// Sequence to read from. It is passed to <c>Error.ThrowArgumentNullException</c> before use.
      /// </param>
      /// <param name="count">Number of trailing elements to leave out.</param>
      /// <returns>
      /// A new <c>SkipLast&lt;TSource&gt;</c> wrapper when <paramref name="count"/> is positive;
      /// otherwise the very same <paramref name="source"/> instance, since nothing has to be skipped.
      /// </returns>
      public static IUniTaskAsyncEnumerable<TSource> SkipLast<TSource>(this IUniTaskAsyncEnumerable<TSource> source, Int32 count)
      {
          Error.ThrowArgumentNullException(source, nameof(source));

          // Only a positive count requires the buffering operator.
          if (count > 0)
          {
              return new SkipLast<TSource>(source, count);
          }

          // Zero or negative: nothing to skip, hand the source back untouched.
          return source;
      }
  }
  /// <summary>
  /// Async sequence that replays <c>source</c> while withholding its last <c>count</c> elements.
  /// </summary>
  /// <typeparam name="TSource">Element type of the sequence.</typeparam>
  internal sealed class SkipLast<TSource> : IUniTaskAsyncEnumerable<TSource>
  {
      readonly IUniTaskAsyncEnumerable<TSource> source;
      readonly int count;

      /// <summary>Stores the upstream sequence and the number of trailing elements to drop.</summary>
      public SkipLast(IUniTaskAsyncEnumerable<TSource> source, int count)
      {
          this.source = source;
          this.count = count;
      }

      /// <summary>Creates a fresh enumerator bound to <paramref name="cancellationToken"/>.</summary>
      public IUniTaskAsyncEnumerator<TSource> GetAsyncEnumerator(CancellationToken cancellationToken = default)
          => new _SkipLast(source, count, cancellationToken);

      /// <summary>
      /// Enumerator that delays every element behind a queue of <c>count</c> items.
      /// An element is only published once <c>count</c> newer elements have been read,
      /// so whatever is still queued when the source ends is never published.
      /// </summary>
      /// <remarks>
      /// <c>completionSource</c> and <c>TryGetResult</c> are not declared in this class;
      /// presumably they come from <c>MoveNextSource</c> (confirm against the base type).
      /// </remarks>
      sealed class _SkipLast : MoveNextSource, IUniTaskAsyncEnumerator<TSource>
      {
          // Cached once so registering the continuation does not create a delegate per call.
          static readonly Action<object> MoveNextCoreDelegate = MoveNextCore;

          readonly IUniTaskAsyncEnumerable<TSource> source;
          readonly int count;
          CancellationToken cancellationToken;

          // Upstream enumerator; created on the first MoveNextAsync call.
          IUniTaskAsyncEnumerator<TSource> enumerator;
          // Awaiter of the upstream MoveNextAsync call currently in flight.
          UniTask<bool>.Awaiter awaiter;
          // Elements read from upstream that have not been published yet.
          Queue<TSource> queue;
          // Set by SourceMoveNext before a synchronous MoveNextCore call; left true by
          // MoveNextCore when the pump loop should pull another upstream element.
          bool continueNext;

          /// <summary>Captures the arguments and registers this enumerator with <c>TaskTracker</c>.</summary>
          public _SkipLast(IUniTaskAsyncEnumerable<TSource> source, int count, CancellationToken cancellationToken)
          {
              this.source = source;
              this.count = count;
              this.cancellationToken = cancellationToken;

              // NOTE(review): the literal 3 is presumably a stack-frame skip count; confirm against TaskTracker.
              TaskTracker.TrackActiveTask(this, 3);
          }

          /// <summary>Most recently published element.</summary>
          public TSource Current { get; private set; }

          /// <summary>
          /// Advances to the next element to publish. Throws if cancellation was already requested.
          /// The returned task is backed by <c>completionSource</c>, which <c>MoveNextCore</c> resolves
          /// with <c>true</c> after setting <see cref="Current"/>, or <c>false</c> when upstream ends.
          /// </summary>
          public UniTask<bool> MoveNextAsync()
          {
              cancellationToken.ThrowIfCancellationRequested();

              // First call: open the upstream enumerator, then allocate the delay queue.
              if (enumerator == null)
              {
                  enumerator = source.GetAsyncEnumerator(cancellationToken);
                  queue = new Queue<TSource>();
              }

              completionSource.Reset();
              SourceMoveNext();
              return new UniTask<bool>(this, completionSource.Version);
          }

          /// <summary>
          /// Pulls from upstream until either a result has been set on <c>completionSource</c>
          /// or an upstream call is still pending, in which case <c>MoveNextCore</c> is registered
          /// as its continuation. Any exception thrown here is forwarded to <c>completionSource</c>.
          /// </summary>
          void SourceMoveNext()
          {
              try
              {
                  // Iterate instead of recursing while upstream keeps completing synchronously.
                  while (true)
                  {
                      awaiter = enumerator.MoveNextAsync().GetAwaiter();

                      if (!awaiter.IsCompleted)
                      {
                          // Pending: MoveNextCore will run when upstream completes.
                          awaiter.SourceOnCompleted(MoveNextCoreDelegate, this);
                          return;
                      }

                      // Tell MoveNextCore it is being driven by this loop.
                      continueNext = true;
                      MoveNextCore(this);

                      if (!continueNext)
                      {
                          // MoveNextCore cleared the flag: no further pull from this loop.
                          return;
                      }

                      // The queue is still filling up; clear the flag and pull again.
                      continueNext = false;
                  }
              }
              catch (Exception ex)
              {
                  completionSource.TrySetException(ex);
              }
          }

          /// <summary>
          /// Handles one completed upstream <c>MoveNextAsync</c>. Runs either synchronously from
          /// <c>SourceMoveNext</c> or as the continuation registered on the awaiter.
          /// </summary>
          /// <param name="state">The <c>_SkipLast</c> instance being advanced.</param>
          static void MoveNextCore(object state)
          {
              var op = (_SkipLast)state;

              if (!op.TryGetResult(op.awaiter, out var hasNext))
              {
                  // No result could be read from the awaiter; just stop the pump loop.
                  op.continueNext = false;
                  return;
              }

              if (!hasNext)
              {
                  // Upstream is exhausted; the queued elements are the skipped tail.
                  op.continueNext = false;
                  op.completionSource.TrySetResult(false);
                  return;
              }

              if (op.queue.Count != op.count)
              {
                  // Still filling the queue: store the element and keep reading.
                  op.queue.Enqueue(op.enumerator.Current);
                  if (!op.continueNext)
                  {
                      // Not driven by the pump loop, so start the next pull from here.
                      op.SourceMoveNext();
                  }
                  return;
              }

              // Queue is full: publish the oldest element and store the newest in its place.
              op.continueNext = false;
              op.Current = op.queue.Dequeue();
              op.queue.Enqueue(op.enumerator.Current);
              op.completionSource.TrySetResult(true);
          }

          /// <summary>
          /// Removes this enumerator from <c>TaskTracker</c> and disposes the upstream enumerator
          /// if one was ever created.
          /// </summary>
          public UniTask DisposeAsync()
          {
              TaskTracker.RemoveTracking(this);

              if (enumerator == null)
              {
                  return default;
              }

              return enumerator.DisposeAsync();
          }
      }
  }
  135. }