Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Async TestKit] Convert Akka.Remote.Tests to async - RemoteRouterSpec #5887

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/core/Akka.Remote.Tests/Akka.Remote.Tests.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@

<PropertyGroup>
<AssemblyTitle>Akka.Remote.Tests</AssemblyTitle>
<TargetFrameworks>$(NetFrameworkTestVersion);$(NetTestVersion);$(NetCoreTestVersion)</TargetFrameworks>
Aaronontheweb marked this conversation as resolved.
Show resolved Hide resolved
<LangVersion>8.0</LangVersion>
<TargetFrameworks>$(NetTestVersion);$(NetCoreTestVersion)</TargetFrameworks>
<LangVersion>8.0</LangVersion>
</PropertyGroup>

<ItemGroup>
Expand Down
203 changes: 98 additions & 105 deletions src/core/Akka.Remote.Tests/RemoteRouterSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -41,26 +41,22 @@ private class Parent : UntypedActor
{
protected override void OnReceive(object message)
{
var tuple = message as (Props, string)?;
if (tuple != null)
if (message is ValueTuple<Props, string> tuple)
{
Sender.Tell(Context.ActorOf(tuple.Value.Item1, tuple.Value.Item2));
Sender.Tell(Context.ActorOf(tuple.Item1, tuple.Item2));
}
}
}

private Props EchoActorProps
{
get { return Props.Create<Echo>(); }
}
private static Props EchoActorProps => Props.Create<Echo>();

#endregion

private int port;
private string sysName;
private Config conf;
private ActorSystem masterSystem;
private Address intendedRemoteAddress;
private readonly int _port;
private readonly string _sysName;
private readonly Config _conf;
private readonly ActorSystem _masterSystem;
private readonly Address _intendedRemoteAddress;

