Aggregate and ForEach

This commit is contained in:
neuecc
2020-05-10 00:33:46 +09:00
parent 31b788a2c9
commit f37cd703a9
6 changed files with 451 additions and 837 deletions

View File

@@ -0,0 +1,318 @@
using Cysharp.Threading.Tasks.Internal;
using System;
using System.Collections.Generic;
using System.Threading;
namespace Cysharp.Threading.Tasks.Linq
{
public static partial class UniTaskAsyncEnumerable
{
public static UniTask<TSource> AggregateAsync<TSource>(this IUniTaskAsyncEnumerable<TSource> source, Func<TSource, TSource, TSource> accumulator, CancellationToken cancellationToken = default)
{
Error.ThrowArgumentNullException(source, nameof(source));
Error.ThrowArgumentNullException(accumulator, nameof(accumulator));
return Aggregate.InvokeAsync(source, accumulator, cancellationToken);
}
public static UniTask<TAccumulate> AggregateAsync<TSource, TAccumulate>(this IUniTaskAsyncEnumerable<TSource> source, TAccumulate seed, Func<TAccumulate, TSource, TAccumulate> accumulator, CancellationToken cancellationToken = default)
{
Error.ThrowArgumentNullException(source, nameof(source));
Error.ThrowArgumentNullException(accumulator, nameof(accumulator));
return Aggregate.InvokeAsync(source, seed, accumulator, cancellationToken);
}
public static UniTask<TResult> AggregateAsync<TSource, TAccumulate, TResult>(this IUniTaskAsyncEnumerable<TSource> source, TAccumulate seed, Func<TAccumulate, TSource, TAccumulate> accumulator, Func<TAccumulate, TResult> resultSelector, CancellationToken cancellationToken = default)
{
Error.ThrowArgumentNullException(source, nameof(source));
Error.ThrowArgumentNullException(accumulator, nameof(accumulator));
Error.ThrowArgumentNullException(accumulator, nameof(resultSelector));
return Aggregate.InvokeAsync(source, seed, accumulator, resultSelector, cancellationToken);
}
public static UniTask<TSource> AggregateAwaitAsync<TSource>(this IUniTaskAsyncEnumerable<TSource> source, Func<TSource, TSource, UniTask<TSource>> accumulator, CancellationToken cancellationToken = default)
{
Error.ThrowArgumentNullException(source, nameof(source));
Error.ThrowArgumentNullException(accumulator, nameof(accumulator));
return Aggregate.InvokeAsync(source, accumulator, cancellationToken);
}
public static UniTask<TAccumulate> AggregateAwaitAsync<TSource, TAccumulate>(this IUniTaskAsyncEnumerable<TSource> source, TAccumulate seed, Func<TAccumulate, TSource, UniTask<TAccumulate>> accumulator, CancellationToken cancellationToken = default)
{
Error.ThrowArgumentNullException(source, nameof(source));
Error.ThrowArgumentNullException(accumulator, nameof(accumulator));
return Aggregate.InvokeAsync(source, seed, accumulator, cancellationToken);
}
public static UniTask<TResult> AggregateAwaitAsync<TSource, TAccumulate, TResult>(this IUniTaskAsyncEnumerable<TSource> source, TAccumulate seed, Func<TAccumulate, TSource, UniTask<TAccumulate>> accumulator, Func<TAccumulate, UniTask<TResult>> resultSelector, CancellationToken cancellationToken = default)
{
Error.ThrowArgumentNullException(source, nameof(source));
Error.ThrowArgumentNullException(accumulator, nameof(accumulator));
Error.ThrowArgumentNullException(accumulator, nameof(resultSelector));
return Aggregate.InvokeAsync(source, seed, accumulator, resultSelector, cancellationToken);
}
public static UniTask<TSource> AggregateAwaitWithCancellationAsync<TSource>(this IUniTaskAsyncEnumerable<TSource> source, Func<TSource, TSource, CancellationToken, UniTask<TSource>> accumulator, CancellationToken cancellationToken = default)
{
Error.ThrowArgumentNullException(source, nameof(source));
Error.ThrowArgumentNullException(accumulator, nameof(accumulator));
return Aggregate.InvokeAsync(source, accumulator, cancellationToken);
}
public static UniTask<TAccumulate> AggregateAwaitWithCancellationAsync<TSource, TAccumulate>(this IUniTaskAsyncEnumerable<TSource> source, TAccumulate seed, Func<TAccumulate, TSource, CancellationToken, UniTask<TAccumulate>> accumulator, CancellationToken cancellationToken = default)
{
Error.ThrowArgumentNullException(source, nameof(source));
Error.ThrowArgumentNullException(accumulator, nameof(accumulator));
return Aggregate.InvokeAsync(source, seed, accumulator, cancellationToken);
}
public static UniTask<TResult> AggregateAwaitWithCancellationAsync<TSource, TAccumulate, TResult>(this IUniTaskAsyncEnumerable<TSource> source, TAccumulate seed, Func<TAccumulate, TSource, CancellationToken, UniTask<TAccumulate>> accumulator, Func<TAccumulate, CancellationToken, UniTask<TResult>> resultSelector, CancellationToken cancellationToken = default)
{
Error.ThrowArgumentNullException(source, nameof(source));
Error.ThrowArgumentNullException(accumulator, nameof(accumulator));
Error.ThrowArgumentNullException(accumulator, nameof(resultSelector));
return Aggregate.InvokeAsync(source, seed, accumulator, resultSelector, cancellationToken);
}
}
internal static class Aggregate
{
internal static async UniTask<TSource> InvokeAsync<TSource>(IUniTaskAsyncEnumerable<TSource> source, Func<TSource, TSource, TSource> accumulator, CancellationToken cancellationToken)
{
var e = source.GetAsyncEnumerator(cancellationToken);
try
{
TSource value;
if (await e.MoveNextAsync())
{
value = e.Current;
}
else
{
throw Error.NoElements();
}
while (await e.MoveNextAsync())
{
value = accumulator(value, e.Current);
}
return value;
}
finally
{
if (e != null)
{
await e.DisposeAsync();
}
}
}
internal static async UniTask<TAccumulate> InvokeAsync<TSource, TAccumulate>(IUniTaskAsyncEnumerable<TSource> source, TAccumulate seed, Func<TAccumulate, TSource, TAccumulate> accumulator, CancellationToken cancellationToken)
{
var e = source.GetAsyncEnumerator(cancellationToken);
try
{
TAccumulate value = seed;
while (await e.MoveNextAsync())
{
value = accumulator(value, e.Current);
}
return value;
}
finally
{
if (e != null)
{
await e.DisposeAsync();
}
}
}
internal static async UniTask<TResult> InvokeAsync<TSource, TAccumulate, TResult>(IUniTaskAsyncEnumerable<TSource> source, TAccumulate seed, Func<TAccumulate, TSource, TAccumulate> accumulator, Func<TAccumulate, TResult> resultSelector, CancellationToken cancellationToken)
{
var e = source.GetAsyncEnumerator(cancellationToken);
try
{
TAccumulate value = seed;
while (await e.MoveNextAsync())
{
value = accumulator(value, e.Current);
}
return resultSelector(value);
}
finally
{
if (e != null)
{
await e.DisposeAsync();
}
}
}
// with async
internal static async UniTask<TSource> InvokeAsync<TSource>(IUniTaskAsyncEnumerable<TSource> source, Func<TSource, TSource, UniTask<TSource>> accumulator, CancellationToken cancellationToken)
{
var e = source.GetAsyncEnumerator(cancellationToken);
try
{
TSource value;
if (await e.MoveNextAsync())
{
value = e.Current;
}
else
{
throw Error.NoElements();
}
while (await e.MoveNextAsync())
{
value = await accumulator(value, e.Current);
}
return value;
}
finally
{
if (e != null)
{
await e.DisposeAsync();
}
}
}
internal static async UniTask<TAccumulate> InvokeAsync<TSource, TAccumulate>(IUniTaskAsyncEnumerable<TSource> source, TAccumulate seed, Func<TAccumulate, TSource, UniTask<TAccumulate>> accumulator, CancellationToken cancellationToken)
{
var e = source.GetAsyncEnumerator(cancellationToken);
try
{
TAccumulate value = seed;
while (await e.MoveNextAsync())
{
value = await accumulator(value, e.Current);
}
return value;
}
finally
{
if (e != null)
{
await e.DisposeAsync();
}
}
}
internal static async UniTask<TResult> InvokeAsync<TSource, TAccumulate, TResult>(IUniTaskAsyncEnumerable<TSource> source, TAccumulate seed, Func<TAccumulate, TSource, UniTask<TAccumulate>> accumulator, Func<TAccumulate, UniTask<TResult>> resultSelector, CancellationToken cancellationToken)
{
var e = source.GetAsyncEnumerator(cancellationToken);
try
{
TAccumulate value = seed;
while (await e.MoveNextAsync())
{
value = await accumulator(value, e.Current);
}
return await resultSelector(value);
}
finally
{
if (e != null)
{
await e.DisposeAsync();
}
}
}
// with cancellation
internal static async UniTask<TSource> InvokeAsync<TSource>(IUniTaskAsyncEnumerable<TSource> source, Func<TSource, TSource, CancellationToken, UniTask<TSource>> accumulator, CancellationToken cancellationToken)
{
var e = source.GetAsyncEnumerator(cancellationToken);
try
{
TSource value;
if (await e.MoveNextAsync())
{
value = e.Current;
}
else
{
throw Error.NoElements();
}
while (await e.MoveNextAsync())
{
value = await accumulator(value, e.Current, cancellationToken);
}
return value;
}
finally
{
if (e != null)
{
await e.DisposeAsync();
}
}
}
internal static async UniTask<TAccumulate> InvokeAsync<TSource, TAccumulate>(IUniTaskAsyncEnumerable<TSource> source, TAccumulate seed, Func<TAccumulate, TSource, CancellationToken, UniTask<TAccumulate>> accumulator, CancellationToken cancellationToken)
{
var e = source.GetAsyncEnumerator(cancellationToken);
try
{
TAccumulate value = seed;
while (await e.MoveNextAsync())
{
value = await accumulator(value, e.Current, cancellationToken);
}
return value;
}
finally
{
if (e != null)
{
await e.DisposeAsync();
}
}
}
internal static async UniTask<TResult> InvokeAsync<TSource, TAccumulate, TResult>(IUniTaskAsyncEnumerable<TSource> source, TAccumulate seed, Func<TAccumulate, TSource, CancellationToken, UniTask<TAccumulate>> accumulator, Func<TAccumulate, CancellationToken, UniTask<TResult>> resultSelector, CancellationToken cancellationToken)
{
var e = source.GetAsyncEnumerator(cancellationToken);
try
{
TAccumulate value = seed;
while (await e.MoveNextAsync())
{
value = await accumulator(value, e.Current, cancellationToken);
}
return await resultSelector(value, cancellationToken);
}
finally
{
if (e != null)
{
await e.DisposeAsync();
}
}
}
}
}

