Skip to content

Commit

Permalink
Consolidating logs with classes using them (removing them from Consta…
Browse files Browse the repository at this point in the history
…nts).
  • Loading branch information
houseofcat committed Apr 24, 2024
1 parent 3dca662 commit f6b825e
Show file tree
Hide file tree
Showing 8 changed files with 180 additions and 216 deletions.
114 changes: 0 additions & 114 deletions src/HouseofCat.RabbitMQ/Constants.cs
Original file line number Diff line number Diff line change
Expand Up @@ -53,117 +53,3 @@ public static class Constants
public static string MessagingMessageCompressedKey { get; set; } = "messaging.rabbitmq.message.compressed";
public static string MessagingMessageCompressionKey { get; set; } = "messaging.rabbitmq.message.compression";
}

public static class ExceptionMessages
{
// AutoPublisher
public const string AutoPublisherNotStartedError = "AutoPublisher has not been started.";

// General
public const string QueueChannelError = "Can't queue a message to a closed Threading.Channel.";

public const string ChannelReadErrorMessage = "Can't use reader on a closed Threading.Channel.";
public const string NoConsumerOptionsMessage = "Consumer {0} not found in Consumers dictionary.";
public const string NoConsumerPipelineOptionsMessage = "ConsumerPipeline {0} not found in ConsumerPipelineOptions dictionary.";

public const string ValidationMessage = "ConnectionPool is not initialized or is shutdown.";
public const string ShutdownValidationMessage = "ConnectionPool is not initialized. Can't be Shutdown.";
public const string GetConnectionErrorMessage = "Threading.Channel buffer used for reading RabbitMQ connections has been closed.";

public const string ChannelPoolNotInitializedMessage = "ChannelPool is not usable until it has been initialized.";
public const string EncrypConfigErrorMessage = "Encryption can't be enabled without a HashKey (32-byte length).";

// ChannelPool
public const string ChannelPoolValidationMessage = "ChannelPool is not initialized or is shutdown.";
public const string ChannelPoolShutdownValidationMessage = "ChannelPool is not initialized. Can't be Shutdown.";
public const string ChannelPoolBadOptionMaxChannelError = "Check your PoolOptions, MaxChannels was less than 1.";
public const string ChannelPoolBadOptionMaxAckChannelError = "Check your PoolOptions, MaxAckableChannels was less than 1.";
public const string ChannelPoolGetChannelError = "Threading.Channel used for reading RabbitMQ channels has been closed.";

// Pipeline Messages
public const string NotFinalized = "Pipeline is not ready for receiving work as it has not been finalized yet.";
public const string AlreadyFinalized = "Pipeline is already finalized and ready for use.";
public const string CantFinalize = "Pipeline can't finalize as no steps have been added.";
public const string InvalidAddError = "Pipeline is already finalized and you can no longer add steps.";

public const string ChainingImpossible = "Pipelines can't be chained together as one, or both, pipelines have been finalized.";
public const string ChainingNotMatched = "Pipelines can't be chained together as the last step function and the first step function don't align with type input or asynchronicity.";
public const string NothingToChain = "Pipelines can't be chained together as one, or both, pipelines have no steps.";

public const string InvalidStepFound = "Pipeline can't chain the last step to this new step. Unexpected type found on the previous step.";
}

