added ChannelExecutor dispatcher - uses `FixedConcurrencyTaskScheduler` internally - have `ActorSystem.Create` call `ThreadPool.SetMinThreads(0,0)` to improve performance across the board.
Aaronontheweb committed Apr 28, 2021
1 parent a1fa221 commit e7b40ea
Showing 11 changed files with 218 additions and 24 deletions.
58 changes: 58 additions & 0 deletions docs/articles/actors/dispatchers.md
@@ -74,6 +74,7 @@ Some dispatcher configurations are available out-of-the-box for convenience. You
* **task-dispatcher** - A configuration that uses the [TaskDispatcher](#taskdispatcher).
* **default-fork-join-dispatcher** - A configuration that uses the [ForkJoinDispatcher](#forkjoindispatcher).
* **synchronized-dispatcher** - A configuration that uses the [SynchronizedDispatcher](#synchronizeddispatcher).
* **channel-executor** - new as of v1.4.19, the [`ChannelExecutor`](#channelexecutor) runs on top of the .NET `ThreadPool` and lets Akka.NET scale thread usage up and down with demand, which in our benchmarks lowered CPU usage and improved throughput.

## Built-in Dispatchers

@@ -165,6 +166,63 @@ private void Form1_Load(object sender, System.EventArgs e)
}
```

### `ChannelExecutor`
In Akka.NET v1.4.19 we will be introducing an opt-in feature, the `ChannelExecutor` - a new dispatcher type that re-uses the same configuration as a `ForkJoinDispatcher` but runs entirely on top of the .NET `ThreadPool` and is able to take advantage of dynamic thread pool scaling to size / resize workloads on the fly.

During its initial development and benchmarks, we observed the following:

1. The `ChannelExecutor` tremendously reduced idle CPU and max busy CPU even during peak message throughput, primarily as a result of dynamically shrinking the total `ThreadPool` to only the necessary size. This resolves one of the largest complaints large users of Akka.NET have today. However, **in order for this setting to be effective `ThreadPool.SetMinThreads(0,0)` must also be set**. As of v1.4.19 `ActorSystem.Create` makes this call for you, but if those settings don't work for you, you can override them by calling `ThreadPool.SetMinThreads(yourValue, yourValue)` again after `ActorSystem.Create` has exited.
2. The `ChannelExecutor` actually beat the `ForkJoinDispatcher` and others on performance even in environments like Docker and bare metal on Windows.

> [!NOTE]
> We are in the process of gathering data from users on how well `ChannelExecutor` performs in the real world. If you are interested in trying out the `ChannelExecutor`, please read the directions in this document and then comment on [the "Akka.NET v1.4.19: ChannelExecutor performance data" discussion thread](https://github.com/akkadotnet/akka.net/discussions/4983).

The `ChannelExecutor` re-uses the same threading settings as the `ForkJoinExecutor` to determine its effective upper and lower parallelism limits, and you can configure the `ChannelExecutor` to run inside your `ActorSystem` via the following HOCON configuration:

```
akka.actor.default-dispatcher = {
    executor = channel-executor
    fork-join-executor { # ChannelExecutor will re-use these settings
        parallelism-min = 2
        parallelism-factor = 1
        parallelism-max = 64
    }
}
akka.actor.internal-dispatcher = {
    executor = channel-executor
    throughput = 5
    fork-join-executor {
        parallelism-min = 4
        parallelism-factor = 1.0
        parallelism-max = 64
    }
}
akka.remote.default-remote-dispatcher {
    type = Dispatcher
    executor = channel-executor
    fork-join-executor {
        parallelism-min = 2
        parallelism-factor = 0.5
        parallelism-max = 16
    }
}
akka.remote.backoff-remote-dispatcher {
    executor = channel-executor
    fork-join-executor {
        parallelism-min = 2
        parallelism-max = 2
    }
}
```

This enables the `ChannelExecutor` everywhere: all Akka.NET workloads, with the exception of anything you manually allocate onto a `ForkJoinDispatcher` or `PinnedDispatcher`, will be managed by the `ThreadPool`.
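
To make the re-use of the `fork-join-executor` settings more concrete, here is a minimal sketch of how a parallelism limit could be derived from `parallelism-min`, `parallelism-factor` and `parallelism-max`. It assumes the limit is the processor count multiplied by the factor and then clamped between the min and max values. The helper name `ParallelismSketch.ScaledPoolSize`, the explicit `processorCount` parameter and the truncating rounding are illustrative assumptions, not the library's API.

```csharp
using System;

// Hypothetical helper for illustration only; not part of Akka.NET.
public static class ParallelismSketch
{
    // Scales the processor count by the factor, then clamps the result
    // so it never drops below parallelismMin or exceeds parallelismMax.
    // Assumption: the fractional part is truncated; the real rounding may differ.
    public static int ScaledPoolSize(
        int parallelismMin,
        double parallelismFactor,
        int parallelismMax,
        int processorCount)
    {
        var scaled = (int)(processorCount * parallelismFactor);
        return Math.Min(Math.Max(scaled, parallelismMin), parallelismMax);
    }
}
```

Under these assumptions, an 8-core machine with the `default-remote-dispatcher` settings above (min 2, factor 0.5, max 16) would end up with a limit of 4.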

> [!IMPORTANT]
> As of Akka.NET v1.4.19, we call `ThreadPool.SetMinThreads(0,0)` inside the `ActorSystem.Create` method as we've found that the default `ThreadPool` minimum values have a negative impact on performance. However, if this causes undesirable side effects inside your application, you can always override those settings by calling `ThreadPool.SetMinThreads(yourValue, yourValue)` again after you've created your `ActorSystem`.

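
One way to apply the override described in the note is to record the `ThreadPool` minimums before creating the `ActorSystem` and re-apply them afterwards. The sketch below uses only `ThreadPool.GetMinThreads` and `ThreadPool.SetMinThreads`; the `ThreadPoolMinimums` class and the `"my-system"` name in the comments are hypothetical and exist only for illustration.

```csharp
using System;
using System.Threading;

// Hypothetical helper for illustration only; not part of Akka.NET.
public static class ThreadPoolMinimums
{
    // Reads the current minimum worker and I/O completion thread counts.
    public static (int Worker, int Io) Capture()
    {
        ThreadPool.GetMinThreads(out var worker, out var io);
        return (worker, io);
    }

    // Re-applies previously captured minimums.
    // Returns false if the runtime rejects the values.
    public static bool Restore((int Worker, int Io) minimums)
    {
        return ThreadPool.SetMinThreads(minimums.Worker, minimums.Io);
    }
}

// Intended usage (requires the Akka package, shown as comments only):
//   var saved = ThreadPoolMinimums.Capture();
//   var system = ActorSystem.Create("my-system"); // lowers the minimums as of v1.4.19
//   ThreadPoolMinimums.Restore(saved);            // put your own values back
```

Whether restoring the old minimums is a good idea depends on your workload, since the lower minimums are what allow the `ThreadPool` to shrink when idle.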
#### Common Dispatcher Configuration

The following configuration keys are available for any dispatcher configuration:
5 changes: 1 addition & 4 deletions docs/articles/clustering/cluster-singleton.md
@@ -34,9 +34,6 @@ This pattern may seem to be very tempting to use at first, but it has several dr

Especially the last point is something you should be aware of — in general when using the Cluster Singleton pattern you should take care of downing nodes yourself and not rely on the timing based auto-down feature.

> [!WARNING]
> Be very careful when using Cluster Singleton together with Automatic Downing, since it allows the cluster to split up into two separate clusters, which in turn will result in `multiple Singletons` being started, one in each separate cluster!
## An Example
Assume that we need one single entry point to an external system. An actor that receives messages from a JMS queue with the strict requirement that only one JMS consumer must exist to make sure that the messages are processed in order. That is perhaps not how one would like to design things, but a typical real-world scenario when integrating with external systems.

@@ -111,4 +108,4 @@ akka.cluster.singleton-proxy {
# Maximum allowed buffer size is 10000.
buffer-size = 1000
}
```
```
1 change: 1 addition & 0 deletions src/benchmark/Akka.Benchmarks/Actor/PingPongBenchmarks.cs
@@ -9,6 +9,7 @@
using System.Threading.Tasks;
using Akka.Actor;
using Akka.Benchmarks.Configurations;
using Akka.Configuration;
using BenchmarkDotNet.Attributes;
using BenchmarkDotNet.Engines;

3 changes: 2 additions & 1 deletion src/benchmark/RemotePingPong/Program.cs
@@ -44,7 +44,7 @@ public static uint CpuSpeed()
public static Config CreateActorSystemConfig(string actorSystemName, string ipOrHostname, int port)
{
var baseConfig = ConfigurationFactory.ParseString(@"
akka {
akka {
actor.provider = remote
loglevel = ERROR
suppress-json-serializer-warning = on
@@ -57,6 +57,7 @@ public static Config CreateActorSystemConfig(string actorSystemName, string ipOr
port = 0
hostname = ""localhost""
}
}
}");

26 changes: 23 additions & 3 deletions src/core/Akka.Cluster.Tests.MultiNode/StressSpec.cs
@@ -74,6 +74,7 @@ public StressSpecConfig()
convergence-within-factor = 1.0
}
akka.actor.provider = cluster
akka.cluster {
failure-detector.acceptable-heartbeat-pause = 3s
downing-provider-class = ""Akka.Cluster.SplitBrainResolver, Akka.Cluster""
@@ -86,10 +87,29 @@ public StressSpecConfig()
akka.loggers = [""Akka.TestKit.TestEventListener, Akka.TestKit""]
akka.loglevel = INFO
akka.remote.log-remote-lifecycle-events = off
akka.actor.default-dispatcher.fork-join-executor {
parallelism - min = 8
parallelism - max = 8
akka.actor.default-dispatcher = {
executor = channel-executor
fork-join-executor {
parallelism-min = 2
parallelism-factor = 1
parallelism-max = 64
}
}
akka.actor.internal-dispatcher = {
executor = channel-executor
fork-join-executor {
parallelism-min = 2
parallelism-factor = 1
parallelism-max = 64
}
}
akka.remote.default-remote-dispatcher {
executor = channel-executor
fork-join-executor {
parallelism-min = 2
parallelism-factor = 0.5
parallelism-max = 16
}
}
");

TestTransport = true;
21 changes: 9 additions & 12 deletions src/core/Akka.Remote/Configuration/Remote.conf
@@ -578,23 +578,20 @@ akka {

### Default dispatcher for the remoting subsystem

### Default dispatcher for the remoting subsystem

default-remote-dispatcher {
type = ForkJoinDispatcher
executor = fork-join-executor
dedicated-thread-pool {
# Fixed number of threads to have in this threadpool
thread-count = 4
executor = fork-join-executor
fork-join-executor {
parallelism-min = 2
parallelism-factor = 0.5
parallelism-max = 16
}
}

backoff-remote-dispatcher {
type = ForkJoinDispatcher
executor = fork-join-executor
dedicated-thread-pool {
# Fixed number of threads to have in this threadpool
thread-count = 4
executor = fork-join-executor
fork-join-executor {
parallelism-min = 2
parallelism-max = 2
}
}
}
@@ -45,4 +45,17 @@ protected override MessageDispatcherConfigurator Configurator()
return new DispatcherConfigurator(DispatcherConfiguration, Prereqs);
}
}

public class ChannelDispatcherExecutorThroughputSpec : WarmDispatcherThroughputSpecBase
{
public static Config DispatcherConfiguration => ConfigurationFactory.ParseString(@"
id = PerfTest
executor = channel-executor
");

protected override MessageDispatcherConfigurator Configurator()
{
return new DispatcherConfigurator(DispatcherConfiguration, Prereqs);
}
}
}
4 changes: 4 additions & 0 deletions src/core/Akka/Actor/ActorSystem.cs
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,10 @@ public static ActorSystem Create(string name)

private static ActorSystem CreateAndStartSystem(string name, Config withFallback, ActorSystemSetup setup)
{
// allows the ThreadPool to scale up / down dynamically
// by removing minimum thread count, which in our benchmarks
// appears to negatively impact performance
ThreadPool.SetMinThreads(0, 0);
var system = new ActorSystemImpl(name, withFallback, setup, Option<Props>.None);
system.Start();
return system;
2 changes: 1 addition & 1 deletion src/core/Akka/Akka.csproj
@@ -16,7 +16,7 @@

<ItemGroup>
<PackageReference Include="Newtonsoft.Json" Version="$(NewtonsoftJsonVersion)" />
<PackageReference Include="System.Collections.Immutable" Version="5.0.0" />
<PackageReference Include="System.Collections.Immutable" Version="5.0.0" />
</ItemGroup>

<ItemGroup Condition="'$(TargetFramework)' == '$(NetStandardLibVersion)'">
22 changes: 22 additions & 0 deletions src/core/Akka/Dispatch/AbstractDispatcher.cs
@@ -116,6 +116,26 @@ protected ExecutorServiceConfigurator(Config config, IDispatcherPrerequisites pr
public IDispatcherPrerequisites Prerequisites { get; private set; }
}

internal sealed class ChannelExecutorConfigurator : ExecutorServiceConfigurator
{
public ChannelExecutorConfigurator(Config config, IDispatcherPrerequisites prerequisites) : base(config, prerequisites)
{
var fje = config.GetConfig("fork-join-executor");
MaxParallelism = ThreadPoolConfig.ScaledPoolSize(
fje.GetInt("parallelism-min"),
fje.GetDouble("parallelism-factor", 1.0D), // the scalar-based factor to scale the threadpool size to
fje.GetInt("parallelism-max"));
}

public int MaxParallelism {get;}

public override ExecutorService Produce(string id)
{
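// interpolate the dispatcher id into the log source (the original emitted the literal text "[id]")
Prerequisites.EventStream.Publish(new Debug($"ChannelExecutor-[{id}]", typeof(FixedConcurrencyTaskScheduler), $"Launched Dispatcher [{id}] with MaxParallelism=[{MaxParallelism}]"));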
return new TaskSchedulerExecutor(id, new FixedConcurrencyTaskScheduler(MaxParallelism));
}
}

/// <summary>
/// INTERNAL API
///
@@ -306,6 +326,8 @@ protected ExecutorServiceConfigurator ConfigureExecutor()
return new CurrentSynchronizationContextExecutorServiceFactory(Config, Prerequisites);
case "task-executor":
return new DefaultTaskSchedulerExecutorConfigurator(Config, Prerequisites);
case "channel-executor":
return new ChannelExecutorConfigurator(Config, Prerequisites);
default:
Type executorConfiguratorType = Type.GetType(executor);
if (executorConfiguratorType == null)
87 changes: 84 additions & 3 deletions src/core/Akka/Dispatch/Dispatchers.cs
@@ -7,6 +7,7 @@

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Akka.Actor;
@@ -91,6 +92,86 @@ public PartialTrustThreadPoolExecutorService(string id) : base(id)
}
}

/// <summary>
/// INTERNAL API
///
/// Used to power <see cref="ChannelExecutorConfigurator"/>
/// </summary>
internal sealed class FixedConcurrencyTaskScheduler : TaskScheduler
{

[ThreadStatic]
private static bool _threadRunning = false;
private ConcurrentQueue<Task> _tasks = new ConcurrentQueue<Task>();

private int _readers = 0;

public FixedConcurrencyTaskScheduler(int degreeOfParallelism)
{
MaximumConcurrencyLevel = degreeOfParallelism;
}


public override int MaximumConcurrencyLevel { get; }

/// <summary>
/// ONLY USED IN DEBUGGER - NO PERF IMPACT.
/// </summary>
protected override IEnumerable<Task> GetScheduledTasks()
{
return _tasks;
}

protected override bool TryDequeue(Task task)
{
return false;
}

protected override void QueueTask(Task task)
{
_tasks.Enqueue(task);
// read the reader count once so the limit check and the CAS use the same snapshot
var initial = Volatile.Read(ref _readers);
if (initial < MaximumConcurrencyLevel)
{
var newValue = initial + 1;
if (initial == Interlocked.CompareExchange(ref _readers, newValue, initial))
{
// try to start a new worker
ThreadPool.UnsafeQueueUserWorkItem(_ => ReadChannel(), null);
}
}
}

protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
{
// If this thread isn't already processing a task, we don't support inlining
if (!_threadRunning) return false;
return TryExecuteTask(task);
}

public void ReadChannel()
{
_threadRunning = true;
try
{
while (_tasks.TryDequeue(out var runnable))
{
base.TryExecuteTask(runnable);
}
}
catch
{
// suppress exceptions
}
finally
{
Interlocked.Decrement(ref _readers);

_threadRunning = false;
}
}
}


/// <summary>
/// INTERNAL API
@@ -273,7 +354,7 @@ public MessageDispatcher DefaultGlobalDispatcher
internal MessageDispatcher InternalDispatcher { get; }

/// <summary>
/// The <see cref="Hocon.Config"/> for the default dispatcher.
/// The <see cref="Configuration.Config"/> for the default dispatcher.
/// </summary>
public Config DefaultDispatcherConfig
{
@@ -336,7 +417,7 @@ public bool HasDispatcher(string id)
private MessageDispatcherConfigurator LookupConfigurator(string id)
{
var depth = 0;
while(depth < MaxDispatcherAliasDepth)
while (depth < MaxDispatcherAliasDepth)
{
if (_dispatcherConfigurators.TryGetValue(id, out var configurator))
return configurator;
@@ -374,7 +455,7 @@ private MessageDispatcherConfigurator LookupConfigurator(string id)
/// <summary>
/// INTERNAL API
///
/// Creates a dispatcher from a <see cref="Hocon.Config"/>. Internal test purpose only.
/// Creates a dispatcher from a <see cref="Configuration.Config"/>. Internal test purpose only.
/// <code>
/// From(Config.GetConfig(id));
/// </code>