Skip to content

Commit

Permalink
[WIP] Backport of the feature called ClusterDistribution in Lagom
Browse files Browse the repository at this point in the history
  • Loading branch information
ismaelhamed committed Jun 8, 2020
1 parent 48ce8e9 commit 43093ab
Show file tree
Hide file tree
Showing 7 changed files with 491 additions and 2 deletions.
52 changes: 52 additions & 0 deletions docs/articles/clustering/cluster-sharded-daemon-process.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
# Sharded Daemon Process


> [!WARNING]
> This module is currently marked as [may change](../utilities/may-change.md) because it is a new feature that
> needs feedback from real usage before finalizing the API. This means that the API or semantics can change without
> warning or a deprecation period. It is also not recommended to use this module in production just yet.

## Introduction

Sharded Daemon Process provides a way to run `N` actors, each given a numeric id starting from `0`, that are then kept alive
and balanced across the cluster. When a rebalance is needed, the actor is stopped and then, triggered by a keep-alive running on
all nodes, started on a new node (the keep-alive should be seen as an implementation detail and may change in future versions).

The intended use case is for splitting data processing workloads across a set number of workers that each get to work on a subset
of the data that needs to be processed. This is commonly needed to create projections based on the event streams available
from all the [Persistent Actors](../persistence/event-sourcing.md) in a CQRS application. Events are tagged with one out of `N` tags
used to split the workload of consuming and updating a projection between `N` workers.

For cases where a single actor needs to be kept alive, see [Cluster Singleton](cluster-singleton.md).

## Basic example

To set up a set of actors running with Sharded Daemon Process, each node in the cluster needs to run the same initialization
when starting up:

```csharp
/// <summary>
/// Example worker actor that handles everything belonging to a single tag.
/// </summary>
class TagProcessor : ReceiveActor
{
    public TagProcessor(string tag)
    {
        Tag = tag;
    }

    /// <summary>The tag this processor instance is responsible for.</summary>
    public string Tag { get; }

    /// <summary>Creates the <see cref="Actor.Props"/> used to spawn a processor for <paramref name="tag"/>.</summary>
    public static Props Props(string tag)
    {
        return Actor.Props.Create(() => new TagProcessor(tag));
    }

    protected override void PreStart()
    {
        // start the processing ...
        base.PreStart();
        Context.System.Log.Debug("Starting processor for tag {0}", Tag);
    }
}

// One worker per tag: the numeric id handed to the factory is used as an index into the tag array.
string[] tags = { "tag-1", "tag-2", "tag-3" };
var daemonProcess = ShardedDaemonProcess.Get(Sys);
daemonProcess.Init("TagProcessors", tags.Length, id => TagProcessor.Props(tags[id]));
```

## Scalability

This cluster tool is intended for small numbers of consumers and will not scale well to a large set. In large clusters
it is recommended to limit the nodes the sharded daemon process will run on using a role.
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
//-----------------------------------------------------------------------
// <copyright file="ShardedDaemonProcessSpec.cs" company="Akka.NET Project">
// Copyright (C) 2009-2020 Lightbend Inc. <http://www.lightbend.com>
// Copyright (C) 2013-2020 .NET Foundation <https://github.com/akkadotnet/akka.net>
// </copyright>
//-----------------------------------------------------------------------

using System;
using System.Linq;
using Akka.Actor;
using Akka.Cluster.Tools.Singleton;
using Akka.Configuration;
using Akka.TestKit;
using Xunit;

namespace Akka.Cluster.Sharding.Tests
{
/// <summary>
/// Single-node specs for <see cref="ShardedDaemonProcess"/>: every test joins the actor system to itself
/// and then initializes a named set of daemon actors that report back to a test probe.
/// </summary>
public class ShardedDaemonProcessSpec : AkkaSpec
{
    /// <summary>Message telling a <see cref="MyActor"/> to stop itself.</summary>
    private sealed class Stop
    {
        public static Stop Instance { get; } = new Stop();
        private Stop() { }
    }

