123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644 |
- using System;
- using System.Threading;
- namespace Cysharp.Threading.Tasks
- {
- public interface IReadOnlyAsyncReactiveProperty<T> : IUniTaskAsyncEnumerable<T>
- {
- T Value { get; }
- IUniTaskAsyncEnumerable<T> WithoutCurrent();
- UniTask<T> WaitAsync(CancellationToken cancellationToken = default);
- }
- public interface IAsyncReactiveProperty<T> : IReadOnlyAsyncReactiveProperty<T>
- {
- new T Value { get; set; }
- }
- [Serializable]
- public class AsyncReactiveProperty<T> : IAsyncReactiveProperty<T>, IDisposable
- {
- TriggerEvent<T> triggerEvent;
- #if UNITY_2018_3_OR_NEWER
- [UnityEngine.SerializeField]
- #endif
- T latestValue;
- public T Value
- {
- get
- {
- return latestValue;
- }
- set
- {
- this.latestValue = value;
- triggerEvent.SetResult(value);
- }
- }
- public AsyncReactiveProperty(T value)
- {
- this.latestValue = value;
- this.triggerEvent = default;
- }
- public IUniTaskAsyncEnumerable<T> WithoutCurrent()
- {
- return new WithoutCurrentEnumerable(this);
- }
- public IUniTaskAsyncEnumerator<T> GetAsyncEnumerator(CancellationToken cancellationToken)
- {
- return new Enumerator(this, cancellationToken, true);
- }
- public void Dispose()
- {
- triggerEvent.SetCompleted();
- }
- public static implicit operator T(AsyncReactiveProperty<T> value)
- {
- return value.Value;
- }
- public override string ToString()
- {
- if (isValueType) return latestValue.ToString();
- return latestValue?.ToString();
- }
- public UniTask<T> WaitAsync(CancellationToken cancellationToken = default)
- {
- return new UniTask<T>(WaitAsyncSource.Create(this, cancellationToken, out var token), token);
- }
- static bool isValueType;
- static AsyncReactiveProperty()
- {
- isValueType = typeof(T).IsValueType;
- }
- sealed class WaitAsyncSource : IUniTaskSource<T>, ITriggerHandler<T>, ITaskPoolNode<WaitAsyncSource>
- {
- static Action<object> cancellationCallback = CancellationCallback;
- static TaskPool<WaitAsyncSource> pool;
- WaitAsyncSource nextNode;
- ref WaitAsyncSource ITaskPoolNode<WaitAsyncSource>.NextNode => ref nextNode;
- static WaitAsyncSource()
- {
- TaskPool.RegisterSizeGetter(typeof(WaitAsyncSource), () => pool.Size);
- }
- AsyncReactiveProperty<T> parent;
- CancellationToken cancellationToken;
- CancellationTokenRegistration cancellationTokenRegistration;
- UniTaskCompletionSourceCore<T> core;
- WaitAsyncSource()
- {
- }
- public static IUniTaskSource<T> Create(AsyncReactiveProperty<T> parent, CancellationToken cancellationToken, out short token)
- {
- if (cancellationToken.IsCancellationRequested)
- {
- return AutoResetUniTaskCompletionSource<T>.CreateFromCanceled(cancellationToken, out token);
- }
- if (!pool.TryPop(out var result))
- {
- result = new WaitAsyncSource();
- }
- result.parent = parent;
- result.cancellationToken = cancellationToken;
- if (cancellationToken.CanBeCanceled)
- {
- result.cancellationTokenRegistration = cancellationToken.RegisterWithoutCaptureExecutionContext(cancellationCallback, result);
- }
- result.parent.triggerEvent.Add(result);
- TaskTracker.TrackActiveTask(result, 3);
- token = result.core.Version;
- return result;
- }
- bool TryReturn()
- {
- TaskTracker.RemoveTracking(this);
- core.Reset();
- cancellationTokenRegistration.Dispose();
- cancellationTokenRegistration = default;
- parent.triggerEvent.Remove(this);
- parent = null;
- cancellationToken = default;
- return pool.TryPush(this);
- }
- static void CancellationCallback(object state)
- {
- var self = (WaitAsyncSource)state;
- self.OnCanceled(self.cancellationToken);
- }
- // IUniTaskSource
- public T GetResult(short token)
- {
- try
- {
- return core.GetResult(token);
- }
- finally
- {
- TryReturn();
- }
- }
- void IUniTaskSource.GetResult(short token)
- {
- GetResult(token);
- }
- public void OnCompleted(Action<object> continuation, object state, short token)
- {
- core.OnCompleted(continuation, state, token);
- }
- public UniTaskStatus GetStatus(short token)
- {
- return core.GetStatus(token);
- }
- public UniTaskStatus UnsafeGetStatus()
- {
- return core.UnsafeGetStatus();
- }
- // ITriggerHandler
- ITriggerHandler<T> ITriggerHandler<T>.Prev { get; set; }
- ITriggerHandler<T> ITriggerHandler<T>.Next { get; set; }
- public void OnCanceled(CancellationToken cancellationToken)
- {
- core.TrySetCanceled(cancellationToken);
- }
- public void OnCompleted()
- {
- // Complete as Cancel.
- core.TrySetCanceled(CancellationToken.None);
- }
- public void OnError(Exception ex)
- {
- core.TrySetException(ex);
- }
- public void OnNext(T value)
- {
- core.TrySetResult(value);
- }
- }
- sealed class WithoutCurrentEnumerable : IUniTaskAsyncEnumerable<T>
- {
- readonly AsyncReactiveProperty<T> parent;
- public WithoutCurrentEnumerable(AsyncReactiveProperty<T> parent)
- {
- this.parent = parent;
- }
- public IUniTaskAsyncEnumerator<T> GetAsyncEnumerator(CancellationToken cancellationToken = default)
- {
- return new Enumerator(parent, cancellationToken, false);
- }
- }
- sealed class Enumerator : MoveNextSource, IUniTaskAsyncEnumerator<T>, ITriggerHandler<T>
- {
- static Action<object> cancellationCallback = CancellationCallback;
- readonly AsyncReactiveProperty<T> parent;
- readonly CancellationToken cancellationToken;
- readonly CancellationTokenRegistration cancellationTokenRegistration;
- T value;
- bool isDisposed;
- bool firstCall;
- public Enumerator(AsyncReactiveProperty<T> parent, CancellationToken cancellationToken, bool publishCurrentValue)
- {
- this.parent = parent;
- this.cancellationToken = cancellationToken;
- this.firstCall = publishCurrentValue;
- parent.triggerEvent.Add(this);
- TaskTracker.TrackActiveTask(this, 3);
- if (cancellationToken.CanBeCanceled)
- {
- cancellationTokenRegistration = cancellationToken.RegisterWithoutCaptureExecutionContext(cancellationCallback, this);
- }
- }
- public T Current => value;
- ITriggerHandler<T> ITriggerHandler<T>.Prev { get; set; }
- ITriggerHandler<T> ITriggerHandler<T>.Next { get; set; }
- public UniTask<bool> MoveNextAsync()
- {
- // raise latest value on first call.
- if (firstCall)
- {
- firstCall = false;
- value = parent.Value;
- return CompletedTasks.True;
- }
- completionSource.Reset();
- return new UniTask<bool>(this, completionSource.Version);
- }
- public UniTask DisposeAsync()
- {
- if (!isDisposed)
- {
- isDisposed = true;
- TaskTracker.RemoveTracking(this);
- completionSource.TrySetCanceled(cancellationToken);
- parent.triggerEvent.Remove(this);
- }
- return default;
- }
- public void OnNext(T value)
- {
- this.value = value;
- completionSource.TrySetResult(true);
- }
- public void OnCanceled(CancellationToken cancellationToken)
- {
- DisposeAsync().Forget();
- }
- public void OnCompleted()
- {
- completionSource.TrySetResult(false);
- }
- public void OnError(Exception ex)
- {
- completionSource.TrySetException(ex);
- }
- static void CancellationCallback(object state)
- {
- var self = (Enumerator)state;
- self.DisposeAsync().Forget();
- }
- }
- }
- public class ReadOnlyAsyncReactiveProperty<T> : IReadOnlyAsyncReactiveProperty<T>, IDisposable
- {
- TriggerEvent<T> triggerEvent;
- T latestValue;
- IUniTaskAsyncEnumerator<T> enumerator;
- public T Value
- {
- get
- {
- return latestValue;
- }
- }
- public ReadOnlyAsyncReactiveProperty(T initialValue, IUniTaskAsyncEnumerable<T> source, CancellationToken cancellationToken)
- {
- latestValue = initialValue;
- ConsumeEnumerator(source, cancellationToken).Forget();
- }
- public ReadOnlyAsyncReactiveProperty(IUniTaskAsyncEnumerable<T> source, CancellationToken cancellationToken)
- {
- ConsumeEnumerator(source, cancellationToken).Forget();
- }
- async UniTaskVoid ConsumeEnumerator(IUniTaskAsyncEnumerable<T> source, CancellationToken cancellationToken)
- {
- enumerator = source.GetAsyncEnumerator(cancellationToken);
- try
- {
- while (await enumerator.MoveNextAsync())
- {
- var value = enumerator.Current;
- this.latestValue = value;
- triggerEvent.SetResult(value);
- }
- }
- finally
- {
- await enumerator.DisposeAsync();
- enumerator = null;
- }
- }
- public IUniTaskAsyncEnumerable<T> WithoutCurrent()
- {
- return new WithoutCurrentEnumerable(this);
- }
- public IUniTaskAsyncEnumerator<T> GetAsyncEnumerator(CancellationToken cancellationToken)
- {
- return new Enumerator(this, cancellationToken, true);
- }
- public void Dispose()
- {
- if (enumerator != null)
- {
- enumerator.DisposeAsync().Forget();
- }
- triggerEvent.SetCompleted();
- }
- public static implicit operator T(ReadOnlyAsyncReactiveProperty<T> value)
- {
- return value.Value;
- }
- public override string ToString()
- {
- if (isValueType) return latestValue.ToString();
- return latestValue?.ToString();
- }
- public UniTask<T> WaitAsync(CancellationToken cancellationToken = default)
- {
- return new UniTask<T>(WaitAsyncSource.Create(this, cancellationToken, out var token), token);
- }
- static bool isValueType;
- static ReadOnlyAsyncReactiveProperty()
- {
- isValueType = typeof(T).IsValueType;
- }
- sealed class WaitAsyncSource : IUniTaskSource<T>, ITriggerHandler<T>, ITaskPoolNode<WaitAsyncSource>
- {
- static Action<object> cancellationCallback = CancellationCallback;
- static TaskPool<WaitAsyncSource> pool;
- WaitAsyncSource nextNode;
- ref WaitAsyncSource ITaskPoolNode<WaitAsyncSource>.NextNode => ref nextNode;
- static WaitAsyncSource()
- {
- TaskPool.RegisterSizeGetter(typeof(WaitAsyncSource), () => pool.Size);
- }
- ReadOnlyAsyncReactiveProperty<T> parent;
- CancellationToken cancellationToken;
- CancellationTokenRegistration cancellationTokenRegistration;
- UniTaskCompletionSourceCore<T> core;
- WaitAsyncSource()
- {
- }
- public static IUniTaskSource<T> Create(ReadOnlyAsyncReactiveProperty<T> parent, CancellationToken cancellationToken, out short token)
- {
- if (cancellationToken.IsCancellationRequested)
- {
- return AutoResetUniTaskCompletionSource<T>.CreateFromCanceled(cancellationToken, out token);
- }
- if (!pool.TryPop(out var result))
- {
- result = new WaitAsyncSource();
- }
- result.parent = parent;
- result.cancellationToken = cancellationToken;
- if (cancellationToken.CanBeCanceled)
- {
- result.cancellationTokenRegistration = cancellationToken.RegisterWithoutCaptureExecutionContext(cancellationCallback, result);
- }
- result.parent.triggerEvent.Add(result);
- TaskTracker.TrackActiveTask(result, 3);
- token = result.core.Version;
- return result;
- }
- bool TryReturn()
- {
- TaskTracker.RemoveTracking(this);
- core.Reset();
- cancellationTokenRegistration.Dispose();
- cancellationTokenRegistration = default;
- parent.triggerEvent.Remove(this);
- parent = null;
- cancellationToken = default;
- return pool.TryPush(this);
- }
- static void CancellationCallback(object state)
- {
- var self = (WaitAsyncSource)state;
- self.OnCanceled(self.cancellationToken);
- }
- // IUniTaskSource
- public T GetResult(short token)
- {
- try
- {
- return core.GetResult(token);
- }
- finally
- {
- TryReturn();
- }
- }
- void IUniTaskSource.GetResult(short token)
- {
- GetResult(token);
- }
- public void OnCompleted(Action<object> continuation, object state, short token)
- {
- core.OnCompleted(continuation, state, token);
- }
- public UniTaskStatus GetStatus(short token)
- {
- return core.GetStatus(token);
- }
- public UniTaskStatus UnsafeGetStatus()
- {
- return core.UnsafeGetStatus();
- }
- // ITriggerHandler
- ITriggerHandler<T> ITriggerHandler<T>.Prev { get; set; }
- ITriggerHandler<T> ITriggerHandler<T>.Next { get; set; }
- public void OnCanceled(CancellationToken cancellationToken)
- {
- core.TrySetCanceled(cancellationToken);
- }
- public void OnCompleted()
- {
- // Complete as Cancel.
- core.TrySetCanceled(CancellationToken.None);
- }
- public void OnError(Exception ex)
- {
- core.TrySetException(ex);
- }
- public void OnNext(T value)
- {
- core.TrySetResult(value);
- }
- }
- sealed class WithoutCurrentEnumerable : IUniTaskAsyncEnumerable<T>
- {
- readonly ReadOnlyAsyncReactiveProperty<T> parent;
- public WithoutCurrentEnumerable(ReadOnlyAsyncReactiveProperty<T> parent)
- {
- this.parent = parent;
- }
- public IUniTaskAsyncEnumerator<T> GetAsyncEnumerator(CancellationToken cancellationToken = default)
- {
- return new Enumerator(parent, cancellationToken, false);
- }
- }
- sealed class Enumerator : MoveNextSource, IUniTaskAsyncEnumerator<T>, ITriggerHandler<T>
- {
- static Action<object> cancellationCallback = CancellationCallback;
- readonly ReadOnlyAsyncReactiveProperty<T> parent;
- readonly CancellationToken cancellationToken;
- readonly CancellationTokenRegistration cancellationTokenRegistration;
- T value;
- bool isDisposed;
- bool firstCall;
- public Enumerator(ReadOnlyAsyncReactiveProperty<T> parent, CancellationToken cancellationToken, bool publishCurrentValue)
- {
- this.parent = parent;
- this.cancellationToken = cancellationToken;
- this.firstCall = publishCurrentValue;
- parent.triggerEvent.Add(this);
- TaskTracker.TrackActiveTask(this, 3);
- if (cancellationToken.CanBeCanceled)
- {
- cancellationTokenRegistration = cancellationToken.RegisterWithoutCaptureExecutionContext(cancellationCallback, this);
- }
- }
- public T Current => value;
- ITriggerHandler<T> ITriggerHandler<T>.Prev { get; set; }
- ITriggerHandler<T> ITriggerHandler<T>.Next { get; set; }
- public UniTask<bool> MoveNextAsync()
- {
- // raise latest value on first call.
- if (firstCall)
- {
- firstCall = false;
- value = parent.Value;
- return CompletedTasks.True;
- }
- completionSource.Reset();
- return new UniTask<bool>(this, completionSource.Version);
- }
- public UniTask DisposeAsync()
- {
- if (!isDisposed)
- {
- isDisposed = true;
- TaskTracker.RemoveTracking(this);
- completionSource.TrySetCanceled(cancellationToken);
- parent.triggerEvent.Remove(this);
- }
- return default;
- }
- public void OnNext(T value)
- {
- this.value = value;
- completionSource.TrySetResult(true);
- }
- public void OnCanceled(CancellationToken cancellationToken)
- {
- DisposeAsync().Forget();
- }
- public void OnCompleted()
- {
- completionSource.TrySetResult(false);
- }
- public void OnError(Exception ex)
- {
- completionSource.TrySetException(ex);
- }
- static void CancellationCallback(object state)
- {
- var self = (Enumerator)state;
- self.DisposeAsync().Forget();
- }
- }
- }
- public static class StateExtensions
- {
- public static ReadOnlyAsyncReactiveProperty<T> ToReadOnlyAsyncReactiveProperty<T>(this IUniTaskAsyncEnumerable<T> source, CancellationToken cancellationToken)
- {
- return new ReadOnlyAsyncReactiveProperty<T>(source, cancellationToken);
- }
- public static ReadOnlyAsyncReactiveProperty<T> ToReadOnlyAsyncReactiveProperty<T>(this IUniTaskAsyncEnumerable<T> source, T initialValue, CancellationToken cancellationToken)
- {
- return new ReadOnlyAsyncReactiveProperty<T>(initialValue, source, cancellationToken);
- }
- }
- }
|