UniTaskObservableExtensions.cs 20 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750
  1. #pragma warning disable CS1591 // Missing XML comment for publicly visible type or member
  2. using System;
  3. using System.Runtime.ExceptionServices;
  4. using System.Threading;
  5. using Cysharp.Threading.Tasks.Internal;
  6. namespace Cysharp.Threading.Tasks
  7. {
  8. public static class UniTaskObservableExtensions
  9. {
  /// <summary>
  /// Subscribes to <paramref name="source"/> and exposes the outcome as a <see cref="UniTask{T}"/>.
  /// </summary>
  /// <param name="source">Observable sequence to subscribe to.</param>
  /// <param name="useFirstValue">
  /// When true the task completes with the first value pushed by the sequence;
  /// otherwise it completes with the last value once the sequence completes.
  /// </param>
  /// <param name="cancellationToken">
  /// Token handed to the observer; when it fires the subscription is disposed and the task is canceled.
  /// </param>
  /// <returns>
  /// The task backed by the completion source. An exception thrown by <c>Subscribe</c> itself
  /// is stored on the task rather than thrown to the caller.
  /// </returns>
  public static UniTask<T> ToUniTask<T>(this IObservable<T> source, bool useFirstValue = false, CancellationToken cancellationToken = default)
  {
      var completionSource = new UniTaskCompletionSource<T>();
      var subscription = new SingleAssignmentDisposable();

      // Pick the observer flavor up front; both share the same completion source and subscription slot.
      IObserver<T> sink;
      if (useFirstValue)
      {
          sink = new FirstValueToUniTaskObserver<T>(completionSource, subscription, cancellationToken);
      }
      else
      {
          sink = new ToUniTaskObserver<T>(completionSource, subscription, cancellationToken);
      }

      try
      {
          subscription.Disposable = source.Subscribe(sink);
      }
      catch (Exception subscribeFailure)
      {
          // Surface subscription failures through the task instead of throwing synchronously.
          completionSource.TrySetException(subscribeFailure);
      }

      return completionSource.Task;
  }
  /// <summary>
  /// Wraps a <see cref="UniTask{T}"/> as an observable sequence.
  /// </summary>
  /// <param name="task">Task whose result (or exception) is forwarded to subscribers.</param>
  /// <returns>
  /// For an already-completed task, an observable that replays the result or the exception
  /// to each subscriber; otherwise an <c>AsyncSubject</c> fed when the task finishes.
  /// </returns>
  public static IObservable<T> ToObservable<T>(this UniTask<T> task)
  {
      if (!task.Status.IsCompleted())
      {
          // Still running: bridge through a subject that is signaled once the await finishes.
          var pending = new AsyncSubject<T>();
          Fire(pending, task).Forget();
          return pending;
      }

      try
      {
          var result = task.GetAwaiter().GetResult();
          return new ReturnObservable<T>(result);
      }
      catch (Exception failure)
      {
          return new ThrowObservable<T>(failure);
      }
  }
  /// <summary>
  /// Wraps a result-less <see cref="UniTask"/> as an observable sequence.
  /// Returning IObservable&lt;Unit&gt; would be ideal, but Cysharp.Threading.Tasks has no Unit type,
  /// so AsyncUnit is used instead.
  /// </summary>
  /// <param name="task">Task whose completion (or exception) is forwarded to subscribers.</param>
  /// <returns>
  /// For an already-completed task, an observable that replays <c>AsyncUnit.Default</c> or the
  /// exception to each subscriber; otherwise an <c>AsyncSubject</c> fed when the task finishes.
  /// </returns>
  public static IObservable<AsyncUnit> ToObservable(this UniTask task)
  {
      if (!task.Status.IsCompleted())
      {
          // Still running: bridge through a subject that is signaled once the await finishes.
          var pending = new AsyncSubject<AsyncUnit>();
          Fire(pending, task).Forget();
          return pending;
      }

      try
      {
          // GetResult rethrows if the task did not complete successfully.
          task.GetAwaiter().GetResult();
      }
      catch (Exception failure)
      {
          return new ThrowObservable<AsyncUnit>(failure);
      }

      return new ReturnObservable<AsyncUnit>(AsyncUnit.Default);
  }
  /// <summary>
  /// Awaits <paramref name="task"/> and relays its outcome to <paramref name="subject"/>:
  /// OnError on failure, otherwise OnNext with the result followed by OnCompleted.
  /// </summary>
  static async UniTaskVoid Fire<T>(AsyncSubject<T> subject, UniTask<T> task)
  {
      Exception failure = null;
      T result = default(T);

      try
      {
          result = await task;
      }
      catch (Exception ex)
      {
          failure = ex;
      }

      if (failure != null)
      {
          subject.OnError(failure);
          return;
      }

      subject.OnNext(result);
      subject.OnCompleted();
  }
  /// <summary>
  /// Awaits <paramref name="task"/> and relays its outcome to <paramref name="subject"/>:
  /// OnError on failure, otherwise OnNext with <c>AsyncUnit.Default</c> followed by OnCompleted.
  /// </summary>
  static async UniTaskVoid Fire(AsyncSubject<AsyncUnit> subject, UniTask task)
  {
      Exception failure = null;

      try
      {
          await task;
      }
      catch (Exception ex)
      {
          failure = ex;
      }

      if (failure != null)
      {
          subject.OnError(failure);
          return;
      }

      subject.OnNext(AsyncUnit.Default);
      subject.OnCompleted();
  }
  /// <summary>
  /// Observer that remembers the most recent value and completes the promise with it
  /// when the sequence completes. An empty sequence faults the promise.
  /// </summary>
  class ToUniTaskObserver<T> : IObserver<T>
  {
      // Cached delegate so registering for cancellation does not allocate a new one per observer.
      static readonly Action<object> cancellationCallback = HandleCancellation;

      readonly UniTaskCompletionSource<T> completionSource;
      readonly SingleAssignmentDisposable subscription;
      readonly CancellationToken token;
      readonly CancellationTokenRegistration tokenRegistration;

      bool seenValue;
      T lastSeen;

      public ToUniTaskObserver(UniTaskCompletionSource<T> promise, SingleAssignmentDisposable disposable, CancellationToken cancellationToken)
      {
          completionSource = promise;
          subscription = disposable;
          token = cancellationToken;

          // Register last: the callback reads the fields assigned above.
          if (token.CanBeCanceled)
          {
              tokenRegistration = token.RegisterWithoutCaptureExecutionContext(cancellationCallback, this);
          }
      }

      /// <summary>Cancellation handler: drops the subscription, then cancels the promise.</summary>
      static void HandleCancellation(object state)
      {
          var observer = (ToUniTaskObserver<T>)state;
          observer.subscription.Dispose();
          observer.completionSource.TrySetCanceled(observer.token);
      }

      /// <summary>Records the value; nothing is published until completion.</summary>
      public void OnNext(T value)
      {
          lastSeen = value;
          seenValue = true;
      }

      /// <summary>Faults the promise with <paramref name="error"/> and releases resources.</summary>
      public void OnError(Exception error)
      {
          try
          {
              completionSource.TrySetException(error);
          }
          finally
          {
              Release();
          }
      }

      /// <summary>
      /// Completes the promise with the last recorded value, or faults it when no value was seen.
      /// </summary>
      public void OnCompleted()
      {
          try
          {
              if (!seenValue)
              {
                  completionSource.TrySetException(new InvalidOperationException("Sequence has no elements"));
              }
              else
              {
                  completionSource.TrySetResult(lastSeen);
              }
          }
          finally
          {
              Release();
          }
      }

      // Unhooks the cancellation callback first, then disposes the upstream subscription.
      void Release()
      {
          tokenRegistration.Dispose();
          subscription.Dispose();
      }
  }
  /// <summary>
  /// Observer that completes the promise with the first value it receives and then
  /// releases the subscription. A sequence that completes without a value faults the promise.
  /// </summary>
  class FirstValueToUniTaskObserver<T> : IObserver<T>
  {
      // Cached delegate so registering for cancellation does not allocate a new one per observer.
      static readonly Action<object> cancellationCallback = HandleCancellation;

      readonly UniTaskCompletionSource<T> completionSource;
      readonly SingleAssignmentDisposable subscription;
      readonly CancellationToken token;
      readonly CancellationTokenRegistration tokenRegistration;

      bool seenValue;

      public FirstValueToUniTaskObserver(UniTaskCompletionSource<T> promise, SingleAssignmentDisposable disposable, CancellationToken cancellationToken)
      {
          completionSource = promise;
          subscription = disposable;
          token = cancellationToken;

          // Register last: the callback reads the fields assigned above.
          if (token.CanBeCanceled)
          {
              tokenRegistration = token.RegisterWithoutCaptureExecutionContext(cancellationCallback, this);
          }
      }

      /// <summary>Cancellation handler: drops the subscription, then cancels the promise.</summary>
      static void HandleCancellation(object state)
      {
          var observer = (FirstValueToUniTaskObserver<T>)state;
          observer.subscription.Dispose();
          observer.completionSource.TrySetCanceled(observer.token);
      }

      /// <summary>Publishes <paramref name="value"/> as the result and releases resources.</summary>
      public void OnNext(T value)
      {
          seenValue = true;
          try
          {
              completionSource.TrySetResult(value);
          }
          finally
          {
              Release();
          }
      }

      /// <summary>Faults the promise with <paramref name="error"/> and releases resources.</summary>
      public void OnError(Exception error)
      {
          try
          {
              completionSource.TrySetException(error);
          }
          finally
          {
              Release();
          }
      }

      /// <summary>Faults the promise only when no value was ever received; always releases resources.</summary>
      public void OnCompleted()
      {
          try
          {
              if (seenValue)
              {
                  return;
              }

              completionSource.TrySetException(new InvalidOperationException("Sequence has no elements"));
          }
          finally
          {
              Release();
          }
      }

      // Unhooks the cancellation callback first, then disposes the upstream subscription.
      void Release()
      {
          tokenRegistration.Dispose();
          subscription.Dispose();
      }
  }
  /// <summary>
  /// Observable that synchronously pushes one stored value followed by completion
  /// to every subscriber.
  /// </summary>
  class ReturnObservable<T> : IObservable<T>
  {
      readonly T payload;

      public ReturnObservable(T value)
      {
          payload = value;
      }

      /// <summary>Emits the stored value and completion inline; there is nothing to unsubscribe from.</summary>
      public IDisposable Subscribe(IObserver<T> observer)
      {
          observer.OnNext(payload);
          observer.OnCompleted();

          return EmptyDisposable.Instance;
      }
  }
  /// <summary>
  /// Observable that synchronously signals one stored exception to every subscriber.
  /// </summary>
  class ThrowObservable<T> : IObservable<T>
  {
      readonly Exception failure;

      public ThrowObservable(Exception value)
      {
          failure = value;
      }

      /// <summary>Reports the stored exception inline; there is nothing to unsubscribe from.</summary>
      public IDisposable Subscribe(IObserver<T> observer)
      {
          observer.OnError(failure);

          return EmptyDisposable.Instance;
      }
  }
  248. }
  249. }
  250. namespace Cysharp.Threading.Tasks.Internal
  251. {
  252. // Bridges for Rx.
  /// <summary>
  /// Disposable whose <see cref="Dispose"/> does nothing. Returned by subscriptions
  /// that have nothing to release.
  /// </summary>
  internal class EmptyDisposable : IDisposable
  {
      /// <summary>
      /// Shared instance. Declared readonly so the singleton cannot be replaced
      /// (for example with null) after type initialization.
      /// </summary>
      public static readonly EmptyDisposable Instance = new EmptyDisposable();

      // Private: the shared Instance is the only instance.
      EmptyDisposable()
      {
      }

      /// <summary>No-op.</summary>
      public void Dispose()
      {
      }
  }
  /// <summary>
  /// Holder for a disposable that may be assigned exactly once.
  /// Disposing the holder disposes the assigned resource; a resource assigned
  /// after the holder was disposed is disposed immediately.
  /// </summary>
  internal sealed class SingleAssignmentDisposable : IDisposable
  {
      readonly object gate = new object();
      IDisposable current;
      bool disposed;

      /// <summary>True once <see cref="Dispose"/> has been called.</summary>
      public bool IsDisposed { get { lock (gate) { return disposed; } } }

      /// <summary>
      /// Gets or sets the underlying disposable.
      /// Assigning null is ignored. Assigning after disposal disposes the incoming value.
      /// </summary>
      /// <exception cref="InvalidOperationException">
      /// A non-null disposable has already been assigned. The previously assigned
      /// disposable is kept, so a later <see cref="Dispose"/> still releases it.
      /// </exception>
      public IDisposable Disposable
      {
          get
          {
              return current;
          }
          set
          {
              bool alreadyDisposed;
              bool alreadyAssigned = false;
              lock (gate)
              {
                  alreadyDisposed = disposed;
                  if (!alreadyDisposed)
                  {
                      if (value == null) return;

                      if (current != null)
                      {
                          // Second assignment: keep the first resource in place instead of
                          // overwriting it, otherwise it would never be disposed.
                          alreadyAssigned = true;
                      }
                      else
                      {
                          current = value;
                      }
                  }
              }

              if (alreadyDisposed)
              {
                  // Holder is already dead; release the late arrival right away (outside the lock).
                  if (value != null) value.Dispose();
                  return;
              }

              if (alreadyAssigned) throw new InvalidOperationException("Disposable is already set");
          }
      }

      /// <summary>
      /// Marks the holder as disposed and disposes the assigned resource, if any.
      /// Only the first call has an effect.
      /// </summary>
      public void Dispose()
      {
          IDisposable old = null;
          lock (gate)
          {
              if (!disposed)
              {
                  disposed = true;
                  old = current;
                  current = null;
              }
          }

          // Dispose outside the lock so user code never runs while the gate is held.
          if (old != null) old.Dispose();
      }
  }
  /// <summary>
  /// Subject that remembers only the last value pushed into it and publishes that value
  /// (followed by completion) to its observers once the source completes. Errors are
  /// forwarded as-is. Observers subscribing after termination receive the stored outcome
  /// immediately.
  /// </summary>
  internal sealed class AsyncSubject<T> : IObservable<T>, IObserver<T>
  {
      object sync = new object();

      T latest;
      bool hasLatest;
      bool stopped;
      bool disposed;
      Exception failure;
      IObserver<T> sink = EmptyObserver<T>.Instance;

      /// <summary>
      /// The final value. Throws if the subject is disposed or not yet terminated,
      /// and rethrows the stored error if the source failed.
      /// </summary>
      public T Value
      {
          get
          {
              ThrowIfDisposed();

              if (!stopped)
              {
                  throw new InvalidOperationException("AsyncSubject is not completed yet");
              }

              if (failure != null)
              {
                  ExceptionDispatchInfo.Capture(failure).Throw();
              }

              return latest;
          }
      }

      /// <summary>True while at least one observer is attached and the subject is neither stopped nor disposed.</summary>
      public bool HasObservers
      {
          get
          {
              if (sink is EmptyObserver<T>)
              {
                  return false;
              }

              return !(stopped || disposed);
          }
      }

      /// <summary>True once OnCompleted or OnError has been received.</summary>
      public bool IsCompleted { get { return stopped; } }

      /// <summary>
      /// Terminates the subject and publishes the last value (if any) followed by completion.
      /// Ignored when already terminated.
      /// </summary>
      public void OnCompleted()
      {
          IObserver<T> target;
          T finalValue;
          bool emit;

          lock (sync)
          {
              ThrowIfDisposed();
              if (stopped) return;

              target = sink;
              sink = EmptyObserver<T>.Instance;
              stopped = true;
              finalValue = latest;
              emit = hasLatest;
          }

          // Notify outside the lock so observer callbacks cannot run while it is held.
          if (emit)
          {
              target.OnNext(finalValue);
          }

          target.OnCompleted();
      }

      /// <summary>
      /// Terminates the subject with <paramref name="error"/> and forwards it to the observers.
      /// Ignored when already terminated.
      /// </summary>
      public void OnError(Exception error)
      {
          if (error == null) throw new ArgumentNullException("error");

          IObserver<T> target;

          lock (sync)
          {
              ThrowIfDisposed();
              if (stopped) return;

              target = sink;
              sink = EmptyObserver<T>.Instance;
              stopped = true;
              failure = error;
          }

          target.OnError(error);
      }

      /// <summary>Stores <paramref name="value"/> as the latest value; nothing is published yet.</summary>
      public void OnNext(T value)
      {
          lock (sync)
          {
              ThrowIfDisposed();
              if (stopped) return;

              hasLatest = true;
              latest = value;
          }
      }

      /// <summary>
      /// Attaches <paramref name="observer"/>. Before termination the observer is added to the
      /// current set and a removal handle is returned; after termination the stored outcome
      /// is replayed immediately and an empty disposable is returned.
      /// </summary>
      public IDisposable Subscribe(IObserver<T> observer)
      {
          if (observer == null) throw new ArgumentNullException("observer");

          Exception storedError;
          T storedValue;
          bool emit;

          lock (sync)
          {
              ThrowIfDisposed();

              if (!stopped)
              {
                  var list = sink as ListObserver<T>;
                  if (list != null)
                  {
                      // Already multicasting: extend the list.
                      sink = list.Add(observer);
                  }
                  else if (sink is EmptyObserver<T>)
                  {
                      // First observer: store it directly.
                      sink = observer;
                  }
                  else
                  {
                      // Second observer: promote the single observer to a list.
                      sink = new ListObserver<T>(new ImmutableList<IObserver<T>>(new IObserver<T>[] { sink, observer }));
                  }

                  return new Subscription(this, observer);
              }

              storedError = failure;
              storedValue = latest;
              emit = hasLatest;
          }

          // Already terminated: replay the outcome outside the lock.
          if (storedError != null)
          {
              observer.OnError(storedError);
              return EmptyDisposable.Instance;
          }

          if (emit)
          {
              observer.OnNext(storedValue);
          }

          observer.OnCompleted();
          return EmptyDisposable.Instance;
      }

      /// <summary>Marks the subject disposed and clears the stored value and error.</summary>
      public void Dispose()
      {
          lock (sync)
          {
              disposed = true;
              sink = DisposedObserver<T>.Instance;
              failure = null;
              latest = default(T);
          }
      }

      void ThrowIfDisposed()
      {
          if (disposed) throw new ObjectDisposedException("");
      }

      /// <summary>Handle returned from Subscribe; disposing it detaches the observer from the parent.</summary>
      class Subscription : IDisposable
      {
          readonly object gate = new object();
          AsyncSubject<T> parent;
          IObserver<T> unsubscribeTarget;

          public Subscription(AsyncSubject<T> parent, IObserver<T> unsubscribeTarget)
          {
              this.parent = parent;
              this.unsubscribeTarget = unsubscribeTarget;
          }

          public void Dispose()
          {
              lock (gate)
              {
                  var owner = parent;
                  if (owner == null)
                  {
                      // Already detached.
                      return;
                  }

                  lock (owner.sync)
                  {
                      var list = owner.sink as ListObserver<T>;
                      if (list == null)
                      {
                          owner.sink = EmptyObserver<T>.Instance;
                      }
                      else
                      {
                          owner.sink = list.Remove(unsubscribeTarget);
                      }

                      unsubscribeTarget = null;
                      parent = null;
                  }
              }
          }
      }
  }
  /// <summary>
  /// Observer that fans every notification out to a fixed list of observers.
  /// Add and Remove return a new observer instead of changing this one.
  /// </summary>
  internal class ListObserver<T> : IObserver<T>
  {
      private readonly ImmutableList<IObserver<T>> _observers;

      public ListObserver(ImmutableList<IObserver<T>> observers)
      {
          _observers = observers;
      }

      /// <summary>Forwards completion to each observer in list order.</summary>
      public void OnCompleted()
      {
          foreach (var target in _observers.Data)
          {
              target.OnCompleted();
          }
      }

      /// <summary>Forwards <paramref name="error"/> to each observer in list order.</summary>
      public void OnError(Exception error)
      {
          foreach (var target in _observers.Data)
          {
              target.OnError(error);
          }
      }

      /// <summary>Forwards <paramref name="value"/> to each observer in list order.</summary>
      public void OnNext(T value)
      {
          foreach (var target in _observers.Data)
          {
              target.OnNext(value);
          }
      }

      /// <summary>Returns a new list observer that also contains <paramref name="observer"/>.</summary>
      internal IObserver<T> Add(IObserver<T> observer)
      {
          var extended = _observers.Add(observer);
          return new ListObserver<T>(extended);
      }

      /// <summary>
      /// Returns an observer without <paramref name="observer"/>: this instance when it is not
      /// present, the single remaining observer when the list had two entries, otherwise a
      /// new, shorter list observer.
      /// </summary>
      internal IObserver<T> Remove(IObserver<T> observer)
      {
          var index = Array.IndexOf(_observers.Data, observer);
          if (index < 0)
          {
              return this;
          }

          if (_observers.Data.Length != 2)
          {
              return new ListObserver<T>(_observers.Remove(observer));
          }

          // Two entries: hand back the other one directly instead of a one-element list.
          return _observers.Data[1 - index];
      }
  }
  /// <summary>
  /// Observer that ignores every notification. Exposed only through <see cref="Instance"/>.
  /// </summary>
  internal class EmptyObserver<T> : IObserver<T>
  {
      /// <summary>Shared instance for this element type.</summary>
      public static readonly EmptyObserver<T> Instance = new EmptyObserver<T>();

      // Private: Instance is the only instance.
      EmptyObserver() { }

      /// <summary>Ignored.</summary>
      public void OnNext(T value) { }

      /// <summary>Ignored.</summary>
      public void OnError(Exception error) { }

      /// <summary>Ignored.</summary>
      public void OnCompleted() { }
  }
  /// <summary>
  /// Observer that ignores values and completion but rethrows any error it receives.
  /// Exposed only through <see cref="Instance"/>.
  /// </summary>
  internal class ThrowObserver<T> : IObserver<T>
  {
      /// <summary>Shared instance for this element type.</summary>
      public static readonly ThrowObserver<T> Instance = new ThrowObserver<T>();

      // Private: Instance is the only instance.
      ThrowObserver() { }

      /// <summary>Ignored.</summary>
      public void OnNext(T value) { }

      /// <summary>Rethrows <paramref name="error"/> via ExceptionDispatchInfo.</summary>
      public void OnError(Exception error) => ExceptionDispatchInfo.Capture(error).Throw();

      /// <summary>Ignored.</summary>
      public void OnCompleted() { }
  }
  /// <summary>
  /// Observer whose every notification throws <see cref="ObjectDisposedException"/>.
  /// Exposed only through <see cref="Instance"/>.
  /// </summary>
  internal class DisposedObserver<T> : IObserver<T>
  {
      /// <summary>Shared instance for this element type.</summary>
      public static readonly DisposedObserver<T> Instance = new DisposedObserver<T>();

      // Private: Instance is the only instance.
      DisposedObserver() { }

      /// <summary>Always throws <see cref="ObjectDisposedException"/>.</summary>
      public void OnNext(T value) => Fail();

      /// <summary>Always throws <see cref="ObjectDisposedException"/>.</summary>
      public void OnError(Exception error) => Fail();

      /// <summary>Always throws <see cref="ObjectDisposedException"/>.</summary>
      public void OnCompleted() => Fail();

      // Single throw site shared by all three notifications.
      static void Fail()
      {
          throw new ObjectDisposedException("");
      }
  }
  /// <summary>
  /// Minimal array-backed list whose Add and Remove return a new list instead of
  /// changing the receiver.
  /// </summary>
  internal class ImmutableList<T>
  {
      /// <summary>Shared empty list.</summary>
      public static readonly ImmutableList<T> Empty = new ImmutableList<T>();

      T[] data;

      /// <summary>The backing array itself (no copy is made).</summary>
      public T[] Data => data;

      ImmutableList()
      {
          data = new T[0];
      }

      /// <summary>Wraps <paramref name="data"/> directly; the array is not copied.</summary>
      public ImmutableList(T[] data)
      {
          this.data = data;
      }

      /// <summary>Returns a new list with <paramref name="value"/> appended.</summary>
      public ImmutableList<T> Add(T value)
      {
          var count = data.Length;
          var grown = new T[count + 1];
          for (var i = 0; i < count; i++)
          {
              grown[i] = data[i];
          }
          grown[count] = value;

          return new ImmutableList<T>(grown);
      }

      /// <summary>
      /// Returns a list without the first occurrence of <paramref name="value"/>:
      /// this instance when the value is absent, <see cref="Empty"/> when it was the only element.
      /// </summary>
      public ImmutableList<T> Remove(T value)
      {
          var index = IndexOf(value);
          if (index < 0)
          {
              return this;
          }

          var count = data.Length;
          if (count == 1)
          {
              return Empty;
          }

          // Copy every element except the one at index.
          var shrunk = new T[count - 1];
          for (int src = 0, dst = 0; src < count; src++)
          {
              if (src == index) continue;
              shrunk[dst++] = data[src];
          }

          return new ImmutableList<T>(shrunk);
      }

      /// <summary>Index of the first element equal to <paramref name="value"/>, or -1.</summary>
      public int IndexOf(T value)
      {
          var position = 0;
          while (position < data.Length)
          {
              // ImmutableList is only used for IObserver (no boxing concern).
              if (object.Equals(data[position], value))
              {
                  return position;
              }
              position++;
          }

          return -1;
      }
  }
  633. }