Backport of the feature called ClusterDistribution in Lagom (#4455)
ismaelhamed authored Jun 17, 2020
1 parent d76f406 commit 5e559f7
Showing 9 changed files with 661 additions and 2 deletions.
31 changes: 31 additions & 0 deletions docs/articles/clustering/cluster-sharded-daemon-process.md
@@ -0,0 +1,31 @@
# Sharded Daemon Process

> [!WARNING]
> This module is currently marked as [may change](../utilities/may-change.md) because it is a new feature that
> needs feedback from real usage before finalizing the API. This means that the API or semantics can change without
> warning or deprecation period. It is also not recommended to use this module in production just yet.

## Introduction

Sharded Daemon Process provides a way to run `N` actors, each given a numeric id starting from `0`, that are then kept alive
and balanced across the cluster. When a rebalance is needed, an actor is stopped and, triggered by a keep-alive mechanism running on
all nodes, started on a new node (the keep-alive should be seen as an implementation detail and may change in future versions).

The intended use case is splitting a data processing workload across a set number of workers, each of which processes a subset
of the data. This is commonly needed to create projections based on the event streams available
from all the [Persistent Actors](../persistence/event-sourcing.md) in a CQRS application. Events are tagged with one out of `N` tags,
and the work of consuming the events and updating a projection is split between `N` workers, one per tag.

For cases where a single actor needs to be kept alive, see [Cluster Singleton](cluster-singleton.md).

## Basic example

To set up a set of actors running with Sharded Daemon Process, each node in the cluster needs to run the same initialization
when starting up:

[!code-csharp[ShardedDaemonProcessSpec.cs](../../../src/contrib/cluster/Akka.Cluster.Sharding.Tests/ShardedDaemonProcessSpec.cs?name=tag-processing)]
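
The referenced snippet boils down to the following minimal sketch, assuming an `ActorSystem` named `system` that is already configured for clustering and a `TagProcessor` actor like the one defined in the referenced test file:

```csharp
using Akka.Actor;
using Akka.Cluster.Sharding;

// One tag per worker; each TagProcessor consumes the events carrying its tag.
var tags = new[] { "tag-1", "tag-2", "tag-3" };

// Start one worker per tag. The Sharded Daemon Process keeps the workers alive
// and rebalances them across the cluster as nodes join and leave.
ShardedDaemonProcess.Get(system).Init("TagProcessors", tags.Length, id => TagProcessor.Props(tags[id]));
```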

## Scalability

This cluster tool is intended for a small number of consumers and will not scale well to a large set. In large clusters
it is recommended to limit the nodes the Sharded Daemon Process will run on by using a role, as in the sketch below.
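
Continuing the sketch above, a role restriction can be expressed through `ShardedDaemonProcessSettings`, mirroring the settings used in the accompanying unit test (the `workers` role name is only an example):

```csharp
// Only cluster nodes carrying the "workers" role will host the daemon process actors.
var settings = ShardedDaemonProcessSettings.Create(system)
    .WithShardingSettings(ClusterShardingSettings.Create(system).WithRole("workers"));

ShardedDaemonProcess.Get(system).Init("TagProcessors", tags.Length, id => TagProcessor.Props(tags[id]), settings);
```
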
2 changes: 2 additions & 0 deletions docs/articles/toc.yml
Original file line number Diff line number Diff line change
@@ -172,6 +172,8 @@
href: clustering/cluster-client.md
- name: Cluster Sharding
href: clustering/cluster-sharding.md
- name: Sharded Daemon Process
href: clustering/cluster-sharded-daemon-process.md
- name: Cluster Metrics
href: clustering/cluster-metrics.md
- name: Distributed Data
@@ -0,0 +1,174 @@
//-----------------------------------------------------------------------
// <copyright file="ClusterShardingSpec.cs" company="Akka.NET Project">
// Copyright (C) 2009-2020 Lightbend Inc. <http://www.lightbend.com>
// Copyright (C) 2013-2020 .NET Foundation <https://github.com/akkadotnet/akka.net>
// </copyright>
//-----------------------------------------------------------------------

using System;
using System.Linq;
using Akka.Actor;
using Akka.Cluster.TestKit;
using Akka.Cluster.Tools.Singleton;
using Akka.Configuration;
using Akka.Remote.TestKit;
using FluentAssertions;

namespace Akka.Cluster.Sharding.Tests.MultiNode
{
public class ShardedDaemonProcessSpecConfig : MultiNodeConfig
{
public RoleName First { get; }
public RoleName Second { get; }
public RoleName Third { get; }

public ShardedDaemonProcessSpecConfig()
{
First = Role("first");
Second = Role("second");
Third = Role("third");

CommonConfig = DebugConfig(false)
.WithFallback(ConfigurationFactory.ParseString(@"
akka.loglevel = INFO
akka.cluster.sharded-daemon-process {
  sharding {
    # the first attempt is likely to be ignored, as the shard coordinator is not ready yet
    retry-interval = 0.2s
  }
  # quick ping to keep the test swift
  keep-alive-interval = 1s
}
"))
.WithFallback(ClusterSharding.DefaultConfig())
.WithFallback(ClusterSingletonManager.DefaultConfig())
.WithFallback(MultiNodeClusterSpec.ClusterConfig());
}
}

public class ShardedDaemonProcessMultiNode : ShardedDaemonProcessSpec
{
public ShardedDaemonProcessMultiNode() : this(new ShardedDaemonProcessSpecConfig()) { }
protected ShardedDaemonProcessMultiNode(ShardedDaemonProcessSpecConfig config) : base(config, typeof(ShardedDaemonProcessMultiNode)) { }
}

public abstract class ShardedDaemonProcessSpec : MultiNodeClusterSpec
{
private readonly ShardedDaemonProcessSpecConfig _config;

protected ShardedDaemonProcessSpec(ShardedDaemonProcessSpecConfig config, Type type)
: base(config, type)
{
_config = config;
}

[MultiNodeFact]
public void ShardedDaemonProcess_Specs()
{
ShardedDaemonProcess_Should_Init_Actor_Set();
}

public void ShardedDaemonProcess_Should_Init_Actor_Set()
{
// HACK: cluster formation is only driven from the first node
RunOn(() => FormCluster(_config.First, _config.Second, _config.Third), _config.First);

var probe = CreateTestProbe();
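// start an actor set named "the-fearless" with four workers, each reporting its lifecycle events to the probe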
ShardedDaemonProcess.Get(Sys).Init("the-fearless", 4, id => ProcessActor.Props(id, probe.Ref));
EnterBarrier("actor-set-initialized");

RunOn(() =>
{
var startedIds = Enumerable.Range(0, 4).Select(_ =>
{
var evt = probe.ExpectMsg<ProcessActorEvent>(TimeSpan.FromSeconds(5));
evt.Event.Should().Be("Started");
return evt.Id;
}).ToList();
startedIds.Count.Should().Be(4);
}, _config.First);
EnterBarrier("actor-set-started");
}

private void FormCluster(RoleName first, params RoleName[] rest)
{
RunOn(() =>
{
Cluster.Join(GetAddress(first));
AwaitAssert(() =>
{
Cluster.State.Members.Select(i => i.UniqueAddress).Should().Contain(Cluster.SelfUniqueAddress);
Cluster.State.Members.Select(i => i.Status).Should().OnlyContain(i => i == MemberStatus.Up);
});
}, first);
EnterBarrier(first.Name + "-joined");

foreach (var node in rest)
{
RunOn(() =>
{
Cluster.Join(GetAddress(first));
AwaitAssert(() =>
{
Cluster.State.Members.Select(i => i.UniqueAddress).Should().Contain(Cluster.SelfUniqueAddress);
Cluster.State.Members.Select(i => i.Status).Should().OnlyContain(i => i == MemberStatus.Up);
});
}, node);
}
EnterBarrier("all-joined");
}
}

internal class ProcessActor : UntypedActor
{
#region Protocol

[Serializable]
public sealed class Stop
{
public static readonly Stop Instance = new Stop();
private Stop() { }
}

#endregion

public static Props Props(int id, IActorRef probe) =>
Actor.Props.Create(() => new ProcessActor(id, probe));

public ProcessActor(int id, IActorRef probe)
{
Probe = probe;
Id = id;
}

public IActorRef Probe { get; }
public int Id { get; }

protected override void PreStart()
{
base.PreStart();
Probe.Tell(new ProcessActorEvent(Id, "Started"));
}

protected override void OnReceive(object message)
{
if (message is Stop)
{
Probe.Tell(new ProcessActorEvent(Id, "Stopped"));
Context.Stop(Self);
}
}
}

internal sealed class ProcessActorEvent
{
public ProcessActorEvent(int id, object @event)
{
Id = id;
Event = @event;
}

public int Id { get; }
public object Event { get; }
}
}
158 changes: 158 additions & 0 deletions src/contrib/cluster/Akka.Cluster.Sharding.Tests/ShardedDaemonProcessSpec.cs
@@ -0,0 +1,158 @@
//-----------------------------------------------------------------------
// <copyright file="ClusterSharding.cs" company="Akka.NET Project">
// Copyright (C) 2009-2020 Lightbend Inc. <http://www.lightbend.com>
// Copyright (C) 2013-2020 .NET Foundation <https://github.com/akkadotnet/akka.net>
// </copyright>
//-----------------------------------------------------------------------

using System;
using System.Linq;
using Akka.Actor;
using Akka.Cluster.Tools.Singleton;
using Akka.Configuration;
using Akka.TestKit;
using Xunit;

namespace Akka.Cluster.Sharding.Tests
{
public class ShardedDaemonProcessSpec : AkkaSpec
{
private sealed class Stop
{
public static Stop Instance { get; } = new Stop();
private Stop() { }
}

private sealed class Started
{
public int Id { get; }
public IActorRef SelfRef { get; }

public Started(int id, IActorRef selfRef)
{
Id = id;
SelfRef = selfRef;
}
}

private class MyActor : UntypedActor
{
public int Id { get; }
public IActorRef Probe { get; }

public static Props Props(int id, IActorRef probe) =>
Actor.Props.Create(() => new MyActor(id, probe));

public MyActor(int id, IActorRef probe)
{
Id = id;
Probe = probe;
}

protected override void PreStart()
{
base.PreStart();
Probe.Tell(new Started(Id, Context.Self));
}

protected override void OnReceive(object message)
{
if (message is Stop _)
Context.Stop(Self);
}
}

private static Config GetConfig()
{
return ConfigurationFactory.ParseString(@"
akka.actor.provider = cluster
akka.remote.dot-netty.tcp.port = 0
akka.remote.dot-netty.tcp.hostname = 127.0.0.1
# ping often/start fast for test
akka.cluster.sharded-daemon-process.keep-alive-interval = 1s
akka.coordinated-shutdown.terminate-actor-system = off
akka.coordinated-shutdown.run-by-actor-system-terminate = off")
.WithFallback(ClusterSharding.DefaultConfig())
.WithFallback(ClusterSingletonProxy.DefaultConfig());
}

public ShardedDaemonProcessSpec()
: base(GetConfig())
{ }

[Fact]
public void ShardedDaemonProcess_must_have_a_single_node_cluster_running_first()
{
var probe = CreateTestProbe();
Cluster.Get(Sys).Join(Cluster.Get(Sys).SelfAddress);
probe.AwaitAssert(() => Cluster.Get(Sys).SelfMember.Status.ShouldBe(MemberStatus.Up), TimeSpan.FromSeconds(3));
}

[Fact]
public void ShardedDaemonProcess_must_start_N_actors_with_unique_ids()
{
Cluster.Get(Sys).Join(Cluster.Get(Sys).SelfAddress);

var probe = CreateTestProbe();
ShardedDaemonProcess.Get(Sys).Init("a", 5, id => MyActor.Props(id, probe.Ref));

var started = probe.ReceiveN(5);
started.Count.ShouldBe(5);
}

[Fact]
public void ShardedDaemonProcess_must_restart_actors_if_they_stop()
{
Cluster.Get(Sys).Join(Cluster.Get(Sys).SelfAddress);

var probe = CreateTestProbe();
ShardedDaemonProcess.Get(Sys).Init("stop", 2, id => MyActor.Props(id, probe.Ref));

foreach (var started in Enumerable.Range(0, 2).Select(_ => probe.ExpectMsg<Started>()))
started.SelfRef.Tell(Stop.Instance);

// the periodic keep-alive ping (every 1s) makes the stopped actors restart;
// enumerate eagerly so the expectations actually run (Select alone is lazy and would never execute)
foreach (var _ in Enumerable.Range(0, 2))
probe.ExpectMsg<Started>(TimeSpan.FromSeconds(3));
}

[Fact]
public void ShardedDaemonProcess_must_not_run_if_the_role_does_not_match_node_role()
{
Cluster.Get(Sys).Join(Cluster.Get(Sys).SelfAddress);

var probe = CreateTestProbe();
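// the test node joined without any roles, so requiring the "workers" role means no actor should start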
var settings = ShardedDaemonProcessSettings.Create(Sys).WithShardingSettings(ClusterShardingSettings.Create(Sys).WithRole("workers"));
ShardedDaemonProcess.Get(Sys).Init("roles", 3, id => MyActor.Props(id, probe.Ref), settings);

probe.ExpectNoMsg();
}

// only used in documentation
private class TagProcessor : ReceiveActor
{
public string Tag { get; }

public static Props Props(string tag) =>
Actor.Props.Create(() => new TagProcessor(tag));

public TagProcessor(string tag) => Tag = tag;

protected override void PreStart()
{
// start the processing ...
base.PreStart();
Context.System.Log.Debug("Starting processor for tag {0}", Tag);
}
}

private void DocExample()
{
#region tag-processing
var tags = new[] { "tag-1", "tag-2", "tag-3" };
ShardedDaemonProcess.Get(Sys).Init("TagProcessors", tags.Length, id => TagProcessor.Props(tags[id]));
#endregion
}
}
}
2 changes: 1 addition & 1 deletion src/contrib/cluster/Akka.Cluster.Sharding/ShardRegion.cs
@@ -117,7 +117,7 @@ public override int GetHashCode()
/// to start (it does not guarantee that the entity started successfully)
/// </summary>
[Serializable]
public sealed class StartEntityAck : IClusterShardingSerializable
public sealed class StartEntityAck : IClusterShardingSerializable, IDeadLetterSuppression
{
/// <summary>
/// An identifier of a newly started entity. Unique in scope of a given shard.
Expand Down