Skip to content

Commit

Permalink
Fix RHEL7 socket dispose hang, and extend coverage (#43409)
Browse files Browse the repository at this point in the history
Fix #42686 by doing a graceful close in case if the abortive connect(AF_UNSPEC) call fails on Linux, and improve couple of related tests:
- Extend RetryHelper with an exception filter
- Connect, Accept, Send, Receive, SendFile cancellation tests: make sure cancellation failures are not masked by RetryHelper (use exception filter)
- Connect, Accept, Send, Receive cancellation tests: also test IPV6 and DualMode sockets
  • Loading branch information
antonfirsov authored Oct 26, 2020
1 parent 693c1f0 commit 5a4483e
Show file tree
Hide file tree
Showing 9 changed files with 111 additions and 65 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,14 +38,20 @@ public static void ForceNonBlocking(this Socket socket, bool force)
}
}

public static (Socket, Socket) CreateConnectedSocketPair()
public static (Socket client, Socket server) CreateConnectedSocketPair(bool ipv6 = false, bool dualModeClient = false)
{
using Socket listener = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
listener.Bind(new IPEndPoint(IPAddress.Loopback, 0));
IPAddress serverAddress = ipv6 ? IPAddress.IPv6Loopback : IPAddress.Loopback;

using Socket listener = new Socket(serverAddress.AddressFamily, SocketType.Stream, ProtocolType.Tcp);
listener.Bind(new IPEndPoint(serverAddress, 0));
listener.Listen(1);

Socket client = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
client.Connect(listener.LocalEndPoint);
IPEndPoint connectTo = (IPEndPoint)listener.LocalEndPoint;
if (dualModeClient) connectTo = new IPEndPoint(connectTo.Address.MapToIPv6(), connectTo.Port);

Socket client = new Socket(connectTo.AddressFamily, SocketType.Stream, ProtocolType.Tcp);
if (dualModeClient) client.DualMode = true;
client.Connect(connectTo);
Socket server = listener.Accept();

return (client, server);
Expand Down
15 changes: 11 additions & 4 deletions src/libraries/Common/tests/TestUtilities/System/RetryHelper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,14 @@ namespace System
public static partial class RetryHelper
{
private static readonly Func<int, int> s_defaultBackoffFunc = i => Math.Min(i * 100, 60_000);
private static readonly Predicate<Exception> s_defaultRetryWhenFunc = _ => true;

/// <summary>Executes the <paramref name="test"/> action up to a maximum of <paramref name="maxAttempts"/> times.</summary>
/// <param name="maxAttempts">The maximum number of times to invoke <paramref name="test"/>.</param>
/// <param name="test">The test to invoke.</param>
/// <param name="backoffFunc">After a failure, invoked to determine how many milliseconds to wait before the next attempt. It's passed the number of iterations attempted.</param>
public static void Execute(Action test, int maxAttempts = 5, Func<int, int> backoffFunc = null)
/// <param name="retryWhen">Invoked to select the exceptions to retry on. If not set, any exception will trigger a retry.</param>
public static void Execute(Action test, int maxAttempts = 5, Func<int, int> backoffFunc = null, Predicate<Exception> retryWhen = null)
{
// Validate arguments
if (maxAttempts < 1)
Expand All @@ -27,6 +29,8 @@ public static void Execute(Action test, int maxAttempts = 5, Func<int, int> back
throw new ArgumentNullException(nameof(test));
}

retryWhen ??= s_defaultRetryWhenFunc;

// Execute the test until it either passes or we run it maxAttempts times
var exceptions = new List<Exception>();
for (int i = 1; i <= maxAttempts; i++)
Expand All @@ -36,7 +40,7 @@ public static void Execute(Action test, int maxAttempts = 5, Func<int, int> back
test();
return;
}
catch (Exception e)
catch (Exception e) when (retryWhen(e))
{
exceptions.Add(e);
if (i == maxAttempts)
Expand All @@ -53,7 +57,8 @@ public static void Execute(Action test, int maxAttempts = 5, Func<int, int> back
/// <param name="maxAttempts">The maximum number of times to invoke <paramref name="test"/>.</param>
/// <param name="test">The test to invoke.</param>
/// <param name="backoffFunc">After a failure, invoked to determine how many milliseconds to wait before the next attempt. It's passed the number of iterations attempted.</param>
public static async Task ExecuteAsync(Func<Task> test, int maxAttempts = 5, Func<int, int> backoffFunc = null)
/// <param name="retryWhen">Invoked to select the exceptions to retry on. If not set, any exception will trigger a retry.</param>
public static async Task ExecuteAsync(Func<Task> test, int maxAttempts = 5, Func<int, int> backoffFunc = null, Predicate<Exception> retryWhen = null)
{
// Validate arguments
if (maxAttempts < 1)
Expand All @@ -65,6 +70,8 @@ public static async Task ExecuteAsync(Func<Task> test, int maxAttempts = 5, Func
throw new ArgumentNullException(nameof(test));
}

retryWhen ??= s_defaultRetryWhenFunc;

// Execute the test until it either passes or we run it maxAttempts times
var exceptions = new List<Exception>();
for (int i = 1; i <= maxAttempts; i++)
Expand All @@ -74,7 +81,7 @@ public static async Task ExecuteAsync(Func<Task> test, int maxAttempts = 5, Func
await test().ConfigureAwait(false);
return;
}
catch (Exception e)
catch (Exception e) when (retryWhen(e))
{
exceptions.Add(e);
if (i == maxAttempts)
Expand Down
5 changes: 5 additions & 0 deletions src/libraries/Native/Unix/System.Native/pal_networking.c
Original file line number Diff line number Diff line change
Expand Up @@ -3085,6 +3085,11 @@ int32_t SystemNative_Disconnect(intptr_t socket)
addr.sa_family = AF_UNSPEC;

err = connect(fd, &addr, sizeof(addr));
if (err != 0)
{
// On some older kernels connect(AF_UNSPEC) may fail. Fall back to shutdown in these cases:
err = shutdown(fd, SHUT_RDWR);
}
#elif HAVE_DISCONNECTX
// disconnectx causes a FIN close on OSX. It's the best we can do.
err = disconnectx(fd, SAE_ASSOCID_ANY, SAE_CONNID_ANY);
Expand Down
26 changes: 16 additions & 10 deletions src/libraries/System.Net.Sockets/tests/FunctionalTests/Accept.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
using System.Threading.Tasks;
using Xunit;
using Xunit.Abstractions;
using Xunit.Sdk;

namespace System.Net.Sockets.Tests
{
Expand Down Expand Up @@ -289,16 +290,25 @@ public async Task AcceptAsync_MultipleAcceptsThenDispose_AcceptsThrowAfterDispos
}
}

[Fact]
public async Task AcceptGetsCanceledByDispose()
public static readonly TheoryData<IPAddress> AcceptGetsCanceledByDispose_Data = new TheoryData<IPAddress>
{
{ IPAddress.Loopback },
{ IPAddress.IPv6Loopback },
{ IPAddress.Loopback.MapToIPv6() }
};

[Theory]
[MemberData(nameof(AcceptGetsCanceledByDispose_Data))]
public async Task AcceptGetsCanceledByDispose(IPAddress loopback)
{
// We try this a couple of times to deal with a timing race: if the Dispose happens
// before the operation is started, we won't see a SocketException.
int msDelay = 100;
await RetryHelper.ExecuteAsync(async () =>
{
var listener = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
listener.Bind(new IPEndPoint(IPAddress.Loopback, 0));
var listener = new Socket(loopback.AddressFamily, SocketType.Stream, ProtocolType.Tcp);
if (loopback.IsIPv4MappedToIPv6) listener.DualMode = true;
listener.Bind(new IPEndPoint(loopback, 0));
listener.Listen(1);
Task acceptTask = AcceptAsync(listener);
Expand All @@ -308,11 +318,7 @@ await RetryHelper.ExecuteAsync(async () =>
msDelay *= 2;
Task disposeTask = Task.Run(() => listener.Dispose());
var cts = new CancellationTokenSource();
Task timeoutTask = Task.Delay(30000, cts.Token);
Assert.NotSame(timeoutTask, await Task.WhenAny(disposeTask, acceptTask, timeoutTask));
cts.Cancel();
await Task.WhenAny(disposeTask, acceptTask).TimeoutAfter(30000);
await disposeTask;
SocketError? localSocketError = null;
Expand Down Expand Up @@ -343,7 +349,7 @@ await RetryHelper.ExecuteAsync(async () =>
{
Assert.Equal(SocketError.OperationAborted, localSocketError);
}
}, maxAttempts: 10);
}, maxAttempts: 10, retryWhen: e => e is XunitException);
}

[Fact]
Expand Down
26 changes: 16 additions & 10 deletions src/libraries/System.Net.Sockets/tests/FunctionalTests/Connect.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
using System.Threading.Tasks;
using Xunit;
using Xunit.Abstractions;
using Xunit.Sdk;

namespace System.Net.Sockets.Tests
{
Expand Down Expand Up @@ -120,29 +121,34 @@ public async Task Connect_AfterDisconnect_Fails()
}
}

[Fact]
public static readonly TheoryData<IPAddress> ConnectGetsCanceledByDispose_Data = new TheoryData<IPAddress>
{
{ IPAddress.Parse("1.1.1.1") },
{ IPAddress.Parse("1.1.1.1").MapToIPv6() },
};

[OuterLoop("Connects to external server")]
[Theory]
[MemberData(nameof(ConnectGetsCanceledByDispose_Data))]
[PlatformSpecific(~(TestPlatforms.OSX | TestPlatforms.FreeBSD))] // Not supported on BSD like OSes.
public async Task ConnectGetsCanceledByDispose()
public async Task ConnectGetsCanceledByDispose(IPAddress address)
{
// We try this a couple of times to deal with a timing race: if the Dispose happens
// before the operation is started, we won't see a SocketException.
int msDelay = 100;
await RetryHelper.ExecuteAsync(async () =>
{
var client = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
var client = new Socket(address.AddressFamily, SocketType.Stream, ProtocolType.Tcp);
if (address.IsIPv4MappedToIPv6) client.DualMode = true;
Task connectTask = ConnectAsync(client, new IPEndPoint(IPAddress.Parse("1.1.1.1"), 23));
Task connectTask = ConnectAsync(client, new IPEndPoint(address, 23));
// Wait a little so the operation is started.
await Task.Delay(msDelay);
msDelay *= 2;
Task disposeTask = Task.Run(() => client.Dispose());
var cts = new CancellationTokenSource();
Task timeoutTask = Task.Delay(30000, cts.Token);
Assert.NotSame(timeoutTask, await Task.WhenAny(disposeTask, connectTask, timeoutTask));
cts.Cancel();
await Task.WhenAny(disposeTask, connectTask).TimeoutAfter(30000);
await disposeTask;
SocketError? localSocketError = null;
Expand Down Expand Up @@ -176,7 +182,7 @@ await RetryHelper.ExecuteAsync(async () =>
{
Assert.Equal(SocketError.OperationAborted, localSocketError);
}
}, maxAttempts: 10);
}, maxAttempts: 10, retryWhen: e => e is XunitException);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ public void InlineSocketContinuations()
await new SendReceiveEap(null).SendRecv_Stream_TCP(IPAddress.Loopback, useMultipleBuffers: false);
await new SendReceiveEap(null).SendRecv_Stream_TCP_MultipleConcurrentReceives(IPAddress.Loopback, useMultipleBuffers: false);
await new SendReceiveEap(null).SendRecv_Stream_TCP_MultipleConcurrentSends(IPAddress.Loopback, useMultipleBuffers: false);
await new SendReceiveEap(null).TcpReceiveSendGetsCanceledByDispose(receiveOrSend: true);
await new SendReceiveEap(null).TcpReceiveSendGetsCanceledByDispose(receiveOrSend: false);
await new SendReceiveEap(null).TcpReceiveSendGetsCanceledByDispose(receiveOrSend: true, ipv6Server: false, dualModeClient: false);
await new SendReceiveEap(null).TcpReceiveSendGetsCanceledByDispose(receiveOrSend: false, ipv6Server: false, dualModeClient: false);
}, options).Dispose();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
using System.Threading.Tasks;

using Xunit;
using Xunit.Sdk;

namespace System.Net.Sockets.Tests
{
Expand Down Expand Up @@ -310,11 +311,7 @@ await RetryHelper.ExecuteAsync(async () =>
msDelay *= 2;
Task disposeTask = Task.Run(() => socket1.Dispose());
var cts = new CancellationTokenSource();
Task timeoutTask = Task.Delay(30000, cts.Token);
Assert.NotSame(timeoutTask, await Task.WhenAny(disposeTask, socketOperation, timeoutTask));
cts.Cancel();
await Task.WhenAny(disposeTask, socketOperation).TimeoutAfter(30000);
await disposeTask;
SocketError? localSocketError = null;
Expand Down Expand Up @@ -355,7 +352,7 @@ await RetryHelper.ExecuteAsync(async () =>
Assert.Equal(SocketError.ConnectionReset, peerSocketError);
}
}
}, maxAttempts: 10);
}, maxAttempts: 10, retryWhen: e => e is XunitException);
}

