Compare commits

...

4 Commits

Author SHA1 Message Date
neuecc
36d53a3bcb rc4 2020-05-20 11:05:58 +09:00
neuecc
ea9e61c2e1 Add CancellationToken.WaitUntilCanceled 2020-05-20 11:04:59 +09:00
neuecc
a52c26102b guard for ForEachAsync 2020-05-20 10:48:28 +09:00
neuecc
e31c87b8a8 Add IUniTaskAsyncEnumerable.Publish 2020-05-19 15:58:04 +09:00
13 changed files with 493 additions and 51 deletions

View File

@@ -378,15 +378,7 @@ ECS, PlayerLoop
TODO:
```csharp
// Setup Entities Loop.
var loop = PlayerLoop.GetDefaultPlayerLoop();
foreach (var world in Unity.Entities.World.All)
{
ScriptBehaviourUpdateOrder.UpdatePlayerLoop(world, loop);
loop = PlayerLoop.GetCurrentPlayerLoop();
}
// UniTask PlayerLoop Initialize.
var loop = PlayerLoop.GetCurrentPlayerLoop();
PlayerLoopHelper.Initialize(ref loop);
```

View File

@@ -0,0 +1,51 @@
using Cysharp.Threading.Tasks;
using FluentAssertions;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Channels;
using Cysharp.Threading.Tasks.Linq;
using System.Threading.Tasks;
using Xunit;
namespace NetCoreTests
{
public class CancellationTokenTest
{
[Fact]
public async Task WaitUntilCanceled()
{
var cts = new CancellationTokenSource();
cts.CancelAfter(TimeSpan.FromSeconds(1.5));
var now = DateTime.UtcNow;
await cts.Token.WaitUntilCanceled();
var elapsed = DateTime.UtcNow - now;
elapsed.Should().BeGreaterThan(TimeSpan.FromSeconds(1));
}
[Fact]
public void AlreadyCanceled()
{
var cts = new CancellationTokenSource();
cts.Cancel();
cts.Token.WaitUntilCanceled().GetAwaiter().IsCompleted.Should().BeTrue();
}
[Fact]
public void None()
{
CancellationToken.None.WaitUntilCanceled().GetAwaiter().IsCompleted.Should().BeTrue();
}
}
}

View File

@@ -0,0 +1,78 @@
using Cysharp.Threading.Tasks;
using Cysharp.Threading.Tasks.Linq;
using FluentAssertions;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Xunit;
namespace NetCoreTests.Linq
{
public class PublishTest
{
[Fact]
public async Task Normal()
{
var rp = new AsyncReactiveProperty<int>(1);
var multicast = rp.Publish();
var a = multicast.ToArrayAsync();
var b = multicast.Take(2).ToArrayAsync();
var disp = multicast.Connect();
rp.Value = 2;
(await b).Should().BeEquivalentTo(1, 2);
var c = multicast.ToArrayAsync();
rp.Value = 3;
rp.Value = 4;
rp.Value = 5;
rp.Dispose();
(await a).Should().BeEquivalentTo(1, 2, 3, 4, 5);
(await c).Should().BeEquivalentTo(3, 4, 5);
disp.Dispose();
}
[Fact]
public async Task Cancel()
{
var rp = new AsyncReactiveProperty<int>(1);
var multicast = rp.Publish();
var a = multicast.ToArrayAsync();
var b = multicast.Take(2).ToArrayAsync();
var disp = multicast.Connect();
rp.Value = 2;
(await b).Should().BeEquivalentTo(1, 2);
var c = multicast.ToArrayAsync();
rp.Value = 3;
disp.Dispose();
rp.Value = 4;
rp.Value = 5;
rp.Dispose();
await Assert.ThrowsAsync<OperationCanceledException>(async () => await a);
await Assert.ThrowsAsync<OperationCanceledException>(async () => await c);
}
}
}

View File

