Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement the builders #7

Merged
merged 3 commits into from
Jun 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1 +1,6 @@
# rabbitmq-amqp-dotnet-client
# RabbitMQ Amqp1.0 DotNet Client

See the [internal documentation](https://docs.google.com/document/d/1afO2ugGpTIZYUeXH_0GtMxedV51ZzmsbC3-mRdoSI_o/edit#heading=h.kqd38uu4iku)



38 changes: 32 additions & 6 deletions RabbitMQ.AMQP.Client/Impl/AmqpConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,36 @@ public void VisitQueues(List<QueueSpec> queueSpec)
}
}

public class AmqpConnection(ConnectionSettings connectionSettings) : IConnection

/// <summary>
/// AmqpConnection is the concrete implementation of <see cref="IConnection"/>
/// It is a wrapper around the AMQP.Net Lite <see cref="Connection"/> class
/// </summary>
public class AmqpConnection : IConnection
{
// The native AMQP.Net Lite connection
private Connection? _nativeConnection;
private readonly AmqpManagement _management = new();
private readonly RecordingTopologyListener _recordingTopologyListener = new();
private readonly ConnectionSettings _connectionSettings;

/// <summary>
/// Creates a new instance of <see cref="AmqpConnection"/>
/// </summary>
/// <param name="connectionSettings"></param>
/// <returns></returns>
public static async Task<AmqpConnection> CreateAsync(ConnectionSettings connectionSettings)
{
var connection = new AmqpConnection(connectionSettings);
await connection.EnsureConnectionAsync();
return connection;
}

private AmqpConnection(ConnectionSettings connectionSettings)
{
_connectionSettings = connectionSettings;
}


public IManagement Management()
{
Expand All @@ -48,13 +73,13 @@ internal async Task EnsureConnectionAsync()

var open = new Open
{
HostName = $"vhost:{connectionSettings.VirtualHost()}",
HostName = $"vhost:{_connectionSettings.VirtualHost()}",
Properties = new Fields()
{
[new Symbol("connection_name")] = connectionSettings.ConnectionName(),
[new Symbol("connection_name")] = _connectionSettings.ConnectionName(),
}
};
_nativeConnection = await Connection.Factory.CreateAsync(connectionSettings.Address, open);
_nativeConnection = await Connection.Factory.CreateAsync(_connectionSettings.Address, open);
_management.Init(
new AmqpManagementParameters(this).TopologyListener(_recordingTopologyListener));

Expand All @@ -80,6 +105,7 @@ [new Symbol("connection_name")] = connectionSettings.ConnectionName(),
}
}


private void OnNewStatus(Status newStatus, Error? error)
{
if (Status == newStatus) return;
Expand All @@ -99,7 +125,7 @@ private ClosedCallback MaybeRecoverConnection()
$"{sender} {error} {Status} " +
$"{_nativeConnection!.IsClosed}");

