Skip to content

Commit

Permalink
Fixed Akka.Remote.ResendUnfulfillableException: Unable to fulfill res…
Browse files Browse the repository at this point in the history
…end request since negatively acknowledged payload is no longer in buffer. (#3914)

close #3905 - Fixed Akka.Remote.ResendUnfulfillableException: Unable to fulfill resend request since negatively acknowledged payload is no longer in buffer.
  • Loading branch information
Aaronontheweb authored Sep 20, 2019
1 parent 9cbcbcf commit 65b4f55
Show file tree
Hide file tree
Showing 10 changed files with 347 additions and 169 deletions.
155 changes: 155 additions & 0 deletions src/core/Akka.Remote.Tests.MultiNode/TransportFailSpec.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
using System;
using System.Threading.Tasks;
using Akka.Actor;
using Akka.Configuration;
using Akka.Event;
using Akka.Remote.TestKit;
using Akka.Util;

namespace Akka.Remote.Tests.MultiNode
{
public class TransportFailSpecConfig : MultiNodeConfig
{
public RoleName First { get; }
public RoleName Second { get; }

public TransportFailSpecConfig()
{
First = Role("first");
Second = Role("second");

CommonConfig = DebugConfig(true).WithFallback(ConfigurationFactory.ParseString(@"
akka.loglevel = INFO
akka.remote{
transport-failure-detector {
implementation-class = """+ typeof(TestFailureDetector).AssemblyQualifiedName + @"""
heartbeat-interval = 1 s
}
retry-gate-closed-for = 3 s
# Don't trigger watch Terminated
watch-failure-detector.acceptable-heartbeat-pause = 60 s
#use-passive-connections = off
}
"));
}

internal static AtomicBoolean FdAvailable = new AtomicBoolean(true);

/// <summary>
/// Failure detector implementation that will fail when <see cref="FdAvailable"/> is false.
/// </summary>
public class TestFailureDetector : FailureDetector
{
public TestFailureDetector(Config config, EventStream eventStream)
{

}

private volatile bool _active = false;

public override bool IsAvailable => _active ? FdAvailable.Value : true;

public override bool IsMonitoring => _active;

public override void HeartBeat()
{
_active = true;
}
}

public class Subject : ReceiveActor
{
public Subject()
{
ReceiveAny(_ => Sender.Tell(_));
}
}
}

public class TransportFailSpec : MultiNodeSpec
{
private readonly TransportFailSpecConfig _config;

public TransportFailSpec() : this(new TransportFailSpecConfig()) { }

private TransportFailSpec(TransportFailSpecConfig config) : base(config, typeof(TransportFailSpecConfig))
{
_config = config;
}

protected override int InitialParticipantsValueFactory => 2;

private IActorRef Identify(RoleName role, string actorName)
{
var p = CreateTestProbe();
Sys.ActorSelection(Node(role) / "user" / actorName).Tell(new Identify(actorName), p.Ref);
return p.ExpectMsg<ActorIdentity>(RemainingOrDefault).Subject;
}

[MultiNodeFact]
public void TransportFail_should_reconnect()
{
RunOn(() =>
{
EnterBarrier("actors-started");
var subject = Identify(_config.Second, "subject");
Watch(subject);
subject.Tell("hello");
ExpectMsg("hello");
}, _config.First);

RunOn(() =>
{
Sys.ActorOf(Props.Create(() => new TransportFailSpecConfig.Subject()), "subject");
EnterBarrier("actors-started");
}, _config.Second);

EnterBarrier("watch-established");

// trigger transport failure detector
TransportFailSpecConfig.FdAvailable.GetAndSet(false);

// wait for ungated (also later awaitAssert retry)
Task.Delay(RARP.For(Sys).Provider.RemoteSettings.RetryGateClosedFor).Wait();
TransportFailSpecConfig.FdAvailable.GetAndSet(true);

RunOn(() =>
{
EnterBarrier("actors-started2");
var quarantineProbe = CreateTestProbe();
Sys.EventStream.Subscribe(quarantineProbe.Ref, typeof(QuarantinedEvent));
IActorRef subject2 = null;
AwaitAssert(() =>
{
// TODO: harden
Within(TimeSpan.FromSeconds(3), () =>
{
AwaitCondition(() =>
{
subject2 = Identify(_config.Second, "subject2");
return subject2 != null;
}, RemainingOrDefault, TimeSpan.FromSeconds(1));
});
}, TimeSpan.FromSeconds(5));
Watch(subject2);
quarantineProbe.ExpectNoMsg(TimeSpan.FromSeconds(1));
subject2.Tell("hello2");
ExpectMsg("hello2");
EnterBarrier("watch-established2");
ExpectTerminated(subject2);
}, _config.First);

RunOn(() =>
{
var subject2 = Sys.ActorOf(Props.Create(() => new TransportFailSpecConfig.Subject()), "subject2");
EnterBarrier("actors-started2");
EnterBarrier("watch-established2");
subject2.Tell(PoisonPill.Instance);
}, _config.Second);

EnterBarrier("done");
}
}
}
26 changes: 13 additions & 13 deletions src/core/Akka.Remote.Tests/AckedDeliverySpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ public void SendBuffer_must_remove_messages_from_buffer_when_cumulative_ack_rece
var b8 = b7.Acknowledge(new Ack(new SeqNo(2)));
Assert.True(b8.NonAcked.SequenceEqual(new[] { msg3, msg4 }));

var b9 = b8.Acknowledge(new Ack(new SeqNo(5)));
var b9 = b8.Acknowledge(new Ack(new SeqNo(4)));
Assert.True(b9.NonAcked.Count == 0);
}

Expand Down Expand Up @@ -199,7 +199,7 @@ public void SendBuffer_must_keep_NACKed_messages_in_buffer_if_selective_nacks_ar
Assert.True(b6.NonAcked.Count == 0);
Assert.True(b6.Nacked.SequenceEqual(new[] { msg2, msg3 }));

var b7 = b6.Acknowledge(new Ack(new SeqNo(5)));
var b7 = b6.Acknowledge(new Ack(new SeqNo(4)));
Assert.True(b7.NonAcked.Count == 0);
Assert.True(b7.Nacked.Count == 0);
}
Expand All @@ -226,36 +226,36 @@ public void ReceiveBuffer_must_enqueue_message_in_buffer_if_needed_return_the_li
var msg4 = Msg(4);
var msg5 = Msg(5);

var d1 = b0.Receive(msg1).ExtractDeliverable;
var d1 = b0.Receive(msg1).ExtractDeliverable();
Assert.True(d1.Deliverables.Count == 0);
Assert.Equal(new SeqNo(1), d1.Ack.CumulativeAck);
Assert.True(d1.Ack.Nacks.SequenceEqual(new[]{ new SeqNo(0) }));
var b1 = d1.Buffer;

var d2 = b1.Receive(msg0).ExtractDeliverable;
var d2 = b1.Receive(msg0).ExtractDeliverable();
Assert.True(d2.Deliverables.SequenceEqual(new[] { msg0, msg1 }));
Assert.Equal(new SeqNo(1), d2.Ack.CumulativeAck);
var b3 = d2.Buffer;

var d3 = b3.Receive(msg4).ExtractDeliverable;
var d3 = b3.Receive(msg4).ExtractDeliverable();
Assert.True(d3.Deliverables.Count == 0);
Assert.Equal(new SeqNo(4), d3.Ack.CumulativeAck);
Assert.True(d3.Ack.Nacks.SequenceEqual(new[] { new SeqNo(2), new SeqNo(3) }));
var b4 = d3.Buffer;

var d4 = b4.Receive(msg2).ExtractDeliverable;
var d4 = b4.Receive(msg2).ExtractDeliverable();
Assert.True(d4.Deliverables.SequenceEqual(new[] { msg2 }));
Assert.Equal(new SeqNo(4), d4.Ack.CumulativeAck);
Assert.True(d4.Ack.Nacks.SequenceEqual(new[] { new SeqNo(3) }));
var b5 = d4.Buffer;

var d5 = b5.Receive(msg5).ExtractDeliverable;
var d5 = b5.Receive(msg5).ExtractDeliverable();
Assert.True(d5.Deliverables.Count == 0);
Assert.Equal(new SeqNo(5), d5.Ack.CumulativeAck);
Assert.True(d5.Ack.Nacks.SequenceEqual(new[] { new SeqNo(3) }));
var b6 = d5.Buffer;

var d6 = b6.Receive(msg3).ExtractDeliverable;
var d6 = b6.Receive(msg3).ExtractDeliverable();
Assert.True(d6.Deliverables.SequenceEqual(new[] { msg3, msg4, msg5 }));
Assert.Equal(new SeqNo(5), d6.Ack.CumulativeAck);
}
Expand All @@ -268,11 +268,11 @@ public void ReceiveBuffer_must_handle_duplicate_arrivals_correctly()
var msg1 = Msg(1);
var msg2 = Msg(2);