public static class LogMessages
{
public static class ChannelHosts
{
public const string FlowControlled = "ChannelHost [Id: {0}] - Flow control event has triggered.";
public const string FlowControlFinished = "ChannelHost [Id: {0}] - Flow control event has resolved itself.";
public const string ConsumerStartedConsumer = "ChannelHost [Id: {0}] - Starting consuming. ConsumerTag: [{1}]";
public const string ConsumerStopConsumer = "ChannelHost [Id: {0}] - Stopping consuming using ConsumerTag: [{1}]";
public const string ConsumerStopConsumerError = "ChannelHost [Id: {0}] - Error stopping consuming using ConsumerTag: [{1}]";
}

public static class ConnectionPools
{
public const string CreateConnections = "ConnectionPool creating Connections...";
public const string CreateConnectionsComplete = "ConnectionPool initialized.";
public const string CreateConnectionException = "Connection [{0}] failed to be created.";
public const string Shutdown = "ConnectionPool shutdown was called.";
public const string ShutdownComplete = "ConnectionPool shutdown complete.";
}

public static class ChannelPools
{
public const string Initialization = "ChannelPool initialize call was made.";
public const string InitializationComplete = "ChannelPool initialized.";
public const string ChannelHasIssues = "ChannelHost [Id: {0}] was detected to have issues. Attempting to repair...";
public const string CreateChannel = "ChannelHost [Id: {0}] create loop is executing an iteration...";
public const string CreateChannelFailedConnection = "The ChannelHost [Id: {0}] failed because Connection is unhealthy.";
public const string CreateChannelFailedConstruction = "The ChannelHost [Id: {0}] failed because ChannelHost construction threw exception.";
public const string CreateChannelSleep = "The ChannelHost [Id: {0}] create loop iteration failed. Sleeping...";
public const string CreateChannelSuccess = "The ChannelHost [Id: {0}] create loop finished. Channel restored and flags removed.";
public const string ReturningChannel = "The ChannelHost [Id: {0}] was returned to the pool. Flagged? {1}";
public const string Shutdown = "ChannelPool shutdown was called.";
public const string ShutdownComplete = "ChannelPool shutdown complete.";
}

public static class Publishers
{
public const string PublishFailed = "Publish to route [{0}] failed, flagging channel host. Error: {1}";
public const string PublishMessageFailed = "Publish to route [{0}] failed [MessageId: {1}] flagging channel host. Error: {2}";
public const string PublishBatchFailed = "Batch publish failed, flagging channel host. Error: {0}";
}

public static class AutoPublishers
{
public const string MessageQueued = "AutoPublisher queued message [MessageId:{0} InternalId:{1}].";
public const string MessagePublished = "AutoPublisher published message [MessageId:{0} InternalId:{1}]. Listen for receipt to indicate success...";
}

public static class Consumers
{
public const string StartingConsumer = "Consumer [{0}] starting...";
public const string StartingConsumerError = "Exception creating internal RabbitMQ consumer. Retrying...";
public const string StartedConsumer = "Consumer [{0}] started.";
public const string StartingConsumerLoop = "Consumer [{0}] startup loop executing...";
public const string Started = "Consumer [{0}] started.";
public const string StopConsumer = "Consumer [{0}] stop consuming called...";
public const string StoppedConsumer = "Consumer [{0}] stopped consuming.";
public const string GettingTransientChannelHost = "Consumer [{0}] getting a transient channel.";
public const string GettingChannelHost = "Consumer [{0}] getting a channel.";
public const string GettingAckChannelHost = "Consumer [{0}] getting a ackable channel host.";
public const string ChannelEstablished = "Consumer [{0}] channel host [{1}] assigned.";
public const string ChannelNotEstablished = "Consumer [{0}] channel host could not be assigned.";
public const string ConsumerMessageReceived = "Consumer [{0}] message received (DT:{1}]. Adding to buffer...";
public const string ConsumerAsyncMessageReceived = "Consumer [{0}] async message received (DT:{1}]. Adding to buffer...";
public const string ConsumerShutdownEvent = "Consumer [{0}] recoverable shutdown event has occurred on Channel [Id: {1}]. Reason: {2}. Attempting to restart consuming...";
public const string ConsumerShutdownEventFinished = "Consumer [{0}] shutdown event has finished on Channel [Id: {1}].";
public const string ConsumerChannelReplacedEvent = "Consumer [{0}] recoverable shutdown event is ongoing. Connection is healthy. Channel appears to be dead and replacing it...";
public const string ConsumerMessageWriteToBufferError = "Consumer [{0}] was unable to write to channel buffer. Error: {1}";

public const string ConsumerDataflowActionCancelled = "Consumer [{0}] dataflow engine actions were cancelled.";
public const string ConsumerDataflowError = "Consumer [{0}] dataflow engine encountered an error. Error: {1}";
public const string ConsumerDataflowQueueing = "Consumer [{0}] dataflow engine queueing unit of work [receivedMessage:DT:{1}].";
}
}
58 changes: 38 additions & 20 deletions src/HouseofCat.RabbitMQ/Consumer/Consumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
using static HouseofCat.RabbitMQ.LogMessages;

namespace HouseofCat.RabbitMQ;

Expand Down Expand Up @@ -88,6 +87,8 @@ public Consumer(
_defaultOptions.Converters.Add(new FlexibleObjectJsonConverter());
}

private static readonly string _startingConsumerLoop = "Consumer [{0}] startup loop executing...";

