Skip to content

Commit

Permalink
porting Cluster heartbeat timings, hardened Akka.Cluster serialization (
Browse files Browse the repository at this point in the history
#4934)

* porting Cluster heartbeat timings, hardened Akka.Cluster serialization

port akka/akka#27281
port akka/akka#25183
port akka/akka#24625

* increased ClusterLogSpec join timespan

Increased the `TimeSpan` here to 10 seconds in order to prevent this spec from failing racily, since even an Akka.Cluster self-join can take more than the default 3 seconds due to some of the timings involved in node startup et al.
  • Loading branch information
Aaronontheweb authored Apr 13, 2021
1 parent fc5b043 commit 97628d4
Show file tree
Hide file tree
Showing 11 changed files with 855 additions and 156 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -437,11 +437,12 @@ namespace Akka.Cluster.SBR
}
namespace Akka.Cluster.Serialization
{
public class ClusterMessageSerializer : Akka.Serialization.Serializer
[Akka.Annotations.InternalApiAttribute()]
public class ClusterMessageSerializer : Akka.Serialization.SerializerWithStringManifest
{
public ClusterMessageSerializer(Akka.Actor.ExtendedActorSystem system) { }
public override bool IncludeManifest { get; }
public override object FromBinary(byte[] bytes, System.Type type) { }
public override object FromBinary(byte[] bytes, string manifest) { }
public override string Manifest(object o) { }
public override byte[] ToBinary(object obj) { }
}
}
35 changes: 35 additions & 0 deletions src/core/Akka.Cluster.Tests/ClusterHeartbeatReceiverSpec.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
//-----------------------------------------------------------------------
// <copyright file="ClusterHeartbeatReceiverSpec.cs" company="Akka.NET Project">
// Copyright (C) 2009-2021 Lightbend Inc. <http://www.lightbend.com>
// Copyright (C) 2013-2021 .NET Foundation <https://github.com/akkadotnet/akka.net>
// </copyright>
//-----------------------------------------------------------------------

using Akka.Actor;
using Akka.Configuration;
using Akka.TestKit;
using Xunit;
using Xunit.Abstractions;
using static Akka.Cluster.ClusterHeartbeatSender;

namespace Akka.Cluster.Tests
{
public class ClusterHeartbeatReceiverSpec : AkkaSpec
{
public static Config Config = @"akka.actor.provider = cluster";

public ClusterHeartbeatReceiverSpec(ITestOutputHelper output)
: base(Config, output)
{

}

[Fact]
public void ClusterHeartbeatReceiver_should_respond_to_heartbeats_with_same_SeqNo_and_SendTime()
{
var heartbeater = Sys.ActorOf(ClusterHeartbeatReceiver.Props(() => Cluster.Get(Sys)));
heartbeater.Tell(new Heartbeat(Cluster.Get(Sys).SelfAddress, 1, 2));
ExpectMsg<HeartbeatRsp>(new HeartbeatRsp(Cluster.Get(Sys).SelfUniqueAddress, 1, 2));
}
}
}
66 changes: 66 additions & 0 deletions src/core/Akka.Cluster.Tests/ClusterHeartbeatSenderSpec.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
//-----------------------------------------------------------------------
// <copyright file="ClusterHeartbeatSenderSpec.cs" company="Akka.NET Project">
// Copyright (C) 2009-2021 Lightbend Inc. <http://www.lightbend.com>
// Copyright (C) 2013-2021 .NET Foundation <https://github.com/akkadotnet/akka.net>
// </copyright>
//-----------------------------------------------------------------------

using System.Collections.Immutable;
using Akka.Actor;
using Akka.Configuration;
using Akka.TestKit;
using Akka.Util;
using FluentAssertions;
using Xunit;
using Xunit.Abstractions;
using static Akka.Cluster.ClusterHeartbeatSender;

namespace Akka.Cluster.Tests
{
public class ClusterHeartbeatSenderSpec : AkkaSpec
{
class TestClusterHeartbeatSender : ClusterHeartbeatSender
{
private readonly TestProbe _probe;

public TestClusterHeartbeatSender(TestProbe probe)
{
_probe = probe;
}

protected override void PreStart()
{
// don't register for cluster events
}

protected override ActorSelection HeartbeatReceiver(Address address)
{
return Context.ActorSelection(_probe.Ref.Path);
}
}

public static readonly Config Config = @"
akka.loglevel = DEBUG
akka.actor.provider = cluster
akka.cluster.failure-detector.heartbeat-interval = 0.2s
";

public ClusterHeartbeatSenderSpec(ITestOutputHelper output)
: base(Config, output){ }

[Fact]
public void ClusterHeartBeatSender_must_increment_heartbeat_SeqNo()
{
var probe = CreateTestProbe();
var underTest = Sys.ActorOf(Props.Create(() => new TestClusterHeartbeatSender(probe)));

underTest.Tell(new ClusterEvent.CurrentClusterState());
underTest.Tell(new ClusterEvent.MemberUp(new Member(
new UniqueAddress(new Address("akka", Sys.Name), 1), 1,
MemberStatus.Up, ImmutableHashSet<string>.Empty, AppVersion.Zero)));

probe.ExpectMsg<Heartbeat>().SequenceNr.Should().Be(1L);
probe.ExpectMsg<Heartbeat>().SequenceNr.Should().Be(2L);
}
}
}
10 changes: 7 additions & 3 deletions src/core/Akka.Cluster.Tests/ClusterLogSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
// </copyright>
//-----------------------------------------------------------------------

