diff --git a/docs/articles/actors/dispatchers.md b/docs/articles/actors/dispatchers.md index 73ee10f96b9..e7d2bbebb65 100644 --- a/docs/articles/actors/dispatchers.md +++ b/docs/articles/actors/dispatchers.md @@ -74,6 +74,7 @@ Some dispatcher configurations are available out-of-the-box for convenience. You * **task-dispatcher** - A configuration that uses the [TaskDispatcher](#taskdispatcher). * **default-fork-join-dispatcher** - A configuration that uses the [ForkJoinDispatcher](#forkjoindispatcher). * **synchronized-dispatcher** - A configuration that uses the [SynchronizedDispatcher](#synchronizeddispatcher). +* **channel-executor** - new as of v1.4.19, the [`ChannelExecutor`](#channelexecutor) is used to run on top of the .NET `ThreadPool` and allow Akka.NET to dynamically scale thread usage up and down with demand in exchange for better CPU and throughput performance. ## Built-in Dispatchers @@ -165,6 +166,63 @@ private void Form1_Load(object sender, System.EventArgs e) } ``` +### `ChannelExecutor` +In Akka.NET v1.4.19 we will be introducing an opt-in feature, the `ChannelExecutor` - a new dispatcher type that re-uses the same configuration as a `ForkJoinDispatcher` but runs entirely on top of the .NET `ThreadPool` and is able to take advantage of dynamic thread pool scaling to size / resize workloads on the fly. + +During its initial development and benchmarks, we observed the following: + +1. The `ChannelExecutor` tremendously reduced idle CPU and max busy CPU even during peak message throughput, primarily as a result of dynamically shrinking the total `ThreadPool` to only the necessary size. This resolves one of the largest complaints large users of Akka.NET have today. However, **in order for this setting to be effective `ThreadPool.SetMin(0,0)` must also be set**. We are considering doing this inside the `ActorSystem.Create` method, but if you those settings don't work for you you can easily override them by simply calling `ThreadPool.SetMin(yourValue, yourValue)` again after `ActorSystem.Create` has exited. +2. The `ChannelExecutor` actually beat the `ForkJoinDispatcher` and others on performance even in environments like Docker and bare metal on Windows. + +> [!NOTE] +> We are in the process of gathering data from users on how well `ChannelExecutor` performs in the real world. If you are interested in trying out the `ChannelExecutor`, please read the directions in this document and then comment on [the "Akka.NET v1.4.19: ChannelExecutor performance data" discussion thread](https://github.com/akkadotnet/akka.net/discussions/4983). + +The `ChannelExectuor` re-uses the same threading settings as the `ForkJoinExecutor` to determine its effective upper and lower parallelism limits, and you can configure the `ChannelExecutor` to run inside your `ActorSystem` via the following HOCON configuration: + +``` +akka.actor.default-dispatcher = { + executor = channel-executor + fork-join-executor { #channelexecutor will re-use these settings + parallelism-min = 2 + parallelism-factor = 1 + parallelism-max = 64 + } +} + +akka.actor.internal-dispatcher = { + executor = channel-executor + throughput = 5 + fork-join-executor { + parallelism-min = 4 + parallelism-factor = 1.0 + parallelism-max = 64 + } +} + +akka.remote.default-remote-dispatcher { + type = Dispatcher + executor = channel-executor + fork-join-executor { + parallelism-min = 2 + parallelism-factor = 0.5 + parallelism-max = 16 + } +} + +akka.remote.backoff-remote-dispatcher { + executor = channel-executor + fork-join-executor { + parallelism-min = 2 + parallelism-max = 2 + } +} +``` + +This will enable the `ChannelExecutor` to run everywhere and all Akka.NET loads, with the exception of anything you manually allocate onto a `ForkJoinDispatcher` or `PinnedDispatcher`, will be managed by the `ThreadPool`. + +> [!IMPORTANT] +> As of Akka.NET v1.4.19, we call `ThreadPool.SetMinThreads(0,0)` inside the `ActorSystem.Create` method as we've found that the default `ThreadPool` minimum values have a negative impact on performance. However, if this causes undesireable side effects for you inside your application you can always override those settings by calling `ThreadPool.SetMinThreads(yourValue, yourValue)` again after you've created your `ActorSystem`. + #### Common Dispatcher Configuration The following configuration keys are available for any dispatcher configuration: diff --git a/docs/articles/clustering/cluster-singleton.md b/docs/articles/clustering/cluster-singleton.md index 7b31fcdd8a6..37c3e60a267 100644 --- a/docs/articles/clustering/cluster-singleton.md +++ b/docs/articles/clustering/cluster-singleton.md @@ -34,9 +34,6 @@ This pattern may seem to be very tempting to use at first, but it has several dr Especially the last point is something you should be aware of — in general when using the Cluster Singleton pattern you should take care of downing nodes yourself and not rely on the timing based auto-down feature. -> [!WARNING] -> Be very careful when using Cluster Singleton together with Automatic Downing, since it allows the cluster to split up into two separate clusters, which in turn will result in `multiple Singletons` being started, one in each separate cluster! - ## An Example Assume that we need one single entry point to an external system. An actor that receives messages from a JMS queue with the strict requirement that only one JMS consumer must exist to be make sure that the messages are processed in order. That is perhaps not how one would like to design things, but a typical real-world scenario when integrating with external systems. @@ -111,4 +108,4 @@ akka.cluster.singleton-proxy { # Maximum allowed buffer size is 10000. buffer-size = 1000 } -``` \ No newline at end of file +``` diff --git a/src/benchmark/Akka.Benchmarks/Actor/PingPongBenchmarks.cs b/src/benchmark/Akka.Benchmarks/Actor/PingPongBenchmarks.cs index 96c8c1ffc0e..3b6ca37a3b8 100644 --- a/src/benchmark/Akka.Benchmarks/Actor/PingPongBenchmarks.cs +++ b/src/benchmark/Akka.Benchmarks/Actor/PingPongBenchmarks.cs @@ -9,6 +9,7 @@ using System.Threading.Tasks; using Akka.Actor; using Akka.Benchmarks.Configurations; +using Akka.Configuration; using BenchmarkDotNet.Attributes; using BenchmarkDotNet.Engines; diff --git a/src/benchmark/RemotePingPong/Program.cs b/src/benchmark/RemotePingPong/Program.cs index f2ff92aac82..969e48315e5 100644 --- a/src/benchmark/RemotePingPong/Program.cs +++ b/src/benchmark/RemotePingPong/Program.cs @@ -44,7 +44,7 @@ public static uint CpuSpeed() public static Config CreateActorSystemConfig(string actorSystemName, string ipOrHostname, int port) { var baseConfig = ConfigurationFactory.ParseString(@" - akka { + akka { actor.provider = remote loglevel = ERROR suppress-json-serializer-warning = on @@ -57,6 +57,7 @@ public static Config CreateActorSystemConfig(string actorSystemName, string ipOr port = 0 hostname = ""localhost"" } + } }"); diff --git a/src/core/Akka.Cluster.Tests.MultiNode/StressSpec.cs b/src/core/Akka.Cluster.Tests.MultiNode/StressSpec.cs index 04c3f332fe7..444dfbf8987 100644 --- a/src/core/Akka.Cluster.Tests.MultiNode/StressSpec.cs +++ b/src/core/Akka.Cluster.Tests.MultiNode/StressSpec.cs @@ -74,6 +74,7 @@ public StressSpecConfig() convergence-within-factor = 1.0 } akka.actor.provider = cluster + akka.cluster { failure-detector.acceptable-heartbeat-pause = 3s downing-provider-class = ""Akka.Cluster.SplitBrainResolver, Akka.Cluster"" @@ -86,10 +87,29 @@ public StressSpecConfig() akka.loggers = [""Akka.TestKit.TestEventListener, Akka.TestKit""] akka.loglevel = INFO akka.remote.log-remote-lifecycle-events = off - akka.actor.default-dispatcher.fork-join-executor { - parallelism - min = 8 - parallelism - max = 8 + akka.actor.default-dispatcher = { + executor = channel-executor + fork-join-executor { + parallelism-min = 2 + parallelism-factor = 1 + parallelism-max = 64 + } } + akka.actor.internal-dispatcher = { + executor = channel-executor + fork-join-executor { + parallelism-min = 2 + parallelism-factor = 1 + parallelism-max = 64 + } + } +akka.remote.default-remote-dispatcher { + executor = channel-executor + fork-join-executor { + parallelism-min = 2 + parallelism-factor = 0.5 + parallelism-max = 16 + } "); TestTransport = true; diff --git a/src/core/Akka.Remote/Configuration/Remote.conf b/src/core/Akka.Remote/Configuration/Remote.conf index 364108afb2c..246a6f1ddca 100644 --- a/src/core/Akka.Remote/Configuration/Remote.conf +++ b/src/core/Akka.Remote/Configuration/Remote.conf @@ -578,23 +578,20 @@ akka { ### Default dispatcher for the remoting subsystem - ### Default dispatcher for the remoting subsystem - default-remote-dispatcher { - type = ForkJoinDispatcher - executor = fork-join-executor - dedicated-thread-pool { - # Fixed number of threads to have in this threadpool - thread-count = 4 + executor = fork-join-executor + fork-join-executor { + parallelism-min = 2 + parallelism-factor = 0.5 + parallelism-max = 16 } } backoff-remote-dispatcher { - type = ForkJoinDispatcher - executor = fork-join-executor - dedicated-thread-pool { - # Fixed number of threads to have in this threadpool - thread-count = 4 + executor = fork-join-executor + fork-join-executor { + parallelism-min = 2 + parallelism-max = 2 } } } diff --git a/src/core/Akka.Tests.Performance/Dispatch/ForkJoinDispatcherThroughputSpec.cs b/src/core/Akka.Tests.Performance/Dispatch/ForkJoinDispatcherThroughputSpec.cs index 4401921e266..8771a3129d5 100644 --- a/src/core/Akka.Tests.Performance/Dispatch/ForkJoinDispatcherThroughputSpec.cs +++ b/src/core/Akka.Tests.Performance/Dispatch/ForkJoinDispatcherThroughputSpec.cs @@ -45,4 +45,17 @@ protected override MessageDispatcherConfigurator Configurator() return new DispatcherConfigurator(DispatcherConfiguration, Prereqs); } } + + public class ChannelDispatcherExecutorThroughputSpec : WarmDispatcherThroughputSpecBase + { + public static Config DispatcherConfiguration => ConfigurationFactory.ParseString(@" + id = PerfTest + executor = channel-executor + "); + + protected override MessageDispatcherConfigurator Configurator() + { + return new DispatcherConfigurator(DispatcherConfiguration, Prereqs); + } + } } diff --git a/src/core/Akka/Actor/ActorSystem.cs b/src/core/Akka/Actor/ActorSystem.cs index 481aafbd06b..4b7819d9055 100644 --- a/src/core/Akka/Actor/ActorSystem.cs +++ b/src/core/Akka/Actor/ActorSystem.cs @@ -278,6 +278,10 @@ public static ActorSystem Create(string name) private static ActorSystem CreateAndStartSystem(string name, Config withFallback, ActorSystemSetup setup) { + // allows the ThreadPool to scale up / down dynamically + // by removing minimum thread count, which in our benchmarks + // appears to negatively impact performance + ThreadPool.SetMinThreads(0, 0); var system = new ActorSystemImpl(name, withFallback, setup, Option.None); system.Start(); return system; diff --git a/src/core/Akka/Akka.csproj b/src/core/Akka/Akka.csproj index d5af4e1eea7..28b2abd163d 100644 --- a/src/core/Akka/Akka.csproj +++ b/src/core/Akka/Akka.csproj @@ -16,7 +16,7 @@ - + diff --git a/src/core/Akka/Dispatch/AbstractDispatcher.cs b/src/core/Akka/Dispatch/AbstractDispatcher.cs index defa523f64a..b18b1f17e05 100644 --- a/src/core/Akka/Dispatch/AbstractDispatcher.cs +++ b/src/core/Akka/Dispatch/AbstractDispatcher.cs @@ -116,6 +116,26 @@ protected ExecutorServiceConfigurator(Config config, IDispatcherPrerequisites pr public IDispatcherPrerequisites Prerequisites { get; private set; } } + internal sealed class ChannelExecutorConfigurator : ExecutorServiceConfigurator + { + public ChannelExecutorConfigurator(Config config, IDispatcherPrerequisites prerequisites) : base(config, prerequisites) + { + var fje = config.GetConfig("fork-join-executor"); + MaxParallelism = ThreadPoolConfig.ScaledPoolSize( + fje.GetInt("parallelism-min"), + fje.GetDouble("parallelism-factor", 1.0D), // the scalar-based factor to scale the threadpool size to + fje.GetInt("parallelism-max")); + } + + public int MaxParallelism {get;} + + public override ExecutorService Produce(string id) + { + Prerequisites.EventStream.Publish(new Debug($"ChannelExecutor-[id]", typeof(FixedConcurrencyTaskScheduler), $"Launched Dispatcher [{id}] with MaxParallelism=[{MaxParallelism}]")); + return new TaskSchedulerExecutor(id, new FixedConcurrencyTaskScheduler(MaxParallelism)); + } + } + /// /// INTERNAL API /// @@ -306,6 +326,8 @@ protected ExecutorServiceConfigurator ConfigureExecutor() return new CurrentSynchronizationContextExecutorServiceFactory(Config, Prerequisites); case "task-executor": return new DefaultTaskSchedulerExecutorConfigurator(Config, Prerequisites); + case "channel-executor": + return new ChannelExecutorConfigurator(Config, Prerequisites); default: Type executorConfiguratorType = Type.GetType(executor); if (executorConfiguratorType == null) diff --git a/src/core/Akka/Dispatch/Dispatchers.cs b/src/core/Akka/Dispatch/Dispatchers.cs index 5043cf40547..2f0231319d1 100644 --- a/src/core/Akka/Dispatch/Dispatchers.cs +++ b/src/core/Akka/Dispatch/Dispatchers.cs @@ -7,6 +7,7 @@ using System; using System.Collections.Concurrent; +using System.Collections.Generic; using System.Threading; using System.Threading.Tasks; using Akka.Actor; @@ -91,6 +92,86 @@ public PartialTrustThreadPoolExecutorService(string id) : base(id) } } + /// + /// INTERNAL API + /// + /// Used to power + /// + internal sealed class FixedConcurrencyTaskScheduler : TaskScheduler + { + + [ThreadStatic] + private static bool _threadRunning = false; + private ConcurrentQueue _tasks = new ConcurrentQueue(); + + private int _readers = 0; + + public FixedConcurrencyTaskScheduler(int degreeOfParallelism) + { + MaximumConcurrencyLevel = degreeOfParallelism; + } + + + public override int MaximumConcurrencyLevel { get; } + + /// + /// ONLY USED IN DEBUGGER - NO PERF IMPACT. + /// + protected override IEnumerable GetScheduledTasks() + { + return _tasks; + } + + protected override bool TryDequeue(Task task) + { + return false; + } + + protected override void QueueTask(Task task) + { + _tasks.Enqueue(task); + if (_readers < MaximumConcurrencyLevel) + { + var initial = _readers; + var newVale = _readers + 1; + if (initial == Interlocked.CompareExchange(ref _readers, newVale, initial)) + { + // try to start a new worker + ThreadPool.UnsafeQueueUserWorkItem(_ => ReadChannel(), null); + } + } + } + + protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued) + { + // If this thread isn't already processing a task, we don't support inlining + if (!_threadRunning) return false; + return TryExecuteTask(task); + } + + public void ReadChannel() + { + _threadRunning = true; + try + { + while (_tasks.TryDequeue(out var runnable)) + { + base.TryExecuteTask(runnable); + } + } + catch + { + // suppress exceptions + } + finally + { + Interlocked.Decrement(ref _readers); + + _threadRunning = false; + } + } + } + /// /// INTERNAL API @@ -273,7 +354,7 @@ public MessageDispatcher DefaultGlobalDispatcher internal MessageDispatcher InternalDispatcher { get; } /// - /// The for the default dispatcher. + /// The for the default dispatcher. /// public Config DefaultDispatcherConfig { @@ -336,7 +417,7 @@ public bool HasDispatcher(string id) private MessageDispatcherConfigurator LookupConfigurator(string id) { var depth = 0; - while(depth < MaxDispatcherAliasDepth) + while (depth < MaxDispatcherAliasDepth) { if (_dispatcherConfigurators.TryGetValue(id, out var configurator)) return configurator; @@ -374,7 +455,7 @@ private MessageDispatcherConfigurator LookupConfigurator(string id) /// /// INTERNAL API /// - /// Creates a dispatcher from a . Internal test purpose only. + /// Creates a dispatcher from a . Internal test purpose only. /// /// From(Config.GetConfig(id)); ///