From c86f774d2c58f74f7fc83dccde89acc4824341a3 Mon Sep 17 00:00:00 2001 From: Carl Camilleri Date: Sun, 29 Aug 2021 10:21:14 +0200 Subject: [PATCH 1/2] Benchmark to test SingleRequestResponseToRemoteEntity with a local proxy --- .../Akka.Cluster.Benchmarks/Program.cs | 6 ++ .../Sharding/ShardMessageRoutingBenchmarks.cs | 61 +++++++++++++++- .../Sharding/ShardingInfrastructure.cs | 73 +++++++++++++++++++ 3 files changed, 138 insertions(+), 2 deletions(-) diff --git a/src/benchmark/Akka.Cluster.Benchmarks/Program.cs b/src/benchmark/Akka.Cluster.Benchmarks/Program.cs index 767f1c1a1d4..df08f62bf68 100644 --- a/src/benchmark/Akka.Cluster.Benchmarks/Program.cs +++ b/src/benchmark/Akka.Cluster.Benchmarks/Program.cs @@ -1,5 +1,6 @@ using System; using System.Reflection; +using BenchmarkDotNet.Configs; using BenchmarkDotNet.Running; namespace Akka.Cluster.Benchmarks @@ -8,7 +9,12 @@ class Program { static void Main(string[] args) { +#if (DEBUG) + BenchmarkSwitcher.FromAssembly(Assembly.GetExecutingAssembly()).Run(args, new DebugInProcessConfig()); +#else BenchmarkSwitcher.FromAssembly(Assembly.GetExecutingAssembly()).Run(args); +#endif + } } } \ No newline at end of file diff --git a/src/benchmark/Akka.Cluster.Benchmarks/Sharding/ShardMessageRoutingBenchmarks.cs b/src/benchmark/Akka.Cluster.Benchmarks/Sharding/ShardMessageRoutingBenchmarks.cs index 8237f350d97..11a195b2bdf 100644 --- a/src/benchmark/Akka.Cluster.Benchmarks/Sharding/ShardMessageRoutingBenchmarks.cs +++ b/src/benchmark/Akka.Cluster.Benchmarks/Sharding/ShardMessageRoutingBenchmarks.cs @@ -12,6 +12,7 @@ using Akka.Actor; using Akka.Benchmarks.Configurations; using Akka.Cluster.Sharding; +using Akka.Routing; using BenchmarkDotNet.Attributes; using static Akka.Cluster.Benchmarks.Sharding.ShardingHelper; @@ -33,6 +34,7 @@ public class ShardMessageRoutingBenchmarks private IActorRef _shardRegion1; private IActorRef _shardRegion2; + private IActorRef _localRouter; private string _entityOnSys1; private string _entityOnSys2; @@ -43,6 +45,50 @@ public class ShardMessageRoutingBenchmarks private IActorRef _batchActor; private Task _batchComplete; +#if (DEBUG) + [GlobalSetup] + public void Setup() + { + var config = StateMode switch + { + StateStoreMode.Persistence => CreatePersistenceConfig(), + StateStoreMode.DData => CreateDDataConfig(), + _ => null + }; + + _sys1 = ActorSystem.Create("BenchSys", config); + _sys2 = ActorSystem.Create("BenchSys", config); + + var c1 = Cluster.Get(_sys1); + var c2 = Cluster.Get(_sys2); + + c1.JoinAsync(c1.SelfAddress).Wait(); + c2.JoinAsync(c1.SelfAddress).Wait(); + + _shardRegion1 = StartShardRegion(_sys1); + _shardRegion2 = StartShardRegion(_sys2); + + _localRouter = _sys1.ActorOf(Props.Create(_shardRegion1).WithRouter(new RoundRobinPool(50))); + + var s1Asks = new List>(20); + var s2Asks = new List>(20); + + foreach (var i in Enumerable.Range(0, 20)) + { + s1Asks.Add(_shardRegion1.Ask(new ShardingEnvelope(i.ToString(), ShardedEntityActor.Resolve.Instance), TimeSpan.FromSeconds(3))); + s2Asks.Add(_shardRegion2.Ask(new ShardingEnvelope(i.ToString(), ShardedEntityActor.Resolve.Instance), TimeSpan.FromSeconds(3))); + } + + // wait for all Ask operations to complete + Task.WhenAll(s1Asks.Concat(s2Asks)).Wait(); + + _entityOnSys2 = s1Asks.First(x => x.Result.Addr.Equals(c2.SelfAddress)).Result.EntityId; + _entityOnSys1 = s2Asks.First(x => x.Result.Addr.Equals(c1.SelfAddress)).Result.EntityId; + + _messageToSys1 = new ShardedMessage(_entityOnSys1, 10); + _messageToSys2 = new ShardedMessage(_entityOnSys2, 10); + } +#else [GlobalSetup] public async Task Setup() { @@ -65,6 +111,8 @@ public async Task Setup() _shardRegion1 = StartShardRegion(_sys1); _shardRegion2 = StartShardRegion(_sys2); + _localRouter = _sys1.ActorOf(Props.Create(_shardRegion1).WithRouter(new RoundRobinPool(1000))); + var s1Asks = new List>(20); var s2Asks = new List>(20); @@ -83,6 +131,7 @@ public async Task Setup() _messageToSys1 = new ShardedMessage(_entityOnSys1, 10); _messageToSys2 = new ShardedMessage(_entityOnSys2, 10); } +#endif [IterationSetup] public void PerIteration() @@ -92,7 +141,7 @@ public void PerIteration() _batchActor = _sys1.ActorOf(Props.Create(() => new BulkSendActor(tcs, MsgCount))); } - [Benchmark] + // [Benchmark] public async Task SingleRequestResponseToLocalEntity() { for (var i = 0; i < MsgCount; i++) @@ -112,7 +161,15 @@ public async Task SingleRequestResponseToRemoteEntity() for (var i = 0; i < MsgCount; i++) await _shardRegion1.Ask(_messageToSys2); } - + + + [Benchmark] + public async Task SingleRequestResponseToRemoteEntityWithLocalProxy() + { + for (var i = 0; i < MsgCount; i++) + await _localRouter.Ask(new SendShardedMessage(_messageToSys2.EntityId, _messageToSys2)); + } + [Benchmark] public async Task StreamingToRemoteEntity() { diff --git a/src/benchmark/Akka.Cluster.Benchmarks/Sharding/ShardingInfrastructure.cs b/src/benchmark/Akka.Cluster.Benchmarks/Sharding/ShardingInfrastructure.cs index 300c393a193..e016ea70052 100644 --- a/src/benchmark/Akka.Cluster.Benchmarks/Sharding/ShardingInfrastructure.cs +++ b/src/benchmark/Akka.Cluster.Benchmarks/Sharding/ShardingInfrastructure.cs @@ -45,6 +45,63 @@ public ShardedEntityActor() } } + + + public sealed class ShardedProxyEntityActor : ReceiveActor, IWithUnboundedStash + { + private IActorRef _shardRegion; + private IActorRef _sender; + + + + + + public ShardedProxyEntityActor(IActorRef shardRegion) + { + _shardRegion = shardRegion; + WaitRequest(); + } + + public void WaitRequest() + { + + Receive(e => + { + _sender = Sender; + _shardRegion.Tell(e.Message); + Become(WaitResult); + + }); + + ReceiveAny(x => { + Sender.Tell(x); + }); + + + } + + + public void WaitResult() + { + Receive((msg) => { + _sender.Tell(msg); + Stash.UnstashAll(); + Become(WaitRequest); + }); + + ReceiveAny(msg => + Stash.Stash() + ); + + + } + + public IStash Stash { get; set; } + + } + + + public sealed class BulkSendActor : ReceiveActor { public sealed class BeginSend @@ -108,6 +165,22 @@ public ShardedMessage(string entityId, int message) public int Message { get; } } + + + public sealed class SendShardedMessage + { + public SendShardedMessage(string entityId, ShardedMessage message) + { + EntityId = entityId; + Message = message; + } + + public string EntityId { get; } + + public ShardedMessage Message { get; } + } + + /// /// Use a default even though it takes extra work to setup the benchmark /// From ed853814759d5d53fc6cade7e99a9c06a5396b2c Mon Sep 17 00:00:00 2001 From: Carl Camilleri Date: Sun, 29 Aug 2021 10:30:03 +0200 Subject: [PATCH 2/2] re-enable benchmark SingleRequestResponseToLocalEntity --- .../Sharding/ShardMessageRoutingBenchmarks.cs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/benchmark/Akka.Cluster.Benchmarks/Sharding/ShardMessageRoutingBenchmarks.cs b/src/benchmark/Akka.Cluster.Benchmarks/Sharding/ShardMessageRoutingBenchmarks.cs index 11a195b2bdf..c2fc8d0ead0 100644 --- a/src/benchmark/Akka.Cluster.Benchmarks/Sharding/ShardMessageRoutingBenchmarks.cs +++ b/src/benchmark/Akka.Cluster.Benchmarks/Sharding/ShardMessageRoutingBenchmarks.cs @@ -141,20 +141,20 @@ public void PerIteration() _batchActor = _sys1.ActorOf(Props.Create(() => new BulkSendActor(tcs, MsgCount))); } - // [Benchmark] + [Benchmark] public async Task SingleRequestResponseToLocalEntity() { for (var i = 0; i < MsgCount; i++) await _shardRegion1.Ask(_messageToSys1); } - + [Benchmark] public async Task StreamingToLocalEntity() { _batchActor.Tell(new BulkSendActor.BeginSend(_messageToSys1, _shardRegion1, BatchSize)); await _batchComplete; } - + [Benchmark] public async Task SingleRequestResponseToRemoteEntity() {