// Join.cs
  1. using Cysharp.Threading.Tasks.Internal;
  2. using System;
  3. using System.Collections.Generic;
  4. using System.Linq;
  5. using System.Threading;
  6. namespace Cysharp.Threading.Tasks.Linq
  7. {
  public static partial class UniTaskAsyncEnumerable
  {
      /// <summary>
      /// Joins <paramref name="outer"/> with <paramref name="inner"/> on keys produced by the
      /// synchronous selectors, comparing keys with <see cref="EqualityComparer{T}.Default"/>.
      /// </summary>
      /// <remarks>
      /// Forwards to the comparer-taking overload, which validates every argument with
      /// <c>Error.ThrowArgumentNullException</c> in the order outer, inner, outerKeySelector,
      /// innerKeySelector, resultSelector.
      /// </remarks>
      public static IUniTaskAsyncEnumerable<TResult> Join<TOuter, TInner, TKey, TResult>(
          this IUniTaskAsyncEnumerable<TOuter> outer,
          IUniTaskAsyncEnumerable<TInner> inner,
          Func<TOuter, TKey> outerKeySelector,
          Func<TInner, TKey> innerKeySelector,
          Func<TOuter, TInner, TResult> resultSelector)
      {
          // The default comparer is never null, so the callee's checks are exactly the five needed here.
          return Join(outer, inner, outerKeySelector, innerKeySelector, resultSelector, EqualityComparer<TKey>.Default);
      }

      /// <summary>
      /// Joins <paramref name="outer"/> with <paramref name="inner"/> on keys produced by the
      /// synchronous selectors, comparing keys with <paramref name="comparer"/>.
      /// </summary>
      /// <remarks>
      /// All six arguments are passed through <c>Error.ThrowArgumentNullException</c> before the
      /// deferred <c>Join</c> sequence object is created; nothing is enumerated here.
      /// </remarks>
      public static IUniTaskAsyncEnumerable<TResult> Join<TOuter, TInner, TKey, TResult>(
          this IUniTaskAsyncEnumerable<TOuter> outer,
          IUniTaskAsyncEnumerable<TInner> inner,
          Func<TOuter, TKey> outerKeySelector,
          Func<TInner, TKey> innerKeySelector,
          Func<TOuter, TInner, TResult> resultSelector,
          IEqualityComparer<TKey> comparer)
      {
          Error.ThrowArgumentNullException(outer, nameof(outer));
          Error.ThrowArgumentNullException(inner, nameof(inner));
          Error.ThrowArgumentNullException(outerKeySelector, nameof(outerKeySelector));
          Error.ThrowArgumentNullException(innerKeySelector, nameof(innerKeySelector));
          Error.ThrowArgumentNullException(resultSelector, nameof(resultSelector));
          Error.ThrowArgumentNullException(comparer, nameof(comparer));

          var joined = new Join<TOuter, TInner, TKey, TResult>(outer, inner, outerKeySelector, innerKeySelector, resultSelector, comparer);
          return joined;
      }

      /// <summary>
      /// Variant of <c>Join</c> whose key and result selectors return <c>UniTask</c> values;
      /// keys are compared with <see cref="EqualityComparer{T}.Default"/>.
      /// </summary>
      /// <remarks>
      /// Forwards to the comparer-taking overload, which performs the argument validation.
      /// </remarks>
      public static IUniTaskAsyncEnumerable<TResult> JoinAwait<TOuter, TInner, TKey, TResult>(
          this IUniTaskAsyncEnumerable<TOuter> outer,
          IUniTaskAsyncEnumerable<TInner> inner,
          Func<TOuter, UniTask<TKey>> outerKeySelector,
          Func<TInner, UniTask<TKey>> innerKeySelector,
          Func<TOuter, TInner, UniTask<TResult>> resultSelector)
      {
          return JoinAwait(outer, inner, outerKeySelector, innerKeySelector, resultSelector, EqualityComparer<TKey>.Default);
      }

      /// <summary>
      /// Variant of <c>Join</c> whose key and result selectors return <c>UniTask</c> values;
      /// keys are compared with <paramref name="comparer"/>.
      /// </summary>
      /// <remarks>
      /// All six arguments are passed through <c>Error.ThrowArgumentNullException</c> before the
      /// deferred <c>JoinAwait</c> sequence object is created.
      /// </remarks>
      public static IUniTaskAsyncEnumerable<TResult> JoinAwait<TOuter, TInner, TKey, TResult>(
          this IUniTaskAsyncEnumerable<TOuter> outer,
          IUniTaskAsyncEnumerable<TInner> inner,
          Func<TOuter, UniTask<TKey>> outerKeySelector,
          Func<TInner, UniTask<TKey>> innerKeySelector,
          Func<TOuter, TInner, UniTask<TResult>> resultSelector,
          IEqualityComparer<TKey> comparer)
      {
          Error.ThrowArgumentNullException(outer, nameof(outer));
          Error.ThrowArgumentNullException(inner, nameof(inner));
          Error.ThrowArgumentNullException(outerKeySelector, nameof(outerKeySelector));
          Error.ThrowArgumentNullException(innerKeySelector, nameof(innerKeySelector));
          Error.ThrowArgumentNullException(resultSelector, nameof(resultSelector));
          Error.ThrowArgumentNullException(comparer, nameof(comparer));

          var joined = new JoinAwait<TOuter, TInner, TKey, TResult>(outer, inner, outerKeySelector, innerKeySelector, resultSelector, comparer);
          return joined;
      }

      /// <summary>
      /// Variant of <c>JoinAwait</c> whose selectors additionally receive a
      /// <see cref="CancellationToken"/>; keys are compared with
      /// <see cref="EqualityComparer{T}.Default"/>.
      /// </summary>
      /// <remarks>
      /// Forwards to the comparer-taking overload, which performs the argument validation.
      /// </remarks>
      public static IUniTaskAsyncEnumerable<TResult> JoinAwaitWithCancellation<TOuter, TInner, TKey, TResult>(
          this IUniTaskAsyncEnumerable<TOuter> outer,
          IUniTaskAsyncEnumerable<TInner> inner,
          Func<TOuter, CancellationToken, UniTask<TKey>> outerKeySelector,
          Func<TInner, CancellationToken, UniTask<TKey>> innerKeySelector,
          Func<TOuter, TInner, CancellationToken, UniTask<TResult>> resultSelector)
      {
          return JoinAwaitWithCancellation(outer, inner, outerKeySelector, innerKeySelector, resultSelector, EqualityComparer<TKey>.Default);
      }

      /// <summary>
      /// Variant of <c>JoinAwait</c> whose selectors additionally receive a
      /// <see cref="CancellationToken"/>; keys are compared with <paramref name="comparer"/>.
      /// </summary>
      /// <remarks>
      /// All six arguments are passed through <c>Error.ThrowArgumentNullException</c> before the
      /// deferred <c>JoinAwaitWithCancellation</c> sequence object is created.
      /// </remarks>
      public static IUniTaskAsyncEnumerable<TResult> JoinAwaitWithCancellation<TOuter, TInner, TKey, TResult>(
          this IUniTaskAsyncEnumerable<TOuter> outer,
          IUniTaskAsyncEnumerable<TInner> inner,
          Func<TOuter, CancellationToken, UniTask<TKey>> outerKeySelector,
          Func<TInner, CancellationToken, UniTask<TKey>> innerKeySelector,
          Func<TOuter, TInner, CancellationToken, UniTask<TResult>> resultSelector,
          IEqualityComparer<TKey> comparer)
      {
          Error.ThrowArgumentNullException(outer, nameof(outer));
          Error.ThrowArgumentNullException(inner, nameof(inner));
          Error.ThrowArgumentNullException(outerKeySelector, nameof(outerKeySelector));
          Error.ThrowArgumentNullException(innerKeySelector, nameof(innerKeySelector));
          Error.ThrowArgumentNullException(resultSelector, nameof(resultSelector));
          Error.ThrowArgumentNullException(comparer, nameof(comparer));

          var joined = new JoinAwaitWithCancellation<TOuter, TInner, TKey, TResult>(outer, inner, outerKeySelector, innerKeySelector, resultSelector, comparer);
          return joined;
      }
  }
  /// <summary>
  /// Deferred join of an outer and an inner async sequence using synchronous selectors.
  /// Each call to <see cref="GetAsyncEnumerator"/> creates an independent enumerator that
  /// first materializes the inner sequence into a lookup and then, for every outer element,
  /// yields <c>resultSelector(outerElement, innerElement)</c> for each element of
  /// <c>lookup[outerKey]</c>.
  /// </summary>
  internal sealed class Join<TOuter, TInner, TKey, TResult> : IUniTaskAsyncEnumerable<TResult>
  {
      readonly IUniTaskAsyncEnumerable<TOuter> outer;
      readonly IUniTaskAsyncEnumerable<TInner> inner;
      readonly Func<TOuter, TKey> outerKeySelector;
      readonly Func<TInner, TKey> innerKeySelector;
      readonly Func<TOuter, TInner, TResult> resultSelector;
      readonly IEqualityComparer<TKey> comparer;

      /// <summary>Stores the sources, selectors and key comparer; no enumeration happens here.</summary>
      public Join(
          IUniTaskAsyncEnumerable<TOuter> outer,
          IUniTaskAsyncEnumerable<TInner> inner,
          Func<TOuter, TKey> outerKeySelector,
          Func<TInner, TKey> innerKeySelector,
          Func<TOuter, TInner, TResult> resultSelector,
          IEqualityComparer<TKey> comparer)
      {
          this.outer = outer;
          this.inner = inner;
          this.outerKeySelector = outerKeySelector;
          this.innerKeySelector = innerKeySelector;
          this.resultSelector = resultSelector;
          this.comparer = comparer;
      }

      /// <summary>Creates a new enumerator bound to <paramref name="cancellationToken"/>.</summary>
      public IUniTaskAsyncEnumerator<TResult> GetAsyncEnumerator(CancellationToken cancellationToken = default)
      {
          return new _Join(outer, inner, outerKeySelector, innerKeySelector, resultSelector, comparer, cancellationToken);
      }

      /// <summary>
      /// Hand-written enumerator state machine. The pending MoveNextAsync call is completed
      /// through the <c>completionSource</c> inherited from <c>MoveNextSource</c>.
      /// </summary>
      sealed class _Join : MoveNextSource, IUniTaskAsyncEnumerator<TResult>
      {
          // Cached delegate so registering the continuation does not allocate a new delegate each time.
          static readonly Action<object> MoveNextCoreDelegate = MoveNextCore;

          readonly IUniTaskAsyncEnumerable<TOuter> outer;
          readonly IUniTaskAsyncEnumerable<TInner> inner;
          readonly Func<TOuter, TKey> outerKeySelector;
          readonly Func<TInner, TKey> innerKeySelector;
          readonly Func<TOuter, TInner, TResult> resultSelector;
          readonly IEqualityComparer<TKey> comparer;
          CancellationToken cancellationToken;

          // Inner sequence grouped by key; null until the first MoveNextAsync has built it.
          ILookup<TKey, TInner> lookup;
          // Enumerator over the outer sequence; created after the lookup is built.
          IUniTaskAsyncEnumerator<TOuter> enumerator;
          // Awaiter of the in-flight outer MoveNextAsync call.
          UniTask<bool>.Awaiter awaiter;
          TOuter currentOuterValue;
          // Inner elements matching the current outer element; null when a new outer element is needed.
          IEnumerator<TInner> valueEnumerator;
          // True while SourceMoveNext invokes MoveNextCore synchronously: tells MoveNextCore to
          // return to the caller's loop instead of calling SourceMoveNext recursively.
          bool continueNext;

          public _Join(
              IUniTaskAsyncEnumerable<TOuter> outer,
              IUniTaskAsyncEnumerable<TInner> inner,
              Func<TOuter, TKey> outerKeySelector,
              Func<TInner, TKey> innerKeySelector,
              Func<TOuter, TInner, TResult> resultSelector,
              IEqualityComparer<TKey> comparer,
              CancellationToken cancellationToken)
          {
              this.outer = outer;
              this.inner = inner;
              this.outerKeySelector = outerKeySelector;
              this.innerKeySelector = innerKeySelector;
              this.resultSelector = resultSelector;
              this.comparer = comparer;
              this.cancellationToken = cancellationToken;
              // NOTE(review): the literal 3 is presumably a stack-frame skip count for the tracker - confirm against TaskTracker.
              TaskTracker.TrackActiveTask(this, 3);
          }

          /// <summary>Result produced by the most recent successful MoveNextAsync.</summary>
          public TResult Current { get; private set; }

          /// <summary>
          /// Advances to the next joined result. The first call builds the inner lookup;
          /// later calls continue from the current match list / outer position.
          /// Throws synchronously if cancellation has already been requested.
          /// </summary>
          public UniTask<bool> MoveNextAsync()
          {
              cancellationToken.ThrowIfCancellationRequested();
              completionSource.Reset();

              if (lookup == null)
              {
                  CreateInnerHashSet().Forget();
              }
              else
              {
                  SourceMoveNext();
              }
              return new UniTask<bool>(this, completionSource.Version);
          }

          // Builds the lookup over the inner sequence, opens the outer enumerator, then starts
          // the first advance. Failures are reported through completionSource.
          async UniTaskVoid CreateInnerHashSet()
          {
              try
              {
                  lookup = await inner.ToLookupAsync(innerKeySelector, comparer, cancellationToken);
                  enumerator = outer.GetAsyncEnumerator(cancellationToken);
              }
              catch (Exception ex)
              {
                  completionSource.TrySetException(ex);
                  return;
              }
              SourceMoveNext();
          }

          // Produces the next result: drains the current match list first, otherwise pulls the
          // next outer element. Synchronously completed outer steps are handled by looping.
          void SourceMoveNext()
          {
              try
              {
                  LOOP:
                  if (valueEnumerator != null)
                  {
                      if (valueEnumerator.MoveNext())
                      {
                          Current = resultSelector(currentOuterValue, valueEnumerator.Current);
                          goto TRY_SET_RESULT_TRUE;
                      }
                      else
                      {
                          // Matches for this outer element are exhausted; move on to the next outer element.
                          valueEnumerator.Dispose();
                          valueEnumerator = null;
                      }
                  }

                  awaiter = enumerator.MoveNextAsync().GetAwaiter();

                  if (awaiter.IsCompleted)
                  {
                      continueNext = true;
                      MoveNextCore(this);
                      if (continueNext)
                      {
                          continueNext = false;
                          goto LOOP; // avoid recursive
                      }
                  }
                  else
                  {
                      awaiter.SourceOnCompleted(MoveNextCoreDelegate, this);
                  }
              }
              catch (Exception ex)
              {
                  completionSource.TrySetException(ex);
              }

              return;

              TRY_SET_RESULT_TRUE:
              completionSource.TrySetResult(true);
          }

          // Consumes the result of the outer MoveNextAsync. Called synchronously from
          // SourceMoveNext (continueNext == true) or as an awaiter continuation.
          static void MoveNextCore(object state)
          {
              var self = (_Join)state;

              if (self.TryGetResult(self.awaiter, out var result))
              {
                  if (result)
                  {
                      try
                      {
                          self.currentOuterValue = self.enumerator.Current;
                          var key = self.outerKeySelector(self.currentOuterValue);
                          self.valueEnumerator = self.lookup[key].GetEnumerator();
                      }
                      catch (Exception ex)
                      {
                          // When running as a continuation there is no enclosing try/catch, so an
                          // exception from the key selector or comparer would escape the callback and
                          // leave the pending MoveNextAsync incomplete. Report it here instead, and
                          // clear the flag so the synchronous caller stops looping.
                          self.continueNext = false;
                          self.completionSource.TrySetException(ex);
                          return;
                      }

                      if (self.continueNext)
                      {
                          // Synchronous path: SourceMoveNext will jump back to LOOP itself.
                          return;
                      }
                      else
                      {
                          self.SourceMoveNext();
                      }
                  }
                  else
                  {
                      // Outer sequence is exhausted.
                      self.continueNext = false;
                      self.completionSource.TrySetResult(false);
                  }
              }
              else
              {
                  // TryGetResult returned false; stop the synchronous loop in SourceMoveNext.
                  self.continueNext = false;
              }
          }

          /// <summary>
          /// Stops tracking this enumerator, disposes the current match enumerator if any,
          /// and disposes the outer enumerator if it was created.
          /// </summary>
          public UniTask DisposeAsync()
          {
              TaskTracker.RemoveTracking(this);

              if (valueEnumerator != null)
              {
                  valueEnumerator.Dispose();
              }

              if (enumerator != null)
              {
                  return enumerator.DisposeAsync();
              }

              return default;
          }
      }
  }
  /// <summary>
  /// Deferred join of an outer and an inner async sequence whose key and result selectors
  /// return <c>UniTask</c> values. Each enumerator first materializes the inner sequence into
  /// a lookup, then for every outer element awaits its key and yields the awaited
  /// <c>resultSelector(outerElement, innerElement)</c> for each element of <c>lookup[key]</c>.
  /// </summary>
  internal sealed class JoinAwait<TOuter, TInner, TKey, TResult> : IUniTaskAsyncEnumerable<TResult>
  {
      readonly IUniTaskAsyncEnumerable<TOuter> outer;
      readonly IUniTaskAsyncEnumerable<TInner> inner;
      readonly Func<TOuter, UniTask<TKey>> outerKeySelector;
      readonly Func<TInner, UniTask<TKey>> innerKeySelector;
      readonly Func<TOuter, TInner, UniTask<TResult>> resultSelector;
      readonly IEqualityComparer<TKey> comparer;

      /// <summary>Stores the sources, selectors and key comparer; no enumeration happens here.</summary>
      public JoinAwait(
          IUniTaskAsyncEnumerable<TOuter> outer,
          IUniTaskAsyncEnumerable<TInner> inner,
          Func<TOuter, UniTask<TKey>> outerKeySelector,
          Func<TInner, UniTask<TKey>> innerKeySelector,
          Func<TOuter, TInner, UniTask<TResult>> resultSelector,
          IEqualityComparer<TKey> comparer)
      {
          this.outer = outer;
          this.inner = inner;
          this.outerKeySelector = outerKeySelector;
          this.innerKeySelector = innerKeySelector;
          this.resultSelector = resultSelector;
          this.comparer = comparer;
      }

      /// <summary>Creates a new enumerator bound to <paramref name="cancellationToken"/>.</summary>
      public IUniTaskAsyncEnumerator<TResult> GetAsyncEnumerator(CancellationToken cancellationToken = default)
      {
          return new _JoinAwait(outer, inner, outerKeySelector, innerKeySelector, resultSelector, comparer, cancellationToken);
      }

      /// <summary>
      /// Hand-written enumerator state machine. The pending MoveNextAsync call is completed
      /// through the <c>completionSource</c> inherited from <c>MoveNextSource</c>.
      /// </summary>
      sealed class _JoinAwait : MoveNextSource, IUniTaskAsyncEnumerator<TResult>
      {
          // Cached delegates so registering continuations does not allocate a new delegate each time.
          static readonly Action<object> MoveNextCoreDelegate = MoveNextCore;
          static readonly Action<object> OuterSelectCoreDelegate = OuterSelectCore;
          static readonly Action<object> ResultSelectCoreDelegate = ResultSelectCore;

          readonly IUniTaskAsyncEnumerable<TOuter> outer;
          readonly IUniTaskAsyncEnumerable<TInner> inner;
          readonly Func<TOuter, UniTask<TKey>> outerKeySelector;
          readonly Func<TInner, UniTask<TKey>> innerKeySelector;
          readonly Func<TOuter, TInner, UniTask<TResult>> resultSelector;
          readonly IEqualityComparer<TKey> comparer;
          CancellationToken cancellationToken;

          // Inner sequence grouped by key; null until the first MoveNextAsync has built it.
          ILookup<TKey, TInner> lookup;
          // Enumerator over the outer sequence; created after the lookup is built.
          IUniTaskAsyncEnumerator<TOuter> enumerator;
          // Awaiter of the in-flight outer MoveNextAsync call.
          UniTask<bool>.Awaiter awaiter;
          TOuter currentOuterValue;
          // Inner elements matching the current outer element; null when a new outer element is needed.
          IEnumerator<TInner> valueEnumerator;

          // Awaiters of the in-flight result selector / outer key selector calls.
          UniTask<TResult>.Awaiter resultAwaiter;
          UniTask<TKey>.Awaiter outerKeyAwaiter;

          // True while SourceMoveNext drives MoveNextCore/OuterSelectCore synchronously: tells them
          // to return to the caller's loop instead of calling SourceMoveNext recursively.
          bool continueNext;

          public _JoinAwait(
              IUniTaskAsyncEnumerable<TOuter> outer,
              IUniTaskAsyncEnumerable<TInner> inner,
              Func<TOuter, UniTask<TKey>> outerKeySelector,
              Func<TInner, UniTask<TKey>> innerKeySelector,
              Func<TOuter, TInner, UniTask<TResult>> resultSelector,
              IEqualityComparer<TKey> comparer,
              CancellationToken cancellationToken)
          {
              this.outer = outer;
              this.inner = inner;
              this.outerKeySelector = outerKeySelector;
              this.innerKeySelector = innerKeySelector;
              this.resultSelector = resultSelector;
              this.comparer = comparer;
              this.cancellationToken = cancellationToken;
              // NOTE(review): the literal 3 is presumably a stack-frame skip count for the tracker - confirm against TaskTracker.
              TaskTracker.TrackActiveTask(this, 3);
          }

          /// <summary>Result produced by the most recent successful MoveNextAsync.</summary>
          public TResult Current { get; private set; }

          /// <summary>
          /// Advances to the next joined result. The first call builds the inner lookup;
          /// later calls continue from the current match list / outer position.
          /// Throws synchronously if cancellation has already been requested.
          /// </summary>
          public UniTask<bool> MoveNextAsync()
          {
              cancellationToken.ThrowIfCancellationRequested();
              completionSource.Reset();

              if (lookup == null)
              {
                  CreateInnerHashSet().Forget();
              }
              else
              {
                  SourceMoveNext();
              }
              return new UniTask<bool>(this, completionSource.Version);
          }

          // Builds the lookup over the inner sequence, opens the outer enumerator, then starts
          // the first advance. Failures are reported through completionSource.
          async UniTaskVoid CreateInnerHashSet()
          {
              try
              {
                  lookup = await inner.ToLookupAwaitAsync(innerKeySelector, comparer, cancellationToken);
                  enumerator = outer.GetAsyncEnumerator(cancellationToken);
              }
              catch (Exception ex)
              {
                  completionSource.TrySetException(ex);
                  return;
              }
              SourceMoveNext();
          }

          // Produces the next result: drains the current match list first (starting the result
          // selector for the next match), otherwise pulls the next outer element.
          void SourceMoveNext()
          {
              try
              {
                  LOOP:
                  if (valueEnumerator != null)
                  {
                      if (valueEnumerator.MoveNext())
                      {
                          resultAwaiter = resultSelector(currentOuterValue, valueEnumerator.Current).GetAwaiter();
                          if (resultAwaiter.IsCompleted)
                          {
                              ResultSelectCore(this);
                          }
                          else
                          {
                              resultAwaiter.SourceOnCompleted(ResultSelectCoreDelegate, this);
                          }
                          return;
                      }
                      else
                      {
                          // Matches for this outer element are exhausted; move on to the next outer element.
                          valueEnumerator.Dispose();
                          valueEnumerator = null;
                      }
                  }

                  awaiter = enumerator.MoveNextAsync().GetAwaiter();

                  if (awaiter.IsCompleted)
                  {
                      continueNext = true;
                      MoveNextCore(this);
                      if (continueNext)
                      {
                          continueNext = false;
                          goto LOOP; // avoid recursive
                      }
                  }
                  else
                  {
                      awaiter.SourceOnCompleted(MoveNextCoreDelegate, this);
                  }
              }
              catch (Exception ex)
              {
                  completionSource.TrySetException(ex);
              }
          }

          // Consumes the result of the outer MoveNextAsync and starts the outer key selector.
          // Called synchronously from SourceMoveNext or as an awaiter continuation.
          static void MoveNextCore(object state)
          {
              var self = (_JoinAwait)state;

              if (self.TryGetResult(self.awaiter, out var result))
              {
                  if (result)
                  {
                      try
                      {
                          self.currentOuterValue = self.enumerator.Current;
                          self.outerKeyAwaiter = self.outerKeySelector(self.currentOuterValue).GetAwaiter();
                      }
                      catch (Exception ex)
                      {
                          // When running as a continuation there is no enclosing try/catch, so a
                          // synchronous throw from the key selector would escape the callback and
                          // leave the pending MoveNextAsync incomplete. Report it here instead, and
                          // clear the flag so the synchronous caller stops looping.
                          self.continueNext = false;
                          self.completionSource.TrySetException(ex);
                          return;
                      }

                      if (self.outerKeyAwaiter.IsCompleted)
                      {
                          OuterSelectCore(self);
                      }
                      else
                      {
                          // The key arrives later; the continuation must restart SourceMoveNext itself.
                          self.continueNext = false;
                          self.outerKeyAwaiter.SourceOnCompleted(OuterSelectCoreDelegate, self);
                      }
                  }
                  else
                  {
                      // Outer sequence is exhausted.
                      self.continueNext = false;
                      self.completionSource.TrySetResult(false);
                  }
              }
              else
              {
                  // TryGetResult returned false; stop the synchronous loop in SourceMoveNext.
                  self.continueNext = false;
              }
          }

          // Consumes the awaited outer key and selects the matching inner elements.
          static void OuterSelectCore(object state)
          {
              var self = (_JoinAwait)state;

              if (self.TryGetResult(self.outerKeyAwaiter, out var key))
              {
                  try
                  {
                      self.valueEnumerator = self.lookup[key].GetEnumerator();
                  }
                  catch (Exception ex)
                  {
                      // Same reasoning as in MoveNextCore: a failing lookup (e.g. a throwing
                      // comparer) must complete the pending MoveNextAsync rather than escape.
                      self.continueNext = false;
                      self.completionSource.TrySetException(ex);
                      return;
                  }

                  if (self.continueNext)
                  {
                      // Synchronous path: SourceMoveNext will jump back to LOOP itself.
                      return;
                  }
                  else
                  {
                      self.SourceMoveNext();
                  }
              }
              else
              {
                  self.continueNext = false;
              }
          }

          // Consumes the awaited result selector value and completes the pending MoveNextAsync.
          static void ResultSelectCore(object state)
          {
              var self = (_JoinAwait)state;

              if (self.TryGetResult(self.resultAwaiter, out var result))
              {
                  self.Current = result;
                  self.completionSource.TrySetResult(true);
              }
          }

          /// <summary>
          /// Stops tracking this enumerator, disposes the current match enumerator if any,
          /// and disposes the outer enumerator if it was created.
          /// </summary>
          public UniTask DisposeAsync()
          {
              TaskTracker.RemoveTracking(this);

              if (valueEnumerator != null)
              {
                  valueEnumerator.Dispose();
              }

              if (enumerator != null)
              {
                  return enumerator.DisposeAsync();
              }

              return default;
          }
      }
  }
  /// <summary>
  /// Deferred join of an outer and an inner async sequence whose key and result selectors
  /// return <c>UniTask</c> values and receive the enumerator's <see cref="CancellationToken"/>.
  /// Each enumerator first materializes the inner sequence into a lookup, then for every outer
  /// element awaits its key and yields the awaited
  /// <c>resultSelector(outerElement, innerElement, token)</c> for each element of <c>lookup[key]</c>.
  /// </summary>
  internal sealed class JoinAwaitWithCancellation<TOuter, TInner, TKey, TResult> : IUniTaskAsyncEnumerable<TResult>
  {
      readonly IUniTaskAsyncEnumerable<TOuter> outer;
      readonly IUniTaskAsyncEnumerable<TInner> inner;
      readonly Func<TOuter, CancellationToken, UniTask<TKey>> outerKeySelector;
      readonly Func<TInner, CancellationToken, UniTask<TKey>> innerKeySelector;
      readonly Func<TOuter, TInner, CancellationToken, UniTask<TResult>> resultSelector;
      readonly IEqualityComparer<TKey> comparer;

      /// <summary>Stores the sources, selectors and key comparer; no enumeration happens here.</summary>
      public JoinAwaitWithCancellation(
          IUniTaskAsyncEnumerable<TOuter> outer,
          IUniTaskAsyncEnumerable<TInner> inner,
          Func<TOuter, CancellationToken, UniTask<TKey>> outerKeySelector,
          Func<TInner, CancellationToken, UniTask<TKey>> innerKeySelector,
          Func<TOuter, TInner, CancellationToken, UniTask<TResult>> resultSelector,
          IEqualityComparer<TKey> comparer)
      {
          this.outer = outer;
          this.inner = inner;
          this.outerKeySelector = outerKeySelector;
          this.innerKeySelector = innerKeySelector;
          this.resultSelector = resultSelector;
          this.comparer = comparer;
      }

      /// <summary>Creates a new enumerator bound to <paramref name="cancellationToken"/>.</summary>
      public IUniTaskAsyncEnumerator<TResult> GetAsyncEnumerator(CancellationToken cancellationToken = default)
      {
          return new _JoinAwaitWithCancellation(outer, inner, outerKeySelector, innerKeySelector, resultSelector, comparer, cancellationToken);
      }

      /// <summary>
      /// Hand-written enumerator state machine. The pending MoveNextAsync call is completed
      /// through the <c>completionSource</c> inherited from <c>MoveNextSource</c>.
      /// </summary>
      sealed class _JoinAwaitWithCancellation : MoveNextSource, IUniTaskAsyncEnumerator<TResult>
      {
          // Cached delegates so registering continuations does not allocate a new delegate each time.
          static readonly Action<object> MoveNextCoreDelegate = MoveNextCore;
          static readonly Action<object> OuterSelectCoreDelegate = OuterSelectCore;
          static readonly Action<object> ResultSelectCoreDelegate = ResultSelectCore;

          readonly IUniTaskAsyncEnumerable<TOuter> outer;
          readonly IUniTaskAsyncEnumerable<TInner> inner;
          readonly Func<TOuter, CancellationToken, UniTask<TKey>> outerKeySelector;
          readonly Func<TInner, CancellationToken, UniTask<TKey>> innerKeySelector;
          readonly Func<TOuter, TInner, CancellationToken, UniTask<TResult>> resultSelector;
          readonly IEqualityComparer<TKey> comparer;
          CancellationToken cancellationToken;

          // Inner sequence grouped by key; null until the first MoveNextAsync has built it.
          ILookup<TKey, TInner> lookup;
          // Enumerator over the outer sequence; created after the lookup is built.
          IUniTaskAsyncEnumerator<TOuter> enumerator;
          // Awaiter of the in-flight outer MoveNextAsync call.
          UniTask<bool>.Awaiter awaiter;
          TOuter currentOuterValue;
          // Inner elements matching the current outer element; null when a new outer element is needed.
          IEnumerator<TInner> valueEnumerator;

          // Awaiters of the in-flight result selector / outer key selector calls.
          UniTask<TResult>.Awaiter resultAwaiter;
          UniTask<TKey>.Awaiter outerKeyAwaiter;

          // True while SourceMoveNext drives MoveNextCore/OuterSelectCore synchronously: tells them
          // to return to the caller's loop instead of calling SourceMoveNext recursively.
          bool continueNext;

          public _JoinAwaitWithCancellation(
              IUniTaskAsyncEnumerable<TOuter> outer,
              IUniTaskAsyncEnumerable<TInner> inner,
              Func<TOuter, CancellationToken, UniTask<TKey>> outerKeySelector,
              Func<TInner, CancellationToken, UniTask<TKey>> innerKeySelector,
              Func<TOuter, TInner, CancellationToken, UniTask<TResult>> resultSelector,
              IEqualityComparer<TKey> comparer,
              CancellationToken cancellationToken)
          {
              this.outer = outer;
              this.inner = inner;
              this.outerKeySelector = outerKeySelector;
              this.innerKeySelector = innerKeySelector;
              this.resultSelector = resultSelector;
              this.comparer = comparer;
              this.cancellationToken = cancellationToken;
              // NOTE(review): the literal 3 is presumably a stack-frame skip count for the tracker - confirm against TaskTracker.
              TaskTracker.TrackActiveTask(this, 3);
          }

          /// <summary>Result produced by the most recent successful MoveNextAsync.</summary>
          public TResult Current { get; private set; }

          /// <summary>
          /// Advances to the next joined result. The first call builds the inner lookup;
          /// later calls continue from the current match list / outer position.
          /// Throws synchronously if cancellation has already been requested.
          /// </summary>
          public UniTask<bool> MoveNextAsync()
          {
              cancellationToken.ThrowIfCancellationRequested();
              completionSource.Reset();

              if (lookup == null)
              {
                  CreateInnerHashSet().Forget();
              }
              else
              {
                  SourceMoveNext();
              }
              return new UniTask<bool>(this, completionSource.Version);
          }

          // Builds the lookup over the inner sequence, opens the outer enumerator, then starts
          // the first advance. Failures are reported through completionSource.
          async UniTaskVoid CreateInnerHashSet()
          {
              try
              {
                  lookup = await inner.ToLookupAwaitWithCancellationAsync(innerKeySelector, comparer, cancellationToken: cancellationToken);
                  enumerator = outer.GetAsyncEnumerator(cancellationToken);
              }
              catch (Exception ex)
              {
                  completionSource.TrySetException(ex);
                  return;
              }
              SourceMoveNext();
          }

          // Produces the next result: drains the current match list first (starting the result
          // selector for the next match), otherwise pulls the next outer element.
          void SourceMoveNext()
          {
              try
              {
                  LOOP:
                  if (valueEnumerator != null)
                  {
                      if (valueEnumerator.MoveNext())
                      {
                          resultAwaiter = resultSelector(currentOuterValue, valueEnumerator.Current, cancellationToken).GetAwaiter();
                          if (resultAwaiter.IsCompleted)
                          {
                              ResultSelectCore(this);
                          }
                          else
                          {
                              resultAwaiter.SourceOnCompleted(ResultSelectCoreDelegate, this);
                          }
                          return;
                      }
                      else
                      {
                          // Matches for this outer element are exhausted; move on to the next outer element.
                          valueEnumerator.Dispose();
                          valueEnumerator = null;
                      }
                  }

                  awaiter = enumerator.MoveNextAsync().GetAwaiter();

                  if (awaiter.IsCompleted)
                  {
                      continueNext = true;
                      MoveNextCore(this);
                      if (continueNext)
                      {
                          continueNext = false;
                          goto LOOP; // avoid recursive
                      }
                  }
                  else
                  {
                      awaiter.SourceOnCompleted(MoveNextCoreDelegate, this);
                  }
              }
              catch (Exception ex)
              {
                  completionSource.TrySetException(ex);
              }
          }

          // Consumes the result of the outer MoveNextAsync and starts the outer key selector.
          // Called synchronously from SourceMoveNext or as an awaiter continuation.
          static void MoveNextCore(object state)
          {
              var self = (_JoinAwaitWithCancellation)state;

              if (self.TryGetResult(self.awaiter, out var result))
              {
                  if (result)
                  {
                      try
                      {
                          self.currentOuterValue = self.enumerator.Current;
                          self.outerKeyAwaiter = self.outerKeySelector(self.currentOuterValue, self.cancellationToken).GetAwaiter();
                      }
                      catch (Exception ex)
                      {
                          // When running as a continuation there is no enclosing try/catch, so a
                          // synchronous throw from the key selector would escape the callback and
                          // leave the pending MoveNextAsync incomplete. Report it here instead, and
                          // clear the flag so the synchronous caller stops looping.
                          self.continueNext = false;
                          self.completionSource.TrySetException(ex);
                          return;
                      }

                      if (self.outerKeyAwaiter.IsCompleted)
                      {
                          OuterSelectCore(self);
                      }
                      else
                      {
                          // The key arrives later; the continuation must restart SourceMoveNext itself.
                          self.continueNext = false;
                          self.outerKeyAwaiter.SourceOnCompleted(OuterSelectCoreDelegate, self);
                      }
                  }
                  else
                  {
                      // Outer sequence is exhausted.
                      self.continueNext = false;
                      self.completionSource.TrySetResult(false);
                  }
              }
              else
              {
                  // TryGetResult returned false; stop the synchronous loop in SourceMoveNext.
                  self.continueNext = false;
              }
          }

          // Consumes the awaited outer key and selects the matching inner elements.
          static void OuterSelectCore(object state)
          {
              var self = (_JoinAwaitWithCancellation)state;

              if (self.TryGetResult(self.outerKeyAwaiter, out var key))
              {
                  try
                  {
                      self.valueEnumerator = self.lookup[key].GetEnumerator();
                  }
                  catch (Exception ex)
                  {
                      // Same reasoning as in MoveNextCore: a failing lookup (e.g. a throwing
                      // comparer) must complete the pending MoveNextAsync rather than escape.
                      self.continueNext = false;
                      self.completionSource.TrySetException(ex);
                      return;
                  }

                  if (self.continueNext)
                  {
                      // Synchronous path: SourceMoveNext will jump back to LOOP itself.
                      return;
                  }
                  else
                  {
                      self.SourceMoveNext();
                  }
              }
              else
              {
                  self.continueNext = false;
              }
          }

          // Consumes the awaited result selector value and completes the pending MoveNextAsync.
          static void ResultSelectCore(object state)
          {
              var self = (_JoinAwaitWithCancellation)state;

              if (self.TryGetResult(self.resultAwaiter, out var result))
              {
                  self.Current = result;
                  self.completionSource.TrySetResult(true);
              }
          }

          /// <summary>
          /// Stops tracking this enumerator, disposes the current match enumerator if any,
          /// and disposes the outer enumerator if it was created.
          /// </summary>
          public UniTask DisposeAsync()
          {
              TaskTracker.RemoveTracking(this);

              if (valueEnumerator != null)
              {
                  valueEnumerator.Dispose();
              }

              if (enumerator != null)
              {
                  return enumerator.DisposeAsync();
              }

              return default;
          }
      }
  }
  636. }