    /// <summary>Notification sent to the probe from <see cref="MyActor.PreStart"/>.</summary>
    private sealed class Started
    {
        /// <summary>Numeric id the actor was created with.</summary>
        public int Id { get; }

        /// <summary>Reference of the actor that just started.</summary>
        public IActorRef SelfRef { get; }

        public Started(int id, IActorRef selfRef)
        {
            Id = id;
            SelfRef = selfRef;
        }
    }

    /// <summary>
    /// Daemon actor used by the specs: announces itself to the probe on start and stops on <see cref="Stop"/>.
    /// </summary>
    private class MyActor : UntypedActor
    {
        public int Id { get; }
        public IActorRef Probe { get; }

        public static Props Props(int id, IActorRef probe) =>
            Actor.Props.Create(() => new MyActor(id, probe));

        public MyActor(int id, IActorRef probe)
        {
            Id = id;
            Probe = probe;
        }

        protected override void PreStart()
        {
            base.PreStart();
            Probe.Tell(new Started(Id, Context.Self));
        }

        protected override void OnReceive(object message)
        {
            if (message is Stop _)
            {
                Context.Stop(Self);
            }
        }
    }

    /// <summary>
    /// Builds the test configuration: cluster provider on a random local port and a 1s keep-alive interval,
    /// with the cluster singleton proxy and cluster sharding defaults as fallbacks.
    /// </summary>
    private static Config GetConfig()
    {
        return ConfigurationFactory.ParseString(@"
akka.actor.provider = cluster
akka.remote.dot-netty.tcp.port = 0
akka.remote.dot-netty.tcp.hostname = 127.0.0.1
# ping often/start fast for test
akka.cluster.sharded-daemon-process.keep-alive-interval = 1s
akka.coordinated-shutdown.terminate-actor-system = off
akka.coordinated-shutdown.run-by-actor-system-terminate = off")
            .WithFallback(ClusterSingletonProxy.DefaultConfig())
            .WithFallback(ClusterSharding.DefaultConfig());
    }

    public ShardedDaemonProcessSpec()
        : base(GetConfig())
    { }

    /// <summary>Joins the node to itself and waits (up to 3 seconds) for the member to become Up.</summary>
    [Fact]
    public void ShardedDaemonProcess_must_have_a_single_node_cluster_running_first()
    {
        var probe = CreateTestProbe();
        Cluster.Get(Sys).Join(Cluster.Get(Sys).SelfAddress);
        probe.AwaitAssert(() => Cluster.Get(Sys).SelfMember.Status.ShouldBe(MemberStatus.Up),
            TimeSpan.FromSeconds(3));
    }

    /// <summary>Initializes five daemon actors and expects exactly five start notifications with distinct ids.</summary>
    [Fact]
    public void ShardedDaemonProcess_must_start_N_actors_with_unique_ids()
    {
        Cluster.Get(Sys).Join(Cluster.Get(Sys).SelfAddress);

        var probe = CreateTestProbe();
        ShardedDaemonProcess.Get(Sys).Init("a", 5, id => MyActor.Props(id, probe.Ref));

        var started = probe.ReceiveN(5);
        started.Count.ShouldBe(5);

        // ReceiveN(5) already guarantees the count; the ids themselves must be checked for uniqueness.
        started.Cast<Started>().Select(s => s.Id).Distinct().Count().ShouldBe(5);
        probe.ExpectNoMsg();
    }

    /// <summary>Stops both daemon actors and expects each of them to be started again.</summary>
    [Fact]
    public void ShardedDaemonProcess_must_restart_actors_if_they_stop()
    {
        Cluster.Get(Sys).Join(Cluster.Get(Sys).SelfAddress);

        var probe = CreateTestProbe();
        ShardedDaemonProcess.Get(Sys).Init("stop", 2, id => MyActor.Props(id, probe.Ref));

        for (var i = 0; i < 2; i++)
        {
            var started = probe.ExpectMsg<Started>();
            started.SelfRef.Tell(Stop.Instance);
        }

        // periodic ping every 1s makes it restart.
        // An eager loop is required here: a bare Enumerable.Select is lazy and would never run the expectations.
        for (var i = 0; i < 2; i++)
        {
            probe.ExpectMsg<Started>(TimeSpan.FromSeconds(3));
        }
    }

    /// <summary>Configures a role this node does not have and expects that no daemon actor is started.</summary>
    [Fact]
    public void ShardedDaemonProcess_must_not_run_if_the_role_does_not_match_node_role()
    {
        Cluster.Get(Sys).Join(Cluster.Get(Sys).SelfAddress);

        var probe = CreateTestProbe();
        var settings = ShardedDaemonProcessSettings.Create(Sys).WithShardingSettings(ClusterShardingSettings.Create(Sys).WithRole("workers"));
        ShardedDaemonProcess.Get(Sys).Init("roles", 3, id => MyActor.Props(id, probe.Ref), settings);

        probe.ExpectNoMsg();
    }
}
}
2 changes: 1 addition & 1 deletion src/contrib/cluster/Akka.Cluster.Sharding/ShardRegion.cs
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ public override int GetHashCode()
/// to start(it does not guarantee the entity successfully started)
/// </summary>
[Serializable]
public sealed class StartEntityAck : IClusterShardingSerializable
public sealed class StartEntityAck : IClusterShardingSerializable, IDeadLetterSuppression
{
/// <summary>
/// An identifier of a newly started entity. Unique in scope of a given shard.
Expand Down
173 changes: 173 additions & 0 deletions src/contrib/cluster/Akka.Cluster.Sharding/ShardedDaemonProcess.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
//-----------------------------------------------------------------------
// <copyright file="ShardedDaemonProcess.cs" company="Akka.NET Project">
// Copyright (C) 2009-2020 Lightbend Inc. <http://www.lightbend.com>
// Copyright (C) 2013-2020 .NET Foundation <https://github.com/akkadotnet/akka.net>
// </copyright>
//-----------------------------------------------------------------------

using System;
using System.Linq;
using Akka.Actor;
using Akka.Annotations;
using Akka.Util.Internal;

namespace Akka.Cluster.Sharding
{
/// <summary>
/// Actor that sends a <see cref="ShardRegion.StartEntity"/> message for every known identity to the
/// shard region, once on start and then again on every timer tick.
/// </summary>
internal class KeepAlivePinger : UntypedActor, IWithTimers
{
    /// <summary>Timer message that triggers the next round of start requests.</summary>
    private sealed class Tick
    {
        public static Tick Instance { get; } = new Tick();
        private Tick() { }
    }