View File

@@ -11,7 +11,7 @@ namespace Cysharp.Threading.Tasks.Linq
Error.ThrowArgumentNullException(source, nameof(source));
Error.ThrowArgumentNullException(action, nameof(action));
return Cysharp.Threading.Tasks.Linq.ForEach<TSource>.InvokeAsync(source, action, cancellationToken);
return Cysharp.Threading.Tasks.Linq.ForEach.InvokeAsync(source, action, cancellationToken);
}
public static UniTask ForEachAsync<TSource>(this IUniTaskAsyncEnumerable<TSource> source, Action<TSource, Int32> action, CancellationToken cancellationToken = default)
@@ -19,7 +19,7 @@ namespace Cysharp.Threading.Tasks.Linq
Error.ThrowArgumentNullException(source, nameof(source));
Error.ThrowArgumentNullException(action, nameof(action));
return Cysharp.Threading.Tasks.Linq.ForEach<TSource>.InvokeAsync(source, action, cancellationToken);
return Cysharp.Threading.Tasks.Linq.ForEach.InvokeAsync(source, action, cancellationToken);
}
public static UniTask ForEachAwaitAsync<TSource>(this IUniTaskAsyncEnumerable<TSource> source, Func<TSource, UniTask> action, CancellationToken cancellationToken = default)
@@ -27,7 +27,7 @@ namespace Cysharp.Threading.Tasks.Linq
Error.ThrowArgumentNullException(source, nameof(source));
Error.ThrowArgumentNullException(action, nameof(action));
return Cysharp.Threading.Tasks.Linq.ForEach<TSource>.InvokeAwaitAsync(source, action, cancellationToken);
return Cysharp.Threading.Tasks.Linq.ForEach.InvokeAwaitAsync(source, action, cancellationToken);
}
public static UniTask ForEachAwaitAsync<TSource>(this IUniTaskAsyncEnumerable<TSource> source, Func<TSource, Int32, UniTask> action, CancellationToken cancellationToken = default)
@@ -35,7 +35,7 @@ namespace Cysharp.Threading.Tasks.Linq
Error.ThrowArgumentNullException(source, nameof(source));
Error.ThrowArgumentNullException(action, nameof(action));
return Cysharp.Threading.Tasks.Linq.ForEach<TSource>.InvokeAwaitAsync(source, action, cancellationToken);
return Cysharp.Threading.Tasks.Linq.ForEach.InvokeAwaitAsync(source, action, cancellationToken);
}
public static UniTask ForEachAwaitWithCancellationAsync<TSource>(this IUniTaskAsyncEnumerable<TSource> source, Func<TSource, CancellationToken, UniTask> action, CancellationToken cancellationToken = default)
@@ -43,7 +43,7 @@ namespace Cysharp.Threading.Tasks.Linq
Error.ThrowArgumentNullException(source, nameof(source));
Error.ThrowArgumentNullException(action, nameof(action));
return Cysharp.Threading.Tasks.Linq.ForEach<TSource>.InvokeAwaitWithCancellationAsync(source, action, cancellationToken);
return Cysharp.Threading.Tasks.Linq.ForEach.InvokeAwaitWithCancellationAsync(source, action, cancellationToken);
}
public static UniTask ForEachAwaitWithCancellationAsync<TSource>(this IUniTaskAsyncEnumerable<TSource> source, Func<TSource, Int32, CancellationToken, UniTask> action, CancellationToken cancellationToken = default)
@@ -51,13 +51,13 @@ namespace Cysharp.Threading.Tasks.Linq
Error.ThrowArgumentNullException(source, nameof(source));
Error.ThrowArgumentNullException(action, nameof(action));
return Cysharp.Threading.Tasks.Linq.ForEach<TSource>.InvokeAwaitWithCancellationAsync(source, action, cancellationToken);
return Cysharp.Threading.Tasks.Linq.ForEach.InvokeAwaitWithCancellationAsync(source, action, cancellationToken);
}
}
internal static class ForEach<TSource>
internal static class ForEach
{
public static async UniTask InvokeAsync(IUniTaskAsyncEnumerable<TSource> source, Action<TSource> action, CancellationToken cancellationToken)
public static async UniTask InvokeAsync<TSource>(IUniTaskAsyncEnumerable<TSource> source, Action<TSource> action, CancellationToken cancellationToken)
{
var e = source.GetAsyncEnumerator(cancellationToken);
try
@@ -76,7 +76,7 @@ namespace Cysharp.Threading.Tasks.Linq
}
}
public static async UniTask InvokeAsync(IUniTaskAsyncEnumerable<TSource> source, Action<TSource, Int32> action, CancellationToken cancellationToken)
public static async UniTask InvokeAsync<TSource>(IUniTaskAsyncEnumerable<TSource> source, Action<TSource, Int32> action, CancellationToken cancellationToken)
{
var e = source.GetAsyncEnumerator(cancellationToken);
try
@@ -96,7 +96,7 @@ namespace Cysharp.Threading.Tasks.Linq
}
}
public static async UniTask InvokeAwaitAsync(IUniTaskAsyncEnumerable<TSource> source, Func<TSource, UniTask> action, CancellationToken cancellationToken)
public static async UniTask InvokeAwaitAsync<TSource>(IUniTaskAsyncEnumerable<TSource> source, Func<TSource, UniTask> action, CancellationToken cancellationToken)
{
var e = source.GetAsyncEnumerator(cancellationToken);
try
@@ -115,7 +115,7 @@ namespace Cysharp.Threading.Tasks.Linq
}
}
public static async UniTask InvokeAwaitAsync(IUniTaskAsyncEnumerable<TSource> source, Func<TSource, Int32, UniTask> action, CancellationToken cancellationToken)
public static async UniTask InvokeAwaitAsync<TSource>(IUniTaskAsyncEnumerable<TSource> source, Func<TSource, Int32, UniTask> action, CancellationToken cancellationToken)
{
var e = source.GetAsyncEnumerator(cancellationToken);
try
@@ -135,7 +135,7 @@ namespace Cysharp.Threading.Tasks.Linq
}
}
public static async UniTask InvokeAwaitWithCancellationAsync(IUniTaskAsyncEnumerable<TSource> source, Func<TSource, CancellationToken, UniTask> action, CancellationToken cancellationToken)
public static async UniTask InvokeAwaitWithCancellationAsync<TSource>(IUniTaskAsyncEnumerable<TSource> source, Func<TSource, CancellationToken, UniTask> action, CancellationToken cancellationToken)
{
var e = source.GetAsyncEnumerator(cancellationToken);
try
@@ -154,7 +154,7 @@ namespace Cysharp.Threading.Tasks.Linq
}
}
public static async UniTask InvokeAwaitWithCancellationAsync(IUniTaskAsyncEnumerable<TSource> source, Func<TSource, Int32, CancellationToken, UniTask> action, CancellationToken cancellationToken)
public static async UniTask InvokeAwaitWithCancellationAsync<TSource>(IUniTaskAsyncEnumerable<TSource> source, Func<TSource, Int32, CancellationToken, UniTask> action, CancellationToken cancellationToken)
{
var e = source.GetAsyncEnumerator(cancellationToken);
try

View File

@@ -1,775 +0,0 @@
namespace Cysharp.Threading.Tasks.Linq
{
internal sealed class _Aggregate
{
}
}

View File

@@ -21,50 +21,7 @@ namespace ___Dummy
{
// Buffer,Distinct, DistinctUntilChanged, Do, MaxBy, MinBy, Never,Return, Throw
public static UniTask<TSource> AggregateAsync<TSource>(this IUniTaskAsyncEnumerable<TSource> source, Func<TSource, TSource, TSource> accumulator, CancellationToken cancellationToken = default)
{
throw new NotImplementedException();
}
public static UniTask<TAccumulate> AggregateAsync<TSource, TAccumulate>(this IUniTaskAsyncEnumerable<TSource> source, TAccumulate seed, Func<TAccumulate, TSource, TAccumulate> accumulator, CancellationToken cancellationToken = default)
{
throw new NotImplementedException();
}
public static UniTask<TResult> AggregateAsync<TSource, TAccumulate, TResult>(this IUniTaskAsyncEnumerable<TSource> source, TAccumulate seed, Func<TAccumulate, TSource, TAccumulate> accumulator, Func<TAccumulate, TResult> resultSelector, CancellationToken cancellationToken = default)
{
throw new NotImplementedException();
}
public static UniTask<TSource> AggregateAwaitAsync<TSource>(this IUniTaskAsyncEnumerable<TSource> source, Func<TSource, TSource, UniTask<TSource>> accumulator, CancellationToken cancellationToken = default)
{
throw new NotImplementedException();
}
public static UniTask<TAccumulate> AggregateAwaitAsync<TSource, TAccumulate>(this IUniTaskAsyncEnumerable<TSource> source, TAccumulate seed, Func<TAccumulate, TSource, UniTask<TAccumulate>> accumulator, CancellationToken cancellationToken = default)
{
throw new NotImplementedException();
}
public static UniTask<TResult> AggregateAwaitAsync<TSource, TAccumulate, TResult>(this IUniTaskAsyncEnumerable<TSource> source, TAccumulate seed, Func<TAccumulate, TSource, UniTask<TAccumulate>> accumulator, Func<TAccumulate, UniTask<TResult>> resultSelector, CancellationToken cancellationToken = default)
{
throw new NotImplementedException();
}
public static UniTask<TSource> AggregateAwaitWithCancellationAsync<TSource>(this IUniTaskAsyncEnumerable<TSource> source, Func<TSource, TSource, CancellationToken, UniTask<TSource>> accumulator, CancellationToken cancellationToken = default)
{
throw new NotImplementedException();
}
public static UniTask<TAccumulate> AggregateAwaitWithCancellationAsync<TSource, TAccumulate>(this IUniTaskAsyncEnumerable<TSource> source, TAccumulate seed, Func<TAccumulate, TSource, CancellationToken, UniTask<TAccumulate>> accumulator, CancellationToken cancellationToken = default)
{
throw new NotImplementedException();
}
public static UniTask<TResult> AggregateAwaitWithCancellationAsync<TSource, TAccumulate, TResult>(this IUniTaskAsyncEnumerable<TSource> source, TAccumulate seed, Func<TAccumulate, TSource, CancellationToken, UniTask<TAccumulate>> accumulator, Func<TAccumulate, CancellationToken, UniTask<TResult>> resultSelector, CancellationToken cancellationToken = default)
{
throw new NotImplementedException();
}
public static IUniTaskAsyncEnumerable<TSource> Append<TSource>(this IUniTaskAsyncEnumerable<TSource> source, TSource element)
{