// ToObservable.cs (2.7 KB)
  1. using Cysharp.Threading.Tasks.Internal;
  2. using System;
  3. using System.Threading;
  4. namespace Cysharp.Threading.Tasks.Linq
  5. {
  public static partial class UniTaskAsyncEnumerable
  {
    /// <summary>
    /// Wraps an <see cref="IUniTaskAsyncEnumerable{T}"/> in an <see cref="IObservable{T}"/> adapter.
    /// </summary>
    /// <typeparam name="TSource">Element type of the sequence.</typeparam>
    /// <param name="source">The async sequence to expose as an observable.</param>
    /// <returns>A new <c>ToObservable&lt;TSource&gt;</c> instance constructed over <paramref name="source"/>.</returns>
    public static IObservable<TSource> ToObservable<TSource>(this IUniTaskAsyncEnumerable<TSource> source)
    {
      // NOTE(review): Error.ThrowArgumentNullException is defined elsewhere; presumably it
      // throws ArgumentNullException when source is null -- confirm against the helper.
      Error.ThrowArgumentNullException(source, nameof(source));

      IObservable<TSource> observable = new ToObservable<TSource>(source);
      return observable;
    }
  }
  /// <summary>
  /// <see cref="IObservable{T}"/> adapter over an <see cref="IUniTaskAsyncEnumerable{T}"/>.
  /// Every call to <see cref="Subscribe"/> requests a new enumerator from the source and
  /// forwards its items to the supplied observer.
  /// </summary>
  /// <typeparam name="T">Element type of the sequence.</typeparam>
  internal sealed class ToObservable<T> : IObservable<T>
  {
    readonly IUniTaskAsyncEnumerable<T> source;

    /// <summary>Stores the sequence that subscriptions will enumerate.</summary>
    /// <param name="source">The async sequence to adapt.</param>
    public ToObservable(IUniTaskAsyncEnumerable<T> source)
    {
      this.source = source;
    }

    /// <summary>
    /// Starts pumping the source into <paramref name="observer"/> and returns a handle
    /// whose disposal cancels the token handed to the pump.
    /// </summary>
    /// <param name="observer">Receiver of OnNext / OnError / OnCompleted notifications.</param>
    /// <returns>A disposable that cancels the subscription's token when disposed.</returns>
    public IDisposable Subscribe(IObserver<T> observer)
    {
      var subscription = new CancellationTokenDisposable();
      var token = subscription.Token;

      // The pump is started without being awaited (handed off via Forget()).
      RunAsync(source, observer, token).Forget();

      return subscription;
    }

    /// <summary>
    /// Enumerates <paramref name="src"/> and relays each step to <paramref name="observer"/>.
    /// </summary>
    /// <param name="src">Sequence to enumerate.</param>
    /// <param name="observer">Receiver of the notifications.</param>
    /// <param name="cancellationToken">
    /// Token passed to the enumerator; it is cancelled when the subscription is disposed.
    /// </param>
    static async UniTaskVoid RunAsync(IUniTaskAsyncEnumerable<T> src, IObserver<T> observer, CancellationToken cancellationToken)
    {
      // Disposing the subscription cancels cancellationToken. Once it is cancelled the
      // pump stops silently: no OnError and no OnCompleted is sent to the observer.
      var enumerator = src.GetAsyncEnumerator(cancellationToken);
      try
      {
        while (true)
        {
          bool moved;
          try
          {
            moved = await enumerator.MoveNextAsync();
          }
          catch (Exception ex)
          {
            // A failure observed after cancellation is swallowed; otherwise it is
            // reported to the observer. Either way the pump ends here.
            if (!cancellationToken.IsCancellationRequested)
            {
              observer.OnError(ex);
            }

            return;
          }

          if (!moved)
          {
            // Source exhausted.
            observer.OnCompleted();
            return;
          }

          observer.OnNext(enumerator.Current);

          // Re-check after each delivered item so a disposed subscription stops the loop.
          if (cancellationToken.IsCancellationRequested)
          {
            return;
          }
        }
      }
      finally
      {
        // Runs on every exit path above, including the early returns.
        if (enumerator != null)
        {
          await enumerator.DisposeAsync();
        }
      }
    }

    /// <summary>
    /// <see cref="IDisposable"/> that owns a <see cref="CancellationTokenSource"/> and
    /// requests cancellation on it when disposed.
    /// </summary>
    internal sealed class CancellationTokenDisposable : IDisposable
    {
      readonly CancellationTokenSource tokenSource = new CancellationTokenSource();

      /// <summary>Token of the owned <see cref="CancellationTokenSource"/>.</summary>
      public CancellationToken Token
      {
        get { return tokenSource.Token; }
      }

      /// <summary>Requests cancellation unless it has already been requested.</summary>
      public void Dispose()
      {
        if (tokenSource.IsCancellationRequested)
        {
          return;
        }

        tokenSource.Cancel();
      }
    }
  }
  82. }