From 46b833b19803ae7696825f4b5502ec1a48c42c2a Mon Sep 17 00:00:00 2001 From: Ebere Abanonu Date: Wed, 30 Mar 2022 14:32:46 +0100 Subject: [PATCH] Port `Akka.Tests.IO` tests to `async/await` - `UdpConnectedIntegrationSpec` --- .../IO/UdpConnectedIntegrationSpec.cs | 55 ++++++++++--------- 1 file changed, 28 insertions(+), 27 deletions(-) diff --git a/src/core/Akka.Tests/IO/UdpConnectedIntegrationSpec.cs b/src/core/Akka.Tests/IO/UdpConnectedIntegrationSpec.cs index 103d793e7cd..65a7b22f815 100644 --- a/src/core/Akka.Tests/IO/UdpConnectedIntegrationSpec.cs +++ b/src/core/Akka.Tests/IO/UdpConnectedIntegrationSpec.cs @@ -9,6 +9,7 @@ using System.Linq; using System.Net; using System.Threading; +using System.Threading.Tasks; using Akka.Actor; using Akka.IO; using Akka.IO.Buffers; @@ -45,16 +46,16 @@ public UdpConnectedIntegrationSpec(ITestOutputHelper output) { } - private (IActorRef, IPEndPoint) BindUdp(IActorRef handler) + private async Task<(IActorRef, IPEndPoint)> BindUdpAsync(IActorRef handler) { var commander = CreateTestProbe(); commander.Send(Udp.Instance.Apply(Sys).Manager, new Udp.Bind(handler, new IPEndPoint(IPAddress.Loopback, 0))); IPEndPoint localAddress = null; - commander.ExpectMsg(x => localAddress = (IPEndPoint)x.LocalAddress); + await commander.ExpectMsgAsync(x => localAddress = (IPEndPoint)x.LocalAddress); return (commander.Sender, localAddress); } - private (IActorRef, IPEndPoint) ConnectUdp(IPEndPoint localAddress, IPEndPoint remoteAddress, IActorRef handler) + private async Task<(IActorRef, IPEndPoint)> ConnectUdpAsync(IPEndPoint localAddress, IPEndPoint remoteAddress, IActorRef handler) { var commander = CreateTestProbe(); IPEndPoint realLocalAddress = null; @@ -64,11 +65,11 @@ public UdpConnectedIntegrationSpec(ITestOutputHelper output) { new TestSocketOption(socket => realLocalAddress = (IPEndPoint)socket.LocalEndPoint) })); - commander.ExpectMsg(); + await commander.ExpectMsgAsync(); return (commander.Sender, realLocalAddress); } - private (IActorRef, IPEndPoint) ConnectUdp(IPEndPoint remoteAddress, IActorRef handler) + private async Task<(IActorRef, IPEndPoint)> ConnectUdpAsync(IPEndPoint remoteAddress, IActorRef handler) { var commander = CreateTestProbe(); IPEndPoint clientEndpoint = null; @@ -79,18 +80,18 @@ public UdpConnectedIntegrationSpec(ITestOutputHelper output) new TestSocketOption(socket => clientEndpoint = (IPEndPoint)socket.LocalEndPoint) })); - commander.ExpectMsg(); + await commander.ExpectMsgAsync(); return (commander.Sender, clientEndpoint); } [Fact] - public void The_UDP_connection_oriented_implementation_must_be_able_to_send_and_receive_without_binding() + public async Task The_UDP_connection_oriented_implementation_must_be_able_to_send_and_receive_without_binding() { - var (server, serverLocalEndpoint) = BindUdp(TestActor); + var (server, serverLocalEndpoint) = await BindUdpAsync(TestActor); var data1 = ByteString.FromString("To infinity and beyond!"); var data2 = ByteString.FromString("All your datagram belong to us"); - var (client, clientLocalEndpoint) = ConnectUdp(null, serverLocalEndpoint, TestActor); + var (client, clientLocalEndpoint) =await ConnectUdpAsync(null, serverLocalEndpoint, TestActor); client.Tell(UdpConnected.Send.Create(data1)); var clientAddress = ExpectMsgPf(TimeSpan.FromSeconds(3), "", msg => @@ -106,18 +107,18 @@ public void The_UDP_connection_oriented_implementation_must_be_able_to_send_and_ server.Tell(Udp.Send.Create(data2, clientAddress)); - ExpectMsg(x => x.Data.ShouldBe(data2)); + await ExpectMsgAsync(x => x.Data.ShouldBe(data2)); } [Fact] - public void The_UDP_connection_oriented_implementation_must_be_able_to_send_and_receive_with_binding() + public async Task The_UDP_connection_oriented_implementation_must_be_able_to_send_and_receive_with_binding() { var serverProbe = CreateTestProbe(); - var (server, serverLocalEndpoint) = BindUdp(serverProbe); + var (server, serverLocalEndpoint) = await BindUdpAsync(serverProbe); var data1 = ByteString.FromString("To infinity") + ByteString.FromString(" and beyond!"); var data2 = ByteString.FromString("All your datagram belong to us"); var clientProbe = CreateTestProbe(); - var (client, clientLocalEndpoint) = ConnectUdp(serverLocalEndpoint, clientProbe); + var (client, clientLocalEndpoint) = await ConnectUdpAsync(serverLocalEndpoint, clientProbe); client.Tell(UdpConnected.Send.Create(data1)); ExpectMsgPf(TimeSpan.FromSeconds(3), "", serverProbe, msg => @@ -132,16 +133,16 @@ public void The_UDP_connection_oriented_implementation_must_be_able_to_send_and_ server.Tell(Udp.Send.Create(data2, clientLocalEndpoint)); - clientProbe.ExpectMsg(x => x.Data.ShouldBe(data2)); + await clientProbe.ExpectMsgAsync(x => x.Data.ShouldBe(data2)); } [Fact] - public void The_UDP_connection_oriented_implementation_must_to_send_batch_writes_and_reads() + public async Task The_UDP_connection_oriented_implementation_must_to_send_batch_writes_and_reads() { var serverProbe = CreateTestProbe(); - var (server, serverEndPoint) = BindUdp(serverProbe); + var (server, serverEndPoint) = await BindUdpAsync(serverProbe); var clientProbe = CreateTestProbe(); - var (client, clientEndPoint) = ConnectUdp(serverEndPoint, clientProbe); + var (client, clientEndPoint) = await ConnectUdpAsync(serverEndPoint, clientProbe); var data = ByteString.FromString("Fly little packet!"); @@ -150,23 +151,23 @@ public void The_UDP_connection_oriented_implementation_must_to_send_batch_writes client.Tell(UdpConnected.Send.Create(data)); client.Tell(UdpConnected.Send.Create(data)); - var raw = serverProbe.ReceiveN(3); + var raw = await serverProbe.ReceiveNAsync(3, default).ToListAsync(); var serverMsgs = raw.Cast(); serverMsgs.Sum(x => x.Data.Count).Should().Be(data.Count * 3); - serverProbe.ExpectNoMsg(100.Milliseconds()); + await serverProbe.ExpectNoMsgAsync(100.Milliseconds()); // repeat in the other direction server.Tell(Udp.Send.Create(data, clientEndPoint)); server.Tell(Udp.Send.Create(data, clientEndPoint)); server.Tell(Udp.Send.Create(data, clientEndPoint)); - raw = clientProbe.ReceiveN(3); + raw = await clientProbe.ReceiveNAsync(3, default).ToListAsync(); var clientMsgs = raw.Cast(); clientMsgs.Sum(x => x.Data.Count).Should().Be(data.Count * 3); } [Fact] - public void The_UDP_connection_oriented_implementation_must_not_leak_memory() + public async Task The_UDP_connection_oriented_implementation_must_not_leak_memory() { const int batchCount = 2000; const int batchSize = 100; @@ -178,10 +179,10 @@ public void The_UDP_connection_oriented_implementation_must_not_leak_memory() poolInfo.Used.Should().Be(0); var serverProbe = CreateTestProbe(); - var (server, serverEndPoint) = BindUdp(serverProbe); + var (server, serverEndPoint) = await BindUdpAsync(serverProbe); var clientProbe = CreateTestProbe(); - var (client, clientEndPoint) = ConnectUdp(serverEndPoint, clientProbe); + var (client, clientEndPoint) = await ConnectUdpAsync(serverEndPoint, clientProbe); var data = ByteString.FromString("Fly little packet!"); @@ -191,19 +192,19 @@ public void The_UDP_connection_oriented_implementation_must_not_leak_memory() for (var j = 0; j < batchSize; ++j) client.Tell(UdpConnected.Send.Create(data)); - var msgs = serverProbe.ReceiveN(batchSize, TimeSpan.FromSeconds(10)); + var msgs = await serverProbe.ReceiveNAsync(batchSize, TimeSpan.FromSeconds(10)).ToListAsync(); var cast = msgs.Cast(); cast.Sum(m => m.Data.Count).Should().Be(data.Count * batchSize); } // stop all connections so all receives are stopped and all pending SocketAsyncEventArgs are collected server.Tell(Udp.Unbind.Instance, serverProbe); - serverProbe.ExpectMsg(); + await serverProbe.ExpectMsgAsync(); client.Tell(UdpConnected.Disconnect.Instance, clientProbe); - clientProbe.ExpectMsg(); + await clientProbe.ExpectMsgAsync(); // wait for all SocketAsyncEventArgs to be released - Thread.Sleep(1000); + await Task.Delay(1000); poolInfo = udpConnection.SocketEventArgsPool.BufferPoolInfo; poolInfo.Type.Should().Be(typeof(DirectBufferPool));