-
Notifications
You must be signed in to change notification settings - Fork 1k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
[WIP] Backport of the feature called ClusterDistribution in Lagom
- Loading branch information
1 parent
60b5537
commit 7d2fa7a
Showing
7 changed files
with
479 additions
and
2 deletions.
There are no files selected for viewing
51 changes: 51 additions & 0 deletions
51
docs/articles/clustering/cluster-sharded-daemon-process.md
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,51 @@ | ||
# Sharded Daemon Process | ||
|
||
> [!WARNING] | ||
>This module is currently marked as [may change](../utilities/may-change.md) because it is a new feature that | ||
>needs feedback from real usage before finalizing the API. This means that API or semantics can change without | ||
>warning or deprecation period. It is also not recommended to use this module in production just yet. | ||
## Introduction | ||
|
||
Sharded Daemon Process provides a way to run `N` actors, each given a numeric id starting from `0` that are then kept alive | ||
and balanced across the cluster. When a rebalance is needed the actor is stopped and, triggered by a keep alive running on | ||
all nodes, started on a new node (the keep alive should be seen as an implementation detail and may change in future versions). | ||
|
||
The intended use case is for splitting data processing workloads across a set number of workers that each get to work on a subset | ||
of the data that needs to be processed. This is commonly needed to create projections based on the event streams available | ||
from all the [Persistent Actors](../persistence/event-sourcing.md) in a CQRS application. Events are tagged with one out of `N` tags | ||
used to split the workload of consuming and updating a projection between `N` workers. | ||
|
||
For cases where a single actor needs to be kept alive see [Cluster Singleton](cluster-singleton.md) | ||
|
||
## Basic example | ||
|
||
To set up a set of actors running with Sharded Daemon process each node in the cluster needs to run the same initialization | ||
when starting up: | ||
|
||
```csharp | ||
class TagProcessor : ReceiveActor | ||
{ | ||
public string Tag { get; } | ||
|
||
public static Props Props(string tag) => | ||
Actor.Props.Create(() => new TagProcessor(tag)); | ||
|
||
public TagProcessor(string tag) => Tag = tag; | ||
|
||
protected override void PreStart() | ||
{ | ||
// start the processing ... | ||
base.PreStart(); | ||
Context.System.Log.Debug("Starting processor for tag {0}", Tag); | ||
} | ||
} | ||
|
||
var tags = new[] { "tag-1", "tag-2", "tag-3" }; | ||
ShardedDaemonProcess.Get(Sys).Init("TagProcessors", tags.Length, id => TagProcessor.Props(tags[id])); | ||
``` | ||
|
||
## Scalability | ||
|
||
This cluster tool is intended for small numbers of consumers and will not scale well to a large set. In large clusters | ||
it is recommended to limit the nodes the sharded daemon process will run on using a role. |
132 changes: 132 additions & 0 deletions
132
src/contrib/cluster/Akka.Cluster.Sharding.Tests/ShardedDaemonProcessSpec.cs
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,132 @@ | ||
//----------------------------------------------------------------------- | ||
// <copyright file="ClusterSharding.cs" company="Akka.NET Project"> | ||
// Copyright (C) 2009-2020 Lightbend Inc. <http://www.lightbend.com> | ||
// Copyright (C) 2013-2020 .NET Foundation <https://github.com/akkadotnet/akka.net> | ||
// </copyright> | ||
//----------------------------------------------------------------------- | ||
|
||
using System; | ||
using System.Linq; | ||
using Akka.Actor; | ||
using Akka.Cluster.Tools.Singleton; | ||
using Akka.Configuration; | ||
using Akka.TestKit; | ||
using Xunit; | ||
|
||
namespace Akka.Cluster.Sharding.Tests | ||
{ | ||
public class ShardedDaemonProcessSpec : AkkaSpec | ||
{ | ||
private sealed class Stop | ||
{ | ||
public static Stop Instance { get; } = new Stop(); | ||
private Stop() { } | ||
} | ||
|
||
private sealed class Started | ||
{ | ||
public int Id { get; } | ||
public IActorRef SelfRef { get; } | ||
|
||
public Started(int id, IActorRef selfRef) | ||
{ | ||
Id = id; | ||
SelfRef = selfRef; | ||
} | ||
} | ||
|
||
private class MyActor : UntypedActor | ||
{ | ||
public int Id { get; } | ||
public IActorRef Probe { get; } | ||
|
||
public static Props Props(int id, IActorRef probe) => | ||
Actor.Props.Create(() => new MyActor(id, probe)); | ||
|
||
public MyActor(int id, IActorRef probe) | ||
{ | ||
Id = id; | ||
Probe = probe; | ||
} | ||
|
||
protected override void PreStart() | ||
{ | ||
base.PreStart(); | ||
Probe.Tell(new Started(Id, Context.Self)); | ||
} | ||
|
||
protected override void OnReceive(object message) | ||
{ | ||
if (message is Stop _) | ||
Context.Stop(Self); | ||
} | ||
} | ||
|
||
private static Config GetConfig() | ||
{ | ||
return ConfigurationFactory.ParseString(@" | ||
akka.actor.provider = cluster | ||
akka.remote.dot-netty.tcp.port = 0 | ||
akka.remote.dot-netty.tcp.hostname = 127.0.0.1 | ||
# ping often/start fast for test | ||
akka.cluster.sharded-daemon-process.keep-alive-interval = 1s | ||
akka.coordinated-shutdown.terminate-actor-system = off | ||
akka.coordinated-shutdown.run-by-actor-system-terminate = off") | ||
.WithFallback(ClusterSingletonProxy.DefaultConfig()) | ||
.WithFallback(ClusterSharding.DefaultConfig()); | ||
} | ||
|
||
public ShardedDaemonProcessSpec() | ||
: base(GetConfig()) | ||
{ } | ||
|
||
[Fact] | ||
public void ShardedDaemonProcess_must_have_a_single_node_cluster_running_first() | ||
{ | ||
var probe = CreateTestProbe(); | ||
Cluster.Get(Sys).Join(Cluster.Get(Sys).SelfAddress); | ||
probe.AwaitAssert(() => Cluster.Get(Sys).SelfMember.Status.ShouldBe(MemberStatus.Up), TimeSpan.FromSeconds(3)); | ||
} | ||
|
||
[Fact] | ||
public void ShardedDaemonProcess_must_start_N_actors_with_unique_ids() | ||
{ | ||
Cluster.Get(Sys).Join(Cluster.Get(Sys).SelfAddress); | ||
|
||
var probe = CreateTestProbe(); | ||
ShardedDaemonProcess.Get(Sys).Init("a", 5, id => MyActor.Props(id, probe.Ref)); | ||
|
||
var started = probe.ReceiveN(5); | ||
started.Count.ShouldBe(5); | ||
} | ||
|
||
[Fact] | ||
public void ShardedDaemonProcess_must_restart_actors_if_they_stop() | ||
{ | ||
Cluster.Get(Sys).Join(Cluster.Get(Sys).SelfAddress); | ||
|
||
var probe = CreateTestProbe(); | ||
ShardedDaemonProcess.Get(Sys).Init("stop", 2, id => MyActor.Props(id, probe.Ref)); | ||
|
||
foreach (var started in Enumerable.Range(0, 2).Select(_ => probe.ExpectMsg<Started>())) | ||
started.SelfRef.Tell(Stop.Instance); | ||
|
||
// periodic ping every 1s makes it restart | ||
Enumerable.Range(0, 2).Select(_ => probe.ExpectMsg<Started>(TimeSpan.FromSeconds(3))); | ||
} | ||
|
||
[Fact] | ||
public void ShardedDaemonProcess_must_not_run_if_the_role_does_not_match_node_role() | ||
{ | ||
Cluster.Get(Sys).Join(Cluster.Get(Sys).SelfAddress); | ||
|
||
var probe = CreateTestProbe(); | ||
var settings = ShardedDaemonProcessSettings.Create(Sys).WithShardingSettings(ClusterShardingSettings.Create(Sys).WithRole("workers")); | ||
ShardedDaemonProcess.Get(Sys).Init("roles", 3, id => MyActor.Props(id, probe.Ref), settings); | ||
|
||
probe.ExpectNoMsg(); | ||
} | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
179 changes: 179 additions & 0 deletions
179
src/contrib/cluster/Akka.Cluster.Sharding/ShardedDaemonProcess.cs
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,179 @@ | ||
//----------------------------------------------------------------------- | ||
// <copyright file="ClusterSharding.cs" company="Akka.NET Project"> | ||
// Copyright (C) 2009-2020 Lightbend Inc. <http://www.lightbend.com> | ||
// Copyright (C) 2013-2020 .NET Foundation <https://github.com/akkadotnet/akka.net> | ||
// </copyright> | ||
//----------------------------------------------------------------------- | ||
|
||
using System; | ||
using System.Linq; | ||
using Akka.Actor; | ||
using Akka.Annotations; | ||
using Akka.Util.Internal; | ||
|
||
namespace Akka.Cluster.Sharding | ||
{ | ||
internal class KeepAlivePinger : UntypedActor, IWithTimers | ||
{ | ||
private sealed class Tick | ||
{ | ||
public static Tick Instance { get; } = new Tick(); | ||
private Tick() { } | ||
} | ||
|
||
public string Name { get; } | ||
public string[] Identities { get; } | ||
public IActorRef ShardingRef { get; } | ||
public ShardedDaemonProcessSettings Settings { get; } | ||
|
||
public ITimerScheduler Timers { get; set; } | ||
|
||
public static Props Props(ShardedDaemonProcessSettings settings, string name, string[] identities, IActorRef shardingRef) => | ||
Actor.Props.Create(() => new KeepAlivePinger(settings, name, identities, shardingRef)); | ||
|
||
public KeepAlivePinger(ShardedDaemonProcessSettings settings, string name, string[] identities, IActorRef shardingRef) | ||
{ | ||
Settings = settings; | ||
Name = name; | ||
Identities = identities; | ||
ShardingRef = shardingRef; | ||
} | ||
|
||
protected override void PreStart() | ||
{ | ||
base.PreStart(); | ||
|
||
Context.System.Log.Debug("Starting Sharded Daemon Process KeepAlivePinger for [{0}], with ping interval [{1}]"); | ||
Timers.StartPeriodicTimer("tick", Tick.Instance, Settings.KeepAliveInterval); | ||
TriggerStartAll(); | ||
} | ||
|
||
protected override void OnReceive(object message) | ||
{ | ||
if (message is Tick _) | ||
{ | ||
TriggerStartAll(); | ||
Context.System.Log.Debug("Periodic ping sent to [{0}] processes", Identities.Length); | ||
} | ||
} | ||
|
||
private void TriggerStartAll() => Identities.ForEach(id => ShardingRef.Tell(new ShardRegion.StartEntity(id))); | ||
} | ||
|
||
internal sealed class MessageExtractor : HashCodeMessageExtractor | ||
{ | ||
public MessageExtractor(int maxNumberOfShards) | ||
: base(maxNumberOfShards) | ||
{ } | ||
|
||
public override string EntityId(object message) => (message as ShardingEnvelope)?.EntityId; | ||
public override object EntityMessage(object message) => (message as ShardingEnvelope)?.Message; | ||
public override string ShardId(object message) => message is ShardRegion.StartEntity se ? se.EntityId : EntityId(message); | ||
} | ||
|
||
/// <summary> | ||
/// <para>Default envelope type that may be used with Cluster Sharding.</para> | ||
/// <para> | ||
/// The alternative way of routing messages through sharding is to not use envelopes, | ||
/// and have the message types themselves carry identifiers. | ||
/// </para> | ||
/// </summary> | ||
public sealed class ShardingEnvelope | ||
{ | ||
public string EntityId { get; } | ||
public object Message { get; } | ||
|
||
public ShardingEnvelope(string entityId, object message) | ||
{ | ||
EntityId = entityId; | ||
Message = message; | ||
} | ||
} | ||
|
||
/// <summary> | ||
/// <para>This extension runs a pre set number of actors in a cluster.</para> | ||
/// <para> | ||
/// The typical use case is when you have a task that can be divided in a number of workers, each doing a | ||
/// sharded part of the work, for example consuming the read side events from Akka Persistence through | ||
/// tagged events where each tag decides which consumer that should consume the event. | ||
/// </para> | ||
/// <para>Each named set needs to be started on all the nodes of the cluster on start up.</para> | ||
/// <para> | ||
/// The processes are spread out across the cluster, when the cluster topology changes the processes may be stopped | ||
/// and started anew on a new node to rebalance them. | ||
/// </para> | ||
/// <para>Not for user extension.</para> | ||
/// </summary> | ||
[ApiMayChange] | ||
public class ShardedDaemonProcess : IExtension | ||
{ | ||
private readonly ExtendedActorSystem _system; | ||
|
||
public ShardedDaemonProcess(ExtendedActorSystem system) => _system = system; | ||
|
||
public static ShardedDaemonProcess Get(ActorSystem system) => | ||
system.WithExtension<ShardedDaemonProcess, ShardedDaemonProcessExtensionProvider>(); | ||
|
||
/// <summary> | ||
/// Start a specific number of actors that is then kept alive in the cluster. | ||
/// </summary> | ||
/// <param name="name">TBD</param> | ||
/// <param name="numberOfInstances">TBD</param> | ||
/// <param name="propsFactory">Given a unique id of `0` until `numberOfInstance` create an entity actor.</param> | ||
public void Init(string name, int numberOfInstances, Func<int, Props> propsFactory) | ||
{ | ||
Init(name, numberOfInstances, propsFactory, ShardedDaemonProcessSettings.Create(_system)); | ||
} | ||
|
||
/// <summary> | ||
/// Start a specific number of actors, each with a unique numeric id in the set, that is then kept alive in the cluster. | ||
/// </summary> | ||
/// <param name="name">TBD</param> | ||
/// <param name="numberOfInstances">TBD</param> | ||
/// <param name="propsFactory">Given a unique id of `0` until `numberOfInstance` create an entity actor.</param> | ||
/// <param name="settings">TBD</param> | ||
public void Init(string name, int numberOfInstances, Func<int, Props> propsFactory, ShardedDaemonProcessSettings settings) | ||
{ | ||
// One shard per actor identified by the numeric id encoded in the entity id | ||
var numberOfShards = numberOfInstances; | ||
var entityIds = Enumerable.Range(0, numberOfInstances).Select(i => i.ToString()).ToArray(); | ||
|
||
// Defaults in `akka.cluster.sharding` but allow overrides specifically for actor-set | ||
var shardingBaseSettings = settings.ShardingSettings; | ||
if (shardingBaseSettings == null) | ||
{ | ||
var shardingConfig = _system.Settings.Config.GetConfig("akka.cluster.sharded-daemon-process.sharding"); | ||
var coordinatorSingletonConfig = _system.Settings.Config.GetConfig(shardingConfig.GetString("coordinator-singleton")); | ||
shardingBaseSettings = ClusterShardingSettings.Create(shardingConfig, coordinatorSingletonConfig); | ||
} | ||
|
||
var shardingSettings = new ClusterShardingSettings( | ||
shardingBaseSettings.Role, | ||
false, // remember entities disabled | ||
"", | ||
"", | ||
TimeSpan.Zero, // passivation disabled | ||
StateStoreMode.DData, | ||
shardingBaseSettings.TunningParameters, | ||
shardingBaseSettings.CoordinatorSingletonSettings); | ||
|
||
if (string.IsNullOrEmpty(shardingSettings.Role) || Cluster.Get(_system).SelfRoles.Contains(shardingSettings.Role)) | ||
{ | ||
var shardRegion = ClusterSharding.Get(_system).Start( | ||
typeName: $"sharded-daemon-process-{name}", | ||
entityPropsFactory: entityId => propsFactory(int.Parse(entityId)), | ||
settings: shardingSettings, | ||
messageExtractor: new MessageExtractor(numberOfShards)); | ||
|
||
_system.ActorOf( | ||
KeepAlivePinger.Props(settings, name, entityIds, shardRegion), | ||
$"ShardedDaemonProcessKeepAlive-{name}"); | ||
} | ||
} | ||
} | ||
|
||
public class ShardedDaemonProcessExtensionProvider : ExtensionIdProvider<ShardedDaemonProcess> | ||
{ | ||
public override ShardedDaemonProcess CreateExtension(ExtendedActorSystem system) => new ShardedDaemonProcess(system); | ||
} | ||
} |
Oops, something went wrong.