Skip to content

Commit

Permalink
Ensure enqueued delegates are executed even if initially canceled
Browse files Browse the repository at this point in the history
  • Loading branch information
dombrovsky committed Sep 20, 2023
1 parent 783cc30 commit 29dd846
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 3 deletions.
44 changes: 43 additions & 1 deletion TaskFlow.Tests/TaskSchedulerBaseFixture.Scheduling.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
namespace TaskFlow.Tests
{
using NUnit.Framework;
using System.Threading.Tasks;
using System.Threading.Tasks.Flow;

public abstract partial class TaskSchedulerBaseFixture<T>
Expand Down Expand Up @@ -81,7 +82,48 @@ public void Enqueue_CanExecuteNextOperationIfPreviousCanceled()
var nextTask = _sut.Enqueue(_ => Task.FromResult(42));

Assert.That(nextTask.Result, Is.EqualTo(42));
Assert.That(canceledTask.IsCanceled, Is.True);
Assert.That(() => canceledTask.IsCanceled, Is.True.After(100, 10));
}

[Test]
public void Enqueue_WhenInitiallyCanceled_ShouldExecuteOperation()
{
using var completedEvent = new ManualResetEventSlim();
using var cts = new CancellationTokenSource();
cts.Cancel();
var task = _sut.Enqueue(() => completedEvent.Set(), cts.Token);

Assert.That(task.Wait(200), Is.True);
Assert.That(completedEvent.Wait(0), Is.True);
}

[Test]
public async Task Enqueue_ExecuteInOrderIfIntermediateCanceled()
{
using var taskACompletionEvent = new ManualResetEventSlim();

using var cts = new CancellationTokenSource();
cts.Cancel();

var counter = 0;
var taskA = _sut.Enqueue(() =>
{
taskACompletionEvent.Wait();
return Interlocked.Increment(ref counter) == 1;
});
var taskB = _sut.Enqueue(Task.FromCanceled, cts.Token);
var taskC = _sut.Enqueue(() => Interlocked.Increment(ref counter) == 2);

await Task.Delay(100);
taskACompletionEvent.Set();

Assert.That(() => taskA.IsCompletedSuccessfully, Is.True.After(100, 10));
Assert.That(taskA.Result, Is.True);

Assert.That(() => taskB.IsCanceled, Is.True.After(100, 10));

Assert.That(() => taskC.IsCompletedSuccessfully, Is.True.After(100, 10));
Assert.That(taskC.Result, Is.True);
}
}
}
2 changes: 1 addition & 1 deletion TaskFlow/TaskFlow.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ public override Task<T> Enqueue<T>(Func<CancellationToken, ValueTask<T>> taskFun
var previousTask = _task;
var task = Task.Factory.StartNew(
() => RunAfterPrevious(taskFunc, previousTask, cancellationToken),
cancellationToken,
CancellationToken.None,
TaskCreationOptions.PreferFairness,
Options.TaskScheduler).Unwrap();
_task = task.ContinueWith(EmptyContinuationAction, CancellationToken.None, TaskContinuationOptions.None, Options.TaskScheduler);
Expand Down
2 changes: 1 addition & 1 deletion TaskFlow/ThreadTaskFlow.cs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public override async Task<T> Enqueue<T>(Func<CancellationToken, ValueTask<T>> t
}

var executionItem = new ExecutionItem(TaskFunc, cancellationToken);
_blockingCollection.Add(executionItem, cancellationToken);
_blockingCollection.Add(executionItem, CancellationToken.None);
return await executionItem.GetTypedTask<T>().ConfigureAwait(false);

async Task<object?> TaskFunc(CancellationToken token)
Expand Down

0 comments on commit 29dd846

Please sign in to comment.