diff --git a/docs/articles/clustering/cluster-sharding.md b/docs/articles/clustering/cluster-sharding.md index 1af99409faf..74eca58124d 100644 --- a/docs/articles/clustering/cluster-sharding.md +++ b/docs/articles/clustering/cluster-sharding.md @@ -83,6 +83,35 @@ As you may have seen in the examples above shard resolution algorithm is one of By default re-balancing process always happens from nodes with the highest number of shards, to the ones with the smallest one. This can be configured into by specifying custom implementation of the `IShardAllocationStrategy` interface in `ClusterSharding.Start` parameters. +## Shard Re-Balancing + +In the shard re-balance process, the coordinator first notifies all `ShardRegion` actors that a handoff for a shard has started. +That means they will start buffering incoming messages for that shard, in the same way as if the shard location is unknown. +During the re-balance process, the coordinator will not answer any requests for the location of shards that are being rebalanced, i.e. local buffering will continue until the handoff is completed. +The `ShardRegion` responsible for the rebalanced shard will stop all entities in that shard by sending the specified `handoffStopMessage` (default `PoisonPill`) to them. +When all entities have been terminated the `ShardRegion` owning the entities will acknowledge the handoff as completed to the coordinator. +Thereafter the coordinator will reply to requests for the location of the shard, thereby allocating a new home for the shard, and then buffered messages in the `ShardRegion` actors are delivered to the new location; this means that the state of the entities are not transferred or migrated. +If the state of the entities are of importance it should be persistent (durable) with Persistence, so that it can be recovered at the new location. + +The logic that decides which shards to re-balance is defined in a pluggable shard allocation strategy. The default implementation `LeastShardAllocationStrategy` allocates new shards to the `ShardRegion` (node) with least number of previously allocated shards. + +## Intercepting Actor Shutdown During Shard Re-Balancing Handoff Using Custom Stop Message + +The default `PoisonPill` handoff message is perfectly fine for most shard implementation where shard entity actor does not need to do additional processing before being stopped. +The default `PoisonPill` message, however, are handled automatically by the `ActorCell`, making it hard for the shard entity actor to intercept handoff stop events. For an actor to intercept such events, we will need to provide a custom type for cluster sharding to use. + +To illustrate this, let us assume that we have an actor that needs to perform an asynchronous operation before it can safely shut itself down. We will use a custom handoff message called `Stop` that are declared as such: + +[!code-csharp[Stop Message](../../../src/contrib/cluster/Akka.Cluster.Sharding.Tests.MultiNode/ClusterShardingGracefulShutdownOldestSpec.cs?name=StopMessage)] + +We then tell the sharding system that we would like to use a custom handoff stop message by passing an instance of it into the cluster sharding `Start()` method: + +[!code-csharp[Cluster Start](../../../src/contrib/cluster/Akka.Cluster.Sharding.Tests.MultiNode/ClusterShardingGracefulShutdownOldestSpec.cs?name=ClusterStart)] + +We can then intercept this custom message type inside the entity actor message handler and perform our operation before the actor stops: + +[!code-csharp[Delayed Stop](../../../src/contrib/cluster/Akka.Cluster.Sharding.Tests.MultiNode/ClusterShardingGracefulShutdownOldestSpec.cs?name=DelayedStop)] + ## Reliable Delivery of Messages to Sharded Entity Actors If you are interested in ensuring that all messages are guaranteed to be delivered to your entity actors even across restarts, re-balancing operations, or crashes then please see "[Reliable Delivery over Akka.Cluster.Sharding](xref:cluster-sharding-delivery)." diff --git a/src/contrib/cluster/Akka.Cluster.Sharding.Tests.MultiNode/ClusterShardingGracefulShutdownOldestSpec.cs b/src/contrib/cluster/Akka.Cluster.Sharding.Tests.MultiNode/ClusterShardingGracefulShutdownOldestSpec.cs index 0d3656c2b30..44d3074b777 100644 --- a/src/contrib/cluster/Akka.Cluster.Sharding.Tests.MultiNode/ClusterShardingGracefulShutdownOldestSpec.cs +++ b/src/contrib/cluster/Akka.Cluster.Sharding.Tests.MultiNode/ClusterShardingGracefulShutdownOldestSpec.cs @@ -125,14 +125,14 @@ protected override bool Receive(object message) // slow stop previously made it more likely that the coordinator would stop before the local region public class SlowStopShardedEntity : ActorBase, IWithTimers { + #region StopMessage public class Stop { public static Stop Instance = new(); - private Stop() - { - } + private Stop() { } } + #endregion public class ActualStop { @@ -145,6 +145,7 @@ private ActualStop() public ITimerScheduler Timers { get; set; } + #region DelayedStop protected override bool Receive(object message) { switch (message) @@ -161,6 +162,30 @@ protected override bool Receive(object message) } return false; } + #endregion + } + + private sealed class MessageExtractor: IMessageExtractor + { + public string EntityId(object message) + => message switch + { + int id => id.ToString(), + _ => null + }; + + public object EntityMessage(object message) + => message; + + public string ShardId(object message) + => message switch + { + int id => id.ToString(), + _ => null + }; + + public string ShardId(string entityId, object messageHint = null) + => entityId; } private const string TypeName = "Entity"; @@ -177,21 +202,19 @@ private void Join(RoleName from, RoleName to, string typeName) base.Join(from, to); RunOn(() => { - StartSharding(typeName); + #region ClusterStart + ClusterSharding.Get(system: Sys).Start( + typeName: typeName, + entityProps: Props.Create(() => new SlowStopShardedEntity()), + settings: Settings.Value, + messageExtractor: new MessageExtractor(), + allocationStrategy: ShardAllocationStrategy.LeastShardAllocationStrategy(absoluteLimit: 2, relativeLimit: 1.0), + handOffStopMessage: SlowStopShardedEntity.Stop.Instance); // This is the custom handoff message instance + #endregion }, from); EnterBarrier($"{from}-started"); } - private IActorRef StartSharding(string typeName) - { - return StartSharding( - Sys, - typeName, - entityProps: Props.Create(() => new SlowStopShardedEntity()), - allocationStrategy: ShardAllocationStrategy.LeastShardAllocationStrategy(absoluteLimit: 2, relativeLimit: 1.0), - handOffStopMessage: SlowStopShardedEntity.Stop.Instance); - } - #endregion [MultiNodeFact]