|
- using System;
- using System.Threading;
- namespace Cysharp.Threading.Tasks
- {
- public interface IReadOnlyAsyncReactiveProperty<T> : IUniTaskAsyncEnumerable<T>
- {
- T Value { get; }
- IUniTaskAsyncEnumerable<T> WithoutCurrent();
- UniTask<T> WaitAsync(CancellationToken cancellationToken = default);
- }
- public interface IAsyncReactiveProperty<T> : IReadOnlyAsyncReactiveProperty<T>
- {
- new T Value { get; set; }
- }
- [Serializable]
- public class AsyncReactiveProperty<T> : IAsyncReactiveProperty<T>, IDisposable
- {
- TriggerEvent<T> triggerEvent;
- #if UNITY_2018_3_OR_NEWER
- [UnityEngine.SerializeField]
- #endif
- T latestValue;
- public T Value
- {
- get
- {
- return latestValue;
- }
- set
- {
- this.latestValue = value;
- triggerEvent.SetResult(value);
- }
- }
- public AsyncReactiveProperty(T value)
- {
- this.latestValue = value;
- this.triggerEvent = default;
- }
- public IUniTaskAsyncEnumerable<T> WithoutCurrent()
- {
- return new WithoutCurrentEnumerable(this);
- }
- public IUniTaskAsyncEnumerator<T> GetAsyncEnumerator(CancellationToken cancellationToken)
- {
- return new Enumerator(this, cancellationToken, true);
- }
- public void Dispose()
- {
- triggerEvent.SetCompleted();
- }
- public static implicit operator T(AsyncReactiveProperty<T> value)
- {
- return value.Value;
- }
- public override string ToString()
- {
- if (isValueType) return latestValue.ToString();
- return latestValue?.ToString();
- }
- public UniTask<T> WaitAsync(CancellationToken cancellationToken = default)
- {
- return new UniTask<T>(WaitAsyncSource.Create(this, cancellationToken, out var token), token);
- }
- static bool isValueType;
- static AsyncReactiveProperty()
- {
- isValueType = typeof(T).IsValueType;
- }
- sealed class WaitAsyncSource : IUniTaskSource<T>, ITriggerHandler<T>, ITaskPoolNode<WaitAsyncSource>
- {
- static Action<object> cancellationCallback = CancellationCallback;
- static TaskPool<WaitAsyncSource> pool;
- WaitAsyncSource nextNode;
- ref WaitAsyncSource ITaskPoolNode<WaitAsyncSource>.NextNode => ref nextNode;
- static WaitAsyncSource()
- {
- TaskPool.RegisterSizeGetter(typeof(WaitAsyncSource), () => pool.Size);
- }
- AsyncReactiveProperty<T> parent;
- CancellationToken cancellationToken;
- CancellationTokenRegistration cancellationTokenRegistration;
- UniTaskCompletionSourceCore<T> core;
- WaitAsyncSource()
- {
- }
- public static IUniTaskSource<T> Create(AsyncReactiveProperty<T> parent, CancellationToken cancellationToken, out short token)
- {
- if (cancellationToken.IsCancellationRequested)
- {
- return AutoResetUniTaskCompletionSource<T>.CreateFromCanceled(cancellationToken, out token);
- }
- if (!pool.TryPop(out var result))
- {
- result = new WaitAsyncSource();
- }
- result.parent = parent;
- result.cancellationToken = cancellationToken;
- if (cancellationToken.CanBeCanceled)
- {
- result.cancellationTokenRegistration = cancellationToken.RegisterWithoutCaptureExecutionContext(cancellationCallback, result);
- }
- result.parent.triggerEvent.Add(result);
- TaskTracker.TrackActiveTask(result, 3);
- token = result.core.Version;
- return result;
- }
- bool TryReturn()
- {
- TaskTracker.RemoveTracking(this);
- core.Reset();
- cancellationTokenRegistration.Dispose();
- cancellationTokenRegistration = default;
- parent.triggerEvent.Remove(this);
- parent = null;
- cancellationToken = default;
- return pool.TryPush(this);
- }
- static void CancellationCallback(object state)
- {
- var self = (WaitAsyncSource)state;
- self.OnCanceled(self.cancellationToken);
- }
- // IUniTaskSource
- public T GetResult(short token)
- {
- try
- {
- return core.GetResult(token);
- }
- finally
- {
- TryReturn();
- }
- }
- void IUniTaskSource.GetResult(short token)
- {
- GetResult(token);
- }
- public void OnCompleted(Action<object> continuation, object state, short token)
- {
- core.OnCompleted(continuation, state, token);
- }
- public UniTaskStatus GetStatus(short token)
- {
- return core.GetStatus(token);
- }
- public UniTaskStatus UnsafeGetStatus()
- {
- return core.UnsafeGetStatus();
- }
- // ITriggerHandler
- ITriggerHandler<T> ITriggerHandler<T>.Prev { get; set; }
- ITriggerHandler<T> ITriggerHandler<T>.Next { get; set; }
- public void OnCanceled(CancellationToken cancellationToken)
- {
- core.TrySetCanceled(cancellationToken);
- }
- public void OnCompleted()
- {
- // Complete as Cancel.
- core.TrySetCanceled(CancellationToken.None);
- }
- public void OnError(Exception ex)
- {
- core.TrySetException(ex);
- }
- public void OnNext(T value)
- {
- core.TrySetResult(value);
- }
- }
- sealed class WithoutCurrentEnumerable : IUniTaskAsyncEnumerable<T>
- {
- readonly AsyncReactiveProperty<T> parent;
- public WithoutCurrentEnumerable(AsyncReactiveProperty<T> parent)
- {
- this.parent = parent;
- }
- public IUniTaskAsyncEnumerator<T> GetAsyncEnumerator(CancellationToken cancellationToken = default)
- {
- return new Enumerator(parent, cancellationToken, false);
- }
- }
- sealed class Enumerator : MoveNextSource, IUniTaskAsyncEnumerator<T>, ITriggerHandler<T>
- {
- static Action<object> cancellationCallback = CancellationCallback;
- readonly AsyncReactiveProperty<T> parent;
- readonly CancellationToken cancellationToken;
- readonly CancellationTokenRegistration cancellationTokenRegistration;
- T value;
- bool isDisposed;
- bool firstCall;
- public Enumerator(AsyncReactiveProperty<T> parent, CancellationToken cancellationToken, bool publishCurrentValue)
- {
- this.parent = parent;
- this.cancellationToken = cancellationToken;
- this.firstCall = publishCurrentValue;
- parent.triggerEvent.Add(this);
- TaskTracker.TrackActiveTask(this, 3);
- if (cancellationToken.CanBeCanceled)
- {
- cancellationTokenRegistration = cancellationToken.RegisterWithoutCaptureExecutionContext(cancellationCallback, this);
- }
- }
- public T Current => value;
- ITriggerHandler<T> ITriggerHandler<T>.Prev { get; set; }
- ITriggerHandler<T> ITriggerHandler<T>.Next { get; set; }
- public UniTask<bool> MoveNextAsync()
- {
- // raise latest value on first call.
- if (firstCall)
- {
- firstCall = false;
- value = parent.Value;
- return CompletedTasks.True;
- }
- completionSource.Reset();
- return new UniTask<bool>(this, completionSource.Version);
- }
- public UniTask DisposeAsync()
- {
- if (!isDisposed)
- {
- isDisposed = true;
- TaskTracker.RemoveTracking(this);
- completionSource.TrySetCanceled(cancellationToken);
- parent.triggerEvent.Remove(this);
- }
- return default;
- }
- public void OnNext(T value)
- {
- this.value = value;
- completionSource.TrySetResult(true);
- }
- public void OnCanceled(CancellationToken cancellationToken)
- {
- DisposeAsync().Forget();
- }
- public void OnCompleted()
- {
- completionSource.TrySetResult(false);
- }
- public void OnError(Exception ex)
- {
- completionSource.TrySetException(ex);
- }
- static void CancellationCallback(object state)
- {
- var self = (Enumerator)state;
- self.DisposeAsync().Forget();
- }
- }
- }
- public class ReadOnlyAsyncReactiveProperty<T> : IReadOnlyAsyncReactiveProperty<T>, IDisposable
- {
- TriggerEvent<T> triggerEvent;
- T latestValue;
- IUniTaskAsyncEnumerator<T> enumerator;
- public T Value
- {
- get
- {
- return latestValue;
- }
- }
- public ReadOnlyAsyncReactiveProperty(T initialValue, IUniTaskAsyncEnumerable<T> source, CancellationToken cancellationToken)
- {
- latestValue = initialValue;
- ConsumeEnumerator(source, cancellationToken).Forget();
- }
- public ReadOnlyAsyncReactiveProperty(IUniTaskAsyncEnumerable<T> source, CancellationToken cancellationToken)
- {
- ConsumeEnumerator(source, cancellationToken).Forget();
- }
- async UniTaskVoid ConsumeEnumerator(IUniTaskAsyncEnumerable<T> source, CancellationToken cancellationToken)
- {
- enumerator = source.GetAsyncEnumerator(cancellationToken);
- try
- {
- while (await enumerator.MoveNextAsync())
- {
- var value = enumerator.Current;
- this.latestValue = value;
- triggerEvent.SetResult(value);
- }
- }
- finally
- {
- await enumerator.DisposeAsync();
- enumerator = null;
- }
- }
- public IUniTaskAsyncEnumerable<T> WithoutCurrent()
- {
- return new WithoutCurrentEnumerable(this);
- }
- public IUniTaskAsyncEnumerator<T> GetAsyncEnumerator(CancellationToken cancellationToken)
- {
- return new Enumerator(this, cancellationToken, true);
- }
- public void Dispose()
- {
- if (enumerator != null)
- {
- enumerator.DisposeAsync().Forget();
- }
- triggerEvent.SetCompleted();
- }
- public static implicit operator T(ReadOnlyAsyncReactiveProperty<T> value)
- {
- return value.Value;
- }
- public override string ToString()
- {
- if (isValueType) return latestValue.ToString();
- return latestValue?.ToString();
- }
- public UniTask<T> WaitAsync(CancellationToken cancellationToken = default)
- {
- return new UniTask<T>(WaitAsyncSource.Create(this, cancellationToken, out var token), token);
- }
- static bool isValueType;
- static ReadOnlyAsyncReactiveProperty()
- {
- isValueType = typeof(T).IsValueType;
- }
- sealed class WaitAsyncSource : IUniTaskSource<T>, ITriggerHandler<T>, ITaskPoolNode<WaitAsyncSource>
- {
- static Action<object> cancellationCallback = CancellationCallback;
- static TaskPool<WaitAsyncSource> pool;
- WaitAsyncSource nextNode;
- ref WaitAsyncSource ITaskPoolNode<WaitAsyncSource>.NextNode => ref nextNode;
- static WaitAsyncSource()
- {
- TaskPool.RegisterSizeGetter(typeof(WaitAsyncSource), () => pool.Size);
- }
- ReadOnlyAsyncReactiveProperty<T> parent;
- CancellationToken cancellationToken;
- CancellationTokenRegistration cancellationTokenRegistration;
- UniTaskCompletionSourceCore<T> core;
- WaitAsyncSource()
- {
- }
- public static IUniTaskSource<T> Create(ReadOnlyAsyncReactiveProperty<T> parent, CancellationToken cancellationToken, out short token)
- {
- if (cancellationToken.IsCancellationRequested)
- {
- return AutoResetUniTaskCompletionSource<T>.CreateFromCanceled(cancellationToken, out token);
- }
- if (!pool.TryPop(out var result))
- {
- result = new WaitAsyncSource();
- }
- result.parent = parent;
- result.cancellationToken = cancellationToken;
- if (cancellationToken.CanBeCanceled)
- {
- result.cancellationTokenRegistration = cancellationToken.RegisterWithoutCaptureExecutionContext(cancellationCallback, result);
- }
- result.parent.triggerEvent.Add(result);
- TaskTracker.TrackActiveTask(result, 3);
- token = result.core.Version;
- return result;
- }
- bool TryReturn()
- {
- TaskTracker.RemoveTracking(this);
- core.Reset();
- cancellationTokenRegistration.Dispose();
- cancellationTokenRegistration = default;
- parent.triggerEvent.Remove(this);
- parent = null;
- cancellationToken = default;
- return pool.TryPush(this);
- }
- static void CancellationCallback(object state)
- {
- var self = (WaitAsyncSource)state;
- self.OnCanceled(self.cancellationToken);
- }
- // IUniTaskSource
- public T GetResult(short token)
- {
- try
- {
- return core.GetResult(token);
- }
- finally
- {
- TryReturn();
- }
- }
- void IUniTaskSource.GetResult(short token)
- {
- GetResult(token);
- }
- public void OnCompleted(Action<object> continuation, object state, short token)
- {
- core.OnCompleted(continuation, state, token);
- }
- public UniTaskStatus GetStatus(short token)
- {
- return core.GetStatus(token);
- }
- public UniTaskStatus UnsafeGetStatus()
- {
- return core.UnsafeGetStatus();
- }
- // ITriggerHandler
- ITriggerHandler<T> ITriggerHandler<T>.Prev { get; set; }
- ITriggerHandler<T> ITriggerHandler<T>.Next { get; set; }
- public void OnCanceled(CancellationToken cancellationToken)
- {
- core.TrySetCanceled(cancellationToken);
- }
- public void OnCompleted()
- {
- // Complete as Cancel.
- core.TrySetCanceled(CancellationToken.None);
- }
- public void OnError(Exception ex)
- {
- core.TrySetException(ex);
- }
- public void OnNext(T value)
- {
- core.TrySetResult(value);
- }
- }
- sealed class WithoutCurrentEnumerable : IUniTaskAsyncEnumerable<T>
- {
- readonly ReadOnlyAsyncReactiveProperty<T> parent;
- public WithoutCurrentEnumerable(ReadOnlyAsyncReactiveProperty<T> parent)
- {
- this.parent = parent;
- }
- public IUniTaskAsyncEnumerator<T> GetAsyncEnumerator(CancellationToken cancellationToken = default)
- {
- return new Enumerator(parent, cancellationToken, false);
- }
- }
- sealed class Enumerator : MoveNextSource, IUniTaskAsyncEnumerator<T>, ITriggerHandler<T>
- {
- static Action<object> cancellationCallback = CancellationCallback;
- readonly ReadOnlyAsyncReactiveProperty<T> parent;
- readonly CancellationToken cancellationToken;
- readonly CancellationTokenRegistration cancellationTokenRegistration;
- T value;
- bool isDisposed;
- bool firstCall;
- public Enumerator(ReadOnlyAsyncReactiveProperty<T> parent, CancellationToken cancellationToken, bool publishCurrentValue)
- {
- this.parent = parent;
- this.cancellationToken = cancellationToken;
- this.firstCall = publishCurrentValue;
- parent.triggerEvent.Add(this);
- TaskTracker.TrackActiveTask(this, 3);
- if (cancellationToken.CanBeCanceled)
- {
- cancellationTokenRegistration = cancellationToken.RegisterWithoutCaptureExecutionContext(cancellationCallback, this);
- }
- }
- public T Current => value;
- ITriggerHandler<T> ITriggerHandler<T>.Prev { get; set; }
- ITriggerHandler<T> ITriggerHandler<T>.Next { get; set; }
- public UniTask<bool> MoveNextAsync()
- {
- // raise latest value on first call.
- if (firstCall)
- {
- firstCall = false;
- value = parent.Value;
- return CompletedTasks.True;
- }
- completionSource.Reset();
- return new UniTask<bool>(this, completionSource.Version);
- }
- public UniTask DisposeAsync()
- {
- if (!isDisposed)
- {
- isDisposed = true;
- TaskTracker.RemoveTracking(this);
- completionSource.TrySetCanceled(cancellationToken);
- parent.triggerEvent.Remove(this);
- }
- return default;
- }
- public void OnNext(T value)
- {
- this.value = value;
- completionSource.TrySetResult(true);
- }
- public void OnCanceled(CancellationToken cancellationToken)
- {
- DisposeAsync().Forget();
- }
- public void OnCompleted()
- {
- completionSource.TrySetResult(false);
- }
- public void OnError(Exception ex)
- {
- completionSource.TrySetException(ex);
- }
- static void CancellationCallback(object state)
- {
- var self = (Enumerator)state;
- self.DisposeAsync().Forget();
- }
- }
- }
- public static class StateExtensions
- {
- public static ReadOnlyAsyncReactiveProperty<T> ToReadOnlyAsyncReactiveProperty<T>(this IUniTaskAsyncEnumerable<T> source, CancellationToken cancellationToken)
- {
- return new ReadOnlyAsyncReactiveProperty<T>(source, cancellationToken);
- }
- public static ReadOnlyAsyncReactiveProperty<T> ToReadOnlyAsyncReactiveProperty<T>(this IUniTaskAsyncEnumerable<T> source, T initialValue, CancellationToken cancellationToken)
- {
- return new ReadOnlyAsyncReactiveProperty<T>(initialValue, source, cancellationToken);
- }
- }
- }
|