Skip to content

Commit

Permalink
Add documentation (#79)
Browse files Browse the repository at this point in the history
Signed-off-by: Gabriele Santomaggio <[email protected]>
  • Loading branch information
Gsantomaggio authored Oct 16, 2024
1 parent 49f5f16 commit 7cfdcbb
Show file tree
Hide file tree
Showing 7 changed files with 145 additions and 17 deletions.
22 changes: 13 additions & 9 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,22 +1,26 @@
# RabbitMQ AMQP 1.0 .NET Client

This library is meant to be used with RabbitMQ 4.0.
Still work in progress suitable for testing in pre-production environments
Suitable for testing in pre-production environments

## How to Run

- Start the broker with `./.ci/ubuntu/one-node/gha-setup.sh start`. Note that this has been tested on Ubuntu 22 with docker.
- Run the tests with ` dotnet test ./Build.csproj --logger "console;verbosity=detailed"`
- Stop RabbitMQ with `./.ci/ubuntu/one-node/gha-setup.sh stop`
## Install

## Getting Started
The client is distributed via [NuGet](https://www.nuget.org/packages/RabbitMQ.AMQP.Client/).

You can find an example in: `docs/Examples/GettingStarted`
## Examples

## Install
Inside the `docs/Examples` directory you can find examples of how to use the client.

The client is distributed via [NuGet](https://www.nuget.org/packages/RabbitMQ.AMQP.Client/).

## Documentation

- [Client Guide](https://www.rabbitmq.com/client-libraries/amqp-client-libraries)
- [API](https://rabbitmq.github.io/rabbitmq-amqp-dotnet-client/api/RabbitMQ.AMQP.Client.html)


## How to Run

- Start the broker with `./.ci/ubuntu/one-node/gha-setup.sh start`. Note that this has been tested on Ubuntu 22 with docker.
- Run the tests with ` dotnet test ./Build.csproj --logger "console;verbosity=detailed"`
- Stop RabbitMQ with `./.ci/ubuntu/one-node/gha-setup.sh stop`
66 changes: 66 additions & 0 deletions RabbitMQ.AMQP.Client/IRpcClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,23 +10,89 @@ public interface IRpcClientAddressBuilder : IAddressBuilder<IRpcClientAddressBui
IRpcClientBuilder RpcClient();
}

/// <summary>
/// IRpcClientBuilder is the interface for creating an RPC client.
/// See also <seealso cref="IRpcClient"/> and <seealso cref="IRpcServerBuilder"/>
/// </summary>
public interface IRpcClientBuilder
{
/// <summary>
/// Request address where the client sends requests.
/// The server consumes requests from this address.
/// </summary>
/// <returns></returns>
IRpcClientAddressBuilder RequestAddress();

/// <summary>
/// The queue from which requests are consumed.
/// if not set the client will create a temporary queue.
/// </summary>
/// <param name="replyToQueueName"> The queue name</param>
/// <returns></returns>
IRpcClientBuilder ReplyToQueue(string replyToQueueName);

IRpcClientBuilder ReplyToQueue(IQueueSpecification replyToQueue);

/// <summary>
/// Extracts the correlation id from the request message.
/// each message has a correlation id that is used to match the request with the response.
/// There are default implementations for the correlation id extractor.
/// With this method, you can provide a custom implementation.
/// </summary>
/// <param name="correlationIdExtractor"></param>
/// <returns></returns>
IRpcClientBuilder CorrelationIdExtractor(Func<IMessage, object>? correlationIdExtractor);

/// <summary>
/// Post processes the reply message before sending it to the server.
/// The object parameter is the correlation id extracted from the request message.
/// There are default implementations for the reply post processor that use the correlationId() field
/// to set the correlation id of the reply message.
/// With this method, you can provide a custom implementation.
/// </summary>
/// <param name="requestPostProcessor"></param>
/// <returns></returns>
IRpcClientBuilder RequestPostProcessor(Func<IMessage, object, IMessage>? requestPostProcessor);

/// <summary>
/// Client and Server must agree on the correlation id.
/// The client will provide the correlation id to send to the server.
/// If the default correlation id is not suitable, you can provide a custom correlation id supplier.
/// Be careful to provide a unique correlation id for each request.
/// </summary>
/// <param name="correlationIdSupplier"></param>
/// <returns></returns>

IRpcClientBuilder CorrelationIdSupplier(Func<object>? correlationIdSupplier);

/// <summary>
/// The time to wait for a reply from the server.
/// </summary>
/// <param name="timeout"></param>
/// <returns></returns>
IRpcClientBuilder Timeout(TimeSpan timeout);
/// <summary>
/// Build and return the RPC client.
/// </summary>
/// <returns></returns>
Task<IRpcClient> BuildAsync();
}

/// <summary>
/// IRpcClient is the interface for an RPC client.
/// See also <seealso cref="IRpcServer"/> and <seealso cref="IRpcClientBuilder"/>
/// </summary>
public interface IRpcClient : ILifeCycle
{
/// <summary>
/// PublishAsync sends a request message to the server and blocks the thread until the response is received.
/// The PublishAsync is thread-safe and can be called from multiple threads.
/// The Function returns the response message.
/// If the server does not respond within the timeout, the function throws a TimeoutException.
/// </summary>
/// <param name="message"> The request message</param>
/// <param name="cancellationToken">Cancellation token</param>
/// <returns></returns>
Task<IMessage> PublishAsync(IMessage message, CancellationToken cancellationToken = default);
}
}
22 changes: 21 additions & 1 deletion RabbitMQ.AMQP.Client/IRpcServer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,12 @@

namespace RabbitMQ.AMQP.Client
{
public delegate Task<IMessage> RpcHandler(IRpcServer.IContext context, IMessage request);

/// <summary>
/// IRpcServerBuilder is the interface for creating an RPC server.
/// The RPC server consumes requests from a queue and sends replies to a reply queue.
/// See also <seealso cref="IRpcServer"/> and <seealso cref="IRpcClientBuilder"/>
/// </summary>
public interface IRpcServerBuilder
{
/// <summary>
Expand Down Expand Up @@ -45,9 +49,25 @@ public interface IRpcServerBuilder
/// <returns></returns>
IRpcServerBuilder Handler(RpcHandler handler);

/// <summary>
/// Build and return the RPC server.
/// </summary>
/// <returns></returns>
Task<IRpcServer> BuildAsync();
}

/// <summary>
/// Event handler for handling RPC requests.
/// </summary>
public delegate Task<IMessage> RpcHandler(IRpcServer.IContext context, IMessage request);

/// <summary>
/// IRpcServer interface for creating an RPC server.
/// The RPC is simulated by sending a request message and receiving a reply message.
/// Where the client sends the queue where wants to receive the reply.
/// RPC client ---> request queue ---> RPC server ---> reply queue ---> RPC client
/// See also <seealso cref="IRpcClient"/>
/// </summary>
public interface IRpcServer : ILifeCycle
{

Expand Down
6 changes: 0 additions & 6 deletions RabbitMQ.AMQP.Client/Impl/AmqpRpcClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -188,12 +188,6 @@ public override async Task CloseAsync()
}
}

/// <summary>
/// PublishAsync sends a request message to the server and blocks the thread until the response is received.
/// </summary>
/// <param name="message"> The request message</param>
/// <param name="cancellationToken">Cancellation token</param>
/// <returns></returns>
public async Task<IMessage> PublishAsync(IMessage message, CancellationToken cancellationToken = default)
{
await _semaphore.WaitAsync(cancellationToken).ConfigureAwait(false);
Expand Down
2 changes: 1 addition & 1 deletion RabbitMQ.AMQP.Client/Impl/AmqpRpcServer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ await Utils.WaitWithBackOffUntilFuncAsync(async () =>
{
Trace.WriteLine(TraceLevel.Error, $"Failed to send reply, retrying in {span}");
}
}, 3).ConfigureAwait(false);
}, 5).ConfigureAwait(false);
}
})
.Queue(_configuration.RequestQueue).BuildAndStartAsync()
Expand Down
36 changes: 36 additions & 0 deletions Tests/Rpc/RpcServerTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -318,5 +318,41 @@ public async Task RpcClientMultiThreadShouldBeSafe()
await rpcServer.CloseAsync();
await rpcClient.CloseAsync();
}