if (!connectionSettings.RecoveryConfiguration.IsActivate())
if (!_connectionSettings.RecoveryConfiguration.IsActivate())
{
OnNewStatus(Status.Closed, Utils.ConvertError(error));
return;
Expand All @@ -115,7 +141,7 @@ private ClosedCallback MaybeRecoverConnection()
Trace.WriteLine(TraceLevel.Information, "Recovering connection");
await EnsureConnectionAsync();
Trace.WriteLine(TraceLevel.Information, "Recovering topology");
if (connectionSettings.RecoveryConfiguration.IsTopologyActive())
if (_connectionSettings.RecoveryConfiguration.IsTopologyActive())
{
_recordingTopologyListener.Accept(new Visitor(_management));
}
Expand Down
10 changes: 8 additions & 2 deletions RabbitMQ.AMQP.Client/Impl/AmqpManagement.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
using System.Collections.Concurrent;
using System.Diagnostics;
using System.Threading.Tasks.Sources;
using Amqp;
using Amqp.Framing;
using Amqp.Types;
Expand All @@ -9,6 +7,11 @@

namespace RabbitMQ.AMQP.Client.Impl;

/// <summary>
/// AmqpManagement implements the IManagement interface and is responsible for managing the AMQP resources.
/// RabbitMQ uses AMQP end point: "/management" to manage the resources like queues, exchanges, and bindings.
/// The management endpoint works like an HTTP RPC endpoint where the client sends a request to the server
/// </summary>
public class AmqpManagement : IManagement
{
private readonly ConcurrentDictionary<string, TaskCompletionSource<Message>> _requests = new();
Expand Down Expand Up @@ -77,6 +80,9 @@ internal void Init(AmqpManagementParameters parameters)
_recordingTopologyListener = parameters.TopologyListener();

EnsureSenderLink();
// by the Management implementation the sender link _must_ be open before the receiver link
// this sleep is to ensure that the sender link is open before the receiver link
// TODO: find a better way to ensure that the sender link is open before the receiver link
Thread.Sleep(500);
EnsureReceiverLink();
_ = Task.Run(async () =>
Expand Down
5 changes: 5 additions & 0 deletions RabbitMQ.AMQP.Client/Impl/AmqpQueueSpecification.cs
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,11 @@ public uint ConsumerCount()
}
}

/// <summary>
/// AmqpQueueSpecification is a concrete implementation of IQueueSpecification
/// It contains the necessary information to declare a queue on the broker
/// </summary>
/// <param name="management"></param>
public class AmqpQueueSpecification(AmqpManagement management) : IQueueSpecification
{
private string? _name;
Expand Down
27 changes: 23 additions & 4 deletions RabbitMQ.AMQP.Client/Impl/ConnectionSettings.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,19 @@ public class ConnectionSettingBuilder
private string _scheme = "AMQP";
private string _connection = "AMQP.NET";
private string _virtualHost = "/";
private IRecoveryConfiguration _recoveryConfiguration = new RecoveryConfiguration();

private IRecoveryConfiguration _recoveryConfiguration = Impl.RecoveryConfiguration.Create();


private ConnectionSettingBuilder()
{
}

public static ConnectionSettingBuilder Create()
{
return new ConnectionSettingBuilder();
}


public ConnectionSettingBuilder Host(string host)
{
_host = host;
Expand Down Expand Up @@ -173,11 +183,20 @@ public override int GetHashCode()
return Address.GetHashCode();
}

public RecoveryConfiguration RecoveryConfiguration { get; set; } = new RecoveryConfiguration();
public RecoveryConfiguration RecoveryConfiguration { get; set; } = RecoveryConfiguration.Create();
}

public class RecoveryConfiguration() : IRecoveryConfiguration
public class RecoveryConfiguration : IRecoveryConfiguration
{
public static RecoveryConfiguration Create()
{
return new RecoveryConfiguration();
}

private RecoveryConfiguration()
{
}

private bool _active = true;
private bool _topology = false;

Expand Down
43 changes: 20 additions & 23 deletions Tests/ConnectionRecoverTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,10 @@ public class ConnectionRecoverTests
public async void NormalCloseTheStatusShouldBeCorrectAndErrorNull(bool activeRecovery)
{
var connectionName = Guid.NewGuid().ToString();
AmqpConnection connection = new(
new ConnectionSettingBuilder().ConnectionName(connectionName).RecoveryConfiguration(
new RecoveryConfiguration().Activated(activeRecovery).Topology(false)).Build());
var connection = await AmqpConnection.CreateAsync(
ConnectionSettingBuilder.Create().ConnectionName(connectionName).RecoveryConfiguration(
RecoveryConfiguration.Create().Activated(activeRecovery).Topology(false)).Build());

var completion = new TaskCompletionSource();
var listFromStatus = new List<Status>();
var listToStatus = new List<Status>();
Expand All @@ -38,21 +39,18 @@ public async void NormalCloseTheStatusShouldBeCorrectAndErrorNull(bool activeRec
await connection.CloseAsync();
Assert.Equal(Status.Closed, connection.Status);
await completion.Task.WaitAsync(TimeSpan.FromSeconds(5));
Assert.Equal(Status.Closed, listFromStatus[0]);
Assert.Equal(Status.Open, listToStatus[0]);
Assert.Equal(Status.Open, listFromStatus[0]);
Assert.Equal(Status.Closed, listToStatus[0]);
Assert.Null(listError[0]);
Assert.Equal(Status.Open, listFromStatus[1]);
Assert.Equal(Status.Closed, listToStatus[1]);
Assert.Null(listError[1]);
}

[Fact]
public async void UnexpectedCloseTheStatusShouldBeCorrectAndErrorNotNull()
{
var connectionName = Guid.NewGuid().ToString();
AmqpConnection connection = new(
new ConnectionSettingBuilder().ConnectionName(connectionName).RecoveryConfiguration(
new RecoveryConfiguration().Activated(true).Topology(false)).Build());
var connection = await AmqpConnection.CreateAsync(
ConnectionSettingBuilder.Create().ConnectionName(connectionName).RecoveryConfiguration(
RecoveryConfiguration.Create().Activated(true).Topology(false)).Build());
var resetEvent = new ManualResetEvent(false);
var listFromStatus = new List<Status>();
var listToStatus = new List<Status>();
Expand All @@ -70,20 +68,19 @@ public async void UnexpectedCloseTheStatusShouldBeCorrectAndErrorNotNull()
Assert.Equal(Status.Open, connection.Status);
SystemUtils.WaitUntilConnectionIsKilled(connectionName);
resetEvent.WaitOne(TimeSpan.FromSeconds(5));
Assert.Equal(Status.Closed, listFromStatus[0]);
Assert.Equal(Status.Open, listToStatus[0]);
Assert.Null(listError[0]);
Assert.Equal(Status.Open, listFromStatus[1]);
Assert.Equal(Status.Reconneting, listToStatus[1]);
Assert.NotNull(listError[1]);
Assert.Equal(Status.Reconneting, listFromStatus[2]);
Assert.Equal(Status.Open, listToStatus[2]);
Assert.Null(listError[2]);
SystemUtils.WaitUntil(() => (listFromStatus.Count >= 2));
Assert.Equal(Status.Open, listFromStatus[0]);
Assert.Equal(Status.Reconneting, listToStatus[0]);
Assert.NotNull(listError[0]);
Assert.Equal(Status.Reconneting, listFromStatus[1]);
Assert.Equal(Status.Open, listToStatus[1]);
Assert.Null(listError[1]);
resetEvent.Reset();
resetEvent.Set();
await connection.CloseAsync();
resetEvent.WaitOne(TimeSpan.FromSeconds(5));
Assert.Equal(Status.Open, listFromStatus[3]);
Assert.Equal(Status.Closed, listToStatus[3]);
Assert.Null(listError[3]);
Assert.Equal(Status.Open, listFromStatus[2]);
Assert.Equal(Status.Closed, listToStatus[2]);
Assert.Null(listError[2]);
}
}
45 changes: 14 additions & 31 deletions Tests/ConnectionTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public void ValidateAddress()
[Fact]
public void ValidateAddressBuilder()
{
var address = new ConnectionSettingBuilder()
var address = ConnectionSettingBuilder.Create()
.Host("localhost")
.Port(5672)
.VirtualHost("v1")
Expand All @@ -57,39 +57,22 @@ public void ValidateAddressBuilder()
[Fact]
public async void RaiseErrorsIfTheParametersAreNotValid()
{
AmqpConnection connection = new(new
ConnectionSettingBuilder().VirtualHost("wrong_vhost").Build());
await Assert.ThrowsAsync<ConnectionException>(async () => await connection.ConnectAsync());
Assert.Equal(Status.Closed, connection.Status);


connection = new AmqpConnection(new
ConnectionSettingBuilder().Host("wrong_host").Build());
await Assert.ThrowsAsync<SocketException>(async () => await connection.ConnectAsync());
Assert.Equal(Status.Closed, connection.Status);


connection = new AmqpConnection(new
ConnectionSettingBuilder().Password("wrong_password").Build());
await Assert.ThrowsAsync<ConnectionException>(async () => await connection.ConnectAsync());
Assert.Equal(Status.Closed, connection.Status);

await Assert.ThrowsAsync<ConnectionException>(async () =>
await AmqpConnection.CreateAsync(ConnectionSettingBuilder.Create().VirtualHost("wrong_vhost").Build()));

await Assert.ThrowsAsync<SocketException>(async () =>
await AmqpConnection.CreateAsync(ConnectionSettingBuilder.Create().Host("wrong_host").Build()));

connection = new AmqpConnection(new
ConnectionSettingBuilder().User("wrong_user").Build());
await Assert.ThrowsAsync<ConnectionException>(async () => await connection.ConnectAsync());
Assert.Equal(Status.Closed, connection.Status);


connection = new AmqpConnection(new
ConnectionSettingBuilder().Port(1234).Build());
await Assert.ThrowsAsync<SocketException>(async () => await connection.ConnectAsync());
Assert.Equal(Status.Closed, connection.Status);
await Assert.ThrowsAsync<ConnectionException>(async () =>
await AmqpConnection.CreateAsync(ConnectionSettingBuilder.Create().Password("wrong_password").Build()));


connection = new AmqpConnection(new
ConnectionSettingBuilder().Scheme("wrong_scheme").Build());
await Assert.ThrowsAsync<ConnectionException>(async () => await connection.ConnectAsync());
Assert.Equal(Status.Closed, connection.Status);
await Assert.ThrowsAsync<ConnectionException>(async () =>
await AmqpConnection.CreateAsync(ConnectionSettingBuilder.Create().User("wrong_user").Build()));

await Assert.ThrowsAsync<SocketException>(async () =>
await AmqpConnection.CreateAsync( ConnectionSettingBuilder.Create().Port(1234).Build()));

}
}
14 changes: 6 additions & 8 deletions Tests/ManagementTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ await Assert.ThrowsAsync<ModelException>(async () =>
[InlineData(QueueType.STREAM)]
public async void DeclareQueueWithNoNameShouldGenerateClientSideName(QueueType type)
{
AmqpConnection connection = new(new ConnectionSettingBuilder().Build());
var connection = await AmqpConnection.CreateAsync(ConnectionSettingBuilder.Create().Build());
await connection.ConnectAsync();
var management = connection.Management();
var queueInfo = await management.Queue().Type(type).Declare();
Expand All @@ -153,13 +153,11 @@ public async void DeclareQueueWithNoNameShouldGenerateClientSideName(QueueType t
public async void DeclareQueueWithQueueInfoValidation(
bool durable, bool autoDelete, bool exclusive, QueueType type)
{
AmqpConnection connection = new(new ConnectionSettingBuilder().Build());
var connection = await AmqpConnection.CreateAsync(ConnectionSettingBuilder.Create().Build());
await connection.ConnectAsync();
var management = connection.Management();
var queueInfo = await management.Queue().Name("validate_queue_info").
AutoDelete(autoDelete).
Exclusive(exclusive).
Type(type)
var queueInfo = await management.Queue().Name("validate_queue_info").AutoDelete(autoDelete).Exclusive(exclusive)
.Type(type)
.Declare();
Assert.Equal("validate_queue_info", queueInfo.Name());
Assert.Equal((ulong)0, queueInfo.MessageCount());
Expand All @@ -169,7 +167,7 @@ public async void DeclareQueueWithQueueInfoValidation(
Assert.NotNull(queueInfo.Leader());
Assert.Equal(queueInfo.Durable(), durable);
Assert.Equal(queueInfo.AutoDelete(), autoDelete);
Assert.Equal(queueInfo.Exclusive(),exclusive);
Assert.Equal(queueInfo.Exclusive(), exclusive);
await management.QueueDeletion().Delete("validate_queue_info");
await connection.CloseAsync();
Assert.Equal(Status.Closed, management.Status);
Expand All @@ -178,7 +176,7 @@ public async void DeclareQueueWithQueueInfoValidation(
[Fact]
public async void TopologyCountShouldFollowTheQueueDeclaration()
{
AmqpConnection connection = new(new ConnectionSettingBuilder().Build());
var connection = await AmqpConnection.CreateAsync(ConnectionSettingBuilder.Create().Build());
await connection.ConnectAsync();
var management = connection.Management();
for (var i = 1; i < 7; i++)
Expand Down
14 changes: 5 additions & 9 deletions docs/Examples/GettingStarted/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,11 @@

Trace.WriteLine(TraceLevel.Information, "Starting");
var connectionName = Guid.NewGuid().ToString();
AmqpConnection connection = new(
new ConnectionSettingBuilder().
ConnectionName(connectionName).
RecoveryConfiguration(new RecoveryConfiguration().
Activated(true).
Topology(true)).
Build());

await connection.ConnectAsync();
var connection = await AmqpConnection.CreateAsync(
ConnectionSettingBuilder.Create().ConnectionName(connectionName)
.RecoveryConfiguration(RecoveryConfiguration.Create().Activated(true).Topology(true)).Build());

Trace.WriteLine(TraceLevel.Information, "Connected");
var management = connection.Management();
await management.Queue($"my-first-queue").Declare();
Expand All @@ -30,4 +26,4 @@
Console.WriteLine("Press any key to close connection");
Console.ReadKey();
await connection.CloseAsync();
Trace.WriteLine(TraceLevel.Information, "Closed");
Trace.WriteLine(TraceLevel.Information, "Closed");