Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix RHEL7 socket dispose hang, and extend coverage #43409

Merged
merged 27 commits into from
Oct 26, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
9f2c092
Socket tests: don't retry CanceledByDispose tests on timeout
tmds Sep 25, 2020
37176da
print g_config_specified_ciphersuites
tmds Sep 28, 2020
b7b8d2c
Move connection timeout retry
tmds Oct 1, 2020
ee3dd19
Undo unrelated changes
tmds Oct 6, 2020
48a69c4
Merge branch 'canceledbydispose_timeout' into af/disposecancel
antonfirsov Oct 12, 2020
512edf5
test cancellation also with IPV6 and Dual Mode
antonfirsov Oct 12, 2020
80501f9
do not use 1.1.1.1 and similar
antonfirsov Oct 12, 2020
4d83e73
Revert "do not use 1.1.1.1 and similar"
antonfirsov Oct 12, 2020
ba5df5e
do not test IPV6 for now
antonfirsov Oct 12, 2020
b301bb0
fall back to shutdown, if connect(AF_UNSPEC) fails
antonfirsov Oct 12, 2020
6158460
TcpReceiveSendGetsCanceledByDispose timeout handling
antonfirsov Oct 12, 2020
b77d31f
update tests
antonfirsov Oct 14, 2020
1da5006
OuterLoop test using 1.1.1.1
antonfirsov Oct 14, 2020
5b9b879
change CreateConnectedSocketPair API
antonfirsov Oct 14, 2020
102561a
do not use SkipTestException
antonfirsov Oct 14, 2020
f591786
CI tweaks
antonfirsov Oct 16, 2020
6909cc8
fix Unix build
antonfirsov Oct 22, 2020
8a3ce60
fix TcpReceiveSendGetsCanceledByDispose for RHEL7
antonfirsov Oct 22, 2020
800d910
actually fix TcpReceiveSendGetsCanceledByDispose
antonfirsov Oct 22, 2020
b371ae7
RetryHelper: add retryWhen argument
antonfirsov Oct 22, 2020
720c522
AcceptGetsCanceledByDispose: use RetryHelper again
antonfirsov Oct 22, 2020
412a1df
ConnectGetsCanceledByDispose: use RetryHelper again
antonfirsov Oct 22, 2020
86dca50
SyncSendFileGetsCanceledByDispose: use RetryHelper again
antonfirsov Oct 22, 2020
5b109e3
SendReceive: use RetryHelper again
antonfirsov Oct 22, 2020
9842c81
remove empty line
antonfirsov Oct 22, 2020
1769820
fix build
antonfirsov Oct 22, 2020
7475c15
add missing <param> docs
antonfirsov Oct 22, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -38,14 +38,20 @@ public static void ForceNonBlocking(this Socket socket, bool force)
}
}

public static (Socket, Socket) CreateConnectedSocketPair()
public static (Socket client, Socket server) CreateConnectedSocketPair(bool ipv6 = false, bool dualModeClient = false)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: these are not client and server, but two connected TCP peers. The server Socket doesn't leave the method, and it looks like we are not Disposing it, which we probably should.

Copy link
Member Author

@antonfirsov antonfirsov Oct 15, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: these are not client and server, but two connected TCP peers.

The terminology reflects their roles during the establishment of the connection. I understand that this information is not needed afterwards, but I still find the labels useful for convenience, because it helps me mentally map the tests to a common real-world scenarios. For example dualModeClient = false refers to the first Socket client. and it sounds more natural than bool dualModeSocket1, which would be the case if we'd rename the tuple. We use this terminology in other tests, although it's not necessary or meaningful to label anything as client/server. This is not a very strong opinion although.

The server Socket doesn't leave the method, and it looks like we are not Disposing it, which we probably should.

You mean the listner I guess. I also don't like this, but this problem is not specific to the PR. We should fix it on all call sites which I would do separately.

