Compare commits

..

36 Commits

Author SHA1 Message Date
neuecc
21dc83c641 2.0.8-rc5 2020-05-21 02:26:55 +09:00
neuecc
3b593f349c Add UnityEvent<T> and InputField AsyncEventHandler and extensions 2020-05-21 02:26:09 +09:00
neuecc
962c215e3b Fix UniTask.WaitUntilValueChanged does not handle UnityEngine.Object is destroyed correctly 2020-05-21 02:24:12 +09:00
neuecc
42dcfdbcdc Add UniTaskAsyncEnumerable.EveryValueChanged 2020-05-21 02:23:40 +09:00
neuecc
6d7e6ec871 Impl AsyncReactiveProperty.ToString, Add State 2020-05-21 02:22:24 +09:00
neuecc
36d53a3bcb rc4 2020-05-20 11:05:58 +09:00
neuecc
ea9e61c2e1 Add CancellationToken.WaitUntilCanceled 2020-05-20 11:04:59 +09:00
neuecc
a52c26102b guard for ForEachAsync 2020-05-20 10:48:28 +09:00
neuecc
e31c87b8a8 Add IUniTaskAsyncEnumerable.Publish 2020-05-19 15:58:04 +09:00
neuecc
cc165a6897 2.0.6-rc3 2020-05-19 04:14:23 +09:00
neuecc
f99910d802 Add TaskTracker to AsyncLINQ 2020-05-19 04:13:46 +09:00
neuecc
997b0b3710 Merge remote-tracking branch 'origin/unitask2' into unitask2 2020-05-19 03:43:09 +09:00
neuecc
ec7064083a Add TaskTracker to Channel 2020-05-19 03:43:06 +09:00
neuecc
07cccfddd6 docs: update TOC 2020-05-18 18:34:05 +00:00
neuecc
f07527cd06 ECS? 2020-05-19 03:33:44 +09:00
neuecc
7b273c4bd1 Add UniTask.Defer 2020-05-19 03:10:37 +09:00
neuecc
d36e7987b3 Add SkipUntilCanceled, TakeUntilCanceled 2020-05-19 02:41:45 +09:00
neuecc
bbd5686816 Add UniTask.WaitUntilCanceled 2020-05-19 01:35:16 +09:00
neuecc
fb1152d8f4 IAsyncReadOnlyReactiveProperty -> IReadOnlyAsyncReactiveProperty, .Dipose retrurns MoveNext -> false 2020-05-19 01:20:20 +09:00
neuecc
7a306118f5 AsyncTrigger returns MoveNext -> false when destroyed 2020-05-19 01:19:46 +09:00
neuecc
efaf3ee8f5 IAsyncReadOnlyReactiveProperty.WithoutCurrent 2020-05-18 23:36:26 +09:00
neuecc
2e4fe90956 Fix ChannelReader.Completion throws UnobservedException when not touched 2020-05-18 23:33:13 +09:00
neuecc
e33d572104 2.0.5-rc2 2020-05-18 11:33:24 +09:00
neuecc
2b2af9e455 meta 2020-05-18 11:31:23 +09:00
neuecc
d003597662 Changed AsyncReactiveProperty produce current value at first, Add AsyncReactiveProperty.WithoutCurrent 2020-05-18 11:30:49 +09:00
neuecc
ec0a8f5a8b Add IUniTaskAsyncEnumerable.Queue 2020-05-18 11:30:04 +09:00
neuecc
49ba57f20a Fix IUniTaskAsyncEnumerable.Take 2020-05-18 11:29:35 +09:00
neuecc
6f4d1183cc Add BindTo<TSource, TObject>(Action<TObject, TSource> bindAction) 2020-05-18 02:35:13 +09:00
neuecc
dd18c9fff8 Add Channel.CreateSingleConsumerUnbounded 2020-05-18 02:34:29 +09:00
neuecc
21f5f78ff1 Merge remote-tracking branch 'origin/unitask2' into unitask2 2020-05-17 16:51:20 +09:00
neuecc
1729f389db fix ReactiveProperty implements IAsyncReactiveProperty 2020-05-17 16:51:10 +09:00
neuecc
6d37bb7bac docs: update TOC 2020-05-17 07:50:05 +00:00
neuecc
957adfad7a fix Await UniTaskAsyncEnumerable.Timer is not over. #76 2020-05-17 16:49:44 +09:00
neuecc
3ef889e17d no artifact name? 2020-05-17 03:02:55 +09:00
neuecc
c73af7390f removed manifestjson 2020-05-17 02:51:51 +09:00
neuecc
c5b4376486 gh-actions 2020-05-17 02:43:54 +09:00
58 changed files with 3557 additions and 435 deletions

View File

@@ -1,114 +0,0 @@
version: 2.1
executors:
unity:
# https://hub.docker.com/r/gableroux/unity3d/tags
parameters:
version: {type: string}
docker:
- image: gableroux/unity3d:<< parameters.version >>
go:
docker:
- image: circleci/golang
commands:
unity_activate:
parameters:
unity_version: {type: string}
unity_license: {type: string}
steps:
# get activation file, if fail to activate unity, use this key and activate from https://license.unity3d.com/manual
- run: apt update && apt install libunwind8 -y
- run: /opt/Unity/Editor/Unity -quit -batchmode -nographics -logFile -createManualActivationFile || exit 0
- run: cat Unity_v<< parameters.unity_version >>.alf
# get from UNITY_LICENSE envvar(base64 encoded(cat foo.ulf | base64 )), this file is generated from above manual activation
- run: echo << parameters.unity_license >> | base64 -di >> .circleci/Unity.ulf
- run: /opt/Unity/Editor/Unity -quit -batchmode -nographics -silent-crashes -logFile -manualLicenseFile .circleci/Unity.ulf || exit 0
jobs:
build-and-test:
parameters:
unity_version: {type: string}
unity_license: {type: string}
executor:
name: unity
version: << parameters.unity_version >>
steps:
- checkout
- unity_activate:
unity_version: << parameters.unity_version >>
unity_license: << parameters.unity_license >>
- run:
name: Build Linux(Mono)
command: /opt/Unity/Editor/Unity -quit -batchmode -nographics -silent-crashes -logFile -projectPath . -executeMethod UnitTestBuilder.BuildUnitTest /headless /ScriptBackend Mono2x /BuildTarget StandaloneLinux64
working_directory: .
# TODO:check unity version and packages...
# - run: ./bin/UnitTest/StandaloneLinux64_Mono2x/test
build-and-create-package:
parameters:
unity_version: {type: string}
unity_license: {type: string}
executor:
name: unity
version: << parameters.unity_version >>
steps:
- checkout
- unity_activate:
unity_version: << parameters.unity_version >>
unity_license: << parameters.unity_license >>
- run:
name: Export unitypackage
command: /opt/Unity/Editor/Unity -quit -batchmode -nographics -silent-crashes -logFile -projectPath . -executeMethod PackageExporter.Export
working_directory: .
- store_artifacts:
path: ./UniRx.Async.unitypackage
destination: /UniRx.Async.unitypackage
# upload to github by ghr
upload-github:
executor: go
steps:
- attach_workspace:
at: .
- run: go get github.com/tcnksm/ghr
- run: ghr -t ${GITHUB_TOKEN} -u ${CIRCLE_PROJECT_USERNAME} -r ${CIRCLE_PROJECT_REPONAME} ${CIRCLE_TAG} .
- store_artifacts:
path: UniRx.Async.unitypackage
destination: UniRx.Async.unitypackage
workflows:
version: 2
build-unity:
jobs:
# does not exists yet.
# - build-and-test:
# unity_version: 2019.3.0a2
# unity_license: ${UNITY_LICENSE_2019_3}
# - build-and-test:
# unity_version: 2019.2.0b2
# unity_license: ${UNITY_LICENSE_2019_2}
- build-and-test:
unity_version: 2019.1.2f1
unity_license: ${UNITY_LICENSE_2019_1}
filters:
tags:
only: /.*/
# test asmdef will not found.
# - build-and-test:
# unity_version: 2018.4.0f1
# unity_license: ${UNITY_LICENSE_2018_4}
# # UniTask minimum support version is 2018.3(C# 7.x)
# - build-and-test:
# unity_version: 2018.3.12f1
# unity_license: ${UNITY_LICENSE_2018_3}
- build-and-create-package:
unity_version: 2019.1.2f1
unity_license: ${UNITY_LICENSE_2019_1}
filters:
tags:
only: /^\d\.\d\.\d.*/
branches:
ignore: /.*/
- upload-github:
requires:
- build-and-create-package
filters:
tags:
only: /^\d\.\d\.\d.*/
branches:
ignore: /.*/

73
.github/workflows/build-debug.yml vendored Normal file
View File

