Aggregate.cs 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318
  1. using Cysharp.Threading.Tasks.Internal;
  2. using System;
  3. using System.Collections.Generic;
  4. using System.Threading;
  5. namespace Cysharp.Threading.Tasks.Linq
  6. {
  7. public static partial class UniTaskAsyncEnumerable
  8. {
  9. public static UniTask<TSource> AggregateAsync<TSource>(this IUniTaskAsyncEnumerable<TSource> source, Func<TSource, TSource, TSource> accumulator, CancellationToken cancellationToken = default)
  10. {
  11. Error.ThrowArgumentNullException(source, nameof(source));
  12. Error.ThrowArgumentNullException(accumulator, nameof(accumulator));
  13. return Aggregate.AggregateAsync(source, accumulator, cancellationToken);
  14. }
  15. public static UniTask<TAccumulate> AggregateAsync<TSource, TAccumulate>(this IUniTaskAsyncEnumerable<TSource> source, TAccumulate seed, Func<TAccumulate, TSource, TAccumulate> accumulator, CancellationToken cancellationToken = default)
  16. {
  17. Error.ThrowArgumentNullException(source, nameof(source));
  18. Error.ThrowArgumentNullException(accumulator, nameof(accumulator));
  19. return Aggregate.AggregateAsync(source, seed, accumulator, cancellationToken);
  20. }
  21. public static UniTask<TResult> AggregateAsync<TSource, TAccumulate, TResult>(this IUniTaskAsyncEnumerable<TSource> source, TAccumulate seed, Func<TAccumulate, TSource, TAccumulate> accumulator, Func<TAccumulate, TResult> resultSelector, CancellationToken cancellationToken = default)
  22. {
  23. Error.ThrowArgumentNullException(source, nameof(source));
  24. Error.ThrowArgumentNullException(accumulator, nameof(accumulator));
  25. Error.ThrowArgumentNullException(accumulator, nameof(resultSelector));
  26. return Aggregate.AggregateAsync(source, seed, accumulator, resultSelector, cancellationToken);
  27. }
  28. public static UniTask<TSource> AggregateAwaitAsync<TSource>(this IUniTaskAsyncEnumerable<TSource> source, Func<TSource, TSource, UniTask<TSource>> accumulator, CancellationToken cancellationToken = default)
  29. {
  30. Error.ThrowArgumentNullException(source, nameof(source));
  31. Error.ThrowArgumentNullException(accumulator, nameof(accumulator));
  32. return Aggregate.AggregateAwaitAsync(source, accumulator, cancellationToken);
  33. }
  34. public static UniTask<TAccumulate> AggregateAwaitAsync<TSource, TAccumulate>(this IUniTaskAsyncEnumerable<TSource> source, TAccumulate seed, Func<TAccumulate, TSource, UniTask<TAccumulate>> accumulator, CancellationToken cancellationToken = default)
  35. {
  36. Error.ThrowArgumentNullException(source, nameof(source));
  37. Error.ThrowArgumentNullException(accumulator, nameof(accumulator));
  38. return Aggregate.AggregateAwaitAsync(source, seed, accumulator, cancellationToken);
  39. }
  40. public static UniTask<TResult> AggregateAwaitAsync<TSource, TAccumulate, TResult>(this IUniTaskAsyncEnumerable<TSource> source, TAccumulate seed, Func<TAccumulate, TSource, UniTask<TAccumulate>> accumulator, Func<TAccumulate, UniTask<TResult>> resultSelector, CancellationToken cancellationToken = default)
  41. {
  42. Error.ThrowArgumentNullException(source, nameof(source));
  43. Error.ThrowArgumentNullException(accumulator, nameof(accumulator));
  44. Error.ThrowArgumentNullException(accumulator, nameof(resultSelector));
  45. return Aggregate.AggregateAwaitAsync(source, seed, accumulator, resultSelector, cancellationToken);
  46. }
  47. public static UniTask<TSource> AggregateAwaitWithCancellationAsync<TSource>(this IUniTaskAsyncEnumerable<TSource> source, Func<TSource, TSource, CancellationToken, UniTask<TSource>> accumulator, CancellationToken cancellationToken = default)
  48. {
  49. Error.ThrowArgumentNullException(source, nameof(source));
  50. Error.ThrowArgumentNullException(accumulator, nameof(accumulator));
  51. return Aggregate.AggregateAwaitWithCancellationAsync(source, accumulator, cancellationToken);
  52. }
  53. public static UniTask<TAccumulate> AggregateAwaitWithCancellationAsync<TSource, TAccumulate>(this IUniTaskAsyncEnumerable<TSource> source, TAccumulate seed, Func<TAccumulate, TSource, CancellationToken, UniTask<TAccumulate>> accumulator, CancellationToken cancellationToken = default)
  54. {
  55. Error.ThrowArgumentNullException(source, nameof(source));
  56. Error.ThrowArgumentNullException(accumulator, nameof(accumulator));
  57. return Aggregate.AggregateAwaitWithCancellationAsync(source, seed, accumulator, cancellationToken);
  58. }
  59. public static UniTask<TResult> AggregateAwaitWithCancellationAsync<TSource, TAccumulate, TResult>(this IUniTaskAsyncEnumerable<TSource> source, TAccumulate seed, Func<TAccumulate, TSource, CancellationToken, UniTask<TAccumulate>> accumulator, Func<TAccumulate, CancellationToken, UniTask<TResult>> resultSelector, CancellationToken cancellationToken = default)
  60. {
  61. Error.ThrowArgumentNullException(source, nameof(source));
  62. Error.ThrowArgumentNullException(accumulator, nameof(accumulator));
  63. Error.ThrowArgumentNullException(accumulator, nameof(resultSelector));
  64. return Aggregate.AggregateAwaitWithCancellationAsync(source, seed, accumulator, resultSelector, cancellationToken);
  65. }
  66. }
  67. internal static class Aggregate
  68. {
  69. internal static async UniTask<TSource> AggregateAsync<TSource>(IUniTaskAsyncEnumerable<TSource> source, Func<TSource, TSource, TSource> accumulator, CancellationToken cancellationToken)
  70. {
  71. var e = source.GetAsyncEnumerator(cancellationToken);
  72. try
  73. {
  74. TSource value;
  75. if (await e.MoveNextAsync())
  76. {
  77. value = e.Current;
  78. }
  79. else
  80. {
  81. throw Error.NoElements();
  82. }
  83. while (await e.MoveNextAsync())
  84. {
  85. value = accumulator(value, e.Current);
  86. }
  87. return value;
  88. }
  89. finally
  90. {
  91. if (e != null)
  92. {
  93. await e.DisposeAsync();
  94. }
  95. }
  96. }
  97. internal static async UniTask<TAccumulate> AggregateAsync<TSource, TAccumulate>(IUniTaskAsyncEnumerable<TSource> source, TAccumulate seed, Func<TAccumulate, TSource, TAccumulate> accumulator, CancellationToken cancellationToken)
  98. {
  99. var e = source.GetAsyncEnumerator(cancellationToken);
  100. try
  101. {
  102. TAccumulate value = seed;
  103. while (await e.MoveNextAsync())
  104. {
  105. value = accumulator(value, e.Current);
  106. }
  107. return value;
  108. }
  109. finally
  110. {
  111. if (e != null)
  112. {
  113. await e.DisposeAsync();
  114. }
  115. }
  116. }
  117. internal static async UniTask<TResult> AggregateAsync<TSource, TAccumulate, TResult>(IUniTaskAsyncEnumerable<TSource> source, TAccumulate seed, Func<TAccumulate, TSource, TAccumulate> accumulator, Func<TAccumulate, TResult> resultSelector, CancellationToken cancellationToken)
  118. {
  119. var e = source.GetAsyncEnumerator(cancellationToken);
  120. try
  121. {
  122. TAccumulate value = seed;
  123. while (await e.MoveNextAsync())
  124. {
  125. value = accumulator(value, e.Current);
  126. }
  127. return resultSelector(value);
  128. }
  129. finally
  130. {
  131. if (e != null)
  132. {
  133. await e.DisposeAsync();
  134. }
  135. }
  136. }
  137. // with async
  138. internal static async UniTask<TSource> AggregateAwaitAsync<TSource>(IUniTaskAsyncEnumerable<TSource> source, Func<TSource, TSource, UniTask<TSource>> accumulator, CancellationToken cancellationToken)
  139. {
  140. var e = source.GetAsyncEnumerator(cancellationToken);
  141. try
  142. {
  143. TSource value;
  144. if (await e.MoveNextAsync())
  145. {
  146. value = e.Current;
  147. }
  148. else
  149. {
  150. throw Error.NoElements();
  151. }
  152. while (await e.MoveNextAsync())
  153. {
  154. value = await accumulator(value, e.Current);
  155. }
  156. return value;
  157. }
  158. finally
  159. {
  160. if (e != null)
  161. {
  162. await e.DisposeAsync();
  163. }
  164. }
  165. }
  166. internal static async UniTask<TAccumulate> AggregateAwaitAsync<TSource, TAccumulate>(IUniTaskAsyncEnumerable<TSource> source, TAccumulate seed, Func<TAccumulate, TSource, UniTask<TAccumulate>> accumulator, CancellationToken cancellationToken)
  167. {
  168. var e = source.GetAsyncEnumerator(cancellationToken);
  169. try
  170. {
  171. TAccumulate value = seed;
  172. while (await e.MoveNextAsync())
  173. {
  174. value = await accumulator(value, e.Current);
  175. }
  176. return value;
  177. }
  178. finally
  179. {
  180. if (e != null)
  181. {
  182. await e.DisposeAsync();
  183. }
  184. }
  185. }
  186. internal static async UniTask<TResult> AggregateAwaitAsync<TSource, TAccumulate, TResult>(IUniTaskAsyncEnumerable<TSource> source, TAccumulate seed, Func<TAccumulate, TSource, UniTask<TAccumulate>> accumulator, Func<TAccumulate, UniTask<TResult>> resultSelector, CancellationToken cancellationToken)
  187. {
  188. var e = source.GetAsyncEnumerator(cancellationToken);
  189. try
  190. {
  191. TAccumulate value = seed;
  192. while (await e.MoveNextAsync())
  193. {
  194. value = await accumulator(value, e.Current);
  195. }
  196. return await resultSelector(value);
  197. }
  198. finally
  199. {
  200. if (e != null)
  201. {
  202. await e.DisposeAsync();
  203. }
  204. }
  205. }
  206. // with cancellation
  207. internal static async UniTask<TSource> AggregateAwaitWithCancellationAsync<TSource>(IUniTaskAsyncEnumerable<TSource> source, Func<TSource, TSource, CancellationToken, UniTask<TSource>> accumulator, CancellationToken cancellationToken)
  208. {
  209. var e = source.GetAsyncEnumerator(cancellationToken);
  210. try
  211. {
  212. TSource value;
  213. if (await e.MoveNextAsync())
  214. {
  215. value = e.Current;
  216. }
  217. else
  218. {
  219. throw Error.NoElements();
  220. }
  221. while (await e.MoveNextAsync())
  222. {
  223. value = await accumulator(value, e.Current, cancellationToken);
  224. }
  225. return value;
  226. }
  227. finally
  228. {
  229. if (e != null)
  230. {
  231. await e.DisposeAsync();
  232. }
  233. }
  234. }
  235. internal static async UniTask<TAccumulate> AggregateAwaitWithCancellationAsync<TSource, TAccumulate>(IUniTaskAsyncEnumerable<TSource> source, TAccumulate seed, Func<TAccumulate, TSource, CancellationToken, UniTask<TAccumulate>> accumulator, CancellationToken cancellationToken)
  236. {
  237. var e = source.GetAsyncEnumerator(cancellationToken);
  238. try
  239. {
  240. TAccumulate value = seed;
  241. while (await e.MoveNextAsync())
  242. {
  243. value = await accumulator(value, e.Current, cancellationToken);
  244. }
  245. return value;
  246. }
  247. finally
  248. {
  249. if (e != null)
  250. {
  251. await e.DisposeAsync();
  252. }
  253. }
  254. }
  255. internal static async UniTask<TResult> AggregateAwaitWithCancellationAsync<TSource, TAccumulate, TResult>(IUniTaskAsyncEnumerable<TSource> source, TAccumulate seed, Func<TAccumulate, TSource, CancellationToken, UniTask<TAccumulate>> accumulator, Func<TAccumulate, CancellationToken, UniTask<TResult>> resultSelector, CancellationToken cancellationToken)
  256. {
  257. var e = source.GetAsyncEnumerator(cancellationToken);
  258. try
  259. {
  260. TAccumulate value = seed;
  261. while (await e.MoveNextAsync())
  262. {
  263. value = await accumulator(value, e.Current, cancellationToken);
  264. }
  265. return await resultSelector(value, cancellationToken);
  266. }
  267. finally
  268. {
  269. if (e != null)
  270. {
  271. await e.DisposeAsync();
  272. }
  273. }
  274. }
  275. }
  276. }