Skip to content

Commit

Permalink
Handling UDP socket errors (#4641) (#4643)
Browse files Browse the repository at this point in the history
Ignoring System.Net.SocketError.ConnectionReset (WSAECONNRESET 10054 / java.net.PortUnreachableException).

Co-authored-by: Thomas Stollenwerk <[email protected]>
  • Loading branch information
motmot80 and Thomas Stollenwerk authored Dec 14, 2020
1 parent 77babf0 commit f81f3ea
Show file tree
Hide file tree
Showing 3 changed files with 217 additions and 11 deletions.
20 changes: 10 additions & 10 deletions src/core/Akka.Tests/IO/TcpListenerSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public TcpListenerSpec()
{ }

[Fact]
public void A_TCP_Listner_must_let_the_bind_commander_know_when_binding_is_complete()
public void A_TCP_Listener_must_let_the_bind_commander_know_when_binding_is_complete()
{
new TestSetup(this, pullMode: false).Run(x =>
{
Expand All @@ -37,7 +37,7 @@ public void A_TCP_Listner_must_let_the_bind_commander_know_when_binding_is_compl
}

[Fact]
public void A_TCP_Listner_must_continue_to_accept_connections_after_a_previous_accept()
public void A_TCP_Listener_must_continue_to_accept_connections_after_a_previous_accept()
{
new TestSetup(this, pullMode: false).Run(x =>
{
Expand All @@ -49,17 +49,17 @@ public void A_TCP_Listner_must_continue_to_accept_connections_after_a_previous_a
}

[Fact]
public void A_TCP_Listner_must_react_to_unbind_commands_by_replying_with_unbound_and_stopping_itself()
public void A_TCP_Listener_must_react_to_unbind_commands_by_replying_with_unbound_and_stopping_itself()
{
new TestSetup(this, pullMode:false).Run(x =>
{
x.BindListener();
var unbindCommander = CreateTestProbe();
unbindCommander.Send(x.Listner, Tcp.Unbind.Instance);
unbindCommander.Send(x.Listener, Tcp.Unbind.Instance);
unbindCommander.ExpectMsg(Tcp.Unbound.Instance);
x.Parent.ExpectTerminated(x.Listner);
x.Parent.ExpectTerminated(x.Listener);
});
}

Expand Down Expand Up @@ -107,7 +107,7 @@ public void AttemptConnectionToEndpoint()
new Socket(_endpoint.AddressFamily, SocketType.Stream, ProtocolType.Tcp).Connect(_endpoint);
}

public IActorRef Listner { get { return _parentRef.UnderlyingActor.Listner; } }
public IActorRef Listener { get { return _parentRef.UnderlyingActor.Listener; } }

public TestProbe SelectorRouter
{
Expand All @@ -121,23 +121,23 @@ class ListenerParent : ActorBase
{
private readonly TestSetup _test;
private readonly bool _pullMode;
private readonly IActorRef _listner;
private readonly IActorRef _listener;

public ListenerParent(TestSetup test, bool pullMode)
{
_test = test;
_pullMode = pullMode;

_listner = Context.ActorOf(Props.Create(() =>
_listener = Context.ActorOf(Props.Create(() =>
new TcpListener(
Tcp.Instance.Apply(Context.System),
test._bindCommander.Ref,
new Tcp.Bind(_test._handler.Ref, test._endpoint, 100, new Inet.SocketOption[]{}, pullMode)))
.WithDeploy(Deploy.Local));
_test._parent.Watch(_listner);
_test._parent.Watch(_listener);
}

internal IActorRef Listner { get { return _listner; } }
internal IActorRef Listener { get { return _listener; } }

protected override bool Receive(object message)
{
Expand Down
182 changes: 182 additions & 0 deletions src/core/Akka.Tests/IO/UdpListenerSpec.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,182 @@
//-----------------------------------------------------------------------
// <copyright file="UdpListenerSpec.cs" company="Akka.NET Project">
// Copyright (C) 2009-2020 Lightbend Inc. <http://www.lightbend.com>
// Copyright (C) 2013-2020 .NET Foundation <https://github.com/akkadotnet/akka.net>
// </copyright>
//-----------------------------------------------------------------------

using System;
using System.Net;
using System.Net.Sockets;
using System.Text;
using Akka.Actor;
using Akka.IO;
using Akka.TestKit;
using Xunit;
using UdpListener = Akka.IO.UdpListener;

namespace Akka.Tests.IO
{
public class UdpListenerSpec : AkkaSpec
{
public UdpListenerSpec()
: base(@"akka.io.udp.max-channels = unlimited
akka.io.udp.nr-of-selectors = 1
akka.io.udp.direct-buffer-pool-limit = 100
akka.io.udp.direct-buffer-size = 1024")
{ }

[Fact]
public void A_UDP_Listener_must_let_the_bind_commander_know_when_binding_is_complete()
{
new TestSetup(this).Run(x =>
{
x.BindCommander.ExpectMsg<Udp.Bound>();
});
}

[Fact]
public void A_UDP_Listener_must_forward_incoming_packets_to_handler_actor()
{
const string dgram = "Fly little packet!";
new TestSetup(this).Run(x =>
{
x.BindCommander.ExpectMsg<Udp.Bound>();
x.SendDataToLocal(Encoding.UTF8.GetBytes(dgram));
x.Handler.ExpectMsg<Udp.Received>(_ => Assert.Equal(dgram, Encoding.UTF8.GetString(_.Data.ToArray())));
x.SendDataToLocal(Encoding.UTF8.GetBytes(dgram));
x.Handler.ExpectMsg<Udp.Received>(_ => Assert.Equal(dgram, Encoding.UTF8.GetString(_.Data.ToArray())));
});
}

[Fact]
public void A_UDP_Listener_must_be_able_to_send_and_receive_when_server_goes_away()
{
new TestSetup(this).Run(x =>
{
x.BindCommander.ExpectMsg<Udp.Bound>();
// Receive UDP messages from a sender
const string requestMessage = "This is my last request!";
var notExistingEndPoint = x.SendDataToLocal(Encoding.UTF8.GetBytes(requestMessage));
x.Handler.ExpectMsg<Udp.Received>(_ =>
{
Assert.Equal(requestMessage, Encoding.UTF8.GetString(_.Data.ToArray()));
});
// Try to reply to this sender which DOES NOT EXIST any more
// Important: The UDP port of the reply must match the listing UDP port!
// This UDP port will also be used for ICMP error reporting.
// Hint: On Linux the listener port cannot be reused. We are using the udp actor to respond.
IActorRef localSender = x.Listener;
const string response = "Are you still alive?"; // he is not
localSender.Tell(Udp.Send.Create(ByteString.FromBytes(Encoding.UTF8.GetBytes(response)), notExistingEndPoint));
// Now an ICMP error message "port unreachable" (SocketError.ConnectionReset) is sent to our UDP server port
x.Handler.ExpectNoMsg(TimeSpan.FromSeconds(1));
const string followUpMessage = "Back online!";
x.SendDataToLocal(Encoding.UTF8.GetBytes(followUpMessage));
x.Handler.ExpectMsg<Udp.Received>(_ => Assert.Equal(followUpMessage, Encoding.UTF8.GetString(_.Data.ToArray())));
});
}

class TestSetup
{
private readonly TestKitBase _kit;

private readonly TestProbe _handler;
private readonly TestProbe _bindCommander;
private readonly TestProbe _parent;
private readonly IPEndPoint _localEndpoint;
private readonly TestActorRef<ListenerParent> _parentRef;

public TestSetup(TestKitBase kit) :
this(kit, TestUtils.TemporaryServerAddress())
{

}

public TestSetup(TestKitBase kit, IPEndPoint localEndpoint)
{
_kit = kit;
_localEndpoint = localEndpoint;
_handler = kit.CreateTestProbe();
_bindCommander = kit.CreateTestProbe();
_parent = kit.CreateTestProbe();
_parentRef = new TestActorRef<ListenerParent>(kit.Sys, Props.Create(() => new ListenerParent(this)));
}

public void Run(Action<TestSetup> test)
{
test(this);
}

public void BindListener()
{
_bindCommander.ExpectMsg<Udp.Bound>();
}

public IPEndPoint SendDataToLocal(byte[] buffer)
{
return SendDataToEndpoint(buffer, _localEndpoint);
}

public IPEndPoint SendDataToEndpoint(byte[] buffer, IPEndPoint receiverEndpoint)
{
var tempEndpoint = TestUtils.TemporaryServerAddress();
return SendDataToEndpoint(buffer, receiverEndpoint, tempEndpoint);
}

public IPEndPoint SendDataToEndpoint(byte[] buffer, IPEndPoint receiverEndpoint, IPEndPoint senderEndpoint)
{
using (var udpClient = new UdpClient(senderEndpoint.Port))
{
udpClient.Connect(receiverEndpoint);
udpClient.Send(buffer, buffer.Length);
}
return senderEndpoint;
}

public IActorRef Listener { get { return _parentRef.UnderlyingActor.Listener; } }

public TestProbe BindCommander { get { return _bindCommander; } }

public TestProbe Handler { get { return _handler; } }

public IPEndPoint LocalLocalEndPoint { get { return _localEndpoint; } }

class ListenerParent : ActorBase
{
private readonly TestSetup _test;
private readonly IActorRef _listener;
public ListenerParent(TestSetup test)
{
_test = test;

_listener = Context.ActorOf(Props.Create(() =>
new UdpListener(
Udp.Instance.Apply(Context.System),
test._bindCommander.Ref,
new Udp.Bind(_test._handler.Ref, test._localEndpoint, new Inet.SocketOption[]{})))
.WithDeploy(Deploy.Local));
_test._parent.Watch(_listener);
}

internal IActorRef Listener { get { return _listener; } }

protected override bool Receive(object message)
{
_test._parent.Forward(message);
return true;
}

protected override SupervisorStrategy SupervisorStrategy()
{
return Akka.Actor.SupervisorStrategy.StoppingStrategy;
}

}
}
}
}
26 changes: 25 additions & 1 deletion src/core/Akka/IO/UdpListener.cs
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,15 @@ private void DoReceive(SocketAsyncEventArgs e, IActorRef handler)
{
try
{
handler.Tell(new Received(ByteString.CopyFrom(e.Buffer, e.Offset, e.BytesTransferred), e.RemoteEndPoint));
if (!IsICMPError(e))
{
if (e.SocketError != SocketError.Success)
throw new SocketException((int)e.SocketError);

handler.Tell(new Received(ByteString.CopyFrom(e.Buffer, e.Offset, e.BytesTransferred),
e.RemoteEndPoint));
}

ReceiveAsync();
}
finally
Expand All @@ -129,6 +137,21 @@ private void DoReceive(SocketAsyncEventArgs e, IActorRef handler)
}
}

/// <summary>
/// Checks if the socket event is an ICMP error message.
/// </summary>
/// <seealso href="https://tools.ietf.org/html/rfc1122#page-78"/>
private bool IsICMPError(SocketAsyncEventArgs e)
{
if (e.SocketError == SocketError.ConnectionReset)
{
Log.Debug("Ignoring client connection reset.");
return true;
}

return false;
}

/// <summary>
/// TBD
/// </summary>
Expand All @@ -151,6 +174,7 @@ protected override void PostStop()
private void ReceiveAsync()
{
var e = Udp.SocketEventArgsPool.Acquire(Self);

var buffer = Udp.BufferPool.Rent();
e.SetBuffer(buffer.Array, buffer.Offset, buffer.Count);
e.RemoteEndPoint = Socket.LocalEndPoint;
Expand Down

0 comments on commit f81f3ea

Please sign in to comment.