<#@ template debug="false" hostspecific="false" language="C#" #>
<#@ assembly name="System.Core" #>
<#@ import namespace="System.Linq" #>
<#@ import namespace="System.Text" #>
<#@ import namespace="System.Collections.Generic" #>
<#@ output extension=".cs" #>
<#
    // Largest number of source sequences an overload is generated for (overloads cover 2..tMax).
    var tMax = 15;

    // "T1, T2, ..., Tn, TResult" - generic argument list shared by the method, the classes and the selector.
    // The inner lambda parameter uses its own name so it never shadows the outer one (CS0136 on older compilers).
    Func<int, string> typeArgs = count => string.Join(", ", Enumerable.Range(1, count).Select(n => $"T{n}")) + ", TResult";

    // "IUniTaskAsyncEnumerable<T1> source1, ..., IUniTaskAsyncEnumerable<Tn> sourcen" - formal parameter list.
    Func<int, string> paramArgs = count => string.Join(", ", Enumerable.Range(1, count).Select(n => $"IUniTaskAsyncEnumerable<T{n}> source{n}"));

    // "source1, ..., sourcen" - argument list used when forwarding the sources.
    Func<int, string> parameters = count => string.Join(", ", Enumerable.Range(1, count).Select(n => $"source{n}"));
#>
using Cysharp.Threading.Tasks.Internal;
using System;
using System.Threading;

namespace Cysharp.Threading.Tasks.Linq
{
    public static partial class UniTaskAsyncEnumerable
    {
<# for(var i = 2; i <= tMax; i++) { #>
        /// <summary>
        /// Combines <#= i #> sequences: once every source has produced at least one value, each new value
        /// from any source yields <paramref name="resultSelector"/> applied to the latest value of every source.
        /// The resulting sequence ends when all sources have ended.
        /// </summary>
        /// <remarks>All arguments are checked with Error.ThrowArgumentNullException before the sequence is created.</remarks>
        public static IUniTaskAsyncEnumerable<TResult> CombineLatest<<#= typeArgs(i) #>>(this <#= paramArgs(i) #>, Func<<#= typeArgs(i) #>> resultSelector)
        {
<# for(var j = 1; j <= i; j++) { #>
            Error.ThrowArgumentNullException(source<#= j #>, nameof(source<#= j #>));
<# } #>
            Error.ThrowArgumentNullException(resultSelector, nameof(resultSelector));

            return new CombineLatest<<#= typeArgs(i) #>>(<#= parameters(i) #>, resultSelector);
        }

<# } #>
    }

<# for(var i = 2; i <= tMax; i++) { #>
    // Deferred CombineLatest over <#= i #> sources; the work happens in the enumerator created by GetAsyncEnumerator.
    internal class CombineLatest<<#= typeArgs(i) #>> : IUniTaskAsyncEnumerable<TResult>
    {
<# for(var j = 1; j <= i; j++) { #>
        readonly IUniTaskAsyncEnumerable<T<#= j #>> source<#= j #>;
<# } #>

        readonly Func<<#= typeArgs(i) #>> resultSelector;