[OuterLoop]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,10 @@
using System.Threading;
using System.Threading.Tasks;
using Microsoft.DotNet.RemoteExecutor;
using Microsoft.DotNet.XUnitExtensions;
using Xunit;
using Xunit.Abstractions;
using Xunit.Sdk;

namespace System.Net.Sockets.Tests
{
Expand Down Expand Up @@ -937,17 +939,26 @@ error is SocketException ||
}
}

[Fact]
public static readonly TheoryData<IPAddress> UdpReceiveGetsCanceledByDispose_Data = new TheoryData<IPAddress>
{
{ IPAddress.Loopback },
{ IPAddress.IPv6Loopback },
{ IPAddress.Loopback.MapToIPv6() }
};

[Theory]
[MemberData(nameof(UdpReceiveGetsCanceledByDispose_Data))]
[PlatformSpecific(~TestPlatforms.OSX)] // Not supported on OSX.
public async Task UdpReceiveGetsCanceledByDispose()
public async Task UdpReceiveGetsCanceledByDispose(IPAddress address)
{
// We try this a couple of times to deal with a timing race: if the Dispose happens
// before the operation is started, we won't see a SocketException.
int msDelay = 100;
await RetryHelper.ExecuteAsync(async () =>
{
var socket = new Socket(AddressFamily.InterNetwork, SocketType.Dgram, ProtocolType.Udp);
socket.BindToAnonymousPort(IPAddress.Loopback);
var socket = new Socket(address.AddressFamily, SocketType.Dgram, ProtocolType.Udp);
if (address.IsIPv4MappedToIPv6) socket.DualMode = true;
socket.BindToAnonymousPort(address);
Task receiveTask = ReceiveAsync(socket, new ArraySegment<byte>(new byte[1]));
Expand All @@ -956,11 +967,7 @@ await RetryHelper.ExecuteAsync(async () =>
msDelay *= 2;
Task disposeTask = Task.Run(() => socket.Dispose());
var cts = new CancellationTokenSource();
Task timeoutTask = Task.Delay(30000, cts.Token);
Assert.NotSame(timeoutTask, await Task.WhenAny(disposeTask, receiveTask, timeoutTask));
cts.Cancel();
await Task.WhenAny(disposeTask, receiveTask).TimeoutAfter(30000);
await disposeTask;
SocketError? localSocketError = null;
Expand Down Expand Up @@ -991,21 +998,35 @@ await RetryHelper.ExecuteAsync(async () =>
{
Assert.Equal(SocketError.OperationAborted, localSocketError);
}
}, maxAttempts: 10);
}, maxAttempts: 10, retryWhen: e => e is XunitException);
}

