From d36a7b47ccff6f13161ee5f20d2de0035451b47c Mon Sep 17 00:00:00 2001 From: Luke Bakken Date: Fri, 26 Jul 2024 07:18:14 -0700 Subject: [PATCH 01/10] Use `EasyNetQ.Management.Client` * Allow specifying `MaxFrameSize` for a connection. * Move HTTP API interaction to `EasyNetQ.Management.Client` * No need to start toxiproxy yet * Use `uint.MinValue` to mean `unlimited` for max frame size * Implement the Environment class to manage the connections (#36) * Implement the Environment Closes https://github.com/rabbitmq/rabbitmq-amqp-dotnet-client/issues/35 --------- Signed-off-by: Gabriele Santomaggio Use `EasyNetQ.Management.Client` * Allow specifying `MaxFrameSize` for a connection. * Move HTTP API interaction to `EasyNetQ.Management.Client` * No need to start toxiproxy yet * Use `uint.MinValue` to mean `unlimited` for max frame size * * Add to public API Continue migrating to EasyNetQ.Management.Client --- .ci/ubuntu/gha-setup.sh | 10 +- .github/workflows/build-test.yaml | 11 +- RabbitMQ.AMQP.Client/IConnectionSettings.cs | 1 + RabbitMQ.AMQP.Client/Impl/AmqpConnection.cs | 11 +- .../Impl/ConnectionSettings.cs | 37 ++++- RabbitMQ.AMQP.Client/Impl/Consts.cs | 4 + RabbitMQ.AMQP.Client/PublicAPI.Unshipped.txt | 6 +- Tests/BindingsTests.cs | 9 +- Tests/ConnectionRecoveryTests.cs | 11 +- Tests/ConnectionTests.cs | 12 +- Tests/Utils.cs | 137 ++++++++++-------- Tests/xunit.runner.json | 7 +- 12 files changed, 160 insertions(+), 96 deletions(-) diff --git a/.ci/ubuntu/gha-setup.sh b/.ci/ubuntu/gha-setup.sh index 46f80f0..185ab39 100755 --- a/.ci/ubuntu/gha-setup.sh +++ b/.ci/ubuntu/gha-setup.sh @@ -46,11 +46,6 @@ else readonly docker_pull_args='' fi -set -o nounset - -declare -r rabbitmq_docker_name="$docker_name_prefix-rabbitmq" -declare -r toxiproxy_docker_name="$docker_name_prefix-toxiproxy" - if [[ $1 == 'stop' ]] then docker stop "$rabbitmq_docker_name" @@ -58,6 +53,11 @@ then exit 0 fi +set -o nounset + +declare -r rabbitmq_docker_name="$docker_name_prefix-rabbitmq" +declare -r toxiproxy_docker_name="$docker_name_prefix-toxiproxy" + function start_toxiproxy { if [[ $run_toxiproxy == 'true' ]] diff --git a/.github/workflows/build-test.yaml b/.github/workflows/build-test.yaml index ca3087e..1072873 100644 --- a/.github/workflows/build-test.yaml +++ b/.github/workflows/build-test.yaml @@ -26,14 +26,17 @@ jobs: run: dotnet format ${{ github.workspace }}/Build.csproj --no-restore --verify-no-changes - name: Start RabbitMQ id: start-rabbitmq - run: ${{ github.workspace }}/.ci/ubuntu/gha-setup.sh toxiproxy + # Note: not using toxiproxy yet + # run: ${{ github.workspace }}/.ci/ubuntu/gha-setup.sh toxiproxy + run: ${{ github.workspace }}/.ci/ubuntu/gha-setup.sh - name: Test run: dotnet test ${{ github.workspace }}/Build.csproj --no-restore --no-build --logger "console;verbosity=detailed" /p:AltCover=true /p:AltCoverStrongNameKey=${{github.workspace}}/rabbit.snk - name: Check for errors in RabbitMQ logs run: ${{ github.workspace}}/.ci/ubuntu/gha-log-check.sh - - name: Maybe collect toxiproxy logs - if: failure() - run: docker logs rabbitmq-amqp-dotnet-client-toxiproxy > ${{ github.workspace }}/.ci/ubuntu/log/toxiproxy.log + # Note: not using toxiproxy yet + # - name: Maybe collect toxiproxy logs + # if: failure() + # run: docker logs rabbitmq-amqp-dotnet-client-toxiproxy > ${{ github.workspace }}/.ci/ubuntu/log/toxiproxy.log - name: Maybe upload RabbitMQ logs if: failure() uses: actions/upload-artifact@v4 diff --git a/RabbitMQ.AMQP.Client/IConnectionSettings.cs b/RabbitMQ.AMQP.Client/IConnectionSettings.cs index ad86807..6366d04 100644 --- a/RabbitMQ.AMQP.Client/IConnectionSettings.cs +++ b/RabbitMQ.AMQP.Client/IConnectionSettings.cs @@ -15,6 +15,7 @@ public interface IConnectionSettings : IEquatable string ConnectionName { get; } string Path { get; } bool UseSsl { get; } + uint MaxFrameSize { get; } SaslMechanism SaslMechanism { get; } ITlsSettings? TlsSettings { get; } diff --git a/RabbitMQ.AMQP.Client/Impl/AmqpConnection.cs b/RabbitMQ.AMQP.Client/Impl/AmqpConnection.cs index 1f4a041..b9235eb 100644 --- a/RabbitMQ.AMQP.Client/Impl/AmqpConnection.cs +++ b/RabbitMQ.AMQP.Client/Impl/AmqpConnection.cs @@ -182,9 +182,18 @@ private async Task EnsureConnection() var open = new Open { HostName = $"vhost:{_connectionSettings.VirtualHost}", - Properties = new Fields() { [new Symbol("connection_name")] = _connectionSettings.ConnectionName, } + Properties = new Fields() + { + [new Symbol("connection_name")] = _connectionSettings.ConnectionName, + } }; + if (_connectionSettings.MaxFrameSize > uint.MinValue) + { + // Note: when set here, there is no need to set cf.AMQP.MaxFrameSize + open.MaxFrameSize = _connectionSettings.MaxFrameSize; + } + void onOpened(Amqp.IConnection connection, Open open1) { Trace.WriteLine(TraceLevel.Verbose, $"Connection opened. Info: {ToString()}"); diff --git a/RabbitMQ.AMQP.Client/Impl/ConnectionSettings.cs b/RabbitMQ.AMQP.Client/Impl/ConnectionSettings.cs index b021406..5b34ad3 100644 --- a/RabbitMQ.AMQP.Client/Impl/ConnectionSettings.cs +++ b/RabbitMQ.AMQP.Client/Impl/ConnectionSettings.cs @@ -15,6 +15,7 @@ public class ConnectionSettingBuilder private string _scheme = "AMQP"; private string _connectionName = "AMQP.NET"; private string _virtualHost = "/"; + private uint _maxFrameSize = Consts.DefaultMaxFrameSize; private SaslMechanism _saslMechanism = Client.SaslMechanism.Plain; private IRecoveryConfiguration _recoveryConfiguration = Impl.RecoveryConfiguration.Create(); @@ -69,6 +70,17 @@ public ConnectionSettingBuilder VirtualHost(string virtualHost) return this; } + public ConnectionSettingBuilder MaxFrameSize(uint maxFrameSize) + { + _maxFrameSize = maxFrameSize; + if (_maxFrameSize != uint.MinValue && _maxFrameSize < 512) + { + throw new ArgumentOutOfRangeException(nameof(maxFrameSize), + "maxFrameSize must be greater or equal to 512"); + } + return this; + } + public ConnectionSettingBuilder SaslMechanism(SaslMechanism saslMechanism) { _saslMechanism = saslMechanism; @@ -89,9 +101,9 @@ public ConnectionSettingBuilder RecoveryConfiguration(IRecoveryConfiguration rec public ConnectionSettings Build() { - var c = new ConnectionSettings(_host, _port, _user, + var c = new ConnectionSettings(_scheme, _host, _port, _user, _password, _virtualHost, - _scheme, _connectionName, _saslMechanism) + _connectionName, _saslMechanism, _maxFrameSize) { Recovery = (RecoveryConfiguration)_recoveryConfiguration }; @@ -106,8 +118,9 @@ public ConnectionSettings Build() public class ConnectionSettings : IConnectionSettings { private readonly Address _address; - private readonly string _connectionName = ""; private readonly string _virtualHost = "/"; + private readonly string _connectionName = ""; + private readonly uint _maxFrameSize = Consts.DefaultMaxFrameSize; private readonly ITlsSettings? _tlsSettings; private readonly SaslMechanism _saslMechanism = SaslMechanism.Plain; @@ -122,10 +135,12 @@ public ConnectionSettings(string address, ITlsSettings? tlsSettings = null) } } - public ConnectionSettings(string host, int port, + public ConnectionSettings(string scheme, string host, int port, string? user, string? password, - string virtualHost, string scheme, string connectionName, - SaslMechanism saslMechanism, ITlsSettings? tlsSettings = null) + string virtualHost, string connectionName, + SaslMechanism saslMechanism, + uint maxFrameSize = Consts.DefaultMaxFrameSize, + ITlsSettings? tlsSettings = null) { _address = new Address(host: host, port: port, user: user, password: password, @@ -133,6 +148,14 @@ public ConnectionSettings(string host, int port, _connectionName = connectionName; _virtualHost = virtualHost; _saslMechanism = saslMechanism; + + _maxFrameSize = maxFrameSize; + if (_maxFrameSize != uint.MinValue && _maxFrameSize < 512) + { + throw new ArgumentOutOfRangeException(nameof(maxFrameSize), + "maxFrameSize must be greater or equal to 512"); + } + _tlsSettings = tlsSettings; if (_address.UseSsl && _tlsSettings == null) @@ -150,8 +173,8 @@ public ConnectionSettings(string host, int port, public string ConnectionName => _connectionName; public string Path => _address.Path; public bool UseSsl => _address.UseSsl; + public uint MaxFrameSize => _maxFrameSize; public SaslMechanism SaslMechanism => _saslMechanism; - public ITlsSettings? TlsSettings => _tlsSettings; public IRecoveryConfiguration Recovery { get; init; } = RecoveryConfiguration.Create(); diff --git a/RabbitMQ.AMQP.Client/Impl/Consts.cs b/RabbitMQ.AMQP.Client/Impl/Consts.cs index 74f822e..5052189 100644 --- a/RabbitMQ.AMQP.Client/Impl/Consts.cs +++ b/RabbitMQ.AMQP.Client/Impl/Consts.cs @@ -7,4 +7,8 @@ public class Consts public const string Queues = "queues"; public const string Bindings = "bindings"; + /// + /// uint.MinValue means "no limit" + /// + public const uint DefaultMaxFrameSize = uint.MinValue; // NOTE: Azure/amqpnetlite uses 256 * 1024 } diff --git a/RabbitMQ.AMQP.Client/PublicAPI.Unshipped.txt b/RabbitMQ.AMQP.Client/PublicAPI.Unshipped.txt index fa92c3a..7a0add8 100644 --- a/RabbitMQ.AMQP.Client/PublicAPI.Unshipped.txt +++ b/RabbitMQ.AMQP.Client/PublicAPI.Unshipped.txt @@ -1,6 +1,7 @@ #nullable enable abstract RabbitMQ.AMQP.Client.Impl.AbstractLifeCycle.CloseAsync() -> System.Threading.Tasks.Task! const RabbitMQ.AMQP.Client.Impl.Consts.Bindings = "bindings" -> string! +const RabbitMQ.AMQP.Client.Impl.Consts.DefaultMaxFrameSize = 0 -> uint const RabbitMQ.AMQP.Client.Impl.Consts.Exchanges = "exchanges" -> string! const RabbitMQ.AMQP.Client.Impl.Consts.Key = "key" -> string! const RabbitMQ.AMQP.Client.Impl.Consts.Queues = "queues" -> string! @@ -80,6 +81,7 @@ RabbitMQ.AMQP.Client.IConnection.PublisherBuilder() -> RabbitMQ.AMQP.Client.IPub RabbitMQ.AMQP.Client.IConnectionSettings RabbitMQ.AMQP.Client.IConnectionSettings.ConnectionName.get -> string! RabbitMQ.AMQP.Client.IConnectionSettings.Host.get -> string! +RabbitMQ.AMQP.Client.IConnectionSettings.MaxFrameSize.get -> uint RabbitMQ.AMQP.Client.IConnectionSettings.Password.get -> string? RabbitMQ.AMQP.Client.IConnectionSettings.Path.get -> string! RabbitMQ.AMQP.Client.IConnectionSettings.Port.get -> int @@ -328,6 +330,7 @@ RabbitMQ.AMQP.Client.Impl.ConnectionSettingBuilder RabbitMQ.AMQP.Client.Impl.ConnectionSettingBuilder.Build() -> RabbitMQ.AMQP.Client.Impl.ConnectionSettings! RabbitMQ.AMQP.Client.Impl.ConnectionSettingBuilder.ConnectionName(string! connectionName) -> RabbitMQ.AMQP.Client.Impl.ConnectionSettingBuilder! RabbitMQ.AMQP.Client.Impl.ConnectionSettingBuilder.Host(string! host) -> RabbitMQ.AMQP.Client.Impl.ConnectionSettingBuilder! +RabbitMQ.AMQP.Client.Impl.ConnectionSettingBuilder.MaxFrameSize(uint maxFrameSize) -> RabbitMQ.AMQP.Client.Impl.ConnectionSettingBuilder! RabbitMQ.AMQP.Client.Impl.ConnectionSettingBuilder.Password(string! password) -> RabbitMQ.AMQP.Client.Impl.ConnectionSettingBuilder! RabbitMQ.AMQP.Client.Impl.ConnectionSettingBuilder.Port(int port) -> RabbitMQ.AMQP.Client.Impl.ConnectionSettingBuilder! RabbitMQ.AMQP.Client.Impl.ConnectionSettingBuilder.RecoveryConfiguration(RabbitMQ.AMQP.Client.IRecoveryConfiguration! recoveryConfiguration) -> RabbitMQ.AMQP.Client.Impl.ConnectionSettingBuilder! @@ -338,10 +341,11 @@ RabbitMQ.AMQP.Client.Impl.ConnectionSettingBuilder.VirtualHost(string! virtualHo RabbitMQ.AMQP.Client.Impl.ConnectionSettings RabbitMQ.AMQP.Client.Impl.ConnectionSettings.ConnectionName.get -> string! RabbitMQ.AMQP.Client.Impl.ConnectionSettings.ConnectionSettings(string! address, RabbitMQ.AMQP.Client.ITlsSettings? tlsSettings = null) -> void -RabbitMQ.AMQP.Client.Impl.ConnectionSettings.ConnectionSettings(string! host, int port, string? user, string? password, string! virtualHost, string! scheme, string! connectionName, RabbitMQ.AMQP.Client.SaslMechanism! saslMechanism, RabbitMQ.AMQP.Client.ITlsSettings? tlsSettings = null) -> void +RabbitMQ.AMQP.Client.Impl.ConnectionSettings.ConnectionSettings(string! scheme, string! host, int port, string? user, string? password, string! virtualHost, string! connectionName, RabbitMQ.AMQP.Client.SaslMechanism! saslMechanism, uint maxFrameSize = 0, RabbitMQ.AMQP.Client.ITlsSettings? tlsSettings = null) -> void RabbitMQ.AMQP.Client.Impl.ConnectionSettings.Equals(RabbitMQ.AMQP.Client.IConnectionSettings? other) -> bool RabbitMQ.AMQP.Client.Impl.ConnectionSettings.Equals(RabbitMQ.AMQP.Client.Impl.ConnectionSettings! other) -> bool RabbitMQ.AMQP.Client.Impl.ConnectionSettings.Host.get -> string! +RabbitMQ.AMQP.Client.Impl.ConnectionSettings.MaxFrameSize.get -> uint RabbitMQ.AMQP.Client.Impl.ConnectionSettings.Password.get -> string? RabbitMQ.AMQP.Client.Impl.ConnectionSettings.Path.get -> string! RabbitMQ.AMQP.Client.Impl.ConnectionSettings.Port.get -> int diff --git a/Tests/BindingsTests.cs b/Tests/BindingsTests.cs index b77241d..149b366 100644 --- a/Tests/BindingsTests.cs +++ b/Tests/BindingsTests.cs @@ -38,7 +38,8 @@ await management.Binding().SourceExchange(sourceExchange).DestinationQueue(queue await management.QueueDeletion().Delete(queueDestination); await connection.CloseAsync(); SystemUtils.WaitUntil(() => !SystemUtils.ExchangeExists(sourceExchange)); - SystemUtils.WaitUntil(() => !SystemUtils.QueueExists(queueDestination)); + + await SystemUtils.WaitUntilQueueDeletedAsync(queueDestination); } [Fact] @@ -75,8 +76,9 @@ await management.Binding().SourceExchange("exchange_bind_two_times").Destination await management.QueueDeletion().Delete("queue_bind_two_times"); await connection.CloseAsync(); + SystemUtils.WaitUntil(() => !SystemUtils.ExchangeExists("exchange_bind_two_times")); - SystemUtils.WaitUntil(() => !SystemUtils.QueueExists("queue_bind_two_times")); + await SystemUtils.WaitUntilQueueDeletedAsync("queue_bind_two_times"); } @@ -154,8 +156,9 @@ await management.Binding().SourceExchange("exchange_bindings_with_arguments") await management.ExchangeDeletion().Delete("exchange_bindings_with_arguments"); await management.QueueDeletion().Delete("queue_bindings_with_arguments"); await connection.CloseAsync(); + SystemUtils.WaitUntil(() => !SystemUtils.ExchangeExists("exchange_bindings_with_arguments")); - SystemUtils.WaitUntil(() => !SystemUtils.QueueExists("queue_bindings_with_arguments")); + await SystemUtils.WaitUntilQueueDeletedAsync("queue_bindings_with_arguments"); } // TODO: test with multi-bindings with parameters with list as value diff --git a/Tests/ConnectionRecoveryTests.cs b/Tests/ConnectionRecoveryTests.cs index 3e2915c..2e4969e 100644 --- a/Tests/ConnectionRecoveryTests.cs +++ b/Tests/ConnectionRecoveryTests.cs @@ -232,10 +232,13 @@ public async Task RecoveryTopologyShouldRecoverTheTempQueues() await SystemUtils.WaitUntilConnectionIsKilled(connectionName); await completion.Task.WaitAsync(TimeSpan.FromSeconds(10)); SystemUtils.WaitUntil(() => recoveryEvents == 2); - SystemUtils.WaitUntil(() => SystemUtils.QueueExists(queueName)); + + await SystemUtils.WaitUntilQueueExistsAsync(queueName); await connection.CloseAsync(); - SystemUtils.WaitUntil(() => !SystemUtils.QueueExists(queueName)); + + await SystemUtils.WaitUntilQueueDeletedAsync(queueName); + TestOutputHelper.WriteLine( $"Recover: Queue count: {management.TopologyListener().QueueCount()} , events: {recoveryEvents}"); Assert.Equal(0, management.TopologyListener().QueueCount()); @@ -277,7 +280,9 @@ public async Task RecoveryTopologyShouldNotRecoverTheTempQueues() await SystemUtils.WaitUntilConnectionIsKilled(connectionName); await completion.Task.WaitAsync(TimeSpan.FromSeconds(10)); - SystemUtils.WaitUntil(() => SystemUtils.QueueExists(queueName) == false); + + await SystemUtils.WaitUntilQueueDeletedAsync(queueName); + await connection.CloseAsync(); Assert.Equal(0, management.TopologyListener().QueueCount()); TestOutputHelper.WriteLine( diff --git a/Tests/ConnectionTests.cs b/Tests/ConnectionTests.cs index a338ed6..d27d204 100644 --- a/Tests/ConnectionTests.cs +++ b/Tests/ConnectionTests.cs @@ -13,8 +13,8 @@ public class ConnectionTests(ITestOutputHelper output) [Fact] public void ValidateAddress() { - ConnectionSettings connectionSettings = new("localhost", 5672, "guest-user", - "guest-password", "vhost_1", "amqp1", "connection_name", SaslMechanism.External); + ConnectionSettings connectionSettings = new("amqp1", "localhost", 5672, "guest-user", + "guest-password", "vhost_1", "connection_name", SaslMechanism.External); Assert.Equal("localhost", connectionSettings.Host); Assert.Equal(5672, connectionSettings.Port); Assert.Equal("guest-user", connectionSettings.User); @@ -23,13 +23,13 @@ public void ValidateAddress() Assert.Equal("amqp1", connectionSettings.Scheme); Assert.Equal(SaslMechanism.External, connectionSettings.SaslMechanism); - ConnectionSettings second = new("localhost", 5672, "guest-user", - "guest-password", "path/", "amqp1", "connection_name", SaslMechanism.External); + ConnectionSettings second = new("amqp1", "localhost", 5672, "guest-user", + "guest-password", "path/", "connection_name", SaslMechanism.External); Assert.Equal(connectionSettings, second); - ConnectionSettings third = new("localhost", 5672, "guest-user", - "guest-password", "path/", "amqp2", "connection_name", SaslMechanism.Plain); + ConnectionSettings third = new("amqp2", "localhost", 5672, "guest-user", + "guest-password", "path/", "connection_name", SaslMechanism.Plain); Assert.NotEqual(connectionSettings, third); } diff --git a/Tests/Utils.cs b/Tests/Utils.cs index 2ca2044..fe23cbf 100644 --- a/Tests/Utils.cs +++ b/Tests/Utils.cs @@ -13,42 +13,11 @@ using System.Text.Json; using System.Threading; using System.Threading.Tasks; -using Xunit; -using Xunit.Abstractions; +using EasyNetQ.Management.Client; using Xunit.Sdk; namespace Tests { - public class Utils(ITestOutputHelper testOutputHelper) - { - public void WaitUntilTaskCompletes(TaskCompletionSource tasks) - { - WaitUntilTaskCompletes(tasks, true, TimeSpan.FromSeconds(10)); - } - - public void WaitUntilTaskCompletes(TaskCompletionSource tasks, - bool expectToComplete = true) - { - WaitUntilTaskCompletes(tasks, expectToComplete, TimeSpan.FromSeconds(10)); - } - - public void WaitUntilTaskCompletes(TaskCompletionSource tasks, - bool expectToComplete, - TimeSpan timeOut) - { - try - { - var resultTestWait = tasks.Task.Wait(timeOut); - Assert.Equal(resultTestWait, expectToComplete); - } - catch (Exception e) - { - testOutputHelper.WriteLine($"wait until task completes error #{e}"); - throw; - } - } - } - public static class SystemUtils { // Waits for 10 seconds total by default @@ -67,10 +36,14 @@ public static void WaitUntil(Func func, ushort retries = 40) public static async Task WaitUntilAsync(Func> func, ushort retries = 10) { - Wait(); + var delaySpan = TimeSpan.FromMilliseconds(500); + + await Task.Delay(delaySpan); + while (!await func()) { - Wait(); + await Task.Delay(delaySpan); + --retries; if (retries == 0) { @@ -89,7 +62,6 @@ public static void Wait(TimeSpan wait) Thread.Sleep(wait); } - private class Connection { public string? name { get; set; } @@ -139,34 +111,27 @@ public static async Task ConnectionsCountByName(string connectionName) public static async Task IsConnectionOpen(string connectionName) { - using HttpClientHandler handler = new HttpClientHandler { Credentials = new NetworkCredential("guest", "guest"), }; - using HttpClient client = new HttpClient(handler); + var managementUri = new Uri("http://localhost:15672"); + using var managementClient = new ManagementClient(managementUri, "guest", "guest"); - HttpResponseMessage result = await client.GetAsync("http://localhost:15672/api/connections"); - if (!result.IsSuccessStatusCode) - { - throw new XunitException($"HTTP GET failed: {result.StatusCode} {result.ReasonPhrase}"); - } + IReadOnlyList connections = await managementClient.GetConnectionsAsync(); - Stream resultContentStream = await result.Content.ReadAsStreamAsync(); - object? obj = await JsonSerializer.DeserializeAsync(resultContentStream, typeof(IEnumerable)); - return obj switch - { - null => false, - IEnumerable connections => - connections.Any(x => + return connections.Any(conn => { - if (x.client_properties is null) - { - return false; - } - else + if (conn.ClientProperties is not null) { - return x.client_properties["connection_name"].Contains(connectionName); + if (conn.ClientProperties.TryGetValue("connection_name", out object? connectionNameObj)) + { + if (connectionNameObj is not null) + { + string connName = (string)connectionNameObj; + return connName.Contains(connectionName); + } + } } - }), - _ => false - }; + + return false; + }); } public static async Task HttpKillConnections(string connectionName) @@ -256,12 +221,58 @@ private static HttpClient CreateHttpClient() return new HttpClient(handler); } - public static bool QueueExists(string queue) + public static Task WaitUntilQueueExistsAsync(string queueNameStr) { - Task task = CreateHttpClient().GetAsync($"http://localhost:15672/api/queues/%2F/{queue}"); - task.Wait(TimeSpan.FromSeconds(10)); - HttpResponseMessage result = task.Result; - return result.IsSuccessStatusCode; + return WaitUntilAsync(() => + { + return CheckQueueAsync(queueNameStr, checkExisting: true); + }); + } + + public static Task WaitUntilQueueDeletedAsync(string queueNameStr) + { + return WaitUntilAsync(() => + { + return CheckQueueAsync(queueNameStr, checkExisting: false); + }); + } + + public static async Task CheckQueueAsync(string queueNameStr, bool checkExisting = true) + { + var managementUri = new Uri("http://localhost:15672"); + using var managementClient = new ManagementClient(managementUri, "guest", "guest"); + + var queueName = new EasyNetQ.Management.Client.Model.QueueName(queueNameStr, "/"); + try + { + EasyNetQ.Management.Client.Model.Queue? queue = await managementClient.GetQueueAsync(queueName); + if (checkExisting) + { + return queue is not null; + } + else + { + return queue is null; + } + } + catch (UnexpectedHttpStatusCodeException ex) + { + if (ex.StatusCode == HttpStatusCode.NotFound) + { + if (checkExisting) + { + return false; + } + else + { + return true; + } + } + else + { + throw; + } + } } public static bool ExchangeExists(string exchange) diff --git a/Tests/xunit.runner.json b/Tests/xunit.runner.json index a72a9b7..b8924ef 100644 --- a/Tests/xunit.runner.json +++ b/Tests/xunit.runner.json @@ -1,5 +1,6 @@ { - "parallelizeAssembly": false, - "parallelizeTestCollections": false, + "$schema": "https://xunit.net/schema/current/xunit.runner.schema.json", + "parallelizeAssembly": true, + "parallelizeTestCollections": true, "stopOnFail": true -} \ No newline at end of file +} From 7cd80a2207a210205b39fa8d4d25d49649b184de Mon Sep 17 00:00:00 2001 From: Luke Bakken Date: Fri, 26 Jul 2024 12:10:56 -0700 Subject: [PATCH 02/10] * Use `EasyNetQ.Management.Client` to determine exchange existence. --- Tests/BindingsTests.cs | 22 ++++--- Tests/ManagementTests.cs | 28 ++++++--- Tests/Utils.cs | 125 ++++++++++++++++++++++++++------------- 3 files changed, 119 insertions(+), 56 deletions(-) diff --git a/Tests/BindingsTests.cs b/Tests/BindingsTests.cs index 149b366..810a2c2 100644 --- a/Tests/BindingsTests.cs +++ b/Tests/BindingsTests.cs @@ -22,7 +22,9 @@ public async Task SimpleBindingsBetweenExchangeAndQueue(string sourceExchange, s await management.Queue().Name(queueDestination).Declare(); await management.Binding().SourceExchange(sourceExchange).DestinationQueue(queueDestination) .Key("key").Bind(); - SystemUtils.WaitUntil(() => SystemUtils.ExchangeExists(sourceExchange)); + + await SystemUtils.WaitUntilExchangeExistsAsync(sourceExchange); + SystemUtils.WaitUntil(() => SystemUtils.BindsBetweenExchangeAndQueueExists(sourceExchange, queueDestination)); @@ -37,8 +39,8 @@ await management.Binding().SourceExchange(sourceExchange).DestinationQueue(queue await management.ExchangeDeletion().Delete(sourceExchange); await management.QueueDeletion().Delete(queueDestination); await connection.CloseAsync(); - SystemUtils.WaitUntil(() => !SystemUtils.ExchangeExists(sourceExchange)); + await SystemUtils.WaitUntilExchangeDeletedAsync(sourceExchange); await SystemUtils.WaitUntilQueueDeletedAsync(queueDestination); } @@ -77,7 +79,7 @@ await management.Binding().SourceExchange("exchange_bind_two_times").Destination await management.QueueDeletion().Delete("queue_bind_two_times"); await connection.CloseAsync(); - SystemUtils.WaitUntil(() => !SystemUtils.ExchangeExists("exchange_bind_two_times")); + await SystemUtils.WaitUntilExchangeDeletedAsync("exchange_bind_two_times"); await SystemUtils.WaitUntilQueueDeletedAsync("queue_bind_two_times"); } @@ -99,7 +101,8 @@ public async Task SimpleBindingsBetweenExchangeAndExchange(string sourceExchange await management.Binding().SourceExchange(sourceExchange) .DestinationExchange(destinationExchange) .Key(key).Bind(); - SystemUtils.WaitUntil(() => SystemUtils.ExchangeExists(sourceExchange)); + + await SystemUtils.WaitUntilExchangeExistsAsync(sourceExchange); SystemUtils.WaitUntil(() => SystemUtils.BindsBetweenExchangeAndExchangeExists(sourceExchange, @@ -116,8 +119,9 @@ await management.Binding().SourceExchange(sourceExchange) await management.ExchangeDeletion().Delete(sourceExchange); await management.ExchangeDeletion().Delete(destinationExchange); await connection.CloseAsync(); - SystemUtils.WaitUntil(() => !SystemUtils.ExchangeExists(sourceExchange)); - SystemUtils.WaitUntil(() => !SystemUtils.ExchangeExists(destinationExchange)); + + await SystemUtils.WaitUntilExchangeDeletedAsync(sourceExchange); + await SystemUtils.WaitUntilExchangeDeletedAsync(destinationExchange); } [Theory] @@ -138,7 +142,9 @@ await management.Binding().SourceExchange("exchange_bindings_with_arguments") .Key("key") .Arguments(arguments) .Bind(); - SystemUtils.WaitUntil(() => SystemUtils.ExchangeExists("exchange_bindings_with_arguments")); + + await SystemUtils.WaitUntilExchangeExistsAsync("exchange_bindings_with_arguments"); + SystemUtils.WaitUntil(() => SystemUtils.ArgsBindsBetweenExchangeAndQueueExists("exchange_bindings_with_arguments", "queue_bindings_with_arguments", arguments)); @@ -157,7 +163,7 @@ await management.Binding().SourceExchange("exchange_bindings_with_arguments") await management.QueueDeletion().Delete("queue_bindings_with_arguments"); await connection.CloseAsync(); - SystemUtils.WaitUntil(() => !SystemUtils.ExchangeExists("exchange_bindings_with_arguments")); + await SystemUtils.WaitUntilExchangeDeletedAsync("exchange_bindings_with_arguments"); await SystemUtils.WaitUntilQueueDeletedAsync("queue_bindings_with_arguments"); } diff --git a/Tests/ManagementTests.cs b/Tests/ManagementTests.cs index 3157491..fd013e6 100644 --- a/Tests/ManagementTests.cs +++ b/Tests/ManagementTests.cs @@ -343,12 +343,18 @@ await Assert.ThrowsAsync(() => [Fact] public async Task SimpleDeclareAndDeleteExchangeWithName() { + const string exchangeName = "my_first_exchange"; IConnection connection = await AmqpConnection.CreateAsync(ConnectionSettingBuilder.Create().Build()); IManagement management = connection.Management(); - await management.Exchange("my_first_exchange").Type(ExchangeType.TOPIC).Declare(); - SystemUtils.WaitUntil(() => SystemUtils.ExchangeExists("my_first_exchange")); - await management.ExchangeDeletion().Delete("my_first_exchange"); - SystemUtils.WaitUntil(() => SystemUtils.ExchangeExists("my_first_exchange") == false); + + await management.Exchange(exchangeName).Type(ExchangeType.TOPIC).Declare(); + + await SystemUtils.WaitUntilExchangeExistsAsync(exchangeName); + + await management.ExchangeDeletion().Delete(exchangeName); + + await SystemUtils.WaitUntilExchangeDeletedAsync(exchangeName); + await connection.CloseAsync(); } @@ -368,10 +374,13 @@ public async Task ExchangeWithDifferentArgs() IConnection connection = await AmqpConnection.CreateAsync(ConnectionSettingBuilder.Create().Build()); IManagement management = connection.Management(); await management.Exchange("my_exchange_with_args").AutoDelete(true).Argument("my_key", "my _value").Declare(); - SystemUtils.WaitUntil(() => SystemUtils.ExchangeExists("my_exchange_with_args")); + + await SystemUtils.WaitUntilExchangeExistsAsync("my_exchange_with_args"); + await management.ExchangeDeletion().Delete("my_exchange_with_args"); await connection.CloseAsync(); - SystemUtils.WaitUntil(() => !SystemUtils.ExchangeExists("my_exchange_with_args")); + + await SystemUtils.WaitUntilExchangeDeletedAsync("my_exchange_with_args"); } @@ -385,10 +394,13 @@ await management.Exchange("my_exchange_raise_precondition_fail").AutoDelete(true await Assert.ThrowsAsync(async () => await management.Exchange("my_exchange_raise_precondition_fail").AutoDelete(false) .Argument("my_key_2", "my _value_2").Declare()); - SystemUtils.WaitUntil(() => SystemUtils.ExchangeExists("my_exchange_raise_precondition_fail")); + + await SystemUtils.WaitUntilExchangeExistsAsync("my_exchange_raise_precondition_fail"); + await management.ExchangeDeletion().Delete("my_exchange_raise_precondition_fail"); await connection.CloseAsync(); - SystemUtils.WaitUntil(() => !SystemUtils.ExchangeExists("my_exchange_raise_precondition_fail")); + + await SystemUtils.WaitUntilExchangeDeletedAsync("my_exchange_raise_precondition_fail"); } diff --git a/Tests/Utils.cs b/Tests/Utils.cs index fe23cbf..4704cf7 100644 --- a/Tests/Utils.cs +++ b/Tests/Utils.cs @@ -237,51 +237,20 @@ public static Task WaitUntilQueueDeletedAsync(string queueNameStr) }); } - public static async Task CheckQueueAsync(string queueNameStr, bool checkExisting = true) + public static Task WaitUntilExchangeExistsAsync(string exchangeNameStr) { - var managementUri = new Uri("http://localhost:15672"); - using var managementClient = new ManagementClient(managementUri, "guest", "guest"); - - var queueName = new EasyNetQ.Management.Client.Model.QueueName(queueNameStr, "/"); - try - { - EasyNetQ.Management.Client.Model.Queue? queue = await managementClient.GetQueueAsync(queueName); - if (checkExisting) - { - return queue is not null; - } - else - { - return queue is null; - } - } - catch (UnexpectedHttpStatusCodeException ex) + return WaitUntilAsync(() => { - if (ex.StatusCode == HttpStatusCode.NotFound) - { - if (checkExisting) - { - return false; - } - else - { - return true; - } - } - else - { - throw; - } - } + return CheckExchangeAsync(exchangeNameStr, checkExisting: true); + }); } - public static bool ExchangeExists(string exchange) + public static Task WaitUntilExchangeDeletedAsync(string exchangeNameStr) { - Task task = CreateHttpClient() - .GetAsync($"http://localhost:15672/api/exchanges/%2F/{Uri.EscapeDataString(exchange)}"); - task.Wait(TimeSpan.FromSeconds(10)); - HttpResponseMessage result = task.Result; - return result.IsSuccessStatusCode; + return WaitUntilAsync(() => + { + return CheckExchangeAsync(exchangeNameStr, checkExisting: false); + }); } public static bool BindsBetweenExchangeAndQueueExists(string exchange, string queue) @@ -407,5 +376,81 @@ public static byte[] GetFileContent(string fileName) fileTask.Wait(TimeSpan.FromSeconds(1)); return fileTask.Result; } + + private static async Task CheckExchangeAsync(string exchangeNameStr, bool checkExisting = true) + { + var managementUri = new Uri("http://localhost:15672"); + using var managementClient = new ManagementClient(managementUri, "guest", "guest"); + + var exchangeName = new EasyNetQ.Management.Client.Model.ExchangeName(exchangeNameStr, "/"); + try + { + EasyNetQ.Management.Client.Model.Exchange? exchange = await managementClient.GetExchangeAsync(exchangeName); + if (checkExisting) + { + return exchange is not null; + } + else + { + return exchange is null; + } + } + catch (UnexpectedHttpStatusCodeException ex) + { + if (ex.StatusCode == HttpStatusCode.NotFound) + { + if (checkExisting) + { + return false; + } + else + { + return true; + } + } + else + { + throw; + } + } + } + + private static async Task CheckQueueAsync(string queueNameStr, bool checkExisting = true) + { + var managementUri = new Uri("http://localhost:15672"); + using var managementClient = new ManagementClient(managementUri, "guest", "guest"); + + var queueName = new EasyNetQ.Management.Client.Model.QueueName(queueNameStr, "/"); + try + { + EasyNetQ.Management.Client.Model.Queue? queue = await managementClient.GetQueueAsync(queueName); + if (checkExisting) + { + return queue is not null; + } + else + { + return queue is null; + } + } + catch (UnexpectedHttpStatusCodeException ex) + { + if (ex.StatusCode == HttpStatusCode.NotFound) + { + if (checkExisting) + { + return false; + } + else + { + return true; + } + } + else + { + throw; + } + } + } } } From 065283fcb1bd76d0d488e06ac7c405394f356e6d Mon Sep 17 00:00:00 2001 From: Luke Bakken Date: Fri, 26 Jul 2024 13:29:32 -0700 Subject: [PATCH 03/10] * Use `EasyNetQ.Management.Client` to check for exchange/queue bindings --- Tests/BindingsTests.cs | 28 +++++------- Tests/Utils.cs | 101 +++++++++++++++++++++++++++++++++-------- 2 files changed, 93 insertions(+), 36 deletions(-) diff --git a/Tests/BindingsTests.cs b/Tests/BindingsTests.cs index 810a2c2..377a8ce 100644 --- a/Tests/BindingsTests.cs +++ b/Tests/BindingsTests.cs @@ -25,16 +25,12 @@ await management.Binding().SourceExchange(sourceExchange).DestinationQueue(queue await SystemUtils.WaitUntilExchangeExistsAsync(sourceExchange); - SystemUtils.WaitUntil(() => - SystemUtils.BindsBetweenExchangeAndQueueExists(sourceExchange, - queueDestination)); + await SystemUtils.WaitUntilBindingsBetweenExchangeAndQueueExistAsync(sourceExchange, queueDestination); await management.Binding().SourceExchange(sourceExchange).DestinationQueue(queueDestination) .Key("key").Unbind(); - SystemUtils.WaitUntil(() => - !SystemUtils.BindsBetweenExchangeAndQueueExists(sourceExchange, - queueDestination)); + await SystemUtils.WaitUntilBindingsBetweenExchangeAndQueueDontExistAsync(sourceExchange, queueDestination); await management.ExchangeDeletion().Delete(sourceExchange); await management.QueueDeletion().Delete(queueDestination); @@ -49,30 +45,28 @@ public async Task BindBetweenExchangeAndQueueTwoTimes() { IConnection connection = await AmqpConnection.CreateAsync(ConnectionSettingBuilder.Create().Build()); IManagement management = connection.Management(); + await management.Exchange("exchange_bind_two_times").Declare(); await management.Queue().Name("queue_bind_two_times").Declare(); + await management.Binding().SourceExchange("exchange_bind_two_times").DestinationQueue("queue_bind_two_times") .Key("first_key").Bind(); await management.Binding().SourceExchange("exchange_bind_two_times").DestinationQueue("queue_bind_two_times") .Key("second_key").Bind(); - SystemUtils.WaitUntil(() => - SystemUtils.BindsBetweenExchangeAndQueueExists("exchange_bind_two_times", - "queue_bind_two_times")); + + await SystemUtils.WaitUntilBindingsBetweenExchangeAndQueueExistAsync("exchange_bind_two_times", "queue_bind_two_times"); await management.Binding().SourceExchange("exchange_bind_two_times").DestinationQueue("queue_bind_two_times") .Key("first_key") .Unbind(); - SystemUtils.WaitUntil(() => - SystemUtils.BindsBetweenExchangeAndQueueExists("exchange_bind_two_times", - "queue_bind_two_times")); + await SystemUtils.WaitUntilBindingsBetweenExchangeAndQueueExistAsync("exchange_bind_two_times", "queue_bind_two_times"); await management.Binding().SourceExchange("exchange_bind_two_times").DestinationQueue("queue_bind_two_times") .Key("second_key") .Unbind(); - SystemUtils.WaitUntil(() => !SystemUtils.BindsBetweenExchangeAndQueueExists("exchange_bind_two_times", - "queue_bind_two_times")); + await SystemUtils.WaitUntilBindingsBetweenExchangeAndQueueDontExistAsync("exchange_bind_two_times", "queue_bind_two_times"); await management.ExchangeDeletion().Delete("exchange_bind_two_times"); @@ -148,16 +142,16 @@ await management.Binding().SourceExchange("exchange_bindings_with_arguments") SystemUtils.WaitUntil(() => SystemUtils.ArgsBindsBetweenExchangeAndQueueExists("exchange_bindings_with_arguments", "queue_bindings_with_arguments", arguments)); + await management.Binding().SourceExchange("exchange_bindings_with_arguments") .DestinationQueue("queue_bindings_with_arguments") .Key("key").Arguments(arguments).Unbind(); + SystemUtils.WaitUntil(() => !SystemUtils.ArgsBindsBetweenExchangeAndQueueExists("exchange_bindings_with_arguments", "queue_bindings_with_arguments", arguments)); - SystemUtils.WaitUntil(() => - !SystemUtils.BindsBetweenExchangeAndQueueExists("exchange_bindings_with_arguments", - "queue_bindings_with_arguments")); + await SystemUtils.WaitUntilBindingsBetweenExchangeAndQueueDontExistAsync("exchange_bindings_with_arguments", "queue_bindings_with_arguments"); await management.ExchangeDeletion().Delete("exchange_bindings_with_arguments"); await management.QueueDeletion().Delete("queue_bindings_with_arguments"); diff --git a/Tests/Utils.cs b/Tests/Utils.cs index 4704cf7..bd8c9c5 100644 --- a/Tests/Utils.cs +++ b/Tests/Utils.cs @@ -14,6 +14,7 @@ using System.Threading; using System.Threading.Tasks; using EasyNetQ.Management.Client; +using EasyNetQ.Management.Client.Model; using Xunit.Sdk; namespace Tests @@ -253,14 +254,20 @@ public static Task WaitUntilExchangeDeletedAsync(string exchangeNameStr) }); } - public static bool BindsBetweenExchangeAndQueueExists(string exchange, string queue) + public static Task WaitUntilBindingsBetweenExchangeAndQueueExistAsync(string exchangeNameStr, string queueNameStr) { - Task resp = CreateHttpClient() - .GetAsync( - $"http://localhost:15672/api/bindings/%2F/e/{Uri.EscapeDataString(exchange)}/q/{Uri.EscapeDataString(queue)}"); - resp.Wait(TimeSpan.FromSeconds(10)); - string body = resp.Result.Content.ReadAsStringAsync().Result; - return body != "[]" && resp.Result.IsSuccessStatusCode; + return WaitUntilAsync(() => + { + return CheckBindingsBetweenExchangeAndQueueAsync(exchangeNameStr, queueNameStr, checkExisting: true); + }); + } + + public static Task WaitUntilBindingsBetweenExchangeAndQueueDontExistAsync(string exchangeNameStr, string queueNameStr) + { + return WaitUntilAsync(() => + { + return CheckBindingsBetweenExchangeAndQueueAsync(exchangeNameStr, queueNameStr, checkExisting: false); + }); } public static bool ArgsBindsBetweenExchangeAndQueueExists(string exchange, string queue, @@ -379,20 +386,23 @@ public static byte[] GetFileContent(string fileName) private static async Task CheckExchangeAsync(string exchangeNameStr, bool checkExisting = true) { + // Assume success + bool rv = true; + var managementUri = new Uri("http://localhost:15672"); using var managementClient = new ManagementClient(managementUri, "guest", "guest"); - var exchangeName = new EasyNetQ.Management.Client.Model.ExchangeName(exchangeNameStr, "/"); + var exchangeName = new ExchangeName(exchangeNameStr, "/"); try { - EasyNetQ.Management.Client.Model.Exchange? exchange = await managementClient.GetExchangeAsync(exchangeName); + Exchange? exchange = await managementClient.GetExchangeAsync(exchangeName); if (checkExisting) { - return exchange is not null; + rv = exchange is not null; } else { - return exchange is null; + rv = exchange is null; } } catch (UnexpectedHttpStatusCodeException ex) @@ -401,11 +411,11 @@ private static async Task CheckExchangeAsync(string exchangeNameStr, bool { if (checkExisting) { - return false; + rv = false; } else { - return true; + rv = true; } } else @@ -413,24 +423,29 @@ private static async Task CheckExchangeAsync(string exchangeNameStr, bool throw; } } + + return rv; } private static async Task CheckQueueAsync(string queueNameStr, bool checkExisting = true) { + // Assume success + bool rv = true; + var managementUri = new Uri("http://localhost:15672"); using var managementClient = new ManagementClient(managementUri, "guest", "guest"); - var queueName = new EasyNetQ.Management.Client.Model.QueueName(queueNameStr, "/"); + var queueName = new QueueName(queueNameStr, "/"); try { - EasyNetQ.Management.Client.Model.Queue? queue = await managementClient.GetQueueAsync(queueName); + Queue? queue = await managementClient.GetQueueAsync(queueName); if (checkExisting) { - return queue is not null; + rv = queue is not null; } else { - return queue is null; + rv = queue is null; } } catch (UnexpectedHttpStatusCodeException ex) @@ -439,11 +454,11 @@ private static async Task CheckQueueAsync(string queueNameStr, bool checkE { if (checkExisting) { - return false; + rv = false; } else { - return true; + rv = true; } } else @@ -451,6 +466,54 @@ private static async Task CheckQueueAsync(string queueNameStr, bool checkE throw; } } + + return rv; + } + + private static async Task CheckBindingsBetweenExchangeAndQueueAsync(string exchangeNameStr, string queueNameStr, bool checkExisting = true) + { + // Assume success + bool rv = true; + + var managementUri = new Uri("http://localhost:15672"); + using var managementClient = new ManagementClient(managementUri, "guest", "guest"); + + var exchangeName = new ExchangeName(exchangeNameStr, "/"); + var queueName = new QueueName(queueNameStr, "/"); + try + { + IReadOnlyList bindings = await managementClient.GetQueueBindingsAsync(exchangeName, queueName); + if (checkExisting) + { + if (bindings.Count == 0) + { + rv = false; + } + } + else + { + if (bindings.Count > 0) + { + rv = false; + } + } + } + catch (UnexpectedHttpStatusCodeException ex) + { + if (ex.StatusCode == HttpStatusCode.NotFound) + { + if (checkExisting) + { + rv = false; + } + } + else + { + throw; + } + } + + return rv; } } } From e09bc333538c1a6c2d76157599e05381f9dbdb1c Mon Sep 17 00:00:00 2001 From: Luke Bakken Date: Fri, 26 Jul 2024 13:41:00 -0700 Subject: [PATCH 04/10] * Use `EasyNetQ.Management.Client` to kill connections. --- Tests/ConsumerTests.cs | 4 ++- Tests/Utils.cs | 71 ++++++++++++++++-------------------------- 2 files changed, 30 insertions(+), 45 deletions(-) diff --git a/Tests/ConsumerTests.cs b/Tests/ConsumerTests.cs index b8d67ec..47bf2ac 100644 --- a/Tests/ConsumerTests.cs +++ b/Tests/ConsumerTests.cs @@ -77,9 +77,11 @@ await publisher.Publish(new AmqpMessage("Hello world!"), } } ).Build(); - await Task.WhenAny(tcs.Task, Task.Delay(5000)); + + await Task.WhenAny(tcs.Task, Task.Delay(10000)); Assert.True(tcs.Task.IsCompleted); await consumer.CloseAsync(); + SystemUtils.WaitUntil(() => SystemUtils.HttpGetQMsgCount("ConsumerReQueueMessage") == 0, 500); await management.QueueDeletion().Delete("ConsumerReQueueMessage"); await connection.CloseAsync(); diff --git a/Tests/Utils.cs b/Tests/Utils.cs index bd8c9c5..872325d 100644 --- a/Tests/Utils.cs +++ b/Tests/Utils.cs @@ -135,61 +135,44 @@ public static async Task IsConnectionOpen(string connectionName) }); } - public static async Task HttpKillConnections(string connectionName) + public static async Task KillConnectionAsync(string connectionName) { - using HttpClientHandler handler = new HttpClientHandler(); - handler.Credentials = new NetworkCredential("guest", "guest"); - using HttpClient client = new HttpClient(handler); - - HttpResponseMessage result = await client.GetAsync("http://localhost:15672/api/connections"); - if (!result.IsSuccessStatusCode && result.StatusCode != HttpStatusCode.NotFound) - { - throw new XunitException($"HTTP GET failed: {result.StatusCode} {result.ReasonPhrase}"); - } + var managementUri = new Uri("http://localhost:15672"); + using var managementClient = new ManagementClient(managementUri, "guest", "guest"); - string json = await result.Content.ReadAsStringAsync(); - IEnumerable? connections = JsonSerializer.Deserialize>(json); - if (connections == null) - { - return 0; - } + IReadOnlyList connections = await managementClient.GetConnectionsAsync(); // we kill _only_ producer and consumer connections // leave the locator up and running to delete the stream - IEnumerable iEnumerable = connections.Where(x => + IEnumerable filteredConnections = connections.Where(conn => { - if (x.client_properties is null) - { - return false; - } - else + if (conn.ClientProperties is not null) { - return x.client_properties["connection_name"].Contains(connectionName); + if (conn.ClientProperties.TryGetValue("connection_name", out object? connectionNameObj)) + { + if (connectionNameObj is not null) + { + string connName = (string)connectionNameObj; + return connName.Contains(connectionName); + } + } } + + return false; }); - Connection[] enumerable = iEnumerable as Connection[] ?? iEnumerable.ToArray(); + int killed = 0; - foreach (Connection conn in enumerable) + foreach (EasyNetQ.Management.Client.Model.Connection conn in filteredConnections) { - /* - * NOTE: - * this is the equivalent to this JS code: - * https://github.com/rabbitmq/rabbitmq-server/blob/master/deps/rabbitmq_management/priv/www/js/formatters.js#L710-L712 - * - * function esc(str) { - * return encodeURIComponent(str); - * } - * - * https://stackoverflow.com/a/4550600 - */ - if (conn.name is not null) + try + { + await managementClient.CloseConnectionAsync(conn); + } + catch (UnexpectedHttpStatusCodeException ex) { - string s = Uri.EscapeDataString(conn.name); - HttpResponseMessage deleteResult = await client.DeleteAsync($"http://localhost:15672/api/connections/{s}"); - if (!deleteResult.IsSuccessStatusCode && result.StatusCode != HttpStatusCode.NotFound) + if (ex.StatusCode != HttpStatusCode.NotFound) { - throw new XunitException( - $"HTTP DELETE failed: {deleteResult.StatusCode} {deleteResult.ReasonPhrase}"); + throw; } } @@ -203,14 +186,14 @@ public static async Task WaitUntilConnectionIsKilled(string connectionName) { await WaitUntilAsync(async () => await IsConnectionOpen(connectionName)); Wait(); - await WaitUntilAsync(async () => await HttpKillConnections(connectionName) == 1); + await WaitUntilAsync(async () => await KillConnectionAsync(connectionName) == 1); } public static async Task WaitUntilConnectionIsKilledAndOpen(string connectionName) { await WaitUntilAsync(async () => await IsConnectionOpen(connectionName)); Wait(); - await WaitUntilAsync(async () => await HttpKillConnections(connectionName) == 1); + await WaitUntilAsync(async () => await KillConnectionAsync(connectionName) == 1); Wait(); await WaitUntilAsync(async () => await IsConnectionOpen(connectionName)); Wait(); From 60a524c42e1a2b6648a02c15399eb664110911a8 Mon Sep 17 00:00:00 2001 From: Luke Bakken Date: Mon, 29 Jul 2024 12:20:15 -0700 Subject: [PATCH 05/10] * Migrate binding checks with arguments to `EasyNetQ.Management.Client` --- Tests/BindingsTests.cs | 31 ++++---- Tests/Utils.cs | 172 ++++++++++++++++++++++++----------------- 2 files changed, 115 insertions(+), 88 deletions(-) diff --git a/Tests/BindingsTests.cs b/Tests/BindingsTests.cs index 377a8ce..1c56309 100644 --- a/Tests/BindingsTests.cs +++ b/Tests/BindingsTests.cs @@ -139,17 +139,17 @@ await management.Binding().SourceExchange("exchange_bindings_with_arguments") await SystemUtils.WaitUntilExchangeExistsAsync("exchange_bindings_with_arguments"); - SystemUtils.WaitUntil(() => - SystemUtils.ArgsBindsBetweenExchangeAndQueueExists("exchange_bindings_with_arguments", - "queue_bindings_with_arguments", arguments)); + await SystemUtils.WaitUntilBindingsBetweenExchangeAndQueueExistWithArgsAsync( + "exchange_bindings_with_arguments", + "queue_bindings_with_arguments", arguments); await management.Binding().SourceExchange("exchange_bindings_with_arguments") .DestinationQueue("queue_bindings_with_arguments") .Key("key").Arguments(arguments).Unbind(); - SystemUtils.WaitUntil(() => - !SystemUtils.ArgsBindsBetweenExchangeAndQueueExists("exchange_bindings_with_arguments", - "queue_bindings_with_arguments", arguments)); + await SystemUtils.WaitUntilBindingsBetweenExchangeAndQueueDontExistWithArgsAsync( + "exchange_bindings_with_arguments", + "queue_bindings_with_arguments", arguments); await SystemUtils.WaitUntilBindingsBetweenExchangeAndQueueDontExistAsync("exchange_bindings_with_arguments", "queue_bindings_with_arguments"); @@ -193,32 +193,27 @@ await management.Binding().SourceExchange(source) .Key(key) // single key to use different args .Arguments(specialBind) .Bind(); - SystemUtils.WaitUntil(() => - SystemUtils.ArgsBindsBetweenExchangeAndQueueExists(source, - destination, specialBind)); + + await SystemUtils.WaitUntilBindingsBetweenExchangeAndQueueExistWithArgsAsync(source, destination, specialBind); await management.Binding().SourceExchange(source).DestinationQueue(destination).Key(key).Arguments(specialBind) .Unbind(); - SystemUtils.WaitUntil(() => - !SystemUtils.ArgsBindsBetweenExchangeAndQueueExists(source, - destination, specialBind)); + await SystemUtils.WaitUntilBindingsBetweenExchangeAndQueueDontExistWithArgsAsync(source, destination, specialBind); for (int i = 0; i < 10; i++) { var b = new Dictionary() { { $"是英国v_{i}", $"p_{i}" } }; - SystemUtils.WaitUntil(() => - SystemUtils.ArgsBindsBetweenExchangeAndQueueExists(source, - destination, b)); + + await SystemUtils.WaitUntilBindingsBetweenExchangeAndQueueExistWithArgsAsync(source, destination, b); + await management.Binding().SourceExchange(source) .DestinationQueue(destination) .Key(key) // single key to use different args .Arguments(b) .Unbind(); - SystemUtils.WaitUntil(() => - !SystemUtils.ArgsBindsBetweenExchangeAndQueueExists(source, - destination, b)); + await SystemUtils.WaitUntilBindingsBetweenExchangeAndQueueDontExistWithArgsAsync(source, destination, b); } await management.ExchangeDeletion().Delete(source); diff --git a/Tests/Utils.cs b/Tests/Utils.cs index 872325d..6c2aec2 100644 --- a/Tests/Utils.cs +++ b/Tests/Utils.cs @@ -63,51 +63,45 @@ public static void Wait(TimeSpan wait) Thread.Sleep(wait); } - private class Connection - { - public string? name { get; set; } - public Dictionary? client_properties { get; set; } - } - public static async Task ConnectionsCountByName(string connectionName) { - using HttpClientHandler handler = new HttpClientHandler(); - handler.Credentials = new NetworkCredential("guest", "guest"); - using HttpClient client = new HttpClient(handler); + int rv = 0; - HttpResponseMessage result = await client.GetAsync("http://localhost:15672/api/connections"); - if (!result.IsSuccessStatusCode) - { - throw new XunitException(string.Format("HTTP GET failed: {0} {1}", result.StatusCode, - result.ReasonPhrase)); - } + var managementUri = new Uri("http://localhost:15672"); + using var managementClient = new ManagementClient(managementUri, "guest", "guest"); - object? obj = await JsonSerializer.DeserializeAsync(await result.Content.ReadAsStreamAsync(), - typeof(IEnumerable)); - return obj switch + IReadOnlyList connections = await managementClient.GetConnectionsAsync(); + if (connections.Count > 0) { - null => 0, - IEnumerable connections => connections.Sum(connection => + rv = connections.Sum(conn => { - if (connection is null) + if (conn is null) { return 0; } else { - if (connection.client_properties is not null && - (connection.client_properties["connection_name"] == connectionName)) - { - return 1; - } - else + if (conn.ClientProperties is not null) { - return 0; + if (conn.ClientProperties.TryGetValue("connection_name", out object? connectionNameObj)) + { + if (connectionNameObj is not null) + { + string connName = (string)connectionNameObj; + if (connName.Equals(connectionName, StringComparison.InvariantCultureIgnoreCase)) + { + return 1; + } + } + } } + + return 0; } - }), - _ => 0 - }; + }); + } + + return rv; } public static async Task IsConnectionOpen(string connectionName) @@ -253,44 +247,24 @@ public static Task WaitUntilBindingsBetweenExchangeAndQueueDontExistAsync(string }); } - public static bool ArgsBindsBetweenExchangeAndQueueExists(string exchange, string queue, - Dictionary argumentsIn) + public static Task WaitUntilBindingsBetweenExchangeAndQueueExistWithArgsAsync(string exchangeNameStr, string queueNameStr, + Dictionary args) { - Task resp = CreateHttpClient() - .GetAsync( - $"http://localhost:15672/api/bindings/%2F/e/{Uri.EscapeDataString(exchange)}/q/{Uri.EscapeDataString(queue)}"); - resp.Wait(TimeSpan.FromSeconds(10)); - string body = resp.Result.Content.ReadAsStringAsync().Result; - if (body == "[]") + return WaitUntilAsync(() => { - return false; - } + return CheckBindingsBetweenExchangeAndQueueAsync(exchangeNameStr, queueNameStr, + args: args, checkExisting: true); + }); + } - List>? bindingsResults = JsonSerializer.Deserialize>>(body); - if (bindingsResults is not null) + public static Task WaitUntilBindingsBetweenExchangeAndQueueDontExistWithArgsAsync(string exchangeNameStr, string queueNameStr, + Dictionary args) + { + return WaitUntilAsync(() => { - foreach (Dictionary argumentResult in bindingsResults) - { - Dictionary? argumentsResult = JsonSerializer.Deserialize>( - argumentResult["arguments"].ToString() ?? - throw new InvalidOperationException()); - // Check only the key to avoid conversion value problems - // on the test is enough to avoid to put the same key - // at some point we could add keyValuePair.Value == keyValuePairResult.Value - // keyValuePairResult.Value is a json object - if (argumentsResult is not null) - { - IEnumerable results = argumentsResult.Keys.ToArray() - .Intersect(argumentsIn.Keys.ToArray(), StringComparer.OrdinalIgnoreCase); - if (results.Count() == argumentsIn.Count) - { - return true; - } - } - } - } - - return false; + return CheckBindingsBetweenExchangeAndQueueAsync(exchangeNameStr, queueNameStr, + args: args, checkExisting: false); + }); } public static bool BindsBetweenExchangeAndExchangeExists(string sourceExchange, string destinationExchange) @@ -453,7 +427,8 @@ private static async Task CheckQueueAsync(string queueNameStr, bool checkE return rv; } - private static async Task CheckBindingsBetweenExchangeAndQueueAsync(string exchangeNameStr, string queueNameStr, bool checkExisting = true) + private static async Task CheckBindingsBetweenExchangeAndQueueAsync(string exchangeNameStr, string queueNameStr, + Dictionary? args = null, bool checkExisting = true) { // Assume success bool rv = true; @@ -468,16 +443,73 @@ private static async Task CheckBindingsBetweenExchangeAndQueueAsync(string IReadOnlyList bindings = await managementClient.GetQueueBindingsAsync(exchangeName, queueName); if (checkExisting) { - if (bindings.Count == 0) + if (args is not null) { - rv = false; + // We're checking that arguments are equivalent, too + foreach (Binding b in bindings) + { + if (b.Arguments is null) + { + rv = false; + break; + } + + // Check only the key to avoid conversion value problems + // on the test is enough to avoid to put the same key + // at some point we could add keyValuePair.Value == keyValuePairResult.Value + // keyValuePairResult.Value is a json object + IEnumerable results = b.Arguments.Keys.Intersect(args.Keys, StringComparer.OrdinalIgnoreCase); + if (results.Count() == args.Count) + { + rv = true; + break; + } + } + } + else + { + if (bindings.Count == 0) + { + rv = false; + } } } else { - if (bindings.Count > 0) + if (args is not null) { - rv = false; + bool foundMatchingBinding = false; + // We're checking that no bindings have the passed-in args + // So, if we go through all bindings and all args are different, + // we can assume the binding we're checking for is gone + foreach (Binding b in bindings) + { + if (b.Arguments is not null) + { + // Check only the key to avoid conversion value problems + // on the test is enough to avoid to put the same key + // at some point we could add keyValuePair.Value == keyValuePairResult.Value + // keyValuePairResult.Value is a json object + IEnumerable results = b.Arguments.Keys.Intersect(args.Keys, StringComparer.OrdinalIgnoreCase); + if (results.Count() == args.Count) + { + foundMatchingBinding = true; + break; + } + } + } + + if (foundMatchingBinding) + { + rv = false; + } + } + else + { + if (bindings.Count > 0) + { + rv = false; + } } } } From 1c26e248cb2b1d45224f2514ccf15bb05ffa88d7 Mon Sep 17 00:00:00 2001 From: Luke Bakken Date: Mon, 29 Jul 2024 13:06:17 -0700 Subject: [PATCH 06/10] * Migrate exchange-exchange binding checks to `EasyNetQ.Management.Client` --- Tests/BindingsTests.cs | 8 +-- Tests/Utils.cs | 109 ++++++++++++++++++++++------------------- 2 files changed, 61 insertions(+), 56 deletions(-) diff --git a/Tests/BindingsTests.cs b/Tests/BindingsTests.cs index 1c56309..e39f517 100644 --- a/Tests/BindingsTests.cs +++ b/Tests/BindingsTests.cs @@ -98,17 +98,13 @@ await management.Binding().SourceExchange(sourceExchange) await SystemUtils.WaitUntilExchangeExistsAsync(sourceExchange); - SystemUtils.WaitUntil(() => - SystemUtils.BindsBetweenExchangeAndExchangeExists(sourceExchange, - destinationExchange)); + await SystemUtils.WaitUntilBindingsBetweenExchangeAndExchangeExistAsync(sourceExchange, destinationExchange); await management.Binding().SourceExchange(sourceExchange) .DestinationExchange(destinationExchange) .Key(key).Unbind(); - SystemUtils.WaitUntil(() => - !SystemUtils.BindsBetweenExchangeAndExchangeExists(sourceExchange, - destinationExchange)); + await SystemUtils.WaitUntilBindingsBetweenExchangeAndExchangeDontExistAsync(sourceExchange, destinationExchange); await management.ExchangeDeletion().Delete(sourceExchange); await management.ExchangeDeletion().Delete(destinationExchange); diff --git a/Tests/Utils.cs b/Tests/Utils.cs index 6c2aec2..00ccaf6 100644 --- a/Tests/Utils.cs +++ b/Tests/Utils.cs @@ -4,12 +4,9 @@ using System; using System.Collections.Generic; -using System.IO; using System.Linq; using System.Net; using System.Net.Http; -using System.Reflection; -using System.Text; using System.Text.Json; using System.Threading; using System.Threading.Tasks; @@ -267,16 +264,21 @@ public static Task WaitUntilBindingsBetweenExchangeAndQueueDontExistWithArgsAsyn }); } - public static bool BindsBetweenExchangeAndExchangeExists(string sourceExchange, string destinationExchange) + public static Task WaitUntilBindingsBetweenExchangeAndExchangeExistAsync(string sourceExchangeNameStr, string destinationExchangeNameStr) { - Task resp = CreateHttpClient() - .GetAsync( - $"http://localhost:15672/api/bindings/%2F/e/{Uri.EscapeDataString(sourceExchange)}/e/{Uri.EscapeDataString(destinationExchange)}"); - resp.Wait(TimeSpan.FromSeconds(10)); - string body = resp.Result.Content.ReadAsStringAsync().Result; - return body != "[]" && resp.Result.IsSuccessStatusCode; + return WaitUntilAsync(() => + { + return CheckBindingsBetweenExchangeAndExchangeAsync(sourceExchangeNameStr, destinationExchangeNameStr, checkExisting: true); + }); } + public static Task WaitUntilBindingsBetweenExchangeAndExchangeDontExistAsync(string sourceExchangeNameStr, string destinationExchangeNameStr) + { + return WaitUntilAsync(() => + { + return CheckBindingsBetweenExchangeAndExchangeAsync(sourceExchangeNameStr, destinationExchangeNameStr, checkExisting: false); + }); + } public static int HttpGetQMsgCount(string queue) { @@ -301,46 +303,6 @@ public static int HttpGetQMsgCount(string queue) return obj.TryGetValue("messages_ready", out var value) ? Convert.ToInt32(value.ToString()) : 0; } - public static void HttpPost(string jsonBody, string api) - { - HttpContent content = new StringContent(jsonBody, Encoding.UTF8, "application/json"); - var task = CreateHttpClient().PostAsync($"http://localhost:15672/api/{api}", content); - task.Wait(); - var result = task.Result; - if (!result.IsSuccessStatusCode) - { - throw new XunitException(string.Format("HTTP POST failed: {0} {1}", result.StatusCode, - result.ReasonPhrase)); - } - } - - public static void HttpDeleteQueue(string queue) - { - Task task = CreateHttpClient().DeleteAsync($"http://localhost:15672/api/queues/%2F/{queue}"); - task.Wait(); - HttpResponseMessage result = task.Result; - if (!result.IsSuccessStatusCode && result.StatusCode != HttpStatusCode.NotFound) - { - throw new XunitException($"HTTP DELETE failed: {result.StatusCode} {result.ReasonPhrase}"); - } - } - - public static byte[] GetFileContent(string fileName) - { - var codeBaseUrl = new Uri(Assembly.GetExecutingAssembly().Location); - string codeBasePath = Uri.UnescapeDataString(codeBaseUrl.AbsolutePath); - string? dirPath = Path.GetDirectoryName(codeBasePath); - if (dirPath is null) - { - return []; - } - - string filename = Path.Combine(dirPath, "Resources", fileName); - Task fileTask = File.ReadAllBytesAsync(filename); - fileTask.Wait(TimeSpan.FromSeconds(1)); - return fileTask.Result; - } - private static async Task CheckExchangeAsync(string exchangeNameStr, bool checkExisting = true) { // Assume success @@ -530,5 +492,52 @@ private static async Task CheckBindingsBetweenExchangeAndQueueAsync(string return rv; } + + private static async Task CheckBindingsBetweenExchangeAndExchangeAsync(string sourceExchangeNameStr, string destinationExchangeNameStr, + bool checkExisting = true) + { + // Assume success + bool rv = true; + + var managementUri = new Uri("http://localhost:15672"); + using var managementClient = new ManagementClient(managementUri, "guest", "guest"); + + var sourceExchangeName = new ExchangeName(sourceExchangeNameStr, "/"); + var destinationExchangeName = new ExchangeName(destinationExchangeNameStr, "/"); + try + { + IReadOnlyList bindings = await managementClient.GetExchangeBindingsAsync(sourceExchangeName, destinationExchangeName); + if (checkExisting) + { + if (bindings.Count == 0) + { + rv = false; + } + } + else + { + if (bindings.Count > 0) + { + rv = false; + } + } + } + catch (UnexpectedHttpStatusCodeException ex) + { + if (ex.StatusCode == HttpStatusCode.NotFound) + { + if (checkExisting) + { + rv = false; + } + } + else + { + throw; + } + } + + return rv; + } } } From 74f455f74504636c40e7fa9d17bf70672b2695a2 Mon Sep 17 00:00:00 2001 From: Luke Bakken Date: Mon, 29 Jul 2024 13:15:18 -0700 Subject: [PATCH 07/10] * Move retrieval of queue message count to `EasyNetQ.Management.Client` --- Tests/ConsumerTests.cs | 2 +- Tests/PublisherTests.cs | 25 +++++++++++++++---------- Tests/Utils.cs | 40 ++++++++++++++++++---------------------- 3 files changed, 34 insertions(+), 33 deletions(-) diff --git a/Tests/ConsumerTests.cs b/Tests/ConsumerTests.cs index 47bf2ac..fe900a7 100644 --- a/Tests/ConsumerTests.cs +++ b/Tests/ConsumerTests.cs @@ -82,7 +82,7 @@ await publisher.Publish(new AmqpMessage("Hello world!"), Assert.True(tcs.Task.IsCompleted); await consumer.CloseAsync(); - SystemUtils.WaitUntil(() => SystemUtils.HttpGetQMsgCount("ConsumerReQueueMessage") == 0, 500); + await SystemUtils.WaitUntilQueueMessageCount("ConsumerReQueueMessage", 0); await management.QueueDeletion().Delete("ConsumerReQueueMessage"); await connection.CloseAsync(); } diff --git a/Tests/PublisherTests.cs b/Tests/PublisherTests.cs index 4918aa1..18cb1cd 100644 --- a/Tests/PublisherTests.cs +++ b/Tests/PublisherTests.cs @@ -28,7 +28,8 @@ public async Task ValidateBuilderRaiseExceptionIfQueueOrExchangeAreNotSetCorrect [Fact] public async Task RaiseErrorIfQueueDoesNotExist() { - var connection = await AmqpConnection.CreateAsync(ConnectionSettingBuilder.Create().Build()); + IConnection connection = await AmqpConnection.CreateAsync(ConnectionSettingBuilder.Create().Build()); + Assert.Throws(() => connection.PublisherBuilder().Queue("queue_does_not_exist").Build()); @@ -38,13 +39,15 @@ public async Task RaiseErrorIfQueueDoesNotExist() [Fact] public async Task SendAMessageToAQueue() { - var connection = await AmqpConnection.CreateAsync(ConnectionSettingBuilder.Create().Build()); - var management = connection.Management(); + IConnection connection = await AmqpConnection.CreateAsync(ConnectionSettingBuilder.Create().Build()); + IManagement management = connection.Management(); await management.Queue().Name("queue_to_send").Declare(); - var publisher = connection.PublisherBuilder().Queue("queue_to_send").Build(); + IPublisher publisher = connection.PublisherBuilder().Queue("queue_to_send").Build(); await publisher.Publish(new AmqpMessage("Hello wold!"), (message, descriptor) => { Assert.Equal(OutcomeState.Accepted, descriptor.State); }); - SystemUtils.WaitUntil(() => SystemUtils.HttpGetQMsgCount("queue_to_send") == 1); + + await SystemUtils.WaitUntilQueueMessageCount("queue_to_send", 1); + Assert.Single(connection.GetPublishers()); await publisher.CloseAsync(); Assert.Empty(connection.GetPublishers()); @@ -56,15 +59,15 @@ await publisher.Publish(new AmqpMessage("Hello wold!"), [Fact] public async Task ValidatePublishersCount() { - var connection = await AmqpConnection.CreateAsync(ConnectionSettingBuilder.Create().Build()); - var management = connection.Management(); + IConnection connection = await AmqpConnection.CreateAsync(ConnectionSettingBuilder.Create().Build()); + IManagement management = connection.Management(); await management.Queue().Name("queue_publishers_count").Declare(); TaskCompletionSource tcs = new(TaskCreationOptions.RunContinuationsAsynchronously); int received = 0; for (int i = 1; i <= 10; i++) { - var publisher = connection.PublisherBuilder().Queue("queue_publishers_count").Build(); + IPublisher publisher = connection.PublisherBuilder().Queue("queue_publishers_count").Build(); await publisher.Publish(new AmqpMessage("Hello wold!"), (message, descriptor) => { @@ -78,7 +81,7 @@ await publisher.Publish(new AmqpMessage("Hello wold!"), } await tcs.Task.WaitAsync(TimeSpan.FromSeconds(5)); - foreach (var publisher in connection.GetPublishers()) + foreach (IPublisher publisher in connection.GetPublishers()) { await publisher.CloseAsync(); } @@ -100,7 +103,9 @@ await management.Binding().SourceExchange("exchange_to_send").DestinationQueue(" IPublisher publisher = connection.PublisherBuilder().Exchange("exchange_to_send").Key("key").Build(); await publisher.Publish(new AmqpMessage("Hello wold!"), (message, descriptor) => { Assert.Equal(OutcomeState.Accepted, descriptor.State); }); - SystemUtils.WaitUntil(() => SystemUtils.HttpGetQMsgCount("queue_to_send_1") == 1); + + await SystemUtils.WaitUntilQueueMessageCount("queue_to_send_1", 1); + Assert.Single(connection.GetPublishers()); await publisher.CloseAsync(); Assert.Empty(connection.GetPublishers()); diff --git a/Tests/Utils.cs b/Tests/Utils.cs index 00ccaf6..6567a15 100644 --- a/Tests/Utils.cs +++ b/Tests/Utils.cs @@ -7,7 +7,6 @@ using System.Linq; using System.Net; using System.Net.Http; -using System.Text.Json; using System.Threading; using System.Threading.Tasks; using EasyNetQ.Management.Client; @@ -18,6 +17,9 @@ namespace Tests { public static class SystemUtils { + private static readonly TimeSpan s_initialDelaySpan = TimeSpan.FromMilliseconds(100); + private static readonly TimeSpan s_delaySpan = TimeSpan.FromMilliseconds(500); + // Waits for 10 seconds total by default public static void WaitUntil(Func func, ushort retries = 40) { @@ -34,13 +36,11 @@ public static void WaitUntil(Func func, ushort retries = 40) public static async Task WaitUntilAsync(Func> func, ushort retries = 10) { - var delaySpan = TimeSpan.FromMilliseconds(500); - - await Task.Delay(delaySpan); + await Task.Delay(s_initialDelaySpan); while (!await func()) { - await Task.Delay(delaySpan); + await Task.Delay(s_delaySpan); --retries; if (retries == 0) @@ -280,27 +280,23 @@ public static Task WaitUntilBindingsBetweenExchangeAndExchangeDontExistAsync(str }); } - public static int HttpGetQMsgCount(string queue) + public static Task WaitUntilQueueMessageCount(string queueNameStr, long messageCount) { - var task = CreateHttpClient() - .GetAsync($"http://localhost:15672/api/queues/%2F/{Uri.EscapeDataString(queue)}"); - task.Wait(TimeSpan.FromSeconds(10)); - var result = task.Result; - if (!result.IsSuccessStatusCode) + return WaitUntilAsync(async () => { - throw new XunitException($"HTTP GET failed: {result.StatusCode} {result.ReasonPhrase}"); - } + long queueMessageCount = await GetQueueMessageCountAsync(queueNameStr); + return messageCount == queueMessageCount; + }); + } - var responseBody = result.Content.ReadAsStringAsync(); - responseBody.Wait(TimeSpan.FromSeconds(10)); - string json = responseBody.Result; - var obj = JsonSerializer.Deserialize>(json); - if (obj == null) - { - return 0; - } + private static async Task GetQueueMessageCountAsync(string queueNameStr) + { + var managementUri = new Uri("http://localhost:15672"); + using var managementClient = new ManagementClient(managementUri, "guest", "guest"); - return obj.TryGetValue("messages_ready", out var value) ? Convert.ToInt32(value.ToString()) : 0; + var queueName = new QueueName(queueNameStr, "/"); + Queue queue = await managementClient.GetQueueAsync(queueName); + return queue.MessagesReady; } private static async Task CheckExchangeAsync(string exchangeNameStr, bool checkExisting = true) From 4d31b4cdc85440d84e8401db6c5a2bf4ed6f0867 Mon Sep 17 00:00:00 2001 From: Luke Bakken Date: Mon, 29 Jul 2024 13:32:44 -0700 Subject: [PATCH 08/10] * Finish conversion to `EasyNetQ.Management.Client` --- Tests/ConnectionRecoveryTests.cs | 8 +- Tests/EnvironmentTests.cs | 18 +- Tests/PublisherConsumerRecoveryTests.cs | 20 +- Tests/Utils.cs | 234 ++++++++++-------------- 4 files changed, 119 insertions(+), 161 deletions(-) diff --git a/Tests/ConnectionRecoveryTests.cs b/Tests/ConnectionRecoveryTests.cs index 2e4969e..452cbd2 100644 --- a/Tests/ConnectionRecoveryTests.cs +++ b/Tests/ConnectionRecoveryTests.cs @@ -125,7 +125,7 @@ public async Task UnexpectedCloseTheStatusShouldBeCorrectAndErrorNotNull() Assert.Equal(State.Open, connection.State); await SystemUtils.WaitUntilConnectionIsKilled(connectionName); resetEvent.WaitOne(TimeSpan.FromSeconds(5)); - SystemUtils.WaitUntil(() => (listFromStatus.Count >= 2)); + await SystemUtils.WaitUntilFuncAsync(() => (listFromStatus.Count >= 2)); Assert.Equal(State.Open, listFromStatus[0]); Assert.Equal(State.Reconnecting, listToStatus[0]); Assert.NotNull(listError[0]); @@ -136,7 +136,7 @@ public async Task UnexpectedCloseTheStatusShouldBeCorrectAndErrorNotNull() resetEvent.Set(); await connection.CloseAsync(); resetEvent.WaitOne(TimeSpan.FromSeconds(5)); - SystemUtils.WaitUntil(() => (listFromStatus.Count >= 4)); + await SystemUtils.WaitUntilFuncAsync(() => (listFromStatus.Count >= 4)); Assert.Equal(State.Open, listFromStatus[2]); Assert.Equal(State.Closing, listToStatus[2]); @@ -183,7 +183,7 @@ public async Task OverrideTheBackOffWithBackOffDisabled() Assert.Equal(State.Open, connection.State); await SystemUtils.WaitUntilConnectionIsKilled(connectionName); resetEvent.WaitOne(TimeSpan.FromSeconds(5)); - SystemUtils.WaitUntil(() => (listFromStatus.Count >= 2)); + await SystemUtils.WaitUntilFuncAsync(() => (listFromStatus.Count >= 2)); Assert.Equal(State.Open, listFromStatus[0]); Assert.Equal(State.Reconnecting, listToStatus[0]); Assert.NotNull(listError[0]); @@ -231,7 +231,7 @@ public async Task RecoveryTopologyShouldRecoverTheTempQueues() await SystemUtils.WaitUntilConnectionIsKilled(connectionName); await completion.Task.WaitAsync(TimeSpan.FromSeconds(10)); - SystemUtils.WaitUntil(() => recoveryEvents == 2); + await SystemUtils.WaitUntilFuncAsync(() => recoveryEvents == 2); await SystemUtils.WaitUntilQueueExistsAsync(queueName); diff --git a/Tests/EnvironmentTests.cs b/Tests/EnvironmentTests.cs index 1518123..db40d25 100644 --- a/Tests/EnvironmentTests.cs +++ b/Tests/EnvironmentTests.cs @@ -29,7 +29,7 @@ public async Task CreateMoreConnectionsWithDifferentParametersEnvironment() IConnection connection = await env.CreateConnectionAsync(); Assert.NotNull(connection); - await SystemUtils.WaitUntilAsync(async () => await SystemUtils.IsConnectionOpen(envConnectionName)); + await SystemUtils.WaitUntilConnectionIsOpen(envConnectionName); Assert.NotEmpty(env.GetConnections()); Assert.Single(env.GetConnections()); @@ -39,7 +39,7 @@ public async Task CreateMoreConnectionsWithDifferentParametersEnvironment() ConnectionSettingBuilder.Create().ConnectionName(envConnectionName2).Build()); Assert.NotNull(connection2); Assert.Equal(2, env.GetConnections().Count); - await SystemUtils.WaitUntilAsync(async () => await SystemUtils.IsConnectionOpen(envConnectionName2)); + await SystemUtils.WaitUntilConnectionIsOpen(envConnectionName2); await env.CloseAsync(); Assert.Equal(State.Closed, connection.State); @@ -54,7 +54,7 @@ public async Task CloseConnectionsIndividually() IEnvironment env = await AmqpEnvironment.CreateAsync( ConnectionSettingBuilder.Create().ConnectionName(envConnectionName).Build()); IConnection connection = await env.CreateConnectionAsync(); - await SystemUtils.WaitUntilAsync(async () => await SystemUtils.IsConnectionOpen(envConnectionName)); + await SystemUtils.WaitUntilConnectionIsOpen(envConnectionName); Assert.Single(env.GetConnections()); Assert.Equal(1, env.GetConnections()[0].Id); @@ -64,32 +64,30 @@ public async Task CloseConnectionsIndividually() ConnectionSettingBuilder.Create().ConnectionName(envConnectionName2).Build()); Assert.Equal(2, env.GetConnections().Count); Assert.Equal(2, env.GetConnections()[1].Id); - await SystemUtils.WaitUntilAsync(async () => await SystemUtils.IsConnectionOpen(envConnectionName2)); - + await SystemUtils.WaitUntilConnectionIsOpen(envConnectionName2); string envConnectionName3 = "EnvironmentConnection3_" + Guid.NewGuid().ToString(); IConnection connection3 = await env.CreateConnectionAsync( ConnectionSettingBuilder.Create().ConnectionName(envConnectionName3).Build()); Assert.Equal(3, env.GetConnections().Count); Assert.Equal(3, env.GetConnections()[2].Id); - await SystemUtils.WaitUntilAsync(async () => await SystemUtils.IsConnectionOpen(envConnectionName3)); - + await SystemUtils.WaitUntilConnectionIsOpen(envConnectionName3); // closing await connection.CloseAsync(); Assert.Equal(State.Closed, connection.State); Assert.Equal(2, env.GetConnections().Count); - await SystemUtils.WaitUntilAsync(async () => await SystemUtils.IsConnectionOpen(envConnectionName) == false); + await SystemUtils.WaitUntilConnectionIsClosed(envConnectionName); await connection2.CloseAsync(); Assert.Equal(State.Closed, connection2.State); Assert.Single(env.GetConnections()); - await SystemUtils.WaitUntilAsync(async () => await SystemUtils.IsConnectionOpen(envConnectionName2) == false); + await SystemUtils.WaitUntilConnectionIsClosed(envConnectionName2); await connection3.CloseAsync(); Assert.Equal(State.Closed, connection3.State); - await SystemUtils.WaitUntilAsync(async () => await SystemUtils.IsConnectionOpen(envConnectionName3) == false); + await SystemUtils.WaitUntilConnectionIsClosed(envConnectionName3); Assert.Empty(env.GetConnections()); diff --git a/Tests/PublisherConsumerRecoveryTests.cs b/Tests/PublisherConsumerRecoveryTests.cs index 7e34f87..1b00272 100644 --- a/Tests/PublisherConsumerRecoveryTests.cs +++ b/Tests/PublisherConsumerRecoveryTests.cs @@ -81,7 +81,7 @@ public async Task ProducerShouldChangeStatusWhenConnectionIsKilled() await SystemUtils.WaitUntilConnectionIsKilledAndOpen(connectionName); - SystemUtils.WaitUntil(() => publisher.State == State.Open); + await SystemUtils.WaitUntilFuncAsync(() => publisher.State == State.Open); Assert.Equal(State.Open, publisher.State); await publisher.CloseAsync(); @@ -117,7 +117,7 @@ public async Task ConsumerShouldChangeStatusWhenConnectionIsKilled() await SystemUtils.WaitUntilConnectionIsKilledAndOpen(connectionName); - SystemUtils.WaitUntil(() => consumer.State == State.Open); + await SystemUtils.WaitUntilFuncAsync(() => consumer.State == State.Open); Assert.Equal(State.Open, consumer.State); await consumer.CloseAsync(); @@ -176,13 +176,13 @@ await publisher.Publish(new AmqpMessage("Hello World"), }); } - SystemUtils.WaitUntil(() => messagesConfirmed == 10); - SystemUtils.WaitUntil(() => messagesReceived == 10); + await SystemUtils.WaitUntilFuncAsync(() => messagesConfirmed == 10); + await SystemUtils.WaitUntilFuncAsync(() => messagesReceived == 10); await SystemUtils.WaitUntilConnectionIsKilledAndOpen(connectionName); - SystemUtils.WaitUntil(() => publisher.State == State.Open); - SystemUtils.WaitUntil(() => consumer.State == State.Open); + await SystemUtils.WaitUntilFuncAsync(() => publisher.State == State.Open); + await SystemUtils.WaitUntilFuncAsync(() => consumer.State == State.Open); for (int i = 0; i < 10; i++) { @@ -194,7 +194,7 @@ await publisher.Publish(new AmqpMessage("Hello World"), }); } - SystemUtils.WaitUntil(() => messagesConfirmed == 20); + await SystemUtils.WaitUntilFuncAsync(() => messagesConfirmed == 20); Assert.Equal(State.Open, publisher.State); await publisher.CloseAsync(); @@ -206,7 +206,7 @@ await publisher.Publish(new AmqpMessage("Hello World"), await connection.Management().QueueDeletion() .Delete("PublishShouldRestartPublishConsumerShouldRestartConsumeWhenConnectionIsKilled"); await connection.CloseAsync(); - SystemUtils.WaitUntil(() => messagesReceived == 20); + await SystemUtils.WaitUntilFuncAsync(() => messagesReceived == 20); } /// @@ -251,9 +251,9 @@ await connection.Management().Queue().Name("PublisherAndConsumerShouldNotRestart Assert.Equal(State.Open, consumer.State); await SystemUtils.WaitUntilConnectionIsKilled(connectionName); + await SystemUtils.WaitUntilFuncAsync(() => publisher.State == State.Closed); + await SystemUtils.WaitUntilFuncAsync(() => consumer.State == State.Closed); - SystemUtils.WaitUntil(() => publisher.State == State.Closed); - SystemUtils.WaitUntil(() => consumer.State == State.Closed); Assert.Equal(State.Closed, connection.State); Assert.Equal(State.Closed, connection.Management().State); Assert.DoesNotContain((State.Open, State.Closing), statesProducer); diff --git a/Tests/Utils.cs b/Tests/Utils.cs index 6567a15..704e7a8 100644 --- a/Tests/Utils.cs +++ b/Tests/Utils.cs @@ -6,8 +6,6 @@ using System.Collections.Generic; using System.Linq; using System.Net; -using System.Net.Http; -using System.Threading; using System.Threading.Tasks; using EasyNetQ.Management.Client; using EasyNetQ.Management.Client.Model; @@ -18,14 +16,17 @@ namespace Tests public static class SystemUtils { private static readonly TimeSpan s_initialDelaySpan = TimeSpan.FromMilliseconds(100); + private static readonly TimeSpan s_shortDelaySpan = TimeSpan.FromMilliseconds(250); private static readonly TimeSpan s_delaySpan = TimeSpan.FromMilliseconds(500); - // Waits for 10 seconds total by default - public static void WaitUntil(Func func, ushort retries = 40) + public static async Task WaitUntilFuncAsync(Func func, ushort retries = 40) { - while (!func()) + await Task.Delay(s_initialDelaySpan); + + while (false == func()) { - Wait(TimeSpan.FromMilliseconds(250)); + await Task.Delay(s_shortDelaySpan); + --retries; if (retries == 0) { @@ -38,7 +39,7 @@ public static async Task WaitUntilAsync(Func> func, ushort retries = { await Task.Delay(s_initialDelaySpan); - while (!await func()) + while (false == await func()) { await Task.Delay(s_delaySpan); @@ -50,150 +51,27 @@ public static async Task WaitUntilAsync(Func> func, ushort retries = } } - public static void Wait() - { - Thread.Sleep(TimeSpan.FromMilliseconds(500)); - } - - public static void Wait(TimeSpan wait) + public static Task WaitUntilConnectionIsOpen(string connectionName) { - Thread.Sleep(wait); + return WaitUntilAsync(() => CheckConnectionAsync(connectionName, checkOpened: true)); } - public static async Task ConnectionsCountByName(string connectionName) + public static Task WaitUntilConnectionIsClosed(string connectionName) { - int rv = 0; - - var managementUri = new Uri("http://localhost:15672"); - using var managementClient = new ManagementClient(managementUri, "guest", "guest"); - - IReadOnlyList connections = await managementClient.GetConnectionsAsync(); - if (connections.Count > 0) - { - rv = connections.Sum(conn => - { - if (conn is null) - { - return 0; - } - else - { - if (conn.ClientProperties is not null) - { - if (conn.ClientProperties.TryGetValue("connection_name", out object? connectionNameObj)) - { - if (connectionNameObj is not null) - { - string connName = (string)connectionNameObj; - if (connName.Equals(connectionName, StringComparison.InvariantCultureIgnoreCase)) - { - return 1; - } - } - } - } - - return 0; - } - }); - } - - return rv; - } - - public static async Task IsConnectionOpen(string connectionName) - { - var managementUri = new Uri("http://localhost:15672"); - using var managementClient = new ManagementClient(managementUri, "guest", "guest"); - - IReadOnlyList connections = await managementClient.GetConnectionsAsync(); - - return connections.Any(conn => - { - if (conn.ClientProperties is not null) - { - if (conn.ClientProperties.TryGetValue("connection_name", out object? connectionNameObj)) - { - if (connectionNameObj is not null) - { - string connName = (string)connectionNameObj; - return connName.Contains(connectionName); - } - } - } - - return false; - }); - } - - public static async Task KillConnectionAsync(string connectionName) - { - var managementUri = new Uri("http://localhost:15672"); - using var managementClient = new ManagementClient(managementUri, "guest", "guest"); - - IReadOnlyList connections = await managementClient.GetConnectionsAsync(); - - // we kill _only_ producer and consumer connections - // leave the locator up and running to delete the stream - IEnumerable filteredConnections = connections.Where(conn => - { - if (conn.ClientProperties is not null) - { - if (conn.ClientProperties.TryGetValue("connection_name", out object? connectionNameObj)) - { - if (connectionNameObj is not null) - { - string connName = (string)connectionNameObj; - return connName.Contains(connectionName); - } - } - } - - return false; - }); - - int killed = 0; - foreach (EasyNetQ.Management.Client.Model.Connection conn in filteredConnections) - { - try - { - await managementClient.CloseConnectionAsync(conn); - } - catch (UnexpectedHttpStatusCodeException ex) - { - if (ex.StatusCode != HttpStatusCode.NotFound) - { - throw; - } - } - - killed += 1; - } - - return killed; + return WaitUntilAsync(() => CheckConnectionAsync(connectionName, checkOpened: false)); } public static async Task WaitUntilConnectionIsKilled(string connectionName) { - await WaitUntilAsync(async () => await IsConnectionOpen(connectionName)); - Wait(); + await WaitUntilConnectionIsOpen(connectionName); await WaitUntilAsync(async () => await KillConnectionAsync(connectionName) == 1); } public static async Task WaitUntilConnectionIsKilledAndOpen(string connectionName) { - await WaitUntilAsync(async () => await IsConnectionOpen(connectionName)); - Wait(); + await WaitUntilConnectionIsOpen(connectionName); await WaitUntilAsync(async () => await KillConnectionAsync(connectionName) == 1); - Wait(); - await WaitUntilAsync(async () => await IsConnectionOpen(connectionName)); - Wait(); - } - - private static HttpClient CreateHttpClient() - { - var handler = new HttpClientHandler { Credentials = new NetworkCredential("guest", "guest"), }; - return new HttpClient(handler); + await WaitUntilConnectionIsOpen(connectionName); } public static Task WaitUntilQueueExistsAsync(string queueNameStr) @@ -289,6 +167,88 @@ public static Task WaitUntilQueueMessageCount(string queueNameStr, long messageC }); } + private static async Task CheckConnectionAsync(string connectionName, bool checkOpened = true) + { + bool rv = true; + + var managementUri = new Uri("http://localhost:15672"); + using var managementClient = new ManagementClient(managementUri, "guest", "guest"); + + IReadOnlyList connections = await managementClient.GetConnectionsAsync(); + rv = connections.Any(conn => + { + if (conn.ClientProperties is not null) + { + if (conn.ClientProperties.TryGetValue("connection_name", out object? connectionNameObj)) + { + if (connectionNameObj is not null) + { + string connName = (string)connectionNameObj; + return connName.Contains(connectionName); + } + } + } + + return false; + }); + + if (false == checkOpened) + { + return !rv; + } + else + { + return rv; + } + } + + private static async Task KillConnectionAsync(string connectionName) + { + var managementUri = new Uri("http://localhost:15672"); + using var managementClient = new ManagementClient(managementUri, "guest", "guest"); + + IReadOnlyList connections = await managementClient.GetConnectionsAsync(); + + // we kill _only_ producer and consumer connections + // leave the locator up and running to delete the stream + IEnumerable filteredConnections = connections.Where(conn => + { + if (conn.ClientProperties is not null) + { + if (conn.ClientProperties.TryGetValue("connection_name", out object? connectionNameObj)) + { + if (connectionNameObj is not null) + { + string connName = (string)connectionNameObj; + return connName.Contains(connectionName); + } + } + } + + return false; + }); + + int killed = 0; + foreach (Connection conn in filteredConnections) + { + try + { + await managementClient.CloseConnectionAsync(conn); + } + catch (UnexpectedHttpStatusCodeException ex) + { + if (ex.StatusCode != HttpStatusCode.NotFound) + { + throw; + } + } + + killed += 1; + } + + return killed; + } + private static async Task GetQueueMessageCountAsync(string queueNameStr) { var managementUri = new Uri("http://localhost:15672"); From 0df6254b231f000a04d668b58c9a2d314ab3235d Mon Sep 17 00:00:00 2001 From: Luke Bakken Date: Mon, 29 Jul 2024 13:39:03 -0700 Subject: [PATCH 09/10] * Extend retries to accomodate GHA --- Tests/Utils.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Tests/Utils.cs b/Tests/Utils.cs index 704e7a8..be2f2de 100644 --- a/Tests/Utils.cs +++ b/Tests/Utils.cs @@ -164,7 +164,7 @@ public static Task WaitUntilQueueMessageCount(string queueNameStr, long messageC { long queueMessageCount = await GetQueueMessageCountAsync(queueNameStr); return messageCount == queueMessageCount; - }); + }, retries: 20); } private static async Task CheckConnectionAsync(string connectionName, bool checkOpened = true) From 68980eca0ca94ee5abbe31d1ce0901715a907d12 Mon Sep 17 00:00:00 2001 From: Luke Bakken Date: Mon, 29 Jul 2024 14:55:00 -0700 Subject: [PATCH 10/10] * Double the retries in CI --- Tests/TlsConnectionTests.cs | 22 +- Tests/Utils.cs | 701 +++++++++++++++++++----------------- 2 files changed, 365 insertions(+), 358 deletions(-) diff --git a/Tests/TlsConnectionTests.cs b/Tests/TlsConnectionTests.cs index ac99b3d..02da428 100644 --- a/Tests/TlsConnectionTests.cs +++ b/Tests/TlsConnectionTests.cs @@ -17,7 +17,7 @@ public class TlsConnectionTests : IAsyncLifetime private readonly ITestOutputHelper _output; private readonly Uri _managementUri = new("http://localhost:15672"); private readonly ManagementClient _managementClient; - private readonly bool _isRunningInCI = InitIsRunningInCI(); + private readonly bool _isRunningInCI = SystemUtils.IsRunningInCI; public TlsConnectionTests(ITestOutputHelper output) { @@ -140,24 +140,4 @@ private static string GetClientCertFile() Assert.True(File.Exists(clientCertFile)); return clientCertFile; } - - private static bool InitIsRunningInCI() - { - if (bool.TryParse(Environment.GetEnvironmentVariable("CI"), out bool ci)) - { - if (ci == true) - { - return true; - } - } - else if (bool.TryParse(Environment.GetEnvironmentVariable("GITHUB_ACTIONS"), out ci)) - { - if (ci == true) - { - return true; - } - } - - return false; - } } diff --git a/Tests/Utils.cs b/Tests/Utils.cs index be2f2de..6feade3 100644 --- a/Tests/Utils.cs +++ b/Tests/Utils.cs @@ -11,367 +11,407 @@ using EasyNetQ.Management.Client.Model; using Xunit.Sdk; -namespace Tests +namespace Tests; + +public static class SystemUtils { - public static class SystemUtils + private static readonly bool s_isRunningInCI = InitIsRunningInCI(); + private static readonly TimeSpan s_initialDelaySpan = TimeSpan.FromMilliseconds(100); + private static readonly TimeSpan s_shortDelaySpan = TimeSpan.FromMilliseconds(250); + private static readonly TimeSpan s_delaySpan = TimeSpan.FromMilliseconds(500); + + public static bool IsRunningInCI => s_isRunningInCI; + + public static async Task WaitUntilFuncAsync(Func func, ushort retries = 40) { - private static readonly TimeSpan s_initialDelaySpan = TimeSpan.FromMilliseconds(100); - private static readonly TimeSpan s_shortDelaySpan = TimeSpan.FromMilliseconds(250); - private static readonly TimeSpan s_delaySpan = TimeSpan.FromMilliseconds(500); + await Task.Delay(s_initialDelaySpan); - public static async Task WaitUntilFuncAsync(Func func, ushort retries = 40) + while (false == func()) { - await Task.Delay(s_initialDelaySpan); + await Task.Delay(s_shortDelaySpan); - while (false == func()) + --retries; + if (retries == 0) { - await Task.Delay(s_shortDelaySpan); - - --retries; - if (retries == 0) - { - throw new XunitException("timed out waiting on a condition!"); - } + throw new XunitException("timed out waiting on a condition!"); } } + } + + public static async Task WaitUntilAsync(Func> func, ushort retries = 10) + { + if (s_isRunningInCI) + { + retries *= 2; + } + + await Task.Delay(s_initialDelaySpan); - public static async Task WaitUntilAsync(Func> func, ushort retries = 10) + while (false == await func()) { - await Task.Delay(s_initialDelaySpan); + await Task.Delay(s_delaySpan); - while (false == await func()) + --retries; + if (retries == 0) { - await Task.Delay(s_delaySpan); - - --retries; - if (retries == 0) - { - throw new XunitException("timed out waiting on a condition!"); - } + throw new XunitException("timed out waiting on a condition!"); } } + } - public static Task WaitUntilConnectionIsOpen(string connectionName) - { - return WaitUntilAsync(() => CheckConnectionAsync(connectionName, checkOpened: true)); - } + public static Task WaitUntilConnectionIsOpen(string connectionName) + { + return WaitUntilAsync(() => CheckConnectionAsync(connectionName, checkOpened: true)); + } - public static Task WaitUntilConnectionIsClosed(string connectionName) - { - return WaitUntilAsync(() => CheckConnectionAsync(connectionName, checkOpened: false)); - } + public static Task WaitUntilConnectionIsClosed(string connectionName) + { + return WaitUntilAsync(() => CheckConnectionAsync(connectionName, checkOpened: false)); + } - public static async Task WaitUntilConnectionIsKilled(string connectionName) - { - await WaitUntilConnectionIsOpen(connectionName); - await WaitUntilAsync(async () => await KillConnectionAsync(connectionName) == 1); - } + public static async Task WaitUntilConnectionIsKilled(string connectionName) + { + await WaitUntilConnectionIsOpen(connectionName); + await WaitUntilAsync(async () => await KillConnectionAsync(connectionName) == 1); + } - public static async Task WaitUntilConnectionIsKilledAndOpen(string connectionName) - { - await WaitUntilConnectionIsOpen(connectionName); - await WaitUntilAsync(async () => await KillConnectionAsync(connectionName) == 1); - await WaitUntilConnectionIsOpen(connectionName); - } + public static async Task WaitUntilConnectionIsKilledAndOpen(string connectionName) + { + await WaitUntilConnectionIsOpen(connectionName); + await WaitUntilAsync(async () => await KillConnectionAsync(connectionName) == 1); + await WaitUntilConnectionIsOpen(connectionName); + } - public static Task WaitUntilQueueExistsAsync(string queueNameStr) + public static Task WaitUntilQueueExistsAsync(string queueNameStr) + { + return WaitUntilAsync(() => { - return WaitUntilAsync(() => - { - return CheckQueueAsync(queueNameStr, checkExisting: true); - }); - } + return CheckQueueAsync(queueNameStr, checkExisting: true); + }); + } - public static Task WaitUntilQueueDeletedAsync(string queueNameStr) + public static Task WaitUntilQueueDeletedAsync(string queueNameStr) + { + return WaitUntilAsync(() => { - return WaitUntilAsync(() => - { - return CheckQueueAsync(queueNameStr, checkExisting: false); - }); - } + return CheckQueueAsync(queueNameStr, checkExisting: false); + }); + } - public static Task WaitUntilExchangeExistsAsync(string exchangeNameStr) + public static Task WaitUntilExchangeExistsAsync(string exchangeNameStr) + { + return WaitUntilAsync(() => { - return WaitUntilAsync(() => - { - return CheckExchangeAsync(exchangeNameStr, checkExisting: true); - }); - } + return CheckExchangeAsync(exchangeNameStr, checkExisting: true); + }); + } - public static Task WaitUntilExchangeDeletedAsync(string exchangeNameStr) + public static Task WaitUntilExchangeDeletedAsync(string exchangeNameStr) + { + return WaitUntilAsync(() => { - return WaitUntilAsync(() => - { - return CheckExchangeAsync(exchangeNameStr, checkExisting: false); - }); - } + return CheckExchangeAsync(exchangeNameStr, checkExisting: false); + }); + } - public static Task WaitUntilBindingsBetweenExchangeAndQueueExistAsync(string exchangeNameStr, string queueNameStr) + public static Task WaitUntilBindingsBetweenExchangeAndQueueExistAsync(string exchangeNameStr, string queueNameStr) + { + return WaitUntilAsync(() => { - return WaitUntilAsync(() => - { - return CheckBindingsBetweenExchangeAndQueueAsync(exchangeNameStr, queueNameStr, checkExisting: true); - }); - } + return CheckBindingsBetweenExchangeAndQueueAsync(exchangeNameStr, queueNameStr, checkExisting: true); + }); + } - public static Task WaitUntilBindingsBetweenExchangeAndQueueDontExistAsync(string exchangeNameStr, string queueNameStr) + public static Task WaitUntilBindingsBetweenExchangeAndQueueDontExistAsync(string exchangeNameStr, string queueNameStr) + { + return WaitUntilAsync(() => { - return WaitUntilAsync(() => - { - return CheckBindingsBetweenExchangeAndQueueAsync(exchangeNameStr, queueNameStr, checkExisting: false); - }); - } + return CheckBindingsBetweenExchangeAndQueueAsync(exchangeNameStr, queueNameStr, checkExisting: false); + }); + } - public static Task WaitUntilBindingsBetweenExchangeAndQueueExistWithArgsAsync(string exchangeNameStr, string queueNameStr, - Dictionary args) + public static Task WaitUntilBindingsBetweenExchangeAndQueueExistWithArgsAsync(string exchangeNameStr, string queueNameStr, + Dictionary args) + { + return WaitUntilAsync(() => { - return WaitUntilAsync(() => - { - return CheckBindingsBetweenExchangeAndQueueAsync(exchangeNameStr, queueNameStr, - args: args, checkExisting: true); - }); - } + return CheckBindingsBetweenExchangeAndQueueAsync(exchangeNameStr, queueNameStr, + args: args, checkExisting: true); + }); + } - public static Task WaitUntilBindingsBetweenExchangeAndQueueDontExistWithArgsAsync(string exchangeNameStr, string queueNameStr, - Dictionary args) + public static Task WaitUntilBindingsBetweenExchangeAndQueueDontExistWithArgsAsync(string exchangeNameStr, string queueNameStr, + Dictionary args) + { + return WaitUntilAsync(() => { - return WaitUntilAsync(() => - { - return CheckBindingsBetweenExchangeAndQueueAsync(exchangeNameStr, queueNameStr, - args: args, checkExisting: false); - }); - } + return CheckBindingsBetweenExchangeAndQueueAsync(exchangeNameStr, queueNameStr, + args: args, checkExisting: false); + }); + } - public static Task WaitUntilBindingsBetweenExchangeAndExchangeExistAsync(string sourceExchangeNameStr, string destinationExchangeNameStr) + public static Task WaitUntilBindingsBetweenExchangeAndExchangeExistAsync(string sourceExchangeNameStr, string destinationExchangeNameStr) + { + return WaitUntilAsync(() => { - return WaitUntilAsync(() => - { - return CheckBindingsBetweenExchangeAndExchangeAsync(sourceExchangeNameStr, destinationExchangeNameStr, checkExisting: true); - }); - } + return CheckBindingsBetweenExchangeAndExchangeAsync(sourceExchangeNameStr, destinationExchangeNameStr, checkExisting: true); + }); + } - public static Task WaitUntilBindingsBetweenExchangeAndExchangeDontExistAsync(string sourceExchangeNameStr, string destinationExchangeNameStr) + public static Task WaitUntilBindingsBetweenExchangeAndExchangeDontExistAsync(string sourceExchangeNameStr, string destinationExchangeNameStr) + { + return WaitUntilAsync(() => { - return WaitUntilAsync(() => - { - return CheckBindingsBetweenExchangeAndExchangeAsync(sourceExchangeNameStr, destinationExchangeNameStr, checkExisting: false); - }); - } + return CheckBindingsBetweenExchangeAndExchangeAsync(sourceExchangeNameStr, destinationExchangeNameStr, checkExisting: false); + }); + } - public static Task WaitUntilQueueMessageCount(string queueNameStr, long messageCount) + public static Task WaitUntilQueueMessageCount(string queueNameStr, long messageCount) + { + return WaitUntilAsync(async () => { - return WaitUntilAsync(async () => - { - long queueMessageCount = await GetQueueMessageCountAsync(queueNameStr); - return messageCount == queueMessageCount; - }, retries: 20); - } + long queueMessageCount = await GetQueueMessageCountAsync(queueNameStr); + return messageCount == queueMessageCount; + }, retries: 20); + } - private static async Task CheckConnectionAsync(string connectionName, bool checkOpened = true) - { - bool rv = true; + private static async Task CheckConnectionAsync(string connectionName, bool checkOpened = true) + { + bool rv = true; - var managementUri = new Uri("http://localhost:15672"); - using var managementClient = new ManagementClient(managementUri, "guest", "guest"); + var managementUri = new Uri("http://localhost:15672"); + using var managementClient = new ManagementClient(managementUri, "guest", "guest"); - IReadOnlyList connections = await managementClient.GetConnectionsAsync(); - rv = connections.Any(conn => + IReadOnlyList connections = await managementClient.GetConnectionsAsync(); + rv = connections.Any(conn => + { + if (conn.ClientProperties is not null) { - if (conn.ClientProperties is not null) + if (conn.ClientProperties.TryGetValue("connection_name", out object? connectionNameObj)) { - if (conn.ClientProperties.TryGetValue("connection_name", out object? connectionNameObj)) + if (connectionNameObj is not null) { - if (connectionNameObj is not null) - { - string connName = (string)connectionNameObj; - return connName.Contains(connectionName); - } + string connName = (string)connectionNameObj; + return connName.Contains(connectionName); } } + } - return false; - }); + return false; + }); - if (false == checkOpened) - { - return !rv; - } - else - { - return rv; - } + if (false == checkOpened) + { + return !rv; } - - private static async Task KillConnectionAsync(string connectionName) + else { - var managementUri = new Uri("http://localhost:15672"); - using var managementClient = new ManagementClient(managementUri, "guest", "guest"); + return rv; + } + } - IReadOnlyList connections = await managementClient.GetConnectionsAsync(); + private static async Task KillConnectionAsync(string connectionName) + { + var managementUri = new Uri("http://localhost:15672"); + using var managementClient = new ManagementClient(managementUri, "guest", "guest"); - // we kill _only_ producer and consumer connections - // leave the locator up and running to delete the stream - IEnumerable filteredConnections = connections.Where(conn => + IReadOnlyList connections = await managementClient.GetConnectionsAsync(); + + // we kill _only_ producer and consumer connections + // leave the locator up and running to delete the stream + IEnumerable filteredConnections = connections.Where(conn => + { + if (conn.ClientProperties is not null) { - if (conn.ClientProperties is not null) + if (conn.ClientProperties.TryGetValue("connection_name", out object? connectionNameObj)) { - if (conn.ClientProperties.TryGetValue("connection_name", out object? connectionNameObj)) + if (connectionNameObj is not null) { - if (connectionNameObj is not null) - { - string connName = (string)connectionNameObj; - return connName.Contains(connectionName); - } + string connName = (string)connectionNameObj; + return connName.Contains(connectionName); } } + } - return false; - }); + return false; + }); - int killed = 0; - foreach (Connection conn in filteredConnections) + int killed = 0; + foreach (Connection conn in filteredConnections) + { + try { - try - { - await managementClient.CloseConnectionAsync(conn); - } - catch (UnexpectedHttpStatusCodeException ex) + await managementClient.CloseConnectionAsync(conn); + } + catch (UnexpectedHttpStatusCodeException ex) + { + if (ex.StatusCode != HttpStatusCode.NotFound) { - if (ex.StatusCode != HttpStatusCode.NotFound) - { - throw; - } + throw; } - - killed += 1; } - return killed; + killed += 1; } - private static async Task GetQueueMessageCountAsync(string queueNameStr) - { - var managementUri = new Uri("http://localhost:15672"); - using var managementClient = new ManagementClient(managementUri, "guest", "guest"); + return killed; + } - var queueName = new QueueName(queueNameStr, "/"); - Queue queue = await managementClient.GetQueueAsync(queueName); - return queue.MessagesReady; - } + private static async Task GetQueueMessageCountAsync(string queueNameStr) + { + var managementUri = new Uri("http://localhost:15672"); + using var managementClient = new ManagementClient(managementUri, "guest", "guest"); - private static async Task CheckExchangeAsync(string exchangeNameStr, bool checkExisting = true) - { - // Assume success - bool rv = true; + var queueName = new QueueName(queueNameStr, "/"); + Queue queue = await managementClient.GetQueueAsync(queueName); + return queue.MessagesReady; + } + + private static async Task CheckExchangeAsync(string exchangeNameStr, bool checkExisting = true) + { + // Assume success + bool rv = true; - var managementUri = new Uri("http://localhost:15672"); - using var managementClient = new ManagementClient(managementUri, "guest", "guest"); + var managementUri = new Uri("http://localhost:15672"); + using var managementClient = new ManagementClient(managementUri, "guest", "guest"); - var exchangeName = new ExchangeName(exchangeNameStr, "/"); - try + var exchangeName = new ExchangeName(exchangeNameStr, "/"); + try + { + Exchange? exchange = await managementClient.GetExchangeAsync(exchangeName); + if (checkExisting) + { + rv = exchange is not null; + } + else + { + rv = exchange is null; + } + } + catch (UnexpectedHttpStatusCodeException ex) + { + if (ex.StatusCode == HttpStatusCode.NotFound) { - Exchange? exchange = await managementClient.GetExchangeAsync(exchangeName); if (checkExisting) { - rv = exchange is not null; + rv = false; } else { - rv = exchange is null; + rv = true; } } - catch (UnexpectedHttpStatusCodeException ex) + else { - if (ex.StatusCode == HttpStatusCode.NotFound) - { - if (checkExisting) - { - rv = false; - } - else - { - rv = true; - } - } - else - { - throw; - } + throw; } - - return rv; } - private static async Task CheckQueueAsync(string queueNameStr, bool checkExisting = true) - { - // Assume success - bool rv = true; + return rv; + } - var managementUri = new Uri("http://localhost:15672"); - using var managementClient = new ManagementClient(managementUri, "guest", "guest"); + private static async Task CheckQueueAsync(string queueNameStr, bool checkExisting = true) + { + // Assume success + bool rv = true; - var queueName = new QueueName(queueNameStr, "/"); - try + var managementUri = new Uri("http://localhost:15672"); + using var managementClient = new ManagementClient(managementUri, "guest", "guest"); + + var queueName = new QueueName(queueNameStr, "/"); + try + { + Queue? queue = await managementClient.GetQueueAsync(queueName); + if (checkExisting) + { + rv = queue is not null; + } + else + { + rv = queue is null; + } + } + catch (UnexpectedHttpStatusCodeException ex) + { + if (ex.StatusCode == HttpStatusCode.NotFound) { - Queue? queue = await managementClient.GetQueueAsync(queueName); if (checkExisting) { - rv = queue is not null; + rv = false; } else { - rv = queue is null; + rv = true; } } - catch (UnexpectedHttpStatusCodeException ex) + else { - if (ex.StatusCode == HttpStatusCode.NotFound) + throw; + } + } + + return rv; + } + + private static async Task CheckBindingsBetweenExchangeAndQueueAsync(string exchangeNameStr, string queueNameStr, + Dictionary? args = null, bool checkExisting = true) + { + // Assume success + bool rv = true; + + var managementUri = new Uri("http://localhost:15672"); + using var managementClient = new ManagementClient(managementUri, "guest", "guest"); + + var exchangeName = new ExchangeName(exchangeNameStr, "/"); + var queueName = new QueueName(queueNameStr, "/"); + try + { + IReadOnlyList bindings = await managementClient.GetQueueBindingsAsync(exchangeName, queueName); + if (checkExisting) + { + if (args is not null) { - if (checkExisting) + // We're checking that arguments are equivalent, too + foreach (Binding b in bindings) { - rv = false; - } - else - { - rv = true; + if (b.Arguments is null) + { + rv = false; + break; + } + + // Check only the key to avoid conversion value problems + // on the test is enough to avoid to put the same key + // at some point we could add keyValuePair.Value == keyValuePairResult.Value + // keyValuePairResult.Value is a json object + IEnumerable results = b.Arguments.Keys.Intersect(args.Keys, StringComparer.OrdinalIgnoreCase); + if (results.Count() == args.Count) + { + rv = true; + break; + } } } else { - throw; + if (bindings.Count == 0) + { + rv = false; + } } } - - return rv; - } - - private static async Task CheckBindingsBetweenExchangeAndQueueAsync(string exchangeNameStr, string queueNameStr, - Dictionary? args = null, bool checkExisting = true) - { - // Assume success - bool rv = true; - - var managementUri = new Uri("http://localhost:15672"); - using var managementClient = new ManagementClient(managementUri, "guest", "guest"); - - var exchangeName = new ExchangeName(exchangeNameStr, "/"); - var queueName = new QueueName(queueNameStr, "/"); - try + else { - IReadOnlyList bindings = await managementClient.GetQueueBindingsAsync(exchangeName, queueName); - if (checkExisting) + if (args is not null) { - if (args is not null) + bool foundMatchingBinding = false; + // We're checking that no bindings have the passed-in args + // So, if we go through all bindings and all args are different, + // we can assume the binding we're checking for is gone + foreach (Binding b in bindings) { - // We're checking that arguments are equivalent, too - foreach (Binding b in bindings) + if (b.Arguments is not null) { - if (b.Arguments is null) - { - rv = false; - break; - } - // Check only the key to avoid conversion value problems // on the test is enough to avoid to put the same key // at some point we could add keyValuePair.Value == keyValuePairResult.Value @@ -379,121 +419,108 @@ private static async Task CheckBindingsBetweenExchangeAndQueueAsync(string IEnumerable results = b.Arguments.Keys.Intersect(args.Keys, StringComparer.OrdinalIgnoreCase); if (results.Count() == args.Count) { - rv = true; + foundMatchingBinding = true; break; } } } - else + + if (foundMatchingBinding) { - if (bindings.Count == 0) - { - rv = false; - } + rv = false; } } else { - if (args is not null) - { - bool foundMatchingBinding = false; - // We're checking that no bindings have the passed-in args - // So, if we go through all bindings and all args are different, - // we can assume the binding we're checking for is gone - foreach (Binding b in bindings) - { - if (b.Arguments is not null) - { - // Check only the key to avoid conversion value problems - // on the test is enough to avoid to put the same key - // at some point we could add keyValuePair.Value == keyValuePairResult.Value - // keyValuePairResult.Value is a json object - IEnumerable results = b.Arguments.Keys.Intersect(args.Keys, StringComparer.OrdinalIgnoreCase); - if (results.Count() == args.Count) - { - foundMatchingBinding = true; - break; - } - } - } - - if (foundMatchingBinding) - { - rv = false; - } - } - else + if (bindings.Count > 0) { - if (bindings.Count > 0) - { - rv = false; - } + rv = false; } } } - catch (UnexpectedHttpStatusCodeException ex) + } + catch (UnexpectedHttpStatusCodeException ex) + { + if (ex.StatusCode == HttpStatusCode.NotFound) { - if (ex.StatusCode == HttpStatusCode.NotFound) - { - if (checkExisting) - { - rv = false; - } - } - else + if (checkExisting) { - throw; + rv = false; } } - - return rv; + else + { + throw; + } } - private static async Task CheckBindingsBetweenExchangeAndExchangeAsync(string sourceExchangeNameStr, string destinationExchangeNameStr, - bool checkExisting = true) - { - // Assume success - bool rv = true; + return rv; + } - var managementUri = new Uri("http://localhost:15672"); - using var managementClient = new ManagementClient(managementUri, "guest", "guest"); + private static async Task CheckBindingsBetweenExchangeAndExchangeAsync(string sourceExchangeNameStr, string destinationExchangeNameStr, + bool checkExisting = true) + { + // Assume success + bool rv = true; - var sourceExchangeName = new ExchangeName(sourceExchangeNameStr, "/"); - var destinationExchangeName = new ExchangeName(destinationExchangeNameStr, "/"); - try + var managementUri = new Uri("http://localhost:15672"); + using var managementClient = new ManagementClient(managementUri, "guest", "guest"); + + var sourceExchangeName = new ExchangeName(sourceExchangeNameStr, "/"); + var destinationExchangeName = new ExchangeName(destinationExchangeNameStr, "/"); + try + { + IReadOnlyList bindings = await managementClient.GetExchangeBindingsAsync(sourceExchangeName, destinationExchangeName); + if (checkExisting) { - IReadOnlyList bindings = await managementClient.GetExchangeBindingsAsync(sourceExchangeName, destinationExchangeName); - if (checkExisting) - { - if (bindings.Count == 0) - { - rv = false; - } - } - else + if (bindings.Count == 0) { - if (bindings.Count > 0) - { - rv = false; - } + rv = false; } } - catch (UnexpectedHttpStatusCodeException ex) + else { - if (ex.StatusCode == HttpStatusCode.NotFound) + if (bindings.Count > 0) { - if (checkExisting) - { - rv = false; - } + rv = false; } - else + } + } + catch (UnexpectedHttpStatusCodeException ex) + { + if (ex.StatusCode == HttpStatusCode.NotFound) + { + if (checkExisting) { - throw; + rv = false; } } + else + { + throw; + } + } - return rv; + return rv; + } + + private static bool InitIsRunningInCI() + { + if (bool.TryParse(Environment.GetEnvironmentVariable("CI"), out bool ci)) + { + if (ci == true) + { + return true; + } + } + else if (bool.TryParse(Environment.GetEnvironmentVariable("GITHUB_ACTIONS"), out ci)) + { + if (ci == true) + { + return true; + } } + + return false; } }