From 3d567f9558997eb9d7461bffa1d647e780ebda9a Mon Sep 17 00:00:00 2001 From: "Tristan (HouseCat) Hyams" Date: Sat, 16 Mar 2024 12:54:28 -0500 Subject: [PATCH] Making the Channel is dead check customizeable. --- src/HouseofCat.RabbitMQ/Consumer/Consumer.cs | 20 ++++++++++++++----- .../Options/PoolOptions.cs | 10 ++++++++++ 2 files changed, 25 insertions(+), 5 deletions(-) diff --git a/src/HouseofCat.RabbitMQ/Consumer/Consumer.cs b/src/HouseofCat.RabbitMQ/Consumer/Consumer.cs index b24fc570..fb0555c7 100644 --- a/src/HouseofCat.RabbitMQ/Consumer/Consumer.cs +++ b/src/HouseofCat.RabbitMQ/Consumer/Consumer.cs @@ -101,6 +101,9 @@ public Consumer(IChannelPool channelPool, ConsumerOptions consumerOptions) Options = channelPool.Options; ChannelPool = channelPool; ConsumerOptions = consumerOptions; + + if (Options.PoolOptions.MaxLastChannelHealthCheck < 1) + { Options.PoolOptions.MaxLastChannelHealthCheck = 1; } } public async Task StartConsumerAsync() @@ -389,6 +392,7 @@ protected virtual async Task HandleUnexpectedShutdownAsync(ShutdownEventArgs e) _shutdownAutoRecoveryLoopCount = 0; await ReviewConnectionHealthInsteadOfChannelHealthAsync(); + break; } await Task.Delay(Options.PoolOptions.SleepOnErrorInterval).ConfigureAwait(false); @@ -414,15 +418,21 @@ protected virtual async Task ReviewConnectionHealthInsteadOfChannelHealthAsync() if (!connectionHealthy) return; // We give a brief sleep to allow the channel to recover one last time while - // the connection state has been confirmed healthy. If it does not recovered by now, + // the connection state has been confirmed healthy. If it has not recovered by now, // we no longer wait. We will stop here until we rebuild RabbitMQ channel which for most // use cases will be immediately. - await Task.Delay(Options.PoolOptions.SleepOnErrorInterval) - .ConfigureAwait(false); + var counter = 0; + var channelHealthy = false; + while (!channelHealthy || counter < Options.PoolOptions.MaxLastChannelHealthCheck) + { + await Task.Delay(Options.PoolOptions.SleepOnErrorInterval) + .ConfigureAwait(false); - var lastHealthyCheck = await _chanHost.ChannelHealthyAsync().ConfigureAwait(false); + channelHealthy = await _chanHost.ChannelHealthyAsync().ConfigureAwait(false); + counter++; + } - if (!lastHealthyCheck) + if (!channelHealthy) { // This is an inner infinite loop, until Channel is healthy/rebuilt. _logger.LogWarning( LogMessages.Consumers.ConsumerChannelReplacedEvent, diff --git a/src/HouseofCat.RabbitMQ/Options/PoolOptions.cs b/src/HouseofCat.RabbitMQ/Options/PoolOptions.cs index 9d633519..4422192b 100644 --- a/src/HouseofCat.RabbitMQ/Options/PoolOptions.cs +++ b/src/HouseofCat.RabbitMQ/Options/PoolOptions.cs @@ -25,6 +25,16 @@ public class PoolOptions /// public ushort MaxAckableChannels { get; set; } = 10; + /// + /// During retry, the library may have determined the Connection is healthy but Channel is still not open. This is the number + /// of times it will attempt to re-check a channel's health before destroying it and creating a new one. Internal AutoRecovery + /// of channels has a known delay from the connection. Each check will sleep once using SleepOnErrorInterval before + /// checking the status of the channel again. + /// Default value is 1. + /// Recommended maximum value is 5. + /// + public ushort MaxLastChannelHealthCheck { get; set; } = 1; + /// /// The time to sleep (in ms) when an error occurs on Channel or Connection creation. It's best not to be hyper aggressive with this value. /// Default value is 1000.