Skip to content

Commit

Permalink
feat(server): restore lost consumer
Browse files Browse the repository at this point in the history
  • Loading branch information
thomashilzendegen committed Aug 22, 2023
1 parent d72eb2f commit eb9272f
Show file tree
Hide file tree
Showing 4 changed files with 31 additions and 44 deletions.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

64 changes: 28 additions & 36 deletions src/Thinktecture.Relay.Server.Protocols.RabbitMq/ModelExtensions.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
using System.Collections.Generic;
using System.Text.Json;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

Expand All @@ -13,19 +13,41 @@ internal static class ModelExtensions
/// consume it with an <see cref="AsyncEventingBasicConsumer"/>.
/// </summary>
/// <param name="model">The <see cref="IModel"/> used to communicate with Rabbit MQ.</param>
/// <param name="logger">An <see cref="ILogger"/>.</param>
/// <param name="queueName">The name of the queue.</param>
/// <param name="autoAck">The consumer should automatically acknowledge the message.</param>
/// <param name="durable">The queue should survive a broker restart.</param>
/// <param name="autoDelete">The queue should be deleted when the last consumer goes away.</param>
/// <returns>An <see cref="AsyncEventingBasicConsumer"/> consuming the queue.</returns>
public static AsyncEventingBasicConsumer ConsumeQueue(this IModel model, string queueName, bool autoAck = true,
bool durable = true,
bool autoDelete = true)
public static AsyncEventingBasicConsumer ConsumeQueue(this IModel model, ILogger logger, string queueName,
bool autoAck = true, bool durable = true, bool autoDelete = true)
{
model.EnsureQueue(queueName, durable, autoDelete);

var consumer = new AsyncEventingBasicConsumer(model);
model.BasicConsume(queueName, autoAck, consumer);
var consumerTag = model.BasicConsume(queueName, autoAck, consumer);

logger.LogTrace("Consuming {QueueName} with consumer {ConsumerTag}", queueName, consumerTag);

consumer.ConsumerCancelled += (sender, _) =>
{
if (((AsyncDefaultBasicConsumer)sender).ShutdownReason == null)
{
logger.LogWarning("Lost consumer {ConsumerTag} on queue {QueueName}", consumerTag, queueName);
var oldConsumerTag = consumerTag;
lock (model)
{
model.EnsureQueue(queueName, durable, autoDelete);
consumerTag = model.BasicConsume(queueName, autoAck, consumer);
}
logger.LogInformation("Restored consumer {ConsumerTag} on queue {QueueName} (was {OldConsumerTag})", consumerTag, queueName, oldConsumerTag);
}
return Task.CompletedTask;
};

return consumer;
}
Expand Down Expand Up @@ -56,8 +78,7 @@ private static void EnsureQueue(this IModel model, string queueName, bool durabl
/// <param name="autoDelete">The queue should be deleted when the last consumer goes away.</param>
/// <returns>A <see cref="Task"/> representing the asynchronous operation.</returns>
public static Task PublishJsonAsync(this IModel model, string queueName, object payload, bool persistent = true,
bool durable = true,
bool autoDelete = true)
bool durable = true, bool autoDelete = true)
{
var properties = model.CreateBasicProperties();
properties.Persistent = persistent;
Expand All @@ -77,35 +98,6 @@ public static Task PublishJsonAsync(this IModel model, string queueName, object
return Task.CompletedTask;
}

/// <summary>
/// Convenience method to remove consumers from their queue.
/// </summary>
/// <param name="model">The <see cref="IModel"/> used to communicate with Rabbit MQ.</param>
/// <param name="consumerTags">The consumer tags the consumer is registered as.</param>
/// <returns>A <see cref="Task"/> representing the asynchronous operation.</returns>
public static Task CancelConsumerTagsAsync(this IModel model, IEnumerable<string> consumerTags)
{
foreach (var consumerTag in consumerTags)
{
model.BasicCancel(consumerTag);
}

return Task.CompletedTask;
}

/// <summary>
/// Convenience method to remove consumers from their queue.
/// </summary>
/// <param name="model">The <see cref="IModel"/> used to communicate with Rabbit MQ.</param>
/// <param name="consumerTags">The consumer tags the consumer is registered as.</param>
public static void CancelConsumerTags(this IModel model, IEnumerable<string> consumerTags)
{
foreach (var consumerTag in consumerTags)
{
model.BasicCancel(consumerTag);
}
}

/// <summary>
/// Convenience method to acknowledge a message.
/// </summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,12 +58,12 @@ public ServerTransport(ILogger<ServerTransport<TResponse, TAcknowledge>> logger,

_responseConsumeModel = modelFactory.Create("response handler");
_responseConsumer =
_responseConsumeModel.ConsumeQueue($"{Constants.ResponseQueuePrefix}{relayServerContext.OriginId}");
_responseConsumeModel.ConsumeQueue(_logger, $"{Constants.ResponseQueuePrefix}{relayServerContext.OriginId}");
_responseConsumer.Received += ResponseConsumerReceived;

_acknowledgeConsumeModel = modelFactory.Create("acknowledge handler");
_acknowledgeConsumer =
_acknowledgeConsumeModel.ConsumeQueue($"{Constants.AcknowledgeQueuePrefix}{relayServerContext.OriginId}");
_acknowledgeConsumeModel.ConsumeQueue(_logger, $"{Constants.AcknowledgeQueuePrefix}{relayServerContext.OriginId}");
_acknowledgeConsumer.Received += AcknowledgeConsumerReceived;
}

Expand All @@ -76,9 +76,6 @@ public void Dispose()
_responseConsumer.Received -= ResponseConsumerReceived;
_acknowledgeConsumer.Received -= AcknowledgeConsumerReceived;

_responseConsumeModel.CancelConsumerTags(_responseConsumer.ConsumerTags);
_acknowledgeConsumeModel.CancelConsumerTags(_acknowledgeConsumer.ConsumerTags);

_responseConsumeModel.Dispose();
_acknowledgeConsumeModel.Dispose();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public TenantHandler(ILogger<TenantHandler<TRequest, TAcknowledge>> logger, Guid
if (modelFactory == null) throw new ArgumentNullException(nameof(modelFactory));
_model = modelFactory.Create($"tenant handler for {tenantId} of connection {connectionId}", true);

_consumer = _model.ConsumeQueue($"{Constants.RequestQueuePrefix}{tenantId}", autoDelete: false, autoAck: false);
_consumer = _model.ConsumeQueue(_logger, $"{Constants.RequestQueuePrefix}{tenantId}", autoDelete: false, autoAck: false);
_consumer.Received += ConsumerReceived;
}

Expand All @@ -59,7 +59,6 @@ public void Dispose()
{
_consumer.Received -= ConsumerReceived;

_model.CancelConsumerTags(_consumer.ConsumerTags);
_model.Dispose();
}

Expand Down

0 comments on commit eb9272f

Please sign in to comment.