From eb9272f22df01561e10559f8f7e0f261568a0384 Mon Sep 17 00:00:00 2001 From: Thomas Hilzendegen Date: Tue, 22 Aug 2023 14:37:39 +0200 Subject: [PATCH] feat(server): restore lost consumer --- .../.idea/runConfigurations/Relay_Server.xml | 1 - .../ModelExtensions.cs | 64 ++++++++----------- .../ServerTransport.cs | 7 +- .../TenantHandler.cs | 3 +- 4 files changed, 31 insertions(+), 44 deletions(-) diff --git a/src/.idea/.idea.Thinktecture.Relay/.idea/runConfigurations/Relay_Server.xml b/src/.idea/.idea.Thinktecture.Relay/.idea/runConfigurations/Relay_Server.xml index b611e1e32..4e89fecae 100644 --- a/src/.idea/.idea.Thinktecture.Relay/.idea/runConfigurations/Relay_Server.xml +++ b/src/.idea/.idea.Thinktecture.Relay/.idea/runConfigurations/Relay_Server.xml @@ -2,7 +2,6 @@ - \ No newline at end of file diff --git a/src/Thinktecture.Relay.Server.Protocols.RabbitMq/ModelExtensions.cs b/src/Thinktecture.Relay.Server.Protocols.RabbitMq/ModelExtensions.cs index 2fc19b407..52950b94a 100644 --- a/src/Thinktecture.Relay.Server.Protocols.RabbitMq/ModelExtensions.cs +++ b/src/Thinktecture.Relay.Server.Protocols.RabbitMq/ModelExtensions.cs @@ -1,6 +1,6 @@ -using System.Collections.Generic; using System.Text.Json; using System.Threading.Tasks; +using Microsoft.Extensions.Logging; using RabbitMQ.Client; using RabbitMQ.Client.Events; @@ -13,19 +13,41 @@ internal static class ModelExtensions /// consume it with an . /// /// The used to communicate with Rabbit MQ. + /// An . /// The name of the queue. /// The consumer should automatically acknowledge the message. /// The queue should survive a broker restart. /// The queue should be deleted when the last consumer goes away. /// An consuming the queue. - public static AsyncEventingBasicConsumer ConsumeQueue(this IModel model, string queueName, bool autoAck = true, - bool durable = true, - bool autoDelete = true) + public static AsyncEventingBasicConsumer ConsumeQueue(this IModel model, ILogger logger, string queueName, + bool autoAck = true, bool durable = true, bool autoDelete = true) { model.EnsureQueue(queueName, durable, autoDelete); var consumer = new AsyncEventingBasicConsumer(model); - model.BasicConsume(queueName, autoAck, consumer); + var consumerTag = model.BasicConsume(queueName, autoAck, consumer); + + logger.LogTrace("Consuming {QueueName} with consumer {ConsumerTag}", queueName, consumerTag); + + consumer.ConsumerCancelled += (sender, _) => + { + if (((AsyncDefaultBasicConsumer)sender).ShutdownReason == null) + { + logger.LogWarning("Lost consumer {ConsumerTag} on queue {QueueName}", consumerTag, queueName); + + var oldConsumerTag = consumerTag; + + lock (model) + { + model.EnsureQueue(queueName, durable, autoDelete); + consumerTag = model.BasicConsume(queueName, autoAck, consumer); + } + + logger.LogInformation("Restored consumer {ConsumerTag} on queue {QueueName} (was {OldConsumerTag})", consumerTag, queueName, oldConsumerTag); + } + + return Task.CompletedTask; + }; return consumer; } @@ -56,8 +78,7 @@ private static void EnsureQueue(this IModel model, string queueName, bool durabl /// The queue should be deleted when the last consumer goes away. /// A representing the asynchronous operation. public static Task PublishJsonAsync(this IModel model, string queueName, object payload, bool persistent = true, - bool durable = true, - bool autoDelete = true) + bool durable = true, bool autoDelete = true) { var properties = model.CreateBasicProperties(); properties.Persistent = persistent; @@ -77,35 +98,6 @@ public static Task PublishJsonAsync(this IModel model, string queueName, object return Task.CompletedTask; } - /// - /// Convenience method to remove consumers from their queue. - /// - /// The used to communicate with Rabbit MQ. - /// The consumer tags the consumer is registered as. - /// A representing the asynchronous operation. - public static Task CancelConsumerTagsAsync(this IModel model, IEnumerable consumerTags) - { - foreach (var consumerTag in consumerTags) - { - model.BasicCancel(consumerTag); - } - - return Task.CompletedTask; - } - - /// - /// Convenience method to remove consumers from their queue. - /// - /// The used to communicate with Rabbit MQ. - /// The consumer tags the consumer is registered as. - public static void CancelConsumerTags(this IModel model, IEnumerable consumerTags) - { - foreach (var consumerTag in consumerTags) - { - model.BasicCancel(consumerTag); - } - } - /// /// Convenience method to acknowledge a message. /// diff --git a/src/Thinktecture.Relay.Server.Protocols.RabbitMq/ServerTransport.cs b/src/Thinktecture.Relay.Server.Protocols.RabbitMq/ServerTransport.cs index 2a2a49aff..93b254e7c 100644 --- a/src/Thinktecture.Relay.Server.Protocols.RabbitMq/ServerTransport.cs +++ b/src/Thinktecture.Relay.Server.Protocols.RabbitMq/ServerTransport.cs @@ -58,12 +58,12 @@ public ServerTransport(ILogger> logger, _responseConsumeModel = modelFactory.Create("response handler"); _responseConsumer = - _responseConsumeModel.ConsumeQueue($"{Constants.ResponseQueuePrefix}{relayServerContext.OriginId}"); + _responseConsumeModel.ConsumeQueue(_logger, $"{Constants.ResponseQueuePrefix}{relayServerContext.OriginId}"); _responseConsumer.Received += ResponseConsumerReceived; _acknowledgeConsumeModel = modelFactory.Create("acknowledge handler"); _acknowledgeConsumer = - _acknowledgeConsumeModel.ConsumeQueue($"{Constants.AcknowledgeQueuePrefix}{relayServerContext.OriginId}"); + _acknowledgeConsumeModel.ConsumeQueue(_logger, $"{Constants.AcknowledgeQueuePrefix}{relayServerContext.OriginId}"); _acknowledgeConsumer.Received += AcknowledgeConsumerReceived; } @@ -76,9 +76,6 @@ public void Dispose() _responseConsumer.Received -= ResponseConsumerReceived; _acknowledgeConsumer.Received -= AcknowledgeConsumerReceived; - _responseConsumeModel.CancelConsumerTags(_responseConsumer.ConsumerTags); - _acknowledgeConsumeModel.CancelConsumerTags(_acknowledgeConsumer.ConsumerTags); - _responseConsumeModel.Dispose(); _acknowledgeConsumeModel.Dispose(); } diff --git a/src/Thinktecture.Relay.Server.Protocols.RabbitMq/TenantHandler.cs b/src/Thinktecture.Relay.Server.Protocols.RabbitMq/TenantHandler.cs index 1aab11842..4f94470c5 100644 --- a/src/Thinktecture.Relay.Server.Protocols.RabbitMq/TenantHandler.cs +++ b/src/Thinktecture.Relay.Server.Protocols.RabbitMq/TenantHandler.cs @@ -50,7 +50,7 @@ public TenantHandler(ILogger> logger, Guid if (modelFactory == null) throw new ArgumentNullException(nameof(modelFactory)); _model = modelFactory.Create($"tenant handler for {tenantId} of connection {connectionId}", true); - _consumer = _model.ConsumeQueue($"{Constants.RequestQueuePrefix}{tenantId}", autoDelete: false, autoAck: false); + _consumer = _model.ConsumeQueue(_logger, $"{Constants.RequestQueuePrefix}{tenantId}", autoDelete: false, autoAck: false); _consumer.Received += ConsumerReceived; } @@ -59,7 +59,6 @@ public void Dispose() { _consumer.Received -= ConsumerReceived; - _model.CancelConsumerTags(_consumer.ConsumerTags); _model.Dispose(); }