From f6b825e76225b9961a33f4bbc2ff22a374b46ea7 Mon Sep 17 00:00:00 2001 From: "Tristan (HouseCat) Hyams" Date: Wed, 24 Apr 2024 11:45:50 -0500 Subject: [PATCH] Consolidating logs with classes using them (removing them from Constants). --- src/HouseofCat.RabbitMQ/Constants.cs | 114 ------------------ src/HouseofCat.RabbitMQ/Consumer/Consumer.cs | 58 ++++++--- .../Options/RabbitOptions.cs | 4 +- src/HouseofCat.RabbitMQ/Pools/ChannelHost.cs | 33 +++-- src/HouseofCat.RabbitMQ/Pools/ChannelPool.cs | 53 +++++--- .../Pools/ConnectionPool.cs | 84 +++++++------ .../Publisher/Publisher.cs | 44 ++++--- .../Services/RabbitService.cs | 6 +- 8 files changed, 180 insertions(+), 216 deletions(-) diff --git a/src/HouseofCat.RabbitMQ/Constants.cs b/src/HouseofCat.RabbitMQ/Constants.cs index fdadc56..d91c9a8 100644 --- a/src/HouseofCat.RabbitMQ/Constants.cs +++ b/src/HouseofCat.RabbitMQ/Constants.cs @@ -53,117 +53,3 @@ public static class Constants public static string MessagingMessageCompressedKey { get; set; } = "messaging.rabbitmq.message.compressed"; public static string MessagingMessageCompressionKey { get; set; } = "messaging.rabbitmq.message.compression"; } - -public static class ExceptionMessages -{ - // AutoPublisher - public const string AutoPublisherNotStartedError = "AutoPublisher has not been started."; - - // General - public const string QueueChannelError = "Can't queue a message to a closed Threading.Channel."; - - public const string ChannelReadErrorMessage = "Can't use reader on a closed Threading.Channel."; - public const string NoConsumerOptionsMessage = "Consumer {0} not found in Consumers dictionary."; - public const string NoConsumerPipelineOptionsMessage = "ConsumerPipeline {0} not found in ConsumerPipelineOptions dictionary."; - - public const string ValidationMessage = "ConnectionPool is not initialized or is shutdown."; - public const string ShutdownValidationMessage = "ConnectionPool is not initialized. Can't be Shutdown."; - public const string GetConnectionErrorMessage = "Threading.Channel buffer used for reading RabbitMQ connections has been closed."; - - public const string ChannelPoolNotInitializedMessage = "ChannelPool is not usable until it has been initialized."; - public const string EncrypConfigErrorMessage = "Encryption can't be enabled without a HashKey (32-byte length)."; - - // ChannelPool - public const string ChannelPoolValidationMessage = "ChannelPool is not initialized or is shutdown."; - public const string ChannelPoolShutdownValidationMessage = "ChannelPool is not initialized. Can't be Shutdown."; - public const string ChannelPoolBadOptionMaxChannelError = "Check your PoolOptions, MaxChannels was less than 1."; - public const string ChannelPoolBadOptionMaxAckChannelError = "Check your PoolOptions, MaxAckableChannels was less than 1."; - public const string ChannelPoolGetChannelError = "Threading.Channel used for reading RabbitMQ channels has been closed."; - - // Pipeline Messages - public const string NotFinalized = "Pipeline is not ready for receiving work as it has not been finalized yet."; - public const string AlreadyFinalized = "Pipeline is already finalized and ready for use."; - public const string CantFinalize = "Pipeline can't finalize as no steps have been added."; - public const string InvalidAddError = "Pipeline is already finalized and you can no longer add steps."; - - public const string ChainingImpossible = "Pipelines can't be chained together as one, or both, pipelines have been finalized."; - public const string ChainingNotMatched = "Pipelines can't be chained together as the last step function and the first step function don't align with type input or asynchronicity."; - public const string NothingToChain = "Pipelines can't be chained together as one, or both, pipelines have no steps."; - - public const string InvalidStepFound = "Pipeline can't chain the last step to this new step. Unexpected type found on the previous step."; -} - -public static class LogMessages -{ - public static class ChannelHosts - { - public const string FlowControlled = "ChannelHost [Id: {0}] - Flow control event has triggered."; - public const string FlowControlFinished = "ChannelHost [Id: {0}] - Flow control event has resolved itself."; - public const string ConsumerStartedConsumer = "ChannelHost [Id: {0}] - Starting consuming. ConsumerTag: [{1}]"; - public const string ConsumerStopConsumer = "ChannelHost [Id: {0}] - Stopping consuming using ConsumerTag: [{1}]"; - public const string ConsumerStopConsumerError = "ChannelHost [Id: {0}] - Error stopping consuming using ConsumerTag: [{1}]"; - } - - public static class ConnectionPools - { - public const string CreateConnections = "ConnectionPool creating Connections..."; - public const string CreateConnectionsComplete = "ConnectionPool initialized."; - public const string CreateConnectionException = "Connection [{0}] failed to be created."; - public const string Shutdown = "ConnectionPool shutdown was called."; - public const string ShutdownComplete = "ConnectionPool shutdown complete."; - } - - public static class ChannelPools - { - public const string Initialization = "ChannelPool initialize call was made."; - public const string InitializationComplete = "ChannelPool initialized."; - public const string ChannelHasIssues = "ChannelHost [Id: {0}] was detected to have issues. Attempting to repair..."; - public const string CreateChannel = "ChannelHost [Id: {0}] create loop is executing an iteration..."; - public const string CreateChannelFailedConnection = "The ChannelHost [Id: {0}] failed because Connection is unhealthy."; - public const string CreateChannelFailedConstruction = "The ChannelHost [Id: {0}] failed because ChannelHost construction threw exception."; - public const string CreateChannelSleep = "The ChannelHost [Id: {0}] create loop iteration failed. Sleeping..."; - public const string CreateChannelSuccess = "The ChannelHost [Id: {0}] create loop finished. Channel restored and flags removed."; - public const string ReturningChannel = "The ChannelHost [Id: {0}] was returned to the pool. Flagged? {1}"; - public const string Shutdown = "ChannelPool shutdown was called."; - public const string ShutdownComplete = "ChannelPool shutdown complete."; - } - - public static class Publishers - { - public const string PublishFailed = "Publish to route [{0}] failed, flagging channel host. Error: {1}"; - public const string PublishMessageFailed = "Publish to route [{0}] failed [MessageId: {1}] flagging channel host. Error: {2}"; - public const string PublishBatchFailed = "Batch publish failed, flagging channel host. Error: {0}"; - } - - public static class AutoPublishers - { - public const string MessageQueued = "AutoPublisher queued message [MessageId:{0} InternalId:{1}]."; - public const string MessagePublished = "AutoPublisher published message [MessageId:{0} InternalId:{1}]. Listen for receipt to indicate success..."; - } - - public static class Consumers - { - public const string StartingConsumer = "Consumer [{0}] starting..."; - public const string StartingConsumerError = "Exception creating internal RabbitMQ consumer. Retrying..."; - public const string StartedConsumer = "Consumer [{0}] started."; - public const string StartingConsumerLoop = "Consumer [{0}] startup loop executing..."; - public const string Started = "Consumer [{0}] started."; - public const string StopConsumer = "Consumer [{0}] stop consuming called..."; - public const string StoppedConsumer = "Consumer [{0}] stopped consuming."; - public const string GettingTransientChannelHost = "Consumer [{0}] getting a transient channel."; - public const string GettingChannelHost = "Consumer [{0}] getting a channel."; - public const string GettingAckChannelHost = "Consumer [{0}] getting a ackable channel host."; - public const string ChannelEstablished = "Consumer [{0}] channel host [{1}] assigned."; - public const string ChannelNotEstablished = "Consumer [{0}] channel host could not be assigned."; - public const string ConsumerMessageReceived = "Consumer [{0}] message received (DT:{1}]. Adding to buffer..."; - public const string ConsumerAsyncMessageReceived = "Consumer [{0}] async message received (DT:{1}]. Adding to buffer..."; - public const string ConsumerShutdownEvent = "Consumer [{0}] recoverable shutdown event has occurred on Channel [Id: {1}]. Reason: {2}. Attempting to restart consuming..."; - public const string ConsumerShutdownEventFinished = "Consumer [{0}] shutdown event has finished on Channel [Id: {1}]."; - public const string ConsumerChannelReplacedEvent = "Consumer [{0}] recoverable shutdown event is ongoing. Connection is healthy. Channel appears to be dead and replacing it..."; - public const string ConsumerMessageWriteToBufferError = "Consumer [{0}] was unable to write to channel buffer. Error: {1}"; - - public const string ConsumerDataflowActionCancelled = "Consumer [{0}] dataflow engine actions were cancelled."; - public const string ConsumerDataflowError = "Consumer [{0}] dataflow engine encountered an error. Error: {1}"; - public const string ConsumerDataflowQueueing = "Consumer [{0}] dataflow engine queueing unit of work [receivedMessage:DT:{1}]."; - } -} diff --git a/src/HouseofCat.RabbitMQ/Consumer/Consumer.cs b/src/HouseofCat.RabbitMQ/Consumer/Consumer.cs index b1d61e2..e3b345d 100644 --- a/src/HouseofCat.RabbitMQ/Consumer/Consumer.cs +++ b/src/HouseofCat.RabbitMQ/Consumer/Consumer.cs @@ -13,7 +13,6 @@ using System.Threading; using System.Threading.Channels; using System.Threading.Tasks; -using static HouseofCat.RabbitMQ.LogMessages; namespace HouseofCat.RabbitMQ; @@ -88,6 +87,8 @@ public Consumer( _defaultOptions.Converters.Add(new FlexibleObjectJsonConverter()); } + private static readonly string _startingConsumerLoop = "Consumer [{0}] startup loop executing..."; + public async Task StartConsumerAsync() { if (!await _conLock.WaitAsync(0).ConfigureAwait(false)) return; @@ -107,7 +108,7 @@ public async Task StartConsumerAsync() var success = false; while (!success) { - _logger.LogTrace(Consumers.StartingConsumerLoop, ConsumerOptions.ConsumerName); + _logger.LogTrace(_startingConsumerLoop, ConsumerOptions.ConsumerName); success = await StartConsumingAsync().ConfigureAwait(false); if (!success) { await Task.Delay(Options.PoolOptions.SleepOnErrorInterval); } @@ -119,6 +120,9 @@ public async Task StartConsumerAsync() finally { _conLock.Release(); } } + private static readonly string _stopConsumer = "Consumer [{0}] stop consuming called..."; + private static readonly string _stoppedConsumer = "Consumer [{0}] stopped consuming."; + public async Task StopConsumerAsync(bool immediate = false) { if (!await _conLock.WaitAsync(0).ConfigureAwait(false)) return; @@ -128,7 +132,7 @@ public async Task StopConsumerAsync(bool immediate = false) if (Started) { _shutdown = true; - _logger.LogInformation(Consumers.StopConsumer, ConsumerOptions.ConsumerName); + _logger.LogInformation(_stopConsumer, ConsumerOptions.ConsumerName); _consumerChannel.Writer.Complete(); @@ -147,7 +151,7 @@ await _consumerChannel _cts.Cancel(); Started = false; _logger.LogInformation( - Consumers.StoppedConsumer, + _stoppedConsumer, ConsumerOptions.ConsumerName); } } @@ -156,15 +160,18 @@ await _consumerChannel protected AsyncEventingBasicConsumer _asyncConsumer; protected EventingBasicConsumer _consumer; - protected CancellationTokenSource _cts; + private static readonly string _startingConsumer = "Consumer [{0}] starting..."; + private static readonly string _startingConsumerError = "Exception creating internal RabbitMQ consumer. Retrying..."; + private static readonly string _startedConsumer = "Consumer [{0}] started."; + protected async Task StartConsumingAsync() { if (_shutdown) return false; _logger.LogInformation( - Consumers.StartingConsumer, + _startingConsumer, ConsumerOptions.ConsumerName); if (!_chanHost.ChannelHealthy()) @@ -194,7 +201,7 @@ await _chanHost.WaitUntilChannelIsReadyAsync( } catch (Exception ex) { - _logger.LogError(ex, Consumers.StartingConsumerError); + _logger.LogError(ex, _startingConsumerError); return false; } } @@ -213,29 +220,33 @@ await _chanHost.WaitUntilChannelIsReadyAsync( } catch (Exception ex) { - _logger.LogError(ex, Consumers.StartingConsumerError); + _logger.LogError(ex, _startingConsumerError); return false; } } _cts = new CancellationTokenSource(); _logger.LogInformation( - Consumers.StartedConsumer, + _startedConsumer, ConsumerOptions.ConsumerName); return true; } + private static readonly string _gettingTransientChannelHost = "Consumer [{0}] getting a transient channel."; + private static readonly string _channelEstablished = "Consumer [{0}] channel host [{1}] assigned."; + + protected virtual async Task SetChannelHostAsync() { - _logger.LogTrace(Consumers.GettingTransientChannelHost, ConsumerOptions.ConsumerName); + _logger.LogTrace(_gettingTransientChannelHost, ConsumerOptions.ConsumerName); _chanHost = await ChannelPool .GetTransientChannelAsync(!ConsumerOptions.AutoAck) .ConfigureAwait(false); _logger.LogDebug( - Consumers.ChannelEstablished, + _channelEstablished, ConsumerOptions.ConsumerName, _chanHost?.ChannelId.ToString() ?? "ChannelHost: null"); } @@ -253,10 +264,12 @@ private EventingBasicConsumer CreateConsumer() return consumer; } + private static readonly string _consumerMessageReceived = "Consumer [{0}] message received (DT:{1}]. Adding to buffer..."; + protected virtual async void ReceiveHandler(object _, BasicDeliverEventArgs bdea) { _logger.LogDebug( - Consumers.ConsumerMessageReceived, + _consumerMessageReceived, ConsumerOptions.ConsumerName, bdea.DeliveryTag); @@ -297,13 +310,15 @@ protected AsyncEventingBasicConsumer CreateAsyncConsumer() protected virtual async Task ReceiveHandlerAsync(object _, BasicDeliverEventArgs bdea) { _logger.LogDebug( - Consumers.ConsumerAsyncMessageReceived, + _consumerMessageReceived, ConsumerOptions.ConsumerName, bdea.DeliveryTag); await HandleMessageAsync(bdea).ConfigureAwait(false); } + private static readonly string _consumerMessageWriteToBufferError = "Consumer [{0}] was unable to write to channel buffer. Error: {1}"; + private static readonly string _consumerSpanNameFormat = "messaging.rabbitmq.consumer receive"; protected virtual async ValueTask HandleMessageAsync(BasicDeliverEventArgs bdea) @@ -335,7 +350,7 @@ await _consumerChannel catch (Exception ex) { _logger.LogError( - Consumers.ConsumerMessageWriteToBufferError, + _consumerMessageWriteToBufferError, ConsumerOptions.ConsumerName, ex.Message); return false; @@ -414,6 +429,8 @@ protected virtual void AutoDeserialize(ReceivedMessage receivedMessage) } } + private static readonly string _consumerShutdownEvent = "Consumer [{0}] recoverable shutdown event has occurred on Channel [Id: {1}]. Reason: {2}. Attempting to restart consuming..."; + protected async Task ConsumerShutdownAsync(object sender, ShutdownEventArgs e) { if (!await _conLock.WaitAsync(0).ConfigureAwait(false)) return; @@ -423,7 +440,7 @@ protected async Task ConsumerShutdownAsync(object sender, ShutdownEventArgs e) if (!_shutdown) { _logger.LogInformation( - Consumers.ConsumerShutdownEvent, + _consumerShutdownEvent, ConsumerOptions.ConsumerName, _chanHost.ChannelId, e.ReplyText); @@ -438,7 +455,8 @@ await HandleRecoverableShutdownAsync(e) { _conLock.Release(); } } - protected static readonly string _consumerShutdownExceptionMessage = "Consumer's ChannelHost {0} had an unhandled exception during recovery."; + private static readonly string _consumerShutdownExceptionMessage = "Consumer's ChannelHost {0} had an unhandled exception during recovery."; + private static readonly string _consumerShutdownEventFinished = "Consumer [{0}] shutdown event has finished on Channel [Id: {1}]."; protected virtual async Task HandleRecoverableShutdownAsync(ShutdownEventArgs e) { @@ -456,7 +474,7 @@ await _chanHost } _logger.LogInformation( - Consumers.ConsumerShutdownEventFinished, + _consumerShutdownEventFinished, ConsumerOptions.ConsumerName, _chanHost.ChannelId); } @@ -465,7 +483,7 @@ await _chanHost public async ValueTask ReadAsync() { - if (!await _consumerChannel.Reader.WaitToReadAsync().ConfigureAwait(false)) throw new InvalidOperationException(ExceptionMessages.ChannelReadErrorMessage); + if (!await _consumerChannel.Reader.WaitToReadAsync().ConfigureAwait(false)) throw new InvalidOperationException(); return await _consumerChannel .Reader @@ -475,7 +493,7 @@ public async ValueTask ReadAsync() public async Task> ReadUntilEmptyAsync(CancellationToken token = default) { - if (!await _consumerChannel.Reader.WaitToReadAsync(token).ConfigureAwait(false)) throw new InvalidOperationException(ExceptionMessages.ChannelReadErrorMessage); + if (!await _consumerChannel.Reader.WaitToReadAsync(token).ConfigureAwait(false)) throw new InvalidOperationException(); var list = new List(); await _consumerChannel.Reader.WaitToReadAsync(token).ConfigureAwait(false); @@ -490,7 +508,7 @@ public async Task> ReadUntilEmptyAsync(Cancellatio public async ValueTask> ReadUntilStopAsync(CancellationToken token = default) { - if (!await _consumerChannel.Reader.WaitToReadAsync(token).ConfigureAwait(false)) throw new InvalidOperationException(ExceptionMessages.ChannelReadErrorMessage); + if (!await _consumerChannel.Reader.WaitToReadAsync(token).ConfigureAwait(false)) throw new InvalidOperationException(); return _consumerChannel.Reader.ReadAllAsync(token); } diff --git a/src/HouseofCat.RabbitMQ/Options/RabbitOptions.cs b/src/HouseofCat.RabbitMQ/Options/RabbitOptions.cs index f5ec682..b85a65b 100644 --- a/src/HouseofCat.RabbitMQ/Options/RabbitOptions.cs +++ b/src/HouseofCat.RabbitMQ/Options/RabbitOptions.cs @@ -20,12 +20,14 @@ public sealed class RabbitOptions /// public IDictionary ConsumerOptions { get; set; } = new Dictionary(); + private static readonly string _noConsumerOptionsMessage = "Consumer {0} not found in Consumers dictionary."; + public ConsumerOptions GetConsumerOptions(string consumerName) { if (ConsumerOptions.TryGetValue(consumerName, out ConsumerOptions value)) { return value; } - throw new ArgumentException(string.Format(ExceptionMessages.NoConsumerOptionsMessage, consumerName)); + throw new ArgumentException(string.Format(_noConsumerOptionsMessage, consumerName)); } } diff --git a/src/HouseofCat.RabbitMQ/Pools/ChannelHost.cs b/src/HouseofCat.RabbitMQ/Pools/ChannelHost.cs index e38f2f0..248ca57 100644 --- a/src/HouseofCat.RabbitMQ/Pools/ChannelHost.cs +++ b/src/HouseofCat.RabbitMQ/Pools/ChannelHost.cs @@ -28,6 +28,7 @@ public interface IChannelHost void Close(); bool ChannelHealthy(); bool ConnectionHealthy(); + string DisplayId(); } public class ChannelHost : IChannelHost, IDisposable @@ -145,16 +146,19 @@ public async Task BuildRabbitMQChannelAsync(int autoRecoveryDelay = 1000, protected virtual void ChannelClose(object sender, ShutdownEventArgs e) { - _logger.LogDebug(e.ReplyText); + _logger.LogInformation(e.ReplyText); Closed = true; } + private static readonly string _flowControlled = "ChannelHost [Id: {0}] - Flow control event has triggered."; + private static readonly string _flowControlFinished = "ChannelHost [Id: {0}] - Flow control event has resolved itself."; + protected virtual void FlowControl(object sender, FlowControlEventArgs e) { if (e.Active) - { _logger.LogWarning(LogMessages.ChannelHosts.FlowControlled, ChannelId); } + { _logger.LogWarning(_flowControlled, ChannelId); } else - { _logger.LogInformation(LogMessages.ChannelHosts.FlowControlFinished, ChannelId); } + { _logger.LogInformation(_flowControlFinished, ChannelId); } FlowControlled = e.Active; } @@ -173,6 +177,8 @@ public bool ConnectionHealthy() private string _consumerTag; + private static readonly string _consumerStartedConsumer = "ChannelHost [Id: {0}] - Starting consuming. ConsumerTag: [{1}]"; + public string StartConsuming(IBasicConsumer internalConsumer, ConsumerOptions options) { Guard.AgainstNull(options, nameof(options)); @@ -189,13 +195,16 @@ public string StartConsuming(IBasicConsumer internalConsumer, ConsumerOptions op null, internalConsumer); - _logger.LogDebug(LogMessages.ChannelHosts.ConsumerStartedConsumer, ChannelId, _consumerTag); + _logger.LogDebug(_consumerStartedConsumer, ChannelId, _consumerTag); UsedByConsumer = true; return _consumerTag; } + private static readonly string _consumerStopConsumer = "ChannelHost [Id: {0}] - Stopping consuming using ConsumerTag: [{1}]"; + private static readonly string _consumerStopConsumerError = "ChannelHost [Id: {0}] - Error stopping consuming using ConsumerTag: [{1}]"; + public void StopConsuming() { if (string.IsNullOrEmpty(_consumerTag) || !UsedByConsumer) return; @@ -204,31 +213,35 @@ public void StopConsuming() { if (ChannelHealthy()) { - _logger.LogInformation(LogMessages.ChannelHosts.ConsumerStopConsumer, ChannelId, _consumerTag); + _logger.LogInformation(_consumerStopConsumer, ChannelId, _consumerTag); Channel.BasicCancel(_consumerTag); } } catch (Exception ex) { - _logger.LogError(ex, LogMessages.ChannelHosts.ConsumerStopConsumerError, ChannelId, _consumerTag); + _logger.LogError(ex, _consumerStopConsumerError, ChannelId, _consumerTag); } } + public string DisplayId() + { + return $"{_connHost.Connection.ClientProvidedName}:{Channel.ChannelNumber}"; + } + private const int CloseCode = 200; - private const string CloseMessage = "HouseofCat.RabbitMQ manual close initiated for Channelhost [Id: {0} - [Conn: {1} - Chan: {2}]]."; + private static readonly string _closeMessage = "HouseofCat.RabbitMQ manual close initiated for ChannelHost [Id: {0}] - [{1}]."; public void Close() { try { - _logger.LogInformation(CloseMessage, ChannelId, _connHost.Connection.ClientProvidedName, Channel.ChannelNumber); Channel.Close( CloseCode, - string.Format(CloseMessage, ChannelId, Channel.ChannelNumber)); + string.Format(_closeMessage, ChannelId, DisplayId())); } catch { /* SWALLOW */ } } - + private bool _disposedValue; protected virtual void Dispose(bool disposing) diff --git a/src/HouseofCat.RabbitMQ/Pools/ChannelPool.cs b/src/HouseofCat.RabbitMQ/Pools/ChannelPool.cs index 4e00358..aeb5ae1 100644 --- a/src/HouseofCat.RabbitMQ/Pools/ChannelPool.cs +++ b/src/HouseofCat.RabbitMQ/Pools/ChannelPool.cs @@ -120,6 +120,11 @@ await _ackChannels } } + private static readonly string _channelHasIssues = "ChannelHost [Id: {0}] was detected to have issues. Attempting to repair..."; + private static readonly string _channelPoolNotInitialized = "ChannelPool is not initialized or is shutdown."; + private static readonly string _channelPoolBadOptionChannelError = "Check your PoolOptions, Channels value maybe less than 1."; + private static readonly string _channelPoolGetChannelError = "Threading.Channel used for reading RabbitMQ channels has been closed."; + /// /// This pulls a out of the for usage. /// If the was previously flagged on error, multi-attempta to recreate it before returning an open channel back to the user. @@ -131,7 +136,7 @@ await _ackChannels [MethodImpl(MethodImplOptions.AggressiveInlining)] public async Task GetChannelAsync() { - if (Shutdown) throw new InvalidOperationException(ExceptionMessages.ChannelPoolValidationMessage); + if (Shutdown) throw new InvalidOperationException(_channelPoolNotInitialized); if (Options.PoolOptions.OnlyTransientChannels) { @@ -141,7 +146,7 @@ public async Task GetChannelAsync() if (_channels == null) { - throw new InvalidOperationException(ExceptionMessages.ChannelPoolBadOptionMaxChannelError); + throw new InvalidOperationException(_channelPoolBadOptionChannelError); } if (!await _channels @@ -149,7 +154,7 @@ public async Task GetChannelAsync() .WaitToReadAsync() .ConfigureAwait(false)) { - throw new InvalidOperationException(ExceptionMessages.ChannelPoolGetChannelError); + throw new InvalidOperationException(_channelPoolGetChannelError); } var chanHost = await _channels @@ -161,7 +166,7 @@ public async Task GetChannelAsync() var flagged = _flaggedChannels.ContainsKey(chanHost.ChannelId) && _flaggedChannels[chanHost.ChannelId]; if (flagged || !healthy) { - _logger.LogWarning(LogMessages.ChannelPools.ChannelHasIssues, chanHost.ChannelId); + _logger.LogWarning(_channelHasIssues, chanHost.ChannelId); try { await chanHost.WaitUntilChannelIsReadyAsync(Options.PoolOptions.SleepOnErrorInterval, _cts.Token); } @@ -185,6 +190,8 @@ await Task return chanHost; } + private static readonly string _channelPoolBadOptionAckChannelError = "Check your PoolOptions, AckChannels value maybe less than 1."; + /// /// This pulls an ackable out of the for usage. /// If the was previously flagged on error, multi-attempta to recreate it before returning an open channel back to the user. @@ -196,7 +203,7 @@ await Task [MethodImpl(MethodImplOptions.AggressiveInlining)] public async Task GetAckChannelAsync() { - if (Shutdown) throw new InvalidOperationException(ExceptionMessages.ChannelPoolValidationMessage); + if (Shutdown) throw new InvalidOperationException(_channelPoolNotInitialized); if (Options.PoolOptions.OnlyTransientChannels) { @@ -206,7 +213,7 @@ public async Task GetAckChannelAsync() if (_ackChannels == null) { - throw new InvalidOperationException(ExceptionMessages.ChannelPoolBadOptionMaxAckChannelError); + throw new InvalidOperationException(_channelPoolBadOptionAckChannelError); } if (!await _ackChannels @@ -214,7 +221,7 @@ public async Task GetAckChannelAsync() .WaitToReadAsync() .ConfigureAwait(false)) { - throw new InvalidOperationException(ExceptionMessages.ChannelPoolGetChannelError); + throw new InvalidOperationException(_channelPoolGetChannelError); } var chanHost = await _ackChannels @@ -226,7 +233,7 @@ public async Task GetAckChannelAsync() var flagged = _flaggedChannels.ContainsKey(chanHost.ChannelId) && _flaggedChannels[chanHost.ChannelId]; if (flagged || !healthy) { - _logger.LogWarning(LogMessages.ChannelPools.ChannelHasIssues, chanHost.ChannelId); + _logger.LogWarning(_channelHasIssues, chanHost.ChannelId); await chanHost.WaitUntilChannelIsReadyAsync(Options.PoolOptions.SleepOnErrorInterval, _cts.Token); } @@ -261,6 +268,11 @@ await Task [MethodImpl(MethodImplOptions.AggressiveInlining)] public async Task GetTransientChannelAsync(bool ackable) => await CreateChannelAsync(GetNextTransientChannelId(), ackable).ConfigureAwait(false); + private static readonly string _createChannel = "ChannelHost [Id: {0}] create loop is executing an iteration..."; + private static readonly string _createChannelFailedConnection = "The ChannelHost [Id: {0}] failed because Connection is unhealthy."; + private static readonly string _createChannelSuccess = "The ChannelHost [Id: {0}] create loop finished. Channel restored and flags removed."; + private static readonly string _createChannelFailedConstruction = "The ChannelHost [Id: {0}] failed because ChannelHost construction threw exception."; + [MethodImpl(MethodImplOptions.AggressiveInlining)] private async Task CreateChannelAsync(ulong channelId, bool ackable) { @@ -268,14 +280,14 @@ private async Task CreateChannelAsync(ulong channelId, bool ackabl while (true) { - _logger.LogTrace(LogMessages.ChannelPools.CreateChannel, channelId); + _logger.LogTrace(_createChannel, channelId); // Get ConnectionHost try { connHost = await _connectionPool.GetConnectionAsync().ConfigureAwait(false); } catch { - _logger.LogTrace(LogMessages.ChannelPools.CreateChannelFailedConnection, channelId); + _logger.LogTrace(_createChannelFailedConnection, channelId); await ReturnConnectionWithOptionalSleep(connHost, channelId, Options.PoolOptions.SleepOnErrorInterval).ConfigureAwait(false); continue; } @@ -286,18 +298,20 @@ private async Task CreateChannelAsync(ulong channelId, bool ackabl var chanHost = new ChannelHost(channelId, connHost, ackable); await ReturnConnectionWithOptionalSleep(connHost, channelId, 0).ConfigureAwait(false); _flaggedChannels[chanHost.ChannelId] = false; - _logger.LogDebug(LogMessages.ChannelPools.CreateChannelSuccess, channelId); + _logger.LogDebug(_createChannelSuccess, channelId); return chanHost; } catch { - _logger.LogTrace(LogMessages.ChannelPools.CreateChannelFailedConstruction, channelId); + _logger.LogTrace(_createChannelFailedConstruction, channelId); await ReturnConnectionWithOptionalSleep(connHost, channelId, Options.PoolOptions.SleepOnErrorInterval).ConfigureAwait(false); } } } + private static readonly string _createChannelSleep = "The ChannelHost [Id: {0}] create loop iteration failed. Sleeping..."; + private async Task ReturnConnectionWithOptionalSleep(IConnectionHost connHost, ulong channelId, int sleep) { if (connHost != null) @@ -305,7 +319,7 @@ private async Task ReturnConnectionWithOptionalSleep(IConnectionHost connHost, u if (sleep > 0) { - _logger.LogDebug(LogMessages.ChannelPools.CreateChannelSleep, channelId); + _logger.LogDebug(_createChannelSleep, channelId); await Task .Delay(sleep) @@ -313,6 +327,8 @@ await Task } } + private static readonly string _returningChannel = "The ChannelHost [Id: {0}] was returned to the pool. Flagged? {1}"; + /// /// Returns the back to the . /// All Aqmp IModel Channels close server side on error, so you have to indicate to the library when that happens. @@ -324,11 +340,11 @@ await Task [MethodImpl(MethodImplOptions.AggressiveInlining)] public async ValueTask ReturnChannelAsync(IChannelHost chanHost, bool flagChannel = false) { - if (Shutdown) throw new InvalidOperationException(ExceptionMessages.ChannelPoolValidationMessage); + if (Shutdown) throw new InvalidOperationException(_channelPoolNotInitialized); _flaggedChannels[chanHost.ChannelId] = flagChannel; - _logger.LogDebug(LogMessages.ChannelPools.ReturningChannel, chanHost.ChannelId, flagChannel); + _logger.LogDebug(_returningChannel, chanHost.ChannelId, flagChannel); if (chanHost.Ackable) { @@ -346,9 +362,12 @@ await _channels } } + private static readonly string _shutdown = "ChannelPool shutdown was called."; + private static readonly string _shutdownComplete = "ChannelPool shutdown complete."; + public async Task ShutdownAsync() { - _logger.LogTrace(LogMessages.ChannelPools.Shutdown); + _logger.LogTrace(_shutdown); await _poolLock .WaitAsync() @@ -369,7 +388,7 @@ await _connectionPool } _poolLock.Release(); - _logger.LogTrace(LogMessages.ChannelPools.ShutdownComplete); + _logger.LogTrace(_shutdownComplete); } private async Task CloseChannelsAsync() diff --git a/src/HouseofCat.RabbitMQ/Pools/ConnectionPool.cs b/src/HouseofCat.RabbitMQ/Pools/ConnectionPool.cs index 950ade2..cf976d0 100644 --- a/src/HouseofCat.RabbitMQ/Pools/ConnectionPool.cs +++ b/src/HouseofCat.RabbitMQ/Pools/ConnectionPool.cs @@ -53,7 +53,7 @@ public ConnectionPool(RabbitOptions options) else { _connectionFactory = BuildConnectionFactory(); } - CreateConnectionsAsync().GetAwaiter().GetResult(); + CreatePoolConnectionsAsync().GetAwaiter().GetResult(); } public ConnectionPool(RabbitOptions options, HttpClientHandler oauth2ClientHandler) : this(options) @@ -143,31 +143,43 @@ protected virtual ConnectionFactory BuildConnectionFactory(RabbitOptions options protected virtual IConnectionHost CreateConnectionHost(ulong connectionId, IConnection connection) => new ConnectionHost(connectionId, connection); - private async Task CreateConnectionsAsync() + private static readonly string _createConnections = "ConnectionPool creating connections..."; + private static readonly string _createConnectionException = "Connection [{0}] failed to be created."; + private static readonly string _createConnectionsComplete = "ConnectionPool initialized."; + + private async Task CreatePoolConnectionsAsync() { - _logger.LogTrace(LogMessages.ConnectionPools.CreateConnections); + if (!await _poolLock.WaitAsync(0).ConfigureAwait(false)) return; - for (var i = 0; i < Options.PoolOptions.Connections; i++) + try { - var serviceName = string.IsNullOrEmpty(Options.PoolOptions.ServiceName) - ? $"HoC.RabbitMQ:{i}" - : $"{Options.PoolOptions.ServiceName}:{i}"; - try - { - await _connections - .Writer - .WriteAsync(CreateConnectionHost(_currentConnectionId++, CreateConnection(serviceName))); - } - catch (Exception ex) + _logger.LogTrace(_createConnections); + + for (var i = 0; i < Options.PoolOptions.Connections; i++) { - _logger.LogError(ex, LogMessages.ConnectionPools.CreateConnectionException, serviceName); - throw; // Non Optional Throw + var serviceName = string.IsNullOrEmpty(Options.PoolOptions.ServiceName) + ? $"HoC.RabbitMQ:{i}" + : $"{Options.PoolOptions.ServiceName}:{i}"; + try + { + await _connections + .Writer + .WriteAsync(CreateConnectionHost(_currentConnectionId++, CreateConnection(serviceName))); + } + catch (Exception ex) + { + _logger.LogError(ex, _createConnectionException, serviceName); + throw; // Non Optional Throw + } } - } - _logger.LogTrace(LogMessages.ConnectionPools.CreateConnectionsComplete); + _logger.LogTrace(_createConnectionsComplete); + } + finally { _poolLock.Release(); } } + private static readonly string _getConnectionErrorMessage = "Threading.Channel buffer used for reading RabbitMQ connections has been closed."; + [MethodImpl(MethodImplOptions.AggressiveInlining)] public async ValueTask GetConnectionAsync() { @@ -176,7 +188,7 @@ public async ValueTask GetConnectionAsync() .WaitToReadAsync() .ConfigureAwait(false)) { - throw new InvalidOperationException(ExceptionMessages.GetConnectionErrorMessage); + throw new InvalidOperationException(_getConnectionErrorMessage); } while (true) @@ -205,7 +217,7 @@ public async ValueTask ReturnConnectionAsync(IConnectionHost connHost) .WaitToWriteAsync() .ConfigureAwait(false)) { - throw new InvalidOperationException(ExceptionMessages.GetConnectionErrorMessage); + throw new InvalidOperationException(_getConnectionErrorMessage); } await _connections @@ -213,27 +225,29 @@ await _connections .WriteAsync(connHost); } + private static readonly string _shutdown = "ConnectionPool shutdown was called."; + private static readonly string _shutdownComplete = "ConnectionPool shutdown complete."; + public async Task ShutdownAsync() { - _logger.LogTrace(LogMessages.ConnectionPools.Shutdown); - - await _poolLock - .WaitAsync() - .ConfigureAwait(false); + if (!await _poolLock.WaitAsync(0).ConfigureAwait(false)) return; - _connections.Writer.Complete(); - - await _connections.Reader.WaitToReadAsync().ConfigureAwait(false); - while (_connections.Reader.TryRead(out IConnectionHost connHost)) + try { - try - { connHost.Close(); } - catch { /* SWALLOW */ } - } + _logger.LogTrace(_shutdown); + _connections.Writer.Complete(); + await _connections.Reader.WaitToReadAsync().ConfigureAwait(false); - _poolLock.Release(); + while (_connections.Reader.TryRead(out IConnectionHost connHost)) + { + try + { connHost.Close(); } + catch { /* SWALLOW */ } + } - _logger.LogTrace(LogMessages.ConnectionPools.ShutdownComplete); + _logger.LogTrace(_shutdownComplete); + } + finally { _poolLock.Release(); } } protected virtual void Dispose(bool disposing) diff --git a/src/HouseofCat.RabbitMQ/Publisher/Publisher.cs b/src/HouseofCat.RabbitMQ/Publisher/Publisher.cs index 6d8d57c..a6dde69 100644 --- a/src/HouseofCat.RabbitMQ/Publisher/Publisher.cs +++ b/src/HouseofCat.RabbitMQ/Publisher/Publisher.cs @@ -232,19 +232,24 @@ await _messageQueue { _pubLock.Release(); } } + private static readonly string _autoPublisherNotStartedError = "AutoPublisher has not been started."; + private static readonly string _messageQueued = "AutoPublisher queued message [MessageId:{0} InternalId:{1}]."; + public void QueueMessage(IMessage message) { - if (!AutoPublisherStarted) throw new InvalidOperationException(ExceptionMessages.AutoPublisherNotStartedError); + if (!AutoPublisherStarted) throw new InvalidOperationException(_autoPublisherNotStartedError); Guard.AgainstNull(message, nameof(message)); - _logger.LogDebug(LogMessages.AutoPublishers.MessageQueued, message.MessageId, message.Metadata?.PayloadId); + _logger.LogDebug(_messageQueued, message.MessageId, message.Metadata?.PayloadId); _messageQueue.Writer.TryWrite(message); } + private static readonly string _queueChannelError = "Can't queue a message to a closed Threading.Channel."; + public async ValueTask QueueMessageAsync(IMessage message) { - if (!AutoPublisherStarted) throw new InvalidOperationException(ExceptionMessages.AutoPublisherNotStartedError); + if (!AutoPublisherStarted) throw new InvalidOperationException(_autoPublisherNotStartedError); Guard.AgainstNull(message, nameof(message)); if (!await _messageQueue @@ -252,10 +257,10 @@ public async ValueTask QueueMessageAsync(IMessage message) .WaitToWriteAsync() .ConfigureAwait(false)) { - throw new InvalidOperationException(ExceptionMessages.QueueChannelError); + throw new InvalidOperationException(_queueChannelError); } - _logger.LogDebug(LogMessages.AutoPublishers.MessageQueued, message.MessageId, message.Metadata?.PayloadId); + _logger.LogDebug(_messageQueued, message.MessageId, message.Metadata?.PayloadId); await _messageQueue .Writer @@ -266,6 +271,7 @@ await _messageQueue private static readonly string _defaultAutoPublisherSpanName = "messaging.rabbitmq.autopublisher process"; private static readonly string _compressEventName = "compressed"; private static readonly string _encryptEventName = "encrypted"; + private static readonly string _messagePublished = "AutoPublisher published message [MessageId:{0} InternalId:{1}]. Listen for receipt to indicate success..."; private async Task ProcessMessagesAsync(ChannelReader channelReader) { @@ -307,7 +313,7 @@ private async Task ProcessMessagesAsync(ChannelReader channelReader) span?.AddEvent(_encryptEventName); } - _logger.LogDebug(LogMessages.AutoPublishers.MessagePublished, message.MessageId, message.Metadata?.PayloadId); + _logger.LogDebug(_messagePublished, message.MessageId, message.Metadata?.PayloadId); await PublishAsync(message, Options.PublisherOptions.CreatePublishReceipts, Options.PublisherOptions.WithHeaders) .ConfigureAwait(false); @@ -351,7 +357,8 @@ private async ValueTask ProcessReceiptAsync(IPublishReceipt receipt) #region Publishing - // A basic implementation of publish but using the ChannelPool. If message properties is null, one is created and all messages are set to persistent. + private static readonly string _publishFailed = "Publish to route [{0}] failed, flagging channel host. Error: {1}"; + public async Task PublishAsync( string exchangeName, string routingKey, @@ -396,7 +403,7 @@ public async Task PublishAsync( catch (Exception ex) { OpenTelemetryHelpers.SetSpanAsError(span, ex); - _logger.LogDebug(LogMessages.Publishers.PublishFailed, $"{exchangeName}->{routingKey}", ex.Message); + _logger.LogDebug(_publishFailed, $"{exchangeName}->{routingKey}", ex.Message); error = true; } finally @@ -444,7 +451,7 @@ public async Task PublishAsync( { OpenTelemetryHelpers.SetSpanAsError(span, ex); _logger.LogDebug( - LogMessages.Publishers.PublishFailed, + _publishFailed, $"{exchangeName}->{routingKey}", ex.Message); @@ -508,7 +515,7 @@ public async Task PublishBatchAsync( { OpenTelemetryHelpers.SetSpanAsError(span, ex); _logger.LogDebug( - LogMessages.Publishers.PublishFailed, + _publishFailed, $"{exchangeName}->{routingKey}", ex.Message); @@ -564,7 +571,7 @@ public async Task PublishBatchAsync( OpenTelemetryHelpers.SetSpanAsError(span, ex); _logger.LogDebug( - LogMessages.Publishers.PublishFailed, + _publishFailed, $"{exchangeName}->{routingKey}", ex.Message); @@ -581,6 +588,7 @@ await _channelPool } private static readonly string _defaultPublishSpanName = "messaging.rabbitmq.publisher publish"; + private static readonly string _publishMessageFailed = "Publish to route [{0}] failed [MessageId: {1}] flagging channel host. Error: {2}"; /// /// Acquires a channel from the channel pool, then publishes message based on the message parameters. @@ -620,7 +628,7 @@ public async Task PublishAsync(IMessage message, bool createReceipt, bool withOp { OpenTelemetryHelpers.SetSpanAsError(span, ex); _logger.LogDebug( - LogMessages.Publishers.PublishMessageFailed, + _publishMessageFailed, $"{message.Exchange}->{message.RoutingKey}", message.MessageId, ex.Message); @@ -683,7 +691,7 @@ public async Task PublishWithConfirmationAsync(IMessage message, bool createRece { OpenTelemetryHelpers.SetSpanAsError(span, ex); _logger.LogDebug( - LogMessages.Publishers.PublishMessageFailed, + _publishMessageFailed, $"{message.Exchange}->{message.RoutingKey}", message.MessageId, ex.Message); @@ -747,7 +755,7 @@ public async Task PublishManyAsync(IList messages, bool createReceipt, { OpenTelemetryHelpers.SetSpanAsError(span, ex); _logger.LogDebug( - LogMessages.Publishers.PublishMessageFailed, + _publishMessageFailed, $"{messages[i].Exchange}->{messages[i].RoutingKey}", messages[i].MessageId, ex.Message); @@ -765,6 +773,8 @@ public async Task PublishManyAsync(IList messages, bool createReceipt, await _channelPool.ReturnChannelAsync(chanHost, error).ConfigureAwait(false); } + private static readonly string _publishBatchFailed = "Batch publish failed, flagging channel host. Error: {0}"; + /// /// Use this method when a group of messages who have the same properties (deliverymode, messagetype, priority). /// Receipt with no error indicates that we successfully handed off to internal library, not necessarily published. @@ -818,7 +828,7 @@ public async Task PublishManyAsBatchAsync(IList messages, bool createR { OpenTelemetryHelpers.SetSpanAsError(span, ex); _logger.LogDebug( - LogMessages.Publishers.PublishBatchFailed, + _publishBatchFailed, ex.Message); error = true; @@ -830,6 +840,8 @@ public async Task PublishManyAsBatchAsync(IList messages, bool createR } } + private static readonly string _channelReadErrorMessage = "Can't use reader on a closed Threading.Channel."; + [MethodImpl(MethodImplOptions.AggressiveInlining)] private async ValueTask CreateReceiptAsync(IMessage message, bool error) { @@ -838,7 +850,7 @@ private async ValueTask CreateReceiptAsync(IMessage message, bool error) .WaitToWriteAsync() .ConfigureAwait(false)) { - throw new InvalidOperationException(ExceptionMessages.ChannelReadErrorMessage); + throw new InvalidOperationException(_channelReadErrorMessage); } await _receiptBuffer diff --git a/src/HouseofCat.RabbitMQ/Services/RabbitService.cs b/src/HouseofCat.RabbitMQ/Services/RabbitService.cs index b3d727b..452121a 100644 --- a/src/HouseofCat.RabbitMQ/Services/RabbitService.cs +++ b/src/HouseofCat.RabbitMQ/Services/RabbitService.cs @@ -2,7 +2,6 @@ using HouseofCat.Encryption; using HouseofCat.RabbitMQ.Pools; using HouseofCat.Serialization; -using HouseofCat.Utilities; using HouseofCat.Utilities.Errors; using HouseofCat.Utilities.Helpers; using Microsoft.Extensions.Logging; @@ -12,7 +11,6 @@ using System.Collections.Concurrent; using System.Threading; using System.Threading.Tasks; -using static HouseofCat.RabbitMQ.LogMessages; namespace HouseofCat.RabbitMQ.Services; @@ -232,10 +230,12 @@ await Topologer } } + private static readonly string _noConsumerOptionsMessage = "Consumer {0} options not found in Options.ConsumerOptions."; + public IConsumer GetConsumer(string consumerName) { if (!Consumers.TryGetValue(consumerName, out IConsumer value)) - { throw new ArgumentException(string.Format(ExceptionMessages.NoConsumerOptionsMessage, consumerName)); } + { throw new ArgumentException(string.Format(_noConsumerOptionsMessage, consumerName)); } return value; }