WhenAll and WhenAny

This commit is contained in:
neuecc
2020-04-21 13:36:23 +09:00
parent 082f3e7335
commit 3654a9e2f9
16 changed files with 11143 additions and 3797 deletions

View File

@@ -47,9 +47,8 @@ namespace UniRx.Async
TResult result;
object error; // ExceptionDispatchInfo or OperationCanceledException
short version;
bool completed;
bool hasUnhandledError;
int completedCount; // 0: completed == false
Action<object> continuation;
object continuationState;
@@ -61,7 +60,7 @@ namespace UniRx.Async
{
version += 1; // incr version.
}
completed = false;
completedCount = 0;
result = default;
error = null;
hasUnhandledError = false;
@@ -92,25 +91,59 @@ namespace UniRx.Async
/// <summary>Completes with a successful result.</summary>
/// <param name="result">The result.</param>
public void SetResult(TResult result)
public bool TrySetResult(TResult result)
{
this.result = result;
SignalCompletion();
if (Interlocked.Increment(ref completedCount) == 1)
{
// setup result
this.result = result;
if (continuation != null || Interlocked.CompareExchange(ref this.continuation, UniTaskCompletionSourceCoreShared.s_sentinel, null) != null)
{
continuation(continuationState);
return true;
}
}
return false;
}
/// <summary>Completes with an error.</summary>
/// <param name="error">The exception.</param>
public void SetException(Exception error)
public bool TrySetException(Exception error)
{
this.hasUnhandledError = true;
this.error = ExceptionDispatchInfo.Capture(error);
SignalCompletion();
if (Interlocked.Increment(ref completedCount) == 1)
{
// setup result
this.hasUnhandledError = true;
this.error = ExceptionDispatchInfo.Capture(error);
if (continuation != null || Interlocked.CompareExchange(ref this.continuation, UniTaskCompletionSourceCoreShared.s_sentinel, null) != null)
{
continuation(continuationState);
return true;
}
}
return false;
}
public void SetCanceled(CancellationToken cancellationToken = default)
public bool TrySetCanceled(CancellationToken cancellationToken = default)
{
this.error = new OperationCanceledException(cancellationToken);
SignalCompletion();
if (Interlocked.Increment(ref completedCount) == 1)
{
// setup result
this.hasUnhandledError = true;
this.error = new OperationCanceledException(cancellationToken);
if (continuation != null || Interlocked.CompareExchange(ref this.continuation, UniTaskCompletionSourceCoreShared.s_sentinel, null) != null)
{
continuation(continuationState);
return true;
}
}
return false;
}
/// <summary>Gets the operation version.</summary>
@@ -122,7 +155,7 @@ namespace UniRx.Async
public UniTaskStatus GetStatus(short token)
{
ValidateToken(token);
return (continuation == null || !completed) ? UniTaskStatus.Pending
return (continuation == null || (completedCount == 0)) ? UniTaskStatus.Pending
: (error == null) ? UniTaskStatus.Succeeded
: (error is OperationCanceledException) ? UniTaskStatus.Canceled
: UniTaskStatus.Faulted;
@@ -132,7 +165,7 @@ namespace UniRx.Async
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public UniTaskStatus UnsafeGetStatus()
{
return (continuation == null || !completed) ? UniTaskStatus.Pending
return (continuation == null || (completedCount == 0)) ? UniTaskStatus.Pending
: (error == null) ? UniTaskStatus.Succeeded
: (error is OperationCanceledException) ? UniTaskStatus.Canceled
: UniTaskStatus.Faulted;
@@ -145,7 +178,7 @@ namespace UniRx.Async
public TResult GetResult(short token)
{
ValidateToken(token);
if (!completed)
if (!(completedCount == 0))
{
throw new InvalidOperationException("not yet completed.");
}
@@ -183,6 +216,15 @@ namespace UniRx.Async
/* no use ValueTaskSourceOnCOmpletedFlags, always no capture ExecutionContext and SynchronizationContext. */
/*
PatternA: GetStatus=Pending => OnCompleted => TrySet*** => GetResult
PatternB: TrySet*** => GetStatus=!Pending => GetResult
PatternC: GetStatus=Pending => TrySet/OnCompleted(race condition) => GetResult
C.1: win OnCompleted -> TrySet invoke saved continuation
C.2: win TrySet -> should invoke continuation here.
*/
// not set continuation yet.
object oldContinuation = this.continuation;
if (oldContinuation == null)
{
@@ -192,10 +234,11 @@ namespace UniRx.Async
if (oldContinuation != null)
{
// Operation already completed, so we need to queue the supplied callback.
// already running continuation in TrySet.
// It will cause call OnCompleted multiple time, invalid.
if (!ReferenceEquals(oldContinuation, UniTaskCompletionSourceCoreShared.s_sentinel))
{
throw new InvalidOperationException("already completed.");
throw new InvalidOperationException();
}
continuation(state);
@@ -210,23 +253,6 @@ namespace UniRx.Async
throw new InvalidOperationException("token version is not matched.");
}
}
/// <summary>Signals that the operation has completed. Invoked after the result or error has been set.</summary>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
private void SignalCompletion()
{
if (completed)
{
throw new InvalidOperationException();
}
completed = true;
if (continuation != null || Interlocked.CompareExchange(ref this.continuation, UniTaskCompletionSourceCoreShared.s_sentinel, null) != null)
{
continuation(continuationState);
}
}
}
internal static class UniTaskCompletionSourceCoreShared // separated out of generic to avoid unnecessary duplication
@@ -277,17 +303,17 @@ namespace UniRx.Async
public void SetResult()
{
core.SetResult(AsyncUnit.Default);
core.TrySetResult(AsyncUnit.Default);
}
public void SetCanceled(CancellationToken cancellationToken = default)
{
core.SetCanceled(cancellationToken);
core.TrySetCanceled(cancellationToken);
}
public void SetException(Exception exception)
{
core.SetException(exception);
core.TrySetException(exception);
}
public void GetResult(short token)
@@ -369,17 +395,17 @@ namespace UniRx.Async
public void SetResult()
{
core.SetResult(AsyncUnit.Default);
core.TrySetResult(AsyncUnit.Default);
}
public void SetCanceled(CancellationToken cancellationToken = default)
{
core.SetCanceled(cancellationToken);
core.TrySetCanceled(cancellationToken);
}
public void SetException(Exception exception)
{
core.SetException(exception);
core.TrySetException(exception);
}
public void GetResult(short token)
@@ -463,17 +489,17 @@ namespace UniRx.Async
public void SetResult(T result)
{
core.SetResult(result);
core.TrySetResult(result);
}
public void SetCanceled(CancellationToken cancellationToken = default)
{
core.SetCanceled(cancellationToken);
core.TrySetCanceled(cancellationToken);
}
public void SetException(Exception exception)
{
core.SetException(exception);
core.TrySetException(exception);
}
public T GetResult(short token)
@@ -560,17 +586,17 @@ namespace UniRx.Async
public void SetResult(T result)
{
core.SetResult(result);
core.TrySetResult(result);
}
public void SetCanceled(CancellationToken cancellationToken = default)
{
core.SetCanceled(cancellationToken);
core.TrySetCanceled(cancellationToken);
}
public void SetException(Exception exception)
{
core.SetException(exception);
core.TrySetException(exception);
}
public T GetResult(short token)
@@ -616,7 +642,6 @@ namespace UniRx.Async
if (pool.TryReturn(this))
{
GC.ReRegisterForFinalize(this);
return;
}
}
}