Skip to content

Commit

Permalink
Automatic passivation for sharding (#3718)
Browse files Browse the repository at this point in the history
  • Loading branch information
ismaelhamed authored and Aaronontheweb committed Feb 10, 2019
1 parent 30255b2 commit 010dfd6
Show file tree
Hide file tree
Showing 8 changed files with 331 additions and 68 deletions.
6 changes: 5 additions & 1 deletion docs/articles/clustering/cluster-sharding.md
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,11 @@ By default rebalancing process always happens from nodes with the highest number

## Passivation

To reduce memory consumption, you may decide to stop entities after some period of inactivity using `Context.SetReceiveTimeout(timeout)`. In order to make cluster sharding aware of stopping entities, **DON'T use `Context.Stop(Self)` on the entities**, as this may result in losing messages. Instead send a `Passivate` message message to current entity `Context.Parent` (which is shard itself in this case). This will inform shard to stop forwarding messages to target entity, and buffer them instead until it's terminated. Once that happens, if there are still some messages buffered, entity will be reincarnated and messages flushed to it automatically.
To reduce memory consumption, you may decide to stop entities after some period of inactivity using `Context.SetReceiveTimeout(timeout)`. In order to make cluster sharding aware of stopping entities, **DON'T use `Context.Stop(Self)` on the entities**, as this may result in losing messages. Instead send a `ShardRegion.Passivate` message to current entity `Context.Parent` (which is shard itself in this case). This will inform shard to stop forwarding messages to target entity, and buffer them instead until it's terminated. Once that happens, if there are still some messages buffered, entity will be reincarnated and messages flushed to it automatically.

### Automatic Passivation

The entities can be configured to be automatically passivated if they haven't received a message for a while using the `akka.cluster.sharding.passivate-idle-entity-after` setting, or by explicitly setting `ClusterShardingSettings.passivateIdleEntityAfter` to a suitable time to keep the actor alive. Note that only messages sent through sharding are counted, so direct messages to the `ActorRef` of the actor or messages that it sends to itself are not counted as activity. By default automatic passivation is disabled.

## Remembering entities

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
//-----------------------------------------------------------------------
// <copyright file="ProxyShardingSpec.cs" company="Akka.NET Project">
// Copyright (C) 2009-2018 Lightbend Inc. <http://www.lightbend.com>
// Copyright (C) 2013-2018 .NET Foundation <https://github.com/akkadotnet/akka.net>
// </copyright>
//-----------------------------------------------------------------------

using System;
using System.Globalization;
using System.Linq;
using System.Threading;
using Akka.Actor;
using Akka.Cluster.Tools.Singleton;
using Akka.Configuration;
using Akka.TestKit;
using Akka.TestKit.Xunit2;
using FluentAssertions;
using Xunit;

namespace Akka.Cluster.Sharding.Tests
{
public class InactiveEntityPassivationSpec : AkkaSpec
{
#region Protocol

internal class Passivate
{
public static readonly Passivate Instance = new Passivate();
private Passivate() { }
}

internal class Entity : UntypedActor
{
private readonly string _id = Context.Self.Path.Name;

public IActorRef Probe { get; }

public static Props Props(IActorRef probe) =>
Actor.Props.Create(() => new Entity(probe));

public Entity(IActorRef probe)
{
Probe = probe;
}

protected override void OnReceive(object message)
{
switch (message)
{
case Passivate _:
Probe.Tell($"{_id} passivating");
Context.Stop(Self);
break;
default:
Probe.Tell(new GotIt(_id, message, DateTime.Now.Ticks));
break;
}
}

public class GotIt
{
public string Id { get; }
public object Msg { get; }
public long When { get; }

public GotIt(string id, object msg, long when)
{
Id = id;
Msg = msg;
When = when;
}
}
}

#endregion

private readonly ExtractEntityId _extractEntityId = message =>
message is int msg ? Tuple.Create(msg.ToString(), message) : null;

private readonly ExtractShardId _extractShard = message =>
message is int msg ? (msg % 10).ToString(CultureInfo.InvariantCulture) : null;

public InactiveEntityPassivationSpec()
: base(GetConfig())
{ }

public static Config GetConfig()
{
return ConfigurationFactory.ParseString(@"
akka.loglevel = INFO
akka.actor.provider = cluster
akka.cluster.sharding.passivate-idle-entity-after = 3s")
.WithFallback(ClusterSharding.DefaultConfig())
.WithFallback(ClusterSingletonManager.DefaultConfig());
}

[Fact]
public void Passivation_of_inactive_entities_must_passivate_entities_when_they_have_not_seen_messages_for_the_configured_duration()
{
// Single node cluster
Cluster.Get(Sys).Join(Cluster.Get(Sys).SelfAddress);

var probe = new TestProbe(Sys, new XunitAssertions());
var settings = ClusterShardingSettings.Create(Sys);
var region = ClusterSharding.Get(Sys).Start(
"myType",
Entity.Props(probe.Ref),
settings,
_extractEntityId,
_extractShard,
new LeastShardAllocationStrategy(10, 3),
Passivate.Instance);

region.Tell(1);
region.Tell(2);

var responses = new[]
{
probe.ExpectMsg<Entity.GotIt>(),
probe.ExpectMsg<Entity.GotIt>()
};
responses.Select(r => r.Id).Should().BeEquivalentTo("1", "2");
var timeOneSawMessage = responses.Single(r => r.Id == "1").When;

Thread.Sleep(1000);
region.Tell(2);
probe.ExpectMsg<Entity.GotIt>().Id.ShouldBe("2");
Thread.Sleep(1000);
region.Tell(2);
probe.ExpectMsg<Entity.GotIt>().Id.ShouldBe("2");

// Make sure "1" hasn't seen a message in 3 seconds and passivates
var timeSinceOneSawAMessage = DateTime.Now.Ticks - timeOneSawMessage;
probe.ExpectNoMsg(TimeSpan.FromSeconds(3) - TimeSpan.FromTicks(timeSinceOneSawAMessage));
probe.ExpectMsg("1 passivating");

// But it can be re-activated just fine
region.Tell(1);
region.Tell(2);

responses = new[]
{
probe.ExpectMsg<Entity.GotIt>(),
probe.ExpectMsg<Entity.GotIt>()
};
responses.Select(r => r.Id).Should().BeEquivalentTo("1", "2");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,6 @@ public enum StateStoreMode
[Serializable]
public sealed class ClusterShardingSettings : INoSerializationVerificationNeeded
{

/// <summary>
/// Specifies that this entity type requires cluster nodes with a specific role.
/// If the role is not specified all nodes in the cluster are used.
Expand All @@ -184,10 +183,18 @@ public sealed class ClusterShardingSettings : INoSerializationVerificationNeeded
/// </summary>
public readonly string SnapshotPluginId;

/// <summary>
/// Passivate entities that have not received any message in this interval.
/// Note that only messages sent through sharding are counted, so direct messages
/// to the <see cref="IActorRef"/> of the actor or messages that it sends to itself are not counted as activity.
/// Use 0 to disable automatic passivation.
/// </summary>
public readonly TimeSpan PassivateIdleEntityAfter;

public readonly StateStoreMode StateStoreMode;

/// <summary>
/// TBD
/// Additional tuning parameters, see descriptions in reference.conf
/// </summary>
public readonly TunningParameters TunningParameters;

Expand Down Expand Up @@ -240,11 +247,16 @@ public static ClusterShardingSettings Create(Config config, Config singletonConf
var role = config.GetString("role");
if (role == string.Empty) role = null;

var passivateIdleAfter = config.GetString("passivate-idle-entity-after").ToLower() == "off"
? TimeSpan.Zero
: config.GetTimeSpan("passivate-idle-entity-after");

return new ClusterShardingSettings(
role: role,
rememberEntities: config.GetBoolean("remember-entities"),
journalPluginId: config.GetString("journal-plugin-id"),
snapshotPluginId: config.GetString("snapshot-plugin-id"),
passivateIdleEntityAfter: passivateIdleAfter,
stateStoreMode: (StateStoreMode)Enum.Parse(typeof(StateStoreMode), config.GetString("state-store-mode"), ignoreCase: true),
tunningParameters: tuningParameters,
coordinatorSingletonSettings: coordinatorSingletonSettings);
Expand All @@ -257,6 +269,7 @@ public static ClusterShardingSettings Create(Config config, Config singletonConf
/// <param name="rememberEntities">TBD</param>
/// <param name="journalPluginId">TBD</param>
/// <param name="snapshotPluginId">TBD</param>
/// <param name="passivateIdleEntityAfter">TBD</param>
/// <param name="stateStoreMode">TBD</param>
/// <param name="tunningParameters">TBD</param>
/// <param name="coordinatorSingletonSettings">TBD</param>
Expand All @@ -265,6 +278,7 @@ public ClusterShardingSettings(
bool rememberEntities,
string journalPluginId,
string snapshotPluginId,
TimeSpan passivateIdleEntityAfter,
StateStoreMode stateStoreMode,
TunningParameters tunningParameters,
ClusterSingletonManagerSettings coordinatorSingletonSettings)
Expand All @@ -273,6 +287,7 @@ public ClusterShardingSettings(
RememberEntities = rememberEntities;
JournalPluginId = journalPluginId;
SnapshotPluginId = snapshotPluginId;
PassivateIdleEntityAfter = passivateIdleEntityAfter;
StateStoreMode = stateStoreMode;
TunningParameters = tunningParameters;
CoordinatorSingletonSettings = coordinatorSingletonSettings;
Expand Down Expand Up @@ -300,6 +315,7 @@ public ClusterShardingSettings WithRole(string role)
rememberEntities: RememberEntities,
journalPluginId: JournalPluginId,
snapshotPluginId: SnapshotPluginId,
passivateIdleEntityAfter: PassivateIdleEntityAfter,
stateStoreMode: StateStoreMode,
tunningParameters: TunningParameters,
coordinatorSingletonSettings: CoordinatorSingletonSettings);
Expand Down Expand Up @@ -356,6 +372,11 @@ public ClusterShardingSettings WithTuningParameters(TunningParameters tunningPar
return Copy(tunningParameters: tunningParameters);
}

public ClusterShardingSettings WithPassivateIdleAfter(TimeSpan duration)
{
return Copy(passivateIdleAfter: duration);
}

/// <summary>
/// TBD
/// </summary>
Expand All @@ -377,6 +398,7 @@ private ClusterShardingSettings Copy(
bool? rememberEntities = null,
string journalPluginId = null,
string snapshotPluginId = null,
TimeSpan? passivateIdleAfter = null,
StateStoreMode? stateStoreMode = null,
TunningParameters tunningParameters = null,
ClusterSingletonManagerSettings coordinatorSingletonSettings = null)
Expand All @@ -386,6 +408,7 @@ private ClusterShardingSettings Copy(
rememberEntities: rememberEntities ?? RememberEntities,
journalPluginId: journalPluginId ?? JournalPluginId,
snapshotPluginId: snapshotPluginId ?? SnapshotPluginId,
passivateIdleEntityAfter: passivateIdleAfter ?? PassivateIdleEntityAfter,
stateStoreMode: stateStoreMode ?? StateStoreMode,
tunningParameters: tunningParameters ?? TunningParameters,
coordinatorSingletonSettings: coordinatorSingletonSettings ?? CoordinatorSingletonSettings);
Expand Down
8 changes: 8 additions & 0 deletions src/contrib/cluster/Akka.Cluster.Sharding/DDataShard.cs
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,10 @@ internal sealed class DDataShard : ActorBase, IShard, IWithUnboundedStash
public Shard.ShardState State { get; set; } = Shard.ShardState.Empty;
public ImmutableDictionary<string, IActorRef> RefById { get; set; } = ImmutableDictionary<string, IActorRef>.Empty;
public ImmutableDictionary<IActorRef, string> IdByRef { get; set; } = ImmutableDictionary<IActorRef, string>.Empty;
public ImmutableDictionary<string, long> LastMessageTimestamp { get; set; }
public ImmutableHashSet<IActorRef> Passivating { get; set; } = ImmutableHashSet<IActorRef>.Empty;
public ImmutableDictionary<string, ImmutableList<Tuple<object, IActorRef>>> MessageBuffers { get; set; } = ImmutableDictionary<string, ImmutableList<Tuple<object, IActorRef>>>.Empty;
public ICancelable PassivateIdleTask { get; }

private EntityRecoveryStrategy RememberedEntitiesRecoveryStrategy { get; }
public Cluster Cluster { get; } = Cluster.Get(Context.System);
Expand Down Expand Up @@ -97,6 +99,11 @@ public DDataShard(
Settings.TunningParameters.EntityRecoveryConstantRateStrategyNumberOfEntities)
: EntityRecoveryStrategy.AllStrategy;

var idleInterval = TimeSpan.FromTicks(Settings.PassivateIdleEntityAfter.Ticks / 2);
PassivateIdleTask = Settings.PassivateIdleEntityAfter > TimeSpan.Zero
? Context.System.Scheduler.ScheduleTellRepeatedlyCancelable(idleInterval, idleInterval, Self, Shard.PassivateIdleTick.Instance, Self)
: null;

_readConsistency = new ReadMajority(settings.TunningParameters.WaitingForStateTimeout, majorityCap);
_writeConsistency = new WriteMajority(settings.TunningParameters.UpdatingStateTimeout, majorityCap);
_stateKeys = Enumerable.Range(0, NrOfKeys).Select(i => new ORSetKey<EntryId>($"shard-{typeName}-{shardId}-{i}")).ToImmutableArray();
Expand All @@ -106,6 +113,7 @@ public DDataShard(

public void EntityTerminated(IActorRef tref) => this.BaseEntityTerminated(tref);
public void DeliverTo(string id, object message, object payload, IActorRef sender) => this.BaseDeliverTo(id, message, payload, sender);


protected override bool Receive(object message) => WaitingForState(ImmutableHashSet<int>.Empty)(message);

Expand Down
48 changes: 29 additions & 19 deletions src/contrib/cluster/Akka.Cluster.Sharding/PersistentShard.cs
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,10 @@ internal sealed class PersistentShard : PersistentActor, IShard
public Shard.ShardState State { get; set; } = Shard.ShardState.Empty;
public ImmutableDictionary<string, IActorRef> RefById { get; set; } = ImmutableDictionary<string, IActorRef>.Empty;
public ImmutableDictionary<IActorRef, string> IdByRef { get; set; } = ImmutableDictionary<IActorRef, string>.Empty;
public ImmutableDictionary<string, long> LastMessageTimestamp { get; set; } = ImmutableDictionary<string, long>.Empty;
public ImmutableHashSet<IActorRef> Passivating { get; set; } = ImmutableHashSet<IActorRef>.Empty;
public ImmutableDictionary<string, ImmutableList<Tuple<object, IActorRef>>> MessageBuffers { get; set; } = ImmutableDictionary<string, ImmutableList<Tuple<object, IActorRef>>>.Empty;
public ICancelable PassivateIdleTask { get; }

private EntityRecoveryStrategy RememberedEntitiesRecoveryStrategy { get; }

Expand Down Expand Up @@ -73,6 +75,11 @@ public PersistentShard(
Settings.TunningParameters.EntityRecoveryConstantRateStrategyFrequency,
Settings.TunningParameters.EntityRecoveryConstantRateStrategyNumberOfEntities)
: EntityRecoveryStrategy.AllStrategy;

var idleInterval = TimeSpan.FromTicks(Settings.PassivateIdleEntityAfter.Ticks / 2);
PassivateIdleTask = Settings.PassivateIdleEntityAfter > TimeSpan.Zero
? Context.System.Scheduler.ScheduleTellRepeatedlyCancelable(idleInterval, idleInterval, Self, Shard.PassivateIdleTick.Instance, Self)
: null;
}

public override string PersistenceId { get; }
Expand Down Expand Up @@ -166,31 +173,34 @@ public void ProcessChange<T>(T evt, Action<T> handler) where T : Shard.StateChan

public void EntityTerminated(IActorRef tref)
{
if (IdByRef.TryGetValue(tref, out var id))
if (!IdByRef.TryGetValue(tref, out var id)) return;
IdByRef = IdByRef.Remove(tref);
RefById = RefById.Remove(id);

if (PassivateIdleTask != null)
{
IdByRef = IdByRef.Remove(tref);
RefById = RefById.Remove(id);
LastMessageTimestamp = LastMessageTimestamp.Remove(id);
}

if (MessageBuffers.TryGetValue(id, out var buffer) && buffer.Count != 0)
if (MessageBuffers.TryGetValue(id, out var buffer) && buffer.Count != 0)
{
//Note; because we're not persisting the EntityStopped, we don't need
// to persist the EntityStarted either.
Log.Debug("Starting entity [{0}] again, there are buffered messages for it", id);
this.SendMessageBuffer(new Shard.EntityStarted(id));
}
else
{
if (!Passivating.Contains(tref))
{
//Note; because we're not persisting the EntityStopped, we don't need
// to persist the EntityStarted either.
Log.Debug("Starting entity [{0}] again, there are buffered messages for it", id);
this.SendMessageBuffer(new Shard.EntityStarted(id));
Log.Debug("Entity [{0}] stopped without passivating, will restart after backoff", id);
Context.System.Scheduler.ScheduleTellOnce(Settings.TunningParameters.EntityRestartBackoff, Self, new Shard.RestartEntity(id), ActorRefs.NoSender);
}
else
{
if (!Passivating.Contains(tref))
{
Log.Debug("Entity [{0}] stopped without passivating, will restart after backoff", id);
Context.System.Scheduler.ScheduleTellOnce(Settings.TunningParameters.EntityRestartBackoff, Self, new Shard.RestartEntity(id), ActorRefs.NoSender);
}
else
ProcessChange(new Shard.EntityStopped(id), this.PassivateCompleted);
}

Passivating = Passivating.Remove(tref);
ProcessChange(new Shard.EntityStopped(id), this.PassivateCompleted);
}

Passivating = Passivating.Remove(tref);
}

public void DeliverTo(string id, object message, object payload, IActorRef sender)
Expand Down
Loading

0 comments on commit 010dfd6

Please sign in to comment.