Skip to content

Commit

Permalink
Port Akka.Tests.Actor tests to async/await - `CoordinatedShutdown…
Browse files Browse the repository at this point in the history
…Spec` (#5770)

* Port `Akka.Tests.Actor` tests to `async/await` - `CoordinatedShutdownSpec`

* Fix CoordinatedShutdown_must_abort_if_recover_is_off

Co-authored-by: Gregorius Soedharmo <[email protected]>
  • Loading branch information
eaba and Arkatufus authored Mar 28, 2022
1 parent e4cec30 commit bf5fe07
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 39 deletions.
76 changes: 40 additions & 36 deletions src/core/Akka.Tests/Actor/CoordinatedShutdownSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@
using FluentAssertions;
using Xunit;
using static Akka.Actor.CoordinatedShutdown;
using Akka.Tests.Util;
using FluentAssertions;
using FluentAssertions.Extensions;
using static FluentAssertions.FluentActions;

namespace Akka.Tests.Actor
{
Expand Down Expand Up @@ -114,12 +118,12 @@ public void CoordinatedShutdown_must_sort_phases_in_topological_order()
[Fact]
public void CoordinatedShutdown_must_detect_cycles_in_phases_non_DAG()
{
Intercept<ArgumentException>(() =>
Assert.Throws<ArgumentException>(() =>
{
CoordinatedShutdown.TopologicalSort(new Dictionary<string, Phase>() { { "a", Phase("a") } });
});

Intercept<ArgumentException>(() =>
Assert.Throws<ArgumentException>(() =>
{
CoordinatedShutdown.TopologicalSort(new Dictionary<string, Phase>()
{
Expand All @@ -128,7 +132,7 @@ public void CoordinatedShutdown_must_detect_cycles_in_phases_non_DAG()
});
});

Intercept<ArgumentException>(() =>
Assert.Throws<ArgumentException>(() =>
{
CoordinatedShutdown.TopologicalSort(new Dictionary<string, Phase>()
{
Expand All @@ -138,7 +142,7 @@ public void CoordinatedShutdown_must_detect_cycles_in_phases_non_DAG()
});
});

Intercept<ArgumentException>(() =>
Assert.Throws<ArgumentException>(() =>
{
CoordinatedShutdown.TopologicalSort(new Dictionary<string, Phase>()
{
Expand Down Expand Up @@ -171,7 +175,7 @@ public void CoordinatedShutdown_must_predefined_phases_from_config()
}

[Fact]
public void CoordinatedShutdown_must_run_ordered_phases()
public async Task CoordinatedShutdown_must_run_ordered_phases()
{
var phases = new Dictionary<string, Phase>()
{
Expand All @@ -193,12 +197,12 @@ public void CoordinatedShutdown_must_run_ordered_phases()
return TaskEx.Completed;
});

co.AddTask("b", "b2", () =>
co.AddTask("b", "b2", async () =>
{
// to verify that c is not performed before b
Task.Delay(TimeSpan.FromMilliseconds(100)).Wait();
await Task.Delay(TimeSpan.FromMilliseconds(100));
TestActor.Tell("B");
return TaskEx.Completed;
return Done.Instance;
});

co.AddTask("c", "c1", () =>
Expand All @@ -207,12 +211,12 @@ public void CoordinatedShutdown_must_run_ordered_phases()
return TaskEx.Completed;
});

co.Run(CoordinatedShutdown.UnknownReason.Instance).Wait(RemainingOrDefault);
ReceiveN(4).Should().Equal(new object[] { "A", "B", "B", "C" });
await co.Run(CoordinatedShutdown.UnknownReason.Instance).AwaitWithTimeout(RemainingOrDefault);
(await ReceiveNAsync(4, default).ToListAsync()).Should().Equal(new object[] { "A", "B", "B", "C" });
}

[Fact]
public void CoordinatedShutdown_must_run_from_given_phase()
public async Task CoordinatedShutdown_must_run_from_given_phase()
{
var phases = new Dictionary<string, Phase>()
{
Expand Down Expand Up @@ -240,13 +244,13 @@ public void CoordinatedShutdown_must_run_from_given_phase()
return TaskEx.Completed;
});

co.Run(customReason, "b").Wait(RemainingOrDefault);
ReceiveN(2).Should().Equal(new object[] { "B", "C" });
await co.Run(customReason, "b").AwaitWithTimeout(RemainingOrDefault);
(await ReceiveNAsync(2, default).ToListAsync()).Should().Equal(new object[] { "B", "C" });
co.ShutdownReason.Should().BeEquivalentTo(customReason);
}

[Fact]
public void CoordinatedShutdown_must_only_run_once()
public async Task CoordinatedShutdown_must_only_run_once()
{
var phases = new Dictionary<string, Phase>()
{
Expand All @@ -261,17 +265,17 @@ public void CoordinatedShutdown_must_only_run_once()
});

co.ShutdownReason.Should().BeNull();
co.Run(customReason).Wait(RemainingOrDefault);
await co.Run(customReason).AwaitWithTimeout(RemainingOrDefault);
co.ShutdownReason.Should().BeEquivalentTo(customReason);
ExpectMsg("A");
co.Run(CoordinatedShutdown.UnknownReason.Instance).Wait(RemainingOrDefault);
await ExpectMsgAsync("A");
await co.Run(CoordinatedShutdown.UnknownReason.Instance).AwaitWithTimeout(RemainingOrDefault);
TestActor.Tell("done");
ExpectMsg("done"); // no additional A
await ExpectMsgAsync("done"); // no additional A
co.ShutdownReason.Should().BeEquivalentTo(customReason);
}

[Fact]
public void CoordinatedShutdown_must_continue_after_timeout_or_failure()
public async Task CoordinatedShutdown_must_continue_after_timeout_or_failure()
{
var phases = new Dictionary<string, Phase>()
{
Expand Down Expand Up @@ -306,15 +310,15 @@ public void CoordinatedShutdown_must_continue_after_timeout_or_failure()
return TaskEx.Completed;
});

co.Run(CoordinatedShutdown.UnknownReason.Instance).Wait(RemainingOrDefault);
ExpectMsg("A");
ExpectMsg("A");
ExpectMsg("B");
ExpectMsg("C");
await co.Run(CoordinatedShutdown.UnknownReason.Instance).AwaitWithTimeout(RemainingOrDefault);
await ExpectMsgAsync("A");
await ExpectMsgAsync("A");
await ExpectMsgAsync("B");
await ExpectMsgAsync("C");
}

[Fact]
public void CoordinatedShutdown_must_abort_if_recover_is_off()
public async Task CoordinatedShutdown_must_abort_if_recover_is_off()
{
var phases = new Dictionary<string, Phase>()
{
Expand All @@ -335,14 +339,14 @@ public void CoordinatedShutdown_must_abort_if_recover_is_off()
return TaskEx.Completed;
});

var result = co.Run(CoordinatedShutdown.UnknownReason.Instance);
ExpectMsg("B");
Intercept<TimeoutException>(() => result.Wait(RemainingOrDefault));
ExpectNoMsg(TimeSpan.FromMilliseconds(200)); // C not run
var task = co.Run(CoordinatedShutdown.UnknownReason.Instance);
await ExpectMsgAsync("B");
await Assert.ThrowsAsync<TimeoutException>(async() => await task.AwaitWithTimeout(RemainingOrDefault));
await ExpectNoMsgAsync(TimeSpan.FromMilliseconds(200)); // C not run
}

[Fact]
public void CoordinatedShutdown_must_be_possible_to_add_tasks_in_later_phase_from_earlier_phase()
public async Task CoordinatedShutdown_must_be_possible_to_add_tasks_in_later_phase_from_earlier_phase()
{
var phases = new Dictionary<string, Phase>()
{
Expand All @@ -362,9 +366,9 @@ public void CoordinatedShutdown_must_be_possible_to_add_tasks_in_later_phase_fro
return TaskEx.Completed;
});

co.Run(CoordinatedShutdown.UnknownReason.Instance).Wait(RemainingOrDefault);
ExpectMsg("A");
ExpectMsg("B");
await co.Run(CoordinatedShutdown.UnknownReason.Instance).AwaitWithTimeout(RemainingOrDefault);
await ExpectMsgAsync("A");
await ExpectMsgAsync("B");
}

[Fact]
Expand Down Expand Up @@ -392,10 +396,10 @@ public void CoordinatedShutdown_must_be_possible_to_parse_phases_from_config()
}

[Fact]
public void CoordinatedShutdown_must_terminate_ActorSystem()
public async Task CoordinatedShutdown_must_terminate_ActorSystem()
{
var shutdownSystem = CoordinatedShutdown.Get(Sys).Run(customReason);
shutdownSystem.Wait(TimeSpan.FromSeconds(10)).Should().BeTrue();
(await CoordinatedShutdown.Get(Sys).Run(customReason)
.AwaitWithTimeout(TimeSpan.FromSeconds(10))).Should().BeTrue();

Sys.WhenTerminated.IsCompleted.Should().BeTrue();
CoordinatedShutdown.Get(Sys).ShutdownReason.Should().BeEquivalentTo(customReason);
Expand Down
25 changes: 22 additions & 3 deletions src/core/Akka.Tests/Util/TaskHelpers.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
using System;
using System.Runtime.ExceptionServices;
using System.Threading;
using System.Threading.Tasks;

namespace Akka.Tests.Util
Expand All @@ -7,9 +9,26 @@ public static class TaskHelpers
{
public static async Task<bool> AwaitWithTimeout(this Task parentTask, TimeSpan timeout)
{
var delayed = Task.Delay(timeout);
await Task.WhenAny(delayed, parentTask);
return parentTask.IsCompleted;
using (var cts = new CancellationTokenSource())
{
try
{
var delayed = Task.Delay(timeout, cts.Token);
var returnedTask = await Task.WhenAny(delayed, parentTask);

if(returnedTask == parentTask && returnedTask.Exception != null)
{
var flattened = returnedTask.Exception.Flatten();
ExceptionDispatchInfo.Capture(flattened.InnerException).Throw();
}

return parentTask.IsCompleted;
}
finally
{
cts.Cancel();
}
}
}
}
}

0 comments on commit bf5fe07

Please sign in to comment.