Backport of the feature called ClusterDistribution in Lagom #4455

Merged: 1 commit, Jun 17, 2020
31 changes: 31 additions & 0 deletions docs/articles/clustering/cluster-sharded-daemon-process.md
@@ -0,0 +1,31 @@
# Sharded Daemon Process

> [!WARNING]
>This module is currently marked as [may change](../utilities/may-change.md) because it is a new feature that
>needs feedback from real usage before finalizing the API. This means that API or semantics can change without
>warning or deprecation period. It is also not recommended to use this module in production just yet.

## Introduction

Sharded Daemon Process provides a way to run `N` actors, each given a numeric id starting from `0`, that are then kept alive
and balanced across the cluster. When a rebalance is needed, the actor is stopped and then started on a new node, triggered by a
keep-alive running on all nodes (the keep-alive should be seen as an implementation detail and may change in future versions).

The intended use case is splitting data processing workloads across a fixed number of workers, each of which works on a subset
of the data that needs to be processed. This is commonly needed to create projections based on the event streams available
from all the [Persistent Actors](../persistence/event-sourcing.md) in a CQRS application. Each event is tagged with one of `N` tags,
which are used to split the work of consuming events and updating a projection between `N` workers.
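
The tagging scheme above can be sketched as follows. This is only an illustration under stated assumptions: the `EventTagger` class, its method names, and the choice of an FNV-1a hash are hypothetical and are not part of Akka.NET or this PR. The point it shows is that the tag for an entity should be derived from a hash that is stable across processes, so that every node maps the same persistence id to the same one of the `N` tags.

```csharp
using System;
using System.Text;

// Hypothetical helper, not part of Akka.NET: maps a persistence id to one of N tags.
public static class EventTagger
{
    // FNV-1a 32-bit hash. Chosen here because it gives the same result in every
    // process, unlike string.GetHashCode(), which may be randomized per process.
    public static uint StableHash(string value)
    {
        const uint offsetBasis = 2166136261;
        const uint prime = 16777619;
        var hash = offsetBasis;
        foreach (var b in Encoding.UTF8.GetBytes(value))
        {
            unchecked
            {
                hash ^= b;
                hash *= prime;
            }
        }
        return hash;
    }

    // Returns an index in [0, numberOfTags) for the given persistence id.
    public static int SliceFor(string persistenceId, int numberOfTags)
    {
        if (numberOfTags <= 0)
            throw new ArgumentOutOfRangeException(nameof(numberOfTags));
        return (int)(StableHash(persistenceId) % (uint)numberOfTags);
    }
}
```

With `var tags = new[] { "tag-1", "tag-2", "tag-3" }`, events from a given entity would be tagged with `tags[EventTagger.SliceFor(persistenceId, tags.Length)]`, and the worker started with id `i` would consume `tags[i]`.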

For cases where a single actor needs to be kept alive, see [Cluster Singleton](cluster-singleton.md).

## Basic example

To set up a set of actors running with Sharded Daemon Process, each node in the cluster needs to run the same initialization
when starting up:

[!code-csharp[ShardedDaemonProcessSpec.cs](../../../src/contrib/cluster/Akka.Cluster.Sharding.Tests/ShardedDaemonProcessSpec.cs?name=tag-processing)]

## Scalability

This cluster tool is intended for small numbers of consumers and will not scale well to a large set. In large clusters
it is recommended to limit the nodes the sharded daemon process will run on using a role.
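
Restricting the process to a role might look like the sketch below. It mirrors the calls used in this PR's own test (`ShardedDaemonProcessSettings.Create`, `WithShardingSettings`, `ClusterShardingSettings.WithRole`, and the four-argument `Init`); since the module is marked as may change, the signatures could differ in other versions. The `Worker` actor, the `WorkerBootstrap` class, the process name `"Workers"`, the role name, and the instance count are illustrative assumptions.

```csharp
using Akka.Actor;
using Akka.Cluster.Sharding;

// Hypothetical worker actor used only for this sketch.
public sealed class Worker : ReceiveActor
{
    public Worker(int id)
    {
        Id = id;
    }

    public int Id { get; }

    public static Props CreateProps(int id) =>
        Akka.Actor.Props.Create(() => new Worker(id));
}

public static class WorkerBootstrap
{
    // Illustrative values, not defaults of the library.
    public const string WorkerRole = "workers";
    public const int WorkerCount = 4;

    // Run this on every node at startup. Per the PR's test, actors are not
    // started on a node whose roles do not include the configured role.
    public static void Start(ActorSystem system)
    {
        var settings = ShardedDaemonProcessSettings.Create(system)
            .WithShardingSettings(
                ClusterShardingSettings.Create(system).WithRole(WorkerRole));

        ShardedDaemonProcess.Get(system)
            .Init("Workers", WorkerCount, id => Worker.CreateProps(id), settings);
    }
}
```

Nodes that should host the workers need the role in their configuration, for example `akka.cluster.roles = ["workers"]`.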
2 changes: 2 additions & 0 deletions docs/articles/toc.yml
@@ -172,6 +172,8 @@
href: clustering/cluster-client.md
- name: Cluster Sharding
href: clustering/cluster-sharding.md
- name: Sharded Daemon Process
href: clustering/cluster-sharded-daemon-process.md
- name: Cluster Metrics
href: clustering/cluster-metrics.md
- name: Distributed Data
@@ -0,0 +1,174 @@
//-----------------------------------------------------------------------
// <copyright file="ClusterShardingSpec.cs" company="Akka.NET Project">
// Copyright (C) 2009-2020 Lightbend Inc. <http://www.lightbend.com>
// Copyright (C) 2013-2020 .NET Foundation <https://github.com/akkadotnet/akka.net>
// </copyright>
//-----------------------------------------------------------------------

using System;
using System.Linq;
using Akka.Actor;
using Akka.Cluster.TestKit;
using Akka.Cluster.Tools.Singleton;
using Akka.Configuration;
using Akka.Remote.TestKit;
using FluentAssertions;

namespace Akka.Cluster.Sharding.Tests.MultiNode
{
public class ShardedDaemonProcessSpecConfig : MultiNodeConfig
{
public RoleName First { get; }
public RoleName Second { get; }
public RoleName Third { get; }

public ShardedDaemonProcessSpecConfig()
{
First = Role("first");
Second = Role("second");
Third = Role("third");

CommonConfig = DebugConfig(false)
.WithFallback(ConfigurationFactory.ParseString(@"
akka.loglevel = INFO
akka.cluster.sharded-daemon-process {
sharding {
# First is likely to be ignored as shard coordinator not ready
retry-interval = 0.2s
}
# quick ping to make test swift
keep-alive-interval = 1s
}
"))
.WithFallback(ClusterSharding.DefaultConfig())
.WithFallback(ClusterSingletonManager.DefaultConfig())
.WithFallback(MultiNodeClusterSpec.ClusterConfig());
}
}

public class ShardedDaemonProcessMultiNode : ShardedDaemonProcessSpec
{
public ShardedDaemonProcessMultiNode() : this(new ShardedDaemonProcessSpecConfig()) { }
protected ShardedDaemonProcessMultiNode(ShardedDaemonProcessSpecConfig config) : base(config, typeof(ShardedDaemonProcessMultiNode)) { }
}

public abstract class ShardedDaemonProcessSpec : MultiNodeClusterSpec
{
private readonly ShardedDaemonProcessSpecConfig _config;

protected ShardedDaemonProcessSpec(ShardedDaemonProcessSpecConfig config, Type type)
: base(config, type)
{
_config = config;
}

[MultiNodeFact]
public void ShardedDaemonProcess_Specs()
{
ShardedDaemonProcess_Should_Init_Actor_Set();
}

public void ShardedDaemonProcess_Should_Init_Actor_Set()
{
// HACK
RunOn(() => FormCluster(_config.First, _config.Second, _config.Third), _config.First);
Member Author: Test does not pass unless I put FormCluster() inside a RunOn(). I'm missing something.

Member: I'll take a look during review.

Member: Not sure, off the top of my head, what the issue might be here - what type of error do you get?

Member Author: I only get one ProcessActorEvent instead of the 4 expected (one per ProcessActor), so the test fails with timeout.

var probe = CreateTestProbe();
ShardedDaemonProcess.Get(Sys).Init("the-fearless", 4, id => ProcessActor.Props(id, probe.Ref));
EnterBarrier("actor-set-initialized");

RunOn(() =>
{
var startedIds = Enumerable.Range(0, 4).Select(_ =>
{
var evt = probe.ExpectMsg<ProcessActorEvent>(TimeSpan.FromSeconds(5));
evt.Event.Should().Be("Started");
return evt.Id;
}).ToList();
startedIds.Count.Should().Be(4);
}, _config.First);
EnterBarrier("actor-set-started");
}

private void FormCluster(RoleName first, params RoleName[] rest)
{
RunOn(() =>
{
Cluster.Join(GetAddress(first));
AwaitAssert(() =>
{
Cluster.State.Members.Select(i => i.UniqueAddress).Should().Contain(Cluster.SelfUniqueAddress);
Cluster.State.Members.Select(i => i.Status).Should().OnlyContain(i => i == MemberStatus.Up);
});
}, first);
EnterBarrier(first.Name + "-joined");

foreach (var node in rest)
{
RunOn(() =>
{
Cluster.Join(GetAddress(first));
AwaitAssert(() =>
{
Cluster.State.Members.Select(i => i.UniqueAddress).Should().Contain(Cluster.SelfUniqueAddress);
Cluster.State.Members.Select(i => i.Status).Should().OnlyContain(i => i == MemberStatus.Up);
});
}, node);
}
EnterBarrier("all-joined");
}
}

internal class ProcessActor : UntypedActor
{
#region Protocol

[Serializable]
public sealed class Stop
{
public static readonly Stop Instance = new Stop();
private Stop() { }
}

#endregion

public static Props Props(int id, IActorRef probe) =>
Actor.Props.Create(() => new ProcessActor(id, probe));

public ProcessActor(int id, IActorRef probe)
{
Probe = probe;
Id = id;
}

public IActorRef Probe { get; }
public int Id { get; }

protected override void PreStart()
{
base.PreStart();
Probe.Tell(new ProcessActorEvent(Id, "Started"));
}

protected override void OnReceive(object message)
{
if (message is Stop)
{
Probe.Tell(new ProcessActorEvent(Id, "Stopped"));
Context.Stop(Self);
}
}
}

internal sealed class ProcessActorEvent
{
public ProcessActorEvent(int id, object @event)
{
Id = id;
Event = @event;
}

public int Id { get; }
public object Event { get; }
}
}
@@ -0,0 +1,158 @@
//-----------------------------------------------------------------------
// <copyright file="ShardedDaemonProcessSpec.cs" company="Akka.NET Project">
// Copyright (C) 2009-2020 Lightbend Inc. <http://www.lightbend.com>
// Copyright (C) 2013-2020 .NET Foundation <https://github.com/akkadotnet/akka.net>
// </copyright>
//-----------------------------------------------------------------------

using System;
using System.Linq;
using Akka.Actor;
using Akka.Cluster.Tools.Singleton;
using Akka.Configuration;
using Akka.TestKit;
using Xunit;

namespace Akka.Cluster.Sharding.Tests
{
public class ShardedDaemonProcessSpec : AkkaSpec
{
private sealed class Stop
{
public static Stop Instance { get; } = new Stop();
private Stop() { }
}

private sealed class Started
{
public int Id { get; }
public IActorRef SelfRef { get; }

public Started(int id, IActorRef selfRef)
{
Id = id;
SelfRef = selfRef;
}
}

private class MyActor : UntypedActor
{
public int Id { get; }
public IActorRef Probe { get; }

public static Props Props(int id, IActorRef probe) =>
Actor.Props.Create(() => new MyActor(id, probe));

public MyActor(int id, IActorRef probe)
{
Id = id;
Probe = probe;
}

protected override void PreStart()
{
base.PreStart();
Probe.Tell(new Started(Id, Context.Self));
}

protected override void OnReceive(object message)
{
if (message is Stop _)
Context.Stop(Self);
}
}

private static Config GetConfig()
{
return ConfigurationFactory.ParseString(@"
akka.actor.provider = cluster
akka.remote.dot-netty.tcp.port = 0
akka.remote.dot-netty.tcp.hostname = 127.0.0.1

# ping often/start fast for test
akka.cluster.sharded-daemon-process.keep-alive-interval = 1s

akka.coordinated-shutdown.terminate-actor-system = off
akka.coordinated-shutdown.run-by-actor-system-terminate = off")
.WithFallback(ClusterSharding.DefaultConfig())
.WithFallback(ClusterSingletonProxy.DefaultConfig());
}

public ShardedDaemonProcessSpec()
: base(GetConfig())
{ }

[Fact]
public void ShardedDaemonProcess_must_have_a_single_node_cluster_running_first()
{
var probe = CreateTestProbe();
Cluster.Get(Sys).Join(Cluster.Get(Sys).SelfAddress);
probe.AwaitAssert(() => Cluster.Get(Sys).SelfMember.Status.ShouldBe(MemberStatus.Up), TimeSpan.FromSeconds(3));
}

[Fact]
public void ShardedDaemonProcess_must_start_N_actors_with_unique_ids()
{
Cluster.Get(Sys).Join(Cluster.Get(Sys).SelfAddress);

var probe = CreateTestProbe();
ShardedDaemonProcess.Get(Sys).Init("a", 5, id => MyActor.Props(id, probe.Ref));

var started = probe.ReceiveN(5);
started.Count.ShouldBe(5);
}

[Fact]
public void ShardedDaemonProcess_must_restart_actors_if_they_stop()
{
Cluster.Get(Sys).Join(Cluster.Get(Sys).SelfAddress);

var probe = CreateTestProbe();
ShardedDaemonProcess.Get(Sys).Init("stop", 2, id => MyActor.Props(id, probe.Ref));

foreach (var started in Enumerable.Range(0, 2).Select(_ => probe.ExpectMsg<Started>()))
started.SelfRef.Tell(Stop.Instance);

// periodic ping every 1s makes it restart; ToList() forces the lazy Select so the expectations actually run
Enumerable.Range(0, 2).Select(_ => probe.ExpectMsg<Started>(TimeSpan.FromSeconds(3))).ToList();
}

[Fact]
public void ShardedDaemonProcess_must_not_run_if_the_role_does_not_match_node_role()
{
Cluster.Get(Sys).Join(Cluster.Get(Sys).SelfAddress);

var probe = CreateTestProbe();
var settings = ShardedDaemonProcessSettings.Create(Sys).WithShardingSettings(ClusterShardingSettings.Create(Sys).WithRole("workers"));
ShardedDaemonProcess.Get(Sys).Init("roles", 3, id => MyActor.Props(id, probe.Ref), settings);

probe.ExpectNoMsg();
}

// only used in documentation
private class TagProcessor : ReceiveActor
{
public string Tag { get; }

public static Props Props(string tag) =>
Actor.Props.Create(() => new TagProcessor(tag));

public TagProcessor(string tag) => Tag = tag;

protected override void PreStart()
{
// start the processing ...
base.PreStart();
Context.System.Log.Debug("Starting processor for tag {0}", Tag);
}
}

private void DocExample()
{
#region tag-processing
var tags = new[] { "tag-1", "tag-2", "tag-3" };
ShardedDaemonProcess.Get(Sys).Init("TagProcessors", tags.Length, id => TagProcessor.Props(tags[id]));
#endregion
}
}
}
2 changes: 1 addition & 1 deletion src/contrib/cluster/Akka.Cluster.Sharding/ShardRegion.cs
@@ -117,7 +117,7 @@ public override int GetHashCode()
/// to start(it does not guarantee the entity successfully started)
/// </summary>
[Serializable]
- public sealed class StartEntityAck : IClusterShardingSerializable
+ public sealed class StartEntityAck : IClusterShardingSerializable, IDeadLetterSuppression
{
/// <summary>
/// An identifier of a newly started entity. Unique in scope of a given shard.