DistinctUntilChanged.cs 26 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662
  1. using Cysharp.Threading.Tasks.Internal;
  2. using System;
  3. using System.Collections.Generic;
  4. using System.Threading;
  5. namespace Cysharp.Threading.Tasks.Linq
  6. {
  7. public static partial class UniTaskAsyncEnumerable
  8. {
  9. public static IUniTaskAsyncEnumerable<TSource> DistinctUntilChanged<TSource>(this IUniTaskAsyncEnumerable<TSource> source)
  10. {
  11. return DistinctUntilChanged(source, EqualityComparer<TSource>.Default);
  12. }
  13. public static IUniTaskAsyncEnumerable<TSource> DistinctUntilChanged<TSource>(this IUniTaskAsyncEnumerable<TSource> source, IEqualityComparer<TSource> comparer)
  14. {
  15. Error.ThrowArgumentNullException(source, nameof(source));
  16. Error.ThrowArgumentNullException(comparer, nameof(comparer));
  17. return new DistinctUntilChanged<TSource>(source, comparer);
  18. }
  19. public static IUniTaskAsyncEnumerable<TSource> DistinctUntilChanged<TSource, TKey>(this IUniTaskAsyncEnumerable<TSource> source, Func<TSource, TKey> keySelector)
  20. {
  21. return DistinctUntilChanged(source, keySelector, EqualityComparer<TKey>.Default);
  22. }
  23. public static IUniTaskAsyncEnumerable<TSource> DistinctUntilChanged<TSource, TKey>(this IUniTaskAsyncEnumerable<TSource> source, Func<TSource, TKey> keySelector, IEqualityComparer<TKey> comparer)
  24. {
  25. Error.ThrowArgumentNullException(source, nameof(source));
  26. Error.ThrowArgumentNullException(keySelector, nameof(keySelector));
  27. Error.ThrowArgumentNullException(comparer, nameof(comparer));
  28. return new DistinctUntilChanged<TSource, TKey>(source, keySelector, comparer);
  29. }
  30. public static IUniTaskAsyncEnumerable<TSource> DistinctUntilChangedAwait<TSource, TKey>(this IUniTaskAsyncEnumerable<TSource> source, Func<TSource, UniTask<TKey>> keySelector)
  31. {
  32. return DistinctUntilChangedAwait(source, keySelector, EqualityComparer<TKey>.Default);
  33. }
  34. public static IUniTaskAsyncEnumerable<TSource> DistinctUntilChangedAwait<TSource, TKey>(this IUniTaskAsyncEnumerable<TSource> source, Func<TSource, UniTask<TKey>> keySelector, IEqualityComparer<TKey> comparer)
  35. {
  36. Error.ThrowArgumentNullException(source, nameof(source));
  37. Error.ThrowArgumentNullException(keySelector, nameof(keySelector));
  38. Error.ThrowArgumentNullException(comparer, nameof(comparer));
  39. return new DistinctUntilChangedAwait<TSource, TKey>(source, keySelector, comparer);
  40. }
  41. public static IUniTaskAsyncEnumerable<TSource> DistinctUntilChangedAwaitWithCancellation<TSource, TKey>(this IUniTaskAsyncEnumerable<TSource> source, Func<TSource, CancellationToken, UniTask<TKey>> keySelector)
  42. {
  43. return DistinctUntilChangedAwaitWithCancellation(source, keySelector, EqualityComparer<TKey>.Default);
  44. }
  45. public static IUniTaskAsyncEnumerable<TSource> DistinctUntilChangedAwaitWithCancellation<TSource, TKey>(this IUniTaskAsyncEnumerable<TSource> source, Func<TSource, CancellationToken, UniTask<TKey>> keySelector, IEqualityComparer<TKey> comparer)
  46. {
  47. Error.ThrowArgumentNullException(source, nameof(source));
  48. Error.ThrowArgumentNullException(keySelector, nameof(keySelector));
  49. Error.ThrowArgumentNullException(comparer, nameof(comparer));
  50. return new DistinctUntilChangedAwaitWithCancellation<TSource, TKey>(source, keySelector, comparer);
  51. }
  52. }
  53. internal sealed class DistinctUntilChanged<TSource> : IUniTaskAsyncEnumerable<TSource>
  54. {
  55. readonly IUniTaskAsyncEnumerable<TSource> source;
  56. readonly IEqualityComparer<TSource> comparer;
  57. public DistinctUntilChanged(IUniTaskAsyncEnumerable<TSource> source, IEqualityComparer<TSource> comparer)
  58. {
  59. this.source = source;
  60. this.comparer = comparer;
  61. }
  62. public IUniTaskAsyncEnumerator<TSource> GetAsyncEnumerator(CancellationToken cancellationToken = default)
  63. {
  64. return new _DistinctUntilChanged(source, comparer, cancellationToken);
  65. }
  66. sealed class _DistinctUntilChanged : MoveNextSource, IUniTaskAsyncEnumerator<TSource>
  67. {
  68. readonly IUniTaskAsyncEnumerable<TSource> source;
  69. readonly IEqualityComparer<TSource> comparer;
  70. readonly CancellationToken cancellationToken;
  71. int state = -1;
  72. IUniTaskAsyncEnumerator<TSource> enumerator;
  73. UniTask<bool>.Awaiter awaiter;
  74. Action moveNextAction;
  75. public _DistinctUntilChanged(IUniTaskAsyncEnumerable<TSource> source, IEqualityComparer<TSource> comparer, CancellationToken cancellationToken)
  76. {
  77. this.source = source;
  78. this.comparer = comparer;
  79. this.cancellationToken = cancellationToken;
  80. this.moveNextAction = MoveNext;
  81. }
  82. public TSource Current { get; private set; }
  83. public UniTask<bool> MoveNextAsync()
  84. {
  85. if (state == -2) return default;
  86. completionSource.Reset();
  87. MoveNext();
  88. return new UniTask<bool>(this, completionSource.Version);
  89. }
  90. void MoveNext()
  91. {
  92. REPEAT:
  93. try
  94. {
  95. switch (state)
  96. {
  97. case -1: // init
  98. enumerator = source.GetAsyncEnumerator(cancellationToken);
  99. awaiter = enumerator.MoveNextAsync().GetAwaiter();
  100. if (awaiter.IsCompleted)
  101. {
  102. goto case -3;
  103. }
  104. else
  105. {
  106. state = -3;
  107. awaiter.UnsafeOnCompleted(moveNextAction);
  108. return;
  109. }
  110. case -3: // first
  111. if (awaiter.GetResult())
  112. {
  113. Current = enumerator.Current;
  114. goto CONTINUE;
  115. }
  116. else
  117. {
  118. goto DONE;
  119. }
  120. case 0: // normal
  121. awaiter = enumerator.MoveNextAsync().GetAwaiter();
  122. if (awaiter.IsCompleted)
  123. {
  124. goto case 1;
  125. }
  126. else
  127. {
  128. state = 1;
  129. awaiter.UnsafeOnCompleted(moveNextAction);
  130. return;
  131. }
  132. case 1:
  133. if (awaiter.GetResult())
  134. {
  135. var v = enumerator.Current;
  136. if (!comparer.Equals(Current, v))
  137. {
  138. Current = v;
  139. goto CONTINUE;
  140. }
  141. else
  142. {
  143. state = 0;
  144. goto REPEAT;
  145. }
  146. }
  147. else
  148. {
  149. goto DONE;
  150. }
  151. case -2:
  152. default:
  153. goto DONE;
  154. }
  155. }
  156. catch (Exception ex)
  157. {
  158. state = -2;
  159. completionSource.TrySetException(ex);
  160. return;
  161. }
  162. DONE:
  163. state = -2;
  164. completionSource.TrySetResult(false);
  165. return;
  166. CONTINUE:
  167. state = 0;
  168. completionSource.TrySetResult(true);
  169. return;
  170. }
  171. public UniTask DisposeAsync()
  172. {
  173. return enumerator.DisposeAsync();
  174. }
  175. }
  176. }
  177. internal sealed class DistinctUntilChanged<TSource, TKey> : IUniTaskAsyncEnumerable<TSource>
  178. {
  179. readonly IUniTaskAsyncEnumerable<TSource> source;
  180. readonly Func<TSource, TKey> keySelector;
  181. readonly IEqualityComparer<TKey> comparer;
  182. public DistinctUntilChanged(IUniTaskAsyncEnumerable<TSource> source, Func<TSource, TKey> keySelector, IEqualityComparer<TKey> comparer)
  183. {
  184. this.source = source;
  185. this.keySelector = keySelector;
  186. this.comparer = comparer;
  187. }
  188. public IUniTaskAsyncEnumerator<TSource> GetAsyncEnumerator(CancellationToken cancellationToken = default)
  189. {
  190. return new _DistinctUntilChanged(source, keySelector, comparer, cancellationToken);
  191. }
  192. sealed class _DistinctUntilChanged : MoveNextSource, IUniTaskAsyncEnumerator<TSource>
  193. {
  194. readonly IUniTaskAsyncEnumerable<TSource> source;
  195. readonly Func<TSource, TKey> keySelector;
  196. readonly IEqualityComparer<TKey> comparer;
  197. readonly CancellationToken cancellationToken;
  198. int state = -1;
  199. IUniTaskAsyncEnumerator<TSource> enumerator;
  200. UniTask<bool>.Awaiter awaiter;
  201. Action moveNextAction;
  202. TKey prev;
  203. public _DistinctUntilChanged(IUniTaskAsyncEnumerable<TSource> source, Func<TSource, TKey> keySelector, IEqualityComparer<TKey> comparer, CancellationToken cancellationToken)
  204. {
  205. this.source = source;
  206. this.keySelector = keySelector;
  207. this.comparer = comparer;
  208. this.cancellationToken = cancellationToken;
  209. this.moveNextAction = MoveNext;
  210. }
  211. public TSource Current { get; private set; }
  212. public UniTask<bool> MoveNextAsync()
  213. {
  214. if (state == -2) return default;
  215. completionSource.Reset();
  216. MoveNext();
  217. return new UniTask<bool>(this, completionSource.Version);
  218. }
  219. void MoveNext()
  220. {
  221. REPEAT:
  222. try
  223. {
  224. switch (state)
  225. {
  226. case -1: // init
  227. enumerator = source.GetAsyncEnumerator(cancellationToken);
  228. awaiter = enumerator.MoveNextAsync().GetAwaiter();
  229. if (awaiter.IsCompleted)
  230. {
  231. goto case -3;
  232. }
  233. else
  234. {
  235. state = -3;
  236. awaiter.UnsafeOnCompleted(moveNextAction);
  237. return;
  238. }
  239. case -3: // first
  240. if (awaiter.GetResult())
  241. {
  242. Current = enumerator.Current;
  243. goto CONTINUE;
  244. }
  245. else
  246. {
  247. goto DONE;
  248. }
  249. case 0: // normal
  250. awaiter = enumerator.MoveNextAsync().GetAwaiter();
  251. if (awaiter.IsCompleted)
  252. {
  253. goto case 1;
  254. }
  255. else
  256. {
  257. state = 1;
  258. awaiter.UnsafeOnCompleted(moveNextAction);
  259. return;
  260. }
  261. case 1:
  262. if (awaiter.GetResult())
  263. {
  264. var v = enumerator.Current;
  265. var key = keySelector(v);
  266. if (!comparer.Equals(prev, key))
  267. {
  268. prev = key;
  269. Current = v;
  270. goto CONTINUE;
  271. }
  272. else
  273. {
  274. state = 0;
  275. goto REPEAT;
  276. }
  277. }
  278. else
  279. {
  280. goto DONE;
  281. }
  282. case -2:
  283. default:
  284. goto DONE;
  285. }
  286. }
  287. catch (Exception ex)
  288. {
  289. state = -2;
  290. completionSource.TrySetException(ex);
  291. return;
  292. }
  293. DONE:
  294. state = -2;
  295. completionSource.TrySetResult(false);
  296. return;
  297. CONTINUE:
  298. state = 0;
  299. completionSource.TrySetResult(true);
  300. return;
  301. }
  302. public UniTask DisposeAsync()
  303. {
  304. return enumerator.DisposeAsync();
  305. }
  306. }
  307. }
  308. internal sealed class DistinctUntilChangedAwait<TSource, TKey> : IUniTaskAsyncEnumerable<TSource>
  309. {
  310. readonly IUniTaskAsyncEnumerable<TSource> source;
  311. readonly Func<TSource, UniTask<TKey>> keySelector;
  312. readonly IEqualityComparer<TKey> comparer;
  313. public DistinctUntilChangedAwait(IUniTaskAsyncEnumerable<TSource> source, Func<TSource, UniTask<TKey>> keySelector, IEqualityComparer<TKey> comparer)
  314. {
  315. this.source = source;
  316. this.keySelector = keySelector;
  317. this.comparer = comparer;
  318. }
  319. public IUniTaskAsyncEnumerator<TSource> GetAsyncEnumerator(CancellationToken cancellationToken = default)
  320. {
  321. return new _DistinctUntilChangedAwait(source, keySelector, comparer, cancellationToken);
  322. }
  323. sealed class _DistinctUntilChangedAwait : MoveNextSource, IUniTaskAsyncEnumerator<TSource>
  324. {
  325. readonly IUniTaskAsyncEnumerable<TSource> source;
  326. readonly Func<TSource, UniTask<TKey>> keySelector;
  327. readonly IEqualityComparer<TKey> comparer;
  328. readonly CancellationToken cancellationToken;
  329. int state = -1;
  330. IUniTaskAsyncEnumerator<TSource> enumerator;
  331. UniTask<bool>.Awaiter awaiter;
  332. UniTask<TKey>.Awaiter awaiter2;
  333. Action moveNextAction;
  334. TSource enumeratorCurrent;
  335. TKey prev;
  336. public _DistinctUntilChangedAwait(IUniTaskAsyncEnumerable<TSource> source, Func<TSource, UniTask<TKey>> keySelector, IEqualityComparer<TKey> comparer, CancellationToken cancellationToken)
  337. {
  338. this.source = source;
  339. this.keySelector = keySelector;
  340. this.comparer = comparer;
  341. this.cancellationToken = cancellationToken;
  342. this.moveNextAction = MoveNext;
  343. }
  344. public TSource Current { get; private set; }
  345. public UniTask<bool> MoveNextAsync()
  346. {
  347. if (state == -2) return default;
  348. completionSource.Reset();
  349. MoveNext();
  350. return new UniTask<bool>(this, completionSource.Version);
  351. }
  352. void MoveNext()
  353. {
  354. REPEAT:
  355. try
  356. {
  357. switch (state)
  358. {
  359. case -1: // init
  360. enumerator = source.GetAsyncEnumerator(cancellationToken);
  361. awaiter = enumerator.MoveNextAsync().GetAwaiter();
  362. if (awaiter.IsCompleted)
  363. {
  364. goto case -3;
  365. }
  366. else
  367. {
  368. state = -3;
  369. awaiter.UnsafeOnCompleted(moveNextAction);
  370. return;
  371. }
  372. case -3: // first
  373. if (awaiter.GetResult())
  374. {
  375. Current = enumerator.Current;
  376. goto CONTINUE;
  377. }
  378. else
  379. {
  380. goto DONE;
  381. }
  382. case 0: // normal
  383. awaiter = enumerator.MoveNextAsync().GetAwaiter();
  384. if (awaiter.IsCompleted)
  385. {
  386. goto case 1;
  387. }
  388. else
  389. {
  390. state = 1;
  391. awaiter.UnsafeOnCompleted(moveNextAction);
  392. return;
  393. }
  394. case 1:
  395. if (awaiter.GetResult())
  396. {
  397. enumeratorCurrent = enumerator.Current;
  398. awaiter2 = keySelector(enumeratorCurrent).GetAwaiter();
  399. if (awaiter2.IsCompleted)
  400. {
  401. goto case 2;
  402. }
  403. else
  404. {
  405. state = 2;
  406. awaiter2.UnsafeOnCompleted(moveNextAction);
  407. return;
  408. }
  409. }
  410. else
  411. {
  412. goto DONE;
  413. }
  414. case 2:
  415. var key = awaiter2.GetResult();
  416. if (!comparer.Equals(prev, key))
  417. {
  418. prev = key;
  419. Current = enumeratorCurrent;
  420. goto CONTINUE;
  421. }
  422. else
  423. {
  424. state = 0;
  425. goto REPEAT;
  426. }
  427. case -2:
  428. default:
  429. goto DONE;
  430. }
  431. }
  432. catch (Exception ex)
  433. {
  434. state = -2;
  435. completionSource.TrySetException(ex);
  436. return;
  437. }
  438. DONE:
  439. state = -2;
  440. completionSource.TrySetResult(false);
  441. return;
  442. CONTINUE:
  443. state = 0;
  444. completionSource.TrySetResult(true);
  445. return;
  446. }
  447. public UniTask DisposeAsync()
  448. {
  449. return enumerator.DisposeAsync();
  450. }
  451. }
  452. }
  453. internal sealed class DistinctUntilChangedAwaitWithCancellation<TSource, TKey> : IUniTaskAsyncEnumerable<TSource>
  454. {
  455. readonly IUniTaskAsyncEnumerable<TSource> source;
  456. readonly Func<TSource, CancellationToken, UniTask<TKey>> keySelector;
  457. readonly IEqualityComparer<TKey> comparer;
  458. public DistinctUntilChangedAwaitWithCancellation(IUniTaskAsyncEnumerable<TSource> source, Func<TSource, CancellationToken, UniTask<TKey>> keySelector, IEqualityComparer<TKey> comparer)
  459. {
  460. this.source = source;
  461. this.keySelector = keySelector;
  462. this.comparer = comparer;
  463. }
  464. public IUniTaskAsyncEnumerator<TSource> GetAsyncEnumerator(CancellationToken cancellationToken = default)
  465. {
  466. return new _DistinctUntilChangedAwaitWithCancellation(source, keySelector, comparer, cancellationToken);
  467. }
  468. sealed class _DistinctUntilChangedAwaitWithCancellation : MoveNextSource, IUniTaskAsyncEnumerator<TSource>
  469. {
  470. readonly IUniTaskAsyncEnumerable<TSource> source;
  471. readonly Func<TSource, CancellationToken, UniTask<TKey>> keySelector;
  472. readonly IEqualityComparer<TKey> comparer;
  473. readonly CancellationToken cancellationToken;
  474. int state = -1;
  475. IUniTaskAsyncEnumerator<TSource> enumerator;
  476. UniTask<bool>.Awaiter awaiter;
  477. UniTask<TKey>.Awaiter awaiter2;
  478. Action moveNextAction;
  479. TSource enumeratorCurrent;
  480. TKey prev;
  481. public _DistinctUntilChangedAwaitWithCancellation(IUniTaskAsyncEnumerable<TSource> source, Func<TSource, CancellationToken, UniTask<TKey>> keySelector, IEqualityComparer<TKey> comparer, CancellationToken cancellationToken)
  482. {
  483. this.source = source;
  484. this.keySelector = keySelector;
  485. this.comparer = comparer;
  486. this.cancellationToken = cancellationToken;
  487. this.moveNextAction = MoveNext;
  488. }
  489. public TSource Current { get; private set; }
  490. public UniTask<bool> MoveNextAsync()
  491. {
  492. if (state == -2) return default;
  493. completionSource.Reset();
  494. MoveNext();
  495. return new UniTask<bool>(this, completionSource.Version);
  496. }
  497. void MoveNext()
  498. {
  499. REPEAT:
  500. try
  501. {
  502. switch (state)
  503. {
  504. case -1: // init
  505. enumerator = source.GetAsyncEnumerator(cancellationToken);
  506. awaiter = enumerator.MoveNextAsync().GetAwaiter();
  507. if (awaiter.IsCompleted)
  508. {
  509. goto case -3;
  510. }
  511. else
  512. {
  513. state = -3;
  514. awaiter.UnsafeOnCompleted(moveNextAction);
  515. return;
  516. }
  517. case -3: // first
  518. if (awaiter.GetResult())
  519. {
  520. Current = enumerator.Current;
  521. goto CONTINUE;
  522. }
  523. else
  524. {
  525. goto DONE;
  526. }
  527. case 0: // normal
  528. awaiter = enumerator.MoveNextAsync().GetAwaiter();
  529. if (awaiter.IsCompleted)
  530. {
  531. goto case 1;
  532. }
  533. else
  534. {
  535. state = 1;
  536. awaiter.UnsafeOnCompleted(moveNextAction);
  537. return;
  538. }
  539. case 1:
  540. if (awaiter.GetResult())
  541. {
  542. enumeratorCurrent = enumerator.Current;
  543. awaiter2 = keySelector(enumeratorCurrent, cancellationToken).GetAwaiter();
  544. if (awaiter2.IsCompleted)
  545. {
  546. goto case 2;
  547. }
  548. else
  549. {
  550. state = 2;
  551. awaiter2.UnsafeOnCompleted(moveNextAction);
  552. return;
  553. }
  554. }
  555. else
  556. {
  557. goto DONE;
  558. }
  559. case 2:
  560. var key = awaiter2.GetResult();
  561. if (!comparer.Equals(prev, key))
  562. {
  563. prev = key;
  564. Current = enumeratorCurrent;
  565. goto CONTINUE;
  566. }
  567. else
  568. {
  569. state = 0;
  570. goto REPEAT;
  571. }
  572. case -2:
  573. default:
  574. goto DONE;
  575. }
  576. }
  577. catch (Exception ex)
  578. {
  579. state = -2;
  580. completionSource.TrySetException(ex);
  581. return;
  582. }
  583. DONE:
  584. state = -2;
  585. completionSource.TrySetResult(false);
  586. return;
  587. CONTINUE:
  588. state = 0;
  589. completionSource.TrySetResult(true);
  590. return;
  591. }
  592. public UniTask DisposeAsync()
  593. {
  594. return enumerator.DisposeAsync();
  595. }
  596. }
  597. }
  598. }