Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement consumer part #22

Merged
merged 7 commits into from
Jul 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 7 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,14 @@ You can find an example in: `docs/Examples/GettingStarted`
## TODO

- [x] Declare queues
- [ ] Declare exchanges
- [ ] Declare bindings
- [x] Declare exchanges
- [x] Declare bindings
- [x] Simple Publish messages
- [x] Implement backpressure ( atm it is implemented with MaxInflightMessages `MaxInFlight(2000).`)
- [ ] Simple Consume messages
- [x] Implement backpressure (it is implemented with MaxInflightMessages `MaxInFlight(2000).`)
- [x] Simple Consume messages
- [ ] Complete the consumer part with `pause` and `unpause`
- [ ] Complete the binding/unbinding with the special characters
- [ ] Complete the queues/exchanges name with the special characters
- [ ] Implement metrics ( See `System.Diagnostics.DiagnosticSource` [Link](https://learn.microsoft.com/en-us/dotnet/core/diagnostics/metrics-instrumentation) )
- [x] Recovery connection on connection lost
- [x] Recovery management on connection lost
Expand Down
12 changes: 9 additions & 3 deletions RabbitMQ.AMQP.Client/IClosable.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,17 @@ public override string ToString()

public interface IClosable // TODO: Create an abstract class with the event and the State property
{
public State State { get; }

Task CloseAsync();

public delegate void LifeCycleCallBack(object sender, State previousState, State currentState, Error? failureCause);
}

public delegate void LifeCycleCallBack(object sender, State previousState, State currentState, Error? failureCause);

public interface IResourceStatus
{
public State State { get; }


event LifeCycleCallBack ChangeState;

}
12 changes: 10 additions & 2 deletions RabbitMQ.AMQP.Client/IConnection.cs
Original file line number Diff line number Diff line change
@@ -1,9 +1,17 @@
using System.Collections.ObjectModel;

namespace RabbitMQ.AMQP.Client;

public class ConnectionException(string? message) : Exception(message);

public interface IConnection
public interface IConnection : IResourceStatus, IClosable
{
IManagement Management();
Task ConnectAsync();

IPublisherBuilder PublisherBuilder();

IConsumerBuilder ConsumerBuilder();


public ReadOnlyCollection<IPublisher> GetPublishers();
}
30 changes: 30 additions & 0 deletions RabbitMQ.AMQP.Client/IConsumer.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
using Amqp.Listener;

namespace RabbitMQ.AMQP.Client;


public class ConsumerException(string message) : Exception(message);
public delegate void MessageHandler(IContext context, IMessage message);

public interface IConsumer : IResourceStatus, IClosable
{
void Pause();

long UnsettledMessageCount();

void Unpause();
}

public interface IMessageHandler
{
void Handle(Context context, IMessage message);
}

public interface IContext
{
void Accept();

void Discard();

void Requeue();
}
41 changes: 41 additions & 0 deletions RabbitMQ.AMQP.Client/IConsumerBuilder.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
namespace RabbitMQ.AMQP.Client;


public enum StreamOffsetSpecification
{
First,
Last,
Next
}

public interface IConsumerBuilder
{
IConsumerBuilder Queue(string queue);

IConsumerBuilder MessageHandler(MessageHandler handler);

IConsumerBuilder InitialCredits(int initialCredits);

IStreamOptions Stream();

IConsumer Build();

public interface IStreamOptions
{
IStreamOptions Offset(long offset);

// IStreamOptions offset(Instant timestamp);

IStreamOptions Offset(StreamOffsetSpecification specification);

IStreamOptions Offset(string interval);

IStreamOptions FilterValues(string[] values);

IStreamOptions FilterMatchUnfiltered(bool matchUnfiltered);

IConsumerBuilder Builder();
}


}
2 changes: 1 addition & 1 deletion RabbitMQ.AMQP.Client/IManagement.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ public class ModelException(string message) : Exception(message);

public class PreconditionFailedException(string message) : Exception(message);

public interface IManagement : IClosable
public interface IManagement : IResourceStatus, IClosable
{
IQueueSpecification Queue();
IQueueSpecification Queue(string name);
Expand Down
6 changes: 6 additions & 0 deletions RabbitMQ.AMQP.Client/IMessage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ namespace RabbitMQ.AMQP.Client;

public interface IMessage
{
// TODO: Complete the IMessage interface with all the properties
public object Body();

// properties
string MessageId();
IMessage MessageId(string id);
Expand All @@ -16,4 +18,8 @@ public interface IMessage
string Subject();
IMessage Subject(string subject);


public IMessage Annotation(string key, object value);

public object Annotation(string key);
}
2 changes: 1 addition & 1 deletion RabbitMQ.AMQP.Client/IPublisher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ public class OutcomeDescriptor(ulong code, string description, OutcomeState stat

public delegate void OutcomeDescriptorCallback(IMessage message, OutcomeDescriptor outcomeDescriptor);

public interface IPublisher : IClosable
public interface IPublisher : IResourceStatus, IClosable
{
Task Publish(IMessage message,
OutcomeDescriptorCallback outcomeCallback); // TODO: Add CancellationToken and callBack
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,9 @@ namespace RabbitMQ.AMQP.Client.Impl;

public class AmqpClosedException(string message) : Exception(message);

public abstract class AbstractClosable : IClosable
public abstract class AbstractResourceStatus : IResourceStatus
{
public State State { get; internal set; } = State.Closed;
public abstract Task CloseAsync();
protected void ThrowIfClosed()
{
if (State == State.Closed)
Expand All @@ -27,5 +26,5 @@ protected void OnNewStatus(State newState, Error? error)
ChangeState?.Invoke(this, oldStatus, newState, error);
}

public event IClosable.LifeCycleCallBack? ChangeState;
public event LifeCycleCallBack? ChangeState;
}
49 changes: 35 additions & 14 deletions RabbitMQ.AMQP.Client/Impl/AmqpConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ await Management.Queue(spec).Declare()
/// AmqpConnection is the concrete implementation of <see cref="IConnection"/>
/// It is a wrapper around the AMQP.Net Lite <see cref="Connection"/> class
/// </summary>
public class AmqpConnection : AbstractClosable, IConnection
public class AmqpConnection : AbstractResourceStatus, IConnection
{
private const string ConnectionNotRecoveredCode = "CONNECTION_NOT_RECOVERED";
private const string ConnectionNotRecoveredMessage = "Connection not recovered";
Expand All @@ -45,7 +45,9 @@ public class AmqpConnection : AbstractClosable, IConnection

private readonly AmqpManagement _management = new();
private readonly RecordingTopologyListener _recordingTopologyListener = new();
private readonly TaskCompletionSource<bool> _connectionCloseTaskCompletionSource = new(TaskCreationOptions.RunContinuationsAsynchronously);

private readonly TaskCompletionSource<bool> _connectionCloseTaskCompletionSource =
new(TaskCreationOptions.RunContinuationsAsynchronously);

private readonly ConnectionSettings _connectionSettings;
internal readonly AmqpSessionManagement NativePubSubSessions;
Expand All @@ -60,9 +62,12 @@ public class AmqpConnection : AbstractClosable, IConnection
/// They key is the publisher Id ( a Guid)
/// See <see cref="AmqpPublisher"/>
/// </summary>
internal ConcurrentDictionary<string, AmqpPublisher> Publishers { get; } = new();
internal ConcurrentDictionary<string, IPublisher> Publishers { get; } = new();

internal ConcurrentDictionary<string, IConsumer> Consumers { get; } = new();


public ReadOnlyCollection<AmqpPublisher> GetPublishers()
public ReadOnlyCollection<IPublisher> GetPublishers()
{
return Publishers.Values.ToList().AsReadOnly();
}
Expand All @@ -75,7 +80,7 @@ public ReadOnlyCollection<AmqpPublisher> GetPublishers()
/// </summary>
/// <param name="connectionSettings"></param>
/// <returns></returns>
public static async Task<AmqpConnection> CreateAsync(ConnectionSettings connectionSettings)
public static async Task<IConnection> CreateAsync(ConnectionSettings connectionSettings)
{
var connection = new AmqpConnection(connectionSettings);
await connection.ConnectAsync()
Expand Down Expand Up @@ -105,15 +110,26 @@ private void ResumeAllPublishers()
/// </summary>
private async Task CloseAllPublishers()
{
var cloned = new List<AmqpPublisher>(Publishers.Values);
var cloned = new List<IPublisher>(Publishers.Values);

foreach (AmqpPublisher publisher in cloned)
foreach (IPublisher publisher in cloned)
{
await publisher.CloseAsync()
.ConfigureAwait(false);
}
}

private async Task CloseAllConsumers()
{
var cloned = new List<IConsumer>(Consumers.Values);

foreach (IConsumer consumer in cloned)
{
await consumer.CloseAsync()
.ConfigureAwait(false);
}
}

private AmqpConnection(ConnectionSettings connectionSettings)
{
_connectionSettings = connectionSettings;
Expand All @@ -125,7 +141,12 @@ public IManagement Management()
return _management;
}

public Task ConnectAsync()
public IConsumerBuilder ConsumerBuilder()
{
return new AmqpConsumerBuilder(this);
}

private Task ConnectAsync()
{
EnsureConnection();
OnNewStatus(State.Open, null);
Expand Down Expand Up @@ -288,7 +309,7 @@ await _recordingTopologyListener.Accept(visitor)
{
_semaphoreClose.Release();
}

_connectionCloseTaskCompletionSource.SetResult(true);
};
}
Expand All @@ -306,14 +327,14 @@ public IPublisherBuilder PublisherBuilder()
}


public override async Task CloseAsync()
public async Task CloseAsync()
{
await _semaphoreClose.WaitAsync()
.ConfigureAwait(false);
try
{
await CloseAllPublishers()
.ConfigureAwait(false);
await CloseAllPublishers().ConfigureAwait(false);
await CloseAllConsumers().ConfigureAwait(false);

_recordingTopologyListener.Clear();

Expand All @@ -337,10 +358,10 @@ await _management.CloseAsync()
{
_semaphoreClose.Release();
}

await _connectionCloseTaskCompletionSource.Task.WaitAsync(TimeSpan.FromSeconds(10))
.ConfigureAwait(false);

OnNewStatus(State.Closed, null);
}

Expand Down
Loading