public async Task StartConsumerAsync()
{
if (!await _conLock.WaitAsync(0).ConfigureAwait(false)) return;
Expand All @@ -107,7 +108,7 @@ public async Task StartConsumerAsync()
var success = false;
while (!success)
{
_logger.LogTrace(Consumers.StartingConsumerLoop, ConsumerOptions.ConsumerName);
_logger.LogTrace(_startingConsumerLoop, ConsumerOptions.ConsumerName);
success = await StartConsumingAsync().ConfigureAwait(false);
if (!success)
{ await Task.Delay(Options.PoolOptions.SleepOnErrorInterval); }
Expand All @@ -119,6 +120,9 @@ public async Task StartConsumerAsync()
finally { _conLock.Release(); }
}

private static readonly string _stopConsumer = "Consumer [{0}] stop consuming called...";
private static readonly string _stoppedConsumer = "Consumer [{0}] stopped consuming.";

public async Task StopConsumerAsync(bool immediate = false)
{
if (!await _conLock.WaitAsync(0).ConfigureAwait(false)) return;
Expand All @@ -128,7 +132,7 @@ public async Task StopConsumerAsync(bool immediate = false)
if (Started)
{
_shutdown = true;
_logger.LogInformation(Consumers.StopConsumer, ConsumerOptions.ConsumerName);
_logger.LogInformation(_stopConsumer, ConsumerOptions.ConsumerName);

_consumerChannel.Writer.Complete();

Expand All @@ -147,7 +151,7 @@ await _consumerChannel
_cts.Cancel();
Started = false;
_logger.LogInformation(
Consumers.StoppedConsumer,
_stoppedConsumer,
ConsumerOptions.ConsumerName);
}
}
Expand All @@ -156,15 +160,18 @@ await _consumerChannel

protected AsyncEventingBasicConsumer _asyncConsumer;
protected EventingBasicConsumer _consumer;

protected CancellationTokenSource _cts;

private static readonly string _startingConsumer = "Consumer [{0}] starting...";
private static readonly string _startingConsumerError = "Exception creating internal RabbitMQ consumer. Retrying...";
private static readonly string _startedConsumer = "Consumer [{0}] started.";

protected async Task<bool> StartConsumingAsync()
{
if (_shutdown) return false;

_logger.LogInformation(
Consumers.StartingConsumer,
_startingConsumer,
ConsumerOptions.ConsumerName);

if (!_chanHost.ChannelHealthy())
Expand Down Expand Up @@ -194,7 +201,7 @@ await _chanHost.WaitUntilChannelIsReadyAsync(
}
catch (Exception ex)
{
_logger.LogError(ex, Consumers.StartingConsumerError);
_logger.LogError(ex, _startingConsumerError);
return false;
}
}
Expand All @@ -213,29 +220,33 @@ await _chanHost.WaitUntilChannelIsReadyAsync(
}
catch (Exception ex)
{
_logger.LogError(ex, Consumers.StartingConsumerError);
_logger.LogError(ex, _startingConsumerError);
return false;
}
}

_cts = new CancellationTokenSource();
_logger.LogInformation(
Consumers.StartedConsumer,
_startedConsumer,
ConsumerOptions.ConsumerName);

return true;
}

private static readonly string _gettingTransientChannelHost = "Consumer [{0}] getting a transient channel.";
private static readonly string _channelEstablished = "Consumer [{0}] channel host [{1}] assigned.";


protected virtual async Task SetChannelHostAsync()
{
_logger.LogTrace(Consumers.GettingTransientChannelHost, ConsumerOptions.ConsumerName);
_logger.LogTrace(_gettingTransientChannelHost, ConsumerOptions.ConsumerName);

_chanHost = await ChannelPool
.GetTransientChannelAsync(!ConsumerOptions.AutoAck)
.ConfigureAwait(false);

_logger.LogDebug(
Consumers.ChannelEstablished,
_channelEstablished,
ConsumerOptions.ConsumerName,
_chanHost?.ChannelId.ToString() ?? "ChannelHost: null");
}
Expand All @@ -253,10 +264,12 @@ private EventingBasicConsumer CreateConsumer()
return consumer;
}

private static readonly string _consumerMessageReceived = "Consumer [{0}] message received (DT:{1}]. Adding to buffer...";

protected virtual async void ReceiveHandler(object _, BasicDeliverEventArgs bdea)
{
_logger.LogDebug(
Consumers.ConsumerMessageReceived,
_consumerMessageReceived,
ConsumerOptions.ConsumerName,
bdea.DeliveryTag);

Expand Down Expand Up @@ -297,13 +310,15 @@ protected AsyncEventingBasicConsumer CreateAsyncConsumer()
protected virtual async Task ReceiveHandlerAsync(object _, BasicDeliverEventArgs bdea)
{
_logger.LogDebug(
Consumers.ConsumerAsyncMessageReceived,
_consumerMessageReceived,
ConsumerOptions.ConsumerName,
bdea.DeliveryTag);

await HandleMessageAsync(bdea).ConfigureAwait(false);
}

private static readonly string _consumerMessageWriteToBufferError = "Consumer [{0}] was unable to write to channel buffer. Error: {1}";

private static readonly string _consumerSpanNameFormat = "messaging.rabbitmq.consumer receive";

protected virtual async ValueTask<bool> HandleMessageAsync(BasicDeliverEventArgs bdea)
Expand Down Expand Up @@ -335,7 +350,7 @@ await _consumerChannel
catch (Exception ex)
{
_logger.LogError(
Consumers.ConsumerMessageWriteToBufferError,
_consumerMessageWriteToBufferError,
ConsumerOptions.ConsumerName,
ex.Message);
return false;
Expand Down Expand Up @@ -414,6 +429,8 @@ protected virtual void AutoDeserialize(ReceivedMessage receivedMessage)
}
}

