SelectMany.cs 38 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892
  1. using Cysharp.Threading.Tasks.Internal;
  2. using System;
  3. using System.Threading;
  4. namespace Cysharp.Threading.Tasks.Linq
  5. {
  6. public static partial class UniTaskAsyncEnumerable
  7. {
  8. public static IUniTaskAsyncEnumerable<TResult> SelectMany<TSource, TResult>(this IUniTaskAsyncEnumerable<TSource> source, Func<TSource, IUniTaskAsyncEnumerable<TResult>> selector)
  9. {
  10. Error.ThrowArgumentNullException(source, nameof(source));
  11. Error.ThrowArgumentNullException(selector, nameof(selector));
  12. return new SelectMany<TSource, TResult, TResult>(source, selector, (x, y) => y);
  13. }
  14. public static IUniTaskAsyncEnumerable<TResult> SelectMany<TSource, TResult>(this IUniTaskAsyncEnumerable<TSource> source, Func<TSource, Int32, IUniTaskAsyncEnumerable<TResult>> selector)
  15. {
  16. Error.ThrowArgumentNullException(source, nameof(source));
  17. Error.ThrowArgumentNullException(selector, nameof(selector));
  18. return new SelectMany<TSource, TResult, TResult>(source, selector, (x, y) => y);
  19. }
  20. public static IUniTaskAsyncEnumerable<TResult> SelectMany<TSource, TCollection, TResult>(this IUniTaskAsyncEnumerable<TSource> source, Func<TSource, IUniTaskAsyncEnumerable<TCollection>> collectionSelector, Func<TSource, TCollection, TResult> resultSelector)
  21. {
  22. Error.ThrowArgumentNullException(source, nameof(source));
  23. Error.ThrowArgumentNullException(collectionSelector, nameof(collectionSelector));
  24. return new SelectMany<TSource, TCollection, TResult>(source, collectionSelector, resultSelector);
  25. }
  26. public static IUniTaskAsyncEnumerable<TResult> SelectMany<TSource, TCollection, TResult>(this IUniTaskAsyncEnumerable<TSource> source, Func<TSource, Int32, IUniTaskAsyncEnumerable<TCollection>> collectionSelector, Func<TSource, TCollection, TResult> resultSelector)
  27. {
  28. Error.ThrowArgumentNullException(source, nameof(source));
  29. Error.ThrowArgumentNullException(collectionSelector, nameof(collectionSelector));
  30. return new SelectMany<TSource, TCollection, TResult>(source, collectionSelector, resultSelector);
  31. }
  32. public static IUniTaskAsyncEnumerable<TResult> SelectManyAwait<TSource, TResult>(this IUniTaskAsyncEnumerable<TSource> source, Func<TSource, UniTask<IUniTaskAsyncEnumerable<TResult>>> selector)
  33. {
  34. Error.ThrowArgumentNullException(source, nameof(source));
  35. Error.ThrowArgumentNullException(selector, nameof(selector));
  36. return new SelectManyAwait<TSource, TResult, TResult>(source, selector, (x, y) => UniTask.FromResult(y));
  37. }
  38. public static IUniTaskAsyncEnumerable<TResult> SelectManyAwait<TSource, TResult>(this IUniTaskAsyncEnumerable<TSource> source, Func<TSource, Int32, UniTask<IUniTaskAsyncEnumerable<TResult>>> selector)
  39. {
  40. Error.ThrowArgumentNullException(source, nameof(source));
  41. Error.ThrowArgumentNullException(selector, nameof(selector));
  42. return new SelectManyAwait<TSource, TResult, TResult>(source, selector, (x, y) => UniTask.FromResult(y));
  43. }
  44. public static IUniTaskAsyncEnumerable<TResult> SelectManyAwait<TSource, TCollection, TResult>(this IUniTaskAsyncEnumerable<TSource> source, Func<TSource, UniTask<IUniTaskAsyncEnumerable<TCollection>>> collectionSelector, Func<TSource, TCollection, UniTask<TResult>> resultSelector)
  45. {
  46. Error.ThrowArgumentNullException(source, nameof(source));
  47. Error.ThrowArgumentNullException(collectionSelector, nameof(collectionSelector));
  48. return new SelectManyAwait<TSource, TCollection, TResult>(source, collectionSelector, resultSelector);
  49. }
  50. public static IUniTaskAsyncEnumerable<TResult> SelectManyAwait<TSource, TCollection, TResult>(this IUniTaskAsyncEnumerable<TSource> source, Func<TSource, Int32, UniTask<IUniTaskAsyncEnumerable<TCollection>>> collectionSelector, Func<TSource, TCollection, UniTask<TResult>> resultSelector)
  51. {
  52. Error.ThrowArgumentNullException(source, nameof(source));
  53. Error.ThrowArgumentNullException(collectionSelector, nameof(collectionSelector));
  54. return new SelectManyAwait<TSource, TCollection, TResult>(source, collectionSelector, resultSelector);
  55. }
  56. public static IUniTaskAsyncEnumerable<TResult> SelectManyAwaitWithCancellation<TSource, TResult>(this IUniTaskAsyncEnumerable<TSource> source, Func<TSource, CancellationToken, UniTask<IUniTaskAsyncEnumerable<TResult>>> selector)
  57. {
  58. Error.ThrowArgumentNullException(source, nameof(source));
  59. Error.ThrowArgumentNullException(selector, nameof(selector));
  60. return new SelectManyAwaitWithCancellation<TSource, TResult, TResult>(source, selector, (x, y, c) => UniTask.FromResult(y));
  61. }
  62. public static IUniTaskAsyncEnumerable<TResult> SelectManyAwaitWithCancellation<TSource, TResult>(this IUniTaskAsyncEnumerable<TSource> source, Func<TSource, Int32, CancellationToken, UniTask<IUniTaskAsyncEnumerable<TResult>>> selector)
  63. {
  64. Error.ThrowArgumentNullException(source, nameof(source));
  65. Error.ThrowArgumentNullException(selector, nameof(selector));
  66. return new SelectManyAwaitWithCancellation<TSource, TResult, TResult>(source, selector, (x, y, c) => UniTask.FromResult(y));
  67. }
  68. public static IUniTaskAsyncEnumerable<TResult> SelectManyAwaitWithCancellation<TSource, TCollection, TResult>(this IUniTaskAsyncEnumerable<TSource> source, Func<TSource, CancellationToken, UniTask<IUniTaskAsyncEnumerable<TCollection>>> collectionSelector, Func<TSource, TCollection, CancellationToken, UniTask<TResult>> resultSelector)
  69. {
  70. Error.ThrowArgumentNullException(source, nameof(source));
  71. Error.ThrowArgumentNullException(collectionSelector, nameof(collectionSelector));
  72. return new SelectManyAwaitWithCancellation<TSource, TCollection, TResult>(source, collectionSelector, resultSelector);
  73. }
  74. public static IUniTaskAsyncEnumerable<TResult> SelectManyAwaitWithCancellation<TSource, TCollection, TResult>(this IUniTaskAsyncEnumerable<TSource> source, Func<TSource, Int32, CancellationToken, UniTask<IUniTaskAsyncEnumerable<TCollection>>> collectionSelector, Func<TSource, TCollection, CancellationToken, UniTask<TResult>> resultSelector)
  75. {
  76. Error.ThrowArgumentNullException(source, nameof(source));
  77. Error.ThrowArgumentNullException(collectionSelector, nameof(collectionSelector));
  78. return new SelectManyAwaitWithCancellation<TSource, TCollection, TResult>(source, collectionSelector, resultSelector);
  79. }
  80. }
  81. internal sealed class SelectMany<TSource, TCollection, TResult> : IUniTaskAsyncEnumerable<TResult>
  82. {
  83. readonly IUniTaskAsyncEnumerable<TSource> source;
  84. readonly Func<TSource, IUniTaskAsyncEnumerable<TCollection>> selector1;
  85. readonly Func<TSource, int, IUniTaskAsyncEnumerable<TCollection>> selector2;
  86. readonly Func<TSource, TCollection, TResult> resultSelector;
  87. public SelectMany(IUniTaskAsyncEnumerable<TSource> source, Func<TSource, IUniTaskAsyncEnumerable<TCollection>> selector, Func<TSource, TCollection, TResult> resultSelector)
  88. {
  89. this.source = source;
  90. this.selector1 = selector;
  91. this.selector2 = null;
  92. this.resultSelector = resultSelector;
  93. }
  94. public SelectMany(IUniTaskAsyncEnumerable<TSource> source, Func<TSource, int, IUniTaskAsyncEnumerable<TCollection>> selector, Func<TSource, TCollection, TResult> resultSelector)
  95. {
  96. this.source = source;
  97. this.selector1 = null;
  98. this.selector2 = selector;
  99. this.resultSelector = resultSelector;
  100. }
  101. public IUniTaskAsyncEnumerator<TResult> GetAsyncEnumerator(CancellationToken cancellationToken = default)
  102. {
  103. return new _SelectMany(source, selector1, selector2, resultSelector, cancellationToken);
  104. }
  105. sealed class _SelectMany : MoveNextSource, IUniTaskAsyncEnumerator<TResult>
  106. {
  107. static readonly Action<object> sourceMoveNextCoreDelegate = SourceMoveNextCore;
  108. static readonly Action<object> selectedSourceMoveNextCoreDelegate = SeletedSourceMoveNextCore;
  109. static readonly Action<object> selectedEnumeratorDisposeAsyncCoreDelegate = SelectedEnumeratorDisposeAsyncCore;
  110. readonly IUniTaskAsyncEnumerable<TSource> source;
  111. readonly Func<TSource, IUniTaskAsyncEnumerable<TCollection>> selector1;
  112. readonly Func<TSource, int, IUniTaskAsyncEnumerable<TCollection>> selector2;
  113. readonly Func<TSource, TCollection, TResult> resultSelector;
  114. CancellationToken cancellationToken;
  115. TSource sourceCurrent;
  116. int sourceIndex;
  117. IUniTaskAsyncEnumerator<TSource> sourceEnumerator;
  118. IUniTaskAsyncEnumerator<TCollection> selectedEnumerator;
  119. UniTask<bool>.Awaiter sourceAwaiter;
  120. UniTask<bool>.Awaiter selectedAwaiter;
  121. UniTask.Awaiter selectedDisposeAsyncAwaiter;
  122. public _SelectMany(IUniTaskAsyncEnumerable<TSource> source, Func<TSource, IUniTaskAsyncEnumerable<TCollection>> selector1, Func<TSource, int, IUniTaskAsyncEnumerable<TCollection>> selector2, Func<TSource, TCollection, TResult> resultSelector, CancellationToken cancellationToken)
  123. {
  124. this.source = source;
  125. this.selector1 = selector1;
  126. this.selector2 = selector2;
  127. this.resultSelector = resultSelector;
  128. this.cancellationToken = cancellationToken;
  129. TaskTracker.TrackActiveTask(this, 3);
  130. }
  131. public TResult Current { get; private set; }
  132. public UniTask<bool> MoveNextAsync()
  133. {
  134. completionSource.Reset();
  135. // iterate selected field
  136. if (selectedEnumerator != null)
  137. {
  138. MoveNextSelected();
  139. }
  140. else
  141. {
  142. // iterate source field
  143. if (sourceEnumerator == null)
  144. {
  145. sourceEnumerator = source.GetAsyncEnumerator(cancellationToken);
  146. }
  147. MoveNextSource();
  148. }
  149. return new UniTask<bool>(this, completionSource.Version);
  150. }
  151. void MoveNextSource()
  152. {
  153. try
  154. {
  155. sourceAwaiter = sourceEnumerator.MoveNextAsync().GetAwaiter();
  156. }
  157. catch (Exception ex)
  158. {
  159. completionSource.TrySetException(ex);
  160. return;
  161. }
  162. if (sourceAwaiter.IsCompleted)
  163. {
  164. SourceMoveNextCore(this);
  165. }
  166. else
  167. {
  168. sourceAwaiter.SourceOnCompleted(sourceMoveNextCoreDelegate, this);
  169. }
  170. }
  171. void MoveNextSelected()
  172. {
  173. try
  174. {
  175. selectedAwaiter = selectedEnumerator.MoveNextAsync().GetAwaiter();
  176. }
  177. catch (Exception ex)
  178. {
  179. completionSource.TrySetException(ex);
  180. return;
  181. }
  182. if (selectedAwaiter.IsCompleted)
  183. {
  184. SeletedSourceMoveNextCore(this);
  185. }
  186. else
  187. {
  188. selectedAwaiter.SourceOnCompleted(selectedSourceMoveNextCoreDelegate, this);
  189. }
  190. }
  191. static void SourceMoveNextCore(object state)
  192. {
  193. var self = (_SelectMany)state;
  194. if (self.TryGetResult(self.sourceAwaiter, out var result))
  195. {
  196. if (result)
  197. {
  198. try
  199. {
  200. self.sourceCurrent = self.sourceEnumerator.Current;
  201. if (self.selector1 != null)
  202. {
  203. self.selectedEnumerator = self.selector1(self.sourceCurrent).GetAsyncEnumerator(self.cancellationToken);
  204. }
  205. else
  206. {
  207. self.selectedEnumerator = self.selector2(self.sourceCurrent, checked(self.sourceIndex++)).GetAsyncEnumerator(self.cancellationToken);
  208. }
  209. }
  210. catch (Exception ex)
  211. {
  212. self.completionSource.TrySetException(ex);
  213. return;
  214. }
  215. self.MoveNextSelected(); // iterated selected source.
  216. }
  217. else
  218. {
  219. self.completionSource.TrySetResult(false);
  220. }
  221. }
  222. }
  223. static void SeletedSourceMoveNextCore(object state)
  224. {
  225. var self = (_SelectMany)state;
  226. if (self.TryGetResult(self.selectedAwaiter, out var result))
  227. {
  228. if (result)
  229. {
  230. try
  231. {
  232. self.Current = self.resultSelector(self.sourceCurrent, self.selectedEnumerator.Current);
  233. }
  234. catch (Exception ex)
  235. {
  236. self.completionSource.TrySetException(ex);
  237. return;
  238. }
  239. self.completionSource.TrySetResult(true);
  240. }
  241. else
  242. {
  243. // dispose selected source and try iterate source.
  244. try
  245. {
  246. self.selectedDisposeAsyncAwaiter = self.selectedEnumerator.DisposeAsync().GetAwaiter();
  247. }
  248. catch (Exception ex)
  249. {
  250. self.completionSource.TrySetException(ex);
  251. return;
  252. }
  253. if (self.selectedDisposeAsyncAwaiter.IsCompleted)
  254. {
  255. SelectedEnumeratorDisposeAsyncCore(self);
  256. }
  257. else
  258. {
  259. self.selectedDisposeAsyncAwaiter.SourceOnCompleted(selectedEnumeratorDisposeAsyncCoreDelegate, self);
  260. }
  261. }
  262. }
  263. }
  264. static void SelectedEnumeratorDisposeAsyncCore(object state)
  265. {
  266. var self = (_SelectMany)state;
  267. if (self.TryGetResult(self.selectedDisposeAsyncAwaiter))
  268. {
  269. self.selectedEnumerator = null;
  270. self.selectedAwaiter = default;
  271. self.MoveNextSource(); // iterate next source
  272. }
  273. }
  274. public async UniTask DisposeAsync()
  275. {
  276. TaskTracker.RemoveTracking(this);
  277. if (selectedEnumerator != null)
  278. {
  279. await selectedEnumerator.DisposeAsync();
  280. }
  281. if (sourceEnumerator != null)
  282. {
  283. await sourceEnumerator.DisposeAsync();
  284. }
  285. }
  286. }
  287. }
  288. internal sealed class SelectManyAwait<TSource, TCollection, TResult> : IUniTaskAsyncEnumerable<TResult>
  289. {
  290. readonly IUniTaskAsyncEnumerable<TSource> source;
  291. readonly Func<TSource, UniTask<IUniTaskAsyncEnumerable<TCollection>>> selector1;
  292. readonly Func<TSource, int, UniTask<IUniTaskAsyncEnumerable<TCollection>>> selector2;
  293. readonly Func<TSource, TCollection, UniTask<TResult>> resultSelector;
  294. public SelectManyAwait(IUniTaskAsyncEnumerable<TSource> source, Func<TSource, UniTask<IUniTaskAsyncEnumerable<TCollection>>> selector, Func<TSource, TCollection, UniTask<TResult>> resultSelector)
  295. {
  296. this.source = source;
  297. this.selector1 = selector;
  298. this.selector2 = null;
  299. this.resultSelector = resultSelector;
  300. }
  301. public SelectManyAwait(IUniTaskAsyncEnumerable<TSource> source, Func<TSource, int, UniTask<IUniTaskAsyncEnumerable<TCollection>>> selector, Func<TSource, TCollection, UniTask<TResult>> resultSelector)
  302. {
  303. this.source = source;
  304. this.selector1 = null;
  305. this.selector2 = selector;
  306. this.resultSelector = resultSelector;
  307. }
  308. public IUniTaskAsyncEnumerator<TResult> GetAsyncEnumerator(CancellationToken cancellationToken = default)
  309. {
  310. return new _SelectManyAwait(source, selector1, selector2, resultSelector, cancellationToken);
  311. }
  312. sealed class _SelectManyAwait : MoveNextSource, IUniTaskAsyncEnumerator<TResult>
  313. {
  314. static readonly Action<object> sourceMoveNextCoreDelegate = SourceMoveNextCore;
  315. static readonly Action<object> selectedSourceMoveNextCoreDelegate = SeletedSourceMoveNextCore;
  316. static readonly Action<object> selectedEnumeratorDisposeAsyncCoreDelegate = SelectedEnumeratorDisposeAsyncCore;
  317. static readonly Action<object> selectorAwaitCoreDelegate = SelectorAwaitCore;
  318. static readonly Action<object> resultSelectorAwaitCoreDelegate = ResultSelectorAwaitCore;
  319. readonly IUniTaskAsyncEnumerable<TSource> source;
  320. readonly Func<TSource, UniTask<IUniTaskAsyncEnumerable<TCollection>>> selector1;
  321. readonly Func<TSource, int, UniTask<IUniTaskAsyncEnumerable<TCollection>>> selector2;
  322. readonly Func<TSource, TCollection, UniTask<TResult>> resultSelector;
  323. CancellationToken cancellationToken;
  324. TSource sourceCurrent;
  325. int sourceIndex;
  326. IUniTaskAsyncEnumerator<TSource> sourceEnumerator;
  327. IUniTaskAsyncEnumerator<TCollection> selectedEnumerator;
  328. UniTask<bool>.Awaiter sourceAwaiter;
  329. UniTask<bool>.Awaiter selectedAwaiter;
  330. UniTask.Awaiter selectedDisposeAsyncAwaiter;
  331. // await additional
  332. UniTask<IUniTaskAsyncEnumerable<TCollection>>.Awaiter collectionSelectorAwaiter;
  333. UniTask<TResult>.Awaiter resultSelectorAwaiter;
  334. public _SelectManyAwait(IUniTaskAsyncEnumerable<TSource> source, Func<TSource, UniTask<IUniTaskAsyncEnumerable<TCollection>>> selector1, Func<TSource, int, UniTask<IUniTaskAsyncEnumerable<TCollection>>> selector2, Func<TSource, TCollection, UniTask<TResult>> resultSelector, CancellationToken cancellationToken)
  335. {
  336. this.source = source;
  337. this.selector1 = selector1;
  338. this.selector2 = selector2;
  339. this.resultSelector = resultSelector;
  340. this.cancellationToken = cancellationToken;
  341. TaskTracker.TrackActiveTask(this, 3);
  342. }
  343. public TResult Current { get; private set; }
  344. public UniTask<bool> MoveNextAsync()
  345. {
  346. completionSource.Reset();
  347. // iterate selected field
  348. if (selectedEnumerator != null)
  349. {
  350. MoveNextSelected();
  351. }
  352. else
  353. {
  354. // iterate source field
  355. if (sourceEnumerator == null)
  356. {
  357. sourceEnumerator = source.GetAsyncEnumerator(cancellationToken);
  358. }
  359. MoveNextSource();
  360. }
  361. return new UniTask<bool>(this, completionSource.Version);
  362. }
  363. void MoveNextSource()
  364. {
  365. try
  366. {
  367. sourceAwaiter = sourceEnumerator.MoveNextAsync().GetAwaiter();
  368. }
  369. catch (Exception ex)
  370. {
  371. completionSource.TrySetException(ex);
  372. return;
  373. }
  374. if (sourceAwaiter.IsCompleted)
  375. {
  376. SourceMoveNextCore(this);
  377. }
  378. else
  379. {
  380. sourceAwaiter.SourceOnCompleted(sourceMoveNextCoreDelegate, this);
  381. }
  382. }
  383. void MoveNextSelected()
  384. {
  385. try
  386. {
  387. selectedAwaiter = selectedEnumerator.MoveNextAsync().GetAwaiter();
  388. }
  389. catch (Exception ex)
  390. {
  391. completionSource.TrySetException(ex);
  392. return;
  393. }
  394. if (selectedAwaiter.IsCompleted)
  395. {
  396. SeletedSourceMoveNextCore(this);
  397. }
  398. else
  399. {
  400. selectedAwaiter.SourceOnCompleted(selectedSourceMoveNextCoreDelegate, this);
  401. }
  402. }
  403. static void SourceMoveNextCore(object state)
  404. {
  405. var self = (_SelectManyAwait)state;
  406. if (self.TryGetResult(self.sourceAwaiter, out var result))
  407. {
  408. if (result)
  409. {
  410. try
  411. {
  412. self.sourceCurrent = self.sourceEnumerator.Current;
  413. if (self.selector1 != null)
  414. {
  415. self.collectionSelectorAwaiter = self.selector1(self.sourceCurrent).GetAwaiter();
  416. }
  417. else
  418. {
  419. self.collectionSelectorAwaiter = self.selector2(self.sourceCurrent, checked(self.sourceIndex++)).GetAwaiter();
  420. }
  421. if (self.collectionSelectorAwaiter.IsCompleted)
  422. {
  423. SelectorAwaitCore(self);
  424. }
  425. else
  426. {
  427. self.collectionSelectorAwaiter.SourceOnCompleted(selectorAwaitCoreDelegate, self);
  428. }
  429. }
  430. catch (Exception ex)
  431. {
  432. self.completionSource.TrySetException(ex);
  433. return;
  434. }
  435. }
  436. else
  437. {
  438. self.completionSource.TrySetResult(false);
  439. }
  440. }
  441. }
  442. static void SeletedSourceMoveNextCore(object state)
  443. {
  444. var self = (_SelectManyAwait)state;
  445. if (self.TryGetResult(self.selectedAwaiter, out var result))
  446. {
  447. if (result)
  448. {
  449. try
  450. {
  451. self.resultSelectorAwaiter = self.resultSelector(self.sourceCurrent, self.selectedEnumerator.Current).GetAwaiter();
  452. if (self.resultSelectorAwaiter.IsCompleted)
  453. {
  454. ResultSelectorAwaitCore(self);
  455. }
  456. else
  457. {
  458. self.resultSelectorAwaiter.SourceOnCompleted(resultSelectorAwaitCoreDelegate, self);
  459. }
  460. }
  461. catch (Exception ex)
  462. {
  463. self.completionSource.TrySetException(ex);
  464. return;
  465. }
  466. }
  467. else
  468. {
  469. // dispose selected source and try iterate source.
  470. try
  471. {
  472. self.selectedDisposeAsyncAwaiter = self.selectedEnumerator.DisposeAsync().GetAwaiter();
  473. }
  474. catch (Exception ex)
  475. {
  476. self.completionSource.TrySetException(ex);
  477. return;
  478. }
  479. if (self.selectedDisposeAsyncAwaiter.IsCompleted)
  480. {
  481. SelectedEnumeratorDisposeAsyncCore(self);
  482. }
  483. else
  484. {
  485. self.selectedDisposeAsyncAwaiter.SourceOnCompleted(selectedEnumeratorDisposeAsyncCoreDelegate, self);
  486. }
  487. }
  488. }
  489. }
  490. static void SelectedEnumeratorDisposeAsyncCore(object state)
  491. {
  492. var self = (_SelectManyAwait)state;
  493. if (self.TryGetResult(self.selectedDisposeAsyncAwaiter))
  494. {
  495. self.selectedEnumerator = null;
  496. self.selectedAwaiter = default;
  497. self.MoveNextSource(); // iterate next source
  498. }
  499. }
  500. static void SelectorAwaitCore(object state)
  501. {
  502. var self = (_SelectManyAwait)state;
  503. if (self.TryGetResult(self.collectionSelectorAwaiter, out var result))
  504. {
  505. self.selectedEnumerator = result.GetAsyncEnumerator(self.cancellationToken);
  506. self.MoveNextSelected(); // iterated selected source.
  507. }
  508. }
  509. static void ResultSelectorAwaitCore(object state)
  510. {
  511. var self = (_SelectManyAwait)state;
  512. if (self.TryGetResult(self.resultSelectorAwaiter, out var result))
  513. {
  514. self.Current = result;
  515. self.completionSource.TrySetResult(true);
  516. }
  517. }
  518. public async UniTask DisposeAsync()
  519. {
  520. TaskTracker.RemoveTracking(this);
  521. if (selectedEnumerator != null)
  522. {
  523. await selectedEnumerator.DisposeAsync();
  524. }
  525. if (sourceEnumerator != null)
  526. {
  527. await sourceEnumerator.DisposeAsync();
  528. }
  529. }
  530. }
  531. }
  532. internal sealed class SelectManyAwaitWithCancellation<TSource, TCollection, TResult> : IUniTaskAsyncEnumerable<TResult>
  533. {
  534. readonly IUniTaskAsyncEnumerable<TSource> source;
  535. readonly Func<TSource, CancellationToken, UniTask<IUniTaskAsyncEnumerable<TCollection>>> selector1;
  536. readonly Func<TSource, int, CancellationToken, UniTask<IUniTaskAsyncEnumerable<TCollection>>> selector2;
  537. readonly Func<TSource, TCollection, CancellationToken, UniTask<TResult>> resultSelector;
  538. public SelectManyAwaitWithCancellation(IUniTaskAsyncEnumerable<TSource> source, Func<TSource, CancellationToken, UniTask<IUniTaskAsyncEnumerable<TCollection>>> selector, Func<TSource, TCollection, CancellationToken, UniTask<TResult>> resultSelector)
  539. {
  540. this.source = source;
  541. this.selector1 = selector;
  542. this.selector2 = null;
  543. this.resultSelector = resultSelector;
  544. }
  545. public SelectManyAwaitWithCancellation(IUniTaskAsyncEnumerable<TSource> source, Func<TSource, int, CancellationToken, UniTask<IUniTaskAsyncEnumerable<TCollection>>> selector, Func<TSource, TCollection, CancellationToken, UniTask<TResult>> resultSelector)
  546. {
  547. this.source = source;
  548. this.selector1 = null;
  549. this.selector2 = selector;
  550. this.resultSelector = resultSelector;
  551. }
  552. public IUniTaskAsyncEnumerator<TResult> GetAsyncEnumerator(CancellationToken cancellationToken = default)
  553. {
  554. return new _SelectManyAwaitWithCancellation(source, selector1, selector2, resultSelector, cancellationToken);
  555. }
  556. sealed class _SelectManyAwaitWithCancellation : MoveNextSource, IUniTaskAsyncEnumerator<TResult>
  557. {
  558. static readonly Action<object> sourceMoveNextCoreDelegate = SourceMoveNextCore;
  559. static readonly Action<object> selectedSourceMoveNextCoreDelegate = SeletedSourceMoveNextCore;
  560. static readonly Action<object> selectedEnumeratorDisposeAsyncCoreDelegate = SelectedEnumeratorDisposeAsyncCore;
  561. static readonly Action<object> selectorAwaitCoreDelegate = SelectorAwaitCore;
  562. static readonly Action<object> resultSelectorAwaitCoreDelegate = ResultSelectorAwaitCore;
  563. readonly IUniTaskAsyncEnumerable<TSource> source;
  564. readonly Func<TSource, CancellationToken, UniTask<IUniTaskAsyncEnumerable<TCollection>>> selector1;
  565. readonly Func<TSource, int, CancellationToken, UniTask<IUniTaskAsyncEnumerable<TCollection>>> selector2;
  566. readonly Func<TSource, TCollection, CancellationToken, UniTask<TResult>> resultSelector;
  567. CancellationToken cancellationToken;
  568. TSource sourceCurrent;
  569. int sourceIndex;
  570. IUniTaskAsyncEnumerator<TSource> sourceEnumerator;
  571. IUniTaskAsyncEnumerator<TCollection> selectedEnumerator;
  572. UniTask<bool>.Awaiter sourceAwaiter;
  573. UniTask<bool>.Awaiter selectedAwaiter;
  574. UniTask.Awaiter selectedDisposeAsyncAwaiter;
  575. // await additional
  576. UniTask<IUniTaskAsyncEnumerable<TCollection>>.Awaiter collectionSelectorAwaiter;
  577. UniTask<TResult>.Awaiter resultSelectorAwaiter;
  578. public _SelectManyAwaitWithCancellation(IUniTaskAsyncEnumerable<TSource> source, Func<TSource, CancellationToken, UniTask<IUniTaskAsyncEnumerable<TCollection>>> selector1, Func<TSource, int, CancellationToken, UniTask<IUniTaskAsyncEnumerable<TCollection>>> selector2, Func<TSource, TCollection, CancellationToken, UniTask<TResult>> resultSelector, CancellationToken cancellationToken)
  579. {
  580. this.source = source;
  581. this.selector1 = selector1;
  582. this.selector2 = selector2;
  583. this.resultSelector = resultSelector;
  584. this.cancellationToken = cancellationToken;
  585. TaskTracker.TrackActiveTask(this, 3);
  586. }
  587. public TResult Current { get; private set; }
  588. public UniTask<bool> MoveNextAsync()
  589. {
  590. completionSource.Reset();
  591. // iterate selected field
  592. if (selectedEnumerator != null)
  593. {
  594. MoveNextSelected();
  595. }
  596. else
  597. {
  598. // iterate source field
  599. if (sourceEnumerator == null)
  600. {
  601. sourceEnumerator = source.GetAsyncEnumerator(cancellationToken);
  602. }
  603. MoveNextSource();
  604. }
  605. return new UniTask<bool>(this, completionSource.Version);
  606. }
  607. void MoveNextSource()
  608. {
  609. try
  610. {
  611. sourceAwaiter = sourceEnumerator.MoveNextAsync().GetAwaiter();
  612. }
  613. catch (Exception ex)
  614. {
  615. completionSource.TrySetException(ex);
  616. return;
  617. }
  618. if (sourceAwaiter.IsCompleted)
  619. {
  620. SourceMoveNextCore(this);
  621. }
  622. else
  623. {
  624. sourceAwaiter.SourceOnCompleted(sourceMoveNextCoreDelegate, this);
  625. }
  626. }
  627. void MoveNextSelected()
  628. {
  629. try
  630. {
  631. selectedAwaiter = selectedEnumerator.MoveNextAsync().GetAwaiter();
  632. }
  633. catch (Exception ex)
  634. {
  635. completionSource.TrySetException(ex);
  636. return;
  637. }
  638. if (selectedAwaiter.IsCompleted)
  639. {
  640. SeletedSourceMoveNextCore(this);
  641. }
  642. else
  643. {
  644. selectedAwaiter.SourceOnCompleted(selectedSourceMoveNextCoreDelegate, this);
  645. }
  646. }
  647. static void SourceMoveNextCore(object state)
  648. {
  649. var self = (_SelectManyAwaitWithCancellation)state;
  650. if (self.TryGetResult(self.sourceAwaiter, out var result))
  651. {
  652. if (result)
  653. {
  654. try
  655. {
  656. self.sourceCurrent = self.sourceEnumerator.Current;
  657. if (self.selector1 != null)
  658. {
  659. self.collectionSelectorAwaiter = self.selector1(self.sourceCurrent, self.cancellationToken).GetAwaiter();
  660. }
  661. else
  662. {
  663. self.collectionSelectorAwaiter = self.selector2(self.sourceCurrent, checked(self.sourceIndex++), self.cancellationToken).GetAwaiter();
  664. }
  665. if (self.collectionSelectorAwaiter.IsCompleted)
  666. {
  667. SelectorAwaitCore(self);
  668. }
  669. else
  670. {
  671. self.collectionSelectorAwaiter.SourceOnCompleted(selectorAwaitCoreDelegate, self);
  672. }
  673. }
  674. catch (Exception ex)
  675. {
  676. self.completionSource.TrySetException(ex);
  677. return;
  678. }
  679. }
  680. else
  681. {
  682. self.completionSource.TrySetResult(false);
  683. }
  684. }
  685. }
  686. static void SeletedSourceMoveNextCore(object state)
  687. {
  688. var self = (_SelectManyAwaitWithCancellation)state;
  689. if (self.TryGetResult(self.selectedAwaiter, out var result))
  690. {
  691. if (result)
  692. {
  693. try
  694. {
  695. self.resultSelectorAwaiter = self.resultSelector(self.sourceCurrent, self.selectedEnumerator.Current, self.cancellationToken).GetAwaiter();
  696. if (self.resultSelectorAwaiter.IsCompleted)
  697. {
  698. ResultSelectorAwaitCore(self);
  699. }
  700. else
  701. {
  702. self.resultSelectorAwaiter.SourceOnCompleted(resultSelectorAwaitCoreDelegate, self);
  703. }
  704. }
  705. catch (Exception ex)
  706. {
  707. self.completionSource.TrySetException(ex);
  708. return;
  709. }
  710. }
  711. else
  712. {
  713. // dispose selected source and try iterate source.
  714. try
  715. {
  716. self.selectedDisposeAsyncAwaiter = self.selectedEnumerator.DisposeAsync().GetAwaiter();
  717. }
  718. catch (Exception ex)
  719. {
  720. self.completionSource.TrySetException(ex);
  721. return;
  722. }
  723. if (self.selectedDisposeAsyncAwaiter.IsCompleted)
  724. {
  725. SelectedEnumeratorDisposeAsyncCore(self);
  726. }
  727. else
  728. {
  729. self.selectedDisposeAsyncAwaiter.SourceOnCompleted(selectedEnumeratorDisposeAsyncCoreDelegate, self);
  730. }
  731. }
  732. }
  733. }
  734. static void SelectedEnumeratorDisposeAsyncCore(object state)
  735. {
  736. var self = (_SelectManyAwaitWithCancellation)state;
  737. if (self.TryGetResult(self.selectedDisposeAsyncAwaiter))
  738. {
  739. self.selectedEnumerator = null;
  740. self.selectedAwaiter = default;
  741. self.MoveNextSource(); // iterate next source
  742. }
  743. }
  744. static void SelectorAwaitCore(object state)
  745. {
  746. var self = (_SelectManyAwaitWithCancellation)state;
  747. if (self.TryGetResult(self.collectionSelectorAwaiter, out var result))
  748. {
  749. self.selectedEnumerator = result.GetAsyncEnumerator(self.cancellationToken);
  750. self.MoveNextSelected(); // iterated selected source.
  751. }
  752. }
  753. static void ResultSelectorAwaitCore(object state)
  754. {
  755. var self = (_SelectManyAwaitWithCancellation)state;
  756. if (self.TryGetResult(self.resultSelectorAwaiter, out var result))
  757. {
  758. self.Current = result;
  759. self.completionSource.TrySetResult(true);
  760. }
  761. }
  762. public async UniTask DisposeAsync()
  763. {
  764. TaskTracker.RemoveTracking(this);
  765. if (selectedEnumerator != null)
  766. {
  767. await selectedEnumerator.DisposeAsync();
  768. }
  769. if (sourceEnumerator != null)
  770. {
  771. await sourceEnumerator.DisposeAsync();
  772. }
  773. }
  774. }
  775. }
  776. }