@@ -144,6 +144,11 @@ namespace Cysharp.Threading.Tasks
completionSource.TrySetResult(false);
}
public void OnError(Exception ex)
{
completionSource.TrySetException(ex);
}
static void CancellationCallback(object state)
{
var self = (Enumerator)state;

View File

@@ -1,6 +1,7 @@
#pragma warning disable CS1591 // Missing XML comment for publicly visible type or member
using System;
using System.Runtime.CompilerServices;
using System.Threading;
namespace Cysharp.Threading.Tasks
@@ -9,15 +10,15 @@ namespace Cysharp.Threading.Tasks
{
static readonly Action<object> cancellationTokenCallback = Callback;
public static (UniTask, CancellationTokenRegistration) ToUniTask(this CancellationToken cts)
public static (UniTask, CancellationTokenRegistration) ToUniTask(this CancellationToken cancellationToken)
{
if (cts.IsCancellationRequested)
if (cancellationToken.IsCancellationRequested)
{
return (UniTask.FromCanceled(cts), default(CancellationTokenRegistration));
return (UniTask.FromCanceled(cancellationToken), default(CancellationTokenRegistration));
}
var promise = new UniTaskCompletionSource();
return (promise.Task, cts.RegisterWithoutCaptureExecutionContext(cancellationTokenCallback, promise));
return (promise.Task, cancellationToken.RegisterWithoutCaptureExecutionContext(cancellationTokenCallback, promise));
}
static void Callback(object state)
@@ -26,6 +27,11 @@ namespace Cysharp.Threading.Tasks
promise.TrySetResult();
}
public static CancellationTokenAwaitable WaitUntilCanceled(this CancellationToken cancellationToken)
{
return new CancellationTokenAwaitable(cancellationToken);
}
public static CancellationTokenRegistration RegisterWithoutCaptureExecutionContext(this CancellationToken cancellationToken, Action callback)
{
var restoreFlow = false;
@@ -70,5 +76,46 @@ namespace Cysharp.Threading.Tasks
}
}
}
public struct CancellationTokenAwaitable
{
CancellationToken cancellationToken;
public CancellationTokenAwaitable(CancellationToken cancellationToken)
{
this.cancellationToken = cancellationToken;
}
public Awaiter GetAwaiter()
{
return new Awaiter(cancellationToken);
}
public struct Awaiter : ICriticalNotifyCompletion
{
CancellationToken cancellationToken;
public Awaiter(CancellationToken cancellationToken)
{
this.cancellationToken = cancellationToken;
}
public bool IsCompleted => !cancellationToken.CanBeCanceled || cancellationToken.IsCancellationRequested;
public void GetResult()
{
}
public void OnCompleted(Action continuation)
{
UnsafeOnCompleted(continuation);
}
public void UnsafeOnCompleted(Action continuation)
{
cancellationToken.RegisterWithoutCaptureExecutionContext(continuation);
}
}
}
}

View File

@@ -28,6 +28,12 @@ namespace Cysharp.Threading.Tasks
IUniTaskOrderedAsyncEnumerable<TElement> CreateOrderedEnumerable<TKey>(Func<TElement, CancellationToken, UniTask<TKey>> keySelector, IComparer<TKey> comparer, bool descending);
}
public interface IConnectableUniTaskAsyncEnumerable<out T> : IUniTaskAsyncEnumerable<T>
{
IDisposable Connect();
}
// don't use AsyncGrouping.
//public interface IUniTaskAsyncGrouping<out TKey, out TElement> : IUniTaskAsyncEnumerable<TElement>
//{
// TKey Key { get; }

View File

