[DOCS] Add custom sharding handoff message docs (#7101)
Arkatufus authored Feb 16, 2024
1 parent e8f7e17 commit 1a9c209
Showing 2 changed files with 66 additions and 14 deletions.
29 changes: 29 additions & 0 deletions docs/articles/clustering/cluster-sharding.md
@@ -83,6 +83,35 @@ As you may have seen in the examples above shard resolution algorithm is one of

By default, the re-balancing process always moves shards from the nodes with the highest number of shards to the nodes with the lowest. This can be changed by passing a custom implementation of the `IShardAllocationStrategy` interface in the `ClusterSharding.Start` parameters.

## Shard Re-Balancing

In the shard re-balance process, the coordinator first notifies all `ShardRegion` actors that a handoff for a shard has started.
That means they will start buffering incoming messages for that shard, in the same way as if the shard location were unknown.
During the re-balance process, the coordinator will not answer any requests for the location of shards that are being rebalanced, i.e. local buffering will continue until the handoff is completed.
The `ShardRegion` responsible for the rebalanced shard will stop all entities in that shard by sending the specified `handoffStopMessage` (default `PoisonPill`) to them.
When all entities have been terminated the `ShardRegion` owning the entities will acknowledge the handoff as completed to the coordinator.
Thereafter the coordinator will reply to requests for the location of the shard, thereby allocating a new home for the shard, and the buffered messages in the `ShardRegion` actors are then delivered to the new location. Note that the state of the entities is not transferred or migrated.
If the state of the entities is important, it should be made persistent (durable) with Akka.Persistence so that it can be recovered at the new location.

The logic that decides which shards to re-balance is defined in a pluggable shard allocation strategy. The default implementation, `LeastShardAllocationStrategy`, allocates new shards to the `ShardRegion` (node) with the least number of previously allocated shards.
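
The "least number of previously allocated shards" rule can be illustrated with a minimal, self-contained sketch. This is not the code of `LeastShardAllocationStrategy`; the `LeastShardSketch` class, the `PickRegion` method, and the use of plain strings for regions and shard ids are hypothetical and exist only for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical helper, not part of Akka.NET: models regions and shards as strings.
public static class LeastShardSketch
{
    // Returns the region (node) that currently hosts the fewest shards.
    // Ties are broken by ordinal region name only to keep the sketch deterministic.
    public static string PickRegion(
        IReadOnlyDictionary<string, IReadOnlyList<string>> allocations)
    {
        if (allocations.Count == 0)
            throw new ArgumentException("At least one region is required.", nameof(allocations));

        return allocations
            .OrderBy(kv => kv.Value.Count)
            .ThenBy(kv => kv.Key, StringComparer.Ordinal)
            .First()
            .Key;
    }
}
```

Given three regions that host two, one, and three shards respectively, `PickRegion` returns the region that hosts one shard. A real strategy also has to decide which shards to move away from overloaded regions, which this sketch leaves out.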

## Intercepting Actor Shutdown During Shard Re-Balancing Handoff Using Custom Stop Message

The default `PoisonPill` handoff message is perfectly fine for most sharding implementations, where the shard entity actor does not need to do any additional processing before being stopped.
The `PoisonPill` message, however, is handled automatically by the `ActorCell`, which makes it hard for the shard entity actor to intercept handoff stop events. For an actor to intercept such events, we need to provide a custom message type for cluster sharding to use.

To illustrate this, let us assume that we have an actor that needs to perform an asynchronous operation before it can safely shut itself down. We will use a custom handoff message called `Stop`, declared as follows:

[!code-csharp[Stop Message](../../../src/contrib/cluster/Akka.Cluster.Sharding.Tests.MultiNode/ClusterShardingGracefulShutdownOldestSpec.cs?name=StopMessage)]

We then tell the sharding system that we would like to use a custom handoff stop message by passing an instance of it into the cluster sharding `Start()` method:

[!code-csharp[Cluster Start](../../../src/contrib/cluster/Akka.Cluster.Sharding.Tests.MultiNode/ClusterShardingGracefulShutdownOldestSpec.cs?name=ClusterStart)]

We can then intercept this custom message type inside the entity actor message handler and perform our operation before the actor stops:

[!code-csharp[Delayed Stop](../../../src/contrib/cluster/Akka.Cluster.Sharding.Tests.MultiNode/ClusterShardingGracefulShutdownOldestSpec.cs?name=DelayedStop)]
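
The same pattern can also be sketched with a `ReceiveActor`. The following is a minimal illustration rather than the code from the spec above: the `FlushingEntity` and `FlushCompleted` names are hypothetical, and the 50 ms timer merely stands in for whatever asynchronous work the entity needs to finish before stopping.

```csharp
using System;
using Akka.Actor;

// Custom handoff stop message, mirroring the Stop message shown above.
public sealed class Stop
{
    public static readonly Stop Instance = new Stop();
    private Stop() { }
}

// Hypothetical internal message signalling that the pre-shutdown work is done.
public sealed class FlushCompleted
{
    public static readonly FlushCompleted Instance = new FlushCompleted();
    private FlushCompleted() { }
}

// Hypothetical entity actor that delays its own shutdown when handed off.
public sealed class FlushingEntity : ReceiveActor, IWithTimers
{
    public ITimerScheduler Timers { get; set; }

    public FlushingEntity()
    {
        // Sent by the ShardRegion during handoff instead of PoisonPill.
        Receive<Stop>(_ =>
        {
            // Assumption: a timer stands in for the real asynchronous operation.
            Timers.StartSingleTimer("flush", FlushCompleted.Instance, TimeSpan.FromMilliseconds(50));
        });

        // Only stop once the pre-shutdown work has completed.
        Receive<FlushCompleted>(_ => Context.Stop(Self));

        // Regular entity traffic: echo the message back to the sender.
        ReceiveAny(msg => Sender.Tell(msg));
    }
}
```

For this to take effect, `Stop.Instance` has to be passed as the `handOffStopMessage` argument of `Start()`, as in the cluster start snippet above. The handoff is only acknowledged to the coordinator once the entity has actually terminated, so the pre-shutdown work should be kept short.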

## Reliable Delivery of Messages to Sharded Entity Actors

If you are interested in ensuring that all messages are guaranteed to be delivered to your entity actors even across restarts, re-balancing operations, or crashes then please see "[Reliable Delivery over Akka.Cluster.Sharding](xref:cluster-sharding-delivery)."
@@ -125,14 +125,14 @@ protected override bool Receive(object message)
// slow stop previously made it more likely that the coordinator would stop before the local region
public class SlowStopShardedEntity : ActorBase, IWithTimers
{
#region StopMessage
public class Stop
{
public static Stop Instance = new();

private Stop()
{
}
private Stop() { }
}
#endregion

public class ActualStop
{
@@ -145,6 +145,7 @@ private ActualStop()

public ITimerScheduler Timers { get; set; }

#region DelayedStop
protected override bool Receive(object message)
{
switch (message)
@@ -161,6 +162,30 @@ protected override bool Receive(object message)
}
return false;
}
#endregion
}

private sealed class MessageExtractor: IMessageExtractor
{
public string EntityId(object message)
=> message switch
{
int id => id.ToString(),
_ => null
};

public object EntityMessage(object message)
=> message;

public string ShardId(object message)
=> message switch
{
int id => id.ToString(),
_ => null
};

public string ShardId(string entityId, object messageHint = null)
=> entityId;
}

private const string TypeName = "Entity";
@@ -177,21 +202,19 @@ private void Join(RoleName from, RoleName to, string typeName)
base.Join(from, to);
RunOn(() =>
{
StartSharding(typeName);
#region ClusterStart
ClusterSharding.Get(system: Sys).Start(
typeName: typeName,
entityProps: Props.Create(() => new SlowStopShardedEntity()),
settings: Settings.Value,
messageExtractor: new MessageExtractor(),
allocationStrategy: ShardAllocationStrategy.LeastShardAllocationStrategy(absoluteLimit: 2, relativeLimit: 1.0),
handOffStopMessage: SlowStopShardedEntity.Stop.Instance); // This is the custom handoff message instance
#endregion
}, from);
EnterBarrier($"{from}-started");
}

private IActorRef StartSharding(string typeName)
{
return StartSharding(
Sys,
typeName,
entityProps: Props.Create(() => new SlowStopShardedEntity()),
allocationStrategy: ShardAllocationStrategy.LeastShardAllocationStrategy(absoluteLimit: 2, relativeLimit: 1.0),
handOffStopMessage: SlowStopShardedEntity.Stop.Instance);
}

#endregion

[MultiNodeFact]