@@ -0,0 +1,73 @@
name: Build-Debug
on:
push:
branches:
- "**"
tags:
- "!*" # not a tag push
pull_request:
types:
- opened
- synchronize
jobs:
build-dotnet:
runs-on: ubuntu-latest
env:
DOTNET_CLI_TELEMETRY_OPTOUT: 1
DOTNET_SKIP_FIRST_TIME_EXPERIENCE: 1
NUGET_XMLDOC_MODE: skip
steps:
- uses: actions/checkout@v2
- uses: actions/setup-dotnet@v1
with:
dotnet-version: 3.1.101
- run: dotnet test -c Debug ./src/UniTask.NetCoreTests/UniTask.NetCoreTests.csproj
build-unity:
strategy:
matrix:
unity: ['2019.3.9f1', '2020.1.0b5']
include:
- unity: 2019.3.9f1
license: UNITY_2019_3
- unity: 2020.1.0b5
license: UNITY_2020_1
runs-on: ubuntu-latest
container:
# with linux-il2cpp. image from https://hub.docker.com/r/gableroux/unity3d/tags
image: gableroux/unity3d:${{ matrix.unity }}-linux-il2cpp
steps:
- run: apt update && apt install git -y
- uses: actions/checkout@v2
# create unity activation file and store to artifacts.
- run: /opt/Unity/Editor/Unity -quit -batchmode -nographics -logFile -createManualActivationFile || exit 0
- uses: actions/upload-artifact@v1
with:
name: Unity_v${{ matrix.unity }}.alf
path: ./Unity_v${{ matrix.unity }}.alf
# activate Unity from manual license file(ulf)
- run: echo -n "$UNITY_LICENSE" >> .Unity.ulf
env:
UNITY_LICENSE: ${{ secrets[matrix.license] }}
- name: Activate Unity, always returns a success. But if a subsequent run fails, the activation may have failed(if succeeded, shows `Next license update check is after` and not shows other message(like GUID != GUID). If fails not). In that case, upload the artifact's .alf file to https://license.unity3d.com/manual to get the .ulf file and set it to secrets.
run: /opt/Unity/Editor/Unity -quit -batchmode -nographics -silent-crashes -logFile -manualLicenseFile .Unity.ulf || exit 0
# Execute scripts: RuntimeUnitTestToolkit
- name: Build UnitTest(Linux64, mono)
run: /opt/Unity/Editor/Unity -quit -batchmode -nographics -silent-crashes -logFile -projectPath . -executeMethod UnitTestBuilder.BuildUnitTest /headless /ScriptBackend mono /BuildTarget StandaloneLinux64
working-directory: src/UniTask
- name: Execute UnitTest
run: ./src/UniTask/bin/UnitTest/StandaloneLinux64_Mono2x/test
# Execute scripts: Export Package
- name: Export unitypackage
run: /opt/Unity/Editor/Unity -quit -batchmode -nographics -silent-crashes -logFile -projectPath . -executeMethod PackageExporter.Export
working-directory: src/UniTask
# Store artifacts.
- uses: actions/upload-artifact@v2
with:
name: UniTask.unitypackage.zip
path: ./src/UniTask/*.unitypackage

108
.github/workflows/build-release.yml vendored Normal file
View File

@@ -0,0 +1,108 @@
name: Build-Release
on:
push:
tags:
- "[0-9]+.[0-9]+.[0-9]+*"
jobs:
build-dotnet:
runs-on: ubuntu-latest
env:
DOTNET_CLI_TELEMETRY_OPTOUT: 1
DOTNET_SKIP_FIRST_TIME_EXPERIENCE: 1
NUGET_XMLDOC_MODE: skip
steps:
- uses: actions/checkout@v2
- uses: actions/setup-dotnet@v1
with:
dotnet-version: 3.1.101
# set release tag(*.*.*) to env.GIT_TAG
- run: echo ::set-env name=GIT_TAG::${GITHUB_REF#refs/tags/}
# build CommandTools first (use dotnet run command in ZLogger.csproj)
- run: dotnet build -c Release ./tools/CommandTools/CommandTools.csproj
- run: dotnet build -c Release -p:Version=${{ env.GIT_TAG }}
- run: dotnet test -c Release --no-build
- run: dotnet pack ./src/ZLogger/ZLogger.csproj -c Release --no-build -p:Version=${{ env.GIT_TAG }}
# Store artifacts.
- uses: actions/upload-artifact@v1
with:
name: nuget
path: ./src/ZLogger/bin/Release/ZLogger.${{ env.GIT_TAG }}.nupkg
build-unity:
strategy:
matrix:
unity: ['2019.3.9f1']
include:
- unity: 2019.3.9f1
license: UNITY_2019_3
runs-on: ubuntu-latest
container:
# with linux-il2cpp. image from https://hub.docker.com/r/gableroux/unity3d/tags
image: gableroux/unity3d:${{ matrix.unity }}-linux-il2cpp
steps:
- run: apt update && apt install git -y
- uses: actions/checkout@v2
- run: echo -n "$UNITY_LICENSE" >> .Unity.ulf
env:
UNITY_LICENSE: ${{ secrets[matrix.license] }}
- run: /opt/Unity/Editor/Unity -quit -batchmode -nographics -silent-crashes -logFile -manualLicenseFile .Unity.ulf || exit 0
# set release tag(*.*.*) to env.GIT_TAG
- run: echo ::set-env name=GIT_TAG::${GITHUB_REF#refs/tags/}
# Execute scripts: Export Package
- name: Export unitypackage
run: /opt/Unity/Editor/Unity -quit -batchmode -nographics -silent-crashes -logFile -projectPath . -executeMethod PackageExporter.Export
working-directory: src/ZLogger.Unity
env:
UNITY_PACKAGE_VERSION: ${{ env.GIT_TAG }}
# Store artifacts.
- uses: actions/upload-artifact@v1
with:
name: ZLogger.Unity.${{ env.GIT_TAG }}.unitypackage
path: ./src/ZLogger.Unity/ZLogger.Unity.${{ env.GIT_TAG }}.unitypackage
create-release:
needs: [build-dotnet, build-unity]
runs-on: ubuntu-latest
env:
DOTNET_CLI_TELEMETRY_OPTOUT: 1
DOTNET_SKIP_FIRST_TIME_EXPERIENCE: 1
NUGET_XMLDOC_MODE: skip
steps:
# setup dotnet for nuget push
- uses: actions/setup-dotnet@v1
with:
dotnet-version: 3.1.101
# set release tag(*.*.*) to env.GIT_TAG
- run: echo ::set-env name=GIT_TAG::${GITHUB_REF#refs/tags/}
# Create Releases
- uses: actions/create-release@v1
id: create_release
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
with:
tag_name: ${{ github.ref }}
release_name: Ver.${{ github.ref }}
# Download (All) Artifacts to current directory
- uses: actions/download-artifact@v2-preview
# Upload to NuGet
- run: dotnet nuget push "./nuget/*.nupkg" -s https://www.nuget.org/api/v2/package -k ${{ secrets.NUGET_KEY }}
# Upload to Releases(unitypackage)
- uses: actions/upload-release-asset@v1
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
with:
upload_url: ${{ steps.create_release.outputs.upload_url }}
asset_path: ./ZLogger.Unity.${{ env.GIT_TAG }}.unitypackage/ZLogger.Unity.${{ env.GIT_TAG }}.unitypackage
asset_name: ZLogger.Unity.${{ env.GIT_TAG }}.unitypackage
asset_content_type: application/octet-stream

15
.github/workflows/toc.yml vendored Normal file
View File

@@ -0,0 +1,15 @@
name: TOC Generator
on:
push:
paths:
- 'README.md'
jobs:
generateTOC:
name: TOC Generator
runs-on: ubuntu-latest
steps:
- uses: technote-space/toc-generator@v2.4.0
with:
TOC_TITLE: "## Table of Contents"

View File

@@ -1,10 +1,29 @@
# UniTask
[![CircleCI](https://circleci.com/gh/Cysharp/UniTask.svg?style=svg)](https://circleci.com/gh/Cysharp/UniTask)
UniTask
===
[![GitHub Actions](https://github.com/Cysharp/UniTask/workflows/Build-Debug/badge.svg)](https://github.com/Cysharp/UniTask/actions) [![Releases](https://img.shields.io/github/release/Cysharp/UniTask.svg)](https://github.com/Cysharp/UniTask/releases)
Provides an efficient async/await integration to Unity.
> UniTask was included in UniRx before v7 but now completely separated, it no dependent each other.
<!-- START doctoc generated TOC please keep comment here to allow auto update -->
<!-- DON'T EDIT THIS SECTION, INSTEAD RE-RUN doctoc TO UPDATE -->
## Table of Contents
- [Getting started](#getting-started)
- [`UniTask<T>`](#unitaskt)
- [Cancellation and Exception handling](#cancellation-and-exception-handling)
- [Progress](#progress)
- [UniTaskTracker](#unitasktracker)
- [Reusable Promises](#reusable-promises)
- [awaitable Events](#awaitable-events)
- [async void vs async UniTask/UniTaskVoid](#async-void-vs-async-unitaskunitaskvoid)
- [For Unit Testing](#for-unit-testing)
- [Method List](#method-list)
- [UPM Package](#upm-package)
- [ECS, PlayerLoop](#ecs-playerloop)
- [License](#license)
<!-- END doctoc generated TOC please keep comment here to allow auto update -->
Getting started
---
@@ -354,6 +373,15 @@ or add `"com.cysharp.unitask": "https://github.com/Cysharp/UniTask.git?path=Asse
If you want to set a target version, UniTask is using `*.*.*` release tag so you can specify a version like `#1.3.0`. For example `https://github.com/Cysharp/UniTask.git?path=Assets/UniRx.Async#1.3.1`.
ECS, PlayerLoop
---
TODO:
```csharp
var loop = PlayerLoop.GetCurrentPlayerLoop();
PlayerLoopHelper.Initialize(ref loop);
```
License
---
This library is under the MIT License.

View File

@@ -86,30 +86,16 @@ namespace NetCoreSandbox
await Task.Delay(10, cancellationToken);
}
static async Task Main(string[] args)
static void Main(string[] args)
{
await foreach (var item in UniTaskAsyncEnumerable.Range(1, 10)
.SelectAwait(x => UniTask.Run(() => x))
.TakeLast(6)
)
{
Console.WriteLine(item);
}
// AsyncEnumerable.Range(1,10).FirstAsync(
// AsyncEnumerable.Range(1, 10).GroupBy(x=>x).Select(x=>x.first
var channel = Channel.CreateSingleConsumerUnbounded<int>();
// AsyncEnumerable.Range(1,10).WithCancellation(CancellationToken.None).WithCancellation
//Enumerable.Range(1,10).ToHashSet(
}

View File

@@ -0,0 +1,118 @@
using Cysharp.Threading.Tasks;
using FluentAssertions;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Channels;
using Cysharp.Threading.Tasks.Linq;
using System.Threading.Tasks;
using Xunit;
namespace NetCoreTests
{
public class AsyncReactivePropertyTest
{
[Fact]
public async Task Iteration()
{
var rp = new AsyncReactiveProperty<int>(99);
var f = await rp.FirstAsync();
f.Should().Be(99);
var array = rp.Take(5).ToArrayAsync();
rp.Value = 100;
rp.Value = 100;
rp.Value = 100;
rp.Value = 131;
var ar = await array;
ar.Should().BeEquivalentTo(new[] { 99, 100, 100, 100, 131 });
}
[Fact]
public async Task WithoutCurrent()
{
var rp = new AsyncReactiveProperty<int>(99);
var array = rp.WithoutCurrent().Take(5).ToArrayAsync();
rp.Value = 100;
rp.Value = 100;
rp.Value = 100;
rp.Value = 131;
rp.Value = 191;
var ar = await array;
ar.Should().BeEquivalentTo(new[] { 100, 100, 100, 131, 191 });
}
[Fact]
public async Task StateIteration()
{
var rp = new State<int>(99);
var setter = rp.GetSetter();
var f = await rp.FirstAsync();
f.Should().Be(99);
var array = rp.Take(5).ToArrayAsync();
setter(100);
setter(100);
setter(100);
setter(131);
var ar = await array;
ar.Should().BeEquivalentTo(new[] { 99, 100, 100, 100, 131 });
}
[Fact]
public async Task StateWithoutCurrent()
{
var rp = new State<int>(99);
var setter = rp.GetSetter();
var array = rp.WithoutCurrent().Take(5).ToArrayAsync();
setter(100);
setter(100);
setter(100);
setter(131);
setter(191);
var ar = await array;
ar.Should().BeEquivalentTo(new[] { 100, 100, 100, 131, 191 });
}
[Fact]
public void StateFromEnumeration()
{
var rp = new AsyncReactiveProperty<int>(10);
var state = rp.ToState(CancellationToken.None);
rp.Value = 10;
state.Value.Should().Be(10);
rp.Value = 20;
state.Value.Should().Be(20);
state.Dispose();
rp.Value = 30;
state.Value.Should().Be(20);
}
}
}

View File

@@ -0,0 +1,51 @@
using Cysharp.Threading.Tasks;
using FluentAssertions;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Channels;
using Cysharp.Threading.Tasks.Linq;
using System.Threading.Tasks;
using Xunit;
namespace NetCoreTests
{
public class CancellationTokenTest
{
[Fact]
public async Task WaitUntilCanceled()
{
var cts = new CancellationTokenSource();
cts.CancelAfter(TimeSpan.FromSeconds(1.5));
var now = DateTime.UtcNow;
await cts.Token.WaitUntilCanceled();
var elapsed = DateTime.UtcNow - now;
elapsed.Should().BeGreaterThan(TimeSpan.FromSeconds(1));
}
[Fact]
public void AlreadyCanceled()
{
var cts = new CancellationTokenSource();
cts.Cancel();
cts.Token.WaitUntilCanceled().GetAwaiter().IsCompleted.Should().BeTrue();
}
[Fact]
public void None()
{
CancellationToken.None.WaitUntilCanceled().GetAwaiter().IsCompleted.Should().BeTrue();
}
}
}

View File

@@ -0,0 +1,370 @@
using Cysharp.Threading.Tasks;
using FluentAssertions;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Channels;
using Cysharp.Threading.Tasks.Linq;
using System.Threading.Tasks;
using Xunit;
namespace NetCoreTests
{
public class ChannelTest
{
(System.Threading.Channels.Channel<int>, Cysharp.Threading.Tasks.Channel<int>) CreateChannel()
{
var reference = System.Threading.Channels.Channel.CreateUnbounded<int>(new UnboundedChannelOptions
{
AllowSynchronousContinuations = true,
SingleReader = true,
SingleWriter = false
});
var channel = Cysharp.Threading.Tasks.Channel.CreateSingleConsumerUnbounded<int>();
return (reference, channel);
}
[Fact]
public async Task SingleWriteSingleRead()
{
var (reference, channel) = CreateChannel();
foreach (var item in new[] { 10, 20, 30 })
{
var t1 = reference.Reader.WaitToReadAsync();
var t2 = channel.Reader.WaitToReadAsync();
t1.IsCompleted.Should().BeFalse();
t2.Status.IsCompleted().Should().BeFalse();
reference.Writer.TryWrite(item);
channel.Writer.TryWrite(item);
(await t1).Should().BeTrue();
(await t2).Should().BeTrue();
reference.Reader.TryRead(out var refitem).Should().BeTrue();
channel.Reader.TryRead(out var chanitem).Should().BeTrue();
refitem.Should().Be(item);
chanitem.Should().Be(item);
}
}
[Fact]
public async Task MultiWrite()
{
var (reference, channel) = CreateChannel();
foreach (var item in new[] { 10, 20, 30 })
{
var t1 = reference.Reader.WaitToReadAsync();
var t2 = channel.Reader.WaitToReadAsync();
t1.IsCompleted.Should().BeFalse();
t2.Status.IsCompleted().Should().BeFalse();
foreach (var i in Enumerable.Range(1, 3))
{
reference.Writer.TryWrite(item * i);
channel.Writer.TryWrite(item * i);
}
(await t1).Should().BeTrue();
(await t2).Should().BeTrue();
foreach (var i in Enumerable.Range(1, 3))
{
(await reference.Reader.WaitToReadAsync()).Should().BeTrue();
(await channel.Reader.WaitToReadAsync()).Should().BeTrue();
reference.Reader.TryRead(out var refitem).Should().BeTrue();
channel.Reader.TryRead(out var chanitem).Should().BeTrue();
refitem.Should().Be(item * i);
chanitem.Should().Be(item * i);
}
}
}
[Fact]
public async Task CompleteOnEmpty()
{
var (reference, channel) = CreateChannel();
foreach (var item in new[] { 10, 20, 30 })
{
reference.Writer.TryWrite(item);
channel.Writer.TryWrite(item);
reference.Reader.TryRead(out var refitem);
channel.Reader.TryRead(out var chanitem);
}
// Empty.
var completion1 = reference.Reader.Completion;
var wait1 = reference.Reader.WaitToReadAsync();
var completion2 = channel.Reader.Completion;
var wait2 = channel.Reader.WaitToReadAsync();
reference.Writer.TryComplete();
channel.Writer.TryComplete();
completion1.Status.Should().Be(TaskStatus.RanToCompletion);
completion2.Status.Should().Be(UniTaskStatus.Succeeded);
(await wait1).Should().BeFalse();
(await wait2).Should().BeFalse();
}
[Fact]
public async Task CompleteErrorOnEmpty()
{
var (reference, channel) = CreateChannel();
foreach (var item in new[] { 10, 20, 30 })
{
reference.Writer.TryWrite(item);
channel.Writer.TryWrite(item);
reference.Reader.TryRead(out var refitem);
channel.Reader.TryRead(out var chanitem);
}
// Empty.
var completion1 = reference.Reader.Completion;
var wait1 = reference.Reader.WaitToReadAsync();
var completion2 = channel.Reader.Completion;
var wait2 = channel.Reader.WaitToReadAsync();
var ex = new Exception();
reference.Writer.TryComplete(ex);
channel.Writer.TryComplete(ex);
completion1.Status.Should().Be(TaskStatus.Faulted);
completion2.Status.Should().Be(UniTaskStatus.Faulted);
(await Assert.ThrowsAsync<Exception>(async () => await wait1)).Should().Be(ex);
(await Assert.ThrowsAsync<Exception>(async () => await wait2)).Should().Be(ex);
}
[Fact]
public async Task CompleteWithRest()
{
var (reference, channel) = CreateChannel();
foreach (var item in new[] { 10, 20, 30 })
{
reference.Writer.TryWrite(item);
channel.Writer.TryWrite(item);
}
// Three Item2.
var completion1 = reference.Reader.Completion;
var wait1 = reference.Reader.WaitToReadAsync();
var completion2 = channel.Reader.Completion;
var wait2 = channel.Reader.WaitToReadAsync();
reference.Writer.TryComplete();
channel.Writer.TryComplete();
// completion1.Status.Should().Be(TaskStatus.WaitingForActivation);
completion2.Status.Should().Be(UniTaskStatus.Pending);
(await wait1).Should().BeTrue();
(await wait2).Should().BeTrue();
foreach (var item in new[] { 10, 20, 30 })
{
reference.Reader.TryRead(out var i1).Should().BeTrue();
channel.Reader.TryRead(out var i2).Should().BeTrue();
i1.Should().Be(item);
i2.Should().Be(item);
}
(await reference.Reader.WaitToReadAsync()).Should().BeFalse();
(await channel.Reader.WaitToReadAsync()).Should().BeFalse();
completion1.Status.Should().Be(TaskStatus.RanToCompletion);
completion2.Status.Should().Be(UniTaskStatus.Succeeded);
}
[Fact]
public async Task CompleteErrorWithRest()
{
var (reference, channel) = CreateChannel();
foreach (var item in new[] { 10, 20, 30 })
{
reference.Writer.TryWrite(item);
channel.Writer.TryWrite(item);
}
// Three Item2.
var completion1 = reference.Reader.Completion;
var wait1 = reference.Reader.WaitToReadAsync();
var completion2 = channel.Reader.Completion;
var wait2 = channel.Reader.WaitToReadAsync();
var ex = new Exception();
reference.Writer.TryComplete(ex);
channel.Writer.TryComplete(ex);
// completion1.Status.Should().Be(TaskStatus.WaitingForActivation);
completion2.Status.Should().Be(UniTaskStatus.Pending);
(await wait1).Should().BeTrue();
(await wait2).Should().BeTrue();
foreach (var item in new[] { 10, 20, 30 })
{
reference.Reader.TryRead(out var i1).Should().BeTrue();
channel.Reader.TryRead(out var i2).Should().BeTrue();
i1.Should().Be(item);
i2.Should().Be(item);
}
wait1 = reference.Reader.WaitToReadAsync();
wait2 = channel.Reader.WaitToReadAsync();
(await Assert.ThrowsAsync<Exception>(async () => await wait1)).Should().Be(ex);
(await Assert.ThrowsAsync<Exception>(async () => await wait2)).Should().Be(ex);
completion1.Status.Should().Be(TaskStatus.Faulted);
completion2.Status.Should().Be(UniTaskStatus.Faulted);
}
[Fact]
public async Task Cancellation()
{
var (reference, channel) = CreateChannel();
var cts = new CancellationTokenSource();
var wait1 = reference.Reader.WaitToReadAsync(cts.Token);
var wait2 = channel.Reader.WaitToReadAsync(cts.Token);
cts.Cancel();
(await Assert.ThrowsAsync<OperationCanceledException>(async () => await wait1)).CancellationToken.Should().Be(cts.Token);
(await Assert.ThrowsAsync<OperationCanceledException>(async () => await wait2)).CancellationToken.Should().Be(cts.Token);
}
[Fact]
public async Task AsyncEnumerator()
{
var (reference, channel) = CreateChannel();
var ta1 = reference.Reader.ReadAllAsync().ToArrayAsync();
var ta2 = channel.Reader.ReadAllAsync().ToArrayAsync();
foreach (var item in new[] { 10, 20, 30 })
{
reference.Writer.TryWrite(item);
channel.Writer.TryWrite(item);
}
reference.Writer.TryComplete();
channel.Writer.TryComplete();
(await ta1).Should().BeEquivalentTo(new[] { 10, 20, 30 });
(await ta2).Should().BeEquivalentTo(new[] { 10, 20, 30 });
}
[Fact]
public async Task AsyncEnumeratorCancellation()
{
// Token1, Token2 and Cancel1
{
var cts1 = new CancellationTokenSource();
var cts2 = new CancellationTokenSource();
var (reference, channel) = CreateChannel();
var ta1 = reference.Reader.ReadAllAsync(cts1.Token).ToArrayAsync(cts2.Token);
var ta2 = channel.Reader.ReadAllAsync(cts1.Token).ToArrayAsync(cts2.Token);
foreach (var item in new[] { 10, 20, 30 })
{
reference.Writer.TryWrite(item);
channel.Writer.TryWrite(item);
}
cts1.Cancel();
await Assert.ThrowsAsync<OperationCanceledException>(async () => await ta1);
(await Assert.ThrowsAsync<OperationCanceledException>(async () => await ta2)).CancellationToken.Should().Be(cts1.Token);
}
// Token1, Token2 and Cancel2
{
var cts1 = new CancellationTokenSource();
var cts2 = new CancellationTokenSource();
var (reference, channel) = CreateChannel();
var ta1 = reference.Reader.ReadAllAsync(cts1.Token).ToArrayAsync(cts2.Token);
var ta2 = channel.Reader.ReadAllAsync(cts1.Token).ToArrayAsync(cts2.Token);
foreach (var item in new[] { 10, 20, 30 })
{
reference.Writer.TryWrite(item);
channel.Writer.TryWrite(item);
}
cts2.Cancel();
await Assert.ThrowsAsync<OperationCanceledException>(async () => await ta1);
(await Assert.ThrowsAsync<OperationCanceledException>(async () => await ta2)).CancellationToken.Should().Be(cts2.Token);
}
// Token1 and Cancel1
{
var cts1 = new CancellationTokenSource();
var (reference, channel) = CreateChannel();
var ta1 = reference.Reader.ReadAllAsync(cts1.Token).ToArrayAsync();
var ta2 = channel.Reader.ReadAllAsync(cts1.Token).ToArrayAsync();
foreach (var item in new[] { 10, 20, 30 })
{
reference.Writer.TryWrite(item);
channel.Writer.TryWrite(item);
}
cts1.Cancel();
await Assert.ThrowsAsync<OperationCanceledException>(async () => await ta1);
(await Assert.ThrowsAsync<OperationCanceledException>(async () => await ta2)).CancellationToken.Should().Be(cts1.Token);
}
// Token2 and Cancel2
{
var cts2 = new CancellationTokenSource();
var (reference, channel) = CreateChannel();
var ta1 = reference.Reader.ReadAllAsync().ToArrayAsync(cts2.Token);
var ta2 = channel.Reader.ReadAllAsync().ToArrayAsync(cts2.Token);
foreach (var item in new[] { 10, 20, 30 })
{
reference.Writer.TryWrite(item);
channel.Writer.TryWrite(item);
}
cts2.Cancel();
await Assert.ThrowsAsync<OperationCanceledException>(async () => await ta1);
(await Assert.ThrowsAsync<OperationCanceledException>(async () => await ta2)).CancellationToken.Should().Be(cts2.Token);
}
}
}
}

View File

@@ -0,0 +1,47 @@
using Cysharp.Threading.Tasks;
using FluentAssertions;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Channels;
using Cysharp.Threading.Tasks.Linq;
using System.Threading.Tasks;
using Xunit;
namespace NetCoreTests
{
public class DeferTest
{
[Fact]
public async Task D()
{
var created = false;
var v = UniTask.Defer(() => { created = true; return UniTask.Run(() => 10); });
created.Should().BeFalse();
var t = await v;
created.Should().BeTrue();
t.Should().Be(10);
}
[Fact]
public async Task D2()
{
var created = false;
var v = UniTask.Defer(() => { created = true; return UniTask.Run(() => 10).AsUniTask(); });
created.Should().BeFalse();
await v;
created.Should().BeTrue();
}
}
}

View File

@@ -0,0 +1,78 @@
using Cysharp.Threading.Tasks;
using Cysharp.Threading.Tasks.Linq;
using FluentAssertions;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Xunit;
namespace NetCoreTests.Linq
{
public class PublishTest
{
[Fact]
public async Task Normal()
{
var rp = new AsyncReactiveProperty<int>(1);
var multicast = rp.Publish();
var a = multicast.ToArrayAsync();
var b = multicast.Take(2).ToArrayAsync();
var disp = multicast.Connect();
rp.Value = 2;
(await b).Should().BeEquivalentTo(1, 2);
var c = multicast.ToArrayAsync();
rp.Value = 3;
rp.Value = 4;
rp.Value = 5;
rp.Dispose();
(await a).Should().BeEquivalentTo(1, 2, 3, 4, 5);
(await c).Should().BeEquivalentTo(3, 4, 5);
disp.Dispose();
}
[Fact]
public async Task Cancel()
{
var rp = new AsyncReactiveProperty<int>(1);
var multicast = rp.Publish();
var a = multicast.ToArrayAsync();
var b = multicast.Take(2).ToArrayAsync();
var disp = multicast.Connect();
rp.Value = 2;
(await b).Should().BeEquivalentTo(1, 2);
var c = multicast.ToArrayAsync();
rp.Value = 3;
disp.Dispose();
rp.Value = 4;
rp.Value = 5;
rp.Dispose();
await Assert.ThrowsAsync<OperationCanceledException>(async () => await a);
await Assert.ThrowsAsync<OperationCanceledException>(async () => await c);
}
}
}

View File

@@ -0,0 +1,29 @@
using Cysharp.Threading.Tasks;
using Cysharp.Threading.Tasks.Linq;
using FluentAssertions;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
using Xunit;
namespace NetCoreTests.Linq
{
public class QueueTest
{
[Fact]
public async Task Q()
{
var rp = new AsyncReactiveProperty<int>(100);
var l = new List<int>();
await rp.Take(10).Queue().ForEachAsync(x =>
{
rp.Value += 10;
l.Add(x);
});
l.Should().BeEquivalentTo(100, 110, 120, 130, 140, 150, 160, 170, 180, 190);
}
}
}

View File

@@ -0,0 +1,106 @@
using Cysharp.Threading.Tasks;
using Cysharp.Threading.Tasks.Linq;
using FluentAssertions;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Xunit;
namespace NetCoreTests.Linq
{
public class TakeInfinityTest
{
[Fact]
public async Task Take()
{
var rp = new AsyncReactiveProperty<int>(1);
var xs = rp.Take(5).ToArrayAsync();
rp.Value = 2;
rp.Value = 3;
rp.Value = 4;
rp.Value = 5;
(await xs).Should().BeEquivalentTo(1, 2, 3, 4, 5);
}
[Fact]
public async Task TakeWhile()
{
var rp = new AsyncReactiveProperty<int>(1);
var xs = rp.TakeWhile(x => x != 5).ToArrayAsync();
rp.Value = 2;
rp.Value = 3;
rp.Value = 4;
rp.Value = 5;
(await xs).Should().BeEquivalentTo(1, 2, 3, 4);
}
[Fact]
public async Task TakeUntil()
{
var cts = new CancellationTokenSource();
var rp = new AsyncReactiveProperty<int>(1);
var xs = rp.TakeUntilCanceled(cts.Token).ToArrayAsync();
var c = CancelAsync();
await c;
var foo = await xs;
foo.Should().BeEquivalentTo(new[] { 1, 10, 20 });
async Task CancelAsync()
{
rp.Value = 10;
await Task.Yield();
rp.Value = 20;
await Task.Yield();
cts.Cancel();
rp.Value = 30;
await Task.Yield();
rp.Value = 40;
}
}
[Fact]
public async Task SkipUntil()
{
var cts = new CancellationTokenSource();
var rp = new AsyncReactiveProperty<int>(1);
var xs = rp.SkipUntilCanceled(cts.Token).ToArrayAsync();
var c = CancelAsync();
await c;
var foo = await xs;
foo.Should().BeEquivalentTo(new[] { 30, 40 });
async Task CancelAsync()
{
rp.Value = 10;
await Task.Yield();
rp.Value = 20;
await Task.Yield();
cts.Cancel();
rp.Value = 30;
await Task.Yield();
rp.Value = 40;
rp.Dispose(); // complete.
}
}
}
}

View File

@@ -4,18 +4,19 @@ using System.Threading;
namespace Cysharp.Threading.Tasks
{
public interface IAsyncReadOnlyReactiveProperty<T> : IUniTaskAsyncEnumerable<T>
public interface IReadOnlyAsyncReactiveProperty<T> : IUniTaskAsyncEnumerable<T>
{
T Value { get; }
IUniTaskAsyncEnumerable<T> WithoutCurrent();
}
public interface IAsyncReactiveProperty<T> : IAsyncReadOnlyReactiveProperty<T>
public interface IAsyncReactiveProperty<T> : IReadOnlyAsyncReactiveProperty<T>
{
new T Value { get; set; }
}
[Serializable]
public class AsyncReactiveProperty<T> : IUniTaskAsyncEnumerable<T>, IDisposable
public class AsyncReactiveProperty<T> : IAsyncReactiveProperty<T>, IDisposable
{
TriggerEvent<T> triggerEvent;
@@ -43,17 +44,55 @@ namespace Cysharp.Threading.Tasks
this.triggerEvent = default;
}
public IUniTaskAsyncEnumerable<T> WithoutCurrent()
{
return new WithoutCurrentEnumerable(this);
}
public IUniTaskAsyncEnumerator<T> GetAsyncEnumerator(CancellationToken cancellationToken)
{
return new Enumerator(this, cancellationToken);
return new Enumerator(this, cancellationToken, true);
}
public void Dispose()
{
triggerEvent.SetCanceled(CancellationToken.None);
triggerEvent.SetCompleted();
}
sealed class Enumerator : MoveNextSource, IUniTaskAsyncEnumerator<T>, IResolveCancelPromise<T>
public static implicit operator T(AsyncReactiveProperty<T> value)
{
return value.Value;
}
public override string ToString()
{
if (isValueType) return latestValue.ToString();
return latestValue?.ToString();
}
static bool isValueType;
static AsyncReactiveProperty()
{
isValueType = typeof(T).IsValueType;
}
class WithoutCurrentEnumerable : IUniTaskAsyncEnumerable<T>
{
readonly AsyncReactiveProperty<T> parent;
public WithoutCurrentEnumerable(AsyncReactiveProperty<T> parent)
{
this.parent = parent;
}
public IUniTaskAsyncEnumerator<T> GetAsyncEnumerator(CancellationToken cancellationToken = default)
{
return new Enumerator(parent, cancellationToken, false);
}
}
sealed class Enumerator : MoveNextSource, IUniTaskAsyncEnumerator<T>, ITriggerHandler<T>
{
static Action<object> cancellationCallback = CancellationCallback;
@@ -62,11 +101,13 @@ namespace Cysharp.Threading.Tasks
readonly CancellationTokenRegistration cancellationTokenRegistration;
T value;
bool isDisposed;
bool firstCall;
public Enumerator(AsyncReactiveProperty<T> parent, CancellationToken cancellationToken)
public Enumerator(AsyncReactiveProperty<T> parent, CancellationToken cancellationToken, bool publishCurrentValue)
{
this.parent = parent;
this.cancellationToken = cancellationToken;
this.firstCall = publishCurrentValue;
parent.triggerEvent.Add(this);
TaskTracker.TrackActiveTask(this, 3);
@@ -81,6 +122,14 @@ namespace Cysharp.Threading.Tasks
public UniTask<bool> MoveNextAsync()
{
// raise latest value on first call.
if (firstCall)
{
firstCall = false;
value = parent.Value;
return CompletedTasks.True;
}
completionSource.Reset();
return new UniTask<bool>(this, completionSource.Version);
}
@@ -97,17 +146,25 @@ namespace Cysharp.Threading.Tasks
return default;
}
public bool TrySetResult(T value)
public void OnNext(T value)
{
this.value = value;
completionSource.TrySetResult(true);
return true;
}
public bool TrySetCanceled(CancellationToken cancellationToken = default)
public void OnCanceled(CancellationToken cancellationToken)
{
DisposeAsync().Forget();
return true;
}
public void OnCompleted()
{
completionSource.TrySetResult(false);
}
public void OnError(Exception ex)
{
completionSource.TrySetException(ex);
}
static void CancellationCallback(object state)
@@ -117,4 +174,231 @@ namespace Cysharp.Threading.Tasks
}
}
}
}
public class State<T> : IReadOnlyAsyncReactiveProperty<T>, IDisposable
{
TriggerEvent<T> triggerEvent;
T latestValue;
Action<T> setter;
IUniTaskAsyncEnumerator<T> enumerator;
public T Value
{
get
{
return latestValue;
}
}
public State(T value)
{
this.latestValue = value;
this.triggerEvent = default;
}
public State(T initialValue, IUniTaskAsyncEnumerable<T> source, CancellationToken cancellationToken)
{
latestValue = initialValue;
ConsumeEnumerator(source, cancellationToken).Forget();
}
public State(IUniTaskAsyncEnumerable<T> source, CancellationToken cancellationToken)
{
ConsumeEnumerator(source, cancellationToken).Forget();
}
async UniTaskVoid ConsumeEnumerator(IUniTaskAsyncEnumerable<T> source, CancellationToken cancellationToken)
{
enumerator = source.GetAsyncEnumerator(cancellationToken);
try
{
while (await enumerator.MoveNextAsync())
{
SetValue(enumerator.Current);
}
}
finally
{
await enumerator.DisposeAsync();
enumerator = null;
}
}
public Action<T> GetSetter()
{
if (enumerator != null)
{
throw new InvalidOperationException("Can not get setter when create from IUniTaskAsyncEnumerable source.");
}
if (setter != null)
{
throw new InvalidOperationException("GetSetter can only call once.");
}
setter = SetValue;
return setter;
}
void SetValue(T value)
{
this.latestValue = value;
triggerEvent.SetResult(value);
}
public IUniTaskAsyncEnumerable<T> WithoutCurrent()
{
return new WithoutCurrentEnumerable(this);
}
public IUniTaskAsyncEnumerator<T> GetAsyncEnumerator(CancellationToken cancellationToken)
{
return new Enumerator(this, cancellationToken, true);
}
public void Dispose()
{
if (enumerator != null)
{
enumerator.DisposeAsync().Forget();
}
triggerEvent.SetCompleted();
}
public static implicit operator State<T>(T value)
{
return new State<T>(value);
}
public static implicit operator T(State<T> value)
{
return value.Value;
}
public override string ToString()
{
if (isValueType) return latestValue.ToString();
return latestValue?.ToString();
}
static bool isValueType;
static State()
{
isValueType = typeof(T).IsValueType;
}
class WithoutCurrentEnumerable : IUniTaskAsyncEnumerable<T>
{
readonly State<T> parent;
public WithoutCurrentEnumerable(State<T> parent)
{
this.parent = parent;
}
public IUniTaskAsyncEnumerator<T> GetAsyncEnumerator(CancellationToken cancellationToken = default)
{
return new Enumerator(parent, cancellationToken, false);
}
}
sealed class Enumerator : MoveNextSource, IUniTaskAsyncEnumerator<T>, ITriggerHandler<T>
{
static Action<object> cancellationCallback = CancellationCallback;
readonly State<T> parent;
readonly CancellationToken cancellationToken;
readonly CancellationTokenRegistration cancellationTokenRegistration;
T value;
bool isDisposed;
bool firstCall;
public Enumerator(State<T> parent, CancellationToken cancellationToken, bool publishCurrentValue)
{
this.parent = parent;
this.cancellationToken = cancellationToken;
this.firstCall = publishCurrentValue;
parent.triggerEvent.Add(this);
TaskTracker.TrackActiveTask(this, 3);
if (cancellationToken.CanBeCanceled)
{
cancellationTokenRegistration = cancellationToken.RegisterWithoutCaptureExecutionContext(cancellationCallback, this);
}
}
public T Current => value;
public UniTask<bool> MoveNextAsync()
{
// raise latest value on first call.
if (firstCall)
{
firstCall = false;
value = parent.Value;
return CompletedTasks.True;
}
completionSource.Reset();
return new UniTask<bool>(this, completionSource.Version);
}
public UniTask DisposeAsync()
{
if (!isDisposed)
{
isDisposed = true;
TaskTracker.RemoveTracking(this);
completionSource.TrySetCanceled(cancellationToken);
parent.triggerEvent.Remove(this);
}
return default;
}
public void OnNext(T value)
{
this.value = value;
completionSource.TrySetResult(true);
}
public void OnCanceled(CancellationToken cancellationToken)
{
DisposeAsync().Forget();
}
public void OnCompleted()
{
completionSource.TrySetResult(false);
}
public void OnError(Exception ex)
{
completionSource.TrySetException(ex);
}
static void CancellationCallback(object state)
{
var self = (Enumerator)state;
self.DisposeAsync().Forget();
}
}
}
public static class StateExtensions
{
public static State<T> ToState<T>(this IUniTaskAsyncEnumerable<T> source, CancellationToken cancellationToken)
{
return new State<T>(source, cancellationToken);
}
public static State<T> ToState<T>(this IUniTaskAsyncEnumerable<T> source, T initialValue, CancellationToken cancellationToken)
{
return new State<T>(initialValue, source, cancellationToken);
}
}
}

View File

@@ -1,6 +1,7 @@
#pragma warning disable CS1591 // Missing XML comment for publicly visible type or member
using System;
using System.Runtime.CompilerServices;
using System.Threading;
namespace Cysharp.Threading.Tasks
@@ -9,15 +10,15 @@ namespace Cysharp.Threading.Tasks
{
static readonly Action<object> cancellationTokenCallback = Callback;
public static (UniTask, CancellationTokenRegistration) ToUniTask(this CancellationToken cts)
public static (UniTask, CancellationTokenRegistration) ToUniTask(this CancellationToken cancellationToken)
{
if (cts.IsCancellationRequested)
if (cancellationToken.IsCancellationRequested)
{
return (UniTask.FromCanceled(cts), default(CancellationTokenRegistration));
return (UniTask.FromCanceled(cancellationToken), default(CancellationTokenRegistration));
}
var promise = new UniTaskCompletionSource();
return (promise.Task, cts.RegisterWithoutCaptureExecutionContext(cancellationTokenCallback, promise));
return (promise.Task, cancellationToken.RegisterWithoutCaptureExecutionContext(cancellationTokenCallback, promise));
}
static void Callback(object state)
@@ -26,6 +27,11 @@ namespace Cysharp.Threading.Tasks
promise.TrySetResult();
}
public static CancellationTokenAwaitable WaitUntilCanceled(this CancellationToken cancellationToken)
{
return new CancellationTokenAwaitable(cancellationToken);
}
public static CancellationTokenRegistration RegisterWithoutCaptureExecutionContext(this CancellationToken cancellationToken, Action callback)
{
var restoreFlow = false;
@@ -70,5 +76,46 @@ namespace Cysharp.Threading.Tasks
}
}
}
public struct CancellationTokenAwaitable
{
CancellationToken cancellationToken;
public CancellationTokenAwaitable(CancellationToken cancellationToken)
{
this.cancellationToken = cancellationToken;
}
public Awaiter GetAwaiter()
{
return new Awaiter(cancellationToken);
}
public struct Awaiter : ICriticalNotifyCompletion
{
CancellationToken cancellationToken;
public Awaiter(CancellationToken cancellationToken)
{
this.cancellationToken = cancellationToken;
}
public bool IsCompleted => !cancellationToken.CanBeCanceled || cancellationToken.IsCancellationRequested;
public void GetResult()
{
}
public void OnCompleted(Action continuation)
{
UnsafeOnCompleted(continuation);
}
public void UnsafeOnCompleted(Action continuation)
{
cancellationToken.RegisterWithoutCaptureExecutionContext(continuation);
}
}
}
}

View File

@@ -0,0 +1,450 @@
using System;
using System.Collections.Generic;
using System.Threading;
namespace Cysharp.Threading.Tasks
{
public static class Channel
{
public static Channel<T> CreateSingleConsumerUnbounded<T>()
{
return new SingleConsumerUnboundedChannel<T>();
}
}
public abstract class Channel<TWrite, TRead>
{
public ChannelReader<TRead> Reader { get; protected set; }
public ChannelWriter<TWrite> Writer { get; protected set; }
public static implicit operator ChannelReader<TRead>(Channel<TWrite, TRead> channel) => channel.Reader;
public static implicit operator ChannelWriter<TWrite>(Channel<TWrite, TRead> channel) => channel.Writer;
}
public abstract class Channel<T> : Channel<T, T>
{
}
public abstract class ChannelReader<T>
{
public abstract bool TryRead(out T item);
public abstract UniTask<bool> WaitToReadAsync(CancellationToken cancellationToken = default(CancellationToken));
public abstract UniTask Completion { get; }
public virtual UniTask<T> ReadAsync(CancellationToken cancellationToken = default(CancellationToken))
{
if (this.TryRead(out var item))
{
return UniTask.FromResult(item);
}
return ReadAsyncCore(cancellationToken);
}
async UniTask<T> ReadAsyncCore(CancellationToken cancellationToken = default(CancellationToken))
{
if (await WaitToReadAsync(cancellationToken))
{
if (TryRead(out var item))
{
return item;
}
}
throw new ChannelClosedException();
}
public abstract IUniTaskAsyncEnumerable<T> ReadAllAsync(CancellationToken cancellationToken = default(CancellationToken));
}
public abstract class ChannelWriter<T>
{
public abstract bool TryWrite(T item);
public abstract bool TryComplete(Exception error = null);
public void Complete(Exception error = null)
{
if (!TryComplete(error))
{
throw new ChannelClosedException();
}
}
}
public partial class ChannelClosedException : InvalidOperationException
{
public ChannelClosedException() :
base("Channel is already closed.")
{ }
public ChannelClosedException(string message) : base(message) { }
public ChannelClosedException(Exception innerException) :
base("Channel is already closed", innerException)
{ }
public ChannelClosedException(string message, Exception innerException) : base(message, innerException) { }
}
internal class SingleConsumerUnboundedChannel<T> : Channel<T>
{
readonly Queue<T> items;
readonly SingleConsumerUnboundedChannelReader readerSource;
UniTaskCompletionSource completedTaskSource;
UniTask completedTask;
Exception completionError;
bool closed;
public SingleConsumerUnboundedChannel()
{
items = new Queue<T>();
Writer = new SingleConsumerUnboundedChannelWriter(this);
readerSource = new SingleConsumerUnboundedChannelReader(this);
Reader = readerSource;
}
sealed class SingleConsumerUnboundedChannelWriter : ChannelWriter<T>
{
readonly SingleConsumerUnboundedChannel<T> parent;
public SingleConsumerUnboundedChannelWriter(SingleConsumerUnboundedChannel<T> parent)
{
this.parent = parent;
}
public override bool TryWrite(T item)
{
bool waiting;
lock (parent.items)
{
if (parent.closed) return false;
parent.items.Enqueue(item);
waiting = parent.readerSource.isWaiting;
}
if (waiting)
{
parent.readerSource.SingalContinuation();
}
return true;
}
public override bool TryComplete(Exception error = null)
{
bool waiting;
lock (parent.items)
{
if (parent.closed) return false;
parent.closed = true;
waiting = parent.readerSource.isWaiting;
if (parent.items.Count == 0)
{
if (error == null)
{
if (parent.completedTaskSource != null)
{
parent.completedTaskSource.TrySetResult();
}
else
{
parent.completedTask = UniTask.CompletedTask;
}
}
else
{
if (parent.completedTaskSource != null)
{
parent.completedTaskSource.TrySetException(error);
}
else
{
parent.completedTask = UniTask.FromException(error);
}
}
if (waiting)
{
parent.readerSource.SingalCompleted(error);
}
}
parent.completionError = error;
}
return true;
}
}
sealed class SingleConsumerUnboundedChannelReader : ChannelReader<T>, IUniTaskSource<bool>
{
readonly Action<object> CancellationCallbackDelegate = CancellationCallback;
readonly SingleConsumerUnboundedChannel<T> parent;
CancellationToken cancellationToken;
CancellationTokenRegistration cancellationTokenRegistration;
UniTaskCompletionSourceCore<bool> core;
internal bool isWaiting;
public SingleConsumerUnboundedChannelReader(SingleConsumerUnboundedChannel<T> parent)
{
this.parent = parent;
TaskTracker.TrackActiveTask(this, 4);
}
public override UniTask Completion
{
get
{
if (parent.completedTaskSource != null) return parent.completedTaskSource.Task;
if (parent.closed)
{
return parent.completedTask;
}
parent.completedTaskSource = new UniTaskCompletionSource();
return parent.completedTaskSource.Task;
}
}
public override bool TryRead(out T item)
{
lock (parent.items)
{
if (parent.items.Count != 0)
{
item = parent.items.Dequeue();
// complete when all value was consumed.
if (parent.closed && parent.items.Count == 0)
{
if (parent.completionError != null)
{
if (parent.completedTaskSource != null)
{
parent.completedTaskSource.TrySetException(parent.completionError);
}
else
{
parent.completedTask = UniTask.FromException(parent.completionError);
}
}
else
{
if (parent.completedTaskSource != null)
{
parent.completedTaskSource.TrySetResult();
}
else
{
parent.completedTask = UniTask.CompletedTask;
}
}
}
}
else
{
item = default;
return false;
}
}
return true;
}
public override UniTask<bool> WaitToReadAsync(CancellationToken cancellationToken)
{
if (cancellationToken.IsCancellationRequested)
{
return UniTask.FromCanceled<bool>(cancellationToken);
}
lock (parent.items)
{
if (parent.items.Count != 0)
{
return CompletedTasks.True;
}
if (parent.closed)
{
if (parent.completionError == null)
{
return CompletedTasks.False;
}
else
{
return UniTask.FromException<bool>(parent.completionError);
}
}
cancellationTokenRegistration.Dispose();
core.Reset();
isWaiting = true;
this.cancellationToken = cancellationToken;
if (this.cancellationToken.CanBeCanceled)
{
cancellationTokenRegistration = this.cancellationToken.RegisterWithoutCaptureExecutionContext(CancellationCallbackDelegate, this);
}
return new UniTask<bool>(this, core.Version);
}
}
public void SingalContinuation()
{
core.TrySetResult(true);
}
public void SingalCancellation(CancellationToken cancellationToken)
{
TaskTracker.RemoveTracking(this);
core.TrySetCanceled(cancellationToken);
}
public void SingalCompleted(Exception error)
{
if (error != null)
{
TaskTracker.RemoveTracking(this);
core.TrySetException(error);
}
else
{
TaskTracker.RemoveTracking(this);
core.TrySetResult(false);
}
}
public override IUniTaskAsyncEnumerable<T> ReadAllAsync(CancellationToken cancellationToken = default)
{
return new ReadAllAsyncEnumerable(this, cancellationToken);
}
bool IUniTaskSource<bool>.GetResult(short token)
{
return core.GetResult(token);
}
void IUniTaskSource.GetResult(short token)
{
core.GetResult(token);
}
UniTaskStatus IUniTaskSource.GetStatus(short token)
{
return core.GetStatus(token);
}
void IUniTaskSource.OnCompleted(Action<object> continuation, object state, short token)
{
core.OnCompleted(continuation, state, token);
}
UniTaskStatus IUniTaskSource.UnsafeGetStatus()
{
return core.UnsafeGetStatus();
}
static void CancellationCallback(object state)
{
var self = (SingleConsumerUnboundedChannelReader)state;
self.SingalCancellation(self.cancellationToken);
}
sealed class ReadAllAsyncEnumerable : IUniTaskAsyncEnumerable<T>, IUniTaskAsyncEnumerator<T>
{
readonly Action<object> CancellationCallback1Delegate = CancellationCallback1;
readonly Action<object> CancellationCallback2Delegate = CancellationCallback2;
readonly SingleConsumerUnboundedChannelReader parent;
CancellationToken cancellationToken1;
CancellationToken cancellationToken2;
CancellationTokenRegistration CancellationTokenRegistration1;
CancellationTokenRegistration CancellationTokenRegistration2;
T current;
bool cacheValue;
bool running;
public ReadAllAsyncEnumerable(SingleConsumerUnboundedChannelReader parent, CancellationToken cancellationToken)
{
this.parent = parent;
this.cancellationToken1 = cancellationToken;
}
public IUniTaskAsyncEnumerator<T> GetAsyncEnumerator(CancellationToken cancellationToken = default)
{
if (running)
{
throw new InvalidOperationException("Enumerator is already running, does not allow call GetAsyncEnumerator twice.");
}
if (this.cancellationToken1 != cancellationToken)
{
this.cancellationToken2 = cancellationToken;
}
if (this.cancellationToken1.CanBeCanceled)
{
this.cancellationToken1.RegisterWithoutCaptureExecutionContext(CancellationCallback1Delegate, this);
}
if (this.cancellationToken2.CanBeCanceled)
{
this.cancellationToken2.RegisterWithoutCaptureExecutionContext(CancellationCallback2Delegate, this);
}
running = true;
return this;
}
public T Current
{
get
{
if (cacheValue)
{
return current;
}
parent.TryRead(out current);
return current;
}
}
public UniTask<bool> MoveNextAsync()
{
cacheValue = false;
return parent.WaitToReadAsync(CancellationToken.None); // ok to use None, registered in ctor.
}
public UniTask DisposeAsync()
{
CancellationTokenRegistration1.Dispose();
CancellationTokenRegistration2.Dispose();
return default;
}
static void CancellationCallback1(object state)
{
var self = (ReadAllAsyncEnumerable)state;
self.parent.SingalCancellation(self.cancellationToken1);
}
static void CancellationCallback2(object state)
{
var self = (ReadAllAsyncEnumerable)state;
self.parent.SingalCancellation(self.cancellationToken2);
}
}
}
}
}

View File

@@ -0,0 +1,11 @@
fileFormatVersion: 2
guid: 5ceb3107bbdd1f14eb39091273798360
MonoImporter:
externalObjects: {}
serializedVersion: 2
defaultReferences: []
executionOrder: 0
icon: {instanceID: 0}
userData:
assetBundleName:
assetBundleVariant:

View File

@@ -28,6 +28,12 @@ namespace Cysharp.Threading.Tasks
IUniTaskOrderedAsyncEnumerable<TElement> CreateOrderedEnumerable<TKey>(Func<TElement, CancellationToken, UniTask<TKey>> keySelector, IComparer<TKey> comparer, bool descending);
}
public interface IConnectableUniTaskAsyncEnumerable<out T> : IUniTaskAsyncEnumerable<T>
{
IDisposable Connect();
}
// don't use AsyncGrouping.
//public interface IUniTaskAsyncGrouping<out TKey, out TElement> : IUniTaskAsyncEnumerable<TElement>
//{
// TKey Key { get; }

View File

@@ -65,6 +65,8 @@ namespace Cysharp.Threading.Tasks.Linq
this.element = element;
this.state = append ? State.RequireAppend : State.RequirePrepend;
this.cancellationToken = cancellationToken;
TaskTracker.TrackActiveTask(this, 3);
}
public TSource Current { get; private set; }
@@ -136,6 +138,7 @@ namespace Cysharp.Threading.Tasks.Linq
public UniTask DisposeAsync()
{
TaskTracker.RemoveTracking(this);
if (enumerator != null)
{
return enumerator.DisposeAsync();

View File

@@ -76,6 +76,7 @@ namespace Cysharp.Threading.Tasks.Linq
{
this.source = source;
this.cancellationToken = cancellationToken;
TaskTracker.TrackActiveTask(this, 4);
}
// abstract
@@ -178,6 +179,7 @@ namespace Cysharp.Threading.Tasks.Linq
// if require additional resource to dispose, override and call base.DisposeAsync.
public virtual UniTask DisposeAsync()
{
TaskTracker.RemoveTracking(this);
if (enumerator != null)
{
return enumerator.DisposeAsync();
@@ -204,6 +206,7 @@ namespace Cysharp.Threading.Tasks.Linq
{
this.source = source;
this.cancellationToken = cancellationToken;
TaskTracker.TrackActiveTask(this, 4);
}
// abstract
@@ -399,6 +402,7 @@ namespace Cysharp.Threading.Tasks.Linq
// if require additional resource to dispose, override and call base.DisposeAsync.
public virtual UniTask DisposeAsync()
{
TaskTracker.RemoveTracking(this);
if (enumerator != null)
{
return enumerator.DisposeAsync();

View File

@@ -61,6 +61,8 @@ namespace Cysharp.Threading.Tasks.Linq
this.source = source;
this.count = count;
this.cancellationToken = cancellationToken;
TaskTracker.TrackActiveTask(this, 3);
}
public IList<TSource> Current { get; private set; }
@@ -167,6 +169,7 @@ namespace Cysharp.Threading.Tasks.Linq
public UniTask DisposeAsync()
{
TaskTracker.RemoveTracking(this);
if (enumerator != null)
{
return enumerator.DisposeAsync();
@@ -217,6 +220,7 @@ namespace Cysharp.Threading.Tasks.Linq
this.count = count;
this.skip = skip;
this.cancellationToken = cancellationToken;
TaskTracker.TrackActiveTask(this, 3);
}
public IList<TSource> Current { get; private set; }
@@ -329,6 +333,7 @@ namespace Cysharp.Threading.Tasks.Linq
public UniTask DisposeAsync()
{
TaskTracker.RemoveTracking(this);
if (enumerator != null)
{
return enumerator.DisposeAsync();

View File

@@ -57,6 +57,7 @@ namespace Cysharp.Threading.Tasks.Linq
this.second = second;
this.cancellationToken = cancellationToken;
this.iteratingState = IteratingState.IteratingFirst;
TaskTracker.TrackActiveTask(this, 3);
}
public TSource Current { get; private set; }
@@ -150,6 +151,7 @@ namespace Cysharp.Threading.Tasks.Linq
public UniTask DisposeAsync()
{
TaskTracker.RemoveTracking(this);
if (enumerator != null)
{
return enumerator.DisposeAsync();

View File

@@ -63,6 +63,7 @@ namespace Cysharp.Threading.Tasks.Linq
this.cancellationToken = cancellationToken;
this.iteratingState = IteratingState.Empty;
TaskTracker.TrackActiveTask(this, 3);
}
public TSource Current { get; private set; }
@@ -128,6 +129,7 @@ namespace Cysharp.Threading.Tasks.Linq
public UniTask DisposeAsync()
{
TaskTracker.RemoveTracking(this);
if (enumerator != null)
{
return enumerator.DisposeAsync();

View File

@@ -124,6 +124,7 @@ namespace Cysharp.Threading.Tasks.Linq
this.onError = onError;
this.onCompleted = onCompleted;
this.cancellationToken = cancellationToken;
TaskTracker.TrackActiveTask(this, 3);
}
public TSource Current { get; private set; }
@@ -244,6 +245,7 @@ namespace Cysharp.Threading.Tasks.Linq
public UniTask DisposeAsync()
{
TaskTracker.RemoveTracking(this);
if (enumerator != null)
{
return enumerator.DisposeAsync();

View File

@@ -22,6 +22,22 @@ namespace Cysharp.Threading.Tasks.Linq
return Cysharp.Threading.Tasks.Linq.ForEach.ForEachAsync(source, action, cancellationToken);
}
/// <summary>Obsolete(Error), Use Use ForEachAwaitAsync instead.</summary>
[Obsolete("Use ForEachAwaitAsync instead.", true)]
[System.ComponentModel.EditorBrowsable(System.ComponentModel.EditorBrowsableState.Never)]
public static UniTask ForEachAsync<T>(this IUniTaskAsyncEnumerable<T> source, Func<T, UniTask> action, CancellationToken cancellationToken = default)
{
throw new NotSupportedException("Use ForEachAwaitAsync instead.");
}
/// <summary>Obsolete(Error), Use Use ForEachAwaitAsync instead.</summary>
[Obsolete("Use ForEachAwaitAsync instead.", true)]
[System.ComponentModel.EditorBrowsable(System.ComponentModel.EditorBrowsableState.Never)]
public static UniTask ForEachAsync<T>(this IUniTaskAsyncEnumerable<T> source, Func<T, int, UniTask> action, CancellationToken cancellationToken = default)
{
throw new NotSupportedException("Use ForEachAwaitAsync instead.");
}
public static UniTask ForEachAwaitAsync<TSource>(this IUniTaskAsyncEnumerable<TSource> source, Func<TSource, UniTask> action, CancellationToken cancellationToken = default)
{
Error.ThrowArgumentNullException(source, nameof(source));

View File

@@ -255,6 +255,7 @@ namespace Cysharp.Threading.Tasks.Linq
this.elementSelector = elementSelector;
this.comparer = comparer;
this.cancellationToken = cancellationToken;
TaskTracker.TrackActiveTask(this, 3);
}
public IGrouping<TKey, TElement> Current { get; private set; }
@@ -313,6 +314,7 @@ namespace Cysharp.Threading.Tasks.Linq
public UniTask DisposeAsync()
{
TaskTracker.RemoveTracking(this);
if (groupEnumerator != null)
{
groupEnumerator.Dispose();
@@ -364,6 +366,7 @@ namespace Cysharp.Threading.Tasks.Linq
this.resultSelector = resultSelector;
this.comparer = comparer;
this.cancellationToken = cancellationToken;
TaskTracker.TrackActiveTask(this, 3);
}
public TResult Current { get; private set; }
@@ -423,6 +426,7 @@ namespace Cysharp.Threading.Tasks.Linq
public UniTask DisposeAsync()
{
TaskTracker.RemoveTracking(this);
if (groupEnumerator != null)
{
groupEnumerator.Dispose();
@@ -470,6 +474,7 @@ namespace Cysharp.Threading.Tasks.Linq
this.elementSelector = elementSelector;
this.comparer = comparer;
this.cancellationToken = cancellationToken;
TaskTracker.TrackActiveTask(this, 3);
}
public IGrouping<TKey, TElement> Current { get; private set; }
@@ -528,6 +533,7 @@ namespace Cysharp.Threading.Tasks.Linq
public UniTask DisposeAsync()
{
TaskTracker.RemoveTracking(this);
if (groupEnumerator != null)
{
groupEnumerator.Dispose();
@@ -582,6 +588,7 @@ namespace Cysharp.Threading.Tasks.Linq
this.resultSelector = resultSelector;
this.comparer = comparer;
this.cancellationToken = cancellationToken;
TaskTracker.TrackActiveTask(this, 3);
}
public TResult Current { get; private set; }
@@ -661,6 +668,7 @@ namespace Cysharp.Threading.Tasks.Linq
public UniTask DisposeAsync()
{
TaskTracker.RemoveTracking(this);
if (groupEnumerator != null)
{
groupEnumerator.Dispose();
@@ -708,6 +716,7 @@ namespace Cysharp.Threading.Tasks.Linq
this.elementSelector = elementSelector;
this.comparer = comparer;
this.cancellationToken = cancellationToken;
TaskTracker.TrackActiveTask(this, 3);
}
public IGrouping<TKey, TElement> Current { get; private set; }
@@ -766,6 +775,7 @@ namespace Cysharp.Threading.Tasks.Linq
public UniTask DisposeAsync()
{
TaskTracker.RemoveTracking(this);
if (groupEnumerator != null)
{
groupEnumerator.Dispose();
@@ -820,6 +830,7 @@ namespace Cysharp.Threading.Tasks.Linq
this.resultSelector = resultSelector;
this.comparer = comparer;
this.cancellationToken = cancellationToken;
TaskTracker.TrackActiveTask(this, 3);
}
public TResult Current { get; private set; }
@@ -899,6 +910,7 @@ namespace Cysharp.Threading.Tasks.Linq
public UniTask DisposeAsync()
{
TaskTracker.RemoveTracking(this);
if (groupEnumerator != null)
{
groupEnumerator.Dispose();

View File

@@ -129,6 +129,7 @@ namespace Cysharp.Threading.Tasks.Linq
this.resultSelector = resultSelector;
this.comparer = comparer;
this.cancellationToken = cancellationToken;
TaskTracker.TrackActiveTask(this, 3);
}
public TResult Current { get; private set; }
@@ -208,6 +209,7 @@ namespace Cysharp.Threading.Tasks.Linq
public UniTask DisposeAsync()
{
TaskTracker.RemoveTracking(this);
if (enumerator != null)
{
return enumerator.DisposeAsync();
@@ -273,6 +275,7 @@ namespace Cysharp.Threading.Tasks.Linq
this.resultSelector = resultSelector;
this.comparer = comparer;
this.cancellationToken = cancellationToken;
TaskTracker.TrackActiveTask(this, 3);
}
public TResult Current { get; private set; }
@@ -401,6 +404,7 @@ namespace Cysharp.Threading.Tasks.Linq
public UniTask DisposeAsync()
{
TaskTracker.RemoveTracking(this);
if (enumerator != null)
{
return enumerator.DisposeAsync();
@@ -466,6 +470,7 @@ namespace Cysharp.Threading.Tasks.Linq
this.resultSelector = resultSelector;
this.comparer = comparer;
this.cancellationToken = cancellationToken;
TaskTracker.TrackActiveTask(this, 3);
}
public TResult Current { get; private set; }
@@ -594,6 +599,7 @@ namespace Cysharp.Threading.Tasks.Linq
public UniTask DisposeAsync()
{
TaskTracker.RemoveTracking(this);
if (enumerator != null)
{
return enumerator.DisposeAsync();

View File

@@ -131,6 +131,7 @@ namespace Cysharp.Threading.Tasks.Linq
this.resultSelector = resultSelector;
this.comparer = comparer;
this.cancellationToken = cancellationToken;
TaskTracker.TrackActiveTask(this, 3);
}
public TResult Current { get; private set; }
@@ -248,6 +249,7 @@ namespace Cysharp.Threading.Tasks.Linq
public UniTask DisposeAsync()
{
TaskTracker.RemoveTracking(this);
if (valueEnumerator != null)
{
valueEnumerator.Dispose();
@@ -321,6 +323,7 @@ namespace Cysharp.Threading.Tasks.Linq
this.resultSelector = resultSelector;
this.comparer = comparer;
this.cancellationToken = cancellationToken;
TaskTracker.TrackActiveTask(this, 3);
}
public TResult Current { get; private set; }
@@ -476,6 +479,7 @@ namespace Cysharp.Threading.Tasks.Linq
public UniTask DisposeAsync()
{
TaskTracker.RemoveTracking(this);
if (valueEnumerator != null)
{
valueEnumerator.Dispose();
@@ -549,6 +553,7 @@ namespace Cysharp.Threading.Tasks.Linq
this.resultSelector = resultSelector;
this.comparer = comparer;
this.cancellationToken = cancellationToken;
TaskTracker.TrackActiveTask(this, 3);
}
public TResult Current { get; private set; }
@@ -704,6 +709,7 @@ namespace Cysharp.Threading.Tasks.Linq
public UniTask DisposeAsync()
{
TaskTracker.RemoveTracking(this);
if (valueEnumerator != null)
{
valueEnumerator.Dispose();

View File

@@ -422,6 +422,7 @@ namespace Cysharp.Threading.Tasks.Linq
{
this.parent = parent;
this.cancellationToken = cancellationToken;
TaskTracker.TrackActiveTask(this, 3);
}
public TElement Current { get; private set; }
@@ -477,6 +478,7 @@ namespace Cysharp.Threading.Tasks.Linq
public UniTask DisposeAsync()
{
TaskTracker.RemoveTracking(this);
return default;
}
}

View File

@@ -0,0 +1,171 @@
using Cysharp.Threading.Tasks.Internal;
using System;
using System.Threading;
namespace Cysharp.Threading.Tasks.Linq
{
public static partial class UniTaskAsyncEnumerable
{
public static IConnectableUniTaskAsyncEnumerable<TSource> Publish<TSource>(this IUniTaskAsyncEnumerable<TSource> source)
{
Error.ThrowArgumentNullException(source, nameof(source));
return new Publish<TSource>(source);
}
}
internal sealed class Publish<TSource> : IConnectableUniTaskAsyncEnumerable<TSource>
{
readonly IUniTaskAsyncEnumerable<TSource> source;
readonly CancellationTokenSource cancellationTokenSource;
TriggerEvent<TSource> trigger;
IUniTaskAsyncEnumerator<TSource> enumerator;
IDisposable connectedDisposable;
bool isCompleted;
public Publish(IUniTaskAsyncEnumerable<TSource> source)
{
this.source = source;
this.cancellationTokenSource = new CancellationTokenSource();
}
public IDisposable Connect()
{
if (connectedDisposable != null) return connectedDisposable;
if (enumerator == null)
{
enumerator = source.GetAsyncEnumerator(cancellationTokenSource.Token);
}
ConsumeEnumerator().Forget();
connectedDisposable = new ConnectDisposable(cancellationTokenSource);
return connectedDisposable;
}
async UniTaskVoid ConsumeEnumerator()
{
try
{
try
{
while (await enumerator.MoveNextAsync())
{
trigger.SetResult(enumerator.Current);
}
trigger.SetCompleted();
}
catch (Exception ex)
{
trigger.SetError(ex);
}
}
finally
{
isCompleted = true;
await enumerator.DisposeAsync();
}
}
public IUniTaskAsyncEnumerator<TSource> GetAsyncEnumerator(CancellationToken cancellationToken = default)
{
return new _Publish(this, cancellationToken);
}
sealed class ConnectDisposable : IDisposable
{
readonly CancellationTokenSource cancellationTokenSource;
public ConnectDisposable(CancellationTokenSource cancellationTokenSource)
{
this.cancellationTokenSource = cancellationTokenSource;
}
public void Dispose()
{
this.cancellationTokenSource.Cancel();
}
}
sealed class _Publish : MoveNextSource, IUniTaskAsyncEnumerator<TSource>, ITriggerHandler<TSource>
{
static readonly Action<object> CancelDelegate = OnCanceled;
readonly Publish<TSource> parent;
CancellationToken cancellationToken;
CancellationTokenRegistration cancellationTokenRegistration;
bool isDisposed;
public _Publish(Publish<TSource> parent, CancellationToken cancellationToken)
{
if (cancellationToken.IsCancellationRequested) return;
this.parent = parent;
this.cancellationToken = cancellationToken;
if (cancellationToken.CanBeCanceled)
{
this.cancellationTokenRegistration = cancellationToken.RegisterWithoutCaptureExecutionContext(CancelDelegate, this);
}
parent.trigger.Add(this);
TaskTracker.TrackActiveTask(this, 3);
}
public TSource Current { get; private set; }
public UniTask<bool> MoveNextAsync()
{
cancellationToken.ThrowIfCancellationRequested();
if (parent.isCompleted) return CompletedTasks.False;
completionSource.Reset();
return new UniTask<bool>(this, completionSource.Version);
}
static void OnCanceled(object state)
{
var self = (_Publish)state;
self.completionSource.TrySetCanceled(self.cancellationToken);
self.DisposeAsync().Forget();
}
public UniTask DisposeAsync()
{
if (!isDisposed)
{
isDisposed = true;
TaskTracker.RemoveTracking(this);
cancellationTokenRegistration.Dispose();
parent.trigger.Remove(this);
}
return default;
}
public void OnNext(TSource value)
{
Current = value;
completionSource.TrySetResult(true);
}
public void OnCanceled(CancellationToken cancellationToken)
{
completionSource.TrySetCanceled(cancellationToken);
}
public void OnCompleted()
{
completionSource.TrySetResult(false);
}
public void OnError(Exception ex)
{
completionSource.TrySetException(ex);
}
}
}
}

View File

@@ -0,0 +1,11 @@
fileFormatVersion: 2
guid: 93c684d1e88c09d4e89b79437d97b810
MonoImporter:
externalObjects: {}
serializedVersion: 2
defaultReferences: []
executionOrder: 0
icon: {instanceID: 0}
userData:
assetBundleName:
assetBundleVariant:

View File

@@ -0,0 +1,103 @@
using System;
using System.Threading;
namespace Cysharp.Threading.Tasks.Linq
{
public static partial class UniTaskAsyncEnumerable
{
public static IUniTaskAsyncEnumerable<TSource> Queue<TSource>(this IUniTaskAsyncEnumerable<TSource> source)
{
return new QueueOperator<TSource>(source);
}
}
internal sealed class QueueOperator<TSource> : IUniTaskAsyncEnumerable<TSource>
{
readonly IUniTaskAsyncEnumerable<TSource> source;
public QueueOperator(IUniTaskAsyncEnumerable<TSource> source)
{
this.source = source;
}
public IUniTaskAsyncEnumerator<TSource> GetAsyncEnumerator(CancellationToken cancellationToken = default)
{
return new _Queue(source, cancellationToken);
}
sealed class _Queue : IUniTaskAsyncEnumerator<TSource>
{
readonly IUniTaskAsyncEnumerable<TSource> source;
CancellationToken cancellationToken;
Channel<TSource> channel;
IUniTaskAsyncEnumerator<TSource> channelEnumerator;
IUniTaskAsyncEnumerator<TSource> sourceEnumerator;
bool channelClosed;
public _Queue(IUniTaskAsyncEnumerable<TSource> source, CancellationToken cancellationToken)
{
this.source = source;
this.cancellationToken = cancellationToken;
}
public TSource Current => channelEnumerator.Current;
public UniTask<bool> MoveNextAsync()
{
cancellationToken.ThrowIfCancellationRequested();
if (sourceEnumerator == null)
{
sourceEnumerator = source.GetAsyncEnumerator(cancellationToken);
channel = Channel.CreateSingleConsumerUnbounded<TSource>();
channelEnumerator = channel.Reader.ReadAllAsync().GetAsyncEnumerator(cancellationToken);
ConsumeAll(this, sourceEnumerator, channel).Forget();
}
return channelEnumerator.MoveNextAsync();
}
static async UniTaskVoid ConsumeAll(_Queue self, IUniTaskAsyncEnumerator<TSource> enumerator, ChannelWriter<TSource> writer)
{
try
{
while (await enumerator.MoveNextAsync())
{
writer.TryWrite(enumerator.Current);
}
writer.TryComplete();
}
catch (Exception ex)
{
writer.TryComplete(ex);
}
finally
{
self.channelClosed = true;
await enumerator.DisposeAsync();
}
}
public async UniTask DisposeAsync()
{
if (sourceEnumerator != null)
{
await sourceEnumerator.DisposeAsync();
}
if (channelEnumerator != null)
{
await channelEnumerator.DisposeAsync();
}
if (!channelClosed)
{
channelClosed = true;
channel.Writer.TryComplete(new OperationCanceledException());
}
}
}
}
}

View File

@@ -0,0 +1,11 @@
fileFormatVersion: 2
guid: b7ea1bcf9dbebb042bc99c7816249e02
MonoImporter:
externalObjects: {}
serializedVersion: 2
defaultReferences: []
executionOrder: 0
icon: {instanceID: 0}
userData:
assetBundleName:
assetBundleVariant:

View File

@@ -40,6 +40,7 @@ namespace Cysharp.Threading.Tasks.Linq
{
this.source = source;
this.cancellationToken = cancellationToken;
TaskTracker.TrackActiveTask(this, 3);
}
public TSource Current { get; private set; }
@@ -69,6 +70,7 @@ namespace Cysharp.Threading.Tasks.Linq
public UniTask DisposeAsync()
{
TaskTracker.RemoveTracking(this);
return default;
}
}

View File

@@ -160,6 +160,7 @@ namespace Cysharp.Threading.Tasks.Linq
this.selector2 = selector2;
this.resultSelector = resultSelector;
this.cancellationToken = cancellationToken;
TaskTracker.TrackActiveTask(this, 3);
}
public TResult Current { get; private set; }
@@ -324,6 +325,7 @@ namespace Cysharp.Threading.Tasks.Linq
public async UniTask DisposeAsync()
{
TaskTracker.RemoveTracking(this);
if (selectedEnumerator != null)
{
await selectedEnumerator.DisposeAsync();
@@ -398,6 +400,7 @@ namespace Cysharp.Threading.Tasks.Linq
this.selector2 = selector2;
this.resultSelector = resultSelector;
this.cancellationToken = cancellationToken;
TaskTracker.TrackActiveTask(this, 3);
}
public TResult Current { get; private set; }
@@ -598,6 +601,7 @@ namespace Cysharp.Threading.Tasks.Linq
public async UniTask DisposeAsync()
{
TaskTracker.RemoveTracking(this);
if (selectedEnumerator != null)
{
await selectedEnumerator.DisposeAsync();
@@ -672,6 +676,7 @@ namespace Cysharp.Threading.Tasks.Linq
this.selector2 = selector2;
this.resultSelector = resultSelector;
this.cancellationToken = cancellationToken;
TaskTracker.TrackActiveTask(this, 3);
}
public TResult Current { get; private set; }
@@ -872,6 +877,7 @@ namespace Cysharp.Threading.Tasks.Linq
public async UniTask DisposeAsync()
{
TaskTracker.RemoveTracking(this);
if (selectedEnumerator != null)
{
await selectedEnumerator.DisposeAsync();

View File

@@ -56,6 +56,7 @@ namespace Cysharp.Threading.Tasks.Linq
this.source = source;
this.count = count;
this.cancellationToken = cancellationToken;
TaskTracker.TrackActiveTask(this, 3);
}
public TSource Current { get; private set; }
@@ -146,6 +147,7 @@ namespace Cysharp.Threading.Tasks.Linq
public UniTask DisposeAsync()
{
TaskTracker.RemoveTracking(this);
if (enumerator != null)
{
return enumerator.DisposeAsync();

View File

@@ -0,0 +1,144 @@
using Cysharp.Threading.Tasks.Internal;
using System;
using System.Threading;
namespace Cysharp.Threading.Tasks.Linq
{
public static partial class UniTaskAsyncEnumerable
{
public static IUniTaskAsyncEnumerable<TSource> SkipUntilCanceled<TSource>(this IUniTaskAsyncEnumerable<TSource> source, CancellationToken cancellationToken)
{
Error.ThrowArgumentNullException(source, nameof(source));
return new SkipUntilCanceled<TSource>(source, cancellationToken);
}
}
internal sealed class SkipUntilCanceled<TSource> : IUniTaskAsyncEnumerable<TSource>
{
readonly IUniTaskAsyncEnumerable<TSource> source;
readonly CancellationToken cancellationToken;
public SkipUntilCanceled(IUniTaskAsyncEnumerable<TSource> source, CancellationToken cancellationToken)
{
this.source = source;
this.cancellationToken = cancellationToken;
}
public IUniTaskAsyncEnumerator<TSource> GetAsyncEnumerator(CancellationToken cancellationToken = default)
{
return new _SkipUntilCanceled(source, this.cancellationToken, cancellationToken);
}
sealed class _SkipUntilCanceled : MoveNextSource, IUniTaskAsyncEnumerator<TSource>
{
static readonly Action<object> MoveNextCoreDelegate = MoveNextCore;
readonly IUniTaskAsyncEnumerable<TSource> source;
CancellationToken cancellationToken1;
CancellationToken cancellationToken2;
bool isCanceled;
IUniTaskAsyncEnumerator<TSource> enumerator;
UniTask<bool>.Awaiter awaiter;
bool continueNext;
public _SkipUntilCanceled(IUniTaskAsyncEnumerable<TSource> source, CancellationToken cancellationToken1, CancellationToken cancellationToken2)
{
this.source = source;
this.cancellationToken1 = cancellationToken1;
this.cancellationToken2 = cancellationToken2;
TaskTracker.TrackActiveTask(this, 3);
}
public TSource Current { get; private set; }
public UniTask<bool> MoveNextAsync()
{
if (cancellationToken1.IsCancellationRequested) isCanceled = true;
if (cancellationToken2.IsCancellationRequested) isCanceled = true;
if (enumerator == null)
{
enumerator = source.GetAsyncEnumerator(cancellationToken2); // use only AsyncEnumerator provided token.
}
completionSource.Reset();
SourceMoveNext();
return new UniTask<bool>(this, completionSource.Version);
}
void SourceMoveNext()
{
try
{
LOOP:
awaiter = enumerator.MoveNextAsync().GetAwaiter();
if (awaiter.IsCompleted)
{
continueNext = true;
MoveNextCore(this);
if (continueNext)
{
continueNext = false;
goto LOOP;
}
}
else
{
awaiter.SourceOnCompleted(MoveNextCoreDelegate, this);
}
}
catch (Exception ex)
{
completionSource.TrySetException(ex);
}
}
static void MoveNextCore(object state)
{
var self = (_SkipUntilCanceled)state;
if (self.TryGetResult(self.awaiter, out var result))
{
if (result)
{
AGAIN:
if (self.isCanceled)
{
self.continueNext = false;
self.Current = self.enumerator.Current;
self.completionSource.TrySetResult(true);
}
else
{
if (self.cancellationToken1.IsCancellationRequested) self.isCanceled = true;
if (self.cancellationToken2.IsCancellationRequested) self.isCanceled = true;
if (self.isCanceled) goto AGAIN;
if (!self.continueNext)
{
self.SourceMoveNext();
}
}
}
else
{
self.completionSource.TrySetResult(false);
}
}
}
public UniTask DisposeAsync()
{
TaskTracker.RemoveTracking(this);
if (enumerator != null)
{
return enumerator.DisposeAsync();
}
return default;
}
}
}
}

View File

@@ -0,0 +1,11 @@
fileFormatVersion: 2
guid: 4b1a778aef7150d47b93a49aa1bc34ae
MonoImporter:
externalObjects: {}
serializedVersion: 2
defaultReferences: []
executionOrder: 0
icon: {instanceID: 0}
userData:
assetBundleName:
assetBundleVariant:

View File

@@ -30,40 +30,95 @@ namespace Cysharp.Threading.Tasks.Linq
return new _Take(source, count, cancellationToken);
}
sealed class _Take : AsyncEnumeratorBase<TSource, TSource>
sealed class _Take : MoveNextSource, IUniTaskAsyncEnumerator<TSource>
{
readonly int count;
static readonly Action<object> MoveNextCoreDelegate = MoveNextCore;
readonly IUniTaskAsyncEnumerable<TSource> source;
readonly int count;
CancellationToken cancellationToken;
IUniTaskAsyncEnumerator<TSource> enumerator;
UniTask<bool>.Awaiter awaiter;
int index;
public _Take(IUniTaskAsyncEnumerable<TSource> source, int count, CancellationToken cancellationToken)
: base(source, cancellationToken)
{
this.source = source;
this.count = count;
this.cancellationToken = cancellationToken;
TaskTracker.TrackActiveTask(this, 3);
}
protected override bool TryMoveNextCore(bool sourceHasCurrent, out bool result)
public TSource Current { get; private set; }
public UniTask<bool> MoveNextAsync()
{
if (sourceHasCurrent)
cancellationToken.ThrowIfCancellationRequested();
if (enumerator == null)
{
if (checked(index++) < count)
enumerator = source.GetAsyncEnumerator(cancellationToken);
}
if (checked(index) >= count)
{
return CompletedTasks.False;
}
completionSource.Reset();
SourceMoveNext();
return new UniTask<bool>(this, completionSource.Version);
}
void SourceMoveNext()
{
try
{
awaiter = enumerator.MoveNextAsync().GetAwaiter();
if (awaiter.IsCompleted)
{
Current = SourceCurrent;
result = true;
return true;
MoveNextCore(this);
}
else
{
result = false;
return true;
awaiter.SourceOnCompleted(MoveNextCoreDelegate, this);
}
}
else
catch (Exception ex)
{
result = false;
return true;
completionSource.TrySetException(ex);
}
}
static void MoveNextCore(object state)
{
var self = (_Take)state;
if (self.TryGetResult(self.awaiter, out var result))
{
if (result)
{
self.index++;
self.Current = self.enumerator.Current;
self.completionSource.TrySetResult(true);
}
else
{
self.completionSource.TrySetResult(false);
}
}
}
public UniTask DisposeAsync()
{
TaskTracker.RemoveTracking(this);
if (enumerator != null)
{
return enumerator.DisposeAsync();
}
return default;
}
}
}
}

View File

@@ -57,6 +57,7 @@ namespace Cysharp.Threading.Tasks.Linq
this.source = source;
this.count = count;
this.cancellationToken = cancellationToken;
TaskTracker.TrackActiveTask(this, 3);
}
public TSource Current { get; private set; }
@@ -162,6 +163,7 @@ namespace Cysharp.Threading.Tasks.Linq
public UniTask DisposeAsync()
{
TaskTracker.RemoveTracking(this);
if (enumerator != null)
{
return enumerator.DisposeAsync();

View File

@@ -0,0 +1,164 @@
using Cysharp.Threading.Tasks.Internal;
using System;
using System.Threading;
namespace Cysharp.Threading.Tasks.Linq
{
public static partial class UniTaskAsyncEnumerable
{
public static IUniTaskAsyncEnumerable<TSource> TakeUntilCanceled<TSource>(this IUniTaskAsyncEnumerable<TSource> source, CancellationToken cancellationToken)
{
Error.ThrowArgumentNullException(source, nameof(source));
return new TakeUntilCanceled<TSource>(source, cancellationToken);
}
}
internal sealed class TakeUntilCanceled<TSource> : IUniTaskAsyncEnumerable<TSource>
{
readonly IUniTaskAsyncEnumerable<TSource> source;
readonly CancellationToken cancellationToken;
public TakeUntilCanceled(IUniTaskAsyncEnumerable<TSource> source, CancellationToken cancellationToken)
{
this.source = source;
this.cancellationToken = cancellationToken;
}
public IUniTaskAsyncEnumerator<TSource> GetAsyncEnumerator(CancellationToken cancellationToken = default)
{
return new _TakeUntilCanceled(source, this.cancellationToken, cancellationToken);
}
sealed class _TakeUntilCanceled : MoveNextSource, IUniTaskAsyncEnumerator<TSource>
{
static readonly Action<object> CancelDelegate1 = OnCanceled1;
static readonly Action<object> CancelDelegate2 = OnCanceled2;
static readonly Action<object> MoveNextCoreDelegate = MoveNextCore;
readonly IUniTaskAsyncEnumerable<TSource> source;
CancellationToken cancellationToken1;
CancellationToken cancellationToken2;
CancellationTokenRegistration cancellationTokenRegistration1;
CancellationTokenRegistration cancellationTokenRegistration2;
bool isCanceled;
IUniTaskAsyncEnumerator<TSource> enumerator;
UniTask<bool>.Awaiter awaiter;
public _TakeUntilCanceled(IUniTaskAsyncEnumerable<TSource> source, CancellationToken cancellationToken1, CancellationToken cancellationToken2)
{
this.source = source;
this.cancellationToken1 = cancellationToken1;
this.cancellationToken2 = cancellationToken2;
if (cancellationToken1.CanBeCanceled)
{
this.cancellationTokenRegistration1 = cancellationToken1.RegisterWithoutCaptureExecutionContext(CancelDelegate1, this);
}
if (cancellationToken1 != cancellationToken2 && cancellationToken2.CanBeCanceled)
{
this.cancellationTokenRegistration2 = cancellationToken2.RegisterWithoutCaptureExecutionContext(CancelDelegate2, this);
}
TaskTracker.TrackActiveTask(this, 3);
}
public TSource Current { get; private set; }
public UniTask<bool> MoveNextAsync()
{
if (cancellationToken1.IsCancellationRequested) isCanceled = true;
if (cancellationToken2.IsCancellationRequested) isCanceled = true;
if (enumerator == null)
{
enumerator = source.GetAsyncEnumerator(cancellationToken2); // use only AsyncEnumerator provided token.
}
if (isCanceled) return CompletedTasks.False;
completionSource.Reset();
SourceMoveNext();
return new UniTask<bool>(this, completionSource.Version);
}
void SourceMoveNext()
{
try
{
awaiter = enumerator.MoveNextAsync().GetAwaiter();
if (awaiter.IsCompleted)
{
MoveNextCore(this);
}
else
{
awaiter.SourceOnCompleted(MoveNextCoreDelegate, this);
}
}
catch (Exception ex)
{
completionSource.TrySetException(ex);
}
}
static void MoveNextCore(object state)
{
var self = (_TakeUntilCanceled)state;
if (self.TryGetResult(self.awaiter, out var result))
{
if (result)
{
if (self.isCanceled)
{
self.completionSource.TrySetResult(false);
}
else
{
self.Current = self.enumerator.Current;
self.completionSource.TrySetResult(true);
}
}
else
{
self.completionSource.TrySetResult(false);
}
}
}
static void OnCanceled1(object state)
{
var self = (_TakeUntilCanceled)state;
if (!self.isCanceled)
{
self.cancellationTokenRegistration2.Dispose();
self.completionSource.TrySetResult(false);
}
}
static void OnCanceled2(object state)
{
var self = (_TakeUntilCanceled)state;
if (!self.isCanceled)
{
self.cancellationTokenRegistration1.Dispose();
self.completionSource.TrySetResult(false);
}
}
public UniTask DisposeAsync()
{
TaskTracker.RemoveTracking(this);
cancellationTokenRegistration1.Dispose();
cancellationTokenRegistration2.Dispose();
if (enumerator != null)
{
return enumerator.DisposeAsync();
}
return default;
}
}
}
}

View File

@@ -0,0 +1,11 @@
fileFormatVersion: 2
guid: e82f498cf3a1df04cbf646773fc11319
MonoImporter:
externalObjects: {}
serializedVersion: 2
defaultReferences: []
executionOrder: 0
icon: {instanceID: 0}
userData:
assetBundleName:
assetBundleVariant:

View File

@@ -1,7 +1,4 @@
using Cysharp.Threading.Tasks.Internal;
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading;
namespace Cysharp.Threading.Tasks.Linq
{

View File

@@ -0,0 +1,241 @@
using Cysharp.Threading.Tasks.Internal;
using System;
using System.Collections.Generic;
using System.Threading;
namespace Cysharp.Threading.Tasks.Linq
{
public static partial class UniTaskAsyncEnumerable
{
public static IUniTaskAsyncEnumerable<TProperty> EveryValueChanged<TTarget, TProperty>(TTarget target, Func<TTarget, TProperty> propertySelector, PlayerLoopTiming monitorTiming = PlayerLoopTiming.Update, IEqualityComparer<TProperty> equalityComparer = null)
where TTarget : class
{
var unityObject = target as UnityEngine.Object;
var isUnityObject = target is UnityEngine.Object; // don't use (unityObject == null)
if (isUnityObject)
{
return new EveryValueChangedUnityObject<TTarget, TProperty>(target, propertySelector, equalityComparer ?? UnityEqualityComparer.GetDefault<TProperty>(), monitorTiming);
}
else
{
return new EveryValueChangedStandardObject<TTarget, TProperty>(target, propertySelector, equalityComparer ?? UnityEqualityComparer.GetDefault<TProperty>(), monitorTiming);
}
}
}
internal sealed class EveryValueChangedUnityObject<TTarget, TProperty> : IUniTaskAsyncEnumerable<TProperty>
{
readonly TTarget target;
readonly Func<TTarget, TProperty> propertySelector;
readonly IEqualityComparer<TProperty> equalityComparer;
readonly PlayerLoopTiming monitorTiming;
public EveryValueChangedUnityObject(TTarget target, Func<TTarget, TProperty> propertySelector, IEqualityComparer<TProperty> equalityComparer, PlayerLoopTiming monitorTiming)
{
this.target = target;
this.propertySelector = propertySelector;
this.equalityComparer = equalityComparer;
this.monitorTiming = monitorTiming;
}
public IUniTaskAsyncEnumerator<TProperty> GetAsyncEnumerator(CancellationToken cancellationToken = default)
{
return new _EveryValueChanged(target, propertySelector, equalityComparer, monitorTiming, cancellationToken);
}
sealed class _EveryValueChanged : MoveNextSource, IUniTaskAsyncEnumerator<TProperty>, IPlayerLoopItem
{
readonly TTarget target;
readonly UnityEngine.Object targetAsUnityObject;
readonly IEqualityComparer<TProperty> equalityComparer;
readonly Func<TTarget, TProperty> propertySelector;
CancellationToken cancellationToken;
bool first;
TProperty currentValue;
bool disposed;
public _EveryValueChanged(TTarget target, Func<TTarget, TProperty> propertySelector, IEqualityComparer<TProperty> equalityComparer, PlayerLoopTiming monitorTiming, CancellationToken cancellationToken)
{
this.target = target;
this.targetAsUnityObject = target as UnityEngine.Object;
this.propertySelector = propertySelector;
this.equalityComparer = equalityComparer;
this.cancellationToken = cancellationToken;
this.first = true;
TaskTracker.TrackActiveTask(this, 2);
PlayerLoopHelper.AddAction(monitorTiming, this);
}
public TProperty Current => currentValue;
public UniTask<bool> MoveNextAsync()
{
// return false instead of throw
if (disposed || cancellationToken.IsCancellationRequested) return CompletedTasks.False;
if (first)
{
first = false;
if (targetAsUnityObject == null)
{
return CompletedTasks.False;
}
this.currentValue = propertySelector(target);
return CompletedTasks.True;
}
completionSource.Reset();
return new UniTask<bool>(this, completionSource.Version);
}
public UniTask DisposeAsync()
{
if (!disposed)
{
disposed = true;
TaskTracker.RemoveTracking(this);
}
return default;
}
public bool MoveNext()
{
if (disposed || cancellationToken.IsCancellationRequested || targetAsUnityObject == null) // destroyed = cancel.
{
completionSource.TrySetResult(false);
DisposeAsync().Forget();
return false;
}
TProperty nextValue = default(TProperty);
try
{
nextValue = propertySelector(target);
if (equalityComparer.Equals(currentValue, nextValue))
{
return true;
}
}
catch (Exception ex)
{
completionSource.TrySetException(ex);
DisposeAsync().Forget();
return false;
}
currentValue = nextValue;
completionSource.TrySetResult(true);
return true;
}
}
}
internal sealed class EveryValueChangedStandardObject<TTarget, TProperty> : IUniTaskAsyncEnumerable<TProperty>
where TTarget : class
{
readonly WeakReference<TTarget> target;
readonly Func<TTarget, TProperty> propertySelector;
readonly IEqualityComparer<TProperty> equalityComparer;
readonly PlayerLoopTiming monitorTiming;
public EveryValueChangedStandardObject(TTarget target, Func<TTarget, TProperty> propertySelector, IEqualityComparer<TProperty> equalityComparer, PlayerLoopTiming monitorTiming)
{
this.target = new WeakReference<TTarget>(target, false);
this.propertySelector = propertySelector;
this.equalityComparer = equalityComparer;
this.monitorTiming = monitorTiming;
}
public IUniTaskAsyncEnumerator<TProperty> GetAsyncEnumerator(CancellationToken cancellationToken = default)
{
return new _EveryValueChanged(target, propertySelector, equalityComparer, monitorTiming, cancellationToken);
}
sealed class _EveryValueChanged : MoveNextSource, IUniTaskAsyncEnumerator<TProperty>, IPlayerLoopItem
{
readonly WeakReference<TTarget> target;
readonly IEqualityComparer<TProperty> equalityComparer;
readonly Func<TTarget, TProperty> propertySelector;
CancellationToken cancellationToken;
bool first;
TProperty currentValue;
bool disposed;
public _EveryValueChanged(WeakReference<TTarget> target, Func<TTarget, TProperty> propertySelector, IEqualityComparer<TProperty> equalityComparer, PlayerLoopTiming monitorTiming, CancellationToken cancellationToken)
{
this.target = target;
this.propertySelector = propertySelector;
this.equalityComparer = equalityComparer;
this.cancellationToken = cancellationToken;
this.first = true;
TaskTracker.TrackActiveTask(this, 2);
PlayerLoopHelper.AddAction(monitorTiming, this);
}
public TProperty Current => currentValue;
public UniTask<bool> MoveNextAsync()
{
if (disposed || cancellationToken.IsCancellationRequested) return CompletedTasks.False;
if (first)
{
first = false;
if (!target.TryGetTarget(out var t))
{
return CompletedTasks.False;
}
this.currentValue = propertySelector(t);
return CompletedTasks.True;
}
completionSource.Reset();
return new UniTask<bool>(this, completionSource.Version);
}
public UniTask DisposeAsync()
{
if (!disposed)
{
disposed = true;
TaskTracker.RemoveTracking(this);
}
return default;
}
public bool MoveNext()
{
UnityEngine.Debug.Log("TRY_RESULT:" + target.TryGetTarget(out var _));
if (disposed || cancellationToken.IsCancellationRequested || !target.TryGetTarget(out var t))
{
completionSource.TrySetResult(false);
DisposeAsync().Forget();
return false;
}
TProperty nextValue = default(TProperty);
try
{
nextValue = propertySelector(t);
if (equalityComparer.Equals(currentValue, nextValue))
{
return true;
}
}
catch (Exception ex)
{
completionSource.TrySetException(ex);
DisposeAsync().Forget();
return false;
}
currentValue = nextValue;
completionSource.TrySetResult(true);
return true;
}
}
}
}

View File

@@ -0,0 +1,11 @@
fileFormatVersion: 2
guid: 1ec39f1c41c305344854782c935ad354
MonoImporter:
externalObjects: {}
serializedVersion: 2
defaultReferences: []
executionOrder: 0
icon: {instanceID: 0}
userData:
assetBundleName:
assetBundleVariant:

View File

@@ -66,6 +66,7 @@ namespace Cysharp.Threading.Tasks.Linq
float elapsed;
bool dueTimePhase;
bool completed;
bool disposed;
public _Timer(TimeSpan dueTime, TimeSpan? period, PlayerLoopTiming updateTiming, bool ignoreTimeScale, CancellationToken cancellationToken)
@@ -91,7 +92,7 @@ namespace Cysharp.Threading.Tasks.Linq
public UniTask<bool> MoveNextAsync()
{
// return false instead of throw
if (disposed || cancellationToken.IsCancellationRequested) return CompletedTasks.False;
if (disposed || cancellationToken.IsCancellationRequested || completed) return CompletedTasks.False;
// reset value here.
this.elapsed = 0;
@@ -131,6 +132,7 @@ namespace Cysharp.Threading.Tasks.Linq
{
if (period == null)
{
completed = true;
completionSource.TrySetResult(false);
return false;
}
@@ -172,6 +174,7 @@ namespace Cysharp.Threading.Tasks.Linq
int currentFrame;
bool dueTimePhase;
bool completed;
bool disposed;
public _TimerFrame(int dueTimeFrameCount, int? periodFrameCount, PlayerLoopTiming updateTiming, CancellationToken cancellationToken)
@@ -195,7 +198,7 @@ namespace Cysharp.Threading.Tasks.Linq
public UniTask<bool> MoveNextAsync()
{
// return false instead of throw
if (disposed || cancellationToken.IsCancellationRequested) return CompletedTasks.False;
if (disposed || cancellationToken.IsCancellationRequested || completed) return CompletedTasks.False;
// reset value here.
@@ -235,6 +238,7 @@ namespace Cysharp.Threading.Tasks.Linq
{
if (periodFrameCount == null)
{
completed = true;
completionSource.TrySetResult(false);
return false;
}

View File

@@ -84,6 +84,7 @@ namespace Cysharp.Threading.Tasks.Linq
this.second = second;
this.resultSelector = resultSelector;
this.cancellationToken = cancellationToken;
TaskTracker.TrackActiveTask(this, 3);
}
public TResult Current { get; private set; }
@@ -181,6 +182,7 @@ namespace Cysharp.Threading.Tasks.Linq
public async UniTask DisposeAsync()
{
TaskTracker.RemoveTracking(this);
if (firstEnumerator != null)
{
await firstEnumerator.DisposeAsync();
@@ -236,6 +238,7 @@ namespace Cysharp.Threading.Tasks.Linq
this.second = second;
this.resultSelector = resultSelector;
this.cancellationToken = cancellationToken;
TaskTracker.TrackActiveTask(this, 3);
}
public TResult Current { get; private set; }
@@ -351,6 +354,7 @@ namespace Cysharp.Threading.Tasks.Linq
public async UniTask DisposeAsync()
{
TaskTracker.RemoveTracking(this);
if (firstEnumerator != null)
{
await firstEnumerator.DisposeAsync();
@@ -406,6 +410,7 @@ namespace Cysharp.Threading.Tasks.Linq
this.second = second;
this.resultSelector = resultSelector;
this.cancellationToken = cancellationToken;
TaskTracker.TrackActiveTask(this, 3);
}
public TResult Current { get; private set; }
@@ -521,6 +526,7 @@ namespace Cysharp.Threading.Tasks.Linq
public async UniTask DisposeAsync()
{
TaskTracker.RemoveTracking(this);
if (firstEnumerator != null)
{
await firstEnumerator.DisposeAsync();

View File

@@ -1,7 +1,4 @@
#if NET_4_6 || NET_STANDARD_2_0 || CSHARP_7_OR_LATER
#pragma warning disable CS1591 // Missing XML comment for publicly visible type or member
using System;
using System;
using System.Collections.Generic;
using Cysharp.Threading.Tasks.Internal;
@@ -21,7 +18,11 @@ namespace Cysharp.Threading.Tasks
public static IProgress<T> CreateOnlyValueChanged<T>(Action<T> handler, IEqualityComparer<T> comparer = null)
{
if (handler == null) return NullProgress<T>.Instance;
#if UNITY_2018_3_OR_NEWER
return new OnlyValueChangedProgress<T>(handler, comparer ?? UnityEqualityComparer.GetDefault<T>());
#else
return new OnlyValueChangedProgress<T>(handler, comparer ?? EqualityComparer<T>.Default);
#endif
}
sealed class NullProgress<T> : IProgress<T>
@@ -83,6 +84,4 @@ namespace Cysharp.Threading.Tasks
}
}
}
}
#endif
}

View File

@@ -4,26 +4,26 @@ using System.Threading;
namespace Cysharp.Threading.Tasks
{
public interface ITriggerEvent<T>
public interface ITriggerHandler<T>
{
void SetResult(T value);
void SetCanceled(CancellationToken cancellationToken);
void Add(IResolveCancelPromise<T> handler);
void Remove(IResolveCancelPromise<T> handler);
void OnNext(T value);
void OnError(Exception ex);
void OnCompleted();
void OnCanceled(CancellationToken cancellationToken);
}
// be careful to use, itself is struct.
public struct TriggerEvent<T> : ITriggerEvent<T>
public struct TriggerEvent<T>
{
// optimize: many cases, handler is single.
IResolveCancelPromise<T> singleHandler;
ITriggerHandler<T> singleHandler;
IResolveCancelPromise<T>[] handlers;
ITriggerHandler<T>[] handlers;
// when running(in TrySetResult), does not add immediately.
// when running(in TrySetResult), does not add immediately(trampoline).
bool isRunning;
IResolveCancelPromise<T> waitHandler;
MinimumQueue<IResolveCancelPromise<T>> waitQueue;
ITriggerHandler<T> waitHandler;
MinimumQueue<ITriggerHandler<T>> waitQueue;
public void SetResult(T value)
{
@@ -33,7 +33,7 @@ namespace Cysharp.Threading.Tasks
{
try
{
singleHandler.TrySetResult(value);
singleHandler.OnNext(value);
}
catch (Exception ex)
{
@@ -53,7 +53,7 @@ namespace Cysharp.Threading.Tasks
{
try
{
handlers[i].TrySetResult(value);
handlers[i].OnNext(value);
}
catch (Exception ex)
{
@@ -94,7 +94,7 @@ namespace Cysharp.Threading.Tasks
{
try
{
((ICancelPromise)singleHandler).TrySetCanceled(cancellationToken);
(singleHandler).OnCanceled(cancellationToken);
}
catch (Exception ex)
{
@@ -114,7 +114,7 @@ namespace Cysharp.Threading.Tasks
{
try
{
((ICancelPromise)handlers[i]).TrySetCanceled(cancellationToken);
(handlers[i]).OnCanceled(cancellationToken);
}
catch (Exception ex)
{
@@ -147,7 +147,129 @@ namespace Cysharp.Threading.Tasks
}
}
public void Add(IResolveCancelPromise<T> handler)
public void SetCompleted()
{
isRunning = true;
if (singleHandler != null)
{
try
{
(singleHandler).OnCompleted();
}
catch (Exception ex)
{
#if UNITY_2018_3_OR_NEWER
UnityEngine.Debug.LogException(ex);
#else
Console.WriteLine(ex);
#endif
}
}
if (handlers != null)
{
for (int i = 0; i < handlers.Length; i++)
{
if (handlers[i] != null)
{
try
{
(handlers[i]).OnCompleted();
}
catch (Exception ex)
{
#if UNITY_2018_3_OR_NEWER
UnityEngine.Debug.LogException(ex);
#else
Console.WriteLine(ex);
#endif
handlers[i] = null;
}
}
}
}
isRunning = false;
if (waitHandler != null)
{
var h = waitHandler;
waitHandler = null;
Add(h);
}
if (waitQueue != null)
{
while (waitQueue.Count != 0)
{
Add(waitQueue.Dequeue());
}
}
}
public void SetError(Exception exception)
{
isRunning = true;
if (singleHandler != null)
{
try
{
singleHandler.OnError(exception);
}
catch (Exception ex)
{
#if UNITY_2018_3_OR_NEWER
UnityEngine.Debug.LogException(ex);
#else
Console.WriteLine(ex);
#endif
}
}
if (handlers != null)
{
for (int i = 0; i < handlers.Length; i++)
{
if (handlers[i] != null)
{
try
{
handlers[i].OnError(exception);
}
catch (Exception ex)
{
handlers[i] = null;
#if UNITY_2018_3_OR_NEWER
UnityEngine.Debug.LogException(ex);
#else
Console.WriteLine(ex);
#endif
}
}
}
}
isRunning = false;
if (waitHandler != null)
{
var h = waitHandler;
waitHandler = null;
Add(h);
}
if (waitQueue != null)
{
while (waitQueue.Count != 0)
{
Add(waitQueue.Dequeue());
}
}
}
public void Add(ITriggerHandler<T> handler)
{
if (isRunning)
{
@@ -159,7 +281,7 @@ namespace Cysharp.Threading.Tasks
if (waitQueue == null)
{
waitQueue = new MinimumQueue<IResolveCancelPromise<T>>(4);
waitQueue = new MinimumQueue<ITriggerHandler<T>>(4);
}
waitQueue.Enqueue(handler);
return;
@@ -173,7 +295,7 @@ namespace Cysharp.Threading.Tasks
{
if (handlers == null)
{
handlers = new IResolveCancelPromise<T>[4];
handlers = new ITriggerHandler<T>[4];
}
// check empty
@@ -195,15 +317,15 @@ namespace Cysharp.Threading.Tasks
}
}
static void EnsureCapacity(ref IResolveCancelPromise<T>[] array)
static void EnsureCapacity(ref ITriggerHandler<T>[] array)
{
var newSize = array.Length * 2;
var newArray = new IResolveCancelPromise<T>[newSize];
var newArray = new ITriggerHandler<T>[newSize];
Array.Copy(array, 0, newArray, 0, array.Length);
array = newArray;
}
public void Remove(IResolveCancelPromise<T> handler)
public void Remove(ITriggerHandler<T> handler)
{
if (singleHandler == handler)
{

View File

@@ -24,10 +24,10 @@ namespace Cysharp.Threading.Tasks.Triggers
if (calledDestroy) return;
calledDestroy = true;
triggerEvent.SetCanceled(CancellationToken.None);
triggerEvent.SetCompleted();
}
internal void AddHandler(IResolveCancelPromise<T> handler)
internal void AddHandler(ITriggerHandler<T> handler)
{
if (!calledAwake)
{
@@ -37,7 +37,7 @@ namespace Cysharp.Threading.Tasks.Triggers
triggerEvent.Add(handler);
}
internal void RemoveHandler(IResolveCancelPromise<T> handler)
internal void RemoveHandler(ITriggerHandler<T> handler)
{
if (!calledAwake)
{
@@ -57,7 +57,7 @@ namespace Cysharp.Threading.Tasks.Triggers
return new AsyncTriggerEnumerator(this, cancellationToken);
}
sealed class AsyncTriggerEnumerator : MoveNextSource, IUniTaskAsyncEnumerator<T>, IResolveCancelPromise<T>
sealed class AsyncTriggerEnumerator : MoveNextSource, IUniTaskAsyncEnumerator<T>, ITriggerHandler<T>
{
static Action<object> cancellationCallback = CancellationCallback;
@@ -73,15 +73,25 @@ namespace Cysharp.Threading.Tasks.Triggers
this.cancellationToken = cancellationToken;
}
public bool TrySetCanceled(CancellationToken cancellationToken = default)
public void OnCanceled(CancellationToken cancellationToken = default)
{
return completionSource.TrySetCanceled(cancellationToken);
completionSource.TrySetCanceled(cancellationToken);
}
public bool TrySetResult(T value)
public void OnNext(T value)
{
Current = value;
return completionSource.TrySetResult(true);
completionSource.TrySetResult(true);
}
public void OnCompleted()
{
completionSource.TrySetResult(false);
}
public void OnError(Exception ex)
{
completionSource.TrySetException(ex);
}
static void CancellationCallback(object state)
@@ -164,7 +174,7 @@ namespace Cysharp.Threading.Tasks.Triggers
}
}
public sealed partial class AsyncTriggerHandler<T> : IUniTaskSource<T>, IResolveCancelPromise<T>, IDisposable
public sealed partial class AsyncTriggerHandler<T> : IUniTaskSource<T>, ITriggerHandler<T>, IDisposable
{
static Action<object> cancellationCallback = CancellationCallback;
@@ -253,14 +263,24 @@ namespace Cysharp.Threading.Tasks.Triggers
}
}
bool IResolvePromise<T>.TrySetResult(T result)
void ITriggerHandler<T>.OnNext(T value)
{
return core.TrySetResult(result);
core.TrySetResult(value);
}
bool ICancelPromise.TrySetCanceled(CancellationToken cancellationToken)
void ITriggerHandler<T>.OnCanceled(CancellationToken cancellationToken)
{
return core.TrySetCanceled(cancellationToken);
core.TrySetCanceled(cancellationToken);
}
void ITriggerHandler<T>.OnCompleted()
{
core.TrySetCanceled(CancellationToken.None);
}
void ITriggerHandler<T>.OnError(Exception ex)
{
core.TrySetException(ex);
}
void IUniTaskSource.GetResult(short token)

View File

@@ -132,6 +132,101 @@ namespace Cysharp.Threading.Tasks
{
asyncAction(state).Forget();
}
/// <summary>
/// Defer the task creation just before call await.
/// </summary>
public static UniTask Defer(Func<UniTask> factory)
{
return new UniTask(new DeferPromise(factory), 0);
}
/// <summary>
/// Defer the task creation just before call await.
/// </summary>
public static UniTask<T> Defer<T>(Func<UniTask<T>> factory)
{
return new UniTask<T>(new DeferPromise<T>(factory), 0);
}
sealed class DeferPromise : IUniTaskSource
{
Func<UniTask> factory;
UniTask task;
UniTask.Awaiter awaiter;
public DeferPromise(Func<UniTask> factory)
{
this.factory = factory;
}
public void GetResult(short token)
{
awaiter.GetResult();
}
public UniTaskStatus GetStatus(short token)
{
var f = Interlocked.Exchange(ref factory, null);
if (f == null) throw new InvalidOperationException("Can't call twice.");
task = f();
awaiter = f().GetAwaiter();
return task.Status;
}
public void OnCompleted(Action<object> continuation, object state, short token)
{
awaiter.SourceOnCompleted(continuation, state);
}
public UniTaskStatus UnsafeGetStatus()
{
return task.Status;
}
}
sealed class DeferPromise<T> : IUniTaskSource<T>
{
Func<UniTask<T>> factory;
UniTask<T> task;
UniTask<T>.Awaiter awaiter;
public DeferPromise(Func<UniTask<T>> factory)
{
this.factory = factory;
}
public T GetResult(short token)
{
return awaiter.GetResult();
}
void IUniTaskSource.GetResult(short token)
{
awaiter.GetResult();
}
public UniTaskStatus GetStatus(short token)
{
var f = Interlocked.Exchange(ref factory, null);
if (f == null) throw new InvalidOperationException("Can't call twice.");
task = f();
awaiter = f().GetAwaiter();
return task.Status;
}
public void OnCompleted(Action<object> continuation, object state, short token)
{
awaiter.SourceOnCompleted(continuation, state);
}
public UniTaskStatus UnsafeGetStatus()
{
return task.Status;
}
}
}
internal static class CompletedTasks

View File

@@ -19,11 +19,16 @@ namespace Cysharp.Threading.Tasks
return new UniTask(WaitWhilePromise.Create(predicate, timing, cancellationToken, out var token), token);
}
public static UniTask WaitUntilCanceled(CancellationToken cancellationToken, PlayerLoopTiming timing = PlayerLoopTiming.Update)
{
return new UniTask(WaitUntilCanceledPromise.Create(cancellationToken, timing, out var token), token);
}
public static UniTask<U> WaitUntilValueChanged<T, U>(T target, Func<T, U> monitorFunction, PlayerLoopTiming monitorTiming = PlayerLoopTiming.Update, IEqualityComparer<U> equalityComparer = null, CancellationToken cancellationToken = default(CancellationToken))
where T : class
{
var unityObject = target as UnityEngine.Object;
var isUnityObject = !object.ReferenceEquals(target, null); // don't use (unityObject == null)
var isUnityObject = target is UnityEngine.Object; // don't use (unityObject == null)
return new UniTask<U>(isUnityObject
? WaitUntilValueChangedUnityObjectPromise<T, U>.Create(target, monitorFunction, equalityComparer, monitorTiming, cancellationToken, out var token)
@@ -234,12 +239,98 @@ namespace Cysharp.Threading.Tasks
}
}
sealed class WaitUntilCanceledPromise : IUniTaskSource, IPlayerLoopItem, IPromisePoolItem
{
static readonly PromisePool<WaitUntilCanceledPromise> pool = new PromisePool<WaitUntilCanceledPromise>();
CancellationToken cancellationToken;
UniTaskCompletionSourceCore<object> core;
WaitUntilCanceledPromise()
{
}
public static IUniTaskSource Create(CancellationToken cancellationToken, PlayerLoopTiming timing, out short token)
{
if (cancellationToken.IsCancellationRequested)
{
return AutoResetUniTaskCompletionSource.CreateFromCanceled(cancellationToken, out token);
}
var result = pool.TryRent() ?? new WaitUntilCanceledPromise();
result.cancellationToken = cancellationToken;
TaskTracker.TrackActiveTask(result, 3);
PlayerLoopHelper.AddAction(timing, result);
token = result.core.Version;
return result;
}
public void GetResult(short token)
{
try
{
TaskTracker.RemoveTracking(this);
core.GetResult(token);
}
finally
{
pool.TryReturn(this);
}
}
public UniTaskStatus GetStatus(short token)
{
return core.GetStatus(token);
}
public UniTaskStatus UnsafeGetStatus()
{
return core.UnsafeGetStatus();
}
public void OnCompleted(Action<object> continuation, object state, short token)
{
core.OnCompleted(continuation, state, token);
}
public bool MoveNext()
{
if (cancellationToken.IsCancellationRequested)
{
core.TrySetResult(null);
return false;
}
return true;
}
public void Reset()
{
core.Reset();
cancellationToken = default;
}
~WaitUntilCanceledPromise()
{
if (pool.TryReturn(this))
{
GC.ReRegisterForFinalize(this);
}
}
}
// where T : UnityEngine.Object, can not add constraint
sealed class WaitUntilValueChangedUnityObjectPromise<T, U> : IUniTaskSource<U>, IPlayerLoopItem, IPromisePoolItem
{
static readonly PromisePool<WaitUntilValueChangedUnityObjectPromise<T, U>> pool = new PromisePool<WaitUntilValueChangedUnityObjectPromise<T, U>>();
T target;
UnityEngine.Object targetAsUnityObject;
U currentValue;
Func<T, U> monitorFunction;
IEqualityComparer<U> equalityComparer;
@@ -261,6 +352,7 @@ namespace Cysharp.Threading.Tasks
var result = pool.TryRent() ?? new WaitUntilValueChangedUnityObjectPromise<T, U>();
result.target = target;
result.targetAsUnityObject = target as UnityEngine.Object;
result.monitorFunction = monitorFunction;
result.currentValue = monitorFunction(target);
result.equalityComparer = equalityComparer ?? UnityEqualityComparer.GetDefault<U>();
@@ -309,7 +401,7 @@ namespace Cysharp.Threading.Tasks
public bool MoveNext()
{
if (cancellationToken.IsCancellationRequested || target == null) // destroyed = cancel.
if (cancellationToken.IsCancellationRequested || targetAsUnityObject == null) // destroyed = cancel.
{
core.TrySetCanceled(cancellationToken);
return false;
@@ -353,7 +445,6 @@ namespace Cysharp.Threading.Tasks
}
}
sealed class WaitUntilValueChangedStandardObjectPromise<T, U> : IUniTaskSource<U>, IPlayerLoopItem, IPromisePoolItem
where T : class
{

View File

@@ -38,14 +38,6 @@ namespace Cysharp.Threading.Tasks
{
}
public interface IResolveCancelPromise : IResolvePromise, ICancelPromise
{
}
public interface IResolveCancelPromise<T> : IResolvePromise<T>, ICancelPromise
{
}
[StructLayout(LayoutKind.Auto)]
public struct UniTaskCompletionSourceCore<TResult>
{
@@ -97,6 +89,11 @@ namespace Cysharp.Threading.Tasks
}
}
internal void MarkHandled()
{
hasUnhandledError = false;
}
/// <summary>Completes with a successful result.</summary>
/// <param name="result">The result.</param>
[DebuggerHidden]
@@ -293,12 +290,12 @@ namespace Cysharp.Threading.Tasks
}
[DebuggerHidden]
[Conditional("UNITY_EDITOR")]
internal void MarkHandled()
{
if (!handled)
{
handled = true;
core.MarkHandled();
TaskTracker.RemoveTracking(this);
}
}
@@ -504,12 +501,12 @@ namespace Cysharp.Threading.Tasks
}
[DebuggerHidden]
[Conditional("UNITY_EDITOR")]
internal void MarkHandled()
{
if (!handled)
{
handled = true;
core.MarkHandled();
TaskTracker.RemoveTracking(this);
}
}

View File

@@ -1,9 +1,8 @@
#pragma warning disable CS1591 // Missing XML comment for publicly visible type or member
using Cysharp.Threading.Tasks.Linq;
using System;
using System.Threading;
using Cysharp.Threading.Tasks.Internal;
using Cysharp.Threading.Tasks.Linq;
using UnityEngine;
using UnityEngine.Events;
using UnityEngine.UI;
@@ -27,6 +26,21 @@ namespace Cysharp.Threading.Tasks
return new UnityEventHandlerAsyncEnumerable(unityEvent, cancellationToken);
}
public static AsyncUnityEventHandler<T> GetAsyncEventHandler<T>(this UnityEvent<T> unityEvent, CancellationToken cancellationToken)
{
return new AsyncUnityEventHandler<T>(unityEvent, cancellationToken, false);
}
public static UniTask<T> OnInvokeAsync<T>(this UnityEvent<T> unityEvent, CancellationToken cancellationToken)
{
return new AsyncUnityEventHandler<T>(unityEvent, cancellationToken, true).OnInvokeAsync();
}
public static IUniTaskAsyncEnumerable<T> OnInvokeAsAsyncEnumerable<T>(this UnityEvent<T> unityEvent, CancellationToken cancellationToken)
{
return new UnityEventHandlerAsyncEnumerable<T>(unityEvent, cancellationToken);
}
public static IAsyncClickEventHandler GetAsyncClickEventHandler(this Button button)
{
return new AsyncUnityEventHandler(button.onClick, button.GetCancellationTokenOnDestroy(), false);
@@ -207,6 +221,36 @@ namespace Cysharp.Threading.Tasks
return new UnityEventHandlerAsyncEnumerable<string>(inputField.onEndEdit, cancellationToken);
}
public static IAsyncValueChangedEventHandler<string> GetAsyncValueChangedEventHandler(this InputField inputField)
{
return new AsyncUnityEventHandler<string>(inputField.onValueChanged, inputField.GetCancellationTokenOnDestroy(), false);
}
public static IAsyncValueChangedEventHandler<string> GetAsyncValueChangedEventHandler(this InputField inputField, CancellationToken cancellationToken)
{
return new AsyncUnityEventHandler<string>(inputField.onValueChanged, cancellationToken, false);
}
public static UniTask<string> OnValueChangedAsync(this InputField inputField)
{
return new AsyncUnityEventHandler<string>(inputField.onValueChanged, inputField.GetCancellationTokenOnDestroy(), true).OnInvokeAsync();
}
public static UniTask<string> OnValueChangedAsync(this InputField inputField, CancellationToken cancellationToken)
{
return new AsyncUnityEventHandler<string>(inputField.onValueChanged, cancellationToken, true).OnInvokeAsync();
}
public static IUniTaskAsyncEnumerable<string> OnValueChangedAsAsyncEnumerable(this InputField inputField)
{
return new UnityEventHandlerAsyncEnumerable<string>(inputField.onValueChanged, inputField.GetCancellationTokenOnDestroy());
}
public static IUniTaskAsyncEnumerable<string> OnValueChangedAsAsyncEnumerable(this InputField inputField, CancellationToken cancellationToken)
{
return new UnityEventHandlerAsyncEnumerable<string>(inputField.onValueChanged, cancellationToken);
}
public static IAsyncValueChangedEventHandler<int> GetAsyncValueChangedEventHandler(this Dropdown dropdown)
{
return new AsyncUnityEventHandler<int>(dropdown.onValueChanged, dropdown.GetCancellationTokenOnDestroy(), false);

View File

@@ -1,5 +1,6 @@
using System;
using System.Threading;
using UnityEngine;
using UnityEngine.UI;
namespace Cysharp.Threading.Tasks
@@ -40,11 +41,6 @@ namespace Cysharp.Threading.Tasks
if (rebindOnError && !repeat)
{
repeat = true;
if (e != null)
{
await e.DisposeAsync();
}
goto BIND_AGAIN;
}
else
@@ -106,11 +102,6 @@ namespace Cysharp.Threading.Tasks
if (rebindOnError && !repeat)
{
repeat = true;
if (e != null)
{
await e.DisposeAsync();
}
goto BIND_AGAIN;
}
else
@@ -167,11 +158,6 @@ namespace Cysharp.Threading.Tasks
if (rebindOnError && !repeat)
{
repeat = true;
if (e != null)
{
await e.DisposeAsync();
}
goto BIND_AGAIN;
}
else
@@ -195,6 +181,63 @@ namespace Cysharp.Threading.Tasks
}
}
// <T> -> Action
public static void BindTo<TSource, TObject>(this IUniTaskAsyncEnumerable<TSource> source, TObject monoBehaviour, Action<TObject, TSource> bindAction, bool rebindOnError = true)
where TObject : MonoBehaviour
{
BindToCore(source, monoBehaviour, bindAction, monoBehaviour.GetCancellationTokenOnDestroy(), rebindOnError).Forget();
}
public static void BindTo<TSource, TObject>(this IUniTaskAsyncEnumerable<TSource> source, TObject bindTarget, Action<TObject, TSource> bindAction, CancellationToken cancellationToken, bool rebindOnError = true)
{
BindToCore(source, bindTarget, bindAction, cancellationToken, rebindOnError).Forget();
}
static async UniTaskVoid BindToCore<TSource, TObject>(IUniTaskAsyncEnumerable<TSource> source, TObject bindTarget, Action<TObject, TSource> bindAction, CancellationToken cancellationToken, bool rebindOnError)
{
var repeat = false;
BIND_AGAIN:
var e = source.GetAsyncEnumerator(cancellationToken);
try
{
while (true)
{
bool moveNext;
try
{
moveNext = await e.MoveNextAsync();
repeat = false;
}
catch (Exception ex)
{
if (ex is OperationCanceledException) return;
if (rebindOnError && !repeat)
{
repeat = true;
goto BIND_AGAIN;
}
else
{
throw;
}
}
if (!moveNext) return;
bindAction(bindTarget, e.Current);
}
}
finally
{
if (e != null)
{
await e.DisposeAsync();
}
}
}
// TMP
#if UNITASK_TEXTMESHPRO_SUPPORT
@@ -231,11 +274,6 @@ namespace Cysharp.Threading.Tasks
if (rebindOnError && !repeat)
{
repeat = true;
if (e != null)
{
await e.DisposeAsync();
}
goto BIND_AGAIN;
}
else

View File

@@ -1,10 +1,10 @@
{
"name": "com.cysharp.unitask",
"displayName": "UniTask",
"version": "2.0.4-rc1",
"version": "2.0.8-rc5",
"unity": "2018.3",
"description": "Provides an efficient async/await integration to Unity.",
"keywords": ["async/await", "async", "Task", "UniTask"],
"keywords": [ "async/await", "async", "Task", "UniTask" ],
"license": "MIT",
"category": "Task",
"dependencies": {}

View File

@@ -1,20 +1,16 @@
using System;
using Cysharp.Threading.Tasks;
using Cysharp.Threading.Tasks.Linq;
using Cysharp.Threading.Tasks.Triggers;
using System;
using System.Collections;
using System.Collections.Generic;
using System.IO;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Cysharp.Threading.Tasks;
using Unity.Collections;
using Unity.Jobs;
using UnityEngine;
using UnityEngine.Networking;
using UnityEngine.UI;
using TMPro;
public struct MyJob : IJob
{
@@ -39,6 +35,36 @@ public enum MyEnum
}
public class SimplePresenter
{
// View
public UnityEngine.UI.InputField Input;
// Presenter
public SimplePresenter()
{
//Input.OnValueChangedAsAsyncEnumerable()
// .Queue()
// .SelectAwait(async x =>
// {
// await UniTask.Delay(TimeSpan.FromSeconds(1));
// return x;
// })
// .Select(x=> x.ToUpper())
// .BindTo(
}
}
@@ -50,14 +76,43 @@ public static partial class UnityUIComponentExtensions
public class AsyncMessageBroker<T> : IDisposable
{
Channel<T> channel;
IConnectableUniTaskAsyncEnumerable<T> multicastSource;
IDisposable connection;
public AsyncMessageBroker()
{
channel = Channel.CreateSingleConsumerUnbounded<T>();
multicastSource = channel.Reader.ReadAllAsync().Publish();
connection = multicastSource.Connect();
}
public void Publish(T value)
{
channel.Writer.TryWrite(value);
}
public IUniTaskAsyncEnumerable<T> Subscribe()
{
return multicastSource;
}
public void Dispose()
{
channel.Writer.TryComplete();
connection.Dispose();
}
}
public class SandboxMain : MonoBehaviour
{
public Button okButton;
public Button cancelButton;
public Text text;
CancellationTokenSource cts;
@@ -80,6 +135,63 @@ public class SandboxMain : MonoBehaviour
}
public class Model
{
// State<int> Hp { get; }
AsyncReactiveProperty<int> hp;
IReadOnlyAsyncReactiveProperty<int> Hp => hp;
public Model()
{
// hp = new AsyncReactiveProperty<int>();
//setHp = Hp.GetSetter();
}
void Increment(int value)
{
// setHp(Hp.Value += value);
}
}
public Text text;
public Button button;
[SerializeField]
State<int> count;
void Start2()
{
count = 10;
var countS = count.GetSetter();
count.BindTo(text);
button.OnClickAsAsyncEnumerable().ForEachAsync(_ =>
{
// int foo = countS;
//countS.Set(countS += 10);
// setter.SetValue(count.Value + 10);
});
}
async UniTask RunStandardDelayAsync()
{
UnityEngine.Debug.Log("DEB");
@@ -96,9 +208,6 @@ public class SandboxMain : MonoBehaviour
var scheduled = job.Schedule();
UnityEngine.Debug.Log("OK");
await scheduled; // .ConfigureAwait(PlayerLoopTiming.Update); // .WaitAsync(PlayerLoopTiming.Update);
UnityEngine.Debug.Log("OK2");
@@ -124,153 +233,102 @@ public class SandboxMain : MonoBehaviour
}
private async UniTaskVoid HogeAsync()
{
// await is not over
await UniTaskAsyncEnumerable
.TimerFrame(10)
.ForEachAwaitAsync(async _ =>
// .ForEachAsync(_ =>
{
await UniTask.Delay(1000);
Debug.Log(Time.time);
});
Debug.Log("Done");
}
public int MyProperty { get; set; }
public class MyClass
{
public int MyProperty { get; set; }
}
MyClass mcc;
void Start()
{
Application.SetStackTraceLogType(LogType.Error, StackTraceLogType.Full);
Application.SetStackTraceLogType(LogType.Exception, StackTraceLogType.Full);
this.mcc = new MyClass();
this.MyProperty = 999;
var playerLoop = UnityEngine.LowLevel.PlayerLoop.GetCurrentPlayerLoop();
//ShowPlayerLoop.DumpPlayerLoop("Current", playerLoop);
CheckDest().Forget();
RP1 = new AsyncReactiveProperty<int>(999);
RP1.Select(x => x * x).BindTo(text);
//Update2().Forget();
//RunStandardDelayAsync().Forget();
//for (int i = 0; i < 14; i++)
//{
// TimingDump((PlayerLoopTiming)i).Forget();
//}
//StartCoroutine(CoroutineDump("yield WaitForEndOfFrame", new WaitForEndOfFrame()));
//StartCoroutine(CoroutineDump("yield WaitForFixedUpdate", new WaitForFixedUpdate()));
//StartCoroutine(CoroutineDump("yield null", null));
// -----
// RunJobAsync().Forget();
//ClickOnce().Forget();
//ClickForever().Forget();
//var cor = UniTask.ToCoroutine(async () =>
// {
// var job = new MyJob() { loopCount = 999, inOut = new NativeArray<int>(1, Allocator.TempJob) };
// JobHandle.ScheduleBatchedJobs();
// await job.Schedule().WaitAsync(PlayerLoopTiming.Update);
// job.inOut.Dispose();
// });
//StartCoroutine(cor);
// UniTaskAsyncEnumerable.EveryUpdate(PlayerLoopTiming.FixedUpdate)
//await UniTask.Yield(PlayerLoopTiming.Update);
//Debug.Log("Start:" + Time.frameCount);
//await UniTaskAsyncEnumerable.TimerFrame(3, 5, PlayerLoopTiming.PostLateUpdate)
// .Select(x => x)
// .Do(x => Debug.Log("DODODO"))
// .ForEachAsync(_ =>
//UniTaskAsyncEnumerable.EveryValueChanged(mcc, x => x.MyProperty)
// .Do(_ => { }, () => Debug.Log("COMPLETED"))
// .ForEachAsync(x =>
// {
// Debug.Log("Call:" + Time.frameCount);
// }, cancellationToken: this.GetCancellationTokenOnDestroy());
//try
//{
// await this.GetAsyncUpdateTrigger().ForEachAsync(_ =>
// {
// UnityEngine.Debug.Log("EveryUpdate:" + Time.frameCount);
// });
//}
//catch (OperationCanceledException ex)
//{
// UnityEngine.Debug.Log("END");
//}
CancellationTokenSource cts = new CancellationTokenSource();
//var trigger = this.GetAsyncUpdateTrigger();
//Go(trigger, 1, cts.Token).Forget();
//Go(trigger, 2, cts.Token).Forget();
//Go(trigger, 3, cts.Token).Forget();
//Go(trigger, 4, cts.Token).Forget();
//Go(trigger, 5, cts.Token).Forget();
Application.logMessageReceived += Application_logMessageReceived;
// foo.Status.IsCanceled
// 5回クリックされるまで待つ、とか。
//Debug.Log("Await start.");
//await okButton.GetAsyncClickEventHandler().DisableAutoClose()
// .Select((_, clickCount) => clickCount + 1)
// .FirstAsync(x => x == 5);
//Debug.Log("Click 5 times.");
// await this.GetAsyncUpdateTrigger().UpdateAsAsyncEnumerable()
// Debug.Log("VALUE_CHANGED:" + x);
// })
// .Forget();
//ucs = new UniTaskCompletionSource();
//okButton.onClick.AddListener(async () =>
//{
// await InnerAsync(false);
//});
okButton.onClick.AddListener(() =>
okButton.OnClickAsAsyncEnumerable().ForEachAsync(_ =>
{
// FooAsync().Forget();
RP1.Value += 3;
mcc.MyProperty += 10;
}).Forget();
cancelButton.OnClickAsAsyncEnumerable().ForEachAsync(_ =>
{
this.mcc = null;
});
cancelButton.onClick.AddListener(() =>
{
text.text = "";
// ucs.TrySetResult();
cts.Cancel();
});
}
static void Foo(UniTask t)
async UniTaskVoid CheckDest()
{
try
{
Debug.Log("WAIT");
await UniTask.WaitUntilValueChanged(mcc, x => x.MyProperty);
Debug.Log("CHANGED?");
}
finally
{
Debug.Log("END");
}
}
async UniTaskVoid Running(CancellationToken ct)
{
Debug.Log("BEGIN");
await UniTask.WaitUntilCanceled(ct);
Debug.Log("DONE");
}
async UniTaskVoid WaitForChannelAsync(ChannelReader<int> reader, CancellationToken token)
{
try
{
//var result1 = await reader.ReadAsync(token);
//Debug.Log(result1);
await reader.ReadAllAsync().ForEachAsync(x => Debug.Log(x)/*, token*/);
Debug.Log("done");
}
catch (Exception ex)
{
Debug.Log("here");
Debug.LogException(ex);
}
}
async UniTaskVoid Go(AsyncUpdateTrigger trigger, int i, CancellationToken ct)

