// Subscribe.cs
  1. using Cysharp.Threading.Tasks.Internal;
  2. using System;
  3. using System.Threading;
  4. using Subscribes = Cysharp.Threading.Tasks.Linq.Subscribe;
  5. namespace Cysharp.Threading.Tasks.Linq
  6. {
  // Subscribe / SubscribeAwait extension methods for IUniTaskAsyncEnumerable<T>.
  //
  // Conventions shared by every overload in this part of the class:
  //  * Arguments are validated up front through Error.ThrowArgumentNullException.
  //  * The enumeration itself is started fire-and-forget (the UniTaskVoid is Forget()-ed).
  //  * Overloads returning IDisposable create their own CancellationTokenDisposable;
  //    disposing it cancels the token that was handed to the enumeration.
  //  * Overloads taking a CancellationToken return void and use the caller's token.
  //  * When no error / completion callback is supplied, Subscribes.NopError /
  //    Subscribes.NopCompleted are passed instead.
  public static partial class UniTaskAsyncEnumerable
  {
      // OnNext

      /// <summary>Subscribes to <paramref name="source"/> and invokes <paramref name="action"/> for each element.</summary>
      /// <returns>A handle whose disposal cancels the subscription's token.</returns>
      public static IDisposable Subscribe<TSource>(this IUniTaskAsyncEnumerable<TSource> source, Action<TSource> action)
      {
          Error.ThrowArgumentNullException(source, nameof(source));
          Error.ThrowArgumentNullException(action, nameof(action));

          var subscription = new CancellationTokenDisposable();
          var token = subscription.Token;
          Subscribes.SubscribeCore(source, action, Subscribes.NopError, Subscribes.NopCompleted, token).Forget();
          return subscription;
      }

      /// <summary>Subscribes to <paramref name="source"/>; each <c>UniTaskVoid</c> produced by <paramref name="action"/> is forgotten rather than awaited.</summary>
      /// <returns>A handle whose disposal cancels the subscription's token.</returns>
      public static IDisposable Subscribe<TSource>(this IUniTaskAsyncEnumerable<TSource> source, Func<TSource, UniTaskVoid> action)
      {
          Error.ThrowArgumentNullException(source, nameof(source));
          Error.ThrowArgumentNullException(action, nameof(action));

          var subscription = new CancellationTokenDisposable();
          var token = subscription.Token;
          Subscribes.SubscribeCore(source, action, Subscribes.NopError, Subscribes.NopCompleted, token).Forget();
          return subscription;
      }

      /// <summary>Subscribes to <paramref name="source"/>; <paramref name="action"/> receives each element together with the subscription's token and its <c>UniTaskVoid</c> is forgotten.</summary>
      /// <returns>A handle whose disposal cancels the subscription's token.</returns>
      public static IDisposable Subscribe<TSource>(this IUniTaskAsyncEnumerable<TSource> source, Func<TSource, CancellationToken, UniTaskVoid> action)
      {
          Error.ThrowArgumentNullException(source, nameof(source));
          Error.ThrowArgumentNullException(action, nameof(action));

          var subscription = new CancellationTokenDisposable();
          var token = subscription.Token;
          Subscribes.SubscribeCore(source, action, Subscribes.NopError, Subscribes.NopCompleted, token).Forget();
          return subscription;
      }

      /// <summary>Subscribes to <paramref name="source"/> using the caller's <paramref name="cancellationToken"/> and invokes <paramref name="action"/> for each element.</summary>
      public static void Subscribe<TSource>(this IUniTaskAsyncEnumerable<TSource> source, Action<TSource> action, CancellationToken cancellationToken)
      {
          Error.ThrowArgumentNullException(source, nameof(source));
          Error.ThrowArgumentNullException(action, nameof(action));

          Subscribes.SubscribeCore(source, action, Subscribes.NopError, Subscribes.NopCompleted, cancellationToken).Forget();
      }

      /// <summary>Subscribes to <paramref name="source"/> using the caller's <paramref name="cancellationToken"/>; each <c>UniTaskVoid</c> produced by <paramref name="action"/> is forgotten.</summary>
      public static void Subscribe<TSource>(this IUniTaskAsyncEnumerable<TSource> source, Func<TSource, UniTaskVoid> action, CancellationToken cancellationToken)
      {
          Error.ThrowArgumentNullException(source, nameof(source));
          Error.ThrowArgumentNullException(action, nameof(action));

          Subscribes.SubscribeCore(source, action, Subscribes.NopError, Subscribes.NopCompleted, cancellationToken).Forget();
      }

      /// <summary>Subscribes to <paramref name="source"/> using the caller's <paramref name="cancellationToken"/>, which is also forwarded to <paramref name="action"/> with each element.</summary>
      public static void Subscribe<TSource>(this IUniTaskAsyncEnumerable<TSource> source, Func<TSource, CancellationToken, UniTaskVoid> action, CancellationToken cancellationToken)
      {
          Error.ThrowArgumentNullException(source, nameof(source));
          Error.ThrowArgumentNullException(action, nameof(action));

          Subscribes.SubscribeCore(source, action, Subscribes.NopError, Subscribes.NopCompleted, cancellationToken).Forget();
      }

      /// <summary>Subscribes to <paramref name="source"/> and awaits <paramref name="onNext"/> for each element before the next element is requested.</summary>
      /// <returns>A handle whose disposal cancels the subscription's token.</returns>
      public static IDisposable SubscribeAwait<TSource>(this IUniTaskAsyncEnumerable<TSource> source, Func<TSource, UniTask> onNext)
      {
          Error.ThrowArgumentNullException(source, nameof(source));
          Error.ThrowArgumentNullException(onNext, nameof(onNext));

          var subscription = new CancellationTokenDisposable();
          var token = subscription.Token;
          Subscribes.SubscribeAwaitCore(source, onNext, Subscribes.NopError, Subscribes.NopCompleted, token).Forget();
          return subscription;
      }

      /// <summary>Subscribes to <paramref name="source"/> using the caller's <paramref name="cancellationToken"/> and awaits <paramref name="onNext"/> for each element.</summary>
      public static void SubscribeAwait<TSource>(this IUniTaskAsyncEnumerable<TSource> source, Func<TSource, UniTask> onNext, CancellationToken cancellationToken)
      {
          Error.ThrowArgumentNullException(source, nameof(source));
          Error.ThrowArgumentNullException(onNext, nameof(onNext));

          Subscribes.SubscribeAwaitCore(source, onNext, Subscribes.NopError, Subscribes.NopCompleted, cancellationToken).Forget();
      }

      /// <summary>Subscribes to <paramref name="source"/> and awaits <paramref name="onNext"/>, which receives each element together with the subscription's token.</summary>
      /// <returns>A handle whose disposal cancels the subscription's token.</returns>
      public static IDisposable SubscribeAwait<TSource>(this IUniTaskAsyncEnumerable<TSource> source, Func<TSource, CancellationToken, UniTask> onNext)
      {
          Error.ThrowArgumentNullException(source, nameof(source));
          Error.ThrowArgumentNullException(onNext, nameof(onNext));

          var subscription = new CancellationTokenDisposable();
          var token = subscription.Token;
          Subscribes.SubscribeAwaitCore(source, onNext, Subscribes.NopError, Subscribes.NopCompleted, token).Forget();
          return subscription;
      }

      /// <summary>Subscribes to <paramref name="source"/> using the caller's <paramref name="cancellationToken"/>, which is also forwarded to the awaited <paramref name="onNext"/>.</summary>
      public static void SubscribeAwait<TSource>(this IUniTaskAsyncEnumerable<TSource> source, Func<TSource, CancellationToken, UniTask> onNext, CancellationToken cancellationToken)
      {
          Error.ThrowArgumentNullException(source, nameof(source));
          Error.ThrowArgumentNullException(onNext, nameof(onNext));

          Subscribes.SubscribeAwaitCore(source, onNext, Subscribes.NopError, Subscribes.NopCompleted, cancellationToken).Forget();
      }

      // OnNext, OnError

      /// <summary>Subscribes with an element callback and an error callback.</summary>
      /// <returns>A handle whose disposal cancels the subscription's token.</returns>
      public static IDisposable Subscribe<TSource>(this IUniTaskAsyncEnumerable<TSource> source, Action<TSource> onNext, Action<Exception> onError)
      {
          Error.ThrowArgumentNullException(source, nameof(source));
          Error.ThrowArgumentNullException(onNext, nameof(onNext));
          Error.ThrowArgumentNullException(onError, nameof(onError));

          var subscription = new CancellationTokenDisposable();
          var token = subscription.Token;
          Subscribes.SubscribeCore(source, onNext, onError, Subscribes.NopCompleted, token).Forget();
          return subscription;
      }

      /// <summary>Subscribes with a fire-and-forget asynchronous element callback and an error callback.</summary>
      /// <returns>A handle whose disposal cancels the subscription's token.</returns>
      public static IDisposable Subscribe<TSource>(this IUniTaskAsyncEnumerable<TSource> source, Func<TSource, UniTaskVoid> onNext, Action<Exception> onError)
      {
          Error.ThrowArgumentNullException(source, nameof(source));
          Error.ThrowArgumentNullException(onNext, nameof(onNext));
          Error.ThrowArgumentNullException(onError, nameof(onError));

          var subscription = new CancellationTokenDisposable();
          var token = subscription.Token;
          Subscribes.SubscribeCore(source, onNext, onError, Subscribes.NopCompleted, token).Forget();
          return subscription;
      }

      /// <summary>Subscribes with an element callback and an error callback, using the caller's <paramref name="cancellationToken"/>.</summary>
      public static void Subscribe<TSource>(this IUniTaskAsyncEnumerable<TSource> source, Action<TSource> onNext, Action<Exception> onError, CancellationToken cancellationToken)
      {
          Error.ThrowArgumentNullException(source, nameof(source));
          Error.ThrowArgumentNullException(onNext, nameof(onNext));
          Error.ThrowArgumentNullException(onError, nameof(onError));

          Subscribes.SubscribeCore(source, onNext, onError, Subscribes.NopCompleted, cancellationToken).Forget();
      }

      /// <summary>Subscribes with a fire-and-forget asynchronous element callback and an error callback, using the caller's <paramref name="cancellationToken"/>.</summary>
      public static void Subscribe<TSource>(this IUniTaskAsyncEnumerable<TSource> source, Func<TSource, UniTaskVoid> onNext, Action<Exception> onError, CancellationToken cancellationToken)
      {
          Error.ThrowArgumentNullException(source, nameof(source));
          Error.ThrowArgumentNullException(onNext, nameof(onNext));
          Error.ThrowArgumentNullException(onError, nameof(onError));

          Subscribes.SubscribeCore(source, onNext, onError, Subscribes.NopCompleted, cancellationToken).Forget();
      }

      /// <summary>Subscribes with an awaited element callback and an error callback.</summary>
      /// <returns>A handle whose disposal cancels the subscription's token.</returns>
      public static IDisposable SubscribeAwait<TSource>(this IUniTaskAsyncEnumerable<TSource> source, Func<TSource, UniTask> onNext, Action<Exception> onError)
      {
          Error.ThrowArgumentNullException(source, nameof(source));
          Error.ThrowArgumentNullException(onNext, nameof(onNext));
          Error.ThrowArgumentNullException(onError, nameof(onError));

          var subscription = new CancellationTokenDisposable();
          var token = subscription.Token;
          Subscribes.SubscribeAwaitCore(source, onNext, onError, Subscribes.NopCompleted, token).Forget();
          return subscription;
      }

      /// <summary>Subscribes with an awaited element callback and an error callback, using the caller's <paramref name="cancellationToken"/>.</summary>
      public static void SubscribeAwait<TSource>(this IUniTaskAsyncEnumerable<TSource> source, Func<TSource, UniTask> onNext, Action<Exception> onError, CancellationToken cancellationToken)
      {
          Error.ThrowArgumentNullException(source, nameof(source));
          Error.ThrowArgumentNullException(onNext, nameof(onNext));
          Error.ThrowArgumentNullException(onError, nameof(onError));

          Subscribes.SubscribeAwaitCore(source, onNext, onError, Subscribes.NopCompleted, cancellationToken).Forget();
      }

      /// <summary>Subscribes with an awaited, token-aware element callback and an error callback.</summary>
      /// <returns>A handle whose disposal cancels the subscription's token.</returns>
      public static IDisposable SubscribeAwait<TSource>(this IUniTaskAsyncEnumerable<TSource> source, Func<TSource, CancellationToken, UniTask> onNext, Action<Exception> onError)
      {
          Error.ThrowArgumentNullException(source, nameof(source));
          Error.ThrowArgumentNullException(onNext, nameof(onNext));
          Error.ThrowArgumentNullException(onError, nameof(onError));

          var subscription = new CancellationTokenDisposable();
          var token = subscription.Token;
          Subscribes.SubscribeAwaitCore(source, onNext, onError, Subscribes.NopCompleted, token).Forget();
          return subscription;
      }

      /// <summary>Subscribes with an awaited, token-aware element callback and an error callback, using the caller's <paramref name="cancellationToken"/>.</summary>
      public static void SubscribeAwait<TSource>(this IUniTaskAsyncEnumerable<TSource> source, Func<TSource, CancellationToken, UniTask> onNext, Action<Exception> onError, CancellationToken cancellationToken)
      {
          Error.ThrowArgumentNullException(source, nameof(source));
          Error.ThrowArgumentNullException(onNext, nameof(onNext));
          Error.ThrowArgumentNullException(onError, nameof(onError));

          Subscribes.SubscribeAwaitCore(source, onNext, onError, Subscribes.NopCompleted, cancellationToken).Forget();
      }

      // OnNext, OnCompleted

      /// <summary>Subscribes with an element callback and a completion callback.</summary>
      /// <returns>A handle whose disposal cancels the subscription's token.</returns>
      public static IDisposable Subscribe<TSource>(this IUniTaskAsyncEnumerable<TSource> source, Action<TSource> onNext, Action onCompleted)
      {
          Error.ThrowArgumentNullException(source, nameof(source));
          Error.ThrowArgumentNullException(onNext, nameof(onNext));
          Error.ThrowArgumentNullException(onCompleted, nameof(onCompleted));

          var subscription = new CancellationTokenDisposable();
          var token = subscription.Token;
          Subscribes.SubscribeCore(source, onNext, Subscribes.NopError, onCompleted, token).Forget();
          return subscription;
      }

      /// <summary>Subscribes with a fire-and-forget asynchronous element callback and a completion callback.</summary>
      /// <returns>A handle whose disposal cancels the subscription's token.</returns>
      public static IDisposable Subscribe<TSource>(this IUniTaskAsyncEnumerable<TSource> source, Func<TSource, UniTaskVoid> onNext, Action onCompleted)
      {
          Error.ThrowArgumentNullException(source, nameof(source));
          Error.ThrowArgumentNullException(onNext, nameof(onNext));
          Error.ThrowArgumentNullException(onCompleted, nameof(onCompleted));

          var subscription = new CancellationTokenDisposable();
          var token = subscription.Token;
          Subscribes.SubscribeCore(source, onNext, Subscribes.NopError, onCompleted, token).Forget();
          return subscription;
      }

      /// <summary>Subscribes with an element callback and a completion callback, using the caller's <paramref name="cancellationToken"/>.</summary>
      public static void Subscribe<TSource>(this IUniTaskAsyncEnumerable<TSource> source, Action<TSource> onNext, Action onCompleted, CancellationToken cancellationToken)
      {
          Error.ThrowArgumentNullException(source, nameof(source));
          Error.ThrowArgumentNullException(onNext, nameof(onNext));
          Error.ThrowArgumentNullException(onCompleted, nameof(onCompleted));

          Subscribes.SubscribeCore(source, onNext, Subscribes.NopError, onCompleted, cancellationToken).Forget();
      }

      /// <summary>Subscribes with a fire-and-forget asynchronous element callback and a completion callback, using the caller's <paramref name="cancellationToken"/>.</summary>
      public static void Subscribe<TSource>(this IUniTaskAsyncEnumerable<TSource> source, Func<TSource, UniTaskVoid> onNext, Action onCompleted, CancellationToken cancellationToken)
      {
          Error.ThrowArgumentNullException(source, nameof(source));
          Error.ThrowArgumentNullException(onNext, nameof(onNext));
          Error.ThrowArgumentNullException(onCompleted, nameof(onCompleted));

          Subscribes.SubscribeCore(source, onNext, Subscribes.NopError, onCompleted, cancellationToken).Forget();
      }

      /// <summary>Subscribes with an awaited element callback and a completion callback.</summary>
      /// <returns>A handle whose disposal cancels the subscription's token.</returns>
      public static IDisposable SubscribeAwait<TSource>(this IUniTaskAsyncEnumerable<TSource> source, Func<TSource, UniTask> onNext, Action onCompleted)
      {
          Error.ThrowArgumentNullException(source, nameof(source));
          Error.ThrowArgumentNullException(onNext, nameof(onNext));
          Error.ThrowArgumentNullException(onCompleted, nameof(onCompleted));

          var subscription = new CancellationTokenDisposable();
          var token = subscription.Token;
          Subscribes.SubscribeAwaitCore(source, onNext, Subscribes.NopError, onCompleted, token).Forget();
          return subscription;
      }

      /// <summary>Subscribes with an awaited element callback and a completion callback, using the caller's <paramref name="cancellationToken"/>.</summary>
      public static void SubscribeAwait<TSource>(this IUniTaskAsyncEnumerable<TSource> source, Func<TSource, UniTask> onNext, Action onCompleted, CancellationToken cancellationToken)
      {
          Error.ThrowArgumentNullException(source, nameof(source));
          Error.ThrowArgumentNullException(onNext, nameof(onNext));
          Error.ThrowArgumentNullException(onCompleted, nameof(onCompleted));

          Subscribes.SubscribeAwaitCore(source, onNext, Subscribes.NopError, onCompleted, cancellationToken).Forget();
      }

      /// <summary>Subscribes with an awaited, token-aware element callback and a completion callback.</summary>
      /// <returns>A handle whose disposal cancels the subscription's token.</returns>
      public static IDisposable SubscribeAwait<TSource>(this IUniTaskAsyncEnumerable<TSource> source, Func<TSource, CancellationToken, UniTask> onNext, Action onCompleted)
      {
          Error.ThrowArgumentNullException(source, nameof(source));
          Error.ThrowArgumentNullException(onNext, nameof(onNext));
          Error.ThrowArgumentNullException(onCompleted, nameof(onCompleted));

          var subscription = new CancellationTokenDisposable();
          var token = subscription.Token;
          Subscribes.SubscribeAwaitCore(source, onNext, Subscribes.NopError, onCompleted, token).Forget();
          return subscription;
      }

      /// <summary>Subscribes with an awaited, token-aware element callback and a completion callback, using the caller's <paramref name="cancellationToken"/>.</summary>
      public static void SubscribeAwait<TSource>(this IUniTaskAsyncEnumerable<TSource> source, Func<TSource, CancellationToken, UniTask> onNext, Action onCompleted, CancellationToken cancellationToken)
      {
          Error.ThrowArgumentNullException(source, nameof(source));
          Error.ThrowArgumentNullException(onNext, nameof(onNext));
          Error.ThrowArgumentNullException(onCompleted, nameof(onCompleted));

          Subscribes.SubscribeAwaitCore(source, onNext, Subscribes.NopError, onCompleted, cancellationToken).Forget();
      }

      // IObserver

      /// <summary>Subscribes <paramref name="observer"/> to <paramref name="source"/>; elements, completion and errors are forwarded to the observer.</summary>
      /// <returns>A handle whose disposal cancels the subscription's token.</returns>
      public static IDisposable Subscribe<TSource>(this IUniTaskAsyncEnumerable<TSource> source, IObserver<TSource> observer)
      {
          Error.ThrowArgumentNullException(source, nameof(source));
          Error.ThrowArgumentNullException(observer, nameof(observer));

          var subscription = new CancellationTokenDisposable();
          var token = subscription.Token;
          Subscribes.SubscribeCore(source, observer, token).Forget();
          return subscription;
      }

      /// <summary>Subscribes <paramref name="observer"/> to <paramref name="source"/> using the caller's <paramref name="cancellationToken"/>.</summary>
      public static void Subscribe<TSource>(this IUniTaskAsyncEnumerable<TSource> source, IObserver<TSource> observer, CancellationToken cancellationToken)
      {
          Error.ThrowArgumentNullException(source, nameof(source));
          Error.ThrowArgumentNullException(observer, nameof(observer));

          Subscribes.SubscribeCore(source, observer, cancellationToken).Forget();
      }
  }
  /// <summary>
  /// Adapts a <see cref="CancellationTokenSource"/> to <see cref="IDisposable"/>:
  /// disposing this object requests cancellation of <see cref="Token"/>.
  /// </summary>
  internal sealed class CancellationTokenDisposable : IDisposable
  {
      readonly CancellationTokenSource cancellationSource = new CancellationTokenSource();

      /// <summary>Token that becomes cancelled once <see cref="Dispose"/> has been called.</summary>
      public CancellationToken Token
      {
          get { return cancellationSource.Token; }
      }

      /// <summary>
      /// Requests cancellation unless it has already been requested, so repeated
      /// calls do not invoke <c>Cancel</c> again. The underlying source itself is
      /// not disposed here.
      /// </summary>
      public void Dispose()
      {
          if (cancellationSource.IsCancellationRequested)
          {
              return;
          }

          cancellationSource.Cancel();
      }
  }
  /// <summary>
  /// Enumeration loops behind the public Subscribe / SubscribeAwait extension methods.
  /// Every loop follows the same shape: obtain the enumerator, pump elements into the
  /// element callback, signal completion, route a failure of the loop to the error
  /// callback, and finally dispose the enumerator.
  /// </summary>
  internal static class Subscribe
  {
      /// <summary>Sentinel error callback; its identity is what marks "no error handler supplied".</summary>
      public static readonly Action<Exception> NopError = _ => { };

      /// <summary>Completion callback that does nothing.</summary>
      public static readonly Action NopCompleted = () => { };

      /// <summary>
      /// Handles an exception that ended an enumeration loop.
      /// With the <see cref="NopError"/> sentinel the exception is published through
      /// UniTaskScheduler.PublishUnobservedTaskException; otherwise it is passed to
      /// <paramref name="onError"/> unless it is an <see cref="OperationCanceledException"/>,
      /// which is dropped silently.
      /// </summary>
      static void ReportTerminalError(Exception error, Action<Exception> onError)
      {
          if (onError == NopError)
          {
              UniTaskScheduler.PublishUnobservedTaskException(error);
          }
          else if (!(error is OperationCanceledException))
          {
              onError(error);
          }
      }

      /// <summary>
      /// Pumps <paramref name="source"/> into a synchronous <paramref name="onNext"/> callback.
      /// An exception thrown by <paramref name="onNext"/> is published as unobserved and the
      /// loop continues with the next element.
      /// </summary>
      public static async UniTaskVoid SubscribeCore<TSource>(IUniTaskAsyncEnumerable<TSource> source, Action<TSource> onNext, Action<Exception> onError, Action onCompleted, CancellationToken cancellationToken)
      {
          var enumerator = source.GetAsyncEnumerator(cancellationToken);
          try
          {
              while (await enumerator.MoveNextAsync())
              {
                  try
                  {
                      onNext(enumerator.Current);
                  }
                  catch (Exception callbackError)
                  {
                      UniTaskScheduler.PublishUnobservedTaskException(callbackError);
                  }
              }

              // Inside the outer try: a throwing onCompleted is routed like any other loop failure.
              onCompleted();
          }
          catch (Exception error)
          {
              ReportTerminalError(error, onError);
          }
          finally
          {
              if (enumerator != null)
              {
                  await enumerator.DisposeAsync();
              }
          }
      }

      /// <summary>
      /// Pumps <paramref name="source"/> into <paramref name="onNext"/>, forgetting each returned
      /// <c>UniTaskVoid</c> so the next element is requested without waiting for the callback.
      /// </summary>
      public static async UniTaskVoid SubscribeCore<TSource>(IUniTaskAsyncEnumerable<TSource> source, Func<TSource, UniTaskVoid> onNext, Action<Exception> onError, Action onCompleted, CancellationToken cancellationToken)
      {
          var enumerator = source.GetAsyncEnumerator(cancellationToken);
          try
          {
              while (await enumerator.MoveNextAsync())
              {
                  try
                  {
                      onNext(enumerator.Current).Forget();
                  }
                  catch (Exception callbackError)
                  {
                      UniTaskScheduler.PublishUnobservedTaskException(callbackError);
                  }
              }

              onCompleted();
          }
          catch (Exception error)
          {
              ReportTerminalError(error, onError);
          }
          finally
          {
              if (enumerator != null)
              {
                  await enumerator.DisposeAsync();
              }
          }
      }

      /// <summary>
      /// Same as the fire-and-forget overload, but <paramref name="onNext"/> also receives
      /// <paramref name="cancellationToken"/> with every element.
      /// </summary>
      public static async UniTaskVoid SubscribeCore<TSource>(IUniTaskAsyncEnumerable<TSource> source, Func<TSource, CancellationToken, UniTaskVoid> onNext, Action<Exception> onError, Action onCompleted, CancellationToken cancellationToken)
      {
          var enumerator = source.GetAsyncEnumerator(cancellationToken);
          try
          {
              while (await enumerator.MoveNextAsync())
              {
                  try
                  {
                      onNext(enumerator.Current, cancellationToken).Forget();
                  }
                  catch (Exception callbackError)
                  {
                      UniTaskScheduler.PublishUnobservedTaskException(callbackError);
                  }
              }

              onCompleted();
          }
          catch (Exception error)
          {
              ReportTerminalError(error, onError);
          }
          finally
          {
              if (enumerator != null)
              {
                  await enumerator.DisposeAsync();
              }
          }
      }

      /// <summary>
      /// Pumps <paramref name="source"/> into an <see cref="IObserver{T}"/>. A failure of the loop
      /// is forwarded to <c>observer.OnError</c> unless it is an
      /// <see cref="OperationCanceledException"/>, which is dropped silently.
      /// </summary>
      public static async UniTaskVoid SubscribeCore<TSource>(IUniTaskAsyncEnumerable<TSource> source, IObserver<TSource> observer, CancellationToken cancellationToken)
      {
          var enumerator = source.GetAsyncEnumerator(cancellationToken);
          try
          {
              while (await enumerator.MoveNextAsync())
              {
                  try
                  {
                      observer.OnNext(enumerator.Current);
                  }
                  catch (Exception callbackError)
                  {
                      UniTaskScheduler.PublishUnobservedTaskException(callbackError);
                  }
              }

              observer.OnCompleted();
          }
          catch (Exception error)
          {
              if (!(error is OperationCanceledException))
              {
                  observer.OnError(error);
              }
          }
          finally
          {
              if (enumerator != null)
              {
                  await enumerator.DisposeAsync();
              }
          }
      }

      /// <summary>
      /// Pumps <paramref name="source"/> into <paramref name="onNext"/> and awaits each returned
      /// task before requesting the next element.
      /// </summary>
      public static async UniTaskVoid SubscribeAwaitCore<TSource>(IUniTaskAsyncEnumerable<TSource> source, Func<TSource, UniTask> onNext, Action<Exception> onError, Action onCompleted, CancellationToken cancellationToken)
      {
          var enumerator = source.GetAsyncEnumerator(cancellationToken);
          try
          {
              while (await enumerator.MoveNextAsync())
              {
                  try
                  {
                      await onNext(enumerator.Current);
                  }
                  catch (Exception callbackError)
                  {
                      UniTaskScheduler.PublishUnobservedTaskException(callbackError);
                  }
              }

              onCompleted();
          }
          catch (Exception error)
          {
              ReportTerminalError(error, onError);
          }
          finally
          {
              if (enumerator != null)
              {
                  await enumerator.DisposeAsync();
              }
          }
      }

      /// <summary>
      /// Same as the awaited overload, but <paramref name="onNext"/> also receives
      /// <paramref name="cancellationToken"/> with every element.
      /// </summary>
      public static async UniTaskVoid SubscribeAwaitCore<TSource>(IUniTaskAsyncEnumerable<TSource> source, Func<TSource, CancellationToken, UniTask> onNext, Action<Exception> onError, Action onCompleted, CancellationToken cancellationToken)
      {
          var enumerator = source.GetAsyncEnumerator(cancellationToken);
          try
          {
              while (await enumerator.MoveNextAsync())
              {
                  try
                  {
                      await onNext(enumerator.Current, cancellationToken);
                  }
                  catch (Exception callbackError)
                  {
                      UniTaskScheduler.PublishUnobservedTaskException(callbackError);
                  }
              }

              onCompleted();
          }
          catch (Exception error)
          {
              ReportTerminalError(error, onError);
          }
          finally
          {
              if (enumerator != null)
              {
                  await enumerator.DisposeAsync();
              }
          }
      }
  }
  454. }