var buf2 = buf.Receive(msg0).Receive(msg1).Receive(msg2).ExtractDeliverable.Buffer;
var buf2 = buf.Receive(msg0).Receive(msg1).Receive(msg2).ExtractDeliverable().Buffer;

var buf3 = buf2.Receive(msg0).Receive(msg1).Receive(msg2);

var d = buf3.ExtractDeliverable;
var d = buf3.ExtractDeliverable();
Assert.True(d.Deliverables.Count == 0);
Assert.Equal(new SeqNo(2), d.Ack.CumulativeAck);
}
Expand All @@ -290,8 +290,8 @@ public void ReceiveBuffer_must_be_able_to_correctly_merge_with_another_receive_b

var buf = buf1.Receive(msg1a).Receive(msg2).MergeFrom(buf2.Receive(msg1b).Receive(msg3));

var d = buf.Receive(msg0).ExtractDeliverable;
Assert.True(d.Deliverables.SequenceEqual(new []{ msg0, msg1b, msg2, msg3 }));
var d = buf.Receive(msg0).ExtractDeliverable();
Assert.True(d.Deliverables.SequenceEqual(new []{ msg0, msg1a, msg2, msg3 }));
Assert.Equal(new SeqNo(3), d.Ack.CumulativeAck);
}

Expand Down Expand Up @@ -341,7 +341,7 @@ public void SendBuffer_and_ReceiveBuffer_must_correctly_cooperate_with_each_othe
if (sends.Contains(msg)) sndBuf = sndBuf.Buffer(msg);
if (Happened(p))
{
var del = rcvBuf.Receive(msg).ExtractDeliverable;
var del = rcvBuf.Receive(msg).ExtractDeliverable();
rcvBuf = del.Buffer;
dbLog(string.Format("{0} -- {1} --> {2}", sndBuf, msg, rcvBuf));
lastAck = del.Ack;
Expand Down
27 changes: 17 additions & 10 deletions src/core/Akka.Remote.Tests/ActorsLeakSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ namespace Akka.Remote.Tests
public class ActorsLeakSpec : AkkaSpec
{
public static readonly Config Confg = ConfigurationFactory.ParseString(@"
akka.actor.provider = ""Akka.Remote.RemoteActorRefProvider, Akka.Remote""
akka.actor.provider = remote
akka.loglevel = DEBUG
akka.remote.dot-netty.tcp.applied-adapters = [trttl]
akka.remote.dot-netty.tcp.hostname = 127.0.0.1
Expand All @@ -44,13 +44,20 @@ private static ImmutableList<IActorRef> Recurse(IActorRef @ref)
{
var empty = new List<IActorRef>();
var list = empty;
if (@ref is ActorRefWithCell)
if (@ref is ActorRefWithCell wc)
{
var cell = @ref.AsInstanceOf<ActorRefWithCell>().Underlying;
if (cell.ChildrenContainer is EmptyChildrenContainer ||
cell.ChildrenContainer is TerminatedChildrenContainer ||
cell.ChildrenContainer is TerminatingChildrenContainer) list = empty;
else list = cell.ChildrenContainer.Children.Cast<IActorRef>().ToList();
var cell = wc.Underlying;
switch (cell.ChildrenContainer)
{
case TerminatingChildrenContainer _:
case TerminatedChildrenContainer _:
case EmptyChildrenContainer _:
list = empty;
break;
case NormalChildrenContainer n:
list = n.Children.Cast<IActorRef>().ToList();
break;
}
}

return ImmutableList<IActorRef>.Empty.Add(@ref).AddRange(list.SelectMany(Recurse));
Expand All @@ -62,7 +69,7 @@ private static ImmutableList<IActorRef> CollectLiveActors(IActorRef root)
return Recurse(root);
}

class StoppableActor : ReceiveActor
private class StoppableActor : ReceiveActor
{
public StoppableActor()
{
Expand All @@ -75,7 +82,7 @@ public StoppableActor()

private void AssertActors(ImmutableHashSet<IActorRef> expected, ImmutableHashSet<IActorRef> actual)
{
Assert.True(expected.SetEquals(actual));
expected.Should().BeEquivalentTo(actual);
}

[Fact]
Expand Down Expand Up @@ -227,7 +234,7 @@ public void Remoting_must_not_leak_actors()
AwaitAssert(() =>
{
AssertActors(initialActors, targets.SelectMany(CollectLiveActors).ToImmutableHashSet());
}, 5.Seconds());
}, 10.Seconds());
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions src/core/Akka.Remote.Tests/RemoteConfigSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public void Remoting_should_contain_correct_configuration_values_in_ReferenceCon
Assert.Equal(20000, remoteSettings.SysMsgBufferSize);
Assert.Equal(TimeSpan.FromMinutes(3), remoteSettings.InitialSysMsgDeliveryTimeout);
Assert.Equal(TimeSpan.FromDays(5), remoteSettings.QuarantineDuration);
Assert.Equal(TimeSpan.FromDays(5), remoteSettings.QuarantineSilentSystemTimeout);
Assert.Equal(TimeSpan.FromDays(2), remoteSettings.QuarantineSilentSystemTimeout);
Assert.Equal(TimeSpan.FromSeconds(30), remoteSettings.CommandAckTimeout);
Assert.Single(remoteSettings.Transports);
Assert.Equal(typeof(TcpTransport), Type.GetType(remoteSettings.Transports.Head().TransportClass));
Expand Down Expand Up @@ -83,7 +83,7 @@ public void Remoting_should_be_able_to_parse_AkkaProtocol_related_config_element

Assert.Equal(typeof(DeadlineFailureDetector), Type.GetType(settings.TransportFailureDetectorImplementationClass));
Assert.Equal(TimeSpan.FromSeconds(4), settings.TransportHeartBeatInterval);
Assert.Equal(TimeSpan.FromSeconds(20), settings.TransportFailureDetectorConfig.GetTimeSpan("acceptable-heartbeat-pause"));
Assert.Equal(TimeSpan.FromSeconds(120), settings.TransportFailureDetectorConfig.GetTimeSpan("acceptable-heartbeat-pause"));
}

[Fact]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ public AkkaProtocolStressTest() : base(AkkaProtocolStressTestConfig)

#region Tests

[Fact(Skip="Racy - likely due to issue with Gremlin (FailureInjector) adapter")]
[Fact()]
public void AkkaProtocolTransport_must_guarantee_at_most_once_delivery_and_message_ordering_despite_packet_loss()
{
//todo mute both systems for deadletters for any type of message
Expand Down
Loading

0 comments on commit 65b4f55

Please sign in to comment.