{
using Socket listener = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
listener.Bind(new IPEndPoint(IPAddress.Loopback, 0));
IPAddress serverAddress = ipv6 ? IPAddress.IPv6Loopback : IPAddress.Loopback;

using Socket listener = new Socket(serverAddress.AddressFamily, SocketType.Stream, ProtocolType.Tcp);
listener.Bind(new IPEndPoint(serverAddress, 0));
listener.Listen(1);

Socket client = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
client.Connect(listener.LocalEndPoint);
IPEndPoint connectTo = (IPEndPoint)listener.LocalEndPoint;
if (dualModeClient) connectTo = new IPEndPoint(connectTo.Address.MapToIPv6(), connectTo.Port);

Socket client = new Socket(connectTo.AddressFamily, SocketType.Stream, ProtocolType.Tcp);
if (dualModeClient) client.DualMode = true;
client.Connect(connectTo);
Socket server = listener.Accept();

return (client, server);
Expand Down
15 changes: 11 additions & 4 deletions src/libraries/Common/tests/TestUtilities/System/RetryHelper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,14 @@ namespace System
public static partial class RetryHelper
{
private static readonly Func<int, int> s_defaultBackoffFunc = i => Math.Min(i * 100, 60_000);
private static readonly Predicate<Exception> s_defaultRetryWhenFunc = _ => true;

/// <summary>Executes the <paramref name="test"/> action up to a maximum of <paramref name="maxAttempts"/> times.</summary>
/// <param name="maxAttempts">The maximum number of times to invoke <paramref name="test"/>.</param>
/// <param name="test">The test to invoke.</param>
/// <param name="backoffFunc">After a failure, invoked to determine how many milliseconds to wait before the next attempt. It's passed the number of iterations attempted.</param>
public static void Execute(Action test, int maxAttempts = 5, Func<int, int> backoffFunc = null)
/// <param name="retryWhen">Invoked to select the exceptions to retry on. If not set, any exception will trigger a retry.</param>
public static void Execute(Action test, int maxAttempts = 5, Func<int, int> backoffFunc = null, Predicate<Exception> retryWhen = null)
{
// Validate arguments
if (maxAttempts < 1)
Expand All @@ -27,6 +29,8 @@ public static void Execute(Action test, int maxAttempts = 5, Func<int, int> back
throw new ArgumentNullException(nameof(test));
}

retryWhen ??= s_defaultRetryWhenFunc;

// Execute the test until it either passes or we run it maxAttempts times
var exceptions = new List<Exception>();
for (int i = 1; i <= maxAttempts; i++)
Expand All @@ -36,7 +40,7 @@ public static void Execute(Action test, int maxAttempts = 5, Func<int, int> back
test();
return;
}
catch (Exception e)
catch (Exception e) when (retryWhen(e))
{
exceptions.Add(e);
if (i == maxAttempts)
Expand All @@ -53,7 +57,8 @@ public static void Execute(Action test, int maxAttempts = 5, Func<int, int> back
/// <param name="maxAttempts">The maximum number of times to invoke <paramref name="test"/>.</param>
/// <param name="test">The test to invoke.</param>
/// <param name="backoffFunc">After a failure, invoked to determine how many milliseconds to wait before the next attempt. It's passed the number of iterations attempted.</param>
public static async Task ExecuteAsync(Func<Task> test, int maxAttempts = 5, Func<int, int> backoffFunc = null)
/// <param name="retryWhen">Invoked to select the exceptions to retry on. If not set, any exception will trigger a retry.</param>
public static async Task ExecuteAsync(Func<Task> test, int maxAttempts = 5, Func<int, int> backoffFunc = null, Predicate<Exception> retryWhen = null)
{
// Validate arguments
if (maxAttempts < 1)
Expand All @@ -65,6 +70,8 @@ public static async Task ExecuteAsync(Func<Task> test, int maxAttempts = 5, Func
throw new ArgumentNullException(nameof(test));
}

retryWhen ??= s_defaultRetryWhenFunc;

// Execute the test until it either passes or we run it maxAttempts times
var exceptions = new List<Exception>();
for (int i = 1; i <= maxAttempts; i++)
Expand All @@ -74,7 +81,7 @@ public static async Task ExecuteAsync(Func<Task> test, int maxAttempts = 5, Func
await test().ConfigureAwait(false);
return;
}
catch (Exception e)
catch (Exception e) when (retryWhen(e))
{
exceptions.Add(e);
if (i == maxAttempts)
Expand Down
5 changes: 5 additions & 0 deletions src/libraries/Native/Unix/System.Native/pal_networking.c
Original file line number Diff line number Diff line change
Expand Up @@ -3085,6 +3085,11 @@ int32_t SystemNative_Disconnect(intptr_t socket)
addr.sa_family = AF_UNSPEC;

err = connect(fd, &addr, sizeof(addr));
if (err != 0)
{
// On some older kernels connect(AF_UNSPEC) may fail. Fall back to shutdown in these cases:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

connect(AF_UNSPEC) has worked since old kernels. It's an issue in SELinux that caused this to fail. That's why it is affecting SELinux based distros. You don't need to update the comment, this is background info.

err = shutdown(fd, SHUT_RDWR);
}
#elif HAVE_DISCONNECTX
// disconnectx causes a FIN close on OSX. It's the best we can do.
err = disconnectx(fd, SAE_ASSOCID_ANY, SAE_CONNID_ANY);
Expand Down
26 changes: 16 additions & 10 deletions src/libraries/System.Net.Sockets/tests/FunctionalTests/Accept.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
using System.Threading.Tasks;
using Xunit;
using Xunit.Abstractions;
using Xunit.Sdk;

namespace System.Net.Sockets.Tests
{
Expand Down Expand Up @@ -289,16 +290,25 @@ public async Task AcceptAsync_MultipleAcceptsThenDispose_AcceptsThrowAfterDispos
}
}

[Fact]
public async Task AcceptGetsCanceledByDispose()
public static readonly TheoryData<IPAddress> AcceptGetsCanceledByDispose_Data = new TheoryData<IPAddress>
{
{ IPAddress.Loopback },
{ IPAddress.IPv6Loopback },
{ IPAddress.Loopback.MapToIPv6() }
};

[Theory]
[MemberData(nameof(AcceptGetsCanceledByDispose_Data))]
public async Task AcceptGetsCanceledByDispose(IPAddress loopback)
{
// We try this a couple of times to deal with a timing race: if the Dispose happens
// before the operation is started, we won't see a SocketException.
int msDelay = 100;
await RetryHelper.ExecuteAsync(async () =>
{
var listener = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
listener.Bind(new IPEndPoint(IPAddress.Loopback, 0));
var listener = new Socket(loopback.AddressFamily, SocketType.Stream, ProtocolType.Tcp);
if (loopback.IsIPv4MappedToIPv6) listener.DualMode = true;
listener.Bind(new IPEndPoint(loopback, 0));
listener.Listen(1);

Task acceptTask = AcceptAsync(listener);
Expand All @@ -308,11 +318,7 @@ await RetryHelper.ExecuteAsync(async () =>
msDelay *= 2;
Task disposeTask = Task.Run(() => listener.Dispose());

var cts = new CancellationTokenSource();
Task timeoutTask = Task.Delay(30000, cts.Token);
Assert.NotSame(timeoutTask, await Task.WhenAny(disposeTask, acceptTask, timeoutTask));
cts.Cancel();

await Task.WhenAny(disposeTask, acceptTask).TimeoutAfter(30000);
await disposeTask;

SocketError? localSocketError = null;
Expand Down Expand Up @@ -343,7 +349,7 @@ await RetryHelper.ExecuteAsync(async () =>
{
Assert.Equal(SocketError.OperationAborted, localSocketError);
}
}, maxAttempts: 10);
}, maxAttempts: 10, retryWhen: e => e is XunitException);
}

[Fact]
Expand Down
26 changes: 16 additions & 10 deletions src/libraries/System.Net.Sockets/tests/FunctionalTests/Connect.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
using System.Threading.Tasks;
using Xunit;
using Xunit.Abstractions;
using Xunit.Sdk;

namespace System.Net.Sockets.Tests
{
Expand Down Expand Up @@ -120,29 +121,34 @@ public async Task Connect_AfterDisconnect_Fails()
}
}

[Fact]
public static readonly TheoryData<IPAddress> ConnectGetsCanceledByDispose_Data = new TheoryData<IPAddress>
{
{ IPAddress.Parse("1.1.1.1") },
{ IPAddress.Parse("1.1.1.1").MapToIPv6() },
};

[OuterLoop("Connects to external server")]
[Theory]
[MemberData(nameof(ConnectGetsCanceledByDispose_Data))]
[PlatformSpecific(~(TestPlatforms.OSX | TestPlatforms.FreeBSD))] // Not supported on BSD like OSes.
public async Task ConnectGetsCanceledByDispose()
public async Task ConnectGetsCanceledByDispose(IPAddress address)
{
// We try this a couple of times to deal with a timing race: if the Dispose happens
// before the operation is started, we won't see a SocketException.
int msDelay = 100;
await RetryHelper.ExecuteAsync(async () =>
{
var client = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
var client = new Socket(address.AddressFamily, SocketType.Stream, ProtocolType.Tcp);
if (address.IsIPv4MappedToIPv6) client.DualMode = true;

Task connectTask = ConnectAsync(client, new IPEndPoint(IPAddress.Parse("1.1.1.1"), 23));
Task connectTask = ConnectAsync(client, new IPEndPoint(address, 23));

// Wait a little so the operation is started.
await Task.Delay(msDelay);
msDelay *= 2;
Task disposeTask = Task.Run(() => client.Dispose());

var cts = new CancellationTokenSource();
Task timeoutTask = Task.Delay(30000, cts.Token);
Assert.NotSame(timeoutTask, await Task.WhenAny(disposeTask, connectTask, timeoutTask));
cts.Cancel();

await Task.WhenAny(disposeTask, connectTask).TimeoutAfter(30000);
await disposeTask;

SocketError? localSocketError = null;
Expand Down Expand Up @@ -176,7 +182,7 @@ await RetryHelper.ExecuteAsync(async () =>
{
Assert.Equal(SocketError.OperationAborted, localSocketError);
}
}, maxAttempts: 10);
}, maxAttempts: 10, retryWhen: e => e is XunitException);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ public void InlineSocketContinuations()
await new SendReceiveEap(null).SendRecv_Stream_TCP(IPAddress.Loopback, useMultipleBuffers: false);
await new SendReceiveEap(null).SendRecv_Stream_TCP_MultipleConcurrentReceives(IPAddress.Loopback, useMultipleBuffers: false);
await new SendReceiveEap(null).SendRecv_Stream_TCP_MultipleConcurrentSends(IPAddress.Loopback, useMultipleBuffers: false);
await new SendReceiveEap(null).TcpReceiveSendGetsCanceledByDispose(receiveOrSend: true);
await new SendReceiveEap(null).TcpReceiveSendGetsCanceledByDispose(receiveOrSend: false);
await new SendReceiveEap(null).TcpReceiveSendGetsCanceledByDispose(receiveOrSend: true, ipv6Server: false, dualModeClient: false);
await new SendReceiveEap(null).TcpReceiveSendGetsCanceledByDispose(receiveOrSend: false, ipv6Server: false, dualModeClient: false);
}, options).Dispose();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
using System.Threading.Tasks;

using Xunit;
using Xunit.Sdk;

namespace System.Net.Sockets.Tests
{
Expand Down Expand Up @@ -310,11 +311,7 @@ await RetryHelper.ExecuteAsync(async () =>
msDelay *= 2;
Task disposeTask = Task.Run(() => socket1.Dispose());

var cts = new CancellationTokenSource();
Task timeoutTask = Task.Delay(30000, cts.Token);
Assert.NotSame(timeoutTask, await Task.WhenAny(disposeTask, socketOperation, timeoutTask));
cts.Cancel();

await Task.WhenAny(disposeTask, socketOperation).TimeoutAfter(30000);
await disposeTask;

SocketError? localSocketError = null;
Expand Down Expand Up @@ -355,7 +352,7 @@ await RetryHelper.ExecuteAsync(async () =>
Assert.Equal(SocketError.ConnectionReset, peerSocketError);
}
}
}, maxAttempts: 10);
}, maxAttempts: 10, retryWhen: e => e is XunitException);
}