using System;
using System.Linq;
using Akka.Actor;
using Akka.Configuration;
Expand Down Expand Up @@ -56,9 +57,12 @@ protected void AwaitUp()
/// </summary>
protected void Join(string expected)
{
EventFilter
.Info(contains: expected)
.ExpectOne(() => _cluster.Join(_selfAddress));
Within(TimeSpan.FromSeconds(10), () =>
{
EventFilter
.Info(contains: expected)
.ExpectOne(() => _cluster.Join(_selfAddress));
});
}

/// <summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,14 @@
using System.Collections.Immutable;
using Akka.Actor;
using Akka.Cluster.Routing;
using Akka.Cluster.Serialization;
using Akka.Routing;
using Akka.Serialization;
using Akka.TestKit;
using Xunit;
using FluentAssertions;
using Akka.Util;
using Google.Protobuf;

namespace Akka.Cluster.Tests.Serialization
{
Expand All @@ -33,19 +36,47 @@ public ClusterMessageSerializerSpec()
public void Can_serialize_Heartbeat()
{
var address = new Address("akka.tcp", "system", "some.host.org", 4711);
var message = new ClusterHeartbeatSender.Heartbeat(address);
var message = new ClusterHeartbeatSender.Heartbeat(address, -1, -1);
AssertEqual(message);
}

[Fact]
public void Can_serialize_Hearbeatv1419_later()
{
var hb = new Akka.Cluster.Serialization.Proto.Msg.Heartbeat()
{
From = Akka.Cluster.Serialization.ClusterMessageSerializer.AddressToProto(a1.Address),
CreationTime = 2,
SequenceNr = 1
}.ToByteArray();

var serializer = (SerializerWithStringManifest)Sys.Serialization.FindSerializerForType(typeof(ClusterHeartbeatSender.Heartbeat));
serializer.FromBinary(hb, Akka.Cluster.Serialization.ClusterMessageSerializer.HeartBeatManifest);
}

[Fact]
public void Can_serialize_HeartbeatRsp()
{
var address = new Address("akka.tcp", "system", "some.host.org", 4711);
var uniqueAddress = new UniqueAddress(address, 17);
var message = new ClusterHeartbeatSender.HeartbeatRsp(uniqueAddress);
var message = new ClusterHeartbeatSender.HeartbeatRsp(uniqueAddress, -1, -1);
AssertEqual(message);
}

[Fact]
public void Can_serialize_HearbeatRspv1419_later()
{
var hb = new Akka.Cluster.Serialization.Proto.Msg.HeartBeatResponse()
{
From = Akka.Cluster.Serialization.ClusterMessageSerializer.UniqueAddressToProto(a1.UniqueAddress),
CreationTime = 2,
SequenceNr = 1
}.ToByteArray();

var serializer = (SerializerWithStringManifest)Sys.Serialization.FindSerializerForType(typeof(ClusterHeartbeatSender.Heartbeat));
serializer.FromBinary(hb, Akka.Cluster.Serialization.ClusterMessageSerializer.HeartBeatRspManifest);
}

[Fact]
public void Can_serialize_GossipEnvelope()
{
Expand Down Expand Up @@ -191,6 +222,7 @@ private T AssertAndReturn<T>(T message)
{
var serializer = Sys.Serialization.FindSerializerFor(message);
var serialized = serializer.ToBinary(message);
serializer.Should().BeOfType<ClusterMessageSerializer>();
return serializer.FromBinary<T>(serialized);
}

Expand Down
26 changes: 22 additions & 4 deletions src/core/Akka.Cluster/Cluster.cs
Original file line number Diff line number Diff line change
Expand Up @@ -538,10 +538,7 @@ internal void Shutdown()
LogInfo("Shutting down...");
System.Stop(_clusterDaemons);

if (_readView != null)
{
_readView.Dispose();
}
_readView?.Dispose();

LogInfo("Successfully shut down");
}
Expand Down Expand Up @@ -583,6 +580,27 @@ public InfoLogger(ILoggingAdapter log, ClusterSettings settings, Address selfAdd
_selfAddress = selfAddress;
}

/// <summary>
/// Creates an <see cref="Akka.Event.LogLevel.DebugLevel"/> log entry with the specific message.
/// </summary>
/// <param name="message">The message being logged.</param>
internal void LogDebug(string message)
{
if (_log.IsDebugEnabled)
_log.Debug("Cluster Node [{0}] - {1}", _selfAddress, message);
}

/// <summary>
/// Creates an <see cref="Akka.Event.LogLevel.DebugLevel"/> log entry with the specific template and arguments.
/// </summary>
/// <param name="template">The template being rendered and logged.</param>
/// <param name="arg1">The argument that fills in the template placeholder.</param>
internal void LogDebug(string template, object arg1)
{
if (_log.IsDebugEnabled)
_log.Debug("Cluster Node [{1}] - " + template, arg1, _selfAddress);
}

/// <summary>
/// Creates an <see cref="Akka.Event.LogLevel.InfoLevel"/> log entry with the specific message.
/// </summary>
Expand Down
2 changes: 1 addition & 1 deletion src/core/Akka.Cluster/ClusterDaemon.cs
Original file line number Diff line number Diff line change
Expand Up @@ -898,7 +898,7 @@ private void CreateChildren()
{
_coreSupervisor = Context.ActorOf(Props.Create<ClusterCoreSupervisor>(), "core");

Context.ActorOf(Props.Create<ClusterHeartbeatReceiver>(), "heartbeatReceiver");
Context.ActorOf(ClusterHeartbeatReceiver.Props(() => Cluster.Get(Context.System)), "heartbeatReceiver");
}

protected override void PostStop()
Expand Down
Loading

0 comments on commit 97628d4

Please sign in to comment.