Skip to content

Commit

Permalink
RabbitMQ ConnectionPool Oauth2 support.
Browse files Browse the repository at this point in the history
  • Loading branch information
houseofcat committed Mar 16, 2024
1 parent 94084a3 commit 8faa6ca
Show file tree
Hide file tree
Showing 15 changed files with 244 additions and 110 deletions.
23 changes: 15 additions & 8 deletions src/HouseofCat.RabbitMQ.Dataflows/ConsumerDataflow.cs
Original file line number Diff line number Diff line change
Expand Up @@ -487,7 +487,7 @@ public ConsumerDataflow<TState> WithSendStep(
#region Step Linking

protected virtual void BuildLinkages<TConsumerBlock>(DataflowLinkOptions overrideOptions = null)
where TConsumerBlock : ConsumerBlock<ReceivedData>
where TConsumerBlock : ConsumerBlock<ReceivedData>, new()
{
Guard.AgainstNull(_buildStateBlock, nameof(_buildStateBlock)); // Create State Is Mandatory
Guard.AgainstNull(_finalization, nameof(_finalization)); // Leaving The Workflow Is Mandatory
Expand All @@ -503,8 +503,10 @@ protected virtual void BuildLinkages<TConsumerBlock>(DataflowLinkOptions overrid
{
for (var i = 0; i < _consumerCount; i++)
{
var consumerBlock = New<TConsumerBlock>.Instance.Invoke();
consumerBlock.Consumer = new Consumer(_rabbitService.ChannelPool, _consumerName);
var consumerBlock = new TConsumerBlock
{
Consumer = new Consumer(_rabbitService.ChannelPool, _consumerName)
};
_consumerBlocks.Add(consumerBlock);
_consumerBlocks[i].LinkTo(_inputBuffer, overrideOptions ?? _linkStepOptions);
}
Expand All @@ -513,8 +515,11 @@ protected virtual void BuildLinkages<TConsumerBlock>(DataflowLinkOptions overrid
{
for (var i = 0; i < _consumers.Count; i++)
{
var consumerBlock = New<TConsumerBlock>.Instance.Invoke();
consumerBlock.Consumer = _consumers.ElementAt(i);
var consumerBlock = new TConsumerBlock
{
Consumer = _consumers.ElementAt(i)
};

_consumerBlocks.Add(consumerBlock);
_consumerBlocks[i].LinkTo(_inputBuffer, overrideOptions ?? _linkStepOptions);
}
Expand Down Expand Up @@ -601,9 +606,11 @@ private void LinkWithFaultRoute(ISourceBlock<TState> source, IPropagatorBlock<TS
private string StateIdentifier => $"{WorkflowName}_StateBuild";
public virtual TState BuildState<TOut>(ISerializationProvider provider, string key, ReceivedData data)
{
var state = New<TState>.Instance.Invoke();
state.ReceivedData = data;
state.Data = new Dictionary<string, object>();
var state = new TState
{
ReceivedData = data,
Data = new Dictionary<string, object>()
};

// If the SerializationProvider was assigned, use it, else it's raw bytes.
if (provider != null)
Expand Down
4 changes: 2 additions & 2 deletions src/HouseofCat.RabbitMQ/Constants.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ public static class Constants

// Consumer
public static string HeaderForObjectType { get; set; } = "X-CR-OBJECTTYPE";
public static string HeaderValueForMessage { get; set; } = "MESSAGE";
public static string HeaderValueForLetter { get; set; } = "LETTER";
public const string HeaderValueForMessage = "MESSAGE";
public const string HeaderValueForLetter = "LETTER";
public static string HeaderValueForUnknown { get; set; } = "UNKNOWN";
public static string HeaderForEncrypted { get; set; } = "X-CR-ENCRYPTED";
public static string HeaderForEncryption { get; set; } = "X-CR-ENCRYPTION";
Expand Down
49 changes: 40 additions & 9 deletions src/HouseofCat.RabbitMQ/Consumer/Consumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -358,37 +358,68 @@ await _consumerChannel
}
}

private async Task ConsumerShutdownAsync(object sender, ShutdownEventArgs e)
protected async Task ConsumerShutdownAsync(object sender, ShutdownEventArgs e)
{
if (await _conLock.WaitAsync(0))
{
_shutdownAutoRecoveryLoopCount = 0;

try
{ await HandleUnexpectedShutdownAsync(e).ConfigureAwait(false); }
finally
{ _conLock.Release(); }
}
}

private async Task HandleUnexpectedShutdownAsync(ShutdownEventArgs e)
private static int _maxAutoRecoveryChannelHealthChecks = 0;
private int _shutdownAutoRecoveryLoopCount = 0;

/// <summary>
/// This method used to rebuild channels/connections for Consumers. Due to recent
/// changes in RabbitMQ.Client, it is now possible for the consumer to be in a state
/// of self-recovery. Unfortunately, there are still some edge cases where the channel
/// has exception and is closed server side and this library needs to be able to recover
/// from those events.
/// </summary>
/// <para>Docs: https://www.rabbitmq.com/client-libraries/dotnet-api-guide#recovery</para>
/// <param name="e"></param>
/// <returns></returns>
protected async Task HandleUnexpectedShutdownAsync(ShutdownEventArgs e)
{
if (!_shutdown)
{
var healthy = false;
while (!_shutdown && !healthy)
{
healthy = await _chanHost.HealthyAsync().ConfigureAwait(false);

healthy = await _chanHost.ChannelHealthyAsync().ConfigureAwait(false);
if (healthy)
{
_logger.LogInformation(
LogMessages.Consumers.ConsumerShutdownEvent,
ConsumerOptions.ConsumerName,
e.ReplyText);
return;
break;
}
else if (_shutdownAutoRecoveryLoopCount > _maxAutoRecoveryChannelHealthChecks)
{
_shutdownAutoRecoveryLoopCount = 0;
var connectionHealthy = await _chanHost
.ConnectionHealthyAsync()
.ConfigureAwait(false);

if (connectionHealthy)
{
// Inner infinite loop, until Channel is healthy/rebuilt.
await _chanHost
.WaitUntilChannelIsReadyAsync(Options.PoolOptions.SleepOnErrorInterval)
.ConfigureAwait(false);
}
}

await Task.Delay(Options.PoolOptions.SleepOnErrorInterval).ConfigureAwait(false);
_shutdownAutoRecoveryLoopCount++;
}

_logger.LogInformation(
LogMessages.Consumers.ConsumerShutdownEvent,
ConsumerOptions.ConsumerName,
e.ReplyText);
}
}

Expand Down
13 changes: 7 additions & 6 deletions src/HouseofCat.RabbitMQ/Extensions/MetadataExtensions.cs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
using HouseofCat.Utilities.Errors;
using System;
using System.Collections.Generic;
using static HouseofCat.Reflection.Generics;

namespace HouseofCat.RabbitMQ;

Expand All @@ -10,9 +9,11 @@ public static class MetadataExtensions
public static T Clone<T>(this IMetadata metadata)
where T : IMetadata, new()
{
var clonedMetadata = New<T>.Instance();
clonedMetadata.Compressed = metadata.Compressed;
clonedMetadata.Encrypted = metadata.Encrypted;
var clonedMetadata = new T
{
Compressed = metadata.Compressed,
Encrypted = metadata.Encrypted
};

foreach (var kvp in metadata.CustomFields)
{
Expand All @@ -27,9 +28,9 @@ public static T GetHeader<T>(this IMetadata metadata, string key)
Guard.AgainstNull(metadata, nameof(LetterMetadata));
Guard.AgainstNullOrEmpty(metadata.CustomFields, nameof(LetterMetadata.CustomFields));

if (metadata.CustomFields.ContainsKey(key))
if (metadata.CustomFields.TryGetValue(key, out object value))
{
if (metadata.CustomFields[key] is T temp)
if (value is T temp)
{ return temp; }
else { throw new InvalidCastException(); }
}
Expand Down
1 change: 1 addition & 0 deletions src/HouseofCat.RabbitMQ/HouseofCat.RabbitMQ.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
</PropertyGroup>
<ItemGroup>
<PackageReference Include="RabbitMQ.Client" Version="6.8.1" />
<PackageReference Include="RabbitMQ.Client.OAuth2" Version="1.0.0" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\HouseofCat.Compression\HouseofCat.Compression.csproj" />
Expand Down
24 changes: 12 additions & 12 deletions src/HouseofCat.RabbitMQ/Messages/ReceivedData.cs
Original file line number Diff line number Diff line change
Expand Up @@ -91,9 +91,9 @@ public ReceivedData(

private void ReadHeaders()
{
if (Properties?.Headers != null && Properties.Headers.ContainsKey(Constants.HeaderForObjectType))
if (Properties?.Headers != null && Properties.Headers.TryGetValue(Constants.HeaderForObjectType, out object objectType))
{
ContentType = Encoding.UTF8.GetString((byte[])Properties.Headers[Constants.HeaderForObjectType]);
ContentType = Encoding.UTF8.GetString((byte[])objectType);

// ADD SERIALIZER TO HEADER AND && JSON THIS ONE
if (ContentType == Constants.HeaderValueForLetter && Data?.Length > 0)
Expand All @@ -111,20 +111,20 @@ private void ReadHeaders()
}
}

if (Properties.Headers.ContainsKey(Constants.HeaderForEncrypted))
{ Encrypted = (bool)Properties.Headers[Constants.HeaderForEncrypted]; }
if (Properties.Headers.TryGetValue(Constants.HeaderForEncrypted, out object encryptedValue))
{ Encrypted = (bool)encryptedValue; }

if (Properties.Headers.ContainsKey(Constants.HeaderForEncryption))
{ EncryptionType = Encoding.UTF8.GetString((byte[])Properties.Headers[Constants.HeaderForEncryption]); }
if (Properties.Headers.TryGetValue(Constants.HeaderForEncryption, out object encryptedType))
{ EncryptionType = Encoding.UTF8.GetString((byte[])encryptedType); }

if (Properties.Headers.ContainsKey(Constants.HeaderForEncryptDate))
{ EncryptedDateTime = DateTime.Parse(Encoding.UTF8.GetString((byte[])Properties.Headers[Constants.HeaderForEncryptDate])); }
if (Properties.Headers.TryGetValue(Constants.HeaderForEncryptDate, out object encryptedDate))
{ EncryptedDateTime = DateTime.Parse(Encoding.UTF8.GetString((byte[])encryptedDate)); }

if (Properties.Headers.ContainsKey(Constants.HeaderForCompressed))
{ Compressed = (bool)Properties.Headers[Constants.HeaderForCompressed]; }
if (Properties.Headers.TryGetValue(Constants.HeaderForCompressed, out object compressedValue))
{ Compressed = (bool)compressedValue; }

if (Properties.Headers.ContainsKey(Constants.HeaderForCompression))
{ CompressionType = Encoding.UTF8.GetString((byte[])Properties.Headers[Constants.HeaderForCompression]); }
if (Properties.Headers.TryGetValue(Constants.HeaderForCompression, out object compressedType))
{ CompressionType = Encoding.UTF8.GetString((byte[])compressedType); }
}
else
{
Expand Down
7 changes: 6 additions & 1 deletion src/HouseofCat.RabbitMQ/Options/FactoryOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public class FactoryOptions
/// <summary>
/// ConnectionFactory (RabbitMQ) the amount of time to wait before netrecovery begins (seconds).
/// </summary>
public ushort NetRecoveryTimeout { get; set; } = 10;
public ushort NetRecoveryTimeout { get; set; } = 5;

/// <summary>
/// ConnectionFactory (RabbitMQ) specify the amount of time before timeout on protocol operations (seconds).
Expand All @@ -70,4 +70,9 @@ public class FactoryOptions
/// Class to hold settings for ChannelFactory/SSL (RabbitMQ) settings.
/// </summary>
public SslOptions SslOptions { get; set; } = new SslOptions();

/// <summary>
/// Class to hold settings for OAuth2 (RabbitMQ) settings.
/// </summary>
public OAuth2Options OAuth2Options { get; set; } = new OAuth2Options();
}
13 changes: 13 additions & 0 deletions src/HouseofCat.RabbitMQ/Options/OAuth2Options.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
namespace HouseofCat.RabbitMQ;

public class OAuth2Options
{
public string TokenEndpointUrl { get; set; }
public string ClientId { get; set; }
public string ClientSecret { get; set; }

/// <summary>
/// The OAuth2 Client name to use for distinction (if you use more than one).
/// </summary>
public string OAuth2ClientName { get; set; } = "RabbitMQ.Client.OAuth2.Default";
}
14 changes: 12 additions & 2 deletions src/HouseofCat.RabbitMQ/Options/PoolOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,26 @@ public class PoolOptions

/// <summary>
/// Number of connections to be created in the ConnectionPool. Used in round-robin to create channels.
/// <para>Deafult valuse is 2.</para>
/// </summary>
public ushort MaxConnections { get; set; } = 5;
public ushort MaxConnections { get; set; } = 2;

/// <summary>
/// Number of channels to keep in each of the channel pools. Used in round-robin to perform actions.
/// <para>Default value is 10.</para>
/// </summary>
public ushort MaxChannels { get; set; } = 25;
public ushort MaxChannels { get; set; } = 10;

/// <summary>
/// The time to sleep (in ms) when an error occurs on Channel or Connection creation. It's best not to be hyper aggressive with this value.
/// <para>Default value is 1000.</para>
/// </summary>
public int SleepOnErrorInterval { get; set; } = 1000;

/// <summary>
/// All Transient Channels will be created in this range. This is to help identify transient channels
/// used in logging internally. Can not be lower than 10000.
/// <para>Default value is 10000.</para>
/// </summary>
public ulong TansientChannelStartRange { get; set; } = 10000;
}
8 changes: 4 additions & 4 deletions src/HouseofCat.RabbitMQ/Options/RabbitOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ public class RabbitOptions

public ConsumerOptions GetConsumerOptions(string consumerName)
{
if (!ConsumerOptions.ContainsKey(consumerName)) throw new ArgumentException(string.Format(CultureInfo.InvariantCulture, ExceptionMessages.NoConsumerOptionsMessage, consumerName));
return ConsumerOptions[consumerName];
if (!ConsumerOptions.TryGetValue(consumerName, out ConsumerOptions value)) throw new ArgumentException(string.Format(CultureInfo.InvariantCulture, ExceptionMessages.NoConsumerOptionsMessage, consumerName));
return value;
}

public void ApplyGlobalConsumerOptions()
Expand All @@ -45,9 +45,9 @@ public void ApplyGlobalConsumerOptions()
// on top of (overriding) individual consumer settings. Opt out by not setting
// the global settings field.
if (!string.IsNullOrWhiteSpace(kvp.Value.GlobalSettings)
&& GlobalConsumerOptions.ContainsKey(kvp.Value.GlobalSettings))
&& GlobalConsumerOptions.TryGetValue(kvp.Value.GlobalSettings, out GlobalConsumerOptions value))
{
kvp.Value.ApplyGlobalOptions(GlobalConsumerOptions[kvp.Value.GlobalSettings]);
kvp.Value.ApplyGlobalOptions(value);
}
}
}
Expand Down
23 changes: 13 additions & 10 deletions src/HouseofCat.RabbitMQ/Pools/ChannelHost.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,11 @@ public interface IChannelHost
bool FlowControlled { get; }

IModel GetChannel();
Task WaitUntilReadyAsync(int sleepInterval, CancellationToken token = default);
Task WaitUntilChannelIsReadyAsync(int sleepInterval, CancellationToken token = default);
Task<bool> BuildRabbitMQChannelAsync();
void Close();
Task<bool> HealthyAsync();
Task<bool> ChannelHealthyAsync();
Task<bool> ConnectionHealthyAsync();
}

public class ChannelHost : IChannelHost, IDisposable
Expand Down Expand Up @@ -61,7 +62,7 @@ public IModel GetChannel()
{ _hostLock.Release(); }
}

public async Task WaitUntilReadyAsync(int sleepInterval, CancellationToken token = default)
public async Task WaitUntilChannelIsReadyAsync(int sleepInterval, CancellationToken token = default)
{
var success = false;
while (!token.IsCancellationRequested && !success)
Expand Down Expand Up @@ -147,24 +148,26 @@ protected virtual void FlowControl(object sender, FlowControlEventArgs e)
_hostLock.Release();
}

public async Task<bool> HealthyAsync()
public async Task<bool> ChannelHealthyAsync()
{
var connectionHealthy = await _connHost.HealthyAsync().ConfigureAwait(false);

return connectionHealthy && !FlowControlled && (_channel?.IsOpen ?? false);
}

public async Task<bool> ConnectionHealthyAsync()
{
return await _connHost.HealthyAsync().ConfigureAwait(false);
}

private const int CloseCode = 200;
private const string CloseMessage = "HouseofCat.RabbitMQ manual close channel initiated.";

public void Close()
{
if (!Closed || !_channel.IsOpen)
{
try
{ _channel.Close(CloseCode, CloseMessage); }
catch { /* SWALLOW */ }
}
try
{ _channel.Close(CloseCode, CloseMessage); }
catch { /* SWALLOW */ }
}

protected virtual void Dispose(bool disposing)
Expand Down
Loading

0 comments on commit 8faa6ca

Please sign in to comment.