private static readonly string _consumerShutdownEvent = "Consumer [{0}] recoverable shutdown event has occurred on Channel [Id: {1}]. Reason: {2}. Attempting to restart consuming...";

protected async Task ConsumerShutdownAsync(object sender, ShutdownEventArgs e)
{
if (!await _conLock.WaitAsync(0).ConfigureAwait(false)) return;
Expand All @@ -423,7 +440,7 @@ protected async Task ConsumerShutdownAsync(object sender, ShutdownEventArgs e)
if (!_shutdown)
{
_logger.LogInformation(
Consumers.ConsumerShutdownEvent,
_consumerShutdownEvent,
ConsumerOptions.ConsumerName,
_chanHost.ChannelId,
e.ReplyText);
Expand All @@ -438,7 +455,8 @@ await HandleRecoverableShutdownAsync(e)
{ _conLock.Release(); }
}

protected static readonly string _consumerShutdownExceptionMessage = "Consumer's ChannelHost {0} had an unhandled exception during recovery.";
private static readonly string _consumerShutdownExceptionMessage = "Consumer's ChannelHost {0} had an unhandled exception during recovery.";
private static readonly string _consumerShutdownEventFinished = "Consumer [{0}] shutdown event has finished on Channel [Id: {1}].";

protected virtual async Task HandleRecoverableShutdownAsync(ShutdownEventArgs e)
{
Expand All @@ -456,7 +474,7 @@ await _chanHost
}

_logger.LogInformation(
Consumers.ConsumerShutdownEventFinished,
_consumerShutdownEventFinished,
ConsumerOptions.ConsumerName,
_chanHost.ChannelId);
}
Expand All @@ -465,7 +483,7 @@ await _chanHost

public async ValueTask<IReceivedMessage> ReadAsync()
{
if (!await _consumerChannel.Reader.WaitToReadAsync().ConfigureAwait(false)) throw new InvalidOperationException(ExceptionMessages.ChannelReadErrorMessage);
if (!await _consumerChannel.Reader.WaitToReadAsync().ConfigureAwait(false)) throw new InvalidOperationException();

return await _consumerChannel
.Reader
Expand All @@ -475,7 +493,7 @@ public async ValueTask<IReceivedMessage> ReadAsync()

public async Task<IEnumerable<IReceivedMessage>> ReadUntilEmptyAsync(CancellationToken token = default)
{
if (!await _consumerChannel.Reader.WaitToReadAsync(token).ConfigureAwait(false)) throw new InvalidOperationException(ExceptionMessages.ChannelReadErrorMessage);
if (!await _consumerChannel.Reader.WaitToReadAsync(token).ConfigureAwait(false)) throw new InvalidOperationException();

var list = new List<IReceivedMessage>();
await _consumerChannel.Reader.WaitToReadAsync(token).ConfigureAwait(false);
Expand All @@ -490,7 +508,7 @@ public async Task<IEnumerable<IReceivedMessage>> ReadUntilEmptyAsync(Cancellatio

public async ValueTask<IAsyncEnumerable<IReceivedMessage>> ReadUntilStopAsync(CancellationToken token = default)
{
if (!await _consumerChannel.Reader.WaitToReadAsync(token).ConfigureAwait(false)) throw new InvalidOperationException(ExceptionMessages.ChannelReadErrorMessage);
if (!await _consumerChannel.Reader.WaitToReadAsync(token).ConfigureAwait(false)) throw new InvalidOperationException();

return _consumerChannel.Reader.ReadAllAsync(token);
}
Expand Down
4 changes: 3 additions & 1 deletion src/HouseofCat.RabbitMQ/Options/RabbitOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,14 @@ public sealed class RabbitOptions
/// </summary>
public IDictionary<string, ConsumerOptions> ConsumerOptions { get; set; } = new Dictionary<string, ConsumerOptions>();

private static readonly string _noConsumerOptionsMessage = "Consumer {0} not found in Consumers dictionary.";

public ConsumerOptions GetConsumerOptions(string consumerName)
{
if (ConsumerOptions.TryGetValue(consumerName, out ConsumerOptions value))
{
return value;
}
throw new ArgumentException(string.Format(ExceptionMessages.NoConsumerOptionsMessage, consumerName));
throw new ArgumentException(string.Format(_noConsumerOptionsMessage, consumerName));
}
}
Loading

0 comments on commit f6b825e

Please sign in to comment.