[OuterLoop]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,10 @@
using System.Threading;
using System.Threading.Tasks;
using Microsoft.DotNet.RemoteExecutor;
using Microsoft.DotNet.XUnitExtensions;
using Xunit;
using Xunit.Abstractions;
using Xunit.Sdk;

namespace System.Net.Sockets.Tests
{
Expand Down Expand Up @@ -937,17 +939,26 @@ error is SocketException ||
}
}

[Fact]
public static readonly TheoryData<IPAddress> UdpReceiveGetsCanceledByDispose_Data = new TheoryData<IPAddress>
{
{ IPAddress.Loopback },
{ IPAddress.IPv6Loopback },
{ IPAddress.Loopback.MapToIPv6() }
};

[Theory]
[MemberData(nameof(UdpReceiveGetsCanceledByDispose_Data))]
[PlatformSpecific(~TestPlatforms.OSX)] // Not supported on OSX.
public async Task UdpReceiveGetsCanceledByDispose()
public async Task UdpReceiveGetsCanceledByDispose(IPAddress address)
{
// We try this a couple of times to deal with a timing race: if the Dispose happens
// before the operation is started, we won't see a SocketException.
int msDelay = 100;
await RetryHelper.ExecuteAsync(async () =>
{
var socket = new Socket(AddressFamily.InterNetwork, SocketType.Dgram, ProtocolType.Udp);
socket.BindToAnonymousPort(IPAddress.Loopback);
var socket = new Socket(address.AddressFamily, SocketType.Dgram, ProtocolType.Udp);
if (address.IsIPv4MappedToIPv6) socket.DualMode = true;
socket.BindToAnonymousPort(address);

Task receiveTask = ReceiveAsync(socket, new ArraySegment<byte>(new byte[1]));

Expand All @@ -956,11 +967,7 @@ await RetryHelper.ExecuteAsync(async () =>
msDelay *= 2;
Task disposeTask = Task.Run(() => socket.Dispose());

var cts = new CancellationTokenSource();
Task timeoutTask = Task.Delay(30000, cts.Token);
Assert.NotSame(timeoutTask, await Task.WhenAny(disposeTask, receiveTask, timeoutTask));
cts.Cancel();

await Task.WhenAny(disposeTask, receiveTask).TimeoutAfter(30000);
await disposeTask;

SocketError? localSocketError = null;
Expand Down Expand Up @@ -991,21 +998,35 @@ await RetryHelper.ExecuteAsync(async () =>
{
Assert.Equal(SocketError.OperationAborted, localSocketError);
}
}, maxAttempts: 10);
}, maxAttempts: 10, retryWhen: e => e is XunitException);
}