    /// <summary>Name of the daemon process set; used for logging.</summary>
    public string Name { get; }

    /// <summary>Entity ids for which a start request is sent on every round.</summary>
    public string[] Identities { get; }

    /// <summary>Recipient of the <see cref="ShardRegion.StartEntity"/> messages.</summary>
    public IActorRef ShardingRef { get; }

    /// <summary>Settings providing the keep-alive interval.</summary>
    public ShardedDaemonProcessSettings Settings { get; }

    public ITimerScheduler Timers { get; set; }

    public static Props Props(ShardedDaemonProcessSettings settings, string name, string[] identities, IActorRef shardingRef) =>
        Actor.Props.Create(() => new KeepAlivePinger(settings, name, identities, shardingRef));

    public KeepAlivePinger(ShardedDaemonProcessSettings settings, string name, string[] identities, IActorRef shardingRef)
    {
        Settings = settings;
        Name = name;
        Identities = identities;
        ShardingRef = shardingRef;
    }

    protected override void PreStart()
    {
        base.PreStart();

        // The format string has two placeholders, so both values must be supplied.
        Context.System.Log.Debug(
            "Starting Sharded Daemon Process KeepAlivePinger for [{0}], with ping interval [{1}]",
            Name,
            Settings.KeepAliveInterval);
        Timers.StartPeriodicTimer("tick", Tick.Instance, Settings.KeepAliveInterval);

        // Do not wait for the first tick: request all entities right away.
        TriggerStartAll();
    }

