From 29dd84604b76e4b90b6a22267757d9d12bf2d9e0 Mon Sep 17 00:00:00 2001 From: Volodymyr Dombrovskyi Date: Wed, 20 Sep 2023 11:59:07 +0200 Subject: [PATCH] Ensure enqueued delegates are executed even if initially canceled --- .../TaskSchedulerBaseFixture.Scheduling.cs | 44 ++++++++++++++++++- TaskFlow/TaskFlow.cs | 2 +- TaskFlow/ThreadTaskFlow.cs | 2 +- 3 files changed, 45 insertions(+), 3 deletions(-) diff --git a/TaskFlow.Tests/TaskSchedulerBaseFixture.Scheduling.cs b/TaskFlow.Tests/TaskSchedulerBaseFixture.Scheduling.cs index 5ec24fa..f877a61 100644 --- a/TaskFlow.Tests/TaskSchedulerBaseFixture.Scheduling.cs +++ b/TaskFlow.Tests/TaskSchedulerBaseFixture.Scheduling.cs @@ -1,6 +1,7 @@ namespace TaskFlow.Tests { using NUnit.Framework; + using System.Threading.Tasks; using System.Threading.Tasks.Flow; public abstract partial class TaskSchedulerBaseFixture @@ -81,7 +82,48 @@ public void Enqueue_CanExecuteNextOperationIfPreviousCanceled() var nextTask = _sut.Enqueue(_ => Task.FromResult(42)); Assert.That(nextTask.Result, Is.EqualTo(42)); - Assert.That(canceledTask.IsCanceled, Is.True); + Assert.That(() => canceledTask.IsCanceled, Is.True.After(100, 10)); + } + + [Test] + public void Enqueue_WhenInitiallyCanceled_ShouldExecuteOperation() + { + using var completedEvent = new ManualResetEventSlim(); + using var cts = new CancellationTokenSource(); + cts.Cancel(); + var task = _sut.Enqueue(() => completedEvent.Set(), cts.Token); + + Assert.That(task.Wait(200), Is.True); + Assert.That(completedEvent.Wait(0), Is.True); + } + + [Test] + public async Task Enqueue_ExecuteInOrderIfIntermediateCanceled() + { + using var taskACompletionEvent = new ManualResetEventSlim(); + + using var cts = new CancellationTokenSource(); + cts.Cancel(); + + var counter = 0; + var taskA = _sut.Enqueue(() => + { + taskACompletionEvent.Wait(); + return Interlocked.Increment(ref counter) == 1; + }); + var taskB = _sut.Enqueue(Task.FromCanceled, cts.Token); + var taskC = _sut.Enqueue(() => Interlocked.Increment(ref counter) == 2); + + await Task.Delay(100); + taskACompletionEvent.Set(); + + Assert.That(() => taskA.IsCompletedSuccessfully, Is.True.After(100, 10)); + Assert.That(taskA.Result, Is.True); + + Assert.That(() => taskB.IsCanceled, Is.True.After(100, 10)); + + Assert.That(() => taskC.IsCompletedSuccessfully, Is.True.After(100, 10)); + Assert.That(taskC.Result, Is.True); } } } \ No newline at end of file diff --git a/TaskFlow/TaskFlow.cs b/TaskFlow/TaskFlow.cs index a53aed4..1db851a 100644 --- a/TaskFlow/TaskFlow.cs +++ b/TaskFlow/TaskFlow.cs @@ -30,7 +30,7 @@ public override Task Enqueue(Func> taskFun var previousTask = _task; var task = Task.Factory.StartNew( () => RunAfterPrevious(taskFunc, previousTask, cancellationToken), - cancellationToken, + CancellationToken.None, TaskCreationOptions.PreferFairness, Options.TaskScheduler).Unwrap(); _task = task.ContinueWith(EmptyContinuationAction, CancellationToken.None, TaskContinuationOptions.None, Options.TaskScheduler); diff --git a/TaskFlow/ThreadTaskFlow.cs b/TaskFlow/ThreadTaskFlow.cs index 4031b94..5fe596f 100644 --- a/TaskFlow/ThreadTaskFlow.cs +++ b/TaskFlow/ThreadTaskFlow.cs @@ -33,7 +33,7 @@ public override async Task Enqueue(Func> t } var executionItem = new ExecutionItem(TaskFunc, cancellationToken); - _blockingCollection.Add(executionItem, cancellationToken); + _blockingCollection.Add(executionItem, CancellationToken.None); return await executionItem.GetTypedTask().ConfigureAwait(false); async Task TaskFunc(CancellationToken token)