mirror of
https://github.com/Cysharp/UniTask.git
synced 2026-05-19 13:40:11 +00:00
Compare commits
5 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
2b7986da19 | ||
|
|
c3d22968e1 | ||
|
|
0e25122ee2 | ||
|
|
4504d84aa8 | ||
|
|
2b87cadba3 |
@@ -93,10 +93,16 @@ namespace NetCoreSandbox
|
|||||||
|
|
||||||
var channel = Channel.CreateSingleConsumerUnbounded<int>();
|
var channel = Channel.CreateSingleConsumerUnbounded<int>();
|
||||||
|
|
||||||
|
// Observable.Range(1,10).CombineLatest(
|
||||||
|
|
||||||
|
var cts = new CancellationTokenSource();
|
||||||
|
|
||||||
|
var token = cts.Token;
|
||||||
|
|
||||||
|
FooAsync(token).ForEachAsync(x => { }, token);
|
||||||
|
|
||||||
|
|
||||||
|
// Observable.Range(1,10).CombineLatest(
|
||||||
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -52,44 +52,44 @@ namespace NetCoreTests
|
|||||||
ar.Should().BeEquivalentTo(new[] { 100, 100, 100, 131, 191 });
|
ar.Should().BeEquivalentTo(new[] { 100, 100, 100, 131, 191 });
|
||||||
}
|
}
|
||||||
|
|
||||||
[Fact]
|
//[Fact]
|
||||||
public async Task StateIteration()
|
//public async Task StateIteration()
|
||||||
{
|
//{
|
||||||
var rp = new State<int>(99);
|
// var rp = new ReadOnlyAsyncReactiveProperty<int>(99);
|
||||||
var setter = rp.GetSetter();
|
// var setter = rp.GetSetter();
|
||||||
|
|
||||||
var f = await rp.FirstAsync();
|
// var f = await rp.FirstAsync();
|
||||||
f.Should().Be(99);
|
// f.Should().Be(99);
|
||||||
|
|
||||||
var array = rp.Take(5).ToArrayAsync();
|
// var array = rp.Take(5).ToArrayAsync();
|
||||||
|
|
||||||
setter(100);
|
// setter(100);
|
||||||
setter(100);
|
// setter(100);
|
||||||
setter(100);
|
// setter(100);
|
||||||
setter(131);
|
// setter(131);
|
||||||
|
|
||||||
var ar = await array;
|
// var ar = await array;
|
||||||
|
|
||||||
ar.Should().BeEquivalentTo(new[] { 99, 100, 100, 100, 131 });
|
// ar.Should().BeEquivalentTo(new[] { 99, 100, 100, 100, 131 });
|
||||||
}
|
//}
|
||||||
|
|
||||||
[Fact]
|
//[Fact]
|
||||||
public async Task StateWithoutCurrent()
|
//public async Task StateWithoutCurrent()
|
||||||
{
|
//{
|
||||||
var rp = new State<int>(99);
|
// var rp = new ReadOnlyAsyncReactiveProperty<int>(99);
|
||||||
var setter = rp.GetSetter();
|
// var setter = rp.GetSetter();
|
||||||
|
|
||||||
var array = rp.WithoutCurrent().Take(5).ToArrayAsync();
|
// var array = rp.WithoutCurrent().Take(5).ToArrayAsync();
|
||||||
setter(100);
|
// setter(100);
|
||||||
setter(100);
|
// setter(100);
|
||||||
setter(100);
|
// setter(100);
|
||||||
setter(131);
|
// setter(131);
|
||||||
setter(191);
|
// setter(191);
|
||||||
|
|
||||||
var ar = await array;
|
// var ar = await array;
|
||||||
|
|
||||||
ar.Should().BeEquivalentTo(new[] { 100, 100, 100, 131, 191 });
|
// ar.Should().BeEquivalentTo(new[] { 100, 100, 100, 131, 191 });
|
||||||
}
|
//}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
@@ -98,7 +98,7 @@ namespace NetCoreTests
|
|||||||
{
|
{
|
||||||
var rp = new AsyncReactiveProperty<int>(10);
|
var rp = new AsyncReactiveProperty<int>(10);
|
||||||
|
|
||||||
var state = rp.ToState(CancellationToken.None);
|
var state = rp.ToReadOnlyAsyncReactiveProperty(CancellationToken.None);
|
||||||
|
|
||||||
rp.Value = 10;
|
rp.Value = 10;
|
||||||
state.Value.Should().Be(10);
|
state.Value.Should().Be(10);
|
||||||
|
|||||||
@@ -319,5 +319,119 @@ namespace NetCoreTests.Linq
|
|||||||
await Assert.ThrowsAsync<UniTaskTestException>(async () => await ys);
|
await Assert.ThrowsAsync<UniTaskTestException>(async () => await ys);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task CombineLatestOK()
|
||||||
|
{
|
||||||
|
var a = new AsyncReactiveProperty<int>(0);
|
||||||
|
var b = new AsyncReactiveProperty<int>(0);
|
||||||
|
|
||||||
|
var list = new List<(int, int)>();
|
||||||
|
var complete = a.WithoutCurrent().CombineLatest(b.WithoutCurrent(), (x, y) => (x, y)).ForEachAsync(x => list.Add(x));
|
||||||
|
|
||||||
|
list.Count.Should().Be(0);
|
||||||
|
|
||||||
|
a.Value = 10;
|
||||||
|
list.Count.Should().Be(0);
|
||||||
|
|
||||||
|
a.Value = 20;
|
||||||
|
list.Count.Should().Be(0);
|
||||||
|
|
||||||
|
b.Value = 1;
|
||||||
|
list.Count.Should().Be(1);
|
||||||
|
|
||||||
|
list[0].Should().Be((20, 1));
|
||||||
|
|
||||||
|
a.Value = 30;
|
||||||
|
list.Last().Should().Be((30, 1));
|
||||||
|
|
||||||
|
b.Value = 2;
|
||||||
|
list.Last().Should().Be((30, 2));
|
||||||
|
|
||||||
|
a.Dispose();
|
||||||
|
b.Value = 3;
|
||||||
|
list.Last().Should().Be((30, 3));
|
||||||
|
|
||||||
|
b.Dispose();
|
||||||
|
|
||||||
|
await complete;
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task CombineLatestLong()
|
||||||
|
{
|
||||||
|
var a = UniTaskAsyncEnumerable.Range(1, 100000);
|
||||||
|
var b = new AsyncReactiveProperty<int>(0);
|
||||||
|
|
||||||
|
var list = new List<(int, int)>();
|
||||||
|
var complete = a.CombineLatest(b.WithoutCurrent(), (x, y) => (x, y)).ForEachAsync(x => list.Add(x));
|
||||||
|
|
||||||
|
b.Value = 1;
|
||||||
|
|
||||||
|
list[0].Should().Be((100000, 1));
|
||||||
|
|
||||||
|
b.Dispose();
|
||||||
|
|
||||||
|
await complete;
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task CombineLatestError()
|
||||||
|
{
|
||||||
|
var a = new AsyncReactiveProperty<int>(0);
|
||||||
|
var b = new AsyncReactiveProperty<int>(0);
|
||||||
|
|
||||||
|
var list = new List<(int, int)>();
|
||||||
|
var complete = a.WithoutCurrent()
|
||||||
|
.Select(x => { if (x == 0) { throw new MyException(); } return x; })
|
||||||
|
.CombineLatest(b.WithoutCurrent(), (x, y) => (x, y)).ForEachAsync(x => list.Add(x));
|
||||||
|
|
||||||
|
|
||||||
|
a.Value = 10;
|
||||||
|
b.Value = 1;
|
||||||
|
list.Last().Should().Be((10, 1));
|
||||||
|
|
||||||
|
a.Value = 0;
|
||||||
|
|
||||||
|
await Assert.ThrowsAsync<MyException>(async () => await complete);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task PariwiseImmediate()
|
||||||
|
{
|
||||||
|
var xs = await UniTaskAsyncEnumerable.Range(1, 5).Pairwise().ToArrayAsync();
|
||||||
|
xs.Should().BeEquivalentTo((1, 2), (2, 3), (3, 4), (4, 5));
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task Pariwise()
|
||||||
|
{
|
||||||
|
var a = new AsyncReactiveProperty<int>(0);
|
||||||
|
|
||||||
|
var list = new List<(int, int)>();
|
||||||
|
var complete = a.WithoutCurrent().Pairwise().ForEachAsync(x => list.Add(x));
|
||||||
|
|
||||||
|
list.Count.Should().Be(0);
|
||||||
|
a.Value = 10;
|
||||||
|
list.Count.Should().Be(0);
|
||||||
|
a.Value = 20;
|
||||||
|
list.Count.Should().Be(1);
|
||||||
|
a.Value = 30;
|
||||||
|
a.Value = 40;
|
||||||
|
a.Value = 50;
|
||||||
|
|
||||||
|
a.Dispose();
|
||||||
|
|
||||||
|
await complete;
|
||||||
|
|
||||||
|
list.Should().BeEquivalentTo((10, 20), (20, 30), (30, 40), (40, 50));
|
||||||
|
}
|
||||||
|
|
||||||
|
class MyException : Exception
|
||||||
|
{
|
||||||
|
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -175,13 +175,11 @@ namespace Cysharp.Threading.Tasks
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public class State<T> : IReadOnlyAsyncReactiveProperty<T>, IDisposable
|
public class ReadOnlyAsyncReactiveProperty<T> : IReadOnlyAsyncReactiveProperty<T>, IDisposable
|
||||||
{
|
{
|
||||||
TriggerEvent<T> triggerEvent;
|
TriggerEvent<T> triggerEvent;
|
||||||
|
|
||||||
T latestValue;
|
T latestValue;
|
||||||
|
|
||||||
Action<T> setter;
|
|
||||||
IUniTaskAsyncEnumerator<T> enumerator;
|
IUniTaskAsyncEnumerator<T> enumerator;
|
||||||
|
|
||||||
public T Value
|
public T Value
|
||||||
@@ -192,19 +190,13 @@ namespace Cysharp.Threading.Tasks
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public State(T value)
|
public ReadOnlyAsyncReactiveProperty(T initialValue, IUniTaskAsyncEnumerable<T> source, CancellationToken cancellationToken)
|
||||||
{
|
|
||||||
this.latestValue = value;
|
|
||||||
this.triggerEvent = default;
|
|
||||||
}
|
|
||||||
|
|
||||||
public State(T initialValue, IUniTaskAsyncEnumerable<T> source, CancellationToken cancellationToken)
|
|
||||||
{
|
{
|
||||||
latestValue = initialValue;
|
latestValue = initialValue;
|
||||||
ConsumeEnumerator(source, cancellationToken).Forget();
|
ConsumeEnumerator(source, cancellationToken).Forget();
|
||||||
}
|
}
|
||||||
|
|
||||||
public State(IUniTaskAsyncEnumerable<T> source, CancellationToken cancellationToken)
|
public ReadOnlyAsyncReactiveProperty(IUniTaskAsyncEnumerable<T> source, CancellationToken cancellationToken)
|
||||||
{
|
{
|
||||||
ConsumeEnumerator(source, cancellationToken).Forget();
|
ConsumeEnumerator(source, cancellationToken).Forget();
|
||||||
}
|
}
|
||||||
@@ -216,7 +208,9 @@ namespace Cysharp.Threading.Tasks
|
|||||||
{
|
{
|
||||||
while (await enumerator.MoveNextAsync())
|
while (await enumerator.MoveNextAsync())
|
||||||
{
|
{
|
||||||
SetValue(enumerator.Current);
|
var value = enumerator.Current;
|
||||||
|
this.latestValue = value;
|
||||||
|
triggerEvent.SetResult(value);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
finally
|
finally
|
||||||
@@ -226,28 +220,6 @@ namespace Cysharp.Threading.Tasks
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public Action<T> GetSetter()
|
|
||||||
{
|
|
||||||
if (enumerator != null)
|
|
||||||
{
|
|
||||||
throw new InvalidOperationException("Can not get setter when create from IUniTaskAsyncEnumerable source.");
|
|
||||||
}
|
|
||||||
|
|
||||||
if (setter != null)
|
|
||||||
{
|
|
||||||
throw new InvalidOperationException("GetSetter can only call once.");
|
|
||||||
}
|
|
||||||
|
|
||||||
setter = SetValue;
|
|
||||||
return setter;
|
|
||||||
}
|
|
||||||
|
|
||||||
void SetValue(T value)
|
|
||||||
{
|
|
||||||
this.latestValue = value;
|
|
||||||
triggerEvent.SetResult(value);
|
|
||||||
}
|
|
||||||
|
|
||||||
public IUniTaskAsyncEnumerable<T> WithoutCurrent()
|
public IUniTaskAsyncEnumerable<T> WithoutCurrent()
|
||||||
{
|
{
|
||||||
return new WithoutCurrentEnumerable(this);
|
return new WithoutCurrentEnumerable(this);
|
||||||
@@ -268,12 +240,7 @@ namespace Cysharp.Threading.Tasks
|
|||||||
triggerEvent.SetCompleted();
|
triggerEvent.SetCompleted();
|
||||||
}
|
}
|
||||||
|
|
||||||
public static implicit operator State<T>(T value)
|
public static implicit operator T(ReadOnlyAsyncReactiveProperty<T> value)
|
||||||
{
|
|
||||||
return new State<T>(value);
|
|
||||||
}
|
|
||||||
|
|
||||||
public static implicit operator T(State<T> value)
|
|
||||||
{
|
{
|
||||||
return value.Value;
|
return value.Value;
|
||||||
}
|
}
|
||||||
@@ -286,16 +253,16 @@ namespace Cysharp.Threading.Tasks
|
|||||||
|
|
||||||
static bool isValueType;
|
static bool isValueType;
|
||||||
|
|
||||||
static State()
|
static ReadOnlyAsyncReactiveProperty()
|
||||||
{
|
{
|
||||||
isValueType = typeof(T).IsValueType;
|
isValueType = typeof(T).IsValueType;
|
||||||
}
|
}
|
||||||
|
|
||||||
class WithoutCurrentEnumerable : IUniTaskAsyncEnumerable<T>
|
class WithoutCurrentEnumerable : IUniTaskAsyncEnumerable<T>
|
||||||
{
|
{
|
||||||
readonly State<T> parent;
|
readonly ReadOnlyAsyncReactiveProperty<T> parent;
|
||||||
|
|
||||||
public WithoutCurrentEnumerable(State<T> parent)
|
public WithoutCurrentEnumerable(ReadOnlyAsyncReactiveProperty<T> parent)
|
||||||
{
|
{
|
||||||
this.parent = parent;
|
this.parent = parent;
|
||||||
}
|
}
|
||||||
@@ -310,14 +277,14 @@ namespace Cysharp.Threading.Tasks
|
|||||||
{
|
{
|
||||||
static Action<object> cancellationCallback = CancellationCallback;
|
static Action<object> cancellationCallback = CancellationCallback;
|
||||||
|
|
||||||
readonly State<T> parent;
|
readonly ReadOnlyAsyncReactiveProperty<T> parent;
|
||||||
readonly CancellationToken cancellationToken;
|
readonly CancellationToken cancellationToken;
|
||||||
readonly CancellationTokenRegistration cancellationTokenRegistration;
|
readonly CancellationTokenRegistration cancellationTokenRegistration;
|
||||||
T value;
|
T value;
|
||||||
bool isDisposed;
|
bool isDisposed;
|
||||||
bool firstCall;
|
bool firstCall;
|
||||||
|
|
||||||
public Enumerator(State<T> parent, CancellationToken cancellationToken, bool publishCurrentValue)
|
public Enumerator(ReadOnlyAsyncReactiveProperty<T> parent, CancellationToken cancellationToken, bool publishCurrentValue)
|
||||||
{
|
{
|
||||||
this.parent = parent;
|
this.parent = parent;
|
||||||
this.cancellationToken = cancellationToken;
|
this.cancellationToken = cancellationToken;
|
||||||
@@ -391,14 +358,14 @@ namespace Cysharp.Threading.Tasks
|
|||||||
|
|
||||||
public static class StateExtensions
|
public static class StateExtensions
|
||||||
{
|
{
|
||||||
public static State<T> ToState<T>(this IUniTaskAsyncEnumerable<T> source, CancellationToken cancellationToken)
|
public static ReadOnlyAsyncReactiveProperty<T> ToReadOnlyAsyncReactiveProperty<T>(this IUniTaskAsyncEnumerable<T> source, CancellationToken cancellationToken)
|
||||||
{
|
{
|
||||||
return new State<T>(source, cancellationToken);
|
return new ReadOnlyAsyncReactiveProperty<T>(source, cancellationToken);
|
||||||
}
|
}
|
||||||
|
|
||||||
public static State<T> ToState<T>(this IUniTaskAsyncEnumerable<T> source, T initialValue, CancellationToken cancellationToken)
|
public static ReadOnlyAsyncReactiveProperty<T> ToReadOnlyAsyncReactiveProperty<T>(this IUniTaskAsyncEnumerable<T> source, T initialValue, CancellationToken cancellationToken)
|
||||||
{
|
{
|
||||||
return new State<T>(initialValue, source, cancellationToken);
|
return new ReadOnlyAsyncReactiveProperty<T>(initialValue, source, cancellationToken);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
11134
src/UniTask/Assets/Plugins/UniTask/Runtime/Linq/CombineLatest.cs
Normal file
11134
src/UniTask/Assets/Plugins/UniTask/Runtime/Linq/CombineLatest.cs
Normal file
File diff suppressed because it is too large
Load Diff
@@ -0,0 +1,11 @@
|
|||||||
|
fileFormatVersion: 2
|
||||||
|
guid: 6cb07f6e88287e34d9b9301a572284a5
|
||||||
|
MonoImporter:
|
||||||
|
externalObjects: {}
|
||||||
|
serializedVersion: 2
|
||||||
|
defaultReferences: []
|
||||||
|
executionOrder: 0
|
||||||
|
icon: {instanceID: 0}
|
||||||
|
userData:
|
||||||
|
assetBundleName:
|
||||||
|
assetBundleVariant:
|
||||||
219
src/UniTask/Assets/Plugins/UniTask/Runtime/Linq/CombineLatest.tt
Normal file
219
src/UniTask/Assets/Plugins/UniTask/Runtime/Linq/CombineLatest.tt
Normal file
@@ -0,0 +1,219 @@
|
|||||||
|
<#@ template debug="false" hostspecific="false" language="C#" #>
|
||||||
|
<#@ assembly name="System.Core" #>
|
||||||
|
<#@ import namespace="System.Linq" #>
|
||||||
|
<#@ import namespace="System.Text" #>
|
||||||
|
<#@ import namespace="System.Collections.Generic" #>
|
||||||
|
<#@ output extension=".cs" #>
|
||||||
|
<#
|
||||||
|
var tMax = 15;
|
||||||
|
Func<int, string> typeArgs = x => string.Join(", ", Enumerable.Range(1, x).Select(x => $"T{x}")) + ", TResult";
|
||||||
|
Func<int, string> paramArgs = x => string.Join(", ", Enumerable.Range(1, x).Select(x => $"IUniTaskAsyncEnumerable<T{x}> source{x}"));
|
||||||
|
Func<int, string> parameters = x => string.Join(", ", Enumerable.Range(1, x).Select(x => $"source{x}"));
|
||||||
|
|
||||||
|
|
||||||
|
#>
|
||||||
|
using Cysharp.Threading.Tasks.Internal;
|
||||||
|
using System;
|
||||||
|
using System.Threading;
|
||||||
|
|
||||||
|
namespace Cysharp.Threading.Tasks.Linq
|
||||||
|
{
|
||||||
|
public static partial class UniTaskAsyncEnumerable
|
||||||
|
{
|
||||||
|
<# for(var i = 2; i <= tMax; i++) { #>
|
||||||
|
public static IUniTaskAsyncEnumerable<TResult> CombineLatest<<#= typeArgs(i) #>>(this <#= paramArgs(i) #>, Func<<#= typeArgs(i) #>> resultSelector)
|
||||||
|
{
|
||||||
|
<# for(var j = 1; j <= i; j++) { #>
|
||||||
|
Error.ThrowArgumentNullException(source<#= j #>, nameof(source<#= j #>));
|
||||||
|
<# } #>
|
||||||
|
Error.ThrowArgumentNullException(resultSelector, nameof(resultSelector));
|
||||||
|
|
||||||
|
return new CombineLatest<<#= typeArgs(i) #>>(<#= parameters(i) #>, resultSelector);
|
||||||
|
}
|
||||||
|
|
||||||
|
<# } #>
|
||||||
|
}
|
||||||
|
|
||||||
|
<# for(var i = 2; i <= tMax; i++) { #>
|
||||||
|
internal class CombineLatest<<#= typeArgs(i) #>> : IUniTaskAsyncEnumerable<TResult>
|
||||||
|
{
|
||||||
|
<# for(var j = 1; j <= i; j++) { #>
|
||||||
|
readonly IUniTaskAsyncEnumerable<T<#= j #>> source<#= j #>;
|
||||||
|
<# } #>
|
||||||
|
readonly Func<<#= typeArgs(i) #>> resultSelector;
|
||||||
|
|
||||||
|
public CombineLatest(<#= paramArgs(i) #>, Func<<#= typeArgs(i) #>> resultSelector)
|
||||||
|
{
|
||||||
|
<# for(var j = 1; j <= i; j++) { #>
|
||||||
|
this.source<#= j #> = source<#= j #>;
|
||||||
|
<# } #>
|
||||||
|
this.resultSelector = resultSelector;
|
||||||
|
}
|
||||||
|
|
||||||
|
public IUniTaskAsyncEnumerator<TResult> GetAsyncEnumerator(CancellationToken cancellationToken = default)
|
||||||
|
{
|
||||||
|
return new _CombineLatest(<#= parameters(i) #>, resultSelector, cancellationToken);
|
||||||
|
}
|
||||||
|
|
||||||
|
class _CombineLatest : MoveNextSource, IUniTaskAsyncEnumerator<TResult>
|
||||||
|
{
|
||||||
|
<# for(var j = 1; j <= i; j++) { #>
|
||||||
|
static readonly Action<object> Completed<#= j #>Delegate = Completed<#= j #>;
|
||||||
|
<# } #>
|
||||||
|
const int CompleteCount = <#= i #>;
|
||||||
|
|
||||||
|
<# for(var j = 1; j <= i; j++) { #>
|
||||||
|
readonly IUniTaskAsyncEnumerable<T<#= j #>> source<#= j #>;
|
||||||
|
<# } #>
|
||||||
|
readonly Func<<#= typeArgs(i) #>> resultSelector;
|
||||||
|
CancellationToken cancellationToken;
|
||||||
|
|
||||||
|
<# for(var j = 1; j <= i; j++) { #>
|
||||||
|
IUniTaskAsyncEnumerator<T<#= j #>> enumerator<#= j #>;
|
||||||
|
UniTask<bool>.Awaiter awaiter<#= j #>;
|
||||||
|
bool hasCurrent<#= j #>;
|
||||||
|
bool running<#= j #>;
|
||||||
|
T<#= j #> current<#= j #>;
|
||||||
|
|
||||||
|
<# } #>
|
||||||
|
int completedCount;
|
||||||
|
bool syncRunning;
|
||||||
|
TResult result;
|
||||||
|
|
||||||
|
public _CombineLatest(<#= paramArgs(i) #>, Func<<#= typeArgs(i) #>> resultSelector, CancellationToken cancellationToken)
|
||||||
|
{
|
||||||
|
<# for(var j = 1; j <= i; j++) { #>
|
||||||
|
this.source<#= j #> = source<#= j #>;
|
||||||
|
<# } #>
|
||||||
|
this.resultSelector = resultSelector;
|
||||||
|
this.cancellationToken = cancellationToken;
|
||||||
|
TaskTracker.TrackActiveTask(this, 3);
|
||||||
|
}
|
||||||
|
|
||||||
|
public TResult Current => result;
|
||||||
|
|
||||||
|
public UniTask<bool> MoveNextAsync()
|
||||||
|
{
|
||||||
|
cancellationToken.ThrowIfCancellationRequested();
|
||||||
|
if (completedCount == CompleteCount) return CompletedTasks.False;
|
||||||
|
|
||||||
|
if (enumerator1 == null)
|
||||||
|
{
|
||||||
|
<# for(var j = 1; j <= i; j++) { #>
|
||||||
|
enumerator<#= j #> = source<#= j #>.GetAsyncEnumerator(cancellationToken);
|
||||||
|
<# } #>
|
||||||
|
}
|
||||||
|
|
||||||
|
completionSource.Reset();
|
||||||
|
|
||||||
|
AGAIN:
|
||||||
|
syncRunning = true;
|
||||||
|
<# for(var j = 1; j <= i; j++) { #>
|
||||||
|
if (!running<#= j #>)
|
||||||
|
{
|
||||||
|
running<#= j #> = true;
|
||||||
|
awaiter<#= j #> = enumerator<#= j #>.MoveNextAsync().GetAwaiter();
|
||||||
|
if (awaiter<#= j #>.IsCompleted)
|
||||||
|
{
|
||||||
|
Completed<#= j #>(this);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
awaiter<#= j #>.SourceOnCompleted(Completed<#= j #>Delegate, this);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
<# } #>
|
||||||
|
|
||||||
|
if (<#= string.Join(" || ", Enumerable.Range(1, i).Select(x => $"!running{x}")) #>)
|
||||||
|
{
|
||||||
|
goto AGAIN;
|
||||||
|
}
|
||||||
|
syncRunning = false;
|
||||||
|
|
||||||
|
return new UniTask<bool>(this, completionSource.Version);
|
||||||
|
}
|
||||||
|
|
||||||
|
<# for(var j = 1; j <= i; j++) { #>
|
||||||
|
static void Completed<#= j #>(object state)
|
||||||
|
{
|
||||||
|
var self = (_CombineLatest)state;
|
||||||
|
self.running<#= j #> = false;
|
||||||
|
|
||||||
|
try
|
||||||
|
{
|
||||||
|
if (self.awaiter<#= j #>.GetResult())
|
||||||
|
{
|
||||||
|
self.hasCurrent<#= j #> = true;
|
||||||
|
self.current<#= j #> = self.enumerator<#= j #>.Current;
|
||||||
|
goto SUCCESS;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
self.running<#= j #> = true; // as complete, no more call MoveNextAsync.
|
||||||
|
if (Interlocked.Increment(ref self.completedCount) == CompleteCount)
|
||||||
|
{
|
||||||
|
goto COMPLETE;
|
||||||
|
}
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
catch (Exception ex)
|
||||||
|
{
|
||||||
|
self.completedCount = CompleteCount;
|
||||||
|
self.completionSource.TrySetException(ex);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
SUCCESS:
|
||||||
|
if (!self.TrySetResult())
|
||||||
|
{
|
||||||
|
if (self.syncRunning) return;
|
||||||
|
try
|
||||||
|
{
|
||||||
|
self.awaiter<#= j #> = self.enumerator<#= j #>.MoveNextAsync().GetAwaiter();
|
||||||
|
}
|
||||||
|
catch (Exception ex)
|
||||||
|
{
|
||||||
|
self.completedCount = CompleteCount;
|
||||||
|
self.completionSource.TrySetException(ex);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
self.awaiter<#= j #>.SourceOnCompleted(Completed<#= j #>Delegate, self);
|
||||||
|
}
|
||||||
|
return;
|
||||||
|
COMPLETE:
|
||||||
|
self.completionSource.TrySetResult(false);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
<# } #>
|
||||||
|
bool TrySetResult()
|
||||||
|
{
|
||||||
|
if (<#= string.Join(" && ", Enumerable.Range(1, i).Select(x => $"hasCurrent{x}")) #>)
|
||||||
|
{
|
||||||
|
result = resultSelector(<#= string.Join(", ", Enumerable.Range(1, i).Select(x => $"current{x}")) #>);
|
||||||
|
completionSource.TrySetResult(true);
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public async UniTask DisposeAsync()
|
||||||
|
{
|
||||||
|
TaskTracker.RemoveTracking(this);
|
||||||
|
<# for(var j = 1; j <= i; j++) { #>
|
||||||
|
if (enumerator<#= j #> != null)
|
||||||
|
{
|
||||||
|
await enumerator<#= j #>.DisposeAsync();
|
||||||
|
}
|
||||||
|
<# } #>
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
<# } #>
|
||||||
|
}
|
||||||
@@ -0,0 +1,7 @@
|
|||||||
|
fileFormatVersion: 2
|
||||||
|
guid: b1b8cfa9d17af814a971ee2224aaaaa2
|
||||||
|
DefaultImporter:
|
||||||
|
externalObjects: {}
|
||||||
|
userData:
|
||||||
|
assetBundleName:
|
||||||
|
assetBundleVariant:
|
||||||
128
src/UniTask/Assets/Plugins/UniTask/Runtime/Linq/Pairwise.cs
Normal file
128
src/UniTask/Assets/Plugins/UniTask/Runtime/Linq/Pairwise.cs
Normal file
@@ -0,0 +1,128 @@
|
|||||||
|
using Cysharp.Threading.Tasks.Internal;
|
||||||
|
using System;
|
||||||
|
using System.Threading;
|
||||||
|
|
||||||
|
namespace Cysharp.Threading.Tasks.Linq
|
||||||
|
{
|
||||||
|
public static partial class UniTaskAsyncEnumerable
|
||||||
|
{
|
||||||
|
public static IUniTaskAsyncEnumerable<(TSource, TSource)> Pairwise<TSource>(this IUniTaskAsyncEnumerable<TSource> source)
|
||||||
|
{
|
||||||
|
Error.ThrowArgumentNullException(source, nameof(source));
|
||||||
|
|
||||||
|
return new Pairwise<TSource>(source);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
internal sealed class Pairwise<TSource> : IUniTaskAsyncEnumerable<(TSource, TSource)>
|
||||||
|
{
|
||||||
|
readonly IUniTaskAsyncEnumerable<TSource> source;
|
||||||
|
|
||||||
|
public Pairwise(IUniTaskAsyncEnumerable<TSource> source)
|
||||||
|
{
|
||||||
|
this.source = source;
|
||||||
|
}
|
||||||
|
|
||||||
|
public IUniTaskAsyncEnumerator<(TSource, TSource)> GetAsyncEnumerator(CancellationToken cancellationToken = default)
|
||||||
|
{
|
||||||
|
return new _Pairwise(source, cancellationToken);
|
||||||
|
}
|
||||||
|
|
||||||
|
sealed class _Pairwise : MoveNextSource, IUniTaskAsyncEnumerator<(TSource, TSource)>
|
||||||
|
{
|
||||||
|
static readonly Action<object> MoveNextCoreDelegate = MoveNextCore;
|
||||||
|
|
||||||
|
readonly IUniTaskAsyncEnumerable<TSource> source;
|
||||||
|
CancellationToken cancellationToken;
|
||||||
|
|
||||||
|
IUniTaskAsyncEnumerator<TSource> enumerator;
|
||||||
|
UniTask<bool>.Awaiter awaiter;
|
||||||
|
|
||||||
|
TSource prev;
|
||||||
|
bool isFirst;
|
||||||
|
|
||||||
|
public _Pairwise(IUniTaskAsyncEnumerable<TSource> source, CancellationToken cancellationToken)
|
||||||
|
{
|
||||||
|
this.source = source;
|
||||||
|
this.cancellationToken = cancellationToken;
|
||||||
|
TaskTracker.TrackActiveTask(this, 3);
|
||||||
|
}
|
||||||
|
|
||||||
|
public (TSource, TSource) Current { get; private set; }
|
||||||
|
|
||||||
|
public UniTask<bool> MoveNextAsync()
|
||||||
|
{
|
||||||
|
cancellationToken.ThrowIfCancellationRequested();
|
||||||
|
|
||||||
|
if (enumerator == null)
|
||||||
|
{
|
||||||
|
isFirst = true;
|
||||||
|
enumerator = source.GetAsyncEnumerator(cancellationToken);
|
||||||
|
}
|
||||||
|
|
||||||
|
completionSource.Reset();
|
||||||
|
SourceMoveNext();
|
||||||
|
return new UniTask<bool>(this, completionSource.Version);
|
||||||
|
}
|
||||||
|
|
||||||
|
void SourceMoveNext()
|
||||||
|
{
|
||||||
|
try
|
||||||
|
{
|
||||||
|
awaiter = enumerator.MoveNextAsync().GetAwaiter();
|
||||||
|
if (awaiter.IsCompleted)
|
||||||
|
{
|
||||||
|
MoveNextCore(this);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
awaiter.SourceOnCompleted(MoveNextCoreDelegate, this);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
catch (Exception ex)
|
||||||
|
{
|
||||||
|
completionSource.TrySetException(ex);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static void MoveNextCore(object state)
|
||||||
|
{
|
||||||
|
var self = (_Pairwise)state;
|
||||||
|
|
||||||
|
if (self.TryGetResult(self.awaiter, out var result))
|
||||||
|
{
|
||||||
|
if (result)
|
||||||
|
{
|
||||||
|
if (self.isFirst)
|
||||||
|
{
|
||||||
|
self.isFirst = false;
|
||||||
|
self.prev = self.enumerator.Current;
|
||||||
|
self.SourceMoveNext(); // run again. okay to use recursive(only one more).
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
var p = self.prev;
|
||||||
|
self.prev = self.enumerator.Current;
|
||||||
|
self.Current = (p, self.prev);
|
||||||
|
self.completionSource.TrySetResult(true);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
self.completionSource.TrySetResult(false);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public UniTask DisposeAsync()
|
||||||
|
{
|
||||||
|
TaskTracker.RemoveTracking(this);
|
||||||
|
if (enumerator != null)
|
||||||
|
{
|
||||||
|
return enumerator.DisposeAsync();
|
||||||
|
}
|
||||||
|
return default;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,11 @@
|
|||||||
|
fileFormatVersion: 2
|
||||||
|
guid: cddbf051d2a88f549986c468b23214af
|
||||||
|
MonoImporter:
|
||||||
|
externalObjects: {}
|
||||||
|
serializedVersion: 2
|
||||||
|
defaultReferences: []
|
||||||
|
executionOrder: 0
|
||||||
|
icon: {instanceID: 0}
|
||||||
|
userData:
|
||||||
|
assetBundleName:
|
||||||
|
assetBundleVariant:
|
||||||
@@ -1,7 +1,7 @@
|
|||||||
{
|
{
|
||||||
"name": "com.cysharp.unitask",
|
"name": "com.cysharp.unitask",
|
||||||
"displayName": "UniTask",
|
"displayName": "UniTask",
|
||||||
"version": "2.0.8-rc5",
|
"version": "2.0.9-rc6",
|
||||||
"unity": "2018.3",
|
"unity": "2018.3",
|
||||||
"description": "Provides an efficient async/await integration to Unity.",
|
"description": "Provides an efficient async/await integration to Unity.",
|
||||||
"keywords": [ "async/await", "async", "Task", "UniTask" ],
|
"keywords": [ "async/await", "async", "Task", "UniTask" ],
|
||||||
|
|||||||
@@ -142,8 +142,6 @@ public class SandboxMain : MonoBehaviour
|
|||||||
{
|
{
|
||||||
// State<int> Hp { get; }
|
// State<int> Hp { get; }
|
||||||
|
|
||||||
AsyncReactiveProperty<int> hp;
|
|
||||||
IReadOnlyAsyncReactiveProperty<int> Hp => hp;
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
@@ -172,23 +170,10 @@ public class SandboxMain : MonoBehaviour
|
|||||||
public Text text;
|
public Text text;
|
||||||
public Button button;
|
public Button button;
|
||||||
|
|
||||||
[SerializeField]
|
|
||||||
State<int> count;
|
|
||||||
|
|
||||||
void Start2()
|
void Start2()
|
||||||
{
|
{
|
||||||
count = 10;
|
|
||||||
|
|
||||||
var countS = count.GetSetter();
|
|
||||||
|
|
||||||
count.BindTo(text);
|
|
||||||
button.OnClickAsAsyncEnumerable().ForEachAsync(_ =>
|
|
||||||
{
|
|
||||||
// int foo = countS;
|
|
||||||
//countS.Set(countS += 10);
|
|
||||||
|
|
||||||
// setter.SetValue(count.Value + 10);
|
|
||||||
});
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@@ -276,6 +261,8 @@ public class SandboxMain : MonoBehaviour
|
|||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
okButton.OnClickAsAsyncEnumerable().ForEachAsync(_ =>
|
okButton.OnClickAsAsyncEnumerable().ForEachAsync(_ =>
|
||||||
{
|
{
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user