    protected override void OnReceive(object message)
    {
        if (message is Tick _)
        {
            TriggerStartAll();
            Context.System.Log.Debug("Periodic ping sent to [{0}] processes", Identities.Length);
        }
    }

    /// <summary>Sends one <see cref="ShardRegion.StartEntity"/> per identity to <see cref="ShardingRef"/>.</summary>
    private void TriggerStartAll() => Identities.ForEach(id => ShardingRef.Tell(new ShardRegion.StartEntity(id)));
}

/// <summary>
/// <para>Default envelope type that may be used with Cluster Sharding.</para>
/// <para>
/// The alternative way of routing messages through sharding is to not use envelopes,
/// and have the message types themselves carry identifiers.
/// </para>
/// </summary>
public sealed class ShardingEnvelope
{
/// <summary>Identifier of the entity the wrapped message is addressed to.</summary>
public string EntityId { get; }
/// <summary>The wrapped payload.</summary>
public object Message { get; }

/// <summary>
/// Creates an envelope pairing a payload with the id of its target entity.
/// </summary>
/// <param name="entityId">Value stored in <see cref="EntityId"/>.</param>
/// <param name="message">Value stored in <see cref="Message"/>.</param>
public ShardingEnvelope(string entityId, object message)
{
EntityId = entityId;
Message = message;
}
}

/// <summary>
/// Message extractor for sharded daemon processes: reads the entity id and payload from a
/// <see cref="ShardingEnvelope"/> and uses the entity id itself as the shard id.
/// </summary>
internal sealed class MessageExtractor : HashCodeMessageExtractor
{
    public MessageExtractor(int maxNumberOfShards)
        : base(maxNumberOfShards)
    { }

    /// <summary>
    /// Returns the envelope's entity id, or <c>null</c> when <paramref name="message"/> is not a
    /// <see cref="ShardingEnvelope"/> (instead of throwing a <see cref="System.NullReferenceException"/>).
    /// </summary>
    public override string EntityId(object message) => (message as ShardingEnvelope)?.EntityId;

    /// <summary>
    /// Returns the envelope's payload, or <c>null</c> when <paramref name="message"/> is not a
    /// <see cref="ShardingEnvelope"/>.
    /// </summary>
    public override object EntityMessage(object message) => (message as ShardingEnvelope)?.Message;

    /// <summary>
    /// Returns the entity id carried by a <see cref="ShardRegion.StartEntity"/> message, otherwise the
    /// result of <see cref="EntityId"/>; the shard id is therefore always equal to the entity id.
    /// </summary>
    public override string ShardId(object message) => message is ShardRegion.StartEntity se ? se.EntityId : EntityId(message);
}

/// <summary>
/// <para>This extension runs a pre set number of actors in a cluster.</para>
/// <para>
/// The typical use case is when you have a task that can be divided in a number of workers, each doing a
/// sharded part of the work, for example consuming the read side events from Akka Persistence through
/// tagged events where each tag decides which consumer that should consume the event.
/// </para>
/// <para>Each named set needs to be started on all the nodes of the cluster on start up.</para>
/// <para>
/// The processes are spread out across the cluster, when the cluster topology changes the processes may be stopped
/// and started anew on a new node to rebalance them.
/// </para>
/// <para>Not for user extension.</para>
/// </summary>
[ApiMayChange]
public class ShardedDaemonProcess : IExtension
{
    private readonly ExtendedActorSystem _system;