public RemoteRouterSpec(ITestOutputHelper output)
: base(@"
Expand All @@ -86,9 +82,9 @@ public RemoteRouterSpec(ITestOutputHelper output)
", output)
{
// ReSharper disable once PossibleInvalidOperationException
port = Sys.AsInstanceOf<ExtendedActorSystem>().Provider.DefaultAddress.Port.Value;
sysName = Sys.Name;
conf = ConfigurationFactory.ParseString(@"
_port = Sys.AsInstanceOf<ExtendedActorSystem>().Provider.DefaultAddress.Port.Value;
_sysName = Sys.Name;
_conf = ConfigurationFactory.ParseString(@"
akka {
actor.deployment {
/blub {
Expand Down Expand Up @@ -123,233 +119,230 @@ public RemoteRouterSpec(ITestOutputHelper output)
}
}
}
".Replace("${masterSysName}", "Master" + sysName).Replace("${sysName}", sysName).Replace("${port}", port.ToString())).WithFallback(Sys.Settings.Config);
".Replace("${masterSysName}", "Master" + _sysName).Replace("${sysName}", _sysName).Replace("${port}", _port.ToString())).WithFallback(Sys.Settings.Config);

masterSystem = ActorSystem.Create("Master" + sysName, conf);
_masterSystem = ActorSystem.Create("Master" + _sysName, _conf);

intendedRemoteAddress = Address.Parse("akka.tcp://${sysName}@127.0.0.1:${port}"
.Replace("${sysName}", sysName)
.Replace("${port}", port.ToString()));
_intendedRemoteAddress = Address.Parse("akka.tcp://${sysName}@127.0.0.1:${port}"
.Replace("${sysName}", _sysName)
.Replace("${port}", _port.ToString()));
}

protected override async Task AfterTerminationAsync()
protected override async Task AfterAllAsync()
{
await ShutdownAsync(masterSystem);
await ShutdownAsync(_masterSystem);
await base.AfterAllAsync();
}

private IEnumerable<ActorPath> CollectRouteePaths(TestProbe probe, IActorRef router, int n)
private async IAsyncEnumerable<ActorPath> CollectRouteePaths(TestProbe probe, IActorRef router, int n)
{
List<ActorPath> list = new List<ActorPath>();

for (var i = 1; i <= n; i++)
{
string msg = i.ToString();
var msg = i.ToString();
router.Tell(msg, probe.Ref);
probe.ExpectMsg(msg);
list.Add(probe.LastSender.Path);
await probe.ExpectMsgAsync(msg);
yield return probe.LastSender.Path;
}

return list;
}

[Fact]
public void RemoteRouter_must_deploy_its_children_on_remote_host_driven_by_configuration()
public async Task RemoteRouter_must_deploy_its_children_on_remote_host_driven_by_configuration()
{
var probe = CreateTestProbe(masterSystem);
var router = masterSystem.ActorOf(new RoundRobinPool(2).Props(EchoActorProps), "blub");
var replies = CollectRouteePaths(probe, router, 5);
var probe = CreateTestProbe(_masterSystem);
var router = _masterSystem.ActorOf(new RoundRobinPool(2).Props(EchoActorProps), "blub");
var replies = await CollectRouteePaths(probe, router, 5).ToListAsync();
var children = new HashSet<ActorPath>(replies);
children.Should().HaveCount(2);
children.Select(x => x.Parent).Distinct().Should().HaveCount(1);
children.ForEach(x => x.Address.Should().Be(intendedRemoteAddress));
masterSystem.Stop(router);
children.ForEach(x => x.Address.Should().Be(_intendedRemoteAddress));
_masterSystem.Stop(router);
}

[Fact]
public void RemoteRouter_must_deploy_its_children_on_remote_host_driven_by_programmatic_definition()
public async Task RemoteRouter_must_deploy_its_children_on_remote_host_driven_by_programmatic_definition()
{
var probe = CreateTestProbe(masterSystem);
var router = masterSystem.ActorOf(new RemoteRouterConfig(
var probe = CreateTestProbe(_masterSystem);
var router = _masterSystem.ActorOf(new RemoteRouterConfig(
new RoundRobinPool(2),
new[] { new Address("akka.tcp", sysName, "127.0.0.1", port) })
new[] { new Address("akka.tcp", _sysName, "127.0.0.1", _port) })
.Props(EchoActorProps), "blub2");
var replies = CollectRouteePaths(probe, router, 5);
var replies = await CollectRouteePaths(probe, router, 5).ToListAsync();
var children = new HashSet<ActorPath>(replies);
children.Should().HaveCount(2);
children.Select(x => x.Parent).Distinct().Should().HaveCount(1);
children.ForEach(x => x.Address.Should().Be(intendedRemoteAddress));
masterSystem.Stop(router);
children.ForEach(x => x.Address.Should().Be(_intendedRemoteAddress));
_masterSystem.Stop(router);
}

[Fact]
public void RemoteRouter_must_deploy_dynamic_resizable_number_of_children_on_remote_host_driven_by_configuration()
public async Task RemoteRouter_must_deploy_dynamic_resizable_number_of_children_on_remote_host_driven_by_configuration()
{
var probe = CreateTestProbe(masterSystem);
var router = masterSystem.ActorOf(FromConfig.Instance.Props(EchoActorProps), "elastic-blub");
var replies = CollectRouteePaths(probe, router, 5);
var probe = CreateTestProbe(_masterSystem);
var router = _masterSystem.ActorOf(FromConfig.Instance.Props(EchoActorProps), "elastic-blub");
var replies = await CollectRouteePaths(probe, router, 5).ToListAsync();
var children = new HashSet<ActorPath>(replies);
children.Should().HaveCount(2);
children.Select(x => x.Parent).Distinct().Should().HaveCount(1);
children.ForEach(x => x.Address.Should().Be(intendedRemoteAddress));
masterSystem.Stop(router);
children.ForEach(x => x.Address.Should().Be(_intendedRemoteAddress));
_masterSystem.Stop(router);
}

[Fact]
public void RemoteRouter_must_deploy_remote_routers_based_on_configuration()
public async Task RemoteRouter_must_deploy_remote_routers_based_on_configuration()
{
var probe = CreateTestProbe(masterSystem);
var router = masterSystem.ActorOf(FromConfig.Instance.Props(EchoActorProps), "remote-blub");
router.Path.Address.Should().Be(intendedRemoteAddress);
var probe = CreateTestProbe(_masterSystem);
var router = _masterSystem.ActorOf(FromConfig.Instance.Props(EchoActorProps), "remote-blub");
router.Path.Address.Should().Be(_intendedRemoteAddress);

var replies = CollectRouteePaths(probe, router, 5);
var replies = await CollectRouteePaths(probe, router, 5).ToListAsync();
var children = new HashSet<ActorPath>(replies);
children.Should().HaveCount(2);

var parents = children.Select(x => x.Parent).Distinct().ToList();
parents.Should().HaveCount(1);
parents.Head().Should().Be(router.Path);

children.ForEach(x => x.Address.Should().Be(intendedRemoteAddress));
masterSystem.Stop(router);
children.ForEach(x => x.Address.Should().Be(_intendedRemoteAddress));
_masterSystem.Stop(router);
}

[Fact]
public void RemoteRouter_must_deploy_remote_routers_based_on_explicit_deployment()
public async Task RemoteRouter_must_deploy_remote_routers_based_on_explicit_deployment()
{
var probe = CreateTestProbe(masterSystem);
var router = masterSystem.ActorOf(new RoundRobinPool(2)
var probe = CreateTestProbe(_masterSystem);
var router = _masterSystem.ActorOf(new RoundRobinPool(2)
.Props(EchoActorProps)
.WithDeploy(new Deploy(new RemoteScope(intendedRemoteAddress))), "remote-blub2");
.WithDeploy(new Deploy(new RemoteScope(_intendedRemoteAddress))), "remote-blub2");

router.Path.Address.Should().Be(intendedRemoteAddress);
router.Path.Address.Should().Be(_intendedRemoteAddress);

var replies = CollectRouteePaths(probe, router, 5);
var replies = await CollectRouteePaths(probe, router, 5).ToListAsync();
var children = new HashSet<ActorPath>(replies);
children.Should().HaveCount(2);

var parents = children.Select(x => x.Parent).Distinct().ToList();
parents.Should().HaveCount(1);
parents.Head().Should().Be(router.Path);

children.ForEach(x => x.Address.Should().Be(intendedRemoteAddress));
masterSystem.Stop(router);
children.ForEach(x => x.Address.Should().Be(_intendedRemoteAddress));
_masterSystem.Stop(router);
}

[Fact]
public void RemoteRouter_must_let_remote_deployment_be_overridden_by_local_configuration()
public async Task RemoteRouter_must_let_remote_deployment_be_overridden_by_local_configuration()
{
var probe = CreateTestProbe(masterSystem);
var router = masterSystem.ActorOf(
var probe = CreateTestProbe(_masterSystem);
var router = _masterSystem.ActorOf(
new RoundRobinPool(2)
.Props(EchoActorProps)
.WithDeploy(new Deploy(new RemoteScope(intendedRemoteAddress))), "local-blub");
router.Path.Address.ToString().Should().Be($"akka://{masterSystem.Name}");
.WithDeploy(new Deploy(new RemoteScope(_intendedRemoteAddress))), "local-blub");
router.Path.Address.ToString().Should().Be($"akka://{_masterSystem.Name}");

var replies = CollectRouteePaths(probe, router, 5);
var replies = await CollectRouteePaths(probe, router, 5).ToListAsync();
var children = new HashSet<ActorPath>(replies);
children.Should().HaveCount(2);

var parents = children.Select(x => x.Parent).Distinct().ToList();
parents.Should().HaveCount(1);
parents.Head().Address.Should().Be(new Address("akka.tcp", sysName, "127.0.0.1", port));
parents.Head().Address.Should().Be(new Address("akka.tcp", _sysName, "127.0.0.1", _port));

children.ForEach(x => x.Address.Should().Be(intendedRemoteAddress));
masterSystem.Stop(router);
children.ForEach(x => x.Address.Should().Be(_intendedRemoteAddress));
_masterSystem.Stop(router);
}

[Fact]
public void RemoteRouter_must_let_remote_deployment_router_be_overridden_by_local_configuration()
public async Task RemoteRouter_must_let_remote_deployment_router_be_overridden_by_local_configuration()
{
var probe = CreateTestProbe(masterSystem);
var router = masterSystem.ActorOf(
var probe = CreateTestProbe(_masterSystem);
var router = _masterSystem.ActorOf(
new RoundRobinPool(2)
.Props(EchoActorProps)
.WithDeploy(new Deploy(new RemoteScope(intendedRemoteAddress))), "local-blub2");
.WithDeploy(new Deploy(new RemoteScope(_intendedRemoteAddress))), "local-blub2");

router.Path.Address.Should().Be(intendedRemoteAddress);
router.Path.Address.Should().Be(_intendedRemoteAddress);

var replies = CollectRouteePaths(probe, router, 5);
var replies = await CollectRouteePaths(probe, router, 5).ToListAsync();
var children = new HashSet<ActorPath>(replies);
children.Should().HaveCount(4);

var parents = children.Select(x => x.Parent).Distinct().ToList();
parents.Should().HaveCount(1);
parents.Head().Address.Should().Be(new Address("akka.tcp", sysName, "127.0.0.1", port));
parents.Head().Address.Should().Be(new Address("akka.tcp", _sysName, "127.0.0.1", _port));

children.ForEach(x => x.Address.Should().Be(intendedRemoteAddress));
masterSystem.Stop(router);
children.ForEach(x => x.Address.Should().Be(_intendedRemoteAddress));
_masterSystem.Stop(router);
}

[Fact]
public void RemoteRouter_must_let_remote_deployment_be_overridden_by_remote_configuration()
public async Task RemoteRouter_must_let_remote_deployment_be_overridden_by_remote_configuration()
{
var probe = CreateTestProbe(masterSystem);
var router = masterSystem.ActorOf(
var probe = CreateTestProbe(_masterSystem);
var router = _masterSystem.ActorOf(
new RoundRobinPool(2)
.Props(EchoActorProps)
.WithDeploy(new Deploy(new RemoteScope(intendedRemoteAddress))), "remote-override");
.WithDeploy(new Deploy(new RemoteScope(_intendedRemoteAddress))), "remote-override");

router.Path.Address.Should().Be(intendedRemoteAddress);
router.Path.Address.Should().Be(_intendedRemoteAddress);

var replies = CollectRouteePaths(probe, router, 5);
var replies = await CollectRouteePaths(probe, router, 5).ToListAsync();
var children = new HashSet<ActorPath>(replies);
children.Should().HaveCount(4);

var parents = children.Select(x => x.Parent).Distinct().ToList();
parents.Should().HaveCount(1);
parents.Head().Address.Should().Be(router.Path.Address);

children.ForEach(x => x.Address.Should().Be(intendedRemoteAddress));
masterSystem.Stop(router);
children.ForEach(x => x.Address.Should().Be(_intendedRemoteAddress));
_masterSystem.Stop(router);
}

[Fact]
public async Task RemoteRouter_must_set_supplied_SupervisorStrategy()
{
var probe = CreateTestProbe(masterSystem);
var probe = CreateTestProbe(_masterSystem);
var escalator = new OneForOneStrategy(ex =>
{
probe.Ref.Tell(ex);
return Directive.Escalate;
});

var router = masterSystem.ActorOf(new RemoteRouterConfig(
var router = _masterSystem.ActorOf(new RemoteRouterConfig(
new RoundRobinPool(1, null, escalator, Dispatchers.DefaultDispatcherId),
new[] { new Address("akka.tcp", sysName, "127.0.0.1", port) }).Props(Props.Empty), "blub3");
new[] { new Address("akka.tcp", _sysName, "127.0.0.1", _port) }).Props(Props.Empty), "blub3");

router.Tell(new GetRoutees(), probe.Ref);

// Need to be able to bind EventFilter to additional actor system (masterActorSystem in this case) before this code works
// EventFilter.Exception<ActorKilledException>().ExpectOne(() =>
probe.ExpectMsg<Routees>(TimeSpan.FromSeconds(10)).Members.Head().Send(Kill.Instance, TestActor);
(await probe.ExpectMsgAsync<Routees>(TimeSpan.FromSeconds(10))).Members.Head().Send(Kill.Instance, TestActor);
//);
probe.ExpectMsg<ActorKilledException>(TimeSpan.FromSeconds(10));
await probe.ExpectMsgAsync<ActorKilledException>(TimeSpan.FromSeconds(10));
}

[Fact(Skip = "Remote actor's DCN is currently not supported")]
public void RemoteRouter_must_load_settings_from_config_for_local_router()
public async Task RemoteRouter_must_load_settings_from_config_for_local_router()
{
var probe = CreateTestProbe(masterSystem);
var router = masterSystem.ActorOf(FromConfig.Instance.Props(EchoActorProps), "round");
var replies = CollectRouteePaths(probe, router, 10);
var probe = CreateTestProbe(_masterSystem);
var router = _masterSystem.ActorOf(FromConfig.Instance.Props(EchoActorProps), "round");
var replies = await CollectRouteePaths(probe, router, 10).ToListAsync();
var children = new HashSet<ActorPath>(replies);
children.Should().HaveCount(5);
masterSystem.Stop(router);
_masterSystem.Stop(router);
}

[Fact(Skip = "Remote actor's DCN is currently not supported")]
public void RemoteRouter_must_load_settings_from_config_for_local_child_router_of_system_actor()
public async Task RemoteRouter_must_load_settings_from_config_for_local_child_router_of_system_actor()
{
// we don't really support deployment configuration of system actors, but
// it's used for the pool of the SimpleDnsManager "/IO-DNS/inet-address"
var probe = CreateTestProbe(masterSystem);
var parent = ((ExtendedActorSystem)masterSystem).SystemActorOf(FromConfig.Instance.Props(Props.Create<Parent>()), "sys-parent");
var probe = CreateTestProbe(_masterSystem);
var parent = ((ExtendedActorSystem)_masterSystem).SystemActorOf(FromConfig.Instance.Props(Props.Create<Parent>()), "sys-parent");
parent.Tell((FromConfig.Instance.Props(EchoActorProps), "round"), probe);
var router = probe.ExpectMsg<IActorRef>();
var replies = CollectRouteePaths(probe, router, 10);
var replies = await CollectRouteePaths(probe, router, 10).ToListAsync();
var children = new HashSet<ActorPath>(replies);
children.Should().HaveCount(6);
masterSystem.Stop(router);
_masterSystem.Stop(router);
}
}
}
Expand Down