123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892 |
- using Cysharp.Threading.Tasks.Internal;
- using System;
- using System.Threading;
- namespace Cysharp.Threading.Tasks.Linq
- {
- public static partial class UniTaskAsyncEnumerable
- {
- public static IUniTaskAsyncEnumerable<TResult> SelectMany<TSource, TResult>(this IUniTaskAsyncEnumerable<TSource> source, Func<TSource, IUniTaskAsyncEnumerable<TResult>> selector)
- {
- Error.ThrowArgumentNullException(source, nameof(source));
- Error.ThrowArgumentNullException(selector, nameof(selector));
- return new SelectMany<TSource, TResult, TResult>(source, selector, (x, y) => y);
- }
- public static IUniTaskAsyncEnumerable<TResult> SelectMany<TSource, TResult>(this IUniTaskAsyncEnumerable<TSource> source, Func<TSource, Int32, IUniTaskAsyncEnumerable<TResult>> selector)
- {
- Error.ThrowArgumentNullException(source, nameof(source));
- Error.ThrowArgumentNullException(selector, nameof(selector));
- return new SelectMany<TSource, TResult, TResult>(source, selector, (x, y) => y);
- }
- public static IUniTaskAsyncEnumerable<TResult> SelectMany<TSource, TCollection, TResult>(this IUniTaskAsyncEnumerable<TSource> source, Func<TSource, IUniTaskAsyncEnumerable<TCollection>> collectionSelector, Func<TSource, TCollection, TResult> resultSelector)
- {
- Error.ThrowArgumentNullException(source, nameof(source));
- Error.ThrowArgumentNullException(collectionSelector, nameof(collectionSelector));
- return new SelectMany<TSource, TCollection, TResult>(source, collectionSelector, resultSelector);
- }
- public static IUniTaskAsyncEnumerable<TResult> SelectMany<TSource, TCollection, TResult>(this IUniTaskAsyncEnumerable<TSource> source, Func<TSource, Int32, IUniTaskAsyncEnumerable<TCollection>> collectionSelector, Func<TSource, TCollection, TResult> resultSelector)
- {
- Error.ThrowArgumentNullException(source, nameof(source));
- Error.ThrowArgumentNullException(collectionSelector, nameof(collectionSelector));
- return new SelectMany<TSource, TCollection, TResult>(source, collectionSelector, resultSelector);
- }
- public static IUniTaskAsyncEnumerable<TResult> SelectManyAwait<TSource, TResult>(this IUniTaskAsyncEnumerable<TSource> source, Func<TSource, UniTask<IUniTaskAsyncEnumerable<TResult>>> selector)
- {
- Error.ThrowArgumentNullException(source, nameof(source));
- Error.ThrowArgumentNullException(selector, nameof(selector));
- return new SelectManyAwait<TSource, TResult, TResult>(source, selector, (x, y) => UniTask.FromResult(y));
- }
- public static IUniTaskAsyncEnumerable<TResult> SelectManyAwait<TSource, TResult>(this IUniTaskAsyncEnumerable<TSource> source, Func<TSource, Int32, UniTask<IUniTaskAsyncEnumerable<TResult>>> selector)
- {
- Error.ThrowArgumentNullException(source, nameof(source));
- Error.ThrowArgumentNullException(selector, nameof(selector));
- return new SelectManyAwait<TSource, TResult, TResult>(source, selector, (x, y) => UniTask.FromResult(y));
- }
- public static IUniTaskAsyncEnumerable<TResult> SelectManyAwait<TSource, TCollection, TResult>(this IUniTaskAsyncEnumerable<TSource> source, Func<TSource, UniTask<IUniTaskAsyncEnumerable<TCollection>>> collectionSelector, Func<TSource, TCollection, UniTask<TResult>> resultSelector)
- {
- Error.ThrowArgumentNullException(source, nameof(source));
- Error.ThrowArgumentNullException(collectionSelector, nameof(collectionSelector));
- return new SelectManyAwait<TSource, TCollection, TResult>(source, collectionSelector, resultSelector);
- }
- public static IUniTaskAsyncEnumerable<TResult> SelectManyAwait<TSource, TCollection, TResult>(this IUniTaskAsyncEnumerable<TSource> source, Func<TSource, Int32, UniTask<IUniTaskAsyncEnumerable<TCollection>>> collectionSelector, Func<TSource, TCollection, UniTask<TResult>> resultSelector)
- {
- Error.ThrowArgumentNullException(source, nameof(source));
- Error.ThrowArgumentNullException(collectionSelector, nameof(collectionSelector));
- return new SelectManyAwait<TSource, TCollection, TResult>(source, collectionSelector, resultSelector);
- }
- public static IUniTaskAsyncEnumerable<TResult> SelectManyAwaitWithCancellation<TSource, TResult>(this IUniTaskAsyncEnumerable<TSource> source, Func<TSource, CancellationToken, UniTask<IUniTaskAsyncEnumerable<TResult>>> selector)
- {
- Error.ThrowArgumentNullException(source, nameof(source));
- Error.ThrowArgumentNullException(selector, nameof(selector));
- return new SelectManyAwaitWithCancellation<TSource, TResult, TResult>(source, selector, (x, y, c) => UniTask.FromResult(y));
- }
- public static IUniTaskAsyncEnumerable<TResult> SelectManyAwaitWithCancellation<TSource, TResult>(this IUniTaskAsyncEnumerable<TSource> source, Func<TSource, Int32, CancellationToken, UniTask<IUniTaskAsyncEnumerable<TResult>>> selector)
- {
- Error.ThrowArgumentNullException(source, nameof(source));
- Error.ThrowArgumentNullException(selector, nameof(selector));
- return new SelectManyAwaitWithCancellation<TSource, TResult, TResult>(source, selector, (x, y, c) => UniTask.FromResult(y));
- }
- public static IUniTaskAsyncEnumerable<TResult> SelectManyAwaitWithCancellation<TSource, TCollection, TResult>(this IUniTaskAsyncEnumerable<TSource> source, Func<TSource, CancellationToken, UniTask<IUniTaskAsyncEnumerable<TCollection>>> collectionSelector, Func<TSource, TCollection, CancellationToken, UniTask<TResult>> resultSelector)
- {
- Error.ThrowArgumentNullException(source, nameof(source));
- Error.ThrowArgumentNullException(collectionSelector, nameof(collectionSelector));
- return new SelectManyAwaitWithCancellation<TSource, TCollection, TResult>(source, collectionSelector, resultSelector);
- }
- public static IUniTaskAsyncEnumerable<TResult> SelectManyAwaitWithCancellation<TSource, TCollection, TResult>(this IUniTaskAsyncEnumerable<TSource> source, Func<TSource, Int32, CancellationToken, UniTask<IUniTaskAsyncEnumerable<TCollection>>> collectionSelector, Func<TSource, TCollection, CancellationToken, UniTask<TResult>> resultSelector)
- {
- Error.ThrowArgumentNullException(source, nameof(source));
- Error.ThrowArgumentNullException(collectionSelector, nameof(collectionSelector));
- return new SelectManyAwaitWithCancellation<TSource, TCollection, TResult>(source, collectionSelector, resultSelector);
- }
- }
- internal sealed class SelectMany<TSource, TCollection, TResult> : IUniTaskAsyncEnumerable<TResult>
- {
- readonly IUniTaskAsyncEnumerable<TSource> source;
- readonly Func<TSource, IUniTaskAsyncEnumerable<TCollection>> selector1;
- readonly Func<TSource, int, IUniTaskAsyncEnumerable<TCollection>> selector2;
- readonly Func<TSource, TCollection, TResult> resultSelector;
- public SelectMany(IUniTaskAsyncEnumerable<TSource> source, Func<TSource, IUniTaskAsyncEnumerable<TCollection>> selector, Func<TSource, TCollection, TResult> resultSelector)
- {
- this.source = source;
- this.selector1 = selector;
- this.selector2 = null;
- this.resultSelector = resultSelector;
- }
- public SelectMany(IUniTaskAsyncEnumerable<TSource> source, Func<TSource, int, IUniTaskAsyncEnumerable<TCollection>> selector, Func<TSource, TCollection, TResult> resultSelector)
- {
- this.source = source;
- this.selector1 = null;
- this.selector2 = selector;
- this.resultSelector = resultSelector;
- }
- public IUniTaskAsyncEnumerator<TResult> GetAsyncEnumerator(CancellationToken cancellationToken = default)
- {
- return new _SelectMany(source, selector1, selector2, resultSelector, cancellationToken);
- }
- sealed class _SelectMany : MoveNextSource, IUniTaskAsyncEnumerator<TResult>
- {
- static readonly Action<object> sourceMoveNextCoreDelegate = SourceMoveNextCore;
- static readonly Action<object> selectedSourceMoveNextCoreDelegate = SeletedSourceMoveNextCore;
- static readonly Action<object> selectedEnumeratorDisposeAsyncCoreDelegate = SelectedEnumeratorDisposeAsyncCore;
- readonly IUniTaskAsyncEnumerable<TSource> source;
- readonly Func<TSource, IUniTaskAsyncEnumerable<TCollection>> selector1;
- readonly Func<TSource, int, IUniTaskAsyncEnumerable<TCollection>> selector2;
- readonly Func<TSource, TCollection, TResult> resultSelector;
- CancellationToken cancellationToken;
- TSource sourceCurrent;
- int sourceIndex;
- IUniTaskAsyncEnumerator<TSource> sourceEnumerator;
- IUniTaskAsyncEnumerator<TCollection> selectedEnumerator;
- UniTask<bool>.Awaiter sourceAwaiter;
- UniTask<bool>.Awaiter selectedAwaiter;
- UniTask.Awaiter selectedDisposeAsyncAwaiter;
- public _SelectMany(IUniTaskAsyncEnumerable<TSource> source, Func<TSource, IUniTaskAsyncEnumerable<TCollection>> selector1, Func<TSource, int, IUniTaskAsyncEnumerable<TCollection>> selector2, Func<TSource, TCollection, TResult> resultSelector, CancellationToken cancellationToken)
- {
- this.source = source;
- this.selector1 = selector1;
- this.selector2 = selector2;
- this.resultSelector = resultSelector;
- this.cancellationToken = cancellationToken;
- TaskTracker.TrackActiveTask(this, 3);
- }
- public TResult Current { get; private set; }
- public UniTask<bool> MoveNextAsync()
- {
- completionSource.Reset();
- // iterate selected field
- if (selectedEnumerator != null)
- {
- MoveNextSelected();
- }
- else
- {
- // iterate source field
- if (sourceEnumerator == null)
- {
- sourceEnumerator = source.GetAsyncEnumerator(cancellationToken);
- }
- MoveNextSource();
- }
- return new UniTask<bool>(this, completionSource.Version);
- }
- void MoveNextSource()
- {
- try
- {
- sourceAwaiter = sourceEnumerator.MoveNextAsync().GetAwaiter();
- }
- catch (Exception ex)
- {
- completionSource.TrySetException(ex);
- return;
- }
- if (sourceAwaiter.IsCompleted)
- {
- SourceMoveNextCore(this);
- }
- else
- {
- sourceAwaiter.SourceOnCompleted(sourceMoveNextCoreDelegate, this);
- }
- }
- void MoveNextSelected()
- {
- try
- {
- selectedAwaiter = selectedEnumerator.MoveNextAsync().GetAwaiter();
- }
- catch (Exception ex)
- {
- completionSource.TrySetException(ex);
- return;
- }
- if (selectedAwaiter.IsCompleted)
- {
- SeletedSourceMoveNextCore(this);
- }
- else
- {
- selectedAwaiter.SourceOnCompleted(selectedSourceMoveNextCoreDelegate, this);
- }
- }
- static void SourceMoveNextCore(object state)
- {
- var self = (_SelectMany)state;
- if (self.TryGetResult(self.sourceAwaiter, out var result))
- {
- if (result)
- {
- try
- {
- self.sourceCurrent = self.sourceEnumerator.Current;
- if (self.selector1 != null)
- {
- self.selectedEnumerator = self.selector1(self.sourceCurrent).GetAsyncEnumerator(self.cancellationToken);
- }
- else
- {
- self.selectedEnumerator = self.selector2(self.sourceCurrent, checked(self.sourceIndex++)).GetAsyncEnumerator(self.cancellationToken);
- }
- }
- catch (Exception ex)
- {
- self.completionSource.TrySetException(ex);
- return;
- }
- self.MoveNextSelected(); // iterated selected source.
- }
- else
- {
- self.completionSource.TrySetResult(false);
- }
- }
- }
- static void SeletedSourceMoveNextCore(object state)
- {
- var self = (_SelectMany)state;
- if (self.TryGetResult(self.selectedAwaiter, out var result))
- {
- if (result)
- {
- try
- {
- self.Current = self.resultSelector(self.sourceCurrent, self.selectedEnumerator.Current);
- }
- catch (Exception ex)
- {
- self.completionSource.TrySetException(ex);
- return;
- }
- self.completionSource.TrySetResult(true);
- }
- else
- {
- // dispose selected source and try iterate source.
- try
- {
- self.selectedDisposeAsyncAwaiter = self.selectedEnumerator.DisposeAsync().GetAwaiter();
- }
- catch (Exception ex)
- {
- self.completionSource.TrySetException(ex);
- return;
- }
- if (self.selectedDisposeAsyncAwaiter.IsCompleted)
- {
- SelectedEnumeratorDisposeAsyncCore(self);
- }
- else
- {
- self.selectedDisposeAsyncAwaiter.SourceOnCompleted(selectedEnumeratorDisposeAsyncCoreDelegate, self);
- }
- }
- }
- }
- static void SelectedEnumeratorDisposeAsyncCore(object state)
- {
- var self = (_SelectMany)state;
- if (self.TryGetResult(self.selectedDisposeAsyncAwaiter))
- {
- self.selectedEnumerator = null;
- self.selectedAwaiter = default;
- self.MoveNextSource(); // iterate next source
- }
- }
- public async UniTask DisposeAsync()
- {
- TaskTracker.RemoveTracking(this);
- if (selectedEnumerator != null)
- {
- await selectedEnumerator.DisposeAsync();
- }
- if (sourceEnumerator != null)
- {
- await sourceEnumerator.DisposeAsync();
- }
- }
- }
- }
- internal sealed class SelectManyAwait<TSource, TCollection, TResult> : IUniTaskAsyncEnumerable<TResult>
- {
- readonly IUniTaskAsyncEnumerable<TSource> source;
- readonly Func<TSource, UniTask<IUniTaskAsyncEnumerable<TCollection>>> selector1;
- readonly Func<TSource, int, UniTask<IUniTaskAsyncEnumerable<TCollection>>> selector2;
- readonly Func<TSource, TCollection, UniTask<TResult>> resultSelector;
- public SelectManyAwait(IUniTaskAsyncEnumerable<TSource> source, Func<TSource, UniTask<IUniTaskAsyncEnumerable<TCollection>>> selector, Func<TSource, TCollection, UniTask<TResult>> resultSelector)
- {
- this.source = source;
- this.selector1 = selector;
- this.selector2 = null;
- this.resultSelector = resultSelector;
- }
- public SelectManyAwait(IUniTaskAsyncEnumerable<TSource> source, Func<TSource, int, UniTask<IUniTaskAsyncEnumerable<TCollection>>> selector, Func<TSource, TCollection, UniTask<TResult>> resultSelector)
- {
- this.source = source;
- this.selector1 = null;
- this.selector2 = selector;
- this.resultSelector = resultSelector;
- }
- public IUniTaskAsyncEnumerator<TResult> GetAsyncEnumerator(CancellationToken cancellationToken = default)
- {
- return new _SelectManyAwait(source, selector1, selector2, resultSelector, cancellationToken);
- }
- sealed class _SelectManyAwait : MoveNextSource, IUniTaskAsyncEnumerator<TResult>
- {
- static readonly Action<object> sourceMoveNextCoreDelegate = SourceMoveNextCore;
- static readonly Action<object> selectedSourceMoveNextCoreDelegate = SeletedSourceMoveNextCore;
- static readonly Action<object> selectedEnumeratorDisposeAsyncCoreDelegate = SelectedEnumeratorDisposeAsyncCore;
- static readonly Action<object> selectorAwaitCoreDelegate = SelectorAwaitCore;
- static readonly Action<object> resultSelectorAwaitCoreDelegate = ResultSelectorAwaitCore;
- readonly IUniTaskAsyncEnumerable<TSource> source;
- readonly Func<TSource, UniTask<IUniTaskAsyncEnumerable<TCollection>>> selector1;
- readonly Func<TSource, int, UniTask<IUniTaskAsyncEnumerable<TCollection>>> selector2;
- readonly Func<TSource, TCollection, UniTask<TResult>> resultSelector;
- CancellationToken cancellationToken;
- TSource sourceCurrent;
- int sourceIndex;
- IUniTaskAsyncEnumerator<TSource> sourceEnumerator;
- IUniTaskAsyncEnumerator<TCollection> selectedEnumerator;
- UniTask<bool>.Awaiter sourceAwaiter;
- UniTask<bool>.Awaiter selectedAwaiter;
- UniTask.Awaiter selectedDisposeAsyncAwaiter;
- // await additional
- UniTask<IUniTaskAsyncEnumerable<TCollection>>.Awaiter collectionSelectorAwaiter;
- UniTask<TResult>.Awaiter resultSelectorAwaiter;
- public _SelectManyAwait(IUniTaskAsyncEnumerable<TSource> source, Func<TSource, UniTask<IUniTaskAsyncEnumerable<TCollection>>> selector1, Func<TSource, int, UniTask<IUniTaskAsyncEnumerable<TCollection>>> selector2, Func<TSource, TCollection, UniTask<TResult>> resultSelector, CancellationToken cancellationToken)
- {
- this.source = source;
- this.selector1 = selector1;
- this.selector2 = selector2;
- this.resultSelector = resultSelector;
- this.cancellationToken = cancellationToken;
- TaskTracker.TrackActiveTask(this, 3);
- }
- public TResult Current { get; private set; }
- public UniTask<bool> MoveNextAsync()
- {
- completionSource.Reset();
- // iterate selected field
- if (selectedEnumerator != null)
- {
- MoveNextSelected();
- }
- else
- {
- // iterate source field
- if (sourceEnumerator == null)
- {
- sourceEnumerator = source.GetAsyncEnumerator(cancellationToken);
- }
- MoveNextSource();
- }
- return new UniTask<bool>(this, completionSource.Version);
- }
- void MoveNextSource()
- {
- try
- {
- sourceAwaiter = sourceEnumerator.MoveNextAsync().GetAwaiter();
- }
- catch (Exception ex)
- {
- completionSource.TrySetException(ex);
- return;
- }
- if (sourceAwaiter.IsCompleted)
- {
- SourceMoveNextCore(this);
- }
- else
- {
- sourceAwaiter.SourceOnCompleted(sourceMoveNextCoreDelegate, this);
- }
- }
- void MoveNextSelected()
- {
- try
- {
- selectedAwaiter = selectedEnumerator.MoveNextAsync().GetAwaiter();
- }
- catch (Exception ex)
- {
- completionSource.TrySetException(ex);
- return;
- }
- if (selectedAwaiter.IsCompleted)
- {
- SeletedSourceMoveNextCore(this);
- }
- else
- {
- selectedAwaiter.SourceOnCompleted(selectedSourceMoveNextCoreDelegate, this);
- }
- }
- static void SourceMoveNextCore(object state)
- {
- var self = (_SelectManyAwait)state;
- if (self.TryGetResult(self.sourceAwaiter, out var result))
- {
- if (result)
- {
- try
- {
- self.sourceCurrent = self.sourceEnumerator.Current;
- if (self.selector1 != null)
- {
- self.collectionSelectorAwaiter = self.selector1(self.sourceCurrent).GetAwaiter();
- }
- else
- {
- self.collectionSelectorAwaiter = self.selector2(self.sourceCurrent, checked(self.sourceIndex++)).GetAwaiter();
- }
- if (self.collectionSelectorAwaiter.IsCompleted)
- {
- SelectorAwaitCore(self);
- }
- else
- {
- self.collectionSelectorAwaiter.SourceOnCompleted(selectorAwaitCoreDelegate, self);
- }
- }
- catch (Exception ex)
- {
- self.completionSource.TrySetException(ex);
- return;
- }
- }
- else
- {
- self.completionSource.TrySetResult(false);
- }
- }
- }
- static void SeletedSourceMoveNextCore(object state)
- {
- var self = (_SelectManyAwait)state;
- if (self.TryGetResult(self.selectedAwaiter, out var result))
- {
- if (result)
- {
- try
- {
- self.resultSelectorAwaiter = self.resultSelector(self.sourceCurrent, self.selectedEnumerator.Current).GetAwaiter();
- if (self.resultSelectorAwaiter.IsCompleted)
- {
- ResultSelectorAwaitCore(self);
- }
- else
- {
- self.resultSelectorAwaiter.SourceOnCompleted(resultSelectorAwaitCoreDelegate, self);
- }
- }
- catch (Exception ex)
- {
- self.completionSource.TrySetException(ex);
- return;
- }
- }
- else
- {
- // dispose selected source and try iterate source.
- try
- {
- self.selectedDisposeAsyncAwaiter = self.selectedEnumerator.DisposeAsync().GetAwaiter();
- }
- catch (Exception ex)
- {
- self.completionSource.TrySetException(ex);
- return;
- }
- if (self.selectedDisposeAsyncAwaiter.IsCompleted)
- {
- SelectedEnumeratorDisposeAsyncCore(self);
- }
- else
- {
- self.selectedDisposeAsyncAwaiter.SourceOnCompleted(selectedEnumeratorDisposeAsyncCoreDelegate, self);
- }
- }
- }
- }
- static void SelectedEnumeratorDisposeAsyncCore(object state)
- {
- var self = (_SelectManyAwait)state;
- if (self.TryGetResult(self.selectedDisposeAsyncAwaiter))
- {
- self.selectedEnumerator = null;
- self.selectedAwaiter = default;
- self.MoveNextSource(); // iterate next source
- }
- }
- static void SelectorAwaitCore(object state)
- {
- var self = (_SelectManyAwait)state;
- if (self.TryGetResult(self.collectionSelectorAwaiter, out var result))
- {
- self.selectedEnumerator = result.GetAsyncEnumerator(self.cancellationToken);
- self.MoveNextSelected(); // iterated selected source.
- }
- }
- static void ResultSelectorAwaitCore(object state)
- {
- var self = (_SelectManyAwait)state;
- if (self.TryGetResult(self.resultSelectorAwaiter, out var result))
- {
- self.Current = result;
- self.completionSource.TrySetResult(true);
- }
- }
- public async UniTask DisposeAsync()
- {
- TaskTracker.RemoveTracking(this);
- if (selectedEnumerator != null)
- {
- await selectedEnumerator.DisposeAsync();
- }
- if (sourceEnumerator != null)
- {
- await sourceEnumerator.DisposeAsync();
- }
- }
- }
- }
- internal sealed class SelectManyAwaitWithCancellation<TSource, TCollection, TResult> : IUniTaskAsyncEnumerable<TResult>
- {
- readonly IUniTaskAsyncEnumerable<TSource> source;
- readonly Func<TSource, CancellationToken, UniTask<IUniTaskAsyncEnumerable<TCollection>>> selector1;
- readonly Func<TSource, int, CancellationToken, UniTask<IUniTaskAsyncEnumerable<TCollection>>> selector2;
- readonly Func<TSource, TCollection, CancellationToken, UniTask<TResult>> resultSelector;
- public SelectManyAwaitWithCancellation(IUniTaskAsyncEnumerable<TSource> source, Func<TSource, CancellationToken, UniTask<IUniTaskAsyncEnumerable<TCollection>>> selector, Func<TSource, TCollection, CancellationToken, UniTask<TResult>> resultSelector)
- {
- this.source = source;
- this.selector1 = selector;
- this.selector2 = null;
- this.resultSelector = resultSelector;
- }
- public SelectManyAwaitWithCancellation(IUniTaskAsyncEnumerable<TSource> source, Func<TSource, int, CancellationToken, UniTask<IUniTaskAsyncEnumerable<TCollection>>> selector, Func<TSource, TCollection, CancellationToken, UniTask<TResult>> resultSelector)
- {
- this.source = source;
- this.selector1 = null;
- this.selector2 = selector;
- this.resultSelector = resultSelector;
- }
- public IUniTaskAsyncEnumerator<TResult> GetAsyncEnumerator(CancellationToken cancellationToken = default)
- {
- return new _SelectManyAwaitWithCancellation(source, selector1, selector2, resultSelector, cancellationToken);
- }
- sealed class _SelectManyAwaitWithCancellation : MoveNextSource, IUniTaskAsyncEnumerator<TResult>
- {
- static readonly Action<object> sourceMoveNextCoreDelegate = SourceMoveNextCore;
- static readonly Action<object> selectedSourceMoveNextCoreDelegate = SeletedSourceMoveNextCore;
- static readonly Action<object> selectedEnumeratorDisposeAsyncCoreDelegate = SelectedEnumeratorDisposeAsyncCore;
- static readonly Action<object> selectorAwaitCoreDelegate = SelectorAwaitCore;
- static readonly Action<object> resultSelectorAwaitCoreDelegate = ResultSelectorAwaitCore;
- readonly IUniTaskAsyncEnumerable<TSource> source;
- readonly Func<TSource, CancellationToken, UniTask<IUniTaskAsyncEnumerable<TCollection>>> selector1;
- readonly Func<TSource, int, CancellationToken, UniTask<IUniTaskAsyncEnumerable<TCollection>>> selector2;
- readonly Func<TSource, TCollection, CancellationToken, UniTask<TResult>> resultSelector;
- CancellationToken cancellationToken;
- TSource sourceCurrent;
- int sourceIndex;
- IUniTaskAsyncEnumerator<TSource> sourceEnumerator;
- IUniTaskAsyncEnumerator<TCollection> selectedEnumerator;
- UniTask<bool>.Awaiter sourceAwaiter;
- UniTask<bool>.Awaiter selectedAwaiter;
- UniTask.Awaiter selectedDisposeAsyncAwaiter;
- // await additional
- UniTask<IUniTaskAsyncEnumerable<TCollection>>.Awaiter collectionSelectorAwaiter;
- UniTask<TResult>.Awaiter resultSelectorAwaiter;
- public _SelectManyAwaitWithCancellation(IUniTaskAsyncEnumerable<TSource> source, Func<TSource, CancellationToken, UniTask<IUniTaskAsyncEnumerable<TCollection>>> selector1, Func<TSource, int, CancellationToken, UniTask<IUniTaskAsyncEnumerable<TCollection>>> selector2, Func<TSource, TCollection, CancellationToken, UniTask<TResult>> resultSelector, CancellationToken cancellationToken)
- {
- this.source = source;
- this.selector1 = selector1;
- this.selector2 = selector2;
- this.resultSelector = resultSelector;
- this.cancellationToken = cancellationToken;
- TaskTracker.TrackActiveTask(this, 3);
- }
- public TResult Current { get; private set; }
- public UniTask<bool> MoveNextAsync()
- {
- completionSource.Reset();
- // iterate selected field
- if (selectedEnumerator != null)
- {
- MoveNextSelected();
- }
- else
- {
- // iterate source field
- if (sourceEnumerator == null)
- {
- sourceEnumerator = source.GetAsyncEnumerator(cancellationToken);
- }
- MoveNextSource();
- }
- return new UniTask<bool>(this, completionSource.Version);
- }
- void MoveNextSource()
- {
- try
- {
- sourceAwaiter = sourceEnumerator.MoveNextAsync().GetAwaiter();
- }
- catch (Exception ex)
- {
- completionSource.TrySetException(ex);
- return;
- }
- if (sourceAwaiter.IsCompleted)
- {
- SourceMoveNextCore(this);
- }
- else
- {
- sourceAwaiter.SourceOnCompleted(sourceMoveNextCoreDelegate, this);
- }
- }
- void MoveNextSelected()
- {
- try
- {
- selectedAwaiter = selectedEnumerator.MoveNextAsync().GetAwaiter();
- }
- catch (Exception ex)
- {
- completionSource.TrySetException(ex);
- return;
- }
- if (selectedAwaiter.IsCompleted)
- {
- SeletedSourceMoveNextCore(this);
- }
- else
- {
- selectedAwaiter.SourceOnCompleted(selectedSourceMoveNextCoreDelegate, this);
- }
- }
- static void SourceMoveNextCore(object state)
- {
- var self = (_SelectManyAwaitWithCancellation)state;
- if (self.TryGetResult(self.sourceAwaiter, out var result))
- {
- if (result)
- {
- try
- {
- self.sourceCurrent = self.sourceEnumerator.Current;
- if (self.selector1 != null)
- {
- self.collectionSelectorAwaiter = self.selector1(self.sourceCurrent, self.cancellationToken).GetAwaiter();
- }
- else
- {
- self.collectionSelectorAwaiter = self.selector2(self.sourceCurrent, checked(self.sourceIndex++), self.cancellationToken).GetAwaiter();
- }
- if (self.collectionSelectorAwaiter.IsCompleted)
- {
- SelectorAwaitCore(self);
- }
- else
- {
- self.collectionSelectorAwaiter.SourceOnCompleted(selectorAwaitCoreDelegate, self);
- }
- }
- catch (Exception ex)
- {
- self.completionSource.TrySetException(ex);
- return;
- }
- }
- else
- {
- self.completionSource.TrySetResult(false);
- }
- }
- }
- static void SeletedSourceMoveNextCore(object state)
- {
- var self = (_SelectManyAwaitWithCancellation)state;
- if (self.TryGetResult(self.selectedAwaiter, out var result))
- {
- if (result)
- {
- try
- {
- self.resultSelectorAwaiter = self.resultSelector(self.sourceCurrent, self.selectedEnumerator.Current, self.cancellationToken).GetAwaiter();
- if (self.resultSelectorAwaiter.IsCompleted)
- {
- ResultSelectorAwaitCore(self);
- }
- else
- {
- self.resultSelectorAwaiter.SourceOnCompleted(resultSelectorAwaitCoreDelegate, self);
- }
- }
- catch (Exception ex)
- {
- self.completionSource.TrySetException(ex);
- return;
- }
- }
- else
- {
- // dispose selected source and try iterate source.
- try
- {
- self.selectedDisposeAsyncAwaiter = self.selectedEnumerator.DisposeAsync().GetAwaiter();
- }
- catch (Exception ex)
- {
- self.completionSource.TrySetException(ex);
- return;
- }
- if (self.selectedDisposeAsyncAwaiter.IsCompleted)
- {
- SelectedEnumeratorDisposeAsyncCore(self);
- }
- else
- {
- self.selectedDisposeAsyncAwaiter.SourceOnCompleted(selectedEnumeratorDisposeAsyncCoreDelegate, self);
- }
- }
- }
- }
- static void SelectedEnumeratorDisposeAsyncCore(object state)
- {
- var self = (_SelectManyAwaitWithCancellation)state;
- if (self.TryGetResult(self.selectedDisposeAsyncAwaiter))
- {
- self.selectedEnumerator = null;
- self.selectedAwaiter = default;
- self.MoveNextSource(); // iterate next source
- }
- }
- static void SelectorAwaitCore(object state)
- {
- var self = (_SelectManyAwaitWithCancellation)state;
- if (self.TryGetResult(self.collectionSelectorAwaiter, out var result))
- {
- self.selectedEnumerator = result.GetAsyncEnumerator(self.cancellationToken);
- self.MoveNextSelected(); // iterated selected source.
- }
- }
- static void ResultSelectorAwaitCore(object state)
- {
- var self = (_SelectManyAwaitWithCancellation)state;
- if (self.TryGetResult(self.resultSelectorAwaiter, out var result))
- {
- self.Current = result;
- self.completionSource.TrySetResult(true);
- }
- }
- public async UniTask DisposeAsync()
- {
- TaskTracker.RemoveTracking(this);
- if (selectedEnumerator != null)
- {
- await selectedEnumerator.DisposeAsync();
- }
- if (sourceEnumerator != null)
- {
- await sourceEnumerator.DisposeAsync();
- }
- }
- }
- }
- }
|