diff --git a/src/core/Akka.Remote.Tests/RemoteWatcherSpec.cs b/src/core/Akka.Remote.Tests/RemoteWatcherSpec.cs index ef93ca5c304..380f23bc1d2 100644 --- a/src/core/Akka.Remote.Tests/RemoteWatcherSpec.cs +++ b/src/core/Akka.Remote.Tests/RemoteWatcherSpec.cs @@ -9,7 +9,9 @@ using System.Threading.Tasks; using Akka.Actor; using Akka.TestKit; +using Akka.TestKit.Extensions; using Akka.Util.Internal; +using FluentAssertions.Extensions; using Xunit; using Xunit.Abstractions; @@ -17,13 +19,13 @@ namespace Akka.Remote.Tests { public class RemoteWatcherSpec : AkkaSpec { - class TestActorProxy : UntypedActor + private class TestActorProxy : UntypedActor { - readonly IActorRef _testActor; + private readonly IActorRef _testActor; - public TestActorProxy(IActorRef TestActor) + public TestActorProxy(IActorRef testActor) { - _testActor = TestActor; + _testActor = testActor; } protected override void OnReceive(object message) @@ -32,14 +34,14 @@ protected override void OnReceive(object message) } } - class MyActor : UntypedActor + private class MyActor : UntypedActor { protected override void OnReceive(object message) { } } - public static TimeSpan TurnOff = TimeSpan.FromMinutes(5); + public static readonly TimeSpan TurnOff = TimeSpan.FromMinutes(5); private static IFailureDetectorRegistry
CreateFailureDetectorRegistry() { @@ -51,74 +53,59 @@ private static IFailureDetectorRegistry
CreateFailureDetectorRegistry() TimeSpan.FromSeconds(1))); } - class TestRemoteWatcher : RemoteWatcher + private class TestRemoteWatcher : RemoteWatcher { public class AddressTerm { - readonly Address _address; - public AddressTerm(Address address) { - _address = address; + Address = address; } - public Address Address - { - get { return _address; } - } + public Address Address { get; } public override bool Equals(object obj) { - var other = obj as AddressTerm; - if (other == null) return false; - return _address.Equals(other._address); + if (!(obj is AddressTerm other)) return false; + return Address.Equals(other.Address); } public override int GetHashCode() { - return _address.GetHashCode(); + return Address.GetHashCode(); } } public class Quarantined { - readonly Address _address; - readonly int? _uid; - public Quarantined(Address address, int? uid) { - _address = address; - _uid = uid; + Address = address; + Uid = uid; } - public Address Address - { - get { return _address; } - } + public Address Address { get; } - public int? Uid - { - get { return _uid; } - } + public int? Uid { get; } protected bool Equals(Quarantined other) { - return Equals(_address, other._address) && _uid == other._uid; + return Equals(Address, other.Address) && Uid == other.Uid; } public override bool Equals(object obj) { if (ReferenceEquals(null, obj)) return false; if (ReferenceEquals(this, obj)) return true; - if (obj.GetType() != this.GetType()) return false; - return Equals((Quarantined) obj); + if(!(obj is Quarantined q)) return false; + return Equals(q); } public override int GetHashCode() { unchecked { - return ((_address != null ? _address.GetHashCode() : 0)*397) ^ _uid.GetHashCode(); + return ((Address != null ? Address.GetHashCode() : 0)*397) ^ Uid.GetHashCode(); } } @@ -192,87 +179,87 @@ protected override async Task AfterAllAsync() await base.AfterAllAsync(); } - readonly ActorSystem _remoteSystem; - readonly Address _remoteAddress; - readonly RemoteWatcher.HeartbeatRsp _heartbeatRspB; + private readonly ActorSystem _remoteSystem; + private readonly Address _remoteAddress; + private readonly RemoteWatcher.HeartbeatRsp _heartbeatRspB; private int RemoteAddressUid { get { return AddressUidExtension.Uid(_remoteSystem); } } - private IInternalActorRef CreateRemoteActor(Props props, string name) + private async Task CreateRemoteActor(Props props, string name) { _remoteSystem.ActorOf(props, name); Sys.ActorSelection(new RootActorPath(_remoteAddress) / "user" / name).Tell(new Identify(name), TestActor); - return ExpectMsg().Subject.AsInstanceOf(); + return (await ExpectMsgAsync()).Subject.AsInstanceOf(); } [Fact] - public void A_RemoteWatcher_must_have_correct_interaction_when_watching() + public async Task A_RemoteWatcher_must_have_correct_interaction_when_watching() { var fd = CreateFailureDetectorRegistry(); var monitorA = Sys.ActorOf(Props.Create(), "monitor1"); //TODO: Better way to write this? - var monitorB = CreateRemoteActor(new Props(new Deploy(), typeof(TestActorProxy), new[]{TestActor}), "monitor1"); + var monitorB = await CreateRemoteActor(new Props(new Deploy(), typeof(TestActorProxy), TestActor), "monitor1"); var a1 = Sys.ActorOf(Props.Create(), "a1").AsInstanceOf(); var a2 = Sys.ActorOf(Props.Create(), "a2").AsInstanceOf(); - var b1 = CreateRemoteActor(Props.Create(), "b1"); - var b2 = CreateRemoteActor(Props.Create(), "b2"); + var b1 = await CreateRemoteActor(Props.Create(), "b1"); + var b2 = await CreateRemoteActor(Props.Create(), "b2"); monitorA.Tell(new RemoteWatcher.WatchRemote(b1, a1)); monitorA.Tell(new RemoteWatcher.WatchRemote(b2, a1)); monitorA.Tell(new RemoteWatcher.WatchRemote(b2, a2)); monitorA.Tell(RemoteWatcher.Stats.Empty, TestActor); // (a1->b1), (a1->b2), (a2->b2) - ExpectMsg(RemoteWatcher.Stats.Counts(3, 1)); - ExpectNoMsg(TimeSpan.FromMilliseconds(100)); + await ExpectMsgAsync(RemoteWatcher.Stats.Counts(3, 1)); + await ExpectNoMsgAsync(TimeSpan.FromMilliseconds(100)); monitorA.Tell(RemoteWatcher.HeartbeatTick.Instance, TestActor); - ExpectMsg(); - ExpectNoMsg(TimeSpan.FromMilliseconds(100)); + await ExpectMsgAsync(); + await ExpectNoMsgAsync(TimeSpan.FromMilliseconds(100)); monitorA.Tell(RemoteWatcher.HeartbeatTick.Instance, TestActor); - ExpectMsg(); - ExpectNoMsg(TimeSpan.FromMilliseconds(100)); + await ExpectMsgAsync(); + await ExpectNoMsgAsync(TimeSpan.FromMilliseconds(100)); monitorA.Tell(_heartbeatRspB, monitorB); monitorA.Tell(RemoteWatcher.HeartbeatTick.Instance, TestActor); - ExpectMsg(); - ExpectNoMsg(TimeSpan.FromMilliseconds(100)); + await ExpectMsgAsync(); + await ExpectNoMsgAsync(TimeSpan.FromMilliseconds(100)); monitorA.Tell(new RemoteWatcher.UnwatchRemote(b1, a1)); // still (a1->b2) and (a2->b2) left monitorA.Tell(RemoteWatcher.Stats.Empty, TestActor); - ExpectMsg(RemoteWatcher.Stats.Counts(2, 1)); - ExpectNoMsg(TimeSpan.FromMilliseconds(100)); + await ExpectMsgAsync(RemoteWatcher.Stats.Counts(2, 1)); + await ExpectNoMsgAsync(TimeSpan.FromMilliseconds(100)); monitorA.Tell(RemoteWatcher.HeartbeatTick.Instance, TestActor); - ExpectMsg(); - ExpectNoMsg(TimeSpan.FromMilliseconds(100)); + await ExpectMsgAsync(); + await ExpectNoMsgAsync(TimeSpan.FromMilliseconds(100)); monitorA.Tell(new RemoteWatcher.UnwatchRemote(b2, a2)); // still (a1->b2) left monitorA.Tell(RemoteWatcher.Stats.Empty, TestActor); - ExpectMsg(RemoteWatcher.Stats.Counts(1, 1)); - ExpectNoMsg(TimeSpan.FromMilliseconds(100)); + await ExpectMsgAsync(RemoteWatcher.Stats.Counts(1, 1)); + await ExpectNoMsgAsync(TimeSpan.FromMilliseconds(100)); monitorA.Tell(RemoteWatcher.HeartbeatTick.Instance, TestActor); - ExpectMsg(); - ExpectNoMsg(TimeSpan.FromMilliseconds(100)); + await ExpectMsgAsync(); + await ExpectNoMsgAsync(TimeSpan.FromMilliseconds(100)); monitorA.Tell(new RemoteWatcher.UnwatchRemote(b2, a1)); // all unwatched monitorA.Tell(RemoteWatcher.Stats.Empty, TestActor); - ExpectMsg(RemoteWatcher.Stats.Empty); - ExpectNoMsg(TimeSpan.FromMilliseconds(100)); + await ExpectMsgAsync(RemoteWatcher.Stats.Empty); + await ExpectNoMsgAsync(TimeSpan.FromMilliseconds(100)); monitorA.Tell(RemoteWatcher.HeartbeatTick.Instance, TestActor); - ExpectNoMsg(TimeSpan.FromMilliseconds(100)); + await ExpectNoMsgAsync(TimeSpan.FromMilliseconds(100)); monitorA.Tell(RemoteWatcher.HeartbeatTick.Instance, TestActor); - ExpectNoMsg(TimeSpan.FromMilliseconds(100)); + await ExpectNoMsgAsync(TimeSpan.FromMilliseconds(100)); // make sure nothing floods over to next test - ExpectNoMsg(TimeSpan.FromSeconds(2)); + await ExpectNoMsgAsync(TimeSpan.FromSeconds(2)); } [Fact] - public void A_RemoteWatcher_must_generate_address_terminated_when_missing_heartbeats() + public async Task A_RemoteWatcher_must_generate_address_terminated_when_missing_heartbeats() { var p = CreateTestProbe(); var q = CreateTestProbe(); @@ -280,40 +267,40 @@ public void A_RemoteWatcher_must_generate_address_terminated_when_missing_heartb Sys.EventStream.Subscribe(q.Ref, typeof(TestRemoteWatcher.Quarantined)); var monitorA = Sys.ActorOf(Props.Create(), "monitor4"); - var monitorB = CreateRemoteActor(new Props(new Deploy(), typeof(TestActorProxy), new[] { TestActor }), "monitor4"); + var monitorB = await CreateRemoteActor(new Props(new Deploy(), typeof(TestActorProxy), TestActor), "monitor4"); var a = Sys.ActorOf(Props.Create(), "a4").AsInstanceOf(); - var b = CreateRemoteActor(Props.Create(), "b4"); + var b = await CreateRemoteActor(Props.Create(), "b4"); monitorA.Tell(new RemoteWatcher.WatchRemote(b, a)); monitorA.Tell(RemoteWatcher.HeartbeatTick.Instance, TestActor); - ExpectMsg(); + await ExpectMsgAsync(); monitorA.Tell(_heartbeatRspB, monitorB); - ExpectNoMsg(TimeSpan.FromSeconds(1)); + await ExpectNoMsgAsync(TimeSpan.FromSeconds(1)); monitorA.Tell(RemoteWatcher.HeartbeatTick.Instance, TestActor); - ExpectMsg(); + await ExpectMsgAsync(); monitorA.Tell(_heartbeatRspB, monitorB); - Within(TimeSpan.FromSeconds(10), () => + await WithinAsync(10.Seconds(), async () => { - AwaitAssert(() => + await AwaitAssertAsync(async () => { monitorA.Tell(RemoteWatcher.HeartbeatTick.Instance, TestActor); - ExpectMsg(); + await ExpectMsgAsync(); //but no HeartbeatRsp monitorA.Tell(RemoteWatcher.ReapUnreachableTick.Instance); - p.ExpectMsg(new TestRemoteWatcher.AddressTerm(b.Path.Address), TimeSpan.FromSeconds(1)); - q.ExpectMsg(new TestRemoteWatcher.Quarantined(b.Path.Address, RemoteAddressUid), TimeSpan.FromSeconds(1)); + await p.ExpectMsgAsync(new TestRemoteWatcher.AddressTerm(b.Path.Address), TimeSpan.FromSeconds(1)); + await q.ExpectMsgAsync(new TestRemoteWatcher.Quarantined(b.Path.Address, RemoteAddressUid), TimeSpan.FromSeconds(1)); }); return true; }); - ExpectNoMsg(TimeSpan.FromSeconds(2)); + await ExpectNoMsgAsync(TimeSpan.FromSeconds(2)); } [Fact] - public void A_RemoteWatcher_must_generate_address_terminated_when_missing_first_heartbeat() + public async Task A_RemoteWatcher_must_generate_address_terminated_when_missing_first_heartbeat() { var p = CreateTestProbe(); var q = CreateTestProbe(); @@ -322,38 +309,38 @@ public void A_RemoteWatcher_must_generate_address_terminated_when_missing_first_ var fd = CreateFailureDetectorRegistry(); var heartbeatExpectedResponseAfter = TimeSpan.FromSeconds(2); - var monitorA = Sys.ActorOf(new Props(new Deploy(), typeof(TestRemoteWatcher), new object[] {heartbeatExpectedResponseAfter}), "monitor5"); - var monitorB = CreateRemoteActor(new Props(new Deploy(), typeof(TestActorProxy), new[] { TestActor }), "monitor5"); + var monitorA = Sys.ActorOf(new Props(new Deploy(), typeof(TestRemoteWatcher), heartbeatExpectedResponseAfter), "monitor5"); + var monitorB = await CreateRemoteActor(new Props(new Deploy(), typeof(TestActorProxy), TestActor), "monitor5"); var a = Sys.ActorOf(Props.Create(), "a5").AsInstanceOf(); - var b = CreateRemoteActor(Props.Create(), "b5"); + var b = await CreateRemoteActor(Props.Create(), "b5"); monitorA.Tell(new RemoteWatcher.WatchRemote(b, a)); monitorA.Tell(RemoteWatcher.HeartbeatTick.Instance, TestActor); - ExpectMsg(); + await ExpectMsgAsync(); // no HeartbeatRsp sent - Within(TimeSpan.FromSeconds(20), () => + await WithinAsync(20.Seconds(), async () => { - AwaitAssert(() => + await AwaitAssertAsync(async () => { monitorA.Tell(RemoteWatcher.HeartbeatTick.Instance, TestActor); - ExpectMsg(); + await ExpectMsgAsync(); //but no HeartbeatRsp monitorA.Tell(RemoteWatcher.ReapUnreachableTick.Instance); - p.ExpectMsg(new TestRemoteWatcher.AddressTerm(b.Path.Address), TimeSpan.FromSeconds(1)); + await p.ExpectMsgAsync(new TestRemoteWatcher.AddressTerm(b.Path.Address), TimeSpan.FromSeconds(1)); // no real quarantine when missing first heartbeat, uid unknown - q.ExpectMsg(new TestRemoteWatcher.Quarantined(b.Path.Address, null), TimeSpan.FromSeconds(1)); + await q.ExpectMsgAsync(new TestRemoteWatcher.Quarantined(b.Path.Address, null), TimeSpan.FromSeconds(1)); }); return true; }); - ExpectNoMsg(TimeSpan.FromSeconds(2)); + await ExpectNoMsgAsync(TimeSpan.FromSeconds(2)); } [Fact] - public void + public async Task A_RemoteWatcher_must_generate_address_terminated_for_new_watch_after_broken_connection_was_reestablished_and_broken_again() { var p = CreateTestProbe(); @@ -362,85 +349,86 @@ public void Sys.EventStream.Subscribe(q.Ref, typeof(TestRemoteWatcher.Quarantined)); var monitorA = Sys.ActorOf(Props.Create(), "monitor6"); - var monitorB = CreateRemoteActor(new Props(new Deploy(), typeof(TestActorProxy), new[] { TestActor }), "monitor6"); + var monitorB = await CreateRemoteActor(new Props(new Deploy(), typeof(TestActorProxy), new[] { TestActor }), "monitor6"); var a = Sys.ActorOf(Props.Create(), "a6").AsInstanceOf(); - var b = CreateRemoteActor(Props.Create(), "b6"); + var b = await CreateRemoteActor(Props.Create(), "b6"); monitorA.Tell(new RemoteWatcher.WatchRemote(b, a)); monitorA.Tell(RemoteWatcher.HeartbeatTick.Instance, TestActor); - ExpectMsg(); + await ExpectMsgAsync(); monitorA.Tell(_heartbeatRspB, monitorB); - ExpectNoMsg(TimeSpan.FromSeconds(1)); + await ExpectNoMsgAsync(TimeSpan.FromSeconds(1)); monitorA.Tell(RemoteWatcher.HeartbeatTick.Instance, TestActor); - ExpectMsg(); + await ExpectMsgAsync(); monitorA.Tell(_heartbeatRspB, monitorB); - Within(TimeSpan.FromSeconds(10), () => + await WithinAsync(10.Seconds(), async () => { - AwaitAssert(() => + await AwaitAssertAsync(async () => { monitorA.Tell(RemoteWatcher.HeartbeatTick.Instance, TestActor); - ExpectMsg(); + await ExpectMsgAsync(); //but no HeartbeatRsp monitorA.Tell(RemoteWatcher.ReapUnreachableTick.Instance); - p.ExpectMsg(new TestRemoteWatcher.AddressTerm(b.Path.Address), TimeSpan.FromSeconds(1)); - q.ExpectMsg(new TestRemoteWatcher.Quarantined(b.Path.Address, RemoteAddressUid), TimeSpan.FromSeconds(1)); + await p.ExpectMsgAsync(new TestRemoteWatcher.AddressTerm(b.Path.Address), TimeSpan.FromSeconds(1)); + await q.ExpectMsgAsync(new TestRemoteWatcher.Quarantined(b.Path.Address, RemoteAddressUid), + TimeSpan.FromSeconds(1)); }); return true; }); //real AddressTerminated would trigger Terminated for b6, simulate that here _remoteSystem.Stop(b); - AwaitAssert(() => + await AwaitAssertAsync(async () => { monitorA.Tell(RemoteWatcher.Stats.Empty, TestActor); - ExpectMsg(RemoteWatcher.Stats.Empty); + await ExpectMsgAsync(RemoteWatcher.Stats.Empty); }); - ExpectNoMsg(TimeSpan.FromSeconds(2)); + await ExpectNoMsgAsync(TimeSpan.FromSeconds(2)); //assume that connection comes up again, or remote system is restarted - var c = CreateRemoteActor(Props.Create(), "c6"); + var c = await CreateRemoteActor(Props.Create(), "c6"); monitorA.Tell(new RemoteWatcher.WatchRemote(c,a)); monitorA.Tell(RemoteWatcher.HeartbeatTick.Instance, TestActor); - ExpectMsg(); + await ExpectMsgAsync(); monitorA.Tell(_heartbeatRspB, monitorB); - ExpectNoMsg(TimeSpan.FromSeconds(1)); + await ExpectNoMsgAsync(TimeSpan.FromSeconds(1)); monitorA.Tell(RemoteWatcher.HeartbeatTick.Instance, TestActor); - ExpectMsg(); + await ExpectMsgAsync(); monitorA.Tell(_heartbeatRspB, monitorB); monitorA.Tell(RemoteWatcher.HeartbeatTick.Instance, TestActor); - ExpectMsg(); + await ExpectMsgAsync(); monitorA.Tell(RemoteWatcher.ReapUnreachableTick.Instance, TestActor); - p.ExpectNoMsg(TimeSpan.FromSeconds(1)); + await p.ExpectNoMsgAsync(TimeSpan.FromSeconds(1)); monitorA.Tell(RemoteWatcher.HeartbeatTick.Instance, TestActor); - ExpectMsg(); + await ExpectMsgAsync(); monitorA.Tell(_heartbeatRspB, monitorB); monitorA.Tell(RemoteWatcher.HeartbeatTick.Instance, TestActor); - ExpectMsg(); + await ExpectMsgAsync(); monitorA.Tell(RemoteWatcher.ReapUnreachableTick.Instance, TestActor); - p.ExpectNoMsg(TimeSpan.FromSeconds(1)); - q.ExpectNoMsg(TimeSpan.FromSeconds(1)); + await p.ExpectNoMsgAsync(TimeSpan.FromSeconds(1)); + await q.ExpectNoMsgAsync(TimeSpan.FromSeconds(1)); //then stop heartbeating again; should generate a new AddressTerminated - Within(TimeSpan.FromSeconds(10), () => + await WithinAsync(10.Seconds(), async () => { - AwaitAssert(() => + await AwaitAssertAsync(async () => { monitorA.Tell(RemoteWatcher.HeartbeatTick.Instance, TestActor); - ExpectMsg(); + await ExpectMsgAsync(); //but no HeartbeatRsp monitorA.Tell(RemoteWatcher.ReapUnreachableTick.Instance); - p.ExpectMsg(new TestRemoteWatcher.AddressTerm(b.Path.Address), TimeSpan.FromSeconds(1)); - q.ExpectMsg(new TestRemoteWatcher.Quarantined(b.Path.Address, RemoteAddressUid), TimeSpan.FromSeconds(1)); + await p.ExpectMsgAsync(new TestRemoteWatcher.AddressTerm(b.Path.Address), TimeSpan.FromSeconds(1)); + await q.ExpectMsgAsync(new TestRemoteWatcher.Quarantined(b.Path.Address, RemoteAddressUid), TimeSpan.FromSeconds(1)); }); return true; }); //make sure nothing floods over to next test - ExpectNoMsg(TimeSpan.FromSeconds(2)); + await ExpectNoMsgAsync(TimeSpan.FromSeconds(2)); } }