View File

@@ -1,47 +0,0 @@
{
"dependencies": {
"com.unity.addressables": "1.8.3",
"com.unity.ads": "3.4.4",
"com.unity.analytics": "3.3.5",
"com.unity.collab-proxy": "1.2.16",
"com.unity.ext.nunit": "1.0.0",
"com.unity.ide.rider": "1.1.4",
"com.unity.ide.visualstudio": "1.0.5",
"com.unity.ide.vscode": "1.1.4",
"com.unity.purchasing": "2.0.6",
"com.unity.test-framework": "1.1.13",
"com.unity.textmeshpro": "2.0.1",
"com.unity.timeline": "1.2.6",
"com.unity.modules.ai": "1.0.0",
"com.unity.modules.androidjni": "1.0.0",
"com.unity.modules.animation": "1.0.0",
"com.unity.modules.assetbundle": "1.0.0",
"com.unity.modules.audio": "1.0.0",
"com.unity.modules.cloth": "1.0.0",
"com.unity.modules.director": "1.0.0",
"com.unity.modules.imageconversion": "1.0.0",
"com.unity.modules.imgui": "1.0.0",
"com.unity.modules.jsonserialize": "1.0.0",
"com.unity.modules.particlesystem": "1.0.0",
"com.unity.modules.physics": "1.0.0",
"com.unity.modules.physics2d": "1.0.0",
"com.unity.modules.screencapture": "1.0.0",
"com.unity.modules.terrain": "1.0.0",
"com.unity.modules.terrainphysics": "1.0.0",
"com.unity.modules.tilemap": "1.0.0",
"com.unity.modules.ui": "1.0.0",
"com.unity.modules.uielements": "1.0.0",
"com.unity.modules.umbra": "1.0.0",
"com.unity.modules.unityanalytics": "1.0.0",
"com.unity.modules.unitywebrequest": "1.0.0",
"com.unity.modules.unitywebrequestassetbundle": "1.0.0",
"com.unity.modules.unitywebrequestaudio": "1.0.0",
"com.unity.modules.unitywebrequesttexture": "1.0.0",
"com.unity.modules.unitywebrequestwww": "1.0.0",
"com.unity.modules.vehicles": "1.0.0",
"com.unity.modules.video": "1.0.0",
"com.unity.modules.vr": "1.0.0",
"com.unity.modules.wind": "1.0.0",
"com.unity.modules.xr": "1.0.0"
}
}