123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103 |
- using System;
- using System.Threading;
- namespace Cysharp.Threading.Tasks.Linq
- {
- public static partial class UniTaskAsyncEnumerable
- {
- public static IUniTaskAsyncEnumerable<TSource> Queue<TSource>(this IUniTaskAsyncEnumerable<TSource> source)
- {
- return new QueueOperator<TSource>(source);
- }
- }
- internal sealed class QueueOperator<TSource> : IUniTaskAsyncEnumerable<TSource>
- {
- readonly IUniTaskAsyncEnumerable<TSource> source;
- public QueueOperator(IUniTaskAsyncEnumerable<TSource> source)
- {
- this.source = source;
- }
- public IUniTaskAsyncEnumerator<TSource> GetAsyncEnumerator(CancellationToken cancellationToken = default)
- {
- return new _Queue(source, cancellationToken);
- }
- sealed class _Queue : IUniTaskAsyncEnumerator<TSource>
- {
- readonly IUniTaskAsyncEnumerable<TSource> source;
- CancellationToken cancellationToken;
- Channel<TSource> channel;
- IUniTaskAsyncEnumerator<TSource> channelEnumerator;
- IUniTaskAsyncEnumerator<TSource> sourceEnumerator;
- bool channelClosed;
- public _Queue(IUniTaskAsyncEnumerable<TSource> source, CancellationToken cancellationToken)
- {
- this.source = source;
- this.cancellationToken = cancellationToken;
- }
- public TSource Current => channelEnumerator.Current;
- public UniTask<bool> MoveNextAsync()
- {
- cancellationToken.ThrowIfCancellationRequested();
- if (sourceEnumerator == null)
- {
- sourceEnumerator = source.GetAsyncEnumerator(cancellationToken);
- channel = Channel.CreateSingleConsumerUnbounded<TSource>();
- channelEnumerator = channel.Reader.ReadAllAsync().GetAsyncEnumerator(cancellationToken);
- ConsumeAll(this, sourceEnumerator, channel).Forget();
- }
- return channelEnumerator.MoveNextAsync();
- }
- static async UniTaskVoid ConsumeAll(_Queue self, IUniTaskAsyncEnumerator<TSource> enumerator, ChannelWriter<TSource> writer)
- {
- try
- {
- while (await enumerator.MoveNextAsync())
- {
- writer.TryWrite(enumerator.Current);
- }
- writer.TryComplete();
- }
- catch (Exception ex)
- {
- writer.TryComplete(ex);
- }
- finally
- {
- self.channelClosed = true;
- await enumerator.DisposeAsync();
- }
- }
- public async UniTask DisposeAsync()
- {
- if (sourceEnumerator != null)
- {
- await sourceEnumerator.DisposeAsync();
- }
- if (channelEnumerator != null)
- {
- await channelEnumerator.DisposeAsync();
- }
- if (!channelClosed)
- {
- channelClosed = true;
- channel.Writer.TryComplete(new OperationCanceledException());
- }
- }
- }
- }
- }
|