@@ -22,6 +22,22 @@ namespace Cysharp.Threading.Tasks.Linq
return Cysharp.Threading.Tasks.Linq.ForEach.ForEachAsync(source, action, cancellationToken);
}
/// <summary>Obsolete(Error), Use Use ForEachAwaitAsync instead.</summary>
[Obsolete("Use ForEachAwaitAsync instead.", true)]
[System.ComponentModel.EditorBrowsable(System.ComponentModel.EditorBrowsableState.Never)]
public static UniTask ForEachAsync<T>(this IUniTaskAsyncEnumerable<T> source, Func<T, UniTask> action, CancellationToken cancellationToken = default)
{
throw new NotSupportedException("Use ForEachAwaitAsync instead.");
}
/// <summary>Obsolete(Error), Use Use ForEachAwaitAsync instead.</summary>
[Obsolete("Use ForEachAwaitAsync instead.", true)]
[System.ComponentModel.EditorBrowsable(System.ComponentModel.EditorBrowsableState.Never)]
public static UniTask ForEachAsync<T>(this IUniTaskAsyncEnumerable<T> source, Func<T, int, UniTask> action, CancellationToken cancellationToken = default)
{
throw new NotSupportedException("Use ForEachAwaitAsync instead.");
}
public static UniTask ForEachAwaitAsync<TSource>(this IUniTaskAsyncEnumerable<TSource> source, Func<TSource, UniTask> action, CancellationToken cancellationToken = default)
{
Error.ThrowArgumentNullException(source, nameof(source));

View File

@@ -0,0 +1,171 @@
using Cysharp.Threading.Tasks.Internal;
using System;
using System.Threading;
namespace Cysharp.Threading.Tasks.Linq
{
public static partial class UniTaskAsyncEnumerable
{
public static IConnectableUniTaskAsyncEnumerable<TSource> Publish<TSource>(this IUniTaskAsyncEnumerable<TSource> source)
{
Error.ThrowArgumentNullException(source, nameof(source));
return new Publish<TSource>(source);
}
}
internal sealed class Publish<TSource> : IConnectableUniTaskAsyncEnumerable<TSource>
{
readonly IUniTaskAsyncEnumerable<TSource> source;
readonly CancellationTokenSource cancellationTokenSource;
TriggerEvent<TSource> trigger;
IUniTaskAsyncEnumerator<TSource> enumerator;
IDisposable connectedDisposable;
bool isCompleted;
public Publish(IUniTaskAsyncEnumerable<TSource> source)
{
this.source = source;
this.cancellationTokenSource = new CancellationTokenSource();
}
public IDisposable Connect()
{
if (connectedDisposable != null) return connectedDisposable;
if (enumerator == null)
{
enumerator = source.GetAsyncEnumerator(cancellationTokenSource.Token);
}
ConsumeEnumerator().Forget();
connectedDisposable = new ConnectDisposable(cancellationTokenSource);
return connectedDisposable;
}
async UniTaskVoid ConsumeEnumerator()
{
try
{
try
{
while (await enumerator.MoveNextAsync())
{
trigger.SetResult(enumerator.Current);
}
trigger.SetCompleted();
}
catch (Exception ex)
{
trigger.SetError(ex);
}
}
finally
{
isCompleted = true;
await enumerator.DisposeAsync();
}
}
public IUniTaskAsyncEnumerator<TSource> GetAsyncEnumerator(CancellationToken cancellationToken = default)
{
return new _Publish(this, cancellationToken);
}
sealed class ConnectDisposable : IDisposable
{
readonly CancellationTokenSource cancellationTokenSource;
public ConnectDisposable(CancellationTokenSource cancellationTokenSource)
{
this.cancellationTokenSource = cancellationTokenSource;
}
public void Dispose()
{
this.cancellationTokenSource.Cancel();
}
}
sealed class _Publish : MoveNextSource, IUniTaskAsyncEnumerator<TSource>, ITriggerHandler<TSource>
{
static readonly Action<object> CancelDelegate = OnCanceled;
readonly Publish<TSource> parent;
CancellationToken cancellationToken;
CancellationTokenRegistration cancellationTokenRegistration;
bool isDisposed;
public _Publish(Publish<TSource> parent, CancellationToken cancellationToken)
{
if (cancellationToken.IsCancellationRequested) return;
this.parent = parent;
this.cancellationToken = cancellationToken;
if (cancellationToken.CanBeCanceled)
{
this.cancellationTokenRegistration = cancellationToken.RegisterWithoutCaptureExecutionContext(CancelDelegate, this);
}
parent.trigger.Add(this);
TaskTracker.TrackActiveTask(this, 3);
}
public TSource Current { get; private set; }
public UniTask<bool> MoveNextAsync()
{
cancellationToken.ThrowIfCancellationRequested();
if (parent.isCompleted) return CompletedTasks.False;
completionSource.Reset();
return new UniTask<bool>(this, completionSource.Version);
}
static void OnCanceled(object state)
{
var self = (_Publish)state;
self.completionSource.TrySetCanceled(self.cancellationToken);
self.DisposeAsync().Forget();
}
public UniTask DisposeAsync()
{
if (!isDisposed)
{
isDisposed = true;
TaskTracker.RemoveTracking(this);
cancellationTokenRegistration.Dispose();
parent.trigger.Remove(this);
}
return default;
}
public void OnNext(TSource value)
{
Current = value;
completionSource.TrySetResult(true);
}
public void OnCanceled(CancellationToken cancellationToken)
{
completionSource.TrySetCanceled(cancellationToken);
}
public void OnCompleted()
{
completionSource.TrySetResult(false);
}
public void OnError(Exception ex)
{
completionSource.TrySetException(ex);
}
}
}
}

View File

@@ -0,0 +1,11 @@
fileFormatVersion: 2
guid: 93c684d1e88c09d4e89b79437d97b810
MonoImporter:
externalObjects: {}
serializedVersion: 2
defaultReferences: []
executionOrder: 0
icon: {instanceID: 0}
userData:
assetBundleName:
assetBundleVariant:

View File

@@ -7,8 +7,9 @@ namespace Cysharp.Threading.Tasks
public interface ITriggerHandler<T>
{
void OnNext(T value);
void OnCanceled(CancellationToken cancellationToken);
void OnError(Exception ex);
void OnCompleted();
void OnCanceled(CancellationToken cancellationToken);
}
// be careful to use, itself is struct.
@@ -207,6 +208,67 @@ namespace Cysharp.Threading.Tasks
}
}
public void SetError(Exception exception)
{
isRunning = true;
if (singleHandler != null)
{
try
{
singleHandler.OnError(exception);
}
catch (Exception ex)
{
#if UNITY_2018_3_OR_NEWER
UnityEngine.Debug.LogException(ex);
#else
Console.WriteLine(ex);
#endif
}
}
if (handlers != null)
{
for (int i = 0; i < handlers.Length; i++)
{
if (handlers[i] != null)
{
try
{
handlers[i].OnError(exception);
}
catch (Exception ex)
{
handlers[i] = null;
#if UNITY_2018_3_OR_NEWER
UnityEngine.Debug.LogException(ex);
#else
Console.WriteLine(ex);
#endif
}
}
}
}
isRunning = false;
if (waitHandler != null)
{
var h = waitHandler;
waitHandler = null;
Add(h);
}
if (waitQueue != null)
{
while (waitQueue.Count != 0)
{
Add(waitQueue.Dequeue());
}
}
}
public void Add(ITriggerHandler<T> handler)
{
if (isRunning)

View File

@@ -89,6 +89,11 @@ namespace Cysharp.Threading.Tasks.Triggers
completionSource.TrySetResult(false);
}
public void OnError(Exception ex)
{
completionSource.TrySetException(ex);
}
static void CancellationCallback(object state)
{
var self = (AsyncTriggerEnumerator)state;
@@ -273,6 +278,11 @@ namespace Cysharp.Threading.Tasks.Triggers
core.TrySetCanceled(CancellationToken.None);
}
void ITriggerHandler<T>.OnError(Exception ex)
{
core.TrySetException(ex);
}
void IUniTaskSource.GetResult(short token)
{
((IUniTaskSource<T>)this).GetResult(token);

View File

@@ -1,7 +1,7 @@
{
"name": "com.cysharp.unitask",
"displayName": "UniTask",
"version": "2.0.6-rc3",
"version": "2.0.7-rc4",
"unity": "2018.3",
"description": "Provides an efficient async/await integration to Unity.",
"keywords": ["async/await", "async", "Task", "UniTask"],

View File

@@ -38,6 +38,11 @@ public enum MyEnum
public static partial class UnityUIComponentExtensions
{
@@ -49,23 +54,15 @@ public static partial class UnityUIComponentExtensions
public class AsyncMessageBroker<T> : IDisposable
{
Channel<T> channel;
List<Func<T, UniTask>> asyncEvents;
IConnectableUniTaskAsyncEnumerable<T> multicastSource;
IDisposable connection;
public AsyncMessageBroker()
{
channel = Channel.CreateSingleConsumerUnbounded<T>();
asyncEvents = new List<Func<T, UniTask>>();
}
async UniTaskVoid PublishAll()
{
await channel.Reader.ReadAllAsync().ForEachAwaitAsync(async x =>
{
foreach (var item in asyncEvents)
{
await item.Invoke(x);
}
});
multicastSource = channel.Reader.ReadAllAsync().Publish();
connection = multicastSource.Connect();
}
public void Publish(T value)
@@ -73,33 +70,15 @@ public class AsyncMessageBroker<T> : IDisposable
channel.Writer.TryWrite(value);
}
public Subscription Subscribe(Func<T, UniTask> func)
public IUniTaskAsyncEnumerable<T> Subscribe()
{
asyncEvents.Add(func);
return new Subscription(this, func);
return multicastSource;
}
public void Dispose()
{
channel.Writer.TryComplete();
asyncEvents.Clear();
}
public readonly struct Subscription : IDisposable
{
readonly AsyncMessageBroker<T> broker;
readonly Func<T, UniTask> func;
public Subscription(AsyncMessageBroker<T> broker, Func<T, UniTask> func)
{
this.broker = broker;
this.func = func;
}
public void Dispose()
{
broker.asyncEvents.Remove(func);
}
connection.Dispose();
}
}
@@ -149,6 +128,7 @@ public class SandboxMain : MonoBehaviour
UnityEngine.Debug.Log("OK");
await scheduled; // .ConfigureAwait(PlayerLoopTiming.Update); // .WaitAsync(PlayerLoopTiming.Update);
@@ -205,9 +185,22 @@ public class SandboxMain : MonoBehaviour
//await channel.Reader.ReadAllAsync(this.GetCancellationTokenOnDestroy()).ForEachAsync(_ => { });
var rp = new AsyncReactiveProperty<int>(10);
var pubsub = new AsyncMessageBroker<int>();
pubsub.Subscribe().ForEachAsync(x => Debug.Log("A:" + x)).Forget();
pubsub.Subscribe().ForEachAsync(x => Debug.Log("B:" + x)).Forget();
int i = 0;
okButton.OnClickAsAsyncEnumerable().ForEachAsync(_ =>
{
Debug.Log("foo");
pubsub.Publish(i++);
}).Forget();
rp.Append(10).Select(x => x * 100).Take(30).Prepend(99).SkipLast(9).Where(x => x % 2 == 0).ForEachAsync(_ => { }).Forget();
}