diff --git a/src/HouseofCat.RabbitMQ.Dataflows/ConsumerDataflow.cs b/src/HouseofCat.RabbitMQ.Dataflows/ConsumerDataflow.cs index 6cf2909a..323ab92a 100644 --- a/src/HouseofCat.RabbitMQ.Dataflows/ConsumerDataflow.cs +++ b/src/HouseofCat.RabbitMQ.Dataflows/ConsumerDataflow.cs @@ -487,7 +487,7 @@ public ConsumerDataflow WithSendStep( #region Step Linking protected virtual void BuildLinkages(DataflowLinkOptions overrideOptions = null) - where TConsumerBlock : ConsumerBlock + where TConsumerBlock : ConsumerBlock, new() { Guard.AgainstNull(_buildStateBlock, nameof(_buildStateBlock)); // Create State Is Mandatory Guard.AgainstNull(_finalization, nameof(_finalization)); // Leaving The Workflow Is Mandatory @@ -503,8 +503,10 @@ protected virtual void BuildLinkages(DataflowLinkOptions overrid { for (var i = 0; i < _consumerCount; i++) { - var consumerBlock = New.Instance.Invoke(); - consumerBlock.Consumer = new Consumer(_rabbitService.ChannelPool, _consumerName); + var consumerBlock = new TConsumerBlock + { + Consumer = new Consumer(_rabbitService.ChannelPool, _consumerName) + }; _consumerBlocks.Add(consumerBlock); _consumerBlocks[i].LinkTo(_inputBuffer, overrideOptions ?? _linkStepOptions); } @@ -513,8 +515,11 @@ protected virtual void BuildLinkages(DataflowLinkOptions overrid { for (var i = 0; i < _consumers.Count; i++) { - var consumerBlock = New.Instance.Invoke(); - consumerBlock.Consumer = _consumers.ElementAt(i); + var consumerBlock = new TConsumerBlock + { + Consumer = _consumers.ElementAt(i) + }; + _consumerBlocks.Add(consumerBlock); _consumerBlocks[i].LinkTo(_inputBuffer, overrideOptions ?? _linkStepOptions); } @@ -601,9 +606,11 @@ private void LinkWithFaultRoute(ISourceBlock source, IPropagatorBlock $"{WorkflowName}_StateBuild"; public virtual TState BuildState(ISerializationProvider provider, string key, ReceivedData data) { - var state = New.Instance.Invoke(); - state.ReceivedData = data; - state.Data = new Dictionary(); + var state = new TState + { + ReceivedData = data, + Data = new Dictionary() + }; // If the SerializationProvider was assigned, use it, else it's raw bytes. if (provider != null) diff --git a/src/HouseofCat.RabbitMQ/Constants.cs b/src/HouseofCat.RabbitMQ/Constants.cs index 411bf890..6edb477f 100644 --- a/src/HouseofCat.RabbitMQ/Constants.cs +++ b/src/HouseofCat.RabbitMQ/Constants.cs @@ -10,8 +10,8 @@ public static class Constants // Consumer public static string HeaderForObjectType { get; set; } = "X-CR-OBJECTTYPE"; - public static string HeaderValueForMessage { get; set; } = "MESSAGE"; - public static string HeaderValueForLetter { get; set; } = "LETTER"; + public const string HeaderValueForMessage = "MESSAGE"; + public const string HeaderValueForLetter = "LETTER"; public static string HeaderValueForUnknown { get; set; } = "UNKNOWN"; public static string HeaderForEncrypted { get; set; } = "X-CR-ENCRYPTED"; public static string HeaderForEncryption { get; set; } = "X-CR-ENCRYPTION"; diff --git a/src/HouseofCat.RabbitMQ/Consumer/Consumer.cs b/src/HouseofCat.RabbitMQ/Consumer/Consumer.cs index 30b700dd..e8365195 100644 --- a/src/HouseofCat.RabbitMQ/Consumer/Consumer.cs +++ b/src/HouseofCat.RabbitMQ/Consumer/Consumer.cs @@ -358,10 +358,12 @@ await _consumerChannel } } - private async Task ConsumerShutdownAsync(object sender, ShutdownEventArgs e) + protected async Task ConsumerShutdownAsync(object sender, ShutdownEventArgs e) { if (await _conLock.WaitAsync(0)) { + _shutdownAutoRecoveryLoopCount = 0; + try { await HandleUnexpectedShutdownAsync(e).ConfigureAwait(false); } finally @@ -369,26 +371,55 @@ private async Task ConsumerShutdownAsync(object sender, ShutdownEventArgs e) } } - private async Task HandleUnexpectedShutdownAsync(ShutdownEventArgs e) + private static int _maxAutoRecoveryChannelHealthChecks = 0; + private int _shutdownAutoRecoveryLoopCount = 0; + + /// + /// This method used to rebuild channels/connections for Consumers. Due to recent + /// changes in RabbitMQ.Client, it is now possible for the consumer to be in a state + /// of self-recovery. Unfortunately, there are still some edge cases where the channel + /// has exception and is closed server side and this library needs to be able to recover + /// from those events. + /// + /// Docs: https://www.rabbitmq.com/client-libraries/dotnet-api-guide#recovery + /// + /// + protected async Task HandleUnexpectedShutdownAsync(ShutdownEventArgs e) { if (!_shutdown) { var healthy = false; while (!_shutdown && !healthy) { - healthy = await _chanHost.HealthyAsync().ConfigureAwait(false); - + healthy = await _chanHost.ChannelHealthyAsync().ConfigureAwait(false); if (healthy) { - _logger.LogInformation( - LogMessages.Consumers.ConsumerShutdownEvent, - ConsumerOptions.ConsumerName, - e.ReplyText); - return; + break; + } + else if (_shutdownAutoRecoveryLoopCount > _maxAutoRecoveryChannelHealthChecks) + { + _shutdownAutoRecoveryLoopCount = 0; + var connectionHealthy = await _chanHost + .ConnectionHealthyAsync() + .ConfigureAwait(false); + + if (connectionHealthy) + { + // Inner infinite loop, until Channel is healthy/rebuilt. + await _chanHost + .WaitUntilChannelIsReadyAsync(Options.PoolOptions.SleepOnErrorInterval) + .ConfigureAwait(false); + } } await Task.Delay(Options.PoolOptions.SleepOnErrorInterval).ConfigureAwait(false); + _shutdownAutoRecoveryLoopCount++; } + + _logger.LogInformation( + LogMessages.Consumers.ConsumerShutdownEvent, + ConsumerOptions.ConsumerName, + e.ReplyText); } } diff --git a/src/HouseofCat.RabbitMQ/Extensions/MetadataExtensions.cs b/src/HouseofCat.RabbitMQ/Extensions/MetadataExtensions.cs index e8ad606b..7fb00a61 100644 --- a/src/HouseofCat.RabbitMQ/Extensions/MetadataExtensions.cs +++ b/src/HouseofCat.RabbitMQ/Extensions/MetadataExtensions.cs @@ -1,7 +1,6 @@ using HouseofCat.Utilities.Errors; using System; using System.Collections.Generic; -using static HouseofCat.Reflection.Generics; namespace HouseofCat.RabbitMQ; @@ -10,9 +9,11 @@ public static class MetadataExtensions public static T Clone(this IMetadata metadata) where T : IMetadata, new() { - var clonedMetadata = New.Instance(); - clonedMetadata.Compressed = metadata.Compressed; - clonedMetadata.Encrypted = metadata.Encrypted; + var clonedMetadata = new T + { + Compressed = metadata.Compressed, + Encrypted = metadata.Encrypted + }; foreach (var kvp in metadata.CustomFields) { @@ -27,9 +28,9 @@ public static T GetHeader(this IMetadata metadata, string key) Guard.AgainstNull(metadata, nameof(LetterMetadata)); Guard.AgainstNullOrEmpty(metadata.CustomFields, nameof(LetterMetadata.CustomFields)); - if (metadata.CustomFields.ContainsKey(key)) + if (metadata.CustomFields.TryGetValue(key, out object value)) { - if (metadata.CustomFields[key] is T temp) + if (value is T temp) { return temp; } else { throw new InvalidCastException(); } } diff --git a/src/HouseofCat.RabbitMQ/HouseofCat.RabbitMQ.csproj b/src/HouseofCat.RabbitMQ/HouseofCat.RabbitMQ.csproj index 1e1a1280..85d230fa 100644 --- a/src/HouseofCat.RabbitMQ/HouseofCat.RabbitMQ.csproj +++ b/src/HouseofCat.RabbitMQ/HouseofCat.RabbitMQ.csproj @@ -5,6 +5,7 @@ + diff --git a/src/HouseofCat.RabbitMQ/Messages/ReceivedData.cs b/src/HouseofCat.RabbitMQ/Messages/ReceivedData.cs index fb7e7298..586c8275 100644 --- a/src/HouseofCat.RabbitMQ/Messages/ReceivedData.cs +++ b/src/HouseofCat.RabbitMQ/Messages/ReceivedData.cs @@ -91,9 +91,9 @@ public ReceivedData( private void ReadHeaders() { - if (Properties?.Headers != null && Properties.Headers.ContainsKey(Constants.HeaderForObjectType)) + if (Properties?.Headers != null && Properties.Headers.TryGetValue(Constants.HeaderForObjectType, out object objectType)) { - ContentType = Encoding.UTF8.GetString((byte[])Properties.Headers[Constants.HeaderForObjectType]); + ContentType = Encoding.UTF8.GetString((byte[])objectType); // ADD SERIALIZER TO HEADER AND && JSON THIS ONE if (ContentType == Constants.HeaderValueForLetter && Data?.Length > 0) @@ -111,20 +111,20 @@ private void ReadHeaders() } } - if (Properties.Headers.ContainsKey(Constants.HeaderForEncrypted)) - { Encrypted = (bool)Properties.Headers[Constants.HeaderForEncrypted]; } + if (Properties.Headers.TryGetValue(Constants.HeaderForEncrypted, out object encryptedValue)) + { Encrypted = (bool)encryptedValue; } - if (Properties.Headers.ContainsKey(Constants.HeaderForEncryption)) - { EncryptionType = Encoding.UTF8.GetString((byte[])Properties.Headers[Constants.HeaderForEncryption]); } + if (Properties.Headers.TryGetValue(Constants.HeaderForEncryption, out object encryptedType)) + { EncryptionType = Encoding.UTF8.GetString((byte[])encryptedType); } - if (Properties.Headers.ContainsKey(Constants.HeaderForEncryptDate)) - { EncryptedDateTime = DateTime.Parse(Encoding.UTF8.GetString((byte[])Properties.Headers[Constants.HeaderForEncryptDate])); } + if (Properties.Headers.TryGetValue(Constants.HeaderForEncryptDate, out object encryptedDate)) + { EncryptedDateTime = DateTime.Parse(Encoding.UTF8.GetString((byte[])encryptedDate)); } - if (Properties.Headers.ContainsKey(Constants.HeaderForCompressed)) - { Compressed = (bool)Properties.Headers[Constants.HeaderForCompressed]; } + if (Properties.Headers.TryGetValue(Constants.HeaderForCompressed, out object compressedValue)) + { Compressed = (bool)compressedValue; } - if (Properties.Headers.ContainsKey(Constants.HeaderForCompression)) - { CompressionType = Encoding.UTF8.GetString((byte[])Properties.Headers[Constants.HeaderForCompression]); } + if (Properties.Headers.TryGetValue(Constants.HeaderForCompression, out object compressedType)) + { CompressionType = Encoding.UTF8.GetString((byte[])compressedType); } } else { diff --git a/src/HouseofCat.RabbitMQ/Options/FactoryOptions.cs b/src/HouseofCat.RabbitMQ/Options/FactoryOptions.cs index d0ee0369..18fdbb5b 100644 --- a/src/HouseofCat.RabbitMQ/Options/FactoryOptions.cs +++ b/src/HouseofCat.RabbitMQ/Options/FactoryOptions.cs @@ -54,7 +54,7 @@ public class FactoryOptions /// /// ConnectionFactory (RabbitMQ) the amount of time to wait before netrecovery begins (seconds). /// - public ushort NetRecoveryTimeout { get; set; } = 10; + public ushort NetRecoveryTimeout { get; set; } = 5; /// /// ConnectionFactory (RabbitMQ) specify the amount of time before timeout on protocol operations (seconds). @@ -70,4 +70,9 @@ public class FactoryOptions /// Class to hold settings for ChannelFactory/SSL (RabbitMQ) settings. /// public SslOptions SslOptions { get; set; } = new SslOptions(); + + /// + /// Class to hold settings for OAuth2 (RabbitMQ) settings. + /// + public OAuth2Options OAuth2Options { get; set; } = new OAuth2Options(); } diff --git a/src/HouseofCat.RabbitMQ/Options/OAuth2Options.cs b/src/HouseofCat.RabbitMQ/Options/OAuth2Options.cs new file mode 100644 index 00000000..2672b620 --- /dev/null +++ b/src/HouseofCat.RabbitMQ/Options/OAuth2Options.cs @@ -0,0 +1,13 @@ +namespace HouseofCat.RabbitMQ; + +public class OAuth2Options +{ + public string TokenEndpointUrl { get; set; } + public string ClientId { get; set; } + public string ClientSecret { get; set; } + + /// + /// The OAuth2 Client name to use for distinction (if you use more than one). + /// + public string OAuth2ClientName { get; set; } = "RabbitMQ.Client.OAuth2.Default"; +} diff --git a/src/HouseofCat.RabbitMQ/Options/PoolOptions.cs b/src/HouseofCat.RabbitMQ/Options/PoolOptions.cs index a2f707af..893e9640 100644 --- a/src/HouseofCat.RabbitMQ/Options/PoolOptions.cs +++ b/src/HouseofCat.RabbitMQ/Options/PoolOptions.cs @@ -9,16 +9,26 @@ public class PoolOptions /// /// Number of connections to be created in the ConnectionPool. Used in round-robin to create channels. + /// Deafult valuse is 2. /// - public ushort MaxConnections { get; set; } = 5; + public ushort MaxConnections { get; set; } = 2; /// /// Number of channels to keep in each of the channel pools. Used in round-robin to perform actions. + /// Default value is 10. /// - public ushort MaxChannels { get; set; } = 25; + public ushort MaxChannels { get; set; } = 10; /// /// The time to sleep (in ms) when an error occurs on Channel or Connection creation. It's best not to be hyper aggressive with this value. + /// Default value is 1000. /// public int SleepOnErrorInterval { get; set; } = 1000; + + /// + /// All Transient Channels will be created in this range. This is to help identify transient channels + /// used in logging internally. Can not be lower than 10000. + /// Default value is 10000. + /// + public ulong TansientChannelStartRange { get; set; } = 10000; } diff --git a/src/HouseofCat.RabbitMQ/Options/RabbitOptions.cs b/src/HouseofCat.RabbitMQ/Options/RabbitOptions.cs index 440de595..25e2ba1e 100644 --- a/src/HouseofCat.RabbitMQ/Options/RabbitOptions.cs +++ b/src/HouseofCat.RabbitMQ/Options/RabbitOptions.cs @@ -33,8 +33,8 @@ public class RabbitOptions public ConsumerOptions GetConsumerOptions(string consumerName) { - if (!ConsumerOptions.ContainsKey(consumerName)) throw new ArgumentException(string.Format(CultureInfo.InvariantCulture, ExceptionMessages.NoConsumerOptionsMessage, consumerName)); - return ConsumerOptions[consumerName]; + if (!ConsumerOptions.TryGetValue(consumerName, out ConsumerOptions value)) throw new ArgumentException(string.Format(CultureInfo.InvariantCulture, ExceptionMessages.NoConsumerOptionsMessage, consumerName)); + return value; } public void ApplyGlobalConsumerOptions() @@ -45,9 +45,9 @@ public void ApplyGlobalConsumerOptions() // on top of (overriding) individual consumer settings. Opt out by not setting // the global settings field. if (!string.IsNullOrWhiteSpace(kvp.Value.GlobalSettings) - && GlobalConsumerOptions.ContainsKey(kvp.Value.GlobalSettings)) + && GlobalConsumerOptions.TryGetValue(kvp.Value.GlobalSettings, out GlobalConsumerOptions value)) { - kvp.Value.ApplyGlobalOptions(GlobalConsumerOptions[kvp.Value.GlobalSettings]); + kvp.Value.ApplyGlobalOptions(value); } } } diff --git a/src/HouseofCat.RabbitMQ/Pools/ChannelHost.cs b/src/HouseofCat.RabbitMQ/Pools/ChannelHost.cs index 1e72edfa..bb00620e 100644 --- a/src/HouseofCat.RabbitMQ/Pools/ChannelHost.cs +++ b/src/HouseofCat.RabbitMQ/Pools/ChannelHost.cs @@ -17,10 +17,11 @@ public interface IChannelHost bool FlowControlled { get; } IModel GetChannel(); - Task WaitUntilReadyAsync(int sleepInterval, CancellationToken token = default); + Task WaitUntilChannelIsReadyAsync(int sleepInterval, CancellationToken token = default); Task BuildRabbitMQChannelAsync(); void Close(); - Task HealthyAsync(); + Task ChannelHealthyAsync(); + Task ConnectionHealthyAsync(); } public class ChannelHost : IChannelHost, IDisposable @@ -61,7 +62,7 @@ public IModel GetChannel() { _hostLock.Release(); } } - public async Task WaitUntilReadyAsync(int sleepInterval, CancellationToken token = default) + public async Task WaitUntilChannelIsReadyAsync(int sleepInterval, CancellationToken token = default) { var success = false; while (!token.IsCancellationRequested && !success) @@ -147,24 +148,26 @@ protected virtual void FlowControl(object sender, FlowControlEventArgs e) _hostLock.Release(); } - public async Task HealthyAsync() + public async Task ChannelHealthyAsync() { var connectionHealthy = await _connHost.HealthyAsync().ConfigureAwait(false); return connectionHealthy && !FlowControlled && (_channel?.IsOpen ?? false); } + public async Task ConnectionHealthyAsync() + { + return await _connHost.HealthyAsync().ConfigureAwait(false); + } + private const int CloseCode = 200; private const string CloseMessage = "HouseofCat.RabbitMQ manual close channel initiated."; public void Close() { - if (!Closed || !_channel.IsOpen) - { - try - { _channel.Close(CloseCode, CloseMessage); } - catch { /* SWALLOW */ } - } + try + { _channel.Close(CloseCode, CloseMessage); } + catch { /* SWALLOW */ } } protected virtual void Dispose(bool disposing) diff --git a/src/HouseofCat.RabbitMQ/Pools/ChannelPool.cs b/src/HouseofCat.RabbitMQ/Pools/ChannelPool.cs index e4b22e52..b4731bd1 100644 --- a/src/HouseofCat.RabbitMQ/Pools/ChannelPool.cs +++ b/src/HouseofCat.RabbitMQ/Pools/ChannelPool.cs @@ -71,6 +71,11 @@ public ChannelPool(IConnectionPool connPool) Guard.AgainstNull(connPool, nameof(connPool)); Options = connPool.Options; + _currentTansientChannelId = + Options.PoolOptions.TansientChannelStartRange < 10000 + ? 10000 + : Options.PoolOptions.TansientChannelStartRange; + _logger = LogHelper.GetLogger(); _connectionPool = connPool; _flaggedChannels = new ConcurrentDictionary(); @@ -127,13 +132,13 @@ public async ValueTask GetChannelAsync() .ReadAsync() .ConfigureAwait(false); - var healthy = await chanHost.HealthyAsync().ConfigureAwait(false); + var healthy = await chanHost.ChannelHealthyAsync().ConfigureAwait(false); var flagged = _flaggedChannels.ContainsKey(chanHost.ChannelId) && _flaggedChannels[chanHost.ChannelId]; if (flagged || !healthy) { _logger.LogWarning(LogMessages.ChannelPools.DeadChannel, chanHost.ChannelId); - await chanHost.WaitUntilReadyAsync(Options.PoolOptions.SleepOnErrorInterval); + await chanHost.WaitUntilChannelIsReadyAsync(Options.PoolOptions.SleepOnErrorInterval); } return chanHost; @@ -165,18 +170,23 @@ public async ValueTask GetAckChannelAsync() .ReadAsync() .ConfigureAwait(false); - var healthy = await chanHost.HealthyAsync().ConfigureAwait(false); + var healthy = await chanHost.ChannelHealthyAsync().ConfigureAwait(false); var flagged = _flaggedChannels.ContainsKey(chanHost.ChannelId) && _flaggedChannels[chanHost.ChannelId]; if (flagged || !healthy) { _logger.LogWarning(LogMessages.ChannelPools.DeadChannel, chanHost.ChannelId); - await chanHost.WaitUntilReadyAsync(Options.PoolOptions.SleepOnErrorInterval); + await chanHost.WaitUntilChannelIsReadyAsync(Options.PoolOptions.SleepOnErrorInterval); } return chanHost; } + // This is a simple counter to give a unique id to transient channels. + private ulong _currentTansientChannelId = 10000; + + private ulong GetNextTransientChannelId() => Interlocked.Increment(ref _currentTansientChannelId); + /// /// Gives user a transient is simply a channel not managed by this library. /// Closing and disposing the is the responsiblity of the user. @@ -184,12 +194,11 @@ public async ValueTask GetAckChannelAsync() /// /// [MethodImpl(MethodImplOptions.AggressiveInlining)] - public async ValueTask GetTransientChannelAsync(bool ackable) => await CreateChannelAsync(0, ackable).ConfigureAwait(false); + public async ValueTask GetTransientChannelAsync(bool ackable) => await CreateChannelAsync(GetNextTransientChannelId(), ackable).ConfigureAwait(false); [MethodImpl(MethodImplOptions.AggressiveInlining)] private async Task CreateChannelAsync(ulong channelId, bool ackable) { - IChannelHost chanHost = null; IConnectionHost connHost = null; while (true) @@ -209,7 +218,7 @@ private async Task CreateChannelAsync(ulong channelId, bool ackabl // Create a Channel Host try { - chanHost = new ChannelHost(channelId, connHost, ackable); + var chanHost = new ChannelHost(channelId, connHost, ackable); await ReturnConnectionWithOptionalSleep(connHost, channelId, 0).ConfigureAwait(false); _flaggedChannels[chanHost.ChannelId] = false; _logger.LogDebug(LogMessages.ChannelPools.CreateChannelSuccess, channelId); diff --git a/src/HouseofCat.RabbitMQ/Pools/ConnectionPool.cs b/src/HouseofCat.RabbitMQ/Pools/ConnectionPool.cs index e9059134..caefd819 100644 --- a/src/HouseofCat.RabbitMQ/Pools/ConnectionPool.cs +++ b/src/HouseofCat.RabbitMQ/Pools/ConnectionPool.cs @@ -2,7 +2,9 @@ using HouseofCat.Utilities.Errors; using Microsoft.Extensions.Logging; using RabbitMQ.Client; +using RabbitMQ.Client.OAuth2; using System; +using System.Net.Http; using System.Runtime.CompilerServices; using System.Threading; using System.Threading.Channels; @@ -33,6 +35,27 @@ public class ConnectionPool : IConnectionPool, IDisposable public RabbitOptions Options { get; } + public ConnectionPool(RabbitOptions options, HttpClientHandler oauth2ClientHandler = null) + { + Guard.AgainstNull(options, nameof(options)); + Options = options; + + _logger = LogHelper.GetLogger(); + + _connections = Channel.CreateBounded(Options.PoolOptions.MaxConnections); + + if (oauth2ClientHandler is not null) + { + + } + else + { + _connectionFactory = BuildConnectionFactory(); + } + + CreateConnectionsAsync().GetAwaiter().GetResult(); + } + public ConnectionPool(RabbitOptions options) { Guard.AgainstNull(options, nameof(options)); @@ -41,12 +64,12 @@ public ConnectionPool(RabbitOptions options) _logger = LogHelper.GetLogger(); _connections = Channel.CreateBounded(Options.PoolOptions.MaxConnections); - _connectionFactory = CreateConnectionFactory(); + _connectionFactory = BuildConnectionFactory(); CreateConnectionsAsync().GetAwaiter().GetResult(); } - private ConnectionFactory CreateConnectionFactory() + protected virtual ConnectionFactory BuildConnectionFactory() { var cf = new ConnectionFactory { @@ -91,6 +114,36 @@ private ConnectionFactory CreateConnectionFactory() return cf; } + protected virtual ConnectionFactory CreateConnectionFactory(RabbitOptions options, HttpClientHandler oauth2ClientHandler) + { + var oAuth2ClientBuilder = new OAuth2ClientBuilder( + Options.FactoryOptions.OAuth2Options.ClientId, + Options.FactoryOptions.OAuth2Options.ClientSecret, + new Uri(Options.FactoryOptions.OAuth2Options.TokenEndpointUrl)); + + oAuth2ClientBuilder.SetHttpClientHandler(oauth2ClientHandler); + var oAuth2Client = oAuth2ClientBuilder.Build(); + + var credentialsProvider = new OAuth2ClientCredentialsProvider( + Options.FactoryOptions.OAuth2Options.OAuth2ClientName, + oAuth2Client); + + var cf = new ConnectionFactory + { + AutomaticRecoveryEnabled = true, + TopologyRecoveryEnabled = Options.FactoryOptions.TopologyRecovery, + NetworkRecoveryInterval = TimeSpan.FromSeconds(Options.FactoryOptions.NetRecoveryTimeout), + ContinuationTimeout = TimeSpan.FromSeconds(Options.FactoryOptions.ContinuationTimeout), + RequestedHeartbeat = TimeSpan.FromSeconds(Options.FactoryOptions.HeartbeatInterval), + RequestedChannelMax = Options.FactoryOptions.MaxChannelsPerConnection, + DispatchConsumersAsync = Options.FactoryOptions.EnableDispatchConsumersAsync, + CredentialsProvider = credentialsProvider, + CredentialsRefresher = new TimerBasedCredentialRefresher() + }; + + return cf; + } + public IConnection CreateConnection(string connectionName) => _connectionFactory.CreateConnection(connectionName); // Allows overriding the mechanism for creating ConnectionHosts while a base one was implemented. @@ -103,7 +156,9 @@ private async Task CreateConnectionsAsync() for (int i = 0; i < Options.PoolOptions.MaxConnections; i++) { - var serviceName = string.IsNullOrEmpty(Options.PoolOptions.ServiceName) ? $"HoC.RabbitMQ:{i}" : $"{Options.PoolOptions.ServiceName}:{i}"; + var serviceName = string.IsNullOrEmpty(Options.PoolOptions.ServiceName) + ? $"HoC.RabbitMQ:{i}" + : $"{Options.PoolOptions.ServiceName}:{i}"; try { await _connections diff --git a/src/HouseofCat.Reflection/Extensions.cs b/src/HouseofCat.Reflection/Extensions.cs index d415b20b..961fbd22 100644 --- a/src/HouseofCat.Reflection/Extensions.cs +++ b/src/HouseofCat.Reflection/Extensions.cs @@ -2,38 +2,37 @@ using System.Collections.Generic; using System.Linq; -namespace HouseofCat.Reflection +namespace HouseofCat.Reflection; + +public static class Extensions { - public static class Extensions + public static Dictionary ToDictionary(this TIn input, bool readOnlyProperties = false) where TIn : class, new() { - public static Dictionary ToDictionary(this TIn input, bool readOnlyProperties = false) where TIn : class, new() - { - var acessor = TypeAccessor.Create(input.GetType()); - - if (readOnlyProperties) - { - return acessor - .GetMembers() - .Where(x => x.CanRead && x.CanWrite == false) - .ToDictionary(x => x.Name, x => acessor[input, x.Name]); - } + var acessor = TypeAccessor.Create(input.GetType()); + if (readOnlyProperties) + { return acessor .GetMembers() + .Where(x => x.CanRead && x.CanWrite == false) .ToDictionary(x => x.Name, x => acessor[input, x.Name]); } - public static TOut ToObject(this IDictionary data) where TOut : class, new() - { - var t = Generics.New.Instance.Invoke(); - var acessor = TypeAccessor.Create(t.GetType()); + return acessor + .GetMembers() + .ToDictionary(x => x.Name, x => acessor[input, x.Name]); + } - foreach (var kvp in data) - { - acessor[t, kvp.Key] = kvp.Value; - } + public static TOut ToObject(this IDictionary data) where TOut : class, new() + { + var t = new TOut(); + var acessor = TypeAccessor.Create(t.GetType()); - return t; + foreach (var kvp in data) + { + acessor[t, kvp.Key] = kvp.Value; } + + return t; } } diff --git a/src/HouseofCat.Reflection/New.cs b/src/HouseofCat.Reflection/New.cs index fdb940ac..c25fd967 100644 --- a/src/HouseofCat.Reflection/New.cs +++ b/src/HouseofCat.Reflection/New.cs @@ -2,36 +2,36 @@ using System.Linq.Expressions; using System.Runtime.Serialization; -namespace HouseofCat.Reflection +namespace HouseofCat.Reflection; + +public static class Generics { - public static class Generics + /// + /// A high performing New generic object instance creator. + /// Found this bit of cleverness on StackOverflow while dealing low performance using Generic instantiation. + /// https://stackoverflow.com/questions/6582259/fast-creation-of-objects-instead-of-activator-createinstancetype + /// + /// + [Obsolete("FormatterServices is no longer supported by Microsoft.")] + public static class New { - /// - /// A high performing New generic object instance creator. - /// Found this bit of cleverness on StackOverflow while dealing low performance using Generic instantiation. - /// https://stackoverflow.com/questions/6582259/fast-creation-of-objects-instead-of-activator-createinstancetype - /// - /// - public static class New - { - public static readonly Func Instance = Creator(); + public static readonly Func Instance = Creator(); - private static Func Creator() - { - Type t = typeof(T); - if (t == typeof(string)) - { return Expression.Lambda>(Expression.Constant(string.Empty)).Compile(); } + private static Func Creator() + { + Type t = typeof(T); + if (t == typeof(string)) + { return Expression.Lambda>(Expression.Constant(string.Empty)).Compile(); } - if (t.HasDefaultConstructor()) - { return Expression.Lambda>(Expression.New(t)).Compile(); } + if (t.HasDefaultConstructor()) + { return Expression.Lambda>(Expression.New(t)).Compile(); } - return () => (T)FormatterServices.GetUninitializedObject(t); - } + return () => (T)FormatterServices.GetUninitializedObject(t); } + } - public static bool HasDefaultConstructor(this Type t) - { - return t.IsValueType || t.GetConstructor(Type.EmptyTypes) != null; - } + public static bool HasDefaultConstructor(this Type t) + { + return t.IsValueType || t.GetConstructor(Type.EmptyTypes) != null; } }