[Theory]
[InlineData(true)]
[InlineData(false)]
public async Task TcpReceiveSendGetsCanceledByDispose(bool receiveOrSend)
public static readonly TheoryData<bool, bool, bool> TcpReceiveSendGetsCanceledByDispose_Data = new TheoryData<bool, bool, bool>
{
{ true, false, false },
{ true, false, true },
{ true, true, false },
{ false, false, false },
{ false, false, true },
{ false, true, false },
};

[Theory(Timeout = 40000)]
[MemberData(nameof(TcpReceiveSendGetsCanceledByDispose_Data))]
public async Task TcpReceiveSendGetsCanceledByDispose(bool receiveOrSend, bool ipv6Server, bool dualModeClient)
{
// RHEL7 kernel has a bug preventing close(AF_UNKNOWN) to succeed with IPv6 sockets.
// In this case Dispose will trigger a graceful shutdown, which means that receive will succeed on socket2.
// TODO: Remove this, once CI machines are updated to a newer kernel.
bool expectGracefulShutdown = UsesSync && PlatformDetection.IsRedHatFamily7 && receiveOrSend && (ipv6Server || dualModeClient);

// We try this a couple of times to deal with a timing race: if the Dispose happens
// before the operation is started, the peer won't see a ConnectionReset SocketException and we won't
// see a SocketException either.
int msDelay = 100;
await RetryHelper.ExecuteAsync(async () =>
{
(Socket socket1, Socket socket2) = SocketTestExtensions.CreateConnectedSocketPair();
(Socket socket1, Socket socket2) = SocketTestExtensions.CreateConnectedSocketPair(ipv6Server, dualModeClient);
using (socket2)
{
Task socketOperation;
Expand All @@ -1030,11 +1051,7 @@ await RetryHelper.ExecuteAsync(async () =>
msDelay *= 2;
Task disposeTask = Task.Run(() => socket1.Dispose());

var cts = new CancellationTokenSource();
Task timeoutTask = Task.Delay(30000, cts.Token);
Assert.NotSame(timeoutTask, await Task.WhenAny(disposeTask, socketOperation, timeoutTask));
cts.Cancel();

await Task.WhenAny(disposeTask, socketOperation).TimeoutAfter(30000);
await disposeTask;

SocketError? localSocketError = null;
Expand Down Expand Up @@ -1088,10 +1105,18 @@ await RetryHelper.ExecuteAsync(async () =>
break;
}
}
Assert.Equal(SocketError.ConnectionReset, peerSocketError);

if (!expectGracefulShutdown)
{
Assert.Equal(SocketError.ConnectionReset, peerSocketError);
}
else
{
Assert.Null(peerSocketError);
}
Comment on lines +1109 to +1116
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@tmds the test failure I referred to turned out to be quite simple to fix.

}
}
}, maxAttempts: 10);
}, maxAttempts: 10, retryWhen: e => e is XunitException);
}

[Fact]
Expand Down
Loading