AsyncReactiveProperty.cs 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644
  1. using System;
  2. using System.Threading;
  3. namespace Cysharp.Threading.Tasks
  4. {
  /// <summary>
  /// Read-only view of a reactive property: exposes the current value, can be enumerated
  /// asynchronously as an <see cref="IUniTaskAsyncEnumerable{T}"/>, and can be awaited once.
  /// </summary>
  /// <typeparam name="T">Type of the value held by the property.</typeparam>
  public interface IReadOnlyAsyncReactiveProperty<T> : IUniTaskAsyncEnumerable<T>
  {
      /// <summary>Gets the value currently held by the property.</summary>
      T Value { get; }

      /// <summary>
      /// Returns an async sequence over this property. The implementations in this file
      /// use it to skip publishing the current value on the first move.
      /// </summary>
      IUniTaskAsyncEnumerable<T> WithoutCurrent();

      /// <summary>Returns a task that produces a value of the property.</summary>
      /// <param name="cancellationToken">Token used to cancel the wait.</param>
      UniTask<T> WaitAsync(CancellationToken cancellationToken = default);
  }
  /// <summary>
  /// Writable reactive property: extends <see cref="IReadOnlyAsyncReactiveProperty{T}"/>
  /// with a settable <see cref="Value"/>.
  /// </summary>
  /// <typeparam name="T">Type of the value held by the property.</typeparam>
  public interface IAsyncReactiveProperty<T> : IReadOnlyAsyncReactiveProperty<T>
  {
      /// <summary>
      /// Gets or sets the current value. Declared with <c>new</c> because it hides the
      /// get-only <c>Value</c> of the read-only base interface.
      /// </summary>
      new T Value { get; set; }
  }
  /// <summary>
  /// Mutable holder of a single <typeparamref name="T"/> value. Every assignment to
  /// <see cref="Value"/> is stored and then passed to the internal trigger event, to which
  /// the async enumerators and <see cref="WaitAsync"/> sources defined below register themselves.
  /// </summary>
  /// <typeparam name="T">Type of the stored value.</typeparam>
  [Serializable]
  public class AsyncReactiveProperty<T> : IAsyncReactiveProperty<T>, IDisposable
  {
      // Computed once per closed generic type; used by ToString to pick the call form.
      static readonly bool isValueType = typeof(T).IsValueType;

      // Handler list. It is assigned `default` in the constructor and then used directly,
      // so it is treated as a mutable value here: do not mark it readonly.
      TriggerEvent<T> triggerEvent;

#if UNITY_2018_3_OR_NEWER
      [UnityEngine.SerializeField]
#endif
      T latestValue;

      /// <summary>
      /// Gets the latest value, or stores a new one and forwards it to
      /// <c>triggerEvent.SetResult</c> (presumably delivering it to each registered
      /// handler's <c>OnNext</c> - confirm against <c>TriggerEvent&lt;T&gt;</c>).
      /// The setter forwards unconditionally; it does not compare with the previous value.
      /// </summary>
      public T Value
      {
          get => latestValue;
          set
          {
              latestValue = value;
              triggerEvent.SetResult(value);
          }
      }

      /// <summary>Creates the property with <paramref name="value"/> as its initial value.</summary>
      /// <param name="value">Initial value returned by <see cref="Value"/>.</param>
      public AsyncReactiveProperty(T value)
      {
          latestValue = value;
          triggerEvent = default;
      }

      /// <summary>
      /// Returns a sequence whose enumerators do not publish the current value on their
      /// first move; they only complete a move when a later value arrives.
      /// </summary>
      public IUniTaskAsyncEnumerable<T> WithoutCurrent()
      {
          return new WithoutCurrentEnumerable(this);
      }

      /// <summary>
      /// Creates an enumerator whose first <c>MoveNextAsync</c> completes immediately with
      /// the value held at that moment, and whose later moves wait for new values.
      /// </summary>
      /// <param name="cancellationToken">Token that, when canceled, disposes the enumerator.</param>
      public IUniTaskAsyncEnumerator<T> GetAsyncEnumerator(CancellationToken cancellationToken)
      {
          return new Enumerator(this, cancellationToken, true);
      }

      /// <summary>
      /// Signals completion through <c>triggerEvent.SetCompleted</c>. The handlers in this
      /// file react in their <c>OnCompleted</c>: enumerators finish the pending move with
      /// <c>false</c>, and pending <see cref="WaitAsync"/> sources complete as canceled.
      /// </summary>
      public void Dispose()
      {
          triggerEvent.SetCompleted();
      }

      /// <summary>Unwraps the property to its current value.</summary>
      public static implicit operator T(AsyncReactiveProperty<T> value)
      {
          return value.Value;
      }

      /// <summary>
      /// Returns the string form of the current value; for reference types a null value
      /// yields null instead of throwing.
      /// </summary>
      public override string ToString()
      {
          // Value types take the direct call; reference types need the null-conditional.
          return isValueType ? latestValue.ToString() : latestValue?.ToString();
      }

      /// <summary>
      /// Returns a task completed by the next notification from the trigger event: a value
      /// (<c>OnNext</c>), an error, or cancellation. Completion of the property
      /// (<see cref="Dispose"/>) is reported as cancellation.
      /// </summary>
      /// <param name="cancellationToken">
      /// Token that cancels the wait; if it is already canceled a canceled task is returned.
      /// </param>
      public UniTask<T> WaitAsync(CancellationToken cancellationToken = default)
      {
          return new UniTask<T>(WaitAsyncSource.Create(this, cancellationToken, out var token), token);
      }

      /// <summary>
      /// Pooled, one-shot task source behind <see cref="WaitAsync"/>. It registers itself as a
      /// trigger handler, completes on the first notification, and recycles itself into the
      /// pool when its result is read.
      /// </summary>
      sealed class WaitAsyncSource : IUniTaskSource<T>, ITriggerHandler<T>, ITaskPoolNode<WaitAsyncSource>
      {
          static readonly Action<object> cancellationCallback = CancellationCallback;

          // Not readonly: TryPop/TryPush are called on it directly.
          static TaskPool<WaitAsyncSource> pool;
          WaitAsyncSource nextNode;
          ref WaitAsyncSource ITaskPoolNode<WaitAsyncSource>.NextNode => ref nextNode;

          static WaitAsyncSource()
          {
              TaskPool.RegisterSizeGetter(typeof(WaitAsyncSource), () => pool.Size);
          }

          AsyncReactiveProperty<T> parent;
          CancellationToken cancellationToken;
          CancellationTokenRegistration cancellationTokenRegistration;
          UniTaskCompletionSourceCore<T> core;

          WaitAsyncSource()
          {
          }

          /// <summary>
          /// Takes a source from the pool (or allocates one), binds it to
          /// <paramref name="parent"/> and registers it for notifications.
          /// </summary>
          /// <param name="parent">Property whose next notification completes the source.</param>
          /// <param name="cancellationToken">Token that cancels the wait.</param>
          /// <param name="token">Version token to pass along with the returned source.</param>
          public static IUniTaskSource<T> Create(AsyncReactiveProperty<T> parent, CancellationToken cancellationToken, out short token)
          {
              // Already canceled: no need to take a pooled source or touch the parent.
              if (cancellationToken.IsCancellationRequested)
              {
                  return AutoResetUniTaskCompletionSource<T>.CreateFromCanceled(cancellationToken, out token);
              }

              if (!pool.TryPop(out var result))
              {
                  result = new WaitAsyncSource();
              }

              result.parent = parent;
              result.cancellationToken = cancellationToken;

              if (cancellationToken.CanBeCanceled)
              {
                  result.cancellationTokenRegistration = cancellationToken.RegisterWithoutCaptureExecutionContext(cancellationCallback, result);
              }

              result.parent.triggerEvent.Add(result);

              TaskTracker.TrackActiveTask(result, 3);

              token = result.core.Version;
              return result;
          }

          /// <summary>
          /// Clears all per-use state (tracking, core, token registration, parent link) and
          /// tries to push the instance back into the pool.
          /// </summary>
          /// <returns>The result of <c>pool.TryPush</c>.</returns>
          bool TryReturn()
          {
              TaskTracker.RemoveTracking(this);
              core.Reset();
              cancellationTokenRegistration.Dispose();
              cancellationTokenRegistration = default;
              parent.triggerEvent.Remove(this);
              parent = null;
              cancellationToken = default;
              return pool.TryPush(this);
          }

          static void CancellationCallback(object state)
          {
              var self = (WaitAsyncSource)state;
              self.OnCanceled(self.cancellationToken);
          }

          // IUniTaskSource

          /// <summary>Returns the result and recycles the source, whether the read succeeds or throws.</summary>
          public T GetResult(short token)
          {
              try
              {
                  return core.GetResult(token);
              }
              finally
              {
                  TryReturn();
              }
          }

          void IUniTaskSource.GetResult(short token)
          {
              GetResult(token);
          }

          public void OnCompleted(Action<object> continuation, object state, short token)
          {
              core.OnCompleted(continuation, state, token);
          }

          public UniTaskStatus GetStatus(short token)
          {
              return core.GetStatus(token);
          }

          public UniTaskStatus UnsafeGetStatus()
          {
              return core.UnsafeGetStatus();
          }

          // ITriggerHandler

          ITriggerHandler<T> ITriggerHandler<T>.Prev { get; set; }
          ITriggerHandler<T> ITriggerHandler<T>.Next { get; set; }

          public void OnCanceled(CancellationToken cancellationToken)
          {
              core.TrySetCanceled(cancellationToken);
          }

          public void OnCompleted()
          {
              // Complete as Cancel.
              core.TrySetCanceled(CancellationToken.None);
          }

          public void OnError(Exception ex)
          {
              core.TrySetException(ex);
          }

          public void OnNext(T value)
          {
              core.TrySetResult(value);
          }
      }

      /// <summary>Sequence wrapper whose enumerators skip the initial publication of the current value.</summary>
      sealed class WithoutCurrentEnumerable : IUniTaskAsyncEnumerable<T>
      {
          readonly AsyncReactiveProperty<T> parent;

          public WithoutCurrentEnumerable(AsyncReactiveProperty<T> parent)
          {
              this.parent = parent;
          }

          public IUniTaskAsyncEnumerator<T> GetAsyncEnumerator(CancellationToken cancellationToken = default)
          {
              return new Enumerator(parent, cancellationToken, false);
          }
      }

      /// <summary>
      /// Async enumerator over the property's values. It registers itself as a trigger
      /// handler on construction and unregisters in <see cref="DisposeAsync"/>.
      /// </summary>
      sealed class Enumerator : MoveNextSource, IUniTaskAsyncEnumerator<T>, ITriggerHandler<T>
      {
          static readonly Action<object> cancellationCallback = CancellationCallback;

          readonly AsyncReactiveProperty<T> parent;
          readonly CancellationToken cancellationToken;

          // Not readonly so that Dispose() is invoked on the stored registration itself.
          CancellationTokenRegistration cancellationTokenRegistration;
          T value;
          bool isDisposed;
          bool firstCall;

          /// <param name="parent">Property to observe.</param>
          /// <param name="cancellationToken">Token that disposes this enumerator when canceled.</param>
          /// <param name="publishCurrentValue">
          /// When true, the first <see cref="MoveNextAsync"/> completes immediately with the parent's current value.
          /// </param>
          public Enumerator(AsyncReactiveProperty<T> parent, CancellationToken cancellationToken, bool publishCurrentValue)
          {
              this.parent = parent;
              this.cancellationToken = cancellationToken;
              this.firstCall = publishCurrentValue;

              parent.triggerEvent.Add(this);
              TaskTracker.TrackActiveTask(this, 3);

              if (cancellationToken.CanBeCanceled)
              {
                  cancellationTokenRegistration = cancellationToken.RegisterWithoutCaptureExecutionContext(cancellationCallback, this);
              }
          }

          public T Current => value;

          ITriggerHandler<T> ITriggerHandler<T>.Prev { get; set; }
          ITriggerHandler<T> ITriggerHandler<T>.Next { get; set; }

          /// <summary>
          /// Advances to the next value. Completes with true when a value is available, with
          /// false when the parent signals completion, and as canceled when the enumerator
          /// has been disposed or its token canceled.
          /// </summary>
          public UniTask<bool> MoveNextAsync()
          {
              // raise latest value on first call.
              if (firstCall)
              {
                  firstCall = false;
                  value = parent.Value;
                  return CompletedTasks.True;
              }

              completionSource.Reset();

              // Once disposed, this enumerator is no longer registered with the parent, so no
              // notification could ever complete the freshly reset source. Complete it as
              // canceled instead of handing back a task that never finishes.
              if (isDisposed)
              {
                  completionSource.TrySetCanceled(cancellationToken);
              }

              return new UniTask<bool>(this, completionSource.Version);
          }

          /// <summary>
          /// Idempotently detaches the enumerator: stops tracking, cancels a pending move,
          /// unregisters from the parent and releases the cancellation-token registration.
          /// </summary>
          public UniTask DisposeAsync()
          {
              if (!isDisposed)
              {
                  isDisposed = true;
                  TaskTracker.RemoveTracking(this);
                  completionSource.TrySetCanceled(cancellationToken);
                  parent.triggerEvent.Remove(this);

                  // Drop the callback held by the token; otherwise a long-lived token keeps
                  // this enumerator referenced after it has been disposed.
                  cancellationTokenRegistration.Dispose();
              }
              return default;
          }

          public void OnNext(T value)
          {
              this.value = value;
              completionSource.TrySetResult(true);
          }

          public void OnCanceled(CancellationToken cancellationToken)
          {
              DisposeAsync().Forget();
          }

          public void OnCompleted()
          {
              completionSource.TrySetResult(false);
          }

          public void OnError(Exception ex)
          {
              completionSource.TrySetException(ex);
          }

          static void CancellationCallback(object state)
          {
              var self = (Enumerator)state;
              self.DisposeAsync().Forget();
          }
      }
  }
  /// <summary>
  /// Read-only reactive property fed by an <see cref="IUniTaskAsyncEnumerable{T}"/>. The
  /// constructor starts a background loop that copies every element of the source into
  /// <see cref="Value"/> and passes it to the internal trigger event, to which the async
  /// enumerators and <see cref="WaitAsync"/> sources defined below register themselves.
  /// </summary>
  /// <typeparam name="T">Type of the stored value.</typeparam>
  public class ReadOnlyAsyncReactiveProperty<T> : IReadOnlyAsyncReactiveProperty<T>, IDisposable
  {
      // Computed once per closed generic type; used by ToString to pick the call form.
      static readonly bool isValueType = typeof(T).IsValueType;

      // Handler list; used without ever being assigned, so it is treated as a mutable
      // value here: do not mark it readonly.
      TriggerEvent<T> triggerEvent;

      T latestValue;

      // Enumerator of the source while the consume loop is running; null otherwise.
      IUniTaskAsyncEnumerator<T> enumerator;

      /// <summary>Gets the most recent value received from the source (or the initial value).</summary>
      public T Value
      {
          get => latestValue;
      }

      /// <summary>Creates the property with an initial value and starts consuming <paramref name="source"/>.</summary>
      /// <param name="initialValue">Value reported until the source produces its first element.</param>
      /// <param name="source">Sequence whose elements become the property's values.</param>
      /// <param name="cancellationToken">Token passed to the source's enumerator.</param>
      public ReadOnlyAsyncReactiveProperty(T initialValue, IUniTaskAsyncEnumerable<T> source, CancellationToken cancellationToken)
      {
          latestValue = initialValue;
          ConsumeEnumerator(source, cancellationToken).Forget();
      }

      /// <summary>
      /// Starts consuming <paramref name="source"/>; <see cref="Value"/> is
      /// <c>default(T)</c> until the source produces its first element.
      /// </summary>
      /// <param name="source">Sequence whose elements become the property's values.</param>
      /// <param name="cancellationToken">Token passed to the source's enumerator.</param>
      public ReadOnlyAsyncReactiveProperty(IUniTaskAsyncEnumerable<T> source, CancellationToken cancellationToken)
      {
          ConsumeEnumerator(source, cancellationToken).Forget();
      }

      /// <summary>
      /// Pumps the source: each element is stored as the latest value and forwarded to
      /// <c>triggerEvent.SetResult</c>. The source enumerator is always disposed when the
      /// loop ends, whether normally or by exception.
      /// </summary>
      async UniTaskVoid ConsumeEnumerator(IUniTaskAsyncEnumerable<T> source, CancellationToken cancellationToken)
      {
          // Work on a local so the loop and the cleanup never depend on the shared field,
          // which Dispose() reads and this method clears.
          var sourceEnumerator = source.GetAsyncEnumerator(cancellationToken);
          enumerator = sourceEnumerator;
          try
          {
              while (await sourceEnumerator.MoveNextAsync())
              {
                  var value = sourceEnumerator.Current;
                  latestValue = value;
                  triggerEvent.SetResult(value);
              }
          }
          finally
          {
              // Clear the field first so it is released even if DisposeAsync throws.
              enumerator = null;
              await sourceEnumerator.DisposeAsync();
          }
      }

      /// <summary>
      /// Returns a sequence whose enumerators do not publish the current value on their
      /// first move; they only complete a move when a later value arrives.
      /// </summary>
      public IUniTaskAsyncEnumerable<T> WithoutCurrent()
      {
          return new WithoutCurrentEnumerable(this);
      }

      /// <summary>
      /// Creates an enumerator whose first <c>MoveNextAsync</c> completes immediately with
      /// the value held at that moment, and whose later moves wait for new values.
      /// </summary>
      /// <param name="cancellationToken">Token that, when canceled, disposes the enumerator.</param>
      public IUniTaskAsyncEnumerator<T> GetAsyncEnumerator(CancellationToken cancellationToken)
      {
          return new Enumerator(this, cancellationToken, true);
      }

      /// <summary>
      /// Disposes the source enumerator if the consume loop is still running, then signals
      /// completion through <c>triggerEvent.SetCompleted</c>. The handlers in this file
      /// react in their <c>OnCompleted</c>: enumerators finish the pending move with
      /// <c>false</c>, and pending <see cref="WaitAsync"/> sources complete as canceled.
      /// </summary>
      public void Dispose()
      {
          // Snapshot: the consume loop sets the field to null when it finishes.
          var running = enumerator;
          if (running != null)
          {
              running.DisposeAsync().Forget();
          }

          triggerEvent.SetCompleted();
      }

      /// <summary>Unwraps the property to its current value.</summary>
      public static implicit operator T(ReadOnlyAsyncReactiveProperty<T> value)
      {
          return value.Value;
      }

      /// <summary>
      /// Returns the string form of the current value; for reference types a null value
      /// yields null instead of throwing.
      /// </summary>
      public override string ToString()
      {
          // Value types take the direct call; reference types need the null-conditional.
          return isValueType ? latestValue.ToString() : latestValue?.ToString();
      }

      /// <summary>
      /// Returns a task completed by the next notification from the trigger event: a value
      /// (<c>OnNext</c>), an error, or cancellation. Completion of the property
      /// (<see cref="Dispose"/>) is reported as cancellation.
      /// </summary>
      /// <param name="cancellationToken">
      /// Token that cancels the wait; if it is already canceled a canceled task is returned.
      /// </param>
      public UniTask<T> WaitAsync(CancellationToken cancellationToken = default)
      {
          return new UniTask<T>(WaitAsyncSource.Create(this, cancellationToken, out var token), token);
      }

      /// <summary>
      /// Pooled, one-shot task source behind <see cref="WaitAsync"/>. It registers itself as a
      /// trigger handler, completes on the first notification, and recycles itself into the
      /// pool when its result is read.
      /// </summary>
      sealed class WaitAsyncSource : IUniTaskSource<T>, ITriggerHandler<T>, ITaskPoolNode<WaitAsyncSource>
      {
          static readonly Action<object> cancellationCallback = CancellationCallback;

          // Not readonly: TryPop/TryPush are called on it directly.
          static TaskPool<WaitAsyncSource> pool;
          WaitAsyncSource nextNode;
          ref WaitAsyncSource ITaskPoolNode<WaitAsyncSource>.NextNode => ref nextNode;

          static WaitAsyncSource()
          {
              TaskPool.RegisterSizeGetter(typeof(WaitAsyncSource), () => pool.Size);
          }

          ReadOnlyAsyncReactiveProperty<T> parent;
          CancellationToken cancellationToken;
          CancellationTokenRegistration cancellationTokenRegistration;
          UniTaskCompletionSourceCore<T> core;

          WaitAsyncSource()
          {
          }

          /// <summary>
          /// Takes a source from the pool (or allocates one), binds it to
          /// <paramref name="parent"/> and registers it for notifications.
          /// </summary>
          /// <param name="parent">Property whose next notification completes the source.</param>
          /// <param name="cancellationToken">Token that cancels the wait.</param>
          /// <param name="token">Version token to pass along with the returned source.</param>
          public static IUniTaskSource<T> Create(ReadOnlyAsyncReactiveProperty<T> parent, CancellationToken cancellationToken, out short token)
          {
              // Already canceled: no need to take a pooled source or touch the parent.
              if (cancellationToken.IsCancellationRequested)
              {
                  return AutoResetUniTaskCompletionSource<T>.CreateFromCanceled(cancellationToken, out token);
              }

              if (!pool.TryPop(out var result))
              {
                  result = new WaitAsyncSource();
              }

              result.parent = parent;
              result.cancellationToken = cancellationToken;

              if (cancellationToken.CanBeCanceled)
              {
                  result.cancellationTokenRegistration = cancellationToken.RegisterWithoutCaptureExecutionContext(cancellationCallback, result);
              }

              result.parent.triggerEvent.Add(result);

              TaskTracker.TrackActiveTask(result, 3);

              token = result.core.Version;
              return result;
          }

          /// <summary>
          /// Clears all per-use state (tracking, core, token registration, parent link) and
          /// tries to push the instance back into the pool.
          /// </summary>
          /// <returns>The result of <c>pool.TryPush</c>.</returns>
          bool TryReturn()
          {
              TaskTracker.RemoveTracking(this);
              core.Reset();
              cancellationTokenRegistration.Dispose();
              cancellationTokenRegistration = default;
              parent.triggerEvent.Remove(this);
              parent = null;
              cancellationToken = default;
              return pool.TryPush(this);
          }

          static void CancellationCallback(object state)
          {
              var self = (WaitAsyncSource)state;
              self.OnCanceled(self.cancellationToken);
          }

          // IUniTaskSource

          /// <summary>Returns the result and recycles the source, whether the read succeeds or throws.</summary>
          public T GetResult(short token)
          {
              try
              {
                  return core.GetResult(token);
              }
              finally
              {
                  TryReturn();
              }
          }

          void IUniTaskSource.GetResult(short token)
          {
              GetResult(token);
          }

          public void OnCompleted(Action<object> continuation, object state, short token)
          {
              core.OnCompleted(continuation, state, token);
          }

          public UniTaskStatus GetStatus(short token)
          {
              return core.GetStatus(token);
          }

          public UniTaskStatus UnsafeGetStatus()
          {
              return core.UnsafeGetStatus();
          }

          // ITriggerHandler

          ITriggerHandler<T> ITriggerHandler<T>.Prev { get; set; }
          ITriggerHandler<T> ITriggerHandler<T>.Next { get; set; }

          public void OnCanceled(CancellationToken cancellationToken)
          {
              core.TrySetCanceled(cancellationToken);
          }

          public void OnCompleted()
          {
              // Complete as Cancel.
              core.TrySetCanceled(CancellationToken.None);
          }

          public void OnError(Exception ex)
          {
              core.TrySetException(ex);
          }

          public void OnNext(T value)
          {
              core.TrySetResult(value);
          }
      }

      /// <summary>Sequence wrapper whose enumerators skip the initial publication of the current value.</summary>
      sealed class WithoutCurrentEnumerable : IUniTaskAsyncEnumerable<T>
      {
          readonly ReadOnlyAsyncReactiveProperty<T> parent;

          public WithoutCurrentEnumerable(ReadOnlyAsyncReactiveProperty<T> parent)
          {
              this.parent = parent;
          }

          public IUniTaskAsyncEnumerator<T> GetAsyncEnumerator(CancellationToken cancellationToken = default)
          {
              return new Enumerator(parent, cancellationToken, false);
          }
      }

      /// <summary>
      /// Async enumerator over the property's values. It registers itself as a trigger
      /// handler on construction and unregisters in <see cref="DisposeAsync"/>.
      /// </summary>
      sealed class Enumerator : MoveNextSource, IUniTaskAsyncEnumerator<T>, ITriggerHandler<T>
      {
          static readonly Action<object> cancellationCallback = CancellationCallback;

          readonly ReadOnlyAsyncReactiveProperty<T> parent;
          readonly CancellationToken cancellationToken;

          // Not readonly so that Dispose() is invoked on the stored registration itself.
          CancellationTokenRegistration cancellationTokenRegistration;
          T value;
          bool isDisposed;
          bool firstCall;

          /// <param name="parent">Property to observe.</param>
          /// <param name="cancellationToken">Token that disposes this enumerator when canceled.</param>
          /// <param name="publishCurrentValue">
          /// When true, the first <see cref="MoveNextAsync"/> completes immediately with the parent's current value.
          /// </param>
          public Enumerator(ReadOnlyAsyncReactiveProperty<T> parent, CancellationToken cancellationToken, bool publishCurrentValue)
          {
              this.parent = parent;
              this.cancellationToken = cancellationToken;
              this.firstCall = publishCurrentValue;

              parent.triggerEvent.Add(this);
              TaskTracker.TrackActiveTask(this, 3);

              if (cancellationToken.CanBeCanceled)
              {
                  cancellationTokenRegistration = cancellationToken.RegisterWithoutCaptureExecutionContext(cancellationCallback, this);
              }
          }

          public T Current => value;

          ITriggerHandler<T> ITriggerHandler<T>.Prev { get; set; }
          ITriggerHandler<T> ITriggerHandler<T>.Next { get; set; }

          /// <summary>
          /// Advances to the next value. Completes with true when a value is available, with
          /// false when the parent signals completion, and as canceled when the enumerator
          /// has been disposed or its token canceled.
          /// </summary>
          public UniTask<bool> MoveNextAsync()
          {
              // raise latest value on first call.
              if (firstCall)
              {
                  firstCall = false;
                  value = parent.Value;
                  return CompletedTasks.True;
              }

              completionSource.Reset();

              // Once disposed, this enumerator is no longer registered with the parent, so no
              // notification could ever complete the freshly reset source. Complete it as
              // canceled instead of handing back a task that never finishes.
              if (isDisposed)
              {
                  completionSource.TrySetCanceled(cancellationToken);
              }

              return new UniTask<bool>(this, completionSource.Version);
          }

          /// <summary>
          /// Idempotently detaches the enumerator: stops tracking, cancels a pending move,
          /// unregisters from the parent and releases the cancellation-token registration.
          /// </summary>
          public UniTask DisposeAsync()
          {
              if (!isDisposed)
              {
                  isDisposed = true;
                  TaskTracker.RemoveTracking(this);
                  completionSource.TrySetCanceled(cancellationToken);
                  parent.triggerEvent.Remove(this);

                  // Drop the callback held by the token; otherwise a long-lived token keeps
                  // this enumerator referenced after it has been disposed.
                  cancellationTokenRegistration.Dispose();
              }
              return default;
          }

          public void OnNext(T value)
          {
              this.value = value;
              completionSource.TrySetResult(true);
          }

          public void OnCanceled(CancellationToken cancellationToken)
          {
              DisposeAsync().Forget();
          }

          public void OnCompleted()
          {
              completionSource.TrySetResult(false);
          }

          public void OnError(Exception ex)
          {
              completionSource.TrySetException(ex);
          }

          static void CancellationCallback(object state)
          {
              var self = (Enumerator)state;
              self.DisposeAsync().Forget();
          }
      }
  }
  /// <summary>
  /// Extension methods that wrap an <see cref="IUniTaskAsyncEnumerable{T}"/> in a
  /// <see cref="ReadOnlyAsyncReactiveProperty{T}"/>.
  /// </summary>
  public static class StateExtensions
  {
      /// <summary>
      /// Creates a read-only reactive property fed by <paramref name="source"/>, with no
      /// explicit initial value.
      /// </summary>
      /// <param name="source">Sequence whose elements become the property's values.</param>
      /// <param name="cancellationToken">Token handed to the property's constructor.</param>
      public static ReadOnlyAsyncReactiveProperty<T> ToReadOnlyAsyncReactiveProperty<T>(this IUniTaskAsyncEnumerable<T> source, CancellationToken cancellationToken)
          => new ReadOnlyAsyncReactiveProperty<T>(source, cancellationToken);

      /// <summary>
      /// Creates a read-only reactive property fed by <paramref name="source"/> that starts
      /// out holding <paramref name="initialValue"/>.
      /// </summary>
      /// <param name="source">Sequence whose elements become the property's values.</param>
      /// <param name="initialValue">Value held before the source produces an element.</param>
      /// <param name="cancellationToken">Token handed to the property's constructor.</param>
      public static ReadOnlyAsyncReactiveProperty<T> ToReadOnlyAsyncReactiveProperty<T>(this IUniTaskAsyncEnumerable<T> source, T initialValue, CancellationToken cancellationToken)
          => new ReadOnlyAsyncReactiveProperty<T>(initialValue, source, cancellationToken);
  }
  526. }