    /// <summary>Creates the extension for the given actor system.</summary>
    /// <param name="system">Actor system the daemon processes are started in.</param>
    public ShardedDaemonProcess(ExtendedActorSystem system)
    {
        _system = system;
    }

    /// <summary>Looks up (registering on first use) the extension instance of <paramref name="system"/>.</summary>
    public static ShardedDaemonProcess Get(ActorSystem system)
    {
        return system.WithExtension<ShardedDaemonProcess, ShardedDaemonProcessExtensionProvider>();
    }

    /// <summary>
    /// Start a specific number of actors that is then kept alive in the cluster,
    /// using settings created from the actor system.
    /// </summary>
    /// <param name="name">Name of the actor set; becomes part of the sharding type name and the keep-alive actor name.</param>
    /// <param name="numberOfInstances">Number of actors to run; ids range from `0` up to, but not including, this value.</param>
    /// <param name="propsFactory">Given a unique id of `0` until `numberOfInstance` create an entity actor.</param>
    public void Init(string name, int numberOfInstances, Func<int, Props> propsFactory) =>
        Init(name, numberOfInstances, propsFactory, ShardedDaemonProcessSettings.Create(_system));

    /// <summary>
    /// Start a specific number of actors, each with a unique numeric id in the set, that is then kept alive in the cluster.
    /// </summary>
    /// <param name="name">Name of the actor set; becomes part of the sharding type name and the keep-alive actor name.</param>
    /// <param name="numberOfInstances">Number of actors to run; also used as the number of shards.</param>
    /// <param name="propsFactory">Given a unique id of `0` until `numberOfInstance` create an entity actor.</param>
    /// <param name="settings">Keep-alive settings and optional sharding settings overriding the system defaults.</param>
    public void Init(string name, int numberOfInstances, Func<int, Props> propsFactory, ShardedDaemonProcessSettings settings)
    {
        // The numeric id of each actor is encoded as its entity id.
        var ids = Enumerable
            .Range(0, numberOfInstances)
            .Select(index => index.ToString())
            .ToArray();

        // Defaults in akka.cluster.sharding but allow overrides specifically for actor-set
        var baseSettings = settings.ShardingSettings ?? ClusterShardingSettings.Create(_system);

        var daemonShardingSettings = new ClusterShardingSettings(
            baseSettings.Role,
            false, // remember entities disabled
            "",
            "",
            TimeSpan.Zero, // passivation disabled
            StateStoreMode.DData,
            baseSettings.TunningParameters,
            baseSettings.CoordinatorSingletonSettings);

        // Nothing is started on nodes that do not carry the configured role.
        var role = daemonShardingSettings.Role;
        if (!string.IsNullOrEmpty(role) && !Cluster.Get(_system).SelfRoles.Contains(role))
        {
            return;
        }

        // One shard per actor, hence numberOfInstances doubles as the shard count.
        var region = ClusterSharding.Get(_system).Start(
            typeName: $"sharded-daemon-process-{name}",
            entityPropsFactory: entityId => propsFactory(int.Parse(entityId)),
            settings: daemonShardingSettings,
            messageExtractor: new MessageExtractor(numberOfInstances));

        var pingerProps = KeepAlivePinger.Props(settings, name, ids, region);
        _system.ActorOf(pingerProps, $"ShardedDaemonProcessKeepAlive-{name}");
    }
}

/// <summary>
/// Extension provider that creates the <see cref="ShardedDaemonProcess"/> extension for an actor system.
/// </summary>
public class ShardedDaemonProcessExtensionProvider : ExtensionIdProvider<ShardedDaemonProcess>
{
    /// <summary>Creates a new <see cref="ShardedDaemonProcess"/> bound to <paramref name="system"/>.</summary>
    public override ShardedDaemonProcess CreateExtension(ExtendedActorSystem system)
    {
        return new ShardedDaemonProcess(system);
    }
}
}
Loading

0 comments on commit 43093ab

Please sign in to comment.