Skip to content

Commit

Permalink
(#279) Add a few different channel pool types
Browse files Browse the repository at this point in the history
  • Loading branch information
pardahlman committed Oct 8, 2017
1 parent 74e6ea8 commit bedf6da
Show file tree
Hide file tree
Showing 8 changed files with 846 additions and 103 deletions.
124 changes: 124 additions & 0 deletions src/RawRabbit/Channel/AutoScalingChannelPool.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using RabbitMQ.Client;
using RawRabbit.Channel.Abstraction;
using RawRabbit.Logging;

namespace RawRabbit.Channel
{
public class AutoScalingChannelPool : DynamicChannelPool
{
private readonly IChannelFactory _factory;
private readonly AutoScalingOptions _options;
private Timer _timer;
private readonly ILog _logger = LogProvider.For<AutoScalingChannelPool>();

public AutoScalingChannelPool(IChannelFactory factory, AutoScalingOptions options)
{
_factory = factory;
_options = options;
ValidateOptions(options);
SetupScaling();
}

private static void ValidateOptions(AutoScalingOptions options)
{
if (options.MinimunPoolSize <= 0)
{
throw new ArgumentException($"Minimum Pool Size needs to be a positive integer. Got: {options.MinimunPoolSize}");
}
if (options.MaximumPoolSize <= 0)
{
throw new ArgumentException($"Maximum Pool Size needs to be a positive integer. Got: {options.MinimunPoolSize}");
}
if (options.MinimunPoolSize > options.MaximumPoolSize)
{
throw new ArgumentException($"The Maximum Pool Size ({options.MaximumPoolSize}) must be larger than the Minimum Pool Size ({options.MinimunPoolSize})");
}
}

public override async Task<IModel> GetAsync(CancellationToken ct = default(CancellationToken))
{
var activeChannels = GetActiveChannelCount();
if (activeChannels < _options.MinimunPoolSize)
{
_logger.Debug("Pool currently has {channelCount}, which is lower than the minimal pool size {minimalPoolSize}. Creating channels.", activeChannels, _options.MinimunPoolSize);
var delta = _options.MinimunPoolSize - Pool.Count;
for (var i = 0; i < delta; i++)
{
var channel = await _factory.CreateChannelAsync(ct);
Add(channel);
}
}

return await base.GetAsync(ct);
}

public void SetupScaling()
{
if (_options.RefreshInterval == TimeSpan.MaxValue || _options.RefreshInterval == TimeSpan.MinValue)
{
return;
}

_timer = new Timer(state =>
{
var workPerChannel = ChannelRequestQueue.Count / Pool.Count;
var scaleUp = Pool.Count < _options.MaximumPoolSize;
var scaleDown = _options.MinimunPoolSize < Pool.Count;
_logger.Debug("Channel pool currently has {channelCount} channels open and a total workload of {totalWorkload}", Pool.Count, ChannelRequestQueue.Count);
if (scaleUp && _options.DesiredAverageWorkload < workPerChannel)
{
_logger.Debug("The estimated workload is {averageWorkload} operations/channel, which is higher than the desired workload ({desiredAverageWorkload}). Creating channel.", workPerChannel, _options.DesiredAverageWorkload);
_factory
.CreateChannelAsync()
.ContinueWith(tChannel =>
{
Add(tChannel.Result);
});
return;
}
if (scaleDown && workPerChannel < _options.DesiredAverageWorkload)
{
_logger.Debug("The estimated workload is {averageWorkload} operations/channel, which is lower than the desired workload ({desiredAverageWorkload}). Creating channel.", workPerChannel, _options.DesiredAverageWorkload);
var toRemove = Pool.FirstOrDefault();
Pool.Remove(toRemove);
Timer disposeTimer = null;
disposeTimer = new Timer(o =>
{
(o as IModel)?.Dispose();
disposeTimer?.Dispose();
}, toRemove, _options.GracefulCloseInterval, new TimeSpan(-1));
}
}, null, _options.RefreshInterval, _options.RefreshInterval);
}

public override void Dispose()
{
base.Dispose();
_timer?.Dispose();
}
}

public class AutoScalingOptions
{
public int DesiredAverageWorkload { get; set; }
public int MinimunPoolSize { get; set; }
public int MaximumPoolSize { get; set; }
public TimeSpan RefreshInterval { get; set; }
public TimeSpan GracefulCloseInterval { get; set; }

public static AutoScalingOptions Default => new AutoScalingOptions
{
MinimunPoolSize = 1,
MaximumPoolSize = 10,
DesiredAverageWorkload = 20000,
RefreshInterval = TimeSpan.FromSeconds(10),
GracefulCloseInterval = TimeSpan.FromSeconds(30)
};
}
}
41 changes: 41 additions & 0 deletions src/RawRabbit/Channel/ConcurrentChannelQueue.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;
using RabbitMQ.Client;

namespace RawRabbit.Channel
{
public class ConcurrentChannelQueue
{
private readonly ConcurrentQueue<TaskCompletionSource<IModel>> _queue;

public EventHandler Queued;

public ConcurrentChannelQueue()
{
_queue = new ConcurrentQueue<TaskCompletionSource<IModel>>();
}

public TaskCompletionSource<IModel> Enqueue()
{
var modelTsc = new TaskCompletionSource<IModel>();
var raiseEvent = _queue.IsEmpty;
_queue.Enqueue(modelTsc);
if (raiseEvent)
{
Queued?.Invoke(this, EventArgs.Empty);
}

return modelTsc;
}

public bool TryDequeue(out TaskCompletionSource<IModel> channel)
{
return _queue.TryDequeue(out channel);
}

public bool IsEmpty => _queue.IsEmpty;

public int Count => _queue.Count;
}
}
55 changes: 55 additions & 0 deletions src/RawRabbit/Channel/DynamicChannelPool.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
using System.Collections.Generic;
using System.Linq;
using RabbitMQ.Client;

namespace RawRabbit.Channel
{
public class DynamicChannelPool : StaticChannelPool
{
public DynamicChannelPool()
: this(Enumerable.Empty<IModel>()) { }

public DynamicChannelPool(IEnumerable<IModel> seed)
: base(seed) { }

public void Add(params IModel[] channels)
{
Add(channels.ToList());
}

public void Add(IEnumerable<IModel> channels)
{
foreach (var channel in channels)
{
ConfigureRecovery(channel);
if (Pool.Contains(channel))
{
continue;
}
Pool.AddLast(channel);
}
}

public void Remove(int numberOfChannels = 1)
{
var toRemove = Pool
.Take(numberOfChannels)
.ToList();
Remove(toRemove);
}

public void Remove(params IModel[] channels)
{
Remove(channels.ToList());
}

public void Remove(IEnumerable<IModel> channels)
{
foreach (var channel in channels)
{
Pool.Remove(channel);
Recoverables.Remove(channel as IRecoverable);
}
}
}
}
50 changes: 50 additions & 0 deletions src/RawRabbit/Channel/ResilientChannelPool.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using RabbitMQ.Client;
using RawRabbit.Channel.Abstraction;

namespace RawRabbit.Channel
{
public class ResilientChannelPool : DynamicChannelPool
{
protected readonly IChannelFactory ChannelFactory;
private readonly int _desiredChannelCount;

public ResilientChannelPool(IChannelFactory factory, int channelCount)
: this(factory, CreateSeed(factory, channelCount)) { }

public ResilientChannelPool(IChannelFactory factory)
: this(factory, Enumerable.Empty<IModel>()) { }

public ResilientChannelPool(IChannelFactory factory, IEnumerable<IModel> seed) : base(seed)
{
ChannelFactory = factory;
_desiredChannelCount = seed.Count();
}

private static IEnumerable<IModel> CreateSeed(IChannelFactory factory, int channelCount)
{
for (var i = 0; i < channelCount; i++)
{
yield return factory.CreateChannelAsync().GetAwaiter().GetResult();
}
}

public override async Task<IModel> GetAsync(CancellationToken ct = default(CancellationToken))
{
var currentCount = GetActiveChannelCount();
if (currentCount < _desiredChannelCount)
{
var createCount = _desiredChannelCount - currentCount;
for (var i = 0; i < createCount; i++)
{
var channel = await ChannelFactory.CreateChannelAsync(ct);
Add(channel);
}
}
return await base.GetAsync(ct);
}
}
}
Loading

0 comments on commit bedf6da

Please sign in to comment.