/// <summary>
/// The RPC client `PublishAsync` should raise a timeout exception if the server does not reply within the timeout
/// </summary>
[Fact]
public async Task RpcClientShouldRaiseTimeoutError()
{
Assert.NotNull(_connection);
Assert.NotNull(_management);
string requestQueue = _queueName;
await _management.Queue(requestQueue).Exclusive(true).AutoDelete(true).DeclareAsync();
IRpcServer rpcServer = await _connection.RpcServerBuilder().Handler((context, request) =>
{
var reply = context.Message("pong");
object millisecondsToWait = request.ApplicationProperty("wait");
Thread.Sleep(TimeSpan.FromMilliseconds((int)millisecondsToWait));
return Task.FromResult(reply);
}).RequestQueue(_queueName).BuildAsync();
Assert.NotNull(rpcServer);

IRpcClient rpcClient = await _connection.RpcClientBuilder().RequestAddress()
.Queue(requestQueue)
.RpcClient()
.Timeout(TimeSpan.FromMilliseconds(300))
.BuildAsync();

IMessage reply = await rpcClient.PublishAsync(
new AmqpMessage("ping").ApplicationProperty("wait", 1));
Assert.Equal("pong", reply.Body());

await Assert.ThrowsAsync<TimeoutException>(() => rpcClient.PublishAsync(
new AmqpMessage("ping").ApplicationProperty("wait", 700)));

await rpcClient.CloseAsync();
await rpcServer.CloseAsync();
}
}
}
8 changes: 8 additions & 0 deletions docs/Examples/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
# RabbitMQ AMQP 1.0 .NET Client examples

This directory contains examples of how to use the RabbitMQ AMQP 1.0 .NET client.

- Getting Started with the Client [here](./GettingStarted/)
- RPC Server and Client [here](./Rpc/)
- How to write a reliable client [here](./HAClient/)
- Performance Test [here](./PerformanceTest/). You can tune some parameters in the `Program.cs` file.

0 comments on commit 7cfdcbb

Please sign in to comment.