Skip to content

Commit

Permalink
porting Cluster heartbeat timings, hardened Akka.Cluster serialization
Browse files Browse the repository at this point in the history
  • Loading branch information
Arkatufus authored and Aaronontheweb committed Apr 13, 2021
1 parent ef383bc commit dd0f6b2
Show file tree
Hide file tree
Showing 10 changed files with 862 additions and 151 deletions.
35 changes: 35 additions & 0 deletions src/core/Akka.Cluster.Tests/ClusterHeartbeatReceiverSpec.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
//-----------------------------------------------------------------------
// <copyright file="ClusterHeartbeatReceiverSpec.cs" company="Akka.NET Project">
// Copyright (C) 2009-2021 Lightbend Inc. <http://www.lightbend.com>
// Copyright (C) 2013-2021 .NET Foundation <https://github.com/akkadotnet/akka.net>
// </copyright>
//-----------------------------------------------------------------------

using Akka.Actor;
using Akka.Configuration;
using Akka.TestKit;
using Xunit;
using Xunit.Abstractions;
using static Akka.Cluster.ClusterHeartbeatSender;

namespace Akka.Cluster.Tests
{
public class ClusterHeartbeatReceiverSpec : AkkaSpec
{
public static Config Config = @"akka.actor.provider = cluster";

public ClusterHeartbeatReceiverSpec(ITestOutputHelper output)
: base(Config, output)
{

}

[Fact]
public void ClusterHeartbeatReceiver_should_respond_to_heartbeats_with_same_SeqNo_and_SendTime()
{
var heartbeater = Sys.ActorOf(ClusterHeartbeatReceiver.Props(() => Cluster.Get(Sys)));
heartbeater.Tell(new Heartbeat(Cluster.Get(Sys).SelfAddress, 1, 2));
ExpectMsg<HeartbeatRsp>(new HeartbeatRsp(Cluster.Get(Sys).SelfUniqueAddress, 1, 2));
}
}
}
66 changes: 66 additions & 0 deletions src/core/Akka.Cluster.Tests/ClusterHeartbeatSenderSpec.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
//-----------------------------------------------------------------------
// <copyright file="ClusterHeartbeatSenderSpec.cs" company="Akka.NET Project">
// Copyright (C) 2009-2021 Lightbend Inc. <http://www.lightbend.com>
// Copyright (C) 2013-2021 .NET Foundation <https://github.com/akkadotnet/akka.net>
// </copyright>
//-----------------------------------------------------------------------

using System.Collections.Immutable;
using Akka.Actor;
using Akka.Configuration;
using Akka.TestKit;
using Akka.Util;
using FluentAssertions;
using Xunit;
using Xunit.Abstractions;
using static Akka.Cluster.ClusterHeartbeatSender;