        public CombineLatest(<#= paramArgs(i) #>, Func<<#= typeArgs(i) #>> resultSelector)
        {
<# for(var j = 1; j <= i; j++) { #>
            this.source<#= j #> = source<#= j #>;
<# } #>

            this.resultSelector = resultSelector;
        }

        public IUniTaskAsyncEnumerator<TResult> GetAsyncEnumerator(CancellationToken cancellationToken = default)
        {
            return new _CombineLatest(<#= parameters(i) #>, resultSelector, cancellationToken);
        }

        // Enumerator: drives one MoveNextAsync per source and completes the consumer's MoveNextAsync
        // through completionSource (inherited from MoveNextSource).
        class _CombineLatest : MoveNextSource, IUniTaskAsyncEnumerator<TResult>
        {
            // Cached delegates so registering a continuation does not allocate a new delegate each time.
<# for(var j = 1; j <= i; j++) { #>
            static readonly Action<object> Completed<#= j #>Delegate = Completed<#= j #>;
<# } #>

            // Number of sources that must finish before this sequence finishes.
            const int CompleteCount = <#= i #>;

<# for(var j = 1; j <= i; j++) { #>
            readonly IUniTaskAsyncEnumerable<T<#= j #>> source<#= j #>;
<# } #>

            readonly Func<<#= typeArgs(i) #>> resultSelector;
            CancellationToken cancellationToken;

            // Per-source state:
            //   enumeratorN - source enumerator, created on the first MoveNextAsync.
            //   awaiterN    - awaiter of the source's in-flight MoveNextAsync.
            //   hasCurrentN - the source has produced at least one value.
            //   runningN    - a MoveNextAsync is in flight, or the source is finished/faulted and must not be polled again.
            //   currentN    - latest value produced by the source.
<# for(var j = 1; j <= i; j++) { #>
            IUniTaskAsyncEnumerator<T<#= j #>> enumerator<#= j #>;
            UniTask<bool>.Awaiter awaiter<#= j #>;
            bool hasCurrent<#= j #>;
            bool running<#= j #>;
            T<#= j #> current<#= j #>;

<# } #>
            int completedCount;
            // True while MoveNextAsync is arming the sources; Completed callbacks check it to know they run inline.
            bool syncRunning;
            TResult result;

            public _CombineLatest(<#= paramArgs(i) #>, Func<<#= typeArgs(i) #>> resultSelector, CancellationToken cancellationToken)
            {
<# for(var j = 1; j <= i; j++) { #>
                this.source<#= j #> = source<#= j #>;
<# } #>

                this.resultSelector = resultSelector;
                this.cancellationToken = cancellationToken;
                // NOTE(review): the literal 3 is presumably a stack-frame skip count for the tracker - confirm.
                TaskTracker.TrackActiveTask(this, 3);
            }

            public TResult Current => result;

            public UniTask<bool> MoveNextAsync()
            {
                cancellationToken.ThrowIfCancellationRequested();
                if (completedCount == CompleteCount) return CompletedTasks.False;

                // First call: create every source enumerator.
                if (enumerator1 == null)
                {
<# for(var j = 1; j <= i; j++) { #>
                    enumerator<#= j #> = source<#= j #>.GetAsyncEnumerator(cancellationToken);
<# } #>
                }

                completionSource.Reset();

                AGAIN:
                syncRunning = true;
<# for(var j = 1; j <= i; j++) { #>
                if (!running<#= j #>)
                {
                    running<#= j #> = true;
                    awaiter<#= j #> = enumerator<#= j #>.MoveNextAsync().GetAwaiter();
                    if (awaiter<#= j #>.IsCompleted)
                    {
                        Completed<#= j #>(this);
                    }
                    else
                    {
                        awaiter<#= j #>.SourceOnCompleted(Completed<#= j #>Delegate, this);
                    }
                }
<# } #>

                // A source that completed inline cleared its running flag; arm it again before returning.
                if (<#= string.Join(" || ", Enumerable.Range(1, i).Select(x => $"!running{x}")) #>)
                {
                    goto AGAIN;
                }
                syncRunning = false;

                return new UniTask<bool>(this, completionSource.Version);
            }

<# for(var j = 1; j <= i; j++) { #>
            // Continuation for source <#= j #>'s MoveNextAsync; 'state' is the owning _CombineLatest.
            static void Completed<#= j #>(object state)
            {
                var self = (_CombineLatest)state;
                self.running<#= j #> = false;

                try
                {
                    if (self.awaiter<#= j #>.GetResult())
                    {
                        self.hasCurrent<#= j #> = true;
                        self.current<#= j #> = self.enumerator<#= j #>.Current;
                        goto SUCCESS;
                    }
                    else
                    {
                        // Source finished: keep the flag set so MoveNextAsync is never called on it again.
                        self.running<#= j #> = true;
                        if (Interlocked.Increment(ref self.completedCount) == CompleteCount)
                        {
                            goto COMPLETE;
                        }
                        return;
                    }
                }
                catch (Exception ex)
                {
                    // Source faulted: stop polling it, mark the whole sequence finished and surface the error.
                    self.running<#= j #> = true;
                    self.completedCount = CompleteCount;
                    self.completionSource.TrySetException(ex);
                    return;
                }

                SUCCESS:
                if (!self.TrySetResult())
                {
                    // Some other source has no value yet, so nothing can be emitted.
                    // When called inline from MoveNextAsync, its AGAIN loop re-arms this source.
                    if (self.syncRunning) return;

                    // Called from a continuation: re-arm this source here and mark it in flight again.
                    self.running<#= j #> = true;
                    try
                    {
                        self.awaiter<#= j #> = self.enumerator<#= j #>.MoveNextAsync().GetAwaiter();
                    }
                    catch (Exception ex)
                    {
                        self.completedCount = CompleteCount;
                        self.completionSource.TrySetException(ex);
                        return;
                    }

                    self.awaiter<#= j #>.SourceOnCompleted(Completed<#= j #>Delegate, self);
                }

                return;

                COMPLETE:
                self.completionSource.TrySetResult(false);
                return;
            }

<# } #>
            // Emits resultSelector(latest values) when every source has produced a value; returns whether it emitted.
            bool TrySetResult()
            {
                if (<#= string.Join(" && ", Enumerable.Range(1, i).Select(x => $"hasCurrent{x}")) #>)
                {
                    result = resultSelector(<#= string.Join(", ", Enumerable.Range(1, i).Select(x => $"current{x}")) #>);
                    completionSource.TrySetResult(true);
                    return true;
                }
                else
                {
                    return false;
                }
            }

            // Stops tracking this enumerator and disposes every source enumerator that was created, in order.
            public async UniTask DisposeAsync()
            {
                TaskTracker.RemoveTracking(this);
<# for(var j = 1; j <= i; j++) { #>
                if (enumerator<#= j #> != null)
                {
                    await enumerator<#= j #>.DisposeAsync();
                }
<# } #>
            }
        }
    }

<# } #>
}