123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356 |
- using System;
- using System.Threading;
- namespace Cysharp.Threading.Tasks.Linq
- {
- // note: refactor all inherit class and should remove this.
- // see Select and Where.
- internal abstract class AsyncEnumeratorBase<TSource, TResult> : MoveNextSource, IUniTaskAsyncEnumerator<TResult>
- {
- static readonly Action<object> moveNextCallbackDelegate = MoveNextCallBack;
- readonly IUniTaskAsyncEnumerable<TSource> source;
- protected CancellationToken cancellationToken;
- IUniTaskAsyncEnumerator<TSource> enumerator;
- UniTask<bool>.Awaiter sourceMoveNext;
- public AsyncEnumeratorBase(IUniTaskAsyncEnumerable<TSource> source, CancellationToken cancellationToken)
- {
- this.source = source;
- this.cancellationToken = cancellationToken;
- TaskTracker.TrackActiveTask(this, 4);
- }
- // abstract
- /// <summary>
- /// If return value is false, continue source.MoveNext.
- /// </summary>
- protected abstract bool TryMoveNextCore(bool sourceHasCurrent, out bool result);
- // Util
- protected TSource SourceCurrent => enumerator.Current;
- // IUniTaskAsyncEnumerator<T>
- public TResult Current { get; protected set; }
- public UniTask<bool> MoveNextAsync()
- {
- if (enumerator == null)
- {
- enumerator = source.GetAsyncEnumerator(cancellationToken);
- }
- completionSource.Reset();
- if (!OnFirstIteration())
- {
- SourceMoveNext();
- }
- return new UniTask<bool>(this, completionSource.Version);
- }
- protected virtual bool OnFirstIteration()
- {
- return false;
- }
- protected void SourceMoveNext()
- {
- CONTINUE:
- sourceMoveNext = enumerator.MoveNextAsync().GetAwaiter();
- if (sourceMoveNext.IsCompleted)
- {
- bool result = false;
- try
- {
- if (!TryMoveNextCore(sourceMoveNext.GetResult(), out result))
- {
- goto CONTINUE;
- }
- }
- catch (Exception ex)
- {
- completionSource.TrySetException(ex);
- return;
- }
- if (cancellationToken.IsCancellationRequested)
- {
- completionSource.TrySetCanceled(cancellationToken);
- }
- else
- {
- completionSource.TrySetResult(result);
- }
- }
- else
- {
- sourceMoveNext.SourceOnCompleted(moveNextCallbackDelegate, this);
- }
- }
- static void MoveNextCallBack(object state)
- {
- var self = (AsyncEnumeratorBase<TSource, TResult>)state;
- bool result;
- try
- {
- if (!self.TryMoveNextCore(self.sourceMoveNext.GetResult(), out result))
- {
- self.SourceMoveNext();
- return;
- }
- }
- catch (Exception ex)
- {
- self.completionSource.TrySetException(ex);
- return;
- }
- if (self.cancellationToken.IsCancellationRequested)
- {
- self.completionSource.TrySetCanceled(self.cancellationToken);
- }
- else
- {
- self.completionSource.TrySetResult(result);
- }
- }
- // if require additional resource to dispose, override and call base.DisposeAsync.
- public virtual UniTask DisposeAsync()
- {
- TaskTracker.RemoveTracking(this);
- if (enumerator != null)
- {
- return enumerator.DisposeAsync();
- }
- return default;
- }
- }
- internal abstract class AsyncEnumeratorAwaitSelectorBase<TSource, TResult, TAwait> : MoveNextSource, IUniTaskAsyncEnumerator<TResult>
- {
- static readonly Action<object> moveNextCallbackDelegate = MoveNextCallBack;
- static readonly Action<object> setCurrentCallbackDelegate = SetCurrentCallBack;
- readonly IUniTaskAsyncEnumerable<TSource> source;
- protected CancellationToken cancellationToken;
- IUniTaskAsyncEnumerator<TSource> enumerator;
- UniTask<bool>.Awaiter sourceMoveNext;
- UniTask<TAwait>.Awaiter resultAwaiter;
- public AsyncEnumeratorAwaitSelectorBase(IUniTaskAsyncEnumerable<TSource> source, CancellationToken cancellationToken)
- {
- this.source = source;
- this.cancellationToken = cancellationToken;
- TaskTracker.TrackActiveTask(this, 4);
- }
- // abstract
- protected abstract UniTask<TAwait> TransformAsync(TSource sourceCurrent);
- protected abstract bool TrySetCurrentCore(TAwait awaitResult, out bool terminateIteration);
- // Util
- protected TSource SourceCurrent { get; private set; }
- protected (bool waitCallback, bool requireNextIteration) ActionCompleted(bool trySetCurrentResult, out bool moveNextResult)
- {
- if (trySetCurrentResult)
- {
- moveNextResult = true;
- return (false, false);
- }
- else
- {
- moveNextResult = default;
- return (false, true);
- }
- }
- protected (bool waitCallback, bool requireNextIteration) WaitAwaitCallback(out bool moveNextResult) { moveNextResult = default; return (true, false); }
- protected (bool waitCallback, bool requireNextIteration) IterateFinished(out bool moveNextResult) { moveNextResult = false; return (false, false); }
- // IUniTaskAsyncEnumerator<T>
- public TResult Current { get; protected set; }
- public UniTask<bool> MoveNextAsync()
- {
- if (enumerator == null)
- {
- enumerator = source.GetAsyncEnumerator(cancellationToken);
- }
- completionSource.Reset();
- SourceMoveNext();
- return new UniTask<bool>(this, completionSource.Version);
- }
- protected void SourceMoveNext()
- {
- CONTINUE:
- sourceMoveNext = enumerator.MoveNextAsync().GetAwaiter();
- if (sourceMoveNext.IsCompleted)
- {
- bool result = false;
- try
- {
- (bool waitCallback, bool requireNextIteration) = TryMoveNextCore(sourceMoveNext.GetResult(), out result);
- if (waitCallback)
- {
- return;
- }
- if (requireNextIteration)
- {
- goto CONTINUE;
- }
- else
- {
- completionSource.TrySetResult(result);
- }
- }
- catch (Exception ex)
- {
- completionSource.TrySetException(ex);
- return;
- }
- }
- else
- {
- sourceMoveNext.SourceOnCompleted(moveNextCallbackDelegate, this);
- }
- }
- (bool waitCallback, bool requireNextIteration) TryMoveNextCore(bool sourceHasCurrent, out bool result)
- {
- if (sourceHasCurrent)
- {
- SourceCurrent = enumerator.Current;
- var task = TransformAsync(SourceCurrent);
- if (UnwarapTask(task, out var taskResult))
- {
- var currentResult = TrySetCurrentCore(taskResult, out var terminateIteration);
- if (terminateIteration)
- {
- return IterateFinished(out result);
- }
- return ActionCompleted(currentResult, out result);
- }
- else
- {
- return WaitAwaitCallback(out result);
- }
- }
- return IterateFinished(out result);
- }
- protected bool UnwarapTask(UniTask<TAwait> taskResult, out TAwait result)
- {
- resultAwaiter = taskResult.GetAwaiter();
- if (resultAwaiter.IsCompleted)
- {
- result = resultAwaiter.GetResult();
- return true;
- }
- else
- {
- resultAwaiter.SourceOnCompleted(setCurrentCallbackDelegate, this);
- result = default;
- return false;
- }
- }
- static void MoveNextCallBack(object state)
- {
- var self = (AsyncEnumeratorAwaitSelectorBase<TSource, TResult, TAwait>)state;
- bool result = false;
- try
- {
- (bool waitCallback, bool requireNextIteration) = self.TryMoveNextCore(self.sourceMoveNext.GetResult(), out result);
- if (waitCallback)
- {
- return;
- }
- if (requireNextIteration)
- {
- self.SourceMoveNext();
- return;
- }
- else
- {
- self.completionSource.TrySetResult(result);
- }
- }
- catch (Exception ex)
- {
- self.completionSource.TrySetException(ex);
- return;
- }
- }
- static void SetCurrentCallBack(object state)
- {
- var self = (AsyncEnumeratorAwaitSelectorBase<TSource, TResult, TAwait>)state;
- bool doneSetCurrent;
- bool terminateIteration;
- try
- {
- var result = self.resultAwaiter.GetResult();
- doneSetCurrent = self.TrySetCurrentCore(result, out terminateIteration);
- }
- catch (Exception ex)
- {
- self.completionSource.TrySetException(ex);
- return;
- }
- if (self.cancellationToken.IsCancellationRequested)
- {
- self.completionSource.TrySetCanceled(self.cancellationToken);
- }
- else
- {
- if (doneSetCurrent)
- {
- self.completionSource.TrySetResult(true);
- }
- else
- {
- if (terminateIteration)
- {
- self.completionSource.TrySetResult(false);
- }
- else
- {
- self.SourceMoveNext();
- }
- }
- }
- }
- // if require additional resource to dispose, override and call base.DisposeAsync.
- public virtual UniTask DisposeAsync()
- {
- TaskTracker.RemoveTracking(this);
- if (enumerator != null)
- {
- return enumerator.DisposeAsync();
- }
- return default;
- }
- }
- }
|