namespace Akka.Cluster.Tests
{
public class ClusterHeartbeatSenderSpec : AkkaSpec
{
class TestClusterHeartbeatSender : ClusterHeartbeatSender
{
private readonly TestProbe _probe;

public TestClusterHeartbeatSender(TestProbe probe)
{
_probe = probe;
}

protected override void PreStart()
{
// don't register for cluster events
}

protected override ActorSelection HeartbeatReceiver(Address address)
{
return Context.ActorSelection(_probe.Ref.Path);
}
}

public static readonly Config Config = @"
akka.loglevel = DEBUG
akka.actor.provider = cluster
akka.cluster.failure-detector.heartbeat-interval = 0.2s
";

public ClusterHeartbeatSenderSpec(ITestOutputHelper output)
: base(Config, output){ }

[Fact]
public void ClusterHeartBeatSender_must_increment_heartbeat_SeqNo()
{
var probe = CreateTestProbe();
var underTest = Sys.ActorOf(Props.Create(() => new TestClusterHeartbeatSender(probe)));

underTest.Tell(new ClusterEvent.CurrentClusterState());
underTest.Tell(new ClusterEvent.MemberUp(new Member(
new UniqueAddress(new Address("akka", Sys.Name), 1), 1,
MemberStatus.Up, ImmutableHashSet<string>.Empty, AppVersion.Zero)));

probe.ExpectMsg<Heartbeat>().SequenceNr.Should().Be(1L);
probe.ExpectMsg<Heartbeat>().SequenceNr.Should().Be(2L);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,14 @@
using System.Collections.Immutable;
using Akka.Actor;
using Akka.Cluster.Routing;
using Akka.Cluster.Serialization;
using Akka.Routing;
using Akka.Serialization;
using Akka.TestKit;
using Xunit;
using FluentAssertions;
using Akka.Util;
using Google.Protobuf;

namespace Akka.Cluster.Tests.Serialization
{
Expand All @@ -33,19 +36,47 @@ public ClusterMessageSerializerSpec()
public void Can_serialize_Heartbeat()
{
var address = new Address("akka.tcp", "system", "some.host.org", 4711);
var message = new ClusterHeartbeatSender.Heartbeat(address);
var message = new ClusterHeartbeatSender.Heartbeat(address, -1, -1);
AssertEqual(message);
}

[Fact]
public void Can_serialize_Hearbeatv1419_later()
{
var hb = new Akka.Cluster.Serialization.Proto.Msg.Heartbeat()
{
From = Akka.Cluster.Serialization.ClusterMessageSerializer.AddressToProto(a1.Address),
CreationTime = 2,
SequenceNr = 1
}.ToByteArray();

var serializer = (SerializerWithStringManifest)Sys.Serialization.FindSerializerForType(typeof(ClusterHeartbeatSender.Heartbeat));
serializer.FromBinary(hb, Akka.Cluster.Serialization.ClusterMessageSerializer.HeartBeatManifest);
}

[Fact]
public void Can_serialize_HeartbeatRsp()
{
var address = new Address("akka.tcp", "system", "some.host.org", 4711);
var uniqueAddress = new UniqueAddress(address, 17);
var message = new ClusterHeartbeatSender.HeartbeatRsp(uniqueAddress);
var message = new ClusterHeartbeatSender.HeartbeatRsp(uniqueAddress, -1, -1);
AssertEqual(message);
}

[Fact]
public void Can_serialize_HearbeatRspv1419_later()
{
var hb = new Akka.Cluster.Serialization.Proto.Msg.HeartBeatResponse()
{
From = Akka.Cluster.Serialization.ClusterMessageSerializer.UniqueAddressToProto(a1.UniqueAddress),
CreationTime = 2,
SequenceNr = 1
}.ToByteArray();

var serializer = (SerializerWithStringManifest)Sys.Serialization.FindSerializerForType(typeof(ClusterHeartbeatSender.Heartbeat));
serializer.FromBinary(hb, Akka.Cluster.Serialization.ClusterMessageSerializer.HeartBeatRspManifest);
}

[Fact]
public void Can_serialize_GossipEnvelope()
{
Expand Down Expand Up @@ -191,6 +222,7 @@ private T AssertAndReturn<T>(T message)
{
var serializer = Sys.Serialization.FindSerializerFor(message);
var serialized = serializer.ToBinary(message);
serializer.Should().BeOfType<ClusterMessageSerializer>();
return serializer.FromBinary<T>(serialized);
}

Expand Down
26 changes: 22 additions & 4 deletions src/core/Akka.Cluster/Cluster.cs
Original file line number Diff line number Diff line change
Expand Up @@ -538,10 +538,7 @@ internal void Shutdown()
LogInfo("Shutting down...");
System.Stop(_clusterDaemons);

if (_readView != null)
{
_readView.Dispose();
}
_readView?.Dispose();

LogInfo("Successfully shut down");
}
Expand Down Expand Up @@ -583,6 +580,27 @@ public InfoLogger(ILoggingAdapter log, ClusterSettings settings, Address selfAdd
_selfAddress = selfAddress;
}

/// <summary>
/// Creates an <see cref="Akka.Event.LogLevel.DebugLevel"/> log entry with the specific message.
/// </summary>
/// <param name="message">The message being logged.</param>
internal void LogDebug(string message)
{
if (_log.IsDebugEnabled)
_log.Debug("Cluster Node [{0}] - {1}", _selfAddress, message);
}

/// <summary>
/// Creates an <see cref="Akka.Event.LogLevel.DebugLevel"/> log entry with the specific template and arguments.
/// </summary>
/// <param name="template">The template being rendered and logged.</param>
/// <param name="arg1">The argument that fills in the template placeholder.</param>
internal void LogDebug(string template, object arg1)
{
if (_log.IsDebugEnabled)
_log.Debug("Cluster Node [{1}] - " + template, arg1, _selfAddress);
}

/// <summary>
/// Creates an <see cref="Akka.Event.LogLevel.InfoLevel"/> log entry with the specific message.
/// </summary>
Expand Down
2 changes: 1 addition & 1 deletion src/core/Akka.Cluster/ClusterDaemon.cs
Original file line number Diff line number Diff line change
Expand Up @@ -898,7 +898,7 @@ private void CreateChildren()
{
_coreSupervisor = Context.ActorOf(Props.Create<ClusterCoreSupervisor>(), "core");

Context.ActorOf(Props.Create<ClusterHeartbeatReceiver>(), "heartbeatReceiver");
Context.ActorOf(ClusterHeartbeatReceiver.Props(() => Cluster.Get(Context.System)), "heartbeatReceiver");
}

protected override void PostStop()
Expand Down
Loading

0 comments on commit dd0f6b2

Please sign in to comment.