From bedf6da25062dfc13567ee85940cb61ecdce7da2 Mon Sep 17 00:00:00 2001 From: pardahlman Date: Sun, 8 Oct 2017 15:29:41 +0200 Subject: [PATCH] (#279) Add a few different channel pool types --- .../Channel/AutoScalingChannelPool.cs | 124 ++++++++ .../Channel/ConcurrentChannelQueue.cs | 41 +++ src/RawRabbit/Channel/DynamicChannelPool.cs | 55 ++++ src/RawRabbit/Channel/ResilientChannelPool.cs | 50 ++++ src/RawRabbit/Channel/StaticChannelPool.cs | 134 +++++++++ .../Channel/ChannelFactoryTests.cs | 192 ++++++------- .../Channel/ChannelPoolTests.cs | 271 ++++++++++++++++++ .../Channel/DynamicChannelPoolTests.cs | 82 ++++++ 8 files changed, 846 insertions(+), 103 deletions(-) create mode 100644 src/RawRabbit/Channel/AutoScalingChannelPool.cs create mode 100644 src/RawRabbit/Channel/ConcurrentChannelQueue.cs create mode 100644 src/RawRabbit/Channel/DynamicChannelPool.cs create mode 100644 src/RawRabbit/Channel/ResilientChannelPool.cs create mode 100644 src/RawRabbit/Channel/StaticChannelPool.cs create mode 100644 test/RawRabbit.Tests/Channel/ChannelPoolTests.cs create mode 100644 test/RawRabbit.Tests/Channel/DynamicChannelPoolTests.cs diff --git a/src/RawRabbit/Channel/AutoScalingChannelPool.cs b/src/RawRabbit/Channel/AutoScalingChannelPool.cs new file mode 100644 index 00000000..df148c5a --- /dev/null +++ b/src/RawRabbit/Channel/AutoScalingChannelPool.cs @@ -0,0 +1,124 @@ +using System; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using RabbitMQ.Client; +using RawRabbit.Channel.Abstraction; +using RawRabbit.Logging; + +namespace RawRabbit.Channel +{ + public class AutoScalingChannelPool : DynamicChannelPool + { + private readonly IChannelFactory _factory; + private readonly AutoScalingOptions _options; + private Timer _timer; + private readonly ILog _logger = LogProvider.For(); + + public AutoScalingChannelPool(IChannelFactory factory, AutoScalingOptions options) + { + _factory = factory; + _options = options; + ValidateOptions(options); + SetupScaling(); + } + + private static void ValidateOptions(AutoScalingOptions options) + { + if (options.MinimunPoolSize <= 0) + { + throw new ArgumentException($"Minimum Pool Size needs to be a positive integer. Got: {options.MinimunPoolSize}"); + } + if (options.MaximumPoolSize <= 0) + { + throw new ArgumentException($"Maximum Pool Size needs to be a positive integer. Got: {options.MinimunPoolSize}"); + } + if (options.MinimunPoolSize > options.MaximumPoolSize) + { + throw new ArgumentException($"The Maximum Pool Size ({options.MaximumPoolSize}) must be larger than the Minimum Pool Size ({options.MinimunPoolSize})"); + } + } + + public override async Task GetAsync(CancellationToken ct = default(CancellationToken)) + { + var activeChannels = GetActiveChannelCount(); + if (activeChannels < _options.MinimunPoolSize) + { + _logger.Debug("Pool currently has {channelCount}, which is lower than the minimal pool size {minimalPoolSize}. Creating channels.", activeChannels, _options.MinimunPoolSize); + var delta = _options.MinimunPoolSize - Pool.Count; + for (var i = 0; i < delta; i++) + { + var channel = await _factory.CreateChannelAsync(ct); + Add(channel); + } + } + + return await base.GetAsync(ct); + } + + public void SetupScaling() + { + if (_options.RefreshInterval == TimeSpan.MaxValue || _options.RefreshInterval == TimeSpan.MinValue) + { + return; + } + + _timer = new Timer(state => + { + var workPerChannel = ChannelRequestQueue.Count / Pool.Count; + var scaleUp = Pool.Count < _options.MaximumPoolSize; + var scaleDown = _options.MinimunPoolSize < Pool.Count; + + _logger.Debug("Channel pool currently has {channelCount} channels open and a total workload of {totalWorkload}", Pool.Count, ChannelRequestQueue.Count); + if (scaleUp && _options.DesiredAverageWorkload < workPerChannel) + { + _logger.Debug("The estimated workload is {averageWorkload} operations/channel, which is higher than the desired workload ({desiredAverageWorkload}). Creating channel.", workPerChannel, _options.DesiredAverageWorkload); + _factory + .CreateChannelAsync() + .ContinueWith(tChannel => + { + Add(tChannel.Result); + }); + return; + } + + if (scaleDown && workPerChannel < _options.DesiredAverageWorkload) + { + _logger.Debug("The estimated workload is {averageWorkload} operations/channel, which is lower than the desired workload ({desiredAverageWorkload}). Creating channel.", workPerChannel, _options.DesiredAverageWorkload); + var toRemove = Pool.FirstOrDefault(); + Pool.Remove(toRemove); + Timer disposeTimer = null; + disposeTimer = new Timer(o => + { + (o as IModel)?.Dispose(); + disposeTimer?.Dispose(); + }, toRemove, _options.GracefulCloseInterval, new TimeSpan(-1)); + } + }, null, _options.RefreshInterval, _options.RefreshInterval); + } + + public override void Dispose() + { + base.Dispose(); + _timer?.Dispose(); + } + } + + public class AutoScalingOptions + { + public int DesiredAverageWorkload { get; set; } + public int MinimunPoolSize { get; set; } + public int MaximumPoolSize { get; set; } + public TimeSpan RefreshInterval { get; set; } + public TimeSpan GracefulCloseInterval { get; set; } + + public static AutoScalingOptions Default => new AutoScalingOptions + { + MinimunPoolSize = 1, + MaximumPoolSize = 10, + DesiredAverageWorkload = 20000, + RefreshInterval = TimeSpan.FromSeconds(10), + GracefulCloseInterval = TimeSpan.FromSeconds(30) + }; + } +} diff --git a/src/RawRabbit/Channel/ConcurrentChannelQueue.cs b/src/RawRabbit/Channel/ConcurrentChannelQueue.cs new file mode 100644 index 00000000..f2e2b764 --- /dev/null +++ b/src/RawRabbit/Channel/ConcurrentChannelQueue.cs @@ -0,0 +1,41 @@ +using System; +using System.Collections.Concurrent; +using System.Threading.Tasks; +using RabbitMQ.Client; + +namespace RawRabbit.Channel +{ + public class ConcurrentChannelQueue + { + private readonly ConcurrentQueue> _queue; + + public EventHandler Queued; + + public ConcurrentChannelQueue() + { + _queue = new ConcurrentQueue>(); + } + + public TaskCompletionSource Enqueue() + { + var modelTsc = new TaskCompletionSource(); + var raiseEvent = _queue.IsEmpty; + _queue.Enqueue(modelTsc); + if (raiseEvent) + { + Queued?.Invoke(this, EventArgs.Empty); + } + + return modelTsc; + } + + public bool TryDequeue(out TaskCompletionSource channel) + { + return _queue.TryDequeue(out channel); + } + + public bool IsEmpty => _queue.IsEmpty; + + public int Count => _queue.Count; + } +} diff --git a/src/RawRabbit/Channel/DynamicChannelPool.cs b/src/RawRabbit/Channel/DynamicChannelPool.cs new file mode 100644 index 00000000..342426ef --- /dev/null +++ b/src/RawRabbit/Channel/DynamicChannelPool.cs @@ -0,0 +1,55 @@ +using System.Collections.Generic; +using System.Linq; +using RabbitMQ.Client; + +namespace RawRabbit.Channel +{ + public class DynamicChannelPool : StaticChannelPool + { + public DynamicChannelPool() + : this(Enumerable.Empty()) { } + + public DynamicChannelPool(IEnumerable seed) + : base(seed) { } + + public void Add(params IModel[] channels) + { + Add(channels.ToList()); + } + + public void Add(IEnumerable channels) + { + foreach (var channel in channels) + { + ConfigureRecovery(channel); + if (Pool.Contains(channel)) + { + continue; + } + Pool.AddLast(channel); + } + } + + public void Remove(int numberOfChannels = 1) + { + var toRemove = Pool + .Take(numberOfChannels) + .ToList(); + Remove(toRemove); + } + + public void Remove(params IModel[] channels) + { + Remove(channels.ToList()); + } + + public void Remove(IEnumerable channels) + { + foreach (var channel in channels) + { + Pool.Remove(channel); + Recoverables.Remove(channel as IRecoverable); + } + } + } +} diff --git a/src/RawRabbit/Channel/ResilientChannelPool.cs b/src/RawRabbit/Channel/ResilientChannelPool.cs new file mode 100644 index 00000000..b1275b1b --- /dev/null +++ b/src/RawRabbit/Channel/ResilientChannelPool.cs @@ -0,0 +1,50 @@ +using System.Collections.Generic; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using RabbitMQ.Client; +using RawRabbit.Channel.Abstraction; + +namespace RawRabbit.Channel +{ + public class ResilientChannelPool : DynamicChannelPool + { + protected readonly IChannelFactory ChannelFactory; + private readonly int _desiredChannelCount; + + public ResilientChannelPool(IChannelFactory factory, int channelCount) + : this(factory, CreateSeed(factory, channelCount)) { } + + public ResilientChannelPool(IChannelFactory factory) + : this(factory, Enumerable.Empty()) { } + + public ResilientChannelPool(IChannelFactory factory, IEnumerable seed) : base(seed) + { + ChannelFactory = factory; + _desiredChannelCount = seed.Count(); + } + + private static IEnumerable CreateSeed(IChannelFactory factory, int channelCount) + { + for (var i = 0; i < channelCount; i++) + { + yield return factory.CreateChannelAsync().GetAwaiter().GetResult(); + } + } + + public override async Task GetAsync(CancellationToken ct = default(CancellationToken)) + { + var currentCount = GetActiveChannelCount(); + if (currentCount < _desiredChannelCount) + { + var createCount = _desiredChannelCount - currentCount; + for (var i = 0; i < createCount; i++) + { + var channel = await ChannelFactory.CreateChannelAsync(ct); + Add(channel); + } + } + return await base.GetAsync(ct); + } + } +} diff --git a/src/RawRabbit/Channel/StaticChannelPool.cs b/src/RawRabbit/Channel/StaticChannelPool.cs new file mode 100644 index 00000000..287dcdc5 --- /dev/null +++ b/src/RawRabbit/Channel/StaticChannelPool.cs @@ -0,0 +1,134 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using RabbitMQ.Client; +using RawRabbit.Exceptions; +using RawRabbit.Logging; + +namespace RawRabbit.Channel +{ + public interface IChannelPool + { + Task GetAsync(CancellationToken ct = default(CancellationToken)); + } + + public class StaticChannelPool : IDisposable, IChannelPool + { + protected readonly LinkedList Pool; + protected readonly List Recoverables; + protected readonly ConcurrentChannelQueue ChannelRequestQueue; + private readonly object _workLock = new object(); + private LinkedListNode _current; + private readonly ILog _logger = LogProvider.For(); + + public StaticChannelPool(IEnumerable seed) + { + seed = seed.ToList(); + Pool = new LinkedList(seed); + Recoverables = new List(); + ChannelRequestQueue = new ConcurrentChannelQueue(); + ChannelRequestQueue.Queued += (sender, args) => StartServeChannels(); + foreach (var channel in seed) + { + ConfigureRecovery(channel); + } + } + + private void StartServeChannels() + { + if (ChannelRequestQueue.IsEmpty || Pool.Count == 0) + { + _logger.Debug("Unable to serve channels. The pool consists of {channelCount} channels and {channelRequests} requests for channels."); + return; + } + + if (!Monitor.TryEnter(_workLock)) + { + return; + } + + _logger.Debug("Starting serving channels."); + do + { + _current = _current?.Next ?? Pool.First; + if (_current.Value.IsClosed) + { + Pool.Remove(_current); + if (Pool.Count != 0) continue; + if (Recoverables.Count == 0) + { + throw new ChannelAvailabilityException("No open channels in pool and no recoverable channels"); + } + return; + } + if(ChannelRequestQueue.TryDequeue(out var cTsc)) + { + cTsc.TrySetResult(_current.Value); + } + } while (!ChannelRequestQueue.IsEmpty); + Monitor.Exit(_workLock); + } + + protected virtual int GetActiveChannelCount() + { + return Enumerable + .Concat(Pool, Recoverables) + .Distinct() + .Count(); + } + + protected void ConfigureRecovery(IModel channel) + { + if (!(channel is IRecoverable recoverable)) + { + _logger.Debug("Channel {channelNumber} is not recoverable. Recovery disabled for this channel.", channel.ChannelNumber); + return; + } + if (channel.IsClosed && channel.CloseReason != null && channel.CloseReason.Initiator == ShutdownInitiator.Application) + { + _logger.Debug("{Channel {channelNumber} is closed by the application. Channel will remain closed and not be part of the channel pool", channel.ChannelNumber); + return; + } + Recoverables.Add(recoverable); + recoverable.Recovery += (sender, args) => + { + _logger.Info("Channel {channelNumber} has been recovered and will be re-added to the channel pool", channel.ChannelNumber); + if (Pool.Contains(channel)) + { + return; + } + Pool.AddLast(channel); + StartServeChannels(); + }; + channel.ModelShutdown += (sender, args) => + { + if (args.Initiator == ShutdownInitiator.Application) + { + _logger.Info("Channel {channelNumber} is being closed by the application. No recovery will be performed.", channel.ChannelNumber); + Recoverables.Remove(recoverable); + } + }; + } + + public virtual Task GetAsync(CancellationToken ct = default(CancellationToken)) + { + var channelTcs = ChannelRequestQueue.Enqueue(); + ct.Register(() => channelTcs.SetCanceled()); + return channelTcs.Task; + } + + public virtual void Dispose() + { + foreach (var channel in Pool) + { + channel?.Dispose(); + } + foreach (var recoverable in Recoverables) + { + (recoverable as IModel)?.Dispose(); + } + } + } +} diff --git a/test/RawRabbit.Tests/Channel/ChannelFactoryTests.cs b/test/RawRabbit.Tests/Channel/ChannelFactoryTests.cs index 7cabfdec..ee4b4e11 100644 --- a/test/RawRabbit.Tests/Channel/ChannelFactoryTests.cs +++ b/test/RawRabbit.Tests/Channel/ChannelFactoryTests.cs @@ -1,149 +1,135 @@ -using System.Collections; +using System; using System.Collections.Generic; using System.Threading.Tasks; using Moq; using RabbitMQ.Client; using RawRabbit.Channel; using RawRabbit.Configuration; -using RawRabbit.Logging; +using RawRabbit.Exceptions; using Xunit; namespace RawRabbit.Tests.Channel { public class ChannelFactoryTests { - private readonly Mock _connectionFactory; - private readonly RawRabbitConfiguration _config; - private readonly ChannelFactoryConfiguration _channelConfig; - private readonly Mock _connection; - private readonly Mock _firstChannel; - private readonly Mock _secondChannel; - private readonly Mock _thirdChannel; - - public ChannelFactoryTests() + [Fact] + public async Task Should_Throw_Exception_If_Connection_Is_Closed_By_Application() { - _connectionFactory = new Mock(); - _connection = new Mock(); - _firstChannel = new Mock(); - _secondChannel = new Mock(); - _thirdChannel = new Mock(); - _config = RawRabbitConfiguration.Local; - _channelConfig = ChannelFactoryConfiguration.Default; + /* Setup */ + var connectionFactroy = new Mock(); + var connection = new Mock(); + connectionFactroy + .Setup(c => c.CreateConnection( + It.IsAny>())) + .Returns(connection.Object); + connection + .Setup(c => c.IsOpen) + .Returns(false); + connection + .Setup(c => c.CloseReason) + .Returns(new ShutdownEventArgs(ShutdownInitiator.Application, 0, string.Empty)); + var channelFactory = new ChannelFactory(connectionFactroy.Object, RawRabbitConfiguration.Local); - _connectionFactory - .Setup(c => c.CreateConnection(It.IsAny>())) - .Returns(_connection.Object); + /* Test */ + /* Assert */ + try + { + await channelFactory.CreateChannelAsync(); + Assert.True(false, $"Connection is closed by Application, expected {nameof(ChannelAvailabilityException)}."); + } + catch (ChannelAvailabilityException e) + { + Assert.True(true, e.Message); + } } - [Theory] - [InlineData(0)] - [InlineData(1)] - [InlineData(2)] - [InlineData(3)] - public async Task Should_Create_Initial_Channels_Based_On_Config(int initialChannelCount) + [Fact] + public async Task Should_Throw_Exception_If_Connection_Is_Closed_By_Lib_But_Is_Not_Recoverable() { /* Setup */ - _channelConfig.MaxChannelCount = initialChannelCount; - _channelConfig.InitialChannelCount = initialChannelCount; - _connection + var connectionFactroy = new Mock(); + var connection = new Mock(); + connectionFactroy + .Setup(c => c.CreateConnection( + It.IsAny>())) + .Returns(connection.Object); + connection .Setup(c => c.IsOpen) - .Returns(true); - _connection - .Setup(c=> c.CreateModel()) - .Returns(_firstChannel.Object) - .Verifiable(); + .Returns(false); + connection + .Setup(c => c.CloseReason) + .Returns(new ShutdownEventArgs(ShutdownInitiator.Library, 0, string.Empty)); + var channelFactory = new ChannelFactory(connectionFactroy.Object, RawRabbitConfiguration.Local); /* Test */ - var factory = new ChannelFactory(_connectionFactory.Object, _config, _channelConfig); - /* Assert */ - _connection.Verify(c => c.CreateModel(), Times.Exactly(initialChannelCount)); + try + { + await channelFactory.CreateChannelAsync(); + Assert.True(false, $"Connection is closed by Application, expected {nameof(ChannelAvailabilityException)}."); + } + catch (ChannelAvailabilityException e) + { + Assert.True(true, e.Message); + } } [Fact] - public async Task Should_Round_Robin_Between_Existing_Channels() + public async Task Should_Return_Channel_From_Connection() { /* Setup */ - _channelConfig.MaxChannelCount = 3; - _channelConfig.InitialChannelCount = 3; - _connection - .Setup(c => c.IsOpen) - .Returns(true); - _connection - .SetupSequence(c => c.CreateModel()) - .Returns(_thirdChannel.Object) - .Returns(_firstChannel.Object) - .Returns(_secondChannel.Object); - _firstChannel - .Setup(c => c.IsOpen) - .Returns(true); - _secondChannel + var channel = new Mock(); + var connectionFactroy = new Mock(); + var connection = new Mock(); + connectionFactroy + .Setup(c => c.CreateConnection( + It.IsAny>())) + .Returns(connection.Object); + connection + .Setup(c => c.CreateModel()) + .Returns(channel.Object); + connection .Setup(c => c.IsOpen) .Returns(true); - _thirdChannel - .Setup(c => c.IsOpen) - .Returns(true); - var factory = new ChannelFactory(_connectionFactory.Object, _config, _channelConfig); + var channelFactory = new ChannelFactory(connectionFactroy.Object, RawRabbitConfiguration.Local); /* Test */ - var first = await factory.GetChannelAsync(); - var second = await factory.GetChannelAsync(); - var third = await factory.GetChannelAsync(); - var firstAgain = await factory.GetChannelAsync(); - var secondAgain = await factory.GetChannelAsync(); - var thirdAgain = await factory.GetChannelAsync(); + var retrievedChannel = await channelFactory.CreateChannelAsync(); /* Assert */ - Assert.Equal(first, _firstChannel.Object); - Assert.Equal(second, _secondChannel.Object); - Assert.Equal(third, _thirdChannel.Object); - Assert.Equal(firstAgain, _firstChannel.Object); - Assert.Equal(secondAgain, _secondChannel.Object); - Assert.Equal(thirdAgain, _thirdChannel.Object); + Assert.Equal(channel.Object, retrievedChannel); } [Fact] - public async Task Should_Not_Return_Close_Channel() + public async Task Should_Wait_For_Connection_To_Recover_Before_Returning_Channel() { /* Setup */ - _channelConfig.MaxChannelCount = 3; - _channelConfig.InitialChannelCount = 3; - _connection - .Setup(c => c.IsOpen) - .Returns(true); - _connection - .SetupSequence(c => c.CreateModel()) - .Returns(_thirdChannel.Object) - .Returns(_firstChannel.Object) - .Returns(_secondChannel.Object); - _firstChannel + var channel = new Mock(); + var connectionFactroy = new Mock(); + var connection = new Mock(); + var recoverable = connection.As(); + connectionFactroy + .Setup(c => c.CreateConnection( + It.IsAny>())) + .Returns(connection.Object); + connection + .Setup(c => c.CreateModel()) + .Returns(channel.Object); + connection .Setup(c => c.IsOpen) - .Returns(true); - _secondChannel - .SetupSequence( c => c.IsOpen) - .Returns(true) .Returns(false); - _thirdChannel - .Setup(c => c.IsOpen) - .Returns(true); - _secondChannel - .Setup(c => c.CloseReason) - .Returns(new ShutdownEventArgs(ShutdownInitiator.Application, 200, "Test")); - var factory = new ChannelFactory(_connectionFactory.Object, _config, _channelConfig); + var channelFactory = new ChannelFactory(connectionFactroy.Object, RawRabbitConfiguration.Local); /* Test */ - var first = await factory.GetChannelAsync(); - var second = await factory.GetChannelAsync(); - var third = await factory.GetChannelAsync(); - var firstAgain = await factory.GetChannelAsync(); - var thirdAgain = await factory.GetChannelAsync(); - /* Assert */ - Assert.Equal(first, _firstChannel.Object); - Assert.Equal(second, _secondChannel.Object); - Assert.Equal(third, _thirdChannel.Object); - Assert.Equal(firstAgain, _firstChannel.Object); - Assert.Equal(thirdAgain, _thirdChannel.Object); + var channelTask = channelFactory.CreateChannelAsync(); + channelTask.Wait(TimeSpan.FromMilliseconds(30)); + Assert.False(channelTask.IsCompleted); + + recoverable.Raise(r => r.Recovery += null, null, null); + await channelTask; + + Assert.Equal(channel.Object, channelTask.Result); } } } diff --git a/test/RawRabbit.Tests/Channel/ChannelPoolTests.cs b/test/RawRabbit.Tests/Channel/ChannelPoolTests.cs new file mode 100644 index 00000000..4d08890b --- /dev/null +++ b/test/RawRabbit.Tests/Channel/ChannelPoolTests.cs @@ -0,0 +1,271 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using Moq; +using RabbitMQ.Client; +using RawRabbit.Channel; +using RawRabbit.Exceptions; +using Xunit; + +namespace RawRabbit.Tests.Channel +{ + public class ChannelPoolTests + { + [Fact] + public async Task Should_Serve_Open_Channels_In_A_Round_Robin_Manner() + { + /* Setup */ + var mockObjects = new List> {new Mock(), new Mock(), new Mock()}; + foreach (var mockObject in mockObjects) + { + mockObject.As(); + mockObject + .Setup(m => m.IsOpen) + .Returns(true); + } + var pool = new StaticChannelPool(mockObjects.Select(m => m.Object)); + + /* Test */ + var first = await pool.GetAsync(); + var second = await pool.GetAsync(); + var third = await pool.GetAsync(); + var forth = await pool.GetAsync(); + + /* Assert */ + Assert.Equal(first, mockObjects[0].Object); + Assert.Equal(second, mockObjects[1].Object); + Assert.Equal(third, mockObjects[2].Object); + Assert.Equal(forth, mockObjects[0].Object); + } + + [Fact] + public async Task Should_Not_Serve_Closed_Channels() + { + /* Setup */ + var openChannel = new Mock { Name = "Always open"}; + var toCloseChannel = new Mock { Name = "Will close"}; + + openChannel + .Setup(c => c.IsClosed) + .Returns(false); + + toCloseChannel + .SetupSequence(model => model.IsClosed) + .Returns(false) + .Returns(true); + var pool = new StaticChannelPool(new []{openChannel.Object, toCloseChannel.Object}); + + /* Test */ + var first = await pool.GetAsync(); + var second = await pool.GetAsync(); + var third = await pool.GetAsync(); + var forth = await pool.GetAsync(); + + /* Assert */ + Assert.Equal(first, openChannel.Object); + Assert.Equal(second, toCloseChannel.Object); + Assert.Equal(third, openChannel.Object); + Assert.Equal(forth, openChannel.Object); + } + + [Fact] + public async Task Should_Serve_Recovered_Channels() + { + /* Setup */ + var openChannel = new Mock { Name = "Always open" }; + var closedChannel = new Mock { Name = "Will Recover" }; + var recoverable = closedChannel.As(); + + openChannel + .Setup(c => c.IsClosed) + .Returns(false); + + closedChannel + .SetupSequence(model => model.IsClosed) + .Returns(true) + .Returns(true) + .Returns(false); + var pool = new StaticChannelPool(new[] { openChannel.Object, closedChannel.Object }); + + /* Test */ + var first = await pool.GetAsync(); + var second = await pool.GetAsync(); + recoverable.Raise(model => model.Recovery += null, null, null); + var third = await pool.GetAsync(); + var forth = await pool.GetAsync(); + + /* Assert */ + Assert.Equal(first, openChannel.Object); + Assert.Equal(second, openChannel.Object); + Assert.Equal(third, closedChannel.Object); + Assert.Equal(forth, openChannel.Object); + } + + [Fact] + public async Task Should_Throw_Exception_If_All_Channels_Are_Closed_And_None_Is_Recoverable() + { + /* Setup */ + var mockObjects = new List> { new Mock(), new Mock(), new Mock() }; + foreach (var mockObject in mockObjects) + { + mockObject + .Setup(m => m.IsClosed) + .Returns(true); + } + var pool = new StaticChannelPool(mockObjects.Select(m => m.Object)); + + /* Test */ + try + { + await pool.GetAsync(); + Assert.True(false, $"{nameof(ChannelAvailabilityException)} should be thrown"); + } + catch (ChannelAvailabilityException e) + { + Assert.True(true, e.Message); + } + } + + [Fact] + public async Task Should_Not_Throw_If_All_Channels_Are_Closed_But_At_Least_One_Is_Recoverable() + { + /* Setup */ + var closedChannel = new Mock { Name = "Always open" }; + var recoverableChannel = new Mock { Name = "Will Recover" }; + var recoverable = recoverableChannel.As(); + + closedChannel + .Setup(c => c.IsClosed) + .Returns(true); + + recoverableChannel + .SetupSequence(model => model.IsClosed) + .Returns(true) + .Returns(true) + .Returns(false); + + var pool = new StaticChannelPool(new[] { recoverableChannel.Object, closedChannel.Object }); + + /* Test */ + var channelTask = pool.GetAsync(); + channelTask.Wait(TimeSpan.FromMilliseconds(20)); + Assert.False(channelTask.IsCompleted, "No channels should be open,"); + + recoverable.Raise(r => r.Recovery += null, null, null); + await channelTask; + + Assert.Equal(recoverableChannel.Object, channelTask.Result); + } + + [Fact] + public void Should_Be_Able_To_Have_Multiple_Pending_Requests() + { + /* Setup */ + const int numberOfCalls = 200; + var taskArray = new Task[numberOfCalls]; + var mockObjects = new List> { new Mock(), new Mock(), new Mock() }; + foreach (var mockObject in mockObjects) + { + mockObject.As(); + mockObject + .Setup(m => m.IsClosed) + .Returns(false); + } + var pool = new StaticChannelPool(mockObjects.Select(m => m.Object)); + + + /* Test */ + for (var i = 0; i < numberOfCalls; i++) + { + taskArray[i] = pool.GetAsync(); + } + + Task.WaitAll(taskArray); + + Assert.True(true, "No exception thrown with multiple pending"); + } + + [Fact] + public async Task Should_Throw_Exception_If_All_Channels_Are_Closed_And_Close_Reason_For_All_Recoverable_Channels_Are_Application() + { + /* Setup */ + var mockObjects = new List> { new Mock(), new Mock(), new Mock() }; + foreach (var mockObject in mockObjects) + { + mockObject.As(); + mockObject + .Setup(m => m.IsClosed) + .Returns(true); + mockObject + .Setup(c => c.CloseReason) + .Returns(new ShutdownEventArgs(ShutdownInitiator.Application, 0, "")); + } + var pool = new StaticChannelPool(mockObjects.Select(m => m.Object)); + + /* Test */ + try + { + var cts = new CancellationTokenSource(TimeSpan.FromMilliseconds(50)); + await pool.GetAsync(cts.Token); + Assert.True(false, $"Should throw {nameof(ChannelAvailabilityException)}."); + } + catch (ChannelAvailabilityException e) + { + Assert.True(true, e.Message); + } + } + + [Fact] + public async Task Should_Be_Able_To_Cancel_With_Token() + { + /* Setup */ + var closedChannel = new Mock { Name = "Closed Channel"}; + closedChannel.As(); + closedChannel + .Setup(m => m.IsClosed) + .Returns(true); + var pool = new StaticChannelPool(new []{closedChannel.Object}); + var cts = new CancellationTokenSource(TimeSpan.FromMilliseconds(50)); + + /* Test */ + /* Assert */ + try + { + await pool.GetAsync(cts.Token); + Assert.True(false, $"Task cancelled before completed, should throw {nameof(OperationCanceledException)}"); + } + catch (OperationCanceledException e) + { + Assert.True(true, e.Message); + } + } + + [Fact] + public async Task Should_Throw_Exception_If_Recoverable_Channel_Is_Closed_By_Application() + { + /* Setup */ + var closedChannel = new Mock { Name = "Closed Channel" }; + closedChannel.As(); + closedChannel + .SetupSequence(m => m.IsClosed) + .Returns(false) + .Returns(true); + var pool = new StaticChannelPool(new[] { closedChannel.Object }); + closedChannel.Raise(c =>c.ModelShutdown += null, null, new ShutdownEventArgs(ShutdownInitiator.Application, 0, string.Empty)); + + /* Test */ + /* Assert */ + try + { + await pool.GetAsync(); + Assert.True(false, $"Task completed, should have thrown {nameof(ChannelAvailabilityException)}"); + } + catch (ChannelAvailabilityException e) + { + Assert.True(true, e.Message); + } + } + } +} diff --git a/test/RawRabbit.Tests/Channel/DynamicChannelPoolTests.cs b/test/RawRabbit.Tests/Channel/DynamicChannelPoolTests.cs new file mode 100644 index 00000000..0f9cb738 --- /dev/null +++ b/test/RawRabbit.Tests/Channel/DynamicChannelPoolTests.cs @@ -0,0 +1,82 @@ +using System.Collections.Generic; +using System.Linq; +using System.Threading.Tasks; +using Moq; +using RabbitMQ.Client; +using RawRabbit.Channel; +using Xunit; + +namespace RawRabbit.Tests.Channel +{ + public class DynamicChannelPoolTests + { + [Fact] + public async Task Should_Be_Able_To_Add_And_Use_Channels() + { + /* Setup */ + var channels = new List> {new Mock(), new Mock(), new Mock()}; + foreach (var channel in channels) + { + channel + .Setup(c => c.IsClosed) + .Returns(false); + } + var pool = new DynamicChannelPool(); + pool.Add(channels.Select(c => c.Object)); + + /* Test */ + var firstChannel = await pool.GetAsync(); + var secondChannel = await pool.GetAsync(); + var thirdChannel = await pool.GetAsync(); + + /* Assert */ + Assert.Equal(firstChannel, channels[0].Object); + Assert.Equal(secondChannel, channels[1].Object); + Assert.Equal(thirdChannel, channels[2].Object); + } + + [Fact] + public void Should_Not_Throw_Exception_If_Trying_To_Remove_Channel_Not_In_Pool() + { + /* Setup */ + var pool = new DynamicChannelPool(); + var channel = new Mock(); + + /* Test */ + pool.Remove(channel.Object); + + /* Assert */ + Assert.True(true, "Successfully remove a channel not in the pool"); + } + + [Fact] + public async Task Should_Remove_Channels_Based_On_Count() + { + /* Setup */ + var channels = new List> + { + new Mock{ Name = "First" }, + new Mock{ Name = "Second"}, + new Mock{ Name = "Third"} + }; + foreach (var channel in channels) + { + channel + .Setup(c => c.IsOpen) + .Returns(true); + } + var pool = new DynamicChannelPool(channels.Select(c => c.Object)); + + /* Test */ + pool.Remove(2); + var firstChannel = await pool.GetAsync(); + var secondChannel = await pool.GetAsync(); + var thirdChannel = await pool.GetAsync(); + + /* Assert */ + Assert.Equal(firstChannel, channels[2].Object); + Assert.Equal(secondChannel, channels[2].Object); + Assert.Equal(thirdChannel, channels[2].Object); + } + } +}