diff --git a/docs/articles/clustering/cluster-sharded-daemon-process.md b/docs/articles/clustering/cluster-sharded-daemon-process.md new file mode 100644 index 00000000000..7220d4ec02c --- /dev/null +++ b/docs/articles/clustering/cluster-sharded-daemon-process.md @@ -0,0 +1,51 @@ +# Sharded Daemon Process + +> [!WARNING] +>This module is currently marked as [may change](../utilities/may-change.md) because it is a new feature that +>needs feedback from real usage before finalizing the API. This means that API or semantics can change without +>warning or deprecation period. It is also not recommended to use this module in production just yet. + +## Introduction + +Sharded Daemon Process provides a way to run `N` actors, each given a numeric id starting from `0` that are then kept alive +and balanced across the cluster. When a rebalance is needed the actor is stopped and, triggered by a keep alive running on +all nodes, started on a new node (the keep alive should be seen as an implementation detail and may change in future versions). + +The intended use case is for splitting data processing workloads across a set number of workers that each get to work on a subset +of the data that needs to be processed. This is commonly needed to create projections based on the event streams available +from all the [Persistent Actors](../persistence/event-sourcing.md) in a CQRS application. Events are tagged with one out of `N` tags +used to split the workload of consuming and updating a projection between `N` workers. + +For cases where a single actor needs to be kept alive see [Cluster Singleton](cluster-singleton.md) + +## Basic example + +To set up a set of actors running with Sharded Daemon process each node in the cluster needs to run the same initialization +when starting up: + +```csharp +class TagProcessor : ReceiveActor +{ + public string Tag { get; } + + public static Props Props(string tag) => + Actor.Props.Create(() => new TagProcessor(tag)); + + public TagProcessor(string tag) => Tag = tag; + + protected override void PreStart() + { + // start the processing ... + base.PreStart(); + Context.System.Log.Debug("Starting processor for tag {0}", Tag); + } +} + +var tags = new[] { "tag-1", "tag-2", "tag-3" }; +ShardedDaemonProcess.Get(Sys).Init("TagProcessors", tags.Length, id => TagProcessor.Props(tags[id])); +``` + +## Scalability + +This cluster tool is intended for small numbers of consumers and will not scale well to a large set. In large clusters +it is recommended to limit the nodes the sharded daemon process will run on using a role. \ No newline at end of file diff --git a/src/contrib/cluster/Akka.Cluster.Sharding.Tests/ShardedDaemonProcessSpec.cs b/src/contrib/cluster/Akka.Cluster.Sharding.Tests/ShardedDaemonProcessSpec.cs new file mode 100644 index 00000000000..abfd62da1e3 --- /dev/null +++ b/src/contrib/cluster/Akka.Cluster.Sharding.Tests/ShardedDaemonProcessSpec.cs @@ -0,0 +1,132 @@ +//----------------------------------------------------------------------- +// +// Copyright (C) 2009-2020 Lightbend Inc. +// Copyright (C) 2013-2020 .NET Foundation +// +//----------------------------------------------------------------------- + +using System; +using System.Linq; +using Akka.Actor; +using Akka.Cluster.Tools.Singleton; +using Akka.Configuration; +using Akka.TestKit; +using Xunit; + +namespace Akka.Cluster.Sharding.Tests +{ + public class ShardedDaemonProcessSpec : AkkaSpec + { + private sealed class Stop + { + public static Stop Instance { get; } = new Stop(); + private Stop() { } + } + + private sealed class Started + { + public int Id { get; } + public IActorRef SelfRef { get; } + + public Started(int id, IActorRef selfRef) + { + Id = id; + SelfRef = selfRef; + } + } + + private class MyActor : UntypedActor + { + public int Id { get; } + public IActorRef Probe { get; } + + public static Props Props(int id, IActorRef probe) => + Actor.Props.Create(() => new MyActor(id, probe)); + + public MyActor(int id, IActorRef probe) + { + Id = id; + Probe = probe; + } + + protected override void PreStart() + { + base.PreStart(); + Probe.Tell(new Started(Id, Context.Self)); + } + + protected override void OnReceive(object message) + { + if (message is Stop _) + Context.Stop(Self); + } + } + + private static Config GetConfig() + { + return ConfigurationFactory.ParseString(@" + akka.actor.provider = cluster + akka.remote.dot-netty.tcp.port = 0 + akka.remote.dot-netty.tcp.hostname = 127.0.0.1 + + # ping often/start fast for test + akka.cluster.sharded-daemon-process.keep-alive-interval = 1s + + akka.coordinated-shutdown.terminate-actor-system = off + akka.coordinated-shutdown.run-by-actor-system-terminate = off") + .WithFallback(ClusterSingletonProxy.DefaultConfig()) + .WithFallback(ClusterSharding.DefaultConfig()); + } + + public ShardedDaemonProcessSpec() + : base(GetConfig()) + { } + + [Fact] + public void ShardedDaemonProcess_must_have_a_single_node_cluster_running_first() + { + var probe = CreateTestProbe(); + Cluster.Get(Sys).Join(Cluster.Get(Sys).SelfAddress); + probe.AwaitAssert(() => Cluster.Get(Sys).SelfMember.Status.ShouldBe(MemberStatus.Up), TimeSpan.FromSeconds(3)); + } + + [Fact] + public void ShardedDaemonProcess_must_start_N_actors_with_unique_ids() + { + Cluster.Get(Sys).Join(Cluster.Get(Sys).SelfAddress); + + var probe = CreateTestProbe(); + ShardedDaemonProcess.Get(Sys).Init("a", 5, id => MyActor.Props(id, probe.Ref)); + + var started = probe.ReceiveN(5); + started.Count.ShouldBe(5); + } + + [Fact] + public void ShardedDaemonProcess_must_restart_actors_if_they_stop() + { + Cluster.Get(Sys).Join(Cluster.Get(Sys).SelfAddress); + + var probe = CreateTestProbe(); + ShardedDaemonProcess.Get(Sys).Init("stop", 2, id => MyActor.Props(id, probe.Ref)); + + foreach (var started in Enumerable.Range(0, 2).Select(_ => probe.ExpectMsg())) + started.SelfRef.Tell(Stop.Instance); + + // periodic ping every 1s makes it restart + Enumerable.Range(0, 2).Select(_ => probe.ExpectMsg(TimeSpan.FromSeconds(3))); + } + + [Fact] + public void ShardedDaemonProcess_must_not_run_if_the_role_does_not_match_node_role() + { + Cluster.Get(Sys).Join(Cluster.Get(Sys).SelfAddress); + + var probe = CreateTestProbe(); + var settings = ShardedDaemonProcessSettings.Create(Sys).WithShardingSettings(ClusterShardingSettings.Create(Sys).WithRole("workers")); + ShardedDaemonProcess.Get(Sys).Init("roles", 3, id => MyActor.Props(id, probe.Ref), settings); + + probe.ExpectNoMsg(); + } + } +} diff --git a/src/contrib/cluster/Akka.Cluster.Sharding/ShardRegion.cs b/src/contrib/cluster/Akka.Cluster.Sharding/ShardRegion.cs index 40bac68c6e5..fecca87fb6d 100644 --- a/src/contrib/cluster/Akka.Cluster.Sharding/ShardRegion.cs +++ b/src/contrib/cluster/Akka.Cluster.Sharding/ShardRegion.cs @@ -117,7 +117,7 @@ public override int GetHashCode() /// to start(it does not guarantee the entity successfully started) /// [Serializable] - public sealed class StartEntityAck : IClusterShardingSerializable + public sealed class StartEntityAck : IClusterShardingSerializable, IDeadLetterSuppression { /// /// An identifier of a newly started entity. Unique in scope of a given shard. diff --git a/src/contrib/cluster/Akka.Cluster.Sharding/ShardedDaemonProcess.cs b/src/contrib/cluster/Akka.Cluster.Sharding/ShardedDaemonProcess.cs new file mode 100644 index 00000000000..77dc48b203a --- /dev/null +++ b/src/contrib/cluster/Akka.Cluster.Sharding/ShardedDaemonProcess.cs @@ -0,0 +1,179 @@ +//----------------------------------------------------------------------- +// +// Copyright (C) 2009-2020 Lightbend Inc. +// Copyright (C) 2013-2020 .NET Foundation +// +//----------------------------------------------------------------------- + +using System; +using System.Linq; +using Akka.Actor; +using Akka.Annotations; +using Akka.Util.Internal; + +namespace Akka.Cluster.Sharding +{ + internal class KeepAlivePinger : UntypedActor, IWithTimers + { + private sealed class Tick + { + public static Tick Instance { get; } = new Tick(); + private Tick() { } + } + + public string Name { get; } + public string[] Identities { get; } + public IActorRef ShardingRef { get; } + public ShardedDaemonProcessSettings Settings { get; } + + public ITimerScheduler Timers { get; set; } + + public static Props Props(ShardedDaemonProcessSettings settings, string name, string[] identities, IActorRef shardingRef) => + Actor.Props.Create(() => new KeepAlivePinger(settings, name, identities, shardingRef)); + + public KeepAlivePinger(ShardedDaemonProcessSettings settings, string name, string[] identities, IActorRef shardingRef) + { + Settings = settings; + Name = name; + Identities = identities; + ShardingRef = shardingRef; + } + + protected override void PreStart() + { + base.PreStart(); + + Context.System.Log.Debug("Starting Sharded Daemon Process KeepAlivePinger for [{0}], with ping interval [{1}]"); + Timers.StartPeriodicTimer("tick", Tick.Instance, Settings.KeepAliveInterval); + TriggerStartAll(); + } + + protected override void OnReceive(object message) + { + if (message is Tick _) + { + TriggerStartAll(); + Context.System.Log.Debug("Periodic ping sent to [{0}] processes", Identities.Length); + } + } + + private void TriggerStartAll() => Identities.ForEach(id => ShardingRef.Tell(new ShardRegion.StartEntity(id))); + } + + internal sealed class MessageExtractor : HashCodeMessageExtractor + { + public MessageExtractor(int maxNumberOfShards) + : base(maxNumberOfShards) + { } + + public override string EntityId(object message) => (message as ShardingEnvelope)?.EntityId; + public override object EntityMessage(object message) => (message as ShardingEnvelope)?.Message; + public override string ShardId(object message) => message is ShardRegion.StartEntity se ? se.EntityId : EntityId(message); + } + + /// + /// Default envelope type that may be used with Cluster Sharding. + /// + /// The alternative way of routing messages through sharding is to not use envelopes, + /// and have the message types themselves carry identifiers. + /// + /// + public sealed class ShardingEnvelope + { + public string EntityId { get; } + public object Message { get; } + + public ShardingEnvelope(string entityId, object message) + { + EntityId = entityId; + Message = message; + } + } + + /// + /// This extension runs a pre set number of actors in a cluster. + /// + /// The typical use case is when you have a task that can be divided in a number of workers, each doing a + /// sharded part of the work, for example consuming the read side events from Akka Persistence through + /// tagged events where each tag decides which consumer that should consume the event. + /// + /// Each named set needs to be started on all the nodes of the cluster on start up. + /// + /// The processes are spread out across the cluster, when the cluster topology changes the processes may be stopped + /// and started anew on a new node to rebalance them. + /// + /// Not for user extension. + /// + [ApiMayChange] + public class ShardedDaemonProcess : IExtension + { + private readonly ExtendedActorSystem _system; + + public ShardedDaemonProcess(ExtendedActorSystem system) => _system = system; + + public static ShardedDaemonProcess Get(ActorSystem system) => + system.WithExtension(); + + /// + /// Start a specific number of actors that is then kept alive in the cluster. + /// + /// TBD + /// TBD + /// Given a unique id of `0` until `numberOfInstance` create an entity actor. + public void Init(string name, int numberOfInstances, Func propsFactory) + { + Init(name, numberOfInstances, propsFactory, ShardedDaemonProcessSettings.Create(_system)); + } + + /// + /// Start a specific number of actors, each with a unique numeric id in the set, that is then kept alive in the cluster. + /// + /// TBD + /// TBD + /// Given a unique id of `0` until `numberOfInstance` create an entity actor. + /// TBD + public void Init(string name, int numberOfInstances, Func propsFactory, ShardedDaemonProcessSettings settings) + { + // One shard per actor identified by the numeric id encoded in the entity id + var numberOfShards = numberOfInstances; + var entityIds = Enumerable.Range(0, numberOfInstances).Select(i => i.ToString()).ToArray(); + + // Defaults in `akka.cluster.sharding` but allow overrides specifically for actor-set + var shardingBaseSettings = settings.ShardingSettings; + if (shardingBaseSettings == null) + { + var shardingConfig = _system.Settings.Config.GetConfig("akka.cluster.sharded-daemon-process.sharding"); + var coordinatorSingletonConfig = _system.Settings.Config.GetConfig(shardingConfig.GetString("coordinator-singleton")); + shardingBaseSettings = ClusterShardingSettings.Create(shardingConfig, coordinatorSingletonConfig); + } + + var shardingSettings = new ClusterShardingSettings( + shardingBaseSettings.Role, + false, // remember entities disabled + "", + "", + TimeSpan.Zero, // passivation disabled + StateStoreMode.DData, + shardingBaseSettings.TunningParameters, + shardingBaseSettings.CoordinatorSingletonSettings); + + if (string.IsNullOrEmpty(shardingSettings.Role) || Cluster.Get(_system).SelfRoles.Contains(shardingSettings.Role)) + { + var shardRegion = ClusterSharding.Get(_system).Start( + typeName: $"sharded-daemon-process-{name}", + entityPropsFactory: entityId => propsFactory(int.Parse(entityId)), + settings: shardingSettings, + messageExtractor: new MessageExtractor(numberOfShards)); + + _system.ActorOf( + KeepAlivePinger.Props(settings, name, entityIds, shardRegion), + $"ShardedDaemonProcessKeepAlive-{name}"); + } + } + } + + public class ShardedDaemonProcessExtensionProvider : ExtensionIdProvider + { + public override ShardedDaemonProcess CreateExtension(ExtendedActorSystem system) => new ShardedDaemonProcess(system); + } +} diff --git a/src/contrib/cluster/Akka.Cluster.Sharding/ShardedDaemonProcessSettings.cs b/src/contrib/cluster/Akka.Cluster.Sharding/ShardedDaemonProcessSettings.cs new file mode 100644 index 00000000000..c1508422c04 --- /dev/null +++ b/src/contrib/cluster/Akka.Cluster.Sharding/ShardedDaemonProcessSettings.cs @@ -0,0 +1,72 @@ +//----------------------------------------------------------------------- +// +// Copyright (C) 2009-2020 Lightbend Inc. +// Copyright (C) 2013-2020 .NET Foundation +// +//----------------------------------------------------------------------- + +using System; +using Akka.Actor; +using Akka.Annotations; +using Akka.Configuration; + +namespace Akka.Cluster.Sharding +{ + [Serializable] + [ApiMayChange] + public sealed class ShardedDaemonProcessSettings + { + /// + /// The interval each parent of the sharded set is pinged from each node in the cluster. + /// + public readonly TimeSpan KeepAliveInterval; + + /// + /// Specify sharding settings that should be used for the sharded daemon process instead of loading from config. + /// + public readonly ClusterShardingSettings ShardingSettings; + + /// + /// Create default settings for system + /// + public static ShardedDaemonProcessSettings Create(ActorSystem system) + { + return FromConfig(system.Settings.Config.GetConfig("akka.cluster.sharded-daemon-process")); + } + + public static ShardedDaemonProcessSettings FromConfig(Config config) + { + var keepAliveInterval = config.GetTimeSpan("keep-alive-interval"); + return new ShardedDaemonProcessSettings(keepAliveInterval); + } + + /// + /// Not for user constructions, use factory methods to instantiate. + /// + private ShardedDaemonProcessSettings(TimeSpan keepAliveInterval, ClusterShardingSettings shardingSettings = null) + { + KeepAliveInterval = keepAliveInterval; + ShardingSettings = shardingSettings; + } + + /// + /// NOTE: How the sharded set is kept alive may change in the future meaning this setting may go away. + /// + /// The interval each parent of the sharded set is pinged from each node in the cluster. + public ShardedDaemonProcessSettings WithKeepAliveInterval(TimeSpan keepAliveInterval) + { + return new ShardedDaemonProcessSettings(keepAliveInterval, ShardingSettings); + } + + /// + /// Specify sharding settings that should be used for the sharded daemon process instead of loading from config. + /// Some settings can not be changed (remember-entities and related settings, passivation, number-of-shards), + /// changing those settings will be ignored. + /// + /// TBD + public ShardedDaemonProcessSettings WithShardingSettings(ClusterShardingSettings shardingSettings) + { + return new ShardedDaemonProcessSettings(KeepAliveInterval, shardingSettings); + } + } +} diff --git a/src/contrib/cluster/Akka.Cluster.Sharding/reference.conf b/src/contrib/cluster/Akka.Cluster.Sharding/reference.conf index b69d6c0dad5..36f571ec725 100644 --- a/src/contrib/cluster/Akka.Cluster.Sharding/reference.conf +++ b/src/contrib/cluster/Akka.Cluster.Sharding/reference.conf @@ -268,6 +268,20 @@ akka.cluster.sharding { } # //#sharding-ext-config +akka.cluster.sharded-daemon-process { + # Settings for the sharded dameon process internal usage of sharding are using the akka.cluste.sharding defaults. + # Some of the settings can be overriden specifically for the sharded daemon process here. For example can the + # `role` setting limit what nodes the daemon processes and the keep alive pingers will run on. + # Some settings can not be changed (remember-entitites and related settings, passivation, number-of-shards), + # overriding those settings will be ignored. + sharding = ${akka.cluster.sharding} + + # Each entity is pinged at this interval from each node in the + # cluster to trigger a start if it has stopped, for example during + # rebalancing. + # Note: How the set of actors is kept alive may change in the future meaning this setting may go away. + keep-alive-interval = 10s +} # Protobuf serializer for Cluster Sharding messages akka.actor { diff --git a/src/core/Akka.API.Tests/CoreAPISpec.ApproveClusterSharding.approved.txt b/src/core/Akka.API.Tests/CoreAPISpec.ApproveClusterSharding.approved.txt index b4bf1399368..785e8a4a162 100644 --- a/src/core/Akka.API.Tests/CoreAPISpec.ApproveClusterSharding.approved.txt +++ b/src/core/Akka.API.Tests/CoreAPISpec.ApproveClusterSharding.approved.txt @@ -190,7 +190,7 @@ namespace Akka.Cluster.Sharding public override bool Equals(object obj) { } public override int GetHashCode() { } } - public sealed class StartEntityAck : Akka.Cluster.Sharding.IClusterShardingSerializable + public sealed class StartEntityAck : Akka.Cluster.Sharding.IClusterShardingSerializable, Akka.Event.IDeadLetterSuppression { public readonly string EntityId; public readonly string ShardId; @@ -213,6 +213,35 @@ namespace Akka.Cluster.Sharding public readonly string ShardId; public ShardState(string shardId, System.Collections.Immutable.IImmutableSet entityIds) { } } + [Akka.Annotations.ApiMayChangeAttribute()] + public class ShardedDaemonProcess : Akka.Actor.IExtension + { + public ShardedDaemonProcess(Akka.Actor.ExtendedActorSystem system) { } + public static Akka.Cluster.Sharding.ShardedDaemonProcess Get(Akka.Actor.ActorSystem system) { } + public void Init(string name, int numberOfInstances, System.Func propsFactory) { } + public void Init(string name, int numberOfInstances, System.Func propsFactory, Akka.Cluster.Sharding.ShardedDaemonProcessSettings settings) { } + } + public class ShardedDaemonProcessExtensionProvider : Akka.Actor.ExtensionIdProvider + { + public ShardedDaemonProcessExtensionProvider() { } + public override Akka.Cluster.Sharding.ShardedDaemonProcess CreateExtension(Akka.Actor.ExtendedActorSystem system) { } + } + [Akka.Annotations.ApiMayChangeAttribute()] + public sealed class ShardedDaemonProcessSettings + { + public readonly System.TimeSpan KeepAliveInterval; + public readonly Akka.Cluster.Sharding.ClusterShardingSettings ShardingSettings; + public static Akka.Cluster.Sharding.ShardedDaemonProcessSettings Create(Akka.Actor.ActorSystem system) { } + public static Akka.Cluster.Sharding.ShardedDaemonProcessSettings FromConfig(Akka.Configuration.Config config) { } + public Akka.Cluster.Sharding.ShardedDaemonProcessSettings WithKeepAliveInterval(System.TimeSpan keepAliveInterval) { } + public Akka.Cluster.Sharding.ShardedDaemonProcessSettings WithShardingSettings(Akka.Cluster.Sharding.ClusterShardingSettings shardingSettings) { } + } + public sealed class ShardingEnvelope + { + public ShardingEnvelope(string entityId, object message) { } + public string EntityId { get; } + public object Message { get; } + } public enum StateStoreMode { Persistence = 0,