[Theory]
[InlineData(true)]
[InlineData(false)]
public async Task TcpReceiveSendGetsCanceledByDispose(bool receiveOrSend)
public static readonly TheoryData<bool, bool, bool> TcpReceiveSendGetsCanceledByDispose_Data = new TheoryData<bool, bool, bool>
{
{ true, false, false },
{ true, false, true },
{ true, true, false },
{ false, false, false },
{ false, false, true },
{ false, true, false },
};

[Theory(Timeout = 40000)]

This comment has been minimized.

Copy link
@tmds

tmds Apr 15, 2021

Member

@antonfirsov I'm investigating why this test fails on our internal CI.

The error I get now is:

Test execution timed out after 40000 milliseconds

This is not very helpful. I rather would have seen a timeout from within the test.

Can we remove the Timeout = 40000?

This comment has been minimized.

Copy link
@antonfirsov

antonfirsov Apr 15, 2021

Author Member

Yeah, let's move the timeout to the test itself. My main goal was to avoid hanging the whole test process, that's the worst.

[MemberData(nameof(TcpReceiveSendGetsCanceledByDispose_Data))]
public async Task TcpReceiveSendGetsCanceledByDispose(bool receiveOrSend, bool ipv6Server, bool dualModeClient)
{
// RHEL7 kernel has a bug preventing close(AF_UNKNOWN) to succeed with IPv6 sockets.
// In this case Dispose will trigger a graceful shutdown, which means that receive will succeed on socket2.
// TODO: Remove this, once CI machines are updated to a newer kernel.
bool expectGracefulShutdown = UsesSync && PlatformDetection.IsRedHatFamily7 && receiveOrSend && (ipv6Server || dualModeClient);

// We try this a couple of times to deal with a timing race: if the Dispose happens
// before the operation is started, the peer won't see a ConnectionReset SocketException and we won't
// see a SocketException either.
int msDelay = 100;
await RetryHelper.ExecuteAsync(async () =>
{
(Socket socket1, Socket socket2) = SocketTestExtensions.CreateConnectedSocketPair();
(Socket socket1, Socket socket2) = SocketTestExtensions.CreateConnectedSocketPair(ipv6Server, dualModeClient);
using (socket2)
{
Task socketOperation;
Expand All @@ -1030,11 +1051,7 @@ await RetryHelper.ExecuteAsync(async () =>
msDelay *= 2;
Task disposeTask = Task.Run(() => socket1.Dispose());
var cts = new CancellationTokenSource();
Task timeoutTask = Task.Delay(30000, cts.Token);
Assert.NotSame(timeoutTask, await Task.WhenAny(disposeTask, socketOperation, timeoutTask));
cts.Cancel();
await Task.WhenAny(disposeTask, socketOperation).TimeoutAfter(30000);
await disposeTask;
SocketError? localSocketError = null;
Expand Down Expand Up @@ -1088,10 +1105,18 @@ await RetryHelper.ExecuteAsync(async () =>
break;
}
}
Assert.Equal(SocketError.ConnectionReset, peerSocketError);
if (!expectGracefulShutdown)
{
Assert.Equal(SocketError.ConnectionReset, peerSocketError);
}
else
{
Assert.Null(peerSocketError);
}
}
}
}, maxAttempts: 10);
}, maxAttempts: 10, retryWhen: e => e is XunitException);
}

[Fact]
Expand Down
Loading

0 comments on commit 5a4483e

Please sign in to comment.